Ver código fonte

feat: add scalable agent worker

Jax Docker 1 mês atrás
pai
commit
ae0a2840b0

+ 26 - 0
README.md

@@ -265,6 +265,25 @@ Invoke-RestMethod -Method Post `
   -Body '{"tenant_id":"t1","worker_key":"agent-worker-1"}'
 ```
 
+Execute one queued agent run through the worker claim API:
+
+```powershell
+Invoke-RestMethod -Method Post `
+  -Uri http://127.0.0.1:8007/agents/workers/execute-next `
+  -ContentType "application/json" `
+  -Body '{"worker_key":"agent-worker-1","lease_seconds":300,"dry_run":true}'
+```
+
+Run a standalone agent worker process:
+
+```powershell
+Push-Location .\services\agent-service
+$env:AGENT_PLATFORM_DATABASE_URL="sqlite:///./agent_service.db"
+$env:AGENT_PLATFORM_WORKER_DRY_RUN="true"
+..\..\.venv\Scripts\python -m app.worker
+Pop-Location
+```
+
 `runtime-service` now includes a typed executor skeleton for these node types:
 
 - `llm`
@@ -719,6 +738,12 @@ Scale runtime workers:
 docker compose -f .\deployments\docker\docker-compose.yml up --build -d --scale runtime-worker=3
 ```
 
+Scale agent workers:
+
+```powershell
+docker compose -f .\deployments\docker\docker-compose.yml up --build -d --scale agent-worker=3
+```
+
 Stop and remove containers:
 
 ```powershell
@@ -729,6 +754,7 @@ Important notes:
 
 - `workflow-service`, `session-service`, `runtime-service`, `tool-service`, and `api-gateway` use SQLite files mounted under `/data`
 - `agent-service` stores agent definitions, prompt/config versions, and agent run records under `/data`
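+- `agent-worker` has no exposed port and can be scaled independently; set `AGENT_PLATFORM_AGENT_WORKER_DRY_RUN=true` in `.env` for no-key local smoke runs (Compose passes it to the container as `AGENT_PLATFORM_WORKER_DRY_RUN`); as with `runtime-worker`, prefer PostgreSQL for real multi-worker write concurrency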
 - `runtime-worker` has no exposed port and can be scaled independently; prefer PostgreSQL for real multi-worker write concurrency
 - `runtime-service` automatically resolves internal URLs to `workflow-service`, `tool-service`, `model-gateway-service`, and `code-runner-service`
 - `model-gateway-service` defaults to `http://host.docker.internal:11434/v1`; replace it in `.env` if you want OpenAI or another OpenAI-compatible provider

+ 1 - 0
deployments/docker/.env.example

@@ -5,3 +5,4 @@ AGENT_PLATFORM_MAX_TIMEOUT_SECONDS=30
 AGENT_PLATFORM_AUTH_REQUIRED=false
 AGENT_PLATFORM_WORKER_POLL_INTERVAL_SECONDS=1
 AGENT_PLATFORM_WORKER_LEASE_SECONDS=300
+AGENT_PLATFORM_AGENT_WORKER_DRY_RUN=false

+ 19 - 0
deployments/docker/docker-compose.yml

@@ -126,6 +126,25 @@ services:
       timeout: 5s
       retries: 5
 
+  # Queue worker for agent runs: built from the shared python-service
+  # Dockerfile with SERVICE_PATH pointing at services/agent-service, and
+  # started with the worker module instead of an HTTP server. No `ports`
+  # are published.
+  agent-worker:
+    build:
+      context: ../..
+      dockerfile: deployments/docker/python-service.Dockerfile
+      args:
+        SERVICE_PATH: services/agent-service
+    command: ["python", "-m", "app.worker"]
+    environment:
+      # SQLite file on the agent_service_data volume mounted at /data below.
+      AGENT_PLATFORM_DATABASE_URL: sqlite:////data/agent_service.db
+      AGENT_PLATFORM_MODEL_GATEWAY_SERVICE_URL: http://model-gateway-service:8005
+      AGENT_PLATFORM_WORKER_POLL_INTERVAL_SECONDS: ${AGENT_PLATFORM_WORKER_POLL_INTERVAL_SECONDS:-1}
+      AGENT_PLATFORM_WORKER_LEASE_SECONDS: ${AGENT_PLATFORM_WORKER_LEASE_SECONDS:-300}
+      # The host-side variable carries an extra AGENT_ infix; inside the
+      # container it is exposed under the plain worker dry-run name.
+      AGENT_PLATFORM_WORKER_DRY_RUN: ${AGENT_PLATFORM_AGENT_WORKER_DRY_RUN:-false}
+    volumes:
+      - agent_service_data:/data
+    depends_on:
+      model-gateway-service:
+        condition: service_started
+
   runtime-service:
     build:
       context: ../..

+ 2 - 0
libs/core-domain/src/core_domain/agent_contracts.py

@@ -81,6 +81,8 @@ class AgentRunContract(BaseModel):
     output_json: dict[str, JSONValue] | None = None
     status: AgentRunStatus
     worker_key: str | None = None
+    queued_time: datetime | None = None
+    lease_expire_time: datetime | None = None
     started_time: datetime | None = None
     finished_time: datetime | None = None
     error_code: str | None = None

+ 34 - 0
services/agent-service/alembic/versions/20260425_0002_add_agent_run_worker_fields.py

@@ -0,0 +1,34 @@
+"""add agent run worker fields
+
+Revision ID: 20260425_0002
+Revises: 20260424_0001
+Create Date: 2026-04-25 12:35:00
+"""
+
+from collections.abc import Sequence
+
+from alembic import op
+import sqlalchemy as sa
+
+
+revision: str = "20260425_0002"
+down_revision: str | None = "20260424_0001"
+branch_labels: Sequence[str] | None = None
+depends_on: Sequence[str] | None = None
+
+
+def upgrade() -> None:
+    op.add_column("agent_run", sa.Column("queued_time", sa.DateTime(), nullable=True))
+    op.add_column("agent_run", sa.Column("lease_expire_time", sa.DateTime(), nullable=True))
+    op.create_index(
+        "ix_agent_run_worker_queue",
+        "agent_run",
+        ["status", "lease_expire_time", "created_time"],
+        unique=False,
+    )
+
+
+def downgrade() -> None:
+    op.drop_index("ix_agent_run_worker_queue", table_name="agent_run")
+    op.drop_column("agent_run", "lease_expire_time")
+    op.drop_column("agent_run", "queued_time")

+ 30 - 16
services/agent-service/app/api/routes.py

@@ -4,15 +4,9 @@ from sqlalchemy.orm import Session
 
 from core_domain import ServiceHealth
 
-from app.application.services import AgentApplicationService
+from app.application.services import AgentApplicationService, build_agent_application_service
 from app.bootstrap.settings import AgentServiceSettings
 from app.db.session import get_db
-from app.domain.repositories import (
-    AgentDefinitionRepository,
-    AgentRunRepository,
-    AgentVersionRepository,
-)
-from app.infrastructure.model_gateway_client import ModelGatewayClient
 from app.schemas.agent import (
     AgentCreateRequest,
     AgentResponse,
@@ -22,6 +16,8 @@ from app.schemas.agent import (
     AgentRunResponse,
     AgentRunStatusUpdateRequest,
     AgentStatusUpdateRequest,
+    AgentWorkerExecuteNextRequest,
+    AgentWorkerExecuteNextResponse,
     AgentVersionCreateRequest,
     AgentVersionResponse,
 )
@@ -37,15 +33,7 @@ def get_agent_application_service(
     db: Session = Depends(get_db),
     settings: AgentServiceSettings = Depends(get_agent_service_settings),
 ) -> AgentApplicationService:
-    return AgentApplicationService(
-        agent_repository=AgentDefinitionRepository(db),
-        agent_version_repository=AgentVersionRepository(db),
-        agent_run_repository=AgentRunRepository(db),
-        model_gateway_client=ModelGatewayClient(
-            base_url=settings.model_gateway_service_url,
-            timeout_seconds=settings.model_gateway_timeout_seconds,
-        ),
-    )
+    return build_agent_application_service(db=db, settings=settings)
 
 
 @router.get("/health", response_model=ServiceHealth)
@@ -166,3 +154,29 @@ def execute_agent_run(
         model=model_value if isinstance(model_value, str) else None,
         dry_run=dry_run_value if isinstance(dry_run_value, bool) else False,
     )
+
+
+@router.post("/workers/execute-next", response_model=AgentWorkerExecuteNextResponse)
+def execute_next_worker_task(
+    payload: AgentWorkerExecuteNextRequest,
+    settings: AgentServiceSettings = Depends(get_agent_service_settings),
+    service: AgentApplicationService = Depends(get_agent_application_service),
+) -> AgentWorkerExecuteNextResponse:
+    result = service.execute_next_claimed_agent_run(
+        worker_key=payload.worker_key,
+        lease_seconds=payload.lease_seconds or settings.worker_lease_seconds,
+        dry_run=payload.dry_run if payload.dry_run is not None else settings.worker_dry_run,
+    )
+    if result is None:
+        raise HTTPException(status_code=404, detail="queued agent_run not found")
+
+    entity, released_lease_count = result
+    output_json = entity.output_json or {}
+    model_value = output_json.get("model")
+    dry_run_value = output_json.get("dry_run")
+    return AgentWorkerExecuteNextResponse(
+        run=AgentRunResponse.from_entity(entity),
+        model=model_value if isinstance(model_value, str) else None,
+        dry_run=dry_run_value if isinstance(dry_run_value, bool) else False,
+        released_lease_count=released_lease_count,
+    )

+ 50 - 0
services/agent-service/app/application/services.py

@@ -1,6 +1,11 @@
+from datetime import datetime, timedelta
+
+from sqlalchemy.orm import Session
+
 from core_domain import ChatCompletionRequestContract, ChatMessageContract
 from core_shared import JSONValue
 
+from app.bootstrap.settings import AgentServiceSettings
 from app.db.models import AgentDefinition, AgentRun, AgentVersion
 from app.domain.repositories import (
     AgentDefinitionRepository,
@@ -236,6 +241,35 @@ class AgentApplicationService:
             },
         )
 
+    def execute_next_claimed_agent_run(
+        self,
+        *,
+        worker_key: str,
+        lease_seconds: int,
+        dry_run: bool,
+    ) -> tuple[AgentRun, int] | None:
+        """Release expired leases, claim the next queued run, and execute it.
+
+        Args:
+            worker_key: Identifier recorded on the claimed run and passed on
+                to the execute request.
+            lease_seconds: Lease length; the expiry is "now" plus this many
+                seconds.
+            dry_run: Forwarded to the execute request.
+
+        Returns:
+            ``(executed run, number of leases released)``, or ``None`` when no
+            run could be claimed or ``execute_agent_run`` returned ``None``.
+        """
+        # Expired leases are swept first so those runs are claimable below.
+        released_lease_count = self.agent_run_repository.release_expired_leases(
+            now_time=datetime.utcnow(),
+        )
+        claimed_agent_run = self.agent_run_repository.claim_next_queued(
+            worker_key=worker_key,
+            lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds),
+        )
+        if claimed_agent_run is None:
+            return None
+
+        result = self.execute_agent_run(
+            agent_run_id=claimed_agent_run.id,
+            payload=AgentRunExecuteRequest(
+                tenant_id=claimed_agent_run.tenant_id,
+                worker_key=worker_key,
+                dry_run=dry_run,
+            ),
+        )
+        # NOTE(review): the run has already been claimed at this point, yet a
+        # None here is indistinguishable from "nothing queued" for callers and
+        # drops released_lease_count — confirm this is intended.
+        if result is None:
+            return None
+        return result, released_lease_count
+
     def _resolve_agent_version(
         self,
         *,
@@ -299,3 +333,19 @@ class AgentApplicationService:
         if isinstance(value, int) and not isinstance(value, bool):
             return value
         return None
+
+
+def build_agent_application_service(
+    *,
+    db: Session,
+    settings: AgentServiceSettings,
+) -> AgentApplicationService:
+    return AgentApplicationService(
+        agent_repository=AgentDefinitionRepository(db),
+        agent_version_repository=AgentVersionRepository(db),
+        agent_run_repository=AgentRunRepository(db),
+        model_gateway_client=ModelGatewayClient(
+            base_url=settings.model_gateway_service_url,
+            timeout_seconds=settings.model_gateway_timeout_seconds,
+        ),
+    )

+ 4 - 0
services/agent-service/app/bootstrap/settings.py

@@ -7,3 +7,7 @@ class AgentServiceSettings(ServiceSettings):
     database_url: str = "sqlite:///./agent_service.db"
     model_gateway_service_url: str = "http://127.0.0.1:8005"
     model_gateway_timeout_seconds: float = 60.0
+    worker_poll_interval_seconds: float = 1.0
+    worker_lease_seconds: int = 300
+    worker_max_idle_cycles: int | None = None
+    worker_dry_run: bool = False

+ 2 - 0
services/agent-service/app/db/models/agent_run.py

@@ -16,6 +16,8 @@ class AgentRun(TenantMixin, AuditMixin, VersionMixin, Base):
     session_id: Mapped[str | None] = mapped_column(String(36), nullable=True, index=True)
     status: Mapped[str] = mapped_column(String(32), default="queued", index=True)
     worker_key: Mapped[str | None] = mapped_column(String(128), nullable=True)
+    queued_time: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
+    lease_expire_time: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
     input_text: Mapped[str | None] = mapped_column(Text, nullable=True)
     input_json: Mapped[dict[str, JSONValue] | None] = mapped_column(JSON, nullable=True)
     output_text: Mapped[str | None] = mapped_column(Text, nullable=True)

+ 52 - 0
services/agent-service/app/domain/repositories.py

@@ -156,6 +156,7 @@ class AgentRunRepository:
         input_text: str | None,
         input_json: dict[str, JSONValue] | None,
     ) -> AgentRun:
+        now = datetime.utcnow()
         entity = AgentRun(
             tenant_id=tenant_id,
             agent_id=agent_id,
@@ -164,6 +165,7 @@ class AgentRunRepository:
             input_text=input_text,
             input_json=input_json,
             status="queued",
+            queued_time=now,
         )
         self.db.add(entity)
         self.db.commit()
@@ -193,6 +195,55 @@ class AgentRunRepository:
         )
         return self.db.scalar(stmt)
 
+    def claim_next_queued(
+        self,
+        *,
+        worker_key: str,
+        lease_expire_time: datetime,
+    ) -> AgentRun | None:
+        """Claim the oldest queued run for ``worker_key`` and mark it running.
+
+        Args:
+            worker_key: Identifier stored on the claimed run.
+            lease_expire_time: Lease deadline stored on the claimed run.
+
+        Returns:
+            The refreshed, now-running entity, or ``None`` when no queued row
+            was selected.
+        """
+        # Oldest-first by created_time; queued_time is not consulted here, so
+        # a run re-queued by release_expired_leases keeps its original place.
+        # NOTE(review): skip_locked only helps on backends that emit
+        # FOR UPDATE SKIP LOCKED; with a SQLite URL the lock clause is
+        # presumably not applied, so concurrent workers might select the same
+        # row — confirm before scaling workers on SQLite.
+        stmt = (
+            select(AgentRun)
+            .where(AgentRun.status == "queued")
+            .order_by(AgentRun.created_time.asc())
+            .with_for_update(skip_locked=True)
+            .limit(1)
+        )
+        entity = self.db.scalar(stmt)
+        if entity is None:
+            return None
+
+        now = datetime.utcnow()
+        entity.status = "running"
+        entity.worker_key = worker_key
+        # Keep an already-recorded start time; otherwise stamp the claim time.
+        entity.started_time = entity.started_time or now
+        entity.lease_expire_time = lease_expire_time
+        self.db.commit()
+        self.db.refresh(entity)
+        return entity
+
+    def release_expired_leases(self, *, now_time: datetime, max_items: int = 100) -> int:
+        stmt = (
+            select(AgentRun)
+            .where(AgentRun.status == "running")
+            .where(AgentRun.lease_expire_time.is_not(None))
+            .where(AgentRun.lease_expire_time <= now_time)
+            .order_by(AgentRun.lease_expire_time.asc())
+            .limit(max_items)
+        )
+        entities = list(self.db.scalars(stmt))
+        for entity in entities:
+            entity.status = "queued"
+            entity.worker_key = None
+            entity.lease_expire_time = None
+            entity.queued_time = now_time
+            entity.started_time = None
+            entity.finished_time = None
+
+        if entities:
+            self.db.commit()
+
+        return len(entities)
+
     def update_status(
         self,
         *,
@@ -219,6 +270,7 @@ class AgentRunRepository:
             entity.started_time = now
         if status in {"completed", "failed", "cancelled"}:
             entity.finished_time = now
+            entity.lease_expire_time = None
 
         self.db.commit()
         self.db.refresh(entity)

+ 13 - 0
services/agent-service/app/schemas/agent.py

@@ -91,6 +91,12 @@ class AgentRunExecuteRequest(BaseModel):
     dry_run: bool = False
 
 
+class AgentWorkerExecuteNextRequest(BaseModel):
+    # Request body for POST /workers/execute-next.
+    # Identifier of the worker that claims the run.
+    worker_key: str
+    # Lease length in seconds (must be > 0); None lets the route fall back to
+    # its configured worker lease.
+    lease_seconds: int | None = Field(default=None, gt=0)
+    # None lets the route fall back to its configured worker dry-run flag.
+    dry_run: bool | None = None
+
+
 class AgentRunResponse(AgentRunContract):
     @classmethod
     def from_entity(cls, entity: "AgentRun") -> "AgentRunResponse":
@@ -103,6 +109,13 @@ class AgentRunExecuteResponse(BaseModel):
     dry_run: bool = False
 
 
+class AgentWorkerExecuteNextResponse(BaseModel):
+    # Response body for POST /workers/execute-next.
+    # The run that was claimed and executed.
+    run: AgentRunResponse
+    # "model" entry of the run's output_json when it is a string, else None.
+    model: str | None = None
+    # "dry_run" entry of the run's output_json when it is a bool, else False.
+    dry_run: bool = False
+    # Number of expired leases put back on the queue during this call.
+    released_lease_count: int = 0
+
+
 class AgentHealthResponse(BaseModel):
     service: str
     status: str

+ 107 - 0
services/agent-service/app/worker.py

@@ -0,0 +1,107 @@
+from __future__ import annotations
+
+import os
+import socket
+import time
+import traceback
+from dataclasses import dataclass
+from uuid import uuid4
+
+from sqlalchemy.orm import Session, sessionmaker
+
+from app.application.services import build_agent_application_service
+from app.bootstrap.settings import AgentServiceSettings
+from app.db.session import build_session_factory
+
+
+@dataclass(frozen=True)
+class AgentWorkerStats:
+    """Immutable counters returned by ``AgentWorker.run_forever`` when it stops."""
+
+    # Key of the worker that produced these counters.
+    worker_key: str
+    # Number of loop iterations in which a run was executed.
+    executed_count: int = 0
+    # Consecutive non-executing iterations when the loop stopped; the loop
+    # resets this to zero after every executed run.
+    idle_count: int = 0
+    # Number of iterations in which run_once raised an exception.
+    error_count: int = 0
+
+
+class AgentWorker:
+    def __init__(
+        self,
+        *,
+        settings: AgentServiceSettings,
+        session_factory: sessionmaker[Session],
+        worker_key: str,
+    ) -> None:
+        self.settings = settings
+        self.session_factory = session_factory
+        self.worker_key = worker_key
+
+    def run_forever(self) -> AgentWorkerStats:
+        executed_count = 0
+        idle_count = 0
+        error_count = 0
+
+        while True:
+            try:
+                executed = self.run_once()
+            except Exception:
+                error_count += 1
+                traceback.print_exc()
+                executed = False
+
+            if executed:
+                executed_count += 1
+                idle_count = 0
+            else:
+                idle_count += 1
+                time.sleep(self.settings.worker_poll_interval_seconds)
+
+            if self.settings.worker_max_idle_cycles is not None:
+                if idle_count >= self.settings.worker_max_idle_cycles:
+                    return AgentWorkerStats(
+                        worker_key=self.worker_key,
+                        executed_count=executed_count,
+                        idle_count=idle_count,
+                        error_count=error_count,
+                    )
+
+    def run_once(self) -> bool:
+        db = self.session_factory()
+        try:
+            service = build_agent_application_service(db=db, settings=self.settings)
+            result = service.execute_next_claimed_agent_run(
+                worker_key=self.worker_key,
+                lease_seconds=self.settings.worker_lease_seconds,
+                dry_run=self.settings.worker_dry_run,
+            )
+            return result is not None
+        finally:
+            db.close()
+
+
+def build_worker_key() -> str:
+    configured_key = os.getenv("AGENT_PLATFORM_WORKER_KEY")
+    if configured_key:
+        return configured_key
+    hostname = socket.gethostname()
+    return f"{hostname}-{uuid4().hex[:8]}"
+
+
+def main() -> None:
+    settings = AgentServiceSettings()
+    worker = AgentWorker(
+        settings=settings,
+        session_factory=build_session_factory(settings),
+        worker_key=build_worker_key(),
+    )
+    stats = worker.run_forever()
+    print(
+        "agent-worker stopped "
+        f"worker_key={stats.worker_key} "
+        f"executed_count={stats.executed_count} "
+        f"idle_count={stats.idle_count} "
+        f"error_count={stats.error_count}",
+        flush=True,
+    )
+
+
+# Entry point when the module is run as a script, e.g. `python -m app.worker`.
+if __name__ == "__main__":
+    main()