mitlist/be/alembic/migrations.py
mohamad 20daadc112
All checks were successful
Deploy to Production, build images and push to Gitea Registry / build_and_push (pull_request) Successful in 1m16s
refactor: Update migration function to access revision strings from RevisionStep objects for improved clarity
2025-06-01 17:50:02 +02:00

74 lines
2.7 KiB
Python

"""
Async migrations handler for FastAPI application.
This file is separate from env.py to avoid Alembic context issues.
"""
import os
import sys
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import pool
from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.runtime.migration import MigrationContext
from alembic.operations import Operations
# Ensure the app directory is in the Python path
sys.path.insert(0, os.path.realpath(os.path.join(os.path.dirname(__file__), '..')))
from app.database import Base as DatabaseBase
from app.config import settings
def _get_migration_fn(script_directory, current_rev):
"""Create a migration function that knows how to upgrade from current revision."""
def migration_fn(rev, context):
# Get all upgrade steps from current revision to head
revisions = script_directory._upgrade_revs("head", current_rev)
for revision_step in revisions:
# Access the revision string from the RevisionStep object
script = script_directory.get_revision(revision_step.revision)
script.module.upgrade(context)
return migration_fn
async def run_migrations():
"""Run database migrations asynchronously."""
# Get alembic configuration and script directory
alembic_cfg = Config(os.path.join(os.path.dirname(__file__), '..', 'alembic.ini'))
script_directory = ScriptDirectory.from_config(alembic_cfg)
# Create async engine
engine = create_async_engine(
settings.DATABASE_URL,
poolclass=pool.NullPool,
)
async with engine.connect() as connection:
def get_current_rev(conn):
migration_context = MigrationContext.configure(
conn,
opts={
'target_metadata': DatabaseBase.metadata,
'script': script_directory
}
)
return migration_context.get_current_revision()
current_rev = await connection.run_sync(get_current_rev)
def upgrade_to_head(conn):
migration_context = MigrationContext.configure(
conn,
opts={
'target_metadata': DatabaseBase.metadata,
'script': script_directory,
'as_sql': False,
}
)
# Set the migration function
migration_context._migrations_fn = _get_migration_fn(script_directory, current_rev)
with migration_context.begin_transaction():
migration_context.run_migrations()
await connection.run_sync(upgrade_to_head)
await engine.dispose()