CP_AUTOMATION/utils/load_test_runner.py
2025-12-12 19:54:54 +05:30

238 lines
7.8 KiB
Python

"""
Load Testing Runner for Cognitive Prism Platform
This module provides functionality to run load tests with multiple concurrent users
simulating students performing tests simultaneously.
"""
import json
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Callable, Dict, List, Optional
@dataclass
class LoadTestResult:
    """Result of a single test execution for one simulated user.

    Attributes:
        user_id: Identifier of the simulated user this result belongs to.
        success: True when the test function finished without raising.
        duration: Time the test function took, in seconds.
        error: Description of the failure, or None when the run succeeded.
        start_time: Wall-clock time at which the execution started, if recorded.
        end_time: Wall-clock time at which the execution finished, if recorded.
    """

    user_id: int
    success: bool
    duration: float
    # The remaining fields default to None, so they are declared Optional
    # rather than relying on an implicit-Optional annotation.
    error: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
@dataclass
class LoadTestSummary:
    """Aggregate outcome of one load test run.

    Attributes:
        total_users: Number of simulated users the run was asked to execute.
        successful: Count of per-user results that succeeded.
        failed: Count of per-user results that did not succeed.
        total_duration: Elapsed time of the whole run, in seconds.
        avg_duration: Mean per-user duration in seconds (0 when there is none).
        min_duration: Shortest per-user duration in seconds (0 when there is none).
        max_duration: Longest per-user duration in seconds (0 when there is none).
        success_rate: Successful users as a percentage of ``total_users``.
        start_time: Wall-clock time at which the run started.
        end_time: Wall-clock time at which the run finished.
        results: The individual per-user results behind the aggregates.
    """

    # --- counts ---
    total_users: int
    successful: int
    failed: int

    # --- timing statistics (seconds) ---
    total_duration: float
    avg_duration: float
    min_duration: float
    max_duration: float

    # --- derived rate and run boundaries ---
    success_rate: float
    start_time: datetime
    end_time: datetime

    # --- raw data ---
    results: List[LoadTestResult]
class LoadTestRunner:
    """Runner for executing load tests with multiple concurrent users.

    ``run_load_test`` submits ``test_function(user_id)`` to a thread pool once
    for every ``user_id`` in ``1..num_users``, collects one ``LoadTestResult``
    per user and returns a ``LoadTestSummary`` with aggregate statistics.
    ``print_summary`` and ``save_results`` report that summary to stdout and
    to a JSON file respectively.
    """

    def __init__(self, test_function: Callable, max_workers: Optional[int] = None):
        """
        Initialize Load Test Runner

        Args:
            test_function: Function to execute for each user (should accept
                user_id parameter). A raised exception marks that user as
                failed; the return value is ignored.
            max_workers: Maximum number of concurrent workers. ``None``
                (default) means one worker per simulated user, so that all
                users of a run can execute at the same time.
        """
        self.test_function = test_function
        self.max_workers = max_workers
        self.results: List[LoadTestResult] = []
        # Guards self.results, which worker threads append to concurrently.
        self.lock = threading.Lock()

    def run_single_user(self, user_id: int) -> LoadTestResult:
        """
        Run test for a single user

        The result is also appended to ``self.results`` (under the lock).

        Args:
            user_id: Unique identifier for the user

        Returns:
            LoadTestResult: Result of the test execution
        """
        # Wall-clock timestamps are kept for reporting; the duration itself is
        # taken from perf_counter so it does not depend on wall-clock
        # resolution or on the system clock being adjusted mid-run.
        start_time = datetime.now()
        started = time.perf_counter()
        error = None
        try:
            self.test_function(user_id)
        except Exception as exc:
            success = False
            # Some exceptions (bare ``assert``, ``TimeoutError()``) have an
            # empty message; fall back to the class name so a failure never
            # ends up with an empty error description.
            error = str(exc) or type(exc).__name__
        else:
            success = True
        duration = time.perf_counter() - started
        end_time = datetime.now()

        result = LoadTestResult(
            user_id=user_id,
            success=success,
            duration=duration,
            error=error,
            start_time=start_time,
            end_time=end_time,
        )
        with self.lock:
            self.results.append(result)
        return result

    def run_load_test(self, num_users: int, ramp_up_time: float = 0) -> LoadTestSummary:
        """
        Run load test with specified number of concurrent users

        Any results from a previous run are discarded first.

        Args:
            num_users: Number of concurrent users to simulate
            ramp_up_time: Time in seconds to gradually ramp up users
                (0 = all at once). The time is split evenly and one slice is
                waited before each user is started.

        Returns:
            LoadTestSummary: Summary of the load test execution
        """
        print(f"\n{'='*60}")
        print(f"Starting Load Test: {num_users} concurrent users")
        print(f"{'='*60}\n")

        start_time = datetime.now()
        self.results = []

        # Delay between user starts for ramp-up; guarded so that an empty run
        # (num_users == 0) cannot divide by zero.
        if ramp_up_time > 0 and num_users > 0:
            delay_per_user = ramp_up_time / num_users
        else:
            delay_per_user = 0

        # ThreadPoolExecutor(max_workers=None) caps the pool at a CPU-based
        # default, which would silently queue users instead of running them
        # concurrently. Size the pool to the number of users unless the caller
        # asked for an explicit limit (the pool needs at least one worker).
        workers = self.max_workers
        if workers is None:
            workers = max(1, num_users)

        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = []
            for user_id in range(1, num_users + 1):
                # Ramp up delay
                if delay_per_user > 0:
                    time.sleep(delay_per_user)
                # Submit test execution
                futures.append(executor.submit(self.run_single_user, user_id))
                if user_id % 10 == 0:
                    print(f"Started {user_id}/{num_users} users...")

            # Wait for all to complete
            print(f"\nWaiting for all {num_users} users to complete...\n")
            for future in as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    print(f"Error in future: {e}")

        end_time = datetime.now()
        total_duration = (end_time - start_time).total_seconds()

        # Snapshot the results so the summary does not share the runner's
        # live list (later run_single_user calls keep appending to it).
        with self.lock:
            results = list(self.results)

        successful = sum(1 for r in results if r.success)
        failed = len(results) - successful
        durations = [r.duration for r in results]

        return LoadTestSummary(
            total_users=num_users,
            successful=successful,
            failed=failed,
            total_duration=total_duration,
            avg_duration=sum(durations) / len(durations) if durations else 0,
            min_duration=min(durations) if durations else 0,
            max_duration=max(durations) if durations else 0,
            success_rate=(successful / num_users * 100) if num_users > 0 else 0,
            start_time=start_time,
            end_time=end_time,
            results=results,
        )

    def print_summary(self, summary: LoadTestSummary) -> None:
        """Print load test summary to stdout.

        Args:
            summary: LoadTestSummary to print
        """
        print(f"\n{'='*60}")
        print("LOAD TEST SUMMARY")
        print(f"{'='*60}")
        print(f"Total Users: {summary.total_users}")
        print(f"Successful: {summary.successful}")
        print(f"Failed: {summary.failed}")
        print(f"Success Rate: {summary.success_rate:.2f}%")
        print(f"Total Duration: {summary.total_duration:.2f} seconds")
        print(f"Average Duration: {summary.avg_duration:.2f} seconds")
        print(f"Min Duration: {summary.min_duration:.2f} seconds")
        print(f"Max Duration: {summary.max_duration:.2f} seconds")
        print(f"Start Time: {summary.start_time}")
        print(f"End Time: {summary.end_time}")
        print(f"{'='*60}\n")

    def save_results(self, summary: LoadTestSummary, output_dir: Optional[Path] = None) -> Path:
        """
        Save load test results to JSON file

        The file is named ``load_test_<total_users>users_<timestamp>.json``
        and the output directory is created if it does not exist.

        Args:
            summary: LoadTestSummary to save
            output_dir: Directory to save results (default: reports/load_tests).
                A plain string path is accepted as well.

        Returns:
            Path: Location of the written JSON file
        """
        if output_dir is None:
            output_dir = Path("reports/load_tests")
        else:
            # Accept str / os.PathLike callers as well as Path objects.
            output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = output_dir / f"load_test_{summary.total_users}users_{timestamp}.json"

        # Convert to dict for JSON serialization (datetimes as ISO 8601 text)
        data = {
            "summary": {
                "total_users": summary.total_users,
                "successful": summary.successful,
                "failed": summary.failed,
                "total_duration": summary.total_duration,
                "avg_duration": summary.avg_duration,
                "min_duration": summary.min_duration,
                "max_duration": summary.max_duration,
                "success_rate": summary.success_rate,
                "start_time": summary.start_time.isoformat(),
                "end_time": summary.end_time.isoformat(),
            },
            "results": [
                {
                    "user_id": r.user_id,
                    "success": r.success,
                    "duration": r.duration,
                    "error": r.error,
                    "start_time": r.start_time.isoformat() if r.start_time else None,
                    "end_time": r.end_time.isoformat() if r.end_time else None,
                }
                for r in summary.results
            ],
        }

        with open(filename, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)

        print(f"Results saved to: {filename}")
        return filename