migrate_all.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. from __future__ import annotations
  2. import argparse
  3. import os
  4. import shutil
  5. import subprocess
  6. import sys
  7. from dataclasses import dataclass
  8. from pathlib import Path
# Services migrated when no ``--only`` filter is given. discover_targets()
# iterates this list as written and main() runs the upgrades sequentially, so
# the list order is the execution order.
# NOTE(review): the ordering presumably encodes migration dependencies between
# services -- confirm before reordering or inserting entries.
DEFAULT_SERVICE_ORDER = [
    "workflow-service",
    "session-service",
    "tool-service",
    "runtime-service",
    "model-gateway-service",
    "memory-service",
    "skill-service",
    "agent-service",
    "team-service",
    "human-service",
    "knowledge-service",
    "event-service",
    "auth-service",
    "scheduler-service",
    "api-gateway",
]
@dataclass(frozen=True)
class MigrationTarget:
    """Immutable description of one service whose migrations are to be run."""

    # Directory name of the service under ``<repo_root>/services``.
    service_name: str
    # ``<repo_root>/services/<service_name>``; used as the working directory
    # of the Alembic subprocess.
    service_path: Path
    # ``<service_path>/alembic.ini``; its existence is what qualifies the
    # service as a migration target.
    alembic_ini_path: Path
  31. def main() -> int:
  32. args = parse_args()
  33. repo_root = Path(__file__).resolve().parents[1]
  34. targets = discover_targets(
  35. repo_root=repo_root,
  36. only_services=args.only,
  37. skip_missing=args.skip_missing)
  38. if args.dry_run:
  39. for target in targets:
  40. print(f"{target.service_name}: {target.alembic_ini_path}")
  41. return 0
  42. if args.database_url:
  43. print(f"using database url: {mask_database_url(args.database_url)}", flush=True)
  44. failed_services: list[str] = []
  45. for target in targets:
  46. print(f"==> migrating {target.service_name}", flush=True)
  47. result = run_alembic_upgrade(
  48. target=target,
  49. python_executable=args.python,
  50. database_url=args.database_url)
  51. if result.returncode != 0:
  52. failed_services.append(target.service_name)
  53. if not args.continue_on_error:
  54. break
  55. if failed_services:
  56. print("migration failed for: " + ", ".join(failed_services), file=sys.stderr)
  57. return 1
  58. print(f"migrated {len(targets)} service(s)")
  59. return 0
  60. def parse_args() -> argparse.Namespace:
  61. parser = argparse.ArgumentParser(description="Run Alembic migrations for all services.")
  62. parser.add_argument(
  63. "--only",
  64. action="append",
  65. default=[],
  66. help="Only migrate a service. Can be passed multiple times.")
  67. parser.add_argument(
  68. "--python",
  69. default=sys.executable,
  70. help="Python executable to use for `python -m alembic`.")
  71. parser.add_argument(
  72. "--database-url",
  73. default=os.environ.get("AGENT_PLATFORM_DATABASE_URL"),
  74. help=(
  75. "Override every service migration target with one database URL. "
  76. "Defaults to AGENT_PLATFORM_DATABASE_URL when set."))
  77. parser.add_argument(
  78. "--continue-on-error",
  79. action="store_true",
  80. help="Continue migrating remaining services if one migration fails.")
  81. parser.add_argument(
  82. "--skip-missing",
  83. action="store_true",
  84. help="Skip services without alembic.ini instead of failing.")
  85. parser.add_argument(
  86. "--dry-run",
  87. action="store_true",
  88. help="Print migration targets without executing migrations.")
  89. return parser.parse_args()
  90. def discover_targets(
  91. *,
  92. repo_root: Path,
  93. only_services: list[str],
  94. skip_missing: bool) -> list[MigrationTarget]:
  95. requested_services = only_services or DEFAULT_SERVICE_ORDER
  96. targets: list[MigrationTarget] = []
  97. for service_name in requested_services:
  98. service_path = repo_root / "services" / service_name
  99. alembic_ini_path = service_path / "alembic.ini"
  100. if not alembic_ini_path.exists():
  101. if skip_missing:
  102. continue
  103. raise FileNotFoundError(f"alembic.ini not found for service: {service_name}")
  104. targets.append(
  105. MigrationTarget(
  106. service_name=service_name,
  107. service_path=service_path,
  108. alembic_ini_path=alembic_ini_path)
  109. )
  110. return targets
  111. def run_alembic_upgrade(
  112. *,
  113. target: MigrationTarget,
  114. python_executable: str,
  115. database_url: str | None) -> subprocess.CompletedProcess[str]:
  116. env = os.environ.copy()
  117. if database_url:
  118. env["AGENT_PLATFORM_DATABASE_URL"] = database_url
  119. env["PYTHONPATH"] = build_pythonpath(target=target, existing=env.get("PYTHONPATH"))
  120. result = subprocess.run(
  121. [resolve_alembic_executable(python_executable), "upgrade", "head"],
  122. cwd=target.service_path,
  123. env=env,
  124. text=True,
  125. check=False)
  126. return result
  127. def build_pythonpath(*, target: MigrationTarget, existing: str | None) -> str:
  128. repo_root = target.service_path.parents[1]
  129. paths = [target.service_path]
  130. paths.extend(sorted((repo_root / "libs").glob("*/src")))
  131. path_entries = [str(path) for path in paths]
  132. if existing:
  133. path_entries.append(existing)
  134. return os.pathsep.join(path_entries)
  135. def resolve_alembic_executable(python_executable: str) -> str:
  136. python_path = Path(python_executable)
  137. candidates = [
  138. python_path.parent / "Scripts" / "alembic.exe",
  139. python_path.parent / "Scripts" / "alembic",
  140. python_path.parent / "alembic.exe",
  141. python_path.parent / "alembic",
  142. ]
  143. for candidate in candidates:
  144. if candidate.exists():
  145. return str(candidate)
  146. resolved = shutil.which("alembic")
  147. if resolved:
  148. return resolved
  149. raise FileNotFoundError(
  150. "alembic executable not found. Install it with `python -m pip install alembic`.")
  151. def mask_database_url(database_url: str) -> str:
  152. marker = "://"
  153. if marker not in database_url:
  154. return database_url
  155. scheme, rest = database_url.split(marker, 1)
  156. if "@" not in rest or ":" not in rest.split("@", 1)[0]:
  157. return database_url
  158. credentials, host = rest.split("@", 1)
  159. username = credentials.split(":", 1)[0]
  160. return f"{scheme}{marker}{username}:***@{host}"
# Script entry point: run main() and use its integer result as the process
# exit status (0 on success or dry run, 1 when a migration failed).
if __name__ == "__main__":
    raise SystemExit(main())