from core_dsl import parse_workflow_definition
from core_shared import JSONValue
from pydantic import ValidationError

from app.application.designer import (
    WorkflowDebugPlan,
    WorkflowInspection,
    build_debug_plan,
    inspect_workflow_dsl,
)
from app.db.models import (
    AppDefinition,
    AppConfig,
    WorkflowDefinitionModel,
    WorkflowConfig,
)
from app.domain.repositories import (
    AppDefinitionRepository,
    AppConfigRepository,
    WorkflowDefinitionRepository,
    WorkflowConfigRepository,
)
from app.schemas.app import AppCreateRequest, AppConfigCreateRequest
from app.schemas.workflow import (
    WorkflowCreateRequest,
    WorkflowDebuggerPlanRequest,
    WorkflowDesignerValidateRequest,
    WorkflowConfigCreateRequest,
)


class WorkflowApplicationService:
    """Application-level facade over the app/workflow repositories.

    Each public method forwards to one of the four injected repositories or
    to the designer helpers (``inspect_workflow_dsl`` / ``build_debug_plan``).
    Workflow DSL payloads are validated before a workflow config is created.
    """

    def __init__(
        self,
        app_repository: AppDefinitionRepository,
        workflow_repository: WorkflowDefinitionRepository,
        app_config_repository: AppConfigRepository,
        workflow_config_repository: WorkflowConfigRepository,
    ) -> None:
        """Store the four repositories this service delegates to."""
        self.app_repository = app_repository
        self.workflow_repository = workflow_repository
        self.app_config_repository = app_config_repository
        self.workflow_config_repository = workflow_config_repository

    def create_app(self, payload: AppCreateRequest) -> AppDefinition:
        """Create an app definition from the request fields.

        Returns whatever ``app_repository.create`` returns.
        """
        app_fields = {
            "code": payload.code,
            "name": payload.name,
            "description": payload.description,
            "owner_user_id": payload.owner_user_id,
            "settings_json": payload.settings_json,
        }
        return self.app_repository.create(**app_fields)

    def list_apps(self) -> list[AppDefinition]:
        """Return every app definition reported by the app repository."""
        repository = self.app_repository
        return repository.list_all()

    def create_workflow(self, payload: WorkflowCreateRequest) -> WorkflowDefinitionModel:
        """Create a workflow definition from the request fields.

        Returns whatever ``workflow_repository.create`` returns.
        """
        workflow_fields = {
            "app_id": payload.app_id,
            "code": payload.code,
            "name": payload.name,
            "workflow_type": payload.workflow_type,
        }
        return self.workflow_repository.create(**workflow_fields)

    def list_workflows(self, app_id: str | None = None) -> list[WorkflowDefinitionModel]:
        """List workflow definitions, passing ``app_id`` through as the scope.

        ``app_id`` may be ``None``; how the repository interprets that is not
        visible from this module.
        """
        repository = self.workflow_repository
        return repository.list_by_scope(app_id=app_id)

    def create_workflow_config(self, payload: WorkflowConfigCreateRequest) -> WorkflowConfig:
        """Validate the DSL, derive a compiled plan if needed, and create the config.

        The DSL from the request is run through ``_validate_workflow_dsl``.
        When the request carries no compiled plan and a DSL is present, the
        plan is generated with ``_build_compiled_plan_json``.

        Raises:
            ValueError: If the supplied DSL fails validation.
        """
        validated_dsl = self._validate_workflow_dsl(payload.dsl_json)

        compiled_plan = payload.compiled_plan_json
        # Only derive a plan when the caller did not provide one and there is
        # a DSL to derive it from.
        if validated_dsl is not None and compiled_plan is None:
            compiled_plan = self._build_compiled_plan_json(validated_dsl)

        return self.workflow_config_repository.create(
            workflow_id=payload.workflow_id,
            dsl_json=validated_dsl,
            compiled_plan_json=compiled_plan,
            checksum=payload.checksum,
        )

    def list_workflow_configs(self, workflow_id: str) -> list[WorkflowConfig]:
        """Return the workflow configs the repository lists for ``workflow_id``."""
        repository = self.workflow_config_repository
        return repository.list_by_workflow(workflow_id=workflow_id)

    def get_workflow_config(self, workflow_config_id: str) -> WorkflowConfig | None:
        """Look up a single workflow config by id; ``None`` if the repository has none."""
        repository = self.workflow_config_repository
        return repository.get_by_id(workflow_config_id=workflow_config_id)

    def validate_designer_workflow(
        self,
        payload: WorkflowDesignerValidateRequest,
    ) -> WorkflowInspection:
        """Inspect the DSL carried by a designer validation request."""
        candidate_dsl = payload.dsl_json
        return inspect_workflow_dsl(candidate_dsl)

    def build_designer_debug_plan(
        self,
        payload: WorkflowDebuggerPlanRequest,
    ) -> WorkflowDebugPlan:
        """Build a debug plan for the DSL carried by a debugger request."""
        return build_debug_plan(
            payload.dsl_json,
            max_preview_steps=payload.max_preview_steps,
        )

    def build_config_debug_plan(
        self,
        *,
        workflow_config_id: str,
        max_preview_steps: int,
    ) -> WorkflowDebugPlan | None:
        """Build a debug plan from a stored workflow config's DSL.

        Returns ``None`` when no config exists for ``workflow_config_id``.
        """
        stored_config = self.get_workflow_config(
            workflow_config_id=workflow_config_id,
        )
        if stored_config is not None:
            return build_debug_plan(
                stored_config.dsl_json,
                max_preview_steps=max_preview_steps,
            )
        return None

    def create_app_config(self, payload: AppConfigCreateRequest) -> AppConfig:
        """Create an app config linking an app id to a workflow config id."""
        link_fields = {
            "app_id": payload.app_id,
            "workflow_config_id": payload.workflow_config_id,
        }
        return self.app_config_repository.create(**link_fields)

    def list_app_configs(self, app_id: str) -> list[AppConfig]:
        """Return the app configs the repository lists for ``app_id``."""
        repository = self.app_config_repository
        return repository.list_by_app(app_id=app_id)

    def _validate_workflow_dsl(
        self,
        dsl_json: dict[str, JSONValue] | None,
    ) -> dict[str, JSONValue] | None:
        """Parse and inspect a workflow DSL, returning its JSON-mode dump.

        Returns:
            ``None`` when ``dsl_json`` is ``None`` (or the parser yields
            ``None``); otherwise ``workflow.model_dump(mode="json")`` of the
            parsed workflow.

        Raises:
            ValueError: If parsing raises ``ValidationError`` or the
                inspection reports any diagnostic with severity ``"error"``.
        """
        if dsl_json is None:
            return None

        try:
            workflow = parse_workflow_definition(dsl_json)
        except ValidationError as err:
            raise ValueError(f"invalid workflow dsl: {err}") from err

        # Inspection runs on the raw input, not on the parsed model.
        inspection = inspect_workflow_dsl(dsl_json)
        blocking = [
            diagnostic
            for diagnostic in inspection.diagnostics
            if diagnostic.severity == "error"
        ]
        if blocking:
            rendered = [f"{entry.code}: {entry.message}" for entry in blocking]
            message = "; ".join(rendered)
            raise ValueError(f"invalid workflow dsl: {message}")

        if workflow is None:
            return None
        return workflow.model_dump(mode="json")

    def _build_compiled_plan_json(
        self,
        dsl_json: dict[str, JSONValue],
    ) -> dict[str, JSONValue]:
        """Summarise the debug plan of ``dsl_json`` as a plain dictionary.

        The result holds the inspection's validity flag, entry and terminal
        node ids, node and edge counts, and one entry per execution-preview
        step.
        """
        debug_plan = build_debug_plan(dsl_json)

        summary = {
            "valid": debug_plan.inspection.valid,
            "entry_node_ids": debug_plan.inspection.entry_node_ids,
            "terminal_node_ids": debug_plan.inspection.terminal_node_ids,
            "node_count": len(debug_plan.inspection.nodes),
            "edge_count": len(debug_plan.inspection.edges),
        }
        # Added last so the key order matches the order the fields are read.
        summary["execution_preview"] = [
            {
                "step_index": step.step_index,
                "node_id": step.node_id,
                "node_type": step.node_type,
                "name": step.name,
                "next_node_ids": step.next_node_ids,
            }
            for step in debug_plan.execution_preview
        ]
        return summary