# CP_AUTOMATION/scripts/load_test_e2e_assessment_advanced.py
# 2025-12-12 19:54:54 +05:30

# 849 lines
# 34 KiB
# Python

"""
Advanced Load Testing Script - End-to-End Assessment Flow

World-class load testing with:
- Smart browser management (prevents system crashes)
- Progress persistence (resume from checkpoint)
- Real-time performance tracking
- Lightweight metrics collection
- Advanced error handling

Usage:
    python scripts/load_test_e2e_assessment_advanced.py --students 100 --csv students.csv

Features:
- Progress persistence (save every N students)
- Resume capability (skip completed students)
- Real-time metrics dashboard
- Smart browser limiting (visible mode)
- Performance analytics
- Resource monitoring
"""
import sys
import argparse
import csv
import json
import time
import threading
from pathlib import Path
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional
from collections import defaultdict
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import WebDriverException, TimeoutException
from pages.login_page import LoginPage
from pages.mandatory_reset_page import MandatoryResetPage
from pages.profile_incomplete_page import ProfileIncompletePage
from pages.profile_editor_page import ProfileEditorPage
from pages.assessments_page import AssessmentsPage
from pages.domains_page import DomainsPage
from pages.domain_assessment_page import DomainAssessmentPage
from pages.domain_feedback_page import DomainFeedbackPage
from utils.question_answer_helper import QuestionAnswerHelper
from utils.password_tracker import password_tracker
from utils.student_data_manager import student_data_manager
from utils.smart_wait_optimizer import SmartWaitOptimizer
from utils.randomized_wait import RandomizedWait
from utils.wait_helpers import WaitHelpers
from config.config import BASE_URL, TEST_NEW_PASSWORD
# Shared run state. Worker threads update these containers while holding
# one of the two locks below.
results_lock = threading.Lock()
progress_lock = threading.Lock()

# CPIDs that finished in this run (or were restored from a checkpoint).
completed_students = set()

# Run-wide outcome counters plus the collected error records.
results = dict(
    total=0,
    success=0,
    failed=0,
    skipped=0,
    errors=[],
    start_time=None,
    last_update=None,
)

# Timing samples and per-step success/failure counters.
performance_metrics = dict(
    step_times=defaultdict(list),  # step name -> list of durations
    total_durations=[],
    questions_answered=[],
    step_success_rates=defaultdict(lambda: dict(success=0, failed=0)),
)

# Checkpoint file lives in <project root>/reports; create the folder up front.
PROGRESS_FILE = Path(__file__).parent.parent / "reports" / "load_test_progress.json"
PROGRESS_FILE.parent.mkdir(parents=True, exist_ok=True)
class ProgressTracker:
    """Lightweight progress tracking and persistence.

    All methods are static and work on the module-level shared state
    (``results``, ``performance_metrics``, ``completed_students``) and on the
    checkpoint file ``PROGRESS_FILE``.
    """

    @staticmethod
    def save_progress():
        """Save current progress to ``PROGRESS_FILE``.

        The snapshot holds the result counters, the last 100 samples of each
        timing series, the per-step success counters and the completed CPIDs.
        The data is written to a temporary sibling file and then renamed over
        the checkpoint, so an interrupted write cannot leave a truncated,
        unreadable checkpoint behind. Failures are printed, never raised.
        """
        with progress_lock:
            progress_data = {
                'timestamp': datetime.now().isoformat(),
                'results': results.copy(),
                'performance': {
                    # Only the most recent samples are kept to bound file size.
                    'step_times': {k: v[-100:] for k, v in performance_metrics['step_times'].items()},
                    'total_durations': performance_metrics['total_durations'][-100:],
                    'questions_answered': performance_metrics['questions_answered'][-100:],
                    'step_success_rates': dict(performance_metrics['step_success_rates'])
                },
                'completed_students': ProgressTracker.get_completed_students()
            }
            tmp_file = PROGRESS_FILE.with_name(PROGRESS_FILE.name + '.tmp')
            try:
                with open(tmp_file, 'w', encoding='utf-8') as f:
                    json.dump(progress_data, f, indent=2)
                # Rename only after a complete write so readers never see a
                # half-written checkpoint.
                tmp_file.replace(PROGRESS_FILE)
            except Exception as e:
                print(f"⚠️ Error saving progress: {e}")

    @staticmethod
    def load_progress() -> Dict:
        """Load previous progress from ``PROGRESS_FILE``.

        Returns:
            The decoded checkpoint dict, or ``{}`` when the file is missing,
            unreadable, not valid JSON, or not a JSON object.
        """
        if not PROGRESS_FILE.exists():
            return {}
        try:
            with open(PROGRESS_FILE, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except Exception as e:
            print(f"⚠️ Error loading progress: {e}")
            return {}
        # Callers use dict access on the result; anything else is unusable.
        if not isinstance(data, dict):
            print(f"⚠️ Error loading progress: unexpected content in {PROGRESS_FILE}")
            return {}
        return data

    @staticmethod
    def get_completed_students() -> List[str]:
        """Return the completed student CPIDs as a sorted list.

        Sorting keeps the checkpoint file stable between saves instead of
        following arbitrary set iteration order.
        """
        return sorted(completed_students, key=str)

    @staticmethod
    def print_real_time_metrics():
        """Print overall and per-step performance metrics to stdout.

        Prints nothing until at least one student has been counted in
        ``results['total']``.
        """
        with progress_lock:
            total = results['total']
            if total == 0:
                return
            print("\n" + "=" * 80)
            print("📊 REAL-TIME METRICS")
            print("=" * 80)

            # Overall stats
            success_rate = (results['success'] / total) * 100
            print(f"✅ Success Rate: {success_rate:.1f}% ({results['success']}/{results['total']})")

            # Average durations
            durations = performance_metrics['total_durations']
            if durations:
                avg_duration = sum(durations) / len(durations)
                print(f"⏱️ Average Duration: {avg_duration:.2f}s")

            # Step performance
            print("\n📈 Step Performance:")
            for step, times in performance_metrics['step_times'].items():
                if not times:
                    continue
                avg_time = sum(times) / len(times)
                step_stats = performance_metrics['step_success_rates'][step]
                total_attempts = step_stats['success'] + step_stats['failed']
                rate = (step_stats['success'] / total_attempts * 100) if total_attempts > 0 else 0
                print(f" {step}: {avg_time:.2f}s avg, {rate:.1f}% success ({step_stats['success']}/{total_attempts})")

            # Questions answered
            answered = performance_metrics['questions_answered']
            if answered:
                avg_questions = sum(answered) / len(answered)
                print(f"\n❓ Average Questions Answered: {avg_questions:.1f}")

            # Throughput since the recorded start time
            if results['start_time']:
                elapsed = time.time() - results['start_time']
                rate = total / elapsed if elapsed > 0 else 0
                print(f"\n⚡ Rate: {rate:.2f} students/second")
            print("=" * 80)
class LoadTestStudent:
    """Handles end-to-end flow for a single student with performance tracking.

    One instance drives one browser session through login, optional password
    reset, optional profile completion and one domain assessment. Per-step
    timings are stored in ``self.result`` and appended to the module-level
    ``performance_metrics``.
    """

    def __init__(self, cpid: str, student_data: Dict, headless: bool = True):
        """Store the student identity and prepare an empty result record.

        Args:
            cpid: Student identifier used as the login name.
            student_data: CSV row for the student; ``'password'``,
                ``'First Name'`` and ``'Last Name'`` are read from it.
            headless: When true, Chrome is started with ``--headless=new``.
        """
        self.cpid = cpid
        self.student_data = student_data
        self.headless = headless
        self.driver = None
        self.step_times = {}
        self.actual_password_used = None  # Track which password was actually used for login
        self.result = {
            'cpid': cpid,
            'name': f"{student_data.get('First Name', '')} {student_data.get('Last Name', '')}",
            'status': 'pending',
            'steps_completed': [],
            'step_times': {},
            'error': None,
            'duration': 0,
            'questions_answered': 0
        }

    # ------------------------------------------------------------------
    # Timing / bookkeeping helpers
    # ------------------------------------------------------------------
    def _track_step(self, step_name: str, func):
        """Run ``func`` and record its duration and success/failure.

        A truthy return value counts as success, a falsy one as failure.
        An exception counts as failure and is re-raised after recording.
        """
        started = time.time()
        try:
            outcome = func()
        except Exception:
            self._record_step(step_name, time.time() - started, succeeded=False)
            raise
        self._record_step(step_name, time.time() - started, succeeded=bool(outcome))
        return outcome

    def _record_step(self, step_name: str, duration: float, succeeded: bool):
        """Store one step timing locally and in the shared metrics."""
        self.step_times[step_name] = duration
        # Recorded for failures too, so the per-student result is complete.
        self.result['step_times'][step_name] = duration
        counter = 'success' if succeeded else 'failed'
        with progress_lock:
            performance_metrics['step_times'][step_name].append(duration)
            performance_metrics['step_success_rates'][step_name][counter] += 1

    def _quit_driver(self):
        """Close the browser if one is open; cleanup errors are ignored."""
        if not self.driver:
            return
        try:
            self.driver.quit()
        except Exception:
            pass  # best-effort cleanup; nothing useful can be done here
        finally:
            self.driver = None

    # ------------------------------------------------------------------
    # Browser setup
    # ------------------------------------------------------------------
    def setup_driver(self, max_retries=3):
        """Create and configure WebDriver with retry logic for high concurrency.

        Returns:
            True when a driver was created; False after ``max_retries``
            failed attempts, with the reason stored in ``self.result['error']``.
        """
        options = Options()
        if self.headless:
            options.add_argument('--headless=new')
        for flag in (
            '--no-sandbox',
            '--disable-dev-shm-usage',
            '--disable-gpu',
            '--window-size=1920,1080',
            # Additional stability options for high concurrency
            '--disable-software-rasterizer',
            '--disable-extensions',
            '--disable-background-networking',
            '--disable-sync',
            '--disable-default-apps',
            '--disable-background-timer-throttling',
            '--disable-renderer-backgrounding',
            '--disable-backgrounding-occluded-windows',
        ):
            options.add_argument(flag)

        last_error = None
        for attempt in range(max_retries):
            try:
                self.driver = webdriver.Chrome(options=options)
                self.driver.implicitly_wait(5)
                return True
            except Exception as e:
                last_error = e
                # Do not leave a half-configured browser running before retrying.
                self._quit_driver()
                if attempt < max_retries - 1:
                    time.sleep(2)  # Wait before retry (longer for resource contention)
        self.result['error'] = f"Driver setup failed after {max_retries} attempts: {str(last_error)[:200]}"
        return False

    # ------------------------------------------------------------------
    # Step 1: login
    # ------------------------------------------------------------------
    def step_login(self) -> bool:
        """Step 1: Login with smart password handling"""
        return self._track_step('login', self._do_login)

    def _submit_credentials(self, login_page, password):
        """Fill in CPID and password, submit, and wait for an outcome.

        The wait ends when the URL contains ``/dashboard`` or ``/student``,
        the page source mentions ``mandatory_reset``, or the login page shows
        an error. A timeout (or any error while polling) is ignored; the
        caller inspects the page state afterwards.
        """
        from selenium.webdriver.support.ui import WebDriverWait
        from config.config import MEDIUM_WAIT

        login_page.enter_identifier(self.cpid)
        login_page.enter_password(password)
        login_page.click_submit()
        try:
            WebDriverWait(self.driver, MEDIUM_WAIT).until(
                lambda d: "/dashboard" in d.current_url
                or "/student" in d.current_url
                or "mandatory_reset" in d.page_source.lower()
                or login_page.is_error_visible()
            )
        except Exception:
            pass  # outcome is decided by _login_rejected()

    def _login_rejected(self, login_page) -> bool:
        """Return True when still on the login page with a visible error."""
        current_url = self.driver.current_url
        on_login_page = (
            current_url.rstrip("/") == BASE_URL.rstrip("/")
            or "login" in current_url.lower()
        )
        return on_login_page and login_page.is_error_visible()

    def _do_login(self) -> bool:
        """
        Login with explicit password strategy:
        1. Try Excel password FIRST (from CSV)
        2. If fails, fallback to Admin@123 (TEST_NEW_PASSWORD)

        Raises:
            RuntimeError: when both passwords are rejected. The message does
                not contain the passwords, because error text is persisted
                in the progress file.
        """
        login_page = LoginPage(self.driver)
        login_page.navigate()

        # Get Excel password from student data
        excel_password = self.student_data.get('password')
        if not excel_password:
            # No password in CSV - use Admin@123 directly
            print(f"⚠️ No Excel password found for {self.cpid}, using Admin@123")
            excel_password = TEST_NEW_PASSWORD

        print(f"🔑 Trying Excel password first for {self.cpid}")
        self._submit_credentials(login_page, excel_password)
        actual_password_used = excel_password

        if self._login_rejected(login_page):
            print("🔄 Excel password failed, trying Admin@123 (reset password)...")
            self._submit_credentials(login_page, TEST_NEW_PASSWORD)
            if self._login_rejected(login_page):
                error_msg = login_page.get_error_message()
                raise RuntimeError(
                    f"Login failed with both passwords (Excel password and Admin@123) "
                    f"for {self.cpid}. Error: {error_msg}"
                )
            actual_password_used = TEST_NEW_PASSWORD

        # Store which password was actually used (for password reset step)
        self.actual_password_used = actual_password_used
        # Update password tracker if Admin@123 was used
        if actual_password_used == TEST_NEW_PASSWORD:
            password_tracker.update_password(self.cpid, TEST_NEW_PASSWORD)
        SmartWaitOptimizer.smart_wait_for_dashboard(self.driver, self.cpid, actual_password_used)
        self.result['steps_completed'].append('login')
        return True

    # ------------------------------------------------------------------
    # Step 2: password reset
    # ------------------------------------------------------------------
    def step_password_reset(self) -> bool:
        """Step 2: Reset password if needed"""
        return self._track_step('password_reset', self._do_password_reset)

    def _do_password_reset(self) -> bool:
        """
        Reset password if needed.
        Uses the password that was actually used during login.
        """
        reset_page = MandatoryResetPage(self.driver)

        # If Admin@123 was used, password is already reset - skip
        if self.actual_password_used == TEST_NEW_PASSWORD:
            self.result['steps_completed'].append('password_reset_skipped')
            return True

        # Excel password was used - check if reset is needed
        reset_needed = (
            SmartWaitOptimizer.should_check_password_reset(self.cpid, self.actual_password_used)
            and reset_page.is_modal_present()
        )
        if not reset_needed:
            self.result['steps_completed'].append('password_reset_skipped')
            return True

        # Use Excel password (the one that worked) for reset
        current_password = self.actual_password_used or self.student_data.get('password')
        reset_page.reset_password(
            current_password=current_password,
            new_password=TEST_NEW_PASSWORD,
            confirm_password=TEST_NEW_PASSWORD,
            student_cpid=self.cpid
        )
        # Update password tracker
        password_tracker.update_password(self.cpid, TEST_NEW_PASSWORD)
        time.sleep(SmartWaitOptimizer.ANIMATION_NORMAL + SmartWaitOptimizer.SAFETY_PADDING)
        self.result['steps_completed'].append('password_reset')
        return True

    # ------------------------------------------------------------------
    # Step 3: profile completion
    # ------------------------------------------------------------------
    def step_profile_completion(self) -> bool:
        """Step 3: Complete profile if needed"""
        return self._track_step('profile_completion', self._do_profile_completion)

    def _do_profile_completion(self) -> bool:
        """Complete the profile when the "profile incomplete" modal is shown."""
        profile_incomplete = ProfileIncompletePage(self.driver)
        completion_needed = (
            SmartWaitOptimizer.should_check_profile_incomplete(self.driver)
            and profile_incomplete.is_modal_present()
        )
        if not completion_needed:
            self.result['steps_completed'].append('profile_completion_skipped')
            return True

        pause = SmartWaitOptimizer.ANIMATION_NORMAL + SmartWaitOptimizer.SAFETY_PADDING
        profile_incomplete.click_complete()
        time.sleep(pause)
        profile_editor = ProfileEditorPage(self.driver)
        profile_editor.wait_for_page_load()
        profile_editor.complete_profile_to_100(student_cpid=self.cpid)
        time.sleep(pause)
        self.result['steps_completed'].append('profile_completion')
        return True

    # ------------------------------------------------------------------
    # Step 4: domain assessment
    # ------------------------------------------------------------------
    def step_complete_domain_assessment(self) -> bool:
        """Step 4: Complete ONE domain assessment"""
        return self._track_step('domain_assessment', self._do_domain_assessment)

    def _open_first_unlocked_domain(self):
        """Start the first assessment and open its first unlocked domain.

        Raises:
            Exception: when there are no assessments, no domains, or no
                unlocked domain.
        """
        # Navigate to assessments
        assessments_page = AssessmentsPage(self.driver)
        assessments_page.navigate()
        assessments_page.wait_for_page_load()
        RandomizedWait.wait_for_page_load('navigation')

        # Get first assessment
        assessment_ids = assessments_page.get_assessment_ids()
        if not assessment_ids:
            raise Exception("No assessments available")
        assessments_page.click_begin_assessment(assessment_ids[0])
        RandomizedWait.wait_for_page_load('navigation')

        # Navigate to domains
        domains_page = DomainsPage(self.driver)
        domains_page.wait_for_page_load()
        RandomizedWait.wait_for_page_load('initial')

        # Get first unlocked domain
        domain_ids = domains_page.get_all_domain_ids()
        if not domain_ids:
            raise Exception("No domains available")
        unlocked_domain_id = next(
            (domain_id for domain_id in domain_ids if domains_page.is_domain_unlocked(domain_id)),
            None,
        )
        if not unlocked_domain_id:
            raise Exception("No unlocked domains available")
        domains_page.click_start_domain(unlocked_domain_id)
        RandomizedWait.wait_for_page_load('initial')

    @staticmethod
    def _click_next_if_enabled(domain_assessment_page):
        """Click the Next button when it is present and enabled (best effort)."""
        try:
            next_button = domain_assessment_page.find_element(domain_assessment_page.NEXT_BUTTON)
            if next_button.is_enabled():
                next_button.click()
                RandomizedWait.wait_for_navigation('next')
        except Exception:
            pass  # a missing/stale button just means we cannot advance now

    def _answer_questions(self, domain_assessment_page, max_questions: int = 100) -> int:
        """Answer questions until submit succeeds or a limit is reached.

        The loop stops when the Submit button was clicked, when
        ``max_questions`` answers were given, after 3 consecutive passes
        without a question id (or with an unexpected error), or after
        ``3 * max_questions`` passes in total. The last limit prevents an
        endless loop when a question can neither be answered nor skipped.

        Returns:
            Number of questions answered successfully.
        """
        question_helper = QuestionAnswerHelper(self.driver)
        questions_answered = 0
        consecutive_failures = 0
        max_consecutive_failures = 3
        max_iterations = max_questions * 3

        for _ in range(max_iterations):
            if questions_answered >= max_questions:
                break
            try:
                question_id = question_helper.get_question_id()
                if not question_id:
                    consecutive_failures += 1
                    if consecutive_failures >= max_consecutive_failures:
                        break
                    self._click_next_if_enabled(domain_assessment_page)
                    continue
                consecutive_failures = 0

                question_type = question_helper.get_question_type(question_id)
                if not question_type:
                    # Scroll the question into view and try once more.
                    try:
                        question_element = self.driver.find_element(
                            'css selector',
                            f"[data-testid='domain_question__{question_id}']"
                        )
                        self.driver.execute_script("arguments[0].scrollIntoView(true);", question_element)
                        time.sleep(1)
                        question_type = question_helper.get_question_type(question_id)
                    except Exception:
                        pass
                if not question_type:
                    self._click_next_if_enabled(domain_assessment_page)
                    continue

                try:
                    question_helper.answer_question(question_id=question_id, question_type=question_type)
                    RandomizedWait.wait_for_question_answer(question_type)
                    questions_answered += 1
                except Exception as e:
                    print(f" ⚠️ Error answering question {question_id}: {e}")

                # Check submit button
                try:
                    submit_button = domain_assessment_page.find_element(domain_assessment_page.SUBMIT_BUTTON)
                    if submit_button.is_enabled():
                        submit_button.click()
                        RandomizedWait.wait_for_submission('submit')
                        break
                except Exception:
                    pass

                # Click next
                self._click_next_if_enabled(domain_assessment_page)
            except Exception:
                consecutive_failures += 1
                if consecutive_failures >= max_consecutive_failures:
                    break
        return questions_answered

    def _do_domain_assessment(self) -> bool:
        """Open one domain, answer its questions, submit and give feedback."""
        self._open_first_unlocked_domain()

        # Get domain assessment page
        domain_assessment_page = DomainAssessmentPage(self.driver)
        domain_assessment_page.wait_for_page_load()
        RandomizedWait.wait_for_page_load('initial')

        # Dismiss modals
        if domain_assessment_page.is_instructions_modal_present():
            domain_assessment_page.dismiss_instructions_modal()
            RandomizedWait.wait_for_page_load('modal')
        domain_assessment_page.dismiss_guidance()

        # Answer questions
        questions_answered = self._answer_questions(domain_assessment_page)

        # Handle submit modal
        try:
            if domain_assessment_page.is_submit_modal_present():
                domain_assessment_page.confirm_submit()
                RandomizedWait.wait_for_submission('confirm')
        except Exception:
            pass  # the modal is optional; its absence is not an error

        # Handle feedback
        try:
            feedback_page = DomainFeedbackPage(self.driver)
            if feedback_page.is_modal_present():
                feedback_page.submit_feedback(
                    question1_answer='yes',
                    question1_justification='Automated test response',
                    question2_answer='This is an automated test response for load testing purposes.'
                )
                RandomizedWait.wait_for_submission('feedback')
        except Exception as e:
            print(f" ⚠️ Error handling feedback: {e}")

        self.result['steps_completed'].append('domain_assessment')
        self.result['questions_answered'] = questions_answered
        # Update global metrics
        with progress_lock:
            performance_metrics['questions_answered'].append(questions_answered)
        return True

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    def run(self) -> Dict:
        """Run complete end-to-end flow for this student.

        Steps run in order and stop at the first one that returns a falsy
        value or raises. The browser is always closed, and the total
        duration is always recorded, whatever the outcome.

        Returns:
            ``self.result`` with ``status`` set to ``'success'`` or
            ``'failed'`` and ``duration`` filled in.
        """
        start_time = time.time()
        steps = (
            self.setup_driver,
            self.step_login,
            self.step_password_reset,
            self.step_profile_completion,
            self.step_complete_domain_assessment,
        )
        try:
            for step in steps:
                if not step():
                    self.result['status'] = 'failed'
                    break
            else:
                self.result['status'] = 'success'
        except Exception as e:
            self.result['status'] = 'failed'
            self.result['error'] = f"Unexpected error: {str(e)}"
        finally:
            self._quit_driver()
            self.result['duration'] = time.time() - start_time
            # Update global metrics
            with progress_lock:
                performance_metrics['total_durations'].append(self.result['duration'])
        return self.result
def load_students_from_csv(csv_path: str) -> List[Dict]:
    """Load students from CSV file.

    The CPID is taken from the first non-blank column among
    ``Student CPID``, ``student_cpid``, ``Student_CPID``, ``cpid``, ``CPID``;
    the password from ``Password``, ``password``, ``PASSWORD``. Rows without
    a CPID are skipped.

    Args:
        csv_path: Path of the CSV file. A UTF-8 byte-order mark (as written
            by Excel) is accepted and ignored.

    Returns:
        A list of ``{'cpid': str, 'data': dict}`` entries, where ``data`` is
        the raw row plus a normalized ``'password'`` key (stripped string or
        ``None``). Returns ``[]`` when the file cannot be read or parsed; the
        error and traceback are printed.
    """
    cpid_columns = ('Student CPID', 'student_cpid', 'Student_CPID', 'cpid', 'CPID')
    password_columns = ('Password', 'password', 'PASSWORD')

    def first_value(row, columns):
        """Return the first non-blank, stripped value among ``columns``."""
        for column in columns:
            value = (row.get(column) or '').strip()
            if value:
                return value
        return None

    students = []
    try:
        # utf-8-sig drops a leading BOM that would otherwise become part of
        # the first header name; newline='' is what the csv module expects.
        with open(csv_path, 'r', encoding='utf-8-sig', newline='') as f:
            for row in csv.DictReader(f):
                cpid = first_value(row, cpid_columns)
                if not cpid:
                    continue
                students.append({
                    'cpid': cpid,
                    'data': {
                        **row,
                        'password': first_value(row, password_columns)
                    }
                })
    except Exception as e:
        print(f"❌ Error loading CSV: {e}")
        import traceback
        traceback.print_exc()
        return []
    return students
def run_student_flow(student_info: Dict, headless: bool = True, save_interval: int = 5) -> Dict:
    """Run flow for a single student and fold the outcome into ``results``.

    Args:
        student_info: ``{'cpid': ..., 'data': {...}}`` as produced by
            ``load_students_from_csv``.
        headless: Passed through to ``LoadTestStudent``.
        save_interval: Save progress and print metrics every N processed
            students; values below 1 disable the periodic save.

    Returns:
        The per-student result dict returned by ``LoadTestStudent.run()``.
    """
    cpid = student_info['cpid']
    student_data = student_info['data']
    # Name fields can be None for short CSV rows; treat them as empty.
    first_name = student_data.get('First Name') or ''
    last_name = student_data.get('Last Name') or ''
    student_name = f"{first_name} {last_name}".strip()
    if not student_name:
        student_name = cpid
    print(f"🚀 Starting: {cpid} ({student_name})")

    student_flow = LoadTestStudent(cpid, student_data, headless=headless)
    result = student_flow.run()

    # Update global results
    with results_lock:
        results['total'] += 1
        results['last_update'] = datetime.now().isoformat()
        if result['status'] == 'success':
            results['success'] += 1
            completed_students.add(cpid)
        elif result['status'] == 'failed':
            results['failed'] += 1
            results['errors'].append({
                'cpid': cpid,
                # The key always exists but may hold None; keep a string so
                # later slicing of the message cannot fail.
                'error': result.get('error') or 'Unknown error'
            })
        else:
            results['skipped'] += 1
            completed_students.add(cpid)  # Also track skipped as completed

        # Save progress periodically. Done while holding the lock so the
        # snapshot is consistent with the counters just updated.
        if save_interval > 0 and results['total'] % save_interval == 0:
            ProgressTracker.save_progress()
            ProgressTracker.print_real_time_metrics()

    status_emoji = '✅' if result['status'] == 'success' else '❌'
    print(f"{status_emoji} Completed: {cpid} ({student_name}) - {result['status']} - {result['duration']:.2f}s")
    return result
def main():
    """Parse the command line, run the load test and print a summary.

    Flow: load students from the CSV, optionally drop those recorded as
    completed in a previous checkpoint (``--resume``), run the remaining
    students on a thread pool, then save progress and print totals.
    Invalid numeric options are rejected by ``argparse`` before any work
    starts.
    """
    parser = argparse.ArgumentParser(description='Advanced Load Testing Script - End-to-End Assessment Flow')
    parser.add_argument('--students', type=int, required=True, help='Number of students to test')
    parser.add_argument('--csv', type=str, required=True, help='Path to CSV file with student data')
    parser.add_argument('--concurrent', type=int, default=5, help='Number of concurrent students (default: 5)')
    parser.add_argument('--headless', action='store_true', default=True, help='Run in headless mode (default: True)')
    parser.add_argument('--no-headless', dest='headless', action='store_false', help='Run in visible mode')
    parser.add_argument('--max-visible', type=int, default=2, help='Max visible browsers when not headless (default: 2)')
    parser.add_argument('--save-interval', type=int, default=5, help='Save progress every N students (default: 5)')
    parser.add_argument('--resume', action='store_true', help='Resume from previous progress')
    parser.add_argument('--metrics-interval', type=int, default=10, help='Print metrics every N students (default: 10)')
    args = parser.parse_args()

    # Fail fast on values that would otherwise surface later as a wrong
    # slice, a ZeroDivisionError or a ThreadPoolExecutor ValueError.
    if args.students < 0:
        parser.error("--students must not be negative")
    positive_options = [
        ('--concurrent', args.concurrent),
        ('--save-interval', args.save_interval),
        ('--metrics-interval', args.metrics_interval),
    ]
    if not args.headless:
        positive_options.append(('--max-visible', args.max_visible))
    for flag, value in positive_options:
        if value < 1:
            parser.error(f"{flag} must be at least 1")

    print("=" * 80)
    print("ADVANCED LOAD TESTING - END-TO-END ASSESSMENT FLOW")
    print("=" * 80)
    print(f"📊 Configuration:")
    print(f" Total Students: {args.students}")
    print(f" Concurrent: {args.concurrent}")
    print(f" Headless: {args.headless}")
    if not args.headless:
        print(f" Max Visible Browsers: {args.max_visible}")
    print(f" Save Interval: Every {args.save_interval} students")
    print(f" Metrics Interval: Every {args.metrics_interval} students")
    print(f" Resume: {args.resume}")
    print(f" CSV File: {args.csv}")
    print("=" * 80)

    # Load students
    all_students = load_students_from_csv(args.csv)
    if not all_students:
        print("❌ No students loaded from CSV")
        return

    # Handle resume
    if args.resume:
        progress = ProgressTracker.load_progress()
        if progress:
            print(f"📂 Resuming from previous progress...")
            completed = set(progress.get('completed_students') or [])
            completed_students.update(completed)
            all_students = [s for s in all_students if s['cpid'] not in completed]
            print(f" Skipping {len(completed)} already completed students")
            # Restore results from progress
            restored = progress.get('results')
            if isinstance(restored, dict):
                results.update(restored)

    # Counters may already include a previous session; remember the
    # starting point so this session's throughput is computed correctly.
    baseline_total = results['total']

    # Limit to requested number
    students_to_test = all_students[:args.students]
    print(f"\n📋 Loaded {len(students_to_test)} students from CSV")

    # Load student data manager
    try:
        student_data_manager.load_students_from_csv(args.csv)
    except Exception as e:
        # Optional helper data; the run can continue without it.
        print(f"⚠️ Could not load student data manager: {e}")

    # Initialize results (the timer always starts with this session)
    results['start_time'] = time.time()

    # Smart browser management for visible mode
    if not args.headless:
        # Limit concurrent visible browsers
        actual_concurrent = min(args.concurrent, args.max_visible)
        print(f"⚠️ Visible mode: Limiting concurrent browsers to {actual_concurrent} (to prevent crashes)")
    else:
        actual_concurrent = args.concurrent
        # Warning for high concurrency in headless mode
        if actual_concurrent > 50:
            print(f"⚠️ WARNING: Running {actual_concurrent} concurrent browsers in headless mode")
            print(f" This may cause system stress. Monitor system resources.")
            print(f" If crashes occur, reduce --concurrent to 20-50")

    print(f"\n🚀 Starting load test with {actual_concurrent} concurrent students...")
    print("=" * 80)

    # Run load test
    with ThreadPoolExecutor(max_workers=actual_concurrent) as executor:
        futures = {
            executor.submit(run_student_flow, student, args.headless, args.save_interval): student
            for student in students_to_test
        }
        completed_count = 0
        for future in as_completed(futures):
            try:
                future.result()
                completed_count += 1
                # Print metrics periodically
                if completed_count % args.metrics_interval == 0:
                    ProgressTracker.print_real_time_metrics()
            except Exception as e:
                print(f"❌ Exception in thread: {e}")

    # Final save and summary
    ProgressTracker.save_progress()
    total_duration = time.time() - results['start_time']
    session_total = results['total'] - baseline_total
    print("\n" + "=" * 80)
    print("FINAL SUMMARY")
    print("=" * 80)
    print(f"✅ Success: {results['success']}/{results['total']}")
    print(f"❌ Failed: {results['failed']}/{results['total']}")
    print(f"⏭️ Skipped: {results['skipped']}/{results['total']}")
    print(f"⏱️ Total Duration: {total_duration:.2f}s ({total_duration/60:.2f} minutes)")
    if session_total > 0:
        print(f"📊 Average per Student: {total_duration/session_total:.2f}s")
        if total_duration > 0:
            print(f"⚡ Rate: {session_total/total_duration:.3f} students/second")
    ProgressTracker.print_real_time_metrics()

    if results['errors']:
        print(f"\n❌ Errors ({len(results['errors'])}):")
        for error in results['errors'][:10]:
            # Restored checkpoints may carry a null error message.
            message = str(error.get('error') or 'Unknown error')
            print(f" - {error['cpid']}: {message[:100]}")
        if len(results['errors']) > 10:
            print(f" ... and {len(results['errors']) - 10} more")

    print(f"\n💾 Progress saved to: {PROGRESS_FILE}")
    print("=" * 80)
# Run the load test only when this file is executed as a script.
if __name__ == "__main__":
    main()