services.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. from pydantic import ValidationError
  2. from core_dsl import parse_workflow_definition
  3. from core_shared import JSONValue
  4. from app.application.designer import (
  5. WorkflowDebugPlan,
  6. WorkflowInspection,
  7. build_debug_plan,
  8. inspect_workflow_dsl,
  9. )
  10. from app.db.models import AppDefinition, AppVersion, WorkflowDefinitionModel, WorkflowVersion
  11. from app.domain.repositories import (
  12. AppDefinitionRepository,
  13. AppVersionRepository,
  14. WorkflowDefinitionRepository,
  15. WorkflowVersionRepository,
  16. )
  17. from app.schemas.app import AppCreateRequest, AppVersionCreateRequest
  18. from app.schemas.workflow import (
  19. WorkflowCreateRequest,
  20. WorkflowDebuggerPlanRequest,
  21. WorkflowDesignerValidateRequest,
  22. WorkflowVersionCreateRequest,
  23. )
  24. class WorkflowApplicationService:
  25. def __init__(
  26. self,
  27. app_repository: AppDefinitionRepository,
  28. workflow_repository: WorkflowDefinitionRepository,
  29. app_version_repository: AppVersionRepository,
  30. workflow_version_repository: WorkflowVersionRepository,
  31. ) -> None:
  32. self.app_repository = app_repository
  33. self.workflow_repository = workflow_repository
  34. self.app_version_repository = app_version_repository
  35. self.workflow_version_repository = workflow_version_repository
  36. def create_app(self, payload: AppCreateRequest) -> AppDefinition:
  37. return self.app_repository.create(
  38. tenant_id=payload.tenant_id,
  39. code=payload.code,
  40. name=payload.name,
  41. description=payload.description,
  42. owner_user_id=payload.owner_user_id,
  43. settings_json=payload.settings_json,
  44. )
  45. def list_apps(self, tenant_id: str) -> list[AppDefinition]:
  46. return self.app_repository.list_by_tenant(tenant_id)
  47. def create_workflow(self, payload: WorkflowCreateRequest) -> WorkflowDefinitionModel:
  48. return self.workflow_repository.create(
  49. tenant_id=payload.tenant_id,
  50. app_id=payload.app_id,
  51. code=payload.code,
  52. name=payload.name,
  53. workflow_type=payload.workflow_type,
  54. )
  55. def list_workflows(self, tenant_id: str, app_id: str | None = None) -> list[WorkflowDefinitionModel]:
  56. return self.workflow_repository.list_by_scope(tenant_id=tenant_id, app_id=app_id)
  57. def create_workflow_version(self, payload: WorkflowVersionCreateRequest) -> WorkflowVersion:
  58. dsl_json = self._validate_workflow_dsl(payload.dsl_json)
  59. compiled_plan_json = payload.compiled_plan_json
  60. if compiled_plan_json is None and dsl_json is not None:
  61. compiled_plan_json = self._build_compiled_plan_json(dsl_json)
  62. return self.workflow_version_repository.create(
  63. tenant_id=payload.tenant_id,
  64. workflow_id=payload.workflow_id,
  65. dsl_json=dsl_json,
  66. compiled_plan_json=compiled_plan_json,
  67. schema_version=payload.schema_version,
  68. checksum=payload.checksum,
  69. status=payload.status,
  70. )
  71. def list_workflow_versions(self, tenant_id: str, workflow_id: str) -> list[WorkflowVersion]:
  72. return self.workflow_version_repository.list_by_workflow(
  73. tenant_id=tenant_id,
  74. workflow_id=workflow_id,
  75. )
  76. def get_workflow_version(self, tenant_id: str, workflow_version_id: str) -> WorkflowVersion | None:
  77. return self.workflow_version_repository.get_by_id(
  78. tenant_id=tenant_id,
  79. workflow_version_id=workflow_version_id,
  80. )
  81. def validate_designer_workflow(
  82. self,
  83. payload: WorkflowDesignerValidateRequest,
  84. ) -> WorkflowInspection:
  85. return inspect_workflow_dsl(payload.dsl_json)
  86. def build_designer_debug_plan(
  87. self,
  88. payload: WorkflowDebuggerPlanRequest,
  89. ) -> WorkflowDebugPlan:
  90. return build_debug_plan(
  91. payload.dsl_json,
  92. max_preview_steps=payload.max_preview_steps,
  93. )
  94. def build_version_debug_plan(
  95. self,
  96. *,
  97. tenant_id: str,
  98. workflow_version_id: str,
  99. max_preview_steps: int,
  100. ) -> WorkflowDebugPlan | None:
  101. entity = self.get_workflow_version(
  102. tenant_id=tenant_id,
  103. workflow_version_id=workflow_version_id,
  104. )
  105. if entity is None:
  106. return None
  107. return build_debug_plan(
  108. entity.dsl_json,
  109. max_preview_steps=max_preview_steps,
  110. )
  111. def create_app_version(self, payload: AppVersionCreateRequest) -> AppVersion:
  112. return self.app_version_repository.create(
  113. tenant_id=payload.tenant_id,
  114. app_id=payload.app_id,
  115. workflow_version_id=payload.workflow_version_id,
  116. status=payload.status,
  117. published_by=payload.published_by,
  118. changelog=payload.changelog,
  119. )
  120. def list_app_versions(self, tenant_id: str, app_id: str) -> list[AppVersion]:
  121. return self.app_version_repository.list_by_app(tenant_id=tenant_id, app_id=app_id)
  122. def _validate_workflow_dsl(
  123. self,
  124. dsl_json: dict[str, JSONValue] | None,
  125. ) -> dict[str, JSONValue] | None:
  126. if dsl_json is None:
  127. return None
  128. try:
  129. workflow = parse_workflow_definition(dsl_json)
  130. except ValidationError as exc:
  131. raise ValueError(f"invalid workflow dsl: {exc}") from exc
  132. inspection = inspect_workflow_dsl(dsl_json)
  133. errors = [item for item in inspection.diagnostics if item.severity == "error"]
  134. if errors:
  135. message = "; ".join(f"{item.code}: {item.message}" for item in errors)
  136. raise ValueError(f"invalid workflow dsl: {message}")
  137. if workflow is None:
  138. return None
  139. return workflow.model_dump(mode="json")
  140. def _build_compiled_plan_json(
  141. self,
  142. dsl_json: dict[str, JSONValue],
  143. ) -> dict[str, JSONValue]:
  144. plan = build_debug_plan(dsl_json)
  145. return {
  146. "valid": plan.inspection.valid,
  147. "entry_node_ids": plan.inspection.entry_node_ids,
  148. "terminal_node_ids": plan.inspection.terminal_node_ids,
  149. "node_count": len(plan.inspection.nodes),
  150. "edge_count": len(plan.inspection.edges),
  151. "execution_preview": [
  152. {
  153. "step_index": item.step_index,
  154. "node_id": item.node_id,
  155. "node_type": item.node_type,
  156. "name": item.name,
  157. "next_node_ids": item.next_node_ids,
  158. }
  159. for item in plan.execution_preview
  160. ],
  161. }