worker.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. from __future__ import annotations
  2. import os
  3. import socket
  4. import time
  5. import traceback
  6. from dataclasses import dataclass
  7. from math import ceil
  8. from threading import Event, Thread
  9. from uuid import uuid4
  10. from core_shared import JSONValue, try_build_redis_client
  11. from core_shared.task_queue import KNOWLEDGE_DOCUMENT_QUEUE, build_task_queue_consumer
  12. from sqlalchemy.orm import Session, sessionmaker
  13. from app.application.services import build_knowledge_application_service
  14. from app.bootstrap.settings import KnowledgeServiceSettings
  15. from app.db.session import build_session_factory
  16. @dataclass(frozen=True)
  17. class KnowledgeWorkerStats:
  18. worker_key: str
  19. executed_count: int = 0
  20. idle_count: int = 0
  21. error_count: int = 0
  22. class KnowledgeWorker:
  23. def __init__(
  24. self,
  25. *,
  26. settings: KnowledgeServiceSettings,
  27. session_factory: sessionmaker[Session],
  28. worker_key: str) -> None:
  29. self.settings = settings
  30. self.session_factory = session_factory
  31. self.worker_key = worker_key
  32. self.redis_client = try_build_redis_client(settings.redis_url)
  33. self.task_queue = build_task_queue_consumer(
  34. client=self.redis_client,
  35. queue_name=KNOWLEDGE_DOCUMENT_QUEUE)
  36. def run_forever(self) -> KnowledgeWorkerStats:
  37. executed_count = 0
  38. idle_count = 0
  39. error_count = 0
  40. while True:
  41. try:
  42. executed = self.run_once()
  43. except Exception:
  44. error_count += 1
  45. traceback.print_exc()
  46. executed = False
  47. if executed:
  48. executed_count += 1
  49. idle_count = 0
  50. else:
  51. idle_count += 1
  52. if self.task_queue is None:
  53. time.sleep(self.settings.worker_poll_interval_seconds)
  54. if self.settings.worker_max_idle_cycles is not None:
  55. if idle_count >= self.settings.worker_max_idle_cycles:
  56. return KnowledgeWorkerStats(
  57. worker_key=self.worker_key,
  58. executed_count=executed_count,
  59. idle_count=idle_count,
  60. error_count=error_count)
  61. def run_once(self) -> bool:
  62. queue_payload = self._wait_for_queue_signal()
  63. db = self.session_factory()
  64. try:
  65. service = build_knowledge_application_service(db=db, settings=self.settings)
  66. if queue_payload is not None:
  67. document_id = _optional_str(queue_payload.get("document_id"))
  68. action = _optional_str(queue_payload.get("action")) or "reindex"
  69. job_id = _optional_str(queue_payload.get("job_id"))
  70. if document_id is not None:
  71. result = service.execute_document_index_job(
  72. document_id=document_id,
  73. action=action,
  74. job_id=job_id,
  75. worker_key=self.worker_key,
  76. lease_seconds=self.settings.worker_lease_seconds,
  77. redis_client=self.redis_client)
  78. if result is not None:
  79. return True
  80. result = service.execute_next_pending_document_job(
  81. worker_key=self.worker_key,
  82. lease_seconds=self.settings.worker_lease_seconds,
  83. stale_indexing_seconds=self.settings.worker_stale_indexing_seconds,
  84. redis_client=self.redis_client)
  85. return result is not None
  86. finally:
  87. db.close()
  88. def _wait_for_queue_signal(self) -> dict[str, JSONValue] | None:
  89. if self.task_queue is None:
  90. return None
  91. try:
  92. return self.task_queue.dequeue(
  93. timeout_seconds=max(1, ceil(self.settings.worker_poll_interval_seconds)))
  94. except Exception:
  95. return None
  96. class BackgroundKnowledgeWorker:
  97. def __init__(
  98. self,
  99. *,
  100. settings: KnowledgeServiceSettings,
  101. session_factory: sessionmaker[Session],
  102. worker_key: str) -> None:
  103. self.settings = settings
  104. self.worker = KnowledgeWorker(
  105. settings=settings,
  106. session_factory=session_factory,
  107. worker_key=worker_key)
  108. self.stop_event = Event()
  109. self.thread = Thread(
  110. target=self._run,
  111. name=f"knowledge-auto-worker-{worker_key}",
  112. daemon=True)
  113. def start(self) -> None:
  114. self.thread.start()
  115. def stop(self, *, timeout_seconds: float) -> None:
  116. self.stop_event.set()
  117. self.thread.join(timeout=timeout_seconds)
  118. def _run(self) -> None:
  119. while not self.stop_event.is_set():
  120. try:
  121. executed = self.worker.run_once()
  122. except Exception:
  123. traceback.print_exc()
  124. executed = False
  125. if not executed:
  126. self.stop_event.wait(self.settings.worker_poll_interval_seconds)
  127. def _optional_str(value: JSONValue) -> str | None:
  128. return value if isinstance(value, str) and value else None
  129. def build_worker_key() -> str:
  130. configured_key = os.getenv("AGENT_PLATFORM_WORKER_KEY")
  131. if configured_key:
  132. return configured_key
  133. hostname = socket.gethostname()
  134. return f"{hostname}-{uuid4().hex[:8]}"
  135. def main() -> None:
  136. settings = KnowledgeServiceSettings()
  137. worker = KnowledgeWorker(
  138. settings=settings,
  139. session_factory=build_session_factory(settings),
  140. worker_key=build_worker_key())
  141. stats = worker.run_forever()
  142. print(
  143. "knowledge-worker stopped "
  144. f"worker_key={stats.worker_key} "
  145. f"executed_count={stats.executed_count} "
  146. f"idle_count={stats.idle_count} "
  147. f"error_count={stats.error_count}",
  148. flush=True)
# Run the worker only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()