repositories.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. from datetime import datetime
  2. from sqlalchemy import func, select
  3. from sqlalchemy.orm import Session
  4. from app.db.models import AppDefinition, AppVersion, WorkflowDefinitionModel, WorkflowVersion
  5. class AppDefinitionRepository:
  6. def __init__(self, db: Session) -> None:
  7. self.db = db
  8. def create(
  9. self,
  10. *,
  11. code: str,
  12. name: str,
  13. description: str | None,
  14. owner_user_id: str | None,
  15. settings_json: dict | None) -> AppDefinition:
  16. entity = AppDefinition(
  17. code=code,
  18. name=name,
  19. description=description,
  20. owner_user_id=owner_user_id,
  21. settings_json=settings_json)
  22. self.db.add(entity)
  23. self.db.commit()
  24. self.db.refresh(entity)
  25. return entity
  26. def list_all(self) -> list[AppDefinition]:
  27. stmt = select(AppDefinition)
  28. return list(self.db.scalars(stmt))
  29. class WorkflowDefinitionRepository:
  30. def __init__(self, db: Session) -> None:
  31. self.db = db
  32. def create(
  33. self,
  34. *,
  35. app_id: str,
  36. code: str,
  37. name: str,
  38. workflow_type: str) -> WorkflowDefinitionModel:
  39. entity = WorkflowDefinitionModel(
  40. app_id=app_id,
  41. code=code,
  42. name=name,
  43. workflow_type=workflow_type)
  44. self.db.add(entity)
  45. self.db.commit()
  46. self.db.refresh(entity)
  47. return entity
  48. def list_by_scope(self, *, app_id: str | None = None) -> list[WorkflowDefinitionModel]:
  49. stmt = select(WorkflowDefinitionModel)
  50. if app_id:
  51. stmt = stmt.where(WorkflowDefinitionModel.app_id == app_id)
  52. return list(self.db.scalars(stmt))
  53. class AppVersionRepository:
  54. def __init__(self, db: Session) -> None:
  55. self.db = db
  56. def create(
  57. self,
  58. *,
  59. app_id: str,
  60. workflow_version_id: str,
  61. status: str,
  62. published_by: str | None,
  63. changelog: str | None) -> AppVersion:
  64. version_no = self._next_version_no(app_id)
  65. published_time = datetime.utcnow() if status == "published" else None
  66. entity = AppVersion(
  67. app_id=app_id,
  68. version_no=version_no,
  69. workflow_version_id=workflow_version_id,
  70. status=status,
  71. published_by=published_by,
  72. published_time=published_time,
  73. changelog=changelog)
  74. self.db.add(entity)
  75. self.db.commit()
  76. self.db.refresh(entity)
  77. return entity
  78. def list_by_app(self, *, app_id: str) -> list[AppVersion]:
  79. stmt = (
  80. select(AppVersion)
  81. .where(AppVersion.app_id == app_id)
  82. .order_by(AppVersion.version_no.desc())
  83. )
  84. return list(self.db.scalars(stmt))
  85. def _next_version_no(self, app_id: str) -> int:
  86. stmt = select(func.max(AppVersion.version_no)).where(AppVersion.app_id == app_id)
  87. current_max = self.db.scalar(stmt)
  88. return (current_max or 0) + 1
  89. class WorkflowVersionRepository:
  90. def __init__(self, db: Session) -> None:
  91. self.db = db
  92. def create(
  93. self,
  94. *,
  95. workflow_id: str,
  96. dsl_json: dict | None,
  97. compiled_plan_json: dict | None,
  98. schema_version: str | None,
  99. checksum: str | None,
  100. status: str) -> WorkflowVersion:
  101. version_no = self._next_version_no(workflow_id)
  102. entity = WorkflowVersion(
  103. workflow_id=workflow_id,
  104. version_no=version_no,
  105. dsl_json=dsl_json,
  106. compiled_plan_json=compiled_plan_json,
  107. schema_version=schema_version,
  108. checksum=checksum,
  109. status=status)
  110. self.db.add(entity)
  111. self.db.commit()
  112. self.db.refresh(entity)
  113. return entity
  114. def list_by_workflow(self, *, workflow_id: str) -> list[WorkflowVersion]:
  115. stmt = (
  116. select(WorkflowVersion)
  117. .where(WorkflowVersion.workflow_id == workflow_id)
  118. .order_by(WorkflowVersion.version_no.desc())
  119. )
  120. return list(self.db.scalars(stmt))
  121. def get_by_id(self, *, workflow_version_id: str) -> WorkflowVersion | None:
  122. stmt = (
  123. select(WorkflowVersion)
  124. .where(WorkflowVersion.id == workflow_version_id)
  125. )
  126. return self.db.scalar(stmt)
  127. def _next_version_no(self, workflow_id: str) -> int:
  128. stmt = select(func.max(WorkflowVersion.version_no)).where(
  129. WorkflowVersion.workflow_id == workflow_id
  130. )
  131. current_max = self.db.scalar(stmt)
  132. return (current_max or 0) + 1