worker.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. from __future__ import annotations
  2. import os
  3. import socket
  4. import time
  5. import traceback
  6. from dataclasses import dataclass
  7. from datetime import datetime
  8. from math import ceil
  9. from uuid import uuid4
  10. import httpx
  11. from sqlalchemy.orm import Session, sessionmaker
  12. from core_shared import JSONValue, try_build_redis_client
  13. from core_shared.task_queue import SCHEDULED_JOB_QUEUE, build_task_queue_consumer
  14. from app.bootstrap.settings import SchedulerServiceSettings
  15. from app.db.models import ScheduledJob
  16. from app.db.session import build_session_factory
  17. from app.domain.repositories import ScheduledJobRepository
  @dataclass(frozen=True)
  class SchedulerWorkerStats:
      """Immutable snapshot of a worker's loop counters.

      Instances are created with keyword or positional values and cannot be
      mutated afterwards (``frozen=True``). All counters default to zero.
      """

      # Identifier of the worker these counters belong to.
      worker_key: str
      # Loop iterations that reported at least one executed job.
      executed_count: int = 0
      # Idle iterations counted at the time the snapshot was taken.
      idle_count: int = 0
      # Loop iterations that ended with an exception.
      error_count: int = 0
  class ScheduledJobExecutor:
      """Deliver a scheduled job over HTTP.

      Jobs whose ``job_type`` is ``"event"`` are wrapped in an event envelope
      and sent to the event endpoint; every other job type sends the job's raw
      ``payload_json`` to the job's own ``target_url``.
      """

      def __init__(self, *, settings: SchedulerServiceSettings) -> None:
          """Keep the service settings used for URLs and request timeouts."""
          self.settings = settings

      def execute(self, job: ScheduledJob) -> None:
          """Dispatch *job* to the handler that matches its ``job_type``.

          Raises:
              ValueError: a non-event job has no ``target_url``.
              Exception: whatever the HTTP client raises, including the error
                  raised by ``raise_for_status()`` for an error response.
          """
          handler = self._publish_event if job.job_type == "event" else self._call_target
          handler(job)

      def _publish_event(self, job: ScheduledJob) -> None:
          """Build the event envelope for *job* and send it."""
          body = job.payload_json
          if job.target_url:
              endpoint = job.target_url
          else:
              # No explicit target: fall back to the event service's /events route.
              base_url = self.settings.event_service_url.rstrip("/")
              endpoint = f"{base_url}/events"
          envelope: dict[str, JSONValue] = {
              "tenant_id": job.tenant_id,
              "event_type": str(body.get("event_type", "scheduled.job.due")),
              "source_service": str(body.get("source_service", "scheduler-service")),
              "aggregate_type": _optional_str(body.get("aggregate_type")),
              "aggregate_id": _optional_str(body.get("aggregate_id")),
              "correlation_id": _optional_str(body.get("correlation_id")),
              # The job itself is the cause unless the payload names another one.
              "causation_id": str(body.get("causation_id", job.id)),
              # A nested "payload_json" dict wins; otherwise the whole payload is sent.
              "payload_json": _dict_json(body.get("payload_json"), fallback=body),
              "metadata_json": job.metadata_json,
          }
          http_method = job.method or "POST"
          self._request(method=http_method, url=endpoint, payload=envelope)

      def _call_target(self, job: ScheduledJob) -> None:
          """Send the job's payload to its ``target_url``.

          Raises:
              ValueError: the job has no ``target_url``.
          """
          destination = job.target_url
          if not destination:
              raise ValueError(f"scheduled job requires target_url for job_type={job.job_type}")
          http_method = job.method or "POST"
          self._request(method=http_method, url=destination, payload=job.payload_json)

      def _request(
          self,
          *,
          method: str,
          url: str,
          payload: dict[str, JSONValue],
      ) -> None:
          """Send *payload* as a JSON body and raise on an error response."""
          timeout = self.settings.worker_request_timeout_seconds
          # A fresh client per request; the context manager closes it afterwards.
          with httpx.Client(timeout=timeout) as http:
              http.request(method=method, url=url, json=payload).raise_for_status()
  class SchedulerWorker:
      """Claim due scheduled jobs from the database and execute them in a loop.

      When a Redis client is available the worker additionally waits on a task
      queue between claims and guards each job with a distributed lock plus an
      idempotency record. Without Redis it falls back to plain polling.
      """

      def __init__(
          self,
          *,
          settings: SchedulerServiceSettings,
          session_factory: sessionmaker[Session],
          worker_key: str,
      ) -> None:
          """Wire up the executor, the Redis client and the task-queue consumer.

          Args:
              settings: service settings (poll interval, lease, limits, URLs).
              session_factory: factory producing one DB session per claim cycle.
              worker_key: identifier recorded when this worker claims jobs.
          """
          self.settings = settings
          self.session_factory = session_factory
          self.worker_key = worker_key
          self.executor = ScheduledJobExecutor(settings=settings)
          # Both values may be None; the rest of the class checks for that and
          # then skips the queue wait / lock / idempotency steps.
          self.redis_client = try_build_redis_client(settings.redis_url)
          self.task_queue = build_task_queue_consumer(
              client=self.redis_client,
              queue_name=SCHEDULED_JOB_QUEUE,
          )

      def run_forever(self) -> SchedulerWorkerStats:
          """Run claim cycles until the configured idle limit is reached.

          Exceptions from a cycle are counted, printed and treated as an idle
          cycle so that one failure does not stop the worker.

          Returns:
              The final counters. The method only returns when
              ``settings.worker_max_idle_cycles`` is not ``None``; otherwise it
              loops indefinitely.
          """
          executed_count = 0
          idle_count = 0
          error_count = 0
          while True:
              try:
                  executed = self.run_once()
              except Exception:
                  error_count += 1
                  traceback.print_exc()
                  executed = False
              if executed:
                  executed_count += 1
                  # idle_count tracks *consecutive* idle cycles.
                  idle_count = 0
              else:
                  idle_count += 1
                  if self.task_queue is None:
                      # No queue to block on: pace the loop by sleeping instead.
                      time.sleep(self.settings.worker_poll_interval_seconds)
              if self.settings.worker_max_idle_cycles is not None:
                  if idle_count >= self.settings.worker_max_idle_cycles:
                      return SchedulerWorkerStats(
                          worker_key=self.worker_key,
                          executed_count=executed_count,
                          idle_count=idle_count,
                          error_count=error_count,
                      )

      def run_once(self) -> bool:
          """Wait for a queue signal, claim due jobs and execute each of them.

          Returns:
              ``True`` when at least one job was claimed in this cycle.
          """
          self._wait_for_queue_signal()
          db = self.session_factory()
          try:
              repository = ScheduledJobRepository(db)
              jobs = repository.claim_due_jobs(
                  tenant_id=None,
                  worker_key=self.worker_key,
                  lease_seconds=self.settings.worker_lease_seconds,
                  limit=self.settings.worker_claim_limit,
                  # NOTE(review): naive UTC timestamp; datetime.utcnow() is
                  # deprecated since Python 3.12 - confirm what the repository
                  # expects before switching to an aware datetime.
                  now_time=datetime.utcnow(),
              )
              for job in jobs:
                  self._execute_claimed_job(repository=repository, job=job)
              return bool(jobs)
          finally:
              db.close()

      def _wait_for_queue_signal(self) -> None:
          """Block on the task queue for up to one poll interval.

          The dequeued value is discarded; the call only paces the loop. When
          there is no task queue the method returns immediately and the caller
          does the sleeping.
          """
          if self.task_queue is None:
              return
          try:
              self.task_queue.dequeue(
                  timeout_seconds=max(1, ceil(self.settings.worker_poll_interval_seconds)),
              )
          except Exception:
              # A failing dequeue returns instantly. Without a pause here the
              # worker loop would spin with no delay at all, because the caller
              # only sleeps when there is no task queue. Stay best-effort, but
              # report the error and wait one poll interval before the next claim.
              traceback.print_exc()
              time.sleep(self.settings.worker_poll_interval_seconds)

      def _execute_claimed_job(
          self,
          *,
          repository: ScheduledJobRepository,
          job: ScheduledJob,
      ) -> None:
          """Execute one claimed job and record the outcome.

          With Redis available the job is skipped when its lock cannot be
          acquired or when the idempotency store refuses to begin it. A failing
          execution is recorded as ``"failed"`` with the error text; a
          successful one as ``"completed"``.

          The lock is released in a ``finally`` block so that an exception from
          the idempotency store or from the repository cannot leave it held.
          """
          lock = None
          idempotency_store = None
          if self.redis_client is not None:
              from core_shared.redis_primitives import DistributedLock, IdempotencyStore

              lock = DistributedLock(
                  client=self.redis_client,
                  name=f"scheduled-job:{job.id}:lock",
                  ttl_seconds=self.settings.worker_lease_seconds,
              )
              if not lock.acquire():
                  return
          try:
              if lock is not None:
                  idempotency_store = IdempotencyStore(
                      client=self.redis_client,
                      prefix="scheduled-job-idempotency",
                  )
                  if not idempotency_store.begin(key=job.id):
                      return
              try:
                  self.executor.execute(job)
              except Exception as exc:
                  repository.update_status(
                      job_id=job.id,
                      status="failed",
                      last_error_message=str(exc),
                  )
                  traceback.print_exc()
                  # NOTE(review): the idempotency record is neither completed
                  # nor cleared here - confirm how IdempotencyStore treats a
                  # later retry of the same job id.
                  return
              if idempotency_store is not None:
                  idempotency_store.complete(
                      key=job.id,
                      result={"status": "completed", "scheduled_job_id": job.id},
                  )
          finally:
              if lock is not None:
                  lock.release()
          # Same order as before: the lock is released first, then the final
          # status is written.
          repository.update_status(job_id=job.id, status="completed")
  def _optional_str(value: JSONValue) -> str | None:
      """Return ``str(value)``, passing ``None`` through unchanged."""
      return None if value is None else str(value)
  def _dict_json(
      value: JSONValue,
      *,
      fallback: dict[str, JSONValue],
  ) -> dict[str, JSONValue]:
      """Return *value* when it is a dict, otherwise *fallback*.

      Neither object is copied; the caller gets back the same instance it
      passed in.
      """
      return value if isinstance(value, dict) else fallback
  def build_worker_key() -> str:
      """Return the identifier this worker process uses when claiming jobs.

      A non-empty ``AGENT_PLATFORM_WORKER_KEY`` environment variable wins.
      Otherwise the key is the host name followed by ``-`` and eight random
      hexadecimal characters.
      """
      env_key = os.environ.get("AGENT_PLATFORM_WORKER_KEY")
      if not env_key:
          host = socket.gethostname()
          random_suffix = uuid4().hex[:8]
          return f"{host}-{random_suffix}"
      return env_key
  def main() -> None:
      """Build a worker from the service settings, run it and print its counters."""
      settings = SchedulerServiceSettings()
      session_factory = build_session_factory(settings)
      worker_key = build_worker_key()
      worker = SchedulerWorker(
          settings=settings,
          session_factory=session_factory,
          worker_key=worker_key,
      )
      stats = worker.run_forever()
      summary = (
          "scheduler-worker stopped "
          f"worker_key={stats.worker_key} "
          f"executed_count={stats.executed_count} "
          f"idle_count={stats.idle_count} "
          f"error_count={stats.error_count}"
      )
      # Flush so the final line is written out before the process exits.
      print(summary, flush=True)
  # Start the worker only when this file is run as a script, not on import.
  if __name__ == "__main__":
      main()