| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- from fastapi import APIRouter, Depends, HTTPException, Query, Request
- from sqlalchemy import text
- from sqlalchemy.orm import Session
- from core_domain import ScheduledJobStatus, ScheduledJobType, ServiceHealth
- from core_shared import try_build_redis_client
- from core_shared.task_queue import TaskQueuePublisher
- from app.application.services import SchedulerApplicationService
- from app.bootstrap.settings import SchedulerServiceSettings
- from app.db.session import get_db
- from app.domain.repositories import ScheduledJobRepository
- from app.schemas.scheduler import (
- DueJobClaimRequest,
- ScheduledJobCreateRequest,
- ScheduledJobResponse,
- ScheduledJobStatusUpdateRequest,
- )
- router = APIRouter()
- def get_scheduler_application_service(
- request: Request,
- db: Session = Depends(get_db),
- ) -> SchedulerApplicationService:
- settings: SchedulerServiceSettings = request.app.state.settings
- redis_client = try_build_redis_client(settings.redis_url)
- return SchedulerApplicationService(
- job_repository=ScheduledJobRepository(db),
- task_queue_publisher=(
- TaskQueuePublisher(client=redis_client) if redis_client is not None else None
- ),
- )
- @router.get("/health", response_model=ServiceHealth)
- def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
- db.execute(text("SELECT 1"))
- return ServiceHealth(service="scheduler-service", status="ok", database="ok")
- @router.post("/jobs", response_model=ScheduledJobResponse)
- def create_job(
- payload: ScheduledJobCreateRequest,
- service: SchedulerApplicationService = Depends(get_scheduler_application_service),
- ) -> ScheduledJobResponse:
- return ScheduledJobResponse.from_entity(service.create_job(payload))
- @router.get("/jobs", response_model=list[ScheduledJobResponse])
- def list_jobs(
- tenant_id: str = Query(...),
- status: ScheduledJobStatus | None = Query(default=None),
- job_type: ScheduledJobType | None = Query(default=None),
- limit: int = Query(default=100, ge=1, le=500),
- service: SchedulerApplicationService = Depends(get_scheduler_application_service),
- ) -> list[ScheduledJobResponse]:
- return [
- ScheduledJobResponse.from_entity(item)
- for item in service.list_jobs(
- tenant_id=tenant_id,
- status=status,
- job_type=job_type,
- limit=limit,
- )
- ]
- @router.post("/jobs/claim-due", response_model=list[ScheduledJobResponse])
- def claim_due_jobs(
- payload: DueJobClaimRequest,
- service: SchedulerApplicationService = Depends(get_scheduler_application_service),
- ) -> list[ScheduledJobResponse]:
- return [
- ScheduledJobResponse.from_entity(item)
- for item in service.claim_due_jobs(payload)
- ]
- @router.post("/jobs/{job_id}/status", response_model=ScheduledJobResponse)
- def update_job_status(
- job_id: str,
- payload: ScheduledJobStatusUpdateRequest,
- service: SchedulerApplicationService = Depends(get_scheduler_application_service),
- ) -> ScheduledJobResponse:
- entity = service.update_job_status(job_id=job_id, payload=payload)
- if entity is None:
- raise HTTPException(status_code=404, detail=f"scheduled job not found: {job_id}")
- return ScheduledJobResponse.from_entity(entity)
|