# CP_AUTOMATION/utils/load_test_base.py

"""
Generic Load Testing Base Framework
A world-class, transparent, and absolute mechanism for load testing
with comprehensive tracking and reporting.
"""
from typing import Callable, Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import json
from pathlib import Path
import time
class TestStatus(Enum):
"""Test execution status"""
PASSED = "PASSED"
FAILED = "FAILED"
SKIPPED = "SKIPPED"
ERROR = "ERROR"
@dataclass
class PageMetrics:
"""Metrics for page load and rendering"""
page_load_time: float = 0.0
dom_ready_time: float = 0.0
scroll_smooth: bool = True
scroll_time: float = 0.0
elements_loaded: int = 0
render_complete: bool = False
errors: list = field(default_factory=list)
@dataclass
class LoadTestResult:
"""Comprehensive result for a single user test"""
user_id: int
status: TestStatus
start_time: datetime
end_time: datetime
total_duration: float
    page_metrics: Optional[PageMetrics] = None
steps_completed: list = field(default_factory=list)
errors: list = field(default_factory=list)
screenshots: list = field(default_factory=list)
custom_metrics: Dict[str, Any] = field(default_factory=dict)
@dataclass
class LoadTestSummary:
"""Comprehensive summary of load test execution"""
test_name: str
total_users: int
successful: int
failed: int
skipped: int
success_rate: float
total_duration: float
start_time: datetime
end_time: datetime
# Performance metrics
avg_duration: float = 0.0
min_duration: float = 0.0
max_duration: float = 0.0
p50_duration: float = 0.0 # Median
p95_duration: float = 0.0 # 95th percentile
p99_duration: float = 0.0 # 99th percentile
# Page metrics aggregation
avg_page_load_time: float = 0.0
avg_scroll_time: float = 0.0
scroll_smooth_rate: float = 0.0
# Results
results: list = field(default_factory=list)
# Error analysis
error_summary: Dict[str, int] = field(default_factory=dict)
class LoadTestBase:
"""
Base class for generic load testing
Provides transparent tracking, comprehensive metrics, and flexible test execution.
"""
def __init__(self, test_name: str = "Load Test"):
"""
Initialize load test base
Args:
test_name: Name of the test for reporting
"""
self.test_name = test_name
self.results: list[LoadTestResult] = []
self.start_time: Optional[datetime] = None
self.end_time: Optional[datetime] = None
def measure_page_metrics(self, driver) -> PageMetrics:
"""
Measure comprehensive page metrics
Args:
driver: WebDriver instance
Returns:
PageMetrics: Comprehensive page metrics
"""
metrics = PageMetrics()
try:
# Measure page load time
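            # NOTE: window.performance.timing is the legacy Navigation Timing API.
            # It is deprecated in favour of performance.getEntriesByType('navigation'),
            # but is kept here because it is still widely supported across browsers
            # and WebDriver builds.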
navigation_start = driver.execute_script(
"return window.performance.timing.navigationStart"
)
load_complete = driver.execute_script(
"return window.performance.timing.loadEventEnd"
)
dom_ready = driver.execute_script(
"return window.performance.timing.domContentLoadedEventEnd"
)
if navigation_start and load_complete:
metrics.page_load_time = (load_complete - navigation_start) / 1000.0
if navigation_start and dom_ready:
metrics.dom_ready_time = (dom_ready - navigation_start) / 1000.0
# Count loaded elements
metrics.elements_loaded = driver.execute_script(
"return document.getElementsByTagName('*').length"
)
# Check if page is fully rendered
metrics.render_complete = driver.execute_script(
"return document.readyState === 'complete'"
)
# Test scroll smoothness
scroll_start = time.time()
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(0.5) # Brief wait for scroll
driver.execute_script("window.scrollTo(0, 0);")
scroll_end = time.time()
metrics.scroll_time = scroll_end - scroll_start
            # "Smooth" here means the scroll commands executed without raising an
            # exception; it is not a frame-rate measurement. Note that scroll_time
            # also includes the fixed 0.5s wait above.
            metrics.scroll_smooth = True
except Exception as e:
metrics.errors.append(f"Metrics collection error: {str(e)}")
metrics.scroll_smooth = False
return metrics
def execute_test_for_user(
self,
user_id: int,
test_function: Callable,
*args,
**kwargs
) -> LoadTestResult:
"""
Execute test for a single user with comprehensive tracking
Args:
user_id: Unique user identifier
test_function: Test function to execute
*args: Arguments for test function
**kwargs: Keyword arguments for test function (e.g., headless=True/False)
Returns:
LoadTestResult: Comprehensive test result
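        Note:
            If test_function returns a dict, the optional keys 'driver',
            'steps_completed' and 'user_data_dir' are honoured: the driver is
            used for page-metric collection and then quit, and the temporary
            user data directory is removed.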
"""
start_time = datetime.now()
status = TestStatus.PASSED
errors = []
steps_completed = []
page_metrics = None
try:
# Execute the test function
result = test_function(user_id, *args, **kwargs)
# If result is a dict, extract information
if isinstance(result, dict):
driver = result.get('driver')
steps_completed = result.get('steps_completed', [])
# Measure page metrics if driver is available
if driver:
try:
page_metrics = self.measure_page_metrics(driver)
except Exception as e:
errors.append(f"Metrics collection failed: {str(e)}")
                    finally:
                        # Clean up the driver after metrics collection.
                        try:
                            from utils.driver_manager import DriverManager
                            DriverManager.quit_driver(driver)
                            # Remove the temporary user data directory if one was created.
                            user_data_dir = result.get('user_data_dir')
                            if user_data_dir:
                                import os
                                import shutil
                                if os.path.exists(user_data_dir):
                                    shutil.rmtree(user_data_dir, ignore_errors=True)
                        except Exception:
                            # Cleanup is best-effort; never let it fail the test run.
                            pass
            # The test passed as long as test_function did not raise; metrics
            # collection problems are recorded in `errors` but do not fail the run.
            status = TestStatus.PASSED
if not steps_completed:
steps_completed.append("Test completed successfully")
except Exception as e:
status = TestStatus.FAILED
# Get detailed error information
error_type = type(e).__name__
error_message = str(e) if str(e) else "Unknown error"
error_traceback = None
try:
import traceback
error_traceback = traceback.format_exc()
            except Exception:
                pass
error_msg = f"User {user_id}: {error_type}: {error_message}"
errors.append(error_msg)
if error_traceback:
errors.append(f"Traceback: {error_traceback}")
if not steps_completed:
steps_completed.append(f"Failed at: {error_msg}")
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
result = LoadTestResult(
user_id=user_id,
status=status,
start_time=start_time,
end_time=end_time,
total_duration=duration,
page_metrics=page_metrics,
steps_completed=steps_completed,
errors=errors
)
return result
    def calculate_summary(self) -> Optional[LoadTestSummary]:
        """
        Calculate a comprehensive summary from the collected results
        Returns:
            Optional[LoadTestSummary]: Comprehensive test summary, or None if
            no results have been recorded yet
        """
        if not self.results:
            return None
successful = sum(1 for r in self.results if r.status == TestStatus.PASSED)
failed = sum(1 for r in self.results if r.status == TestStatus.FAILED)
skipped = sum(1 for r in self.results if r.status == TestStatus.SKIPPED)
durations = [r.total_duration for r in self.results if r.total_duration > 0]
durations_sorted = sorted(durations) if durations else []
# Calculate percentiles
def percentile(data, p):
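            # Linear interpolation between the two closest ranks (the same
            # default method numpy uses for percentiles); `data` must be sorted.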
if not data:
return 0.0
k = (len(data) - 1) * p
f = int(k)
c = k - f
if f + 1 < len(data):
return data[f] + c * (data[f + 1] - data[f])
return data[f]
# Page metrics aggregation
page_load_times = [
r.page_metrics.page_load_time
for r in self.results
if r.page_metrics and r.page_metrics.page_load_time > 0
]
scroll_times = [
r.page_metrics.scroll_time
for r in self.results
if r.page_metrics and r.page_metrics.scroll_time > 0
]
smooth_scrolls = sum(
1 for r in self.results
if r.page_metrics and r.page_metrics.scroll_smooth
)
        # Error analysis: group by exception type. Error strings are formatted
        # as "User <id>: <ErrorType>: <message>", so strip the user prefix when
        # it is present.
        error_summary = {}
        for result in self.results:
            for error in result.errors:
                parts = [p.strip() for p in error.split(':')]
                if len(parts) > 1 and parts[0].startswith("User "):
                    error_type = parts[1]
                elif parts[0]:
                    error_type = parts[0]
                else:
                    error_type = "Unknown"
                error_summary[error_type] = error_summary.get(error_type, 0) + 1
summary = LoadTestSummary(
test_name=self.test_name,
total_users=len(self.results),
successful=successful,
failed=failed,
skipped=skipped,
success_rate=(successful / len(self.results) * 100) if self.results else 0,
total_duration=(self.end_time - self.start_time).total_seconds() if self.end_time and self.start_time else 0,
start_time=self.start_time,
end_time=self.end_time,
avg_duration=sum(durations) / len(durations) if durations else 0,
min_duration=min(durations) if durations else 0,
max_duration=max(durations) if durations else 0,
p50_duration=percentile(durations_sorted, 0.50),
p95_duration=percentile(durations_sorted, 0.95),
p99_duration=percentile(durations_sorted, 0.99),
avg_page_load_time=sum(page_load_times) / len(page_load_times) if page_load_times else 0,
avg_scroll_time=sum(scroll_times) / len(scroll_times) if scroll_times else 0,
scroll_smooth_rate=(smooth_scrolls / len(self.results) * 100) if self.results else 0,
results=self.results,
error_summary=error_summary
)
return summary
def print_summary(self, summary: LoadTestSummary):
"""Print comprehensive, transparent summary"""
print(f"\n{'='*80}")
print(f"LOAD TEST SUMMARY: {summary.test_name}")
print(f"{'='*80}")
print(f"\n📊 OVERALL METRICS")
print(f" Total Users: {summary.total_users}")
print(f" Successful: {summary.successful} ({summary.success_rate:.2f}%)")
print(f" Failed: {summary.failed}")
print(f" Skipped: {summary.skipped}")
print(f" Total Duration: {summary.total_duration:.2f} seconds")
print(f"\n⏱️ PERFORMANCE METRICS")
print(f" Average Duration: {summary.avg_duration:.2f} seconds")
print(f" Min Duration: {summary.min_duration:.2f} seconds")
print(f" Max Duration: {summary.max_duration:.2f} seconds")
print(f" Median (P50): {summary.p50_duration:.2f} seconds")
print(f" 95th Percentile (P95): {summary.p95_duration:.2f} seconds")
print(f" 99th Percentile (P99): {summary.p99_duration:.2f} seconds")
print(f"\n📄 PAGE METRICS")
print(f" Avg Page Load Time: {summary.avg_page_load_time:.2f} seconds")
print(f" Avg Scroll Time: {summary.avg_scroll_time:.2f} seconds")
print(f" Scroll Smooth Rate: {summary.scroll_smooth_rate:.2f}%")
if summary.error_summary:
print(f"\n❌ ERROR ANALYSIS")
for error_type, count in summary.error_summary.items():
print(f" {error_type}: {count}")
print(f"\n🕐 TIMING")
print(f" Start Time: {summary.start_time}")
print(f" End Time: {summary.end_time}")
print(f"{'='*80}\n")
    def save_results(self, summary: LoadTestSummary, output_dir: Optional[Path] = None):
"""Save comprehensive results to JSON"""
if output_dir is None:
output_dir = Path("reports/load_tests")
output_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = output_dir / f"load_test_{summary.test_name.replace(' ', '_')}_{summary.total_users}users_{timestamp}.json"
# Convert to JSON-serializable format
data = {
"test_name": summary.test_name,
"summary": {
"total_users": summary.total_users,
"successful": summary.successful,
"failed": summary.failed,
"skipped": summary.skipped,
"success_rate": summary.success_rate,
"total_duration": summary.total_duration,
"avg_duration": summary.avg_duration,
"min_duration": summary.min_duration,
"max_duration": summary.max_duration,
"p50_duration": summary.p50_duration,
"p95_duration": summary.p95_duration,
"p99_duration": summary.p99_duration,
"avg_page_load_time": summary.avg_page_load_time,
"avg_scroll_time": summary.avg_scroll_time,
"scroll_smooth_rate": summary.scroll_smooth_rate,
"start_time": summary.start_time.isoformat(),
"end_time": summary.end_time.isoformat(),
},
"error_summary": summary.error_summary,
"results": [
{
"user_id": r.user_id,
"status": r.status.value,
"duration": r.total_duration,
"start_time": r.start_time.isoformat(),
"end_time": r.end_time.isoformat(),
"steps_completed": r.steps_completed,
"errors": r.errors,
"page_metrics": {
"page_load_time": r.page_metrics.page_load_time if r.page_metrics else 0,
"dom_ready_time": r.page_metrics.dom_ready_time if r.page_metrics else 0,
"scroll_smooth": r.page_metrics.scroll_smooth if r.page_metrics else False,
"scroll_time": r.page_metrics.scroll_time if r.page_metrics else 0,
"elements_loaded": r.page_metrics.elements_loaded if r.page_metrics else 0,
"render_complete": r.page_metrics.render_complete if r.page_metrics else False,
} if r.page_metrics else None,
}
for r in summary.results
]
}
with open(filename, 'w') as f:
json.dump(data, f, indent=2)
print(f"📁 Results saved to: {filename}")
return filename
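

# Illustrative usage sketch. The dummy test function below is hypothetical; a
# real test would drive a WebDriver through a user journey and return it under
# the 'driver' key so page metrics can be collected. The user count and test
# name are placeholder values.
if __name__ == "__main__":
    def dummy_flow(user_id: int) -> dict:
        # Simulate a short unit of work instead of a real browser session.
        time.sleep(0.1)
        return {"steps_completed": [f"User {user_id}: simulated flow"]}

    runner = LoadTestBase(test_name="Smoke Load Test")
    runner.start_time = datetime.now()
    for uid in range(1, 6):
        runner.results.append(runner.execute_test_for_user(uid, dummy_flow))
    runner.end_time = datetime.now()

    summary = runner.calculate_summary()
    if summary:
        runner.print_summary(summary)
        runner.save_results(summary)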