# repositories.py (4.4 KB)
from datetime import datetime, timedelta, timezone

from core_domain import ScheduledJobStatus, ScheduledJobType
from core_shared import JSONValue
from sqlalchemy import or_, select
from sqlalchemy.orm import Session

from app.db.models import ScheduledJob
  7. class ScheduledJobRepository:
  8. def __init__(self, db: Session) -> None:
  9. self.db = db
  10. def create(
  11. self,
  12. *,
  13. job_type: ScheduledJobType,
  14. name: str,
  15. description: str | None,
  16. target_service: str | None,
  17. target_url: str | None,
  18. method: str | None,
  19. payload_json: dict[str, JSONValue],
  20. schedule_time: datetime,
  21. max_attempts: int,
  22. metadata_json: dict[str, JSONValue]) -> ScheduledJob:
  23. entity = ScheduledJob(
  24. job_type=job_type,
  25. name=name,
  26. description=description,
  27. target_service=target_service,
  28. target_url=target_url,
  29. method=method,
  30. payload_json=payload_json,
  31. schedule_time=schedule_time,
  32. max_attempts=max_attempts,
  33. metadata_json=metadata_json)
  34. self.db.add(entity)
  35. self.db.commit()
  36. self.db.refresh(entity)
  37. return entity
  38. def list_by_scope(
  39. self,
  40. *,
  41. status: ScheduledJobStatus | None = None,
  42. job_type: ScheduledJobType | None = None,
  43. limit: int = 100) -> list[ScheduledJob]:
  44. stmt = select(ScheduledJob)
  45. if status is not None:
  46. stmt = stmt.where(ScheduledJob.status == status)
  47. if job_type is not None:
  48. stmt = stmt.where(ScheduledJob.job_type == job_type)
  49. stmt = stmt.order_by(ScheduledJob.schedule_time.asc()).limit(limit)
  50. return list(self.db.scalars(stmt))
  51. def get_by_id(self, *, job_id: str) -> ScheduledJob | None:
  52. stmt = (
  53. select(ScheduledJob)
  54. .where(ScheduledJob.id == job_id)
  55. )
  56. return self.db.scalar(stmt)
  57. def claim_due_jobs(
  58. self,
  59. *,
  60. worker_key: str,
  61. lease_seconds: int,
  62. limit: int,
  63. now_time: datetime) -> list[ScheduledJob]:
  64. self.release_expired_leases(now_time=now_time)
  65. stmt = (
  66. select(ScheduledJob)
  67. .where(ScheduledJob.status == "scheduled")
  68. .where(ScheduledJob.schedule_time <= now_time)
  69. .where(ScheduledJob.attempt_count < ScheduledJob.max_attempts)
  70. )
  71. stmt = stmt.order_by(ScheduledJob.schedule_time.asc()).limit(limit)
  72. jobs = list(self.db.scalars(stmt))
  73. lease_expire_time = now_time + timedelta(seconds=lease_seconds)
  74. for job in jobs:
  75. job.status = "claimed"
  76. job.claimed_by = worker_key
  77. job.claimed_time = now_time
  78. job.lease_expire_time = lease_expire_time
  79. job.attempt_count += 1
  80. if jobs:
  81. self.db.commit()
  82. for job in jobs:
  83. self.db.refresh(job)
  84. return jobs
  85. def release_expired_leases(self, *, now_time: datetime, max_items: int = 100) -> int:
  86. stmt = (
  87. select(ScheduledJob)
  88. .where(ScheduledJob.status == "claimed")
  89. .where(
  90. or_(
  91. ScheduledJob.lease_expire_time.is_(None),
  92. ScheduledJob.lease_expire_time <= now_time)
  93. )
  94. .limit(max_items)
  95. )
  96. jobs = list(self.db.scalars(stmt))
  97. for job in jobs:
  98. job.status = "scheduled" if job.attempt_count < job.max_attempts else "failed"
  99. job.claimed_by = None
  100. job.claimed_time = None
  101. job.lease_expire_time = None
  102. if job.status == "failed":
  103. job.last_error_message = "lease expired and max attempts reached"
  104. if jobs:
  105. self.db.commit()
  106. return len(jobs)
  107. def update_status(
  108. self,
  109. *,
  110. job_id: str,
  111. status: ScheduledJobStatus,
  112. last_error_message: str | None = None) -> ScheduledJob | None:
  113. entity = self.db.get(ScheduledJob, job_id)
  114. if entity is None:
  115. return None
  116. entity.status = status
  117. entity.last_error_message = last_error_message
  118. if status in {"completed", "failed", "cancelled"}:
  119. entity.completed_time = datetime.utcnow()
  120. entity.lease_expire_time = None
  121. self.db.commit()
  122. self.db.refresh(entity)
  123. return entity