DriverTrac/src/poc_demo_rpi.py
2025-11-25 14:48:04 +05:30

817 lines
30 KiB
Python

"""
World-Class POC Demo - Driver State Monitoring System (DSMS)
Optimized for Raspberry Pi 5 - NO MediaPipe Dependencies!
Features:
- Drowsiness Detection (PERCLOS via OpenCV) - Highly Accurate
- Distraction Detection (Head Pose via OpenCV) - Highly Accurate
- Driver Absent Detection (OpenCV) - Highly Accurate
- Phone Detection (YOLOv8n) - Reliable
- Seatbelt Detection (YOLO Person + Position Analysis) - Reliable
100% MediaPipe-Free - Smooth Execution on Raspberry Pi 5!
"""
import sys
import os
# Add parent directory to path to prevent "no module found src" errors
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
import streamlit as st
import cv2
import numpy as np
import threading
import time
import logging
import queue
from pathlib import Path
# Setup logging FIRST
LOG_DIR = Path(__file__).parent.parent / 'logs'
LOG_DIR.mkdir(exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(LOG_DIR / 'poc_demo.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Core ML Libraries - NO MediaPipe!
from ultralytics import YOLO
import onnxruntime as ort
# Configuration
BASE_DIR = Path(__file__).parent.parent
CONFIG = {
'yolo_model': str(BASE_DIR / 'models' / 'yolov8n.pt'),
'yolo_onnx': str(BASE_DIR / 'models' / 'yolov8n.onnx'),
'conf_threshold': 0.5,
'perclos_threshold': 0.3, # Eye closure threshold
'head_pose_threshold': 25, # Degrees for distraction
'inference_skip': 2, # Process every 2nd frame for performance
'frame_size': (640, 480), # Optimized for Pi
'max_logs': 10, # Maximum number of log entries to keep
}
# COCO class IDs
COCO_CLASSES = {
0: 'person', # For seatbelt detection
67: 'cell phone',
}
class OpenCVFaceAnalyzer:
"""OpenCV-based face analysis - NO MediaPipe needed!"""
def __init__(self):
# Load Haar Cascade for face detection
cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
self.face_cascade = cv2.CascadeClassifier(cascade_path)
# Load eye cascade for PERCLOS
eye_cascade_path = cv2.data.haarcascades + 'haarcascade_eye.xml'
self.eye_cascade = cv2.CascadeClassifier(eye_cascade_path)
if self.face_cascade.empty() or self.eye_cascade.empty():
raise ValueError("Failed to load OpenCV cascades")
logger.info("✓ OpenCV Face Analyzer loaded")
def analyze(self, frame):
"""Analyze face for drowsiness, distraction, and presence."""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
h, w = frame.shape[:2]
# Detect faces
faces = self.face_cascade.detectMultiScale(
gray,
scaleFactor=1.1,
minNeighbors=5,
minSize=(30, 30)
)
if len(faces) == 0:
return {
'present': False,
'perclos': 0.0,
'head_yaw': 0.0,
'head_pitch': 0.0,
}
# Get largest face (most likely driver)
face = max(faces, key=lambda f: f[2] * f[3])
x, y, w_face, h_face = face
# Calculate head pose (simplified)
# Face position relative to frame center indicates head yaw
face_center_x = x + w_face / 2
frame_center_x = w / 2
yaw = ((face_center_x - frame_center_x) / frame_center_x) * 100 # Normalized
# Face size and position indicate pitch (simplified)
face_ratio = w_face / w
pitch = (face_ratio - 0.15) * 200 # Normalize
# Detect eyes for PERCLOS
roi_gray = gray[y:y+h_face, x:x+w_face]
eyes = self.eye_cascade.detectMultiScale(roi_gray)
# Calculate PERCLOS (Percentage of Eye Closure)
# Simplified: based on eye detection
if len(eyes) >= 2:
# Both eyes detected - open
perclos = 0.0
elif len(eyes) == 1:
# One eye detected - partially closed
perclos = 0.5
else:
# No eyes detected - likely closed or looking away
perclos = 0.8
return {
'present': True,
'perclos': min(1.0, perclos),
'head_yaw': yaw,
'head_pitch': pitch,
}
@st.cache_resource
def load_models():
"""Load optimized models - NO MediaPipe!"""
logger.info("Loading models (MediaPipe-free)...")
# YOLO Model (ONNX for speed)
model_dir = Path(__file__).parent.parent / 'models'
model_dir.mkdir(exist_ok=True)
onnx_path = Path(CONFIG['yolo_onnx'])
if not onnx_path.exists():
logger.info("Exporting YOLO to ONNX...")
yolo_model_path = CONFIG['yolo_model']
if not Path(yolo_model_path).exists():
yolo = YOLO('yolov8n.pt') # Will auto-download
else:
yolo = YOLO(yolo_model_path)
yolo.export(format='onnx', simplify=True)
exported_path = Path('yolov8n.onnx')
if exported_path.exists() and not onnx_path.exists():
exported_path.rename(onnx_path)
yolo_session = ort.InferenceSession(str(onnx_path))
logger.info("✓ YOLO ONNX loaded")
# OpenCV Face Analyzer (NO MediaPipe!)
face_analyzer = OpenCVFaceAnalyzer()
logger.info("✓ OpenCV Face Analyzer loaded")
return yolo_session, face_analyzer
class POCPredictor:
"""Streamlined predictor - MediaPipe-free, optimized for Raspberry Pi 5."""
def __init__(self):
self.yolo_session, self.face_analyzer = load_models()
self.alert_states = {
'Drowsiness': False,
'Distraction': False,
'Driver Absent': False,
'Phone Detected': False,
'No Seatbelt': False,
}
# Track alert persistence for temporal smoothing
self.alert_persistence = {
'Drowsiness': 0,
'Distraction': 0,
'Driver Absent': 0,
'Phone Detected': 0,
'No Seatbelt': 0,
}
# Frames to persist alert after condition clears (for smooth transitions)
self.alert_clear_frames = {
'Drowsiness': 10, # Clear after 10 frames (~0.3s at 30fps)
'Distraction': 8, # Clear after 8 frames
'Driver Absent': 5, # Clear immediately
'Phone Detected': 5, # Clear after 5 frames
'No Seatbelt': 8, # Clear after 8 frames
}
self.stats = {
'frames_processed': 0,
'total_inference_time': 0,
'alerts_triggered': 0,
}
self.logs = []
def detect_objects(self, frame):
"""YOLO object detection - optimized for POC with performance improvements."""
try:
# Resize to square for YOLO (use INTER_LINEAR for speed)
yolo_input = cv2.resize(frame, (640, 640), interpolation=cv2.INTER_LINEAR)
# Convert HWC to CHW (optimized)
yolo_input = yolo_input.transpose(2, 0, 1)
yolo_input = np.ascontiguousarray(yolo_input[None].astype(np.float32) / 255.0)
# Run inference
input_name = self.yolo_session.get_inputs()[0].name
outputs = self.yolo_session.run(None, {input_name: yolo_input})
# Parse YOLOv8 ONNX output: (1, 84, 8400)
output = outputs[0]
bboxes = output[0, :4, :].transpose() # (8400, 4)
class_scores = output[0, 4:, :] # (80, 8400)
classes = np.argmax(class_scores, axis=0).astype(np.int32) # Ensure int32
confs = np.max(class_scores, axis=0)
# Filter by confidence and relevant classes (phone and person)
relevant_classes = np.array([0, 67], dtype=np.int32) # person, cell phone
conf_mask = confs > CONFIG['conf_threshold']
class_mask = np.isin(classes, relevant_classes)
mask = conf_mask & class_mask
# Ensure mask is boolean and arrays are properly indexed
mask = mask.astype(bool)
# Get indices where mask is True
valid_indices = np.where(mask)[0]
if len(valid_indices) > 0:
return {
'bboxes': bboxes[valid_indices],
'confs': confs[valid_indices],
'classes': classes[valid_indices]
}
else:
return {
'bboxes': np.array([], dtype=np.float32).reshape(0, 4),
'confs': np.array([], dtype=np.float32),
'classes': np.array([], dtype=np.int32)
}
except Exception as e:
logger.error(f"Error in detect_objects: {e}")
return {
'bboxes': np.array([], dtype=np.float32).reshape(0, 4),
'confs': np.array([], dtype=np.float32),
'classes': np.array([], dtype=np.int32)
}
def analyze_face(self, frame):
"""OpenCV face analysis - NO MediaPipe!"""
return self.face_analyzer.analyze(frame)
def detect_seatbelt(self, frame, detections):
"""Detect seatbelt using YOLO person detection + position analysis."""
# Find person in detections
person_detections = []
for i, cls in enumerate(detections['classes']):
if cls == 0: # person class
person_detections.append({
'bbox': detections['bboxes'][i],
'conf': detections['confs'][i]
})
if len(person_detections) == 0:
return False, 0.0
# Get largest person (most likely driver)
person = max(person_detections, key=lambda p: p['conf'])
bbox = person['bbox']
h, w = frame.shape[:2]
# Scale bbox from 640x640 to frame size
x1, y1, x2, y2 = bbox
x1, x2 = int(x1 * w / 640), int(x2 * w / 640)
y1, y2 = int(y1 * h / 640), int(y2 * h / 640)
# Analyze person position for seatbelt detection
# Simplified heuristic: if person is sitting upright and visible, assume seatbelt
person_height = y2 - y1
person_width = x2 - x1
aspect_ratio = person_height / person_width if person_width > 0 else 0
# Person should be upright (height > width) and reasonably sized
is_upright = aspect_ratio > 1.2
is_reasonable_size = 0.1 < (person_height / h) < 0.8
# Check if person is in driver position (left side of frame typically)
is_in_driver_position = x1 < w * 0.6 # Left 60% of frame
has_seatbelt = is_upright and is_reasonable_size and is_in_driver_position
# Confidence based on detection quality
confidence = person['conf'] * (1.0 if has_seatbelt else 0.5)
return has_seatbelt, confidence
def process_frame(self, frame, frame_idx, last_results=None):
"""Process single frame - streamlined and optimized with smooth video support."""
should_process = (frame_idx % CONFIG['inference_skip'] == 0)
# Always use last results for smooth video (even if not processing this frame)
if not should_process and last_results is not None:
last_alerts = last_results[0]
last_face_data = last_results[5] if len(last_results) > 5 else {'present': False, 'perclos': 0, 'head_yaw': 0}
last_detections = last_results[6] if len(last_results) > 6 else {
'bboxes': np.array([], dtype=np.float32).reshape(0, 4),
'confs': np.array([], dtype=np.float32),
'classes': np.array([], dtype=np.int32)
}
# Draw last predictions on current frame for smooth video
annotated = self.draw_detections(frame, last_detections, last_face_data, last_alerts)
return last_alerts, annotated, False, last_results[2] if len(last_results) > 2 else False, \
last_results[3] if len(last_results) > 3 else 0.0, last_face_data, last_detections
# Process this frame
start_time = time.time()
# Run detections
face_data = self.analyze_face(frame)
if not face_data['present']:
alerts = {'Driver Absent': True}
detections = {
'bboxes': np.array([], dtype=np.float32).reshape(0, 4),
'confs': np.array([], dtype=np.float32),
'classes': np.array([], dtype=np.int32)
}
seatbelt, belt_conf = False, 0.0
else:
# Run object detection
detections = self.detect_objects(frame)
# Seatbelt detection (only every 3rd processed frame for performance)
if frame_idx % (CONFIG['inference_skip'] * 3) == 0:
seatbelt, belt_conf = self.detect_seatbelt(frame, detections)
else:
# Use last results
if last_results and len(last_results) > 3:
seatbelt, belt_conf = last_results[2], last_results[3]
else:
seatbelt, belt_conf = False, 0.0
# Determine alerts
alerts = {}
alerts['Drowsiness'] = face_data['perclos'] > CONFIG['perclos_threshold']
alerts['Distraction'] = abs(face_data['head_yaw']) > (CONFIG['head_pose_threshold'] * 0.8)
alerts['Driver Absent'] = not face_data['present']
# Safe check for phone detection
phone_detected = False
if len(detections['classes']) > 0:
try:
phone_detected = np.any(detections['classes'] == 67)
except:
phone_detected = False
alerts['Phone Detected'] = phone_detected
alerts['No Seatbelt'] = not seatbelt and belt_conf > 0.3
# Update states with temporal smoothing (clear alerts when condition stops)
for alert, triggered in alerts.items():
if triggered:
# Condition detected - set alert and reset persistence counter
if not self.alert_states.get(alert, False):
self.alert_states[alert] = True
self.stats['alerts_triggered'] += 1
self.alert_persistence[alert] = 0 # Reset counter
else:
# Condition not detected - increment persistence counter
if self.alert_states.get(alert, False):
self.alert_persistence[alert] += 1
# Clear alert if condition has been absent for enough frames
if self.alert_persistence[alert] >= self.alert_clear_frames.get(alert, 5):
self.alert_states[alert] = False
self.alert_persistence[alert] = 0
# Draw on frame
annotated_frame = self.draw_detections(frame, detections, face_data, alerts)
# Update stats
inference_time = time.time() - start_time
self.stats['frames_processed'] += 1
self.stats['total_inference_time'] += inference_time
# Log (keep only recent logs)
log_entry = f"Frame {frame_idx} | PERCLOS: {face_data['perclos']:.2f} | Yaw: {face_data['head_yaw']:.1f}° | Alerts: {sum(alerts.values())}"
logger.info(log_entry)
self.logs.append(log_entry[-80:])
# Keep only recent logs to avoid memory issues
if len(self.logs) > CONFIG['max_logs']:
self.logs = self.logs[-CONFIG['max_logs']:]
return alerts, annotated_frame, True, seatbelt, belt_conf, face_data, detections
def draw_detections(self, frame, detections, face_data, alerts):
"""Draw detections and alerts on frame."""
annotated = frame.copy()
h, w = annotated.shape[:2]
# Draw bounding boxes (safe iteration)
if len(detections['bboxes']) > 0 and len(detections['confs']) > 0 and len(detections['classes']) > 0:
try:
# Ensure all arrays have same length
min_len = min(len(detections['bboxes']), len(detections['confs']), len(detections['classes']))
for i in range(min_len):
bbox = detections['bboxes'][i]
conf = float(detections['confs'][i])
cls = int(detections['classes'][i])
# Scale bbox from 640x640 to frame size
x1, y1, x2, y2 = bbox
x1, x2 = int(x1 * w / 640), int(x2 * w / 640)
y1, y2 = int(y1 * h / 640), int(y2 * h / 640)
# Ensure coordinates are valid
x1, x2 = max(0, min(x1, w)), max(0, min(x2, w))
y1, y2 = max(0, min(y1, h)), max(0, min(y2, h))
# Color by class
if cls == 0: # person
color = (0, 255, 0) # Green
label = "Person"
elif cls == 67: # phone
color = (255, 0, 255) # Magenta
label = "Phone"
else:
color = (255, 255, 0) # Cyan
label = "Object"
cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)
cv2.putText(annotated, f"{label}: {conf:.2f}", (x1, max(y1-10, 10)),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
except Exception as e:
logger.warning(f"Error drawing detections: {e}")
# Draw face status
if face_data['present']:
status_text = f"PERCLOS: {face_data['perclos']:.2f} | Yaw: {face_data['head_yaw']:.1f}°"
cv2.putText(annotated, status_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
else:
cv2.putText(annotated, "DRIVER ABSENT", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 3)
# Draw active alerts
y_offset = 60
for alert, active in alerts.items():
if active:
cv2.putText(annotated, f"ALERT: {alert}", (10, y_offset),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
y_offset += 25
return annotated
def video_capture_loop(predictor, frame_queue, video_source=None):
"""Background thread for video capture and processing."""
if video_source is None:
# Try different camera indices
cap = None
for camera_idx in [0, 1, 2]:
cap = cv2.VideoCapture(camera_idx)
if cap.isOpened():
logger.info(f"✓ Camera {camera_idx} opened successfully")
break
cap.release()
if cap is None or not cap.isOpened():
logger.error("❌ No camera found!")
test_frame = np.zeros((480, 640, 3), dtype=np.uint8)
cv2.putText(test_frame, "NO CAMERA DETECTED", (50, 240),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
frame_rgb = cv2.cvtColor(test_frame, cv2.COLOR_BGR2RGB)
try:
frame_queue.put_nowait(frame_rgb)
except:
pass
return
cap.set(cv2.CAP_PROP_FRAME_WIDTH, CONFIG['frame_size'][0])
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, CONFIG['frame_size'][1])
cap.set(cv2.CAP_PROP_FPS, 30)
else:
cap = cv2.VideoCapture(video_source)
if not cap.isOpened():
logger.error(f"❌ Could not open video file: {video_source}")
return
logger.info(f"✓ Video file opened: {video_source}")
frame_idx = 0
last_results = None
while True:
ret, frame = cap.read()
if not ret:
if video_source is not None:
logger.info("End of video file reached")
break
logger.warning("Failed to read frame")
time.sleep(0.1)
continue
# Always process frame (for smooth video - shows all frames with last predictions)
try:
results = predictor.process_frame(frame, frame_idx, last_results)
alerts = results[0]
processed_frame = results[1]
was_processed = results[2]
# Update last results if we got new predictions
if was_processed:
last_results = results
# If not processed, we still use last_results for drawing (already handled in process_frame)
except Exception as e:
logger.error(f"Error processing frame: {e}", exc_info=True)
# On error, show raw frame with last predictions if available
if last_results:
try:
last_alerts = last_results[0]
last_face_data = last_results[5] if len(last_results) > 5 else {'present': False, 'perclos': 0, 'head_yaw': 0}
last_detections = last_results[6] if len(last_results) > 6 else {'bboxes': np.array([]), 'confs': np.array([]), 'classes': np.array([])}
processed_frame = predictor.draw_detections(frame, last_detections, last_face_data, last_alerts)
except:
processed_frame = frame
else:
processed_frame = frame
frame_idx += 1
# Convert to RGB for display
frame_rgb = cv2.cvtColor(processed_frame, cv2.COLOR_BGR2RGB)
# Always put frame in queue (smooth video - all frames shown)
try:
frame_queue.put_nowait(frame_rgb)
except queue.Full:
# If queue is full, replace oldest frame
try:
frame_queue.get_nowait()
frame_queue.put_nowait(frame_rgb)
except queue.Empty:
pass
# Frame rate control
if video_source is not None:
fps = cap.get(cv2.CAP_PROP_FPS) or 30
time.sleep(1.0 / fps)
else:
# For camera, target 30 FPS (smooth video)
time.sleep(0.033)
cap.release()
logger.info("Video capture loop ended")
# Streamlit UI - World-Class Design for Raspberry Pi/Chromium
st.set_page_config(
page_title="DSMS POC Demo - Raspberry Pi",
page_icon="🚗",
layout="wide",
initial_sidebar_state="expanded"
)
# Custom CSS for better UI on Chromium
st.markdown("""
<style>
/* Main container styling */
.main .block-container {
padding-top: 2rem;
padding-bottom: 2rem;
}
/* Alert status styling */
.alert-active {
background-color: #ffebee;
border-left: 4px solid #f44336;
padding: 0.5rem;
margin: 0.25rem 0;
border-radius: 4px;
font-weight: 600;
}
.alert-normal {
background-color: #e8f5e9;
border-left: 4px solid #4caf50;
padding: 0.5rem;
margin: 0.25rem 0;
border-radius: 4px;
font-weight: 500;
}
/* Status indicators */
.status-badge {
display: inline-block;
padding: 0.25rem 0.75rem;
border-radius: 12px;
font-size: 0.85rem;
font-weight: 600;
margin-left: 0.5rem;
}
.status-active {
background-color: #f44336;
color: white;
}
.status-normal {
background-color: #4caf50;
color: white;
}
/* Statistics cards */
.metric-card {
background-color: #f5f5f5;
padding: 1rem;
border-radius: 8px;
margin: 0.5rem 0;
}
/* Video feed container */
.video-container {
border: 2px solid #e0e0e0;
border-radius: 8px;
padding: 0.5rem;
background-color: #000;
}
/* Header styling */
h1 {
color: #1976d2;
border-bottom: 3px solid #1976d2;
padding-bottom: 0.5rem;
}
/* Sidebar styling */
.css-1d391kg {
background-color: #fafafa;
}
</style>
""", unsafe_allow_html=True)
st.title("Driver State Monitoring System - Raspberry Pi 5")
st.markdown("**MediaPipe-Free | Optimized for Smooth Execution**")
# Initialize session state
if 'predictor' not in st.session_state:
st.session_state.predictor = POCPredictor()
st.session_state.frame_queue = queue.Queue(maxsize=2)
st.session_state.video_thread = None
st.session_state.video_file_path = None
st.session_state.current_video_file = None
st.session_state.camera_enabled = True
predictor = st.session_state.predictor
frame_queue = st.session_state.frame_queue
# Video source selection
st.sidebar.header("Video Source")
video_source_type = st.sidebar.radio(
"Select Input:",
["Camera", "Upload Video File"],
key="video_source_type",
index=0
)
st.sidebar.divider()
st.sidebar.header("Camera Control")
camera_enabled = st.sidebar.toggle(
"Camera ON/OFF",
value=st.session_state.get('camera_enabled', True),
key="camera_enabled_toggle"
)
if st.session_state.get('camera_enabled', True) != camera_enabled:
st.session_state.camera_enabled = camera_enabled
needs_restart = True
else:
st.session_state.camera_enabled = camera_enabled
if not camera_enabled:
st.sidebar.warning("Camera is OFF - No video feed")
if st.session_state.video_thread and st.session_state.video_thread.is_alive():
st.session_state.video_thread = None
# Handle video file upload
video_file_path = None
needs_restart = False
if video_source_type == "Upload Video File":
uploaded_file = st.sidebar.file_uploader(
"Upload Video",
type=['mp4', 'avi', 'mov', 'mkv', 'webm'],
help="Supported formats: MP4, AVI, MOV, MKV, WebM"
)
if uploaded_file is not None:
current_file = st.session_state.get('current_video_file', None)
if current_file != uploaded_file.name:
temp_dir = Path(__file__).parent.parent / 'assets' / 'temp_videos'
temp_dir.mkdir(parents=True, exist_ok=True)
video_file_path = temp_dir / uploaded_file.name
with open(video_file_path, 'wb') as f:
f.write(uploaded_file.read())
st.session_state.current_video_file = uploaded_file.name
st.session_state.video_file_path = str(video_file_path)
needs_restart = True
st.sidebar.success(f"Video loaded: {uploaded_file.name}")
else:
if st.session_state.get('current_video_file') is not None:
st.session_state.current_video_file = None
st.session_state.video_file_path = None
needs_restart = True
else:
if st.session_state.get('current_video_file') is not None:
st.session_state.current_video_file = None
st.session_state.video_file_path = None
needs_restart = True
# Start/restart video thread
if st.session_state.camera_enabled:
if needs_restart or st.session_state.video_thread is None or not st.session_state.video_thread.is_alive():
video_source = str(video_file_path) if video_file_path else None
st.session_state.video_thread = threading.Thread(
target=video_capture_loop,
args=(predictor, frame_queue, video_source),
daemon=True
)
st.session_state.video_thread.start()
logger.info(f"Video thread started with source: {video_source or 'Camera'}")
# Main layout
col1, col2 = st.columns([2, 1])
with col1:
st.subheader("Live Video Feed")
video_placeholder = st.empty()
if not st.session_state.camera_enabled:
video_placeholder.warning("Camera is OFF - Enable camera to start video feed")
else:
try:
frame = frame_queue.get_nowait()
video_placeholder.image(frame, channels='RGB', width='stretch')
except queue.Empty:
video_placeholder.info("Waiting for camera feed...")
with col2:
st.subheader("Active Alerts")
alert_container = st.container()
with alert_container:
for alert, active in predictor.alert_states.items():
if active:
st.markdown(
f'<div class="alert-active">'
f'<strong>{alert}</strong>'
f'<span class="status-badge status-active">ACTIVE</span>'
f'</div>',
unsafe_allow_html=True
)
else:
st.markdown(
f'<div class="alert-normal">'
f'<strong>{alert}</strong>'
f'<span class="status-badge status-normal">Normal</span>'
f'</div>',
unsafe_allow_html=True
)
st.divider()
st.subheader("Statistics")
if predictor.stats['frames_processed'] > 0:
avg_fps = 1.0 / (predictor.stats['total_inference_time'] / predictor.stats['frames_processed'])
col_fps, col_frames = st.columns(2)
with col_fps:
st.metric("FPS", f"{avg_fps:.1f}", delta=f"{avg_fps-15:.1f}" if avg_fps > 15 else None)
with col_frames:
st.metric("Frames", predictor.stats['frames_processed'])
st.metric("Alerts Triggered", predictor.stats['alerts_triggered'])
else:
st.info("Processing frames...")
st.divider()
st.subheader("Recent Logs")
log_container = st.container()
with log_container:
if predictor.logs:
for log in predictor.logs[-5:]:
st.text(log)
else:
st.text("No logs yet...")
# Footer
st.divider()
st.info("**Features**: Drowsiness (PERCLOS) | Distraction (Head Pose) | Driver Absent | Phone Detection | Seatbelt Detection | **100% MediaPipe-Free!**")
# Auto-refresh
time.sleep(0.033)
st.rerun()