# services.py
  1. from dataclasses import dataclass
  2. from datetime import datetime, timedelta
  3. from core_domain import (
  4. InitialNodeContract,
  5. NodeExecutionContextContract,
  6. NodeExecutionResultContract,
  7. NodeRunStatus,
  8. WorkflowRunStatus,
  9. WorkflowConfigContract,
  10. )
  11. from core_dsl import parse_workflow_definition
  12. from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
  13. from core_shared import JSONValue, try_build_redis_client
  14. from core_shared.task_queue import TaskQueuePublisher
  15. from sqlalchemy.orm import Session
  16. from app.bootstrap.settings import RuntimeServiceSettings
  17. from app.db.models import ExecutionLog, NodeArtifact, NodeRun, TraceSpan, WorkflowRun
  18. from app.domain.repositories import (
  19. ExecutionLogRepository,
  20. NodeArtifactRepository,
  21. NodeRunRepository,
  22. TraceSpanRepository,
  23. WorkflowRunRepository,
  24. )
  25. from app.infrastructure.code_runner_client import CodeRunnerClient
  26. from app.infrastructure.executors import (
  27. NodeExecutionDispatcher,
  28. build_node_execution_dispatcher_with_clients,
  29. )
  30. from app.infrastructure.human_client import HumanServiceClient
  31. from app.infrastructure.knowledge_client import KnowledgeServiceClient
  32. from app.infrastructure.model_gateway_client import ModelGatewayClient
  33. from app.infrastructure.planner import (
  34. derive_initial_node,
  35. derive_node_config,
  36. derive_successor_nodes,
  37. )
  38. from app.infrastructure.tool_client import ToolServiceClient
  39. from app.infrastructure.workflow_client import WorkflowServiceClient
  40. from app.schemas.run import (
  41. HumanNodeResumeRequest,
  42. NodeRunExecuteRequest,
  43. NodeRunStatusUpdateRequest,
  44. RunCreateRequest,
  45. RunExecuteRequest,
  46. RuntimeDebugContinueRequest,
  47. WorkflowRunStatusUpdateRequest,
  48. )
  49. @dataclass(frozen=True)
  50. class RuntimeDebugSnapshot:
  51. run: WorkflowRun
  52. node_runs: list[NodeRun]
  53. run_state_json: dict[str, JSONValue]
  54. node_output_json_by_node_id: dict[str, dict[str, JSONValue]]
  55. node_output_text_by_node_id: dict[str, str]
  56. queued_node_ids: list[str]
  57. running_node_ids: list[str]
  58. completed_node_ids: list[str]
  59. failed_node_ids: list[str]
  60. execution_logs: list[ExecutionLog]
  61. node_artifacts: list[NodeArtifact]
  62. trace_spans: list[TraceSpan]
  63. class RuntimeApplicationService:
  64. def __init__(
  65. self,
  66. workflow_run_repository: WorkflowRunRepository,
  67. node_run_repository: NodeRunRepository,
  68. execution_log_repository: ExecutionLogRepository,
  69. node_artifact_repository: NodeArtifactRepository,
  70. trace_span_repository: TraceSpanRepository,
  71. execution_dispatcher: NodeExecutionDispatcher,
  72. workflow_client: WorkflowServiceClient | None = None,
  73. event_client: EventServiceClient | None = None,
  74. task_queue_publisher: TaskQueuePublisher | None = None) -> None:
  75. self.workflow_run_repository = workflow_run_repository
  76. self.node_run_repository = node_run_repository
  77. self.execution_log_repository = execution_log_repository
  78. self.node_artifact_repository = node_artifact_repository
  79. self.trace_span_repository = trace_span_repository
  80. self.execution_dispatcher = execution_dispatcher
  81. self.workflow_client = workflow_client
  82. self.event_client = event_client
  83. self.task_queue_publisher = task_queue_publisher
  84. def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
  85. initial_node = payload.initial_node or self._plan_initial_node(payload)
  86. workflow_run = self.workflow_run_repository.create(
  87. app_id=payload.app_id,
  88. app_config_id=payload.app_config_id,
  89. workflow_id=payload.workflow_id,
  90. workflow_config_id=payload.workflow_config_id,
  91. session_id=payload.session_id,
  92. parent_run_id=payload.parent_run_id,
  93. root_run_id=payload.root_run_id,
  94. run_type=payload.run_type,
  95. trigger_type=payload.trigger_type,
  96. priority=payload.priority)
  97. node_run = None
  98. if initial_node is not None:
  99. self.workflow_run_repository.update_node_count(
  100. run_id=workflow_run.id,
  101. current_node_count=1)
  102. initial_config = self._resolve_node_config(
  103. workflow_config_id=payload.workflow_config_id,
  104. node_id=initial_node.node_id)
  105. scheduled_time, timeout_time = self._build_node_timing(initial_config)
  106. node_run = self.node_run_repository.create(
  107. run_id=workflow_run.id,
  108. node_id=initial_node.node_id,
  109. node_type=initial_node.node_type,
  110. status=initial_node.status,
  111. scheduled_time=scheduled_time,
  112. timeout_time=timeout_time)
  113. self._log_event(
  114. run_id=workflow_run.id,
  115. node_run_id=node_run.id,
  116. event_type="node_queued",
  117. message=f"initial node queued: {initial_node.node_id}",
  118. detail_json={
  119. "node_id": initial_node.node_id,
  120. "node_type": initial_node.node_type,
  121. "status": initial_node.status,
  122. })
  123. self._publish_node_run_to_queue(node_run)
  124. self._log_event(
  125. run_id=workflow_run.id,
  126. node_run_id=node_run.id if node_run is not None else None,
  127. event_type="run_created",
  128. message="workflow run created",
  129. detail_json={
  130. "workflow_id": payload.workflow_id,
  131. "workflow_config_id": payload.workflow_config_id,
  132. "session_id": payload.session_id,
  133. })
  134. self._publish_event(
  135. event_type="workflow.run.created",
  136. workflow_run=workflow_run,
  137. payload_json={
  138. "run_id": workflow_run.id,
  139. "workflow_id": workflow_run.workflow_id,
  140. "workflow_config_id": workflow_run.workflow_config_id,
  141. "status": workflow_run.status,
  142. })
  143. return workflow_run, node_run
  144. def list_runs(
  145. self,
  146. session_id: str | None = None,
  147. *,
  148. limit: int = 50) -> list[WorkflowRun]:
  149. return self.workflow_run_repository.list_by_scope(
  150. session_id=session_id,
  151. limit=limit)
  152. def list_node_runs(self, run_id: str) -> list[NodeRun]:
  153. return self.node_run_repository.list_by_run(run_id=run_id)
  154. def list_execution_logs(
  155. self,
  156. run_id: str | None = None,
  157. node_run_id: str | None = None):
  158. return self.execution_log_repository.list_by_scope(
  159. run_id=run_id,
  160. node_run_id=node_run_id)
  161. def list_node_artifacts(
  162. self,
  163. run_id: str | None = None,
  164. node_run_id: str | None = None,
  165. artifact_type: str | None = None):
  166. return self.node_artifact_repository.list_by_scope(
  167. run_id=run_id,
  168. node_run_id=node_run_id,
  169. artifact_type=artifact_type)
  170. def list_trace_spans(
  171. self,
  172. run_id: str | None = None,
  173. node_run_id: str | None = None,
  174. span_type: str | None = None):
  175. return self.trace_span_repository.list_by_scope(
  176. run_id=run_id,
  177. node_run_id=node_run_id,
  178. span_type=span_type)
  179. def get_debug_snapshot(
  180. self,
  181. *,
  182. run_id: str) -> RuntimeDebugSnapshot | None:
  183. workflow_run = self.workflow_run_repository.get_by_id(run_id)
  184. if workflow_run is None:
  185. return None
  186. return self._build_debug_snapshot(workflow_run)
  187. def update_run_status(
  188. self,
  189. run_id: str,
  190. payload: WorkflowRunStatusUpdateRequest) -> WorkflowRun | None:
  191. entity = self.workflow_run_repository.update_status(
  192. run_id=run_id,
  193. status=payload.status,
  194. error_code=payload.error_code,
  195. error_message=payload.error_message)
  196. if entity is not None:
  197. self._publish_event(
  198. event_type=f"workflow.run.{entity.status}",
  199. workflow_run=entity,
  200. payload_json={
  201. "run_id": entity.id,
  202. "status": entity.status,
  203. "error_code": entity.error_code,
  204. })
  205. return entity
  206. def update_node_run_status(
  207. self,
  208. node_run_id: str,
  209. payload: NodeRunStatusUpdateRequest) -> NodeRun | None:
  210. node_run = self.node_run_repository.update_status(
  211. node_run_id=node_run_id,
  212. status=payload.status,
  213. worker_key=payload.worker_key,
  214. error_code=payload.error_code,
  215. error_message=payload.error_message,
  216. output_text=payload.output_text,
  217. output_json=payload.output_json)
  218. if node_run is None:
  219. return None
  220. self._log_event(
  221. run_id=node_run.run_id,
  222. node_run_id=node_run.id,
  223. event_type="node_status_updated",
  224. message=f"node status updated to {payload.status}",
  225. detail_json={
  226. "node_id": node_run.node_id,
  227. "node_type": node_run.node_type,
  228. "status": payload.status,
  229. "error_code": payload.error_code,
  230. })
  231. self._publish_event(
  232. event_type=f"workflow.node.{node_run.status}",
  233. workflow_run=None,
  234. node_run=node_run,
  235. payload_json={
  236. "run_id": node_run.run_id,
  237. "node_run_id": node_run.id,
  238. "node_id": node_run.node_id,
  239. "node_type": node_run.node_type,
  240. "status": node_run.status,
  241. "error_code": node_run.error_code,
  242. })
  243. if payload.status == "completed":
  244. self._schedule_successor_nodes(node_run)
  245. if payload.status == "failed":
  246. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  247. if workflow_run is not None:
  248. node_config = self._resolve_node_config(
  249. workflow_config_id=workflow_run.workflow_config_id,
  250. node_id=node_run.node_id)
  251. self._schedule_compensation_node(node_run=node_run, node_config=node_config)
  252. self._sync_workflow_run_status_from_nodes(
  253. run_id=node_run.run_id)
  254. return node_run
  255. def execute_node_run(
  256. self,
  257. node_run_id: str,
  258. payload: NodeRunExecuteRequest) -> tuple[WorkflowRun, NodeRun, str] | None:
  259. node_run = self.node_run_repository.get_by_id(node_run_id)
  260. if node_run is None:
  261. return None
  262. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  263. if workflow_run is None:
  264. return None
  265. if workflow_run.status == "paused":
  266. return workflow_run, node_run, "debug_paused"
  267. if node_run.status in {"completed", "failed", "skipped"}:
  268. executor_name = self.execution_dispatcher.resolve_executor(
  269. node_run.node_type
  270. ).executor_name
  271. return workflow_run, node_run, executor_name
  272. node_config = self._resolve_node_config(
  273. workflow_config_id=workflow_run.workflow_config_id,
  274. node_id=node_run.node_id)
  275. if self._node_has_timed_out(node_run):
  276. timed_out_node_run = self.update_node_run_status(
  277. node_run_id=node_run.id,
  278. payload=NodeRunStatusUpdateRequest(
  279. status="failed",
  280. worker_key=payload.worker_key,
  281. error_code="node_timeout",
  282. error_message=f"node timed out: {node_run.node_id}",
  283. output_json={
  284. "timeout_time": node_run.timeout_time.isoformat()
  285. if node_run.timeout_time is not None
  286. else None,
  287. }))
  288. if timed_out_node_run is None:
  289. return None
  290. executor_name = self.execution_dispatcher.resolve_executor(
  291. node_run.node_type
  292. ).executor_name
  293. return workflow_run, timed_out_node_run, executor_name
  294. running_node_run = self.node_run_repository.update_status(
  295. node_run_id=node_run_id,
  296. status="running",
  297. worker_key=payload.worker_key)
  298. if running_node_run is None:
  299. return None
  300. self._log_event(
  301. run_id=running_node_run.run_id,
  302. node_run_id=running_node_run.id,
  303. event_type="node_execution_started",
  304. message=f"executing node {running_node_run.node_id}",
  305. detail_json={
  306. "node_id": running_node_run.node_id,
  307. "node_type": running_node_run.node_type,
  308. "worker_key": payload.worker_key,
  309. })
  310. context = self._build_execution_context(
  311. workflow_run=workflow_run,
  312. node_run=running_node_run,
  313. worker_key=payload.worker_key,
  314. node_config_json=node_config)
  315. executor_name = self.execution_dispatcher.resolve_executor(
  316. running_node_run.node_type
  317. ).executor_name
  318. trace_span = self.trace_span_repository.start(
  319. run_id=running_node_run.run_id,
  320. node_run_id=running_node_run.id,
  321. parent_span_id=None,
  322. span_type="node_execution",
  323. name=f"{running_node_run.node_type}:{running_node_run.node_id}",
  324. attributes_json={
  325. "node_id": running_node_run.node_id,
  326. "node_type": running_node_run.node_type,
  327. "executor_name": executor_name,
  328. "worker_key": payload.worker_key,
  329. })
  330. try:
  331. result, executor_name = self.execution_dispatcher.execute(
  332. context=context,
  333. request=payload)
  334. except Exception as exc:
  335. result = NodeExecutionResultContract(
  336. status="failed",
  337. worker_key=payload.worker_key,
  338. error_code="executor_error",
  339. error_message=str(exc))
  340. if result.status == "failed" and self._should_retry_node(
  341. node_run=running_node_run,
  342. node_config_json=context.node_config_json):
  343. retry_time, retry_timeout_time = self._build_retry_timing(context.node_config_json)
  344. retried_node_run = self.node_run_repository.requeue_for_retry(
  345. node_run_id=running_node_run.id,
  346. scheduled_time=retry_time,
  347. timeout_time=retry_timeout_time,
  348. error_code=result.error_code,
  349. error_message=result.error_message,
  350. output_text=result.output_text,
  351. output_json={
  352. **(result.output_json or {}),
  353. "retry_scheduled_time": retry_time.isoformat(),
  354. "retry_reason": result.error_code or "node_failed",
  355. })
  356. if retried_node_run is None:
  357. return None
  358. self._publish_node_run_to_queue(retried_node_run)
  359. self.trace_span_repository.finish(
  360. span_id=trace_span.id,
  361. status="error",
  362. error_code=result.error_code,
  363. error_message=result.error_message,
  364. attributes_json={
  365. "node_status": "queued",
  366. "executor_name": executor_name,
  367. "retry_scheduled": True,
  368. "attempt_no": retried_node_run.attempt_no,
  369. })
  370. self._log_event(
  371. run_id=retried_node_run.run_id,
  372. node_run_id=retried_node_run.id,
  373. event_type="node_retry_scheduled",
  374. message=f"node retry scheduled: {retried_node_run.node_id}",
  375. detail_json={
  376. "node_id": retried_node_run.node_id,
  377. "attempt_no": retried_node_run.attempt_no,
  378. "scheduled_time": retry_time.isoformat(),
  379. "error_code": result.error_code,
  380. })
  381. self._sync_workflow_run_status_from_nodes(
  382. run_id=retried_node_run.run_id)
  383. workflow_run = self.workflow_run_repository.get_by_id(retried_node_run.run_id)
  384. if workflow_run is None:
  385. return None
  386. return workflow_run, retried_node_run, executor_name
  387. final_node_run = self.update_node_run_status(
  388. node_run_id=running_node_run.id,
  389. payload=NodeRunStatusUpdateRequest(
  390. status=result.status,
  391. worker_key=result.worker_key,
  392. error_code=result.error_code,
  393. error_message=result.error_message,
  394. output_text=result.output_text,
  395. output_json=result.output_json))
  396. if final_node_run is None:
  397. return None
  398. self.trace_span_repository.finish(
  399. span_id=trace_span.id,
  400. status="ok" if final_node_run.status == "completed" else "error",
  401. error_code=final_node_run.error_code,
  402. error_message=final_node_run.error_message,
  403. attributes_json={
  404. "node_status": final_node_run.status,
  405. "executor_name": executor_name,
  406. "has_output_text": final_node_run.output_text is not None,
  407. "has_output_json": final_node_run.output_json is not None,
  408. })
  409. self._persist_node_execution_artifact(final_node_run)
  410. self._log_event(
  411. run_id=final_node_run.run_id,
  412. node_run_id=final_node_run.id,
  413. event_type="node_execution_finished",
  414. message=f"node execution finished with status {final_node_run.status}",
  415. detail_json={
  416. "node_id": final_node_run.node_id,
  417. "node_type": final_node_run.node_type,
  418. "executor_name": executor_name,
  419. "status": final_node_run.status,
  420. })
  421. workflow_run = self.workflow_run_repository.get_by_id(final_node_run.run_id)
  422. if workflow_run is None:
  423. return None
  424. return workflow_run, final_node_run, executor_name
  425. def execute_next_node_run(
  426. self,
  427. run_id: str,
  428. payload: NodeRunExecuteRequest) -> tuple[WorkflowRun, NodeRun, str] | None:
  429. next_node_run = self.node_run_repository.get_next_queued_by_run(
  430. run_id=run_id)
  431. if next_node_run is None:
  432. return None
  433. return self.execute_node_run(node_run_id=next_node_run.id, payload=payload)
  434. def execute_run(
  435. self,
  436. run_id: str,
  437. payload: RunExecuteRequest) -> tuple[WorkflowRun, list[NodeRun], list[str]] | None:
  438. workflow_run = self.workflow_run_repository.get_by_id(run_id)
  439. if workflow_run is None:
  440. return None
  441. executed_node_runs: list[NodeRun] = []
  442. executor_names: list[str] = []
  443. for _ in range(payload.max_steps):
  444. step_result = self.execute_next_node_run(
  445. run_id=run_id,
  446. payload=NodeRunExecuteRequest(worker_key=payload.worker_key))
  447. if step_result is None:
  448. break
  449. workflow_run, node_run, executor_name = step_result
  450. executed_node_runs.append(node_run)
  451. executor_names.append(executor_name)
  452. if node_run.status != "completed":
  453. break
  454. final_run = self.workflow_run_repository.get_by_id(run_id)
  455. if final_run is None:
  456. return None
  457. return final_run, executed_node_runs, executor_names
  458. def pause_run(
  459. self,
  460. *,
  461. run_id: str,
  462. reason: str = "debug_pause") -> RuntimeDebugSnapshot | None:
  463. workflow_run = self.workflow_run_repository.get_by_id(run_id)
  464. if workflow_run is None:
  465. return None
  466. self._sync_workflow_run_status_from_nodes(run_id=run_id)
  467. latest_run = self.workflow_run_repository.get_by_id(run_id)
  468. if latest_run is None:
  469. return None
  470. if latest_run.status in {"completed", "failed", "cancelled"}:
  471. paused_run = latest_run
  472. else:
  473. paused_run = self.workflow_run_repository.update_status(
  474. run_id=run_id,
  475. status="paused")
  476. if paused_run is None:
  477. return None
  478. self._log_event(
  479. run_id=run_id,
  480. node_run_id=None,
  481. event_type="debug_run_paused",
  482. message=f"workflow run paused: {reason}",
  483. detail_json={"reason": reason})
  484. return self._build_debug_snapshot(paused_run)
  485. def resume_run(
  486. self,
  487. *,
  488. run_id: str,
  489. reason: str = "debug_resume") -> RuntimeDebugSnapshot | None:
  490. workflow_run = self.workflow_run_repository.get_by_id(run_id)
  491. if workflow_run is None:
  492. return None
  493. resumed_run = self.workflow_run_repository.update_status(
  494. run_id=run_id,
  495. status="running")
  496. if resumed_run is None:
  497. return None
  498. self._log_event(
  499. run_id=run_id,
  500. node_run_id=None,
  501. event_type="debug_run_resumed",
  502. message=f"workflow run resumed: {reason}",
  503. detail_json={"reason": reason})
  504. return self._build_debug_snapshot(resumed_run)
  505. def step_debug_run(
  506. self,
  507. *,
  508. run_id: str,
  509. worker_key: str | None = None) -> tuple[RuntimeDebugSnapshot, list[NodeRun], list[str], str] | None:
  510. workflow_run = self.workflow_run_repository.get_by_id(run_id)
  511. if workflow_run is None:
  512. return None
  513. if workflow_run.status == "paused":
  514. self.workflow_run_repository.update_status(run_id=run_id, status="running")
  515. result = self.execute_next_node_run(
  516. run_id=run_id,
  517. payload=NodeRunExecuteRequest(worker_key=worker_key))
  518. executed_node_runs: list[NodeRun] = []
  519. executor_names: list[str] = []
  520. reason = "no_queued_node"
  521. if result is not None:
  522. _, node_run, executor_name = result
  523. executed_node_runs.append(node_run)
  524. executor_names.append(executor_name)
  525. reason = "step_completed"
  526. self._sync_workflow_run_status_from_nodes(run_id=run_id)
  527. latest_run = self.workflow_run_repository.get_by_id(run_id)
  528. if latest_run is None:
  529. return None
  530. if latest_run.status in {"completed", "failed", "cancelled"}:
  531. paused_run = latest_run
  532. else:
  533. paused_run = self.workflow_run_repository.update_status(
  534. run_id=run_id,
  535. status="paused")
  536. if paused_run is None:
  537. return None
  538. self._log_event(
  539. run_id=run_id,
  540. node_run_id=executed_node_runs[-1].id if executed_node_runs else None,
  541. event_type="debug_step_finished",
  542. message=f"debug step finished: {reason}",
  543. detail_json={
  544. "reason": reason,
  545. "executed_node_ids": [item.node_id for item in executed_node_runs],
  546. })
  547. return self._build_debug_snapshot(paused_run), executed_node_runs, executor_names, reason
  548. def continue_debug_run(
  549. self,
  550. *,
  551. run_id: str,
  552. payload: RuntimeDebugContinueRequest) -> tuple[RuntimeDebugSnapshot, list[NodeRun], list[str], str | None, str] | None:
  553. workflow_run = self.workflow_run_repository.get_by_id(run_id)
  554. if workflow_run is None:
  555. return None
  556. if workflow_run.status == "paused":
  557. self.workflow_run_repository.update_status(run_id=run_id, status="running")
  558. breakpoint_node_ids = set(payload.breakpoint_node_ids)
  559. executed_node_runs: list[NodeRun] = []
  560. executor_names: list[str] = []
  561. paused_before_node_id: str | None = None
  562. reason = "completed"
  563. for _ in range(payload.max_steps):
  564. next_node_run = self.node_run_repository.get_next_queued_by_run(
  565. run_id=run_id)
  566. if next_node_run is None:
  567. reason = "no_queued_node"
  568. break
  569. if next_node_run.node_id in breakpoint_node_ids:
  570. paused_before_node_id = next_node_run.node_id
  571. reason = "breakpoint_hit"
  572. break
  573. step_result = self.execute_node_run(
  574. node_run_id=next_node_run.id,
  575. payload=NodeRunExecuteRequest(worker_key=payload.worker_key))
  576. if step_result is None:
  577. reason = "node_not_found"
  578. break
  579. _, node_run, executor_name = step_result
  580. executed_node_runs.append(node_run)
  581. executor_names.append(executor_name)
  582. if node_run.status != "completed":
  583. reason = f"node_{node_run.status}"
  584. break
  585. else:
  586. reason = "max_steps_reached"
  587. self._sync_workflow_run_status_from_nodes(run_id=run_id)
  588. latest_run = self.workflow_run_repository.get_by_id(run_id)
  589. if latest_run is None:
  590. return None
  591. if latest_run.status in {"completed", "failed", "cancelled"}:
  592. paused_run = latest_run
  593. else:
  594. paused_run = self.workflow_run_repository.update_status(
  595. run_id=run_id,
  596. status="paused")
  597. if paused_run is None:
  598. return None
  599. self._log_event(
  600. run_id=run_id,
  601. node_run_id=executed_node_runs[-1].id if executed_node_runs else None,
  602. event_type="debug_continue_paused",
  603. message=f"debug continue paused: {reason}",
  604. detail_json={
  605. "reason": reason,
  606. "paused_before_node_id": paused_before_node_id,
  607. "executed_node_ids": [item.node_id for item in executed_node_runs],
  608. "breakpoint_node_ids": list(breakpoint_node_ids),
  609. })
  610. return (
  611. self._build_debug_snapshot(paused_run),
  612. executed_node_runs,
  613. executor_names,
  614. paused_before_node_id,
  615. reason)
  616. def resume_human_node_run(
  617. self,
  618. *,
  619. node_run_id: str,
  620. payload: HumanNodeResumeRequest) -> tuple[WorkflowRun, NodeRun, str] | None:
  621. node_run = self.node_run_repository.get_by_id(node_run_id)
  622. if node_run is None:
  623. return None
  624. output_json = dict(node_run.output_json or {})
  625. existing_human_task_id = output_json.get("human_task_id")
  626. if existing_human_task_id is not None and existing_human_task_id != payload.human_task_id:
  627. return None
  628. if existing_human_task_id is None:
  629. output_json["human_task_id"] = payload.human_task_id
  630. self.node_run_repository.update_status(
  631. node_run_id=node_run.id,
  632. status="pending",
  633. worker_key=payload.worker_key,
  634. output_json=output_json)
  635. return self.execute_node_run(
  636. node_run_id=node_run_id,
  637. payload=NodeRunExecuteRequest(worker_key=payload.worker_key))
  638. def execute_next_claimed_node_run(
  639. self,
  640. *,
  641. worker_key: str,
  642. lease_seconds: int,
  643. redis_client: object | None = None) -> tuple[WorkflowRun, NodeRun, str, int] | None:
  644. released_lease_count = self.node_run_repository.release_expired_leases(
  645. now_time=datetime.utcnow())
  646. claimed_node_run = self.node_run_repository.claim_next_queued(
  647. worker_key=worker_key,
  648. lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds))
  649. if claimed_node_run is None:
  650. return None
  651. if redis_client is not None:
  652. from core_shared.redis_primitives import DistributedLock, IdempotencyStore
  653. lock = DistributedLock(
  654. client=redis_client,
  655. name=f"node-run:{claimed_node_run.id}:lock",
  656. ttl_seconds=lease_seconds)
  657. if not lock.acquire():
  658. return None
  659. idempotency_store = IdempotencyStore(
  660. client=redis_client,
  661. prefix="node-run-idempotency")
  662. if not idempotency_store.begin(key=claimed_node_run.id):
  663. lock.release()
  664. return None
  665. else:
  666. lock = None
  667. idempotency_store = None
  668. try:
  669. result = self.execute_node_run(
  670. node_run_id=claimed_node_run.id,
  671. payload=NodeRunExecuteRequest(worker_key=worker_key))
  672. if idempotency_store is not None and result is not None:
  673. _, node_run, executor_name = result
  674. idempotency_store.complete(
  675. key=claimed_node_run.id,
  676. result={
  677. "status": node_run.status,
  678. "node_run_id": node_run.id,
  679. "executor_name": executor_name,
  680. })
  681. finally:
  682. if lock is not None:
  683. lock.release()
  684. if result is None:
  685. return None
  686. workflow_run, node_run, executor_name = result
  687. return workflow_run, node_run, executor_name, released_lease_count
  688. def _persist_node_execution_artifact(self, node_run: NodeRun) -> None:
  689. if node_run.output_text is None and node_run.output_json is None:
  690. return
  691. size_bytes = len(node_run.output_text.encode("utf-8")) if node_run.output_text else None
  692. self.node_artifact_repository.create(
  693. run_id=node_run.run_id,
  694. node_run_id=node_run.id,
  695. node_id=node_run.node_id,
  696. artifact_type="execution_result",
  697. name=f"{node_run.node_id}-execution-result",
  698. mime_type="application/json" if node_run.output_json is not None else "text/plain",
  699. content_text=node_run.output_text,
  700. content_json=node_run.output_json,
  701. size_bytes=size_bytes)
  702. def _plan_initial_node(self, payload: RunCreateRequest) -> InitialNodeContract | None:
  703. if self.workflow_client is None:
  704. return None
  705. workflow_config = self.workflow_client.get_workflow_config(
  706. workflow_config_id=payload.workflow_config_id)
  707. return derive_initial_node(workflow_config)
  708. def _resolve_node_config(
  709. self,
  710. *,
  711. workflow_config_id: str,
  712. node_id: str) -> dict[str, JSONValue]:
  713. if self.workflow_client is None:
  714. return {}
  715. workflow_config = self.workflow_client.get_workflow_config(
  716. workflow_config_id=workflow_config_id)
  717. return derive_node_config(workflow_config, node_id)
  718. def _schedule_successor_nodes(self, node_run: NodeRun) -> None:
  719. if self.workflow_client is None:
  720. return
  721. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  722. if workflow_run is None:
  723. return
  724. workflow_config = self.workflow_client.get_workflow_config(
  725. workflow_config_id=workflow_run.workflow_config_id)
  726. run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
  727. self._build_run_state_maps(
  728. run_id=node_run.run_id)
  729. )
  730. successor_nodes = derive_successor_nodes(
  731. workflow_config,
  732. node_run.node_id,
  733. current_output_json=node_run.output_json,
  734. run_state_json=run_state_json,
  735. node_output_json_by_node_id=node_output_json_by_node_id,
  736. node_output_text_by_node_id=node_output_text_by_node_id)
  737. if not successor_nodes:
  738. return
  739. existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
  740. run_id=node_run.run_id,
  741. node_ids=[item.node_id for item in successor_nodes])
  742. existing_node_counts: dict[str, int] = {}
  743. for item in existing_nodes:
  744. existing_node_counts[item.node_id] = existing_node_counts.get(item.node_id, 0) + 1
  745. for successor in successor_nodes:
  746. successor_config = derive_node_config(workflow_config, successor.node_id)
  747. if not self._is_join_ready(
  748. workflow_config=workflow_config,
  749. run_node_runs=self.node_run_repository.list_by_run(
  750. run_id=node_run.run_id),
  751. successor_node_id=successor.node_id,
  752. successor_node_type=successor.node_type,
  753. successor_config=successor_config):
  754. self._log_event(
  755. run_id=node_run.run_id,
  756. node_run_id=None,
  757. event_type="join_waiting",
  758. message=f"join node waiting for predecessors: {successor.node_id}",
  759. detail_json={
  760. "node_id": successor.node_id,
  761. "source_node_id": node_run.node_id,
  762. })
  763. continue
  764. if not self._can_schedule_repeated_node(
  765. successor_config,
  766. existing_count=existing_node_counts.get(successor.node_id, 0)):
  767. continue
  768. scheduled_time, timeout_time = self._build_node_timing(successor_config)
  769. created = self.node_run_repository.create(
  770. run_id=node_run.run_id,
  771. parent_node_run_id=node_run.id,
  772. node_id=successor.node_id,
  773. node_type=successor.node_type,
  774. status=successor.status,
  775. scheduled_time=scheduled_time,
  776. timeout_time=timeout_time)
  777. self._publish_node_run_to_queue(created)
  778. existing_node_counts[successor.node_id] = (
  779. existing_node_counts.get(successor.node_id, 0) + 1
  780. )
  781. self._log_event(
  782. run_id=node_run.run_id,
  783. node_run_id=None,
  784. event_type="node_queued",
  785. message=f"successor node queued: {successor.node_id}",
  786. detail_json={
  787. "node_id": successor.node_id,
  788. "node_type": successor.node_type,
  789. "status": successor.status,
  790. "source_node_id": node_run.node_id,
  791. })
  def _build_execution_context(
      self,
      *,
      workflow_run: WorkflowRun,
      node_run: NodeRun,
      worker_key: str | None,
      node_config_json: dict[str, JSONValue] | None = None,
  ) -> NodeExecutionContextContract:
      """Assemble the execution context handed to a node executor.

      The run state and per-node outputs come from ``_build_run_state_maps``.
      The node config is ``node_config_json`` when the caller supplies one;
      otherwise it is resolved from the run's workflow config.
      """
      state, json_by_node, text_by_node = self._build_run_state_maps(
          run_id=node_run.run_id,
      )
      if node_config_json is not None:
          effective_config = node_config_json
      else:
          effective_config = self._resolve_node_config(
              workflow_config_id=workflow_run.workflow_config_id,
              node_id=node_run.node_id,
          )
      return NodeExecutionContextContract(
          run_id=node_run.run_id,
          node_run_id=node_run.id,
          node_id=node_run.node_id,
          node_type=node_run.node_type,
          node_config_json=effective_config,
          run_state_json=state,
          node_output_json_by_node_id=json_by_node,
          node_output_text_by_node_id=text_by_node,
          worker_key=worker_key,
      )
  817. def _build_run_state_maps(
  818. self,
  819. *,
  820. run_id: str) -> tuple[
  821. dict[str, JSONValue],
  822. dict[str, dict[str, JSONValue]],
  823. dict[str, str],
  824. ]:
  825. node_runs = self.node_run_repository.list_by_run(run_id=run_id)
  826. run_state_json: dict[str, JSONValue] = {}
  827. node_output_json_by_node_id: dict[str, dict[str, JSONValue]] = {}
  828. node_output_text_by_node_id: dict[str, str] = {}
  829. for item in node_runs:
  830. if item.output_json is not None:
  831. node_output_json_by_node_id[item.node_id] = dict(item.output_json)
  832. state_updates = item.output_json.get("state_updates")
  833. if isinstance(state_updates, dict):
  834. for state_key, state_value in state_updates.items():
  835. run_state_json[str(state_key)] = state_value
  836. if item.output_text is not None:
  837. node_output_text_by_node_id[item.node_id] = item.output_text
  838. return run_state_json, node_output_json_by_node_id, node_output_text_by_node_id
  def _build_debug_snapshot(self, workflow_run: WorkflowRun) -> RuntimeDebugSnapshot:
      """Collect everything known about a run into a debug snapshot.

      The snapshot bundles the run, its node runs, the derived state/output
      maps, node ids grouped by status (pending+queued, running,
      completed+skipped, failed), and the run's execution logs, node
      artifacts and trace spans.
      """
      run_id = workflow_run.id
      node_runs = self.node_run_repository.list_by_run(run_id=run_id)
      state, json_by_node, text_by_node = self._build_run_state_maps(run_id=run_id)

      # Bucket node ids by status in one pass; each bucket keeps list order.
      queued_ids = []
      running_ids = []
      completed_ids = []
      failed_ids = []
      for item in node_runs:
          status = item.status
          if status in {"pending", "queued"}:
              queued_ids.append(item.node_id)
          if status == "running":
              running_ids.append(item.node_id)
          if status in {"completed", "skipped"}:
              completed_ids.append(item.node_id)
          if status == "failed":
              failed_ids.append(item.node_id)

      return RuntimeDebugSnapshot(
          run=workflow_run,
          node_runs=node_runs,
          run_state_json=state,
          node_output_json_by_node_id=json_by_node,
          node_output_text_by_node_id=text_by_node,
          queued_node_ids=queued_ids,
          running_node_ids=running_ids,
          completed_node_ids=completed_ids,
          failed_node_ids=failed_ids,
          execution_logs=self.execution_log_repository.list_by_scope(run_id=run_id),
          node_artifacts=self.node_artifact_repository.list_by_scope(run_id=run_id),
          trace_spans=self.trace_span_repository.list_by_scope(run_id=run_id),
      )
  866. def _build_node_timing(
  867. self,
  868. node_config_json: dict[str, JSONValue]) -> tuple[datetime, datetime | None]:
  869. now = datetime.utcnow()
  870. delay_seconds = self._read_int_value(node_config_json, "delay_seconds", default=0)
  871. timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
  872. scheduled_time = now + timedelta(seconds=max(delay_seconds, 0))
  873. timeout_time = (
  874. scheduled_time + timedelta(seconds=timeout_seconds)
  875. if timeout_seconds > 0
  876. else None
  877. )
  878. return scheduled_time, timeout_time
  879. def _node_has_timed_out(self, node_run: NodeRun) -> bool:
  880. return node_run.timeout_time is not None and node_run.timeout_time <= datetime.utcnow()
  881. def _should_retry_node(
  882. self,
  883. *,
  884. node_run: NodeRun,
  885. node_config_json: dict[str, JSONValue]) -> bool:
  886. retry_policy = self._read_dict_value(node_config_json, "retry_policy")
  887. max_attempts = self._read_int_value(retry_policy, "max_attempts", default=1)
  888. return max_attempts > node_run.attempt_no
  889. def _read_retry_delay_seconds(self, node_config_json: dict[str, JSONValue]) -> int:
  890. retry_policy = self._read_dict_value(node_config_json, "retry_policy")
  891. return self._read_int_value(retry_policy, "retry_delay_seconds", default=0)
  892. def _build_retry_timing(
  893. self,
  894. node_config_json: dict[str, JSONValue]) -> tuple[datetime, datetime | None]:
  895. retry_time = datetime.utcnow() + timedelta(
  896. seconds=self._read_retry_delay_seconds(node_config_json)
  897. )
  898. timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
  899. timeout_time = (
  900. retry_time + timedelta(seconds=timeout_seconds)
  901. if timeout_seconds > 0
  902. else None
  903. )
  904. return retry_time, timeout_time
  905. def _is_join_ready(
  906. self,
  907. *,
  908. workflow_config: WorkflowConfigContract,
  909. run_node_runs: list[NodeRun],
  910. successor_node_id: str,
  911. successor_node_type: str,
  912. successor_config: dict[str, JSONValue]) -> bool:
  913. join_policy = self._read_string_value(successor_config, "join_policy")
  914. if join_policy is None and successor_node_type != "join":
  915. return True
  916. workflow = self._parse_workflow(workflow_config)
  917. if workflow is None:
  918. return True
  919. predecessor_ids = [
  920. edge.source for edge in workflow.edges if edge.target == successor_node_id
  921. ]
  922. if not predecessor_ids:
  923. return True
  924. completed_node_ids = {
  925. item.node_id
  926. for item in run_node_runs
  927. if item.status in {"completed", "skipped"}
  928. }
  929. if join_policy in {None, "all_completed"}:
  930. return all(predecessor_id in completed_node_ids for predecessor_id in predecessor_ids)
  931. if join_policy == "any_completed":
  932. return any(predecessor_id in completed_node_ids for predecessor_id in predecessor_ids)
  933. return True
  934. def _can_schedule_repeated_node(
  935. self,
  936. node_config_json: dict[str, JSONValue],
  937. *,
  938. existing_count: int) -> bool:
  939. if existing_count == 0:
  940. return True
  941. allow_loop = self._read_bool_value(node_config_json, "allow_loop", default=False)
  942. max_iterations = self._read_int_value(node_config_json, "max_iterations", default=1)
  943. return allow_loop and existing_count < max_iterations
  944. def _schedule_compensation_node(
  945. self,
  946. *,
  947. node_run: NodeRun,
  948. node_config: dict[str, JSONValue]) -> None:
  949. compensation_node_id = self._read_string_value(node_config, "compensation_node_id")
  950. if compensation_node_id is None:
  951. compensation_config = self._read_dict_value(node_config, "compensation")
  952. compensation_node_id = self._read_string_value(compensation_config, "node_id")
  953. if compensation_node_id is None:
  954. return
  955. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  956. if workflow_run is None:
  957. return
  958. compensation_config = self._resolve_node_config(
  959. workflow_config_id=workflow_run.workflow_config_id,
  960. node_id=compensation_node_id)
  961. existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
  962. run_id=node_run.run_id,
  963. node_ids=[compensation_node_id])
  964. if existing_nodes and not self._can_schedule_repeated_node(
  965. compensation_config,
  966. existing_count=len(existing_nodes)):
  967. return
  968. compensation_node_type = self._resolve_workflow_node_type(
  969. workflow_config_id=workflow_run.workflow_config_id,
  970. node_id=compensation_node_id) or "compensation"
  971. scheduled_time, timeout_time = self._build_node_timing(compensation_config)
  972. created = self.node_run_repository.create(
  973. run_id=node_run.run_id,
  974. parent_node_run_id=node_run.id,
  975. node_id=compensation_node_id,
  976. node_type=compensation_node_type,
  977. status="queued",
  978. scheduled_time=scheduled_time,
  979. timeout_time=timeout_time)
  980. self._publish_node_run_to_queue(created)
  981. self._log_event(
  982. run_id=node_run.run_id,
  983. node_run_id=created.id,
  984. event_type="compensation_queued",
  985. message=f"compensation node queued: {compensation_node_id}",
  986. detail_json={
  987. "failed_node_id": node_run.node_id,
  988. "compensation_node_id": compensation_node_id,
  989. })
  def _parse_workflow(self, workflow_config: WorkflowConfigContract):
      """Parse the config's ``dsl_json`` with ``parse_workflow_definition``."""
      dsl_json = workflow_config.dsl_json
      return parse_workflow_definition(dsl_json)
  992. def _resolve_workflow_node_type(
  993. self,
  994. *,
  995. workflow_config_id: str,
  996. node_id: str) -> str | None:
  997. if self.workflow_client is None:
  998. return None
  999. workflow_config = self.workflow_client.get_workflow_config(
  1000. workflow_config_id=workflow_config_id)
  1001. workflow = self._parse_workflow(workflow_config)
  1002. if workflow is None:
  1003. return None
  1004. for node in workflow.nodes:
  1005. if node.id == node_id:
  1006. return node.type
  1007. return None
  1008. def _read_string_value(self, payload: dict[str, JSONValue], key: str) -> str | None:
  1009. value = payload.get(key)
  1010. if isinstance(value, str) and value:
  1011. return value
  1012. return None
  1013. def _read_bool_value(
  1014. self,
  1015. payload: dict[str, JSONValue],
  1016. key: str,
  1017. *,
  1018. default: bool) -> bool:
  1019. value = payload.get(key)
  1020. if isinstance(value, bool):
  1021. return value
  1022. return default
  1023. def _read_int_value(
  1024. self,
  1025. payload: dict[str, JSONValue],
  1026. key: str,
  1027. *,
  1028. default: int) -> int:
  1029. value = payload.get(key)
  1030. if isinstance(value, int) and not isinstance(value, bool):
  1031. return value
  1032. return default
  1033. def _read_dict_value(
  1034. self,
  1035. payload: dict[str, JSONValue],
  1036. key: str) -> dict[str, JSONValue]:
  1037. value = payload.get(key)
  1038. if isinstance(value, dict):
  1039. return {str(item_key): item_value for item_key, item_value in value.items()}
  1040. return {}
  1041. def _sync_workflow_run_status_from_nodes(self, *, run_id: str) -> None:
  1042. node_runs = self.node_run_repository.list_by_run(run_id=run_id)
  1043. if not node_runs:
  1044. return
  1045. self.workflow_run_repository.update_node_count(
  1046. run_id=run_id,
  1047. current_node_count=len(node_runs))
  1048. next_status, error_code, error_message = self._derive_run_status(node_runs)
  1049. self.workflow_run_repository.update_status(
  1050. run_id=run_id,
  1051. status=next_status,
  1052. error_code=error_code,
  1053. error_message=error_message)
  1054. self._log_event(
  1055. run_id=run_id,
  1056. node_run_id=None,
  1057. event_type="run_status_synced",
  1058. message=f"workflow run status synced to {next_status}",
  1059. detail_json={
  1060. "status": next_status,
  1061. "error_code": error_code,
  1062. })
  1063. def _derive_run_status(
  1064. self,
  1065. node_runs: list[NodeRun]) -> tuple[WorkflowRunStatus, str | None, str | None]:
  1066. statuses = {node_run.status for node_run in node_runs}
  1067. active_statuses: set[NodeRunStatus] = {"pending", "queued", "running"}
  1068. if statuses.intersection(active_statuses):
  1069. return "running", None, None
  1070. if "failed" in statuses:
  1071. failed_node = next((item for item in node_runs if item.status == "failed"), None)
  1072. error_code = failed_node.error_code if failed_node is not None else None
  1073. error_message = failed_node.error_message if failed_node is not None else None
  1074. return "failed", error_code, error_message
  1075. terminal_statuses: set[NodeRunStatus] = {"completed", "skipped"}
  1076. if statuses and statuses.issubset(terminal_statuses):
  1077. return "completed", None, None
  1078. return "running", None, None
  1079. def _log_event(
  1080. self,
  1081. *,
  1082. run_id: str,
  1083. node_run_id: str | None,
  1084. event_type: str,
  1085. message: str,
  1086. detail_json: dict[str, JSONValue] | None,
  1087. level: str = "info") -> None:
  1088. self.execution_log_repository.create(
  1089. run_id=run_id,
  1090. node_run_id=node_run_id,
  1091. event_type=event_type,
  1092. level=level,
  1093. message=message,
  1094. detail_json=detail_json)
  1095. def _publish_event(
  1096. self,
  1097. *,
  1098. event_type: str,
  1099. payload_json: dict[str, JSONValue],
  1100. workflow_run: WorkflowRun | None = None,
  1101. node_run: NodeRun | None = None) -> None:
  1102. if self.event_client is None:
  1103. return
  1104. aggregate_id = workflow_run.id if workflow_run is not None else None
  1105. if aggregate_id is None and node_run is not None:
  1106. aggregate_id = node_run.id
  1107. aggregate_type = "workflow_run" if workflow_run is not None else "node_run"
  1108. correlation_id = workflow_run.session_id if workflow_run is not None else None
  1109. if correlation_id is None and node_run is not None:
  1110. correlation_id = node_run.run_id
  1111. try:
  1112. self.event_client.publish_event(
  1113. EventPublishContract(
  1114. event_type=event_type,
  1115. source_service="runtime-service",
  1116. aggregate_type=aggregate_type,
  1117. aggregate_id=aggregate_id,
  1118. correlation_id=correlation_id,
  1119. payload_json=payload_json)
  1120. )
  1121. except EventServiceClientError:
  1122. return
  1123. def _publish_node_run_to_queue(self, node_run: NodeRun) -> None:
  1124. if node_run.status != "queued" or self.task_queue_publisher is None:
  1125. return
  1126. self.task_queue_publisher.publish_runtime_node_run(
  1127. node_run_id=node_run.id)
  def build_runtime_application_service(
      *,
      db: Session,
      settings: RuntimeServiceSettings,
  ) -> RuntimeApplicationService:
      """Wire up a ``RuntimeApplicationService`` from a DB session and settings.

      Repositories are built on ``db``; service clients are built from the
      URLs (and, for the human, knowledge and event services, the timeouts)
      in ``settings``. A task queue publisher is attached only when
      ``try_build_redis_client`` returns a client; otherwise it is ``None``.
      """
      redis_client = try_build_redis_client(settings.redis_url)

      workflow_runs = WorkflowRunRepository(db)
      node_runs = NodeRunRepository(db)
      execution_logs = ExecutionLogRepository(db)
      node_artifacts = NodeArtifactRepository(db)
      trace_spans = TraceSpanRepository(db)

      code_runner = CodeRunnerClient(base_url=settings.code_runner_service_url)
      model_gateway = ModelGatewayClient(base_url=settings.model_gateway_service_url)
      tools = ToolServiceClient(base_url=settings.tool_service_url)
      humans = HumanServiceClient(
          base_url=settings.human_service_url,
          timeout_seconds=settings.human_service_timeout_seconds,
      )
      knowledge = KnowledgeServiceClient(
          base_url=settings.knowledge_service_url,
          timeout_seconds=settings.knowledge_service_timeout_seconds,
      )
      dispatcher = build_node_execution_dispatcher_with_clients(
          code_runner_client=code_runner,
          model_gateway_client=model_gateway,
          tool_client=tools,
          human_client=humans,
          knowledge_client=knowledge,
      )

      workflow_client = WorkflowServiceClient(base_url=settings.workflow_service_url)
      event_client = EventServiceClient(
          base_url=settings.event_service_url,
          timeout_seconds=settings.event_service_timeout_seconds,
      )
      if redis_client is not None:
          publisher = TaskQueuePublisher(client=redis_client)
      else:
          publisher = None

      return RuntimeApplicationService(
          workflow_run_repository=workflow_runs,
          node_run_repository=node_runs,
          execution_log_repository=execution_logs,
          node_artifact_repository=node_artifacts,
          trace_span_repository=trace_spans,
          execution_dispatcher=dispatcher,
          workflow_client=workflow_client,
          event_client=event_client,
          task_queue_publisher=publisher,
      )