|
|
@@ -1,1260 +0,0 @@
|
|
|
-from dataclasses import dataclass
|
|
|
-from datetime import datetime, timedelta
|
|
|
-
|
|
|
-from core_domain import (
|
|
|
- InitialNodeContract,
|
|
|
- NodeExecutionContextContract,
|
|
|
- NodeExecutionResultContract,
|
|
|
- NodeRunStatus,
|
|
|
- WorkflowRunStatus,
|
|
|
- WorkflowConfigContract,
|
|
|
-)
|
|
|
-from core_dsl import parse_workflow_definition
|
|
|
-from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
|
|
|
-from core_shared import JSONValue, try_build_redis_client
|
|
|
-from core_shared.task_queue import TaskQueuePublisher
|
|
|
-from sqlalchemy.orm import Session
|
|
|
-
|
|
|
-from app.bootstrap.settings import RuntimeServiceSettings
|
|
|
-from app.db.models import ExecutionLog, NodeArtifact, NodeRun, TraceSpan, WorkflowRun
|
|
|
-from app.domain.repositories import (
|
|
|
- ExecutionLogRepository,
|
|
|
- NodeArtifactRepository,
|
|
|
- NodeRunRepository,
|
|
|
- TraceSpanRepository,
|
|
|
- WorkflowRunRepository,
|
|
|
-)
|
|
|
-from app.infrastructure.code_runner_client import CodeRunnerClient
|
|
|
-from app.infrastructure.executors import (
|
|
|
- NodeExecutionDispatcher,
|
|
|
- build_node_execution_dispatcher_with_clients,
|
|
|
-)
|
|
|
-from app.infrastructure.human_client import HumanServiceClient
|
|
|
-from app.infrastructure.knowledge_client import KnowledgeServiceClient
|
|
|
-from app.infrastructure.model_gateway_client import ModelGatewayClient
|
|
|
-from app.infrastructure.planner import (
|
|
|
- derive_initial_node,
|
|
|
- derive_node_config,
|
|
|
- derive_successor_nodes,
|
|
|
-)
|
|
|
-from app.infrastructure.tool_client import ToolServiceClient
|
|
|
-from app.infrastructure.workflow_client import WorkflowServiceClient
|
|
|
-from app.schemas.run import (
|
|
|
- HumanNodeResumeRequest,
|
|
|
- NodeRunExecuteRequest,
|
|
|
- NodeRunStatusUpdateRequest,
|
|
|
- RunCreateRequest,
|
|
|
- RunExecuteRequest,
|
|
|
- RuntimeDebugContinueRequest,
|
|
|
- WorkflowRunStatusUpdateRequest,
|
|
|
-)
|
|
|
-
|
|
|
-
|
|
|
-@dataclass(frozen=True)
|
|
|
-class RuntimeDebugSnapshot:
|
|
|
- run: WorkflowRun
|
|
|
- node_runs: list[NodeRun]
|
|
|
- run_state_json: dict[str, JSONValue]
|
|
|
- node_output_json_by_node_id: dict[str, dict[str, JSONValue]]
|
|
|
- node_output_text_by_node_id: dict[str, str]
|
|
|
- queued_node_ids: list[str]
|
|
|
- running_node_ids: list[str]
|
|
|
- completed_node_ids: list[str]
|
|
|
- failed_node_ids: list[str]
|
|
|
- execution_logs: list[ExecutionLog]
|
|
|
- node_artifacts: list[NodeArtifact]
|
|
|
- trace_spans: list[TraceSpan]
|
|
|
-
|
|
|
-
|
|
|
-class RuntimeApplicationService:
|
|
|
- def __init__(
|
|
|
- self,
|
|
|
- workflow_run_repository: WorkflowRunRepository,
|
|
|
- node_run_repository: NodeRunRepository,
|
|
|
- execution_log_repository: ExecutionLogRepository,
|
|
|
- node_artifact_repository: NodeArtifactRepository,
|
|
|
- trace_span_repository: TraceSpanRepository,
|
|
|
- execution_dispatcher: NodeExecutionDispatcher,
|
|
|
- workflow_client: WorkflowServiceClient | None = None,
|
|
|
- event_client: EventServiceClient | None = None,
|
|
|
- task_queue_publisher: TaskQueuePublisher | None = None) -> None:
|
|
|
- self.workflow_run_repository = workflow_run_repository
|
|
|
- self.node_run_repository = node_run_repository
|
|
|
- self.execution_log_repository = execution_log_repository
|
|
|
- self.node_artifact_repository = node_artifact_repository
|
|
|
- self.trace_span_repository = trace_span_repository
|
|
|
- self.execution_dispatcher = execution_dispatcher
|
|
|
- self.workflow_client = workflow_client
|
|
|
- self.event_client = event_client
|
|
|
- self.task_queue_publisher = task_queue_publisher
|
|
|
-
|
|
|
- def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
|
|
|
- initial_node = payload.initial_node or self._plan_initial_node(payload)
|
|
|
- workflow_run = self.workflow_run_repository.create(
|
|
|
- app_id=payload.app_id,
|
|
|
- app_config_id=payload.app_config_id,
|
|
|
- workflow_id=payload.workflow_id,
|
|
|
- workflow_config_id=payload.workflow_config_id,
|
|
|
- session_id=payload.session_id,
|
|
|
- parent_run_id=payload.parent_run_id,
|
|
|
- root_run_id=payload.root_run_id,
|
|
|
- run_type=payload.run_type,
|
|
|
- trigger_type=payload.trigger_type,
|
|
|
- priority=payload.priority)
|
|
|
-
|
|
|
- node_run = None
|
|
|
- if initial_node is not None:
|
|
|
- self.workflow_run_repository.update_node_count(
|
|
|
- run_id=workflow_run.id,
|
|
|
- current_node_count=1)
|
|
|
- initial_config = self._resolve_node_config(
|
|
|
- workflow_config_id=payload.workflow_config_id,
|
|
|
- node_id=initial_node.node_id)
|
|
|
- scheduled_time, timeout_time = self._build_node_timing(initial_config)
|
|
|
- node_run = self.node_run_repository.create(
|
|
|
- run_id=workflow_run.id,
|
|
|
- node_id=initial_node.node_id,
|
|
|
- node_type=initial_node.node_type,
|
|
|
- status=initial_node.status,
|
|
|
- scheduled_time=scheduled_time,
|
|
|
- timeout_time=timeout_time)
|
|
|
- self._log_event(
|
|
|
- run_id=workflow_run.id,
|
|
|
- node_run_id=node_run.id,
|
|
|
- event_type="node_queued",
|
|
|
- message=f"initial node queued: {initial_node.node_id}",
|
|
|
- detail_json={
|
|
|
- "node_id": initial_node.node_id,
|
|
|
- "node_type": initial_node.node_type,
|
|
|
- "status": initial_node.status,
|
|
|
- })
|
|
|
- self._publish_node_run_to_queue(node_run)
|
|
|
-
|
|
|
- self._log_event(
|
|
|
- run_id=workflow_run.id,
|
|
|
- node_run_id=node_run.id if node_run is not None else None,
|
|
|
- event_type="run_created",
|
|
|
- message="workflow run created",
|
|
|
- detail_json={
|
|
|
- "workflow_id": payload.workflow_id,
|
|
|
- "workflow_config_id": payload.workflow_config_id,
|
|
|
- "session_id": payload.session_id,
|
|
|
- })
|
|
|
- self._publish_event(
|
|
|
- event_type="workflow.run.created",
|
|
|
- workflow_run=workflow_run,
|
|
|
- payload_json={
|
|
|
- "run_id": workflow_run.id,
|
|
|
- "workflow_id": workflow_run.workflow_id,
|
|
|
- "workflow_config_id": workflow_run.workflow_config_id,
|
|
|
- "status": workflow_run.status,
|
|
|
- })
|
|
|
-
|
|
|
- return workflow_run, node_run
|
|
|
-
|
|
|
- def list_runs(
|
|
|
- self,
|
|
|
- session_id: str | None = None,
|
|
|
- *,
|
|
|
- limit: int = 50) -> list[WorkflowRun]:
|
|
|
- return self.workflow_run_repository.list_by_scope(
|
|
|
- session_id=session_id,
|
|
|
- limit=limit)
|
|
|
-
|
|
|
- def list_node_runs(self, run_id: str) -> list[NodeRun]:
|
|
|
- return self.node_run_repository.list_by_run(run_id=run_id)
|
|
|
-
|
|
|
- def list_execution_logs(
|
|
|
- self,
|
|
|
- run_id: str | None = None,
|
|
|
- node_run_id: str | None = None):
|
|
|
- return self.execution_log_repository.list_by_scope(
|
|
|
- run_id=run_id,
|
|
|
- node_run_id=node_run_id)
|
|
|
-
|
|
|
- def list_node_artifacts(
|
|
|
- self,
|
|
|
- run_id: str | None = None,
|
|
|
- node_run_id: str | None = None,
|
|
|
- artifact_type: str | None = None):
|
|
|
- return self.node_artifact_repository.list_by_scope(
|
|
|
- run_id=run_id,
|
|
|
- node_run_id=node_run_id,
|
|
|
- artifact_type=artifact_type)
|
|
|
-
|
|
|
- def list_trace_spans(
|
|
|
- self,
|
|
|
- run_id: str | None = None,
|
|
|
- node_run_id: str | None = None,
|
|
|
- span_type: str | None = None):
|
|
|
- return self.trace_span_repository.list_by_scope(
|
|
|
- run_id=run_id,
|
|
|
- node_run_id=node_run_id,
|
|
|
- span_type=span_type)
|
|
|
-
|
|
|
- def get_debug_snapshot(
|
|
|
- self,
|
|
|
- *,
|
|
|
- run_id: str) -> RuntimeDebugSnapshot | None:
|
|
|
- workflow_run = self.workflow_run_repository.get_by_id(run_id)
|
|
|
- if workflow_run is None:
|
|
|
- return None
|
|
|
- return self._build_debug_snapshot(workflow_run)
|
|
|
-
|
|
|
- def update_run_status(
|
|
|
- self,
|
|
|
- run_id: str,
|
|
|
- payload: WorkflowRunStatusUpdateRequest) -> WorkflowRun | None:
|
|
|
- entity = self.workflow_run_repository.update_status(
|
|
|
- run_id=run_id,
|
|
|
- status=payload.status,
|
|
|
- error_code=payload.error_code,
|
|
|
- error_message=payload.error_message)
|
|
|
- if entity is not None:
|
|
|
- self._publish_event(
|
|
|
- event_type=f"workflow.run.{entity.status}",
|
|
|
- workflow_run=entity,
|
|
|
- payload_json={
|
|
|
- "run_id": entity.id,
|
|
|
- "status": entity.status,
|
|
|
- "error_code": entity.error_code,
|
|
|
- })
|
|
|
- return entity
|
|
|
-
|
|
|
- def update_node_run_status(
|
|
|
- self,
|
|
|
- node_run_id: str,
|
|
|
- payload: NodeRunStatusUpdateRequest) -> NodeRun | None:
|
|
|
- node_run = self.node_run_repository.update_status(
|
|
|
- node_run_id=node_run_id,
|
|
|
- status=payload.status,
|
|
|
- worker_key=payload.worker_key,
|
|
|
- error_code=payload.error_code,
|
|
|
- error_message=payload.error_message,
|
|
|
- output_text=payload.output_text,
|
|
|
- output_json=payload.output_json)
|
|
|
- if node_run is None:
|
|
|
- return None
|
|
|
-
|
|
|
- self._log_event(
|
|
|
- run_id=node_run.run_id,
|
|
|
- node_run_id=node_run.id,
|
|
|
- event_type="node_status_updated",
|
|
|
- message=f"node status updated to {payload.status}",
|
|
|
- detail_json={
|
|
|
- "node_id": node_run.node_id,
|
|
|
- "node_type": node_run.node_type,
|
|
|
- "status": payload.status,
|
|
|
- "error_code": payload.error_code,
|
|
|
- })
|
|
|
- self._publish_event(
|
|
|
- event_type=f"workflow.node.{node_run.status}",
|
|
|
- workflow_run=None,
|
|
|
- node_run=node_run,
|
|
|
- payload_json={
|
|
|
- "run_id": node_run.run_id,
|
|
|
- "node_run_id": node_run.id,
|
|
|
- "node_id": node_run.node_id,
|
|
|
- "node_type": node_run.node_type,
|
|
|
- "status": node_run.status,
|
|
|
- "error_code": node_run.error_code,
|
|
|
- })
|
|
|
-
|
|
|
- if payload.status == "completed":
|
|
|
- self._schedule_successor_nodes(node_run)
|
|
|
- if payload.status == "failed":
|
|
|
- workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
|
|
|
- if workflow_run is not None:
|
|
|
- node_config = self._resolve_node_config(
|
|
|
- workflow_config_id=workflow_run.workflow_config_id,
|
|
|
- node_id=node_run.node_id)
|
|
|
- self._schedule_compensation_node(node_run=node_run, node_config=node_config)
|
|
|
-
|
|
|
- self._sync_workflow_run_status_from_nodes(
|
|
|
- run_id=node_run.run_id)
|
|
|
- return node_run
|
|
|
-
|
|
|
- def execute_node_run(
|
|
|
- self,
|
|
|
- node_run_id: str,
|
|
|
- payload: NodeRunExecuteRequest) -> tuple[WorkflowRun, NodeRun, str] | None:
|
|
|
- node_run = self.node_run_repository.get_by_id(node_run_id)
|
|
|
- if node_run is None:
|
|
|
- return None
|
|
|
-
|
|
|
- workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
|
|
|
- if workflow_run is None:
|
|
|
- return None
|
|
|
-
|
|
|
- if workflow_run.status == "paused":
|
|
|
- return workflow_run, node_run, "debug_paused"
|
|
|
-
|
|
|
- if node_run.status in {"completed", "failed", "skipped"}:
|
|
|
- executor_name = self.execution_dispatcher.resolve_executor(
|
|
|
- node_run.node_type
|
|
|
- ).executor_name
|
|
|
- return workflow_run, node_run, executor_name
|
|
|
-
|
|
|
- node_config = self._resolve_node_config(
|
|
|
- workflow_config_id=workflow_run.workflow_config_id,
|
|
|
- node_id=node_run.node_id)
|
|
|
- if self._node_has_timed_out(node_run):
|
|
|
- timed_out_node_run = self.update_node_run_status(
|
|
|
- node_run_id=node_run.id,
|
|
|
- payload=NodeRunStatusUpdateRequest(
|
|
|
- status="failed",
|
|
|
- worker_key=payload.worker_key,
|
|
|
- error_code="node_timeout",
|
|
|
- error_message=f"node timed out: {node_run.node_id}",
|
|
|
- output_json={
|
|
|
- "timeout_time": node_run.timeout_time.isoformat()
|
|
|
- if node_run.timeout_time is not None
|
|
|
- else None,
|
|
|
- }))
|
|
|
- if timed_out_node_run is None:
|
|
|
- return None
|
|
|
- executor_name = self.execution_dispatcher.resolve_executor(
|
|
|
- node_run.node_type
|
|
|
- ).executor_name
|
|
|
- return workflow_run, timed_out_node_run, executor_name
|
|
|
-
|
|
|
- running_node_run = self.node_run_repository.update_status(
|
|
|
- node_run_id=node_run_id,
|
|
|
- status="running",
|
|
|
- worker_key=payload.worker_key)
|
|
|
- if running_node_run is None:
|
|
|
- return None
|
|
|
-
|
|
|
- self._log_event(
|
|
|
- run_id=running_node_run.run_id,
|
|
|
- node_run_id=running_node_run.id,
|
|
|
- event_type="node_execution_started",
|
|
|
- message=f"executing node {running_node_run.node_id}",
|
|
|
- detail_json={
|
|
|
- "node_id": running_node_run.node_id,
|
|
|
- "node_type": running_node_run.node_type,
|
|
|
- "worker_key": payload.worker_key,
|
|
|
- })
|
|
|
-
|
|
|
- context = self._build_execution_context(
|
|
|
- workflow_run=workflow_run,
|
|
|
- node_run=running_node_run,
|
|
|
- worker_key=payload.worker_key,
|
|
|
- node_config_json=node_config)
|
|
|
- executor_name = self.execution_dispatcher.resolve_executor(
|
|
|
- running_node_run.node_type
|
|
|
- ).executor_name
|
|
|
- trace_span = self.trace_span_repository.start(
|
|
|
- run_id=running_node_run.run_id,
|
|
|
- node_run_id=running_node_run.id,
|
|
|
- parent_span_id=None,
|
|
|
- span_type="node_execution",
|
|
|
- name=f"{running_node_run.node_type}:{running_node_run.node_id}",
|
|
|
- attributes_json={
|
|
|
- "node_id": running_node_run.node_id,
|
|
|
- "node_type": running_node_run.node_type,
|
|
|
- "executor_name": executor_name,
|
|
|
- "worker_key": payload.worker_key,
|
|
|
- })
|
|
|
-
|
|
|
- try:
|
|
|
- result, executor_name = self.execution_dispatcher.execute(
|
|
|
- context=context,
|
|
|
- request=payload)
|
|
|
- except Exception as exc:
|
|
|
- result = NodeExecutionResultContract(
|
|
|
- status="failed",
|
|
|
- worker_key=payload.worker_key,
|
|
|
- error_code="executor_error",
|
|
|
- error_message=str(exc))
|
|
|
-
|
|
|
- if result.status == "failed" and self._should_retry_node(
|
|
|
- node_run=running_node_run,
|
|
|
- node_config_json=context.node_config_json):
|
|
|
- retry_time, retry_timeout_time = self._build_retry_timing(context.node_config_json)
|
|
|
- retried_node_run = self.node_run_repository.requeue_for_retry(
|
|
|
- node_run_id=running_node_run.id,
|
|
|
- scheduled_time=retry_time,
|
|
|
- timeout_time=retry_timeout_time,
|
|
|
- error_code=result.error_code,
|
|
|
- error_message=result.error_message,
|
|
|
- output_text=result.output_text,
|
|
|
- output_json={
|
|
|
- **(result.output_json or {}),
|
|
|
- "retry_scheduled_time": retry_time.isoformat(),
|
|
|
- "retry_reason": result.error_code or "node_failed",
|
|
|
- })
|
|
|
- if retried_node_run is None:
|
|
|
- return None
|
|
|
- self._publish_node_run_to_queue(retried_node_run)
|
|
|
- self.trace_span_repository.finish(
|
|
|
- span_id=trace_span.id,
|
|
|
- status="error",
|
|
|
- error_code=result.error_code,
|
|
|
- error_message=result.error_message,
|
|
|
- attributes_json={
|
|
|
- "node_status": "queued",
|
|
|
- "executor_name": executor_name,
|
|
|
- "retry_scheduled": True,
|
|
|
- "attempt_no": retried_node_run.attempt_no,
|
|
|
- })
|
|
|
- self._log_event(
|
|
|
- run_id=retried_node_run.run_id,
|
|
|
- node_run_id=retried_node_run.id,
|
|
|
- event_type="node_retry_scheduled",
|
|
|
- message=f"node retry scheduled: {retried_node_run.node_id}",
|
|
|
- detail_json={
|
|
|
- "node_id": retried_node_run.node_id,
|
|
|
- "attempt_no": retried_node_run.attempt_no,
|
|
|
- "scheduled_time": retry_time.isoformat(),
|
|
|
- "error_code": result.error_code,
|
|
|
- })
|
|
|
- self._sync_workflow_run_status_from_nodes(
|
|
|
- run_id=retried_node_run.run_id)
|
|
|
- workflow_run = self.workflow_run_repository.get_by_id(retried_node_run.run_id)
|
|
|
- if workflow_run is None:
|
|
|
- return None
|
|
|
- return workflow_run, retried_node_run, executor_name
|
|
|
-
|
|
|
- final_node_run = self.update_node_run_status(
|
|
|
- node_run_id=running_node_run.id,
|
|
|
- payload=NodeRunStatusUpdateRequest(
|
|
|
- status=result.status,
|
|
|
- worker_key=result.worker_key,
|
|
|
- error_code=result.error_code,
|
|
|
- error_message=result.error_message,
|
|
|
- output_text=result.output_text,
|
|
|
- output_json=result.output_json))
|
|
|
- if final_node_run is None:
|
|
|
- return None
|
|
|
-
|
|
|
- self.trace_span_repository.finish(
|
|
|
- span_id=trace_span.id,
|
|
|
- status="ok" if final_node_run.status == "completed" else "error",
|
|
|
- error_code=final_node_run.error_code,
|
|
|
- error_message=final_node_run.error_message,
|
|
|
- attributes_json={
|
|
|
- "node_status": final_node_run.status,
|
|
|
- "executor_name": executor_name,
|
|
|
- "has_output_text": final_node_run.output_text is not None,
|
|
|
- "has_output_json": final_node_run.output_json is not None,
|
|
|
- })
|
|
|
- self._persist_node_execution_artifact(final_node_run)
|
|
|
-
|
|
|
- self._log_event(
|
|
|
- run_id=final_node_run.run_id,
|
|
|
- node_run_id=final_node_run.id,
|
|
|
- event_type="node_execution_finished",
|
|
|
- message=f"node execution finished with status {final_node_run.status}",
|
|
|
- detail_json={
|
|
|
- "node_id": final_node_run.node_id,
|
|
|
- "node_type": final_node_run.node_type,
|
|
|
- "executor_name": executor_name,
|
|
|
- "status": final_node_run.status,
|
|
|
- })
|
|
|
-
|
|
|
- workflow_run = self.workflow_run_repository.get_by_id(final_node_run.run_id)
|
|
|
- if workflow_run is None:
|
|
|
- return None
|
|
|
- return workflow_run, final_node_run, executor_name
|
|
|
-
|
|
|
- def execute_next_node_run(
|
|
|
- self,
|
|
|
- run_id: str,
|
|
|
- payload: NodeRunExecuteRequest) -> tuple[WorkflowRun, NodeRun, str] | None:
|
|
|
- next_node_run = self.node_run_repository.get_next_queued_by_run(
|
|
|
- run_id=run_id)
|
|
|
- if next_node_run is None:
|
|
|
- return None
|
|
|
- return self.execute_node_run(node_run_id=next_node_run.id, payload=payload)
|
|
|
-
|
|
|
- def execute_run(
|
|
|
- self,
|
|
|
- run_id: str,
|
|
|
- payload: RunExecuteRequest) -> tuple[WorkflowRun, list[NodeRun], list[str]] | None:
|
|
|
- workflow_run = self.workflow_run_repository.get_by_id(run_id)
|
|
|
- if workflow_run is None:
|
|
|
- return None
|
|
|
-
|
|
|
- executed_node_runs: list[NodeRun] = []
|
|
|
- executor_names: list[str] = []
|
|
|
-
|
|
|
- for _ in range(payload.max_steps):
|
|
|
- step_result = self.execute_next_node_run(
|
|
|
- run_id=run_id,
|
|
|
- payload=NodeRunExecuteRequest(worker_key=payload.worker_key))
|
|
|
- if step_result is None:
|
|
|
- break
|
|
|
-
|
|
|
- workflow_run, node_run, executor_name = step_result
|
|
|
- executed_node_runs.append(node_run)
|
|
|
- executor_names.append(executor_name)
|
|
|
-
|
|
|
- if node_run.status != "completed":
|
|
|
- break
|
|
|
-
|
|
|
- final_run = self.workflow_run_repository.get_by_id(run_id)
|
|
|
- if final_run is None:
|
|
|
- return None
|
|
|
- return final_run, executed_node_runs, executor_names
|
|
|
-
|
|
|
- def pause_run(
|
|
|
- self,
|
|
|
- *,
|
|
|
- run_id: str,
|
|
|
- reason: str = "debug_pause") -> RuntimeDebugSnapshot | None:
|
|
|
- workflow_run = self.workflow_run_repository.get_by_id(run_id)
|
|
|
- if workflow_run is None:
|
|
|
- return None
|
|
|
- self._sync_workflow_run_status_from_nodes(run_id=run_id)
|
|
|
- latest_run = self.workflow_run_repository.get_by_id(run_id)
|
|
|
- if latest_run is None:
|
|
|
- return None
|
|
|
- if latest_run.status in {"completed", "failed", "cancelled"}:
|
|
|
- paused_run = latest_run
|
|
|
- else:
|
|
|
- paused_run = self.workflow_run_repository.update_status(
|
|
|
- run_id=run_id,
|
|
|
- status="paused")
|
|
|
- if paused_run is None:
|
|
|
- return None
|
|
|
- self._log_event(
|
|
|
- run_id=run_id,
|
|
|
- node_run_id=None,
|
|
|
- event_type="debug_run_paused",
|
|
|
- message=f"workflow run paused: {reason}",
|
|
|
- detail_json={"reason": reason})
|
|
|
- return self._build_debug_snapshot(paused_run)
|
|
|
-
|
|
|
- def resume_run(
|
|
|
- self,
|
|
|
- *,
|
|
|
- run_id: str,
|
|
|
- reason: str = "debug_resume") -> RuntimeDebugSnapshot | None:
|
|
|
- workflow_run = self.workflow_run_repository.get_by_id(run_id)
|
|
|
- if workflow_run is None:
|
|
|
- return None
|
|
|
- resumed_run = self.workflow_run_repository.update_status(
|
|
|
- run_id=run_id,
|
|
|
- status="running")
|
|
|
- if resumed_run is None:
|
|
|
- return None
|
|
|
- self._log_event(
|
|
|
- run_id=run_id,
|
|
|
- node_run_id=None,
|
|
|
- event_type="debug_run_resumed",
|
|
|
- message=f"workflow run resumed: {reason}",
|
|
|
- detail_json={"reason": reason})
|
|
|
- return self._build_debug_snapshot(resumed_run)
|
|
|
-
|
|
|
- def step_debug_run(
|
|
|
- self,
|
|
|
- *,
|
|
|
- run_id: str,
|
|
|
- worker_key: str | None = None) -> tuple[RuntimeDebugSnapshot, list[NodeRun], list[str], str] | None:
|
|
|
- workflow_run = self.workflow_run_repository.get_by_id(run_id)
|
|
|
- if workflow_run is None:
|
|
|
- return None
|
|
|
- if workflow_run.status == "paused":
|
|
|
- self.workflow_run_repository.update_status(run_id=run_id, status="running")
|
|
|
-
|
|
|
- result = self.execute_next_node_run(
|
|
|
- run_id=run_id,
|
|
|
- payload=NodeRunExecuteRequest(worker_key=worker_key))
|
|
|
- executed_node_runs: list[NodeRun] = []
|
|
|
- executor_names: list[str] = []
|
|
|
- reason = "no_queued_node"
|
|
|
- if result is not None:
|
|
|
- _, node_run, executor_name = result
|
|
|
- executed_node_runs.append(node_run)
|
|
|
- executor_names.append(executor_name)
|
|
|
- reason = "step_completed"
|
|
|
-
|
|
|
- self._sync_workflow_run_status_from_nodes(run_id=run_id)
|
|
|
- latest_run = self.workflow_run_repository.get_by_id(run_id)
|
|
|
- if latest_run is None:
|
|
|
- return None
|
|
|
- if latest_run.status in {"completed", "failed", "cancelled"}:
|
|
|
- paused_run = latest_run
|
|
|
- else:
|
|
|
- paused_run = self.workflow_run_repository.update_status(
|
|
|
- run_id=run_id,
|
|
|
- status="paused")
|
|
|
- if paused_run is None:
|
|
|
- return None
|
|
|
- self._log_event(
|
|
|
- run_id=run_id,
|
|
|
- node_run_id=executed_node_runs[-1].id if executed_node_runs else None,
|
|
|
- event_type="debug_step_finished",
|
|
|
- message=f"debug step finished: {reason}",
|
|
|
- detail_json={
|
|
|
- "reason": reason,
|
|
|
- "executed_node_ids": [item.node_id for item in executed_node_runs],
|
|
|
- })
|
|
|
- return self._build_debug_snapshot(paused_run), executed_node_runs, executor_names, reason
|
|
|
-
|
|
|
- def continue_debug_run(
|
|
|
- self,
|
|
|
- *,
|
|
|
- run_id: str,
|
|
|
- payload: RuntimeDebugContinueRequest) -> tuple[RuntimeDebugSnapshot, list[NodeRun], list[str], str | None, str] | None:
|
|
|
- workflow_run = self.workflow_run_repository.get_by_id(run_id)
|
|
|
- if workflow_run is None:
|
|
|
- return None
|
|
|
- if workflow_run.status == "paused":
|
|
|
- self.workflow_run_repository.update_status(run_id=run_id, status="running")
|
|
|
-
|
|
|
- breakpoint_node_ids = set(payload.breakpoint_node_ids)
|
|
|
- executed_node_runs: list[NodeRun] = []
|
|
|
- executor_names: list[str] = []
|
|
|
- paused_before_node_id: str | None = None
|
|
|
- reason = "completed"
|
|
|
-
|
|
|
- for _ in range(payload.max_steps):
|
|
|
- next_node_run = self.node_run_repository.get_next_queued_by_run(
|
|
|
- run_id=run_id)
|
|
|
- if next_node_run is None:
|
|
|
- reason = "no_queued_node"
|
|
|
- break
|
|
|
- if next_node_run.node_id in breakpoint_node_ids:
|
|
|
- paused_before_node_id = next_node_run.node_id
|
|
|
- reason = "breakpoint_hit"
|
|
|
- break
|
|
|
-
|
|
|
- step_result = self.execute_node_run(
|
|
|
- node_run_id=next_node_run.id,
|
|
|
- payload=NodeRunExecuteRequest(worker_key=payload.worker_key))
|
|
|
- if step_result is None:
|
|
|
- reason = "node_not_found"
|
|
|
- break
|
|
|
- _, node_run, executor_name = step_result
|
|
|
- executed_node_runs.append(node_run)
|
|
|
- executor_names.append(executor_name)
|
|
|
- if node_run.status != "completed":
|
|
|
- reason = f"node_{node_run.status}"
|
|
|
- break
|
|
|
- else:
|
|
|
- reason = "max_steps_reached"
|
|
|
-
|
|
|
- self._sync_workflow_run_status_from_nodes(run_id=run_id)
|
|
|
- latest_run = self.workflow_run_repository.get_by_id(run_id)
|
|
|
- if latest_run is None:
|
|
|
- return None
|
|
|
- if latest_run.status in {"completed", "failed", "cancelled"}:
|
|
|
- paused_run = latest_run
|
|
|
- else:
|
|
|
- paused_run = self.workflow_run_repository.update_status(
|
|
|
- run_id=run_id,
|
|
|
- status="paused")
|
|
|
- if paused_run is None:
|
|
|
- return None
|
|
|
- self._log_event(
|
|
|
- run_id=run_id,
|
|
|
- node_run_id=executed_node_runs[-1].id if executed_node_runs else None,
|
|
|
- event_type="debug_continue_paused",
|
|
|
- message=f"debug continue paused: {reason}",
|
|
|
- detail_json={
|
|
|
- "reason": reason,
|
|
|
- "paused_before_node_id": paused_before_node_id,
|
|
|
- "executed_node_ids": [item.node_id for item in executed_node_runs],
|
|
|
- "breakpoint_node_ids": list(breakpoint_node_ids),
|
|
|
- })
|
|
|
- return (
|
|
|
- self._build_debug_snapshot(paused_run),
|
|
|
- executed_node_runs,
|
|
|
- executor_names,
|
|
|
- paused_before_node_id,
|
|
|
- reason)
|
|
|
-
|
|
|
- def resume_human_node_run(
|
|
|
- self,
|
|
|
- *,
|
|
|
- node_run_id: str,
|
|
|
- payload: HumanNodeResumeRequest) -> tuple[WorkflowRun, NodeRun, str] | None:
|
|
|
- node_run = self.node_run_repository.get_by_id(node_run_id)
|
|
|
- if node_run is None:
|
|
|
- return None
|
|
|
-
|
|
|
- output_json = dict(node_run.output_json or {})
|
|
|
- existing_human_task_id = output_json.get("human_task_id")
|
|
|
- if existing_human_task_id is not None and existing_human_task_id != payload.human_task_id:
|
|
|
- return None
|
|
|
-
|
|
|
- if existing_human_task_id is None:
|
|
|
- output_json["human_task_id"] = payload.human_task_id
|
|
|
- self.node_run_repository.update_status(
|
|
|
- node_run_id=node_run.id,
|
|
|
- status="pending",
|
|
|
- worker_key=payload.worker_key,
|
|
|
- output_json=output_json)
|
|
|
-
|
|
|
- return self.execute_node_run(
|
|
|
- node_run_id=node_run_id,
|
|
|
- payload=NodeRunExecuteRequest(worker_key=payload.worker_key))
|
|
|
-
|
|
|
- def execute_next_claimed_node_run(
|
|
|
- self,
|
|
|
- *,
|
|
|
- worker_key: str,
|
|
|
- lease_seconds: int,
|
|
|
- redis_client: object | None = None) -> tuple[WorkflowRun, NodeRun, str, int] | None:
|
|
|
- released_lease_count = self.node_run_repository.release_expired_leases(
|
|
|
- now_time=datetime.utcnow())
|
|
|
- claimed_node_run = self.node_run_repository.claim_next_queued(
|
|
|
- worker_key=worker_key,
|
|
|
- lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds))
|
|
|
- if claimed_node_run is None:
|
|
|
- return None
|
|
|
-
|
|
|
- if redis_client is not None:
|
|
|
- from core_shared.redis_primitives import DistributedLock, IdempotencyStore
|
|
|
-
|
|
|
- lock = DistributedLock(
|
|
|
- client=redis_client,
|
|
|
- name=f"node-run:{claimed_node_run.id}:lock",
|
|
|
- ttl_seconds=lease_seconds)
|
|
|
- if not lock.acquire():
|
|
|
- return None
|
|
|
- idempotency_store = IdempotencyStore(
|
|
|
- client=redis_client,
|
|
|
- prefix="node-run-idempotency")
|
|
|
- if not idempotency_store.begin(key=claimed_node_run.id):
|
|
|
- lock.release()
|
|
|
- return None
|
|
|
- else:
|
|
|
- lock = None
|
|
|
- idempotency_store = None
|
|
|
-
|
|
|
- try:
|
|
|
- result = self.execute_node_run(
|
|
|
- node_run_id=claimed_node_run.id,
|
|
|
- payload=NodeRunExecuteRequest(worker_key=worker_key))
|
|
|
- if idempotency_store is not None and result is not None:
|
|
|
- _, node_run, executor_name = result
|
|
|
- idempotency_store.complete(
|
|
|
- key=claimed_node_run.id,
|
|
|
- result={
|
|
|
- "status": node_run.status,
|
|
|
- "node_run_id": node_run.id,
|
|
|
- "executor_name": executor_name,
|
|
|
- })
|
|
|
- finally:
|
|
|
- if lock is not None:
|
|
|
- lock.release()
|
|
|
- if result is None:
|
|
|
- return None
|
|
|
-
|
|
|
- workflow_run, node_run, executor_name = result
|
|
|
- return workflow_run, node_run, executor_name, released_lease_count
|
|
|
-
|
|
|
- def _persist_node_execution_artifact(self, node_run: NodeRun) -> None:
|
|
|
- if node_run.output_text is None and node_run.output_json is None:
|
|
|
- return
|
|
|
-
|
|
|
- size_bytes = len(node_run.output_text.encode("utf-8")) if node_run.output_text else None
|
|
|
- self.node_artifact_repository.create(
|
|
|
- run_id=node_run.run_id,
|
|
|
- node_run_id=node_run.id,
|
|
|
- node_id=node_run.node_id,
|
|
|
- artifact_type="execution_result",
|
|
|
- name=f"{node_run.node_id}-execution-result",
|
|
|
- mime_type="application/json" if node_run.output_json is not None else "text/plain",
|
|
|
- content_text=node_run.output_text,
|
|
|
- content_json=node_run.output_json,
|
|
|
- size_bytes=size_bytes)
|
|
|
-
|
|
|
- def _plan_initial_node(self, payload: RunCreateRequest) -> InitialNodeContract | None:
|
|
|
- if self.workflow_client is None:
|
|
|
- return None
|
|
|
- workflow_config = self.workflow_client.get_workflow_config(
|
|
|
- workflow_config_id=payload.workflow_config_id)
|
|
|
- return derive_initial_node(workflow_config)
|
|
|
-
|
|
|
- def _resolve_node_config(
|
|
|
- self,
|
|
|
- *,
|
|
|
- workflow_config_id: str,
|
|
|
- node_id: str) -> dict[str, JSONValue]:
|
|
|
- if self.workflow_client is None:
|
|
|
- return {}
|
|
|
- workflow_config = self.workflow_client.get_workflow_config(
|
|
|
- workflow_config_id=workflow_config_id)
|
|
|
- return derive_node_config(workflow_config, node_id)
|
|
|
-
|
|
|
- def _schedule_successor_nodes(self, node_run: NodeRun) -> None:
|
|
|
- if self.workflow_client is None:
|
|
|
- return
|
|
|
-
|
|
|
- workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
|
|
|
- if workflow_run is None:
|
|
|
- return
|
|
|
-
|
|
|
- workflow_config = self.workflow_client.get_workflow_config(
|
|
|
- workflow_config_id=workflow_run.workflow_config_id)
|
|
|
- run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
|
|
|
- self._build_run_state_maps(
|
|
|
- run_id=node_run.run_id)
|
|
|
- )
|
|
|
- successor_nodes = derive_successor_nodes(
|
|
|
- workflow_config,
|
|
|
- node_run.node_id,
|
|
|
- current_output_json=node_run.output_json,
|
|
|
- run_state_json=run_state_json,
|
|
|
- node_output_json_by_node_id=node_output_json_by_node_id,
|
|
|
- node_output_text_by_node_id=node_output_text_by_node_id)
|
|
|
- if not successor_nodes:
|
|
|
- return
|
|
|
-
|
|
|
- existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
|
|
|
- run_id=node_run.run_id,
|
|
|
- node_ids=[item.node_id for item in successor_nodes])
|
|
|
- existing_node_counts: dict[str, int] = {}
|
|
|
- for item in existing_nodes:
|
|
|
- existing_node_counts[item.node_id] = existing_node_counts.get(item.node_id, 0) + 1
|
|
|
-
|
|
|
- for successor in successor_nodes:
|
|
|
- successor_config = derive_node_config(workflow_config, successor.node_id)
|
|
|
- if not self._is_join_ready(
|
|
|
- workflow_config=workflow_config,
|
|
|
- run_node_runs=self.node_run_repository.list_by_run(
|
|
|
- run_id=node_run.run_id),
|
|
|
- successor_node_id=successor.node_id,
|
|
|
- successor_node_type=successor.node_type,
|
|
|
- successor_config=successor_config):
|
|
|
- self._log_event(
|
|
|
- run_id=node_run.run_id,
|
|
|
- node_run_id=None,
|
|
|
- event_type="join_waiting",
|
|
|
- message=f"join node waiting for predecessors: {successor.node_id}",
|
|
|
- detail_json={
|
|
|
- "node_id": successor.node_id,
|
|
|
- "source_node_id": node_run.node_id,
|
|
|
- })
|
|
|
- continue
|
|
|
- if not self._can_schedule_repeated_node(
|
|
|
- successor_config,
|
|
|
- existing_count=existing_node_counts.get(successor.node_id, 0)):
|
|
|
- continue
|
|
|
- scheduled_time, timeout_time = self._build_node_timing(successor_config)
|
|
|
- created = self.node_run_repository.create(
|
|
|
- run_id=node_run.run_id,
|
|
|
- parent_node_run_id=node_run.id,
|
|
|
- node_id=successor.node_id,
|
|
|
- node_type=successor.node_type,
|
|
|
- status=successor.status,
|
|
|
- scheduled_time=scheduled_time,
|
|
|
- timeout_time=timeout_time)
|
|
|
- self._publish_node_run_to_queue(created)
|
|
|
- existing_node_counts[successor.node_id] = (
|
|
|
- existing_node_counts.get(successor.node_id, 0) + 1
|
|
|
- )
|
|
|
- self._log_event(
|
|
|
- run_id=node_run.run_id,
|
|
|
- node_run_id=None,
|
|
|
- event_type="node_queued",
|
|
|
- message=f"successor node queued: {successor.node_id}",
|
|
|
- detail_json={
|
|
|
- "node_id": successor.node_id,
|
|
|
- "node_type": successor.node_type,
|
|
|
- "status": successor.status,
|
|
|
- "source_node_id": node_run.node_id,
|
|
|
- })
|
|
|
-
|
|
|
- def _build_execution_context(
|
|
|
- self,
|
|
|
- *,
|
|
|
- workflow_run: WorkflowRun,
|
|
|
- node_run: NodeRun,
|
|
|
- worker_key: str | None,
|
|
|
- node_config_json: dict[str, JSONValue] | None = None) -> NodeExecutionContextContract:
|
|
|
- run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
|
|
|
- self._build_run_state_maps(
|
|
|
- run_id=node_run.run_id)
|
|
|
- )
|
|
|
- return NodeExecutionContextContract(
|
|
|
- run_id=node_run.run_id,
|
|
|
- node_run_id=node_run.id,
|
|
|
- node_id=node_run.node_id,
|
|
|
- node_type=node_run.node_type,
|
|
|
- node_config_json=node_config_json
|
|
|
- if node_config_json is not None
|
|
|
- else self._resolve_node_config(
|
|
|
- workflow_config_id=workflow_run.workflow_config_id,
|
|
|
- node_id=node_run.node_id),
|
|
|
- run_state_json=run_state_json,
|
|
|
- node_output_json_by_node_id=node_output_json_by_node_id,
|
|
|
- node_output_text_by_node_id=node_output_text_by_node_id,
|
|
|
- worker_key=worker_key)
|
|
|
-
|
|
|
- def _build_run_state_maps(
|
|
|
- self,
|
|
|
- *,
|
|
|
- run_id: str) -> tuple[
|
|
|
- dict[str, JSONValue],
|
|
|
- dict[str, dict[str, JSONValue]],
|
|
|
- dict[str, str],
|
|
|
- ]:
|
|
|
- node_runs = self.node_run_repository.list_by_run(run_id=run_id)
|
|
|
- run_state_json: dict[str, JSONValue] = {}
|
|
|
- node_output_json_by_node_id: dict[str, dict[str, JSONValue]] = {}
|
|
|
- node_output_text_by_node_id: dict[str, str] = {}
|
|
|
-
|
|
|
- for item in node_runs:
|
|
|
- if item.output_json is not None:
|
|
|
- node_output_json_by_node_id[item.node_id] = dict(item.output_json)
|
|
|
-
|
|
|
- state_updates = item.output_json.get("state_updates")
|
|
|
- if isinstance(state_updates, dict):
|
|
|
- for state_key, state_value in state_updates.items():
|
|
|
- run_state_json[str(state_key)] = state_value
|
|
|
-
|
|
|
- if item.output_text is not None:
|
|
|
- node_output_text_by_node_id[item.node_id] = item.output_text
|
|
|
-
|
|
|
- return run_state_json, node_output_json_by_node_id, node_output_text_by_node_id
|
|
|
-
|
|
|
- def _build_debug_snapshot(self, workflow_run: WorkflowRun) -> RuntimeDebugSnapshot:
|
|
|
- node_runs = self.node_run_repository.list_by_run(
|
|
|
- run_id=workflow_run.id)
|
|
|
- run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
|
|
|
- self._build_run_state_maps(
|
|
|
- run_id=workflow_run.id)
|
|
|
- )
|
|
|
- return RuntimeDebugSnapshot(
|
|
|
- run=workflow_run,
|
|
|
- node_runs=node_runs,
|
|
|
- run_state_json=run_state_json,
|
|
|
- node_output_json_by_node_id=node_output_json_by_node_id,
|
|
|
- node_output_text_by_node_id=node_output_text_by_node_id,
|
|
|
- queued_node_ids=[
|
|
|
- item.node_id for item in node_runs if item.status in {"pending", "queued"}
|
|
|
- ],
|
|
|
- running_node_ids=[item.node_id for item in node_runs if item.status == "running"],
|
|
|
- completed_node_ids=[
|
|
|
- item.node_id for item in node_runs if item.status in {"completed", "skipped"}
|
|
|
- ],
|
|
|
- failed_node_ids=[item.node_id for item in node_runs if item.status == "failed"],
|
|
|
- execution_logs=self.execution_log_repository.list_by_scope(
|
|
|
- run_id=workflow_run.id),
|
|
|
- node_artifacts=self.node_artifact_repository.list_by_scope(
|
|
|
- run_id=workflow_run.id),
|
|
|
- trace_spans=self.trace_span_repository.list_by_scope(
|
|
|
- run_id=workflow_run.id))
|
|
|
-
|
|
|
- def _build_node_timing(
|
|
|
- self,
|
|
|
- node_config_json: dict[str, JSONValue]) -> tuple[datetime, datetime | None]:
|
|
|
- now = datetime.utcnow()
|
|
|
- delay_seconds = self._read_int_value(node_config_json, "delay_seconds", default=0)
|
|
|
- timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
|
|
|
- scheduled_time = now + timedelta(seconds=max(delay_seconds, 0))
|
|
|
- timeout_time = (
|
|
|
- scheduled_time + timedelta(seconds=timeout_seconds)
|
|
|
- if timeout_seconds > 0
|
|
|
- else None
|
|
|
- )
|
|
|
- return scheduled_time, timeout_time
|
|
|
-
|
|
|
- def _node_has_timed_out(self, node_run: NodeRun) -> bool:
|
|
|
- return node_run.timeout_time is not None and node_run.timeout_time <= datetime.utcnow()
|
|
|
-
|
|
|
- def _should_retry_node(
|
|
|
- self,
|
|
|
- *,
|
|
|
- node_run: NodeRun,
|
|
|
- node_config_json: dict[str, JSONValue]) -> bool:
|
|
|
- retry_policy = self._read_dict_value(node_config_json, "retry_policy")
|
|
|
- max_attempts = self._read_int_value(retry_policy, "max_attempts", default=1)
|
|
|
- return max_attempts > node_run.attempt_no
|
|
|
-
|
|
|
- def _read_retry_delay_seconds(self, node_config_json: dict[str, JSONValue]) -> int:
|
|
|
- retry_policy = self._read_dict_value(node_config_json, "retry_policy")
|
|
|
- return self._read_int_value(retry_policy, "retry_delay_seconds", default=0)
|
|
|
-
|
|
|
- def _build_retry_timing(
|
|
|
- self,
|
|
|
- node_config_json: dict[str, JSONValue]) -> tuple[datetime, datetime | None]:
|
|
|
- retry_time = datetime.utcnow() + timedelta(
|
|
|
- seconds=self._read_retry_delay_seconds(node_config_json)
|
|
|
- )
|
|
|
- timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
|
|
|
- timeout_time = (
|
|
|
- retry_time + timedelta(seconds=timeout_seconds)
|
|
|
- if timeout_seconds > 0
|
|
|
- else None
|
|
|
- )
|
|
|
- return retry_time, timeout_time
|
|
|
-
|
|
|
- def _is_join_ready(
|
|
|
- self,
|
|
|
- *,
|
|
|
- workflow_config: WorkflowConfigContract,
|
|
|
- run_node_runs: list[NodeRun],
|
|
|
- successor_node_id: str,
|
|
|
- successor_node_type: str,
|
|
|
- successor_config: dict[str, JSONValue]) -> bool:
|
|
|
- join_policy = self._read_string_value(successor_config, "join_policy")
|
|
|
- if join_policy is None and successor_node_type != "join":
|
|
|
- return True
|
|
|
- workflow = self._parse_workflow(workflow_config)
|
|
|
- if workflow is None:
|
|
|
- return True
|
|
|
- predecessor_ids = [
|
|
|
- edge.source for edge in workflow.edges if edge.target == successor_node_id
|
|
|
- ]
|
|
|
- if not predecessor_ids:
|
|
|
- return True
|
|
|
- completed_node_ids = {
|
|
|
- item.node_id
|
|
|
- for item in run_node_runs
|
|
|
- if item.status in {"completed", "skipped"}
|
|
|
- }
|
|
|
- if join_policy in {None, "all_completed"}:
|
|
|
- return all(predecessor_id in completed_node_ids for predecessor_id in predecessor_ids)
|
|
|
- if join_policy == "any_completed":
|
|
|
- return any(predecessor_id in completed_node_ids for predecessor_id in predecessor_ids)
|
|
|
- return True
|
|
|
-
|
|
|
- def _can_schedule_repeated_node(
|
|
|
- self,
|
|
|
- node_config_json: dict[str, JSONValue],
|
|
|
- *,
|
|
|
- existing_count: int) -> bool:
|
|
|
- if existing_count == 0:
|
|
|
- return True
|
|
|
- allow_loop = self._read_bool_value(node_config_json, "allow_loop", default=False)
|
|
|
- max_iterations = self._read_int_value(node_config_json, "max_iterations", default=1)
|
|
|
- return allow_loop and existing_count < max_iterations
|
|
|
-
|
|
|
- def _schedule_compensation_node(
|
|
|
- self,
|
|
|
- *,
|
|
|
- node_run: NodeRun,
|
|
|
- node_config: dict[str, JSONValue]) -> None:
|
|
|
- compensation_node_id = self._read_string_value(node_config, "compensation_node_id")
|
|
|
- if compensation_node_id is None:
|
|
|
- compensation_config = self._read_dict_value(node_config, "compensation")
|
|
|
- compensation_node_id = self._read_string_value(compensation_config, "node_id")
|
|
|
- if compensation_node_id is None:
|
|
|
- return
|
|
|
-
|
|
|
- workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
|
|
|
- if workflow_run is None:
|
|
|
- return
|
|
|
- compensation_config = self._resolve_node_config(
|
|
|
- workflow_config_id=workflow_run.workflow_config_id,
|
|
|
- node_id=compensation_node_id)
|
|
|
- existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
|
|
|
- run_id=node_run.run_id,
|
|
|
- node_ids=[compensation_node_id])
|
|
|
- if existing_nodes and not self._can_schedule_repeated_node(
|
|
|
- compensation_config,
|
|
|
- existing_count=len(existing_nodes)):
|
|
|
- return
|
|
|
- compensation_node_type = self._resolve_workflow_node_type(
|
|
|
- workflow_config_id=workflow_run.workflow_config_id,
|
|
|
- node_id=compensation_node_id) or "compensation"
|
|
|
- scheduled_time, timeout_time = self._build_node_timing(compensation_config)
|
|
|
- created = self.node_run_repository.create(
|
|
|
- run_id=node_run.run_id,
|
|
|
- parent_node_run_id=node_run.id,
|
|
|
- node_id=compensation_node_id,
|
|
|
- node_type=compensation_node_type,
|
|
|
- status="queued",
|
|
|
- scheduled_time=scheduled_time,
|
|
|
- timeout_time=timeout_time)
|
|
|
- self._publish_node_run_to_queue(created)
|
|
|
- self._log_event(
|
|
|
- run_id=node_run.run_id,
|
|
|
- node_run_id=created.id,
|
|
|
- event_type="compensation_queued",
|
|
|
- message=f"compensation node queued: {compensation_node_id}",
|
|
|
- detail_json={
|
|
|
- "failed_node_id": node_run.node_id,
|
|
|
- "compensation_node_id": compensation_node_id,
|
|
|
- })
|
|
|
-
|
|
|
- def _parse_workflow(self, workflow_config: WorkflowConfigContract):
|
|
|
- return parse_workflow_definition(workflow_config.dsl_json)
|
|
|
-
|
|
|
- def _resolve_workflow_node_type(
|
|
|
- self,
|
|
|
- *,
|
|
|
- workflow_config_id: str,
|
|
|
- node_id: str) -> str | None:
|
|
|
- if self.workflow_client is None:
|
|
|
- return None
|
|
|
- workflow_config = self.workflow_client.get_workflow_config(
|
|
|
- workflow_config_id=workflow_config_id)
|
|
|
- workflow = self._parse_workflow(workflow_config)
|
|
|
- if workflow is None:
|
|
|
- return None
|
|
|
- for node in workflow.nodes:
|
|
|
- if node.id == node_id:
|
|
|
- return node.type
|
|
|
- return None
|
|
|
-
|
|
|
- def _read_string_value(self, payload: dict[str, JSONValue], key: str) -> str | None:
|
|
|
- value = payload.get(key)
|
|
|
- if isinstance(value, str) and value:
|
|
|
- return value
|
|
|
- return None
|
|
|
-
|
|
|
- def _read_bool_value(
|
|
|
- self,
|
|
|
- payload: dict[str, JSONValue],
|
|
|
- key: str,
|
|
|
- *,
|
|
|
- default: bool) -> bool:
|
|
|
- value = payload.get(key)
|
|
|
- if isinstance(value, bool):
|
|
|
- return value
|
|
|
- return default
|
|
|
-
|
|
|
- def _read_int_value(
|
|
|
- self,
|
|
|
- payload: dict[str, JSONValue],
|
|
|
- key: str,
|
|
|
- *,
|
|
|
- default: int) -> int:
|
|
|
- value = payload.get(key)
|
|
|
- if isinstance(value, int) and not isinstance(value, bool):
|
|
|
- return value
|
|
|
- return default
|
|
|
-
|
|
|
- def _read_dict_value(
|
|
|
- self,
|
|
|
- payload: dict[str, JSONValue],
|
|
|
- key: str) -> dict[str, JSONValue]:
|
|
|
- value = payload.get(key)
|
|
|
- if isinstance(value, dict):
|
|
|
- return {str(item_key): item_value for item_key, item_value in value.items()}
|
|
|
- return {}
|
|
|
-
|
|
|
- def _sync_workflow_run_status_from_nodes(self, *, run_id: str) -> None:
|
|
|
- node_runs = self.node_run_repository.list_by_run(run_id=run_id)
|
|
|
- if not node_runs:
|
|
|
- return
|
|
|
-
|
|
|
- self.workflow_run_repository.update_node_count(
|
|
|
- run_id=run_id,
|
|
|
- current_node_count=len(node_runs))
|
|
|
-
|
|
|
- next_status, error_code, error_message = self._derive_run_status(node_runs)
|
|
|
- self.workflow_run_repository.update_status(
|
|
|
- run_id=run_id,
|
|
|
- status=next_status,
|
|
|
- error_code=error_code,
|
|
|
- error_message=error_message)
|
|
|
- self._log_event(
|
|
|
- run_id=run_id,
|
|
|
- node_run_id=None,
|
|
|
- event_type="run_status_synced",
|
|
|
- message=f"workflow run status synced to {next_status}",
|
|
|
- detail_json={
|
|
|
- "status": next_status,
|
|
|
- "error_code": error_code,
|
|
|
- })
|
|
|
-
|
|
|
- def _derive_run_status(
|
|
|
- self,
|
|
|
- node_runs: list[NodeRun]) -> tuple[WorkflowRunStatus, str | None, str | None]:
|
|
|
- statuses = {node_run.status for node_run in node_runs}
|
|
|
-
|
|
|
- active_statuses: set[NodeRunStatus] = {"pending", "queued", "running"}
|
|
|
- if statuses.intersection(active_statuses):
|
|
|
- return "running", None, None
|
|
|
-
|
|
|
- if "failed" in statuses:
|
|
|
- failed_node = next((item for item in node_runs if item.status == "failed"), None)
|
|
|
- error_code = failed_node.error_code if failed_node is not None else None
|
|
|
- error_message = failed_node.error_message if failed_node is not None else None
|
|
|
- return "failed", error_code, error_message
|
|
|
-
|
|
|
- terminal_statuses: set[NodeRunStatus] = {"completed", "skipped"}
|
|
|
- if statuses and statuses.issubset(terminal_statuses):
|
|
|
- return "completed", None, None
|
|
|
-
|
|
|
- return "running", None, None
|
|
|
-
|
|
|
- def _log_event(
|
|
|
- self,
|
|
|
- *,
|
|
|
- run_id: str,
|
|
|
- node_run_id: str | None,
|
|
|
- event_type: str,
|
|
|
- message: str,
|
|
|
- detail_json: dict[str, JSONValue] | None,
|
|
|
- level: str = "info") -> None:
|
|
|
- self.execution_log_repository.create(
|
|
|
- run_id=run_id,
|
|
|
- node_run_id=node_run_id,
|
|
|
- event_type=event_type,
|
|
|
- level=level,
|
|
|
- message=message,
|
|
|
- detail_json=detail_json)
|
|
|
-
|
|
|
- def _publish_event(
|
|
|
- self,
|
|
|
- *,
|
|
|
- event_type: str,
|
|
|
- payload_json: dict[str, JSONValue],
|
|
|
- workflow_run: WorkflowRun | None = None,
|
|
|
- node_run: NodeRun | None = None) -> None:
|
|
|
- if self.event_client is None:
|
|
|
- return
|
|
|
- aggregate_id = workflow_run.id if workflow_run is not None else None
|
|
|
- if aggregate_id is None and node_run is not None:
|
|
|
- aggregate_id = node_run.id
|
|
|
- aggregate_type = "workflow_run" if workflow_run is not None else "node_run"
|
|
|
- correlation_id = workflow_run.session_id if workflow_run is not None else None
|
|
|
- if correlation_id is None and node_run is not None:
|
|
|
- correlation_id = node_run.run_id
|
|
|
- try:
|
|
|
- self.event_client.publish_event(
|
|
|
- EventPublishContract(
|
|
|
- event_type=event_type,
|
|
|
- source_service="runtime-service",
|
|
|
- aggregate_type=aggregate_type,
|
|
|
- aggregate_id=aggregate_id,
|
|
|
- correlation_id=correlation_id,
|
|
|
- payload_json=payload_json)
|
|
|
- )
|
|
|
- except EventServiceClientError:
|
|
|
- return
|
|
|
-
|
|
|
- def _publish_node_run_to_queue(self, node_run: NodeRun) -> None:
|
|
|
- if node_run.status != "queued" or self.task_queue_publisher is None:
|
|
|
- return
|
|
|
- self.task_queue_publisher.publish_runtime_node_run(
|
|
|
- node_run_id=node_run.id)
|
|
|
-
|
|
|
-
|
|
|
-def build_runtime_application_service(
|
|
|
- *,
|
|
|
- db: Session,
|
|
|
- settings: RuntimeServiceSettings) -> RuntimeApplicationService:
|
|
|
- redis_client = try_build_redis_client(settings.redis_url)
|
|
|
- return RuntimeApplicationService(
|
|
|
- workflow_run_repository=WorkflowRunRepository(db),
|
|
|
- node_run_repository=NodeRunRepository(db),
|
|
|
- execution_log_repository=ExecutionLogRepository(db),
|
|
|
- node_artifact_repository=NodeArtifactRepository(db),
|
|
|
- trace_span_repository=TraceSpanRepository(db),
|
|
|
- execution_dispatcher=build_node_execution_dispatcher_with_clients(
|
|
|
- code_runner_client=CodeRunnerClient(base_url=settings.code_runner_service_url),
|
|
|
- model_gateway_client=ModelGatewayClient(base_url=settings.model_gateway_service_url),
|
|
|
- tool_client=ToolServiceClient(base_url=settings.tool_service_url),
|
|
|
- human_client=HumanServiceClient(
|
|
|
- base_url=settings.human_service_url,
|
|
|
- timeout_seconds=settings.human_service_timeout_seconds),
|
|
|
- knowledge_client=KnowledgeServiceClient(
|
|
|
- base_url=settings.knowledge_service_url,
|
|
|
- timeout_seconds=settings.knowledge_service_timeout_seconds)),
|
|
|
- workflow_client=WorkflowServiceClient(base_url=settings.workflow_service_url),
|
|
|
- event_client=EventServiceClient(
|
|
|
- base_url=settings.event_service_url,
|
|
|
- timeout_seconds=settings.event_service_timeout_seconds),
|
|
|
- task_queue_publisher=(
|
|
|
- TaskQueuePublisher(client=redis_client) if redis_client is not None else None
|
|
|
- ))
|