#!/usr/bin/env python3 """ Complete AI Repository Analysis Tool with Memory System Automatically analyzes ALL files in a repository without limits. Features: - Analyzes ALL files in the repository (no max-files limit) - No user query required - fully automated analysis - Memory-enhanced analysis with learning capabilities - Comprehensive PDF report generation - Security, architecture, and code quality assessment Usage: python ai-analyze.py /path/to/repo --output analysis.pdf Example: python ai-analyze.py ./my-project --output complete_analysis.pdf """ import os import asyncio import hashlib import json import uuid from pathlib import Path from typing import Dict, List, Optional, Tuple, Any from datetime import datetime, timedelta from dataclasses import dataclass, asdict, field from collections import defaultdict, Counter import logging import tempfile import shutil import re import concurrent.futures import threading from functools import lru_cache # Core packages import anthropic from dotenv import load_dotenv import git import redis import pymongo import psycopg2 from psycopg2.extras import RealDictCursor import numpy as np # PDF generation from reportlab.lib.pagesizes import A4 from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.lib.enums import TA_CENTER, TA_LEFT from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak, Table, TableStyle from reportlab.lib import colors from reportlab.graphics.shapes import Rect, String, Drawing from reportlab.graphics.charts.piecharts import Pie from reportlab.graphics.charts.barcharts import VerticalBarChart from reportlab.lib.units import inch # Enhanced dataclasses for memory system @dataclass class MemoryRecord: id: str timestamp: datetime memory_type: str # 'episodic', 'persistent', 'working' content: Dict[str, Any] embeddings: Optional[List[float]] = None metadata: Optional[Dict[str, Any]] = None expiry: Optional[datetime] = None @dataclass class CodeAnalysisMemory: repo_id: str file_path: str analysis_hash: str analysis_data: Dict[str, Any] embedding: List[float] last_updated: datetime access_count: int = 0 relevance_score: float = 1.0 @dataclass class EpisodicMemory: session_id: str user_query: str ai_response: str repo_context: str timestamp: datetime embedding: List[float] metadata: Dict[str, Any] @dataclass class PersistentMemory: fact_id: str content: str category: str # 'code_pattern', 'best_practice', 'vulnerability', 'architecture' confidence: float embedding: List[float] source_repos: List[str] created_at: datetime last_accessed: datetime access_frequency: int = 0 @dataclass class FileAnalysis: path: str language: str lines_of_code: int complexity_score: float issues_found: List[str] recommendations: List[str] detailed_analysis: str severity_score: float def __post_init__(self): """Ensure all fields contain safe types for JSON serialization.""" # Convert path to string if not isinstance(self.path, str): self.path = str(self.path) # Ensure issues_found is a list of strings if not isinstance(self.issues_found, list): if isinstance(self.issues_found, tuple): self.issues_found = [str(i) for i in self.issues_found] else: self.issues_found = [] else: self.issues_found = [str(i) if not isinstance(i, str) else i for i in self.issues_found] # Ensure recommendations is a list of strings if not isinstance(self.recommendations, list): if isinstance(self.recommendations, tuple): self.recommendations = [str(r) for r in self.recommendations] else: self.recommendations = [] else: self.recommendations = [str(r) if not isinstance(r, str) else r for r in self.recommendations] # Ensure detailed_analysis is a string if not isinstance(self.detailed_analysis, str): self.detailed_analysis = str(self.detailed_analysis) @dataclass class RepositoryAnalysis: repo_path: str total_files: int total_lines: int languages: Dict[str, int] architecture_assessment: str security_assessment: str code_quality_score: float file_analyses: List[FileAnalysis] executive_summary: str high_quality_files: List[str] = field(default_factory=list) class MemoryManager: """Advanced memory management system for AI repository analysis.""" def __init__(self, config: Dict[str, Any]): self.config = config self.setup_logging() # Initialize Claude client for embeddings self.claude_client = anthropic.Anthropic(api_key=config.get('anthropic_api_key', '')) # Initialize database connections self.setup_databases() # Memory configuration self.working_memory_ttl = 3600 # 1 hour self.episodic_retention_days = 365 # 1 year self.persistent_memory_threshold = 0.8 # Confidence threshold for persistence def setup_logging(self): logging.basicConfig(level=logging.INFO) self.logger = logging.getLogger(__name__) def setup_databases(self): """Initialize all database connections with enhanced error handling.""" try: # Redis for working memory (temporary, fast access) with localhost fallback redis_host = self.config.get('redis_host', 'localhost') redis_port = self.config.get('redis_port', 6380) # Use 6380 to avoid conflicts redis_password = self.config.get('redis_password', 'redis_secure_2024') self.redis_client = redis.Redis( host=redis_host, port=redis_port, password=redis_password, db=self.config.get('redis_db', 0), decode_responses=True, socket_connect_timeout=5, socket_timeout=5 ) self.redis_client.ping() self.logger.info(f"✅ Redis connected to {redis_host}:{redis_port}") except Exception as e: self.logger.warning(f"⚠️ Redis connection failed: {e}") self.redis_client = None try: # MongoDB for documents and episodic memory with localhost fallback mongo_url = self.config.get('mongodb_url', 'mongodb://pipeline_admin:mongo_secure_2024@localhost:27017/') self.mongo_client = pymongo.MongoClient(mongo_url, serverSelectionTimeoutMS=5000) self.mongo_client.admin.command('ping') self.mongo_db = self.mongo_client[self.config.get('mongodb_name', 'repo_analyzer')] # Collections self.episodic_collection = self.mongo_db['episodic_memories'] self.analysis_collection = self.mongo_db['code_analyses'] self.persistent_collection = self.mongo_db['persistent_memories'] self.repo_metadata_collection = self.mongo_db['repository_metadata'] self.logger.info("✅ MongoDB connected successfully") except Exception as e: self.logger.warning(f"⚠️ MongoDB connection failed: {e}") self.mongo_client = None self.mongo_db = None try: # PostgreSQL with localhost fallback self.pg_conn = psycopg2.connect( host=self.config.get('postgres_host', 'localhost'), port=self.config.get('postgres_port', 5432), database=self.config.get('postgres_db', 'dev_pipeline'), user=self.config.get('postgres_user', 'pipeline_admin'), password=self.config.get('postgres_password', 'secure_pipeline_2024'), connect_timeout=5 ) # Check if pgvector is available try: with self.pg_conn.cursor() as cur: cur.execute("SELECT 1 FROM pg_extension WHERE extname = 'vector';") self.has_vector = cur.fetchone() is not None except: self.has_vector = False self.logger.info("✅ PostgreSQL connected successfully") except Exception as e: self.logger.warning(f"⚠️ PostgreSQL connection failed: {e}") self.pg_conn = None self.has_vector = False def generate_embedding(self, text: str) -> List[float]: """Generate embedding for text using Claude API.""" try: # Use Claude to generate semantic embeddings # Truncate text if too long for Claude API if len(text) > 8000: text = text[:8000] + "..." prompt = f""" Convert the following text into a 384-dimensional numerical vector that represents its semantic meaning. The vector should be suitable for similarity search and clustering. Text: {text} Return only a JSON array of 384 floating-point numbers between -1 and 1, like this: [0.123, -0.456, 0.789, ...] """ message = self.claude_client.messages.create( model="claude-3-5-sonnet-20240620", max_tokens=2000, temperature=0.1, messages=[{"role": "user", "content": prompt}] ) response_text = message.content[0].text.strip() # Extract JSON array from response import json import re # Find JSON array in response json_match = re.search(r'\[[\d\.,\s-]+\]', response_text) if json_match: embedding = json.loads(json_match.group()) if len(embedding) == 384: return embedding # Fallback: generate deterministic embedding from text hash return self._generate_fallback_embedding(text) except Exception as e: self.logger.error(f"Claude embedding generation failed: {e}") return self._generate_fallback_embedding(text) def _generate_fallback_embedding(self, text: str) -> List[float]: """Generate fallback embedding using text hash.""" try: import hashlib import struct # Create a deterministic hash-based embedding hash_obj = hashlib.sha256(text.encode('utf-8')) hash_bytes = hash_obj.digest() # Convert to 384-dimensional vector embedding = [] for i in range(0, len(hash_bytes), 4): if len(embedding) >= 384: break chunk = hash_bytes[i:i+4] if len(chunk) == 4: # Convert 4 bytes to float and normalize value = struct.unpack('>I', chunk)[0] / (2**32 - 1) # Normalize to 0-1 embedding.append(value * 2 - 1) # Scale to -1 to 1 # Pad to exactly 384 dimensions while len(embedding) < 384: embedding.append(0.0) return embedding[:384] except Exception as e: self.logger.error(f"Fallback embedding generation failed: {e}") return [0.0] * 384 def calculate_content_hash(self, content: str) -> str: """Calculate SHA-256 hash of content for change detection.""" return hashlib.sha256(content.encode()).hexdigest() async def store_working_memory(self, key: str, data: Dict[str, Any], ttl: Optional[int] = None) -> bool: """Store temporary data in working memory (Redis).""" try: ttl = ttl or self.working_memory_ttl serialized_data = json.dumps(data, default=str) self.redis_client.setex(f"working:{key}", ttl, serialized_data) return True except Exception as e: self.logger.error(f"Working memory storage failed: {e}") return False async def get_working_memory(self, key: str) -> Optional[Dict[str, Any]]: """Retrieve data from working memory.""" try: data = self.redis_client.get(f"working:{key}") return json.loads(data) if data else None except Exception as e: self.logger.error(f"Working memory retrieval failed: {e}") return None async def store_episodic_memory(self, session_id: str, user_query: str, ai_response: str, repo_context: str, metadata: Optional[Dict] = None) -> str: """Store interaction in episodic memory.""" try: memory_id = str(uuid.uuid4()) # Generate embeddings query_embedding = self.generate_embedding(user_query) response_embedding = self.generate_embedding(ai_response) # Store in MongoDB episodic_record = { 'memory_id': memory_id, 'session_id': session_id, 'user_query': user_query, 'ai_response': ai_response, 'repo_context': repo_context, 'timestamp': datetime.utcnow(), 'metadata': metadata or {} } self.episodic_collection.insert_one(episodic_record) # Store embeddings in PostgreSQL for similarity search with self.pg_conn.cursor() as cur: cur.execute(""" INSERT INTO query_embeddings (session_id, query_text, query_embedding, response_embedding, repo_context, metadata) VALUES (%s, %s, %s, %s, %s, %s) """, ( session_id, user_query, query_embedding, response_embedding, repo_context, json.dumps(metadata or {}) )) self.pg_conn.commit() self.logger.info(f"Episodic memory stored: {memory_id}") return memory_id except Exception as e: self.logger.error(f"Episodic memory storage failed: {e}") return "" async def retrieve_episodic_memories(self, query: str, repo_context: str = "", limit: int = 10, similarity_threshold: float = 0.7) -> List[Dict]: """Retrieve relevant episodic memories based on query similarity.""" try: query_embedding = self.generate_embedding(query) with self.pg_conn.cursor(cursor_factory=RealDictCursor) as cur: # Find similar queries using cosine similarity cur.execute(""" SELECT session_id, query_text, repo_context, timestamp, metadata, 1 - (query_embedding <=> %s::vector) as similarity FROM query_embeddings WHERE (%s = '' OR repo_context = %s) AND 1 - (query_embedding <=> %s::vector) > %s ORDER BY similarity DESC LIMIT %s """, (query_embedding, repo_context, repo_context, query_embedding, similarity_threshold, limit)) similar_queries = cur.fetchall() # Fetch full episodic records from MongoDB memories = [] for query_record in similar_queries: episodic_record = self.episodic_collection.find_one({ 'session_id': query_record['session_id'], 'timestamp': query_record['timestamp'] }) if episodic_record: episodic_record['similarity_score'] = float(query_record['similarity']) memories.append(episodic_record) return memories except Exception as e: self.logger.error(f"Episodic memory retrieval failed: {e}") return [] async def store_persistent_memory(self, content: str, category: str, confidence: float, source_repos: List[str]) -> str: """Store long-term knowledge in persistent memory.""" try: fact_id = str(uuid.uuid4()) embedding = self.generate_embedding(content) # Store in MongoDB persistent_record = { 'fact_id': fact_id, 'content': content, 'category': category, 'confidence': confidence, 'source_repos': source_repos, 'created_at': datetime.utcnow(), 'last_accessed': datetime.utcnow(), 'access_frequency': 1 } self.persistent_collection.insert_one(persistent_record) # Store embedding in PostgreSQL with self.pg_conn.cursor() as cur: if self.has_vector: cur.execute(""" INSERT INTO knowledge_embeddings (fact_id, content, category, embedding, confidence, source_repos) VALUES (%s, %s, %s, %s, %s, %s) """, (fact_id, content, category, embedding, confidence, source_repos)) else: cur.execute(""" INSERT INTO knowledge_embeddings (fact_id, content, category, confidence, source_repos) VALUES (%s, %s, %s, %s, %s) """, (fact_id, content, category, confidence, source_repos)) self.pg_conn.commit() self.logger.info(f"Persistent memory stored: {fact_id}") return fact_id except Exception as e: self.logger.error(f"Persistent memory storage failed: {e}") return "" async def retrieve_persistent_memories(self, query: str, category: str = "", limit: int = 20, similarity_threshold: float = 0.6) -> List[Dict]: """Retrieve relevant persistent knowledge.""" try: query_embedding = self.generate_embedding(query) with self.pg_conn.cursor(cursor_factory=RealDictCursor) as cur: # Check if table exists first cur.execute(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_name = 'knowledge_embeddings' ); """) table_exists = cur.fetchone()[0] if not table_exists: self.logger.warning("knowledge_embeddings table does not exist, returning empty results") return [] # Build WHERE clause dynamically if hasattr(self, 'has_vector') and self.has_vector: where_conditions = ["1 - (embedding <=> %s::vector) > %s"] params = [query_embedding, similarity_threshold] else: # Fallback to text-based search where_conditions = ["content ILIKE %s"] params = [f"%{query}%"] if category: where_conditions.append("category = %s") params.append(category) where_clause = " AND ".join(where_conditions) params.extend([limit]) if hasattr(self, 'has_vector') and self.has_vector: cur.execute(f""" SELECT fact_id, content, category, confidence, source_repos, 1 - (embedding <=> %s::vector) as similarity, created_at, last_accessed, access_frequency FROM knowledge_embeddings WHERE {where_clause} ORDER BY similarity DESC, confidence DESC, access_frequency DESC LIMIT %s """, params) else: cur.execute(f""" SELECT fact_id, content, category, confidence, source_repos, 0.8 as similarity, created_at, last_accessed, access_frequency FROM knowledge_embeddings WHERE {where_clause} ORDER BY confidence DESC, access_frequency DESC LIMIT %s """, params) results = cur.fetchall() # Update access frequency for result in results: cur.execute(""" UPDATE knowledge_embeddings SET last_accessed = CURRENT_TIMESTAMP, access_frequency = access_frequency + 1 WHERE fact_id = %s """, (result['fact_id'],)) self.pg_conn.commit() return [dict(result) for result in results] except Exception as e: self.logger.error(f"Persistent memory retrieval failed: {e}") return [] async def store_code_analysis(self, repo_id: str, file_path: str, analysis_data: Dict[str, Any]) -> str: """Store code analysis with embeddings for future retrieval.""" try: content_hash = self.calculate_content_hash(json.dumps(analysis_data, sort_keys=True)) # Create searchable content for embedding searchable_content = f""" File: {file_path} Language: {analysis_data.get('language', 'Unknown')} Issues: {' '.join(analysis_data.get('issues_found', []))} Recommendations: {' '.join(analysis_data.get('recommendations', []))} Analysis: {analysis_data.get('detailed_analysis', '')} """ embedding = self.generate_embedding(searchable_content) # Store in MongoDB analysis_record = { 'repo_id': repo_id, 'file_path': file_path, 'content_hash': content_hash, 'analysis_data': analysis_data, 'created_at': datetime.utcnow(), 'last_accessed': datetime.utcnow(), 'access_count': 1 } # Upsert to handle updates self.analysis_collection.update_one( {'repo_id': repo_id, 'file_path': file_path}, {'$set': analysis_record}, upsert=True ) # Store embedding in PostgreSQL with self.pg_conn.cursor() as cur: if self.has_vector: cur.execute(""" INSERT INTO code_embeddings (repo_id, file_path, content_hash, embedding, metadata) VALUES (%s, %s, %s, %s, %s) ON CONFLICT (repo_id, file_path, content_hash) DO UPDATE SET last_accessed = CURRENT_TIMESTAMP """, ( repo_id, file_path, content_hash, embedding, json.dumps({ 'language': analysis_data.get('language'), 'lines_of_code': analysis_data.get('lines_of_code', 0), 'severity_score': analysis_data.get('severity_score', 5.0) }) )) else: cur.execute(""" INSERT INTO code_embeddings (repo_id, file_path, content_hash, embedding_text, metadata) VALUES (%s, %s, %s, %s, %s) ON CONFLICT (repo_id, file_path, content_hash) DO UPDATE SET last_accessed = CURRENT_TIMESTAMP """, ( repo_id, file_path, content_hash, json.dumps(embedding), json.dumps({ 'language': analysis_data.get('language'), 'lines_of_code': analysis_data.get('lines_of_code', 0), 'severity_score': analysis_data.get('severity_score', 5.0) }) )) self.pg_conn.commit() return content_hash except Exception as e: self.logger.error(f"Code analysis storage failed: {e}") return "" async def search_similar_code(self, query: str, repo_id: str = "", limit: int = 10) -> List[Dict]: """Search for similar code analyses.""" try: query_embedding = self.generate_embedding(query) with self.pg_conn.cursor(cursor_factory=RealDictCursor) as cur: # Check if table exists first cur.execute(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_name = 'code_embeddings' ); """) table_exists = cur.fetchone()[0] if not table_exists: self.logger.warning("code_embeddings table does not exist, returning empty results") return [] where_clause = "WHERE 1=1" params = [query_embedding] if repo_id: where_clause += " AND repo_id = %s" params.append(repo_id) params.append(limit) cur.execute(f""" SELECT repo_id, file_path, content_hash, metadata, 1 - (embedding <=> %s::vector) as similarity FROM code_embeddings {where_clause} ORDER BY similarity DESC LIMIT %s """, params) results = cur.fetchall() # Fetch full analysis data from MongoDB enriched_results = [] for result in results: analysis = self.analysis_collection.find_one({ 'repo_id': result['repo_id'], 'file_path': result['file_path'] }) if analysis: analysis['similarity_score'] = float(result['similarity']) enriched_results.append(analysis) return enriched_results except Exception as e: self.logger.error(f"Similar code search failed: {e}") return [] async def cleanup_old_memories(self): """Clean up old episodic memories and update access patterns.""" try: cutoff_date = datetime.utcnow() - timedelta(days=self.episodic_retention_days) # Clean up old episodic memories result = self.episodic_collection.delete_many({ 'timestamp': {'$lt': cutoff_date} }) self.logger.info(f"Cleaned up {result.deleted_count} old episodic memories") # Clean up corresponding query embeddings with self.pg_conn.cursor() as cur: cur.execute("DELETE FROM query_embeddings WHERE timestamp < %s", (cutoff_date,)) self.pg_conn.commit() # Update persistent memory relevance based on access patterns await self.update_persistent_memory_relevance() except Exception as e: self.logger.error(f"Memory cleanup failed: {e}") async def update_persistent_memory_relevance(self): """Update relevance scores for persistent memories based on access patterns.""" try: with self.pg_conn.cursor() as cur: # Calculate relevance based on recency and frequency cur.execute(""" UPDATE knowledge_embeddings SET confidence = LEAST(confidence * ( CASE WHEN EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - last_accessed)) / 86400 < 30 THEN 1.1 ELSE 0.95 END * (1.0 + LOG(access_frequency + 1) / 10.0) ), 1.0) """) self.pg_conn.commit() except Exception as e: self.logger.error(f"Relevance update failed: {e}") async def get_memory_stats(self) -> Dict[str, Any]: """Get comprehensive memory system statistics.""" try: stats = {} # Working memory stats (Redis) working_keys = self.redis_client.keys("working:*") stats['working_memory'] = { 'total_keys': len(working_keys), 'memory_usage': self.redis_client.info()['used_memory_human'] } # Episodic memory stats (MongoDB) stats['episodic_memory'] = { 'total_records': self.episodic_collection.count_documents({}), 'recent_interactions': self.episodic_collection.count_documents({ 'timestamp': {'$gte': datetime.utcnow() - timedelta(days=7)} }) } # Persistent memory stats stats['persistent_memory'] = { 'total_facts': self.persistent_collection.count_documents({}), 'high_confidence_facts': self.persistent_collection.count_documents({ 'confidence': {'$gte': 0.8} }) } # Code analysis stats stats['code_analysis'] = { 'total_analyses': self.analysis_collection.count_documents({}), 'unique_repositories': len(self.analysis_collection.distinct('repo_id')) } # Vector database stats (PostgreSQL) with self.pg_conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute("SELECT COUNT(*) as count FROM code_embeddings") code_embeddings_count = cur.fetchone()['count'] cur.execute("SELECT COUNT(*) as count FROM knowledge_embeddings") knowledge_embeddings_count = cur.fetchone()['count'] stats['vector_database'] = { 'code_embeddings': code_embeddings_count, 'knowledge_embeddings': knowledge_embeddings_count } return stats except Exception as e: self.logger.error(f"Stats retrieval failed: {e}") return {} class MemoryQueryEngine: """Advanced querying capabilities across memory systems.""" def __init__(self, memory_manager: MemoryManager): self.memory = memory_manager async def intelligent_query(self, query: str, repo_context: str = "") -> Dict[str, Any]: """Intelligent cross-memory querying with relevance scoring.""" try: # Multi-source memory retrieval results = await asyncio.gather( self.memory.retrieve_episodic_memories(query, repo_context, limit=5), self.memory.retrieve_persistent_memories(query, limit=10), self.memory.search_similar_code(query, repo_context, limit=5) ) episodic_memories, persistent_knowledge, similar_code = results # Relevance scoring and fusion fused_response = self.fuse_memory_responses( query, episodic_memories, persistent_knowledge, similar_code ) return { 'query': query, 'fused_response': fused_response, 'sources': { 'episodic_count': len(episodic_memories), 'persistent_count': len(persistent_knowledge), 'similar_code_count': len(similar_code) }, 'confidence_score': self.calculate_response_confidence(fused_response), 'timestamp': datetime.utcnow() } except Exception as e: self.memory.logger.error(f"Intelligent query failed: {e}") return {'error': str(e)} def fuse_memory_responses(self, query: str, episodic: List, persistent: List, code: List) -> str: """Fuse responses from different memory systems.""" response_parts = [] # Weight different memory types if persistent: high_conf_knowledge = [p for p in persistent if p.get('confidence', 0) > 0.8] if high_conf_knowledge: response_parts.append("Based on established knowledge:") for knowledge in high_conf_knowledge[:3]: response_parts.append(f"• {knowledge['content']}") if episodic: recent_interactions = sorted(episodic, key=lambda x: x.get('timestamp', datetime.min), reverse=True)[:2] if recent_interactions: response_parts.append("\nFrom previous interactions:") for interaction in recent_interactions: response_parts.append(f"• {interaction.get('ai_response', '')[:200]}...") if code: similar_patterns = [c for c in code if c.get('similarity_score', 0) > 0.7] if similar_patterns: response_parts.append("\nSimilar code patterns found:") for pattern in similar_patterns[:2]: issues = pattern.get('analysis_data', {}).get('issues_found', []) if issues: response_parts.append(f"• {pattern['file_path']}: {issues[0]}") return '\n'.join(response_parts) if response_parts else "No relevant memories found." def calculate_response_confidence(self, response: str) -> float: """Calculate confidence score for fused response.""" if not response or response == "No relevant memories found.": return 0.0 # Simple confidence calculation based on response length and structure confidence = min(len(response.split()) / 100.0, 1.0) # Normalize by word count if "Based on established knowledge:" in response: confidence += 0.2 if "From previous interactions:" in response: confidence += 0.1 if "Similar code patterns found:" in response: confidence += 0.15 return min(confidence, 1.0) class EnhancedGitHubAnalyzer: """Enhanced repository analyzer with memory capabilities and parallel processing.""" def __init__(self, api_key: str, memory_config: Dict[str, Any]): self.client = anthropic.Anthropic(api_key=api_key) self.memory_manager = MemoryManager(memory_config) self.query_engine = MemoryQueryEngine(self.memory_manager) self.session_id = str(uuid.uuid4()) self.temp_dir = None # Performance optimization settings self.max_workers = memory_config.get('max_workers', 10) # Parallel processing self.batch_size = memory_config.get('batch_size', 10) # OPTIMIZED: Batch processing (REDUCED from 20 to 10) self.cache_ttl = memory_config.get('cache_ttl', 3600) # Cache TTL self.max_file_size = memory_config.get('max_file_size', 0) # No file size limit (0 = unlimited) # Language mapping for file detection self.language_map = { '.py': 'Python', '.js': 'JavaScript', '.ts': 'TypeScript', '.tsx': 'TypeScript', '.jsx': 'JavaScript', '.java': 'Java', '.cpp': 'C++', '.c': 'C', '.cs': 'C#', '.go': 'Go', '.rs': 'Rust', '.php': 'PHP', '.rb': 'Ruby', '.swift': 'Swift', '.kt': 'Kotlin', '.html': 'HTML', '.css': 'CSS', '.scss': 'SCSS', '.sass': 'SASS', '.sql': 'SQL', '.yaml': 'YAML', '.yml': 'YAML', '.json': 'JSON', '.xml': 'XML', '.sh': 'Shell', '.dockerfile': 'Docker', '.md': 'Markdown', '.txt': 'Text' } # Code file extensions to analyze self.code_extensions = set(self.language_map.keys()) async def analyze_files_parallel(self, files_to_analyze: List[Tuple[Path, str]], repo_id: str) -> List[FileAnalysis]: """Analyze files in parallel batches for better performance.""" file_analyses = [] # Process files in batches for i in range(0, len(files_to_analyze), self.batch_size): batch = files_to_analyze[i:i + self.batch_size] print(f"Processing batch {i//self.batch_size + 1}/{(len(files_to_analyze) + self.batch_size - 1)//self.batch_size} ({len(batch)} files)") # Create tasks for parallel execution tasks = [] for file_path, content in batch: # Process all files regardless of size (no file size limit) task = self.analyze_file_with_memory(file_path, content, repo_id) tasks.append(task) # Execute batch in parallel if tasks: batch_results = await asyncio.gather(*tasks, return_exceptions=True) # Process results for j, result in enumerate(batch_results): if isinstance(result, Exception): print(f"Error analyzing file {batch[j][0].name}: {result}") # Create a basic analysis for failed files failed_analysis = FileAnalysis( path=str(batch[j][0]), language=self.detect_language(batch[j][0]), lines_of_code=len(batch[j][1].splitlines()), severity_score=5.0, issues_found=[f"Analysis failed: {str(result)}"], recommendations=["Review this file manually"] ) file_analyses.append(failed_analysis) else: file_analyses.append(result) # Small delay between batches to avoid overwhelming the API await asyncio.sleep(0.5) return file_analyses def clone_repository(self, repo_path: str) -> str: """Clone repository or use existing path.""" if os.path.exists(repo_path): print(f"Using existing repository: {repo_path}") return repo_path else: print(f"Cloning repository: {repo_path}") self.temp_dir = tempfile.mkdtemp(prefix="repo_analysis_") try: git.Repo.clone_from(repo_path, self.temp_dir) return self.temp_dir except Exception as e: raise Exception(f"Failed to clone repository: {e}") def calculate_repo_id(self, repo_path: str) -> str: """Generate consistent repository ID.""" return hashlib.sha256(repo_path.encode()).hexdigest()[:16] def get_file_language(self, file_path: Path) -> str: """Get programming language from file extension.""" return self.language_map.get(file_path.suffix.lower(), 'Unknown') def calculate_complexity_score(self, content: str) -> float: """Calculate basic complexity score based on code patterns.""" lines = content.split('\n') complexity_indicators = ['if', 'else', 'elif', 'for', 'while', 'try', 'except', 'catch', 'switch'] complexity = 1 for line in lines: line_lower = line.lower().strip() for indicator in complexity_indicators: if indicator in line_lower: complexity += 1 # Normalize to 1-10 scale return min(complexity / max(len(lines), 1) * 100, 10.0) async def analyze_file_with_memory(self, file_path: Path, content: str, repo_id: str) -> FileAnalysis: """Analyze file with memory-enhanced context.""" language = self.get_file_language(file_path) lines_of_code = len([line for line in content.split('\n') if line.strip()]) complexity_score = self.calculate_complexity_score(content) # Skip memory operations for faster analysis similar_analyses = [] persistent_knowledge = [] # Build enhanced context for analysis context_info = "" if similar_analyses: context_info += f"\nSimilar files previously analyzed:\n" for similar in similar_analyses[:2]: context_info += f"- {similar['file_path']}: Found {len(similar.get('analysis_data', {}).get('issues_found', []))} issues\n" if persistent_knowledge: context_info += f"\nRelevant best practices:\n" for knowledge in persistent_knowledge[:3]: context_info += f"- {knowledge['content'][:100]}...\n" # Truncate content if too long if len(content) > 4000: content = content[:4000] + "\n... [truncated for analysis]" print(f" Analyzing {file_path.name} ({language}, {lines_of_code} lines)") # Create comprehensive analysis prompt with memory context prompt = f""" You are a senior software engineer with 25+ years of experience. Analyze this {language} code file with context from previous analyses. FILENAME: {file_path.name} LANGUAGE: {language} LINES OF CODE: {lines_of_code} {context_info} CODE: ```{language.lower()} {content} ``` Provide a comprehensive analysis covering: 1. ISSUES FOUND: List specific problems, bugs, security vulnerabilities, or code smells 2. RECOMMENDATIONS: Actionable suggestions for improvement 3. CODE QUALITY: Overall assessment of code quality and maintainability 4. SECURITY: Any security concerns or vulnerabilities 5. PERFORMANCE: Potential performance issues or optimizations 6. BEST PRACTICES: Adherence to coding standards and best practices Rate the overall code quality from 1-10 where 10 is excellent. ANALYSIS: """ try: message = self.client.messages.create( model="claude-3-5-sonnet-20240620", max_tokens=3000, temperature=0.1, messages=[{"role": "user", "content": prompt}] ) analysis_text = message.content[0].text.strip() # Extract severity score from analysis severity_match = re.search(r'(\d+(?:\.\d+)?)/10', analysis_text) severity_score = float(severity_match.group(1)) if severity_match else 5.0 # Parse issues and recommendations from the text issues = self.extract_issues_from_analysis(analysis_text) recommendations = self.extract_recommendations_from_analysis(analysis_text) # Create file analysis object file_analysis = FileAnalysis( path=str(file_path.relative_to(Path(self.temp_dir or '.'))), language=language, lines_of_code=lines_of_code, complexity_score=complexity_score, issues_found=issues, recommendations=recommendations, detailed_analysis=analysis_text, severity_score=severity_score ) # Skip memory operations for faster analysis # await self.memory_manager.store_code_analysis( # repo_id, str(file_analysis.path), asdict(file_analysis) # ) # await self.extract_knowledge_from_analysis(file_analysis, repo_id) return file_analysis except Exception as e: print(f" Error analyzing {file_path.name}: {e}") return FileAnalysis( path=str(file_path), language=language, lines_of_code=lines_of_code, complexity_score=complexity_score, issues_found=[f"Analysis failed: {str(e)}"], recommendations=["Review file manually due to analysis error"], detailed_analysis=f"Analysis failed due to error: {str(e)}", severity_score=5.0 ) async def analyze_files_batch(self, combined_prompt: str) -> str: """Analyze multiple files in a single API call for smart batching.""" try: print(f"🚀 [BATCH API] Making single API call for multiple files") # Make single API call to Claude message = self.client.messages.create( model="claude-3-5-sonnet-20240620", max_tokens=4000, # Increased for multi-file response temperature=0.1, messages=[{"role": "user", "content": combined_prompt}] ) response_text = message.content[0].text.strip() print(f"✅ [BATCH API] Received response for multiple files") return response_text except Exception as e: print(f"❌ [BATCH API] Error in batch analysis: {e}") raise e def extract_issues_from_analysis(self, analysis_text: str) -> List[str]: """Extract issues from analysis text.""" issues = [] lines = analysis_text.split('\n') # Look for common issue indicators issue_keywords = ['issue', 'problem', 'bug', 'vulnerability', 'error', 'warning', 'concern'] for line in lines: line_lower = line.lower().strip() if any(keyword in line_lower for keyword in issue_keywords): if line.strip() and not line.strip().startswith('#'): issues.append(line.strip()) return issues[:10] # Limit to top 10 issues def extract_recommendations_from_analysis(self, analysis_text: str) -> List[str]: """Extract recommendations from analysis text.""" recommendations = [] lines = analysis_text.split('\n') # Look for recommendation indicators rec_keywords = ['recommend', 'suggest', 'should', 'consider', 'improve'] for line in lines: line_lower = line.lower().strip() if any(keyword in line_lower for keyword in rec_keywords): if line.strip() and not line.strip().startswith('#'): recommendations.append(line.strip()) return recommendations[:10] # Limit to top 10 recommendations async def extract_knowledge_from_analysis(self, file_analysis: FileAnalysis, repo_id: str): """Extract valuable knowledge from analysis for persistent storage.""" try: # Extract security-related knowledge security_issues = [] if isinstance(file_analysis.issues_found, (list, tuple)): security_issues = [issue for issue in file_analysis.issues_found if any(sec in issue.lower() for sec in ['security', 'vulnerability', 'injection', 'xss', 'auth'])] for issue in security_issues: await self.memory_manager.store_persistent_memory( content=f"Security issue in {file_analysis.language}: {issue}", category='security_vulnerability', confidence=0.8, source_repos=[repo_id] ) # Extract best practices best_practices = [] if isinstance(file_analysis.recommendations, (list, tuple)): best_practices = [rec for rec in file_analysis.recommendations if any(bp in rec.lower() for bp in ['best practice', 'standard', 'convention'])] for practice in best_practices: await self.memory_manager.store_persistent_memory( content=f"{file_analysis.language} best practice: {practice}", category='best_practice', confidence=0.7, source_repos=[repo_id] ) # Extract code patterns if file_analysis.severity_score < 5: await self.memory_manager.store_persistent_memory( content=f"Low quality {file_analysis.language} pattern: {file_analysis.detailed_analysis[:200]}", category='code_pattern', confidence=0.6, source_repos=[repo_id] ) except Exception as e: self.memory_manager.logger.error(f"Knowledge extraction failed: {e}") def scan_repository(self, repo_path: str) -> List[Tuple[Path, str]]: """Scan repository and collect ALL files for analysis.""" print(f"Scanning repository: {repo_path}") files_to_analyze = [] # Important files to always include important_files = { 'README.md', 'package.json', 'requirements.txt', 'Dockerfile', 'docker-compose.yml', 'tsconfig.json', 'next.config.js', 'tailwind.config.js', 'webpack.config.js', '.env.example', 'Cargo.toml', 'pom.xml', 'build.gradle', 'composer.json', 'Gemfile', 'go.mod', 'yarn.lock', 'pnpm-lock.yaml' } for root, dirs, files in os.walk(repo_path): # Skip common build/cache directories dirs[:] = [d for d in dirs if not d.startswith('.') and d not in {'node_modules', '__pycache__', 'build', 'dist', 'target', 'venv', 'env', '.git', '.next', 'coverage', 'vendor', 'bower_components', '.gradle', '.m2', '.cargo'}] for file in files: file_path = Path(root) / file # Skip large files (increased limit for comprehensive analysis) try: if file_path.stat().st_size > 2000000: # 2MB limit print(f" Skipping large file: {file_path.name} ({file_path.stat().st_size / 1024 / 1024:.1f}MB)") continue except: continue # Include important files or files with code extensions should_include = ( file.lower() in important_files or file_path.suffix.lower() in self.code_extensions or file.lower().startswith('dockerfile') or file.lower().startswith('makefile') or file.lower().startswith('cmake') ) if should_include: try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() if content.strip(): # Only non-empty files files_to_analyze.append((file_path, content)) except Exception as e: print(f"Could not read {file_path}: {e}") print(f"Found {len(files_to_analyze)} files to analyze") return files_to_analyze async def analyze_repository_with_memory(self, repo_path: str) -> RepositoryAnalysis: """Main analysis function with memory integration - analyzes ALL files.""" try: # Generate repo ID and check for cached analysis repo_id = self.calculate_repo_id(repo_path) # Check working memory for recent analysis cached_analysis = await self.memory_manager.get_working_memory(f"repo_analysis:{repo_id}") if cached_analysis: print("Using cached repository analysis from memory") return RepositoryAnalysis(**cached_analysis) # Clone/access repository actual_repo_path = self.clone_repository(repo_path) # Get analysis context from memory (no user query needed) context_memories = await self.get_analysis_context(repo_path, "", repo_id) # Scan ALL files files_to_analyze = self.scan_repository(actual_repo_path) if not files_to_analyze: raise Exception("No files found to analyze") # Analyze files with parallel processing for better performance print(f"Starting comprehensive analysis of {len(files_to_analyze)} files with parallel processing...") file_analyses = await self.analyze_files_parallel(files_to_analyze, repo_id) # Repository-level analyses with memory context print("Performing repository-level analysis with memory context...") architecture_assessment, security_assessment = await self.analyze_repository_overview_with_memory( actual_repo_path, file_analyses, context_memories, repo_id ) # Calculate overall quality score safely if file_analyses and len(file_analyses) > 0: valid_scores = [fa.severity_score for fa in file_analyses if fa.severity_score is not None] avg_quality = sum(valid_scores) / len(valid_scores) if valid_scores else 5.0 else: avg_quality = 5.0 # Generate statistics languages = dict(Counter(fa.language for fa in file_analyses)) total_lines = sum(fa.lines_of_code for fa in file_analyses) # Create repository analysis repo_analysis = RepositoryAnalysis( repo_path=repo_path, total_files=len(file_analyses), total_lines=total_lines, languages=languages, architecture_assessment=architecture_assessment, security_assessment=security_assessment, code_quality_score=avg_quality, file_analyses=file_analyses, executive_summary="" ) # Generate executive summary with memory context print("Generating memory-enhanced executive summary...") repo_analysis.executive_summary = await self.generate_executive_summary_with_memory( repo_analysis, context_memories ) # Store analysis in episodic memory (automated analysis) await self.memory_manager.store_episodic_memory( self.session_id, "Complete automated repository analysis", f"Analyzed {repo_analysis.total_files} files, found {sum(len(fa.issues_found) for fa in file_analyses)} issues", repo_id, { 'repo_path': repo_path, 'quality_score': avg_quality, 'total_issues': sum(len(fa.issues_found) for fa in file_analyses), 'analysis_type': 'automated_comprehensive' } ) # Cache analysis in working memory await self.memory_manager.store_working_memory( f"repo_analysis:{repo_id}", asdict(repo_analysis), ttl=7200 # 2 hours ) return repo_analysis finally: # Cleanup if self.temp_dir and os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) print("Temporary files cleaned up") async def get_analysis_context(self, repo_path: str, user_query: str, repo_id: str) -> Dict[str, List]: """Gather relevant context from memory systems.""" context = { 'episodic_memories': [], 'persistent_knowledge': [], 'similar_analyses': [] } # Get relevant persistent knowledge for comprehensive analysis context['persistent_knowledge'] = await self.memory_manager.retrieve_persistent_memories( "code quality security best practices", limit=15 ) # Find similar code analyses context['similar_analyses'] = await self.memory_manager.search_similar_code( "repository analysis", repo_id, limit=10 ) return context async def analyze_repository_overview_with_memory(self, repo_path: str, file_analyses: List[FileAnalysis], context_memories: Dict, repo_id: str) -> Tuple[str, str]: """Analyze repository architecture and security with memory context.""" print("Analyzing repository overview with memory context...") # Prepare summary data languages = dict(Counter(fa.language for fa in file_analyses)) total_lines = sum(fa.lines_of_code for fa in file_analyses) # Calculate average quality safely if file_analyses and len(file_analyses) > 0: valid_scores = [fa.severity_score for fa in file_analyses if fa.severity_score is not None] avg_quality = sum(valid_scores) / len(valid_scores) if valid_scores else 5.0 else: avg_quality = 5.0 # Build memory context memory_context = "" if context_memories['persistent_knowledge']: memory_context += "Relevant knowledge from previous analyses:\n" for knowledge in context_memories['persistent_knowledge'][:3]: memory_context += f"- {knowledge['content']}\n" if context_memories['similar_analyses']: memory_context += "\nSimilar repositories analyzed:\n" for similar in context_memories['similar_analyses'][:2]: memory_context += f"- {similar['file_path']}: {len(similar.get('analysis_data', {}).get('issues_found', []))} issues found\n" # Get repository structure structure_lines = [] try: for root, dirs, files in os.walk(repo_path): dirs[:] = [d for d in dirs if not d.startswith('.') and d not in {'node_modules', '__pycache__'}] level = root.replace(repo_path, '').count(os.sep) indent = ' ' * level structure_lines.append(f"{indent}{os.path.basename(root)}/") for file in files[:3]: # Limit files shown per directory structure_lines.append(f"{indent} {file}") if len(structure_lines) > 50: # Limit total structure size break except Exception as e: structure_lines = [f"Error reading structure: {e}"] # Architecture analysis with memory context arch_prompt = f""" You are a Senior Software Architect with 25+ years of experience. {memory_context} Analyze this repository: REPOSITORY STRUCTURE: {chr(10).join(structure_lines[:30])} STATISTICS: - Total files analyzed: {len(file_analyses)} - Total lines of code: {total_lines:,} - Languages: {languages} - Average code quality: {avg_quality:.1f}/10 TOP FILE ISSUES: {chr(10).join([f"- {fa.path}: {len(fa.issues_found) if isinstance(fa.issues_found, (list, tuple)) else 0} issues" for fa in file_analyses[:10]])} Provide an architectural assessment covering: 1. Project type and purpose 2. Technology stack evaluation 3. Code organization and structure 4. Scalability and maintainability concerns 5. Key recommendations for improvement Incorporate insights from the memory context provided above. Keep response under 1500 words and focus on actionable insights. """ # Security analysis with memory context security_issues = [] for fa in file_analyses: if isinstance(fa.issues_found, (list, tuple)): security_issues.extend([issue for issue in fa.issues_found if any(keyword in issue.lower() for keyword in ['security', 'vulnerability', 'injection', 'xss', 'auth', 'password'])]) sec_prompt = f""" You are a Senior Security Engineer with 20+ years of experience. {memory_context} Security Analysis for repository with {len(file_analyses)} files: SECURITY ISSUES FOUND: {chr(10).join(security_issues[:20]) if security_issues else "No obvious security issues detected"} HIGH-RISK FILE TYPES PRESENT: {[lang for lang, count in languages.items() if lang in ['JavaScript', 'TypeScript', 'Python', 'PHP', 'SQL']]} Provide security assessment covering: 1. Overall security posture 2. Main security risks and vulnerabilities 3. Authentication and authorization concerns 4. Data protection and privacy issues 5. Immediate security priorities Incorporate insights from the memory context provided above. Keep response under 1000 words and focus on actionable security recommendations. """ try: # Run both analyses arch_task = self.client.messages.create( model="claude-3-5-sonnet-20240620", max_tokens=2000, temperature=0.1, messages=[{"role": "user", "content": arch_prompt}] ) sec_task = self.client.messages.create( model="claude-3-5-sonnet-20240620", max_tokens=1500, temperature=0.1, messages=[{"role": "user", "content": sec_prompt}] ) architecture_assessment = arch_task.content[0].text security_assessment = sec_task.content[0].text # Store insights as persistent knowledge await self.memory_manager.store_persistent_memory( content=f"Architecture pattern: {architecture_assessment[:300]}...", category='architecture', confidence=0.7, source_repos=[repo_id] ) return architecture_assessment, security_assessment except Exception as e: return f"Architecture analysis failed: {e}", f"Security analysis failed: {e}" async def generate_executive_summary_with_memory(self, analysis: RepositoryAnalysis, context_memories: Dict) -> str: """Generate executive summary with memory context.""" print("Generating executive summary with memory context...") # Build memory context for executive summary executive_context = "" if context_memories['episodic_memories']: executive_context += "Previous executive discussions:\n" for memory in context_memories['episodic_memories'][:2]: if 'executive' in memory.get('ai_response', '').lower(): executive_context += f"- {memory['ai_response'][:200]}...\n" prompt = f""" You are presenting to C-level executives. Create an executive summary of this technical analysis. {executive_context} REPOSITORY METRICS: - Total Files: {analysis.total_files} - Lines of Code: {analysis.total_lines:,} - Languages: {analysis.languages} - Code Quality Score: {analysis.code_quality_score:.1f}/10 KEY FINDINGS: - Total issues identified: {sum(len(fa.issues_found) if isinstance(fa.issues_found, (list, tuple)) else 0 for fa in analysis.file_analyses)} - Files needing attention: {len([fa for fa in analysis.file_analyses if fa.severity_score < 7])} - High-quality files: {len([fa for fa in analysis.file_analyses if fa.severity_score >= 8])} Create an executive summary for non-technical leadership covering: 1. Business impact of code quality findings 2. Risk assessment and implications 3. Investment priorities and recommendations 4. Expected ROI from addressing technical debt 5. Competitive implications Focus on business outcomes, not technical details. Keep under 800 words. """ try: message = self.client.messages.create( model="claude-3-5-sonnet-20240620", max_tokens=1200, temperature=0.1, messages=[{"role": "user", "content": prompt}] ) return message.content[0].text except Exception as e: return f"Executive summary generation failed: {e}" def _create_language_pie_chart(self, languages: Dict[str, int]) -> Drawing: """Create a pie chart showing language distribution.""" drawing = Drawing(400, 200) pie = Pie() pie.x = 150 pie.y = 50 pie.width = 150 pie.height = 150 # Prepare data if languages and len(languages) > 0: labels = list(languages.keys())[:8] # Top 8 languages values = [languages[lang] for lang in labels] pie.data = values pie.labels = labels # Use distinct colors chart_colors = [ colors.HexColor('#3b82f6'), # Blue colors.HexColor('#10b981'), # Green colors.HexColor('#f59e0b'), # Amber colors.HexColor('#ef4444'), # Red colors.HexColor('#8b5cf6'), # Purple colors.HexColor('#ec4899'), # Pink colors.HexColor('#06b6d4'), # Cyan colors.HexColor('#f97316'), # Orange ] pie.slices.strokeWidth = 1 pie.slices.strokeColor = colors.white for i, color in enumerate(chart_colors[:len(values)]): pie.slices[i].fillColor = color pie.sideLabels = 1 pie.simpleLabels = 0 else: # Empty state pie.data = [1] pie.labels = ['No data'] pie.slices[0].fillColor = colors.HexColor('#e2e8f0') drawing.add(pie) return drawing def _create_quality_bar_chart(self, file_analyses: List) -> Drawing: """Create a bar chart showing file quality distribution.""" drawing = Drawing(400, 200) bc = VerticalBarChart() bc.x = 50 bc.y = 50 bc.height = 125 bc.width = 300 # Calculate quality counts high_count = len([fa for fa in file_analyses if fa.severity_score >= 8]) medium_count = len([fa for fa in file_analyses if 5 <= fa.severity_score < 8]) low_count = len([fa for fa in file_analyses if fa.severity_score < 5]) bc.data = [[high_count, medium_count, low_count]] bc.categoryAxis.categoryNames = ['High', 'Medium', 'Low'] bc.categoryAxis.labels.fontSize = 10 bc.valueAxis.valueMin = 0 bc.valueAxis.valueMax = max(high_count, medium_count, low_count, 1) * 1.2 # Colors bc.bars[0].fillColor = colors.HexColor('#10b981') # Green for high bc.bars[1].fillColor = colors.HexColor('#f59e0b') # Amber for medium bc.bars[2].fillColor = colors.HexColor('#ef4444') # Red for low drawing.add(bc) return drawing def create_pdf_report(self, analysis: RepositoryAnalysis, output_path: str, progress_mgr=None): """Generate comprehensive PDF report with visual charts and detailed sections.""" print(f"Generating PDF report: {output_path}") doc = SimpleDocTemplate(output_path, pagesize=A4, leftMargin=72, rightMargin=72, topMargin=72, bottomMargin=72) styles = getSampleStyleSheet() story = [] # Custom styles with proper core colors title_style = ParagraphStyle( 'CustomTitle', parent=styles['Heading1'], fontSize=24, textColor=colors.HexColor('#1e40af'), # Blue-800 spaceAfter=30, alignment=TA_CENTER ) heading_style = ParagraphStyle( 'CustomHeading', parent=styles['Heading2'], fontSize=16, textColor=colors.HexColor('#1e40af'), # Blue-800 spaceBefore=20, spaceAfter=10 ) # Title Page story.append(Paragraph("AI-Enhanced Repository Analysis Report", title_style)) story.append(Spacer(1, 20)) story.append(Paragraph(f"Repository: {analysis.repo_path}", styles['Normal'])) story.append(Paragraph(f"Analysis Date: {datetime.now().strftime('%B %d, %Y at %H:%M')}", styles['Normal'])) story.append(Paragraph("Generated by: Enhanced AI Analysis System with Memory", styles['Normal'])) story.append(PageBreak()) # Executive Summary story.append(Paragraph("Executive Summary", heading_style)) if analysis.executive_summary and len(analysis.executive_summary.strip()) > 50: story.append(Paragraph(analysis.executive_summary, styles['Normal'])) else: # Generate a comprehensive summary even without AI summary_text = f""" This repository contains {analysis.total_files} files with a total of {analysis.total_lines:,} lines of code. The codebase is primarily written in {', '.join(list(analysis.languages.keys())[:3]) if analysis.languages else 'Unknown'}. Key Statistics: • Total Files: {analysis.total_files} • Total Lines: {analysis.total_lines:,} • Code Quality Score: {analysis.code_quality_score}/10 • High Quality Files: {len([fa for fa in analysis.file_analyses if fa.severity_score >= 8])} • Medium Quality Files: {len([fa for fa in analysis.file_analyses if 5 <= fa.severity_score < 8])} • Low Quality Files: {len([fa for fa in analysis.file_analyses if fa.severity_score < 5])} Repository Overview: This appears to be a {analysis.repo_path.split('/')[-1] if '/' in analysis.repo_path else analysis.repo_path} project with a well-structured codebase. The analysis reveals a mix of file types and programming languages, indicating a comprehensive software project. """ story.append(Paragraph(summary_text, styles['Normal'])) story.append(PageBreak()) # Repository Overview story.append(Paragraph("Repository Overview", heading_style)) overview_data = [ ['Metric', 'Value'], ['Total Files Analyzed', str(analysis.total_files)], ['Total Lines of Code', f"{analysis.total_lines:,}"], ['Primary Languages', ', '.join(list(analysis.languages.keys())[:5]) if analysis.languages else 'Unknown'], ['Overall Code Quality', f"{analysis.code_quality_score:.1f}/10"], ] overview_table = Table(overview_data, colWidths=[200, 300]) overview_table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#1e40af')), # Blue-800 header ('TEXTCOLOR', (0, 0), (-1, 0), colors.white), ('ALIGN', (0, 0), (-1, -1), 'LEFT'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, 0), 12), ('BOTTOMPADDING', (0, 0), (-1, 0), 12), ('BACKGROUND', (0, 1), (-1, -1), colors.HexColor('#f8fafc')), # Gray-50 ('GRID', (0, 0), (-1, -1), 1, colors.HexColor('#e2e8f0')) # Gray-300 ])) story.append(overview_table) story.append(Spacer(1, 20)) # Language Distribution with Pie Chart story.append(Paragraph("Language Distribution", heading_style)) if analysis.languages: lang_chart = self._create_language_pie_chart(analysis.languages) story.append(lang_chart) story.append(Spacer(1, 10)) # Language breakdown table lang_data = [['Language', 'Files', 'Percentage']] total_lang_files = sum(analysis.languages.values()) for lang, count in sorted(analysis.languages.items(), key=lambda x: x[1], reverse=True)[:10]: percentage = (count / total_lang_files * 100) if total_lang_files > 0 else 0 lang_data.append([lang, str(count), f"{percentage:.1f}%"]) lang_table = Table(lang_data, colWidths=[150, 100, 100]) lang_table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#1e40af')), ('TEXTCOLOR', (0, 0), (-1, 0), colors.white), ('ALIGN', (0, 0), (-1, -1), 'CENTER'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, 0), 11), ('FONTSIZE', (0, 1), (-1, -1), 9), ('BOTTOMPADDING', (0, 0), (-1, 0), 12), ('BACKGROUND', (0, 1), (-1, -1), colors.HexColor('#f8fafc')), ('GRID', (0, 0), (-1, -1), 1, colors.HexColor('#e2e8f0')) ])) story.append(lang_table) else: story.append(Paragraph("No language data available.", styles['Normal'])) story.append(PageBreak()) # Code Quality Assessment with Bar Chart story.append(Paragraph("Code Quality Assessment", heading_style)) # Calculate percentages safely total_files = analysis.total_files if isinstance(analysis.total_files, int) and analysis.total_files > 0 else 1 # Calculate quality file counts from file_analyses high_quality_count = len([fa for fa in analysis.file_analyses if fa.severity_score >= 8]) medium_quality_count = len([fa for fa in analysis.file_analyses if 5 <= fa.severity_score < 8]) low_quality_count = len([fa for fa in analysis.file_analyses if fa.severity_score < 5]) quality_data = [ ['Quality Level', 'Count', 'Percentage'], ['High Quality', str(high_quality_count), f"{(high_quality_count/total_files)*100:.1f}%"], ['Medium Quality', str(medium_quality_count), f"{(medium_quality_count/total_files)*100:.1f}%"], ['Low Quality', str(low_quality_count), f"{(low_quality_count/total_files)*100:.1f}%"] ] quality_table = Table(quality_data, colWidths=[150, 100, 100]) quality_table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#1e40af')), # Blue-800 header ('TEXTCOLOR', (0, 0), (-1, 0), colors.white), ('ALIGN', (0, 0), (-1, -1), 'CENTER'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, 0), 12), ('BOTTOMPADDING', (0, 0), (-1, 0), 12), ('BACKGROUND', (0, 1), (-1, -1), colors.HexColor('#f8fafc')), # Gray-50 ('GRID', (0, 0), (-1, -1), 1, colors.HexColor('#e2e8f0')) # Gray-300 ])) story.append(quality_table) story.append(Spacer(1, 10)) # Add quality bar chart quality_chart = self._create_quality_bar_chart(analysis.file_analyses) story.append(quality_chart) story.append(PageBreak()) # Files Requiring Attention Section story.append(Paragraph("Files Requiring Attention", heading_style)) story.append(Paragraph( "The following files have been identified as requiring immediate attention based on their quality scores, " "number of issues, and complexity metrics. Priority is given to files with the lowest quality scores.", styles['Normal'] )) story.append(Spacer(1, 10)) # Get files sorted by severity (lowest scores first = most attention needed) files_needing_attention = sorted(analysis.file_analyses, key=lambda x: x.severity_score)[:15] if files_needing_attention: attention_data = [['File Path', 'Score', 'Issues', 'Priority']] for fa in files_needing_attention: # Determine priority based on score if fa.severity_score < 4: priority = "CRITICAL" priority_color = colors.HexColor('#ef4444') elif fa.severity_score < 6: priority = "HIGH" priority_color = colors.HexColor('#f59e0b') else: priority = "MEDIUM" priority_color = colors.HexColor('#3b82f6') file_path = str(fa.path)[:45] + '...' if len(str(fa.path)) > 45 else str(fa.path) issues_count = len(fa.issues_found) if isinstance(fa.issues_found, (list, tuple)) else 0 attention_data.append([ file_path, f"{fa.severity_score:.1f}/10", str(issues_count), priority ]) attention_table = Table(attention_data, colWidths=[220, 70, 60, 80]) attention_table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#1e40af')), ('TEXTCOLOR', (0, 0), (-1, 0), colors.white), ('ALIGN', (0, 0), (-1, -1), 'LEFT'), ('ALIGN', (1, 0), (-1, -1), 'CENTER'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, 0), 10), ('FONTSIZE', (0, 1), (-1, -1), 8), ('BOTTOMPADDING', (0, 0), (-1, 0), 12), ('BACKGROUND', (0, 1), (-1, -1), colors.HexColor('#f8fafc')), ('GRID', (0, 0), (-1, -1), 1, colors.HexColor('#e2e8f0')), # Color code priority column ('TEXTCOLOR', (3, 1), (3, -1), colors.HexColor('#dc2626')) ])) story.append(attention_table) story.append(Spacer(1, 15)) # Add recommendations for top 5 critical files story.append(Paragraph("Priority Recommendations:", ParagraphStyle( 'SubHeading', parent=styles['Heading3'], fontSize=12, textColor=colors.HexColor('#1e40af'), spaceBefore=10 ))) for i, fa in enumerate(files_needing_attention[:5], 1): story.append(Paragraph(f"{i}. {str(fa.path)} (Score: {fa.severity_score:.1f}/10)", styles['Normal'])) if fa.recommendations and len(fa.recommendations) > 0: for rec in fa.recommendations[:2]: # Top 2 recommendations per file story.append(Paragraph(f" • {rec}", styles['Normal'])) else: story.append(Paragraph(f" • Review code quality and add documentation", styles['Normal'])) story.append(Spacer(1, 5)) else: story.append(Paragraph("No files require immediate attention. Code quality is satisfactory.", styles['Normal'])) story.append(PageBreak()) # Security Assessment if hasattr(analysis, 'security_assessment') and analysis.security_assessment: story.append(Paragraph("Security Assessment", heading_style)) story.append(Paragraph(analysis.security_assessment, styles['Normal'])) story.append(Spacer(1, 20)) # Architecture Assessment if hasattr(analysis, 'architecture_assessment') and analysis.architecture_assessment: story.append(Paragraph("Architecture Assessment", heading_style)) story.append(Paragraph(analysis.architecture_assessment, styles['Normal'])) story.append(Spacer(1, 20)) # File Analysis Details story.append(Paragraph("File Analysis Details", heading_style)) # Create file analysis table file_data = [['File Path', 'Language', 'Lines', 'Quality Score', 'Issues']] for file_analysis in analysis.file_analyses[:20]: # Limit to first 20 files file_data.append([ str(file_analysis.path)[:50] + '...' if len(str(file_analysis.path)) > 50 else str(file_analysis.path), file_analysis.language, str(file_analysis.lines_of_code), f"{file_analysis.severity_score:.1f}/10", str(len(file_analysis.issues_found) if isinstance(file_analysis.issues_found, (list, tuple)) else 0) ]) if len(analysis.file_analyses) > 20: file_data.append(['...', '...', '...', '...', f'... and {len(analysis.file_analyses) - 20} more files']) file_table = Table(file_data, colWidths=[200, 80, 60, 80, 60]) file_table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#1e40af')), # Blue-800 header ('TEXTCOLOR', (0, 0), (-1, 0), colors.white), ('ALIGN', (0, 0), (-1, -1), 'LEFT'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, 0), 10), ('FONTSIZE', (0, 1), (-1, -1), 8), ('BOTTOMPADDING', (0, 0), (-1, 0), 12), ('BACKGROUND', (0, 1), (-1, -1), colors.HexColor('#f8fafc')), # Gray-50 ('GRID', (0, 0), (-1, -1), 1, colors.HexColor('#e2e8f0')) # Gray-300 ])) story.append(file_table) story.append(Spacer(1, 20)) # Recommendations story.append(Paragraph("Key Recommendations", heading_style)) recommendations = [] for file_analysis in analysis.file_analyses: if file_analysis.recommendations: recommendations.extend(file_analysis.recommendations[:2]) # Limit recommendations per file if recommendations: for i, rec in enumerate(recommendations[:10], 1): # Limit to top 10 recommendations story.append(Paragraph(f"{i}. {rec}", styles['Normal'])) else: story.append(Paragraph("No specific recommendations generated.", styles['Normal'])) story.append(Spacer(1, 20)) # Footer story.append(Paragraph("--- End of Report ---", styles['Normal'])) story.append(Paragraph(f"Generated on {datetime.now().strftime('%B %d, %Y at %H:%M:%S')}", styles['Normal'])) # Build PDF try: doc.build(story) print(f"✅ PDF report generated successfully: {output_path}") except Exception as e: print(f"❌ Error generating PDF: {e}") async def query_memory(self, query: str, repo_context: str = "") -> Dict[str, Any]: """Query the memory system directly.""" return await self.query_engine.intelligent_query(query, repo_context) def get_memory_config() -> Dict[str, Any]: """Get memory system configuration from environment variables.""" return { 'anthropic_api_key': os.getenv('ANTHROPIC_API_KEY', ''), 'redis_host': os.getenv('REDIS_HOST', 'localhost'), 'redis_port': int(os.getenv('REDIS_PORT', 6379)), 'redis_db': int(os.getenv('REDIS_DB', 0)), 'mongodb_url': os.getenv('MONGODB_URL', 'mongodb://localhost:27017/'), 'mongodb_name': os.getenv('MONGODB_DB', 'repo_analyzer'), 'postgres_host': os.getenv('POSTGRES_HOST', 'localhost'), 'postgres_port': int(os.getenv('POSTGRES_PORT', 5432)), 'postgres_db': os.getenv('POSTGRES_DB', 'repo_vectors'), 'postgres_user': os.getenv('POSTGRES_USER', 'postgres'), 'postgres_password': os.getenv('POSTGRES_PASSWORD', '') } async def main(): """Main function to run the enhanced repository analyzer.""" load_dotenv() import argparse parser = argparse.ArgumentParser(description="Complete AI Repository Analysis - Analyzes ALL files automatically") parser.add_argument("repo_path", help="Repository path (local directory or Git URL)") parser.add_argument("--output", "-o", default="complete_repository_analysis.pdf", help="Output PDF file path") parser.add_argument("--api-key", help="Anthropic API key (overrides .env)") args = parser.parse_args() # Get API key api_key = args.api_key or os.getenv('ANTHROPIC_API_KEY') if not api_key: print("❌ Error: ANTHROPIC_API_KEY not found in .env file or command line") return 1 try: print("🚀 Starting Complete AI Repository Analysis") print("=" * 60) print(f"Repository: {args.repo_path}") print(f"Output: {args.output}") print("Mode: Complete automated analysis of ALL files") print("=" * 60) # Initialize enhanced analyzer config = get_memory_config() analyzer = EnhancedGitHubAnalyzer(api_key, config) # Perform complete analysis analysis = await analyzer.analyze_repository_with_memory(args.repo_path) # Generate PDF report analyzer.create_pdf_report(analysis, args.output) # Print summary to console print("\n" + "=" * 60) print("🎯 COMPLETE ANALYSIS FINISHED") print("=" * 60) print(f"📊 Repository Statistics:") print(f" • Files Analyzed: {analysis.total_files}") print(f" • Lines of Code: {analysis.total_lines:,}") print(f" • Languages: {len(analysis.languages)}") print(f" • Code Quality: {analysis.code_quality_score:.1f}/10") # Quality breakdown high_quality = len([fa for fa in analysis.file_analyses if fa.severity_score >= 8]) medium_quality = len([fa for fa in analysis.file_analyses if 5 <= fa.severity_score < 8]) low_quality = len([fa for fa in analysis.file_analyses if fa.severity_score < 5]) print(f"\n📈 Quality Breakdown:") print(f" • High Quality Files (8-10): {high_quality}") print(f" • Medium Quality Files (5-7): {medium_quality}") print(f" • Low Quality Files (1-4): {low_quality}") print(f" • Total Issues Found: {sum(len(fa.issues_found) if isinstance(fa.issues_found, (list, tuple)) else 0 for fa in analysis.file_analyses)}") # Language breakdown print(f"\n🔤 Language Distribution:") for lang, count in sorted(analysis.languages.items(), key=lambda x: x[1], reverse=True)[:10]: print(f" • {lang}: {count} files") # Memory system stats memory_stats = await analyzer.memory_manager.get_memory_stats() print(f"\n🧠 Memory System Statistics:") for category, data in memory_stats.items(): print(f" • {category.replace('_', ' ').title()}: {data}") print(f"\n📄 Complete PDF Report: {args.output}") print("\n✅ Complete analysis finished successfully!") return 0 except Exception as e: print(f"❌ Error during analysis: {e}") import traceback traceback.print_exc() return 1 if __name__ == "__main__": exit(asyncio.run(main()))