services.py 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176
  1. from datetime import datetime, timedelta
  2. from sqlalchemy.orm import Session
  3. from core_dsl import parse_workflow_definition
  4. from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
  5. from core_domain import (
  6. InitialNodeContract,
  7. NodeExecutionContextContract,
  8. NodeExecutionResultContract,
  9. NodeRunStatus,
  10. WorkflowVersionContract,
  11. WorkflowRunStatus,
  12. )
  13. from app.db.models import NodeRun, WorkflowRun
  14. from app.domain.repositories import (
  15. ExecutionLogRepository,
  16. NodeArtifactRepository,
  17. NodeRunRepository,
  18. TraceSpanRepository,
  19. WorkflowRunRepository,
  20. )
  21. from app.infrastructure.executors import (
  22. NodeExecutionDispatcher,
  23. build_node_execution_dispatcher_with_clients,
  24. )
  25. from app.infrastructure.human_client import HumanServiceClient
  26. from app.infrastructure.knowledge_client import KnowledgeServiceClient
  27. from app.infrastructure.code_runner_client import CodeRunnerClient
  28. from app.infrastructure.model_gateway_client import ModelGatewayClient
  29. from app.infrastructure.planner import (
  30. derive_initial_node,
  31. derive_node_config,
  32. derive_successor_nodes,
  33. )
  34. from app.infrastructure.tool_client import ToolServiceClient
  35. from app.infrastructure.workflow_client import WorkflowServiceClient
  36. from app.bootstrap.settings import RuntimeServiceSettings
  37. from app.schemas.run import (
  38. NodeRunExecuteRequest,
  39. HumanNodeResumeRequest,
  40. NodeRunStatusUpdateRequest,
  41. RunCreateRequest,
  42. RunExecuteRequest,
  43. WorkflowRunStatusUpdateRequest,
  44. )
  45. from core_shared import JSONValue
  46. class RuntimeApplicationService:
  47. def __init__(
  48. self,
  49. workflow_run_repository: WorkflowRunRepository,
  50. node_run_repository: NodeRunRepository,
  51. execution_log_repository: ExecutionLogRepository,
  52. node_artifact_repository: NodeArtifactRepository,
  53. trace_span_repository: TraceSpanRepository,
  54. execution_dispatcher: NodeExecutionDispatcher,
  55. workflow_client: WorkflowServiceClient | None = None,
  56. event_client: EventServiceClient | None = None,
  57. ) -> None:
  58. self.workflow_run_repository = workflow_run_repository
  59. self.node_run_repository = node_run_repository
  60. self.execution_log_repository = execution_log_repository
  61. self.node_artifact_repository = node_artifact_repository
  62. self.trace_span_repository = trace_span_repository
  63. self.execution_dispatcher = execution_dispatcher
  64. self.workflow_client = workflow_client
  65. self.event_client = event_client
  66. def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
  67. initial_node = payload.initial_node or self._plan_initial_node(payload)
  68. workflow_run = self.workflow_run_repository.create(
  69. tenant_id=payload.tenant_id,
  70. app_id=payload.app_id,
  71. app_version_id=payload.app_version_id,
  72. workflow_id=payload.workflow_id,
  73. workflow_version_id=payload.workflow_version_id,
  74. session_id=payload.session_id,
  75. parent_run_id=payload.parent_run_id,
  76. root_run_id=payload.root_run_id,
  77. run_type=payload.run_type,
  78. trigger_type=payload.trigger_type,
  79. priority=payload.priority,
  80. )
  81. node_run = None
  82. if initial_node is not None:
  83. self.workflow_run_repository.update_node_count(
  84. run_id=workflow_run.id,
  85. current_node_count=1,
  86. )
  87. initial_config = self._resolve_node_config(
  88. tenant_id=payload.tenant_id,
  89. workflow_version_id=payload.workflow_version_id,
  90. node_id=initial_node.node_id,
  91. )
  92. scheduled_time, timeout_time = self._build_node_timing(initial_config)
  93. node_run = self.node_run_repository.create(
  94. tenant_id=payload.tenant_id,
  95. run_id=workflow_run.id,
  96. node_id=initial_node.node_id,
  97. node_type=initial_node.node_type,
  98. status=initial_node.status,
  99. scheduled_time=scheduled_time,
  100. timeout_time=timeout_time,
  101. )
  102. self._log_event(
  103. tenant_id=payload.tenant_id,
  104. run_id=workflow_run.id,
  105. node_run_id=node_run.id,
  106. event_type="node_queued",
  107. message=f"initial node queued: {initial_node.node_id}",
  108. detail_json={
  109. "node_id": initial_node.node_id,
  110. "node_type": initial_node.node_type,
  111. "status": initial_node.status,
  112. },
  113. )
  114. self._log_event(
  115. tenant_id=payload.tenant_id,
  116. run_id=workflow_run.id,
  117. node_run_id=node_run.id if node_run is not None else None,
  118. event_type="run_created",
  119. message="workflow run created",
  120. detail_json={
  121. "workflow_id": payload.workflow_id,
  122. "workflow_version_id": payload.workflow_version_id,
  123. "session_id": payload.session_id,
  124. },
  125. )
  126. self._publish_event(
  127. event_type="workflow.run.created",
  128. workflow_run=workflow_run,
  129. payload_json={
  130. "run_id": workflow_run.id,
  131. "workflow_id": workflow_run.workflow_id,
  132. "workflow_version_id": workflow_run.workflow_version_id,
  133. "status": workflow_run.status,
  134. },
  135. )
  136. return workflow_run, node_run
  137. def list_runs(self, tenant_id: str, session_id: str | None = None) -> list[WorkflowRun]:
  138. return self.workflow_run_repository.list_by_scope(
  139. tenant_id=tenant_id,
  140. session_id=session_id,
  141. )
  142. def list_node_runs(self, tenant_id: str, run_id: str) -> list[NodeRun]:
  143. return self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  144. def list_execution_logs(
  145. self,
  146. tenant_id: str,
  147. run_id: str | None = None,
  148. node_run_id: str | None = None,
  149. ):
  150. return self.execution_log_repository.list_by_scope(
  151. tenant_id=tenant_id,
  152. run_id=run_id,
  153. node_run_id=node_run_id,
  154. )
  155. def list_node_artifacts(
  156. self,
  157. tenant_id: str,
  158. run_id: str | None = None,
  159. node_run_id: str | None = None,
  160. artifact_type: str | None = None,
  161. ):
  162. return self.node_artifact_repository.list_by_scope(
  163. tenant_id=tenant_id,
  164. run_id=run_id,
  165. node_run_id=node_run_id,
  166. artifact_type=artifact_type,
  167. )
  168. def list_trace_spans(
  169. self,
  170. tenant_id: str,
  171. run_id: str | None = None,
  172. node_run_id: str | None = None,
  173. span_type: str | None = None,
  174. ):
  175. return self.trace_span_repository.list_by_scope(
  176. tenant_id=tenant_id,
  177. run_id=run_id,
  178. node_run_id=node_run_id,
  179. span_type=span_type,
  180. )
  181. def update_run_status(
  182. self,
  183. run_id: str,
  184. payload: WorkflowRunStatusUpdateRequest,
  185. ) -> WorkflowRun | None:
  186. entity = self.workflow_run_repository.update_status(
  187. run_id=run_id,
  188. status=payload.status,
  189. error_code=payload.error_code,
  190. error_message=payload.error_message,
  191. )
  192. if entity is not None:
  193. self._publish_event(
  194. event_type=f"workflow.run.{entity.status}",
  195. workflow_run=entity,
  196. payload_json={
  197. "run_id": entity.id,
  198. "status": entity.status,
  199. "error_code": entity.error_code,
  200. },
  201. )
  202. return entity
  203. def update_node_run_status(
  204. self,
  205. node_run_id: str,
  206. payload: NodeRunStatusUpdateRequest,
  207. ) -> NodeRun | None:
  208. node_run = self.node_run_repository.update_status(
  209. node_run_id=node_run_id,
  210. status=payload.status,
  211. worker_key=payload.worker_key,
  212. error_code=payload.error_code,
  213. error_message=payload.error_message,
  214. output_text=payload.output_text,
  215. output_json=payload.output_json,
  216. )
  217. if node_run is None:
  218. return None
  219. self._log_event(
  220. tenant_id=node_run.tenant_id,
  221. run_id=node_run.run_id,
  222. node_run_id=node_run.id,
  223. event_type="node_status_updated",
  224. message=f"node status updated to {payload.status}",
  225. detail_json={
  226. "node_id": node_run.node_id,
  227. "node_type": node_run.node_type,
  228. "status": payload.status,
  229. "error_code": payload.error_code,
  230. },
  231. )
  232. self._publish_event(
  233. event_type=f"workflow.node.{node_run.status}",
  234. workflow_run=None,
  235. node_run=node_run,
  236. payload_json={
  237. "run_id": node_run.run_id,
  238. "node_run_id": node_run.id,
  239. "node_id": node_run.node_id,
  240. "node_type": node_run.node_type,
  241. "status": node_run.status,
  242. "error_code": node_run.error_code,
  243. },
  244. )
  245. if payload.status == "completed":
  246. self._schedule_successor_nodes(node_run)
  247. if payload.status == "failed":
  248. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  249. if workflow_run is not None:
  250. node_config = self._resolve_node_config(
  251. tenant_id=node_run.tenant_id,
  252. workflow_version_id=workflow_run.workflow_version_id,
  253. node_id=node_run.node_id,
  254. )
  255. self._schedule_compensation_node(node_run=node_run, node_config=node_config)
  256. self._sync_workflow_run_status_from_nodes(
  257. tenant_id=node_run.tenant_id,
  258. run_id=node_run.run_id,
  259. )
  260. return node_run
  261. def execute_node_run(
  262. self,
  263. node_run_id: str,
  264. payload: NodeRunExecuteRequest,
  265. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  266. node_run = self.node_run_repository.get_by_id(node_run_id)
  267. if node_run is None:
  268. return None
  269. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  270. if workflow_run is None:
  271. return None
  272. if node_run.status in {"completed", "failed", "skipped"}:
  273. executor_name = self.execution_dispatcher.resolve_executor(
  274. node_run.node_type
  275. ).executor_name
  276. return workflow_run, node_run, executor_name
  277. node_config = self._resolve_node_config(
  278. tenant_id=node_run.tenant_id,
  279. workflow_version_id=workflow_run.workflow_version_id,
  280. node_id=node_run.node_id,
  281. )
  282. if self._node_has_timed_out(node_run):
  283. timed_out_node_run = self.update_node_run_status(
  284. node_run_id=node_run.id,
  285. payload=NodeRunStatusUpdateRequest(
  286. status="failed",
  287. worker_key=payload.worker_key,
  288. error_code="node_timeout",
  289. error_message=f"node timed out: {node_run.node_id}",
  290. output_json={
  291. "timeout_time": node_run.timeout_time.isoformat()
  292. if node_run.timeout_time is not None
  293. else None,
  294. },
  295. ),
  296. )
  297. if timed_out_node_run is None:
  298. return None
  299. executor_name = self.execution_dispatcher.resolve_executor(
  300. node_run.node_type
  301. ).executor_name
  302. return workflow_run, timed_out_node_run, executor_name
  303. running_node_run = self.node_run_repository.update_status(
  304. node_run_id=node_run_id,
  305. status="running",
  306. worker_key=payload.worker_key,
  307. )
  308. if running_node_run is None:
  309. return None
  310. self._log_event(
  311. tenant_id=running_node_run.tenant_id,
  312. run_id=running_node_run.run_id,
  313. node_run_id=running_node_run.id,
  314. event_type="node_execution_started",
  315. message=f"executing node {running_node_run.node_id}",
  316. detail_json={
  317. "node_id": running_node_run.node_id,
  318. "node_type": running_node_run.node_type,
  319. "worker_key": payload.worker_key,
  320. },
  321. )
  322. context = self._build_execution_context(
  323. workflow_run=workflow_run,
  324. node_run=running_node_run,
  325. worker_key=payload.worker_key,
  326. node_config_json=node_config,
  327. )
  328. executor_name = self.execution_dispatcher.resolve_executor(
  329. running_node_run.node_type
  330. ).executor_name
  331. trace_span = self.trace_span_repository.start(
  332. tenant_id=running_node_run.tenant_id,
  333. run_id=running_node_run.run_id,
  334. node_run_id=running_node_run.id,
  335. parent_span_id=None,
  336. span_type="node_execution",
  337. name=f"{running_node_run.node_type}:{running_node_run.node_id}",
  338. attributes_json={
  339. "node_id": running_node_run.node_id,
  340. "node_type": running_node_run.node_type,
  341. "executor_name": executor_name,
  342. "worker_key": payload.worker_key,
  343. },
  344. )
  345. try:
  346. result, executor_name = self.execution_dispatcher.execute(
  347. context=context,
  348. request=payload,
  349. )
  350. except Exception as exc:
  351. result = NodeExecutionResultContract(
  352. status="failed",
  353. worker_key=payload.worker_key,
  354. error_code="executor_error",
  355. error_message=str(exc),
  356. )
  357. if result.status == "failed" and self._should_retry_node(
  358. node_run=running_node_run,
  359. node_config_json=context.node_config_json,
  360. ):
  361. retry_time, retry_timeout_time = self._build_retry_timing(context.node_config_json)
  362. retried_node_run = self.node_run_repository.requeue_for_retry(
  363. node_run_id=running_node_run.id,
  364. scheduled_time=retry_time,
  365. timeout_time=retry_timeout_time,
  366. error_code=result.error_code,
  367. error_message=result.error_message,
  368. output_text=result.output_text,
  369. output_json={
  370. **(result.output_json or {}),
  371. "retry_scheduled_time": retry_time.isoformat(),
  372. "retry_reason": result.error_code or "node_failed",
  373. },
  374. )
  375. if retried_node_run is None:
  376. return None
  377. self.trace_span_repository.finish(
  378. span_id=trace_span.id,
  379. status="error",
  380. error_code=result.error_code,
  381. error_message=result.error_message,
  382. attributes_json={
  383. "node_status": "queued",
  384. "executor_name": executor_name,
  385. "retry_scheduled": True,
  386. "attempt_no": retried_node_run.attempt_no,
  387. },
  388. )
  389. self._log_event(
  390. tenant_id=retried_node_run.tenant_id,
  391. run_id=retried_node_run.run_id,
  392. node_run_id=retried_node_run.id,
  393. event_type="node_retry_scheduled",
  394. message=f"node retry scheduled: {retried_node_run.node_id}",
  395. detail_json={
  396. "node_id": retried_node_run.node_id,
  397. "attempt_no": retried_node_run.attempt_no,
  398. "scheduled_time": retry_time.isoformat(),
  399. "error_code": result.error_code,
  400. },
  401. )
  402. self._sync_workflow_run_status_from_nodes(
  403. tenant_id=retried_node_run.tenant_id,
  404. run_id=retried_node_run.run_id,
  405. )
  406. workflow_run = self.workflow_run_repository.get_by_id(retried_node_run.run_id)
  407. if workflow_run is None:
  408. return None
  409. return workflow_run, retried_node_run, executor_name
  410. final_node_run = self.update_node_run_status(
  411. node_run_id=running_node_run.id,
  412. payload=NodeRunStatusUpdateRequest(
  413. status=result.status,
  414. worker_key=result.worker_key,
  415. error_code=result.error_code,
  416. error_message=result.error_message,
  417. output_text=result.output_text,
  418. output_json=result.output_json,
  419. ),
  420. )
  421. if final_node_run is None:
  422. return None
  423. self.trace_span_repository.finish(
  424. span_id=trace_span.id,
  425. status="ok" if final_node_run.status == "completed" else "error",
  426. error_code=final_node_run.error_code,
  427. error_message=final_node_run.error_message,
  428. attributes_json={
  429. "node_status": final_node_run.status,
  430. "executor_name": executor_name,
  431. "has_output_text": final_node_run.output_text is not None,
  432. "has_output_json": final_node_run.output_json is not None,
  433. },
  434. )
  435. self._persist_node_execution_artifact(final_node_run)
  436. self._log_event(
  437. tenant_id=final_node_run.tenant_id,
  438. run_id=final_node_run.run_id,
  439. node_run_id=final_node_run.id,
  440. event_type="node_execution_finished",
  441. message=f"node execution finished with status {final_node_run.status}",
  442. detail_json={
  443. "node_id": final_node_run.node_id,
  444. "node_type": final_node_run.node_type,
  445. "executor_name": executor_name,
  446. "status": final_node_run.status,
  447. },
  448. )
  449. workflow_run = self.workflow_run_repository.get_by_id(final_node_run.run_id)
  450. if workflow_run is None:
  451. return None
  452. return workflow_run, final_node_run, executor_name
  453. def execute_next_node_run(
  454. self,
  455. tenant_id: str,
  456. run_id: str,
  457. payload: NodeRunExecuteRequest,
  458. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  459. next_node_run = self.node_run_repository.get_next_queued_by_run(
  460. tenant_id=tenant_id,
  461. run_id=run_id,
  462. )
  463. if next_node_run is None:
  464. return None
  465. return self.execute_node_run(node_run_id=next_node_run.id, payload=payload)
  466. def execute_run(
  467. self,
  468. tenant_id: str,
  469. run_id: str,
  470. payload: RunExecuteRequest,
  471. ) -> tuple[WorkflowRun, list[NodeRun], list[str]] | None:
  472. workflow_run = self.workflow_run_repository.get_by_id(run_id)
  473. if workflow_run is None or workflow_run.tenant_id != tenant_id:
  474. return None
  475. executed_node_runs: list[NodeRun] = []
  476. executor_names: list[str] = []
  477. for _ in range(payload.max_steps):
  478. step_result = self.execute_next_node_run(
  479. tenant_id=tenant_id,
  480. run_id=run_id,
  481. payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
  482. )
  483. if step_result is None:
  484. break
  485. workflow_run, node_run, executor_name = step_result
  486. executed_node_runs.append(node_run)
  487. executor_names.append(executor_name)
  488. if node_run.status != "completed":
  489. break
  490. final_run = self.workflow_run_repository.get_by_id(run_id)
  491. if final_run is None:
  492. return None
  493. return final_run, executed_node_runs, executor_names
  494. def resume_human_node_run(
  495. self,
  496. *,
  497. node_run_id: str,
  498. payload: HumanNodeResumeRequest,
  499. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  500. node_run = self.node_run_repository.get_by_id(node_run_id)
  501. if node_run is None or node_run.tenant_id != payload.tenant_id:
  502. return None
  503. output_json = dict(node_run.output_json or {})
  504. existing_human_task_id = output_json.get("human_task_id")
  505. if existing_human_task_id is not None and existing_human_task_id != payload.human_task_id:
  506. return None
  507. if existing_human_task_id is None:
  508. output_json["human_task_id"] = payload.human_task_id
  509. self.node_run_repository.update_status(
  510. node_run_id=node_run.id,
  511. status="pending",
  512. worker_key=payload.worker_key,
  513. output_json=output_json,
  514. )
  515. return self.execute_node_run(
  516. node_run_id=node_run_id,
  517. payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
  518. )
  519. def execute_next_claimed_node_run(
  520. self,
  521. *,
  522. worker_key: str,
  523. lease_seconds: int,
  524. redis_client: object | None = None,
  525. ) -> tuple[WorkflowRun, NodeRun, str, int] | None:
  526. released_lease_count = self.node_run_repository.release_expired_leases(
  527. now_time=datetime.utcnow(),
  528. )
  529. claimed_node_run = self.node_run_repository.claim_next_queued(
  530. worker_key=worker_key,
  531. lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds),
  532. )
  533. if claimed_node_run is None:
  534. return None
  535. if redis_client is not None:
  536. from core_shared.redis_primitives import DistributedLock, IdempotencyStore
  537. lock = DistributedLock(
  538. client=redis_client,
  539. name=f"node-run:{claimed_node_run.id}:lock",
  540. ttl_seconds=lease_seconds,
  541. )
  542. if not lock.acquire():
  543. return None
  544. idempotency_store = IdempotencyStore(
  545. client=redis_client,
  546. prefix="node-run-idempotency",
  547. )
  548. if not idempotency_store.begin(key=claimed_node_run.id):
  549. lock.release()
  550. return None
  551. else:
  552. lock = None
  553. idempotency_store = None
  554. try:
  555. result = self.execute_node_run(
  556. node_run_id=claimed_node_run.id,
  557. payload=NodeRunExecuteRequest(worker_key=worker_key),
  558. )
  559. if idempotency_store is not None and result is not None:
  560. _, node_run, executor_name = result
  561. idempotency_store.complete(
  562. key=claimed_node_run.id,
  563. result={
  564. "status": node_run.status,
  565. "node_run_id": node_run.id,
  566. "executor_name": executor_name,
  567. },
  568. )
  569. finally:
  570. if lock is not None:
  571. lock.release()
  572. if result is None:
  573. return None
  574. workflow_run, node_run, executor_name = result
  575. return workflow_run, node_run, executor_name, released_lease_count
  576. def _persist_node_execution_artifact(self, node_run: NodeRun) -> None:
  577. if node_run.output_text is None and node_run.output_json is None:
  578. return
  579. size_bytes = len(node_run.output_text.encode("utf-8")) if node_run.output_text else None
  580. self.node_artifact_repository.create(
  581. tenant_id=node_run.tenant_id,
  582. run_id=node_run.run_id,
  583. node_run_id=node_run.id,
  584. node_id=node_run.node_id,
  585. artifact_type="execution_result",
  586. name=f"{node_run.node_id}-execution-result",
  587. mime_type="application/json" if node_run.output_json is not None else "text/plain",
  588. content_text=node_run.output_text,
  589. content_json=node_run.output_json,
  590. size_bytes=size_bytes,
  591. )
  592. def _plan_initial_node(self, payload: RunCreateRequest) -> InitialNodeContract | None:
  593. if self.workflow_client is None:
  594. return None
  595. workflow_version = self.workflow_client.get_workflow_version(
  596. tenant_id=payload.tenant_id,
  597. workflow_version_id=payload.workflow_version_id,
  598. )
  599. return derive_initial_node(workflow_version)
  600. def _resolve_node_config(
  601. self,
  602. *,
  603. tenant_id: str,
  604. workflow_version_id: str,
  605. node_id: str,
  606. ) -> dict[str, JSONValue]:
  607. if self.workflow_client is None:
  608. return {}
  609. workflow_version = self.workflow_client.get_workflow_version(
  610. tenant_id=tenant_id,
  611. workflow_version_id=workflow_version_id,
  612. )
  613. return derive_node_config(workflow_version, node_id)
  614. def _schedule_successor_nodes(self, node_run: NodeRun) -> None:
  615. if self.workflow_client is None:
  616. return
  617. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  618. if workflow_run is None:
  619. return
  620. workflow_version = self.workflow_client.get_workflow_version(
  621. tenant_id=node_run.tenant_id,
  622. workflow_version_id=workflow_run.workflow_version_id,
  623. )
  624. run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
  625. self._build_run_state_maps(
  626. tenant_id=node_run.tenant_id,
  627. run_id=node_run.run_id,
  628. )
  629. )
  630. successor_nodes = derive_successor_nodes(
  631. workflow_version,
  632. node_run.node_id,
  633. current_output_json=node_run.output_json,
  634. run_state_json=run_state_json,
  635. node_output_json_by_node_id=node_output_json_by_node_id,
  636. node_output_text_by_node_id=node_output_text_by_node_id,
  637. )
  638. if not successor_nodes:
  639. return
  640. existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
  641. tenant_id=node_run.tenant_id,
  642. run_id=node_run.run_id,
  643. node_ids=[item.node_id for item in successor_nodes],
  644. )
  645. existing_node_counts: dict[str, int] = {}
  646. for item in existing_nodes:
  647. existing_node_counts[item.node_id] = existing_node_counts.get(item.node_id, 0) + 1
  648. for successor in successor_nodes:
  649. successor_config = derive_node_config(workflow_version, successor.node_id)
  650. if not self._is_join_ready(
  651. workflow_version=workflow_version,
  652. run_node_runs=self.node_run_repository.list_by_run(
  653. tenant_id=node_run.tenant_id,
  654. run_id=node_run.run_id,
  655. ),
  656. successor_node_id=successor.node_id,
  657. successor_node_type=successor.node_type,
  658. successor_config=successor_config,
  659. ):
  660. self._log_event(
  661. tenant_id=node_run.tenant_id,
  662. run_id=node_run.run_id,
  663. node_run_id=None,
  664. event_type="join_waiting",
  665. message=f"join node waiting for predecessors: {successor.node_id}",
  666. detail_json={
  667. "node_id": successor.node_id,
  668. "source_node_id": node_run.node_id,
  669. },
  670. )
  671. continue
  672. if not self._can_schedule_repeated_node(
  673. successor_config,
  674. existing_count=existing_node_counts.get(successor.node_id, 0),
  675. ):
  676. continue
  677. scheduled_time, timeout_time = self._build_node_timing(successor_config)
  678. self.node_run_repository.create(
  679. tenant_id=node_run.tenant_id,
  680. run_id=node_run.run_id,
  681. parent_node_run_id=node_run.id,
  682. node_id=successor.node_id,
  683. node_type=successor.node_type,
  684. status=successor.status,
  685. scheduled_time=scheduled_time,
  686. timeout_time=timeout_time,
  687. )
  688. existing_node_counts[successor.node_id] = (
  689. existing_node_counts.get(successor.node_id, 0) + 1
  690. )
  691. self._log_event(
  692. tenant_id=node_run.tenant_id,
  693. run_id=node_run.run_id,
  694. node_run_id=None,
  695. event_type="node_queued",
  696. message=f"successor node queued: {successor.node_id}",
  697. detail_json={
  698. "node_id": successor.node_id,
  699. "node_type": successor.node_type,
  700. "status": successor.status,
  701. "source_node_id": node_run.node_id,
  702. },
  703. )
  704. def _build_execution_context(
  705. self,
  706. *,
  707. workflow_run: WorkflowRun,
  708. node_run: NodeRun,
  709. worker_key: str | None,
  710. node_config_json: dict[str, JSONValue] | None = None,
  711. ) -> NodeExecutionContextContract:
  712. run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
  713. self._build_run_state_maps(
  714. tenant_id=node_run.tenant_id,
  715. run_id=node_run.run_id,
  716. )
  717. )
  718. return NodeExecutionContextContract(
  719. tenant_id=node_run.tenant_id,
  720. run_id=node_run.run_id,
  721. node_run_id=node_run.id,
  722. node_id=node_run.node_id,
  723. node_type=node_run.node_type,
  724. node_config_json=node_config_json
  725. if node_config_json is not None
  726. else self._resolve_node_config(
  727. tenant_id=node_run.tenant_id,
  728. workflow_version_id=workflow_run.workflow_version_id,
  729. node_id=node_run.node_id,
  730. ),
  731. run_state_json=run_state_json,
  732. node_output_json_by_node_id=node_output_json_by_node_id,
  733. node_output_text_by_node_id=node_output_text_by_node_id,
  734. worker_key=worker_key,
  735. )
  736. def _build_run_state_maps(
  737. self,
  738. *,
  739. tenant_id: str,
  740. run_id: str,
  741. ) -> tuple[
  742. dict[str, JSONValue],
  743. dict[str, dict[str, JSONValue]],
  744. dict[str, str],
  745. ]:
  746. node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  747. run_state_json: dict[str, JSONValue] = {}
  748. node_output_json_by_node_id: dict[str, dict[str, JSONValue]] = {}
  749. node_output_text_by_node_id: dict[str, str] = {}
  750. for item in node_runs:
  751. if item.output_json is not None:
  752. node_output_json_by_node_id[item.node_id] = dict(item.output_json)
  753. state_updates = item.output_json.get("state_updates")
  754. if isinstance(state_updates, dict):
  755. for state_key, state_value in state_updates.items():
  756. run_state_json[str(state_key)] = state_value
  757. if item.output_text is not None:
  758. node_output_text_by_node_id[item.node_id] = item.output_text
  759. return run_state_json, node_output_json_by_node_id, node_output_text_by_node_id
  760. def _build_node_timing(
  761. self,
  762. node_config_json: dict[str, JSONValue],
  763. ) -> tuple[datetime, datetime | None]:
  764. now = datetime.utcnow()
  765. delay_seconds = self._read_int_value(node_config_json, "delay_seconds", default=0)
  766. timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
  767. scheduled_time = now + timedelta(seconds=max(delay_seconds, 0))
  768. timeout_time = (
  769. scheduled_time + timedelta(seconds=timeout_seconds)
  770. if timeout_seconds > 0
  771. else None
  772. )
  773. return scheduled_time, timeout_time
  774. def _node_has_timed_out(self, node_run: NodeRun) -> bool:
  775. return node_run.timeout_time is not None and node_run.timeout_time <= datetime.utcnow()
  776. def _should_retry_node(
  777. self,
  778. *,
  779. node_run: NodeRun,
  780. node_config_json: dict[str, JSONValue],
  781. ) -> bool:
  782. retry_policy = self._read_dict_value(node_config_json, "retry_policy")
  783. max_attempts = self._read_int_value(retry_policy, "max_attempts", default=1)
  784. return max_attempts > node_run.attempt_no
  785. def _read_retry_delay_seconds(self, node_config_json: dict[str, JSONValue]) -> int:
  786. retry_policy = self._read_dict_value(node_config_json, "retry_policy")
  787. return self._read_int_value(retry_policy, "retry_delay_seconds", default=0)
  788. def _build_retry_timing(
  789. self,
  790. node_config_json: dict[str, JSONValue],
  791. ) -> tuple[datetime, datetime | None]:
  792. retry_time = datetime.utcnow() + timedelta(
  793. seconds=self._read_retry_delay_seconds(node_config_json)
  794. )
  795. timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
  796. timeout_time = (
  797. retry_time + timedelta(seconds=timeout_seconds)
  798. if timeout_seconds > 0
  799. else None
  800. )
  801. return retry_time, timeout_time
  802. def _is_join_ready(
  803. self,
  804. *,
  805. workflow_version: WorkflowVersionContract,
  806. run_node_runs: list[NodeRun],
  807. successor_node_id: str,
  808. successor_node_type: str,
  809. successor_config: dict[str, JSONValue],
  810. ) -> bool:
  811. join_policy = self._read_string_value(successor_config, "join_policy")
  812. if join_policy is None and successor_node_type != "join":
  813. return True
  814. workflow = self._parse_workflow(workflow_version)
  815. if workflow is None:
  816. return True
  817. predecessor_ids = [
  818. edge.source for edge in workflow.edges if edge.target == successor_node_id
  819. ]
  820. if not predecessor_ids:
  821. return True
  822. completed_node_ids = {
  823. item.node_id
  824. for item in run_node_runs
  825. if item.status in {"completed", "skipped"}
  826. }
  827. if join_policy in {None, "all_completed"}:
  828. return all(predecessor_id in completed_node_ids for predecessor_id in predecessor_ids)
  829. if join_policy == "any_completed":
  830. return any(predecessor_id in completed_node_ids for predecessor_id in predecessor_ids)
  831. return True
  832. def _can_schedule_repeated_node(
  833. self,
  834. node_config_json: dict[str, JSONValue],
  835. *,
  836. existing_count: int,
  837. ) -> bool:
  838. if existing_count == 0:
  839. return True
  840. allow_loop = self._read_bool_value(node_config_json, "allow_loop", default=False)
  841. max_iterations = self._read_int_value(node_config_json, "max_iterations", default=1)
  842. return allow_loop and existing_count < max_iterations
  def _schedule_compensation_node(
      self,
      *,
      node_run: NodeRun,
      node_config: dict[str, JSONValue],
  ) -> None:
      """Queue the compensation node configured for ``node_run``, if any.

      The compensation node id is read from ``compensation_node_id`` or,
      failing that, from ``compensation.node_id`` in ``node_config``.
      Nothing is scheduled when no id is configured, when the workflow run
      cannot be loaded, or when the compensation node already has runs and
      its own config does not permit another one. Otherwise a ``queued``
      node run is created as a child of ``node_run`` and a
      ``compensation_queued`` event is logged.
      """
      compensation_node_id = self._read_string_value(node_config, "compensation_node_id")
      if compensation_node_id is None:
          # Fall back to the nested form: {"compensation": {"node_id": ...}}.
          nested_settings = self._read_dict_value(node_config, "compensation")
          compensation_node_id = self._read_string_value(nested_settings, "node_id")
          if compensation_node_id is None:
              return

      tenant_id = node_run.tenant_id
      run_id = node_run.run_id

      owning_run = self.workflow_run_repository.get_by_id(run_id)
      if owning_run is None:
          return
      workflow_version_id = owning_run.workflow_version_id

      target_config = self._resolve_node_config(
          tenant_id=tenant_id,
          workflow_version_id=workflow_version_id,
          node_id=compensation_node_id,
      )
      prior_runs = self.node_run_repository.list_by_run_and_node_ids(
          tenant_id=tenant_id,
          run_id=run_id,
          node_ids=[compensation_node_id],
      )
      if prior_runs:
          may_repeat = self._can_schedule_repeated_node(
              target_config,
              existing_count=len(prior_runs),
          )
          if not may_repeat:
              return

      resolved_type = self._resolve_workflow_node_type(
          tenant_id=tenant_id,
          workflow_version_id=workflow_version_id,
          node_id=compensation_node_id,
      )
      # Fall back to a generic type when the workflow does not supply one.
      target_type = resolved_type or "compensation"

      scheduled_time, timeout_time = self._build_node_timing(target_config)
      queued_run = self.node_run_repository.create(
          tenant_id=tenant_id,
          run_id=run_id,
          parent_node_run_id=node_run.id,
          node_id=compensation_node_id,
          node_type=target_type,
          status="queued",
          scheduled_time=scheduled_time,
          timeout_time=timeout_time,
      )
      self._log_event(
          tenant_id=tenant_id,
          run_id=run_id,
          node_run_id=queued_run.id,
          event_type="compensation_queued",
          message=f"compensation node queued: {compensation_node_id}",
          detail_json={
              "failed_node_id": node_run.node_id,
              "compensation_node_id": compensation_node_id,
          },
      )
  def _parse_workflow(self, workflow_version: WorkflowVersionContract):
      """Parse the workflow definition held in ``workflow_version.dsl_json``.

      Returns whatever ``parse_workflow_definition`` returns; callers in
      this class treat ``None`` as "definition unavailable".
      """
      dsl_document = workflow_version.dsl_json
      return parse_workflow_definition(dsl_document)
  902. def _resolve_workflow_node_type(
  903. self,
  904. *,
  905. tenant_id: str,
  906. workflow_version_id: str,
  907. node_id: str,
  908. ) -> str | None:
  909. if self.workflow_client is None:
  910. return None
  911. workflow_version = self.workflow_client.get_workflow_version(
  912. tenant_id=tenant_id,
  913. workflow_version_id=workflow_version_id,
  914. )
  915. workflow = self._parse_workflow(workflow_version)
  916. if workflow is None:
  917. return None
  918. for node in workflow.nodes:
  919. if node.id == node_id:
  920. return node.type
  921. return None
  922. def _read_string_value(self, payload: dict[str, JSONValue], key: str) -> str | None:
  923. value = payload.get(key)
  924. if isinstance(value, str) and value:
  925. return value
  926. return None
  927. def _read_bool_value(
  928. self,
  929. payload: dict[str, JSONValue],
  930. key: str,
  931. *,
  932. default: bool,
  933. ) -> bool:
  934. value = payload.get(key)
  935. if isinstance(value, bool):
  936. return value
  937. return default
  938. def _read_int_value(
  939. self,
  940. payload: dict[str, JSONValue],
  941. key: str,
  942. *,
  943. default: int,
  944. ) -> int:
  945. value = payload.get(key)
  946. if isinstance(value, int) and not isinstance(value, bool):
  947. return value
  948. return default
  949. def _read_dict_value(
  950. self,
  951. payload: dict[str, JSONValue],
  952. key: str,
  953. ) -> dict[str, JSONValue]:
  954. value = payload.get(key)
  955. if isinstance(value, dict):
  956. return {str(item_key): item_value for item_key, item_value in value.items()}
  957. return {}
  def _sync_workflow_run_status_from_nodes(self, *, tenant_id: str, run_id: str) -> None:
      """Recompute the workflow run's node count and status from its node runs.

      Does nothing when the run has no node runs. Otherwise the node count
      is stored, the status derived by ``_derive_run_status`` is written
      together with its error code/message, and a ``run_status_synced``
      event is logged.
      """
      current_node_runs = self.node_run_repository.list_by_run(
          tenant_id=tenant_id,
          run_id=run_id,
      )
      if not current_node_runs:
          return

      run_repository = self.workflow_run_repository
      run_repository.update_node_count(
          run_id=run_id,
          current_node_count=len(current_node_runs),
      )

      derived = self._derive_run_status(current_node_runs)
      next_status, failure_code, failure_message = derived
      run_repository.update_status(
          run_id=run_id,
          status=next_status,
          error_code=failure_code,
          error_message=failure_message,
      )

      # Run-level event: not tied to any single node run.
      self._log_event(
          tenant_id=tenant_id,
          run_id=run_id,
          node_run_id=None,
          event_type="run_status_synced",
          message=f"workflow run status synced to {next_status}",
          detail_json={
              "status": next_status,
              "error_code": failure_code,
          },
      )
  984. def _derive_run_status(
  985. self,
  986. node_runs: list[NodeRun],
  987. ) -> tuple[WorkflowRunStatus, str | None, str | None]:
  988. statuses = {node_run.status for node_run in node_runs}
  989. active_statuses: set[NodeRunStatus] = {"pending", "queued", "running"}
  990. if statuses.intersection(active_statuses):
  991. return "running", None, None
  992. if "failed" in statuses:
  993. failed_node = next((item for item in node_runs if item.status == "failed"), None)
  994. error_code = failed_node.error_code if failed_node is not None else None
  995. error_message = failed_node.error_message if failed_node is not None else None
  996. return "failed", error_code, error_message
  997. terminal_statuses: set[NodeRunStatus] = {"completed", "skipped"}
  998. if statuses and statuses.issubset(terminal_statuses):
  999. return "completed", None, None
  1000. return "running", None, None
  def _log_event(
      self,
      *,
      tenant_id: str,
      run_id: str,
      node_run_id: str | None,
      event_type: str,
      message: str,
      detail_json: dict[str, JSONValue] | None,
      level: str = "info",
  ) -> None:
      """Persist one execution-log entry through ``execution_log_repository``.

      ``node_run_id`` may be ``None`` for run-level events; ``level``
      defaults to ``"info"``. The created record is not returned.
      """
      log_repository = self.execution_log_repository
      log_repository.create(
          tenant_id=tenant_id,
          run_id=run_id,
          node_run_id=node_run_id,
          level=level,
          event_type=event_type,
          message=message,
          detail_json=detail_json,
      )
  1021. def _publish_event(
  1022. self,
  1023. *,
  1024. event_type: str,
  1025. payload_json: dict[str, JSONValue],
  1026. workflow_run: WorkflowRun | None = None,
  1027. node_run: NodeRun | None = None,
  1028. ) -> None:
  1029. if self.event_client is None:
  1030. return
  1031. tenant_id = workflow_run.tenant_id if workflow_run is not None else None
  1032. if tenant_id is None and node_run is not None:
  1033. tenant_id = node_run.tenant_id
  1034. if tenant_id is None:
  1035. return
  1036. aggregate_id = workflow_run.id if workflow_run is not None else None
  1037. if aggregate_id is None and node_run is not None:
  1038. aggregate_id = node_run.id
  1039. aggregate_type = "workflow_run" if workflow_run is not None else "node_run"
  1040. correlation_id = workflow_run.session_id if workflow_run is not None else None
  1041. if correlation_id is None and node_run is not None:
  1042. correlation_id = node_run.run_id
  1043. try:
  1044. self.event_client.publish_event(
  1045. EventPublishContract(
  1046. tenant_id=tenant_id,
  1047. event_type=event_type,
  1048. source_service="runtime-service",
  1049. aggregate_type=aggregate_type,
  1050. aggregate_id=aggregate_id,
  1051. correlation_id=correlation_id,
  1052. payload_json=payload_json,
  1053. )
  1054. )
  1055. except EventServiceClientError:
  1056. return
  def build_runtime_application_service(
      *,
      db: Session,
      settings: RuntimeServiceSettings,
  ) -> RuntimeApplicationService:
      """Wire up a ``RuntimeApplicationService`` for one database session.

      All repositories share ``db``; service clients take their base URLs
      (and, for the human, knowledge and event services, their timeouts)
      from ``settings``.
      """
      # Repositories, all bound to the same session.
      workflow_runs = WorkflowRunRepository(db)
      node_runs = NodeRunRepository(db)
      execution_logs = ExecutionLogRepository(db)
      node_artifacts = NodeArtifactRepository(db)
      trace_spans = TraceSpanRepository(db)

      # Clients used by the node execution dispatcher.
      code_runner = CodeRunnerClient(base_url=settings.code_runner_service_url)
      model_gateway = ModelGatewayClient(base_url=settings.model_gateway_service_url)
      tools = ToolServiceClient(base_url=settings.tool_service_url)
      humans = HumanServiceClient(
          base_url=settings.human_service_url,
          timeout_seconds=settings.human_service_timeout_seconds,
      )
      knowledge = KnowledgeServiceClient(
          base_url=settings.knowledge_service_url,
          timeout_seconds=settings.knowledge_service_timeout_seconds,
      )
      dispatcher = build_node_execution_dispatcher_with_clients(
          code_runner_client=code_runner,
          model_gateway_client=model_gateway,
          tool_client=tools,
          human_client=humans,
          knowledge_client=knowledge,
      )

      # Clients used directly by the application service.
      workflows = WorkflowServiceClient(base_url=settings.workflow_service_url)
      events = EventServiceClient(
          base_url=settings.event_service_url,
          timeout_seconds=settings.event_service_timeout_seconds,
      )

      return RuntimeApplicationService(
          workflow_run_repository=workflow_runs,
          node_run_repository=node_runs,
          execution_log_repository=execution_logs,
          node_artifact_repository=node_artifacts,
          trace_span_repository=trace_spans,
          execution_dispatcher=dispatcher,
          workflow_client=workflows,
          event_client=events,
      )