from __future__ import annotations from collections.abc import Generator from datetime import datetime from pathlib import Path from sqlalchemy.orm import Session from tests.conftest import ( build_fastapi_test_client, build_sqlite_database_url, prepare_known_service_import, ) def test_runtime_debugger_pause_step_and_breakpoint_continue(tmp_path: Path) -> None: prepare_known_service_import("runtime-service") from app.api.routes import get_runtime_application_service from app.application.services import RuntimeApplicationService from app.bootstrap.app import create_app from app.bootstrap.settings import RuntimeServiceSettings from app.db.session import build_session_factory from app.domain.repositories import ( ExecutionLogRepository, NodeArtifactRepository, NodeRunRepository, TraceSpanRepository, WorkflowRunRepository, ) from app.infrastructure.executors import build_node_execution_dispatcher from core_db import Base from core_domain import WorkflowVersionContract class FakeWorkflowClient: def get_workflow_version( self, *, tenant_id: str, workflow_version_id: str, ) -> WorkflowVersionContract: assert tenant_id == "t1" assert workflow_version_id == "wv1" return WorkflowVersionContract( id="wv1", tenant_id="t1", workflow_id="wf1", version_no=1, status="published", created_time=datetime.utcnow(), dsl_json={ "code": "debug_flow", "nodes": [ {"id": "start", "type": "template", "config": {"template": "hello"}}, {"id": "answer", "type": "answer", "config": {"text": "done"}}, ], "edges": [{"source": "start", "target": "answer"}], }, ) settings = RuntimeServiceSettings(database_url=build_sqlite_database_url(tmp_path, "runtime.db")) session_factory = build_session_factory(settings) engine = session_factory.kw["bind"] Base.metadata.create_all(bind=engine) def override_service() -> Generator[RuntimeApplicationService, None, None]: db: Session = session_factory() try: yield RuntimeApplicationService( workflow_run_repository=WorkflowRunRepository(db), node_run_repository=NodeRunRepository(db), execution_log_repository=ExecutionLogRepository(db), node_artifact_repository=NodeArtifactRepository(db), trace_span_repository=TraceSpanRepository(db), execution_dispatcher=build_node_execution_dispatcher(), workflow_client=FakeWorkflowClient(), ) finally: db.close() app = create_app() app.state.session_factory = session_factory app.dependency_overrides[get_runtime_application_service] = override_service client = build_fastapi_test_client(app) create_response = client.post( "/runtime/runs", json={ "tenant_id": "t1", "app_id": "app1", "app_version_id": "av1", "workflow_id": "wf1", "workflow_version_id": "wv1", }, ) assert create_response.status_code == 200 run_id = create_response.json()["run"]["id"] pause_response = client.post( f"/runtime/runs/{run_id}/debug/pause", params={"tenant_id": "t1"}, ) assert pause_response.status_code == 200 assert pause_response.json()["run"]["status"] == "paused" assert pause_response.json()["queued_node_ids"] == ["start"] protected_execute_response = client.post( f"/runtime/runs/{run_id}/execute-next", params={"tenant_id": "t1"}, json={"worker_key": "debugger"}, ) assert protected_execute_response.status_code == 200 assert protected_execute_response.json()["executor_name"] == "debug_paused" assert protected_execute_response.json()["node_run"]["status"] == "queued" step_response = client.post( f"/runtime/runs/{run_id}/debug/step", params={"tenant_id": "t1"}, json={"worker_key": "debugger"}, ) assert step_response.status_code == 200 step_payload = step_response.json() assert step_payload["reason"] == "step_completed" assert [item["node_id"] for item in step_payload["executed_node_runs"]] == ["start"] assert step_payload["snapshot"]["run"]["status"] == "paused" assert step_payload["snapshot"]["completed_node_ids"] == ["start"] assert step_payload["snapshot"]["queued_node_ids"] == ["answer"] breakpoint_response = client.post( f"/runtime/runs/{run_id}/debug/continue", params={"tenant_id": "t1"}, json={"worker_key": "debugger", "breakpoint_node_ids": ["answer"], "max_steps": 5}, ) assert breakpoint_response.status_code == 200 breakpoint_payload = breakpoint_response.json() assert breakpoint_payload["reason"] == "breakpoint_hit" assert breakpoint_payload["paused_before_node_id"] == "answer" assert breakpoint_payload["executed_node_runs"] == [] assert breakpoint_payload["snapshot"]["queued_node_ids"] == ["answer"] finish_response = client.post( f"/runtime/runs/{run_id}/debug/continue", params={"tenant_id": "t1"}, json={"worker_key": "debugger", "max_steps": 5}, ) assert finish_response.status_code == 200 finish_payload = finish_response.json() assert finish_payload["snapshot"]["run"]["status"] == "completed" assert finish_payload["snapshot"]["completed_node_ids"] == ["start", "answer"] assert finish_payload["snapshot"]["queued_node_ids"] == [] engine.dispose()