Ver Fonte

feat: execute agent runs via model gateway

Jax Docker há 1 mês atrás
pai
commit
42dc4d8afc

+ 18 - 0
README.md

@@ -247,6 +247,24 @@ Invoke-RestMethod -Method Post `
 
 Through `api-gateway`, use `/gateway/agents/**`.
 
+Execute an agent run without calling an external model:
+
+```powershell
+Invoke-RestMethod -Method Post `
+  -Uri http://127.0.0.1:8007/agents/runs/agent-run-id/execute `
+  -ContentType "application/json" `
+  -Body '{"tenant_id":"t1","worker_key":"agent-worker-1","dry_run":true}'
+```
+
+Execute with `model-gateway-service`:
+
+```powershell
+Invoke-RestMethod -Method Post `
+  -Uri http://127.0.0.1:8007/agents/runs/agent-run-id/execute `
+  -ContentType "application/json" `
+  -Body '{"tenant_id":"t1","worker_key":"agent-worker-1"}'
+```
+
 `runtime-service` now includes a typed executor skeleton for these node types:
 
 - `llm`

+ 4 - 0
deployments/docker/docker-compose.yml

@@ -112,10 +112,14 @@ services:
     command: ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8007"]
     environment:
       AGENT_PLATFORM_DATABASE_URL: sqlite:////data/agent_service.db
+      AGENT_PLATFORM_MODEL_GATEWAY_SERVICE_URL: http://model-gateway-service:8005
     ports:
       - "8007:8007"
     volumes:
       - agent_service_data:/data
+    depends_on:
+      model-gateway-service:
+        condition: service_started
     healthcheck:
       test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8007/agents/health').read()"]
       interval: 15s

+ 36 - 1
services/agent-service/app/api/routes.py

@@ -5,16 +5,20 @@ from sqlalchemy.orm import Session
 from core_domain import ServiceHealth
 
 from app.application.services import AgentApplicationService
+from app.bootstrap.settings import AgentServiceSettings
 from app.db.session import get_db
 from app.domain.repositories import (
     AgentDefinitionRepository,
     AgentRunRepository,
     AgentVersionRepository,
 )
+from app.infrastructure.model_gateway_client import ModelGatewayClient
 from app.schemas.agent import (
     AgentCreateRequest,
     AgentResponse,
     AgentRunCreateRequest,
+    AgentRunExecuteRequest,
+    AgentRunExecuteResponse,
     AgentRunResponse,
     AgentRunStatusUpdateRequest,
     AgentStatusUpdateRequest,
@@ -25,11 +29,22 @@ from app.schemas.agent import (
 router = APIRouter()
 
 
-def get_agent_application_service(db: Session = Depends(get_db)) -> AgentApplicationService:
+def get_agent_service_settings() -> AgentServiceSettings:
+    return AgentServiceSettings()
+
+
+def get_agent_application_service(
+    db: Session = Depends(get_db),
+    settings: AgentServiceSettings = Depends(get_agent_service_settings),
+) -> AgentApplicationService:
     return AgentApplicationService(
         agent_repository=AgentDefinitionRepository(db),
         agent_version_repository=AgentVersionRepository(db),
         agent_run_repository=AgentRunRepository(db),
+        model_gateway_client=ModelGatewayClient(
+            base_url=settings.model_gateway_service_url,
+            timeout_seconds=settings.model_gateway_timeout_seconds,
+        ),
     )
 
 
@@ -131,3 +146,23 @@ def update_agent_run_status(
     if entity is None:
         raise HTTPException(status_code=404, detail=f"agent_run not found: {agent_run_id}")
     return AgentRunResponse.from_entity(entity)
+
+
+@router.post("/runs/{agent_run_id}/execute", response_model=AgentRunExecuteResponse)
+def execute_agent_run(
+    agent_run_id: str,
+    payload: AgentRunExecuteRequest,
+    service: AgentApplicationService = Depends(get_agent_application_service),
+) -> AgentRunExecuteResponse:
+    entity = service.execute_agent_run(agent_run_id=agent_run_id, payload=payload)
+    if entity is None:
+        raise HTTPException(status_code=404, detail=f"agent_run not found: {agent_run_id}")
+
+    output_json = entity.output_json or {}
+    model_value = output_json.get("model")
+    dry_run_value = output_json.get("dry_run")
+    return AgentRunExecuteResponse(
+        run=AgentRunResponse.from_entity(entity),
+        model=model_value if isinstance(model_value, str) else None,
+        dry_run=dry_run_value if isinstance(dry_run_value, bool) else False,
+    )

+ 155 - 0
services/agent-service/app/application/services.py

@@ -1,12 +1,17 @@
+from core_domain import ChatCompletionRequestContract, ChatMessageContract
+from core_shared import JSONValue
+
 from app.db.models import AgentDefinition, AgentRun, AgentVersion
 from app.domain.repositories import (
     AgentDefinitionRepository,
     AgentRunRepository,
     AgentVersionRepository,
 )
+from app.infrastructure.model_gateway_client import ModelGatewayClient, ModelGatewayClientError
 from app.schemas.agent import (
     AgentCreateRequest,
     AgentRunCreateRequest,
+    AgentRunExecuteRequest,
     AgentRunStatusUpdateRequest,
     AgentStatusUpdateRequest,
     AgentVersionCreateRequest,
@@ -20,10 +25,12 @@ class AgentApplicationService:
         agent_repository: AgentDefinitionRepository,
         agent_version_repository: AgentVersionRepository,
         agent_run_repository: AgentRunRepository,
+        model_gateway_client: ModelGatewayClient | None = None,
     ) -> None:
         self.agent_repository = agent_repository
         self.agent_version_repository = agent_version_repository
         self.agent_run_repository = agent_run_repository
+        self.model_gateway_client = model_gateway_client
 
     def create_agent(self, payload: AgentCreateRequest) -> AgentDefinition:
         return self.agent_repository.create(
@@ -128,6 +135,107 @@ class AgentApplicationService:
             error_message=payload.error_message,
         )
 
+    def execute_agent_run(
+        self,
+        *,
+        agent_run_id: str,
+        payload: AgentRunExecuteRequest,
+    ) -> AgentRun | None:
+        agent_run = self.agent_run_repository.get_by_id(
+            tenant_id=payload.tenant_id,
+            agent_run_id=agent_run_id,
+        )
+        if agent_run is None:
+            return None
+
+        agent_version = self.agent_version_repository.get_by_id(
+            tenant_id=payload.tenant_id,
+            agent_version_id=agent_run.agent_version_id,
+        )
+        if agent_version is None:
+            return self.agent_run_repository.update_status(
+                agent_run_id=agent_run.id,
+                status="failed",
+                worker_key=payload.worker_key,
+                error_code="agent_version_missing",
+                error_message=f"agent version not found: {agent_run.agent_version_id}",
+            )
+
+        self.agent_run_repository.update_status(
+            agent_run_id=agent_run.id,
+            status="running",
+            worker_key=payload.worker_key,
+        )
+
+        messages = self._build_chat_messages(agent_run=agent_run, agent_version=agent_version)
+        if payload.dry_run:
+            return self.agent_run_repository.update_status(
+                agent_run_id=agent_run.id,
+                status="completed",
+                worker_key=payload.worker_key,
+                output_text=self._build_dry_run_output(
+                    agent_run=agent_run,
+                    agent_version=agent_version,
+                ),
+                output_json={
+                    "dry_run": True,
+                    "agent_version_id": agent_version.id,
+                    "message_count": len(messages),
+                    "messages": [message.model_dump(mode="json") for message in messages],
+                },
+            )
+
+        if self.model_gateway_client is None:
+            return self.agent_run_repository.update_status(
+                agent_run_id=agent_run.id,
+                status="failed",
+                worker_key=payload.worker_key,
+                error_code="model_gateway_missing",
+                error_message="model gateway client is not configured",
+            )
+
+        try:
+            response = self.model_gateway_client.create_chat_completion(
+                ChatCompletionRequestContract(
+                    model=self._read_optional_string(agent_version.model_config_json, "model"),
+                    temperature=self._read_optional_float(
+                        agent_version.model_config_json,
+                        "temperature",
+                    ),
+                    max_tokens=self._read_optional_int(agent_version.model_config_json, "max_tokens"),
+                    messages=messages,
+                    metadata_json={
+                        "tenant_id": agent_run.tenant_id,
+                        "agent_id": agent_run.agent_id,
+                        "agent_version_id": agent_version.id,
+                        "agent_run_id": agent_run.id,
+                    },
+                )
+            )
+        except ModelGatewayClientError as exc:
+            return self.agent_run_repository.update_status(
+                agent_run_id=agent_run.id,
+                status="failed",
+                worker_key=payload.worker_key,
+                error_code="model_gateway_error",
+                error_message=str(exc),
+            )
+
+        return self.agent_run_repository.update_status(
+            agent_run_id=agent_run.id,
+            status="completed",
+            worker_key=payload.worker_key,
+            output_text=response.content,
+            output_json={
+                "dry_run": False,
+                "agent_version_id": agent_version.id,
+                "model": response.model,
+                "finish_reason": response.finish_reason,
+                "usage_json": response.usage_json,
+                "raw_response_json": response.raw_response_json,
+            },
+        )
+
     def _resolve_agent_version(
         self,
         *,
@@ -144,3 +252,50 @@ class AgentApplicationService:
             tenant_id=tenant_id,
             agent_id=agent_id,
         )
+
+    def _build_chat_messages(
+        self,
+        *,
+        agent_run: AgentRun,
+        agent_version: AgentVersion,
+    ) -> list[ChatMessageContract]:
+        messages = [
+            ChatMessageContract(role="system", content=agent_version.system_prompt),
+        ]
+        if agent_version.goal:
+            messages.append(ChatMessageContract(role="system", content=f"Goal: {agent_version.goal}"))
+        if agent_run.input_text:
+            messages.append(ChatMessageContract(role="user", content=agent_run.input_text))
+        if agent_run.input_json:
+            messages.append(
+                ChatMessageContract(
+                    role="user",
+                    content=f"Structured input: {agent_run.input_json}",
+                )
+            )
+        return messages
+
+    def _build_dry_run_output(self, *, agent_run: AgentRun, agent_version: AgentVersion) -> str:
+        input_preview = agent_run.input_text or str(agent_run.input_json or {})
+        return (
+            f"[dry-run] Agent role={agent_version.role} "
+            f"version={agent_version.version_no} received: {input_preview}"
+        )
+
+    def _read_optional_string(self, payload: dict[str, JSONValue], key: str) -> str | None:
+        value = payload.get(key)
+        if isinstance(value, str) and value:
+            return value
+        return None
+
+    def _read_optional_float(self, payload: dict[str, JSONValue], key: str) -> float | None:
+        value = payload.get(key)
+        if isinstance(value, (int, float)) and not isinstance(value, bool):
+            return float(value)
+        return None
+
+    def _read_optional_int(self, payload: dict[str, JSONValue], key: str) -> int | None:
+        value = payload.get(key)
+        if isinstance(value, int) and not isinstance(value, bool):
+            return value
+        return None

+ 2 - 0
services/agent-service/app/bootstrap/settings.py

@@ -5,3 +5,5 @@ class AgentServiceSettings(ServiceSettings):
     service_name: str = "agent-service"
     service_port: int = 8007
     database_url: str = "sqlite:///./agent_service.db"
+    model_gateway_service_url: str = "http://127.0.0.1:8005"
+    model_gateway_timeout_seconds: float = 60.0

+ 1 - 0
services/agent-service/app/infrastructure/__init__.py

@@ -0,0 +1 @@
+

+ 28 - 0
services/agent-service/app/infrastructure/model_gateway_client.py

@@ -0,0 +1,28 @@
+import httpx
+
+from core_domain import ChatCompletionRequestContract, ChatCompletionResponseContract
+
+
+class ModelGatewayClientError(Exception):
+    pass
+
+
+class ModelGatewayClient:
+    def __init__(self, *, base_url: str, timeout_seconds: float = 60.0) -> None:
+        self.base_url = base_url.rstrip("/")
+        self.timeout_seconds = timeout_seconds
+
+    def create_chat_completion(
+        self,
+        payload: ChatCompletionRequestContract,
+    ) -> ChatCompletionResponseContract:
+        try:
+            with httpx.Client(timeout=self.timeout_seconds) as client:
+                response = client.post(
+                    f"{self.base_url}/models/chat-completions",
+                    json=payload.model_dump(mode="json"),
+                )
+                response.raise_for_status()
+                return ChatCompletionResponseContract.model_validate(response.json())
+        except httpx.HTTPError as exc:
+            raise ModelGatewayClientError(f"model-gateway-service request failed: {exc}") from exc

+ 12 - 0
services/agent-service/app/schemas/agent.py

@@ -85,12 +85,24 @@ class AgentRunStatusUpdateRequest(BaseModel):
     error_message: str | None = None
 
 
+class AgentRunExecuteRequest(BaseModel):
+    tenant_id: str
+    worker_key: str | None = None
+    dry_run: bool = False
+
+
 class AgentRunResponse(AgentRunContract):
     @classmethod
     def from_entity(cls, entity: "AgentRun") -> "AgentRunResponse":
         return cls.model_validate(entity, from_attributes=True)
 
 
+class AgentRunExecuteResponse(BaseModel):
+    run: AgentRunResponse
+    model: str | None = None
+    dry_run: bool = False
+
+
 class AgentHealthResponse(BaseModel):
     service: str
     status: str