from datetime import datetime, timezone

from core_domain import MemoryScopeType, MemoryStatus

from app.application.retrieval import (
    build_hash_embedding,
    cosine_similarity,
    keyword_score,
    rerank_score,
)
from app.bootstrap.settings import MemoryServiceSettings
from app.db.models import MemoryItem
from app.domain.repositories import MemoryItemRepository
from app.schemas.memory import (
    MemoryCreateRequest,
    MemorySearchRequest,
    MemoryStatusUpdateRequest,
)


class MemoryApplicationService:
    """Application-level operations on memory items.

    Covers creating, listing, searching and updating the status of memory
    items. Persistence is delegated to the injected ``MemoryItemRepository``;
    embedding and scoring use the helpers from ``app.application.retrieval``.
    """

    def __init__(
        self,
        *,
        memory_repository: MemoryItemRepository,
        settings: MemoryServiceSettings | None = None,
    ) -> None:
        """Store the repository and the service settings.

        Args:
            memory_repository: Repository used for every persistence call.
            settings: Service settings. When omitted (or otherwise falsy) a
                default-constructed ``MemoryServiceSettings`` is used.
        """
        self.memory_repository = memory_repository
        self.settings = settings or MemoryServiceSettings()

    def create_memory(self, payload: MemoryCreateRequest) -> MemoryItem:
        """Embed the payload's text and persist a new memory item.

        The embedding is built from ``payload.content_text`` with the
        configured number of dimensions and stored next to the configured
        embedding model name. All other fields are forwarded from the payload.

        Returns:
            Whatever ``memory_repository.create`` returns.
        """
        embedding_json = build_hash_embedding(
            payload.content_text,
            dimensions=self.settings.embedding_dimensions,
        )
        return self.memory_repository.create(
            scope_type=payload.scope_type,
            scope_id=payload.scope_id,
            memory_type=payload.memory_type,
            content_text=payload.content_text,
            content_json=payload.content_json,
            metadata_json=payload.metadata_json,
            embedding_model=self.settings.embedding_model,
            embedding_json=embedding_json,
            owner_agent_id=payload.owner_agent_id,
            user_id=payload.user_id,
            session_id=payload.session_id,
            source_ref=payload.source_ref,
            importance_score=payload.importance_score,
            expires_time=payload.expires_time,
        )

    def list_memories(
        self,
        *,
        scope_type: MemoryScopeType | None = None,
        scope_id: str | None = None,
        status: MemoryStatus | None = "active",
        limit: int = 100,
    ) -> list[MemoryItem]:
        """List memory items by delegating to ``list_by_scope``.

        All arguments are passed through to the repository unchanged.
        """
        return self.memory_repository.list_by_scope(
            scope_type=scope_type,
            scope_id=scope_id,
            status=status,
            limit=limit,
        )

    def search_memories(
        self,
        payload: MemorySearchRequest,
    ) -> list[tuple[MemoryItem, float, dict[str, float | str]]]:
        """Return the best-scoring memory items for a query.

        Steps:
            1. Embed ``payload.query``.
            2. Ask the repository for candidates using the payload's scope and
               owner filters, over-fetching relative to ``payload.limit``.
            3. Score every candidate with ``_score`` and sort by score,
               highest first.
            4. Keep the first ``payload.limit`` results and pass their ids,
               with the current UTC time, to ``touch_many``.

        Returns:
            ``(item, score, details)`` tuples for the kept results, in
            descending score order.
        """
        query_embedding = build_hash_embedding(
            payload.query,
            dimensions=self.settings.embedding_dimensions,
        )
        candidates = self.memory_repository.search_candidates(
            scope_type=payload.scope_type,
            scope_id=payload.scope_id,
            owner_agent_id=payload.owner_agent_id,
            user_id=payload.user_id,
            session_id=payload.session_id,
            # Fetch more candidates than requested so that re-ranking has
            # something to choose from (limit * 10 for a non-negative limit).
            limit=max(payload.limit * 10, payload.limit),
        )
        scored_items = [
            self._score(item=item, query=payload.query, query_embedding=query_embedding)
            for item in candidates
        ]
        scored_items.sort(key=lambda entry: entry[1], reverse=True)

        # Slice once and reuse the result for both the touch and the return.
        top_scored = scored_items[: payload.limit]

        # datetime.utcnow() is deprecated since Python 3.12. This expression
        # produces the same naive UTC timestamp, so the value handed to the
        # repository is unchanged.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        self.memory_repository.touch_many(
            memory_ids=[item.id for item, _, _ in top_scored],
            accessed_time=now,
        )
        return top_scored

    def update_memory_status(
        self,
        *,
        memory_id: str,
        payload: MemoryStatusUpdateRequest,
    ) -> MemoryItem | None:
        """Set a memory item's status via ``update_status``.

        Returns:
            Whatever the repository returns; the annotation allows ``None``
            (presumably when no item matches ``memory_id`` - confirm against
            the repository implementation).
        """
        return self.memory_repository.update_status(
            memory_id=memory_id,
            status=payload.status,
        )

    def _score(
        self,
        *,
        item: MemoryItem,
        query: str,
        query_embedding: list[float],
    ) -> tuple[MemoryItem, float, dict[str, float | str]]:
        """Score one candidate against the query.

        Combines a keyword score (query vs. the item's text), a cosine
        similarity (query embedding vs. the item's stored embedding) and the
        item's importance score through ``rerank_score``.

        Returns:
            ``(item, score, details)`` where ``details`` holds the rounded
            component scores, the importance score, the embedding model name
            (falling back to the configured one when the item has none) and
            the fixed rerank mode label.
        """
        keyword = keyword_score(query, item.content_text)
        vector = cosine_similarity(query_embedding, item.embedding_json)
        score = rerank_score(
            keyword=keyword,
            vector=vector,
            importance_score=item.importance_score,
        )
        details: dict[str, float | str] = {
            "keyword_score": round(keyword, 6),
            "vector_score": round(vector, 6),
            "importance_score": float(item.importance_score),
            "embedding_model": item.embedding_model or self.settings.embedding_model,
            "rerank_mode": "hybrid-local",
        }
        return item, score, details