record-daemon: initial commit
This commit is contained in:
464
record-daemon/src/ipc/server.rs
Normal file
464
record-daemon/src/ipc/server.rs
Normal file
@@ -0,0 +1,464 @@
|
||||
//! IPC server for communication with the Tauri app.
|
||||
//!
|
||||
//! Uses Unix domain sockets on Linux and named pipes on Windows.
|
||||
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use tokio::sync::{broadcast, RwLock};
|
||||
use tokio_util::codec::{Framed, LinesCodec};
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
|
||||
use super::handlers::IpcHandlers;
|
||||
use super::protocol::{IpcMessage, IpcNotification, IpcResponse, MessageType};
|
||||
use crate::error::{IpcError, Result};
|
||||
|
||||
/// IPC server configuration.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct IpcServerConfig {
|
||||
/// Socket path (Linux) or pipe name (Windows).
|
||||
pub socket_path: PathBuf,
|
||||
/// Maximum connections.
|
||||
pub max_connections: usize,
|
||||
/// Connection timeout in seconds.
|
||||
pub timeout_secs: u64,
|
||||
}
|
||||
|
||||
impl Default for IpcServerConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
socket_path: super::default_socket_path(),
|
||||
max_connections: 10,
|
||||
timeout_secs: 30,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Platform-specific listener type.
|
||||
#[cfg(target_os = "linux")]
|
||||
type PlatformListener = tokio::net::UnixListener;
|
||||
|
||||
/// Platform-specific stream type.
|
||||
#[cfg(target_os = "linux")]
|
||||
type PlatformStream = tokio::net::UnixStream;
|
||||
|
||||
/// IPC server for communication with Tauri app.
|
||||
pub struct IpcServer {
|
||||
/// Server configuration.
|
||||
config: IpcServerConfig,
|
||||
/// Platform-specific listener.
|
||||
listener: Option<PlatformListener>,
|
||||
/// Command handlers.
|
||||
handlers: Arc<IpcHandlers>,
|
||||
/// Notification broadcaster.
|
||||
notification_tx: broadcast::Sender<IpcNotification>,
|
||||
/// Shutdown signal.
|
||||
shutdown: Arc<RwLock<bool>>,
|
||||
/// Connected clients count.
|
||||
client_count: Arc<RwLock<usize>>,
|
||||
}
|
||||
|
||||
impl IpcServer {
|
||||
/// Create a new IPC server.
|
||||
pub fn new(config: IpcServerConfig, handlers: IpcHandlers) -> Self {
|
||||
let (notification_tx, _) = broadcast::channel(64);
|
||||
|
||||
Self {
|
||||
config,
|
||||
listener: None,
|
||||
handlers: Arc::new(handlers),
|
||||
notification_tx,
|
||||
shutdown: Arc::new(RwLock::new(false)),
|
||||
client_count: Arc::new(RwLock::new(0)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the socket path.
|
||||
pub fn socket_path(&self) -> &PathBuf {
|
||||
&self.config.socket_path
|
||||
}
|
||||
|
||||
/// Get a subscriber for notifications.
|
||||
pub fn subscribe(&self) -> broadcast::Receiver<IpcNotification> {
|
||||
self.notification_tx.subscribe()
|
||||
}
|
||||
|
||||
/// Broadcast a notification to all connected clients.
|
||||
pub fn broadcast(&self, notification: IpcNotification) {
|
||||
if self.notification_tx.send(notification).is_err() {
|
||||
trace!("No clients connected to receive notification");
|
||||
}
|
||||
}
|
||||
|
||||
/// Start the IPC server.
|
||||
#[cfg(target_os = "linux")]
|
||||
pub async fn start(&mut self) -> Result<()> {
|
||||
// Remove existing socket if present
|
||||
if self.config.socket_path.exists() {
|
||||
std::fs::remove_file(&self.config.socket_path).map_err(IpcError::BindError)?;
|
||||
}
|
||||
|
||||
// Ensure parent directory exists
|
||||
if let Some(parent) = self.config.socket_path.parent() {
|
||||
if !parent.exists() {
|
||||
std::fs::create_dir_all(parent).map_err(IpcError::BindError)?;
|
||||
}
|
||||
}
|
||||
|
||||
info!("Starting IPC server at {:?}", self.config.socket_path);
|
||||
|
||||
let listener = PlatformListener::bind(&self.config.socket_path).map_err(IpcError::BindError)?;
|
||||
|
||||
self.listener = Some(listener);
|
||||
|
||||
info!("IPC server started successfully");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Start the IPC server (Windows).
|
||||
#[cfg(target_os = "windows")]
|
||||
pub async fn start(&mut self) -> Result<()> {
|
||||
// On Windows, we don't need to bind in advance for named pipes
|
||||
// The server will create pipe instances on demand
|
||||
info!("Starting IPC server at {:?}", self.config.socket_path);
|
||||
|
||||
// Mark as "started" - actual pipe creation happens in accept loop
|
||||
self.listener = None; // We'll create pipes on demand
|
||||
|
||||
info!("IPC server started successfully");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Run the server loop.
|
||||
#[cfg(target_os = "linux")]
|
||||
pub async fn run(&self) -> Result<()> {
|
||||
let listener = self.listener.as_ref().ok_or_else(|| {
|
||||
IpcError::BindError(std::io::Error::new(
|
||||
std::io::ErrorKind::NotConnected,
|
||||
"Server not started",
|
||||
))
|
||||
})?;
|
||||
|
||||
info!("IPC server listening for connections");
|
||||
|
||||
loop {
|
||||
if *self.shutdown.read().await {
|
||||
info!("IPC server shutting down");
|
||||
break;
|
||||
}
|
||||
|
||||
// Accept new connection
|
||||
match listener.accept().await {
|
||||
Ok((stream, addr)) => {
|
||||
self.handle_new_connection(stream, format!("{:?}", addr)).await;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to accept connection: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Run the server loop (Windows).
|
||||
#[cfg(target_os = "windows")]
|
||||
pub async fn run(&self) -> Result<()> {
|
||||
use tokio::net::windows::named_pipe::{ServerOptions, NamedPipeServer};
|
||||
use std::os::windows::io::AsRawHandle;
|
||||
|
||||
info!("IPC server listening for connections on {:?}", self.config.socket_path);
|
||||
|
||||
let pipe_name = self.config.socket_path.to_string_lossy();
|
||||
|
||||
loop {
|
||||
if *self.shutdown.read().await {
|
||||
info!("IPC server shutting down");
|
||||
break;
|
||||
}
|
||||
|
||||
// Create a new named pipe instance
|
||||
let server = ServerOptions::new()
|
||||
.first_pipe_instance(false)
|
||||
.reject_remote_clients(true)
|
||||
.create(&pipe_name)
|
||||
.map_err(IpcError::BindError)?;
|
||||
|
||||
// Wait for a client to connect
|
||||
match server.connect().await {
|
||||
Ok(()) => {
|
||||
debug!("New IPC client connected");
|
||||
|
||||
// Spawn handler for this connection
|
||||
let handlers = self.handlers.clone();
|
||||
let shutdown = self.shutdown.clone();
|
||||
let notification_tx = self.notification_tx.clone();
|
||||
let client_count = self.client_count.clone();
|
||||
let max_connections = self.config.max_connections;
|
||||
|
||||
// Check connection limit
|
||||
let current_count = *client_count.read().await;
|
||||
if current_count >= max_connections {
|
||||
warn!("Connection limit reached, rejecting connection");
|
||||
continue;
|
||||
}
|
||||
|
||||
tokio::spawn(async move {
|
||||
*client_count.write().await += 1;
|
||||
|
||||
if let Err(e) = Self::handle_connection_windows(
|
||||
server,
|
||||
handlers,
|
||||
shutdown.clone(),
|
||||
notification_tx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!("Connection error: {}", e);
|
||||
}
|
||||
|
||||
*client_count.write().await -= 1;
|
||||
debug!("Client disconnected");
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to accept connection: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle a new connection (Linux).
|
||||
#[cfg(target_os = "linux")]
|
||||
async fn handle_new_connection(&self, stream: PlatformStream, addr: String) {
|
||||
let client_count = self.client_count.clone();
|
||||
let max_connections = self.config.max_connections;
|
||||
|
||||
// Check connection limit
|
||||
let current_count = *client_count.read().await;
|
||||
if current_count >= max_connections {
|
||||
warn!(
|
||||
"Connection limit reached, rejecting connection from {}",
|
||||
addr
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
debug!("New IPC client connected from {}", addr);
|
||||
|
||||
// Spawn handler for this connection
|
||||
let handlers = self.handlers.clone();
|
||||
let shutdown = self.shutdown.clone();
|
||||
let notification_tx = self.notification_tx.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
*client_count.write().await += 1;
|
||||
|
||||
if let Err(e) = Self::handle_connection(stream, handlers, shutdown.clone(), notification_tx).await
|
||||
{
|
||||
error!("Connection error: {}", e);
|
||||
}
|
||||
|
||||
*client_count.write().await -= 1;
|
||||
debug!("Client disconnected");
|
||||
});
|
||||
}
|
||||
|
||||
/// Handle a single client connection (Linux).
|
||||
#[cfg(target_os = "linux")]
|
||||
async fn handle_connection(
|
||||
stream: PlatformStream,
|
||||
handlers: Arc<IpcHandlers>,
|
||||
shutdown: Arc<RwLock<bool>>,
|
||||
_notification_tx: broadcast::Sender<IpcNotification>,
|
||||
) -> Result<()> {
|
||||
let framed = Framed::new(stream, LinesCodec::new());
|
||||
let (mut writer, mut reader) = framed.split();
|
||||
|
||||
while !*shutdown.read().await {
|
||||
// Read message
|
||||
let line = tokio::time::timeout(std::time::Duration::from_secs(30), reader.next())
|
||||
.await
|
||||
.map_err(|_| {
|
||||
IpcError::ReadError(std::io::Error::new(
|
||||
std::io::ErrorKind::TimedOut,
|
||||
"Read timeout",
|
||||
))
|
||||
})?;
|
||||
|
||||
let line = match line {
|
||||
Some(Ok(line)) => line,
|
||||
Some(Err(e)) => {
|
||||
warn!("Error reading from client: {}", e);
|
||||
break;
|
||||
}
|
||||
None => {
|
||||
debug!("Client disconnected");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
trace!("Received: {}", line);
|
||||
|
||||
// Parse message
|
||||
let message = match IpcMessage::from_json(&line) {
|
||||
Ok(msg) => msg,
|
||||
Err(e) => {
|
||||
warn!("Failed to parse message: {}", e);
|
||||
let response = IpcResponse::error(uuid::Uuid::nil(), format!("Parse error: {}", e));
|
||||
writer.send(response.to_json()?).await
|
||||
.map_err(|e| IpcError::CodecError(e.to_string()))?;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Handle message
|
||||
let response = match message.message_type {
|
||||
MessageType::Request => {
|
||||
handlers.handle(message).await
|
||||
}
|
||||
MessageType::Notification => {
|
||||
// Notifications don't get responses
|
||||
trace!("Received notification: {:?}", message);
|
||||
continue;
|
||||
}
|
||||
MessageType::Response => {
|
||||
// We shouldn't receive responses
|
||||
warn!("Unexpected response message from client");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Send response
|
||||
let response_json = response.to_json()?;
|
||||
writer.send(response_json).await
|
||||
.map_err(|e| IpcError::CodecError(e.to_string()))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle a single client connection (Windows).
|
||||
#[cfg(target_os = "windows")]
|
||||
async fn handle_connection_windows(
|
||||
server: tokio::net::windows::named_pipe::NamedPipeServer,
|
||||
handlers: Arc<IpcHandlers>,
|
||||
shutdown: Arc<RwLock<bool>>,
|
||||
_notification_tx: broadcast::Sender<IpcNotification>,
|
||||
) -> Result<()> {
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
|
||||
let (mut reader, mut writer) = tokio::io::split(server);
|
||||
let mut reader = BufReader::new(reader).lines();
|
||||
|
||||
while !*shutdown.read().await {
|
||||
// Read message with timeout
|
||||
let line = tokio::time::timeout(std::time::Duration::from_secs(30), reader.next_line())
|
||||
.await
|
||||
.map_err(|_| {
|
||||
IpcError::ReadError(std::io::Error::new(
|
||||
std::io::ErrorKind::TimedOut,
|
||||
"Read timeout",
|
||||
))
|
||||
})?;
|
||||
|
||||
let line = match line {
|
||||
Ok(Some(line)) => line,
|
||||
Ok(None) => {
|
||||
debug!("Client disconnected");
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Error reading from client: {}", e);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
trace!("Received: {}", line);
|
||||
|
||||
// Parse message
|
||||
let message = match IpcMessage::from_json(&line) {
|
||||
Ok(msg) => msg,
|
||||
Err(e) => {
|
||||
warn!("Failed to parse message: {}", e);
|
||||
let response = IpcResponse::error(uuid::Uuid::nil(), format!("Parse error: {}", e));
|
||||
writer.write_all(response.to_json()?.as_bytes()).await
|
||||
.map_err(IpcError::WriteError)?;
|
||||
writer.write_all(b"\n").await.map_err(IpcError::WriteError)?;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Handle message
|
||||
let response = match message.message_type {
|
||||
MessageType::Request => {
|
||||
handlers.handle(message).await
|
||||
}
|
||||
MessageType::Notification => {
|
||||
// Notifications don't get responses
|
||||
trace!("Received notification: {:?}", message);
|
||||
continue;
|
||||
}
|
||||
MessageType::Response => {
|
||||
// We shouldn't receive responses
|
||||
warn!("Unexpected response message from client");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Send response
|
||||
let response_json = response.to_json()?;
|
||||
writer.write_all(response_json.as_bytes()).await.map_err(IpcError::WriteError)?;
|
||||
writer.write_all(b"\n").await.map_err(IpcError::WriteError)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Stop the IPC server.
|
||||
pub async fn stop(&mut self) -> Result<()> {
|
||||
*self.shutdown.write().await = true;
|
||||
|
||||
// Remove socket file (Linux only)
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
if self.config.socket_path.exists() {
|
||||
std::fs::remove_file(&self.config.socket_path)
|
||||
.map_err(IpcError::BindError)?;
|
||||
}
|
||||
}
|
||||
|
||||
info!("IPC server stopped");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for IpcServer {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
config: self.config.clone(),
|
||||
listener: None, // Can't clone the listener
|
||||
handlers: self.handlers.clone(),
|
||||
notification_tx: self.notification_tx.clone(),
|
||||
shutdown: self.shutdown.clone(),
|
||||
client_count: self.client_count.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_ipc_server_config_default() {
|
||||
let config = IpcServerConfig::default();
|
||||
assert!(config.socket_path.to_string_lossy().contains("record-daemon"));
|
||||
assert_eq!(config.max_connections, 10);
|
||||
assert_eq!(config.timeout_secs, 30);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user