101 lines
3.3 KiB
Python
101 lines
3.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Migration runner for requirement processor service
|
|
Run migrations in order to set up database schema
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import asyncio
|
|
import asyncpg
|
|
from pathlib import Path
|
|
from loguru import logger
|
|
|
|
# Database connection settings
|
|
DATABASE_URL = os.getenv('DATABASE_URL', 'postgresql://postgres:password@localhost:5432/dev_pipeline')
|
|
|
|
SCHEMA_MIGRATIONS_TABLE_SQL = """
|
|
CREATE TABLE IF NOT EXISTS schema_migrations (
|
|
id SERIAL PRIMARY KEY,
|
|
version VARCHAR(255) NOT NULL UNIQUE,
|
|
service VARCHAR(100) NOT NULL,
|
|
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
description TEXT
|
|
);
|
|
"""
|
|
|
|
async def ensure_migrations_table(pool) -> None:
|
|
async with pool.acquire() as conn:
|
|
await conn.execute(SCHEMA_MIGRATIONS_TABLE_SQL)
|
|
|
|
async def is_applied(pool, version: str, service: str = "requirement-processor") -> bool:
|
|
async with pool.acquire() as conn:
|
|
row = await conn.fetchrow("SELECT 1 FROM schema_migrations WHERE version = $1 AND service = $2", version, service)
|
|
return row is not None
|
|
|
|
async def mark_applied(pool, version: str, service: str = "requirement-processor", description: str = None) -> None:
|
|
async with pool.acquire() as conn:
|
|
await conn.execute(
|
|
"INSERT INTO schema_migrations(version, service, description) VALUES($1, $2, $3) ON CONFLICT (version) DO NOTHING",
|
|
version, service, description
|
|
)
|
|
|
|
async def run_migration(pool, migration_file):
|
|
"""Run a single migration file if not applied"""
|
|
version = migration_file.name
|
|
service = "requirement-processor"
|
|
try:
|
|
if await is_applied(pool, version, service):
|
|
logger.info(f"⏭️ Skipping already applied migration: {version}")
|
|
return True
|
|
|
|
with open(migration_file, 'r') as f:
|
|
sql_content = f.read()
|
|
|
|
async with pool.acquire() as conn:
|
|
await conn.execute(sql_content)
|
|
|
|
await mark_applied(pool, version, service, f"Requirement processor migration: {version}")
|
|
logger.info(f"✅ Migration completed: {version}")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"❌ Migration failed: {version} - {e}")
|
|
return False
|
|
|
|
async def run_migrations():
|
|
"""Run all migrations in order"""
|
|
try:
|
|
# Connect to database
|
|
pool = await asyncpg.create_pool(DATABASE_URL)
|
|
logger.info("Connected to database")
|
|
|
|
# Ensure tracking table exists
|
|
await ensure_migrations_table(pool)
|
|
|
|
# Get migration files
|
|
migrations_dir = Path(__file__).parent
|
|
migration_files = sorted(migrations_dir.glob("*.sql"))
|
|
|
|
if not migration_files:
|
|
logger.info("No migration files found")
|
|
return
|
|
|
|
logger.info(f"Found {len(migration_files)} migration files")
|
|
|
|
# Run migrations
|
|
for migration_file in migration_files:
|
|
success = await run_migration(pool, migration_file)
|
|
if not success:
|
|
logger.error("Migration failed, stopping")
|
|
break
|
|
|
|
await pool.close()
|
|
logger.info("All migrations completed successfully")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Migration runner failed: {e}")
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(run_migrations())
|