#!/usr/bin/env python3 """ AI Analysis Service HTTP Server Provides REST API endpoints for repository analysis. """ import os import asyncio import json import tempfile import shutil import time import hashlib from pathlib import Path from typing import Dict, Any, Optional, List from datetime import datetime from fastapi import FastAPI, HTTPException, BackgroundTasks from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse from pydantic import BaseModel import uvicorn import httpx import redis # Import the AI analysis components # Note: ai-analyze.py has a hyphen, so we need to handle the import specially import sys import importlib.util # Load the ai-analyze.py module spec = importlib.util.spec_from_file_location("ai_analyze", "/app/ai-analyze.py") ai_analyze_module = importlib.util.module_from_spec(spec) sys.modules["ai_analyze"] = ai_analyze_module spec.loader.exec_module(ai_analyze_module) # Now import the classes from ai_analyze import EnhancedGitHubAnalyzer, get_memory_config app = FastAPI( title="AI Analysis Service", description="AI-powered repository analysis with memory system", version="1.0.0" ) # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Global analyzer instance analyzer = None # Rate limiter for Claude API class ClaudeRateLimiter: def __init__(self, requests_per_minute: int = 90): self.requests_per_minute = requests_per_minute self.requests = [] self.lock = asyncio.Lock() async def wait_if_needed(self): """Wait if rate limit would be exceeded.""" async with self.lock: now = time.time() # Remove requests older than 1 minute self.requests = [req_time for req_time in self.requests if now - req_time < 60] if len(self.requests) >= self.requests_per_minute: sleep_time = 60 - (now - self.requests[0]) if sleep_time > 0: await asyncio.sleep(sleep_time) self.requests.append(now) # Git Integration Service Client class GitIntegrationClient: def __init__(self): self.base_url = os.getenv('GIT_INTEGRATION_SERVICE_URL', 'http://git-integration:8012') self.timeout = 30.0 async def get_repository_info(self, repository_id: str, user_id: str) -> Dict[str, Any]: """Get repository information from git-integration service.""" try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.base_url}/api/github/repository/{repository_id}/ui-view?view_type=tree", headers={'x-user-id': user_id} ) if response.status_code == 200: data = response.json() if data.get('success') and 'data' in data: repo_info = data['data'].get('repository_info', {}) return { 'id': repo_info.get('id'), 'name': repo_info.get('name'), 'owner': repo_info.get('owner'), 'provider': repo_info.get('provider', 'github'), 'local_path': repo_info.get('local_path'), 'repository_url': repo_info.get('repository_url') } else: raise Exception(f"Invalid response format: {data}") else: raise Exception(f"Failed to get repository info: {response.text}") except Exception as e: raise Exception(f"Git-integration service communication failed: {e}") # Analysis Cache class AnalysisCache: def __init__(self): try: self.redis = redis.Redis( host=os.getenv('REDIS_HOST', 'redis'), port=int(os.getenv('REDIS_PORT', 6379)), password=os.getenv('REDIS_PASSWORD', ''), decode_responses=True ) self.cache_ttl = 86400 # 24 hours except Exception as e: print(f"Warning: Redis connection failed: {e}") self.redis = None async def get_cached_analysis(self, file_hash: str) -> Optional[Dict[str, Any]]: """Get cached analysis result.""" if not self.redis: return None try: cache_key = f"analysis:{file_hash}" cached_data = self.redis.get(cache_key) return json.loads(cached_data) if cached_data else None except Exception: return None async def cache_analysis(self, file_hash: str, result: Dict[str, Any]): """Cache analysis result.""" if not self.redis: return try: cache_key = f"analysis:{file_hash}" self.redis.setex(cache_key, self.cache_ttl, json.dumps(result)) except Exception as e: print(f"Warning: Failed to cache analysis: {e}") # Content Optimizer class ContentOptimizer: @staticmethod def optimize_content_for_claude(content: str, max_tokens: int = 8000) -> str: """Optimize file content for Claude API limits.""" if len(content) > max_tokens * 4: # Rough token estimation # Extract important lines lines = content.split('\n') important_lines = [] for line in lines: # Keep imports, function definitions, class definitions if (line.strip().startswith(('import ', 'from ', 'def ', 'class ', 'export ', 'const ', 'let ', 'var ')) or line.strip().startswith(('function ', 'class ', 'interface ', 'type '))): important_lines.append(line) # Limit to 200 lines important_lines = important_lines[:200] optimized_content = '\n'.join(important_lines) optimized_content += f"\n\n... [Content truncated for analysis - {len(content)} chars total]" return optimized_content return content # Global instances rate_limiter = ClaudeRateLimiter() git_client = GitIntegrationClient() analysis_cache = AnalysisCache() content_optimizer = ContentOptimizer() class AnalysisRequest(BaseModel): repo_path: str output_format: str = "pdf" # pdf, json max_files: int = 50 class RepositoryAnalysisRequest(BaseModel): repository_id: str user_id: str output_format: str = "pdf" # pdf, json max_files: int = 100 class AnalysisResponse(BaseModel): success: bool message: str analysis_id: str = None report_path: str = None stats: Dict[str, Any] = None @app.on_event("startup") async def startup_event(): """Initialize the analyzer on startup.""" global analyzer try: # Load environment variables from dotenv import load_dotenv load_dotenv() # Get API key api_key = os.getenv('ANTHROPIC_API_KEY') if not api_key: raise Exception("ANTHROPIC_API_KEY not found in environment") # Initialize analyzer config = get_memory_config() analyzer = EnhancedGitHubAnalyzer(api_key, config) print("✅ AI Analysis Service initialized successfully") except Exception as e: print(f"❌ Failed to initialize AI Analysis Service: {e}") raise @app.get("/health") async def health_check(): """Health check endpoint.""" return { "status": "healthy", "service": "ai-analysis-service", "timestamp": datetime.now().isoformat(), "version": "1.0.0" } @app.post("/analyze", response_model=AnalysisResponse) async def analyze_repository(request: AnalysisRequest, background_tasks: BackgroundTasks): """Analyze a repository using direct file path.""" try: if not analyzer: raise HTTPException(status_code=500, detail="Analyzer not initialized") # Generate unique analysis ID analysis_id = f"analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}" # Create temporary directory for this analysis temp_dir = tempfile.mkdtemp(prefix=f"ai_analysis_{analysis_id}_") try: # Run analysis analysis = await analyzer.analyze_repository_with_memory( request.repo_path, max_files=request.max_files ) # Generate report if request.output_format == "pdf": report_path = f"/app/reports/{analysis_id}_analysis.pdf" analyzer.create_pdf_report(analysis, report_path) else: report_path = f"/app/reports/{analysis_id}_analysis.json" with open(report_path, 'w') as f: json.dump({ "repo_path": analysis.repo_path, "total_files": analysis.total_files, "total_lines": analysis.total_lines, "languages": analysis.languages, "code_quality_score": analysis.code_quality_score, "architecture_assessment": analysis.architecture_assessment, "security_assessment": analysis.security_assessment, "executive_summary": analysis.executive_summary, "file_analyses": [ { "path": fa.path, "language": fa.language, "lines_of_code": fa.lines_of_code, "severity_score": fa.severity_score, "issues_found": fa.issues_found, "recommendations": fa.recommendations } for fa in analysis.file_analyses ] }, f, indent=2) # Calculate stats stats = { "total_files": analysis.total_files, "total_lines": analysis.total_lines, "languages": analysis.languages, "code_quality_score": analysis.code_quality_score, "high_quality_files": len([fa for fa in analysis.file_analyses if fa.severity_score >= 8]), "medium_quality_files": len([fa for fa in analysis.file_analyses if 5 <= fa.severity_score < 8]), "low_quality_files": len([fa for fa in analysis.file_analyses if fa.severity_score < 5]), "total_issues": sum(len(fa.issues_found) for fa in analysis.file_analyses) } return AnalysisResponse( success=True, message="Analysis completed successfully", analysis_id=analysis_id, report_path=report_path, stats=stats ) finally: # Cleanup temporary directory if os.path.exists(temp_dir): shutil.rmtree(temp_dir) except Exception as e: return AnalysisResponse( success=False, message=f"Analysis failed: {str(e)}", analysis_id=None, report_path=None, stats=None ) @app.post("/analyze-repository", response_model=AnalysisResponse) async def analyze_repository_by_id(request: RepositoryAnalysisRequest, background_tasks: BackgroundTasks): """Analyze a repository by ID using git-integration service.""" try: if not analyzer: raise HTTPException(status_code=500, detail="Analyzer not initialized") # Get repository information from git-integration service try: repo_info = await git_client.get_repository_info(request.repository_id, request.user_id) local_path = repo_info.get('local_path') # Keep for compatibility but don't check file system # Note: We no longer check local_path existence since we use API approach except Exception as e: raise HTTPException( status_code=500, detail=f"Failed to get repository info: {str(e)}" ) # Generate unique analysis ID analysis_id = f"repo_analysis_{request.repository_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" # Create temporary directory for this analysis temp_dir = tempfile.mkdtemp(prefix=f"ai_analysis_{analysis_id}_") try: # Run analysis with rate limiting and caching analysis = await analyze_repository_with_optimizations( local_path, request.repository_id, request.user_id, request.max_files ) # Generate report if request.output_format == "pdf": report_path = f"/app/reports/{analysis_id}_analysis.pdf" analyzer.create_pdf_report(analysis, report_path) else: report_path = f"/app/reports/{analysis_id}_analysis.json" with open(report_path, 'w') as f: json.dump({ "repository_id": request.repository_id, "repo_path": analysis.repo_path, "total_files": analysis.total_files, "total_lines": analysis.total_lines, "languages": analysis.languages, "code_quality_score": analysis.code_quality_score, "architecture_assessment": analysis.architecture_assessment, "security_assessment": analysis.security_assessment, "executive_summary": analysis.executive_summary, "file_analyses": [ { "path": fa.path, "language": fa.language, "lines_of_code": fa.lines_of_code, "severity_score": fa.severity_score, "issues_found": fa.issues_found, "recommendations": fa.recommendations } for fa in analysis.file_analyses ] }, f, indent=2) # Calculate stats stats = { "repository_id": request.repository_id, "total_files": analysis.total_files, "total_lines": analysis.total_lines, "languages": analysis.languages, "code_quality_score": analysis.code_quality_score, "high_quality_files": len([fa for fa in analysis.file_analyses if fa.severity_score >= 8]), "medium_quality_files": len([fa for fa in analysis.file_analyses if 5 <= fa.severity_score < 8]), "low_quality_files": len([fa for fa in analysis.file_analyses if fa.severity_score < 5]), "total_issues": sum(len(fa.issues_found) for fa in analysis.file_analyses) } return AnalysisResponse( success=True, message="Repository analysis completed successfully", analysis_id=analysis_id, report_path=report_path, stats=stats ) finally: # Cleanup temporary directory if os.path.exists(temp_dir): shutil.rmtree(temp_dir) except HTTPException: raise except Exception as e: return AnalysisResponse( success=False, message=f"Repository analysis failed: {str(e)}" ) async def get_repository_files_from_api(repository_id: str, user_id: str, max_files: int = 100): """Get repository files from Git Integration Service API.""" try: print(f"🔍 [DEBUG] Getting repository files for {repository_id} with user {user_id}") # Get repository file tree from Git Integration Service async with httpx.AsyncClient(timeout=30.0) as client: print(f"🔍 [DEBUG] Making request to: {git_client.base_url}/api/github/repository/{repository_id}/ui-view?view_type=tree") response = await client.get( f"{git_client.base_url}/api/github/repository/{repository_id}/ui-view?view_type=tree", headers={'x-user-id': user_id} ) print(f"🔍 [DEBUG] Response status: {response.status_code}") if response.status_code != 200: raise Exception(f"Failed to get repository tree: {response.text}") data = response.json() print(f"🔍 [DEBUG] Response data keys: {list(data.keys())}") if not data.get('success'): raise Exception(f"Git Integration Service error: {data.get('message', 'Unknown error')}") # Extract files from the tree structure files_to_analyze = [] ui_data = data.get('data', {}).get('ui_data', {}) file_tree = ui_data.get('left_panel', {}).get('file_tree', {}) print(f"🔍 [DEBUG] File tree type: {type(file_tree)}, keys: {list(file_tree.keys()) if isinstance(file_tree, dict) else 'Not a dict'}") def extract_files_from_tree(tree_node, current_path=""): # Handle dictionary-based tree structure (not array) if isinstance(tree_node, dict): # If it's a file/directory node if 'type' in tree_node: if tree_node.get('type') == 'file': file_path = tree_node.get('path', '') if file_path: files_to_analyze.append((file_path, None)) print(f"🔍 [DEBUG] Found file: {file_path}") elif tree_node.get('type') == 'directory' and tree_node.get('children'): # Children is a dict, not an array children = tree_node.get('children', {}) if isinstance(children, dict): for child_name, child_node in children.items(): extract_files_from_tree(child_node, current_path) else: # Root level: iterate over all entries for name, node in tree_node.items(): extract_files_from_tree(node, current_path) extract_files_from_tree(file_tree) print(f"🔍 [DEBUG] Found {len(files_to_analyze)} files to analyze") # Limit files if needed if len(files_to_analyze) > max_files: files_to_analyze = files_to_analyze[:max_files] print(f"🔍 [DEBUG] Limited to {max_files} files") # Fetch file content for each file files_with_content = [] for i, (file_path, _) in enumerate(files_to_analyze): try: print(f"🔍 [DEBUG] Fetching content for file {i+1}/{len(files_to_analyze)}: {file_path}") # Get file content from Git Integration Service content_response = await client.get( f"{git_client.base_url}/api/github/repository/{repository_id}/file-content?file_path={file_path}", headers={'x-user-id': user_id} ) if content_response.status_code == 200: content_data = content_response.json() if content_data.get('success'): # Content is nested in data.content content = content_data.get('data', {}).get('content', '') files_with_content.append((file_path, content)) print(f"🔍 [DEBUG] Successfully got content for {file_path} ({len(content)} chars)") else: print(f"Warning: Failed to get content for {file_path}: {content_data.get('message')}") else: print(f"Warning: Failed to get content for {file_path}: HTTP {content_response.status_code}") except Exception as e: print(f"Warning: Error getting content for {file_path}: {e}") continue print(f"🔍 [DEBUG] Returning {len(files_with_content)} files with content") return files_with_content except Exception as e: print(f"Error getting repository files from API: {e}") import traceback traceback.print_exc() return [] async def analyze_repository_with_optimizations(repo_path: str, repository_id: str, user_id: str, max_files: int = 100): """Analyze repository with rate limiting, caching, and content optimization.""" from pathlib import Path try: # Get repository files from Git Integration Service API files_to_analyze = await get_repository_files_from_api(repository_id, user_id, max_files) if not files_to_analyze: raise Exception("No files found to analyze") print(f"Starting optimized analysis of {len(files_to_analyze)} files...") file_analyses = [] processed_files = 0 for i, (file_path, content) in enumerate(files_to_analyze): print(f"Analyzing file {i+1}/{len(files_to_analyze)}: {file_path}") # Generate file hash for caching file_hash = hashlib.sha256(content.encode()).hexdigest() # Check cache first cached_analysis = await analysis_cache.get_cached_analysis(file_hash) if cached_analysis: print(f"Using cached analysis for {file_path}") # Convert cached dictionary back to analysis object from ai_analyze import FileAnalysis cached_obj = FileAnalysis( path=Path(cached_analysis["path"]), language=cached_analysis["language"], lines_of_code=cached_analysis["lines_of_code"], complexity_score=cached_analysis["complexity_score"], issues_found=cached_analysis["issues_found"], recommendations=cached_analysis["recommendations"], detailed_analysis=cached_analysis["detailed_analysis"], severity_score=cached_analysis["severity_score"] ) file_analyses.append(cached_obj) processed_files += 1 continue # Rate limiting await rate_limiter.wait_if_needed() # Optimize content for Claude API optimized_content = content_optimizer.optimize_content_for_claude(content) # Analyze file with memory try: # Convert string file path to Path object file_path_obj = Path(file_path) analysis = await analyzer.analyze_file_with_memory( file_path_obj, optimized_content, repository_id ) # Cache the result analysis_dict = { "path": str(analysis.path), "language": analysis.language, "lines_of_code": analysis.lines_of_code, "complexity_score": analysis.complexity_score, "issues_found": analysis.issues_found, "recommendations": analysis.recommendations, "detailed_analysis": analysis.detailed_analysis, "severity_score": analysis.severity_score } await analysis_cache.cache_analysis(file_hash, analysis_dict) file_analyses.append(analysis) processed_files += 1 except Exception as e: print(f"Error analyzing {file_path}: {e}") # Continue with other files continue # Repository-level analysis print("Performing repository-level analysis...") # Use a temporary directory path since we don't have a local repo_path temp_repo_path = f"/tmp/repo_{repository_id}" if repo_path is None else repo_path # Create proper context_memories structure context_memories = { 'persistent_knowledge': [], 'similar_analyses': [] } architecture_assessment, security_assessment = await analyzer.analyze_repository_overview_with_memory( temp_repo_path, file_analyses, context_memories, repository_id ) # Create repository analysis result from ai_analyze import RepositoryAnalysis return RepositoryAnalysis( repo_path=str(temp_repo_path), total_files=len(files_to_analyze), total_lines=sum(fa.lines_of_code for fa in file_analyses), languages=list(set(fa.language for fa in file_analyses)), code_quality_score=sum(fa.severity_score for fa in file_analyses) / len(file_analyses) if file_analyses else 0, architecture_assessment=architecture_assessment, security_assessment=security_assessment, file_analyses=file_analyses, executive_summary=f"Analysis completed for {processed_files} files in repository {repository_id}" ) except Exception as e: print(f"Error in optimized analysis: {e}") raise @app.get("/repository/{repository_id}/info") async def get_repository_info(repository_id: str, user_id: str): """Get repository information from git-integration service.""" try: repo_info = await git_client.get_repository_info(repository_id, user_id) return { "success": True, "repository_info": repo_info } except Exception as e: raise HTTPException( status_code=500, detail=f"Failed to get repository info: {str(e)}" ) @app.get("/reports/{filename}") async def download_report(filename: str): """Download analysis report.""" report_path = f"/app/reports/{filename}" if not os.path.exists(report_path): raise HTTPException(status_code=404, detail="Report not found") return FileResponse( report_path, media_type='application/octet-stream', filename=filename ) @app.get("/memory/stats") async def get_memory_stats(): """Get memory system statistics.""" try: if not analyzer: raise HTTPException(status_code=500, detail="Analyzer not initialized") stats = await analyzer.memory_manager.get_memory_stats() return { "success": True, "memory_stats": stats } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to get memory stats: {str(e)}") @app.post("/memory/query") async def query_memory(query: str, repo_context: str = ""): """Query the memory system.""" try: if not analyzer: raise HTTPException(status_code=500, detail="Analyzer not initialized") result = await analyzer.query_memory(query, repo_context) return { "success": True, "query": query, "result": result } except Exception as e: raise HTTPException(status_code=500, detail=f"Memory query failed: {str(e)}") if __name__ == "__main__": port = int(os.getenv('PORT', 8022)) host = os.getenv('HOST', '0.0.0.0') print(f"🚀 Starting AI Analysis Service on {host}:{port}") uvicorn.run(app, host=host, port=port)