routes.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. from fastapi import APIRouter, Depends, HTTPException, Query
  2. from sqlalchemy import text
  3. from sqlalchemy.orm import Session
  4. from core_domain import ScheduledJobStatus, ScheduledJobType, ServiceHealth
  5. from app.application.services import SchedulerApplicationService
  6. from app.db.session import get_db
  7. from app.domain.repositories import ScheduledJobRepository
  8. from app.schemas.scheduler import (
  9. DueJobClaimRequest,
  10. ScheduledJobCreateRequest,
  11. ScheduledJobResponse,
  12. ScheduledJobStatusUpdateRequest,
  13. )
  14. router = APIRouter()
  15. def get_scheduler_application_service(
  16. db: Session = Depends(get_db),
  17. ) -> SchedulerApplicationService:
  18. return SchedulerApplicationService(job_repository=ScheduledJobRepository(db))
  19. @router.get("/health", response_model=ServiceHealth)
  20. def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
  21. db.execute(text("SELECT 1"))
  22. return ServiceHealth(service="scheduler-service", status="ok", database="ok")
  23. @router.post("/jobs", response_model=ScheduledJobResponse)
  24. def create_job(
  25. payload: ScheduledJobCreateRequest,
  26. service: SchedulerApplicationService = Depends(get_scheduler_application_service),
  27. ) -> ScheduledJobResponse:
  28. return ScheduledJobResponse.from_entity(service.create_job(payload))
  29. @router.get("/jobs", response_model=list[ScheduledJobResponse])
  30. def list_jobs(
  31. tenant_id: str = Query(...),
  32. status: ScheduledJobStatus | None = Query(default=None),
  33. job_type: ScheduledJobType | None = Query(default=None),
  34. limit: int = Query(default=100, ge=1, le=500),
  35. service: SchedulerApplicationService = Depends(get_scheduler_application_service),
  36. ) -> list[ScheduledJobResponse]:
  37. return [
  38. ScheduledJobResponse.from_entity(item)
  39. for item in service.list_jobs(
  40. tenant_id=tenant_id,
  41. status=status,
  42. job_type=job_type,
  43. limit=limit,
  44. )
  45. ]
  46. @router.post("/jobs/claim-due", response_model=list[ScheduledJobResponse])
  47. def claim_due_jobs(
  48. payload: DueJobClaimRequest,
  49. service: SchedulerApplicationService = Depends(get_scheduler_application_service),
  50. ) -> list[ScheduledJobResponse]:
  51. return [
  52. ScheduledJobResponse.from_entity(item)
  53. for item in service.claim_due_jobs(payload)
  54. ]
  55. @router.post("/jobs/{job_id}/status", response_model=ScheduledJobResponse)
  56. def update_job_status(
  57. job_id: str,
  58. payload: ScheduledJobStatusUpdateRequest,
  59. service: SchedulerApplicationService = Depends(get_scheduler_application_service),
  60. ) -> ScheduledJobResponse:
  61. entity = service.update_job_status(job_id=job_id, payload=payload)
  62. if entity is None:
  63. raise HTTPException(status_code=404, detail=f"scheduled job not found: {job_id}")
  64. return ScheduledJobResponse.from_entity(entity)