| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- from datetime import datetime, timedelta
- from sqlalchemy import or_, select
- from sqlalchemy.orm import Session
- from core_domain import ScheduledJobStatus, ScheduledJobType
- from core_shared import JSONValue
- from app.db.models import ScheduledJob
- class ScheduledJobRepository:
- def __init__(self, db: Session) -> None:
- self.db = db
- def create(
- self,
- *,
- tenant_id: str,
- job_type: ScheduledJobType,
- name: str,
- description: str | None,
- target_service: str | None,
- target_url: str | None,
- method: str | None,
- payload_json: dict[str, JSONValue],
- schedule_time: datetime,
- max_attempts: int,
- metadata_json: dict[str, JSONValue],
- ) -> ScheduledJob:
- entity = ScheduledJob(
- tenant_id=tenant_id,
- job_type=job_type,
- name=name,
- description=description,
- target_service=target_service,
- target_url=target_url,
- method=method,
- payload_json=payload_json,
- schedule_time=schedule_time,
- max_attempts=max_attempts,
- metadata_json=metadata_json,
- )
- self.db.add(entity)
- self.db.commit()
- self.db.refresh(entity)
- return entity
- def list_by_scope(
- self,
- *,
- tenant_id: str,
- status: ScheduledJobStatus | None = None,
- job_type: ScheduledJobType | None = None,
- limit: int = 100,
- ) -> list[ScheduledJob]:
- stmt = select(ScheduledJob).where(ScheduledJob.tenant_id == tenant_id)
- if status is not None:
- stmt = stmt.where(ScheduledJob.status == status)
- if job_type is not None:
- stmt = stmt.where(ScheduledJob.job_type == job_type)
- stmt = stmt.order_by(ScheduledJob.schedule_time.asc()).limit(limit)
- return list(self.db.scalars(stmt))
- def get_by_id(self, *, tenant_id: str, job_id: str) -> ScheduledJob | None:
- stmt = (
- select(ScheduledJob)
- .where(ScheduledJob.tenant_id == tenant_id)
- .where(ScheduledJob.id == job_id)
- )
- return self.db.scalar(stmt)
- def claim_due_jobs(
- self,
- *,
- tenant_id: str | None,
- worker_key: str,
- lease_seconds: int,
- limit: int,
- now_time: datetime,
- ) -> list[ScheduledJob]:
- self.release_expired_leases(now_time=now_time)
- stmt = (
- select(ScheduledJob)
- .where(ScheduledJob.status == "scheduled")
- .where(ScheduledJob.schedule_time <= now_time)
- .where(ScheduledJob.attempt_count < ScheduledJob.max_attempts)
- )
- if tenant_id is not None:
- stmt = stmt.where(ScheduledJob.tenant_id == tenant_id)
- stmt = stmt.order_by(ScheduledJob.schedule_time.asc()).limit(limit)
- jobs = list(self.db.scalars(stmt))
- lease_expire_time = now_time + timedelta(seconds=lease_seconds)
- for job in jobs:
- job.status = "claimed"
- job.claimed_by = worker_key
- job.claimed_time = now_time
- job.lease_expire_time = lease_expire_time
- job.attempt_count += 1
- if jobs:
- self.db.commit()
- for job in jobs:
- self.db.refresh(job)
- return jobs
- def release_expired_leases(self, *, now_time: datetime, max_items: int = 100) -> int:
- stmt = (
- select(ScheduledJob)
- .where(ScheduledJob.status == "claimed")
- .where(
- or_(
- ScheduledJob.lease_expire_time.is_(None),
- ScheduledJob.lease_expire_time <= now_time,
- )
- )
- .limit(max_items)
- )
- jobs = list(self.db.scalars(stmt))
- for job in jobs:
- job.status = "scheduled" if job.attempt_count < job.max_attempts else "failed"
- job.claimed_by = None
- job.claimed_time = None
- job.lease_expire_time = None
- if job.status == "failed":
- job.last_error_message = "lease expired and max attempts reached"
- if jobs:
- self.db.commit()
- return len(jobs)
- def update_status(
- self,
- *,
- job_id: str,
- status: ScheduledJobStatus,
- last_error_message: str | None = None,
- ) -> ScheduledJob | None:
- entity = self.db.get(ScheduledJob, job_id)
- if entity is None:
- return None
- entity.status = status
- entity.last_error_message = last_error_message
- if status in {"completed", "failed", "cancelled"}:
- entity.completed_time = datetime.utcnow()
- entity.lease_expire_time = None
- self.db.commit()
- self.db.refresh(entity)
- return entity
|