wifi-densepose/firmware/esp32-csi-node/main/temporal_task.c

305 lines
9.7 KiB
C

/**
* @file temporal_task.c
* @brief ADR-095 / #513 — On-device temporal head FreeRTOS task.
*
* Owns the only `ruv_temporal_ctx_t` in the firmware. Receives feature
* frames from the adaptive_controller fast loop via a FreeRTOS queue,
* pushes them into the rolling window, and at ~1 Hz runs a
* classification forward through the Rust `ruvllm_sparse_attention`
* staticlib (when built — see CONFIG_CSI_TEMPORAL_HEAD_ENABLED).
*
* The whole file compiles down to no-op shims when the feature is off,
* so adaptive_controller.c can call `temporal_task_push_frame()`
* unconditionally — the shim returns ESP_ERR_NOT_SUPPORTED immediately,
* without inspecting its arguments.
*/
#include "temporal_task.h"
#include <string.h>
#include "esp_log.h"
#include "esp_timer.h"
#include "sdkconfig.h"
static const char *TAG = "temporal";
#ifdef CONFIG_CSI_TEMPORAL_HEAD_ENABLED
#include "freertos/FreeRTOS.h"
#include "freertos/queue.h"
#include "freertos/task.h"
#include "csi_collector.h" /* node_id */
#include "stream_sender.h"
#include "ruv_temporal.h" /* C ABI from components/ruv_temporal */
/* Queue depth — picked so that the adaptive controller's fast loop
 * (default 5 Hz) can't overrun the temporal task even if classify()
 * stalls for ~6 s (32 frames / 5 Hz ≈ 6.4 s). Frames that arrive while
 * the queue is full are dropped, counted, and logged. */
#define TEMPORAL_QUEUE_DEPTH 32
/* Stack sized per ADR-095 §3.3. The kernel forward + intermediate
 * tensors are bounded by `forward_flash` tiling, but rv_feature_state
 * marshalling, logging, and stream_sender_send all share this stack. */
#define TEMPORAL_TASK_STACK 16384
/* Pinned to Core 1, like edge_dsp. WiFi runs on Core 0 — keep them
 * apart so the temporal forward doesn't compete with CSI capture. */
#define TEMPORAL_TASK_CORE 1
/* Classification cadence in milliseconds. 1 Hz is the ADR-095 §3 default.
 * The #ifndef lets a Kconfig-provided value take precedence over this
 * fallback. */
#ifndef CONFIG_TEMPORAL_CLASSIFY_PERIOD_MS
#define CONFIG_TEMPORAL_CLASSIFY_PERIOD_MS 1000
#endif
/* Maximum logits buffer — 16 leaves 2x headroom over the largest
 * n_classes any of the ADR-095 §4 use cases needs (anomaly = 2,
 * fall = 3, gesture = 8). temporal_task_start() rejects anything larger.
 * The same constant also scales the per-frame payload in temporal_msg_t
 * (TEMPORAL_MAX_LOGITS * 8 floats). */
#define TEMPORAL_MAX_LOGITS 16
/* ---- Module state ----------------------------------------------------- */
/* One queue item: a single feature frame, copied by value through s_queue
 * (the queue is created with sizeof(temporal_msg_t) as its item size). */
typedef struct {
/* Payload slots: TEMPORAL_MAX_LOGITS * 8 floats (128 with the current
 * define). The producer fills only the first s_input_dim entries; the
 * rest are left as-is. */
float frame[TEMPORAL_MAX_LOGITS * 8]; /* generous; trimmed via input_dim */
/* Number of meaningful floats in frame[]; temporal_task_push_frame()
 * always sets this to s_input_dim. */
uint32_t frame_len;
} temporal_msg_t;
/* Producer -> task frame queue; created in temporal_task_start(), NULL
 * while the head is not running. */
static QueueHandle_t s_queue;
/* Handle of the temporal task; non-NULL doubles as the "already started"
 * flag checked by temporal_task_start(). */
static TaskHandle_t s_task;
/* Kernel context obtained from ruv_temporal_init(); NULL when absent. */
static ruv_temporal_ctx_t *s_ctx;
/* Geometry captured from the temporal_task_start() arguments. */
static uint32_t s_input_dim;
static uint32_t s_window_len;
static uint32_t s_n_classes;
/* Sequence number of the last emitted classification packet
 * (pre-incremented per packet, reset to 0 on start). */
static uint32_t s_seq;
/* Frames rejected because the queue was full. Incremented by
 * temporal_task_push_frame(), reported and reset by the task loop.
 * NOTE(review): written from two contexts without synchronization —
 * counts may be slightly off under contention. */
static uint32_t s_drop_count;
/* esp_timer timestamp (microseconds) of the last drop-count log line. */
static uint64_t s_last_drop_log_us;
/**
 * @brief Bitwise (table-free) CRC-32 using the reflected IEEE 802.3
 *        polynomial 0xEDB88320.
 *
 * The register starts at all-ones and the result is bit-inverted before
 * being returned. One input byte costs eight shift/xor steps; that is
 * acceptable here because the routine is only applied to the small
 * classification packet. The implementation is local to this file rather
 * than shared with csi_collector.c to keep coupling narrow.
 *
 * @param data Bytes to checksum; not dereferenced when @p len is 0.
 * @param len  Number of bytes at @p data.
 * @return The CRC-32 of the input (0 for an empty input).
 */
static uint32_t crc32_ieee(const uint8_t *data, size_t len)
{
    uint32_t reg = 0xFFFFFFFFu;
    size_t pos = 0;

    while (pos < len) {
        reg ^= data[pos++];
        for (unsigned remaining = 8; remaining != 0; remaining--) {
            /* Shift out the low bit; fold in the polynomial if it was set. */
            const uint32_t low_bit = reg & 1u;
            reg >>= 1;
            if (low_bit != 0u) {
                reg ^= 0xEDB88320u;
            }
        }
    }
    return reg ^ 0xFFFFFFFFu;
}
/* Single pass over logits[0..n): report the largest value, its index, and
 * the runner-up. The runner-up starts at -1e30f, so with n == 1 it stays
 * at that sentinel. logits[0] is read unconditionally. */
static void temporal_scan_top2(const float *logits, uint32_t n,
                               uint32_t *best_idx, float *best, float *second)
{
    uint32_t idx = 0;
    float hi = logits[0];
    float runner_up = -1e30f;

    for (uint32_t k = 1; k < n; ++k) {
        const float cur = logits[k];
        if (cur > hi) {
            /* New leader: the previous leader becomes the runner-up. */
            runner_up = hi;
            hi = cur;
            idx = k;
        } else if (cur > runner_up) {
            runner_up = cur;
        }
    }

    *best_idx = idx;
    *best = hi;
    *second = runner_up;
}

/**
 * @brief Summarize one classification result into an rv_temporal_pkt_t and
 *        hand it to stream_sender_send().
 *
 * The packet carries the argmax class, its logit, and the margin between
 * the top two logits, plus node id, timestamp and a sequence number that
 * increments with every call. The CRC is computed last, over every byte of
 * the packet except the final sizeof(pkt.crc32) bytes.
 * NOTE(review): that assumes crc32 is the last member with no trailing
 * padding — confirm against the rv_temporal_pkt_t definition.
 *
 * A negative return from stream_sender_send() is logged and otherwise
 * ignored.
 *
 * @param logits Array of at least one logit.
 * @param n      Number of entries in @p logits.
 */
static void emit_classification(const float *logits, uint32_t n)
{
    uint32_t winner = 0;
    float best = 0.0f;
    float second = 0.0f;
    temporal_scan_top2(logits, n, &winner, &best, &second);

    /* memset (not an initializer) so padding bytes are zero too — the CRC
     * below runs over the raw bytes of the struct. */
    rv_temporal_pkt_t pkt;
    memset(&pkt, 0, sizeof(pkt));

    pkt.magic = RV_TEMPORAL_PKT_MAGIC;
    pkt.version = 1;
    pkt.n_classes = (uint16_t)n;
    pkt.node_id = csi_collector_get_node_id();
    pkt.ts_us = (uint64_t)esp_timer_get_time();
    pkt.seq = ++s_seq;
    pkt.argmax = (uint8_t)winner;
    pkt.top_logit = best;
    pkt.top1_minus_top2 = best - second;

    const uint8_t *raw = (const uint8_t *)&pkt;
    pkt.crc32 = crc32_ieee(raw, sizeof(pkt) - sizeof(pkt.crc32));

    if (stream_sender_send(raw, sizeof(pkt)) < 0) {
        ESP_LOGW(TAG, "classification emit failed");
    }
}
static void temporal_task_loop(void *arg)
{
(void)arg;
ESP_LOGI(TAG, "temporal task online (window=%u dim=%u classes=%u core=%d)",
(unsigned)s_window_len, (unsigned)s_input_dim,
(unsigned)s_n_classes, TEMPORAL_TASK_CORE);
/* Self-test the kernel link before touching real frames. */
if (ruv_temporal_kernel_self_test() != ESP_OK) {
ESP_LOGE(TAG, "ruv_temporal_kernel_self_test FAILED — temporal head disabled");
s_ctx = NULL;
vTaskDelete(NULL);
return;
}
uint64_t next_classify_us = esp_timer_get_time()
+ (uint64_t)CONFIG_TEMPORAL_CLASSIFY_PERIOD_MS * 1000ull;
float logits[TEMPORAL_MAX_LOGITS];
for (;;) {
temporal_msg_t msg;
/* Block up to 100 ms for a frame, then check if it's time to
* classify. This double-poll keeps the cadence honest even
* during long quiet periods. */
if (xQueueReceive(s_queue, &msg, pdMS_TO_TICKS(100)) == pdTRUE) {
if (s_ctx != NULL) {
(void)ruv_temporal_push(s_ctx, msg.frame);
}
}
uint64_t now_us = esp_timer_get_time();
if (now_us >= next_classify_us && s_ctx != NULL) {
esp_err_t cret = ruv_temporal_classify(s_ctx, logits, s_n_classes);
if (cret == ESP_OK) {
emit_classification(logits, s_n_classes);
} else {
ESP_LOGW(TAG, "classify returned 0x%x", (unsigned)cret);
}
next_classify_us = now_us
+ (uint64_t)CONFIG_TEMPORAL_CLASSIFY_PERIOD_MS * 1000ull;
}
/* Coalesce drop-count logs to once per second so a backlog
* doesn't flood the serial console. */
if (s_drop_count > 0 && now_us - s_last_drop_log_us > 1000000ull) {
ESP_LOGW(TAG, "queue full — dropped %u feature frames",
(unsigned)s_drop_count);
s_drop_count = 0;
s_last_drop_log_us = now_us;
}
}
}
esp_err_t temporal_task_start(uint32_t input_dim,
uint32_t window_len,
uint32_t n_classes)
{
if (s_task != NULL) {
return ESP_OK; /* idempotent */
}
if (input_dim == 0 || window_len == 0 || n_classes == 0) {
return ESP_ERR_INVALID_ARG;
}
if (n_classes > TEMPORAL_MAX_LOGITS) {
ESP_LOGE(TAG, "n_classes=%u exceeds TEMPORAL_MAX_LOGITS=%d",
(unsigned)n_classes, TEMPORAL_MAX_LOGITS);
return ESP_ERR_INVALID_SIZE;
}
/* Allocate the kernel context. Phase 4 stub returns ESP_OK without
* weights; Phase 5b will accept a real weights blob. */
esp_err_t ret = ruv_temporal_init(NULL, 0, input_dim, window_len, n_classes,
&s_ctx);
if (ret != ESP_OK) {
ESP_LOGE(TAG, "ruv_temporal_init failed: 0x%x", (unsigned)ret);
return ret;
}
s_input_dim = input_dim;
s_window_len = window_len;
s_n_classes = n_classes;
s_seq = 0;
s_drop_count = 0;
s_last_drop_log_us = 0;
s_queue = xQueueCreate(TEMPORAL_QUEUE_DEPTH, sizeof(temporal_msg_t));
if (s_queue == NULL) {
ESP_LOGE(TAG, "queue create failed");
ruv_temporal_destroy(s_ctx);
s_ctx = NULL;
return ESP_ERR_NO_MEM;
}
BaseType_t ok = xTaskCreatePinnedToCore(
temporal_task_loop, "ruv_temporal", TEMPORAL_TASK_STACK,
NULL, 4 /* priority, below edge_dsp */,
&s_task, TEMPORAL_TASK_CORE);
if (ok != pdPASS) {
ESP_LOGE(TAG, "task create failed");
vQueueDelete(s_queue);
s_queue = NULL;
ruv_temporal_destroy(s_ctx);
s_ctx = NULL;
return ESP_ERR_NO_MEM;
}
return ESP_OK;
}
esp_err_t temporal_task_push_frame(const float *frame, uint32_t frame_len)
{
if (frame == NULL || frame_len == 0) {
return ESP_ERR_INVALID_ARG;
}
if (s_queue == NULL) {
return ESP_ERR_NOT_FOUND;
}
temporal_msg_t msg;
uint32_t cap = (uint32_t)(sizeof(msg.frame) / sizeof(msg.frame[0]));
uint32_t n = (frame_len < cap) ? frame_len : cap;
if (n < s_input_dim) {
/* Pad short frames with zeros so the rolling window stays
* dimension-stable from the kernel's perspective. */
memcpy(msg.frame, frame, n * sizeof(float));
memset(&msg.frame[n], 0, (s_input_dim - n) * sizeof(float));
msg.frame_len = s_input_dim;
} else {
memcpy(msg.frame, frame, s_input_dim * sizeof(float));
msg.frame_len = s_input_dim;
}
/* Non-blocking — temporal head is best-effort. */
if (xQueueSend(s_queue, &msg, 0) != pdPASS) {
s_drop_count++;
return ESP_ERR_TIMEOUT;
}
return ESP_OK;
}
void temporal_task_stop(void)
{
if (s_task != NULL) {
vTaskDelete(s_task);
s_task = NULL;
}
if (s_queue != NULL) {
vQueueDelete(s_queue);
s_queue = NULL;
}
if (s_ctx != NULL) {
ruv_temporal_destroy(s_ctx);
s_ctx = NULL;
}
}
#else /* !CONFIG_CSI_TEMPORAL_HEAD_ENABLED */
/* Feature-off shim: nothing is created in this build; always reports
 * ESP_ERR_NOT_SUPPORTED. */
esp_err_t temporal_task_start(uint32_t input_dim,
                              uint32_t window_len,
                              uint32_t n_classes)
{
    /* Arguments are intentionally unused. */
    (void)input_dim, (void)window_len, (void)n_classes;
    return ESP_ERR_NOT_SUPPORTED;
}
/* Feature-off shim: the frame is ignored and the call always reports
 * ESP_ERR_NOT_SUPPORTED, so callers may invoke it unconditionally. */
esp_err_t temporal_task_push_frame(const float *frame, uint32_t frame_len)
{
    /* Arguments are intentionally unused. */
    (void)frame, (void)frame_len;
    return ESP_ERR_NOT_SUPPORTED;
}
/* Feature-off shim: there is no task, queue, or context to release. */
void temporal_task_stop(void)
{
    /* intentionally empty */
}
#endif /* CONFIG_CSI_TEMPORAL_HEAD_ENABLED */