codenuk_backend_mine/services/tech-stack-selector/migrate_postgres_to_neo4j.py
2025-10-10 08:56:39 +05:30

233 lines
8.7 KiB
Python

#!/usr/bin/env python3
"""
PostgreSQL to Neo4j Migration Script
Migrates existing PostgreSQL data to Neo4j with proper price-based relationships
"""
import os
import sys
import subprocess
from loguru import logger
def _split_cypher_statements(cql_text):
    """Split a Cypher script into individual, non-empty statements.

    Whole-line ``//`` comments are dropped *before* splitting on ``;``. Doing
    it in this order means a statement preceded by a comment line is still
    executed, and a ``;`` inside a comment cannot produce a stray fragment.
    The split is deliberately simple: a ``;`` inside a string literal would
    still break that statement in two.

    Args:
        cql_text: Full contents of a ``.cql`` file.

    Returns:
        List of stripped statement strings, in file order.
    """
    code_lines = [
        line for line in cql_text.splitlines()
        if not line.lstrip().startswith("//")
    ]
    chunks = "\n".join(code_lines).split(";")
    return [chunk.strip() for chunk in chunks if chunk.strip()]


def run_migration():
    """Run the complete PostgreSQL -> Neo4j migration process.

    Steps, in order (the first failing step aborts the run):
      1. Verify PostgreSQL is reachable (psycopg2).
      2. Verify Neo4j is reachable (neo4j driver ``verify_connectivity``).
      3. Apply the Cypher schema in ``Neo4j_From_Postgres.cql``, read from the
         current working directory. Individual statement failures are logged
         as warnings and skipped (best-effort).
      4. Run ``PostgresToNeo4jMigration`` (imported from ``src/``) with the
         ``TSS`` namespace.

    Connection settings come from the POSTGRES_* and NEO4J_* environment
    variables, falling back to the defaults below.

    Returns:
        True if every step succeeded, False otherwise (the cause is logged).
    """
    logger.info("="*60)
    logger.info("🚀 POSTGRESQL TO NEO4J MIGRATION")
    logger.info("="*60)
    logger.info("✅ Using existing PostgreSQL data")
    logger.info("✅ Creating price-based relationships")
    logger.info("✅ Migrating to Neo4j knowledge graph")
    logger.info("="*60)

    # Get environment variables with defaults
    postgres_config = {
        "host": os.getenv("POSTGRES_HOST", "postgres"),
        "port": int(os.getenv("POSTGRES_PORT", "5432")),
        "user": os.getenv("POSTGRES_USER", "pipeline_admin"),
        "password": os.getenv("POSTGRES_PASSWORD", "secure_pipeline_2024"),
        "database": os.getenv("POSTGRES_DB", "dev_pipeline"),
    }
    neo4j_config = {
        "uri": os.getenv("NEO4J_URI", "bolt://neo4j:7687"),
        "user": os.getenv("NEO4J_USER", "neo4j"),
        "password": os.getenv("NEO4J_PASSWORD", "password"),
    }
    neo4j_auth = (neo4j_config["user"], neo4j_config["password"])

    # Check if PostgreSQL is running
    logger.info("🔍 Checking PostgreSQL connection...")
    try:
        import psycopg2
        conn = psycopg2.connect(**postgres_config)
        conn.close()
        logger.info("✅ PostgreSQL is running and accessible")
    except Exception as e:
        logger.error(f"❌ PostgreSQL connection failed: {e}")
        logger.error("Please ensure PostgreSQL is running and the database is set up")
        return False

    # Check if Neo4j is running
    logger.info("🔍 Checking Neo4j connection...")
    try:
        from neo4j import GraphDatabase
        driver = GraphDatabase.driver(neo4j_config["uri"], auth=neo4j_auth)
        try:
            driver.verify_connectivity()
        finally:
            driver.close()
        logger.info("✅ Neo4j is running and accessible")
    except Exception as e:
        logger.error(f"❌ Neo4j connection failed: {e}")
        logger.error("Please ensure Neo4j is running")
        return False

    # Set up Neo4j schema (GraphDatabase was imported by the check above).
    logger.info("🔧 Setting up Neo4j schema...")
    try:
        # Read the file before opening a driver so a missing file leaks nothing.
        with open("Neo4j_From_Postgres.cql", "r", encoding="utf-8") as f:
            statements = _split_cypher_statements(f.read())
        driver = GraphDatabase.driver(neo4j_config["uri"], auth=neo4j_auth)
        try:
            with driver.session() as session:
                total = len(statements)
                for i, statement in enumerate(statements, start=1):
                    try:
                        # consume() forces any server-side error to surface
                        # here, attributed to *this* statement, instead of on
                        # a later run() or when the session closes.
                        session.run(statement).consume()
                        logger.info(f"✅ Executed schema statement {i}/{total}")
                    except Exception as e:
                        # Best-effort: keep applying the remaining statements.
                        logger.warning(f"⚠️ Schema statement {i} failed: {e}")
        finally:
            driver.close()
        logger.info("✅ Neo4j schema setup completed")
    except Exception as e:
        logger.error(f"❌ Neo4j schema setup failed: {e}")
        return False

    # Run the migration
    logger.info("🔄 Running PostgreSQL to Neo4j migration...")
    try:
        # The migration module lives in ./src (relative to the working dir).
        if "src" not in sys.path:
            sys.path.append("src")
        from postgres_to_neo4j_migration import PostgresToNeo4jMigration

        # Run migration with TSS namespace
        migration = PostgresToNeo4jMigration(postgres_config, neo4j_config, namespace="TSS")
        success = migration.run_full_migration()
    except Exception as e:
        logger.error(f"❌ Migration failed: {e}")
        return False

    if success:
        logger.info("✅ Migration completed successfully!")
        return True
    logger.error("❌ Migration failed!")
    return False
def test_migrated_data():
    """Sanity-check the migrated ``TSS``-namespaced graph in Neo4j.

    Connects using NEO4J_URI / NEO4J_USER / NEO4J_PASSWORD (same defaults as
    run_migration()). It logs counts of PriceTier, Technology, Tool and
    TechStack nodes, of TSS_BELONGS_TO_TIER relationships, and of "complete"
    stacks (those with tier, frontend, backend, database and cloud links).

    The counts are only logged; a count of zero does not fail validation.

    Returns:
        True if every query ran, False if any step raised (error logged).
    """
    # Read settings here: they were previously referenced as if global, but
    # only existed as locals of run_migration(), so this always hit NameError.
    neo4j_uri = os.getenv("NEO4J_URI", "bolt://neo4j:7687")
    neo4j_user = os.getenv("NEO4J_USER", "neo4j")
    neo4j_password = os.getenv("NEO4J_PASSWORD", "password")

    # (log label, Cypher query returning a single `count` column)
    checks = [
        ("Price tiers", "MATCH (p:PriceTier:TSS) RETURN count(p) as count"),
        ("Technologies", "MATCH (t:Technology:TSS) RETURN count(t) as count"),
        ("Tools", "MATCH (tool:Tool:TSS) RETURN count(tool) as count"),
        ("Tech stacks", "MATCH (s:TechStack:TSS) RETURN count(s) as count"),
        ("Price tier relationships",
         "MATCH ()-[r:TSS_BELONGS_TO_TIER]->() RETURN count(r) as count"),
        ("Complete stacks", """
            MATCH (s:TechStack:TSS)
            WHERE exists((s)-[:TSS_BELONGS_TO_TIER]->())
            AND exists((s)-[:TSS_USES_FRONTEND]->())
            AND exists((s)-[:TSS_USES_BACKEND]->())
            AND exists((s)-[:TSS_USES_DATABASE]->())
            AND exists((s)-[:TSS_USES_CLOUD]->())
            RETURN count(s) as count
        """),
    ]

    logger.info("🧪 Testing migrated data...")
    try:
        from neo4j import GraphDatabase
        driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))
        try:
            with driver.session() as session:
                for label, query in checks:
                    count = session.run(query).single()["count"]
                    logger.info(f"✅ {label}: {count}")
        finally:
            driver.close()
        logger.info("✅ Data validation completed successfully!")
        return True
    except Exception as e:
        logger.error(f"❌ Data validation failed: {e}")
        return False


# The name starts with "test_" but this is a script step, not a pytest test;
# stop pytest from collecting it if this module is ever imported by a test.
test_migrated_data.__test__ = False
def start_migrated_service():
    """Run ``src/main_migrated.py`` in a child process and wait for it to exit.

    The child inherits the current environment. Any of the POSTGRES_* /
    NEO4J_* settings that are unset are filled in with the same defaults
    run_migration() uses; values already present in the environment win.

    CLAUDE_API_KEY is deliberately NOT defaulted. Secrets must be supplied
    through the environment or a secret store, never committed to source.
    Only a warning is logged when it is missing.

    Errors (including a non-zero exit of the service) are logged, not raised.
    """
    logger.info("🚀 Starting migrated service...")
    try:
        # Defaults mirror run_migration(); they were previously referenced as
        # if global but only existed as locals there (NameError).
        defaults = {
            "NEO4J_URI": "bolt://neo4j:7687",
            "NEO4J_USER": "neo4j",
            "NEO4J_PASSWORD": "password",
            "POSTGRES_HOST": "postgres",
            "POSTGRES_PORT": "5432",
            "POSTGRES_USER": "pipeline_admin",
            "POSTGRES_PASSWORD": "secure_pipeline_2024",
            "POSTGRES_DB": "dev_pipeline",
        }
        child_env = dict(os.environ)
        for key, value in defaults.items():
            child_env.setdefault(key, value)

        if not child_env.get("CLAUDE_API_KEY"):
            logger.warning("⚠️ CLAUDE_API_KEY is not set; the service may be unable to call Claude")

        # Start the service (blocks until it exits).
        completed = subprocess.run([sys.executable, "src/main_migrated.py"], env=child_env)
        if completed.returncode != 0:
            logger.error(f"❌ Migrated service exited with code {completed.returncode}")
    except Exception as e:
        logger.error(f"❌ Failed to start migrated service: {e}")
if __name__ == "__main__":
    # Run migration; abort with a non-zero exit code on failure.
    if not run_migration():
        logger.error("❌ Migration failed!")
        sys.exit(1)
    logger.info("✅ Migration completed successfully!")

    # Test migrated data
    if not test_migrated_data():
        logger.error("❌ Data validation failed!")
        sys.exit(1)
    logger.info("✅ Data validation passed!")

    # Ask user if they want to start the service. When stdin is closed or
    # non-interactive (e.g. inside a container), input() raises EOFError;
    # treat that as "no" instead of crashing after a successful migration.
    try:
        response = input("\n🚀 Start the migrated service? (y/n): ")
    except EOFError:
        response = ""

    if response.strip().lower() in ("y", "yes"):
        start_migrated_service()
    else:
        logger.info("✅ Migration completed. You can start the service later with:")
        logger.info(" python src/main_migrated.py")