services.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. from datetime import datetime
  2. from core_domain import ScheduledJobStatus, ScheduledJobType
  3. from core_shared.task_queue import TaskQueuePublisher
  4. from app.db.models import ScheduledJob
  5. from app.domain.repositories import ScheduledJobRepository
  6. from app.schemas.scheduler import (
  7. DueJobClaimRequest,
  8. ScheduledJobCreateRequest,
  9. ScheduledJobStatusUpdateRequest,
  10. )
  11. class SchedulerApplicationService:
  12. def __init__(
  13. self,
  14. *,
  15. job_repository: ScheduledJobRepository,
  16. task_queue_publisher: TaskQueuePublisher | None = None,
  17. ) -> None:
  18. self.job_repository = job_repository
  19. self.task_queue_publisher = task_queue_publisher
  20. def create_job(self, payload: ScheduledJobCreateRequest) -> ScheduledJob:
  21. job = self.job_repository.create(
  22. tenant_id=payload.tenant_id,
  23. job_type=payload.job_type,
  24. name=payload.name,
  25. description=payload.description,
  26. target_service=payload.target_service,
  27. target_url=payload.target_url,
  28. method=payload.method,
  29. payload_json=payload.payload_json,
  30. schedule_time=payload.schedule_time,
  31. max_attempts=payload.max_attempts,
  32. metadata_json=payload.metadata_json,
  33. )
  34. if self.task_queue_publisher is not None:
  35. self.task_queue_publisher.publish_scheduled_job(
  36. tenant_id=job.tenant_id,
  37. scheduled_job_id=job.id,
  38. )
  39. return job
  40. def list_jobs(
  41. self,
  42. *,
  43. tenant_id: str,
  44. status: ScheduledJobStatus | None,
  45. job_type: ScheduledJobType | None,
  46. limit: int,
  47. ) -> list[ScheduledJob]:
  48. return self.job_repository.list_by_scope(
  49. tenant_id=tenant_id,
  50. status=status,
  51. job_type=job_type,
  52. limit=limit,
  53. )
  54. def claim_due_jobs(self, payload: DueJobClaimRequest) -> list[ScheduledJob]:
  55. return self.job_repository.claim_due_jobs(
  56. tenant_id=payload.tenant_id,
  57. worker_key=payload.worker_key,
  58. lease_seconds=payload.lease_seconds,
  59. limit=payload.limit,
  60. now_time=payload.now_time or datetime.utcnow(),
  61. )
  62. def update_job_status(
  63. self,
  64. *,
  65. job_id: str,
  66. payload: ScheduledJobStatusUpdateRequest,
  67. ) -> ScheduledJob | None:
  68. entity = self.job_repository.get_by_id(tenant_id=payload.tenant_id, job_id=job_id)
  69. if entity is None:
  70. return None
  71. return self.job_repository.update_status(
  72. job_id=job_id,
  73. status=payload.status,
  74. last_error_message=payload.last_error_message,
  75. )