services.py 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255
  1. from dataclasses import dataclass
  2. from datetime import datetime, timedelta
  3. from core_domain import (
  4. InitialNodeContract,
  5. NodeExecutionContextContract,
  6. NodeExecutionResultContract,
  7. NodeRunStatus,
  8. WorkflowRunStatus,
  9. WorkflowVersionContract,
  10. )
  11. from core_dsl import parse_workflow_definition
  12. from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
  13. from core_shared import JSONValue, try_build_redis_client
  14. from core_shared.task_queue import TaskQueuePublisher
  15. from sqlalchemy.orm import Session
  16. from app.bootstrap.settings import RuntimeServiceSettings
  17. from app.db.models import ExecutionLog, NodeArtifact, NodeRun, TraceSpan, WorkflowRun
  18. from app.domain.repositories import (
  19. ExecutionLogRepository,
  20. NodeArtifactRepository,
  21. NodeRunRepository,
  22. TraceSpanRepository,
  23. WorkflowRunRepository,
  24. )
  25. from app.infrastructure.code_runner_client import CodeRunnerClient
  26. from app.infrastructure.executors import (
  27. NodeExecutionDispatcher,
  28. build_node_execution_dispatcher_with_clients,
  29. )
  30. from app.infrastructure.human_client import HumanServiceClient
  31. from app.infrastructure.knowledge_client import KnowledgeServiceClient
  32. from app.infrastructure.model_gateway_client import ModelGatewayClient
  33. from app.infrastructure.planner import (
  34. derive_initial_node,
  35. derive_node_config,
  36. derive_successor_nodes,
  37. )
  38. from app.infrastructure.tool_client import ToolServiceClient
  39. from app.infrastructure.workflow_client import WorkflowServiceClient
  40. from app.schemas.run import (
  41. HumanNodeResumeRequest,
  42. NodeRunExecuteRequest,
  43. NodeRunStatusUpdateRequest,
  44. RunCreateRequest,
  45. RunExecuteRequest,
  46. RuntimeDebugContinueRequest,
  47. WorkflowRunStatusUpdateRequest,
  48. )
  @dataclass(frozen=True)
  class RuntimeDebugSnapshot:
      """Frozen bundle describing one workflow run and its related records.

      Instances are what the debug-oriented service methods
      (``get_debug_snapshot``, ``pause_run``, ``resume_run``, ...) hand back.
      """

      # The workflow run itself and the node runs that belong to it.
      run: WorkflowRun
      node_runs: list[NodeRun]

      # Run-level state and per-node outputs, the latter keyed by node id.
      run_state_json: dict[str, JSONValue]
      node_output_json_by_node_id: dict[str, dict[str, JSONValue]]
      node_output_text_by_node_id: dict[str, str]

      # Node ids bucketed by state (presumably the node-run status; the code
      # that fills these lists is not part of this excerpt).
      queued_node_ids: list[str]
      running_node_ids: list[str]
      completed_node_ids: list[str]
      failed_node_ids: list[str]

      # Logs, artifacts and trace spans recorded for the run.
      execution_logs: list[ExecutionLog]
      node_artifacts: list[NodeArtifact]
      trace_spans: list[TraceSpan]
  63. class RuntimeApplicationService:
  def __init__(
      self,
      workflow_run_repository: WorkflowRunRepository,
      node_run_repository: NodeRunRepository,
      execution_log_repository: ExecutionLogRepository,
      node_artifact_repository: NodeArtifactRepository,
      trace_span_repository: TraceSpanRepository,
      execution_dispatcher: NodeExecutionDispatcher,
      workflow_client: WorkflowServiceClient | None = None,
      event_client: EventServiceClient | None = None,
      task_queue_publisher: TaskQueuePublisher | None = None,
  ) -> None:
      """Keep references to the repositories and clients the service works with.

      Args:
          workflow_run_repository: Storage access for workflow runs.
          node_run_repository: Storage access for node runs.
          execution_log_repository: Storage access for execution logs.
          node_artifact_repository: Storage access for node artifacts.
          trace_span_repository: Storage access for trace spans.
          execution_dispatcher: Resolves and invokes node executors.
          workflow_client: Optional client used to fetch workflow versions;
              planning helpers fall back to empty results when it is ``None``.
          event_client: Optional event service client.
          task_queue_publisher: Optional task queue publisher.
      """
      # Persistence collaborators.
      self.workflow_run_repository = workflow_run_repository
      self.node_run_repository = node_run_repository
      self.execution_log_repository = execution_log_repository
      self.node_artifact_repository = node_artifact_repository
      self.trace_span_repository = trace_span_repository

      # Execution and optional integration collaborators.
      self.execution_dispatcher = execution_dispatcher
      self.workflow_client = workflow_client
      self.event_client = event_client
      self.task_queue_publisher = task_queue_publisher
  def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
      """Create a workflow run and, if an initial node is known, its first node run.

      The initial node is taken from ``payload.initial_node`` or, when that is
      falsy, from ``_plan_initial_node``.

      Returns:
          The created workflow run and the initial node run; the node run is
          ``None`` when no initial node could be determined.
      """
      first_node = payload.initial_node or self._plan_initial_node(payload)

      run = self.workflow_run_repository.create(
          app_id=payload.app_id,
          app_version_id=payload.app_version_id,
          workflow_id=payload.workflow_id,
          workflow_version_id=payload.workflow_version_id,
          session_id=payload.session_id,
          parent_run_id=payload.parent_run_id,
          root_run_id=payload.root_run_id,
          run_type=payload.run_type,
          trigger_type=payload.trigger_type,
          priority=payload.priority,
      )

      first_node_run = None
      if first_node is not None:
          self.workflow_run_repository.update_node_count(
              run_id=run.id,
              current_node_count=1,
          )
          node_config = self._resolve_node_config(
              workflow_version_id=payload.workflow_version_id,
              node_id=first_node.node_id,
          )
          scheduled_time, timeout_time = self._build_node_timing(node_config)
          first_node_run = self.node_run_repository.create(
              run_id=run.id,
              node_id=first_node.node_id,
              node_type=first_node.node_type,
              status=first_node.status,
              scheduled_time=scheduled_time,
              timeout_time=timeout_time,
          )
          queued_detail = {
              "node_id": first_node.node_id,
              "node_type": first_node.node_type,
              "status": first_node.status,
          }
          self._log_event(
              run_id=run.id,
              node_run_id=first_node_run.id,
              event_type="node_queued",
              message=f"initial node queued: {first_node.node_id}",
              detail_json=queued_detail,
          )
          self._publish_node_run_to_queue(first_node_run)

      # The creation log entry is written whether or not a node was queued.
      if first_node_run is None:
          logged_node_run_id = None
      else:
          logged_node_run_id = first_node_run.id
      created_detail = {
          "workflow_id": payload.workflow_id,
          "workflow_version_id": payload.workflow_version_id,
          "session_id": payload.session_id,
      }
      self._log_event(
          run_id=run.id,
          node_run_id=logged_node_run_id,
          event_type="run_created",
          message="workflow run created",
          detail_json=created_detail,
      )

      event_payload = {
          "run_id": run.id,
          "workflow_id": run.workflow_id,
          "workflow_version_id": run.workflow_version_id,
          "status": run.status,
      }
      self._publish_event(
          event_type="workflow.run.created",
          workflow_run=run,
          payload_json=event_payload,
      )
      return run, first_node_run
  def list_runs(self, session_id: str | None = None) -> list[WorkflowRun]:
      """Return workflow runs from the repository, forwarding ``session_id`` as the scope."""
      repository = self.workflow_run_repository
      return repository.list_by_scope(session_id=session_id)
  def list_node_runs(self, run_id: str) -> list[NodeRun]:
      """Return the node runs the repository holds for ``run_id``."""
      repository = self.node_run_repository
      return repository.list_by_run(run_id=run_id)
  def list_execution_logs(
      self,
      run_id: str | None = None,
      node_run_id: str | None = None,
  ):
      """Return execution logs from the repository.

      Both ``run_id`` and ``node_run_id`` are forwarded unchanged to the
      repository's ``list_by_scope``.
      """
      scope = {"run_id": run_id, "node_run_id": node_run_id}
      return self.execution_log_repository.list_by_scope(**scope)
  def list_node_artifacts(
      self,
      run_id: str | None = None,
      node_run_id: str | None = None,
      artifact_type: str | None = None,
  ):
      """Return node artifacts from the repository.

      ``run_id``, ``node_run_id`` and ``artifact_type`` are forwarded unchanged
      to the repository's ``list_by_scope``.
      """
      scope = {
          "run_id": run_id,
          "node_run_id": node_run_id,
          "artifact_type": artifact_type,
      }
      return self.node_artifact_repository.list_by_scope(**scope)
  def list_trace_spans(
      self,
      run_id: str | None = None,
      node_run_id: str | None = None,
      span_type: str | None = None,
  ):
      """Return trace spans from the repository.

      ``run_id``, ``node_run_id`` and ``span_type`` are forwarded unchanged to
      the repository's ``list_by_scope``.
      """
      scope = {
          "run_id": run_id,
          "node_run_id": node_run_id,
          "span_type": span_type,
      }
      return self.trace_span_repository.list_by_scope(**scope)
  def get_debug_snapshot(
      self,
      *,
      run_id: str,
  ) -> RuntimeDebugSnapshot | None:
      """Build a debug snapshot for ``run_id``.

      Returns:
          The snapshot, or ``None`` when the repository has no such run.
      """
      run = self.workflow_run_repository.get_by_id(run_id)
      return None if run is None else self._build_debug_snapshot(run)
  def update_run_status(
      self,
      run_id: str,
      payload: WorkflowRunStatusUpdateRequest,
  ) -> WorkflowRun | None:
      """Persist a new status for a workflow run and publish a matching event.

      Returns:
          The updated run, or ``None`` when the repository update returns
          ``None`` (in which case no event is published).
      """
      updated_run = self.workflow_run_repository.update_status(
          run_id=run_id,
          status=payload.status,
          error_code=payload.error_code,
          error_message=payload.error_message,
      )
      if updated_run is None:
          return None

      event_payload = {
          "run_id": updated_run.id,
          "status": updated_run.status,
          "error_code": updated_run.error_code,
      }
      self._publish_event(
          event_type=f"workflow.run.{updated_run.status}",
          workflow_run=updated_run,
          payload_json=event_payload,
      )
      return updated_run
  def update_node_run_status(
      self,
      node_run_id: str,
      payload: NodeRunStatusUpdateRequest,
  ) -> NodeRun | None:
      """Persist a node-run status change and trigger the follow-up work.

      After the repository update this logs the change, publishes a node
      event, schedules successor nodes on ``"completed"``, schedules a
      compensation node on ``"failed"`` (only when the owning run can be
      loaded), and finally re-syncs the workflow run status from its nodes.

      Returns:
          The updated node run, or ``None`` when the repository update
          returns ``None``.
      """
      updated = self.node_run_repository.update_status(
          node_run_id=node_run_id,
          status=payload.status,
          worker_key=payload.worker_key,
          error_code=payload.error_code,
          error_message=payload.error_message,
          output_text=payload.output_text,
          output_json=payload.output_json,
      )
      if updated is None:
          return None

      # The log entry reports the requested values from the payload ...
      log_detail = {
          "node_id": updated.node_id,
          "node_type": updated.node_type,
          "status": payload.status,
          "error_code": payload.error_code,
      }
      self._log_event(
          run_id=updated.run_id,
          node_run_id=updated.id,
          event_type="node_status_updated",
          message=f"node status updated to {payload.status}",
          detail_json=log_detail,
      )

      # ... while the published event reports what the stored entity holds.
      event_payload = {
          "run_id": updated.run_id,
          "node_run_id": updated.id,
          "node_id": updated.node_id,
          "node_type": updated.node_type,
          "status": updated.status,
          "error_code": updated.error_code,
      }
      self._publish_event(
          event_type=f"workflow.node.{updated.status}",
          workflow_run=None,
          node_run=updated,
          payload_json=event_payload,
      )

      requested_status = payload.status
      if requested_status == "completed":
          self._schedule_successor_nodes(updated)
      elif requested_status == "failed":
          owning_run = self.workflow_run_repository.get_by_id(updated.run_id)
          if owning_run is not None:
              failed_node_config = self._resolve_node_config(
                  workflow_version_id=owning_run.workflow_version_id,
                  node_id=updated.node_id,
              )
              self._schedule_compensation_node(
                  node_run=updated,
                  node_config=failed_node_config,
              )

      self._sync_workflow_run_status_from_nodes(run_id=updated.run_id)
      return updated
  def execute_node_run(
      self,
      node_run_id: str,
      payload: NodeRunExecuteRequest,
  ) -> tuple[WorkflowRun, NodeRun, str] | None:
      """Execute a single node run through the execution dispatcher.

      Flow, in order:
        1. Load the node run and its workflow run.
        2. Short-circuit when the run is paused (executor name
           ``"debug_paused"``) or the node run is already completed, failed
           or skipped.
        3. Fail the node with ``node_timeout`` if it has timed out.
        4. Mark the node running, open a trace span and call the dispatcher;
           any exception from the dispatcher becomes a failed result with
           error code ``executor_error``.
        5. On a failed result that ``_should_retry_node`` accepts, requeue
           the node; otherwise persist the final status, close the span and
           store the execution artifact.

      Returns:
          ``(workflow_run, node_run, executor_name)``, or ``None`` whenever a
          required lookup or update returns ``None``.
      """

      def executor_name_of(node_type: str) -> str:
          # Name of the executor the dispatcher would pick for this node type.
          return self.execution_dispatcher.resolve_executor(node_type).executor_name

      target = self.node_run_repository.get_by_id(node_run_id)
      if target is None:
          return None
      run = self.workflow_run_repository.get_by_id(target.run_id)
      if run is None:
          return None

      # Nothing is executed while the run is paused.
      if run.status == "paused":
          return run, target, "debug_paused"
      # Node runs in one of these states are returned as they are.
      if target.status in {"completed", "failed", "skipped"}:
          return run, target, executor_name_of(target.node_type)

      node_config = self._resolve_node_config(
          workflow_version_id=run.workflow_version_id,
          node_id=target.node_id,
      )

      if self._node_has_timed_out(target):
          deadline = target.timeout_time
          timeout_update = NodeRunStatusUpdateRequest(
              status="failed",
              worker_key=payload.worker_key,
              error_code="node_timeout",
              error_message=f"node timed out: {target.node_id}",
              output_json={
                  "timeout_time": None if deadline is None else deadline.isoformat(),
              },
          )
          timed_out = self.update_node_run_status(
              node_run_id=target.id,
              payload=timeout_update,
          )
          if timed_out is None:
              return None
          # The workflow run returned here is the one loaded before the update.
          return run, timed_out, executor_name_of(target.node_type)

      active = self.node_run_repository.update_status(
          node_run_id=node_run_id,
          status="running",
          worker_key=payload.worker_key,
      )
      if active is None:
          return None

      self._log_event(
          run_id=active.run_id,
          node_run_id=active.id,
          event_type="node_execution_started",
          message=f"executing node {active.node_id}",
          detail_json={
              "node_id": active.node_id,
              "node_type": active.node_type,
              "worker_key": payload.worker_key,
          },
      )
      context = self._build_execution_context(
          workflow_run=run,
          node_run=active,
          worker_key=payload.worker_key,
          node_config_json=node_config,
      )
      executor_name = executor_name_of(active.node_type)
      span = self.trace_span_repository.start(
          run_id=active.run_id,
          node_run_id=active.id,
          parent_span_id=None,
          span_type="node_execution",
          name=f"{active.node_type}:{active.node_id}",
          attributes_json={
              "node_id": active.node_id,
              "node_type": active.node_type,
              "executor_name": executor_name,
              "worker_key": payload.worker_key,
          },
      )

      try:
          outcome, executor_name = self.execution_dispatcher.execute(
              context=context,
              request=payload,
          )
      except Exception as exc:
          # Executor errors are recorded as a failed result rather than
          # propagated; executor_name keeps the value resolved above.
          outcome = NodeExecutionResultContract(
              status="failed",
              worker_key=payload.worker_key,
              error_code="executor_error",
              error_message=str(exc),
          )

      retry_wanted = outcome.status == "failed" and self._should_retry_node(
          node_run=active,
          node_config_json=context.node_config_json,
      )
      if retry_wanted:
          retry_at, retry_deadline = self._build_retry_timing(context.node_config_json)
          requeued = self.node_run_repository.requeue_for_retry(
              node_run_id=active.id,
              scheduled_time=retry_at,
              timeout_time=retry_deadline,
              error_code=outcome.error_code,
              error_message=outcome.error_message,
              output_text=outcome.output_text,
              output_json={
                  **(outcome.output_json or {}),
                  "retry_scheduled_time": retry_at.isoformat(),
                  "retry_reason": outcome.error_code or "node_failed",
              },
          )
          if requeued is None:
              return None

          self._publish_node_run_to_queue(requeued)
          self.trace_span_repository.finish(
              span_id=span.id,
              status="error",
              error_code=outcome.error_code,
              error_message=outcome.error_message,
              attributes_json={
                  "node_status": "queued",
                  "executor_name": executor_name,
                  "retry_scheduled": True,
                  "attempt_no": requeued.attempt_no,
              },
          )
          self._log_event(
              run_id=requeued.run_id,
              node_run_id=requeued.id,
              event_type="node_retry_scheduled",
              message=f"node retry scheduled: {requeued.node_id}",
              detail_json={
                  "node_id": requeued.node_id,
                  "attempt_no": requeued.attempt_no,
                  "scheduled_time": retry_at.isoformat(),
                  "error_code": outcome.error_code,
              },
          )
          self._sync_workflow_run_status_from_nodes(run_id=requeued.run_id)
          run = self.workflow_run_repository.get_by_id(requeued.run_id)
          if run is None:
              return None
          return run, requeued, executor_name

      finished = self.update_node_run_status(
          node_run_id=active.id,
          payload=NodeRunStatusUpdateRequest(
              status=outcome.status,
              worker_key=outcome.worker_key,
              error_code=outcome.error_code,
              error_message=outcome.error_message,
              output_text=outcome.output_text,
              output_json=outcome.output_json,
          ),
      )
      if finished is None:
          return None

      span_status = "ok" if finished.status == "completed" else "error"
      self.trace_span_repository.finish(
          span_id=span.id,
          status=span_status,
          error_code=finished.error_code,
          error_message=finished.error_message,
          attributes_json={
              "node_status": finished.status,
              "executor_name": executor_name,
              "has_output_text": finished.output_text is not None,
              "has_output_json": finished.output_json is not None,
          },
      )
      self._persist_node_execution_artifact(finished)
      self._log_event(
          run_id=finished.run_id,
          node_run_id=finished.id,
          event_type="node_execution_finished",
          message=f"node execution finished with status {finished.status}",
          detail_json={
              "node_id": finished.node_id,
              "node_type": finished.node_type,
              "executor_name": executor_name,
              "status": finished.status,
          },
      )

      # Reload the run so the caller gets its state after the node finished.
      run = self.workflow_run_repository.get_by_id(finished.run_id)
      if run is None:
          return None
      return run, finished, executor_name
  def execute_next_node_run(
      self,
      run_id: str,
      payload: NodeRunExecuteRequest,
  ) -> tuple[WorkflowRun, NodeRun, str] | None:
      """Execute the next queued node run of ``run_id``.

      Returns:
          Whatever ``execute_node_run`` returns for that node run, or ``None``
          when the repository reports no queued node run for the run.
      """
      queued = self.node_run_repository.get_next_queued_by_run(run_id=run_id)
      if queued is not None:
          return self.execute_node_run(node_run_id=queued.id, payload=payload)
      return None
  def execute_run(
      self,
      run_id: str,
      payload: RunExecuteRequest,
  ) -> tuple[WorkflowRun, list[NodeRun], list[str]] | None:
      """Execute queued node runs of a workflow run, one after another.

      At most ``payload.max_steps`` node runs are executed. The loop stops
      early when there is nothing left to execute or when a step ends with a
      node status other than ``"completed"``.

      Returns:
          ``(workflow_run, executed_node_runs, executor_names)`` with the run
          reloaded after the loop, or ``None`` when the run cannot be loaded.
      """
      if self.workflow_run_repository.get_by_id(run_id) is None:
          return None

      node_runs_done: list[NodeRun] = []
      executors_used: list[str] = []
      steps_left = payload.max_steps
      while steps_left > 0:
          steps_left -= 1
          step = self.execute_next_node_run(
              run_id=run_id,
              payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
          )
          if step is None:
              break
          _, stepped_node_run, executor_name = step
          node_runs_done.append(stepped_node_run)
          executors_used.append(executor_name)
          if stepped_node_run.status != "completed":
              break

      reloaded_run = self.workflow_run_repository.get_by_id(run_id)
      if reloaded_run is None:
          return None
      return reloaded_run, node_runs_done, executors_used
  def pause_run(
      self,
      *,
      run_id: str,
      reason: str = "debug_pause",
  ) -> RuntimeDebugSnapshot | None:
      """Pause a workflow run and return its debug snapshot.

      The run status is first re-synced from its nodes. A run that is then
      completed, failed or cancelled keeps its status; any other run is set
      to ``"paused"``. The pause log entry is written in both cases.

      Returns:
          The snapshot, or ``None`` when the run cannot be loaded or updated.
      """
      if self.workflow_run_repository.get_by_id(run_id) is None:
          return None

      self._sync_workflow_run_status_from_nodes(run_id=run_id)
      snapshot_run = self.workflow_run_repository.get_by_id(run_id)
      if snapshot_run is None:
          return None

      if snapshot_run.status not in {"completed", "failed", "cancelled"}:
          snapshot_run = self.workflow_run_repository.update_status(
              run_id=run_id,
              status="paused",
          )
          if snapshot_run is None:
              return None

      self._log_event(
          run_id=run_id,
          node_run_id=None,
          event_type="debug_run_paused",
          message=f"workflow run paused: {reason}",
          detail_json={"reason": reason},
      )
      return self._build_debug_snapshot(snapshot_run)
  def resume_run(
      self,
      *,
      run_id: str,
      reason: str = "debug_resume",
  ) -> RuntimeDebugSnapshot | None:
      """Set a workflow run back to ``"running"`` and return its debug snapshot.

      Returns:
          The snapshot, or ``None`` when the run cannot be loaded or the
          status update returns ``None``.
      """
      if self.workflow_run_repository.get_by_id(run_id) is None:
          return None

      running_run = self.workflow_run_repository.update_status(
          run_id=run_id,
          status="running",
      )
      if running_run is None:
          return None

      self._log_event(
          run_id=run_id,
          node_run_id=None,
          event_type="debug_run_resumed",
          message=f"workflow run resumed: {reason}",
          detail_json={"reason": reason},
      )
      return self._build_debug_snapshot(running_run)
  def step_debug_run(
      self,
      *,
      run_id: str,
      worker_key: str | None = None,
  ) -> tuple[RuntimeDebugSnapshot, list[NodeRun], list[str], str] | None:
      """Execute at most one queued node run, then pause the run again.

      A paused run is switched to ``"running"`` for the duration of the step.
      Afterwards the run status is re-synced from its nodes and, unless the
      run is completed, failed or cancelled, set back to ``"paused"``.

      Returns:
          ``(snapshot, executed_node_runs, executor_names, reason)`` where
          ``reason`` is ``"step_completed"`` or ``"no_queued_node"``; ``None``
          when the run cannot be loaded or updated.
      """
      run_before_step = self.workflow_run_repository.get_by_id(run_id)
      if run_before_step is None:
          return None
      if run_before_step.status == "paused":
          # execute_node_run refuses to run nodes of a paused run.
          self.workflow_run_repository.update_status(run_id=run_id, status="running")

      step = self.execute_next_node_run(
          run_id=run_id,
          payload=NodeRunExecuteRequest(worker_key=worker_key),
      )
      if step is None:
          node_runs_done: list[NodeRun] = []
          executors_used: list[str] = []
          reason = "no_queued_node"
      else:
          _, stepped_node_run, executor_name = step
          node_runs_done = [stepped_node_run]
          executors_used = [executor_name]
          reason = "step_completed"

      self._sync_workflow_run_status_from_nodes(run_id=run_id)
      snapshot_run = self.workflow_run_repository.get_by_id(run_id)
      if snapshot_run is None:
          return None
      if snapshot_run.status not in {"completed", "failed", "cancelled"}:
          snapshot_run = self.workflow_run_repository.update_status(
              run_id=run_id,
              status="paused",
          )
          if snapshot_run is None:
              return None

      last_node_run_id = node_runs_done[-1].id if node_runs_done else None
      self._log_event(
          run_id=run_id,
          node_run_id=last_node_run_id,
          event_type="debug_step_finished",
          message=f"debug step finished: {reason}",
          detail_json={
              "reason": reason,
              "executed_node_ids": [done.node_id for done in node_runs_done],
          },
      )
      snapshot = self._build_debug_snapshot(snapshot_run)
      return snapshot, node_runs_done, executors_used, reason
  def continue_debug_run(
      self,
      *,
      run_id: str,
      payload: RuntimeDebugContinueRequest,
  ) -> tuple[RuntimeDebugSnapshot, list[NodeRun], list[str], str | None, str] | None:
      """Run queued nodes until a breakpoint, a non-completed node or the step limit.

      A paused run is switched to ``"running"`` first. Up to
      ``payload.max_steps`` node runs are executed; the loop stops before any
      node whose id is in ``payload.breakpoint_node_ids``. Afterwards the run
      status is re-synced from its nodes and, unless the run is completed,
      failed or cancelled, set back to ``"paused"``.

      Returns:
          ``(snapshot, executed_node_runs, executor_names,
          paused_before_node_id, reason)``, or ``None`` when the run cannot
          be loaded or updated.
      """
      run_before_continue = self.workflow_run_repository.get_by_id(run_id)
      if run_before_continue is None:
          return None
      if run_before_continue.status == "paused":
          self.workflow_run_repository.update_status(run_id=run_id, status="running")

      breakpoint_node_ids = set(payload.breakpoint_node_ids)
      node_runs_done: list[NodeRun] = []
      executors_used: list[str] = []
      paused_before_node_id: str | None = None
      reason = "completed"

      for _ in range(payload.max_steps):
          candidate = self.node_run_repository.get_next_queued_by_run(run_id=run_id)
          if candidate is None:
              reason = "no_queued_node"
              break
          if candidate.node_id in breakpoint_node_ids:
              # Stop before executing the node that carries the breakpoint.
              paused_before_node_id = candidate.node_id
              reason = "breakpoint_hit"
              break

          step = self.execute_node_run(
              node_run_id=candidate.id,
              payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
          )
          if step is None:
              reason = "node_not_found"
              break

          _, stepped_node_run, executor_name = step
          node_runs_done.append(stepped_node_run)
          executors_used.append(executor_name)
          if stepped_node_run.status != "completed":
              reason = f"node_{stepped_node_run.status}"
              break
      else:
          # NOTE(review): read as the loop's ``else`` (no ``break`` happened);
          # confirm against the original indentation.
          reason = "max_steps_reached"

      self._sync_workflow_run_status_from_nodes(run_id=run_id)
      snapshot_run = self.workflow_run_repository.get_by_id(run_id)
      if snapshot_run is None:
          return None
      if snapshot_run.status not in {"completed", "failed", "cancelled"}:
          snapshot_run = self.workflow_run_repository.update_status(
              run_id=run_id,
              status="paused",
          )
          if snapshot_run is None:
              return None

      last_node_run_id = node_runs_done[-1].id if node_runs_done else None
      self._log_event(
          run_id=run_id,
          node_run_id=last_node_run_id,
          event_type="debug_continue_paused",
          message=f"debug continue paused: {reason}",
          detail_json={
              "reason": reason,
              "paused_before_node_id": paused_before_node_id,
              "executed_node_ids": [done.node_id for done in node_runs_done],
              "breakpoint_node_ids": list(breakpoint_node_ids),
          },
      )
      snapshot = self._build_debug_snapshot(snapshot_run)
      return snapshot, node_runs_done, executors_used, paused_before_node_id, reason
  def resume_human_node_run(
      self,
      *,
      node_run_id: str,
      payload: HumanNodeResumeRequest,
  ) -> tuple[WorkflowRun, NodeRun, str] | None:
      """Re-execute a node run on behalf of a human task.

      The ``human_task_id`` stored in the node run's ``output_json`` must
      match ``payload.human_task_id``. When none is stored yet, the payload's
      id is recorded and the node run is set to ``"pending"`` before it is
      executed.

      Returns:
          The result of ``execute_node_run``, or ``None`` when the node run
          does not exist or a different human task id is already recorded.
      """
      waiting = self.node_run_repository.get_by_id(node_run_id)
      if waiting is None:
          return None

      merged_output = dict(waiting.output_json or {})
      recorded_task_id = merged_output.get("human_task_id")
      if recorded_task_id is None:
          # NOTE(review): the status reset is assumed to belong to this
          # branch only; confirm against the original indentation.
          merged_output["human_task_id"] = payload.human_task_id
          self.node_run_repository.update_status(
              node_run_id=waiting.id,
              status="pending",
              worker_key=payload.worker_key,
              output_json=merged_output,
          )
      elif recorded_task_id != payload.human_task_id:
          return None

      return self.execute_node_run(
          node_run_id=node_run_id,
          payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
      )
  def execute_next_claimed_node_run(
      self,
      *,
      worker_key: str,
      lease_seconds: int,
      redis_client: object | None = None,
  ) -> tuple[WorkflowRun, NodeRun, str, int] | None:
      """Claim the next queued node run for ``worker_key`` and execute it.

      Expired leases are released first. When ``redis_client`` is given, the
      execution is additionally guarded by a ``DistributedLock`` and an
      ``IdempotencyStore`` entry keyed by the claimed node run id; the lock is
      always released once execution has been attempted.

      Returns:
          ``(workflow_run, node_run, executor_name, released_lease_count)``,
          or ``None`` when nothing could be claimed, the lock or idempotency
          entry could not be obtained, or execution returned ``None``.
      """
      released_lease_count = self.node_run_repository.release_expired_leases(
          now_time=datetime.utcnow(),
      )
      lease_expire_time = datetime.utcnow() + timedelta(seconds=lease_seconds)
      claimed = self.node_run_repository.claim_next_queued(
          worker_key=worker_key,
          lease_expire_time=lease_expire_time,
      )
      if claimed is None:
          return None

      lock = None
      idempotency_store = None
      if redis_client is not None:
          from core_shared.redis_primitives import DistributedLock, IdempotencyStore

          lock = DistributedLock(
              client=redis_client,
              name=f"node-run:{claimed.id}:lock",
              ttl_seconds=lease_seconds,
          )
          if not lock.acquire():
              return None
          idempotency_store = IdempotencyStore(
              client=redis_client,
              prefix="node-run-idempotency",
          )
          if not idempotency_store.begin(key=claimed.id):
              lock.release()
              return None

      try:
          outcome = self.execute_node_run(
              node_run_id=claimed.id,
              payload=NodeRunExecuteRequest(worker_key=worker_key),
          )
          if idempotency_store is not None and outcome is not None:
              _, executed_node_run, executor_name = outcome
              idempotency_store.complete(
                  key=claimed.id,
                  result={
                      "status": executed_node_run.status,
                      "node_run_id": executed_node_run.id,
                      "executor_name": executor_name,
                  },
              )
      finally:
          if lock is not None:
              lock.release()

      if outcome is None:
          return None
      return (*outcome, released_lease_count)
  def _persist_node_execution_artifact(self, node_run: NodeRun) -> None:
      """Store a node run's output as an ``execution_result`` artifact.

      Nothing is stored when the node run has neither ``output_text`` nor
      ``output_json``.

      Args:
          node_run: The node run whose output should be persisted.
      """
      output_text = node_run.output_text
      output_json = node_run.output_json
      if output_text is None and output_json is None:
          return

      # Use the same ``is None`` test as the guard above, so an empty-string
      # output is recorded with size 0 rather than an unknown (None) size.
      size_bytes = None if output_text is None else len(output_text.encode("utf-8"))
      mime_type = "application/json" if output_json is not None else "text/plain"

      self.node_artifact_repository.create(
          run_id=node_run.run_id,
          node_run_id=node_run.id,
          node_id=node_run.node_id,
          artifact_type="execution_result",
          name=f"{node_run.node_id}-execution-result",
          mime_type=mime_type,
          content_text=output_text,
          content_json=output_json,
          size_bytes=size_bytes,
      )
  def _plan_initial_node(self, payload: RunCreateRequest) -> InitialNodeContract | None:
      """Derive the initial node from the workflow version named in ``payload``.

      Returns:
          The result of ``derive_initial_node``, or ``None`` when no workflow
          client is configured.
      """
      client = self.workflow_client
      if client is not None:
          version = client.get_workflow_version(
              workflow_version_id=payload.workflow_version_id,
          )
          return derive_initial_node(version)
      return None
  def _resolve_node_config(
      self,
      *,
      workflow_version_id: str,
      node_id: str,
  ) -> dict[str, JSONValue]:
      """Look up the configuration of ``node_id`` in a workflow version.

      Returns:
          The result of ``derive_node_config``, or an empty dict when no
          workflow client is configured.
      """
      client = self.workflow_client
      if client is not None:
          version = client.get_workflow_version(
              workflow_version_id=workflow_version_id,
          )
          return derive_node_config(version, node_id)
      return {}
  713. def _schedule_successor_nodes(self, node_run: NodeRun) -> None:
  714. if self.workflow_client is None:
  715. return
  716. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  717. if workflow_run is None:
  718. return
  719. workflow_version = self.workflow_client.get_workflow_version(
  720. workflow_version_id=workflow_run.workflow_version_id)
  721. run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
  722. self._build_run_state_maps(
  723. run_id=node_run.run_id)
  724. )
  725. successor_nodes = derive_successor_nodes(
  726. workflow_version,
  727. node_run.node_id,
  728. current_output_json=node_run.output_json,
  729. run_state_json=run_state_json,
  730. node_output_json_by_node_id=node_output_json_by_node_id,
  731. node_output_text_by_node_id=node_output_text_by_node_id)
  732. if not successor_nodes:
  733. return
  734. existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
  735. run_id=node_run.run_id,
  736. node_ids=[item.node_id for item in successor_nodes])
  737. existing_node_counts: dict[str, int] = {}
  738. for item in existing_nodes:
  739. existing_node_counts[item.node_id] = existing_node_counts.get(item.node_id, 0) + 1
  740. for successor in successor_nodes:
  741. successor_config = derive_node_config(workflow_version, successor.node_id)
  742. if not self._is_join_ready(
  743. workflow_version=workflow_version,
  744. run_node_runs=self.node_run_repository.list_by_run(
  745. run_id=node_run.run_id),
  746. successor_node_id=successor.node_id,
  747. successor_node_type=successor.node_type,
  748. successor_config=successor_config):
  749. self._log_event(
  750. run_id=node_run.run_id,
  751. node_run_id=None,
  752. event_type="join_waiting",
  753. message=f"join node waiting for predecessors: {successor.node_id}",
  754. detail_json={
  755. "node_id": successor.node_id,
  756. "source_node_id": node_run.node_id,
  757. })
  758. continue
  759. if not self._can_schedule_repeated_node(
  760. successor_config,
  761. existing_count=existing_node_counts.get(successor.node_id, 0)):
  762. continue
  763. scheduled_time, timeout_time = self._build_node_timing(successor_config)
  764. created = self.node_run_repository.create(
  765. run_id=node_run.run_id,
  766. parent_node_run_id=node_run.id,
  767. node_id=successor.node_id,
  768. node_type=successor.node_type,
  769. status=successor.status,
  770. scheduled_time=scheduled_time,
  771. timeout_time=timeout_time)
  772. self._publish_node_run_to_queue(created)
  773. existing_node_counts[successor.node_id] = (
  774. existing_node_counts.get(successor.node_id, 0) + 1
  775. )
  776. self._log_event(
  777. run_id=node_run.run_id,
  778. node_run_id=None,
  779. event_type="node_queued",
  780. message=f"successor node queued: {successor.node_id}",
  781. detail_json={
  782. "node_id": successor.node_id,
  783. "node_type": successor.node_type,
  784. "status": successor.status,
  785. "source_node_id": node_run.node_id,
  786. })
  def _build_execution_context(
      self,
      *,
      workflow_run: WorkflowRun,
      node_run: NodeRun,
      worker_key: str | None,
      node_config_json: dict[str, JSONValue] | None = None,
  ) -> NodeExecutionContextContract:
      """Assemble the execution context handed to a node executor.

      The run state and per-node output maps come from
      ``_build_run_state_maps``. When ``node_config_json`` is not supplied,
      the node configuration is resolved from the run's workflow version.
      """
      state_json, json_by_node_id, text_by_node_id = self._build_run_state_maps(
          run_id=node_run.run_id,
      )

      if node_config_json is None:
          effective_config = self._resolve_node_config(
              workflow_version_id=workflow_run.workflow_version_id,
              node_id=node_run.node_id,
          )
      else:
          effective_config = node_config_json

      return NodeExecutionContextContract(
          run_id=node_run.run_id,
          node_run_id=node_run.id,
          node_id=node_run.node_id,
          node_type=node_run.node_type,
          node_config_json=effective_config,
          run_state_json=state_json,
          node_output_json_by_node_id=json_by_node_id,
          node_output_text_by_node_id=text_by_node_id,
          worker_key=worker_key,
      )
  812. def _build_run_state_maps(
  813. self,
  814. *,
  815. run_id: str) -> tuple[
  816. dict[str, JSONValue],
  817. dict[str, dict[str, JSONValue]],
  818. dict[str, str],
  819. ]:
  820. node_runs = self.node_run_repository.list_by_run(run_id=run_id)
  821. run_state_json: dict[str, JSONValue] = {}
  822. node_output_json_by_node_id: dict[str, dict[str, JSONValue]] = {}
  823. node_output_text_by_node_id: dict[str, str] = {}
  824. for item in node_runs:
  825. if item.output_json is not None:
  826. node_output_json_by_node_id[item.node_id] = dict(item.output_json)
  827. state_updates = item.output_json.get("state_updates")
  828. if isinstance(state_updates, dict):
  829. for state_key, state_value in state_updates.items():
  830. run_state_json[str(state_key)] = state_value
  831. if item.output_text is not None:
  832. node_output_text_by_node_id[item.node_id] = item.output_text
  833. return run_state_json, node_output_json_by_node_id, node_output_text_by_node_id
  def _build_debug_snapshot(self, workflow_run: WorkflowRun) -> RuntimeDebugSnapshot:
      """Collect a debugging snapshot for ``workflow_run``.

      The snapshot bundles the run, its node runs, the aggregated run state
      and output maps, node ids grouped by status, and the run's execution
      logs, node artifacts and trace spans.
      """
      run_id = workflow_run.id
      node_runs = self.node_run_repository.list_by_run(run_id=run_id)
      state_json, json_by_node_id, text_by_node_id = self._build_run_state_maps(
          run_id=run_id,
      )

      # Bucket node ids by status in a single pass; the buckets are disjoint.
      queued_ids = []
      running_ids = []
      completed_ids = []
      failed_ids = []
      for entry in node_runs:
          status = entry.status
          if status in {"pending", "queued"}:
              queued_ids.append(entry.node_id)
          elif status == "running":
              running_ids.append(entry.node_id)
          elif status in {"completed", "skipped"}:
              completed_ids.append(entry.node_id)
          elif status == "failed":
              failed_ids.append(entry.node_id)

      logs = self.execution_log_repository.list_by_scope(run_id=run_id)
      artifacts = self.node_artifact_repository.list_by_scope(run_id=run_id)
      spans = self.trace_span_repository.list_by_scope(run_id=run_id)

      return RuntimeDebugSnapshot(
          run=workflow_run,
          node_runs=node_runs,
          run_state_json=state_json,
          node_output_json_by_node_id=json_by_node_id,
          node_output_text_by_node_id=text_by_node_id,
          queued_node_ids=queued_ids,
          running_node_ids=running_ids,
          completed_node_ids=completed_ids,
          failed_node_ids=failed_ids,
          execution_logs=logs,
          node_artifacts=artifacts,
          trace_spans=spans,
      )
  861. def _build_node_timing(
  862. self,
  863. node_config_json: dict[str, JSONValue]) -> tuple[datetime, datetime | None]:
  864. now = datetime.utcnow()
  865. delay_seconds = self._read_int_value(node_config_json, "delay_seconds", default=0)
  866. timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
  867. scheduled_time = now + timedelta(seconds=max(delay_seconds, 0))
  868. timeout_time = (
  869. scheduled_time + timedelta(seconds=timeout_seconds)
  870. if timeout_seconds > 0
  871. else None
  872. )
  873. return scheduled_time, timeout_time
  874. def _node_has_timed_out(self, node_run: NodeRun) -> bool:
  875. return node_run.timeout_time is not None and node_run.timeout_time <= datetime.utcnow()
  876. def _should_retry_node(
  877. self,
  878. *,
  879. node_run: NodeRun,
  880. node_config_json: dict[str, JSONValue]) -> bool:
  881. retry_policy = self._read_dict_value(node_config_json, "retry_policy")
  882. max_attempts = self._read_int_value(retry_policy, "max_attempts", default=1)
  883. return max_attempts > node_run.attempt_no
  884. def _read_retry_delay_seconds(self, node_config_json: dict[str, JSONValue]) -> int:
  885. retry_policy = self._read_dict_value(node_config_json, "retry_policy")
  886. return self._read_int_value(retry_policy, "retry_delay_seconds", default=0)
  887. def _build_retry_timing(
  888. self,
  889. node_config_json: dict[str, JSONValue]) -> tuple[datetime, datetime | None]:
  890. retry_time = datetime.utcnow() + timedelta(
  891. seconds=self._read_retry_delay_seconds(node_config_json)
  892. )
  893. timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
  894. timeout_time = (
  895. retry_time + timedelta(seconds=timeout_seconds)
  896. if timeout_seconds > 0
  897. else None
  898. )
  899. return retry_time, timeout_time
  900. def _is_join_ready(
  901. self,
  902. *,
  903. workflow_version: WorkflowVersionContract,
  904. run_node_runs: list[NodeRun],
  905. successor_node_id: str,
  906. successor_node_type: str,
  907. successor_config: dict[str, JSONValue]) -> bool:
  908. join_policy = self._read_string_value(successor_config, "join_policy")
  909. if join_policy is None and successor_node_type != "join":
  910. return True
  911. workflow = self._parse_workflow(workflow_version)
  912. if workflow is None:
  913. return True
  914. predecessor_ids = [
  915. edge.source for edge in workflow.edges if edge.target == successor_node_id
  916. ]
  917. if not predecessor_ids:
  918. return True
  919. completed_node_ids = {
  920. item.node_id
  921. for item in run_node_runs
  922. if item.status in {"completed", "skipped"}
  923. }
  924. if join_policy in {None, "all_completed"}:
  925. return all(predecessor_id in completed_node_ids for predecessor_id in predecessor_ids)
  926. if join_policy == "any_completed":
  927. return any(predecessor_id in completed_node_ids for predecessor_id in predecessor_ids)
  928. return True
  929. def _can_schedule_repeated_node(
  930. self,
  931. node_config_json: dict[str, JSONValue],
  932. *,
  933. existing_count: int) -> bool:
  934. if existing_count == 0:
  935. return True
  936. allow_loop = self._read_bool_value(node_config_json, "allow_loop", default=False)
  937. max_iterations = self._read_int_value(node_config_json, "max_iterations", default=1)
  938. return allow_loop and existing_count < max_iterations
  939. def _schedule_compensation_node(
  940. self,
  941. *,
  942. node_run: NodeRun,
  943. node_config: dict[str, JSONValue]) -> None:
  944. compensation_node_id = self._read_string_value(node_config, "compensation_node_id")
  945. if compensation_node_id is None:
  946. compensation_config = self._read_dict_value(node_config, "compensation")
  947. compensation_node_id = self._read_string_value(compensation_config, "node_id")
  948. if compensation_node_id is None:
  949. return
  950. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  951. if workflow_run is None:
  952. return
  953. compensation_config = self._resolve_node_config(
  954. workflow_version_id=workflow_run.workflow_version_id,
  955. node_id=compensation_node_id)
  956. existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
  957. run_id=node_run.run_id,
  958. node_ids=[compensation_node_id])
  959. if existing_nodes and not self._can_schedule_repeated_node(
  960. compensation_config,
  961. existing_count=len(existing_nodes)):
  962. return
  963. compensation_node_type = self._resolve_workflow_node_type(
  964. workflow_version_id=workflow_run.workflow_version_id,
  965. node_id=compensation_node_id) or "compensation"
  966. scheduled_time, timeout_time = self._build_node_timing(compensation_config)
  967. created = self.node_run_repository.create(
  968. run_id=node_run.run_id,
  969. parent_node_run_id=node_run.id,
  970. node_id=compensation_node_id,
  971. node_type=compensation_node_type,
  972. status="queued",
  973. scheduled_time=scheduled_time,
  974. timeout_time=timeout_time)
  975. self._publish_node_run_to_queue(created)
  976. self._log_event(
  977. run_id=node_run.run_id,
  978. node_run_id=created.id,
  979. event_type="compensation_queued",
  980. message=f"compensation node queued: {compensation_node_id}",
  981. detail_json={
  982. "failed_node_id": node_run.node_id,
  983. "compensation_node_id": compensation_node_id,
  984. })
  def _parse_workflow(self, workflow_version: WorkflowVersionContract):
      """Parse the workflow definition stored in ``workflow_version.dsl_json``."""
      dsl_json = workflow_version.dsl_json
      return parse_workflow_definition(dsl_json)
  987. def _resolve_workflow_node_type(
  988. self,
  989. *,
  990. workflow_version_id: str,
  991. node_id: str) -> str | None:
  992. if self.workflow_client is None:
  993. return None
  994. workflow_version = self.workflow_client.get_workflow_version(
  995. workflow_version_id=workflow_version_id)
  996. workflow = self._parse_workflow(workflow_version)
  997. if workflow is None:
  998. return None
  999. for node in workflow.nodes:
  1000. if node.id == node_id:
  1001. return node.type
  1002. return None
  1003. def _read_string_value(self, payload: dict[str, JSONValue], key: str) -> str | None:
  1004. value = payload.get(key)
  1005. if isinstance(value, str) and value:
  1006. return value
  1007. return None
  1008. def _read_bool_value(
  1009. self,
  1010. payload: dict[str, JSONValue],
  1011. key: str,
  1012. *,
  1013. default: bool) -> bool:
  1014. value = payload.get(key)
  1015. if isinstance(value, bool):
  1016. return value
  1017. return default
  1018. def _read_int_value(
  1019. self,
  1020. payload: dict[str, JSONValue],
  1021. key: str,
  1022. *,
  1023. default: int) -> int:
  1024. value = payload.get(key)
  1025. if isinstance(value, int) and not isinstance(value, bool):
  1026. return value
  1027. return default
  1028. def _read_dict_value(
  1029. self,
  1030. payload: dict[str, JSONValue],
  1031. key: str) -> dict[str, JSONValue]:
  1032. value = payload.get(key)
  1033. if isinstance(value, dict):
  1034. return {str(item_key): item_value for item_key, item_value in value.items()}
  1035. return {}
  1036. def _sync_workflow_run_status_from_nodes(self, *, run_id: str) -> None:
  1037. node_runs = self.node_run_repository.list_by_run(run_id=run_id)
  1038. if not node_runs:
  1039. return
  1040. self.workflow_run_repository.update_node_count(
  1041. run_id=run_id,
  1042. current_node_count=len(node_runs))
  1043. next_status, error_code, error_message = self._derive_run_status(node_runs)
  1044. self.workflow_run_repository.update_status(
  1045. run_id=run_id,
  1046. status=next_status,
  1047. error_code=error_code,
  1048. error_message=error_message)
  1049. self._log_event(
  1050. run_id=run_id,
  1051. node_run_id=None,
  1052. event_type="run_status_synced",
  1053. message=f"workflow run status synced to {next_status}",
  1054. detail_json={
  1055. "status": next_status,
  1056. "error_code": error_code,
  1057. })
  1058. def _derive_run_status(
  1059. self,
  1060. node_runs: list[NodeRun]) -> tuple[WorkflowRunStatus, str | None, str | None]:
  1061. statuses = {node_run.status for node_run in node_runs}
  1062. active_statuses: set[NodeRunStatus] = {"pending", "queued", "running"}
  1063. if statuses.intersection(active_statuses):
  1064. return "running", None, None
  1065. if "failed" in statuses:
  1066. failed_node = next((item for item in node_runs if item.status == "failed"), None)
  1067. error_code = failed_node.error_code if failed_node is not None else None
  1068. error_message = failed_node.error_message if failed_node is not None else None
  1069. return "failed", error_code, error_message
  1070. terminal_statuses: set[NodeRunStatus] = {"completed", "skipped"}
  1071. if statuses and statuses.issubset(terminal_statuses):
  1072. return "completed", None, None
  1073. return "running", None, None
  def _log_event(
      self,
      *,
      run_id: str,
      node_run_id: str | None,
      event_type: str,
      message: str,
      detail_json: dict[str, JSONValue] | None,
      level: str = "info",
  ) -> None:
      """Write one execution-log entry through the execution log repository.

      All arguments are forwarded unchanged; ``level`` defaults to ``"info"``.
      """
      log_fields = {
          "run_id": run_id,
          "node_run_id": node_run_id,
          "event_type": event_type,
          "level": level,
          "message": message,
          "detail_json": detail_json,
      }
      self.execution_log_repository.create(**log_fields)
  1090. def _publish_event(
  1091. self,
  1092. *,
  1093. event_type: str,
  1094. payload_json: dict[str, JSONValue],
  1095. workflow_run: WorkflowRun | None = None,
  1096. node_run: NodeRun | None = None) -> None:
  1097. if self.event_client is None:
  1098. return
  1099. aggregate_id = workflow_run.id if workflow_run is not None else None
  1100. if aggregate_id is None and node_run is not None:
  1101. aggregate_id = node_run.id
  1102. aggregate_type = "workflow_run" if workflow_run is not None else "node_run"
  1103. correlation_id = workflow_run.session_id if workflow_run is not None else None
  1104. if correlation_id is None and node_run is not None:
  1105. correlation_id = node_run.run_id
  1106. try:
  1107. self.event_client.publish_event(
  1108. EventPublishContract(
  1109. event_type=event_type,
  1110. source_service="runtime-service",
  1111. aggregate_type=aggregate_type,
  1112. aggregate_id=aggregate_id,
  1113. correlation_id=correlation_id,
  1114. payload_json=payload_json)
  1115. )
  1116. except EventServiceClientError:
  1117. return
  1118. def _publish_node_run_to_queue(self, node_run: NodeRun) -> None:
  1119. if node_run.status != "queued" or self.task_queue_publisher is None:
  1120. return
  1121. self.task_queue_publisher.publish_runtime_node_run(
  1122. node_run_id=node_run.id)
  def build_runtime_application_service(
      *,
      db: Session,
      settings: RuntimeServiceSettings,
  ) -> RuntimeApplicationService:
      """Wire a ``RuntimeApplicationService`` from a DB session and settings.

      Repositories are bound to ``db``; service clients take their base URLs
      (and, where configured, timeouts) from ``settings``. A task queue
      publisher is attached only when a Redis client could be built from
      ``settings.redis_url``.
      """
      redis_client = try_build_redis_client(settings.redis_url)

      # Collaborators are constructed in the same order as the keyword
      # arguments they are passed under.
      workflow_run_repository = WorkflowRunRepository(db)
      node_run_repository = NodeRunRepository(db)
      execution_log_repository = ExecutionLogRepository(db)
      node_artifact_repository = NodeArtifactRepository(db)
      trace_span_repository = TraceSpanRepository(db)

      execution_dispatcher = build_node_execution_dispatcher_with_clients(
          code_runner_client=CodeRunnerClient(base_url=settings.code_runner_service_url),
          model_gateway_client=ModelGatewayClient(base_url=settings.model_gateway_service_url),
          tool_client=ToolServiceClient(base_url=settings.tool_service_url),
          human_client=HumanServiceClient(
              base_url=settings.human_service_url,
              timeout_seconds=settings.human_service_timeout_seconds,
          ),
          knowledge_client=KnowledgeServiceClient(
              base_url=settings.knowledge_service_url,
              timeout_seconds=settings.knowledge_service_timeout_seconds,
          ),
      )
      workflow_client = WorkflowServiceClient(base_url=settings.workflow_service_url)
      event_client = EventServiceClient(
          base_url=settings.event_service_url,
          timeout_seconds=settings.event_service_timeout_seconds,
      )

      if redis_client is not None:
          task_queue_publisher = TaskQueuePublisher(client=redis_client)
      else:
          task_queue_publisher = None

      return RuntimeApplicationService(
          workflow_run_repository=workflow_run_repository,
          node_run_repository=node_run_repository,
          execution_log_repository=execution_log_repository,
          node_artifact_repository=node_artifact_repository,
          trace_span_repository=trace_span_repository,
          execution_dispatcher=execution_dispatcher,
          workflow_client=workflow_client,
          event_client=event_client,
          task_queue_publisher=task_queue_publisher,
      )