services.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. from core_dsl import parse_workflow_definition
  2. from core_shared import JSONValue
  3. from pydantic import ValidationError
  4. from app.application.designer import (
  5. WorkflowDebugPlan,
  6. WorkflowInspection,
  7. build_debug_plan,
  8. inspect_workflow_dsl,
  9. )
  10. from app.db.models import AppDefinition, AppConfig, WorkflowDefinitionModel, WorkflowConfig
  11. from app.domain.repositories import (
  12. AppDefinitionRepository,
  13. AppConfigRepository,
  14. WorkflowDefinitionRepository,
  15. WorkflowConfigRepository,
  16. )
  17. from app.schemas.app import AppCreateRequest, AppConfigCreateRequest
  18. from app.schemas.workflow import (
  19. WorkflowCreateRequest,
  20. WorkflowDebuggerPlanRequest,
  21. WorkflowDesignerValidateRequest,
  22. WorkflowConfigCreateRequest,
  23. )
  24. class WorkflowApplicationService:
  25. def __init__(
  26. self,
  27. app_repository: AppDefinitionRepository,
  28. workflow_repository: WorkflowDefinitionRepository,
  29. app_config_repository: AppConfigRepository,
  30. workflow_config_repository: WorkflowConfigRepository) -> None:
  31. self.app_repository = app_repository
  32. self.workflow_repository = workflow_repository
  33. self.app_config_repository = app_config_repository
  34. self.workflow_config_repository = workflow_config_repository
  35. def create_app(self, payload: AppCreateRequest) -> AppDefinition:
  36. return self.app_repository.create(
  37. code=payload.code,
  38. name=payload.name,
  39. description=payload.description,
  40. owner_user_id=payload.owner_user_id,
  41. settings_json=payload.settings_json)
  42. def list_apps(self) -> list[AppDefinition]:
  43. return self.app_repository.list_all()
  44. def create_workflow(self, payload: WorkflowCreateRequest) -> WorkflowDefinitionModel:
  45. return self.workflow_repository.create(
  46. app_id=payload.app_id,
  47. code=payload.code,
  48. name=payload.name,
  49. workflow_type=payload.workflow_type)
  50. def list_workflows(self, app_id: str | None = None) -> list[WorkflowDefinitionModel]:
  51. return self.workflow_repository.list_by_scope(app_id=app_id)
  52. def create_workflow_config(self, payload: WorkflowConfigCreateRequest) -> WorkflowConfig:
  53. dsl_json = self._validate_workflow_dsl(payload.dsl_json)
  54. compiled_plan_json = payload.compiled_plan_json
  55. if compiled_plan_json is None and dsl_json is not None:
  56. compiled_plan_json = self._build_compiled_plan_json(dsl_json)
  57. return self.workflow_config_repository.create(
  58. workflow_id=payload.workflow_id,
  59. dsl_json=dsl_json,
  60. compiled_plan_json=compiled_plan_json,
  61. checksum=payload.checksum)
  62. def list_workflow_configs(self, workflow_id: str) -> list[WorkflowConfig]:
  63. return self.workflow_config_repository.list_by_workflow(
  64. workflow_id=workflow_id)
  65. def get_workflow_config(self, workflow_config_id: str) -> WorkflowConfig | None:
  66. return self.workflow_config_repository.get_by_id(
  67. workflow_config_id=workflow_config_id)
  68. def validate_designer_workflow(
  69. self,
  70. payload: WorkflowDesignerValidateRequest) -> WorkflowInspection:
  71. return inspect_workflow_dsl(payload.dsl_json)
  72. def build_designer_debug_plan(
  73. self,
  74. payload: WorkflowDebuggerPlanRequest) -> WorkflowDebugPlan:
  75. return build_debug_plan(
  76. payload.dsl_json,
  77. max_preview_steps=payload.max_preview_steps)
  78. def build_config_debug_plan(
  79. self,
  80. *,
  81. workflow_config_id: str,
  82. max_preview_steps: int) -> WorkflowDebugPlan | None:
  83. entity = self.get_workflow_config(
  84. workflow_config_id=workflow_config_id)
  85. if entity is None:
  86. return None
  87. return build_debug_plan(
  88. entity.dsl_json,
  89. max_preview_steps=max_preview_steps)
  90. def create_app_config(self, payload: AppConfigCreateRequest) -> AppConfig:
  91. return self.app_config_repository.create(
  92. app_id=payload.app_id,
  93. workflow_config_id=payload.workflow_config_id)
  94. def list_app_configs(self, app_id: str) -> list[AppConfig]:
  95. return self.app_config_repository.list_by_app(app_id=app_id)
  96. def _validate_workflow_dsl(
  97. self,
  98. dsl_json: dict[str, JSONValue] | None) -> dict[str, JSONValue] | None:
  99. if dsl_json is None:
  100. return None
  101. try:
  102. workflow = parse_workflow_definition(dsl_json)
  103. except ValidationError as exc:
  104. raise ValueError(f"invalid workflow dsl: {exc}") from exc
  105. inspection = inspect_workflow_dsl(dsl_json)
  106. errors = [item for item in inspection.diagnostics if item.severity == "error"]
  107. if errors:
  108. message = "; ".join(f"{item.code}: {item.message}" for item in errors)
  109. raise ValueError(f"invalid workflow dsl: {message}")
  110. if workflow is None:
  111. return None
  112. return workflow.model_dump(mode="json")
  113. def _build_compiled_plan_json(
  114. self,
  115. dsl_json: dict[str, JSONValue]) -> dict[str, JSONValue]:
  116. plan = build_debug_plan(dsl_json)
  117. return {
  118. "valid": plan.inspection.valid,
  119. "entry_node_ids": plan.inspection.entry_node_ids,
  120. "terminal_node_ids": plan.inspection.terminal_node_ids,
  121. "node_count": len(plan.inspection.nodes),
  122. "edge_count": len(plan.inspection.edges),
  123. "execution_preview": [
  124. {
  125. "step_index": item.step_index,
  126. "node_id": item.node_id,
  127. "node_type": item.node_type,
  128. "name": item.name,
  129. "next_node_ids": item.next_node_ids,
  130. }
  131. for item in plan.execution_preview
  132. ],
  133. }