"""
VitalLink Wristband Management System

Unified system for managing real and simulated wristbands.

Real wristbands are reached over BLE (via the optional ``bleak`` package);
simulated wristbands generate packets locally. Both kinds decode the same
16-byte packet and forward the decoded vitals to the backend over HTTP.
"""

import asyncio
import struct
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass  # noqa: F401  (kept: may be used by code lost from this file)
from enum import Enum
from typing import Dict, List, Optional, Set, Union  # noqa: F401

import aiohttp

# Try to import BLE library (for real hardware)
try:
    from bleak import BleakScanner, BleakClient

    BLE_AVAILABLE = True
except ImportError:
    BLE_AVAILABLE = False
    print(
        "⚠️ Bleak not installed. Real wristbands disabled. Install with: pip install bleak"
    )

# ============================================================================
# CONFIGURATION
# ============================================================================

SERVICE_UUID = "8f5a84f1-22a8-4a4b-9b5f-3fe1d8b2a3a1"
CHAR_UUID = "d3e2c4b7-39b2-4b2a-8d5a-7d2a5e3f1199"
BACKEND_URL = "http://localhost:8000"

# Upper bound for one POST to the backend, so a stalled backend cannot block
# a monitoring loop indefinitely.
BACKEND_TIMEOUT_S = 5.0

# NOTE(review): everything from this format string down to the signature of
# PacketDecoder.decode was lost from the file and has been reconstructed from
# how the rest of the module uses it. Confirm against the firmware packet spec.
#
# Layout implied by decode(): 10 fields, 16 bytes, checksum at byte 14:
#   ver:u8  seq:u16  ts_ms:u32  flags:u8  hr_bpm:u8  spo2:u8
#   skin_c_x100:i16  act_rms_x100:u16  checksum:u8  rfu:u8
# The signedness of skin_c_x100 is an assumption -- TODO confirm.
PKT_STRUCT = struct.Struct("<BHIBBBhHBB")

PACKET_SIZE = 16  # bytes per wristband packet
CHECKSUM_INDEX = 14  # checksum byte; covers bytes 0..13

FLAG_ALERT = 1 << 3  # ALERT bit
FLAG_EMERGENCY = 1 << 4  # EMERGENCY bit


# NOTE(review): enum member names are taken from their uses in this module;
# the string values are reconstructed guesses -- confirm before relying on
# them (they appear as keys in WristbandManager.get_status()).
class WristbandType(Enum):
    """Kind of wristband: physical hardware or software simulation."""

    REAL = "real"
    SIMULATED = "simulated"


class WristbandStatus(Enum):
    """Lifecycle state of a wristband in the inventory."""

    AVAILABLE = "available"
    ASSIGNED = "assigned"
    IN_USE = "in_use"
    MAINTENANCE = "maintenance"


# ============================================================================
# PACKET DECODER
# ============================================================================


class PacketDecoder:
    """Decodes packets from wristbands (real or simulated)."""

    # A staticmethod works for both ``PacketDecoder.decode(data)`` and
    # ``PacketDecoder().decode(data)``, whichever form the lost original had.
    @staticmethod
    def decode(data: bytes) -> Optional[dict]:
        """Decode 16-byte packet from wristband.

        Args:
            data: Raw packet bytes.

        Returns:
            A dict of decoded fields, or ``None`` when the packet has the
            wrong length or its checksum does not match.
        """
        if len(data) != PACKET_SIZE:
            return None

        # Verify checksum: low byte of the sum of every byte before it.
        checksum_calc = sum(data[0:CHECKSUM_INDEX]) & 0xFF
        if checksum_calc != data[CHECKSUM_INDEX]:
            print(
                f"⚠️ Checksum failed: expected {data[CHECKSUM_INDEX]}, got {checksum_calc}"
            )
            return None

        # Unpack according to spec
        (
            ver,
            seq,
            ts_ms,
            flags,
            hr_bpm,
            spo2,
            skin_c_x100,
            act_rms_x100,
            checksum,
            _rfu,
        ) = PKT_STRUCT.unpack(data)

        # Determine tier from flags; EMERGENCY wins when both bits are set.
        if flags & FLAG_EMERGENCY:
            tier = "EMERGENCY"
        elif flags & FLAG_ALERT:
            tier = "ALERT"
        else:
            tier = "NORMAL"

        return {
            "ver": ver,
            "seq": seq,
            "ts_ms": ts_ms,
            "tier": tier,
            "hr_bpm": hr_bpm,
            "spo2": spo2,
            "temp_c": skin_c_x100 / 100.0,
            "activity": act_rms_x100 / 100.0,
            "flags": flags,
            "checksum": f"0x{checksum:02X}",
        }


# ============================================================================
# ABSTRACT WRISTBAND INTERFACE
# ============================================================================


class BaseWristband(ABC):
    """Abstract base class for all wristbands (real and simulated)."""

    def __init__(self, band_id: str, wristband_type: WristbandType):
        self.band_id = band_id
        self.type = wristband_type
        self.status = WristbandStatus.AVAILABLE
        self.patient_id: Optional[str] = None
        self.last_packet: Optional[dict] = None
        self.packet_count = 0
        # Consecutive failed backend posts; used to warn once per outage.
        self._backend_failures = 0

    @abstractmethod
    async def start_monitoring(self):
        """Start receiving data from wristband."""

    @abstractmethod
    async def stop_monitoring(self):
        """Stop receiving data."""

    def assign_to_patient(self, patient_id: str):
        """Assign wristband to a patient."""
        self.patient_id = patient_id
        self.status = WristbandStatus.ASSIGNED
        print(f"✓ {self.band_id} assigned to patient {patient_id}")

    def release(self):
        """Release wristband (return to inventory)."""
        self.patient_id = None
        self.status = WristbandStatus.AVAILABLE
        self.packet_count = 0
        # Do not carry the previous patient's vitals over to the next one.
        self.last_packet = None
        print(f"✓ {self.band_id} released and available")

    def _idle_status(self) -> WristbandStatus:
        """Return the status a band should have once monitoring has stopped.

        A band that still has a patient attached stays ASSIGNED so that it
        cannot be handed to a second patient before it is released.
        """
        if self.patient_id:
            return WristbandStatus.ASSIGNED
        return WristbandStatus.AVAILABLE

    async def _send_to_backend(self, decoded: dict):
        """Send decoded data to backend (best effort).

        Does nothing when the band has no patient. Network failures are
        reported once per outage and otherwise ignored, so monitoring keeps
        running while the backend is down. Task cancellation is NOT
        swallowed, so a monitoring task can be stopped mid-request.

        Args:
            decoded: Packet dict as produced by ``PacketDecoder.decode``.
        """
        if not self.patient_id:
            return

        payload = {
            "band_id": self.band_id,
            "patient_id": self.patient_id,
            "timestamp": time.time(),
            "ver": decoded["ver"],
            "seq": decoded["seq"],
            "ts_ms": decoded["ts_ms"],
            "tier": decoded["tier"],
            # NOTE(review): always sent empty; the raw bitmask in
            # decoded["flags"] is not forwarded. Convert flags to list if needed.
            "flags": [],
            "hr_bpm": decoded["hr_bpm"],
            "spo2": decoded["spo2"],
            "temp_c": decoded["temp_c"],
            "activity": decoded["activity"],
        }

        timeout = aiohttp.ClientTimeout(total=BACKEND_TIMEOUT_S)
        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(
                    f"{BACKEND_URL}/api/vitals", json=payload
                ) as response:
                    if response.status >= 400:
                        print(
                            f"⚠️ Backend rejected vitals from {self.band_id}: "
                            f"HTTP {response.status}"
                        )
        except (aiohttp.ClientError, asyncio.TimeoutError, OSError) as exc:
            # Warn on the first failure of an outage only, to avoid one
            # message per packet while the backend is unreachable.
            if self._backend_failures == 0:
                print(f"⚠️ Backend unavailable for {self.band_id}: {exc!r}")
            self._backend_failures += 1
        else:
            self._backend_failures = 0


# ============================================================================
# REAL WRISTBAND (Hardware BLE)
# ============================================================================


class RealWristband(BaseWristband):
    """Real physical wristband using BLE."""

    def __init__(self, band_id: str, ble_address: str):
        super().__init__(band_id, WristbandType.REAL)
        self.ble_address = ble_address
        self.client: Optional["BleakClient"] = None
        self.decoder = PacketDecoder()
        # Strong references to in-flight backend posts; the event loop only
        # keeps weak references to tasks.
        self._pending_sends: Set["asyncio.Task"] = set()

    async def start_monitoring(self):
        """Connect to real wristband via BLE and start receiving data.

        Raises:
            RuntimeError: If the bleak library is not installed.

        Connection or subscription failures are reported, the band is put
        into MAINTENANCE, and any half-open connection is closed.
        """
        if not BLE_AVAILABLE:
            raise RuntimeError("Bleak library not available")

        self.status = WristbandStatus.IN_USE
        print(
            f"🔵 Connecting to real wristband {self.band_id} at {self.ble_address}..."
        )

        try:
            self.client = BleakClient(self.ble_address)
            await self.client.connect()
            print(f"✓ Connected to {self.band_id}")

            # Subscribe to notifications
            await self.client.start_notify(CHAR_UUID, self._notification_handler)
            print(f"✓ Subscribed to notifications from {self.band_id}")
        except Exception as e:  # BLE backends raise a wide range of error types
            print(f"❌ Failed to connect to {self.band_id}: {e}")
            self.status = WristbandStatus.MAINTENANCE
            await self._disconnect_quietly()

    async def _disconnect_quietly(self):
        """Drop the current client, closing the link if it is open."""
        client, self.client = self.client, None
        if client is None:
            return
        try:
            if client.is_connected:
                await client.disconnect()
        except Exception as exc:  # cleanup must not mask the original failure
            print(f"⚠️ Error while disconnecting {self.band_id}: {exc}")

    def _notification_handler(self, sender, data: bytearray):
        """Handle incoming BLE notifications."""
        decoded = self.decoder.decode(bytes(data))
        if not decoded:
            return

        self.last_packet = decoded
        self.packet_count += 1

        # Send to backend without blocking the BLE callback. Keep a reference
        # until the task finishes so it cannot be garbage-collected mid-flight.
        send_task = asyncio.create_task(self._send_to_backend(decoded))
        self._pending_sends.add(send_task)
        send_task.add_done_callback(self._pending_sends.discard)

    async def stop_monitoring(self):
        """Disconnect from wristband."""
        if self.client and self.client.is_connected:
            try:
                await self.client.stop_notify(CHAR_UUID)
            finally:
                # Always close the link, even if unsubscribing failed.
                await self.client.disconnect()
            print(f"✓ Disconnected from {self.band_id}")

        self.status = self._idle_status()


# ============================================================================
# SIMULATED WRISTBAND (For testing without hardware)
# ============================================================================


class SimulatedWristband(BaseWristband):
    """Simulated wristband for testing."""

    def __init__(self, band_id: str, profile: str = "stable"):
        super().__init__(band_id, WristbandType.SIMULATED)
        self.profile = profile
        self.seq = 0
        self.running = False

        # Import simulator profiles (local import: only needed for simulation)
        from simulator.wristband_simulator import PATIENT_PROFILES, WristbandSimulator

        # Unknown profile names fall back to the "stable" profile.
        self.simulator = WristbandSimulator(
            band_id, PATIENT_PROFILES.get(profile, PATIENT_PROFILES["stable"]), None
        )

    async def start_monitoring(self):
        """Start simulated data generation.

        Runs until ``stop_monitoring`` clears ``running`` or the surrounding
        task is cancelled.
        """
        self.status = WristbandStatus.IN_USE
        self.running = True
        self.simulator.patient_id = self.patient_id

        print(f"🟢 Starting simulated wristband {self.band_id} ({self.profile})")

        decoder = PacketDecoder()
        while self.running:
            # Generate a packet and decode it exactly like a hardware packet.
            packet = self.simulator.generate_packet()
            decoded = decoder.decode(packet)

            if decoded:
                self.last_packet = decoded
                self.packet_count += 1

                # Send to backend
                await self._send_to_backend(decoded)

            # Wait based on tier: 1 s for ALERT/EMERGENCY, otherwise 60 s.
            tier = self.simulator.tier
            interval = 1.0 if tier in ("ALERT", "EMERGENCY") else 60.0
            await asyncio.sleep(interval)

    async def stop_monitoring(self):
        """Stop simulation."""
        self.running = False
        self.status = self._idle_status()
        print(f"✓ Stopped simulated wristband {self.band_id}")


# ============================================================================
# WRISTBAND MANAGER
# ============================================================================


class WristbandManager:
    """Central manager for all wristbands (real and simulated)."""

    def __init__(self):
        self.inventory: Dict[str, BaseWristband] = {}
        self.active_monitoring: Dict[str, asyncio.Task] = {}

    def add_simulated_band(self, band_id: str, profile: str = "stable"):
        """Add a simulated wristband to inventory and return it."""
        band = SimulatedWristband(band_id, profile)
        self.inventory[band_id] = band
        print(f"➕ Added simulated band {band_id} ({profile})")
        return band

    def add_real_band(self, band_id: str, ble_address: str):
        """Add a real wristband to inventory.

        Returns:
            The new band, or ``None`` when bleak is not installed.
        """
        if not BLE_AVAILABLE:
            print("❌ Cannot add real band: Bleak not installed")
            return None

        band = RealWristband(band_id, ble_address)
        self.inventory[band_id] = band
        print(f"➕ Added real band {band_id} (BLE: {ble_address})")
        return band

    async def scan_for_real_bands(self, timeout: float = 10.0):
        """Scan for real wristbands and add them to inventory.

        Args:
            timeout: Scan duration in seconds.

        Returns:
            IDs of all bands seen advertising the VitalLink service. Bands
            already in the inventory are reported but left untouched.
        """
        if not BLE_AVAILABLE:
            print("❌ BLE scanning not available: Install bleak")
            return []

        print(f"🔍 Scanning for VitalLink wristbands ({timeout}s)...")

        # return_adv=True yields {address: (device, advertisement_data)};
        # advertised service UUIDs live on the advertisement data
        # (BLEDevice.metadata is deprecated). Requires bleak >= 0.19.
        discovered = await BleakScanner.discover(timeout=timeout, return_adv=True)

        wanted = SERVICE_UUID.lower()
        found = []
        for device, adv in discovered.values():
            # Check if device advertises our service UUID
            advertised = adv.service_uuids or []
            if not any(uuid.lower() == wanted for uuid in advertised):
                continue

            band_id = (
                device.name or f"VitalLink-{device.address[-5:].replace(':', '')}"
            )
            # Re-adding would replace a band that may be assigned or in use.
            if band_id not in self.inventory:
                self.add_real_band(band_id, device.address)
            found.append(band_id)

        print(f"✓ Found {len(found)} real wristband(s)")
        return found

    def get_available_bands(self) -> List[BaseWristband]:
        """Get list of available (not assigned) wristbands."""
        return [
            band
            for band in self.inventory.values()
            if band.status == WristbandStatus.AVAILABLE
        ]

    def assign_band(
        self, patient_id: str, prefer_real: bool = False
    ) -> Optional[BaseWristband]:
        """Assign an available band to a patient.

        Args:
            patient_id: Patient to attach the band to.
            prefer_real: Pick a real band when one is available; otherwise
                (or when none is) the first available band is used.

        Returns:
            The assigned band, or ``None`` when no band is available.
        """
        candidates = self.get_available_bands()
        if not candidates:
            print("❌ No wristbands available")
            return None

        chosen = candidates[0]
        if prefer_real:
            # Prefer real bands if requested and available
            chosen = next(
                (b for b in candidates if b.type == WristbandType.REAL), chosen
            )

        chosen.assign_to_patient(patient_id)
        return chosen

    async def start_monitoring(self, band_id: str):
        """Start monitoring a wristband in a background task."""
        band = self.inventory.get(band_id)
        if band is None:
            print(f"❌ Band {band_id} not in inventory")
            return

        if band_id in self.active_monitoring:
            print(f"⚠️ {band_id} already being monitored")
            return

        # Create monitoring task
        self.active_monitoring[band_id] = asyncio.create_task(band.start_monitoring())

    async def stop_monitoring(self, band_id: str):
        """Stop monitoring a wristband.

        Cancels the band's monitoring task, waits for it to finish, reports
        any error it ended with, then asks the band itself to stop.
        """
        task = self.active_monitoring.pop(band_id, None)
        if task is not None:
            task.cancel()
            # Wait for the cancellation to actually complete.
            # return_exceptions=True hands back the task's outcome instead of
            # raising it here.
            (outcome,) = await asyncio.gather(task, return_exceptions=True)
            if isinstance(outcome, Exception) and not isinstance(
                outcome, asyncio.CancelledError
            ):
                print(f"⚠️ Monitoring task for {band_id} ended with error: {outcome}")

        band = self.inventory.get(band_id)
        if band is not None:
            await band.stop_monitoring()

    async def release_band(self, band_id: str):
        """Release band back to inventory."""
        await self.stop_monitoring(band_id)

        band = self.inventory.get(band_id)
        if band is not None:
            band.release()

    def get_status(self) -> dict:
        """Get overall status: totals per type, per status, and active tasks."""
        bands = list(self.inventory.values())
        status_counts = {
            status.value: sum(1 for b in bands if b.status == status)
            for status in WristbandStatus
        }

        return {
            "total_bands": len(bands),
            "real_bands": sum(1 for b in bands if b.type == WristbandType.REAL),
            "simulated_bands": sum(
                1 for b in bands if b.type == WristbandType.SIMULATED
            ),
            "status_breakdown": status_counts,
            "active_monitoring": len(self.active_monitoring),
        }

    def print_inventory(self):
        """Print current inventory."""
        rule = "=" * 80
        print("\n" + rule)
        print("WRISTBAND INVENTORY")
        print(rule)

        for band_id, band in self.inventory.items():
            type_symbol = "🔵" if band.type == WristbandType.REAL else "🟢"
            status_str = band.status.value.upper()
            patient_str = f"(Patient: {band.patient_id})" if band.patient_id else ""
            packets_str = (
                f"[{band.packet_count} packets]" if band.packet_count > 0 else ""
            )

            print(
                f"{type_symbol} {band_id:20} | {status_str:15} {patient_str:20} {packets_str}"
            )

        print(rule)

        status = self.get_status()
        print(
            f"Total: {status['total_bands']} | "
            f"Real: {status['real_bands']} | "
            f"Simulated: {status['simulated_bands']} | "
            f"Active: {status['active_monitoring']}"
        )
        print(rule + "\n")


# ============================================================================
# EXAMPLE USAGE
# ============================================================================


async def main():
    """Example usage of wristband manager."""
    manager = WristbandManager()

    print("VitalLink Wristband Management System")
    print("=" * 80)

    # Option 1: Scan for real wristbands
    # await manager.scan_for_real_bands(timeout=10.0)

    # Option 2: Add simulated wristbands
    manager.add_simulated_band("VitalLink-SIM1", "stable")
    manager.add_simulated_band("VitalLink-SIM2", "mild_anxiety")
    manager.add_simulated_band("VitalLink-SIM3", "deteriorating")

    # Option 3: Manually add real wristband if you know the address
    # manager.add_real_band("VitalLink-REAL1", "D7:91:3F:9A:12:34")

    # Show inventory
    manager.print_inventory()

    # Assign bands to patients
    band1 = manager.assign_band("P100001")
    band2 = manager.assign_band("P100002")

    # Start monitoring
    if band1:
        await manager.start_monitoring(band1.band_id)
    if band2:
        await manager.start_monitoring(band2.band_id)

    # Monitor for 30 seconds
    print("\nMonitoring for 30 seconds...")
    await asyncio.sleep(30)

    # Stop and release
    if band1:
        await manager.release_band(band1.band_id)
    if band2:
        await manager.release_band(band2.band_id)

    manager.print_inventory()


if __name__ == "__main__":
    asyncio.run(main())