| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- from pydantic import ValidationError
- from core_dsl import parse_workflow_definition
- from core_shared import JSONValue
- from app.application.designer import (
- WorkflowDebugPlan,
- WorkflowInspection,
- build_debug_plan,
- inspect_workflow_dsl,
- )
- from app.db.models import AppDefinition, AppVersion, WorkflowDefinitionModel, WorkflowVersion
- from app.domain.repositories import (
- AppDefinitionRepository,
- AppVersionRepository,
- WorkflowDefinitionRepository,
- WorkflowVersionRepository,
- )
- from app.schemas.app import AppCreateRequest, AppVersionCreateRequest
- from app.schemas.workflow import (
- WorkflowCreateRequest,
- WorkflowDebuggerPlanRequest,
- WorkflowDesignerValidateRequest,
- WorkflowVersionCreateRequest,
- )
class WorkflowApplicationService:
    """Application-level entry point for app and workflow operations.

    Most methods forward the fields of a request payload to one of four
    injected repositories. The designer/debugger methods call the
    ``inspect_workflow_dsl`` and ``build_debug_plan`` helpers instead.
    """

    def __init__(
        self,
        app_repository: AppDefinitionRepository,
        workflow_repository: WorkflowDefinitionRepository,
        app_version_repository: AppVersionRepository,
        workflow_version_repository: WorkflowVersionRepository,
    ) -> None:
        """Keep references to the repositories this service delegates to."""
        self.app_repository = app_repository
        self.workflow_repository = workflow_repository
        self.app_version_repository = app_version_repository
        self.workflow_version_repository = workflow_version_repository

    # ------------------------------------------------------------------
    # Apps
    # ------------------------------------------------------------------

    def create_app(self, payload: AppCreateRequest) -> AppDefinition:
        """Create an app by passing the payload fields to the app repository."""
        repository = self.app_repository
        return repository.create(
            tenant_id=payload.tenant_id,
            code=payload.code,
            name=payload.name,
            description=payload.description,
            owner_user_id=payload.owner_user_id,
            settings_json=payload.settings_json,
        )

    def list_apps(self, tenant_id: str) -> list[AppDefinition]:
        """Return whatever the app repository lists for ``tenant_id``."""
        repository = self.app_repository
        return repository.list_by_tenant(tenant_id)

    # ------------------------------------------------------------------
    # Workflows
    # ------------------------------------------------------------------

    def create_workflow(self, payload: WorkflowCreateRequest) -> WorkflowDefinitionModel:
        """Create a workflow by passing the payload fields to the workflow repository."""
        repository = self.workflow_repository
        return repository.create(
            tenant_id=payload.tenant_id,
            app_id=payload.app_id,
            code=payload.code,
            name=payload.name,
            workflow_type=payload.workflow_type,
        )

    def list_workflows(
        self,
        tenant_id: str,
        app_id: str | None = None,
    ) -> list[WorkflowDefinitionModel]:
        """List workflows for a tenant; ``app_id`` is forwarded as given (may be ``None``)."""
        repository = self.workflow_repository
        return repository.list_by_scope(tenant_id=tenant_id, app_id=app_id)

    # ------------------------------------------------------------------
    # Workflow versions
    # ------------------------------------------------------------------

    def create_workflow_version(self, payload: WorkflowVersionCreateRequest) -> WorkflowVersion:
        """Validate the payload's DSL and create a workflow version from it.

        The DSL is run through ``_validate_workflow_dsl`` first. When the
        payload carries no compiled plan but a DSL is present, a plan is
        derived with ``_build_compiled_plan_json``.

        Raises:
            ValueError: If the DSL fails validation.
        """
        normalized_dsl = self._validate_workflow_dsl(payload.dsl_json)
        compiled_plan = payload.compiled_plan_json
        # A caller-supplied plan always wins; only derive one when it is absent
        # and there is a DSL to derive it from.
        if normalized_dsl is not None and compiled_plan is None:
            compiled_plan = self._build_compiled_plan_json(normalized_dsl)

        repository = self.workflow_version_repository
        return repository.create(
            tenant_id=payload.tenant_id,
            workflow_id=payload.workflow_id,
            dsl_json=normalized_dsl,
            compiled_plan_json=compiled_plan,
            schema_version=payload.schema_version,
            checksum=payload.checksum,
            status=payload.status,
        )

    def list_workflow_versions(self, tenant_id: str, workflow_id: str) -> list[WorkflowVersion]:
        """List the versions the repository holds for one workflow of a tenant."""
        repository = self.workflow_version_repository
        return repository.list_by_workflow(tenant_id=tenant_id, workflow_id=workflow_id)

    def get_workflow_version(
        self,
        tenant_id: str,
        workflow_version_id: str,
    ) -> WorkflowVersion | None:
        """Look up one workflow version by id; the result may be ``None``."""
        repository = self.workflow_version_repository
        return repository.get_by_id(
            tenant_id=tenant_id,
            workflow_version_id=workflow_version_id,
        )

    # ------------------------------------------------------------------
    # Designer / debugger
    # ------------------------------------------------------------------

    def validate_designer_workflow(
        self,
        payload: WorkflowDesignerValidateRequest,
    ) -> WorkflowInspection:
        """Inspect the DSL carried by ``payload`` and return the inspection."""
        designer_dsl = payload.dsl_json
        return inspect_workflow_dsl(designer_dsl)

    def build_designer_debug_plan(
        self,
        payload: WorkflowDebuggerPlanRequest,
    ) -> WorkflowDebugPlan:
        """Build a debug plan directly from the DSL carried by ``payload``."""
        step_limit = payload.max_preview_steps
        return build_debug_plan(payload.dsl_json, max_preview_steps=step_limit)

    def build_version_debug_plan(
        self,
        *,
        tenant_id: str,
        workflow_version_id: str,
        max_preview_steps: int,
    ) -> WorkflowDebugPlan | None:
        """Build a debug plan from a stored workflow version's DSL.

        Returns:
            The debug plan, or ``None`` when ``get_workflow_version`` finds
            no version for the given ids.
        """
        version = self.get_workflow_version(
            tenant_id=tenant_id,
            workflow_version_id=workflow_version_id,
        )
        if version is not None:
            return build_debug_plan(version.dsl_json, max_preview_steps=max_preview_steps)
        return None

    # ------------------------------------------------------------------
    # App versions
    # ------------------------------------------------------------------

    def create_app_version(self, payload: AppVersionCreateRequest) -> AppVersion:
        """Create an app version by passing the payload fields to its repository."""
        repository = self.app_version_repository
        return repository.create(
            tenant_id=payload.tenant_id,
            app_id=payload.app_id,
            workflow_version_id=payload.workflow_version_id,
            status=payload.status,
            published_by=payload.published_by,
            changelog=payload.changelog,
        )

    def list_app_versions(self, tenant_id: str, app_id: str) -> list[AppVersion]:
        """List the versions the repository holds for one app of a tenant."""
        repository = self.app_version_repository
        return repository.list_by_app(tenant_id=tenant_id, app_id=app_id)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _validate_workflow_dsl(
        self,
        dsl_json: dict[str, JSONValue] | None,
    ) -> dict[str, JSONValue] | None:
        """Parse and inspect a workflow DSL, returning its JSON-mode dump.

        Args:
            dsl_json: Raw DSL mapping, or ``None`` when no DSL was supplied.

        Returns:
            ``None`` when ``dsl_json`` is ``None`` (or the parser yields
            ``None``); otherwise ``model_dump(mode="json")`` of the parsed
            workflow.

        Raises:
            ValueError: If parsing raises ``ValidationError`` or the
                inspection reports any diagnostic with severity ``"error"``.
        """
        if dsl_json is None:
            return None

        try:
            parsed = parse_workflow_definition(dsl_json)
        except ValidationError as parse_error:
            raise ValueError(f"invalid workflow dsl: {parse_error}") from parse_error

        # Parsing succeeded; the inspection runs on the raw input as well and
        # any error-level diagnostic still rejects the DSL.
        report = inspect_workflow_dsl(dsl_json)
        failures = [
            f"{diagnostic.code}: {diagnostic.message}"
            for diagnostic in report.diagnostics
            if diagnostic.severity == "error"
        ]
        if failures:
            message = "; ".join(failures)
            raise ValueError(f"invalid workflow dsl: {message}")

        return None if parsed is None else parsed.model_dump(mode="json")

    def _build_compiled_plan_json(
        self,
        dsl_json: dict[str, JSONValue],
    ) -> dict[str, JSONValue]:
        """Summarise the debug plan for ``dsl_json`` as a plain dictionary.

        The result holds the inspection's validity flag, entry/terminal node
        ids, node and edge counts, and one entry per execution-preview step.
        """
        plan = build_debug_plan(dsl_json)
        inspection = plan.inspection

        summary: dict[str, JSONValue] = {
            "valid": inspection.valid,
            "entry_node_ids": inspection.entry_node_ids,
            "terminal_node_ids": inspection.terminal_node_ids,
            "node_count": len(inspection.nodes),
            "edge_count": len(inspection.edges),
        }

        preview_steps = []
        for step in plan.execution_preview:
            preview_steps.append(
                {
                    "step_index": step.step_index,
                    "node_id": step.node_id,
                    "node_type": step.node_type,
                    "name": step.name,
                    "next_node_ids": step.next_node_ids,
                }
            )
        summary["execution_preview"] = preview_steps
        return summary
|