"""Polling runtime worker and its command-line entry point.

The worker repeatedly asks the runtime application service to execute the
next claimed node run, sleeping between polls when there was nothing to do,
and optionally stops after a configured number of consecutive idle cycles.
"""

from __future__ import annotations

import os
import socket
import time
import traceback
from dataclasses import dataclass
from uuid import uuid4

from sqlalchemy.orm import sessionmaker, Session

from app.application.services import build_runtime_application_service
from app.bootstrap.settings import RuntimeServiceSettings
from app.db.session import build_session_factory


@dataclass(frozen=True)
class RuntimeWorkerStats:
    """Immutable counters reported by ``RuntimeWorker.run_forever`` on exit."""

    # Identifier of the worker that produced these stats.
    worker_key: str
    # Number of polling cycles in which ``run_once`` returned True.
    executed_count: int = 0
    # Consecutive idle cycles at the moment the loop stopped (the counter is
    # reset to zero after every successful execution).
    idle_count: int = 0
    # Number of polling cycles in which ``run_once`` raised an exception.
    error_count: int = 0


class RuntimeWorker:
    """Poll for work and execute it one unit at a time."""

    def __init__(
        self,
        *,
        settings: RuntimeServiceSettings,
        session_factory: sessionmaker[Session],
        worker_key: str,
    ) -> None:
        """Store the collaborators used by the polling loop.

        Args:
            settings: Runtime settings; this class reads
                ``worker_poll_interval_seconds``, ``worker_max_idle_cycles``
                and ``worker_lease_seconds`` from it.
            session_factory: Callable returning a new database session for
                each polling cycle.
            worker_key: Identifier passed to the service on every cycle and
                echoed in the returned stats.
        """
        self.settings = settings
        self.session_factory = session_factory
        self.worker_key = worker_key

    def run_forever(self) -> RuntimeWorkerStats:
        """Run polling cycles until the idle limit is reached.

        A cycle that raises is counted as an error AND as an idle cycle, so a
        persistently failing worker also stops once the idle limit is hit.
        When ``settings.worker_max_idle_cycles`` is ``None`` the loop never
        returns on its own.

        Returns:
            The counters accumulated up to the moment the loop stopped.
        """
        executed_count = 0
        idle_count = 0
        error_count = 0

        while True:
            try:
                executed = self.run_once()
            except Exception:
                # Top-level boundary of the worker loop: report the failure
                # and keep polling instead of crashing the process.
                error_count += 1
                traceback.print_exc()
                executed = False

            if executed:
                executed_count += 1
                idle_count = 0
            else:
                idle_count += 1

            # Check the stop condition BEFORE sleeping so that the worker
            # does not wait out a pointless poll interval right before exit.
            max_idle_cycles = self.settings.worker_max_idle_cycles
            if max_idle_cycles is not None and idle_count >= max_idle_cycles:
                return RuntimeWorkerStats(
                    worker_key=self.worker_key,
                    executed_count=executed_count,
                    idle_count=idle_count,
                    error_count=error_count,
                )

            if not executed:
                # Nothing was executed (or the cycle failed): back off before
                # polling again.
                time.sleep(self.settings.worker_poll_interval_seconds)

    def run_once(self) -> bool:
        """Run a single polling cycle with a fresh database session.

        Returns:
            True when the service call returned a non-None result, False
            otherwise (presumably meaning no node run was available to
            claim; confirm against the service implementation).

        Raises:
            Exception: Whatever the service call raises is propagated; the
                session is closed in either case.
        """
        db = self.session_factory()
        try:
            service = build_runtime_application_service(db=db, settings=self.settings)
            result = service.execute_next_claimed_node_run(
                worker_key=self.worker_key,
                lease_seconds=self.settings.worker_lease_seconds,
            )
            return result is not None
        finally:
            db.close()


def build_worker_key() -> str:
    """Return the identifier this worker process should use.

    The ``AGENT_PLATFORM_WORKER_KEY`` environment variable wins when it is
    set to a non-empty value; otherwise the key is the local hostname
    followed by the first eight hex characters of a random UUID.
    """
    configured_key = os.getenv("AGENT_PLATFORM_WORKER_KEY")
    if configured_key:
        return configured_key
    hostname = socket.gethostname()
    return f"{hostname}-{uuid4().hex[:8]}"


def main() -> None:
    """Build a worker from default settings, run it, and print its stats."""
    settings = RuntimeServiceSettings()
    worker = RuntimeWorker(
        settings=settings,
        session_factory=build_session_factory(settings),
        worker_key=build_worker_key(),
    )
    stats = worker.run_forever()
    print(
        "runtime-worker stopped "
        f"worker_key={stats.worker_key} "
        f"executed_count={stats.executed_count} "
        f"idle_count={stats.idle_count} "
        f"error_count={stats.error_count}",
        flush=True,
    )


if __name__ == "__main__":
    main()