migrate_all.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. from __future__ import annotations
  2. import argparse
  3. import os
  4. import shutil
  5. import subprocess
  6. import sys
  7. from dataclasses import dataclass
  8. from pathlib import Path
  9. DEFAULT_SERVICE_ORDER = [
  10. "session-service",
  11. "tool-service",
  12. "model-gateway-service",
  13. "memory-service",
  14. "skill-service",
  15. "agent-service",
  16. "team-service",
  17. "human-service",
  18. "knowledge-service",
  19. "event-service",
  20. "auth-service",
  21. "scheduler-service",
  22. "api-gateway",
  23. ]
  24. @dataclass(frozen=True)
  25. class MigrationTarget:
  26. service_name: str
  27. service_path: Path
  28. alembic_ini_path: Path
  29. def main() -> int:
  30. args = parse_args()
  31. repo_root = Path(__file__).resolve().parents[1]
  32. targets = discover_targets(
  33. repo_root=repo_root,
  34. only_services=args.only,
  35. skip_missing=args.skip_missing)
  36. if args.dry_run:
  37. for target in targets:
  38. print(f"{target.service_name}: {target.alembic_ini_path}")
  39. return 0
  40. if args.database_url:
  41. print(f"using database url: {mask_database_url(args.database_url)}", flush=True)
  42. failed_services: list[str] = []
  43. for target in targets:
  44. print(f"==> migrating {target.service_name}", flush=True)
  45. result = run_alembic_upgrade(
  46. target=target,
  47. python_executable=args.python,
  48. database_url=args.database_url)
  49. if result.returncode != 0:
  50. failed_services.append(target.service_name)
  51. if not args.continue_on_error:
  52. break
  53. if failed_services:
  54. print("migration failed for: " + ", ".join(failed_services), file=sys.stderr)
  55. return 1
  56. print(f"migrated {len(targets)} service(s)")
  57. return 0
  58. def parse_args() -> argparse.Namespace:
  59. parser = argparse.ArgumentParser(description="Run Alembic migrations for all services.")
  60. parser.add_argument(
  61. "--only",
  62. action="append",
  63. default=[],
  64. help="Only migrate a service. Can be passed multiple times.")
  65. parser.add_argument(
  66. "--python",
  67. default=sys.executable,
  68. help="Python executable to use for `python -m alembic`.")
  69. parser.add_argument(
  70. "--database-url",
  71. default=os.environ.get("AGENT_PLATFORM_DATABASE_URL"),
  72. help=(
  73. "Override every service migration target with one database URL. "
  74. "Defaults to AGENT_PLATFORM_DATABASE_URL when set."))
  75. parser.add_argument(
  76. "--continue-on-error",
  77. action="store_true",
  78. help="Continue migrating remaining services if one migration fails.")
  79. parser.add_argument(
  80. "--skip-missing",
  81. action="store_true",
  82. help="Skip services without alembic.ini instead of failing.")
  83. parser.add_argument(
  84. "--dry-run",
  85. action="store_true",
  86. help="Print migration targets without executing migrations.")
  87. return parser.parse_args()
  88. def discover_targets(
  89. *,
  90. repo_root: Path,
  91. only_services: list[str],
  92. skip_missing: bool) -> list[MigrationTarget]:
  93. requested_services = only_services or DEFAULT_SERVICE_ORDER
  94. targets: list[MigrationTarget] = []
  95. for service_name in requested_services:
  96. service_path = repo_root / "services" / service_name
  97. alembic_ini_path = service_path / "alembic.ini"
  98. if not alembic_ini_path.exists():
  99. if skip_missing:
  100. continue
  101. raise FileNotFoundError(f"alembic.ini not found for service: {service_name}")
  102. targets.append(
  103. MigrationTarget(
  104. service_name=service_name,
  105. service_path=service_path,
  106. alembic_ini_path=alembic_ini_path)
  107. )
  108. return targets
  109. def run_alembic_upgrade(
  110. *,
  111. target: MigrationTarget,
  112. python_executable: str,
  113. database_url: str | None) -> subprocess.CompletedProcess[str]:
  114. env = os.environ.copy()
  115. if database_url:
  116. env["AGENT_PLATFORM_DATABASE_URL"] = database_url
  117. env["PYTHONPATH"] = build_pythonpath(target=target, existing=env.get("PYTHONPATH"))
  118. result = subprocess.run(
  119. [resolve_alembic_executable(python_executable), "upgrade", "head"],
  120. cwd=target.service_path,
  121. env=env,
  122. text=True,
  123. check=False)
  124. return result
  125. def build_pythonpath(*, target: MigrationTarget, existing: str | None) -> str:
  126. repo_root = target.service_path.parents[1]
  127. paths = [target.service_path]
  128. paths.extend(sorted((repo_root / "libs").glob("*/src")))
  129. path_entries = [str(path) for path in paths]
  130. if existing:
  131. path_entries.append(existing)
  132. return os.pathsep.join(path_entries)
  133. def resolve_alembic_executable(python_executable: str) -> str:
  134. python_path = Path(python_executable)
  135. candidates = [
  136. python_path.parent / "Scripts" / "alembic.exe",
  137. python_path.parent / "Scripts" / "alembic",
  138. python_path.parent / "alembic.exe",
  139. python_path.parent / "alembic",
  140. ]
  141. for candidate in candidates:
  142. if candidate.exists():
  143. return str(candidate)
  144. resolved = shutil.which("alembic")
  145. if resolved:
  146. return resolved
  147. raise FileNotFoundError(
  148. "alembic executable not found. Install it with `python -m pip install alembic`.")
  149. def mask_database_url(database_url: str) -> str:
  150. marker = "://"
  151. if marker not in database_url:
  152. return database_url
  153. scheme, rest = database_url.split(marker, 1)
  154. if "@" not in rest or ":" not in rest.split("@", 1)[0]:
  155. return database_url
  156. credentials, host = rest.split("@", 1)
  157. username = credentials.split(":", 1)[0]
  158. return f"{scheme}{marker}{username}:***@{host}"
  159. if __name__ == "__main__":
  160. raise SystemExit(main())