"""FastAPI router exposing the runtime application service over HTTP."""

from core_domain import ServiceHealth
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import text
from sqlalchemy.orm import Session

from app.application.services import (
    RuntimeApplicationService,
    RuntimeDebugSnapshot,
    build_runtime_application_service,
)
from app.bootstrap.settings import RuntimeServiceSettings
from app.db.session import get_db
from app.infrastructure.code_runner_client import CodeRunnerClientError
from app.infrastructure.human_client import HumanServiceClientError
from app.infrastructure.model_gateway_client import ModelGatewayClientError
from app.infrastructure.tool_client import ToolServiceClientError
from app.infrastructure.workflow_client import WorkflowServiceClientError
from app.schemas.run import (
    ExecutionLogResponse,
    HumanNodeResumeRequest,
    NodeArtifactResponse,
    NodeRunExecuteRequest,
    NodeRunExecuteResponse,
    NodeRunResponse,
    NodeRunStatusUpdateRequest,
    RunBootstrapResponse,
    RunCreateRequest,
    RunExecuteRequest,
    RunExecuteResponse,
    RuntimeDebugContinueRequest,
    RuntimeDebugSnapshotResponse,
    RuntimeDebugStepResponse,
    TraceSpanResponse,
    WorkerExecuteNextRequest,
    WorkerExecuteNextResponse,
    WorkflowRunResponse,
    WorkflowRunStatusUpdateRequest,
)

router = APIRouter()


def build_runtime_debug_snapshot_response(
    snapshot: RuntimeDebugSnapshot,
) -> RuntimeDebugSnapshotResponse:
    """Convert a service-layer debug snapshot into its API response schema.

    The run, node runs, execution logs, node artifacts and trace spans are
    mapped through the ``from_entity`` constructor of their response
    schema; every other snapshot field is passed through unchanged.
    """
    run_response = WorkflowRunResponse.from_entity(snapshot.run)
    node_run_responses = list(map(NodeRunResponse.from_entity, snapshot.node_runs))
    log_responses = list(map(ExecutionLogResponse.from_entity, snapshot.execution_logs))
    artifact_responses = list(map(NodeArtifactResponse.from_entity, snapshot.node_artifacts))
    span_responses = list(map(TraceSpanResponse.from_entity, snapshot.trace_spans))

    return RuntimeDebugSnapshotResponse(
        run=run_response,
        node_runs=node_run_responses,
        run_state_json=snapshot.run_state_json,
        node_output_json_by_node_id=snapshot.node_output_json_by_node_id,
        node_output_text_by_node_id=snapshot.node_output_text_by_node_id,
        queued_node_ids=snapshot.queued_node_ids,
        running_node_ids=snapshot.running_node_ids,
        completed_node_ids=snapshot.completed_node_ids,
        failed_node_ids=snapshot.failed_node_ids,
        execution_logs=log_responses,
        node_artifacts=artifact_responses,
        trace_spans=span_responses,
    )


def get_runtime_settings() -> RuntimeServiceSettings:
    """Dependency provider: build a fresh ``RuntimeServiceSettings`` instance."""
    runtime_settings = RuntimeServiceSettings()
    return runtime_settings


def get_runtime_application_service(
    db: Session = Depends(get_db),
    settings: RuntimeServiceSettings = Depends(get_runtime_settings),
) -> RuntimeApplicationService:
    """Dependency provider: build the application service for this request.

    The service is constructed from the injected database session and the
    injected runtime settings.
    """
    application_service = build_runtime_application_service(db=db, settings=settings)
    return application_service


@router.get("/health", response_model=ServiceHealth)
def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
    """Report service health after issuing a trivial query on the session.

    Any exception raised by the query is not handled here and propagates
    to the framework.
    """
    probe_statement = text("SELECT 1")
    db.execute(probe_statement)
    return ServiceHealth(service="runtime-service", status="ok", database="ok")


@router.post("/runs", response_model=RunBootstrapResponse)
def create_run(
    payload: RunCreateRequest,
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> RunBootstrapResponse:
    """Create a workflow run and return it with its initial node run, if any.

    Raises:
        HTTPException: 502 when one of the downstream service clients
            (code runner, model gateway, human, tool, workflow) fails.
    """
    try:
        workflow_run, initial_node = service.create_run(payload)
    except (
        CodeRunnerClientError,
        ModelGatewayClientError,
        HumanServiceClientError,
        ToolServiceClientError,
        WorkflowServiceClientError,
    ) as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc

    run_response = WorkflowRunResponse.from_entity(workflow_run)

    # A falsy initial node is reported as ``None`` rather than converted.
    if initial_node:
        initial_node_response = NodeRunResponse.from_entity(initial_node)
    else:
        initial_node_response = None

    return RunBootstrapResponse(run=run_response, initial_node=initial_node_response)


@router.get("/runs", response_model=list[WorkflowRunResponse])
def list_runs(
    session_id: str | None = Query(default=None),
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> list[WorkflowRunResponse]:
    """List workflow runs, passing the optional ``session_id`` to the service."""
    workflow_runs = service.list_runs(session_id=session_id)
    return [WorkflowRunResponse.from_entity(workflow_run) for workflow_run in workflow_runs]


@router.get("/node-runs", response_model=list[NodeRunResponse])
def list_node_runs(
    run_id: str = Query(...),
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> list[NodeRunResponse]:
    """List the node runs for the required ``run_id`` query parameter."""
    node_runs = service.list_node_runs(run_id=run_id)
    return [NodeRunResponse.from_entity(node_run) for node_run in node_runs]
# Failures raised by the downstream service clients. Routes that execute
# nodes translate these into HTTP 502 so callers can tell an upstream
# outage from an error in this service.
_UPSTREAM_CLIENT_ERRORS = (
    CodeRunnerClientError,
    ModelGatewayClientError,
    HumanServiceClientError,
    ToolServiceClientError,
    WorkflowServiceClientError,
)


def _bad_gateway(exc: Exception) -> HTTPException:
    """Build the HTTP 502 error that reports a downstream client failure."""
    return HTTPException(status_code=502, detail=str(exc))


def _node_run_execute_response(workflow_run, node_run, executor_name) -> NodeRunExecuteResponse:
    """Convert one executed node run and its owning run into the API response."""
    return NodeRunExecuteResponse(
        run=WorkflowRunResponse.from_entity(workflow_run),
        node_run=NodeRunResponse.from_entity(node_run),
        executor_name=executor_name,
    )


@router.get("/execution-logs", response_model=list[ExecutionLogResponse])
def list_execution_logs(
    run_id: str | None = Query(default=None),
    node_run_id: str | None = Query(default=None),
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> list[ExecutionLogResponse]:
    """List execution logs, passing the optional filters to the service."""
    logs = service.list_execution_logs(run_id=run_id, node_run_id=node_run_id)
    return [ExecutionLogResponse.from_entity(item) for item in logs]


@router.get("/node-artifacts", response_model=list[NodeArtifactResponse])
def list_node_artifacts(
    run_id: str | None = Query(default=None),
    node_run_id: str | None = Query(default=None),
    artifact_type: str | None = Query(default=None),
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> list[NodeArtifactResponse]:
    """List node artifacts, passing the optional filters to the service."""
    artifacts = service.list_node_artifacts(
        run_id=run_id,
        node_run_id=node_run_id,
        artifact_type=artifact_type,
    )
    return [NodeArtifactResponse.from_entity(item) for item in artifacts]


@router.get("/trace-spans", response_model=list[TraceSpanResponse])
def list_trace_spans(
    run_id: str | None = Query(default=None),
    node_run_id: str | None = Query(default=None),
    span_type: str | None = Query(default=None),
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> list[TraceSpanResponse]:
    """List trace spans, passing the optional filters to the service."""
    spans = service.list_trace_spans(
        run_id=run_id,
        node_run_id=node_run_id,
        span_type=span_type,
    )
    return [TraceSpanResponse.from_entity(item) for item in spans]


@router.post("/runs/{run_id}/status", response_model=WorkflowRunResponse)
def update_run_status(
    run_id: str,
    payload: WorkflowRunStatusUpdateRequest,
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> WorkflowRunResponse:
    """Apply a status update to a workflow run and return the updated run.

    Raises:
        HTTPException: 404 when the service returns no run for ``run_id``.
    """
    entity = service.update_run_status(run_id=run_id, payload=payload)
    if entity is None:
        raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
    return WorkflowRunResponse.from_entity(entity)


@router.post("/node-runs/{node_run_id}/status", response_model=NodeRunResponse)
def update_node_run_status(
    node_run_id: str,
    payload: NodeRunStatusUpdateRequest,
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> NodeRunResponse:
    """Apply a status update to a node run and return the updated node run.

    Raises:
        HTTPException: 404 when the service returns no node run for
            ``node_run_id``.
    """
    entity = service.update_node_run_status(node_run_id=node_run_id, payload=payload)
    if entity is None:
        raise HTTPException(status_code=404, detail=f"node_run not found: {node_run_id}")
    return NodeRunResponse.from_entity(entity)


@router.post("/node-runs/{node_run_id}/execute", response_model=NodeRunExecuteResponse)
def execute_node_run(
    node_run_id: str,
    payload: NodeRunExecuteRequest,
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> NodeRunExecuteResponse:
    """Execute one specific node run.

    Raises:
        HTTPException: 502 when a downstream service client fails;
            404 when the service returns no result for ``node_run_id``.
    """
    try:
        result = service.execute_node_run(node_run_id=node_run_id, payload=payload)
    except _UPSTREAM_CLIENT_ERRORS as exc:
        raise _bad_gateway(exc) from exc
    if result is None:
        raise HTTPException(status_code=404, detail=f"node_run not found: {node_run_id}")
    return _node_run_execute_response(*result)


@router.post("/runs/{run_id}/execute-next", response_model=NodeRunExecuteResponse)
def execute_next_node_run(
    run_id: str,
    payload: NodeRunExecuteRequest,
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> NodeRunExecuteResponse:
    """Execute the next node run the service selects for ``run_id``.

    Raises:
        HTTPException: 502 when a downstream service client fails;
            404 when the service returns no result for the run.
    """
    try:
        result = service.execute_next_node_run(run_id=run_id, payload=payload)
    except _UPSTREAM_CLIENT_ERRORS as exc:
        raise _bad_gateway(exc) from exc
    if result is None:
        raise HTTPException(
            status_code=404,
            detail=f"queued node_run not found for run: {run_id}",
        )
    return _node_run_execute_response(*result)


@router.post("/runs/{run_id}/execute", response_model=RunExecuteResponse)
def execute_run(
    run_id: str,
    payload: RunExecuteRequest,
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> RunExecuteResponse:
    """Execute a workflow run and return the run with its executed node runs.

    Raises:
        HTTPException: 502 when a downstream service client fails;
            404 when the service returns no result for ``run_id``.
    """
    try:
        result = service.execute_run(run_id=run_id, payload=payload)
    except _UPSTREAM_CLIENT_ERRORS as exc:
        raise _bad_gateway(exc) from exc
    if result is None:
        raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
    workflow_run, node_runs, executor_names = result
    return RunExecuteResponse(
        run=WorkflowRunResponse.from_entity(workflow_run),
        node_runs=[NodeRunResponse.from_entity(item) for item in node_runs],
        executor_names=executor_names,
    )


@router.get("/runs/{run_id}/debug/snapshot", response_model=RuntimeDebugSnapshotResponse)
def get_runtime_debug_snapshot(
    run_id: str,
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> RuntimeDebugSnapshotResponse:
    """Return the debug snapshot of a workflow run.

    Raises:
        HTTPException: 404 when the service returns no snapshot for
            ``run_id``.
    """
    snapshot = service.get_debug_snapshot(run_id=run_id)
    if snapshot is None:
        raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
    return build_runtime_debug_snapshot_response(snapshot)


@router.post("/runs/{run_id}/debug/pause", response_model=RuntimeDebugSnapshotResponse)
def pause_runtime_debug_run(
    run_id: str,
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> RuntimeDebugSnapshotResponse:
    """Pause a workflow run and return the resulting debug snapshot.

    Raises:
        HTTPException: 404 when the service returns no snapshot for
            ``run_id``.
    """
    snapshot = service.pause_run(run_id=run_id)
    if snapshot is None:
        raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
    return build_runtime_debug_snapshot_response(snapshot)


@router.post("/runs/{run_id}/debug/resume", response_model=RuntimeDebugSnapshotResponse)
def resume_runtime_debug_run(
    run_id: str,
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> RuntimeDebugSnapshotResponse:
    """Resume a workflow run and return the resulting debug snapshot.

    Raises:
        HTTPException: 404 when the service returns no snapshot for
            ``run_id``.
    """
    snapshot = service.resume_run(run_id=run_id)
    if snapshot is None:
        raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
    return build_runtime_debug_snapshot_response(snapshot)


@router.post("/runs/{run_id}/debug/step", response_model=RuntimeDebugStepResponse)
def step_runtime_debug_run(
    run_id: str,
    payload: NodeRunExecuteRequest,
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> RuntimeDebugStepResponse:
    """Advance a workflow run by one debug step.

    Only ``payload.worker_key`` is forwarded to the service.

    Raises:
        HTTPException: 502 when a downstream service client fails;
            404 when the service returns no result for ``run_id``.
    """
    # The step returns executed node runs, so downstream client failures are
    # mapped to 502 here in the same way as the other execute-style routes.
    try:
        result = service.step_debug_run(run_id=run_id, worker_key=payload.worker_key)
    except _UPSTREAM_CLIENT_ERRORS as exc:
        raise _bad_gateway(exc) from exc
    if result is None:
        raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
    snapshot, executed_node_runs, executor_names, reason = result
    return RuntimeDebugStepResponse(
        snapshot=build_runtime_debug_snapshot_response(snapshot),
        executed_node_runs=[NodeRunResponse.from_entity(item) for item in executed_node_runs],
        executor_names=executor_names,
        reason=reason,
    )


@router.post("/runs/{run_id}/debug/continue", response_model=RuntimeDebugStepResponse)
def continue_runtime_debug_run(
    run_id: str,
    payload: RuntimeDebugContinueRequest,
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> RuntimeDebugStepResponse:
    """Continue a workflow run under the debugger.

    The response also carries the ``paused_before_node_id`` value returned
    by the service.

    Raises:
        HTTPException: 502 when a downstream service client fails;
            404 when the service returns no result for ``run_id``.
    """
    # Continuing returns executed node runs, so downstream client failures
    # are mapped to 502 in the same way as the other execute-style routes.
    try:
        result = service.continue_debug_run(run_id=run_id, payload=payload)
    except _UPSTREAM_CLIENT_ERRORS as exc:
        raise _bad_gateway(exc) from exc
    if result is None:
        raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
    snapshot, executed_node_runs, executor_names, paused_before_node_id, reason = result
    return RuntimeDebugStepResponse(
        snapshot=build_runtime_debug_snapshot_response(snapshot),
        executed_node_runs=[NodeRunResponse.from_entity(item) for item in executed_node_runs],
        executor_names=executor_names,
        paused_before_node_id=paused_before_node_id,
        reason=reason,
    )


@router.post("/node-runs/{node_run_id}/resume-human", response_model=NodeRunExecuteResponse)
def resume_human_node_run(
    node_run_id: str,
    payload: HumanNodeResumeRequest,
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> NodeRunExecuteResponse:
    """Resume a human node run using the submitted payload.

    Raises:
        HTTPException: 502 when a downstream service client fails;
            404 when the service returns no result for ``node_run_id``.
    """
    try:
        result = service.resume_human_node_run(node_run_id=node_run_id, payload=payload)
    except _UPSTREAM_CLIENT_ERRORS as exc:
        raise _bad_gateway(exc) from exc
    if result is None:
        raise HTTPException(
            status_code=404,
            detail=f"human node_run not found or task mismatch: {node_run_id}",
        )
    return _node_run_execute_response(*result)


@router.post("/workers/execute-next", response_model=WorkerExecuteNextResponse)
def execute_next_worker_task(
    payload: WorkerExecuteNextRequest,
    settings: RuntimeServiceSettings = Depends(get_runtime_settings),
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> WorkerExecuteNextResponse:
    """Claim and execute the next node run on behalf of a worker.

    Raises:
        HTTPException: 502 when a downstream service client fails;
            404 when the service returns no result.
    """
    # A missing or falsy ``lease_seconds`` (None or 0) falls back to the
    # configured ``worker_lease_seconds``.
    lease_seconds = payload.lease_seconds or settings.worker_lease_seconds
    try:
        result = service.execute_next_claimed_node_run(
            worker_key=payload.worker_key,
            lease_seconds=lease_seconds,
        )
    except _UPSTREAM_CLIENT_ERRORS as exc:
        raise _bad_gateway(exc) from exc
    if result is None:
        raise HTTPException(status_code=404, detail="queued worker task not found")
    workflow_run, node_run, executor_name, released_lease_count = result
    return WorkerExecuteNextResponse(
        run=WorkflowRunResponse.from_entity(workflow_run),
        node_run=NodeRunResponse.from_entity(node_run),
        executor_name=executor_name,
        released_lease_count=released_lease_count,
    )