| 123456789101112131415161718192021222324252627282930313233 |
- import httpx
- from core_domain import KnowledgeSearchRequestContract, KnowledgeSearchResultContract
- class KnowledgeServiceClientError(Exception):
- pass
- class KnowledgeServiceClient:
- def __init__(self, base_url: str, timeout_seconds: float = 10.0) -> None:
- self.base_url = base_url.rstrip("/")
- self.timeout_seconds = timeout_seconds
- def search(
- self,
- payload: KnowledgeSearchRequestContract,
- ) -> list[KnowledgeSearchResultContract]:
- try:
- with httpx.Client(timeout=self.timeout_seconds) as client:
- response = client.post(
- f"{self.base_url}/knowledge/search",
- json=payload.model_dump(mode="json"),
- )
- response.raise_for_status()
- return [
- KnowledgeSearchResultContract.model_validate(item)
- for item in response.json()
- ]
- except httpx.HTTPError as exc:
- raise KnowledgeServiceClientError(
- f"knowledge-service search failed: {exc}"
- ) from exc
|