from core_domain import InitialNodeContract, NodeRunStatus, WorkflowRunStatus
from app.db.models import NodeRun, WorkflowRun
from app.domain.repositories import NodeRunRepository, WorkflowRunRepository
from app.infrastructure.planner import derive_initial_node, derive_successor_nodes
from app.infrastructure.workflow_client import WorkflowServiceClient
from app.schemas.run import NodeRunStatusUpdateRequest, RunCreateRequest, WorkflowRunStatusUpdateRequest


class RuntimeApplicationService:
    """Application service that manages workflow runs and their node runs.

    Persistence goes through the two injected repositories. The workflow
    client is optional: without it no initial node is planned and no
    successor nodes are scheduled.
    """

    def __init__(
        self,
        workflow_run_repository: WorkflowRunRepository,
        node_run_repository: NodeRunRepository,
        workflow_client: WorkflowServiceClient | None = None,
    ) -> None:
        """Store the collaborators used by the service methods."""
        self.workflow_run_repository = workflow_run_repository
        self.node_run_repository = node_run_repository
        self.workflow_client = workflow_client

    def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
        """Create a workflow run and, when one is available, its first node run.

        The initial node is taken from the payload; if the payload carries
        none, it is planned through the workflow client.

        Returns:
            The created workflow run and the created node run, or ``None``
            in the second slot when no initial node could be determined.
        """
        # Resolve the starting node before anything is persisted.
        first_node = payload.initial_node or self._plan_initial_node(payload)

        created_run = self.workflow_run_repository.create(
            tenant_id=payload.tenant_id,
            app_id=payload.app_id,
            app_version_id=payload.app_version_id,
            workflow_id=payload.workflow_id,
            workflow_version_id=payload.workflow_version_id,
            session_id=payload.session_id,
            parent_run_id=payload.parent_run_id,
            root_run_id=payload.root_run_id,
            run_type=payload.run_type,
            trigger_type=payload.trigger_type,
            priority=payload.priority,
        )

        if first_node is None:
            return created_run, None

        # The node count is recorded first, then the node run itself is created.
        self.workflow_run_repository.update_node_count(
            run_id=created_run.id,
            current_node_count=1,
        )
        first_node_run = self.node_run_repository.create(
            tenant_id=payload.tenant_id,
            run_id=created_run.id,
            node_id=first_node.node_id,
            node_type=first_node.node_type,
            status=first_node.status,
        )
        return created_run, first_node_run

    def list_runs(self, tenant_id: str, session_id: str | None = None) -> list[WorkflowRun]:
        """Return the workflow runs for a tenant, optionally narrowed by session."""
        return self.workflow_run_repository.list_by_scope(
            tenant_id=tenant_id,
            session_id=session_id,
        )

    def list_node_runs(self, tenant_id: str, run_id: str) -> list[NodeRun]:
        """Return the node runs that belong to the given run of a tenant."""
        return self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)

    def update_run_status(
        self,
        run_id: str,
        payload: WorkflowRunStatusUpdateRequest,
    ) -> WorkflowRun | None:
        """Apply the status and error details from the payload to a workflow run.

        Returns:
            Whatever the repository's ``update_status`` returns (a workflow
            run or ``None``).
        """
        return self.workflow_run_repository.update_status(
            run_id=run_id,
            status=payload.status,
            error_code=payload.error_code,
            error_message=payload.error_message,
        )

    def update_node_run_status(
        self,
        node_run_id: str,
        payload: NodeRunStatusUpdateRequest,
    ) -> NodeRun | None:
        """Update a node run and propagate the change to its workflow run.

        Successor nodes are scheduled only when the new status is
        ``"completed"``. The owning run's node count and status are re-synced
        after every update that yields a node run.

        Returns:
            The updated node run, or ``None`` when the repository returned
            ``None`` for the update (nothing else happens in that case).
        """
        updated = self.node_run_repository.update_status(
            node_run_id=node_run_id,
            status=payload.status,
            worker_key=payload.worker_key,
            error_code=payload.error_code,
            error_message=payload.error_message,
        )
        if updated is None:
            return None

        if payload.status == "completed":
            self._schedule_successor_nodes(updated)

        # Sync runs after scheduling so newly created node runs are counted.
        self._sync_workflow_run_status_from_nodes(
            tenant_id=updated.tenant_id,
            run_id=updated.run_id,
        )
        return updated

    def _plan_initial_node(self, payload: RunCreateRequest) -> InitialNodeContract | None:
        """Derive the initial node from the workflow version named in the payload.

        Returns ``None`` without any lookup when no workflow client is set.
        """
        client = self.workflow_client
        if client is None:
            return None

        version = client.get_workflow_version(
            tenant_id=payload.tenant_id,
            workflow_version_id=payload.workflow_version_id,
        )
        return derive_initial_node(version)

    def _schedule_successor_nodes(self, node_run: NodeRun) -> None:
        """Create node runs for the successors of ``node_run`` that do not exist yet.

        Returns early when there is no workflow client, when the owning
        workflow run cannot be found, or when no successors are derived.
        """
        client = self.workflow_client
        if client is None:
            return

        owning_run = self.workflow_run_repository.get_by_id(node_run.run_id)
        if owning_run is None:
            return

        version = client.get_workflow_version(
            tenant_id=node_run.tenant_id,
            workflow_version_id=owning_run.workflow_version_id,
        )
        successors = derive_successor_nodes(version, node_run.node_id)
        if not successors:
            return

        # Look up which successor node ids already have a node run in this run.
        candidate_ids = [candidate.node_id for candidate in successors]
        already_present = self.node_run_repository.list_by_run_and_node_ids(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
            node_ids=candidate_ids,
        )
        known_ids = {present.node_id for present in already_present}

        for candidate in successors:
            if candidate.node_id not in known_ids:
                self.node_run_repository.create(
                    tenant_id=node_run.tenant_id,
                    run_id=node_run.run_id,
                    node_id=candidate.node_id,
                    node_type=candidate.node_type,
                    status=candidate.status,
                )

    def _sync_workflow_run_status_from_nodes(self, *, tenant_id: str, run_id: str) -> None:
        """Refresh a run's node count and status from its current node runs.

        Does nothing when the run has no node runs.
        """
        current_nodes = self.node_run_repository.list_by_run(
            tenant_id=tenant_id,
            run_id=run_id,
        )
        if not current_nodes:
            return

        self.workflow_run_repository.update_node_count(
            run_id=run_id,
            current_node_count=len(current_nodes),
        )

        derived_status, derived_code, derived_message = self._derive_run_status(current_nodes)
        self.workflow_run_repository.update_status(
            run_id=run_id,
            status=derived_status,
            error_code=derived_code,
            error_message=derived_message,
        )

    def _derive_run_status(
        self,
        node_runs: list[NodeRun],
    ) -> tuple[WorkflowRunStatus, str | None, str | None]:
        """Work out the workflow run status implied by a set of node runs.

        Rules, in priority order:

        * any ``"failed"`` node -> ``"failed"``, carrying the error code and
          message of the first failed node in ``node_runs``;
        * any ``"running"`` node -> ``"running"``;
        * every node ``"completed"`` or ``"skipped"`` -> ``"completed"``;
        * anything else (including no node runs) -> ``"running"``.

        Returns:
            A ``(status, error_code, error_message)`` tuple; both error
            fields are ``None`` unless the status is ``"failed"``.
        """
        observed = {entry.status for entry in node_runs}

        if "failed" in observed:
            first_failed = next(
                (entry for entry in node_runs if entry.status == "failed"),
                None,
            )
            if first_failed is None:
                return "failed", None, None
            return "failed", first_failed.error_code, first_failed.error_message

        if "running" not in observed:
            finished: set[NodeRunStatus] = {"completed", "skipped"}
            if observed and observed <= finished:
                return "completed", None, None

        return "running", None, None