routes.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. from core_domain import ServiceHealth
  2. from fastapi import APIRouter, Depends, HTTPException, Query
  3. from sqlalchemy import text
  4. from sqlalchemy.orm import Session
  5. from app.application.services import (
  6. RuntimeApplicationService,
  7. RuntimeDebugSnapshot,
  8. build_runtime_application_service,
  9. )
  10. from app.bootstrap.settings import RuntimeServiceSettings
  11. from app.db.session import get_db
  12. from app.infrastructure.code_runner_client import CodeRunnerClientError
  13. from app.infrastructure.human_client import HumanServiceClientError
  14. from app.infrastructure.model_gateway_client import ModelGatewayClientError
  15. from app.infrastructure.tool_client import ToolServiceClientError
  16. from app.infrastructure.workflow_client import WorkflowServiceClientError
  17. from app.schemas.run import (
  18. ExecutionLogResponse,
  19. HumanNodeResumeRequest,
  20. NodeArtifactResponse,
  21. NodeRunExecuteRequest,
  22. NodeRunExecuteResponse,
  23. NodeRunResponse,
  24. NodeRunStatusUpdateRequest,
  25. RunBootstrapResponse,
  26. RunCreateRequest,
  27. RunExecuteRequest,
  28. RunExecuteResponse,
  29. RuntimeDebugContinueRequest,
  30. RuntimeDebugSnapshotResponse,
  31. RuntimeDebugStepResponse,
  32. TraceSpanResponse,
  33. WorkerExecuteNextRequest,
  34. WorkerExecuteNextResponse,
  35. WorkflowRunResponse,
  36. WorkflowRunStatusUpdateRequest,
  37. )
# Router on which every endpoint declared in this module is registered.
router = APIRouter()
  39. def build_runtime_debug_snapshot_response(snapshot: RuntimeDebugSnapshot) -> RuntimeDebugSnapshotResponse:
  40. return RuntimeDebugSnapshotResponse(
  41. run=WorkflowRunResponse.from_entity(snapshot.run),
  42. node_runs=[
  43. NodeRunResponse.from_entity(item)
  44. for item in snapshot.node_runs
  45. ],
  46. run_state_json=snapshot.run_state_json,
  47. node_output_json_by_node_id=snapshot.node_output_json_by_node_id,
  48. node_output_text_by_node_id=snapshot.node_output_text_by_node_id,
  49. queued_node_ids=snapshot.queued_node_ids,
  50. running_node_ids=snapshot.running_node_ids,
  51. completed_node_ids=snapshot.completed_node_ids,
  52. failed_node_ids=snapshot.failed_node_ids,
  53. execution_logs=[
  54. ExecutionLogResponse.from_entity(item)
  55. for item in snapshot.execution_logs
  56. ],
  57. node_artifacts=[
  58. NodeArtifactResponse.from_entity(item)
  59. for item in snapshot.node_artifacts
  60. ],
  61. trace_spans=[
  62. TraceSpanResponse.from_entity(item)
  63. for item in snapshot.trace_spans
  64. ])
  65. def get_runtime_settings() -> RuntimeServiceSettings:
  66. return RuntimeServiceSettings()
  67. def get_runtime_application_service(
  68. db: Session = Depends(get_db),
  69. settings: RuntimeServiceSettings = Depends(get_runtime_settings)) -> RuntimeApplicationService:
  70. return build_runtime_application_service(db=db, settings=settings)
  71. @router.get("/health", response_model=ServiceHealth)
  72. def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
  73. db.execute(text("SELECT 1"))
  74. return ServiceHealth(service="runtime-service", status="ok", database="ok")
  75. @router.post("/runs", response_model=RunBootstrapResponse)
  76. def create_run(
  77. payload: RunCreateRequest,
  78. service: RuntimeApplicationService = Depends(get_runtime_application_service)) -> RunBootstrapResponse:
  79. try:
  80. workflow_run, initial_node = service.create_run(payload)
  81. except (
  82. CodeRunnerClientError,
  83. ModelGatewayClientError,
  84. HumanServiceClientError,
  85. ToolServiceClientError,
  86. WorkflowServiceClientError) as exc:
  87. raise HTTPException(status_code=502, detail=str(exc)) from exc
  88. return RunBootstrapResponse(
  89. run=WorkflowRunResponse.from_entity(workflow_run),
  90. initial_node=NodeRunResponse.from_entity(initial_node) if initial_node else None)
  91. @router.get("/runs", response_model=list[WorkflowRunResponse])
  92. def list_runs(
  93. session_id: str | None = Query(default=None),
  94. service: RuntimeApplicationService = Depends(get_runtime_application_service)) -> list[WorkflowRunResponse]:
  95. return [
  96. WorkflowRunResponse.from_entity(item)
  97. for item in service.list_runs(session_id=session_id)
  98. ]
  99. @router.get("/node-runs", response_model=list[NodeRunResponse])
  100. def list_node_runs(
  101. run_id: str = Query(...),
  102. service: RuntimeApplicationService = Depends(get_runtime_application_service)) -> list[NodeRunResponse]:
  103. return [
  104. NodeRunResponse.from_entity(item)
  105. for item in service.list_node_runs(run_id=run_id)
  106. ]
  107. @router.get("/execution-logs", response_model=list[ExecutionLogResponse])
  108. def list_execution_logs(
  109. run_id: str | None = Query(default=None),
  110. node_run_id: str | None = Query(default=None),
  111. service: RuntimeApplicationService = Depends(get_runtime_application_service)) -> list[ExecutionLogResponse]:
  112. return [
  113. ExecutionLogResponse.from_entity(item)
  114. for item in service.list_execution_logs(
  115. run_id=run_id,
  116. node_run_id=node_run_id)
  117. ]
  118. @router.get("/node-artifacts", response_model=list[NodeArtifactResponse])
  119. def list_node_artifacts(
  120. run_id: str | None = Query(default=None),
  121. node_run_id: str | None = Query(default=None),
  122. artifact_type: str | None = Query(default=None),
  123. service: RuntimeApplicationService = Depends(get_runtime_application_service)) -> list[NodeArtifactResponse]:
  124. return [
  125. NodeArtifactResponse.from_entity(item)
  126. for item in service.list_node_artifacts(
  127. run_id=run_id,
  128. node_run_id=node_run_id,
  129. artifact_type=artifact_type)
  130. ]
  131. @router.get("/trace-spans", response_model=list[TraceSpanResponse])
  132. def list_trace_spans(
  133. run_id: str | None = Query(default=None),
  134. node_run_id: str | None = Query(default=None),
  135. span_type: str | None = Query(default=None),
  136. service: RuntimeApplicationService = Depends(get_runtime_application_service)) -> list[TraceSpanResponse]:
  137. return [
  138. TraceSpanResponse.from_entity(item)
  139. for item in service.list_trace_spans(
  140. run_id=run_id,
  141. node_run_id=node_run_id,
  142. span_type=span_type)
  143. ]
  144. @router.post("/runs/{run_id}/status", response_model=WorkflowRunResponse)
  145. def update_run_status(
  146. run_id: str,
  147. payload: WorkflowRunStatusUpdateRequest,
  148. service: RuntimeApplicationService = Depends(get_runtime_application_service)) -> WorkflowRunResponse:
  149. entity = service.update_run_status(run_id=run_id, payload=payload)
  150. if entity is None:
  151. raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
  152. return WorkflowRunResponse.from_entity(entity)
  153. @router.post("/node-runs/{node_run_id}/status", response_model=NodeRunResponse)
  154. def update_node_run_status(
  155. node_run_id: str,
  156. payload: NodeRunStatusUpdateRequest,
  157. service: RuntimeApplicationService = Depends(get_runtime_application_service)) -> NodeRunResponse:
  158. entity = service.update_node_run_status(node_run_id=node_run_id, payload=payload)
  159. if entity is None:
  160. raise HTTPException(status_code=404, detail=f"node_run not found: {node_run_id}")
  161. return NodeRunResponse.from_entity(entity)
  162. @router.post("/node-runs/{node_run_id}/execute", response_model=NodeRunExecuteResponse)
  163. def execute_node_run(
  164. node_run_id: str,
  165. payload: NodeRunExecuteRequest,
  166. service: RuntimeApplicationService = Depends(get_runtime_application_service)) -> NodeRunExecuteResponse:
  167. try:
  168. result = service.execute_node_run(node_run_id=node_run_id, payload=payload)
  169. except (
  170. CodeRunnerClientError,
  171. ModelGatewayClientError,
  172. HumanServiceClientError,
  173. ToolServiceClientError,
  174. WorkflowServiceClientError) as exc:
  175. raise HTTPException(status_code=502, detail=str(exc)) from exc
  176. if result is None:
  177. raise HTTPException(status_code=404, detail=f"node_run not found: {node_run_id}")
  178. workflow_run, node_run, executor_name = result
  179. return NodeRunExecuteResponse(
  180. run=WorkflowRunResponse.from_entity(workflow_run),
  181. node_run=NodeRunResponse.from_entity(node_run),
  182. executor_name=executor_name)
  183. @router.post("/runs/{run_id}/execute-next", response_model=NodeRunExecuteResponse)
  184. def execute_next_node_run(
  185. run_id: str,
  186. payload: NodeRunExecuteRequest,
  187. service: RuntimeApplicationService = Depends(get_runtime_application_service)) -> NodeRunExecuteResponse:
  188. try:
  189. result = service.execute_next_node_run(
  190. run_id=run_id,
  191. payload=payload)
  192. except (
  193. CodeRunnerClientError,
  194. ModelGatewayClientError,
  195. HumanServiceClientError,
  196. ToolServiceClientError,
  197. WorkflowServiceClientError) as exc:
  198. raise HTTPException(status_code=502, detail=str(exc)) from exc
  199. if result is None:
  200. raise HTTPException(status_code=404, detail=f"queued node_run not found for run: {run_id}")
  201. workflow_run, node_run, executor_name = result
  202. return NodeRunExecuteResponse(
  203. run=WorkflowRunResponse.from_entity(workflow_run),
  204. node_run=NodeRunResponse.from_entity(node_run),
  205. executor_name=executor_name)
  206. @router.post("/runs/{run_id}/execute", response_model=RunExecuteResponse)
  207. def execute_run(
  208. run_id: str,
  209. payload: RunExecuteRequest,
  210. service: RuntimeApplicationService = Depends(get_runtime_application_service)) -> RunExecuteResponse:
  211. try:
  212. result = service.execute_run(
  213. run_id=run_id,
  214. payload=payload)
  215. except (
  216. CodeRunnerClientError,
  217. ModelGatewayClientError,
  218. HumanServiceClientError,
  219. ToolServiceClientError,
  220. WorkflowServiceClientError) as exc:
  221. raise HTTPException(status_code=502, detail=str(exc)) from exc
  222. if result is None:
  223. raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
  224. workflow_run, node_runs, executor_names = result
  225. return RunExecuteResponse(
  226. run=WorkflowRunResponse.from_entity(workflow_run),
  227. node_runs=[NodeRunResponse.from_entity(item) for item in node_runs],
  228. executor_names=executor_names)
  229. @router.get("/runs/{run_id}/debug/snapshot", response_model=RuntimeDebugSnapshotResponse)
  230. def get_runtime_debug_snapshot(
  231. run_id: str,
  232. service: RuntimeApplicationService = Depends(get_runtime_application_service)) -> RuntimeDebugSnapshotResponse:
  233. snapshot = service.get_debug_snapshot(run_id=run_id)
  234. if snapshot is None:
  235. raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
  236. return build_runtime_debug_snapshot_response(snapshot)
  237. @router.post("/runs/{run_id}/debug/pause", response_model=RuntimeDebugSnapshotResponse)
  238. def pause_runtime_debug_run(
  239. run_id: str,
  240. service: RuntimeApplicationService = Depends(get_runtime_application_service)) -> RuntimeDebugSnapshotResponse:
  241. snapshot = service.pause_run(run_id=run_id)
  242. if snapshot is None:
  243. raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
  244. return build_runtime_debug_snapshot_response(snapshot)
  245. @router.post("/runs/{run_id}/debug/resume", response_model=RuntimeDebugSnapshotResponse)
  246. def resume_runtime_debug_run(
  247. run_id: str,
  248. service: RuntimeApplicationService = Depends(get_runtime_application_service)) -> RuntimeDebugSnapshotResponse:
  249. snapshot = service.resume_run(run_id=run_id)
  250. if snapshot is None:
  251. raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
  252. return build_runtime_debug_snapshot_response(snapshot)
  253. @router.post("/runs/{run_id}/debug/step", response_model=RuntimeDebugStepResponse)
  254. def step_runtime_debug_run(
  255. run_id: str,
  256. payload: NodeRunExecuteRequest,
  257. service: RuntimeApplicationService = Depends(get_runtime_application_service)) -> RuntimeDebugStepResponse:
  258. result = service.step_debug_run(
  259. run_id=run_id,
  260. worker_key=payload.worker_key)
  261. if result is None:
  262. raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
  263. snapshot, executed_node_runs, executor_names, reason = result
  264. return RuntimeDebugStepResponse(
  265. snapshot=build_runtime_debug_snapshot_response(snapshot),
  266. executed_node_runs=[NodeRunResponse.from_entity(item) for item in executed_node_runs],
  267. executor_names=executor_names,
  268. reason=reason)
  269. @router.post("/runs/{run_id}/debug/continue", response_model=RuntimeDebugStepResponse)
  270. def continue_runtime_debug_run(
  271. run_id: str,
  272. payload: RuntimeDebugContinueRequest,
  273. service: RuntimeApplicationService = Depends(get_runtime_application_service)) -> RuntimeDebugStepResponse:
  274. result = service.continue_debug_run(
  275. run_id=run_id,
  276. payload=payload)
  277. if result is None:
  278. raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
  279. snapshot, executed_node_runs, executor_names, paused_before_node_id, reason = result
  280. return RuntimeDebugStepResponse(
  281. snapshot=build_runtime_debug_snapshot_response(snapshot),
  282. executed_node_runs=[NodeRunResponse.from_entity(item) for item in executed_node_runs],
  283. executor_names=executor_names,
  284. paused_before_node_id=paused_before_node_id,
  285. reason=reason)
  286. @router.post("/node-runs/{node_run_id}/resume-human", response_model=NodeRunExecuteResponse)
  287. def resume_human_node_run(
  288. node_run_id: str,
  289. payload: HumanNodeResumeRequest,
  290. service: RuntimeApplicationService = Depends(get_runtime_application_service)) -> NodeRunExecuteResponse:
  291. try:
  292. result = service.resume_human_node_run(
  293. node_run_id=node_run_id,
  294. payload=payload)
  295. except (
  296. CodeRunnerClientError,
  297. ModelGatewayClientError,
  298. HumanServiceClientError,
  299. ToolServiceClientError,
  300. WorkflowServiceClientError) as exc:
  301. raise HTTPException(status_code=502, detail=str(exc)) from exc
  302. if result is None:
  303. raise HTTPException(
  304. status_code=404,
  305. detail=f"human node_run not found or task mismatch: {node_run_id}")
  306. workflow_run, node_run, executor_name = result
  307. return NodeRunExecuteResponse(
  308. run=WorkflowRunResponse.from_entity(workflow_run),
  309. node_run=NodeRunResponse.from_entity(node_run),
  310. executor_name=executor_name)
  311. @router.post("/workers/execute-next", response_model=WorkerExecuteNextResponse)
  312. def execute_next_worker_task(
  313. payload: WorkerExecuteNextRequest,
  314. settings: RuntimeServiceSettings = Depends(get_runtime_settings),
  315. service: RuntimeApplicationService = Depends(get_runtime_application_service)) -> WorkerExecuteNextResponse:
  316. try:
  317. result = service.execute_next_claimed_node_run(
  318. worker_key=payload.worker_key,
  319. lease_seconds=payload.lease_seconds or settings.worker_lease_seconds)
  320. except (
  321. CodeRunnerClientError,
  322. ModelGatewayClientError,
  323. HumanServiceClientError,
  324. ToolServiceClientError,
  325. WorkflowServiceClientError) as exc:
  326. raise HTTPException(status_code=502, detail=str(exc)) from exc
  327. if result is None:
  328. raise HTTPException(status_code=404, detail="queued worker task not found")
  329. workflow_run, node_run, executor_name, released_lease_count = result
  330. return WorkerExecuteNextResponse(
  331. run=WorkflowRunResponse.from_entity(workflow_run),
  332. node_run=NodeRunResponse.from_entity(node_run),
  333. executor_name=executor_name,
  334. released_lease_count=released_lease_count)