""" VitalLink Database Layer SQLite persistence for patients, vitals, and audit trail Enables replay, analysis, and incident investigation """ import sqlite3 import json from datetime import datetime from typing import List, Dict, Optional from contextlib import asynccontextmanager import aiosqlite # ============================================================================ # DATABASE SCHEMA # ============================================================================ SCHEMA_SQL = """ -- Patients table CREATE TABLE IF NOT EXISTS patients ( patient_id TEXT PRIMARY KEY, band_id TEXT NOT NULL, first_name TEXT NOT NULL, last_name TEXT NOT NULL, dob TEXT NOT NULL, symptoms TEXT, -- JSON array severity TEXT, check_in_time TIMESTAMP NOT NULL, discharge_time TIMESTAMP, current_tier TEXT DEFAULT 'NORMAL', is_active BOOLEAN DEFAULT 1, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- Vitals readings table (time-series data) CREATE TABLE IF NOT EXISTS vitals_readings ( id INTEGER PRIMARY KEY AUTOINCREMENT, patient_id TEXT NOT NULL, band_id TEXT NOT NULL, timestamp REAL NOT NULL, seq INTEGER, hr_bpm INTEGER, spo2 INTEGER, temp_c REAL, activity REAL, tier TEXT, flags TEXT, -- JSON array created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (patient_id) REFERENCES patients(patient_id) ); -- Triage assessments (audit trail) CREATE TABLE IF NOT EXISTS triage_assessments ( id INTEGER PRIMARY KEY AUTOINCREMENT, patient_id TEXT NOT NULL, assessment_time TIMESTAMP NOT NULL, triage_level INTEGER, tier_name TEXT, priority_score REAL, reasoning TEXT, abnormalities TEXT, -- JSON array wait_time_minutes INTEGER, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (patient_id) REFERENCES patients(patient_id) ); -- Tier changes (for incident investigation) CREATE TABLE IF NOT EXISTS tier_changes ( id INTEGER PRIMARY KEY AUTOINCREMENT, patient_id TEXT NOT NULL, change_time TIMESTAMP NOT NULL, old_tier TEXT, new_tier TEXT, trigger_reason TEXT, vitals_snapshot TEXT, -- JSON created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (patient_id) REFERENCES patients(patient_id) ); -- System events (audit log) CREATE TABLE IF NOT EXISTS system_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, event_time TIMESTAMP NOT NULL, event_type TEXT NOT NULL, -- 'patient_checkin', 'discharge', 'tier_change', 'alert', etc. patient_id TEXT, band_id TEXT, details TEXT, -- JSON created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- Wristband assignments (inventory tracking) CREATE TABLE IF NOT EXISTS wristband_assignments ( id INTEGER PRIMARY KEY AUTOINCREMENT, band_id TEXT NOT NULL, patient_id TEXT, assigned_at TIMESTAMP, released_at TIMESTAMP, packet_count INTEGER DEFAULT 0, band_type TEXT, -- 'real' or 'simulated' created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- Indexes for performance CREATE INDEX IF NOT EXISTS idx_vitals_patient ON vitals_readings(patient_id, timestamp); CREATE INDEX IF NOT EXISTS idx_vitals_timestamp ON vitals_readings(timestamp); CREATE INDEX IF NOT EXISTS idx_patients_active ON patients(is_active); CREATE INDEX IF NOT EXISTS idx_tier_changes_patient ON tier_changes(patient_id, change_time); """ # ============================================================================ # DATABASE MANAGER # ============================================================================ class VitalLinkDatabase: """Database manager for VitalLink system""" def __init__(self, db_path: str = "vitallink.db"): self.db_path = db_path self.conn = None async def initialize(self): """Initialize database and create tables""" self.conn = await aiosqlite.connect(self.db_path) await self.conn.executescript(SCHEMA_SQL) await self.conn.commit() print(f"✓ Database initialized: {self.db_path}") async def close(self): """Close database connection""" if self.conn: await self.conn.close() # ======================================================================== # PATIENT OPERATIONS # ======================================================================== async def save_patient(self, patient_data: Dict): """Save new patient to database""" await self.conn.execute( """ INSERT INTO patients ( patient_id, band_id, first_name, last_name, dob, symptoms, severity, check_in_time, current_tier ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( patient_data["patient_id"], patient_data["band_id"], patient_data["first_name"], patient_data["last_name"], patient_data["dob"], json.dumps(patient_data["symptoms"]), patient_data["severity"], patient_data["check_in_time"], patient_data.get("current_tier", "NORMAL"), ), ) await self.conn.commit() # Log event await self.log_event( "patient_checkin", patient_data["patient_id"], patient_data["band_id"], patient_data, ) async def update_patient_tier(self, patient_id: str, new_tier: str): """Update patient's current tier""" await self.conn.execute( """ UPDATE patients SET current_tier = ? WHERE patient_id = ? """, (new_tier, patient_id), ) await self.conn.commit() async def discharge_patient(self, patient_id: str): """Mark patient as discharged""" await self.conn.execute( """ UPDATE patients SET is_active = 0, discharge_time = ? WHERE patient_id = ? """, (datetime.now(), patient_id), ) await self.conn.commit() await self.log_event( "discharge", patient_id, None, {"discharge_time": datetime.now().isoformat()}, ) # ======================================================================== # VITALS OPERATIONS # ======================================================================== async def save_vitals(self, vitals_data: Dict): """Save vital signs reading""" await self.conn.execute( """ INSERT INTO vitals_readings ( patient_id, band_id, timestamp, seq, hr_bpm, spo2, temp_c, activity, tier, flags ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( vitals_data["patient_id"], vitals_data["band_id"], vitals_data["timestamp"], vitals_data.get("seq", 0), vitals_data["hr_bpm"], vitals_data["spo2"], vitals_data["temp_c"], vitals_data["activity"], vitals_data["tier"], json.dumps(vitals_data.get("flags", [])), ), ) await self.conn.commit() async def get_patient_vitals_history( self, patient_id: str, limit: int = 100 ) -> List[Dict]: """Get vital signs history for a patient""" cursor = await self.conn.execute( """ SELECT timestamp, hr_bpm, spo2, temp_c, activity, tier, seq FROM vitals_readings WHERE patient_id = ? ORDER BY timestamp DESC LIMIT ? """, (patient_id, limit), ) rows = await cursor.fetchall() return [ { "timestamp": row[0], "hr_bpm": row[1], "spo2": row[2], "temp_c": row[3], "activity": row[4], "tier": row[5], "seq": row[6], } for row in rows ] # ======================================================================== # TIER CHANGE TRACKING # ======================================================================== async def log_tier_change( self, patient_id: str, old_tier: str, new_tier: str, reason: str, vitals: Dict ): """Log tier change for audit trail""" await self.conn.execute( """ INSERT INTO tier_changes ( patient_id, change_time, old_tier, new_tier, trigger_reason, vitals_snapshot ) VALUES (?, ?, ?, ?, ?, ?) """, ( patient_id, datetime.now(), old_tier, new_tier, reason, json.dumps(vitals), ), ) await self.conn.commit() await self.log_event( "tier_change", patient_id, None, {"old_tier": old_tier, "new_tier": new_tier, "reason": reason}, ) async def get_tier_history(self, patient_id: str) -> List[Dict]: """Get tier change history for incident review""" cursor = await self.conn.execute( """ SELECT change_time, old_tier, new_tier, trigger_reason, vitals_snapshot FROM tier_changes WHERE patient_id = ? ORDER BY change_time ASC """, (patient_id,), ) rows = await cursor.fetchall() return [ { "change_time": row[0], "old_tier": row[1], "new_tier": row[2], "reason": row[3], "vitals": json.loads(row[4]) if row[4] else {}, } for row in rows ] # ======================================================================== # SYSTEM EVENTS (Audit Trail) # ======================================================================== async def log_event( self, event_type: str, patient_id: Optional[str], band_id: Optional[str], details: Dict, ): """Log system event for audit trail""" await self.conn.execute( """ INSERT INTO system_events ( event_time, event_type, patient_id, band_id, details ) VALUES (?, ?, ?, ?, ?) """, (datetime.now(), event_type, patient_id, band_id, json.dumps(details)), ) await self.conn.commit() async def get_events( self, event_type: Optional[str] = None, patient_id: Optional[str] = None, hours: int = 24, ) -> List[Dict]: """Get system events for analysis""" query = "SELECT event_time, event_type, patient_id, band_id, details FROM system_events WHERE 1=1" params = [] if event_type: query += " AND event_type = ?" params.append(event_type) if patient_id: query += " AND patient_id = ?" params.append(patient_id) query += " AND event_time > datetime('now', '-' || ? || ' hours')" params.append(hours) query += " ORDER BY event_time DESC LIMIT 1000" cursor = await self.conn.execute(query, params) rows = await cursor.fetchall() return [ { "event_time": row[0], "event_type": row[1], "patient_id": row[2], "band_id": row[3], "details": json.loads(row[4]) if row[4] else {}, } for row in rows ] # ======================================================================== # ANALYTICS & REPLAY # ======================================================================== async def get_session_summary( self, start_time: datetime, end_time: datetime ) -> Dict: """Get summary statistics for a session (for incident review)""" cursor = await self.conn.execute( """ SELECT COUNT(DISTINCT patient_id) as total_patients, COUNT(*) as total_vitals, AVG(hr_bpm) as avg_hr, AVG(spo2) as avg_spo2, AVG(temp_c) as avg_temp FROM vitals_readings WHERE timestamp BETWEEN ? AND ? """, (start_time.timestamp(), end_time.timestamp()), ) row = await cursor.fetchone() return { "total_patients": row[0], "total_vitals_recorded": row[1], "average_hr": round(row[2], 1) if row[2] else 0, "average_spo2": round(row[3], 1) if row[3] else 0, "average_temp": round(row[4], 2) if row[4] else 0, } async def export_patient_data(self, patient_id: str) -> Dict: """Export complete patient data for incident investigation""" # Get patient info cursor = await self.conn.execute( """ SELECT * FROM patients WHERE patient_id = ? """, (patient_id,), ) patient_row = await cursor.fetchone() if not patient_row: return None # Get all vitals vitals = await self.get_patient_vitals_history(patient_id, limit=10000) # Get tier changes tier_changes = await self.get_tier_history(patient_id) # Get related events events = await self.get_events(patient_id=patient_id, hours=24) return { "patient_id": patient_id, "name": f"{patient_row[2]} {patient_row[3]}", "dob": patient_row[4], "symptoms": json.loads(patient_row[5]) if patient_row[5] else [], "severity": patient_row[6], "check_in_time": patient_row[7], "discharge_time": patient_row[8], "total_vitals": len(vitals), "vitals_timeline": vitals, "tier_changes": tier_changes, "events": events, "export_time": datetime.now().isoformat(), } # ============================================================================ # REPLAY SYSTEM # ============================================================================ class VitalsReplaySystem: """Replay historical vitals data for analysis""" def __init__(self, db: VitalLinkDatabase): self.db = db async def replay_patient_session(self, patient_id: str, speed: float = 1.0): """ Replay a patient's entire session speed: 1.0 = real-time, 10.0 = 10x faster, 0.1 = slow motion """ vitals = await self.db.get_patient_vitals_history(patient_id, limit=10000) vitals.reverse() # Chronological order if not vitals: print(f"No data found for patient {patient_id}") return print(f"\n{'=' * 80}") print(f"REPLAYING SESSION: {patient_id} ({len(vitals)} readings)") print(f"Speed: {speed}x | Press Ctrl+C to stop") print(f"{'=' * 80}\n") start_time = vitals[0]["timestamp"] for i, reading in enumerate(vitals): # Calculate delay if i > 0: time_diff = reading["timestamp"] - vitals[i - 1]["timestamp"] await asyncio.sleep(time_diff / speed) # Display reading elapsed = reading["timestamp"] - start_time tier_symbol = ( "🔴" if reading["tier"] == "EMERGENCY" else "🟡" if reading["tier"] == "ALERT" else "🟢" ) print( f"[{elapsed:7.1f}s] {tier_symbol} Seq={reading['seq']:3d} | " f"HR={reading['hr_bpm']:3d} SpO2={reading['spo2']:2d}% " f"Temp={reading['temp_c']:.1f}°C | {reading['tier']}" ) print(f"\n{'=' * 80}") print(f"Replay complete: {len(vitals)} readings") print(f"{'=' * 80}\n") async def analyze_critical_events(self, patient_id: str): """Analyze critical tier changes and deterioration events""" tier_changes = await self.db.get_tier_history(patient_id) print(f"\n{'=' * 80}") print(f"CRITICAL EVENT ANALYSIS: {patient_id}") print(f"{'=' * 80}\n") for change in tier_changes: print(f"[{change['change_time']}]") print(f" {change['old_tier']} → {change['new_tier']}") print(f" Reason: {change['reason']}") print(f" Vitals: {change['vitals']}") print() print(f"{'=' * 80}\n") # ============================================================================ # INTEGRATION HELPERS # ============================================================================ async def init_database(db_path: str = "vitallink.db") -> VitalLinkDatabase: """Initialize database for use in FastAPI""" db = VitalLinkDatabase(db_path) await db.initialize() return db # ============================================================================ # CLI TOOLS # ============================================================================ async def cli_export_patient(patient_id: str, output_file: str = None): """Export patient data to JSON file""" db = VitalLinkDatabase() await db.initialize() data = await db.export_patient_data(patient_id) if not data: print(f"Patient {patient_id} not found") return if output_file: with open(output_file, "w") as f: json.dump(data, f, indent=2) print(f"✓ Exported to {output_file}") else: print(json.dumps(data, indent=2)) await db.close() async def cli_replay_session(patient_id: str, speed: float = 1.0): """Replay a patient session""" db = VitalLinkDatabase() await db.initialize() replay = VitalsReplaySystem(db) await replay.replay_patient_session(patient_id, speed) await db.close() async def cli_analyze_incident(patient_id: str): """Analyze critical events for a patient""" db = VitalLinkDatabase() await db.initialize() replay = VitalsReplaySystem(db) await replay.analyze_critical_events(patient_id) # Also show vital trends vitals = await db.get_patient_vitals_history(patient_id, limit=1000) if vitals: print("VITAL SIGN TRENDS:") print( f" HR range: {min(v['hr_bpm'] for v in vitals)} - {max(v['hr_bpm'] for v in vitals)} bpm" ) print( f" SpO2 range: {min(v['spo2'] for v in vitals)} - {max(v['spo2'] for v in vitals)}%" ) print( f" Temp range: {min(v['temp_c'] for v in vitals):.1f} - {max(v['temp_c'] for v in vitals):.1f}°C" ) print() await db.close() if __name__ == "__main__": import argparse import asyncio parser = argparse.ArgumentParser(description="VitalLink Database Tools") parser.add_argument("--export", metavar="PATIENT_ID", help="Export patient data") parser.add_argument("--replay", metavar="PATIENT_ID", help="Replay patient session") parser.add_argument( "--analyze", metavar="PATIENT_ID", help="Analyze critical events" ) parser.add_argument( "--speed", type=float, default=1.0, help="Replay speed multiplier" ) parser.add_argument("--output", "-o", help="Output file for export") args = parser.parse_args() if args.export: asyncio.run(cli_export_patient(args.export, args.output)) elif args.replay: asyncio.run(cli_replay_session(args.replay, args.speed)) elif args.analyze: asyncio.run(cli_analyze_incident(args.analyze)) else: parser.print_help()