"""Worker entry points that execute claimed team runs.

Provides a blocking ``TeamWorker`` loop, a ``BackgroundTeamWorker`` thread
wrapper around it, and a ``main`` function for running the worker as a script.
"""

from __future__ import annotations

import os
import socket
import time
import traceback
from dataclasses import dataclass
from math import ceil
from threading import Event, Thread
from uuid import uuid4

from core_shared import try_build_redis_client
from core_shared.task_queue import TEAM_RUN_QUEUE, build_task_queue_consumer
from sqlalchemy.orm import Session, sessionmaker

from app.application.services import build_team_application_service
from app.bootstrap.settings import TeamServiceSettings
from app.db.session import build_session_factory
from app.domain.repositories import (
    TeamDefinitionRepository,
    TeamRunRepository,
    TeamConfigRepository,
)


@dataclass(frozen=True)
class TeamWorkerStats:
    """Immutable counters returned by ``TeamWorker.run_forever`` when it exits."""

    # Identifier of the worker that produced these stats.
    worker_key: str
    # Number of cycles in which a team run was executed.
    executed_count: int = 0
    # Consecutive non-executing cycles at the time the loop stopped.
    idle_count: int = 0
    # Number of cycles in which ``run_once`` raised.
    error_count: int = 0


class TeamWorker:
    """Executes team runs one at a time, paced by a task queue or by polling."""

    def __init__(
        self,
        *,
        settings: TeamServiceSettings,
        session_factory: sessionmaker[Session],
        worker_key: str,
    ) -> None:
        """Store the collaborators and build the Redis client and queue consumer.

        Args:
            settings: Service settings; the worker reads ``redis_url`` and the
                ``worker_*`` options from it.
            session_factory: Factory called once per cycle to open a DB session.
            worker_key: Identifier passed to the application service when
                claiming a run.
        """
        self.settings = settings
        self.session_factory = session_factory
        self.worker_key = worker_key
        self.redis_client = try_build_redis_client(settings.redis_url)
        # ``task_queue`` may be None (every use below checks for it); in that
        # case the worker falls back to fixed-interval polling.
        self.task_queue = build_task_queue_consumer(
            client=self.redis_client,
            queue_name=TEAM_RUN_QUEUE,
        )

    def run_forever(self) -> TeamWorkerStats:
        """Run cycles until the configured idle limit is reached.

        A cycle that raises is counted as an error, its traceback is printed,
        and it is treated as a non-executing (idle) cycle.

        Returns:
            Stats for this loop. The loop only returns when
            ``settings.worker_max_idle_cycles`` is not None and that many
            consecutive non-executing cycles have occurred; otherwise it never
            returns.
        """
        executed_count = 0
        idle_count = 0
        error_count = 0

        while True:
            try:
                executed = self.run_once()
            except Exception:
                # Top-level boundary: keep the worker alive and record the failure.
                error_count += 1
                traceback.print_exc()
                executed = False

            if executed:
                executed_count += 1
                idle_count = 0
            else:
                idle_count += 1
                # With a queue, pacing happens inside ``run_once`` (blocking
                # dequeue, or a sleep if the dequeue fails); without one we
                # must sleep here to avoid a busy loop.
                if self.task_queue is None:
                    time.sleep(self.settings.worker_poll_interval_seconds)
                if self.settings.worker_max_idle_cycles is not None:
                    if idle_count >= self.settings.worker_max_idle_cycles:
                        return TeamWorkerStats(
                            worker_key=self.worker_key,
                            executed_count=executed_count,
                            idle_count=idle_count,
                            error_count=error_count,
                        )

    def run_once(self) -> bool:
        """Wait for a queue signal (if any), then try to execute one team run.

        Opens a fresh DB session for the cycle and always closes it.

        Returns:
            True when ``execute_next_claimed_team_run`` returned a non-None
            result, False otherwise.

        Raises:
            Exception: Whatever the session factory or the application service
                raises; callers in this module catch and print it.
        """
        self._wait_for_queue_signal()
        db = self.session_factory()
        try:
            service = build_team_application_service(
                team_repository=TeamDefinitionRepository(db),
                team_config_repository=TeamConfigRepository(db),
                team_run_repository=TeamRunRepository(db),
                settings=self.settings,
            )
            result = service.execute_next_claimed_team_run(
                worker_key=self.worker_key,
                lease_seconds=self.settings.worker_lease_seconds,
                stale_running_seconds=self.settings.worker_stale_running_seconds,
                dry_run=self.settings.worker_dry_run,
                redis_client=self.redis_client,
            )
            return result is not None
        finally:
            db.close()

    def _wait_for_queue_signal(self) -> None:
        """Block on the task queue for up to one poll interval.

        Does nothing when no queue is configured. The dequeued value is
        discarded: the call is used only to pace the loop, and the actual
        claim happens in ``run_once``.

        If the dequeue fails, the error is printed and the method sleeps for
        the poll interval instead. Without that pause a broken queue would
        return immediately on every cycle, and ``run_forever`` (which only
        sleeps when there is no queue at all) would spin without any delay.
        """
        if self.task_queue is None:
            return
        try:
            # Timeout is the poll interval rounded up to whole seconds, with a
            # floor of one second.
            self.task_queue.dequeue(
                timeout_seconds=max(
                    1, ceil(self.settings.worker_poll_interval_seconds)
                )
            )
        except Exception:
            # Best effort: a queue failure must not stop the worker, but it
            # should be visible and must not turn the loop into a hot spin.
            traceback.print_exc()
            time.sleep(self.settings.worker_poll_interval_seconds)


class BackgroundTeamWorker:
    """Runs a ``TeamWorker`` on a daemon thread until ``stop`` is called."""

    def __init__(
        self,
        *,
        settings: TeamServiceSettings,
        session_factory: sessionmaker[Session],
        worker_key: str,
    ) -> None:
        """Create the wrapped worker, the stop flag and the (unstarted) thread.

        Args:
            settings: Service settings, forwarded to the wrapped worker.
            session_factory: DB session factory, forwarded to the wrapped worker.
            worker_key: Worker identifier; also used in the thread name.
        """
        self.settings = settings
        self.worker = TeamWorker(
            settings=settings,
            session_factory=session_factory,
            worker_key=worker_key,
        )
        self.stop_event = Event()
        self.thread = Thread(
            target=self._run,
            name=f"team-auto-worker-{worker_key}",
            daemon=True,
        )

    def start(self) -> None:
        """Start the background thread."""
        self.thread.start()

    def stop(self, *, timeout_seconds: float) -> None:
        """Signal the loop to stop and wait for the thread to finish.

        Safe to call when the thread was never started or has already
        finished; in that case only the stop flag is set.

        Args:
            timeout_seconds: Maximum time to wait for the thread to exit.
        """
        self.stop_event.set()
        # ``Thread.join`` raises RuntimeError on a thread that was never
        # started, so only join a thread that is actually running.
        if self.thread.is_alive():
            self.thread.join(timeout=timeout_seconds)

    def _run(self) -> None:
        """Thread target: run worker cycles until the stop flag is set."""
        while not self.stop_event.is_set():
            try:
                executed = self.worker.run_once()
            except Exception:
                # Keep the thread alive; the failure is only reported.
                traceback.print_exc()
                executed = False
            if not executed:
                # Waiting on the event (rather than sleeping) lets ``stop``
                # cut the pause short.
                self.stop_event.wait(self.settings.worker_poll_interval_seconds)


def build_worker_key() -> str:
    """Return the worker key for this process.

    Uses the ``AGENT_PLATFORM_WORKER_KEY`` environment variable when it is set
    to a non-empty value; otherwise builds ``<hostname>-<8 hex chars>`` from
    the host name and a random UUID.
    """
    configured_key = os.getenv("AGENT_PLATFORM_WORKER_KEY")
    if configured_key:
        return configured_key
    hostname = socket.gethostname()
    return f"{hostname}-{uuid4().hex[:8]}"


def main() -> None:
    """Run a ``TeamWorker`` until it stops, then print its final counters."""
    settings = TeamServiceSettings()
    worker = TeamWorker(
        settings=settings,
        session_factory=build_session_factory(settings),
        worker_key=build_worker_key(),
    )
    stats = worker.run_forever()
    print(
        "team-worker stopped "
        f"worker_key={stats.worker_key} "
        f"executed_count={stats.executed_count} "
        f"idle_count={stats.idle_count} "
        f"error_count={stats.error_count}",
        flush=True,
    )


if __name__ == "__main__":
    main()