from pydantic import ValidationError from core_dsl import parse_workflow_definition from core_shared import JSONValue from app.db.models import AppDefinition, AppVersion, WorkflowDefinitionModel, WorkflowVersion from app.domain.repositories import ( AppDefinitionRepository, AppVersionRepository, WorkflowDefinitionRepository, WorkflowVersionRepository, ) from app.schemas.app import AppCreateRequest, AppVersionCreateRequest from app.schemas.workflow import WorkflowCreateRequest, WorkflowVersionCreateRequest class WorkflowApplicationService: def __init__( self, app_repository: AppDefinitionRepository, workflow_repository: WorkflowDefinitionRepository, app_version_repository: AppVersionRepository, workflow_version_repository: WorkflowVersionRepository, ) -> None: self.app_repository = app_repository self.workflow_repository = workflow_repository self.app_version_repository = app_version_repository self.workflow_version_repository = workflow_version_repository def create_app(self, payload: AppCreateRequest) -> AppDefinition: return self.app_repository.create( tenant_id=payload.tenant_id, code=payload.code, name=payload.name, description=payload.description, owner_user_id=payload.owner_user_id, settings_json=payload.settings_json, ) def list_apps(self, tenant_id: str) -> list[AppDefinition]: return self.app_repository.list_by_tenant(tenant_id) def create_workflow(self, payload: WorkflowCreateRequest) -> WorkflowDefinitionModel: return self.workflow_repository.create( tenant_id=payload.tenant_id, app_id=payload.app_id, code=payload.code, name=payload.name, workflow_type=payload.workflow_type, ) def list_workflows(self, tenant_id: str, app_id: str | None = None) -> list[WorkflowDefinitionModel]: return self.workflow_repository.list_by_scope(tenant_id=tenant_id, app_id=app_id) def create_workflow_version(self, payload: WorkflowVersionCreateRequest) -> WorkflowVersion: dsl_json = self._validate_workflow_dsl(payload.dsl_json) return self.workflow_version_repository.create( tenant_id=payload.tenant_id, workflow_id=payload.workflow_id, dsl_json=dsl_json, compiled_plan_json=payload.compiled_plan_json, schema_version=payload.schema_version, checksum=payload.checksum, status=payload.status, ) def list_workflow_versions(self, tenant_id: str, workflow_id: str) -> list[WorkflowVersion]: return self.workflow_version_repository.list_by_workflow( tenant_id=tenant_id, workflow_id=workflow_id, ) def get_workflow_version(self, tenant_id: str, workflow_version_id: str) -> WorkflowVersion | None: return self.workflow_version_repository.get_by_id( tenant_id=tenant_id, workflow_version_id=workflow_version_id, ) def create_app_version(self, payload: AppVersionCreateRequest) -> AppVersion: return self.app_version_repository.create( tenant_id=payload.tenant_id, app_id=payload.app_id, workflow_version_id=payload.workflow_version_id, status=payload.status, published_by=payload.published_by, changelog=payload.changelog, ) def list_app_versions(self, tenant_id: str, app_id: str) -> list[AppVersion]: return self.app_version_repository.list_by_app(tenant_id=tenant_id, app_id=app_id) def _validate_workflow_dsl( self, dsl_json: dict[str, JSONValue] | None, ) -> dict[str, JSONValue] | None: if dsl_json is None: return None try: workflow = parse_workflow_definition(dsl_json) except ValidationError as exc: raise ValueError(f"invalid workflow dsl: {exc}") from exc if workflow is None: return None return workflow.model_dump(mode="json")