Bläddra i källkod

feat: publish runtime events

Jax Docker 1 månad sedan
förälder
incheckning
fafe5aa655

+ 11 - 0
deployments/docker/docker-compose.yml

@@ -116,6 +116,7 @@ services:
       AGENT_PLATFORM_MEMORY_SERVICE_URL: http://memory-service:8008
       AGENT_PLATFORM_TOOL_SERVICE_URL: http://tool-service:8004
       AGENT_PLATFORM_SKILL_SERVICE_URL: http://skill-service:8010
+      AGENT_PLATFORM_EVENT_SERVICE_URL: http://event-service:8013
     ports:
       - "8007:8007"
     volumes:
@@ -129,6 +130,8 @@ services:
         condition: service_started
       skill-service:
         condition: service_started
+      event-service:
+        condition: service_started
     healthcheck:
       test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8007/agents/health').read()"]
       interval: 15s
@@ -148,6 +151,7 @@ services:
       AGENT_PLATFORM_MEMORY_SERVICE_URL: http://memory-service:8008
       AGENT_PLATFORM_TOOL_SERVICE_URL: http://tool-service:8004
       AGENT_PLATFORM_SKILL_SERVICE_URL: http://skill-service:8010
+      AGENT_PLATFORM_EVENT_SERVICE_URL: http://event-service:8013
       AGENT_PLATFORM_WORKER_POLL_INTERVAL_SECONDS: ${AGENT_PLATFORM_WORKER_POLL_INTERVAL_SECONDS:-1}
       AGENT_PLATFORM_WORKER_LEASE_SECONDS: ${AGENT_PLATFORM_WORKER_LEASE_SECONDS:-300}
       AGENT_PLATFORM_WORKER_DRY_RUN: ${AGENT_PLATFORM_AGENT_WORKER_DRY_RUN:-false}
@@ -162,6 +166,8 @@ services:
         condition: service_started
       skill-service:
         condition: service_started
+      event-service:
+        condition: service_started
 
   memory-service:
     build:
@@ -193,6 +199,8 @@ services:
     command: ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8009"]
     environment:
       AGENT_PLATFORM_DATABASE_URL: sqlite:////data/team_service.db
+      AGENT_PLATFORM_AGENT_SERVICE_URL: http://agent-service:8007
+      AGENT_PLATFORM_EVENT_SERVICE_URL: http://event-service:8013
     ports:
       - "8009:8009"
     volumes:
@@ -213,6 +221,7 @@ services:
     environment:
       AGENT_PLATFORM_DATABASE_URL: sqlite:////data/team_service.db
       AGENT_PLATFORM_AGENT_SERVICE_URL: http://agent-service:8007
+      AGENT_PLATFORM_EVENT_SERVICE_URL: http://event-service:8013
       AGENT_PLATFORM_WORKER_POLL_INTERVAL_SECONDS: ${AGENT_PLATFORM_WORKER_POLL_INTERVAL_SECONDS:-1}
       AGENT_PLATFORM_WORKER_LEASE_SECONDS: ${AGENT_PLATFORM_WORKER_LEASE_SECONDS:-300}
       AGENT_PLATFORM_WORKER_DRY_RUN: ${AGENT_PLATFORM_TEAM_WORKER_DRY_RUN:-true}
@@ -221,6 +230,8 @@ services:
     depends_on:
       agent-service:
         condition: service_started
+      event-service:
+        condition: service_started
 
   skill-service:
     build:

+ 2 - 1
libs/core-events/pyproject.toml

@@ -8,7 +8,9 @@ version = "0.1.0"
 description = "Shared event contracts for agent platform."
 requires-python = ">=3.11"
 dependencies = [
+  "httpx>=0.27,<1.0",
   "pydantic>=2.7,<3.0",
+  "core-shared",
 ]
 
 [tool.setuptools]
@@ -16,4 +18,3 @@ package-dir = {"" = "src"}
 
 [tool.setuptools.packages.find]
 where = ["src"]
-

+ 3 - 0
libs/core-events/src/core_events/__init__.py

@@ -1,3 +1,4 @@
+from .client import EventServiceClient, EventServiceClientError
 from .envelope import (
     EventDeliveryStatus,
     EventEnvelope,
@@ -10,4 +11,6 @@ __all__ = [
     "EventEnvelope",
     "EventPublishContract",
     "EventRecordContract",
+    "EventServiceClient",
+    "EventServiceClientError",
 ]

+ 45 - 0
libs/core-events/src/core_events/client.py

@@ -0,0 +1,45 @@
+"""HTTP client for publishing events to event-service."""
+
+import httpx
+
+from .envelope import EventPublishContract, EventRecordContract
+
+
+class EventServiceClientError(Exception):
+    """Raised when publishing an event to event-service fails."""
+
+
+class EventServiceClient:
+    """Synchronous client that posts events to event-service."""
+
+    def __init__(self, base_url: str, timeout_seconds: float = 5.0) -> None:
+        """Store the service location and per-request timeout.
+
+        Args:
+            base_url: Root URL of event-service; trailing slashes are stripped.
+            timeout_seconds: Timeout passed to ``httpx.Client``.
+        """
+        self.base_url = base_url.rstrip("/")
+        self.timeout_seconds = timeout_seconds
+
+    def publish_event(self, payload: EventPublishContract) -> EventRecordContract:
+        """POST the event to ``{base_url}/events`` and parse the response.
+
+        Raises:
+            EventServiceClientError: On a transport error, a non-2xx status,
+                an invalid URL, or a response body that is not valid JSON or
+                fails ``EventRecordContract`` validation.
+        """
+        # ValueError covers json.JSONDecodeError and pydantic's ValidationError;
+        # httpx.InvalidURL is not an httpx.HTTPError subclass. Callers are expected
+        # to catch only EventServiceClientError, so none of these may escape.
+        try:
+            with httpx.Client(timeout=self.timeout_seconds) as client:
+                response = client.post(
+                    f"{self.base_url}/events",
+                    json=payload.model_dump(mode="json"),
+                )
+                response.raise_for_status()
+                return EventRecordContract.model_validate(response.json())
+        except (httpx.HTTPError, httpx.InvalidURL, ValueError) as exc:
+            raise EventServiceClientError(f"event-service publish failed: {exc}") from exc

+ 73 - 5
services/agent-service/app/application/services.py

@@ -3,6 +3,7 @@ from typing import cast
 
 from sqlalchemy.orm import Session
 
+from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
 from core_domain import (
     AgentSkillRefContract,
     AgentToolRefContract,
@@ -47,6 +48,7 @@ class AgentApplicationService:
         memory_client: MemoryClient | None = None,
         tool_client: ToolServiceClient | None = None,
         skill_client: SkillServiceClient | None = None,
+        event_client: EventServiceClient | None = None,
     ) -> None:
         self.agent_repository = agent_repository
         self.agent_version_repository = agent_version_repository
@@ -55,6 +57,7 @@ class AgentApplicationService:
         self.memory_client = memory_client
         self.tool_client = tool_client
         self.skill_client = skill_client
+        self.event_client = event_client
 
     def create_agent(self, payload: AgentCreateRequest) -> AgentDefinition:
         return self.agent_repository.create(
@@ -115,7 +118,7 @@ class AgentApplicationService:
         if agent_version is None:
             raise ValueError("published agent version not found")
 
-        return self.agent_run_repository.create(
+        agent_run = self.agent_run_repository.create(
             tenant_id=payload.tenant_id,
             agent_id=payload.agent_id,
             agent_version_id=agent_version.id,
@@ -123,6 +126,12 @@ class AgentApplicationService:
             input_text=payload.input_text,
             input_json=payload.input_json,
         )
+        self._publish_event(
+            event_type="agent.run.created",
+            agent_run=agent_run,
+            payload_json={"agent_run_id": agent_run.id, "status": agent_run.status},
+        )
+        return agent_run
 
     def list_agent_runs(
         self,
@@ -207,7 +216,7 @@ class AgentApplicationService:
                     selected_skills=selected_skills,
                 ),
             )
-            return self.agent_run_repository.update_status(
+            completed_run = self.agent_run_repository.update_status(
                 agent_run_id=agent_run.id,
                 status="completed",
                 worker_key=payload.worker_key,
@@ -229,6 +238,17 @@ class AgentApplicationService:
                     **memory_metadata,
                 },
             )
+            if completed_run is not None:
+                self._publish_event(
+                    event_type="agent.run.completed",
+                    agent_run=completed_run,
+                    payload_json={
+                        "agent_run_id": completed_run.id,
+                        "dry_run": True,
+                        "status": completed_run.status,
+                    },
+                )
+            return completed_run
 
         tool_invocations = self._invoke_selected_tools(
             agent_run=agent_run,
@@ -271,7 +291,10 @@ class AgentApplicationService:
                         agent_version.model_config_json,
                         "temperature",
                     ),
-                    max_tokens=self._read_optional_int(agent_version.model_config_json, "max_tokens"),
+                    max_tokens=self._read_optional_int(
+                        agent_version.model_config_json,
+                        "max_tokens",
+                    ),
                     messages=messages,
                     metadata_json={
                         "tenant_id": agent_run.tenant_id,
@@ -295,7 +318,7 @@ class AgentApplicationService:
             agent_version=agent_version,
             output_text=response.content,
         )
-        return self.agent_run_repository.update_status(
+        completed_run = self.agent_run_repository.update_status(
             agent_run_id=agent_run.id,
             status="completed",
             worker_key=payload.worker_key,
@@ -313,6 +336,45 @@ class AgentApplicationService:
                 **memory_write_metadata,
             },
         )
+        if completed_run is not None:
+            self._publish_event(
+                event_type="agent.run.completed",
+                agent_run=completed_run,
+                payload_json={
+                    "agent_run_id": completed_run.id,
+                    "dry_run": False,
+                    "status": completed_run.status,
+                },
+            )
+        return completed_run
+
+    def _publish_event(  # Publish one agent-run event via event_client; always returns None.
+        self,
+        *,
+        event_type: str,  # passed through unchanged as the contract's event_type
+        agent_run: AgentRun,  # supplies tenant, aggregate, correlation and agent ids
+        payload_json: dict[str, JSONValue],  # caller-specific fields, merged into the payload below
+    ) -> None:
+        if self.event_client is None:  # no client configured: skip publishing
+            return
+        try:
+            self.event_client.publish_event(
+                EventPublishContract(
+                    tenant_id=agent_run.tenant_id,
+                    event_type=event_type,
+                    source_service="agent-service",
+                    aggregate_type="agent_run",
+                    aggregate_id=agent_run.id,
+                    correlation_id=agent_run.session_id,  # the run's session id is used as correlation id
+                    payload_json={
+                        **payload_json,
+                        "agent_id": agent_run.agent_id,  # listed after the spread, so these keys win on collision
+                        "agent_version_id": agent_run.agent_version_id,
+                    },
+                )
+            )
+        except EventServiceClientError:  # NOTE(review): failure is dropped without logging; presumably deliberate best-effort, confirm
+            return
 
     def execute_next_claimed_agent_run(
         self,
@@ -372,7 +434,9 @@ class AgentApplicationService:
             ChatMessageContract(role="system", content=agent_version.system_prompt),
         ]
         if agent_version.goal:
-            messages.append(ChatMessageContract(role="system", content=f"Goal: {agent_version.goal}"))
+            messages.append(
+                ChatMessageContract(role="system", content=f"Goal: {agent_version.goal}")
+            )
         if memory_results:
             messages.append(
                 ChatMessageContract(
@@ -891,4 +955,8 @@ def build_agent_application_service(
             base_url=settings.skill_service_url,
             timeout_seconds=settings.skill_service_timeout_seconds,
         ),
+        event_client=EventServiceClient(
+            base_url=settings.event_service_url,
+            timeout_seconds=settings.event_service_timeout_seconds,
+        ),
     )

+ 2 - 0
services/agent-service/app/bootstrap/settings.py

@@ -13,6 +13,8 @@ class AgentServiceSettings(ServiceSettings):
     tool_service_timeout_seconds: float = 10.0
     skill_service_url: str = "http://127.0.0.1:8010"
     skill_service_timeout_seconds: float = 10.0
+    event_service_url: str = "http://127.0.0.1:8013"
+    event_service_timeout_seconds: float = 5.0
     worker_poll_interval_seconds: float = 1.0
     worker_lease_seconds: int = 300
     worker_max_idle_cycles: int | None = None

+ 1 - 0
services/agent-service/pyproject.toml

@@ -16,6 +16,7 @@ dependencies = [
   "sqlalchemy>=2.0,<3.0",
   "core-db",
   "core-domain",
+  "core-events",
   "core-shared",
 ]
 

+ 79 - 1
services/runtime-service/app/application/services.py

@@ -3,6 +3,7 @@ from datetime import datetime, timedelta
 from sqlalchemy.orm import Session
 
 from core_dsl import parse_workflow_definition
+from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
 from core_domain import (
     InitialNodeContract,
     NodeExecutionContextContract,
@@ -57,6 +58,7 @@ class RuntimeApplicationService:
         trace_span_repository: TraceSpanRepository,
         execution_dispatcher: NodeExecutionDispatcher,
         workflow_client: WorkflowServiceClient | None = None,
+        event_client: EventServiceClient | None = None,
     ) -> None:
         self.workflow_run_repository = workflow_run_repository
         self.node_run_repository = node_run_repository
@@ -65,6 +67,7 @@ class RuntimeApplicationService:
         self.trace_span_repository = trace_span_repository
         self.execution_dispatcher = execution_dispatcher
         self.workflow_client = workflow_client
+        self.event_client = event_client
 
     def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
         initial_node = payload.initial_node or self._plan_initial_node(payload)
@@ -128,6 +131,16 @@ class RuntimeApplicationService:
                 "session_id": payload.session_id,
             },
         )
+        self._publish_event(
+            event_type="workflow.run.created",
+            workflow_run=workflow_run,
+            payload_json={
+                "run_id": workflow_run.id,
+                "workflow_id": workflow_run.workflow_id,
+                "workflow_version_id": workflow_run.workflow_version_id,
+                "status": workflow_run.status,
+            },
+        )
 
         return workflow_run, node_run
 
@@ -185,12 +198,23 @@ class RuntimeApplicationService:
         run_id: str,
         payload: WorkflowRunStatusUpdateRequest,
     ) -> WorkflowRun | None:
-        return self.workflow_run_repository.update_status(
+        entity = self.workflow_run_repository.update_status(
             run_id=run_id,
             status=payload.status,
             error_code=payload.error_code,
             error_message=payload.error_message,
         )
+        if entity is not None:
+            self._publish_event(
+                event_type=f"workflow.run.{entity.status}",
+                workflow_run=entity,
+                payload_json={
+                    "run_id": entity.id,
+                    "status": entity.status,
+                    "error_code": entity.error_code,
+                },
+            )
+        return entity
 
     def update_node_run_status(
         self,
@@ -222,6 +246,19 @@ class RuntimeApplicationService:
                 "error_code": payload.error_code,
             },
         )
+        self._publish_event(
+            event_type=f"workflow.node.{node_run.status}",
+            workflow_run=None,
+            node_run=node_run,
+            payload_json={
+                "run_id": node_run.run_id,
+                "node_run_id": node_run.id,
+                "node_id": node_run.node_id,
+                "node_type": node_run.node_type,
+                "status": node_run.status,
+                "error_code": node_run.error_code,
+            },
+        )
 
         if payload.status == "completed":
             self._schedule_successor_nodes(node_run)
@@ -1033,6 +1070,43 @@ class RuntimeApplicationService:
             detail_json=detail_json,
         )
 
+    def _publish_event(  # Publish one workflow-run or node-run event via event_client; always returns None.
+        self,
+        *,
+        event_type: str,  # passed through unchanged as the contract's event_type
+        payload_json: dict[str, JSONValue],  # sent as-is; nothing is merged into it here
+        workflow_run: WorkflowRun | None = None,  # preferred source of tenant, aggregate and correlation ids
+        node_run: NodeRun | None = None,  # fallback source when workflow_run is absent or lacks a value
+    ) -> None:
+        if self.event_client is None:  # no client configured: skip publishing
+            return
+        tenant_id = workflow_run.tenant_id if workflow_run is not None else None
+        if tenant_id is None and node_run is not None:  # fall back to the node run's tenant
+            tenant_id = node_run.tenant_id
+        if tenant_id is None:  # no tenant resolvable: nothing is published
+            return
+        aggregate_id = workflow_run.id if workflow_run is not None else None
+        if aggregate_id is None and node_run is not None:  # fall back to the node run's id
+            aggregate_id = node_run.id
+        aggregate_type = "workflow_run" if workflow_run is not None else "node_run"  # "node_run" whenever no workflow_run was passed
+        correlation_id = workflow_run.session_id if workflow_run is not None else None
+        if correlation_id is None and node_run is not None:  # no session id available: use the node's run_id
+            correlation_id = node_run.run_id
+        try:
+            self.event_client.publish_event(
+                EventPublishContract(
+                    tenant_id=tenant_id,
+                    event_type=event_type,
+                    source_service="runtime-service",
+                    aggregate_type=aggregate_type,
+                    aggregate_id=aggregate_id,
+                    correlation_id=correlation_id,
+                    payload_json=payload_json,
+                )
+            )
+        except EventServiceClientError:  # NOTE(review): failure is dropped without logging; presumably deliberate best-effort, confirm
+            return
+
 
 def build_runtime_application_service(
     *,
@@ -1059,4 +1133,8 @@ def build_runtime_application_service(
             ),
         ),
         workflow_client=WorkflowServiceClient(base_url=settings.workflow_service_url),
+        event_client=EventServiceClient(
+            base_url=settings.event_service_url,
+            timeout_seconds=settings.event_service_timeout_seconds,
+        ),
     )

+ 2 - 0
services/runtime-service/app/bootstrap/settings.py

@@ -13,6 +13,8 @@ class RuntimeServiceSettings(ServiceSettings):
     human_service_timeout_seconds: float = 10.0
     knowledge_service_url: str = "http://127.0.0.1:8012"
     knowledge_service_timeout_seconds: float = 10.0
+    event_service_url: str = "http://127.0.0.1:8013"
+    event_service_timeout_seconds: float = 5.0
     worker_poll_interval_seconds: float = 1.0
     worker_lease_seconds: int = 300
     worker_max_idle_cycles: int | None = None

+ 67 - 4
services/team-service/app/application/services.py

@@ -1,5 +1,6 @@
 from datetime import datetime, timedelta
 
+from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
 from core_domain import AgentRunContract, TeamMemberContract
 from core_shared import JSONValue
 
@@ -29,11 +30,13 @@ class TeamApplicationService:
         team_version_repository: TeamVersionRepository,
         team_run_repository: TeamRunRepository,
         agent_client: AgentServiceClient | None = None,
+        event_client: EventServiceClient | None = None,
     ) -> None:
         self.team_repository = team_repository
         self.team_version_repository = team_version_repository
         self.team_run_repository = team_run_repository
         self.agent_client = agent_client
+        self.event_client = event_client
 
     def create_team(self, payload: TeamCreateRequest) -> TeamDefinition:
         return self.team_repository.create(
@@ -90,7 +93,7 @@ class TeamApplicationService:
         if team_version is None:
             raise ValueError("published team version not found")
 
-        return self.team_run_repository.create(
+        team_run = self.team_run_repository.create(
             tenant_id=payload.tenant_id,
             team_id=payload.team_id,
             team_version_id=team_version.id,
@@ -98,6 +101,12 @@ class TeamApplicationService:
             input_text=payload.input_text,
             input_json=payload.input_json,
         )
+        self._publish_event(
+            event_type="team.run.created",
+            team_run=team_run,
+            payload_json={"team_run_id": team_run.id, "status": team_run.status},
+        )
+        return team_run
 
     def list_team_runs(
         self,
@@ -152,7 +161,7 @@ class TeamApplicationService:
             team_version_id=team_run.team_version_id,
         )
         if team_version is None:
-            return self.team_run_repository.update_status(
+            failed_run = self.team_run_repository.update_status(
                 team_run_id=team_run.id,
                 status="failed",
                 worker_key=payload.worker_key,
@@ -219,14 +228,36 @@ class TeamApplicationService:
                 error_code="member_run_failed",
                 error_message=f"{len(failed_results)} member run(s) failed",
             )
-
-        return self.team_run_repository.update_status(
+            if failed_run is not None:
+                self._publish_event(
+                    event_type="team.run.failed",
+                    team_run=failed_run,
+                    payload_json={
+                        "team_run_id": failed_run.id,
+                        "status": failed_run.status,
+                        "failed_member_count": len(failed_results),
+                    },
+                )
+            return failed_run
+
+        completed_run = self.team_run_repository.update_status(
             team_run_id=team_run.id,
             status="completed",
             worker_key=payload.worker_key,
             output_text=output_text,
             output_json=output_json,
         )
+        if completed_run is not None:
+            self._publish_event(
+                event_type="team.run.completed",
+                team_run=completed_run,
+                payload_json={
+                    "team_run_id": completed_run.id,
+                    "status": completed_run.status,
+                    "member_run_count": len(member_results),
+                },
+            )
+        return completed_run
 
     def execute_next_claimed_team_run(
         self,
@@ -418,6 +449,34 @@ class TeamApplicationService:
             "error_message": result.error_message,
         }
 
+    def _publish_event(  # Publish one team-run event via event_client; always returns None.
+        self,
+        *,
+        event_type: str,  # passed through unchanged as the contract's event_type
+        team_run: TeamRun,  # supplies tenant, aggregate, correlation and team ids
+        payload_json: dict[str, JSONValue],  # caller-specific fields, merged into the payload below
+    ) -> None:
+        if self.event_client is None:  # no client configured: skip publishing
+            return
+        try:
+            self.event_client.publish_event(
+                EventPublishContract(
+                    tenant_id=team_run.tenant_id,
+                    event_type=event_type,
+                    source_service="team-service",
+                    aggregate_type="team_run",
+                    aggregate_id=team_run.id,
+                    correlation_id=team_run.session_id,  # the run's session id is used as correlation id
+                    payload_json={
+                        **payload_json,
+                        "team_id": team_run.team_id,  # listed after the spread, so these keys win on collision
+                        "team_version_id": team_run.team_version_id,
+                    },
+                )
+            )
+        except EventServiceClientError:  # NOTE(review): failure is dropped without logging; presumably deliberate best-effort, confirm
+            return
+
 
 def build_team_application_service(
     *,
@@ -434,4 +493,8 @@ def build_team_application_service(
             base_url=settings.agent_service_url,
             timeout_seconds=settings.agent_service_timeout_seconds,
         ),
+        event_client=EventServiceClient(
+            base_url=settings.event_service_url,
+            timeout_seconds=settings.event_service_timeout_seconds,
+        ),
     )

+ 2 - 0
services/team-service/app/bootstrap/settings.py

@@ -7,6 +7,8 @@ class TeamServiceSettings(ServiceSettings):
     database_url: str = "sqlite:///./team_service.db"
     agent_service_url: str = "http://127.0.0.1:8007"
     agent_service_timeout_seconds: float = 30.0
+    event_service_url: str = "http://127.0.0.1:8013"
+    event_service_timeout_seconds: float = 5.0
     worker_poll_interval_seconds: float = 1.0
     worker_lease_seconds: int = 300
     worker_max_idle_cycles: int | None = None

+ 1 - 0
services/team-service/pyproject.toml

@@ -16,6 +16,7 @@ dependencies = [
   "httpx>=0.27,<1.0",
   "core-db",
   "core-domain",
+  "core-events",
   "core-shared",
 ]