from core_dsl import parse_workflow_definition
from core_shared import JSONValue
from pydantic import ValidationError

from app.application.designer import (
    WorkflowDebugPlan,
    WorkflowInspection,
    build_debug_plan,
    inspect_workflow_dsl,
)
from app.db.models import AppDefinition, AppVersion, WorkflowDefinitionModel, WorkflowVersion
from app.domain.repositories import (
    AppDefinitionRepository,
    AppVersionRepository,
    WorkflowDefinitionRepository,
    WorkflowVersionRepository,
)
from app.schemas.app import AppCreateRequest, AppVersionCreateRequest
from app.schemas.workflow import (
    WorkflowCreateRequest,
    WorkflowDebuggerPlanRequest,
    WorkflowDesignerValidateRequest,
    WorkflowVersionCreateRequest,
)


class WorkflowApplicationService:
    """Application-level facade over the app/workflow repositories.

    Each public method forwards a request payload to one of the four
    injected repositories or to the designer helpers
    (``inspect_workflow_dsl`` / ``build_debug_plan``).
    """

    def __init__(
        self,
        app_repository: AppDefinitionRepository,
        workflow_repository: WorkflowDefinitionRepository,
        app_version_repository: AppVersionRepository,
        workflow_version_repository: WorkflowVersionRepository,
    ) -> None:
        """Store the repositories this service delegates to."""
        self.app_repository = app_repository
        self.workflow_repository = workflow_repository
        self.app_version_repository = app_version_repository
        self.workflow_version_repository = workflow_version_repository

    # ------------------------------------------------------------------
    # Apps
    # ------------------------------------------------------------------

    def create_app(self, payload: AppCreateRequest) -> AppDefinition:
        """Create an app definition from the fields of *payload*."""
        return self.app_repository.create(
            code=payload.code,
            name=payload.name,
            description=payload.description,
            owner_user_id=payload.owner_user_id,
            settings_json=payload.settings_json,
        )

    def list_apps(self) -> list[AppDefinition]:
        """Return whatever ``app_repository.list_all()`` yields."""
        return self.app_repository.list_all()

    # ------------------------------------------------------------------
    # Workflows
    # ------------------------------------------------------------------

    def create_workflow(self, payload: WorkflowCreateRequest) -> WorkflowDefinitionModel:
        """Create a workflow definition from the fields of *payload*."""
        return self.workflow_repository.create(
            app_id=payload.app_id,
            code=payload.code,
            name=payload.name,
            workflow_type=payload.workflow_type,
        )

    def list_workflows(self, app_id: str | None = None) -> list[WorkflowDefinitionModel]:
        """List workflows via ``list_by_scope``, passing *app_id* through as-is."""
        return self.workflow_repository.list_by_scope(app_id=app_id)

    # ------------------------------------------------------------------
    # Workflow versions
    # ------------------------------------------------------------------

    def create_workflow_version(self, payload: WorkflowVersionCreateRequest) -> WorkflowVersion:
        """Validate the payload's DSL and persist a new workflow version.

        The DSL is validated first. When the payload carries no compiled
        plan and a DSL is present, a plan is derived from the validated DSL.

        Raises:
            ValueError: if the DSL is rejected by ``_validate_workflow_dsl``.
        """
        validated_dsl = self._validate_workflow_dsl(payload.dsl_json)

        compiled_plan = payload.compiled_plan_json
        if compiled_plan is None and validated_dsl is not None:
            # Nothing supplied by the caller: derive the plan ourselves.
            compiled_plan = self._build_compiled_plan_json(validated_dsl)

        return self.workflow_version_repository.create(
            workflow_id=payload.workflow_id,
            dsl_json=validated_dsl,
            compiled_plan_json=compiled_plan,
            schema_version=payload.schema_version,
            checksum=payload.checksum,
            status=payload.status,
        )

    def list_workflow_versions(self, workflow_id: str) -> list[WorkflowVersion]:
        """List the versions recorded for *workflow_id*."""
        return self.workflow_version_repository.list_by_workflow(workflow_id=workflow_id)

    def get_workflow_version(self, workflow_version_id: str) -> WorkflowVersion | None:
        """Look up a single workflow version by id via the repository."""
        return self.workflow_version_repository.get_by_id(
            workflow_version_id=workflow_version_id
        )

    # ------------------------------------------------------------------
    # Designer / debugger
    # ------------------------------------------------------------------

    def validate_designer_workflow(
        self, payload: WorkflowDesignerValidateRequest
    ) -> WorkflowInspection:
        """Run ``inspect_workflow_dsl`` on the payload's DSL and return the result."""
        return inspect_workflow_dsl(payload.dsl_json)

    def build_designer_debug_plan(
        self, payload: WorkflowDebuggerPlanRequest
    ) -> WorkflowDebugPlan:
        """Build a debug plan from the payload's DSL and preview-step limit."""
        return build_debug_plan(
            payload.dsl_json,
            max_preview_steps=payload.max_preview_steps,
        )

    def build_version_debug_plan(
        self, *, workflow_version_id: str, max_preview_steps: int
    ) -> WorkflowDebugPlan | None:
        """Build a debug plan for a stored workflow version.

        Returns ``None`` when ``get_workflow_version`` finds no version for
        *workflow_version_id*; otherwise the plan built from its ``dsl_json``.
        """
        version = self.get_workflow_version(workflow_version_id=workflow_version_id)
        if version is None:
            return None
        return build_debug_plan(version.dsl_json, max_preview_steps=max_preview_steps)

    # ------------------------------------------------------------------
    # App versions
    # ------------------------------------------------------------------

    def create_app_version(self, payload: AppVersionCreateRequest) -> AppVersion:
        """Create an app version from the fields of *payload*."""
        return self.app_version_repository.create(
            app_id=payload.app_id,
            workflow_version_id=payload.workflow_version_id,
            status=payload.status,
            published_by=payload.published_by,
            changelog=payload.changelog,
        )

    def list_app_versions(self, app_id: str) -> list[AppVersion]:
        """List the versions recorded for *app_id*."""
        return self.app_version_repository.list_by_app(app_id=app_id)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _validate_workflow_dsl(
        self, dsl_json: dict[str, JSONValue] | None
    ) -> dict[str, JSONValue] | None:
        """Parse and inspect *dsl_json*, returning its dumped form.

        Returns:
            ``None`` when *dsl_json* is ``None`` or the parser returns
            ``None``; otherwise ``workflow.model_dump(mode="json")``.

        Raises:
            ValueError: if parsing raises ``ValidationError`` or the
                inspection reports any diagnostic with severity ``"error"``.
        """
        if dsl_json is None:
            return None

        try:
            workflow = parse_workflow_definition(dsl_json)
        except ValidationError as exc:
            raise ValueError(f"invalid workflow dsl: {exc}") from exc

        # Only error-severity diagnostics block the version from being saved.
        report = inspect_workflow_dsl(dsl_json)
        problems = [
            f"{diagnostic.code}: {diagnostic.message}"
            for diagnostic in report.diagnostics
            if diagnostic.severity == "error"
        ]
        if problems:
            message = "; ".join(problems)
            raise ValueError(f"invalid workflow dsl: {message}")

        return None if workflow is None else workflow.model_dump(mode="json")

    def _build_compiled_plan_json(
        self, dsl_json: dict[str, JSONValue]
    ) -> dict[str, JSONValue]:
        """Summarise the debug plan for *dsl_json* as a plain dict.

        The result carries the inspection's validity flag, entry/terminal
        node ids, node and edge counts, and one entry per preview step.
        """
        plan = build_debug_plan(dsl_json)
        inspection = plan.inspection

        compiled: dict[str, JSONValue] = {
            "valid": inspection.valid,
            "entry_node_ids": inspection.entry_node_ids,
            "terminal_node_ids": inspection.terminal_node_ids,
            "node_count": len(inspection.nodes),
            "edge_count": len(inspection.edges),
        }

        preview = []
        for step in plan.execution_preview:
            preview.append(
                {
                    "step_index": step.step_index,
                    "node_id": step.node_id,
                    "node_type": step.node_type,
                    "name": step.name,
                    "next_node_ids": step.next_node_ids,
                }
            )
        compiled["execution_preview"] = preview
        return compiled