from datetime import datetime, timedelta, timezone

from sqlalchemy.orm import Session

from core_dsl import parse_workflow_definition
from core_domain import (
    InitialNodeContract,
    NodeExecutionContextContract,
    NodeExecutionResultContract,
    NodeRunStatus,
    WorkflowVersionContract,
    WorkflowRunStatus,
)
from app.db.models import NodeRun, WorkflowRun
from app.domain.repositories import (
    ExecutionLogRepository,
    NodeArtifactRepository,
    NodeRunRepository,
    TraceSpanRepository,
    WorkflowRunRepository,
)
from app.infrastructure.executors import (
    NodeExecutionDispatcher,
    build_node_execution_dispatcher_with_clients,
)
from app.infrastructure.code_runner_client import CodeRunnerClient
from app.infrastructure.model_gateway_client import ModelGatewayClient
from app.infrastructure.planner import (
    derive_initial_node,
    derive_node_config,
    derive_successor_nodes,
)
from app.infrastructure.tool_client import ToolServiceClient
from app.infrastructure.workflow_client import WorkflowServiceClient
from app.bootstrap.settings import RuntimeServiceSettings
from app.schemas.run import (
    NodeRunExecuteRequest,
    NodeRunStatusUpdateRequest,
    RunCreateRequest,
    RunExecuteRequest,
    WorkflowRunStatusUpdateRequest,
)
from core_shared import JSONValue


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Drop-in replacement for the deprecated ``datetime.utcnow()``: the value
    is taken in UTC and the tzinfo is stripped, so it stays comparable with
    the naive timestamps this module already produces and compares against.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class RuntimeApplicationService:
    """Application service that creates, schedules and executes workflow runs.

    The service coordinates the workflow-run, node-run, execution-log,
    node-artifact and trace-span repositories with a node execution
    dispatcher. When a ``workflow_client`` is supplied it is used to fetch
    workflow versions for planning (initial node, node config, successors,
    node types); without it those planning steps degrade to no-ops.
    """

    def __init__(
        self,
        workflow_run_repository: WorkflowRunRepository,
        node_run_repository: NodeRunRepository,
        execution_log_repository: ExecutionLogRepository,
        node_artifact_repository: NodeArtifactRepository,
        trace_span_repository: TraceSpanRepository,
        execution_dispatcher: NodeExecutionDispatcher,
        workflow_client: WorkflowServiceClient | None = None,
    ) -> None:
        """Store the collaborating repositories, dispatcher and optional client."""
        self.workflow_run_repository = workflow_run_repository
        self.node_run_repository = node_run_repository
        self.execution_log_repository = execution_log_repository
        self.node_artifact_repository = node_artifact_repository
        self.trace_span_repository = trace_span_repository
        self.execution_dispatcher = execution_dispatcher
        self.workflow_client = workflow_client

    # ------------------------------------------------------------------
    # Run creation and read-only listings
    # ------------------------------------------------------------------

    def create_run(
        self,
        payload: RunCreateRequest,
    ) -> tuple[WorkflowRun, NodeRun | None]:
        """Create a workflow run and, when one can be determined, its first node run.

        The initial node is taken from ``payload.initial_node`` or, failing
        that, planned from the workflow version. A ``node_queued`` event is
        logged when a node run is created, followed by a ``run_created``
        event in every case.

        Returns:
            The created workflow run and the initial node run (``None`` when
            no initial node was available).
        """
        initial_node = payload.initial_node or self._plan_initial_node(payload)
        workflow_run = self.workflow_run_repository.create(
            tenant_id=payload.tenant_id,
            app_id=payload.app_id,
            app_version_id=payload.app_version_id,
            workflow_id=payload.workflow_id,
            workflow_version_id=payload.workflow_version_id,
            session_id=payload.session_id,
            parent_run_id=payload.parent_run_id,
            root_run_id=payload.root_run_id,
            run_type=payload.run_type,
            trigger_type=payload.trigger_type,
            priority=payload.priority,
        )

        node_run = None
        if initial_node is not None:
            self.workflow_run_repository.update_node_count(
                run_id=workflow_run.id,
                current_node_count=1,
            )
            initial_config = self._resolve_node_config(
                tenant_id=payload.tenant_id,
                workflow_version_id=payload.workflow_version_id,
                node_id=initial_node.node_id,
            )
            scheduled_time, timeout_time = self._build_node_timing(initial_config)
            node_run = self.node_run_repository.create(
                tenant_id=payload.tenant_id,
                run_id=workflow_run.id,
                node_id=initial_node.node_id,
                node_type=initial_node.node_type,
                status=initial_node.status,
                scheduled_time=scheduled_time,
                timeout_time=timeout_time,
            )
            self._log_event(
                tenant_id=payload.tenant_id,
                run_id=workflow_run.id,
                node_run_id=node_run.id,
                event_type="node_queued",
                message=f"initial node queued: {initial_node.node_id}",
                detail_json={
                    "node_id": initial_node.node_id,
                    "node_type": initial_node.node_type,
                    "status": initial_node.status,
                },
            )

        self._log_event(
            tenant_id=payload.tenant_id,
            run_id=workflow_run.id,
            node_run_id=node_run.id if node_run is not None else None,
            event_type="run_created",
            message="workflow run created",
            detail_json={
                "workflow_id": payload.workflow_id,
                "workflow_version_id": payload.workflow_version_id,
                "session_id": payload.session_id,
            },
        )
        return workflow_run, node_run

    def list_runs(self, tenant_id: str, session_id: str | None = None) -> list[WorkflowRun]:
        """List workflow runs for a tenant, optionally narrowed to one session."""
        return self.workflow_run_repository.list_by_scope(
            tenant_id=tenant_id,
            session_id=session_id,
        )

    def list_node_runs(self, tenant_id: str, run_id: str) -> list[NodeRun]:
        """List the node runs that belong to one workflow run."""
        return self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)

    def list_execution_logs(
        self,
        tenant_id: str,
        run_id: str | None = None,
        node_run_id: str | None = None,
    ):
        """List execution log entries for a tenant, optionally filtered by run / node run."""
        return self.execution_log_repository.list_by_scope(
            tenant_id=tenant_id,
            run_id=run_id,
            node_run_id=node_run_id,
        )

    def list_node_artifacts(
        self,
        tenant_id: str,
        run_id: str | None = None,
        node_run_id: str | None = None,
        artifact_type: str | None = None,
    ):
        """List node artifacts for a tenant, optionally filtered by run, node run or type."""
        return self.node_artifact_repository.list_by_scope(
            tenant_id=tenant_id,
            run_id=run_id,
            node_run_id=node_run_id,
            artifact_type=artifact_type,
        )

    def list_trace_spans(
        self,
        tenant_id: str,
        run_id: str | None = None,
        node_run_id: str | None = None,
        span_type: str | None = None,
    ):
        """List trace spans for a tenant, optionally filtered by run, node run or span type."""
        return self.trace_span_repository.list_by_scope(
            tenant_id=tenant_id,
            run_id=run_id,
            node_run_id=node_run_id,
            span_type=span_type,
        )

    # ------------------------------------------------------------------
    # Status updates
    # ------------------------------------------------------------------

    def update_run_status(
        self,
        run_id: str,
        payload: WorkflowRunStatusUpdateRequest,
    ) -> WorkflowRun | None:
        """Persist a new status (and error details) on a workflow run.

        Returns whatever the repository returns, which may be ``None``.
        """
        return self.workflow_run_repository.update_status(
            run_id=run_id,
            status=payload.status,
            error_code=payload.error_code,
            error_message=payload.error_message,
        )

    def update_node_run_status(
        self,
        node_run_id: str,
        payload: NodeRunStatusUpdateRequest,
    ) -> NodeRun | None:
        """Update a node run's status and apply the follow-up scheduling.

        After the update is stored and logged:

        * ``completed`` schedules the node's successors;
        * ``failed`` schedules a compensation node when one is configured;
        * in every case the workflow run status is re-derived from its nodes.

        Returns:
            The updated node run, or ``None`` when the repository did not
            return one for ``node_run_id``.
        """
        node_run = self.node_run_repository.update_status(
            node_run_id=node_run_id,
            status=payload.status,
            worker_key=payload.worker_key,
            error_code=payload.error_code,
            error_message=payload.error_message,
            output_text=payload.output_text,
            output_json=payload.output_json,
        )
        if node_run is None:
            return None

        self._log_event(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
            node_run_id=node_run.id,
            event_type="node_status_updated",
            message=f"node status updated to {payload.status}",
            detail_json={
                "node_id": node_run.node_id,
                "node_type": node_run.node_type,
                "status": payload.status,
                "error_code": payload.error_code,
            },
        )

        if payload.status == "completed":
            self._schedule_successor_nodes(node_run)

        if payload.status == "failed":
            workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
            if workflow_run is not None:
                node_config = self._resolve_node_config(
                    tenant_id=node_run.tenant_id,
                    workflow_version_id=workflow_run.workflow_version_id,
                    node_id=node_run.node_id,
                )
                self._schedule_compensation_node(node_run=node_run, node_config=node_config)

        # Runs after any successor/compensation node was created so the
        # derived run status and node count include the new node runs.
        self._sync_workflow_run_status_from_nodes(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
        )
        return node_run

    # ------------------------------------------------------------------
    # Execution
    # ------------------------------------------------------------------

    def execute_node_run(
        self,
        node_run_id: str,
        payload: NodeRunExecuteRequest,
    ) -> tuple[WorkflowRun, NodeRun, str] | None:
        """Execute one node run through the dispatcher and record the outcome.

        Flow:

        1. Node runs already ``completed``/``failed``/``skipped`` are returned
           unchanged.
        2. A node run whose ``timeout_time`` has passed is marked ``failed``
           with error code ``node_timeout`` without being executed.
        3. Otherwise the node run is marked ``running``, a trace span is
           started and the dispatcher is invoked. Any exception raised by
           the dispatcher is converted into a ``failed`` result with error
           code ``executor_error``.
        4. A failed result is re-queued when the retry policy still allows
           another attempt; otherwise the final status is stored, the span
           is finished and an execution artifact is persisted.

        Returns:
            ``(workflow_run, node_run, executor_name)``, or ``None`` when the
            node run, its workflow run, or an intermediate repository update
            could not be resolved.
        """
        node_run = self.node_run_repository.get_by_id(node_run_id)
        if node_run is None:
            return None
        workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
        if workflow_run is None:
            return None

        # Node runs in one of these statuses are returned without re-execution.
        if node_run.status in {"completed", "failed", "skipped"}:
            executor_name = self.execution_dispatcher.resolve_executor(
                node_run.node_type
            ).executor_name
            return workflow_run, node_run, executor_name

        node_config = self._resolve_node_config(
            tenant_id=node_run.tenant_id,
            workflow_version_id=workflow_run.workflow_version_id,
            node_id=node_run.node_id,
        )

        if self._node_has_timed_out(node_run):
            timed_out_node_run = self.update_node_run_status(
                node_run_id=node_run.id,
                payload=NodeRunStatusUpdateRequest(
                    status="failed",
                    worker_key=payload.worker_key,
                    error_code="node_timeout",
                    error_message=f"node timed out: {node_run.node_id}",
                    output_json={
                        "timeout_time": node_run.timeout_time.isoformat()
                        if node_run.timeout_time is not None
                        else None,
                    },
                ),
            )
            if timed_out_node_run is None:
                return None
            executor_name = self.execution_dispatcher.resolve_executor(
                node_run.node_type
            ).executor_name
            return workflow_run, timed_out_node_run, executor_name

        running_node_run = self.node_run_repository.update_status(
            node_run_id=node_run_id,
            status="running",
            worker_key=payload.worker_key,
        )
        if running_node_run is None:
            return None

        self._log_event(
            tenant_id=running_node_run.tenant_id,
            run_id=running_node_run.run_id,
            node_run_id=running_node_run.id,
            event_type="node_execution_started",
            message=f"executing node {running_node_run.node_id}",
            detail_json={
                "node_id": running_node_run.node_id,
                "node_type": running_node_run.node_type,
                "worker_key": payload.worker_key,
            },
        )

        context = self._build_execution_context(
            workflow_run=workflow_run,
            node_run=running_node_run,
            worker_key=payload.worker_key,
            node_config_json=node_config,
        )
        # Resolved up front so the span attributes and the error path below
        # have an executor name even when ``execute`` raises.
        executor_name = self.execution_dispatcher.resolve_executor(
            running_node_run.node_type
        ).executor_name
        trace_span = self.trace_span_repository.start(
            tenant_id=running_node_run.tenant_id,
            run_id=running_node_run.run_id,
            node_run_id=running_node_run.id,
            parent_span_id=None,
            span_type="node_execution",
            name=f"{running_node_run.node_type}:{running_node_run.node_id}",
            attributes_json={
                "node_id": running_node_run.node_id,
                "node_type": running_node_run.node_type,
                "executor_name": executor_name,
                "worker_key": payload.worker_key,
            },
        )

        try:
            result, executor_name = self.execution_dispatcher.execute(
                context=context,
                request=payload,
            )
        except Exception as exc:
            # Deliberate boundary: an executor crash becomes a failed node
            # result so retry/compensation handling still runs.
            result = NodeExecutionResultContract(
                status="failed",
                worker_key=payload.worker_key,
                error_code="executor_error",
                error_message=str(exc),
            )

        if result.status == "failed" and self._should_retry_node(
            node_run=running_node_run,
            node_config_json=context.node_config_json,
        ):
            retry_time, retry_timeout_time = self._build_retry_timing(context.node_config_json)
            retried_node_run = self.node_run_repository.requeue_for_retry(
                node_run_id=running_node_run.id,
                scheduled_time=retry_time,
                timeout_time=retry_timeout_time,
                error_code=result.error_code,
                error_message=result.error_message,
                output_text=result.output_text,
                output_json={
                    **(result.output_json or {}),
                    "retry_scheduled_time": retry_time.isoformat(),
                    "retry_reason": result.error_code or "node_failed",
                },
            )
            if retried_node_run is None:
                return None

            self.trace_span_repository.finish(
                span_id=trace_span.id,
                status="error",
                error_code=result.error_code,
                error_message=result.error_message,
                attributes_json={
                    "node_status": "queued",
                    "executor_name": executor_name,
                    "retry_scheduled": True,
                    "attempt_no": retried_node_run.attempt_no,
                },
            )
            self._log_event(
                tenant_id=retried_node_run.tenant_id,
                run_id=retried_node_run.run_id,
                node_run_id=retried_node_run.id,
                event_type="node_retry_scheduled",
                message=f"node retry scheduled: {retried_node_run.node_id}",
                detail_json={
                    "node_id": retried_node_run.node_id,
                    "attempt_no": retried_node_run.attempt_no,
                    "scheduled_time": retry_time.isoformat(),
                    "error_code": result.error_code,
                },
            )
            # The retry path bypasses ``update_node_run_status``, so the run
            # status has to be synced explicitly here.
            self._sync_workflow_run_status_from_nodes(
                tenant_id=retried_node_run.tenant_id,
                run_id=retried_node_run.run_id,
            )
            workflow_run = self.workflow_run_repository.get_by_id(retried_node_run.run_id)
            if workflow_run is None:
                return None
            return workflow_run, retried_node_run, executor_name

        final_node_run = self.update_node_run_status(
            node_run_id=running_node_run.id,
            payload=NodeRunStatusUpdateRequest(
                status=result.status,
                worker_key=result.worker_key,
                error_code=result.error_code,
                error_message=result.error_message,
                output_text=result.output_text,
                output_json=result.output_json,
            ),
        )
        if final_node_run is None:
            return None

        self.trace_span_repository.finish(
            span_id=trace_span.id,
            status="ok" if final_node_run.status == "completed" else "error",
            error_code=final_node_run.error_code,
            error_message=final_node_run.error_message,
            attributes_json={
                "node_status": final_node_run.status,
                "executor_name": executor_name,
                "has_output_text": final_node_run.output_text is not None,
                "has_output_json": final_node_run.output_json is not None,
            },
        )
        self._persist_node_execution_artifact(final_node_run)
        self._log_event(
            tenant_id=final_node_run.tenant_id,
            run_id=final_node_run.run_id,
            node_run_id=final_node_run.id,
            event_type="node_execution_finished",
            message=f"node execution finished with status {final_node_run.status}",
            detail_json={
                "node_id": final_node_run.node_id,
                "node_type": final_node_run.node_type,
                "executor_name": executor_name,
                "status": final_node_run.status,
            },
        )

        # Re-read the run: its status was synced during the status update.
        workflow_run = self.workflow_run_repository.get_by_id(final_node_run.run_id)
        if workflow_run is None:
            return None
        return workflow_run, final_node_run, executor_name

    def execute_next_node_run(
        self,
        tenant_id: str,
        run_id: str,
        payload: NodeRunExecuteRequest,
    ) -> tuple[WorkflowRun, NodeRun, str] | None:
        """Execute the next queued node run of a workflow run, if there is one."""
        next_node_run = self.node_run_repository.get_next_queued_by_run(
            tenant_id=tenant_id,
            run_id=run_id,
        )
        if next_node_run is None:
            return None
        return self.execute_node_run(node_run_id=next_node_run.id, payload=payload)

    def execute_run(
        self,
        tenant_id: str,
        run_id: str,
        payload: RunExecuteRequest,
    ) -> tuple[WorkflowRun, list[NodeRun], list[str]] | None:
        """Execute up to ``payload.max_steps`` queued node runs of one workflow run.

        Stepping stops early when no queued node run is left or when a step
        ends in any status other than ``completed``.

        Returns:
            ``(workflow_run, executed_node_runs, executor_names)`` with the
            workflow run re-read after the last step, or ``None`` when the
            run does not exist or belongs to a different tenant.
        """
        workflow_run = self.workflow_run_repository.get_by_id(run_id)
        if workflow_run is None or workflow_run.tenant_id != tenant_id:
            return None

        executed_node_runs: list[NodeRun] = []
        executor_names: list[str] = []
        for _ in range(payload.max_steps):
            step_result = self.execute_next_node_run(
                tenant_id=tenant_id,
                run_id=run_id,
                payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
            )
            if step_result is None:
                break
            # The per-step workflow run is not needed; the run is re-read below.
            _, node_run, executor_name = step_result
            executed_node_runs.append(node_run)
            executor_names.append(executor_name)
            if node_run.status != "completed":
                break

        final_run = self.workflow_run_repository.get_by_id(run_id)
        if final_run is None:
            return None
        return final_run, executed_node_runs, executor_names

    def execute_next_claimed_node_run(
        self,
        *,
        worker_key: str,
        lease_seconds: int,
    ) -> tuple[WorkflowRun, NodeRun, str, int] | None:
        """Claim the next queued node run for a worker and execute it.

        Expired leases are released first; the claim is made with a lease
        that expires ``lease_seconds`` from now.

        Returns:
            ``(workflow_run, node_run, executor_name, released_lease_count)``,
            or ``None`` when nothing could be claimed or executed.
        """
        released_lease_count = self.node_run_repository.release_expired_leases(
            now_time=_utcnow(),
        )
        claimed_node_run = self.node_run_repository.claim_next_queued(
            worker_key=worker_key,
            lease_expire_time=_utcnow() + timedelta(seconds=lease_seconds),
        )
        if claimed_node_run is None:
            return None

        result = self.execute_node_run(
            node_run_id=claimed_node_run.id,
            payload=NodeRunExecuteRequest(worker_key=worker_key),
        )
        if result is None:
            return None
        workflow_run, node_run, executor_name = result
        return workflow_run, node_run, executor_name, released_lease_count

    def _persist_node_execution_artifact(self, node_run: NodeRun) -> None:
        """Store the node run's output as an ``execution_result`` artifact.

        Nothing is stored when the node run has neither text nor JSON output.
        ``size_bytes`` is the UTF-8 length of the text output and is ``None``
        when the text output is missing or empty.
        """
        if node_run.output_text is None and node_run.output_json is None:
            return
        size_bytes = len(node_run.output_text.encode("utf-8")) if node_run.output_text else None
        self.node_artifact_repository.create(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
            node_run_id=node_run.id,
            node_id=node_run.node_id,
            artifact_type="execution_result",
            name=f"{node_run.node_id}-execution-result",
            mime_type="application/json" if node_run.output_json is not None else "text/plain",
            content_text=node_run.output_text,
            content_json=node_run.output_json,
            size_bytes=size_bytes,
        )

    # ------------------------------------------------------------------
    # Planning helpers
    # ------------------------------------------------------------------

    def _plan_initial_node(self, payload: RunCreateRequest) -> InitialNodeContract | None:
        """Derive the initial node from the workflow version (``None`` without a client)."""
        if self.workflow_client is None:
            return None
        workflow_version = self.workflow_client.get_workflow_version(
            tenant_id=payload.tenant_id,
            workflow_version_id=payload.workflow_version_id,
        )
        return derive_initial_node(workflow_version)

    def _resolve_node_config(
        self,
        *,
        tenant_id: str,
        workflow_version_id: str,
        node_id: str,
    ) -> dict[str, JSONValue]:
        """Return the config of ``node_id`` in the workflow version (``{}`` without a client)."""
        if self.workflow_client is None:
            return {}
        workflow_version = self.workflow_client.get_workflow_version(
            tenant_id=tenant_id,
            workflow_version_id=workflow_version_id,
        )
        return derive_node_config(workflow_version, node_id)

    def _schedule_successor_nodes(self, node_run: NodeRun) -> None:
        """Create node runs for the successors of a finished node run.

        Successors are derived from the workflow version together with the
        current run state. A successor is skipped when its join condition is
        not yet satisfied (a ``join_waiting`` event is logged) or when it
        already has node runs in this run and repetition is not allowed by
        its config. Each created node run is logged as ``node_queued``.
        """
        if self.workflow_client is None:
            return
        workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
        if workflow_run is None:
            return

        workflow_version = self.workflow_client.get_workflow_version(
            tenant_id=node_run.tenant_id,
            workflow_version_id=workflow_run.workflow_version_id,
        )
        run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
            self._build_run_state_maps(
                tenant_id=node_run.tenant_id,
                run_id=node_run.run_id,
            )
        )
        successor_nodes = derive_successor_nodes(
            workflow_version,
            node_run.node_id,
            current_output_json=node_run.output_json,
            run_state_json=run_state_json,
            node_output_json_by_node_id=node_output_json_by_node_id,
            node_output_text_by_node_id=node_output_text_by_node_id,
        )
        if not successor_nodes:
            return

        existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
            node_ids=[item.node_id for item in successor_nodes],
        )
        # Number of node runs that already exist per successor node id.
        existing_node_counts: dict[str, int] = {}
        for item in existing_nodes:
            existing_node_counts[item.node_id] = existing_node_counts.get(item.node_id, 0) + 1

        for successor in successor_nodes:
            successor_config = derive_node_config(workflow_version, successor.node_id)
            # The node-run list is re-read on every iteration because this
            # loop itself creates node runs.
            if not self._is_join_ready(
                workflow_version=workflow_version,
                run_node_runs=self.node_run_repository.list_by_run(
                    tenant_id=node_run.tenant_id,
                    run_id=node_run.run_id,
                ),
                successor_node_id=successor.node_id,
                successor_node_type=successor.node_type,
                successor_config=successor_config,
            ):
                self._log_event(
                    tenant_id=node_run.tenant_id,
                    run_id=node_run.run_id,
                    node_run_id=None,
                    event_type="join_waiting",
                    message=f"join node waiting for predecessors: {successor.node_id}",
                    detail_json={
                        "node_id": successor.node_id,
                        "source_node_id": node_run.node_id,
                    },
                )
                continue

            if not self._can_schedule_repeated_node(
                successor_config,
                existing_count=existing_node_counts.get(successor.node_id, 0),
            ):
                continue

            scheduled_time, timeout_time = self._build_node_timing(successor_config)
            created = self.node_run_repository.create(
                tenant_id=node_run.tenant_id,
                run_id=node_run.run_id,
                parent_node_run_id=node_run.id,
                node_id=successor.node_id,
                node_type=successor.node_type,
                status=successor.status,
                scheduled_time=scheduled_time,
                timeout_time=timeout_time,
            )
            existing_node_counts[successor.node_id] = (
                existing_node_counts.get(successor.node_id, 0) + 1
            )
            # Attach the event to the node run that was just created, the
            # same way the initial-node and compensation events are logged.
            self._log_event(
                tenant_id=node_run.tenant_id,
                run_id=node_run.run_id,
                node_run_id=created.id,
                event_type="node_queued",
                message=f"successor node queued: {successor.node_id}",
                detail_json={
                    "node_id": successor.node_id,
                    "node_type": successor.node_type,
                    "status": successor.status,
                    "source_node_id": node_run.node_id,
                },
            )

    def _build_execution_context(
        self,
        *,
        workflow_run: WorkflowRun,
        node_run: NodeRun,
        worker_key: str | None,
        node_config_json: dict[str, JSONValue] | None = None,
    ) -> NodeExecutionContextContract:
        """Assemble the execution context handed to the dispatcher.

        ``node_config_json`` is used as given when provided; otherwise the
        node config is resolved from the workflow version.
        """
        run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
            self._build_run_state_maps(
                tenant_id=node_run.tenant_id,
                run_id=node_run.run_id,
            )
        )
        return NodeExecutionContextContract(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
            node_run_id=node_run.id,
            node_id=node_run.node_id,
            node_type=node_run.node_type,
            node_config_json=node_config_json
            if node_config_json is not None
            else self._resolve_node_config(
                tenant_id=node_run.tenant_id,
                workflow_version_id=workflow_run.workflow_version_id,
                node_id=node_run.node_id,
            ),
            run_state_json=run_state_json,
            node_output_json_by_node_id=node_output_json_by_node_id,
            node_output_text_by_node_id=node_output_text_by_node_id,
            worker_key=worker_key,
        )

    def _build_run_state_maps(
        self,
        *,
        tenant_id: str,
        run_id: str,
    ) -> tuple[
        dict[str, JSONValue],
        dict[str, dict[str, JSONValue]],
        dict[str, str],
    ]:
        """Collect run state and per-node outputs from the run's node runs.

        Returns:
            A tuple of

            * the run state, merged from every ``output_json["state_updates"]``
              dict in repository order (later node runs overwrite earlier
              keys);
            * JSON outputs keyed by node id;
            * text outputs keyed by node id.

            When a node id has several node runs, the last one listed wins.
        """
        node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
        run_state_json: dict[str, JSONValue] = {}
        node_output_json_by_node_id: dict[str, dict[str, JSONValue]] = {}
        node_output_text_by_node_id: dict[str, str] = {}
        for item in node_runs:
            if item.output_json is not None:
                node_output_json_by_node_id[item.node_id] = dict(item.output_json)
                state_updates = item.output_json.get("state_updates")
                if isinstance(state_updates, dict):
                    for state_key, state_value in state_updates.items():
                        run_state_json[str(state_key)] = state_value
            if item.output_text is not None:
                node_output_text_by_node_id[item.node_id] = item.output_text
        return run_state_json, node_output_json_by_node_id, node_output_text_by_node_id

    # ------------------------------------------------------------------
    # Timing, retry, join and loop policies
    # ------------------------------------------------------------------

    def _build_node_timing(
        self,
        node_config_json: dict[str, JSONValue],
    ) -> tuple[datetime, datetime | None]:
        """Compute ``(scheduled_time, timeout_time)`` for a new node run.

        ``delay_seconds`` (negative values are treated as 0) offsets the
        scheduled time from now; ``timeout_seconds`` is added to the
        scheduled time, and a non-positive value means no timeout.
        """
        now = _utcnow()
        delay_seconds = self._read_int_value(node_config_json, "delay_seconds", default=0)
        timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
        scheduled_time = now + timedelta(seconds=max(delay_seconds, 0))
        timeout_time = (
            scheduled_time + timedelta(seconds=timeout_seconds)
            if timeout_seconds > 0
            else None
        )
        return scheduled_time, timeout_time

    def _node_has_timed_out(self, node_run: NodeRun) -> bool:
        """Return whether the node run has a ``timeout_time`` that is not in the future."""
        return node_run.timeout_time is not None and node_run.timeout_time <= _utcnow()

    def _should_retry_node(
        self,
        *,
        node_run: NodeRun,
        node_config_json: dict[str, JSONValue],
    ) -> bool:
        """Return whether ``retry_policy.max_attempts`` (default 1) exceeds ``attempt_no``."""
        retry_policy = self._read_dict_value(node_config_json, "retry_policy")
        max_attempts = self._read_int_value(retry_policy, "max_attempts", default=1)
        return max_attempts > node_run.attempt_no

    def _read_retry_delay_seconds(self, node_config_json: dict[str, JSONValue]) -> int:
        """Return ``retry_policy.retry_delay_seconds`` from the node config (default 0)."""
        retry_policy = self._read_dict_value(node_config_json, "retry_policy")
        return self._read_int_value(retry_policy, "retry_delay_seconds", default=0)

    def _build_retry_timing(
        self,
        node_config_json: dict[str, JSONValue],
    ) -> tuple[datetime, datetime | None]:
        """Compute ``(retry_time, timeout_time)`` for a re-queued node run.

        The retry time is now plus the configured retry delay; the timeout
        is measured from the retry time, and a non-positive
        ``timeout_seconds`` means no timeout.
        """
        retry_time = _utcnow() + timedelta(
            seconds=self._read_retry_delay_seconds(node_config_json)
        )
        timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
        timeout_time = (
            retry_time + timedelta(seconds=timeout_seconds)
            if timeout_seconds > 0
            else None
        )
        return retry_time, timeout_time

    def _is_join_ready(
        self,
        *,
        workflow_version: WorkflowVersionContract,
        run_node_runs: list[NodeRun],
        successor_node_id: str,
        successor_node_type: str,
        successor_config: dict[str, JSONValue],
    ) -> bool:
        """Decide whether a successor's predecessors allow it to be scheduled.

        Nodes that are not of type ``join`` and have no ``join_policy`` are
        always ready. Otherwise the predecessors are the sources of the
        workflow edges targeting the successor, and a predecessor counts as
        done when it has a ``completed`` or ``skipped`` node run:

        * no policy or ``all_completed``: every predecessor must be done;
        * ``any_completed``: at least one predecessor must be done;
        * any other policy value: treated as ready.
        """
        join_policy = self._read_string_value(successor_config, "join_policy")
        if join_policy is None and successor_node_type != "join":
            return True

        workflow = self._parse_workflow(workflow_version)
        if workflow is None:
            return True

        predecessor_ids = [
            edge.source for edge in workflow.edges if edge.target == successor_node_id
        ]
        if not predecessor_ids:
            return True

        completed_node_ids = {
            item.node_id
            for item in run_node_runs
            if item.status in {"completed", "skipped"}
        }
        if join_policy in {None, "all_completed"}:
            return all(predecessor_id in completed_node_ids for predecessor_id in predecessor_ids)
        if join_policy == "any_completed":
            return any(predecessor_id in completed_node_ids for predecessor_id in predecessor_ids)
        return True

    def _can_schedule_repeated_node(
        self,
        node_config_json: dict[str, JSONValue],
        *,
        existing_count: int,
    ) -> bool:
        """Return whether another node run may be created for a node.

        A first node run is always allowed. Further ones require
        ``allow_loop`` to be true and ``existing_count`` to be below
        ``max_iterations`` (default 1).
        """
        if existing_count == 0:
            return True
        allow_loop = self._read_bool_value(node_config_json, "allow_loop", default=False)
        max_iterations = self._read_int_value(node_config_json, "max_iterations", default=1)
        return allow_loop and existing_count < max_iterations

    def _schedule_compensation_node(
        self,
        *,
        node_run: NodeRun,
        node_config: dict[str, JSONValue],
    ) -> None:
        """Queue the compensation node configured for a failed node run.

        The compensation node id is read from ``compensation_node_id`` or,
        as a fallback, from ``compensation.node_id``. Nothing is scheduled
        when no id is configured, when the workflow run cannot be loaded, or
        when the node already has node runs in this run and repetition is
        not allowed. The node type falls back to ``"compensation"`` when it
        cannot be resolved from the workflow.
        """
        compensation_node_id = self._read_string_value(node_config, "compensation_node_id")
        if compensation_node_id is None:
            compensation_config = self._read_dict_value(node_config, "compensation")
            compensation_node_id = self._read_string_value(compensation_config, "node_id")
        if compensation_node_id is None:
            return

        workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
        if workflow_run is None:
            return

        compensation_config = self._resolve_node_config(
            tenant_id=node_run.tenant_id,
            workflow_version_id=workflow_run.workflow_version_id,
            node_id=compensation_node_id,
        )
        existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
            node_ids=[compensation_node_id],
        )
        if existing_nodes and not self._can_schedule_repeated_node(
            compensation_config,
            existing_count=len(existing_nodes),
        ):
            return

        compensation_node_type = self._resolve_workflow_node_type(
            tenant_id=node_run.tenant_id,
            workflow_version_id=workflow_run.workflow_version_id,
            node_id=compensation_node_id,
        ) or "compensation"
        scheduled_time, timeout_time = self._build_node_timing(compensation_config)
        created = self.node_run_repository.create(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
            parent_node_run_id=node_run.id,
            node_id=compensation_node_id,
            node_type=compensation_node_type,
            status="queued",
            scheduled_time=scheduled_time,
            timeout_time=timeout_time,
        )
        self._log_event(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
            node_run_id=created.id,
            event_type="compensation_queued",
            message=f"compensation node queued: {compensation_node_id}",
            detail_json={
                "failed_node_id": node_run.node_id,
                "compensation_node_id": compensation_node_id,
            },
        )

    def _parse_workflow(self, workflow_version: WorkflowVersionContract):
        """Parse the workflow definition stored in ``workflow_version.dsl_json``."""
        return parse_workflow_definition(workflow_version.dsl_json)

    def _resolve_workflow_node_type(
        self,
        *,
        tenant_id: str,
        workflow_version_id: str,
        node_id: str,
    ) -> str | None:
        """Look up the type of ``node_id`` in the workflow definition.

        Returns ``None`` without a workflow client, when the definition does
        not parse, or when no node with that id exists.
        """
        if self.workflow_client is None:
            return None
        workflow_version = self.workflow_client.get_workflow_version(
            tenant_id=tenant_id,
            workflow_version_id=workflow_version_id,
        )
        workflow = self._parse_workflow(workflow_version)
        if workflow is None:
            return None
        for node in workflow.nodes:
            if node.id == node_id:
                return node.type
        return None

    # ------------------------------------------------------------------
    # Typed config readers
    # ------------------------------------------------------------------

    def _read_string_value(self, payload: dict[str, JSONValue], key: str) -> str | None:
        """Return ``payload[key]`` when it is a non-empty string, else ``None``."""
        value = payload.get(key)
        if isinstance(value, str) and value:
            return value
        return None

    def _read_bool_value(
        self,
        payload: dict[str, JSONValue],
        key: str,
        *,
        default: bool,
    ) -> bool:
        """Return ``payload[key]`` when it is a ``bool``, else ``default``."""
        value = payload.get(key)
        if isinstance(value, bool):
            return value
        return default

    def _read_int_value(
        self,
        payload: dict[str, JSONValue],
        key: str,
        *,
        default: int,
    ) -> int:
        """Return ``payload[key]`` when it is an ``int``, else ``default``.

        ``bool`` values are rejected explicitly because ``bool`` is a
        subclass of ``int``.
        """
        value = payload.get(key)
        if isinstance(value, int) and not isinstance(value, bool):
            return value
        return default

    def _read_dict_value(
        self,
        payload: dict[str, JSONValue],
        key: str,
    ) -> dict[str, JSONValue]:
        """Return a copy of ``payload[key]`` with string keys, or ``{}`` if it is not a dict."""
        value = payload.get(key)
        if isinstance(value, dict):
            return {str(item_key): item_value for item_key, item_value in value.items()}
        return {}

    # ------------------------------------------------------------------
    # Run status derivation and logging
    # ------------------------------------------------------------------

    def _sync_workflow_run_status_from_nodes(self, *, tenant_id: str, run_id: str) -> None:
        """Recompute the run's node count and status from its node runs.

        Does nothing when the run has no node runs. Otherwise the node count
        and the derived status are stored and a ``run_status_synced`` event
        is logged.
        """
        node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
        if not node_runs:
            return
        self.workflow_run_repository.update_node_count(
            run_id=run_id,
            current_node_count=len(node_runs),
        )
        next_status, error_code, error_message = self._derive_run_status(node_runs)
        self.workflow_run_repository.update_status(
            run_id=run_id,
            status=next_status,
            error_code=error_code,
            error_message=error_message,
        )
        self._log_event(
            tenant_id=tenant_id,
            run_id=run_id,
            node_run_id=None,
            event_type="run_status_synced",
            message=f"workflow run status synced to {next_status}",
            detail_json={
                "status": next_status,
                "error_code": error_code,
            },
        )

    def _derive_run_status(
        self,
        node_runs: list[NodeRun],
    ) -> tuple[WorkflowRunStatus, str | None, str | None]:
        """Derive ``(status, error_code, error_message)`` for a run from its node runs.

        Precedence:

        1. any ``pending``/``queued``/``running`` node run -> ``running``;
        2. any ``failed`` node run -> ``failed``, with the error of the first
           failed node run in list order;
        3. only ``completed``/``skipped`` node runs -> ``completed``;
        4. anything else -> ``running``.
        """
        statuses = {node_run.status for node_run in node_runs}
        active_statuses: set[NodeRunStatus] = {"pending", "queued", "running"}
        if statuses.intersection(active_statuses):
            return "running", None, None

        if "failed" in statuses:
            failed_node = next((item for item in node_runs if item.status == "failed"), None)
            error_code = failed_node.error_code if failed_node is not None else None
            error_message = failed_node.error_message if failed_node is not None else None
            return "failed", error_code, error_message

        terminal_statuses: set[NodeRunStatus] = {"completed", "skipped"}
        if statuses and statuses.issubset(terminal_statuses):
            return "completed", None, None
        return "running", None, None

    def _log_event(
        self,
        *,
        tenant_id: str,
        run_id: str,
        node_run_id: str | None,
        event_type: str,
        message: str,
        detail_json: dict[str, JSONValue] | None,
        level: str = "info",
    ) -> None:
        """Write one entry to the execution log repository."""
        self.execution_log_repository.create(
            tenant_id=tenant_id,
            run_id=run_id,
            node_run_id=node_run_id,
            event_type=event_type,
            level=level,
            message=message,
            detail_json=detail_json,
        )


def build_runtime_application_service(
    *,
    db: Session,
    settings: RuntimeServiceSettings,
) -> RuntimeApplicationService:
    """Build a ``RuntimeApplicationService`` wired to one database session.

    Every repository shares ``db``; the service clients are constructed from
    the base URLs found in ``settings``.
    """
    return RuntimeApplicationService(
        workflow_run_repository=WorkflowRunRepository(db),
        node_run_repository=NodeRunRepository(db),
        execution_log_repository=ExecutionLogRepository(db),
        node_artifact_repository=NodeArtifactRepository(db),
        trace_span_repository=TraceSpanRepository(db),
        execution_dispatcher=build_node_execution_dispatcher_with_clients(
            code_runner_client=CodeRunnerClient(base_url=settings.code_runner_service_url),
            model_gateway_client=ModelGatewayClient(base_url=settings.model_gateway_service_url),
            tool_client=ToolServiceClient(base_url=settings.tool_service_url),
        ),
        workflow_client=WorkflowServiceClient(base_url=settings.workflow_service_url),
    )