Эх сурвалжийг харах

feat: add migrations and redis idempotency

Jax Docker 1 сар өмнө
parent
commit
8635d11eae

+ 13 - 0
README.md

@@ -1229,6 +1229,18 @@ Production-like infrastructure:
 - Copy `deployments/docker/.env.example` to `.env` to use per-service PostgreSQL databases such as `workflow_service`, `agent_service`, and `knowledge_service`.
 - Set `AGENT_PLATFORM_REDIS_URL=redis://redis:6379/0` to enable shared Redis-backed locks, idempotency keys, and queues.
 
+Run all service migrations:
+
+```powershell
+python .\scripts\migrate_all.py
+```
+
+Run only selected migrations:
+
+```powershell
+python .\scripts\migrate_all.py --only agent-service --only runtime-service
+```
+
 Scale runtime workers:
 
 ```powershell
@@ -1264,6 +1276,7 @@ Important notes:
 - Services still fall back to SQLite files under `/data` if `AGENT_PLATFORM_DATABASE_URL` is not set.
 - For scaled workers, use PostgreSQL plus Redis rather than SQLite.
 - `core-shared.redis_primitives` provides `DistributedLock`, `IdempotencyStore`, and `RedisQueue` for services that need cross-process coordination.
+- `agent-worker` and `scheduler-worker` use Redis locks/idempotency when Redis answers a ping at worker startup, and fall back to DB leases when it does not.
 - `agent-service` stores agent definitions, prompt/config versions, and agent run records under `/data`
 - `memory-service` stores scoped memories under `/data`; move it to PostgreSQL before enabling high-volume memory writes
 - `team-service` stores multi-agent team definitions, team versions, and team run records under `/data`

+ 2 - 0
libs/core-shared/src/core_shared/__init__.py

@@ -1,8 +1,10 @@
 from .config import ServiceSettings
+from .optional_redis import try_build_redis_client
 from .types import JSONPrimitive, JSONValue
 
 __all__ = [
     "JSONPrimitive",
     "JSONValue",
     "ServiceSettings",
+    "try_build_redis_client",
 ]

+ 1 - 0
libs/core-shared/src/core_shared/config.py

@@ -9,6 +9,7 @@ class ServiceSettings(BaseSettings):
     service_port: int = Field(default=8000)
     debug: bool = Field(default=True)
     database_url: str = Field(default="sqlite:///./service.db")
+    redis_url: str = Field(default="redis://127.0.0.1:6379/0")
     echo_sql: bool = Field(default=False)
 
     model_config = SettingsConfigDict(

+ 17 - 0
libs/core-shared/src/core_shared/optional_redis.py

@@ -0,0 +1,17 @@
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from redis import Redis
+
+
+def try_build_redis_client(redis_url: str) -> "Redis | None":
+    try:
+        from redis import Redis
+    except ImportError:
+        return None
+    try:
+        client = Redis.from_url(redis_url, decode_responses=False)
+        client.ping()
+    except Exception:
+        return None
+    return client

+ 3 - 0
pyproject.toml

@@ -8,10 +8,13 @@ members = [
   "services/api-gateway",
   "services/agent-service",
   "services/code-runner-service",
+  "services/auth-service",
+  "services/event-service",
   "services/human-service",
   "services/knowledge-service",
   "services/memory-service",
   "services/model-gateway-service",
+  "services/scheduler-service",
   "services/session-service",
   "services/skill-service",
   "services/team-service",

+ 138 - 0
scripts/migrate_all.py

@@ -0,0 +1,138 @@
+from __future__ import annotations
+
+import argparse
+import os
+import subprocess
+import sys
+from dataclasses import dataclass
+from pathlib import Path
+
+
+DEFAULT_SERVICE_ORDER = [  # migrated in this order when no --only filter is passed
+    "workflow-service",
+    "session-service",
+    "tool-service",
+    "runtime-service",
+    "memory-service",
+    "skill-service",
+    "agent-service",
+    "team-service",
+    "human-service",
+    "knowledge-service",
+    "event-service",
+    "auth-service",
+    "scheduler-service",
+    "api-gateway",
+]
+
+
+@dataclass(frozen=True)
+class MigrationTarget:  # one service selected for an Alembic migration run
+    service_name: str  # directory name under <repo_root>/services
+    service_path: Path  # service directory; used as cwd of the alembic subprocess
+    alembic_ini_path: Path  # <service_path>/alembic.ini
+
+
+def main() -> int:
+    args = parse_args()
+    repo_root = Path(__file__).resolve().parents[1]
+    targets = discover_targets(
+        repo_root=repo_root,
+        only_services=args.only,
+        skip_missing=args.skip_missing,
+    )
+    if args.dry_run:
+        for target in targets:
+            print(f"{target.service_name}: {target.alembic_ini_path}")
+        return 0
+
+    failed_services: list[str] = []
+    for target in targets:
+        print(f"==> migrating {target.service_name}", flush=True)
+        result = run_alembic_upgrade(target=target, python_executable=args.python)
+        if result.returncode != 0:
+            failed_services.append(target.service_name)
+            if not args.continue_on_error:
+                break
+
+    if failed_services:
+        print("migration failed for: " + ", ".join(failed_services), file=sys.stderr)
+        return 1
+    print(f"migrated {len(targets)} service(s)")
+    return 0
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(description="Run Alembic migrations for all services.")
+    parser.add_argument(
+        "--only",
+        action="append",
+        default=[],
+        help="Only migrate a service. Can be passed multiple times.",
+    )
+    parser.add_argument(
+        "--python",
+        default=sys.executable,
+        help="Python executable to use for `python -m alembic`.",
+    )
+    parser.add_argument(
+        "--continue-on-error",
+        action="store_true",
+        help="Continue migrating remaining services if one migration fails.",
+    )
+    parser.add_argument(
+        "--skip-missing",
+        action="store_true",
+        help="Skip services without alembic.ini instead of failing.",
+    )
+    parser.add_argument(
+        "--dry-run",
+        action="store_true",
+        help="Print migration targets without executing migrations.",
+    )
+    return parser.parse_args()
+
+
+def discover_targets(
+    *,
+    repo_root: Path,
+    only_services: list[str],
+    skip_missing: bool,
+) -> list[MigrationTarget]:
+    requested_services = only_services or DEFAULT_SERVICE_ORDER
+    targets: list[MigrationTarget] = []
+    for service_name in requested_services:
+        service_path = repo_root / "services" / service_name
+        alembic_ini_path = service_path / "alembic.ini"
+        if not alembic_ini_path.exists():
+            if skip_missing:
+                continue
+            raise FileNotFoundError(f"alembic.ini not found for service: {service_name}")
+        targets.append(
+            MigrationTarget(
+                service_name=service_name,
+                service_path=service_path,
+                alembic_ini_path=alembic_ini_path,
+            )
+        )
+    return targets
+
+
+def run_alembic_upgrade(
+    *,
+    target: MigrationTarget,
+    python_executable: str,
+) -> subprocess.CompletedProcess[str]:
+    env = os.environ.copy()
+    result = subprocess.run(
+        [python_executable, "-m", "alembic", "upgrade", "head"],
+        cwd=target.service_path,
+        env=env,
+        text=True,
+        check=False,
+    )
+    return result
+
+
+if __name__ == "__main__":  # executed as a script: exit with main()'s return code
+    raise SystemExit(main())

+ 39 - 8
services/agent-service/app/application/services.py

@@ -559,6 +559,7 @@ class AgentApplicationService:
         worker_key: str,
         lease_seconds: int,
         dry_run: bool,
+        redis_client: object | None = None,
     ) -> tuple[AgentRun, int] | None:
         released_lease_count = self.agent_run_repository.release_expired_leases(
             now_time=datetime.utcnow(),
@@ -570,14 +571,44 @@ class AgentApplicationService:
         if claimed_agent_run is None:
             return None
 
-        result = self.execute_agent_run(
-            agent_run_id=claimed_agent_run.id,
-            payload=AgentRunExecuteRequest(
-                tenant_id=claimed_agent_run.tenant_id,
-                worker_key=worker_key,
-                dry_run=dry_run,
-            ),
-        )
+        if redis_client is not None:
+            from core_shared.redis_primitives import DistributedLock, IdempotencyStore
+
+            lock = DistributedLock(
+                client=redis_client,
+                name=f"agent-run:{claimed_agent_run.id}:lock",
+                ttl_seconds=lease_seconds,
+            )
+            if not lock.acquire():
+                return None
+            idempotency_store = IdempotencyStore(
+                client=redis_client,
+                prefix="agent-run-idempotency",
+            )
+            if not idempotency_store.begin(key=claimed_agent_run.id):
+                lock.release()
+                return None
+        else:
+            lock = None
+            idempotency_store = None
+
+        try:
+            result = self.execute_agent_run(
+                agent_run_id=claimed_agent_run.id,
+                payload=AgentRunExecuteRequest(
+                    tenant_id=claimed_agent_run.tenant_id,
+                    worker_key=worker_key,
+                    dry_run=dry_run,
+                ),
+            )
+            if idempotency_store is not None and result is not None:
+                idempotency_store.complete(
+                    key=claimed_agent_run.id,
+                    result={"status": result.status, "agent_run_id": result.id},
+                )
+        finally:
+            if lock is not None:
+                lock.release()
         if result is None:
             return None
         return result, released_lease_count

+ 4 - 0
services/agent-service/app/worker.py

@@ -9,6 +9,8 @@ from uuid import uuid4
 
 from sqlalchemy.orm import Session, sessionmaker
 
+from core_shared import try_build_redis_client
+
 from app.application.services import build_agent_application_service
 from app.bootstrap.settings import AgentServiceSettings
 from app.db.session import build_session_factory
@@ -33,6 +35,7 @@ class AgentWorker:
         self.settings = settings
         self.session_factory = session_factory
         self.worker_key = worker_key
+        self.redis_client = try_build_redis_client(settings.redis_url)
 
     def run_forever(self) -> AgentWorkerStats:
         executed_count = 0
@@ -71,6 +74,7 @@ class AgentWorker:
                 worker_key=self.worker_key,
                 lease_seconds=self.settings.worker_lease_seconds,
                 dry_run=self.settings.worker_dry_run,
+                redis_client=self.redis_client,
             )
             return result is not None
         finally:

+ 30 - 1
services/scheduler-service/app/worker.py

@@ -11,7 +11,7 @@ from uuid import uuid4
 import httpx
 from sqlalchemy.orm import Session, sessionmaker
 
-from core_shared import JSONValue
+from core_shared import JSONValue, try_build_redis_client
 
 from app.bootstrap.settings import SchedulerServiceSettings
 from app.db.models import ScheduledJob
@@ -86,6 +86,7 @@ class SchedulerWorker:
         self.session_factory = session_factory
         self.worker_key = worker_key
         self.executor = ScheduledJobExecutor(settings=settings)
+        self.redis_client = try_build_redis_client(settings.redis_url)
 
     def run_forever(self) -> SchedulerWorkerStats:
         executed_count = 0
@@ -139,6 +140,25 @@ class SchedulerWorker:
         repository: ScheduledJobRepository,
         job: ScheduledJob,
     ) -> None:
+        lock = None
+        idempotency_store = None
+        if self.redis_client is not None:
+            from core_shared.redis_primitives import DistributedLock, IdempotencyStore
+
+            lock = DistributedLock(
+                client=self.redis_client,
+                name=f"scheduled-job:{job.id}:lock",
+                ttl_seconds=self.settings.worker_lease_seconds,
+            )
+            if not lock.acquire():
+                return
+            idempotency_store = IdempotencyStore(
+                client=self.redis_client,
+                prefix="scheduled-job-idempotency",
+            )
+            if not idempotency_store.begin(key=job.id):
+                lock.release()
+                return
         try:
             self.executor.execute(job)
         except Exception as exc:
@@ -148,7 +168,16 @@ class SchedulerWorker:
                 last_error_message=str(exc),
             )
             traceback.print_exc()
+            if lock is not None:
+                lock.release()
             return
+        if idempotency_store is not None:
+            idempotency_store.complete(
+                key=job.id,
+                result={"status": "completed", "scheduled_job_id": job.id},
+            )
+        if lock is not None:
+            lock.release()
         repository.update_status(job_id=job.id, status="completed")