worker.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. from __future__ import annotations
  2. import os
  3. import socket
  4. import time
  5. import traceback
  6. from dataclasses import dataclass
  7. from math import ceil
  8. from threading import Event, Thread
  9. from uuid import uuid4
  10. from core_shared import try_build_redis_client
  11. from core_shared.task_queue import TEAM_RUN_QUEUE, build_task_queue_consumer
  12. from sqlalchemy.orm import Session, sessionmaker
  13. from app.application.services import build_team_application_service
  14. from app.bootstrap.settings import TeamServiceSettings
  15. from app.db.session import build_session_factory
  16. from app.domain.repositories import (
  17. TeamDefinitionRepository,
  18. TeamRunRepository,
  19. TeamConfigRepository,
  20. )
  21. @dataclass(frozen=True)
  22. class TeamWorkerStats:
  23. worker_key: str
  24. executed_count: int = 0
  25. idle_count: int = 0
  26. error_count: int = 0
  27. class TeamWorker:
  28. def __init__(
  29. self,
  30. *,
  31. settings: TeamServiceSettings,
  32. session_factory: sessionmaker[Session],
  33. worker_key: str) -> None:
  34. self.settings = settings
  35. self.session_factory = session_factory
  36. self.worker_key = worker_key
  37. self.redis_client = try_build_redis_client(settings.redis_url)
  38. self.task_queue = build_task_queue_consumer(
  39. client=self.redis_client,
  40. queue_name=TEAM_RUN_QUEUE)
  41. def run_forever(self) -> TeamWorkerStats:
  42. executed_count = 0
  43. idle_count = 0
  44. error_count = 0
  45. while True:
  46. try:
  47. executed = self.run_once()
  48. except Exception:
  49. error_count += 1
  50. traceback.print_exc()
  51. executed = False
  52. if executed:
  53. executed_count += 1
  54. idle_count = 0
  55. else:
  56. idle_count += 1
  57. if self.task_queue is None:
  58. time.sleep(self.settings.worker_poll_interval_seconds)
  59. if self.settings.worker_max_idle_cycles is not None:
  60. if idle_count >= self.settings.worker_max_idle_cycles:
  61. return TeamWorkerStats(
  62. worker_key=self.worker_key,
  63. executed_count=executed_count,
  64. idle_count=idle_count,
  65. error_count=error_count)
  66. def run_once(self) -> bool:
  67. self._wait_for_queue_signal()
  68. db = self.session_factory()
  69. try:
  70. service = build_team_application_service(
  71. team_repository=TeamDefinitionRepository(db),
  72. team_config_repository=TeamConfigRepository(db),
  73. team_run_repository=TeamRunRepository(db),
  74. settings=self.settings)
  75. result = service.execute_next_claimed_team_run(
  76. worker_key=self.worker_key,
  77. lease_seconds=self.settings.worker_lease_seconds,
  78. stale_running_seconds=self.settings.worker_stale_running_seconds,
  79. dry_run=self.settings.worker_dry_run,
  80. redis_client=self.redis_client)
  81. return result is not None
  82. finally:
  83. db.close()
  84. def _wait_for_queue_signal(self) -> None:
  85. if self.task_queue is None:
  86. return
  87. try:
  88. self.task_queue.dequeue(
  89. timeout_seconds=max(1, ceil(self.settings.worker_poll_interval_seconds)))
  90. except Exception:
  91. return
  92. class BackgroundTeamWorker:
  93. def __init__(
  94. self,
  95. *,
  96. settings: TeamServiceSettings,
  97. session_factory: sessionmaker[Session],
  98. worker_key: str) -> None:
  99. self.settings = settings
  100. self.worker = TeamWorker(
  101. settings=settings,
  102. session_factory=session_factory,
  103. worker_key=worker_key)
  104. self.stop_event = Event()
  105. self.thread = Thread(
  106. target=self._run,
  107. name=f"team-auto-worker-{worker_key}",
  108. daemon=True)
  109. def start(self) -> None:
  110. self.thread.start()
  111. def stop(self, *, timeout_seconds: float) -> None:
  112. self.stop_event.set()
  113. self.thread.join(timeout=timeout_seconds)
  114. def _run(self) -> None:
  115. while not self.stop_event.is_set():
  116. try:
  117. executed = self.worker.run_once()
  118. except Exception:
  119. traceback.print_exc()
  120. executed = False
  121. if not executed:
  122. self.stop_event.wait(self.settings.worker_poll_interval_seconds)
  123. def build_worker_key() -> str:
  124. configured_key = os.getenv("AGENT_PLATFORM_WORKER_KEY")
  125. if configured_key:
  126. return configured_key
  127. hostname = socket.gethostname()
  128. return f"{hostname}-{uuid4().hex[:8]}"
  129. def main() -> None:
  130. settings = TeamServiceSettings()
  131. worker = TeamWorker(
  132. settings=settings,
  133. session_factory=build_session_factory(settings),
  134. worker_key=build_worker_key())
  135. stats = worker.run_forever()
  136. print(
  137. "team-worker stopped "
  138. f"worker_key={stats.worker_key} "
  139. f"executed_count={stats.executed_count} "
  140. f"idle_count={stats.idle_count} "
  141. f"error_count={stats.error_count}",
  142. flush=True)
  143. if __name__ == "__main__":
  144. main()