routes.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. from fastapi import APIRouter, Depends, HTTPException, Query, Request
  2. from sqlalchemy import text
  3. from sqlalchemy.orm import Session
  4. from core_domain import ScheduledJobStatus, ScheduledJobType, ServiceHealth
  5. from core_shared import try_build_redis_client
  6. from core_shared.task_queue import TaskQueuePublisher
  7. from app.application.services import SchedulerApplicationService
  8. from app.bootstrap.settings import SchedulerServiceSettings
  9. from app.db.session import get_db
  10. from app.domain.repositories import ScheduledJobRepository
  11. from app.schemas.scheduler import (
  12. DueJobClaimRequest,
  13. ScheduledJobCreateRequest,
  14. ScheduledJobResponse,
  15. ScheduledJobStatusUpdateRequest,
  16. )
  17. router = APIRouter()
  18. def get_scheduler_application_service(
  19. request: Request,
  20. db: Session = Depends(get_db),
  21. ) -> SchedulerApplicationService:
  22. settings: SchedulerServiceSettings = request.app.state.settings
  23. redis_client = try_build_redis_client(settings.redis_url)
  24. return SchedulerApplicationService(
  25. job_repository=ScheduledJobRepository(db),
  26. task_queue_publisher=(
  27. TaskQueuePublisher(client=redis_client) if redis_client is not None else None
  28. ),
  29. )
  30. @router.get("/health", response_model=ServiceHealth)
  31. def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
  32. db.execute(text("SELECT 1"))
  33. return ServiceHealth(service="scheduler-service", status="ok", database="ok")
  34. @router.post("/jobs", response_model=ScheduledJobResponse)
  35. def create_job(
  36. payload: ScheduledJobCreateRequest,
  37. service: SchedulerApplicationService = Depends(get_scheduler_application_service),
  38. ) -> ScheduledJobResponse:
  39. return ScheduledJobResponse.from_entity(service.create_job(payload))
  40. @router.get("/jobs", response_model=list[ScheduledJobResponse])
  41. def list_jobs(
  42. tenant_id: str = Query(...),
  43. status: ScheduledJobStatus | None = Query(default=None),
  44. job_type: ScheduledJobType | None = Query(default=None),
  45. limit: int = Query(default=100, ge=1, le=500),
  46. service: SchedulerApplicationService = Depends(get_scheduler_application_service),
  47. ) -> list[ScheduledJobResponse]:
  48. return [
  49. ScheduledJobResponse.from_entity(item)
  50. for item in service.list_jobs(
  51. tenant_id=tenant_id,
  52. status=status,
  53. job_type=job_type,
  54. limit=limit,
  55. )
  56. ]
  57. @router.post("/jobs/claim-due", response_model=list[ScheduledJobResponse])
  58. def claim_due_jobs(
  59. payload: DueJobClaimRequest,
  60. service: SchedulerApplicationService = Depends(get_scheduler_application_service),
  61. ) -> list[ScheduledJobResponse]:
  62. return [
  63. ScheduledJobResponse.from_entity(item)
  64. for item in service.claim_due_jobs(payload)
  65. ]
  66. @router.post("/jobs/{job_id}/status", response_model=ScheduledJobResponse)
  67. def update_job_status(
  68. job_id: str,
  69. payload: ScheduledJobStatusUpdateRequest,
  70. service: SchedulerApplicationService = Depends(get_scheduler_application_service),
  71. ) -> ScheduledJobResponse:
  72. entity = service.update_job_status(job_id=job_id, payload=payload)
  73. if entity is None:
  74. raise HTTPException(status_code=404, detail=f"scheduled job not found: {job_id}")
  75. return ScheduledJobResponse.from_entity(entity)