| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- from __future__ import annotations
- import os
- import socket
- import time
- import traceback
- from dataclasses import dataclass
- from uuid import uuid4
- from sqlalchemy.orm import Session, sessionmaker
- from app.application.services import build_team_application_service
- from app.bootstrap.settings import TeamServiceSettings
- from app.db.session import build_session_factory
- from app.domain.repositories import (
- TeamDefinitionRepository,
- TeamRunRepository,
- TeamVersionRepository,
- )
- @dataclass(frozen=True)
- class TeamWorkerStats:
- worker_key: str
- executed_count: int = 0
- idle_count: int = 0
- error_count: int = 0
- class TeamWorker:
- def __init__(
- self,
- *,
- settings: TeamServiceSettings,
- session_factory: sessionmaker[Session],
- worker_key: str,
- ) -> None:
- self.settings = settings
- self.session_factory = session_factory
- self.worker_key = worker_key
- def run_forever(self) -> TeamWorkerStats:
- executed_count = 0
- idle_count = 0
- error_count = 0
- while True:
- try:
- executed = self.run_once()
- except Exception:
- error_count += 1
- traceback.print_exc()
- executed = False
- if executed:
- executed_count += 1
- idle_count = 0
- else:
- idle_count += 1
- time.sleep(self.settings.worker_poll_interval_seconds)
- if self.settings.worker_max_idle_cycles is not None:
- if idle_count >= self.settings.worker_max_idle_cycles:
- return TeamWorkerStats(
- worker_key=self.worker_key,
- executed_count=executed_count,
- idle_count=idle_count,
- error_count=error_count,
- )
- def run_once(self) -> bool:
- db = self.session_factory()
- try:
- service = build_team_application_service(
- team_repository=TeamDefinitionRepository(db),
- team_version_repository=TeamVersionRepository(db),
- team_run_repository=TeamRunRepository(db),
- settings=self.settings,
- )
- result = service.execute_next_claimed_team_run(
- worker_key=self.worker_key,
- lease_seconds=self.settings.worker_lease_seconds,
- dry_run=self.settings.worker_dry_run,
- )
- return result is not None
- finally:
- db.close()
- def build_worker_key() -> str:
- configured_key = os.getenv("AGENT_PLATFORM_WORKER_KEY")
- if configured_key:
- return configured_key
- hostname = socket.gethostname()
- return f"{hostname}-{uuid4().hex[:8]}"
- def main() -> None:
- settings = TeamServiceSettings()
- worker = TeamWorker(
- settings=settings,
- session_factory=build_session_factory(settings),
- worker_key=build_worker_key(),
- )
- stats = worker.run_forever()
- print(
- "team-worker stopped "
- f"worker_key={stats.worker_key} "
- f"executed_count={stats.executed_count} "
- f"idle_count={stats.idle_count} "
- f"error_count={stats.error_count}",
- flush=True,
- )
- if __name__ == "__main__":
- main()
|