Files
leaguerecorder/record-daemon/src/main.rs
T
vhaudiquet fcfa55d0aa
record-daemon / Build, check and test (push) Waiting to run
record raw events everywhere
2026-05-06 23:53:01 +02:00

688 lines
24 KiB
Rust

//! Record Daemon entry point.
use std::sync::Arc;
use clap::Parser;
use libobs_bootstrapper::{ObsBootstrapper, ObsBootstrapperOptions, ObsBootstrapperResult};
use parking_lot::RwLock;
use tokio::sync::broadcast;
use tracing::{debug, error, info, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use record_daemon::{
config::{self, Settings},
error::Result,
ipc::{self, IpcHandlers, IpcServer, IpcServerConfig},
lqp::{
describe_event, LockfileWatcher, LqpClient, EVENT_TYPE_CHAMPION_PICK,
EVENT_TYPE_GAME_START, EVENT_TYPE_PHASE_CHANGE,
},
recording::RecordingEngine,
state::{DaemonStateMachine, DaemonStatus, StateTransition},
timeline::{EventMapper, TimelineStore, TimestampedEvent},
};
// NOTE: clap's derive macro turns the `///` comments on this struct and its
// fields into the `--help` text, so they are user-facing strings and are kept
// verbatim here. Reviewer notes use plain `//` comments, which clap ignores.
/// Record Daemon - League of Legends recording daemon.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Path to configuration file.
// When present, `main` loads settings from this path via
// `config::ConfigPersistence`; otherwise it calls `config::load_config()`.
#[arg(short, long, value_name = "PATH")]
config: Option<std::path::PathBuf>,
/// Log level (trace, debug, info, warn, error).
// Passed to `init_logging`, which only uses it when no filter can be built
// from the environment.
#[arg(short, long, default_value = "info")]
log_level: String,
/// Run in foreground (don't daemonize).
// NOTE(review): this flag is parsed but not read anywhere in the visible part
// of this file — confirm whether daemonizing is implemented elsewhere.
#[arg(short, long)]
foreground: bool,
/// Socket path for IPC.
// NOTE(review): not read in the visible part of this file; `Daemon::init`
// takes the socket path from `Settings` only — confirm whether that is intended.
#[arg(short, long)]
socket: Option<std::path::PathBuf>,
}
/// Main daemon structure.
///
/// Owns every long-lived component of the process. Components that are used
/// from `spawn_blocking` closures or handed to the IPC handlers are wrapped in
/// `Arc` (plus a `parking_lot::RwLock` where they are mutated) so clones of the
/// handle can be moved into those tasks.
struct Daemon {
/// Configuration, shared with the IPC handlers created in `init`.
settings: Arc<RwLock<Settings>>,
/// State machine consulted for `is_recording()` and driven by client/game
/// events through `process_event` and `transition`.
state_machine: Arc<DaemonStateMachine>,
/// LQP client: connected when the lockfile watcher reports a client, and the
/// source of the events consumed by the main loop.
lqp_client: Arc<LqpClient>,
/// Recording engine. `None` until `init` has successfully initialized it.
recording_engine: Arc<RwLock<Option<RecordingEngine>>>,
/// Timeline store holding recording entries, their metadata and events.
timeline_store: Arc<RwLock<TimelineStore>>,
/// Event mapper; started/stopped together with the engine recording and
/// asked for the video/game timestamps of each incoming event.
event_mapper: Arc<RwLock<EventMapper>>,
/// Current recording ID (if recording). Set by
/// `start_recording_with_metadata` and taken by `stop_recording_with_metadata`.
current_recording_id: Arc<RwLock<Option<uuid::Uuid>>>,
/// IPC server. Populated by `init`; `run` moves it out into a spawned task,
/// so it is `None` again once the main loop has started.
ipc_server: Option<IpcServer>,
/// Shutdown signal. `run` subscribes to it; sending `()` ends the main loop.
shutdown_tx: broadcast::Sender<()>,
}
impl Daemon {
/// Create a new daemon instance.
///
/// Takes ownership of `settings` and wraps it for sharing. Every other
/// component starts in its empty state: no recording engine, no current
/// recording ID and no IPC server; those are filled in by [`Daemon::init`]
/// and the recording methods.
fn new(settings: Settings) -> Self {
    // The initial receiver is discarded; `run` subscribes its own.
    let (shutdown_tx, _) = broadcast::channel(1);
    Self {
        // `settings` is owned and not used again, so it is moved rather than cloned.
        settings: Arc::new(RwLock::new(settings)),
        state_machine: Arc::new(DaemonStateMachine::new()),
        lqp_client: Arc::new(LqpClient::new()),
        recording_engine: Arc::new(RwLock::new(None)),
        timeline_store: Arc::new(RwLock::new(TimelineStore::new())),
        event_mapper: Arc::new(RwLock::new(EventMapper::new())),
        current_recording_id: Arc::new(RwLock::new(None)),
        ipc_server: None,
        shutdown_tx,
    }
}
/// Initialize the daemon.
async fn init(&mut self) -> Result<()> {
info!("Initializing record daemon v{}", record_daemon::VERSION);
// Initialize recording engine (blocking operation)
let settings = self.settings.read().clone();
let recording_engine = self.recording_engine.clone();
let result = tokio::task::spawn_blocking(move || {
let mut engine = RecordingEngine::new(settings);
if let Err(e) = engine.initialize() {
return Err(format!("Failed to initialize recording engine: {:?}", e));
}
*recording_engine.write() = Some(engine);
Ok::<_, String>(())
})
.await;
match result {
Ok(Ok(())) => {}
Ok(Err(e)) => return Err(record_daemon::error::RecordingError::ObsInitError(e).into()),
Err(e) => {
return Err(record_daemon::error::RecordingError::ObsInitError(format!(
"spawn_blocking error during init: {:?}",
e
))
.into())
}
}
// Load existing recordings from disk
self.timeline_store.read().load_from_disk()?;
// Initialize IPC server
let ipc_config = IpcServerConfig {
socket_path: self
.settings
.read()
.daemon
.socket_path
.clone()
.unwrap_or_else(ipc::default_socket_path),
..Default::default()
};
let handlers = IpcHandlers::new(
self.settings.clone(),
self.recording_engine.clone(),
self.timeline_store.clone(),
Arc::new(RwLock::new(DaemonStatus::Idle)),
Arc::new(RwLock::new(false)),
);
let mut ipc_server = IpcServer::new(ipc_config, handlers);
ipc_server.start().await?;
self.ipc_server = Some(ipc_server);
info!("Daemon initialized successfully");
Ok(())
}
/// Run the main daemon loop.
async fn run(&mut self) -> Result<()> {
info!("Starting main daemon loop");
let mut shutdown_rx = self.shutdown_tx.subscribe();
let mut lockfile_watcher = LockfileWatcher::new();
// Spawn IPC server task - take ownership
if let Some(ipc_server) = self.ipc_server.take() {
tokio::spawn(async move {
if let Err(e) = ipc_server.run().await {
error!("IPC server error: {}", e);
}
});
}
// Subscribe to LQP events before the loop
let mut event_rx = self.lqp_client.subscribe();
// Main event loop
loop {
tokio::select! {
// Shutdown signal
_ = shutdown_rx.recv() => {
info!("Shutdown signal received");
break;
}
// Check for League Client
result = self.check_client(&mut lockfile_watcher) => {
if let Err(e) = result {
warn!("Client check error: {}", e);
}
}
// Process LQP events
event = event_rx.recv() => {
if let Ok(event) = event {
if let Err(e) = self.handle_game_event(event).await {
warn!("Event handling error: {}", e);
}
}
}
}
}
info!("Daemon loop ended");
Ok(())
}
/// Check for League Client connection.
///
/// Performs one lockfile check, reacts to a client start/stop, then sleeps
/// for `daemon.poll_interval_ms` from the settings.
///
/// The sleep happens on the error path too. The caller invokes this in a
/// loop, so returning immediately on a failing `watcher.check()` or a failed
/// connect would turn a persistent error into a tight retry loop.
///
/// # Errors
/// Returns (after the sleep) any error from the lockfile check, the LQP
/// connect, or starting the event listener.
async fn check_client(&self, watcher: &mut LockfileWatcher) -> Result<()> {
    let poll_interval =
        std::time::Duration::from_millis(self.settings.read().daemon.poll_interval_ms);

    let outcome = self.sync_client_connection(watcher).await;

    tokio::time::sleep(poll_interval).await;
    outcome
}

/// Apply one lockfile observation: connect on client start, disconnect on
/// client stop, do nothing when the watcher reports no change.
async fn sync_client_connection(&self, watcher: &mut LockfileWatcher) -> Result<()> {
    match watcher.check()? {
        Some(true) => {
            // Client started
            info!("League Client detected");
            self.state_machine
                .transition(StateTransition::ClientStarted);
            if let Some(creds) = watcher.credentials() {
                self.lqp_client.connect(creds.clone()).await?;
                self.lqp_client.start_event_listener().await?;
                // Start polling for live client events (kills, deaths, objectives)
                self.lqp_client.start_live_client_event_poller().await;
            }
        }
        Some(false) => {
            // Client stopped
            info!("League Client stopped");
            self.state_machine
                .transition(StateTransition::ClientStopped);
            self.lqp_client.disconnect().await;
        }
        None => {}
    }
    Ok(())
}
/// Handle a game event.
///
/// For each parsed LQP event this:
/// 1. logs the event (plus extra detail for phase changes, local champion
///    picks and game starts);
/// 2. while the state machine reports recording, asks the event mapper for
///    timestamps and appends a [`TimestampedEvent`] to the current recording;
/// 3. lets the state machine derive a transition and, if it is accepted,
///    starts or stops recording for `GameStarted` / `GameEnded`.
///
/// Step 2 runs before step 3 so that a game-end event is still recorded
/// while the state machine is in its recording state.
///
/// Failures to add a timeline event or to start/stop recording are logged and
/// swallowed so the daemon keeps running; the visible code always returns `Ok(())`.
async fn handle_game_event(&self, parsed: record_daemon::lqp::ParsedEvent) -> Result<()> {
    let event_type = &parsed.event_type;
    let raw_data = &parsed.raw_data;
    let description = describe_event(event_type, raw_data);
    info!(
        "[EVENT_HANDLER] Game event received: type={}, desc={}",
        event_type, description
    );

    // Parsed once and reused by the game-start log and the GameStarted
    // transition; 0 when the payload has no numeric `gameId`.
    let game_id = raw_data.get("gameId").and_then(|v| v.as_u64()).unwrap_or(0);

    // Handle pre-game data collection
    match event_type.as_str() {
        EVENT_TYPE_PHASE_CHANGE => {
            // The phase is either the payload itself or its `phase` field.
            let phase = raw_data
                .as_str()
                .or_else(|| raw_data.get("phase").and_then(|v| v.as_str()))
                .unwrap_or("");
            if phase == "ChampSelect" {
                info!("[EVENT_HANDLER] Champion select started");
            }
        }
        EVENT_TYPE_CHAMPION_PICK => {
            // Both camelCase and snake_case keys are accepted.
            let is_local = raw_data
                .get("isLocalPlayer")
                .or_else(|| raw_data.get("is_local_player"))
                .and_then(|v| v.as_bool())
                .unwrap_or(false);
            if is_local {
                let champion = raw_data
                    .get("championName")
                    .or_else(|| raw_data.get("champion_name"))
                    .and_then(|v| v.as_str())
                    .unwrap_or("unknown");
                info!("[EVENT_HANDLER] Local player picked champion: {}", champion);
            }
        }
        EVENT_TYPE_GAME_START => {
            info!("[EVENT_HANDLER] Game started with game_id: {}", game_id);
        }
        _ => {}
    }

    // Record event to timeline if recording (BEFORE state transition for GameEnd)
    // This ensures GameEnd events are recorded while still in recording state
    if self.state_machine.is_recording() {
        // Bind the result first so the event-mapper write guard is released
        // before any other lock is taken or anything is logged.
        let stamps = self.event_mapper.write().handle_event(event_type);
        if let Some((video_ts, game_ts)) = stamps {
            let active_recording = *self.current_recording_id.read();
            match active_recording {
                Some(recording_id) => {
                    // Create a timestamped event with raw data
                    let timestamped_event = TimestampedEvent {
                        video_timestamp: video_ts,
                        game_timestamp: game_ts,
                        timestamp: chrono::Utc::now(),
                        event_type: event_type.clone(),
                        // Not needed afterwards, so moved instead of cloned.
                        description,
                        raw_data: raw_data.clone(),
                        uri: parsed.uri.clone(),
                    };
                    // Add the event to the timeline store
                    let added = self
                        .timeline_store
                        .write()
                        .add_event(recording_id, timestamped_event);
                    match added {
                        Ok(_) => debug!(
                            "Event added to timeline: video_ts={:?}, game_ts={:?}, type={}",
                            video_ts, game_ts, event_type
                        ),
                        Err(e) => warn!("Failed to add event to timeline: {:?}", e),
                    }
                }
                None => warn!("Recording in progress but no recording ID set"),
            }
        }
    }

    // Process state transitions
    if let Some(transition) = self.state_machine.process_event(event_type, raw_data) {
        info!("[EVENT_HANDLER] State transition: {:?}", transition);
        // Only process the transition if it's valid
        if let Some(_new_state) = self.state_machine.transition(transition.clone()) {
            // Handle recording start/stop
            match transition {
                StateTransition::GameStarted => {
                    info!(
                        "[EVENT_HANDLER] GameStarted transition - game_id: {}",
                        game_id
                    );
                    // If already recording, stop the current recording first.
                    // NOTE(review): this check runs after the transition above has
                    // been applied; if `GameStarted` itself moves the state machine
                    // into its recording state, this is always true and a stop is
                    // attempted on every game start — confirm against
                    // `DaemonStateMachine`.
                    if self.state_machine.is_recording() {
                        info!(
                            "[EVENT_HANDLER] Stopping previous recording before starting new one"
                        );
                        if let Err(e) = self.stop_recording().await {
                            warn!("[EVENT_HANDLER] Failed to stop previous recording: {}", e);
                        }
                    }
                    info!("[EVENT_HANDLER] Calling start_recording...");
                    // Fetch raw API data in parallel
                    let (
                        raw_session,
                        raw_summoner,
                        raw_champion_select,
                        raw_rune_page,
                        raw_live_client_data,
                    ) = tokio::join!(
                        self.lqp_client.fetch_raw_session(),
                        self.lqp_client.fetch_raw_summoner(),
                        self.lqp_client.fetch_raw_champion_select(),
                        self.lqp_client.fetch_raw_rune_page(),
                        self.lqp_client.fetch_raw_live_client_data()
                    );
                    // Build game metadata for timeline with raw JSON; a failed
                    // fetch simply leaves its field as `None`.
                    let metadata_update = record_daemon::timeline::MetadataUpdate {
                        game_id: Some(game_id),
                        raw_session: raw_session.ok(),
                        raw_summoner: raw_summoner.ok(),
                        raw_champion_select: raw_champion_select.ok(),
                        raw_rune_page: raw_rune_page.ok(),
                        raw_live_client_data: raw_live_client_data.ok(),
                        raw_end_game_stats: None,
                    };
                    if let Err(e) = self
                        .start_recording_with_metadata(game_id, metadata_update)
                        .await
                    {
                        error!("[EVENT_HANDLER] Failed to start recording: {}", e);
                        // Don't propagate error - keep daemon running
                    } else {
                        info!("[EVENT_HANDLER] start_recording completed successfully");
                    }
                }
                StateTransition::GameEnded => {
                    info!("[EVENT_HANDLER] GameEnded transition");
                    // Fetch raw end-of-game stats from API
                    let raw_end_game_stats =
                        self.lqp_client.fetch_raw_end_game_stats().await.ok();
                    info!(
                        "[EVENT_HANDLER] Game end stats from API: {:?}",
                        raw_end_game_stats.is_some()
                    );
                    if let Err(e) = self.stop_recording_with_metadata(raw_end_game_stats).await
                    {
                        error!("[EVENT_HANDLER] Failed to stop recording: {}", e);
                        // Don't propagate error - keep daemon running
                    }
                }
                _ => {}
            }
        } else {
            warn!(
                "[EVENT_HANDLER] State transition rejected: {:?}",
                transition
            );
        }
    }

    info!("[EVENT_HANDLER] Event handling complete");
    Ok(())
}
/// Start recording with game metadata.
async fn start_recording_with_metadata(
&self,
game_id: u64,
metadata_update: record_daemon::timeline::MetadataUpdate,
) -> Result<()> {
info!(
"Daemon::start_recording_with_metadata called - game {}",
game_id
);
// Create a recording entry in the timeline store first
let recording_id = self
.timeline_store
.write()
.start_recording_entry(Some(game_id), None);
// Update metadata immediately with game start info
if let Err(e) = self
.timeline_store
.write()
.update_metadata(recording_id, metadata_update)
{
warn!("Failed to update recording metadata: {:?}", e);
}
// Store the recording ID for event tracking
*self.current_recording_id.write() = Some(recording_id);
info!("Created recording entry with ID: {}", recording_id);
// Clone Arc references for use in spawn_blocking
let recording_engine = self.recording_engine.clone();
let event_mapper = self.event_mapper.clone();
// Use spawn_blocking to avoid blocking the async runtime
tokio::task::spawn_blocking(move || {
info!("Acquiring recording engine write lock...");
let mut engine_guard = recording_engine.write();
info!("Recording engine lock acquired");
if let Some(ref mut engine) = *engine_guard {
info!("Calling engine.start_recording...");
engine.start_recording(Some(game_id), None)?;
info!("engine.start_recording returned successfully");
event_mapper.write().start();
info!("Event mapper started");
} else {
warn!("Recording engine is None!");
}
info!("Daemon::start_recording_with_metadata completed successfully");
Ok(())
})
.await
.map_err(|e| {
record_daemon::error::RecordingError::ObsInitError(format!(
"spawn_blocking error: {:?}",
e
))
})?
}
/// Stop recording.
///
/// Convenience wrapper around [`Daemon::stop_recording_with_metadata`] that
/// passes no end-of-game stats (`None`); errors are returned unchanged.
async fn stop_recording(&self) -> Result<()> {
self.stop_recording_with_metadata(None).await
}
/// Stop recording with optional raw game end stats JSON.
async fn stop_recording_with_metadata(
&self,
raw_end_game_stats: Option<serde_json::Value>,
) -> Result<()> {
info!("Stopping recording");
// Get the current recording ID and clear it
let recording_id = self.current_recording_id.write().take();
// Clone Arc references for use in spawn_blocking
let recording_engine = self.recording_engine.clone();
let event_mapper = self.event_mapper.clone();
let timeline_store = self.timeline_store.clone();
// Use spawn_blocking to avoid blocking the async runtime
tokio::task::spawn_blocking(move || {
let mut engine_guard = recording_engine.write();
if let Some(ref mut engine) = *engine_guard {
let result = engine.stop_recording()?;
event_mapper.write().stop();
// Use the existing recording ID if available, otherwise create new
let recording_id = match recording_id {
Some(id) => {
// Finalize the existing recording entry
if let Err(e) = timeline_store.write().finalize_recording(id, result) {
warn!("Failed to finalize recording: {}", e);
}
id
}
None => {
// Fallback: create new recording entry (legacy behavior)
timeline_store.write().add_recording(result)?
}
};
// Update metadata with raw end game stats
let update = record_daemon::timeline::MetadataUpdate {
raw_end_game_stats,
..Default::default()
};
// Apply the update
if let Err(e) = timeline_store.write().update_metadata(recording_id, update) {
warn!("Failed to update recording metadata: {}", e);
}
}
Ok(())
})
.await
.map_err(|e| {
record_daemon::error::RecordingError::ObsInitError(format!(
"spawn_blocking error: {:?}",
e
))
})?
}
/// Shutdown the daemon.
async fn shutdown(&mut self) -> Result<()> {
info!("Shutting down daemon");
// Stop recording if active
if self.state_machine.is_recording() {
self.stop_recording().await?;
}
// Stop IPC server
if let Some(ref mut ipc_server) = self.ipc_server {
ipc_server.stop().await?;
}
// Shutdown recording engine (blocking operation)
let recording_engine = self.recording_engine.clone();
let result = tokio::task::spawn_blocking(move || {
if let Some(ref mut engine) = *recording_engine.write() {
if let Err(e) = engine.shutdown() {
return Err(format!("Failed to shutdown recording engine: {:?}", e));
}
}
Ok::<_, String>(())
})
.await;
match result {
Ok(Ok(())) => {}
Ok(Err(e)) => return Err(record_daemon::error::RecordingError::ObsInitError(e).into()),
Err(e) => {
return Err(record_daemon::error::RecordingError::ObsInitError(format!(
"spawn_blocking error during shutdown: {:?}",
e
))
.into())
}
}
info!("Daemon shutdown complete");
Ok(())
}
}
/// Initialize logging.
///
/// Installs a global `tracing` subscriber made of an `EnvFilter` and the
/// default formatting layer. The filter is built from the environment when
/// that succeeds; otherwise `level` is used as the filter directive.
///
/// Also installs a panic hook that reports the panic location and message
/// both through `tracing` and on stderr.
fn init_logging(level: &str) {
    // Environment configuration wins over the command-line level.
    let filter = match tracing_subscriber::EnvFilter::try_from_default_env() {
        Ok(from_env) => from_env,
        Err(_) => tracing_subscriber::EnvFilter::new(level),
    };

    tracing_subscriber::registry()
        .with(filter)
        .with(tracing_subscriber::fmt::layer())
        .init();

    // Set up panic hook to log panics
    std::panic::set_hook(Box::new(|panic_info| {
        let location = match panic_info.location() {
            Some(loc) => format!("{}:{}:{}", loc.file(), loc.line(), loc.column()),
            None => "unknown location".to_string(),
        };

        // Panic payloads are typically `&str` or `String`; anything else gets
        // a placeholder message.
        let payload = panic_info.payload();
        let message = payload
            .downcast_ref::<&str>()
            .map(|text| text.to_string())
            .or_else(|| payload.downcast_ref::<String>().cloned())
            .unwrap_or_else(|| "Unknown panic".to_string());

        error!("PANIC at {}: {}", location, message);
        eprintln!("PANIC at {}: {}", location, message);
    }));
}
/// Process entry point.
///
/// Parses the command line, sets up logging, bootstraps OBS, loads the
/// settings, then initializes and runs the [`Daemon`] and shuts it down when
/// the main loop ends.
///
/// A `--socket` path given on the command line overrides the socket path from
/// the loaded settings.
///
/// NOTE(review): `args.foreground` is parsed but not consulted here.
///
/// # Errors
/// Returns an error if OBS bootstrapping, configuration loading, signal
/// registration or daemon initialization fails, or if the main loop returns
/// one. A shutdown failure is only logged.
#[tokio::main]
async fn main() -> Result<()> {
    let args = Args::parse();

    // Initialize logging
    init_logging(&args.log_level);
    info!("Record Daemon v{} starting", record_daemon::VERSION);

    // Bootstrap OBS - download and extract if needed
    info!("Bootstrapping OBS...");
    let bootstrap_options = ObsBootstrapperOptions::default().set_update(false);
    match ObsBootstrapper::bootstrap(&bootstrap_options).await {
        Ok(ObsBootstrapperResult::Restart) => {
            // The bootstrapper asked for a restart, so exit cleanly here.
            info!("OBS has been downloaded and extracted.");
            return Ok(());
        }
        Ok(ObsBootstrapperResult::None) => {
            info!("OBS bootstrap complete, continuing...");
        }
        Err(e) => {
            error!("Failed to bootstrap OBS: {:?}", e);
            return Err(record_daemon::error::RecordingError::ObsInitError(format!(
                "OBS bootstrap failed: {:?}",
                e
            ))
            .into());
        }
    }

    // Load configuration
    let mut settings = if let Some(config_path) = args.config {
        config::ConfigPersistence::new(config_path).load()?
    } else {
        config::load_config()?
    };

    // Apply the command-line socket override; without this the `--socket`
    // option is accepted but has no effect.
    if let Some(socket_path) = args.socket {
        info!(
            "Using IPC socket path from command line: {}",
            socket_path.display()
        );
        settings.daemon.socket_path = Some(socket_path);
    }

    // Create and run daemon
    let mut daemon = Daemon::new(settings);

    #[cfg(unix)]
    {
        // Handle shutdown signals
        use futures::StreamExt;
        use signal_hook::consts::signal::*;
        use signal_hook_tokio::Signals;

        let mut signals = Signals::new([SIGTERM, SIGINT, SIGQUIT])?;
        let tx = daemon.shutdown_tx.clone();
        tokio::spawn(async move {
            // Only the first signal is acted on; it triggers the shutdown broadcast.
            if let Some(signal) = signals.next().await {
                info!("Received signal {:?}", signal);
                let _ = tx.send(());
            }
        });
    }

    // Initialize and run
    if let Err(e) = daemon.init().await {
        error!("Failed to initialize daemon: {}", e);
        return Err(e);
    }

    // Run main loop
    let result = daemon.run().await;

    // Cleanup; the main loop's result takes precedence over a shutdown error.
    if let Err(e) = daemon.shutdown().await {
        error!("Error during shutdown: {}", e);
    }
    result
}