knowledge_client.py 1.1 KB

123456789101112131415161718192021222324252627282930313233
  1. import httpx
  2. from core_domain import KnowledgeSearchRequestContract, KnowledgeSearchResultContract
  3. class KnowledgeServiceClientError(Exception):
  4. pass
  5. class KnowledgeServiceClient:
  6. def __init__(self, base_url: str, timeout_seconds: float = 10.0) -> None:
  7. self.base_url = base_url.rstrip("/")
  8. self.timeout_seconds = timeout_seconds
  9. def search(
  10. self,
  11. payload: KnowledgeSearchRequestContract,
  12. ) -> list[KnowledgeSearchResultContract]:
  13. try:
  14. with httpx.Client(timeout=self.timeout_seconds) as client:
  15. response = client.post(
  16. f"{self.base_url}/knowledge/search",
  17. json=payload.model_dump(mode="json"),
  18. )
  19. response.raise_for_status()
  20. return [
  21. KnowledgeSearchResultContract.model_validate(item)
  22. for item in response.json()
  23. ]
  24. except httpx.HTTPError as exc:
  25. raise KnowledgeServiceClientError(
  26. f"knowledge-service search failed: {exc}"
  27. ) from exc