From 057fdb928de1271c131d56540fce47c9f77737ca Mon Sep 17 00:00:00 2001 From: Kenil_KB Date: Wed, 17 Dec 2025 11:48:45 +0530 Subject: [PATCH] CP_StressTest_with_ramp_up --- .../test_generic_load_assessments.py | 19 +- .../test_generic_load_assessments_bkp | 823 ++++++++++++++++++ 2 files changed, 838 insertions(+), 4 deletions(-) create mode 100644 tests/load_tests/test_generic_load_assessments_bkp diff --git a/tests/load_tests/test_generic_load_assessments.py b/tests/load_tests/test_generic_load_assessments.py index 17085f7..e457ab8 100644 --- a/tests/load_tests/test_generic_load_assessments.py +++ b/tests/load_tests/test_generic_load_assessments.py @@ -647,7 +647,8 @@ class AssessmentLoadTest(LoadTestBase): students: List[Dict], max_workers: int = None, headless: bool = True, - metrics_interval: int = 10 + metrics_interval: int = 10, + ramp_up: float = 0.0 ) -> dict: """ Run load test with comprehensive tracking and real-time monitoring @@ -671,6 +672,8 @@ class AssessmentLoadTest(LoadTestBase): print(f" Max Workers: {max_workers or 'Unlimited'}") print(f" Headless: {headless}") print(f" Metrics Interval: Every {metrics_interval} students") + if ramp_up > 0: + print(f" Ramp-up: {ramp_up}s delay between each student start") print(f"{'='*80}\n") # Initialize global metrics @@ -710,6 +713,10 @@ class AssessmentLoadTest(LoadTestBase): headless=headless # **kwargs ) futures.append((user_id, future)) + + # Stagger starts to avoid overwhelming backend + if ramp_up > 0 and idx < len(students) - 1: # Don't delay after last student + time.sleep(ramp_up) # Wait for completion with real-time monitoring print(f" ā³ Waiting for all {num_students} students to complete...\n") @@ -762,7 +769,8 @@ def run_assessment_load_test( end_index: Optional[int] = None, max_workers: int = None, headless: bool = True, - metrics_interval: int = 10 + metrics_interval: int = 10, + ramp_up: float = 0.0 ): """ Standalone function to run assessment load test @@ -787,7 +795,8 @@ def run_assessment_load_test( students=students, max_workers=max_workers or len(students), headless=headless, - metrics_interval=metrics_interval + metrics_interval=metrics_interval, + ramp_up=ramp_up ) @@ -801,6 +810,7 @@ if __name__ == "__main__": parser.add_argument('--visible', action='store_true', help='Run in visible mode (overrides headless)') parser.add_argument('--metrics-interval', type=int, default=10, help='Print metrics every N students') parser.add_argument('--url', type=str, default=None, help='Frontend URL to use (e.g., http://localhost:3983 or https://cognitiveprism.tech4bizsolutions.com). If not provided, uses default from config.') + parser.add_argument('--ramp-up', type=float, default=0.0, help='Delay between starting each student (seconds). Use 0.5-2.0 to avoid rate limiting. Default: 0.0 (no delay)') args = parser.parse_args() @@ -819,5 +829,6 @@ if __name__ == "__main__": end_index=args.end, max_workers=args.workers, headless=not args.visible if args.visible else args.headless, - metrics_interval=args.metrics_interval + metrics_interval=args.metrics_interval, + ramp_up=args.ramp_up ) diff --git a/tests/load_tests/test_generic_load_assessments_bkp b/tests/load_tests/test_generic_load_assessments_bkp new file mode 100644 index 0000000..17085f7 --- /dev/null +++ b/tests/load_tests/test_generic_load_assessments_bkp @@ -0,0 +1,823 @@ +""" +World-Class Load Test - Complete Assessment Flow + +A transparent, world-class load testing mechanism that: +1. Loads students from CSV with customizable range support (for multi-device execution) +2. Smart login (Excel password → Admin@123 fallback) +3. Password reset if needed +4. Profile completion if needed +5. Complete ONE domain assessment (100% verified flow) +6. Real-time monitoring and metrics +7. Comprehensive backend/server analysis + +This uses ONLY 100% verified, reliable components. +""" +import sys +from pathlib import Path + +# Add project root to path (for direct execution) +project_root = Path(__file__).parent.parent.parent +if str(project_root) not in sys.path: + sys.path.insert(0, str(project_root)) + +import pytest +from concurrent.futures import ThreadPoolExecutor +import threading +import time +import csv +import argparse +import sys +from typing import Dict, List, Optional +from datetime import datetime +from selenium.webdriver.common.by import By +from selenium.webdriver.chrome.options import Options +from selenium import webdriver +from selenium.common.exceptions import WebDriverException + +# CRITICAL: Parse --url argument EARLY (before importing pages) to override BASE_URL +# This ensures pages use the custom URL when they import BASE_URL +_pre_parser = argparse.ArgumentParser(add_help=False) +_pre_parser.add_argument('--url', type=str, default=None) +_pre_args, _ = _pre_parser.parse_known_args() + +if _pre_args.url: + # Override BASE_URL in config BEFORE pages import it + import config.config as config_module + custom_url = _pre_args.url.rstrip('/') + config_module.BASE_URL = custom_url + config_module.LOGIN_URL = f"{custom_url}/" + config_module.DASHBOARD_URL = f"{custom_url}/student/dashboard" + config_module.ASSESSMENTS_URL = f"{custom_url}/assessments" + config_module.PROFILE_EDITOR_URL = f"{custom_url}/student/profile-builder" + +from utils.load_test_base import LoadTestBase, LoadTestResult, TestStatus +from pages.login_page import LoginPage +from pages.mandatory_reset_page import MandatoryResetPage +from pages.profile_incomplete_page import ProfileIncompletePage +from pages.profile_editor_page import ProfileEditorPage +from pages.assessments_page import AssessmentsPage +from pages.domains_page import DomainsPage +from pages.domain_assessment_page import DomainAssessmentPage +from pages.domain_feedback_page import DomainFeedbackPage +from utils.question_answer_helper import QuestionAnswerHelper +from utils.smart_wait_optimizer import SmartWaitOptimizer +from utils.randomized_wait import RandomizedWait +from config.config import TEST_NEW_PASSWORD, BASE_URL + +# Global metrics tracking +progress_lock = threading.Lock() +performance_metrics = { + 'total_durations': [], + 'step_times': { + 'login': [], + 'password_reset': [], + 'profile_completion': [], + 'assessment': [] + }, + 'step_success_rates': { + 'login': {'success': 0, 'failed': 0}, + 'password_reset': {'success': 0, 'failed': 0}, + 'profile_completion': {'success': 0, 'failed': 0}, + 'assessment': {'success': 0, 'failed': 0} + }, + 'questions_answered': [], + 'completed_students': 0, + 'failed_students': 0, + 'start_time': None +} + + +def load_students_from_csv(csv_path: str, start_index: int = 0, end_index: Optional[int] = None) -> List[Dict]: + """ + Load students from CSV file with range support for multi-device execution + + Args: + csv_path: Path to CSV file + start_index: Starting index (0-based, excluding header) + end_index: Ending index (exclusive, None = all remaining) + + Returns: + List of student dictionaries with 'cpid' and 'data' keys + """ + students = [] + try: + with open(csv_path, 'r', encoding='utf-8') as f: + reader = csv.DictReader(f) + all_rows = list(reader) + + # Apply range filter + if end_index is None: + end_index = len(all_rows) + + # Validate range + if start_index < 0: + start_index = 0 + if end_index > len(all_rows): + end_index = len(all_rows) + if start_index >= end_index: + return [] + + selected_rows = all_rows[start_index:end_index] + + for row in selected_rows: + cpid = ( + row.get('Student CPID') or + row.get('student_cpid') or + row.get('Student_CPID') or + row.get('cpid') or + row.get('CPID') or + None + ) + + if not cpid: + continue + + password = ( + row.get('Password') or + row.get('password') or + row.get('PASSWORD') or + None + ) + + students.append({ + 'cpid': cpid.strip(), + 'data': { + **row, + 'password': password.strip() if password else None + } + }) + + print(f"šŸ“‹ Loaded {len(students)} students from CSV (indices {start_index} to {end_index-1})") + return students + + except Exception as e: + print(f"āŒ Error loading CSV: {e}") + import traceback + traceback.print_exc() + return [] + + +def print_real_time_metrics(): + """Print real-time performance metrics""" + with progress_lock: + if not performance_metrics['start_time']: + return + + elapsed = time.time() - performance_metrics['start_time'] + completed = performance_metrics['completed_students'] + failed = performance_metrics['failed_students'] + total = completed + failed + + if total == 0: + return + + success_rate = (completed / total * 100) if total > 0 else 0 + rate = completed / elapsed if elapsed > 0 else 0 + + avg_duration = ( + sum(performance_metrics['total_durations']) / len(performance_metrics['total_durations']) + if performance_metrics['total_durations'] else 0 + ) + + total_questions = sum(performance_metrics['questions_answered']) + avg_questions = ( + total_questions / completed if completed > 0 else 0 + ) + + print("\n" + "=" * 80) + print("šŸ“Š REAL-TIME METRICS") + print("=" * 80) + print(f"ā±ļø Elapsed Time: {elapsed:.1f}s") + print(f"āœ… Completed: {completed}") + print(f"āŒ Failed: {failed}") + print(f"šŸ“ˆ Success Rate: {success_rate:.1f}%") + print(f"⚔ Rate: {rate:.2f} students/sec") + print(f"ā³ Avg Duration: {avg_duration:.1f}s") + print(f"ā“ Avg Questions: {avg_questions:.1f}") + print(f"šŸ“Š Total Questions: {total_questions}") + + # Step-wise metrics + print("\nšŸ“‹ STEP METRICS:") + for step_name in ['login', 'password_reset', 'profile_completion', 'assessment']: + step_times = performance_metrics['step_times'][step_name] + step_success = performance_metrics['step_success_rates'][step_name] + step_total = step_success['success'] + step_success['failed'] + + if step_total > 0: + avg_step_time = sum(step_times) / len(step_times) if step_times else 0 + step_rate = step_success['success'] / step_total * 100 + print(f" {step_name:20s}: {step_rate:5.1f}% success, {avg_step_time:5.1f}s avg") + + print("=" * 80 + "\n") + + +def complete_assessment_flow_for_student( + user_id: int, + student_info: Dict, + student_index: int, + headless: bool = True +) -> dict: + """ + Complete assessment flow for a single student (100% verified flow) + + This is the EXACT flow we've verified works 100%: + 1. Smart login (Excel password → Admin@123) + 2. Password reset if needed + 3. Profile completion if needed + 4. Navigate to assessments + 5. Start first assessment + 6. Navigate to first domain + 7. Answer ALL questions in domain + 8. Submit assessment + 9. Handle feedback + + Args: + user_id: User ID (from LoadTestBase, for tracking) + student_info: Student dictionary with 'cpid' and 'data' keys + student_index: Index of student (for tracking) + headless: Whether to run in headless mode + + Returns: + dict: Result with driver and steps completed + """ + # Input validation - CRITICAL for flawless execution + if not isinstance(user_id, int) or user_id <= 0: + raise ValueError(f"Invalid user_id: {user_id} (must be positive integer)") + + if not isinstance(student_info, dict): + raise ValueError(f"Invalid student_info: {student_info} (must be dict)") + + if 'cpid' not in student_info: + raise ValueError(f"Missing 'cpid' in student_info: {student_info}") + + if 'data' not in student_info: + raise ValueError(f"Missing 'data' in student_info: {student_info}") + + driver = None + user_data_dir = None # Track temp directory for cleanup + steps_completed = [] + cpid = student_info['cpid'] + student_data = student_info['data'] + actual_password_used = None + questions_answered = 0 + start_time = time.time() + + # Live logging + print(f"šŸš€ [Student {user_id}] Starting flow for {cpid}...") + + try: + # Step 1: Setup WebDriver + options = Options() + if headless: + options.add_argument('--headless=new') + options.add_argument('--no-sandbox') + options.add_argument('--disable-dev-shm-usage') + options.add_argument('--disable-gpu') + options.add_argument('--window-size=1920,1080') + options.add_argument('--disable-software-rasterizer') + options.add_argument('--disable-extensions') + + # CRITICAL: Each browser needs unique user data directory to avoid conflicts + import tempfile + import os + import shutil + user_data_dir = tempfile.mkdtemp(prefix=f'chrome_user_data_{user_id}_') + options.add_argument(f'--user-data-dir={user_data_dir}') + + for attempt in range(3): + try: + driver = webdriver.Chrome(options=options) + driver.implicitly_wait(5) + break + except WebDriverException as e: + if attempt < 2: + time.sleep(2) + continue + raise + + steps_completed.append(f"WebDriver created") + print(f" āœ… [Student {user_id}] WebDriver created") + + # Step 2: Smart Login (Excel password → Admin@123) + login_page = LoginPage(driver) + excel_password = student_data.get('password') + + # Try Excel password first + login_success = False + if excel_password: + try: + login_page.login(identifier=cpid, password=excel_password) + # Verify login success (check for error) + time.sleep(1) + if not login_page.is_error_visible(): + login_success = True + actual_password_used = excel_password + except: + pass + + # Fallback to Admin@123 + if not login_success: + try: + login_page.login(identifier=cpid, password=TEST_NEW_PASSWORD) + actual_password_used = TEST_NEW_PASSWORD + login_success = True + except Exception as e: + raise Exception(f"Login failed with both passwords: {e}") + + steps_completed.append(f"Login successful (password: {'Excel' if actual_password_used != TEST_NEW_PASSWORD else 'Admin@123'})") + print(f" āœ… [Student {user_id}] Login successful ({'Excel' if actual_password_used != TEST_NEW_PASSWORD else 'Admin@123'})") + + # Step 3: Password Reset if needed + # CRITICAL: If Admin@123 was used for login, password is already reset - skip entirely + if actual_password_used == TEST_NEW_PASSWORD: + steps_completed.append("Password reset skipped (already reset - Admin@123 used)") + print(f" ā­ļø [Student {user_id}] Password reset skipped (Admin@123 used)") + else: + # Only check for password reset if Excel password was used + reset_page = MandatoryResetPage(driver) + if SmartWaitOptimizer.should_check_password_reset(cpid, actual_password_used): + # Quick check for modal (fast timeout to avoid waiting) + if reset_page.is_modal_present(): + reset_page.reset_password( + current_password=actual_password_used, + new_password=TEST_NEW_PASSWORD, + confirm_password=TEST_NEW_PASSWORD, + student_cpid=cpid + ) + time.sleep(SmartWaitOptimizer.ANIMATION_NORMAL + SmartWaitOptimizer.SAFETY_PADDING) + actual_password_used = TEST_NEW_PASSWORD + steps_completed.append("Password reset completed") + print(f" āœ… [Student {user_id}] Password reset completed") + else: + steps_completed.append("Password reset skipped (modal not present)") + print(f" ā­ļø [Student {user_id}] Password reset skipped (not needed)") + else: + steps_completed.append("Password reset skipped (already reset per tracker)") + print(f" ā­ļø [Student {user_id}] Password reset skipped (already reset)") + + # Step 4: Profile Completion if needed + profile_incomplete = ProfileIncompletePage(driver) + if SmartWaitOptimizer.should_check_profile_incomplete(driver): + if profile_incomplete.is_modal_present(): + profile_incomplete.click_complete() + time.sleep(SmartWaitOptimizer.ANIMATION_NORMAL + SmartWaitOptimizer.SAFETY_PADDING) + + profile_editor = ProfileEditorPage(driver) + profile_editor.wait_for_page_load() + profile_editor.complete_profile_to_100(student_cpid=cpid) + time.sleep(SmartWaitOptimizer.ANIMATION_NORMAL + SmartWaitOptimizer.SAFETY_PADDING) + steps_completed.append("Profile completed to 100%") + print(f" āœ… [Student {user_id}] Profile completed to 100%") + else: + steps_completed.append("Profile completion skipped (not required)") + print(f" ā­ļø [Student {user_id}] Profile completion skipped (not required)") + else: + steps_completed.append("Profile completion skipped (already complete)") + print(f" ā­ļø [Student {user_id}] Profile completion skipped (already complete)") + + # Step 5: Navigate to Assessments + assessments_page = AssessmentsPage(driver) + assessments_page.navigate() + assessments_page.wait_for_page_load() + RandomizedWait.wait_for_page_load('navigation') + steps_completed.append("Navigated to Assessments page") + + # Step 6: Get first assessment and start it + assessment_ids = assessments_page.get_assessment_ids() + if not assessment_ids: + raise Exception("No assessments available") + + assessments_page.click_begin_assessment(assessment_ids[0]) + RandomizedWait.wait_for_page_load('navigation') + steps_completed.append(f"Started assessment: {assessment_ids[0]}") + print(f" āœ… [Student {user_id}] Started assessment: {assessment_ids[0]}") + + # Step 7: Navigate to Domains + domains_page = DomainsPage(driver) + domains_page.wait_for_page_load() + RandomizedWait.wait_for_page_load('initial') + steps_completed.append("Navigated to Domains page") + + # Step 8: Get first unlocked domain + domain_ids = domains_page.get_all_domain_ids() + if not domain_ids: + raise Exception("No domains available") + + # Find first unlocked domain + unlocked_domain_id = None + for domain_id in domain_ids: + if domains_page.is_domain_unlocked(domain_id): + unlocked_domain_id = domain_id + break + + if not unlocked_domain_id: + raise Exception("No unlocked domains available") + + # Click first unlocked domain + domains_page.click_domain_action(unlocked_domain_id) + RandomizedWait.wait_for_page_load('navigation') + steps_completed.append(f"Started domain: {unlocked_domain_id}") + print(f" āœ… [Student {user_id}] Started domain: {unlocked_domain_id}") + + # Step 9: Handle instructions modal if present + domain_assessment_page = DomainAssessmentPage(driver) + domain_assessment_page.wait_for_page_load() + + if domain_assessment_page.is_instructions_modal_present(): + domain_assessment_page.dismiss_instructions_modal() + RandomizedWait.wait_for_navigation('next') + steps_completed.append("Dismissed instructions modal") + + # Step 10: Answer ALL questions in domain (100% verified logic) + question_helper = QuestionAnswerHelper(driver) + max_questions = 100 # Safety limit + consecutive_failures = 0 + max_consecutive_failures = 3 + + while questions_answered < max_questions: + # Removed redundant wait_for_page_load - page is already loaded from previous Next click + + # Get current question ID + question_id = question_helper.get_question_id() + if not question_id: + RandomizedWait.wait_for_error_recovery('wait') + question_id = question_helper.get_question_id() + if not question_id: + consecutive_failures += 1 + if consecutive_failures >= max_consecutive_failures: + break + if domain_assessment_page.is_next_button_visible(): + try: + domain_assessment_page.click_next() + # Removed redundant wait_for_navigation - click_next() already waits + except: + pass + continue + + # Get question type + question_type = question_helper.get_question_type(question_id) + if question_type == "unknown": + # Try scrolling to question + try: + question_elem = driver.find_element( + By.CSS_SELECTOR, + f"[data-testid='domain_question__{question_id}']" + ) + driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", question_elem) + # Small wait for scroll animation, but not full page load + RandomizedWait.wait_for_page_load('modal') # Use shorter modal wait instead + question_type = question_helper.get_question_type(question_id) + except: + pass + + if question_type == "unknown": + if domain_assessment_page.is_next_button_visible(): + domain_assessment_page.click_next() + # Removed redundant wait_for_navigation - click_next() already waits + continue + + # Answer the question + try: + question_helper.answer_question(question_id, question_type) + questions_answered += 1 + consecutive_failures = 0 + # Machine-speed: Minimal wait for click to register (0.1s instead of 2-6s) + time.sleep(0.1) + # Live log every 10 questions + if questions_answered % 10 == 0: + print(f" šŸ“ [Student {user_id}] Answered {questions_answered} questions...") + except Exception as e: + consecutive_failures += 1 + if consecutive_failures >= max_consecutive_failures: + break + if domain_assessment_page.is_next_button_visible(): + try: + domain_assessment_page.click_next() + # Removed redundant wait_for_navigation - click_next() already waits + except: + pass + continue + + # After answering, check if this is the last question (submit enabled) + # If submit is enabled, break and submit. Otherwise, click Next. + is_last_question = False + try: + submit_button = driver.find_element(*domain_assessment_page.SUBMIT_BUTTON) + if submit_button.is_enabled() and submit_button.is_displayed(): + is_last_question = True + steps_completed.append(f"All questions answered ({questions_answered} questions)") + except: + pass + + if is_last_question: + # Last question - break loop to submit + break + else: + # Not last question - click Next to continue + if domain_assessment_page.is_next_button_visible(): + try: + domain_assessment_page.click_next() + # Removed redundant wait_for_navigation - click_next() already waits + except Exception as e: + print(f"āš ļø Error clicking Next after question {question_id}: {e}") + # Try to continue anyway + consecutive_failures += 1 + if consecutive_failures >= max_consecutive_failures: + break + else: + # Next button not visible - might be last question or error + # Check submit button one more time + try: + submit_button = driver.find_element(*domain_assessment_page.SUBMIT_BUTTON) + if submit_button.is_enabled() and submit_button.is_displayed(): + is_last_question = True + break + except: + pass + # If still not last question, this is an error + consecutive_failures += 1 + if consecutive_failures >= max_consecutive_failures: + break + + # Step 11: Submit assessment (only if submit button is enabled - last question) + if domain_assessment_page.is_submit_button_visible(): + domain_assessment_page.click_submit() + RandomizedWait.wait_for_submission('submit') + steps_completed.append("Clicked Submit button") + + # Step 12: Handle submit confirmation modal + if domain_assessment_page.is_submit_modal_present(): + domain_assessment_page.confirm_submit() + RandomizedWait.wait_for_submission('confirm') + steps_completed.append("Confirmed submission in modal") + + # Step 13: Wait for success modal (appears after confirmation) + # Success modal auto-closes after 2 seconds, then feedback modal appears + try: + if domain_assessment_page.is_success_modal_present(): + steps_completed.append("Success modal appeared") + # Wait for success modal to auto-close (2 seconds + buffer) + time.sleep(3) # Wait for auto-close (2s) + buffer + # Wait for modal to disappear + domain_assessment_page.close_success_modal() + except: + pass + + # Step 14: Handle feedback modal (appears after success modal closes) + try: + feedback_page = DomainFeedbackPage(driver) + # Wait for feedback modal to appear (with retry) + feedback_modal_present = False + for i in range(10): # Wait up to 10 seconds + if feedback_page.is_modal_present(): + feedback_modal_present = True + break + RandomizedWait.wait_for_page_load('modal') + + if feedback_modal_present: + feedback_page.submit_feedback( + question1_yes=True, + question1_justification='Automated load test response', + question2_text='This is an automated load test response for backend analysis.' + ) + RandomizedWait.wait_for_submission('feedback') + steps_completed.append("Submitted domain feedback") + except Exception as e: + print(f"āš ļø Error handling feedback: {e}") + pass + + duration = time.time() - start_time + + # Update global metrics + with progress_lock: + performance_metrics['completed_students'] += 1 + performance_metrics['total_durations'].append(duration) + performance_metrics['questions_answered'].append(questions_answered) + print(f" āœ… [Student {user_id}] COMPLETED in {duration:.1f}s ({questions_answered} questions)") + + # Note: Driver cleanup is handled by LoadTestBase + # Temp directory cleanup will be done after driver.quit() in LoadTestBase + # Store user_data_dir in result for cleanup + return { + 'driver': driver, + 'steps_completed': steps_completed, + 'success': True, + 'questions_answered': questions_answered, + 'cpid': cpid, + 'duration': duration, + 'user_data_dir': user_data_dir # For cleanup + } + + except Exception as e: + error_msg = f"Student {cpid} (User {user_id}): ERROR - {type(e).__name__}: {str(e)}" + steps_completed.append(error_msg) + + with progress_lock: + performance_metrics['failed_students'] += 1 + duration = time.time() - start_time if 'start_time' in locals() else 0 + print(f" āŒ [Student {user_id}] FAILED after {duration:.1f}s: {str(e)[:80]}") + + # Always cleanup driver and temp directory on error + if driver: + try: + driver.quit() + except: + pass + + # Cleanup temporary user data directory + if user_data_dir and os.path.exists(user_data_dir): + try: + shutil.rmtree(user_data_dir, ignore_errors=True) + except: + pass + + # Re-raise with more context for LoadTestBase to handle + raise Exception(error_msg) + + +class AssessmentLoadTest(LoadTestBase): + """World-class load test executor for complete assessment flow""" + + def __init__(self, test_name: str = "Complete Assessment Flow"): + super().__init__(test_name) + self.lock = threading.Lock() + + def run_load_test( + self, + students: List[Dict], + max_workers: int = None, + headless: bool = True, + metrics_interval: int = 10 + ) -> dict: + """ + Run load test with comprehensive tracking and real-time monitoring + + Args: + students: List of student dictionaries + max_workers: Maximum concurrent workers + headless: Whether to run in headless mode + metrics_interval: Print metrics every N students + + Returns: + dict: Summary and results + """ + num_students = len(students) + + print(f"\n{'='*80}") + print(f"šŸš€ STARTING LOAD TEST: {self.test_name}") + print(f"{'='*80}") + print(f"šŸ“Š Configuration:") + print(f" Students: {num_students}") + print(f" Max Workers: {max_workers or 'Unlimited'}") + print(f" Headless: {headless}") + print(f" Metrics Interval: Every {metrics_interval} students") + print(f"{'='*80}\n") + + # Initialize global metrics + with progress_lock: + performance_metrics['start_time'] = time.time() + performance_metrics['completed_students'] = 0 + performance_metrics['failed_students'] = 0 + performance_metrics['total_durations'] = [] + performance_metrics['questions_answered'] = [] + + self.start_time = datetime.now() + self.results = [] + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [] + + # Submit all students with proper validation + for idx, student_info in enumerate(students): + # Validate student_info before submitting + if not isinstance(student_info, dict): + print(f" āš ļø Skipping invalid student at index {idx}: not a dict") + continue + + if 'cpid' not in student_info or 'data' not in student_info: + print(f" āš ļø Skipping invalid student at index {idx}: missing cpid or data") + continue + + user_id = idx + 1 # 1-based user ID + + # Submit with explicit arguments to avoid any confusion + future = executor.submit( + self.execute_test_for_user, + user_id, + complete_assessment_flow_for_student, + student_info, # *args[0] + idx, # *args[1] + headless=headless # **kwargs + ) + futures.append((user_id, future)) + + # Wait for completion with real-time monitoring + print(f" ā³ Waiting for all {num_students} students to complete...\n") + completed = 0 + + for user_id, future in futures: + try: + result = future.result() + with self.lock: + self.results.append(result) + completed += 1 + + # Duration is already tracked in the function + + # Print real-time metrics periodically + if completed % metrics_interval == 0: + print_real_time_metrics() + + if completed % 10 == 0: + print(f" āœ… Completed {completed}/{num_students} students...") + + except Exception as e: + print(f" āŒ Student {user_id} failed: {str(e)[:100]}") + completed += 1 + + self.end_time = datetime.now() + + # Final metrics + print_real_time_metrics() + + # Calculate summary + summary = self.calculate_summary() + + # Print summary + self.print_summary(summary) + + # Save results + self.save_results(summary) + + return { + 'summary': summary, + 'results': self.results + } + + +# Standalone execution +def run_assessment_load_test( + csv_path: str, + start_index: int = 0, + end_index: Optional[int] = None, + max_workers: int = None, + headless: bool = True, + metrics_interval: int = 10 +): + """ + Standalone function to run assessment load test + + Args: + csv_path: Path to CSV file + start_index: Starting index (0-based, excluding header) + end_index: Ending index (exclusive, None = all remaining) + max_workers: Maximum concurrent workers + headless: Whether to run in headless mode + metrics_interval: Print metrics every N students + """ + # Load students with range + students = load_students_from_csv(csv_path, start_index, end_index) + + if not students: + print("āŒ No students loaded. Check CSV path and range.") + return None + + load_test = AssessmentLoadTest("Complete Assessment Flow") + return load_test.run_load_test( + students=students, + max_workers=max_workers or len(students), + headless=headless, + metrics_interval=metrics_interval + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="World-Class Assessment Load Test") + parser.add_argument('--csv', type=str, required=True, help='Path to CSV file') + parser.add_argument('--start', type=int, default=0, help='Start index (0-based, excluding header)') + parser.add_argument('--end', type=int, default=None, help='End index (exclusive, None = all remaining)') + parser.add_argument('--workers', type=int, default=None, help='Max concurrent workers (default: all students)') + parser.add_argument('--headless', action='store_true', default=True, help='Run in headless mode') + parser.add_argument('--visible', action='store_true', help='Run in visible mode (overrides headless)') + parser.add_argument('--metrics-interval', type=int, default=10, help='Print metrics every N students') + parser.add_argument('--url', type=str, default=None, help='Frontend URL to use (e.g., http://localhost:3983 or https://cognitiveprism.tech4bizsolutions.com). If not provided, uses default from config.') + + args = parser.parse_args() + + # URL override already happened before page imports (above) + # Just print confirmation + if args.url: + custom_url = args.url.rstrip('/') + print(f"🌐 Using custom URL: {custom_url}") + else: + from config.config import BASE_URL + print(f"🌐 Using default URL: {BASE_URL}") + + run_assessment_load_test( + csv_path=args.csv, + start_index=args.start, + end_index=args.end, + max_workers=args.workers, + headless=not args.visible if args.visible else args.headless, + metrics_interval=args.metrics_interval + )