//! IPC server for communication with the Tauri app. //! //! Uses Unix domain sockets on Linux and named pipes on Windows. use std::path::PathBuf; use std::sync::Arc; use tokio::sync::{broadcast, RwLock}; use tracing::{debug, error, info, trace, warn}; use super::handlers::IpcHandlers; use super::protocol::{IpcMessage, IpcNotification, IpcResponse, MessageType}; use crate::error::{IpcError, Result}; /// IPC server configuration. #[derive(Debug, Clone)] pub struct IpcServerConfig { /// Socket path (Linux) or pipe name (Windows). pub socket_path: PathBuf, /// Maximum connections. pub max_connections: usize, /// Connection timeout in seconds. pub timeout_secs: u64, } impl Default for IpcServerConfig { fn default() -> Self { Self { socket_path: super::default_socket_path(), max_connections: 10, timeout_secs: 30, } } } // ============================================================================ // Linux-specific types and implementation // ============================================================================ #[cfg(target_os = "linux")] use futures::SinkExt; #[cfg(target_os = "linux")] use futures::StreamExt; #[cfg(target_os = "linux")] use tokio_util::codec::{Framed, LinesCodec}; /// Platform-specific listener type. #[cfg(target_os = "linux")] type PlatformListener = tokio::net::UnixListener; /// Platform-specific stream type. #[cfg(target_os = "linux")] type PlatformStream = tokio::net::UnixStream; /// IPC server for communication with Tauri app (Linux). #[cfg(target_os = "linux")] pub struct IpcServer { /// Server configuration. config: IpcServerConfig, /// Platform-specific listener. listener: Option, /// Command handlers. handlers: Arc, /// Notification broadcaster. notification_tx: broadcast::Sender, /// Shutdown signal. shutdown: Arc>, /// Connected clients count. client_count: Arc>, } #[cfg(target_os = "linux")] impl IpcServer { /// Create a new IPC server. pub fn new(config: IpcServerConfig, handlers: IpcHandlers) -> Self { let (notification_tx, _) = broadcast::channel(64); Self { config, listener: None, handlers: Arc::new(handlers), notification_tx, shutdown: Arc::new(RwLock::new(false)), client_count: Arc::new(RwLock::new(0)), } } /// Get the socket path. pub fn socket_path(&self) -> &PathBuf { &self.config.socket_path } /// Get a subscriber for notifications. pub fn subscribe(&self) -> broadcast::Receiver { self.notification_tx.subscribe() } /// Broadcast a notification to all connected clients. pub fn broadcast(&self, notification: IpcNotification) { if self.notification_tx.send(notification).is_err() { trace!("No clients connected to receive notification"); } } /// Start the IPC server. pub async fn start(&mut self) -> Result<()> { // Remove existing socket if present if self.config.socket_path.exists() { std::fs::remove_file(&self.config.socket_path).map_err(IpcError::BindError)?; } // Ensure parent directory exists if let Some(parent) = self.config.socket_path.parent() { if !parent.exists() { std::fs::create_dir_all(parent).map_err(IpcError::BindError)?; } } info!("Starting IPC server at {:?}", self.config.socket_path); let listener = PlatformListener::bind(&self.config.socket_path).map_err(IpcError::BindError)?; self.listener = Some(listener); info!("IPC server started successfully"); Ok(()) } /// Run the server loop. pub async fn run(&self) -> Result<()> { let listener = self.listener.as_ref().ok_or_else(|| { IpcError::BindError(std::io::Error::new( std::io::ErrorKind::NotConnected, "Server not started", )) })?; info!("IPC server listening for connections"); loop { if *self.shutdown.read().await { info!("IPC server shutting down"); break; } // Accept new connection match listener.accept().await { Ok((stream, addr)) => { self.handle_new_connection(stream, format!("{:?}", addr)) .await; } Err(e) => { warn!("Failed to accept connection: {}", e); } } } Ok(()) } /// Handle a new connection (Linux). async fn handle_new_connection(&self, stream: PlatformStream, addr: String) { let client_count = self.client_count.clone(); let max_connections = self.config.max_connections; // Check connection limit let current_count = *client_count.read().await; if current_count >= max_connections { warn!( "Connection limit reached, rejecting connection from {}", addr ); return; } debug!("New IPC client connected from {}", addr); // Spawn handler for this connection let handlers = self.handlers.clone(); let shutdown = self.shutdown.clone(); let notification_tx = self.notification_tx.clone(); tokio::spawn(async move { *client_count.write().await += 1; if let Err(e) = Self::handle_connection(stream, handlers, shutdown.clone(), notification_tx).await { error!("Connection error: {}", e); } *client_count.write().await -= 1; debug!("Client disconnected"); }); } /// Handle a single client connection (Linux). async fn handle_connection( stream: PlatformStream, handlers: Arc, shutdown: Arc>, _notification_tx: broadcast::Sender, ) -> Result<()> { let framed = Framed::new(stream, LinesCodec::new()); let (mut writer, mut reader) = framed.split(); while !*shutdown.read().await { // Read message let line = tokio::time::timeout(std::time::Duration::from_secs(30), reader.next()) .await .map_err(|_| { IpcError::ReadError(std::io::Error::new( std::io::ErrorKind::TimedOut, "Read timeout", )) })?; let line = match line { Some(Ok(line)) => line, Some(Err(e)) => { warn!("Error reading from client: {}", e); break; } None => { debug!("Client disconnected"); break; } }; trace!("Received: {}", line); // Parse message let message = match IpcMessage::from_json(&line) { Ok(msg) => msg, Err(e) => { warn!("Failed to parse message: {}", e); let response = IpcResponse::error(uuid::Uuid::nil(), format!("Parse error: {}", e)); writer .send(response.to_json()?) .await .map_err(|e| IpcError::CodecError(e.to_string()))?; continue; } }; // Handle message let response = match message.message_type { MessageType::Request => handlers.handle(message).await, MessageType::Notification => { // Notifications don't get responses trace!("Received notification: {:?}", message); continue; } MessageType::Response => { // We shouldn't receive responses warn!("Unexpected response message from client"); continue; } }; // Send response let response_json = response.to_json()?; writer .send(response_json) .await .map_err(|e| IpcError::CodecError(e.to_string()))?; } Ok(()) } /// Stop the IPC server. pub async fn stop(&mut self) -> Result<()> { *self.shutdown.write().await = true; // Remove socket file if self.config.socket_path.exists() { std::fs::remove_file(&self.config.socket_path).map_err(IpcError::BindError)?; } info!("IPC server stopped"); Ok(()) } } #[cfg(target_os = "linux")] impl Clone for IpcServer { fn clone(&self) -> Self { Self { config: self.config.clone(), listener: None, // Can't clone the listener handlers: self.handlers.clone(), notification_tx: self.notification_tx.clone(), shutdown: self.shutdown.clone(), client_count: self.client_count.clone(), } } } // ============================================================================ // Windows-specific types and implementation // ============================================================================ #[cfg(target_os = "windows")] use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; /// IPC server for communication with Tauri app (Windows). #[cfg(target_os = "windows")] pub struct IpcServer { /// Server configuration. config: IpcServerConfig, /// Command handlers. handlers: Arc, /// Notification broadcaster. notification_tx: broadcast::Sender, /// Shutdown signal. shutdown: Arc>, /// Connected clients count. client_count: Arc>, } #[cfg(target_os = "windows")] impl IpcServer { /// Create a new IPC server. pub fn new(config: IpcServerConfig, handlers: IpcHandlers) -> Self { let (notification_tx, _) = broadcast::channel(64); Self { config, handlers: Arc::new(handlers), notification_tx, shutdown: Arc::new(RwLock::new(false)), client_count: Arc::new(RwLock::new(0)), } } /// Get the socket path. pub fn socket_path(&self) -> &PathBuf { &self.config.socket_path } /// Get a subscriber for notifications. pub fn subscribe(&self) -> broadcast::Receiver { self.notification_tx.subscribe() } /// Broadcast a notification to all connected clients. pub fn broadcast(&self, notification: IpcNotification) { if self.notification_tx.send(notification).is_err() { trace!("No clients connected to receive notification"); } } /// Start the IPC server (Windows). pub async fn start(&mut self) -> Result<()> { // On Windows, we don't need to bind in advance for named pipes // The server will create pipe instances on demand info!("Starting IPC server at {:?}", self.config.socket_path); info!("IPC server started successfully"); Ok(()) } /// Run the server loop (Windows). pub async fn run(&self) -> Result<()> { use tokio::net::windows::named_pipe::ServerOptions; info!( "IPC server listening for connections on {:?}", self.config.socket_path ); let pipe_name: String = self.config.socket_path.to_string_lossy().into_owned(); loop { if *self.shutdown.read().await { info!("IPC server shutting down"); break; } // Create a new named pipe instance let server = ServerOptions::new() .first_pipe_instance(false) .reject_remote_clients(true) .create(&pipe_name) .map_err(IpcError::BindError)?; // Wait for a client to connect match server.connect().await { Ok(()) => { debug!("New IPC client connected"); // Spawn handler for this connection let handlers = self.handlers.clone(); let shutdown = self.shutdown.clone(); let notification_tx = self.notification_tx.clone(); let client_count = self.client_count.clone(); let max_connections = self.config.max_connections; // Check connection limit let current_count = *client_count.read().await; if current_count >= max_connections { warn!("Connection limit reached, rejecting connection"); continue; } tokio::spawn(async move { *client_count.write().await += 1; if let Err(e) = Self::handle_connection_windows( server, handlers, shutdown.clone(), notification_tx, ) .await { error!("Connection error: {}", e); } *client_count.write().await -= 1; debug!("Client disconnected"); }); } Err(e) => { warn!("Failed to accept connection: {}", e); } } } Ok(()) } /// Handle a single client connection (Windows). async fn handle_connection_windows( server: tokio::net::windows::named_pipe::NamedPipeServer, handlers: Arc, shutdown: Arc>, _notification_tx: broadcast::Sender, ) -> Result<()> { let (reader, mut writer) = tokio::io::split(server); let mut lines = BufReader::new(reader).lines(); while !*shutdown.read().await { // Read message with timeout let line = tokio::time::timeout(std::time::Duration::from_secs(30), lines.next_line()) .await .map_err(|_| { IpcError::ReadError(std::io::Error::new( std::io::ErrorKind::TimedOut, "Read timeout", )) })?; let line = match line { Ok(Some(line)) => line, Ok(None) => { debug!("Client disconnected"); break; } Err(e) => { warn!("Error reading from client: {}", e); break; } }; trace!("Received: {}", line); // Parse message let message = match IpcMessage::from_json(&line) { Ok(msg) => msg, Err(e) => { warn!("Failed to parse message: {}", e); let response = IpcResponse::error(uuid::Uuid::nil(), format!("Parse error: {}", e)); writer .write_all(response.to_json()?.as_bytes()) .await .map_err(IpcError::WriteError)?; writer .write_all(b"\n") .await .map_err(IpcError::WriteError)?; continue; } }; // Handle message let response = match message.message_type { MessageType::Request => handlers.handle(message).await, MessageType::Notification => { // Notifications don't get responses trace!("Received notification: {:?}", message); continue; } MessageType::Response => { // We shouldn't receive responses warn!("Unexpected response message from client"); continue; } }; // Send response let response_json = response.to_json()?; writer .write_all(response_json.as_bytes()) .await .map_err(IpcError::WriteError)?; writer .write_all(b"\n") .await .map_err(IpcError::WriteError)?; } Ok(()) } /// Stop the IPC server. pub async fn stop(&mut self) -> Result<()> { *self.shutdown.write().await = true; info!("IPC server stopped"); Ok(()) } } #[cfg(target_os = "windows")] impl Clone for IpcServer { fn clone(&self) -> Self { Self { config: self.config.clone(), handlers: self.handlers.clone(), notification_tx: self.notification_tx.clone(), shutdown: self.shutdown.clone(), client_count: self.client_count.clone(), } } } #[cfg(test)] mod tests { use super::*; #[test] fn test_ipc_server_config_default() { let config = IpcServerConfig::default(); assert!(config .socket_path .to_string_lossy() .contains("record-daemon")); assert_eq!(config.max_connections, 10); assert_eq!(config.timeout_secs, 30); } }