repositories.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. from datetime import datetime, timedelta
  2. from sqlalchemy import or_, select
  3. from sqlalchemy.orm import Session
  4. from core_domain import ScheduledJobStatus, ScheduledJobType
  5. from core_shared import JSONValue
  6. from app.db.models import ScheduledJob
  7. class ScheduledJobRepository:
  8. def __init__(self, db: Session) -> None:
  9. self.db = db
  10. def create(
  11. self,
  12. *,
  13. tenant_id: str,
  14. job_type: ScheduledJobType,
  15. name: str,
  16. description: str | None,
  17. target_service: str | None,
  18. target_url: str | None,
  19. method: str | None,
  20. payload_json: dict[str, JSONValue],
  21. schedule_time: datetime,
  22. max_attempts: int,
  23. metadata_json: dict[str, JSONValue],
  24. ) -> ScheduledJob:
  25. entity = ScheduledJob(
  26. tenant_id=tenant_id,
  27. job_type=job_type,
  28. name=name,
  29. description=description,
  30. target_service=target_service,
  31. target_url=target_url,
  32. method=method,
  33. payload_json=payload_json,
  34. schedule_time=schedule_time,
  35. max_attempts=max_attempts,
  36. metadata_json=metadata_json,
  37. )
  38. self.db.add(entity)
  39. self.db.commit()
  40. self.db.refresh(entity)
  41. return entity
  42. def list_by_scope(
  43. self,
  44. *,
  45. tenant_id: str,
  46. status: ScheduledJobStatus | None = None,
  47. job_type: ScheduledJobType | None = None,
  48. limit: int = 100,
  49. ) -> list[ScheduledJob]:
  50. stmt = select(ScheduledJob).where(ScheduledJob.tenant_id == tenant_id)
  51. if status is not None:
  52. stmt = stmt.where(ScheduledJob.status == status)
  53. if job_type is not None:
  54. stmt = stmt.where(ScheduledJob.job_type == job_type)
  55. stmt = stmt.order_by(ScheduledJob.schedule_time.asc()).limit(limit)
  56. return list(self.db.scalars(stmt))
  57. def get_by_id(self, *, tenant_id: str, job_id: str) -> ScheduledJob | None:
  58. stmt = (
  59. select(ScheduledJob)
  60. .where(ScheduledJob.tenant_id == tenant_id)
  61. .where(ScheduledJob.id == job_id)
  62. )
  63. return self.db.scalar(stmt)
  64. def claim_due_jobs(
  65. self,
  66. *,
  67. tenant_id: str | None,
  68. worker_key: str,
  69. lease_seconds: int,
  70. limit: int,
  71. now_time: datetime,
  72. ) -> list[ScheduledJob]:
  73. self.release_expired_leases(now_time=now_time)
  74. stmt = (
  75. select(ScheduledJob)
  76. .where(ScheduledJob.status == "scheduled")
  77. .where(ScheduledJob.schedule_time <= now_time)
  78. .where(ScheduledJob.attempt_count < ScheduledJob.max_attempts)
  79. )
  80. if tenant_id is not None:
  81. stmt = stmt.where(ScheduledJob.tenant_id == tenant_id)
  82. stmt = stmt.order_by(ScheduledJob.schedule_time.asc()).limit(limit)
  83. jobs = list(self.db.scalars(stmt))
  84. lease_expire_time = now_time + timedelta(seconds=lease_seconds)
  85. for job in jobs:
  86. job.status = "claimed"
  87. job.claimed_by = worker_key
  88. job.claimed_time = now_time
  89. job.lease_expire_time = lease_expire_time
  90. job.attempt_count += 1
  91. if jobs:
  92. self.db.commit()
  93. for job in jobs:
  94. self.db.refresh(job)
  95. return jobs
  96. def release_expired_leases(self, *, now_time: datetime, max_items: int = 100) -> int:
  97. stmt = (
  98. select(ScheduledJob)
  99. .where(ScheduledJob.status == "claimed")
  100. .where(
  101. or_(
  102. ScheduledJob.lease_expire_time.is_(None),
  103. ScheduledJob.lease_expire_time <= now_time,
  104. )
  105. )
  106. .limit(max_items)
  107. )
  108. jobs = list(self.db.scalars(stmt))
  109. for job in jobs:
  110. job.status = "scheduled" if job.attempt_count < job.max_attempts else "failed"
  111. job.claimed_by = None
  112. job.claimed_time = None
  113. job.lease_expire_time = None
  114. if job.status == "failed":
  115. job.last_error_message = "lease expired and max attempts reached"
  116. if jobs:
  117. self.db.commit()
  118. return len(jobs)
  119. def update_status(
  120. self,
  121. *,
  122. job_id: str,
  123. status: ScheduledJobStatus,
  124. last_error_message: str | None = None,
  125. ) -> ScheduledJob | None:
  126. entity = self.db.get(ScheduledJob, job_id)
  127. if entity is None:
  128. return None
  129. entity.status = status
  130. entity.last_error_message = last_error_message
  131. if status in {"completed", "failed", "cancelled"}:
  132. entity.completed_time = datetime.utcnow()
  133. entity.lease_expire_time = None
  134. self.db.commit()
  135. self.db.refresh(entity)
  136. return entity