# CP_AUTOMATION/tests/load_tests/test_generic_load_assessments.py
"""
World-Class Load Test - Complete Assessment Flow
A transparent, world-class load testing mechanism that:
1. Loads students from CSV with customizable range support (for multi-device execution)
2. Smart login (Excel password → Admin@123 fallback)
3. Password reset if needed
4. Profile completion if needed
5. Complete ONE domain assessment (100% verified flow)
6. Real-time monitoring and metrics
7. Comprehensive backend/server analysis
This uses ONLY 100% verified, reliable components.
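Example invocation (CSV path and range are illustrative):
    python tests/load_tests/test_generic_load_assessments.py \
        --csv data/students.csv --start 0 --end 50 --workers 10
For multi-device execution, give each device a disjoint --start/--end range
over the same CSV (e.g. 0-100 on device one, 100-200 on device two).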
"""
import sys
from pathlib import Path
# Add project root to path (for direct execution)
project_root = Path(__file__).parent.parent.parent
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
import pytest
from concurrent.futures import ThreadPoolExecutor
import threading
import time
import csv
import argparse
import os
import shutil
import tempfile
from typing import Dict, List, Optional
from datetime import datetime
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from utils.load_test_base import LoadTestBase, LoadTestResult, TestStatus
from pages.login_page import LoginPage
from pages.mandatory_reset_page import MandatoryResetPage
from pages.profile_incomplete_page import ProfileIncompletePage
from pages.profile_editor_page import ProfileEditorPage
from pages.assessments_page import AssessmentsPage
from pages.domains_page import DomainsPage
from pages.domain_assessment_page import DomainAssessmentPage
from pages.domain_feedback_page import DomainFeedbackPage
from utils.question_answer_helper import QuestionAnswerHelper
from utils.smart_wait_optimizer import SmartWaitOptimizer
from utils.randomized_wait import RandomizedWait
from config.config import TEST_NEW_PASSWORD, BASE_URL
# Global metrics tracking
progress_lock = threading.Lock()
performance_metrics = {
'total_durations': [],
'step_times': {
'login': [],
'password_reset': [],
'profile_completion': [],
'assessment': []
},
'step_success_rates': {
'login': {'success': 0, 'failed': 0},
'password_reset': {'success': 0, 'failed': 0},
'profile_completion': {'success': 0, 'failed': 0},
'assessment': {'success': 0, 'failed': 0}
},
'questions_answered': [],
'completed_students': 0,
'failed_students': 0,
'start_time': None
}
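# Every read/write of performance_metrics must hold progress_lock, e.g.:
#     with progress_lock:
#         performance_metrics['completed_students'] += 1
#         performance_metrics['total_durations'].append(duration)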
def load_students_from_csv(csv_path: str, start_index: int = 0, end_index: Optional[int] = None) -> List[Dict]:
"""
Load students from CSV file with range support for multi-device execution
Args:
csv_path: Path to CSV file
start_index: Starting index (0-based, excluding header)
end_index: Ending index (exclusive, None = all remaining)
Returns:
List of student dictionaries with 'cpid' and 'data' keys
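    Example (illustrative CSV using the recognized headers):
        # students.csv:
        #     Student CPID,Password
        #     CP1001,Secret@1
        students = load_students_from_csv('students.csv', 0, 25)
        # -> [{'cpid': 'CP1001',
        #      'data': {'Student CPID': 'CP1001', 'Password': 'Secret@1',
        #               'password': 'Secret@1'}}]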
"""
students = []
try:
with open(csv_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
all_rows = list(reader)
# Apply range filter
if end_index is None:
end_index = len(all_rows)
# Validate range
if start_index < 0:
start_index = 0
if end_index > len(all_rows):
end_index = len(all_rows)
if start_index >= end_index:
return []
selected_rows = all_rows[start_index:end_index]
for row in selected_rows:
cpid = (
row.get('Student CPID') or
row.get('student_cpid') or
row.get('Student_CPID') or
row.get('cpid') or
row.get('CPID') or
None
)
if not cpid:
continue
password = (
row.get('Password') or
row.get('password') or
row.get('PASSWORD') or
None
)
students.append({
'cpid': cpid.strip(),
'data': {
**row,
'password': password.strip() if password else None
}
})
print(f"📋 Loaded {len(students)} students from CSV (indices {start_index} to {end_index-1})")
return students
except Exception as e:
print(f"❌ Error loading CSV: {e}")
import traceback
traceback.print_exc()
return []
def print_real_time_metrics():
"""Print real-time performance metrics"""
with progress_lock:
if not performance_metrics['start_time']:
return
elapsed = time.time() - performance_metrics['start_time']
completed = performance_metrics['completed_students']
failed = performance_metrics['failed_students']
total = completed + failed
if total == 0:
return
success_rate = (completed / total * 100) if total > 0 else 0
rate = completed / elapsed if elapsed > 0 else 0
avg_duration = (
sum(performance_metrics['total_durations']) / len(performance_metrics['total_durations'])
if performance_metrics['total_durations'] else 0
)
total_questions = sum(performance_metrics['questions_answered'])
avg_questions = (
total_questions / completed if completed > 0 else 0
)
print("\n" + "=" * 80)
print("📊 REAL-TIME METRICS")
print("=" * 80)
print(f"⏱️ Elapsed Time: {elapsed:.1f}s")
print(f"✅ Completed: {completed}")
print(f"❌ Failed: {failed}")
print(f"📈 Success Rate: {success_rate:.1f}%")
print(f"⚡ Rate: {rate:.2f} students/sec")
print(f"⏳ Avg Duration: {avg_duration:.1f}s")
print(f"❓ Avg Questions: {avg_questions:.1f}")
print(f"📊 Total Questions: {total_questions}")
# Step-wise metrics
print("\n📋 STEP METRICS:")
for step_name in ['login', 'password_reset', 'profile_completion', 'assessment']:
step_times = performance_metrics['step_times'][step_name]
step_success = performance_metrics['step_success_rates'][step_name]
step_total = step_success['success'] + step_success['failed']
if step_total > 0:
avg_step_time = sum(step_times) / len(step_times) if step_times else 0
step_rate = step_success['success'] / step_total * 100
print(f" {step_name:20s}: {step_rate:5.1f}% success, {avg_step_time:5.1f}s avg")
print("=" * 80 + "\n")
def complete_assessment_flow_for_student(
user_id: int,
student_info: Dict,
student_index: int,
headless: bool = True
) -> dict:
"""
Complete assessment flow for a single student (100% verified flow)
This is the EXACT flow we've verified works 100%:
1. Smart login (Excel password → Admin@123)
2. Password reset if needed
3. Profile completion if needed
4. Navigate to assessments
5. Start first assessment
6. Navigate to first domain
7. Answer ALL questions in domain
8. Submit assessment
9. Handle feedback
Args:
user_id: User ID (from LoadTestBase, for tracking)
student_info: Student dictionary with 'cpid' and 'data' keys
student_index: Index of student (for tracking)
headless: Whether to run in headless mode
Returns:
dict: Result with driver and steps completed
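    On success, the returned dict looks like (values illustrative):
        {'driver': <WebDriver>, 'steps_completed': [...], 'success': True,
         'questions_answered': 12, 'cpid': 'CP1001', 'duration': 184.2,
         'user_data_dir': '/tmp/chrome_user_data_1_abc123'}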
"""
# Input validation - CRITICAL for flawless execution
if not isinstance(user_id, int) or user_id <= 0:
raise ValueError(f"Invalid user_id: {user_id} (must be positive integer)")
if not isinstance(student_info, dict):
raise ValueError(f"Invalid student_info: {student_info} (must be dict)")
if 'cpid' not in student_info:
raise ValueError(f"Missing 'cpid' in student_info: {student_info}")
if 'data' not in student_info:
raise ValueError(f"Missing 'data' in student_info: {student_info}")
driver = None
user_data_dir = None # Track temp directory for cleanup
steps_completed = []
cpid = student_info['cpid']
student_data = student_info['data']
actual_password_used = None
questions_answered = 0
start_time = time.time()
try:
# Step 1: Setup WebDriver
options = Options()
if headless:
options.add_argument('--headless=new')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--disable-gpu')
options.add_argument('--window-size=1920,1080')
options.add_argument('--disable-software-rasterizer')
options.add_argument('--disable-extensions')
        # CRITICAL: each browser needs a unique user data directory to avoid profile conflicts.
        # (tempfile/os/shutil are imported at module level so the cleanup code in the
        # except block below can use them even if setup fails before this point.)
        user_data_dir = tempfile.mkdtemp(prefix=f'chrome_user_data_{user_id}_')
options.add_argument(f'--user-data-dir={user_data_dir}')
for attempt in range(3):
try:
driver = webdriver.Chrome(options=options)
driver.implicitly_wait(5)
break
except WebDriverException as e:
if attempt < 2:
time.sleep(2)
continue
raise
        steps_completed.append("WebDriver created")
# Step 2: Smart Login (Excel password → Admin@123)
login_page = LoginPage(driver)
excel_password = student_data.get('password')
# Try Excel password first
login_success = False
if excel_password:
try:
login_page.login(identifier=cpid, password=excel_password)
# Verify login success (check for error)
time.sleep(1)
if not login_page.is_error_visible():
login_success = True
actual_password_used = excel_password
            except Exception:
pass
# Fallback to Admin@123
        if not login_success:
            try:
                login_page.login(identifier=cpid, password=TEST_NEW_PASSWORD)
                # Verify the fallback login too, mirroring the Excel-password path
                time.sleep(1)
                if login_page.is_error_visible():
                    raise Exception("Login error still visible after Admin@123 fallback")
                actual_password_used = TEST_NEW_PASSWORD
                login_success = True
            except Exception as e:
                raise Exception(f"Login failed with both passwords: {e}")
steps_completed.append(f"Login successful (password: {'Excel' if actual_password_used != TEST_NEW_PASSWORD else 'Admin@123'})")
# Step 3: Password Reset if needed
# CRITICAL: If Admin@123 was used for login, password is already reset - skip entirely
if actual_password_used == TEST_NEW_PASSWORD:
steps_completed.append("Password reset skipped (already reset - Admin@123 used)")
else:
# Only check for password reset if Excel password was used
reset_page = MandatoryResetPage(driver)
if SmartWaitOptimizer.should_check_password_reset(cpid, actual_password_used):
# Quick check for modal (fast timeout to avoid waiting)
if reset_page.is_modal_present():
reset_page.reset_password(
current_password=actual_password_used,
new_password=TEST_NEW_PASSWORD,
confirm_password=TEST_NEW_PASSWORD,
student_cpid=cpid
)
time.sleep(SmartWaitOptimizer.ANIMATION_NORMAL + SmartWaitOptimizer.SAFETY_PADDING)
actual_password_used = TEST_NEW_PASSWORD
steps_completed.append("Password reset completed")
else:
steps_completed.append("Password reset skipped (modal not present)")
else:
steps_completed.append("Password reset skipped (already reset per tracker)")
# Step 4: Profile Completion if needed
profile_incomplete = ProfileIncompletePage(driver)
if SmartWaitOptimizer.should_check_profile_incomplete(driver):
if profile_incomplete.is_modal_present():
profile_incomplete.click_complete()
time.sleep(SmartWaitOptimizer.ANIMATION_NORMAL + SmartWaitOptimizer.SAFETY_PADDING)
profile_editor = ProfileEditorPage(driver)
profile_editor.wait_for_page_load()
profile_editor.complete_profile_to_100(student_cpid=cpid)
time.sleep(SmartWaitOptimizer.ANIMATION_NORMAL + SmartWaitOptimizer.SAFETY_PADDING)
steps_completed.append("Profile completed to 100%")
else:
steps_completed.append("Profile completion skipped (not required)")
else:
steps_completed.append("Profile completion skipped (already complete)")
# Step 5: Navigate to Assessments
assessments_page = AssessmentsPage(driver)
assessments_page.navigate()
assessments_page.wait_for_page_load()
RandomizedWait.wait_for_page_load('navigation')
steps_completed.append("Navigated to Assessments page")
# Step 6: Get first assessment and start it
assessment_ids = assessments_page.get_assessment_ids()
if not assessment_ids:
raise Exception("No assessments available")
assessments_page.click_begin_assessment(assessment_ids[0])
RandomizedWait.wait_for_page_load('navigation')
steps_completed.append(f"Started assessment: {assessment_ids[0]}")
# Step 7: Navigate to Domains
domains_page = DomainsPage(driver)
domains_page.wait_for_page_load()
RandomizedWait.wait_for_page_load('initial')
steps_completed.append("Navigated to Domains page")
# Step 8: Get first unlocked domain
domain_ids = domains_page.get_all_domain_ids()
if not domain_ids:
raise Exception("No domains available")
# Find first unlocked domain
unlocked_domain_id = None
for domain_id in domain_ids:
if domains_page.is_domain_unlocked(domain_id):
unlocked_domain_id = domain_id
break
if not unlocked_domain_id:
raise Exception("No unlocked domains available")
# Click first unlocked domain
domains_page.click_domain_action(unlocked_domain_id)
RandomizedWait.wait_for_page_load('navigation')
steps_completed.append(f"Started domain: {unlocked_domain_id}")
# Step 9: Handle instructions modal if present
domain_assessment_page = DomainAssessmentPage(driver)
domain_assessment_page.wait_for_page_load()
if domain_assessment_page.is_instructions_modal_present():
domain_assessment_page.dismiss_instructions_modal()
RandomizedWait.wait_for_navigation('next')
steps_completed.append("Dismissed instructions modal")
# Step 10: Answer ALL questions in domain (100% verified logic)
question_helper = QuestionAnswerHelper(driver)
max_questions = 100 # Safety limit
consecutive_failures = 0
max_consecutive_failures = 3
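        # Loop outline (mirrors the verified flow below):
        #   1. Read the current question ID; on a miss, retry once, then try Next.
        #   2. Resolve the question type, scrolling the question into view if needed.
        #   3. Answer it, then Submit if enabled (last question), otherwise Next.
        # The loop exits when Submit becomes enabled, or after
        # max_consecutive_failures unrecoverable misses in a row.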
while questions_answered < max_questions:
            # No extra wait needed here: the page is already loaded from the previous Next click
# Get current question ID
question_id = question_helper.get_question_id()
if not question_id:
RandomizedWait.wait_for_error_recovery('wait')
question_id = question_helper.get_question_id()
if not question_id:
consecutive_failures += 1
if consecutive_failures >= max_consecutive_failures:
break
if domain_assessment_page.is_next_button_visible():
try:
domain_assessment_page.click_next()
# Removed redundant wait_for_navigation - click_next() already waits
                        except Exception:
pass
continue
# Get question type
question_type = question_helper.get_question_type(question_id)
if question_type == "unknown":
# Try scrolling to question
try:
question_elem = driver.find_element(
By.CSS_SELECTOR,
f"[data-testid='domain_question__{question_id}']"
)
driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", question_elem)
# Small wait for scroll animation, but not full page load
RandomizedWait.wait_for_page_load('modal') # Use shorter modal wait instead
question_type = question_helper.get_question_type(question_id)
                except Exception:
pass
if question_type == "unknown":
if domain_assessment_page.is_next_button_visible():
domain_assessment_page.click_next()
# Removed redundant wait_for_navigation - click_next() already waits
continue
# Answer the question
try:
question_helper.answer_question(question_id, question_type)
questions_answered += 1
consecutive_failures = 0
# Machine-speed: Minimal wait for click to register (0.1s instead of 2-6s)
time.sleep(0.1)
except Exception as e:
consecutive_failures += 1
if consecutive_failures >= max_consecutive_failures:
break
if domain_assessment_page.is_next_button_visible():
try:
domain_assessment_page.click_next()
# Removed redundant wait_for_navigation - click_next() already waits
                    except Exception:
pass
continue
# After answering, check if this is the last question (submit enabled)
# If submit is enabled, break and submit. Otherwise, click Next.
is_last_question = False
try:
submit_button = driver.find_element(*domain_assessment_page.SUBMIT_BUTTON)
if submit_button.is_enabled() and submit_button.is_displayed():
is_last_question = True
steps_completed.append(f"All questions answered ({questions_answered} questions)")
            except Exception:
pass
if is_last_question:
# Last question - break loop to submit
break
else:
# Not last question - click Next to continue
if domain_assessment_page.is_next_button_visible():
try:
domain_assessment_page.click_next()
# Removed redundant wait_for_navigation - click_next() already waits
except Exception as e:
print(f"⚠️ Error clicking Next after question {question_id}: {e}")
# Try to continue anyway
consecutive_failures += 1
if consecutive_failures >= max_consecutive_failures:
break
else:
# Next button not visible - might be last question or error
# Check submit button one more time
try:
submit_button = driver.find_element(*domain_assessment_page.SUBMIT_BUTTON)
if submit_button.is_enabled() and submit_button.is_displayed():
is_last_question = True
break
                    except Exception:
pass
# If still not last question, this is an error
consecutive_failures += 1
if consecutive_failures >= max_consecutive_failures:
break
# Step 11: Submit assessment (only if submit button is enabled - last question)
if domain_assessment_page.is_submit_button_visible():
domain_assessment_page.click_submit()
RandomizedWait.wait_for_submission('submit')
steps_completed.append("Clicked Submit button")
# Step 12: Handle submit confirmation modal
if domain_assessment_page.is_submit_modal_present():
domain_assessment_page.confirm_submit()
RandomizedWait.wait_for_submission('confirm')
steps_completed.append("Confirmed submission in modal")
# Step 13: Wait for success modal (appears after confirmation)
# Success modal auto-closes after 2 seconds, then feedback modal appears
try:
if domain_assessment_page.is_success_modal_present():
steps_completed.append("Success modal appeared")
                # Success modal auto-closes after ~2s; sleep past that, then make sure it is closed
                time.sleep(3)
                domain_assessment_page.close_success_modal()
        except Exception:
pass
# Step 14: Handle feedback modal (appears after success modal closes)
try:
feedback_page = DomainFeedbackPage(driver)
# Wait for feedback modal to appear (with retry)
feedback_modal_present = False
            for _ in range(10):  # Poll up to 10 times for the feedback modal to appear
if feedback_page.is_modal_present():
feedback_modal_present = True
break
RandomizedWait.wait_for_page_load('modal')
if feedback_modal_present:
feedback_page.submit_feedback(
question1_yes=True,
question1_justification='Automated load test response',
question2_text='This is an automated load test response for backend analysis.'
)
RandomizedWait.wait_for_submission('feedback')
steps_completed.append("Submitted domain feedback")
        except Exception as e:
            print(f"⚠️ Error handling feedback: {e}")
duration = time.time() - start_time
# Update global metrics
with progress_lock:
performance_metrics['completed_students'] += 1
performance_metrics['total_durations'].append(duration)
performance_metrics['questions_answered'].append(questions_answered)
# Note: Driver cleanup is handled by LoadTestBase
# Temp directory cleanup will be done after driver.quit() in LoadTestBase
# Store user_data_dir in result for cleanup
return {
'driver': driver,
'steps_completed': steps_completed,
'success': True,
'questions_answered': questions_answered,
'cpid': cpid,
'duration': duration,
'user_data_dir': user_data_dir # For cleanup
}
except Exception as e:
error_msg = f"Student {cpid} (User {user_id}): ERROR - {type(e).__name__}: {str(e)}"
steps_completed.append(error_msg)
with progress_lock:
performance_metrics['failed_students'] += 1
# Always cleanup driver and temp directory on error
if driver:
try:
driver.quit()
            except Exception:
pass
# Cleanup temporary user data directory
if user_data_dir and os.path.exists(user_data_dir):
try:
shutil.rmtree(user_data_dir, ignore_errors=True)
            except Exception:
pass
        # Re-raise with more context for LoadTestBase, preserving the original traceback
        raise Exception(error_msg) from e
class AssessmentLoadTest(LoadTestBase):
"""World-class load test executor for complete assessment flow"""
def __init__(self, test_name: str = "Complete Assessment Flow"):
super().__init__(test_name)
self.lock = threading.Lock()
def run_load_test(
self,
students: List[Dict],
        max_workers: Optional[int] = None,
headless: bool = True,
metrics_interval: int = 10
) -> dict:
"""
Run load test with comprehensive tracking and real-time monitoring
Args:
students: List of student dictionaries
max_workers: Maximum concurrent workers
headless: Whether to run in headless mode
metrics_interval: Print metrics every N students
Returns:
dict: Summary and results
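        Example (minimal sketch; `students` comes from load_students_from_csv):
            load_test = AssessmentLoadTest("Complete Assessment Flow")
            outcome = load_test.run_load_test(students, max_workers=5)
            print(outcome['summary'])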
"""
num_students = len(students)
print(f"\n{'='*80}")
print(f"🚀 STARTING LOAD TEST: {self.test_name}")
print(f"{'='*80}")
print(f"📊 Configuration:")
print(f" Students: {num_students}")
print(f" Max Workers: {max_workers or 'Unlimited'}")
print(f" Headless: {headless}")
print(f" Metrics Interval: Every {metrics_interval} students")
print(f"{'='*80}\n")
# Initialize global metrics
with progress_lock:
performance_metrics['start_time'] = time.time()
performance_metrics['completed_students'] = 0
performance_metrics['failed_students'] = 0
performance_metrics['total_durations'] = []
performance_metrics['questions_answered'] = []
self.start_time = datetime.now()
self.results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
# Submit all students with proper validation
for idx, student_info in enumerate(students):
# Validate student_info before submitting
if not isinstance(student_info, dict):
print(f" ⚠️ Skipping invalid student at index {idx}: not a dict")
continue
if 'cpid' not in student_info or 'data' not in student_info:
print(f" ⚠️ Skipping invalid student at index {idx}: missing cpid or data")
continue
user_id = idx + 1 # 1-based user ID
# Submit with explicit arguments to avoid any confusion
future = executor.submit(
self.execute_test_for_user,
user_id,
complete_assessment_flow_for_student,
student_info, # *args[0]
idx, # *args[1]
headless=headless # **kwargs
)
futures.append((user_id, future))
# Wait for completion with real-time monitoring
print(f" ⏳ Waiting for all {num_students} students to complete...\n")
completed = 0
for user_id, future in futures:
try:
result = future.result()
with self.lock:
self.results.append(result)
completed += 1
# Duration is already tracked in the function
# Print real-time metrics periodically
if completed % metrics_interval == 0:
print_real_time_metrics()
if completed % 10 == 0:
print(f" ✅ Completed {completed}/{num_students} students...")
except Exception as e:
print(f" ❌ Student {user_id} failed: {str(e)[:100]}")
completed += 1
self.end_time = datetime.now()
# Final metrics
print_real_time_metrics()
# Calculate summary
summary = self.calculate_summary()
# Print summary
self.print_summary(summary)
# Save results
self.save_results(summary)
return {
'summary': summary,
'results': self.results
}
# Standalone execution
def run_assessment_load_test(
csv_path: str,
start_index: int = 0,
end_index: Optional[int] = None,
    max_workers: Optional[int] = None,
headless: bool = True,
metrics_interval: int = 10
):
"""
Standalone function to run assessment load test
Args:
csv_path: Path to CSV file
start_index: Starting index (0-based, excluding header)
end_index: Ending index (exclusive, None = all remaining)
max_workers: Maximum concurrent workers
headless: Whether to run in headless mode
metrics_interval: Print metrics every N students
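    Example (CSV path is illustrative):
        run_assessment_load_test('data/students.csv', start_index=0,
                                 end_index=10, max_workers=5)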
"""
# Load students with range
students = load_students_from_csv(csv_path, start_index, end_index)
if not students:
print("❌ No students loaded. Check CSV path and range.")
return None
load_test = AssessmentLoadTest("Complete Assessment Flow")
return load_test.run_load_test(
students=students,
max_workers=max_workers or len(students),
headless=headless,
metrics_interval=metrics_interval
)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="World-Class Assessment Load Test")
parser.add_argument('--csv', type=str, required=True, help='Path to CSV file')
parser.add_argument('--start', type=int, default=0, help='Start index (0-based, excluding header)')
parser.add_argument('--end', type=int, default=None, help='End index (exclusive, None = all remaining)')
parser.add_argument('--workers', type=int, default=None, help='Max concurrent workers (default: all students)')
    parser.add_argument('--headless', action='store_true', default=True, help='Run in headless mode (default; use --visible to disable)')
parser.add_argument('--visible', action='store_true', help='Run in visible mode (overrides headless)')
parser.add_argument('--metrics-interval', type=int, default=10, help='Print metrics every N students')
parser.add_argument('--url', type=str, default=None, help='Frontend URL to use (e.g., http://localhost:3983 or https://cognitiveprism.tech4bizsolutions.com). If not provided, uses default from config.')
args = parser.parse_args()
# Override BASE_URL if --url is provided
if args.url:
# Remove trailing slash if present
custom_url = args.url.rstrip('/')
        # Patch the config module so code that reads config.config.BASE_URL at runtime sees the override
import config.config as config_module
config_module.BASE_URL = custom_url
# Update all derived URLs
config_module.LOGIN_URL = f"{custom_url}/"
config_module.DASHBOARD_URL = f"{custom_url}/student/dashboard"
config_module.ASSESSMENTS_URL = f"{custom_url}/assessments"
config_module.PROFILE_EDITOR_URL = f"{custom_url}/student/profile-builder"
print(f"🌐 Using custom URL: {custom_url}")
    else:
        print(f"🌐 Using default URL: {BASE_URL}")
run_assessment_load_test(
csv_path=args.csv,
start_index=args.start,
end_index=args.end,
max_workers=args.workers,
        headless=not args.visible,
metrics_interval=args.metrics_interval
)