from datetime import datetime from core_domain import ScheduledJobStatus, ScheduledJobType from core_shared.task_queue import TaskQueuePublisher from app.db.models import ScheduledJob from app.domain.repositories import ScheduledJobRepository from app.schemas.scheduler import ( DueJobClaimRequest, ScheduledJobCreateRequest, ScheduledJobStatusUpdateRequest, ) class SchedulerApplicationService: def __init__( self, *, job_repository: ScheduledJobRepository, task_queue_publisher: TaskQueuePublisher | None = None, ) -> None: self.job_repository = job_repository self.task_queue_publisher = task_queue_publisher def create_job(self, payload: ScheduledJobCreateRequest) -> ScheduledJob: job = self.job_repository.create( tenant_id=payload.tenant_id, job_type=payload.job_type, name=payload.name, description=payload.description, target_service=payload.target_service, target_url=payload.target_url, method=payload.method, payload_json=payload.payload_json, schedule_time=payload.schedule_time, max_attempts=payload.max_attempts, metadata_json=payload.metadata_json, ) if self.task_queue_publisher is not None: self.task_queue_publisher.publish_scheduled_job( tenant_id=job.tenant_id, scheduled_job_id=job.id, ) return job def list_jobs( self, *, tenant_id: str, status: ScheduledJobStatus | None, job_type: ScheduledJobType | None, limit: int, ) -> list[ScheduledJob]: return self.job_repository.list_by_scope( tenant_id=tenant_id, status=status, job_type=job_type, limit=limit, ) def claim_due_jobs(self, payload: DueJobClaimRequest) -> list[ScheduledJob]: return self.job_repository.claim_due_jobs( tenant_id=payload.tenant_id, worker_key=payload.worker_key, lease_seconds=payload.lease_seconds, limit=payload.limit, now_time=payload.now_time or datetime.utcnow(), ) def update_job_status( self, *, job_id: str, payload: ScheduledJobStatusUpdateRequest, ) -> ScheduledJob | None: entity = self.job_repository.get_by_id(tenant_id=payload.tenant_id, job_id=job_id) if entity is None: return None return self.job_repository.update_status( job_id=job_id, status=payload.status, last_error_message=payload.last_error_message, )