services.py 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140
  1. from datetime import datetime, timedelta
  2. from sqlalchemy.orm import Session
  3. from core_dsl import parse_workflow_definition
  4. from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
  5. from core_domain import (
  6. InitialNodeContract,
  7. NodeExecutionContextContract,
  8. NodeExecutionResultContract,
  9. NodeRunStatus,
  10. WorkflowVersionContract,
  11. WorkflowRunStatus,
  12. )
  13. from app.db.models import NodeRun, WorkflowRun
  14. from app.domain.repositories import (
  15. ExecutionLogRepository,
  16. NodeArtifactRepository,
  17. NodeRunRepository,
  18. TraceSpanRepository,
  19. WorkflowRunRepository,
  20. )
  21. from app.infrastructure.executors import (
  22. NodeExecutionDispatcher,
  23. build_node_execution_dispatcher_with_clients,
  24. )
  25. from app.infrastructure.human_client import HumanServiceClient
  26. from app.infrastructure.knowledge_client import KnowledgeServiceClient
  27. from app.infrastructure.code_runner_client import CodeRunnerClient
  28. from app.infrastructure.model_gateway_client import ModelGatewayClient
  29. from app.infrastructure.planner import (
  30. derive_initial_node,
  31. derive_node_config,
  32. derive_successor_nodes,
  33. )
  34. from app.infrastructure.tool_client import ToolServiceClient
  35. from app.infrastructure.workflow_client import WorkflowServiceClient
  36. from app.bootstrap.settings import RuntimeServiceSettings
  37. from app.schemas.run import (
  38. NodeRunExecuteRequest,
  39. HumanNodeResumeRequest,
  40. NodeRunStatusUpdateRequest,
  41. RunCreateRequest,
  42. RunExecuteRequest,
  43. WorkflowRunStatusUpdateRequest,
  44. )
  45. from core_shared import JSONValue
  46. class RuntimeApplicationService:
  47. def __init__(
  48. self,
  49. workflow_run_repository: WorkflowRunRepository,
  50. node_run_repository: NodeRunRepository,
  51. execution_log_repository: ExecutionLogRepository,
  52. node_artifact_repository: NodeArtifactRepository,
  53. trace_span_repository: TraceSpanRepository,
  54. execution_dispatcher: NodeExecutionDispatcher,
  55. workflow_client: WorkflowServiceClient | None = None,
  56. event_client: EventServiceClient | None = None,
  57. ) -> None:
  58. self.workflow_run_repository = workflow_run_repository
  59. self.node_run_repository = node_run_repository
  60. self.execution_log_repository = execution_log_repository
  61. self.node_artifact_repository = node_artifact_repository
  62. self.trace_span_repository = trace_span_repository
  63. self.execution_dispatcher = execution_dispatcher
  64. self.workflow_client = workflow_client
  65. self.event_client = event_client
  66. def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
  67. initial_node = payload.initial_node or self._plan_initial_node(payload)
  68. workflow_run = self.workflow_run_repository.create(
  69. tenant_id=payload.tenant_id,
  70. app_id=payload.app_id,
  71. app_version_id=payload.app_version_id,
  72. workflow_id=payload.workflow_id,
  73. workflow_version_id=payload.workflow_version_id,
  74. session_id=payload.session_id,
  75. parent_run_id=payload.parent_run_id,
  76. root_run_id=payload.root_run_id,
  77. run_type=payload.run_type,
  78. trigger_type=payload.trigger_type,
  79. priority=payload.priority,
  80. )
  81. node_run = None
  82. if initial_node is not None:
  83. self.workflow_run_repository.update_node_count(
  84. run_id=workflow_run.id,
  85. current_node_count=1,
  86. )
  87. initial_config = self._resolve_node_config(
  88. tenant_id=payload.tenant_id,
  89. workflow_version_id=payload.workflow_version_id,
  90. node_id=initial_node.node_id,
  91. )
  92. scheduled_time, timeout_time = self._build_node_timing(initial_config)
  93. node_run = self.node_run_repository.create(
  94. tenant_id=payload.tenant_id,
  95. run_id=workflow_run.id,
  96. node_id=initial_node.node_id,
  97. node_type=initial_node.node_type,
  98. status=initial_node.status,
  99. scheduled_time=scheduled_time,
  100. timeout_time=timeout_time,
  101. )
  102. self._log_event(
  103. tenant_id=payload.tenant_id,
  104. run_id=workflow_run.id,
  105. node_run_id=node_run.id,
  106. event_type="node_queued",
  107. message=f"initial node queued: {initial_node.node_id}",
  108. detail_json={
  109. "node_id": initial_node.node_id,
  110. "node_type": initial_node.node_type,
  111. "status": initial_node.status,
  112. },
  113. )
  114. self._log_event(
  115. tenant_id=payload.tenant_id,
  116. run_id=workflow_run.id,
  117. node_run_id=node_run.id if node_run is not None else None,
  118. event_type="run_created",
  119. message="workflow run created",
  120. detail_json={
  121. "workflow_id": payload.workflow_id,
  122. "workflow_version_id": payload.workflow_version_id,
  123. "session_id": payload.session_id,
  124. },
  125. )
  126. self._publish_event(
  127. event_type="workflow.run.created",
  128. workflow_run=workflow_run,
  129. payload_json={
  130. "run_id": workflow_run.id,
  131. "workflow_id": workflow_run.workflow_id,
  132. "workflow_version_id": workflow_run.workflow_version_id,
  133. "status": workflow_run.status,
  134. },
  135. )
  136. return workflow_run, node_run
  137. def list_runs(self, tenant_id: str, session_id: str | None = None) -> list[WorkflowRun]:
  138. return self.workflow_run_repository.list_by_scope(
  139. tenant_id=tenant_id,
  140. session_id=session_id,
  141. )
  142. def list_node_runs(self, tenant_id: str, run_id: str) -> list[NodeRun]:
  143. return self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  144. def list_execution_logs(
  145. self,
  146. tenant_id: str,
  147. run_id: str | None = None,
  148. node_run_id: str | None = None,
  149. ):
  150. return self.execution_log_repository.list_by_scope(
  151. tenant_id=tenant_id,
  152. run_id=run_id,
  153. node_run_id=node_run_id,
  154. )
  155. def list_node_artifacts(
  156. self,
  157. tenant_id: str,
  158. run_id: str | None = None,
  159. node_run_id: str | None = None,
  160. artifact_type: str | None = None,
  161. ):
  162. return self.node_artifact_repository.list_by_scope(
  163. tenant_id=tenant_id,
  164. run_id=run_id,
  165. node_run_id=node_run_id,
  166. artifact_type=artifact_type,
  167. )
  168. def list_trace_spans(
  169. self,
  170. tenant_id: str,
  171. run_id: str | None = None,
  172. node_run_id: str | None = None,
  173. span_type: str | None = None,
  174. ):
  175. return self.trace_span_repository.list_by_scope(
  176. tenant_id=tenant_id,
  177. run_id=run_id,
  178. node_run_id=node_run_id,
  179. span_type=span_type,
  180. )
  181. def update_run_status(
  182. self,
  183. run_id: str,
  184. payload: WorkflowRunStatusUpdateRequest,
  185. ) -> WorkflowRun | None:
  186. entity = self.workflow_run_repository.update_status(
  187. run_id=run_id,
  188. status=payload.status,
  189. error_code=payload.error_code,
  190. error_message=payload.error_message,
  191. )
  192. if entity is not None:
  193. self._publish_event(
  194. event_type=f"workflow.run.{entity.status}",
  195. workflow_run=entity,
  196. payload_json={
  197. "run_id": entity.id,
  198. "status": entity.status,
  199. "error_code": entity.error_code,
  200. },
  201. )
  202. return entity
  203. def update_node_run_status(
  204. self,
  205. node_run_id: str,
  206. payload: NodeRunStatusUpdateRequest,
  207. ) -> NodeRun | None:
  208. node_run = self.node_run_repository.update_status(
  209. node_run_id=node_run_id,
  210. status=payload.status,
  211. worker_key=payload.worker_key,
  212. error_code=payload.error_code,
  213. error_message=payload.error_message,
  214. output_text=payload.output_text,
  215. output_json=payload.output_json,
  216. )
  217. if node_run is None:
  218. return None
  219. self._log_event(
  220. tenant_id=node_run.tenant_id,
  221. run_id=node_run.run_id,
  222. node_run_id=node_run.id,
  223. event_type="node_status_updated",
  224. message=f"node status updated to {payload.status}",
  225. detail_json={
  226. "node_id": node_run.node_id,
  227. "node_type": node_run.node_type,
  228. "status": payload.status,
  229. "error_code": payload.error_code,
  230. },
  231. )
  232. self._publish_event(
  233. event_type=f"workflow.node.{node_run.status}",
  234. workflow_run=None,
  235. node_run=node_run,
  236. payload_json={
  237. "run_id": node_run.run_id,
  238. "node_run_id": node_run.id,
  239. "node_id": node_run.node_id,
  240. "node_type": node_run.node_type,
  241. "status": node_run.status,
  242. "error_code": node_run.error_code,
  243. },
  244. )
  245. if payload.status == "completed":
  246. self._schedule_successor_nodes(node_run)
  247. if payload.status == "failed":
  248. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  249. if workflow_run is not None:
  250. node_config = self._resolve_node_config(
  251. tenant_id=node_run.tenant_id,
  252. workflow_version_id=workflow_run.workflow_version_id,
  253. node_id=node_run.node_id,
  254. )
  255. self._schedule_compensation_node(node_run=node_run, node_config=node_config)
  256. self._sync_workflow_run_status_from_nodes(
  257. tenant_id=node_run.tenant_id,
  258. run_id=node_run.run_id,
  259. )
  260. return node_run
  261. def execute_node_run(
  262. self,
  263. node_run_id: str,
  264. payload: NodeRunExecuteRequest,
  265. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  266. node_run = self.node_run_repository.get_by_id(node_run_id)
  267. if node_run is None:
  268. return None
  269. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  270. if workflow_run is None:
  271. return None
  272. if node_run.status in {"completed", "failed", "skipped"}:
  273. executor_name = self.execution_dispatcher.resolve_executor(
  274. node_run.node_type
  275. ).executor_name
  276. return workflow_run, node_run, executor_name
  277. node_config = self._resolve_node_config(
  278. tenant_id=node_run.tenant_id,
  279. workflow_version_id=workflow_run.workflow_version_id,
  280. node_id=node_run.node_id,
  281. )
  282. if self._node_has_timed_out(node_run):
  283. timed_out_node_run = self.update_node_run_status(
  284. node_run_id=node_run.id,
  285. payload=NodeRunStatusUpdateRequest(
  286. status="failed",
  287. worker_key=payload.worker_key,
  288. error_code="node_timeout",
  289. error_message=f"node timed out: {node_run.node_id}",
  290. output_json={
  291. "timeout_time": node_run.timeout_time.isoformat()
  292. if node_run.timeout_time is not None
  293. else None,
  294. },
  295. ),
  296. )
  297. if timed_out_node_run is None:
  298. return None
  299. executor_name = self.execution_dispatcher.resolve_executor(
  300. node_run.node_type
  301. ).executor_name
  302. return workflow_run, timed_out_node_run, executor_name
  303. running_node_run = self.node_run_repository.update_status(
  304. node_run_id=node_run_id,
  305. status="running",
  306. worker_key=payload.worker_key,
  307. )
  308. if running_node_run is None:
  309. return None
  310. self._log_event(
  311. tenant_id=running_node_run.tenant_id,
  312. run_id=running_node_run.run_id,
  313. node_run_id=running_node_run.id,
  314. event_type="node_execution_started",
  315. message=f"executing node {running_node_run.node_id}",
  316. detail_json={
  317. "node_id": running_node_run.node_id,
  318. "node_type": running_node_run.node_type,
  319. "worker_key": payload.worker_key,
  320. },
  321. )
  322. context = self._build_execution_context(
  323. workflow_run=workflow_run,
  324. node_run=running_node_run,
  325. worker_key=payload.worker_key,
  326. node_config_json=node_config,
  327. )
  328. executor_name = self.execution_dispatcher.resolve_executor(
  329. running_node_run.node_type
  330. ).executor_name
  331. trace_span = self.trace_span_repository.start(
  332. tenant_id=running_node_run.tenant_id,
  333. run_id=running_node_run.run_id,
  334. node_run_id=running_node_run.id,
  335. parent_span_id=None,
  336. span_type="node_execution",
  337. name=f"{running_node_run.node_type}:{running_node_run.node_id}",
  338. attributes_json={
  339. "node_id": running_node_run.node_id,
  340. "node_type": running_node_run.node_type,
  341. "executor_name": executor_name,
  342. "worker_key": payload.worker_key,
  343. },
  344. )
  345. try:
  346. result, executor_name = self.execution_dispatcher.execute(
  347. context=context,
  348. request=payload,
  349. )
  350. except Exception as exc:
  351. result = NodeExecutionResultContract(
  352. status="failed",
  353. worker_key=payload.worker_key,
  354. error_code="executor_error",
  355. error_message=str(exc),
  356. )
  357. if result.status == "failed" and self._should_retry_node(
  358. node_run=running_node_run,
  359. node_config_json=context.node_config_json,
  360. ):
  361. retry_time, retry_timeout_time = self._build_retry_timing(context.node_config_json)
  362. retried_node_run = self.node_run_repository.requeue_for_retry(
  363. node_run_id=running_node_run.id,
  364. scheduled_time=retry_time,
  365. timeout_time=retry_timeout_time,
  366. error_code=result.error_code,
  367. error_message=result.error_message,
  368. output_text=result.output_text,
  369. output_json={
  370. **(result.output_json or {}),
  371. "retry_scheduled_time": retry_time.isoformat(),
  372. "retry_reason": result.error_code or "node_failed",
  373. },
  374. )
  375. if retried_node_run is None:
  376. return None
  377. self.trace_span_repository.finish(
  378. span_id=trace_span.id,
  379. status="error",
  380. error_code=result.error_code,
  381. error_message=result.error_message,
  382. attributes_json={
  383. "node_status": "queued",
  384. "executor_name": executor_name,
  385. "retry_scheduled": True,
  386. "attempt_no": retried_node_run.attempt_no,
  387. },
  388. )
  389. self._log_event(
  390. tenant_id=retried_node_run.tenant_id,
  391. run_id=retried_node_run.run_id,
  392. node_run_id=retried_node_run.id,
  393. event_type="node_retry_scheduled",
  394. message=f"node retry scheduled: {retried_node_run.node_id}",
  395. detail_json={
  396. "node_id": retried_node_run.node_id,
  397. "attempt_no": retried_node_run.attempt_no,
  398. "scheduled_time": retry_time.isoformat(),
  399. "error_code": result.error_code,
  400. },
  401. )
  402. self._sync_workflow_run_status_from_nodes(
  403. tenant_id=retried_node_run.tenant_id,
  404. run_id=retried_node_run.run_id,
  405. )
  406. workflow_run = self.workflow_run_repository.get_by_id(retried_node_run.run_id)
  407. if workflow_run is None:
  408. return None
  409. return workflow_run, retried_node_run, executor_name
  410. final_node_run = self.update_node_run_status(
  411. node_run_id=running_node_run.id,
  412. payload=NodeRunStatusUpdateRequest(
  413. status=result.status,
  414. worker_key=result.worker_key,
  415. error_code=result.error_code,
  416. error_message=result.error_message,
  417. output_text=result.output_text,
  418. output_json=result.output_json,
  419. ),
  420. )
  421. if final_node_run is None:
  422. return None
  423. self.trace_span_repository.finish(
  424. span_id=trace_span.id,
  425. status="ok" if final_node_run.status == "completed" else "error",
  426. error_code=final_node_run.error_code,
  427. error_message=final_node_run.error_message,
  428. attributes_json={
  429. "node_status": final_node_run.status,
  430. "executor_name": executor_name,
  431. "has_output_text": final_node_run.output_text is not None,
  432. "has_output_json": final_node_run.output_json is not None,
  433. },
  434. )
  435. self._persist_node_execution_artifact(final_node_run)
  436. self._log_event(
  437. tenant_id=final_node_run.tenant_id,
  438. run_id=final_node_run.run_id,
  439. node_run_id=final_node_run.id,
  440. event_type="node_execution_finished",
  441. message=f"node execution finished with status {final_node_run.status}",
  442. detail_json={
  443. "node_id": final_node_run.node_id,
  444. "node_type": final_node_run.node_type,
  445. "executor_name": executor_name,
  446. "status": final_node_run.status,
  447. },
  448. )
  449. workflow_run = self.workflow_run_repository.get_by_id(final_node_run.run_id)
  450. if workflow_run is None:
  451. return None
  452. return workflow_run, final_node_run, executor_name
  453. def execute_next_node_run(
  454. self,
  455. tenant_id: str,
  456. run_id: str,
  457. payload: NodeRunExecuteRequest,
  458. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  459. next_node_run = self.node_run_repository.get_next_queued_by_run(
  460. tenant_id=tenant_id,
  461. run_id=run_id,
  462. )
  463. if next_node_run is None:
  464. return None
  465. return self.execute_node_run(node_run_id=next_node_run.id, payload=payload)
  466. def execute_run(
  467. self,
  468. tenant_id: str,
  469. run_id: str,
  470. payload: RunExecuteRequest,
  471. ) -> tuple[WorkflowRun, list[NodeRun], list[str]] | None:
  472. workflow_run = self.workflow_run_repository.get_by_id(run_id)
  473. if workflow_run is None or workflow_run.tenant_id != tenant_id:
  474. return None
  475. executed_node_runs: list[NodeRun] = []
  476. executor_names: list[str] = []
  477. for _ in range(payload.max_steps):
  478. step_result = self.execute_next_node_run(
  479. tenant_id=tenant_id,
  480. run_id=run_id,
  481. payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
  482. )
  483. if step_result is None:
  484. break
  485. workflow_run, node_run, executor_name = step_result
  486. executed_node_runs.append(node_run)
  487. executor_names.append(executor_name)
  488. if node_run.status != "completed":
  489. break
  490. final_run = self.workflow_run_repository.get_by_id(run_id)
  491. if final_run is None:
  492. return None
  493. return final_run, executed_node_runs, executor_names
  494. def resume_human_node_run(
  495. self,
  496. *,
  497. node_run_id: str,
  498. payload: HumanNodeResumeRequest,
  499. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  500. node_run = self.node_run_repository.get_by_id(node_run_id)
  501. if node_run is None or node_run.tenant_id != payload.tenant_id:
  502. return None
  503. output_json = dict(node_run.output_json or {})
  504. existing_human_task_id = output_json.get("human_task_id")
  505. if existing_human_task_id is not None and existing_human_task_id != payload.human_task_id:
  506. return None
  507. if existing_human_task_id is None:
  508. output_json["human_task_id"] = payload.human_task_id
  509. self.node_run_repository.update_status(
  510. node_run_id=node_run.id,
  511. status="pending",
  512. worker_key=payload.worker_key,
  513. output_json=output_json,
  514. )
  515. return self.execute_node_run(
  516. node_run_id=node_run_id,
  517. payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
  518. )
  519. def execute_next_claimed_node_run(
  520. self,
  521. *,
  522. worker_key: str,
  523. lease_seconds: int,
  524. ) -> tuple[WorkflowRun, NodeRun, str, int] | None:
  525. released_lease_count = self.node_run_repository.release_expired_leases(
  526. now_time=datetime.utcnow(),
  527. )
  528. claimed_node_run = self.node_run_repository.claim_next_queued(
  529. worker_key=worker_key,
  530. lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds),
  531. )
  532. if claimed_node_run is None:
  533. return None
  534. result = self.execute_node_run(
  535. node_run_id=claimed_node_run.id,
  536. payload=NodeRunExecuteRequest(worker_key=worker_key),
  537. )
  538. if result is None:
  539. return None
  540. workflow_run, node_run, executor_name = result
  541. return workflow_run, node_run, executor_name, released_lease_count
  542. def _persist_node_execution_artifact(self, node_run: NodeRun) -> None:
  543. if node_run.output_text is None and node_run.output_json is None:
  544. return
  545. size_bytes = len(node_run.output_text.encode("utf-8")) if node_run.output_text else None
  546. self.node_artifact_repository.create(
  547. tenant_id=node_run.tenant_id,
  548. run_id=node_run.run_id,
  549. node_run_id=node_run.id,
  550. node_id=node_run.node_id,
  551. artifact_type="execution_result",
  552. name=f"{node_run.node_id}-execution-result",
  553. mime_type="application/json" if node_run.output_json is not None else "text/plain",
  554. content_text=node_run.output_text,
  555. content_json=node_run.output_json,
  556. size_bytes=size_bytes,
  557. )
  558. def _plan_initial_node(self, payload: RunCreateRequest) -> InitialNodeContract | None:
  559. if self.workflow_client is None:
  560. return None
  561. workflow_version = self.workflow_client.get_workflow_version(
  562. tenant_id=payload.tenant_id,
  563. workflow_version_id=payload.workflow_version_id,
  564. )
  565. return derive_initial_node(workflow_version)
  566. def _resolve_node_config(
  567. self,
  568. *,
  569. tenant_id: str,
  570. workflow_version_id: str,
  571. node_id: str,
  572. ) -> dict[str, JSONValue]:
  573. if self.workflow_client is None:
  574. return {}
  575. workflow_version = self.workflow_client.get_workflow_version(
  576. tenant_id=tenant_id,
  577. workflow_version_id=workflow_version_id,
  578. )
  579. return derive_node_config(workflow_version, node_id)
  580. def _schedule_successor_nodes(self, node_run: NodeRun) -> None:
  581. if self.workflow_client is None:
  582. return
  583. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  584. if workflow_run is None:
  585. return
  586. workflow_version = self.workflow_client.get_workflow_version(
  587. tenant_id=node_run.tenant_id,
  588. workflow_version_id=workflow_run.workflow_version_id,
  589. )
  590. run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
  591. self._build_run_state_maps(
  592. tenant_id=node_run.tenant_id,
  593. run_id=node_run.run_id,
  594. )
  595. )
  596. successor_nodes = derive_successor_nodes(
  597. workflow_version,
  598. node_run.node_id,
  599. current_output_json=node_run.output_json,
  600. run_state_json=run_state_json,
  601. node_output_json_by_node_id=node_output_json_by_node_id,
  602. node_output_text_by_node_id=node_output_text_by_node_id,
  603. )
  604. if not successor_nodes:
  605. return
  606. existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
  607. tenant_id=node_run.tenant_id,
  608. run_id=node_run.run_id,
  609. node_ids=[item.node_id for item in successor_nodes],
  610. )
  611. existing_node_counts: dict[str, int] = {}
  612. for item in existing_nodes:
  613. existing_node_counts[item.node_id] = existing_node_counts.get(item.node_id, 0) + 1
  614. for successor in successor_nodes:
  615. successor_config = derive_node_config(workflow_version, successor.node_id)
  616. if not self._is_join_ready(
  617. workflow_version=workflow_version,
  618. run_node_runs=self.node_run_repository.list_by_run(
  619. tenant_id=node_run.tenant_id,
  620. run_id=node_run.run_id,
  621. ),
  622. successor_node_id=successor.node_id,
  623. successor_node_type=successor.node_type,
  624. successor_config=successor_config,
  625. ):
  626. self._log_event(
  627. tenant_id=node_run.tenant_id,
  628. run_id=node_run.run_id,
  629. node_run_id=None,
  630. event_type="join_waiting",
  631. message=f"join node waiting for predecessors: {successor.node_id}",
  632. detail_json={
  633. "node_id": successor.node_id,
  634. "source_node_id": node_run.node_id,
  635. },
  636. )
  637. continue
  638. if not self._can_schedule_repeated_node(
  639. successor_config,
  640. existing_count=existing_node_counts.get(successor.node_id, 0),
  641. ):
  642. continue
  643. scheduled_time, timeout_time = self._build_node_timing(successor_config)
  644. self.node_run_repository.create(
  645. tenant_id=node_run.tenant_id,
  646. run_id=node_run.run_id,
  647. parent_node_run_id=node_run.id,
  648. node_id=successor.node_id,
  649. node_type=successor.node_type,
  650. status=successor.status,
  651. scheduled_time=scheduled_time,
  652. timeout_time=timeout_time,
  653. )
  654. existing_node_counts[successor.node_id] = (
  655. existing_node_counts.get(successor.node_id, 0) + 1
  656. )
  657. self._log_event(
  658. tenant_id=node_run.tenant_id,
  659. run_id=node_run.run_id,
  660. node_run_id=None,
  661. event_type="node_queued",
  662. message=f"successor node queued: {successor.node_id}",
  663. detail_json={
  664. "node_id": successor.node_id,
  665. "node_type": successor.node_type,
  666. "status": successor.status,
  667. "source_node_id": node_run.node_id,
  668. },
  669. )
  670. def _build_execution_context(
  671. self,
  672. *,
  673. workflow_run: WorkflowRun,
  674. node_run: NodeRun,
  675. worker_key: str | None,
  676. node_config_json: dict[str, JSONValue] | None = None,
  677. ) -> NodeExecutionContextContract:
  678. run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
  679. self._build_run_state_maps(
  680. tenant_id=node_run.tenant_id,
  681. run_id=node_run.run_id,
  682. )
  683. )
  684. return NodeExecutionContextContract(
  685. tenant_id=node_run.tenant_id,
  686. run_id=node_run.run_id,
  687. node_run_id=node_run.id,
  688. node_id=node_run.node_id,
  689. node_type=node_run.node_type,
  690. node_config_json=node_config_json
  691. if node_config_json is not None
  692. else self._resolve_node_config(
  693. tenant_id=node_run.tenant_id,
  694. workflow_version_id=workflow_run.workflow_version_id,
  695. node_id=node_run.node_id,
  696. ),
  697. run_state_json=run_state_json,
  698. node_output_json_by_node_id=node_output_json_by_node_id,
  699. node_output_text_by_node_id=node_output_text_by_node_id,
  700. worker_key=worker_key,
  701. )
  702. def _build_run_state_maps(
  703. self,
  704. *,
  705. tenant_id: str,
  706. run_id: str,
  707. ) -> tuple[
  708. dict[str, JSONValue],
  709. dict[str, dict[str, JSONValue]],
  710. dict[str, str],
  711. ]:
  712. node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  713. run_state_json: dict[str, JSONValue] = {}
  714. node_output_json_by_node_id: dict[str, dict[str, JSONValue]] = {}
  715. node_output_text_by_node_id: dict[str, str] = {}
  716. for item in node_runs:
  717. if item.output_json is not None:
  718. node_output_json_by_node_id[item.node_id] = dict(item.output_json)
  719. state_updates = item.output_json.get("state_updates")
  720. if isinstance(state_updates, dict):
  721. for state_key, state_value in state_updates.items():
  722. run_state_json[str(state_key)] = state_value
  723. if item.output_text is not None:
  724. node_output_text_by_node_id[item.node_id] = item.output_text
  725. return run_state_json, node_output_json_by_node_id, node_output_text_by_node_id
  726. def _build_node_timing(
  727. self,
  728. node_config_json: dict[str, JSONValue],
  729. ) -> tuple[datetime, datetime | None]:
  730. now = datetime.utcnow()
  731. delay_seconds = self._read_int_value(node_config_json, "delay_seconds", default=0)
  732. timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
  733. scheduled_time = now + timedelta(seconds=max(delay_seconds, 0))
  734. timeout_time = (
  735. scheduled_time + timedelta(seconds=timeout_seconds)
  736. if timeout_seconds > 0
  737. else None
  738. )
  739. return scheduled_time, timeout_time
  740. def _node_has_timed_out(self, node_run: NodeRun) -> bool:
  741. return node_run.timeout_time is not None and node_run.timeout_time <= datetime.utcnow()
  742. def _should_retry_node(
  743. self,
  744. *,
  745. node_run: NodeRun,
  746. node_config_json: dict[str, JSONValue],
  747. ) -> bool:
  748. retry_policy = self._read_dict_value(node_config_json, "retry_policy")
  749. max_attempts = self._read_int_value(retry_policy, "max_attempts", default=1)
  750. return max_attempts > node_run.attempt_no
  751. def _read_retry_delay_seconds(self, node_config_json: dict[str, JSONValue]) -> int:
  752. retry_policy = self._read_dict_value(node_config_json, "retry_policy")
  753. return self._read_int_value(retry_policy, "retry_delay_seconds", default=0)
  def _build_retry_timing(
      self,
      node_config_json: dict[str, JSONValue],
  ) -> tuple[datetime, datetime | None]:
      """Compute the next-attempt time and its timeout for a node retry.

      The retry is scheduled ``retry_policy.retry_delay_seconds`` after the
      current naive ``datetime.utcnow()``. A negative delay is clamped to 0, so
      the retry is never back-dated. Without the clamp, the timeout would be
      measured from a past instant and the retried node could time out
      immediately. ``timeout_seconds`` is measured from the retry time, and only
      when it is positive.

      Returns:
          ``(retry_time, timeout_time)``; ``timeout_time`` is ``None`` when no
          positive timeout is configured.
      """
      retry_delay_seconds = max(self._read_retry_delay_seconds(node_config_json), 0)
      retry_time = datetime.utcnow() + timedelta(seconds=retry_delay_seconds)
      timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
      timeout_time = (
          retry_time + timedelta(seconds=timeout_seconds)
          if timeout_seconds > 0
          else None
      )
      return retry_time, timeout_time
  def _is_join_ready(
      self,
      *,
      workflow_version: WorkflowVersionContract,
      run_node_runs: list[NodeRun],
      successor_node_id: str,
      successor_node_type: str,
      successor_config: dict[str, JSONValue],
  ) -> bool:
      """Check whether ``successor_node_id`` may be scheduled given its predecessors.

      A node that is not of type ``"join"`` and declares no ``join_policy`` is
      always ready. Otherwise, its predecessors are the sources of every
      workflow edge that targets it. A predecessor counts as done when any of
      its node runs has status ``"completed"`` or ``"skipped"``:

      * ``join_policy`` missing or ``"all_completed"``: every predecessor done;
      * ``"any_completed"``: at least one predecessor done;
      * any other policy: ready.

      The node is also ready when the workflow definition does not parse or
      the node has no incoming edges.
      """
      policy = self._read_string_value(successor_config, "join_policy")
      if policy is None and successor_node_type != "join":
          return True

      definition = self._parse_workflow(workflow_version)
      if definition is None:
          return True

      upstream = [edge.source for edge in definition.edges if edge.target == successor_node_id]
      if not upstream:
          return True

      finished_statuses = {"completed", "skipped"}
      done = {run.node_id for run in run_node_runs if run.status in finished_statuses}

      if policy is None or policy == "all_completed":
          return all(node_id in done for node_id in upstream)
      if policy == "any_completed":
          return any(node_id in done for node_id in upstream)
      return True
  def _can_schedule_repeated_node(
      self,
      node_config_json: dict[str, JSONValue],
      *,
      existing_count: int,
  ) -> bool:
      """Return whether another run of a node may be created.

      The first run (``existing_count == 0``) is always allowed. A repeat
      requires ``allow_loop`` to be ``True`` in the node config, and
      ``existing_count`` must stay below ``max_iterations`` (default 1). With no
      explicit limit, no repeat is allowed even when looping is enabled.
      """
      if existing_count == 0:
          return True
      if not self._read_bool_value(node_config_json, "allow_loop", default=False):
          return False
      iteration_cap = self._read_int_value(node_config_json, "max_iterations", default=1)
      return existing_count < iteration_cap
  def _schedule_compensation_node(
      self,
      *,
      node_run: NodeRun,
      node_config: dict[str, JSONValue],
  ) -> None:
      """Queue the compensation node configured for a failed node run, if any.

      The compensation target comes from ``compensation_node_id`` in the
      failed node's config, falling back to ``compensation.node_id``. Nothing
      is queued in three cases:

      * no target is configured;
      * the owning workflow run cannot be found;
      * the target already has runs and its own config does not permit another.

      Otherwise, a ``queued`` child run of ``node_run`` is created with timing
      taken from the target's config, and a ``compensation_queued``
      execution-log event is written.
      """
      target_node_id = self._read_string_value(node_config, "compensation_node_id")
      if target_node_id is None:
          target_node_id = self._read_string_value(
              self._read_dict_value(node_config, "compensation"),
              "node_id",
          )
      if target_node_id is None:
          return

      workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
      if workflow_run is None:
          return

      tenant_id = node_run.tenant_id
      version_id = workflow_run.workflow_version_id
      # The target node's own config governs whether it may repeat and its timing.
      target_config = self._resolve_node_config(
          tenant_id=tenant_id,
          workflow_version_id=version_id,
          node_id=target_node_id,
      )
      prior_runs = self.node_run_repository.list_by_run_and_node_ids(
          tenant_id=tenant_id,
          run_id=node_run.run_id,
          node_ids=[target_node_id],
      )
      if prior_runs and not self._can_schedule_repeated_node(
          target_config,
          existing_count=len(prior_runs),
      ):
          return

      target_type = self._resolve_workflow_node_type(
          tenant_id=tenant_id,
          workflow_version_id=version_id,
          node_id=target_node_id,
      )
      if not target_type:
          # Fall back to a generic type when the workflow definition gives none.
          target_type = "compensation"

      scheduled_time, timeout_time = self._build_node_timing(target_config)
      queued = self.node_run_repository.create(
          tenant_id=tenant_id,
          run_id=node_run.run_id,
          parent_node_run_id=node_run.id,
          node_id=target_node_id,
          node_type=target_type,
          status="queued",
          scheduled_time=scheduled_time,
          timeout_time=timeout_time,
      )
      self._log_event(
          tenant_id=tenant_id,
          run_id=node_run.run_id,
          node_run_id=queued.id,
          event_type="compensation_queued",
          message=f"compensation node queued: {target_node_id}",
          detail_json={
              "failed_node_id": node_run.node_id,
              "compensation_node_id": target_node_id,
          },
      )
  def _parse_workflow(self, workflow_version: WorkflowVersionContract):
      """Parse ``workflow_version.dsl_json`` with ``parse_workflow_definition``.

      Callers in this class treat a ``None`` result as "definition unavailable".
      """
      dsl_json = workflow_version.dsl_json
      return parse_workflow_definition(dsl_json)
  def _resolve_workflow_node_type(
      self,
      *,
      tenant_id: str,
      workflow_version_id: str,
      node_id: str,
  ) -> str | None:
      """Look up the declared type of ``node_id`` in a workflow version.

      Returns ``None`` in three cases:

      * no workflow client is configured;
      * the version's definition does not parse;
      * no node with that id exists.

      If several nodes share the id, the first one in the definition wins.
      """
      client = self.workflow_client
      if client is None:
          return None
      definition = self._parse_workflow(
          client.get_workflow_version(
              tenant_id=tenant_id,
              workflow_version_id=workflow_version_id,
          )
      )
      if definition is None:
          return None
      return next((node.type for node in definition.nodes if node.id == node_id), None)
  888. def _read_string_value(self, payload: dict[str, JSONValue], key: str) -> str | None:
  889. value = payload.get(key)
  890. if isinstance(value, str) and value:
  891. return value
  892. return None
  893. def _read_bool_value(
  894. self,
  895. payload: dict[str, JSONValue],
  896. key: str,
  897. *,
  898. default: bool,
  899. ) -> bool:
  900. value = payload.get(key)
  901. if isinstance(value, bool):
  902. return value
  903. return default
  904. def _read_int_value(
  905. self,
  906. payload: dict[str, JSONValue],
  907. key: str,
  908. *,
  909. default: int,
  910. ) -> int:
  911. value = payload.get(key)
  912. if isinstance(value, int) and not isinstance(value, bool):
  913. return value
  914. return default
  915. def _read_dict_value(
  916. self,
  917. payload: dict[str, JSONValue],
  918. key: str,
  919. ) -> dict[str, JSONValue]:
  920. value = payload.get(key)
  921. if isinstance(value, dict):
  922. return {str(item_key): item_value for item_key, item_value in value.items()}
  923. return {}
  def _sync_workflow_run_status_from_nodes(self, *, tenant_id: str, run_id: str) -> None:
      """Recompute a workflow run's node count and status from its node runs.

      Does nothing when the run has no node runs. Otherwise it:

      * stores the node-run count;
      * stores the status derived by ``_derive_run_status``, with its error
        code and message;
      * writes a ``run_status_synced`` execution-log event.
      """
      runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
      if not runs:
          return

      run_repository = self.workflow_run_repository
      run_repository.update_node_count(run_id=run_id, current_node_count=len(runs))

      status, error_code, error_message = self._derive_run_status(runs)
      run_repository.update_status(
          run_id=run_id,
          status=status,
          error_code=error_code,
          error_message=error_message,
      )
      self._log_event(
          tenant_id=tenant_id,
          run_id=run_id,
          node_run_id=None,
          event_type="run_status_synced",
          message=f"workflow run status synced to {status}",
          detail_json={"status": status, "error_code": error_code},
      )
  950. def _derive_run_status(
  951. self,
  952. node_runs: list[NodeRun],
  953. ) -> tuple[WorkflowRunStatus, str | None, str | None]:
  954. statuses = {node_run.status for node_run in node_runs}
  955. active_statuses: set[NodeRunStatus] = {"pending", "queued", "running"}
  956. if statuses.intersection(active_statuses):
  957. return "running", None, None
  958. if "failed" in statuses:
  959. failed_node = next((item for item in node_runs if item.status == "failed"), None)
  960. error_code = failed_node.error_code if failed_node is not None else None
  961. error_message = failed_node.error_message if failed_node is not None else None
  962. return "failed", error_code, error_message
  963. terminal_statuses: set[NodeRunStatus] = {"completed", "skipped"}
  964. if statuses and statuses.issubset(terminal_statuses):
  965. return "completed", None, None
  966. return "running", None, None
  def _log_event(
      self,
      *,
      tenant_id: str,
      run_id: str,
      node_run_id: str | None,
      event_type: str,
      message: str,
      detail_json: dict[str, JSONValue] | None,
      level: str = "info",
  ) -> None:
      """Persist one execution-log entry through ``execution_log_repository``.

      All arguments are forwarded unchanged; ``level`` defaults to ``"info"``.
      """
      entry = {
          "tenant_id": tenant_id,
          "run_id": run_id,
          "node_run_id": node_run_id,
          "event_type": event_type,
          "level": level,
          "message": message,
          "detail_json": detail_json,
      }
      self.execution_log_repository.create(**entry)
  def _publish_event(
      self,
      *,
      event_type: str,
      payload_json: dict[str, JSONValue],
      workflow_run: WorkflowRun | None = None,
      node_run: NodeRun | None = None,
  ) -> None:
      """Publish a runtime event to the event service on a best-effort basis.

      When ``workflow_run`` is given, the event is attributed to it: aggregate
      type ``"workflow_run"``, correlated by its ``session_id``. Otherwise it is
      attributed to ``node_run``: aggregate type ``"node_run"``, correlated by
      its ``run_id``. ``node_run`` also fills in the tenant id or correlation
      id when the workflow run does not provide one.

      Nothing is published when no event client is configured or no tenant
      can be determined. An ``EventServiceClientError`` never propagates to
      the caller; it is logged as a warning with its traceback so outages stay
      visible instead of being silently dropped.
      """
      if self.event_client is None:
          return
      tenant_id = workflow_run.tenant_id if workflow_run is not None else None
      if tenant_id is None and node_run is not None:
          tenant_id = node_run.tenant_id
      if tenant_id is None:
          return
      aggregate_id = workflow_run.id if workflow_run is not None else None
      if aggregate_id is None and node_run is not None:
          aggregate_id = node_run.id
      aggregate_type = "workflow_run" if workflow_run is not None else "node_run"
      correlation_id = workflow_run.session_id if workflow_run is not None else None
      if correlation_id is None and node_run is not None:
          correlation_id = node_run.run_id
      contract = EventPublishContract(
          tenant_id=tenant_id,
          event_type=event_type,
          source_service="runtime-service",
          aggregate_type=aggregate_type,
          aggregate_id=aggregate_id,
          correlation_id=correlation_id,
          payload_json=payload_json,
      )
      try:
          self.event_client.publish_event(contract)
      except EventServiceClientError:
          # Delivery is best-effort: a publish failure must not break the runtime
          # operation that triggered it, but it should leave a trace.
          import logging

          logging.getLogger(__name__).warning(
              "failed to publish %s event for %s %s",
              event_type,
              aggregate_type,
              aggregate_id,
              exc_info=True,
          )
  def build_runtime_application_service(
      *,
      db: Session,
      settings: RuntimeServiceSettings,
  ) -> RuntimeApplicationService:
      """Wire a ``RuntimeApplicationService`` to ``db`` and the configured services.

      All five repositories share the given ``db`` session. Service URLs and
      timeouts come from ``settings``. The code-runner, model-gateway, tool and
      workflow clients receive only a base URL. The human, knowledge and event
      clients also receive a timeout from settings.
      NOTE(review): the clients without an explicit timeout rely on their own
      defaults — confirm that is intended.
      """
      workflow_runs = WorkflowRunRepository(db)
      node_runs = NodeRunRepository(db)
      execution_logs = ExecutionLogRepository(db)
      node_artifacts = NodeArtifactRepository(db)
      trace_spans = TraceSpanRepository(db)

      dispatcher = build_node_execution_dispatcher_with_clients(
          code_runner_client=CodeRunnerClient(base_url=settings.code_runner_service_url),
          model_gateway_client=ModelGatewayClient(base_url=settings.model_gateway_service_url),
          tool_client=ToolServiceClient(base_url=settings.tool_service_url),
          human_client=HumanServiceClient(
              base_url=settings.human_service_url,
              timeout_seconds=settings.human_service_timeout_seconds,
          ),
          knowledge_client=KnowledgeServiceClient(
              base_url=settings.knowledge_service_url,
              timeout_seconds=settings.knowledge_service_timeout_seconds,
          ),
      )
      workflow_client = WorkflowServiceClient(base_url=settings.workflow_service_url)
      event_client = EventServiceClient(
          base_url=settings.event_service_url,
          timeout_seconds=settings.event_service_timeout_seconds,
      )

      return RuntimeApplicationService(
          workflow_run_repository=workflow_runs,
          node_run_repository=node_runs,
          execution_log_repository=execution_logs,
          node_artifact_repository=node_artifacts,
          trace_span_repository=trace_spans,
          execution_dispatcher=dispatcher,
          workflow_client=workflow_client,
          event_client=event_client,
      )