"""HTTP routes of the runtime service: health, run creation, listings, status updates, execution."""

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import text
from sqlalchemy.orm import Session

from core_domain import ServiceHealth

from app.application.services import RuntimeApplicationService
from app.bootstrap.settings import RuntimeServiceSettings
from app.db.session import get_db
from app.domain.repositories import (
    ExecutionLogRepository,
    NodeArtifactRepository,
    NodeRunRepository,
    TraceSpanRepository,
    WorkflowRunRepository,
)
from app.infrastructure.code_runner_client import CodeRunnerClient, CodeRunnerClientError
from app.infrastructure.executors import build_node_execution_dispatcher_with_clients
from app.infrastructure.model_gateway_client import ModelGatewayClient, ModelGatewayClientError
from app.infrastructure.tool_client import ToolServiceClient, ToolServiceClientError
from app.infrastructure.workflow_client import WorkflowServiceClient, WorkflowServiceClientError
from app.schemas.run import (
    ExecutionLogResponse,
    NodeArtifactResponse,
    NodeRunExecuteRequest,
    NodeRunExecuteResponse,
    NodeRunResponse,
    NodeRunStatusUpdateRequest,
    RunBootstrapResponse,
    RunCreateRequest,
    RunExecuteRequest,
    RunExecuteResponse,
    TraceSpanResponse,
    WorkflowRunResponse,
    WorkflowRunStatusUpdateRequest,
)

router = APIRouter()

# Client exceptions that the run-creation and execution endpoints translate into
# a 502 response. Kept in one place so those endpoints cannot drift apart.
_UPSTREAM_CLIENT_ERRORS: tuple[type[Exception], ...] = (
    CodeRunnerClientError,
    ModelGatewayClientError,
    ToolServiceClientError,
    WorkflowServiceClientError,
)


def _bad_gateway(exc: Exception) -> HTTPException:
    """Build the 502 ``HTTPException`` whose detail is the client error's message."""
    return HTTPException(status_code=502, detail=str(exc))


def get_runtime_settings() -> RuntimeServiceSettings:
    """Dependency provider: construct a new ``RuntimeServiceSettings`` on each call."""
    return RuntimeServiceSettings()


def get_runtime_application_service(
    db: Session = Depends(get_db),
    settings: RuntimeServiceSettings = Depends(get_runtime_settings),
) -> RuntimeApplicationService:
    """Dependency provider: assemble the application service for one request.

    Every repository is constructed with the injected ``db`` session, and every
    service client is constructed with its base URL taken from ``settings``.
    """
    return RuntimeApplicationService(
        workflow_run_repository=WorkflowRunRepository(db),
        node_run_repository=NodeRunRepository(db),
        execution_log_repository=ExecutionLogRepository(db),
        node_artifact_repository=NodeArtifactRepository(db),
        trace_span_repository=TraceSpanRepository(db),
        execution_dispatcher=build_node_execution_dispatcher_with_clients(
            code_runner_client=CodeRunnerClient(base_url=settings.code_runner_service_url),
            model_gateway_client=ModelGatewayClient(base_url=settings.model_gateway_service_url),
            tool_client=ToolServiceClient(base_url=settings.tool_service_url),
        ),
        workflow_client=WorkflowServiceClient(base_url=settings.workflow_service_url),
    )


@router.get("/health", response_model=ServiceHealth)
def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
    """Report service health after running ``SELECT 1`` against the database."""
    # No handler here: an exception raised by the query propagates to the framework.
    db.execute(text("SELECT 1"))
    return ServiceHealth(service="runtime-service", status="ok", database="ok")


@router.post("/runs", response_model=RunBootstrapResponse)
def create_run(
    payload: RunCreateRequest,
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> RunBootstrapResponse:
    """Create a workflow run and return it with its initial node run, if any.

    Responds with 502 when one of the service clients raises its client error.
    """
    try:
        workflow_run, initial_node = service.create_run(payload)
    except _UPSTREAM_CLIENT_ERRORS as exc:
        raise _bad_gateway(exc) from exc
    # Explicit None check: only a missing node maps to null, never a falsy entity.
    initial_node_response = (
        NodeRunResponse.from_entity(initial_node) if initial_node is not None else None
    )
    return RunBootstrapResponse(
        run=WorkflowRunResponse.from_entity(workflow_run),
        initial_node=initial_node_response,
    )


@router.get("/runs", response_model=list[WorkflowRunResponse])
def list_runs(
    tenant_id: str = Query(...),
    session_id: str | None = Query(default=None),
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> list[WorkflowRunResponse]:
    """List workflow runs for ``tenant_id``; ``session_id`` is optional."""
    return [
        WorkflowRunResponse.from_entity(item)
        for item in service.list_runs(tenant_id=tenant_id, session_id=session_id)
    ]


@router.get("/node-runs", response_model=list[NodeRunResponse])
def list_node_runs(
    tenant_id: str = Query(...),
    run_id: str = Query(...),
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> list[NodeRunResponse]:
    """List node runs for the given ``tenant_id`` and ``run_id`` (both required)."""
    return [
        NodeRunResponse.from_entity(item)
        for item in service.list_node_runs(tenant_id=tenant_id, run_id=run_id)
    ]


@router.get("/execution-logs", response_model=list[ExecutionLogResponse])
def list_execution_logs(
    tenant_id: str = Query(...),
    run_id: str | None = Query(default=None),
    node_run_id: str | None = Query(default=None),
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> list[ExecutionLogResponse]:
    """List execution logs for ``tenant_id``; ``run_id`` and ``node_run_id`` are optional."""
    return [
        ExecutionLogResponse.from_entity(item)
        for item in service.list_execution_logs(
            tenant_id=tenant_id,
            run_id=run_id,
            node_run_id=node_run_id,
        )
    ]


@router.get("/node-artifacts", response_model=list[NodeArtifactResponse])
def list_node_artifacts(
    tenant_id: str = Query(...),
    run_id: str | None = Query(default=None),
    node_run_id: str | None = Query(default=None),
    artifact_type: str | None = Query(default=None),
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> list[NodeArtifactResponse]:
    """List node artifacts for ``tenant_id``; the other query parameters are optional."""
    return [
        NodeArtifactResponse.from_entity(item)
        for item in service.list_node_artifacts(
            tenant_id=tenant_id,
            run_id=run_id,
            node_run_id=node_run_id,
            artifact_type=artifact_type,
        )
    ]


@router.get("/trace-spans", response_model=list[TraceSpanResponse])
def list_trace_spans(
    tenant_id: str = Query(...),
    run_id: str | None = Query(default=None),
    node_run_id: str | None = Query(default=None),
    span_type: str | None = Query(default=None),
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> list[TraceSpanResponse]:
    """List trace spans for ``tenant_id``; the other query parameters are optional."""
    return [
        TraceSpanResponse.from_entity(item)
        for item in service.list_trace_spans(
            tenant_id=tenant_id,
            run_id=run_id,
            node_run_id=node_run_id,
            span_type=span_type,
        )
    ]


@router.post("/runs/{run_id}/status", response_model=WorkflowRunResponse)
def update_run_status(
    run_id: str,
    payload: WorkflowRunStatusUpdateRequest,
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> WorkflowRunResponse:
    """Update a workflow run's status; responds with 404 when the service returns no run."""
    entity = service.update_run_status(run_id=run_id, payload=payload)
    if entity is None:
        raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
    return WorkflowRunResponse.from_entity(entity)


@router.post("/node-runs/{node_run_id}/status", response_model=NodeRunResponse)
def update_node_run_status(
    node_run_id: str,
    payload: NodeRunStatusUpdateRequest,
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> NodeRunResponse:
    """Update a node run's status; responds with 404 when the service returns no node run."""
    entity = service.update_node_run_status(node_run_id=node_run_id, payload=payload)
    if entity is None:
        raise HTTPException(status_code=404, detail=f"node_run not found: {node_run_id}")
    return NodeRunResponse.from_entity(entity)


@router.post("/node-runs/{node_run_id}/execute", response_model=NodeRunExecuteResponse)
def execute_node_run(
    node_run_id: str,
    payload: NodeRunExecuteRequest,
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> NodeRunExecuteResponse:
    """Execute one node run.

    Responds with 502 when a service client raises its client error, and with
    404 when the service returns no result for ``node_run_id``.
    """
    try:
        result = service.execute_node_run(node_run_id=node_run_id, payload=payload)
    except _UPSTREAM_CLIENT_ERRORS as exc:
        raise _bad_gateway(exc) from exc
    if result is None:
        raise HTTPException(status_code=404, detail=f"node_run not found: {node_run_id}")
    workflow_run, node_run, executor_name = result
    return NodeRunExecuteResponse(
        run=WorkflowRunResponse.from_entity(workflow_run),
        node_run=NodeRunResponse.from_entity(node_run),
        executor_name=executor_name,
    )


@router.post("/runs/{run_id}/execute-next", response_model=NodeRunExecuteResponse)
def execute_next_node_run(
    run_id: str,
    payload: NodeRunExecuteRequest,
    tenant_id: str = Query(...),
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> NodeRunExecuteResponse:
    """Execute the next node run of a workflow run.

    Responds with 502 when a service client raises its client error, and with
    404 when the service returns no result (reported as no queued node run).
    """
    try:
        result = service.execute_next_node_run(
            tenant_id=tenant_id,
            run_id=run_id,
            payload=payload,
        )
    except _UPSTREAM_CLIENT_ERRORS as exc:
        raise _bad_gateway(exc) from exc
    if result is None:
        raise HTTPException(
            status_code=404,
            detail=f"queued node_run not found for run: {run_id}",
        )
    workflow_run, node_run, executor_name = result
    return NodeRunExecuteResponse(
        run=WorkflowRunResponse.from_entity(workflow_run),
        node_run=NodeRunResponse.from_entity(node_run),
        executor_name=executor_name,
    )


@router.post("/runs/{run_id}/execute", response_model=RunExecuteResponse)
def execute_run(
    run_id: str,
    payload: RunExecuteRequest,
    tenant_id: str = Query(...),
    service: RuntimeApplicationService = Depends(get_runtime_application_service),
) -> RunExecuteResponse:
    """Execute a workflow run and return the run, its node runs and the executor names.

    Responds with 502 when a service client raises its client error, and with
    404 when the service returns no result for ``run_id``.
    """
    try:
        result = service.execute_run(
            tenant_id=tenant_id,
            run_id=run_id,
            payload=payload,
        )
    except _UPSTREAM_CLIENT_ERRORS as exc:
        raise _bad_gateway(exc) from exc
    if result is None:
        raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
    workflow_run, node_runs, executor_names = result
    return RunExecuteResponse(
        run=WorkflowRunResponse.from_entity(workflow_run),
        node_runs=[NodeRunResponse.from_entity(item) for item in node_runs],
        executor_names=executor_names,
    )