"""
Load Testing Script - End-to-End Assessment Flow

This script performs load testing by running multiple students through the
complete end-to-end flow simultaneously:
1. Login (with smart password handling)
2. Password reset (if needed)
3. Profile completion (if needed)
4. Complete ONE domain assessment (100% reliable flow)

Usage:
    python scripts/load_test_e2e_assessment.py --students 10 --csv students.csv

Features:
- Concurrent execution (one browser per worker thread, via ThreadPoolExecutor)
- Progress tracking and reporting
- Error handling and recovery
- Detailed logging
- CSV input support
- Configurable concurrency
"""

import sys
import argparse
import csv
import time
import threading
from pathlib import Path
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional

# Add project root to path (must happen before the project-local imports below)
sys.path.insert(0, str(Path(__file__).parent.parent))

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import WebDriverException, TimeoutException
from pages.login_page import LoginPage
from pages.mandatory_reset_page import MandatoryResetPage
from pages.profile_incomplete_page import ProfileIncompletePage
from pages.profile_editor_page import ProfileEditorPage
from pages.assessments_page import AssessmentsPage
from pages.domains_page import DomainsPage
from pages.domain_assessment_page import DomainAssessmentPage
from pages.domain_feedback_page import DomainFeedbackPage
from utils.question_answer_helper import QuestionAnswerHelper
from utils.password_tracker import password_tracker
from utils.student_data_manager import student_data_manager
from utils.smart_wait_optimizer import SmartWaitOptimizer
from utils.randomized_wait import RandomizedWait
from utils.wait_helpers import WaitHelpers
from config.config import BASE_URL, TEST_NEW_PASSWORD

# Global tracking, shared by all worker threads. Every read or write of
# `results` made while workers are running must hold `results_lock`.
# `threading.Lock` is NOT reentrant: never call a function that acquires it
# while already holding it. main() adds a 'start_time' key before the run.
results_lock = threading.Lock()
results = {
    'total': 0,
    'success': 0,
    'failed': 0,
    'skipped': 0,
    'errors': []
}


class LoadTestStudent:
    """Handles end-to-end flow for a single student.

    One instance owns one Chrome WebDriver. `run()` drives the steps in order
    and always returns the `result` dict, whose keys are: 'cpid', 'name',
    'status' ('pending' / 'success' / 'failed'), 'steps_completed', 'error',
    'duration' and, once the assessment step finished, 'questions_answered'.
    """

    def __init__(self, cpid: str, student_data: Dict, headless: bool = True):
        self.cpid = cpid
        self.student_data = student_data
        self.headless = headless
        self.driver = None

        # Accept both snake_case keys and the "First Name"/"Last Name" keys
        # that run_student_flow() reads from the CSV row.
        first_name = student_data.get('first_name') or student_data.get('First Name') or ''
        last_name = student_data.get('last_name') or student_data.get('Last Name') or ''
        self.result = {
            'cpid': cpid,
            'name': f"{first_name} {last_name}",
            'status': 'pending',
            'steps_completed': [],
            'error': None,
            'duration': 0
        }

    def setup_driver(self, max_retries=3):
        """Create and configure WebDriver with retry logic.

        Args:
            max_retries: Number of creation attempts before giving up.

        Returns:
            True when a driver was created; False otherwise, in which case
            `self.result['error']` describes the last failure.
        """
        options = Options()
        if self.headless:
            options.add_argument('--headless=new')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--disable-gpu')
        options.add_argument('--window-size=1920,1080')
        # Additional stability options for high concurrency
        options.add_argument('--disable-software-rasterizer')
        options.add_argument('--disable-extensions')
        options.add_argument('--disable-background-networking')
        options.add_argument('--disable-sync')
        options.add_argument('--disable-default-apps')

        for attempt in range(max_retries):
            try:
                self.driver = webdriver.Chrome(options=options)
                self.driver.implicitly_wait(5)
                return True
            except Exception as e:
                # If the browser started but configuring it failed, close it
                # so a retry does not leave an orphaned Chrome process.
                if self.driver is not None:
                    try:
                        self.driver.quit()
                    except Exception:
                        pass
                    self.driver = None
                if attempt < max_retries - 1:
                    time.sleep(2)  # Wait before retry (longer wait for resource contention)
                    continue
                self.result['error'] = f"Driver setup failed after {max_retries} attempts: {str(e)[:200]}"
                return False

        # Only reachable when max_retries <= 0 (no attempt was made).
        self.result['error'] = f"Driver setup failed after {max_retries} attempts: no attempt made"
        return False

    def _tracked_password(self):
        """Return `(tracked_password, changed)` for this student.

        `changed` is True when the password tracker reports a value that
        differs from the password stored in the student's data.
        """
        original_password = self.student_data.get('password')
        tracked_password = password_tracker.get_password(self.cpid, original_password)
        return tracked_password, tracked_password != original_password

    def step_login(self) -> bool:
        """Step 1: Login with smart password handling.

        Returns:
            True on success; False on any exception (message stored in
            `self.result['error']`).
        """
        try:
            login_page = LoginPage(self.driver)
            login_page.navigate()

            # Determine password to use
            tracked_password, changed = self._tracked_password()
            # NOTE(review): password=None presumably lets LoginPage.login pick
            # its own default password - confirm against LoginPage.
            password_to_use = tracked_password if changed else None

            # Login
            login_page.login(identifier=self.cpid, password=password_to_use)

            # Wait for dashboard
            actual_password_used = tracked_password if changed else TEST_NEW_PASSWORD
            SmartWaitOptimizer.smart_wait_for_dashboard(self.driver, self.cpid, actual_password_used)

            self.result['steps_completed'].append('login')
            return True
        except Exception as e:
            self.result['error'] = f"Login failed: {str(e)}"
            return False

    def step_password_reset(self) -> bool:
        """Step 2: Reset password if needed.

        Appends 'password_reset' to the completed steps when the reset modal
        was handled, and 'password_reset_skipped' in every other case (same
        convention as `step_profile_completion`).

        Returns:
            True on success or skip; False on any exception.
        """
        try:
            reset_page = MandatoryResetPage(self.driver)

            # Smart check: Only check if password wasn't already reset
            tracked_password, changed = self._tracked_password()
            actual_password_used = tracked_password if changed else TEST_NEW_PASSWORD

            reset_done = False
            if SmartWaitOptimizer.should_check_password_reset(self.cpid, actual_password_used):
                if reset_page.is_modal_present():
                    current_password = password_tracker.get_password(
                        self.cpid, self.student_data.get('password')
                    )
                    reset_page.reset_password(
                        current_password=current_password,
                        new_password=TEST_NEW_PASSWORD,
                        confirm_password=TEST_NEW_PASSWORD,
                        student_cpid=self.cpid
                    )
                    time.sleep(SmartWaitOptimizer.ANIMATION_NORMAL + SmartWaitOptimizer.SAFETY_PADDING)
                    reset_done = True

            self.result['steps_completed'].append(
                'password_reset' if reset_done else 'password_reset_skipped'
            )
            return True
        except Exception as e:
            self.result['error'] = f"Password reset failed: {str(e)}"
            return False

    def step_profile_completion(self) -> bool:
        """Step 3: Complete profile if needed.

        Appends 'profile_completion' when the incomplete-profile modal was
        handled, otherwise 'profile_completion_skipped'.

        Returns:
            True on success or skip; False on any exception.
        """
        try:
            profile_incomplete = ProfileIncompletePage(self.driver)
            pause = SmartWaitOptimizer.ANIMATION_NORMAL + SmartWaitOptimizer.SAFETY_PADDING

            # Smart check: Only check if profile might be incomplete
            profile_completed = False
            if SmartWaitOptimizer.should_check_profile_incomplete(self.driver):
                if profile_incomplete.is_modal_present():
                    profile_incomplete.click_complete()
                    time.sleep(pause)

                    # Complete profile to 100%
                    profile_editor = ProfileEditorPage(self.driver)
                    profile_editor.wait_for_page_load()
                    profile_editor.complete_profile_to_100(student_cpid=self.cpid)
                    time.sleep(pause)
                    profile_completed = True

            self.result['steps_completed'].append(
                'profile_completion' if profile_completed else 'profile_completion_skipped'
            )
            return True
        except Exception as e:
            self.result['error'] = f"Profile completion failed: {str(e)}"
            return False

    @staticmethod
    def _click_nav_button(page, locator, wait_kind: str) -> bool:
        """Click a navigation button if it is enabled, then wait.

        Exceptions from locating or clicking the button propagate so callers
        can decide whether to ignore them or fall back to another button.

        Returns:
            True if the button was clicked, False if it was disabled.
        """
        button = page.find_element(locator)
        if not button.is_enabled():
            return False
        button.click()
        RandomizedWait.wait_for_navigation(wait_kind)
        return True

    def _try_click_next(self, page) -> None:
        """Best-effort click on the Next button; failures are ignored."""
        try:
            self._click_nav_button(page, page.NEXT_BUTTON, 'next')
        except Exception:
            pass

    def step_complete_domain_assessment(self) -> bool:
        """Step 4: Complete ONE domain assessment (100% reliable flow).

        Opens the first assessment, starts the first unlocked domain, answers
        questions until the submit button becomes enabled (or a safety limit
        is hit), then handles the submit and feedback modals on a best-effort
        basis.

        Returns:
            True when the flow ran to the end; False when no assessment or
            domain was available or an unexpected exception occurred.
        """
        try:
            # Navigate to assessments page
            assessments_page = AssessmentsPage(self.driver)
            assessments_page.navigate()
            assessments_page.wait_for_page_load()
            RandomizedWait.wait_for_page_load('navigation')

            # Get first assessment
            assessment_ids = assessments_page.get_assessment_ids()
            if not assessment_ids:
                self.result['error'] = "No assessments available"
                return False

            # Start first assessment
            assessments_page.click_begin_assessment(assessment_ids[0])
            RandomizedWait.wait_for_page_load('navigation')

            # Navigate to domains page
            domains_page = DomainsPage(self.driver)
            domains_page.wait_for_page_load()
            RandomizedWait.wait_for_page_load('initial')

            # Get first unlocked domain
            domain_ids = domains_page.get_all_domain_ids()
            if not domain_ids:
                self.result['error'] = "No domains available"
                return False

            unlocked_domain_id = None
            for domain_id in domain_ids:
                if domains_page.is_domain_unlocked(domain_id):
                    unlocked_domain_id = domain_id
                    break

            if not unlocked_domain_id:
                self.result['error'] = "No unlocked domains available"
                return False

            # Start domain assessment
            domains_page.click_start_domain(unlocked_domain_id)
            RandomizedWait.wait_for_page_load('initial')

            # Get domain assessment page
            domain_assessment_page = DomainAssessmentPage(self.driver)
            domain_assessment_page.wait_for_page_load()
            RandomizedWait.wait_for_page_load('initial')

            # Dismiss instructions modal if present
            if domain_assessment_page.is_instructions_modal_present():
                domain_assessment_page.dismiss_instructions_modal()
                RandomizedWait.wait_for_page_load('modal')

            # Dismiss guidance modal if present
            domain_assessment_page.dismiss_guidance()

            # Initialize question answer helper
            question_helper = QuestionAnswerHelper(self.driver)

            # Answer all questions
            max_questions = 100  # Safety limit on answered questions
            # Hard cap on loop passes. Without it, a question that can neither
            # be answered nor skipped keeps resetting consecutive_failures and
            # the loop never ends, pinning a worker and a browser forever.
            max_iterations = max_questions * 3
            iterations = 0
            questions_answered = 0
            consecutive_failures = 0
            max_consecutive_failures = 3

            while questions_answered < max_questions and iterations < max_iterations:
                iterations += 1
                try:
                    # Get current question ID
                    question_id = question_helper.get_question_id()
                    if not question_id:
                        consecutive_failures += 1
                        if consecutive_failures >= max_consecutive_failures:
                            break
                        # Try clicking next
                        self._try_click_next(domain_assessment_page)
                        continue

                    # Reset consecutive failures on success
                    consecutive_failures = 0

                    # Get question type
                    question_type = question_helper.get_question_type(question_id)
                    if not question_type:
                        # Try scrolling to question, then look again
                        try:
                            question_element = self.driver.find_element(
                                'css selector',
                                f"[data-testid='domain_question__{question_id}']"
                            )
                            self.driver.execute_script(
                                "arguments[0].scrollIntoView(true);", question_element
                            )
                            time.sleep(1)
                            question_type = question_helper.get_question_type(question_id)
                        except Exception:
                            pass

                    if not question_type:
                        # Skip unknown question type
                        self._try_click_next(domain_assessment_page)
                        continue

                    # Answer question
                    try:
                        question_helper.answer_question(
                            question_id=question_id, question_type=question_type
                        )
                        RandomizedWait.wait_for_question_answer(question_type)
                        questions_answered += 1
                    except Exception as e:
                        print(f" ⚠️ Error answering question {question_id}: {e}")

                    # Check if submit button is enabled (all questions answered)
                    try:
                        submit_button = domain_assessment_page.find_element(
                            domain_assessment_page.SUBMIT_BUTTON
                        )
                        if submit_button.is_enabled():
                            # All questions answered, submit
                            submit_button.click()
                            RandomizedWait.wait_for_submission('submit')
                            break
                    except Exception:
                        pass

                    # Click next button
                    try:
                        self._click_nav_button(
                            domain_assessment_page, domain_assessment_page.NEXT_BUTTON, 'next'
                        )
                    except Exception:
                        # Try previous button as fallback (only when Next
                        # raised, not when it was merely disabled)
                        try:
                            self._click_nav_button(
                                domain_assessment_page,
                                domain_assessment_page.PREV_BUTTON,
                                'previous'
                            )
                        except Exception:
                            pass

                except Exception as e:
                    print(f" ⚠️ Error in question loop: {e}")
                    consecutive_failures += 1
                    if consecutive_failures >= max_consecutive_failures:
                        break

            # Handle submit modal
            try:
                if domain_assessment_page.is_submit_modal_present():
                    domain_assessment_page.confirm_submit()
                    RandomizedWait.wait_for_submission('confirm')
            except Exception:
                pass

            # Handle domain feedback
            try:
                feedback_page = DomainFeedbackPage(self.driver)
                if feedback_page.is_modal_present():
                    # Answer feedback questions
                    feedback_page.submit_feedback(
                        question1_answer='yes',
                        question1_justification='Automated test response',
                        question2_answer='This is an automated test response for load testing purposes.'
                    )
                    RandomizedWait.wait_for_submission('feedback')
            except Exception as e:
                print(f" ⚠️ Error handling feedback: {e}")

            self.result['steps_completed'].append('domain_assessment')
            self.result['questions_answered'] = questions_answered
            return True
        except Exception as e:
            self.result['error'] = f"Domain assessment failed: {str(e)}"
            return False

    def run(self) -> Dict:
        """Run complete end-to-end flow for this student.

        Steps run in order and stop at the first one that returns a falsy
        value. The browser is always closed and 'duration' always recorded.

        Returns:
            The `result` dict with 'status' set to 'success' or 'failed'.
        """
        start_time = time.time()
        steps = (
            self.setup_driver,                     # Setup
            self.step_login,                       # Step 1: Login
            self.step_password_reset,              # Step 2: Password reset (if needed)
            self.step_profile_completion,          # Step 3: Profile completion (if needed)
            self.step_complete_domain_assessment,  # Step 4: Complete domain assessment
        )

        try:
            # all() short-circuits, so later steps are skipped after a failure.
            if all(step() for step in steps):
                self.result['status'] = 'success'
            else:
                self.result['status'] = 'failed'
        except Exception as e:
            self.result['status'] = 'failed'
            self.result['error'] = f"Unexpected error: {str(e)}"
        finally:
            # Cleanup
            if self.driver:
                try:
                    self.driver.quit()
                except Exception:
                    pass
            self.result['duration'] = time.time() - start_time

        return self.result


def load_students_from_csv(csv_path: str) -> List[Dict]:
    """Load students from CSV file.

    Each returned item is `{'cpid': <str>, 'data': <row dict>}` where the row
    dict additionally carries a normalised 'password' key (stripped string or
    None). Rows without a usable CPID are skipped.

    Args:
        csv_path: Path to the CSV file.

    Returns:
        List of student dicts; an empty list if the file cannot be read.
    """
    # Column name variations tried in order; the first non-empty value wins.
    cpid_columns = ('Student CPID', 'student_cpid', 'Student_CPID', 'cpid', 'CPID')
    password_columns = ('Password', 'password', 'PASSWORD')

    def first_value(row, columns):
        """Return the first truthy value among `columns` in `row`, else None."""
        for column in columns:
            value = row.get(column)
            if value:
                return value
        return None

    students = []
    try:
        # 'utf-8-sig' strips a leading BOM (common in Excel exports) that
        # would otherwise become part of the first header name and hide the
        # CPID column; newline='' is what the csv module expects.
        with open(csv_path, 'r', encoding='utf-8-sig', newline='') as f:
            for row in csv.DictReader(f):
                cpid = (first_value(row, cpid_columns) or '').strip()
                if not cpid:
                    # If no CPID found (or only whitespace), skip this row
                    continue

                password = first_value(row, password_columns)
                students.append({
                    'cpid': cpid,
                    'data': {
                        **row,
                        'password': password.strip() if password else None
                    }
                })
    except Exception as e:
        print(f"❌ Error loading CSV: {e}")
        import traceback
        traceback.print_exc()
        return []

    return students


def print_real_time_metrics():
    """Print real-time performance metrics.

    Takes a snapshot of the shared counters under `results_lock` and prints
    after releasing it, so workers are not blocked on console I/O. Because it
    acquires `results_lock`, it must never be called while the caller already
    holds that (non-reentrant) lock. Prints nothing when no student finished.
    """
    with results_lock:
        total = results['total']
        success = results['success']
        failed = results['failed']
        start = results.get('start_time')

    if total == 0:
        return

    success_rate = (success / total) * 100
    lines = [
        "\n" + "=" * 80,
        "📊 REAL-TIME METRICS",
        "=" * 80,
        # Overall stats
        f"✅ Success Rate: {success_rate:.1f}% ({success}/{total})",
        f"❌ Failed: {failed}/{total}",
    ]

    # Time stats
    if start:
        elapsed = time.time() - start
        rate = total / elapsed if elapsed > 0 else 0
        lines.append(f"⏱️ Elapsed: {elapsed:.1f}s | Rate: {rate:.2f} students/second")
        lines.append(f"📊 Average: {elapsed / total:.2f}s per student")

    lines.append("=" * 80)
    # One print call keeps the report together when several threads print.
    print("\n".join(lines))


def run_student_flow(student_info: Dict, headless: bool = True, metrics_interval: int = 10) -> Dict:
    """Run flow for a single student (wrapper for ThreadPoolExecutor).

    Args:
        student_info: Item produced by `load_students_from_csv`
            (keys 'cpid' and 'data').
        headless: Whether to run Chrome headless.
        metrics_interval: Print metrics every N finished students; values
            below 1 disable the periodic report.

    Returns:
        The result dict produced by `LoadTestStudent.run()`.
    """
    cpid = student_info['cpid']
    student_data = student_info['data']

    # Get student name for logging
    student_name = f"{student_data.get('First Name', '')} {student_data.get('Last Name', '')}".strip()
    if not student_name:
        student_name = cpid

    print(f"🚀 Starting: {cpid} ({student_name})")

    student_flow = LoadTestStudent(cpid, student_data, headless=headless)
    result = student_flow.run()

    # Update global results
    with results_lock:
        results['total'] += 1
        if result['status'] == 'success':
            results['success'] += 1
        elif result['status'] == 'failed':
            results['failed'] += 1
            # 'error' may be present but None, so .get()'s default is not enough.
            error_text = str(result.get('error') or 'Unknown error')
            results['errors'].append({
                'cpid': cpid,
                'error': error_text[:200]  # Truncate long errors
            })
        else:
            results['skipped'] += 1

        # Decide under the lock, report after releasing it:
        # print_real_time_metrics() acquires results_lock itself.
        report_due = metrics_interval > 0 and results['total'] % metrics_interval == 0

    # Print real-time metrics periodically
    if report_due:
        print_real_time_metrics()

    status_emoji = '✅' if result['status'] == 'success' else '❌'
    print(f"{status_emoji} Completed: {cpid} ({student_name}) - {result['status']} - {result['duration']:.2f}s")

    return result


def main():
    """Parse command-line arguments, run the load test and print a summary."""
    parser = argparse.ArgumentParser(description='Load Testing Script - End-to-End Assessment Flow')
    parser.add_argument('--students', type=int, required=True,
                        help='Number of students to test')
    parser.add_argument('--csv', type=str, required=True,
                        help='Path to CSV file with student data')
    parser.add_argument('--concurrent', type=int, default=5,
                        help='Number of concurrent students (default: 5)')
    parser.add_argument('--headless', action='store_true', default=True,
                        help='Run in headless mode (default: True)')
    parser.add_argument('--no-headless', dest='headless', action='store_false',
                        help='Run in visible mode')
    parser.add_argument('--max-concurrent', type=int, default=20,
                        help='Maximum allowed concurrent browsers (safety limit, default: 20)')
    parser.add_argument('--metrics-interval', type=int, default=10,
                        help='Print metrics every N students (default: 10)')

    args = parser.parse_args()

    # Reject values that would otherwise fail later in confusing ways
    # (negative slice, ThreadPoolExecutor ValueError, modulo by zero).
    if args.students < 1:
        parser.error('--students must be at least 1')
    if args.concurrent < 1:
        parser.error('--concurrent must be at least 1')
    if args.max_concurrent < 1:
        parser.error('--max-concurrent must be at least 1')
    if args.metrics_interval < 1:
        parser.error('--metrics-interval must be at least 1')

    # Safety check: Limit concurrent browsers to prevent system crashes
    if args.concurrent > args.max_concurrent:
        print(f"⚠️ WARNING: Requested {args.concurrent} concurrent browsers exceeds safety limit of {args.max_concurrent}")
        print(f" Limiting to {args.max_concurrent} concurrent browsers to prevent system crashes")
        args.concurrent = args.max_concurrent

    print("=" * 80)
    print("LOAD TESTING - END-TO-END ASSESSMENT FLOW")
    print("=" * 80)
    print(f"📊 Configuration:")
    print(f" Total Students: {args.students}")
    print(f" Concurrent: {args.concurrent} (max: {args.max_concurrent})")
    print(f" Headless: {args.headless}")
    print(f" Metrics Interval: Every {args.metrics_interval} students")
    print(f" CSV File: {args.csv}")
    print("=" * 80)

    # Initialize start time
    results['start_time'] = time.time()

    # Load students from CSV
    all_students = load_students_from_csv(args.csv)
    if not all_students:
        print("❌ No students loaded from CSV")
        return

    # Limit to requested number
    students_to_test = all_students[:args.students]
    print(f"\n📋 Loaded {len(students_to_test)} students from CSV")

    # Load student data manager (best effort: the run continues without it)
    try:
        student_data_manager.load_students_from_csv(args.csv)
    except Exception as e:
        print(f"⚠️ Could not load student data manager: {e}")

    # Run load test
    start_time = time.time()
    print(f"\n🚀 Starting load test with {args.concurrent} concurrent students...")
    print("=" * 80)

    with ThreadPoolExecutor(max_workers=args.concurrent) as executor:
        futures = {
            executor.submit(run_student_flow, student, args.headless, args.metrics_interval): student
            for student in students_to_test
        }

        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"❌ Exception in thread: {e}")

    total_duration = time.time() - start_time

    # Print final metrics
    print_real_time_metrics()

    # Print summary
    print("\n" + "=" * 80)
    print("FINAL SUMMARY")
    print("=" * 80)
    print(f"✅ Success: {results['success']}/{results['total']}")
    print(f"❌ Failed: {results['failed']}/{results['total']}")
    print(f"⏭️ Skipped: {results['skipped']}/{results['total']}")
    print(f"⏱️ Total Duration: {total_duration:.2f}s ({total_duration/60:.2f} minutes)")
    if results['total'] > 0:
        print(f"📊 Average per Student: {total_duration/results['total']:.2f}s")
        if total_duration > 0:
            print(f"⚡ Rate: {results['total']/total_duration:.3f} students/second")

    if results['errors']:
        print(f"\n❌ Errors ({len(results['errors'])}):")
        for error in results['errors'][:10]:  # Show first 10
            print(f" - {error['cpid']}: {error['error']}")
        if len(results['errors']) > 10:
            print(f" ... and {len(results['errors']) - 10} more")

    print("=" * 80)


if __name__ == "__main__":
    main()