| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- from __future__ import annotations
- import os
- import socket
- import time
- import traceback
- from dataclasses import dataclass
- from math import ceil
- from threading import Event, Thread
- from uuid import uuid4
- from core_shared import JSONValue, try_build_redis_client
- from core_shared.task_queue import MEMORY_TOUCH_QUEUE, build_task_queue_consumer
- from sqlalchemy.orm import Session, sessionmaker
- from app.application.services import build_memory_application_service
- from app.bootstrap.settings import MemoryServiceSettings
- from app.db.session import build_session_factory
- @dataclass(frozen=True)
- class MemoryWorkerStats:
- worker_key: str
- executed_count: int = 0
- idle_count: int = 0
- error_count: int = 0
- class MemoryWorker:
- def __init__(
- self,
- *,
- settings: MemoryServiceSettings,
- session_factory: sessionmaker[Session],
- worker_key: str) -> None:
- self.settings = settings
- self.session_factory = session_factory
- self.worker_key = worker_key
- self.redis_client = try_build_redis_client(settings.redis_url)
- self.task_queue = build_task_queue_consumer(
- client=self.redis_client,
- queue_name=MEMORY_TOUCH_QUEUE)
- def run_forever(self) -> MemoryWorkerStats:
- executed_count = 0
- idle_count = 0
- error_count = 0
- while True:
- try:
- executed = self.run_once()
- except Exception:
- error_count += 1
- traceback.print_exc()
- executed = False
- if executed:
- executed_count += 1
- idle_count = 0
- else:
- idle_count += 1
- if self.task_queue is None:
- time.sleep(self.settings.worker_poll_interval_seconds)
- if self.settings.worker_max_idle_cycles is not None:
- if idle_count >= self.settings.worker_max_idle_cycles:
- return MemoryWorkerStats(
- worker_key=self.worker_key,
- executed_count=executed_count,
- idle_count=idle_count,
- error_count=error_count)
- def run_once(self) -> bool:
- payload = self._wait_for_queue_signal()
- if payload is None:
- return False
- memory_ids = _read_string_list(payload.get("memory_ids"))
- if not memory_ids:
- return False
- db = self.session_factory()
- try:
- service = build_memory_application_service(db=db, settings=self.settings)
- service.touch_memories(memory_ids=memory_ids)
- return True
- finally:
- db.close()
- def _wait_for_queue_signal(self) -> dict[str, JSONValue] | None:
- if self.task_queue is None:
- return None
- try:
- return self.task_queue.dequeue(
- timeout_seconds=max(1, ceil(self.settings.worker_poll_interval_seconds)))
- except Exception:
- return None
- class BackgroundMemoryWorker:
- def __init__(
- self,
- *,
- settings: MemoryServiceSettings,
- session_factory: sessionmaker[Session],
- worker_key: str) -> None:
- self.settings = settings
- self.worker = MemoryWorker(
- settings=settings,
- session_factory=session_factory,
- worker_key=worker_key)
- self.stop_event = Event()
- self.thread = Thread(
- target=self._run,
- name=f"memory-auto-worker-{worker_key}",
- daemon=True)
- def start(self) -> None:
- self.thread.start()
- def stop(self, *, timeout_seconds: float) -> None:
- self.stop_event.set()
- self.thread.join(timeout=timeout_seconds)
- def _run(self) -> None:
- while not self.stop_event.is_set():
- try:
- executed = self.worker.run_once()
- except Exception:
- traceback.print_exc()
- executed = False
- if not executed:
- self.stop_event.wait(self.settings.worker_poll_interval_seconds)
- def _read_string_list(value: JSONValue) -> list[str]:
- if not isinstance(value, list):
- return []
- return [item for item in value if isinstance(item, str) and item]
- def build_worker_key() -> str:
- configured_key = os.getenv("AGENT_PLATFORM_WORKER_KEY")
- if configured_key:
- return configured_key
- hostname = socket.gethostname()
- return f"{hostname}-{uuid4().hex[:8]}"
- def main() -> None:
- settings = MemoryServiceSettings()
- worker = MemoryWorker(
- settings=settings,
- session_factory=build_session_factory(settings),
- worker_key=build_worker_key())
- stats = worker.run_forever()
- print(
- "memory-worker stopped "
- f"worker_key={stats.worker_key} "
- f"executed_count={stats.executed_count} "
- f"idle_count={stats.idle_count} "
- f"error_count={stats.error_count}",
- flush=True)
- if __name__ == "__main__":
- main()
|