services.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. from datetime import datetime
  2. from core_domain import ScheduledJobStatus, ScheduledJobType
  3. from core_shared.task_queue import TaskQueuePublisher
  4. from app.db.models import ScheduledJob
  5. from app.domain.repositories import ScheduledJobRepository
  6. from app.schemas.scheduler import (
  7. DueJobClaimRequest,
  8. ScheduledJobCreateRequest,
  9. ScheduledJobStatusUpdateRequest,
  10. )
  11. class SchedulerApplicationService:
  12. def __init__(
  13. self,
  14. *,
  15. job_repository: ScheduledJobRepository,
  16. task_queue_publisher: TaskQueuePublisher | None = None) -> None:
  17. self.job_repository = job_repository
  18. self.task_queue_publisher = task_queue_publisher
  19. def create_job(self, payload: ScheduledJobCreateRequest) -> ScheduledJob:
  20. job = self.job_repository.create(
  21. job_type=payload.job_type,
  22. name=payload.name,
  23. description=payload.description,
  24. target_service=payload.target_service,
  25. target_url=payload.target_url,
  26. method=payload.method,
  27. payload_json=payload.payload_json,
  28. schedule_time=payload.schedule_time,
  29. max_attempts=payload.max_attempts,
  30. metadata_json=payload.metadata_json)
  31. if self.task_queue_publisher is not None:
  32. self.task_queue_publisher.publish_scheduled_job(
  33. scheduled_job_id=job.id)
  34. return job
  35. def list_jobs(
  36. self,
  37. *,
  38. status: ScheduledJobStatus | None,
  39. job_type: ScheduledJobType | None,
  40. limit: int) -> list[ScheduledJob]:
  41. return self.job_repository.list_by_scope(
  42. status=status,
  43. job_type=job_type,
  44. limit=limit)
  45. def claim_due_jobs(self, payload: DueJobClaimRequest) -> list[ScheduledJob]:
  46. return self.job_repository.claim_due_jobs(
  47. worker_key=payload.worker_key,
  48. lease_seconds=payload.lease_seconds,
  49. limit=payload.limit,
  50. now_time=payload.now_time or datetime.utcnow())
  51. def update_job_status(
  52. self,
  53. *,
  54. job_id: str,
  55. payload: ScheduledJobStatusUpdateRequest) -> ScheduledJob | None:
  56. entity = self.job_repository.get_by_id(job_id=job_id)
  57. if entity is None:
  58. return None
  59. return self.job_repository.update_status(
  60. job_id=job_id,
  61. status=payload.status,
  62. last_error_message=payload.last_error_message)