services.py 54 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460
  1. from dataclasses import dataclass
  2. from datetime import datetime, timedelta
  3. from sqlalchemy.orm import Session
  4. from core_dsl import parse_workflow_definition
  5. from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
  6. from core_domain import (
  7. InitialNodeContract,
  8. NodeExecutionContextContract,
  9. NodeExecutionResultContract,
  10. NodeRunStatus,
  11. WorkflowVersionContract,
  12. WorkflowRunStatus,
  13. )
  14. from app.db.models import ExecutionLog, NodeArtifact, NodeRun, TraceSpan, WorkflowRun
  15. from app.domain.repositories import (
  16. ExecutionLogRepository,
  17. NodeArtifactRepository,
  18. NodeRunRepository,
  19. TraceSpanRepository,
  20. WorkflowRunRepository,
  21. )
  22. from app.infrastructure.executors import (
  23. NodeExecutionDispatcher,
  24. build_node_execution_dispatcher_with_clients,
  25. )
  26. from app.infrastructure.human_client import HumanServiceClient
  27. from app.infrastructure.knowledge_client import KnowledgeServiceClient
  28. from app.infrastructure.code_runner_client import CodeRunnerClient
  29. from app.infrastructure.model_gateway_client import ModelGatewayClient
  30. from app.infrastructure.planner import (
  31. derive_initial_node,
  32. derive_node_config,
  33. derive_successor_nodes,
  34. )
  35. from app.infrastructure.tool_client import ToolServiceClient
  36. from app.infrastructure.workflow_client import WorkflowServiceClient
  37. from app.bootstrap.settings import RuntimeServiceSettings
  38. from app.schemas.run import (
  39. NodeRunExecuteRequest,
  40. HumanNodeResumeRequest,
  41. NodeRunStatusUpdateRequest,
  42. RunCreateRequest,
  43. RunExecuteRequest,
  44. RuntimeDebugContinueRequest,
  45. WorkflowRunStatusUpdateRequest,
  46. )
  47. from core_shared import JSONValue, try_build_redis_client
  48. from core_shared.task_queue import TaskQueuePublisher
@dataclass(frozen=True)
class RuntimeDebugSnapshot:
    """Immutable view of one workflow run, returned by the runtime debug operations.

    ``RuntimeApplicationService.get_debug_snapshot``, ``pause_run``,
    ``resume_run``, ``step_debug_run`` and ``continue_debug_run`` all return
    (or embed) an instance produced by ``_build_debug_snapshot``, which is
    defined outside this excerpt. Being frozen, a snapshot cannot be mutated
    after construction.
    """

    # The workflow run this snapshot describes.
    run: WorkflowRun
    # Node runs of the run (NOTE(review): ordering is decided by the builder — confirm).
    node_runs: list[NodeRun]
    # Run-level state document; its schema is defined by the snapshot builder.
    run_state_json: dict[str, JSONValue]
    # Node outputs, keyed by node id per the field names.
    node_output_json_by_node_id: dict[str, dict[str, JSONValue]]
    node_output_text_by_node_id: dict[str, str]
    # Node ids presumably bucketed by node-run status (per the field names).
    queued_node_ids: list[str]
    running_node_ids: list[str]
    completed_node_ids: list[str]
    failed_node_ids: list[str]
    # Observability records attached to the run.
    execution_logs: list[ExecutionLog]
    node_artifacts: list[NodeArtifact]
    trace_spans: list[TraceSpan]
  63. class RuntimeApplicationService:
  64. def __init__(
  65. self,
  66. workflow_run_repository: WorkflowRunRepository,
  67. node_run_repository: NodeRunRepository,
  68. execution_log_repository: ExecutionLogRepository,
  69. node_artifact_repository: NodeArtifactRepository,
  70. trace_span_repository: TraceSpanRepository,
  71. execution_dispatcher: NodeExecutionDispatcher,
  72. workflow_client: WorkflowServiceClient | None = None,
  73. event_client: EventServiceClient | None = None,
  74. task_queue_publisher: TaskQueuePublisher | None = None,
  75. ) -> None:
  76. self.workflow_run_repository = workflow_run_repository
  77. self.node_run_repository = node_run_repository
  78. self.execution_log_repository = execution_log_repository
  79. self.node_artifact_repository = node_artifact_repository
  80. self.trace_span_repository = trace_span_repository
  81. self.execution_dispatcher = execution_dispatcher
  82. self.workflow_client = workflow_client
  83. self.event_client = event_client
  84. self.task_queue_publisher = task_queue_publisher
  85. def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
  86. initial_node = payload.initial_node or self._plan_initial_node(payload)
  87. workflow_run = self.workflow_run_repository.create(
  88. tenant_id=payload.tenant_id,
  89. app_id=payload.app_id,
  90. app_version_id=payload.app_version_id,
  91. workflow_id=payload.workflow_id,
  92. workflow_version_id=payload.workflow_version_id,
  93. session_id=payload.session_id,
  94. parent_run_id=payload.parent_run_id,
  95. root_run_id=payload.root_run_id,
  96. run_type=payload.run_type,
  97. trigger_type=payload.trigger_type,
  98. priority=payload.priority,
  99. )
  100. node_run = None
  101. if initial_node is not None:
  102. self.workflow_run_repository.update_node_count(
  103. run_id=workflow_run.id,
  104. current_node_count=1,
  105. )
  106. initial_config = self._resolve_node_config(
  107. tenant_id=payload.tenant_id,
  108. workflow_version_id=payload.workflow_version_id,
  109. node_id=initial_node.node_id,
  110. )
  111. scheduled_time, timeout_time = self._build_node_timing(initial_config)
  112. node_run = self.node_run_repository.create(
  113. tenant_id=payload.tenant_id,
  114. run_id=workflow_run.id,
  115. node_id=initial_node.node_id,
  116. node_type=initial_node.node_type,
  117. status=initial_node.status,
  118. scheduled_time=scheduled_time,
  119. timeout_time=timeout_time,
  120. )
  121. self._log_event(
  122. tenant_id=payload.tenant_id,
  123. run_id=workflow_run.id,
  124. node_run_id=node_run.id,
  125. event_type="node_queued",
  126. message=f"initial node queued: {initial_node.node_id}",
  127. detail_json={
  128. "node_id": initial_node.node_id,
  129. "node_type": initial_node.node_type,
  130. "status": initial_node.status,
  131. },
  132. )
  133. self._publish_node_run_to_queue(node_run)
  134. self._log_event(
  135. tenant_id=payload.tenant_id,
  136. run_id=workflow_run.id,
  137. node_run_id=node_run.id if node_run is not None else None,
  138. event_type="run_created",
  139. message="workflow run created",
  140. detail_json={
  141. "workflow_id": payload.workflow_id,
  142. "workflow_version_id": payload.workflow_version_id,
  143. "session_id": payload.session_id,
  144. },
  145. )
  146. self._publish_event(
  147. event_type="workflow.run.created",
  148. workflow_run=workflow_run,
  149. payload_json={
  150. "run_id": workflow_run.id,
  151. "workflow_id": workflow_run.workflow_id,
  152. "workflow_version_id": workflow_run.workflow_version_id,
  153. "status": workflow_run.status,
  154. },
  155. )
  156. return workflow_run, node_run
  157. def list_runs(self, tenant_id: str, session_id: str | None = None) -> list[WorkflowRun]:
  158. return self.workflow_run_repository.list_by_scope(
  159. tenant_id=tenant_id,
  160. session_id=session_id,
  161. )
  162. def list_node_runs(self, tenant_id: str, run_id: str) -> list[NodeRun]:
  163. return self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  164. def list_execution_logs(
  165. self,
  166. tenant_id: str,
  167. run_id: str | None = None,
  168. node_run_id: str | None = None,
  169. ):
  170. return self.execution_log_repository.list_by_scope(
  171. tenant_id=tenant_id,
  172. run_id=run_id,
  173. node_run_id=node_run_id,
  174. )
  175. def list_node_artifacts(
  176. self,
  177. tenant_id: str,
  178. run_id: str | None = None,
  179. node_run_id: str | None = None,
  180. artifact_type: str | None = None,
  181. ):
  182. return self.node_artifact_repository.list_by_scope(
  183. tenant_id=tenant_id,
  184. run_id=run_id,
  185. node_run_id=node_run_id,
  186. artifact_type=artifact_type,
  187. )
  188. def list_trace_spans(
  189. self,
  190. tenant_id: str,
  191. run_id: str | None = None,
  192. node_run_id: str | None = None,
  193. span_type: str | None = None,
  194. ):
  195. return self.trace_span_repository.list_by_scope(
  196. tenant_id=tenant_id,
  197. run_id=run_id,
  198. node_run_id=node_run_id,
  199. span_type=span_type,
  200. )
  201. def get_debug_snapshot(
  202. self,
  203. *,
  204. tenant_id: str,
  205. run_id: str,
  206. ) -> RuntimeDebugSnapshot | None:
  207. workflow_run = self.workflow_run_repository.get_by_id(run_id)
  208. if workflow_run is None or workflow_run.tenant_id != tenant_id:
  209. return None
  210. return self._build_debug_snapshot(workflow_run)
  211. def update_run_status(
  212. self,
  213. run_id: str,
  214. payload: WorkflowRunStatusUpdateRequest,
  215. ) -> WorkflowRun | None:
  216. entity = self.workflow_run_repository.update_status(
  217. run_id=run_id,
  218. status=payload.status,
  219. error_code=payload.error_code,
  220. error_message=payload.error_message,
  221. )
  222. if entity is not None:
  223. self._publish_event(
  224. event_type=f"workflow.run.{entity.status}",
  225. workflow_run=entity,
  226. payload_json={
  227. "run_id": entity.id,
  228. "status": entity.status,
  229. "error_code": entity.error_code,
  230. },
  231. )
  232. return entity
  233. def update_node_run_status(
  234. self,
  235. node_run_id: str,
  236. payload: NodeRunStatusUpdateRequest,
  237. ) -> NodeRun | None:
  238. node_run = self.node_run_repository.update_status(
  239. node_run_id=node_run_id,
  240. status=payload.status,
  241. worker_key=payload.worker_key,
  242. error_code=payload.error_code,
  243. error_message=payload.error_message,
  244. output_text=payload.output_text,
  245. output_json=payload.output_json,
  246. )
  247. if node_run is None:
  248. return None
  249. self._log_event(
  250. tenant_id=node_run.tenant_id,
  251. run_id=node_run.run_id,
  252. node_run_id=node_run.id,
  253. event_type="node_status_updated",
  254. message=f"node status updated to {payload.status}",
  255. detail_json={
  256. "node_id": node_run.node_id,
  257. "node_type": node_run.node_type,
  258. "status": payload.status,
  259. "error_code": payload.error_code,
  260. },
  261. )
  262. self._publish_event(
  263. event_type=f"workflow.node.{node_run.status}",
  264. workflow_run=None,
  265. node_run=node_run,
  266. payload_json={
  267. "run_id": node_run.run_id,
  268. "node_run_id": node_run.id,
  269. "node_id": node_run.node_id,
  270. "node_type": node_run.node_type,
  271. "status": node_run.status,
  272. "error_code": node_run.error_code,
  273. },
  274. )
  275. if payload.status == "completed":
  276. self._schedule_successor_nodes(node_run)
  277. if payload.status == "failed":
  278. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  279. if workflow_run is not None:
  280. node_config = self._resolve_node_config(
  281. tenant_id=node_run.tenant_id,
  282. workflow_version_id=workflow_run.workflow_version_id,
  283. node_id=node_run.node_id,
  284. )
  285. self._schedule_compensation_node(node_run=node_run, node_config=node_config)
  286. self._sync_workflow_run_status_from_nodes(
  287. tenant_id=node_run.tenant_id,
  288. run_id=node_run.run_id,
  289. )
  290. return node_run
  291. def execute_node_run(
  292. self,
  293. node_run_id: str,
  294. payload: NodeRunExecuteRequest,
  295. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  296. node_run = self.node_run_repository.get_by_id(node_run_id)
  297. if node_run is None:
  298. return None
  299. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  300. if workflow_run is None:
  301. return None
  302. if workflow_run.status == "paused":
  303. return workflow_run, node_run, "debug_paused"
  304. if node_run.status in {"completed", "failed", "skipped"}:
  305. executor_name = self.execution_dispatcher.resolve_executor(
  306. node_run.node_type
  307. ).executor_name
  308. return workflow_run, node_run, executor_name
  309. node_config = self._resolve_node_config(
  310. tenant_id=node_run.tenant_id,
  311. workflow_version_id=workflow_run.workflow_version_id,
  312. node_id=node_run.node_id,
  313. )
  314. if self._node_has_timed_out(node_run):
  315. timed_out_node_run = self.update_node_run_status(
  316. node_run_id=node_run.id,
  317. payload=NodeRunStatusUpdateRequest(
  318. status="failed",
  319. worker_key=payload.worker_key,
  320. error_code="node_timeout",
  321. error_message=f"node timed out: {node_run.node_id}",
  322. output_json={
  323. "timeout_time": node_run.timeout_time.isoformat()
  324. if node_run.timeout_time is not None
  325. else None,
  326. },
  327. ),
  328. )
  329. if timed_out_node_run is None:
  330. return None
  331. executor_name = self.execution_dispatcher.resolve_executor(
  332. node_run.node_type
  333. ).executor_name
  334. return workflow_run, timed_out_node_run, executor_name
  335. running_node_run = self.node_run_repository.update_status(
  336. node_run_id=node_run_id,
  337. status="running",
  338. worker_key=payload.worker_key,
  339. )
  340. if running_node_run is None:
  341. return None
  342. self._log_event(
  343. tenant_id=running_node_run.tenant_id,
  344. run_id=running_node_run.run_id,
  345. node_run_id=running_node_run.id,
  346. event_type="node_execution_started",
  347. message=f"executing node {running_node_run.node_id}",
  348. detail_json={
  349. "node_id": running_node_run.node_id,
  350. "node_type": running_node_run.node_type,
  351. "worker_key": payload.worker_key,
  352. },
  353. )
  354. context = self._build_execution_context(
  355. workflow_run=workflow_run,
  356. node_run=running_node_run,
  357. worker_key=payload.worker_key,
  358. node_config_json=node_config,
  359. )
  360. executor_name = self.execution_dispatcher.resolve_executor(
  361. running_node_run.node_type
  362. ).executor_name
  363. trace_span = self.trace_span_repository.start(
  364. tenant_id=running_node_run.tenant_id,
  365. run_id=running_node_run.run_id,
  366. node_run_id=running_node_run.id,
  367. parent_span_id=None,
  368. span_type="node_execution",
  369. name=f"{running_node_run.node_type}:{running_node_run.node_id}",
  370. attributes_json={
  371. "node_id": running_node_run.node_id,
  372. "node_type": running_node_run.node_type,
  373. "executor_name": executor_name,
  374. "worker_key": payload.worker_key,
  375. },
  376. )
  377. try:
  378. result, executor_name = self.execution_dispatcher.execute(
  379. context=context,
  380. request=payload,
  381. )
  382. except Exception as exc:
  383. result = NodeExecutionResultContract(
  384. status="failed",
  385. worker_key=payload.worker_key,
  386. error_code="executor_error",
  387. error_message=str(exc),
  388. )
  389. if result.status == "failed" and self._should_retry_node(
  390. node_run=running_node_run,
  391. node_config_json=context.node_config_json,
  392. ):
  393. retry_time, retry_timeout_time = self._build_retry_timing(context.node_config_json)
  394. retried_node_run = self.node_run_repository.requeue_for_retry(
  395. node_run_id=running_node_run.id,
  396. scheduled_time=retry_time,
  397. timeout_time=retry_timeout_time,
  398. error_code=result.error_code,
  399. error_message=result.error_message,
  400. output_text=result.output_text,
  401. output_json={
  402. **(result.output_json or {}),
  403. "retry_scheduled_time": retry_time.isoformat(),
  404. "retry_reason": result.error_code or "node_failed",
  405. },
  406. )
  407. if retried_node_run is None:
  408. return None
  409. self._publish_node_run_to_queue(retried_node_run)
  410. self.trace_span_repository.finish(
  411. span_id=trace_span.id,
  412. status="error",
  413. error_code=result.error_code,
  414. error_message=result.error_message,
  415. attributes_json={
  416. "node_status": "queued",
  417. "executor_name": executor_name,
  418. "retry_scheduled": True,
  419. "attempt_no": retried_node_run.attempt_no,
  420. },
  421. )
  422. self._log_event(
  423. tenant_id=retried_node_run.tenant_id,
  424. run_id=retried_node_run.run_id,
  425. node_run_id=retried_node_run.id,
  426. event_type="node_retry_scheduled",
  427. message=f"node retry scheduled: {retried_node_run.node_id}",
  428. detail_json={
  429. "node_id": retried_node_run.node_id,
  430. "attempt_no": retried_node_run.attempt_no,
  431. "scheduled_time": retry_time.isoformat(),
  432. "error_code": result.error_code,
  433. },
  434. )
  435. self._sync_workflow_run_status_from_nodes(
  436. tenant_id=retried_node_run.tenant_id,
  437. run_id=retried_node_run.run_id,
  438. )
  439. workflow_run = self.workflow_run_repository.get_by_id(retried_node_run.run_id)
  440. if workflow_run is None:
  441. return None
  442. return workflow_run, retried_node_run, executor_name
  443. final_node_run = self.update_node_run_status(
  444. node_run_id=running_node_run.id,
  445. payload=NodeRunStatusUpdateRequest(
  446. status=result.status,
  447. worker_key=result.worker_key,
  448. error_code=result.error_code,
  449. error_message=result.error_message,
  450. output_text=result.output_text,
  451. output_json=result.output_json,
  452. ),
  453. )
  454. if final_node_run is None:
  455. return None
  456. self.trace_span_repository.finish(
  457. span_id=trace_span.id,
  458. status="ok" if final_node_run.status == "completed" else "error",
  459. error_code=final_node_run.error_code,
  460. error_message=final_node_run.error_message,
  461. attributes_json={
  462. "node_status": final_node_run.status,
  463. "executor_name": executor_name,
  464. "has_output_text": final_node_run.output_text is not None,
  465. "has_output_json": final_node_run.output_json is not None,
  466. },
  467. )
  468. self._persist_node_execution_artifact(final_node_run)
  469. self._log_event(
  470. tenant_id=final_node_run.tenant_id,
  471. run_id=final_node_run.run_id,
  472. node_run_id=final_node_run.id,
  473. event_type="node_execution_finished",
  474. message=f"node execution finished with status {final_node_run.status}",
  475. detail_json={
  476. "node_id": final_node_run.node_id,
  477. "node_type": final_node_run.node_type,
  478. "executor_name": executor_name,
  479. "status": final_node_run.status,
  480. },
  481. )
  482. workflow_run = self.workflow_run_repository.get_by_id(final_node_run.run_id)
  483. if workflow_run is None:
  484. return None
  485. return workflow_run, final_node_run, executor_name
  486. def execute_next_node_run(
  487. self,
  488. tenant_id: str,
  489. run_id: str,
  490. payload: NodeRunExecuteRequest,
  491. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  492. next_node_run = self.node_run_repository.get_next_queued_by_run(
  493. tenant_id=tenant_id,
  494. run_id=run_id,
  495. )
  496. if next_node_run is None:
  497. return None
  498. return self.execute_node_run(node_run_id=next_node_run.id, payload=payload)
  499. def execute_run(
  500. self,
  501. tenant_id: str,
  502. run_id: str,
  503. payload: RunExecuteRequest,
  504. ) -> tuple[WorkflowRun, list[NodeRun], list[str]] | None:
  505. workflow_run = self.workflow_run_repository.get_by_id(run_id)
  506. if workflow_run is None or workflow_run.tenant_id != tenant_id:
  507. return None
  508. executed_node_runs: list[NodeRun] = []
  509. executor_names: list[str] = []
  510. for _ in range(payload.max_steps):
  511. step_result = self.execute_next_node_run(
  512. tenant_id=tenant_id,
  513. run_id=run_id,
  514. payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
  515. )
  516. if step_result is None:
  517. break
  518. workflow_run, node_run, executor_name = step_result
  519. executed_node_runs.append(node_run)
  520. executor_names.append(executor_name)
  521. if node_run.status != "completed":
  522. break
  523. final_run = self.workflow_run_repository.get_by_id(run_id)
  524. if final_run is None:
  525. return None
  526. return final_run, executed_node_runs, executor_names
  527. def pause_run(
  528. self,
  529. *,
  530. tenant_id: str,
  531. run_id: str,
  532. reason: str = "debug_pause",
  533. ) -> RuntimeDebugSnapshot | None:
  534. workflow_run = self.workflow_run_repository.get_by_id(run_id)
  535. if workflow_run is None or workflow_run.tenant_id != tenant_id:
  536. return None
  537. self._sync_workflow_run_status_from_nodes(tenant_id=tenant_id, run_id=run_id)
  538. latest_run = self.workflow_run_repository.get_by_id(run_id)
  539. if latest_run is None:
  540. return None
  541. if latest_run.status in {"completed", "failed", "cancelled"}:
  542. paused_run = latest_run
  543. else:
  544. paused_run = self.workflow_run_repository.update_status(
  545. run_id=run_id,
  546. status="paused",
  547. )
  548. if paused_run is None:
  549. return None
  550. self._log_event(
  551. tenant_id=tenant_id,
  552. run_id=run_id,
  553. node_run_id=None,
  554. event_type="debug_run_paused",
  555. message=f"workflow run paused: {reason}",
  556. detail_json={"reason": reason},
  557. )
  558. return self._build_debug_snapshot(paused_run)
  559. def resume_run(
  560. self,
  561. *,
  562. tenant_id: str,
  563. run_id: str,
  564. reason: str = "debug_resume",
  565. ) -> RuntimeDebugSnapshot | None:
  566. workflow_run = self.workflow_run_repository.get_by_id(run_id)
  567. if workflow_run is None or workflow_run.tenant_id != tenant_id:
  568. return None
  569. resumed_run = self.workflow_run_repository.update_status(
  570. run_id=run_id,
  571. status="running",
  572. )
  573. if resumed_run is None:
  574. return None
  575. self._log_event(
  576. tenant_id=tenant_id,
  577. run_id=run_id,
  578. node_run_id=None,
  579. event_type="debug_run_resumed",
  580. message=f"workflow run resumed: {reason}",
  581. detail_json={"reason": reason},
  582. )
  583. return self._build_debug_snapshot(resumed_run)
  584. def step_debug_run(
  585. self,
  586. *,
  587. tenant_id: str,
  588. run_id: str,
  589. worker_key: str | None = None,
  590. ) -> tuple[RuntimeDebugSnapshot, list[NodeRun], list[str], str] | None:
  591. workflow_run = self.workflow_run_repository.get_by_id(run_id)
  592. if workflow_run is None or workflow_run.tenant_id != tenant_id:
  593. return None
  594. if workflow_run.status == "paused":
  595. self.workflow_run_repository.update_status(run_id=run_id, status="running")
  596. result = self.execute_next_node_run(
  597. tenant_id=tenant_id,
  598. run_id=run_id,
  599. payload=NodeRunExecuteRequest(worker_key=worker_key),
  600. )
  601. executed_node_runs: list[NodeRun] = []
  602. executor_names: list[str] = []
  603. reason = "no_queued_node"
  604. if result is not None:
  605. _, node_run, executor_name = result
  606. executed_node_runs.append(node_run)
  607. executor_names.append(executor_name)
  608. reason = "step_completed"
  609. self._sync_workflow_run_status_from_nodes(tenant_id=tenant_id, run_id=run_id)
  610. latest_run = self.workflow_run_repository.get_by_id(run_id)
  611. if latest_run is None:
  612. return None
  613. if latest_run.status in {"completed", "failed", "cancelled"}:
  614. paused_run = latest_run
  615. else:
  616. paused_run = self.workflow_run_repository.update_status(
  617. run_id=run_id,
  618. status="paused",
  619. )
  620. if paused_run is None:
  621. return None
  622. self._log_event(
  623. tenant_id=tenant_id,
  624. run_id=run_id,
  625. node_run_id=executed_node_runs[-1].id if executed_node_runs else None,
  626. event_type="debug_step_finished",
  627. message=f"debug step finished: {reason}",
  628. detail_json={
  629. "reason": reason,
  630. "executed_node_ids": [item.node_id for item in executed_node_runs],
  631. },
  632. )
  633. return self._build_debug_snapshot(paused_run), executed_node_runs, executor_names, reason
  634. def continue_debug_run(
  635. self,
  636. *,
  637. tenant_id: str,
  638. run_id: str,
  639. payload: RuntimeDebugContinueRequest,
  640. ) -> tuple[RuntimeDebugSnapshot, list[NodeRun], list[str], str | None, str] | None:
  641. workflow_run = self.workflow_run_repository.get_by_id(run_id)
  642. if workflow_run is None or workflow_run.tenant_id != tenant_id:
  643. return None
  644. if workflow_run.status == "paused":
  645. self.workflow_run_repository.update_status(run_id=run_id, status="running")
  646. breakpoint_node_ids = set(payload.breakpoint_node_ids)
  647. executed_node_runs: list[NodeRun] = []
  648. executor_names: list[str] = []
  649. paused_before_node_id: str | None = None
  650. reason = "completed"
  651. for _ in range(payload.max_steps):
  652. next_node_run = self.node_run_repository.get_next_queued_by_run(
  653. tenant_id=tenant_id,
  654. run_id=run_id,
  655. )
  656. if next_node_run is None:
  657. reason = "no_queued_node"
  658. break
  659. if next_node_run.node_id in breakpoint_node_ids:
  660. paused_before_node_id = next_node_run.node_id
  661. reason = "breakpoint_hit"
  662. break
  663. step_result = self.execute_node_run(
  664. node_run_id=next_node_run.id,
  665. payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
  666. )
  667. if step_result is None:
  668. reason = "node_not_found"
  669. break
  670. _, node_run, executor_name = step_result
  671. executed_node_runs.append(node_run)
  672. executor_names.append(executor_name)
  673. if node_run.status != "completed":
  674. reason = f"node_{node_run.status}"
  675. break
  676. else:
  677. reason = "max_steps_reached"
  678. self._sync_workflow_run_status_from_nodes(tenant_id=tenant_id, run_id=run_id)
  679. latest_run = self.workflow_run_repository.get_by_id(run_id)
  680. if latest_run is None:
  681. return None
  682. if latest_run.status in {"completed", "failed", "cancelled"}:
  683. paused_run = latest_run
  684. else:
  685. paused_run = self.workflow_run_repository.update_status(
  686. run_id=run_id,
  687. status="paused",
  688. )
  689. if paused_run is None:
  690. return None
  691. self._log_event(
  692. tenant_id=tenant_id,
  693. run_id=run_id,
  694. node_run_id=executed_node_runs[-1].id if executed_node_runs else None,
  695. event_type="debug_continue_paused",
  696. message=f"debug continue paused: {reason}",
  697. detail_json={
  698. "reason": reason,
  699. "paused_before_node_id": paused_before_node_id,
  700. "executed_node_ids": [item.node_id for item in executed_node_runs],
  701. "breakpoint_node_ids": list(breakpoint_node_ids),
  702. },
  703. )
  704. return (
  705. self._build_debug_snapshot(paused_run),
  706. executed_node_runs,
  707. executor_names,
  708. paused_before_node_id,
  709. reason,
  710. )
  711. def resume_human_node_run(
  712. self,
  713. *,
  714. node_run_id: str,
  715. payload: HumanNodeResumeRequest,
  716. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  717. node_run = self.node_run_repository.get_by_id(node_run_id)
  718. if node_run is None or node_run.tenant_id != payload.tenant_id:
  719. return None
  720. output_json = dict(node_run.output_json or {})
  721. existing_human_task_id = output_json.get("human_task_id")
  722. if existing_human_task_id is not None and existing_human_task_id != payload.human_task_id:
  723. return None
  724. if existing_human_task_id is None:
  725. output_json["human_task_id"] = payload.human_task_id
  726. self.node_run_repository.update_status(
  727. node_run_id=node_run.id,
  728. status="pending",
  729. worker_key=payload.worker_key,
  730. output_json=output_json,
  731. )
  732. return self.execute_node_run(
  733. node_run_id=node_run_id,
  734. payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
  735. )
    def execute_next_claimed_node_run(
        self,
        *,
        worker_key: str,
        lease_seconds: int,
        redis_client: object | None = None,
    ) -> tuple[WorkflowRun, NodeRun, str, int] | None:
        """Claim the next queued node run for ``worker_key`` and execute it.

        Expired leases are released first. The value returned by
        ``release_expired_leases`` is passed through as the fourth element of
        the result. The claimed node run gets a lease expiring
        ``lease_seconds`` from now.

        When ``redis_client`` is given, execution is also guarded by two
        primitives keyed on the claimed node run id:

        - a ``DistributedLock`` whose TTL is ``lease_seconds``;
        - an ``IdempotencyStore`` entry.

        Args:
            worker_key: Identifier of the worker claiming the node run.
            lease_seconds: Lease length, also used as the lock TTL.
            redis_client: Optional Redis client that enables the lock and
                idempotency guard.

        Returns:
            ``(workflow_run, node_run, executor_name, released_lease_count)``,
            or ``None`` in any of these cases:

            - nothing could be claimed;
            - the lock could not be acquired;
            - the idempotency key was already begun;
            - ``execute_node_run`` returned ``None``.
        """
        released_lease_count = self.node_run_repository.release_expired_leases(
            now_time=datetime.utcnow(),
        )
        claimed_node_run = self.node_run_repository.claim_next_queued(
            worker_key=worker_key,
            lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds),
        )
        if claimed_node_run is None:
            return None
        if redis_client is not None:
            # Imported locally, presumably so the Redis primitives are only
            # required when a client is supplied.
            from core_shared.redis_primitives import DistributedLock, IdempotencyStore
            lock = DistributedLock(
                client=redis_client,
                name=f"node-run:{claimed_node_run.id}:lock",
                ttl_seconds=lease_seconds,
            )
            if not lock.acquire():
                # NOTE(review): the node run stays claimed in the repository
                # here; presumably lease expiry (release_expired_leases above)
                # returns it to the queue. Confirm.
                return None
            idempotency_store = IdempotencyStore(
                client=redis_client,
                prefix="node-run-idempotency",
            )
            if not idempotency_store.begin(key=claimed_node_run.id):
                lock.release()
                return None
        else:
            lock = None
            idempotency_store = None
        try:
            result = self.execute_node_run(
                node_run_id=claimed_node_run.id,
                payload=NodeRunExecuteRequest(worker_key=worker_key),
            )
            # NOTE(review): complete() is skipped both when execute_node_run
            # raises and when it returns None. In either case the key opened
            # by begin() is left open. Confirm that IdempotencyStore expires
            # such keys; otherwise a re-claimed node run would be rejected by
            # begin() forever.
            if idempotency_store is not None and result is not None:
                _, node_run, executor_name = result
                idempotency_store.complete(
                    key=claimed_node_run.id,
                    result={
                        "status": node_run.status,
                        "node_run_id": node_run.id,
                        "executor_name": executor_name,
                    },
                )
        finally:
            # The lock is released whether execution succeeded or raised.
            if lock is not None:
                lock.release()
        if result is None:
            return None
        workflow_run, node_run, executor_name = result
        return workflow_run, node_run, executor_name, released_lease_count
  793. def _persist_node_execution_artifact(self, node_run: NodeRun) -> None:
  794. if node_run.output_text is None and node_run.output_json is None:
  795. return
  796. size_bytes = len(node_run.output_text.encode("utf-8")) if node_run.output_text else None
  797. self.node_artifact_repository.create(
  798. tenant_id=node_run.tenant_id,
  799. run_id=node_run.run_id,
  800. node_run_id=node_run.id,
  801. node_id=node_run.node_id,
  802. artifact_type="execution_result",
  803. name=f"{node_run.node_id}-execution-result",
  804. mime_type="application/json" if node_run.output_json is not None else "text/plain",
  805. content_text=node_run.output_text,
  806. content_json=node_run.output_json,
  807. size_bytes=size_bytes,
  808. )
  809. def _plan_initial_node(self, payload: RunCreateRequest) -> InitialNodeContract | None:
  810. if self.workflow_client is None:
  811. return None
  812. workflow_version = self.workflow_client.get_workflow_version(
  813. tenant_id=payload.tenant_id,
  814. workflow_version_id=payload.workflow_version_id,
  815. )
  816. return derive_initial_node(workflow_version)
  817. def _resolve_node_config(
  818. self,
  819. *,
  820. tenant_id: str,
  821. workflow_version_id: str,
  822. node_id: str,
  823. ) -> dict[str, JSONValue]:
  824. if self.workflow_client is None:
  825. return {}
  826. workflow_version = self.workflow_client.get_workflow_version(
  827. tenant_id=tenant_id,
  828. workflow_version_id=workflow_version_id,
  829. )
  830. return derive_node_config(workflow_version, node_id)
    def _schedule_successor_nodes(self, node_run: NodeRun) -> None:
        """Create and enqueue node runs for the successors of ``node_run``.

        Does nothing in any of these cases:

        - no workflow client is configured;
        - the workflow run cannot be found;
        - ``derive_successor_nodes`` yields no successors.

        Each successor is handled as follows:

        - if ``_is_join_ready`` reports its join is not satisfied yet, a
          ``join_waiting`` event is logged and the successor is skipped;
        - if ``_can_schedule_repeated_node`` refuses another run of that node,
          it is skipped silently;
        - otherwise a node run parented to ``node_run`` is created, published
          to the task queue, and a ``node_queued`` event is logged.

        Args:
            node_run: The node run whose successors should be scheduled.
        """
        if self.workflow_client is None:
            return
        workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
        if workflow_run is None:
            return
        workflow_version = self.workflow_client.get_workflow_version(
            tenant_id=node_run.tenant_id,
            workflow_version_id=workflow_run.workflow_version_id,
        )
        # Snapshot of run state and per-node outputs, used by
        # derive_successor_nodes (e.g. for conditional edges).
        run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
            self._build_run_state_maps(
                tenant_id=node_run.tenant_id,
                run_id=node_run.run_id,
            )
        )
        successor_nodes = derive_successor_nodes(
            workflow_version,
            node_run.node_id,
            current_output_json=node_run.output_json,
            run_state_json=run_state_json,
            node_output_json_by_node_id=node_output_json_by_node_id,
            node_output_text_by_node_id=node_output_text_by_node_id,
        )
        if not successor_nodes:
            return
        existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
            tenant_id=node_run.tenant_id,
            run_id=node_run.run_id,
            node_ids=[item.node_id for item in successor_nodes],
        )
        # Number of node runs already present per successor node id. It is
        # bumped after every create below, so a node id appearing twice in
        # successor_nodes still respects the repeat limit.
        existing_node_counts: dict[str, int] = {}
        for item in existing_nodes:
            existing_node_counts[item.node_id] = existing_node_counts.get(item.node_id, 0) + 1
        for successor in successor_nodes:
            successor_config = derive_node_config(workflow_version, successor.node_id)
            # list_by_run is re-read on every iteration rather than hoisted.
            # Presumably this lets node runs created earlier in this loop count
            # toward later join checks — confirm before hoisting.
            if not self._is_join_ready(
                workflow_version=workflow_version,
                run_node_runs=self.node_run_repository.list_by_run(
                    tenant_id=node_run.tenant_id,
                    run_id=node_run.run_id,
                ),
                successor_node_id=successor.node_id,
                successor_node_type=successor.node_type,
                successor_config=successor_config,
            ):
                self._log_event(
                    tenant_id=node_run.tenant_id,
                    run_id=node_run.run_id,
                    node_run_id=None,
                    event_type="join_waiting",
                    message=f"join node waiting for predecessors: {successor.node_id}",
                    detail_json={
                        "node_id": successor.node_id,
                        "source_node_id": node_run.node_id,
                    },
                )
                continue
            if not self._can_schedule_repeated_node(
                successor_config,
                existing_count=existing_node_counts.get(successor.node_id, 0),
            ):
                continue
            scheduled_time, timeout_time = self._build_node_timing(successor_config)
            created = self.node_run_repository.create(
                tenant_id=node_run.tenant_id,
                run_id=node_run.run_id,
                parent_node_run_id=node_run.id,
                node_id=successor.node_id,
                node_type=successor.node_type,
                status=successor.status,
                scheduled_time=scheduled_time,
                timeout_time=timeout_time,
            )
            # Only node runs created with status "queued" are actually pushed
            # to the task queue (checked inside _publish_node_run_to_queue).
            self._publish_node_run_to_queue(created)
            existing_node_counts[successor.node_id] = (
                existing_node_counts.get(successor.node_id, 0) + 1
            )
            self._log_event(
                tenant_id=node_run.tenant_id,
                run_id=node_run.run_id,
                node_run_id=None,
                event_type="node_queued",
                message=f"successor node queued: {successor.node_id}",
                detail_json={
                    "node_id": successor.node_id,
                    "node_type": successor.node_type,
                    "status": successor.status,
                    "source_node_id": node_run.node_id,
                },
            )
  922. def _build_execution_context(
  923. self,
  924. *,
  925. workflow_run: WorkflowRun,
  926. node_run: NodeRun,
  927. worker_key: str | None,
  928. node_config_json: dict[str, JSONValue] | None = None,
  929. ) -> NodeExecutionContextContract:
  930. run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
  931. self._build_run_state_maps(
  932. tenant_id=node_run.tenant_id,
  933. run_id=node_run.run_id,
  934. )
  935. )
  936. return NodeExecutionContextContract(
  937. tenant_id=node_run.tenant_id,
  938. run_id=node_run.run_id,
  939. node_run_id=node_run.id,
  940. node_id=node_run.node_id,
  941. node_type=node_run.node_type,
  942. node_config_json=node_config_json
  943. if node_config_json is not None
  944. else self._resolve_node_config(
  945. tenant_id=node_run.tenant_id,
  946. workflow_version_id=workflow_run.workflow_version_id,
  947. node_id=node_run.node_id,
  948. ),
  949. run_state_json=run_state_json,
  950. node_output_json_by_node_id=node_output_json_by_node_id,
  951. node_output_text_by_node_id=node_output_text_by_node_id,
  952. worker_key=worker_key,
  953. )
  954. def _build_run_state_maps(
  955. self,
  956. *,
  957. tenant_id: str,
  958. run_id: str,
  959. ) -> tuple[
  960. dict[str, JSONValue],
  961. dict[str, dict[str, JSONValue]],
  962. dict[str, str],
  963. ]:
  964. node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  965. run_state_json: dict[str, JSONValue] = {}
  966. node_output_json_by_node_id: dict[str, dict[str, JSONValue]] = {}
  967. node_output_text_by_node_id: dict[str, str] = {}
  968. for item in node_runs:
  969. if item.output_json is not None:
  970. node_output_json_by_node_id[item.node_id] = dict(item.output_json)
  971. state_updates = item.output_json.get("state_updates")
  972. if isinstance(state_updates, dict):
  973. for state_key, state_value in state_updates.items():
  974. run_state_json[str(state_key)] = state_value
  975. if item.output_text is not None:
  976. node_output_text_by_node_id[item.node_id] = item.output_text
  977. return run_state_json, node_output_json_by_node_id, node_output_text_by_node_id
  978. def _build_debug_snapshot(self, workflow_run: WorkflowRun) -> RuntimeDebugSnapshot:
  979. node_runs = self.node_run_repository.list_by_run(
  980. tenant_id=workflow_run.tenant_id,
  981. run_id=workflow_run.id,
  982. )
  983. run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
  984. self._build_run_state_maps(
  985. tenant_id=workflow_run.tenant_id,
  986. run_id=workflow_run.id,
  987. )
  988. )
  989. return RuntimeDebugSnapshot(
  990. run=workflow_run,
  991. node_runs=node_runs,
  992. run_state_json=run_state_json,
  993. node_output_json_by_node_id=node_output_json_by_node_id,
  994. node_output_text_by_node_id=node_output_text_by_node_id,
  995. queued_node_ids=[
  996. item.node_id for item in node_runs if item.status in {"pending", "queued"}
  997. ],
  998. running_node_ids=[item.node_id for item in node_runs if item.status == "running"],
  999. completed_node_ids=[
  1000. item.node_id for item in node_runs if item.status in {"completed", "skipped"}
  1001. ],
  1002. failed_node_ids=[item.node_id for item in node_runs if item.status == "failed"],
  1003. execution_logs=self.execution_log_repository.list_by_scope(
  1004. tenant_id=workflow_run.tenant_id,
  1005. run_id=workflow_run.id,
  1006. ),
  1007. node_artifacts=self.node_artifact_repository.list_by_scope(
  1008. tenant_id=workflow_run.tenant_id,
  1009. run_id=workflow_run.id,
  1010. ),
  1011. trace_spans=self.trace_span_repository.list_by_scope(
  1012. tenant_id=workflow_run.tenant_id,
  1013. run_id=workflow_run.id,
  1014. ),
  1015. )
  1016. def _build_node_timing(
  1017. self,
  1018. node_config_json: dict[str, JSONValue],
  1019. ) -> tuple[datetime, datetime | None]:
  1020. now = datetime.utcnow()
  1021. delay_seconds = self._read_int_value(node_config_json, "delay_seconds", default=0)
  1022. timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
  1023. scheduled_time = now + timedelta(seconds=max(delay_seconds, 0))
  1024. timeout_time = (
  1025. scheduled_time + timedelta(seconds=timeout_seconds)
  1026. if timeout_seconds > 0
  1027. else None
  1028. )
  1029. return scheduled_time, timeout_time
  1030. def _node_has_timed_out(self, node_run: NodeRun) -> bool:
  1031. return node_run.timeout_time is not None and node_run.timeout_time <= datetime.utcnow()
  1032. def _should_retry_node(
  1033. self,
  1034. *,
  1035. node_run: NodeRun,
  1036. node_config_json: dict[str, JSONValue],
  1037. ) -> bool:
  1038. retry_policy = self._read_dict_value(node_config_json, "retry_policy")
  1039. max_attempts = self._read_int_value(retry_policy, "max_attempts", default=1)
  1040. return max_attempts > node_run.attempt_no
  1041. def _read_retry_delay_seconds(self, node_config_json: dict[str, JSONValue]) -> int:
  1042. retry_policy = self._read_dict_value(node_config_json, "retry_policy")
  1043. return self._read_int_value(retry_policy, "retry_delay_seconds", default=0)
  1044. def _build_retry_timing(
  1045. self,
  1046. node_config_json: dict[str, JSONValue],
  1047. ) -> tuple[datetime, datetime | None]:
  1048. retry_time = datetime.utcnow() + timedelta(
  1049. seconds=self._read_retry_delay_seconds(node_config_json)
  1050. )
  1051. timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
  1052. timeout_time = (
  1053. retry_time + timedelta(seconds=timeout_seconds)
  1054. if timeout_seconds > 0
  1055. else None
  1056. )
  1057. return retry_time, timeout_time
  1058. def _is_join_ready(
  1059. self,
  1060. *,
  1061. workflow_version: WorkflowVersionContract,
  1062. run_node_runs: list[NodeRun],
  1063. successor_node_id: str,
  1064. successor_node_type: str,
  1065. successor_config: dict[str, JSONValue],
  1066. ) -> bool:
  1067. join_policy = self._read_string_value(successor_config, "join_policy")
  1068. if join_policy is None and successor_node_type != "join":
  1069. return True
  1070. workflow = self._parse_workflow(workflow_version)
  1071. if workflow is None:
  1072. return True
  1073. predecessor_ids = [
  1074. edge.source for edge in workflow.edges if edge.target == successor_node_id
  1075. ]
  1076. if not predecessor_ids:
  1077. return True
  1078. completed_node_ids = {
  1079. item.node_id
  1080. for item in run_node_runs
  1081. if item.status in {"completed", "skipped"}
  1082. }
  1083. if join_policy in {None, "all_completed"}:
  1084. return all(predecessor_id in completed_node_ids for predecessor_id in predecessor_ids)
  1085. if join_policy == "any_completed":
  1086. return any(predecessor_id in completed_node_ids for predecessor_id in predecessor_ids)
  1087. return True
  1088. def _can_schedule_repeated_node(
  1089. self,
  1090. node_config_json: dict[str, JSONValue],
  1091. *,
  1092. existing_count: int,
  1093. ) -> bool:
  1094. if existing_count == 0:
  1095. return True
  1096. allow_loop = self._read_bool_value(node_config_json, "allow_loop", default=False)
  1097. max_iterations = self._read_int_value(node_config_json, "max_iterations", default=1)
  1098. return allow_loop and existing_count < max_iterations
  1099. def _schedule_compensation_node(
  1100. self,
  1101. *,
  1102. node_run: NodeRun,
  1103. node_config: dict[str, JSONValue],
  1104. ) -> None:
  1105. compensation_node_id = self._read_string_value(node_config, "compensation_node_id")
  1106. if compensation_node_id is None:
  1107. compensation_config = self._read_dict_value(node_config, "compensation")
  1108. compensation_node_id = self._read_string_value(compensation_config, "node_id")
  1109. if compensation_node_id is None:
  1110. return
  1111. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  1112. if workflow_run is None:
  1113. return
  1114. compensation_config = self._resolve_node_config(
  1115. tenant_id=node_run.tenant_id,
  1116. workflow_version_id=workflow_run.workflow_version_id,
  1117. node_id=compensation_node_id,
  1118. )
  1119. existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
  1120. tenant_id=node_run.tenant_id,
  1121. run_id=node_run.run_id,
  1122. node_ids=[compensation_node_id],
  1123. )
  1124. if existing_nodes and not self._can_schedule_repeated_node(
  1125. compensation_config,
  1126. existing_count=len(existing_nodes),
  1127. ):
  1128. return
  1129. compensation_node_type = self._resolve_workflow_node_type(
  1130. tenant_id=node_run.tenant_id,
  1131. workflow_version_id=workflow_run.workflow_version_id,
  1132. node_id=compensation_node_id,
  1133. ) or "compensation"
  1134. scheduled_time, timeout_time = self._build_node_timing(compensation_config)
  1135. created = self.node_run_repository.create(
  1136. tenant_id=node_run.tenant_id,
  1137. run_id=node_run.run_id,
  1138. parent_node_run_id=node_run.id,
  1139. node_id=compensation_node_id,
  1140. node_type=compensation_node_type,
  1141. status="queued",
  1142. scheduled_time=scheduled_time,
  1143. timeout_time=timeout_time,
  1144. )
  1145. self._publish_node_run_to_queue(created)
  1146. self._log_event(
  1147. tenant_id=node_run.tenant_id,
  1148. run_id=node_run.run_id,
  1149. node_run_id=created.id,
  1150. event_type="compensation_queued",
  1151. message=f"compensation node queued: {compensation_node_id}",
  1152. detail_json={
  1153. "failed_node_id": node_run.node_id,
  1154. "compensation_node_id": compensation_node_id,
  1155. },
  1156. )
  1157. def _parse_workflow(self, workflow_version: WorkflowVersionContract):
  1158. return parse_workflow_definition(workflow_version.dsl_json)
  1159. def _resolve_workflow_node_type(
  1160. self,
  1161. *,
  1162. tenant_id: str,
  1163. workflow_version_id: str,
  1164. node_id: str,
  1165. ) -> str | None:
  1166. if self.workflow_client is None:
  1167. return None
  1168. workflow_version = self.workflow_client.get_workflow_version(
  1169. tenant_id=tenant_id,
  1170. workflow_version_id=workflow_version_id,
  1171. )
  1172. workflow = self._parse_workflow(workflow_version)
  1173. if workflow is None:
  1174. return None
  1175. for node in workflow.nodes:
  1176. if node.id == node_id:
  1177. return node.type
  1178. return None
  1179. def _read_string_value(self, payload: dict[str, JSONValue], key: str) -> str | None:
  1180. value = payload.get(key)
  1181. if isinstance(value, str) and value:
  1182. return value
  1183. return None
  1184. def _read_bool_value(
  1185. self,
  1186. payload: dict[str, JSONValue],
  1187. key: str,
  1188. *,
  1189. default: bool,
  1190. ) -> bool:
  1191. value = payload.get(key)
  1192. if isinstance(value, bool):
  1193. return value
  1194. return default
  1195. def _read_int_value(
  1196. self,
  1197. payload: dict[str, JSONValue],
  1198. key: str,
  1199. *,
  1200. default: int,
  1201. ) -> int:
  1202. value = payload.get(key)
  1203. if isinstance(value, int) and not isinstance(value, bool):
  1204. return value
  1205. return default
  1206. def _read_dict_value(
  1207. self,
  1208. payload: dict[str, JSONValue],
  1209. key: str,
  1210. ) -> dict[str, JSONValue]:
  1211. value = payload.get(key)
  1212. if isinstance(value, dict):
  1213. return {str(item_key): item_value for item_key, item_value in value.items()}
  1214. return {}
  1215. def _sync_workflow_run_status_from_nodes(self, *, tenant_id: str, run_id: str) -> None:
  1216. node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  1217. if not node_runs:
  1218. return
  1219. self.workflow_run_repository.update_node_count(
  1220. run_id=run_id,
  1221. current_node_count=len(node_runs),
  1222. )
  1223. next_status, error_code, error_message = self._derive_run_status(node_runs)
  1224. self.workflow_run_repository.update_status(
  1225. run_id=run_id,
  1226. status=next_status,
  1227. error_code=error_code,
  1228. error_message=error_message,
  1229. )
  1230. self._log_event(
  1231. tenant_id=tenant_id,
  1232. run_id=run_id,
  1233. node_run_id=None,
  1234. event_type="run_status_synced",
  1235. message=f"workflow run status synced to {next_status}",
  1236. detail_json={
  1237. "status": next_status,
  1238. "error_code": error_code,
  1239. },
  1240. )
  1241. def _derive_run_status(
  1242. self,
  1243. node_runs: list[NodeRun],
  1244. ) -> tuple[WorkflowRunStatus, str | None, str | None]:
  1245. statuses = {node_run.status for node_run in node_runs}
  1246. active_statuses: set[NodeRunStatus] = {"pending", "queued", "running"}
  1247. if statuses.intersection(active_statuses):
  1248. return "running", None, None
  1249. if "failed" in statuses:
  1250. failed_node = next((item for item in node_runs if item.status == "failed"), None)
  1251. error_code = failed_node.error_code if failed_node is not None else None
  1252. error_message = failed_node.error_message if failed_node is not None else None
  1253. return "failed", error_code, error_message
  1254. terminal_statuses: set[NodeRunStatus] = {"completed", "skipped"}
  1255. if statuses and statuses.issubset(terminal_statuses):
  1256. return "completed", None, None
  1257. return "running", None, None
  1258. def _log_event(
  1259. self,
  1260. *,
  1261. tenant_id: str,
  1262. run_id: str,
  1263. node_run_id: str | None,
  1264. event_type: str,
  1265. message: str,
  1266. detail_json: dict[str, JSONValue] | None,
  1267. level: str = "info",
  1268. ) -> None:
  1269. self.execution_log_repository.create(
  1270. tenant_id=tenant_id,
  1271. run_id=run_id,
  1272. node_run_id=node_run_id,
  1273. event_type=event_type,
  1274. level=level,
  1275. message=message,
  1276. detail_json=detail_json,
  1277. )
  1278. def _publish_event(
  1279. self,
  1280. *,
  1281. event_type: str,
  1282. payload_json: dict[str, JSONValue],
  1283. workflow_run: WorkflowRun | None = None,
  1284. node_run: NodeRun | None = None,
  1285. ) -> None:
  1286. if self.event_client is None:
  1287. return
  1288. tenant_id = workflow_run.tenant_id if workflow_run is not None else None
  1289. if tenant_id is None and node_run is not None:
  1290. tenant_id = node_run.tenant_id
  1291. if tenant_id is None:
  1292. return
  1293. aggregate_id = workflow_run.id if workflow_run is not None else None
  1294. if aggregate_id is None and node_run is not None:
  1295. aggregate_id = node_run.id
  1296. aggregate_type = "workflow_run" if workflow_run is not None else "node_run"
  1297. correlation_id = workflow_run.session_id if workflow_run is not None else None
  1298. if correlation_id is None and node_run is not None:
  1299. correlation_id = node_run.run_id
  1300. try:
  1301. self.event_client.publish_event(
  1302. EventPublishContract(
  1303. tenant_id=tenant_id,
  1304. event_type=event_type,
  1305. source_service="runtime-service",
  1306. aggregate_type=aggregate_type,
  1307. aggregate_id=aggregate_id,
  1308. correlation_id=correlation_id,
  1309. payload_json=payload_json,
  1310. )
  1311. )
  1312. except EventServiceClientError:
  1313. return
  1314. def _publish_node_run_to_queue(self, node_run: NodeRun) -> None:
  1315. if node_run.status != "queued" or self.task_queue_publisher is None:
  1316. return
  1317. self.task_queue_publisher.publish_runtime_node_run(
  1318. tenant_id=node_run.tenant_id,
  1319. node_run_id=node_run.id,
  1320. )
  1321. def build_runtime_application_service(
  1322. *,
  1323. db: Session,
  1324. settings: RuntimeServiceSettings,
  1325. ) -> RuntimeApplicationService:
  1326. redis_client = try_build_redis_client(settings.redis_url)
  1327. return RuntimeApplicationService(
  1328. workflow_run_repository=WorkflowRunRepository(db),
  1329. node_run_repository=NodeRunRepository(db),
  1330. execution_log_repository=ExecutionLogRepository(db),
  1331. node_artifact_repository=NodeArtifactRepository(db),
  1332. trace_span_repository=TraceSpanRepository(db),
  1333. execution_dispatcher=build_node_execution_dispatcher_with_clients(
  1334. code_runner_client=CodeRunnerClient(base_url=settings.code_runner_service_url),
  1335. model_gateway_client=ModelGatewayClient(base_url=settings.model_gateway_service_url),
  1336. tool_client=ToolServiceClient(base_url=settings.tool_service_url),
  1337. human_client=HumanServiceClient(
  1338. base_url=settings.human_service_url,
  1339. timeout_seconds=settings.human_service_timeout_seconds,
  1340. ),
  1341. knowledge_client=KnowledgeServiceClient(
  1342. base_url=settings.knowledge_service_url,
  1343. timeout_seconds=settings.knowledge_service_timeout_seconds,
  1344. ),
  1345. ),
  1346. workflow_client=WorkflowServiceClient(base_url=settings.workflow_service_url),
  1347. event_client=EventServiceClient(
  1348. base_url=settings.event_service_url,
  1349. timeout_seconds=settings.event_service_timeout_seconds,
  1350. ),
  1351. task_queue_publisher=(
  1352. TaskQueuePublisher(client=redis_client) if redis_client is not None else None
  1353. ),
  1354. )