| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- from __future__ import annotations
- import os
- import socket
- import time
- import traceback
- from dataclasses import dataclass
- from math import ceil
- from uuid import uuid4
- from sqlalchemy.orm import Session, sessionmaker
- from core_shared import try_build_redis_client
- from core_shared.task_queue import AGENT_RUN_QUEUE, build_task_queue_consumer
- from app.application.services import build_agent_application_service
- from app.bootstrap.settings import AgentServiceSettings
- from app.db.session import build_session_factory
@dataclass(frozen=True)
class AgentWorkerStats:
    """Immutable snapshot of a worker's counters.

    Only ``worker_key`` is required; every counter defaults to zero.
    """

    # Identifier of the worker these counters belong to.
    worker_key: str
    # Number of cycles that executed an agent run.
    executed_count: int = 0
    # Number of cycles that executed nothing.
    idle_count: int = 0
    # Number of cycles that ended with an exception.
    error_count: int = 0
class AgentWorker:
    """Worker loop that executes claimed agent runs one cycle at a time.

    A cycle optionally blocks on a task queue for a wake-up signal, then asks
    the application service to execute the next claimed agent run using a
    freshly opened database session.
    """

    def __init__(
        self,
        *,
        settings: AgentServiceSettings,
        session_factory: sessionmaker[Session],
        worker_key: str,
    ) -> None:
        """Store collaborators and build the Redis client and queue consumer.

        Args:
            settings: Service settings. This class reads ``redis_url``,
                ``worker_poll_interval_seconds``, ``worker_max_idle_cycles``,
                ``worker_lease_seconds`` and ``worker_dry_run`` from it.
            session_factory: Called once per cycle to open a database session.
            worker_key: Identifier passed to the service when executing runs
                and reported in the final stats.
        """
        self.settings = settings
        self.session_factory = session_factory
        self.worker_key = worker_key
        self.redis_client = try_build_redis_client(settings.redis_url)
        # The rest of the class treats ``task_queue is None`` as "no queue
        # available" and falls back to sleep-based polling.
        # NOTE(review): presumably the builders return None when Redis is not
        # configured or unreachable -- confirm in core_shared.
        self.task_queue = build_task_queue_consumer(
            client=self.redis_client,
            queue_name=AGENT_RUN_QUEUE,
        )

    def run_forever(self) -> AgentWorkerStats:
        """Run cycles until the idle limit is reached and return the counters.

        An exception raised by a cycle is printed, counted as an error and
        then treated like an idle cycle. When
        ``settings.worker_max_idle_cycles`` is ``None`` the loop never returns.

        Returns:
            The executed / idle / error counters at the moment the loop stops.
        """
        executed_count = 0
        idle_count = 0
        error_count = 0
        while True:
            try:
                executed = self.run_once()
            except Exception:
                # Top-level boundary: one failed cycle must not kill the worker.
                error_count += 1
                traceback.print_exc()
                executed = False
            if executed:
                executed_count += 1
                # Any successful execution restarts the idle streak.
                idle_count = 0
            else:
                idle_count += 1
                if self.task_queue is None:
                    # Without a queue to block on, pace the polling manually.
                    time.sleep(self.settings.worker_poll_interval_seconds)
            if self.settings.worker_max_idle_cycles is not None:
                if idle_count >= self.settings.worker_max_idle_cycles:
                    return AgentWorkerStats(
                        worker_key=self.worker_key,
                        executed_count=executed_count,
                        idle_count=idle_count,
                        error_count=error_count,
                    )

    def run_once(self) -> bool:
        """Run a single cycle.

        Waits for a queue signal (when a queue exists), opens a database
        session, and asks the application service to execute the next claimed
        agent run. The session is always closed, even if the service raises.

        Returns:
            ``True`` when the service returned a result (anything other than
            ``None``), ``False`` otherwise.
        """
        self._wait_for_queue_signal()
        db = self.session_factory()
        try:
            service = build_agent_application_service(db=db, settings=self.settings)
            result = service.execute_next_claimed_agent_run(
                worker_key=self.worker_key,
                lease_seconds=self.settings.worker_lease_seconds,
                dry_run=self.settings.worker_dry_run,
                redis_client=self.redis_client,
            )
            return result is not None
        finally:
            db.close()

    def _wait_for_queue_signal(self) -> None:
        """Block on the task queue for up to one poll interval.

        The dequeued value is discarded: the call only serves as a wake-up
        signal, and ``run_once`` proceeds whether or not anything arrived.
        Returns immediately when there is no queue. Never raises on a queue
        failure.
        """
        if self.task_queue is None:
            return
        poll_interval = self.settings.worker_poll_interval_seconds
        try:
            # The timeout is rounded up to whole seconds with a floor of 1.
            self.task_queue.dequeue(
                timeout_seconds=max(1, ceil(poll_interval)),
            )
        except Exception:
            # Best effort: a queue failure must not stop the worker. Report it
            # instead of hiding it, then pause for one poll interval so a
            # failing queue does not turn the worker loop into a busy spin
            # (run_forever only sleeps when there is no queue at all).
            traceback.print_exc()
            time.sleep(poll_interval)
def build_worker_key() -> str:
    """Return the identifier this worker process should use.

    A non-empty ``AGENT_PLATFORM_WORKER_KEY`` environment variable is returned
    as-is. Otherwise the key is the host name followed by ``-`` and the first
    eight hex digits of a freshly generated UUID4.
    """
    override = os.environ.get("AGENT_PLATFORM_WORKER_KEY")
    if not override:
        # Unset and empty are treated alike: generate a per-process key.
        host = socket.gethostname()
        suffix = uuid4().hex[:8]
        return f"{host}-{suffix}"
    return override
def main() -> None:
    """Build a worker from the service settings, run it, and print its stats.

    The summary line is printed only after ``run_forever`` returns.
    """
    config = AgentServiceSettings()
    session_factory = build_session_factory(config)
    key = build_worker_key()
    worker = AgentWorker(
        settings=config,
        session_factory=session_factory,
        worker_key=key,
    )
    stats = worker.run_forever()
    summary = (
        "agent-worker stopped "
        f"worker_key={stats.worker_key} "
        f"executed_count={stats.executed_count} "
        f"idle_count={stats.idle_count} "
        f"error_count={stats.error_count}"
    )
    # Flush so the final line is not lost in a buffer when the process exits.
    print(summary, flush=True)


# Run the worker only when this file is executed as a script.
if __name__ == "__main__":
    main()
|