knowledge_client.py 1.1 KB

123456789101112131415161718192021222324252627282930
  1. import httpx
  2. from core_domain import KnowledgeSearchRequestContract, KnowledgeSearchResultContract
  3. class KnowledgeServiceClientError(Exception):
  4. pass
  5. class KnowledgeServiceClient:
  6. def __init__(self, base_url: str, timeout_seconds: float = 10.0) -> None:
  7. self.base_url = base_url.rstrip("/")
  8. self.timeout_seconds = timeout_seconds
  9. def search(
  10. self,
  11. payload: KnowledgeSearchRequestContract) -> list[KnowledgeSearchResultContract]:
  12. try:
  13. with httpx.Client(timeout=self.timeout_seconds) as client:
  14. response = client.post(
  15. f"{self.base_url}/knowledge/search",
  16. json=payload.model_dump(mode="json"))
  17. response.raise_for_status()
  18. return [
  19. KnowledgeSearchResultContract.model_validate(item)
  20. for item in response.json()
  21. ]
  22. except httpx.HTTPError as exc:
  23. raise KnowledgeServiceClientError(
  24. f"knowledge-service search failed: {exc}"
  25. ) from exc