test_workflow_designer_debugger.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
from __future__ import annotations

from tests.conftest import build_fastapi_test_client, prepare_service_import

# NOTE(review): this call is placed between the conftest import and the
# `app.*` imports below, so it presumably has to run first to make the
# "workflow-service" package and the listed core libs importable -- confirm
# against tests/conftest.py before reordering these statements.
prepare_service_import(
    "workflow-service",
    libs=("core-domain", "core-shared", "core-db", "core-dsl"),
)

from app.bootstrap.app import create_app
from app.api.routes import get_workflow_application_service
from app.application.designer import WorkflowDebugPlan, build_debug_plan
  10. def test_workflow_designer_validate_returns_graph_diagnostics() -> None:
  11. client = build_fastapi_test_client(create_app())
  12. response = client.post(
  13. "/workflows/designer/validate",
  14. json={
  15. "dsl_json": {
  16. "code": "order_flow",
  17. "name": "Order Flow",
  18. "nodes": [
  19. {"id": "start", "type": "llm", "name": "Start"},
  20. {"id": "answer", "type": "answer", "name": "Answer"},
  21. {"id": "orphan", "type": "tool", "name": "Orphan Tool"},
  22. ],
  23. "edges": [{"source": "start", "target": "answer"}],
  24. }
  25. },
  26. )
  27. assert response.status_code == 200
  28. payload = response.json()
  29. assert payload["valid"] is True
  30. assert payload["node_count"] == 3
  31. assert payload["edge_count"] == 1
  32. assert payload["entry_node_ids"] == ["start", "orphan"]
  33. assert payload["terminal_node_ids"] == ["answer", "orphan"]
  34. assert payload["isolated_node_ids"] == ["orphan"]
  35. assert payload["unreachable_node_ids"] == []
  36. assert payload["diagnostics"][0]["code"] == "workflow.multiple_entry_nodes"
  37. assert payload["normalized_dsl_json"]["nodes"][0]["config"] == {}
  38. def test_workflow_designer_validate_reports_blocking_errors() -> None:
  39. client = build_fastapi_test_client(create_app())
  40. response = client.post(
  41. "/workflows/designer/validate",
  42. json={
  43. "dsl_json": {
  44. "code": "broken_flow",
  45. "nodes": [{"id": "start", "type": "llm"}],
  46. "edges": [{"source": "start", "target": "missing"}],
  47. }
  48. },
  49. )
  50. assert response.status_code == 200
  51. payload = response.json()
  52. assert payload["valid"] is False
  53. assert payload["edges"][0]["valid_source"] is True
  54. assert payload["edges"][0]["valid_target"] is False
  55. assert any(item["code"] == "edge.target_missing" for item in payload["diagnostics"])
  56. def test_workflow_debugger_returns_execution_preview() -> None:
  57. client = build_fastapi_test_client(create_app())
  58. response = client.post(
  59. "/workflows/designer/debug",
  60. json={
  61. "dsl_json": {
  62. "code": "debug_flow",
  63. "nodes": [
  64. {"id": "start", "type": "llm"},
  65. {"id": "tool", "type": "tool"},
  66. {"id": "answer", "type": "answer"},
  67. ],
  68. "edges": [
  69. {"source": "start", "target": "tool"},
  70. {"source": "tool", "target": "answer"},
  71. ],
  72. },
  73. "max_preview_steps": 10,
  74. },
  75. )
  76. assert response.status_code == 200
  77. payload = response.json()
  78. assert payload["valid"] is True
  79. assert payload["truncated"] is False
  80. assert [item["node_id"] for item in payload["execution_preview"]] == [
  81. "start",
  82. "tool",
  83. "answer",
  84. ]
  85. assert payload["execution_preview"][0]["next_node_ids"] == ["tool"]
  86. class FakeWorkflowDebugService:
  87. def build_version_debug_plan(
  88. self,
  89. *,
  90. tenant_id: str,
  91. workflow_version_id: str,
  92. max_preview_steps: int,
  93. ) -> WorkflowDebugPlan | None:
  94. assert tenant_id == "t1"
  95. assert workflow_version_id == "version-1"
  96. return build_debug_plan(
  97. {
  98. "code": "persisted_flow",
  99. "nodes": [
  100. {"id": "start", "type": "llm"},
  101. {"id": "answer", "type": "answer"},
  102. ],
  103. "edges": [{"source": "start", "target": "answer"}],
  104. },
  105. max_preview_steps=max_preview_steps,
  106. )
  107. def test_workflow_version_debugger_api_uses_persisted_dsl_service() -> None:
  108. app = create_app()
  109. app.dependency_overrides[get_workflow_application_service] = lambda: FakeWorkflowDebugService()
  110. client = build_fastapi_test_client(app)
  111. debug_response = client.get(
  112. "/workflows/versions/version-1/debug",
  113. params={"tenant_id": "t1"},
  114. )
  115. assert debug_response.status_code == 200
  116. debug_payload = debug_response.json()
  117. assert debug_payload["valid"] is True
  118. assert [item["node_id"] for item in debug_payload["execution_preview"]] == [
  119. "start",
  120. "answer",
  121. ]