import streamlit as st
import cv2
import numpy as np
import threading
import time
import logging
import os
import queue
from datetime import datetime
import yaml
from ultralytics import YOLO
import mediapipe as mp
from roboflow import Roboflow
from sklearn.ensemble import IsolationForest
from transformers import VideoMAEImageProcessor, VideoMAEForVideoClassification
import torch
import onnxruntime as ort  # For quantized inference

# Setup logging for traceability
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler('predictions.log'), logging.StreamHandler()],
)
logger = logging.getLogger(__name__)

# Config (save as config.yaml or inline)
CONFIG = {
    'yolo_base': 'yolov8n.pt',        # COCO pretrained
    'conf_threshold': 0.7,
    'perclos_threshold': 0.35,
    'distraction_duration': 3,        # seconds
    'ttc_threshold': 2.5,             # for FCW
    'speed_limit': 60,                # km/h (simulated)
    'min_tailgate_dist': 5,           # meters, estimated
    'roboflow_api_key': 'gwfyWZIBeb6RIQfbU4ha',  # Replace
    'videomae_model': 'MCG-NJU/videomae-base',
    'inference_skip': 3,              # Frames between inferences
}

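# Optional: as the comment above notes, the config can also live in config.yaml. A minimal
# sketch, assuming a config.yaml with the same keys sits next to this script (hypothetical file):
# if os.path.exists('config.yaml'):
#     with open('config.yaml') as f:
#         CONFIG.update(yaml.safe_load(f))
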
@st.cache_resource
def load_models():
    """Load all pre-trained models efficiently."""
    # YOLO base (vehicles, pedestrians, phones)
    yolo_base = YOLO(CONFIG['yolo_base'])
    # Export to ONNX only if the file doesn't exist (int8 quantization is not supported by the Ultralytics ONNX export)
    onnx_path = 'yolov8n.onnx'
    if not os.path.exists(onnx_path):
        yolo_base.export(format='onnx', simplify=True)  # Simplify for faster inference
        logger.info(f"Exported YOLO to {onnx_path}")
    yolo_session = ort.InferenceSession(onnx_path)

    # Seatbelt (Roboflow pretrained)
    rf = Roboflow(api_key=CONFIG['roboflow_api_key'])
    seatbelt_project = rf.workspace('karan-panja').project('seat-belt-detection-uhqwa')
    seatbelt_model = seatbelt_project.version(1).model

    # VideoMAE for actions (zero-shot) - DISABLED: too heavy for low-spec/Raspberry Pi.
    # JIT scripting fails with transformers, and the model is too large for edge devices.
    # TODO: Replace with lightweight MediaPipe Pose-based action detection.
    processor = None
    videomae = None
    logger.warning("VideoMAE disabled - too heavy for low-spec CPUs. Action recognition will use face analysis only.")

    # MediaPipe for face landmarks / PERCLOS
    mp_face_mesh = mp.solutions.face_mesh
    face_mesh = mp_face_mesh.FaceMesh(static_image_mode=False, max_num_faces=1, refine_landmarks=True)

    # Isolation Forest for anomalies - trained on dummy data for now.
    # TODO: Replace with real training data from normal driving scenarios.
    iso_forest = IsolationForest(contamination=0.1, random_state=42)
    # Dummy "normal" samples (3 features: perclos, phone_action, avg_confidence).
    # Normal values: low perclos (<0.3), no phone (~0), good confidence (>0.5).
    dummy_normal_data = np.random.rand(100, 3) * np.array([0.3, 0.1, 0.3]) + np.array([0.0, 0.0, 0.5])
    iso_forest.fit(dummy_normal_data)
    logger.info("Isolation Forest trained with dummy data (replace with real training data)")

    return yolo_session, seatbelt_model, (processor, videomae), face_mesh, iso_forest

class RealTimePredictor:
    def __init__(self):
        self.yolo_session, self.seatbelt_model, self.videomae, self.face_mesh, self.iso_forest = load_models()
        self.frame_buffer = []  # For temporal analysis (last 10 frames)
        self.alert_states = {alert: False for alert in [
            'Drowsiness', 'Distraction', 'Smoking', 'No Seatbelt', 'Driver Absent',
            'FCW', 'LDW', 'Pedestrian', 'Hard Braking', 'Hard Acceleration', 'Tailgating', 'Overspeed'
        ]}
        self.last_inference = 0
        self.logs = []

    def preprocess_frame(self, frame):
        """Resize for speed (normalization happens later, in detect_objects)."""
        frame = cv2.resize(frame, (640, 480))
        return frame

    def detect_objects(self, frame):
        """YOLO for vehicles, pedestrians, phones."""
        # ONNX inference (fast).
        # YOLO expects a square 640x640 input in BCHW format (batch, channels, height, width);
        # the preprocessed frame is HWC (480, 640, 3), so resize and transpose first.
        frame_h, frame_w = frame.shape[:2]
        yolo_input = cv2.resize(frame, (640, 640))

        # Convert HWC to CHW: (640, 640, 3) -> (3, 640, 640)
        yolo_input = yolo_input.transpose(2, 0, 1)

        # Add batch dimension and normalize: (3, 640, 640) -> (1, 3, 640, 640)
        yolo_input = yolo_input[None].astype(np.float32) / 255.0

        input_name = self.yolo_session.get_inputs()[0].name
        outputs = self.yolo_session.run(None, {input_name: yolo_input})

        # YOLOv8 ONNX output format: (1, 84, 8400) = (batch, features, detections).
        # Features: 4 box coords (center x, center y, width, height) + 80 COCO class scores.
        # Detections: 8400 anchor points.
        output = outputs[0]  # Shape: (1, 84, 8400)

        # Boxes come out as xywh (center format) in 640x640 space; convert to xyxy
        # and scale back to the original frame size so the overlay lines up.
        xywh = output[0, :4, :].transpose()  # (8400, 4)
        scale_x, scale_y = frame_w / 640.0, frame_h / 640.0
        bboxes = np.empty_like(xywh)
        bboxes[:, 0] = (xywh[:, 0] - xywh[:, 2] / 2) * scale_x  # x1
        bboxes[:, 1] = (xywh[:, 1] - xywh[:, 3] / 2) * scale_y  # y1
        bboxes[:, 2] = (xywh[:, 0] + xywh[:, 2] / 2) * scale_x  # x2
        bboxes[:, 3] = (xywh[:, 1] + xywh[:, 3] / 2) * scale_y  # y2

        # Class scores: features 4:84, all detections -> (80, 8400)
        class_scores = output[0, 4:, :]

        # Get class indices and confidences
        classes = np.argmax(class_scores, axis=0)  # (8400,) class indices
        confs = np.max(class_scores, axis=0)       # (8400,) confidence scores

        # Filter by confidence threshold (note: no NMS is applied, so overlapping boxes may remain)
        high_conf = confs > CONFIG['conf_threshold']

        return {'bboxes': bboxes[high_conf], 'confs': confs[high_conf], 'classes': classes[high_conf]}

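    # Optional post-processing sketch: the raw YOLOv8 anchors above often yield several
    # overlapping boxes per object because no non-maximum suppression is applied. A minimal
    # NMS pass using OpenCV (an assumption, not part of the original pipeline) could look like
    # this; it is not called anywhere yet.
    @staticmethod
    def apply_nms(detections, iou_threshold=0.45):
        """Keep only the highest-confidence box among heavily overlapping detections."""
        if len(detections['bboxes']) == 0:
            return detections
        # cv2.dnn.NMSBoxes expects [x, y, w, h] boxes and plain Python floats
        boxes = [[float(x1), float(y1), float(x2 - x1), float(y2 - y1)]
                 for x1, y1, x2, y2 in detections['bboxes']]
        scores = [float(c) for c in detections['confs']]
        keep = cv2.dnn.NMSBoxes(boxes, scores, CONFIG['conf_threshold'], iou_threshold)
        keep = np.array(keep).flatten()
        return {key: value[keep] for key, value in detections.items()}
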
    def detect_seatbelt(self, frame):
        """Roboflow seatbelt detection."""
        result = self.seatbelt_model.predict(frame, confidence=CONFIG['conf_threshold']).json()
        predictions = result['predictions']
        has_belt = any(p['class'] == 'with_mask' for p in predictions)  # Adapt class name to the model's labels
        conf = predictions[0]['confidence'] if predictions else 0
        return has_belt, conf

    def analyze_face(self, frame):
        """MediaPipe PERCLOS, head pose, and driver-absence check."""
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.face_mesh.process(rgb)
        if not results.multi_face_landmarks:
            return {'perclos': 0, 'head_pose': [0, 0, 0], 'absent': True, 'conf': 0}

        landmarks = results.multi_face_landmarks[0].landmark
        # PERCLOS (eye-closure proxy) - a rough heuristic from a few eyelid landmarks,
        # not the textbook eye-aspect-ratio formulation
        left_eye = np.mean([landmarks[i].y for i in [33, 7, 163, 144]])
        right_eye = np.mean([landmarks[i].y for i in [362, 382, 381, 380]])
        ear = (landmarks[10].y + landmarks[152].y) / 2  # Face mid-height as a crude reference
        eps = 1e-6  # Avoid division by zero when the reference coincides with an eye line
        perclos = max((left_eye - ear) / (ear - min(left_eye, ear) + eps),
                      (right_eye - ear) / (ear - min(right_eye, ear) + eps))
        # Head pose (simplified yaw proxy for looking away)
        yaw = (landmarks[454].x - landmarks[323].x) * 100  # Rough estimate
        return {'perclos': perclos, 'head_pose': [0, yaw, 0], 'absent': False, 'conf': 0.9}

    def recognize_actions(self, buffer):
        """Action recognition - VideoMAE disabled, placeholder for now."""
        # TODO: Implement lightweight action detection using MediaPipe Pose
        # (a hedged sketch follows this method).
        # For now, return zeros (actions are inferred via face analysis in validate_alerts).
        return {'yawn': 0, 'phone': 0, 'look_away': 0}

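    # A minimal sketch of the MediaPipe Pose idea from the TODO above (assumptions: "wrist
    # close to the same-side ear" is an acceptable proxy for phone use, the 0.15 threshold is
    # illustrative, and pose_estimator is an mp.solutions.pose.Pose() instance created by the
    # caller). It is not wired into the pipeline.
    def recognize_actions_pose(self, frame, pose_estimator):
        """Rough phone-use proxy from body landmarks."""
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = pose_estimator.process(rgb)
        if not results.pose_landmarks:
            return {'yawn': 0, 'phone': 0, 'look_away': 0}
        lm = results.pose_landmarks.landmark
        P = mp.solutions.pose.PoseLandmark

        def dist(a, b):
            return np.hypot(lm[a].x - lm[b].x, lm[a].y - lm[b].y)

        phone = 1 if min(dist(P.LEFT_WRIST, P.LEFT_EAR), dist(P.RIGHT_WRIST, P.RIGHT_EAR)) < 0.15 else 0
        return {'yawn': 0, 'phone': phone, 'look_away': 0}
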
    def optical_flow(self, prev_frame, curr_frame):
        """OpenCV dense optical flow for speed, braking, and acceleration estimation."""
        if prev_frame.shape[:2] != curr_frame.shape[:2]:
            # Frames must match in size for Farneback flow
            prev_frame = cv2.resize(prev_frame, (curr_frame.shape[1], curr_frame.shape[0]))
        prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
        curr_gray = cv2.cvtColor(curr_frame, cv2.COLOR_BGR2GRAY)
        # Farneback dense optical flow (correct API for full-frame flow)
        flow = cv2.calcOpticalFlowFarneback(prev_gray, curr_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        # Magnitude of the flow vectors
        magnitude = np.sqrt(flow[..., 0]**2 + flow[..., 1]**2)
        return np.mean(magnitude)  # High = accel/braking; est. speed ~ magnitude * scale (needs calibration)

    def estimate_distance(self, bboxes):
        """Simple bbox-size heuristic for tailgating/FCW distance estimation (calibration needed)."""
        if len(bboxes) == 0:
            return float('inf')
        areas = (bboxes[:, 2] - bboxes[:, 0]) * (bboxes[:, 3] - bboxes[:, 1])
        return 10 / np.sqrt(np.max(areas))  # Inverse sqrt of area as a rough distance proxy

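    # Calibration sketch for the distance estimate above (assumptions: a typical car width of
    # about 1.8 m and a camera focal length in pixels from a one-off measurement - both values
    # are illustrative, not measured):
    #   distance_m = (real_width_m * focal_length_px) / bbox_width_px   (pinhole-camera model)
    # e.g. focal_length_px ~ 700 and a 140 px wide car box give (1.8 * 700) / 140 = 9 m.
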
    def detect_anomaly(self, features):
        """Flag unusual feature combinations (low-confidence situations)."""
        pred = self.iso_forest.predict(features.reshape(1, -1))[0]
        return 1 if pred == -1 else 0

    def validate_alerts(self, frame, prev_frame, detections, face_data, actions, seatbelt, flow_mag, buffer):
        """Rule-based validation for all alerts."""
        features = np.array([
            face_data['perclos'],
            actions['phone'],
            detections['confs'].mean() if len(detections['confs']) else 0,
        ])
        anomaly = self.detect_anomaly(features)

        results = {}
        timestamp = datetime.now().isoformat()

        # DSMS
        drowsy = (face_data['perclos'] > CONFIG['perclos_threshold']) and (actions['yawn'] > CONFIG['conf_threshold'])
        results['Drowsiness'] = drowsy and not anomaly
        distraction = (actions['phone'] > CONFIG['conf_threshold']) or (abs(face_data['head_pose'][1]) > 20)
        results['Distraction'] = distraction and not anomaly
        # Smoking: COCO has no cigarette class, so there is nothing reliable to key off yet.
        # Placeholder that never fires - TODO: plug in a dedicated smoking detector.
        results['Smoking'] = False
        results['No Seatbelt'] = not seatbelt[0] and seatbelt[1] > CONFIG['conf_threshold']
        results['Driver Absent'] = face_data['absent']

        # ADAS (heuristics)
        vehicles = sum(1 for c in detections['classes'] if c == 2)  # COCO 'car' class
        peds = sum(1 for c in detections['classes'] if c == 0)      # COCO 'person' class
        dist_est = self.estimate_distance(detections['bboxes'][detections['classes'] == 2])
        ttc = dist_est / (flow_mag + 1e-5) if flow_mag > 0 else float('inf')  # Rough time-to-collision
        results['FCW'] = (ttc < CONFIG['ttc_threshold']) and vehicles > 0
        results['Tailgating'] = (dist_est < CONFIG['min_tailgate_dist']) and vehicles > 0
        results['Pedestrian'] = peds > 0 and detections['confs'][detections['classes'] == 0].max() > CONFIG['conf_threshold']

        # LDW: simple edge/line detection for lane markings (OpenCV)
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150)
        lines = cv2.HoughLinesP(edges, 1, np.pi / 180, 100, minLineLength=100)
        in_lane = len(lines) > 2 if lines is not None else False  # Basic: several strong lines = lane visible
        results['LDW'] = not in_lane

        # Braking/acceleration/overspeed via flow magnitude.
        # Note: flow_mag is a scalar (mean magnitude); separating braking from acceleration
        # properly needs the flow direction - TODO: add direction analysis.
        speed_est = flow_mag * 0.1           # km/h proxy (needs calibration)
        braking = flow_mag > 15              # High magnitude suggests a sudden change
        accel = 12 < flow_mag < 15           # Moderate-high magnitude
        results['Hard Braking'] = braking
        results['Hard Acceleration'] = accel
        results['Overspeed'] = speed_est > CONFIG['speed_limit']

        # Log everything for traceability
        log_entry = f"{timestamp} | Features: {features} | Anomaly: {anomaly} | Alerts: {results}"
        logger.info(log_entry)
        self.logs.append(log_entry[-100:])  # Last 100 chars for display

        # Update states (sustain once triggered, clear after a quiet period)
        for alert, triggered in results.items():
            if triggered:
                self.alert_states[alert] = True
            elif time.time() - self.last_inference > CONFIG['distraction_duration']:
                self.alert_states[alert] = False

        return results

    def run_inference(self, frame, prev_frame, buffer, frame_idx):
        """Full pipeline every N frames."""
        if frame_idx % CONFIG['inference_skip'] != 0:
            return {}, frame
        start = time.time()

        frame = self.preprocess_frame(frame)
        detections = self.detect_objects(frame)
        seatbelt = self.detect_seatbelt(frame)
        face_data = self.analyze_face(frame)
        buffer.append(frame)
        del buffer[:-10]  # Trim in place so the caller's buffer keeps only the last 10 frames
        actions = self.recognize_actions(buffer)
        flow_mag = self.optical_flow(prev_frame, frame) if prev_frame is not None else 0

        alerts = self.validate_alerts(frame, prev_frame, detections, face_data, actions, seatbelt, flow_mag, buffer)
        self.last_inference = time.time()

        # Overlay detections (label shows the COCO class index and confidence)
        for i, bbox in enumerate(detections['bboxes']):
            x1, y1, x2, y2 = map(int, bbox)
            label = f"{detections['classes'][i]}:{detections['confs'][i]:.2f}"
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        # Alert texts
        for alert, active in self.alert_states.items():
            if active:
                cv2.putText(frame, f"ALERT: {alert}", (10, 30 + list(self.alert_states.keys()).index(alert) * 20),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

        logger.info(f"Inference time: {time.time() - start:.2f}s")
        return alerts, frame

def video_loop(predictor, frame_queue):
    """Threaded capture - puts frames in a queue for the main thread to display."""
    cap = cv2.VideoCapture(0)  # Webcam; for RPi/Jetson use a GStreamer pipeline string, e.g. 'nvarguscamerasrc ! video/x-raw(memory:NVMM), width=640, height=480, framerate=30/1 ! nvvidconv ! video/x-raw, format=BGRx ! videoconvert ! video/x-raw, format=BGR ! appsink'
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    cap.set(cv2.CAP_PROP_FPS, 30)

    prev_frame = None
    buffer = []
    frame_idx = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            time.sleep(0.1)
            continue

        raw_frame = frame.copy()  # Keep an un-annotated copy for the next optical-flow step
        alerts, frame = predictor.run_inference(frame, prev_frame, buffer, frame_idx)
        prev_frame = raw_frame
        frame_idx += 1

        # BGR to RGB for Streamlit
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Put the frame in the queue (non-blocking, drop the oldest frame if the queue is full)
        try:
            frame_queue.put_nowait(frame_rgb)
        except queue.Full:
            try:
                frame_queue.get_nowait()
                frame_queue.put_nowait(frame_rgb)
            except queue.Empty:
                pass

        time.sleep(0.033)  # ~30 FPS cap

# Streamlit UI
st.title("🚗 Real-Time DSMS/ADAS Validator")
st.sidebar.title("Active Alerts")

# Initialize predictor
if 'predictor' not in st.session_state:
    st.session_state.predictor = RealTimePredictor()
    st.session_state.frame_queue = queue.Queue(maxsize=2)  # Small queue to avoid lag
    st.session_state.video_thread = None

predictor = st.session_state.predictor
frame_queue = st.session_state.frame_queue

# Start the video thread if it is not running
if st.session_state.video_thread is None or not st.session_state.video_thread.is_alive():
    st.session_state.video_thread = threading.Thread(
        target=video_loop,
        args=(predictor, frame_queue),
        daemon=True
    )
    st.session_state.video_thread.start()

# Main video display
video_placeholder = st.empty()

# Get the latest frame from the queue and display it
try:
    frame = frame_queue.get_nowait()
    video_placeholder.image(frame, channels='RGB', use_container_width=True)
except queue.Empty:
    # No frame available yet, show a placeholder
    video_placeholder.info("Waiting for camera feed...")

# Sidebar: alerts & logs
with st.sidebar:
    st.subheader("Alerts")
    for alert, active in predictor.alert_states.items():
        st.write(f"{'🔴' if active else '🟢'} {alert}")

    st.subheader("Recent Logs (Traceable)")
    for log in predictor.logs[-10:]:
        st.text(log)

st.info("👆 Alerts trigger only on high conf + rules. Check `predictions.log` for full traces. Calibrate distances/speeds for your setup.")

# Auto-refresh to update the video feed
time.sleep(0.033)  # ~30 FPS
st.rerun()