| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- from __future__ import annotations
- import os
- import socket
- import time
- import traceback
- from dataclasses import dataclass
- from math import ceil
- from threading import Event, Thread
- from uuid import uuid4
- from core_shared import JSONValue, try_build_redis_client
- from core_shared.task_queue import KNOWLEDGE_DOCUMENT_QUEUE, build_task_queue_consumer
- from sqlalchemy.orm import Session, sessionmaker
- from app.application.services import build_knowledge_application_service
- from app.bootstrap.settings import KnowledgeServiceSettings
- from app.db.session import build_session_factory
@dataclass(frozen=True)
class KnowledgeWorkerStats:
    """Immutable counters summarising a ``KnowledgeWorker.run_forever`` run."""

    # Identifier of the worker that produced these counters.
    worker_key: str
    # Cycles in which ``run_once`` reported that a job was executed.
    executed_count: int = 0
    # As populated by ``run_forever``: consecutive idle cycles at exit
    # (reset to 0 whenever a job executes).
    idle_count: int = 0
    # Cycles in which ``run_once`` raised an exception.
    error_count: int = 0
class KnowledgeWorker:
    """Finds and executes knowledge-document indexing jobs.

    Work is discovered in two ways: an optional task-queue signal consumed from
    ``KNOWLEDGE_DOCUMENT_QUEUE``, and a fallback database poll that claims the
    next pending document job.
    """

    def __init__(
        self,
        *,
        settings: KnowledgeServiceSettings,
        session_factory: sessionmaker[Session],
        worker_key: str,
    ) -> None:
        self.settings = settings
        self.session_factory = session_factory
        self.worker_key = worker_key
        # NOTE(review): presumably None when Redis is unavailable/unconfigured;
        # ``task_queue`` may be None (checked below) — confirm in core_shared.
        self.redis_client = try_build_redis_client(settings.redis_url)
        self.task_queue = build_task_queue_consumer(
            client=self.redis_client,
            queue_name=KNOWLEDGE_DOCUMENT_QUEUE,
        )

    def run_forever(self) -> KnowledgeWorkerStats:
        """Run work cycles until too many consecutive cycles are idle.

        A cycle is idle when ``run_once`` executed nothing or raised. Exceptions
        from ``run_once`` are printed and counted, never propagated. When
        ``settings.worker_max_idle_cycles`` is ``None`` this never returns.

        Returns:
            The counters accumulated over the whole run.
        """
        executed_count = 0
        idle_count = 0
        error_count = 0
        while True:
            failed = False
            try:
                executed = self.run_once()
            except Exception:
                error_count += 1
                failed = True
                traceback.print_exc()
                executed = False

            if executed:
                executed_count += 1
                idle_count = 0
            else:
                idle_count += 1

            # Check the exit condition before sleeping so the final idle cycle
            # does not waste a poll interval.
            max_idle = self.settings.worker_max_idle_cycles
            if max_idle is not None and idle_count >= max_idle:
                return KnowledgeWorkerStats(
                    worker_key=self.worker_key,
                    executed_count=executed_count,
                    idle_count=idle_count,
                    error_count=error_count,
                )

            # A healthy queue paces idle cycles via its blocking dequeue. Without
            # a queue, or after a failure (which may have happened without any
            # wait), sleep so a persistent error cannot become a busy loop.
            if not executed and (self.task_queue is None or failed):
                time.sleep(self.settings.worker_poll_interval_seconds)

    def run_once(self) -> bool:
        """Execute at most one document index job; return ``True`` if one ran.

        A queue payload naming a ``document_id`` is tried first (``action``
        defaults to ``"reindex"``). If there is no usable payload, or that job
        yields no result, the next pending job is claimed from the database.
        The database session is always closed.
        """
        queue_payload = self._wait_for_queue_signal()
        db = self.session_factory()
        try:
            service = build_knowledge_application_service(db=db, settings=self.settings)
            if queue_payload is not None:
                document_id = _optional_str(queue_payload.get("document_id"))
                action = _optional_str(queue_payload.get("action")) or "reindex"
                job_id = _optional_str(queue_payload.get("job_id"))
                if document_id is not None:
                    result = service.execute_document_index_job(
                        document_id=document_id,
                        action=action,
                        job_id=job_id,
                        worker_key=self.worker_key,
                        lease_seconds=self.settings.worker_lease_seconds,
                        redis_client=self.redis_client,
                    )
                    if result is not None:
                        return True
            result = service.execute_next_pending_document_job(
                worker_key=self.worker_key,
                lease_seconds=self.settings.worker_lease_seconds,
                stale_indexing_seconds=self.settings.worker_stale_indexing_seconds,
                redis_client=self.redis_client,
            )
            return result is not None
        finally:
            db.close()

    def _wait_for_queue_signal(self) -> dict[str, JSONValue] | None:
        """Wait up to the poll interval (at least 1s) for a queue message.

        Returns ``None`` when there is no queue or the dequeue failed; otherwise
        whatever ``dequeue`` returns (presumably ``None`` on timeout). Queue
        failures stay best-effort — the caller falls back to database polling —
        but they are logged and followed by a poll-interval sleep, so a broken
        queue connection degrades to paced polling instead of a busy loop.
        """
        if self.task_queue is None:
            return None
        try:
            return self.task_queue.dequeue(
                timeout_seconds=max(1, ceil(self.settings.worker_poll_interval_seconds)),
            )
        except Exception:
            traceback.print_exc()
            time.sleep(self.settings.worker_poll_interval_seconds)
            return None
class BackgroundKnowledgeWorker:
    """Runs a ``KnowledgeWorker`` loop on a daemon thread until stopped."""

    def __init__(
        self,
        *,
        settings: KnowledgeServiceSettings,
        session_factory: sessionmaker[Session],
        worker_key: str,
    ) -> None:
        self.settings = settings
        self.worker = KnowledgeWorker(
            settings=settings,
            session_factory=session_factory,
            worker_key=worker_key,
        )
        self.stop_event = Event()
        self.thread = Thread(
            target=self._run,
            name=f"knowledge-auto-worker-{worker_key}",
            daemon=True,
        )

    def start(self) -> None:
        """Start the background thread (a ``Thread`` can only be started once)."""
        self.thread.start()

    def stop(self, *, timeout_seconds: float) -> None:
        """Ask the loop to exit and wait up to *timeout_seconds* for the thread.

        The stop flag is only checked between ``run_once`` calls, so an
        in-flight job is not interrupted and the join may time out first.
        Safe to call when ``start`` was never called or failed: joining an
        unstarted ``Thread`` raises ``RuntimeError``, so the join is skipped
        unless the thread is alive.
        """
        self.stop_event.set()
        if self.thread.is_alive():
            self.thread.join(timeout=timeout_seconds)

    def _run(self) -> None:
        """Thread body: run cycles until stopped, pausing after idle/failed ones."""
        while not self.stop_event.is_set():
            try:
                executed = self.worker.run_once()
            except Exception:
                traceback.print_exc()
                executed = False
            if not executed:
                # Event.wait doubles as an interruptible sleep: stop() wakes it.
                self.stop_event.wait(self.settings.worker_poll_interval_seconds)
- def _optional_str(value: JSONValue) -> str | None:
- return value if isinstance(value, str) and value else None
def build_worker_key() -> str:
    """Return the identity this worker process reports.

    A non-empty ``AGENT_PLATFORM_WORKER_KEY`` environment variable wins.
    Otherwise the key is ``<hostname>-<8 random hex chars>``, so processes on
    the same host are very likely to get distinct keys.
    """
    override = os.getenv("AGENT_PLATFORM_WORKER_KEY")
    if not override:
        host = socket.gethostname()
        random_suffix = uuid4().hex[:8]
        return f"{host}-{random_suffix}"
    return override
def main() -> None:
    """Run a standalone knowledge worker, then print a one-line stats summary.

    ``run_forever`` only returns once ``worker_max_idle_cycles`` is configured
    and reached; the summary is flushed immediately.
    """
    settings = KnowledgeServiceSettings()
    session_factory = build_session_factory(settings)
    worker = KnowledgeWorker(
        settings=settings,
        session_factory=session_factory,
        worker_key=build_worker_key(),
    )
    stats = worker.run_forever()
    message = (
        "knowledge-worker stopped "
        f"worker_key={stats.worker_key} "
        f"executed_count={stats.executed_count} "
        f"idle_count={stats.idle_count} "
        f"error_count={stats.error_count}"
    )
    print(message, flush=True)
# Run the standalone worker when this module is executed as a script.
if __name__ == "__main__":
    main()
|