"""HTTP routes of the scheduler service.

Every handler delegates to ``SchedulerApplicationService`` and converts the
entities it returns with ``ScheduledJobResponse.from_entity``.

Route handlers are documented with ``#`` comments instead of docstrings:
FastAPI publishes a handler's docstring as the OpenAPI operation
description, so comments keep the generated schema unchanged.
"""

from core_domain import ScheduledJobStatus, ScheduledJobType, ServiceHealth
from core_shared import error_detail, try_build_redis_client
from core_shared.task_queue import TaskQueuePublisher
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from sqlalchemy import text
from sqlalchemy.orm import Session

from app.application.services import SchedulerApplicationService
from app.bootstrap.settings import SchedulerServiceSettings
from app.db.session import get_db
from app.domain.repositories import ScheduledJobRepository
from app.schemas.scheduler import (
    DueJobClaimRequest,
    ScheduledJobCreateRequest,
    ScheduledJobListRequest,
    ScheduledJobResponse,
    ScheduledJobStatusPostRequest,
    ScheduledJobStatusUpdateRequest,
)

router = APIRouter()


def _serialize_jobs(entities) -> list[ScheduledJobResponse]:
    """Convert each item of *entities* via ``ScheduledJobResponse.from_entity``."""
    return list(map(ScheduledJobResponse.from_entity, entities))


def _respond_or_404(entity, job_id) -> ScheduledJobResponse:
    """Return the response for *entity*, or raise a 404 when it is ``None``.

    Raises:
        HTTPException: status 404 with the ``error.scheduled_job.not_found``
            detail, carrying *job_id* as ``id``.
    """
    if entity is not None:
        return ScheduledJobResponse.from_entity(entity)
    raise HTTPException(
        status_code=404,
        detail=error_detail("error.scheduled_job.not_found", id=job_id),
    )


def get_scheduler_application_service(
    request: Request,
    db: Session = Depends(get_db),
) -> SchedulerApplicationService:
    """Build the application service used by the route handlers.

    Settings are read from ``request.app.state.settings``. A
    ``TaskQueuePublisher`` is attached only when ``try_build_redis_client``
    returns a client for ``settings.redis_url``; otherwise the service
    receives ``None`` as its publisher.
    """
    settings: SchedulerServiceSettings = request.app.state.settings
    redis_client = try_build_redis_client(settings.redis_url)
    repository = ScheduledJobRepository(db)
    if redis_client is None:
        publisher = None
    else:
        publisher = TaskQueuePublisher(client=redis_client)
    return SchedulerApplicationService(
        job_repository=repository,
        task_queue_publisher=publisher,
    )


# Health probe: runs ``SELECT 1`` on the injected session and reports "ok".
# There is no error handling here, so a failing query propagates as an
# exception rather than producing a degraded ServiceHealth payload.
@router.get("/health", response_model=ServiceHealth)
def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
    probe = text("SELECT 1")
    db.execute(probe)
    return ServiceHealth(
        service="scheduler-service",
        status="ok",
        database="ok",
    )


# Creates a job from the request body via ``service.create_job``.
@router.post("/jobs", response_model=ScheduledJobResponse)
def create_job(
    payload: ScheduledJobCreateRequest,
    service: SchedulerApplicationService = Depends(
        get_scheduler_application_service
    ),
) -> ScheduledJobResponse:
    created = service.create_job(payload)
    return ScheduledJobResponse.from_entity(created)


# Lists jobs using optional query-string filters; ``limit`` is validated to
# the range 1..500 and defaults to 100.
@router.get("/jobs", response_model=list[ScheduledJobResponse])
def list_jobs(
    status: ScheduledJobStatus | None = Query(default=None),
    job_type: ScheduledJobType | None = Query(default=None),
    limit: int = Query(default=100, ge=1, le=500),
    service: SchedulerApplicationService = Depends(
        get_scheduler_application_service
    ),
) -> list[ScheduledJobResponse]:
    jobs = service.list_jobs(status=status, job_type=job_type, limit=limit)
    return _serialize_jobs(jobs)


# POST variant of the listing endpoint: the same filters arrive in the body.
@router.post("/jobs/list", response_model=list[ScheduledJobResponse])
def list_jobs_post(
    payload: ScheduledJobListRequest,
    service: SchedulerApplicationService = Depends(
        get_scheduler_application_service
    ),
) -> list[ScheduledJobResponse]:
    jobs = service.list_jobs(
        status=payload.status,
        job_type=payload.job_type,
        limit=payload.limit,
    )
    return _serialize_jobs(jobs)


# Forwards the claim request to ``service.claim_due_jobs`` and returns the
# jobs it yields. The claiming rules live in the service, not in this module.
@router.post("/jobs/claim-due", response_model=list[ScheduledJobResponse])
def claim_due_jobs(
    payload: DueJobClaimRequest,
    service: SchedulerApplicationService = Depends(
        get_scheduler_application_service
    ),
) -> list[ScheduledJobResponse]:
    claimed = service.claim_due_jobs(payload)
    return _serialize_jobs(claimed)


# Updates the status of the job named in the path; responds with 404 when
# the service returns ``None`` for that id.
@router.post("/jobs/{job_id}/status", response_model=ScheduledJobResponse)
def update_job_status(
    job_id: str,
    payload: ScheduledJobStatusUpdateRequest,
    service: SchedulerApplicationService = Depends(
        get_scheduler_application_service
    ),
) -> ScheduledJobResponse:
    updated = service.update_job_status(job_id=job_id, payload=payload)
    return _respond_or_404(updated, job_id)


# Body-only variant of the status update: the job id travels in the payload
# and the remaining fields are repackaged as a ScheduledJobStatusUpdateRequest.
@router.post("/jobs/status", response_model=ScheduledJobResponse)
def update_job_status_post(
    payload: ScheduledJobStatusPostRequest,
    service: SchedulerApplicationService = Depends(
        get_scheduler_application_service
    ),
) -> ScheduledJobResponse:
    status_update = ScheduledJobStatusUpdateRequest(
        status=payload.status,
        last_error_message=payload.last_error_message,
    )
    updated = service.update_job_status(
        job_id=payload.job_id,
        payload=status_update,
    )
    return _respond_or_404(updated, payload.job_id)