| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391 |
- import httpx
- from core_domain import ChatCompletionRequestContract, ChatCompletionResponseContract
- from core_shared import JSONValue
- from app.bootstrap.settings import ModelGatewayServiceSettings
class ModelProviderClientError(Exception):
    """Raised when a call to the model provider cannot be completed.

    Used for missing request inputs (no model, no Anthropic API key) as well as
    transport failures and non-2xx provider responses, in which case the
    underlying httpx exception is chained as ``__cause__``.
    """
class ModelProviderClient:
    """Synchronous HTTP client for chat completions and model listing.

    When the resolved provider type (the per-call ``provider_type``, else
    ``settings.provider_type``) is ``"anthropic"``, requests go to the Anthropic
    ``v1/messages`` / ``v1/models`` endpoints; any other value is treated as an
    OpenAI-compatible API (``chat/completions`` / ``models``). Per-call
    ``provider_base_url`` / ``provider_api_key`` override the values on
    ``settings``; an explicitly passed empty API key is kept rather than being
    replaced by the configured one.

    Provider-side failures (transport errors, non-2xx statuses, bodies that are
    not valid JSON) are raised as ``ModelProviderClientError`` chained to the
    original exception.
    """

    # Value of the ``anthropic-version`` header sent on every Anthropic request.
    _ANTHROPIC_VERSION = "2023-06-01"

    def __init__(self, *, settings: ModelGatewayServiceSettings) -> None:
        self.settings = settings

    def create_chat_completion(
        self,
        payload: ChatCompletionRequestContract,
        *,
        provider_type: str | None = None,
        provider_base_url: str | None = None,
        provider_api_key: str | None = None,
        timeout_seconds: float = 60.0,
    ) -> ChatCompletionResponseContract:
        """Run one chat completion against the resolved provider.

        Raises:
            ModelProviderClientError: ``payload.model`` is ``None``; an Anthropic
                call has no API key; or the HTTP request fails, returns a
                non-2xx status, or returns a body that is not valid JSON.
        """
        if payload.model is None:
            raise ModelProviderClientError("model is required for chat completion")
        if self._resolve_provider_type(provider_type) == "anthropic":
            return self._create_anthropic_message(
                payload,
                provider_base_url=provider_base_url,
                provider_api_key=provider_api_key,
                timeout_seconds=timeout_seconds,
            )
        return self._create_openai_compatible_chat_completion(
            payload,
            provider_base_url=provider_base_url,
            provider_api_key=provider_api_key,
            timeout_seconds=timeout_seconds,
        )

    def list_models(
        self,
        *,
        provider_type: str | None = None,
        provider_base_url: str | None = None,
        provider_api_key: str | None = None,
        timeout_seconds: float = 30.0,
    ) -> list[dict[str, JSONValue]]:
        """List the provider's models as normalized items (``modelId``, ``displayName``, ...).

        Raises:
            ModelProviderClientError: an Anthropic call has no API key, or the
                HTTP request fails, returns a non-2xx status, or returns a body
                that is not valid JSON.
        """
        if self._resolve_provider_type(provider_type) == "anthropic":
            return self._list_anthropic_models(
                provider_base_url=provider_base_url,
                provider_api_key=provider_api_key,
                timeout_seconds=timeout_seconds,
            )
        return self._list_openai_compatible_models(
            provider_base_url=provider_base_url,
            provider_api_key=provider_api_key,
            timeout_seconds=timeout_seconds,
        )

    # -- configuration resolution -------------------------------------------

    def _resolve_provider_type(self, provider_type: str | None) -> str | None:
        """Return the per-call provider type, or the configured one when it is empty/None."""
        return provider_type or self.settings.provider_type

    def _resolve_base_url(self, provider_base_url: str | None) -> str:
        """Return the per-call base URL, or the configured one when it is empty/None."""
        return provider_base_url or self.settings.provider_base_url

    def _resolve_api_key(self, provider_api_key: str | None) -> str | None:
        """Return the per-call API key if one was passed (even ``""``), else the configured key."""
        if provider_api_key is not None:
            return provider_api_key
        return self.settings.provider_api_key

    def _openai_headers(self, provider_api_key: str | None) -> dict[str, str]:
        """Build OpenAI-compatible headers; bearer auth is added only when a key is set."""
        headers: dict[str, str] = {"content-type": "application/json"}
        api_key = self._resolve_api_key(provider_api_key)
        if api_key:
            headers["authorization"] = f"Bearer {api_key}"
        return headers

    def _anthropic_headers(self, provider_api_key: str | None) -> dict[str, str]:
        """Build Anthropic headers, raising ModelProviderClientError when no API key is available."""
        api_key = self._resolve_api_key(provider_api_key)
        if not api_key:
            raise ModelProviderClientError("anthropic api key is required")
        return {
            "content-type": "application/json",
            "x-api-key": api_key,
            "anthropic-version": self._ANTHROPIC_VERSION,
        }

    # -- transport ------------------------------------------------------------

    def _request_json(
        self,
        method: str,
        url: str,
        *,
        headers: dict[str, str],
        timeout_seconds: float,
        error_prefix: str,
        json_body: dict[str, JSONValue] | None = None,
    ) -> dict[str, JSONValue]:
        """Send one HTTP request and return its JSON body as a dict (``{}`` if not an object).

        Raises:
            ModelProviderClientError: message starts with *error_prefix*; raised
                on httpx transport errors, non-2xx statuses (with the status
                code and up to 1000 characters of the body), and bodies that are
                not valid JSON.
        """
        try:
            with httpx.Client(timeout=timeout_seconds) as client:
                response = client.request(method, url, json=json_body, headers=headers)
                response.raise_for_status()
        except httpx.HTTPStatusError as exc:
            detail = exc.response.text[:1000]
            raise ModelProviderClientError(
                f"{error_prefix}: {exc.response.status_code} {detail}"
            ) from exc
        except httpx.HTTPError as exc:
            raise ModelProviderClientError(f"{error_prefix}: {exc}") from exc
        try:
            body = response.json()
        except ValueError as exc:
            # A malformed body raises ValueError (json.JSONDecodeError is a
            # subclass); previously this escaped callers that only catch
            # ModelProviderClientError.
            raise ModelProviderClientError(
                f"{error_prefix}: invalid JSON response: {exc}"
            ) from exc
        return _coerce_json_dict(body)

    # -- OpenAI-compatible ----------------------------------------------------

    def _list_openai_compatible_models(
        self,
        *,
        provider_base_url: str | None,
        provider_api_key: str | None,
        timeout_seconds: float,
    ) -> list[dict[str, JSONValue]]:
        """GET ``<base>/models`` and normalize the listed models."""
        headers = self._openai_headers(provider_api_key)
        response_json = self._request_json(
            "GET",
            _join_url(self._resolve_base_url(provider_base_url), "models"),
            headers=headers,
            timeout_seconds=timeout_seconds,
            error_prefix="model provider list models failed",
        )
        return _extract_model_items(response_json)

    def _create_openai_compatible_chat_completion(
        self,
        payload: ChatCompletionRequestContract,
        *,
        provider_base_url: str | None,
        provider_api_key: str | None,
        timeout_seconds: float,
    ) -> ChatCompletionResponseContract:
        """POST ``<base>/chat/completions`` and normalize the first choice.

        Optional fields (temperature, max_tokens, tools, tool_choice) are sent
        only when set; ``tools`` is omitted when ``tools_json`` is empty.
        """
        request_payload: dict[str, JSONValue] = {
            "model": payload.model or "",
            "messages": [item.model_dump(mode="json") for item in payload.messages],
        }
        if payload.temperature is not None:
            request_payload["temperature"] = payload.temperature
        if payload.max_tokens is not None:
            request_payload["max_tokens"] = payload.max_tokens
        if payload.tools_json:
            request_payload["tools"] = payload.tools_json
        if payload.tool_choice is not None:
            request_payload["tool_choice"] = payload.tool_choice
        headers = self._openai_headers(provider_api_key)
        response_json = self._request_json(
            "POST",
            _join_url(self._resolve_base_url(provider_base_url), "chat/completions"),
            headers=headers,
            timeout_seconds=timeout_seconds,
            error_prefix="model provider request failed",
            json_body=request_payload,
        )
        return ChatCompletionResponseContract(
            model=payload.model,
            content=_extract_response_content(response_json),
            finish_reason=_extract_finish_reason(response_json),
            tool_calls_json=_extract_tool_calls_json(response_json),
            usage_json=_extract_usage_json(response_json),
            raw_response_json=response_json,
        )

    # -- Anthropic ------------------------------------------------------------

    def _list_anthropic_models(
        self,
        *,
        provider_base_url: str | None,
        provider_api_key: str | None,
        timeout_seconds: float,
    ) -> list[dict[str, JSONValue]]:
        """GET ``<base>/v1/models`` (an API key is required) and normalize the listed models."""
        headers = self._anthropic_headers(provider_api_key)
        response_json = self._request_json(
            "GET",
            _join_url(self._resolve_base_url(provider_base_url), "v1/models"),
            headers=headers,
            timeout_seconds=timeout_seconds,
            error_prefix="anthropic list models failed",
        )
        return _extract_model_items(response_json)

    def _create_anthropic_message(
        self,
        payload: ChatCompletionRequestContract,
        *,
        provider_base_url: str | None,
        provider_api_key: str | None,
        timeout_seconds: float,
    ) -> ChatCompletionResponseContract:
        """POST ``<base>/v1/messages`` and normalize the reply.

        System messages become the top-level ``system`` prompt. ``max_tokens``
        falls back to 1024 when unset (or 0), since the Messages API requires it.

        NOTE(review): ``tools_json`` / ``tool_choice`` are not forwarded on this
        path and ``tool_calls_json`` is always ``[]``.
        """
        # Built first so a missing API key is reported before any other work.
        headers = self._anthropic_headers(provider_api_key)
        system_prompt, messages = _to_anthropic_messages(payload)
        request_payload: dict[str, JSONValue] = {
            "model": payload.model or "",
            "max_tokens": payload.max_tokens or 1024,
            "messages": messages,
        }
        if system_prompt:
            request_payload["system"] = system_prompt
        if payload.temperature is not None:
            request_payload["temperature"] = payload.temperature
        response_json = self._request_json(
            "POST",
            _join_url(self._resolve_base_url(provider_base_url), "v1/messages"),
            headers=headers,
            timeout_seconds=timeout_seconds,
            error_prefix="anthropic request failed",
            json_body=request_payload,
        )
        return ChatCompletionResponseContract(
            # Prefer the model name the provider reports over the requested one.
            model=_read_string(response_json, "model") or payload.model,
            content=_extract_anthropic_content(response_json),
            finish_reason=_read_string(response_json, "stop_reason"),
            tool_calls_json=[],
            usage_json=_extract_usage_json(response_json),
            raw_response_json=response_json,
        )
def _coerce_json_dict(payload: JSONValue) -> dict[str, JSONValue]:
    """Return *payload* as a dict with ``str`` keys; any non-dict value becomes ``{}``."""
    if not isinstance(payload, dict):
        return {}
    coerced: dict[str, JSONValue] = {}
    for raw_key, value in payload.items():
        coerced[str(raw_key)] = value
    return coerced
def _join_url(base_url: str, path: str) -> str:
    """Join *base_url* and *path* with exactly one ``/`` between them.

    When the base already ends in ``/v1`` and the path starts with ``v1/``,
    the path's ``v1/`` is dropped. This lets a base URL configured with or
    without ``/v1`` resolve to the same endpoint.
    """
    base = base_url.rstrip("/")
    relative = path.strip("/")
    version_repeated = base.endswith("/v1") and relative.startswith("v1/")
    if version_repeated:
        relative = relative[len("v1/"):]
    return "/".join((base, relative))
def _to_anthropic_messages(
    payload: ChatCompletionRequestContract,
) -> tuple[str | None, list[dict[str, JSONValue]]]:
    """Split chat messages into Anthropic's ``(system_prompt, messages)`` pair.

    System messages are removed from the list and joined with blank lines into
    the system prompt (``None`` when there are none). Every other message is
    sent as ``"assistant"`` if its role is ``"assistant"``, else as ``"user"``.
    Consecutive messages that map to the same role are merged into one, with
    contents separated by a blank line. If nothing remains, a single user
    message with empty content is returned.
    """
    system_texts: list[str] = []
    converted: list[dict[str, JSONValue]] = []
    for entry in payload.messages:
        if entry.role == "system":
            system_texts.append(entry.content)
            continue
        target_role = "user" if entry.role != "assistant" else "assistant"
        last = converted[-1] if converted else None
        if last is None or last.get("role") != target_role:
            converted.append({"role": target_role, "content": entry.content})
            continue
        # Same role as the previous turn: fold this message into it.
        earlier = last.get("content")
        if isinstance(earlier, str):
            last["content"] = f"{earlier}\n\n{entry.content}"
        else:
            last["content"] = entry.content
    if not converted:
        converted = [{"role": "user", "content": ""}]
    system_prompt = "\n\n".join(system_texts) if system_texts else None
    return system_prompt, converted
def _extract_anthropic_content(payload: dict[str, JSONValue]) -> str:
    """Return the text of an Anthropic Messages response body.

    A string ``content`` is returned unchanged. A list ``content`` yields the
    string ``text`` of each dict entry, joined with newlines; entries without
    a string ``text`` are skipped. Any other ``content`` yields ``""``.
    """
    content = payload.get("content")
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""
    candidates = (entry.get("text") for entry in content if isinstance(entry, dict))
    return "\n".join(text for text in candidates if isinstance(text, str))
def _read_string(payload: dict[str, JSONValue], key: str) -> str | None:
    """Return ``payload[key]`` if it is a string; ``None`` if it is missing or another type."""
    candidate = payload.get(key)
    if isinstance(candidate, str):
        return candidate
    return None
- def _extract_model_items(payload: dict[str, JSONValue]) -> list[dict[str, JSONValue]]:
- data = payload.get("data")
- if not isinstance(data, list):
- data = payload.get("models")
- if not isinstance(data, list):
- return []
- items: list[dict[str, JSONValue]] = []
- for item in data:
- if isinstance(item, str):
- model_id = item
- display_name = item
- owned_by = None
- elif isinstance(item, dict):
- model_id = _read_string(item, "id") or _read_string(item, "model") or _read_string(item, "name")
- if model_id is None:
- continue
- display_name = _read_string(item, "display_name") or _read_string(item, "displayName") or model_id
- owned_by = _read_string(item, "owned_by") or _read_string(item, "ownedBy")
- else:
- continue
- model_item: dict[str, JSONValue] = {
- "modelId": model_id,
- "displayName": display_name,
- "modelType": _infer_model_type(model_id),
- }
- if owned_by:
- model_item["ownedBy"] = owned_by
- items.append(model_item)
- return items
def _infer_model_type(model_id: str) -> str:
    """Guess a model's category from case-insensitive substrings of its id.

    Rules are tried in order and the first match wins: embedding, rerank,
    moderation, image, audio, reasoning. Ids matching none of them are
    ``"chat"``.
    """
    lowered = model_id.lower()
    keyword_rules: tuple[tuple[str, tuple[str, ...]], ...] = (
        ("embedding", ("embed",)),  # "embed" also covers "embedding"
        ("rerank", ("rerank", "ranker")),
        ("moderation", ("moderation",)),
        ("image", ("image", "vision")),
        ("audio", ("audio", "whisper", "tts")),
        ("reasoning", ("reason", "thinking", "-r1-")),
    )
    for model_type, keywords in keyword_rules:
        if any(keyword in lowered for keyword in keywords):
            return model_type
    # A trailing "-r1" is the last reasoning rule; no other rule follows it.
    if lowered.endswith("-r1"):
        return "reasoning"
    return "chat"
def _extract_response_content(payload: dict[str, JSONValue]) -> str:
    """Return the text of the first choice in an OpenAI-style completion body.

    Uses ``choices[0].message.content`` when it is a string, otherwise the
    legacy ``choices[0].text``; returns ``""`` when neither is a string.
    """
    choices = payload.get("choices")
    if not isinstance(choices, list) or not choices:
        return ""
    first = choices[0]
    if not isinstance(first, dict):
        return ""
    message = first.get("message")
    content = message.get("content") if isinstance(message, dict) else None
    if isinstance(content, str):
        return content
    legacy_text = first.get("text")
    return legacy_text if isinstance(legacy_text, str) else ""
def _extract_finish_reason(payload: dict[str, JSONValue]) -> str | None:
    """Return ``choices[0].finish_reason`` when it is a string, else ``None``."""
    choices = payload.get("choices")
    first = choices[0] if isinstance(choices, list) and choices else None
    if not isinstance(first, dict):
        return None
    reason = first.get("finish_reason")
    return reason if isinstance(reason, str) else None
def _extract_tool_calls_json(payload: dict[str, JSONValue]) -> list[dict[str, JSONValue]]:
    """Return the dict entries of ``choices[0].message.tool_calls`` with ``str`` keys.

    Non-dict entries are dropped; a missing or wrongly typed level anywhere on
    the path yields ``[]``.
    """
    choices = payload.get("choices")
    if not (isinstance(choices, list) and choices):
        return []
    first = choices[0]
    message = first.get("message") if isinstance(first, dict) else None
    tool_calls = message.get("tool_calls") if isinstance(message, dict) else None
    if not isinstance(tool_calls, list):
        return []
    calls: list[dict[str, JSONValue]] = []
    for call in tool_calls:
        if isinstance(call, dict):
            calls.append({str(field): value for field, value in call.items()})
    return calls
def _extract_usage_json(payload: dict[str, JSONValue]) -> dict[str, JSONValue]:
    """Return the body's ``usage`` object with ``str`` keys, or ``{}`` if it is absent or not a dict."""
    usage = payload.get("usage")
    if not isinstance(usage, dict):
        return {}
    normalized: dict[str, JSONValue] = {}
    for field, value in usage.items():
        normalized[str(field)] = value
    return normalized
|