services.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. from core_domain import InitialNodeContract, NodeRunStatus, WorkflowRunStatus
  2. from app.db.models import NodeRun, WorkflowRun
  3. from app.domain.repositories import NodeRunRepository, WorkflowRunRepository
  4. from app.infrastructure.planner import derive_initial_node, derive_successor_nodes
  5. from app.infrastructure.workflow_client import WorkflowServiceClient
  6. from app.schemas.run import NodeRunStatusUpdateRequest, RunCreateRequest, WorkflowRunStatusUpdateRequest
  7. class RuntimeApplicationService:
  8. def __init__(
  9. self,
  10. workflow_run_repository: WorkflowRunRepository,
  11. node_run_repository: NodeRunRepository,
  12. workflow_client: WorkflowServiceClient | None = None,
  13. ) -> None:
  14. self.workflow_run_repository = workflow_run_repository
  15. self.node_run_repository = node_run_repository
  16. self.workflow_client = workflow_client
  17. def create_run(self, payload: RunCreateRequest) -> tuple[WorkflowRun, NodeRun | None]:
  18. initial_node = payload.initial_node or self._plan_initial_node(payload)
  19. workflow_run = self.workflow_run_repository.create(
  20. tenant_id=payload.tenant_id,
  21. app_id=payload.app_id,
  22. app_version_id=payload.app_version_id,
  23. workflow_id=payload.workflow_id,
  24. workflow_version_id=payload.workflow_version_id,
  25. session_id=payload.session_id,
  26. parent_run_id=payload.parent_run_id,
  27. root_run_id=payload.root_run_id,
  28. run_type=payload.run_type,
  29. trigger_type=payload.trigger_type,
  30. priority=payload.priority,
  31. )
  32. node_run = None
  33. if initial_node is not None:
  34. self.workflow_run_repository.update_node_count(
  35. run_id=workflow_run.id,
  36. current_node_count=1,
  37. )
  38. node_run = self.node_run_repository.create(
  39. tenant_id=payload.tenant_id,
  40. run_id=workflow_run.id,
  41. node_id=initial_node.node_id,
  42. node_type=initial_node.node_type,
  43. status=initial_node.status,
  44. )
  45. return workflow_run, node_run
  46. def list_runs(self, tenant_id: str, session_id: str | None = None) -> list[WorkflowRun]:
  47. return self.workflow_run_repository.list_by_scope(tenant_id=tenant_id, session_id=session_id)
  48. def list_node_runs(self, tenant_id: str, run_id: str) -> list[NodeRun]:
  49. return self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  50. def update_run_status(
  51. self,
  52. run_id: str,
  53. payload: WorkflowRunStatusUpdateRequest,
  54. ) -> WorkflowRun | None:
  55. return self.workflow_run_repository.update_status(
  56. run_id=run_id,
  57. status=payload.status,
  58. error_code=payload.error_code,
  59. error_message=payload.error_message,
  60. )
  61. def update_node_run_status(
  62. self,
  63. node_run_id: str,
  64. payload: NodeRunStatusUpdateRequest,
  65. ) -> NodeRun | None:
  66. node_run = self.node_run_repository.update_status(
  67. node_run_id=node_run_id,
  68. status=payload.status,
  69. worker_key=payload.worker_key,
  70. error_code=payload.error_code,
  71. error_message=payload.error_message,
  72. )
  73. if node_run is None:
  74. return None
  75. if payload.status == "completed":
  76. self._schedule_successor_nodes(node_run)
  77. self._sync_workflow_run_status_from_nodes(
  78. tenant_id=node_run.tenant_id,
  79. run_id=node_run.run_id,
  80. )
  81. return node_run
  82. def _plan_initial_node(self, payload: RunCreateRequest) -> InitialNodeContract | None:
  83. if self.workflow_client is None:
  84. return None
  85. workflow_version = self.workflow_client.get_workflow_version(
  86. tenant_id=payload.tenant_id,
  87. workflow_version_id=payload.workflow_version_id,
  88. )
  89. return derive_initial_node(workflow_version)
  90. def _schedule_successor_nodes(self, node_run: NodeRun) -> None:
  91. if self.workflow_client is None:
  92. return
  93. workflow_run = self.workflow_run_repository.get_by_id(node_run.run_id)
  94. if workflow_run is None:
  95. return
  96. workflow_version = self.workflow_client.get_workflow_version(
  97. tenant_id=node_run.tenant_id,
  98. workflow_version_id=workflow_run.workflow_version_id,
  99. )
  100. successor_nodes = derive_successor_nodes(workflow_version, node_run.node_id)
  101. if not successor_nodes:
  102. return
  103. existing_nodes = self.node_run_repository.list_by_run_and_node_ids(
  104. tenant_id=node_run.tenant_id,
  105. run_id=node_run.run_id,
  106. node_ids=[item.node_id for item in successor_nodes],
  107. )
  108. existing_node_ids = {item.node_id for item in existing_nodes}
  109. for successor in successor_nodes:
  110. if successor.node_id in existing_node_ids:
  111. continue
  112. self.node_run_repository.create(
  113. tenant_id=node_run.tenant_id,
  114. run_id=node_run.run_id,
  115. node_id=successor.node_id,
  116. node_type=successor.node_type,
  117. status=successor.status,
  118. )
  119. def _sync_workflow_run_status_from_nodes(self, *, tenant_id: str, run_id: str) -> None:
  120. node_runs = self.node_run_repository.list_by_run(tenant_id=tenant_id, run_id=run_id)
  121. if not node_runs:
  122. return
  123. self.workflow_run_repository.update_node_count(
  124. run_id=run_id,
  125. current_node_count=len(node_runs),
  126. )
  127. next_status, error_code, error_message = self._derive_run_status(node_runs)
  128. self.workflow_run_repository.update_status(
  129. run_id=run_id,
  130. status=next_status,
  131. error_code=error_code,
  132. error_message=error_message,
  133. )
  134. def _derive_run_status(
  135. self,
  136. node_runs: list[NodeRun],
  137. ) -> tuple[WorkflowRunStatus, str | None, str | None]:
  138. statuses = {node_run.status for node_run in node_runs}
  139. if "failed" in statuses:
  140. failed_node = next((item for item in node_runs if item.status == "failed"), None)
  141. error_code = failed_node.error_code if failed_node is not None else None
  142. error_message = failed_node.error_message if failed_node is not None else None
  143. return "failed", error_code, error_message
  144. if "running" in statuses:
  145. return "running", None, None
  146. terminal_statuses: set[NodeRunStatus] = {"completed", "skipped"}
  147. if statuses and statuses.issubset(terminal_statuses):
  148. return "completed", None, None
  149. return "running", None, None