فهرست منبع

feat: add scheduler worker

Jax Docker 1 ماه پیش
والد
کامیت
99518836b7

+ 17 - 0
README.md

@@ -585,6 +585,16 @@ Invoke-RestMethod -Method Post `
 
 Through `api-gateway`, use `/gateway/scheduler/**`.
 
+Run the scheduler worker locally:
+
+```powershell
+Push-Location .\services\scheduler-service
+$env:AGENT_PLATFORM_DATABASE_URL="sqlite:///./scheduler_service.db"
+$env:AGENT_PLATFORM_EVENT_SERVICE_URL="http://127.0.0.1:8013"
+python -m app.worker
+Pop-Location
+```
+
 Execute an agent run without calling an external model:
 
 ```powershell
@@ -1190,6 +1200,12 @@ Scale team workers:
 docker compose -f .\deployments\docker\docker-compose.yml up --build -d --scale team-worker=3
 ```
 
+Scale scheduler workers:
+
+```powershell
+docker compose -f .\deployments\docker\docker-compose.yml up --build -d --scale scheduler-worker=3
+```
+
 Stop and remove containers:
 
 ```powershell
@@ -1210,6 +1226,7 @@ Important notes:
 - `auth-service` stores users, roles, assignments, and permission policy metadata under `/data`
 - `scheduler-service` stores delayed jobs, due-job leases, and retry status under `/data`
 - `agent-worker` has no exposed port and can be scaled independently; set `AGENT_PLATFORM_AGENT_WORKER_DRY_RUN=true` for no-key local smoke runs
+- `scheduler-worker` has no exposed port and can be scaled independently; prefer PostgreSQL for real multi-worker write concurrency
 - `runtime-worker` has no exposed port and can be scaled independently; prefer PostgreSQL for real multi-worker write concurrency
 - `runtime-service` automatically resolves internal URLs to `workflow-service`, `tool-service`, `model-gateway-service`, and `code-runner-service`
 - `model-gateway-service` defaults to `http://host.docker.internal:11434/v1`; replace it in `.env` if you want OpenAI or another OpenAI-compatible provider

+ 1 - 0
deployments/docker/.env.example

@@ -5,5 +5,6 @@ AGENT_PLATFORM_MAX_TIMEOUT_SECONDS=30
 AGENT_PLATFORM_AUTH_REQUIRED=false
 AGENT_PLATFORM_WORKER_POLL_INTERVAL_SECONDS=1
 AGENT_PLATFORM_WORKER_LEASE_SECONDS=300
+AGENT_PLATFORM_SCHEDULER_WORKER_CLAIM_LIMIT=20
 AGENT_PLATFORM_AGENT_WORKER_DRY_RUN=false
 AGENT_PLATFORM_TEAM_WORKER_DRY_RUN=true

+ 21 - 0
deployments/docker/docker-compose.yml

@@ -353,6 +353,27 @@ services:
       timeout: 5s
       retries: 5
 
+  # Background worker built from services/scheduler-service. It runs the
+  # polling loop (`python -m app.worker`) and publishes no ports.
+  scheduler-worker:
+    build:
+      context: ../..
+      dockerfile: deployments/docker/python-service.Dockerfile
+      args:
+        SERVICE_PATH: services/scheduler-service
+    command: ["python", "-m", "app.worker"]
+    environment:
+      # SQLite file on the scheduler_service_data volume mounted at /data below.
+      # NOTE(review): presumably the same file scheduler-service writes; every
+      # scaled replica shares it, so confirm write concurrency is acceptable.
+      AGENT_PLATFORM_DATABASE_URL: sqlite:////data/scheduler_service.db
+      AGENT_PLATFORM_EVENT_SERVICE_URL: http://event-service:8013
+      AGENT_PLATFORM_WORKER_POLL_INTERVAL_SECONDS: ${AGENT_PLATFORM_WORKER_POLL_INTERVAL_SECONDS:-1}
+      AGENT_PLATFORM_WORKER_LEASE_SECONDS: ${AGENT_PLATFORM_WORKER_LEASE_SECONDS:-300}
+      # The container variable is fed from the scheduler-specific .env entry
+      # AGENT_PLATFORM_SCHEDULER_WORKER_CLAIM_LIMIT (default 20).
+      AGENT_PLATFORM_WORKER_CLAIM_LIMIT: ${AGENT_PLATFORM_SCHEDULER_WORKER_CLAIM_LIMIT:-20}
+    volumes:
+      - scheduler_service_data:/data
+    depends_on:
+      # service_started only orders container start-up; it does not wait for the
+      # dependency to become healthy.
+      # NOTE(review): if scheduler-service creates the schema on boot, the worker
+      # may poll before the tables exist - confirm.
+      scheduler-service:
+        condition: service_started
+      event-service:
+        condition: service_started
+
   runtime-service:
     build:
       context: ../..

+ 6 - 0
services/scheduler-service/app/bootstrap/settings.py

@@ -7,3 +7,9 @@ class SchedulerServiceSettings(ServiceSettings):
     database_url: str = "sqlite:///./scheduler_service.db"
     default_lease_seconds: int = 300
     default_claim_limit: int = 50
+    event_service_url: str = "http://127.0.0.1:8013"
+    worker_poll_interval_seconds: float = 1.0
+    worker_lease_seconds: int = 300
+    worker_claim_limit: int = 20
+    worker_request_timeout_seconds: float = 30.0
+    worker_max_idle_cycles: int | None = None

+ 4 - 4
services/scheduler-service/app/domain/repositories.py

@@ -73,7 +73,7 @@ class ScheduledJobRepository:
     def claim_due_jobs(
         self,
         *,
-        tenant_id: str,
+        tenant_id: str | None,
         worker_key: str,
         lease_seconds: int,
         limit: int,
@@ -82,13 +82,13 @@ class ScheduledJobRepository:
         self.release_expired_leases(now_time=now_time)
         stmt = (
             select(ScheduledJob)
-            .where(ScheduledJob.tenant_id == tenant_id)
             .where(ScheduledJob.status == "scheduled")
             .where(ScheduledJob.schedule_time <= now_time)
             .where(ScheduledJob.attempt_count < ScheduledJob.max_attempts)
-            .order_by(ScheduledJob.schedule_time.asc())
-            .limit(limit)
         )
+        if tenant_id is not None:
+            stmt = stmt.where(ScheduledJob.tenant_id == tenant_id)
+        stmt = stmt.order_by(ScheduledJob.schedule_time.asc()).limit(limit)
         jobs = list(self.db.scalars(stmt))
         lease_expire_time = now_time + timedelta(seconds=lease_seconds)
         for job in jobs:

+ 198 - 0
services/scheduler-service/app/worker.py

@@ -0,0 +1,198 @@
+from __future__ import annotations
+
+import os
+import socket
+import time
+import traceback
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from uuid import uuid4
+
+import httpx
+from sqlalchemy.orm import Session, sessionmaker
+
+from core_shared import JSONValue
+
+from app.bootstrap.settings import SchedulerServiceSettings
+from app.db.models import ScheduledJob
+from app.db.session import build_session_factory
+from app.domain.repositories import ScheduledJobRepository
+
+
+@dataclass(frozen=True)
+class SchedulerWorkerStats:
+    """Immutable counters returned by ``SchedulerWorker.run_forever`` when it stops."""
+
+    # Identity of the worker that produced these counters.
+    worker_key: str
+    # Polling cycles that claimed at least one job (a whole batch counts once).
+    executed_count: int = 0
+    # Consecutive cycles without claimed work (or ending in an error) when the loop stopped.
+    idle_count: int = 0
+    # Polling cycles in which ``run_once`` raised.
+    error_count: int = 0
+
+
+class ScheduledJobExecutor:
+    def __init__(self, *, settings: SchedulerServiceSettings) -> None:
+        self.settings = settings
+
+    def execute(self, job: ScheduledJob) -> None:
+        if job.job_type == "event":
+            self._publish_event(job)
+            return
+        self._call_target(job)
+
+    def _publish_event(self, job: ScheduledJob) -> None:
+        payload = job.payload_json
+        target_url = job.target_url or f"{self.settings.event_service_url.rstrip('/')}/events"
+        event_payload: dict[str, JSONValue] = {
+            "tenant_id": job.tenant_id,
+            "event_type": str(payload.get("event_type", "scheduled.job.due")),
+            "source_service": str(payload.get("source_service", "scheduler-service")),
+            "aggregate_type": _optional_str(payload.get("aggregate_type")),
+            "aggregate_id": _optional_str(payload.get("aggregate_id")),
+            "correlation_id": _optional_str(payload.get("correlation_id")),
+            "causation_id": str(payload.get("causation_id", job.id)),
+            "payload_json": _dict_json(payload.get("payload_json"), fallback=payload),
+            "metadata_json": job.metadata_json,
+        }
+        self._request(method=job.method or "POST", url=target_url, payload=event_payload)
+
+    def _call_target(self, job: ScheduledJob) -> None:
+        if not job.target_url:
+            raise ValueError(f"scheduled job requires target_url for job_type={job.job_type}")
+        self._request(
+            method=job.method or "POST",
+            url=job.target_url,
+            payload=job.payload_json,
+        )
+
+    def _request(
+        self,
+        *,
+        method: str,
+        url: str,
+        payload: dict[str, JSONValue],
+    ) -> None:
+        with httpx.Client(timeout=self.settings.worker_request_timeout_seconds) as client:
+            response = client.request(method=method, url=url, json=payload)
+        response.raise_for_status()
+
+
+class SchedulerWorker:
+    def __init__(
+        self,
+        *,
+        settings: SchedulerServiceSettings,
+        session_factory: sessionmaker[Session],
+        worker_key: str,
+    ) -> None:
+        self.settings = settings
+        self.session_factory = session_factory
+        self.worker_key = worker_key
+        self.executor = ScheduledJobExecutor(settings=settings)
+
+    def run_forever(self) -> SchedulerWorkerStats:
+        executed_count = 0
+        idle_count = 0
+        error_count = 0
+
+        while True:
+            try:
+                executed = self.run_once()
+            except Exception:
+                error_count += 1
+                traceback.print_exc()
+                executed = False
+
+            if executed:
+                executed_count += 1
+                idle_count = 0
+            else:
+                idle_count += 1
+                time.sleep(self.settings.worker_poll_interval_seconds)
+
+            if self.settings.worker_max_idle_cycles is not None:
+                if idle_count >= self.settings.worker_max_idle_cycles:
+                    return SchedulerWorkerStats(
+                        worker_key=self.worker_key,
+                        executed_count=executed_count,
+                        idle_count=idle_count,
+                        error_count=error_count,
+                    )
+
+    def run_once(self) -> bool:
+        db = self.session_factory()
+        try:
+            repository = ScheduledJobRepository(db)
+            jobs = repository.claim_due_jobs(
+                tenant_id=None,
+                worker_key=self.worker_key,
+                lease_seconds=self.settings.worker_lease_seconds,
+                limit=self.settings.worker_claim_limit,
+                now_time=datetime.utcnow(),
+            )
+            for job in jobs:
+                self._execute_claimed_job(repository=repository, job=job)
+            return bool(jobs)
+        finally:
+            db.close()
+
+    def _execute_claimed_job(
+        self,
+        *,
+        repository: ScheduledJobRepository,
+        job: ScheduledJob,
+    ) -> None:
+        try:
+            self.executor.execute(job)
+        except Exception as exc:
+            repository.update_status(
+                job_id=job.id,
+                status="failed",
+                last_error_message=str(exc),
+            )
+            traceback.print_exc()
+            return
+        repository.update_status(job_id=job.id, status="completed")
+
+
+def _optional_str(value: JSONValue) -> str | None:
+    if value is None:
+        return None
+    return str(value)
+
+
+def _dict_json(
+    value: JSONValue,
+    *,
+    fallback: dict[str, JSONValue],
+) -> dict[str, JSONValue]:
+    if isinstance(value, dict):
+        return value
+    return fallback
+
+
+def build_worker_key() -> str:
+    configured_key = os.getenv("AGENT_PLATFORM_WORKER_KEY")
+    if configured_key:
+        return configured_key
+    hostname = socket.gethostname()
+    return f"{hostname}-{uuid4().hex[:8]}"
+
+
+def main() -> None:
+    settings = SchedulerServiceSettings()
+    worker = SchedulerWorker(
+        settings=settings,
+        session_factory=build_session_factory(settings),
+        worker_key=build_worker_key(),
+    )
+    stats = worker.run_forever()
+    print(
+        "scheduler-worker stopped "
+        f"worker_key={stats.worker_key} "
+        f"executed_count={stats.executed_count} "
+        f"idle_count={stats.idle_count} "
+        f"error_count={stats.error_count}",
+        flush=True,
+    )
+
+
+if __name__ == "__main__":
+    main()

+ 1 - 0
services/scheduler-service/pyproject.toml

@@ -10,6 +10,7 @@ requires-python = ">=3.11"
 dependencies = [
   "alembic>=1.13,<2.0",
   "fastapi>=0.111,<1.0",
+  "httpx>=0.27,<1.0",
   "uvicorn[standard]>=0.30,<1.0",
   "pydantic>=2.7,<3.0",
   "sqlalchemy>=2.0,<3.0",