| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- import httpx
- from core_domain import (
- MemoryCreateContract,
- MemoryItemContract,
- MemorySearchRequestContract,
- MemorySearchResultContract,
- )
- class MemoryClientError(Exception):
- pass
- class MemoryClient:
- def __init__(self, *, base_url: str, timeout_seconds: float = 10.0) -> None:
- self.base_url = base_url.rstrip("/")
- self.timeout_seconds = timeout_seconds
- def create_memory(self, payload: MemoryCreateContract) -> MemoryItemContract:
- try:
- with httpx.Client(timeout=self.timeout_seconds) as client:
- response = client.post(
- f"{self.base_url}/memories/create",
- json=_create_payload_to_contract(payload))
- response.raise_for_status()
- return MemoryItemContract.model_validate(_memory_dto_to_contract(_unwrap(response.json())))
- except httpx.HTTPError as exc:
- raise MemoryClientError(f"memory-service create request failed: {exc}") from exc
- def search_memories(
- self,
- payload: MemorySearchRequestContract) -> list[MemorySearchResultContract]:
- try:
- with httpx.Client(timeout=self.timeout_seconds) as client:
- response = client.post(
- f"{self.base_url}/memories/search/query",
- json=_search_payload_to_contract(payload))
- response.raise_for_status()
- return [
- MemorySearchResultContract.model_validate({
- "item": _memory_dto_to_contract(item["item"]),
- "score": item["score"],
- "score_json": item.get("scoreDetails", {}),
- })
- for item in _unwrap(response.json())
- ]
- except httpx.HTTPError as exc:
- raise MemoryClientError(f"memory-service search request failed: {exc}") from exc
- def _unwrap(payload: dict) -> object:
- if not payload.get("success", False):
- message = payload.get("error", {}).get("message", "memory-service request failed")
- raise MemoryClientError(str(message))
- return payload.get("data")
- def _create_payload_to_contract(payload: MemoryCreateContract) -> dict:
- data = payload.model_dump(mode="json")
- return {
- "scopeType": data["scope_type"],
- "scopeId": data["scope_id"],
- "memoryType": data.get("memory_type", "fact"),
- "contentText": data["content_text"],
- "content": data.get("content_json"),
- "metadata": data.get("metadata_json", {}),
- "ownerAgentId": data.get("owner_agent_id"),
- "userId": data.get("user_id"),
- "sessionId": data.get("session_id"),
- "sourceRef": data.get("source_ref"),
- "importanceScore": data.get("importance_score", 0),
- "expiresTime": data.get("expires_time"),
- }
- def _search_payload_to_contract(payload: MemorySearchRequestContract) -> dict:
- data = payload.model_dump(mode="json")
- return {
- "query": data["query"],
- "scopeType": data.get("scope_type"),
- "scopeId": data.get("scope_id"),
- "ownerAgentId": data.get("owner_agent_id"),
- "userId": data.get("user_id"),
- "sessionId": data.get("session_id"),
- "limit": data.get("limit", 8),
- }
- def _memory_dto_to_contract(item: object) -> dict:
- if not isinstance(item, dict):
- raise MemoryClientError("invalid memory-service response")
- return {
- "id": item["id"],
- "scope_type": item["scopeType"],
- "scope_id": item["scopeId"],
- "memory_type": item["memoryType"],
- "content_text": item["contentText"],
- "content_json": item.get("content"),
- "metadata_json": item.get("metadata", {}),
- "embedding_model": item.get("embeddingModel"),
- "embedding_json": item.get("embedding"),
- "owner_agent_id": item.get("ownerAgentId"),
- "user_id": item.get("userId"),
- "session_id": item.get("sessionId"),
- "source_ref": item.get("sourceRef"),
- "importance_score": item.get("importanceScore", 0),
- "status": item["status"],
- "last_accessed_time": item.get("lastAccessedTime"),
- "expires_time": item.get("expiresTime"),
- "created_time": item["createdTime"],
- }
|