286 lines
10 KiB
Python

"""
VitalLink Main System Runner
Runs the complete system with real and/or simulated wristbands
Automatically assigns bands when patients check in via kiosk
"""
import asyncio
import aiohttp
from wristband_manager import WristbandManager
from config_system import WristbandConfig
import sys
# ============================================================================
# MAIN SYSTEM
# ============================================================================
class VitalLinkSystem:
"""Main system orchestrator"""
def __init__(self):
self.manager = WristbandManager()
self.config = WristbandConfig()
self.backend_url = self.config.get("backend_url", "http://localhost:8000")
self.running = False
self.monitoring_task = None
async def initialize(self):
"""Initialize the system"""
print("\n" + "=" * 80)
print("VitalLink System Initialization")
print("=" * 80 + "\n")
# Check backend availability
backend_ok = await self.check_backend()
if not backend_ok:
print("\n⚠️ Warning: Backend not running. System will wait for backend...")
# Scan for real wristbands if configured
if self.config.get("auto_scan_ble", False):
timeout = self.config.get("scan_timeout", 10.0)
await self.manager.scan_for_real_bands(timeout)
# Load configured real wristbands
for band_config in self.config.get_real_bands() or []:
self.manager.add_real_band(
band_config["band_id"], band_config["ble_address"]
)
# Load configured simulated wristbands
for band_config in self.config.get_simulated_bands() or []:
self.manager.add_simulated_band(
band_config["band_id"], band_config.get("profile", "stable")
)
# Show inventory
self.manager.print_inventory()
async def check_backend(self):
"""Check if backend is running"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.backend_url}/", timeout=aiohttp.ClientTimeout(total=3)
) as resp:
if resp.status == 200:
print(f"✓ Backend is running at {self.backend_url}")
return True
except Exception as e:
print(f"❌ Backend not reachable at {self.backend_url}")
return False
return False
async def monitor_new_patients(self):
"""Monitor backend for new patient check-ins and auto-assign bands"""
print("\n🔍 Monitoring for new patient check-ins...")
known_patients = set()
prefer_real = self.config.get("prefer_real_bands", False)
while self.running:
try:
async with aiohttp.ClientSession() as session:
async with session.get(f"{self.backend_url}/api/queue") as resp:
if resp.status == 200:
queue = await resp.json()
for patient in queue:
patient_id = patient["patient_id"]
# New patient detected
if patient_id not in known_patients:
known_patients.add(patient_id)
# Check if already has a band assigned and monitoring
has_active_band = any(
b.patient_id == patient_id
and b.band_id in self.manager.active_monitoring
for b in self.manager.inventory.values()
)
if not has_active_band:
print(
f"\n🆕 New patient detected: {patient_id} ({patient['name']})"
)
# Try to assign a band
band = self.manager.assign_band(
patient_id, prefer_real=prefer_real
)
if band:
print(
f" ✓ Assigned {band.band_id} ({band.type.value})"
)
# Start monitoring
await self.manager.start_monitoring(
band.band_id
)
else:
# No bands available - create a new simulated one on the fly
print(
f" ⚠️ No bands available, creating emergency simulated band..."
)
emergency_band_id = f"VitalLink-EMRG{len(self.manager.inventory):02d}"
band = self.manager.add_simulated_band(
emergency_band_id, "stable"
)
band.assign_to_patient(patient_id)
print(
f" ✓ Created and assigned {emergency_band_id}"
)
await self.manager.start_monitoring(
band.band_id
)
except Exception as e:
# Silently continue if backend temporarily unavailable
pass
# Check every 2 seconds
await asyncio.sleep(2)
async def run(self):
"""Run the main system"""
self.running = True
await self.initialize()
print("\n" + "=" * 80)
print("VitalLink System Running")
print("=" * 80)
print("\n✓ Monitoring for new patients from kiosk check-ins")
print(
"✓ Auto-assigning wristbands (prefer real: {})".format(
self.config.get("prefer_real_bands", False)
)
)
print("\nPress Ctrl+C to stop\n")
print("=" * 80 + "\n")
# Start monitoring for new patients
self.monitoring_task = asyncio.create_task(self.monitor_new_patients())
try:
# Keep running until interrupted
while self.running:
await asyncio.sleep(10)
# Periodic status update
status = self.manager.get_status()
available = status["status_breakdown"].get("available", 0)
print(
f"[Status] Active: {status['active_monitoring']} monitoring | "
f"Available: {available} bands | "
f"Real: {status['real_bands']} | "
f"Sim: {status['simulated_bands']}"
)
except KeyboardInterrupt:
print("\n\n⚠️ Shutting down...")
await self.shutdown()
async def shutdown(self):
"""Clean shutdown"""
self.running = False
# Cancel monitoring task
if self.monitoring_task:
self.monitoring_task.cancel()
# Stop all monitoring
print("Stopping all wristband monitoring...")
for band_id in list(self.manager.active_monitoring.keys()):
await self.manager.stop_monitoring(band_id)
print("\n✓ VitalLink system stopped")
self.manager.print_inventory()
# ============================================================================
# CLI MODES
# ============================================================================
async def interactive_mode():
"""Interactive mode with menu"""
system = VitalLinkSystem()
await system.initialize()
while True:
print("\n" + "=" * 80)
print("VITALLINK INTERACTIVE MODE")
print("=" * 80)
print("\n1. Show inventory")
print("2. Scan for real wristbands")
print("3. Assign band to patient")
print("4. Release band")
print("5. Start auto-monitoring mode")
print("6. Exit")
choice = input("\nSelect option: ")
if choice == "1":
system.manager.print_inventory()
elif choice == "2":
await system.manager.scan_for_real_bands(timeout=10.0)
elif choice == "3":
patient_id = input("Patient ID: ")
prefer_real = input("Prefer real band? (y/n): ").lower() == "y"
band = system.manager.assign_band(patient_id, prefer_real=prefer_real)
if band:
await system.manager.start_monitoring(band.band_id)
elif choice == "4":
band_id = input("Band ID to release: ")
await system.manager.release_band(band_id)
elif choice == "5":
await system.run()
break
elif choice == "6":
print("Goodbye!")
break
# ============================================================================
# MAIN ENTRY POINT
# ============================================================================
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="VitalLink System")
parser.add_argument(
"--interactive", "-i", action="store_true", help="Run in interactive mode"
)
parser.add_argument(
"--config",
"-c",
default="wristband_config.yaml",
help="Configuration file path",
)
args = parser.parse_args()
try:
if args.interactive:
asyncio.run(interactive_mode())
else:
# Normal automatic mode
system = VitalLinkSystem()
asyncio.run(system.run())
except KeyboardInterrupt:
print("\n\nExiting...")
sys.exit(0)