fix(sensing-server): handle WebSocket Lagged + add ping keepalive (#484)
Root cause: broadcast channel Lagged error caused instant disconnect when clients fell behind 256 frames (10Hz * 50-200KB = easy to lag). Client reconnects, immediately lags again, rapid cycling ensues. Sensing handler: Lagged error now continues (skips missed frames) instead of breaking. Added 30s ping interval for proxy keepalive. Pose handler: same Lagged handling + Pong match arm. CHANGELOG updated under Unreleased/Fixed. Co-authored-by: Deploy Bot <deploy@example.com>
This commit is contained in:
parent
9d4f7820b2
commit
8b297dd706
|
|
@ -127,6 +127,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||||
saturation, hyperfine spectroscopy, or pulsed protocols become required.
|
saturation, hyperfine spectroscopy, or pulsed protocols become required.
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
|
- **WebSocket broadcast handler now handles Lagged events gracefully and sends periodic ping keepalives to prevent dashboard disconnects** —
|
||||||
|
`handle_ws_client` and `handle_ws_pose_client` in `wifi-densepose-sensing-server`
|
||||||
|
were treating `RecvError::Lagged` as a fatal error, causing instant disconnect
|
||||||
|
when clients fell behind the 256-frame broadcast buffer at 10 Hz ingest.
|
||||||
|
Clients would reconnect, immediately lag again, and rapid-cycle every 2–4 s.
|
||||||
|
`Lagged` now continues (drops missed frames, logs debug) rather than breaking.
|
||||||
|
Added 30 s ping keepalive on the sensing handler to prevent proxy idle timeouts.
|
||||||
- **Ghost skeletons in live UI with multi-node ESP32 setups** (#420, ADR-082) —
|
- **Ghost skeletons in live UI with multi-node ESP32 setups** (#420, ADR-082) —
|
||||||
`tracker_bridge::tracker_to_person_detections` documented itself as filtering
|
`tracker_bridge::tracker_to_person_detections` documented itself as filtering
|
||||||
to `is_alive()` tracks but in fact passed every non-Terminated track to the
|
to `is_alive()` tracks but in fact passed every non-Terminated track to the
|
||||||
|
|
|
||||||
|
|
@ -2023,6 +2023,10 @@ async fn handle_ws_client(mut socket: WebSocket, state: SharedState) {
|
||||||
|
|
||||||
info!("WebSocket client connected (sensing)");
|
info!("WebSocket client connected (sensing)");
|
||||||
|
|
||||||
|
// ADR-044/045: ping/pong keepalive to prevent proxy idle timeouts.
|
||||||
|
let mut ping_interval = tokio::time::interval(std::time::Duration::from_secs(30));
|
||||||
|
ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
msg = rx.recv() => {
|
msg = rx.recv() => {
|
||||||
|
|
@ -2032,13 +2036,24 @@ async fn handle_ws_client(mut socket: WebSocket, state: SharedState) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => break,
|
// Lagged: client fell behind — skip missed frames, don't disconnect.
|
||||||
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||||
|
tracing::debug!("WS client lagged by {n} frames, skipping");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Err(_) => break, // channel closed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ = ping_interval.tick() => {
|
||||||
|
if socket.send(Message::Ping(vec![].into())).await.is_err() {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
msg = socket.recv() => {
|
msg = socket.recv() => {
|
||||||
match msg {
|
match msg {
|
||||||
Some(Ok(Message::Close(_))) | None => break,
|
Some(Ok(Message::Close(_))) | None => break,
|
||||||
_ => {} // ignore client messages
|
Some(Ok(Message::Pong(_))) => {} // keepalive response
|
||||||
|
_ => {} // ignore other client messages
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -2207,7 +2222,12 @@ async fn handle_ws_pose_client(mut socket: WebSocket, state: SharedState) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => break,
|
// Lagged: skip missed frames, don't disconnect.
|
||||||
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||||
|
tracing::debug!("WS pose client lagged by {n} frames, skipping");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Err(_) => break, // channel closed
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
msg = socket.recv() => {
|
msg = socket.recv() => {
|
||||||
|
|
@ -2222,6 +2242,7 @@ async fn handle_ws_pose_client(mut socket: WebSocket, state: SharedState) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Some(Ok(Message::Close(_))) | None => break,
|
Some(Ok(Message::Close(_))) | None => break,
|
||||||
|
Some(Ok(Message::Pong(_))) => {} // keepalive response
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue