worker.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. from __future__ import annotations
  2. import os
  3. import socket
  4. import time
  5. import traceback
  6. from dataclasses import dataclass
  7. from uuid import uuid4
  8. from sqlalchemy.orm import Session, sessionmaker
  9. from app.application.services import build_team_application_service
  10. from app.bootstrap.settings import TeamServiceSettings
  11. from app.db.session import build_session_factory
  12. from app.domain.repositories import (
  13. TeamDefinitionRepository,
  14. TeamRunRepository,
  15. TeamVersionRepository,
  16. )
  17. @dataclass(frozen=True)
  18. class TeamWorkerStats:
  19. worker_key: str
  20. executed_count: int = 0
  21. idle_count: int = 0
  22. error_count: int = 0
  23. class TeamWorker:
  24. def __init__(
  25. self,
  26. *,
  27. settings: TeamServiceSettings,
  28. session_factory: sessionmaker[Session],
  29. worker_key: str,
  30. ) -> None:
  31. self.settings = settings
  32. self.session_factory = session_factory
  33. self.worker_key = worker_key
  34. def run_forever(self) -> TeamWorkerStats:
  35. executed_count = 0
  36. idle_count = 0
  37. error_count = 0
  38. while True:
  39. try:
  40. executed = self.run_once()
  41. except Exception:
  42. error_count += 1
  43. traceback.print_exc()
  44. executed = False
  45. if executed:
  46. executed_count += 1
  47. idle_count = 0
  48. else:
  49. idle_count += 1
  50. time.sleep(self.settings.worker_poll_interval_seconds)
  51. if self.settings.worker_max_idle_cycles is not None:
  52. if idle_count >= self.settings.worker_max_idle_cycles:
  53. return TeamWorkerStats(
  54. worker_key=self.worker_key,
  55. executed_count=executed_count,
  56. idle_count=idle_count,
  57. error_count=error_count,
  58. )
  59. def run_once(self) -> bool:
  60. db = self.session_factory()
  61. try:
  62. service = build_team_application_service(
  63. team_repository=TeamDefinitionRepository(db),
  64. team_version_repository=TeamVersionRepository(db),
  65. team_run_repository=TeamRunRepository(db),
  66. settings=self.settings,
  67. )
  68. result = service.execute_next_claimed_team_run(
  69. worker_key=self.worker_key,
  70. lease_seconds=self.settings.worker_lease_seconds,
  71. dry_run=self.settings.worker_dry_run,
  72. )
  73. return result is not None
  74. finally:
  75. db.close()
  76. def build_worker_key() -> str:
  77. configured_key = os.getenv("AGENT_PLATFORM_WORKER_KEY")
  78. if configured_key:
  79. return configured_key
  80. hostname = socket.gethostname()
  81. return f"{hostname}-{uuid4().hex[:8]}"
  82. def main() -> None:
  83. settings = TeamServiceSettings()
  84. worker = TeamWorker(
  85. settings=settings,
  86. session_factory=build_session_factory(settings),
  87. worker_key=build_worker_key(),
  88. )
  89. stats = worker.run_forever()
  90. print(
  91. "team-worker stopped "
  92. f"worker_key={stats.worker_key} "
  93. f"executed_count={stats.executed_count} "
  94. f"idle_count={stats.idle_count} "
  95. f"error_count={stats.error_count}",
  96. flush=True,
  97. )
  98. if __name__ == "__main__":
  99. main()