worker.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. from __future__ import annotations
  2. import os
  3. import socket
  4. import time
  5. import traceback
  6. from dataclasses import dataclass
  7. from uuid import uuid4
  8. from sqlalchemy.orm import sessionmaker, Session
  9. from app.application.services import build_runtime_application_service
  10. from app.bootstrap.settings import RuntimeServiceSettings
  11. from app.db.session import build_session_factory
  12. @dataclass(frozen=True)
  13. class RuntimeWorkerStats:
  14. worker_key: str
  15. executed_count: int = 0
  16. idle_count: int = 0
  17. error_count: int = 0
  18. class RuntimeWorker:
  19. def __init__(
  20. self,
  21. *,
  22. settings: RuntimeServiceSettings,
  23. session_factory: sessionmaker[Session],
  24. worker_key: str,
  25. ) -> None:
  26. self.settings = settings
  27. self.session_factory = session_factory
  28. self.worker_key = worker_key
  29. def run_forever(self) -> RuntimeWorkerStats:
  30. executed_count = 0
  31. idle_count = 0
  32. error_count = 0
  33. while True:
  34. try:
  35. executed = self.run_once()
  36. except Exception:
  37. error_count += 1
  38. traceback.print_exc()
  39. executed = False
  40. if executed:
  41. executed_count += 1
  42. idle_count = 0
  43. else:
  44. idle_count += 1
  45. time.sleep(self.settings.worker_poll_interval_seconds)
  46. if self.settings.worker_max_idle_cycles is not None:
  47. if idle_count >= self.settings.worker_max_idle_cycles:
  48. return RuntimeWorkerStats(
  49. worker_key=self.worker_key,
  50. executed_count=executed_count,
  51. idle_count=idle_count,
  52. error_count=error_count,
  53. )
  54. def run_once(self) -> bool:
  55. db = self.session_factory()
  56. try:
  57. service = build_runtime_application_service(db=db, settings=self.settings)
  58. result = service.execute_next_claimed_node_run(
  59. worker_key=self.worker_key,
  60. lease_seconds=self.settings.worker_lease_seconds,
  61. )
  62. return result is not None
  63. finally:
  64. db.close()
  65. def build_worker_key() -> str:
  66. configured_key = os.getenv("AGENT_PLATFORM_WORKER_KEY")
  67. if configured_key:
  68. return configured_key
  69. hostname = socket.gethostname()
  70. return f"{hostname}-{uuid4().hex[:8]}"
  71. def main() -> None:
  72. settings = RuntimeServiceSettings()
  73. worker = RuntimeWorker(
  74. settings=settings,
  75. session_factory=build_session_factory(settings),
  76. worker_key=build_worker_key(),
  77. )
  78. stats = worker.run_forever()
  79. print(
  80. "runtime-worker stopped "
  81. f"worker_key={stats.worker_key} "
  82. f"executed_count={stats.executed_count} "
  83. f"idle_count={stats.idle_count} "
  84. f"error_count={stats.error_count}",
  85. flush=True,
  86. )
  87. if __name__ == "__main__":
  88. main()