From f062cee010958939e9c465ebc8f0029221d1e181 Mon Sep 17 00:00:00 2001 From: Valentin Haudiquet Date: Thu, 19 Mar 2026 18:17:31 +0100 Subject: [PATCH] record-daemon: fix windows build --- record-daemon/Cargo.toml | 9 +- record-daemon/src/ipc/server.rs | 286 ++++++++++++++++++++------------ 2 files changed, 183 insertions(+), 112 deletions(-) diff --git a/record-daemon/Cargo.toml b/record-daemon/Cargo.toml index 7641aae..62646f1 100644 --- a/record-daemon/Cargo.toml +++ b/record-daemon/Cargo.toml @@ -54,16 +54,17 @@ directories = "5" # CLI (for debugging and control) clap = { version = "4", features = ["derive"] } -# Signal handling for graceful shutdown -signal-hook = "0.3" -signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"] } - # Base64 for LQP authentication base64 = "0.22" # Regex for lockfile parsing regex = "1" +# Signal handling for graceful shutdown (Unix only) +[target.'cfg(unix)'.dependencies] +signal-hook = "0.3" +signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"] } + [dev-dependencies] # Testing utilities tokio-test = "0.4" diff --git a/record-daemon/src/ipc/server.rs b/record-daemon/src/ipc/server.rs index 9fda977..13275cb 100644 --- a/record-daemon/src/ipc/server.rs +++ b/record-daemon/src/ipc/server.rs @@ -5,10 +5,7 @@ use std::path::PathBuf; use std::sync::Arc; -use futures::SinkExt; -use futures::StreamExt; use tokio::sync::{broadcast, RwLock}; -use tokio_util::codec::{Framed, LinesCodec}; use tracing::{debug, error, info, trace, warn}; use super::handlers::IpcHandlers; @@ -36,6 +33,17 @@ impl Default for IpcServerConfig { } } +// ============================================================================ +// Linux-specific types and implementation +// ============================================================================ + +#[cfg(target_os = "linux")] +use futures::SinkExt; +#[cfg(target_os = "linux")] +use futures::StreamExt; +#[cfg(target_os = "linux")] +use tokio_util::codec::{Framed, LinesCodec}; + /// Platform-specific listener type. #[cfg(target_os = "linux")] type PlatformListener = tokio::net::UnixListener; @@ -44,7 +52,8 @@ type PlatformListener = tokio::net::UnixListener; #[cfg(target_os = "linux")] type PlatformStream = tokio::net::UnixStream; -/// IPC server for communication with Tauri app. +/// IPC server for communication with Tauri app (Linux). +#[cfg(target_os = "linux")] pub struct IpcServer { /// Server configuration. config: IpcServerConfig, @@ -60,6 +69,7 @@ pub struct IpcServer { client_count: Arc>, } +#[cfg(target_os = "linux")] impl IpcServer { /// Create a new IPC server. pub fn new(config: IpcServerConfig, handlers: IpcHandlers) -> Self { @@ -93,7 +103,6 @@ impl IpcServer { } /// Start the IPC server. - #[cfg(target_os = "linux")] pub async fn start(&mut self) -> Result<()> { // Remove existing socket if present if self.config.socket_path.exists() { @@ -117,22 +126,7 @@ impl IpcServer { Ok(()) } - /// Start the IPC server (Windows). - #[cfg(target_os = "windows")] - pub async fn start(&mut self) -> Result<()> { - // On Windows, we don't need to bind in advance for named pipes - // The server will create pipe instances on demand - info!("Starting IPC server at {:?}", self.config.socket_path); - - // Mark as "started" - actual pipe creation happens in accept loop - self.listener = None; // We'll create pipes on demand - - info!("IPC server started successfully"); - Ok(()) - } - /// Run the server loop. - #[cfg(target_os = "linux")] pub async fn run(&self) -> Result<()> { let listener = self.listener.as_ref().ok_or_else(|| { IpcError::BindError(std::io::Error::new( @@ -163,77 +157,7 @@ impl IpcServer { Ok(()) } - /// Run the server loop (Windows). - #[cfg(target_os = "windows")] - pub async fn run(&self) -> Result<()> { - use tokio::net::windows::named_pipe::{ServerOptions, NamedPipeServer}; - use std::os::windows::io::AsRawHandle; - - info!("IPC server listening for connections on {:?}", self.config.socket_path); - - let pipe_name = self.config.socket_path.to_string_lossy(); - - loop { - if *self.shutdown.read().await { - info!("IPC server shutting down"); - break; - } - - // Create a new named pipe instance - let server = ServerOptions::new() - .first_pipe_instance(false) - .reject_remote_clients(true) - .create(&pipe_name) - .map_err(IpcError::BindError)?; - - // Wait for a client to connect - match server.connect().await { - Ok(()) => { - debug!("New IPC client connected"); - - // Spawn handler for this connection - let handlers = self.handlers.clone(); - let shutdown = self.shutdown.clone(); - let notification_tx = self.notification_tx.clone(); - let client_count = self.client_count.clone(); - let max_connections = self.config.max_connections; - - // Check connection limit - let current_count = *client_count.read().await; - if current_count >= max_connections { - warn!("Connection limit reached, rejecting connection"); - continue; - } - - tokio::spawn(async move { - *client_count.write().await += 1; - - if let Err(e) = Self::handle_connection_windows( - server, - handlers, - shutdown.clone(), - notification_tx, - ) - .await - { - error!("Connection error: {}", e); - } - - *client_count.write().await -= 1; - debug!("Client disconnected"); - }); - } - Err(e) => { - warn!("Failed to accept connection: {}", e); - } - } - } - - Ok(()) - } - /// Handle a new connection (Linux). - #[cfg(target_os = "linux")] async fn handle_new_connection(&self, stream: PlatformStream, addr: String) { let client_count = self.client_count.clone(); let max_connections = self.config.max_connections; @@ -269,7 +193,6 @@ impl IpcServer { } /// Handle a single client connection (Linux). - #[cfg(target_os = "linux")] async fn handle_connection( stream: PlatformStream, handlers: Arc, @@ -342,22 +265,179 @@ impl IpcServer { Ok(()) } + /// Stop the IPC server. + pub async fn stop(&mut self) -> Result<()> { + *self.shutdown.write().await = true; + + // Remove socket file + if self.config.socket_path.exists() { + std::fs::remove_file(&self.config.socket_path) + .map_err(IpcError::BindError)?; + } + + info!("IPC server stopped"); + Ok(()) + } +} + +#[cfg(target_os = "linux")] +impl Clone for IpcServer { + fn clone(&self) -> Self { + Self { + config: self.config.clone(), + listener: None, // Can't clone the listener + handlers: self.handlers.clone(), + notification_tx: self.notification_tx.clone(), + shutdown: self.shutdown.clone(), + client_count: self.client_count.clone(), + } + } +} + +// ============================================================================ +// Windows-specific types and implementation +// ============================================================================ + +#[cfg(target_os = "windows")] +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; + +/// IPC server for communication with Tauri app (Windows). +#[cfg(target_os = "windows")] +pub struct IpcServer { + /// Server configuration. + config: IpcServerConfig, + /// Command handlers. + handlers: Arc, + /// Notification broadcaster. + notification_tx: broadcast::Sender, + /// Shutdown signal. + shutdown: Arc>, + /// Connected clients count. + client_count: Arc>, +} + +#[cfg(target_os = "windows")] +impl IpcServer { + /// Create a new IPC server. + pub fn new(config: IpcServerConfig, handlers: IpcHandlers) -> Self { + let (notification_tx, _) = broadcast::channel(64); + + Self { + config, + handlers: Arc::new(handlers), + notification_tx, + shutdown: Arc::new(RwLock::new(false)), + client_count: Arc::new(RwLock::new(0)), + } + } + + /// Get the socket path. + pub fn socket_path(&self) -> &PathBuf { + &self.config.socket_path + } + + /// Get a subscriber for notifications. + pub fn subscribe(&self) -> broadcast::Receiver { + self.notification_tx.subscribe() + } + + /// Broadcast a notification to all connected clients. + pub fn broadcast(&self, notification: IpcNotification) { + if self.notification_tx.send(notification).is_err() { + trace!("No clients connected to receive notification"); + } + } + + /// Start the IPC server (Windows). + pub async fn start(&mut self) -> Result<()> { + // On Windows, we don't need to bind in advance for named pipes + // The server will create pipe instances on demand + info!("Starting IPC server at {:?}", self.config.socket_path); + + info!("IPC server started successfully"); + Ok(()) + } + + /// Run the server loop (Windows). + pub async fn run(&self) -> Result<()> { + use tokio::net::windows::named_pipe::ServerOptions; + + info!("IPC server listening for connections on {:?}", self.config.socket_path); + + let pipe_name: String = self.config.socket_path.to_string_lossy().into_owned(); + + loop { + if *self.shutdown.read().await { + info!("IPC server shutting down"); + break; + } + + // Create a new named pipe instance + let server = ServerOptions::new() + .first_pipe_instance(false) + .reject_remote_clients(true) + .create(&pipe_name) + .map_err(IpcError::BindError)?; + + // Wait for a client to connect + match server.connect().await { + Ok(()) => { + debug!("New IPC client connected"); + + // Spawn handler for this connection + let handlers = self.handlers.clone(); + let shutdown = self.shutdown.clone(); + let notification_tx = self.notification_tx.clone(); + let client_count = self.client_count.clone(); + let max_connections = self.config.max_connections; + + // Check connection limit + let current_count = *client_count.read().await; + if current_count >= max_connections { + warn!("Connection limit reached, rejecting connection"); + continue; + } + + tokio::spawn(async move { + *client_count.write().await += 1; + + if let Err(e) = Self::handle_connection_windows( + server, + handlers, + shutdown.clone(), + notification_tx, + ) + .await + { + error!("Connection error: {}", e); + } + + *client_count.write().await -= 1; + debug!("Client disconnected"); + }); + } + Err(e) => { + warn!("Failed to accept connection: {}", e); + } + } + } + + Ok(()) + } + /// Handle a single client connection (Windows). - #[cfg(target_os = "windows")] async fn handle_connection_windows( server: tokio::net::windows::named_pipe::NamedPipeServer, handlers: Arc, shutdown: Arc>, _notification_tx: broadcast::Sender, ) -> Result<()> { - use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; - - let (mut reader, mut writer) = tokio::io::split(server); - let mut reader = BufReader::new(reader).lines(); + let (reader, mut writer) = tokio::io::split(server); + let mut lines = BufReader::new(reader).lines(); while !*shutdown.read().await { // Read message with timeout - let line = tokio::time::timeout(std::time::Duration::from_secs(30), reader.next_line()) + let line = tokio::time::timeout(std::time::Duration::from_secs(30), lines.next_line()) .await .map_err(|_| { IpcError::ReadError(std::io::Error::new( @@ -422,26 +502,16 @@ impl IpcServer { /// Stop the IPC server. pub async fn stop(&mut self) -> Result<()> { *self.shutdown.write().await = true; - - // Remove socket file (Linux only) - #[cfg(target_os = "linux")] - { - if self.config.socket_path.exists() { - std::fs::remove_file(&self.config.socket_path) - .map_err(IpcError::BindError)?; - } - } - info!("IPC server stopped"); Ok(()) } } +#[cfg(target_os = "windows")] impl Clone for IpcServer { fn clone(&self) -> Self { Self { config: self.config.clone(), - listener: None, // Can't clone the listener handlers: self.handlers.clone(), notification_tx: self.notification_tx.clone(), shutdown: self.shutdown.clone(),