| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- from __future__ import annotations
- from collections.abc import Generator
- from datetime import datetime
- from pathlib import Path
- from sqlalchemy.orm import Session
- from tests.conftest import build_fastapi_test_client, prepare_service_import
def test_runtime_debugger_pause_step_and_breakpoint_continue(tmp_path: Path) -> None:
    """Drive the runtime debugger endpoints end to end on a two-node workflow.

    The scenario creates a run, pauses it, checks that ``execute-next`` does
    not advance a paused run, single-steps the ``start`` node, continues into
    a breakpoint placed on ``answer`` and finally continues to completion.
    After each call the response status code and the reported run status /
    queued / completed node ids are asserted.

    Args:
        tmp_path: Temporary directory (pytest fixture) that holds the SQLite
            database file used by this test.
    """
    prepare_service_import(
        "runtime-service",
        libs=("core-domain", "core-shared", "core-db", "core-events", "core-dsl"),
    )
    # These imports deliberately follow prepare_service_import().
    # NOTE(review): presumably that call makes the service's `app` package and
    # the core-* libraries importable — confirm in tests/conftest.
    from datetime import timezone

    from app.api.routes import get_runtime_application_service
    from app.application.services import RuntimeApplicationService
    from app.bootstrap.app import create_app
    from app.bootstrap.settings import RuntimeServiceSettings
    from app.db.session import build_session_factory
    from app.domain.repositories import (
        ExecutionLogRepository,
        NodeArtifactRepository,
        NodeRunRepository,
        TraceSpanRepository,
        WorkflowRunRepository,
    )
    from app.infrastructure.executors import build_node_execution_dispatcher
    from core_db import Base
    from core_domain import WorkflowVersionContract

    class FakeWorkflowClient:
        """Stand-in workflow client serving one fixed, published workflow version."""

        def get_workflow_version(
            self,
            *,
            tenant_id: str,
            workflow_version_id: str,
        ) -> WorkflowVersionContract:
            """Return the canned ``wv1`` contract; any other lookup fails the test."""
            assert tenant_id == "t1"
            assert workflow_version_id == "wv1"
            return WorkflowVersionContract(
                id="wv1",
                tenant_id="t1",
                workflow_id="wf1",
                version_no=1,
                status="published",
                # Same naive-UTC value the deprecated datetime.utcnow() produced.
                created_time=datetime.now(timezone.utc).replace(tzinfo=None),
                dsl_json={
                    "code": "debug_flow",
                    "nodes": [
                        {"id": "start", "type": "template", "config": {"template": "hello"}},
                        {"id": "answer", "type": "answer", "config": {"text": "done"}},
                    ],
                    "edges": [{"source": "start", "target": "answer"}],
                },
            )

    settings = RuntimeServiceSettings(database_url=f"sqlite:///{tmp_path / 'runtime.db'}")
    session_factory = build_session_factory(settings)
    engine = session_factory.kw["bind"]

    # Everything that uses the engine lives inside try/finally so the engine is
    # disposed even when an assertion below fails.
    try:
        Base.metadata.create_all(bind=engine)

        def override_service() -> Generator[RuntimeApplicationService, None, None]:
            """Yield a service wired to a fresh session and the fake workflow client."""
            db: Session = session_factory()
            try:
                yield RuntimeApplicationService(
                    workflow_run_repository=WorkflowRunRepository(db),
                    node_run_repository=NodeRunRepository(db),
                    execution_log_repository=ExecutionLogRepository(db),
                    node_artifact_repository=NodeArtifactRepository(db),
                    trace_span_repository=TraceSpanRepository(db),
                    execution_dispatcher=build_node_execution_dispatcher(),
                    workflow_client=FakeWorkflowClient(),
                )
            finally:
                db.close()

        app = create_app()
        app.state.session_factory = session_factory
        app.dependency_overrides[get_runtime_application_service] = override_service
        client = build_fastapi_test_client(app)

        # 1. Create the run.
        create_response = client.post(
            "/runtime/runs",
            json={
                "tenant_id": "t1",
                "app_id": "app1",
                "app_version_id": "av1",
                "workflow_id": "wf1",
                "workflow_version_id": "wv1",
            },
        )
        assert create_response.status_code == 200
        run_id = create_response.json()["run"]["id"]

        # 2. Pause before anything has executed: the entry node is still queued.
        pause_response = client.post(
            f"/runtime/runs/{run_id}/debug/pause",
            params={"tenant_id": "t1"},
        )
        assert pause_response.status_code == 200
        pause_payload = pause_response.json()
        assert pause_payload["run"]["status"] == "paused"
        assert pause_payload["queued_node_ids"] == ["start"]

        # 3. A regular execute-next on a paused run must leave the node queued.
        protected_execute_response = client.post(
            f"/runtime/runs/{run_id}/execute-next",
            params={"tenant_id": "t1"},
            json={"worker_key": "debugger"},
        )
        assert protected_execute_response.status_code == 200
        protected_payload = protected_execute_response.json()
        assert protected_payload["executor_name"] == "debug_paused"
        assert protected_payload["node_run"]["status"] == "queued"

        # 4. Single step: runs exactly `start`, then the run is paused again.
        step_response = client.post(
            f"/runtime/runs/{run_id}/debug/step",
            params={"tenant_id": "t1"},
            json={"worker_key": "debugger"},
        )
        assert step_response.status_code == 200
        step_payload = step_response.json()
        assert step_payload["reason"] == "step_completed"
        assert [item["node_id"] for item in step_payload["executed_node_runs"]] == ["start"]
        assert step_payload["snapshot"]["run"]["status"] == "paused"
        assert step_payload["snapshot"]["completed_node_ids"] == ["start"]
        assert step_payload["snapshot"]["queued_node_ids"] == ["answer"]

        # 5. Continue with a breakpoint on the next node: nothing executes.
        breakpoint_response = client.post(
            f"/runtime/runs/{run_id}/debug/continue",
            params={"tenant_id": "t1"},
            json={"worker_key": "debugger", "breakpoint_node_ids": ["answer"], "max_steps": 5},
        )
        assert breakpoint_response.status_code == 200
        breakpoint_payload = breakpoint_response.json()
        assert breakpoint_payload["reason"] == "breakpoint_hit"
        assert breakpoint_payload["paused_before_node_id"] == "answer"
        assert breakpoint_payload["executed_node_runs"] == []
        assert breakpoint_payload["snapshot"]["queued_node_ids"] == ["answer"]

        # 6. Continue without breakpoints: the run finishes.
        finish_response = client.post(
            f"/runtime/runs/{run_id}/debug/continue",
            params={"tenant_id": "t1"},
            json={"worker_key": "debugger", "max_steps": 5},
        )
        assert finish_response.status_code == 200
        finish_payload = finish_response.json()
        assert finish_payload["snapshot"]["run"]["status"] == "completed"
        assert finish_payload["snapshot"]["completed_node_ids"] == ["start", "answer"]
        assert finish_payload["snapshot"]["queued_node_ids"] == []
    finally:
        engine.dispose()
|