human_client.py 1.4 KB

12345678910111213141516171819202122232425262728293031323334
from urllib.parse import quote

import httpx

from core_domain import HumanTaskContract, HumanTaskCreateContract
  3. class HumanServiceClientError(Exception):
  4. pass
  5. class HumanServiceClient:
  6. def __init__(self, base_url: str, timeout_seconds: float = 10.0) -> None:
  7. self.base_url = base_url.rstrip("/")
  8. self.timeout_seconds = timeout_seconds
  9. def create_task(self, payload: HumanTaskCreateContract) -> HumanTaskContract:
  10. try:
  11. with httpx.Client(timeout=self.timeout_seconds) as client:
  12. response = client.post(
  13. f"{self.base_url}/human/tasks",
  14. json=payload.model_dump(mode="json"))
  15. response.raise_for_status()
  16. return HumanTaskContract.model_validate(response.json())
  17. except httpx.HTTPError as exc:
  18. raise HumanServiceClientError(
  19. f"human-service create task failed: {exc}"
  20. ) from exc
  21. def get_task(self, *, human_task_id: str) -> HumanTaskContract:
  22. try:
  23. with httpx.Client(timeout=self.timeout_seconds) as client:
  24. response = client.get(f"{self.base_url}/human/tasks/{human_task_id}")
  25. response.raise_for_status()
  26. return HumanTaskContract.model_validate(response.json())
  27. except httpx.HTTPError as exc:
  28. raise HumanServiceClientError(f"human-service get task failed: {exc}") from exc