services.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
from datetime import datetime, timezone

from core_domain import ScheduledJobStatus, ScheduledJobType

from app.db.models import ScheduledJob
from app.domain.repositories import ScheduledJobRepository
from app.schemas.scheduler import (
    DueJobClaimRequest,
    ScheduledJobCreateRequest,
    ScheduledJobStatusUpdateRequest,
)
  10. class SchedulerApplicationService:
  11. def __init__(self, *, job_repository: ScheduledJobRepository) -> None:
  12. self.job_repository = job_repository
  13. def create_job(self, payload: ScheduledJobCreateRequest) -> ScheduledJob:
  14. return self.job_repository.create(
  15. tenant_id=payload.tenant_id,
  16. job_type=payload.job_type,
  17. name=payload.name,
  18. description=payload.description,
  19. target_service=payload.target_service,
  20. target_url=payload.target_url,
  21. method=payload.method,
  22. payload_json=payload.payload_json,
  23. schedule_time=payload.schedule_time,
  24. max_attempts=payload.max_attempts,
  25. metadata_json=payload.metadata_json,
  26. )
  27. def list_jobs(
  28. self,
  29. *,
  30. tenant_id: str,
  31. status: ScheduledJobStatus | None,
  32. job_type: ScheduledJobType | None,
  33. limit: int,
  34. ) -> list[ScheduledJob]:
  35. return self.job_repository.list_by_scope(
  36. tenant_id=tenant_id,
  37. status=status,
  38. job_type=job_type,
  39. limit=limit,
  40. )
  41. def claim_due_jobs(self, payload: DueJobClaimRequest) -> list[ScheduledJob]:
  42. return self.job_repository.claim_due_jobs(
  43. tenant_id=payload.tenant_id,
  44. worker_key=payload.worker_key,
  45. lease_seconds=payload.lease_seconds,
  46. limit=payload.limit,
  47. now_time=payload.now_time or datetime.utcnow(),
  48. )
  49. def update_job_status(
  50. self,
  51. *,
  52. job_id: str,
  53. payload: ScheduledJobStatusUpdateRequest,
  54. ) -> ScheduledJob | None:
  55. entity = self.job_repository.get_by_id(tenant_id=payload.tenant_id, job_id=job_id)
  56. if entity is None:
  57. return None
  58. return self.job_repository.update_status(
  59. job_id=job_id,
  60. status=payload.status,
  61. last_error_message=payload.last_error_message,
  62. )