|
|
@@ -1,3 +1,4 @@
|
|
|
+from dataclasses import dataclass
|
|
|
from datetime import datetime, timedelta
|
|
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
@@ -13,7 +14,7 @@ from core_domain import (
|
|
|
WorkflowRunStatus,
|
|
|
)
|
|
|
|
|
|
-from app.db.models import NodeRun, WorkflowRun
|
|
|
+from app.db.models import ExecutionLog, NodeArtifact, NodeRun, TraceSpan, WorkflowRun
|
|
|
from app.domain.repositories import (
|
|
|
ExecutionLogRepository,
|
|
|
NodeArtifactRepository,
|
|
|
@@ -43,12 +44,29 @@ from app.schemas.run import (
|
|
|
NodeRunStatusUpdateRequest,
|
|
|
RunCreateRequest,
|
|
|
RunExecuteRequest,
|
|
|
+ RuntimeDebugContinueRequest,
|
|
|
WorkflowRunStatusUpdateRequest,
|
|
|
)
|
|
|
from core_shared import JSONValue, try_build_redis_client
|
|
|
from core_shared.task_queue import TaskQueuePublisher
|
|
|
|
|
|
|
|
|
+@dataclass(frozen=True)
|
|
|
+class RuntimeDebugSnapshot:
|
|
|
+ run: WorkflowRun
|
|
|
+ node_runs: list[NodeRun]
|
|
|
+ run_state_json: dict[str, JSONValue]
|
|
|
+ node_output_json_by_node_id: dict[str, dict[str, JSONValue]]
|
|
|
+ node_output_text_by_node_id: dict[str, str]
|
|
|
+ queued_node_ids: list[str]
|
|
|
+ running_node_ids: list[str]
|
|
|
+ completed_node_ids: list[str]
|
|
|
+ failed_node_ids: list[str]
|
|
|
+ execution_logs: list[ExecutionLog]
|
|
|
+ node_artifacts: list[NodeArtifact]
|
|
|
+ trace_spans: list[TraceSpan]
|
|
|
+
|
|
|
+
|
|
|
class RuntimeApplicationService:
|
|
|
def __init__(
|
|
|
self,
|
|
|
@@ -197,6 +215,17 @@ class RuntimeApplicationService:
|
|
|
span_type=span_type,
|
|
|
)
|
|
|
|
|
|
+ def get_debug_snapshot(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ tenant_id: str,
|
|
|
+ run_id: str,
|
|
|
+ ) -> RuntimeDebugSnapshot | None:
|
|
|
+ workflow_run = self.workflow_run_repository.get_by_id(run_id)
|
|
|
+ if workflow_run is None or workflow_run.tenant_id != tenant_id:
|
|
|
+ return None
|
|
|
+ return self._build_debug_snapshot(workflow_run)
|
|
|
+
|
|
|
def update_run_status(
|
|
|
self,
|
|
|
run_id: str,
|
|
|
@@ -295,6 +324,9 @@ class RuntimeApplicationService:
|
|
|
if workflow_run is None:
|
|
|
return None
|
|
|
|
|
|
+ if workflow_run.status == "paused":
|
|
|
+ return workflow_run, node_run, "debug_paused"
|
|
|
+
|
|
|
if node_run.status in {"completed", "failed", "skipped"}:
|
|
|
executor_name = self.execution_dispatcher.resolve_executor(
|
|
|
node_run.node_type
|
|
|
@@ -536,6 +568,200 @@ class RuntimeApplicationService:
|
|
|
return None
|
|
|
return final_run, executed_node_runs, executor_names
|
|
|
|
|
|
+ def pause_run(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ tenant_id: str,
|
|
|
+ run_id: str,
|
|
|
+ reason: str = "debug_pause",
|
|
|
+ ) -> RuntimeDebugSnapshot | None:
|
|
|
+ workflow_run = self.workflow_run_repository.get_by_id(run_id)
|
|
|
+ if workflow_run is None or workflow_run.tenant_id != tenant_id:
|
|
|
+ return None
|
|
|
+ self._sync_workflow_run_status_from_nodes(tenant_id=tenant_id, run_id=run_id)
|
|
|
+ latest_run = self.workflow_run_repository.get_by_id(run_id)
|
|
|
+ if latest_run is None:
|
|
|
+ return None
|
|
|
+ if latest_run.status in {"completed", "failed", "cancelled"}:
|
|
|
+ paused_run = latest_run
|
|
|
+ else:
|
|
|
+ paused_run = self.workflow_run_repository.update_status(
|
|
|
+ run_id=run_id,
|
|
|
+ status="paused",
|
|
|
+ )
|
|
|
+ if paused_run is None:
|
|
|
+ return None
|
|
|
+ self._log_event(
|
|
|
+ tenant_id=tenant_id,
|
|
|
+ run_id=run_id,
|
|
|
+ node_run_id=None,
|
|
|
+ event_type="debug_run_paused",
|
|
|
+ message=f"workflow run paused: {reason}",
|
|
|
+ detail_json={"reason": reason},
|
|
|
+ )
|
|
|
+ return self._build_debug_snapshot(paused_run)
|
|
|
+
|
|
|
+ def resume_run(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ tenant_id: str,
|
|
|
+ run_id: str,
|
|
|
+ reason: str = "debug_resume",
|
|
|
+ ) -> RuntimeDebugSnapshot | None:
|
|
|
+ workflow_run = self.workflow_run_repository.get_by_id(run_id)
|
|
|
+ if workflow_run is None or workflow_run.tenant_id != tenant_id:
|
|
|
+ return None
|
|
|
+ resumed_run = self.workflow_run_repository.update_status(
|
|
|
+ run_id=run_id,
|
|
|
+ status="running",
|
|
|
+ )
|
|
|
+ if resumed_run is None:
|
|
|
+ return None
|
|
|
+ self._log_event(
|
|
|
+ tenant_id=tenant_id,
|
|
|
+ run_id=run_id,
|
|
|
+ node_run_id=None,
|
|
|
+ event_type="debug_run_resumed",
|
|
|
+ message=f"workflow run resumed: {reason}",
|
|
|
+ detail_json={"reason": reason},
|
|
|
+ )
|
|
|
+ return self._build_debug_snapshot(resumed_run)
|
|
|
+
|
|
|
+ def step_debug_run(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ tenant_id: str,
|
|
|
+ run_id: str,
|
|
|
+ worker_key: str | None = None,
|
|
|
+ ) -> tuple[RuntimeDebugSnapshot, list[NodeRun], list[str], str] | None:
|
|
|
+ workflow_run = self.workflow_run_repository.get_by_id(run_id)
|
|
|
+ if workflow_run is None or workflow_run.tenant_id != tenant_id:
|
|
|
+ return None
|
|
|
+ if workflow_run.status == "paused":
|
|
|
+ self.workflow_run_repository.update_status(run_id=run_id, status="running")
|
|
|
+
|
|
|
+ result = self.execute_next_node_run(
|
|
|
+ tenant_id=tenant_id,
|
|
|
+ run_id=run_id,
|
|
|
+ payload=NodeRunExecuteRequest(worker_key=worker_key),
|
|
|
+ )
|
|
|
+ executed_node_runs: list[NodeRun] = []
|
|
|
+ executor_names: list[str] = []
|
|
|
+ reason = "no_queued_node"
|
|
|
+ if result is not None:
|
|
|
+ _, node_run, executor_name = result
|
|
|
+ executed_node_runs.append(node_run)
|
|
|
+ executor_names.append(executor_name)
|
|
|
+ reason = "step_completed"
|
|
|
+
|
|
|
+ self._sync_workflow_run_status_from_nodes(tenant_id=tenant_id, run_id=run_id)
|
|
|
+ latest_run = self.workflow_run_repository.get_by_id(run_id)
|
|
|
+ if latest_run is None:
|
|
|
+ return None
|
|
|
+ if latest_run.status in {"completed", "failed", "cancelled"}:
|
|
|
+ paused_run = latest_run
|
|
|
+ else:
|
|
|
+ paused_run = self.workflow_run_repository.update_status(
|
|
|
+ run_id=run_id,
|
|
|
+ status="paused",
|
|
|
+ )
|
|
|
+ if paused_run is None:
|
|
|
+ return None
|
|
|
+ self._log_event(
|
|
|
+ tenant_id=tenant_id,
|
|
|
+ run_id=run_id,
|
|
|
+ node_run_id=executed_node_runs[-1].id if executed_node_runs else None,
|
|
|
+ event_type="debug_step_finished",
|
|
|
+ message=f"debug step finished: {reason}",
|
|
|
+ detail_json={
|
|
|
+ "reason": reason,
|
|
|
+ "executed_node_ids": [item.node_id for item in executed_node_runs],
|
|
|
+ },
|
|
|
+ )
|
|
|
+ return self._build_debug_snapshot(paused_run), executed_node_runs, executor_names, reason
|
|
|
+
|
|
|
+ def continue_debug_run(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ tenant_id: str,
|
|
|
+ run_id: str,
|
|
|
+ payload: RuntimeDebugContinueRequest,
|
|
|
+ ) -> tuple[RuntimeDebugSnapshot, list[NodeRun], list[str], str | None, str] | None:
|
|
|
+ workflow_run = self.workflow_run_repository.get_by_id(run_id)
|
|
|
+ if workflow_run is None or workflow_run.tenant_id != tenant_id:
|
|
|
+ return None
|
|
|
+ if workflow_run.status == "paused":
|
|
|
+ self.workflow_run_repository.update_status(run_id=run_id, status="running")
|
|
|
+
|
|
|
+ breakpoint_node_ids = set(payload.breakpoint_node_ids)
|
|
|
+ executed_node_runs: list[NodeRun] = []
|
|
|
+ executor_names: list[str] = []
|
|
|
+ paused_before_node_id: str | None = None
|
|
|
+ reason = "completed"
|
|
|
+
|
|
|
+ for _ in range(payload.max_steps):
|
|
|
+ next_node_run = self.node_run_repository.get_next_queued_by_run(
|
|
|
+ tenant_id=tenant_id,
|
|
|
+ run_id=run_id,
|
|
|
+ )
|
|
|
+ if next_node_run is None:
|
|
|
+ reason = "no_queued_node"
|
|
|
+ break
|
|
|
+ if next_node_run.node_id in breakpoint_node_ids:
|
|
|
+ paused_before_node_id = next_node_run.node_id
|
|
|
+ reason = "breakpoint_hit"
|
|
|
+ break
|
|
|
+
|
|
|
+ step_result = self.execute_node_run(
|
|
|
+ node_run_id=next_node_run.id,
|
|
|
+ payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
|
|
|
+ )
|
|
|
+ if step_result is None:
|
|
|
+ reason = "node_not_found"
|
|
|
+ break
|
|
|
+ _, node_run, executor_name = step_result
|
|
|
+ executed_node_runs.append(node_run)
|
|
|
+ executor_names.append(executor_name)
|
|
|
+ if node_run.status != "completed":
|
|
|
+ reason = f"node_{node_run.status}"
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ reason = "max_steps_reached"
|
|
|
+
|
|
|
+ self._sync_workflow_run_status_from_nodes(tenant_id=tenant_id, run_id=run_id)
|
|
|
+ latest_run = self.workflow_run_repository.get_by_id(run_id)
|
|
|
+ if latest_run is None:
|
|
|
+ return None
|
|
|
+ if latest_run.status in {"completed", "failed", "cancelled"}:
|
|
|
+ paused_run = latest_run
|
|
|
+ else:
|
|
|
+ paused_run = self.workflow_run_repository.update_status(
|
|
|
+ run_id=run_id,
|
|
|
+ status="paused",
|
|
|
+ )
|
|
|
+ if paused_run is None:
|
|
|
+ return None
|
|
|
+ self._log_event(
|
|
|
+ tenant_id=tenant_id,
|
|
|
+ run_id=run_id,
|
|
|
+ node_run_id=executed_node_runs[-1].id if executed_node_runs else None,
|
|
|
+ event_type="debug_continue_paused",
|
|
|
+ message=f"debug continue paused: {reason}",
|
|
|
+ detail_json={
|
|
|
+ "reason": reason,
|
|
|
+ "paused_before_node_id": paused_before_node_id,
|
|
|
+ "executed_node_ids": [item.node_id for item in executed_node_runs],
|
|
|
+ "breakpoint_node_ids": list(breakpoint_node_ids),
|
|
|
+ },
|
|
|
+ )
|
|
|
+ return (
|
|
|
+ self._build_debug_snapshot(paused_run),
|
|
|
+ executed_node_runs,
|
|
|
+ executor_names,
|
|
|
+ paused_before_node_id,
|
|
|
+ reason,
|
|
|
+ )
|
|
|
+
|
|
|
def resume_human_node_run(
|
|
|
self,
|
|
|
*,
|
|
|
@@ -827,6 +1053,45 @@ class RuntimeApplicationService:
|
|
|
|
|
|
return run_state_json, node_output_json_by_node_id, node_output_text_by_node_id
|
|
|
|
|
|
+ def _build_debug_snapshot(self, workflow_run: WorkflowRun) -> RuntimeDebugSnapshot:
|
|
|
+ node_runs = self.node_run_repository.list_by_run(
|
|
|
+ tenant_id=workflow_run.tenant_id,
|
|
|
+ run_id=workflow_run.id,
|
|
|
+ )
|
|
|
+ run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
|
|
|
+ self._build_run_state_maps(
|
|
|
+ tenant_id=workflow_run.tenant_id,
|
|
|
+ run_id=workflow_run.id,
|
|
|
+ )
|
|
|
+ )
|
|
|
+ return RuntimeDebugSnapshot(
|
|
|
+ run=workflow_run,
|
|
|
+ node_runs=node_runs,
|
|
|
+ run_state_json=run_state_json,
|
|
|
+ node_output_json_by_node_id=node_output_json_by_node_id,
|
|
|
+ node_output_text_by_node_id=node_output_text_by_node_id,
|
|
|
+ queued_node_ids=[
|
|
|
+ item.node_id for item in node_runs if item.status in {"pending", "queued"}
|
|
|
+ ],
|
|
|
+ running_node_ids=[item.node_id for item in node_runs if item.status == "running"],
|
|
|
+ completed_node_ids=[
|
|
|
+ item.node_id for item in node_runs if item.status in {"completed", "skipped"}
|
|
|
+ ],
|
|
|
+ failed_node_ids=[item.node_id for item in node_runs if item.status == "failed"],
|
|
|
+ execution_logs=self.execution_log_repository.list_by_scope(
|
|
|
+ tenant_id=workflow_run.tenant_id,
|
|
|
+ run_id=workflow_run.id,
|
|
|
+ ),
|
|
|
+ node_artifacts=self.node_artifact_repository.list_by_scope(
|
|
|
+ tenant_id=workflow_run.tenant_id,
|
|
|
+ run_id=workflow_run.id,
|
|
|
+ ),
|
|
|
+ trace_spans=self.trace_span_repository.list_by_scope(
|
|
|
+ tenant_id=workflow_run.tenant_id,
|
|
|
+ run_id=workflow_run.id,
|
|
|
+ ),
|
|
|
+ )
|
|
|
+
|
|
|
def _build_node_timing(
|
|
|
self,
|
|
|
node_config_json: dict[str, JSONValue],
|