651 lines
20 KiB
Python

"""
VitalLink Database Layer
SQLite persistence for patients, vitals, and audit trail
Enables replay, analysis, and incident investigation
"""
import sqlite3
import json
from datetime import datetime
from typing import List, Dict, Optional
from contextlib import asynccontextmanager
import aiosqlite
# ============================================================================
# DATABASE SCHEMA
# ============================================================================
SCHEMA_SQL = """
-- Patients table
CREATE TABLE IF NOT EXISTS patients (
patient_id TEXT PRIMARY KEY,
band_id TEXT NOT NULL,
first_name TEXT NOT NULL,
last_name TEXT NOT NULL,
dob TEXT NOT NULL,
symptoms TEXT, -- JSON array
severity TEXT,
check_in_time TIMESTAMP NOT NULL,
discharge_time TIMESTAMP,
current_tier TEXT DEFAULT 'NORMAL',
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Vitals readings table (time-series data)
CREATE TABLE IF NOT EXISTS vitals_readings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
patient_id TEXT NOT NULL,
band_id TEXT NOT NULL,
timestamp REAL NOT NULL,
seq INTEGER,
hr_bpm INTEGER,
spo2 INTEGER,
temp_c REAL,
activity REAL,
tier TEXT,
flags TEXT, -- JSON array
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (patient_id) REFERENCES patients(patient_id)
);
-- Triage assessments (audit trail)
CREATE TABLE IF NOT EXISTS triage_assessments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
patient_id TEXT NOT NULL,
assessment_time TIMESTAMP NOT NULL,
triage_level INTEGER,
tier_name TEXT,
priority_score REAL,
reasoning TEXT,
abnormalities TEXT, -- JSON array
wait_time_minutes INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (patient_id) REFERENCES patients(patient_id)
);
-- Tier changes (for incident investigation)
CREATE TABLE IF NOT EXISTS tier_changes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
patient_id TEXT NOT NULL,
change_time TIMESTAMP NOT NULL,
old_tier TEXT,
new_tier TEXT,
trigger_reason TEXT,
vitals_snapshot TEXT, -- JSON
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (patient_id) REFERENCES patients(patient_id)
);
-- System events (audit log)
CREATE TABLE IF NOT EXISTS system_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_time TIMESTAMP NOT NULL,
event_type TEXT NOT NULL, -- 'patient_checkin', 'discharge', 'tier_change', 'alert', etc.
patient_id TEXT,
band_id TEXT,
details TEXT, -- JSON
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Wristband assignments (inventory tracking)
CREATE TABLE IF NOT EXISTS wristband_assignments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
band_id TEXT NOT NULL,
patient_id TEXT,
assigned_at TIMESTAMP,
released_at TIMESTAMP,
packet_count INTEGER DEFAULT 0,
band_type TEXT, -- 'real' or 'simulated'
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Indexes for performance
CREATE INDEX IF NOT EXISTS idx_vitals_patient ON vitals_readings(patient_id, timestamp);
CREATE INDEX IF NOT EXISTS idx_vitals_timestamp ON vitals_readings(timestamp);
CREATE INDEX IF NOT EXISTS idx_patients_active ON patients(is_active);
CREATE INDEX IF NOT EXISTS idx_tier_changes_patient ON tier_changes(patient_id, change_time);
"""
# ============================================================================
# DATABASE MANAGER
# ============================================================================
class VitalLinkDatabase:
"""Database manager for VitalLink system"""
def __init__(self, db_path: str = "vitallink.db"):
self.db_path = db_path
self.conn = None
async def initialize(self):
"""Initialize database and create tables"""
self.conn = await aiosqlite.connect(self.db_path)
await self.conn.executescript(SCHEMA_SQL)
await self.conn.commit()
print(f"✓ Database initialized: {self.db_path}")
async def close(self):
"""Close database connection"""
if self.conn:
await self.conn.close()
# ========================================================================
# PATIENT OPERATIONS
# ========================================================================
async def save_patient(self, patient_data: Dict):
"""Save new patient to database"""
# Convert datetime to ISO string if needed
check_in_time = patient_data["check_in_time"]
if isinstance(check_in_time, datetime):
check_in_time = check_in_time.isoformat()
await self.conn.execute(
"""
INSERT INTO patients (
patient_id, band_id, first_name, last_name, dob,
symptoms, severity, check_in_time, current_tier
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
patient_data["patient_id"],
patient_data["band_id"],
patient_data["first_name"],
patient_data["last_name"],
patient_data["dob"],
json.dumps(patient_data["symptoms"]),
patient_data["severity"],
check_in_time, # Now a string
patient_data.get("current_tier", "NORMAL"),
),
)
await self.conn.commit()
# Log event with serializable data
await self.log_event(
"patient_checkin",
patient_data["patient_id"],
patient_data["band_id"],
{
"first_name": patient_data["first_name"],
"last_name": patient_data["last_name"],
"symptoms": patient_data["symptoms"],
"severity": patient_data["severity"],
},
)
async def update_patient_tier(self, patient_id: str, new_tier: str):
"""Update patient's current tier"""
await self.conn.execute(
"""
UPDATE patients SET current_tier = ? WHERE patient_id = ?
""",
(new_tier, patient_id),
)
await self.conn.commit()
async def discharge_patient(self, patient_id: str):
"""Mark patient as discharged"""
await self.conn.execute(
"""
UPDATE patients
SET is_active = 0, discharge_time = ?
WHERE patient_id = ?
""",
(datetime.now(), patient_id),
)
await self.conn.commit()
await self.log_event(
"discharge",
patient_id,
None,
{"discharge_time": datetime.now().isoformat()},
)
# ========================================================================
# VITALS OPERATIONS
# ========================================================================
async def save_vitals(self, vitals_data: Dict):
"""Save vital signs reading"""
await self.conn.execute(
"""
INSERT INTO vitals_readings (
patient_id, band_id, timestamp, seq, hr_bpm, spo2,
temp_c, activity, tier, flags
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
vitals_data["patient_id"],
vitals_data["band_id"],
vitals_data["timestamp"],
vitals_data.get("seq", 0),
vitals_data["hr_bpm"],
vitals_data["spo2"],
vitals_data["temp_c"],
vitals_data["activity"],
vitals_data["tier"],
json.dumps(vitals_data.get("flags", [])),
),
)
await self.conn.commit()
async def get_patient_vitals_history(
self, patient_id: str, limit: int = 100
) -> List[Dict]:
"""Get vital signs history for a patient"""
cursor = await self.conn.execute(
"""
SELECT timestamp, hr_bpm, spo2, temp_c, activity, tier, seq
FROM vitals_readings
WHERE patient_id = ?
ORDER BY timestamp DESC
LIMIT ?
""",
(patient_id, limit),
)
rows = await cursor.fetchall()
return [
{
"timestamp": row[0],
"hr_bpm": row[1],
"spo2": row[2],
"temp_c": row[3],
"activity": row[4],
"tier": row[5],
"seq": row[6],
}
for row in rows
]
# ========================================================================
# TIER CHANGE TRACKING
# ========================================================================
async def log_tier_change(
self, patient_id: str, old_tier: str, new_tier: str, reason: str, vitals: Dict
):
"""Log tier change for audit trail"""
await self.conn.execute(
"""
INSERT INTO tier_changes (
patient_id, change_time, old_tier, new_tier,
trigger_reason, vitals_snapshot
) VALUES (?, ?, ?, ?, ?, ?)
""",
(
patient_id,
datetime.now(),
old_tier,
new_tier,
reason,
json.dumps(vitals),
),
)
await self.conn.commit()
await self.log_event(
"tier_change",
patient_id,
None,
{"old_tier": old_tier, "new_tier": new_tier, "reason": reason},
)
async def get_tier_history(self, patient_id: str) -> List[Dict]:
"""Get tier change history for incident review"""
cursor = await self.conn.execute(
"""
SELECT change_time, old_tier, new_tier, trigger_reason, vitals_snapshot
FROM tier_changes
WHERE patient_id = ?
ORDER BY change_time ASC
""",
(patient_id,),
)
rows = await cursor.fetchall()
return [
{
"change_time": row[0],
"old_tier": row[1],
"new_tier": row[2],
"reason": row[3],
"vitals": json.loads(row[4]) if row[4] else {},
}
for row in rows
]
# ========================================================================
# SYSTEM EVENTS (Audit Trail)
# ========================================================================
async def log_event(
self,
event_type: str,
patient_id: Optional[str],
band_id: Optional[str],
details: Dict,
):
"""Log system event for audit trail"""
# Ensure details is JSON serializable
serializable_details = {}
for key, value in details.items():
if isinstance(value, datetime):
serializable_details[key] = value.isoformat()
else:
serializable_details[key] = value
await self.conn.execute(
"""
INSERT INTO system_events (
event_time, event_type, patient_id, band_id, details
) VALUES (?, ?, ?, ?, ?)
""",
(
datetime.now().isoformat(), # Convert to string
event_type,
patient_id,
band_id,
json.dumps(serializable_details),
),
)
await self.conn.commit()
async def get_events(
self,
event_type: Optional[str] = None,
patient_id: Optional[str] = None,
hours: int = 24,
) -> List[Dict]:
"""Get system events for analysis"""
query = "SELECT event_time, event_type, patient_id, band_id, details FROM system_events WHERE 1=1"
params = []
if event_type:
query += " AND event_type = ?"
params.append(event_type)
if patient_id:
query += " AND patient_id = ?"
params.append(patient_id)
query += " AND event_time > datetime('now', '-' || ? || ' hours')"
params.append(hours)
query += " ORDER BY event_time DESC LIMIT 1000"
cursor = await self.conn.execute(query, params)
rows = await cursor.fetchall()
return [
{
"event_time": row[0],
"event_type": row[1],
"patient_id": row[2],
"band_id": row[3],
"details": json.loads(row[4]) if row[4] else {},
}
for row in rows
]
# ========================================================================
# ANALYTICS & REPLAY
# ========================================================================
async def get_session_summary(
self, start_time: datetime, end_time: datetime
) -> Dict:
"""Get summary statistics for a session (for incident review)"""
cursor = await self.conn.execute(
"""
SELECT
COUNT(DISTINCT patient_id) as total_patients,
COUNT(*) as total_vitals,
AVG(hr_bpm) as avg_hr,
AVG(spo2) as avg_spo2,
AVG(temp_c) as avg_temp
FROM vitals_readings
WHERE timestamp BETWEEN ? AND ?
""",
(start_time.timestamp(), end_time.timestamp()),
)
row = await cursor.fetchone()
return {
"total_patients": row[0],
"total_vitals_recorded": row[1],
"average_hr": round(row[2], 1) if row[2] else 0,
"average_spo2": round(row[3], 1) if row[3] else 0,
"average_temp": round(row[4], 2) if row[4] else 0,
}
async def export_patient_data(self, patient_id: str) -> Dict:
"""Export complete patient data for incident investigation"""
# Get patient info
cursor = await self.conn.execute(
"""
SELECT * FROM patients WHERE patient_id = ?
""",
(patient_id,),
)
patient_row = await cursor.fetchone()
if not patient_row:
return None
# Get all vitals
vitals = await self.get_patient_vitals_history(patient_id, limit=10000)
# Get tier changes
tier_changes = await self.get_tier_history(patient_id)
# Get related events
events = await self.get_events(patient_id=patient_id, hours=24)
return {
"patient_id": patient_id,
"name": f"{patient_row[2]} {patient_row[3]}",
"dob": patient_row[4],
"symptoms": json.loads(patient_row[5]) if patient_row[5] else [],
"severity": patient_row[6],
"check_in_time": patient_row[7],
"discharge_time": patient_row[8],
"total_vitals": len(vitals),
"vitals_timeline": vitals,
"tier_changes": tier_changes,
"events": events,
"export_time": datetime.now().isoformat(),
}
# ============================================================================
# REPLAY SYSTEM
# ============================================================================
class VitalsReplaySystem:
"""Replay historical vitals data for analysis"""
def __init__(self, db: VitalLinkDatabase):
self.db = db
async def replay_patient_session(self, patient_id: str, speed: float = 1.0):
"""
Replay a patient's entire session
speed: 1.0 = real-time, 10.0 = 10x faster, 0.1 = slow motion
"""
vitals = await self.db.get_patient_vitals_history(patient_id, limit=10000)
vitals.reverse() # Chronological order
if not vitals:
print(f"No data found for patient {patient_id}")
return
print(f"\n{'=' * 80}")
print(f"REPLAYING SESSION: {patient_id} ({len(vitals)} readings)")
print(f"Speed: {speed}x | Press Ctrl+C to stop")
print(f"{'=' * 80}\n")
start_time = vitals[0]["timestamp"]
for i, reading in enumerate(vitals):
# Calculate delay
if i > 0:
time_diff = reading["timestamp"] - vitals[i - 1]["timestamp"]
await asyncio.sleep(time_diff / speed)
# Display reading
elapsed = reading["timestamp"] - start_time
tier_symbol = (
"🔴"
if reading["tier"] == "EMERGENCY"
else "🟡"
if reading["tier"] == "ALERT"
else "🟢"
)
print(
f"[{elapsed:7.1f}s] {tier_symbol} Seq={reading['seq']:3d} | "
f"HR={reading['hr_bpm']:3d} SpO2={reading['spo2']:2d}% "
f"Temp={reading['temp_c']:.1f}°C | {reading['tier']}"
)
print(f"\n{'=' * 80}")
print(f"Replay complete: {len(vitals)} readings")
print(f"{'=' * 80}\n")
async def analyze_critical_events(self, patient_id: str):
"""Analyze critical tier changes and deterioration events"""
tier_changes = await self.db.get_tier_history(patient_id)
print(f"\n{'=' * 80}")
print(f"CRITICAL EVENT ANALYSIS: {patient_id}")
print(f"{'=' * 80}\n")
for change in tier_changes:
print(f"[{change['change_time']}]")
print(f" {change['old_tier']}{change['new_tier']}")
print(f" Reason: {change['reason']}")
print(f" Vitals: {change['vitals']}")
print()
print(f"{'=' * 80}\n")
# ============================================================================
# INTEGRATION HELPERS
# ============================================================================
async def init_database(db_path: str = "vitallink.db") -> VitalLinkDatabase:
"""Initialize database for use in FastAPI"""
db = VitalLinkDatabase(db_path)
await db.initialize()
return db
# ============================================================================
# CLI TOOLS
# ============================================================================
async def cli_export_patient(patient_id: str, output_file: str = None):
"""Export patient data to JSON file"""
db = VitalLinkDatabase()
await db.initialize()
data = await db.export_patient_data(patient_id)
if not data:
print(f"Patient {patient_id} not found")
return
if output_file:
with open(output_file, "w") as f:
json.dump(data, f, indent=2)
print(f"✓ Exported to {output_file}")
else:
print(json.dumps(data, indent=2))
await db.close()
async def cli_replay_session(patient_id: str, speed: float = 1.0):
"""Replay a patient session"""
db = VitalLinkDatabase()
await db.initialize()
replay = VitalsReplaySystem(db)
await replay.replay_patient_session(patient_id, speed)
await db.close()
async def cli_analyze_incident(patient_id: str):
"""Analyze critical events for a patient"""
db = VitalLinkDatabase()
await db.initialize()
replay = VitalsReplaySystem(db)
await replay.analyze_critical_events(patient_id)
# Also show vital trends
vitals = await db.get_patient_vitals_history(patient_id, limit=1000)
if vitals:
print("VITAL SIGN TRENDS:")
print(
f" HR range: {min(v['hr_bpm'] for v in vitals)} - {max(v['hr_bpm'] for v in vitals)} bpm"
)
print(
f" SpO2 range: {min(v['spo2'] for v in vitals)} - {max(v['spo2'] for v in vitals)}%"
)
print(
f" Temp range: {min(v['temp_c'] for v in vitals):.1f} - {max(v['temp_c'] for v in vitals):.1f}°C"
)
print()
await db.close()
if __name__ == "__main__":
import argparse
import asyncio
parser = argparse.ArgumentParser(description="VitalLink Database Tools")
parser.add_argument("--export", metavar="PATIENT_ID", help="Export patient data")
parser.add_argument("--replay", metavar="PATIENT_ID", help="Replay patient session")
parser.add_argument(
"--analyze", metavar="PATIENT_ID", help="Analyze critical events"
)
parser.add_argument(
"--speed", type=float, default=1.0, help="Replay speed multiplier"
)
parser.add_argument("--output", "-o", help="Output file for export")
args = parser.parse_args()
if args.export:
asyncio.run(cli_export_patient(args.export, args.output))
elif args.replay:
asyncio.run(cli_replay_session(args.replay, args.speed))
elif args.analyze:
asyncio.run(cli_analyze_incident(args.analyze))
else:
parser.print_help()