| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- import httpx
- from core_domain import ToolBindingDetailContract
- from core_shared import JSONValue
class ToolServiceClientError(Exception):
    """Raised by ``ToolServiceClient`` when a lookup or tool invocation fails."""
class ToolServiceClient:
    """Synchronous HTTP client for the tool service and for HTTP-backed tools.

    Transport failures, malformed URLs and undecodable lookup responses are
    all reported as ``ToolServiceClientError`` so callers have a single
    exception type to handle for request problems.
    """

    def __init__(self, base_url: str, timeout_seconds: float = 10.0) -> None:
        """Store the connection settings.

        Args:
            base_url: Root URL of the tool service. Trailing slashes are
                stripped so request paths can be appended with one ``/``.
            timeout_seconds: Timeout handed to every ``httpx.Client`` created
                by this instance.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout_seconds = timeout_seconds

    def get_tool_binding_detail(
        self,
        *,
        tenant_id: str,
        binding_id: str,
    ) -> ToolBindingDetailContract:
        """Fetch one tool binding from the tool service.

        Issues ``GET {base_url}/tools/bindings/{binding_id}`` with
        ``tenant_id`` as a query parameter and validates the JSON body with
        ``ToolBindingDetailContract.model_validate``.

        Args:
            tenant_id: Sent as the ``tenant_id`` query parameter.
            binding_id: Interpolated into the request path as-is.

        Returns:
            The validated binding detail.

        Raises:
            ToolServiceClientError: The request failed, returned an error
                status, used an invalid URL, or the body was not valid JSON.
                Errors raised by ``model_validate`` itself are not wrapped
                and propagate unchanged.
        """
        try:
            with httpx.Client(timeout=self.timeout_seconds) as client:
                response = client.get(
                    f"{self.base_url}/tools/bindings/{binding_id}",
                    params={"tenant_id": tenant_id},
                )
                response.raise_for_status()
        # httpx.InvalidURL does not derive from httpx.HTTPError, so it has to
        # be listed explicitly or a malformed base_url escapes unwrapped.
        except (httpx.HTTPError, httpx.InvalidURL) as exc:
            raise ToolServiceClientError(f"tool-service lookup failed: {exc}") from exc

        # A 2xx response with a non-JSON body makes response.json() raise a
        # ValueError subclass; report it as a lookup failure as well.
        try:
            payload = response.json()
        except ValueError as exc:
            raise ToolServiceClientError(f"tool-service lookup failed: {exc}") from exc

        return ToolBindingDetailContract.model_validate(payload)

    def invoke_http_tool(
        self,
        *,
        detail: ToolBindingDetailContract,
        input_json: dict[str, JSONValue],
        config_json: dict[str, JSONValue],
    ) -> tuple[str | None, dict[str, JSONValue]]:
        """Call the HTTP endpoint described by a tool binding.

        Request settings are layered from three sources: the tool version's
        ``invoke_config_json``, the binding's ``config_json`` and the
        per-call ``config_json`` argument.

        * ``url`` / ``path``: per-call config first, then invoke config.
        * ``base_url``: per-call config, then binding config, then invoke
          config.
        * ``method``: invoke config only, defaulting to ``GET``; upper-cased.
        * ``headers``: invoke config, overridden by binding config,
          overridden by per-call config.
        * ``query`` / ``body``: invoke config overridden by per-call config.

        When no body is configured and the method is neither ``GET`` nor
        ``HEAD``, ``input_json`` is sent as the JSON body. No JSON body is
        sent for ``GET`` / ``HEAD``.

        Args:
            detail: Binding detail supplying the stored configuration and
                the identifiers echoed back in the result metadata.
            input_json: Fallback JSON body for body-carrying methods.
            config_json: Per-call overrides.

        Returns:
            ``(response_text, metadata)``. ``response_text`` is ``None`` when
            the response was parsed as JSON, otherwise the raw response text.
            ``metadata`` holds the tool identifiers, the request URL and
            method, the response status code and the parsed ``response_json``.

        Raises:
            ToolServiceClientError: No URL could be resolved, the URL was
                invalid, the request failed, or an error status was returned.
        """
        invoke_config_json = detail.tool_version.invoke_config_json or {}
        binding_config_json = detail.binding.config_json or {}

        url = _read_string(config_json, "url") or _read_string(invoke_config_json, "url")
        base_url = (
            _read_string(config_json, "base_url")
            or _read_string(binding_config_json, "base_url")
            or _read_string(invoke_config_json, "base_url")
        )
        path = _read_string(config_json, "path") or _read_string(invoke_config_json, "path")
        resolved_url = _resolve_url(url=url, base_url=base_url, path=path)
        if resolved_url is None:
            raise ToolServiceClientError("http tool requires url or base_url/path")

        method = (_read_string(invoke_config_json, "method") or "GET").upper()
        sends_body = method not in {"GET", "HEAD"}

        # Later arguments win on key collisions in each merge below.
        headers = _merge_string_maps(
            _read_dict(invoke_config_json, "headers"),
            _read_dict(binding_config_json, "headers"),
            _read_dict(config_json, "headers"),
        )
        params = _merge_string_maps(
            _read_dict(invoke_config_json, "query"),
            _read_dict(config_json, "query"),
        )
        body_json = _merge_json_dicts(
            _read_dict(invoke_config_json, "body"),
            _read_dict(config_json, "body"),
        )
        if not body_json and sends_body:
            body_json = input_json

        try:
            with httpx.Client(timeout=self.timeout_seconds) as client:
                response = client.request(
                    method=method,
                    url=resolved_url,
                    headers=headers,
                    params=params,
                    json=body_json if sends_body else None,
                )
                response.raise_for_status()
        # The URL comes from stored or per-call configuration; a malformed
        # value raises httpx.InvalidURL, which is not an httpx.HTTPError.
        except (httpx.HTTPError, httpx.InvalidURL) as exc:
            raise ToolServiceClientError(f"http tool invocation failed: {exc}") from exc

        response_json = _try_parse_json_response(response)
        response_text = None if response_json is not None else response.text
        return response_text, {
            "tool_binding_id": detail.binding.id,
            "tool_code": detail.tool_definition.code,
            "tool_version_id": detail.tool_version.id,
            "tool_name": detail.tool_definition.name,
            "request_url": resolved_url,
            "request_method": method,
            "response_status_code": response.status_code,
            "response_json": response_json,
        }
- def _read_string(payload: dict[str, JSONValue], key: str) -> str | None:
- value = payload.get(key)
- if isinstance(value, str) and value:
- return value
- return None
- def _read_dict(payload: dict[str, JSONValue], key: str) -> dict[str, JSONValue]:
- value = payload.get(key)
- if isinstance(value, dict):
- return {str(item_key): item_value for item_key, item_value in value.items()}
- return {}
- def _merge_json_dicts(*items: dict[str, JSONValue]) -> dict[str, JSONValue]:
- merged: dict[str, JSONValue] = {}
- for item in items:
- merged.update(item)
- return merged
- def _merge_string_maps(*items: dict[str, JSONValue]) -> dict[str, str]:
- merged: dict[str, str] = {}
- for item in items:
- for key, value in item.items():
- if isinstance(value, (str, int, float, bool)):
- merged[key] = str(value)
- return merged
- def _resolve_url(*, url: str | None, base_url: str | None, path: str | None) -> str | None:
- if url is not None:
- return url
- if base_url is None or path is None:
- return None
- return f"{base_url.rstrip('/')}/{path.lstrip('/')}"
def _try_parse_json_response(response: httpx.Response) -> JSONValue | None:
    """Decode the response body as JSON when the response declares JSON.

    Returns ``None`` when the ``content-type`` header does not contain
    ``json`` (case-insensitive), when decoding raises ``ValueError``, or when
    the decoded value is not a dict, list, str, int, float, bool or ``None``.
    """
    declared_type = response.headers.get("content-type", "").lower()
    if "json" not in declared_type:
        return None

    try:
        decoded = response.json()
    except ValueError:
        return None

    json_types = (dict, list, str, int, float, bool)
    return decoded if decoded is None or isinstance(decoded, json_types) else None
|