#!/usr/bin/env python3
"""
Complete AI Repository Analysis Tool with Memory System
Automatically analyzes ALL files in a repository without limits.
Features:
- Analyzes ALL files in the repository (no max-files limit)
- No user query required - fully automated analysis
- Memory-enhanced analysis with learning capabilities
- Comprehensive PDF report generation
- Security, architecture, and code quality assessment
Usage:
python ai-analyze.py /path/to/repo --output analysis.pdf
Example:
python ai-analyze.py ./my-project --output complete_analysis.pdf
"""
import os
import asyncio
import hashlib
import json
import uuid
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
from collections import defaultdict, Counter
import logging
import tempfile
import shutil
import re
# Core packages
import anthropic
from dotenv import load_dotenv
import git
import redis
import pymongo
import psycopg2
from psycopg2.extras import RealDictCursor
import numpy as np
# PDF generation
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.enums import TA_CENTER, TA_LEFT
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak, Table, TableStyle
from reportlab.lib import colors
# Enhanced dataclasses for memory system
@dataclass
class MemoryRecord:
    """A single memory entry tagged with the kind of memory it belongs to."""

    # Identifier of the record.
    id: str
    # When the record was created.
    timestamp: datetime
    # One of 'episodic', 'persistent', 'working'.
    memory_type: str
    # Arbitrary payload of the record.
    content: Dict[str, Any]
    # Optional embedding vector for the content.
    embeddings: Optional[List[float]] = None
    # Optional free-form metadata.
    metadata: Optional[Dict[str, Any]] = None
    # Optional expiry time; None when no expiry is set.
    expiry: Optional[datetime] = None
@dataclass
class CodeAnalysisMemory:
    """Stored analysis of one file of one repository, with its embedding."""

    # Identifier of the repository the file belongs to.
    repo_id: str
    # Path of the analysed file.
    file_path: str
    # Hash identifying this particular analysis.
    analysis_hash: str
    # The analysis payload itself.
    analysis_data: Dict[str, Any]
    # Embedding vector for the analysis.
    embedding: List[float]
    # Time of the last update.
    last_updated: datetime
    # Number of accesses; starts at 0.
    access_count: int = 0
    # Relevance weight; starts at 1.0.
    relevance_score: float = 1.0
@dataclass
class EpisodicMemory:
    """One recorded query/response interaction within a session."""

    # Session the interaction belongs to.
    session_id: str
    # Text of the query.
    user_query: str
    # Text of the response.
    ai_response: str
    # Repository context of the interaction.
    repo_context: str
    # When the interaction happened.
    timestamp: datetime
    # Embedding vector for the interaction.
    embedding: List[float]
    # Free-form metadata.
    metadata: Dict[str, Any]
@dataclass
class PersistentMemory:
    """A long-lived fact together with its category, confidence and usage data."""

    # Identifier of the fact.
    fact_id: str
    # Text of the fact.
    content: str
    # One of 'code_pattern', 'best_practice', 'vulnerability', 'architecture'.
    category: str
    # Confidence attached to the fact.
    confidence: float
    # Embedding vector for the content.
    embedding: List[float]
    # Repositories the fact was derived from.
    source_repos: List[str]
    # Creation time.
    created_at: datetime
    # Time of the last access.
    last_accessed: datetime
    # Number of accesses; starts at 0.
    access_frequency: int = 0
@dataclass
class FileAnalysis:
    """Result of analysing a single file."""

    # Path of the analysed file.
    path: str
    # Language label of the file.
    language: str
    # Line count of the file.
    lines_of_code: int
    # Heuristic complexity score.
    complexity_score: float
    # Issue lines extracted from the analysis text.
    issues_found: List[str]
    # Recommendation lines extracted from the analysis text.
    recommendations: List[str]
    # Full analysis text.
    detailed_analysis: str
    # Numeric score attached to the analysis.
    severity_score: float
@dataclass
class RepositoryAnalysis:
    """Aggregated result of analysing a whole repository."""

    # Path of the analysed repository.
    repo_path: str
    # Number of files covered.
    total_files: int
    # Number of lines covered.
    total_lines: int
    # Mapping of language label to an integer count.
    languages: Dict[str, int]
    # Architecture assessment text.
    architecture_assessment: str
    # Security assessment text.
    security_assessment: str
    # Overall code quality score.
    code_quality_score: float
    # Per-file analysis results.
    file_analyses: List[FileAnalysis]
    # Executive summary text.
    executive_summary: str
class MemoryManager:
    """Advanced memory management system for AI repository analysis.

    Working memory is kept in Redis, documents in MongoDB and embeddings in
    PostgreSQL. The storage/retrieval methods log failures and return an
    empty value ("" / [] / {} / None / False) instead of raising; only
    ``setup_databases`` re-raises.
    """

    # Length of every embedding vector produced by this class.
    EMBEDDING_DIM = 384

    def __init__(self, config: Dict[str, Any]):
        """Create the Claude client, connect to the databases and set defaults.

        Args:
            config: Connection settings; every key is optional and falls back
                to the defaults used in ``setup_databases``.

        Raises:
            Exception: Whatever ``setup_databases`` raises when a connection
                cannot be established.
        """
        self.config = config
        self.setup_logging()
        # Initialize Claude client for embeddings
        self.claude_client = anthropic.Anthropic(api_key=config.get('anthropic_api_key', ''))
        # Initialize database connections
        self.setup_databases()
        # Memory configuration
        self.working_memory_ttl = 3600  # 1 hour
        self.episodic_retention_days = 365  # 1 year
        self.persistent_memory_threshold = 0.8  # Confidence threshold for persistence

    def setup_logging(self):
        """Configure basic INFO logging and create ``self.logger``."""
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def setup_databases(self):
        """Initialize all database connections.

        Sets ``redis_client``, ``mongo_client``/``mongo_db`` and its
        collections, ``pg_conn`` and ``has_vector``.

        Raises:
            Exception: Re-raised (after logging) when any connection fails.
        """
        try:
            # Redis for working memory (temporary, fast access)
            self.redis_client = redis.Redis(
                host=self.config.get('redis_host', 'localhost'),
                port=self.config.get('redis_port', 6379),
                db=self.config.get('redis_db', 0),
                decode_responses=True
            )
            # MongoDB for documents and episodic memory
            self.mongo_client = pymongo.MongoClient(
                self.config.get('mongodb_url', 'mongodb://localhost:27017/')
            )
            self.mongo_db = self.mongo_client[self.config.get('mongodb_name', 'repo_analyzer')]
            # Collections
            self.episodic_collection = self.mongo_db['episodic_memories']
            self.analysis_collection = self.mongo_db['code_analyses']
            self.persistent_collection = self.mongo_db['persistent_memories']
            self.repo_metadata_collection = self.mongo_db['repository_metadata']
            # PostgreSQL with pgvector for vector operations
            # NOTE(review): a default password is baked in here; prefer
            # supplying 'postgres_password' through the config/environment.
            self.pg_conn = psycopg2.connect(
                host=self.config.get('postgres_host', 'localhost'),
                port=self.config.get('postgres_port', 5432),
                database=self.config.get('postgres_db', 'dev_pipeline'),
                user=self.config.get('postgres_user', 'pipeline_admin'),
                password=self.config.get('postgres_password', 'secure_pipeline_2024')
            )
            self.has_vector = self._detect_pgvector()
            self.logger.info("All database connections established successfully")
        except Exception as e:
            self.logger.error(f"Database setup failed: {e}")
            raise

    def _detect_pgvector(self) -> bool:
        """Return True when the 'vector' extension is listed in pg_extension."""
        try:
            with self.pg_conn.cursor() as cur:
                cur.execute("SELECT 1 FROM pg_extension WHERE extname = 'vector';")
                return cur.fetchone() is not None
        except Exception as e:
            self.logger.warning(f"pgvector detection failed: {e}")
            # A failed statement aborts the open transaction; clear it so the
            # connection stays usable.
            self._rollback_quietly()
            return False

    def _rollback_quietly(self) -> None:
        """Roll back the PostgreSQL connection, logging instead of raising."""
        conn = getattr(self, 'pg_conn', None)
        if conn is None:
            return
        try:
            conn.rollback()
        except Exception as e:
            self.logger.error(f"PostgreSQL rollback failed: {e}")

    @staticmethod
    def _table_exists(cur, table_name: str) -> bool:
        """Return True when information_schema lists a table named *table_name*."""
        cur.execute(
            """
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_name = %s
            ) AS table_exists
            """,
            (table_name,),
        )
        row = cur.fetchone()
        # RealDictCursor yields mappings (indexing by 0 raises KeyError);
        # the default cursor yields tuples.
        return bool(row['table_exists'] if isinstance(row, dict) else row[0])

    def generate_embedding(self, text: str) -> List[float]:
        """Generate embedding for text using Claude API.

        The text is truncated to 8000 characters before being sent. When the
        reply does not contain a JSON array of exactly 384 numbers, or the
        call fails, the hash-based fallback embedding is returned instead.

        Args:
            text: Text to embed.

        Returns:
            A list of 384 floats.
        """
        try:
            # Truncate text if too long for Claude API
            if len(text) > 8000:
                text = text[:8000] + "..."
            prompt = "\n".join([
                "",
                "Convert the following text into a 384-dimensional numerical vector that represents its semantic meaning.",
                "The vector should be suitable for similarity search and clustering.",
                f"Text: {text}",
                "Return only a JSON array of 384 floating-point numbers between -1 and 1, like this:",
                "[0.123, -0.456, 0.789, ...]",
                "",
            ])
            message = self.claude_client.messages.create(
                model="claude-3-5-sonnet-20240620",
                max_tokens=2000,
                temperature=0.1,
                messages=[{"role": "user", "content": prompt}]
            )
            response_text = message.content[0].text.strip()
            # Find a JSON array of numbers in the response; 'e', 'E' and '+'
            # are accepted so values in exponent notation are not rejected.
            json_match = re.search(r'\[[\d.,\seE+\-]+\]', response_text)
            if json_match:
                embedding = json.loads(json_match.group())
                if (len(embedding) == self.EMBEDDING_DIM
                        and all(isinstance(v, (int, float)) for v in embedding)):
                    return [float(v) for v in embedding]
            # Fallback: generate deterministic embedding from text hash
            return self._generate_fallback_embedding(text)
        except Exception as e:
            self.logger.error(f"Claude embedding generation failed: {e}")
            return self._generate_fallback_embedding(text)

    def _generate_fallback_embedding(self, text: str) -> List[float]:
        """Generate fallback embedding using text hash.

        The SHA-256 digest of the text is read as eight big-endian unsigned
        32-bit integers, each scaled to [-1, 1]; the remaining 376 positions
        are zero. Returns a vector of zeros if hashing fails.
        """
        try:
            import struct
            digest = hashlib.sha256(text.encode('utf-8')).digest()
            words = struct.unpack('>8I', digest)
            # Normalize to 0-1, then scale to -1..1
            embedding = [word / (2**32 - 1) * 2 - 1 for word in words]
            # Pad to exactly EMBEDDING_DIM dimensions
            embedding.extend([0.0] * (self.EMBEDDING_DIM - len(embedding)))
            return embedding
        except Exception as e:
            self.logger.error(f"Fallback embedding generation failed: {e}")
            return [0.0] * self.EMBEDDING_DIM

    def calculate_content_hash(self, content: str) -> str:
        """Calculate SHA-256 hash of content for change detection."""
        return hashlib.sha256(content.encode()).hexdigest()

    async def store_working_memory(self, key: str, data: Dict[str, Any], ttl: Optional[int] = None) -> bool:
        """Store temporary data in working memory (Redis).

        Args:
            key: Key suffix; the value is stored under ``working:<key>``.
            data: JSON-serialisable payload (non-serialisable values are
                stringified).
            ttl: Lifetime in seconds; a falsy value selects
                ``working_memory_ttl``.

        Returns:
            True on success, False when storing failed.
        """
        try:
            ttl = ttl or self.working_memory_ttl
            serialized_data = json.dumps(data, default=str)
            self.redis_client.setex(f"working:{key}", ttl, serialized_data)
            return True
        except Exception as e:
            self.logger.error(f"Working memory storage failed: {e}")
            return False

    async def get_working_memory(self, key: str) -> Optional[Dict[str, Any]]:
        """Retrieve data from working memory; None when absent or on failure."""
        try:
            data = self.redis_client.get(f"working:{key}")
            return json.loads(data) if data else None
        except Exception as e:
            self.logger.error(f"Working memory retrieval failed: {e}")
            return None

    async def store_episodic_memory(self, session_id: str, user_query: str,
                                    ai_response: str, repo_context: str,
                                    metadata: Optional[Dict] = None) -> str:
        """Store interaction in episodic memory.

        The full record goes to MongoDB and the query/response embeddings to
        the PostgreSQL ``query_embeddings`` table.

        Returns:
            The generated memory id, or "" when storing failed.
        """
        try:
            memory_id = str(uuid.uuid4())
            # Generate embeddings
            query_embedding = self.generate_embedding(user_query)
            response_embedding = self.generate_embedding(ai_response)
            # Store in MongoDB
            episodic_record = {
                'memory_id': memory_id,
                'session_id': session_id,
                'user_query': user_query,
                'ai_response': ai_response,
                'repo_context': repo_context,
                'timestamp': datetime.utcnow(),
                'metadata': metadata or {}
            }
            self.episodic_collection.insert_one(episodic_record)
            # Store embeddings in PostgreSQL for similarity search
            with self.pg_conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO query_embeddings
                    (session_id, query_text, query_embedding, response_embedding, repo_context, metadata)
                    VALUES (%s, %s, %s, %s, %s, %s)
                """, (
                    session_id, user_query, query_embedding, response_embedding,
                    repo_context, json.dumps(metadata or {})
                ))
            self.pg_conn.commit()
            self.logger.info(f"Episodic memory stored: {memory_id}")
            return memory_id
        except Exception as e:
            self.logger.error(f"Episodic memory storage failed: {e}")
            self._rollback_quietly()
            return ""

    async def retrieve_episodic_memories(self, query: str, repo_context: str = "",
                                         limit: int = 10, similarity_threshold: float = 0.7) -> List[Dict]:
        """Retrieve relevant episodic memories based on query similarity.

        Args:
            query: Text to compare stored queries against.
            repo_context: When non-empty, only rows with this context match.
            limit: Maximum number of rows fetched from PostgreSQL.
            similarity_threshold: Minimum similarity a row must exceed.

        Returns:
            MongoDB episodic documents, each with an added
            ``similarity_score``; [] on failure.
        """
        try:
            query_embedding = self.generate_embedding(query)
            with self.pg_conn.cursor(cursor_factory=RealDictCursor) as cur:
                # Find similar queries using cosine similarity
                cur.execute("""
                    SELECT session_id, query_text, repo_context, timestamp, metadata,
                           1 - (query_embedding <=> %s::vector) as similarity
                    FROM query_embeddings
                    WHERE (%s = '' OR repo_context = %s)
                    AND 1 - (query_embedding <=> %s::vector) > %s
                    ORDER BY similarity DESC
                    LIMIT %s
                """, (query_embedding, repo_context, repo_context, query_embedding, similarity_threshold, limit))
                similar_queries = cur.fetchall()
            # Fetch full episodic records from MongoDB. The join uses the
            # session id and query text because store_episodic_memory writes
            # both values to both stores; the PostgreSQL timestamp is not
            # written by that INSERT, so it cannot be expected to equal the
            # MongoDB document's timestamp.
            memories = []
            for query_record in similar_queries:
                episodic_record = self.episodic_collection.find_one({
                    'session_id': query_record['session_id'],
                    'user_query': query_record['query_text']
                })
                if episodic_record:
                    episodic_record['similarity_score'] = float(query_record['similarity'])
                    memories.append(episodic_record)
            return memories
        except Exception as e:
            self.logger.error(f"Episodic memory retrieval failed: {e}")
            self._rollback_quietly()
            return []

    async def store_persistent_memory(self, content: str, category: str,
                                      confidence: float, source_repos: List[str]) -> str:
        """Store long-term knowledge in persistent memory.

        Returns:
            The generated fact id, or "" when storing failed.
        """
        try:
            fact_id = str(uuid.uuid4())
            embedding = self.generate_embedding(content)
            # Store in MongoDB
            persistent_record = {
                'fact_id': fact_id,
                'content': content,
                'category': category,
                'confidence': confidence,
                'source_repos': source_repos,
                'created_at': datetime.utcnow(),
                'last_accessed': datetime.utcnow(),
                'access_frequency': 1
            }
            self.persistent_collection.insert_one(persistent_record)
            # Store embedding in PostgreSQL
            with self.pg_conn.cursor() as cur:
                if self.has_vector:
                    cur.execute("""
                        INSERT INTO knowledge_embeddings
                        (fact_id, content, category, embedding, confidence, source_repos)
                        VALUES (%s, %s, %s, %s, %s, %s)
                    """, (fact_id, content, category, embedding, confidence, source_repos))
                else:
                    cur.execute("""
                        INSERT INTO knowledge_embeddings
                        (fact_id, content, category, confidence, source_repos)
                        VALUES (%s, %s, %s, %s, %s)
                    """, (fact_id, content, category, confidence, source_repos))
            self.pg_conn.commit()
            self.logger.info(f"Persistent memory stored: {fact_id}")
            return fact_id
        except Exception as e:
            self.logger.error(f"Persistent memory storage failed: {e}")
            self._rollback_quietly()
            return ""

    async def retrieve_persistent_memories(self, query: str, category: str = "",
                                           limit: int = 20, similarity_threshold: float = 0.6) -> List[Dict]:
        """Retrieve relevant persistent knowledge.

        Uses vector similarity when pgvector was detected, otherwise a
        case-insensitive substring match on the content. Every returned row
        has its access counters updated.

        Args:
            query: Text to search for.
            category: When non-empty, restricts results to this category.
            limit: Maximum number of rows returned.
            similarity_threshold: Minimum similarity (vector mode only).

        Returns:
            A list of row dicts; [] when the table is missing or on failure.
        """
        try:
            query_embedding = self.generate_embedding(query)
            use_vector = bool(getattr(self, 'has_vector', False))
            with self.pg_conn.cursor(cursor_factory=RealDictCursor) as cur:
                if not self._table_exists(cur, 'knowledge_embeddings'):
                    self.logger.warning("knowledge_embeddings table does not exist, returning empty results")
                    return []
                if use_vector:
                    similarity_sql = "1 - (embedding <=> %s::vector)"
                    # One embedding for the SELECT expression, one for WHERE.
                    select_params = [query_embedding]
                    where_conditions = [f"{similarity_sql} > %s"]
                    where_params = [query_embedding, similarity_threshold]
                    order_by = "similarity DESC, confidence DESC, access_frequency DESC"
                else:
                    # Fallback to text-based search
                    similarity_sql = "0.8"
                    select_params = []
                    where_conditions = ["content ILIKE %s"]
                    where_params = [f"%{query}%"]
                    order_by = "confidence DESC, access_frequency DESC"
                if category:
                    where_conditions.append("category = %s")
                    where_params.append(category)
                # Parameter order mirrors placeholder order: SELECT, WHERE, LIMIT.
                params = select_params + where_params + [limit]
                cur.execute(f"""
                    SELECT fact_id, content, category, confidence, source_repos,
                           {similarity_sql} as similarity,
                           created_at, last_accessed, access_frequency
                    FROM knowledge_embeddings
                    WHERE {' AND '.join(where_conditions)}
                    ORDER BY {order_by}
                    LIMIT %s
                """, params)
                results = cur.fetchall()
                # Update access frequency
                for result in results:
                    cur.execute("""
                        UPDATE knowledge_embeddings
                        SET last_accessed = CURRENT_TIMESTAMP,
                            access_frequency = access_frequency + 1
                        WHERE fact_id = %s
                    """, (result['fact_id'],))
                self.pg_conn.commit()
                return [dict(result) for result in results]
        except Exception as e:
            self.logger.error(f"Persistent memory retrieval failed: {e}")
            self._rollback_quietly()
            return []

    async def store_code_analysis(self, repo_id: str, file_path: str,
                                  analysis_data: Dict[str, Any]) -> str:
        """Store code analysis with embeddings for future retrieval.

        Returns:
            The SHA-256 hash of the serialised analysis, or "" on failure.
        """
        try:
            content_hash = self.calculate_content_hash(json.dumps(analysis_data, sort_keys=True))
            # Create searchable content for embedding
            searchable_content = "\n".join([
                "",
                f"File: {file_path}",
                f"Language: {analysis_data.get('language', 'Unknown')}",
                f"Issues: {' '.join(analysis_data.get('issues_found', []))}",
                f"Recommendations: {' '.join(analysis_data.get('recommendations', []))}",
                f"Analysis: {analysis_data.get('detailed_analysis', '')}",
                "",
            ])
            embedding = self.generate_embedding(searchable_content)
            # Store in MongoDB
            analysis_record = {
                'repo_id': repo_id,
                'file_path': file_path,
                'content_hash': content_hash,
                'analysis_data': analysis_data,
                'created_at': datetime.utcnow(),
                'last_accessed': datetime.utcnow(),
                'access_count': 1
            }
            # Upsert to handle updates
            self.analysis_collection.update_one(
                {'repo_id': repo_id, 'file_path': file_path},
                {'$set': analysis_record},
                upsert=True
            )
            row_metadata = json.dumps({
                'language': analysis_data.get('language'),
                'lines_of_code': analysis_data.get('lines_of_code', 0),
                'severity_score': analysis_data.get('severity_score', 5.0)
            })
            # Store embedding in PostgreSQL: a vector column when pgvector is
            # available, otherwise the JSON text of the embedding.
            if self.has_vector:
                embedding_column, embedding_value = 'embedding', embedding
            else:
                embedding_column, embedding_value = 'embedding_text', json.dumps(embedding)
            with self.pg_conn.cursor() as cur:
                cur.execute(f"""
                    INSERT INTO code_embeddings (repo_id, file_path, content_hash, {embedding_column}, metadata)
                    VALUES (%s, %s, %s, %s, %s)
                    ON CONFLICT (repo_id, file_path, content_hash)
                    DO UPDATE SET last_accessed = CURRENT_TIMESTAMP
                """, (repo_id, file_path, content_hash, embedding_value, row_metadata))
            self.pg_conn.commit()
            return content_hash
        except Exception as e:
            self.logger.error(f"Code analysis storage failed: {e}")
            self._rollback_quietly()
            return ""

    async def search_similar_code(self, query: str, repo_id: str = "",
                                  limit: int = 10) -> List[Dict]:
        """Search for similar code analyses.

        Args:
            query: Text whose embedding is compared with stored embeddings.
            repo_id: When non-empty, restricts the search to this repository.
            limit: Maximum number of rows fetched from PostgreSQL.

        Returns:
            MongoDB analysis documents, each with an added
            ``similarity_score``; [] when the table is missing or on failure.
        """
        try:
            query_embedding = self.generate_embedding(query)
            with self.pg_conn.cursor(cursor_factory=RealDictCursor) as cur:
                if not self._table_exists(cur, 'code_embeddings'):
                    self.logger.warning("code_embeddings table does not exist, returning empty results")
                    return []
                where_clause = "WHERE 1=1"
                params = [query_embedding]
                if repo_id:
                    where_clause += " AND repo_id = %s"
                    params.append(repo_id)
                params.append(limit)
                cur.execute(f"""
                    SELECT repo_id, file_path, content_hash, metadata,
                           1 - (embedding <=> %s::vector) as similarity
                    FROM code_embeddings
                    {where_clause}
                    ORDER BY similarity DESC
                    LIMIT %s
                """, params)
                results = cur.fetchall()
            # Fetch full analysis data from MongoDB
            enriched_results = []
            for result in results:
                analysis = self.analysis_collection.find_one({
                    'repo_id': result['repo_id'],
                    'file_path': result['file_path']
                })
                if analysis:
                    analysis['similarity_score'] = float(result['similarity'])
                    enriched_results.append(analysis)
            return enriched_results
        except Exception as e:
            self.logger.error(f"Similar code search failed: {e}")
            self._rollback_quietly()
            return []

    async def cleanup_old_memories(self):
        """Clean up old episodic memories and update access patterns.

        Deletes episodic documents and query embeddings older than
        ``episodic_retention_days`` and then refreshes persistent-memory
        confidence values. Failures are logged, not raised.
        """
        try:
            cutoff_date = datetime.utcnow() - timedelta(days=self.episodic_retention_days)
            # Clean up old episodic memories
            result = self.episodic_collection.delete_many({
                'timestamp': {'$lt': cutoff_date}
            })
            self.logger.info(f"Cleaned up {result.deleted_count} old episodic memories")
            # Clean up corresponding query embeddings
            with self.pg_conn.cursor() as cur:
                cur.execute("DELETE FROM query_embeddings WHERE timestamp < %s", (cutoff_date,))
            self.pg_conn.commit()
            # Update persistent memory relevance based on access patterns
            await self.update_persistent_memory_relevance()
        except Exception as e:
            self.logger.error(f"Memory cleanup failed: {e}")
            self._rollback_quietly()

    async def update_persistent_memory_relevance(self):
        """Update relevance scores for persistent memories based on access patterns.

        Confidence is multiplied by 1.1 for facts accessed within the last 30
        days (0.95 otherwise) and by a logarithmic access-frequency factor,
        then capped at 1.0.
        """
        try:
            with self.pg_conn.cursor() as cur:
                # Calculate relevance based on recency and frequency
                cur.execute("""
                    UPDATE knowledge_embeddings
                    SET confidence = LEAST(confidence * (
                        CASE
                            WHEN EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - last_accessed)) / 86400 < 30
                            THEN 1.1
                            ELSE 0.95
                        END *
                        (1.0 + LOG(access_frequency + 1) / 10.0)
                    ), 1.0)
                """)
            self.pg_conn.commit()
        except Exception as e:
            self.logger.error(f"Relevance update failed: {e}")
            self._rollback_quietly()

    async def get_memory_stats(self) -> Dict[str, Any]:
        """Get comprehensive memory system statistics.

        Returns:
            A dict with the sections 'working_memory', 'episodic_memory',
            'persistent_memory', 'code_analysis' and 'vector_database';
            {} on failure.
        """
        try:
            stats = {}
            # Working memory stats (Redis). SCAN iterates incrementally,
            # unlike KEYS which walks the whole keyspace in one blocking call.
            working_key_count = sum(1 for _ in self.redis_client.scan_iter(match="working:*"))
            stats['working_memory'] = {
                'total_keys': working_key_count,
                'memory_usage': self.redis_client.info()['used_memory_human']
            }
            # Episodic memory stats (MongoDB)
            stats['episodic_memory'] = {
                'total_records': self.episodic_collection.count_documents({}),
                'recent_interactions': self.episodic_collection.count_documents({
                    'timestamp': {'$gte': datetime.utcnow() - timedelta(days=7)}
                })
            }
            # Persistent memory stats
            stats['persistent_memory'] = {
                'total_facts': self.persistent_collection.count_documents({}),
                'high_confidence_facts': self.persistent_collection.count_documents({
                    'confidence': {'$gte': 0.8}
                })
            }
            # Code analysis stats
            stats['code_analysis'] = {
                'total_analyses': self.analysis_collection.count_documents({}),
                'unique_repositories': len(self.analysis_collection.distinct('repo_id'))
            }
            # Vector database stats (PostgreSQL)
            with self.pg_conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute("SELECT COUNT(*) as count FROM code_embeddings")
                code_embeddings_count = cur.fetchone()['count']
                cur.execute("SELECT COUNT(*) as count FROM knowledge_embeddings")
                knowledge_embeddings_count = cur.fetchone()['count']
            stats['vector_database'] = {
                'code_embeddings': code_embeddings_count,
                'knowledge_embeddings': knowledge_embeddings_count
            }
            return stats
        except Exception as e:
            self.logger.error(f"Stats retrieval failed: {e}")
            self._rollback_quietly()
            return {}
class MemoryQueryEngine:
    """Advanced querying capabilities across memory systems."""

    def __init__(self, memory_manager: MemoryManager):
        """Keep a reference to the memory manager that performs the lookups."""
        self.memory = memory_manager

    async def intelligent_query(self, query: str, repo_context: str = "") -> Dict[str, Any]:
        """Query episodic, persistent and code memory and fuse the results.

        Returns:
            A dict with 'query', 'fused_response', 'sources',
            'confidence_score' and 'timestamp'; on failure a dict holding
            only 'error'.
        """
        try:
            # Multi-source memory retrieval
            episodic_memories, persistent_knowledge, similar_code = await asyncio.gather(
                self.memory.retrieve_episodic_memories(query, repo_context, limit=5),
                self.memory.retrieve_persistent_memories(query, limit=10),
                self.memory.search_similar_code(query, repo_context, limit=5)
            )
            # Relevance scoring and fusion
            fused_response = self.fuse_memory_responses(
                query, episodic_memories, persistent_knowledge, similar_code
            )
            source_counts = {
                'episodic_count': len(episodic_memories),
                'persistent_count': len(persistent_knowledge),
                'similar_code_count': len(similar_code)
            }
            return {
                'query': query,
                'fused_response': fused_response,
                'sources': source_counts,
                'confidence_score': self.calculate_response_confidence(fused_response),
                'timestamp': datetime.utcnow()
            }
        except Exception as exc:
            self.memory.logger.error(f"Intelligent query failed: {exc}")
            return {'error': str(exc)}

    def fuse_memory_responses(self, query: str, episodic: List, persistent: List, code: List) -> str:
        """Build one text answer from the three memory result lists.

        Up to three persistent facts with confidence above 0.8, the two most
        recent episodic responses (cut to 200 characters) and the first issue
        of up to two code matches with similarity above 0.7 are listed, in
        that order.
        """
        sections = []

        # Established knowledge first
        trusted = [fact for fact in (persistent or []) if fact.get('confidence', 0) > 0.8]
        if trusted:
            sections.append("Based on established knowledge:")
            sections.extend(f"• {fact['content']}" for fact in trusted[:3])

        # Then the newest interactions
        newest_first = sorted(
            episodic or [],
            key=lambda memory: memory.get('timestamp', datetime.min),
            reverse=True,
        )
        if newest_first:
            sections.append("\nFrom previous interactions:")
            sections.extend(
                f"• {memory.get('ai_response', '')[:200]}..." for memory in newest_first[:2]
            )

        # Finally closely matching code analyses
        close_matches = [entry for entry in (code or []) if entry.get('similarity_score', 0) > 0.7]
        if close_matches:
            sections.append("\nSimilar code patterns found:")
            for entry in close_matches[:2]:
                found = entry.get('analysis_data', {}).get('issues_found', [])
                if found:
                    sections.append(f"• {entry['file_path']}: {found[0]}")

        if not sections:
            return "No relevant memories found."
        return '\n'.join(sections)

    def calculate_response_confidence(self, response: str) -> float:
        """Score a fused response between 0.0 and 1.0.

        The base score is the word count divided by 100 (capped at 1.0); each
        section header present in the text adds a fixed bonus, and the total
        is capped at 1.0. An empty or "no memories" response scores 0.0.
        """
        if not response or response == "No relevant memories found.":
            return 0.0
        score = min(len(response.split()) / 100.0, 1.0)
        section_bonuses = (
            ("Based on established knowledge:", 0.2),
            ("From previous interactions:", 0.1),
            ("Similar code patterns found:", 0.15),
        )
        for header, bonus in section_bonuses:
            if header in response:
                score += bonus
        return min(score, 1.0)
class EnhancedGitHubAnalyzer:
"""Enhanced repository analyzer with memory capabilities."""
def __init__(self, api_key: str, memory_config: Dict[str, Any]):
    """Create the Claude client, the memory subsystem and the language table.

    Args:
        api_key: API key handed to the Anthropic client.
        memory_config: Configuration dict handed to MemoryManager.
    """
    self.client = anthropic.Anthropic(api_key=api_key)
    self.memory_manager = MemoryManager(memory_config)
    self.query_engine = MemoryQueryEngine(self.memory_manager)
    self.session_id = str(uuid.uuid4())
    # Set by clone_repository when a repository is cloned to a temp directory
    self.temp_dir = None
    # File extension -> language label used for file detection
    self.language_map = {
        '.py': 'Python',
        '.js': 'JavaScript',
        '.ts': 'TypeScript',
        '.tsx': 'TypeScript',
        '.jsx': 'JavaScript',
        '.java': 'Java',
        '.cpp': 'C++',
        '.c': 'C',
        '.cs': 'C#',
        '.go': 'Go',
        '.rs': 'Rust',
        '.php': 'PHP',
        '.rb': 'Ruby',
        '.swift': 'Swift',
        '.kt': 'Kotlin',
        '.html': 'HTML',
        '.css': 'CSS',
        '.scss': 'SCSS',
        '.sass': 'SASS',
        '.sql': 'SQL',
        '.yaml': 'YAML',
        '.yml': 'YAML',
        '.json': 'JSON',
        '.xml': 'XML',
        '.sh': 'Shell',
        '.dockerfile': 'Docker',
        '.md': 'Markdown',
        '.txt': 'Text',
    }
    # Every mapped extension counts as a code file to analyze
    self.code_extensions = set(self.language_map)
def clone_repository(self, repo_path: str) -> str:
    """Clone repository or use existing path.

    Args:
        repo_path: A local path, or anything ``git.Repo.clone_from`` accepts
            as a source when no such local path exists.

    Returns:
        ``repo_path`` itself when it exists locally, otherwise the path of
        the temporary directory the repository was cloned into (also stored
        in ``self.temp_dir``).

    Raises:
        Exception: When cloning fails; the original error is chained as the
            cause.
    """
    if os.path.exists(repo_path):
        print(f"Using existing repository: {repo_path}")
        return repo_path

    print(f"Cloning repository: {repo_path}")
    clone_dir = tempfile.mkdtemp(prefix="repo_analysis_")
    try:
        git.Repo.clone_from(repo_path, clone_dir)
    except Exception as e:
        # Do not leave the empty/partial checkout behind, and do not record
        # a directory that holds no usable clone.
        shutil.rmtree(clone_dir, ignore_errors=True)
        raise Exception(f"Failed to clone repository: {e}") from e
    self.temp_dir = clone_dir
    return clone_dir
def calculate_repo_id(self, repo_path: str) -> str:
    """Return the first 16 hex characters of the SHA-256 of *repo_path*."""
    digest = hashlib.sha256(repo_path.encode())
    hex_digest = digest.hexdigest()
    return hex_digest[:16]
def get_file_language(self, file_path: Path) -> str:
    """Get programming language from file extension.

    The extension is looked up case-insensitively in ``self.language_map``.
    Files whose extension is not mapped are then recognised by name prefix
    (Dockerfile, Makefile, CMake...), because ``scan_repository`` collects
    those files as well.

    Args:
        file_path: Path of the file to classify.

    Returns:
        The language label, or 'Unknown' when nothing matches.
    """
    language = self.language_map.get(file_path.suffix.lower())
    if language is not None:
        return language

    lowered_name = file_path.name.lower()
    name_prefixes = (
        ('dockerfile', 'Docker'),
        ('makefile', 'Makefile'),
        ('cmake', 'CMake'),
    )
    for prefix, label in name_prefixes:
        if lowered_name.startswith(prefix):
            return label
    return 'Unknown'
def calculate_complexity_score(self, content: str) -> float:
    """Calculate basic complexity score based on code patterns.

    Each line contributes one point per distinct branching keyword it
    contains. Keywords are matched as whole words, so identifiers such as
    "information" or "entry" are not counted and "elif" does not also count
    as "if".

    Args:
        content: Source text of the file.

    Returns:
        ``(1 + keyword points) / line count * 100``, capped at 10.0.
    """
    lines = content.split('\n')
    keyword_pattern = re.compile(
        r'\b(?:if|else|elif|for|while|try|except|catch|switch)\b'
    )
    complexity = 1
    for line in lines:
        # A set keeps the "once per keyword per line" counting rule.
        complexity += len(set(keyword_pattern.findall(line.lower())))
    # Density of branching keywords per line, scaled and capped at 10
    return min(complexity / max(len(lines), 1) * 100, 10.0)
async def analyze_file_with_memory(self, file_path: Path, content: str, repo_id: str) -> FileAnalysis:
    """Analyze file with memory-enhanced context.

    Similar earlier analyses and stored knowledge are added to the prompt,
    the file (cut to 4000 characters) is sent to Claude, and the reply is
    parsed into a FileAnalysis which is also stored in memory.

    Args:
        file_path: Path of the file being analysed.
        content: Text of the file.
        repo_id: Identifier of the repository the file belongs to.

    Returns:
        The analysis; when the Claude call or the parsing fails, a
        placeholder FileAnalysis describing the error (severity 5.0).
    """
    language = self.get_file_language(file_path)
    lines_of_code = len([line for line in content.split('\n') if line.strip()])
    complexity_score = self.calculate_complexity_score(content)

    # Path reported in the analysis: relative to the clone directory (or the
    # working directory). Files outside that base - e.g. an absolute path to
    # an existing repository - keep their full path; letting relative_to()
    # raise here would throw away an otherwise successful analysis.
    try:
        display_path = str(file_path.relative_to(Path(self.temp_dir or '.')))
    except ValueError:
        display_path = str(file_path)

    # Check for similar code patterns in memory
    similar_analyses = await self.memory_manager.search_similar_code(
        f"{language} {file_path.name}", repo_id, limit=3
    )
    # Get relevant knowledge from persistent memory
    persistent_knowledge = await self.memory_manager.retrieve_persistent_memories(
        f"{language} code quality security", category="", limit=5
    )

    # Build enhanced context for analysis
    context_info = ""
    if similar_analyses:
        context_info += f"\nSimilar files previously analyzed:\n"
        for similar in similar_analyses[:2]:
            context_info += f"- {similar['file_path']}: Found {len(similar.get('analysis_data', {}).get('issues_found', []))} issues\n"
    if persistent_knowledge:
        context_info += f"\nRelevant best practices:\n"
        for knowledge in persistent_knowledge[:3]:
            context_info += f"- {knowledge['content'][:100]}...\n"

    # Truncate content if too long
    if len(content) > 4000:
        content = content[:4000] + "\n... [truncated for analysis]"
    print(f" Analyzing {file_path.name} ({language}, {lines_of_code} lines)")

    # Create comprehensive analysis prompt with memory context
    prompt = "\n".join([
        "",
        f"You are a senior software engineer with 25+ years of experience. Analyze this {language} code file with context from previous analyses.",
        f"FILENAME: {file_path.name}",
        f"LANGUAGE: {language}",
        f"LINES OF CODE: {lines_of_code}",
        f"{context_info}",
        "CODE:",
        f"```{language.lower()}",
        f"{content}",
        "```",
        "Provide a comprehensive analysis covering:",
        "1. ISSUES FOUND: List specific problems, bugs, security vulnerabilities, or code smells",
        "2. RECOMMENDATIONS: Actionable suggestions for improvement",
        "3. CODE QUALITY: Overall assessment of code quality and maintainability",
        "4. SECURITY: Any security concerns or vulnerabilities",
        "5. PERFORMANCE: Potential performance issues or optimizations",
        "6. BEST PRACTICES: Adherence to coding standards and best practices",
        "Rate the overall code quality from 1-10 where 10 is excellent.",
        "ANALYSIS:",
        "",
    ])

    try:
        message = self.client.messages.create(
            model="claude-3-5-sonnet-20240620",
            max_tokens=3000,
            temperature=0.1,
            messages=[{"role": "user", "content": prompt}]
        )
        analysis_text = message.content[0].text.strip()
        # Extract the first "<number>/10" rating from the analysis
        severity_match = re.search(r'(\d+(?:\.\d+)?)/10', analysis_text)
        severity_score = float(severity_match.group(1)) if severity_match else 5.0
        # Parse issues and recommendations from the text
        issues = self.extract_issues_from_analysis(analysis_text)
        recommendations = self.extract_recommendations_from_analysis(analysis_text)
        # Create file analysis object
        file_analysis = FileAnalysis(
            path=display_path,
            language=language,
            lines_of_code=lines_of_code,
            complexity_score=complexity_score,
            issues_found=issues,
            recommendations=recommendations,
            detailed_analysis=analysis_text,
            severity_score=severity_score
        )
        # Store analysis in memory for future reference
        await self.memory_manager.store_code_analysis(
            repo_id, str(file_analysis.path), asdict(file_analysis)
        )
        # Extract knowledge for persistent memory
        await self.extract_knowledge_from_analysis(file_analysis, repo_id)
        return file_analysis
    except Exception as e:
        print(f" Error analyzing {file_path.name}: {e}")
        return FileAnalysis(
            path=str(file_path),
            language=language,
            lines_of_code=lines_of_code,
            complexity_score=complexity_score,
            issues_found=[f"Analysis failed: {str(e)}"],
            recommendations=["Review file manually due to analysis error"],
            detailed_analysis=f"Analysis failed due to error: {str(e)}",
            severity_score=5.0
        )
def extract_issues_from_analysis(self, analysis_text: str) -> List[str]:
    """Return up to ten lines of the analysis that mention an issue keyword.

    Lines are stripped; blank lines and lines starting with '#' are skipped.
    Matching is case-insensitive.
    """
    issue_keywords = ('issue', 'problem', 'bug', 'vulnerability', 'error', 'warning', 'concern')
    candidates = (raw.strip() for raw in analysis_text.split('\n'))
    matches = [
        text for text in candidates
        if text
        and not text.startswith('#')
        and any(keyword in text.lower() for keyword in issue_keywords)
    ]
    # Limit to top 10 issues
    return matches[:10]
def extract_recommendations_from_analysis(self, analysis_text: str) -> List[str]:
    """Return up to ten lines of the analysis that mention a recommendation keyword.

    Lines are stripped; blank lines and lines starting with '#' are skipped.
    Matching is case-insensitive.
    """
    rec_keywords = ('recommend', 'suggest', 'should', 'consider', 'improve')
    candidates = (raw.strip() for raw in analysis_text.split('\n'))
    matches = [
        text for text in candidates
        if text
        and not text.startswith('#')
        and any(keyword in text.lower() for keyword in rec_keywords)
    ]
    # Limit to top 10 recommendations
    return matches[:10]
async def extract_knowledge_from_analysis(self, file_analysis: FileAnalysis, repo_id: str):
    """Store reusable facts from one file analysis in persistent memory.

    Security-related issues, best-practice recommendations and - when the
    severity score is below 5 - the start of the detailed analysis are each
    stored as a persistent memory. Failures are logged, not raised.
    """
    security_terms = ('security', 'vulnerability', 'injection', 'xss', 'auth')
    practice_terms = ('best practice', 'standard', 'convention')
    try:
        store = self.memory_manager.store_persistent_memory
        language = file_analysis.language

        # Security-related knowledge
        for issue in file_analysis.issues_found:
            if not any(term in issue.lower() for term in security_terms):
                continue
            await store(
                content=f"Security issue in {language}: {issue}",
                category='security_vulnerability',
                confidence=0.8,
                source_repos=[repo_id]
            )

        # Best practices
        for recommendation in file_analysis.recommendations:
            if not any(term in recommendation.lower() for term in practice_terms):
                continue
            await store(
                content=f"{language} best practice: {recommendation}",
                category='best_practice',
                confidence=0.7,
                source_repos=[repo_id]
            )

        # Code patterns from poorly rated files
        if file_analysis.severity_score < 5:
            await store(
                content=f"Low quality {language} pattern: {file_analysis.detailed_analysis[:200]}",
                category='code_pattern',
                confidence=0.6,
                source_repos=[repo_id]
            )
    except Exception as exc:
        self.memory_manager.logger.error(f"Knowledge extraction failed: {exc}")
def scan_repository(self, repo_path: str) -> List[Tuple[Path, str]]:
    """Walk ``repo_path`` and read every file selected for analysis.

    Directories whose name starts with '.' and common build/cache/vendor
    directories are not descended into. A file is selected when its name
    is one of the well-known project files, its suffix is in
    ``self.code_extensions``, or its name starts with 'dockerfile',
    'makefile' or 'cmake' (all comparisons are case-insensitive). Files
    larger than 2,000,000 bytes, files that cannot be stat'ed or read, and
    files that contain only whitespace are left out.

    Args:
        repo_path: Root directory to scan.

    Returns:
        A list of ``(path, text)`` pairs; text is decoded as UTF-8 with
        undecodable bytes dropped.
    """
    print(f"Scanning repository: {repo_path}")
    # Stored in lowercase because candidates are compared via file.lower();
    # mixed-case entries such as 'README.md' or 'Gemfile' could never match.
    important_files = frozenset({
        'readme.md', 'package.json', 'requirements.txt', 'dockerfile',
        'docker-compose.yml', 'tsconfig.json', 'next.config.js',
        'tailwind.config.js', 'webpack.config.js', '.env.example',
        'cargo.toml', 'pom.xml', 'build.gradle', 'composer.json',
        'gemfile', 'go.mod', 'yarn.lock', 'pnpm-lock.yaml',
    })
    # Dot-directories (.git, .next, .gradle, ...) are excluded by the
    # startswith('.') test below, so only non-dot names are listed here.
    skipped_dirs = frozenset({
        'node_modules', '__pycache__', 'build', 'dist', 'target',
        'venv', 'env', 'coverage', 'vendor', 'bower_components',
    })
    special_prefixes = ('dockerfile', 'makefile', 'cmake')
    max_bytes = 2000000  # 2MB limit

    files_to_analyze = []
    for root, dirs, files in os.walk(repo_path):
        # Prune in place so os.walk does not descend into skipped directories.
        dirs[:] = [d for d in dirs
                   if not d.startswith('.') and d not in skipped_dirs]
        for file in files:
            file_path = Path(root) / file
            try:
                size = file_path.stat().st_size
            except OSError:
                # e.g. broken symlink or permission problem: skip the file.
                continue
            if size > max_bytes:
                print(f" Skipping large file: {file_path.name} ({size / 1024 / 1024:.1f}MB)")
                continue

            lowered = file.lower()
            should_include = (
                lowered in important_files
                or file_path.suffix.lower() in self.code_extensions
                or lowered.startswith(special_prefixes)
            )
            if not should_include:
                continue

            try:
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()
            except OSError as e:
                print(f"Could not read {file_path}: {e}")
                continue
            if content.strip():  # Only non-empty files
                files_to_analyze.append((file_path, content))

    print(f"Found {len(files_to_analyze)} files to analyze")
    return files_to_analyze
async def analyze_repository_with_memory(self, repo_path: str) -> RepositoryAnalysis:
    """Run the full analysis pipeline for one repository.

    Steps: look for a cached result in working memory; otherwise obtain
    the repository via ``clone_repository``, scan its files, analyze each
    file, run the repository-level architecture/security analysis, build
    the executive summary, record the run in episodic memory and cache the
    result in working memory for two hours.

    Args:
        repo_path: Repository location as given by the caller.

    Returns:
        The populated ``RepositoryAnalysis``.

    Raises:
        Exception: If the scan finds no files to analyze. Exceptions from
            the steps above propagate unchanged.

    ``self.temp_dir`` is removed in all cases when it is set and exists.
    """
    try:
        # Generate repo ID and check for cached analysis
        repo_id = self.calculate_repo_id(repo_path)
        cached_analysis = await self.memory_manager.get_working_memory(f"repo_analysis:{repo_id}")
        if cached_analysis:
            print("Using cached repository analysis from memory")
            # The cache is written with asdict(), which turns nested
            # dataclass instances into plain dicts. Rebuild the per-file
            # entries so callers can keep using attribute access
            # (fa.severity_score, fa.issues_found, ...).
            # NOTE(review): assumes FileAnalysis is a dataclass whose
            # fields are all constructor arguments - confirm.
            restored = dict(cached_analysis)
            restored['file_analyses'] = [
                FileAnalysis(**entry) if isinstance(entry, dict) else entry
                for entry in (restored.get('file_analyses') or [])
            ]
            return RepositoryAnalysis(**restored)

        # Clone/access repository
        actual_repo_path = self.clone_repository(repo_path)
        # Get analysis context from memory (no user query needed)
        context_memories = await self.get_analysis_context(repo_path, "", repo_id)

        # Scan ALL files
        files_to_analyze = self.scan_repository(actual_repo_path)
        if not files_to_analyze:
            raise Exception("No files found to analyze")

        # Analyze each file with memory context
        print(f"Starting comprehensive analysis of {len(files_to_analyze)} files...")
        file_analyses = []
        for i, (file_path, content) in enumerate(files_to_analyze):
            print(f"Analyzing file {i+1}/{len(files_to_analyze)}: {file_path.name}")
            analysis = await self.analyze_file_with_memory(file_path, content, repo_id)
            file_analyses.append(analysis)
            # Small delay to avoid rate limiting
            await asyncio.sleep(0.1)

        # Repository-level analyses with memory context
        print("Performing repository-level analysis with memory context...")
        architecture_assessment, security_assessment = await self.analyze_repository_overview_with_memory(
            actual_repo_path, file_analyses, context_memories, repo_id
        )

        # file_analyses is non-empty here because files_to_analyze was.
        avg_quality = sum(fa.severity_score for fa in file_analyses) / len(file_analyses)
        languages = dict(Counter(fa.language for fa in file_analyses))
        total_lines = sum(fa.lines_of_code for fa in file_analyses)
        total_issues = sum(len(fa.issues_found) for fa in file_analyses)

        # Create repository analysis
        repo_analysis = RepositoryAnalysis(
            repo_path=repo_path,
            total_files=len(file_analyses),
            total_lines=total_lines,
            languages=languages,
            architecture_assessment=architecture_assessment,
            security_assessment=security_assessment,
            code_quality_score=avg_quality,
            file_analyses=file_analyses,
            executive_summary=""
        )

        # Generate executive summary with memory context
        print("Generating memory-enhanced executive summary...")
        repo_analysis.executive_summary = await self.generate_executive_summary_with_memory(
            repo_analysis, context_memories
        )

        # Store analysis in episodic memory (automated analysis)
        await self.memory_manager.store_episodic_memory(
            self.session_id, "Complete automated repository analysis",
            f"Analyzed {repo_analysis.total_files} files, found {total_issues} issues",
            repo_id,
            {
                'repo_path': repo_path,
                'quality_score': avg_quality,
                'total_issues': total_issues,
                'analysis_type': 'automated_comprehensive'
            }
        )

        # Cache analysis in working memory
        await self.memory_manager.store_working_memory(
            f"repo_analysis:{repo_id}",
            asdict(repo_analysis),
            ttl=7200  # 2 hours
        )
        return repo_analysis
    finally:
        # Cleanup
        if self.temp_dir and os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)
            print("Temporary files cleaned up")
async def get_analysis_context(self, repo_path: str, user_query: str, repo_id: str) -> Dict[str, List]:
    """Fetch memory context used by the repository-level prompts.

    Queries the memory manager for up to 15 persistent-knowledge entries
    and up to 10 similar code analyses. ``repo_path`` and ``user_query``
    are accepted but not used here, and 'episodic_memories' is always
    returned as an empty list.
    """
    knowledge = await self.memory_manager.retrieve_persistent_memories(
        "code quality security best practices", limit=15
    )
    related = await self.memory_manager.search_similar_code(
        "repository analysis", repo_id, limit=10
    )
    return {
        'episodic_memories': [],
        'persistent_knowledge': knowledge,
        'similar_analyses': related,
    }
async def analyze_repository_overview_with_memory(self, repo_path: str, file_analyses: List[FileAnalysis],
                                                  context_memories: Dict, repo_id: str) -> Tuple[str, str]:
    """Produce the architecture and security assessments for a repository.

    Builds two prompts from the per-file results, a shallow listing of the
    directory tree and the supplied memory context, and sends each to the
    model through ``self.client.messages.create``.

    Args:
        repo_path: Directory whose structure is listed in the prompt.
        file_analyses: Per-file analysis results.
        context_memories: Mapping with 'persistent_knowledge' and
            'similar_analyses' lists.
        repo_id: Identifier attached to the stored architecture insight.

    Returns:
        ``(architecture_assessment, security_assessment)``. Each request
        is handled independently: if one fails, only that element is
        replaced by a "... analysis failed: <error>" message.
    """
    print("Analyzing repository overview with memory context...")
    # Prepare summary data
    languages = dict(Counter(fa.language for fa in file_analyses))
    total_lines = sum(fa.lines_of_code for fa in file_analyses)
    avg_quality = sum(fa.severity_score for fa in file_analyses) / len(file_analyses) if file_analyses else 5.0

    # Build memory context
    memory_context = ""
    if context_memories['persistent_knowledge']:
        memory_context += "Relevant knowledge from previous analyses:\n"
        for knowledge in context_memories['persistent_knowledge'][:3]:
            memory_context += f"- {knowledge['content']}\n"
    if context_memories['similar_analyses']:
        memory_context += "\nSimilar repositories analyzed:\n"
        for similar in context_memories['similar_analyses'][:2]:
            memory_context += f"- {similar['file_path']}: {len(similar.get('analysis_data', {}).get('issues_found', []))} issues found\n"

    # Get repository structure
    structure_lines = []
    try:
        for root, dirs, files in os.walk(repo_path):
            dirs[:] = [d for d in dirs if not d.startswith('.') and d not in {'node_modules', '__pycache__'}]
            level = root.replace(repo_path, '').count(os.sep)
            indent = ' ' * level
            structure_lines.append(f"{indent}{os.path.basename(root)}/")
            for file in files[:3]:  # Limit files shown per directory
                structure_lines.append(f"{indent} {file}")
            if len(structure_lines) > 50:  # Limit total structure size
                break
    except Exception as e:
        structure_lines = [f"Error reading structure: {e}"]
    structure_text = chr(10).join(structure_lines[:30])

    # The prompt section is titled "TOP FILE ISSUES", so list the files with
    # the most issues rather than whichever ten were analyzed first.
    most_problematic = sorted(file_analyses, key=lambda fa: len(fa.issues_found), reverse=True)[:10]
    top_issues_text = chr(10).join(
        f"- {fa.path}: {len(fa.issues_found)} issues" for fa in most_problematic
    )

    # Architecture analysis with memory context
    arch_prompt = f"""
You are a Senior Software Architect with 25+ years of experience.
{memory_context}
Analyze this repository:
REPOSITORY STRUCTURE:
{structure_text}
STATISTICS:
- Total files analyzed: {len(file_analyses)}
- Total lines of code: {total_lines:,}
- Languages: {languages}
- Average code quality: {avg_quality:.1f}/10
TOP FILE ISSUES:
{top_issues_text}
Provide an architectural assessment covering:
1. Project type and purpose
2. Technology stack evaluation
3. Code organization and structure
4. Scalability and maintainability concerns
5. Key recommendations for improvement
Incorporate insights from the memory context provided above.
Keep response under 1500 words and focus on actionable insights.
"""

    # Security analysis with memory context
    security_keywords = ('security', 'vulnerability', 'injection', 'xss', 'auth', 'password')
    security_issues = [
        issue
        for fa in file_analyses
        for issue in fa.issues_found
        if any(keyword in issue.lower() for keyword in security_keywords)
    ]
    security_issues_text = (
        chr(10).join(security_issues[:20]) if security_issues
        else "No obvious security issues detected"
    )
    high_risk_languages = [
        lang for lang in languages
        if lang in ['JavaScript', 'TypeScript', 'Python', 'PHP', 'SQL']
    ]
    sec_prompt = f"""
You are a Senior Security Engineer with 20+ years of experience.
{memory_context}
Security Analysis for repository with {len(file_analyses)} files:
SECURITY ISSUES FOUND:
{security_issues_text}
HIGH-RISK FILE TYPES PRESENT:
{high_risk_languages}
Provide security assessment covering:
1. Overall security posture
2. Main security risks and vulnerabilities
3. Authentication and authorization concerns
4. Data protection and privacy issues
5. Immediate security priorities
Incorporate insights from the memory context provided above.
Keep response under 1000 words and focus on actionable security recommendations.
"""

    # Each step gets its own error handling so that one failure does not
    # throw away results that were already obtained successfully.
    architecture_ok = True
    try:
        arch_response = self.client.messages.create(
            model="claude-3-5-sonnet-20240620",
            max_tokens=2000,
            temperature=0.1,
            messages=[{"role": "user", "content": arch_prompt}]
        )
        architecture_assessment = arch_response.content[0].text
    except Exception as e:
        architecture_ok = False
        architecture_assessment = f"Architecture analysis failed: {e}"

    try:
        sec_response = self.client.messages.create(
            model="claude-3-5-sonnet-20240620",
            max_tokens=1500,
            temperature=0.1,
            messages=[{"role": "user", "content": sec_prompt}]
        )
        security_assessment = sec_response.content[0].text
    except Exception as e:
        security_assessment = f"Security analysis failed: {e}"

    # Store insights as persistent knowledge; never store a failure message,
    # and never let a storage problem discard the assessments.
    if architecture_ok:
        try:
            await self.memory_manager.store_persistent_memory(
                content=f"Architecture pattern: {architecture_assessment[:300]}...",
                category='architecture',
                confidence=0.7,
                source_repos=[repo_id]
            )
        except Exception as e:
            self.memory_manager.logger.error(f"Architecture knowledge storage failed: {e}")

    return architecture_assessment, security_assessment
async def generate_executive_summary_with_memory(self, analysis: RepositoryAnalysis, context_memories: Dict) -> str:
    """Ask the model for a business-oriented summary of the analysis.

    The prompt carries the repository metrics, issue counts and, when
    present, excerpts of up to two earlier episodic memories whose
    response mentions 'executive'. Returns the model's text, or an
    "Executive summary generation failed: ..." message if the request
    raises.
    """
    print("Generating executive summary with memory context...")

    # Build memory context for executive summary
    context_parts = []
    if context_memories['episodic_memories']:
        context_parts.append("Previous executive discussions:\n")
        for past in context_memories['episodic_memories'][:2]:
            if 'executive' in past.get('ai_response', '').lower():
                context_parts.append(f"- {past['ai_response'][:200]}...\n")
    executive_context = "".join(context_parts)

    # Headline counts interpolated into the prompt below.
    issue_total = sum(len(fa.issues_found) for fa in analysis.file_analyses)
    attention_count = sum(1 for fa in analysis.file_analyses if fa.severity_score < 7)
    strong_count = sum(1 for fa in analysis.file_analyses if fa.severity_score >= 8)

    prompt = f"""
You are presenting to C-level executives. Create an executive summary of this technical analysis.
{executive_context}
REPOSITORY METRICS:
- Total Files: {analysis.total_files}
- Lines of Code: {analysis.total_lines:,}
- Languages: {analysis.languages}
- Code Quality Score: {analysis.code_quality_score:.1f}/10
KEY FINDINGS:
- Total issues identified: {issue_total}
- Files needing attention: {attention_count}
- High-quality files: {strong_count}
Create an executive summary for non-technical leadership covering:
1. Business impact of code quality findings
2. Risk assessment and implications
3. Investment priorities and recommendations
4. Expected ROI from addressing technical debt
5. Competitive implications
Focus on business outcomes, not technical details. Keep under 800 words.
"""
    try:
        reply = self.client.messages.create(
            model="claude-3-5-sonnet-20240620",
            max_tokens=1200,
            temperature=0.1,
            messages=[{"role": "user", "content": prompt}]
        )
        return reply.content[0].text
    except Exception as e:
        return f"Executive summary generation failed: {e}"
def create_pdf_report(self, analysis: RepositoryAnalysis, output_path: str):
    """Write the analysis to ``output_path`` as an A4 PDF.

    The document contains a title page, the executive summary and a
    repository overview table. A failure while building the PDF is
    printed and not raised.

    Args:
        analysis: Results to render.
        output_path: Destination file for the PDF.
    """
    # Local import: only this method needs it.
    from xml.sax.saxutils import escape

    def as_markup(text) -> str:
        # Paragraph parses its text as XML-like markup, so free text (model
        # output, paths) must have &, < and > escaped or the build fails.
        # Line breaks are kept by turning them into <br/> tags.
        return escape(str(text)).replace('\n', '<br/>')

    print(f"Generating PDF report: {output_path}")
    doc = SimpleDocTemplate(output_path, pagesize=A4,
                            leftMargin=72, rightMargin=72,
                            topMargin=72, bottomMargin=72)
    styles = getSampleStyleSheet()
    story = []

    # Custom styles
    title_style = ParagraphStyle(
        'CustomTitle',
        parent=styles['Heading1'],
        fontSize=24,
        textColor=colors.darkblue,
        spaceAfter=30,
        alignment=TA_CENTER
    )
    heading_style = ParagraphStyle(
        'CustomHeading',
        parent=styles['Heading2'],
        fontSize=16,
        textColor=colors.darkblue,
        spaceBefore=20,
        spaceAfter=10
    )

    # Title Page
    story.append(Paragraph("AI-Enhanced Repository Analysis Report", title_style))
    story.append(Spacer(1, 20))
    story.append(Paragraph(f"Repository: {as_markup(analysis.repo_path)}", styles['Normal']))
    story.append(Paragraph(f"Analysis Date: {datetime.now().strftime('%B %d, %Y at %H:%M')}", styles['Normal']))
    story.append(Paragraph("Generated by: Enhanced AI Analysis System with Memory", styles['Normal']))
    story.append(PageBreak())

    # Executive Summary
    story.append(Paragraph("Executive Summary", heading_style))
    story.append(Paragraph(as_markup(analysis.executive_summary), styles['Normal']))
    story.append(PageBreak())

    # Repository Overview
    story.append(Paragraph("Repository Overview", heading_style))
    overview_data = [
        ['Metric', 'Value'],
        ['Total Files Analyzed', str(analysis.total_files)],
        ['Total Lines of Code', f"{analysis.total_lines:,}"],
        ['Primary Languages', ', '.join(list(analysis.languages.keys())[:5])],
        ['Overall Code Quality', f"{analysis.code_quality_score:.1f}/10"],
    ]
    # Size the columns from the usable page width so the table stays inside
    # the margins (fixed 200 + 300 points is wider than A4 minus margins).
    overview_table = Table(overview_data, colWidths=[doc.width * 0.4, doc.width * 0.6])
    overview_table.setStyle(TableStyle([
        ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
        ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
        ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
        ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
        ('FONTSIZE', (0, 0), (-1, 0), 12),
        ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
        ('BACKGROUND', (0, 1), (-1, -1), colors.beige),
        ('GRID', (0, 0), (-1, -1), 1, colors.black)
    ]))
    story.append(overview_table)
    story.append(Spacer(1, 20))

    # Build PDF
    try:
        doc.build(story)
        print(f"✅ PDF report generated successfully: {output_path}")
    except Exception as e:
        print(f"❌ Error generating PDF: {e}")
async def query_memory(self, query: str, repo_context: str = "") -> Dict[str, Any]:
    """Forward a query to ``self.query_engine.intelligent_query`` and return its result."""
    answer = await self.query_engine.intelligent_query(query, repo_context)
    return answer
def get_memory_config() -> Dict[str, Any]:
    """Build the memory-system configuration from environment variables.

    Every setting falls back to a built-in default when its variable is
    unset. Integer settings (ports and the Redis DB index) also fall back
    when the variable is set but blank, as happens with ``NAME=`` lines in
    a .env file.

    Returns:
        Mapping with the Anthropic key and the Redis, MongoDB and
        PostgreSQL connection settings.

    Raises:
        ValueError: If an integer setting holds a non-numeric value; the
            message names the offending variable.
    """
    def env_int(name: str, default: int) -> int:
        # Read an integer setting, treating unset/blank as "use the default".
        raw = os.getenv(name)
        if raw is None or not raw.strip():
            return default
        try:
            return int(raw)
        except ValueError:
            raise ValueError(
                f"Environment variable {name} must be an integer, got {raw!r}"
            ) from None

    return {
        'anthropic_api_key': os.getenv('ANTHROPIC_API_KEY', ''),
        'redis_host': os.getenv('REDIS_HOST', 'localhost'),
        'redis_port': env_int('REDIS_PORT', 6379),
        'redis_db': env_int('REDIS_DB', 0),
        'mongodb_url': os.getenv('MONGODB_URL', 'mongodb://localhost:27017/'),
        'mongodb_name': os.getenv('MONGODB_DB', 'repo_analyzer'),
        'postgres_host': os.getenv('POSTGRES_HOST', 'localhost'),
        'postgres_port': env_int('POSTGRES_PORT', 5432),
        'postgres_db': os.getenv('POSTGRES_DB', 'repo_vectors'),
        'postgres_user': os.getenv('POSTGRES_USER', 'postgres'),
        'postgres_password': os.getenv('POSTGRES_PASSWORD', '')
    }
async def main():
    """Command-line entry point: analyze a repository and write a PDF report.

    Parses the repository path, output path and optional API key, runs the
    analyzer, writes the PDF and prints a summary to the console.

    Returns:
        0 on success; 1 when no API key is available or the analysis raises.
    """
    load_dotenv()
    import argparse
    cli = argparse.ArgumentParser(description="Complete AI Repository Analysis - Analyzes ALL files automatically")
    cli.add_argument("repo_path", help="Repository path (local directory or Git URL)")
    cli.add_argument("--output", "-o", default="complete_repository_analysis.pdf",
                     help="Output PDF file path")
    cli.add_argument("--api-key", help="Anthropic API key (overrides .env)")
    args = cli.parse_args()

    # Command-line key wins over the environment.
    api_key = args.api_key or os.getenv('ANTHROPIC_API_KEY')
    if not api_key:
        print("❌ Error: ANTHROPIC_API_KEY not found in .env file or command line")
        return 1

    rule = "=" * 60
    try:
        print("🚀 Starting Complete AI Repository Analysis")
        print(rule)
        print(f"Repository: {args.repo_path}")
        print(f"Output: {args.output}")
        print("Mode: Complete automated analysis of ALL files")
        print(rule)

        # Initialize enhanced analyzer
        analyzer = EnhancedGitHubAnalyzer(api_key, get_memory_config())
        # Perform complete analysis, then render the report
        analysis = await analyzer.analyze_repository_with_memory(args.repo_path)
        analyzer.create_pdf_report(analysis, args.output)

        # Console summary
        print("\n" + rule)
        print("🎯 COMPLETE ANALYSIS FINISHED")
        print(rule)
        print("📊 Repository Statistics:")
        print(f" • Files Analyzed: {analysis.total_files}")
        print(f" • Lines of Code: {analysis.total_lines:,}")
        print(f" • Languages: {len(analysis.languages)}")
        print(f" • Code Quality: {analysis.code_quality_score:.1f}/10")

        # Quality breakdown by score band
        scores = [fa.severity_score for fa in analysis.file_analyses]
        high_quality = sum(1 for score in scores if score >= 8)
        medium_quality = sum(1 for score in scores if 5 <= score < 8)
        low_quality = sum(1 for score in scores if score < 5)
        issue_total = sum(len(fa.issues_found) for fa in analysis.file_analyses)
        print("\n📈 Quality Breakdown:")
        print(f" • High Quality Files (8-10): {high_quality}")
        print(f" • Medium Quality Files (5-7): {medium_quality}")
        print(f" • Low Quality Files (1-4): {low_quality}")
        print(f" • Total Issues Found: {issue_total}")

        # Language breakdown: ten most frequent languages
        print("\n🔤 Language Distribution:")
        ranked_languages = sorted(analysis.languages.items(), key=lambda pair: pair[1], reverse=True)
        for lang, count in ranked_languages[:10]:
            print(f" • {lang}: {count} files")

        # Memory system stats
        memory_stats = await analyzer.memory_manager.get_memory_stats()
        print("\n🧠 Memory System Statistics:")
        for category, data in memory_stats.items():
            print(f" • {category.replace('_', ' ').title()}: {data}")

        print(f"\n📄 Complete PDF Report: {args.output}")
        print("\n✅ Complete analysis finished successfully!")
        return 0
    except Exception as e:
        print(f"❌ Error during analysis: {e}")
        import traceback
        traceback.print_exc()
        return 1
if __name__ == "__main__":
    # Use SystemExit directly: the bare exit() helper is installed by the
    # site module for interactive use and is not guaranteed to exist
    # (e.g. when Python runs with -S). main() returns the exit status.
    raise SystemExit(asyncio.run(main()))