433 lines
16 KiB
Python
433 lines
16 KiB
Python
"""
|
|
Generic Load Testing Base Framework
|
|
|
|
A world-class, transparent, and absolute mechanism for load testing
|
|
with comprehensive tracking and reporting.
|
|
"""
|
|
from typing import Callable, Dict, Any, Optional
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
import json
|
|
from pathlib import Path
|
|
import time
|
|
|
|
|
|
class TestStatus(Enum):
    """Test execution status.

    Each member's value is the same upper-case string as its name, which is
    what gets written to the JSON report via ``status.value``.
    """

    PASSED = "PASSED"
    FAILED = "FAILED"
    SKIPPED = "SKIPPED"
    ERROR = "ERROR"
|
|
|
|
|
|
@dataclass
class PageMetrics:
    """Metrics for page load and rendering.

    All fields have defaults, so ``PageMetrics()`` yields an "empty"
    measurement that can be filled in incrementally.
    """

    # Seconds between navigationStart and loadEventEnd.
    page_load_time: float = 0.0
    # Seconds between navigationStart and domContentLoadedEventEnd.
    dom_ready_time: float = 0.0
    # False when metric collection (including the scroll probe) raised.
    scroll_smooth: bool = True
    # Wall-clock seconds spent on the scroll-down / scroll-up probe.
    scroll_time: float = 0.0
    # Number of elements in the document at measurement time.
    elements_loaded: int = 0
    # True when document.readyState was 'complete'.
    render_complete: bool = False
    # Messages describing failures during metric collection.
    errors: list = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class LoadTestResult:
    """Comprehensive result for a single user test."""

    user_id: int
    status: TestStatus
    start_time: datetime
    end_time: datetime
    # Seconds elapsed between start_time and end_time.
    total_duration: float
    # None when no page metrics were collected for this user.
    page_metrics: Optional[PageMetrics] = None
    steps_completed: list = field(default_factory=list)
    errors: list = field(default_factory=list)
    screenshots: list = field(default_factory=list)
    custom_metrics: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
@dataclass
class LoadTestSummary:
    """Comprehensive summary of load test execution."""

    test_name: str
    total_users: int
    successful: int
    failed: int
    skipped: int
    # Percentage in the range 0-100.
    success_rate: float
    # Seconds between start_time and end_time (0 when either is unknown).
    total_duration: float
    # May be None when the run's start/end was never recorded.
    start_time: Optional[datetime]
    end_time: Optional[datetime]

    # Performance metrics (seconds)
    avg_duration: float = 0.0
    min_duration: float = 0.0
    max_duration: float = 0.0
    p50_duration: float = 0.0  # Median
    p95_duration: float = 0.0  # 95th percentile
    p99_duration: float = 0.0  # 99th percentile

    # Page metrics aggregation
    avg_page_load_time: float = 0.0
    avg_scroll_time: float = 0.0
    scroll_smooth_rate: float = 0.0  # Percentage in the range 0-100

    # Results
    results: list = field(default_factory=list)

    # Error analysis: error category -> number of occurrences
    error_summary: Dict[str, int] = field(default_factory=dict)
|
|
|
|
|
|
class LoadTestBase:
    """
    Base class for generic load testing

    Provides transparent tracking, comprehensive metrics, and flexible test execution.
    Per-user results are expected in ``self.results``; ``self.start_time`` and
    ``self.end_time`` bound the whole run and are read by ``calculate_summary``.
    """

    # Characters replaced by "_" when the test name is embedded in a file name:
    # the space (as before) plus characters that are path separators or are
    # rejected by common file systems.
    _UNSAFE_FILENAME_CHARS = ' \\/:*?"<>|'

    def __init__(self, test_name: str = "Load Test"):
        """
        Initialize load test base

        Args:
            test_name: Name of the test for reporting
        """
        self.test_name = test_name
        self.results: list[LoadTestResult] = []
        self.start_time: Optional[datetime] = None
        self.end_time: Optional[datetime] = None

    def measure_page_metrics(self, driver) -> PageMetrics:
        """
        Measure comprehensive page metrics

        Any exception raised while talking to the driver is recorded in
        ``metrics.errors`` (and ``scroll_smooth`` is set to False) instead of
        being propagated, so a partially filled ``PageMetrics`` may be returned.

        Args:
            driver: WebDriver instance (only ``execute_script`` is used)

        Returns:
            PageMetrics: Comprehensive page metrics
        """
        metrics = PageMetrics()

        try:
            # Measure page load time
            navigation_start = driver.execute_script(
                "return window.performance.timing.navigationStart"
            )
            load_complete = driver.execute_script(
                "return window.performance.timing.loadEventEnd"
            )
            dom_ready = driver.execute_script(
                "return window.performance.timing.domContentLoadedEventEnd"
            )

            # Timing values are milliseconds; a falsy value (0/None) means the
            # event has not been recorded, so the metric keeps its 0.0 default.
            if navigation_start and load_complete:
                metrics.page_load_time = (load_complete - navigation_start) / 1000.0

            if navigation_start and dom_ready:
                metrics.dom_ready_time = (dom_ready - navigation_start) / 1000.0

            # Count loaded elements
            metrics.elements_loaded = driver.execute_script(
                "return document.getElementsByTagName('*').length"
            )

            # Check if page is fully rendered
            metrics.render_complete = driver.execute_script(
                "return document.readyState === 'complete'"
            )

            # Test scroll smoothness. perf_counter is monotonic, so the
            # interval cannot be distorted by wall-clock adjustments.
            # Note: the measured time includes the fixed 0.5 s pause below.
            scroll_start = time.perf_counter()
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(0.5)  # Brief wait for scroll
            driver.execute_script("window.scrollTo(0, 0);")
            metrics.scroll_time = time.perf_counter() - scroll_start

            # Check if scroll was smooth (no errors during scroll)
            metrics.scroll_smooth = True  # If we got here, scroll worked

        except Exception as e:
            metrics.errors.append(f"Metrics collection error: {str(e)}")
            metrics.scroll_smooth = False

        return metrics

    @staticmethod
    def _normalize_steps(steps) -> list:
        """Return the reported steps as a fresh list (never None, never shared)."""
        if not steps:
            return []
        if isinstance(steps, (list, tuple)):
            return list(steps)
        return [steps]

    @staticmethod
    def _release_session(driver, user_data_dir) -> list:
        """Quit the driver and delete the temporary profile directory.

        Both steps are best-effort and independent of each other. Failures are
        returned as messages rather than raised.
        """
        problems = []

        if driver:
            try:
                # Project-local helper, kept function-scoped as in the original.
                from utils.driver_manager import DriverManager
                DriverManager.quit_driver(driver)
            except Exception as e:
                problems.append(f"Driver cleanup failed: {e}")

        # Runs after the driver has been asked to quit, and even if that failed,
        # so a temporary profile directory is not left behind.
        if user_data_dir:
            try:
                import shutil
                # ignore_errors=True also covers a directory that is already gone.
                shutil.rmtree(user_data_dir, ignore_errors=True)
            except Exception as e:
                problems.append(f"User data cleanup failed: {e}")

        return problems

    def execute_test_for_user(
        self,
        user_id: int,
        test_function: Callable,
        *args,
        **kwargs
    ) -> LoadTestResult:
        """
        Execute test for a single user with comprehensive tracking

        ``test_function`` is called as ``test_function(user_id, *args, **kwargs)``.
        If it returns a dict, the keys ``'driver'``, ``'steps_completed'`` and
        ``'user_data_dir'`` are read: page metrics are measured on the driver,
        then the driver is quit and the user data directory removed. The test
        is FAILED only when ``test_function`` itself raises; metric or cleanup
        problems are appended to ``errors`` without changing the status.

        Args:
            user_id: Unique user identifier
            test_function: Test function to execute
            *args: Arguments for test function
            **kwargs: Keyword arguments for test function (e.g., headless=True/False)

        Returns:
            LoadTestResult: Comprehensive test result
        """
        start_time = datetime.now()
        status = TestStatus.PASSED
        errors = []
        steps_completed = []
        page_metrics = None

        try:
            # Execute the test function
            outcome = test_function(user_id, *args, **kwargs)

            # If the outcome is a dict, extract information
            if isinstance(outcome, dict):
                driver = outcome.get('driver')
                # A None value or a caller-owned list must not break or leak
                # into the result, so always work on our own list.
                steps_completed = self._normalize_steps(outcome.get('steps_completed'))

                try:
                    # Measure page metrics if driver is available
                    if driver:
                        try:
                            page_metrics = self.measure_page_metrics(driver)
                        except Exception as e:
                            errors.append(f"Metrics collection failed: {str(e)}")
                finally:
                    # Clean up driver and temp profile after metrics collection
                    errors.extend(
                        self._release_session(driver, outcome.get('user_data_dir'))
                    )

            # The test function returned without raising: test passed
            if not steps_completed:
                steps_completed.append("Test completed successfully")

        except Exception as e:
            import traceback

            status = TestStatus.FAILED
            # Get detailed error information
            error_type = type(e).__name__
            error_message = str(e) or "Unknown error"
            error_msg = f"User {user_id}: {error_type}: {error_message}"
            errors.append(error_msg)

            error_traceback = traceback.format_exc()
            if error_traceback:
                errors.append(f"Traceback: {error_traceback}")

            if not steps_completed:
                steps_completed.append(f"Failed at: {error_msg}")

        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()

        return LoadTestResult(
            user_id=user_id,
            status=status,
            start_time=start_time,
            end_time=end_time,
            total_duration=duration,
            page_metrics=page_metrics,
            steps_completed=steps_completed,
            errors=errors
        )

    @staticmethod
    def _percentile(sorted_values: list, fraction: float) -> float:
        """Linearly interpolated percentile of an ascending list (0.0 if empty)."""
        if not sorted_values:
            return 0.0
        position = (len(sorted_values) - 1) * fraction
        lower = int(position)
        weight = position - lower
        if lower + 1 < len(sorted_values):
            return sorted_values[lower] + weight * (
                sorted_values[lower + 1] - sorted_values[lower]
            )
        return sorted_values[lower]

    @staticmethod
    def _classify_error(error: str) -> Optional[str]:
        """Map one recorded error string to its summary category.

        Returns None for entries that must not be counted.
        """
        label, colon, rest = error.partition(':')
        if not colon:
            return "Unknown"
        if label == "Traceback":
            # Tracebacks are recorded alongside the error they belong to;
            # counting them would double-count that failure.
            return None
        if label.startswith("User ") and ':' in rest:
            # "User <id>: <ExceptionType>: <message>" -> group by exception
            # type instead of producing one bucket per user.
            return rest.partition(':')[0].strip()
        return label

    def calculate_summary(self) -> Optional[LoadTestSummary]:
        """
        Calculate comprehensive summary from results

        Returns:
            LoadTestSummary: Comprehensive test summary, or None when
            ``self.results`` is empty
        """
        if not self.results:
            return None

        total = len(self.results)
        successful = sum(1 for r in self.results if r.status == TestStatus.PASSED)
        failed = sum(1 for r in self.results if r.status == TestStatus.FAILED)
        skipped = sum(1 for r in self.results if r.status == TestStatus.SKIPPED)

        # Zero (or negative) durations are excluded from the statistics.
        durations = [r.total_duration for r in self.results if r.total_duration > 0]
        durations_sorted = sorted(durations)

        # Page metrics aggregation (only results that actually have a value)
        page_load_times = [
            r.page_metrics.page_load_time
            for r in self.results
            if r.page_metrics and r.page_metrics.page_load_time > 0
        ]
        scroll_times = [
            r.page_metrics.scroll_time
            for r in self.results
            if r.page_metrics and r.page_metrics.scroll_time > 0
        ]
        smooth_scrolls = sum(
            1 for r in self.results
            if r.page_metrics and r.page_metrics.scroll_smooth
        )

        # Error analysis
        error_summary: Dict[str, int] = {}
        for result in self.results:
            for error in result.errors:
                category = self._classify_error(error)
                if category is not None:
                    error_summary[category] = error_summary.get(category, 0) + 1

        if self.end_time and self.start_time:
            total_duration = (self.end_time - self.start_time).total_seconds()
        else:
            total_duration = 0

        return LoadTestSummary(
            test_name=self.test_name,
            total_users=total,
            successful=successful,
            failed=failed,
            skipped=skipped,
            success_rate=successful / total * 100,
            total_duration=total_duration,
            start_time=self.start_time,
            end_time=self.end_time,
            avg_duration=sum(durations) / len(durations) if durations else 0,
            min_duration=min(durations) if durations else 0,
            max_duration=max(durations) if durations else 0,
            p50_duration=self._percentile(durations_sorted, 0.50),
            p95_duration=self._percentile(durations_sorted, 0.95),
            p99_duration=self._percentile(durations_sorted, 0.99),
            avg_page_load_time=sum(page_load_times) / len(page_load_times) if page_load_times else 0,
            avg_scroll_time=sum(scroll_times) / len(scroll_times) if scroll_times else 0,
            # Denominator is all results, including those without page metrics.
            scroll_smooth_rate=smooth_scrolls / total * 100,
            results=self.results,
            error_summary=error_summary
        )

    def print_summary(self, summary: LoadTestSummary):
        """Print comprehensive, transparent summary to stdout"""
        print(f"\n{'='*80}")
        print(f"LOAD TEST SUMMARY: {summary.test_name}")
        print(f"{'='*80}")
        print("\n📊 OVERALL METRICS")
        print(f" Total Users: {summary.total_users}")
        print(f" Successful: {summary.successful} ({summary.success_rate:.2f}%)")
        print(f" Failed: {summary.failed}")
        print(f" Skipped: {summary.skipped}")
        print(f" Total Duration: {summary.total_duration:.2f} seconds")

        print("\n⏱️ PERFORMANCE METRICS")
        print(f" Average Duration: {summary.avg_duration:.2f} seconds")
        print(f" Min Duration: {summary.min_duration:.2f} seconds")
        print(f" Max Duration: {summary.max_duration:.2f} seconds")
        print(f" Median (P50): {summary.p50_duration:.2f} seconds")
        print(f" 95th Percentile (P95): {summary.p95_duration:.2f} seconds")
        print(f" 99th Percentile (P99): {summary.p99_duration:.2f} seconds")

        print("\n📄 PAGE METRICS")
        print(f" Avg Page Load Time: {summary.avg_page_load_time:.2f} seconds")
        print(f" Avg Scroll Time: {summary.avg_scroll_time:.2f} seconds")
        print(f" Scroll Smooth Rate: {summary.scroll_smooth_rate:.2f}%")

        if summary.error_summary:
            print("\n❌ ERROR ANALYSIS")
            for error_type, count in summary.error_summary.items():
                print(f" {error_type}: {count}")

        print("\n🕐 TIMING")
        print(f" Start Time: {summary.start_time}")
        print(f" End Time: {summary.end_time}")
        print(f"{'='*80}\n")

    def save_results(self, summary: LoadTestSummary, output_dir: Optional[Path] = None):
        """
        Save comprehensive results to JSON

        Args:
            summary: Summary to serialize (including its per-user results)
            output_dir: Target directory (Path or str); defaults to
                ``reports/load_tests``. Created if missing.

        Returns:
            Path: Location of the written JSON file
        """
        output_dir = Path("reports/load_tests") if output_dir is None else Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # A test name containing e.g. "/" would otherwise point into a
        # non-existent sub-directory and make open() fail.
        safe_name = "".join(
            "_" if ch in self._UNSAFE_FILENAME_CHARS else ch
            for ch in summary.test_name
        )
        filename = output_dir / f"load_test_{safe_name}_{summary.total_users}users_{timestamp}.json"

        def iso_or_none(moment):
            # start/end may be None when the run boundaries were never set.
            return moment.isoformat() if moment else None

        def page_metrics_payload(metrics):
            if not metrics:
                return None
            return {
                "page_load_time": metrics.page_load_time,
                "dom_ready_time": metrics.dom_ready_time,
                "scroll_smooth": metrics.scroll_smooth,
                "scroll_time": metrics.scroll_time,
                "elements_loaded": metrics.elements_loaded,
                "render_complete": metrics.render_complete,
                # Collection errors are reported too, so they are not lost.
                "errors": list(metrics.errors),
            }

        # Convert to JSON-serializable format
        data = {
            "test_name": summary.test_name,
            "summary": {
                "total_users": summary.total_users,
                "successful": summary.successful,
                "failed": summary.failed,
                "skipped": summary.skipped,
                "success_rate": summary.success_rate,
                "total_duration": summary.total_duration,
                "avg_duration": summary.avg_duration,
                "min_duration": summary.min_duration,
                "max_duration": summary.max_duration,
                "p50_duration": summary.p50_duration,
                "p95_duration": summary.p95_duration,
                "p99_duration": summary.p99_duration,
                "avg_page_load_time": summary.avg_page_load_time,
                "avg_scroll_time": summary.avg_scroll_time,
                "scroll_smooth_rate": summary.scroll_smooth_rate,
                "start_time": iso_or_none(summary.start_time),
                "end_time": iso_or_none(summary.end_time),
            },
            "error_summary": summary.error_summary,
            "results": [
                {
                    "user_id": r.user_id,
                    "status": r.status.value,
                    "duration": r.total_duration,
                    "start_time": r.start_time.isoformat(),
                    "end_time": r.end_time.isoformat(),
                    "steps_completed": r.steps_completed,
                    "errors": r.errors,
                    "page_metrics": page_metrics_payload(r.page_metrics),
                }
                for r in summary.results
            ]
        }

        with open(filename, 'w', encoding="utf-8") as f:
            json.dump(data, f, indent=2)

        print(f"📁 Results saved to: {filename}")
        return filename
|
|
|