瀏覽代碼

feat: add runtime human handoff

Jax Docker 1 月之前
父節點
當前提交
621ab597d9

+ 21 - 0
README.md

@@ -403,6 +403,23 @@ Invoke-RestMethod -Method Post `
 
 Through `api-gateway`, use `/gateway/human/**`.
 
+Runtime human-in-the-loop nodes now create `human-service` tasks and pause the
+node in `pending` status until the task is completed. Supported node types:
+
+- `human`
+- `approval`
+- `human-input`
+- `human-takeover`
+
+After completing the human task, resume the blocked node:
+
+```powershell
+Invoke-RestMethod -Method Post `
+  -Uri http://127.0.0.1:8003/runtime/node-runs/node-run-id/resume-human `
+  -ContentType "application/json" `
+  -Body '{"tenant_id":"t1","human_task_id":"human-task-id","worker_key":"runtime-worker-1"}'
+```
+
 ## Knowledge Service APIs
 
 `knowledge-service` stores independent knowledge bases, documents, chunks, and
@@ -506,6 +523,10 @@ Pop-Location
 - `llm`
 - `tool`
 - `code`
+- `human`
+- `approval`
+- `human-input`
+- `human-takeover`
 - `answer`
 - `if-else`
 - `assigner`

+ 2 - 0
libs/core-domain/src/core_domain/__init__.py

@@ -19,6 +19,7 @@ from .execution_contracts import (
 )
 from .human_contracts import (
     HumanTaskContract,
+    HumanTaskCreateContract,
     HumanTaskStatus,
     HumanTaskType,
 )
@@ -101,6 +102,7 @@ __all__ = [
     "ChatCompletionResponseContract",
     "ChatMessageContract",
     "HumanTaskContract",
+    "HumanTaskCreateContract",
     "HumanTaskStatus",
     "HumanTaskType",
     "InitialNodeContract",

+ 15 - 0
libs/core-domain/src/core_domain/human_contracts.py

@@ -37,3 +37,18 @@ class HumanTaskContract(BaseModel):
     claimed_time: datetime | None = None
     completed_time: datetime | None = None
     created_time: datetime
+
+
+class HumanTaskCreateContract(BaseModel):
+    tenant_id: str
+    task_type: HumanTaskType
+    title: str
+    description: str | None = None
+    source_type: str | None = None
+    source_id: str | None = None
+    run_id: str | None = None
+    node_run_id: str | None = None
+    requested_by: str | None = None
+    assigned_to: str | None = None
+    request_payload_json: dict[str, JSONValue] = Field(default_factory=dict)
+    due_time: datetime | None = None

+ 12 - 0
services/human-service/app/api/routes.py

@@ -56,6 +56,18 @@ def list_human_tasks(
     ]
 
 
+@router.get("/tasks/{human_task_id}", response_model=HumanTaskResponse)
+def get_human_task(
+    human_task_id: str,
+    tenant_id: str = Query(...),
+    service: HumanApplicationService = Depends(get_human_application_service),
+) -> HumanTaskResponse:
+    entity = service.get_task(tenant_id=tenant_id, human_task_id=human_task_id)
+    if entity is None:
+        raise HTTPException(status_code=404, detail=f"human task not found: {human_task_id}")
+    return HumanTaskResponse.from_entity(entity)
+
+
 @router.post("/tasks/{human_task_id}/claim", response_model=HumanTaskResponse)
 def claim_human_task(
     human_task_id: str,

+ 6 - 0
services/human-service/app/application/services.py

@@ -46,6 +46,12 @@ class HumanApplicationService:
             limit=limit,
         )
 
+    def get_task(self, *, tenant_id: str, human_task_id: str) -> HumanTask | None:
+        return self.human_task_repository.get_by_id(
+            tenant_id=tenant_id,
+            human_task_id=human_task_id,
+        )
+
     def claim_task(
         self,
         *,

+ 3 - 15
services/human-service/app/schemas/human.py

@@ -1,28 +1,16 @@
-from datetime import datetime
 from typing import TYPE_CHECKING
 
 from pydantic import BaseModel, Field
 
-from core_domain import HumanTaskContract, HumanTaskStatus, HumanTaskType
+from core_domain import HumanTaskContract, HumanTaskCreateContract, HumanTaskStatus
 from core_shared import JSONValue
 
 if TYPE_CHECKING:
     from app.db.models import HumanTask
 
 
-class HumanTaskCreateRequest(BaseModel):
-    tenant_id: str
-    task_type: HumanTaskType
-    title: str
-    description: str | None = None
-    source_type: str | None = None
-    source_id: str | None = None
-    run_id: str | None = None
-    node_run_id: str | None = None
-    requested_by: str | None = None
-    assigned_to: str | None = None
-    request_payload_json: dict[str, JSONValue] = Field(default_factory=dict)
-    due_time: datetime | None = None
+class HumanTaskCreateRequest(HumanTaskCreateContract):
+    pass
 
 
 class HumanTaskClaimRequest(BaseModel):

+ 41 - 0
services/runtime-service/app/api/routes.py

@@ -7,11 +7,13 @@ from app.application.services import RuntimeApplicationService, build_runtime_ap
 from app.bootstrap.settings import RuntimeServiceSettings
 from app.db.session import get_db
 from app.infrastructure.code_runner_client import CodeRunnerClientError
+from app.infrastructure.human_client import HumanServiceClientError
 from app.infrastructure.model_gateway_client import ModelGatewayClientError
 from app.infrastructure.tool_client import ToolServiceClientError
 from app.infrastructure.workflow_client import WorkflowServiceClientError
 from app.schemas.run import (
     ExecutionLogResponse,
+    HumanNodeResumeRequest,
     NodeArtifactResponse,
     NodeRunExecuteRequest,
     NodeRunExecuteResponse,
@@ -58,6 +60,7 @@ def create_run(
     except (
         CodeRunnerClientError,
         ModelGatewayClientError,
+        HumanServiceClientError,
         ToolServiceClientError,
         WorkflowServiceClientError,
     ) as exc:
@@ -182,6 +185,7 @@ def execute_node_run(
     except (
         CodeRunnerClientError,
         ModelGatewayClientError,
+        HumanServiceClientError,
         ToolServiceClientError,
         WorkflowServiceClientError,
     ) as exc:
@@ -214,6 +218,7 @@ def execute_next_node_run(
     except (
         CodeRunnerClientError,
         ModelGatewayClientError,
+        HumanServiceClientError,
         ToolServiceClientError,
         WorkflowServiceClientError,
     ) as exc:
@@ -246,6 +251,7 @@ def execute_run(
     except (
         CodeRunnerClientError,
         ModelGatewayClientError,
+        HumanServiceClientError,
         ToolServiceClientError,
         WorkflowServiceClientError,
     ) as exc:
@@ -262,6 +268,40 @@ def execute_run(
     )
 
 
+@router.post("/node-runs/{node_run_id}/resume-human", response_model=NodeRunExecuteResponse)
+def resume_human_node_run(
+    node_run_id: str,
+    payload: HumanNodeResumeRequest,
+    service: RuntimeApplicationService = Depends(get_runtime_application_service),
+) -> NodeRunExecuteResponse:
+    try:
+        result = service.resume_human_node_run(
+            node_run_id=node_run_id,
+            payload=payload,
+        )
+    except (
+        CodeRunnerClientError,
+        ModelGatewayClientError,
+        HumanServiceClientError,
+        ToolServiceClientError,
+        WorkflowServiceClientError,
+    ) as exc:
+        raise HTTPException(status_code=502, detail=str(exc)) from exc
+
+    if result is None:
+        raise HTTPException(
+            status_code=404,
+            detail=f"human node_run not found or task mismatch: {node_run_id}",
+        )
+
+    workflow_run, node_run, executor_name = result
+    return NodeRunExecuteResponse(
+        run=WorkflowRunResponse.from_entity(workflow_run),
+        node_run=NodeRunResponse.from_entity(node_run),
+        executor_name=executor_name,
+    )
+
+
 @router.post("/workers/execute-next", response_model=WorkerExecuteNextResponse)
 def execute_next_worker_task(
     payload: WorkerExecuteNextRequest,
@@ -276,6 +316,7 @@ def execute_next_worker_task(
     except (
         CodeRunnerClientError,
         ModelGatewayClientError,
+        HumanServiceClientError,
         ToolServiceClientError,
         WorkflowServiceClientError,
     ) as exc:

+ 35 - 0
services/runtime-service/app/application/services.py

@@ -24,6 +24,7 @@ from app.infrastructure.executors import (
     NodeExecutionDispatcher,
     build_node_execution_dispatcher_with_clients,
 )
+from app.infrastructure.human_client import HumanServiceClient
 from app.infrastructure.knowledge_client import KnowledgeServiceClient
 from app.infrastructure.code_runner_client import CodeRunnerClient
 from app.infrastructure.model_gateway_client import ModelGatewayClient
@@ -37,6 +38,7 @@ from app.infrastructure.workflow_client import WorkflowServiceClient
 from app.bootstrap.settings import RuntimeServiceSettings
 from app.schemas.run import (
     NodeRunExecuteRequest,
+    HumanNodeResumeRequest,
     NodeRunStatusUpdateRequest,
     RunCreateRequest,
     RunExecuteRequest,
@@ -492,6 +494,35 @@ class RuntimeApplicationService:
             return None
         return final_run, executed_node_runs, executor_names
 
+    def resume_human_node_run(
+        self,
+        *,
+        node_run_id: str,
+        payload: HumanNodeResumeRequest,
+    ) -> tuple[WorkflowRun, NodeRun, str] | None:
+        node_run = self.node_run_repository.get_by_id(node_run_id)
+        if node_run is None or node_run.tenant_id != payload.tenant_id:
+            return None
+
+        output_json = dict(node_run.output_json or {})
+        existing_human_task_id = output_json.get("human_task_id")
+        if existing_human_task_id is not None and existing_human_task_id != payload.human_task_id:
+            return None
+
+        if existing_human_task_id is None:
+            output_json["human_task_id"] = payload.human_task_id
+            self.node_run_repository.update_status(
+                node_run_id=node_run.id,
+                status="pending",
+                worker_key=payload.worker_key,
+                output_json=output_json,
+            )
+
+        return self.execute_node_run(
+            node_run_id=node_run_id,
+            payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
+        )
+
     def execute_next_claimed_node_run(
         self,
         *,
@@ -1018,6 +1049,10 @@ def build_runtime_application_service(
             code_runner_client=CodeRunnerClient(base_url=settings.code_runner_service_url),
             model_gateway_client=ModelGatewayClient(base_url=settings.model_gateway_service_url),
             tool_client=ToolServiceClient(base_url=settings.tool_service_url),
+            human_client=HumanServiceClient(
+                base_url=settings.human_service_url,
+                timeout_seconds=settings.human_service_timeout_seconds,
+            ),
             knowledge_client=KnowledgeServiceClient(
                 base_url=settings.knowledge_service_url,
                 timeout_seconds=settings.knowledge_service_timeout_seconds,

+ 2 - 0
services/runtime-service/app/bootstrap/settings.py

@@ -9,6 +9,8 @@ class RuntimeServiceSettings(ServiceSettings):
     tool_service_url: str = "http://127.0.0.1:8004"
     model_gateway_service_url: str = "http://127.0.0.1:8005"
     code_runner_service_url: str = "http://127.0.0.1:8006"
+    human_service_url: str = "http://127.0.0.1:8011"
+    human_service_timeout_seconds: float = 10.0
     knowledge_service_url: str = "http://127.0.0.1:8012"
     knowledge_service_timeout_seconds: float = 10.0
     worker_poll_interval_seconds: float = 1.0

+ 2 - 1
services/runtime-service/app/domain/repositories.py

@@ -257,9 +257,10 @@ class NodeRunRepository:
         now = datetime.utcnow()
         if status == "running" and entity.started_time is None:
             entity.started_time = now
+        if status != "running":
+            entity.lease_expire_time = None
         if status in {"completed", "failed", "skipped"}:
             entity.finished_time = now
-            entity.lease_expire_time = None
 
         self.db.commit()
         self.db.refresh(entity)

+ 193 - 0
services/runtime-service/app/infrastructure/executors.py

@@ -1,12 +1,16 @@
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
+from datetime import datetime, timedelta
 import re
+from typing import cast
 
 import httpx
 from core_domain import (
     ChatCompletionRequestContract,
     ChatMessageContract,
     CodeExecutionRequestContract,
+    HumanTaskCreateContract,
+    HumanTaskType,
     KnowledgeSearchRequestContract,
     NodeExecutionContextContract,
     NodeExecutionRequestContract,
@@ -24,6 +28,7 @@ from .context import (
     render_template_string,
     resolve_expression,
 )
+from .human_client import HumanServiceClient, HumanServiceClientError
 from .knowledge_client import KnowledgeServiceClient, KnowledgeServiceClientError
 from .model_gateway_client import ModelGatewayClient, ModelGatewayClientError
 from .tool_client import ToolServiceClient, ToolServiceClientError
@@ -381,6 +386,142 @@ class CodeNodeExecutor(CompletedNodeExecutor):
         )
 
 
+class HumanNodeExecutor(CompletedNodeExecutor):
+    def __init__(self, human_client: HumanServiceClient | None = None) -> None:
+        super().__init__(
+            executor_name="human-executor",
+            supported_node_types=frozenset(
+                {"human", "approval", "human-input", "human-takeover"}
+            ),
+        )
+        self.human_client = human_client
+
+    def execute(
+        self,
+        context: NodeExecutionContextContract,
+        request: NodeExecutionRequestContract,
+    ) -> NodeExecutionResultContract:
+        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
+        if self.human_client is None:
+            return NodeExecutionResultContract(
+                status="failed",
+                worker_key=worker_key,
+                error_code="human_service_missing",
+                error_message="human service client is not configured",
+            )
+
+        human_task_id = _resolve_existing_human_task_id(context)
+        if human_task_id is None:
+            return self._create_waiting_task(context=context, worker_key=worker_key)
+
+        try:
+            task = self.human_client.get_task(
+                tenant_id=context.tenant_id,
+                human_task_id=human_task_id,
+            )
+        except HumanServiceClientError as exc:
+            return NodeExecutionResultContract(
+                status="failed",
+                worker_key=worker_key,
+                error_code="human_task_lookup_failed",
+                error_message=str(exc),
+            )
+
+        output_json: dict[str, JSONValue] = {
+            "executor_name": self.executor_name,
+            "human_task_id": task.id,
+            "human_task_status": task.status,
+            "response_payload_json": task.response_payload_json or {},
+        }
+        if task.status in {"pending", "claimed"}:
+            return NodeExecutionResultContract(
+                status="pending",
+                worker_key=worker_key,
+                output_text=f"waiting for human task: {task.id}",
+                output_json=output_json,
+            )
+        if task.status in {"approved", "completed"}:
+            return NodeExecutionResultContract(
+                status="completed",
+                worker_key=worker_key,
+                output_json=output_json,
+            )
+        return NodeExecutionResultContract(
+            status="failed",
+            worker_key=worker_key,
+            error_code=f"human_task_{task.status}",
+            error_message=f"human task ended with status: {task.status}",
+            output_json=output_json,
+        )
+
+    def _create_waiting_task(
+        self,
+        *,
+        context: NodeExecutionContextContract,
+        worker_key: str,
+    ) -> NodeExecutionResultContract:
+        render_context = _build_executor_template_context(context)
+        task_type = _resolve_human_task_type(context.node_type, context.node_config_json)
+        title = render_template_string(
+            _read_string_value(context.node_config_json, "title")
+            or f"Human task for {context.node_id}",
+            render_context,
+        )
+        description_template = _read_string_value(context.node_config_json, "description")
+        description = (
+            render_template_string(description_template, render_context)
+            if description_template is not None
+            else None
+        )
+        request_payload_json = _render_json_dict(
+            _read_dict_value(context.node_config_json, "request_payload_json"),
+            render_context,
+        )
+
+        try:
+            task = self.human_client.create_task(
+                HumanTaskCreateContract(
+                    tenant_id=context.tenant_id,
+                    task_type=task_type,
+                    title=title,
+                    description=description,
+                    source_type="runtime-node",
+                    source_id=context.node_run_id,
+                    run_id=context.run_id,
+                    node_run_id=context.node_run_id,
+                    requested_by=_read_string_value(
+                        context.node_config_json,
+                        "requested_by",
+                    ),
+                    assigned_to=_read_string_value(
+                        context.node_config_json,
+                        "assigned_to",
+                    ),
+                    request_payload_json=request_payload_json,
+                    due_time=_resolve_due_time(context.node_config_json),
+                )
+            )
+        except HumanServiceClientError as exc:
+            return NodeExecutionResultContract(
+                status="failed",
+                worker_key=worker_key,
+                error_code="human_task_create_failed",
+                error_message=str(exc),
+            )
+
+        return NodeExecutionResultContract(
+            status="pending",
+            worker_key=worker_key,
+            output_text=f"waiting for human task: {task.id}",
+            output_json={
+                "executor_name": self.executor_name,
+                "human_task_id": task.id,
+                "human_task_status": task.status,
+                "task_type": task.task_type,
+            },
+        )
+
+
 class AnswerNodeExecutor(CompletedNodeExecutor):
     def execute(
         self,
@@ -692,6 +833,7 @@ def build_node_execution_dispatcher() -> NodeExecutionDispatcher:
         LLMNodeExecutor(),
         ToolNodeExecutor(),
         CodeNodeExecutor(),
+        HumanNodeExecutor(),
         AnswerNodeExecutor(),
         ConditionNodeExecutor(),
         AssignerNodeExecutor(),
@@ -710,11 +852,13 @@ def build_node_execution_dispatcher_with_clients(
     model_gateway_client: ModelGatewayClient | None = None,
     tool_client: ToolServiceClient | None = None,
     knowledge_client: KnowledgeServiceClient | None = None,
+    human_client: HumanServiceClient | None = None,
 ) -> NodeExecutionDispatcher:
     executors: list[NodeExecutor] = [
         LLMNodeExecutor(model_gateway_client=model_gateway_client),
         ToolNodeExecutor(tool_client=tool_client),
         CodeNodeExecutor(code_runner_client=code_runner_client),
+        HumanNodeExecutor(human_client=human_client),
         AnswerNodeExecutor(),
         ConditionNodeExecutor(),
         AssignerNodeExecutor(),
@@ -868,6 +1012,55 @@ def _read_int_value(payload: dict[str, JSONValue], key: str) -> int | None:
     return None
 
 
+def _resolve_existing_human_task_id(context: NodeExecutionContextContract) -> str | None:
+    configured_task_id = _read_string_value(context.node_config_json, "human_task_id")
+    if configured_task_id is not None:
+        return configured_task_id
+
+    current_node_output = context.node_output_json_by_node_id.get(context.node_id)
+    if current_node_output is None:
+        return None
+    return _read_string_value(current_node_output, "human_task_id")
+
+
+def _resolve_human_task_type(
+    node_type: str,
+    payload: dict[str, JSONValue],
+) -> HumanTaskType:
+    configured_task_type = _read_string_value(payload, "task_type")
+    if configured_task_type in {"approval", "input", "takeover", "pause", "resume"}:
+        return cast(HumanTaskType, configured_task_type)
+    if node_type == "approval":
+        return "approval"
+    if node_type == "human-input":
+        return "input"
+    if node_type == "human-takeover":
+        return "takeover"
+    return "input"
+
+
+def _resolve_due_time(payload: dict[str, JSONValue]) -> datetime | None:
+    due_time = _read_datetime_value(payload, "due_time")
+    if due_time is not None:
+        return due_time
+
+    due_after_seconds = _read_int_value(payload, "due_after_seconds")
+    if due_after_seconds is None or due_after_seconds <= 0:
+        return None
+    return datetime.utcnow() + timedelta(seconds=due_after_seconds)
+
+
+def _read_datetime_value(payload: dict[str, JSONValue], key: str) -> datetime | None:
+    value = payload.get(key)
+    if isinstance(value, str) and value.strip():
+        normalized_value = value.strip().replace("Z", "+00:00")
+        try:
+            return datetime.fromisoformat(normalized_value)
+        except ValueError:
+            return None
+    return None
+
+
 def _build_executor_template_context(context: NodeExecutionContextContract) -> dict[str, JSONValue]:
     return build_template_context(
         node_id=context.node_id,

+ 39 - 0
services/runtime-service/app/infrastructure/human_client.py

@@ -0,0 +1,39 @@
+import httpx
+
+from core_domain import HumanTaskContract, HumanTaskCreateContract
+
+
+class HumanServiceClientError(Exception):
+    pass
+
+
+class HumanServiceClient:
+    def __init__(self, base_url: str, timeout_seconds: float = 10.0) -> None:
+        self.base_url = base_url.rstrip("/")
+        self.timeout_seconds = timeout_seconds
+
+    def create_task(self, payload: HumanTaskCreateContract) -> HumanTaskContract:
+        try:
+            with httpx.Client(timeout=self.timeout_seconds) as client:
+                response = client.post(
+                    f"{self.base_url}/human/tasks",
+                    json=payload.model_dump(mode="json"),
+                )
+                response.raise_for_status()
+                return HumanTaskContract.model_validate(response.json())
+        except httpx.HTTPError as exc:
+            raise HumanServiceClientError(
+                f"human-service create task failed: {exc}"
+            ) from exc
+
+    def get_task(self, *, tenant_id: str, human_task_id: str) -> HumanTaskContract:
+        try:
+            with httpx.Client(timeout=self.timeout_seconds) as client:
+                response = client.get(
+                    f"{self.base_url}/human/tasks/{human_task_id}",
+                    params={"tenant_id": tenant_id},
+                )
+                response.raise_for_status()
+                return HumanTaskContract.model_validate(response.json())
+        except httpx.HTTPError as exc:
+            raise HumanServiceClientError(f"human-service get task failed: {exc}") from exc

+ 6 - 2
services/runtime-service/app/infrastructure/workflow_client.py

@@ -12,7 +12,12 @@ class WorkflowServiceClient:
         self.base_url = base_url.rstrip("/")
         self.timeout_seconds = timeout_seconds
 
-    def get_workflow_version(self, *, tenant_id: str, workflow_version_id: str) -> WorkflowVersionContract:
+    def get_workflow_version(
+        self,
+        *,
+        tenant_id: str,
+        workflow_version_id: str,
+    ) -> WorkflowVersionContract:
         try:
             with httpx.Client(timeout=self.timeout_seconds) as client:
                 response = client.get(
@@ -23,4 +28,3 @@ class WorkflowServiceClient:
                 return WorkflowVersionContract.model_validate(response.json())
         except httpx.HTTPError as exc:
             raise WorkflowServiceClientError(f"workflow-service request failed: {exc}") from exc
-

+ 6 - 0
services/runtime-service/app/schemas/run.py

@@ -87,6 +87,12 @@ class WorkerExecuteNextResponse(BaseModel):
     released_lease_count: int = 0
 
 
+class HumanNodeResumeRequest(BaseModel):
+    tenant_id: str
+    human_task_id: str
+    worker_key: str | None = None
+
+
 class ExecutionLogResponse(BaseModel):
     id: str
     tenant_id: str