worker.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. from __future__ import annotations
  2. import os
  3. import socket
  4. import time
  5. import traceback
  6. from dataclasses import dataclass
  7. from math import ceil
  8. from uuid import uuid4
  9. from sqlalchemy.orm import Session, sessionmaker
  10. from core_shared import try_build_redis_client
  11. from core_shared.task_queue import AGENT_RUN_QUEUE, build_task_queue_consumer
  12. from app.application.services import build_agent_application_service
  13. from app.bootstrap.settings import AgentServiceSettings
  14. from app.db.session import build_session_factory
  15. @dataclass(frozen=True)
  16. class AgentWorkerStats:
  17. worker_key: str
  18. executed_count: int = 0
  19. idle_count: int = 0
  20. error_count: int = 0
  21. class AgentWorker:
  22. def __init__(
  23. self,
  24. *,
  25. settings: AgentServiceSettings,
  26. session_factory: sessionmaker[Session],
  27. worker_key: str,
  28. ) -> None:
  29. self.settings = settings
  30. self.session_factory = session_factory
  31. self.worker_key = worker_key
  32. self.redis_client = try_build_redis_client(settings.redis_url)
  33. self.task_queue = build_task_queue_consumer(
  34. client=self.redis_client,
  35. queue_name=AGENT_RUN_QUEUE,
  36. )
  37. def run_forever(self) -> AgentWorkerStats:
  38. executed_count = 0
  39. idle_count = 0
  40. error_count = 0
  41. while True:
  42. try:
  43. executed = self.run_once()
  44. except Exception:
  45. error_count += 1
  46. traceback.print_exc()
  47. executed = False
  48. if executed:
  49. executed_count += 1
  50. idle_count = 0
  51. else:
  52. idle_count += 1
  53. if self.task_queue is None:
  54. time.sleep(self.settings.worker_poll_interval_seconds)
  55. if self.settings.worker_max_idle_cycles is not None:
  56. if idle_count >= self.settings.worker_max_idle_cycles:
  57. return AgentWorkerStats(
  58. worker_key=self.worker_key,
  59. executed_count=executed_count,
  60. idle_count=idle_count,
  61. error_count=error_count,
  62. )
  63. def run_once(self) -> bool:
  64. self._wait_for_queue_signal()
  65. db = self.session_factory()
  66. try:
  67. service = build_agent_application_service(db=db, settings=self.settings)
  68. result = service.execute_next_claimed_agent_run(
  69. worker_key=self.worker_key,
  70. lease_seconds=self.settings.worker_lease_seconds,
  71. dry_run=self.settings.worker_dry_run,
  72. redis_client=self.redis_client,
  73. )
  74. return result is not None
  75. finally:
  76. db.close()
  77. def _wait_for_queue_signal(self) -> None:
  78. if self.task_queue is None:
  79. return
  80. try:
  81. self.task_queue.dequeue(
  82. timeout_seconds=max(1, ceil(self.settings.worker_poll_interval_seconds)),
  83. )
  84. except Exception:
  85. return
  86. def build_worker_key() -> str:
  87. configured_key = os.getenv("AGENT_PLATFORM_WORKER_KEY")
  88. if configured_key:
  89. return configured_key
  90. hostname = socket.gethostname()
  91. return f"{hostname}-{uuid4().hex[:8]}"
  92. def main() -> None:
  93. settings = AgentServiceSettings()
  94. worker = AgentWorker(
  95. settings=settings,
  96. session_factory=build_session_factory(settings),
  97. worker_key=build_worker_key(),
  98. )
  99. stats = worker.run_forever()
  100. print(
  101. "agent-worker stopped "
  102. f"worker_key={stats.worker_key} "
  103. f"executed_count={stats.executed_count} "
  104. f"idle_count={stats.idle_count} "
  105. f"error_count={stats.error_count}",
  106. flush=True,
  107. )
# Start the worker only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()