"""HTTP routes for the scheduler service: health check and scheduled-job endpoints."""

from fastapi import APIRouter, Depends, HTTPException, Query, Request
from sqlalchemy import text
from sqlalchemy.orm import Session

from core_domain import ScheduledJobStatus, ScheduledJobType, ServiceHealth
from core_shared import try_build_redis_client
from core_shared.task_queue import TaskQueuePublisher

from app.application.services import SchedulerApplicationService
from app.bootstrap.settings import SchedulerServiceSettings
from app.db.session import get_db
from app.domain.repositories import ScheduledJobRepository
from app.schemas.scheduler import (
    DueJobClaimRequest,
    ScheduledJobCreateRequest,
    ScheduledJobResponse,
    ScheduledJobStatusUpdateRequest,
)

router = APIRouter()


def get_scheduler_application_service(
    request: Request,
    db: Session = Depends(get_db),
) -> SchedulerApplicationService:
    """Build the application service used by the job endpoints.

    Reads the settings object from ``request.app.state.settings`` and wires a
    repository around the injected database session. A task-queue publisher
    is attached only when ``try_build_redis_client`` returns a client;
    otherwise the service receives ``None`` for it.
    """
    settings: SchedulerServiceSettings = request.app.state.settings
    # NOTE(review): a client is requested on every dependency resolution;
    # confirm try_build_redis_client reuses/pools connections.
    redis_client = try_build_redis_client(settings.redis_url)

    repository = ScheduledJobRepository(db)
    publisher = None
    if redis_client is not None:
        publisher = TaskQueuePublisher(client=redis_client)

    return SchedulerApplicationService(
        job_repository=repository,
        task_queue_publisher=publisher,
    )


@router.get("/health", response_model=ServiceHealth)
def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
    """Issue ``SELECT 1`` against the database and report an "ok" status.

    Any exception raised by the query is not handled here and propagates to
    the framework.
    """
    probe = text("SELECT 1")
    db.execute(probe)
    return ServiceHealth(service="scheduler-service", status="ok", database="ok")


@router.post("/jobs", response_model=ScheduledJobResponse)
def create_job(
    payload: ScheduledJobCreateRequest,
    service: SchedulerApplicationService = Depends(get_scheduler_application_service),
) -> ScheduledJobResponse:
    """Create a job via the application service and return it as a response model."""
    created = service.create_job(payload)
    return ScheduledJobResponse.from_entity(created)


@router.get("/jobs", response_model=list[ScheduledJobResponse])
def list_jobs(
    tenant_id: str = Query(...),
    status: ScheduledJobStatus | None = Query(default=None),
    job_type: ScheduledJobType | None = Query(default=None),
    limit: int = Query(default=100, ge=1, le=500),
    service: SchedulerApplicationService = Depends(get_scheduler_application_service),
) -> list[ScheduledJobResponse]:
    """List jobs for a tenant, forwarding the optional filters to the service.

    ``tenant_id`` is a required query parameter; ``status`` and ``job_type``
    default to ``None``; ``limit`` defaults to 100 and is validated to lie in
    the range 1..500.
    """
    jobs = service.list_jobs(
        tenant_id=tenant_id,
        status=status,
        job_type=job_type,
        limit=limit,
    )
    return list(map(ScheduledJobResponse.from_entity, jobs))


@router.post("/jobs/claim-due", response_model=list[ScheduledJobResponse])
def claim_due_jobs(
    payload: DueJobClaimRequest,
    service: SchedulerApplicationService = Depends(get_scheduler_application_service),
) -> list[ScheduledJobResponse]:
    """Pass the claim request to the service and return the resulting jobs."""
    claimed = service.claim_due_jobs(payload)
    return list(map(ScheduledJobResponse.from_entity, claimed))


@router.post("/jobs/{job_id}/status", response_model=ScheduledJobResponse)
def update_job_status(
    job_id: str,
    payload: ScheduledJobStatusUpdateRequest,
    service: SchedulerApplicationService = Depends(get_scheduler_application_service),
) -> ScheduledJobResponse:
    """Apply a status update to the job identified by ``job_id``.

    Raises:
        HTTPException: with status 404 when the service returns ``None``
            for the given ``job_id``.
    """
    updated = service.update_job_status(job_id=job_id, payload=payload)
    if updated is not None:
        return ScheduledJobResponse.from_entity(updated)
    raise HTTPException(status_code=404, detail=f"scheduled job not found: {job_id}")