record-daemon: fix event subscribe
This commit is contained in:
@@ -68,8 +68,11 @@ impl rustls::client::danger::ServerCertVerifier for InsecureVerifier {
|
|||||||
/// LQP WebSocket endpoints to subscribe to.
|
/// LQP WebSocket endpoints to subscribe to.
|
||||||
const SUBSCRIBE_ENDPOINTS: &[&str] = &[
|
const SUBSCRIBE_ENDPOINTS: &[&str] = &[
|
||||||
"/lol-gameflow/v1/gameflow-phase",
|
"/lol-gameflow/v1/gameflow-phase",
|
||||||
|
"/lol-gameflow/v1/session",
|
||||||
"/lol-matchmaking/v1/ready-check",
|
"/lol-matchmaking/v1/ready-check",
|
||||||
"/lol-game-events/v1/game-events",
|
"/lol-game-events/v1/game-events",
|
||||||
|
"/lol-champ-select/v1/session",
|
||||||
|
"/lol-lobby/v2/lobby",
|
||||||
];
|
];
|
||||||
|
|
||||||
/// LQP REST API endpoints.
|
/// LQP REST API endpoints.
|
||||||
@@ -274,16 +277,18 @@ impl LqpClient {
|
|||||||
|
|
||||||
let (mut write, mut read) = ws_stream.split();
|
let (mut write, mut read) = ws_stream.split();
|
||||||
|
|
||||||
// Subscribe to endpoints
|
// Subscribe to endpoints using OnJsonApiEvent format
|
||||||
|
// Format: [5, "OnJsonApiEvent", endpoint]
|
||||||
for endpoint in SUBSCRIBE_ENDPOINTS {
|
for endpoint in SUBSCRIBE_ENDPOINTS {
|
||||||
let subscribe_msg = serde_json::json!([5, endpoint]);
|
let subscribe_msg = serde_json::json!([5, "OnJsonApiEvent", endpoint]);
|
||||||
let msg = Message::Text(subscribe_msg.to_string());
|
let msg = Message::Text(subscribe_msg.to_string());
|
||||||
|
info!("Subscribing to: {} with OnJsonApiEvent", endpoint);
|
||||||
write
|
write
|
||||||
.send(msg)
|
.send(msg)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| LqpError::WebSocketError(e.to_string()))?;
|
.map_err(|e| LqpError::WebSocketError(e.to_string()))?;
|
||||||
trace!("Subscribed to {}", endpoint);
|
|
||||||
}
|
}
|
||||||
|
info!("All subscriptions sent");
|
||||||
|
|
||||||
// Clone references for the async block
|
// Clone references for the async block
|
||||||
let event_sender = self.event_sender.clone();
|
let event_sender = self.event_sender.clone();
|
||||||
@@ -301,6 +306,11 @@ impl LqpClient {
|
|||||||
|
|
||||||
match msg {
|
match msg {
|
||||||
Ok(Message::Text(text)) => {
|
Ok(Message::Text(text)) => {
|
||||||
|
debug!("Received text message: {} bytes", text.len());
|
||||||
|
if text.is_empty() {
|
||||||
|
debug!("Empty text message received, skipping");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if let Some(event) = Self::parse_websocket_message(&text) {
|
if let Some(event) = Self::parse_websocket_message(&text) {
|
||||||
// Update state based on event
|
// Update state based on event
|
||||||
Self::update_state_from_event(&state, &event).await;
|
Self::update_state_from_event(&state, &event).await;
|
||||||
@@ -311,19 +321,42 @@ impl LqpClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(Message::Binary(data)) => {
|
||||||
|
debug!("Received binary message: {} bytes", data.len());
|
||||||
|
// Try to parse as UTF-8
|
||||||
|
if let Ok(text) = String::from_utf8(data) {
|
||||||
|
if !text.is_empty() {
|
||||||
|
if let Some(event) = Self::parse_websocket_message(&text) {
|
||||||
|
// Update state based on event
|
||||||
|
Self::update_state_from_event(&state, &event).await;
|
||||||
|
|
||||||
|
// Broadcast event
|
||||||
|
if event_sender.send(event.clone()).is_err() {
|
||||||
|
trace!("No event subscribers");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Ok(Message::Close(_)) => {
|
Ok(Message::Close(_)) => {
|
||||||
info!("WebSocket closed by server");
|
info!("WebSocket closed by server");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Ok(Message::Ping(data)) => {
|
Ok(Message::Ping(data)) => {
|
||||||
// Respond with pong
|
// Respond with pong
|
||||||
|
debug!("Received ping, sending pong");
|
||||||
let _ = write.send(Message::Pong(data)).await;
|
let _ = write.send(Message::Pong(data)).await;
|
||||||
}
|
}
|
||||||
|
Ok(Message::Pong(_)) => {
|
||||||
|
debug!("Received pong");
|
||||||
|
}
|
||||||
|
Ok(Message::Frame(_)) => {
|
||||||
|
debug!("Received raw frame");
|
||||||
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("WebSocket error: {}", e);
|
error!("WebSocket error: {}", e);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
_ => {}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -337,49 +370,136 @@ impl LqpClient {
|
|||||||
|
|
||||||
/// Parse a WebSocket message into a game event.
|
/// Parse a WebSocket message into a game event.
|
||||||
fn parse_websocket_message(text: &str) -> Option<GameEvent> {
|
fn parse_websocket_message(text: &str) -> Option<GameEvent> {
|
||||||
trace!("WebSocket message: {}", text);
|
debug!("WebSocket message: {}", text);
|
||||||
|
|
||||||
// Parse the message array format: [type, endpoint, data]
|
// Parse the message array format: [type, callback, data]
|
||||||
let value: serde_json::Value = serde_json::from_str(text).ok()?;
|
let value: serde_json::Value = match serde_json::from_str(text) {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed to parse WebSocket message as JSON: {}", e);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// Check if it's an event message (type 8)
|
// Check if it's an event message (type 8)
|
||||||
if let Some(arr) = value.as_array() {
|
if let Some(arr) = value.as_array() {
|
||||||
|
debug!("Message is array with {} elements", arr.len());
|
||||||
if arr.len() >= 3 {
|
if arr.len() >= 3 {
|
||||||
let msg_type = arr.first()?.as_u64()?;
|
let msg_type = arr.first()?.as_u64()?;
|
||||||
|
debug!("Message type: {}", msg_type);
|
||||||
|
|
||||||
if msg_type == 8 {
|
if msg_type == 8 {
|
||||||
// Event message
|
// Event message format: [8, "OnJsonApiEvent", {"data": ..., "eventType": ..., "uri": ...}]
|
||||||
let endpoint = arr.get(1)?.as_str()?;
|
let callback = arr.get(1)?.as_str()?;
|
||||||
let data = arr.get(2)?;
|
let event_data = arr.get(2)?;
|
||||||
|
|
||||||
return Self::parse_event_from_endpoint(endpoint, data);
|
if callback == "OnJsonApiEvent" {
|
||||||
|
// Extract the actual URI and data from the event
|
||||||
|
let uri = event_data.get("uri")?.as_str()?;
|
||||||
|
let data = event_data.get("data")?;
|
||||||
|
let event_type = event_data.get("eventType").and_then(|t| t.as_str()).unwrap_or("Update");
|
||||||
|
|
||||||
|
debug!("OnJsonApiEvent: uri={}, eventType={}, data={:?}", uri, event_type, data);
|
||||||
|
|
||||||
|
return Self::parse_event_from_uri(uri, event_type, data);
|
||||||
|
} else {
|
||||||
|
debug!("Unknown callback: {}", callback);
|
||||||
|
}
|
||||||
|
} else if msg_type == 4 {
|
||||||
|
// Response to subscription - this is normal
|
||||||
|
debug!("Subscription response received");
|
||||||
|
} else if msg_type == 0 {
|
||||||
|
// Welcome message
|
||||||
|
info!("WebSocket welcome message received");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
debug!("Message is not an array: {:?}", value);
|
||||||
}
|
}
|
||||||
|
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse an event based on the endpoint.
|
/// Parse an event based on the URI.
|
||||||
fn parse_event_from_endpoint(endpoint: &str, data: &serde_json::Value) -> Option<GameEvent> {
|
fn parse_event_from_uri(uri: &str, event_type: &str, data: &serde_json::Value) -> Option<GameEvent> {
|
||||||
match endpoint {
|
info!("Parsing event from URI: {} (type: {})", uri, event_type);
|
||||||
"/lol-gameflow/v1/gameflow-phase" => {
|
|
||||||
|
// Handle gameflow phase changes
|
||||||
|
if uri == "/lol-gameflow/v1/gameflow-phase" {
|
||||||
let phase = data.as_str()?;
|
let phase = data.as_str()?;
|
||||||
Some(
|
info!("Gameflow phase changed to: {}", phase);
|
||||||
|
|
||||||
|
// Update internal state based on phase
|
||||||
|
return Some(
|
||||||
GameEvent::from_json(&serde_json::json!({
|
GameEvent::from_json(&serde_json::json!({
|
||||||
"eventType": "lcu-phase-change",
|
"eventType": "lcu-phase-change",
|
||||||
"phase": phase
|
"phase": phase
|
||||||
}))
|
}))
|
||||||
.unwrap_or(GameEvent::Unknown),
|
.unwrap_or(GameEvent::Unknown),
|
||||||
)
|
);
|
||||||
}
|
}
|
||||||
"/lol-game-events/v1/game-events" => GameEvent::from_json(data),
|
|
||||||
_ => {
|
// Handle gameflow session updates
|
||||||
trace!("Unhandled endpoint: {}", endpoint);
|
if uri == "/lol-gameflow/v1/session" {
|
||||||
|
if let Some(phase) = data.get("phase").and_then(|p| p.as_str()) {
|
||||||
|
info!("Gameflow session phase: {}", phase);
|
||||||
|
|
||||||
|
// Check for game start
|
||||||
|
if phase == "InProgress" {
|
||||||
|
info!("Game is now in progress!");
|
||||||
|
|
||||||
|
// Extract game info
|
||||||
|
let game_id = data.get("gameData")
|
||||||
|
.and_then(|gd| gd.get("gameId"))
|
||||||
|
.and_then(|id| id.as_u64())
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
return Some(
|
||||||
|
GameEvent::from_json(&serde_json::json!({
|
||||||
|
"eventType": "lcu-game-start",
|
||||||
|
"gameId": game_id
|
||||||
|
}))
|
||||||
|
.unwrap_or(GameEvent::Unknown),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
return Some(
|
||||||
|
GameEvent::from_json(&serde_json::json!({
|
||||||
|
"eventType": "lcu-phase-change",
|
||||||
|
"phase": phase
|
||||||
|
}))
|
||||||
|
.unwrap_or(GameEvent::Unknown),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle game events (kills, deaths, objectives)
|
||||||
|
if uri == "/lol-game-events/v1/game-events" {
|
||||||
|
info!("Game event received: {:?}", data);
|
||||||
|
return GameEvent::from_json(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle ready check
|
||||||
|
if uri == "/lol-matchmaking/v1/ready-check" {
|
||||||
|
info!("Ready check event: {:?}", data);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle champion select
|
||||||
|
if uri == "/lol-champ-select/v1/session" {
|
||||||
|
info!("Champion select event: {:?}", data);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle lobby
|
||||||
|
if uri.starts_with("/lol-lobby") {
|
||||||
|
debug!("Lobby event: {}", uri);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!("Unhandled URI: {}", uri);
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Update internal state from a game event.
|
/// Update internal state from a game event.
|
||||||
async fn update_state_from_event(state: &Arc<RwLock<ClientState>>, event: &GameEvent) {
|
async fn update_state_from_event(state: &Arc<RwLock<ClientState>>, event: &GameEvent) {
|
||||||
|
|||||||
@@ -276,7 +276,14 @@ pub enum EventData {
|
|||||||
impl GameEvent {
|
impl GameEvent {
|
||||||
/// Parse a game event from raw WebSocket data.
|
/// Parse a game event from raw WebSocket data.
|
||||||
pub fn from_json(value: &serde_json::Value) -> Option<Self> {
|
pub fn from_json(value: &serde_json::Value) -> Option<Self> {
|
||||||
serde_json::from_value(value.clone()).ok()
|
match serde_json::from_value(value.clone()) {
|
||||||
|
Ok(event) => Some(event),
|
||||||
|
Err(e) => {
|
||||||
|
// Log the parsing error for debugging
|
||||||
|
tracing::warn!("Failed to parse game event: {}. Data: {:?}", e, value);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if this event is relevant for recording.
|
/// Check if this event is relevant for recording.
|
||||||
|
|||||||
Reference in New Issue
Block a user