Files
leaguerecorder/record-daemon/src/lqp/client.rs
T
vhaudiquet 384ccda515
record-daemon / Build, check and test (push) Failing after 10m47s
tryfix: reconnect to league client
2026-06-05 21:17:15 +02:00

687 lines
27 KiB
Rust

//! LQP Client for communicating with the League Client API.
//!
//! Provides both WebSocket (for events) and REST (for queries) interfaces.
use std::sync::Arc;
use futures::{SinkExt, StreamExt};
use tokio::sync::{broadcast, RwLock};
use tokio_tungstenite::{connect_async_tls_with_config, tungstenite::protocol::Message};
use tracing::{debug, error, info, trace, warn};
use super::auth::LockfileCredentials;
use super::endpoints;
use super::events::EVENT_TYPE_GAME_END;
use super::state::{ClientState, GameflowPhase};
use super::tls::create_insecure_tls_config;
use super::websocket::{parse_live_client_event, parse_websocket_message, ParsedEvent};
use crate::error::{LqpError, Result};
/// LQP Client for League Client communication.
pub struct LqpClient {
/// Connection credentials.
credentials: Arc<RwLock<Option<LockfileCredentials>>>,
/// Current client state.
state: Arc<RwLock<ClientState>>,
/// Event broadcaster.
event_sender: broadcast::Sender<ParsedEvent>,
/// HTTP client for REST API.
http_client: reqwest::Client,
/// Shutdown signal.
shutdown: Arc<RwLock<bool>>,
/// Last emitted game ID for deduplication of GameStart events.
last_emitted_game_id: Arc<RwLock<Option<u64>>>,
/// WebSocket connection state (true if WebSocket is connected).
ws_connected: Arc<RwLock<bool>>,
}
impl LqpClient {
/// Create a new LQP client.
pub fn new() -> Self {
let (event_sender, _) = broadcast::channel(256);
let http_client = reqwest::Client::builder()
.danger_accept_invalid_certs(true) // LQP uses self-signed certs
.build()
.expect("Failed to create HTTP client");
Self {
credentials: Arc::new(RwLock::new(None)),
state: Arc::new(RwLock::new(ClientState::default())),
event_sender,
http_client,
shutdown: Arc::new(RwLock::new(false)),
last_emitted_game_id: Arc::new(RwLock::new(None)),
ws_connected: Arc::new(RwLock::new(false)),
}
}
/// Get a subscriber for game events.
pub fn subscribe(&self) -> broadcast::Receiver<ParsedEvent> {
self.event_sender.subscribe()
}
/// Get current client state.
pub async fn state(&self) -> ClientState {
self.state.read().await.clone()
}
/// Check if connected to League Client (has valid credentials).
pub async fn is_connected(&self) -> bool {
self.credentials.read().await.is_some()
}
/// Check if WebSocket is connected.
pub async fn is_ws_connected(&self) -> bool {
*self.ws_connected.read().await
}
/// Connect to the League Client with the given credentials.
///
/// This only stores credentials and verifies basic connectivity.
/// The actual WebSocket connection is established in start_event_listener().
pub async fn connect(&self, creds: LockfileCredentials) -> Result<()> {
info!("Connecting to League Client at port {}", creds.port);
// Reset shutdown flag for new connection
*self.shutdown.write().await = false;
// Store credentials
*self.credentials.write().await = Some(creds.clone());
// Verify connection by fetching current phase
match self.get_gameflow_phase().await {
Ok(phase) => {
self.state.write().await.phase = phase;
info!("Connected to League Client, current phase: {:?}", phase);
}
Err(e) => {
warn!("Failed to verify connection via REST API: {}", e);
// REST API might not be ready yet, but WebSocket could work
// Don't fail here - let start_event_listener() try the WebSocket
}
}
// Fetch local player's puuid for champion extraction (best effort)
if let Ok(summoner) = self.get_summoner().await {
if let Some(puuid) = summoner.get("puuid").and_then(|p| p.as_str()) {
self.state.write().await.local_puuid = Some(puuid.to_string());
info!("Fetched local player puuid: {}", puuid);
}
}
Ok(())
}
/// Disconnect from the League Client.
pub async fn disconnect(&self) {
*self.shutdown.write().await = true;
*self.ws_connected.write().await = false;
*self.credentials.write().await = None;
*self.state.write().await = ClientState::default();
*self.last_emitted_game_id.write().await = None;
info!("Disconnected from League Client");
}
/// Start the WebSocket event listener.
///
/// This runs in a background task and broadcasts events to subscribers.
pub async fn start_event_listener(&self) -> Result<()> {
let creds = self
.credentials
.read()
.await
.clone()
.ok_or(LqpError::ClientNotRunning)?;
let ws_url = format!("{}/", creds.ws_url());
let auth_header = creds.auth_header();
info!("Connecting to LQP WebSocket at {}", ws_url);
// Create a TLS connector that accepts the self-signed certificate from League Client
use tokio_tungstenite::Connector;
let connector = Connector::Rustls(create_insecure_tls_config());
// Build WebSocket request with auth header
let request = tokio_tungstenite::tungstenite::http::Request::builder()
.uri(&ws_url)
.header("Authorization", auth_header)
.header("Host", format!("127.0.0.1:{}", creds.port))
.header("Connection", "Upgrade")
.header("Upgrade", "websocket")
.header("Sec-WebSocket-Version", "13")
.header(
"Sec-WebSocket-Key",
tokio_tungstenite::tungstenite::handshake::client::generate_key(),
)
.body(())
.map_err(|e| LqpError::ConnectionFailed(e.to_string()))?;
let (ws_stream, _) = connect_async_tls_with_config(request, None, false, Some(connector))
.await
.map_err(|e| LqpError::WebSocketError(e.to_string()))?;
info!("WebSocket connected, subscribing to events");
let (mut write, mut read) = ws_stream.split();
// Subscribe to endpoints using OnJsonApiEvent format
// Format: [5, "OnJsonApiEvent", endpoint]
for endpoint in super::endpoints::SUBSCRIBE_ENDPOINTS {
let subscribe_msg = serde_json::json!([5, "OnJsonApiEvent", endpoint]);
let msg = Message::Text(subscribe_msg.to_string());
info!("Subscribing to: {} with OnJsonApiEvent", endpoint);
write
.send(msg)
.await
.map_err(|e| LqpError::WebSocketError(e.to_string()))?;
}
info!("All subscriptions sent");
// Mark WebSocket as connected
*self.ws_connected.write().await = true;
// Clone references for the async block
let event_sender = self.event_sender.clone();
let state = self.state.clone();
let shutdown = self.shutdown.clone();
let credentials = self.credentials.clone();
let last_emitted_game_id = self.last_emitted_game_id.clone();
let ws_connected = self.ws_connected.clone();
// Spawn the message handler
tokio::spawn(async move {
while let Some(msg) = read.next().await {
if *shutdown.read().await {
debug!("WebSocket listener shutting down");
break;
}
match msg {
Ok(Message::Text(text)) => {
if text.is_empty() {
continue;
}
if let Some(parsed) = parse_websocket_message(&text) {
// Update state based on event
Self::update_state_from_event(&state, &parsed).await;
// Check for duplicate GameStart events
if parsed.event_type == super::events::EVENT_TYPE_GAME_START {
let game_id = parsed
.raw_data
.get("gameId")
.or_else(|| {
parsed
.raw_data
.get("gameData")
.and_then(|gd| gd.get("gameId"))
})
.and_then(|v| v.as_u64())
.unwrap_or(0);
let mut last_game_id = last_emitted_game_id.write().await;
if *last_game_id == Some(game_id) && game_id != 0 {
info!(
"Skipping duplicate GameStart event for game_id={}",
game_id
);
continue;
}
*last_game_id = Some(game_id);
}
// Reset last_emitted_game_id on GameEnd to allow new game starts
if parsed.event_type == EVENT_TYPE_GAME_END {
*last_emitted_game_id.write().await = None;
}
// Broadcast event
if event_sender.send(parsed).is_err() {
trace!("No event subscribers");
}
}
}
Ok(Message::Binary(data)) => {
debug!("Received binary message: {} bytes", data.len());
// Try to parse as UTF-8
if let Ok(text) = String::from_utf8(data) {
if !text.is_empty() {
if let Some(parsed) = parse_websocket_message(&text) {
// Update state based on event
Self::update_state_from_event(&state, &parsed).await;
// Check for duplicate GameStart events
if parsed.event_type == super::events::EVENT_TYPE_GAME_START {
let game_id = parsed
.raw_data
.get("gameId")
.or_else(|| {
parsed
.raw_data
.get("gameData")
.and_then(|gd| gd.get("gameId"))
})
.and_then(|v| v.as_u64())
.unwrap_or(0);
let mut last_game_id = last_emitted_game_id.write().await;
if *last_game_id == Some(game_id) && game_id != 0 {
info!(
"Skipping duplicate GameStart event for game_id={}",
game_id
);
continue;
}
*last_game_id = Some(game_id);
}
// Reset last_emitted_game_id on GameEnd to allow new game starts
if parsed.event_type == EVENT_TYPE_GAME_END {
*last_emitted_game_id.write().await = None;
}
// Broadcast event
if event_sender.send(parsed).is_err() {
trace!("No event subscribers");
}
}
}
}
}
Ok(Message::Close(_)) => {
info!("WebSocket closed by server");
break;
}
Ok(Message::Ping(data)) => {
// Respond with pong
debug!("Received ping, sending pong");
let _ = write.send(Message::Pong(data)).await;
}
Ok(Message::Pong(_)) => {
debug!("Received pong");
}
Ok(Message::Frame(_)) => {
debug!("Received raw frame");
}
Err(e) => {
error!("WebSocket error: {}", e);
break;
}
}
}
// Clear connection state on disconnect
*ws_connected.write().await = false;
*credentials.write().await = None;
info!("WebSocket listener ended");
});
Ok(())
}
/// Update internal state from a parsed event.
async fn update_state_from_event(state: &Arc<RwLock<ClientState>>, parsed: &ParsedEvent) {
let mut state = state.write().await;
match parsed.event_type.as_str() {
super::events::EVENT_TYPE_GAME_START => {
state.phase = GameflowPhase::InProgress;
state.game_id = parsed
.raw_data
.get("gameId")
.or_else(|| {
parsed
.raw_data
.get("gameData")
.and_then(|gd| gd.get("gameId"))
})
.and_then(|v| v.as_u64());
// Champion is not extracted here — raw session data has it
}
super::events::EVENT_TYPE_GAME_END => {
state.phase = GameflowPhase::EndOfGame;
}
super::events::EVENT_TYPE_PHASE_CHANGE => {
let phase_str = parsed
.raw_data
.as_str()
.or_else(|| parsed.raw_data.get("phase").and_then(|v| v.as_str()))
.unwrap_or("");
state.phase = GameflowPhase::from(phase_str);
}
_ => {}
}
}
// =========================================================================
// REST API Methods
// =========================================================================
/// Make a REST API request to the League Client.
pub async fn request(&self, method: &str, endpoint: &str) -> Result<serde_json::Value> {
let creds = self
.credentials
.read()
.await
.clone()
.ok_or(LqpError::ClientNotRunning)?;
let url = format!("{}{}", creds.base_url(), endpoint);
let request = match method {
"GET" => self.http_client.get(&url),
"POST" => self.http_client.post(&url),
"PUT" => self.http_client.put(&url),
"DELETE" => self.http_client.delete(&url),
_ => {
return Err(
LqpError::ConnectionFailed(format!("Invalid method: {}", method)).into(),
)
}
}
.header("Authorization", creds.auth_header());
let response = request
.send()
.await
.map_err(|e| LqpError::ConnectionFailed(e.to_string()))?;
if !response.status().is_success() {
return Err(LqpError::ConnectionFailed(format!(
"API request failed: {}",
response.status()
))
.into());
}
let json = response
.json()
.await
.map_err(|e| LqpError::EventParseError(e.to_string()))?;
Ok(json)
}
/// Get the current gameflow phase.
pub async fn get_gameflow_phase(&self) -> Result<GameflowPhase> {
let json = self.request("GET", endpoints::GAMEFLOW_PHASE).await?;
let phase_str = json
.as_str()
.ok_or_else(|| LqpError::EventParseError("Invalid phase response".to_string()))?;
Ok(GameflowPhase::from(phase_str))
}
/// Get the current game session info (raw JSON for backward compatibility).
pub async fn get_session(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::SESSION).await
}
/// Get current summoner info (raw JSON for backward compatibility).
pub async fn get_summoner(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::SUMMONER).await
}
/// Get champion select session info (raw JSON for backward compatibility).
pub async fn get_champion_select(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::CHAMPION_SELECT).await
}
/// Get end-of-game stats (raw JSON for backward compatibility).
pub async fn get_game_stats(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::GAME_STATS).await
}
/// Get the currently selected champion in champ select.
pub async fn get_current_champion(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::CHAMPION_SUMMARY).await
}
/// Get current rune page (raw JSON for backward compatibility).
pub async fn get_rune_page(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::RUNE_PAGES).await
}
/// Get all rune pages.
pub async fn get_all_rune_pages(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::ALL_RUNE_PAGES).await
}
/// Get match history.
pub async fn get_match_history(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::MATCH_HISTORY).await
}
/// Get live client data (available during game).
pub async fn get_live_client_data(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::LIVE_CLIENT_DATA).await
}
/// Get active player data from live client (raw JSON for backward compatibility).
pub async fn get_live_client_active_player(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::LIVE_CLIENT_DATA_ACTIVE_PLAYER)
.await
}
/// Get player list from live client (raw JSON for backward compatibility).
pub async fn get_live_client_player_list(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::LIVE_CLIENT_DATA_PLAYER_LIST)
.await
}
/// Get live client event data (kills, deaths, objectives) from port 2999.
/// This endpoint is available during games and provides real-time events.
pub async fn get_live_client_events(&self) -> Result<serde_json::Value> {
// Live client data runs on port 2999, separate from the LQP client port
let url = format!(
"{}{}",
endpoints::LIVE_CLIENT_DATA_BASE_URL,
endpoints::LIVE_CLIENT_DATA_EVENTS
);
let response = self
.http_client
.get(&url)
.send()
.await
.map_err(|e| LqpError::ConnectionFailed(e.to_string()))?;
if !response.status().is_success() {
return Err(LqpError::ConnectionFailed(format!(
"Live client events request failed: {}",
response.status()
))
.into());
}
let json = response
.json()
.await
.map_err(|e| LqpError::EventParseError(e.to_string()))?;
Ok(json)
}
/// Get local player selection from champion select.
pub async fn get_local_player_selection(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::CHAMPION_SELECT_LOCAL_PLAYER)
.await
}
/// Start polling for live client events during a game.
/// This polls the /liveclientdata/eventdata endpoint and broadcasts events.
pub async fn start_live_client_event_poller(&self) {
let event_sender = self.event_sender.clone();
let state = self.state.clone();
let shutdown = self.shutdown.clone();
let http_client = self.http_client.clone();
info!("Starting live client event poller");
tokio::spawn(async move {
// Small delay to ensure connection is stable
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let mut last_event_id: Option<u64> = None;
let mut poll_count = 0u32;
loop {
let is_shutdown = *shutdown.read().await;
if is_shutdown {
info!("Live client event poller shutting down (shutdown flag is true)");
break;
}
// Only poll when in game
let current_phase = state.read().await.phase;
if current_phase != GameflowPhase::InProgress {
// Reset event tracking when not in game
last_event_id = None;
poll_count = 0;
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
continue;
}
poll_count += 1;
if poll_count % 20 == 1 {
// Log every 10 seconds (20 * 500ms)
info!("Live client event poller active, polling for events...");
}
// Poll for events
let url = format!(
"{}{}",
endpoints::LIVE_CLIENT_DATA_BASE_URL,
endpoints::LIVE_CLIENT_DATA_EVENTS
);
match http_client.get(&url).send().await {
Ok(response) if response.status().is_success() => {
match response.json::<serde_json::Value>().await {
Ok(events) => {
// The response has an "Events" key containing the array
let events_array = events.get("Events").and_then(|e| e.as_array());
if let Some(events_array) = events_array {
let event_count = events_array.len();
if event_count > 0 {
info!(
"Received {} events from live client API",
event_count
);
}
for event in events_array {
// Check if this is a new event
let event_id =
event.get("EventID").and_then(|id| id.as_u64());
// Skip events we've already processed
if let Some(id) = event_id {
if let Some(last_id) = last_event_id {
if id <= last_id {
continue;
}
}
last_event_id = Some(id);
}
// Parse and broadcast the event
if let Some(parsed) = parse_live_client_event(event) {
info!(
"Parsed live client event: type={}",
parsed.event_type
);
// Update state based on event
Self::update_state_from_event(&state, &parsed).await;
// Broadcast event
if event_sender.send(parsed).is_err() {
trace!("No event subscribers");
}
}
}
}
}
Err(e) => {
debug!("Failed to parse live client events: {}", e);
}
}
}
Ok(response) => {
info!("Live client events request failed: {}", response.status());
}
Err(e) => {
info!("Failed to fetch live client events: {}", e);
}
}
// Poll every 500ms during games
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
}
});
}
// =========================================================================
// Metadata Fetching Methods
// =========================================================================
/// Fetch raw session data as JSON.
pub async fn fetch_raw_session(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::SESSION).await
}
/// Fetch raw summoner data as JSON.
pub async fn fetch_raw_summoner(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::SUMMONER).await
}
/// Fetch raw champion select data as JSON.
pub async fn fetch_raw_champion_select(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::CHAMPION_SELECT).await
}
/// Fetch raw rune page data as JSON.
pub async fn fetch_raw_rune_page(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::RUNE_PAGES).await
}
/// Fetch raw live client data as JSON.
pub async fn fetch_raw_live_client_data(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::LIVE_CLIENT_DATA).await
}
/// Fetch raw end-of-game stats as JSON.
pub async fn fetch_raw_end_game_stats(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::GAME_STATS).await
}
/// Fetch ranked stats as JSON (for LP tracking).
pub async fn fetch_ranked_stats(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::RANKED_STATS).await
}
/// Fetch current ranked stats as JSON.
pub async fn fetch_current_ranked_stats(&self) -> Result<serde_json::Value> {
self.request("GET", endpoints::CURRENT_RANKED_STATS).await
}
}
impl Default for LqpClient {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_client_creation() {
let client = LqpClient::new();
assert!(!tokio_test::block_on(client.is_connected()));
}
}