| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- from core_domain import InitialNodeContract, NodeRunStatus, WorkflowRunStatus
- from app.db.models import NodeRun, WorkflowRun
- from app.domain.repositories import NodeRunRepository, WorkflowRunRepository
- from app.infrastructure.planner import derive_initial_node, derive_successor_nodes
- from app.infrastructure.workflow_client import WorkflowServiceClient
- from app.schemas.run import NodeRunStatusUpdateRequest, RunCreateRequest, WorkflowRunStatusUpdateRequest
class RuntimeApplicationService:
    """Create workflow runs and keep run / node-run records in sync.

    All persistence goes through the two injected repositories. The optional
    workflow client is used to fetch a workflow version so the planner helpers
    can derive the initial node and successor nodes; when no client is
    configured, planning and successor scheduling are skipped.
    """

    def __init__(
        self,
        workflow_run_repository: WorkflowRunRepository,
        node_run_repository: NodeRunRepository,
        workflow_client: WorkflowServiceClient | None = None,
    ) -> None:
        """Store the collaborators used by the service methods."""
        self.workflow_run_repository = workflow_run_repository
        self.node_run_repository = node_run_repository
        self.workflow_client = workflow_client

    def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
        """Create a workflow run and, when one is known, its initial node run.

        The initial node is taken from ``payload.initial_node`` if set,
        otherwise it is planned through the workflow client (if configured).

        Returns:
            ``(workflow_run, node_run)``; ``node_run`` is ``None`` when no
            initial node could be determined.
        """
        initial_node = payload.initial_node or self._plan_initial_node(payload)
        workflow_run = self.workflow_run_repository.create(
            tenant_id=payload.tenant_id,
            app_id=payload.app_id,
            app_version_id=payload.app_version_id,
            workflow_id=payload.workflow_id,
            workflow_version_id=payload.workflow_version_id,
            session_id=payload.session_id,
            parent_run_id=payload.parent_run_id,
            root_run_id=payload.root_run_id,
            run_type=payload.run_type,
            trigger_type=payload.trigger_type,
            priority=payload.priority,
        )
        if initial_node is None:
            return workflow_run, None

        # Create the node run before recording the count: if creation raises,
        # the run must not be left reporting a node that was never created.
        node_run = self.node_run_repository.create(
            tenant_id=payload.tenant_id,
            run_id=workflow_run.id,
            node_id=initial_node.node_id,
            node_type=initial_node.node_type,
            status=initial_node.status,
        )
        self.workflow_run_repository.update_node_count(
            run_id=workflow_run.id,
            current_node_count=1,
        )
        return workflow_run, node_run

    def list_runs(self, tenant_id: str, session_id: str | None = None) -> list[WorkflowRun]:
        """Return the workflow runs for a tenant, optionally narrowed by session."""
        return self.workflow_run_repository.list_by_scope(tenant_id=tenant_id, session_id=session_id)

    def list_node_runs(self, tenant_id: str, run_id: str) -> list[NodeRun]:
        """Return the node runs that belong to ``run_id`` for the given tenant."""
        return self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)

    def update_run_status(
        self,
        run_id: str,
        payload: WorkflowRunStatusUpdateRequest,
    ) -> WorkflowRun | None:
        """Write the status and error fields from ``payload`` onto a workflow run.

        Returns whatever the repository returns for the update.
        """
        return self.workflow_run_repository.update_status(
            run_id=run_id,
            status=payload.status,
            error_code=payload.error_code,
            error_message=payload.error_message,
        )

    def update_node_run_status(
        self,
        node_run_id: str,
        payload: NodeRunStatusUpdateRequest,
    ) -> NodeRun | None:
        """Update a node run, then propagate the change to its workflow run.

        When the new status is ``"completed"`` the node's successors are
        scheduled first; afterwards the workflow run's node count and status
        are recomputed from all of its node runs.

        Returns:
            The updated node run, or ``None`` when the repository returned
            ``None`` for the update (nothing else is done in that case).
        """
        node_run = self.node_run_repository.update_status(
            node_run_id=node_run_id,
            status=payload.status,
            worker_key=payload.worker_key,
            error_code=payload.error_code,
            error_message=payload.error_message,
        )
        if node_run is None:
            return None
        if payload.status == "completed":
            self._schedule_successor_nodes(node_run)
        self._sync_workflow_run_status_from_nodes(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
        )
        return node_run

    def _plan_initial_node(self, payload: RunCreateRequest) -> InitialNodeContract | None:
        """Derive the initial node from the workflow version, if a client is set."""
        if self.workflow_client is None:
            return None
        workflow_version = self.workflow_client.get_workflow_version(
            tenant_id=payload.tenant_id,
            workflow_version_id=payload.workflow_version_id,
        )
        return derive_initial_node(workflow_version)

    def _schedule_successor_nodes(self, node_run: NodeRun) -> None:
        """Create node runs for the successors of ``node_run`` that lack one.

        Does nothing when no workflow client is configured, when the owning
        workflow run cannot be loaded, or when the planner yields no
        successors.
        """
        if self.workflow_client is None:
            return
        workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
        if workflow_run is None:
            return
        workflow_version = self.workflow_client.get_workflow_version(
            tenant_id=node_run.tenant_id,
            workflow_version_id=workflow_run.workflow_version_id,
        )
        successor_nodes = derive_successor_nodes(workflow_version, node_run.node_id)
        if not successor_nodes:
            return
        existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
            node_ids=[item.node_id for item in successor_nodes],
        )
        known_node_ids = {item.node_id for item in existing_nodes}
        for successor in successor_nodes:
            if successor.node_id in known_node_ids:
                continue
            self.node_run_repository.create(
                tenant_id=node_run.tenant_id,
                run_id=node_run.run_id,
                node_id=successor.node_id,
                node_type=successor.node_type,
                status=successor.status,
            )
            # Remember ids created in this pass so a successor listed more
            # than once by the planner does not get a second node run.
            known_node_ids.add(successor.node_id)

    def _sync_workflow_run_status_from_nodes(self, *, tenant_id: str, run_id: str) -> None:
        """Recompute a run's node count and status from its node runs.

        Leaves the run untouched when it has no node runs.
        """
        node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
        if not node_runs:
            return
        self.workflow_run_repository.update_node_count(
            run_id=run_id,
            current_node_count=len(node_runs),
        )
        next_status, error_code, error_message = self._derive_run_status(node_runs)
        self.workflow_run_repository.update_status(
            run_id=run_id,
            status=next_status,
            error_code=error_code,
            error_message=error_message,
        )

    def _derive_run_status(
        self,
        node_runs: list[NodeRun],
    ) -> tuple[WorkflowRunStatus, str | None, str | None]:
        """Map the statuses of ``node_runs`` to ``(status, error_code, error_message)``.

        Precedence:
            1. Any ``"failed"`` node -> ``"failed"``, carrying the error fields
               of the first failed node in list order.
            2. Any ``"running"`` node -> ``"running"``.
            3. Every node ``"completed"`` or ``"skipped"`` -> ``"completed"``.
            4. Anything else (including an empty list) -> ``"running"``.
        """
        for item in node_runs:
            if item.status == "failed":
                return "failed", item.error_code, item.error_message

        statuses = {item.status for item in node_runs}
        if "running" in statuses:
            return "running", None, None
        terminal_statuses: set[NodeRunStatus] = {"completed", "skipped"}
        if statuses and statuses.issubset(terminal_statuses):
            return "completed", None, None
        # Some node is in a non-terminal state other than "running", or there
        # are no nodes at all: the run is still treated as in progress.
        return "running", None, None
|