| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600 |
- from core_domain import (
- InitialNodeContract,
- NodeExecutionContextContract,
- NodeExecutionResultContract,
- NodeRunStatus,
- WorkflowRunStatus,
- )
- from app.db.models import NodeRun, WorkflowRun
- from app.domain.repositories import (
- ExecutionLogRepository,
- NodeArtifactRepository,
- NodeRunRepository,
- TraceSpanRepository,
- WorkflowRunRepository,
- )
- from app.infrastructure.executors import NodeExecutionDispatcher
- from app.infrastructure.planner import derive_initial_node, derive_node_config, derive_successor_nodes
- from app.infrastructure.workflow_client import WorkflowServiceClient
- from app.schemas.run import (
- NodeRunExecuteRequest,
- NodeRunStatusUpdateRequest,
- RunCreateRequest,
- RunExecuteRequest,
- WorkflowRunStatusUpdateRequest,
- )
- from core_shared import JSONValue
class RuntimeApplicationService:
    """Coordinate workflow runs and the node runs that make them up.

    The service creates runs, records node status transitions, executes node
    runs through a ``NodeExecutionDispatcher``, queues successor nodes derived
    from the workflow version, and keeps execution logs, node artifacts and
    trace spans in step with those transitions. All persistence goes through
    the injected repositories; the optional ``workflow_client`` is used only to
    fetch workflow versions for planning and node configuration.
    """

    # Node statuses after which a node run is never executed again.
    _FINISHED_NODE_STATUSES: frozenset[NodeRunStatus] = frozenset(
        {"completed", "failed", "skipped"}
    )
    # Finished node statuses that do not fail the enclosing workflow run.
    _SUCCESSFUL_NODE_STATUSES: frozenset[NodeRunStatus] = frozenset(
        {"completed", "skipped"}
    )

    def __init__(
        self,
        workflow_run_repository: WorkflowRunRepository,
        node_run_repository: NodeRunRepository,
        execution_log_repository: ExecutionLogRepository,
        node_artifact_repository: NodeArtifactRepository,
        trace_span_repository: TraceSpanRepository,
        execution_dispatcher: NodeExecutionDispatcher,
        workflow_client: WorkflowServiceClient | None = None,
    ) -> None:
        """Store the collaborators used by the service.

        Without a ``workflow_client`` no initial node is planned, node configs
        resolve to ``{}``, and no successor nodes are scheduled.
        """
        self.workflow_run_repository = workflow_run_repository
        self.node_run_repository = node_run_repository
        self.execution_log_repository = execution_log_repository
        self.node_artifact_repository = node_artifact_repository
        self.trace_span_repository = trace_span_repository
        self.execution_dispatcher = execution_dispatcher
        self.workflow_client = workflow_client

    def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
        """Create a workflow run and queue its initial node run, if any.

        The initial node is ``payload.initial_node`` when set; otherwise it is
        planned from the workflow version (see ``_plan_initial_node``). A
        ``node_queued`` log entry is written for the initial node, followed by
        a ``run_created`` entry for the run.

        Returns:
            ``(workflow_run, node_run)``; ``node_run`` is ``None`` when no
            initial node was available.
        """
        initial_node = payload.initial_node or self._plan_initial_node(payload)
        workflow_run = self.workflow_run_repository.create(
            tenant_id=payload.tenant_id,
            app_id=payload.app_id,
            app_version_id=payload.app_version_id,
            workflow_id=payload.workflow_id,
            workflow_version_id=payload.workflow_version_id,
            session_id=payload.session_id,
            parent_run_id=payload.parent_run_id,
            root_run_id=payload.root_run_id,
            run_type=payload.run_type,
            trigger_type=payload.trigger_type,
            priority=payload.priority,
        )
        node_run = None
        if initial_node is not None:
            self.workflow_run_repository.update_node_count(
                run_id=workflow_run.id,
                current_node_count=1,
            )
            node_run = self.node_run_repository.create(
                tenant_id=payload.tenant_id,
                run_id=workflow_run.id,
                node_id=initial_node.node_id,
                node_type=initial_node.node_type,
                status=initial_node.status,
            )
            self._log_event(
                tenant_id=payload.tenant_id,
                run_id=workflow_run.id,
                node_run_id=node_run.id,
                event_type="node_queued",
                message=f"initial node queued: {initial_node.node_id}",
                detail_json={
                    "node_id": initial_node.node_id,
                    "node_type": initial_node.node_type,
                    "status": initial_node.status,
                },
            )
        self._log_event(
            tenant_id=payload.tenant_id,
            run_id=workflow_run.id,
            node_run_id=node_run.id if node_run is not None else None,
            event_type="run_created",
            message="workflow run created",
            detail_json={
                "workflow_id": payload.workflow_id,
                "workflow_version_id": payload.workflow_version_id,
                "session_id": payload.session_id,
            },
        )
        return workflow_run, node_run

    def list_runs(self, tenant_id: str, session_id: str | None = None) -> list[WorkflowRun]:
        """Return the tenant's workflow runs, optionally narrowed to one session."""
        return self.workflow_run_repository.list_by_scope(tenant_id=tenant_id, session_id=session_id)

    def list_node_runs(self, tenant_id: str, run_id: str) -> list[NodeRun]:
        """Return all node runs belonging to ``run_id`` for the tenant."""
        return self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)

    def list_execution_logs(
        self,
        tenant_id: str,
        run_id: str | None = None,
        node_run_id: str | None = None,
    ):
        """Return execution log entries; the optional ids are passed to the repository as filters."""
        return self.execution_log_repository.list_by_scope(
            tenant_id=tenant_id,
            run_id=run_id,
            node_run_id=node_run_id,
        )

    def list_node_artifacts(
        self,
        tenant_id: str,
        run_id: str | None = None,
        node_run_id: str | None = None,
        artifact_type: str | None = None,
    ):
        """Return node artifacts; the optional arguments are passed to the repository as filters."""
        return self.node_artifact_repository.list_by_scope(
            tenant_id=tenant_id,
            run_id=run_id,
            node_run_id=node_run_id,
            artifact_type=artifact_type,
        )

    def list_trace_spans(
        self,
        tenant_id: str,
        run_id: str | None = None,
        node_run_id: str | None = None,
        span_type: str | None = None,
    ):
        """Return trace spans; the optional arguments are passed to the repository as filters."""
        return self.trace_span_repository.list_by_scope(
            tenant_id=tenant_id,
            run_id=run_id,
            node_run_id=node_run_id,
            span_type=span_type,
        )

    def update_run_status(
        self,
        run_id: str,
        payload: WorkflowRunStatusUpdateRequest,
    ) -> WorkflowRun | None:
        """Write a workflow run's status and error fields as given.

        Unlike ``update_node_run_status`` this writes no execution-log entry and
        does not look at the run's node runs.
        """
        return self.workflow_run_repository.update_status(
            run_id=run_id,
            status=payload.status,
            error_code=payload.error_code,
            error_message=payload.error_message,
        )

    def update_node_run_status(
        self,
        node_run_id: str,
        payload: NodeRunStatusUpdateRequest,
    ) -> NodeRun | None:
        """Persist a node run status change and propagate its effects.

        Writes the new status and outputs, logs a ``node_status_updated``
        event, queues successor nodes when the new status is ``completed``,
        and then re-derives the parent workflow run's status and node count.

        Returns:
            The updated node run, or ``None`` if the repository returned no
            node run for ``node_run_id`` (nothing else is done in that case).
        """
        node_run = self.node_run_repository.update_status(
            node_run_id=node_run_id,
            status=payload.status,
            worker_key=payload.worker_key,
            error_code=payload.error_code,
            error_message=payload.error_message,
            output_text=payload.output_text,
            output_json=payload.output_json,
        )
        if node_run is None:
            return None
        self._log_event(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
            node_run_id=node_run.id,
            event_type="node_status_updated",
            message=f"node status updated to {payload.status}",
            detail_json={
                "node_id": node_run.node_id,
                "node_type": node_run.node_type,
                "status": payload.status,
                "error_code": payload.error_code,
            },
        )
        if payload.status == "completed":
            # Successors must exist before the sync below, so a run with newly
            # queued nodes is not reported as completed.
            self._schedule_successor_nodes(node_run)
        self._sync_workflow_run_status_from_nodes(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
        )
        return node_run

    def execute_node_run(
        self,
        node_run_id: str,
        payload: NodeRunExecuteRequest,
    ) -> tuple[WorkflowRun, NodeRun, str] | None:
        """Execute one node run and persist its outcome.

        A node run that is already finished (completed, failed or skipped) is
        returned unchanged, together with the name of the executor resolved for
        its node type. Otherwise:

        1. The node run is marked ``running`` and a ``node_execution`` trace
           span is opened.
        2. The dispatcher executes it.
        3. The result is written back through ``update_node_run_status``. That
           call queues successors for completed nodes and re-syncs the run
           status.

        An exception raised by the executor is recorded as a ``failed`` result
        with error code ``executor_error`` instead of being propagated. The
        trace span is always finished: if persisting the result raises or finds
        no node run, the span is closed as ``error`` before returning/raising.

        Returns:
            ``(workflow_run, node_run, executor_name)`` reflecting the state
            after execution, or ``None`` if the node run or its workflow run
            cannot be found at any step.
        """
        node_run = self.node_run_repository.get_by_id(node_run_id)
        if node_run is None:
            return None
        workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
        if workflow_run is None:
            return None
        if node_run.status in self._FINISHED_NODE_STATUSES:
            # Re-entry on a finished node is a no-op rather than a re-execution.
            executor_name = self.execution_dispatcher.resolve_executor(node_run.node_type).executor_name
            return workflow_run, node_run, executor_name

        running_node_run = self.node_run_repository.update_status(
            node_run_id=node_run_id,
            status="running",
            worker_key=payload.worker_key,
        )
        if running_node_run is None:
            return None
        self._log_event(
            tenant_id=running_node_run.tenant_id,
            run_id=running_node_run.run_id,
            node_run_id=running_node_run.id,
            event_type="node_execution_started",
            message=f"executing node {running_node_run.node_id}",
            detail_json={
                "node_id": running_node_run.node_id,
                "node_type": running_node_run.node_type,
                "worker_key": payload.worker_key,
            },
        )
        context = self._build_execution_context(
            workflow_run=workflow_run,
            node_run=running_node_run,
            worker_key=payload.worker_key,
        )
        executor_name = self.execution_dispatcher.resolve_executor(
            running_node_run.node_type
        ).executor_name
        trace_span = self.trace_span_repository.start(
            tenant_id=running_node_run.tenant_id,
            run_id=running_node_run.run_id,
            node_run_id=running_node_run.id,
            parent_span_id=None,
            span_type="node_execution",
            name=f"{running_node_run.node_type}:{running_node_run.node_id}",
            attributes_json={
                "node_id": running_node_run.node_id,
                "node_type": running_node_run.node_type,
                "executor_name": executor_name,
                "worker_key": payload.worker_key,
            },
        )

        try:
            result, executor_name = self.execution_dispatcher.execute(context=context, request=payload)
        except Exception as exc:
            # Executor boundary: any executor error becomes a failed node run
            # (executor_name keeps the value resolved above).
            result = NodeExecutionResultContract(
                status="failed",
                worker_key=payload.worker_key,
                error_code="executor_error",
                error_message=str(exc),
            )

        try:
            final_node_run = self.update_node_run_status(
                node_run_id=running_node_run.id,
                payload=NodeRunStatusUpdateRequest(
                    status=result.status,
                    worker_key=result.worker_key,
                    error_code=result.error_code,
                    error_message=result.error_message,
                    output_text=result.output_text,
                    output_json=result.output_json,
                ),
            )
        except Exception as exc:
            # Persisting the result (or scheduling successors) failed: close the
            # span so it is not left open, then let the original error surface.
            self._finish_unpersisted_execution_span(
                span_id=trace_span.id,
                result=result,
                executor_name=executor_name,
                error_message=str(exc),
            )
            raise
        if final_node_run is None:
            self._finish_unpersisted_execution_span(
                span_id=trace_span.id,
                result=result,
                executor_name=executor_name,
                error_message=result.error_message,
            )
            return None

        self.trace_span_repository.finish(
            span_id=trace_span.id,
            status="ok" if final_node_run.status == "completed" else "error",
            error_code=final_node_run.error_code,
            error_message=final_node_run.error_message,
            attributes_json={
                "node_status": final_node_run.status,
                "executor_name": executor_name,
                "has_output_text": final_node_run.output_text is not None,
                "has_output_json": final_node_run.output_json is not None,
            },
        )
        self._persist_node_execution_artifact(final_node_run)
        self._log_event(
            tenant_id=final_node_run.tenant_id,
            run_id=final_node_run.run_id,
            node_run_id=final_node_run.id,
            event_type="node_execution_finished",
            message=f"node execution finished with status {final_node_run.status}",
            detail_json={
                "node_id": final_node_run.node_id,
                "node_type": final_node_run.node_type,
                "executor_name": executor_name,
                "status": final_node_run.status,
            },
        )
        # Re-read the run: the status sync above may have changed it.
        workflow_run = self.workflow_run_repository.get_by_id(final_node_run.run_id)
        if workflow_run is None:
            return None
        return workflow_run, final_node_run, executor_name

    def execute_next_node_run(
        self,
        tenant_id: str,
        run_id: str,
        payload: NodeRunExecuteRequest,
    ) -> tuple[WorkflowRun, NodeRun, str] | None:
        """Execute the node run that ``get_next_queued_by_run`` selects for this run.

        Returns ``None`` when the repository yields no node run, otherwise
        whatever ``execute_node_run`` returns.
        """
        next_node_run = self.node_run_repository.get_next_queued_by_run(
            tenant_id=tenant_id,
            run_id=run_id,
        )
        if next_node_run is None:
            return None
        return self.execute_node_run(node_run_id=next_node_run.id, payload=payload)

    def execute_run(
        self,
        tenant_id: str,
        run_id: str,
        payload: RunExecuteRequest,
    ) -> tuple[WorkflowRun, list[NodeRun], list[str]] | None:
        """Execute a run's queued node runs one at a time, up to ``payload.max_steps``.

        Stops early when no further node run can be executed or when an executed
        node run does not end ``completed``.

        Returns:
            ``None`` if the run does not exist, belongs to another tenant, or
            disappears before the final re-read. Otherwise the re-read run, the
            node runs executed, and their executor names, in execution order.
        """
        workflow_run = self.workflow_run_repository.get_by_id(run_id)
        if workflow_run is None or workflow_run.tenant_id != tenant_id:
            return None
        executed_node_runs: list[NodeRun] = []
        executor_names: list[str] = []
        for _ in range(payload.max_steps):
            step_result = self.execute_next_node_run(
                tenant_id=tenant_id,
                run_id=run_id,
                payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
            )
            if step_result is None:
                break
            _, node_run, executor_name = step_result
            executed_node_runs.append(node_run)
            executor_names.append(executor_name)
            if node_run.status != "completed":
                break
        final_run = self.workflow_run_repository.get_by_id(run_id)
        if final_run is None:
            return None
        return final_run, executed_node_runs, executor_names

    def _persist_node_execution_artifact(self, node_run: NodeRun) -> None:
        """Store a node run's outputs as an ``execution_result`` artifact.

        Nothing is stored when the node run has neither text nor JSON output.
        The MIME type is JSON whenever JSON output is present. ``size_bytes`` is
        the UTF-8 length of the text output (``0`` for an empty string), or
        ``None`` when there is no text output.
        """
        if node_run.output_text is None and node_run.output_json is None:
            return
        # Explicit ``is not None`` so an empty text output is measured as 0 bytes.
        size_bytes = (
            len(node_run.output_text.encode("utf-8"))
            if node_run.output_text is not None
            else None
        )
        self.node_artifact_repository.create(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
            node_run_id=node_run.id,
            node_id=node_run.node_id,
            artifact_type="execution_result",
            name=f"{node_run.node_id}-execution-result",
            mime_type="application/json" if node_run.output_json is not None else "text/plain",
            content_text=node_run.output_text,
            content_json=node_run.output_json,
            size_bytes=size_bytes,
        )

    def _finish_unpersisted_execution_span(
        self,
        *,
        span_id,
        result: NodeExecutionResultContract,
        executor_name: str,
        error_message: str | None,
    ) -> None:
        """Close a node-execution span whose result could not be persisted.

        The span is marked ``error`` because the node's final status was not
        recorded. Its attributes come from the executor result rather than from
        the (unavailable) stored node run.
        """
        self.trace_span_repository.finish(
            span_id=span_id,
            status="error",
            error_code=result.error_code,
            error_message=error_message,
            attributes_json={
                "node_status": result.status,
                "executor_name": executor_name,
                "has_output_text": result.output_text is not None,
                "has_output_json": result.output_json is not None,
            },
        )

    def _plan_initial_node(self, payload: RunCreateRequest) -> InitialNodeContract | None:
        """Derive the initial node from the payload's workflow version; ``None`` without a client."""
        if self.workflow_client is None:
            return None
        workflow_version = self.workflow_client.get_workflow_version(
            tenant_id=payload.tenant_id,
            workflow_version_id=payload.workflow_version_id,
        )
        return derive_initial_node(workflow_version)

    def _resolve_node_config(
        self,
        *,
        tenant_id: str,
        workflow_version_id: str,
        node_id: str,
    ) -> dict[str, JSONValue]:
        """Return ``node_id``'s config from the workflow version; ``{}`` without a client."""
        if self.workflow_client is None:
            return {}
        workflow_version = self.workflow_client.get_workflow_version(
            tenant_id=tenant_id,
            workflow_version_id=workflow_version_id,
        )
        return derive_node_config(workflow_version, node_id)

    def _schedule_successor_nodes(self, node_run: NodeRun) -> None:
        """Queue node runs for the successors of a completed node.

        Successors are derived from the workflow version using the completed
        node's ``output_json`` and the run-wide state maps. A successor that
        already has a node run in this run is not queued again. That covers
        nodes reachable from several predecessors and successors listed more
        than once. Each queued node gets a ``node_queued`` log entry.

        No-op without a workflow client, when the run cannot be found, or when
        no successors are derived.
        """
        if self.workflow_client is None:
            return
        workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
        if workflow_run is None:
            return
        workflow_version = self.workflow_client.get_workflow_version(
            tenant_id=node_run.tenant_id,
            workflow_version_id=workflow_run.workflow_version_id,
        )
        run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
            self._build_run_state_maps(
                tenant_id=node_run.tenant_id,
                run_id=node_run.run_id,
            )
        )
        successor_nodes = derive_successor_nodes(
            workflow_version,
            node_run.node_id,
            current_output_json=node_run.output_json,
            run_state_json=run_state_json,
            node_output_json_by_node_id=node_output_json_by_node_id,
            node_output_text_by_node_id=node_output_text_by_node_id,
        )
        if not successor_nodes:
            return
        existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
            node_ids=[item.node_id for item in successor_nodes],
        )
        existing_node_ids = {item.node_id for item in existing_nodes}
        for successor in successor_nodes:
            if successor.node_id in existing_node_ids:
                continue
            queued_node_run = self.node_run_repository.create(
                tenant_id=node_run.tenant_id,
                run_id=node_run.run_id,
                node_id=successor.node_id,
                node_type=successor.node_type,
                status=successor.status,
            )
            # Record it so a duplicate entry later in the list is skipped too.
            existing_node_ids.add(successor.node_id)
            self._log_event(
                tenant_id=node_run.tenant_id,
                run_id=node_run.run_id,
                node_run_id=queued_node_run.id,
                event_type="node_queued",
                message=f"successor node queued: {successor.node_id}",
                detail_json={
                    "node_id": successor.node_id,
                    "node_type": successor.node_type,
                    "status": successor.status,
                    "source_node_id": node_run.node_id,
                },
            )

    def _build_execution_context(
        self,
        *,
        workflow_run: WorkflowRun,
        node_run: NodeRun,
        worker_key: str | None,
    ) -> NodeExecutionContextContract:
        """Assemble the executor input for ``node_run``.

        Combines the node's identity, its config from the workflow version, and
        the outputs/state accumulated by all node runs of the same run.
        """
        run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
            self._build_run_state_maps(
                tenant_id=node_run.tenant_id,
                run_id=node_run.run_id,
            )
        )
        return NodeExecutionContextContract(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
            node_run_id=node_run.id,
            node_id=node_run.node_id,
            node_type=node_run.node_type,
            node_config_json=self._resolve_node_config(
                tenant_id=node_run.tenant_id,
                workflow_version_id=workflow_run.workflow_version_id,
                node_id=node_run.node_id,
            ),
            run_state_json=run_state_json,
            node_output_json_by_node_id=node_output_json_by_node_id,
            node_output_text_by_node_id=node_output_text_by_node_id,
            worker_key=worker_key,
        )

    def _build_run_state_maps(
        self,
        *,
        tenant_id: str,
        run_id: str,
    ) -> tuple[
        dict[str, JSONValue],
        dict[str, dict[str, JSONValue]],
        dict[str, str],
    ]:
        """Collect the outputs of every node run in a run.

        Returns:
            ``(run_state_json, node_output_json_by_node_id,
            node_output_text_by_node_id)``. ``run_state_json`` merges each
            dict-valued ``output_json["state_updates"]`` in the order the
            repository lists the node runs, so later entries overwrite earlier
            keys. In the per-node maps a later node run with the same
            ``node_id`` likewise replaces an earlier one.
        """
        node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
        run_state_json: dict[str, JSONValue] = {}
        node_output_json_by_node_id: dict[str, dict[str, JSONValue]] = {}
        node_output_text_by_node_id: dict[str, str] = {}
        for item in node_runs:
            if item.output_json is not None:
                # Shallow copy so callers cannot mutate the stored output dict.
                node_output_json_by_node_id[item.node_id] = dict(item.output_json)
                state_updates = item.output_json.get("state_updates")
                if isinstance(state_updates, dict):
                    for state_key, state_value in state_updates.items():
                        run_state_json[str(state_key)] = state_value
            if item.output_text is not None:
                node_output_text_by_node_id[item.node_id] = item.output_text
        return run_state_json, node_output_json_by_node_id, node_output_text_by_node_id

    def _sync_workflow_run_status_from_nodes(self, *, tenant_id: str, run_id: str) -> None:
        """Recompute a run's node count and status from its node runs and log it.

        No-op when the run has no node runs.
        """
        node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
        if not node_runs:
            return
        self.workflow_run_repository.update_node_count(
            run_id=run_id,
            current_node_count=len(node_runs),
        )
        next_status, error_code, error_message = self._derive_run_status(node_runs)
        self.workflow_run_repository.update_status(
            run_id=run_id,
            status=next_status,
            error_code=error_code,
            error_message=error_message,
        )
        self._log_event(
            tenant_id=tenant_id,
            run_id=run_id,
            node_run_id=None,
            event_type="run_status_synced",
            message=f"workflow run status synced to {next_status}",
            detail_json={
                "status": next_status,
                "error_code": error_code,
            },
        )

    def _derive_run_status(
        self,
        node_runs: list[NodeRun],
    ) -> tuple[WorkflowRunStatus, str | None, str | None]:
        """Derive ``(status, error_code, error_message)`` for a run from its node runs.

        Any failed node run fails the run; the first failed one in list order
        supplies the error fields. A non-empty list whose node runs are all
        completed or skipped completes the run. Anything else, for example a
        node still running, keeps the run ``running``.
        """
        failed_node = next((item for item in node_runs if item.status == "failed"), None)
        if failed_node is not None:
            return "failed", failed_node.error_code, failed_node.error_message
        if node_runs and all(item.status in self._SUCCESSFUL_NODE_STATUSES for item in node_runs):
            return "completed", None, None
        return "running", None, None

    def _log_event(
        self,
        *,
        tenant_id: str,
        run_id: str,
        node_run_id: str | None,
        event_type: str,
        message: str,
        detail_json: dict[str, JSONValue] | None,
        level: str = "info",
    ) -> None:
        """Append one entry to the execution log (``level`` defaults to ``"info"``)."""
        self.execution_log_repository.create(
            tenant_id=tenant_id,
            run_id=run_id,
            node_run_id=node_run_id,
            event_type=event_type,
            level=level,
            message=message,
            detail_json=detail_json,
        )
|