| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- from fastapi import APIRouter, Depends, HTTPException, Query
- from sqlalchemy import text
- from sqlalchemy.orm import Session
- from core_domain import ScheduledJobStatus, ScheduledJobType, ServiceHealth
- from app.application.services import SchedulerApplicationService
- from app.db.session import get_db
- from app.domain.repositories import ScheduledJobRepository
- from app.schemas.scheduler import (
- DueJobClaimRequest,
- ScheduledJobCreateRequest,
- ScheduledJobResponse,
- ScheduledJobStatusUpdateRequest,
- )
- router = APIRouter()
- def get_scheduler_application_service(
- db: Session = Depends(get_db),
- ) -> SchedulerApplicationService:
- return SchedulerApplicationService(job_repository=ScheduledJobRepository(db))
- @router.get("/health", response_model=ServiceHealth)
- def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
- db.execute(text("SELECT 1"))
- return ServiceHealth(service="scheduler-service", status="ok", database="ok")
- @router.post("/jobs", response_model=ScheduledJobResponse)
- def create_job(
- payload: ScheduledJobCreateRequest,
- service: SchedulerApplicationService = Depends(get_scheduler_application_service),
- ) -> ScheduledJobResponse:
- return ScheduledJobResponse.from_entity(service.create_job(payload))
- @router.get("/jobs", response_model=list[ScheduledJobResponse])
- def list_jobs(
- tenant_id: str = Query(...),
- status: ScheduledJobStatus | None = Query(default=None),
- job_type: ScheduledJobType | None = Query(default=None),
- limit: int = Query(default=100, ge=1, le=500),
- service: SchedulerApplicationService = Depends(get_scheduler_application_service),
- ) -> list[ScheduledJobResponse]:
- return [
- ScheduledJobResponse.from_entity(item)
- for item in service.list_jobs(
- tenant_id=tenant_id,
- status=status,
- job_type=job_type,
- limit=limit,
- )
- ]
- @router.post("/jobs/claim-due", response_model=list[ScheduledJobResponse])
- def claim_due_jobs(
- payload: DueJobClaimRequest,
- service: SchedulerApplicationService = Depends(get_scheduler_application_service),
- ) -> list[ScheduledJobResponse]:
- return [
- ScheduledJobResponse.from_entity(item)
- for item in service.claim_due_jobs(payload)
- ]
- @router.post("/jobs/{job_id}/status", response_model=ScheduledJobResponse)
- def update_job_status(
- job_id: str,
- payload: ScheduledJobStatusUpdateRequest,
- service: SchedulerApplicationService = Depends(get_scheduler_application_service),
- ) -> ScheduledJobResponse:
- entity = service.update_job_status(job_id=job_id, payload=payload)
- if entity is None:
- raise HTTPException(status_code=404, detail=f"scheduled job not found: {job_id}")
- return ScheduledJobResponse.from_entity(entity)
|