services.py 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022
  1. from datetime import datetime, timedelta
  2. from sqlalchemy.orm import Session
  3. from core_dsl import parse_workflow_definition
  4. from core_domain import (
  5. InitialNodeContract,
  6. NodeExecutionContextContract,
  7. NodeExecutionResultContract,
  8. NodeRunStatus,
  9. WorkflowVersionContract,
  10. WorkflowRunStatus,
  11. )
  12. from app.db.models import NodeRun, WorkflowRun
  13. from app.domain.repositories import (
  14. ExecutionLogRepository,
  15. NodeArtifactRepository,
  16. NodeRunRepository,
  17. TraceSpanRepository,
  18. WorkflowRunRepository,
  19. )
  20. from app.infrastructure.executors import (
  21. NodeExecutionDispatcher,
  22. build_node_execution_dispatcher_with_clients,
  23. )
  24. from app.infrastructure.code_runner_client import CodeRunnerClient
  25. from app.infrastructure.model_gateway_client import ModelGatewayClient
  26. from app.infrastructure.planner import (
  27. derive_initial_node,
  28. derive_node_config,
  29. derive_successor_nodes,
  30. )
  31. from app.infrastructure.tool_client import ToolServiceClient
  32. from app.infrastructure.workflow_client import WorkflowServiceClient
  33. from app.bootstrap.settings import RuntimeServiceSettings
  34. from app.schemas.run import (
  35. NodeRunExecuteRequest,
  36. NodeRunStatusUpdateRequest,
  37. RunCreateRequest,
  38. RunExecuteRequest,
  39. WorkflowRunStatusUpdateRequest,
  40. )
  41. from core_shared import JSONValue
  42. class RuntimeApplicationService:
  def __init__(
      self,
      workflow_run_repository: WorkflowRunRepository,
      node_run_repository: NodeRunRepository,
      execution_log_repository: ExecutionLogRepository,
      node_artifact_repository: NodeArtifactRepository,
      trace_span_repository: TraceSpanRepository,
      execution_dispatcher: NodeExecutionDispatcher,
      workflow_client: WorkflowServiceClient | None = None,
  ) -> None:
      """Store the collaborators the runtime service delegates to.

      Args:
          workflow_run_repository: Persistence for workflow runs.
          node_run_repository: Persistence for node runs.
          execution_log_repository: Persistence for execution log entries.
          node_artifact_repository: Persistence for node artifacts.
          trace_span_repository: Persistence for trace spans.
          execution_dispatcher: Resolves and invokes the executor for a node type.
          workflow_client: Optional client for fetching workflow versions. Helpers
              that need workflow definitions check it for None and fall back
              (no initial node, empty node config, no successors).
      """
      # Run / node-run state.
      self.workflow_run_repository = workflow_run_repository
      self.node_run_repository = node_run_repository
      # Observability and outputs.
      self.execution_log_repository = execution_log_repository
      self.node_artifact_repository = node_artifact_repository
      self.trace_span_repository = trace_span_repository
      # Execution and workflow-definition lookup.
      self.execution_dispatcher = execution_dispatcher
      self.workflow_client = workflow_client
  def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
      """Create a workflow run and queue its initial node run when one is known.

      The initial node comes from ``payload.initial_node``. If that is falsy,
      ``_plan_initial_node`` derives it from the workflow version, which requires
      a workflow client. When an initial node exists, the run's node count is
      set to 1, a node run is created with timing derived from the node's
      config, and a ``node_queued`` event is logged. A ``run_created`` event is
      always logged last.

      Returns:
          ``(workflow_run, node_run)``; ``node_run`` is None when no initial
          node was determined.
      """
      first_node = payload.initial_node or self._plan_initial_node(payload)
      tenant_id = payload.tenant_id
      workflow_run = self.workflow_run_repository.create(
          tenant_id=tenant_id,
          app_id=payload.app_id,
          app_version_id=payload.app_version_id,
          workflow_id=payload.workflow_id,
          workflow_version_id=payload.workflow_version_id,
          session_id=payload.session_id,
          parent_run_id=payload.parent_run_id,
          root_run_id=payload.root_run_id,
          run_type=payload.run_type,
          trigger_type=payload.trigger_type,
          priority=payload.priority,
      )
      queued_node_run: NodeRun | None = None
      if first_node is not None:
          # The run starts with exactly one node run.
          self.workflow_run_repository.update_node_count(
              run_id=workflow_run.id,
              current_node_count=1,
          )
          first_config = self._resolve_node_config(
              tenant_id=tenant_id,
              workflow_version_id=payload.workflow_version_id,
              node_id=first_node.node_id,
          )
          scheduled_time, timeout_time = self._build_node_timing(first_config)
          queued_node_run = self.node_run_repository.create(
              tenant_id=tenant_id,
              run_id=workflow_run.id,
              node_id=first_node.node_id,
              node_type=first_node.node_type,
              status=first_node.status,
              scheduled_time=scheduled_time,
              timeout_time=timeout_time,
          )
          self._log_event(
              tenant_id=tenant_id,
              run_id=workflow_run.id,
              node_run_id=queued_node_run.id,
              event_type="node_queued",
              message=f"initial node queued: {first_node.node_id}",
              detail_json={
                  "node_id": first_node.node_id,
                  "node_type": first_node.node_type,
                  "status": first_node.status,
              },
          )
      self._log_event(
          tenant_id=tenant_id,
          run_id=workflow_run.id,
          node_run_id=None if queued_node_run is None else queued_node_run.id,
          event_type="run_created",
          message="workflow run created",
          detail_json={
              "workflow_id": payload.workflow_id,
              "workflow_version_id": payload.workflow_version_id,
              "session_id": payload.session_id,
          },
      )
      return workflow_run, queued_node_run
  def list_runs(self, tenant_id: str, session_id: str | None = None) -> list[WorkflowRun]:
      """Return the tenant's workflow runs, optionally narrowed to one session.

      How a ``None`` session filter is interpreted is up to
      ``WorkflowRunRepository.list_by_scope``.
      """
      repository = self.workflow_run_repository
      return repository.list_by_scope(tenant_id=tenant_id, session_id=session_id)
  def list_node_runs(self, tenant_id: str, run_id: str) -> list[NodeRun]:
      """Return every node run recorded for ``run_id`` within ``tenant_id``."""
      node_runs = self.node_run_repository.list_by_run(run_id=run_id, tenant_id=tenant_id)
      return node_runs
  def list_execution_logs(
      self,
      tenant_id: str,
      run_id: str | None = None,
      node_run_id: str | None = None,
  ):
      """Return execution-log entries for a tenant, optionally scoped to a run or node run.

      The filters are forwarded unchanged to
      ``ExecutionLogRepository.list_by_scope``, which decides how ``None``
      filters are treated.
      """
      scope = {"tenant_id": tenant_id, "run_id": run_id, "node_run_id": node_run_id}
      return self.execution_log_repository.list_by_scope(**scope)
  def list_node_artifacts(
      self,
      tenant_id: str,
      run_id: str | None = None,
      node_run_id: str | None = None,
      artifact_type: str | None = None,
  ):
      """Return a tenant's node artifacts, optionally filtered by run, node run or type.

      The filters are forwarded unchanged to
      ``NodeArtifactRepository.list_by_scope``.
      """
      filters = {
          "tenant_id": tenant_id,
          "run_id": run_id,
          "node_run_id": node_run_id,
          "artifact_type": artifact_type,
      }
      return self.node_artifact_repository.list_by_scope(**filters)
  def list_trace_spans(
      self,
      tenant_id: str,
      run_id: str | None = None,
      node_run_id: str | None = None,
      span_type: str | None = None,
  ):
      """Return a tenant's trace spans, optionally filtered by run, node run or span type.

      The filters are forwarded unchanged to ``TraceSpanRepository.list_by_scope``.
      """
      filters = {
          "tenant_id": tenant_id,
          "run_id": run_id,
          "node_run_id": node_run_id,
          "span_type": span_type,
      }
      return self.trace_span_repository.list_by_scope(**filters)
  def update_run_status(
      self,
      run_id: str,
      payload: WorkflowRunStatusUpdateRequest,
  ) -> WorkflowRun | None:
      """Write ``payload``'s status and error fields onto workflow run ``run_id``.

      Returns whatever ``WorkflowRunRepository.update_status`` returns (typed as
      ``WorkflowRun | None``; presumably None when the run does not exist).
      """
      status, error_code, error_message = (
          payload.status,
          payload.error_code,
          payload.error_message,
      )
      return self.workflow_run_repository.update_status(
          run_id=run_id,
          status=status,
          error_code=error_code,
          error_message=error_message,
      )
  def update_node_run_status(
      self,
      node_run_id: str,
      payload: NodeRunStatusUpdateRequest,
  ) -> NodeRun | None:
      """Persist a node-run status change and run its follow-up effects.

      After storing the update this method:
        * logs a ``node_status_updated`` event,
        * queues successor nodes when the new status is ``"completed"``,
        * queues the configured compensation node when it is ``"failed"``
          (only if the owning workflow run still exists),
        * re-derives the workflow-run status from its node runs.

      Returns:
          The updated node run, or None when the repository did not return one.
          In that case none of the follow-up effects run.
      """
      updated = self.node_run_repository.update_status(
          node_run_id=node_run_id,
          status=payload.status,
          worker_key=payload.worker_key,
          error_code=payload.error_code,
          error_message=payload.error_message,
          output_text=payload.output_text,
          output_json=payload.output_json,
      )
      if updated is None:
          return None

      new_status = payload.status
      self._log_event(
          tenant_id=updated.tenant_id,
          run_id=updated.run_id,
          node_run_id=updated.id,
          event_type="node_status_updated",
          message=f"node status updated to {new_status}",
          detail_json={
              "node_id": updated.node_id,
              "node_type": updated.node_type,
              "status": new_status,
              "error_code": payload.error_code,
          },
      )

      if new_status == "completed":
          self._schedule_successor_nodes(updated)
      elif new_status == "failed":
          parent_run = self.workflow_run_repository.get_by_id(updated.run_id)
          if parent_run is not None:
              failed_node_config = self._resolve_node_config(
                  tenant_id=updated.tenant_id,
                  workflow_version_id=parent_run.workflow_version_id,
                  node_id=updated.node_id,
              )
              self._schedule_compensation_node(
                  node_run=updated,
                  node_config=failed_node_config,
              )

      self._sync_workflow_run_status_from_nodes(
          tenant_id=updated.tenant_id,
          run_id=updated.run_id,
      )
      return updated
  def execute_node_run(
      self,
      node_run_id: str,
      payload: NodeRunExecuteRequest,
  ) -> tuple[WorkflowRun, NodeRun, str] | None:
      """Execute one node run and persist its outcome.

      Flow:
        1. A node run already ``completed``, ``failed`` or ``skipped`` is
           returned unchanged, without invoking an executor.
        2. A node run whose ``timeout_time`` has passed is marked ``failed``
           with ``error_code="node_timeout"`` instead of being executed.
        3. Otherwise it is marked ``running`` and executed through
           ``execution_dispatcher`` inside a ``node_execution`` trace span.
           Afterwards it is either re-queued (the result failed and the retry
           policy allows another attempt) or moved to the executor's final
           status.

      Args:
          node_run_id: Identifier of the node run to execute.
          payload: Execution request. Its ``worker_key`` is recorded on the node
              run and in the log / trace attributes.

      Returns:
          ``(workflow_run, node_run, executor_name)`` reflecting the state after
          this call, or None when the node run or its workflow run cannot be
          found. Once the trace span has been started, every return path
          finishes it.
      """
      node_run = self.node_run_repository.get_by_id(node_run_id)
      if node_run is None:
          return None
      workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
      if workflow_run is None:
          return None
      if node_run.status in {"completed", "failed", "skipped"}:
          # Terminal node runs are reported as-is and never re-executed.
          executor_name = self.execution_dispatcher.resolve_executor(
              node_run.node_type
          ).executor_name
          return workflow_run, node_run, executor_name
      node_config = self._resolve_node_config(
          tenant_id=node_run.tenant_id,
          workflow_version_id=workflow_run.workflow_version_id,
          node_id=node_run.node_id,
      )
      if self._node_has_timed_out(node_run):
          # The deadline passed before execution started. Routing through
          # update_node_run_status also runs the failure follow-ups
          # (compensation scheduling, workflow-run status sync).
          timed_out_node_run = self.update_node_run_status(
              node_run_id=node_run.id,
              payload=NodeRunStatusUpdateRequest(
                  status="failed",
                  worker_key=payload.worker_key,
                  error_code="node_timeout",
                  error_message=f"node timed out: {node_run.node_id}",
                  output_json={
                      "timeout_time": node_run.timeout_time.isoformat()
                      if node_run.timeout_time is not None
                      else None,
                  },
              ),
          )
          if timed_out_node_run is None:
              return None
          executor_name = self.execution_dispatcher.resolve_executor(
              node_run.node_type
          ).executor_name
          # Re-read the workflow run. The status sync above presumably changed
          # it, and the retry and final paths below also return a fresh copy.
          refreshed_run = self.workflow_run_repository.get_by_id(timed_out_node_run.run_id)
          if refreshed_run is None:
              return None
          return refreshed_run, timed_out_node_run, executor_name

      running_node_run = self.node_run_repository.update_status(
          node_run_id=node_run_id,
          status="running",
          worker_key=payload.worker_key,
      )
      if running_node_run is None:
          return None
      self._log_event(
          tenant_id=running_node_run.tenant_id,
          run_id=running_node_run.run_id,
          node_run_id=running_node_run.id,
          event_type="node_execution_started",
          message=f"executing node {running_node_run.node_id}",
          detail_json={
              "node_id": running_node_run.node_id,
              "node_type": running_node_run.node_type,
              "worker_key": payload.worker_key,
          },
      )
      context = self._build_execution_context(
          workflow_run=workflow_run,
          node_run=running_node_run,
          worker_key=payload.worker_key,
          node_config_json=node_config,
      )
      executor_name = self.execution_dispatcher.resolve_executor(
          running_node_run.node_type
      ).executor_name
      trace_span = self.trace_span_repository.start(
          tenant_id=running_node_run.tenant_id,
          run_id=running_node_run.run_id,
          node_run_id=running_node_run.id,
          parent_span_id=None,
          span_type="node_execution",
          name=f"{running_node_run.node_type}:{running_node_run.node_id}",
          attributes_json={
              "node_id": running_node_run.node_id,
              "node_type": running_node_run.node_type,
              "executor_name": executor_name,
              "worker_key": payload.worker_key,
          },
      )
      try:
          result, executor_name = self.execution_dispatcher.execute(
              context=context,
              request=payload,
          )
      except Exception as exc:
          # Executor errors become an ordinary failed result, so they flow
          # through the same retry / final-status handling as reported failures.
          result = NodeExecutionResultContract(
              status="failed",
              worker_key=payload.worker_key,
              error_code="executor_error",
              error_message=str(exc),
          )

      if result.status == "failed" and self._should_retry_node(
          node_run=running_node_run,
          node_config_json=context.node_config_json,
      ):
          retry_time, retry_timeout_time = self._build_retry_timing(context.node_config_json)
          retried_node_run = self.node_run_repository.requeue_for_retry(
              node_run_id=running_node_run.id,
              scheduled_time=retry_time,
              timeout_time=retry_timeout_time,
              error_code=result.error_code,
              error_message=result.error_message,
              output_text=result.output_text,
              output_json={
                  **(result.output_json or {}),
                  "retry_scheduled_time": retry_time.isoformat(),
                  "retry_reason": result.error_code or "node_failed",
              },
          )
          if retried_node_run is None:
              # The node run vanished while being re-queued. Close the span
              # anyway; nothing else in this method would finish it.
              self.trace_span_repository.finish(
                  span_id=trace_span.id,
                  status="error",
                  error_code=result.error_code,
                  error_message=result.error_message,
                  attributes_json={
                      "executor_name": executor_name,
                      "retry_scheduled": False,
                      "node_run_missing": True,
                  },
              )
              return None
          self.trace_span_repository.finish(
              span_id=trace_span.id,
              status="error",
              error_code=result.error_code,
              error_message=result.error_message,
              attributes_json={
                  "node_status": "queued",
                  "executor_name": executor_name,
                  "retry_scheduled": True,
                  "attempt_no": retried_node_run.attempt_no,
              },
          )
          self._log_event(
              tenant_id=retried_node_run.tenant_id,
              run_id=retried_node_run.run_id,
              node_run_id=retried_node_run.id,
              event_type="node_retry_scheduled",
              message=f"node retry scheduled: {retried_node_run.node_id}",
              detail_json={
                  "node_id": retried_node_run.node_id,
                  "attempt_no": retried_node_run.attempt_no,
                  "scheduled_time": retry_time.isoformat(),
                  "error_code": result.error_code,
              },
          )
          self._sync_workflow_run_status_from_nodes(
              tenant_id=retried_node_run.tenant_id,
              run_id=retried_node_run.run_id,
          )
          workflow_run = self.workflow_run_repository.get_by_id(retried_node_run.run_id)
          if workflow_run is None:
              return None
          return workflow_run, retried_node_run, executor_name

      final_node_run = self.update_node_run_status(
          node_run_id=running_node_run.id,
          payload=NodeRunStatusUpdateRequest(
              status=result.status,
              worker_key=result.worker_key,
              error_code=result.error_code,
              error_message=result.error_message,
              output_text=result.output_text,
              output_json=result.output_json,
          ),
      )
      if final_node_run is None:
          # The node run vanished before its result could be stored. Close the
          # span from the executor's result so it is not left open.
          self.trace_span_repository.finish(
              span_id=trace_span.id,
              status="ok" if result.status == "completed" else "error",
              error_code=result.error_code,
              error_message=result.error_message,
              attributes_json={
                  "node_status": result.status,
                  "executor_name": executor_name,
                  "node_run_missing": True,
              },
          )
          return None
      self.trace_span_repository.finish(
          span_id=trace_span.id,
          status="ok" if final_node_run.status == "completed" else "error",
          error_code=final_node_run.error_code,
          error_message=final_node_run.error_message,
          attributes_json={
              "node_status": final_node_run.status,
              "executor_name": executor_name,
              "has_output_text": final_node_run.output_text is not None,
              "has_output_json": final_node_run.output_json is not None,
          },
      )
      self._persist_node_execution_artifact(final_node_run)
      self._log_event(
          tenant_id=final_node_run.tenant_id,
          run_id=final_node_run.run_id,
          node_run_id=final_node_run.id,
          event_type="node_execution_finished",
          message=f"node execution finished with status {final_node_run.status}",
          detail_json={
              "node_id": final_node_run.node_id,
              "node_type": final_node_run.node_type,
              "executor_name": executor_name,
              "status": final_node_run.status,
          },
      )
      workflow_run = self.workflow_run_repository.get_by_id(final_node_run.run_id)
      if workflow_run is None:
          return None
      return workflow_run, final_node_run, executor_name
  def execute_next_node_run(
      self,
      tenant_id: str,
      run_id: str,
      payload: NodeRunExecuteRequest,
  ) -> tuple[WorkflowRun, NodeRun, str] | None:
      """Execute the next queued node run of ``run_id``, if there is one.

      Returns None when the repository reports no queued node run; otherwise
      returns whatever ``execute_node_run`` returns for it.
      """
      candidate = self.node_run_repository.get_next_queued_by_run(
          tenant_id=tenant_id,
          run_id=run_id,
      )
      if candidate is not None:
          return self.execute_node_run(node_run_id=candidate.id, payload=payload)
      return None
  def execute_run(
      self,
      tenant_id: str,
      run_id: str,
      payload: RunExecuteRequest,
  ) -> tuple[WorkflowRun, list[NodeRun], list[str]] | None:
      """Execute up to ``payload.max_steps`` queued node runs of ``run_id`` in sequence.

      Stops early when no queued node run is left, or when a step's node run
      ends in any status other than ``"completed"`` (for example failed, or
      re-queued for retry).

      Returns:
          ``(final_workflow_run, executed_node_runs, executor_names)``, or None
          when the run does not exist, belongs to another tenant, or cannot be
          re-read at the end.
      """
      existing_run = self.workflow_run_repository.get_by_id(run_id)
      if existing_run is None or existing_run.tenant_id != tenant_id:
          return None
      executed_node_runs: list[NodeRun] = []
      executor_names: list[str] = []
      for _step in range(payload.max_steps):
          outcome = self.execute_next_node_run(
              tenant_id=tenant_id,
              run_id=run_id,
              payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
          )
          if outcome is None:
              break
          _, step_node_run, step_executor_name = outcome
          executed_node_runs.append(step_node_run)
          executor_names.append(step_executor_name)
          if step_node_run.status != "completed":
              break
      final_run = self.workflow_run_repository.get_by_id(run_id)
      if final_run is None:
          return None
      return final_run, executed_node_runs, executor_names
  def execute_next_claimed_node_run(
      self,
      *,
      worker_key: str,
      lease_seconds: int,
  ) -> tuple[WorkflowRun, NodeRun, str, int] | None:
      """Release expired leases, then claim and execute the next queued node run.

      Lease timestamps are naive UTC datetimes (``datetime.utcnow``). The new
      lease expires ``lease_seconds`` from now.

      Returns:
          ``(workflow_run, node_run, executor_name, released_lease_count)``, or
          None when nothing could be claimed or ``execute_node_run`` returned
          None for the claimed node run.
      """
      repository = self.node_run_repository
      released_lease_count = repository.release_expired_leases(
          now_time=datetime.utcnow(),
      )
      lease_expire_time = datetime.utcnow() + timedelta(seconds=lease_seconds)
      claimed = repository.claim_next_queued(
          worker_key=worker_key,
          lease_expire_time=lease_expire_time,
      )
      if claimed is None:
          return None
      execution = self.execute_node_run(
          node_run_id=claimed.id,
          payload=NodeRunExecuteRequest(worker_key=worker_key),
      )
      if execution is None:
          return None
      workflow_run, node_run, executor_name = execution
      return workflow_run, node_run, executor_name, released_lease_count
  477. def _persist_node_execution_artifact(self, node_run: NodeRun) -> None:
  478. if node_run.output_text is None and node_run.output_json is None:
  479. return
  480. size_bytes = len(node_run.output_text.encode("utf-8")) if node_run.output_text else None
  481. self.node_artifact_repository.create(
  482. tenant_id=node_run.tenant_id,
  483. run_id=node_run.run_id,
  484. node_run_id=node_run.id,
  485. node_id=node_run.node_id,
  486. artifact_type="execution_result",
  487. name=f"{node_run.node_id}-execution-result",
  488. mime_type="application/json" if node_run.output_json is not None else "text/plain",
  489. content_text=node_run.output_text,
  490. content_json=node_run.output_json,
  491. size_bytes=size_bytes,
  492. )
  493. def _plan_initial_node(self, payload: RunCreateRequest) -> InitialNodeContract | None:
  494. if self.workflow_client is None:
  495. return None
  496. workflow_version = self.workflow_client.get_workflow_version(
  497. tenant_id=payload.tenant_id,
  498. workflow_version_id=payload.workflow_version_id,
  499. )
  500. return derive_initial_node(workflow_version)
  501. def _resolve_node_config(
  502. self,
  503. *,
  504. tenant_id: str,
  505. workflow_version_id: str,
  506. node_id: str,
  507. ) -> dict[str, JSONValue]:
  508. if self.workflow_client is None:
  509. return {}
  510. workflow_version = self.workflow_client.get_workflow_version(
  511. tenant_id=tenant_id,
  512. workflow_version_id=workflow_version_id,
  513. )
  514. return derive_node_config(workflow_version, node_id)
  def _schedule_successor_nodes(self, node_run: NodeRun) -> None:
      """Queue node runs for the successors of a completed node run.

      Successors come from ``derive_successor_nodes``, evaluated against the
      node's own ``output_json`` and the run-wide state / output maps. A
      successor is not queued when:
        * ``_is_join_ready`` reports its predecessors are not done yet (a
          ``join_waiting`` event is logged instead), or
        * it already has node runs and ``_can_schedule_repeated_node`` refuses
          another one.
      This is a no-op without a workflow client or when the workflow run is
      missing.
      """
      if self.workflow_client is None:
          return
      tenant_id = node_run.tenant_id
      run_id = node_run.run_id
      parent_run = self.workflow_run_repository.get_by_id(run_id)
      if parent_run is None:
          return
      version = self.workflow_client.get_workflow_version(
          tenant_id=tenant_id,
          workflow_version_id=parent_run.workflow_version_id,
      )
      state_json, outputs_json, outputs_text = self._build_run_state_maps(
          tenant_id=tenant_id,
          run_id=run_id,
      )
      successors = derive_successor_nodes(
          version,
          node_run.node_id,
          current_output_json=node_run.output_json,
          run_state_json=state_json,
          node_output_json_by_node_id=outputs_json,
          node_output_text_by_node_id=outputs_text,
      )
      if not successors:
          return

      # How many node runs already exist per successor node id.
      runs_per_node_id: dict[str, int] = {}
      for existing in self.node_run_repository.list_by_run_and_node_ids(
          tenant_id=tenant_id,
          run_id=run_id,
          node_ids=[candidate.node_id for candidate in successors],
      ):
          runs_per_node_id[existing.node_id] = runs_per_node_id.get(existing.node_id, 0) + 1

      for candidate in successors:
          candidate_config = derive_node_config(version, candidate.node_id)
          # The run's node runs are re-read for every successor rather than once
          # up front, so node runs created earlier in this loop can be seen by
          # later join checks (assuming the repository reads its own writes).
          join_ready = self._is_join_ready(
              workflow_version=version,
              run_node_runs=self.node_run_repository.list_by_run(
                  tenant_id=tenant_id,
                  run_id=run_id,
              ),
              successor_node_id=candidate.node_id,
              successor_node_type=candidate.node_type,
              successor_config=candidate_config,
          )
          if not join_ready:
              self._log_event(
                  tenant_id=tenant_id,
                  run_id=run_id,
                  node_run_id=None,
                  event_type="join_waiting",
                  message=f"join node waiting for predecessors: {candidate.node_id}",
                  detail_json={
                      "node_id": candidate.node_id,
                      "source_node_id": node_run.node_id,
                  },
              )
              continue
          already_scheduled = runs_per_node_id.get(candidate.node_id, 0)
          if not self._can_schedule_repeated_node(
              candidate_config,
              existing_count=already_scheduled,
          ):
              continue
          scheduled_time, timeout_time = self._build_node_timing(candidate_config)
          self.node_run_repository.create(
              tenant_id=tenant_id,
              run_id=run_id,
              parent_node_run_id=node_run.id,
              node_id=candidate.node_id,
              node_type=candidate.node_type,
              status=candidate.status,
              scheduled_time=scheduled_time,
              timeout_time=timeout_time,
          )
          runs_per_node_id[candidate.node_id] = already_scheduled + 1
          self._log_event(
              tenant_id=tenant_id,
              run_id=run_id,
              node_run_id=None,
              event_type="node_queued",
              message=f"successor node queued: {candidate.node_id}",
              detail_json={
                  "node_id": candidate.node_id,
                  "node_type": candidate.node_type,
                  "status": candidate.status,
                  "source_node_id": node_run.node_id,
              },
          )
  def _build_execution_context(
      self,
      *,
      workflow_run: WorkflowRun,
      node_run: NodeRun,
      worker_key: str | None,
      node_config_json: dict[str, JSONValue] | None = None,
  ) -> NodeExecutionContextContract:
      """Assemble the executor input for ``node_run``.

      The run-wide state and per-node output maps come from
      ``_build_run_state_maps``. When ``node_config_json`` is not supplied, it
      is resolved from the workflow run's version after those maps are built.
      """
      state_json, outputs_json, outputs_text = self._build_run_state_maps(
          tenant_id=node_run.tenant_id,
          run_id=node_run.run_id,
      )
      if node_config_json is None:
          node_config_json = self._resolve_node_config(
              tenant_id=node_run.tenant_id,
              workflow_version_id=workflow_run.workflow_version_id,
              node_id=node_run.node_id,
          )
      return NodeExecutionContextContract(
          tenant_id=node_run.tenant_id,
          run_id=node_run.run_id,
          node_run_id=node_run.id,
          node_id=node_run.node_id,
          node_type=node_run.node_type,
          node_config_json=node_config_json,
          run_state_json=state_json,
          node_output_json_by_node_id=outputs_json,
          node_output_text_by_node_id=outputs_text,
          worker_key=worker_key,
      )
  637. def _build_run_state_maps(
  638. self,
  639. *,
  640. tenant_id: str,
  641. run_id: str,
  642. ) -> tuple[
  643. dict[str, JSONValue],
  644. dict[str, dict[str, JSONValue]],
  645. dict[str, str],
  646. ]:
  647. node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  648. run_state_json: dict[str, JSONValue] = {}
  649. node_output_json_by_node_id: dict[str, dict[str, JSONValue]] = {}
  650. node_output_text_by_node_id: dict[str, str] = {}
  651. for item in node_runs:
  652. if item.output_json is not None:
  653. node_output_json_by_node_id[item.node_id] = dict(item.output_json)
  654. state_updates = item.output_json.get("state_updates")
  655. if isinstance(state_updates, dict):
  656. for state_key, state_value in state_updates.items():
  657. run_state_json[str(state_key)] = state_value
  658. if item.output_text is not None:
  659. node_output_text_by_node_id[item.node_id] = item.output_text
  660. return run_state_json, node_output_json_by_node_id, node_output_text_by_node_id
  661. def _build_node_timing(
  662. self,
  663. node_config_json: dict[str, JSONValue],
  664. ) -> tuple[datetime, datetime | None]:
  665. now = datetime.utcnow()
  666. delay_seconds = self._read_int_value(node_config_json, "delay_seconds", default=0)
  667. timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
  668. scheduled_time = now + timedelta(seconds=max(delay_seconds, 0))
  669. timeout_time = (
  670. scheduled_time + timedelta(seconds=timeout_seconds)
  671. if timeout_seconds > 0
  672. else None
  673. )
  674. return scheduled_time, timeout_time
  675. def _node_has_timed_out(self, node_run: NodeRun) -> bool:
  676. return node_run.timeout_time is not None and node_run.timeout_time <= datetime.utcnow()
  677. def _should_retry_node(
  678. self,
  679. *,
  680. node_run: NodeRun,
  681. node_config_json: dict[str, JSONValue],
  682. ) -> bool:
  683. retry_policy = self._read_dict_value(node_config_json, "retry_policy")
  684. max_attempts = self._read_int_value(retry_policy, "max_attempts", default=1)
  685. return max_attempts > node_run.attempt_no
  686. def _read_retry_delay_seconds(self, node_config_json: dict[str, JSONValue]) -> int:
  687. retry_policy = self._read_dict_value(node_config_json, "retry_policy")
  688. return self._read_int_value(retry_policy, "retry_delay_seconds", default=0)
  689. def _build_retry_timing(
  690. self,
  691. node_config_json: dict[str, JSONValue],
  692. ) -> tuple[datetime, datetime | None]:
  693. retry_time = datetime.utcnow() + timedelta(
  694. seconds=self._read_retry_delay_seconds(node_config_json)
  695. )
  696. timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
  697. timeout_time = (
  698. retry_time + timedelta(seconds=timeout_seconds)
  699. if timeout_seconds > 0
  700. else None
  701. )
  702. return retry_time, timeout_time
  703. def _is_join_ready(
  704. self,
  705. *,
  706. workflow_version: WorkflowVersionContract,
  707. run_node_runs: list[NodeRun],
  708. successor_node_id: str,
  709. successor_node_type: str,
  710. successor_config: dict[str, JSONValue],
  711. ) -> bool:
  712. join_policy = self._read_string_value(successor_config, "join_policy")
  713. if join_policy is None and successor_node_type != "join":
  714. return True
  715. workflow = self._parse_workflow(workflow_version)
  716. if workflow is None:
  717. return True
  718. predecessor_ids = [
  719. edge.source for edge in workflow.edges if edge.target == successor_node_id
  720. ]
  721. if not predecessor_ids:
  722. return True
  723. completed_node_ids = {
  724. item.node_id
  725. for item in run_node_runs
  726. if item.status in {"completed", "skipped"}
  727. }
  728. if join_policy in {None, "all_completed"}:
  729. return all(predecessor_id in completed_node_ids for predecessor_id in predecessor_ids)
  730. if join_policy == "any_completed":
  731. return any(predecessor_id in completed_node_ids for predecessor_id in predecessor_ids)
  732. return True
  733. def _can_schedule_repeated_node(
  734. self,
  735. node_config_json: dict[str, JSONValue],
  736. *,
  737. existing_count: int,
  738. ) -> bool:
  739. if existing_count == 0:
  740. return True
  741. allow_loop = self._read_bool_value(node_config_json, "allow_loop", default=False)
  742. max_iterations = self._read_int_value(node_config_json, "max_iterations", default=1)
  743. return allow_loop and existing_count < max_iterations
  def _schedule_compensation_node(
      self,
      *,
      node_run: NodeRun,
      node_config: dict[str, JSONValue],
  ) -> None:
      """Queue the compensation node declared for ``node_run``, if one applies.

      The target node id is read from ``node_config["compensation_node_id"]``.
      If that is missing, empty or not a string, it is read from
      ``node_config["compensation"]["node_id"]`` instead.

      The method returns without creating anything when:
      - no target is configured,
      - the workflow run for ``node_run.run_id`` is not found, or
      - the target already has node runs and ``_can_schedule_repeated_node``
        rejects another one.

      Otherwise it creates a ``queued`` node run parented to ``node_run`` and
      logs a ``compensation_queued`` event.
      """
      target_node_id = self._read_string_value(node_config, "compensation_node_id")
      if target_node_id is None:
          # Fall back to the nested form: {"compensation": {"node_id": "..."}}.
          nested_section = self._read_dict_value(node_config, "compensation")
          target_node_id = self._read_string_value(nested_section, "node_id")
          if target_node_id is None:
              return

      workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
      if workflow_run is None:
          return

      # Config of the compensation target itself, not ``node_config``.
      target_config = self._resolve_node_config(
          tenant_id=node_run.tenant_id,
          workflow_version_id=workflow_run.workflow_version_id,
          node_id=target_node_id,
      )
      prior_runs = self.node_run_repository.list_by_run_and_node_ids(
          tenant_id=node_run.tenant_id,
          run_id=node_run.run_id,
          node_ids=[target_node_id],
      )
      if prior_runs:
          # An already-run target is subject to its own repeat rules.
          may_repeat = self._can_schedule_repeated_node(
              target_config,
              existing_count=len(prior_runs),
          )
          if not may_repeat:
              return

      target_type = self._resolve_workflow_node_type(
          tenant_id=node_run.tenant_id,
          workflow_version_id=workflow_run.workflow_version_id,
          node_id=target_node_id,
      )
      if not target_type:
          target_type = "compensation"
      scheduled_time, timeout_time = self._build_node_timing(target_config)
      queued_run = self.node_run_repository.create(
          tenant_id=node_run.tenant_id,
          run_id=node_run.run_id,
          parent_node_run_id=node_run.id,
          node_id=target_node_id,
          node_type=target_type,
          status="queued",
          scheduled_time=scheduled_time,
          timeout_time=timeout_time,
      )
      self._log_event(
          tenant_id=node_run.tenant_id,
          run_id=node_run.run_id,
          node_run_id=queued_run.id,
          event_type="compensation_queued",
          message=f"compensation node queued: {target_node_id}",
          detail_json={
              "failed_node_id": node_run.node_id,
              "compensation_node_id": target_node_id,
          },
      )
  def _parse_workflow(self, workflow_version: WorkflowVersionContract):
      """Parse ``workflow_version.dsl_json`` with ``parse_workflow_definition``.

      The parser's result is returned unchanged.
      """
      dsl_payload = workflow_version.dsl_json
      return parse_workflow_definition(dsl_payload)
  803. def _resolve_workflow_node_type(
  804. self,
  805. *,
  806. tenant_id: str,
  807. workflow_version_id: str,
  808. node_id: str,
  809. ) -> str | None:
  810. if self.workflow_client is None:
  811. return None
  812. workflow_version = self.workflow_client.get_workflow_version(
  813. tenant_id=tenant_id,
  814. workflow_version_id=workflow_version_id,
  815. )
  816. workflow = self._parse_workflow(workflow_version)
  817. if workflow is None:
  818. return None
  819. for node in workflow.nodes:
  820. if node.id == node_id:
  821. return node.type
  822. return None
  823. def _read_string_value(self, payload: dict[str, JSONValue], key: str) -> str | None:
  824. value = payload.get(key)
  825. if isinstance(value, str) and value:
  826. return value
  827. return None
  828. def _read_bool_value(
  829. self,
  830. payload: dict[str, JSONValue],
  831. key: str,
  832. *,
  833. default: bool,
  834. ) -> bool:
  835. value = payload.get(key)
  836. if isinstance(value, bool):
  837. return value
  838. return default
  839. def _read_int_value(
  840. self,
  841. payload: dict[str, JSONValue],
  842. key: str,
  843. *,
  844. default: int,
  845. ) -> int:
  846. value = payload.get(key)
  847. if isinstance(value, int) and not isinstance(value, bool):
  848. return value
  849. return default
  850. def _read_dict_value(
  851. self,
  852. payload: dict[str, JSONValue],
  853. key: str,
  854. ) -> dict[str, JSONValue]:
  855. value = payload.get(key)
  856. if isinstance(value, dict):
  857. return {str(item_key): item_value for item_key, item_value in value.items()}
  858. return {}
  859. def _sync_workflow_run_status_from_nodes(self, *, tenant_id: str, run_id: str) -> None:
  860. node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  861. if not node_runs:
  862. return
  863. self.workflow_run_repository.update_node_count(
  864. run_id=run_id,
  865. current_node_count=len(node_runs),
  866. )
  867. next_status, error_code, error_message = self._derive_run_status(node_runs)
  868. self.workflow_run_repository.update_status(
  869. run_id=run_id,
  870. status=next_status,
  871. error_code=error_code,
  872. error_message=error_message,
  873. )
  874. self._log_event(
  875. tenant_id=tenant_id,
  876. run_id=run_id,
  877. node_run_id=None,
  878. event_type="run_status_synced",
  879. message=f"workflow run status synced to {next_status}",
  880. detail_json={
  881. "status": next_status,
  882. "error_code": error_code,
  883. },
  884. )
  885. def _derive_run_status(
  886. self,
  887. node_runs: list[NodeRun],
  888. ) -> tuple[WorkflowRunStatus, str | None, str | None]:
  889. statuses = {node_run.status for node_run in node_runs}
  890. active_statuses: set[NodeRunStatus] = {"pending", "queued", "running"}
  891. if statuses.intersection(active_statuses):
  892. return "running", None, None
  893. if "failed" in statuses:
  894. failed_node = next((item for item in node_runs if item.status == "failed"), None)
  895. error_code = failed_node.error_code if failed_node is not None else None
  896. error_message = failed_node.error_message if failed_node is not None else None
  897. return "failed", error_code, error_message
  898. terminal_statuses: set[NodeRunStatus] = {"completed", "skipped"}
  899. if statuses and statuses.issubset(terminal_statuses):
  900. return "completed", None, None
  901. return "running", None, None
  902. def _log_event(
  903. self,
  904. *,
  905. tenant_id: str,
  906. run_id: str,
  907. node_run_id: str | None,
  908. event_type: str,
  909. message: str,
  910. detail_json: dict[str, JSONValue] | None,
  911. level: str = "info",
  912. ) -> None:
  913. self.execution_log_repository.create(
  914. tenant_id=tenant_id,
  915. run_id=run_id,
  916. node_run_id=node_run_id,
  917. event_type=event_type,
  918. level=level,
  919. message=message,
  920. detail_json=detail_json,
  921. )
  def build_runtime_application_service(
      *,
      db: Session,
      settings: RuntimeServiceSettings,
  ) -> RuntimeApplicationService:
      """Wire a ``RuntimeApplicationService`` around one database session.

      All five repositories share ``db``. The code-runner, model-gateway, tool
      and workflow service clients take their base URLs from ``settings``.
      Objects are constructed in the same order as the service's keyword
      arguments.
      """
      workflow_runs = WorkflowRunRepository(db)
      node_runs = NodeRunRepository(db)
      execution_logs = ExecutionLogRepository(db)
      node_artifacts = NodeArtifactRepository(db)
      trace_spans = TraceSpanRepository(db)
      dispatcher = build_node_execution_dispatcher_with_clients(
          code_runner_client=CodeRunnerClient(base_url=settings.code_runner_service_url),
          model_gateway_client=ModelGatewayClient(base_url=settings.model_gateway_service_url),
          tool_client=ToolServiceClient(base_url=settings.tool_service_url),
      )
      workflow_client = WorkflowServiceClient(base_url=settings.workflow_service_url)
      return RuntimeApplicationService(
          workflow_run_repository=workflow_runs,
          node_run_repository=node_runs,
          execution_log_repository=execution_logs,
          node_artifact_repository=node_artifacts,
          trace_span_repository=trace_spans,
          execution_dispatcher=dispatcher,
          workflow_client=workflow_client,
      )