| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
from __future__ import annotations

import os
import socket
import time
import traceback
from dataclasses import dataclass
from datetime import datetime, timezone
from uuid import uuid4

import httpx
from sqlalchemy.orm import Session, sessionmaker

from core_shared import JSONValue

from app.bootstrap.settings import SchedulerServiceSettings
from app.db.models import ScheduledJob
from app.db.session import build_session_factory
from app.domain.repositories import ScheduledJobRepository
@dataclass(frozen=True)
class SchedulerWorkerStats:
    """Immutable counters describing one worker's polling loop."""

    # Identifier of the worker these counters belong to.
    worker_key: str
    # Polling cycles in which at least one job was claimed.
    executed_count: int = 0
    # Consecutive cycles without executed work when the stats were taken.
    idle_count: int = 0
    # Polling cycles that ended with an exception.
    error_count: int = 0
class ScheduledJobExecutor:
    """Deliver a due scheduled job over HTTP.

    Jobs whose ``job_type`` is ``"event"`` are wrapped in an event envelope
    and sent to the event service (or to the job's own ``target_url`` when
    one is set). Every other job type has its payload sent as-is to the
    job's ``target_url``.
    """

    def __init__(self, *, settings: SchedulerServiceSettings) -> None:
        self.settings = settings

    def execute(self, job: ScheduledJob) -> None:
        """Execute ``job``; any failure propagates to the caller.

        Raises:
            ValueError: a non-event job has no ``target_url``.
            Exception: whatever the HTTP client raises for transport errors
                or, via ``raise_for_status()``, for error responses.
        """
        if job.job_type == "event":
            self._publish_event(job)
            return
        self._call_target(job)

    @staticmethod
    def _str_or_default(value: JSONValue, default: str) -> str:
        """Return ``str(value)``, or ``default`` when ``value`` is ``None``."""
        return default if value is None else str(value)

    def _publish_event(self, job: ScheduledJob) -> None:
        """Build the event envelope for ``job`` and send it."""
        # A job stored without a payload still produces a valid envelope
        # instead of failing on ``None.get``.
        payload = job.payload_json or {}
        target_url = (
            job.target_url
            or f"{self.settings.event_service_url.rstrip('/')}/events"
        )
        event_payload: dict[str, JSONValue] = {
            "tenant_id": job.tenant_id,
            # An explicit JSON null must fall back to the default as well;
            # ``str(payload.get(key, default))`` would emit the text "None".
            "event_type": self._str_or_default(
                payload.get("event_type"), "scheduled.job.due"
            ),
            "source_service": self._str_or_default(
                payload.get("source_service"), "scheduler-service"
            ),
            "aggregate_type": _optional_str(payload.get("aggregate_type")),
            "aggregate_id": _optional_str(payload.get("aggregate_id")),
            "correlation_id": _optional_str(payload.get("correlation_id")),
            "causation_id": self._str_or_default(
                payload.get("causation_id"), str(job.id)
            ),
            # Use the nested "payload_json" object when it is a dict,
            # otherwise forward the whole job payload.
            "payload_json": _dict_json(
                payload.get("payload_json"), fallback=payload
            ),
            "metadata_json": job.metadata_json,
        }
        self._request(
            method=job.method or "POST",
            url=target_url,
            payload=event_payload,
        )

    def _call_target(self, job: ScheduledJob) -> None:
        """Send the job payload unchanged to the job's ``target_url``."""
        if not job.target_url:
            raise ValueError(
                f"scheduled job requires target_url for job_type={job.job_type}"
            )
        self._request(
            method=job.method or "POST",
            url=job.target_url,
            payload=job.payload_json,
        )

    def _request(
        self,
        *,
        method: str,
        url: str,
        payload: dict[str, JSONValue],
    ) -> None:
        """Send ``payload`` as a JSON body and raise on an error status."""
        timeout = self.settings.worker_request_timeout_seconds
        with httpx.Client(timeout=timeout) as client:
            response = client.request(method=method, url=url, json=payload)
            response.raise_for_status()
class SchedulerWorker:
    """Poll for due scheduled jobs, execute them, and record each outcome."""

    def __init__(
        self,
        *,
        settings: SchedulerServiceSettings,
        session_factory: sessionmaker[Session],
        worker_key: str,
    ) -> None:
        self.settings = settings
        self.session_factory = session_factory
        self.worker_key = worker_key
        self.executor = ScheduledJobExecutor(settings=settings)

    def run_forever(self) -> SchedulerWorkerStats:
        """Poll in a loop until the configured idle limit is reached.

        Returns:
            The counters accumulated by the loop. The method only returns
            when ``settings.worker_max_idle_cycles`` is not ``None`` and the
            number of consecutive cycles that executed nothing (cycles that
            raised included) reaches it; otherwise it loops indefinitely.
        """
        executed_count = 0
        idle_count = 0
        error_count = 0
        while True:
            try:
                executed = self.run_once()
            except Exception:
                # Top-level boundary: report the failure, count it, and
                # treat the cycle as idle so the loop backs off.
                error_count += 1
                traceback.print_exc()
                executed = False

            if executed:
                executed_count += 1
                idle_count = 0
            else:
                idle_count += 1
                time.sleep(self.settings.worker_poll_interval_seconds)

            # NOTE(review): the reviewed source had lost its indentation; this
            # check is reconstructed at loop level. For limits >= 1 it behaves
            # the same as a check nested under the idle branch -- confirm.
            max_idle_cycles = self.settings.worker_max_idle_cycles
            if max_idle_cycles is not None and idle_count >= max_idle_cycles:
                return SchedulerWorkerStats(
                    worker_key=self.worker_key,
                    executed_count=executed_count,
                    idle_count=idle_count,
                    error_count=error_count,
                )

    def run_once(self) -> bool:
        """Claim one batch of due jobs and execute each of them.

        Returns:
            ``True`` when at least one job was claimed, ``False`` otherwise.
        """
        db = self.session_factory()
        try:
            repository = ScheduledJobRepository(db)
            jobs = repository.claim_due_jobs(
                tenant_id=None,
                worker_key=self.worker_key,
                lease_seconds=self.settings.worker_lease_seconds,
                limit=self.settings.worker_claim_limit,
                # Naive UTC timestamp, the same value ``datetime.utcnow()``
                # produced, without calling the deprecated API.
                now_time=datetime.now(timezone.utc).replace(tzinfo=None),
            )
            for job in jobs:
                self._execute_claimed_job(repository=repository, job=job)
            return bool(jobs)
        finally:
            # Close the session whether or not claiming/executing raised.
            db.close()

    def _execute_claimed_job(
        self,
        *,
        repository: ScheduledJobRepository,
        job: ScheduledJob,
    ) -> None:
        """Execute ``job`` and store ``completed`` or ``failed`` for it.

        Executor failures are recorded and reported but not re-raised, so
        one failing job does not stop the rest of the claimed batch.
        """
        try:
            self.executor.execute(job)
        except Exception as exc:
            repository.update_status(
                job_id=job.id,
                status="failed",
                last_error_message=str(exc),
            )
            traceback.print_exc()
            return
        repository.update_status(job_id=job.id, status="completed")
def _optional_str(value: JSONValue) -> str | None:
    """Stringify ``value``, letting ``None`` pass through unchanged."""
    return None if value is None else str(value)
def _dict_json(
    value: JSONValue,
    *,
    fallback: dict[str, JSONValue],
) -> dict[str, JSONValue]:
    """Return ``value`` itself when it is a dict, otherwise ``fallback``."""
    return value if isinstance(value, dict) else fallback
def build_worker_key() -> str:
    """Return the key this worker process identifies itself with.

    A non-empty ``AGENT_PLATFORM_WORKER_KEY`` environment variable is used
    verbatim; otherwise the key is the hostname followed by ``-`` and the
    first eight hex characters of a random UUID.
    """
    override = os.environ.get("AGENT_PLATFORM_WORKER_KEY")
    if not override:
        return "-".join((socket.gethostname(), uuid4().hex[:8]))
    return override
def main() -> None:
    """Run one scheduler worker until its loop returns, then print a summary."""
    settings = SchedulerServiceSettings()
    session_factory = build_session_factory(settings)
    worker_key = build_worker_key()
    worker = SchedulerWorker(
        settings=settings,
        session_factory=session_factory,
        worker_key=worker_key,
    )
    stats = worker.run_forever()

    # One-line summary of the counters collected by the polling loop.
    summary = (
        "scheduler-worker stopped "
        f"worker_key={stats.worker_key} "
        f"executed_count={stats.executed_count} "
        f"idle_count={stats.idle_count} "
        f"error_count={stats.error_count}"
    )
    print(summary, flush=True)


if __name__ == "__main__":
    main()
|