worker.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. from __future__ import annotations
  2. import os
  3. import socket
  4. import time
  5. import traceback
  6. from dataclasses import dataclass
  7. from datetime import datetime
  8. from uuid import uuid4
  9. import httpx
  10. from sqlalchemy.orm import Session, sessionmaker
  11. from core_shared import JSONValue
  12. from app.bootstrap.settings import SchedulerServiceSettings
  13. from app.db.models import ScheduledJob
  14. from app.db.session import build_session_factory
  15. from app.domain.repositories import ScheduledJobRepository
  16. @dataclass(frozen=True)
  17. class SchedulerWorkerStats:
  18. worker_key: str
  19. executed_count: int = 0
  20. idle_count: int = 0
  21. error_count: int = 0
  22. class ScheduledJobExecutor:
  23. def __init__(self, *, settings: SchedulerServiceSettings) -> None:
  24. self.settings = settings
  25. def execute(self, job: ScheduledJob) -> None:
  26. if job.job_type == "event":
  27. self._publish_event(job)
  28. return
  29. self._call_target(job)
  30. def _publish_event(self, job: ScheduledJob) -> None:
  31. payload = job.payload_json
  32. target_url = job.target_url or f"{self.settings.event_service_url.rstrip('/')}/events"
  33. event_payload: dict[str, JSONValue] = {
  34. "tenant_id": job.tenant_id,
  35. "event_type": str(payload.get("event_type", "scheduled.job.due")),
  36. "source_service": str(payload.get("source_service", "scheduler-service")),
  37. "aggregate_type": _optional_str(payload.get("aggregate_type")),
  38. "aggregate_id": _optional_str(payload.get("aggregate_id")),
  39. "correlation_id": _optional_str(payload.get("correlation_id")),
  40. "causation_id": str(payload.get("causation_id", job.id)),
  41. "payload_json": _dict_json(payload.get("payload_json"), fallback=payload),
  42. "metadata_json": job.metadata_json,
  43. }
  44. self._request(method=job.method or "POST", url=target_url, payload=event_payload)
  45. def _call_target(self, job: ScheduledJob) -> None:
  46. if not job.target_url:
  47. raise ValueError(f"scheduled job requires target_url for job_type={job.job_type}")
  48. self._request(
  49. method=job.method or "POST",
  50. url=job.target_url,
  51. payload=job.payload_json,
  52. )
  53. def _request(
  54. self,
  55. *,
  56. method: str,
  57. url: str,
  58. payload: dict[str, JSONValue],
  59. ) -> None:
  60. with httpx.Client(timeout=self.settings.worker_request_timeout_seconds) as client:
  61. response = client.request(method=method, url=url, json=payload)
  62. response.raise_for_status()
  63. class SchedulerWorker:
  64. def __init__(
  65. self,
  66. *,
  67. settings: SchedulerServiceSettings,
  68. session_factory: sessionmaker[Session],
  69. worker_key: str,
  70. ) -> None:
  71. self.settings = settings
  72. self.session_factory = session_factory
  73. self.worker_key = worker_key
  74. self.executor = ScheduledJobExecutor(settings=settings)
  75. def run_forever(self) -> SchedulerWorkerStats:
  76. executed_count = 0
  77. idle_count = 0
  78. error_count = 0
  79. while True:
  80. try:
  81. executed = self.run_once()
  82. except Exception:
  83. error_count += 1
  84. traceback.print_exc()
  85. executed = False
  86. if executed:
  87. executed_count += 1
  88. idle_count = 0
  89. else:
  90. idle_count += 1
  91. time.sleep(self.settings.worker_poll_interval_seconds)
  92. if self.settings.worker_max_idle_cycles is not None:
  93. if idle_count >= self.settings.worker_max_idle_cycles:
  94. return SchedulerWorkerStats(
  95. worker_key=self.worker_key,
  96. executed_count=executed_count,
  97. idle_count=idle_count,
  98. error_count=error_count,
  99. )
  100. def run_once(self) -> bool:
  101. db = self.session_factory()
  102. try:
  103. repository = ScheduledJobRepository(db)
  104. jobs = repository.claim_due_jobs(
  105. tenant_id=None,
  106. worker_key=self.worker_key,
  107. lease_seconds=self.settings.worker_lease_seconds,
  108. limit=self.settings.worker_claim_limit,
  109. now_time=datetime.utcnow(),
  110. )
  111. for job in jobs:
  112. self._execute_claimed_job(repository=repository, job=job)
  113. return bool(jobs)
  114. finally:
  115. db.close()
  116. def _execute_claimed_job(
  117. self,
  118. *,
  119. repository: ScheduledJobRepository,
  120. job: ScheduledJob,
  121. ) -> None:
  122. try:
  123. self.executor.execute(job)
  124. except Exception as exc:
  125. repository.update_status(
  126. job_id=job.id,
  127. status="failed",
  128. last_error_message=str(exc),
  129. )
  130. traceback.print_exc()
  131. return
  132. repository.update_status(job_id=job.id, status="completed")
  133. def _optional_str(value: JSONValue) -> str | None:
  134. if value is None:
  135. return None
  136. return str(value)
  137. def _dict_json(
  138. value: JSONValue,
  139. *,
  140. fallback: dict[str, JSONValue],
  141. ) -> dict[str, JSONValue]:
  142. if isinstance(value, dict):
  143. return value
  144. return fallback
  145. def build_worker_key() -> str:
  146. configured_key = os.getenv("AGENT_PLATFORM_WORKER_KEY")
  147. if configured_key:
  148. return configured_key
  149. hostname = socket.gethostname()
  150. return f"{hostname}-{uuid4().hex[:8]}"
  151. def main() -> None:
  152. settings = SchedulerServiceSettings()
  153. worker = SchedulerWorker(
  154. settings=settings,
  155. session_factory=build_session_factory(settings),
  156. worker_key=build_worker_key(),
  157. )
  158. stats = worker.run_forever()
  159. print(
  160. "scheduler-worker stopped "
  161. f"worker_key={stats.worker_key} "
  162. f"executed_count={stats.executed_count} "
  163. f"idle_count={stats.idle_count} "
  164. f"error_count={stats.error_count}",
  165. flush=True,
  166. )
  167. if __name__ == "__main__":
  168. main()