From 603e9b4b2056f782e5711ab9e20f76997bc0d74a Mon Sep 17 00:00:00 2001 From: Pradeep Date: Mon, 17 Nov 2025 09:04:49 +0530 Subject: [PATCH] newly added multi doc upload service --- docker-compose.yml | 68 ++- services/ai-analysis-service/server.py | 485 +++++++++++++++-- services/api-gateway/src/server.js | 26 + .../multi-document-upload-service/Dockerfile | 30 + .../FIX_EMPTY_GRAPH.md | 144 +++++ .../NEO4J_DIAGNOSTIC_QUERIES.md | 176 ++++++ .../QUICK_TEST.md | 85 +++ .../multi-document-upload-service/README.md | 36 ++ .../REBUILD_INSTRUCTIONS.md | 152 ++++++ .../TESTING_GUIDE.md | 300 ++++++++++ .../requirements.txt | 34 ++ .../multi_document_upload_service/__init__.py | 4 + .../claude_client.py | 328 +++++++++++ .../multi_document_upload_service/config.py | 52 ++ .../extractors/auto.py | 168 ++++++ .../extractors/image_extractor.py | 514 ++++++++++++++++++ .../src/multi_document_upload_service/jobs.py | 93 ++++ .../src/multi_document_upload_service/main.py | 189 +++++++ .../multi_document_upload_service/models.py | 84 +++ .../processors/chunker.py | 24 + .../processors/graph_writer.py | 81 +++ .../multi_document_upload_service/storage.py | 59 ++ .../workflows/pipeline.py | 164 ++++++ 23 files changed, 3248 insertions(+), 48 deletions(-) create mode 100644 services/multi-document-upload-service/Dockerfile create mode 100644 services/multi-document-upload-service/FIX_EMPTY_GRAPH.md create mode 100644 services/multi-document-upload-service/NEO4J_DIAGNOSTIC_QUERIES.md create mode 100644 services/multi-document-upload-service/QUICK_TEST.md create mode 100644 services/multi-document-upload-service/README.md create mode 100644 services/multi-document-upload-service/REBUILD_INSTRUCTIONS.md create mode 100644 services/multi-document-upload-service/TESTING_GUIDE.md create mode 100644 services/multi-document-upload-service/requirements.txt create mode 100644 services/multi-document-upload-service/src/multi_document_upload_service/__init__.py create mode 100644 services/multi-document-upload-service/src/multi_document_upload_service/claude_client.py create mode 100644 services/multi-document-upload-service/src/multi_document_upload_service/config.py create mode 100644 services/multi-document-upload-service/src/multi_document_upload_service/extractors/auto.py create mode 100644 services/multi-document-upload-service/src/multi_document_upload_service/extractors/image_extractor.py create mode 100644 services/multi-document-upload-service/src/multi_document_upload_service/jobs.py create mode 100644 services/multi-document-upload-service/src/multi_document_upload_service/main.py create mode 100644 services/multi-document-upload-service/src/multi_document_upload_service/models.py create mode 100644 services/multi-document-upload-service/src/multi_document_upload_service/processors/chunker.py create mode 100644 services/multi-document-upload-service/src/multi_document_upload_service/processors/graph_writer.py create mode 100644 services/multi-document-upload-service/src/multi_document_upload_service/storage.py create mode 100644 services/multi-document-upload-service/src/multi_document_upload_service/workflows/pipeline.py diff --git a/docker-compose.yml b/docker-compose.yml index f75182b..4617e1a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -131,11 +131,11 @@ services: networks: - pipeline_network healthcheck: - test: ["CMD", "cypher-shell", "--username", "neo4j", "--password", "password", "MATCH () RETURN count(*) as count"] + test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider 
http://localhost:7474 || exit 1"] interval: 30s timeout: 10s retries: 5 - start_period: 60s + start_period: 90s # chromadb: # image: chromadb/chroma:latest @@ -269,6 +269,7 @@ services: - SELF_IMPROVING_GENERATOR_URL=http://self-improving-generator:8007 - AI_MOCKUP_URL=http://ai-mockup-service:8021 - AI_ANALYSIS_URL=http://ai-analysis-service:8022 + - MULTI_DOCUMENT_UPLOAD_URL=http://multi-document-upload-service:8024 - UNISON_URL=http://unison:8010 - TEMPLATE_MANAGER_AI_URL=http://template-manager:8013 volumes: @@ -775,6 +776,67 @@ services: retries: 3 start_period: 60s restart: unless-stopped + + # Multi-Document Upload Service + # ===================================== + + multi-document-upload-service: + build: + context: ./services/multi-document-upload-service + dockerfile: Dockerfile + container_name: pipeline_multi_document_upload + ports: + - "8024:8024" + environment: + - PORT=8024 + - HOST=0.0.0.0 + - ANTHROPIC_API_KEY=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 + - CLAUDE_MODEL=claude-3-5-haiku-latest + + # Neo4j Configuration + - NEO4J_URI=bolt://neo4j:7687 + - NEO4J_USER=neo4j + - NEO4J_PASSWORD=password + - NEO4J_DATABASE=neo4j + + # Storage Configuration + - STORAGE_DIR=/app/storage + + # Database configurations (optional, for job tracking) + - POSTGRES_HOST=pipeline_postgres + - POSTGRES_PORT=5432 + - POSTGRES_DB=dev_pipeline + - POSTGRES_USER=pipeline_admin + - POSTGRES_PASSWORD=secure_pipeline_2024 + + - REDIS_HOST=pipeline_redis + - REDIS_PORT=6379 + - REDIS_PASSWORD=redis_secure_2024 + volumes: + - multi_document_storage:/app/storage + depends_on: + neo4j: + condition: service_healthy + postgres: + condition: service_healthy + redis: + condition: service_healthy + networks: + - pipeline_network + deploy: + resources: + limits: + memory: 4G + reservations: + memory: 2G + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8024/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 60s + restart: unless-stopped + # ===================================== # Workflow Orchestration # ===================================== @@ -894,6 +956,8 @@ volumes: driver: local ai_analysis_temp: driver: local + multi_document_storage: + driver: local # ===================================== # Networks diff --git a/services/ai-analysis-service/server.py b/services/ai-analysis-service/server.py index 77d9bf0..9e1998f 100644 --- a/services/ai-analysis-service/server.py +++ b/services/ai-analysis-service/server.py @@ -13,8 +13,10 @@ import time import hashlib import traceback import uuid +import re from pathlib import Path -from typing import Dict, Any, Optional, List, Tuple +from typing import Dict, Any, Optional, List, Tuple, Set +from dataclasses import dataclass, field from datetime import datetime from contextlib import asynccontextmanager @@ -53,7 +55,8 @@ from ai_analyze import ( CodeQualityAnalysis, Issue, ModuleAnalysis, - ModuleSummary + ModuleSummary, + FileAnalysis ) # Import enhanced analyzer (backward compatible) @@ -72,6 +75,222 @@ analyzer = None neo4j_client: Optional[Neo4jGraphClient] = None USE_KNOWLEDGE_GRAPH = False +CANONICAL_CHUNK_SUFFIX_RE = re.compile(r'(_part|_chunk)\d+$', re.IGNORECASE) + + +def get_canonical_module_name(raw_name: str) -> str: + """Normalize chunk/module names so split chunks collapse to one canonical module.""" + if not raw_name: + return "unknown" + cleaned = raw_name.strip() + canonical = CANONICAL_CHUNK_SUFFIX_RE.sub("", cleaned) + canonical = canonical.strip("_- 
") + return canonical or cleaned + + +def _ensure_list_of_strings(value: Any) -> List[str]: + if value is None: + return [] + if isinstance(value, str): + value = value.strip() + return [value] if value else [] + if isinstance(value, (list, tuple, set)): + return [str(item).strip() for item in value if item is not None and str(item).strip()] + return [] + + +def _dedupe_preserve_order(items: List[str]) -> List[str]: + seen = set() + result = [] + for item in items: + if item not in seen: + seen.add(item) + result.append(item) + return result + + +def sanitize_file_analysis_for_aggregation(fa: Any) -> FileAnalysis: + """Create a lightweight, serialization-safe FileAnalysis for aggregation.""" + if isinstance(fa, FileAnalysis): + path = str(fa.path) if fa.path else "" + language = fa.language or "unknown" + lines = int(fa.lines_of_code or 0) + complexity = float(fa.complexity_score or 0.0) + severity_value = fa.severity_score + severity = float(severity_value) if isinstance(severity_value, (int, float)) else 5.0 + issues = _ensure_list_of_strings(fa.issues_found) + recommendations = _ensure_list_of_strings(fa.recommendations) + detailed = fa.detailed_analysis or "" + elif isinstance(fa, dict): + path = str(fa.get("path") or fa.get("file_path") or "") + language = fa.get("language") or "unknown" + lines = int(fa.get("lines_of_code") or 0) + complexity = float(fa.get("complexity_score") or 0.0) + severity_value = fa.get("severity_score") + severity = float(severity_value) if isinstance(severity_value, (int, float)) else 5.0 + issues = _ensure_list_of_strings(fa.get("issues_found", [])) + recommendations = _ensure_list_of_strings(fa.get("recommendations", [])) + detailed = fa.get("detailed_analysis") or "" + else: + path = str(getattr(fa, "path", "") or "") + language = getattr(fa, "language", "unknown") or "unknown" + lines = int(getattr(fa, "lines_of_code", 0) or 0) + complexity = float(getattr(fa, "complexity_score", 0) or 0.0) + severity_value = getattr(fa, "severity_score", 5.0) + severity = float(severity_value) if isinstance(severity_value, (int, float)) else 5.0 + issues = _ensure_list_of_strings(getattr(fa, "issues_found", [])) + recommendations = _ensure_list_of_strings(getattr(fa, "recommendations", [])) + detailed = getattr(fa, "detailed_analysis", "") or "" + + return FileAnalysis( + path=path, + language=language, + lines_of_code=lines, + complexity_score=complexity, + issues_found=issues, + recommendations=recommendations, + detailed_analysis=detailed, + severity_score=severity + ) + + +def merge_file_analyses(existing: FileAnalysis, new: FileAnalysis) -> FileAnalysis: + """Merge two FileAnalysis objects for the same file path.""" + severity = ( + (existing.severity_score or 0) + (new.severity_score or 0) + ) / 2.0 if isinstance(existing.severity_score, (int, float)) and isinstance(new.severity_score, (int, float)) else (existing.severity_score or new.severity_score or 5.0) + + complexity = ( + (existing.complexity_score or 0) + (new.complexity_score or 0) + ) / 2.0 if isinstance(existing.complexity_score, (int, float)) and isinstance(new.complexity_score, (int, float)) else (existing.complexity_score or new.complexity_score or 0.0) + + language = existing.language if existing.language and existing.language != "unknown" else new.language + + issues = _ensure_list_of_strings(existing.issues_found) + _ensure_list_of_strings(new.issues_found) + recommendations = _ensure_list_of_strings(existing.recommendations) + _ensure_list_of_strings(new.recommendations) + + issues = 
_dedupe_preserve_order(issues) + recommendations = _dedupe_preserve_order(recommendations) + + detailed = existing.detailed_analysis or new.detailed_analysis or "" + + return FileAnalysis( + path=existing.path or new.path, + language=language or "unknown", + lines_of_code=max(existing.lines_of_code or 0, new.lines_of_code or 0), + complexity_score=complexity, + issues_found=issues, + recommendations=recommendations, + detailed_analysis=detailed, + severity_score=severity + ) + + +@dataclass +class AggregatedModuleData: + canonical_name: str + original_names: Set[str] = field(default_factory=set) + chunk_ids: List[str] = field(default_factory=list) + chunk_types: Set[str] = field(default_factory=set) + file_map: Dict[str, FileAnalysis] = field(default_factory=dict) + quality_scores: List[float] = field(default_factory=list) + overviews: List[str] = field(default_factory=list) + architectures: List[str] = field(default_factory=list) + security_notes: List[str] = field(default_factory=list) + recommendations: Set[str] = field(default_factory=set) + ai_responses: List[str] = field(default_factory=list) + dependencies: Set[str] = field(default_factory=set) + metadata_records: List[Dict[str, Any]] = field(default_factory=list) + context_dependencies: Set[str] = field(default_factory=set) + + +class ModuleAggregationManager: + """Collects chunk-level results and exposes aggregated module summaries.""" + + def __init__(self) -> None: + self._cache: Dict[str, Dict[str, AggregatedModuleData]] = {} + + def reset(self, run_id: str) -> None: + self._cache[run_id] = {} + + def clear(self, run_id: Optional[str]) -> None: + if run_id and run_id in self._cache: + del self._cache[run_id] + + def add_chunk( + self, + run_id: str, + chunk_name: str, + chunk_id: Optional[str], + chunk_type: Optional[str], + chunk: Dict[str, Any], + chunk_analysis: Dict[str, Any], + file_analyses: List[Any], + metadata: Dict[str, Any], + ai_response: str + ) -> None: + if not run_id: + return + + canonical_name = get_canonical_module_name(chunk_name) + modules = self._cache.setdefault(run_id, {}) + module_data = modules.get(canonical_name) + if module_data is None: + module_data = AggregatedModuleData(canonical_name=canonical_name) + modules[canonical_name] = module_data + + module_data.original_names.add(chunk_name) + if chunk_id: + module_data.chunk_ids.append(chunk_id) + if chunk_type: + module_data.chunk_types.add(chunk_type) + + quality_value = chunk_analysis.get('module_quality_score') + if quality_value is None: + quality_value = chunk_analysis.get('module_quality') + if isinstance(quality_value, (int, float)): + module_data.quality_scores.append(float(quality_value)) + + overview_text = chunk_analysis.get('module_overview') + if overview_text: + module_data.overviews.append(str(overview_text).strip()) + + architecture_text = chunk_analysis.get('module_architecture') + if architecture_text: + module_data.architectures.append(str(architecture_text).strip()) + + security_text = chunk_analysis.get('module_security_assessment') + if security_text: + module_data.security_notes.append(str(security_text).strip()) + + recommendations = chunk_analysis.get('module_recommendations', []) + module_data.recommendations.update(_ensure_list_of_strings(recommendations)) + + if ai_response: + module_data.ai_responses.append(ai_response) + + module_data.dependencies.update(chunk.get('dependencies', [])) + module_data.context_dependencies.update(chunk.get('context_dependencies', [])) + + for fa in file_analyses: + sanitized = 
sanitize_file_analysis_for_aggregation(fa) + if not sanitized.path: + continue + existing = module_data.file_map.get(sanitized.path) + if existing: + module_data.file_map[sanitized.path] = merge_file_analyses(existing, sanitized) + else: + module_data.file_map[sanitized.path] = sanitized + + if metadata: + module_data.metadata_records.append(metadata) + + def get_modules(self, run_id: str) -> Dict[str, AggregatedModuleData]: + return self._cache.get(run_id, {}) + + +module_aggregation_manager = ModuleAggregationManager() + @asynccontextmanager async def lifespan(app: FastAPI): """Lifespan context manager for startup and shutdown events.""" @@ -2105,10 +2324,14 @@ def estimate_tokens(files: List[Tuple[str, str]]) -> int: # Rough estimate: 4 characters per token return total_chars // 4 -def split_by_token_limit(module_files: List[Tuple[str, str]], max_tokens: int = 15000) -> List[List[Tuple[str, str]]]: +def split_by_token_limit( + module_files: List[Tuple[str, str]], + max_tokens: int = 15000, + max_files: int = 12 +) -> List[List[Tuple[str, str]]]: """Split large module into sub-chunks while preserving related files together.""" - sub_chunks = [] - current_chunk = [] + sub_chunks: List[List[Tuple[str, str]]] = [] + current_chunk: List[Tuple[str, str]] = [] current_tokens = 0 for file_path, content in module_files: @@ -2116,9 +2339,15 @@ def split_by_token_limit(module_files: List[Tuple[str, str]], max_tokens: int = continue file_tokens = len(content) // 4 + should_split = ( + current_chunk + and ( + current_tokens + file_tokens > max_tokens + or len(current_chunk) >= max_files + ) + ) - if current_tokens + file_tokens > max_tokens and current_chunk: - # Save current chunk and start new one + if should_split: sub_chunks.append(current_chunk) current_chunk = [(file_path, content)] current_tokens = file_tokens @@ -2158,6 +2387,10 @@ def find_dependencies(chunk_files: List[Tuple[str, str]], dependency_graph: Opti # For now, return empty list - can be enhanced with actual dependency tracking return dependencies +MAX_TOKENS_PER_CHUNK = int(os.getenv("MAX_TOKENS_PER_CHUNK", "18000")) +MAX_FILES_PER_CHUNK = int(os.getenv("MAX_FILES_PER_CHUNK", "12")) + + def create_intelligent_chunks(files: List[Tuple[str, str]], dependency_graph: Optional[Dict] = None) -> List[Dict]: """ Group files by module/feature for semantic analysis. @@ -2192,14 +2425,15 @@ def create_intelligent_chunks(files: List[Tuple[str, str]], dependency_graph: Op if not module_files: continue - # Check token limit (increased for better context and fewer chunks) - # With 2000 req/min API limit, we can handle larger chunks - # Increased from 15000 to 25000 tokens for better module-level context + # Check token and file limits to keep prompts manageable for Claude module_tokens = estimate_tokens(module_files) - MAX_TOKENS_PER_CHUNK = 25000 # Increased for more files per chunk - if module_tokens > MAX_TOKENS_PER_CHUNK: + if module_tokens > MAX_TOKENS_PER_CHUNK or len(module_files) > MAX_FILES_PER_CHUNK: # Split large modules - sub_chunks = split_by_token_limit(module_files, MAX_TOKENS_PER_CHUNK) + sub_chunks = split_by_token_limit( + module_files, + max_tokens=MAX_TOKENS_PER_CHUNK, + max_files=MAX_FILES_PER_CHUNK + ) for i, sub_chunk in enumerate(sub_chunks): chunks.append({ 'id': f'chunk_{chunk_counter:03d}', @@ -2354,7 +2588,8 @@ def update_state_with_findings(analysis_state: Dict, chunk: Dict, chunk_analysis Update analysis_state with findings from current chunk analysis. Returns updated analysis_state. 
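    Chunk names are first collapsed to a canonical module name via
    get_canonical_module_name() defined above, so findings from chunks that were
    split for token limits land on the same module entry. Illustrative examples:

        get_canonical_module_name("payment_service_part2")  -> "payment_service"
        get_canonical_module_name("auth_chunk3")             -> "auth"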
""" - chunk_name = chunk.get('name', 'unknown') + raw_chunk_name = chunk.get('name', 'unknown') + chunk_name = get_canonical_module_name(raw_chunk_name) chunk_id = chunk.get('id', 'unknown') # Initialize state if needed @@ -2522,6 +2757,7 @@ def build_intelligent_chunk_prompt(chunk: Dict, analysis_state: Optional[Dict] = "## RESPONSE FORMAT:", "", "âš ī¸ CRITICAL: You MUST analyze ALL files listed above. Do NOT skip any files.", + "If a file looks empty or repetitive, still return a JSON entry with notes explaining limited context.", f"Files to analyze ({len(optimized_files)} total):", ]) for i, file_path in enumerate(file_paths_list, 1): @@ -3271,19 +3507,144 @@ async def store_chunk_analysis_in_memory(chunk: Dict, file_analyses: List, chunk }, 'file_analyses': file_analyses_data } + metadata['dependencies'] = { + 'depends_on_chunks': chunk.get('context_dependencies', []), + 'raw_dependencies': chunk.get('dependencies', []) + } + + canonical_name = get_canonical_module_name(chunk_name) + module_aggregation_manager.add_chunk( + run_id=run_id, + chunk_name=chunk_name, + chunk_id=chunk.get('id'), + chunk_type=chunk_type, + chunk=chunk, + chunk_analysis=chunk_analysis or {}, + file_analyses=file_analyses, + metadata=metadata, + ai_response=ai_response + ) + print(f" đŸ“Ļ Aggregated chunk '{chunk_name}' into canonical module '{canonical_name}'") + return canonical_name - # Prioritize Knowledge Graph storage + except Exception as e: + print(f"❌ [MEMORY] Failed to store chunk analysis: {e}") + import traceback + traceback.print_exc() + return None + +async def flush_module_aggregations(run_id: Optional[str], repository_id: str, session_id: Optional[str] = None) -> None: + """Persist aggregated module data to the knowledge graph (or fallback memory).""" + if not run_id: + return + + aggregated_modules = module_aggregation_manager.get_modules(run_id) + if not aggregated_modules: + print(f"â„šī¸ [AGGREGATION] No aggregated modules to persist for run {run_id}") + return + + print(f"đŸ“Ļ [AGGREGATION] Persisting {len(aggregated_modules)} aggregated modules for run {run_id}") + + for canonical_name, module_data in aggregated_modules.items(): + file_list = list(module_data.file_map.values()) + if not file_list: + print(f" âš ī¸ [AGGREGATION] Skipping module '{canonical_name}' (no file analyses aggregated)") + continue + + total_files = len(file_list) + total_lines = sum(fa.lines_of_code or 0 for fa in file_list) + total_issues = sum(len(_ensure_list_of_strings(fa.issues_found)) for fa in file_list) + total_recommendations = sum(len(_ensure_list_of_strings(fa.recommendations)) for fa in file_list) + high_quality = len([fa for fa in file_list if isinstance(fa.severity_score, (int, float)) and fa.severity_score >= 8]) + medium_quality = len([fa for fa in file_list if isinstance(fa.severity_score, (int, float)) and 5 <= fa.severity_score < 8]) + low_quality = len([fa for fa in file_list if isinstance(fa.severity_score, (int, float)) and fa.severity_score < 5]) + + if module_data.quality_scores: + quality_score = sum(module_data.quality_scores) / max(len(module_data.quality_scores), 1) + elif total_files: + severity_sum = sum(fa.severity_score for fa in file_list if isinstance(fa.severity_score, (int, float))) + quality_score = severity_sum / total_files if total_files else 5.0 + else: + quality_score = 5.0 + + overviews = _dedupe_preserve_order([text for text in module_data.overviews if text]) + architectures = _dedupe_preserve_order([text for text in module_data.architectures if text]) + security_notes 
= _dedupe_preserve_order([text for text in module_data.security_notes if text]) + recommendations_list = _dedupe_preserve_order(list(module_data.recommendations)) + + module_overview = "\n\n".join(overviews) + module_architecture = "\n\n".join(architectures) + module_security = "\n\n".join(security_notes) + + ai_response_blocks = _dedupe_preserve_order([text for text in module_data.ai_responses if text]) + ai_response_text = "\n\n".join(ai_response_blocks) if ai_response_blocks else module_overview + + aggregated_chunk_analysis = { + 'module_overview': module_overview or f"Aggregated analysis for {canonical_name}", + 'module_quality_score': round(quality_score, 2), + 'module_architecture': module_architecture, + 'module_security_assessment': module_security, + 'module_recommendations': recommendations_list + } + + file_analyses_for_metadata = [ + { + 'file_path': fa.path, + 'language': fa.language, + 'lines_of_code': fa.lines_of_code, + 'complexity_score': fa.complexity_score, + 'severity_score': fa.severity_score, + 'issues_found': _ensure_list_of_strings(fa.issues_found), + 'recommendations': _ensure_list_of_strings(fa.recommendations), + 'detailed_analysis': fa.detailed_analysis, + } + for fa in file_list + ] + + metadata = { + 'type': 'module_analysis', + 'run_id': run_id, + 'chunk_name': canonical_name, + 'chunk_type': 'module', + 'repository_id': repository_id, + 'total_files_in_chunk': total_files, + 'chunk_metrics': { + 'total_issues': total_issues, + 'total_recommendations': total_recommendations, + 'high_quality_files': high_quality, + 'medium_quality_files': medium_quality, + 'low_quality_files': low_quality + }, + 'file_analyses': file_analyses_for_metadata, + 'dependencies': { + 'depends_on_chunks': sorted(module_data.context_dependencies), + 'raw_dependencies': sorted(module_data.dependencies) + }, + 'source_chunks': sorted(module_data.original_names), + 'total_lines': total_lines + } + + aggregated_chunk = { + 'id': module_data.chunk_ids[0] if module_data.chunk_ids else f'aggregated_{canonical_name}', + 'name': canonical_name, + 'priority': 2, + 'type': 'module', + 'context_dependencies': list(module_data.context_dependencies), + 'dependencies': list(module_data.dependencies) + } + + stored = False if USE_KNOWLEDGE_GRAPH and neo4j_client: try: module_payload = kg_ops.build_module_payload( run_id=run_id, repository_id=repository_id, - module_name=chunk_name, - chunk=chunk, - chunk_analysis=chunk_analysis, - file_analyses=file_analyses, + module_name=canonical_name, + chunk=aggregated_chunk, + chunk_analysis=aggregated_chunk_analysis, + file_analyses=file_list, metadata=metadata, - ai_response=ai_response, + ai_response=ai_response_text, ) await kg_ops.store_module_analysis( client=neo4j_client, @@ -3291,33 +3652,30 @@ async def store_chunk_analysis_in_memory(chunk: Dict, file_analyses: List, chunk repository_id=repository_id, module_payload=module_payload, ) - print(f" ✅ Stored in Neo4j knowledge graph (module: {chunk_name})") - return module_payload["module_props"]["module_id"] + print(f" ✅ [AGGREGATION] Stored aggregated module '{canonical_name}' in Neo4j") + stored = True except Exception as kg_error: - print(f" âš ī¸ Failed to store module in knowledge graph: {kg_error}. 
Falling back to episodic memory.") - - # Fallback to Episodic Memory - try: - memory_id = await analyzer.memory_manager.store_episodic_memory( - session_id=session_id, - user_query=user_query, - ai_response=ai_response, - repo_context=repository_id, - metadata=metadata - ) - print(f" ✅ Stored in episodic memory with ID: {memory_id}") - return memory_id - except Exception as memory_error: - print(f" ❌ Failed to store in episodic memory: {memory_error}") - import traceback - traceback.print_exc() - return None - - except Exception as e: - print(f"❌ [MEMORY] Failed to store chunk analysis: {e}") - import traceback - traceback.print_exc() - return None + print(f" âš ī¸ [AGGREGATION] Failed to store '{canonical_name}' in Neo4j: {kg_error}") + + if not stored and analyzer and hasattr(analyzer, 'memory_manager'): + try: + memory_id = await analyzer.memory_manager.store_episodic_memory( + session_id=session_id, + user_query=f"Aggregated analysis for module: {canonical_name}", + ai_response=ai_response_text or module_overview or f"Aggregated analysis for {canonical_name}", + repo_context=repository_id, + metadata=metadata + ) + print(f" ✅ [AGGREGATION] Stored aggregated module '{canonical_name}' in episodic memory (ID: {memory_id})") + stored = True + except Exception as memory_error: + print(f" ❌ [AGGREGATION] Failed to store '{canonical_name}' in episodic memory: {memory_error}") + traceback.print_exc() + + if not stored: + print(f" ❌ [AGGREGATION] Unable to persist aggregated module '{canonical_name}' in any storage backend") + + module_aggregation_manager.clear(run_id) async def store_cumulative_analysis_state(session_id: str, repository_id: str, analysis_state: Dict, chunk_sequence: int): """ @@ -5307,6 +5665,7 @@ async def process_chunks_in_parallel_batches(chunks, repository_id, progress_mgr async def analyze_repository_with_optimizations_parallel(repo_path: str, repository_id: str, user_id: str, max_files: Optional[int] = None, progress_mgr: Optional[AnalysisProgressManager] = None): """Analyze repository with PARALLEL BATCH PROCESSING for faster analysis.""" + run_id: Optional[str] = None try: # Set run_id early so it's available for chunk storage # Extract analysis_id from progress_mgr if available, otherwise generate @@ -5321,6 +5680,9 @@ async def analyze_repository_with_optimizations_parallel(repo_path: str, reposit if not hasattr(analyzer, 'session_id') or not analyzer.session_id: analyzer.session_id = str(uuid.uuid4()) + if run_id: + module_aggregation_manager.reset(run_id) + print(f"🔑 [ANALYSIS] Set run_id: {run_id}") # Get repository files from Git Integration Service API @@ -5447,6 +5809,11 @@ async def analyze_repository_with_optimizations_parallel(repo_path: str, reposit }) print(f"✅ [STORAGE] All chunk analyses stored") + await flush_module_aggregations( + run_id=run_id, + repository_id=repository_id, + session_id=getattr(analyzer, 'session_id', None) if analyzer else None + ) # ======================================================================== # PHASE 2: CROSS-MODULE SYNTHESIS @@ -5568,11 +5935,15 @@ async def analyze_repository_with_optimizations_parallel(repo_path: str, reposit except Exception as e: print(f"Error in parallel analysis: {e}") raise + finally: + if run_id: + module_aggregation_manager.clear(run_id) async def analyze_repository_with_optimizations(repo_path: str, repository_id: str, user_id: str, max_files: int = 100, progress_mgr: Optional[AnalysisProgressManager] = None): """Analyze repository with SMART BATCHING for maximum efficiency.""" from pathlib 
import Path + run_id: Optional[str] = None try: # Get repository files from Git Integration Service API files_to_analyze = await get_repository_files_from_api(repository_id, user_id, max_files) @@ -5601,6 +5972,19 @@ async def analyze_repository_with_optimizations(repo_path: str, repository_id: s for i, chunk in enumerate(chunks, 1): print(f" Chunk {i}: {chunk['name']} ({chunk['chunk_type']}) - {len(chunk['files'])} files") + if analyzer: + run_id = getattr(analyzer, 'run_id', None) + if not run_id: + run_id = f"repo_analysis_{repository_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + analyzer.run_id = run_id + if not hasattr(analyzer, 'session_id') or not analyzer.session_id: + analyzer.session_id = str(uuid.uuid4()) + else: + run_id = f"repo_analysis_{repository_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + + if run_id: + module_aggregation_manager.reset(run_id) + # Initialize analysis_state for progressive context analysis_state = {} @@ -5739,6 +6123,12 @@ async def analyze_repository_with_optimizations(repo_path: str, repository_id: s print(f"🎉 [INTELLIGENT CHUNKING] Completed all {total_chunks} chunks - {len(file_analyses)} files analyzed") + await flush_module_aggregations( + run_id=run_id, + repository_id=repository_id, + session_id=getattr(analyzer, 'session_id', None) if analyzer else None + ) + # ======================================================================== # PHASE 2: CROSS-MODULE SYNTHESIS # ======================================================================== @@ -5868,6 +6258,9 @@ async def analyze_repository_with_optimizations(repo_path: str, repository_id: s except Exception as e: print(f"Error in optimized analysis: {e}") raise + finally: + if run_id: + module_aggregation_manager.clear(run_id) @app.get("/repository/{repository_id}/info") async def get_repository_info(repository_id: str, user_id: str): diff --git a/services/api-gateway/src/server.js b/services/api-gateway/src/server.js index 1fe2ded..09261bb 100644 --- a/services/api-gateway/src/server.js +++ b/services/api-gateway/src/server.js @@ -70,6 +70,7 @@ const serviceTargets = { AI_MOCKUP_URL: process.env.AI_MOCKUP_URL || 'http://localhost:8021', AI_ANALYSIS_URL: process.env.AI_ANALYSIS_URL || 'http://localhost:8022', FAST_AI_ANALYSIS_URL: process.env.FAST_AI_ANALYSIS_URL || 'http://localhost:8023', + MULTI_DOCUMENT_UPLOAD_URL: process.env.MULTI_DOCUMENT_UPLOAD_URL || 'http://localhost:8024', }; // Log service targets for debugging @@ -944,6 +945,31 @@ app.use('/api/ai/repository', } ); +// Multi-Document Upload Service - handles large multipart uploads +console.log('🔧 Registering /api/multi-docs proxy route...'); +app.use('/api/multi-docs', + createServiceLimiter(120), + (req, res, next) => { + console.log(`📁 [MULTI-DOCS PROXY] ${req.method} ${req.originalUrl}`); + next(); + }, + createProxyMiddleware({ + target: serviceTargets.MULTI_DOCUMENT_UPLOAD_URL, + changeOrigin: true, + pathRewrite: { '^/api/multi-docs': '' }, + logLevel: 'warn', + proxyTimeout: 1800000, + timeout: 1800000, + onProxyReq: (proxyReq, req, res) => { + proxyReq.setHeader('X-Forwarded-By', 'api-gateway'); + }, + onProxyRes: (proxyRes, req, res) => { + res.setHeader('Access-Control-Allow-Origin', req.headers.origin || '*'); + res.setHeader('Access-Control-Allow-Credentials', 'true'); + } + }) +); + // Template Manager AI - expose AI recommendations through the gateway console.log('🔧 Registering /api/ai/tech-stack proxy route...'); app.use('/api/ai/tech-stack', diff --git a/services/multi-document-upload-service/Dockerfile 
b/services/multi-document-upload-service/Dockerfile new file mode 100644 index 0000000..a741f09 --- /dev/null +++ b/services/multi-document-upload-service/Dockerfile @@ -0,0 +1,30 @@ +FROM python:3.11-slim + +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 + +WORKDIR /app + +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + build-essential \ + poppler-utils \ + tesseract-ocr \ + ffmpeg \ + libmagic1 \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY src ./src + +ENV PYTHONPATH=/app/src \ + MULTI_DOC_STORAGE_ROOT=/app/storage \ + MULTI_DOC_CLAUDE_MODEL=claude-3-5-sonnet-20241022 \ + PORT=8024 + +EXPOSE 8024 + +CMD ["sh", "-c", "uvicorn multi_document_upload_service.main:app --host 0.0.0.0 --port ${PORT:-8024}"] + diff --git a/services/multi-document-upload-service/FIX_EMPTY_GRAPH.md b/services/multi-document-upload-service/FIX_EMPTY_GRAPH.md new file mode 100644 index 0000000..3110aa1 --- /dev/null +++ b/services/multi-document-upload-service/FIX_EMPTY_GRAPH.md @@ -0,0 +1,144 @@ +# Fix: Empty Graph in Neo4j (No Relationships Found) + +## Problem + +When querying Neo4j for `CAUSES` relationships, you get "(no changes, no records)" because: + +1. **PDF extraction failed** - Missing dependencies (`unstructured[pdf]`) +2. **0 relations extracted** - No text was extracted, so no analysis happened +3. **0 relations written** - Nothing was written to Neo4j (correct behavior) + +## Root Cause + +The service completed with 0 relations because: +- PDF file extraction failed: `partition_pdf() is not available because one or more dependencies are not installed` +- No text was extracted from the PDF +- No chunks were created +- No Claude analysis happened +- 0 relations were extracted +- 0 relations were written to Neo4j + +## Solution + +### Step 1: Update Dependencies + +The `requirements.txt` has been updated to include: +``` +unstructured[pdf]>=0.15.0 +unstructured[docx]>=0.15.0 +unstructured[pptx]>=0.15.0 +unstructured[xlsx]>=0.15.0 +``` + +### Step 2: Rebuild the Service + +```bash +cd /home/tech4biz/Desktop/prakash/codenuk/backend_new1/codenuk_backend_mine + +# Rebuild the service with new dependencies +docker-compose build multi-document-upload-service + +# Restart the service +docker-compose restart multi-document-upload-service + +# Check logs to verify it's working +docker-compose logs -f multi-document-upload-service +``` + +### Step 3: Verify Dependencies + +```bash +# Check if unstructured[pdf] is installed +docker-compose exec multi-document-upload-service pip list | grep unstructured +``` + +### Step 4: Re-upload Documents + +1. Go to Project Builder in the frontend +2. Click on "Upload Documents for Knowledge Graph" +3. Upload a PDF or other document +4. Wait for processing to complete +5. Check Neo4j for relationships + +### Step 5: Check Neo4j + +Run these queries in Neo4j Browser: + +```cypher +// Check if any nodes exist +MATCH (n) +RETURN count(n) as node_count + +// Check for CAUSES relationships +MATCH (n:Concept)-[r:CAUSES]->(m:Concept) +RETURN n.name as cause, m.name as effect, r.confidence as confidence +LIMIT 50 +``` + +## Expected Behavior After Fix + +1. **PDF extraction succeeds** - Text is extracted from PDF files +2. **Text is chunked** - Document is split into manageable chunks +3. **Claude analyzes** - Causal relationships are extracted +4. **Relations are written** - Relationships are stored in Neo4j +5. 
**Query returns results** - Neo4j query shows relationships + +## Verification Steps + +1. **Check service logs**: + ```bash + docker-compose logs multi-document-upload-service | grep -i "extracted\|relation\|neo4j" + ``` + +2. **Check job status**: + ```bash + curl http://localhost:8000/api/multi-docs/jobs/{job_id} + ``` + Should show: `"processed_files": 1` and relations count > 0 + +3. **Check Neo4j**: + ```cypher + MATCH (n:Concept)-[r:CAUSES]->(m:Concept) + RETURN count(r) as relation_count + ``` + +## Improvements Made + +1. ✅ **Added PDF dependencies** - `unstructured[pdf]`, `unstructured[docx]`, etc. +2. ✅ **Added fallback extractors** - Uses `pdfplumber` if unstructured fails +3. ✅ **Better error handling** - Shows actual errors in job status +4. ✅ **Improved logging** - More detailed logs for debugging +5. ✅ **Better Neo4j query** - Validates data before writing + +## Troubleshooting + +If you still see 0 relations after rebuilding: + +1. **Check extraction logs**: + ```bash + docker-compose logs multi-document-upload-service | grep -i "extract" + ``` + +2. **Check Claude analysis**: + ```bash + docker-compose logs multi-document-upload-service | grep -i "claude\|analyze" + ``` + +3. **Check Neo4j connection**: + ```bash + docker-compose logs multi-document-upload-service | grep -i "neo4j\|graph" + ``` + +4. **Verify document has causal language**: + - Not all documents contain causal relationships + - Try uploading a document with clear cause-effect statements + - Example: "Smoking causes lung cancer" or "Rain causes flooding" + +## Next Steps + +1. Rebuild the service with new dependencies +2. Re-upload documents +3. Check Neo4j for relationships +4. If still no results, check service logs for errors +5. Verify the document contains causal language + diff --git a/services/multi-document-upload-service/NEO4J_DIAGNOSTIC_QUERIES.md b/services/multi-document-upload-service/NEO4J_DIAGNOSTIC_QUERIES.md new file mode 100644 index 0000000..1b96d67 --- /dev/null +++ b/services/multi-document-upload-service/NEO4J_DIAGNOSTIC_QUERIES.md @@ -0,0 +1,176 @@ +# Neo4j Diagnostic Queries + +## Issue: No relationships found in Neo4j + +If you're seeing "(no changes, no records)" when querying for `CAUSES` relationships, here are diagnostic queries to check what's actually in the database. + +## Diagnostic Queries + +### 1. Check if any nodes exist +```cypher +MATCH (n) +RETURN count(n) as node_count +LIMIT 1 +``` + +### 2. Check if Concept nodes exist +```cypher +MATCH (n:Concept) +RETURN count(n) as concept_count, + collect(DISTINCT labels(n)) as labels, + collect(DISTINCT keys(n)) as properties +LIMIT 10 +``` + +### 3. Check all relationship types +```cypher +CALL db.relationshipTypes() YIELD relationshipType +RETURN relationshipType +``` + +### 4. Check all node labels +```cypher +CALL db.labels() YIELD label +RETURN label +``` + +### 5. Check all relationships (any type) +```cypher +MATCH (n)-[r]->(m) +RETURN type(r) as relationship_type, + count(r) as count, + labels(n) as from_labels, + labels(m) as to_labels +LIMIT 50 +``` + +### 6. Check for CAUSES relationships specifically +```cypher +MATCH (n)-[r:CAUSES]->(m) +RETURN n, r, m +LIMIT 50 +``` + +### 7. Check for relationships with lowercase "causes" +```cypher +MATCH (n)-[r]->(m) +WHERE type(r) =~ '(?i)causes' +RETURN type(r) as relationship_type, n, r, m +LIMIT 50 +``` + +### 8. 
Check all nodes and their relationships +```cypher +MATCH (n) +OPTIONAL MATCH (n)-[r]->(m) +RETURN n, labels(n) as node_labels, + type(r) as relationship_type, + m, labels(m) as target_labels +LIMIT 50 +``` + +### 9. Check for nodes created by the service (by job_id property) +```cypher +MATCH (n)-[r]->(m) +WHERE r.job_id IS NOT NULL +RETURN n, r, m, r.job_id as job_id +LIMIT 50 +``` + +### 10. Check database statistics +```cypher +MATCH (n) +RETURN count(n) as total_nodes, + size([(n)-[r]->() | r]) as total_relationships +``` + +## Common Issues and Solutions + +### Issue 1: No nodes at all +**Symptom**: Query 1 returns 0 nodes +**Cause**: Service hasn't written anything to Neo4j, or connection failed +**Solution**: +- Check service logs: `docker-compose logs multi-document-upload-service` +- Verify Neo4j connection in service configuration +- Check if job completed with 0 relations (extraction failed) + +### Issue 2: Nodes exist but no relationships +**Symptom**: Query 1 returns nodes, but Query 6 returns no relationships +**Cause**: Relationships weren't created, or different relationship type +**Solution**: +- Check Query 5 to see what relationship types actually exist +- Check service logs for graph writing errors +- Verify the job actually extracted relations (check job status) + +### Issue 3: Different relationship type +**Symptom**: Query 5 shows relationships but not `CAUSES` +**Cause**: Service might be using a different relationship type +**Solution**: +- Check Query 3 to see all relationship types +- Update query to use the correct relationship type + +### Issue 4: Different node labels +**Symptom**: Query 6 returns no results, but Query 2 shows different labels +**Cause**: Service might be using different node labels +**Solution**: +- Check Query 2 to see what labels exist +- Update query to match actual labels + +## Expected Structure + +After a successful upload, you should see: + +### Nodes +- **Label**: `Concept` +- **Properties**: `name`, `lastSeen` + +### Relationships +- **Type**: `CAUSES` +- **Properties**: `confidence`, `explanation`, `source_file_id`, `source_snippet`, `job_id`, `model`, `updated_at` + +### Example Query +```cypher +MATCH (cause:Concept)-[r:CAUSES]->(effect:Concept) +RETURN cause.name as cause, + effect.name as effect, + r.confidence as confidence, + r.job_id as job_id, + r.source_file_id as source_file +LIMIT 50 +``` + +## Troubleshooting Steps + +1. **Check service logs**: + ```bash + docker-compose logs -f multi-document-upload-service + ``` + +2. **Check if job completed successfully**: + ```bash + curl http://localhost:8000/api/multi-docs/jobs/{job_id} + ``` + +3. **Check Neo4j connection**: + ```bash + docker-compose logs neo4j | grep -i error + ``` + +4. **Verify Neo4j is running**: + ```bash + docker-compose ps neo4j + ``` + +5. **Test Neo4j connection manually**: + ```bash + docker-compose exec neo4j cypher-shell -u neo4j -p password "MATCH (n) RETURN count(n)" + ``` + +## Next Steps + +1. Run the diagnostic queries above +2. Check the service logs for errors +3. Verify the job status via API +4. Re-upload documents after fixing dependencies +5. 
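A minimal Python sketch of the upsert that produces the structure described in the "Expected Structure" section above (illustrative only; the service's actual writer lives in `processors/graph_writer.py`, which is not shown in this excerpt, so the function and parameter names here are assumptions):

```python
from neo4j import GraphDatabase

UPSERT_CAUSES = """
MERGE (cause:Concept {name: $cause})
MERGE (effect:Concept {name: $effect})
MERGE (cause)-[r:CAUSES]->(effect)
SET cause.lastSeen = datetime(),
    effect.lastSeen = datetime(),
    r.confidence = $confidence,
    r.explanation = $explanation,
    r.source_file_id = $source_file_id,
    r.source_snippet = $source_snippet,
    r.job_id = $job_id,
    r.model = $model,
    r.updated_at = datetime()
"""

def write_relation(driver, relation: dict, job_id: str, model: str) -> None:
    # One MERGE per relation keeps Concept nodes unique by name and
    # overwrites the CAUSES edge properties with the latest extraction.
    with driver.session() as session:
        session.run(UPSERT_CAUSES, job_id=job_id, model=model, **relation)

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
write_relation(
    driver,
    {
        "cause": "heavy rain",
        "effect": "flooding",
        "confidence": 0.9,
        "explanation": "stated directly in the text",
        "source_file_id": "file-001",
        "source_snippet": "Heavy rain causes flooding.",
    },
    job_id="job-123",
    model="claude-3-5-sonnet-20241022",
)
driver.close()
```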
Check if relations were actually extracted (job status should show relation count) + diff --git a/services/multi-document-upload-service/QUICK_TEST.md b/services/multi-document-upload-service/QUICK_TEST.md new file mode 100644 index 0000000..af656eb --- /dev/null +++ b/services/multi-document-upload-service/QUICK_TEST.md @@ -0,0 +1,85 @@ +# Quick Testing Guide - Multi-Document Upload + +## 🚀 Quick Start Testing + +### 1. Start Services +```bash +cd /home/tech4biz/Desktop/prakash/codenuk/backend_new1/codenuk_backend_mine +docker-compose up -d multi-document-upload-service neo4j redis postgres api-gateway +``` + +### 2. Verify Services +```bash +# Check health +curl http://localhost:8024/health +curl http://localhost:8000/api/multi-docs/health +``` + +### 3. Test via Frontend + +1. **Open Frontend**: `http://localhost:3001` +2. **Login** (if required) +3. **Go to Project Builder** +4. **Complete Steps 1-2** (Project Type & Features) +5. **Step 3: Multi Docs Upload** appears +6. **Upload files**: + - Click upload area + - Select multiple files (PDF, DOCX, etc.) + - Click "Start Upload" +7. **Watch Progress**: + - Progress bar updates + - Status messages appear + - Polls every 4 seconds +8. **Auto-proceeds** when completed + +### 4. Verify in Neo4j + +```bash +# Open Neo4j Browser: http://localhost:7474 +# Login: neo4j / password + +# Query causal relationships: +MATCH (n)-[r:CAUSES]->(m) +RETURN n, r, m +LIMIT 50 +``` + +## 📝 Test Checklist + +- [ ] Service starts successfully +- [ ] Health endpoint works +- [ ] Frontend component renders +- [ ] File upload works +- [ ] Progress updates correctly +- [ ] Job completes successfully +- [ ] Neo4j graph contains relationships +- [ ] Error handling works +- [ ] Skip button works + +## 🔍 Debug Commands + +```bash +# View service logs +docker-compose logs -f multi-document-upload-service + +# Check job status (replace {job_id}) +curl http://localhost:8000/api/multi-docs/jobs/{job_id} + +# Check graph summary +curl http://localhost:8000/api/multi-docs/jobs/{job_id}/graph +``` + +## âš ī¸ Common Issues + +1. **502 Bad Gateway**: Service not running → `docker-compose ps` +2. **413 Too Large**: File too big → Reduce file size +3. **No progress**: Check browser console → Check network tab +4. **No relationships**: Check Claude API key → Check service logs + +## đŸŽ¯ Expected Flow + +``` +Upload Files → Job Created → Files Saved → Content Extracted → +Claude Analysis → Graph Built → Completed → Auto-proceed to Next Step +``` + diff --git a/services/multi-document-upload-service/README.md b/services/multi-document-upload-service/README.md new file mode 100644 index 0000000..cab9672 --- /dev/null +++ b/services/multi-document-upload-service/README.md @@ -0,0 +1,36 @@ +# Multi Document Upload Service + +This service accepts large batches of heterogeneous documents, extracts causal +relationships with Claude Sonnet 3.5, and writes them into Neo4j as a +knowledge graph. + +## Features + +- Multipart upload endpoint (`POST /jobs`) capable of handling dozens of files + and mixed formats (PDF, DOCX, PPTX, XLSX/CSV, JSON/XML, images, audio/video). +- Content extraction powered by the `unstructured` library with fallbacks. +- Chunking tuned for Claude Sonnet (800 token target, 200 overlap). +- High-accuracy causal extraction using Anthropic Claude with provenance. +- Neo4j graph writer that upserts `Concept` nodes and `CAUSES` edges. +- Status endpoint (`GET /jobs/{id}`) and graph summary endpoint + (`GET /jobs/{id}/graph`). 
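A minimal upload-and-poll client sketch (illustrative; it assumes the API gateway route `/api/multi-docs` and the job-status fields shown in the testing guides, and the exact `stage` strings are an assumption):

```python
import time

import httpx  # already listed in requirements.txt

BASE_URL = "http://localhost:8000/api/multi-docs"  # or the service directly on port 8024

def upload_and_wait(paths: list[str], poll_seconds: int = 4) -> dict:
    # POST /jobs accepts multipart form data; each document goes in a "files" part.
    handles = [open(p, "rb") for p in paths]
    try:
        files = [("files", (p.rsplit("/", 1)[-1], fh)) for p, fh in zip(paths, handles)]
        job = httpx.post(f"{BASE_URL}/jobs", files=files, timeout=120).json()
    finally:
        for fh in handles:
            fh.close()

    # Poll GET /jobs/{job_id} (the frontend does this every 4 seconds) until done.
    while True:
        status = httpx.get(f"{BASE_URL}/jobs/{job['job_id']}", timeout=30).json()
        print(status.get("stage"), "-", status.get("status_message"))
        if status.get("error") or status.get("stage") in ("completed", "failed"):
            return status
        time.sleep(poll_seconds)

if __name__ == "__main__":
    print(upload_and_wait(["test_causal.txt"]))
```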
+ +## Configuration + +Environment variables: + +- `ANTHROPIC_API_KEY` (required) +- `MULTI_DOC_CLAUDE_MODEL` (default `claude-3-5-sonnet-20241022`) +- `NEO4J_URI` (default `bolt://localhost:7687`) +- `NEO4J_USER` / `NEO4J_PASSWORD` (default `neo4j` / `neo4j`) +- `MULTI_DOC_STORAGE_ROOT` (default `storage` inside project) + +## Run locally + +```bash +uvicorn multi_document_upload_service.main:app --reload --host 0.0.0.0 --port 8035 +``` + +Ensure Neo4j is reachable and Anthropic credentials are exported before +starting the service. + diff --git a/services/multi-document-upload-service/REBUILD_INSTRUCTIONS.md b/services/multi-document-upload-service/REBUILD_INSTRUCTIONS.md new file mode 100644 index 0000000..5b84c8c --- /dev/null +++ b/services/multi-document-upload-service/REBUILD_INSTRUCTIONS.md @@ -0,0 +1,152 @@ +# Rebuild Instructions - Multi-Document Upload Service + +## Issue: Empty Graph in Neo4j + +**Problem**: Query returns "(no changes, no records)" because the job completed with 0 relations. + +**Root Cause**: PDF extraction failed due to missing dependencies (`unstructured[pdf]`). + +## Fixes Applied + +1. ✅ Added PDF dependencies (`unstructured[pdf]`, `unstructured[docx]`, etc.) +2. ✅ Added fallback extractors (pdfplumber, python-docx, python-pptx) +3. ✅ Improved error handling and logging +4. ✅ Fixed Neo4j query syntax +5. ✅ Better status messages + +## Rebuild Steps + +### Step 1: Rebuild the Service + +```bash +cd /home/tech4biz/Desktop/prakash/codenuk/backend_new1/codenuk_backend_mine + +# Stop the service +docker-compose stop multi-document-upload-service + +# Rebuild with new dependencies +docker-compose build --no-cache multi-document-upload-service + +# Start the service +docker-compose up -d multi-document-upload-service + +# Check logs to verify it's starting correctly +docker-compose logs -f multi-document-upload-service +``` + +### Step 2: Verify Dependencies + +```bash +# Check if unstructured[pdf] is installed +docker-compose exec multi-document-upload-service pip list | grep unstructured + +# You should see: +# unstructured +# unstructured-pdf +# unstructured-docx +# etc. +``` + +### Step 3: Test the Service + +```bash +# Check health endpoint +curl http://localhost:8024/health + +# Should return: +# { +# "status": "ok", +# "claude_model": "claude-3-5-haiku-latest", +# ... +# } +``` + +### Step 4: Re-upload Documents + +1. Open frontend: `http://localhost:3001/project-builder` +2. Go to Step 1: Project Type +3. Find "Upload Documents for Knowledge Graph" section +4. Upload a PDF or other document +5. Wait for processing to complete +6. Check status - should show relation count > 0 + +### Step 5: Verify in Neo4j + +Run these queries in Neo4j Browser (`http://localhost:7474`): + +```cypher +// Check if any nodes exist +MATCH (n) +RETURN count(n) as node_count + +// Check for CAUSES relationships +MATCH (n:Concept)-[r:CAUSES]->(m:Concept) +RETURN n.name as cause, + m.name as effect, + r.confidence as confidence, + r.job_id as job_id +LIMIT 50 +``` + +## Expected Results + +After rebuilding and re-uploading: + +1. **PDF extraction succeeds** ✅ +2. **Text is extracted** ✅ +3. **Relations are extracted** ✅ +4. **Relations are written to Neo4j** ✅ +5. **Query returns results** ✅ + +## Troubleshooting + +If you still see 0 relations: + +1. **Check service logs**: + ```bash + docker-compose logs multi-document-upload-service | tail -50 + ``` + +2. 
**Check extraction logs**: + ```bash + docker-compose logs multi-document-upload-service | grep -i "extract\|pdf" + ``` + +3. **Check Claude analysis**: + ```bash + docker-compose logs multi-document-upload-service | grep -i "claude\|analyze\|relation" + ``` + +4. **Check Neo4j connection**: + ```bash + docker-compose logs multi-document-upload-service | grep -i "neo4j\|graph\|write" + ``` + +5. **Verify document has causal language**: + - Not all documents contain causal relationships + - Try uploading a document with clear cause-effect statements + - Example: "Smoking causes lung cancer" + +## Quick Test + +Test with a simple text file: + +1. Create a test file `test_causal.txt`: + ``` + Smoking cigarettes causes lung cancer. + Heavy rain causes flooding. + Exercise improves health. + ``` + +2. Upload it via the frontend +3. Check Neo4j for relationships +4. Should see 3 causal relationships + +## Next Steps + +1. Rebuild the service +2. Re-upload documents +3. Check Neo4j for relationships +4. If still no results, check service logs +5. Verify the document contains causal language + diff --git a/services/multi-document-upload-service/TESTING_GUIDE.md b/services/multi-document-upload-service/TESTING_GUIDE.md new file mode 100644 index 0000000..cfd7294 --- /dev/null +++ b/services/multi-document-upload-service/TESTING_GUIDE.md @@ -0,0 +1,300 @@ +# Multi-Document Upload Service - Frontend Testing Guide + +## Prerequisites + +1. **Backend Services Running**: + ```bash + cd /home/tech4biz/Desktop/prakash/codenuk/backend_new1/codenuk_backend_mine + docker-compose up -d + ``` + +2. **Verify Services are Running**: + - API Gateway: `http://localhost:8000/health` + - Multi-Document Upload Service: `http://localhost:8024/health` + - Neo4j: `http://localhost:7474` (Browser interface) + - Frontend: `http://localhost:3001` (or your frontend port) + +3. **Check Service Health**: + ```bash + # Check API Gateway + curl http://localhost:8000/health + + # Check Multi-Document Upload Service directly + curl http://localhost:8024/health + + # Check via API Gateway proxy + curl http://localhost:8000/api/multi-docs/health + ``` + +## Frontend Testing Steps + +### Step 1: Navigate to Project Builder + +1. Open your browser and go to: `http://localhost:3001` (or your frontend URL) +2. Log in if required +3. Click on **"Project Builder"** in the navigation + +### Step 2: Go to Multi Docs Upload Step + +1. In the Project Builder, you should see the workflow steps: + - **Step 1**: Project Type + - **Step 2**: Features + - **Step 3**: Multi Docs Upload ← **This is the new step** + - **Step 4**: Business Context + - **Step 5**: Generate + - **Step 6**: Architecture + +2. Complete Steps 1 and 2 (Project Type and Features selection) +3. You will automatically be taken to **Step 3: Multi Docs Upload** + +### Step 3: Upload Documents + +1. **Click on the upload area** or **drag and drop files** +2. **Select multiple files** (you can mix different formats): + - PDF files (`.pdf`) + - Word documents (`.doc`, `.docx`) + - PowerPoint (`.ppt`, `.pptx`) + - Excel files (`.xls`, `.xlsx`) + - JSON files (`.json`) + - XML files (`.xml`) + - Markdown files (`.md`) + - Images (`.png`, `.jpg`, `.jpeg`) - will use OCR + - Audio files (`.mp3`, `.wav`) - will be transcribed + - Video files (`.mp4`, `.avi`) - will be transcribed + +3. **View selected files**: You should see a list of all selected files with: + - File icon + - File name + - Remove button for each file + +4. 
**Click "Start Upload"** button + +### Step 4: Monitor Upload Progress + +After clicking "Start Upload", you should see: + +1. **Upload Status**: + - Button shows "Uploading..." with spinner + - Progress bar appears + - Stage messages appear: + - "Job received" + - "Saving files" + - "Extracting document content" + - "Calling Claude for causal relations" + - "Writing to Neo4j knowledge graph" + - "Completed" + +2. **Progress Indicators**: + - Progress percentage (0-100%) + - Status message showing current stage + - Processed files count vs total files count + +3. **Polling**: The frontend automatically polls the job status every 4 seconds + +### Step 5: Verify Results + +Once the job is completed: + +1. **Check Neo4j Graph**: + - Open Neo4j Browser: `http://localhost:7474` + - Login with: + - Username: `neo4j` + - Password: `password` + - Run Cypher query to see the graph: + ```cypher + MATCH (n)-[r:CAUSES]->(m) + RETURN n, r, m + LIMIT 50 + ``` + +2. **Check Job Status via API**: + ```bash + # Replace {job_id} with the actual job ID from the frontend + curl http://localhost:8000/api/multi-docs/jobs/{job_id} + ``` + +3. **Get Graph Summary**: + ```bash + curl http://localhost:8000/api/multi-docs/jobs/{job_id}/graph + ``` + +## Testing Different Scenarios + +### Scenario 1: Single PDF File +- Upload one PDF file +- Verify it processes correctly +- Check Neo4j for causal relationships + +### Scenario 2: Multiple Mixed Format Files +- Upload 3-5 files of different formats (PDF, DOCX, JSON, image) +- Verify all files are processed +- Check that progress updates correctly + +### Scenario 3: Large Files +- Upload a large PDF (10+ MB) +- Verify it handles large files correctly +- Check processing time + +### Scenario 4: Error Handling +- Try uploading an unsupported file type +- Verify error message appears +- Check that the error is displayed clearly + +### Scenario 5: Skip Option +- Upload files +- Click "Skip" button before completion +- Verify you can proceed to the next step +- Job continues processing in the background + +## Browser Developer Tools + +### Check Network Requests + +1. **Open Developer Tools** (F12) +2. **Go to Network tab** +3. **Filter by "multi-docs"** +4. **Monitor requests**: + - `POST /api/multi-docs/jobs` - Upload files + - `GET /api/multi-docs/jobs/{job_id}` - Poll job status + - `GET /api/multi-docs/jobs/{job_id}/graph` - Get graph summary + +### Check Console Logs + +1. **Open Console tab** +2. **Look for**: + - Upload progress logs + - Job status updates + - Any error messages + +### Check Response Data + +Verify the API responses: + +```javascript +// Upload response should be: +{ + "job_id": "uuid-here", + "stage": "received", + "total_files": 3, + "created_at": "2024-01-01T00:00:00Z" +} + +// Status response should be: +{ + "job_id": "uuid-here", + "stage": "extracting", + "status_message": "Extracting document content", + "total_files": 3, + "processed_files": 1, + "error": null, + "created_at": "2024-01-01T00:00:00Z", + "updated_at": "2024-01-01T00:01:00Z", + "files": [...] 
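  // (Illustrative) once processing finishes, the same endpoint should report the final
  // state, e.g. "stage": "completed" (or "failed" with "error" set) and
  // "processed_files" equal to "total_files"; the exact stage strings come from the
  // service's job model and are an assumption here.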
+} +``` + +## Troubleshooting + +### Issue: Upload fails with 502 Bad Gateway +**Solution**: +- Check if multi-document-upload-service is running: + ```bash + docker-compose ps multi-document-upload-service + ``` +- Check service logs: + ```bash + docker-compose logs multi-document-upload-service + ``` + +### Issue: Upload fails with 413 Request Entity Too Large +**Solution**: +- Check file sizes (max 500MB total per job) +- Reduce number of files or file sizes +- Check API Gateway body size limits + +### Issue: Status polling stops working +**Solution**: +- Check browser console for errors +- Verify job ID is correct +- Check if job completed or failed +- Check network tab for failed requests + +### Issue: No causal relationships found +**Solution**: +- Check Claude API key is configured correctly +- Check service logs for Claude API errors +- Verify documents contain causal language +- Check Neo4j connection + +### Issue: Frontend shows "Failed" status +**Solution**: +- Check the error message in the frontend +- Check backend service logs: + ```bash + docker-compose logs -f multi-document-upload-service + ``` +- Verify all dependencies are running (Neo4j, Redis, Postgres) + +## Expected Behavior + +### Successful Flow: +1. ✅ Files upload successfully +2. ✅ Job ID is returned +3. ✅ Status polling starts automatically +4. ✅ Progress updates every 4 seconds +5. ✅ Stage changes are displayed +6. ✅ Progress bar updates +7. ✅ Job completes successfully +8. ✅ Frontend automatically proceeds to next step +9. ✅ Neo4j contains causal relationships + +### Error Flow: +1. ✅ Error message is displayed clearly +2. ✅ User can retry upload +3. ✅ User can skip and proceed +4. ✅ Error details are logged in console + +## API Endpoints Reference + +### Upload Files +```bash +POST /api/multi-docs/jobs +Content-Type: multipart/form-data + +Form Data: +- files: File[] (multiple files) +- job_name: string (optional) +``` + +### Get Job Status +```bash +GET /api/multi-docs/jobs/{job_id} +``` + +### Get Graph Summary +```bash +GET /api/multi-docs/jobs/{job_id}/graph +``` + +### Health Check +```bash +GET /api/multi-docs/health +``` + +## Next Steps After Testing + +1. **Verify Neo4j Graph**: Check that causal relationships are stored correctly +2. **Check Storage**: Verify files are stored in the persistent volume +3. **Monitor Performance**: Check processing times for different file types +4. **Test Error Scenarios**: Verify error handling works correctly +5. **Test Large Batches**: Upload 50+ files to test scalability + +## Support + +If you encounter issues: +1. Check service logs: `docker-compose logs multi-document-upload-service` +2. Check API Gateway logs: `docker-compose logs api-gateway` +3. Check Neo4j logs: `docker-compose logs neo4j` +4. Verify all environment variables are set correctly +5. 
Check network connectivity between services + diff --git a/services/multi-document-upload-service/requirements.txt b/services/multi-document-upload-service/requirements.txt new file mode 100644 index 0000000..00a9795 --- /dev/null +++ b/services/multi-document-upload-service/requirements.txt @@ -0,0 +1,34 @@ +fastapi>=0.110.0 +uvicorn[standard]>=0.29.0 +anthropic>=0.33.0 +neo4j>=5.23.0 +python-multipart>=0.0.9 +pydantic>=2.7.0 +pydantic-settings>=2.2.1 +aiofiles>=23.2.1 +tenacity>=8.2.3 +python-dotenv>=1.0.1 +unstructured[pdf]>=0.15.0 +unstructured[docx]>=0.15.0 +unstructured[pptx]>=0.15.0 +unstructured[xlsx]>=0.15.0 +pdfplumber>=0.11.0 +python-docx>=1.1.0 +python-pptx>=0.6.23 +pandas>=2.2.2 +openpyxl>=3.1.2 +xlrd>=2.0.1 +pytesseract>=0.3.10 +Pillow>=10.3.0 +opencv-python-headless>=4.9.0.80 +PyMuPDF>=1.23.0 +pdf2image>=1.16.3 +faster-whisper>=0.10.0 +ffmpeg-python>=0.2.0 +pydub>=0.25.1 +beautifulsoup4>=4.12.3 +lxml>=5.2.1 +sqlalchemy>=2.0.25 +httpx>=0.27.0 +tiktoken>=0.7.0 + diff --git a/services/multi-document-upload-service/src/multi_document_upload_service/__init__.py b/services/multi-document-upload-service/src/multi_document_upload_service/__init__.py new file mode 100644 index 0000000..6ba235d --- /dev/null +++ b/services/multi-document-upload-service/src/multi_document_upload_service/__init__.py @@ -0,0 +1,4 @@ +""" +Multi Document Upload Service package. +""" + diff --git a/services/multi-document-upload-service/src/multi_document_upload_service/claude_client.py b/services/multi-document-upload-service/src/multi_document_upload_service/claude_client.py new file mode 100644 index 0000000..cc2e6df --- /dev/null +++ b/services/multi-document-upload-service/src/multi_document_upload_service/claude_client.py @@ -0,0 +1,328 @@ +from __future__ import annotations + +import base64 +import json +import logging +import re +from pathlib import Path +from typing import Iterable, List + +from anthropic import Anthropic, BadRequestError +from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential, RetryCallState + +from .models import CausalRelation + +logger = logging.getLogger(__name__) + + +def is_billing_error(exception: Exception) -> bool: + """Check if the exception is a billing/credit related error that shouldn't be retried.""" + if isinstance(exception, BadRequestError): + error_message = str(exception).lower() + billing_keywords = ["credit", "balance", "too low", "billing", "upgrade", "purchase credits"] + return any(keyword in error_message for keyword in billing_keywords) + return False + + +def should_retry_exception(retry_state: RetryCallState) -> bool: + """Custom retry condition that excludes billing errors.""" + exception = retry_state.outcome.exception() + if exception is None: + return False + # Don't retry billing errors - they won't be resolved by retrying + if is_billing_error(exception): + return False + # Retry other exceptions + return True + + +CLAUDE_PROMPT_TEMPLATE = """You are an expert analyst extracting causal relationships from documents. + +Given the following text chunk, identify all explicit or strongly implied cause and effect pairs. +Return JSON with the schema: +[ + { + "cause": "", + "effect": "", + "confidence": 0-1 float, + "explanation": "", + "source_snippet": "" + } +] + +Only include items when the causal direction is clear. +If none are found, return an empty list []. 
+ +Text chunk: +``` +<<>> +```""" + +IMAGE_PROMPT_TEMPLATE = """You are an expert analyst extracting causal relationships from images, diagrams, and visual content. + +Analyze this image/diagram for causal relationships. Look for: +- Architecture flows (A → B → C) +- Dependency relationships +- Cause-effect chains in diagrams +- Process flows +- System interactions +- Data flows +- Sequential relationships +- Visual connections between components + +Return JSON with the schema: +[ + { + "cause": "", + "effect": "", + "confidence": 0-1 float, + "explanation": "", + "source_snippet": "" + } +] + +Only include items when the causal direction is clear from the visual structure. +If none are found, return an empty list [].""" + + +class ClaudeCausalExtractor: + def __init__(self, api_key: str, model: str, max_output_tokens: int = 4000): + self.client = Anthropic(api_key=api_key) + self.model = model + self.max_output_tokens = max_output_tokens + + @retry( + retry=should_retry_exception, + wait=wait_exponential(multiplier=1, min=1, max=10), + stop=stop_after_attempt(3), + reraise=True, + ) + def analyze_chunk(self, chunk: str, source_file_id: str) -> List[CausalRelation]: + logger.debug("Analyzing chunk with Claude model %s", self.model) + + # Validate chunk is not empty and is readable text + if not chunk or not chunk.strip(): + logger.warning("Empty or whitespace-only chunk, skipping") + return [] + + # Check if chunk contains mostly readable text (not binary data) + # Simple heuristic: if >50% of characters are non-printable or control chars, skip it + printable_chars = sum(1 for c in chunk if c.isprintable() or c.isspace()) + if len(chunk) > 100 and printable_chars / len(chunk) < 0.5: + logger.warning("Chunk appears to contain binary data, skipping analysis") + return [] + + # Use string replacement with a unique placeholder to avoid KeyError with braces in content + # This prevents Python's .format() from interpreting braces in the chunk text as format placeholders + prompt_text = CLAUDE_PROMPT_TEMPLATE.replace("<<>>", chunk) + + try: + message = self.client.messages.create( + model=self.model, + max_tokens=self.max_output_tokens, + temperature=0.0, + system="You extract causal (cause→effect) relations with high precision.", + messages=[ + { + "role": "user", + "content": [{"type": "text", "text": prompt_text}], + } + ], + ) + except BadRequestError as e: + # Check if it's a billing error + if is_billing_error(e): + error_msg = ( + "Anthropic API credit balance is too low. " + "Please go to Plans & Billing to upgrade or purchase credits. " + f"Error: {str(e)}" + ) + logger.error(error_msg) + raise RuntimeError(error_msg) from e + # Re-raise other BadRequestErrors + raise + + content_blocks = message.content or [] + raw_text = "".join(block.text for block in content_blocks if hasattr(block, "text")) # type: ignore[attr-defined] + if not raw_text: + return [] + + # Try to extract JSON from markdown code blocks if present + json_text = raw_text.strip() + + # Look for JSON in markdown code blocks (```json ... 
```) + json_match = re.search(r'```(?:json)?\s*(\[.*?\])\s*```', json_text, re.DOTALL) + if json_match: + json_text = json_match.group(1) + else: + # Look for JSON array/object at the start or end + json_match = re.search(r'(\[.*?\]|{.*?})', json_text, re.DOTALL) + if json_match: + json_text = json_match.group(1) + + try: + data = json.loads(json_text) + if not isinstance(data, list): + logger.warning("Claude response is not a list: %s", type(data)) + return [] + + relations: List[CausalRelation] = [] + for item in data: + if not isinstance(item, dict): + continue + cause = item.get("cause", "").strip() + effect = item.get("effect", "").strip() + if not cause or not effect: + continue # Skip invalid relations + + relations.append( + CausalRelation( + cause=cause, + effect=effect, + confidence=float(item.get("confidence", 0.0)), + explanation=item.get("explanation"), + source_file_id=source_file_id, + source_snippet=item.get("source_snippet"), + metadata={"model": self.model}, + ) + ) + logger.info("Extracted %d relations from Claude response", len(relations)) + return relations + except json.JSONDecodeError as e: + logger.warning("Failed to parse Claude response as JSON: %s. Raw text: %s", e, raw_text[:200]) + return [] + + def analyze(self, chunks: Iterable[str], source_file_id: str) -> List[CausalRelation]: + relations: List[CausalRelation] = [] + for chunk in chunks: + relations.extend(self.analyze_chunk(chunk, source_file_id=source_file_id)) + return relations + + @retry( + retry=should_retry_exception, + wait=wait_exponential(multiplier=1, min=1, max=10), + stop=stop_after_attempt(3), + reraise=True, + ) + def analyze_image(self, image_path: Path, source_file_id: str) -> List[CausalRelation]: + """ + Analyze an image using Claude Vision API to extract causal relationships. + Sends image directly to Claude (no OCR). + """ + logger.info("Analyzing image with Claude Vision: %s", image_path.name) + + try: + # Read and encode image as base64 + with open(image_path, "rb") as image_file: + image_data = image_file.read() + + # Determine media type + suffix = image_path.suffix.lower() + media_type_map = { + ".png": "image/png", + ".jpg": "image/jpeg", + ".jpeg": "image/jpeg", + ".gif": "image/gif", + ".webp": "image/webp", + } + media_type = media_type_map.get(suffix, "image/png") + + # Encode to base64 + base64_image = base64.b64encode(image_data).decode("utf-8") + + # Prepare content for Claude Vision API + content = [ + { + "type": "image", + "source": { + "type": "base64", + "media_type": media_type, + "data": base64_image, + }, + }, + { + "type": "text", + "text": IMAGE_PROMPT_TEMPLATE, + }, + ] + + # Call Claude Vision API + try: + message = self.client.messages.create( + model=self.model, # Claude models support vision + max_tokens=self.max_output_tokens, + temperature=0.0, + system="You extract causal (cause→effect) relations from visual content with high precision.", + messages=[ + { + "role": "user", + "content": content, + } + ], + ) + except BadRequestError as e: + # Check if it's a billing error + if is_billing_error(e): + error_msg = ( + "Anthropic API credit balance is too low. " + "Please go to Plans & Billing to upgrade or purchase credits. 
" + f"Error: {str(e)}" + ) + logger.error(error_msg) + raise RuntimeError(error_msg) from e + # Re-raise other BadRequestErrors + raise + + # Parse response + content_blocks = message.content or [] + raw_text = "".join(block.text for block in content_blocks if hasattr(block, "text")) # type: ignore[attr-defined] + if not raw_text: + logger.warning("No text response from Claude Vision for image %s", image_path.name) + return [] + + # Extract JSON from response + json_text = raw_text.strip() + json_match = re.search(r'```(?:json)?\s*(\[.*?\])\s*```', json_text, re.DOTALL) + if json_match: + json_text = json_match.group(1) + else: + json_match = re.search(r'(\[.*?\]|{.*?})', json_text, re.DOTALL) + if json_match: + json_text = json_match.group(1) + + try: + data = json.loads(json_text) + if not isinstance(data, list): + logger.warning("Claude Vision response is not a list: %s", type(data)) + return [] + + relations: List[CausalRelation] = [] + for item in data: + if not isinstance(item, dict): + continue + cause = item.get("cause", "").strip() + effect = item.get("effect", "").strip() + if not cause or not effect: + continue + + relations.append( + CausalRelation( + cause=cause, + effect=effect, + confidence=float(item.get("confidence", 0.0)), + explanation=item.get("explanation"), + source_file_id=source_file_id, + source_snippet=item.get("source_snippet") or f"Image: {image_path.name}", + metadata={"model": self.model, "content_type": "image", "image_path": str(image_path)}, + ) + ) + logger.info("Extracted %d relations from image %s", len(relations), image_path.name) + return relations + except json.JSONDecodeError as e: + logger.warning("Failed to parse Claude Vision response as JSON: %s. Raw text: %s", e, raw_text[:200]) + return [] + + except Exception as exc: + logger.exception("Failed to analyze image %s: %s", image_path, exc) + return [] + diff --git a/services/multi-document-upload-service/src/multi_document_upload_service/config.py b/services/multi-document-upload-service/src/multi_document_upload_service/config.py new file mode 100644 index 0000000..54c4b07 --- /dev/null +++ b/services/multi-document-upload-service/src/multi_document_upload_service/config.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import os +from functools import lru_cache +from pathlib import Path + +from pydantic import Field +from pydantic_settings import BaseSettings, SettingsConfigDict + + +DEFAULT_STORAGE_ROOT = Path( + os.getenv("MULTI_DOC_STORAGE_ROOT", Path(__file__).resolve().parent.parent.parent / "storage") +) +DEFAULT_STORAGE_ROOT.mkdir(parents=True, exist_ok=True) + + +class Settings(BaseSettings): + """Application configuration loaded from environment variables.""" + + model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore") + + anthropic_api_key: str | None = Field(default=None, validation_alias="ANTHROPIC_API_KEY") + claude_model: str = Field(default=os.getenv("MULTI_DOC_CLAUDE_MODEL", "claude-3-5-sonnet-20241022")) + claude_max_input_tokens: int = Field(default=200_000) + claude_max_output_tokens: int = Field(default=16_000) + + neo4j_uri: str = Field(default=os.getenv("NEO4J_URI", "bolt://localhost:7687")) + neo4j_user: str = Field(default=os.getenv("NEO4J_USER", "neo4j")) + neo4j_password: str = Field(default=os.getenv("NEO4J_PASSWORD", "neo4j")) + + storage_root: Path = Field(default=DEFAULT_STORAGE_ROOT) + max_upload_size_mb: int = Field(default=500) + max_files_per_job: int = Field(default=200) + + chunk_token_target: int = 
Field(default=800) + chunk_token_overlap: int = Field(default=200) + + job_retention_days: int = Field(default=30) + + def ensure_storage_dirs(self) -> None: + (self.storage_root / "jobs").mkdir(parents=True, exist_ok=True) + (self.storage_root / "uploads").mkdir(parents=True, exist_ok=True) + (self.storage_root / "extracted").mkdir(parents=True, exist_ok=True) + (self.storage_root / "images").mkdir(parents=True, exist_ok=True) + + +@lru_cache +def get_settings() -> Settings: + settings = Settings() + settings.ensure_storage_dirs() + return settings + diff --git a/services/multi-document-upload-service/src/multi_document_upload_service/extractors/auto.py b/services/multi-document-upload-service/src/multi_document_upload_service/extractors/auto.py new file mode 100644 index 0000000..fb87e18 --- /dev/null +++ b/services/multi-document-upload-service/src/multi_document_upload_service/extractors/auto.py @@ -0,0 +1,168 @@ +from __future__ import annotations + +import logging +from pathlib import Path +from typing import List + +logger = logging.getLogger(__name__) + +# Try to import unstructured, but fall back to alternatives if not available +try: + from unstructured.partition.auto import partition + HAS_UNSTRUCTURED = True +except ImportError: + HAS_UNSTRUCTURED = False + logger.warning("unstructured not available, will use fallback extractors") + +# Fallback extractors +try: + import pdfplumber + HAS_PDFPLUMBER = True +except ImportError: + HAS_PDFPLUMBER = False + +try: + from docx import Document as DocxDocument + HAS_DOCX = True +except ImportError: + HAS_DOCX = False + +try: + from pptx import Presentation + HAS_PPTX = True +except ImportError: + HAS_PPTX = False + +# Image processing libraries +try: + from PIL import Image + import pytesseract + HAS_OCR = True +except ImportError: + HAS_OCR = False + logger.warning("OCR libraries not available, image extraction will be limited") + + +def extract_text(path: Path) -> str: + """ + Extract text from a file using multiple strategies. + Falls back through: unstructured -> format-specific -> plain text read. + """ + suffix = path.suffix.lower() + + # Validate PDF file before processing + if suffix == ".pdf": + # Quick validation: check if file starts with PDF magic bytes + try: + with path.open("rb") as f: + header = f.read(4) + if header != b"%PDF": + raise ValueError( + f"File {path.name} does not appear to be a valid PDF. " + f"PDF files must start with '%PDF' magic bytes. " + f"Got: {header[:20] if len(header) > 0 else 'empty file'}" + ) + except Exception as exc: + if isinstance(exc, ValueError): + raise + logger.warning("Could not validate PDF header: %s", exc) + + # Image files - return empty text (will be processed directly with Claude Vision) + # We skip OCR and send images directly to Claude Vision API + if suffix in {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".webp"}: + logger.info("Image file detected: %s. 
Will be processed directly with Claude Vision (no OCR)", path.name) + # Return empty string - images will be handled separately in pipeline + return "" + + # Plain text files - direct read + if suffix in {".txt", ".md", ".json", ".xml", ".html", ".csv"}: + try: + return path.read_text(encoding="utf-8", errors="ignore") + except Exception as exc: + logger.warning("Failed to read %s as text: %s", path, exc) + raise + + # Try unstructured first (if available) + if HAS_UNSTRUCTURED: + try: + elements = partition(filename=str(path)) + lines: List[str] = [] + for element in elements: + text = getattr(element, "text", None) + if text: + lines.append(text.strip()) + if lines: + logger.info("Extracted %d lines using unstructured", len(lines)) + return "\n".join(lines) + except Exception as exc: + logger.warning("unstructured extraction failed for %s: %s", path, exc) + # Continue to fallback methods + + # Fallback: PDF with pdfplumber + if suffix == ".pdf" and HAS_PDFPLUMBER: + try: + with pdfplumber.open(path) as pdf: + text_parts = [] + for page in pdf.pages: + page_text = page.extract_text() + if page_text: + text_parts.append(page_text) + if text_parts: + logger.info("Extracted PDF using pdfplumber") + return "\n".join(text_parts) + except Exception as exc: + logger.warning("pdfplumber extraction failed for %s: %s", path, exc) + + # Fallback: DOCX + if suffix == ".docx" and HAS_DOCX: + try: + doc = DocxDocument(path) + paragraphs = [p.text for p in doc.paragraphs if p.text.strip()] + if paragraphs: + logger.info("Extracted DOCX using python-docx") + return "\n".join(paragraphs) + except Exception as exc: + logger.warning("python-docx extraction failed for %s: %s", path, exc) + + # Fallback: PPTX + if suffix in {".pptx", ".ppt"} and HAS_PPTX: + try: + prs = Presentation(path) + text_parts = [] + for slide in prs.slides: + for shape in slide.shapes: + if hasattr(shape, "text") and shape.text: + text_parts.append(shape.text.strip()) + if text_parts: + logger.info("Extracted PPTX using python-pptx") + return "\n".join(text_parts) + except Exception as exc: + logger.warning("python-pptx extraction failed for %s: %s", path, exc) + + # Last resort: try to read as text anyway, but validate it's readable + try: + content = path.read_text(encoding="utf-8", errors="ignore") + if content.strip(): + # Check if content is actually readable text (not binary data) + # Simple heuristic: if >30% of characters are printable, consider it text + printable_chars = sum(1 for c in content if c.isprintable() or c.isspace()) + total_chars = len(content) + + if total_chars > 0 and printable_chars / total_chars > 0.3: + logger.warning("Read %s as plain text (may contain binary data)", path) + return content + else: + logger.error("Content from %s appears to be binary data, cannot extract text", path) + raise ValueError(f"File {path} appears to be binary or corrupted. Cannot extract readable text.") + except Exception as exc: + if isinstance(exc, ValueError): + raise + logger.warning("Failed to read %s as text: %s", path, exc) + + # If all else fails, raise an error + raise ValueError( + f"Could not extract text from {path}. " + f"File type may not be supported, file may be corrupted, or dependencies are missing. 
" + f"Supported formats: PDF, DOCX, PPTX, XLSX, TXT, MD, JSON, XML, HTML, CSV, PNG, JPG, JPEG (with OCR)" + ) + diff --git a/services/multi-document-upload-service/src/multi_document_upload_service/extractors/image_extractor.py b/services/multi-document-upload-service/src/multi_document_upload_service/extractors/image_extractor.py new file mode 100644 index 0000000..a75c220 --- /dev/null +++ b/services/multi-document-upload-service/src/multi_document_upload_service/extractors/image_extractor.py @@ -0,0 +1,514 @@ +from __future__ import annotations + +import logging +from pathlib import Path +from typing import List, Tuple +from io import BytesIO + +from PIL import Image + +logger = logging.getLogger(__name__) + +# Header/Footer detection thresholds +HEADER_THRESHOLD = 0.15 # Top 15% of page is considered header +FOOTER_THRESHOLD = 0.15 # Bottom 15% of page is considered footer +MIN_CONTENT_HEIGHT = 0.3 # Minimum 30% of page height for content area + +# Try to import PDF libraries +try: + import fitz # PyMuPDF + HAS_PYMUPDF = True +except ImportError: + HAS_PYMUPDF = False + logger.warning("PyMuPDF not available, PDF image extraction will be limited") + +try: + from pdf2image import convert_from_path + HAS_PDF2IMAGE = True +except ImportError: + HAS_PDF2IMAGE = False + +# DOCX image extraction +try: + from docx import Document as DocxDocument + HAS_DOCX = True +except ImportError: + HAS_DOCX = False + +# PPTX image extraction +try: + from pptx import Presentation + HAS_PPTX = True +except ImportError: + HAS_PPTX = False + + +def is_header_footer_image(bbox: Tuple[float, float, float, float], page_height: float, page_width: float) -> bool: + """ + Check if an image is in header or footer region. + bbox: (x0, y0, x1, y1) - image bounding box coordinates + Returns True if image is in header/footer, False otherwise (i.e., in body/content area). 
+ """ + x0, y0, x1, y1 = bbox + + # Calculate relative positions + top_ratio = y0 / page_height if page_height > 0 else 0 + bottom_ratio = y1 / page_height if page_height > 0 else 0 + height_ratio = (y1 - y0) / page_height if page_height > 0 else 0 + + # AGGRESSIVE header/footer detection - use 25% threshold for top and bottom + # This ensures we only extract images from the middle 50% of the page (body area) + HEADER_THRESHOLD = 0.25 # Top 25% is header + FOOTER_THRESHOLD = 0.25 # Bottom 25% is footer + BODY_START = HEADER_THRESHOLD # Body starts at 25% + BODY_END = 1.0 - FOOTER_THRESHOLD # Body ends at 75% + + # PRIMARY CHECK: Image must be ENTIRELY in the body area (middle 50%) + # If ANY part of the image is in header or footer, skip it + image_center_y = (y0 + y1) / 2.0 / page_height if page_height > 0 else 0 + + # Check if image is completely in header region (top 25%) + if bottom_ratio <= HEADER_THRESHOLD: + logger.info("Image in header region (top: %.2f%%, bottom: %.2f%%)", top_ratio * 100, bottom_ratio * 100) + return True + + # Check if image is completely in footer region (bottom 25%) + if top_ratio >= BODY_END: + logger.info("Image in footer region (top: %.2f%%, bottom: %.2f%%)", top_ratio * 100, bottom_ratio * 100) + return True + + # Check if image overlaps header (starts in header, even if extends into body) + if top_ratio < HEADER_THRESHOLD: + logger.info("Image overlaps header region (top: %.2f%%, bottom: %.2f%%)", top_ratio * 100, bottom_ratio * 100) + return True + + # Check if image overlaps footer (ends in footer, even if starts in body) + if bottom_ratio > BODY_END: + logger.info("Image overlaps footer region (top: %.2f%%, bottom: %.2f%%)", top_ratio * 100, bottom_ratio * 100) + return True + + # Check if image center is in header or footer (even if image spans both) + if image_center_y < HEADER_THRESHOLD or image_center_y > BODY_END: + logger.info("Image center in header/footer (center: %.2f%%)", image_center_y * 100) + return True + + # Check if image is very small and near edges (likely logo/icon) + if height_ratio < 0.10: # Less than 10% of page height + # If it's small and in top 30% or bottom 30%, likely header/footer + if top_ratio < 0.30 or bottom_ratio > 0.70: + logger.info("Small image near header/footer (height: %.2f%%, top: %.2f%%, bottom: %.2f%%)", + height_ratio * 100, top_ratio * 100, bottom_ratio * 100) + return True + + # Image is in body/content area - allow it + return False + + +def crop_header_footer(image_path: Path, output_path: Path, header_ratio: float = HEADER_THRESHOLD, footer_ratio: float = FOOTER_THRESHOLD) -> bool: + """ + Crop header and footer regions from a full-page image. + Returns True if cropping was successful, False otherwise. 
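+
+    header_ratio of the height is cropped from the top and footer_ratio from the
+    bottom; if less than MIN_CONTENT_HEIGHT of the page would remain, the image is
+    left untouched and False is returned.
+    Worked example: with the default 0.15 ratios, a 1000px-tall page loses 150px at
+    the top and 150px at the bottom, keeping 70% of the height, so the crop proceeds.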
+ """ + try: + img = Image.open(image_path) + width, height = img.size + + # Calculate crop boundaries + header_pixels = int(height * header_ratio) + footer_pixels = int(height * footer_ratio) + + # Ensure there's enough content height left after cropping + remaining_height = height - header_pixels - footer_pixels + remaining_ratio = remaining_height / height + + if remaining_ratio < MIN_CONTENT_HEIGHT: + logger.warning("Cropping would remove too much content from %s (remaining: %.2f%% < %.2f%%), skipping crop", + image_path.name, remaining_ratio * 100, MIN_CONTENT_HEIGHT * 100) + return False + + # Crop: remove top (header) and bottom (footer) + cropped = img.crop((0, header_pixels, width, height - footer_pixels)) + + # Save cropped image + cropped.save(output_path) + logger.info("Cropped header/footer from %s (removed %dpx top, %dpx bottom, remaining: %.2f%%)", + image_path.name, header_pixels, footer_pixels, remaining_ratio * 100) + return True + except Exception as exc: + logger.warning("Failed to crop header/footer from %s: %s", image_path, exc) + return False + + +def extract_images_from_pdf(pdf_path: Path, output_dir: Path) -> List[Path]: + """ + Extract all images from a PDF file. + Returns list of paths to extracted image files. + """ + extracted_images: List[Path] = [] + + if not HAS_PYMUPDF: + logger.warning("PyMuPDF not available, cannot extract images from PDF") + return extracted_images + + try: + doc = fitz.open(pdf_path) + image_count = 0 + skipped_count = 0 + + for page_num, page in enumerate(doc): + page_rect = page.rect + page_height = page_rect.height + page_width = page_rect.width + + # Extract embedded images + image_list = page.get_images() + + # Log total images found on this page BEFORE filtering + logger.info("Page %d: Found %d embedded images (page size: %.0fx%.0f)", + page_num, len(image_list), page_width, page_height) + + for img_index, img in enumerate(image_list): + try: + xref = img[0] + base_image = doc.extract_image(xref) + image_bytes = base_image["image"] + image_ext = base_image["ext"] + + logger.debug("Processing image %d from page %d (xref: %d, ext: %s, size: %d bytes)", + img_index, page_num, xref, image_ext, len(image_bytes)) + + # Get image position and size for header/footer detection + is_header_footer = False + image_rect = None + img_width, img_height = 0, 0 + position_detection_succeeded = False + size_detection_succeeded = False + aspect_ratio = 0.0 + img_height_ratio = 0.0 + img_width_ratio = 0.0 + + # PRIMARY METHOD: Check position FIRST (most reliable for header/footer detection) + # Position-based detection is the most accurate way to determine if image is in body area + try: + image_rect = page.get_image_rect(xref) + if image_rect and not image_rect.is_empty and image_rect.width > 0 and image_rect.height > 0: + position_detection_succeeded = True + # Check if image is in header/footer based on position + bbox = (image_rect.x0, image_rect.y0, image_rect.x1, image_rect.y1) + if is_header_footer_image(bbox, page_height, page_width): + logger.info("Skipping header/footer image %d from page %d (position: y0=%.1f, y1=%.1f, height=%.1f, width=%.1f)", + img_index, page_num, image_rect.y0, image_rect.y1, image_rect.height, image_rect.width) + skipped_count += 1 + is_header_footer = True + except Exception as bbox_exc: + logger.debug("Could not get image rect for image %d on page %d: %s", img_index, page_num, bbox_exc) + position_detection_succeeded = False + + # SECONDARY METHOD: Check size (only if position check didn't catch it or failed) + # 
Use size-based detection as a fallback for banner-like images + if not is_header_footer: + try: + # Check image dimensions - useful for catching banners + from PIL import Image as PILImage + from io import BytesIO + img_obj = PILImage.open(BytesIO(image_bytes)) + img_width, img_height = img_obj.size + size_detection_succeeded = True + + # Calculate relative size + img_height_ratio = img_height / page_height if page_height > 0 else 0 + img_width_ratio = img_width / page_width if page_width > 0 else 0 + aspect_ratio = img_width / img_height if img_height > 0 else 0 + + # Size-based filtering: Skip banner-like images + # These checks catch wide banners and small logos/icons + + # 1. Very small absolute height (< 300px) - catches logos and small banners + is_very_small_height = img_height < 300 + + # 2. Banner aspect ratio (width >> height) - catches wide banners + is_banner_aspect = aspect_ratio > 2.5 + + # 3. Short relative to page (< 30% of page height) - catches banners + is_short_relative = img_height_ratio < 0.30 + + # 4. Tiny relative size (< 20% height AND < 50% width) - catches icons/logos + is_tiny_relative = (img_height_ratio < 0.20 and img_width_ratio < 0.50) + + # 5. Wide banner pattern: short height (< 400px) AND wide (width > 2x height) + is_wide_banner_pattern = (img_height < 400 and img_width > img_height * 2.0) + + # 6. Typical banner size: very wide (> 1000px) AND short (< 300px) + is_typical_banner_size = (img_width > 1000 and img_height < 300) + + # 7. Very wide images: width > 800px AND height < 250px + is_very_wide = (img_width > 800 and img_height < 250) + + # 8. Short and wide: height < 250px AND width > 600px + is_short_wide = (img_height < 250 and img_width > 600) + + # 9. Very common banner: width > 600px AND height < 200px + is_common_banner = (img_width > 600 and img_height < 200) + + # Combine checks - skip if it looks like a banner or header/footer element + is_likely_header_footer = ( + is_very_small_height or + is_banner_aspect or + is_short_relative or + is_tiny_relative or + is_wide_banner_pattern or + is_typical_banner_size or + is_very_wide or + is_short_wide or + is_common_banner or + # If short AND wide, definitely skip + (is_short_relative and is_banner_aspect) or + # Final catch-all: if width is much larger than height, skip + (img_width > img_height * 2.0 and img_height < 400) + ) + + if is_likely_header_footer: + logger.info("Skipping header/footer image %d from page %d (size-based: %dx%d, aspect: %.2f, height_ratio: %.2f%%, width_ratio: %.2f%%)", + img_index, page_num, img_width, img_height, aspect_ratio, + img_height_ratio * 100, img_width_ratio * 100) + skipped_count += 1 + is_header_footer = True + except Exception as size_exc: + logger.debug("Could not analyze image size for image %d on page %d: %s", img_index, page_num, size_exc) + size_detection_succeeded = False + + # FINAL SAFETY: If position detection failed, be more aggressive + # If we can't verify position, skip images that are suspicious + if not position_detection_succeeded and size_detection_succeeded and not is_header_footer: + # Skip images larger than the page (likely background/header/footer images) + if img_height_ratio > 1.0 or img_width_ratio > 1.0: + logger.info("Skipping image %d from page %d (position unknown, but image larger than page: height_ratio=%.1f%%, width_ratio=%.1f%%)", + img_index, page_num, img_height_ratio * 100, img_width_ratio * 100) + skipped_count += 1 + is_header_footer = True + # Also skip if image is very large relative to page (likely background) + elif 
img_height_ratio > 0.80 or img_width_ratio > 0.80: + logger.info("Skipping image %d from page %d (position unknown, but image very large relative to page: height_ratio=%.1f%%, width_ratio=%.1f%%)", + img_index, page_num, img_height_ratio * 100, img_width_ratio * 100) + skipped_count += 1 + is_header_footer = True + + # FINAL SAFETY: If we can't determine position AND size, skip the image (conservative approach) + # This prevents unknown images from slipping through + if not position_detection_succeeded and not size_detection_succeeded and not is_header_footer: + logger.warning("Cannot determine position or size for image %d on page %d, skipping for safety (cannot verify it's in body area)", img_index, page_num) + skipped_count += 1 + is_header_footer = True + + # Skip this image if it's in header/footer + if is_header_footer: + continue + + # Save image (not in header/footer, passed all checks - must be in body area) + image_filename = f"page_{page_num}_img_{img_index}.{image_ext}" + image_path = output_dir / image_filename + + # Get position info for logging + position_info = "" + if image_rect: + # Calculate relative position to show it's in body area + y0_ratio = image_rect.y0 / page_height if page_height > 0 else 0 + y1_ratio = image_rect.y1 / page_height if page_height > 0 else 0 + position_info = f", position: y0={image_rect.y0:.1f} ({y0_ratio*100:.1f}%), y1={image_rect.y1:.1f} ({y1_ratio*100:.1f}%) [BODY AREA]" + elif size_detection_succeeded: + position_info = f", size: {img_width}x{img_height}, aspect_ratio={aspect_ratio:.2f}, height_ratio={img_height_ratio*100:.1f}%" + + with open(image_path, "wb") as img_file: + img_file.write(image_bytes) + + extracted_images.append(image_path) + image_count += 1 + logger.info("Extracted image %s from PDF page %d (BODY CONTENT image, size: %dx%d%s)", + image_filename, page_num, img_width if img_width > 0 else 0, img_height if img_height > 0 else 0, position_info) + except Exception as exc: + logger.warning("Failed to extract image %d from page %d: %s", img_index, page_num, exc) + + # DO NOT extract full-page images - only extract embedded images + # Full-page images often contain headers/footers and are not needed + # We only want actual embedded images from the document content + logger.debug("Skipping full-page image extraction for page %d (only extracting embedded images)", page_num) + + doc.close() + if skipped_count > 0: + logger.info("Extracted %d images from PDF %s (skipped %d header/footer images)", + image_count, pdf_path.name, skipped_count) + else: + logger.info("Extracted %d images from PDF %s", image_count, pdf_path.name) + return extracted_images + + except Exception as exc: + logger.exception("Failed to extract images from PDF %s: %s", pdf_path, exc) + return extracted_images + + +def extract_images_from_docx(docx_path: Path, output_dir: Path) -> List[Path]: + """ + Extract all embedded images from a DOCX file. + Returns list of paths to extracted image files. 
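+
+    Images are found by walking the document part's relationships; very small
+    images (under roughly 200x200 px) are skipped as likely header/footer logos.
+    Example (hypothetical paths):
+        images = extract_images_from_docx(Path("report.docx"), Path("/tmp/images"))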
+ """ + extracted_images: List[Path] = [] + + if not HAS_DOCX: + logger.warning("python-docx not available, cannot extract images from DOCX") + return extracted_images + + try: + doc = DocxDocument(docx_path) + image_count = 0 + + # Access document relationships to find images + for rel_id, rel in doc.part.rels.items(): + # Check if relationship is an image + if "image" in rel.target_ref or rel.target_part.content_type.startswith("image/"): + try: + image_part = rel.target_part + image_bytes = image_part.blob + + # Determine image extension from content type + content_type = image_part.content_type + ext_map = { + "image/png": "png", + "image/jpeg": "jpg", + "image/jpg": "jpg", + "image/gif": "gif", + "image/bmp": "bmp", + "image/webp": "webp", + } + ext = ext_map.get(content_type, "png") + + # Check image size - small images are likely logos/icons (header/footer) + try: + from PIL import Image as PILImage + from io import BytesIO + img_obj = PILImage.open(BytesIO(image_bytes)) + img_width, img_height = img_obj.size + # Skip very small images (likely logos/icons in headers/footers) + if img_width < 200 and img_height < 200: + logger.debug("Skipping small image from DOCX (likely header/footer logo, size: %dx%d)", + img_width, img_height) + continue + except Exception: + pass # Continue with extraction if size check fails + + # Save image + image_filename = f"docx_img_{image_count}.{ext}" + image_path = output_dir / image_filename + + with open(image_path, "wb") as img_file: + img_file.write(image_bytes) + + extracted_images.append(image_path) + image_count += 1 + logger.debug("Extracted image %s from DOCX", image_filename) + except Exception as exc: + logger.warning("Failed to extract image from DOCX: %s", exc) + + logger.info("Extracted %d images from DOCX %s", image_count, docx_path.name) + return extracted_images + + except Exception as exc: + logger.exception("Failed to extract images from DOCX %s: %s", docx_path, exc) + return extracted_images + + +def extract_images_from_pptx(pptx_path: Path, output_dir: Path) -> List[Path]: + """ + Extract all images from a PPTX file. + Returns list of paths to extracted image files. + """ + extracted_images: List[Path] = [] + + if not HAS_PPTX: + logger.warning("python-pptx not available, cannot extract images from PPTX") + return extracted_images + + try: + prs = Presentation(pptx_path) + image_count = 0 + + for slide_num, slide in enumerate(prs.slides): + for shape_num, shape in enumerate(slide.shapes): + # Check if shape is a picture + if hasattr(shape, "image"): + try: + image = shape.image + image_bytes = image.blob + + # Determine extension from content type + ext = image.ext # Usually 'png', 'jpg', etc. 
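+                        # Defensive default: fall back to "png" when python-pptx
+                        # does not report an extension for the embedded image.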
+ if not ext: + ext = "png" + + # Check image size and position + # Small images at edges are likely logos/icons + try: + from PIL import Image as PILImage + from io import BytesIO + img_obj = PILImage.open(BytesIO(image_bytes)) + img_width, img_height = img_obj.size + + # Get shape position (if available) + shape_left = shape.left if hasattr(shape, 'left') else 0 + shape_top = shape.top if hasattr(shape, 'top') else 0 + slide_width = slide.slide_width if hasattr(slide, 'slide_width') else 10000 + slide_height = slide.slide_height if hasattr(slide, 'slide_height') else 10000 + + # Check if small image is in corner (likely logo) + is_small = img_width < 200 and img_height < 200 + is_in_corner = ( + (shape_left < slide_width * 0.1 and shape_top < slide_height * 0.1) or # Top-left + (shape_left > slide_width * 0.9 and shape_top < slide_height * 0.1) or # Top-right + (shape_left < slide_width * 0.1 and shape_top > slide_height * 0.9) or # Bottom-left + (shape_left > slide_width * 0.9 and shape_top > slide_height * 0.9) # Bottom-right + ) + + if is_small and is_in_corner: + logger.debug("Skipping small corner image from slide %d (likely header/footer logo)", slide_num) + continue + except Exception: + pass # Continue with extraction if check fails + + # Save image + image_filename = f"slide_{slide_num}_img_{shape_num}.{ext}" + image_path = output_dir / image_filename + + with open(image_path, "wb") as img_file: + img_file.write(image_bytes) + + extracted_images.append(image_path) + image_count += 1 + logger.debug("Extracted image %s from slide %d", image_filename, slide_num) + except Exception as exc: + logger.warning("Failed to extract image from shape: %s", exc) + + logger.info("Extracted %d images from PPTX %s", image_count, pptx_path.name) + return extracted_images + + except Exception as exc: + logger.exception("Failed to extract images from PPTX %s: %s", pptx_path, exc) + return extracted_images + + +def extract_images_from_file(file_path: Path, output_dir: Path) -> List[Path]: + """ + Extract images from a file based on its type. + Returns list of paths to extracted image files. 
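+
+    Dispatches on the file suffix: PDFs go through PyMuPDF, DOCX through document
+    relationships, and PPTX/PPT through slide shapes; other types return an empty list.
+    Example (hypothetical paths):
+        images = extract_images_from_file(Path("slides.pptx"), Path("/tmp/job-images"))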
+ """ + suffix = file_path.suffix.lower() + output_dir.mkdir(parents=True, exist_ok=True) + + if suffix == ".pdf": + return extract_images_from_pdf(file_path, output_dir) + elif suffix == ".docx": + return extract_images_from_docx(file_path, output_dir) + elif suffix in {".pptx", ".ppt"}: + return extract_images_from_pptx(file_path, output_dir) + else: + logger.debug("No image extraction needed for file type: %s", suffix) + return [] + diff --git a/services/multi-document-upload-service/src/multi_document_upload_service/jobs.py b/services/multi-document-upload-service/src/multi_document_upload_service/jobs.py new file mode 100644 index 0000000..cf3f948 --- /dev/null +++ b/services/multi-document-upload-service/src/multi_document_upload_service/jobs.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +import json +import threading +import uuid +from datetime import datetime, timedelta +from pathlib import Path +from typing import Dict, Optional + +from .models import JobRecord, JobStage + + +class JobStore: + """Simple persistent job store backed by a JSON file.""" + + def __init__(self, storage_root: Path): + self._storage_root = Path(storage_root) + self._jobs_dir = self._storage_root / "jobs" + self._jobs_dir.mkdir(parents=True, exist_ok=True) + self._index_path = self._jobs_dir / "index.json" + self._lock = threading.Lock() + self._jobs: Dict[str, JobRecord] = {} + self._load() + + def _load(self) -> None: + if self._index_path.exists(): + try: + data = json.loads(self._index_path.read_text()) + self._jobs = {job_id: JobRecord.model_validate(job_data) for job_id, job_data in data.items()} + except Exception as exc: # noqa: BLE001 + print(f"[JobStore] Failed to load job index: {exc}") + self._jobs = {} + + def _persist(self) -> None: + serializable = {job_id: job.model_dump(mode="json") for job_id, job in self._jobs.items()} + tmp_path = self._index_path.with_suffix(".json.tmp") + tmp_path.write_text(json.dumps(serializable, indent=2, default=str)) + tmp_path.replace(self._index_path) + + def create(self, name: Optional[str], total_files: int) -> JobRecord: + with self._lock: + job_id = uuid.uuid4().hex + job = JobRecord(id=job_id, name=name, total_files=total_files) + self._jobs[job_id] = job + self._persist() + return job + + def update(self, job_id: str, **kwargs) -> JobRecord: + with self._lock: + job = self._jobs[job_id] + for key, value in kwargs.items(): + setattr(job, key, value) + job.updated_at = datetime.utcnow() + self._jobs[job_id] = job + self._persist() + return job + + def get(self, job_id: str) -> JobRecord: + with self._lock: + return self._jobs[job_id] + + def exists(self, job_id: str) -> bool: + with self._lock: + return job_id in self._jobs + + def list_jobs(self) -> Dict[str, JobRecord]: + with self._lock: + return dict(self._jobs) + + def mark_error(self, job_id: str, message: str) -> JobRecord: + return self.update( + job_id, + stage=JobStage.FAILED, + status_message=message, + error=message, + ) + + def cleanup(self, older_than_days: int) -> int: + """Remove jobs older than the retention threshold.""" + cutoff = datetime.utcnow() - timedelta(days=older_than_days) + removed = 0 + with self._lock: + for job_id in list(self._jobs.keys()): + if self._jobs[job_id].created_at < cutoff: + removed += 1 + del self._jobs[job_id] + if removed: + self._persist() + return removed + + +__all__ = ["JobStore"] + diff --git a/services/multi-document-upload-service/src/multi_document_upload_service/main.py 
b/services/multi-document-upload-service/src/multi_document_upload_service/main.py new file mode 100644 index 0000000..5d8bd45 --- /dev/null +++ b/services/multi-document-upload-service/src/multi_document_upload_service/main.py @@ -0,0 +1,189 @@ +from __future__ import annotations + +import logging +from dataclasses import dataclass +from typing import List, Optional + +from fastapi import BackgroundTasks, Depends, FastAPI, File, Form, HTTPException, UploadFile +from fastapi.middleware.cors import CORSMiddleware + +from .claude_client import ClaudeCausalExtractor +from .config import Settings, get_settings +from .jobs import JobStore +from .models import CreateJobResponse, JobGraphSummary, JobStage, JobStatusResponse +from .processors.graph_writer import GraphWriter +from .storage import StorageManager +from .workflows.pipeline import JobPipeline + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + +app = FastAPI( + title="Multi Document Upload Service", + version="0.1.0", + description="Processes multi-format documents to build causal knowledge graphs using Claude.", +) + + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +@dataclass +class ServiceContainer: + settings: Settings + storage: StorageManager + job_store: JobStore + graph_writer: GraphWriter + claude_extractor: ClaudeCausalExtractor + pipeline: JobPipeline + + +_container: ServiceContainer | None = None + + +def get_container() -> ServiceContainer: + global _container + if _container is None: + settings = get_settings() + if not settings.anthropic_api_key: + raise HTTPException(status_code=500, detail="ANTHROPIC_API_KEY is not configured") + + storage = StorageManager(settings.storage_root) + job_store = JobStore(settings.storage_root) + graph_writer = GraphWriter(settings.neo4j_uri, settings.neo4j_user, settings.neo4j_password) + claude_extractor = ClaudeCausalExtractor( + api_key=settings.anthropic_api_key, + model=settings.claude_model, + max_output_tokens=min(settings.claude_max_output_tokens, 4000), + ) + pipeline = JobPipeline( + job_store=job_store, + storage=storage, + graph_writer=graph_writer, + claude_extractor=claude_extractor, + ) + _container = ServiceContainer( + settings=settings, + storage=storage, + job_store=job_store, + graph_writer=graph_writer, + claude_extractor=claude_extractor, + pipeline=pipeline, + ) + return _container + + +def get_dependencies() -> ServiceContainer: + return get_container() + + +@app.post("/jobs", response_model=CreateJobResponse, status_code=202) +async def create_job( + background_tasks: BackgroundTasks, + files: List[UploadFile] = File(...), + job_name: Optional[str] = Form(default=None), + container: ServiceContainer = Depends(get_dependencies), +) -> CreateJobResponse: + settings = container.settings + storage = container.storage + job_store = container.job_store + pipeline = container.pipeline + + if not files: + raise HTTPException(status_code=400, detail="At least one file must be uploaded.") + if len(files) > settings.max_files_per_job: + raise HTTPException(status_code=400, detail="Too many files uploaded for a single job.") + + total_size_bytes = 0 + for file in files: + file.file.seek(0, 2) + total_size_bytes += file.file.tell() + file.file.seek(0) + if total_size_bytes > settings.max_upload_size_mb * 1024 * 1024: + raise HTTPException(status_code=400, detail="Uploaded files exceed maximum allowed size.") + + job = job_store.create(job_name, 
total_files=len(files)) + job.stage = JobStage.SAVING_FILES + + saved_paths: List[str] = [] + for upload in files: + file_record = storage.save_upload(job.id, upload) + saved_paths.append(file_record.stored_path) + job.files.append(file_record) + + job_store.update( + job.id, + stage=JobStage.EXTRACTING, + status_message="Files saved; extraction queued.", + files=job.files, + ) + + background_tasks.add_task(pipeline.process_job, job.id, saved_paths) + + return CreateJobResponse( + job_id=job.id, + stage=job.stage, + total_files=job.total_files, + created_at=job.created_at, + ) + + +@app.get("/jobs/{job_id}", response_model=JobStatusResponse) +async def get_job_status(job_id: str, container: ServiceContainer = Depends(get_dependencies)) -> JobStatusResponse: + job_store = container.job_store + if not job_store.exists(job_id): + raise HTTPException(status_code=404, detail="Job not found") + job = job_store.get(job_id) + return JobStatusResponse( + job_id=job.id, + stage=job.stage, + status_message=job.status_message, + total_files=job.total_files, + processed_files=job.processed_files, + error=job.error, + created_at=job.created_at, + updated_at=job.updated_at, + files=job.files, + ) + + +@app.get("/jobs/{job_id}/graph", response_model=JobGraphSummary) +async def get_job_graph(job_id: str, container: ServiceContainer = Depends(get_dependencies)) -> JobGraphSummary: + job_store = container.job_store + if not job_store.exists(job_id): + raise HTTPException(status_code=404, detail="Job not found") + job = job_store.get(job_id) + if job.stage != JobStage.COMPLETED: + raise HTTPException(status_code=409, detail="Job not completed yet") + return JobGraphSummary( + job_id=job.id, + relations=job.relations, + node_count=len({rel.cause for rel in job.relations} | {rel.effect for rel in job.relations}), + edge_count=len(job.relations), + generated_at=job.updated_at, + ) + + +@app.get("/health") +async def healthcheck(container: ServiceContainer = Depends(get_dependencies)): + settings = container.settings + return { + "status": "ok", + "claude_model": settings.claude_model, + "max_input_tokens_per_min": settings.claude_max_input_tokens, + "max_output_tokens_per_min": settings.claude_max_output_tokens, + } + + +@app.on_event("shutdown") +async def shutdown_event() -> None: + container = _container + if container: + container.graph_writer.close() + diff --git a/services/multi-document-upload-service/src/multi_document_upload_service/models.py b/services/multi-document-upload-service/src/multi_document_upload_service/models.py new file mode 100644 index 0000000..e55e9b1 --- /dev/null +++ b/services/multi-document-upload-service/src/multi_document_upload_service/models.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +from datetime import datetime +from enum import Enum +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + + +class JobStage(str, Enum): + RECEIVED = "received" + SAVING_FILES = "saving_files" + EXTRACTING = "extracting" + ANALYZING = "analyzing" + BUILDING_GRAPH = "building_graph" + COMPLETED = "completed" + FAILED = "failed" + + +class FileRecord(BaseModel): + id: str + filename: str + content_type: str | None = None + size_bytes: int + stored_path: str + extracted_path: str | None = None + error: str | None = None + + +class CausalRelation(BaseModel): + cause: str + effect: str + confidence: float = Field(default=0.0, ge=0.0, le=1.0) + explanation: Optional[str] = None + source_file_id: Optional[str] = None + source_snippet: Optional[str] = None 
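+    # Free-form extras; the extractor currently records the Claude model used and,
+    # for image-derived relations, a content_type/image_path hint.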
+ metadata: Dict[str, Any] = Field(default_factory=dict) + + +class JobRecord(BaseModel): + id: str + name: str | None = None + stage: JobStage = JobStage.RECEIVED + status_message: str | None = None + files: List[FileRecord] = Field(default_factory=list) + total_files: int = 0 + processed_files: int = 0 + relations: List[CausalRelation] = Field(default_factory=list) + created_at: datetime = Field(default_factory=datetime.utcnow) + updated_at: datetime = Field(default_factory=datetime.utcnow) + error: str | None = None + metadata: Dict[str, Any] = Field(default_factory=dict) + + @property + def is_finished(self) -> bool: + return self.stage in {JobStage.COMPLETED, JobStage.FAILED} + + +class CreateJobResponse(BaseModel): + job_id: str + stage: JobStage + total_files: int + created_at: datetime + + +class JobStatusResponse(BaseModel): + job_id: str + stage: JobStage + status_message: str | None = None + total_files: int + processed_files: int + error: str | None = None + created_at: datetime + updated_at: datetime + files: List[FileRecord] + + +class JobGraphSummary(BaseModel): + job_id: str + relations: List[CausalRelation] + node_count: int + edge_count: int + generated_at: datetime + diff --git a/services/multi-document-upload-service/src/multi_document_upload_service/processors/chunker.py b/services/multi-document-upload-service/src/multi_document_upload_service/processors/chunker.py new file mode 100644 index 0000000..89f914e --- /dev/null +++ b/services/multi-document-upload-service/src/multi_document_upload_service/processors/chunker.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +from typing import Iterable, List + +import tiktoken + + +class TextChunker: + def __init__(self, model_name: str, token_target: int = 800, overlap: int = 200): + self.encoder = tiktoken.encoding_for_model("gpt-4o") if "claude" not in model_name else tiktoken.get_encoding("cl100k_base") + self.token_target = token_target + self.overlap = overlap + + def chunk(self, text: str) -> Iterable[str]: + tokens = self.encoder.encode(text) + step = max(self.token_target - self.overlap, 1) + chunks: List[str] = [] + for start in range(0, len(tokens), step): + end = min(start + self.token_target, len(tokens)) + chunk_tokens = tokens[start:end] + chunk_text = self.encoder.decode(chunk_tokens) + chunks.append(chunk_text) + return chunks + diff --git a/services/multi-document-upload-service/src/multi_document_upload_service/processors/graph_writer.py b/services/multi-document-upload-service/src/multi_document_upload_service/processors/graph_writer.py new file mode 100644 index 0000000..aadd5bc --- /dev/null +++ b/services/multi-document-upload-service/src/multi_document_upload_service/processors/graph_writer.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +import logging +from typing import Iterable + +from neo4j import GraphDatabase, Transaction + +from ..models import CausalRelation + +logger = logging.getLogger(__name__) + + +MERGE_QUERY = """ +MERGE (cause:Concept {name: $cause}) +ON CREATE SET cause.created_at = timestamp(), cause.lastSeen = timestamp() +ON MATCH SET cause.lastSeen = timestamp() +MERGE (effect:Concept {name: $effect}) +ON CREATE SET effect.created_at = timestamp(), effect.lastSeen = timestamp() +ON MATCH SET effect.lastSeen = timestamp() +MERGE (cause)-[r:CAUSES]->(effect) +ON CREATE SET r.confidence = $confidence, + r.explanation = $explanation, + r.source_file_id = $source_file_id, + r.source_snippet = $source_snippet, + r.job_id = $job_id, + r.model = $model, + r.created_at = 
timestamp(), + r.updated_at = timestamp() +ON MATCH SET r.confidence = $confidence, + r.explanation = $explanation, + r.source_file_id = $source_file_id, + r.source_snippet = $source_snippet, + r.job_id = $job_id, + r.model = $model, + r.updated_at = timestamp() +""" + + +class GraphWriter: + def __init__(self, uri: str, user: str, password: str): + self._driver = GraphDatabase.driver(uri, auth=(user, password)) + + def close(self) -> None: + self._driver.close() + + def write_relations(self, job_id: str, relations: Iterable[CausalRelation]) -> None: + relations_list = list(relations) + if not relations_list: + logger.warning("No relations to write for job %s", job_id) + return + + logger.info("Writing %d relations to Neo4j for job %s", len(relations_list), job_id) + + with self._driver.session() as session: + def _write(tx: Transaction) -> None: + count = 0 + for relation in relations_list: + if not relation.cause or not relation.effect: + logger.warning("Skipping relation with empty cause or effect: %s -> %s", relation.cause, relation.effect) + continue + try: + result = tx.run( + MERGE_QUERY, + cause=relation.cause.strip(), + effect=relation.effect.strip(), + confidence=float(relation.confidence) if relation.confidence else 0.0, + explanation=relation.explanation or "", + source_file_id=relation.source_file_id or "", + source_snippet=relation.source_snippet or "", + job_id=job_id, + model=relation.metadata.get("model") or "", + ) + count += 1 + logger.debug("Wrote relation: %s -> %s (confidence: %s)", relation.cause, relation.effect, relation.confidence) + except Exception as exc: + logger.exception("Failed to write relation %s -> %s: %s", relation.cause, relation.effect, exc) + logger.info("Successfully wrote %d/%d relations to Neo4j", count, len(relations_list)) + + session.execute_write(_write) + logger.info("Persisted causal relations for job %s", job_id) + diff --git a/services/multi-document-upload-service/src/multi_document_upload_service/storage.py b/services/multi-document-upload-service/src/multi_document_upload_service/storage.py new file mode 100644 index 0000000..4f2f700 --- /dev/null +++ b/services/multi-document-upload-service/src/multi_document_upload_service/storage.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +import shutil +from pathlib import Path +from typing import Iterable, Tuple + +from fastapi import UploadFile + +from .models import FileRecord + + +class StorageManager: + def __init__(self, root: Path): + self.root = Path(root) + self.upload_dir = self.root / "uploads" + self.extract_dir = self.root / "extracted" + self.images_dir = self.root / "images" + self.upload_dir.mkdir(parents=True, exist_ok=True) + self.extract_dir.mkdir(parents=True, exist_ok=True) + self.images_dir.mkdir(parents=True, exist_ok=True) + + def save_upload(self, job_id: str, upload: UploadFile) -> FileRecord: + job_dir = self.upload_dir / job_id + job_dir.mkdir(parents=True, exist_ok=True) + + destination = job_dir / upload.filename + upload.file.seek(0) + with destination.open("wb") as out_file: + shutil.copyfileobj(upload.file, out_file) + + size_bytes = destination.stat().st_size + return FileRecord( + id=destination.stem, + filename=upload.filename, + content_type=upload.content_type, + size_bytes=size_bytes, + stored_path=str(destination), + ) + + def stage_extracted_content(self, job_id: str, file_name: str, content: str) -> Path: + job_dir = self.extract_dir / job_id + job_dir.mkdir(parents=True, exist_ok=True) + safe_name = f"{Path(file_name).stem}.txt" + destination 
= job_dir / safe_name + destination.write_text(content, encoding="utf-8") + return destination + + def list_saved_files(self, job_id: str) -> Iterable[Tuple[str, Path]]: + job_dir = self.upload_dir / job_id + if not job_dir.exists(): + return [] + return [(file.name, file) for file in job_dir.iterdir() if file.is_file()] + + def get_images_dir(self, job_id: str) -> Path: + """Get or create directory for extracted images.""" + images_dir = self.root / "images" / job_id + images_dir.mkdir(parents=True, exist_ok=True) + return images_dir + diff --git a/services/multi-document-upload-service/src/multi_document_upload_service/workflows/pipeline.py b/services/multi-document-upload-service/src/multi_document_upload_service/workflows/pipeline.py new file mode 100644 index 0000000..9987f03 --- /dev/null +++ b/services/multi-document-upload-service/src/multi_document_upload_service/workflows/pipeline.py @@ -0,0 +1,164 @@ +from __future__ import annotations + +import logging +from pathlib import Path +from typing import Iterable, List + +from ..claude_client import ClaudeCausalExtractor +from ..config import get_settings +from ..extractors.auto import extract_text +from ..extractors.image_extractor import extract_images_from_file +from ..jobs import JobStore +from ..models import CausalRelation, JobStage +from ..processors.chunker import TextChunker +from ..processors.graph_writer import GraphWriter +from ..storage import StorageManager + +logger = logging.getLogger(__name__) + + +class JobPipeline: + def __init__( + self, + job_store: JobStore, + storage: StorageManager, + graph_writer: GraphWriter, + claude_extractor: ClaudeCausalExtractor, + ): + self.job_store = job_store + self.storage = storage + self.graph_writer = graph_writer + self.claude_extractor = claude_extractor + settings = get_settings() + self.chunker = TextChunker( + model_name=settings.claude_model, + token_target=settings.chunk_token_target, + overlap=settings.chunk_token_overlap, + ) + + def process_job(self, job_id: str, saved_files: Iterable[str]) -> None: + job = self.job_store.get(job_id) + logger.info("Processing job %s with %d files", job_id, job.total_files) + + relations: List[CausalRelation] = [] + + try: + self.job_store.update(job_id, stage=JobStage.EXTRACTING, status_message="Extracting content") + for count, file_path in enumerate(saved_files, start=1): + file_path_obj = Path(file_path) + file_record = next((f for f in job.files if f.stored_path == file_path), None) + logger.info("Processing %s", file_path_obj.name) + source_file_id = file_record.id if file_record else file_path_obj.name + suffix = file_path_obj.suffix.lower() + + # Check if this is a direct image upload + is_direct_image = suffix in {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".webp"} + + try: + # Extract text from document (if not a direct image) + text = "" + if not is_direct_image: + try: + text = extract_text(file_path_obj) + + # Process text if available + if text and text.strip(): + # Validate text is readable + printable_chars = sum(1 for c in text if c.isprintable() or c.isspace()) + total_chars = len(text) + if total_chars > 100 and printable_chars / total_chars < 0.3: + logger.warning("Text from %s appears to be binary, skipping text processing", file_path_obj.name) + text = "" + else: + extracted_path = self.storage.stage_extracted_content(job_id, file_path_obj.name, text) + if file_record: + file_record.extracted_path = str(extracted_path) + logger.info("Successfully extracted %d characters from %s", len(text), 
+                        except Exception as text_exc:
+                            logger.warning("Text extraction failed for %s: %s. Will continue with image extraction if available.", file_path_obj.name, text_exc)
+                            text = ""
+
+                    # Extract images from documents (PDF, DOCX, PPTX)
+                    extracted_images: List[Path] = []
+                    if suffix in {".pdf", ".docx", ".pptx", ".ppt"}:
+                        try:
+                            images_dir = self.storage.get_images_dir(job_id)
+                            extracted_images = extract_images_from_file(file_path_obj, images_dir)
+                            logger.info("Extracted %d images from %s", len(extracted_images), file_path_obj.name)
+                        except Exception as img_exc:
+                            logger.warning("Failed to extract images from %s: %s", file_path_obj.name, img_exc)
+
+                    # For direct image uploads, add the file itself to the images list
+                    if is_direct_image:
+                        extracted_images = [file_path_obj]
+                        logger.info("Direct image upload detected: %s", file_path_obj.name)
+
+                except Exception as exc:  # noqa: BLE001
+                    logger.exception("Extraction failed for %s", file_path_obj)
+                    if file_record:
+                        file_record.error = str(exc)
+                    continue
+
+                self.job_store.update(
+                    job_id,
+                    files=job.files,
+                    processed_files=count,
+                    status_message=f"Analyzing causal relations ({count}/{job.total_files})",
+                    stage=JobStage.ANALYZING,
+                )
+
+                # Process text content
+                if text and text.strip():
+                    chunks = self.chunker.chunk(text)
+                    text_relations = self.claude_extractor.analyze(chunks, source_file_id=source_file_id)
+                    relations.extend(text_relations)
+                    logger.info("Extracted %d relations from text in %s", len(text_relations), file_path_obj.name)
+
+                # Process images (extracted from documents or direct uploads)
+                if extracted_images:
+                    for image_path in extracted_images:
+                        try:
+                            image_relations = self.claude_extractor.analyze_image(image_path, source_file_id=source_file_id)
+                            relations.extend(image_relations)
+                            logger.info("Extracted %d relations from image %s", len(image_relations), image_path.name)
+                        except Exception as img_exc:
+                            logger.warning("Failed to analyze image %s: %s", image_path, img_exc)
+                            # Continue with other images
+                elif not text or not text.strip():
+                    # No text and no images - file might be empty or unsupported
+                    logger.warning("File %s has no extractable text or images", file_path_obj.name)
+                    if file_record:
+                        file_record.error = "No extractable content found (no text or images)"
+
+            # Write relations to Neo4j if any were found
+            if relations:
+                self.job_store.update(job_id, status_message="Writing to knowledge graph", stage=JobStage.BUILDING_GRAPH)
+                try:
+                    self.graph_writer.write_relations(job_id, relations)
+                    logger.info("Wrote %d relations to Neo4j for job %s", len(relations), job_id)
+                    status_message = f"Completed with {len(relations)} causal relationship(s) written to Neo4j"
+                except Exception as graph_exc:
+                    logger.exception("Failed to write relations to Neo4j for job %s: %s", job_id, graph_exc)
+                    status_message = f"Completed with {len(relations)} relations extracted, but failed to write to Neo4j: {graph_exc}"
+            else:
+                logger.warning("Job %s completed with 0 relations - no causal relationships found", job_id)
+                # Check if any files failed to extract
+                failed_files = [f for f in job.files if f.error]
+                if failed_files:
+                    status_message = f"Completed but {len(failed_files)} file(s) failed to extract. No relations found."
+                else:
+                    status_message = "Completed but no causal relationships were found in the documents."
+
+            # Final update
+            self.job_store.update(
+                job_id,
+                stage=JobStage.COMPLETED,
+                status_message=status_message,
+                relations=relations,
+                processed_files=job.total_files,
+            )
+            logger.info("Job %s completed with %d relations", job_id, len(relations))
+        except Exception as exc:  # noqa: BLE001
+            logger.exception("Job %s failed: %s", job_id, exc)
+            self.job_store.mark_error(job_id, f"Pipeline failed: {exc}")
+
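
Verification sketch (not part of the patch): once a job has run, the relations persisted by GraphWriter can be spot-checked from Python. This assumes the Neo4j bolt port is reachable on localhost:7687 with the neo4j/password credentials the service is configured with; the node labels and relationship type live in the MERGE_QUERY defined earlier in graph_writer.py and are not visible in this hunk, so the query filters only on the r.job_id property that write_relations sets. The job id value is a hypothetical placeholder.

    from neo4j import GraphDatabase

    JOB_ID = "replace-with-a-real-job-id"  # hypothetical placeholder

    # Connect with the same credentials the service uses (assumption: bolt port published locally)
    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
    with driver.session() as session:
        result = session.run(
            # Match any relationship tagged with this job's id and summarize it
            "MATCH ()-[r]->() WHERE r.job_id = $job_id "
            "RETURN count(r) AS relations, avg(r.confidence) AS avg_confidence",
            job_id=JOB_ID,
        )
        record = result.single()
        print(f"relations: {record['relations']}, avg confidence: {record['avg_confidence']}")
    driver.close()

A non-zero count confirms the BUILDING_GRAPH stage wrote to the graph; a zero count with a COMPLETED job status points at extraction or analysis rather than Neo4j connectivity.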