130 lines
4.4 KiB
Python
130 lines
4.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Sleep Apnea Screener — Contactless via 60 GHz mmWave
|
|
|
|
Monitors breathing rate from MR60BHA2 and detects apnea events
|
|
(breathing cessation > 10 seconds). Clinical threshold: > 5 events/hour
|
|
= Obstructive Sleep Apnea (mild), > 15 = moderate, > 30 = severe.
|
|
|
|
Usage:
|
|
python examples/sleep/apnea_screener.py --port COM4
|
|
python examples/sleep/apnea_screener.py --port COM4 --duration 3600 # 1 hour
|
|
"""
|
|
|
|
import argparse
|
|
import collections
|
|
import re
|
|
import serial
|
|
import sys
|
|
import time
|
|
|
|
RE_BR = re.compile(r"'Real-time respiratory rate'.*?(\d+\.?\d*)", re.IGNORECASE)
|
|
RE_HR = re.compile(r"'Real-time heart rate'.*?(\d+\.?\d*)", re.IGNORECASE)
|
|
RE_PRES = re.compile(r"'Person Information'.*?state\s+(ON|OFF)", re.IGNORECASE)
|
|
RE_ANSI = re.compile(r"\x1b\[[0-9;]*m")
|
|
|
|
APNEA_THRESHOLD_SEC = 10 # Breathing absent for >10s = apnea event
|
|
HYPOPNEA_BR = 6.0 # BR < 6/min = hypopnea (shallow breathing)
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Sleep Apnea Screener (mmWave)")
|
|
parser.add_argument("--port", default="COM4")
|
|
parser.add_argument("--baud", type=int, default=115200)
|
|
parser.add_argument("--duration", type=int, default=120, help="Duration in seconds")
|
|
args = parser.parse_args()
|
|
|
|
ser = serial.Serial(args.port, args.baud, timeout=1)
|
|
|
|
print()
|
|
print("=" * 60)
|
|
print(" Sleep Apnea Screener (60 GHz mmWave)")
|
|
print(" Lie still within 1m of sensor. Monitoring breathing.")
|
|
print("=" * 60)
|
|
print()
|
|
|
|
br_history = collections.deque(maxlen=600)
|
|
apnea_events = []
|
|
hypopnea_events = []
|
|
last_br_time = time.time()
|
|
last_br_value = 0.0
|
|
last_hr = 0.0
|
|
in_apnea = False
|
|
apnea_start = 0.0
|
|
start = time.time()
|
|
last_print = 0
|
|
|
|
try:
|
|
while time.time() - start < args.duration:
|
|
line = ser.readline().decode("utf-8", errors="replace")
|
|
clean = RE_ANSI.sub("", line)
|
|
|
|
m = RE_BR.search(clean)
|
|
if m:
|
|
br = float(m.group(1))
|
|
br_history.append((time.time(), br))
|
|
|
|
if br > 0:
|
|
last_br_time = time.time()
|
|
last_br_value = br
|
|
|
|
if in_apnea:
|
|
duration = time.time() - apnea_start
|
|
apnea_events.append(duration)
|
|
print(f" ** APNEA EVENT ENDED: {duration:.1f}s **")
|
|
in_apnea = False
|
|
|
|
if br < HYPOPNEA_BR and br > 0:
|
|
hypopnea_events.append(br)
|
|
|
|
elif br == 0 and not in_apnea:
|
|
gap = time.time() - last_br_time
|
|
if gap >= APNEA_THRESHOLD_SEC:
|
|
in_apnea = True
|
|
apnea_start = last_br_time
|
|
print(f" ** APNEA DETECTED at {int(time.time()-start)}s (no breath for {gap:.0f}s) **")
|
|
|
|
m = RE_HR.search(clean)
|
|
if m:
|
|
last_hr = float(m.group(1))
|
|
|
|
elapsed = int(time.time() - start)
|
|
if elapsed > last_print and elapsed % 10 == 0:
|
|
last_print = elapsed
|
|
gap = time.time() - last_br_time
|
|
status = "APNEA" if in_apnea else ("OK" if gap < 5 else f"gap {gap:.0f}s")
|
|
print(f" {elapsed:>4}s | BR {last_br_value:>4.0f}/min | HR {last_hr:>4.0f} | "
|
|
f"Apneas: {len(apnea_events)} | Hypopneas: {len(hypopnea_events)} | {status}")
|
|
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|
|
ser.close()
|
|
duration_hr = (time.time() - start) / 3600.0
|
|
|
|
print()
|
|
print("=" * 60)
|
|
print(" APNEA SCREENING RESULTS")
|
|
print("=" * 60)
|
|
ahi = (len(apnea_events) + len(hypopnea_events)) / max(duration_hr, 0.01)
|
|
print(f" Duration: {time.time()-start:.0f}s ({duration_hr*60:.1f} min)")
|
|
print(f" Apnea events: {len(apnea_events)} (breathing absent > {APNEA_THRESHOLD_SEC}s)")
|
|
print(f" Hypopneas: {len(hypopnea_events)} (BR < {HYPOPNEA_BR}/min)")
|
|
print(f" AHI estimate: {ahi:.1f} events/hour")
|
|
print()
|
|
if ahi < 5:
|
|
print(" Classification: Normal (AHI < 5)")
|
|
elif ahi < 15:
|
|
print(" Classification: Mild OSA (AHI 5-14)")
|
|
elif ahi < 30:
|
|
print(" Classification: Moderate OSA (AHI 15-29)")
|
|
else:
|
|
print(" Classification: Severe OSA (AHI >= 30)")
|
|
print()
|
|
print(" NOT A MEDICAL DEVICE. Consult a sleep specialist for diagnosis.")
|
|
print()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|