#!/usr/bin/env python3
"""
Enhanced Analyzer Integration
Seamlessly integrates enhanced chunking with existing AI Analysis Service.
Author: Senior Engineer (20+ years experience)
Version: 1.0.0
"""
import asyncio
import logging
import re
from typing import Dict, List, Any, Optional, Tuple
from pathlib import Path
# Import existing classes (maintain compatibility)
from ai_analyze import EnhancedGitHubAnalyzer, FileAnalysis, RepositoryAnalysis
from enhanced_chunking import EnhancedFileProcessor, ENHANCED_CHUNKING_CONFIG
class EnhancedGitHubAnalyzerV2(EnhancedGitHubAnalyzer):
"""
Enhanced version of GitHubAnalyzer with intelligent chunking.
Maintains 100% backward compatibility while adding enhanced capabilities.
"""
def __init__(self, api_key: str, memory_config: Dict[str, Any]):
# Initialize parent class
super().__init__(api_key, memory_config)
# Add enhanced processing capability
self.enhanced_processor = EnhancedFileProcessor(self.client, self.memory_manager)
self.enhanced_enabled = True # Feature flag for easy toggling
# Configuration
self.chunking_config = ENHANCED_CHUNKING_CONFIG
self.logger = logging.getLogger(__name__)
print(f"🔍 [DEBUG] EnhancedGitHubAnalyzerV2 initialized - class: {self.__class__.__name__}")
self.logger.info("Enhanced GitHub Analyzer V2 initialized with chunking capabilities")

    async def analyze_file_with_memory_enhanced(self, file_path: Path, content: str, repo_id: str) -> FileAnalysis:
        """
        Enhanced version of analyze_file_with_memory with intelligent chunking.
        Maintains the exact same interface and return type for backward compatibility.
        """
        try:
            if not self.enhanced_enabled:
                print(f"🔍 [DEBUG] Enhanced disabled, using original method for {file_path}")
                return await super().analyze_file_with_memory(file_path, content, repo_id)
            print(f"🔍 [DEBUG] Starting enhanced processing for {file_path}")
            # Use enhanced processing
            enhanced_result = await self.enhanced_processor.process_file_enhanced(
                str(file_path), content, repo_id
            )
            print(f"🔍 [DEBUG] Enhanced processing completed for {file_path}")
            # Convert to FileAnalysis object (maintain compatibility)
            return self._convert_to_file_analysis(enhanced_result, file_path)
        except Exception as e:
            print(f"🔍 [DEBUG] Enhanced analysis failed for {file_path}: {e}")
            self.logger.error(f"Enhanced analysis failed for {file_path}, falling back to original: {e}")
            # Fallback to original method
            return await super().analyze_file_with_memory(file_path, content, repo_id)

    async def analyze_file_with_memory(self, file_path: Path, content: str, repo_id: str) -> FileAnalysis:
        """Wrapper method to maintain compatibility with server calls."""
        return await self.analyze_file_with_memory_enhanced(file_path, content, repo_id)

    async def analyze_repository_overview_with_memory(self, repo_path: str, file_analyses: List[FileAnalysis],
                                                      context_memories: Dict, repo_id: str) -> Tuple[str, str]:
        """Wrapper method to maintain compatibility with server calls."""
        return await super().analyze_repository_overview_with_memory(repo_path, file_analyses, context_memories, repo_id)

    def create_pdf_report(self, analysis: RepositoryAnalysis, output_path: str, progress_mgr=None):
        """Wrapper method to maintain compatibility with server calls."""
        return super().create_pdf_report(analysis, output_path, progress_mgr)

    def _convert_to_file_analysis(self, enhanced_result: Dict[str, Any], file_path: Path) -> FileAnalysis:
        """Convert enhanced analysis result to FileAnalysis object for compatibility."""
        return FileAnalysis(
            path=str(file_path),
            language=enhanced_result.get('language', 'Unknown'),
            lines_of_code=enhanced_result.get('lines_of_code', 0),
            complexity_score=enhanced_result.get('complexity_score', 5.0),
            issues_found=enhanced_result.get('issues_found', []),
            recommendations=enhanced_result.get('recommendations', []),
            detailed_analysis=enhanced_result.get('detailed_analysis', ''),
            severity_score=enhanced_result.get('severity_score', 5.0)
        )

    async def analyze_repository_with_memory_enhanced(self, repo_path: str) -> RepositoryAnalysis:
        """
        Enhanced repository analysis with intelligent chunking and batch processing.
        Maintains the exact same interface and return type for backward compatibility.
        """
        try:
            if not self.enhanced_enabled:
                # Fallback to original method
                return await super().analyze_repository_with_memory(repo_path)
            # Use enhanced processing with batch optimization
            return await self._analyze_repository_enhanced(repo_path)
        except Exception as e:
            self.logger.error(f"Enhanced repository analysis failed, falling back to original: {e}")
            # Fallback to original method
            return await super().analyze_repository_with_memory(repo_path)

    async def _analyze_repository_enhanced(self, repo_path: str) -> RepositoryAnalysis:
        """Enhanced repository analysis with batch processing and chunking."""
        # Generate repo ID and check cache
        repo_id = self.calculate_repo_id(repo_path)

        # Check working memory for a recent analysis
        cached_analysis = await self.memory_manager.get_working_memory(f"repo_analysis:{repo_id}")
        if cached_analysis:
            self.logger.info("Using cached repository analysis from memory")
            return RepositoryAnalysis(**cached_analysis)

        # Clone/access repository
        actual_repo_path = self.clone_repository(repo_path)
        # Get analysis context from memory
        context_memories = await self.get_analysis_context(repo_path, "", repo_id)

        # Scan files with enhanced processing
        files_to_analyze = self.scan_repository(actual_repo_path)
        if not files_to_analyze:
            raise Exception("No files found to analyze")
        self.logger.info(f"Starting enhanced analysis of {len(files_to_analyze)} files...")

        # Process files with batch optimization
        file_analyses = await self._process_files_with_batching(files_to_analyze, repo_id)

        # Repository-level analysis with enhanced context
        architecture_assessment, security_assessment = await self.analyze_repository_overview_with_memory(
            actual_repo_path, file_analyses, context_memories, repo_id
        )

        # Calculate overall quality score safely
        if file_analyses:
            valid_scores = [fa.severity_score for fa in file_analyses if fa.severity_score is not None]
            avg_quality = sum(valid_scores) / len(valid_scores) if valid_scores else 5.0
        else:
            avg_quality = 5.0

        # Generate statistics safely
        from collections import Counter
        if file_analyses:
            language_list = [fa.language for fa in file_analyses if fa.language is not None]
            languages = dict(Counter(language_list))
            total_lines = sum(fa.lines_of_code for fa in file_analyses if fa.lines_of_code is not None)
        else:
            languages = {}
            total_lines = 0

        # Count issues once; issues_found may not be a list, so guard the type
        total_issues = sum(
            len(fa.issues_found) if isinstance(fa.issues_found, (list, tuple)) else 0
            for fa in file_analyses
        )

        # Create repository analysis
        repo_analysis = RepositoryAnalysis(
            repo_path=repo_path,
            total_files=len(file_analyses),
            total_lines=total_lines,
            languages=languages,
            architecture_assessment=architecture_assessment,
            security_assessment=security_assessment,
            code_quality_score=avg_quality,
            file_analyses=file_analyses,
            executive_summary="",
            high_quality_files=[]
        )

        # Generate executive summary with enhanced context
        repo_analysis.executive_summary = await self.generate_executive_summary_with_memory(
            repo_analysis, context_memories
        )

        # Store analysis in episodic memory
        await self.memory_manager.store_episodic_memory(
            self.session_id, "Enhanced automated repository analysis",
            f"Analyzed {repo_analysis.total_files} files with enhanced chunking, found {total_issues} issues",
            repo_id,
            {
                'repo_path': repo_path,
                'quality_score': avg_quality,
                'total_issues': total_issues,
                'analysis_type': 'enhanced_automated_comprehensive',
                'chunking_enabled': True
            }
        )

        # Cache analysis in working memory
        await self.memory_manager.store_working_memory(
            f"repo_analysis:{repo_id}",
            self._repo_analysis_to_dict(repo_analysis),
            ttl=7200  # 2 hours
        )
        return repo_analysis

    async def _process_files_with_batching(self, files_to_analyze: List[tuple], repo_id: str) -> List[FileAnalysis]:
        """Process files with intelligent batching to optimize API usage."""
        file_analyses = []
        processed_files = 0

        # Group files by size (in lines) so each group gets an appropriate pacing delay
        small_files = []
        medium_files = []
        large_files = []
        for file_path, content in files_to_analyze:
            file_size = len(content.split('\n'))
            if file_size < 200:
                small_files.append((file_path, content))
            elif file_size < 500:
                medium_files.append((file_path, content))
            else:
                large_files.append((file_path, content))

        # Process each group with a delay proportional to file size: small files
        # fast, large files more carefully (they trigger enhanced chunking)
        groups = [
            (small_files, 0.05, "small"),
            (medium_files, 0.1, "medium"),
            (large_files, 0.2, "large"),
        ]
        for files, delay, label in groups:
            if not files:
                continue
            self.logger.info(f"Processing {len(files)} {label} files...")
            for file_path, content in files:
                try:
                    analysis = await self.analyze_file_with_memory_enhanced(
                        Path(file_path), content, repo_id
                    )
                    file_analyses.append(analysis)
                    processed_files += 1
                    await asyncio.sleep(delay)  # Pacing delay to avoid API rate limits
                except Exception as e:
                    self.logger.error(f"Error analyzing {label} file {file_path}: {e}")
                    continue

        self.logger.info(f"Enhanced processing completed: {processed_files}/{len(files_to_analyze)} files processed")
        return file_analyses

    def _repo_analysis_to_dict(self, repo_analysis: RepositoryAnalysis) -> Dict[str, Any]:
        """Convert RepositoryAnalysis to dictionary for caching."""
        return {
            'repo_path': repo_analysis.repo_path,
            'total_files': repo_analysis.total_files,
            'total_lines': repo_analysis.total_lines,
            'languages': repo_analysis.languages,
            'architecture_assessment': repo_analysis.architecture_assessment,
            'security_assessment': repo_analysis.security_assessment,
            'code_quality_score': repo_analysis.code_quality_score,
            'file_analyses': [
                {
                    'path': fa.path,
                    'language': fa.language,
                    'lines_of_code': fa.lines_of_code,
                    'complexity_score': fa.complexity_score,
                    'issues_found': fa.issues_found,
                    'recommendations': fa.recommendations,
                    'detailed_analysis': fa.detailed_analysis,
                    'severity_score': fa.severity_score
                } for fa in repo_analysis.file_analyses
            ],
            'executive_summary': repo_analysis.executive_summary
        }

    def enable_enhanced_processing(self, enabled: bool = True):
        """Enable or disable enhanced processing (feature flag)."""
        self.enhanced_enabled = enabled
        self.logger.info(f"Enhanced processing {'enabled' if enabled else 'disabled'}")

    def get_processing_stats(self) -> Dict[str, Any]:
        """Get statistics about enhanced processing."""
        return {
            'enhanced_enabled': self.enhanced_enabled,
            'chunking_config': self.chunking_config,
            'memory_stats': {}
        }
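
    # Note: enable_enhanced_processing() is the runtime switch for the enhanced
    # pipeline. Every enhanced entry point already falls back to the parent
    # implementation on error, so disabling the flag is mainly useful for A/B
    # comparisons against the original analyzer (see the usage sketch at the
    # bottom of this module).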

    def _analyze_architecture_patterns(self, analysis: RepositoryAnalysis) -> dict:
        """Analyze actual architectural patterns from the codebase."""
        # Detect project type based on file structure and patterns
        project_type = "Unknown"
        project_evidence = "No clear architectural pattern detected"

        # Look for microservice indicators with weighted scoring
        microservice_score = 0
        monolithic_score = 0
        microservice_evidence = []
        monolithic_evidence = []

        # Check for common microservice patterns
        for file_analysis in analysis.file_analyses:
            file_path = file_analysis.path.lower()
            file_content = getattr(file_analysis, 'content', '') or ''

            # Microservice indicators (weighted 3-5)
            if 'docker-compose.yml' in file_path or 'docker-compose.yaml' in file_path:
                microservice_score += 5
                microservice_evidence.append("Docker Compose multi-service configuration")
            if 'kubernetes' in file_path or 'k8s' in file_path:
                microservice_score += 5
                microservice_evidence.append("Kubernetes orchestration")
            if 'microservice' in file_path or 'micro-service' in file_path:
                microservice_score += 4
                microservice_evidence.append("Microservice directory structure")
            if 'service-discovery' in file_path or 'service_discovery' in file_path:
                microservice_score += 4
                microservice_evidence.append("Service discovery configuration")
            if 'api-gateway' in file_path or 'api_gateway' in file_path:
                microservice_score += 4
                microservice_evidence.append("API Gateway configuration")
            if 'consul' in file_content or 'etcd' in file_content:
                microservice_score += 3
                microservice_evidence.append("Service registry usage")
            if '@EnableEurekaClient' in file_content or '@EnableDiscoveryClient' in file_content:
                microservice_score += 3
                microservice_evidence.append("Service discovery client")
            # Look for distributed system patterns
            if 'distributed' in file_content.lower() or 'event-driven' in file_content.lower():
                microservice_score += 3
                microservice_evidence.append("Distributed/event-driven architecture")
            # Check for multiple independent services
            if file_path.startswith('services/') or file_path.startswith('src/services/'):
                microservice_score += 2
                # Avoid appending duplicate evidence entries
                if "Multiple independent services" not in microservice_evidence:
                    microservice_evidence.append("Multiple independent services")

            # Monolithic indicators (weighted 2-4)
            if 'monolith' in file_path or 'single-app' in file_path:
                monolithic_score += 4
                monolithic_evidence.append("Explicit monolith naming")
            if 'Application.run' in file_content and '@SpringBootApplication' in file_content:
                monolithic_score += 2
                monolithic_evidence.append("Single Spring Boot application")

        # Check for Node.js/Express microservice patterns
        has_multiple_services = sum(1 for fa in analysis.file_analyses
                                    if 'service' in fa.path.lower() and
                                    any(ext in fa.path.lower() for ext in ['.js', '.ts']) and
                                    'node_modules' not in fa.path.lower())
        if has_multiple_services >= 3:
            microservice_score += 5
            microservice_evidence.append(f"Multiple independent service modules ({has_multiple_services} found)")

        # Check for package.json with microservice dependencies
        for file_analysis in analysis.file_analyses:
            if 'package.json' in file_analysis.path.lower():
                file_content = getattr(file_analysis, 'content', '') or ''
                if any(dep in file_content.lower() for dep in ['express', 'koa', 'fastify', '@nestjs']):
                    if 'distributed' in file_content.lower() or has_multiple_services >= 3:
                        microservice_score += 3
                        microservice_evidence.append("Node.js microservice stack")

        # Determine project type
        if microservice_score > monolithic_score and microservice_score >= 3:
            project_type = "Microservices Architecture"
            project_evidence = f"Detected microservices: {'; '.join(set(microservice_evidence[:5]))}"
        elif monolithic_score > microservice_score:
            project_type = "Monolithic Architecture"
            project_evidence = f"Found monolithic patterns: {'; '.join(set(monolithic_evidence[:3]))}"
        elif microservice_score == 0 and monolithic_score == 0:
            # Default to microservices if the structure suggests it
            if has_multiple_services >= 2 or any('service' in fa.path.lower() for fa in analysis.file_analyses if 'node_modules' not in fa.path):
                project_type = "Microservices Architecture"
                project_evidence = "Service-oriented structure detected with multiple independent modules"
            else:
                project_type = "Monolithic Architecture"
                project_evidence = "Single application structure detected"
        else:
            project_type = "Hybrid Architecture"
            project_evidence = f"Mixed patterns: {microservice_score} microservice indicators vs {monolithic_score} monolithic indicators"

        # Collect code examples for the detailed analysis section
        code_examples = []
        for file_analysis in analysis.file_analyses:
            if file_analysis.lines_of_code > 500:  # Focus on large files
                code_examples.append({
                    'title': f"Large File Analysis: {file_analysis.path.split('/')[-1]}",
                    'file': file_analysis.path,
                    'lines': file_analysis.lines_of_code,
                    'issue': f"File exceeds recommended size ({file_analysis.lines_of_code} lines)",
                    'code_snippet': self._extract_code_snippet(file_analysis)
                })

        return {
            'project_type': project_type,
            'project_evidence': project_evidence,
            'code_examples': code_examples[:5]  # Top 5 examples
        }

    def _analyze_controller_layer(self, analysis: RepositoryAnalysis) -> dict:
        """Analyze API controller layer patterns."""
        controller_files = []
        total_endpoints = 0
        security_issues = []
        for file_analysis in analysis.file_analyses:
            file_path = file_analysis.path.lower()
            file_content = getattr(file_analysis, 'content', '') or ''
            # Detect controller files
            if any(indicator in file_path for indicator in ['controller', 'api', 'endpoint', 'route']):
                controller_files.append(file_analysis)
            # Count endpoints (rough estimate based on common Spring annotations)
            endpoint_count = (file_content.count('@RequestMapping') + file_content.count('@GetMapping') +
                              file_content.count('@PostMapping') + file_content.count('@PutMapping') +
                              file_content.count('@DeleteMapping') + file_content.count('@RestController'))
            total_endpoints += endpoint_count
            # Check for security issues
            if 'password' in file_content.lower() and 'hardcoded' in file_content.lower():
                security_issues.append("Hardcoded passwords detected")
            if '@CrossOrigin(origins = "*")' in file_content:
                security_issues.append("Wildcard CORS policy detected")
            if 'migration' in file_path and 'public' in file_content:
                security_issues.append("Public migration endpoint detected")

        largest_controller = max(controller_files, key=lambda x: x.lines_of_code) if controller_files else None
        return {
            'controller_count': len(controller_files),
            'total_endpoints': total_endpoints,
            'largest_controller': f"{largest_controller.path} ({largest_controller.lines_of_code} lines)" if largest_controller else "None",
            'security_issues': "; ".join(security_issues) if security_issues else "No major security issues detected"
        }

    def _analyze_backend_patterns(self, analysis: RepositoryAnalysis) -> dict:
        """Analyze backend architectural patterns."""
        # Data layer analysis
        data_files = [fa for fa in analysis.file_analyses if any(indicator in fa.path.lower() for indicator in ['entity', 'model', 'dbcontext', 'migration', 'config'])]
        data_pattern = "Entity Framework" if any('dbcontext' in fa.path.lower() for fa in data_files) else "Custom ORM"
        config_files = len([fa for fa in data_files if 'config' in fa.path.lower()])
        config_lines = sum(fa.lines_of_code for fa in data_files if 'config' in fa.path.lower())

        # Service layer analysis
        service_files = [fa for fa in analysis.file_analyses if any(indicator in fa.path.lower() for indicator in ['service', 'business', 'logic', 'manager'])]
        service_pattern = "Service Layer Pattern" if service_files else "No clear service layer"
        largest_service = max(service_files, key=lambda x: x.lines_of_code) if service_files else None

        # Repository layer analysis
        repo_files = [fa for fa in analysis.file_analyses if any(indicator in fa.path.lower() for indicator in ['repository', 'dao', 'dataaccess'])]
        repo_pattern = "Repository Pattern" if repo_files else "Direct Data Access"
        factory_usage = any('factory' in fa.path.lower() for fa in repo_files)

        return {
            'data_layer': {
                'pattern': data_pattern,
                'config_files': config_files,
                'config_lines': config_lines,
                'issues': f"{len(data_files)} data files, {config_lines} configuration lines"
            },
            'service_layer': {
                'pattern': service_pattern,
                'service_files': len(service_files),
                'largest_service': f"{largest_service.path} ({largest_service.lines_of_code} lines)" if largest_service else "None",
                'issues': f"{len(service_files)} service files found"
            },
            'repository_layer': {
                'pattern': repo_pattern,
                'repository_files': len(repo_files),
                'factory_usage': "Factory pattern detected" if factory_usage else "No factory pattern",
                'issues': f"{len(repo_files)} repository files found"
            }
        }

    def _analyze_frontend_architecture(self, analysis: RepositoryAnalysis) -> dict:
        """Analyze frontend architectural patterns and issues."""
        # Identify frontend files by extension; a plain substring check such as
        # 'js' in path would also match '.json' files and 'tests' directories
        frontend_files = []
        for file_analysis in analysis.file_analyses:
            file_path = file_analysis.path.lower()
            if file_path.endswith(('.js', '.jsx', '.ts', '.tsx', '.vue', '.html', '.css', '.scss', '.sass')):
                frontend_files.append(file_analysis)

        # 6.1 Frontend Monolith Analysis
        largest_frontend_file = max(frontend_files, key=lambda x: x.lines_of_code) if frontend_files else None
        monolith_issue = (f"Largest frontend file contains {largest_frontend_file.lines_of_code:,} lines"
                          if largest_frontend_file else "No large frontend files detected")
        # Rough heuristic: ~1 second of load time per 1,000 lines
        load_time = (largest_frontend_file.lines_of_code / 1000) if largest_frontend_file else 0

        # Get the largest files
        largest_files = sorted(frontend_files, key=lambda x: x.lines_of_code, reverse=True)[:5]
        largest_files_info = [{'name': fa.path.split('/')[-1], 'lines': fa.lines_of_code} for fa in largest_files]

        # 6.2 Technology Stack Analysis
        react_version = "Unknown"
        node_version = "Unknown"
        vue_version = "Unknown"
        angular_version = "Unknown"
        dependencies = {}

        # package.json is not matched by the extension filter above, so scan all files
        for file_analysis in analysis.file_analyses:
            if 'package.json' not in file_analysis.path.lower():
                continue
            file_content = getattr(file_analysis, 'content', '') or ''
            # Extract framework and runtime versions from dependency declarations
            react_match = re.search(r'"react":\s*"([^"]+)"', file_content)
            if react_match:
                react_version = react_match.group(1)
            node_match = re.search(r'"node":\s*"([^"]+)"', file_content)
            if node_match:
                node_version = node_match.group(1)
            vue_match = re.search(r'"vue":\s*"([^"]+)"', file_content)
            if vue_match:
                vue_version = vue_match.group(1)
            angular_match = re.search(r'"@angular/core":\s*"([^"]+)"', file_content)
            if angular_match:
                angular_version = angular_match.group(1)
            # Count dependencies: each `"name": "version"` entry contributes
            # four double quotes, so divide the quote count by four
            deps_match = re.search(r'"dependencies":\s*\{([^}]+)\}', file_content)
            if deps_match:
                dependencies_content = deps_match.group(1)
                dependencies = {
                    'total': dependencies_content.count('"') // 4,
                    'react': react_version != "Unknown",
                    'vue': vue_version != "Unknown",
                    'angular': angular_version != "Unknown"
                }

        # Describe the technology stack and flag outdated versions
        if react_version != "Unknown":
            try:
                major_version = int(react_version.split('.')[0].replace('^', '').replace('~', ''))
                if major_version < 17:
                    tech_stack_issues = f"Using outdated React version {react_version} (consider upgrading to React 18+)"
                else:
                    tech_stack_issues = f"Using React {react_version}"
            except (ValueError, IndexError):
                tech_stack_issues = f"Using React {react_version}"
        elif vue_version != "Unknown":
            tech_stack_issues = f"Using Vue {vue_version}"
        elif angular_version != "Unknown":
            tech_stack_issues = f"Using Angular {angular_version}"
        else:
            tech_stack_issues = "Unknown frontend framework"

        # Security issues
        security_issues = "No major security issues detected"
        if frontend_files:
            security_vulnerable = sum(1 for fa in frontend_files
                                      if isinstance(fa.issues_found, (list, tuple)) and
                                      any('security' in str(issue).lower() or 'vulnerability' in str(issue).lower()
                                          for issue in fa.issues_found))
            if security_vulnerable > 0:
                security_issues = f"{security_vulnerable} files with potential security issues"

        # Dependency issues
        dependency_issues = "Dependency management appears normal"
        if dependencies.get('total', 0) > 100:
            dependency_issues = f"Large number of dependencies ({dependencies['total']}) - consider audit"
        elif dependencies.get('total', 0) == 0:
            dependency_issues = "No dependencies detected"

        tech_details = {
            'React Version': react_version,
            'Node Version': node_version,
            'Vue Version': vue_version,
            'Angular Version': angular_version,
            'Frontend Files': len(frontend_files),
            'Total Lines': sum(fa.lines_of_code for fa in frontend_files),
            'Dependencies': dependencies.get('total', 0)
        }

        # 6.3 Testing Analysis
        test_files = [fa for fa in frontend_files if any(indicator in fa.path.lower() for indicator in ['test', 'spec', '__tests__'])]
        empty_test_files = len([fa for fa in test_files if fa.lines_of_code == 0])
        testing_issues = (f"{empty_test_files} empty test file(s) detected" if empty_test_files > 0
                          else f"{len(test_files)} test file(s) found")
        testing_reality = f"{len(frontend_files)} frontend files with {len(test_files)} test files"
        test_coverage = (len(test_files) / len(frontend_files) * 100) if frontend_files else 0

        # 6.4 Performance Analysis (rough line-count heuristics, not measurements)
        total_frontend_lines = sum(fa.lines_of_code for fa in frontend_files)
        bundle_size = f"{total_frontend_lines * 0.5:.1f} MB"
        estimated_load_time = total_frontend_lines / 10000
        memory_usage = f"{total_frontend_lines * 0.001:.1f} MB"
        performance_score = max(0, 100 - (total_frontend_lines / 1000))  # Lower score for more lines

        return {
            'monolith_issue': monolith_issue,
            'load_time': f"{load_time:.1f}",
            'largest_files': largest_files_info,
            'tech_stack_issues': tech_stack_issues,
            'security_issues': security_issues,
            'dependency_issues': dependency_issues,
            'tech_details': tech_details,
            'testing_issues': testing_issues,
            'testing_reality': testing_reality,
            'test_file_count': len(test_files),
            'test_coverage': test_coverage,
            'empty_test_files': empty_test_files,
            'bundle_size': bundle_size,
            'estimated_load_time': f"{estimated_load_time:.1f}",
            'memory_usage': memory_usage,
            'performance_score': f"{performance_score:.0f}"
        }

    def _analyze_testing_infrastructure(self, analysis: RepositoryAnalysis) -> dict:
        """Analyze testing infrastructure across the entire codebase."""
        # Separate backend and frontend files by extension (substring checks
        # like 'js' in path would misclassify '.json' and 'tests' paths)
        frontend_extensions = ('.js', '.jsx', '.ts', '.tsx', '.vue', '.html', '.css', '.scss', '.sass')
        backend_files = []
        frontend_files = []
        for file_analysis in analysis.file_analyses:
            file_path = file_analysis.path.lower()
            if file_path.endswith(frontend_extensions):
                frontend_files.append(file_analysis)
            else:
                backend_files.append(file_analysis)

        # Backend testing analysis
        backend_test_files = [fa for fa in backend_files if any(indicator in fa.path.lower() for indicator in ['test', 'spec', '__tests__', 'testing'])]
        backend_test_count = len(backend_test_files)
        backend_file_count = len(backend_files)
        backend_coverage = (backend_test_count / backend_file_count * 100) if backend_file_count > 0 else 0

        # Frontend testing analysis
        frontend_test_files = [fa for fa in frontend_files if any(indicator in fa.path.lower() for indicator in ['test', 'spec', '__tests__', 'testing'])]
        frontend_test_count = len(frontend_test_files)
        frontend_file_count = len(frontend_files)
        frontend_coverage = (frontend_test_count / frontend_file_count * 100) if frontend_file_count > 0 else 0

        # Integration testing analysis
        integration_tests = len([fa for fa in analysis.file_analyses if any(indicator in fa.path.lower() for indicator in ['integration', 'e2e', 'end-to-end', 'api-test'])])
        api_tests = len([fa for fa in analysis.file_analyses if any(indicator in fa.path.lower() for indicator in ['api-test', 'api_test', 'apitest'])])
        database_tests = len([fa for fa in analysis.file_analyses if any(indicator in fa.path.lower() for indicator in ['db-test', 'database-test', 'db_test'])])
        e2e_tests = len([fa for fa in analysis.file_analyses if any(indicator in fa.path.lower() for indicator in ['e2e', 'end-to-end', 'cypress', 'playwright'])])

        # Security testing analysis
        security_tests = len([fa for fa in analysis.file_analyses if any(indicator in fa.path.lower() for indicator in ['security-test', 'security_test', 'penetration', 'vulnerability'])])
        vulnerability_scans = len([fa for fa in analysis.file_analyses if any(indicator in fa.path.lower() for indicator in ['vulnerability', 'security-scan', 'owasp'])])
        penetration_tests = len([fa for fa in analysis.file_analyses if any(indicator in fa.path.lower() for indicator in ['penetration', 'pentest', 'security-pen'])])
        auth_tests = len([fa for fa in analysis.file_analyses if any(indicator in fa.path.lower() for indicator in ['auth-test', 'authentication-test', 'login-test'])])

        # Performance testing analysis
        performance_tests = len([fa for fa in analysis.file_analyses if any(indicator in fa.path.lower() for indicator in ['performance-test', 'perf-test', 'load-test', 'stress-test'])])
        load_tests = len([fa for fa in analysis.file_analyses if any(indicator in fa.path.lower() for indicator in ['load-test', 'loadtest', 'jmeter', 'artillery'])])
        stress_tests = len([fa for fa in analysis.file_analyses if any(indicator in fa.path.lower() for indicator in ['stress-test', 'stresstest', 'chaos-test'])])
        benchmark_tests = len([fa for fa in analysis.file_analyses if any(indicator in fa.path.lower() for indicator in ['benchmark', 'bench', 'performance-bench'])])

        # Test quality assessment
        overall_coverage = (backend_coverage + frontend_coverage) / 2
        test_quality_score = min(100, overall_coverage * 2)  # Scale up the score

        # Critical issues
        critical_issues = []
        if backend_coverage < 10:
            critical_issues.append("Backend test coverage below 10%")
        if frontend_coverage < 5:
            critical_issues.append("Frontend test coverage below 5%")
        if integration_tests == 0:
            critical_issues.append("No integration tests found")
        if security_tests == 0:
            critical_issues.append("No security tests found")
        if performance_tests == 0:
            critical_issues.append("No performance tests found")

        # Recommendations
        recommendations = []
        if backend_coverage < 50:
            recommendations.append("Implement comprehensive backend unit tests")
        if frontend_coverage < 30:
            recommendations.append("Add frontend component and integration tests")
        if integration_tests == 0:
            recommendations.append("Create API integration tests")
        if security_tests == 0:
            recommendations.append("Implement security testing suite")
        if performance_tests == 0:
            recommendations.append("Add performance and load testing")

        # Backend test types
        backend_test_types = []
        if any('unit' in fa.path.lower() for fa in backend_test_files):
            backend_test_types.append("Unit Tests")
        if any('integration' in fa.path.lower() for fa in backend_test_files):
            backend_test_types.append("Integration Tests")
        if any('mock' in fa.path.lower() for fa in backend_test_files):
            backend_test_types.append("Mock Tests")

        # Frontend test types
        frontend_test_types = []
        if any('component' in fa.path.lower() for fa in frontend_test_files):
            frontend_test_types.append("Component Tests")
        if any('unit' in fa.path.lower() for fa in frontend_test_files):
            frontend_test_types.append("Unit Tests")
        if any('integration' in fa.path.lower() for fa in frontend_test_files):
            frontend_test_types.append("Integration Tests")

        # Backend test issues
        backend_test_issues = []
        empty_backend_tests = len([fa for fa in backend_test_files if fa.lines_of_code == 0])
        if empty_backend_tests > 0:
            backend_test_issues.append(f"{empty_backend_tests} empty test files")
        if backend_coverage < 20:
            backend_test_issues.append("Very low test coverage")

        # Frontend test issues
        frontend_test_issues = []
        empty_frontend_tests = len([fa for fa in frontend_test_files if fa.lines_of_code == 0])
        if empty_frontend_tests > 0:
            frontend_test_issues.append(f"{empty_frontend_tests} empty test files")
        if frontend_coverage < 10:
            frontend_test_issues.append("Very low test coverage")

        return {
            'backend_tests': f"{backend_test_count} test files for {backend_file_count} code files",
            'backend_files': backend_file_count,
            'backend_coverage': f"{backend_coverage:.1f}",
            'frontend_tests': f"{frontend_test_count} test files for {frontend_file_count} files",
            'frontend_files': frontend_file_count,
            'frontend_coverage': f"{frontend_coverage:.1f}",
            'integration_tests': f"{integration_tests}",
            'security_tests': f"{security_tests}",
            'performance_tests': f"{performance_tests}",
            'backend_test_files': backend_test_count,
            'backend_test_types': ", ".join(backend_test_types) if backend_test_types else "None detected",
            'backend_test_issues': "; ".join(backend_test_issues) if backend_test_issues else "No major issues",
            'frontend_test_files': frontend_test_count,
            'frontend_test_types': ", ".join(frontend_test_types) if frontend_test_types else "None detected",
            'frontend_test_issues': "; ".join(frontend_test_issues) if frontend_test_issues else "No major issues",
            'api_tests': f"{api_tests}",
            'database_tests': f"{database_tests}",
            'e2e_tests': f"{e2e_tests}",
            'vulnerability_scans': f"{vulnerability_scans}",
            'penetration_tests': f"{penetration_tests}",
            'auth_tests': f"{auth_tests}",
            'load_tests': f"{load_tests}",
            'stress_tests': f"{stress_tests}",
            'benchmark_tests': f"{benchmark_tests}",
            'overall_coverage': f"{overall_coverage:.1f}",
            'test_quality_score': f"{test_quality_score:.0f}",
            'critical_issues': "; ".join(critical_issues) if critical_issues else "No critical issues",
            'recommendations': "; ".join(recommendations) if recommendations else "Testing infrastructure is adequate"
        }

    def _extract_code_snippet(self, file_analysis) -> str:
        """Extract a code snippet from file analysis."""
        content = getattr(file_analysis, 'content', '') or ''
        if not content:
            return "// Code content not available"
        # Extract the first 20 lines as a snippet
        lines = content.split('\n')[:20]
        snippet = '\n'.join(lines)
        # Truncate if too long
        if len(snippet) > 500:
            snippet = snippet[:500] + "\n// ... (truncated)"
        return snippet


# Factory function for easy integration
def create_enhanced_analyzer(api_key: str, memory_config: Dict[str, Any]) -> EnhancedGitHubAnalyzerV2:
    """
    Factory function to create the enhanced analyzer.
    Drop-in replacement for the existing EnhancedGitHubAnalyzer.
    """
    return EnhancedGitHubAnalyzerV2(api_key, memory_config)


# Backward compatibility alias
EnhancedGitHubAnalyzer = EnhancedGitHubAnalyzerV2
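

# Minimal usage sketch (illustrative, not part of the service): wires together
# the factory, the feature flag, and the stats accessor defined above. The
# memory_config keys, environment variable names, and repository URL below are
# hypothetical placeholders; the real configuration schema comes from
# ai_analyze and the deployment environment.
if __name__ == "__main__":
    import os

    example_memory_config = {
        # Hypothetical example value; substitute the service's actual settings.
        "redis_url": os.environ.get("REDIS_URL", "redis://localhost:6379"),
    }
    analyzer = create_enhanced_analyzer(
        os.environ.get("ANALYZER_API_KEY", ""),  # hypothetical variable name
        example_memory_config,
    )
    # The enhanced pipeline is on by default; uncomment to exercise the
    # original (non-chunking) code path instead:
    # analyzer.enable_enhanced_processing(False)
    report = asyncio.run(
        analyzer.analyze_repository_with_memory_enhanced("https://github.com/org/repo")  # placeholder URL
    )
    print(report.executive_summary)
    print(analyzer.get_processing_stats())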