from sqlalchemy import select from sqlalchemy.orm import Session from app.db.models import AppDefinition, AppConfig, WorkflowDefinitionModel, WorkflowConfig class AppDefinitionRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, code: str, name: str, description: str | None, owner_user_id: str | None, settings_json: dict | None) -> AppDefinition: entity = AppDefinition( code=code, name=name, description=description, owner_user_id=owner_user_id, settings_json=settings_json) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_all(self) -> list[AppDefinition]: stmt = select(AppDefinition) return list(self.db.scalars(stmt)) class WorkflowDefinitionRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, app_id: str, code: str, name: str, workflow_type: str) -> WorkflowDefinitionModel: entity = WorkflowDefinitionModel( app_id=app_id, code=code, name=name, workflow_type=workflow_type) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_by_scope(self, *, app_id: str | None = None) -> list[WorkflowDefinitionModel]: stmt = select(WorkflowDefinitionModel) if app_id: stmt = stmt.where(WorkflowDefinitionModel.app_id == app_id) return list(self.db.scalars(stmt)) class AppConfigRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, app_id: str, workflow_config_id: str) -> AppConfig: entity = AppConfig( app_id=app_id, workflow_config_id=workflow_config_id) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_by_app(self, *, app_id: str) -> list[AppConfig]: stmt = ( select(AppConfig) .where(AppConfig.app_id == app_id) .order_by(AppConfig.created_time.desc()) ) return list(self.db.scalars(stmt)) class WorkflowConfigRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, workflow_id: str, dsl_json: dict | None, compiled_plan_json: dict | None, checksum: str | None) -> WorkflowConfig: entity = WorkflowConfig( workflow_id=workflow_id, dsl_json=dsl_json, compiled_plan_json=compiled_plan_json, checksum=checksum) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_by_workflow(self, *, workflow_id: str) -> list[WorkflowConfig]: stmt = ( select(WorkflowConfig) .where(WorkflowConfig.workflow_id == workflow_id) .order_by(WorkflowConfig.created_time.desc()) ) return list(self.db.scalars(stmt)) def get_by_id(self, *, workflow_config_id: str) -> WorkflowConfig | None: stmt = ( select(WorkflowConfig) .where(WorkflowConfig.id == workflow_config_id) ) return self.db.scalar(stmt)