import httpx from core_domain import ( MemoryCreateContract, MemoryItemContract, MemorySearchRequestContract, MemorySearchResultContract, ) class MemoryClientError(Exception): pass class MemoryClient: def __init__(self, *, base_url: str, timeout_seconds: float = 10.0) -> None: self.base_url = base_url.rstrip("/") self.timeout_seconds = timeout_seconds def create_memory(self, payload: MemoryCreateContract) -> MemoryItemContract: try: with httpx.Client(timeout=self.timeout_seconds) as client: response = client.post( f"{self.base_url}/memories/create", json=_create_payload_to_contract(payload)) response.raise_for_status() return MemoryItemContract.model_validate(_memory_dto_to_contract(_unwrap(response.json()))) except httpx.HTTPError as exc: raise MemoryClientError(f"memory-service create request failed: {exc}") from exc def search_memories( self, payload: MemorySearchRequestContract) -> list[MemorySearchResultContract]: try: with httpx.Client(timeout=self.timeout_seconds) as client: response = client.post( f"{self.base_url}/memories/search/query", json=_search_payload_to_contract(payload)) response.raise_for_status() return [ MemorySearchResultContract.model_validate({ "item": _memory_dto_to_contract(item["item"]), "score": item["score"], "score_json": item.get("scoreDetails", {}), }) for item in _unwrap(response.json()) ] except httpx.HTTPError as exc: raise MemoryClientError(f"memory-service search request failed: {exc}") from exc def _unwrap(payload: dict) -> object: if not payload.get("success", False): message = payload.get("error", {}).get("message", "memory-service request failed") raise MemoryClientError(str(message)) return payload.get("data") def _create_payload_to_contract(payload: MemoryCreateContract) -> dict: data = payload.model_dump(mode="json") return { "scopeType": data["scope_type"], "scopeId": data["scope_id"], "memoryType": data.get("memory_type", "fact"), "contentText": data["content_text"], "content": data.get("content_json"), "metadata": data.get("metadata_json", {}), "ownerAgentId": data.get("owner_agent_id"), "userId": data.get("user_id"), "sessionId": data.get("session_id"), "sourceRef": data.get("source_ref"), "importanceScore": data.get("importance_score", 0), "expiresTime": data.get("expires_time"), } def _search_payload_to_contract(payload: MemorySearchRequestContract) -> dict: data = payload.model_dump(mode="json") return { "query": data["query"], "scopeType": data.get("scope_type"), "scopeId": data.get("scope_id"), "ownerAgentId": data.get("owner_agent_id"), "userId": data.get("user_id"), "sessionId": data.get("session_id"), "limit": data.get("limit", 8), } def _memory_dto_to_contract(item: object) -> dict: if not isinstance(item, dict): raise MemoryClientError("invalid memory-service response") return { "id": item["id"], "scope_type": item["scopeType"], "scope_id": item["scopeId"], "memory_type": item["memoryType"], "content_text": item["contentText"], "content_json": item.get("content"), "metadata_json": item.get("metadata", {}), "embedding_model": item.get("embeddingModel"), "embedding_json": item.get("embedding"), "owner_agent_id": item.get("ownerAgentId"), "user_id": item.get("userId"), "session_id": item.get("sessionId"), "source_ref": item.get("sourceRef"), "importance_score": item.get("importanceScore", 0), "status": item["status"], "last_accessed_time": item.get("lastAccessedTime"), "expires_time": item.get("expiresTime"), "created_time": item["createdTime"], }