"""
|
|
Advanced Load Testing Script - End-to-End Assessment Flow
|
|
|
|
World-class load testing with:
|
|
- Smart browser management (prevents system crashes)
|
|
- Progress persistence (resume from checkpoint)
|
|
- Real-time performance tracking
|
|
- Lightweight metrics collection
|
|
- Advanced error handling
|
|
|
|
Usage:
|
|
python scripts/load_test_e2e_assessment_advanced.py --students 100 --csv students.csv
|
|
|
|
Features:
|
|
- Progress persistence (save every N students)
|
|
- Resume capability (skip completed students)
|
|
- Real-time metrics dashboard
|
|
- Smart browser limiting (visible mode)
|
|
- Performance analytics
|
|
- Resource monitoring
|
|
"""

import sys
import argparse
import csv
import json
import time
import threading
from pathlib import Path
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List
from collections import defaultdict

# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent))

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import WebDriverException, TimeoutException

from pages.login_page import LoginPage
from pages.mandatory_reset_page import MandatoryResetPage
from pages.profile_incomplete_page import ProfileIncompletePage
from pages.profile_editor_page import ProfileEditorPage
from pages.assessments_page import AssessmentsPage
from pages.domains_page import DomainsPage
from pages.domain_assessment_page import DomainAssessmentPage
from pages.domain_feedback_page import DomainFeedbackPage
from utils.question_answer_helper import QuestionAnswerHelper
from utils.password_tracker import password_tracker
from utils.student_data_manager import student_data_manager
from utils.smart_wait_optimizer import SmartWaitOptimizer
from utils.randomized_wait import RandomizedWait
from config.config import BASE_URL, TEST_NEW_PASSWORD, MEDIUM_WAIT

# Global tracking with locks
results_lock = threading.Lock()
progress_lock = threading.Lock()

results = {
    'total': 0,
    'success': 0,
    'failed': 0,
    'skipped': 0,
    'errors': [],
    'start_time': None,
    'last_update': None
}

# Performance metrics
performance_metrics = {
    'step_times': defaultdict(list),  # step -> [durations]
    'total_durations': [],
    'questions_answered': [],
    'step_success_rates': defaultdict(lambda: {'success': 0, 'failed': 0})
}

# Progress file
PROGRESS_FILE = Path(__file__).parent.parent / "reports" / "load_test_progress.json"
PROGRESS_FILE.parent.mkdir(parents=True, exist_ok=True)

# Track completed students
completed_students = set()
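
# Shape of the checkpoint written by ProgressTracker.save_progress() below
# (illustrative sketch; field values are examples, not real data):
#
#   {
#     "timestamp": "2025-01-01T12:00:00",
#     "results": {"total": 40, "success": 38, "failed": 2, "skipped": 0, ...},
#     "performance": {"step_times": {...}, "total_durations": [...],
#                     "questions_answered": [...], "step_success_rates": {...}},
#     "completed_students": ["CP0001", "CP0002", ...]
#   }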


class ProgressTracker:
    """Lightweight progress tracking and persistence"""

    @staticmethod
    def save_progress():
        """Save current progress to file"""
        with progress_lock:
            progress_data = {
                'timestamp': datetime.now().isoformat(),
                'results': results.copy(),
                'performance': {
                    'step_times': {k: v[-100:] for k, v in performance_metrics['step_times'].items()},  # Last 100
                    'total_durations': performance_metrics['total_durations'][-100:],  # Last 100
                    'questions_answered': performance_metrics['questions_answered'][-100:],  # Last 100
                    'step_success_rates': dict(performance_metrics['step_success_rates'])
                },
                'completed_students': ProgressTracker.get_completed_students()
            }

            try:
                with open(PROGRESS_FILE, 'w') as f:
                    json.dump(progress_data, f, indent=2)
            except Exception as e:
                print(f"⚠️ Error saving progress: {e}")

    @staticmethod
    def load_progress() -> Dict:
        """Load previous progress from file"""
        if not PROGRESS_FILE.exists():
            return {}

        try:
            with open(PROGRESS_FILE, 'r') as f:
                return json.load(f)
        except Exception as e:
            print(f"⚠️ Error loading progress: {e}")
            return {}

    @staticmethod
    def get_completed_students() -> List[str]:
        """Get list of completed student CPIDs"""
        return list(completed_students)

    @staticmethod
    def print_real_time_metrics():
        """Print real-time performance metrics"""
        with progress_lock:
            if results['total'] == 0:
                return

            print("\n" + "=" * 80)
            print("📊 REAL-TIME METRICS")
            print("=" * 80)

            # Overall stats
            success_rate = (results['success'] / results['total']) * 100
            print(f"✅ Success Rate: {success_rate:.1f}% ({results['success']}/{results['total']})")

            # Average durations
            if performance_metrics['total_durations']:
                avg_duration = sum(performance_metrics['total_durations']) / len(performance_metrics['total_durations'])
                print(f"⏱️ Average Duration: {avg_duration:.2f}s")

            # Step performance
            print("\n📈 Step Performance:")
            for step, times in performance_metrics['step_times'].items():
                if times:
                    avg_time = sum(times) / len(times)
                    step_stats = performance_metrics['step_success_rates'][step]
                    total_attempts = step_stats['success'] + step_stats['failed']
                    rate = (step_stats['success'] / total_attempts * 100) if total_attempts > 0 else 0
                    print(f"  {step}: {avg_time:.2f}s avg, {rate:.1f}% success ({step_stats['success']}/{total_attempts})")

            # Questions answered
            if performance_metrics['questions_answered']:
                avg_questions = sum(performance_metrics['questions_answered']) / len(performance_metrics['questions_answered'])
                print(f"\n❓ Average Questions Answered: {avg_questions:.1f}")

            # Overall processing rate
            if results['start_time']:
                elapsed = time.time() - results['start_time']
                rate = results['total'] / elapsed if elapsed > 0 else 0
                print(f"\n⚡ Rate: {rate:.2f} students/second")

            print("=" * 80)


class LoadTestStudent:
    """Handles end-to-end flow for a single student with performance tracking"""

    def __init__(self, cpid: str, student_data: Dict, headless: bool = True):
        self.cpid = cpid
        self.student_data = student_data
        self.headless = headless
        self.driver = None
        self.step_times = {}
        self.actual_password_used = None  # Track which password was actually used for login
        self.result = {
            'cpid': cpid,
            'name': f"{student_data.get('First Name', '')} {student_data.get('Last Name', '')}",
            'status': 'pending',
            'steps_completed': [],
            'step_times': {},
            'error': None,
            'duration': 0,
            'questions_answered': 0
        }

    def _track_step(self, step_name: str, func):
        """Track step execution time and success/failure"""
        start_time = time.time()
        try:
            result = func()
            duration = time.time() - start_time

            self.step_times[step_name] = duration
            self.result['step_times'][step_name] = duration

            # Update global metrics
            with progress_lock:
                performance_metrics['step_times'][step_name].append(duration)
                if result:
                    performance_metrics['step_success_rates'][step_name]['success'] += 1
                else:
                    performance_metrics['step_success_rates'][step_name]['failed'] += 1

            return result
        except Exception:
            duration = time.time() - start_time
            self.step_times[step_name] = duration

            with progress_lock:
                performance_metrics['step_times'][step_name].append(duration)
                performance_metrics['step_success_rates'][step_name]['failed'] += 1

            raise

    def setup_driver(self, max_retries=3):
        """Create and configure WebDriver with retry logic for high concurrency"""
        options = Options()
        if self.headless:
            options.add_argument('--headless=new')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--disable-gpu')
        options.add_argument('--window-size=1920,1080')
        # Additional stability options for high concurrency
        options.add_argument('--disable-software-rasterizer')
        options.add_argument('--disable-extensions')
        options.add_argument('--disable-background-networking')
        options.add_argument('--disable-sync')
        options.add_argument('--disable-default-apps')
        options.add_argument('--disable-background-timer-throttling')
        options.add_argument('--disable-renderer-backgrounding')
        options.add_argument('--disable-backgrounding-occluded-windows')

        for attempt in range(max_retries):
            try:
                self.driver = webdriver.Chrome(options=options)
                self.driver.implicitly_wait(5)
                return True
            except WebDriverException as e:
                if attempt < max_retries - 1:
                    time.sleep(2)  # Wait before retry (longer for resource contention)
                    continue
                self.result['error'] = f"Driver setup failed after {max_retries} attempts: {str(e)[:200]}"
                return False

    def step_login(self) -> bool:
        """Step 1: Login with smart password handling"""
        return self._track_step('login', self._do_login)

    def _wait_for_login_result(self, login_page):
        """Wait until login either navigates away or shows an error."""
        try:
            WebDriverWait(self.driver, MEDIUM_WAIT).until(
                lambda d: "/dashboard" in d.current_url
                or "/student" in d.current_url
                or "mandatory_reset" in d.page_source.lower()
                or login_page.is_error_visible()
            )
        except TimeoutException:
            pass

    def _is_on_login_page(self) -> bool:
        """True if the browser is still on the login page."""
        current_url = self.driver.current_url
        return (
            current_url.rstrip("/") == BASE_URL.rstrip("/")
            or "login" in current_url.lower()
        )

    def _do_login(self) -> bool:
        """
        Login with explicit password strategy:
        1. Try Excel password FIRST (from CSV)
        2. If that fails, fall back to Admin@123 (TEST_NEW_PASSWORD)
        """
        login_page = LoginPage(self.driver)
        login_page.navigate()

        # Get Excel password from student data
        excel_password = self.student_data.get('password')

        if not excel_password:
            # No password in CSV - use Admin@123 directly
            print(f"⚠️ No Excel password found for {self.cpid}, using Admin@123")
            excel_password = TEST_NEW_PASSWORD

        # Strategy: Try Excel password first, then Admin@123
        print(f"🔑 Trying Excel password first for {self.cpid}")

        login_page.enter_identifier(self.cpid)
        login_page.enter_password(excel_password)
        login_page.click_submit()
        self._wait_for_login_result(login_page)

        # Determine which password was actually used
        actual_password_used = excel_password

        # If still on the login page with an error, the Excel password failed - try Admin@123
        if self._is_on_login_page() and login_page.is_error_visible():
            print("🔄 Excel password failed, trying Admin@123 (reset password)...")
            # Clear form and retry with Admin@123
            login_page.enter_identifier(self.cpid)
            login_page.enter_password(TEST_NEW_PASSWORD)
            login_page.click_submit()
            self._wait_for_login_result(login_page)

            # Check if Admin@123 worked
            if self._is_on_login_page() and login_page.is_error_visible():
                # Both passwords failed
                error_msg = login_page.get_error_message()
                raise Exception(
                    f"Login failed with both passwords. Excel password: {excel_password}, "
                    f"Admin@123: {TEST_NEW_PASSWORD}. Error: {error_msg}"
                )

            # Admin@123 worked
            actual_password_used = TEST_NEW_PASSWORD

        # Store which password was actually used (for the password reset step)
        self.actual_password_used = actual_password_used

        # Update password tracker if Admin@123 was used
        if actual_password_used == TEST_NEW_PASSWORD:
            password_tracker.update_password(self.cpid, TEST_NEW_PASSWORD)

        SmartWaitOptimizer.smart_wait_for_dashboard(self.driver, self.cpid, actual_password_used)

        self.result['steps_completed'].append('login')
        return True

    def step_password_reset(self) -> bool:
        """Step 2: Reset password if needed"""
        return self._track_step('password_reset', self._do_password_reset)

    def _do_password_reset(self) -> bool:
        """
        Reset password if needed.
        Uses the password that was actually used during login.
        """
        # If Admin@123 was used, the password is already reset - skip
        if self.actual_password_used == TEST_NEW_PASSWORD:
            self.result['steps_completed'].append('password_reset_skipped')
            return True

        # Excel password was used - check if a reset is needed
        reset_page = MandatoryResetPage(self.driver)
        if (SmartWaitOptimizer.should_check_password_reset(self.cpid, self.actual_password_used)
                and reset_page.is_modal_present()):
            # Use the Excel password (the one that worked) for the reset
            current_password = self.actual_password_used or self.student_data.get('password')
            reset_page.reset_password(
                current_password=current_password,
                new_password=TEST_NEW_PASSWORD,
                confirm_password=TEST_NEW_PASSWORD,
                student_cpid=self.cpid
            )
            # Update password tracker
            password_tracker.update_password(self.cpid, TEST_NEW_PASSWORD)
            time.sleep(SmartWaitOptimizer.ANIMATION_NORMAL + SmartWaitOptimizer.SAFETY_PADDING)
            self.result['steps_completed'].append('password_reset')
        else:
            self.result['steps_completed'].append('password_reset_skipped')

        return True

    def step_profile_completion(self) -> bool:
        """Step 3: Complete profile if needed"""
        return self._track_step('profile_completion', self._do_profile_completion)

    def _do_profile_completion(self) -> bool:
        profile_incomplete = ProfileIncompletePage(self.driver)

        if (SmartWaitOptimizer.should_check_profile_incomplete(self.driver)
                and profile_incomplete.is_modal_present()):
            profile_incomplete.click_complete()
            time.sleep(SmartWaitOptimizer.ANIMATION_NORMAL + SmartWaitOptimizer.SAFETY_PADDING)

            profile_editor = ProfileEditorPage(self.driver)
            profile_editor.wait_for_page_load()
            profile_editor.complete_profile_to_100(student_cpid=self.cpid)
            time.sleep(SmartWaitOptimizer.ANIMATION_NORMAL + SmartWaitOptimizer.SAFETY_PADDING)
            self.result['steps_completed'].append('profile_completion')
        else:
            self.result['steps_completed'].append('profile_completion_skipped')

        return True

    def step_complete_domain_assessment(self) -> bool:
        """Step 4: Complete ONE domain assessment"""
        return self._track_step('domain_assessment', self._do_domain_assessment)

    def _click_next_if_enabled(self, domain_assessment_page) -> None:
        """Click the Next button when it is present and enabled; ignore failures."""
        try:
            next_button = domain_assessment_page.find_element(domain_assessment_page.NEXT_BUTTON)
            if next_button.is_enabled():
                next_button.click()
                RandomizedWait.wait_for_navigation('next')
        except Exception:
            pass

    def _do_domain_assessment(self) -> bool:
        # Navigate to assessments
        assessments_page = AssessmentsPage(self.driver)
        assessments_page.navigate()
        assessments_page.wait_for_page_load()
        RandomizedWait.wait_for_page_load('navigation')

        # Get first assessment
        assessment_ids = assessments_page.get_assessment_ids()
        if not assessment_ids:
            raise Exception("No assessments available")

        assessments_page.click_begin_assessment(assessment_ids[0])
        RandomizedWait.wait_for_page_load('navigation')

        # Navigate to domains
        domains_page = DomainsPage(self.driver)
        domains_page.wait_for_page_load()
        RandomizedWait.wait_for_page_load('initial')

        # Get first unlocked domain
        domain_ids = domains_page.get_all_domain_ids()
        if not domain_ids:
            raise Exception("No domains available")

        unlocked_domain_id = None
        for domain_id in domain_ids:
            if domains_page.is_domain_unlocked(domain_id):
                unlocked_domain_id = domain_id
                break

        if not unlocked_domain_id:
            raise Exception("No unlocked domains available")

        domains_page.click_start_domain(unlocked_domain_id)
        RandomizedWait.wait_for_page_load('initial')

        # Get domain assessment page
        domain_assessment_page = DomainAssessmentPage(self.driver)
        domain_assessment_page.wait_for_page_load()
        RandomizedWait.wait_for_page_load('initial')

        # Dismiss modals
        if domain_assessment_page.is_instructions_modal_present():
            domain_assessment_page.dismiss_instructions_modal()
            RandomizedWait.wait_for_page_load('modal')

        domain_assessment_page.dismiss_guidance()

        # Answer questions
        question_helper = QuestionAnswerHelper(self.driver)

        max_questions = 100
        questions_answered = 0
        consecutive_failures = 0
        max_consecutive_failures = 3

        while questions_answered < max_questions:
            try:
                question_id = question_helper.get_question_id()
                if not question_id:
                    consecutive_failures += 1
                    if consecutive_failures >= max_consecutive_failures:
                        break
                    self._click_next_if_enabled(domain_assessment_page)
                    continue

                consecutive_failures = 0

                question_type = question_helper.get_question_type(question_id)
                if not question_type:
                    # Scroll the question into view and retry the type lookup
                    try:
                        question_element = self.driver.find_element(
                            'css selector',
                            f"[data-testid='domain_question__{question_id}']"
                        )
                        self.driver.execute_script("arguments[0].scrollIntoView(true);", question_element)
                        time.sleep(1)
                        question_type = question_helper.get_question_type(question_id)
                    except Exception:
                        pass

                if not question_type:
                    self._click_next_if_enabled(domain_assessment_page)
                    continue

                try:
                    question_helper.answer_question(question_id=question_id, question_type=question_type)
                    RandomizedWait.wait_for_question_answer(question_type)
                    questions_answered += 1
                except Exception as e:
                    print(f"  ⚠️ Error answering question {question_id}: {e}")

                # Submit when the submit button becomes enabled
                try:
                    submit_button = domain_assessment_page.find_element(domain_assessment_page.SUBMIT_BUTTON)
                    if submit_button.is_enabled():
                        submit_button.click()
                        RandomizedWait.wait_for_submission('submit')
                        break
                except Exception:
                    pass

                # Click next
                self._click_next_if_enabled(domain_assessment_page)

            except Exception:
                consecutive_failures += 1
                if consecutive_failures >= max_consecutive_failures:
                    break

        # Handle submit confirmation modal
        try:
            if domain_assessment_page.is_submit_modal_present():
                domain_assessment_page.confirm_submit()
                RandomizedWait.wait_for_submission('confirm')
        except Exception:
            pass

        # Handle feedback
        try:
            feedback_page = DomainFeedbackPage(self.driver)
            if feedback_page.is_modal_present():
                feedback_page.submit_feedback(
                    question1_answer='yes',
                    question1_justification='Automated test response',
                    question2_answer='This is an automated test response for load testing purposes.'
                )
                RandomizedWait.wait_for_submission('feedback')
        except Exception as e:
            print(f"  ⚠️ Error handling feedback: {e}")

        self.result['steps_completed'].append('domain_assessment')
        self.result['questions_answered'] = questions_answered

        # Update global metrics
        with progress_lock:
            performance_metrics['questions_answered'].append(questions_answered)

        return True

    def run(self) -> Dict:
        """Run complete end-to-end flow for this student"""
        start_time = time.time()

        try:
            if not self.setup_driver():
                self.result['status'] = 'failed'
                return self.result

            if not self.step_login():
                self.result['status'] = 'failed'
                return self.result

            if not self.step_password_reset():
                self.result['status'] = 'failed'
                return self.result

            if not self.step_profile_completion():
                self.result['status'] = 'failed'
                return self.result

            if not self.step_complete_domain_assessment():
                self.result['status'] = 'failed'
                return self.result

            self.result['status'] = 'success'

        except Exception as e:
            self.result['status'] = 'failed'
            self.result['error'] = f"Unexpected error: {str(e)}"

        finally:
            if self.driver:
                try:
                    self.driver.quit()
                except Exception:
                    pass

            # Record duration on every exit path, including early returns
            self.result['duration'] = time.time() - start_time

            # Update global metrics
            with progress_lock:
                performance_metrics['total_durations'].append(self.result['duration'])

        return self.result
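
# Minimal standalone sketch for debugging a single student outside the thread
# pool (illustrative; the CPID, names, and password below are hypothetical):
#
#   student = LoadTestStudent(
#       cpid="CP0001",
#       student_data={"First Name": "Jane", "Last Name": "Doe", "password": "Excel@123"},
#       headless=False,
#   )
#   print(json.dumps(student.run(), indent=2))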


def load_students_from_csv(csv_path: str) -> List[Dict]:
    """Load students from CSV file"""
    students = []
    try:
        with open(csv_path, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                cpid = (
                    row.get('Student CPID') or
                    row.get('student_cpid') or
                    row.get('Student_CPID') or
                    row.get('cpid') or
                    row.get('CPID') or
                    None
                )

                if not cpid:
                    continue

                password = (
                    row.get('Password') or
                    row.get('password') or
                    row.get('PASSWORD') or
                    None
                )

                students.append({
                    'cpid': cpid.strip(),
                    'data': {
                        **row,
                        'password': password.strip() if password else None
                    }
                })
    except Exception as e:
        print(f"❌ Error loading CSV: {e}")
        import traceback
        traceback.print_exc()
        return []

    return students
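
# Expected CSV layout (illustrative sketch; header names follow the fallbacks
# accepted above, and 'First Name'/'Last Name' are used elsewhere for display):
#
#   Student CPID,First Name,Last Name,Password
#   CP0001,Jane,Doe,Excel@123
#   CP0002,John,Smith,Excel@456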


def run_student_flow(student_info: Dict, headless: bool = True, save_interval: int = 5) -> Dict:
    """Run flow for a single student"""
    cpid = student_info['cpid']
    student_data = student_info['data']

    student_name = f"{student_data.get('First Name', '')} {student_data.get('Last Name', '')}".strip()
    if not student_name:
        student_name = cpid

    print(f"🚀 Starting: {cpid} ({student_name})")

    student_flow = LoadTestStudent(cpid, student_data, headless=headless)
    result = student_flow.run()

    # Update global results
    with results_lock:
        results['total'] += 1
        results['last_update'] = datetime.now().isoformat()

        if result['status'] == 'success':
            results['success'] += 1
            completed_students.add(cpid)
        elif result['status'] == 'failed':
            results['failed'] += 1
            results['errors'].append({
                'cpid': cpid,
                'error': result.get('error', 'Unknown error')
            })
        else:
            results['skipped'] += 1
            completed_students.add(cpid)  # Also track skipped students as completed

        # Save progress periodically
        if results['total'] % save_interval == 0:
            ProgressTracker.save_progress()
            ProgressTracker.print_real_time_metrics()

    status_emoji = '✅' if result['status'] == 'success' else '❌'
    print(f"{status_emoji} Completed: {cpid} ({student_name}) - {result['status']} - {result['duration']:.2f}s")

    return result


def main():
    parser = argparse.ArgumentParser(description='Advanced Load Testing Script - End-to-End Assessment Flow')
    parser.add_argument('--students', type=int, required=True, help='Number of students to test')
    parser.add_argument('--csv', type=str, required=True, help='Path to CSV file with student data')
    parser.add_argument('--concurrent', type=int, default=5, help='Number of concurrent students (default: 5)')
    parser.add_argument('--headless', action='store_true', default=True, help='Run in headless mode (default: True)')
    parser.add_argument('--no-headless', dest='headless', action='store_false', help='Run in visible mode')
    parser.add_argument('--max-visible', type=int, default=2, help='Max visible browsers when not headless (default: 2)')
    parser.add_argument('--save-interval', type=int, default=5, help='Save progress every N students (default: 5)')
    parser.add_argument('--resume', action='store_true', help='Resume from previous progress')
    parser.add_argument('--metrics-interval', type=int, default=10, help='Print metrics every N students (default: 10)')

    args = parser.parse_args()

    print("=" * 80)
    print("ADVANCED LOAD TESTING - END-TO-END ASSESSMENT FLOW")
    print("=" * 80)
    print("📊 Configuration:")
    print(f"   Total Students: {args.students}")
    print(f"   Concurrent: {args.concurrent}")
    print(f"   Headless: {args.headless}")
    if not args.headless:
        print(f"   Max Visible Browsers: {args.max_visible}")
    print(f"   Save Interval: Every {args.save_interval} students")
    print(f"   Metrics Interval: Every {args.metrics_interval} students")
    print(f"   Resume: {args.resume}")
    print(f"   CSV File: {args.csv}")
    print("=" * 80)

    # Load students
    all_students = load_students_from_csv(args.csv)
    if not all_students:
        print("❌ No students loaded from CSV")
        return

    # Handle resume
    if args.resume:
        progress = ProgressTracker.load_progress()
        if progress:
            print("📂 Resuming from previous progress...")
            completed = set(progress.get('completed_students', []))
            completed_students.update(completed)
            all_students = [s for s in all_students if s['cpid'] not in completed]
            print(f"   Skipping {len(completed)} already completed students")
            # Restore results from progress
            if 'results' in progress:
                results.update(progress['results'])

    # Limit to requested number
    students_to_test = all_students[:args.students]
    print(f"\n📋 Loaded {len(students_to_test)} students from CSV")

    # Load student data manager
    try:
        student_data_manager.load_students_from_csv(args.csv)
    except Exception:
        pass

    # Initialize results (also resets start_time after a resume)
    results['start_time'] = time.time()

    # Smart browser management for visible mode
    if not args.headless:
        # Limit concurrent visible browsers
        actual_concurrent = min(args.concurrent, args.max_visible)
        print(f"⚠️ Visible mode: Limiting concurrent browsers to {actual_concurrent} (to prevent crashes)")
    else:
        actual_concurrent = args.concurrent
        # Warning for high concurrency in headless mode
        if actual_concurrent > 50:
            print(f"⚠️ WARNING: Running {actual_concurrent} concurrent browsers in headless mode")
            print("   This may cause system stress. Monitor system resources.")
            print("   If crashes occur, reduce --concurrent to 20-50")

    print(f"\n🚀 Starting load test with {actual_concurrent} concurrent students...")
    print("=" * 80)

    # Run load test
    with ThreadPoolExecutor(max_workers=actual_concurrent) as executor:
        futures = {
            executor.submit(run_student_flow, student, args.headless, args.save_interval): student
            for student in students_to_test
        }

        completed_count = 0
        for future in as_completed(futures):
            try:
                future.result()  # Re-raise any exception from the worker thread
                completed_count += 1

                # Print metrics periodically
                if completed_count % args.metrics_interval == 0:
                    ProgressTracker.print_real_time_metrics()
            except Exception as e:
                print(f"❌ Exception in thread: {e}")

    # Final save and summary
    ProgressTracker.save_progress()

    total_duration = time.time() - results['start_time']

    print("\n" + "=" * 80)
    print("FINAL SUMMARY")
    print("=" * 80)
    print(f"✅ Success: {results['success']}/{results['total']}")
    print(f"❌ Failed: {results['failed']}/{results['total']}")
    print(f"⏭️ Skipped: {results['skipped']}/{results['total']}")
    print(f"⏱️ Total Duration: {total_duration:.2f}s ({total_duration/60:.2f} minutes)")
    if results['total'] > 0:
        print(f"📊 Average per Student: {total_duration/results['total']:.2f}s")
        print(f"⚡ Rate: {results['total']/total_duration:.3f} students/second")

    ProgressTracker.print_real_time_metrics()

    if results['errors']:
        print(f"\n❌ Errors ({len(results['errors'])}):")
        for error in results['errors'][:10]:
            print(f"   - {error['cpid']}: {error['error'][:100]}")
        if len(results['errors']) > 10:
            print(f"   ... and {len(results['errors']) - 10} more")

    print(f"\n💾 Progress saved to: {PROGRESS_FILE}")
    print("=" * 80)


if __name__ == "__main__":
    main()