routes.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. from fastapi import APIRouter, Depends, HTTPException, Query
  2. from sqlalchemy import text
  3. from sqlalchemy.orm import Session
  4. from core_domain import ServiceHealth
  5. from app.application.services import (
  6. RuntimeApplicationService,
  7. RuntimeDebugSnapshot,
  8. build_runtime_application_service,
  9. )
  10. from app.bootstrap.settings import RuntimeServiceSettings
  11. from app.db.session import get_db
  12. from app.infrastructure.code_runner_client import CodeRunnerClientError
  13. from app.infrastructure.human_client import HumanServiceClientError
  14. from app.infrastructure.model_gateway_client import ModelGatewayClientError
  15. from app.infrastructure.tool_client import ToolServiceClientError
  16. from app.infrastructure.workflow_client import WorkflowServiceClientError
  17. from app.schemas.run import (
  18. ExecutionLogResponse,
  19. HumanNodeResumeRequest,
  20. NodeArtifactResponse,
  21. NodeRunExecuteRequest,
  22. NodeRunExecuteResponse,
  23. NodeRunResponse,
  24. NodeRunStatusUpdateRequest,
  25. RunBootstrapResponse,
  26. RunCreateRequest,
  27. RunExecuteRequest,
  28. RunExecuteResponse,
  29. RuntimeDebugContinueRequest,
  30. RuntimeDebugSnapshotResponse,
  31. RuntimeDebugStepResponse,
  32. TraceSpanResponse,
  33. WorkerExecuteNextRequest,
  34. WorkerExecuteNextResponse,
  35. WorkflowRunResponse,
  36. WorkflowRunStatusUpdateRequest,
  37. )
# Router on which every endpoint in this module is registered via its decorators.
router = APIRouter()
  39. def build_runtime_debug_snapshot_response(snapshot: RuntimeDebugSnapshot) -> RuntimeDebugSnapshotResponse:
  40. return RuntimeDebugSnapshotResponse(
  41. run=WorkflowRunResponse.from_entity(snapshot.run),
  42. node_runs=[
  43. NodeRunResponse.from_entity(item)
  44. for item in snapshot.node_runs
  45. ],
  46. run_state_json=snapshot.run_state_json,
  47. node_output_json_by_node_id=snapshot.node_output_json_by_node_id,
  48. node_output_text_by_node_id=snapshot.node_output_text_by_node_id,
  49. queued_node_ids=snapshot.queued_node_ids,
  50. running_node_ids=snapshot.running_node_ids,
  51. completed_node_ids=snapshot.completed_node_ids,
  52. failed_node_ids=snapshot.failed_node_ids,
  53. execution_logs=[
  54. ExecutionLogResponse.from_entity(item)
  55. for item in snapshot.execution_logs
  56. ],
  57. node_artifacts=[
  58. NodeArtifactResponse.from_entity(item)
  59. for item in snapshot.node_artifacts
  60. ],
  61. trace_spans=[
  62. TraceSpanResponse.from_entity(item)
  63. for item in snapshot.trace_spans
  64. ],
  65. )
  66. def get_runtime_settings() -> RuntimeServiceSettings:
  67. return RuntimeServiceSettings()
  68. def get_runtime_application_service(
  69. db: Session = Depends(get_db),
  70. settings: RuntimeServiceSettings = Depends(get_runtime_settings),
  71. ) -> RuntimeApplicationService:
  72. return build_runtime_application_service(db=db, settings=settings)
  73. @router.get("/health", response_model=ServiceHealth)
  74. def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
  75. db.execute(text("SELECT 1"))
  76. return ServiceHealth(service="runtime-service", status="ok", database="ok")
  77. @router.post("/runs", response_model=RunBootstrapResponse)
  78. def create_run(
  79. payload: RunCreateRequest,
  80. service: RuntimeApplicationService = Depends(get_runtime_application_service),
  81. ) -> RunBootstrapResponse:
  82. try:
  83. workflow_run, initial_node = service.create_run(payload)
  84. except (
  85. CodeRunnerClientError,
  86. ModelGatewayClientError,
  87. HumanServiceClientError,
  88. ToolServiceClientError,
  89. WorkflowServiceClientError,
  90. ) as exc:
  91. raise HTTPException(status_code=502, detail=str(exc)) from exc
  92. return RunBootstrapResponse(
  93. run=WorkflowRunResponse.from_entity(workflow_run),
  94. initial_node=NodeRunResponse.from_entity(initial_node) if initial_node else None,
  95. )
  96. @router.get("/runs", response_model=list[WorkflowRunResponse])
  97. def list_runs(
  98. tenant_id: str = Query(...),
  99. session_id: str | None = Query(default=None),
  100. service: RuntimeApplicationService = Depends(get_runtime_application_service),
  101. ) -> list[WorkflowRunResponse]:
  102. return [
  103. WorkflowRunResponse.from_entity(item)
  104. for item in service.list_runs(tenant_id=tenant_id, session_id=session_id)
  105. ]
  106. @router.get("/node-runs", response_model=list[NodeRunResponse])
  107. def list_node_runs(
  108. tenant_id: str = Query(...),
  109. run_id: str = Query(...),
  110. service: RuntimeApplicationService = Depends(get_runtime_application_service),
  111. ) -> list[NodeRunResponse]:
  112. return [
  113. NodeRunResponse.from_entity(item)
  114. for item in service.list_node_runs(tenant_id=tenant_id, run_id=run_id)
  115. ]
  116. @router.get("/execution-logs", response_model=list[ExecutionLogResponse])
  117. def list_execution_logs(
  118. tenant_id: str = Query(...),
  119. run_id: str | None = Query(default=None),
  120. node_run_id: str | None = Query(default=None),
  121. service: RuntimeApplicationService = Depends(get_runtime_application_service),
  122. ) -> list[ExecutionLogResponse]:
  123. return [
  124. ExecutionLogResponse.from_entity(item)
  125. for item in service.list_execution_logs(
  126. tenant_id=tenant_id,
  127. run_id=run_id,
  128. node_run_id=node_run_id,
  129. )
  130. ]
  131. @router.get("/node-artifacts", response_model=list[NodeArtifactResponse])
  132. def list_node_artifacts(
  133. tenant_id: str = Query(...),
  134. run_id: str | None = Query(default=None),
  135. node_run_id: str | None = Query(default=None),
  136. artifact_type: str | None = Query(default=None),
  137. service: RuntimeApplicationService = Depends(get_runtime_application_service),
  138. ) -> list[NodeArtifactResponse]:
  139. return [
  140. NodeArtifactResponse.from_entity(item)
  141. for item in service.list_node_artifacts(
  142. tenant_id=tenant_id,
  143. run_id=run_id,
  144. node_run_id=node_run_id,
  145. artifact_type=artifact_type,
  146. )
  147. ]
  148. @router.get("/trace-spans", response_model=list[TraceSpanResponse])
  149. def list_trace_spans(
  150. tenant_id: str = Query(...),
  151. run_id: str | None = Query(default=None),
  152. node_run_id: str | None = Query(default=None),
  153. span_type: str | None = Query(default=None),
  154. service: RuntimeApplicationService = Depends(get_runtime_application_service),
  155. ) -> list[TraceSpanResponse]:
  156. return [
  157. TraceSpanResponse.from_entity(item)
  158. for item in service.list_trace_spans(
  159. tenant_id=tenant_id,
  160. run_id=run_id,
  161. node_run_id=node_run_id,
  162. span_type=span_type,
  163. )
  164. ]
  165. @router.post("/runs/{run_id}/status", response_model=WorkflowRunResponse)
  166. def update_run_status(
  167. run_id: str,
  168. payload: WorkflowRunStatusUpdateRequest,
  169. service: RuntimeApplicationService = Depends(get_runtime_application_service),
  170. ) -> WorkflowRunResponse:
  171. entity = service.update_run_status(run_id=run_id, payload=payload)
  172. if entity is None:
  173. raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
  174. return WorkflowRunResponse.from_entity(entity)
  175. @router.post("/node-runs/{node_run_id}/status", response_model=NodeRunResponse)
  176. def update_node_run_status(
  177. node_run_id: str,
  178. payload: NodeRunStatusUpdateRequest,
  179. service: RuntimeApplicationService = Depends(get_runtime_application_service),
  180. ) -> NodeRunResponse:
  181. entity = service.update_node_run_status(node_run_id=node_run_id, payload=payload)
  182. if entity is None:
  183. raise HTTPException(status_code=404, detail=f"node_run not found: {node_run_id}")
  184. return NodeRunResponse.from_entity(entity)
  185. @router.post("/node-runs/{node_run_id}/execute", response_model=NodeRunExecuteResponse)
  186. def execute_node_run(
  187. node_run_id: str,
  188. payload: NodeRunExecuteRequest,
  189. service: RuntimeApplicationService = Depends(get_runtime_application_service),
  190. ) -> NodeRunExecuteResponse:
  191. try:
  192. result = service.execute_node_run(node_run_id=node_run_id, payload=payload)
  193. except (
  194. CodeRunnerClientError,
  195. ModelGatewayClientError,
  196. HumanServiceClientError,
  197. ToolServiceClientError,
  198. WorkflowServiceClientError,
  199. ) as exc:
  200. raise HTTPException(status_code=502, detail=str(exc)) from exc
  201. if result is None:
  202. raise HTTPException(status_code=404, detail=f"node_run not found: {node_run_id}")
  203. workflow_run, node_run, executor_name = result
  204. return NodeRunExecuteResponse(
  205. run=WorkflowRunResponse.from_entity(workflow_run),
  206. node_run=NodeRunResponse.from_entity(node_run),
  207. executor_name=executor_name,
  208. )
  209. @router.post("/runs/{run_id}/execute-next", response_model=NodeRunExecuteResponse)
  210. def execute_next_node_run(
  211. run_id: str,
  212. payload: NodeRunExecuteRequest,
  213. tenant_id: str = Query(...),
  214. service: RuntimeApplicationService = Depends(get_runtime_application_service),
  215. ) -> NodeRunExecuteResponse:
  216. try:
  217. result = service.execute_next_node_run(
  218. tenant_id=tenant_id,
  219. run_id=run_id,
  220. payload=payload,
  221. )
  222. except (
  223. CodeRunnerClientError,
  224. ModelGatewayClientError,
  225. HumanServiceClientError,
  226. ToolServiceClientError,
  227. WorkflowServiceClientError,
  228. ) as exc:
  229. raise HTTPException(status_code=502, detail=str(exc)) from exc
  230. if result is None:
  231. raise HTTPException(status_code=404, detail=f"queued node_run not found for run: {run_id}")
  232. workflow_run, node_run, executor_name = result
  233. return NodeRunExecuteResponse(
  234. run=WorkflowRunResponse.from_entity(workflow_run),
  235. node_run=NodeRunResponse.from_entity(node_run),
  236. executor_name=executor_name,
  237. )
  238. @router.post("/runs/{run_id}/execute", response_model=RunExecuteResponse)
  239. def execute_run(
  240. run_id: str,
  241. payload: RunExecuteRequest,
  242. tenant_id: str = Query(...),
  243. service: RuntimeApplicationService = Depends(get_runtime_application_service),
  244. ) -> RunExecuteResponse:
  245. try:
  246. result = service.execute_run(
  247. tenant_id=tenant_id,
  248. run_id=run_id,
  249. payload=payload,
  250. )
  251. except (
  252. CodeRunnerClientError,
  253. ModelGatewayClientError,
  254. HumanServiceClientError,
  255. ToolServiceClientError,
  256. WorkflowServiceClientError,
  257. ) as exc:
  258. raise HTTPException(status_code=502, detail=str(exc)) from exc
  259. if result is None:
  260. raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
  261. workflow_run, node_runs, executor_names = result
  262. return RunExecuteResponse(
  263. run=WorkflowRunResponse.from_entity(workflow_run),
  264. node_runs=[NodeRunResponse.from_entity(item) for item in node_runs],
  265. executor_names=executor_names,
  266. )
  267. @router.get("/runs/{run_id}/debug/snapshot", response_model=RuntimeDebugSnapshotResponse)
  268. def get_runtime_debug_snapshot(
  269. run_id: str,
  270. tenant_id: str = Query(...),
  271. service: RuntimeApplicationService = Depends(get_runtime_application_service),
  272. ) -> RuntimeDebugSnapshotResponse:
  273. snapshot = service.get_debug_snapshot(tenant_id=tenant_id, run_id=run_id)
  274. if snapshot is None:
  275. raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
  276. return build_runtime_debug_snapshot_response(snapshot)
  277. @router.post("/runs/{run_id}/debug/pause", response_model=RuntimeDebugSnapshotResponse)
  278. def pause_runtime_debug_run(
  279. run_id: str,
  280. tenant_id: str = Query(...),
  281. service: RuntimeApplicationService = Depends(get_runtime_application_service),
  282. ) -> RuntimeDebugSnapshotResponse:
  283. snapshot = service.pause_run(tenant_id=tenant_id, run_id=run_id)
  284. if snapshot is None:
  285. raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
  286. return build_runtime_debug_snapshot_response(snapshot)
  287. @router.post("/runs/{run_id}/debug/resume", response_model=RuntimeDebugSnapshotResponse)
  288. def resume_runtime_debug_run(
  289. run_id: str,
  290. tenant_id: str = Query(...),
  291. service: RuntimeApplicationService = Depends(get_runtime_application_service),
  292. ) -> RuntimeDebugSnapshotResponse:
  293. snapshot = service.resume_run(tenant_id=tenant_id, run_id=run_id)
  294. if snapshot is None:
  295. raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
  296. return build_runtime_debug_snapshot_response(snapshot)
  297. @router.post("/runs/{run_id}/debug/step", response_model=RuntimeDebugStepResponse)
  298. def step_runtime_debug_run(
  299. run_id: str,
  300. payload: NodeRunExecuteRequest,
  301. tenant_id: str = Query(...),
  302. service: RuntimeApplicationService = Depends(get_runtime_application_service),
  303. ) -> RuntimeDebugStepResponse:
  304. result = service.step_debug_run(
  305. tenant_id=tenant_id,
  306. run_id=run_id,
  307. worker_key=payload.worker_key,
  308. )
  309. if result is None:
  310. raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
  311. snapshot, executed_node_runs, executor_names, reason = result
  312. return RuntimeDebugStepResponse(
  313. snapshot=build_runtime_debug_snapshot_response(snapshot),
  314. executed_node_runs=[NodeRunResponse.from_entity(item) for item in executed_node_runs],
  315. executor_names=executor_names,
  316. reason=reason,
  317. )
  318. @router.post("/runs/{run_id}/debug/continue", response_model=RuntimeDebugStepResponse)
  319. def continue_runtime_debug_run(
  320. run_id: str,
  321. payload: RuntimeDebugContinueRequest,
  322. tenant_id: str = Query(...),
  323. service: RuntimeApplicationService = Depends(get_runtime_application_service),
  324. ) -> RuntimeDebugStepResponse:
  325. result = service.continue_debug_run(
  326. tenant_id=tenant_id,
  327. run_id=run_id,
  328. payload=payload,
  329. )
  330. if result is None:
  331. raise HTTPException(status_code=404, detail=f"workflow_run not found: {run_id}")
  332. snapshot, executed_node_runs, executor_names, paused_before_node_id, reason = result
  333. return RuntimeDebugStepResponse(
  334. snapshot=build_runtime_debug_snapshot_response(snapshot),
  335. executed_node_runs=[NodeRunResponse.from_entity(item) for item in executed_node_runs],
  336. executor_names=executor_names,
  337. paused_before_node_id=paused_before_node_id,
  338. reason=reason,
  339. )
  340. @router.post("/node-runs/{node_run_id}/resume-human", response_model=NodeRunExecuteResponse)
  341. def resume_human_node_run(
  342. node_run_id: str,
  343. payload: HumanNodeResumeRequest,
  344. service: RuntimeApplicationService = Depends(get_runtime_application_service),
  345. ) -> NodeRunExecuteResponse:
  346. try:
  347. result = service.resume_human_node_run(
  348. node_run_id=node_run_id,
  349. payload=payload,
  350. )
  351. except (
  352. CodeRunnerClientError,
  353. ModelGatewayClientError,
  354. HumanServiceClientError,
  355. ToolServiceClientError,
  356. WorkflowServiceClientError,
  357. ) as exc:
  358. raise HTTPException(status_code=502, detail=str(exc)) from exc
  359. if result is None:
  360. raise HTTPException(
  361. status_code=404,
  362. detail=f"human node_run not found or task mismatch: {node_run_id}",
  363. )
  364. workflow_run, node_run, executor_name = result
  365. return NodeRunExecuteResponse(
  366. run=WorkflowRunResponse.from_entity(workflow_run),
  367. node_run=NodeRunResponse.from_entity(node_run),
  368. executor_name=executor_name,
  369. )
  370. @router.post("/workers/execute-next", response_model=WorkerExecuteNextResponse)
  371. def execute_next_worker_task(
  372. payload: WorkerExecuteNextRequest,
  373. settings: RuntimeServiceSettings = Depends(get_runtime_settings),
  374. service: RuntimeApplicationService = Depends(get_runtime_application_service),
  375. ) -> WorkerExecuteNextResponse:
  376. try:
  377. result = service.execute_next_claimed_node_run(
  378. worker_key=payload.worker_key,
  379. lease_seconds=payload.lease_seconds or settings.worker_lease_seconds,
  380. )
  381. except (
  382. CodeRunnerClientError,
  383. ModelGatewayClientError,
  384. HumanServiceClientError,
  385. ToolServiceClientError,
  386. WorkflowServiceClientError,
  387. ) as exc:
  388. raise HTTPException(status_code=502, detail=str(exc)) from exc
  389. if result is None:
  390. raise HTTPException(status_code=404, detail="queued worker task not found")
  391. workflow_run, node_run, executor_name, released_lease_count = result
  392. return WorkerExecuteNextResponse(
  393. run=WorkflowRunResponse.from_entity(workflow_run),
  394. node_run=NodeRunResponse.from_entity(node_run),
  395. executor_name=executor_name,
  396. released_lease_count=released_lease_count,
  397. )