| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- from datetime import datetime
- from core_domain import MemoryScopeType, MemoryStatus
- from app.application.retrieval import (
- build_hash_embedding,
- cosine_similarity,
- keyword_score,
- rerank_score,
- )
- from app.bootstrap.settings import MemoryServiceSettings
- from app.db.models import MemoryItem
- from app.domain.repositories import MemoryItemRepository
- from app.schemas.memory import (
- MemoryCreateRequest,
- MemorySearchRequest,
- MemoryStatusUpdateRequest,
- )
- class MemoryApplicationService:
- def __init__(
- self,
- *,
- memory_repository: MemoryItemRepository,
- settings: MemoryServiceSettings | None = None,
- ) -> None:
- self.memory_repository = memory_repository
- self.settings = settings or MemoryServiceSettings()
- def create_memory(self, payload: MemoryCreateRequest) -> MemoryItem:
- embedding_json = build_hash_embedding(
- payload.content_text,
- dimensions=self.settings.embedding_dimensions,
- )
- return self.memory_repository.create(
- tenant_id=payload.tenant_id,
- scope_type=payload.scope_type,
- scope_id=payload.scope_id,
- memory_type=payload.memory_type,
- content_text=payload.content_text,
- content_json=payload.content_json,
- metadata_json=payload.metadata_json,
- embedding_model=self.settings.embedding_model,
- embedding_json=embedding_json,
- owner_agent_id=payload.owner_agent_id,
- user_id=payload.user_id,
- session_id=payload.session_id,
- source_ref=payload.source_ref,
- importance_score=payload.importance_score,
- expires_time=payload.expires_time,
- )
- def list_memories(
- self,
- *,
- tenant_id: str,
- scope_type: MemoryScopeType | None = None,
- scope_id: str | None = None,
- status: MemoryStatus | None = "active",
- limit: int = 100,
- ) -> list[MemoryItem]:
- return self.memory_repository.list_by_scope(
- tenant_id=tenant_id,
- scope_type=scope_type,
- scope_id=scope_id,
- status=status,
- limit=limit,
- )
- def search_memories(
- self,
- payload: MemorySearchRequest,
- ) -> list[tuple[MemoryItem, float, dict[str, float | str]]]:
- query_embedding = build_hash_embedding(
- payload.query,
- dimensions=self.settings.embedding_dimensions,
- )
- candidates = self.memory_repository.search_candidates(
- tenant_id=payload.tenant_id,
- scope_type=payload.scope_type,
- scope_id=payload.scope_id,
- owner_agent_id=payload.owner_agent_id,
- user_id=payload.user_id,
- session_id=payload.session_id,
- limit=max(payload.limit * 10, payload.limit),
- )
- scored_items = [
- self._score(item=item, query=payload.query, query_embedding=query_embedding)
- for item in candidates
- ]
- scored_items.sort(key=lambda item: item[1], reverse=True)
- items = [item for item, _, _ in scored_items[: payload.limit]]
- now = datetime.utcnow()
- self.memory_repository.touch_many(memory_ids=[item.id for item in items], accessed_time=now)
- return scored_items[: payload.limit]
- def update_memory_status(
- self,
- *,
- memory_id: str,
- payload: MemoryStatusUpdateRequest,
- ) -> MemoryItem | None:
- return self.memory_repository.update_status(
- tenant_id=payload.tenant_id,
- memory_id=memory_id,
- status=payload.status,
- )
- def _score(
- self,
- *,
- item: MemoryItem,
- query: str,
- query_embedding: list[float],
- ) -> tuple[MemoryItem, float, dict[str, float | str]]:
- keyword = keyword_score(query, item.content_text)
- vector = cosine_similarity(query_embedding, item.embedding_json)
- score = rerank_score(
- keyword=keyword,
- vector=vector,
- importance_score=item.importance_score,
- )
- return item, score, {
- "keyword_score": round(keyword, 6),
- "vector_score": round(vector, 6),
- "importance_score": float(item.importance_score),
- "embedding_model": item.embedding_model or self.settings.embedding_model,
- "rerank_mode": "hybrid-local",
- }
|