""" High-level knowledge graph operations used by the AI Analysis Service. These helpers translate existing analysis objects into the node/relationship structure expected by `Neo4jGraphClient`. """ from __future__ import annotations import json import uuid from datetime import datetime from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple from .neo4j_client import Neo4jGraphClient def _safe_json(value: Any) -> str: if value is None: return "" if isinstance(value, (str, int, float, bool)): return str(value) try: return json.dumps(value, default=str) except Exception: return str(value) def _normalize_issue(issue: Any, index: int) -> Tuple[str, Dict[str, Any]]: """ Convert an issue structure that might be a string or dict into a dict. Returns (summary, props). """ if isinstance(issue, dict): summary = issue.get("title") or issue.get("issue") or issue.get("description") or f"Issue #{index}" props = { "summary": summary, "severity": issue.get("severity", "medium"), "category": issue.get("category", "general"), "description": issue.get("description") or issue.get("details") or "", "recommendation": issue.get("recommendation") or issue.get("action") or "", "evidence": _safe_json(issue.get("evidence")), } if issue.get("impact"): props["impact"] = issue["impact"] if issue.get("line_number"): props["line_number"] = issue["line_number"] return summary, props summary = str(issue) return summary, { "summary": summary, "severity": "medium", "category": "general", } def build_module_payload( run_id: str, repository_id: str, module_name: str, chunk: Dict[str, Any], chunk_analysis: Dict[str, Any], file_analyses: Sequence[Any], metadata: Dict[str, Any], ai_response: str, ) -> Dict[str, Any]: """Prepare module level payload for graph insertion.""" module_id = chunk.get("id") or str(uuid.uuid4()) module_quality = chunk_analysis.get("module_quality_score") module_overview = chunk_analysis.get("module_overview", "") module_architecture = chunk_analysis.get("module_architecture", "") module_security = chunk_analysis.get("module_security_assessment", "") module_recommendations = chunk_analysis.get("module_recommendations", []) files: List[Dict[str, Any]] = [] findings: List[Dict[str, Any]] = [] total_issues = 0 total_recommendations = 0 for fa_index, fa in enumerate(file_analyses): path = getattr(fa, "path", None) or getattr(fa, "file_path", "unknown") issues = getattr(fa, "issues_found", None) or [] recommendations = getattr(fa, "recommendations", None) or [] total_issues += len(issues) if isinstance(issues, (list, tuple)) else 0 total_recommendations += len(recommendations) if isinstance(recommendations, (list, tuple)) else 0 files.append( { "path": str(path), "props": { "language": getattr(fa, "language", "unknown"), "lines_of_code": getattr(fa, "lines_of_code", 0), "complexity_score": getattr(fa, "complexity_score", 0), "severity_score": getattr(fa, "severity_score", 0), }, } ) if isinstance(issues, Iterable): for issue_index, raw_issue in enumerate(issues): summary, issue_props = _normalize_issue(raw_issue, issue_index) finding_id = f"{module_id}:{fa_index}:{issue_index}" issue_props.update( { "module": module_name, "file_path": str(path), "created_at": datetime.utcnow().isoformat(), } ) findings.append( { "id": finding_id, "props": issue_props, "file_path": str(path), } ) module_props: Dict[str, Any] = { "name": module_name, "module_id": module_id, "quality_score": module_quality, "overview": module_overview, "architecture": module_architecture, "security": module_security, "recommendations": module_recommendations, "analysis_payload": metadata, "ai_response": ai_response, "repository_id": repository_id, "total_files": len(file_analyses), "total_issues": total_issues, "total_recommendations": total_recommendations, "updated_at": datetime.utcnow().isoformat(), } dependencies = [] for dependency in metadata.get("dependencies", {}).get("depends_on_chunks", []): dependencies.append( { "target": dependency, "kind": "depends_on", "metadata": {"source": module_name}, } ) return { "module_props": module_props, "files": files, "findings": findings, "dependencies": dependencies, } async def store_module_analysis( client: Neo4jGraphClient, run_id: str, repository_id: str, module_payload: Dict[str, Any], ) -> None: await client.upsert_module_graph( run_id=run_id, repository_id=repository_id, module_props=module_payload["module_props"], files=module_payload["files"], findings=module_payload["findings"], dependencies=module_payload["dependencies"], ) async def store_analysis_state(client: Neo4jGraphClient, run_id: str, analysis_state: Dict[str, Any]) -> None: await client.upsert_run_state(run_id=run_id, state=analysis_state) async def store_synthesis(client: Neo4jGraphClient, run_id: str, synthesis: Dict[str, Any]) -> None: await client.upsert_synthesis(run_id=run_id, synthesis=synthesis) async def fetch_module_analyses(client: Neo4jGraphClient, run_id: str) -> List[Dict[str, Any]]: modules = await client.fetch_modules(run_id) module_analyses: List[Dict[str, Any]] = [] for entry in modules: node = entry.get("module", {}) files = entry.get("files", []) findings = entry.get("findings", []) analysis_payload = node.get("analysis_payload") if isinstance(analysis_payload, str): try: analysis_payload = json.loads(analysis_payload) except json.JSONDecodeError: analysis_payload = {"raw": analysis_payload} module_analyses.append( { "module_name": node.get("name"), "module_id": node.get("module_id"), "quality_score": node.get("quality_score"), "module_overview": node.get("overview"), "module_architecture": node.get("architecture"), "module_security_assessment": node.get("security"), "module_recommendations": node.get("recommendations"), "files_analyzed": [file.get("path") for file in files if file.get("path")], "raw_payload": analysis_payload, "findings": findings, } ) return module_analyses async def fetch_run_state(client: Neo4jGraphClient, run_id: str) -> Optional[Dict[str, Any]]: return await client.fetch_run_state(run_id) async def fetch_synthesis(client: Neo4jGraphClient, run_id: str) -> Optional[Dict[str, Any]]: return await client.fetch_synthesis(run_id)