codenuk_backend_mine/services/ai-analysis-service/run_hierarchical_migration.py
2025-11-07 08:54:52 +05:30

212 lines
7.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Run hierarchical schema migration (002-hierarchical-schema.sql)
This adds the new tables for hierarchical data structure (findings, metrics, report_sections, analysis_runs)
"""
import os
import sys
import subprocess
import time
import psycopg2
from pathlib import Path
from dotenv import load_dotenv
def log(message):
    """Print *message* prefixed with a local-time ``[YYYY-MM-DD HH:MM:SS]`` stamp."""
    stamp = time.strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{stamp}] {message}")
def check_database_connection():
    """Return True when a PostgreSQL connection can be opened, else False.

    Credentials are read from environment variables (after loading ``.env``),
    with development defaults as fallback. Any failure is logged and reported
    as False rather than raised.
    """
    try:
        load_dotenv()
        params = {
            'host': os.getenv('POSTGRES_HOST', 'localhost'),
            'port': os.getenv('POSTGRES_PORT', '5432'),
            'database': os.getenv('POSTGRES_DB', 'dev_pipeline'),
            'user': os.getenv('POSTGRES_USER', 'pipeline_admin'),
            'password': os.getenv('POSTGRES_PASSWORD', 'secure_pipeline_2024'),
        }
        # A successful connect followed by an immediate close is all we need.
        psycopg2.connect(**params).close()
    except Exception as exc:
        log(f"Database connection failed: {exc}")
        return False
    return True
def run_migration_with_psycopg2():
    """Execute 002-hierarchical-schema.sql against PostgreSQL and verify it.

    Reads the schema file from this script's directory, runs it with
    autocommit enabled, then checks that the expected tables and their
    indexes exist.

    Returns:
        bool: True on success (missing tables only log a warning), False if
        the schema file is absent or any database error occurs.
    """
    conn = None
    cursor = None
    try:
        load_dotenv()
        conn = psycopg2.connect(
            host=os.getenv('POSTGRES_HOST', 'localhost'),
            port=os.getenv('POSTGRES_PORT', '5432'),
            database=os.getenv('POSTGRES_DB', 'dev_pipeline'),
            user=os.getenv('POSTGRES_USER', 'pipeline_admin'),
            password=os.getenv('POSTGRES_PASSWORD', 'secure_pipeline_2024')
        )
        # Autocommit so each DDL statement takes effect immediately.
        conn.autocommit = True
        cursor = conn.cursor()

        # The schema file must sit next to this script.
        schema_file = Path(__file__).parent / "002-hierarchical-schema.sql"
        if not schema_file.exists():
            log(f"ERROR: Schema file not found: {schema_file}")
            return False
        log(f"Reading schema file: {schema_file}")
        schema_sql = schema_file.read_text()

        log("Executing migration SQL...")
        cursor.execute(schema_sql)

        # Verify the expected tables were created.
        cursor.execute("""
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = 'public'
            AND table_name IN ('findings', 'metrics', 'report_sections', 'analysis_runs')
            ORDER BY table_name
        """)
        table_names = [row[0] for row in cursor.fetchall()]
        expected_tables = ['analysis_runs', 'findings', 'metrics', 'report_sections']
        missing_tables = [t for t in expected_tables if t not in table_names]
        if missing_tables:
            log(f"⚠️ Warning: Some tables not created: {missing_tables}")
        else:
            log(f"✅ All tables created successfully: {', '.join(table_names)}")

        # Report how many indexes the migration produced.
        cursor.execute("""
            SELECT indexname
            FROM pg_indexes
            WHERE schemaname = 'public'
            AND tablename IN ('findings', 'metrics', 'report_sections', 'analysis_runs')
            ORDER BY tablename, indexname
        """)
        indexes = cursor.fetchall()
        log(f"✅ Created {len(indexes)} indexes")
        return True
    except Exception as e:
        log(f"❌ Migration error: {e}")
        import traceback
        traceback.print_exc()
        return False
    finally:
        # Close resources on every path — the original leaked the connection
        # whenever an exception fired before the explicit close() calls.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
def verify_tables():
    """Verify that the hierarchical tables exist with their key columns.

    Queries information_schema for each expected table and confirms the
    presence of a representative subset of columns.

    Returns:
        bool: True if every table has all expected columns, False if any
        column is missing or the check itself fails.
    """
    conn = None
    cursor = None
    try:
        load_dotenv()
        conn = psycopg2.connect(
            host=os.getenv('POSTGRES_HOST', 'localhost'),
            port=os.getenv('POSTGRES_PORT', '5432'),
            database=os.getenv('POSTGRES_DB', 'dev_pipeline'),
            user=os.getenv('POSTGRES_USER', 'pipeline_admin'),
            password=os.getenv('POSTGRES_PASSWORD', 'secure_pipeline_2024')
        )
        cursor = conn.cursor()
        # Representative columns per table; a missing column means the
        # migration did not apply cleanly.
        tables_to_check = {
            'findings': ['id', 'run_id', 'module_name', 'severity', 'category', 'title', 'file_path'],
            'metrics': ['id', 'run_id', 'module_name', 'lines_of_code', 'architecture_rating', 'security_rating'],
            'report_sections': ['id', 'run_id', 'section_name', 'nontechnical_content', 'technical_content'],
            'analysis_runs': ['id', 'run_id', 'repository_id', 'session_id', 'status']
        }
        log("Verifying table structures...")
        all_verified = True
        for table_name, expected_columns in tables_to_check.items():
            # Parameterized query (the original needlessly used an f-string
            # prefix on this literal).
            cursor.execute("""
                SELECT column_name
                FROM information_schema.columns
                WHERE table_schema = 'public'
                AND table_name = %s
            """, (table_name,))
            columns = [row[0] for row in cursor.fetchall()]
            missing_columns = [col for col in expected_columns if col not in columns]
            if missing_columns:
                log(f"❌ Table '{table_name}' missing columns: {missing_columns}")
                all_verified = False
            else:
                log(f"✅ Table '{table_name}' verified ({len(columns)} columns)")
        return all_verified
    except Exception as e:
        log(f"❌ Verification error: {e}")
        return False
    finally:
        # Always release the connection — the original leaked it whenever an
        # exception fired before the explicit close() calls.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
def run_migration():
    """Orchestrate the hierarchical schema migration end to end.

    Waits for the database (up to ``max_retries`` attempts, ``retry_delay``
    seconds apart), runs the migration SQL, then verifies the resulting
    tables.

    Returns:
        bool: True when the migration ran (verification issues only log a
        warning), False on connection or migration failure.
    """
    max_retries = 10
    retry_delay = 2  # seconds between connection attempts
    try:
        log("=" * 60)
        log("Starting Hierarchical Schema Migration (002-hierarchical-schema.sql)")
        log("=" * 60)
        # Wait for the database to accept connections.
        for attempt in range(1, max_retries + 1):
            if check_database_connection():
                log("✅ Database connection successful")
                break
            log(f"Database not ready, retrying in {retry_delay} seconds... ({attempt}/{max_retries})")
            time.sleep(retry_delay)
        else:
            # Derive the total wait from the retry settings instead of the
            # original hard-coded "20 seconds", which would silently go
            # stale if max_retries or the delay changed.
            log(f"❌ ERROR: Could not connect to database after {max_retries * retry_delay} seconds")
            return False

        log("Running migration...")
        if not run_migration_with_psycopg2():
            log("❌ Migration failed")
            return False

        log("Verifying migration...")
        if not verify_tables():
            log("⚠️ Verification found issues, but migration may have completed")
            return True  # Still return True as tables might exist from previous run

        log("=" * 60)
        log("✅ Hierarchical schema migration completed successfully!")
        log("=" * 60)
        return True
    except Exception as e:
        log(f"❌ Migration error: {e}")
        import traceback
        traceback.print_exc()
        return False
if __name__ == "__main__":
    # Exit 0 on success, 1 on failure (SystemExit is what sys.exit raises).
    raise SystemExit(0 if run_migration() else 1)