|
@@ -1,6 +1,10 @@
|
|
|
|
|
+from core_domain import InitialNodeContract, NodeRunStatus, WorkflowRunStatus
|
|
|
|
|
+
|
|
|
from app.db.models import NodeRun, WorkflowRun
|
|
from app.db.models import NodeRun, WorkflowRun
|
|
|
from app.domain.repositories import NodeRunRepository, WorkflowRunRepository
|
|
from app.domain.repositories import NodeRunRepository, WorkflowRunRepository
|
|
|
-from app.schemas.run import RunCreateRequest
|
|
|
|
|
|
|
+from app.infrastructure.planner import derive_initial_node, derive_successor_nodes
|
|
|
|
|
+from app.infrastructure.workflow_client import WorkflowServiceClient
|
|
|
|
|
+from app.schemas.run import NodeRunStatusUpdateRequest, RunCreateRequest, WorkflowRunStatusUpdateRequest
|
|
|
|
|
|
|
|
|
|
|
|
|
class RuntimeApplicationService:
|
|
class RuntimeApplicationService:
|
|
@@ -8,11 +12,14 @@ class RuntimeApplicationService:
|
|
|
self,
|
|
self,
|
|
|
workflow_run_repository: WorkflowRunRepository,
|
|
workflow_run_repository: WorkflowRunRepository,
|
|
|
node_run_repository: NodeRunRepository,
|
|
node_run_repository: NodeRunRepository,
|
|
|
|
|
+ workflow_client: WorkflowServiceClient | None = None,
|
|
|
) -> None:
|
|
) -> None:
|
|
|
self.workflow_run_repository = workflow_run_repository
|
|
self.workflow_run_repository = workflow_run_repository
|
|
|
self.node_run_repository = node_run_repository
|
|
self.node_run_repository = node_run_repository
|
|
|
|
|
+ self.workflow_client = workflow_client
|
|
|
|
|
|
|
|
def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
|
|
def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
|
|
|
|
|
+ initial_node = payload.initial_node or self._plan_initial_node(payload)
|
|
|
workflow_run = self.workflow_run_repository.create(
|
|
workflow_run = self.workflow_run_repository.create(
|
|
|
tenant_id=payload.tenant_id,
|
|
tenant_id=payload.tenant_id,
|
|
|
app_id=payload.app_id,
|
|
app_id=payload.app_id,
|
|
@@ -27,21 +34,21 @@ class RuntimeApplicationService:
|
|
|
priority=payload.priority,
|
|
priority=payload.priority,
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
- initial_node = None
|
|
|
|
|
- if payload.initial_node is not None:
|
|
|
|
|
|
|
+ node_run = None
|
|
|
|
|
+ if initial_node is not None:
|
|
|
self.workflow_run_repository.update_node_count(
|
|
self.workflow_run_repository.update_node_count(
|
|
|
run_id=workflow_run.id,
|
|
run_id=workflow_run.id,
|
|
|
current_node_count=1,
|
|
current_node_count=1,
|
|
|
)
|
|
)
|
|
|
- initial_node = self.node_run_repository.create(
|
|
|
|
|
|
|
+ node_run = self.node_run_repository.create(
|
|
|
tenant_id=payload.tenant_id,
|
|
tenant_id=payload.tenant_id,
|
|
|
run_id=workflow_run.id,
|
|
run_id=workflow_run.id,
|
|
|
- node_id=payload.initial_node.node_id,
|
|
|
|
|
- node_type=payload.initial_node.node_type,
|
|
|
|
|
- status=payload.initial_node.status,
|
|
|
|
|
|
|
+ node_id=initial_node.node_id,
|
|
|
|
|
+ node_type=initial_node.node_type,
|
|
|
|
|
+ status=initial_node.status,
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
- return workflow_run, initial_node
|
|
|
|
|
|
|
+ return workflow_run, node_run
|
|
|
|
|
|
|
|
def list_runs(self, tenant_id: str, session_id: str | None = None) -> list[WorkflowRun]:
|
|
def list_runs(self, tenant_id: str, session_id: str | None = None) -> list[WorkflowRun]:
|
|
|
return self.workflow_run_repository.list_by_scope(tenant_id=tenant_id, session_id=session_id)
|
|
return self.workflow_run_repository.list_by_scope(tenant_id=tenant_id, session_id=session_id)
|
|
@@ -49,3 +56,120 @@ class RuntimeApplicationService:
|
|
|
def list_node_runs(self, tenant_id: str, run_id: str) -> list[NodeRun]:
|
|
def list_node_runs(self, tenant_id: str, run_id: str) -> list[NodeRun]:
|
|
|
return self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
|
|
return self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
|
|
|
|
|
|
|
|
|
|
+ def update_run_status(
|
|
|
|
|
+ self,
|
|
|
|
|
+ run_id: str,
|
|
|
|
|
+ payload: WorkflowRunStatusUpdateRequest,
|
|
|
|
|
+ ) -> WorkflowRun | None:
|
|
|
|
|
+ return self.workflow_run_repository.update_status(
|
|
|
|
|
+ run_id=run_id,
|
|
|
|
|
+ status=payload.status,
|
|
|
|
|
+ error_code=payload.error_code,
|
|
|
|
|
+ error_message=payload.error_message,
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ def update_node_run_status(
|
|
|
|
|
+ self,
|
|
|
|
|
+ node_run_id: str,
|
|
|
|
|
+ payload: NodeRunStatusUpdateRequest,
|
|
|
|
|
+ ) -> NodeRun | None:
|
|
|
|
|
+ node_run = self.node_run_repository.update_status(
|
|
|
|
|
+ node_run_id=node_run_id,
|
|
|
|
|
+ status=payload.status,
|
|
|
|
|
+ worker_key=payload.worker_key,
|
|
|
|
|
+ error_code=payload.error_code,
|
|
|
|
|
+ error_message=payload.error_message,
|
|
|
|
|
+ )
|
|
|
|
|
+ if node_run is None:
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ if payload.status == "completed":
|
|
|
|
|
+ self._schedule_successor_nodes(node_run)
|
|
|
|
|
+
|
|
|
|
|
+ self._sync_workflow_run_status_from_nodes(
|
|
|
|
|
+ tenant_id=node_run.tenant_id,
|
|
|
|
|
+ run_id=node_run.run_id,
|
|
|
|
|
+ )
|
|
|
|
|
+ return node_run
|
|
|
|
|
+
|
|
|
|
|
+ def _plan_initial_node(self, payload: RunCreateRequest) -> InitialNodeContract | None:
|
|
|
|
|
+ if self.workflow_client is None:
|
|
|
|
|
+ return None
|
|
|
|
|
+ workflow_version = self.workflow_client.get_workflow_version(
|
|
|
|
|
+ tenant_id=payload.tenant_id,
|
|
|
|
|
+ workflow_version_id=payload.workflow_version_id,
|
|
|
|
|
+ )
|
|
|
|
|
+ return derive_initial_node(workflow_version)
|
|
|
|
|
+
|
|
|
|
|
+ def _schedule_successor_nodes(self, node_run: NodeRun) -> None:
|
|
|
|
|
+ if self.workflow_client is None:
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
|
|
|
|
|
+ if workflow_run is None:
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ workflow_version = self.workflow_client.get_workflow_version(
|
|
|
|
|
+ tenant_id=node_run.tenant_id,
|
|
|
|
|
+ workflow_version_id=workflow_run.workflow_version_id,
|
|
|
|
|
+ )
|
|
|
|
|
+ successor_nodes = derive_successor_nodes(workflow_version, node_run.node_id)
|
|
|
|
|
+ if not successor_nodes:
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
|
|
|
|
|
+ tenant_id=node_run.tenant_id,
|
|
|
|
|
+ run_id=node_run.run_id,
|
|
|
|
|
+ node_ids=[item.node_id for item in successor_nodes],
|
|
|
|
|
+ )
|
|
|
|
|
+ existing_node_ids = {item.node_id for item in existing_nodes}
|
|
|
|
|
+
|
|
|
|
|
+ for successor in successor_nodes:
|
|
|
|
|
+ if successor.node_id in existing_node_ids:
|
|
|
|
|
+ continue
|
|
|
|
|
+ self.node_run_repository.create(
|
|
|
|
|
+ tenant_id=node_run.tenant_id,
|
|
|
|
|
+ run_id=node_run.run_id,
|
|
|
|
|
+ node_id=successor.node_id,
|
|
|
|
|
+ node_type=successor.node_type,
|
|
|
|
|
+ status=successor.status,
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ def _sync_workflow_run_status_from_nodes(self, *, tenant_id: str, run_id: str) -> None:
|
|
|
|
|
+ node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
|
|
|
|
|
+ if not node_runs:
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ self.workflow_run_repository.update_node_count(
|
|
|
|
|
+ run_id=run_id,
|
|
|
|
|
+ current_node_count=len(node_runs),
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ next_status, error_code, error_message = self._derive_run_status(node_runs)
|
|
|
|
|
+ self.workflow_run_repository.update_status(
|
|
|
|
|
+ run_id=run_id,
|
|
|
|
|
+ status=next_status,
|
|
|
|
|
+ error_code=error_code,
|
|
|
|
|
+ error_message=error_message,
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ def _derive_run_status(
|
|
|
|
|
+ self,
|
|
|
|
|
+ node_runs: list[NodeRun],
|
|
|
|
|
+ ) -> tuple[WorkflowRunStatus, str | None, str | None]:
|
|
|
|
|
+ statuses = {node_run.status for node_run in node_runs}
|
|
|
|
|
+
|
|
|
|
|
+ if "failed" in statuses:
|
|
|
|
|
+ failed_node = next((item for item in node_runs if item.status == "failed"), None)
|
|
|
|
|
+ error_code = failed_node.error_code if failed_node is not None else None
|
|
|
|
|
+ error_message = failed_node.error_message if failed_node is not None else None
|
|
|
|
|
+ return "failed", error_code, error_message
|
|
|
|
|
+
|
|
|
|
|
+ if "running" in statuses:
|
|
|
|
|
+ return "running", None, None
|
|
|
|
|
+
|
|
|
|
|
+ terminal_statuses: set[NodeRunStatus] = {"completed", "skipped"}
|
|
|
|
|
+ if statuses and statuses.issubset(terminal_statuses):
|
|
|
|
|
+ return "completed", None, None
|
|
|
|
|
+
|
|
|
|
|
+ return "running", None, None
|