routes.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. from core_domain import ScheduledJobStatus, ScheduledJobType, ServiceHealth
  2. from core_shared import error_detail, try_build_redis_client
  3. from core_shared.task_queue import TaskQueuePublisher
  4. from fastapi import APIRouter, Depends, HTTPException, Query, Request
  5. from sqlalchemy import text
  6. from sqlalchemy.orm import Session
  7. from app.application.services import SchedulerApplicationService
  8. from app.bootstrap.settings import SchedulerServiceSettings
  9. from app.db.session import get_db
  10. from app.domain.repositories import ScheduledJobRepository
  11. from app.schemas.scheduler import (
  12. DueJobClaimRequest,
  13. ScheduledJobCreateRequest,
  14. ScheduledJobListRequest,
  15. ScheduledJobResponse,
  16. ScheduledJobStatusPostRequest,
  17. ScheduledJobStatusUpdateRequest,
  18. )
  19. router = APIRouter()
  20. def get_scheduler_application_service(
  21. request: Request,
  22. db: Session = Depends(get_db)) -> SchedulerApplicationService:
  23. settings: SchedulerServiceSettings = request.app.state.settings
  24. redis_client = try_build_redis_client(settings.redis_url)
  25. return SchedulerApplicationService(
  26. job_repository=ScheduledJobRepository(db),
  27. task_queue_publisher=(
  28. TaskQueuePublisher(client=redis_client) if redis_client is not None else None
  29. ))
  30. @router.get("/health", response_model=ServiceHealth)
  31. def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
  32. db.execute(text("SELECT 1"))
  33. return ServiceHealth(service="scheduler-service", status="ok", database="ok")
  34. @router.post("/jobs", response_model=ScheduledJobResponse)
  35. def create_job(
  36. payload: ScheduledJobCreateRequest,
  37. service: SchedulerApplicationService = Depends(get_scheduler_application_service)) -> ScheduledJobResponse:
  38. return ScheduledJobResponse.from_entity(service.create_job(payload))
  39. @router.get("/jobs", response_model=list[ScheduledJobResponse])
  40. def list_jobs(
  41. status: ScheduledJobStatus | None = Query(default=None),
  42. job_type: ScheduledJobType | None = Query(default=None),
  43. limit: int = Query(default=100, ge=1, le=500),
  44. service: SchedulerApplicationService = Depends(get_scheduler_application_service)) -> list[ScheduledJobResponse]:
  45. return [
  46. ScheduledJobResponse.from_entity(item)
  47. for item in service.list_jobs(
  48. status=status,
  49. job_type=job_type,
  50. limit=limit)
  51. ]
  52. @router.post("/jobs/list", response_model=list[ScheduledJobResponse])
  53. def list_jobs_post(
  54. payload: ScheduledJobListRequest,
  55. service: SchedulerApplicationService = Depends(get_scheduler_application_service)) -> list[ScheduledJobResponse]:
  56. return [
  57. ScheduledJobResponse.from_entity(item)
  58. for item in service.list_jobs(
  59. status=payload.status,
  60. job_type=payload.job_type,
  61. limit=payload.limit)
  62. ]
  63. @router.post("/jobs/claim-due", response_model=list[ScheduledJobResponse])
  64. def claim_due_jobs(
  65. payload: DueJobClaimRequest,
  66. service: SchedulerApplicationService = Depends(get_scheduler_application_service)) -> list[ScheduledJobResponse]:
  67. return [
  68. ScheduledJobResponse.from_entity(item)
  69. for item in service.claim_due_jobs(payload)
  70. ]
  71. @router.post("/jobs/{job_id}/status", response_model=ScheduledJobResponse)
  72. def update_job_status(
  73. job_id: str,
  74. payload: ScheduledJobStatusUpdateRequest,
  75. service: SchedulerApplicationService = Depends(get_scheduler_application_service)) -> ScheduledJobResponse:
  76. entity = service.update_job_status(job_id=job_id, payload=payload)
  77. if entity is None:
  78. raise HTTPException(status_code=404, detail=error_detail("error.scheduled_job.not_found", id=job_id))
  79. return ScheduledJobResponse.from_entity(entity)
  80. @router.post("/jobs/status", response_model=ScheduledJobResponse)
  81. def update_job_status_post(
  82. payload: ScheduledJobStatusPostRequest,
  83. service: SchedulerApplicationService = Depends(get_scheduler_application_service)) -> ScheduledJobResponse:
  84. entity = service.update_job_status(
  85. job_id=payload.job_id,
  86. payload=ScheduledJobStatusUpdateRequest(
  87. status=payload.status,
  88. last_error_message=payload.last_error_message))
  89. if entity is None:
  90. raise HTTPException(status_code=404, detail=error_detail("error.scheduled_job.not_found", id=payload.job_id))
  91. return ScheduledJobResponse.from_entity(entity)