"""HTTP routes of the API gateway.

The module exposes:

* liveness / readiness probes backed by a ``SELECT 1`` round trip,
* API-key management and request-audit endpoints,
* an aggregated health check over all downstream services,
* ``POST /gateway/sessions/execute``, which drives a chat turn against the
  session service plus the agent or team service (plain JSON or SSE stream),
* transparent reverse-proxy routes for each downstream service.
"""

import asyncio
import json
from dataclasses import dataclass
from typing import Annotated

import httpx
from core_domain import ServiceDescriptor, ServiceHealth
from core_shared.security import build_internal_service_headers
from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.orm import Session

from app.bootstrap.settings import ApiGatewaySettings
from app.db.session import get_db
from app.domain.repositories import ApiKeyRepository, GatewayRequestAuditRepository
from app.infrastructure.api_keys import generate_api_key, get_api_key_prefix, hash_api_key
from app.infrastructure.proxy import ProxyServiceName, ProxyTarget, ServiceProxy
from app.schemas.gateway import (
    ApiKeyCreateRequest,
    ApiKeyCreateResponse,
    ApiKeyListRequest,
    ApiKeyResponse,
    ApiKeyStatusPostRequest,
    ApiKeyStatusUpdateRequest,
    GatewayAuditServiceStats,
    GatewayAuditStatsResponse,
    GatewayRequestAuditResponse,
    GatewayServicesHealthResponse,
)

router = APIRouter()

DbSession = Annotated[Session, Depends(get_db)]

# HTTP methods accepted by the generic reverse-proxy routes.
_ALL_METHODS = ["GET", "POST", "PUT", "PATCH", "DELETE"]

# Values of ``runtime_target_type`` that ``/gateway/sessions/execute`` can run.
_RUNTIME_TARGET_TYPES = frozenset({"agent", "team"})


class SessionExecuteRequest(BaseModel):
    """Request body of ``POST /gateway/sessions/execute``."""

    session_id: str
    message_text: str
    # When true the endpoint answers with a ``text/event-stream`` response.
    stream: bool = False


class SessionExecuteResponse(BaseModel):
    """JSON result of a non-streaming ``/gateway/sessions/execute`` call."""

    session_id: str
    run_request_id: str
    target_type: str
    target_id: str
    target_config_id: str | None = None
    request_status: str
    user_message_id: str
    assistant_message_id: str | None = None
    agent_run_id: str | None = None
    team_run_id: str | None = None
    output_text: str | None = None
    error_message: str | None = None


@dataclass(frozen=True)
class _RunContext:
    """Identifiers collected while opening a run request for a session."""

    target_type: str
    target_id: str
    target_config_id: str | None
    run_request_id: str
    user_message_id: str
    # Base ``request_payload_json`` that every run-request update extends.
    run_request_payload: dict[str, object]


@dataclass
class _StreamOutcome:
    """Mutable accumulator filled in while relaying a downstream SSE stream."""

    output_text: str = ""
    error_message: str | None = None


@router.get("/health", response_model=ServiceDescriptor)
def health_check(db: DbSession) -> ServiceDescriptor:
    """Run ``SELECT 1`` against the database and return the service descriptor."""
    db.execute(text("SELECT 1"))
    return ServiceDescriptor(name="api-gateway")


@router.get("/ready", response_model=ServiceHealth)
def readiness_check(db: DbSession) -> ServiceHealth:
    """Run ``SELECT 1`` against the database and report the gateway as ready."""
    db.execute(text("SELECT 1"))
    return ServiceHealth(service="api-gateway", status="ok", database="ok")


@router.post("/gateway/api-keys", response_model=ApiKeyCreateResponse)
def create_api_key(payload: ApiKeyCreateRequest, db: DbSession) -> ApiKeyCreateResponse:
    """Generate a new API key, store its prefix and hash, and return the key.

    The generated key itself is only handed to the repository as a prefix and
    a hash; the full value appears solely in this response.
    """
    api_key = generate_api_key()
    entity = ApiKeyRepository(db).create(
        name=payload.name,
        key_prefix=get_api_key_prefix(api_key),
        key_hash=hash_api_key(api_key),
        scopes=payload.scopes,
        expires_time=payload.expires_time,
    )
    return ApiKeyCreateResponse(
        id=entity.id,
        name=entity.name,
        key_prefix=entity.key_prefix,
        api_key=api_key,
        status=entity.status,
        scopes=entity.scopes,
        expires_time=entity.expires_time,
        created_time=entity.created_time,
    )


@router.get("/gateway/api-keys", response_model=list[ApiKeyResponse])
def list_api_keys(db: DbSession) -> list[ApiKeyResponse]:
    """Return every API key known to the repository."""
    return [ApiKeyResponse.from_entity(item) for item in ApiKeyRepository(db).list_all()]


@router.post("/gateway/api-keys/list", response_model=list[ApiKeyResponse])
def list_api_keys_post(payload: ApiKeyListRequest, db: DbSession) -> list[ApiKeyResponse]:
    """POST variant of :func:`list_api_keys`.

    The request body is validated but not used for filtering here.
    """
    return [ApiKeyResponse.from_entity(item) for item in ApiKeyRepository(db).list_all()]


@router.patch("/gateway/api-keys/{api_key_id}/status", response_model=ApiKeyResponse)
def update_api_key_status(
    api_key_id: str,
    payload: ApiKeyStatusUpdateRequest,
    db: DbSession,
) -> ApiKeyResponse:
    """Change the status of the API key named in the path.

    Raises:
        HTTPException: 404 when the repository finds no such key.
    """
    entity = ApiKeyRepository(db).update_status(api_key_id=api_key_id, status=payload.status)
    if entity is None:
        raise HTTPException(status_code=404, detail=f"api key not found: {api_key_id}")
    return ApiKeyResponse.from_entity(entity)


@router.post("/gateway/api-keys/status", response_model=ApiKeyResponse)
def update_api_key_status_post(
    payload: ApiKeyStatusPostRequest,
    db: DbSession,
) -> ApiKeyResponse:
    """Change the status of the API key named in the request body.

    Raises:
        HTTPException: 404 when the repository finds no such key.
    """
    entity = ApiKeyRepository(db).update_status(
        api_key_id=payload.api_key_id,
        status=payload.status,
    )
    if entity is None:
        raise HTTPException(status_code=404, detail=f"api key not found: {payload.api_key_id}")
    return ApiKeyResponse.from_entity(entity)


@router.get("/gateway/audits", response_model=list[GatewayRequestAuditResponse])
def list_gateway_audits(
    db: DbSession,
    request_id: Annotated[str | None, Query()] = None,
    target_service: Annotated[str | None, Query()] = None,
    limit: Annotated[int, Query(ge=1, le=500)] = 100,
) -> list[GatewayRequestAuditResponse]:
    """List audit records, optionally scoped by request id and/or target service."""
    items = GatewayRequestAuditRepository(db).list_by_scope(
        request_id=request_id,
        target_service=target_service,
        limit=limit,
    )
    return [GatewayRequestAuditResponse.from_entity(item) for item in items]


@router.get("/gateway/audits/stats", response_model=GatewayAuditStatsResponse)
def gateway_audit_stats(db: DbSession) -> GatewayAuditStatsResponse:
    """Aggregate per-service audit rows and add gateway-wide totals."""
    rows = GatewayRequestAuditRepository(db).stats_by_service()
    services = [
        GatewayAuditServiceStats(
            target_service=target_service,
            request_count=request_count,
            error_count=error_count,
            average_duration_ms=round(average_duration_ms, 2),
        )
        for target_service, request_count, error_count, average_duration_ms in rows
    ]
    return GatewayAuditStatsResponse(
        total_request_count=sum(item.request_count for item in services),
        total_error_count=sum(item.error_count for item in services),
        services=services,
    )


def get_gateway_settings() -> ApiGatewaySettings:
    """Dependency provider: build a fresh ``ApiGatewaySettings`` instance."""
    return ApiGatewaySettings()


def get_service_proxy(
    settings: Annotated[ApiGatewaySettings, Depends(get_gateway_settings)],
) -> ServiceProxy:
    """Dependency provider: a ``ServiceProxy`` using the regular proxy timeout."""
    return ServiceProxy(settings=settings, timeout_seconds=settings.proxy_timeout_seconds)


GatewaySettingsDep = Annotated[ApiGatewaySettings, Depends(get_gateway_settings)]
ServiceProxyDep = Annotated[ServiceProxy, Depends(get_service_proxy)]


def build_proxy_targets(settings: ApiGatewaySettings) -> dict[ProxyServiceName, ProxyTarget]:
    """Map every downstream service name to its base URL, path prefix and health path."""
    return {
        "session-service": ProxyTarget(
            service_name="session-service",
            base_url=settings.session_service_url,
            path_prefix="/sessions",
            health_path="/sessions/health",
        ),
        "tool-service": ProxyTarget(
            service_name="tool-service",
            base_url=settings.tool_service_url,
            path_prefix="/tools",
            health_path="/tools/health",
        ),
        "model-gateway-service": ProxyTarget(
            service_name="model-gateway-service",
            base_url=settings.model_gateway_service_url,
            path_prefix="/models",
            health_path="/models/health",
        ),
        # Shares the model gateway's base URL and health path; only the path
        # prefix differs.
        "model-provider-service": ProxyTarget(
            service_name="model-provider-service",
            base_url=settings.model_gateway_service_url,
            path_prefix="/models/providers",
            health_path="/models/health",
        ),
        "code-runner-service": ProxyTarget(
            service_name="code-runner-service",
            base_url=settings.code_runner_service_url,
            path_prefix="/code",
            health_path="/code/health",
        ),
        "agent-service": ProxyTarget(
            service_name="agent-service",
            base_url=settings.agent_service_url,
            path_prefix="/agents",
            health_path="/agents/health",
        ),
        "memory-service": ProxyTarget(
            service_name="memory-service",
            base_url=settings.memory_service_url,
            path_prefix="/memories",
            health_path="/memories/health",
        ),
        "team-service": ProxyTarget(
            service_name="team-service",
            base_url=settings.team_service_url,
            path_prefix="/teams",
            health_path="/teams/health",
        ),
        "skill-service": ProxyTarget(
            service_name="skill-service",
            base_url=settings.skill_service_url,
            path_prefix="/skills",
            health_path="/skills/health",
        ),
        "human-service": ProxyTarget(
            service_name="human-service",
            base_url=settings.human_service_url,
            path_prefix="/human",
            health_path="/human/health",
        ),
        "knowledge-service": ProxyTarget(
            service_name="knowledge-service",
            base_url=settings.knowledge_service_url,
            path_prefix="/knowledge",
            health_path="/knowledge/health",
        ),
        "event-service": ProxyTarget(
            service_name="event-service",
            base_url=settings.event_service_url,
            path_prefix="/events",
            health_path="/events/health",
        ),
        # The identity routes are served from ``settings.auth_service_url``.
        "identity-service": ProxyTarget(
            service_name="identity-service",
            base_url=settings.auth_service_url,
            path_prefix="/identity",
            health_path="/identity/health",
        ),
        "scheduler-service": ProxyTarget(
            service_name="scheduler-service",
            base_url=settings.scheduler_service_url,
            path_prefix="/scheduler",
            health_path="/scheduler/health",
        ),
    }


@router.get("/gateway/services/health", response_model=GatewayServicesHealthResponse)
async def downstream_health_check(settings: GatewaySettingsDep) -> GatewayServicesHealthResponse:
    """Probe every downstream service concurrently and summarise the result.

    The overall status is ``"ok"`` only when every probe reports ``"ok"``;
    otherwise it is ``"degraded"``.
    """
    targets = build_proxy_targets(settings)
    # Health probes use their own (separately configured) timeout.
    health_proxy = ServiceProxy(
        settings=settings,
        timeout_seconds=settings.downstream_health_timeout_seconds,
    )
    downstream_services = await asyncio.gather(
        *[health_proxy.check_health(target) for target in targets.values()]
    )
    status = "ok" if all(item.status == "ok" for item in downstream_services) else "degraded"
    return GatewayServicesHealthResponse(status=status, downstream_services=downstream_services)


@router.post("/gateway/sessions/execute")
async def execute_session(
    payload: SessionExecuteRequest,
    request: Request,
    settings: GatewaySettingsDep,
):
    """Run one chat turn for a session and return the result.

    With ``payload.stream`` set, the work is delegated to
    :func:`_stream_session_execute` and returned as server-sent events.
    Otherwise the handler opens a run request, executes the session's agent
    or team run, stores the assistant reply (if any), writes the final
    run-request status and returns a :class:`SessionExecuteResponse`.

    Raises:
        HTTPException: when opening the run request or writing its final
            status fails. Failures while executing the run itself are not
            raised; they are reported as ``request_status == "failed"``.
    """
    if payload.stream:
        return StreamingResponse(
            _stream_session_execute(payload, request, settings),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
        )

    targets = build_proxy_targets(settings)
    session_target = targets["session-service"]
    headers = _build_internal_headers(request, settings)

    async with httpx_client(settings.proxy_timeout_seconds) as client:
        ctx = await _start_run_request(
            client=client,
            session_target=session_target,
            payload=payload,
            headers=headers,
        )
        is_agent = ctx.target_type == "agent"
        run_target = targets["agent-service"] if is_agent else targets["team-service"]

        assistant_message_id: str | None = None
        agent_run_id: str | None = None
        team_run_id: str | None = None
        output_text: str | None = None
        error_message: str | None = None
        request_status = "completed"

        try:
            run_id = await _create_run(
                client=client,
                target=run_target,
                ctx=ctx,
                payload=payload,
                headers=headers,
            )
            # Record the run id before executing so it is still reported when
            # the execute call fails.
            execute_payload: dict[str, object]
            if is_agent:
                agent_run_id = run_id
                execute_path = "runs/execute"
                execute_payload = {"agent_run_id": run_id, "dry_run": False}
            else:
                team_run_id = run_id
                execute_path = f"runs/{run_id}/execute"
                execute_payload = {"dry_run": False}

            execute_result = await _post_json(
                client=client,
                target=run_target,
                path=execute_path,
                payload=execute_payload,
                headers=headers,
            )
            run_data = _get_dict(execute_result, "run")
            output_text = _resolve_output_text(run_data)
            error_message = _get_optional_string(run_data, "error_message")
            if error_message:
                request_status = "failed"
            if output_text:
                assistant_message_id = await _save_assistant_message(
                    client=client,
                    session_target=session_target,
                    session_id=payload.session_id,
                    run_request_id=ctx.run_request_id,
                    output_text=output_text,
                    headers=headers,
                )
        except HTTPException as exc:
            request_status = "failed"
            error_message = _exception_detail(exc)

        await _finish_run_request(
            client=client,
            session_target=session_target,
            ctx=ctx,
            request_status=request_status,
            assistant_message_id=assistant_message_id,
            agent_run_id=agent_run_id,
            team_run_id=team_run_id,
            output_text=output_text,
            error_message=error_message,
            headers=headers,
        )

    return SessionExecuteResponse(
        session_id=payload.session_id,
        run_request_id=ctx.run_request_id,
        target_type=ctx.target_type,
        target_id=ctx.target_id,
        target_config_id=ctx.target_config_id,
        request_status=request_status,
        user_message_id=ctx.user_message_id,
        assistant_message_id=assistant_message_id,
        agent_run_id=agent_run_id,
        team_run_id=team_run_id,
        output_text=output_text,
        error_message=error_message,
    )


async def _stream_session_execute(
    payload: SessionExecuteRequest,
    request: Request,
    settings: ApiGatewaySettings,
):
    """Yield server-sent events for one streamed chat turn.

    Emits ``session.execute.started``, then relays every event of the
    downstream ``execute-stream`` endpoint, then emits
    ``session.execute.completed``. Any exception ends the stream with a
    single ``session.execute.failed`` event instead of propagating.

    If the failure happens after the run request was opened, the run request
    is additionally moved to ``"failed"`` (best effort) so that it is not
    left in the ``"running"`` state.
    """
    targets = build_proxy_targets(settings)
    session_target = targets["session-service"]
    headers = _build_internal_headers(request, settings)

    ctx: _RunContext | None = None
    agent_run_id: str | None = None
    team_run_id: str | None = None
    assistant_message_id: str | None = None
    outcome = _StreamOutcome()
    finalized = False
    failure: str | None = None

    async with httpx_client(settings.proxy_timeout_seconds) as client:
        try:
            ctx = await _start_run_request(
                client=client,
                session_target=session_target,
                payload=payload,
                headers=headers,
            )
            yield _sse("session.execute.started", {
                "run_request_id": ctx.run_request_id,
                "user_message_id": ctx.user_message_id,
                "target_type": ctx.target_type,
                "target_id": ctx.target_id,
            })

            is_agent = ctx.target_type == "agent"
            run_target = targets["agent-service"] if is_agent else targets["team-service"]
            run_id = await _create_run(
                client=client,
                target=run_target,
                ctx=ctx,
                payload=payload,
                headers=headers,
            )
            if is_agent:
                agent_run_id = run_id
            else:
                team_run_id = run_id

            async for chunk in _relay_run_stream(
                client=client,
                target=run_target,
                run_id=run_id,
                event_prefix="agent.run" if is_agent else "team.run",
                default_error="Agent execution failed" if is_agent else "Team execution failed",
                headers=headers,
                outcome=outcome,
            ):
                yield chunk

            request_status = "failed" if outcome.error_message else "completed"
            if outcome.output_text:
                assistant_message_id = await _save_assistant_message(
                    client=client,
                    session_target=session_target,
                    session_id=payload.session_id,
                    run_request_id=ctx.run_request_id,
                    output_text=outcome.output_text,
                    headers=headers,
                )
            await _finish_run_request(
                client=client,
                session_target=session_target,
                ctx=ctx,
                request_status=request_status,
                assistant_message_id=assistant_message_id,
                agent_run_id=agent_run_id,
                team_run_id=team_run_id,
                output_text=outcome.output_text,
                error_message=outcome.error_message,
                headers=headers,
            )
            finalized = True
            yield _sse("session.execute.completed", {
                "run_request_id": ctx.run_request_id,
                "request_status": request_status,
                "assistant_message_id": assistant_message_id,
                "output_text": outcome.output_text,
                "error_message": outcome.error_message,
            })
        except HTTPException as exc:
            failure = _exception_detail(exc)
        except Exception as exc:  # stream boundary: always answer with an event
            failure = str(exc)

        if failure is not None:
            if ctx is not None and not finalized:
                try:
                    await _finish_run_request(
                        client=client,
                        session_target=session_target,
                        ctx=ctx,
                        request_status="failed",
                        assistant_message_id=assistant_message_id,
                        agent_run_id=agent_run_id,
                        team_run_id=team_run_id,
                        output_text=outcome.output_text,
                        error_message=failure,
                        headers=headers,
                    )
                except HTTPException:
                    # Best effort only: the original failure is what the
                    # caller needs to see, so a failing status update must
                    # not replace it.
                    pass
            yield _sse("session.execute.failed", {"error_message": failure})


@router.api_route("/gateway/sessions", methods=_ALL_METHODS)
@router.api_route("/gateway/sessions/{path:path}", methods=_ALL_METHODS)
async def proxy_session_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    """Forward requests under ``/gateway/sessions`` to the session service."""
    return await proxy.forward(
        request=request,
        target=build_proxy_targets(settings)["session-service"],
        path=path,
    )


@router.api_route("/gateway/agents", methods=_ALL_METHODS)
@router.api_route("/gateway/agents/{path:path}", methods=_ALL_METHODS)
async def proxy_agent_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    """Forward requests under ``/gateway/agents`` to the agent service."""
    return await proxy.forward(
        request=request,
        target=build_proxy_targets(settings)["agent-service"],
        path=path,
    )


@router.api_route("/gateway/memories", methods=_ALL_METHODS)
@router.api_route("/gateway/memories/{path:path}", methods=_ALL_METHODS)
async def proxy_memory_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    """Forward requests under ``/gateway/memories`` to the memory service."""
    return await proxy.forward(
        request=request,
        target=build_proxy_targets(settings)["memory-service"],
        path=path,
    )


@router.api_route("/gateway/teams", methods=_ALL_METHODS)
@router.api_route("/gateway/teams/{path:path}", methods=_ALL_METHODS)
async def proxy_team_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    """Forward requests under ``/gateway/teams`` to the team service."""
    return await proxy.forward(
        request=request,
        target=build_proxy_targets(settings)["team-service"],
        path=path,
    )


@router.api_route("/gateway/skills", methods=_ALL_METHODS)
@router.api_route("/gateway/skills/{path:path}", methods=_ALL_METHODS)
async def proxy_skill_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    """Forward requests under ``/gateway/skills`` to the skill service."""
    return await proxy.forward(
        request=request,
        target=build_proxy_targets(settings)["skill-service"],
        path=path,
    )


@router.api_route("/gateway/human", methods=_ALL_METHODS)
@router.api_route("/gateway/human/{path:path}", methods=_ALL_METHODS)
async def proxy_human_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    """Forward requests under ``/gateway/human`` to the human service."""
    return await proxy.forward(
        request=request,
        target=build_proxy_targets(settings)["human-service"],
        path=path,
    )


@router.api_route("/gateway/knowledge", methods=_ALL_METHODS)
@router.api_route("/gateway/knowledge/{path:path}", methods=_ALL_METHODS)
async def proxy_knowledge_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    """Forward requests under ``/gateway/knowledge`` to the knowledge service."""
    return await proxy.forward(
        request=request,
        target=build_proxy_targets(settings)["knowledge-service"],
        path=path,
    )


@router.api_route("/gateway/events", methods=_ALL_METHODS)
@router.api_route("/gateway/events/{path:path}", methods=_ALL_METHODS)
async def proxy_event_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    """Forward requests under ``/gateway/events`` to the event service."""
    return await proxy.forward(
        request=request,
        target=build_proxy_targets(settings)["event-service"],
        path=path,
    )


@router.api_route("/gateway/identity", methods=["POST"])
@router.api_route("/gateway/identity/{path:path}", methods=["POST"])
async def proxy_identity_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    """Forward POST requests under ``/gateway/identity`` to the identity service."""
    return await proxy.forward(
        request=request,
        target=build_proxy_targets(settings)["identity-service"],
        path=path,
    )


@router.api_route("/gateway/scheduler", methods=_ALL_METHODS)
@router.api_route("/gateway/scheduler/{path:path}", methods=_ALL_METHODS)
async def proxy_scheduler_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    """Forward requests under ``/gateway/scheduler`` to the scheduler service."""
    return await proxy.forward(
        request=request,
        target=build_proxy_targets(settings)["scheduler-service"],
        path=path,
    )


@router.api_route("/gateway/tools", methods=_ALL_METHODS)
@router.api_route("/gateway/tools/{path:path}", methods=_ALL_METHODS)
async def proxy_tool_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    """Forward requests under ``/gateway/tools`` to the tool service."""
    return await proxy.forward(
        request=request,
        target=build_proxy_targets(settings)["tool-service"],
        path=path,
    )


@router.api_route("/gateway/models", methods=_ALL_METHODS)
@router.api_route("/gateway/models/{path:path}", methods=_ALL_METHODS)
async def proxy_model_gateway_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    """Forward requests under ``/gateway/models`` to the model gateway service."""
    return await proxy.forward(
        request=request,
        target=build_proxy_targets(settings)["model-gateway-service"],
        path=path,
    )


@router.api_route("/gateway/model-providers", methods=["POST"])
@router.api_route("/gateway/model-providers/{path:path}", methods=["POST"])
async def proxy_model_provider_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    """Forward POST requests under ``/gateway/model-providers`` to the provider target."""
    return await proxy.forward(
        request=request,
        target=build_proxy_targets(settings)["model-provider-service"],
        path=path,
    )


@router.api_route("/gateway/code", methods=_ALL_METHODS)
@router.api_route("/gateway/code/{path:path}", methods=_ALL_METHODS)
async def proxy_code_runner_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    """Forward requests under ``/gateway/code`` to the code runner service."""
    return await proxy.forward(
        request=request,
        target=build_proxy_targets(settings)["code-runner-service"],
        path=path,
    )


def _build_internal_headers(request: Request, settings: ApiGatewaySettings) -> dict[str, str]:
    """Build headers for gateway-originated calls to downstream services.

    Starts from ``build_internal_service_headers(settings)`` and copies the
    caller's ``authorization`` and ``x-user-id`` headers when they are set.
    """
    headers = build_internal_service_headers(settings)
    for name in ("authorization", "x-user-id"):
        value = request.headers.get(name)
        if value:
            headers[name] = value
    return headers


def httpx_client(timeout_seconds: float) -> httpx.AsyncClient:
    """Create an ``httpx.AsyncClient`` with the given timeout."""
    return httpx.AsyncClient(timeout=timeout_seconds)


async def _post_json(
    *,
    client: httpx.AsyncClient,
    target: ProxyTarget,
    path: str,
    payload: dict[str, object],
    headers: dict[str, str],
) -> dict[str, object]:
    """POST ``payload`` as JSON to ``path`` on ``target`` and return the JSON object.

    Raises:
        HTTPException: 502 when the request itself fails or the response body
            is not a JSON object; the downstream status code when the
            response is not a success.
    """
    url = _target_url(target, path)
    try:
        response = await client.post(url, headers=headers, json=payload)
    except httpx.HTTPError as exc:
        raise HTTPException(
            status_code=502,
            detail=f"{target.service_name} request failed: {exc}",
        ) from exc
    if not response.is_success:
        raise HTTPException(status_code=response.status_code, detail=_error_detail(response))
    try:
        data = response.json()
    except ValueError as exc:
        # An empty or non-JSON success body is a downstream contract
        # violation; report it like any other malformed response instead of
        # letting the decode error escape as an unhandled exception.
        raise HTTPException(
            status_code=502,
            detail=f"{target.service_name} returned unexpected response",
        ) from exc
    if not isinstance(data, dict):
        raise HTTPException(
            status_code=502,
            detail=f"{target.service_name} returned unexpected response",
        )
    return data


def _target_url(target: ProxyTarget, path: str) -> str:
    """Join the target's base URL, its path prefix and ``path`` into one URL."""
    root = f"{target.base_url.rstrip('/')}{target.path_prefix}"
    normalized_path = path.strip("/")
    return f"{root}/{normalized_path}" if normalized_path else root


def _error_detail(response: httpx.Response) -> str:
    """Extract a human-readable error message from a failed downstream response.

    Prefers a string ``detail`` field, then a string ``error.message`` field,
    then the raw body text, then a generic message with the status code.
    """
    fallback = response.text or f"downstream request failed with {response.status_code}"
    try:
        payload = response.json()
    except ValueError:
        return fallback
    if isinstance(payload, dict):
        detail = payload.get("detail")
        if isinstance(detail, str):
            return detail
        error = payload.get("error")
        if isinstance(error, dict):
            message = error.get("message")
            if isinstance(message, str):
                return message
    return fallback


def _exception_detail(exc: HTTPException) -> str:
    """Return ``exc.detail`` as text, JSON-encoding it when it is not a string."""
    if isinstance(exc.detail, str):
        return exc.detail
    return json.dumps(exc.detail, ensure_ascii=False)


def _get_string(payload: dict[str, object], key: str) -> str:
    """Return the non-empty string stored under ``key``.

    Raises:
        HTTPException: 502 when the value is missing, empty or not a string.
    """
    value = payload.get(key)
    if not isinstance(value, str) or not value:
        raise HTTPException(status_code=502, detail=f"downstream response missing {key}")
    return value


def _get_optional_string(payload: dict[str, object], key: str) -> str | None:
    """Return the non-empty string stored under ``key``, or ``None``."""
    value = payload.get(key)
    return value if isinstance(value, str) and value else None


def _get_dict(payload: dict[str, object], key: str) -> dict[str, object]:
    """Return the dict stored under ``key``.

    Raises:
        HTTPException: 502 when the value is missing or not a dict.
    """
    value = payload.get(key)
    if not isinstance(value, dict):
        raise HTTPException(status_code=502, detail=f"downstream response missing {key}")
    return value


def _resolve_output_text(run_payload: dict[str, object]) -> str | None:
    """Pick the textual output of a run.

    Uses ``output_text`` when it is a non-empty string, otherwise the
    JSON-encoded ``output_json`` when that is a non-empty dict, else ``None``.
    """
    output_text = _get_optional_string(run_payload, "output_text")
    if output_text:
        return output_text
    output_json = run_payload.get("output_json")
    if isinstance(output_json, dict) and output_json:
        return json.dumps(output_json, ensure_ascii=False)
    return None


async def _start_run_request(
    *,
    client: httpx.AsyncClient,
    session_target: ProxyTarget,
    payload: SessionExecuteRequest,
    headers: dict[str, str],
) -> _RunContext:
    """Open a run request for the session and move it to ``"running"``.

    Steps, all against the session service: load the session detail, create
    the run request (``"accepted"``), store the user message, then update the
    run request to ``"running"`` with the user message id attached.

    Raises:
        HTTPException: 422 when the session has no usable agent/team runtime
            target; otherwise whatever :func:`_post_json` raises.
    """
    session = await _post_json(
        client=client,
        target=session_target,
        path="detail",
        payload={"session_id": payload.session_id},
        headers=headers,
    )
    # Read both fields as optional so that a session without a runtime target
    # yields the descriptive 422 below rather than a generic 502.
    target_type = _get_optional_string(session, "runtime_target_type")
    target_id = _get_optional_string(session, "runtime_target_id")
    target_config_id = _get_optional_string(session, "runtime_target_config_id")
    if target_type not in _RUNTIME_TARGET_TYPES or not target_id:
        raise HTTPException(status_code=422, detail="session runtime target is not configured")

    run_request_payload: dict[str, object] = {
        "target_type": target_type,
        "target_id": target_id,
        "target_config_id": target_config_id,
        "mode": "production",
        "input_text": payload.message_text,
    }
    run_request = await _post_json(
        client=client,
        target=session_target,
        path="run-requests",
        payload={
            "session_id": payload.session_id,
            "app_config_id": target_config_id or target_id,
            "workflow_config_id": target_id,
            "trigger_type": "chat",
            "request_payload_json": run_request_payload,
            "request_status": "accepted",
        },
        headers=headers,
    )
    run_request_id = _get_string(run_request, "id")

    user_message = await _post_json(
        client=client,
        target=session_target,
        path="messages",
        payload={
            "session_id": payload.session_id,
            "turn_id": run_request_id,
            "role": "user",
            "content_type": "text",
            "content_text": payload.message_text,
            "content_json": {},
        },
        headers=headers,
    )
    user_message_id = _get_string(user_message, "id")

    await _post_json(
        client=client,
        target=session_target,
        path="run-requests/update",
        payload={
            "run_request_id": run_request_id,
            "request_status": "running",
            "request_payload_json": {**run_request_payload, "user_message_id": user_message_id},
        },
        headers=headers,
    )
    return _RunContext(
        target_type=target_type,
        target_id=target_id,
        target_config_id=target_config_id,
        run_request_id=run_request_id,
        user_message_id=user_message_id,
        run_request_payload=run_request_payload,
    )


async def _create_run(
    *,
    client: httpx.AsyncClient,
    target: ProxyTarget,
    ctx: _RunContext,
    payload: SessionExecuteRequest,
    headers: dict[str, str],
) -> str:
    """Create the agent or team run for ``ctx`` and return its id."""
    body: dict[str, object]
    if ctx.target_type == "agent":
        body = {
            "agent_id": ctx.target_id,
            "agent_config_id": ctx.target_config_id,
            "session_id": payload.session_id,
            "input_text": payload.message_text,
            "input_json": {"source": "session", "run_request_id": ctx.run_request_id},
        }
    else:
        body = {
            "team_id": ctx.target_id,
            "team_config_id": ctx.target_config_id,
            "session_id": payload.session_id,
            "input_text": payload.message_text,
            "input_json": {"source": "session", "run_request_id": ctx.run_request_id},
            "enqueue": True,
        }
    run = await _post_json(
        client=client,
        target=target,
        path="runs",
        payload=body,
        headers=headers,
    )
    return _get_string(run, "id")


async def _save_assistant_message(
    *,
    client: httpx.AsyncClient,
    session_target: ProxyTarget,
    session_id: str,
    run_request_id: str,
    output_text: str,
    headers: dict[str, str],
) -> str:
    """Store the assistant reply in the session service and return the message id."""
    assistant_message = await _post_json(
        client=client,
        target=session_target,
        path="messages",
        payload={
            "session_id": session_id,
            "turn_id": run_request_id,
            "role": "assistant",
            "content_type": "text",
            "content_text": output_text,
            "content_json": {},
        },
        headers=headers,
    )
    return _get_string(assistant_message, "id")


async def _finish_run_request(
    *,
    client: httpx.AsyncClient,
    session_target: ProxyTarget,
    ctx: _RunContext,
    request_status: str,
    assistant_message_id: str | None,
    agent_run_id: str | None,
    team_run_id: str | None,
    output_text: str | None,
    error_message: str | None,
    headers: dict[str, str],
) -> None:
    """Write the final status and result fields of the run request."""
    await _post_json(
        client=client,
        target=session_target,
        path="run-requests/update",
        payload={
            "run_request_id": ctx.run_request_id,
            "request_status": request_status,
            "request_payload_json": {
                **ctx.run_request_payload,
                "user_message_id": ctx.user_message_id,
                "assistant_message_id": assistant_message_id,
                "agent_run_id": agent_run_id,
                "team_run_id": team_run_id,
                "output_text": output_text,
                "error_message": error_message,
            },
        },
        headers=headers,
    )


async def _relay_run_stream(
    *,
    client: httpx.AsyncClient,
    target: ProxyTarget,
    run_id: str,
    event_prefix: str,
    default_error: str,
    headers: dict[str, str],
    outcome: _StreamOutcome,
):
    """Relay a run's ``execute-stream`` events and collect the result in ``outcome``.

    Every downstream event is re-emitted unchanged. ``<prefix>.delta`` events
    append their ``text`` to ``outcome.output_text``; ``<prefix>.completed``
    supplies the output when no delta produced any; ``<prefix>.failed`` sets
    ``outcome.error_message``. A non-success HTTP status is recorded as the
    error message and yields nothing.
    """
    delta_event = f"{event_prefix}.delta"
    completed_event = f"{event_prefix}.completed"
    failed_event = f"{event_prefix}.failed"

    stream_url = _target_url(target, f"runs/{run_id}/execute-stream")
    async with client.stream(
        "POST", stream_url, headers=headers, json={"dry_run": False}
    ) as resp:
        if not resp.is_success:
            outcome.error_message = await _read_stream_error(resp)
            return
        async for event_name, raw_data in _parse_sse(resp):
            data = json.loads(raw_data)
            yield _sse(event_name, data)
            if not isinstance(data, dict):
                # Still forwarded above, but there is nothing to inspect.
                continue
            if event_name == delta_event:
                piece = data.get("text")
                if isinstance(piece, str):
                    outcome.output_text += piece
            elif event_name == completed_event:
                run_data = data.get("run", data)
                if not outcome.output_text and isinstance(run_data, dict):
                    # Same resolution as the non-streaming path, including
                    # the ``output_json`` fallback.
                    outcome.output_text = _resolve_output_text(run_data) or ""
            elif event_name == failed_event:
                message = data.get("error_message")
                # An empty message must not make a failed run look successful.
                outcome.error_message = (
                    message if isinstance(message, str) and message else default_error
                )


def _sse(event: str, data: object) -> str:
    """Format one server-sent event with a JSON-encoded ``data`` field."""
    return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"


async def _parse_sse(response: httpx.Response):
    """Yield ``(event, data)`` pairs parsed from a streamed SSE response.

    The event name defaults to ``"message"``. Consecutive ``data:`` lines of
    one event are joined with ``"\\n"``. Events without data are skipped, and
    an event still pending when the stream ends is yielded as well.
    """
    event_name = "message"
    data_lines: list[str] = []
    async for raw_line in response.aiter_lines():
        # Strip any trailing line terminator so blank-line detection does not
        # depend on whether the iterator keeps line endings.
        line = raw_line.rstrip("\r\n")
        if line.startswith("event:"):
            event_name = line[6:].strip()
        elif line.startswith("data:"):
            data_lines.append(line[5:].strip())
        elif line == "":
            data = "\n".join(data_lines).strip()
            if data:
                yield event_name, data
            event_name = "message"
            data_lines = []
    data = "\n".join(data_lines).strip()
    if data:
        yield event_name, data


async def _read_stream_error(response: httpx.Response) -> str:
    """Read a failed streaming response and return the best available message.

    Returns the string ``detail`` field of a JSON object body when present,
    otherwise the body text, otherwise a generic message with the status code.
    """
    body = await response.aread()
    body_text = body.decode(errors="replace")
    try:
        data = json.loads(body_text)
    except ValueError:
        data = None
    if isinstance(data, dict):
        detail = data.get("detail")
        if isinstance(detail, str):
            return detail
    return body_text or f"downstream error {response.status_code}"