""" VitalLink Database Layer SQLite persistence for patients, vitals, and audit trail """ import sqlite3 import json from datetime import datetime from typing import List, Dict, Optional import aiosqlite # ============================================================================ # DATABASE SCHEMA # ============================================================================ SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS patients ( patient_id TEXT PRIMARY KEY, band_id TEXT NOT NULL, first_name TEXT NOT NULL, last_name TEXT NOT NULL, dob TEXT NOT NULL, symptoms TEXT, severity TEXT, check_in_time TEXT NOT NULL, discharge_time TEXT, current_tier TEXT DEFAULT 'NORMAL', is_active INTEGER DEFAULT 1, created_at TEXT DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS vitals_readings ( id INTEGER PRIMARY KEY AUTOINCREMENT, patient_id TEXT NOT NULL, band_id TEXT NOT NULL, timestamp REAL NOT NULL, seq INTEGER, hr_bpm INTEGER, spo2 INTEGER, temp_c REAL, activity REAL, tier TEXT, flags TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (patient_id) REFERENCES patients(patient_id) ); CREATE TABLE IF NOT EXISTS tier_changes ( id INTEGER PRIMARY KEY AUTOINCREMENT, patient_id TEXT NOT NULL, change_time TEXT NOT NULL, old_tier TEXT, new_tier TEXT, trigger_reason TEXT, vitals_snapshot TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (patient_id) REFERENCES patients(patient_id) ); CREATE TABLE IF NOT EXISTS system_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, event_time TEXT NOT NULL, event_type TEXT NOT NULL, patient_id TEXT, band_id TEXT, details TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX IF NOT EXISTS idx_vitals_patient ON vitals_readings(patient_id, timestamp); CREATE INDEX IF NOT EXISTS idx_vitals_timestamp ON vitals_readings(timestamp); CREATE INDEX IF NOT EXISTS idx_patients_active ON patients(is_active); CREATE INDEX IF NOT EXISTS idx_tier_changes_patient ON tier_changes(patient_id, change_time); """ # ============================================================================ # DATABASE MANAGER # ============================================================================ class VitalLinkDatabase: """Database manager for VitalLink system""" def __init__(self, db_path: str = "vitallink.db"): self.db_path = db_path self.conn = None async def initialize(self): """Initialize database and create tables""" self.conn = await aiosqlite.connect(self.db_path) await self.conn.executescript(SCHEMA_SQL) await self.conn.commit() print(f"✓ Database initialized: {self.db_path}") async def close(self): """Close database connection""" if self.conn: await self.conn.close() async def save_patient(self, patient_data: Dict): """Save new patient to database""" check_in_time = patient_data["check_in_time"] if isinstance(check_in_time, datetime): check_in_time = check_in_time.isoformat() # Check if patient already exists cursor = await self.conn.execute( "SELECT patient_id FROM patients WHERE patient_id = ?", (patient_data["patient_id"],), ) existing = await cursor.fetchone() if existing: # Update instead of insert await self.conn.execute( """ UPDATE patients SET band_id = ?, current_tier = ? WHERE patient_id = ? """, ( patient_data["band_id"], patient_data.get("current_tier", "NORMAL"), patient_data["patient_id"], ), ) else: # Insert new patient await self.conn.execute( """ INSERT INTO patients ( patient_id, band_id, first_name, last_name, dob, symptoms, severity, check_in_time, current_tier ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( patient_data["patient_id"], patient_data["band_id"], patient_data["first_name"], patient_data["last_name"], patient_data["dob"], json.dumps(patient_data["symptoms"]), patient_data["severity"], check_in_time, patient_data.get("current_tier", "NORMAL"), ), ) await self.conn.commit() if not existing: await self.log_event( "patient_checkin", patient_data["patient_id"], patient_data["band_id"], { "first_name": patient_data["first_name"], "last_name": patient_data["last_name"], "symptoms": patient_data["symptoms"], "severity": patient_data["severity"], }, ) async def save_vitals(self, vitals_data: Dict): """Save vital signs reading""" await self.conn.execute( """ INSERT INTO vitals_readings ( patient_id, band_id, timestamp, seq, hr_bpm, spo2, temp_c, activity, tier, flags ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( vitals_data["patient_id"], vitals_data["band_id"], vitals_data["timestamp"], vitals_data.get("seq", 0), vitals_data["hr_bpm"], vitals_data["spo2"], vitals_data["temp_c"], vitals_data["activity"], vitals_data["tier"], json.dumps(vitals_data.get("flags", [])), ), ) await self.conn.commit() async def get_patient_vitals_history( self, patient_id: str, limit: int = 100 ) -> List[Dict]: """Get vital signs history for a patient""" cursor = await self.conn.execute( """ SELECT timestamp, hr_bpm, spo2, temp_c, activity, tier, seq FROM vitals_readings WHERE patient_id = ? ORDER BY timestamp DESC LIMIT ? """, (patient_id, limit), ) rows = await cursor.fetchall() return [ { "timestamp": row[0], "hr_bpm": row[1], "spo2": row[2], "temp_c": row[3], "activity": row[4], "tier": row[5], "seq": row[6], } for row in rows ] async def log_tier_change( self, patient_id: str, old_tier: str, new_tier: str, reason: str, vitals: Dict ): """Log tier change for audit trail""" await self.conn.execute( """ INSERT INTO tier_changes ( patient_id, change_time, old_tier, new_tier, trigger_reason, vitals_snapshot ) VALUES (?, ?, ?, ?, ?, ?) """, ( patient_id, datetime.now().isoformat(), old_tier, new_tier, reason, json.dumps(vitals), ), ) await self.conn.commit() await self.log_event( "tier_change", patient_id, None, {"old_tier": old_tier, "new_tier": new_tier, "reason": reason}, ) async def get_tier_history(self, patient_id: str) -> List[Dict]: """Get tier change history""" cursor = await self.conn.execute( """ SELECT change_time, old_tier, new_tier, trigger_reason, vitals_snapshot FROM tier_changes WHERE patient_id = ? ORDER BY change_time ASC """, (patient_id,), ) rows = await cursor.fetchall() return [ { "change_time": row[0], "old_tier": row[1], "new_tier": row[2], "reason": row[3], "vitals": json.loads(row[4]) if row[4] else {}, } for row in rows ] async def log_event( self, event_type: str, patient_id: Optional[str], band_id: Optional[str], details: Dict, ): """Log system event for audit trail""" serializable_details = {} for key, value in details.items(): if isinstance(value, datetime): serializable_details[key] = value.isoformat() else: serializable_details[key] = value await self.conn.execute( """ INSERT INTO system_events ( event_time, event_type, patient_id, band_id, details ) VALUES (?, ?, ?, ?, ?) """, ( datetime.now().isoformat(), event_type, patient_id, band_id, json.dumps(serializable_details), ), ) await self.conn.commit() async def discharge_patient(self, patient_id: str): """Mark patient as discharged""" await self.conn.execute( """ UPDATE patients SET is_active = 0, discharge_time = ? WHERE patient_id = ? """, (datetime.now().isoformat(), patient_id), ) await self.conn.commit() await self.log_event( "discharge", patient_id, None, {"discharge_time": datetime.now().isoformat()}, )