"""
Load Testing Runner for Cognitive Prism Platform

This module provides functionality to run load tests with multiple concurrent
users simulating students performing tests simultaneously.
"""

import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Callable, Optional
from dataclasses import dataclass
from datetime import datetime
import json
from pathlib import Path


@dataclass
class LoadTestResult:
    """Result of a single test execution"""

    # Identifier passed to the test function for this simulated user.
    user_id: int
    # True when the test function returned without raising.
    success: bool
    # Elapsed seconds spent inside the test function.
    duration: float
    # Exception message (or exception class name when the message is empty);
    # None on success.
    error: Optional[str] = None
    # Wall-clock timestamps taken around the test function call.
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None


@dataclass
class LoadTestSummary:
    """Summary of load test execution"""

    total_users: int
    successful: int
    failed: int
    # Wall-clock seconds for the whole run, including any ramp-up sleeps.
    total_duration: float
    # Aggregates over the per-user durations (0 when there are no results).
    avg_duration: float
    min_duration: float
    max_duration: float
    # Percentage (0-100) of the requested users that succeeded.
    success_rate: float
    start_time: datetime
    end_time: datetime
    results: List[LoadTestResult]


class LoadTestRunner:
    """Runner for executing load tests with multiple concurrent users"""

    def __init__(self, test_function: Callable, max_workers: Optional[int] = None):
        """
        Initialize Load Test Runner

        Args:
            test_function: Function to execute for each user
                (should accept user_id parameter)
            max_workers: Maximum number of concurrent workers
                (default: None = one worker per simulated user)
        """
        self.test_function = test_function
        self.max_workers = max_workers
        self.results: List[LoadTestResult] = []
        # Guards self.results, which worker threads append to concurrently.
        self.lock = threading.Lock()

    def run_single_user(self, user_id: int) -> LoadTestResult:
        """
        Run test for a single user

        Any ``Exception`` raised by the test function is caught and recorded
        as a failed result; it is not propagated.

        Args:
            user_id: Unique identifier for the user

        Returns:
            LoadTestResult: Result of the test execution (also appended to
            ``self.results``)
        """
        start_time = datetime.now()
        # Durations come from a monotonic high-resolution clock so they are
        # immune to wall-clock adjustments and coarse datetime resolution.
        started = time.perf_counter()
        success = False
        error = None

        try:
            # Execute the test function
            self.test_function(user_id)
            success = True
        except Exception as e:
            # A bare ``assert`` (and similar) yields an empty message; fall
            # back to the exception class name so the failure stays visible.
            error = str(e) or type(e).__name__

        duration = time.perf_counter() - started
        end_time = datetime.now()

        result = LoadTestResult(
            user_id=user_id,
            success=success,
            duration=duration,
            error=error,
            start_time=start_time,
            end_time=end_time,
        )

        with self.lock:
            self.results.append(result)

        return result

    def run_load_test(self, num_users: int, ramp_up_time: int = 0) -> LoadTestSummary:
        """
        Run load test with specified number of concurrent users

        Args:
            num_users: Number of concurrent users to simulate
            ramp_up_time: Time in seconds to gradually ramp up users
                (0 = all at once)

        Returns:
            LoadTestSummary: Summary of the load test execution
        """
        print(f"\n{'='*60}")
        print(f"Starting Load Test: {num_users} concurrent users")
        print(f"{'='*60}\n")

        start_time = datetime.now()
        self.results = []

        # Calculate delay between user starts for ramp-up. Guard against
        # num_users == 0 so an empty run does not divide by zero.
        if ramp_up_time > 0 and num_users > 0:
            delay_per_user = ramp_up_time / num_users
        else:
            delay_per_user = 0

        # ThreadPoolExecutor(max_workers=None) caps the pool at
        # min(32, cpu_count + 4), which would silently queue users instead of
        # running them concurrently. Without an explicit cap, give every
        # simulated user its own worker (at least 1, as the executor requires).
        if self.max_workers is not None:
            workers = self.max_workers
        else:
            workers = max(1, num_users)

        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = []

            for user_id in range(1, num_users + 1):
                # Ramp up delay
                if delay_per_user > 0:
                    time.sleep(delay_per_user)

                # Submit test execution
                futures.append(executor.submit(self.run_single_user, user_id))

                if user_id % 10 == 0:
                    print(f"Started {user_id}/{num_users} users...")

            # Wait for all to complete
            print(f"\nWaiting for all {num_users} users to complete...\n")
            for future in as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    print(f"Error in future: {e}")

        end_time = datetime.now()
        total_duration = (end_time - start_time).total_seconds()

        # Snapshot the results so later run_single_user() calls on this runner
        # cannot mutate a summary that has already been returned.
        with self.lock:
            results = list(self.results)

        # Calculate summary. Every recorded duration counts, including a
        # (theoretically) zero-length one; dropping them would skew avg/min.
        successful = sum(1 for r in results if r.success)
        failed = len(results) - successful
        durations = [r.duration for r in results]

        return LoadTestSummary(
            total_users=num_users,
            successful=successful,
            failed=failed,
            total_duration=total_duration,
            avg_duration=sum(durations) / len(durations) if durations else 0,
            min_duration=min(durations) if durations else 0,
            max_duration=max(durations) if durations else 0,
            success_rate=(successful / num_users * 100) if num_users > 0 else 0,
            start_time=start_time,
            end_time=end_time,
            results=results,
        )

    def print_summary(self, summary: LoadTestSummary):
        """Print load test summary"""
        print(f"\n{'='*60}")
        print("LOAD TEST SUMMARY")
        print(f"{'='*60}")
        print(f"Total Users: {summary.total_users}")
        print(f"Successful: {summary.successful}")
        print(f"Failed: {summary.failed}")
        print(f"Success Rate: {summary.success_rate:.2f}%")
        print(f"Total Duration: {summary.total_duration:.2f} seconds")
        print(f"Average Duration: {summary.avg_duration:.2f} seconds")
        print(f"Min Duration: {summary.min_duration:.2f} seconds")
        print(f"Max Duration: {summary.max_duration:.2f} seconds")
        print(f"Start Time: {summary.start_time}")
        print(f"End Time: {summary.end_time}")
        print(f"{'='*60}\n")

    def save_results(self, summary: LoadTestSummary, output_dir: Optional[Path] = None):
        """
        Save load test results to JSON file

        Args:
            summary: LoadTestSummary to save
            output_dir: Directory to save results, as a Path or str
                (default: reports/load_tests). Created if missing.

        Returns:
            Path: Location of the written JSON file
        """
        if output_dir is None:
            output_dir = Path("reports/load_tests")
        else:
            # Accept plain strings as well as Path objects.
            output_dir = Path(output_dir)

        output_dir.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = output_dir / f"load_test_{summary.total_users}users_{timestamp}.json"

        # Convert to dict for JSON serialization
        data = {
            "summary": {
                "total_users": summary.total_users,
                "successful": summary.successful,
                "failed": summary.failed,
                "total_duration": summary.total_duration,
                "avg_duration": summary.avg_duration,
                "min_duration": summary.min_duration,
                "max_duration": summary.max_duration,
                "success_rate": summary.success_rate,
                "start_time": summary.start_time.isoformat(),
                "end_time": summary.end_time.isoformat(),
            },
            "results": [
                {
                    "user_id": r.user_id,
                    "success": r.success,
                    "duration": r.duration,
                    "error": r.error,
                    "start_time": r.start_time.isoformat() if r.start_time else None,
                    "end_time": r.end_time.isoformat() if r.end_time else None,
                }
                for r in summary.results
            ],
        }

        # Explicit encoding keeps the output independent of the platform locale.
        with open(filename, 'w', encoding="utf-8") as f:
            json.dump(data, f, indent=2)

        print(f"Results saved to: {filename}")
        return filename