| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- from sqlalchemy import select
- from sqlalchemy.orm import Session
- from app.db.models import AppDefinition, AppConfig, WorkflowDefinitionModel, WorkflowConfig
- class AppDefinitionRepository:
- def __init__(self, db: Session) -> None:
- self.db = db
- def create(
- self,
- *,
- code: str,
- name: str,
- description: str | None,
- owner_user_id: str | None,
- settings_json: dict | None) -> AppDefinition:
- entity = AppDefinition(
- code=code,
- name=name,
- description=description,
- owner_user_id=owner_user_id,
- settings_json=settings_json)
- self.db.add(entity)
- self.db.commit()
- self.db.refresh(entity)
- return entity
- def list_all(self) -> list[AppDefinition]:
- stmt = select(AppDefinition)
- return list(self.db.scalars(stmt))
- class WorkflowDefinitionRepository:
- def __init__(self, db: Session) -> None:
- self.db = db
- def create(
- self,
- *,
- app_id: str,
- code: str,
- name: str,
- workflow_type: str) -> WorkflowDefinitionModel:
- entity = WorkflowDefinitionModel(
- app_id=app_id,
- code=code,
- name=name,
- workflow_type=workflow_type)
- self.db.add(entity)
- self.db.commit()
- self.db.refresh(entity)
- return entity
- def list_by_scope(self, *, app_id: str | None = None) -> list[WorkflowDefinitionModel]:
- stmt = select(WorkflowDefinitionModel)
- if app_id:
- stmt = stmt.where(WorkflowDefinitionModel.app_id == app_id)
- return list(self.db.scalars(stmt))
- class AppConfigRepository:
- def __init__(self, db: Session) -> None:
- self.db = db
- def create(
- self,
- *,
- app_id: str,
- workflow_config_id: str) -> AppConfig:
- entity = AppConfig(
- app_id=app_id,
- workflow_config_id=workflow_config_id)
- self.db.add(entity)
- self.db.commit()
- self.db.refresh(entity)
- return entity
- def list_by_app(self, *, app_id: str) -> list[AppConfig]:
- stmt = (
- select(AppConfig)
- .where(AppConfig.app_id == app_id)
- .order_by(AppConfig.created_time.desc())
- )
- return list(self.db.scalars(stmt))
- class WorkflowConfigRepository:
- def __init__(self, db: Session) -> None:
- self.db = db
- def create(
- self,
- *,
- workflow_id: str,
- dsl_json: dict | None,
- compiled_plan_json: dict | None,
- checksum: str | None) -> WorkflowConfig:
- entity = WorkflowConfig(
- workflow_id=workflow_id,
- dsl_json=dsl_json,
- compiled_plan_json=compiled_plan_json,
- checksum=checksum)
- self.db.add(entity)
- self.db.commit()
- self.db.refresh(entity)
- return entity
- def list_by_workflow(self, *, workflow_id: str) -> list[WorkflowConfig]:
- stmt = (
- select(WorkflowConfig)
- .where(WorkflowConfig.workflow_id == workflow_id)
- .order_by(WorkflowConfig.created_time.desc())
- )
- return list(self.db.scalars(stmt))
- def get_by_id(self, *, workflow_config_id: str) -> WorkflowConfig | None:
- stmt = (
- select(WorkflowConfig)
- .where(WorkflowConfig.id == workflow_config_id)
- )
- return self.db.scalar(stmt)
|