services.py 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195
  1. from datetime import datetime, timedelta
  2. from sqlalchemy.orm import Session
  3. from core_dsl import parse_workflow_definition
  4. from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
  5. from core_domain import (
  6. InitialNodeContract,
  7. NodeExecutionContextContract,
  8. NodeExecutionResultContract,
  9. NodeRunStatus,
  10. WorkflowVersionContract,
  11. WorkflowRunStatus,
  12. )
  13. from app.db.models import NodeRun, WorkflowRun
  14. from app.domain.repositories import (
  15. ExecutionLogRepository,
  16. NodeArtifactRepository,
  17. NodeRunRepository,
  18. TraceSpanRepository,
  19. WorkflowRunRepository,
  20. )
  21. from app.infrastructure.executors import (
  22. NodeExecutionDispatcher,
  23. build_node_execution_dispatcher_with_clients,
  24. )
  25. from app.infrastructure.human_client import HumanServiceClient
  26. from app.infrastructure.knowledge_client import KnowledgeServiceClient
  27. from app.infrastructure.code_runner_client import CodeRunnerClient
  28. from app.infrastructure.model_gateway_client import ModelGatewayClient
  29. from app.infrastructure.planner import (
  30. derive_initial_node,
  31. derive_node_config,
  32. derive_successor_nodes,
  33. )
  34. from app.infrastructure.tool_client import ToolServiceClient
  35. from app.infrastructure.workflow_client import WorkflowServiceClient
  36. from app.bootstrap.settings import RuntimeServiceSettings
  37. from app.schemas.run import (
  38. NodeRunExecuteRequest,
  39. HumanNodeResumeRequest,
  40. NodeRunStatusUpdateRequest,
  41. RunCreateRequest,
  42. RunExecuteRequest,
  43. WorkflowRunStatusUpdateRequest,
  44. )
  45. from core_shared import JSONValue, try_build_redis_client
  46. from core_shared.task_queue import TaskQueuePublisher
  47. class RuntimeApplicationService:
  48. def __init__(
  49. self,
  50. workflow_run_repository: WorkflowRunRepository,
  51. node_run_repository: NodeRunRepository,
  52. execution_log_repository: ExecutionLogRepository,
  53. node_artifact_repository: NodeArtifactRepository,
  54. trace_span_repository: TraceSpanRepository,
  55. execution_dispatcher: NodeExecutionDispatcher,
  56. workflow_client: WorkflowServiceClient | None = None,
  57. event_client: EventServiceClient | None = None,
  58. task_queue_publisher: TaskQueuePublisher | None = None,
  59. ) -> None:
  60. self.workflow_run_repository = workflow_run_repository
  61. self.node_run_repository = node_run_repository
  62. self.execution_log_repository = execution_log_repository
  63. self.node_artifact_repository = node_artifact_repository
  64. self.trace_span_repository = trace_span_repository
  65. self.execution_dispatcher = execution_dispatcher
  66. self.workflow_client = workflow_client
  67. self.event_client = event_client
  68. self.task_queue_publisher = task_queue_publisher
  69. def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
  70. initial_node = payload.initial_node or self._plan_initial_node(payload)
  71. workflow_run = self.workflow_run_repository.create(
  72. tenant_id=payload.tenant_id,
  73. app_id=payload.app_id,
  74. app_version_id=payload.app_version_id,
  75. workflow_id=payload.workflow_id,
  76. workflow_version_id=payload.workflow_version_id,
  77. session_id=payload.session_id,
  78. parent_run_id=payload.parent_run_id,
  79. root_run_id=payload.root_run_id,
  80. run_type=payload.run_type,
  81. trigger_type=payload.trigger_type,
  82. priority=payload.priority,
  83. )
  84. node_run = None
  85. if initial_node is not None:
  86. self.workflow_run_repository.update_node_count(
  87. run_id=workflow_run.id,
  88. current_node_count=1,
  89. )
  90. initial_config = self._resolve_node_config(
  91. tenant_id=payload.tenant_id,
  92. workflow_version_id=payload.workflow_version_id,
  93. node_id=initial_node.node_id,
  94. )
  95. scheduled_time, timeout_time = self._build_node_timing(initial_config)
  96. node_run = self.node_run_repository.create(
  97. tenant_id=payload.tenant_id,
  98. run_id=workflow_run.id,
  99. node_id=initial_node.node_id,
  100. node_type=initial_node.node_type,
  101. status=initial_node.status,
  102. scheduled_time=scheduled_time,
  103. timeout_time=timeout_time,
  104. )
  105. self._log_event(
  106. tenant_id=payload.tenant_id,
  107. run_id=workflow_run.id,
  108. node_run_id=node_run.id,
  109. event_type="node_queued",
  110. message=f"initial node queued: {initial_node.node_id}",
  111. detail_json={
  112. "node_id": initial_node.node_id,
  113. "node_type": initial_node.node_type,
  114. "status": initial_node.status,
  115. },
  116. )
  117. self._publish_node_run_to_queue(node_run)
  118. self._log_event(
  119. tenant_id=payload.tenant_id,
  120. run_id=workflow_run.id,
  121. node_run_id=node_run.id if node_run is not None else None,
  122. event_type="run_created",
  123. message="workflow run created",
  124. detail_json={
  125. "workflow_id": payload.workflow_id,
  126. "workflow_version_id": payload.workflow_version_id,
  127. "session_id": payload.session_id,
  128. },
  129. )
  130. self._publish_event(
  131. event_type="workflow.run.created",
  132. workflow_run=workflow_run,
  133. payload_json={
  134. "run_id": workflow_run.id,
  135. "workflow_id": workflow_run.workflow_id,
  136. "workflow_version_id": workflow_run.workflow_version_id,
  137. "status": workflow_run.status,
  138. },
  139. )
  140. return workflow_run, node_run
  141. def list_runs(self, tenant_id: str, session_id: str | None = None) -> list[WorkflowRun]:
  142. return self.workflow_run_repository.list_by_scope(
  143. tenant_id=tenant_id,
  144. session_id=session_id,
  145. )
  146. def list_node_runs(self, tenant_id: str, run_id: str) -> list[NodeRun]:
  147. return self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  148. def list_execution_logs(
  149. self,
  150. tenant_id: str,
  151. run_id: str | None = None,
  152. node_run_id: str | None = None,
  153. ):
  154. return self.execution_log_repository.list_by_scope(
  155. tenant_id=tenant_id,
  156. run_id=run_id,
  157. node_run_id=node_run_id,
  158. )
  159. def list_node_artifacts(
  160. self,
  161. tenant_id: str,
  162. run_id: str | None = None,
  163. node_run_id: str | None = None,
  164. artifact_type: str | None = None,
  165. ):
  166. return self.node_artifact_repository.list_by_scope(
  167. tenant_id=tenant_id,
  168. run_id=run_id,
  169. node_run_id=node_run_id,
  170. artifact_type=artifact_type,
  171. )
  172. def list_trace_spans(
  173. self,
  174. tenant_id: str,
  175. run_id: str | None = None,
  176. node_run_id: str | None = None,
  177. span_type: str | None = None,
  178. ):
  179. return self.trace_span_repository.list_by_scope(
  180. tenant_id=tenant_id,
  181. run_id=run_id,
  182. node_run_id=node_run_id,
  183. span_type=span_type,
  184. )
  185. def update_run_status(
  186. self,
  187. run_id: str,
  188. payload: WorkflowRunStatusUpdateRequest,
  189. ) -> WorkflowRun | None:
  190. entity = self.workflow_run_repository.update_status(
  191. run_id=run_id,
  192. status=payload.status,
  193. error_code=payload.error_code,
  194. error_message=payload.error_message,
  195. )
  196. if entity is not None:
  197. self._publish_event(
  198. event_type=f"workflow.run.{entity.status}",
  199. workflow_run=entity,
  200. payload_json={
  201. "run_id": entity.id,
  202. "status": entity.status,
  203. "error_code": entity.error_code,
  204. },
  205. )
  206. return entity
  207. def update_node_run_status(
  208. self,
  209. node_run_id: str,
  210. payload: NodeRunStatusUpdateRequest,
  211. ) -> NodeRun | None:
  212. node_run = self.node_run_repository.update_status(
  213. node_run_id=node_run_id,
  214. status=payload.status,
  215. worker_key=payload.worker_key,
  216. error_code=payload.error_code,
  217. error_message=payload.error_message,
  218. output_text=payload.output_text,
  219. output_json=payload.output_json,
  220. )
  221. if node_run is None:
  222. return None
  223. self._log_event(
  224. tenant_id=node_run.tenant_id,
  225. run_id=node_run.run_id,
  226. node_run_id=node_run.id,
  227. event_type="node_status_updated",
  228. message=f"node status updated to {payload.status}",
  229. detail_json={
  230. "node_id": node_run.node_id,
  231. "node_type": node_run.node_type,
  232. "status": payload.status,
  233. "error_code": payload.error_code,
  234. },
  235. )
  236. self._publish_event(
  237. event_type=f"workflow.node.{node_run.status}",
  238. workflow_run=None,
  239. node_run=node_run,
  240. payload_json={
  241. "run_id": node_run.run_id,
  242. "node_run_id": node_run.id,
  243. "node_id": node_run.node_id,
  244. "node_type": node_run.node_type,
  245. "status": node_run.status,
  246. "error_code": node_run.error_code,
  247. },
  248. )
  249. if payload.status == "completed":
  250. self._schedule_successor_nodes(node_run)
  251. if payload.status == "failed":
  252. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  253. if workflow_run is not None:
  254. node_config = self._resolve_node_config(
  255. tenant_id=node_run.tenant_id,
  256. workflow_version_id=workflow_run.workflow_version_id,
  257. node_id=node_run.node_id,
  258. )
  259. self._schedule_compensation_node(node_run=node_run, node_config=node_config)
  260. self._sync_workflow_run_status_from_nodes(
  261. tenant_id=node_run.tenant_id,
  262. run_id=node_run.run_id,
  263. )
  264. return node_run
    def execute_node_run(
        self,
        node_run_id: str,
        payload: NodeRunExecuteRequest,
    ) -> tuple[WorkflowRun, NodeRun, str] | None:
        """Execute one node run through the dispatcher and persist the outcome.

        Flow, in order:
          1. Load the node run and its workflow run.
          2. If the node is already ``completed``/``failed``/``skipped``,
             return it unchanged.
          3. If the node's ``timeout_time`` has passed, mark it failed with
             ``node_timeout`` and return.
          4. Mark it ``running``, open a trace span and dispatch it.
          5. On a failed result that ``_should_retry_node`` accepts, requeue
             the node for a retry and return the requeued node run.
          6. Otherwise record the final status via ``update_node_run_status``,
             close the span and persist an execution artifact.

        Returns:
            ``(workflow_run, node_run, executor_name)``, or ``None`` when the
            node run or workflow run cannot be loaded or a repository update
            returns ``None``.
        """
        node_run = self.node_run_repository.get_by_id(node_run_id)
        if node_run is None:
            return None
        workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
        if workflow_run is None:
            return None
        # Node runs already in one of these statuses are returned as-is,
        # without dispatching.
        if node_run.status in {"completed", "failed", "skipped"}:
            executor_name = self.execution_dispatcher.resolve_executor(
                node_run.node_type
            ).executor_name
            return workflow_run, node_run, executor_name
        node_config = self._resolve_node_config(
            tenant_id=node_run.tenant_id,
            workflow_version_id=workflow_run.workflow_version_id,
            node_id=node_run.node_id,
        )
        # A node whose deadline has already passed is failed without being
        # dispatched; no trace span has been opened at this point.
        if self._node_has_timed_out(node_run):
            timed_out_node_run = self.update_node_run_status(
                node_run_id=node_run.id,
                payload=NodeRunStatusUpdateRequest(
                    status="failed",
                    worker_key=payload.worker_key,
                    error_code="node_timeout",
                    error_message=f"node timed out: {node_run.node_id}",
                    output_json={
                        "timeout_time": node_run.timeout_time.isoformat()
                        if node_run.timeout_time is not None
                        else None,
                    },
                ),
            )
            if timed_out_node_run is None:
                return None
            executor_name = self.execution_dispatcher.resolve_executor(
                node_run.node_type
            ).executor_name
            return workflow_run, timed_out_node_run, executor_name
        # This goes straight to the repository (not update_node_run_status),
        # so the "running" transition does not trigger that method's
        # logging, event publishing or run-status sync.
        running_node_run = self.node_run_repository.update_status(
            node_run_id=node_run_id,
            status="running",
            worker_key=payload.worker_key,
        )
        if running_node_run is None:
            return None
        self._log_event(
            tenant_id=running_node_run.tenant_id,
            run_id=running_node_run.run_id,
            node_run_id=running_node_run.id,
            event_type="node_execution_started",
            message=f"executing node {running_node_run.node_id}",
            detail_json={
                "node_id": running_node_run.node_id,
                "node_type": running_node_run.node_type,
                "worker_key": payload.worker_key,
            },
        )
        context = self._build_execution_context(
            workflow_run=workflow_run,
            node_run=running_node_run,
            worker_key=payload.worker_key,
            node_config_json=node_config,
        )
        # Resolved up front so the span attributes, and the error path below,
        # have an executor name even if execute() raises.
        executor_name = self.execution_dispatcher.resolve_executor(
            running_node_run.node_type
        ).executor_name
        trace_span = self.trace_span_repository.start(
            tenant_id=running_node_run.tenant_id,
            run_id=running_node_run.run_id,
            node_run_id=running_node_run.id,
            parent_span_id=None,
            span_type="node_execution",
            name=f"{running_node_run.node_type}:{running_node_run.node_id}",
            attributes_json={
                "node_id": running_node_run.node_id,
                "node_type": running_node_run.node_type,
                "executor_name": executor_name,
                "worker_key": payload.worker_key,
            },
        )
        try:
            result, executor_name = self.execution_dispatcher.execute(
                context=context,
                request=payload,
            )
        except Exception as exc:
            # Any executor exception becomes a failed result so the regular
            # retry / failure handling below applies to it as well.
            result = NodeExecutionResultContract(
                status="failed",
                worker_key=payload.worker_key,
                error_code="executor_error",
                error_message=str(exc),
            )
        if result.status == "failed" and self._should_retry_node(
            node_run=running_node_run,
            node_config_json=context.node_config_json,
        ):
            retry_time, retry_timeout_time = self._build_retry_timing(context.node_config_json)
            retried_node_run = self.node_run_repository.requeue_for_retry(
                node_run_id=running_node_run.id,
                scheduled_time=retry_time,
                timeout_time=retry_timeout_time,
                error_code=result.error_code,
                error_message=result.error_message,
                output_text=result.output_text,
                output_json={
                    **(result.output_json or {}),
                    "retry_scheduled_time": retry_time.isoformat(),
                    "retry_reason": result.error_code or "node_failed",
                },
            )
            # NOTE(review): returning here leaves trace_span unfinished --
            # confirm whether an open span is acceptable on this path.
            if retried_node_run is None:
                return None
            self._publish_node_run_to_queue(retried_node_run)
            # The failed attempt's span is closed as an error even though
            # the node itself goes back to the queue.
            self.trace_span_repository.finish(
                span_id=trace_span.id,
                status="error",
                error_code=result.error_code,
                error_message=result.error_message,
                attributes_json={
                    "node_status": "queued",
                    "executor_name": executor_name,
                    "retry_scheduled": True,
                    "attempt_no": retried_node_run.attempt_no,
                },
            )
            self._log_event(
                tenant_id=retried_node_run.tenant_id,
                run_id=retried_node_run.run_id,
                node_run_id=retried_node_run.id,
                event_type="node_retry_scheduled",
                message=f"node retry scheduled: {retried_node_run.node_id}",
                detail_json={
                    "node_id": retried_node_run.node_id,
                    "attempt_no": retried_node_run.attempt_no,
                    "scheduled_time": retry_time.isoformat(),
                    "error_code": result.error_code,
                },
            )
            self._sync_workflow_run_status_from_nodes(
                tenant_id=retried_node_run.tenant_id,
                run_id=retried_node_run.run_id,
            )
            # Reload the run: the sync above may have changed its status.
            workflow_run = self.workflow_run_repository.get_by_id(retried_node_run.run_id)
            if workflow_run is None:
                return None
            return workflow_run, retried_node_run, executor_name
        # No retry: record the final status. update_node_run_status also
        # logs, publishes the event, schedules successor/compensation nodes
        # and syncs the workflow-run status.
        final_node_run = self.update_node_run_status(
            node_run_id=running_node_run.id,
            payload=NodeRunStatusUpdateRequest(
                status=result.status,
                worker_key=result.worker_key,
                error_code=result.error_code,
                error_message=result.error_message,
                output_text=result.output_text,
                output_json=result.output_json,
            ),
        )
        # NOTE(review): returning here leaves trace_span unfinished --
        # confirm whether an open span is acceptable on this path.
        if final_node_run is None:
            return None
        self.trace_span_repository.finish(
            span_id=trace_span.id,
            status="ok" if final_node_run.status == "completed" else "error",
            error_code=final_node_run.error_code,
            error_message=final_node_run.error_message,
            attributes_json={
                "node_status": final_node_run.status,
                "executor_name": executor_name,
                "has_output_text": final_node_run.output_text is not None,
                "has_output_json": final_node_run.output_json is not None,
            },
        )
        self._persist_node_execution_artifact(final_node_run)
        self._log_event(
            tenant_id=final_node_run.tenant_id,
            run_id=final_node_run.run_id,
            node_run_id=final_node_run.id,
            event_type="node_execution_finished",
            message=f"node execution finished with status {final_node_run.status}",
            detail_json={
                "node_id": final_node_run.node_id,
                "node_type": final_node_run.node_type,
                "executor_name": executor_name,
                "status": final_node_run.status,
            },
        )
        # Reload the run so the caller sees the status after the node update.
        workflow_run = self.workflow_run_repository.get_by_id(final_node_run.run_id)
        if workflow_run is None:
            return None
        return workflow_run, final_node_run, executor_name
  458. def execute_next_node_run(
  459. self,
  460. tenant_id: str,
  461. run_id: str,
  462. payload: NodeRunExecuteRequest,
  463. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  464. next_node_run = self.node_run_repository.get_next_queued_by_run(
  465. tenant_id=tenant_id,
  466. run_id=run_id,
  467. )
  468. if next_node_run is None:
  469. return None
  470. return self.execute_node_run(node_run_id=next_node_run.id, payload=payload)
  471. def execute_run(
  472. self,
  473. tenant_id: str,
  474. run_id: str,
  475. payload: RunExecuteRequest,
  476. ) -> tuple[WorkflowRun, list[NodeRun], list[str]] | None:
  477. workflow_run = self.workflow_run_repository.get_by_id(run_id)
  478. if workflow_run is None or workflow_run.tenant_id != tenant_id:
  479. return None
  480. executed_node_runs: list[NodeRun] = []
  481. executor_names: list[str] = []
  482. for _ in range(payload.max_steps):
  483. step_result = self.execute_next_node_run(
  484. tenant_id=tenant_id,
  485. run_id=run_id,
  486. payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
  487. )
  488. if step_result is None:
  489. break
  490. workflow_run, node_run, executor_name = step_result
  491. executed_node_runs.append(node_run)
  492. executor_names.append(executor_name)
  493. if node_run.status != "completed":
  494. break
  495. final_run = self.workflow_run_repository.get_by_id(run_id)
  496. if final_run is None:
  497. return None
  498. return final_run, executed_node_runs, executor_names
  499. def resume_human_node_run(
  500. self,
  501. *,
  502. node_run_id: str,
  503. payload: HumanNodeResumeRequest,
  504. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  505. node_run = self.node_run_repository.get_by_id(node_run_id)
  506. if node_run is None or node_run.tenant_id != payload.tenant_id:
  507. return None
  508. output_json = dict(node_run.output_json or {})
  509. existing_human_task_id = output_json.get("human_task_id")
  510. if existing_human_task_id is not None and existing_human_task_id != payload.human_task_id:
  511. return None
  512. if existing_human_task_id is None:
  513. output_json["human_task_id"] = payload.human_task_id
  514. self.node_run_repository.update_status(
  515. node_run_id=node_run.id,
  516. status="pending",
  517. worker_key=payload.worker_key,
  518. output_json=output_json,
  519. )
  520. return self.execute_node_run(
  521. node_run_id=node_run_id,
  522. payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
  523. )
  524. def execute_next_claimed_node_run(
  525. self,
  526. *,
  527. worker_key: str,
  528. lease_seconds: int,
  529. redis_client: object | None = None,
  530. ) -> tuple[WorkflowRun, NodeRun, str, int] | None:
  531. released_lease_count = self.node_run_repository.release_expired_leases(
  532. now_time=datetime.utcnow(),
  533. )
  534. claimed_node_run = self.node_run_repository.claim_next_queued(
  535. worker_key=worker_key,
  536. lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds),
  537. )
  538. if claimed_node_run is None:
  539. return None
  540. if redis_client is not None:
  541. from core_shared.redis_primitives import DistributedLock, IdempotencyStore
  542. lock = DistributedLock(
  543. client=redis_client,
  544. name=f"node-run:{claimed_node_run.id}:lock",
  545. ttl_seconds=lease_seconds,
  546. )
  547. if not lock.acquire():
  548. return None
  549. idempotency_store = IdempotencyStore(
  550. client=redis_client,
  551. prefix="node-run-idempotency",
  552. )
  553. if not idempotency_store.begin(key=claimed_node_run.id):
  554. lock.release()
  555. return None
  556. else:
  557. lock = None
  558. idempotency_store = None
  559. try:
  560. result = self.execute_node_run(
  561. node_run_id=claimed_node_run.id,
  562. payload=NodeRunExecuteRequest(worker_key=worker_key),
  563. )
  564. if idempotency_store is not None and result is not None:
  565. _, node_run, executor_name = result
  566. idempotency_store.complete(
  567. key=claimed_node_run.id,
  568. result={
  569. "status": node_run.status,
  570. "node_run_id": node_run.id,
  571. "executor_name": executor_name,
  572. },
  573. )
  574. finally:
  575. if lock is not None:
  576. lock.release()
  577. if result is None:
  578. return None
  579. workflow_run, node_run, executor_name = result
  580. return workflow_run, node_run, executor_name, released_lease_count
  581. def _persist_node_execution_artifact(self, node_run: NodeRun) -> None:
  582. if node_run.output_text is None and node_run.output_json is None:
  583. return
  584. size_bytes = len(node_run.output_text.encode("utf-8")) if node_run.output_text else None
  585. self.node_artifact_repository.create(
  586. tenant_id=node_run.tenant_id,
  587. run_id=node_run.run_id,
  588. node_run_id=node_run.id,
  589. node_id=node_run.node_id,
  590. artifact_type="execution_result",
  591. name=f"{node_run.node_id}-execution-result",
  592. mime_type="application/json" if node_run.output_json is not None else "text/plain",
  593. content_text=node_run.output_text,
  594. content_json=node_run.output_json,
  595. size_bytes=size_bytes,
  596. )
  597. def _plan_initial_node(self, payload: RunCreateRequest) -> InitialNodeContract | None:
  598. if self.workflow_client is None:
  599. return None
  600. workflow_version = self.workflow_client.get_workflow_version(
  601. tenant_id=payload.tenant_id,
  602. workflow_version_id=payload.workflow_version_id,
  603. )
  604. return derive_initial_node(workflow_version)
  605. def _resolve_node_config(
  606. self,
  607. *,
  608. tenant_id: str,
  609. workflow_version_id: str,
  610. node_id: str,
  611. ) -> dict[str, JSONValue]:
  612. if self.workflow_client is None:
  613. return {}
  614. workflow_version = self.workflow_client.get_workflow_version(
  615. tenant_id=tenant_id,
  616. workflow_version_id=workflow_version_id,
  617. )
  618. return derive_node_config(workflow_version, node_id)
  619. def _schedule_successor_nodes(self, node_run: NodeRun) -> None:
  620. if self.workflow_client is None:
  621. return
  622. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  623. if workflow_run is None:
  624. return
  625. workflow_version = self.workflow_client.get_workflow_version(
  626. tenant_id=node_run.tenant_id,
  627. workflow_version_id=workflow_run.workflow_version_id,
  628. )
  629. run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
  630. self._build_run_state_maps(
  631. tenant_id=node_run.tenant_id,
  632. run_id=node_run.run_id,
  633. )
  634. )
  635. successor_nodes = derive_successor_nodes(
  636. workflow_version,
  637. node_run.node_id,
  638. current_output_json=node_run.output_json,
  639. run_state_json=run_state_json,
  640. node_output_json_by_node_id=node_output_json_by_node_id,
  641. node_output_text_by_node_id=node_output_text_by_node_id,
  642. )
  643. if not successor_nodes:
  644. return
  645. existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
  646. tenant_id=node_run.tenant_id,
  647. run_id=node_run.run_id,
  648. node_ids=[item.node_id for item in successor_nodes],
  649. )
  650. existing_node_counts: dict[str, int] = {}
  651. for item in existing_nodes:
  652. existing_node_counts[item.node_id] = existing_node_counts.get(item.node_id, 0) + 1
  653. for successor in successor_nodes:
  654. successor_config = derive_node_config(workflow_version, successor.node_id)
  655. if not self._is_join_ready(
  656. workflow_version=workflow_version,
  657. run_node_runs=self.node_run_repository.list_by_run(
  658. tenant_id=node_run.tenant_id,
  659. run_id=node_run.run_id,
  660. ),
  661. successor_node_id=successor.node_id,
  662. successor_node_type=successor.node_type,
  663. successor_config=successor_config,
  664. ):
  665. self._log_event(
  666. tenant_id=node_run.tenant_id,
  667. run_id=node_run.run_id,
  668. node_run_id=None,
  669. event_type="join_waiting",
  670. message=f"join node waiting for predecessors: {successor.node_id}",
  671. detail_json={
  672. "node_id": successor.node_id,
  673. "source_node_id": node_run.node_id,
  674. },
  675. )
  676. continue
  677. if not self._can_schedule_repeated_node(
  678. successor_config,
  679. existing_count=existing_node_counts.get(successor.node_id, 0),
  680. ):
  681. continue
  682. scheduled_time, timeout_time = self._build_node_timing(successor_config)
  683. created = self.node_run_repository.create(
  684. tenant_id=node_run.tenant_id,
  685. run_id=node_run.run_id,
  686. parent_node_run_id=node_run.id,
  687. node_id=successor.node_id,
  688. node_type=successor.node_type,
  689. status=successor.status,
  690. scheduled_time=scheduled_time,
  691. timeout_time=timeout_time,
  692. )
  693. self._publish_node_run_to_queue(created)
  694. existing_node_counts[successor.node_id] = (
  695. existing_node_counts.get(successor.node_id, 0) + 1
  696. )
  697. self._log_event(
  698. tenant_id=node_run.tenant_id,
  699. run_id=node_run.run_id,
  700. node_run_id=None,
  701. event_type="node_queued",
  702. message=f"successor node queued: {successor.node_id}",
  703. detail_json={
  704. "node_id": successor.node_id,
  705. "node_type": successor.node_type,
  706. "status": successor.status,
  707. "source_node_id": node_run.node_id,
  708. },
  709. )
  710. def _build_execution_context(
  711. self,
  712. *,
  713. workflow_run: WorkflowRun,
  714. node_run: NodeRun,
  715. worker_key: str | None,
  716. node_config_json: dict[str, JSONValue] | None = None,
  717. ) -> NodeExecutionContextContract:
  718. run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
  719. self._build_run_state_maps(
  720. tenant_id=node_run.tenant_id,
  721. run_id=node_run.run_id,
  722. )
  723. )
  724. return NodeExecutionContextContract(
  725. tenant_id=node_run.tenant_id,
  726. run_id=node_run.run_id,
  727. node_run_id=node_run.id,
  728. node_id=node_run.node_id,
  729. node_type=node_run.node_type,
  730. node_config_json=node_config_json
  731. if node_config_json is not None
  732. else self._resolve_node_config(
  733. tenant_id=node_run.tenant_id,
  734. workflow_version_id=workflow_run.workflow_version_id,
  735. node_id=node_run.node_id,
  736. ),
  737. run_state_json=run_state_json,
  738. node_output_json_by_node_id=node_output_json_by_node_id,
  739. node_output_text_by_node_id=node_output_text_by_node_id,
  740. worker_key=worker_key,
  741. )
  742. def _build_run_state_maps(
  743. self,
  744. *,
  745. tenant_id: str,
  746. run_id: str,
  747. ) -> tuple[
  748. dict[str, JSONValue],
  749. dict[str, dict[str, JSONValue]],
  750. dict[str, str],
  751. ]:
  752. node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  753. run_state_json: dict[str, JSONValue] = {}
  754. node_output_json_by_node_id: dict[str, dict[str, JSONValue]] = {}
  755. node_output_text_by_node_id: dict[str, str] = {}
  756. for item in node_runs:
  757. if item.output_json is not None:
  758. node_output_json_by_node_id[item.node_id] = dict(item.output_json)
  759. state_updates = item.output_json.get("state_updates")
  760. if isinstance(state_updates, dict):
  761. for state_key, state_value in state_updates.items():
  762. run_state_json[str(state_key)] = state_value
  763. if item.output_text is not None:
  764. node_output_text_by_node_id[item.node_id] = item.output_text
  765. return run_state_json, node_output_json_by_node_id, node_output_text_by_node_id
  def _build_node_timing(
      self,
      node_config_json: dict[str, JSONValue],
  ) -> tuple[datetime, datetime | None]:
      """Compute the scheduled start and optional timeout deadline for a node.

      Reads ``delay_seconds`` and ``timeout_seconds`` from the node config
      (each falling back to 0 through ``_read_int_value``). A negative delay is
      treated as 0. The deadline is measured from the scheduled start and is
      ``None`` unless ``timeout_seconds`` is positive.

      Returns:
          ``(scheduled_time, timeout_time)`` as naive datetimes derived from
          ``datetime.utcnow()``.
      """
      # NOTE(review): utcnow() yields a naive value; presumably that matches how
      # timeout_time is stored and compared elsewhere -- confirm before changing.
      reference = datetime.utcnow()
      delay = self._read_int_value(node_config_json, "delay_seconds", default=0)
      timeout = self._read_int_value(node_config_json, "timeout_seconds", default=0)

      start_at = reference + timedelta(seconds=max(delay, 0))
      if timeout > 0:
          return start_at, start_at + timedelta(seconds=timeout)
      return start_at, None
  def _node_has_timed_out(self, node_run: NodeRun) -> bool:
      """Return whether the node run's timeout deadline has been reached.

      A node run without a ``timeout_time`` never times out. Otherwise the
      deadline is compared against ``datetime.utcnow()``, inclusive.
      """
      deadline = node_run.timeout_time
      if deadline is None:
          return False
      return deadline <= datetime.utcnow()
  def _should_retry_node(
      self,
      *,
      node_run: NodeRun,
      node_config_json: dict[str, JSONValue],
  ) -> bool:
      """Return whether the node run still has retry attempts left.

      ``max_attempts`` is read from the ``retry_policy`` section of the node
      config and defaults to 1. A retry is allowed while the run's
      ``attempt_no`` is strictly below that limit.
      """
      policy = self._read_dict_value(node_config_json, "retry_policy")
      attempts_allowed = self._read_int_value(policy, "max_attempts", default=1)
      return node_run.attempt_no < attempts_allowed
  def _read_retry_delay_seconds(self, node_config_json: dict[str, JSONValue]) -> int:
      """Return ``retry_policy.retry_delay_seconds`` from the node config, or 0."""
      return self._read_int_value(
          self._read_dict_value(node_config_json, "retry_policy"),
          "retry_delay_seconds",
          default=0,
      )
  def _build_retry_timing(
      self,
      node_config_json: dict[str, JSONValue],
  ) -> tuple[datetime, datetime | None]:
      """Compute when a retry should run and when it times out.

      The retry is scheduled ``retry_policy.retry_delay_seconds`` after the
      current time. A negative delay is treated as 0 so that a misconfigured
      policy cannot place the retry (and its timeout window) in the past; this
      mirrors the clamping ``_build_node_timing`` applies to ``delay_seconds``.
      The deadline is measured from the retry time and is ``None`` unless
      ``timeout_seconds`` is positive.

      Returns:
          ``(retry_time, timeout_time)`` as naive datetimes derived from
          ``datetime.utcnow()``.
      """
      retry_delay_seconds = max(self._read_retry_delay_seconds(node_config_json), 0)
      retry_time = datetime.utcnow() + timedelta(seconds=retry_delay_seconds)
      timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
      timeout_time = (
          retry_time + timedelta(seconds=timeout_seconds)
          if timeout_seconds > 0
          else None
      )
      return retry_time, timeout_time
  def _is_join_ready(
      self,
      *,
      workflow_version: WorkflowVersionContract,
      run_node_runs: list[NodeRun],
      successor_node_id: str,
      successor_node_type: str,
      successor_config: dict[str, JSONValue],
  ) -> bool:
      """Decide whether a successor node's join condition is satisfied.

      A successor that has no ``join_policy`` and is not of type ``"join"`` is
      always ready. Otherwise its predecessors (sources of edges targeting it)
      are checked against the node ids that have a run in status
      ``"completed"`` or ``"skipped"``:

      * no policy or ``"all_completed"``: every predecessor must be finished;
      * ``"any_completed"``: at least one predecessor must be finished;
      * any other policy value: treated as ready.

      The node is also reported ready when the workflow cannot be parsed or
      when it has no incoming edges.
      """
      policy = self._read_string_value(successor_config, "join_policy")
      if policy is None and successor_node_type != "join":
          return True

      definition = self._parse_workflow(workflow_version)
      if definition is None:
          return True

      upstream_ids = [
          edge.source for edge in definition.edges if edge.target == successor_node_id
      ]
      if not upstream_ids:
          return True

      finished_ids = {
          run.node_id for run in run_node_runs if run.status in {"completed", "skipped"}
      }
      if policy is None or policy == "all_completed":
          return set(upstream_ids) <= finished_ids
      if policy == "any_completed":
          return not finished_ids.isdisjoint(upstream_ids)
      # Unrecognised policies do not block scheduling.
      return True
  def _can_schedule_repeated_node(
      self,
      node_config_json: dict[str, JSONValue],
      *,
      existing_count: int,
  ) -> bool:
      """Return whether another run of the same node may be scheduled.

      The first run (``existing_count == 0``) is always allowed. Further runs
      require ``allow_loop`` to be true in the node config and the number of
      existing runs to be below ``max_iterations`` (default 1).
      """
      if existing_count == 0:
          return True
      loop_enabled = self._read_bool_value(node_config_json, "allow_loop", default=False)
      iteration_cap = self._read_int_value(node_config_json, "max_iterations", default=1)
      if not loop_enabled:
          return False
      return existing_count < iteration_cap
  def _schedule_compensation_node(
      self,
      *,
      node_run: NodeRun,
      node_config: dict[str, JSONValue],
  ) -> None:
      """Queue the compensation node configured for ``node_run``, if any.

      The compensation node id is taken from ``compensation_node_id`` or, when
      that is absent, from ``compensation.node_id`` in the node config. Nothing
      happens when no id is configured, when the owning workflow run cannot be
      loaded, or when the compensation node already has runs and
      ``_can_schedule_repeated_node`` refuses another one.

      Otherwise a ``queued`` node run is created with ``node_run`` as its
      parent, handed to ``_publish_node_run_to_queue``, and a
      ``compensation_queued`` event is logged.
      """
      target_node_id = self._read_string_value(node_config, "compensation_node_id")
      if target_node_id is None:
          nested_config = self._read_dict_value(node_config, "compensation")
          target_node_id = self._read_string_value(nested_config, "node_id")
          if target_node_id is None:
              return

      owning_run = self.workflow_run_repository.get_by_id(node_run.run_id)
      if owning_run is None:
          return

      target_config = self._resolve_node_config(
          tenant_id=node_run.tenant_id,
          workflow_version_id=owning_run.workflow_version_id,
          node_id=target_node_id,
      )
      prior_runs = self.node_run_repository.list_by_run_and_node_ids(
          tenant_id=node_run.tenant_id,
          run_id=node_run.run_id,
          node_ids=[target_node_id],
      )
      if prior_runs:
          repeat_allowed = self._can_schedule_repeated_node(
              target_config,
              existing_count=len(prior_runs),
          )
          if not repeat_allowed:
              return

      resolved_type = self._resolve_workflow_node_type(
          tenant_id=node_run.tenant_id,
          workflow_version_id=owning_run.workflow_version_id,
          node_id=target_node_id,
      )
      # Fall back to a generic type when the workflow does not supply one.
      target_node_type = resolved_type or "compensation"

      scheduled_time, timeout_time = self._build_node_timing(target_config)
      queued_run = self.node_run_repository.create(
          tenant_id=node_run.tenant_id,
          run_id=node_run.run_id,
          parent_node_run_id=node_run.id,
          node_id=target_node_id,
          node_type=target_node_type,
          status="queued",
          scheduled_time=scheduled_time,
          timeout_time=timeout_time,
      )
      self._publish_node_run_to_queue(queued_run)
      self._log_event(
          tenant_id=node_run.tenant_id,
          run_id=node_run.run_id,
          node_run_id=queued_run.id,
          event_type="compensation_queued",
          message=f"compensation node queued: {target_node_id}",
          detail_json={
              "failed_node_id": node_run.node_id,
              "compensation_node_id": target_node_id,
          },
      )
  def _parse_workflow(self, workflow_version: WorkflowVersionContract):
      """Parse the workflow version's ``dsl_json`` with ``parse_workflow_definition``.

      Callers in this class treat a ``None`` result as "no usable definition".
      """
      dsl_payload = workflow_version.dsl_json
      return parse_workflow_definition(dsl_payload)
  def _resolve_workflow_node_type(
      self,
      *,
      tenant_id: str,
      workflow_version_id: str,
      node_id: str,
  ) -> str | None:
      """Look up the type of ``node_id`` in the given workflow version.

      Returns ``None`` when no workflow client is configured, when the
      workflow definition cannot be parsed, or when no node has that id.
      The type of the first node whose id matches is returned.
      """
      client = self.workflow_client
      if client is None:
          return None

      version = client.get_workflow_version(
          tenant_id=tenant_id,
          workflow_version_id=workflow_version_id,
      )
      definition = self._parse_workflow(version)
      if definition is None:
          return None

      return next(
          (node.type for node in definition.nodes if node.id == node_id),
          None,
      )
  def _read_string_value(self, payload: dict[str, JSONValue], key: str) -> str | None:
      """Return ``payload[key]`` when it is a non-empty string, else ``None``."""
      candidate = payload.get(key)
      return candidate if isinstance(candidate, str) and candidate else None
  def _read_bool_value(
      self,
      payload: dict[str, JSONValue],
      key: str,
      *,
      default: bool,
  ) -> bool:
      """Return ``payload[key]`` when it is a ``bool``, otherwise ``default``."""
      candidate = payload.get(key)
      return candidate if isinstance(candidate, bool) else default
  def _read_int_value(
      self,
      payload: dict[str, JSONValue],
      key: str,
      *,
      default: int,
  ) -> int:
      """Return ``payload[key]`` when it is an ``int``, otherwise ``default``.

      ``bool`` values are rejected even though ``bool`` subclasses ``int``.
      """
      candidate = payload.get(key)
      if isinstance(candidate, bool) or not isinstance(candidate, int):
          return default
      return candidate
  def _read_dict_value(
      self,
      payload: dict[str, JSONValue],
      key: str,
  ) -> dict[str, JSONValue]:
      """Return a copy of ``payload[key]`` with string keys, or ``{}``.

      A new empty dict is returned when the value is missing or not a dict.
      Keys are coerced with ``str``; values are carried over as-is.
      """
      candidate = payload.get(key)
      if not isinstance(candidate, dict):
          return {}
      return {str(name): entry for name, entry in candidate.items()}
  def _sync_workflow_run_status_from_nodes(self, *, tenant_id: str, run_id: str) -> None:
      """Recompute and persist a workflow run's status from its node runs.

      Does nothing when the run has no node runs. Otherwise it stores the node
      count, stores the status (with error code and message) produced by
      ``_derive_run_status``, and logs a ``run_status_synced`` event.
      """
      runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
      if not runs:
          return

      self.workflow_run_repository.update_node_count(
          run_id=run_id,
          current_node_count=len(runs),
      )

      derived = self._derive_run_status(runs)
      derived_status, derived_error_code, derived_error_message = derived
      self.workflow_run_repository.update_status(
          run_id=run_id,
          status=derived_status,
          error_code=derived_error_code,
          error_message=derived_error_message,
      )

      self._log_event(
          tenant_id=tenant_id,
          run_id=run_id,
          node_run_id=None,
          event_type="run_status_synced",
          message=f"workflow run status synced to {derived_status}",
          detail_json={
              "status": derived_status,
              "error_code": derived_error_code,
          },
      )
  def _derive_run_status(
      self,
      node_runs: list[NodeRun],
  ) -> tuple[WorkflowRunStatus, str | None, str | None]:
      """Derive a workflow run status from the statuses of its node runs.

      Precedence, first match wins:

      1. any node ``pending``/``queued``/``running`` -> ``"running"``;
      2. any node ``failed`` -> ``"failed"`` with the error code and message
         of the first failed node in list order;
      3. at least one node and all of them ``completed``/``skipped`` ->
         ``"completed"``;
      4. anything else, including an empty list -> ``"running"``.

      Returns:
          ``(status, error_code, error_message)``; both error fields are
          ``None`` unless the status is ``"failed"``.
      """
      observed = {run.status for run in node_runs}

      in_flight: set[NodeRunStatus] = {"pending", "queued", "running"}
      if not observed.isdisjoint(in_flight):
          return "running", None, None

      if "failed" in observed:
          for run in node_runs:
              if run.status == "failed":
                  return "failed", run.error_code, run.error_message
          return "failed", None, None

      finished: set[NodeRunStatus] = {"completed", "skipped"}
      if observed and observed <= finished:
          return "completed", None, None
      return "running", None, None
  def _log_event(
      self,
      *,
      tenant_id: str,
      run_id: str,
      node_run_id: str | None,
      event_type: str,
      message: str,
      detail_json: dict[str, JSONValue] | None,
      level: str = "info",
  ) -> None:
      """Write one execution-log entry through ``execution_log_repository``.

      All arguments are forwarded unchanged as keyword arguments to the
      repository's ``create``; ``level`` defaults to ``"info"``.
      """
      entry_fields = {
          "tenant_id": tenant_id,
          "run_id": run_id,
          "node_run_id": node_run_id,
          "event_type": event_type,
          "level": level,
          "message": message,
          "detail_json": detail_json,
      }
      self.execution_log_repository.create(**entry_fields)
  def _publish_event(
      self,
      *,
      event_type: str,
      payload_json: dict[str, JSONValue],
      workflow_run: WorkflowRun | None = None,
      node_run: NodeRun | None = None,
  ) -> None:
      """Publish a domain event through the event client, best effort.

      Nothing is published when no event client is configured or when no
      tenant id can be determined from ``workflow_run`` or ``node_run``.

      Envelope fields prefer ``workflow_run`` and fall back to ``node_run``:
      the tenant id; the aggregate id (run id, else node-run id); and the
      correlation id (``workflow_run.session_id``, else ``node_run.run_id``).
      The aggregate type is ``"workflow_run"`` whenever a workflow run is
      given and ``"node_run"`` otherwise.

      An ``EventServiceClientError`` raised by the client is not propagated,
      so event delivery never fails the caller; it is logged as a warning so
      that dropped events remain visible.
      """
      # Function-scope import: the module's import block is outside this block.
      import logging

      if self.event_client is None:
          return
      tenant_id = workflow_run.tenant_id if workflow_run is not None else None
      if tenant_id is None and node_run is not None:
          tenant_id = node_run.tenant_id
      if tenant_id is None:
          return
      aggregate_id = workflow_run.id if workflow_run is not None else None
      if aggregate_id is None and node_run is not None:
          aggregate_id = node_run.id
      aggregate_type = "workflow_run" if workflow_run is not None else "node_run"
      correlation_id = workflow_run.session_id if workflow_run is not None else None
      if correlation_id is None and node_run is not None:
          correlation_id = node_run.run_id
      try:
          self.event_client.publish_event(
              EventPublishContract(
                  tenant_id=tenant_id,
                  event_type=event_type,
                  source_service="runtime-service",
                  aggregate_type=aggregate_type,
                  aggregate_id=aggregate_id,
                  correlation_id=correlation_id,
                  payload_json=payload_json,
              )
          )
      except EventServiceClientError as exc:
          # Deliberately swallowed: publishing must not break run processing.
          logging.getLogger(__name__).warning(
              "event publish failed: event_type=%s aggregate=%s/%s error=%s",
              event_type,
              aggregate_type,
              aggregate_id,
              exc,
          )
          return
  def _publish_node_run_to_queue(self, node_run: NodeRun) -> None:
      """Hand a queued node run to the task queue publisher.

      Nothing is published unless the node run's status is ``"queued"`` and a
      task queue publisher is configured.
      """
      if node_run.status == "queued" and self.task_queue_publisher is not None:
          self.task_queue_publisher.publish_runtime_node_run(
              tenant_id=node_run.tenant_id,
              node_run_id=node_run.id,
          )
  def build_runtime_application_service(
      *,
      db: Session,
      settings: RuntimeServiceSettings,
  ) -> RuntimeApplicationService:
      """Assemble a ``RuntimeApplicationService`` from a DB session and settings.

      Repositories are bound to ``db``; service clients take their base URLs
      (and, where passed, timeouts) from ``settings``. The task queue publisher
      is created only when ``try_build_redis_client`` returns a client;
      otherwise ``None`` is passed for it.
      """
      redis_client = try_build_redis_client(settings.redis_url)

      # Dependencies are built in the same order the constructor receives them.
      workflow_run_repository = WorkflowRunRepository(db)
      node_run_repository = NodeRunRepository(db)
      execution_log_repository = ExecutionLogRepository(db)
      node_artifact_repository = NodeArtifactRepository(db)
      trace_span_repository = TraceSpanRepository(db)

      execution_dispatcher = build_node_execution_dispatcher_with_clients(
          code_runner_client=CodeRunnerClient(base_url=settings.code_runner_service_url),
          model_gateway_client=ModelGatewayClient(base_url=settings.model_gateway_service_url),
          tool_client=ToolServiceClient(base_url=settings.tool_service_url),
          human_client=HumanServiceClient(
              base_url=settings.human_service_url,
              timeout_seconds=settings.human_service_timeout_seconds,
          ),
          knowledge_client=KnowledgeServiceClient(
              base_url=settings.knowledge_service_url,
              timeout_seconds=settings.knowledge_service_timeout_seconds,
          ),
      )
      workflow_client = WorkflowServiceClient(base_url=settings.workflow_service_url)
      event_client = EventServiceClient(
          base_url=settings.event_service_url,
          timeout_seconds=settings.event_service_timeout_seconds,
      )

      if redis_client is None:
          task_queue_publisher = None
      else:
          task_queue_publisher = TaskQueuePublisher(client=redis_client)

      return RuntimeApplicationService(
          workflow_run_repository=workflow_run_repository,
          node_run_repository=node_run_repository,
          execution_log_repository=execution_log_repository,
          node_artifact_repository=node_artifact_repository,
          trace_span_repository=trace_span_repository,
          execution_dispatcher=execution_dispatcher,
          workflow_client=workflow_client,
          event_client=event_client,
          task_queue_publisher=task_queue_publisher,
      )