Просмотр исходного кода

feat: add workflow designer debugger api

Jax Docker 1 месяц назад
Родитель
Сommit
179230bc6d

+ 39 - 0
services/workflow-service/app/api/routes.py

@@ -16,6 +16,10 @@ from app.schemas.app import AppCreateRequest, AppResponse, AppVersionCreateReque
 from app.schemas.workflow import (
     WorkflowCreateRequest,
     WorkflowDefinitionResponse,
+    WorkflowDebuggerPlanRequest,
+    WorkflowDebuggerPlanResponse,
+    WorkflowDesignerValidateRequest,
+    WorkflowDesignerValidateResponse,
     WorkflowVersionCreateRequest,
     WorkflowVersionResponse,
 )
@@ -51,6 +55,24 @@ def get_sample_workflow() -> WorkflowDefinition:
     )
 
 
+@router.post("/designer/validate", response_model=WorkflowDesignerValidateResponse)
+def validate_workflow_designer_dsl(
+    payload: WorkflowDesignerValidateRequest,
+    service: WorkflowApplicationService = Depends(get_workflow_application_service),
+) -> WorkflowDesignerValidateResponse:
+    inspection = service.validate_designer_workflow(payload)
+    return WorkflowDesignerValidateResponse.from_inspection(inspection)
+
+
+@router.post("/designer/debug", response_model=WorkflowDebuggerPlanResponse)
+def debug_workflow_designer_dsl(
+    payload: WorkflowDebuggerPlanRequest,
+    service: WorkflowApplicationService = Depends(get_workflow_application_service),
+) -> WorkflowDebuggerPlanResponse:
+    plan = service.build_designer_debug_plan(payload)
+    return WorkflowDebuggerPlanResponse.from_plan(plan)
+
+
 @router.post("/apps", response_model=AppResponse)
 def create_app(
     payload: AppCreateRequest,
@@ -137,3 +159,20 @@ def get_workflow_version(
     if entity is None:
         raise HTTPException(status_code=404, detail=f"workflow_version not found: {workflow_version_id}")
     return WorkflowVersionResponse.from_entity(entity)
+
+
+@router.get("/versions/{workflow_version_id}/debug", response_model=WorkflowDebuggerPlanResponse)
+def debug_workflow_version(
+    workflow_version_id: str,
+    tenant_id: str = Query(...),
+    max_preview_steps: int = Query(default=50, ge=1, le=500),
+    service: WorkflowApplicationService = Depends(get_workflow_application_service),
+) -> WorkflowDebuggerPlanResponse:
+    plan = service.build_version_debug_plan(
+        tenant_id=tenant_id,
+        workflow_version_id=workflow_version_id,
+        max_preview_steps=max_preview_steps,
+    )
+    if plan is None:
+        raise HTTPException(status_code=404, detail=f"workflow_version not found: {workflow_version_id}")
+    return WorkflowDebuggerPlanResponse.from_plan(plan)

+ 361 - 0
services/workflow-service/app/application/designer.py

@@ -0,0 +1,361 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Literal
+
+from pydantic import ValidationError
+
+from core_dsl import WorkflowDefinition, parse_workflow_definition
+from core_shared import JSONValue
+
+
+DiagnosticSeverity = Literal["error", "warning", "info"]
+
+
+@dataclass(frozen=True)
+class WorkflowDiagnostic:
+    severity: DiagnosticSeverity
+    code: str
+    message: str
+    node_id: str | None = None
+    edge_index: int | None = None
+
+
+@dataclass(frozen=True)
+class WorkflowNodeInspection:
+    id: str
+    type: str
+    name: str | None
+    incoming_count: int
+    outgoing_count: int
+    reachable: bool
+
+
+@dataclass(frozen=True)
+class WorkflowEdgeInspection:
+    source: str
+    target: str
+    condition: str | None
+    valid_source: bool
+    valid_target: bool
+
+
+@dataclass(frozen=True)
+class WorkflowDebugStep:
+    step_index: int
+    node_id: str
+    node_type: str
+    name: str | None
+    next_node_ids: list[str]
+
+
+@dataclass(frozen=True)
+class WorkflowInspection:
+    valid: bool
+    diagnostics: list[WorkflowDiagnostic]
+    workflow: WorkflowDefinition | None
+    nodes: list[WorkflowNodeInspection]
+    edges: list[WorkflowEdgeInspection]
+    entry_node_ids: list[str]
+    terminal_node_ids: list[str]
+    isolated_node_ids: list[str]
+    unreachable_node_ids: list[str]
+    cycle_detected: bool
+
+
+@dataclass(frozen=True)
+class WorkflowDebugPlan:
+    inspection: WorkflowInspection
+    execution_preview: list[WorkflowDebugStep]
+    max_preview_steps: int
+    truncated: bool
+
+
+def inspect_workflow_dsl(payload: dict[str, JSONValue] | None) -> WorkflowInspection:
+    if payload is None:
+        return WorkflowInspection(
+            valid=False,
+            diagnostics=[
+                WorkflowDiagnostic(
+                    severity="error",
+                    code="dsl.required",
+                    message="workflow dsl_json is required",
+                )
+            ],
+            workflow=None,
+            nodes=[],
+            edges=[],
+            entry_node_ids=[],
+            terminal_node_ids=[],
+            isolated_node_ids=[],
+            unreachable_node_ids=[],
+            cycle_detected=False,
+        )
+
+    try:
+        workflow = parse_workflow_definition(payload)
+    except ValidationError as exc:
+        return WorkflowInspection(
+            valid=False,
+            diagnostics=[
+                WorkflowDiagnostic(
+                    severity="error",
+                    code="dsl.schema_invalid",
+                    message=str(exc),
+                )
+            ],
+            workflow=None,
+            nodes=[],
+            edges=[],
+            entry_node_ids=[],
+            terminal_node_ids=[],
+            isolated_node_ids=[],
+            unreachable_node_ids=[],
+            cycle_detected=False,
+        )
+
+    if workflow is None:
+        return WorkflowInspection(
+            valid=False,
+            diagnostics=[
+                WorkflowDiagnostic(
+                    severity="error",
+                    code="dsl.required",
+                    message="workflow dsl_json is required",
+                )
+            ],
+            workflow=None,
+            nodes=[],
+            edges=[],
+            entry_node_ids=[],
+            terminal_node_ids=[],
+            isolated_node_ids=[],
+            unreachable_node_ids=[],
+            cycle_detected=False,
+        )
+
+    diagnostics: list[WorkflowDiagnostic] = []
+    node_ids = [node.id for node in workflow.nodes]
+    node_id_set = set(node_ids)
+    duplicate_node_ids = sorted({node_id for node_id in node_ids if node_ids.count(node_id) > 1})
+    for node_id in duplicate_node_ids:
+        diagnostics.append(
+            WorkflowDiagnostic(
+                severity="error",
+                code="node.duplicate_id",
+                message=f"duplicate node id: {node_id}",
+                node_id=node_id,
+            )
+        )
+
+    incoming_counts = {node_id: 0 for node_id in node_ids}
+    outgoing_counts = {node_id: 0 for node_id in node_ids}
+    adjacency: dict[str, list[str]] = {node_id: [] for node_id in node_ids}
+    edge_inspections: list[WorkflowEdgeInspection] = []
+
+    for edge_index, edge in enumerate(workflow.edges):
+        valid_source = edge.source in node_id_set
+        valid_target = edge.target in node_id_set
+        edge_inspections.append(
+            WorkflowEdgeInspection(
+                source=edge.source,
+                target=edge.target,
+                condition=edge.condition,
+                valid_source=valid_source,
+                valid_target=valid_target,
+            )
+        )
+        if not valid_source:
+            diagnostics.append(
+                WorkflowDiagnostic(
+                    severity="error",
+                    code="edge.source_missing",
+                    message=f"edge source node does not exist: {edge.source}",
+                    node_id=edge.source,
+                    edge_index=edge_index,
+                )
+            )
+        if not valid_target:
+            diagnostics.append(
+                WorkflowDiagnostic(
+                    severity="error",
+                    code="edge.target_missing",
+                    message=f"edge target node does not exist: {edge.target}",
+                    node_id=edge.target,
+                    edge_index=edge_index,
+                )
+            )
+        if valid_source and valid_target:
+            outgoing_counts[edge.source] = outgoing_counts.get(edge.source, 0) + 1
+            incoming_counts[edge.target] = incoming_counts.get(edge.target, 0) + 1
+            adjacency.setdefault(edge.source, []).append(edge.target)
+
+    if not workflow.nodes:
+        diagnostics.append(
+            WorkflowDiagnostic(
+                severity="error",
+                code="workflow.nodes_required",
+                message="workflow must contain at least one node",
+            )
+        )
+
+    entry_node_ids = [node_id for node_id in node_ids if incoming_counts.get(node_id, 0) == 0]
+    terminal_node_ids = [node_id for node_id in node_ids if outgoing_counts.get(node_id, 0) == 0]
+    isolated_node_ids = [
+        node_id
+        for node_id in node_ids
+        if incoming_counts.get(node_id, 0) == 0 and outgoing_counts.get(node_id, 0) == 0
+    ]
+
+    if len(entry_node_ids) > 1:
+        diagnostics.append(
+            WorkflowDiagnostic(
+                severity="warning",
+                code="workflow.multiple_entry_nodes",
+                message=f"workflow has multiple entry nodes: {', '.join(entry_node_ids)}",
+            )
+        )
+    if not terminal_node_ids and workflow.nodes:
+        diagnostics.append(
+            WorkflowDiagnostic(
+                severity="warning",
+                code="workflow.no_terminal_node",
+                message="workflow has no terminal node",
+            )
+        )
+
+    reachable_node_ids = _find_reachable_nodes(entry_node_ids, adjacency)
+    unreachable_node_ids = [node_id for node_id in node_ids if node_id not in reachable_node_ids]
+    for node_id in unreachable_node_ids:
+        diagnostics.append(
+            WorkflowDiagnostic(
+                severity="warning",
+                code="node.unreachable",
+                message=f"node is not reachable from an entry node: {node_id}",
+                node_id=node_id,
+            )
+        )
+
+    cycle_detected = _detect_cycle(node_ids, adjacency)
+    if cycle_detected:
+        diagnostics.append(
+            WorkflowDiagnostic(
+                severity="warning",
+                code="workflow.cycle_detected",
+                message="workflow graph contains a cycle; debugger preview may be truncated",
+            )
+        )
+
+    node_inspections = [
+        WorkflowNodeInspection(
+            id=node.id,
+            type=node.type,
+            name=node.name,
+            incoming_count=incoming_counts.get(node.id, 0),
+            outgoing_count=outgoing_counts.get(node.id, 0),
+            reachable=node.id in reachable_node_ids,
+        )
+        for node in workflow.nodes
+    ]
+    valid = not any(item.severity == "error" for item in diagnostics)
+    return WorkflowInspection(
+        valid=valid,
+        diagnostics=diagnostics,
+        workflow=workflow,
+        nodes=node_inspections,
+        edges=edge_inspections,
+        entry_node_ids=entry_node_ids,
+        terminal_node_ids=terminal_node_ids,
+        isolated_node_ids=isolated_node_ids,
+        unreachable_node_ids=unreachable_node_ids,
+        cycle_detected=cycle_detected,
+    )
+
+
+def build_debug_plan(
+    payload: dict[str, JSONValue] | None,
+    *,
+    max_preview_steps: int = 50,
+) -> WorkflowDebugPlan:
+    inspection = inspect_workflow_dsl(payload)
+    workflow = inspection.workflow
+    if workflow is None or not inspection.valid:
+        return WorkflowDebugPlan(
+            inspection=inspection,
+            execution_preview=[],
+            max_preview_steps=max_preview_steps,
+            truncated=False,
+        )
+
+    node_map = {node.id: node for node in workflow.nodes}
+    adjacency: dict[str, list[str]] = {node.id: [] for node in workflow.nodes}
+    for edge in workflow.edges:
+        if edge.source in node_map and edge.target in node_map:
+            adjacency.setdefault(edge.source, []).append(edge.target)
+
+    preview: list[WorkflowDebugStep] = []
+    queue = list(inspection.entry_node_ids)
+    seen_visits: dict[str, int] = {}
+    truncated = False
+
+    while queue and len(preview) < max_preview_steps:
+        node_id = queue.pop(0)
+        node = node_map.get(node_id)
+        if node is None:
+            continue
+        seen_visits[node_id] = seen_visits.get(node_id, 0) + 1
+        if seen_visits[node_id] > 1:
+            continue
+        next_node_ids = adjacency.get(node_id, [])
+        preview.append(
+            WorkflowDebugStep(
+                step_index=len(preview),
+                node_id=node.id,
+                node_type=node.type,
+                name=node.name,
+                next_node_ids=next_node_ids,
+            )
+        )
+        queue.extend(next_node_ids)
+
+    if queue:
+        truncated = True
+
+    return WorkflowDebugPlan(
+        inspection=inspection,
+        execution_preview=preview,
+        max_preview_steps=max_preview_steps,
+        truncated=truncated,
+    )
+
+
+def _find_reachable_nodes(entry_node_ids: list[str], adjacency: dict[str, list[str]]) -> set[str]:
+    reachable: set[str] = set()
+    stack = list(entry_node_ids)
+    while stack:
+        node_id = stack.pop()
+        if node_id in reachable:
+            continue
+        reachable.add(node_id)
+        stack.extend(adjacency.get(node_id, []))
+    return reachable
+
+
+def _detect_cycle(node_ids: list[str], adjacency: dict[str, list[str]]) -> bool:
+    visiting: set[str] = set()
+    visited: set[str] = set()
+
+    def visit(node_id: str) -> bool:
+        if node_id in visiting:
+            return True
+        if node_id in visited:
+            return False
+        visiting.add(node_id)
+        for next_node_id in adjacency.get(node_id, []):
+            if visit(next_node_id):
+                return True
+        visiting.remove(node_id)
+        visited.add(node_id)
+        return False
+
+    return any(visit(node_id) for node_id in node_ids)

+ 78 - 2
services/workflow-service/app/application/services.py

@@ -2,6 +2,12 @@ from pydantic import ValidationError
 
 from core_dsl import parse_workflow_definition
 from core_shared import JSONValue
+from app.application.designer import (
+    WorkflowDebugPlan,
+    WorkflowInspection,
+    build_debug_plan,
+    inspect_workflow_dsl,
+)
 from app.db.models import AppDefinition, AppVersion, WorkflowDefinitionModel, WorkflowVersion
 from app.domain.repositories import (
     AppDefinitionRepository,
@@ -10,7 +16,12 @@ from app.domain.repositories import (
     WorkflowVersionRepository,
 )
 from app.schemas.app import AppCreateRequest, AppVersionCreateRequest
-from app.schemas.workflow import WorkflowCreateRequest, WorkflowVersionCreateRequest
+from app.schemas.workflow import (
+    WorkflowCreateRequest,
+    WorkflowDebuggerPlanRequest,
+    WorkflowDesignerValidateRequest,
+    WorkflowVersionCreateRequest,
+)
 
 
 class WorkflowApplicationService:
@@ -53,11 +64,14 @@ class WorkflowApplicationService:
 
     def create_workflow_version(self, payload: WorkflowVersionCreateRequest) -> WorkflowVersion:
         dsl_json = self._validate_workflow_dsl(payload.dsl_json)
+        compiled_plan_json = payload.compiled_plan_json
+        if compiled_plan_json is None and dsl_json is not None:
+            compiled_plan_json = self._build_compiled_plan_json(dsl_json)
         return self.workflow_version_repository.create(
             tenant_id=payload.tenant_id,
             workflow_id=payload.workflow_id,
             dsl_json=dsl_json,
-            compiled_plan_json=payload.compiled_plan_json,
+            compiled_plan_json=compiled_plan_json,
             schema_version=payload.schema_version,
             checksum=payload.checksum,
             status=payload.status,
@@ -75,6 +89,39 @@ class WorkflowApplicationService:
             workflow_version_id=workflow_version_id,
         )
 
+    def validate_designer_workflow(
+        self,
+        payload: WorkflowDesignerValidateRequest,
+    ) -> WorkflowInspection:
+        return inspect_workflow_dsl(payload.dsl_json)
+
+    def build_designer_debug_plan(
+        self,
+        payload: WorkflowDebuggerPlanRequest,
+    ) -> WorkflowDebugPlan:
+        return build_debug_plan(
+            payload.dsl_json,
+            max_preview_steps=payload.max_preview_steps,
+        )
+
+    def build_version_debug_plan(
+        self,
+        *,
+        tenant_id: str,
+        workflow_version_id: str,
+        max_preview_steps: int,
+    ) -> WorkflowDebugPlan | None:
+        entity = self.get_workflow_version(
+            tenant_id=tenant_id,
+            workflow_version_id=workflow_version_id,
+        )
+        if entity is None:
+            return None
+        return build_debug_plan(
+            entity.dsl_json,
+            max_preview_steps=max_preview_steps,
+        )
+
     def create_app_version(self, payload: AppVersionCreateRequest) -> AppVersion:
         return self.app_version_repository.create(
             tenant_id=payload.tenant_id,
@@ -100,6 +147,35 @@ class WorkflowApplicationService:
         except ValidationError as exc:
             raise ValueError(f"invalid workflow dsl: {exc}") from exc
 
+        inspection = inspect_workflow_dsl(dsl_json)
+        errors = [item for item in inspection.diagnostics if item.severity == "error"]
+        if errors:
+            message = "; ".join(f"{item.code}: {item.message}" for item in errors)
+            raise ValueError(f"invalid workflow dsl: {message}")
+
         if workflow is None:
             return None
         return workflow.model_dump(mode="json")
+
+    def _build_compiled_plan_json(
+        self,
+        dsl_json: dict[str, JSONValue],
+    ) -> dict[str, JSONValue]:
+        plan = build_debug_plan(dsl_json)
+        return {
+            "valid": plan.inspection.valid,
+            "entry_node_ids": plan.inspection.entry_node_ids,
+            "terminal_node_ids": plan.inspection.terminal_node_ids,
+            "node_count": len(plan.inspection.nodes),
+            "edge_count": len(plan.inspection.edges),
+            "execution_preview": [
+                {
+                    "step_index": item.step_index,
+                    "node_id": item.node_id,
+                    "node_type": item.node_type,
+                    "name": item.name,
+                    "next_node_ids": item.next_node_ids,
+                }
+                for item in plan.execution_preview
+            ],
+        }

+ 145 - 1
services/workflow-service/app/schemas/workflow.py

@@ -1,9 +1,17 @@
 from datetime import datetime
 from typing import TYPE_CHECKING
 
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
 from core_domain import WorkflowVersionContract
 from core_shared import JSONValue
+from app.application.designer import (
+    DiagnosticSeverity,
+    WorkflowDebugPlan,
+    WorkflowDiagnostic,
+    WorkflowEdgeInspection,
+    WorkflowInspection,
+    WorkflowNodeInspection,
+)
 
 if TYPE_CHECKING:
     from app.db.models import WorkflowDefinitionModel, WorkflowVersion
@@ -47,3 +55,139 @@ class WorkflowVersionResponse(WorkflowVersionContract):
     @classmethod
     def from_entity(cls, entity: "WorkflowVersion") -> "WorkflowVersionResponse":
         return cls.model_validate(entity, from_attributes=True)
+
+
+class WorkflowDesignerValidateRequest(BaseModel):
+    dsl_json: dict[str, JSONValue]
+
+
+class WorkflowDesignerDiagnosticResponse(BaseModel):
+    severity: DiagnosticSeverity
+    code: str
+    message: str
+    node_id: str | None = None
+    edge_index: int | None = None
+
+    @classmethod
+    def from_inspection(cls, item: WorkflowDiagnostic) -> "WorkflowDesignerDiagnosticResponse":
+        return cls(
+            severity=item.severity,
+            code=item.code,
+            message=item.message,
+            node_id=item.node_id,
+            edge_index=item.edge_index,
+        )
+
+
+class WorkflowDesignerNodeResponse(BaseModel):
+    id: str
+    type: str
+    name: str | None = None
+    incoming_count: int
+    outgoing_count: int
+    reachable: bool
+
+    @classmethod
+    def from_inspection(cls, item: WorkflowNodeInspection) -> "WorkflowDesignerNodeResponse":
+        return cls(
+            id=item.id,
+            type=item.type,
+            name=item.name,
+            incoming_count=item.incoming_count,
+            outgoing_count=item.outgoing_count,
+            reachable=item.reachable,
+        )
+
+
+class WorkflowDesignerEdgeResponse(BaseModel):
+    source: str
+    target: str
+    condition: str | None = None
+    valid_source: bool
+    valid_target: bool
+
+    @classmethod
+    def from_inspection(cls, item: WorkflowEdgeInspection) -> "WorkflowDesignerEdgeResponse":
+        return cls(
+            source=item.source,
+            target=item.target,
+            condition=item.condition,
+            valid_source=item.valid_source,
+            valid_target=item.valid_target,
+        )
+
+
+class WorkflowDesignerValidateResponse(BaseModel):
+    valid: bool
+    diagnostics: list[WorkflowDesignerDiagnosticResponse]
+    node_count: int
+    edge_count: int
+    nodes: list[WorkflowDesignerNodeResponse]
+    edges: list[WorkflowDesignerEdgeResponse]
+    entry_node_ids: list[str]
+    terminal_node_ids: list[str]
+    isolated_node_ids: list[str]
+    unreachable_node_ids: list[str]
+    cycle_detected: bool
+    normalized_dsl_json: dict[str, JSONValue] | None = None
+
+    @classmethod
+    def from_inspection(cls, inspection: WorkflowInspection) -> "WorkflowDesignerValidateResponse":
+        normalized_dsl_json: dict[str, JSONValue] | None = None
+        if inspection.workflow is not None:
+            normalized_dsl_json = inspection.workflow.model_dump(mode="json")
+        return cls(
+            valid=inspection.valid,
+            diagnostics=[
+                WorkflowDesignerDiagnosticResponse.from_inspection(item)
+                for item in inspection.diagnostics
+            ],
+            node_count=len(inspection.nodes),
+            edge_count=len(inspection.edges),
+            nodes=[WorkflowDesignerNodeResponse.from_inspection(item) for item in inspection.nodes],
+            edges=[WorkflowDesignerEdgeResponse.from_inspection(item) for item in inspection.edges],
+            entry_node_ids=inspection.entry_node_ids,
+            terminal_node_ids=inspection.terminal_node_ids,
+            isolated_node_ids=inspection.isolated_node_ids,
+            unreachable_node_ids=inspection.unreachable_node_ids,
+            cycle_detected=inspection.cycle_detected,
+            normalized_dsl_json=normalized_dsl_json,
+        )
+
+
+class WorkflowDebuggerPlanRequest(BaseModel):
+    dsl_json: dict[str, JSONValue]
+    max_preview_steps: int = Field(default=50, ge=1, le=500)
+
+
+class WorkflowDebuggerStepResponse(BaseModel):
+    step_index: int
+    node_id: str
+    node_type: str
+    name: str | None = None
+    next_node_ids: list[str]
+
+
+class WorkflowDebuggerPlanResponse(WorkflowDesignerValidateResponse):
+    execution_preview: list[WorkflowDebuggerStepResponse]
+    max_preview_steps: int
+    truncated: bool
+
+    @classmethod
+    def from_plan(cls, plan: WorkflowDebugPlan) -> "WorkflowDebuggerPlanResponse":
+        base = WorkflowDesignerValidateResponse.from_inspection(plan.inspection)
+        return cls(
+            **base.model_dump(),
+            execution_preview=[
+                WorkflowDebuggerStepResponse(
+                    step_index=item.step_index,
+                    node_id=item.node_id,
+                    node_type=item.node_type,
+                    name=item.name,
+                    next_node_ids=item.next_node_ids,
+                )
+                for item in plan.execution_preview
+            ],
+            max_preview_steps=plan.max_preview_steps,
+            truncated=plan.truncated,
+        )

+ 57 - 0
tests/conftest.py

@@ -0,0 +1,57 @@
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+from typing import Any
+
+
+REPO_ROOT = Path(__file__).resolve().parents[1]
+
+
+def prepare_service_import(
+    service_name: str,
+    *,
+    libs: tuple[str, ...],
+) -> None:
+    for module_name in list(sys.modules):
+        if module_name == "app" or module_name.startswith("app."):
+            del sys.modules[module_name]
+
+    for lib_name in libs:
+        lib_path = REPO_ROOT / "libs" / lib_name / "src"
+        _prepend_sys_path(lib_path)
+    _prepend_sys_path(REPO_ROOT / "services" / service_name)
+
+
+def _prepend_sys_path(path: Path) -> None:
+    path_text = str(path)
+    if path_text in sys.path:
+        sys.path.remove(path_text)
+    sys.path.insert(0, path_text)
+
+
+def build_fastapi_test_client(app: Any) -> Any:
+    _patch_httpx_testclient_compatibility()
+    from fastapi.testclient import TestClient
+
+    return TestClient(app)
+
+
+def _patch_httpx_testclient_compatibility() -> None:
+    import inspect
+
+    import httpx
+
+    if "app" in inspect.signature(httpx.Client.__init__).parameters:
+        return
+    if getattr(httpx.Client.__init__, "_agent_platform_patched", False):
+        return
+
+    original_init = httpx.Client.__init__
+
+    def patched_init(self: httpx.Client, *args: Any, **kwargs: Any) -> None:
+        kwargs.pop("app", None)
+        original_init(self, *args, **kwargs)
+
+    setattr(patched_init, "_agent_platform_patched", True)
+    httpx.Client.__init__ = patched_init

+ 142 - 0
tests/test_workflow_designer_debugger.py

@@ -0,0 +1,142 @@
+from __future__ import annotations
+
+from tests.conftest import build_fastapi_test_client, prepare_service_import
+
+
+prepare_service_import(
+    "workflow-service",
+    libs=("core-domain", "core-shared", "core-db", "core-dsl"),
+)
+
+from app.bootstrap.app import create_app
+from app.api.routes import get_workflow_application_service
+from app.application.designer import WorkflowDebugPlan, build_debug_plan
+
+
+def test_workflow_designer_validate_returns_graph_diagnostics() -> None:
+    client = build_fastapi_test_client(create_app())
+
+    response = client.post(
+        "/workflows/designer/validate",
+        json={
+            "dsl_json": {
+                "code": "order_flow",
+                "name": "Order Flow",
+                "nodes": [
+                    {"id": "start", "type": "llm", "name": "Start"},
+                    {"id": "answer", "type": "answer", "name": "Answer"},
+                    {"id": "orphan", "type": "tool", "name": "Orphan Tool"},
+                ],
+                "edges": [{"source": "start", "target": "answer"}],
+            }
+        },
+    )
+
+    assert response.status_code == 200
+    payload = response.json()
+    assert payload["valid"] is True
+    assert payload["node_count"] == 3
+    assert payload["edge_count"] == 1
+    assert payload["entry_node_ids"] == ["start", "orphan"]
+    assert payload["terminal_node_ids"] == ["answer", "orphan"]
+    assert payload["isolated_node_ids"] == ["orphan"]
+    assert payload["unreachable_node_ids"] == []
+    assert payload["diagnostics"][0]["code"] == "workflow.multiple_entry_nodes"
+    assert payload["normalized_dsl_json"]["nodes"][0]["config"] == {}
+
+
+def test_workflow_designer_validate_reports_blocking_errors() -> None:
+    client = build_fastapi_test_client(create_app())
+
+    response = client.post(
+        "/workflows/designer/validate",
+        json={
+            "dsl_json": {
+                "code": "broken_flow",
+                "nodes": [{"id": "start", "type": "llm"}],
+                "edges": [{"source": "start", "target": "missing"}],
+            }
+        },
+    )
+
+    assert response.status_code == 200
+    payload = response.json()
+    assert payload["valid"] is False
+    assert payload["edges"][0]["valid_source"] is True
+    assert payload["edges"][0]["valid_target"] is False
+    assert any(item["code"] == "edge.target_missing" for item in payload["diagnostics"])
+
+
+def test_workflow_debugger_returns_execution_preview() -> None:
+    client = build_fastapi_test_client(create_app())
+
+    response = client.post(
+        "/workflows/designer/debug",
+        json={
+            "dsl_json": {
+                "code": "debug_flow",
+                "nodes": [
+                    {"id": "start", "type": "llm"},
+                    {"id": "tool", "type": "tool"},
+                    {"id": "answer", "type": "answer"},
+                ],
+                "edges": [
+                    {"source": "start", "target": "tool"},
+                    {"source": "tool", "target": "answer"},
+                ],
+            },
+            "max_preview_steps": 10,
+        },
+    )
+
+    assert response.status_code == 200
+    payload = response.json()
+    assert payload["valid"] is True
+    assert payload["truncated"] is False
+    assert [item["node_id"] for item in payload["execution_preview"]] == [
+        "start",
+        "tool",
+        "answer",
+    ]
+    assert payload["execution_preview"][0]["next_node_ids"] == ["tool"]
+
+
+class FakeWorkflowDebugService:
+    def build_version_debug_plan(
+        self,
+        *,
+        tenant_id: str,
+        workflow_version_id: str,
+        max_preview_steps: int,
+    ) -> WorkflowDebugPlan | None:
+        assert tenant_id == "t1"
+        assert workflow_version_id == "version-1"
+        return build_debug_plan(
+            {
+                "code": "persisted_flow",
+                "nodes": [
+                    {"id": "start", "type": "llm"},
+                    {"id": "answer", "type": "answer"},
+                ],
+                "edges": [{"source": "start", "target": "answer"}],
+            },
+            max_preview_steps=max_preview_steps,
+        )
+
+
+def test_workflow_version_debugger_api_uses_persisted_dsl_service() -> None:
+    app = create_app()
+    app.dependency_overrides[get_workflow_application_service] = lambda: FakeWorkflowDebugService()
+    client = build_fastapi_test_client(app)
+
+    debug_response = client.get(
+        "/workflows/versions/version-1/debug",
+        params={"tenant_id": "t1"},
+    )
+    assert debug_response.status_code == 200
+    debug_payload = debug_response.json()
+    assert debug_payload["valid"] is True
+    assert [item["node_id"] for item in debug_payload["execution_preview"]] == [
+        "start",
+        "answer",
+    ]