"""Runtime worker process: repeatedly claims and executes node runs."""

from __future__ import annotations

import os
import socket
import time
import traceback
from dataclasses import dataclass
from math import ceil
from uuid import uuid4

from core_shared import try_build_redis_client
from core_shared.task_queue import RUNTIME_NODE_RUN_QUEUE, build_task_queue_consumer
from sqlalchemy.orm import Session, sessionmaker

from app.application.services import build_runtime_application_service
from app.bootstrap.settings import RuntimeServiceSettings
from app.db.session import build_session_factory


@dataclass(frozen=True)
class RuntimeWorkerStats:
    """Immutable counters reported when a worker loop stops."""

    # Identifier of the worker that produced these counters.
    worker_key: str
    # Number of cycles in which ``run_once`` returned True.
    executed_count: int = 0
    # Consecutive non-executing cycles at the moment the loop stopped.
    idle_count: int = 0
    # Number of cycles in which ``run_once`` raised an exception.
    error_count: int = 0


class RuntimeWorker:
    """Polls for claimable node runs and executes them one at a time."""

    def __init__(
        self,
        *,
        settings: RuntimeServiceSettings,
        session_factory: sessionmaker[Session],
        worker_key: str,
    ) -> None:
        """Store the collaborators and build the Redis client and queue consumer.

        Args:
            settings: Runtime service settings; this class reads ``redis_url``,
                ``worker_poll_interval_seconds``, ``worker_max_idle_cycles``
                and ``worker_lease_seconds`` from it.
            session_factory: Factory called once per cycle to open a DB session.
            worker_key: Identifier passed to the service when claiming work.
        """
        self.settings = settings
        self.session_factory = session_factory
        self.worker_key = worker_key
        self.redis_client = try_build_redis_client(settings.redis_url)
        # ``task_queue`` may be None (the rest of the class checks for it);
        # in that case the worker falls back to sleeping between idle cycles.
        self.task_queue = build_task_queue_consumer(
            client=self.redis_client,
            queue_name=RUNTIME_NODE_RUN_QUEUE,
        )

    def run_forever(self) -> RuntimeWorkerStats:
        """Run cycles until the configured idle limit is reached.

        An exception raised by a cycle is printed, counted in ``error_count``
        and treated as a non-executing (idle) cycle. The loop only returns
        when ``settings.worker_max_idle_cycles`` is not None and the number
        of consecutive idle cycles has reached it.

        Returns:
            The counters accumulated by this loop.
        """
        executed_count = 0
        idle_count = 0
        error_count = 0

        while True:
            try:
                executed = self.run_once()
            except Exception:
                # Top-level boundary: report the failure and keep the worker alive.
                error_count += 1
                traceback.print_exc()
                executed = False

            if executed:
                executed_count += 1
                idle_count = 0
            else:
                idle_count += 1
                # Without a queue consumer nothing blocks inside run_once,
                # so pace the polling here.
                if self.task_queue is None:
                    time.sleep(self.settings.worker_poll_interval_seconds)

            # NOTE(review): the source this was restored from had lost its
            # indentation; confirm this check belongs at loop level rather
            # than inside the idle branch (they differ only for a limit <= 0).
            max_idle_cycles = self.settings.worker_max_idle_cycles
            if max_idle_cycles is not None and idle_count >= max_idle_cycles:
                return RuntimeWorkerStats(
                    worker_key=self.worker_key,
                    executed_count=executed_count,
                    idle_count=idle_count,
                    error_count=error_count,
                )

    def run_once(self) -> bool:
        """Wait for a queue signal, then try to execute one claimed node run.

        Returns:
            True when the service returned a non-None result, False otherwise
            (presumably None means nothing was available to claim).
        """
        self._wait_for_queue_signal()
        db = self.session_factory()
        try:
            service = build_runtime_application_service(db=db, settings=self.settings)
            result = service.execute_next_claimed_node_run(
                worker_key=self.worker_key,
                lease_seconds=self.settings.worker_lease_seconds,
                redis_client=self.redis_client,
            )
            return result is not None
        finally:
            # Always release the session, including when execution raises.
            db.close()

    def _wait_for_queue_signal(self) -> None:
        """Block on the task queue for up to one poll interval (at least 1s).

        The dequeued value is ignored; the call is used only as a wake-up
        signal. When no queue consumer is configured this returns at once.
        A failing dequeue is reported and followed by a sleep of the same
        duration, so a broken queue backend degrades to interval polling
        instead of a tight loop (``run_forever`` does not sleep when a
        consumer is configured).
        """
        if self.task_queue is None:
            return

        wait_seconds = max(1, ceil(self.settings.worker_poll_interval_seconds))
        try:
            self.task_queue.dequeue(timeout_seconds=wait_seconds)
        except Exception:
            # Best effort: never let a queue failure abort the cycle, but do
            # surface it and back off for as long as the dequeue would have
            # blocked.
            traceback.print_exc()
            time.sleep(wait_seconds)


def build_worker_key() -> str:
    """Return the worker identifier.

    Uses the ``AGENT_PLATFORM_WORKER_KEY`` environment variable when it is set
    to a non-empty value; otherwise combines the hostname with eight random
    hex characters.
    """
    configured_key = os.getenv("AGENT_PLATFORM_WORKER_KEY")
    if configured_key:
        return configured_key
    hostname = socket.gethostname()
    return f"{hostname}-{uuid4().hex[:8]}"


def main() -> None:
    """Build a worker from settings, run it, and print its final counters."""
    settings = RuntimeServiceSettings()
    worker = RuntimeWorker(
        settings=settings,
        session_factory=build_session_factory(settings),
        worker_key=build_worker_key(),
    )
    stats = worker.run_forever()
    print(
        "runtime-worker stopped "
        f"worker_key={stats.worker_key} "
        f"executed_count={stats.executed_count} "
        f"idle_count={stats.idle_count} "
        f"error_count={stats.error_count}",
        flush=True,
    )


if __name__ == "__main__":
    main()