""" Advanced Load Testing Script - End-to-End Assessment Flow World-class load testing with: - Smart browser management (prevents system crashes) - Progress persistence (resume from checkpoint) - Real-time performance tracking - Lightweight metrics collection - Advanced error handling Usage: python scripts/load_test_e2e_assessment_advanced.py --students 100 --csv students.csv Features: - Progress persistence (save every N students) - Resume capability (skip completed students) - Real-time metrics dashboard - Smart browser limiting (visible mode) - Performance analytics - Resource monitoring """ import sys import argparse import csv import json import time import threading from pathlib import Path from datetime import datetime from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Dict, List, Optional from collections import defaultdict # Add project root to path sys.path.insert(0, str(Path(__file__).parent.parent)) from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.common.exceptions import WebDriverException, TimeoutException from pages.login_page import LoginPage from pages.mandatory_reset_page import MandatoryResetPage from pages.profile_incomplete_page import ProfileIncompletePage from pages.profile_editor_page import ProfileEditorPage from pages.assessments_page import AssessmentsPage from pages.domains_page import DomainsPage from pages.domain_assessment_page import DomainAssessmentPage from pages.domain_feedback_page import DomainFeedbackPage from utils.question_answer_helper import QuestionAnswerHelper from utils.password_tracker import password_tracker from utils.student_data_manager import student_data_manager from utils.smart_wait_optimizer import SmartWaitOptimizer from utils.randomized_wait import RandomizedWait from utils.wait_helpers import WaitHelpers from config.config import BASE_URL, TEST_NEW_PASSWORD # Global tracking with locks results_lock = threading.Lock() progress_lock = threading.Lock() results = { 'total': 0, 'success': 0, 'failed': 0, 'skipped': 0, 'errors': [], 'start_time': None, 'last_update': None } # Performance metrics performance_metrics = { 'step_times': defaultdict(list), # step -> [durations] 'total_durations': [], 'questions_answered': [], 'step_success_rates': defaultdict(lambda: {'success': 0, 'failed': 0}) } # Progress file PROGRESS_FILE = Path(__file__).parent.parent / "reports" / "load_test_progress.json" PROGRESS_FILE.parent.mkdir(parents=True, exist_ok=True) # Track completed students completed_students = set() class ProgressTracker: """Lightweight progress tracking and persistence""" @staticmethod def save_progress(): """Save current progress to file""" with progress_lock: progress_data = { 'timestamp': datetime.now().isoformat(), 'results': results.copy(), 'performance': { 'step_times': {k: v[-100:] for k, v in performance_metrics['step_times'].items()}, # Last 100 'total_durations': performance_metrics['total_durations'][-100:], # Last 100 'questions_answered': performance_metrics['questions_answered'][-100:], # Last 100 'step_success_rates': dict(performance_metrics['step_success_rates']) }, 'completed_students': ProgressTracker.get_completed_students() } try: with open(PROGRESS_FILE, 'w') as f: json.dump(progress_data, f, indent=2) except Exception as e: print(f"āš ļø Error saving progress: {e}") @staticmethod def load_progress() -> Dict: """Load previous progress from file""" if not PROGRESS_FILE.exists(): return {} try: with open(PROGRESS_FILE, 'r') as f: return json.load(f) except Exception as e: print(f"āš ļø Error loading progress: {e}") return {} @staticmethod def get_completed_students() -> List[str]: """Get list of completed student CPIDs""" return list(completed_students) @staticmethod def print_real_time_metrics(): """Print real-time performance metrics""" with progress_lock: if results['total'] == 0: return print("\n" + "=" * 80) print("šŸ“Š REAL-TIME METRICS") print("=" * 80) # Overall stats success_rate = (results['success'] / results['total']) * 100 if results['total'] > 0 else 0 print(f"āœ… Success Rate: {success_rate:.1f}% ({results['success']}/{results['total']})") # Average durations if performance_metrics['total_durations']: avg_duration = sum(performance_metrics['total_durations']) / len(performance_metrics['total_durations']) print(f"ā±ļø Average Duration: {avg_duration:.2f}s") # Step performance print("\nšŸ“ˆ Step Performance:") for step, times in performance_metrics['step_times'].items(): if times: avg_time = sum(times) / len(times) success_rate = performance_metrics['step_success_rates'][step] total_attempts = success_rate['success'] + success_rate['failed'] rate = (success_rate['success'] / total_attempts * 100) if total_attempts > 0 else 0 print(f" {step}: {avg_time:.2f}s avg, {rate:.1f}% success ({success_rate['success']}/{total_attempts})") # Questions answered if performance_metrics['questions_answered']: avg_questions = sum(performance_metrics['questions_answered']) / len(performance_metrics['questions_answered']) print(f"\nā“ Average Questions Answered: {avg_questions:.1f}") # Estimated time remaining if results['total'] > 0 and results['start_time']: elapsed = time.time() - results['start_time'] rate = results['total'] / elapsed if elapsed > 0 else 0 print(f"\n⚔ Rate: {rate:.2f} students/second") print("=" * 80) class LoadTestStudent: """Handles end-to-end flow for a single student with performance tracking""" def __init__(self, cpid: str, student_data: Dict, headless: bool = True): self.cpid = cpid self.student_data = student_data self.headless = headless self.driver = None self.step_times = {} self.actual_password_used = None # Track which password was actually used for login self.result = { 'cpid': cpid, 'name': f"{student_data.get('First Name', '')} {student_data.get('Last Name', '')}", 'status': 'pending', 'steps_completed': [], 'step_times': {}, 'error': None, 'duration': 0, 'questions_answered': 0 } def _track_step(self, step_name: str, func): """Track step execution time and success/failure""" start_time = time.time() try: result = func() duration = time.time() - start_time self.step_times[step_name] = duration self.result['step_times'][step_name] = duration # Update global metrics with progress_lock: performance_metrics['step_times'][step_name].append(duration) if result: performance_metrics['step_success_rates'][step_name]['success'] += 1 else: performance_metrics['step_success_rates'][step_name]['failed'] += 1 return result except Exception as e: duration = time.time() - start_time self.step_times[step_name] = duration with progress_lock: performance_metrics['step_times'][step_name].append(duration) performance_metrics['step_success_rates'][step_name]['failed'] += 1 raise def setup_driver(self, max_retries=3): """Create and configure WebDriver with retry logic for high concurrency""" options = Options() if self.headless: options.add_argument('--headless=new') options.add_argument('--no-sandbox') options.add_argument('--disable-dev-shm-usage') options.add_argument('--disable-gpu') options.add_argument('--window-size=1920,1080') # Additional stability options for high concurrency options.add_argument('--disable-software-rasterizer') options.add_argument('--disable-extensions') options.add_argument('--disable-background-networking') options.add_argument('--disable-sync') options.add_argument('--disable-default-apps') options.add_argument('--disable-background-timer-throttling') options.add_argument('--disable-renderer-backgrounding') options.add_argument('--disable-backgrounding-occluded-windows') for attempt in range(max_retries): try: self.driver = webdriver.Chrome(options=options) self.driver.implicitly_wait(5) return True except Exception as e: if attempt < max_retries - 1: time.sleep(2) # Wait before retry (longer for resource contention) continue self.result['error'] = f"Driver setup failed after {max_retries} attempts: {str(e)[:200]}" return False def step_login(self) -> bool: """Step 1: Login with smart password handling""" return self._track_step('login', lambda: self._do_login()) def _do_login(self) -> bool: """ Login with explicit password strategy: 1. Try Excel password FIRST (from CSV) 2. If fails, fallback to Admin@123 (TEST_NEW_PASSWORD) """ login_page = LoginPage(self.driver) login_page.navigate() # Get Excel password from student data excel_password = self.student_data.get('password') if not excel_password: # No password in CSV - use Admin@123 directly print(f"āš ļø No Excel password found for {self.cpid}, using Admin@123") excel_password = TEST_NEW_PASSWORD # Strategy: Try Excel password first, then Admin@123 print(f"šŸ”‘ Trying Excel password first for {self.cpid}") # Try Excel password first login_page.enter_identifier(self.cpid) login_page.enter_password(excel_password) login_page.click_submit() # Wait for result from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from config.config import SHORT_WAIT, BASE_URL, MEDIUM_WAIT from selenium.webdriver.common.by import By import time # Wait for navigation or error try: WebDriverWait(self.driver, MEDIUM_WAIT).until( lambda d: "/dashboard" in d.current_url or "/student" in d.current_url or "mandatory_reset" in d.page_source.lower() or login_page.is_error_visible() ) except: pass current_url = self.driver.current_url is_on_login_page = ( current_url.rstrip("/") == BASE_URL.rstrip("/") or current_url.rstrip("/") == f"{BASE_URL}/" or "login" in current_url.lower() ) # Determine which password was actually used actual_password_used = excel_password # If still on login page, Excel password failed - try Admin@123 if is_on_login_page and login_page.is_error_visible(): print(f"šŸ”„ Excel password failed, trying Admin@123 (reset password)...") # Clear form and retry with Admin@123 login_page.enter_identifier(self.cpid) login_page.enter_password(TEST_NEW_PASSWORD) login_page.click_submit() # Wait for navigation or error try: WebDriverWait(self.driver, MEDIUM_WAIT).until( lambda d: "/dashboard" in d.current_url or "/student" in d.current_url or "mandatory_reset" in d.page_source.lower() or login_page.is_error_visible() ) except: pass # Check if Admin@123 worked current_url = self.driver.current_url is_still_on_login = ( current_url.rstrip("/") == BASE_URL.rstrip("/") or current_url.rstrip("/") == f"{BASE_URL}/" or "login" in current_url.lower() ) if is_still_on_login and login_page.is_error_visible(): # Both passwords failed error_msg = login_page.get_error_message() raise Exception(f"Login failed with both passwords. Excel password: {excel_password}, Admin@123: {TEST_NEW_PASSWORD}. Error: {error_msg}") else: # Admin@123 worked actual_password_used = TEST_NEW_PASSWORD # Store which password was actually used (for password reset step) self.actual_password_used = actual_password_used # Update password tracker if Admin@123 was used if actual_password_used == TEST_NEW_PASSWORD: password_tracker.update_password(self.cpid, TEST_NEW_PASSWORD) SmartWaitOptimizer.smart_wait_for_dashboard(self.driver, self.cpid, actual_password_used) self.result['steps_completed'].append('login') return True def step_password_reset(self) -> bool: """Step 2: Reset password if needed""" return self._track_step('password_reset', lambda: self._do_password_reset()) def _do_password_reset(self) -> bool: """ Reset password if needed. Uses the password that was actually used during login. """ reset_page = MandatoryResetPage(self.driver) # Use the password that was actually used during login # If Admin@123 was used, password is already reset - skip if self.actual_password_used == TEST_NEW_PASSWORD: # Password already reset (Admin@123 worked) - skip reset self.result['steps_completed'].append('password_reset_skipped') return True # Excel password was used - check if reset is needed if SmartWaitOptimizer.should_check_password_reset(self.cpid, self.actual_password_used): if reset_page.is_modal_present(): # Use Excel password (the one that worked) for reset current_password = self.actual_password_used or self.student_data.get('password') reset_page.reset_password( current_password=current_password, new_password=TEST_NEW_PASSWORD, confirm_password=TEST_NEW_PASSWORD, student_cpid=self.cpid ) # Update password tracker password_tracker.update_password(self.cpid, TEST_NEW_PASSWORD) time.sleep(SmartWaitOptimizer.ANIMATION_NORMAL + SmartWaitOptimizer.SAFETY_PADDING) self.result['steps_completed'].append('password_reset') else: self.result['steps_completed'].append('password_reset_skipped') else: self.result['steps_completed'].append('password_reset_skipped') return True def step_profile_completion(self) -> bool: """Step 3: Complete profile if needed""" return self._track_step('profile_completion', lambda: self._do_profile_completion()) def _do_profile_completion(self) -> bool: profile_incomplete = ProfileIncompletePage(self.driver) if SmartWaitOptimizer.should_check_profile_incomplete(self.driver): if profile_incomplete.is_modal_present(): profile_incomplete.click_complete() time.sleep(SmartWaitOptimizer.ANIMATION_NORMAL + SmartWaitOptimizer.SAFETY_PADDING) profile_editor = ProfileEditorPage(self.driver) profile_editor.wait_for_page_load() profile_editor.complete_profile_to_100(student_cpid=self.cpid) time.sleep(SmartWaitOptimizer.ANIMATION_NORMAL + SmartWaitOptimizer.SAFETY_PADDING) self.result['steps_completed'].append('profile_completion') else: self.result['steps_completed'].append('profile_completion_skipped') else: self.result['steps_completed'].append('profile_completion_skipped') return True def step_complete_domain_assessment(self) -> bool: """Step 4: Complete ONE domain assessment""" return self._track_step('domain_assessment', lambda: self._do_domain_assessment()) def _do_domain_assessment(self) -> bool: # Navigate to assessments assessments_page = AssessmentsPage(self.driver) assessments_page.navigate() assessments_page.wait_for_page_load() RandomizedWait.wait_for_page_load('navigation') # Get first assessment assessment_ids = assessments_page.get_assessment_ids() if not assessment_ids: raise Exception("No assessments available") assessments_page.click_begin_assessment(assessment_ids[0]) RandomizedWait.wait_for_page_load('navigation') # Navigate to domains domains_page = DomainsPage(self.driver) domains_page.wait_for_page_load() RandomizedWait.wait_for_page_load('initial') # Get first unlocked domain domain_ids = domains_page.get_all_domain_ids() if not domain_ids: raise Exception("No domains available") unlocked_domain_id = None for domain_id in domain_ids: if domains_page.is_domain_unlocked(domain_id): unlocked_domain_id = domain_id break if not unlocked_domain_id: raise Exception("No unlocked domains available") domains_page.click_start_domain(unlocked_domain_id) RandomizedWait.wait_for_page_load('initial') # Get domain assessment page domain_assessment_page = DomainAssessmentPage(self.driver) domain_assessment_page.wait_for_page_load() RandomizedWait.wait_for_page_load('initial') # Dismiss modals if domain_assessment_page.is_instructions_modal_present(): domain_assessment_page.dismiss_instructions_modal() RandomizedWait.wait_for_page_load('modal') domain_assessment_page.dismiss_guidance() # Answer questions question_helper = QuestionAnswerHelper(self.driver) wait = WaitHelpers(self.driver) max_questions = 100 questions_answered = 0 consecutive_failures = 0 max_consecutive_failures = 3 while questions_answered < max_questions: try: question_id = question_helper.get_question_id() if not question_id: consecutive_failures += 1 if consecutive_failures >= max_consecutive_failures: break try: next_button = domain_assessment_page.find_element(domain_assessment_page.NEXT_BUTTON) if next_button.is_enabled(): next_button.click() RandomizedWait.wait_for_navigation('next') except: pass continue consecutive_failures = 0 question_type = question_helper.get_question_type(question_id) if not question_type: try: question_element = self.driver.find_element( 'css selector', f"[data-testid='domain_question__{question_id}']" ) self.driver.execute_script("arguments[0].scrollIntoView(true);", question_element) time.sleep(1) question_type = question_helper.get_question_type(question_id) except: pass if not question_type: try: next_button = domain_assessment_page.find_element(domain_assessment_page.NEXT_BUTTON) if next_button.is_enabled(): next_button.click() RandomizedWait.wait_for_navigation('next') except: pass continue try: question_helper.answer_question(question_id=question_id, question_type=question_type) RandomizedWait.wait_for_question_answer(question_type) questions_answered += 1 except Exception as e: print(f" āš ļø Error answering question {question_id}: {e}") # Check submit button try: submit_button = domain_assessment_page.find_element(domain_assessment_page.SUBMIT_BUTTON) if submit_button.is_enabled(): submit_button.click() RandomizedWait.wait_for_submission('submit') break except: pass # Click next try: next_button = domain_assessment_page.find_element(domain_assessment_page.NEXT_BUTTON) if next_button.is_enabled(): next_button.click() RandomizedWait.wait_for_navigation('next') except: pass except Exception as e: consecutive_failures += 1 if consecutive_failures >= max_consecutive_failures: break # Handle submit modal try: if domain_assessment_page.is_submit_modal_present(): domain_assessment_page.confirm_submit() RandomizedWait.wait_for_submission('confirm') except: pass # Handle feedback try: feedback_page = DomainFeedbackPage(self.driver) if feedback_page.is_modal_present(): feedback_page.submit_feedback( question1_answer='yes', question1_justification='Automated test response', question2_answer='This is an automated test response for load testing purposes.' ) RandomizedWait.wait_for_submission('feedback') except Exception as e: print(f" āš ļø Error handling feedback: {e}") self.result['steps_completed'].append('domain_assessment') self.result['questions_answered'] = questions_answered # Update global metrics with progress_lock: performance_metrics['questions_answered'].append(questions_answered) return True def run(self) -> Dict: """Run complete end-to-end flow for this student""" start_time = time.time() try: if not self.setup_driver(): self.result['status'] = 'failed' return self.result if not self.step_login(): self.result['status'] = 'failed' return self.result if not self.step_password_reset(): self.result['status'] = 'failed' return self.result if not self.step_profile_completion(): self.result['status'] = 'failed' return self.result if not self.step_complete_domain_assessment(): self.result['status'] = 'failed' return self.result self.result['status'] = 'success' except Exception as e: self.result['status'] = 'failed' self.result['error'] = f"Unexpected error: {str(e)}" finally: if self.driver: try: self.driver.quit() except: pass self.result['duration'] = time.time() - start_time # Update global metrics with progress_lock: performance_metrics['total_durations'].append(self.result['duration']) return self.result def load_students_from_csv(csv_path: str) -> List[Dict]: """Load students from CSV file""" students = [] try: with open(csv_path, 'r', encoding='utf-8') as f: reader = csv.DictReader(f) for row in reader: cpid = ( row.get('Student CPID') or row.get('student_cpid') or row.get('Student_CPID') or row.get('cpid') or row.get('CPID') or None ) if not cpid: continue password = ( row.get('Password') or row.get('password') or row.get('PASSWORD') or None ) students.append({ 'cpid': cpid.strip(), 'data': { **row, 'password': password.strip() if password else None } }) except Exception as e: print(f"āŒ Error loading CSV: {e}") import traceback traceback.print_exc() return [] return students def run_student_flow(student_info: Dict, headless: bool = True, save_interval: int = 5) -> Dict: """Run flow for a single student""" cpid = student_info['cpid'] student_data = student_info['data'] student_name = f"{student_data.get('First Name', '')} {student_data.get('Last Name', '')}".strip() if not student_name: student_name = cpid print(f"šŸš€ Starting: {cpid} ({student_name})") student_flow = LoadTestStudent(cpid, student_data, headless=headless) result = student_flow.run() # Update global results with results_lock: results['total'] += 1 results['last_update'] = datetime.now().isoformat() if result['status'] == 'success': results['success'] += 1 completed_students.add(cpid) elif result['status'] == 'failed': results['failed'] += 1 results['errors'].append({ 'cpid': cpid, 'error': result.get('error', 'Unknown error') }) else: results['skipped'] += 1 completed_students.add(cpid) # Also track skipped as completed # Save progress periodically if results['total'] % save_interval == 0: ProgressTracker.save_progress() ProgressTracker.print_real_time_metrics() status_emoji = 'āœ…' if result['status'] == 'success' else 'āŒ' print(f"{status_emoji} Completed: {cpid} ({student_name}) - {result['status']} - {result['duration']:.2f}s") return result def main(): parser = argparse.ArgumentParser(description='Advanced Load Testing Script - End-to-End Assessment Flow') parser.add_argument('--students', type=int, required=True, help='Number of students to test') parser.add_argument('--csv', type=str, required=True, help='Path to CSV file with student data') parser.add_argument('--concurrent', type=int, default=5, help='Number of concurrent students (default: 5)') parser.add_argument('--headless', action='store_true', default=True, help='Run in headless mode (default: True)') parser.add_argument('--no-headless', dest='headless', action='store_false', help='Run in visible mode') parser.add_argument('--max-visible', type=int, default=2, help='Max visible browsers when not headless (default: 2)') parser.add_argument('--save-interval', type=int, default=5, help='Save progress every N students (default: 5)') parser.add_argument('--resume', action='store_true', help='Resume from previous progress') parser.add_argument('--metrics-interval', type=int, default=10, help='Print metrics every N students (default: 10)') args = parser.parse_args() print("=" * 80) print("ADVANCED LOAD TESTING - END-TO-END ASSESSMENT FLOW") print("=" * 80) print(f"šŸ“Š Configuration:") print(f" Total Students: {args.students}") print(f" Concurrent: {args.concurrent}") print(f" Headless: {args.headless}") if not args.headless: print(f" Max Visible Browsers: {args.max_visible}") print(f" Save Interval: Every {args.save_interval} students") print(f" Metrics Interval: Every {args.metrics_interval} students") print(f" Resume: {args.resume}") print(f" CSV File: {args.csv}") print("=" * 80) # Load students all_students = load_students_from_csv(args.csv) if not all_students: print("āŒ No students loaded from CSV") return # Handle resume if args.resume: progress = ProgressTracker.load_progress() if progress: print(f"šŸ“‚ Resuming from previous progress...") completed = set(progress.get('completed_students', [])) completed_students.update(completed) all_students = [s for s in all_students if s['cpid'] not in completed] print(f" Skipping {len(completed)} already completed students") # Restore results from progress if 'results' in progress: results.update(progress['results']) results['start_time'] = time.time() # Reset start time # Limit to requested number students_to_test = all_students[:args.students] print(f"\nšŸ“‹ Loaded {len(students_to_test)} students from CSV") # Load student data manager try: student_data_manager.load_students_from_csv(args.csv) except: pass # Initialize results results['start_time'] = time.time() # Smart browser management for visible mode if not args.headless: # Limit concurrent visible browsers actual_concurrent = min(args.concurrent, args.max_visible) print(f"āš ļø Visible mode: Limiting concurrent browsers to {actual_concurrent} (to prevent crashes)") else: actual_concurrent = args.concurrent # Warning for high concurrency in headless mode if actual_concurrent > 50: print(f"āš ļø WARNING: Running {actual_concurrent} concurrent browsers in headless mode") print(f" This may cause system stress. Monitor system resources.") print(f" If crashes occur, reduce --concurrent to 20-50") print(f"\nšŸš€ Starting load test with {actual_concurrent} concurrent students...") print("=" * 80) # Run load test with ThreadPoolExecutor(max_workers=actual_concurrent) as executor: futures = { executor.submit(run_student_flow, student, args.headless, args.save_interval): student for student in students_to_test } completed_count = 0 for future in as_completed(futures): try: result = future.result() completed_count += 1 # Print metrics periodically if completed_count % args.metrics_interval == 0: ProgressTracker.print_real_time_metrics() except Exception as e: print(f"āŒ Exception in thread: {e}") # Final save and summary ProgressTracker.save_progress() total_duration = time.time() - results['start_time'] print("\n" + "=" * 80) print("FINAL SUMMARY") print("=" * 80) print(f"āœ… Success: {results['success']}/{results['total']}") print(f"āŒ Failed: {results['failed']}/{results['total']}") print(f"ā­ļø Skipped: {results['skipped']}/{results['total']}") print(f"ā±ļø Total Duration: {total_duration:.2f}s ({total_duration/60:.2f} minutes)") if results['total'] > 0: print(f"šŸ“Š Average per Student: {total_duration/results['total']:.2f}s") print(f"⚔ Rate: {results['total']/total_duration:.3f} students/second") ProgressTracker.print_real_time_metrics() if results['errors']: print(f"\nāŒ Errors ({len(results['errors'])}):") for error in results['errors'][:10]: print(f" - {error['cpid']}: {error['error'][:100]}") if len(results['errors']) > 10: print(f" ... and {len(results['errors']) - 10} more") print(f"\nšŸ’¾ Progress saved to: {PROGRESS_FILE}") print("=" * 80) if __name__ == "__main__": main()