# codenuk_backend_mine/services/ai-analysis-service/knowledge_graph/operations.py
"""
High-level knowledge graph operations used by the AI Analysis Service.
These helpers translate existing analysis objects into the node/relationship
structure expected by `Neo4jGraphClient`.
"""

from __future__ import annotations

import json
import uuid
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple

from .neo4j_client import Neo4jGraphClient


def _safe_json(value: Any) -> str:
    """Best-effort conversion of an arbitrary value into a string property."""
    if value is None:
        return ""
    if isinstance(value, (str, int, float, bool)):
        return str(value)
    try:
        return json.dumps(value, default=str)
    except Exception:
        return str(value)
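
# A hedged sketch of what _safe_json produces (values chosen for illustration):
#   _safe_json(None)              -> ""
#   _safe_json(42)                -> "42"
#   _safe_json({"cwe": "CWE-89"}) -> '{"cwe": "CWE-89"}'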


def _normalize_issue(issue: Any, index: int) -> Tuple[str, Dict[str, Any]]:
    """
    Convert an issue structure that might be a string or dict into a dict.
    Returns (summary, props).
    """
    if isinstance(issue, dict):
        summary = issue.get("title") or issue.get("issue") or issue.get("description") or f"Issue #{index}"
        props = {
            "summary": summary,
            "severity": issue.get("severity", "medium"),
            "category": issue.get("category", "general"),
            "description": issue.get("description") or issue.get("details") or "",
            "recommendation": issue.get("recommendation") or issue.get("action") or "",
            "evidence": _safe_json(issue.get("evidence")),
        }
        if issue.get("impact"):
            props["impact"] = issue["impact"]
        if issue.get("line_number"):
            props["line_number"] = issue["line_number"]
        return summary, props
    summary = str(issue)
    return summary, {
        "summary": summary,
        "severity": "medium",
        "category": "general",
    }
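
# A hedged sketch of both input shapes _normalize_issue accepts (values are
# illustrative, not taken from a real analysis):
#
#   _normalize_issue({"title": "Unvalidated input", "severity": "high"}, 0)
#   -> ("Unvalidated input", {"summary": "Unvalidated input", "severity": "high",
#       "category": "general", "description": "", "recommendation": "", "evidence": ""})
#
#   _normalize_issue("Missing tests", 1)
#   -> ("Missing tests", {"summary": "Missing tests", "severity": "medium", "category": "general"})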


def build_module_payload(
    run_id: str,
    repository_id: str,
    module_name: str,
    chunk: Dict[str, Any],
    chunk_analysis: Dict[str, Any],
    file_analyses: Sequence[Any],
    metadata: Dict[str, Any],
    ai_response: str,
) -> Dict[str, Any]:
    """Prepare a module-level payload for graph insertion."""
    module_id = chunk.get("id") or str(uuid.uuid4())
    module_quality = chunk_analysis.get("module_quality_score")
    module_overview = chunk_analysis.get("module_overview", "")
    module_architecture = chunk_analysis.get("module_architecture", "")
    module_security = chunk_analysis.get("module_security_assessment", "")
    module_recommendations = chunk_analysis.get("module_recommendations", [])

    files: List[Dict[str, Any]] = []
    findings: List[Dict[str, Any]] = []
    total_issues = 0
    total_recommendations = 0

    for fa_index, fa in enumerate(file_analyses):
        path = getattr(fa, "path", None) or getattr(fa, "file_path", "unknown")
        issues = getattr(fa, "issues_found", None) or []
        recommendations = getattr(fa, "recommendations", None) or []
        total_issues += len(issues) if isinstance(issues, (list, tuple)) else 0
        total_recommendations += len(recommendations) if isinstance(recommendations, (list, tuple)) else 0

        files.append(
            {
                "path": str(path),
                "props": {
                    "language": getattr(fa, "language", "unknown"),
                    "lines_of_code": getattr(fa, "lines_of_code", 0),
                    "complexity_score": getattr(fa, "complexity_score", 0),
                    "severity_score": getattr(fa, "severity_score", 0),
                },
            }
        )

        # Normalize each issue into a finding entry keyed by module, file index, and issue index.
        if isinstance(issues, Iterable):
            for issue_index, raw_issue in enumerate(issues):
                summary, issue_props = _normalize_issue(raw_issue, issue_index)
                finding_id = f"{module_id}:{fa_index}:{issue_index}"
                issue_props.update(
                    {
                        "module": module_name,
                        "file_path": str(path),
                        "created_at": datetime.utcnow().isoformat(),
                    }
                )
                findings.append(
                    {
                        "id": finding_id,
                        "props": issue_props,
                        "file_path": str(path),
                    }
                )

    module_props: Dict[str, Any] = {
        "name": module_name,
        "module_id": module_id,
        "quality_score": module_quality,
        "overview": module_overview,
        "architecture": module_architecture,
        "security": module_security,
        "recommendations": module_recommendations,
        "analysis_payload": metadata,
        "ai_response": ai_response,
        "repository_id": repository_id,
        "total_files": len(file_analyses),
        "total_issues": total_issues,
        "total_recommendations": total_recommendations,
        "updated_at": datetime.utcnow().isoformat(),
    }

    # Record chunk-level dependencies declared in the metadata.
    dependencies = []
    for dependency in metadata.get("dependencies", {}).get("depends_on_chunks", []):
        dependencies.append(
            {
                "target": dependency,
                "kind": "depends_on",
                "metadata": {"source": module_name},
            }
        )

    return {
        "module_props": module_props,
        "files": files,
        "findings": findings,
        "dependencies": dependencies,
    }


async def store_module_analysis(
    client: Neo4jGraphClient,
    run_id: str,
    repository_id: str,
    module_payload: Dict[str, Any],
) -> None:
    """Persist a payload produced by ``build_module_payload`` into the graph."""
    await client.upsert_module_graph(
        run_id=run_id,
        repository_id=repository_id,
        module_props=module_payload["module_props"],
        files=module_payload["files"],
        findings=module_payload["findings"],
        dependencies=module_payload["dependencies"],
    )
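
# A hedged usage sketch (illustrative only; the surrounding analysis pipeline
# supplies chunk, chunk_analysis, file_analyses, metadata, and ai_response):
#
#   payload = build_module_payload(
#       run_id=run_id,
#       repository_id=repository_id,
#       module_name=module_name,
#       chunk=chunk,
#       chunk_analysis=chunk_analysis,
#       file_analyses=file_analyses,
#       metadata=metadata,
#       ai_response=ai_response,
#   )
#   await store_module_analysis(client, run_id, repository_id, payload)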


async def store_analysis_state(client: Neo4jGraphClient, run_id: str, analysis_state: Dict[str, Any]) -> None:
    await client.upsert_run_state(run_id=run_id, state=analysis_state)


async def store_synthesis(client: Neo4jGraphClient, run_id: str, synthesis: Dict[str, Any]) -> None:
    await client.upsert_synthesis(run_id=run_id, synthesis=synthesis)


async def fetch_module_analyses(client: Neo4jGraphClient, run_id: str) -> List[Dict[str, Any]]:
    """Rebuild module analysis dicts for a run from the stored graph."""
    modules = await client.fetch_modules(run_id)
    module_analyses: List[Dict[str, Any]] = []
    for entry in modules:
        node = entry.get("module", {})
        files = entry.get("files", [])
        findings = entry.get("findings", [])
        analysis_payload = node.get("analysis_payload")
        if isinstance(analysis_payload, str):
            try:
                analysis_payload = json.loads(analysis_payload)
            except json.JSONDecodeError:
                analysis_payload = {"raw": analysis_payload}
        module_analyses.append(
            {
                "module_name": node.get("name"),
                "module_id": node.get("module_id"),
                "quality_score": node.get("quality_score"),
                "module_overview": node.get("overview"),
                "module_architecture": node.get("architecture"),
                "module_security_assessment": node.get("security"),
                "module_recommendations": node.get("recommendations"),
                "files_analyzed": [file_entry.get("path") for file_entry in files if file_entry.get("path")],
                "raw_payload": analysis_payload,
                "findings": findings,
            }
        )
    return module_analyses


async def fetch_run_state(client: Neo4jGraphClient, run_id: str) -> Optional[Dict[str, Any]]:
    return await client.fetch_run_state(run_id)


async def fetch_synthesis(client: Neo4jGraphClient, run_id: str) -> Optional[Dict[str, Any]]:
    return await client.fetch_synthesis(run_id)
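
# A hedged end-to-end retrieval sketch (illustrative only; client construction
# and run_id handling live in the calling service):
#
#   client = ...  # however the service builds its Neo4jGraphClient
#   modules = await fetch_module_analyses(client, run_id)
#   state = await fetch_run_state(client, run_id)
#   synthesis = await fetch_synthesis(client, run_id)
#   for module in modules:
#       print(module["module_name"], len(module["findings"]))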