"""Application service that creates workflow runs and drives node-run execution."""

from datetime import datetime, timedelta

from sqlalchemy.orm import Session

from core_domain import (
    InitialNodeContract,
    NodeExecutionContextContract,
    NodeExecutionResultContract,
    NodeRunStatus,
    WorkflowRunStatus,
)

from app.db.models import NodeRun, WorkflowRun
from app.domain.repositories import (
    ExecutionLogRepository,
    NodeArtifactRepository,
    NodeRunRepository,
    TraceSpanRepository,
    WorkflowRunRepository,
)
from app.infrastructure.executors import (
    NodeExecutionDispatcher,
    build_node_execution_dispatcher_with_clients,
)
from app.infrastructure.code_runner_client import CodeRunnerClient
from app.infrastructure.model_gateway_client import ModelGatewayClient
from app.infrastructure.planner import (
    derive_initial_node,
    derive_node_config,
    derive_successor_nodes,
)
from app.infrastructure.tool_client import ToolServiceClient
from app.infrastructure.workflow_client import WorkflowServiceClient
from app.bootstrap.settings import RuntimeServiceSettings
from app.schemas.run import (
    NodeRunExecuteRequest,
    NodeRunStatusUpdateRequest,
    RunCreateRequest,
    RunExecuteRequest,
    WorkflowRunStatusUpdateRequest,
)
from core_shared import JSONValue


class RuntimeApplicationService:
    """Coordinate workflow runs, node runs, logs, artifacts and trace spans.

    All persistence goes through the injected repositories; node execution is
    delegated to ``execution_dispatcher``. When ``workflow_client`` is ``None``
    no planning is performed: no initial node is derived, node configs resolve
    to ``{}`` and no successor nodes are scheduled.
    """

    def __init__(
        self,
        workflow_run_repository: WorkflowRunRepository,
        node_run_repository: NodeRunRepository,
        execution_log_repository: ExecutionLogRepository,
        node_artifact_repository: NodeArtifactRepository,
        trace_span_repository: TraceSpanRepository,
        execution_dispatcher: NodeExecutionDispatcher,
        workflow_client: WorkflowServiceClient | None = None,
    ) -> None:
        """Store the collaborators used by the service."""
        self.workflow_run_repository = workflow_run_repository
        self.node_run_repository = node_run_repository
        self.execution_log_repository = execution_log_repository
        self.node_artifact_repository = node_artifact_repository
        self.trace_span_repository = trace_span_repository
        self.execution_dispatcher = execution_dispatcher
        self.workflow_client = workflow_client

    def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
        """Create a workflow run and, when an initial node is known, its first node run.

        The initial node comes from ``payload.initial_node`` or, if that is
        falsy, from ``_plan_initial_node``.

        Returns:
            The created workflow run and the initial node run (``None`` when no
            initial node could be determined).
        """
        initial_node = payload.initial_node or self._plan_initial_node(payload)
        workflow_run = self.workflow_run_repository.create(
            tenant_id=payload.tenant_id,
            app_id=payload.app_id,
            app_version_id=payload.app_version_id,
            workflow_id=payload.workflow_id,
            workflow_version_id=payload.workflow_version_id,
            session_id=payload.session_id,
            parent_run_id=payload.parent_run_id,
            root_run_id=payload.root_run_id,
            run_type=payload.run_type,
            trigger_type=payload.trigger_type,
            priority=payload.priority,
        )

        node_run = None
        if initial_node is not None:
            self.workflow_run_repository.update_node_count(
                run_id=workflow_run.id,
                current_node_count=1,
            )
            node_run = self.node_run_repository.create(
                tenant_id=payload.tenant_id,
                run_id=workflow_run.id,
                node_id=initial_node.node_id,
                node_type=initial_node.node_type,
                status=initial_node.status,
            )
            self._log_event(
                tenant_id=payload.tenant_id,
                run_id=workflow_run.id,
                node_run_id=node_run.id,
                event_type="node_queued",
                message=f"initial node queued: {initial_node.node_id}",
                detail_json={
                    "node_id": initial_node.node_id,
                    "node_type": initial_node.node_type,
                    "status": initial_node.status,
                },
            )

        self._log_event(
            tenant_id=payload.tenant_id,
            run_id=workflow_run.id,
            node_run_id=node_run.id if node_run is not None else None,
            event_type="run_created",
            message="workflow run created",
            detail_json={
                "workflow_id": payload.workflow_id,
                "workflow_version_id": payload.workflow_version_id,
                "session_id": payload.session_id,
            },
        )
        return workflow_run, node_run

    def list_runs(self, tenant_id: str, session_id: str | None = None) -> list[WorkflowRun]:
        """Return workflow runs for the tenant, optionally narrowed by session."""
        return self.workflow_run_repository.list_by_scope(
            tenant_id=tenant_id,
            session_id=session_id,
        )

    def list_node_runs(self, tenant_id: str, run_id: str) -> list[NodeRun]:
        """Return the node runs that belong to the given workflow run."""
        return self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)

    def list_execution_logs(
        self,
        tenant_id: str,
        run_id: str | None = None,
        node_run_id: str | None = None,
    ):
        """Return execution log entries filtered by tenant, run and node run."""
        return self.execution_log_repository.list_by_scope(
            tenant_id=tenant_id,
            run_id=run_id,
            node_run_id=node_run_id,
        )

    def list_node_artifacts(
        self,
        tenant_id: str,
        run_id: str | None = None,
        node_run_id: str | None = None,
        artifact_type: str | None = None,
    ):
        """Return node artifacts filtered by tenant, run, node run and artifact type."""
        return self.node_artifact_repository.list_by_scope(
            tenant_id=tenant_id,
            run_id=run_id,
            node_run_id=node_run_id,
            artifact_type=artifact_type,
        )

    def list_trace_spans(
        self,
        tenant_id: str,
        run_id: str | None = None,
        node_run_id: str | None = None,
        span_type: str | None = None,
    ):
        """Return trace spans filtered by tenant, run, node run and span type."""
        return self.trace_span_repository.list_by_scope(
            tenant_id=tenant_id,
            run_id=run_id,
            node_run_id=node_run_id,
            span_type=span_type,
        )

    def update_run_status(
        self,
        run_id: str,
        payload: WorkflowRunStatusUpdateRequest,
    ) -> WorkflowRun | None:
        """Apply a status update to a workflow run and return the repository result."""
        return self.workflow_run_repository.update_status(
            run_id=run_id,
            status=payload.status,
            error_code=payload.error_code,
            error_message=payload.error_message,
        )

    def update_node_run_status(
        self,
        node_run_id: str,
        payload: NodeRunStatusUpdateRequest,
    ) -> NodeRun | None:
        """Persist a node-run status change and propagate its effects.

        After the update is stored, a ``node_status_updated`` event is logged,
        successor nodes are scheduled when the new status is ``"completed"``,
        and the workflow run status is re-derived from all node runs.

        Returns:
            The updated node run, or ``None`` when the repository update
            returned ``None`` (in which case nothing else happens).
        """
        node_run = self.node_run_repository.update_status(
            node_run_id=node_run_id,
            status=payload.status,
            worker_key=payload.worker_key,
            error_code=payload.error_code,
            error_message=payload.error_message,
            output_text=payload.output_text,
            output_json=payload.output_json,
        )
        if node_run is None:
            return None

        self._log_event(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
            node_run_id=node_run.id,
            event_type="node_status_updated",
            message=f"node status updated to {payload.status}",
            detail_json={
                "node_id": node_run.node_id,
                "node_type": node_run.node_type,
                "status": payload.status,
                "error_code": payload.error_code,
            },
        )

        # Successors must exist before the run status is synced; otherwise a run
        # whose last finished node still has followers would look completed.
        if payload.status == "completed":
            self._schedule_successor_nodes(node_run)
        self._sync_workflow_run_status_from_nodes(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
        )
        return node_run

    def execute_node_run(
        self,
        node_run_id: str,
        payload: NodeRunExecuteRequest,
    ) -> tuple[WorkflowRun, NodeRun, str] | None:
        """Execute a single node run through the dispatcher.

        A node run that is already ``completed``, ``failed`` or ``skipped`` is
        returned as-is without being executed again. Otherwise the node is
        marked ``running``, a trace span is started, the dispatcher is invoked
        and the result is stored through ``update_node_run_status``. Any
        exception raised by the dispatcher is converted into a ``failed``
        result with error code ``executor_error``.

        Returns:
            ``(workflow_run, node_run, executor_name)``, or ``None`` when the
            node run or its workflow run cannot be loaded or updated.
        """
        node_run = self.node_run_repository.get_by_id(node_run_id)
        if node_run is None:
            return None
        workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
        if workflow_run is None:
            return None

        if node_run.status in {"completed", "failed", "skipped"}:
            executor_name = self.execution_dispatcher.resolve_executor(
                node_run.node_type
            ).executor_name
            return workflow_run, node_run, executor_name

        running_node_run = self.node_run_repository.update_status(
            node_run_id=node_run_id,
            status="running",
            worker_key=payload.worker_key,
        )
        if running_node_run is None:
            return None

        self._log_event(
            tenant_id=running_node_run.tenant_id,
            run_id=running_node_run.run_id,
            node_run_id=running_node_run.id,
            event_type="node_execution_started",
            message=f"executing node {running_node_run.node_id}",
            detail_json={
                "node_id": running_node_run.node_id,
                "node_type": running_node_run.node_type,
                "worker_key": payload.worker_key,
            },
        )

        context = self._build_execution_context(
            workflow_run=workflow_run,
            node_run=running_node_run,
            worker_key=payload.worker_key,
        )
        # Resolved up front so the span (and the failure path below) has an
        # executor name even if the dispatcher raises before returning one.
        executor_name = self.execution_dispatcher.resolve_executor(
            running_node_run.node_type
        ).executor_name
        trace_span = self.trace_span_repository.start(
            tenant_id=running_node_run.tenant_id,
            run_id=running_node_run.run_id,
            node_run_id=running_node_run.id,
            parent_span_id=None,
            span_type="node_execution",
            name=f"{running_node_run.node_type}:{running_node_run.node_id}",
            attributes_json={
                "node_id": running_node_run.node_id,
                "node_type": running_node_run.node_type,
                "executor_name": executor_name,
                "worker_key": payload.worker_key,
            },
        )

        try:
            result, executor_name = self.execution_dispatcher.execute(
                context=context,
                request=payload,
            )
        except Exception as exc:
            # Deliberate boundary: an executor failure becomes a failed node
            # run instead of propagating to the caller.
            result = NodeExecutionResultContract(
                status="failed",
                worker_key=payload.worker_key,
                error_code="executor_error",
                error_message=str(exc),
            )

        final_node_run = self.update_node_run_status(
            node_run_id=running_node_run.id,
            payload=NodeRunStatusUpdateRequest(
                status=result.status,
                worker_key=result.worker_key,
                error_code=result.error_code,
                error_message=result.error_message,
                output_text=result.output_text,
                output_json=result.output_json,
            ),
        )
        if final_node_run is None:
            # The final status could not be stored; still close the span so it
            # is not left open forever.
            self.trace_span_repository.finish(
                span_id=trace_span.id,
                status="error",
                error_code=result.error_code,
                error_message=result.error_message,
                attributes_json={
                    "node_status": result.status,
                    "executor_name": executor_name,
                },
            )
            return None

        self.trace_span_repository.finish(
            span_id=trace_span.id,
            status="ok" if final_node_run.status == "completed" else "error",
            error_code=final_node_run.error_code,
            error_message=final_node_run.error_message,
            attributes_json={
                "node_status": final_node_run.status,
                "executor_name": executor_name,
                "has_output_text": final_node_run.output_text is not None,
                "has_output_json": final_node_run.output_json is not None,
            },
        )
        self._persist_node_execution_artifact(final_node_run)
        self._log_event(
            tenant_id=final_node_run.tenant_id,
            run_id=final_node_run.run_id,
            node_run_id=final_node_run.id,
            event_type="node_execution_finished",
            message=f"node execution finished with status {final_node_run.status}",
            detail_json={
                "node_id": final_node_run.node_id,
                "node_type": final_node_run.node_type,
                "executor_name": executor_name,
                "status": final_node_run.status,
            },
        )

        # Reload: the status sync triggered above may have changed the run.
        workflow_run = self.workflow_run_repository.get_by_id(final_node_run.run_id)
        if workflow_run is None:
            return None
        return workflow_run, final_node_run, executor_name

    def execute_next_node_run(
        self,
        tenant_id: str,
        run_id: str,
        payload: NodeRunExecuteRequest,
    ) -> tuple[WorkflowRun, NodeRun, str] | None:
        """Execute the next queued node run of a workflow run, if there is one.

        Returns:
            The result of ``execute_node_run``, or ``None`` when the repository
            reports no queued node run for this tenant and run.
        """
        next_node_run = self.node_run_repository.get_next_queued_by_run(
            tenant_id=tenant_id,
            run_id=run_id,
        )
        if next_node_run is None:
            return None
        return self.execute_node_run(node_run_id=next_node_run.id, payload=payload)

    def execute_run(
        self,
        tenant_id: str,
        run_id: str,
        payload: RunExecuteRequest,
    ) -> tuple[WorkflowRun, list[NodeRun], list[str]] | None:
        """Execute queued node runs of a workflow run, up to ``payload.max_steps``.

        Stepping stops early when no queued node run remains or when a node
        run finishes with a status other than ``"completed"``.

        Returns:
            ``(workflow_run, executed_node_runs, executor_names)`` with the
            workflow run reloaded after the last step, or ``None`` when the run
            does not exist or belongs to a different tenant.
        """
        workflow_run = self.workflow_run_repository.get_by_id(run_id)
        if workflow_run is None or workflow_run.tenant_id != tenant_id:
            return None

        executed_node_runs: list[NodeRun] = []
        executor_names: list[str] = []
        for _ in range(payload.max_steps):
            step_result = self.execute_next_node_run(
                tenant_id=tenant_id,
                run_id=run_id,
                payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
            )
            if step_result is None:
                break
            # The per-step workflow run is not needed; the run is reloaded below.
            _, node_run, executor_name = step_result
            executed_node_runs.append(node_run)
            executor_names.append(executor_name)
            if node_run.status != "completed":
                break

        final_run = self.workflow_run_repository.get_by_id(run_id)
        if final_run is None:
            return None
        return final_run, executed_node_runs, executor_names

    def execute_next_claimed_node_run(
        self,
        *,
        worker_key: str,
        lease_seconds: int,
    ) -> tuple[WorkflowRun, NodeRun, str, int] | None:
        """Release expired leases, claim the next queued node run and execute it.

        Returns:
            ``(workflow_run, node_run, executor_name, released_lease_count)``,
            or ``None`` when nothing could be claimed or execution returned
            ``None``.
        """
        # One clock read so lease release and the new lease expiry share the
        # same notion of "now".
        # NOTE(review): naive UTC is kept because the repository's timestamp
        # convention is not visible here - confirm before moving to aware datetimes.
        now = datetime.utcnow()
        released_lease_count = self.node_run_repository.release_expired_leases(
            now_time=now,
        )
        claimed_node_run = self.node_run_repository.claim_next_queued(
            worker_key=worker_key,
            lease_expire_time=now + timedelta(seconds=lease_seconds),
        )
        if claimed_node_run is None:
            return None

        result = self.execute_node_run(
            node_run_id=claimed_node_run.id,
            payload=NodeRunExecuteRequest(worker_key=worker_key),
        )
        if result is None:
            return None
        workflow_run, node_run, executor_name = result
        return workflow_run, node_run, executor_name, released_lease_count

    def _persist_node_execution_artifact(self, node_run: NodeRun) -> None:
        """Store the node run's output as an ``execution_result`` artifact.

        Nothing is stored when the node run has neither text nor JSON output.
        ``size_bytes`` is the UTF-8 length of the text output and ``None`` when
        the text output is missing or empty.
        """
        if node_run.output_text is None and node_run.output_json is None:
            return
        size_bytes = len(node_run.output_text.encode("utf-8")) if node_run.output_text else None
        self.node_artifact_repository.create(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
            node_run_id=node_run.id,
            node_id=node_run.node_id,
            artifact_type="execution_result",
            name=f"{node_run.node_id}-execution-result",
            mime_type="application/json" if node_run.output_json is not None else "text/plain",
            content_text=node_run.output_text,
            content_json=node_run.output_json,
            size_bytes=size_bytes,
        )

    def _plan_initial_node(self, payload: RunCreateRequest) -> InitialNodeContract | None:
        """Derive the initial node from the workflow version, if a client is configured."""
        if self.workflow_client is None:
            return None
        workflow_version = self.workflow_client.get_workflow_version(
            tenant_id=payload.tenant_id,
            workflow_version_id=payload.workflow_version_id,
        )
        return derive_initial_node(workflow_version)

    def _resolve_node_config(
        self,
        *,
        tenant_id: str,
        workflow_version_id: str,
        node_id: str,
    ) -> dict[str, JSONValue]:
        """Return the node's config from the workflow version, or ``{}`` without a client."""
        if self.workflow_client is None:
            return {}
        workflow_version = self.workflow_client.get_workflow_version(
            tenant_id=tenant_id,
            workflow_version_id=workflow_version_id,
        )
        return derive_node_config(workflow_version, node_id)

    def _schedule_successor_nodes(self, node_run: NodeRun) -> None:
        """Create node runs for the successors of ``node_run`` that do not exist yet.

        Successors are derived by the planner from the workflow version, the
        node's JSON output and the accumulated run state. A successor whose
        ``node_id`` already has a node run in this workflow run is skipped.
        Does nothing without a workflow client or when the workflow run cannot
        be loaded.
        """
        if self.workflow_client is None:
            return
        workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
        if workflow_run is None:
            return

        workflow_version = self.workflow_client.get_workflow_version(
            tenant_id=node_run.tenant_id,
            workflow_version_id=workflow_run.workflow_version_id,
        )
        run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
            self._build_run_state_maps(
                tenant_id=node_run.tenant_id,
                run_id=node_run.run_id,
            )
        )
        successor_nodes = derive_successor_nodes(
            workflow_version,
            node_run.node_id,
            current_output_json=node_run.output_json,
            run_state_json=run_state_json,
            node_output_json_by_node_id=node_output_json_by_node_id,
            node_output_text_by_node_id=node_output_text_by_node_id,
        )
        if not successor_nodes:
            return

        existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
            node_ids=[item.node_id for item in successor_nodes],
        )
        existing_node_ids = {item.node_id for item in existing_nodes}

        for successor in successor_nodes:
            if successor.node_id in existing_node_ids:
                continue
            successor_run = self.node_run_repository.create(
                tenant_id=node_run.tenant_id,
                run_id=node_run.run_id,
                node_id=successor.node_id,
                node_type=successor.node_type,
                status=successor.status,
            )
            # Log against the new node run, as create_run does for the initial
            # node, so queue events can be looked up by node run.
            self._log_event(
                tenant_id=node_run.tenant_id,
                run_id=node_run.run_id,
                node_run_id=successor_run.id,
                event_type="node_queued",
                message=f"successor node queued: {successor.node_id}",
                detail_json={
                    "node_id": successor.node_id,
                    "node_type": successor.node_type,
                    "status": successor.status,
                    "source_node_id": node_run.node_id,
                },
            )

    def _build_execution_context(
        self,
        *,
        workflow_run: WorkflowRun,
        node_run: NodeRun,
        worker_key: str | None,
    ) -> NodeExecutionContextContract:
        """Assemble the dispatcher context: node identity, config and run state maps."""
        run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
            self._build_run_state_maps(
                tenant_id=node_run.tenant_id,
                run_id=node_run.run_id,
            )
        )
        return NodeExecutionContextContract(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
            node_run_id=node_run.id,
            node_id=node_run.node_id,
            node_type=node_run.node_type,
            node_config_json=self._resolve_node_config(
                tenant_id=node_run.tenant_id,
                workflow_version_id=workflow_run.workflow_version_id,
                node_id=node_run.node_id,
            ),
            run_state_json=run_state_json,
            node_output_json_by_node_id=node_output_json_by_node_id,
            node_output_text_by_node_id=node_output_text_by_node_id,
            worker_key=worker_key,
        )

    def _build_run_state_maps(
        self,
        *,
        tenant_id: str,
        run_id: str,
    ) -> tuple[
        dict[str, JSONValue],
        dict[str, dict[str, JSONValue]],
        dict[str, str],
    ]:
        """Collect run state and per-node outputs from all node runs of a run.

        Returns:
            ``(run_state_json, node_output_json_by_node_id,
            node_output_text_by_node_id)``. Run state is the merge of every
            node's ``output_json["state_updates"]`` dict, applied in the order
            the repository returns the node runs, so later entries overwrite
            earlier ones for the same key.
        """
        node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
        run_state_json: dict[str, JSONValue] = {}
        node_output_json_by_node_id: dict[str, dict[str, JSONValue]] = {}
        node_output_text_by_node_id: dict[str, str] = {}

        for item in node_runs:
            if item.output_json is not None:
                # Copy so callers cannot mutate the stored output through the map.
                node_output_json_by_node_id[item.node_id] = dict(item.output_json)
                state_updates = item.output_json.get("state_updates")
                if isinstance(state_updates, dict):
                    for state_key, state_value in state_updates.items():
                        run_state_json[str(state_key)] = state_value
            if item.output_text is not None:
                node_output_text_by_node_id[item.node_id] = item.output_text

        return run_state_json, node_output_json_by_node_id, node_output_text_by_node_id

    def _sync_workflow_run_status_from_nodes(self, *, tenant_id: str, run_id: str) -> None:
        """Update the workflow run's node count and status from its node runs.

        Does nothing when the run has no node runs. Logs a
        ``run_status_synced`` event after the update.
        """
        node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
        if not node_runs:
            return

        self.workflow_run_repository.update_node_count(
            run_id=run_id,
            current_node_count=len(node_runs),
        )
        next_status, error_code, error_message = self._derive_run_status(node_runs)
        self.workflow_run_repository.update_status(
            run_id=run_id,
            status=next_status,
            error_code=error_code,
            error_message=error_message,
        )
        self._log_event(
            tenant_id=tenant_id,
            run_id=run_id,
            node_run_id=None,
            event_type="run_status_synced",
            message=f"workflow run status synced to {next_status}",
            detail_json={
                "status": next_status,
                "error_code": error_code,
            },
        )

    def _derive_run_status(
        self,
        node_runs: list[NodeRun],
    ) -> tuple[WorkflowRunStatus, str | None, str | None]:
        """Derive the workflow run status from its node runs.

        Precedence: any ``failed`` node makes the run ``failed`` (carrying the
        first failed node's error code and message); otherwise any ``running``
        node makes it ``running``; otherwise the run is ``completed`` only if
        every node is ``completed`` or ``skipped``; anything else is
        ``running``.

        Returns:
            ``(status, error_code, error_message)``.
        """
        statuses = {node_run.status for node_run in node_runs}

        if "failed" in statuses:
            failed_node = next((item for item in node_runs if item.status == "failed"), None)
            error_code = failed_node.error_code if failed_node is not None else None
            error_message = failed_node.error_message if failed_node is not None else None
            return "failed", error_code, error_message

        if "running" in statuses:
            return "running", None, None

        terminal_statuses: set[NodeRunStatus] = {"completed", "skipped"}
        if statuses and statuses.issubset(terminal_statuses):
            return "completed", None, None

        # Remaining statuses (e.g. nodes not yet started) keep the run open.
        return "running", None, None

    def _log_event(
        self,
        *,
        tenant_id: str,
        run_id: str,
        node_run_id: str | None,
        event_type: str,
        message: str,
        detail_json: dict[str, JSONValue] | None,
        level: str = "info",
    ) -> None:
        """Write one entry to the execution log repository."""
        self.execution_log_repository.create(
            tenant_id=tenant_id,
            run_id=run_id,
            node_run_id=node_run_id,
            event_type=event_type,
            level=level,
            message=message,
            detail_json=detail_json,
        )


def build_runtime_application_service(
    *,
    db: Session,
    settings: RuntimeServiceSettings,
) -> RuntimeApplicationService:
    """Build a ``RuntimeApplicationService`` wired to ``db`` and the configured services.

    All repositories share the given session; the service clients are created
    from the base URLs in ``settings``.
    """
    return RuntimeApplicationService(
        workflow_run_repository=WorkflowRunRepository(db),
        node_run_repository=NodeRunRepository(db),
        execution_log_repository=ExecutionLogRepository(db),
        node_artifact_repository=NodeArtifactRepository(db),
        trace_span_repository=TraceSpanRepository(db),
        execution_dispatcher=build_node_execution_dispatcher_with_clients(
            code_runner_client=CodeRunnerClient(base_url=settings.code_runner_service_url),
            model_gateway_client=ModelGatewayClient(base_url=settings.model_gateway_service_url),
            tool_client=ToolServiceClient(base_url=settings.tool_service_url),
        ),
        workflow_client=WorkflowServiceClient(base_url=settings.workflow_service_url),
    )