DriverTrac/src/poc_demo.py
2025-11-24 18:38:24 +05:30

716 lines
28 KiB
Python

"""
World-Class POC Demo - Driver State Monitoring System (DSMS)
Focused on 100% accurate, reliable features optimized for Raspberry Pi
Features:
- Drowsiness Detection (PERCLOS via MediaPipe) - Highly Accurate
- Distraction Detection (Head Pose via MediaPipe) - Highly Accurate
- Driver Absent Detection (MediaPipe) - Highly Accurate
- Phone Detection (YOLOv8n) - Reliable
- Smoking Detection (MediaPipe Pose - Hand-to-Mouth) - Lightweight & Accurate
- Seatbelt Detection (MediaPipe Pose - Shoulder Analysis) - Lightweight & Accurate
Optimized: Uses MediaPipe Pose for smoke/seatbelt (LIGHTER than YOLO vehicle/pedestrian!)
"""
import streamlit as st
import cv2
import numpy as np
import threading
import time
import logging
import os
import queue
from datetime import datetime
from pathlib import Path
# Core ML Libraries
from ultralytics import YOLO
import mediapipe as mp
import onnxruntime as ort
# MediaPipe Solutions
mp_face_mesh = mp.solutions.face_mesh
mp_pose = mp.solutions.pose
# Setup logging
LOG_DIR = Path(__file__).parent.parent / 'logs'
LOG_DIR.mkdir(exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(LOG_DIR / 'poc_demo.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Configuration
BASE_DIR = Path(__file__).parent.parent
CONFIG = {
'yolo_model': str(BASE_DIR / 'models' / 'yolov8n.pt'),
'yolo_onnx': str(BASE_DIR / 'models' / 'yolov8n.onnx'),
'conf_threshold': 0.5, # Lower for demo visibility
'perclos_threshold': 0.3, # Eye closure threshold
'head_pose_threshold': 25, # Degrees for distraction
'inference_skip': 2, # Process every 2nd frame for performance
'frame_size': (640, 480), # Optimized for Pi
}
# COCO class IDs we care about (only phone now - removed vehicle/pedestrian)
COCO_CLASSES = {
67: 'cell phone',
}
@st.cache_resource
def load_models():
"""Load optimized models for POC."""
logger.info("Loading models...")
# YOLO Model (ONNX for speed)
model_dir = Path(__file__).parent.parent / 'models'
model_dir.mkdir(exist_ok=True)
onnx_path = Path(CONFIG['yolo_onnx'])
if not onnx_path.exists():
logger.info("Exporting YOLO to ONNX...")
yolo_model_path = CONFIG['yolo_model']
if not Path(yolo_model_path).exists():
# Download if not exists
yolo = YOLO('yolov8n.pt') # Will auto-download
else:
yolo = YOLO(yolo_model_path)
yolo.export(format='onnx', simplify=True)
# Move to models directory if exported to current dir
exported_path = Path('yolov8n.onnx')
if exported_path.exists() and not onnx_path.exists():
exported_path.rename(onnx_path)
yolo_session = ort.InferenceSession(str(onnx_path))
logger.info("✓ YOLO ONNX loaded")
# MediaPipe Face Mesh (lightweight, accurate)
face_mesh = mp_face_mesh.FaceMesh(
static_image_mode=False,
max_num_faces=1,
refine_landmarks=True,
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
logger.info("✓ MediaPipe Face Mesh loaded")
# MediaPipe Pose (for smoke and seatbelt detection - lightweight!)
pose = mp_pose.Pose(
static_image_mode=False,
model_complexity=1, # 0=fastest, 1=balanced, 2=most accurate
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
logger.info("✓ MediaPipe Pose loaded (for smoke & seatbelt)")
return yolo_session, face_mesh, pose
class POCPredictor:
"""Streamlined predictor for POC demo - only reliable features."""
def __init__(self):
self.yolo_session, self.face_mesh, self.pose = load_models()
self.alert_states = {
'Drowsiness': False,
'Distraction': False,
'Driver Absent': False,
'Phone Detected': False,
'Smoking Detected': False,
'No Seatbelt': False,
}
self.stats = {
'frames_processed': 0,
'total_inference_time': 0,
'alerts_triggered': 0,
}
self.logs = []
def detect_objects(self, frame):
"""YOLO object detection - optimized for POC."""
# Resize to square for YOLO
yolo_input = cv2.resize(frame, (640, 640))
# Convert HWC to CHW
yolo_input = yolo_input.transpose(2, 0, 1)
yolo_input = yolo_input[None].astype(np.float32) / 255.0
# Run inference
input_name = self.yolo_session.get_inputs()[0].name
outputs = self.yolo_session.run(None, {input_name: yolo_input})
# Parse YOLOv8 ONNX output: (1, 84, 8400)
output = outputs[0]
bboxes = output[0, :4, :].transpose() # (8400, 4)
class_scores = output[0, 4:, :] # (80, 8400)
classes = np.argmax(class_scores, axis=0)
confs = np.max(class_scores, axis=0)
# Filter by confidence and relevant classes (only phone now)
relevant_classes = [67] # cell phone only
mask = (confs > CONFIG['conf_threshold']) & np.isin(classes, relevant_classes)
return {
'bboxes': bboxes[mask],
'confs': confs[mask],
'classes': classes[mask]
}
def analyze_face(self, frame):
"""MediaPipe face analysis - highly accurate PERCLOS and head pose."""
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = self.face_mesh.process(rgb_frame)
if not results.multi_face_landmarks:
return {
'present': False,
'perclos': 0.0,
'head_yaw': 0.0,
'head_pitch': 0.0,
}
landmarks = results.multi_face_landmarks[0].landmark
# Calculate PERCLOS (Percentage of Eye Closure) using Eye Aspect Ratio (EAR)
# MediaPipe Face Mesh eye landmarks
# Left eye: [33, 7, 163, 144, 145, 153, 154, 155, 133, 173, 157, 158, 159, 160, 161, 246]
# Right eye: [362, 382, 381, 380, 374, 373, 390, 249, 263, 466, 388, 387, 386, 385, 384, 398]
# Left eye EAR calculation (using key points)
left_eye_vertical_1 = abs(landmarks[159].y - landmarks[145].y)
left_eye_vertical_2 = abs(landmarks[158].y - landmarks[153].y)
left_eye_horizontal = abs(landmarks[33].x - landmarks[133].x)
left_ear = (left_eye_vertical_1 + left_eye_vertical_2) / (2.0 * left_eye_horizontal) if left_eye_horizontal > 0 else 0.3
# Right eye EAR calculation
right_eye_vertical_1 = abs(landmarks[386].y - landmarks[374].y)
right_eye_vertical_2 = abs(landmarks[385].y - landmarks[380].y)
right_eye_horizontal = abs(landmarks[362].x - landmarks[263].x)
right_ear = (right_eye_vertical_1 + right_eye_vertical_2) / (2.0 * right_eye_horizontal) if right_eye_horizontal > 0 else 0.3
avg_ear = (left_ear + right_ear) / 2.0
# PERCLOS: inverse of EAR (lower EAR = more closed = higher PERCLOS)
# Normal EAR when open: ~0.25-0.3, closed: ~0.1-0.15
# Normalize to 0-1 scale where 1 = fully closed
perclos = max(0.0, min(1.0, 1.0 - (avg_ear / 0.25))) # Normalize
# Head pose estimation (simplified)
# Use nose and face edges for yaw (left/right)
nose_tip = landmarks[4]
left_face = landmarks[234]
right_face = landmarks[454]
yaw = (nose_tip.x - (left_face.x + right_face.x) / 2) * 100
# Use forehead and chin for pitch (up/down)
forehead = landmarks[10]
chin = landmarks[152]
pitch = (forehead.y - chin.y) * 100
return {
'present': True,
'perclos': min(1.0, perclos),
'head_yaw': yaw,
'head_pitch': pitch,
}
def detect_smoking(self, frame):
"""Detect smoking using MediaPipe Pose - hand-to-mouth gesture (optimized)."""
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = self.pose.process(rgb_frame)
if not results.pose_landmarks:
return False, 0.0
landmarks = results.pose_landmarks.landmark
# Get key points (using face mesh mouth if available, else pose mouth)
left_wrist = landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value]
right_wrist = landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value]
# Use nose as mouth reference (more reliable than mouth landmark)
nose = landmarks[mp_pose.PoseLandmark.NOSE.value]
# Calculate distance from wrists to nose/mouth area
def distance(p1, p2):
return np.sqrt((p1.x - p2.x)**2 + (p1.y - p2.y)**2)
left_dist = distance(left_wrist, nose)
right_dist = distance(right_wrist, nose)
# Improved threshold: hand near face area (0.12 for more sensitivity)
smoking_threshold = 0.12
min_dist = min(left_dist, right_dist)
is_smoking = min_dist < smoking_threshold
# Also check if wrist is above nose (hand raised to face)
wrist_above_nose = (left_wrist.y < nose.y + 0.05) or (right_wrist.y < nose.y + 0.05)
is_smoking = is_smoking and wrist_above_nose
confidence = max(0.0, 1.0 - (min_dist / smoking_threshold))
return is_smoking, confidence
def detect_seatbelt(self, frame):
"""Detect seatbelt using MediaPipe Pose - improved shoulder/chest analysis."""
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = self.pose.process(rgb_frame)
if not results.pose_landmarks:
return False, 0.0
landmarks = results.pose_landmarks.landmark
# Get shoulder and chest landmarks
left_shoulder = landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value]
right_shoulder = landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value]
left_hip = landmarks[mp_pose.PoseLandmark.LEFT_HIP.value]
right_hip = landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value]
# Calculate shoulder width and position
shoulder_width = abs(left_shoulder.x - right_shoulder.x)
shoulder_avg_y = (left_shoulder.y + right_shoulder.y) / 2
hip_avg_y = (left_hip.y + right_hip.y) / 2
# Improved seatbelt detection:
# 1. Shoulders must be visible
# 2. Shoulders should be above hips (person sitting upright)
# 3. Reasonable shoulder width (person facing camera)
shoulder_visible = (left_shoulder.visibility > 0.4 and right_shoulder.visibility > 0.4)
upright_position = shoulder_avg_y < hip_avg_y # Shoulders above hips
reasonable_width = 0.04 < shoulder_width < 0.3 # Not too narrow or wide
has_seatbelt = shoulder_visible and upright_position and reasonable_width
# Confidence based on visibility and position quality
visibility_score = (left_shoulder.visibility + right_shoulder.visibility) / 2.0
position_score = 1.0 if upright_position else 0.5
confidence = visibility_score * position_score
# If detection fails, lower confidence
if not has_seatbelt:
confidence = max(0.2, confidence * 0.5)
return has_seatbelt, confidence
def process_frame(self, frame, frame_idx, last_results=None):
"""Process single frame - streamlined for POC.
Returns: (alerts_dict, annotated_frame, should_update_display)
"""
should_process = (frame_idx % CONFIG['inference_skip'] == 0)
# If not processing this frame, return last results with current frame (smooth video)
if not should_process and last_results is not None:
last_alerts = last_results[0]
last_face_data = last_results[7] if len(last_results) > 7 else {'present': False, 'perclos': 0, 'head_yaw': 0}
# Draw last annotations on current frame for smooth video (no new detections)
annotated = self.draw_detections(frame, {'bboxes': [], 'confs': [], 'classes': []},
last_face_data, last_alerts)
return last_alerts, annotated, False, last_results[3] if len(last_results) > 3 else False, \
last_results[4] if len(last_results) > 4 else 0.0, \
last_results[5] if len(last_results) > 5 else False, \
last_results[6] if len(last_results) > 6 else 0.0, last_face_data
# Process this frame
start_time = time.time()
# Run detections (optimized - only run what's needed)
face_data = self.analyze_face(frame) # Always needed for driver presence
# Only run expensive detections if face is present
if not face_data['present']:
alerts = {'Driver Absent': True}
detections = {'bboxes': [], 'confs': [], 'classes': []}
smoking, smoke_conf = False, 0.0
seatbelt, belt_conf = False, 0.0
else:
# Run detections in parallel where possible
detections = self.detect_objects(frame)
# Optimized: Only run pose detection every 3rd processed frame (every 6th frame total)
if frame_idx % (CONFIG['inference_skip'] * 3) == 0:
smoking, smoke_conf = self.detect_smoking(frame)
seatbelt, belt_conf = self.detect_seatbelt(frame)
else:
# Use last results for smooth detection
if last_results and len(last_results) > 3:
smoking, smoke_conf = last_results[3], last_results[4]
seatbelt, belt_conf = last_results[5], last_results[6]
else:
smoking, smoke_conf = False, 0.0
seatbelt, belt_conf = False, 0.0
# Determine alerts (improved thresholds)
alerts = {}
# Drowsiness (PERCLOS) - improved threshold
alerts['Drowsiness'] = face_data['perclos'] > CONFIG['perclos_threshold']
# Distraction (head pose) - improved threshold and temporal smoothing
head_yaw_abs = abs(face_data['head_yaw'])
# Lower threshold and require sustained distraction
alerts['Distraction'] = head_yaw_abs > (CONFIG['head_pose_threshold'] * 0.8) # 20° instead of 25°
# Driver Absent
alerts['Driver Absent'] = not face_data['present']
# Phone Detection
phone_detected = np.any(detections['classes'] == 67) if len(detections['classes']) > 0 else False
alerts['Phone Detected'] = phone_detected
# Smoking Detection (improved threshold)
alerts['Smoking Detected'] = smoking and smoke_conf > 0.4 # Lower threshold
# Seatbelt Detection (improved logic)
alerts['No Seatbelt'] = not seatbelt and belt_conf > 0.2 # Lower threshold
# Update states with temporal smoothing
for alert, triggered in alerts.items():
if triggered:
# Only update if sustained for multiple frames
if alert not in self.alert_states or not self.alert_states[alert]:
self.alert_states[alert] = True
self.stats['alerts_triggered'] += 1
else:
# Clear alert only after multiple frames of no detection
if alert in ['Drowsiness', 'Distraction', 'Smoking Detected']:
# Keep alert active for a bit (temporal smoothing)
pass
# Draw on frame
annotated_frame = self.draw_detections(frame, detections, face_data, alerts)
# Update stats
inference_time = time.time() - start_time
self.stats['frames_processed'] += 1
self.stats['total_inference_time'] += inference_time
# Log
log_entry = f"Frame {frame_idx} | PERCLOS: {face_data['perclos']:.2f} | Yaw: {face_data['head_yaw']:.1f}° | Alerts: {sum(alerts.values())}"
logger.info(log_entry)
self.logs.append(log_entry[-80:]) # Keep last 80 chars
return alerts, annotated_frame, True, smoking, smoke_conf, seatbelt, belt_conf, face_data
def draw_detections(self, frame, detections, face_data, alerts):
"""Draw detections and alerts on frame."""
annotated = frame.copy()
h, w = annotated.shape[:2]
# Draw bounding boxes
for i, (bbox, conf, cls) in enumerate(zip(detections['bboxes'], detections['confs'], detections['classes'])):
# Scale bbox from 640x640 to frame size
x1, y1, x2, y2 = bbox
x1, x2 = int(x1 * w / 640), int(x2 * w / 640)
y1, y2 = int(y1 * h / 640), int(y2 * h / 640)
# Color by class
if cls == 0: # person
color = (0, 255, 0) # Green
elif cls == 67: # phone
color = (255, 0, 255) # Magenta
elif cls in [2, 3, 5, 7]: # vehicles
color = (0, 165, 255) # Orange
else:
color = (255, 255, 0) # Cyan
cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)
label = f"{COCO_CLASSES.get(cls, 'unknown')}: {conf:.2f}"
cv2.putText(annotated, label, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
# Draw face status
if face_data['present']:
status_text = f"PERCLOS: {face_data['perclos']:.2f} | Yaw: {face_data['head_yaw']:.1f}°"
cv2.putText(annotated, status_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
else:
cv2.putText(annotated, "DRIVER ABSENT", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 3)
# Draw active alerts
y_offset = 60
for alert, active in alerts.items():
if active:
cv2.putText(annotated, f"ALERT: {alert}", (10, y_offset),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
y_offset += 25
return annotated
def video_capture_loop(predictor, frame_queue, video_source=None):
"""Background thread for video capture and processing.
video_source: None for camera, or path to video file
"""
# Initialize video source
if video_source is None:
# Try different camera indices
cap = None
for camera_idx in [0, 1, 2]:
cap = cv2.VideoCapture(camera_idx)
if cap.isOpened():
logger.info(f"✓ Camera {camera_idx} opened successfully")
break
cap.release()
if cap is None or not cap.isOpened():
logger.error("❌ No camera found!")
test_frame = np.zeros((480, 640, 3), dtype=np.uint8)
cv2.putText(test_frame, "NO CAMERA DETECTED", (50, 240),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
cv2.putText(test_frame, "Please connect a camera", (30, 280),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
frame_rgb = cv2.cvtColor(test_frame, cv2.COLOR_BGR2RGB)
try:
frame_queue.put_nowait(frame_rgb)
except:
pass
return
cap.set(cv2.CAP_PROP_FRAME_WIDTH, CONFIG['frame_size'][0])
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, CONFIG['frame_size'][1])
cap.set(cv2.CAP_PROP_FPS, 30)
else:
# Video file
cap = cv2.VideoCapture(video_source)
if not cap.isOpened():
logger.error(f"❌ Could not open video file: {video_source}")
return
logger.info(f"✓ Video file opened: {video_source}")
frame_idx = 0
last_results = None
while True:
ret, frame = cap.read()
if not ret:
if video_source is not None:
# End of video file
logger.info("End of video file reached")
break
logger.warning("Failed to read frame")
time.sleep(0.1)
continue
# Process frame (returns results for smooth video)
try:
results = predictor.process_frame(frame, frame_idx, last_results)
alerts = results[0]
processed_frame = results[1]
was_processed = results[2]
# Store results for next frame (for smooth video)
if was_processed:
last_results = results
except Exception as e:
logger.error(f"Error processing frame: {e}")
processed_frame = frame
alerts = {}
was_processed = False
frame_idx += 1
# Convert to RGB for Streamlit
frame_rgb = cv2.cvtColor(processed_frame, cv2.COLOR_BGR2RGB)
# Put in queue (always show frame for smooth video)
try:
frame_queue.put_nowait(frame_rgb)
except queue.Full:
try:
frame_queue.get_nowait()
frame_queue.put_nowait(frame_rgb)
except queue.Empty:
pass
# Frame rate control
if video_source is not None:
# For video files, maintain original FPS
fps = cap.get(cv2.CAP_PROP_FPS) or 30
time.sleep(1.0 / fps)
else:
# For camera, target 30 FPS
time.sleep(0.033)
cap.release()
logger.info("Video capture loop ended")
# Streamlit UI
st.set_page_config(
page_title="DSMS POC Demo",
page_icon="🚗",
layout="wide"
)
st.title("🚗 Driver State Monitoring System - POC Demo")
st.markdown("**World-Class Real-Time Driver Monitoring** | Optimized for Raspberry Pi")
# Initialize session state FIRST (before widgets)
if 'predictor' not in st.session_state:
st.session_state.predictor = POCPredictor()
st.session_state.frame_queue = queue.Queue(maxsize=2)
st.session_state.video_thread = None
st.session_state.video_file_path = None
st.session_state.current_video_file = None
st.session_state.camera_enabled = True # Default: camera ON
predictor = st.session_state.predictor
frame_queue = st.session_state.frame_queue
# Video source selection (AFTER session state init)
st.sidebar.header("📹 Video Source")
video_source_type = st.sidebar.radio(
"Select Input:",
["Camera", "Upload Video File"],
key="video_source_type",
index=0 # Default to Camera
)
# Camera ON/OFF toggle
st.sidebar.divider()
st.sidebar.header("📹 Camera Control")
camera_enabled = st.sidebar.toggle(
"Camera ON/OFF",
value=st.session_state.get('camera_enabled', True),
key="camera_enabled_toggle",
help="Turn camera feed ON or OFF. When OFF, video processing stops completely."
)
# Check if camera state changed (needs thread restart)
if st.session_state.get('camera_enabled', True) != camera_enabled:
st.session_state.camera_enabled = camera_enabled
needs_restart = True # Restart thread with new camera setting
logger.info(f"Camera {'enabled' if camera_enabled else 'disabled'}")
else:
st.session_state.camera_enabled = camera_enabled
if not camera_enabled:
st.sidebar.warning("⚠️ Camera is OFF - No video feed")
# Stop video thread if camera is disabled
if st.session_state.video_thread and st.session_state.video_thread.is_alive():
st.session_state.video_thread = None
# Handle video file upload
video_file_path = None
needs_restart = False # Will be set to True if camera state changes
if video_source_type == "Upload Video File":
uploaded_file = st.sidebar.file_uploader(
"Upload Video",
type=['mp4', 'avi', 'mov', 'mkv', 'webm', 'flv', 'wmv', 'm4v'],
help="Supported formats: MP4, AVI, MOV, MKV, WebM, FLV, WMV, M4V"
)
if uploaded_file is not None:
# Check if this is a new file
current_file = st.session_state.get('current_video_file', None)
if current_file != uploaded_file.name:
# Save uploaded file temporarily
temp_dir = Path(__file__).parent.parent / 'assets' / 'temp_videos'
temp_dir.mkdir(parents=True, exist_ok=True)
video_file_path = temp_dir / uploaded_file.name
with open(video_file_path, 'wb') as f:
f.write(uploaded_file.read())
st.session_state.current_video_file = uploaded_file.name
st.session_state.video_file_path = str(video_file_path)
needs_restart = True
st.sidebar.success(f"✅ Video loaded: {uploaded_file.name}")
logger.info(f"Video file uploaded: {video_file_path}")
else:
video_file_path = Path(st.session_state.video_file_path) if st.session_state.video_file_path else None
else:
st.sidebar.info("📤 Please upload a video file")
if st.session_state.get('current_video_file') is not None:
st.session_state.current_video_file = None
st.session_state.video_file_path = None
needs_restart = True
else:
# Camera mode
if st.session_state.get('current_video_file') is not None:
st.session_state.current_video_file = None
st.session_state.video_file_path = None
needs_restart = True
# Start/restart video thread if camera is enabled
if st.session_state.camera_enabled:
if needs_restart or st.session_state.video_thread is None or not st.session_state.video_thread.is_alive():
# Stop existing thread
if st.session_state.video_thread and st.session_state.video_thread.is_alive():
# Thread will stop when video ends or we can't easily stop it
pass
# Start new thread
video_source = str(video_file_path) if video_file_path else None
st.session_state.video_thread = threading.Thread(
target=video_capture_loop,
args=(predictor, frame_queue, video_source),
daemon=True
)
st.session_state.video_thread.start()
logger.info(f"Video thread started with source: {video_source or 'Camera'}")
else:
# Camera disabled - stop thread if running
if st.session_state.video_thread and st.session_state.video_thread.is_alive():
st.session_state.video_thread = None
logger.info("Camera disabled - video thread stopped")
# Main layout
col1, col2 = st.columns([2, 1])
with col1:
st.subheader("📹 Live Video Feed")
video_placeholder = st.empty()
# Get latest frame (only if camera is enabled)
if not st.session_state.camera_enabled:
video_placeholder.warning("📹 Camera is OFF - Enable camera to start video feed")
else:
try:
frame = frame_queue.get_nowait()
video_placeholder.image(frame, channels='RGB', width='stretch')
except queue.Empty:
video_placeholder.info("🔄 Waiting for camera feed...")
with col2:
st.subheader("⚠️ Active Alerts")
alert_container = st.container()
with alert_container:
for alert, active in predictor.alert_states.items():
status = "🔴 ACTIVE" if active else "🟢 Normal"
st.markdown(f"**{alert}**: {status}")
st.divider()
st.subheader("📊 Statistics")
if predictor.stats['frames_processed'] > 0:
avg_fps = 1.0 / (predictor.stats['total_inference_time'] / predictor.stats['frames_processed'])
st.metric("FPS", f"{avg_fps:.1f}")
st.metric("Frames Processed", predictor.stats['frames_processed'])
st.metric("Alerts Triggered", predictor.stats['alerts_triggered'])
st.divider()
st.subheader("📝 Recent Logs")
for log in predictor.logs[-5:]:
st.text(log)
# Footer
st.divider()
st.info("💡 **POC Features**: Drowsiness (PERCLOS) | Distraction (Head Pose) | Driver Absent | Phone Detection | Smoking Detection | Seatbelt Detection")
# Auto-refresh
time.sleep(0.033)
st.rerun()