#!/usr/bin/env python3
"""
Enhanced Chunking System for AI Analysis Service

Implements intelligent file chunking with zero disruption to existing flows.

Author: Senior Engineer (20+ years experience)
Version: 1.0.0
"""

import re
import os
import json
import hashlib
import asyncio
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
from pathlib import Path
import logging


@dataclass
class ChunkInfo:
    """Information about a file chunk."""
    chunk_id: int
    content: str
    start_line: int
    end_line: int
    chunk_type: str  # 'function', 'class', 'import', 'main', 'utility'
    context: str
    is_complete: bool
    tokens_estimate: int
    language: str = "Unknown"  # Programming language of the chunk


@dataclass
class ChunkAnalysis:
    """Analysis result for a single chunk."""
    chunk_id: int
    issues_found: List[str]
    recommendations: List[str]
    severity_score: float
    detailed_analysis: str
    chunk_type: str
    context: str


@dataclass
class FileChunkingResult:
    """Result of chunking a file."""
    file_path: str
    language: str
    total_chunks: int
    chunks: List[ChunkInfo]
    is_chunked: bool
    original_tokens: int
    chunked_tokens: int
    savings_percentage: float


class IntelligentChunker:
    """
    Intelligent file chunking system that breaks large files into semantic
    chunks while preserving context and relationships.
    """

    def __init__(self, max_tokens_per_chunk: int = 4000, overlap_lines: int = 5):
        self.max_tokens = max_tokens_per_chunk
        self.overlap_lines = overlap_lines
        self.logger = logging.getLogger(__name__)

        # Language-specific patterns for intelligent chunking
        self.language_patterns = {
            'python': {
                'function': r'^def\s+\w+',
                'class': r'^class\s+\w+',
                'import': r'^(import|from)\s+',
                'comment': r'^\s*#',
                'docstring': r'^\s*""".*"""'
            },
            'javascript': {
                'function': r'^(function\s+\w+|const\s+\w+\s*=\s*(async\s+)?\(|export\s+(function|const))',
                'class': r'^class\s+\w+',
                'import': r'^(import|const\s+\w+\s*=\s*require)',
                'comment': r'^\s*//',
                'jsdoc': r'^\s*/\*\*'
            },
            'typescript': {
                'function': r'^(function\s+\w+|const\s+\w+\s*=\s*(async\s+)?\(|export\s+(function|const))',
                'class': r'^class\s+\w+',
                'interface': r'^interface\s+\w+',
                'import': r'^(import|const\s+\w+\s*=\s*require)',
                'comment': r'^\s*//',
                'jsdoc': r'^\s*/\*\*'
            },
            'java': {
                'function': r'^\s*(public|private|protected)?\s*(static\s+)?\w+\s+\w+\s*\(',
                'class': r'^class\s+\w+',
                'import': r'^import\s+',
                'comment': r'^\s*//',
                'javadoc': r'^\s*/\*\*'
            },
            'cpp': {
                'function': r'^\w+\s+\w+\s*\(',
                'class': r'^class\s+\w+',
                'include': r'^#include\s*<',
                'comment': r'^\s*//',
                'block_comment': r'^\s*/\*'
            }
        }

    def estimate_tokens(self, text: str) -> int:
        """Estimate token count for text (rough approximation)."""
        return len(text) // 4

    def detect_language(self, file_path: str) -> str:
        """Detect programming language from file extension."""
        ext = Path(file_path).suffix.lower()
        language_map = {
            '.py': 'python',
            '.js': 'javascript',
            '.ts': 'typescript',
            '.tsx': 'typescript',
            '.jsx': 'javascript',
            '.java': 'java',
            '.cpp': 'cpp',
            '.c': 'cpp',
            '.cs': 'csharp',
            '.go': 'go',
            '.rs': 'rust',
            '.php': 'php',
            '.rb': 'ruby'
        }
        return language_map.get(ext, 'unknown')

    def chunk_file(self, file_path: str, content: str) -> FileChunkingResult:
        """
        Intelligently chunk a file based on its programming language and structure.
""" language = self.detect_language(file_path) lines = content.split('\n') original_tokens = self.estimate_tokens(content) # If file is small enough, don't chunk if original_tokens <= self.max_tokens: return FileChunkingResult( file_path=file_path, language=language, total_chunks=1, chunks=[ChunkInfo( chunk_id=0, content=content, start_line=0, end_line=len(lines), chunk_type='complete', context='', is_complete=True, tokens_estimate=original_tokens, language=language )], is_chunked=False, original_tokens=original_tokens, chunked_tokens=original_tokens, savings_percentage=0.0 ) # Chunk the file intelligently chunks = self._chunk_by_language(content, language, file_path) # Calculate savings chunked_tokens = sum(chunk.tokens_estimate for chunk in chunks) savings = max(0, (original_tokens - chunked_tokens) / original_tokens * 100) return FileChunkingResult( file_path=file_path, language=language, total_chunks=len(chunks), chunks=chunks, is_chunked=True, original_tokens=original_tokens, chunked_tokens=chunked_tokens, savings_percentage=savings ) def _chunk_by_language(self, content: str, language: str, file_path: str) -> List[ChunkInfo]: """Chunk file based on language-specific patterns.""" lines = content.split('\n') patterns = self.language_patterns.get(language, self.language_patterns['python']) chunks = [] current_chunk = [] current_tokens = 0 chunk_id = 0 start_line = 0 # Extract imports and global declarations first imports, main_content = self._extract_imports(lines, patterns) if imports: chunks.append(ChunkInfo( chunk_id=chunk_id, content='\n'.join(imports), start_line=0, end_line=len(imports), chunk_type='import', context='File imports and global declarations', is_complete=True, tokens_estimate=self.estimate_tokens('\n'.join(imports)), language=language )) chunk_id += 1 # Process main content for i, line in enumerate(main_content): current_chunk.append(line) current_tokens += self.estimate_tokens(line) # Check if we should create a chunk should_chunk = ( current_tokens >= self.max_tokens or self._is_logical_boundary(line, patterns) or i == len(main_content) - 1 ) if should_chunk and current_chunk: # Determine chunk type chunk_type = self._determine_chunk_type(current_chunk, patterns) context = self._generate_context(current_chunk, chunk_type, language) chunks.append(ChunkInfo( chunk_id=chunk_id, content='\n'.join(current_chunk), start_line=start_line, end_line=start_line + len(current_chunk), chunk_type=chunk_type, context=context, is_complete=False, tokens_estimate=current_tokens, language=language )) # Prepare for next chunk with overlap overlap = current_chunk[-self.overlap_lines:] if len(current_chunk) > self.overlap_lines else [] current_chunk = overlap current_tokens = self.estimate_tokens('\n'.join(overlap)) start_line += len(current_chunk) - len(overlap) chunk_id += 1 return chunks def _extract_imports(self, lines: List[str], patterns: Dict[str, str]) -> Tuple[List[str], List[str]]: """Extract import statements and return them separately.""" imports = [] main_content = [] for line in lines: if re.match(patterns.get('import', r'^(import|from)'), line.strip()): imports.append(line) else: main_content.append(line) return imports, main_content def _is_logical_boundary(self, line: str, patterns: Dict[str, str]) -> bool: """Check if line represents a logical boundary for chunking.""" line_stripped = line.strip() # Function/class definitions if (re.match(patterns.get('function', r'^def\s+'), line_stripped) or re.match(patterns.get('class', r'^class\s+'), line_stripped)): return True # 
        if (re.match(patterns.get('comment', r'^\s*#'), line_stripped) and
                len(line_stripped) > 50):  # Significant comment
            return True

        return False

    def _determine_chunk_type(self, chunk_lines: List[str], patterns: Dict[str, str]) -> str:
        """Determine the type of chunk based on its content."""
        content = '\n'.join(chunk_lines)

        if re.search(patterns.get('function', r'^def\s+'), content, re.MULTILINE):
            return 'function'
        elif re.search(patterns.get('class', r'^class\s+'), content, re.MULTILINE):
            return 'class'
        elif re.search(patterns.get('import', r'^(import|from)'), content, re.MULTILINE):
            return 'import'
        else:
            return 'main'

    def _generate_context(self, chunk_lines: List[str], chunk_type: str, language: str) -> str:
        """Generate contextual information for a chunk."""
        if chunk_type == 'import':
            return f"Import statements and global declarations for {language} file"
        elif chunk_type == 'function':
            return f"Function definitions and related code in {language}"
        elif chunk_type == 'class':
            return f"Class definitions and methods in {language}"
        else:
            return f"Main logic and implementation code in {language}"


class ChunkAnalyzer:
    """
    Analyzes individual chunks with context awareness and combines results.
    """

    def __init__(self, claude_client, memory_manager):
        self.claude_client = claude_client
        self.memory_manager = memory_manager
        self.logger = logging.getLogger(__name__)

    async def analyze_chunks(self, file_path: str, chunks: List[ChunkInfo], repo_id: str) -> List[ChunkAnalysis]:
        """Analyze all chunks of a file with context awareness."""
        if len(chunks) == 1 and chunks[0].is_complete:
            # Single chunk - use existing analysis
            return await self._analyze_single_chunk(file_path, chunks[0], repo_id)

        # Multiple chunks - analyze with context
        chunk_analyses = []

        for i, chunk in enumerate(chunks):
            try:
                analysis = await self._analyze_chunk_with_context(
                    file_path, chunk, i, len(chunks), repo_id
                )
                chunk_analyses.append(analysis)

                # Small delay to respect rate limits
                await asyncio.sleep(0.1)

            except Exception as e:
                self.logger.error(f"Error analyzing chunk {i} of {file_path}: {e}")
                # Create fallback analysis
                chunk_analyses.append(ChunkAnalysis(
                    chunk_id=chunk.chunk_id,
                    issues_found=[f"Analysis failed: {str(e)}"],
                    recommendations=["Review this section manually"],
                    severity_score=5.0,
                    detailed_analysis=f"Analysis failed due to error: {str(e)}",
                    chunk_type=chunk.chunk_type,
                    context=chunk.context
                ))

        return chunk_analyses

    async def _analyze_single_chunk(self, file_path: str, chunk: ChunkInfo, repo_id: str) -> List[ChunkAnalysis]:
        """Analyze a single complete chunk using existing logic."""
        try:
            # Use the existing analysis logic but optimized for single chunk
            analysis_prompt = f"""
Analyze this code file for quality, security, and best practices.

File: {file_path}
Language: {chunk.language}

Code:
{chunk.content}

Provide a comprehensive analysis focusing on:
1. Code quality and maintainability
2. Security vulnerabilities
3. Performance issues
4. Best practices adherence
5. Specific recommendations for improvement

Format your response as JSON with these fields:
- issues_found: List of specific issues
- recommendations: List of improvement suggestions
- severity_score: Number from 1-10 (10 being best quality)
- detailed_analysis: Comprehensive analysis text
"""

            # Make API call to Claude using the anthropic client
            response = self.claude_client.messages.create(
                model=os.getenv("CLAUDE_MODEL", "claude-3-5-haiku-latest"),
                max_tokens=2048,
                messages=[{
                    "role": "user",
                    "content": analysis_prompt
                }]
            )

            # Parse response and create analysis
            response_text = response.content[0].text if response.content else ""
            analysis_data = self._parse_analysis_response(response_text)

            return [ChunkAnalysis(
                chunk_id=chunk.chunk_id,
                issues_found=analysis_data.get('issues_found', []),
                recommendations=analysis_data.get('recommendations', []),
                severity_score=analysis_data.get('severity_score', 5.0),
                detailed_analysis=analysis_data.get('detailed_analysis', 'Analysis completed'),
                chunk_type=chunk.chunk_type,
                context=chunk.context
            )]

        except Exception as e:
            self.logger.error(f"Error analyzing single chunk for {file_path}: {e}")
            return [ChunkAnalysis(
                chunk_id=chunk.chunk_id,
                issues_found=[f"Analysis failed: {str(e)}"],
                recommendations=["Review this section manually"],
                severity_score=5.0,
                detailed_analysis=f"Analysis failed due to error: {str(e)}",
                chunk_type=chunk.chunk_type,
                context=chunk.context
            )]

    def _parse_analysis_response(self, response: str) -> Dict[str, Any]:
        """Parse Claude's analysis response into structured data."""
        try:
            # Clean the response by removing invalid control characters
            cleaned_response = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', response)

            # Try to extract JSON from response
            if '{' in cleaned_response and '}' in cleaned_response:
                start = cleaned_response.find('{')
                end = cleaned_response.rfind('}') + 1
                json_str = cleaned_response[start:end]

                # Try the extracted JSON as-is first
                try:
                    return json.loads(json_str)
                except json.JSONDecodeError:
                    pass

                # Additional cleaning for common JSON issues
                # (escape raw newlines/tabs that sometimes appear inside string values)
                json_str = json_str.replace('\n', '\\n').replace('\r', '\\r').replace('\t', '\\t')

                try:
                    return json.loads(json_str)
                except json.JSONDecodeError:
                    # Try to fix common JSON issues
                    json_str = re.sub(r',\s*}', '}', json_str)  # Remove trailing commas
                    json_str = re.sub(r',\s*]', ']', json_str)  # Remove trailing commas in arrays
                    json_str = re.sub(r'(\w+):', r'"\1":', json_str)  # Quote unquoted keys

                    try:
                        return json.loads(json_str)
                    except json.JSONDecodeError:
                        # If still failing, create a structured response from the text
                        return self._create_fallback_response(cleaned_response)
            else:
                # Fallback parsing
                return self._create_fallback_response(cleaned_response)

        except Exception as e:
            self.logger.error(f"Error parsing analysis response: {e}")
            return self._create_fallback_response(response)

    def _create_fallback_response(self, response_text: str) -> Dict[str, Any]:
        """Create a structured response when JSON parsing fails."""
        # Extract basic information from the text response
        issues = []
        recommendations = []

        # Look for common patterns in the response
        if 'error' in response_text.lower() or 'issue' in response_text.lower():
            issues.append('Code issues detected (parsing failed)')
        if 'improve' in response_text.lower() or 'recommend' in response_text.lower():
            recommendations.append('Code improvements suggested (parsing failed)')

        if not issues:
            issues.append('Analysis completed (detailed parsing unavailable)')
        if not recommendations:
            recommendations.append('Review code manually')

        return {
            'issues_found': issues,
            'recommendations': recommendations,
            'severity_score': 3.0,  # Medium severity for fallback
            'detailed_analysis': (response_text[:500] + '...') if len(response_text) > 500 else response_text
        }

    async def _analyze_chunk_with_context(self, file_path: str, chunk: ChunkInfo, chunk_index: int,
                                          total_chunks: int, repo_id: str) -> ChunkAnalysis:
        """Analyze a single chunk with file and repository context."""
        # Get relevant context from memory system
        context_memories = await self._get_chunk_context(file_path, chunk, repo_id)

        # Build enhanced prompt with context
        prompt = self._build_chunk_analysis_prompt(
            file_path, chunk, chunk_index, total_chunks, context_memories
        )

        try:
            # Rate limiting
            await asyncio.sleep(0.1)  # Small delay between requests

            # Send to Claude API
            message = self.claude_client.messages.create(
                model=os.getenv("CLAUDE_MODEL", "claude-3-5-haiku-latest"),
                max_tokens=2048,
                temperature=0.1,
                messages=[{"role": "user", "content": prompt}]
            )

            analysis_text = message.content[0].text.strip()

            # Parse the analysis
            return self._parse_chunk_analysis(analysis_text, chunk)

        except Exception as e:
            self.logger.error(f"Claude API error for chunk {chunk_index}: {e}")
            raise

    async def _get_chunk_context(self, file_path: str, chunk: ChunkInfo, repo_id: str) -> Dict[str, Any]:
        """Get relevant context for chunk analysis."""
        context = {
            'similar_code': [],
            'repository_patterns': [],
            'best_practices': []
        }

        try:
            # Search for similar code patterns
            similar_code = await self.memory_manager.search_similar_code(
                f"{chunk.chunk_type} {chunk.context}", repo_id, limit=3
            )
            context['similar_code'] = similar_code

            # Get relevant best practices
            best_practices = await self.memory_manager.retrieve_persistent_memories(
                f"{chunk.chunk_type} best practices", limit=5
            )
            context['best_practices'] = best_practices

        except Exception as e:
            self.logger.warning(f"Could not retrieve context for chunk: {e}")

        return context

    def _build_chunk_analysis_prompt(self, file_path: str, chunk: ChunkInfo, chunk_index: int,
                                     total_chunks: int, context_memories: Dict[str, Any]) -> str:
        """Build comprehensive analysis prompt for a chunk."""
        # Build context information
        context_info = ""

        if context_memories['similar_code']:
            context_info += "\nSimilar code patterns found in repository:\n"
            for similar in context_memories['similar_code'][:2]:
                context_info += f"- {similar.get('file_path', 'Unknown')}: {len(similar.get('analysis_data', {}).get('issues_found', []))} issues\n"

        if context_memories['best_practices']:
            context_info += "\nRelevant best practices:\n"
            for practice in context_memories['best_practices'][:3]:
                context_info += f"- {practice['content'][:100]}...\n"

        prompt = f"""
You are a senior software engineer analyzing chunk {chunk_index + 1} of {total_chunks} from file: {file_path}

CHUNK INFORMATION:
- Chunk Type: {chunk.chunk_type}
- Context: {chunk.context}
- Lines: {chunk.start_line}-{chunk.end_line}
- Estimated Tokens: {chunk.tokens_estimate}
{context_info}
CHUNK CODE:
```{self._detect_language_from_path(file_path)}
{chunk.content}
```

Provide a focused analysis of this specific chunk, considering:
1. How it fits into the overall file structure
2. Specific issues within this chunk
3. Recommendations for this chunk
4. Code quality assessment (1-10 scale)
5. Security concerns specific to this chunk
6. Performance implications

Focus on actionable insights for this specific code section.
""" return prompt def _detect_language_from_path(self, file_path: str) -> str: """Detect language from file path.""" ext = Path(file_path).suffix.lower() lang_map = { '.py': 'python', '.js': 'javascript', '.ts': 'typescript', '.tsx': 'typescript', '.jsx': 'javascript', '.java': 'java', '.cpp': 'cpp', '.c': 'cpp' } return lang_map.get(ext, 'text') def _parse_chunk_analysis(self, analysis_text: str, chunk: ChunkInfo) -> ChunkAnalysis: """Parse Claude's analysis response for a chunk.""" # Extract severity score severity_match = re.search(r'(\d+(?:\.\d+)?)/10', analysis_text) severity_score = float(severity_match.group(1)) if severity_match else 5.0 # Extract issues and recommendations issues = self._extract_issues_from_analysis(analysis_text) recommendations = self._extract_recommendations_from_analysis(analysis_text) return ChunkAnalysis( chunk_id=chunk.chunk_id, issues_found=issues, recommendations=recommendations, severity_score=severity_score, detailed_analysis=analysis_text, chunk_type=chunk.chunk_type, context=chunk.context ) def _extract_issues_from_analysis(self, analysis_text: str) -> List[str]: """Extract issues from analysis text.""" issues = [] lines = analysis_text.split('\n') issue_keywords = ['issue', 'problem', 'bug', 'vulnerability', 'error', 'warning', 'concern'] for line in lines: line_lower = line.lower().strip() if any(keyword in line_lower for keyword in issue_keywords): if line.strip() and not line.strip().startswith('#'): issues.append(line.strip()) return issues[:10] # Limit to top 10 issues def _extract_recommendations_from_analysis(self, analysis_text: str) -> List[str]: """Extract recommendations from analysis text.""" recommendations = [] lines = analysis_text.split('\n') rec_keywords = ['recommend', 'suggest', 'should', 'consider', 'improve'] for line in lines: line_lower = line.lower().strip() if any(keyword in line_lower for keyword in rec_keywords): if line.strip() and not line.strip().startswith('#'): recommendations.append(line.strip()) return recommendations[:10] # Limit to top 10 recommendations class ChunkResultCombiner: """ Combines analysis results from multiple chunks into a comprehensive file analysis. 
""" def __init__(self): self.logger = logging.getLogger(__name__) def combine_chunk_analyses(self, file_path: str, language: str, chunk_analyses: List[ChunkAnalysis], chunking_result: FileChunkingResult) -> Dict[str, Any]: """Combine multiple chunk analyses into a single file analysis.""" if not chunk_analyses: return self._create_fallback_analysis(file_path, language) # Combine all issues and recommendations all_issues = [] all_recommendations = [] for analysis in chunk_analyses: all_issues.extend(analysis.issues_found) all_recommendations.extend(analysis.recommendations) # Calculate overall severity score severity_scores = [a.severity_score for a in chunk_analyses if a.severity_score > 0] overall_severity = sum(severity_scores) / len(severity_scores) if severity_scores else 5.0 # Create comprehensive analysis detailed_analysis = self._create_comprehensive_analysis(chunk_analyses, chunking_result) # Calculate statistics total_lines = sum(chunk.end_line - chunk.start_line for chunk in chunking_result.chunks) return { "path": file_path, "language": language, "lines_of_code": total_lines, "complexity_score": self._calculate_complexity_score(chunk_analyses), "issues_found": all_issues, "recommendations": all_recommendations, "detailed_analysis": detailed_analysis, "severity_score": overall_severity, "chunking_info": { "total_chunks": len(chunk_analyses), "chunked": chunking_result.is_chunked, "savings_percentage": chunking_result.savings_percentage, "original_tokens": chunking_result.original_tokens, "chunked_tokens": chunking_result.chunked_tokens } } def _create_fallback_analysis(self, file_path: str, language: str) -> Dict[str, Any]: """Create fallback analysis when chunk analysis fails.""" return { "path": file_path, "language": language, "lines_of_code": 0, "complexity_score": 5.0, "issues_found": ["Analysis failed - manual review recommended"], "recommendations": ["Review file manually due to analysis failure"], "detailed_analysis": "Analysis could not be completed due to processing errors.", "severity_score": 5.0, "chunking_info": { "total_chunks": 0, "chunked": False, "savings_percentage": 0.0, "original_tokens": 0, "chunked_tokens": 0 } } def _create_comprehensive_analysis(self, chunk_analyses: List[ChunkAnalysis], chunking_result: FileChunkingResult) -> str: """Create comprehensive analysis from chunk analyses.""" analysis_parts = [] # File overview analysis_parts.append(f"File Analysis Summary:") analysis_parts.append(f"- Total chunks analyzed: {len(chunk_analyses)}") analysis_parts.append(f"- Chunking efficiency: {chunking_result.savings_percentage:.1f}% token savings") # Chunk-specific findings for i, analysis in enumerate(chunk_analyses): if analysis.issues_found or analysis.recommendations: analysis_parts.append(f"\nChunk {i+1} ({analysis.chunk_type}):") if analysis.issues_found: if isinstance(analysis.issues_found, (list, tuple)): analysis_parts.append(f" Issues: {len(analysis.issues_found)} found") else: analysis_parts.append(f" Issues: 0 found") if analysis.recommendations: if isinstance(analysis.recommendations, (list, tuple)): analysis_parts.append(f" Recommendations: {len(analysis.recommendations)} provided") else: analysis_parts.append(f" Recommendations: 0 provided") # Overall assessment - calculate safely if chunk_analyses and len(chunk_analyses) > 0: valid_scores = [a.severity_score for a in chunk_analyses if a.severity_score is not None] avg_severity = sum(valid_scores) / len(valid_scores) if valid_scores else 5.0 else: avg_severity = 5.0 
analysis_parts.append(f"\nOverall Assessment:") analysis_parts.append(f"- Average quality score: {avg_severity:.1f}/10") analysis_parts.append(f"- Total issues found: {sum(len(a.issues_found) if isinstance(a.issues_found, (list, tuple)) else 0 for a in chunk_analyses)}") analysis_parts.append(f"- Total recommendations: {sum(len(a.recommendations) if isinstance(a.recommendations, (list, tuple)) else 0 for a in chunk_analyses)}") return '\n'.join(analysis_parts) def _calculate_complexity_score(self, chunk_analyses: List[ChunkAnalysis]) -> float: """Calculate complexity score based on chunk analyses.""" if not chunk_analyses: return 5.0 # Simple complexity calculation based on issues and severity total_issues = sum(len(a.issues_found) if isinstance(a.issues_found, (list, tuple)) else 0 for a in chunk_analyses) # Calculate average severity safely if chunk_analyses and len(chunk_analyses) > 0: valid_scores = [a.severity_score for a in chunk_analyses if a.severity_score is not None] avg_severity = sum(valid_scores) / len(valid_scores) if valid_scores else 5.0 else: avg_severity = 5.0 # Higher complexity = more issues + lower quality complexity = min(10.0, (total_issues * 0.5) + (10 - avg_severity)) return complexity class EnhancedFileProcessor: """ Main processor that integrates chunking with existing analysis flow. Maintains backward compatibility while adding enhanced capabilities. """ def __init__(self, claude_client, memory_manager): self.claude_client = claude_client self.memory_manager = memory_manager self.chunker = IntelligentChunker() self.analyzer = ChunkAnalyzer(claude_client, memory_manager) self.combiner = ChunkResultCombiner() self.logger = logging.getLogger(__name__) async def process_file_enhanced(self, file_path: str, content: str, repo_id: str) -> Dict[str, Any]: """ Process a file with enhanced chunking while maintaining compatibility. This method can be used as a drop-in replacement for existing analysis. 
""" try: # Step 1: Chunk the file chunking_result = self.chunker.chunk_file(file_path, content) # Step 2: Analyze chunks chunk_analyses = await self.analyzer.analyze_chunks( file_path, chunking_result.chunks, repo_id ) # Step 3: Combine results file_analysis = self.combiner.combine_chunk_analyses( file_path, chunking_result.language, chunk_analyses, chunking_result ) # Step 4: Store in memory system (compatible with existing) await self._store_enhanced_analysis(repo_id, file_path, file_analysis, chunking_result) return file_analysis except Exception as e: self.logger.error(f"Enhanced processing failed for {file_path}: {e}") # Fallback to basic analysis return await self._fallback_analysis(file_path, content, repo_id) async def _store_enhanced_analysis(self, repo_id: str, file_path: str, file_analysis: Dict[str, Any], chunking_result: FileChunkingResult): """Store enhanced analysis in memory system.""" try: # Store file-level analysis (compatible with existing system) await self.memory_manager.store_code_analysis(repo_id, file_path, file_analysis) # Store chunking metadata for future reference chunking_metadata = { 'chunked': chunking_result.is_chunked, 'total_chunks': chunking_result.total_chunks, 'savings_percentage': chunking_result.savings_percentage, 'original_tokens': chunking_result.original_tokens, 'chunked_tokens': chunking_result.chunked_tokens } # Store additional metadata (non-breaking) enhanced_data = {**file_analysis, 'chunking_metadata': chunking_metadata} await self.memory_manager.store_code_analysis(repo_id, f"{file_path}_enhanced", enhanced_data) except Exception as e: self.logger.warning(f"Could not store enhanced analysis: {e}") async def _fallback_analysis(self, file_path: str, content: str, repo_id: str) -> Dict[str, Any]: """Fallback to basic analysis if enhanced processing fails.""" return { "path": file_path, "language": self.chunker.detect_language(file_path), "lines_of_code": len(content.split('\n')), "complexity_score": 5.0, "issues_found": ["Enhanced analysis failed - using fallback"], "recommendations": ["Review file manually"], "detailed_analysis": "Enhanced analysis could not be completed. Basic fallback analysis used.", "severity_score": 5.0, "chunking_info": { "total_chunks": 1, "chunked": False, "savings_percentage": 0.0, "original_tokens": self.chunker.estimate_tokens(content), "chunked_tokens": self.chunker.estimate_tokens(content) } } # Configuration for enhanced chunking ENHANCED_CHUNKING_CONFIG = { "max_tokens_per_chunk": 4000, "overlap_lines": 5, "min_chunk_size": 100, "preserve_imports": True, "preserve_comments": True, "enable_context_sharing": True, "enable_memory_integration": True }