#!/usr/bin/env python3
"""
Git Integration Client for AI Analysis Service
Handles communication with Git Integration service to get repository data
"""

import json
import logging
import os
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import requests

logger = logging.getLogger(__name__)


@dataclass
class RepositoryInfo:
    """Repository information from Git Integration"""

    # Identifier the lookup was made with (echoed back, not read from the response).
    repository_id: str
    repository_name: str
    owner_name: str
    # Path of the checkout on the local filesystem; verified to exist at lookup time.
    local_path: str
    branch_name: str
    is_public: bool
    sync_status: str
    # Taken from the service's ``codebase_analysis`` section.
    total_files: int
    total_size: int  # presumably bytes -- TODO confirm against the service
    languages: Dict[str, int]  # presumably language name -> count/size; verify with the service
    last_synced_at: str


class GitIntegrationClient:
    """Client for communicating with Git Integration service.

    Every public method is best-effort: failures are logged and reported
    through an "empty" return value (``None``, ``[]``, ``{}`` or ``False``)
    instead of an exception.
    """

    def __init__(self, base_url: Optional[str] = None):
        """Create a client.

        Args:
            base_url: Root URL of the Git Integration service. When omitted,
                the ``GIT_INTEGRATION_SERVICE_URL`` environment variable is
                used, falling back to ``http://localhost:8012``.
        """
        resolved = base_url or os.getenv('GIT_INTEGRATION_SERVICE_URL', 'http://localhost:8012')
        # A trailing slash would otherwise produce "//api/..." in every URL.
        self.base_url = resolved.rstrip('/')
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'User-Agent': 'AI-Analysis-Service/1.0'
        })

    def close(self) -> None:
        """Release the underlying HTTP session and its pooled connections."""
        self.session.close()

    def __enter__(self) -> "GitIntegrationClient":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def get_repository_info(self, repository_id: str) -> Optional[RepositoryInfo]:
        """Get repository information from Git Integration service.

        Args:
            repository_id: Identifier of the repository in the service.

        Returns:
            A ``RepositoryInfo``, or ``None`` when the request fails, the
            service reports no success, or the reported local path does not
            exist on this machine.
        """
        try:
            repo_url = f"{self.base_url}/api/github/repository/{repository_id}/ui-view"
            response = self.session.get(repo_url, timeout=30)

            if response.status_code != 200:
                logger.error("Failed to get repository info: %s", response.status_code)
                return None

            repo_data = response.json()
            if not repo_data.get('success'):
                logger.error("Repository not found: %s", repo_data.get('message'))
                return None

            # ``or {}`` (rather than a .get default) also covers explicit JSON nulls.
            data = repo_data.get('data') or {}

            storage_info = data.get('storage_info') or {}
            local_path = storage_info.get('local_path')
            if not local_path or not os.path.exists(local_path):
                logger.error("Repository local path not found: %s", local_path)
                return None

            codebase_analysis = data.get('codebase_analysis') or {}

            return RepositoryInfo(
                repository_id=repository_id,
                repository_name=data.get('repository_name', ''),
                owner_name=data.get('owner_name', ''),
                local_path=local_path,
                branch_name=data.get('branch_name', 'main'),
                is_public=data.get('is_public', True),
                sync_status=data.get('sync_status', 'unknown'),
                total_files=codebase_analysis.get('total_files', 0),
                total_size=codebase_analysis.get('total_size', 0),
                languages=codebase_analysis.get('languages', {}),
                last_synced_at=data.get('last_synced_at', '')
            )
        except Exception as e:
            logger.error("Error getting repository info: %s", e)
            return None

    @staticmethod
    def _list_local_files(local_path: str) -> List[Dict[str, Any]]:
        """Walk ``local_path`` and describe every non-hidden file below it."""
        try:
            files = []
            for root, dirs, filenames in os.walk(local_path):
                # Prune in place so os.walk never descends into hidden directories.
                dirs[:] = [d for d in dirs if not d.startswith('.')]

                for filename in filenames:
                    if filename.startswith('.'):
                        continue

                    file_path = os.path.join(root, filename)
                    rel_path = os.path.relpath(file_path, local_path)
                    try:
                        stat = os.stat(file_path)
                    except OSError:
                        # Unreadable or vanished entries (e.g. broken symlinks) are skipped.
                        continue

                    files.append({
                        'path': rel_path,
                        'full_path': file_path,
                        'size': stat.st_size,
                        'modified': stat.st_mtime,
                        'is_file': os.path.isfile(file_path)
                    })
            return files
        except Exception as e:
            logger.error("Error getting repository files: %s", e)
            return []

    @staticmethod
    def _resolve_in_repo(local_path: str, file_path: str) -> Optional[str]:
        """Resolve ``file_path`` under ``local_path``; ``None`` if it escapes the repository."""
        repo_root = os.path.realpath(local_path)
        candidate = os.path.realpath(os.path.join(repo_root, file_path))
        try:
            inside = os.path.commonpath([repo_root, candidate]) == repo_root
        except ValueError:
            # commonpath raises when the paths cannot be compared (e.g. different drives).
            return None
        return candidate if inside else None

    def get_repository_files(self, repository_id: str) -> List[Dict[str, Any]]:
        """Get list of files in the repository.

        Hidden files and directories (names starting with ``.``) are skipped.

        Returns:
            One dict per file with keys ``path`` (relative to the repository
            root), ``full_path``, ``size``, ``modified`` and ``is_file``;
            an empty list when the repository cannot be resolved.
        """
        repo_info = self.get_repository_info(repository_id)
        if not repo_info:
            return []
        return self._list_local_files(repo_info.local_path)

    def get_file_content(self, repository_id: str, file_path: str) -> Optional[str]:
        """Get content of a specific file.

        Args:
            repository_id: Identifier of the repository in the service.
            file_path: Path of the file relative to the repository root.

        Returns:
            The file decoded as UTF-8 (undecodable bytes are dropped), or
            ``None`` when the repository or file is unavailable, the path is
            not a regular file, or the path points outside the repository.
        """
        try:
            repo_info = self.get_repository_info(repository_id)
            if not repo_info:
                return None

            # Reject "../" and absolute paths so callers cannot read arbitrary files.
            full_path = self._resolve_in_repo(repo_info.local_path, file_path)
            if full_path is None:
                logger.warning("Refusing to read path outside repository: %s", file_path)
                return None

            if not os.path.isfile(full_path):
                return None

            with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
                return f.read()
        except Exception as e:
            logger.error("Error reading file %s: %s", file_path, e)
            return None

    def sync_repository(self, repository_id: str) -> bool:
        """Trigger repository sync.

        Returns:
            ``True`` only when the service answers HTTP 200 with a truthy
            ``success`` field; ``False`` otherwise, including on errors.
        """
        try:
            sync_url = f"{self.base_url}/api/github/repository/{repository_id}/sync"
            response = self.session.post(sync_url, timeout=60)

            if response.status_code == 200:
                result = response.json()
                return bool(result.get('success', False))
            return False
        except Exception as e:
            logger.error("Error syncing repository: %s", e)
            return False

    def get_repository_metadata(self, repository_id: str) -> Dict[str, Any]:
        """Get comprehensive repository metadata.

        Returns:
            A dict with ``repository_info``, ``codebase_stats`` (computed from
            the files found on disk) and ``files`` (at most 1000 entries);
            an empty dict when the repository cannot be resolved.
        """
        try:
            repo_info = self.get_repository_info(repository_id)
            if not repo_info:
                return {}

            # Reuse the info already fetched instead of a second service round-trip.
            files = self._list_local_files(repo_info.local_path)

            return {
                'repository_info': {
                    'id': repo_info.repository_id,
                    'name': repo_info.repository_name,
                    'owner': repo_info.owner_name,
                    'local_path': repo_info.local_path,
                    'branch': repo_info.branch_name,
                    'is_public': repo_info.is_public,
                    'sync_status': repo_info.sync_status,
                    'last_synced': repo_info.last_synced_at
                },
                'codebase_stats': {
                    'total_files': len(files),
                    'total_size': sum(f.get('size', 0) for f in files),
                    'languages': repo_info.languages
                },
                'files': files[:1000]  # Limit to 1000 files for performance
            }
        except Exception as e:
            logger.error("Error getting repository metadata: %s", e)
            return {}

    def get_all_repositories(self) -> List[Dict[str, Any]]:
        """Get all repositories from Git Integration service.

        Returns:
            The ``data.repositories`` list of the service response, or an
            empty list on any failure.
        """
        try:
            repos_url = f"{self.base_url}/api/diffs/repositories"
            response = self.session.get(repos_url, timeout=30)

            if response.status_code != 200:
                logger.error("Failed to get repositories: %s", response.status_code)
                return []

            repos_data = response.json()
            if not repos_data.get('success'):
                logger.error("Failed to fetch repositories: %s", repos_data.get('message'))
                return []

            return (repos_data.get('data') or {}).get('repositories') or []
        except Exception as e:
            logger.error("Error getting all repositories: %s", e)
            return []

    def get_repository_by_name(self, repository_name: str, owner_name: str = None) -> Optional[RepositoryInfo]:
        """Get repository by name and optional owner.

        Args:
            repository_name: Exact repository name to look for.
            owner_name: When given, the owner must match as well.

        Returns:
            Info for the first matching repository that has an ``id``, or
            ``None`` when nothing matches or the lookup fails.
        """
        try:
            for repo in self.get_all_repositories():
                if repo.get('repository_name') != repository_name:
                    continue
                if owner_name is not None and repo.get('owner_name') != owner_name:
                    continue

                repo_id = repo.get('id')
                if repo_id is None:
                    # Without an id the lookup URL would contain the literal "None".
                    logger.warning("Repository %s has no id; skipping", repository_name)
                    continue
                return self.get_repository_info(repo_id)

            return None
        except Exception as e:
            logger.error("Error getting repository by name: %s", e)
            return None


def main() -> None:
    """Example usage: list repositories and show details of the first one."""
    client = GitIntegrationClient()

    # Get all repositories first
    print("🔍 Fetching all repositories from Git Integration service...")
    repositories = client.get_all_repositories()

    if repositories:
        print(f"📁 Found {len(repositories)} repositories:")
        for repo in repositories:
            print(f" - {repo.get('repository_name')} by {repo.get('owner_name')} (ID: {repo.get('id')})")

        # Test with the first repository
        first_repo = repositories[0]
        repo_id = first_repo.get('id')

        print(f"\n🔍 Testing with repository: {first_repo.get('repository_name')}")
        repo_info = client.get_repository_info(repo_id)

        if repo_info:
            print(f"✅ Repository: {repo_info.repository_name}")
            print(f"📁 Local path: {repo_info.local_path}")
            print(f"📄 Files: {repo_info.total_files}")
            print(f"🌐 Languages: {repo_info.languages}")
        else:
            print("❌ Repository not found or not accessible")
    else:
        print("❌ No repositories found in Git Integration service")


# Example usage
if __name__ == "__main__":
    main()