2025-11-24 15:32:12 -05:00

321 lines
9.7 KiB
Python

"""
VitalLink Database Layer
SQLite persistence for patients, vitals, and audit trail
"""
import sqlite3
import json
from datetime import datetime
from typing import List, Dict, Optional
import aiosqlite
# ============================================================================
# DATABASE SCHEMA
# ============================================================================
# SCHEMA_SQL is the complete DDL script for the database; VitalLinkDatabase.initialize()
# feeds it to executescript() in one go. Every statement uses IF NOT EXISTS, so the
# script can be replayed against a database that already has the objects.
#
# Tables:
#   patients         - one row per patient, keyed by patient_id. current_tier defaults
#                      to 'NORMAL' and is_active to 1; discharge_time is nullable.
#   vitals_readings  - one row per reading (hr_bpm, spo2, temp_c, activity, tier).
#                      `timestamp` is REAL while the other time columns are TEXT;
#                      `flags` is TEXT (save_vitals stores a JSON-encoded list there).
#   tier_changes     - audit rows for tier transitions; vitals_snapshot is TEXT
#                      (log_tier_change stores JSON there).
#   system_events    - general event log; patient_id and band_id are nullable and
#                      `details` is TEXT (log_event stores JSON there).
#
# Indexes cover (patient_id, timestamp) and timestamp on vitals_readings, is_active on
# patients, and (patient_id, change_time) on tier_changes.
#
# NOTE(review): SQLite only enforces the FOREIGN KEY clauses below when
# `PRAGMA foreign_keys = ON` has been issued on the connection; nothing in this
# script does so - confirm whether enforcement is intended.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS patients (
patient_id TEXT PRIMARY KEY,
band_id TEXT NOT NULL,
first_name TEXT NOT NULL,
last_name TEXT NOT NULL,
dob TEXT NOT NULL,
symptoms TEXT,
severity TEXT,
check_in_time TEXT NOT NULL,
discharge_time TEXT,
current_tier TEXT DEFAULT 'NORMAL',
is_active INTEGER DEFAULT 1,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS vitals_readings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
patient_id TEXT NOT NULL,
band_id TEXT NOT NULL,
timestamp REAL NOT NULL,
seq INTEGER,
hr_bpm INTEGER,
spo2 INTEGER,
temp_c REAL,
activity REAL,
tier TEXT,
flags TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (patient_id) REFERENCES patients(patient_id)
);
CREATE TABLE IF NOT EXISTS tier_changes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
patient_id TEXT NOT NULL,
change_time TEXT NOT NULL,
old_tier TEXT,
new_tier TEXT,
trigger_reason TEXT,
vitals_snapshot TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (patient_id) REFERENCES patients(patient_id)
);
CREATE TABLE IF NOT EXISTS system_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_time TEXT NOT NULL,
event_type TEXT NOT NULL,
patient_id TEXT,
band_id TEXT,
details TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_vitals_patient ON vitals_readings(patient_id, timestamp);
CREATE INDEX IF NOT EXISTS idx_vitals_timestamp ON vitals_readings(timestamp);
CREATE INDEX IF NOT EXISTS idx_patients_active ON patients(is_active);
CREATE INDEX IF NOT EXISTS idx_tier_changes_patient ON tier_changes(patient_id, change_time);
"""
# ============================================================================
# DATABASE MANAGER
# ============================================================================
class VitalLinkDatabase:
    """Async SQLite persistence for patients, vitals readings and the audit trail.

    ``initialize()`` must be awaited before any other method: every query runs
    on the single connection it stores in ``self.conn``.

    Methods that change data and also record an audit event write both rows
    and then commit once, so a change is never persisted without its event.
    """

    def __init__(self, db_path: str = "vitallink.db"):
        """Remember where the database lives; no connection is opened yet.

        Args:
            db_path: Path handed to ``aiosqlite.connect`` by ``initialize()``.
        """
        self.db_path = db_path
        # Set by initialize(); stays None until then.
        self.conn = None

    async def initialize(self):
        """Open the connection and run SCHEMA_SQL to create tables and indexes."""
        self.conn = await aiosqlite.connect(self.db_path)
        await self.conn.executescript(SCHEMA_SQL)
        await self.conn.commit()
        print(f"✓ Database initialized: {self.db_path}")

    async def close(self):
        """Close the connection if one has been opened."""
        if self.conn:
            await self.conn.close()

    @staticmethod
    def _dumps(payload) -> str:
        """Serialize *payload* to JSON, rendering datetimes at any depth as ISO-8601.

        Raises:
            TypeError: for any other value ``json`` cannot encode.
        """

        def _fallback(value):
            # Only datetimes get special treatment; everything else keeps
            # json's normal "not serializable" failure.
            if isinstance(value, datetime):
                return value.isoformat()
            raise TypeError(
                f"Object of type {type(value).__name__} is not JSON serializable"
            )

        return json.dumps(payload, default=_fallback)

    async def _insert_event(
        self,
        event_type: str,
        patient_id: Optional[str],
        band_id: Optional[str],
        details: Dict,
    ):
        """Queue a system_events row on the connection WITHOUT committing.

        The caller is responsible for the commit, which lets the event share
        a transaction with the change it describes.
        """
        await self.conn.execute(
            """
            INSERT INTO system_events (
                event_time, event_type, patient_id, band_id, details
            ) VALUES (?, ?, ?, ?, ?)
            """,
            (
                datetime.now().isoformat(),
                event_type,
                patient_id,
                band_id,
                self._dumps(details),
            ),
        )

    async def save_patient(self, patient_data: Dict):
        """Insert a new patient, or refresh band and tier of an existing one.

        For a patient_id that is already stored only ``band_id`` and
        ``current_tier`` are updated and no event is logged. For a new
        patient the full row is inserted together with a ``patient_checkin``
        event, both in one commit.

        Args:
            patient_data: Always read: "patient_id", "band_id",
                "check_in_time" (datetime or string). Read on insert only:
                "first_name", "last_name", "dob", "symptoms", "severity".
                "current_tier" is optional and defaults to "NORMAL".

        Raises:
            KeyError: If a key required for the chosen path is missing.
        """
        check_in_time = patient_data["check_in_time"]
        if isinstance(check_in_time, datetime):
            check_in_time = check_in_time.isoformat()

        patient_id = patient_data["patient_id"]
        current_tier = patient_data.get("current_tier", "NORMAL")

        cursor = await self.conn.execute(
            "SELECT patient_id FROM patients WHERE patient_id = ?",
            (patient_id,),
        )
        existing = await cursor.fetchone()

        if existing:
            # Known patient: only the band assignment and tier are refreshed.
            await self.conn.execute(
                """
                UPDATE patients SET
                    band_id = ?, current_tier = ?
                WHERE patient_id = ?
                """,
                (patient_data["band_id"], current_tier, patient_id),
            )
        else:
            await self.conn.execute(
                """
                INSERT INTO patients (
                    patient_id, band_id, first_name, last_name, dob,
                    symptoms, severity, check_in_time, current_tier
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    patient_id,
                    patient_data["band_id"],
                    patient_data["first_name"],
                    patient_data["last_name"],
                    patient_data["dob"],
                    self._dumps(patient_data["symptoms"]),
                    patient_data["severity"],
                    check_in_time,
                    current_tier,
                ),
            )
            # Same transaction as the INSERT above, so the check-in is never
            # stored without its audit record.
            await self._insert_event(
                "patient_checkin",
                patient_id,
                patient_data["band_id"],
                {
                    "first_name": patient_data["first_name"],
                    "last_name": patient_data["last_name"],
                    "symptoms": patient_data["symptoms"],
                    "severity": patient_data["severity"],
                },
            )

        await self.conn.commit()

    async def save_vitals(self, vitals_data: Dict):
        """Store one vitals reading and commit it.

        Args:
            vitals_data: Required keys: "patient_id", "band_id", "timestamp",
                "hr_bpm", "spo2", "temp_c", "activity", "tier". Optional:
                "seq" (default 0) and "flags" (default ``[]``, stored as JSON).

        Raises:
            KeyError: If a required key is missing.
        """
        await self.conn.execute(
            """
            INSERT INTO vitals_readings (
                patient_id, band_id, timestamp, seq, hr_bpm, spo2,
                temp_c, activity, tier, flags
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                vitals_data["patient_id"],
                vitals_data["band_id"],
                vitals_data["timestamp"],
                vitals_data.get("seq", 0),
                vitals_data["hr_bpm"],
                vitals_data["spo2"],
                vitals_data["temp_c"],
                vitals_data["activity"],
                vitals_data["tier"],
                self._dumps(vitals_data.get("flags", [])),
            ),
        )
        await self.conn.commit()

    async def get_patient_vitals_history(
        self, patient_id: str, limit: int = 100
    ) -> List[Dict]:
        """Return up to *limit* readings for a patient, newest first.

        Args:
            patient_id: Patient whose readings are fetched.
            limit: Maximum number of rows returned.

        Returns:
            Dicts with the keys "timestamp", "hr_bpm", "spo2", "temp_c",
            "activity", "tier" and "seq", ordered by timestamp descending.
        """
        cursor = await self.conn.execute(
            """
            SELECT timestamp, hr_bpm, spo2, temp_c, activity, tier, seq
            FROM vitals_readings
            WHERE patient_id = ?
            ORDER BY timestamp DESC
            LIMIT ?
            """,
            (patient_id, limit),
        )
        # Must stay in the same order as the SELECT column list.
        columns = ("timestamp", "hr_bpm", "spo2", "temp_c", "activity", "tier", "seq")
        return [dict(zip(columns, row)) for row in await cursor.fetchall()]

    async def log_tier_change(
        self, patient_id: str, old_tier: str, new_tier: str, reason: str, vitals: Dict
    ):
        """Record a tier transition plus a matching ``tier_change`` event.

        Both rows are written in one commit.

        Args:
            patient_id: Patient whose tier changed.
            old_tier: Tier before the change.
            new_tier: Tier after the change.
            reason: Stored in ``trigger_reason``.
            vitals: Snapshot stored as JSON; datetimes inside it are
                rendered as ISO-8601 strings.
        """
        await self.conn.execute(
            """
            INSERT INTO tier_changes (
                patient_id, change_time, old_tier, new_tier,
                trigger_reason, vitals_snapshot
            ) VALUES (?, ?, ?, ?, ?, ?)
            """,
            (
                patient_id,
                datetime.now().isoformat(),
                old_tier,
                new_tier,
                reason,
                self._dumps(vitals),
            ),
        )
        await self._insert_event(
            "tier_change",
            patient_id,
            None,
            {"old_tier": old_tier, "new_tier": new_tier, "reason": reason},
        )
        await self.conn.commit()

    async def get_tier_history(self, patient_id: str) -> List[Dict]:
        """Return a patient's tier changes, oldest first.

        Returns:
            Dicts with the keys "change_time", "old_tier", "new_tier",
            "reason" and "vitals" (the decoded snapshot, or ``{}`` when the
            stored snapshot is empty or NULL).
        """
        cursor = await self.conn.execute(
            """
            SELECT change_time, old_tier, new_tier, trigger_reason, vitals_snapshot
            FROM tier_changes
            WHERE patient_id = ?
            ORDER BY change_time ASC
            """,
            (patient_id,),
        )
        history = []
        for change_time, old_tier, new_tier, reason, snapshot in await cursor.fetchall():
            history.append(
                {
                    "change_time": change_time,
                    "old_tier": old_tier,
                    "new_tier": new_tier,
                    "reason": reason,
                    "vitals": json.loads(snapshot) if snapshot else {},
                }
            )
        return history

    async def log_event(
        self,
        event_type: str,
        patient_id: Optional[str],
        band_id: Optional[str],
        details: Dict,
    ):
        """Write one system_events row and commit it.

        Args:
            event_type: Stored in ``event_type``.
            patient_id: Related patient, or None.
            band_id: Related band, or None.
            details: Stored as JSON; datetimes at any nesting depth are
                rendered as ISO-8601 strings.

        Raises:
            TypeError: If *details* holds a value JSON cannot encode.
        """
        await self._insert_event(event_type, patient_id, band_id, details)
        await self.conn.commit()

    async def discharge_patient(self, patient_id: str):
        """Mark a patient inactive and log a ``discharge`` event.

        The discharge time is taken once, so the value stored on the patient
        row and the one in the event are identical. Both writes share one
        commit.
        """
        discharge_time = datetime.now().isoformat()
        await self.conn.execute(
            """
            UPDATE patients
            SET is_active = 0, discharge_time = ?
            WHERE patient_id = ?
            """,
            (discharge_time, patient_id),
        )
        await self._insert_event(
            "discharge",
            patient_id,
            None,
            {"discharge_time": discharge_time},
        )
        await self.conn.commit()