238 lines
7.8 KiB
Python
238 lines
7.8 KiB
Python
"""
|
|
Load Testing Runner for Cognitive Prism Platform
|
|
|
|
This module provides functionality to run load tests with multiple concurrent users
|
|
simulating students performing tests simultaneously.
|
|
"""
|
|
import json
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Callable, Dict, List, Optional
|
|
|
|
|
|
@dataclass
class LoadTestResult:
    """Result of a single test execution for one simulated user."""
    user_id: int                           # unique identifier of the simulated user
    success: bool                          # True if the test function returned without raising
    duration: float                        # wall-clock seconds for this user's run
    error: Optional[str] = None            # str(exception) when the run failed, else None
    start_time: Optional[datetime] = None  # naive local time when the run began
    end_time: Optional[datetime] = None    # naive local time when the run finished
|
|
|
|
|
|
@dataclass
class LoadTestSummary:
    """Summary of load test execution"""
    total_users: int               # number of simulated users requested
    successful: int                # count of runs that completed without raising
    failed: int                    # count of runs that raised an exception
    total_duration: float          # wall-clock seconds for the whole load test
    avg_duration: float            # mean per-user duration (0 when no samples)
    min_duration: float            # shortest per-user duration (0 when no samples)
    max_duration: float            # longest per-user duration (0 when no samples)
    success_rate: float            # successful / total_users * 100, as a percentage
    start_time: datetime           # load test start (naive local time)
    end_time: datetime             # load test end (naive local time)
    results: List[LoadTestResult]  # individual per-user results
|
|
|
|
|
|
class LoadTestRunner:
    """Runner for executing load tests with multiple concurrent users.

    Each simulated user invokes ``test_function(user_id)`` on a worker thread;
    per-user results are collected thread-safely and aggregated into a
    :class:`LoadTestSummary`.
    """

    def __init__(self, test_function: Callable, max_workers: Optional[int] = None):
        """
        Initialize Load Test Runner

        Args:
            test_function: Function to execute for each user (should accept a
                user_id parameter). Any exception it raises marks that user's
                run as failed.
            max_workers: Maximum number of concurrent workers (default: None =
                let ThreadPoolExecutor choose its default).
        """
        self.test_function = test_function
        self.max_workers = max_workers
        self.results: List[LoadTestResult] = []
        # Guards self.results, which worker threads append to concurrently.
        self.lock = threading.Lock()

    def run_single_user(self, user_id: int) -> LoadTestResult:
        """
        Run test for a single user.

        Args:
            user_id: Unique identifier for the user.

        Returns:
            LoadTestResult: Result of the test execution. The result is also
            appended to ``self.results`` under the lock.
        """
        start_time = datetime.now()
        success = False
        error = None

        try:
            self.test_function(user_id)
            success = True
        except Exception as e:
            # Capture the failure; the message is surfaced in the result
            # rather than propagated, so one user's crash doesn't stop the test.
            error = str(e)

        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()

        result = LoadTestResult(
            user_id=user_id,
            success=success,
            duration=duration,
            error=error,
            start_time=start_time,
            end_time=end_time,
        )

        with self.lock:
            self.results.append(result)

        return result

    def run_load_test(self, num_users: int, ramp_up_time: int = 0) -> LoadTestSummary:
        """
        Run load test with specified number of concurrent users.

        Args:
            num_users: Number of concurrent users to simulate.
            ramp_up_time: Time in seconds to gradually ramp up users
                (0 = all at once).

        Returns:
            LoadTestSummary: Summary of the load test execution.
        """
        print(f"\n{'='*60}")
        print(f"Starting Load Test: {num_users} concurrent users")
        print(f"{'='*60}\n")

        start_time = datetime.now()
        self.results = []

        # Spread user start-up evenly across the ramp-up window.
        delay_per_user = ramp_up_time / num_users if ramp_up_time > 0 else 0

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []

            for user_id in range(1, num_users + 1):
                # Ramp-up delay happens on this (submitting) thread, so user N
                # is submitted roughly N * delay_per_user seconds in.
                if delay_per_user > 0:
                    time.sleep(delay_per_user)

                futures.append(executor.submit(self.run_single_user, user_id))

                if user_id % 10 == 0:
                    print(f"Started {user_id}/{num_users} users...")

            # Wait for all to complete. run_single_user never raises, but keep
            # the guard in case the executor itself surfaces an error.
            print(f"\nWaiting for all {num_users} users to complete...\n")
            for future in as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    print(f"Error in future: {e}")

        end_time = datetime.now()
        total_duration = (end_time - start_time).total_seconds()

        # Aggregate per-user results. Zero durations are excluded from the
        # min/avg/max statistics (they indicate sub-resolution runs).
        successful = sum(1 for r in self.results if r.success)
        failed = len(self.results) - successful
        durations = [r.duration for r in self.results if r.duration > 0]

        summary = LoadTestSummary(
            total_users=num_users,
            successful=successful,
            failed=failed,
            total_duration=total_duration,
            avg_duration=sum(durations) / len(durations) if durations else 0,
            min_duration=min(durations) if durations else 0,
            max_duration=max(durations) if durations else 0,
            success_rate=(successful / num_users * 100) if num_users > 0 else 0,
            start_time=start_time,
            end_time=end_time,
            results=self.results,
        )

        return summary

    def print_summary(self, summary: LoadTestSummary):
        """Print load test summary to stdout."""
        print(f"\n{'='*60}")
        print("LOAD TEST SUMMARY")
        print(f"{'='*60}")
        print(f"Total Users: {summary.total_users}")
        print(f"Successful: {summary.successful}")
        print(f"Failed: {summary.failed}")
        print(f"Success Rate: {summary.success_rate:.2f}%")
        print(f"Total Duration: {summary.total_duration:.2f} seconds")
        print(f"Average Duration: {summary.avg_duration:.2f} seconds")
        print(f"Min Duration: {summary.min_duration:.2f} seconds")
        print(f"Max Duration: {summary.max_duration:.2f} seconds")
        print(f"Start Time: {summary.start_time}")
        print(f"End Time: {summary.end_time}")
        print(f"{'='*60}\n")

    def save_results(self, summary: LoadTestSummary, output_dir: Optional[Path] = None):
        """
        Save load test results to a timestamped JSON file.

        Args:
            summary: LoadTestSummary to save.
            output_dir: Directory to save results (default: reports/load_tests).

        Returns:
            Path: The path of the file that was written.
        """
        if output_dir is None:
            output_dir = Path("reports/load_tests")

        output_dir.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = output_dir / f"load_test_{summary.total_users}users_{timestamp}.json"

        # Convert to plain dicts for JSON serialization; datetimes become
        # ISO-8601 strings, missing timestamps become null.
        data = {
            "summary": {
                "total_users": summary.total_users,
                "successful": summary.successful,
                "failed": summary.failed,
                "total_duration": summary.total_duration,
                "avg_duration": summary.avg_duration,
                "min_duration": summary.min_duration,
                "max_duration": summary.max_duration,
                "success_rate": summary.success_rate,
                "start_time": summary.start_time.isoformat(),
                "end_time": summary.end_time.isoformat(),
            },
            "results": [
                {
                    "user_id": r.user_id,
                    "success": r.success,
                    "duration": r.duration,
                    "error": r.error,
                    "start_time": r.start_time.isoformat() if r.start_time else None,
                    "end_time": r.end_time.isoformat() if r.end_time else None,
                }
                for r in summary.results
            ],
        }

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)

        # Fixed: original printed the literal "(unknown)" — the {filename}
        # placeholder had been lost from this f-string.
        print(f"Results saved to: {filename}")
        return filename
|
|
|
|
|
|
|
|
|
|
|
|
|