# services.py
from core_domain import (
    InitialNodeContract,
    NodeExecutionContextContract,
    NodeExecutionResultContract,
    NodeRunStatus,
    WorkflowRunStatus,
)
from app.db.models import NodeRun, WorkflowRun
from app.domain.repositories import (
    ExecutionLogRepository,
    NodeArtifactRepository,
    NodeRunRepository,
    TraceSpanRepository,
    WorkflowRunRepository,
)
from app.infrastructure.executors import NodeExecutionDispatcher
from app.infrastructure.planner import derive_initial_node, derive_node_config, derive_successor_nodes
from app.infrastructure.workflow_client import WorkflowServiceClient
from app.schemas.run import (
    NodeRunExecuteRequest,
    NodeRunStatusUpdateRequest,
    RunCreateRequest,
    RunExecuteRequest,
    WorkflowRunStatusUpdateRequest,
)
from core_shared import JSONValue


  27. class RuntimeApplicationService:
  28. def __init__(
  29. self,
  30. workflow_run_repository: WorkflowRunRepository,
  31. node_run_repository: NodeRunRepository,
  32. execution_log_repository: ExecutionLogRepository,
  33. node_artifact_repository: NodeArtifactRepository,
  34. trace_span_repository: TraceSpanRepository,
  35. execution_dispatcher: NodeExecutionDispatcher,
  36. workflow_client: WorkflowServiceClient | None = None,
  37. ) -> None:
  38. self.workflow_run_repository = workflow_run_repository
  39. self.node_run_repository = node_run_repository
  40. self.execution_log_repository = execution_log_repository
  41. self.node_artifact_repository = node_artifact_repository
  42. self.trace_span_repository = trace_span_repository
  43. self.execution_dispatcher = execution_dispatcher
  44. self.workflow_client = workflow_client
  45. def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
  46. initial_node = payload.initial_node or self._plan_initial_node(payload)
  47. workflow_run = self.workflow_run_repository.create(
  48. tenant_id=payload.tenant_id,
  49. app_id=payload.app_id,
  50. app_version_id=payload.app_version_id,
  51. workflow_id=payload.workflow_id,
  52. workflow_version_id=payload.workflow_version_id,
  53. session_id=payload.session_id,
  54. parent_run_id=payload.parent_run_id,
  55. root_run_id=payload.root_run_id,
  56. run_type=payload.run_type,
  57. trigger_type=payload.trigger_type,
  58. priority=payload.priority,
  59. )
  60. node_run = None
  61. if initial_node is not None:
  62. self.workflow_run_repository.update_node_count(
  63. run_id=workflow_run.id,
  64. current_node_count=1,
  65. )
  66. node_run = self.node_run_repository.create(
  67. tenant_id=payload.tenant_id,
  68. run_id=workflow_run.id,
  69. node_id=initial_node.node_id,
  70. node_type=initial_node.node_type,
  71. status=initial_node.status,
  72. )
  73. self._log_event(
  74. tenant_id=payload.tenant_id,
  75. run_id=workflow_run.id,
  76. node_run_id=node_run.id,
  77. event_type="node_queued",
  78. message=f"initial node queued: {initial_node.node_id}",
  79. detail_json={
  80. "node_id": initial_node.node_id,
  81. "node_type": initial_node.node_type,
  82. "status": initial_node.status,
  83. },
  84. )
  85. self._log_event(
  86. tenant_id=payload.tenant_id,
  87. run_id=workflow_run.id,
  88. node_run_id=node_run.id if node_run is not None else None,
  89. event_type="run_created",
  90. message="workflow run created",
  91. detail_json={
  92. "workflow_id": payload.workflow_id,
  93. "workflow_version_id": payload.workflow_version_id,
  94. "session_id": payload.session_id,
  95. },
  96. )
  97. return workflow_run, node_run
  98. def list_runs(self, tenant_id: str, session_id: str | None = None) -> list[WorkflowRun]:
  99. return self.workflow_run_repository.list_by_scope(tenant_id=tenant_id, session_id=session_id)
  100. def list_node_runs(self, tenant_id: str, run_id: str) -> list[NodeRun]:
  101. return self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  102. def list_execution_logs(
  103. self,
  104. tenant_id: str,
  105. run_id: str | None = None,
  106. node_run_id: str | None = None,
  107. ):
  108. return self.execution_log_repository.list_by_scope(
  109. tenant_id=tenant_id,
  110. run_id=run_id,
  111. node_run_id=node_run_id,
  112. )
  113. def list_node_artifacts(
  114. self,
  115. tenant_id: str,
  116. run_id: str | None = None,
  117. node_run_id: str | None = None,
  118. artifact_type: str | None = None,
  119. ):
  120. return self.node_artifact_repository.list_by_scope(
  121. tenant_id=tenant_id,
  122. run_id=run_id,
  123. node_run_id=node_run_id,
  124. artifact_type=artifact_type,
  125. )
  126. def list_trace_spans(
  127. self,
  128. tenant_id: str,
  129. run_id: str | None = None,
  130. node_run_id: str | None = None,
  131. span_type: str | None = None,
  132. ):
  133. return self.trace_span_repository.list_by_scope(
  134. tenant_id=tenant_id,
  135. run_id=run_id,
  136. node_run_id=node_run_id,
  137. span_type=span_type,
  138. )
  139. def update_run_status(
  140. self,
  141. run_id: str,
  142. payload: WorkflowRunStatusUpdateRequest,
  143. ) -> WorkflowRun | None:
  144. return self.workflow_run_repository.update_status(
  145. run_id=run_id,
  146. status=payload.status,
  147. error_code=payload.error_code,
  148. error_message=payload.error_message,
  149. )
  150. def update_node_run_status(
  151. self,
  152. node_run_id: str,
  153. payload: NodeRunStatusUpdateRequest,
  154. ) -> NodeRun | None:
  155. node_run = self.node_run_repository.update_status(
  156. node_run_id=node_run_id,
  157. status=payload.status,
  158. worker_key=payload.worker_key,
  159. error_code=payload.error_code,
  160. error_message=payload.error_message,
  161. output_text=payload.output_text,
  162. output_json=payload.output_json,
  163. )
  164. if node_run is None:
  165. return None
  166. self._log_event(
  167. tenant_id=node_run.tenant_id,
  168. run_id=node_run.run_id,
  169. node_run_id=node_run.id,
  170. event_type="node_status_updated",
  171. message=f"node status updated to {payload.status}",
  172. detail_json={
  173. "node_id": node_run.node_id,
  174. "node_type": node_run.node_type,
  175. "status": payload.status,
  176. "error_code": payload.error_code,
  177. },
  178. )
  179. if payload.status == "completed":
  180. self._schedule_successor_nodes(node_run)
  181. self._sync_workflow_run_status_from_nodes(
  182. tenant_id=node_run.tenant_id,
  183. run_id=node_run.run_id,
  184. )
  185. return node_run
  186. def execute_node_run(
  187. self,
  188. node_run_id: str,
  189. payload: NodeRunExecuteRequest,
  190. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  191. node_run = self.node_run_repository.get_by_id(node_run_id)
  192. if node_run is None:
  193. return None
  194. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  195. if workflow_run is None:
  196. return None
  197. if node_run.status in {"completed", "failed", "skipped"}:
  198. executor_name = self.execution_dispatcher.resolve_executor(node_run.node_type).executor_name
  199. return workflow_run, node_run, executor_name
  200. running_node_run = self.node_run_repository.update_status(
  201. node_run_id=node_run_id,
  202. status="running",
  203. worker_key=payload.worker_key,
  204. )
  205. if running_node_run is None:
  206. return None
  207. self._log_event(
  208. tenant_id=running_node_run.tenant_id,
  209. run_id=running_node_run.run_id,
  210. node_run_id=running_node_run.id,
  211. event_type="node_execution_started",
  212. message=f"executing node {running_node_run.node_id}",
  213. detail_json={
  214. "node_id": running_node_run.node_id,
  215. "node_type": running_node_run.node_type,
  216. "worker_key": payload.worker_key,
  217. },
  218. )
  219. context = self._build_execution_context(
  220. workflow_run=workflow_run,
  221. node_run=running_node_run,
  222. worker_key=payload.worker_key,
  223. )
  224. executor_name = self.execution_dispatcher.resolve_executor(
  225. running_node_run.node_type
  226. ).executor_name
  227. trace_span = self.trace_span_repository.start(
  228. tenant_id=running_node_run.tenant_id,
  229. run_id=running_node_run.run_id,
  230. node_run_id=running_node_run.id,
  231. parent_span_id=None,
  232. span_type="node_execution",
  233. name=f"{running_node_run.node_type}:{running_node_run.node_id}",
  234. attributes_json={
  235. "node_id": running_node_run.node_id,
  236. "node_type": running_node_run.node_type,
  237. "executor_name": executor_name,
  238. "worker_key": payload.worker_key,
  239. },
  240. )
  241. try:
  242. result, executor_name = self.execution_dispatcher.execute(context=context, request=payload)
  243. except Exception as exc:
  244. result = NodeExecutionResultContract(
  245. status="failed",
  246. worker_key=payload.worker_key,
  247. error_code="executor_error",
  248. error_message=str(exc),
  249. )
  250. final_node_run = self.update_node_run_status(
  251. node_run_id=running_node_run.id,
  252. payload=NodeRunStatusUpdateRequest(
  253. status=result.status,
  254. worker_key=result.worker_key,
  255. error_code=result.error_code,
  256. error_message=result.error_message,
  257. output_text=result.output_text,
  258. output_json=result.output_json,
  259. ),
  260. )
  261. if final_node_run is None:
  262. return None
  263. self.trace_span_repository.finish(
  264. span_id=trace_span.id,
  265. status="ok" if final_node_run.status == "completed" else "error",
  266. error_code=final_node_run.error_code,
  267. error_message=final_node_run.error_message,
  268. attributes_json={
  269. "node_status": final_node_run.status,
  270. "executor_name": executor_name,
  271. "has_output_text": final_node_run.output_text is not None,
  272. "has_output_json": final_node_run.output_json is not None,
  273. },
  274. )
  275. self._persist_node_execution_artifact(final_node_run)
  276. self._log_event(
  277. tenant_id=final_node_run.tenant_id,
  278. run_id=final_node_run.run_id,
  279. node_run_id=final_node_run.id,
  280. event_type="node_execution_finished",
  281. message=f"node execution finished with status {final_node_run.status}",
  282. detail_json={
  283. "node_id": final_node_run.node_id,
  284. "node_type": final_node_run.node_type,
  285. "executor_name": executor_name,
  286. "status": final_node_run.status,
  287. },
  288. )
  289. workflow_run = self.workflow_run_repository.get_by_id(final_node_run.run_id)
  290. if workflow_run is None:
  291. return None
  292. return workflow_run, final_node_run, executor_name
  293. def execute_next_node_run(
  294. self,
  295. tenant_id: str,
  296. run_id: str,
  297. payload: NodeRunExecuteRequest,
  298. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  299. next_node_run = self.node_run_repository.get_next_queued_by_run(
  300. tenant_id=tenant_id,
  301. run_id=run_id,
  302. )
  303. if next_node_run is None:
  304. return None
  305. return self.execute_node_run(node_run_id=next_node_run.id, payload=payload)
  306. def execute_run(
  307. self,
  308. tenant_id: str,
  309. run_id: str,
  310. payload: RunExecuteRequest,
  311. ) -> tuple[WorkflowRun, list[NodeRun], list[str]] | None:
  312. workflow_run = self.workflow_run_repository.get_by_id(run_id)
  313. if workflow_run is None or workflow_run.tenant_id != tenant_id:
  314. return None
  315. executed_node_runs: list[NodeRun] = []
  316. executor_names: list[str] = []
  317. for _ in range(payload.max_steps):
  318. step_result = self.execute_next_node_run(
  319. tenant_id=tenant_id,
  320. run_id=run_id,
  321. payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
  322. )
  323. if step_result is None:
  324. break
  325. workflow_run, node_run, executor_name = step_result
  326. executed_node_runs.append(node_run)
  327. executor_names.append(executor_name)
  328. if node_run.status != "completed":
  329. break
  330. final_run = self.workflow_run_repository.get_by_id(run_id)
  331. if final_run is None:
  332. return None
  333. return final_run, executed_node_runs, executor_names
  334. def _persist_node_execution_artifact(self, node_run: NodeRun) -> None:
  335. if node_run.output_text is None and node_run.output_json is None:
  336. return
  337. size_bytes = len(node_run.output_text.encode("utf-8")) if node_run.output_text else None
  338. self.node_artifact_repository.create(
  339. tenant_id=node_run.tenant_id,
  340. run_id=node_run.run_id,
  341. node_run_id=node_run.id,
  342. node_id=node_run.node_id,
  343. artifact_type="execution_result",
  344. name=f"{node_run.node_id}-execution-result",
  345. mime_type="application/json" if node_run.output_json is not None else "text/plain",
  346. content_text=node_run.output_text,
  347. content_json=node_run.output_json,
  348. size_bytes=size_bytes,
  349. )
  350. def _plan_initial_node(self, payload: RunCreateRequest) -> InitialNodeContract | None:
  351. if self.workflow_client is None:
  352. return None
  353. workflow_version = self.workflow_client.get_workflow_version(
  354. tenant_id=payload.tenant_id,
  355. workflow_version_id=payload.workflow_version_id,
  356. )
  357. return derive_initial_node(workflow_version)
  358. def _resolve_node_config(
  359. self,
  360. *,
  361. tenant_id: str,
  362. workflow_version_id: str,
  363. node_id: str,
  364. ) -> dict[str, JSONValue]:
  365. if self.workflow_client is None:
  366. return {}
  367. workflow_version = self.workflow_client.get_workflow_version(
  368. tenant_id=tenant_id,
  369. workflow_version_id=workflow_version_id,
  370. )
  371. return derive_node_config(workflow_version, node_id)
  372. def _schedule_successor_nodes(self, node_run: NodeRun) -> None:
  373. if self.workflow_client is None:
  374. return
  375. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  376. if workflow_run is None:
  377. return
  378. workflow_version = self.workflow_client.get_workflow_version(
  379. tenant_id=node_run.tenant_id,
  380. workflow_version_id=workflow_run.workflow_version_id,
  381. )
  382. run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
  383. self._build_run_state_maps(
  384. tenant_id=node_run.tenant_id,
  385. run_id=node_run.run_id,
  386. )
  387. )
  388. successor_nodes = derive_successor_nodes(
  389. workflow_version,
  390. node_run.node_id,
  391. current_output_json=node_run.output_json,
  392. run_state_json=run_state_json,
  393. node_output_json_by_node_id=node_output_json_by_node_id,
  394. node_output_text_by_node_id=node_output_text_by_node_id,
  395. )
  396. if not successor_nodes:
  397. return
  398. existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
  399. tenant_id=node_run.tenant_id,
  400. run_id=node_run.run_id,
  401. node_ids=[item.node_id for item in successor_nodes],
  402. )
  403. existing_node_ids = {item.node_id for item in existing_nodes}
  404. for successor in successor_nodes:
  405. if successor.node_id in existing_node_ids:
  406. continue
  407. self.node_run_repository.create(
  408. tenant_id=node_run.tenant_id,
  409. run_id=node_run.run_id,
  410. node_id=successor.node_id,
  411. node_type=successor.node_type,
  412. status=successor.status,
  413. )
  414. self._log_event(
  415. tenant_id=node_run.tenant_id,
  416. run_id=node_run.run_id,
  417. node_run_id=None,
  418. event_type="node_queued",
  419. message=f"successor node queued: {successor.node_id}",
  420. detail_json={
  421. "node_id": successor.node_id,
  422. "node_type": successor.node_type,
  423. "status": successor.status,
  424. "source_node_id": node_run.node_id,
  425. },
  426. )
  427. def _build_execution_context(
  428. self,
  429. *,
  430. workflow_run: WorkflowRun,
  431. node_run: NodeRun,
  432. worker_key: str | None,
  433. ) -> NodeExecutionContextContract:
  434. run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
  435. self._build_run_state_maps(
  436. tenant_id=node_run.tenant_id,
  437. run_id=node_run.run_id,
  438. )
  439. )
  440. return NodeExecutionContextContract(
  441. tenant_id=node_run.tenant_id,
  442. run_id=node_run.run_id,
  443. node_run_id=node_run.id,
  444. node_id=node_run.node_id,
  445. node_type=node_run.node_type,
  446. node_config_json=self._resolve_node_config(
  447. tenant_id=node_run.tenant_id,
  448. workflow_version_id=workflow_run.workflow_version_id,
  449. node_id=node_run.node_id,
  450. ),
  451. run_state_json=run_state_json,
  452. node_output_json_by_node_id=node_output_json_by_node_id,
  453. node_output_text_by_node_id=node_output_text_by_node_id,
  454. worker_key=worker_key,
  455. )
  456. def _build_run_state_maps(
  457. self,
  458. *,
  459. tenant_id: str,
  460. run_id: str,
  461. ) -> tuple[
  462. dict[str, JSONValue],
  463. dict[str, dict[str, JSONValue]],
  464. dict[str, str],
  465. ]:
  466. node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  467. run_state_json: dict[str, JSONValue] = {}
  468. node_output_json_by_node_id: dict[str, dict[str, JSONValue]] = {}
  469. node_output_text_by_node_id: dict[str, str] = {}
  470. for item in node_runs:
  471. if item.output_json is not None:
  472. node_output_json_by_node_id[item.node_id] = dict(item.output_json)
  473. state_updates = item.output_json.get("state_updates")
  474. if isinstance(state_updates, dict):
  475. for state_key, state_value in state_updates.items():
  476. run_state_json[str(state_key)] = state_value
  477. if item.output_text is not None:
  478. node_output_text_by_node_id[item.node_id] = item.output_text
  479. return run_state_json, node_output_json_by_node_id, node_output_text_by_node_id
  480. def _sync_workflow_run_status_from_nodes(self, *, tenant_id: str, run_id: str) -> None:
  481. node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  482. if not node_runs:
  483. return
  484. self.workflow_run_repository.update_node_count(
  485. run_id=run_id,
  486. current_node_count=len(node_runs),
  487. )
  488. next_status, error_code, error_message = self._derive_run_status(node_runs)
  489. self.workflow_run_repository.update_status(
  490. run_id=run_id,
  491. status=next_status,
  492. error_code=error_code,
  493. error_message=error_message,
  494. )
  495. self._log_event(
  496. tenant_id=tenant_id,
  497. run_id=run_id,
  498. node_run_id=None,
  499. event_type="run_status_synced",
  500. message=f"workflow run status synced to {next_status}",
  501. detail_json={
  502. "status": next_status,
  503. "error_code": error_code,
  504. },
  505. )
  506. def _derive_run_status(
  507. self,
  508. node_runs: list[NodeRun],
  509. ) -> tuple[WorkflowRunStatus, str | None, str | None]:
  510. statuses = {node_run.status for node_run in node_runs}
  511. if "failed" in statuses:
  512. failed_node = next((item for item in node_runs if item.status == "failed"), None)
  513. error_code = failed_node.error_code if failed_node is not None else None
  514. error_message = failed_node.error_message if failed_node is not None else None
  515. return "failed", error_code, error_message
  516. if "running" in statuses:
  517. return "running", None, None
  518. terminal_statuses: set[NodeRunStatus] = {"completed", "skipped"}
  519. if statuses and statuses.issubset(terminal_statuses):
  520. return "completed", None, None
  521. return "running", None, None
  522. def _log_event(
  523. self,
  524. *,
  525. tenant_id: str,
  526. run_id: str,
  527. node_run_id: str | None,
  528. event_type: str,
  529. message: str,
  530. detail_json: dict[str, JSONValue] | None,
  531. level: str = "info",
  532. ) -> None:
  533. self.execution_log_repository.create(
  534. tenant_id=tenant_id,
  535. run_id=run_id,
  536. node_run_id=node_run_id,
  537. event_type=event_type,
  538. level=level,
  539. message=message,
  540. detail_json=detail_json,
  541. )