from app.db.models import NodeRun, WorkflowRun
from app.domain.repositories import NodeRunRepository, WorkflowRunRepository
from app.schemas.run import RunCreateRequest


class RuntimeApplicationService:
    """Application-level facade over the workflow-run and node-run repositories.

    The service holds no state of its own beyond the two repositories it is
    constructed with; every method delegates to one or both of them.
    """

    def __init__(
        self,
        workflow_run_repository: WorkflowRunRepository,
        node_run_repository: NodeRunRepository,
    ) -> None:
        """Store the repositories used by the other methods.

        Args:
            workflow_run_repository: Repository used to create, update and
                list workflow runs.
            node_run_repository: Repository used to create and list node runs.
        """
        self.workflow_run_repository = workflow_run_repository
        self.node_run_repository = node_run_repository

    def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
        """Create a workflow run and, if requested, its initial node run.

        The workflow run is always created from the identifying fields of
        ``payload``. When ``payload.initial_node`` is not ``None``, the run's
        node count is set to 1 and a node run is created for that node.

        Args:
            payload: Request describing the run and, optionally, the initial
                node to create alongside it.

        Returns:
            A ``(workflow_run, node_run)`` pair; ``node_run`` is ``None`` when
            the payload carries no initial node.
        """
        created_run = self.workflow_run_repository.create(
            tenant_id=payload.tenant_id,
            app_id=payload.app_id,
            app_version_id=payload.app_version_id,
            workflow_id=payload.workflow_id,
            workflow_version_id=payload.workflow_version_id,
            session_id=payload.session_id,
            parent_run_id=payload.parent_run_id,
            root_run_id=payload.root_run_id,
            run_type=payload.run_type,
            trigger_type=payload.trigger_type,
            priority=payload.priority,
        )

        node_spec = payload.initial_node
        if node_spec is None:
            # Nothing more to do: the run starts without any node run.
            return created_run, None

        # The node count is bumped before the node run itself is created;
        # this ordering is kept as-is.
        self.workflow_run_repository.update_node_count(
            run_id=created_run.id,
            current_node_count=1,
        )
        first_node_run = self.node_run_repository.create(
            tenant_id=payload.tenant_id,
            run_id=created_run.id,
            node_id=node_spec.node_id,
            node_type=node_spec.node_type,
            status=node_spec.status,
        )
        return created_run, first_node_run

    def list_runs(self, tenant_id: str, session_id: str | None = None) -> list[WorkflowRun]:
        """Return the workflow runs the repository reports for a scope.

        Args:
            tenant_id: Tenant identifier forwarded to ``list_by_scope``.
            session_id: Session identifier forwarded to ``list_by_scope``;
                ``None`` (the default) is passed through unchanged.

        Returns:
            Whatever ``WorkflowRunRepository.list_by_scope`` returns.
        """
        runs = self.workflow_run_repository.list_by_scope(
            tenant_id=tenant_id,
            session_id=session_id,
        )
        return runs

    def list_node_runs(self, tenant_id: str, run_id: str) -> list[NodeRun]:
        """Return the node runs the repository reports for one workflow run.

        Args:
            tenant_id: Tenant identifier forwarded to ``list_by_run``.
            run_id: Workflow-run identifier forwarded to ``list_by_run``.

        Returns:
            Whatever ``NodeRunRepository.list_by_run`` returns.
        """
        node_runs = self.node_run_repository.list_by_run(
            tenant_id=tenant_id,
            run_id=run_id,
        )
        return node_runs