vitallink-BS/vitallink/simulator/wristband_manager.py

528 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
VitalLink Wristband Management System
Unified system for managing real and simulated wristbands
"""
import asyncio
import struct
import time
import aiohttp
from typing import Dict, List, Optional, Union
from dataclasses import dataclass
from enum import Enum
from abc import ABC, abstractmethod
# Try to import BLE library (for real hardware).
# Bleak is optional: if the import fails, BLE_AVAILABLE is set to False and the
# names BleakScanner / BleakClient are never bound, so any code that touches
# them must check BLE_AVAILABLE first.
try:
    from bleak import BleakScanner, BleakClient
    BLE_AVAILABLE = True
except ImportError:
    BLE_AVAILABLE = False
    # Tell the operator at import time why real wristbands are disabled.
    print(
        "⚠️ Bleak not installed. Real wristbands disabled. Install with: pip install bleak"
    )
# ============================================================================
# CONFIGURATION
# ============================================================================
# Service UUID that scan results are matched against (case-insensitively).
SERVICE_UUID = "8f5a84f1-22a8-4a4b-9b5f-3fe1d8b2a3a1"
# Characteristic UUID used to start/stop BLE notifications.
CHAR_UUID = "d3e2c4b7-39b2-4b2a-8d5a-7d2a5e3f1199"
# Base URL of the backend; vitals are POSTed to f"{BACKEND_URL}/api/vitals".
BACKEND_URL = "http://localhost:8000"
# Little-endian, unpadded 16-byte packet layout. Fields, in the order they are
# unpacked by PacketDecoder.decode:
#   B ver | H seq | I ts_ms | B flags | B hr_bpm | B spo2 |
#   h skin_c_x100 | H act_rms_x100 | B checksum (byte 14) | B rfu (byte 15)
PKT_STRUCT = struct.Struct("<B H I B B B h H B B")
class WristbandType(Enum):
    """Distinguishes the two wristband implementations in this module."""

    REAL = "real"  # passed to the base class by RealWristband
    SIMULATED = "simulated"  # passed to the base class by SimulatedWristband
class WristbandStatus(Enum):
    """Lifecycle state of a wristband as tracked in the inventory."""

    # Initial state; also restored by release() and stop_monitoring().
    AVAILABLE = "available"  # Charged and ready
    # Set by BaseWristband.assign_to_patient().
    ASSIGNED = "assigned"  # Checked out to patient
    # Set when start_monitoring() begins.
    IN_USE = "in_use"  # Actively sending data
    # Not assigned anywhere in this module.
    CHARGING = "charging"  # On charger
    # Set by RealWristband.start_monitoring() when the BLE connection fails.
    MAINTENANCE = "maintenance"  # Needs attention
# ============================================================================
# PACKET DECODER (Works with real hardware packets)
# ============================================================================
class PacketDecoder:
    """Parser for the fixed-size binary packets produced by wristbands."""

    @staticmethod
    def decode(data: bytes) -> Optional[dict]:
        """Parse one raw 16-byte packet into a dictionary of vitals.

        Args:
            data: Raw packet bytes.

        Returns:
            A dict with keys ``ver``, ``seq``, ``ts_ms``, ``tier``, ``hr_bpm``,
            ``spo2``, ``temp_c``, ``activity``, ``flags`` and ``checksum``, or
            ``None`` when the packet is not exactly 16 bytes long or its
            checksum does not match.
        """
        if len(data) != 16:
            return None

        # Byte 14 must equal the low 8 bits of the sum of bytes 0..13.
        stored = data[14]
        computed = sum(data[:14]) & 0xFF
        if computed != stored:
            print(f"⚠️ Checksum failed: expected {stored}, got {computed}")
            return None

        fields = PKT_STRUCT.unpack(data)
        ver, seq, ts_ms, flags, hr_bpm, spo2 = fields[:6]
        skin_c_x100, act_rms_x100, checksum, _rfu = fields[6:]

        # Bit 4 (EMERGENCY) takes precedence over bit 3 (ALERT).
        if flags & 0x10:
            tier = "EMERGENCY"
        elif flags & 0x08:
            tier = "ALERT"
        else:
            tier = "NORMAL"

        result = {
            "ver": ver,
            "seq": seq,
            "ts_ms": ts_ms,
            "tier": tier,
            "hr_bpm": hr_bpm,
            "spo2": spo2,
        }
        # The two *_x100 fields are transmitted as integers scaled by 100.
        result["temp_c"] = skin_c_x100 / 100.0
        result["activity"] = act_rms_x100 / 100.0
        result["flags"] = flags
        result["checksum"] = f"0x{checksum:02X}"
        return result
# ============================================================================
# ABSTRACT WRISTBAND INTERFACE
# ============================================================================
class BaseWristband(ABC):
    """Abstract base class for all wristbands (real and simulated).

    Holds the state shared by every wristband: its identifier and type, its
    inventory status, the patient it is assigned to (if any), the most
    recently decoded packet, and a running count of decoded packets.
    Subclasses supply the actual data source through start_monitoring() and
    stop_monitoring().
    """

    def __init__(self, band_id: str, wristband_type: WristbandType):
        """Create an unassigned wristband in the AVAILABLE state.

        Args:
            band_id: Identifier used as the inventory key and in log output.
            wristband_type: Whether this is a real or a simulated band.
        """
        self.band_id = band_id
        self.type = wristband_type
        self.status = WristbandStatus.AVAILABLE
        self.patient_id: Optional[str] = None
        self.last_packet: Optional[dict] = None
        self.packet_count = 0

    @abstractmethod
    async def start_monitoring(self):
        """Start receiving data from wristband"""
        pass

    @abstractmethod
    async def stop_monitoring(self):
        """Stop receiving data"""
        pass

    def assign_to_patient(self, patient_id: str):
        """Assign wristband to a patient and mark it ASSIGNED.

        Args:
            patient_id: Identifier of the patient receiving the band.
        """
        self.patient_id = patient_id
        self.status = WristbandStatus.ASSIGNED
        print(f"{self.band_id} assigned to patient {patient_id}")

    def release(self):
        """Release wristband (return to inventory).

        Clears every piece of per-patient state so nothing from the previous
        patient is visible once the band is handed out again.
        """
        self.patient_id = None
        self.status = WristbandStatus.AVAILABLE
        self.packet_count = 0
        # Drop the previous patient's last reading along with the counter;
        # otherwise stale vitals would be attributed to the next patient.
        self.last_packet = None
        print(f"{self.band_id} released and available")
# ============================================================================
# REAL WRISTBAND (Hardware BLE)
# ============================================================================
class RealWristband(BaseWristband):
    """Real physical wristband using BLE (via the Bleak library).

    Notifications received on CHAR_UUID are decoded with PacketDecoder and
    forwarded to the backend in background tasks.
    """

    def __init__(self, band_id: str, ble_address: str):
        """Create a real wristband bound to a BLE address.

        Args:
            band_id: Identifier used as the inventory key and in log output.
            ble_address: Address handed to BleakClient when connecting.
        """
        super().__init__(band_id, WristbandType.REAL)
        self.ble_address = ble_address
        self.client: Optional[BleakClient] = None
        self.decoder = PacketDecoder()
        # Strong references to in-flight upload tasks. The event loop only
        # keeps weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._pending_sends: set = set()

    async def start_monitoring(self):
        """Connect to real wristband via BLE and start receiving data.

        On any connection or subscription failure the band is marked
        MAINTENANCE and a half-open connection is closed again.

        Raises:
            RuntimeError: If the Bleak library is not installed.
        """
        if not BLE_AVAILABLE:
            raise RuntimeError("Bleak library not available")
        self.status = WristbandStatus.IN_USE
        print(
            f"🔵 Connecting to real wristband {self.band_id} at {self.ble_address}..."
        )
        try:
            self.client = BleakClient(self.ble_address)
            await self.client.connect()
            print(f"✓ Connected to {self.band_id}")
            # Subscribe to notifications
            await self.client.start_notify(CHAR_UUID, self._notification_handler)
            print(f"✓ Subscribed to notifications from {self.band_id}")
        except Exception as e:
            print(f"❌ Failed to connect to {self.band_id}: {e}")
            self.status = WristbandStatus.MAINTENANCE
            await self._disconnect_quietly()

    async def _disconnect_quietly(self):
        """Close the BLE link if it is open, reporting but not raising errors."""
        client = self.client
        if client is None or not client.is_connected:
            return
        try:
            await client.disconnect()
        except Exception as e:
            print(f"❌ Failed to disconnect from {self.band_id}: {e}")

    def _notification_handler(self, sender, data: bytearray):
        """Handle incoming BLE notifications.

        Args:
            sender: Notification source supplied by Bleak (unused).
            data: Raw notification payload; packets that fail to decode are
                dropped.
        """
        decoded = self.decoder.decode(bytes(data))
        if not decoded:
            return
        self.last_packet = decoded
        self.packet_count += 1
        # Send to backend without blocking the notification callback.
        task = asyncio.create_task(self._send_to_backend(decoded))
        self._pending_sends.add(task)
        task.add_done_callback(self._pending_sends.discard)

    async def _send_to_backend(self, decoded: dict):
        """Send decoded data to backend.

        Does nothing while the band has no patient. Delivery is best-effort:
        network failures are ignored so an unavailable backend never
        interrupts monitoring.

        Args:
            decoded: Packet dictionary as returned by PacketDecoder.decode.
        """
        if not self.patient_id:
            return
        payload = {
            "band_id": self.band_id,
            "patient_id": self.patient_id,
            "timestamp": time.time(),
            "ver": decoded["ver"],
            "seq": decoded["seq"],
            "ts_ms": decoded["ts_ms"],
            "tier": decoded["tier"],
            "flags": [],  # Convert flags to list if needed
            "hr_bpm": decoded["hr_bpm"],
            "spo2": decoded["spo2"],
            "temp_c": decoded["temp_c"],
            "activity": decoded["activity"],
        }
        try:
            async with aiohttp.ClientSession() as session:
                await session.post(f"{BACKEND_URL}/api/vitals", json=payload)
        except (aiohttp.ClientError, asyncio.TimeoutError, OSError):
            # Silently fail if backend unavailable. Only network-level errors
            # are ignored; cancellation and programming errors still surface.
            pass

    async def stop_monitoring(self):
        """Disconnect from wristband and mark it AVAILABLE.

        The link is closed even if unsubscribing fails (for example because
        the subscription was never established).
        """
        client = self.client
        if client and client.is_connected:
            try:
                await client.stop_notify(CHAR_UUID)
            finally:
                await client.disconnect()
            print(f"✓ Disconnected from {self.band_id}")
        self.status = WristbandStatus.AVAILABLE
# ============================================================================
# SIMULATED WRISTBAND (For testing without hardware)
# ============================================================================
class SimulatedWristband(BaseWristband):
    """Simulated wristband for testing without hardware.

    Packets come from a WristbandSimulator, are run through the same
    PacketDecoder as real hardware packets, and are forwarded to the backend.
    """

    def __init__(self, band_id: str, profile: str = "stable"):
        """Create a simulated wristband.

        Args:
            band_id: Identifier used as the inventory key and in log output.
            profile: Key into PATIENT_PROFILES; unknown names fall back to
                the "stable" profile.
        """
        super().__init__(band_id, WristbandType.SIMULATED)
        self.profile = profile
        self.seq = 0
        self.running = False
        # Import simulator profiles (kept local so the simulator module is
        # only needed when a simulated band is actually created).
        from wristband_simulator import PATIENT_PROFILES, WristbandSimulator

        self.simulator = WristbandSimulator(
            band_id, PATIENT_PROFILES.get(profile, PATIENT_PROFILES["stable"]), None
        )

    async def start_monitoring(self):
        """Start simulated data generation.

        Loops until stop_monitoring() clears ``running`` or the surrounding
        task is cancelled. Each iteration generates a packet, decodes it,
        uploads it, then sleeps 1 second for ALERT/EMERGENCY tiers and
        60 seconds otherwise.
        """
        self.status = WristbandStatus.IN_USE
        self.running = True
        self.simulator.patient_id = self.patient_id
        print(f"🟢 Starting simulated wristband {self.band_id} ({self.profile})")
        # The decoder is stateless, so one instance serves the whole run.
        decoder = PacketDecoder()
        while self.running:
            # Generate packet
            packet = self.simulator.generate_packet()
            # Decode it
            decoded = decoder.decode(packet)
            if decoded:
                self.last_packet = decoded
                self.packet_count += 1
                # Send to backend
                await self._send_to_backend(decoded)
            # Wait based on tier
            tier = self.simulator.tier
            interval = 1.0 if tier in ["ALERT", "EMERGENCY"] else 60.0
            await asyncio.sleep(interval)

    async def _send_to_backend(self, decoded: dict):
        """Send decoded data to backend.

        Does nothing while the band has no patient. Delivery is best-effort:
        network failures are ignored so an unavailable backend never stops
        the simulation.

        Args:
            decoded: Packet dictionary as returned by PacketDecoder.decode.
        """
        if not self.patient_id:
            return
        payload = {
            "band_id": self.band_id,
            "patient_id": self.patient_id,
            "timestamp": time.time(),
            "ver": decoded["ver"],
            "seq": decoded["seq"],
            "ts_ms": decoded["ts_ms"],
            "tier": decoded["tier"],
            "flags": [],
            "hr_bpm": decoded["hr_bpm"],
            "spo2": decoded["spo2"],
            "temp_c": decoded["temp_c"],
            "activity": decoded["activity"],
        }
        try:
            async with aiohttp.ClientSession() as session:
                await session.post(f"{BACKEND_URL}/api/vitals", json=payload)
        except (aiohttp.ClientError, asyncio.TimeoutError, OSError):
            # Backend unavailable: drop this sample. A bare ``except`` here
            # would also swallow task cancellation and keep the loop alive.
            pass

    async def stop_monitoring(self):
        """Stop simulation and mark the band AVAILABLE."""
        self.running = False
        self.status = WristbandStatus.AVAILABLE
        print(f"✓ Stopped simulated wristband {self.band_id}")
# ============================================================================
# WRISTBAND MANAGER
# ============================================================================
class WristbandManager:
    """Central manager for all wristbands (real and simulated).

    Keeps an inventory keyed by band id and one asyncio task per band that is
    currently being monitored.
    """

    def __init__(self):
        """Create a manager with an empty inventory and no active tasks."""
        self.inventory: Dict[str, BaseWristband] = {}
        self.active_monitoring: Dict[str, asyncio.Task] = {}

    def add_simulated_band(self, band_id: str, profile: str = "stable"):
        """Add a simulated wristband to inventory.

        Ids that do not already start with "MOCK-" are renamed to
        "MOCK-<id without 'VitalLink-'>".

        Args:
            band_id: Requested identifier.
            profile: Simulator profile name.

        Returns:
            The band stored under the (possibly renamed) id. If that id is
            already present, the existing band is returned unchanged.
        """
        # Change naming to use MOCK prefix if not already present
        if not band_id.startswith("MOCK-"):
            band_id = f"MOCK-{band_id.replace('VitalLink-', '')}"
        existing = self.inventory.get(band_id)
        if existing is not None:
            # Replacing the entry would orphan a band that may be assigned or
            # actively monitored.
            print(f"⚠️ {band_id} already in inventory; keeping existing band")
            return existing
        band = SimulatedWristband(band_id, profile)
        self.inventory[band_id] = band
        print(f" Added simulated band {band_id} ({profile})")
        return band

    def add_real_band(self, band_id: str, ble_address: str):
        """Add a real wristband to inventory.

        Args:
            band_id: Identifier for the band.
            ble_address: BLE address used to connect to it.

        Returns:
            The band stored under ``band_id`` (the existing one if the id is
            already present), or ``None`` when Bleak is not installed.
        """
        if not BLE_AVAILABLE:
            print("❌ Cannot add real band: Bleak not installed")
            return None
        existing = self.inventory.get(band_id)
        if existing is not None:
            # A repeated scan must not replace a band that is already in use.
            print(f"⚠️ {band_id} already in inventory; keeping existing band")
            return existing
        band = RealWristband(band_id, ble_address)
        self.inventory[band_id] = band
        print(f" Added real band {band_id} (BLE: {ble_address})")
        return band

    async def scan_for_real_bands(self, timeout: float = 10.0):
        """Scan for real wristbands and add them to inventory.

        Args:
            timeout: Scan duration in seconds, passed to BleakScanner.discover.

        Returns:
            List of band ids whose advertisement contained SERVICE_UUID; empty
            when Bleak is not installed.
        """
        if not BLE_AVAILABLE:
            print("❌ BLE scanning not available: Install bleak")
            return []
        print(f"🔍 Scanning for VitalLink wristbands ({timeout}s)...")
        devices = await BleakScanner.discover(timeout=timeout)
        found = []
        for device in devices:
            # Check if device advertises our service UUID
            # NOTE(review): ``device.metadata`` appears to be deprecated in
            # newer Bleak releases in favour of discover(return_adv=True);
            # confirm against the installed Bleak version.
            uuids = device.metadata.get("uuids", [])
            if any(uuid.lower() == SERVICE_UUID.lower() for uuid in uuids):
                # Unnamed devices get an id derived from the address tail.
                band_id = (
                    device.name or f"VitalLink-{device.address[-5:].replace(':', '')}"
                )
                self.add_real_band(band_id, device.address)
                found.append(band_id)
        print(f"✓ Found {len(found)} real wristband(s)")
        return found

    def get_available_bands(self) -> List[BaseWristband]:
        """Get list of available (not assigned) wristbands.

        A band's own stop_monitoring() resets its status to AVAILABLE without
        clearing the patient, so the patient id is checked as well; otherwise
        a band that was merely paused could be handed to a second patient.
        """
        return [
            b
            for b in self.inventory.values()
            if b.status == WristbandStatus.AVAILABLE and b.patient_id is None
        ]

    def assign_band(
        self, patient_id: str, prefer_real: bool = False
    ) -> Optional[BaseWristband]:
        """Assign an available band to a patient.

        Args:
            patient_id: Patient to receive the band.
            prefer_real: Pick a real band when one is available, falling back
                to the first available band of any type.

        Returns:
            The assigned band, or ``None`` when nothing is available.
        """
        available = self.get_available_bands()
        if not available:
            print("❌ No wristbands available")
            return None
        band = available[0]
        # Prefer real bands if requested and available
        if prefer_real:
            real_bands = [b for b in available if b.type == WristbandType.REAL]
            if real_bands:
                band = real_bands[0]
        band.assign_to_patient(patient_id)
        return band

    async def start_monitoring(self, band_id: str):
        """Start monitoring a wristband in a background task.

        Unknown ids and bands that already have a monitoring task are
        reported and otherwise ignored.
        """
        if band_id not in self.inventory:
            print(f"❌ Band {band_id} not in inventory")
            return
        band = self.inventory[band_id]
        if band_id in self.active_monitoring:
            print(f"⚠️ {band_id} already being monitored")
            return
        # Create monitoring task
        task = asyncio.create_task(band.start_monitoring())
        self.active_monitoring[band_id] = task

    async def stop_monitoring(self, band_id: str):
        """Stop monitoring a wristband.

        Cancels the band's monitoring task (if any), waits for it to finish,
        then asks the band itself to stop.
        """
        task = self.active_monitoring.pop(band_id, None)
        if task is not None:
            task.cancel()
            # Wait for the cancellation to take effect and collect the task's
            # outcome so a failed task is not reported as "never retrieved".
            await asyncio.gather(task, return_exceptions=True)
        band = self.inventory.get(band_id)
        if band is not None:
            await band.stop_monitoring()

    async def release_band(self, band_id: str):
        """Release band back to inventory (stops monitoring first)."""
        await self.stop_monitoring(band_id)
        if band_id in self.inventory:
            self.inventory[band_id].release()

    def get_status(self) -> dict:
        """Get overall status.

        Returns:
            Dict with ``total_bands``, ``real_bands``, ``simulated_bands``,
            ``status_breakdown`` (count per WristbandStatus value) and
            ``active_monitoring`` (number of tracked monitoring tasks).
        """
        bands = list(self.inventory.values())
        status_counts = {
            status.value: sum(1 for b in bands if b.status == status)
            for status in WristbandStatus
        }
        return {
            "total_bands": len(bands),
            "real_bands": sum(1 for b in bands if b.type == WristbandType.REAL),
            "simulated_bands": sum(
                1 for b in bands if b.type == WristbandType.SIMULATED
            ),
            "status_breakdown": status_counts,
            "active_monitoring": len(self.active_monitoring),
        }

    def print_inventory(self):
        """Print current inventory as a table followed by summary totals."""
        print("\n" + "=" * 80)
        print("WRISTBAND INVENTORY")
        print("=" * 80)
        for band_id, band in self.inventory.items():
            type_symbol = "🔵" if band.type == WristbandType.REAL else "🟢"
            status_str = band.status.value.upper()
            patient_str = f"(Patient: {band.patient_id})" if band.patient_id else ""
            packets_str = (
                f"[{band.packet_count} packets]" if band.packet_count > 0 else ""
            )
            print(
                f"{type_symbol} {band_id:20} | {status_str:15} {patient_str:20} {packets_str}"
            )
        print("=" * 80)
        status = self.get_status()
        print(
            f"Total: {status['total_bands']} | "
            f"Real: {status['real_bands']} | "
            f"Simulated: {status['simulated_bands']} | "
            f"Active: {status['active_monitoring']}"
        )
        print("=" * 80 + "\n")
# ============================================================================
# EXAMPLE USAGE
# ============================================================================
async def main():
    """Demonstrate the manager end to end with simulated wristbands.

    Registers three simulated bands, assigns two of them to patients,
    monitors those for 30 seconds, then releases them, printing the
    inventory before and after.
    """
    manager = WristbandManager()
    print("VitalLink Wristband Management System")
    print("=" * 80)

    # Option 1: Scan for real wristbands
    # await manager.scan_for_real_bands(timeout=10.0)

    # Option 2: Add simulated wristbands
    simulated = (
        ("VitalLink-SIM1", "stable"),
        ("VitalLink-SIM2", "mild_anxiety"),
        ("VitalLink-SIM3", "deteriorating"),
    )
    for name, profile in simulated:
        manager.add_simulated_band(name, profile)

    # Option 3: Manually add real wristband if you know the address
    # manager.add_real_band("VitalLink-REAL1", "D7:91:3F:9A:12:34")

    # Show inventory
    manager.print_inventory()

    # Assign bands to patients (an entry is None when no band was available)
    assigned = [manager.assign_band(pid) for pid in ("P100001", "P100002")]

    # Start monitoring
    for band in assigned:
        if band:
            await manager.start_monitoring(band.band_id)

    # Monitor for 30 seconds
    print("\nMonitoring for 30 seconds...")
    await asyncio.sleep(30)

    # Stop and release
    for band in assigned:
        if band:
            await manager.release_band(band.band_id)

    manager.print_inventory()
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())