diff --git a/v2/Cargo.lock b/v2/Cargo.lock index 43cacdc4..c31100fd 100644 --- a/v2/Cargo.lock +++ b/v2/Cargo.lock @@ -3472,6 +3472,7 @@ dependencies = [ "axum", "chrono", "dashmap", + "futures-util", "homecore", "http-body-util", "hyper 1.8.1", @@ -3479,6 +3480,7 @@ dependencies = [ "serde_json", "thiserror 1.0.69", "tokio", + "tokio-tungstenite", "tower 0.5.3", "tower-http", "tracing", @@ -10933,7 +10935,7 @@ dependencies = [ [[package]] name = "wifi-densepose-hardware" -version = "0.3.0" +version = "0.3.1" dependencies = [ "approx", "byteorder", @@ -10953,7 +10955,7 @@ dependencies = [ [[package]] name = "wifi-densepose-mat" -version = "0.3.0" +version = "0.3.1" dependencies = [ "anyhow", "approx", @@ -10985,7 +10987,7 @@ dependencies = [ [[package]] name = "wifi-densepose-nn" -version = "0.3.0" +version = "0.3.1" dependencies = [ "anyhow", "candle-core 0.4.1", @@ -11039,7 +11041,7 @@ dependencies = [ [[package]] name = "wifi-densepose-ruvector" -version = "0.3.1" +version = "0.3.2" dependencies = [ "approx", "criterion", @@ -11059,7 +11061,7 @@ dependencies = [ [[package]] name = "wifi-densepose-sensing-server" -version = "0.3.1" +version = "0.3.2" dependencies = [ "axum", "chrono", @@ -11093,7 +11095,7 @@ dependencies = [ [[package]] name = "wifi-densepose-signal" -version = "0.3.2" +version = "0.3.3" dependencies = [ "chrono", "criterion", @@ -11120,7 +11122,7 @@ dependencies = [ [[package]] name = "wifi-densepose-train" -version = "0.3.1" +version = "0.3.2" dependencies = [ "anyhow", "approx", @@ -11158,7 +11160,7 @@ dependencies = [ [[package]] name = "wifi-densepose-vitals" -version = "0.3.0" +version = "0.3.1" dependencies = [ "criterion", "serde", @@ -11190,7 +11192,7 @@ dependencies = [ [[package]] name = "wifi-densepose-wifiscan" -version = "0.3.0" +version = "0.3.1" dependencies = [ "serde", "tokio", diff --git a/v2/crates/homecore-api/Cargo.toml b/v2/crates/homecore-api/Cargo.toml index d1170a80..a86f0691 100644 --- a/v2/crates/homecore-api/Cargo.toml +++ b/v2/crates/homecore-api/Cargo.toml @@ -33,8 +33,12 @@ chrono = { version = "0.4", features = ["serde"] } uuid = { version = "1", features = ["v4", "serde"] } dashmap = "6" +futures-util = { version = "0.3", default-features = false, features = ["sink"] } [dev-dependencies] tower = { version = "0.5", features = ["util"] } hyper = "1" http-body-util = "0.1" +# End-to-end WS handshake + reply tests (HC-WS-01/02, ADR-161). +tokio-tungstenite = "0.24" +futures-util = { version = "0.3", default-features = false } diff --git a/v2/crates/homecore-api/src/bin/server.rs b/v2/crates/homecore-api/src/bin/server.rs index 6a2536bd..49778840 100644 --- a/v2/crates/homecore-api/src/bin/server.rs +++ b/v2/crates/homecore-api/src/bin/server.rs @@ -1,15 +1,31 @@ //! `homecore-api-server` binary. Boots a HomeCore runtime and serves -//! the HA-compat REST + WS API on `:8123`. +//! the HA-compat REST + WS API. //! -//! P1: bare-minimum bring-up. No persistence, no plugins, no auth -//! beyond "any non-empty bearer". Useful for `curl` smoke tests of -//! the wire format from the existing HA companion app: +//! ## Auth (ADR-161, HC-WS-08) +//! +//! Token provisioning matches `homecore-server`: if `HOMECORE_TOKENS` +//! is set (comma-separated bearer tokens) the API enforces that +//! whitelist on both the REST and WS paths. If it is **unset**, the +//! binary falls back to an explicitly-logged DEV mode (any non-empty +//! bearer accepted) — before this fix the bin unconditionally used +//! `allow_any_non_empty()` with no env path, so a provisioned operator +//! had no way to lock it down. +//! +//! ## Bind address +//! +//! Defaults to `127.0.0.1` (loopback only) so a bare `cargo run` of +//! this dev binary is not network-exposed. Override with +//! `HOMECORE_BIND=0.0.0.0:8123` for a LAN deployment (and provision +//! `HOMECORE_TOKENS` when you do). //! //! cargo run -p homecore-api --bin homecore-api-server -//! curl -H "Authorization: Bearer test" http://127.0.0.1:8123/api/ +//! HOMECORE_TOKENS=secret curl -H "Authorization: Bearer secret" \ +//! http://127.0.0.1:8123/api/ + +use std::net::SocketAddr; use homecore::HomeCore; -use homecore_api::{router, SharedState, DEFAULT_PORT}; +use homecore_api::{router, LongLivedTokenStore, SharedState, DEFAULT_PORT}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -21,10 +37,34 @@ async fn main() -> Result<(), Box> { .init(); let homecore = HomeCore::new(); - let state = SharedState::new(homecore); + + // Token provisioning (HC-WS-08). Prefer the HOMECORE_TOKENS env + // whitelist; fall back to DEV mode (warn-logged) only when unset. + let tokens = if std::env::var("HOMECORE_TOKENS") + .map(|v| !v.trim().is_empty()) + .unwrap_or(false) + { + let s = LongLivedTokenStore::from_env(); + let n = s.len().await; + tracing::info!("LongLivedTokenStore provisioned with {n} bearer token(s) from HOMECORE_TOKENS"); + s + } else { + tracing::warn!( + "HOMECORE_TOKENS not set — token store in DEV mode (any non-empty bearer \ + accepted). Set HOMECORE_TOKENS before exposing this binary to the network." + ); + LongLivedTokenStore::allow_any_non_empty() + }; + + let state = SharedState::with_tokens(homecore, "Home", env!("CARGO_PKG_VERSION"), tokens); let app = router(state); - let addr = std::net::SocketAddr::from(([0, 0, 0, 0], DEFAULT_PORT)); + // Default to loopback so `cargo run` is not network-exposed; allow + // an explicit HOMECORE_BIND override for LAN deployments. + let addr: SocketAddr = match std::env::var("HOMECORE_BIND") { + Ok(v) if !v.trim().is_empty() => v.parse()?, + _ => SocketAddr::from(([127, 0, 0, 1], DEFAULT_PORT)), + }; tracing::info!("HOMECORE-API listening on http://{addr} (HA-compat /api + /api/websocket)"); let listener = tokio::net::TcpListener::bind(addr).await?; diff --git a/v2/crates/homecore-api/src/ws.rs b/v2/crates/homecore-api/src/ws.rs index 4aa7a1ba..cea9f77f 100644 --- a/v2/crates/homecore-api/src/ws.rs +++ b/v2/crates/homecore-api/src/ws.rs @@ -9,6 +9,16 @@ //! //! `ha_version` is the homecore version string — see ADR-130 Q1 for the //! companion-app feature-detect concern. +//! +//! ## Security (ADR-161) +//! +//! The `auth` token is validated against [`crate::tokens::LongLivedTokenStore`] +//! via `state.tokens().is_valid()` — the *same* store the REST path uses +//! (`auth::BearerAuth`). A wrong token receives `auth_invalid` and the socket +//! is closed. (HC-WS-01 closed the prior bypass where any non-empty token was +//! accepted.) Command replies are transmitted by a dedicated writer task that +//! drains the response channel onto the socket (HC-WS-02 closed the prior +//! reply-theater where responses were logged and discarded). use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -18,7 +28,7 @@ use axum::extract::State; use axum::response::IntoResponse; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast; -use tracing::{debug, warn}; +use tracing::warn; use homecore::{Context, ServiceCall, ServiceName, SystemEvent}; @@ -58,11 +68,18 @@ async fn handle_socket(mut socket: WebSocket, state: SharedState) { _ => return, }; - // P1: accept any non-empty token. P2: validate against store. - if token.trim().is_empty() { + // Validate the bearer token against the same store the REST path + // uses (`state.tokens().is_valid()` — see `rest.rs` / + // `auth::BearerAuth`). Before the HC-WS-01 fix this checked only + // `token.trim().is_empty()` and accepted ANY non-empty token even + // with a provisioned `HOMECORE_TOKENS` whitelist — a full WS auth + // bypass. `is_valid()` rejects the empty token internally and, in + // DEV (`allow_any`) mode, still accepts any non-empty bearer (with + // a warn) so smoke tests keep working. + if !state.tokens().is_valid(&token).await { let _ = socket .send(Message::Text( - serde_json::json!({"type":"auth_invalid","message":"empty token"}).to_string(), + serde_json::json!({"type":"auth_invalid","message":"invalid token"}).to_string(), )) .await; return; @@ -140,54 +157,71 @@ impl Connection { } } - async fn run(self, mut socket: WebSocket) { + async fn run(self, socket: WebSocket) { + use futures_util::{SinkExt, StreamExt}; + let conn = Arc::new(self); + // Split the socket so a dedicated writer task can drain `rx` onto + // the wire while the reader task processes commands concurrently. + // Before the HC-WS-02 fix the socket was moved into a recv-only + // task and the only `rx` consumer just `debug!`-logged and + // DISCARDED every message — so no `result`/`pong`/`event` ever + // reached the client. Now `rx` feeds `socket.send`. + let (mut sink, mut stream) = socket.split(); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); - let sender_tx = tx.clone(); - let recv_task = { - let conn = Arc::clone(&conn); - tokio::spawn(async move { - while let Some(frame) = socket.recv().await { - match frame { - Ok(Message::Text(raw)) => { - let cmd: WsCommand = match serde_json::from_str(&raw) { - Ok(c) => c, - Err(e) => { - warn!("bad ws command: {e}"); - continue; - } - }; - conn.handle_cmd(cmd, &sender_tx).await; - } - Ok(Message::Ping(p)) => { - let _ = sender_tx.send(format!("__pong:{}", p.len())); - } - Ok(Message::Close(_)) | Err(_) => break, - _ => {} - } + // Writer task: drain replies onto the socket. A `__pong:` + // sentinel maps to a binary Pong control frame; everything else + // is a JSON text frame. + let writer_task = tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + let send_result = if let Some(n) = msg.strip_prefix("__pong:") { + let len: usize = n.parse().unwrap_or(0); + sink.send(Message::Pong(vec![0u8; len])).await + } else { + sink.send(Message::Text(msg)).await + }; + if send_result.is_err() { + break; } - // Cancel all subscriptions on disconnect. - for entry in conn.subs.iter() { - entry.value().abort.abort(); - } - }); + } + }); - tokio::spawn(async move { - while let Some(msg) = rx.recv().await { - if msg.starts_with("__pong:") { - // pong handled inline; skip - continue; + // Reader task: parse and dispatch commands; responses are pushed + // into `tx` and transmitted by the writer task above. + let reader_tx = tx.clone(); + { + let conn = Arc::clone(&conn); + while let Some(frame) = stream.next().await { + match frame { + Ok(Message::Text(raw)) => { + let cmd: WsCommand = match serde_json::from_str(&raw) { + Ok(c) => c, + Err(e) => { + warn!("bad ws command: {e}"); + continue; + } + }; + conn.handle_cmd(cmd, &reader_tx).await; } - // Use the socket from the recv task via a one-shot mpsc - // (in this minimal P1, the recv task owns the socket - // and we ack inline below — this branch is for the - // subscription fan-out emit path) - debug!("ws emit: {msg}"); + Ok(Message::Ping(p)) => { + let _ = reader_tx.send(format!("__pong:{}", p.len())); + } + Ok(Message::Close(_)) | Err(_) => break, + _ => {} } - }) - }; - let _ = recv_task.await; + } + // Cancel all subscriptions on disconnect. + for entry in conn.subs.iter() { + entry.value().abort.abort(); + } + } + + // Reader loop ended → drop the senders so the writer task's `rx` + // closes and the task exits cleanly. + drop(tx); + drop(reader_tx); + let _ = writer_task.await; } async fn handle_cmd(&self, cmd: WsCommand, tx: &tokio::sync::mpsc::UnboundedSender) { diff --git a/v2/crates/homecore-api/tests/server_bin_auth.rs b/v2/crates/homecore-api/tests/server_bin_auth.rs new file mode 100644 index 00000000..feea856e --- /dev/null +++ b/v2/crates/homecore-api/tests/server_bin_auth.rs @@ -0,0 +1,77 @@ +//! HC-WS-08 (ADR-161): the `homecore-api-server` bin must honor the +//! `HOMECORE_TOKENS` env whitelist instead of unconditionally accepting +//! any non-empty bearer. +//! +//! `main()` is not directly callable, so this reproduces the bin's exact +//! token-provisioning path (`LongLivedTokenStore::from_env()` when +//! `HOMECORE_TOKENS` is set) and drives a real HTTP request through the +//! router. On the pre-fix bin — which used `SharedState::new()` → +//! `allow_any_non_empty()` with NO env path — a wrong bearer was +//! accepted; this test asserts it is now rejected with 401. + +use axum::body::Body; +use axum::http::{Request, StatusCode}; +use homecore::HomeCore; +use homecore_api::{router, LongLivedTokenStore, SharedState}; +use tower::ServiceExt; // for `oneshot` + +/// Build the same state the bin builds when HOMECORE_TOKENS is set. +async fn provisioned_state(valid: &str) -> SharedState { + // Mirror `from_env()` deterministically without mutating process + // env (which would race other tests): an `empty()` store with the + // one provisioned token registered is exactly what + // `from_env()` produces for `HOMECORE_TOKENS=`. + let store = LongLivedTokenStore::empty(); + store.register(valid).await; + SharedState::with_tokens(HomeCore::new(), "Home", "test", store) +} + +#[tokio::test] +async fn provisioned_bin_rejects_wrong_bearer() { + let app = router(provisioned_state("the_real_token").await); + let resp = app + .oneshot( + Request::builder() + .uri("/api/states") + .header("Authorization", "Bearer the_wrong_token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::UNAUTHORIZED, + "a provisioned token store must reject a wrong bearer (HC-WS-08)" + ); +} + +#[tokio::test] +async fn provisioned_bin_accepts_correct_bearer() { + let app = router(provisioned_state("the_real_token").await); + let resp = app + .oneshot( + Request::builder() + .uri("/api/states") + .header("Authorization", "Bearer the_real_token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); +} + +#[tokio::test] +async fn from_env_path_enforces_whitelist() { + // Exercise the literal `from_env()` constructor the bin uses, under + // a serialized env mutation, to prove the env path itself enforces. + std::env::set_var("HOMECORE_TOKENS", "env_token_1, env_token_2"); + let store = LongLivedTokenStore::from_env(); + std::env::remove_var("HOMECORE_TOKENS"); + + assert!(store.is_valid("env_token_1").await); + assert!(store.is_valid("env_token_2").await); + assert!(!store.is_valid("not_in_whitelist").await); + assert!(!store.is_dev_mode().await, "from_env must NOT be dev mode"); +} diff --git a/v2/crates/homecore-api/tests/ws_handshake.rs b/v2/crates/homecore-api/tests/ws_handshake.rs new file mode 100644 index 00000000..b4df3424 --- /dev/null +++ b/v2/crates/homecore-api/tests/ws_handshake.rs @@ -0,0 +1,168 @@ +//! End-to-end WebSocket handshake + reply tests (ADR-161, HC-WS-01/02). +//! +//! These bind a real `TcpListener`, serve the full router, and connect +//! with a real WS client (`tokio-tungstenite`). They exercise the wire +//! path the in-crate unit tests cannot. +//! +//! - `wrong_token_is_rejected` — FAILS on the pre-fix `ws.rs` that only +//! checked `token.trim().is_empty()` and accepted any non-empty token +//! (HC-WS-01: WS auth bypass). +//! - `result_reply_is_received` — FAILS on the pre-fix `ws.rs` that moved +//! the socket into a recv-only task and discarded every reply with +//! `debug!("ws emit: {msg}")` (HC-WS-02: reply theater). + +use std::net::SocketAddr; + +use futures_util::{SinkExt, StreamExt}; +use homecore::HomeCore; +use homecore_api::{router, LongLivedTokenStore, SharedState}; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; + +/// Spawn the API on an ephemeral port with a real (non-dev) token store +/// containing exactly one valid token. Returns the bound address. +async fn spawn_server_with_token(valid_token: &str) -> SocketAddr { + let hc = HomeCore::new(); + let tokens = LongLivedTokenStore::empty(); + tokens.register(valid_token).await; + let state = SharedState::with_tokens(hc, "Test", "test-version", tokens); + let app = router(state); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + addr +} + +/// Read text frames until one parses as JSON; returns the parsed value. +async fn next_json(ws: &mut S) -> serde_json::Value +where + S: StreamExt> + Unpin, +{ + loop { + match ws.next().await { + Some(Ok(Message::Text(raw))) => { + if let Ok(v) = serde_json::from_str::(&raw) { + return v; + } + } + Some(Ok(_)) => continue, + other => panic!("expected text frame, got {other:?}"), + } + } +} + +#[tokio::test] +async fn wrong_token_is_rejected() { + // HC-WS-01: a provisioned store with one good token must reject a + // DIFFERENT (non-empty) token over the WS handshake. The old code + // sent `auth_ok` for any non-empty token — this asserts the fix. + let addr = spawn_server_with_token("good_token_abc").await; + let url = format!("ws://{addr}/api/websocket"); + let (mut ws, _resp) = connect_async(&url).await.unwrap(); + + // Server → auth_required + let req = next_json(&mut ws).await; + assert_eq!(req["type"], "auth_required"); + + // Client → auth with the WRONG token + ws.send(Message::Text( + serde_json::json!({"type":"auth","access_token":"wrong_token_xyz"}).to_string(), + )) + .await + .unwrap(); + + // Server → auth_invalid (NOT auth_ok) + let resp = next_json(&mut ws).await; + assert_eq!( + resp["type"], "auth_invalid", + "wrong token must be rejected with auth_invalid, got: {resp}" + ); + assert_ne!(resp["type"], "auth_ok", "wrong token must NOT receive auth_ok"); +} + +#[tokio::test] +async fn correct_token_is_accepted() { + let addr = spawn_server_with_token("good_token_abc").await; + let url = format!("ws://{addr}/api/websocket"); + let (mut ws, _resp) = connect_async(&url).await.unwrap(); + + let req = next_json(&mut ws).await; + assert_eq!(req["type"], "auth_required"); + + ws.send(Message::Text( + serde_json::json!({"type":"auth","access_token":"good_token_abc"}).to_string(), + )) + .await + .unwrap(); + + let resp = next_json(&mut ws).await; + assert_eq!(resp["type"], "auth_ok", "correct token should be accepted, got: {resp}"); +} + +#[tokio::test] +async fn result_reply_is_received() { + // HC-WS-02: after a successful auth, a `get_states` command must + // produce a `result` reply RECEIVED over the socket. The old code + // discarded all replies in the rx-draining task, so this hangs/ + // fails on the pre-fix source. + let addr = spawn_server_with_token("good_token_abc").await; + let url = format!("ws://{addr}/api/websocket"); + let (mut ws, _resp) = connect_async(&url).await.unwrap(); + + let req = next_json(&mut ws).await; + assert_eq!(req["type"], "auth_required"); + + ws.send(Message::Text( + serde_json::json!({"type":"auth","access_token":"good_token_abc"}).to_string(), + )) + .await + .unwrap(); + let auth = next_json(&mut ws).await; + assert_eq!(auth["type"], "auth_ok"); + + // Send a command and assert we RECEIVE a result reply. + ws.send(Message::Text( + serde_json::json!({"id": 1, "type": "get_states"}).to_string(), + )) + .await + .unwrap(); + + let reply = tokio::time::timeout(std::time::Duration::from_secs(5), next_json(&mut ws)) + .await + .expect("did not receive a reply within 5s — reply theater (HC-WS-02)"); + assert_eq!(reply["type"], "result", "expected a result reply, got: {reply}"); + assert_eq!(reply["id"], 1); + assert_eq!(reply["success"], true); +} + +#[tokio::test] +async fn ping_pong_reply_is_received() { + // The `ping` command must produce a `pong` reply on the wire — also + // exercises the writer task that HC-WS-02 introduced. + let addr = spawn_server_with_token("good_token_abc").await; + let url = format!("ws://{addr}/api/websocket"); + let (mut ws, _resp) = connect_async(&url).await.unwrap(); + + let _ = next_json(&mut ws).await; // auth_required + ws.send(Message::Text( + serde_json::json!({"type":"auth","access_token":"good_token_abc"}).to_string(), + )) + .await + .unwrap(); + let _ = next_json(&mut ws).await; // auth_ok + + ws.send(Message::Text( + serde_json::json!({"id": 7, "type": "ping"}).to_string(), + )) + .await + .unwrap(); + + let reply = tokio::time::timeout(std::time::Duration::from_secs(5), next_json(&mut ws)) + .await + .expect("did not receive pong within 5s"); + assert_eq!(reply["type"], "pong"); + assert_eq!(reply["id"], 7); +}