from datetime import datetime from core_domain import ScheduledJobStatus, ScheduledJobType from core_shared.task_queue import TaskQueuePublisher from app.db.models import ScheduledJob from app.domain.repositories import ScheduledJobRepository from app.schemas.scheduler import ( DueJobClaimRequest, ScheduledJobCreateRequest, ScheduledJobStatusUpdateRequest, ) class SchedulerApplicationService: def __init__( self, *, job_repository: ScheduledJobRepository, task_queue_publisher: TaskQueuePublisher | None = None) -> None: self.job_repository = job_repository self.task_queue_publisher = task_queue_publisher def create_job(self, payload: ScheduledJobCreateRequest) -> ScheduledJob: job = self.job_repository.create( job_type=payload.job_type, name=payload.name, description=payload.description, target_service=payload.target_service, target_url=payload.target_url, method=payload.method, payload_json=payload.payload_json, schedule_time=payload.schedule_time, max_attempts=payload.max_attempts, metadata_json=payload.metadata_json) if self.task_queue_publisher is not None: self.task_queue_publisher.publish_scheduled_job( scheduled_job_id=job.id) return job def list_jobs( self, *, status: ScheduledJobStatus | None, job_type: ScheduledJobType | None, limit: int) -> list[ScheduledJob]: return self.job_repository.list_by_scope( status=status, job_type=job_type, limit=limit) def claim_due_jobs(self, payload: DueJobClaimRequest) -> list[ScheduledJob]: return self.job_repository.claim_due_jobs( worker_key=payload.worker_key, lease_seconds=payload.lease_seconds, limit=payload.limit, now_time=payload.now_time or datetime.utcnow()) def update_job_status( self, *, job_id: str, payload: ScheduledJobStatusUpdateRequest) -> ScheduledJob | None: entity = self.job_repository.get_by_id(job_id=job_id) if entity is None: return None return self.job_repository.update_status( job_id=job_id, status=payload.status, last_error_message=payload.last_error_message)