392 lines
13 KiB
Python
392 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
RuView Medical Vitals Suite — 10 capabilities from a single mmWave sensor
|
|
|
|
Capabilities:
|
|
1. Heart rate monitoring (continuous)
|
|
2. Breathing rate monitoring (continuous)
|
|
3. Blood pressure estimation (HRV-based)
|
|
4. HRV stress analysis (SDNN, RMSSD, pNN50, LF/HF)
|
|
5. Sleep stage classification (awake/light/deep/REM)
|
|
6. Apnea event detection (BR=0 for >10s)
|
|
7. Cough detection (BR spike pattern)
|
|
8. Snoring detection (periodic high-amplitude BR)
|
|
9. Activity state (resting/active/exercising)
|
|
10. Meditation quality scorer (coherence of BR+HR)
|
|
|
|
Usage:
|
|
python examples/medical/vitals_suite.py --port COM4 --duration 120
|
|
"""
|
|
|
|
import argparse
|
|
import collections
|
|
import math
|
|
import re
|
|
import serial
|
|
import sys
|
|
import time
|
|
|
|
try:
|
|
import numpy as np
|
|
HAS_NP = True
|
|
except ImportError:
|
|
HAS_NP = False
|
|
|
|
RE_HR = re.compile(r"'Real-time heart rate'.*?(\d+\.?\d*)\s*bpm", re.I)
|
|
RE_BR = re.compile(r"'Real-time respiratory rate'.*?(\d+\.?\d*)", re.I)
|
|
RE_PRES = re.compile(r"'Person Information'.*?state\s+(ON|OFF)", re.I)
|
|
RE_DIST = re.compile(r"'Distance to detection object'.*?(\d+\.?\d*)\s*cm", re.I)
|
|
RE_ANSI = re.compile(r"\x1b\[[0-9;]*m")
|
|
|
|
|
|
class WelfordStats:
|
|
def __init__(self):
|
|
self.count = 0
|
|
self.mean = 0.0
|
|
self.m2 = 0.0
|
|
|
|
def update(self, v):
|
|
self.count += 1
|
|
d = v - self.mean
|
|
self.mean += d / self.count
|
|
self.m2 += d * (v - self.mean)
|
|
|
|
def std(self):
|
|
return math.sqrt(self.m2 / self.count) if self.count > 1 else 0.0
|
|
|
|
def cv(self):
|
|
return self.std() / self.mean if self.mean > 0 else 0.0
|
|
|
|
|
|
class VitalsSuite:
|
|
def __init__(self):
|
|
# Raw buffers
|
|
self.hr_buf = collections.deque(maxlen=300)
|
|
self.br_buf = collections.deque(maxlen=300)
|
|
self.hr_ts = collections.deque(maxlen=300)
|
|
self.br_ts = collections.deque(maxlen=300)
|
|
self.distance = 0.0
|
|
self.presence = False
|
|
self.frames = 0
|
|
|
|
# Welford trackers
|
|
self.hr_stats = WelfordStats()
|
|
self.br_stats = WelfordStats()
|
|
|
|
# Apnea detection
|
|
self.last_br_time = time.time()
|
|
self.last_nonzero_br = 0.0
|
|
self.apnea_events = []
|
|
self.in_apnea = False
|
|
self.apnea_start = 0.0
|
|
|
|
# Cough detection
|
|
self.cough_events = []
|
|
self.prev_br = 0.0
|
|
|
|
# Snoring detection
|
|
self.snore_events = 0
|
|
self.br_amplitude_buf = collections.deque(maxlen=30)
|
|
|
|
# Sleep state
|
|
self.sleep_state = "Awake"
|
|
self.sleep_onset = 0.0
|
|
|
|
# Meditation
|
|
self.meditation_score = 0.0
|
|
|
|
# Events
|
|
self.events = collections.deque(maxlen=50)
|
|
|
|
def feed(self, hr=0.0, br=0.0, presence=False, distance=0.0):
|
|
now = time.time()
|
|
self.presence = presence
|
|
self.distance = distance
|
|
self.frames += 1
|
|
|
|
if hr > 0:
|
|
self.hr_buf.append(hr)
|
|
self.hr_ts.append(now)
|
|
self.hr_stats.update(hr)
|
|
|
|
if br > 0:
|
|
self.br_buf.append(br)
|
|
self.br_ts.append(now)
|
|
self.br_stats.update(br)
|
|
self.last_br_time = now
|
|
self.last_nonzero_br = br
|
|
|
|
# Cough: sudden BR spike > 2x baseline
|
|
if self.prev_br > 0 and br > self.prev_br * 2.5 and self.br_stats.count > 10:
|
|
self.cough_events.append(now)
|
|
self.events.append((now, "Cough detected"))
|
|
|
|
# Snoring: track BR amplitude variation
|
|
if len(self.br_buf) >= 2:
|
|
amp = abs(br - list(self.br_buf)[-2])
|
|
self.br_amplitude_buf.append(amp)
|
|
|
|
self.prev_br = br
|
|
|
|
# End apnea
|
|
if self.in_apnea:
|
|
duration = now - self.apnea_start
|
|
self.apnea_events.append(duration)
|
|
self.events.append((now, f"Apnea ended ({duration:.0f}s)"))
|
|
self.in_apnea = False
|
|
else:
|
|
# Apnea: BR=0 for >10s
|
|
gap = now - self.last_br_time
|
|
if gap >= 10 and not self.in_apnea and self.br_stats.count > 5:
|
|
self.in_apnea = True
|
|
self.apnea_start = self.last_br_time
|
|
self.events.append((now, f"APNEA started (no breath for {gap:.0f}s)"))
|
|
|
|
# Sleep stage classification
|
|
self._classify_sleep()
|
|
|
|
# Meditation score
|
|
self._compute_meditation()
|
|
|
|
# Snoring: periodic high-amplitude BR oscillation
|
|
if len(self.br_amplitude_buf) >= 10:
|
|
amps = list(self.br_amplitude_buf)
|
|
mean_amp = sum(amps) / len(amps)
|
|
if mean_amp > 3.0 and self.sleep_state != "Awake":
|
|
self.snore_events += 1
|
|
|
|
def _classify_sleep(self):
|
|
"""Sleep stage from BR variability + HR patterns."""
|
|
hrs = list(self.hr_buf)
|
|
brs = list(self.br_buf)
|
|
|
|
if len(hrs) < 10 or len(brs) < 10:
|
|
self.sleep_state = "Awake"
|
|
return
|
|
|
|
recent_hr = hrs[-10:]
|
|
recent_br = brs[-10:]
|
|
mean_hr = sum(recent_hr) / len(recent_hr)
|
|
mean_br = sum(recent_br) / len(recent_br)
|
|
|
|
# HR variability of last 10 readings
|
|
hr_std = math.sqrt(sum((h - mean_hr) ** 2 for h in recent_hr) / len(recent_hr))
|
|
br_std = math.sqrt(sum((b - mean_br) ** 2 for b in recent_br) / len(recent_br))
|
|
|
|
# Activity check
|
|
if mean_hr > 100 or mean_br > 25:
|
|
self.sleep_state = "Awake"
|
|
return
|
|
|
|
# Low HR + low BR + low variability = deep sleep
|
|
if mean_hr < 60 and mean_br < 14 and hr_std < 3 and br_std < 1:
|
|
if self.sleep_state != "Deep Sleep":
|
|
self.events.append((time.time(), "Entered deep sleep"))
|
|
self.sleep_state = "Deep Sleep"
|
|
# Moderate HR + high HR variability = REM
|
|
elif hr_std > 5 and br_std > 2 and mean_br < 20:
|
|
if self.sleep_state != "REM":
|
|
self.events.append((time.time(), "Entered REM sleep"))
|
|
self.sleep_state = "REM"
|
|
# Low-moderate HR + low motion = light sleep
|
|
elif mean_hr < 75 and mean_br < 20:
|
|
if self.sleep_state != "Light Sleep":
|
|
self.events.append((time.time(), "Entered light sleep"))
|
|
self.sleep_state = "Light Sleep"
|
|
else:
|
|
self.sleep_state = "Awake"
|
|
|
|
def _compute_meditation(self):
|
|
"""Meditation quality: BR regularity + HR deceleration + HRV increase."""
|
|
brs = list(self.br_buf)
|
|
hrs = list(self.hr_buf)
|
|
if len(brs) < 15 or len(hrs) < 15:
|
|
self.meditation_score = 0.0
|
|
return
|
|
|
|
# BR regularity (lower CV = more regular breathing)
|
|
br_recent = brs[-15:]
|
|
br_mean = sum(br_recent) / len(br_recent)
|
|
br_std = math.sqrt(sum((b - br_mean) ** 2 for b in br_recent) / len(br_recent))
|
|
br_cv = br_std / br_mean if br_mean > 0 else 1.0
|
|
br_score = max(0, min(1, 1.0 - br_cv * 5)) # CV < 0.05 = perfect
|
|
|
|
# HR deceleration (lower HR = better)
|
|
hr_recent = hrs[-15:]
|
|
mean_hr = sum(hr_recent) / len(hr_recent)
|
|
hr_score = max(0, min(1, (90 - mean_hr) / 30)) # 60bpm=1.0, 90bpm=0.0
|
|
|
|
# HRV increase (higher SDNN = better)
|
|
rr = [60000 / h for h in hr_recent if h > 0]
|
|
if len(rr) >= 5:
|
|
rr_mean = sum(rr) / len(rr)
|
|
sdnn = math.sqrt(sum((r - rr_mean) ** 2 for r in rr) / len(rr))
|
|
hrv_score = max(0, min(1, sdnn / 100)) # 100ms SDNN = perfect
|
|
else:
|
|
hrv_score = 0.0
|
|
|
|
self.meditation_score = (br_score * 0.4 + hr_score * 0.3 + hrv_score * 0.3) * 100
|
|
|
|
def activity_state(self):
|
|
if len(self.hr_buf) < 3:
|
|
return "Unknown"
|
|
recent = list(self.hr_buf)[-5:]
|
|
mean_hr = sum(recent) / len(recent)
|
|
if mean_hr > 120:
|
|
return "Exercising"
|
|
elif mean_hr > 90:
|
|
return "Active"
|
|
elif mean_hr > 60:
|
|
return "Resting"
|
|
else:
|
|
return "Deep Rest"
|
|
|
|
def hrv(self):
|
|
hrs = list(self.hr_buf)
|
|
if len(hrs) < 5:
|
|
return {"sdnn": 0, "rmssd": 0, "pnn50": 0}
|
|
rr = [60000 / h for h in hrs if h > 0]
|
|
if len(rr) < 5:
|
|
return {"sdnn": 0, "rmssd": 0, "pnn50": 0}
|
|
mean = sum(rr) / len(rr)
|
|
sdnn = math.sqrt(sum((r - mean) ** 2 for r in rr) / len(rr))
|
|
diffs = [abs(rr[i + 1] - rr[i]) for i in range(len(rr) - 1)]
|
|
rmssd = math.sqrt(sum(d ** 2 for d in diffs) / len(diffs)) if diffs else 0
|
|
pnn50 = sum(1 for d in diffs if d > 50) / len(diffs) * 100 if diffs else 0
|
|
return {"sdnn": sdnn, "rmssd": rmssd, "pnn50": pnn50}
|
|
|
|
def bp(self):
|
|
hrs = list(self.hr_buf)
|
|
if len(hrs) < 5:
|
|
return 0, 0
|
|
mean_hr = sum(hrs) / len(hrs)
|
|
hrv = self.hrv()
|
|
if hrv["sdnn"] <= 0:
|
|
return 0, 0
|
|
delta = mean_hr - 72
|
|
sbp = round(max(80, min(200, 120 + 0.5 * delta - 0.8 * (hrv["sdnn"] - 50) / 50)))
|
|
dbp = round(max(50, min(130, 80 + 0.3 * delta - 0.5 * (hrv["sdnn"] - 50) / 50)))
|
|
return sbp, dbp
|
|
|
|
def stress(self):
|
|
h = self.hrv()
|
|
s = h["sdnn"]
|
|
if s <= 0: return "---"
|
|
if s < 30: return "HIGH"
|
|
if s < 50: return "Moderate"
|
|
if s < 80: return "Mild"
|
|
if s < 100: return "Relaxed"
|
|
return "Calm"
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Medical Vitals Suite (10 capabilities)")
|
|
parser.add_argument("--port", default="COM4")
|
|
parser.add_argument("--baud", type=int, default=115200)
|
|
parser.add_argument("--duration", type=int, default=120)
|
|
args = parser.parse_args()
|
|
|
|
ser = serial.Serial(args.port, args.baud, timeout=1)
|
|
suite = VitalsSuite()
|
|
start = time.time()
|
|
last_print = 0
|
|
|
|
print()
|
|
print("=" * 80)
|
|
print(" RuView Medical Vitals Suite (10 capabilities from 1 sensor)")
|
|
print(" Point MR60BHA2 at yourself within 1m. Sit still.")
|
|
print("=" * 80)
|
|
print()
|
|
print(f"{'s':>4} {'HR':>4} {'BR':>3} {'BP':>7} {'Stress':>8} {'SDNN':>5} "
|
|
f"{'Sleep':>11} {'Activity':>10} {'Medit':>5} "
|
|
f"{'Apnea':>5} {'Cough':>5} {'Snore':>5}")
|
|
print("-" * 80)
|
|
|
|
try:
|
|
while time.time() - start < args.duration:
|
|
line = ser.readline().decode("utf-8", errors="replace")
|
|
clean = RE_ANSI.sub("", line)
|
|
|
|
hr, br, pres, dist = 0.0, 0.0, suite.presence, suite.distance
|
|
m = RE_HR.search(clean)
|
|
if m: hr = float(m.group(1))
|
|
m = RE_BR.search(clean)
|
|
if m: br = float(m.group(1))
|
|
m = RE_PRES.search(clean)
|
|
if m: pres = m.group(1) == "ON"
|
|
m = RE_DIST.search(clean)
|
|
if m: dist = float(m.group(1))
|
|
|
|
if hr > 0 or br > 0:
|
|
suite.feed(hr=hr, br=br, presence=pres, distance=dist)
|
|
|
|
elapsed = int(time.time() - start)
|
|
if elapsed > last_print and elapsed % 5 == 0:
|
|
last_print = elapsed
|
|
hrv = suite.hrv()
|
|
sbp, dbp = suite.bp()
|
|
bp_s = f"{sbp:>3}/{dbp:<3}" if sbp > 0 else " --- "
|
|
sdnn_s = f"{hrv['sdnn']:>5.0f}" if hrv["sdnn"] > 0 else " ---"
|
|
|
|
hrs = list(suite.hr_buf)
|
|
mean_hr = sum(hrs) / len(hrs) if hrs else 0
|
|
|
|
brs = list(suite.br_buf)
|
|
mean_br = sum(brs) / len(brs) if brs else 0
|
|
|
|
print(f"{elapsed:>3}s {mean_hr:>4.0f} {mean_br:>3.0f} {bp_s} {suite.stress():>8} {sdnn_s} "
|
|
f"{suite.sleep_state:>11} {suite.activity_state():>10} {suite.meditation_score:>5.0f} "
|
|
f"{len(suite.apnea_events):>5} {len(suite.cough_events):>5} {suite.snore_events:>5}")
|
|
|
|
# Print recent events
|
|
for ts, msg in list(suite.events)[-3:]:
|
|
if time.time() - ts < 6:
|
|
print(f" >> {msg}")
|
|
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|
|
ser.close()
|
|
elapsed = time.time() - start
|
|
|
|
print()
|
|
print("=" * 80)
|
|
print(" VITALS SUITE SUMMARY")
|
|
print("=" * 80)
|
|
hrv = suite.hrv()
|
|
sbp, dbp = suite.bp()
|
|
hrs = list(suite.hr_buf)
|
|
brs = list(suite.br_buf)
|
|
|
|
print(f" Duration: {elapsed:.0f}s")
|
|
print(f" Readings: {suite.frames}")
|
|
print()
|
|
|
|
if hrs:
|
|
print(f" 1. Heart Rate: {sum(hrs)/len(hrs):.0f} bpm (range {min(hrs):.0f}-{max(hrs):.0f})")
|
|
if brs:
|
|
print(f" 2. Breathing: {sum(brs)/len(brs):.0f}/min (range {min(brs):.0f}-{max(brs):.0f})")
|
|
if sbp:
|
|
print(f" 3. BP Estimate: {sbp}/{dbp} mmHg")
|
|
if hrv["sdnn"] > 0:
|
|
print(f" 4. HRV/Stress: SDNN={hrv['sdnn']:.0f}ms RMSSD={hrv['rmssd']:.0f}ms pNN50={hrv['pnn50']:.1f}% -> {suite.stress()}")
|
|
print(f" 5. Sleep State: {suite.sleep_state}")
|
|
print(f" 6. Apnea Events: {len(suite.apnea_events)} {'(AHI=' + str(round(len(suite.apnea_events)/(elapsed/3600),1)) + '/hr)' if suite.apnea_events else ''}")
|
|
print(f" 7. Cough Events: {len(suite.cough_events)}")
|
|
print(f" 8. Snore Events: {suite.snore_events}")
|
|
print(f" 9. Activity: {suite.activity_state()}")
|
|
print(f" 10. Meditation: {suite.meditation_score:.0f}/100")
|
|
|
|
if suite.events:
|
|
print(f"\n Events ({len(suite.events)}):")
|
|
for ts, msg in list(suite.events)[-15:]:
|
|
print(f" [{int(ts-start):>4}s] {msg}")
|
|
|
|
print()
|
|
print(" NOT A MEDICAL DEVICE. For research/wellness only.")
|
|
print()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|