| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- from __future__ import annotations
- import argparse
- import os
- import shutil
- import subprocess
- import sys
- from dataclasses import dataclass
- from pathlib import Path
# Services migrated, in this order, when no ``--only`` filter is supplied.
# Each name is looked up as a directory under ``<repo_root>/services/``.
DEFAULT_SERVICE_ORDER: list[str] = [
    "session-service",
    "tool-service",
    "model-gateway-service",
    "memory-service",
    "skill-service",
    "agent-service",
    "team-service",
    "human-service",
    "knowledge-service",
    "event-service",
    "auth-service",
    "scheduler-service",
    "api-gateway",
]
@dataclass(frozen=True)
class MigrationTarget:
    """Immutable record describing one service whose migrations should run."""

    # Directory name of the service under ``services/``.
    service_name: str
    # Directory of the service; used as the working directory for Alembic.
    service_path: Path
    # Location of the service's ``alembic.ini`` file.
    alembic_ini_path: Path
def main() -> int:
    """Run the migration CLI and return the process exit code.

    Returns:
        0 after a dry run or when every attempted migration succeeded,
        1 when at least one service's migration exited with a non-zero code.
    """
    options = parse_args()
    # The repository root is taken to be the parent of this script's directory.
    root = Path(__file__).resolve().parents[1]
    targets = discover_targets(
        repo_root=root,
        only_services=options.only,
        skip_missing=options.skip_missing,
    )

    # Dry run: list what would be migrated and stop.
    if options.dry_run:
        for target in targets:
            print(f"{target.service_name}: {target.alembic_ini_path}")
        return 0

    if options.database_url:
        # Never echo the raw URL; the password is masked first.
        print(f"using database url: {mask_database_url(options.database_url)}", flush=True)

    failures: list[str] = []
    for target in targets:
        print(f"==> migrating {target.service_name}", flush=True)
        completed = run_alembic_upgrade(
            target=target,
            python_executable=options.python,
            database_url=options.database_url,
        )
        if completed.returncode == 0:
            continue
        failures.append(target.service_name)
        # Without --continue-on-error the first failure ends the run.
        if not options.continue_on_error:
            break

    if not failures:
        print(f"migrated {len(targets)} service(s)")
        return 0

    print("migration failed for: " + ", ".join(failures), file=sys.stderr)
    return 1
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse ``sys.argv``.

    Returns:
        Namespace with ``only`` (list of service names), ``python``,
        ``database_url`` and the boolean flags ``continue_on_error``,
        ``skip_missing`` and ``dry_run``.
    """
    cli = argparse.ArgumentParser(description="Run Alembic migrations for all services.")

    cli.add_argument(
        "--only",
        action="append",
        default=[],
        help="Only migrate a service. Can be passed multiple times.",
    )
    cli.add_argument(
        "--python",
        default=sys.executable,
        help="Python executable to use for `python -m alembic`.",
    )
    # The environment variable is read when the parser is built, so it only
    # acts as the default; an explicit --database-url wins.
    env_database_url = os.environ.get("AGENT_PLATFORM_DATABASE_URL")
    cli.add_argument(
        "--database-url",
        default=env_database_url,
        help=(
            "Override every service migration target with one database URL. "
            "Defaults to AGENT_PLATFORM_DATABASE_URL when set."
        ),
    )

    # Simple on/off switches, registered in the order they appear in --help.
    switches = (
        ("--continue-on-error", "Continue migrating remaining services if one migration fails."),
        ("--skip-missing", "Skip services without alembic.ini instead of failing."),
        ("--dry-run", "Print migration targets without executing migrations."),
    )
    for flag, description in switches:
        cli.add_argument(flag, action="store_true", help=description)

    return cli.parse_args()
def discover_targets(
    *,
    repo_root: Path,
    only_services: list[str],
    skip_missing: bool,
) -> list[MigrationTarget]:
    """Collect the services that have an ``alembic.ini`` to migrate.

    Args:
        repo_root: Directory that contains the ``services`` folder.
        only_services: Explicit service names; when empty,
            ``DEFAULT_SERVICE_ORDER`` is used instead.
        skip_missing: When true, services without ``alembic.ini`` are
            silently left out instead of aborting.

    Returns:
        Migration targets in the order the services were requested.

    Raises:
        FileNotFoundError: A requested service has no ``alembic.ini`` and
            ``skip_missing`` is false.
    """
    services_dir = repo_root / "services"
    names = only_services if only_services else DEFAULT_SERVICE_ORDER

    found: list[MigrationTarget] = []
    for name in names:
        directory = services_dir / name
        ini_file = directory / "alembic.ini"
        if ini_file.exists():
            found.append(
                MigrationTarget(
                    service_name=name,
                    service_path=directory,
                    alembic_ini_path=ini_file,
                )
            )
        elif not skip_missing:
            raise FileNotFoundError(f"alembic.ini not found for service: {name}")
    return found
def run_alembic_upgrade(
    *,
    target: MigrationTarget,
    python_executable: str,
    database_url: str | None,
) -> subprocess.CompletedProcess[str]:
    """Run ``alembic upgrade head`` inside one service directory.

    Args:
        target: Service to migrate; its ``service_path`` becomes the working
            directory of the child process.
        python_executable: Interpreter path used to locate the ``alembic``
            command.
        database_url: When truthy, exported to the child process as
            ``AGENT_PLATFORM_DATABASE_URL``.

    Returns:
        The completed process; a non-zero ``returncode`` is returned to the
        caller rather than raised (``check=False``).
    """
    # Start from a copy so the parent process environment is left untouched.
    child_env = dict(os.environ)
    if database_url:
        child_env["AGENT_PLATFORM_DATABASE_URL"] = database_url
    child_env["PYTHONPATH"] = build_pythonpath(
        target=target,
        existing=child_env.get("PYTHONPATH"),
    )

    command = [resolve_alembic_executable(python_executable), "upgrade", "head"]
    return subprocess.run(
        command,
        cwd=target.service_path,
        env=child_env,
        text=True,
        check=False,
    )
def build_pythonpath(*, target: MigrationTarget, existing: str | None) -> str:
    """Compose the ``PYTHONPATH`` value for a service's migration process.

    The result lists, in order: the service directory, every ``libs/*/src``
    directory of the repository (sorted), and finally the ``existing`` value
    when it is non-empty. Entries are joined with ``os.pathsep``.

    Args:
        target: Service being migrated; the repository root is derived as two
            levels above its ``service_path``.
        existing: Current ``PYTHONPATH`` value, or ``None``/empty if unset.
    """
    service_dir = target.service_path
    lib_sources = sorted((service_dir.parents[1] / "libs").glob("*/src"))

    entries = [str(service_dir), *map(str, lib_sources)]
    if existing:
        entries.append(existing)
    return os.pathsep.join(entries)
def resolve_alembic_executable(python_executable: str) -> str:
    """Locate the ``alembic`` command that accompanies ``python_executable``.

    Candidates next to the interpreter are tried first, in this order:
    ``Scripts/alembic.exe``, ``Scripts/alembic``, ``alembic.exe``,
    ``alembic``. If none is a regular file, ``alembic`` is looked up on
    ``PATH``.

    Args:
        python_executable: Path of the Python interpreter whose directory is
            searched.

    Returns:
        Absolute path of the sibling executable, or whatever
        ``shutil.which("alembic")`` returns.

    Raises:
        FileNotFoundError: No ``alembic`` executable could be found.
    """
    interpreter_dir = Path(python_executable).parent
    candidates = [
        interpreter_dir / "Scripts" / "alembic.exe",
        interpreter_dir / "Scripts" / "alembic",
        interpreter_dir / "alembic.exe",
        interpreter_dir / "alembic",
    ]
    for candidate in candidates:
        # is_file() rather than exists(): a *directory* called "alembic"
        # (the conventional migrations folder name) is not an executable.
        if candidate.is_file():
            # Return an absolute path: a child process started with another
            # working directory resolves a relative executable against that
            # directory, not the one this lookup ran in. absolute() does not
            # follow symlinks, so symlinked entry points keep their location.
            return str(candidate.absolute())

    resolved = shutil.which("alembic")
    if resolved:
        return resolved
    raise FileNotFoundError(
        "alembic executable not found. Install it with `python -m pip install alembic`."
    )
def mask_database_url(database_url: str) -> str:
    """Return ``database_url`` with the password replaced by ``***``.

    The URL is returned unchanged when it has no ``://`` marker or no
    ``user:password@`` part.

    The credentials are taken from the authority component (the text before
    the first ``/``, ``?`` or ``#``) and split on its *last* ``@``, so a
    password that itself contains ``@`` is hidden completely and an ``@`` in
    the path or query is not mistaken for the credential delimiter. If the
    authority has no ``@`` the first ``@`` of the whole remainder is used
    instead, so a password containing an unescaped ``/`` is still masked.

    Args:
        database_url: Connection URL such as
            ``postgresql://user:secret@host:5432/db``.

    Returns:
        The URL in the form ``scheme://user:***@host...`` or the input as is.
    """
    scheme, marker, rest = database_url.partition("://")
    if not marker:
        return database_url

    # The authority ends at the first path, query or fragment delimiter.
    authority_end = len(rest)
    for delimiter in "/?#":
        index = rest.find(delimiter)
        if index != -1:
            authority_end = min(authority_end, index)
    authority, tail = rest[:authority_end], rest[authority_end:]

    if "@" in authority:
        # Last "@" of the authority separates credentials from the host.
        credentials, _, host = authority.rpartition("@")
        host += tail
    else:
        # Fail safe: fall back to the first "@" anywhere after the scheme so
        # credentials holding unescaped delimiters are never printed raw.
        credentials, at_sign, host = rest.partition("@")
        if not at_sign:
            return database_url

    username, colon, _password = credentials.partition(":")
    if not colon:
        # Username only, nothing secret to hide.
        return database_url
    return f"{scheme}{marker}{username}:***@{host}"
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())
|