from __future__ import annotations import os import socket import time import traceback from dataclasses import dataclass from math import ceil from threading import Event, Thread from uuid import uuid4 from core_shared import JSONValue, try_build_redis_client from core_shared.task_queue import TOOL_MCP_DISCOVERY_QUEUE, build_task_queue_consumer from sqlalchemy.orm import Session, sessionmaker from app.application.services import build_tool_application_service from app.bootstrap.settings import ToolServiceSettings from app.db.session import build_session_factory @dataclass(frozen=True) class ToolWorkerStats: worker_key: str executed_count: int = 0 idle_count: int = 0 error_count: int = 0 class ToolWorker: def __init__( self, *, settings: ToolServiceSettings, session_factory: sessionmaker[Session], worker_key: str) -> None: self.settings = settings self.session_factory = session_factory self.worker_key = worker_key self.redis_client = try_build_redis_client(settings.redis_url) self.task_queue = build_task_queue_consumer( client=self.redis_client, queue_name=TOOL_MCP_DISCOVERY_QUEUE) def run_forever(self) -> ToolWorkerStats: executed_count = 0 idle_count = 0 error_count = 0 while True: try: executed = self.run_once() except Exception: error_count += 1 traceback.print_exc() executed = False if executed: executed_count += 1 idle_count = 0 else: idle_count += 1 if self.task_queue is None: time.sleep(self.settings.worker_poll_interval_seconds) if self.settings.worker_max_idle_cycles is not None: if idle_count >= self.settings.worker_max_idle_cycles: return ToolWorkerStats( worker_key=self.worker_key, executed_count=executed_count, idle_count=idle_count, error_count=error_count) def run_once(self) -> bool: queue_payload = self._wait_for_queue_signal() db = self.session_factory() try: service = build_tool_application_service(db=db, settings=self.settings) if queue_payload is not None: connection_id = _optional_str(queue_payload.get("connection_id")) job_id = _optional_str(queue_payload.get("job_id")) if connection_id is not None: result = service.execute_mcp_discovery_job( connection_id=connection_id, job_id=job_id, worker_key=self.worker_key, lease_seconds=self.settings.worker_lease_seconds, redis_client=self.redis_client) if result is not None: return True result = service.execute_next_pending_mcp_discovery( worker_key=self.worker_key, lease_seconds=self.settings.worker_lease_seconds, stale_discovery_seconds=self.settings.worker_stale_discovery_seconds, redis_client=self.redis_client) return result is not None finally: db.close() def _wait_for_queue_signal(self) -> dict[str, JSONValue] | None: if self.task_queue is None: return None try: return self.task_queue.dequeue( timeout_seconds=max(1, ceil(self.settings.worker_poll_interval_seconds))) except Exception: return None class BackgroundToolWorker: def __init__( self, *, settings: ToolServiceSettings, session_factory: sessionmaker[Session], worker_key: str) -> None: self.settings = settings self.worker = ToolWorker( settings=settings, session_factory=session_factory, worker_key=worker_key) self.stop_event = Event() self.thread = Thread( target=self._run, name=f"tool-auto-worker-{worker_key}", daemon=True) def start(self) -> None: self.thread.start() def stop(self, *, timeout_seconds: float) -> None: self.stop_event.set() self.thread.join(timeout=timeout_seconds) def _run(self) -> None: while not self.stop_event.is_set(): try: executed = self.worker.run_once() except Exception: traceback.print_exc() executed = False if not executed: self.stop_event.wait(self.settings.worker_poll_interval_seconds) def _optional_str(value: JSONValue) -> str | None: return value if isinstance(value, str) and value else None def build_worker_key() -> str: configured_key = os.getenv("AGENT_PLATFORM_WORKER_KEY") if configured_key: return configured_key hostname = socket.gethostname() return f"{hostname}-{uuid4().hex[:8]}" def main() -> None: settings = ToolServiceSettings() worker = ToolWorker( settings=settings, session_factory=build_session_factory(settings), worker_key=build_worker_key()) stats = worker.run_forever() print( "tool-worker stopped " f"worker_key={stats.worker_key} " f"executed_count={stats.executed_count} " f"idle_count={stats.idle_count} " f"error_count={stats.error_count}", flush=True) if __name__ == "__main__": main()