services.py 38 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027
  1. from datetime import datetime, timedelta
  2. from sqlalchemy.orm import Session
  3. from core_dsl import parse_workflow_definition
  4. from core_domain import (
  5. InitialNodeContract,
  6. NodeExecutionContextContract,
  7. NodeExecutionResultContract,
  8. NodeRunStatus,
  9. WorkflowVersionContract,
  10. WorkflowRunStatus,
  11. )
  12. from app.db.models import NodeRun, WorkflowRun
  13. from app.domain.repositories import (
  14. ExecutionLogRepository,
  15. NodeArtifactRepository,
  16. NodeRunRepository,
  17. TraceSpanRepository,
  18. WorkflowRunRepository,
  19. )
  20. from app.infrastructure.executors import (
  21. NodeExecutionDispatcher,
  22. build_node_execution_dispatcher_with_clients,
  23. )
  24. from app.infrastructure.knowledge_client import KnowledgeServiceClient
  25. from app.infrastructure.code_runner_client import CodeRunnerClient
  26. from app.infrastructure.model_gateway_client import ModelGatewayClient
  27. from app.infrastructure.planner import (
  28. derive_initial_node,
  29. derive_node_config,
  30. derive_successor_nodes,
  31. )
  32. from app.infrastructure.tool_client import ToolServiceClient
  33. from app.infrastructure.workflow_client import WorkflowServiceClient
  34. from app.bootstrap.settings import RuntimeServiceSettings
  35. from app.schemas.run import (
  36. NodeRunExecuteRequest,
  37. NodeRunStatusUpdateRequest,
  38. RunCreateRequest,
  39. RunExecuteRequest,
  40. WorkflowRunStatusUpdateRequest,
  41. )
  42. from core_shared import JSONValue
  43. class RuntimeApplicationService:
  44. def __init__(
  45. self,
  46. workflow_run_repository: WorkflowRunRepository,
  47. node_run_repository: NodeRunRepository,
  48. execution_log_repository: ExecutionLogRepository,
  49. node_artifact_repository: NodeArtifactRepository,
  50. trace_span_repository: TraceSpanRepository,
  51. execution_dispatcher: NodeExecutionDispatcher,
  52. workflow_client: WorkflowServiceClient | None = None,
  53. ) -> None:
  54. self.workflow_run_repository = workflow_run_repository
  55. self.node_run_repository = node_run_repository
  56. self.execution_log_repository = execution_log_repository
  57. self.node_artifact_repository = node_artifact_repository
  58. self.trace_span_repository = trace_span_repository
  59. self.execution_dispatcher = execution_dispatcher
  60. self.workflow_client = workflow_client
  61. def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
  62. initial_node = payload.initial_node or self._plan_initial_node(payload)
  63. workflow_run = self.workflow_run_repository.create(
  64. tenant_id=payload.tenant_id,
  65. app_id=payload.app_id,
  66. app_version_id=payload.app_version_id,
  67. workflow_id=payload.workflow_id,
  68. workflow_version_id=payload.workflow_version_id,
  69. session_id=payload.session_id,
  70. parent_run_id=payload.parent_run_id,
  71. root_run_id=payload.root_run_id,
  72. run_type=payload.run_type,
  73. trigger_type=payload.trigger_type,
  74. priority=payload.priority,
  75. )
  76. node_run = None
  77. if initial_node is not None:
  78. self.workflow_run_repository.update_node_count(
  79. run_id=workflow_run.id,
  80. current_node_count=1,
  81. )
  82. initial_config = self._resolve_node_config(
  83. tenant_id=payload.tenant_id,
  84. workflow_version_id=payload.workflow_version_id,
  85. node_id=initial_node.node_id,
  86. )
  87. scheduled_time, timeout_time = self._build_node_timing(initial_config)
  88. node_run = self.node_run_repository.create(
  89. tenant_id=payload.tenant_id,
  90. run_id=workflow_run.id,
  91. node_id=initial_node.node_id,
  92. node_type=initial_node.node_type,
  93. status=initial_node.status,
  94. scheduled_time=scheduled_time,
  95. timeout_time=timeout_time,
  96. )
  97. self._log_event(
  98. tenant_id=payload.tenant_id,
  99. run_id=workflow_run.id,
  100. node_run_id=node_run.id,
  101. event_type="node_queued",
  102. message=f"initial node queued: {initial_node.node_id}",
  103. detail_json={
  104. "node_id": initial_node.node_id,
  105. "node_type": initial_node.node_type,
  106. "status": initial_node.status,
  107. },
  108. )
  109. self._log_event(
  110. tenant_id=payload.tenant_id,
  111. run_id=workflow_run.id,
  112. node_run_id=node_run.id if node_run is not None else None,
  113. event_type="run_created",
  114. message="workflow run created",
  115. detail_json={
  116. "workflow_id": payload.workflow_id,
  117. "workflow_version_id": payload.workflow_version_id,
  118. "session_id": payload.session_id,
  119. },
  120. )
  121. return workflow_run, node_run
  122. def list_runs(self, tenant_id: str, session_id: str | None = None) -> list[WorkflowRun]:
  123. return self.workflow_run_repository.list_by_scope(
  124. tenant_id=tenant_id,
  125. session_id=session_id,
  126. )
  127. def list_node_runs(self, tenant_id: str, run_id: str) -> list[NodeRun]:
  128. return self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  129. def list_execution_logs(
  130. self,
  131. tenant_id: str,
  132. run_id: str | None = None,
  133. node_run_id: str | None = None,
  134. ):
  135. return self.execution_log_repository.list_by_scope(
  136. tenant_id=tenant_id,
  137. run_id=run_id,
  138. node_run_id=node_run_id,
  139. )
  140. def list_node_artifacts(
  141. self,
  142. tenant_id: str,
  143. run_id: str | None = None,
  144. node_run_id: str | None = None,
  145. artifact_type: str | None = None,
  146. ):
  147. return self.node_artifact_repository.list_by_scope(
  148. tenant_id=tenant_id,
  149. run_id=run_id,
  150. node_run_id=node_run_id,
  151. artifact_type=artifact_type,
  152. )
  153. def list_trace_spans(
  154. self,
  155. tenant_id: str,
  156. run_id: str | None = None,
  157. node_run_id: str | None = None,
  158. span_type: str | None = None,
  159. ):
  160. return self.trace_span_repository.list_by_scope(
  161. tenant_id=tenant_id,
  162. run_id=run_id,
  163. node_run_id=node_run_id,
  164. span_type=span_type,
  165. )
  166. def update_run_status(
  167. self,
  168. run_id: str,
  169. payload: WorkflowRunStatusUpdateRequest,
  170. ) -> WorkflowRun | None:
  171. return self.workflow_run_repository.update_status(
  172. run_id=run_id,
  173. status=payload.status,
  174. error_code=payload.error_code,
  175. error_message=payload.error_message,
  176. )
  177. def update_node_run_status(
  178. self,
  179. node_run_id: str,
  180. payload: NodeRunStatusUpdateRequest,
  181. ) -> NodeRun | None:
  182. node_run = self.node_run_repository.update_status(
  183. node_run_id=node_run_id,
  184. status=payload.status,
  185. worker_key=payload.worker_key,
  186. error_code=payload.error_code,
  187. error_message=payload.error_message,
  188. output_text=payload.output_text,
  189. output_json=payload.output_json,
  190. )
  191. if node_run is None:
  192. return None
  193. self._log_event(
  194. tenant_id=node_run.tenant_id,
  195. run_id=node_run.run_id,
  196. node_run_id=node_run.id,
  197. event_type="node_status_updated",
  198. message=f"node status updated to {payload.status}",
  199. detail_json={
  200. "node_id": node_run.node_id,
  201. "node_type": node_run.node_type,
  202. "status": payload.status,
  203. "error_code": payload.error_code,
  204. },
  205. )
  206. if payload.status == "completed":
  207. self._schedule_successor_nodes(node_run)
  208. if payload.status == "failed":
  209. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  210. if workflow_run is not None:
  211. node_config = self._resolve_node_config(
  212. tenant_id=node_run.tenant_id,
  213. workflow_version_id=workflow_run.workflow_version_id,
  214. node_id=node_run.node_id,
  215. )
  216. self._schedule_compensation_node(node_run=node_run, node_config=node_config)
  217. self._sync_workflow_run_status_from_nodes(
  218. tenant_id=node_run.tenant_id,
  219. run_id=node_run.run_id,
  220. )
  221. return node_run
  222. def execute_node_run(
  223. self,
  224. node_run_id: str,
  225. payload: NodeRunExecuteRequest,
  226. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  227. node_run = self.node_run_repository.get_by_id(node_run_id)
  228. if node_run is None:
  229. return None
  230. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  231. if workflow_run is None:
  232. return None
  233. if node_run.status in {"completed", "failed", "skipped"}:
  234. executor_name = self.execution_dispatcher.resolve_executor(
  235. node_run.node_type
  236. ).executor_name
  237. return workflow_run, node_run, executor_name
  238. node_config = self._resolve_node_config(
  239. tenant_id=node_run.tenant_id,
  240. workflow_version_id=workflow_run.workflow_version_id,
  241. node_id=node_run.node_id,
  242. )
  243. if self._node_has_timed_out(node_run):
  244. timed_out_node_run = self.update_node_run_status(
  245. node_run_id=node_run.id,
  246. payload=NodeRunStatusUpdateRequest(
  247. status="failed",
  248. worker_key=payload.worker_key,
  249. error_code="node_timeout",
  250. error_message=f"node timed out: {node_run.node_id}",
  251. output_json={
  252. "timeout_time": node_run.timeout_time.isoformat()
  253. if node_run.timeout_time is not None
  254. else None,
  255. },
  256. ),
  257. )
  258. if timed_out_node_run is None:
  259. return None
  260. executor_name = self.execution_dispatcher.resolve_executor(
  261. node_run.node_type
  262. ).executor_name
  263. return workflow_run, timed_out_node_run, executor_name
  264. running_node_run = self.node_run_repository.update_status(
  265. node_run_id=node_run_id,
  266. status="running",
  267. worker_key=payload.worker_key,
  268. )
  269. if running_node_run is None:
  270. return None
  271. self._log_event(
  272. tenant_id=running_node_run.tenant_id,
  273. run_id=running_node_run.run_id,
  274. node_run_id=running_node_run.id,
  275. event_type="node_execution_started",
  276. message=f"executing node {running_node_run.node_id}",
  277. detail_json={
  278. "node_id": running_node_run.node_id,
  279. "node_type": running_node_run.node_type,
  280. "worker_key": payload.worker_key,
  281. },
  282. )
  283. context = self._build_execution_context(
  284. workflow_run=workflow_run,
  285. node_run=running_node_run,
  286. worker_key=payload.worker_key,
  287. node_config_json=node_config,
  288. )
  289. executor_name = self.execution_dispatcher.resolve_executor(
  290. running_node_run.node_type
  291. ).executor_name
  292. trace_span = self.trace_span_repository.start(
  293. tenant_id=running_node_run.tenant_id,
  294. run_id=running_node_run.run_id,
  295. node_run_id=running_node_run.id,
  296. parent_span_id=None,
  297. span_type="node_execution",
  298. name=f"{running_node_run.node_type}:{running_node_run.node_id}",
  299. attributes_json={
  300. "node_id": running_node_run.node_id,
  301. "node_type": running_node_run.node_type,
  302. "executor_name": executor_name,
  303. "worker_key": payload.worker_key,
  304. },
  305. )
  306. try:
  307. result, executor_name = self.execution_dispatcher.execute(
  308. context=context,
  309. request=payload,
  310. )
  311. except Exception as exc:
  312. result = NodeExecutionResultContract(
  313. status="failed",
  314. worker_key=payload.worker_key,
  315. error_code="executor_error",
  316. error_message=str(exc),
  317. )
  318. if result.status == "failed" and self._should_retry_node(
  319. node_run=running_node_run,
  320. node_config_json=context.node_config_json,
  321. ):
  322. retry_time, retry_timeout_time = self._build_retry_timing(context.node_config_json)
  323. retried_node_run = self.node_run_repository.requeue_for_retry(
  324. node_run_id=running_node_run.id,
  325. scheduled_time=retry_time,
  326. timeout_time=retry_timeout_time,
  327. error_code=result.error_code,
  328. error_message=result.error_message,
  329. output_text=result.output_text,
  330. output_json={
  331. **(result.output_json or {}),
  332. "retry_scheduled_time": retry_time.isoformat(),
  333. "retry_reason": result.error_code or "node_failed",
  334. },
  335. )
  336. if retried_node_run is None:
  337. return None
  338. self.trace_span_repository.finish(
  339. span_id=trace_span.id,
  340. status="error",
  341. error_code=result.error_code,
  342. error_message=result.error_message,
  343. attributes_json={
  344. "node_status": "queued",
  345. "executor_name": executor_name,
  346. "retry_scheduled": True,
  347. "attempt_no": retried_node_run.attempt_no,
  348. },
  349. )
  350. self._log_event(
  351. tenant_id=retried_node_run.tenant_id,
  352. run_id=retried_node_run.run_id,
  353. node_run_id=retried_node_run.id,
  354. event_type="node_retry_scheduled",
  355. message=f"node retry scheduled: {retried_node_run.node_id}",
  356. detail_json={
  357. "node_id": retried_node_run.node_id,
  358. "attempt_no": retried_node_run.attempt_no,
  359. "scheduled_time": retry_time.isoformat(),
  360. "error_code": result.error_code,
  361. },
  362. )
  363. self._sync_workflow_run_status_from_nodes(
  364. tenant_id=retried_node_run.tenant_id,
  365. run_id=retried_node_run.run_id,
  366. )
  367. workflow_run = self.workflow_run_repository.get_by_id(retried_node_run.run_id)
  368. if workflow_run is None:
  369. return None
  370. return workflow_run, retried_node_run, executor_name
  371. final_node_run = self.update_node_run_status(
  372. node_run_id=running_node_run.id,
  373. payload=NodeRunStatusUpdateRequest(
  374. status=result.status,
  375. worker_key=result.worker_key,
  376. error_code=result.error_code,
  377. error_message=result.error_message,
  378. output_text=result.output_text,
  379. output_json=result.output_json,
  380. ),
  381. )
  382. if final_node_run is None:
  383. return None
  384. self.trace_span_repository.finish(
  385. span_id=trace_span.id,
  386. status="ok" if final_node_run.status == "completed" else "error",
  387. error_code=final_node_run.error_code,
  388. error_message=final_node_run.error_message,
  389. attributes_json={
  390. "node_status": final_node_run.status,
  391. "executor_name": executor_name,
  392. "has_output_text": final_node_run.output_text is not None,
  393. "has_output_json": final_node_run.output_json is not None,
  394. },
  395. )
  396. self._persist_node_execution_artifact(final_node_run)
  397. self._log_event(
  398. tenant_id=final_node_run.tenant_id,
  399. run_id=final_node_run.run_id,
  400. node_run_id=final_node_run.id,
  401. event_type="node_execution_finished",
  402. message=f"node execution finished with status {final_node_run.status}",
  403. detail_json={
  404. "node_id": final_node_run.node_id,
  405. "node_type": final_node_run.node_type,
  406. "executor_name": executor_name,
  407. "status": final_node_run.status,
  408. },
  409. )
  410. workflow_run = self.workflow_run_repository.get_by_id(final_node_run.run_id)
  411. if workflow_run is None:
  412. return None
  413. return workflow_run, final_node_run, executor_name
  414. def execute_next_node_run(
  415. self,
  416. tenant_id: str,
  417. run_id: str,
  418. payload: NodeRunExecuteRequest,
  419. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  420. next_node_run = self.node_run_repository.get_next_queued_by_run(
  421. tenant_id=tenant_id,
  422. run_id=run_id,
  423. )
  424. if next_node_run is None:
  425. return None
  426. return self.execute_node_run(node_run_id=next_node_run.id, payload=payload)
  427. def execute_run(
  428. self,
  429. tenant_id: str,
  430. run_id: str,
  431. payload: RunExecuteRequest,
  432. ) -> tuple[WorkflowRun, list[NodeRun], list[str]] | None:
  433. workflow_run = self.workflow_run_repository.get_by_id(run_id)
  434. if workflow_run is None or workflow_run.tenant_id != tenant_id:
  435. return None
  436. executed_node_runs: list[NodeRun] = []
  437. executor_names: list[str] = []
  438. for _ in range(payload.max_steps):
  439. step_result = self.execute_next_node_run(
  440. tenant_id=tenant_id,
  441. run_id=run_id,
  442. payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
  443. )
  444. if step_result is None:
  445. break
  446. workflow_run, node_run, executor_name = step_result
  447. executed_node_runs.append(node_run)
  448. executor_names.append(executor_name)
  449. if node_run.status != "completed":
  450. break
  451. final_run = self.workflow_run_repository.get_by_id(run_id)
  452. if final_run is None:
  453. return None
  454. return final_run, executed_node_runs, executor_names
  455. def execute_next_claimed_node_run(
  456. self,
  457. *,
  458. worker_key: str,
  459. lease_seconds: int,
  460. ) -> tuple[WorkflowRun, NodeRun, str, int] | None:
  461. released_lease_count = self.node_run_repository.release_expired_leases(
  462. now_time=datetime.utcnow(),
  463. )
  464. claimed_node_run = self.node_run_repository.claim_next_queued(
  465. worker_key=worker_key,
  466. lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds),
  467. )
  468. if claimed_node_run is None:
  469. return None
  470. result = self.execute_node_run(
  471. node_run_id=claimed_node_run.id,
  472. payload=NodeRunExecuteRequest(worker_key=worker_key),
  473. )
  474. if result is None:
  475. return None
  476. workflow_run, node_run, executor_name = result
  477. return workflow_run, node_run, executor_name, released_lease_count
  478. def _persist_node_execution_artifact(self, node_run: NodeRun) -> None:
  479. if node_run.output_text is None and node_run.output_json is None:
  480. return
  481. size_bytes = len(node_run.output_text.encode("utf-8")) if node_run.output_text else None
  482. self.node_artifact_repository.create(
  483. tenant_id=node_run.tenant_id,
  484. run_id=node_run.run_id,
  485. node_run_id=node_run.id,
  486. node_id=node_run.node_id,
  487. artifact_type="execution_result",
  488. name=f"{node_run.node_id}-execution-result",
  489. mime_type="application/json" if node_run.output_json is not None else "text/plain",
  490. content_text=node_run.output_text,
  491. content_json=node_run.output_json,
  492. size_bytes=size_bytes,
  493. )
  494. def _plan_initial_node(self, payload: RunCreateRequest) -> InitialNodeContract | None:
  495. if self.workflow_client is None:
  496. return None
  497. workflow_version = self.workflow_client.get_workflow_version(
  498. tenant_id=payload.tenant_id,
  499. workflow_version_id=payload.workflow_version_id,
  500. )
  501. return derive_initial_node(workflow_version)
  502. def _resolve_node_config(
  503. self,
  504. *,
  505. tenant_id: str,
  506. workflow_version_id: str,
  507. node_id: str,
  508. ) -> dict[str, JSONValue]:
  509. if self.workflow_client is None:
  510. return {}
  511. workflow_version = self.workflow_client.get_workflow_version(
  512. tenant_id=tenant_id,
  513. workflow_version_id=workflow_version_id,
  514. )
  515. return derive_node_config(workflow_version, node_id)
  516. def _schedule_successor_nodes(self, node_run: NodeRun) -> None:
  517. if self.workflow_client is None:
  518. return
  519. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  520. if workflow_run is None:
  521. return
  522. workflow_version = self.workflow_client.get_workflow_version(
  523. tenant_id=node_run.tenant_id,
  524. workflow_version_id=workflow_run.workflow_version_id,
  525. )
  526. run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
  527. self._build_run_state_maps(
  528. tenant_id=node_run.tenant_id,
  529. run_id=node_run.run_id,
  530. )
  531. )
  532. successor_nodes = derive_successor_nodes(
  533. workflow_version,
  534. node_run.node_id,
  535. current_output_json=node_run.output_json,
  536. run_state_json=run_state_json,
  537. node_output_json_by_node_id=node_output_json_by_node_id,
  538. node_output_text_by_node_id=node_output_text_by_node_id,
  539. )
  540. if not successor_nodes:
  541. return
  542. existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
  543. tenant_id=node_run.tenant_id,
  544. run_id=node_run.run_id,
  545. node_ids=[item.node_id for item in successor_nodes],
  546. )
  547. existing_node_counts: dict[str, int] = {}
  548. for item in existing_nodes:
  549. existing_node_counts[item.node_id] = existing_node_counts.get(item.node_id, 0) + 1
  550. for successor in successor_nodes:
  551. successor_config = derive_node_config(workflow_version, successor.node_id)
  552. if not self._is_join_ready(
  553. workflow_version=workflow_version,
  554. run_node_runs=self.node_run_repository.list_by_run(
  555. tenant_id=node_run.tenant_id,
  556. run_id=node_run.run_id,
  557. ),
  558. successor_node_id=successor.node_id,
  559. successor_node_type=successor.node_type,
  560. successor_config=successor_config,
  561. ):
  562. self._log_event(
  563. tenant_id=node_run.tenant_id,
  564. run_id=node_run.run_id,
  565. node_run_id=None,
  566. event_type="join_waiting",
  567. message=f"join node waiting for predecessors: {successor.node_id}",
  568. detail_json={
  569. "node_id": successor.node_id,
  570. "source_node_id": node_run.node_id,
  571. },
  572. )
  573. continue
  574. if not self._can_schedule_repeated_node(
  575. successor_config,
  576. existing_count=existing_node_counts.get(successor.node_id, 0),
  577. ):
  578. continue
  579. scheduled_time, timeout_time = self._build_node_timing(successor_config)
  580. self.node_run_repository.create(
  581. tenant_id=node_run.tenant_id,
  582. run_id=node_run.run_id,
  583. parent_node_run_id=node_run.id,
  584. node_id=successor.node_id,
  585. node_type=successor.node_type,
  586. status=successor.status,
  587. scheduled_time=scheduled_time,
  588. timeout_time=timeout_time,
  589. )
  590. existing_node_counts[successor.node_id] = (
  591. existing_node_counts.get(successor.node_id, 0) + 1
  592. )
  593. self._log_event(
  594. tenant_id=node_run.tenant_id,
  595. run_id=node_run.run_id,
  596. node_run_id=None,
  597. event_type="node_queued",
  598. message=f"successor node queued: {successor.node_id}",
  599. detail_json={
  600. "node_id": successor.node_id,
  601. "node_type": successor.node_type,
  602. "status": successor.status,
  603. "source_node_id": node_run.node_id,
  604. },
  605. )
  606. def _build_execution_context(
  607. self,
  608. *,
  609. workflow_run: WorkflowRun,
  610. node_run: NodeRun,
  611. worker_key: str | None,
  612. node_config_json: dict[str, JSONValue] | None = None,
  613. ) -> NodeExecutionContextContract:
  614. run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
  615. self._build_run_state_maps(
  616. tenant_id=node_run.tenant_id,
  617. run_id=node_run.run_id,
  618. )
  619. )
  620. return NodeExecutionContextContract(
  621. tenant_id=node_run.tenant_id,
  622. run_id=node_run.run_id,
  623. node_run_id=node_run.id,
  624. node_id=node_run.node_id,
  625. node_type=node_run.node_type,
  626. node_config_json=node_config_json
  627. if node_config_json is not None
  628. else self._resolve_node_config(
  629. tenant_id=node_run.tenant_id,
  630. workflow_version_id=workflow_run.workflow_version_id,
  631. node_id=node_run.node_id,
  632. ),
  633. run_state_json=run_state_json,
  634. node_output_json_by_node_id=node_output_json_by_node_id,
  635. node_output_text_by_node_id=node_output_text_by_node_id,
  636. worker_key=worker_key,
  637. )
  638. def _build_run_state_maps(
  639. self,
  640. *,
  641. tenant_id: str,
  642. run_id: str,
  643. ) -> tuple[
  644. dict[str, JSONValue],
  645. dict[str, dict[str, JSONValue]],
  646. dict[str, str],
  647. ]:
  648. node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  649. run_state_json: dict[str, JSONValue] = {}
  650. node_output_json_by_node_id: dict[str, dict[str, JSONValue]] = {}
  651. node_output_text_by_node_id: dict[str, str] = {}
  652. for item in node_runs:
  653. if item.output_json is not None:
  654. node_output_json_by_node_id[item.node_id] = dict(item.output_json)
  655. state_updates = item.output_json.get("state_updates")
  656. if isinstance(state_updates, dict):
  657. for state_key, state_value in state_updates.items():
  658. run_state_json[str(state_key)] = state_value
  659. if item.output_text is not None:
  660. node_output_text_by_node_id[item.node_id] = item.output_text
  661. return run_state_json, node_output_json_by_node_id, node_output_text_by_node_id
  662. def _build_node_timing(
  663. self,
  664. node_config_json: dict[str, JSONValue],
  665. ) -> tuple[datetime, datetime | None]:
  666. now = datetime.utcnow()
  667. delay_seconds = self._read_int_value(node_config_json, "delay_seconds", default=0)
  668. timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
  669. scheduled_time = now + timedelta(seconds=max(delay_seconds, 0))
  670. timeout_time = (
  671. scheduled_time + timedelta(seconds=timeout_seconds)
  672. if timeout_seconds > 0
  673. else None
  674. )
  675. return scheduled_time, timeout_time
  676. def _node_has_timed_out(self, node_run: NodeRun) -> bool:
  677. return node_run.timeout_time is not None and node_run.timeout_time <= datetime.utcnow()
  678. def _should_retry_node(
  679. self,
  680. *,
  681. node_run: NodeRun,
  682. node_config_json: dict[str, JSONValue],
  683. ) -> bool:
  684. retry_policy = self._read_dict_value(node_config_json, "retry_policy")
  685. max_attempts = self._read_int_value(retry_policy, "max_attempts", default=1)
  686. return max_attempts > node_run.attempt_no
  687. def _read_retry_delay_seconds(self, node_config_json: dict[str, JSONValue]) -> int:
  688. retry_policy = self._read_dict_value(node_config_json, "retry_policy")
  689. return self._read_int_value(retry_policy, "retry_delay_seconds", default=0)
  690. def _build_retry_timing(
  691. self,
  692. node_config_json: dict[str, JSONValue],
  693. ) -> tuple[datetime, datetime | None]:
  694. retry_time = datetime.utcnow() + timedelta(
  695. seconds=self._read_retry_delay_seconds(node_config_json)
  696. )
  697. timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
  698. timeout_time = (
  699. retry_time + timedelta(seconds=timeout_seconds)
  700. if timeout_seconds > 0
  701. else None
  702. )
  703. return retry_time, timeout_time
  704. def _is_join_ready(
  705. self,
  706. *,
  707. workflow_version: WorkflowVersionContract,
  708. run_node_runs: list[NodeRun],
  709. successor_node_id: str,
  710. successor_node_type: str,
  711. successor_config: dict[str, JSONValue],
  712. ) -> bool:
  713. join_policy = self._read_string_value(successor_config, "join_policy")
  714. if join_policy is None and successor_node_type != "join":
  715. return True
  716. workflow = self._parse_workflow(workflow_version)
  717. if workflow is None:
  718. return True
  719. predecessor_ids = [
  720. edge.source for edge in workflow.edges if edge.target == successor_node_id
  721. ]
  722. if not predecessor_ids:
  723. return True
  724. completed_node_ids = {
  725. item.node_id
  726. for item in run_node_runs
  727. if item.status in {"completed", "skipped"}
  728. }
  729. if join_policy in {None, "all_completed"}:
  730. return all(predecessor_id in completed_node_ids for predecessor_id in predecessor_ids)
  731. if join_policy == "any_completed":
  732. return any(predecessor_id in completed_node_ids for predecessor_id in predecessor_ids)
  733. return True
  734. def _can_schedule_repeated_node(
  735. self,
  736. node_config_json: dict[str, JSONValue],
  737. *,
  738. existing_count: int,
  739. ) -> bool:
  740. if existing_count == 0:
  741. return True
  742. allow_loop = self._read_bool_value(node_config_json, "allow_loop", default=False)
  743. max_iterations = self._read_int_value(node_config_json, "max_iterations", default=1)
  744. return allow_loop and existing_count < max_iterations
  745. def _schedule_compensation_node(
  746. self,
  747. *,
  748. node_run: NodeRun,
  749. node_config: dict[str, JSONValue],
  750. ) -> None:
  751. compensation_node_id = self._read_string_value(node_config, "compensation_node_id")
  752. if compensation_node_id is None:
  753. compensation_config = self._read_dict_value(node_config, "compensation")
  754. compensation_node_id = self._read_string_value(compensation_config, "node_id")
  755. if compensation_node_id is None:
  756. return
  757. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  758. if workflow_run is None:
  759. return
  760. compensation_config = self._resolve_node_config(
  761. tenant_id=node_run.tenant_id,
  762. workflow_version_id=workflow_run.workflow_version_id,
  763. node_id=compensation_node_id,
  764. )
  765. existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
  766. tenant_id=node_run.tenant_id,
  767. run_id=node_run.run_id,
  768. node_ids=[compensation_node_id],
  769. )
  770. if existing_nodes and not self._can_schedule_repeated_node(
  771. compensation_config,
  772. existing_count=len(existing_nodes),
  773. ):
  774. return
  775. compensation_node_type = self._resolve_workflow_node_type(
  776. tenant_id=node_run.tenant_id,
  777. workflow_version_id=workflow_run.workflow_version_id,
  778. node_id=compensation_node_id,
  779. ) or "compensation"
  780. scheduled_time, timeout_time = self._build_node_timing(compensation_config)
  781. created = self.node_run_repository.create(
  782. tenant_id=node_run.tenant_id,
  783. run_id=node_run.run_id,
  784. parent_node_run_id=node_run.id,
  785. node_id=compensation_node_id,
  786. node_type=compensation_node_type,
  787. status="queued",
  788. scheduled_time=scheduled_time,
  789. timeout_time=timeout_time,
  790. )
  791. self._log_event(
  792. tenant_id=node_run.tenant_id,
  793. run_id=node_run.run_id,
  794. node_run_id=created.id,
  795. event_type="compensation_queued",
  796. message=f"compensation node queued: {compensation_node_id}",
  797. detail_json={
  798. "failed_node_id": node_run.node_id,
  799. "compensation_node_id": compensation_node_id,
  800. },
  801. )
  802. def _parse_workflow(self, workflow_version: WorkflowVersionContract):
  803. return parse_workflow_definition(workflow_version.dsl_json)
  804. def _resolve_workflow_node_type(
  805. self,
  806. *,
  807. tenant_id: str,
  808. workflow_version_id: str,
  809. node_id: str,
  810. ) -> str | None:
  811. if self.workflow_client is None:
  812. return None
  813. workflow_version = self.workflow_client.get_workflow_version(
  814. tenant_id=tenant_id,
  815. workflow_version_id=workflow_version_id,
  816. )
  817. workflow = self._parse_workflow(workflow_version)
  818. if workflow is None:
  819. return None
  820. for node in workflow.nodes:
  821. if node.id == node_id:
  822. return node.type
  823. return None
  824. def _read_string_value(self, payload: dict[str, JSONValue], key: str) -> str | None:
  825. value = payload.get(key)
  826. if isinstance(value, str) and value:
  827. return value
  828. return None
  829. def _read_bool_value(
  830. self,
  831. payload: dict[str, JSONValue],
  832. key: str,
  833. *,
  834. default: bool,
  835. ) -> bool:
  836. value = payload.get(key)
  837. if isinstance(value, bool):
  838. return value
  839. return default
  840. def _read_int_value(
  841. self,
  842. payload: dict[str, JSONValue],
  843. key: str,
  844. *,
  845. default: int,
  846. ) -> int:
  847. value = payload.get(key)
  848. if isinstance(value, int) and not isinstance(value, bool):
  849. return value
  850. return default
  851. def _read_dict_value(
  852. self,
  853. payload: dict[str, JSONValue],
  854. key: str,
  855. ) -> dict[str, JSONValue]:
  856. value = payload.get(key)
  857. if isinstance(value, dict):
  858. return {str(item_key): item_value for item_key, item_value in value.items()}
  859. return {}
  860. def _sync_workflow_run_status_from_nodes(self, *, tenant_id: str, run_id: str) -> None:
  861. node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  862. if not node_runs:
  863. return
  864. self.workflow_run_repository.update_node_count(
  865. run_id=run_id,
  866. current_node_count=len(node_runs),
  867. )
  868. next_status, error_code, error_message = self._derive_run_status(node_runs)
  869. self.workflow_run_repository.update_status(
  870. run_id=run_id,
  871. status=next_status,
  872. error_code=error_code,
  873. error_message=error_message,
  874. )
  875. self._log_event(
  876. tenant_id=tenant_id,
  877. run_id=run_id,
  878. node_run_id=None,
  879. event_type="run_status_synced",
  880. message=f"workflow run status synced to {next_status}",
  881. detail_json={
  882. "status": next_status,
  883. "error_code": error_code,
  884. },
  885. )
  886. def _derive_run_status(
  887. self,
  888. node_runs: list[NodeRun],
  889. ) -> tuple[WorkflowRunStatus, str | None, str | None]:
  890. statuses = {node_run.status for node_run in node_runs}
  891. active_statuses: set[NodeRunStatus] = {"pending", "queued", "running"}
  892. if statuses.intersection(active_statuses):
  893. return "running", None, None
  894. if "failed" in statuses:
  895. failed_node = next((item for item in node_runs if item.status == "failed"), None)
  896. error_code = failed_node.error_code if failed_node is not None else None
  897. error_message = failed_node.error_message if failed_node is not None else None
  898. return "failed", error_code, error_message
  899. terminal_statuses: set[NodeRunStatus] = {"completed", "skipped"}
  900. if statuses and statuses.issubset(terminal_statuses):
  901. return "completed", None, None
  902. return "running", None, None
  903. def _log_event(
  904. self,
  905. *,
  906. tenant_id: str,
  907. run_id: str,
  908. node_run_id: str | None,
  909. event_type: str,
  910. message: str,
  911. detail_json: dict[str, JSONValue] | None,
  912. level: str = "info",
  913. ) -> None:
  914. self.execution_log_repository.create(
  915. tenant_id=tenant_id,
  916. run_id=run_id,
  917. node_run_id=node_run_id,
  918. event_type=event_type,
  919. level=level,
  920. message=message,
  921. detail_json=detail_json,
  922. )
  923. def build_runtime_application_service(
  924. *,
  925. db: Session,
  926. settings: RuntimeServiceSettings,
  927. ) -> RuntimeApplicationService:
  928. return RuntimeApplicationService(
  929. workflow_run_repository=WorkflowRunRepository(db),
  930. node_run_repository=NodeRunRepository(db),
  931. execution_log_repository=ExecutionLogRepository(db),
  932. node_artifact_repository=NodeArtifactRepository(db),
  933. trace_span_repository=TraceSpanRepository(db),
  934. execution_dispatcher=build_node_execution_dispatcher_with_clients(
  935. code_runner_client=CodeRunnerClient(base_url=settings.code_runner_service_url),
  936. model_gateway_client=ModelGatewayClient(base_url=settings.model_gateway_service_url),
  937. tool_client=ToolServiceClient(base_url=settings.tool_service_url),
  938. knowledge_client=KnowledgeServiceClient(
  939. base_url=settings.knowledge_service_url,
  940. timeout_seconds=settings.knowledge_service_timeout_seconds,
  941. ),
  942. ),
  943. workflow_client=WorkflowServiceClient(base_url=settings.workflow_service_url),
  944. )