"""Scheduler worker: claims due ``ScheduledJob`` rows and executes them over HTTP.

Run this module as a script to start a worker loop via :func:`main`.
"""

from __future__ import annotations

import os
import socket
import time
import traceback
from dataclasses import dataclass
from datetime import datetime
from math import ceil
from typing import TYPE_CHECKING
from uuid import uuid4

import httpx
from core_shared import JSONValue, try_build_redis_client
from core_shared.task_queue import SCHEDULED_JOB_QUEUE, build_task_queue_consumer
from sqlalchemy.orm import Session, sessionmaker

from app.bootstrap.settings import SchedulerServiceSettings
from app.db.models import ScheduledJob
from app.db.session import build_session_factory
from app.domain.repositories import ScheduledJobRepository

if TYPE_CHECKING:
    # Type-only import; at runtime the Redis primitives are imported lazily
    # inside SchedulerWorker._execute_claimed_job.
    from core_shared.redis_primitives import IdempotencyStore


@dataclass(frozen=True)
class SchedulerWorkerStats:
    """Counters reported by ``SchedulerWorker.run_forever`` when it stops."""

    # Identifier of the worker that produced these stats.
    worker_key: str
    # Number of loop cycles in which run_once() claimed at least one job.
    executed_count: int = 0
    # Consecutive non-productive cycles (idle or errored) when the loop stopped.
    idle_count: int = 0
    # Number of loop cycles whose run_once() raised.
    error_count: int = 0


class ScheduledJobExecutor:
    """Performs the HTTP side effect of a single scheduled job."""

    def __init__(self, *, settings: SchedulerServiceSettings) -> None:
        self.settings = settings

    def execute(self, job: ScheduledJob) -> None:
        """Execute ``job``.

        Jobs with ``job_type == "event"`` are published as an event; every
        other job type is sent to its own ``target_url``.

        Raises:
            ValueError: a non-event job has no ``target_url``.
            httpx.HTTPError: the request failed or returned an error status.
        """
        if job.job_type == "event":
            self._publish_event(job)
            return
        self._call_target(job)

    def _publish_event(self, job: ScheduledJob) -> None:
        """Send an event envelope built from ``job.payload_json``.

        The target is ``job.target_url`` or ``<event_service_url>/events``.
        When the payload has no nested ``payload_json`` dict, the whole job
        payload is used as the event body.
        """
        payload = job.payload_json
        target_url = job.target_url or f"{self.settings.event_service_url.rstrip('/')}/events"
        event_payload: dict[str, JSONValue] = {
            # A key that is present but null falls back to its default rather
            # than being serialized as the literal string "None".
            "event_type": _str_or_default(payload.get("event_type"), "scheduled.job.due"),
            "source_service": _str_or_default(
                payload.get("source_service"), "scheduler-service"
            ),
            "aggregate_type": _optional_str(payload.get("aggregate_type")),
            "aggregate_id": _optional_str(payload.get("aggregate_id")),
            "correlation_id": _optional_str(payload.get("correlation_id")),
            "causation_id": _str_or_default(payload.get("causation_id"), job.id),
            "payload_json": _dict_json(payload.get("payload_json"), fallback=payload),
            "metadata_json": job.metadata_json,
        }
        self._request(method=job.method or "POST", url=target_url, payload=event_payload)

    def _call_target(self, job: ScheduledJob) -> None:
        """Send ``job.payload_json`` to ``job.target_url`` (method defaults to POST).

        Raises:
            ValueError: ``job.target_url`` is empty.
        """
        if not job.target_url:
            raise ValueError(f"scheduled job requires target_url for job_type={job.job_type}")
        self._request(
            method=job.method or "POST",
            url=job.target_url,
            payload=job.payload_json,
        )

    def _request(
        self,
        *,
        method: str,
        url: str,
        payload: dict[str, JSONValue],
    ) -> None:
        """Send ``payload`` as JSON and raise on a non-success status.

        A fresh ``httpx.Client`` is opened (and closed) for every call.
        """
        with httpx.Client(timeout=self.settings.worker_request_timeout_seconds) as client:
            response = client.request(method=method, url=url, json=payload)
            response.raise_for_status()


class SchedulerWorker:
    """Polling loop that claims due jobs from the database and executes them.

    If a task-queue consumer is available, each cycle first blocks on the
    queue for a signal. Otherwise the loop sleeps between idle cycles.
    """

    def __init__(
        self,
        *,
        settings: SchedulerServiceSettings,
        session_factory: sessionmaker[Session],
        worker_key: str,
    ) -> None:
        self.settings = settings
        self.session_factory = session_factory
        self.worker_key = worker_key
        self.executor = ScheduledJobExecutor(settings=settings)
        self.redis_client = try_build_redis_client(settings.redis_url)
        self.task_queue = build_task_queue_consumer(
            client=self.redis_client,
            queue_name=SCHEDULED_JOB_QUEUE,
        )

    def run_forever(self) -> SchedulerWorkerStats:
        """Run claim/execute cycles until the configured idle limit is reached.

        When ``worker_max_idle_cycles`` is None the loop never returns.
        An exception raised by a cycle is printed, counted in
        ``error_count``, and treated as an idle cycle.
        """
        executed_count = 0
        idle_count = 0
        error_count = 0
        while True:
            try:
                executed = self.run_once()
            except Exception:
                error_count += 1
                traceback.print_exc()
                executed = False

            if executed:
                executed_count += 1
                idle_count = 0
                continue

            idle_count += 1
            # Without a queue there is nothing to block on, so pace polling here.
            if self.task_queue is None:
                time.sleep(self.settings.worker_poll_interval_seconds)
            max_idle_cycles = self.settings.worker_max_idle_cycles
            if max_idle_cycles is not None and idle_count >= max_idle_cycles:
                return SchedulerWorkerStats(
                    worker_key=self.worker_key,
                    executed_count=executed_count,
                    idle_count=idle_count,
                    error_count=error_count,
                )

    def run_once(self) -> bool:
        """Wait for a queue signal, then claim and execute one batch of due jobs.

        Returns:
            True when at least one job was claimed in this cycle.
        """
        self._wait_for_queue_signal()
        db = self.session_factory()
        try:
            repository = ScheduledJobRepository(db)
            jobs = repository.claim_due_jobs(
                worker_key=self.worker_key,
                lease_seconds=self.settings.worker_lease_seconds,
                limit=self.settings.worker_claim_limit,
                # NOTE(review): naive UTC timestamp; presumably matches how due
                # times are stored. Confirm before switching to an aware datetime
                # (datetime.utcnow is deprecated since Python 3.12).
                now_time=datetime.utcnow(),
            )
            for job in jobs:
                self._execute_claimed_job(repository=repository, job=job)
            return bool(jobs)
        finally:
            db.close()

    def _wait_for_queue_signal(self) -> None:
        """Block until a queue signal arrives or the poll interval elapses.

        Does nothing without a task queue. If ``dequeue`` raises, the error
        is printed and the worker sleeps for the same interval instead. A
        broken queue connection then degrades to interval polling rather than
        a busy loop, because ``run_forever`` does not sleep when a queue is
        configured.
        """
        if self.task_queue is None:
            return
        wait_seconds = max(1, ceil(self.settings.worker_poll_interval_seconds))
        try:
            self.task_queue.dequeue(timeout_seconds=wait_seconds)
        except Exception:
            traceback.print_exc()
            time.sleep(wait_seconds)

    def _execute_claimed_job(
        self,
        *,
        repository: ScheduledJobRepository,
        job: ScheduledJob,
    ) -> None:
        """Execute one claimed job and record its outcome.

        When Redis is available, a per-job ``DistributedLock`` and an
        ``IdempotencyStore`` entry guard the execution. If either cannot be
        obtained, the job is skipped without a status change. The lock is
        always released, even if recording the outcome raises.
        """
        if self.redis_client is None:
            self._execute_and_record(repository=repository, job=job, idempotency_store=None)
            return

        # Imported lazily; presumably so the Redis primitives are only loaded
        # when Redis is configured.
        from core_shared.redis_primitives import DistributedLock, IdempotencyStore

        lock = DistributedLock(
            client=self.redis_client,
            name=f"scheduled-job:{job.id}:lock",
            ttl_seconds=self.settings.worker_lease_seconds,
        )
        if not lock.acquire():
            return
        try:
            idempotency_store = IdempotencyStore(
                client=self.redis_client,
                prefix="scheduled-job-idempotency",
            )
            if not idempotency_store.begin(key=job.id):
                return
            self._execute_and_record(
                repository=repository,
                job=job,
                idempotency_store=idempotency_store,
            )
        finally:
            lock.release()

    def _execute_and_record(
        self,
        *,
        repository: ScheduledJobRepository,
        job: ScheduledJob,
        idempotency_store: IdempotencyStore | None,
    ) -> None:
        """Run the job's side effect and persist a ``failed`` or ``completed`` status.

        Execution errors are recorded on the job and printed, not re-raised.
        Errors from the repository or the idempotency store propagate.
        """
        try:
            self.executor.execute(job)
        except Exception as exc:
            # NOTE(review): the idempotency entry begun for this job is not
            # completed here. Whether that blocks a later retry depends on
            # IdempotencyStore semantics; confirm.
            repository.update_status(
                job_id=job.id,
                status="failed",
                last_error_message=str(exc),
            )
            traceback.print_exc()
            return
        if idempotency_store is not None:
            idempotency_store.complete(
                key=job.id,
                result={"status": "completed", "scheduled_job_id": job.id},
            )
        repository.update_status(job_id=job.id, status="completed")


def _optional_str(value: JSONValue) -> str | None:
    """Return ``str(value)``, or None when ``value`` is None."""
    if value is None:
        return None
    return str(value)


def _str_or_default(value: JSONValue, default: object) -> str:
    """Return ``str(value)``, or ``str(default)`` when ``value`` is None."""
    return str(default) if value is None else str(value)


def _dict_json(
    value: JSONValue,
    *,
    fallback: dict[str, JSONValue],
) -> dict[str, JSONValue]:
    """Return ``value`` if it is a dict, otherwise ``fallback``."""
    if isinstance(value, dict):
        return value
    return fallback


def build_worker_key() -> str:
    """Return ``$AGENT_PLATFORM_WORKER_KEY`` if set, else ``<hostname>-<8 hex chars>``."""
    configured_key = os.getenv("AGENT_PLATFORM_WORKER_KEY")
    if configured_key:
        return configured_key
    hostname = socket.gethostname()
    return f"{hostname}-{uuid4().hex[:8]}"


def main() -> None:
    """Build a worker from settings, run it until it stops, and print its stats."""
    settings = SchedulerServiceSettings()
    worker = SchedulerWorker(
        settings=settings,
        session_factory=build_session_factory(settings),
        worker_key=build_worker_key(),
    )
    stats = worker.run_forever()
    print(
        "scheduler-worker stopped "
        f"worker_key={stats.worker_key} "
        f"executed_count={stats.executed_count} "
        f"idle_count={stats.idle_count} "
        f"error_count={stats.error_count}",
        flush=True,
    )


if __name__ == "__main__":
    main()