Просмотр исходного кода

feat: add scalable runtime worker

Jax Docker 2 месяцев назад
Родитель
Сommit
f64c4e0c8f

+ 27 - 0
README.md

@@ -250,6 +250,26 @@ Invoke-RestMethod -Method Post `
   -Body '{"worker_key":"runtime-worker-1","max_steps":16}'
 ```
 
+Execute one queued node through the worker claim API:
+
+```powershell
+Invoke-RestMethod -Method Post `
+  -Uri "http://127.0.0.1:8003/runtime/workers/execute-next" `
+  -ContentType "application/json" `
+  -Body '{"worker_key":"runtime-worker-1","lease_seconds":300}'
+```
+
+Run a standalone runtime worker process:
+
+```powershell
+Push-Location .\services\runtime-service
+$env:AGENT_PLATFORM_DATABASE_URL="sqlite:///./runtime_service.db"
+..\..\.venv\Scripts\python -m app.worker
+Pop-Location
+```
+
+The worker uses `node_run.status` plus `lease_expire_time` as a DB-backed queue. This keeps the first scalable version dependency-light; for heavier production concurrency, move `AGENT_PLATFORM_DATABASE_URL` to PostgreSQL before scaling many workers.
+
 Node execution results are now persisted on `node_run`:
 
 - `output_text`
@@ -639,6 +659,12 @@ Start in detached mode:
 docker compose -f .\deployments\docker\docker-compose.yml up --build -d
 ```
 
+Scale runtime workers:
+
+```powershell
+docker compose -f .\deployments\docker\docker-compose.yml up --build -d --scale runtime-worker=3
+```
+
 Stop and remove containers:
 
 ```powershell
@@ -648,5 +674,6 @@ docker compose -f .\deployments\docker\docker-compose.yml down
 Important notes:
 
 - `workflow-service`, `session-service`, `runtime-service`, `tool-service`, and `api-gateway` use SQLite files mounted under `/data`
+- `runtime-worker` has no exposed port and can be scaled independently; prefer PostgreSQL for real multi-worker write concurrency
 - `runtime-service` automatically resolves internal URLs to `workflow-service`, `tool-service`, `model-gateway-service`, and `code-runner-service`
 - `model-gateway-service` defaults to `http://host.docker.internal:11434/v1`; replace it in `.env` if you want OpenAI or another OpenAI-compatible provider

+ 2 - 0
deployments/docker/.env.example

@@ -3,3 +3,5 @@ AGENT_PLATFORM_PROVIDER_API_KEY=replace-me
 AGENT_PLATFORM_DEFAULT_MODEL=gpt-4o-mini
 AGENT_PLATFORM_MAX_TIMEOUT_SECONDS=30
 AGENT_PLATFORM_AUTH_REQUIRED=false
+AGENT_PLATFORM_WORKER_POLL_INTERVAL_SECONDS=1
+AGENT_PLATFORM_WORKER_LEASE_SECONDS=300

+ 28 - 1
deployments/docker/docker-compose.yml

@@ -116,7 +116,6 @@ services:
       AGENT_PLATFORM_TOOL_SERVICE_URL: http://tool-service:8004
       AGENT_PLATFORM_MODEL_GATEWAY_SERVICE_URL: http://model-gateway-service:8005
       AGENT_PLATFORM_CODE_RUNNER_SERVICE_URL: http://code-runner-service:8006
-      AGENT_PLATFORM_AUTH_REQUIRED: ${AGENT_PLATFORM_AUTH_REQUIRED:-false}
     ports:
       - "8003:8003"
     volumes:
@@ -136,6 +135,33 @@ services:
       timeout: 5s
       retries: 5
 
+  runtime-worker:
+    build:
+      context: ../..
+      dockerfile: deployments/docker/python-service.Dockerfile
+      args:
+        SERVICE_PATH: services/runtime-service
+    command: ["python", "-m", "app.worker"]
+    environment:
+      AGENT_PLATFORM_DATABASE_URL: sqlite:////data/runtime_service.db
+      AGENT_PLATFORM_WORKFLOW_SERVICE_URL: http://workflow-service:8002
+      AGENT_PLATFORM_TOOL_SERVICE_URL: http://tool-service:8004
+      AGENT_PLATFORM_MODEL_GATEWAY_SERVICE_URL: http://model-gateway-service:8005
+      AGENT_PLATFORM_CODE_RUNNER_SERVICE_URL: http://code-runner-service:8006
+      AGENT_PLATFORM_WORKER_POLL_INTERVAL_SECONDS: ${AGENT_PLATFORM_WORKER_POLL_INTERVAL_SECONDS:-1}
+      AGENT_PLATFORM_WORKER_LEASE_SECONDS: ${AGENT_PLATFORM_WORKER_LEASE_SECONDS:-300}
+    volumes:
+      - runtime_service_data:/data
+    depends_on:
+      workflow-service:
+        condition: service_started
+      tool-service:
+        condition: service_started
+      model-gateway-service:
+        condition: service_started
+      code-runner-service:
+        condition: service_started
+
   api-gateway:
     build:
       context: ../..
@@ -152,6 +178,7 @@ services:
       AGENT_PLATFORM_TOOL_SERVICE_URL: http://tool-service:8004
       AGENT_PLATFORM_MODEL_GATEWAY_SERVICE_URL: http://model-gateway-service:8005
       AGENT_PLATFORM_CODE_RUNNER_SERVICE_URL: http://code-runner-service:8006
+      AGENT_PLATFORM_AUTH_REQUIRED: ${AGENT_PLATFORM_AUTH_REQUIRED:-false}
     ports:
       - "8000:8000"
     volumes:

+ 29 - 0
services/runtime-service/alembic/versions/20260423_0006_add_runtime_worker_indexes.py

@@ -0,0 +1,29 @@
+"""add runtime worker indexes
+
+Revision ID: 20260423_0006
+Revises: 20260423_0005
+Create Date: 2026-04-23 18:40:00
+"""
+
+from collections.abc import Sequence
+
+from alembic import op
+
+
+revision: str = "20260423_0006"
+down_revision: str | None = "20260423_0005"
+branch_labels: Sequence[str] | None = None
+depends_on: Sequence[str] | None = None
+
+
+def upgrade() -> None:
+    op.create_index(
+        "ix_node_run_worker_queue",
+        "node_run",
+        ["status", "lease_expire_time", "created_time"],
+        unique=False,
+    )
+
+
+def downgrade() -> None:
+    op.drop_index("ix_node_run_worker_queue", table_name="node_run")

+ 39 - 26
services/runtime-service/app/api/routes.py

@@ -3,21 +3,13 @@ from sqlalchemy import text
 from sqlalchemy.orm import Session
 
 from core_domain import ServiceHealth
-from app.application.services import RuntimeApplicationService
+from app.application.services import RuntimeApplicationService, build_runtime_application_service
 from app.bootstrap.settings import RuntimeServiceSettings
 from app.db.session import get_db
-from app.domain.repositories import (
-    ExecutionLogRepository,
-    NodeArtifactRepository,
-    NodeRunRepository,
-    TraceSpanRepository,
-    WorkflowRunRepository,
-)
-from app.infrastructure.code_runner_client import CodeRunnerClient, CodeRunnerClientError
-from app.infrastructure.executors import build_node_execution_dispatcher_with_clients
-from app.infrastructure.model_gateway_client import ModelGatewayClient, ModelGatewayClientError
-from app.infrastructure.tool_client import ToolServiceClient, ToolServiceClientError
-from app.infrastructure.workflow_client import WorkflowServiceClient, WorkflowServiceClientError
+from app.infrastructure.code_runner_client import CodeRunnerClientError
+from app.infrastructure.model_gateway_client import ModelGatewayClientError
+from app.infrastructure.tool_client import ToolServiceClientError
+from app.infrastructure.workflow_client import WorkflowServiceClientError
 from app.schemas.run import (
     ExecutionLogResponse,
     NodeArtifactResponse,
@@ -30,6 +22,8 @@ from app.schemas.run import (
     RunExecuteRequest,
     RunExecuteResponse,
     TraceSpanResponse,
+    WorkerExecuteNextRequest,
+    WorkerExecuteNextResponse,
     WorkflowRunResponse,
     WorkflowRunStatusUpdateRequest,
 )
@@ -45,19 +39,7 @@ def get_runtime_application_service(
     db: Session = Depends(get_db),
     settings: RuntimeServiceSettings = Depends(get_runtime_settings),
 ) -> RuntimeApplicationService:
-    return RuntimeApplicationService(
-        workflow_run_repository=WorkflowRunRepository(db),
-        node_run_repository=NodeRunRepository(db),
-        execution_log_repository=ExecutionLogRepository(db),
-        node_artifact_repository=NodeArtifactRepository(db),
-        trace_span_repository=TraceSpanRepository(db),
-        execution_dispatcher=build_node_execution_dispatcher_with_clients(
-            code_runner_client=CodeRunnerClient(base_url=settings.code_runner_service_url),
-            model_gateway_client=ModelGatewayClient(base_url=settings.model_gateway_service_url),
-            tool_client=ToolServiceClient(base_url=settings.tool_service_url),
-        ),
-        workflow_client=WorkflowServiceClient(base_url=settings.workflow_service_url),
-    )
+    return build_runtime_application_service(db=db, settings=settings)
 
 
 @router.get("/health", response_model=ServiceHealth)
@@ -278,3 +260,34 @@ def execute_run(
         node_runs=[NodeRunResponse.from_entity(item) for item in node_runs],
         executor_names=executor_names,
     )
+
+
+@router.post("/workers/execute-next", response_model=WorkerExecuteNextResponse)
+def execute_next_worker_task(
+    payload: WorkerExecuteNextRequest,
+    settings: RuntimeServiceSettings = Depends(get_runtime_settings),
+    service: RuntimeApplicationService = Depends(get_runtime_application_service),
+) -> WorkerExecuteNextResponse:
+    try:
+        result = service.execute_next_claimed_node_run(
+            worker_key=payload.worker_key,
+            lease_seconds=payload.lease_seconds or settings.worker_lease_seconds,
+        )
+    except (
+        CodeRunnerClientError,
+        ModelGatewayClientError,
+        ToolServiceClientError,
+        WorkflowServiceClientError,
+    ) as exc:
+        raise HTTPException(status_code=502, detail=str(exc)) from exc
+
+    if result is None:
+        raise HTTPException(status_code=404, detail="queued worker task not found")
+
+    workflow_run, node_run, executor_name, released_lease_count = result
+    return WorkerExecuteNextResponse(
+        run=WorkflowRunResponse.from_entity(workflow_run),
+        node_run=NodeRunResponse.from_entity(node_run),
+        executor_name=executor_name,
+        released_lease_count=released_lease_count,
+    )

+ 55 - 1
services/runtime-service/app/application/services.py

@@ -1,3 +1,7 @@
+from datetime import datetime, timedelta
+
+from sqlalchemy.orm import Session
+
 from core_domain import (
     InitialNodeContract,
     NodeExecutionContextContract,
@@ -14,9 +18,13 @@ from app.domain.repositories import (
     TraceSpanRepository,
     WorkflowRunRepository,
 )
-from app.infrastructure.executors import NodeExecutionDispatcher
+from app.infrastructure.executors import NodeExecutionDispatcher, build_node_execution_dispatcher_with_clients
+from app.infrastructure.code_runner_client import CodeRunnerClient
+from app.infrastructure.model_gateway_client import ModelGatewayClient
 from app.infrastructure.planner import derive_initial_node, derive_node_config, derive_successor_nodes
+from app.infrastructure.tool_client import ToolServiceClient
 from app.infrastructure.workflow_client import WorkflowServiceClient
+from app.bootstrap.settings import RuntimeServiceSettings
 from app.schemas.run import (
     NodeRunExecuteRequest,
     NodeRunStatusUpdateRequest,
@@ -367,6 +375,32 @@ class RuntimeApplicationService:
             return None
         return final_run, executed_node_runs, executor_names
 
+    def execute_next_claimed_node_run(
+        self,
+        *,
+        worker_key: str,
+        lease_seconds: int,
+    ) -> tuple[WorkflowRun, NodeRun, str, int] | None:
+        released_lease_count = self.node_run_repository.release_expired_leases(
+            now_time=datetime.utcnow(),
+        )
+        claimed_node_run = self.node_run_repository.claim_next_queued(
+            worker_key=worker_key,
+            lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds),
+        )
+        if claimed_node_run is None:
+            return None
+
+        result = self.execute_node_run(
+            node_run_id=claimed_node_run.id,
+            payload=NodeRunExecuteRequest(worker_key=worker_key),
+        )
+        if result is None:
+            return None
+
+        workflow_run, node_run, executor_name = result
+        return workflow_run, node_run, executor_name, released_lease_count
+
     def _persist_node_execution_artifact(self, node_run: NodeRun) -> None:
         if node_run.output_text is None and node_run.output_json is None:
             return
@@ -598,3 +632,23 @@ class RuntimeApplicationService:
             message=message,
             detail_json=detail_json,
         )
+
+
+def build_runtime_application_service(
+    *,
+    db: Session,
+    settings: RuntimeServiceSettings,
+) -> RuntimeApplicationService:
+    return RuntimeApplicationService(
+        workflow_run_repository=WorkflowRunRepository(db),
+        node_run_repository=NodeRunRepository(db),
+        execution_log_repository=ExecutionLogRepository(db),
+        node_artifact_repository=NodeArtifactRepository(db),
+        trace_span_repository=TraceSpanRepository(db),
+        execution_dispatcher=build_node_execution_dispatcher_with_clients(
+            code_runner_client=CodeRunnerClient(base_url=settings.code_runner_service_url),
+            model_gateway_client=ModelGatewayClient(base_url=settings.model_gateway_service_url),
+            tool_client=ToolServiceClient(base_url=settings.tool_service_url),
+        ),
+        workflow_client=WorkflowServiceClient(base_url=settings.workflow_service_url),
+    )

+ 3 - 0
services/runtime-service/app/bootstrap/settings.py

@@ -9,3 +9,6 @@ class RuntimeServiceSettings(ServiceSettings):
     tool_service_url: str = "http://127.0.0.1:8004"
     model_gateway_service_url: str = "http://127.0.0.1:8005"
     code_runner_service_url: str = "http://127.0.0.1:8006"
+    worker_poll_interval_seconds: float = 1.0
+    worker_lease_seconds: int = 300
+    worker_max_idle_cycles: int | None = None

+ 52 - 0
services/runtime-service/app/domain/repositories.py

@@ -162,6 +162,57 @@ class NodeRunRepository:
         )
         return self.db.scalar(stmt)
 
+    def claim_next_queued(
+        self,
+        *,
+        worker_key: str,
+        lease_expire_time: datetime,
+    ) -> NodeRun | None:
+        stmt = (
+            select(NodeRun)
+            .join(WorkflowRun, NodeRun.run_id == WorkflowRun.id)
+            .where(NodeRun.status == "queued")
+            .order_by(WorkflowRun.priority.desc(), NodeRun.created_time.asc())
+            .with_for_update(skip_locked=True)
+            .limit(1)
+        )
+        entity = self.db.scalar(stmt)
+        if entity is None:
+            return None
+
+        now = datetime.utcnow()
+        entity.status = "running"
+        entity.worker_key = worker_key
+        entity.started_time = entity.started_time or now
+        entity.lease_expire_time = lease_expire_time
+        self.db.commit()
+        self.db.refresh(entity)
+        return entity
+
+    def release_expired_leases(self, *, now_time: datetime, max_items: int = 100) -> int:
+        stmt = (
+            select(NodeRun)
+            .where(NodeRun.status == "running")
+            .where(NodeRun.lease_expire_time.is_not(None))
+            .where(NodeRun.lease_expire_time <= now_time)
+            .order_by(NodeRun.lease_expire_time.asc())
+            .limit(max_items)
+        )
+        entities = list(self.db.scalars(stmt))
+        for entity in entities:
+            entity.status = "queued"
+            entity.worker_key = None
+            entity.lease_expire_time = None
+            entity.queued_time = now_time
+            entity.started_time = None
+            entity.finished_time = None
+            entity.attempt_no += 1
+
+        if entities:
+            self.db.commit()
+
+        return len(entities)
+
     def update_status(
         self,
         *,
@@ -189,6 +240,7 @@ class NodeRunRepository:
             entity.started_time = now
         if status in {"completed", "failed", "skipped"}:
             entity.finished_time = now
+            entity.lease_expire_time = None
 
         self.db.commit()
         self.db.refresh(entity)

+ 13 - 1
services/runtime-service/app/schemas/run.py

@@ -1,7 +1,7 @@
 from datetime import datetime
 from typing import TYPE_CHECKING
 
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
 
 from core_domain import (
     InitialNodeContract,
@@ -75,6 +75,18 @@ class RunExecuteResponse(BaseModel):
     executor_names: list[str]
 
 
+class WorkerExecuteNextRequest(BaseModel):
+    worker_key: str
+    lease_seconds: int | None = Field(default=None, gt=0)
+
+
+class WorkerExecuteNextResponse(BaseModel):
+    run: WorkflowRunResponse
+    node_run: NodeRunResponse
+    executor_name: str
+    released_lease_count: int = 0
+
+
 class ExecutionLogResponse(BaseModel):
     id: str
     tenant_id: str

+ 106 - 0
services/runtime-service/app/worker.py

@@ -0,0 +1,106 @@
+from __future__ import annotations
+
+import os
+import socket
+import time
+import traceback
+from dataclasses import dataclass
+from uuid import uuid4
+
+from sqlalchemy.orm import sessionmaker, Session
+
+from app.application.services import build_runtime_application_service
+from app.bootstrap.settings import RuntimeServiceSettings
+from app.db.session import build_session_factory
+
+
+@dataclass(frozen=True)
+class RuntimeWorkerStats:
+    worker_key: str
+    executed_count: int = 0
+    idle_count: int = 0
+    error_count: int = 0
+
+
+class RuntimeWorker:
+    def __init__(
+        self,
+        *,
+        settings: RuntimeServiceSettings,
+        session_factory: sessionmaker[Session],
+        worker_key: str,
+    ) -> None:
+        self.settings = settings
+        self.session_factory = session_factory
+        self.worker_key = worker_key
+
+    def run_forever(self) -> RuntimeWorkerStats:
+        executed_count = 0
+        idle_count = 0
+        error_count = 0
+
+        while True:
+            try:
+                executed = self.run_once()
+            except Exception:
+                error_count += 1
+                traceback.print_exc()
+                executed = False
+
+            if executed:
+                executed_count += 1
+                idle_count = 0
+            else:
+                idle_count += 1
+                time.sleep(self.settings.worker_poll_interval_seconds)
+
+            if self.settings.worker_max_idle_cycles is not None:
+                if idle_count >= self.settings.worker_max_idle_cycles:
+                    return RuntimeWorkerStats(
+                        worker_key=self.worker_key,
+                        executed_count=executed_count,
+                        idle_count=idle_count,
+                        error_count=error_count,
+                    )
+
+    def run_once(self) -> bool:
+        db = self.session_factory()
+        try:
+            service = build_runtime_application_service(db=db, settings=self.settings)
+            result = service.execute_next_claimed_node_run(
+                worker_key=self.worker_key,
+                lease_seconds=self.settings.worker_lease_seconds,
+            )
+            return result is not None
+        finally:
+            db.close()
+
+
+def build_worker_key() -> str:
+    configured_key = os.getenv("AGENT_PLATFORM_WORKER_KEY")
+    if configured_key:
+        return configured_key
+    hostname = socket.gethostname()
+    return f"{hostname}-{uuid4().hex[:8]}"
+
+
+def main() -> None:
+    settings = RuntimeServiceSettings()
+    worker = RuntimeWorker(
+        settings=settings,
+        session_factory=build_session_factory(settings),
+        worker_key=build_worker_key(),
+    )
+    stats = worker.run_forever()
+    print(
+        "runtime-worker stopped "
+        f"worker_key={stats.worker_key} "
+        f"executed_count={stats.executed_count} "
+        f"idle_count={stats.idle_count} "
+        f"error_count={stats.error_count}",
+        flush=True,
+    )
+
+
+if __name__ == "__main__":
+    main()