services.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. from core_dsl import parse_workflow_definition
  2. from core_shared import JSONValue
  3. from pydantic import ValidationError
  4. from app.application.designer import (
  5. WorkflowDebugPlan,
  6. WorkflowInspection,
  7. build_debug_plan,
  8. inspect_workflow_dsl,
  9. )
  10. from app.db.models import AppDefinition, AppVersion, WorkflowDefinitionModel, WorkflowVersion
  11. from app.domain.repositories import (
  12. AppDefinitionRepository,
  13. AppVersionRepository,
  14. WorkflowDefinitionRepository,
  15. WorkflowVersionRepository,
  16. )
  17. from app.schemas.app import AppCreateRequest, AppVersionCreateRequest
  18. from app.schemas.workflow import (
  19. WorkflowCreateRequest,
  20. WorkflowDebuggerPlanRequest,
  21. WorkflowDesignerValidateRequest,
  22. WorkflowVersionCreateRequest,
  23. )
  24. class WorkflowApplicationService:
  25. def __init__(
  26. self,
  27. app_repository: AppDefinitionRepository,
  28. workflow_repository: WorkflowDefinitionRepository,
  29. app_version_repository: AppVersionRepository,
  30. workflow_version_repository: WorkflowVersionRepository) -> None:
  31. self.app_repository = app_repository
  32. self.workflow_repository = workflow_repository
  33. self.app_version_repository = app_version_repository
  34. self.workflow_version_repository = workflow_version_repository
  35. def create_app(self, payload: AppCreateRequest) -> AppDefinition:
  36. return self.app_repository.create(
  37. code=payload.code,
  38. name=payload.name,
  39. description=payload.description,
  40. owner_user_id=payload.owner_user_id,
  41. settings_json=payload.settings_json)
  42. def list_apps(self) -> list[AppDefinition]:
  43. return self.app_repository.list_all()
  44. def create_workflow(self, payload: WorkflowCreateRequest) -> WorkflowDefinitionModel:
  45. return self.workflow_repository.create(
  46. app_id=payload.app_id,
  47. code=payload.code,
  48. name=payload.name,
  49. workflow_type=payload.workflow_type)
  50. def list_workflows(self, app_id: str | None = None) -> list[WorkflowDefinitionModel]:
  51. return self.workflow_repository.list_by_scope(app_id=app_id)
  52. def create_workflow_version(self, payload: WorkflowVersionCreateRequest) -> WorkflowVersion:
  53. dsl_json = self._validate_workflow_dsl(payload.dsl_json)
  54. compiled_plan_json = payload.compiled_plan_json
  55. if compiled_plan_json is None and dsl_json is not None:
  56. compiled_plan_json = self._build_compiled_plan_json(dsl_json)
  57. return self.workflow_version_repository.create(
  58. workflow_id=payload.workflow_id,
  59. dsl_json=dsl_json,
  60. compiled_plan_json=compiled_plan_json,
  61. schema_version=payload.schema_version,
  62. checksum=payload.checksum,
  63. status=payload.status)
  64. def list_workflow_versions(self, workflow_id: str) -> list[WorkflowVersion]:
  65. return self.workflow_version_repository.list_by_workflow(
  66. workflow_id=workflow_id)
  67. def get_workflow_version(self, workflow_version_id: str) -> WorkflowVersion | None:
  68. return self.workflow_version_repository.get_by_id(
  69. workflow_version_id=workflow_version_id)
  70. def validate_designer_workflow(
  71. self,
  72. payload: WorkflowDesignerValidateRequest) -> WorkflowInspection:
  73. return inspect_workflow_dsl(payload.dsl_json)
  74. def build_designer_debug_plan(
  75. self,
  76. payload: WorkflowDebuggerPlanRequest) -> WorkflowDebugPlan:
  77. return build_debug_plan(
  78. payload.dsl_json,
  79. max_preview_steps=payload.max_preview_steps)
  80. def build_version_debug_plan(
  81. self,
  82. *,
  83. workflow_version_id: str,
  84. max_preview_steps: int) -> WorkflowDebugPlan | None:
  85. entity = self.get_workflow_version(
  86. workflow_version_id=workflow_version_id)
  87. if entity is None:
  88. return None
  89. return build_debug_plan(
  90. entity.dsl_json,
  91. max_preview_steps=max_preview_steps)
  92. def create_app_version(self, payload: AppVersionCreateRequest) -> AppVersion:
  93. return self.app_version_repository.create(
  94. app_id=payload.app_id,
  95. workflow_version_id=payload.workflow_version_id,
  96. status=payload.status,
  97. published_by=payload.published_by,
  98. changelog=payload.changelog)
  99. def list_app_versions(self, app_id: str) -> list[AppVersion]:
  100. return self.app_version_repository.list_by_app(app_id=app_id)
  101. def _validate_workflow_dsl(
  102. self,
  103. dsl_json: dict[str, JSONValue] | None) -> dict[str, JSONValue] | None:
  104. if dsl_json is None:
  105. return None
  106. try:
  107. workflow = parse_workflow_definition(dsl_json)
  108. except ValidationError as exc:
  109. raise ValueError(f"invalid workflow dsl: {exc}") from exc
  110. inspection = inspect_workflow_dsl(dsl_json)
  111. errors = [item for item in inspection.diagnostics if item.severity == "error"]
  112. if errors:
  113. message = "; ".join(f"{item.code}: {item.message}" for item in errors)
  114. raise ValueError(f"invalid workflow dsl: {message}")
  115. if workflow is None:
  116. return None
  117. return workflow.model_dump(mode="json")
  118. def _build_compiled_plan_json(
  119. self,
  120. dsl_json: dict[str, JSONValue]) -> dict[str, JSONValue]:
  121. plan = build_debug_plan(dsl_json)
  122. return {
  123. "valid": plan.inspection.valid,
  124. "entry_node_ids": plan.inspection.entry_node_ids,
  125. "terminal_node_ids": plan.inspection.terminal_node_ids,
  126. "node_count": len(plan.inspection.nodes),
  127. "edge_count": len(plan.inspection.edges),
  128. "execution_preview": [
  129. {
  130. "step_index": item.step_index,
  131. "node_id": item.node_id,
  132. "node_type": item.node_type,
  133. "name": item.name,
  134. "next_node_ids": item.next_node_ids,
  135. }
  136. for item in plan.execution_preview
  137. ],
  138. }