from __future__ import annotations import argparse import os import shutil import subprocess import sys from dataclasses import dataclass from pathlib import Path DEFAULT_SERVICE_ORDER = [ "session-service", "tool-service", "model-gateway-service", "memory-service", "skill-service", "agent-service", "team-service", "human-service", "knowledge-service", "event-service", "auth-service", "scheduler-service", "api-gateway", ] @dataclass(frozen=True) class MigrationTarget: service_name: str service_path: Path alembic_ini_path: Path def main() -> int: args = parse_args() repo_root = Path(__file__).resolve().parents[1] targets = discover_targets( repo_root=repo_root, only_services=args.only, skip_missing=args.skip_missing) if args.dry_run: for target in targets: print(f"{target.service_name}: {target.alembic_ini_path}") return 0 if args.database_url: print(f"using database url: {mask_database_url(args.database_url)}", flush=True) failed_services: list[str] = [] for target in targets: print(f"==> migrating {target.service_name}", flush=True) result = run_alembic_upgrade( target=target, python_executable=args.python, database_url=args.database_url) if result.returncode != 0: failed_services.append(target.service_name) if not args.continue_on_error: break if failed_services: print("migration failed for: " + ", ".join(failed_services), file=sys.stderr) return 1 print(f"migrated {len(targets)} service(s)") return 0 def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Run Alembic migrations for all services.") parser.add_argument( "--only", action="append", default=[], help="Only migrate a service. Can be passed multiple times.") parser.add_argument( "--python", default=sys.executable, help="Python executable to use for `python -m alembic`.") parser.add_argument( "--database-url", default=os.environ.get("AGENT_PLATFORM_DATABASE_URL"), help=( "Override every service migration target with one database URL. " "Defaults to AGENT_PLATFORM_DATABASE_URL when set.")) parser.add_argument( "--continue-on-error", action="store_true", help="Continue migrating remaining services if one migration fails.") parser.add_argument( "--skip-missing", action="store_true", help="Skip services without alembic.ini instead of failing.") parser.add_argument( "--dry-run", action="store_true", help="Print migration targets without executing migrations.") return parser.parse_args() def discover_targets( *, repo_root: Path, only_services: list[str], skip_missing: bool) -> list[MigrationTarget]: requested_services = only_services or DEFAULT_SERVICE_ORDER targets: list[MigrationTarget] = [] for service_name in requested_services: service_path = repo_root / "services" / service_name alembic_ini_path = service_path / "alembic.ini" if not alembic_ini_path.exists(): if skip_missing: continue raise FileNotFoundError(f"alembic.ini not found for service: {service_name}") targets.append( MigrationTarget( service_name=service_name, service_path=service_path, alembic_ini_path=alembic_ini_path) ) return targets def run_alembic_upgrade( *, target: MigrationTarget, python_executable: str, database_url: str | None) -> subprocess.CompletedProcess[str]: env = os.environ.copy() if database_url: env["AGENT_PLATFORM_DATABASE_URL"] = database_url env["PYTHONPATH"] = build_pythonpath(target=target, existing=env.get("PYTHONPATH")) result = subprocess.run( [resolve_alembic_executable(python_executable), "upgrade", "head"], cwd=target.service_path, env=env, text=True, check=False) return result def build_pythonpath(*, target: MigrationTarget, existing: str | None) -> str: repo_root = target.service_path.parents[1] paths = [target.service_path] paths.extend(sorted((repo_root / "libs").glob("*/src"))) path_entries = [str(path) for path in paths] if existing: path_entries.append(existing) return os.pathsep.join(path_entries) def resolve_alembic_executable(python_executable: str) -> str: python_path = Path(python_executable) candidates = [ python_path.parent / "Scripts" / "alembic.exe", python_path.parent / "Scripts" / "alembic", python_path.parent / "alembic.exe", python_path.parent / "alembic", ] for candidate in candidates: if candidate.exists(): return str(candidate) resolved = shutil.which("alembic") if resolved: return resolved raise FileNotFoundError( "alembic executable not found. Install it with `python -m pip install alembic`.") def mask_database_url(database_url: str) -> str: marker = "://" if marker not in database_url: return database_url scheme, rest = database_url.split(marker, 1) if "@" not in rest or ":" not in rest.split("@", 1)[0]: return database_url credentials, host = rest.split("@", 1) username = credentials.split(":", 1)[0] return f"{scheme}{marker}{username}:***@{host}" if __name__ == "__main__": raise SystemExit(main())