
All checks were successful
Deploy to Production, build images and push to Gitea Registry / build_and_push (pull_request) Successful in 1m17s
73 lines
2.6 KiB
Python
73 lines
2.6 KiB
Python
"""
|
|
Async migrations handler for FastAPI application.
|
|
This file is separate from env.py to avoid Alembic context issues.
|
|
"""
|
|
import os
|
|
import sys
|
|
from sqlalchemy.ext.asyncio import create_async_engine
|
|
from sqlalchemy import pool
|
|
from alembic.config import Config
|
|
from alembic.script import ScriptDirectory
|
|
from alembic.runtime.migration import MigrationContext
|
|
from alembic.operations import Operations
|
|
|
|
# Ensure the app directory is in the Python path
|
|
sys.path.insert(0, os.path.realpath(os.path.join(os.path.dirname(__file__), '..')))
|
|
|
|
from app.database import Base as DatabaseBase
|
|
from app.config import settings
|
|
|
|
def _get_migration_fn(script_directory, current_rev):
|
|
"""Create a migration function that knows how to upgrade from current revision."""
|
|
def migration_fn(rev, context):
|
|
# Get all upgrade steps from current revision to head
|
|
revisions = script_directory._upgrade_revs("head", current_rev)
|
|
for revision in revisions:
|
|
script = script_directory.get_revision(revision)
|
|
script.module.upgrade(context)
|
|
return migration_fn
|
|
|
|
async def run_migrations():
|
|
"""Run database migrations asynchronously."""
|
|
# Get alembic configuration and script directory
|
|
alembic_cfg = Config(os.path.join(os.path.dirname(__file__), '..', 'alembic.ini'))
|
|
script_directory = ScriptDirectory.from_config(alembic_cfg)
|
|
|
|
# Create async engine
|
|
engine = create_async_engine(
|
|
settings.DATABASE_URL,
|
|
poolclass=pool.NullPool,
|
|
)
|
|
|
|
async with engine.connect() as connection:
|
|
def get_current_rev(conn):
|
|
migration_context = MigrationContext.configure(
|
|
conn,
|
|
opts={
|
|
'target_metadata': DatabaseBase.metadata,
|
|
'script': script_directory
|
|
}
|
|
)
|
|
return migration_context.get_current_revision()
|
|
|
|
current_rev = await connection.run_sync(get_current_rev)
|
|
|
|
def upgrade_to_head(conn):
|
|
migration_context = MigrationContext.configure(
|
|
conn,
|
|
opts={
|
|
'target_metadata': DatabaseBase.metadata,
|
|
'script': script_directory,
|
|
'as_sql': False,
|
|
}
|
|
)
|
|
|
|
# Set the migration function
|
|
migration_context._migrations_fn = _get_migration_fn(script_directory, current_rev)
|
|
|
|
with migration_context.begin_transaction():
|
|
migration_context.run_migrations()
|
|
|
|
await connection.run_sync(upgrade_to_head)
|
|
|
|
await engine.dispose() |