#!/usr/bin/env python3 """ Migration runner for requirement processor service Run migrations in order to set up database schema """ import os import sys import asyncio import asyncpg from pathlib import Path from loguru import logger # Database connection settings DATABASE_URL = os.getenv('DATABASE_URL', 'postgresql://postgres:password@localhost:5432/dev_pipeline') SCHEMA_MIGRATIONS_TABLE_SQL = """ CREATE TABLE IF NOT EXISTS schema_migrations ( id SERIAL PRIMARY KEY, version VARCHAR(255) NOT NULL UNIQUE, service VARCHAR(100) NOT NULL, applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, description TEXT ); """ async def ensure_migrations_table(pool) -> None: async with pool.acquire() as conn: await conn.execute(SCHEMA_MIGRATIONS_TABLE_SQL) async def is_applied(pool, version: str, service: str = "requirement-processor") -> bool: async with pool.acquire() as conn: row = await conn.fetchrow("SELECT 1 FROM schema_migrations WHERE version = $1 AND service = $2", version, service) return row is not None async def mark_applied(pool, version: str, service: str = "requirement-processor", description: str = None) -> None: async with pool.acquire() as conn: await conn.execute( "INSERT INTO schema_migrations(version, service, description) VALUES($1, $2, $3) ON CONFLICT (version) DO NOTHING", version, service, description ) async def run_migration(pool, migration_file): """Run a single migration file if not applied""" version = migration_file.name service = "requirement-processor" try: if await is_applied(pool, version, service): logger.info(f"⏭️ Skipping already applied migration: {version}") return True with open(migration_file, 'r') as f: sql_content = f.read() async with pool.acquire() as conn: await conn.execute(sql_content) await mark_applied(pool, version, service, f"Requirement processor migration: {version}") logger.info(f"✅ Migration completed: {version}") return True except Exception as e: logger.error(f"❌ Migration failed: {version} - {e}") return False async def run_migrations(): """Run all migrations in order""" try: # Connect to database pool = await asyncpg.create_pool(DATABASE_URL) logger.info("Connected to database") # Ensure tracking table exists await ensure_migrations_table(pool) # Get migration files migrations_dir = Path(__file__).parent migration_files = sorted(migrations_dir.glob("*.sql")) if not migration_files: logger.info("No migration files found") return logger.info(f"Found {len(migration_files)} migration files") # Run migrations for migration_file in migration_files: success = await run_migration(pool, migration_file) if not success: logger.error("Migration failed, stopping") break await pool.close() logger.info("All migrations completed successfully") except Exception as e: logger.error(f"Migration runner failed: {e}") sys.exit(1) if __name__ == "__main__": asyncio.run(run_migrations())