# services.py
from datetime import datetime, timedelta, timezone

from sqlalchemy.orm import Session

from core_domain import (
    InitialNodeContract,
    NodeExecutionContextContract,
    NodeExecutionResultContract,
    NodeRunStatus,
    WorkflowRunStatus,
)
from core_shared import JSONValue

from app.db.models import NodeRun, WorkflowRun
from app.domain.repositories import (
    ExecutionLogRepository,
    NodeArtifactRepository,
    NodeRunRepository,
    TraceSpanRepository,
    WorkflowRunRepository,
)
from app.infrastructure.executors import NodeExecutionDispatcher, build_node_execution_dispatcher_with_clients
from app.infrastructure.code_runner_client import CodeRunnerClient
from app.infrastructure.model_gateway_client import ModelGatewayClient
from app.infrastructure.planner import derive_initial_node, derive_node_config, derive_successor_nodes
from app.infrastructure.tool_client import ToolServiceClient
from app.infrastructure.workflow_client import WorkflowServiceClient
from app.bootstrap.settings import RuntimeServiceSettings
from app.schemas.run import (
    NodeRunExecuteRequest,
    NodeRunStatusUpdateRequest,
    RunCreateRequest,
    RunExecuteRequest,
    WorkflowRunStatusUpdateRequest,
)
  33. class RuntimeApplicationService:
  34. def __init__(
  35. self,
  36. workflow_run_repository: WorkflowRunRepository,
  37. node_run_repository: NodeRunRepository,
  38. execution_log_repository: ExecutionLogRepository,
  39. node_artifact_repository: NodeArtifactRepository,
  40. trace_span_repository: TraceSpanRepository,
  41. execution_dispatcher: NodeExecutionDispatcher,
  42. workflow_client: WorkflowServiceClient | None = None,
  43. ) -> None:
  44. self.workflow_run_repository = workflow_run_repository
  45. self.node_run_repository = node_run_repository
  46. self.execution_log_repository = execution_log_repository
  47. self.node_artifact_repository = node_artifact_repository
  48. self.trace_span_repository = trace_span_repository
  49. self.execution_dispatcher = execution_dispatcher
  50. self.workflow_client = workflow_client
  51. def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
  52. initial_node = payload.initial_node or self._plan_initial_node(payload)
  53. workflow_run = self.workflow_run_repository.create(
  54. tenant_id=payload.tenant_id,
  55. app_id=payload.app_id,
  56. app_version_id=payload.app_version_id,
  57. workflow_id=payload.workflow_id,
  58. workflow_version_id=payload.workflow_version_id,
  59. session_id=payload.session_id,
  60. parent_run_id=payload.parent_run_id,
  61. root_run_id=payload.root_run_id,
  62. run_type=payload.run_type,
  63. trigger_type=payload.trigger_type,
  64. priority=payload.priority,
  65. )
  66. node_run = None
  67. if initial_node is not None:
  68. self.workflow_run_repository.update_node_count(
  69. run_id=workflow_run.id,
  70. current_node_count=1,
  71. )
  72. node_run = self.node_run_repository.create(
  73. tenant_id=payload.tenant_id,
  74. run_id=workflow_run.id,
  75. node_id=initial_node.node_id,
  76. node_type=initial_node.node_type,
  77. status=initial_node.status,
  78. )
  79. self._log_event(
  80. tenant_id=payload.tenant_id,
  81. run_id=workflow_run.id,
  82. node_run_id=node_run.id,
  83. event_type="node_queued",
  84. message=f"initial node queued: {initial_node.node_id}",
  85. detail_json={
  86. "node_id": initial_node.node_id,
  87. "node_type": initial_node.node_type,
  88. "status": initial_node.status,
  89. },
  90. )
  91. self._log_event(
  92. tenant_id=payload.tenant_id,
  93. run_id=workflow_run.id,
  94. node_run_id=node_run.id if node_run is not None else None,
  95. event_type="run_created",
  96. message="workflow run created",
  97. detail_json={
  98. "workflow_id": payload.workflow_id,
  99. "workflow_version_id": payload.workflow_version_id,
  100. "session_id": payload.session_id,
  101. },
  102. )
  103. return workflow_run, node_run
  104. def list_runs(self, tenant_id: str, session_id: str | None = None) -> list[WorkflowRun]:
  105. return self.workflow_run_repository.list_by_scope(tenant_id=tenant_id, session_id=session_id)
  106. def list_node_runs(self, tenant_id: str, run_id: str) -> list[NodeRun]:
  107. return self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  108. def list_execution_logs(
  109. self,
  110. tenant_id: str,
  111. run_id: str | None = None,
  112. node_run_id: str | None = None,
  113. ):
  114. return self.execution_log_repository.list_by_scope(
  115. tenant_id=tenant_id,
  116. run_id=run_id,
  117. node_run_id=node_run_id,
  118. )
  119. def list_node_artifacts(
  120. self,
  121. tenant_id: str,
  122. run_id: str | None = None,
  123. node_run_id: str | None = None,
  124. artifact_type: str | None = None,
  125. ):
  126. return self.node_artifact_repository.list_by_scope(
  127. tenant_id=tenant_id,
  128. run_id=run_id,
  129. node_run_id=node_run_id,
  130. artifact_type=artifact_type,
  131. )
  132. def list_trace_spans(
  133. self,
  134. tenant_id: str,
  135. run_id: str | None = None,
  136. node_run_id: str | None = None,
  137. span_type: str | None = None,
  138. ):
  139. return self.trace_span_repository.list_by_scope(
  140. tenant_id=tenant_id,
  141. run_id=run_id,
  142. node_run_id=node_run_id,
  143. span_type=span_type,
  144. )
  145. def update_run_status(
  146. self,
  147. run_id: str,
  148. payload: WorkflowRunStatusUpdateRequest,
  149. ) -> WorkflowRun | None:
  150. return self.workflow_run_repository.update_status(
  151. run_id=run_id,
  152. status=payload.status,
  153. error_code=payload.error_code,
  154. error_message=payload.error_message,
  155. )
  156. def update_node_run_status(
  157. self,
  158. node_run_id: str,
  159. payload: NodeRunStatusUpdateRequest,
  160. ) -> NodeRun | None:
  161. node_run = self.node_run_repository.update_status(
  162. node_run_id=node_run_id,
  163. status=payload.status,
  164. worker_key=payload.worker_key,
  165. error_code=payload.error_code,
  166. error_message=payload.error_message,
  167. output_text=payload.output_text,
  168. output_json=payload.output_json,
  169. )
  170. if node_run is None:
  171. return None
  172. self._log_event(
  173. tenant_id=node_run.tenant_id,
  174. run_id=node_run.run_id,
  175. node_run_id=node_run.id,
  176. event_type="node_status_updated",
  177. message=f"node status updated to {payload.status}",
  178. detail_json={
  179. "node_id": node_run.node_id,
  180. "node_type": node_run.node_type,
  181. "status": payload.status,
  182. "error_code": payload.error_code,
  183. },
  184. )
  185. if payload.status == "completed":
  186. self._schedule_successor_nodes(node_run)
  187. self._sync_workflow_run_status_from_nodes(
  188. tenant_id=node_run.tenant_id,
  189. run_id=node_run.run_id,
  190. )
  191. return node_run
  192. def execute_node_run(
  193. self,
  194. node_run_id: str,
  195. payload: NodeRunExecuteRequest,
  196. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  197. node_run = self.node_run_repository.get_by_id(node_run_id)
  198. if node_run is None:
  199. return None
  200. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  201. if workflow_run is None:
  202. return None
  203. if node_run.status in {"completed", "failed", "skipped"}:
  204. executor_name = self.execution_dispatcher.resolve_executor(node_run.node_type).executor_name
  205. return workflow_run, node_run, executor_name
  206. running_node_run = self.node_run_repository.update_status(
  207. node_run_id=node_run_id,
  208. status="running",
  209. worker_key=payload.worker_key,
  210. )
  211. if running_node_run is None:
  212. return None
  213. self._log_event(
  214. tenant_id=running_node_run.tenant_id,
  215. run_id=running_node_run.run_id,
  216. node_run_id=running_node_run.id,
  217. event_type="node_execution_started",
  218. message=f"executing node {running_node_run.node_id}",
  219. detail_json={
  220. "node_id": running_node_run.node_id,
  221. "node_type": running_node_run.node_type,
  222. "worker_key": payload.worker_key,
  223. },
  224. )
  225. context = self._build_execution_context(
  226. workflow_run=workflow_run,
  227. node_run=running_node_run,
  228. worker_key=payload.worker_key,
  229. )
  230. executor_name = self.execution_dispatcher.resolve_executor(
  231. running_node_run.node_type
  232. ).executor_name
  233. trace_span = self.trace_span_repository.start(
  234. tenant_id=running_node_run.tenant_id,
  235. run_id=running_node_run.run_id,
  236. node_run_id=running_node_run.id,
  237. parent_span_id=None,
  238. span_type="node_execution",
  239. name=f"{running_node_run.node_type}:{running_node_run.node_id}",
  240. attributes_json={
  241. "node_id": running_node_run.node_id,
  242. "node_type": running_node_run.node_type,
  243. "executor_name": executor_name,
  244. "worker_key": payload.worker_key,
  245. },
  246. )
  247. try:
  248. result, executor_name = self.execution_dispatcher.execute(context=context, request=payload)
  249. except Exception as exc:
  250. result = NodeExecutionResultContract(
  251. status="failed",
  252. worker_key=payload.worker_key,
  253. error_code="executor_error",
  254. error_message=str(exc),
  255. )
  256. final_node_run = self.update_node_run_status(
  257. node_run_id=running_node_run.id,
  258. payload=NodeRunStatusUpdateRequest(
  259. status=result.status,
  260. worker_key=result.worker_key,
  261. error_code=result.error_code,
  262. error_message=result.error_message,
  263. output_text=result.output_text,
  264. output_json=result.output_json,
  265. ),
  266. )
  267. if final_node_run is None:
  268. return None
  269. self.trace_span_repository.finish(
  270. span_id=trace_span.id,
  271. status="ok" if final_node_run.status == "completed" else "error",
  272. error_code=final_node_run.error_code,
  273. error_message=final_node_run.error_message,
  274. attributes_json={
  275. "node_status": final_node_run.status,
  276. "executor_name": executor_name,
  277. "has_output_text": final_node_run.output_text is not None,
  278. "has_output_json": final_node_run.output_json is not None,
  279. },
  280. )
  281. self._persist_node_execution_artifact(final_node_run)
  282. self._log_event(
  283. tenant_id=final_node_run.tenant_id,
  284. run_id=final_node_run.run_id,
  285. node_run_id=final_node_run.id,
  286. event_type="node_execution_finished",
  287. message=f"node execution finished with status {final_node_run.status}",
  288. detail_json={
  289. "node_id": final_node_run.node_id,
  290. "node_type": final_node_run.node_type,
  291. "executor_name": executor_name,
  292. "status": final_node_run.status,
  293. },
  294. )
  295. workflow_run = self.workflow_run_repository.get_by_id(final_node_run.run_id)
  296. if workflow_run is None:
  297. return None
  298. return workflow_run, final_node_run, executor_name
  299. def execute_next_node_run(
  300. self,
  301. tenant_id: str,
  302. run_id: str,
  303. payload: NodeRunExecuteRequest,
  304. ) -> tuple[WorkflowRun, NodeRun, str] | None:
  305. next_node_run = self.node_run_repository.get_next_queued_by_run(
  306. tenant_id=tenant_id,
  307. run_id=run_id,
  308. )
  309. if next_node_run is None:
  310. return None
  311. return self.execute_node_run(node_run_id=next_node_run.id, payload=payload)
  312. def execute_run(
  313. self,
  314. tenant_id: str,
  315. run_id: str,
  316. payload: RunExecuteRequest,
  317. ) -> tuple[WorkflowRun, list[NodeRun], list[str]] | None:
  318. workflow_run = self.workflow_run_repository.get_by_id(run_id)
  319. if workflow_run is None or workflow_run.tenant_id != tenant_id:
  320. return None
  321. executed_node_runs: list[NodeRun] = []
  322. executor_names: list[str] = []
  323. for _ in range(payload.max_steps):
  324. step_result = self.execute_next_node_run(
  325. tenant_id=tenant_id,
  326. run_id=run_id,
  327. payload=NodeRunExecuteRequest(worker_key=payload.worker_key),
  328. )
  329. if step_result is None:
  330. break
  331. workflow_run, node_run, executor_name = step_result
  332. executed_node_runs.append(node_run)
  333. executor_names.append(executor_name)
  334. if node_run.status != "completed":
  335. break
  336. final_run = self.workflow_run_repository.get_by_id(run_id)
  337. if final_run is None:
  338. return None
  339. return final_run, executed_node_runs, executor_names
  340. def execute_next_claimed_node_run(
  341. self,
  342. *,
  343. worker_key: str,
  344. lease_seconds: int,
  345. ) -> tuple[WorkflowRun, NodeRun, str, int] | None:
  346. released_lease_count = self.node_run_repository.release_expired_leases(
  347. now_time=datetime.utcnow(),
  348. )
  349. claimed_node_run = self.node_run_repository.claim_next_queued(
  350. worker_key=worker_key,
  351. lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds),
  352. )
  353. if claimed_node_run is None:
  354. return None
  355. result = self.execute_node_run(
  356. node_run_id=claimed_node_run.id,
  357. payload=NodeRunExecuteRequest(worker_key=worker_key),
  358. )
  359. if result is None:
  360. return None
  361. workflow_run, node_run, executor_name = result
  362. return workflow_run, node_run, executor_name, released_lease_count
  363. def _persist_node_execution_artifact(self, node_run: NodeRun) -> None:
  364. if node_run.output_text is None and node_run.output_json is None:
  365. return
  366. size_bytes = len(node_run.output_text.encode("utf-8")) if node_run.output_text else None
  367. self.node_artifact_repository.create(
  368. tenant_id=node_run.tenant_id,
  369. run_id=node_run.run_id,
  370. node_run_id=node_run.id,
  371. node_id=node_run.node_id,
  372. artifact_type="execution_result",
  373. name=f"{node_run.node_id}-execution-result",
  374. mime_type="application/json" if node_run.output_json is not None else "text/plain",
  375. content_text=node_run.output_text,
  376. content_json=node_run.output_json,
  377. size_bytes=size_bytes,
  378. )
  379. def _plan_initial_node(self, payload: RunCreateRequest) -> InitialNodeContract | None:
  380. if self.workflow_client is None:
  381. return None
  382. workflow_version = self.workflow_client.get_workflow_version(
  383. tenant_id=payload.tenant_id,
  384. workflow_version_id=payload.workflow_version_id,
  385. )
  386. return derive_initial_node(workflow_version)
  387. def _resolve_node_config(
  388. self,
  389. *,
  390. tenant_id: str,
  391. workflow_version_id: str,
  392. node_id: str,
  393. ) -> dict[str, JSONValue]:
  394. if self.workflow_client is None:
  395. return {}
  396. workflow_version = self.workflow_client.get_workflow_version(
  397. tenant_id=tenant_id,
  398. workflow_version_id=workflow_version_id,
  399. )
  400. return derive_node_config(workflow_version, node_id)
  401. def _schedule_successor_nodes(self, node_run: NodeRun) -> None:
  402. if self.workflow_client is None:
  403. return
  404. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  405. if workflow_run is None:
  406. return
  407. workflow_version = self.workflow_client.get_workflow_version(
  408. tenant_id=node_run.tenant_id,
  409. workflow_version_id=workflow_run.workflow_version_id,
  410. )
  411. run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
  412. self._build_run_state_maps(
  413. tenant_id=node_run.tenant_id,
  414. run_id=node_run.run_id,
  415. )
  416. )
  417. successor_nodes = derive_successor_nodes(
  418. workflow_version,
  419. node_run.node_id,
  420. current_output_json=node_run.output_json,
  421. run_state_json=run_state_json,
  422. node_output_json_by_node_id=node_output_json_by_node_id,
  423. node_output_text_by_node_id=node_output_text_by_node_id,
  424. )
  425. if not successor_nodes:
  426. return
  427. existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
  428. tenant_id=node_run.tenant_id,
  429. run_id=node_run.run_id,
  430. node_ids=[item.node_id for item in successor_nodes],
  431. )
  432. existing_node_ids = {item.node_id for item in existing_nodes}
  433. for successor in successor_nodes:
  434. if successor.node_id in existing_node_ids:
  435. continue
  436. self.node_run_repository.create(
  437. tenant_id=node_run.tenant_id,
  438. run_id=node_run.run_id,
  439. node_id=successor.node_id,
  440. node_type=successor.node_type,
  441. status=successor.status,
  442. )
  443. self._log_event(
  444. tenant_id=node_run.tenant_id,
  445. run_id=node_run.run_id,
  446. node_run_id=None,
  447. event_type="node_queued",
  448. message=f"successor node queued: {successor.node_id}",
  449. detail_json={
  450. "node_id": successor.node_id,
  451. "node_type": successor.node_type,
  452. "status": successor.status,
  453. "source_node_id": node_run.node_id,
  454. },
  455. )
  456. def _build_execution_context(
  457. self,
  458. *,
  459. workflow_run: WorkflowRun,
  460. node_run: NodeRun,
  461. worker_key: str | None,
  462. ) -> NodeExecutionContextContract:
  463. run_state_json, node_output_json_by_node_id, node_output_text_by_node_id = (
  464. self._build_run_state_maps(
  465. tenant_id=node_run.tenant_id,
  466. run_id=node_run.run_id,
  467. )
  468. )
  469. return NodeExecutionContextContract(
  470. tenant_id=node_run.tenant_id,
  471. run_id=node_run.run_id,
  472. node_run_id=node_run.id,
  473. node_id=node_run.node_id,
  474. node_type=node_run.node_type,
  475. node_config_json=self._resolve_node_config(
  476. tenant_id=node_run.tenant_id,
  477. workflow_version_id=workflow_run.workflow_version_id,
  478. node_id=node_run.node_id,
  479. ),
  480. run_state_json=run_state_json,
  481. node_output_json_by_node_id=node_output_json_by_node_id,
  482. node_output_text_by_node_id=node_output_text_by_node_id,
  483. worker_key=worker_key,
  484. )
  485. def _build_run_state_maps(
  486. self,
  487. *,
  488. tenant_id: str,
  489. run_id: str,
  490. ) -> tuple[
  491. dict[str, JSONValue],
  492. dict[str, dict[str, JSONValue]],
  493. dict[str, str],
  494. ]:
  495. node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  496. run_state_json: dict[str, JSONValue] = {}
  497. node_output_json_by_node_id: dict[str, dict[str, JSONValue]] = {}
  498. node_output_text_by_node_id: dict[str, str] = {}
  499. for item in node_runs:
  500. if item.output_json is not None:
  501. node_output_json_by_node_id[item.node_id] = dict(item.output_json)
  502. state_updates = item.output_json.get("state_updates")
  503. if isinstance(state_updates, dict):
  504. for state_key, state_value in state_updates.items():
  505. run_state_json[str(state_key)] = state_value
  506. if item.output_text is not None:
  507. node_output_text_by_node_id[item.node_id] = item.output_text
  508. return run_state_json, node_output_json_by_node_id, node_output_text_by_node_id
  509. def _sync_workflow_run_status_from_nodes(self, *, tenant_id: str, run_id: str) -> None:
  510. node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  511. if not node_runs:
  512. return
  513. self.workflow_run_repository.update_node_count(
  514. run_id=run_id,
  515. current_node_count=len(node_runs),
  516. )
  517. next_status, error_code, error_message = self._derive_run_status(node_runs)
  518. self.workflow_run_repository.update_status(
  519. run_id=run_id,
  520. status=next_status,
  521. error_code=error_code,
  522. error_message=error_message,
  523. )
  524. self._log_event(
  525. tenant_id=tenant_id,
  526. run_id=run_id,
  527. node_run_id=None,
  528. event_type="run_status_synced",
  529. message=f"workflow run status synced to {next_status}",
  530. detail_json={
  531. "status": next_status,
  532. "error_code": error_code,
  533. },
  534. )
  535. def _derive_run_status(
  536. self,
  537. node_runs: list[NodeRun],
  538. ) -> tuple[WorkflowRunStatus, str | None, str | None]:
  539. statuses = {node_run.status for node_run in node_runs}
  540. if "failed" in statuses:
  541. failed_node = next((item for item in node_runs if item.status == "failed"), None)
  542. error_code = failed_node.error_code if failed_node is not None else None
  543. error_message = failed_node.error_message if failed_node is not None else None
  544. return "failed", error_code, error_message
  545. if "running" in statuses:
  546. return "running", None, None
  547. terminal_statuses: set[NodeRunStatus] = {"completed", "skipped"}
  548. if statuses and statuses.issubset(terminal_statuses):
  549. return "completed", None, None
  550. return "running", None, None
  551. def _log_event(
  552. self,
  553. *,
  554. tenant_id: str,
  555. run_id: str,
  556. node_run_id: str | None,
  557. event_type: str,
  558. message: str,
  559. detail_json: dict[str, JSONValue] | None,
  560. level: str = "info",
  561. ) -> None:
  562. self.execution_log_repository.create(
  563. tenant_id=tenant_id,
  564. run_id=run_id,
  565. node_run_id=node_run_id,
  566. event_type=event_type,
  567. level=level,
  568. message=message,
  569. detail_json=detail_json,
  570. )
  571. def build_runtime_application_service(
  572. *,
  573. db: Session,
  574. settings: RuntimeServiceSettings,
  575. ) -> RuntimeApplicationService:
  576. return RuntimeApplicationService(
  577. workflow_run_repository=WorkflowRunRepository(db),
  578. node_run_repository=NodeRunRepository(db),
  579. execution_log_repository=ExecutionLogRepository(db),
  580. node_artifact_repository=NodeArtifactRepository(db),
  581. trace_span_repository=TraceSpanRepository(db),
  582. execution_dispatcher=build_node_execution_dispatcher_with_clients(
  583. code_runner_client=CodeRunnerClient(base_url=settings.code_runner_service_url),
  584. model_gateway_client=ModelGatewayClient(base_url=settings.model_gateway_service_url),
  585. tool_client=ToolServiceClient(base_url=settings.tool_service_url),
  586. ),
  587. workflow_client=WorkflowServiceClient(base_url=settings.workflow_service_url),
  588. )