| 12345678910111213141516171819202122232425262728293031323334 |
- import httpx
- from core_domain import HumanTaskContract, HumanTaskCreateContract
class HumanServiceClientError(Exception):
    """Raised by ``HumanServiceClient`` when a human-service HTTP call fails."""
class HumanServiceClient:
    """Synchronous HTTP client for the human-service task endpoints.

    Every call opens a short-lived ``httpx.Client``, sends a single request
    and validates the JSON response body with
    ``HumanTaskContract.model_validate``.

    Any ``httpx.HTTPError`` (including the one raised by
    ``Response.raise_for_status``) is re-raised as
    ``HumanServiceClientError`` with the original exception chained as its
    cause.  Exceptions that are not ``httpx.HTTPError`` -- for example ones
    raised while decoding or validating the response body -- are not caught
    here and propagate unchanged.
    """

    def __init__(self, base_url: str, timeout_seconds: float = 10.0) -> None:
        """Store the service location and the per-request timeout.

        Args:
            base_url: Root URL of the human-service; trailing ``/``
                characters are stripped so paths can be appended with a
                single separator.
            timeout_seconds: Value passed as ``timeout`` to every
                ``httpx.Client`` this object creates.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout_seconds = timeout_seconds

    def create_task(self, payload: HumanTaskCreateContract) -> HumanTaskContract:
        """POST ``payload`` to ``/human/tasks`` and return the validated task.

        Args:
            payload: Task definition; serialised with
                ``model_dump(mode="json")`` and sent as the JSON body.

        Returns:
            The response body validated into a ``HumanTaskContract``.

        Raises:
            HumanServiceClientError: If the request raises
                ``httpx.HTTPError``.
        """
        return self._send(
            "POST",
            f"{self.base_url}/human/tasks",
            action="create task",
            json_body=payload.model_dump(mode="json"),
        )

    def get_task(self, *, human_task_id: str) -> HumanTaskContract:
        """GET ``/human/tasks/{human_task_id}`` and return the validated task.

        Args:
            human_task_id: Identifier of the task to fetch (keyword-only).
                It is percent-encoded before being placed in the URL path.

        Returns:
            The response body validated into a ``HumanTaskContract``.

        Raises:
            HumanServiceClientError: If the request raises
                ``httpx.HTTPError``.
        """
        return self._send("GET", self._task_url(human_task_id), action="get task")

    def _task_url(self, human_task_id: str) -> str:
        """Build the URL of a single task with the id as one path segment."""
        # Local import keeps this block self-contained without touching the
        # file-level import list.
        from urllib.parse import quote

        # safe="" also encodes "/", so an id containing "/", "?" or "#" stays
        # a single path segment instead of changing the route or the query.
        # str() keeps non-str ids working the way f-string interpolation did.
        segment = quote(str(human_task_id), safe="")
        return f"{self.base_url}/human/tasks/{segment}"

    def _send(self, method: str, url: str, *, action: str, json_body=None) -> HumanTaskContract:
        """Send one request and validate the response as a task contract.

        Args:
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            url: Absolute URL to call.
            action: Short operation label used in the error message
                (``"human-service <action> failed: ..."``).
            json_body: Optional JSON-serialisable request body; ``None``
                sends the request without a JSON body.

        Raises:
            HumanServiceClientError: If ``httpx`` raises ``httpx.HTTPError``
                while sending the request or from ``raise_for_status``.
        """
        try:
            with httpx.Client(timeout=self.timeout_seconds) as client:
                response = client.request(method, url, json=json_body)
                response.raise_for_status()
                return HumanTaskContract.model_validate(response.json())
        except httpx.HTTPError as exc:
            raise HumanServiceClientError(
                f"human-service {action} failed: {exc}"
            ) from exc
|