| 123456789101112131415161718192021222324 |
from urllib.parse import quote

import httpx

from core_domain import WorkflowConfigContract
class WorkflowServiceClientError(Exception):
    """Raised when a request to workflow-service fails."""
class WorkflowServiceClient:
    """Synchronous HTTP client for reading workflow configs from workflow-service.

    A new ``httpx.Client`` is opened for each request and closed when the
    request finishes.
    """

    def __init__(self, base_url: str, timeout_seconds: float = 10.0) -> None:
        """Store the connection settings.

        Args:
            base_url: Root URL of the service. Trailing ``/`` characters are
                stripped so request paths can be appended with one slash.
            timeout_seconds: Timeout passed to ``httpx.Client`` for every
                request.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout_seconds = timeout_seconds

    def _config_url(self, workflow_config_id: str) -> str:
        """Return the URL of the config resource for *workflow_config_id*."""
        # Encode the ID as exactly one path segment: with safe="" even "/" is
        # escaped, so an ID containing "/", "?", "#" or ".." cannot alter the
        # route, query string or fragment of the request. str() keeps the
        # f-string's tolerance for non-str IDs.
        segment = quote(str(workflow_config_id), safe="")
        return f"{self.base_url}/workflows/configs/{segment}"

    def get_workflow_config(
        self,
        *,
        workflow_config_id: str,
    ) -> WorkflowConfigContract:
        """Fetch one workflow config and validate it into a contract object.

        Args:
            workflow_config_id: Identifier of the config to fetch; it is
                percent-encoded before being placed in the URL path.

        Returns:
            The result of ``WorkflowConfigContract.model_validate`` applied to
            the JSON body of the response.

        Raises:
            WorkflowServiceClientError: If ``httpx`` raises ``httpx.HTTPError``
                while sending the request or from ``raise_for_status()``.
                Exceptions of other types (for example from decoding or
                validating the body) are not caught here and propagate
                unchanged.
        """
        url = self._config_url(workflow_config_id)
        try:
            with httpx.Client(timeout=self.timeout_seconds) as client:
                response = client.get(url)
                response.raise_for_status()
                return WorkflowConfigContract.model_validate(response.json())
        except httpx.HTTPError as exc:
            raise WorkflowServiceClientError(f"workflow-service request failed: {exc}") from exc
|