services.py 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062
  1. from datetime import datetime, timedelta
  2. from sqlalchemy.orm import Session
  3. from core_dsl import parse_workflow_definition
  4. from core_domain import (
  5. InitialNodeContract,
  6. NodeExecutionContextContract,
  7. NodeExecutionResultContract,
  8. NodeRunStatus,
  9. WorkflowVersionContract,
  10. WorkflowRunStatus,
  11. )
  12. from app.db.models import NodeRun, WorkflowRun
  13. from app.domain.repositories import (
  14. ExecutionLogRepository,
  15. NodeArtifactRepository,
  16. NodeRunRepository,
  17. TraceSpanRepository,
  18. WorkflowRunRepository,
  19. )
  20. from app.infrastructure.executors import (
  21. NodeExecutionDispatcher,
  22. build_node_execution_dispatcher_with_clients,
  23. )
  24. from app.infrastructure.human_client import HumanServiceClient
  25. from app.infrastructure.knowledge_client import KnowledgeServiceClient
  26. from app.infrastructure.code_runner_client import CodeRunnerClient
  27. from app.infrastructure.model_gateway_client import ModelGatewayClient
  28. from app.infrastructure.planner import (
  29. derive_initial_node,
  30. derive_node_config,
  31. derive_successor_nodes,
  32. )
  33. from app.infrastructure.tool_client import ToolServiceClient
  34. from app.infrastructure.workflow_client import WorkflowServiceClient
  35. from app.bootstrap.settings import RuntimeServiceSettings
  36. from app.schemas.run import (
  37. NodeRunExecuteRequest,
  38. HumanNodeResumeRequest,
  39. NodeRunStatusUpdateRequest,
  40. RunCreateRequest,
  41. RunExecuteRequest,
  42. WorkflowRunStatusUpdateRequest,
  43. )
  44. from core_shared import JSONValue
  45. class RuntimeApplicationService:
  46. def __init__(
  47. self,
  48. workflow_run_repository: WorkflowRunRepository,
  49. node_run_repository: NodeRunRepository,
  50. execution_log_repository: ExecutionLogRepository,
  51. node_artifact_repository: NodeArtifactRepository,
  52. trace_span_repository: TraceSpanRepository,
  53. execution_dispatcher: NodeExecutionDispatcher,
  54. workflow_client: WorkflowServiceClient | None = None,
  55. ) -> None:
  56. self.workflow_run_repository = workflow_run_repository
  57. self.node_run_repository = node_run_repository
  58. self.execution_log_repository = execution_log_repository
  59. self.node_artifact_repository = node_artifact_repository
  60. self.trace_span_repository = trace_span_repository
  61. self.execution_dispatcher = execution_dispatcher
  62. self.workflow_client = workflow_client
  63. def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
  64. initial_node = payload.initial_node or self._plan_initial_node(payload)
  65. workflow_run = self.workflow_run_repository.create(
  66. tenant_id=payload.tenant_id,
  67. app_id=payload.app_id,
  68. app_version_id=payload.app_version_id,
  69. workflow_id=payload.workflow_id,
  70. workflow_version_id=payload.workflow_version_id,
  71. session_id=payload.session_id,
  72. parent_run_id=payload.parent_run_id,
  73. root_run_id=payload.root_run_id,
  74. run_type=payload.run_type,
  75. trigger_type=payload.trigger_type,
  76. priority=payload.priority,
  77. )
  78. node_run = None
  79. if initial_node is not None:
  80. self.workflow_run_repository.update_node_count(
  81. run_id=workflow_run.id,
  82. current_node_count=1,
  83. )
  84. initial_config = self._resolve_node_config(
  85. tenant_id=payload.tenant_id,
  86. workflow_version_id=payload.workflow_version_id,
  87. node_id=initial_node.node_id,
  88. )
  89. scheduled_time, timeout_time = self._build_node_timing(initial_config)
  90. node_run = self.node_run_repository.create(
  91. tenant_id=payload.tenant_id,
  92. run_id=workflow_run.id,
  93. node_id=initial_node.node_id,
  94. node_type=initial_node.node_type,
  95. status=initial_node.status,
  96. scheduled_time=scheduled_time,
  97. timeout_time=timeout_time,
  98. )
  99. self._log_event(
  100. tenant_id=payload.tenant_id,
  101. run_id=workflow_run.id,
  102. node_run_id=node_run.id,
  103. event_type="node_queued",
  104. message=f"initial node queued: {initial_node.node_id}",
  105. detail_json={
  106. "node_id": initial_node.node_id,
  107. "node_type": initial_node.node_type,
  108. "status": initial_node.status,
  109. },
  110. )
  111. self._log_event(
  112. tenant_id=payload.tenant_id,
  113. run_id=workflow_run.id,
  114. node_run_id=node_run.id if node_run is not None else None,
  115. event_type="run_created",
  116. message="workflow run created",
  117. detail_json={
  118. "workflow_id": payload.workflow_id,
  119. "workflow_version_id": payload.workflow_version_id,
  120. "session_id": payload.session_id,
  121. },
  122. )
  123. return workflow_run, node_run
  124. def list_runs(self, tenant_id: str, session_id: str | None = None) -> list[WorkflowRun]:
  125. return self.workflow_run_repository.list_by_scope(
  126. tenant_id=tenant_id,
  127. session_id=session_id,
  128. )
  129. def list_node_runs(self, tenant_id: str, run_id: str) -> list[NodeRun]:
  130. return self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  131. def list_execution_logs(
  132. self,
  133. tenant_id: str,
  134. run_id: str | None = None,
  135. node_run_id: str | None = None,
  136. ):
  137. return self.execution_log_repository.list_by_scope(
  138. tenant_id=tenant_id,
  139. run_id=run_id,
  140. node_run_id=node_run_id,
  141. )
  142. def list_node_artifacts(
  143. self,
  144. tenant_id: str,
  145. run_id: str | None = None,
  146. node_run_id: str | None = None,
  147. artifact_type: str | None = None,
  148. ):
  149. return self.node_artifact_repository.list_by_scope(
  150. tenant_id=tenant_id,
  151. run_id=run_id,
  152. node_run_id=node_run_id,
  153. artifact_type=artifact_type,
  154. )
  155. def list_trace_spans(
  156. self,
  157. tenant_id: str,
  158. run_id: str | None = None,
  159. node_run_id: str | None = None,
  160. span_type: str | None = None,
  161. ):
  162. return self.trace_span_repository.list_by_scope(
  163. tenant_id=tenant_id,
  164. run_id=run_id,
  165. node_run_id=node_run_id,
  166. span_type=span_type,
  167. )
  168. def update_run_status(
  169. self,
  170. run_id: str,
  171. payload: WorkflowRunStatusUpdateRequest,
  172. ) -> WorkflowRun | None:
  173. return self.workflow_run_repository.update_status(
  174. run_id=run_id,
  175. status=payload.status,
  176. error_code=payload.error_code,
  177. error_message=payload.error_message,
  178. )
  179. def update_node_run_status(
  180. self,
  181. node_run_id: str,
  182. payload: NodeRunStatusUpdateRequest,
  183. ) -> NodeRun | None:
  184. node_run = self.node_run_repository.update_status(
  185. node_run_id=node_run_id,
  186. status=payload.status,
  187. worker_key=payload.worker_key,
  188. error_code=payload.error_code,
  189. error_message=payload.error_message,
  190. output_text=payload.output_text,
  191. output_json=payload.output_json,
  192. )
  193. if node_run is None:
  194. return None
  195. self._log_event(
  196. tenant_id=node_run.tenant_id,
  197. run_id=node_run.run_id,
  198. node_run_id=node_run.id,
  199. event_type="node_status_updated",
  200. message=f"node status updated to {payload.status}",
  201. detail_json={
  202. "node_id": node_run.node_id,
  203. "node_type": node_run.node_type,
  204. "status": payload.status,
  205. "error_code": payload.error_code,
  206. },
  207. )
  208. if payload.status == "completed":
  209. self._schedule_successor_nodes(node_run)
  210. if payload.status == "failed":
  211. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  212. if workflow_run is not None:
  213. node_config = self._resolve_node_config(
  214. tenant_id=node_run.tenant_id,
  215. workflow_version_id=workflow_run.workflow_version_id,
  216. node_id=node_run.node_id,
  217. )
  218. self._schedule_compensation_node(node_run=node_run, node_config=node_config)
  219. self._sync_workflow_run_status_from_nodes(
  220. tenant_id=node_run.tenant_id,
  221. run_id=node_run.run_id,
  222. )
  223. return node_run
  224. def execute_node_run(
  225. self,
  226. node_run_id: str,
  227. payload: NodeRunExecuteRequest,
  228. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  229. node_run = self.node_run_repository.get_by_id(node_run_id)
  230. if node_run is None:
  231. return None
  232. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  233. if workflow_run is None:
  234. return None
  235. if node_run.status in {"completed", "failed", "skipped"}:
  236. executor_name = self.execution_dispatcher.resolve_executor(
  237. node_run.node_type
  238. ).executor_name
  239. return workflow_run, node_run, executor_name
  240. node_config = self._resolve_node_config(
  241. tenant_id=node_run.tenant_id,
  242. workflow_version_id=workflow_run.workflow_version_id,
  243. node_id=node_run.node_id,
  244. )
  245. if self._node_has_timed_out(node_run):
  246. timed_out_node_run = self.update_node_run_status(
  247. node_run_id=node_run.id,
  248. payload=NodeRunStatusUpdateRequest(
  249. status="failed",
  250. worker_key=payload.worker_key,
  251. error_code="node_timeout",
  252. error_message=f"node timed out: {node_run.node_id}",
  253. output_json={
  254. "timeout_time": node_run.timeout_time.isoformat()
  255. if node_run.timeout_time is not None
  256. else None,
  257. },
  258. ),
  259. )
  260. if timed_out_node_run is None:
  261. return None
  262. executor_name = self.execution_dispatcher.resolve_executor(
  263. node_run.node_type
  264. ).executor_name
  265. return workflow_run, timed_out_node_run, executor_name
  266. running_node_run = self.node_run_repository.update_status(
  267. node_run_id=node_run_id,
  268. status="running",
  269. worker_key=payload.worker_key,
  270. )
  271. if running_node_run is None:
  272. return None
  273. self._log_event(
  274. tenant_id=running_node_run.tenant_id,
  275. run_id=running_node_run.run_id,
  276. node_run_id=running_node_run.id,
  277. event_type="node_execution_started",
  278. message=f"executing node {running_node_run.node_id}",
  279. detail_json={
  280. "node_id": running_node_run.node_id,
  281. "node_type": running_node_run.node_type,
  282. "worker_key": payload.worker_key,
  283. },
  284. )
  285. context = self._build_execution_context(
  286. workflow_run=workflow_run,
  287. node_run=running_node_run,
  288. worker_key=payload.worker_key,
  289. node_config_json=node_config,
  290. )
  291. executor_name = self.execution_dispatcher.resolve_executor(
  292. running_node_run.node_type
  293. ).executor_name
  294. trace_span = self.trace_span_repository.start(
  295. tenant_id=running_node_run.tenant_id,
  296. run_id=running_node_run.run_id,
  297. node_run_id=running_node_run.id,
  298. parent_span_id=None,
  299. span_type="node_execution",
  300. name=f"{running_node_run.node_type}:{running_node_run.node_id}",
  301. attributes_json={
  302. "node_id": running_node_run.node_id,
  303. "node_type": running_node_run.node_type,
  304. "executor_name": executor_name,
  305. "worker_key": payload.worker_key,
  306. },
  307. )
  308. try:
  309. result, executor_name = self.execution_dispatcher.execute(
  310. context=context,
  311. request=payload,
  312. )
  313. except Exception as exc:
  314. result = NodeExecutionResultContract(
  315. status="failed",
  316. worker_key=payload.worker_key,
  317. error_code="executor_error",
  318. error_message=str(exc),
  319. )
  320. if result.status == "failed" and self._should_retry_node(
  321. node_run=running_node_run,
  322. node_config_json=context.node_config_json,
  323. ):
  324. retry_time, retry_timeout_time = self._build_retry_timing(context.node_config_json)
  325. retried_node_run = self.node_run_repository.requeue_for_retry(
  326. node_run_id=running_node_run.id,
  327. scheduled_time=retry_time,
  328. timeout_time=retry_timeout_time,
  329. error_code=result.error_code,
  330. error_message=result.error_message,
  331. output_text=result.output_text,
  332. output_json={
  333. **(result.output_json or {}),
  334. "retry_scheduled_time": retry_time.isoformat(),
  335. "retry_reason": result.error_code or "node_failed",
  336. },
  337. )
  338. if retried_node_run is None:
  339. return None
  340. self.trace_span_repository.finish(
  341. span_id=trace_span.id,
  342. status="error",
  343. error_code=result.error_code,
  344. error_message=result.error_message,
  345. attributes_json={
  346. "node_status": "queued",
  347. "executor_name": executor_name,
  348. "retry_scheduled": True,
  349. "attempt_no": retried_node_run.attempt_no,
  350. },
  351. )
  352. self._log_event(
  353. tenant_id=retried_node_run.tenant_id,
  354. run_id=retried_node_run.run_id,
  355. node_run_id=retried_node_run.id,
  356. event_type="node_retry_scheduled",
  357. message=f"node retry scheduled: {retried_node_run.node_id}",
  358. detail_json={
  359. "node_id": retried_node_run.node_id,
  360. "attempt_no": retried_node_run.attempt_no,
  361. "scheduled_time": retry_time.isoformat(),
  362. "error_code": result.error_code,
  363. },
  364. )
  365. self._sync_workflow_run_status_from_nodes(
  366. tenant_id=retried_node_run.tenant_id,
  367. run_id=retried_node_run.run_id,
  368. )
  369. workflow_run = self.workflow_run_repository.get_by_id(retried_node_run.run_id)
  370. if workflow_run is None:
  371. return None
  372. return workflow_run, retried_node_run, executor_name
  373. final_node_run = self.update_node_run_status(
  374. node_run_id=running_node_run.id,
  375. payload=NodeRunStatusUpdateRequest(
  376. status=result.status,
  377. worker_key=result.worker_key,
  378. error_code=result.error_code,
  379. error_message=result.error_message,
  380. output_text=result.output_text,
  381. output_json=result.output_json,
  382. ),
  383. )
  384. if final_node_run is None:
  385. return None
  386. self.trace_span_repository.finish(
  387. span_id=trace_span.id,
  388. status="ok" if final_node_run.status == "completed" else "error",
  389. error_code=final_node_run.error_code,
  390. error_message=final_node_run.error_message,
  391. attributes_json={
  392. "node_status": final_node_run.status,
  393. "executor_name": executor_name,
  394. "has_output_text": final_node_run.output_text is not None,
  395. "has_output_json": final_node_run.output_json is not None,
  396. },
  397. )
  398. self._persist_node_execution_artifact(final_node_run)
  399. self._log_event(
  400. tenant_id=final_node_run.tenant_id,
  401. run_id=final_node_run.run_id,
  402. node_run_id=final_node_run.id,
  403. event_type="node_execution_finished",
  404. message=f"node execution finished with status {final_node_run.status}",
  405. detail_json={
  406. "node_id": final_node_run.node_id,
  407. "node_type": final_node_run.node_type,
  408. "executor_name": executor_name,
  409. "status": final_node_run.status,
  410. },
  411. )
  412. workflow_run = self.workflow_run_repository.get_by_id(final_node_run.run_id)
  413. if workflow_run is None:
  414. return None
  415. return workflow_run, final_node_run, executor_name
  416. def execute_next_node_run(
  417. self,
  418. tenant_id: str,
  419. run_id: str,
  420. payload: NodeRunExecuteRequest,
  421. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  422. next_node_run = self.node_run_repository.get_next_queued_by_run(
  423. tenant_id=tenant_id,
  424. run_id=run_id,
  425. )
  426. if next_node_run is None:
  427. return None
  428. return self.execute_node_run(node_run_id=next_node_run.id, payload=payload)
  429. def execute_run(
  430. self,
  431. tenant_id: str,
  432. run_id: str,
  433. payload: RunExecuteRequest,
  434. ) -> tuple[WorkflowRun, list[NodeRun], list[str]] | None:
  435. workflow_run = self.workflow_run_repository.get_by_id(run_id)
  436. if workflow_run is None or workflow_run.tenant_id != tenant_id:
  437. return None
  438. executed_node_runs: list[NodeRun] = []
  439. executor_names: list[str] = []
  440. for _ in range(payload.max_steps):
  441. step_result = self.execute_next_node_run(
  442. tenant_id=tenant_id,
  443. run_id=run_id,
  444. payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
  445. )
  446. if step_result is None:
  447. break
  448. workflow_run, node_run, executor_name = step_result
  449. executed_node_runs.append(node_run)
  450. executor_names.append(executor_name)
  451. if node_run.status != "completed":
  452. break
  453. final_run = self.workflow_run_repository.get_by_id(run_id)
  454. if final_run is None:
  455. return None
  456. return final_run, executed_node_runs, executor_names
  457. def resume_human_node_run(
  458. self,
  459. *,
  460. node_run_id: str,
  461. payload: HumanNodeResumeRequest,
  462. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  463. node_run = self.node_run_repository.get_by_id(node_run_id)
  464. if node_run is None or node_run.tenant_id != payload.tenant_id:
  465. return None
  466. output_json = dict(node_run.output_json or {})
  467. existing_human_task_id = output_json.get("human_task_id")
  468. if existing_human_task_id is not None and existing_human_task_id != payload.human_task_id:
  469. return None
  470. if existing_human_task_id is None:
  471. output_json["human_task_id"] = payload.human_task_id
  472. self.node_run_repository.update_status(
  473. node_run_id=node_run.id,
  474. status="pending",
  475. worker_key=payload.worker_key,
  476. output_json=output_json,
  477. )
  478. return self.execute_node_run(
  479. node_run_id=node_run_id,
  480. payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
  481. )
  482. def execute_next_claimed_node_run(
  483. self,
  484. *,
  485. worker_key: str,
  486. lease_seconds: int,
  487. ) -> tuple[WorkflowRun, NodeRun, str, int] | None:
  488. released_lease_count = self.node_run_repository.release_expired_leases(
  489. now_time=datetime.utcnow(),
  490. )
  491. claimed_node_run = self.node_run_repository.claim_next_queued(
  492. worker_key=worker_key,
  493. lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds),
  494. )
  495. if claimed_node_run is None:
  496. return None
  497. result = self.execute_node_run(
  498. node_run_id=claimed_node_run.id,
  499. payload=NodeRunExecuteRequest(worker_key=worker_key),
  500. )
  501. if result is None:
  502. return None
  503. workflow_run, node_run, executor_name = result
  504. return workflow_run, node_run, executor_name, released_lease_count
  505. def _persist_node_execution_artifact(self, node_run: NodeRun) -> None:
  506. if node_run.output_text is None and node_run.output_json is None:
  507. return
  508. size_bytes = len(node_run.output_text.encode("utf-8")) if node_run.output_text else None
  509. self.node_artifact_repository.create(
  510. tenant_id=node_run.tenant_id,
  511. run_id=node_run.run_id,
  512. node_run_id=node_run.id,
  513. node_id=node_run.node_id,
  514. artifact_type="execution_result",
  515. name=f"{node_run.node_id}-execution-result",
  516. mime_type="application/json" if node_run.output_json is not None else "text/plain",
  517. content_text=node_run.output_text,
  518. content_json=node_run.output_json,
  519. size_bytes=size_bytes,
  520. )
  521. def _plan_initial_node(self, payload: RunCreateRequest) -> InitialNodeContract | None:
  522. if self.workflow_client is None:
  523. return None
  524. workflow_version = self.workflow_client.get_workflow_version(
  525. tenant_id=payload.tenant_id,
  526. workflow_version_id=payload.workflow_version_id,
  527. )
  528. return derive_initial_node(workflow_version)
  529. def _resolve_node_config(
  530. self,
  531. *,
  532. tenant_id: str,
  533. workflow_version_id: str,
  534. node_id: str,
  535. ) -> dict[str, JSONValue]:
  536. if self.workflow_client is None:
  537. return {}
  538. workflow_version = self.workflow_client.get_workflow_version(
  539. tenant_id=tenant_id,
  540. workflow_version_id=workflow_version_id,
  541. )
  542. return derive_node_config(workflow_version, node_id)
  543. def _schedule_successor_nodes(self, node_run: NodeRun) -> None:
  544. if self.workflow_client is None:
  545. return
  546. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  547. if workflow_run is None:
  548. return
  549. workflow_version = self.workflow_client.get_workflow_version(
  550. tenant_id=node_run.tenant_id,
  551. workflow_version_id=workflow_run.workflow_version_id,
  552. )
  553. run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
  554. self._build_run_state_maps(
  555. tenant_id=node_run.tenant_id,
  556. run_id=node_run.run_id,
  557. )
  558. )
  559. successor_nodes = derive_successor_nodes(
  560. workflow_version,
  561. node_run.node_id,
  562. current_output_json=node_run.output_json,
  563. run_state_json=run_state_json,
  564. node_output_json_by_node_id=node_output_json_by_node_id,
  565. node_output_text_by_node_id=node_output_text_by_node_id,
  566. )
  567. if not successor_nodes:
  568. return
  569. existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
  570. tenant_id=node_run.tenant_id,
  571. run_id=node_run.run_id,
  572. node_ids=[item.node_id for item in successor_nodes],
  573. )
  574. existing_node_counts: dict[str, int] = {}
  575. for item in existing_nodes:
  576. existing_node_counts[item.node_id] = existing_node_counts.get(item.node_id, 0) + 1
  577. for successor in successor_nodes:
  578. successor_config = derive_node_config(workflow_version, successor.node_id)
  579. if not self._is_join_ready(
  580. workflow_version=workflow_version,
  581. run_node_runs=self.node_run_repository.list_by_run(
  582. tenant_id=node_run.tenant_id,
  583. run_id=node_run.run_id,
  584. ),
  585. successor_node_id=successor.node_id,
  586. successor_node_type=successor.node_type,
  587. successor_config=successor_config,
  588. ):
  589. self._log_event(
  590. tenant_id=node_run.tenant_id,
  591. run_id=node_run.run_id,
  592. node_run_id=None,
  593. event_type="join_waiting",
  594. message=f"join node waiting for predecessors: {successor.node_id}",
  595. detail_json={
  596. "node_id": successor.node_id,
  597. "source_node_id": node_run.node_id,
  598. },
  599. )
  600. continue
  601. if not self._can_schedule_repeated_node(
  602. successor_config,
  603. existing_count=existing_node_counts.get(successor.node_id, 0),
  604. ):
  605. continue
  606. scheduled_time, timeout_time = self._build_node_timing(successor_config)
  607. self.node_run_repository.create(
  608. tenant_id=node_run.tenant_id,
  609. run_id=node_run.run_id,
  610. parent_node_run_id=node_run.id,
  611. node_id=successor.node_id,
  612. node_type=successor.node_type,
  613. status=successor.status,
  614. scheduled_time=scheduled_time,
  615. timeout_time=timeout_time,
  616. )
  617. existing_node_counts[successor.node_id] = (
  618. existing_node_counts.get(successor.node_id, 0) + 1
  619. )
  620. self._log_event(
  621. tenant_id=node_run.tenant_id,
  622. run_id=node_run.run_id,
  623. node_run_id=None,
  624. event_type="node_queued",
  625. message=f"successor node queued: {successor.node_id}",
  626. detail_json={
  627. "node_id": successor.node_id,
  628. "node_type": successor.node_type,
  629. "status": successor.status,
  630. "source_node_id": node_run.node_id,
  631. },
  632. )
  633. def _build_execution_context(
  634. self,
  635. *,
  636. workflow_run: WorkflowRun,
  637. node_run: NodeRun,
  638. worker_key: str | None,
  639. node_config_json: dict[str, JSONValue] | None = None,
  640. ) -> NodeExecutionContextContract:
  641. run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
  642. self._build_run_state_maps(
  643. tenant_id=node_run.tenant_id,
  644. run_id=node_run.run_id,
  645. )
  646. )
  647. return NodeExecutionContextContract(
  648. tenant_id=node_run.tenant_id,
  649. run_id=node_run.run_id,
  650. node_run_id=node_run.id,
  651. node_id=node_run.node_id,
  652. node_type=node_run.node_type,
  653. node_config_json=node_config_json
  654. if node_config_json is not None
  655. else self._resolve_node_config(
  656. tenant_id=node_run.tenant_id,
  657. workflow_version_id=workflow_run.workflow_version_id,
  658. node_id=node_run.node_id,
  659. ),
  660. run_state_json=run_state_json,
  661. node_output_json_by_node_id=node_output_json_by_node_id,
  662. node_output_text_by_node_id=node_output_text_by_node_id,
  663. worker_key=worker_key,
  664. )
  665. def _build_run_state_maps(
  666. self,
  667. *,
  668. tenant_id: str,
  669. run_id: str,
  670. ) -> tuple[
  671. dict[str, JSONValue],
  672. dict[str, dict[str, JSONValue]],
  673. dict[str, str],
  674. ]:
  675. node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  676. run_state_json: dict[str, JSONValue] = {}
  677. node_output_json_by_node_id: dict[str, dict[str, JSONValue]] = {}
  678. node_output_text_by_node_id: dict[str, str] = {}
  679. for item in node_runs:
  680. if item.output_json is not None:
  681. node_output_json_by_node_id[item.node_id] = dict(item.output_json)
  682. state_updates = item.output_json.get("state_updates")
  683. if isinstance(state_updates, dict):
  684. for state_key, state_value in state_updates.items():
  685. run_state_json[str(state_key)] = state_value
  686. if item.output_text is not None:
  687. node_output_text_by_node_id[item.node_id] = item.output_text
  688. return run_state_json, node_output_json_by_node_id, node_output_text_by_node_id
  689. def _build_node_timing(
  690. self,
  691. node_config_json: dict[str, JSONValue],
  692. ) -> tuple[datetime, datetime | None]:
  693. now = datetime.utcnow()
  694. delay_seconds = self._read_int_value(node_config_json, "delay_seconds", default=0)
  695. timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
  696. scheduled_time = now + timedelta(seconds=max(delay_seconds, 0))
  697. timeout_time = (
  698. scheduled_time + timedelta(seconds=timeout_seconds)
  699. if timeout_seconds > 0
  700. else None
  701. )
  702. return scheduled_time, timeout_time
  703. def _node_has_timed_out(self, node_run: NodeRun) -> bool:
  704. return node_run.timeout_time is not None and node_run.timeout_time <= datetime.utcnow()
  705. def _should_retry_node(
  706. self,
  707. *,
  708. node_run: NodeRun,
  709. node_config_json: dict[str, JSONValue],
  710. ) -> bool:
  711. retry_policy = self._read_dict_value(node_config_json, "retry_policy")
  712. max_attempts = self._read_int_value(retry_policy, "max_attempts", default=1)
  713. return max_attempts > node_run.attempt_no
  714. def _read_retry_delay_seconds(self, node_config_json: dict[str, JSONValue]) -> int:
  715. retry_policy = self._read_dict_value(node_config_json, "retry_policy")
  716. return self._read_int_value(retry_policy, "retry_delay_seconds", default=0)
  717. def _build_retry_timing(
  718. self,
  719. node_config_json: dict[str, JSONValue],
  720. ) -> tuple[datetime, datetime | None]:
  721. retry_time = datetime.utcnow() + timedelta(
  722. seconds=self._read_retry_delay_seconds(node_config_json)
  723. )
  724. timeout_seconds = self._read_int_value(node_config_json, "timeout_seconds", default=0)
  725. timeout_time = (
  726. retry_time + timedelta(seconds=timeout_seconds)
  727. if timeout_seconds > 0
  728. else None
  729. )
  730. return retry_time, timeout_time
  731. def _is_join_ready(
  732. self,
  733. *,
  734. workflow_version: WorkflowVersionContract,
  735. run_node_runs: list[NodeRun],
  736. successor_node_id: str,
  737. successor_node_type: str,
  738. successor_config: dict[str, JSONValue],
  739. ) -> bool:
  740. join_policy = self._read_string_value(successor_config, "join_policy")
  741. if join_policy is None and successor_node_type != "join":
  742. return True
  743. workflow = self._parse_workflow(workflow_version)
  744. if workflow is None:
  745. return True
  746. predecessor_ids = [
  747. edge.source for edge in workflow.edges if edge.target == successor_node_id
  748. ]
  749. if not predecessor_ids:
  750. return True
  751. completed_node_ids = {
  752. item.node_id
  753. for item in run_node_runs
  754. if item.status in {"completed", "skipped"}
  755. }
  756. if join_policy in {None, "all_completed"}:
  757. return all(predecessor_id in completed_node_ids for predecessor_id in predecessor_ids)
  758. if join_policy == "any_completed":
  759. return any(predecessor_id in completed_node_ids for predecessor_id in predecessor_ids)
  760. return True
  761. def _can_schedule_repeated_node(
  762. self,
  763. node_config_json: dict[str, JSONValue],
  764. *,
  765. existing_count: int,
  766. ) -> bool:
  767. if existing_count == 0:
  768. return True
  769. allow_loop = self._read_bool_value(node_config_json, "allow_loop", default=False)
  770. max_iterations = self._read_int_value(node_config_json, "max_iterations", default=1)
  771. return allow_loop and existing_count < max_iterations
  772. def _schedule_compensation_node(
  773. self,
  774. *,
  775. node_run: NodeRun,
  776. node_config: dict[str, JSONValue],
  777. ) -> None:
  778. compensation_node_id = self._read_string_value(node_config, "compensation_node_id")
  779. if compensation_node_id is None:
  780. compensation_config = self._read_dict_value(node_config, "compensation")
  781. compensation_node_id = self._read_string_value(compensation_config, "node_id")
  782. if compensation_node_id is None:
  783. return
  784. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  785. if workflow_run is None:
  786. return
  787. compensation_config = self._resolve_node_config(
  788. tenant_id=node_run.tenant_id,
  789. workflow_version_id=workflow_run.workflow_version_id,
  790. node_id=compensation_node_id,
  791. )
  792. existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
  793. tenant_id=node_run.tenant_id,
  794. run_id=node_run.run_id,
  795. node_ids=[compensation_node_id],
  796. )
  797. if existing_nodes and not self._can_schedule_repeated_node(
  798. compensation_config,
  799. existing_count=len(existing_nodes),
  800. ):
  801. return
  802. compensation_node_type = self._resolve_workflow_node_type(
  803. tenant_id=node_run.tenant_id,
  804. workflow_version_id=workflow_run.workflow_version_id,
  805. node_id=compensation_node_id,
  806. ) or "compensation"
  807. scheduled_time, timeout_time = self._build_node_timing(compensation_config)
  808. created = self.node_run_repository.create(
  809. tenant_id=node_run.tenant_id,
  810. run_id=node_run.run_id,
  811. parent_node_run_id=node_run.id,
  812. node_id=compensation_node_id,
  813. node_type=compensation_node_type,
  814. status="queued",
  815. scheduled_time=scheduled_time,
  816. timeout_time=timeout_time,
  817. )
  818. self._log_event(
  819. tenant_id=node_run.tenant_id,
  820. run_id=node_run.run_id,
  821. node_run_id=created.id,
  822. event_type="compensation_queued",
  823. message=f"compensation node queued: {compensation_node_id}",
  824. detail_json={
  825. "failed_node_id": node_run.node_id,
  826. "compensation_node_id": compensation_node_id,
  827. },
  828. )
  def _parse_workflow(self, workflow_version: WorkflowVersionContract):
      """Parse ``workflow_version.dsl_json`` with ``parse_workflow_definition``.

      Callers in this class treat a ``None`` result as "no usable workflow".
      """
      dsl_document = workflow_version.dsl_json
      return parse_workflow_definition(dsl_document)
  831. def _resolve_workflow_node_type(
  832. self,
  833. *,
  834. tenant_id: str,
  835. workflow_version_id: str,
  836. node_id: str,
  837. ) -> str | None:
  838. if self.workflow_client is None:
  839. return None
  840. workflow_version = self.workflow_client.get_workflow_version(
  841. tenant_id=tenant_id,
  842. workflow_version_id=workflow_version_id,
  843. )
  844. workflow = self._parse_workflow(workflow_version)
  845. if workflow is None:
  846. return None
  847. for node in workflow.nodes:
  848. if node.id == node_id:
  849. return node.type
  850. return None
  851. def _read_string_value(self, payload: dict[str, JSONValue], key: str) -> str | None:
  852. value = payload.get(key)
  853. if isinstance(value, str) and value:
  854. return value
  855. return None
  856. def _read_bool_value(
  857. self,
  858. payload: dict[str, JSONValue],
  859. key: str,
  860. *,
  861. default: bool,
  862. ) -> bool:
  863. value = payload.get(key)
  864. if isinstance(value, bool):
  865. return value
  866. return default
  867. def _read_int_value(
  868. self,
  869. payload: dict[str, JSONValue],
  870. key: str,
  871. *,
  872. default: int,
  873. ) -> int:
  874. value = payload.get(key)
  875. if isinstance(value, int) and not isinstance(value, bool):
  876. return value
  877. return default
  878. def _read_dict_value(
  879. self,
  880. payload: dict[str, JSONValue],
  881. key: str,
  882. ) -> dict[str, JSONValue]:
  883. value = payload.get(key)
  884. if isinstance(value, dict):
  885. return {str(item_key): item_value for item_key, item_value in value.items()}
  886. return {}
  887. def _sync_workflow_run_status_from_nodes(self, *, tenant_id: str, run_id: str) -> None:
  888. node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  889. if not node_runs:
  890. return
  891. self.workflow_run_repository.update_node_count(
  892. run_id=run_id,
  893. current_node_count=len(node_runs),
  894. )
  895. next_status, error_code, error_message = self._derive_run_status(node_runs)
  896. self.workflow_run_repository.update_status(
  897. run_id=run_id,
  898. status=next_status,
  899. error_code=error_code,
  900. error_message=error_message,
  901. )
  902. self._log_event(
  903. tenant_id=tenant_id,
  904. run_id=run_id,
  905. node_run_id=None,
  906. event_type="run_status_synced",
  907. message=f"workflow run status synced to {next_status}",
  908. detail_json={
  909. "status": next_status,
  910. "error_code": error_code,
  911. },
  912. )
  913. def _derive_run_status(
  914. self,
  915. node_runs: list[NodeRun],
  916. ) -> tuple[WorkflowRunStatus, str | None, str | None]:
  917. statuses = {node_run.status for node_run in node_runs}
  918. active_statuses: set[NodeRunStatus] = {"pending", "queued", "running"}
  919. if statuses.intersection(active_statuses):
  920. return "running", None, None
  921. if "failed" in statuses:
  922. failed_node = next((item for item in node_runs if item.status == "failed"), None)
  923. error_code = failed_node.error_code if failed_node is not None else None
  924. error_message = failed_node.error_message if failed_node is not None else None
  925. return "failed", error_code, error_message
  926. terminal_statuses: set[NodeRunStatus] = {"completed", "skipped"}
  927. if statuses and statuses.issubset(terminal_statuses):
  928. return "completed", None, None
  929. return "running", None, None
  930. def _log_event(
  931. self,
  932. *,
  933. tenant_id: str,
  934. run_id: str,
  935. node_run_id: str | None,
  936. event_type: str,
  937. message: str,
  938. detail_json: dict[str, JSONValue] | None,
  939. level: str = "info",
  940. ) -> None:
  941. self.execution_log_repository.create(
  942. tenant_id=tenant_id,
  943. run_id=run_id,
  944. node_run_id=node_run_id,
  945. event_type=event_type,
  946. level=level,
  947. message=message,
  948. detail_json=detail_json,
  949. )
  def build_runtime_application_service(
      *,
      db: Session,
      settings: RuntimeServiceSettings,
  ) -> RuntimeApplicationService:
      """Wire a ``RuntimeApplicationService`` from a session and service settings.

      Every repository shares the given ``db`` session. Each downstream service
      client gets its base URL from ``settings``. The human and knowledge
      clients also take their timeouts from ``settings``. Objects are built in
      the same order as the service's keyword arguments.
      """
      workflow_runs = WorkflowRunRepository(db)
      node_runs = NodeRunRepository(db)
      execution_logs = ExecutionLogRepository(db)
      node_artifacts = NodeArtifactRepository(db)
      trace_spans = TraceSpanRepository(db)

      code_runner = CodeRunnerClient(base_url=settings.code_runner_service_url)
      model_gateway = ModelGatewayClient(base_url=settings.model_gateway_service_url)
      tools = ToolServiceClient(base_url=settings.tool_service_url)
      humans = HumanServiceClient(
          base_url=settings.human_service_url,
          timeout_seconds=settings.human_service_timeout_seconds,
      )
      knowledge = KnowledgeServiceClient(
          base_url=settings.knowledge_service_url,
          timeout_seconds=settings.knowledge_service_timeout_seconds,
      )
      dispatcher = build_node_execution_dispatcher_with_clients(
          code_runner_client=code_runner,
          model_gateway_client=model_gateway,
          tool_client=tools,
          human_client=humans,
          knowledge_client=knowledge,
      )
      workflows = WorkflowServiceClient(base_url=settings.workflow_service_url)

      return RuntimeApplicationService(
          workflow_run_repository=workflow_runs,
          node_run_repository=node_runs,
          execution_log_repository=execution_logs,
          node_artifact_repository=node_artifacts,
          trace_span_repository=trace_spans,
          execution_dispatcher=dispatcher,
          workflow_client=workflows,
      )