from datetime import datetime, timedelta from sqlalchemy import or_, select from sqlalchemy.orm import Session from core_domain import ScheduledJobStatus, ScheduledJobType from core_shared import JSONValue from app.db.models import ScheduledJob class ScheduledJobRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, tenant_id: str, job_type: ScheduledJobType, name: str, description: str | None, target_service: str | None, target_url: str | None, method: str | None, payload_json: dict[str, JSONValue], schedule_time: datetime, max_attempts: int, metadata_json: dict[str, JSONValue], ) -> ScheduledJob: entity = ScheduledJob( tenant_id=tenant_id, job_type=job_type, name=name, description=description, target_service=target_service, target_url=target_url, method=method, payload_json=payload_json, schedule_time=schedule_time, max_attempts=max_attempts, metadata_json=metadata_json, ) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_by_scope( self, *, tenant_id: str, status: ScheduledJobStatus | None = None, job_type: ScheduledJobType | None = None, limit: int = 100, ) -> list[ScheduledJob]: stmt = select(ScheduledJob).where(ScheduledJob.tenant_id == tenant_id) if status is not None: stmt = stmt.where(ScheduledJob.status == status) if job_type is not None: stmt = stmt.where(ScheduledJob.job_type == job_type) stmt = stmt.order_by(ScheduledJob.schedule_time.asc()).limit(limit) return list(self.db.scalars(stmt)) def get_by_id(self, *, tenant_id: str, job_id: str) -> ScheduledJob | None: stmt = ( select(ScheduledJob) .where(ScheduledJob.tenant_id == tenant_id) .where(ScheduledJob.id == job_id) ) return self.db.scalar(stmt) def claim_due_jobs( self, *, tenant_id: str, worker_key: str, lease_seconds: int, limit: int, now_time: datetime, ) -> list[ScheduledJob]: self.release_expired_leases(now_time=now_time) stmt = ( select(ScheduledJob) .where(ScheduledJob.tenant_id == tenant_id) .where(ScheduledJob.status == "scheduled") .where(ScheduledJob.schedule_time <= now_time) .where(ScheduledJob.attempt_count < ScheduledJob.max_attempts) .order_by(ScheduledJob.schedule_time.asc()) .limit(limit) ) jobs = list(self.db.scalars(stmt)) lease_expire_time = now_time + timedelta(seconds=lease_seconds) for job in jobs: job.status = "claimed" job.claimed_by = worker_key job.claimed_time = now_time job.lease_expire_time = lease_expire_time job.attempt_count += 1 if jobs: self.db.commit() for job in jobs: self.db.refresh(job) return jobs def release_expired_leases(self, *, now_time: datetime, max_items: int = 100) -> int: stmt = ( select(ScheduledJob) .where(ScheduledJob.status == "claimed") .where( or_( ScheduledJob.lease_expire_time.is_(None), ScheduledJob.lease_expire_time <= now_time, ) ) .limit(max_items) ) jobs = list(self.db.scalars(stmt)) for job in jobs: job.status = "scheduled" if job.attempt_count < job.max_attempts else "failed" job.claimed_by = None job.claimed_time = None job.lease_expire_time = None if job.status == "failed": job.last_error_message = "lease expired and max attempts reached" if jobs: self.db.commit() return len(jobs) def update_status( self, *, job_id: str, status: ScheduledJobStatus, last_error_message: str | None = None, ) -> ScheduledJob | None: entity = self.db.get(ScheduledJob, job_id) if entity is None: return None entity.status = status entity.last_error_message = last_error_message if status in {"completed", "failed", "cancelled"}: entity.completed_time = datetime.utcnow() entity.lease_expire_time = None self.db.commit() self.db.refresh(entity) return entity