workflow_client.py 871 B

123456789101112131415161718192021222324
  1. import httpx
  2. from core_domain import WorkflowConfigContract
  3. class WorkflowServiceClientError(Exception):
  4. pass
  5. class WorkflowServiceClient:
  6. def __init__(self, base_url: str, timeout_seconds: float = 10.0) -> None:
  7. self.base_url = base_url.rstrip("/")
  8. self.timeout_seconds = timeout_seconds
  9. def get_workflow_config(
  10. self,
  11. *,
  12. workflow_config_id: str) -> WorkflowConfigContract:
  13. try:
  14. with httpx.Client(timeout=self.timeout_seconds) as client:
  15. response = client.get(f"{self.base_url}/workflows/configs/{workflow_config_id}")
  16. response.raise_for_status()
  17. return WorkflowConfigContract.model_validate(response.json())
  18. except httpx.HTTPError as exc:
  19. raise WorkflowServiceClientError(f"workflow-service request failed: {exc}") from exc