| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
from __future__ import annotations

import argparse
import os
import subprocess
import sys
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
# Service names used, in this sequence, when no explicit selection is given.
DEFAULT_SERVICE_ORDER: list[str] = [
    "workflow-service",
    "session-service",
    "tool-service",
    "runtime-service",
    "model-gateway-service",
    "memory-service",
    "skill-service",
    "agent-service",
    "team-service",
    "human-service",
    "knowledge-service",
    "event-service",
    "auth-service",
    "scheduler-service",
    "api-gateway",
]
@dataclass(frozen=True)
class MigrationTarget:
    """Immutable description of one service that has an Alembic configuration.

    Attributes:
        service_name: Name of the service.
        service_path: Directory of the service.
        alembic_ini_path: Location of the service's ``alembic.ini`` file.
    """

    service_name: str
    service_path: Path
    alembic_ini_path: Path
def main() -> int:
    """Run, or with ``--dry-run`` only list, the Alembic migrations.

    Returns:
        ``0`` after a dry run or when every attempted migration succeeded;
        ``1`` when target discovery fails or at least one migration fails.
    """
    args = parse_args()
    # parents[1] is the parent of the directory that contains this script.
    repo_root = Path(__file__).resolve().parents[1]

    try:
        targets = discover_targets(
            repo_root=repo_root,
            only_services=args.only,
            skip_missing=args.skip_missing,
        )
    except FileNotFoundError as exc:
        # A service without alembic.ini is a usage/configuration problem:
        # report it as a one-line error instead of an uncaught traceback.
        # The exit status stays 1, as it was for the uncaught exception.
        print(f"error: {exc}", file=sys.stderr)
        return 1

    if args.dry_run:
        for target in targets:
            print(f"{target.service_name}: {target.alembic_ini_path}")
        return 0

    failed_services: list[str] = []
    for target in targets:
        # Flush so the banner appears before the child process writes output.
        print(f"==> migrating {target.service_name}", flush=True)
        result = run_alembic_upgrade(target=target, python_executable=args.python)
        if result.returncode != 0:
            failed_services.append(target.service_name)
            if not args.continue_on_error:
                # Default behaviour: stop at the first failing service.
                break

    if failed_services:
        print("migration failed for: " + ", ".join(failed_services), file=sys.stderr)
        return 1

    print(f"migrated {len(targets)} service(s)")
    return 0
def parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the migration runner.

    Args:
        argv: Argument strings to parse. ``None`` (the default) lets argparse
            read ``sys.argv[1:]``, so existing no-argument calls are unchanged.
            Passing an explicit sequence allows programmatic use and testing.

    Returns:
        Namespace with ``only`` (list of service names, empty when the option
        is not given), ``python`` (interpreter path, defaulting to
        ``sys.executable``) and the boolean flags ``continue_on_error``,
        ``skip_missing`` and ``dry_run``.
    """
    parser = argparse.ArgumentParser(
        description="Run Alembic migrations for all services.",
    )
    parser.add_argument(
        "--only",
        action="append",
        default=[],
        help="Only migrate a service. Can be passed multiple times.",
    )
    parser.add_argument(
        "--python",
        default=sys.executable,
        help="Python executable to use for `python -m alembic`.",
    )
    parser.add_argument(
        "--continue-on-error",
        action="store_true",
        help="Continue migrating remaining services if one migration fails.",
    )
    parser.add_argument(
        "--skip-missing",
        action="store_true",
        help="Skip services without alembic.ini instead of failing.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print migration targets without executing migrations.",
    )
    return parser.parse_args(argv)
def discover_targets(
    *,
    repo_root: Path,
    only_services: list[str],
    skip_missing: bool,
) -> list[MigrationTarget]:
    """Resolve service names into ``MigrationTarget`` records.

    Args:
        repo_root: Root directory; each service is looked up at
            ``<repo_root>/services/<service name>``.
        only_services: Service names to consider. An empty list selects
            ``DEFAULT_SERVICE_ORDER``.
        skip_missing: When true, a service without ``alembic.ini`` is left
            out of the result instead of raising.

    Returns:
        One target per service that has an ``alembic.ini``, in the order the
        names were supplied.

    Raises:
        FileNotFoundError: A service has no ``alembic.ini`` and
            ``skip_missing`` is false.
    """
    names = only_services if only_services else DEFAULT_SERVICE_ORDER
    services_root = repo_root / "services"

    found: list[MigrationTarget] = []
    for name in names:
        service_dir = services_root / name
        ini_file = service_dir / "alembic.ini"
        if ini_file.exists():
            found.append(
                MigrationTarget(
                    service_name=name,
                    service_path=service_dir,
                    alembic_ini_path=ini_file,
                )
            )
        elif not skip_missing:
            raise FileNotFoundError(f"alembic.ini not found for service: {name}")
    return found
def run_alembic_upgrade(
    *,
    target: MigrationTarget,
    python_executable: str,
) -> subprocess.CompletedProcess[str]:
    """Run ``alembic upgrade head`` for one service in a child process.

    Args:
        target: Service to migrate; the child runs with ``target.service_path``
            as its working directory and ``target.alembic_ini_path`` as its
            Alembic configuration file.
        python_executable: Interpreter used to launch ``-m alembic``.

    Returns:
        The completed process. A non-zero ``returncode`` is returned to the
        caller rather than raised (``check=False``). Output is not captured.
    """
    command = [
        python_executable,
        "-m",
        "alembic",
        # Pin the configuration file explicitly. Without -c, Alembic prefers
        # the ALEMBIC_CONFIG environment variable over ./alembic.ini, so an
        # inherited value would silently redirect every service to one config.
        # Resolved first because the child runs in a different directory.
        "-c",
        str(target.alembic_ini_path.resolve()),
        "upgrade",
        "head",
    ]
    return subprocess.run(
        command,
        cwd=target.service_path,
        env=os.environ.copy(),
        text=True,
        check=False,
    )
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|