# workflow_client.py
from urllib.parse import quote

import httpx

from core_domain import WorkflowVersionContract
  3. class WorkflowServiceClientError(Exception):
  4. pass
  5. class WorkflowServiceClient:
  6. def __init__(self, base_url: str, timeout_seconds: float = 10.0) -> None:
  7. self.base_url = base_url.rstrip("/")
  8. self.timeout_seconds = timeout_seconds
  9. def get_workflow_version(self, *, tenant_id: str, workflow_version_id: str) -> WorkflowVersionContract:
  10. try:
  11. with httpx.Client(timeout=self.timeout_seconds) as client:
  12. response = client.get(
  13. f"{self.base_url}/workflows/versions/{workflow_version_id}",
  14. params={"tenant_id": tenant_id},
  15. )
  16. response.raise_for_status()
  17. return WorkflowVersionContract.model_validate(response.json())
  18. except httpx.HTTPError as exc:
  19. raise WorkflowServiceClientError(f"workflow-service request failed: {exc}") from exc