| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
from datetime import datetime, timezone

from core_domain import MemoryScopeType, MemoryStatus

from app.db.models import MemoryItem
from app.domain.repositories import MemoryItemRepository
from app.schemas.memory import (
    MemoryCreateRequest,
    MemorySearchRequest,
    MemoryStatusUpdateRequest,
)
class MemoryApplicationService:
    """Application-level facade over a ``MemoryItemRepository``.

    Each public method unpacks a request schema (or keyword arguments) and
    forwards the values to the repository. The only logic owned by this class
    is the relevance scoring applied to search results.
    """

    def __init__(self, *, memory_repository: MemoryItemRepository) -> None:
        """Keep a reference to the repository every operation delegates to."""
        self.memory_repository = memory_repository

    def create_memory(self, payload: MemoryCreateRequest) -> MemoryItem:
        """Create a memory item from *payload* and return the repository result.

        Every field of the request is forwarded unchanged to
        ``memory_repository.create``.
        """
        return self.memory_repository.create(
            tenant_id=payload.tenant_id,
            scope_type=payload.scope_type,
            scope_id=payload.scope_id,
            memory_type=payload.memory_type,
            content_text=payload.content_text,
            content_json=payload.content_json,
            metadata_json=payload.metadata_json,
            owner_agent_id=payload.owner_agent_id,
            user_id=payload.user_id,
            session_id=payload.session_id,
            source_ref=payload.source_ref,
            importance_score=payload.importance_score,
            expires_time=payload.expires_time,
        )

    def list_memories(
        self,
        *,
        tenant_id: str,
        scope_type: MemoryScopeType | None = None,
        scope_id: str | None = None,
        status: MemoryStatus | None = "active",
        limit: int = 100,
    ) -> list[MemoryItem]:
        """List memory items for a tenant via ``memory_repository.list_by_scope``.

        Args:
            tenant_id: Tenant whose memories are listed.
            scope_type: Optional scope type forwarded to the repository.
            scope_id: Optional scope identifier forwarded to the repository.
            status: Status forwarded to the repository; defaults to the
                literal ``"active"``.
            limit: Maximum number of items requested (default 100).

        Returns:
            Whatever ``list_by_scope`` returns for these arguments.
        """
        return self.memory_repository.list_by_scope(
            tenant_id=tenant_id,
            scope_type=scope_type,
            scope_id=scope_id,
            status=status,
            limit=limit,
        )

    def search_memories(self, payload: MemorySearchRequest) -> list[tuple[MemoryItem, float]]:
        """Search memories, record the access, and attach a score to each hit.

        The hits are returned in the order the repository produced them; the
        score is attached but not used for re-sorting here.

        Returns:
            A list of ``(item, score)`` pairs, empty when nothing matched.
        """
        items = self.memory_repository.search(
            tenant_id=payload.tenant_id,
            query=payload.query,
            scope_type=payload.scope_type,
            scope_id=payload.scope_id,
            owner_agent_id=payload.owner_agent_id,
            user_id=payload.user_id,
            session_id=payload.session_id,
            limit=payload.limit,
        )
        if not items:
            # Nothing was accessed, so skip the repository write entirely.
            return []

        # Naive UTC timestamp, identical in value to the deprecated
        # ``datetime.utcnow()`` this replaces.
        accessed_time = datetime.now(timezone.utc).replace(tzinfo=None)
        self.memory_repository.touch_many(
            memory_ids=[item.id for item in items],
            accessed_time=accessed_time,
        )
        return [(item, self._score(item=item, query=payload.query)) for item in items]

    def update_memory_status(
        self,
        *,
        memory_id: str,
        payload: MemoryStatusUpdateRequest,
    ) -> MemoryItem | None:
        """Set the status of one memory via ``memory_repository.update_status``.

        Returns:
            The repository result. NOTE(review): presumably ``None`` means no
            memory matched ``tenant_id``/``memory_id`` — confirm against the
            repository implementation.
        """
        return self.memory_repository.update_status(
            tenant_id=payload.tenant_id,
            memory_id=memory_id,
            status=payload.status,
        )

    def _score(self, *, item: MemoryItem, query: str) -> float:
        """Return a relevance score for *item* against *query*.

        The score is the sum of two parts, rounded to four decimals:

        * 1.0 when the lower-cased query is a substring of the lower-cased
          ``content_text``, otherwise 0.0;
        * ``importance_score`` capped at 100 and divided by 100.

        A missing ``content_text`` is treated as empty text and a missing
        ``importance_score`` as 0, so items without those fields can still
        be scored instead of raising.
        """
        content = (item.content_text or "").lower()
        exact_bonus = 1.0 if query.lower() in content else 0.0
        importance = item.importance_score or 0
        importance_bonus = min(importance, 100) / 100
        return round(exact_bonus + importance_bonus, 4)
|