from __future__ import annotations import argparse import os import subprocess import sys from dataclasses import dataclass from pathlib import Path DEFAULT_SERVICE_ORDER = [ "workflow-service", "session-service", "tool-service", "runtime-service", "model-gateway-service", "memory-service", "skill-service", "agent-service", "team-service", "human-service", "knowledge-service", "event-service", "auth-service", "scheduler-service", "api-gateway", ] @dataclass(frozen=True) class MigrationTarget: service_name: str service_path: Path alembic_ini_path: Path def main() -> int: args = parse_args() repo_root = Path(__file__).resolve().parents[1] targets = discover_targets( repo_root=repo_root, only_services=args.only, skip_missing=args.skip_missing) if args.dry_run: for target in targets: print(f"{target.service_name}: {target.alembic_ini_path}") return 0 failed_services: list[str] = [] for target in targets: print(f"==> migrating {target.service_name}", flush=True) result = run_alembic_upgrade(target=target, python_executable=args.python) if result.returncode != 0: failed_services.append(target.service_name) if not args.continue_on_error: break if failed_services: print("migration failed for: " + ", ".join(failed_services), file=sys.stderr) return 1 print(f"migrated {len(targets)} service(s)") return 0 def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Run Alembic migrations for all services.") parser.add_argument( "--only", action="append", default=[], help="Only migrate a service. Can be passed multiple times.") parser.add_argument( "--python", default=sys.executable, help="Python executable to use for `python -m alembic`.") parser.add_argument( "--continue-on-error", action="store_true", help="Continue migrating remaining services if one migration fails.") parser.add_argument( "--skip-missing", action="store_true", help="Skip services without alembic.ini instead of failing.") parser.add_argument( "--dry-run", action="store_true", help="Print migration targets without executing migrations.") return parser.parse_args() def discover_targets( *, repo_root: Path, only_services: list[str], skip_missing: bool) -> list[MigrationTarget]: requested_services = only_services or DEFAULT_SERVICE_ORDER targets: list[MigrationTarget] = [] for service_name in requested_services: service_path = repo_root / "services" / service_name alembic_ini_path = service_path / "alembic.ini" if not alembic_ini_path.exists(): if skip_missing: continue raise FileNotFoundError(f"alembic.ini not found for service: {service_name}") targets.append( MigrationTarget( service_name=service_name, service_path=service_path, alembic_ini_path=alembic_ini_path) ) return targets def run_alembic_upgrade( *, target: MigrationTarget, python_executable: str) -> subprocess.CompletedProcess[str]: env = os.environ.copy() result = subprocess.run( [python_executable, "-m", "alembic", "upgrade", "head"], cwd=target.service_path, env=env, text=True, check=False) return result if __name__ == "__main__": raise SystemExit(main())