worker.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. from __future__ import annotations
  2. import os
  3. import socket
  4. import time
  5. import traceback
  6. from dataclasses import dataclass
  7. from datetime import datetime
  8. from uuid import uuid4
  9. import httpx
  10. from sqlalchemy.orm import Session, sessionmaker
  11. from core_shared import JSONValue, try_build_redis_client
  12. from app.bootstrap.settings import SchedulerServiceSettings
  13. from app.db.models import ScheduledJob
  14. from app.db.session import build_session_factory
  15. from app.domain.repositories import ScheduledJobRepository
  16. @dataclass(frozen=True)
  17. class SchedulerWorkerStats:
  18. worker_key: str
  19. executed_count: int = 0
  20. idle_count: int = 0
  21. error_count: int = 0
  22. class ScheduledJobExecutor:
  23. def __init__(self, *, settings: SchedulerServiceSettings) -> None:
  24. self.settings = settings
  25. def execute(self, job: ScheduledJob) -> None:
  26. if job.job_type == "event":
  27. self._publish_event(job)
  28. return
  29. self._call_target(job)
  30. def _publish_event(self, job: ScheduledJob) -> None:
  31. payload = job.payload_json
  32. target_url = job.target_url or f"{self.settings.event_service_url.rstrip('/')}/events"
  33. event_payload: dict[str, JSONValue] = {
  34. "tenant_id": job.tenant_id,
  35. "event_type": str(payload.get("event_type", "scheduled.job.due")),
  36. "source_service": str(payload.get("source_service", "scheduler-service")),
  37. "aggregate_type": _optional_str(payload.get("aggregate_type")),
  38. "aggregate_id": _optional_str(payload.get("aggregate_id")),
  39. "correlation_id": _optional_str(payload.get("correlation_id")),
  40. "causation_id": str(payload.get("causation_id", job.id)),
  41. "payload_json": _dict_json(payload.get("payload_json"), fallback=payload),
  42. "metadata_json": job.metadata_json,
  43. }
  44. self._request(method=job.method or "POST", url=target_url, payload=event_payload)
  45. def _call_target(self, job: ScheduledJob) -> None:
  46. if not job.target_url:
  47. raise ValueError(f"scheduled job requires target_url for job_type={job.job_type}")
  48. self._request(
  49. method=job.method or "POST",
  50. url=job.target_url,
  51. payload=job.payload_json,
  52. )
  53. def _request(
  54. self,
  55. *,
  56. method: str,
  57. url: str,
  58. payload: dict[str, JSONValue],
  59. ) -> None:
  60. with httpx.Client(timeout=self.settings.worker_request_timeout_seconds) as client:
  61. response = client.request(method=method, url=url, json=payload)
  62. response.raise_for_status()
  63. class SchedulerWorker:
  64. def __init__(
  65. self,
  66. *,
  67. settings: SchedulerServiceSettings,
  68. session_factory: sessionmaker[Session],
  69. worker_key: str,
  70. ) -> None:
  71. self.settings = settings
  72. self.session_factory = session_factory
  73. self.worker_key = worker_key
  74. self.executor = ScheduledJobExecutor(settings=settings)
  75. self.redis_client = try_build_redis_client(settings.redis_url)
  76. def run_forever(self) -> SchedulerWorkerStats:
  77. executed_count = 0
  78. idle_count = 0
  79. error_count = 0
  80. while True:
  81. try:
  82. executed = self.run_once()
  83. except Exception:
  84. error_count += 1
  85. traceback.print_exc()
  86. executed = False
  87. if executed:
  88. executed_count += 1
  89. idle_count = 0
  90. else:
  91. idle_count += 1
  92. time.sleep(self.settings.worker_poll_interval_seconds)
  93. if self.settings.worker_max_idle_cycles is not None:
  94. if idle_count >= self.settings.worker_max_idle_cycles:
  95. return SchedulerWorkerStats(
  96. worker_key=self.worker_key,
  97. executed_count=executed_count,
  98. idle_count=idle_count,
  99. error_count=error_count,
  100. )
  101. def run_once(self) -> bool:
  102. db = self.session_factory()
  103. try:
  104. repository = ScheduledJobRepository(db)
  105. jobs = repository.claim_due_jobs(
  106. tenant_id=None,
  107. worker_key=self.worker_key,
  108. lease_seconds=self.settings.worker_lease_seconds,
  109. limit=self.settings.worker_claim_limit,
  110. now_time=datetime.utcnow(),
  111. )
  112. for job in jobs:
  113. self._execute_claimed_job(repository=repository, job=job)
  114. return bool(jobs)
  115. finally:
  116. db.close()
  117. def _execute_claimed_job(
  118. self,
  119. *,
  120. repository: ScheduledJobRepository,
  121. job: ScheduledJob,
  122. ) -> None:
  123. lock = None
  124. idempotency_store = None
  125. if self.redis_client is not None:
  126. from core_shared.redis_primitives import DistributedLock, IdempotencyStore
  127. lock = DistributedLock(
  128. client=self.redis_client,
  129. name=f"scheduled-job:{job.id}:lock",
  130. ttl_seconds=self.settings.worker_lease_seconds,
  131. )
  132. if not lock.acquire():
  133. return
  134. idempotency_store = IdempotencyStore(
  135. client=self.redis_client,
  136. prefix="scheduled-job-idempotency",
  137. )
  138. if not idempotency_store.begin(key=job.id):
  139. lock.release()
  140. return
  141. try:
  142. self.executor.execute(job)
  143. except Exception as exc:
  144. repository.update_status(
  145. job_id=job.id,
  146. status="failed",
  147. last_error_message=str(exc),
  148. )
  149. traceback.print_exc()
  150. if lock is not None:
  151. lock.release()
  152. return
  153. if idempotency_store is not None:
  154. idempotency_store.complete(
  155. key=job.id,
  156. result={"status": "completed", "scheduled_job_id": job.id},
  157. )
  158. if lock is not None:
  159. lock.release()
  160. repository.update_status(job_id=job.id, status="completed")
  161. def _optional_str(value: JSONValue) -> str | None:
  162. if value is None:
  163. return None
  164. return str(value)
  165. def _dict_json(
  166. value: JSONValue,
  167. *,
  168. fallback: dict[str, JSONValue],
  169. ) -> dict[str, JSONValue]:
  170. if isinstance(value, dict):
  171. return value
  172. return fallback
  173. def build_worker_key() -> str:
  174. configured_key = os.getenv("AGENT_PLATFORM_WORKER_KEY")
  175. if configured_key:
  176. return configured_key
  177. hostname = socket.gethostname()
  178. return f"{hostname}-{uuid4().hex[:8]}"
  179. def main() -> None:
  180. settings = SchedulerServiceSettings()
  181. worker = SchedulerWorker(
  182. settings=settings,
  183. session_factory=build_session_factory(settings),
  184. worker_key=build_worker_key(),
  185. )
  186. stats = worker.run_forever()
  187. print(
  188. "scheduler-worker stopped "
  189. f"worker_key={stats.worker_key} "
  190. f"executed_count={stats.executed_count} "
  191. f"idle_count={stats.idle_count} "
  192. f"error_count={stats.error_count}",
  193. flush=True,
  194. )
  195. if __name__ == "__main__":
  196. main()