codenuk_backend_mine/services/ai-analysis-service/git-integration-client.py
2025-10-24 13:02:49 +05:30

260 lines
9.8 KiB
Python

#!/usr/bin/env python3
"""
Git Integration Client for AI Analysis Service
Handles communication with Git Integration service to get repository data
"""
import os
import requests
import json
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
import logging
# Module-level logger named after this module; every method of the client
# reports failures through it instead of raising to the caller.
logger = logging.getLogger(__name__)
@dataclass
class RepositoryInfo:
    """Snapshot of one repository as reported by the Git Integration service.

    Instances are plain value objects: the dataclass-generated ``__init__``
    takes the fields in the order declared below, and equality compares all
    of them.
    """

    # --- identity ---------------------------------------------------------
    repository_id: str
    repository_name: str
    owner_name: str

    # --- checkout ---------------------------------------------------------
    # Filesystem location of the repository contents on this host.
    local_path: str
    branch_name: str
    is_public: bool
    sync_status: str

    # --- codebase statistics ----------------------------------------------
    total_files: int
    # NOTE(review): unit is presumably bytes -- confirm against the service.
    total_size: int
    # Keyed by language name; the meaning of the integer value (file count
    # vs. size) is not visible from this file -- TODO confirm.
    languages: Dict[str, int]

    # Timestamp kept as the raw string handed over by the service.
    last_synced_at: str
class GitIntegrationClient:
    """Client for communicating with the Git Integration service.

    Repository metadata is fetched over HTTP from the service; file listings
    and file contents are then read directly from the repository's
    ``local_path`` on this host's filesystem.

    Every public method is best-effort: failures are logged and signalled
    through an "empty" return value (``None``, ``[]``, ``{}`` or ``False``)
    rather than an exception.
    """

    def __init__(self, base_url: Optional[str] = None):
        """Create a client bound to one Git Integration service instance.

        Args:
            base_url: Root URL of the service. When omitted (or empty), the
                ``GIT_INTEGRATION_SERVICE_URL`` environment variable is used,
                falling back to ``http://localhost:8012``.
        """
        resolved_url = base_url or os.getenv('GIT_INTEGRATION_SERVICE_URL', 'http://localhost:8012')
        # Endpoint URLs are built as f"{base_url}/api/...", so a trailing
        # slash here would otherwise yield "//api/...".
        self.base_url = resolved_url.rstrip('/')
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'User-Agent': 'AI-Analysis-Service/1.0'
        })

    @staticmethod
    def _resolve_repo_path(local_path: str, file_path: str) -> Optional[str]:
        """Resolve ``file_path`` inside ``local_path``, refusing escapes.

        Both paths are canonicalised with ``os.path.realpath`` so that ``..``
        segments, absolute paths and symlinks pointing outside the repository
        are all detected.

        Returns:
            The canonical absolute path when it lies inside the repository
            root, otherwise ``None``.
        """
        repo_root = os.path.realpath(local_path)
        candidate = os.path.realpath(os.path.join(repo_root, file_path))
        try:
            is_inside = os.path.commonpath([repo_root, candidate]) == repo_root
        except ValueError:
            # commonpath raises when the paths cannot be compared at all
            # (e.g. different drives on Windows) -- certainly not inside.
            return None
        return candidate if is_inside else None

    @staticmethod
    def _scan_files(local_path: str) -> List[Dict[str, Any]]:
        """Walk ``local_path`` and describe every non-hidden file beneath it.

        Hidden directories (name starting with ``.``) are pruned and hidden
        files are skipped. Directories and filenames are visited in sorted
        order so the result does not depend on filesystem enumeration order.

        Returns:
            One dict per file with the keys ``path`` (relative to
            ``local_path``), ``full_path``, ``size``, ``modified`` and
            ``is_file``.
        """
        files: List[Dict[str, Any]] = []
        for root, dirs, filenames in os.walk(local_path):
            # In-place assignment is what makes os.walk skip the pruned dirs.
            dirs[:] = sorted(d for d in dirs if not d.startswith('.'))
            for filename in sorted(filenames):
                if filename.startswith('.'):
                    continue
                file_path = os.path.join(root, filename)
                try:
                    stat = os.stat(file_path)
                except OSError:
                    # e.g. broken symlink or file removed mid-walk: skip it.
                    continue
                files.append({
                    'path': os.path.relpath(file_path, local_path),
                    'full_path': file_path,
                    'size': stat.st_size,
                    'modified': stat.st_mtime,
                    'is_file': os.path.isfile(file_path)
                })
        return files

    def get_repository_info(self, repository_id: str) -> Optional["RepositoryInfo"]:
        """Get repository information from the Git Integration service.

        Args:
            repository_id: Identifier of the repository on the service.

        Returns:
            A ``RepositoryInfo`` built from the service response, or ``None``
            when the request fails, the service reports no success, or the
            reported ``local_path`` does not exist on this host.
        """
        try:
            repo_url = f"{self.base_url}/api/github/repository/{repository_id}/ui-view"
            response = self.session.get(repo_url, timeout=30)
            if response.status_code != 200:
                logger.error(f"Failed to get repository info: {response.status_code}")
                return None

            repo_data = response.json()
            if not repo_data.get('success'):
                logger.error(f"Repository not found: {repo_data.get('message')}")
                return None

            # `or {}` (rather than a .get default) also covers keys that are
            # present but explicitly null in the JSON payload.
            data = repo_data.get('data') or {}
            storage_info = data.get('storage_info') or {}
            local_path = storage_info.get('local_path')
            if not local_path or not os.path.exists(local_path):
                logger.error(f"Repository local path not found: {local_path}")
                return None

            codebase_analysis = data.get('codebase_analysis') or {}
            return RepositoryInfo(
                repository_id=repository_id,
                repository_name=data.get('repository_name', ''),
                owner_name=data.get('owner_name', ''),
                local_path=local_path,
                branch_name=data.get('branch_name', 'main'),
                is_public=data.get('is_public', True),
                sync_status=data.get('sync_status', 'unknown'),
                total_files=codebase_analysis.get('total_files', 0),
                total_size=codebase_analysis.get('total_size', 0),
                languages=codebase_analysis.get('languages', {}),
                last_synced_at=data.get('last_synced_at', '')
            )
        except Exception as e:
            logger.error(f"Error getting repository info: {e}")
            return None

    def get_repository_files(self, repository_id: str) -> List[Dict[str, Any]]:
        """Get the list of non-hidden files in the repository checkout.

        Returns:
            File description dicts (see ``_scan_files``), or ``[]`` when the
            repository cannot be resolved or the scan fails.
        """
        try:
            repo_info = self.get_repository_info(repository_id)
            if not repo_info:
                return []
            return self._scan_files(repo_info.local_path)
        except Exception as e:
            logger.error(f"Error getting repository files: {e}")
            return []

    def get_file_content(self, repository_id: str, file_path: str) -> Optional[str]:
        """Get the text content of one file inside the repository.

        Args:
            repository_id: Identifier of the repository on the service.
            file_path: Path of the file relative to the repository root.

        Returns:
            The file decoded as UTF-8 (undecodable bytes are dropped), or
            ``None`` when the repository is unavailable, the path is not a
            regular file, the path points outside the repository, or reading
            fails.
        """
        try:
            repo_info = self.get_repository_info(repository_id)
            if not repo_info:
                return None

            # file_path comes from the caller: never let "..", an absolute
            # path or a symlink reach files outside the repository.
            full_path = self._resolve_repo_path(repo_info.local_path, file_path)
            if full_path is None:
                logger.warning(f"Refusing to read path outside repository: {file_path}")
                return None
            if not os.path.isfile(full_path):
                return None

            with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
                return f.read()
        except Exception as e:
            logger.error(f"Error reading file {file_path}: {e}")
            return None

    def sync_repository(self, repository_id: str) -> bool:
        """Trigger a repository sync on the Git Integration service.

        Returns:
            The ``success`` flag of the service response for an HTTP 200
            reply; ``False`` for any other status or on error.
        """
        try:
            sync_url = f"{self.base_url}/api/github/repository/{repository_id}/sync"
            response = self.session.post(sync_url, timeout=60)
            if response.status_code == 200:
                result = response.json()
                return result.get('success', False)
            return False
        except Exception as e:
            logger.error(f"Error syncing repository: {e}")
            return False

    def get_repository_metadata(self, repository_id: str) -> Dict[str, Any]:
        """Get repository info, codebase statistics and a file listing.

        Returns:
            A dict with the keys ``repository_info``, ``codebase_stats``
            (computed over *all* scanned files) and ``files`` (capped at the
            first 1000 entries), or ``{}`` when the repository cannot be
            resolved or an error occurs.
        """
        try:
            repo_info = self.get_repository_info(repository_id)
            if not repo_info:
                return {}

            # Scan the checkout directly: going through get_repository_files
            # would repeat the HTTP lookup that was just performed.
            files = self._scan_files(repo_info.local_path)
            return {
                'repository_info': {
                    'id': repo_info.repository_id,
                    'name': repo_info.repository_name,
                    'owner': repo_info.owner_name,
                    'local_path': repo_info.local_path,
                    'branch': repo_info.branch_name,
                    'is_public': repo_info.is_public,
                    'sync_status': repo_info.sync_status,
                    'last_synced': repo_info.last_synced_at
                },
                'codebase_stats': {
                    'total_files': len(files),
                    'total_size': sum(f.get('size', 0) for f in files),
                    'languages': repo_info.languages
                },
                'files': files[:1000]  # Limit to 1000 files for performance
            }
        except Exception as e:
            logger.error(f"Error getting repository metadata: {e}")
            return {}

    def get_all_repositories(self) -> List[Dict[str, Any]]:
        """Get all repositories known to the Git Integration service.

        Returns:
            The ``data.repositories`` list of the service response, or ``[]``
            when the request fails or the service reports no success.
        """
        try:
            repos_url = f"{self.base_url}/api/diffs/repositories"
            response = self.session.get(repos_url, timeout=30)
            if response.status_code != 200:
                logger.error(f"Failed to get repositories: {response.status_code}")
                return []

            repos_data = response.json()
            if not repos_data.get('success'):
                logger.error(f"Failed to fetch repositories: {repos_data.get('message')}")
                return []

            # Tolerate explicit nulls at either level of the payload.
            return (repos_data.get('data') or {}).get('repositories') or []
        except Exception as e:
            logger.error(f"Error getting all repositories: {e}")
            return []

    def get_repository_by_name(self, repository_name: str, owner_name: Optional[str] = None) -> Optional["RepositoryInfo"]:
        """Get a repository by name and, optionally, owner.

        Args:
            repository_name: Exact repository name to look for.
            owner_name: When given, the owner must match as well.

        Returns:
            The ``RepositoryInfo`` of the first matching repository that
            carries an id, or ``None`` when nothing matches or lookup fails.
        """
        try:
            for repo in self.get_all_repositories():
                if repo.get('repository_name') != repository_name:
                    continue
                if owner_name is not None and repo.get('owner_name') != owner_name:
                    continue
                repo_id = repo.get('id')
                if repo_id is None:
                    # Without an id there is no URL to query for details.
                    continue
                return self.get_repository_info(repo_id)
            return None
        except Exception as e:
            logger.error(f"Error getting repository by name: {e}")
            return None
# Example usage: list the service's repositories and inspect the first one.
if __name__ == "__main__":
    demo_client = GitIntegrationClient()

    # Start from the full repository listing.
    print("🔍 Fetching all repositories from Git Integration service...")
    known_repos = demo_client.get_all_repositories()

    if not known_repos:
        print("❌ No repositories found in Git Integration service")
    else:
        print(f"📁 Found {len(known_repos)} repositories:")
        for entry in known_repos:
            print(f" - {entry.get('repository_name')} by {entry.get('owner_name')} (ID: {entry.get('id')})")

        # Use the first listed repository for a detail lookup.
        sample = known_repos[0]
        sample_id = sample.get('id')
        print(f"\n🔍 Testing with repository: {sample.get('repository_name')}")

        details = demo_client.get_repository_info(sample_id)
        if not details:
            print("❌ Repository not found or not accessible")
        else:
            print(f"✅ Repository: {details.repository_name}")
            print(f"📁 Local path: {details.local_path}")
            print(f"📄 Files: {details.total_files}")
            print(f"🌐 Languages: {details.languages}")