worker.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. from __future__ import annotations
  2. import os
  3. import socket
  4. import time
  5. import traceback
  6. from dataclasses import dataclass
  7. from uuid import uuid4
  8. from sqlalchemy.orm import Session, sessionmaker
  9. from core_shared import try_build_redis_client
  10. from app.application.services import build_agent_application_service
  11. from app.bootstrap.settings import AgentServiceSettings
  12. from app.db.session import build_session_factory
  13. @dataclass(frozen=True)
  14. class AgentWorkerStats:
  15. worker_key: str
  16. executed_count: int = 0
  17. idle_count: int = 0
  18. error_count: int = 0
  19. class AgentWorker:
  20. def __init__(
  21. self,
  22. *,
  23. settings: AgentServiceSettings,
  24. session_factory: sessionmaker[Session],
  25. worker_key: str,
  26. ) -> None:
  27. self.settings = settings
  28. self.session_factory = session_factory
  29. self.worker_key = worker_key
  30. self.redis_client = try_build_redis_client(settings.redis_url)
  31. def run_forever(self) -> AgentWorkerStats:
  32. executed_count = 0
  33. idle_count = 0
  34. error_count = 0
  35. while True:
  36. try:
  37. executed = self.run_once()
  38. except Exception:
  39. error_count += 1
  40. traceback.print_exc()
  41. executed = False
  42. if executed:
  43. executed_count += 1
  44. idle_count = 0
  45. else:
  46. idle_count += 1
  47. time.sleep(self.settings.worker_poll_interval_seconds)
  48. if self.settings.worker_max_idle_cycles is not None:
  49. if idle_count >= self.settings.worker_max_idle_cycles:
  50. return AgentWorkerStats(
  51. worker_key=self.worker_key,
  52. executed_count=executed_count,
  53. idle_count=idle_count,
  54. error_count=error_count,
  55. )
  56. def run_once(self) -> bool:
  57. db = self.session_factory()
  58. try:
  59. service = build_agent_application_service(db=db, settings=self.settings)
  60. result = service.execute_next_claimed_agent_run(
  61. worker_key=self.worker_key,
  62. lease_seconds=self.settings.worker_lease_seconds,
  63. dry_run=self.settings.worker_dry_run,
  64. redis_client=self.redis_client,
  65. )
  66. return result is not None
  67. finally:
  68. db.close()
  69. def build_worker_key() -> str:
  70. configured_key = os.getenv("AGENT_PLATFORM_WORKER_KEY")
  71. if configured_key:
  72. return configured_key
  73. hostname = socket.gethostname()
  74. return f"{hostname}-{uuid4().hex[:8]}"
  75. def main() -> None:
  76. settings = AgentServiceSettings()
  77. worker = AgentWorker(
  78. settings=settings,
  79. session_factory=build_session_factory(settings),
  80. worker_key=build_worker_key(),
  81. )
  82. stats = worker.run_forever()
  83. print(
  84. "agent-worker stopped "
  85. f"worker_key={stats.worker_key} "
  86. f"executed_count={stats.executed_count} "
  87. f"idle_count={stats.idle_count} "
  88. f"error_count={stats.error_count}",
  89. flush=True,
  90. )
  91. if __name__ == "__main__":
  92. main()