record-daemon: fix windows build

This commit is contained in:
2026-03-19 18:17:31 +01:00
parent d6c0334369
commit f062cee010
2 changed files with 183 additions and 112 deletions

View File

@@ -54,16 +54,17 @@ directories = "5"
# CLI (for debugging and control) # CLI (for debugging and control)
clap = { version = "4", features = ["derive"] } clap = { version = "4", features = ["derive"] }
# Signal handling for graceful shutdown
signal-hook = "0.3"
signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"] }
# Base64 for LQP authentication # Base64 for LQP authentication
base64 = "0.22" base64 = "0.22"
# Regex for lockfile parsing # Regex for lockfile parsing
regex = "1" regex = "1"
# Signal handling for graceful shutdown (Unix only)
[target.'cfg(unix)'.dependencies]
signal-hook = "0.3"
signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"] }
[dev-dependencies] [dev-dependencies]
# Testing utilities # Testing utilities
tokio-test = "0.4" tokio-test = "0.4"

View File

@@ -5,10 +5,7 @@
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use futures::SinkExt;
use futures::StreamExt;
use tokio::sync::{broadcast, RwLock}; use tokio::sync::{broadcast, RwLock};
use tokio_util::codec::{Framed, LinesCodec};
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
use super::handlers::IpcHandlers; use super::handlers::IpcHandlers;
@@ -36,6 +33,17 @@ impl Default for IpcServerConfig {
} }
} }
// ============================================================================
// Linux-specific types and implementation
// ============================================================================
#[cfg(target_os = "linux")]
use futures::SinkExt;
#[cfg(target_os = "linux")]
use futures::StreamExt;
#[cfg(target_os = "linux")]
use tokio_util::codec::{Framed, LinesCodec};
/// Platform-specific listener type. /// Platform-specific listener type.
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
type PlatformListener = tokio::net::UnixListener; type PlatformListener = tokio::net::UnixListener;
@@ -44,7 +52,8 @@ type PlatformListener = tokio::net::UnixListener;
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
type PlatformStream = tokio::net::UnixStream; type PlatformStream = tokio::net::UnixStream;
/// IPC server for communication with Tauri app. /// IPC server for communication with Tauri app (Linux).
#[cfg(target_os = "linux")]
pub struct IpcServer { pub struct IpcServer {
/// Server configuration. /// Server configuration.
config: IpcServerConfig, config: IpcServerConfig,
@@ -60,6 +69,7 @@ pub struct IpcServer {
client_count: Arc<RwLock<usize>>, client_count: Arc<RwLock<usize>>,
} }
#[cfg(target_os = "linux")]
impl IpcServer { impl IpcServer {
/// Create a new IPC server. /// Create a new IPC server.
pub fn new(config: IpcServerConfig, handlers: IpcHandlers) -> Self { pub fn new(config: IpcServerConfig, handlers: IpcHandlers) -> Self {
@@ -93,7 +103,6 @@ impl IpcServer {
} }
/// Start the IPC server. /// Start the IPC server.
#[cfg(target_os = "linux")]
pub async fn start(&mut self) -> Result<()> { pub async fn start(&mut self) -> Result<()> {
// Remove existing socket if present // Remove existing socket if present
if self.config.socket_path.exists() { if self.config.socket_path.exists() {
@@ -117,22 +126,7 @@ impl IpcServer {
Ok(()) Ok(())
} }
/// Start the IPC server (Windows).
#[cfg(target_os = "windows")]
pub async fn start(&mut self) -> Result<()> {
// On Windows, we don't need to bind in advance for named pipes
// The server will create pipe instances on demand
info!("Starting IPC server at {:?}", self.config.socket_path);
// Mark as "started" - actual pipe creation happens in accept loop
self.listener = None; // We'll create pipes on demand
info!("IPC server started successfully");
Ok(())
}
/// Run the server loop. /// Run the server loop.
#[cfg(target_os = "linux")]
pub async fn run(&self) -> Result<()> { pub async fn run(&self) -> Result<()> {
let listener = self.listener.as_ref().ok_or_else(|| { let listener = self.listener.as_ref().ok_or_else(|| {
IpcError::BindError(std::io::Error::new( IpcError::BindError(std::io::Error::new(
@@ -163,77 +157,7 @@ impl IpcServer {
Ok(()) Ok(())
} }
/// Run the server loop (Windows).
#[cfg(target_os = "windows")]
pub async fn run(&self) -> Result<()> {
use tokio::net::windows::named_pipe::{ServerOptions, NamedPipeServer};
use std::os::windows::io::AsRawHandle;
info!("IPC server listening for connections on {:?}", self.config.socket_path);
let pipe_name = self.config.socket_path.to_string_lossy();
loop {
if *self.shutdown.read().await {
info!("IPC server shutting down");
break;
}
// Create a new named pipe instance
let server = ServerOptions::new()
.first_pipe_instance(false)
.reject_remote_clients(true)
.create(&pipe_name)
.map_err(IpcError::BindError)?;
// Wait for a client to connect
match server.connect().await {
Ok(()) => {
debug!("New IPC client connected");
// Spawn handler for this connection
let handlers = self.handlers.clone();
let shutdown = self.shutdown.clone();
let notification_tx = self.notification_tx.clone();
let client_count = self.client_count.clone();
let max_connections = self.config.max_connections;
// Check connection limit
let current_count = *client_count.read().await;
if current_count >= max_connections {
warn!("Connection limit reached, rejecting connection");
continue;
}
tokio::spawn(async move {
*client_count.write().await += 1;
if let Err(e) = Self::handle_connection_windows(
server,
handlers,
shutdown.clone(),
notification_tx,
)
.await
{
error!("Connection error: {}", e);
}
*client_count.write().await -= 1;
debug!("Client disconnected");
});
}
Err(e) => {
warn!("Failed to accept connection: {}", e);
}
}
}
Ok(())
}
/// Handle a new connection (Linux). /// Handle a new connection (Linux).
#[cfg(target_os = "linux")]
async fn handle_new_connection(&self, stream: PlatformStream, addr: String) { async fn handle_new_connection(&self, stream: PlatformStream, addr: String) {
let client_count = self.client_count.clone(); let client_count = self.client_count.clone();
let max_connections = self.config.max_connections; let max_connections = self.config.max_connections;
@@ -269,7 +193,6 @@ impl IpcServer {
} }
/// Handle a single client connection (Linux). /// Handle a single client connection (Linux).
#[cfg(target_os = "linux")]
async fn handle_connection( async fn handle_connection(
stream: PlatformStream, stream: PlatformStream,
handlers: Arc<IpcHandlers>, handlers: Arc<IpcHandlers>,
@@ -342,22 +265,179 @@ impl IpcServer {
Ok(()) Ok(())
} }
/// Stop the IPC server.
pub async fn stop(&mut self) -> Result<()> {
*self.shutdown.write().await = true;
// Remove socket file
if self.config.socket_path.exists() {
std::fs::remove_file(&self.config.socket_path)
.map_err(IpcError::BindError)?;
}
info!("IPC server stopped");
Ok(())
}
}
#[cfg(target_os = "linux")]
impl Clone for IpcServer {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
listener: None, // Can't clone the listener
handlers: self.handlers.clone(),
notification_tx: self.notification_tx.clone(),
shutdown: self.shutdown.clone(),
client_count: self.client_count.clone(),
}
}
}
// ============================================================================
// Windows-specific types and implementation
// ============================================================================
#[cfg(target_os = "windows")]
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
/// IPC server for communication with Tauri app (Windows).
#[cfg(target_os = "windows")]
pub struct IpcServer {
/// Server configuration.
config: IpcServerConfig,
/// Command handlers.
handlers: Arc<IpcHandlers>,
/// Notification broadcaster.
notification_tx: broadcast::Sender<IpcNotification>,
/// Shutdown signal.
shutdown: Arc<RwLock<bool>>,
/// Connected clients count.
client_count: Arc<RwLock<usize>>,
}
#[cfg(target_os = "windows")]
impl IpcServer {
/// Create a new IPC server.
pub fn new(config: IpcServerConfig, handlers: IpcHandlers) -> Self {
let (notification_tx, _) = broadcast::channel(64);
Self {
config,
handlers: Arc::new(handlers),
notification_tx,
shutdown: Arc::new(RwLock::new(false)),
client_count: Arc::new(RwLock::new(0)),
}
}
/// Get the socket path.
pub fn socket_path(&self) -> &PathBuf {
&self.config.socket_path
}
/// Get a subscriber for notifications.
pub fn subscribe(&self) -> broadcast::Receiver<IpcNotification> {
self.notification_tx.subscribe()
}
/// Broadcast a notification to all connected clients.
pub fn broadcast(&self, notification: IpcNotification) {
if self.notification_tx.send(notification).is_err() {
trace!("No clients connected to receive notification");
}
}
/// Start the IPC server (Windows).
pub async fn start(&mut self) -> Result<()> {
// On Windows, we don't need to bind in advance for named pipes
// The server will create pipe instances on demand
info!("Starting IPC server at {:?}", self.config.socket_path);
info!("IPC server started successfully");
Ok(())
}
/// Run the server loop (Windows).
pub async fn run(&self) -> Result<()> {
use tokio::net::windows::named_pipe::ServerOptions;
info!("IPC server listening for connections on {:?}", self.config.socket_path);
let pipe_name: String = self.config.socket_path.to_string_lossy().into_owned();
loop {
if *self.shutdown.read().await {
info!("IPC server shutting down");
break;
}
// Create a new named pipe instance
let server = ServerOptions::new()
.first_pipe_instance(false)
.reject_remote_clients(true)
.create(&pipe_name)
.map_err(IpcError::BindError)?;
// Wait for a client to connect
match server.connect().await {
Ok(()) => {
debug!("New IPC client connected");
// Spawn handler for this connection
let handlers = self.handlers.clone();
let shutdown = self.shutdown.clone();
let notification_tx = self.notification_tx.clone();
let client_count = self.client_count.clone();
let max_connections = self.config.max_connections;
// Check connection limit
let current_count = *client_count.read().await;
if current_count >= max_connections {
warn!("Connection limit reached, rejecting connection");
continue;
}
tokio::spawn(async move {
*client_count.write().await += 1;
if let Err(e) = Self::handle_connection_windows(
server,
handlers,
shutdown.clone(),
notification_tx,
)
.await
{
error!("Connection error: {}", e);
}
*client_count.write().await -= 1;
debug!("Client disconnected");
});
}
Err(e) => {
warn!("Failed to accept connection: {}", e);
}
}
}
Ok(())
}
/// Handle a single client connection (Windows). /// Handle a single client connection (Windows).
#[cfg(target_os = "windows")]
async fn handle_connection_windows( async fn handle_connection_windows(
server: tokio::net::windows::named_pipe::NamedPipeServer, server: tokio::net::windows::named_pipe::NamedPipeServer,
handlers: Arc<IpcHandlers>, handlers: Arc<IpcHandlers>,
shutdown: Arc<RwLock<bool>>, shutdown: Arc<RwLock<bool>>,
_notification_tx: broadcast::Sender<IpcNotification>, _notification_tx: broadcast::Sender<IpcNotification>,
) -> Result<()> { ) -> Result<()> {
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; let (reader, mut writer) = tokio::io::split(server);
let mut lines = BufReader::new(reader).lines();
let (mut reader, mut writer) = tokio::io::split(server);
let mut reader = BufReader::new(reader).lines();
while !*shutdown.read().await { while !*shutdown.read().await {
// Read message with timeout // Read message with timeout
let line = tokio::time::timeout(std::time::Duration::from_secs(30), reader.next_line()) let line = tokio::time::timeout(std::time::Duration::from_secs(30), lines.next_line())
.await .await
.map_err(|_| { .map_err(|_| {
IpcError::ReadError(std::io::Error::new( IpcError::ReadError(std::io::Error::new(
@@ -422,26 +502,16 @@ impl IpcServer {
/// Stop the IPC server. /// Stop the IPC server.
pub async fn stop(&mut self) -> Result<()> { pub async fn stop(&mut self) -> Result<()> {
*self.shutdown.write().await = true; *self.shutdown.write().await = true;
// Remove socket file (Linux only)
#[cfg(target_os = "linux")]
{
if self.config.socket_path.exists() {
std::fs::remove_file(&self.config.socket_path)
.map_err(IpcError::BindError)?;
}
}
info!("IPC server stopped"); info!("IPC server stopped");
Ok(()) Ok(())
} }
} }
#[cfg(target_os = "windows")]
impl Clone for IpcServer { impl Clone for IpcServer {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
config: self.config.clone(), config: self.config.clone(),
listener: None, // Can't clone the listener
handlers: self.handlers.clone(), handlers: self.handlers.clone(),
notification_tx: self.notification_tx.clone(), notification_tx: self.notification_tx.clone(),
shutdown: self.shutdown.clone(), shutdown: self.shutdown.clone(),