#!/usr/bin/env python3
"""
Tech Stack Selector Database Migration Script

This script creates minimal tables for tech stack recommendations.
"""

import os
import sys
import asyncio
import asyncpg
from pathlib import Path

# Service identifier stored in schema_migrations for every migration this
# script applies; shared by the "is applied" lookup and the "mark applied"
# insert so the two can never drift apart.
SERVICE_NAME = 'tech-stack-selector'


async def get_database_connection():
    """Open a database connection using environment variables.

    Reads POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB, POSTGRES_USER and
    POSTGRES_PASSWORD, falling back to built-in defaults when unset.

    Returns:
        The connection object returned by ``asyncpg.connect``.

    Exits the process with status 1 (after printing the error) if the
    parameters cannot be parsed or the connection attempt fails.
    """
    try:
        # Get database connection parameters from environment
        db_host = os.getenv('POSTGRES_HOST', 'postgres')
        db_port = int(os.getenv('POSTGRES_PORT', '5432'))
        db_name = os.getenv('POSTGRES_DB', 'dev_pipeline')
        db_user = os.getenv('POSTGRES_USER', 'pipeline_admin')
        # NOTE(review): a hard-coded fallback password lives in source
        # control; kept for backward compatibility, but deployments should
        # always set POSTGRES_PASSWORD explicitly.
        db_password = os.getenv('POSTGRES_PASSWORD', 'secure_pipeline_2024')

        # Create connection
        conn = await asyncpg.connect(
            host=db_host,
            port=db_port,
            database=db_name,
            user=db_user,
            password=db_password,
        )
        return conn
    except Exception as e:
        print(f"❌ Failed to connect to database: {e}")
        sys.exit(1)


async def create_migrations_table(conn):
    """Create the migrations tracking table if it doesn't exist.

    Args:
        conn: An open asyncpg connection.
    """
    await conn.execute("""
        CREATE TABLE IF NOT EXISTS schema_migrations (
            id SERIAL PRIMARY KEY,
            version VARCHAR(255) NOT NULL UNIQUE,
            service VARCHAR(100) NOT NULL,
            applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            description TEXT
        )
    """)


async def is_migration_applied(conn, version):
    """Check if a migration has already been applied by this service.

    Args:
        conn: An open asyncpg connection.
        version: Migration version string to look up.

    Returns:
        True if a schema_migrations row exists for ``version`` and this
        service, False otherwise.
    """
    # NOTE(review): this lookup filters on (version, service), but the table's
    # UNIQUE constraint covers ``version`` alone. If a row with the same
    # version exists under a different service value, this returns False and
    # the later insert is silently dropped by ON CONFLICT -- confirm version
    # strings are unique across whatever writes to this table.
    result = await conn.fetchval(
        'SELECT 1 FROM schema_migrations WHERE version = $1 AND service = $2',
        version,
        SERVICE_NAME,
    )
    return result is not None


async def mark_migration_applied(conn, version, description):
    """Record a migration as applied for this service.

    The insert is a no-op when a row with the same ``version`` already exists.

    Args:
        conn: An open asyncpg connection.
        version: Migration version string.
        description: Human-readable description stored with the record.
    """
    await conn.execute(
        'INSERT INTO schema_migrations (version, service, description) '
        'VALUES ($1, $2, $3) ON CONFLICT (version) DO NOTHING',
        version,
        SERVICE_NAME,
        description,
    )


async def run_migration():
    """Run the database migration.

    Connects to the database, ensures the ``uuid-ossp`` extension and the
    tracking table exist, applies every not-yet-applied migration file found
    under ``db/`` next to this script, then lists the expected tables that
    are present in the ``public`` schema.

    The connection is always closed, including on failure. On any error the
    message is printed and the process exits with status 1.
    """
    print('🚀 Starting Tech Stack Selector database migrations...')

    # Define migrations
    migrations = [
        {
            'file': '001_minimal_schema.sql',
            'version': '001_minimal_schema',
            'description': 'Create minimal tech stack recommendation tables',
        },
    ]

    conn = None
    try:
        # Get database connection
        conn = await get_database_connection()
        print('✅ Database connection established')

        # Ensure required extensions exist
        print('🔧 Ensuring required PostgreSQL extensions...')
        await conn.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";')
        print('✅ Extensions ready')

        # Create migrations tracking table
        await create_migrations_table(conn)
        print('✅ Migration tracking table ready')

        applied_count = 0
        skipped_count = 0
        migrations_dir = Path(__file__).parent / 'db'

        for migration in migrations:
            migration_path = migrations_dir / migration['file']

            if not migration_path.exists():
                print(f"⚠️ Migration file {migration['file']} not found, skipping...")
                continue

            # Check if migration was already applied
            if await is_migration_applied(conn, migration['version']):
                print(f"⏭️ Migration {migration['file']} already applied, skipping...")
                skipped_count += 1
                continue

            # Read the SQL with an explicit encoding so the result does not
            # depend on the host locale.
            migration_sql = migration_path.read_text(encoding='utf-8')
            print(f"📄 Running migration: {migration['file']}")

            # Apply the SQL and record it atomically: if either step fails,
            # both roll back, so a migration is never left applied but
            # unrecorded (which would make the next run re-apply it).
            async with conn.transaction():
                await conn.execute(migration_sql)
                await mark_migration_applied(
                    conn, migration['version'], migration['description']
                )

            print(f"✅ Migration {migration['file']} completed!")
            applied_count += 1

        print(f"📊 Migration summary: {applied_count} applied, {skipped_count} skipped")

        # Verify tables were created
        result = await conn.fetch("""
            SELECT schemaname, tablename, tableowner
            FROM pg_tables
            WHERE schemaname = 'public'
              AND tablename IN ('tech_stack_recommendations', 'stack_analysis_cache')
            ORDER BY tablename
        """)

        print('🔍 Verified tables:')
        for row in result:
            print(f" - {row['tablename']}")

    except Exception as error:
        print(f"❌ Migration failed: {error}")
        sys.exit(1)
    finally:
        # Runs on success and while SystemExit from the handler above
        # propagates, so the connection is released on every path.
        if conn is not None:
            await conn.close()

    print('✅ Tech Stack Selector migrations completed successfully!')


if __name__ == '__main__':
    asyncio.run(run_migration())