|
|
@@ -1,16 +1,29 @@
|
|
|
import asyncio
|
|
|
-from typing import Annotated
|
|
|
+import json
|
|
|
+from datetime import datetime
|
|
|
+from time import perf_counter
|
|
|
+from typing import Annotated, AsyncIterator
|
|
|
+from uuid import uuid4
|
|
|
|
|
|
from core_domain import ServiceDescriptor, ServiceHealth
|
|
|
-from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response
|
|
|
+import httpx
|
|
|
+from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response, StreamingResponse
|
|
|
from sqlalchemy import text
|
|
|
from sqlalchemy.orm import Session
|
|
|
+from pydantic import BaseModel
|
|
|
|
|
|
from app.bootstrap.settings import ApiGatewaySettings
|
|
|
from app.db.session import get_db
|
|
|
-from app.domain.repositories import ApiKeyRepository, GatewayRequestAuditRepository
|
|
|
+from app.domain.repositories import (
|
|
|
+ ApiKeyRepository,
|
|
|
+ AppApiKeyRepository,
|
|
|
+ AppDefinitionRepository,
|
|
|
+ AppInvocationAuditRepository,
|
|
|
+ GatewayRequestAuditRepository,
|
|
|
+)
|
|
|
from app.infrastructure.api_keys import generate_api_key, get_api_key_prefix, hash_api_key
|
|
|
from app.infrastructure.proxy import ProxyServiceName, ProxyTarget, ServiceProxy
|
|
|
+from core_shared.security import build_internal_service_headers
|
|
|
from app.schemas.gateway import (
|
|
|
ApiKeyCreateRequest,
|
|
|
ApiKeyCreateResponse,
|
|
|
@@ -18,16 +31,52 @@ from app.schemas.gateway import (
|
|
|
ApiKeyResponse,
|
|
|
ApiKeyStatusPostRequest,
|
|
|
ApiKeyStatusUpdateRequest,
|
|
|
+ AppApiKeyCreateRequest,
|
|
|
+ AppApiKeyCreateResponse,
|
|
|
+ AppApiKeyListRequest,
|
|
|
+ AppApiKeyResponse,
|
|
|
+ AppApiKeyStatusUpdateRequest,
|
|
|
+ AppAuditListRequest,
|
|
|
+ AppCreateRequest,
|
|
|
+ AppDetailRequest,
|
|
|
+ AppInvocationAuditResponse,
|
|
|
+ AppListRequest,
|
|
|
+ AppResponse,
|
|
|
+ AppStatusUpdateRequest,
|
|
|
+ AppUpdateRequest,
|
|
|
GatewayAuditServiceStats,
|
|
|
GatewayAuditStatsResponse,
|
|
|
GatewayRequestAuditResponse,
|
|
|
GatewayServicesHealthResponse,
|
|
|
+ OpenApiChatRequest,
|
|
|
+ OpenApiChatResponse,
|
|
|
)
|
|
|
|
|
|
router = APIRouter()
|
|
|
DbSession = Annotated[Session, Depends(get_db)]
|
|
|
|
|
|
|
|
|
+class SessionExecuteRequest(BaseModel):
|
|
|
+ session_id: str
|
|
|
+ message_text: str
|
|
|
+ stream: bool = False
|
|
|
+
|
|
|
+
|
|
|
+class SessionExecuteResponse(BaseModel):
|
|
|
+ session_id: str
|
|
|
+ run_request_id: str
|
|
|
+ target_type: str
|
|
|
+ target_id: str
|
|
|
+ target_config_id: str | None = None
|
|
|
+ request_status: str
|
|
|
+ user_message_id: str
|
|
|
+ assistant_message_id: str | None = None
|
|
|
+ agent_run_id: str | None = None
|
|
|
+ team_run_id: str | None = None
|
|
|
+ output_text: str | None = None
|
|
|
+ error_message: str | None = None
|
|
|
+
|
|
|
+
|
|
|
@router.get("/health", response_model=ServiceDescriptor)
|
|
|
def health_check(db: DbSession) -> ServiceDescriptor:
|
|
|
db.execute(text("SELECT 1"))
|
|
|
@@ -241,6 +290,862 @@ async def downstream_health_check(
|
|
|
downstream_services=downstream_services)
|
|
|
|
|
|
|
|
|
+@router.post("/gateway/sessions/execute")
|
|
|
+async def execute_session(
|
|
|
+ payload: SessionExecuteRequest,
|
|
|
+ request: Request,
|
|
|
+ settings: GatewaySettingsDep):
|
|
|
+ if payload.stream:
|
|
|
+ return StreamingResponse(
|
|
|
+ _stream_session_execute(payload, request, settings),
|
|
|
+ media_type="text/event-stream",
|
|
|
+ headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
|
|
|
+
|
|
|
+ targets = build_proxy_targets(settings)
|
|
|
+ session_target = targets["session-service"]
|
|
|
+ agent_target = targets["agent-service"]
|
|
|
+ team_target = targets["team-service"]
|
|
|
+ headers = _build_internal_headers(request, settings)
|
|
|
+
|
|
|
+ async with httpx_client(settings.proxy_timeout_seconds) as client:
|
|
|
+ session = await _post_json(
|
|
|
+ client=client,
|
|
|
+ target=session_target,
|
|
|
+ path="detail",
|
|
|
+ payload={"session_id": payload.session_id},
|
|
|
+ headers=headers)
|
|
|
+
|
|
|
+ target_type = _get_string(session, "runtime_target_type")
|
|
|
+ target_id = _get_string(session, "runtime_target_id")
|
|
|
+ target_config_id = _get_optional_string(session, "runtime_target_config_id")
|
|
|
+ if target_type not in {"agent", "team"} or not target_id:
|
|
|
+ raise HTTPException(status_code=422, detail="session runtime target is not configured")
|
|
|
+
|
|
|
+ run_request_payload = {
|
|
|
+ "target_type": target_type,
|
|
|
+ "target_id": target_id,
|
|
|
+ "target_config_id": target_config_id,
|
|
|
+ "mode": "production",
|
|
|
+ "input_text": payload.message_text,
|
|
|
+ }
|
|
|
+ run_request = await _post_json(
|
|
|
+ client=client,
|
|
|
+ target=session_target,
|
|
|
+ path="run-requests",
|
|
|
+ payload={
|
|
|
+ "session_id": payload.session_id,
|
|
|
+ "app_config_id": target_config_id or target_id,
|
|
|
+ "workflow_config_id": target_id,
|
|
|
+ "trigger_type": "chat",
|
|
|
+ "request_payload_json": run_request_payload,
|
|
|
+ "request_status": "accepted",
|
|
|
+ },
|
|
|
+ headers=headers)
|
|
|
+ run_request_id = _get_string(run_request, "id")
|
|
|
+
|
|
|
+ user_message = await _post_json(
|
|
|
+ client=client,
|
|
|
+ target=session_target,
|
|
|
+ path="messages",
|
|
|
+ payload={
|
|
|
+ "session_id": payload.session_id,
|
|
|
+ "turn_id": run_request_id,
|
|
|
+ "role": "user",
|
|
|
+ "content_type": "text",
|
|
|
+ "content_text": payload.message_text,
|
|
|
+ "content_json": {},
|
|
|
+ },
|
|
|
+ headers=headers)
|
|
|
+ user_message_id = _get_string(user_message, "id")
|
|
|
+
|
|
|
+ await _post_json(
|
|
|
+ client=client,
|
|
|
+ target=session_target,
|
|
|
+ path="run-requests/update",
|
|
|
+ payload={
|
|
|
+ "run_request_id": run_request_id,
|
|
|
+ "request_status": "running",
|
|
|
+ "request_payload_json": {
|
|
|
+ **run_request_payload,
|
|
|
+ "user_message_id": user_message_id,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ headers=headers)
|
|
|
+
|
|
|
+ assistant_message_id: str | None = None
|
|
|
+ agent_run_id: str | None = None
|
|
|
+ team_run_id: str | None = None
|
|
|
+ output_text: str | None = None
|
|
|
+ error_message: str | None = None
|
|
|
+ request_status = "completed"
|
|
|
+
|
|
|
+ try:
|
|
|
+ if target_type == "agent":
|
|
|
+ agent_run = await _post_json(
|
|
|
+ client=client,
|
|
|
+ target=agent_target,
|
|
|
+ path="runs",
|
|
|
+ payload={
|
|
|
+ "agent_id": target_id,
|
|
|
+ "agent_config_id": target_config_id,
|
|
|
+ "session_id": payload.session_id,
|
|
|
+ "input_text": payload.message_text,
|
|
|
+ "input_json": {
|
|
|
+ "source": "session",
|
|
|
+ "run_request_id": run_request_id,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ headers=headers)
|
|
|
+ agent_run_id = _get_string(agent_run, "id")
|
|
|
+ execute_result = await _post_json(
|
|
|
+ client=client,
|
|
|
+ target=agent_target,
|
|
|
+ path="runs/execute",
|
|
|
+ payload={
|
|
|
+ "agent_run_id": agent_run_id,
|
|
|
+ "dry_run": False,
|
|
|
+ },
|
|
|
+ headers=headers)
|
|
|
+ run_data = _get_dict(execute_result, "run")
|
|
|
+ output_text = _resolve_output_text(run_data)
|
|
|
+ error_message = _get_optional_string(run_data, "error_message")
|
|
|
+ else:
|
|
|
+ team_run = await _post_json(
|
|
|
+ client=client,
|
|
|
+ target=team_target,
|
|
|
+ path="runs",
|
|
|
+ payload={
|
|
|
+ "team_id": target_id,
|
|
|
+ "team_config_id": target_config_id,
|
|
|
+ "session_id": payload.session_id,
|
|
|
+ "input_text": payload.message_text,
|
|
|
+ "input_json": {
|
|
|
+ "source": "session",
|
|
|
+ "run_request_id": run_request_id,
|
|
|
+ },
|
|
|
+ "enqueue": True,
|
|
|
+ },
|
|
|
+ headers=headers)
|
|
|
+ team_run_id = _get_string(team_run, "id")
|
|
|
+ execute_result = await _post_json(
|
|
|
+ client=client,
|
|
|
+ target=team_target,
|
|
|
+ path=f"runs/{team_run_id}/execute",
|
|
|
+ payload={
|
|
|
+ "dry_run": False,
|
|
|
+ },
|
|
|
+ headers=headers)
|
|
|
+ run_data = _get_dict(execute_result, "run")
|
|
|
+ output_text = _resolve_output_text(run_data)
|
|
|
+ error_message = _get_optional_string(run_data, "error_message")
|
|
|
+
|
|
|
+ if error_message:
|
|
|
+ request_status = "failed"
|
|
|
+
|
|
|
+ if output_text:
|
|
|
+ assistant_message = await _post_json(
|
|
|
+ client=client,
|
|
|
+ target=session_target,
|
|
|
+ path="messages",
|
|
|
+ payload={
|
|
|
+ "session_id": payload.session_id,
|
|
|
+ "turn_id": run_request_id,
|
|
|
+ "role": "assistant",
|
|
|
+ "content_type": "text",
|
|
|
+ "content_text": output_text,
|
|
|
+ "content_json": {},
|
|
|
+ },
|
|
|
+ headers=headers)
|
|
|
+ assistant_message_id = _get_string(assistant_message, "id")
|
|
|
+ except HTTPException as exc:
|
|
|
+ request_status = "failed"
|
|
|
+ error_message = exc.detail if isinstance(exc.detail, str) else json.dumps(exc.detail, ensure_ascii=False)
|
|
|
+
|
|
|
+ await _post_json(
|
|
|
+ client=client,
|
|
|
+ target=session_target,
|
|
|
+ path="run-requests/update",
|
|
|
+ payload={
|
|
|
+ "run_request_id": run_request_id,
|
|
|
+ "request_status": request_status,
|
|
|
+ "request_payload_json": {
|
|
|
+ **run_request_payload,
|
|
|
+ "user_message_id": user_message_id,
|
|
|
+ "assistant_message_id": assistant_message_id,
|
|
|
+ "agent_run_id": agent_run_id,
|
|
|
+ "team_run_id": team_run_id,
|
|
|
+ "output_text": output_text,
|
|
|
+ "error_message": error_message,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ headers=headers)
|
|
|
+
|
|
|
+ return SessionExecuteResponse(
|
|
|
+ session_id=payload.session_id,
|
|
|
+ run_request_id=run_request_id,
|
|
|
+ target_type=target_type,
|
|
|
+ target_id=target_id,
|
|
|
+ target_config_id=target_config_id,
|
|
|
+ request_status=request_status,
|
|
|
+ user_message_id=user_message_id,
|
|
|
+ assistant_message_id=assistant_message_id,
|
|
|
+ agent_run_id=agent_run_id,
|
|
|
+ team_run_id=team_run_id,
|
|
|
+ output_text=output_text,
|
|
|
+ error_message=error_message)
|
|
|
+
|
|
|
+
|
|
|
+async def _stream_session_execute(
|
|
|
+ payload: SessionExecuteRequest,
|
|
|
+ request: Request,
|
|
|
+ settings: ApiGatewaySettings):
|
|
|
+ targets = build_proxy_targets(settings)
|
|
|
+ session_target = targets["session-service"]
|
|
|
+ agent_target = targets["agent-service"]
|
|
|
+ team_target = targets["team-service"]
|
|
|
+ headers = _build_internal_headers(request, settings)
|
|
|
+ client = httpx.AsyncClient(timeout=settings.proxy_timeout_seconds)
|
|
|
+
|
|
|
+ try:
|
|
|
+ session = await _post_json(
|
|
|
+ client=client, target=session_target, path="detail",
|
|
|
+ payload={"session_id": payload.session_id}, headers=headers)
|
|
|
+ target_type = _get_string(session, "runtime_target_type")
|
|
|
+ target_id = _get_string(session, "runtime_target_id")
|
|
|
+ target_config_id = _get_optional_string(session, "runtime_target_config_id")
|
|
|
+ if target_type not in {"agent", "team"} or not target_id:
|
|
|
+ raise HTTPException(status_code=422, detail="session runtime target is not configured")
|
|
|
+
|
|
|
+ run_request_payload = {
|
|
|
+ "target_type": target_type, "target_id": target_id,
|
|
|
+ "target_config_id": target_config_id, "mode": "production",
|
|
|
+ "input_text": payload.message_text,
|
|
|
+ }
|
|
|
+ run_request = await _post_json(
|
|
|
+ client=client, target=session_target, path="run-requests",
|
|
|
+ payload={
|
|
|
+ "session_id": payload.session_id,
|
|
|
+ "app_config_id": target_config_id or target_id,
|
|
|
+ "workflow_config_id": target_id,
|
|
|
+ "trigger_type": "chat",
|
|
|
+ "request_payload_json": run_request_payload,
|
|
|
+ "request_status": "accepted",
|
|
|
+ }, headers=headers)
|
|
|
+ run_request_id = _get_string(run_request, "id")
|
|
|
+
|
|
|
+ user_message = await _post_json(
|
|
|
+ client=client, target=session_target, path="messages",
|
|
|
+ payload={
|
|
|
+ "session_id": payload.session_id, "turn_id": run_request_id,
|
|
|
+ "role": "user", "content_type": "text",
|
|
|
+ "content_text": payload.message_text, "content_json": {},
|
|
|
+ }, headers=headers)
|
|
|
+ user_message_id = _get_string(user_message, "id")
|
|
|
+
|
|
|
+ await _post_json(
|
|
|
+ client=client, target=session_target, path="run-requests/update",
|
|
|
+ payload={
|
|
|
+ "run_request_id": run_request_id, "request_status": "running",
|
|
|
+ "request_payload_json": {**run_request_payload, "user_message_id": user_message_id},
|
|
|
+ }, headers=headers)
|
|
|
+
|
|
|
+ yield _sse("session.execute.started", {
|
|
|
+ "run_request_id": run_request_id, "user_message_id": user_message_id,
|
|
|
+ "target_type": target_type, "target_id": target_id,
|
|
|
+ })
|
|
|
+
|
|
|
+ output_text = ""
|
|
|
+ error_message: str | None = None
|
|
|
+ agent_run_id: str | None = None
|
|
|
+ team_run_id: str | None = None
|
|
|
+
|
|
|
+ if target_type == "agent":
|
|
|
+ agent_run = await _post_json(
|
|
|
+ client=client, target=agent_target, path="runs",
|
|
|
+ payload={
|
|
|
+ "agent_id": target_id, "agent_config_id": target_config_id,
|
|
|
+ "session_id": payload.session_id, "input_text": payload.message_text,
|
|
|
+ "input_json": {"source": "session", "run_request_id": run_request_id},
|
|
|
+ }, headers=headers)
|
|
|
+ agent_run_id = _get_string(agent_run, "id")
|
|
|
+
|
|
|
+ stream_url = _target_url(agent_target, f"runs/{agent_run_id}/execute-stream")
|
|
|
+ async with client.stream("POST", stream_url, headers=headers, json={"dry_run": False}) as resp:
|
|
|
+ if not resp.is_success:
|
|
|
+ error_message = await _read_stream_error(resp)
|
|
|
+ else:
|
|
|
+ async for ev_name, ev_data in _parse_sse(resp):
|
|
|
+ data = json.loads(ev_data)
|
|
|
+ yield _sse(ev_name, data)
|
|
|
+ if ev_name == "agent.run.delta" and isinstance(data.get("text"), str):
|
|
|
+ output_text += data["text"]
|
|
|
+ elif ev_name == "agent.run.completed":
|
|
|
+ run_data = data.get("run", data)
|
|
|
+ if not output_text and isinstance(run_data.get("output_text"), str):
|
|
|
+ output_text = run_data["output_text"]
|
|
|
+ elif ev_name == "agent.run.failed":
|
|
|
+ error_message = data.get("error_message", "Agent execution failed")
|
|
|
+ if not isinstance(error_message, str):
|
|
|
+ error_message = "Agent execution failed"
|
|
|
+ else:
|
|
|
+ team_run = await _post_json(
|
|
|
+ client=client, target=team_target, path="runs",
|
|
|
+ payload={
|
|
|
+ "team_id": target_id, "team_config_id": target_config_id,
|
|
|
+ "session_id": payload.session_id, "input_text": payload.message_text,
|
|
|
+ "input_json": {"source": "session", "run_request_id": run_request_id},
|
|
|
+ "enqueue": True,
|
|
|
+ }, headers=headers)
|
|
|
+ team_run_id = _get_string(team_run, "id")
|
|
|
+
|
|
|
+ stream_url = _target_url(team_target, f"runs/{team_run_id}/execute-stream")
|
|
|
+ async with client.stream("POST", stream_url, headers=headers, json={"dry_run": False}) as resp:
|
|
|
+ if not resp.is_success:
|
|
|
+ error_message = await _read_stream_error(resp)
|
|
|
+ else:
|
|
|
+ async for ev_name, ev_data in _parse_sse(resp):
|
|
|
+ data = json.loads(ev_data)
|
|
|
+ yield _sse(ev_name, data)
|
|
|
+ if ev_name == "team.run.delta" and isinstance(data.get("text"), str):
|
|
|
+ output_text += data["text"]
|
|
|
+ elif ev_name == "team.run.completed":
|
|
|
+ run_data = data.get("run", data)
|
|
|
+ if not output_text and isinstance(run_data.get("output_text"), str):
|
|
|
+ output_text = run_data["output_text"]
|
|
|
+ elif ev_name == "team.run.failed":
|
|
|
+ error_message = data.get("error_message", "Team execution failed")
|
|
|
+ if not isinstance(error_message, str):
|
|
|
+ error_message = "Team execution failed"
|
|
|
+
|
|
|
+ request_status = "failed" if error_message else "completed"
|
|
|
+ assistant_message_id: str | None = None
|
|
|
+ if output_text:
|
|
|
+ assistant_message = await _post_json(
|
|
|
+ client=client, target=session_target, path="messages",
|
|
|
+ payload={
|
|
|
+ "session_id": payload.session_id, "turn_id": run_request_id,
|
|
|
+ "role": "assistant", "content_type": "text",
|
|
|
+ "content_text": output_text, "content_json": {},
|
|
|
+ }, headers=headers)
|
|
|
+ assistant_message_id = _get_string(assistant_message, "id")
|
|
|
+
|
|
|
+ await _post_json(
|
|
|
+ client=client, target=session_target, path="run-requests/update",
|
|
|
+ payload={
|
|
|
+ "run_request_id": run_request_id, "request_status": request_status,
|
|
|
+ "request_payload_json": {
|
|
|
+ **run_request_payload, "user_message_id": user_message_id,
|
|
|
+ "assistant_message_id": assistant_message_id,
|
|
|
+ "agent_run_id": agent_run_id, "team_run_id": team_run_id,
|
|
|
+ "output_text": output_text, "error_message": error_message,
|
|
|
+ },
|
|
|
+ }, headers=headers)
|
|
|
+
|
|
|
+ yield _sse("session.execute.completed", {
|
|
|
+ "run_request_id": run_request_id, "request_status": request_status,
|
|
|
+ "assistant_message_id": assistant_message_id, "output_text": output_text,
|
|
|
+ "error_message": error_message,
|
|
|
+ })
|
|
|
+
|
|
|
+ except HTTPException as exc:
|
|
|
+ detail = exc.detail if isinstance(exc.detail, str) else json.dumps(exc.detail, ensure_ascii=False)
|
|
|
+ yield _sse("session.execute.failed", {"error_message": detail})
|
|
|
+ except Exception as exc:
|
|
|
+ yield _sse("session.execute.failed", {"error_message": str(exc)})
|
|
|
+ finally:
|
|
|
+ await client.aclose()
|
|
|
+
|
|
|
+
|
|
|
+# ── Application Admin Routes ─────────────────────────────────────────────────
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/apps", response_model=AppResponse)
|
|
|
+def create_app(payload: AppCreateRequest, db: DbSession) -> AppResponse:
|
|
|
+ existing = AppDefinitionRepository(db).get_by_code(code=payload.code)
|
|
|
+ if existing is not None:
|
|
|
+ raise HTTPException(status_code=409, detail=f"app code already exists: {payload.code}")
|
|
|
+ entity = AppDefinitionRepository(db).create(
|
|
|
+ code=payload.code,
|
|
|
+ name=payload.name,
|
|
|
+ description=payload.description,
|
|
|
+ target_type=payload.target_type,
|
|
|
+ target_id=payload.target_id,
|
|
|
+ owner_user_id=payload.owner_user_id,
|
|
|
+ settings_json=payload.settings_json)
|
|
|
+ return AppResponse.from_entity(entity)
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/apps/list", response_model=list[AppResponse])
|
|
|
+def list_apps(payload: AppListRequest, db: DbSession) -> list[AppResponse]:
|
|
|
+ return [AppResponse.from_entity(e) for e in AppDefinitionRepository(db).list_all()]
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/apps/detail", response_model=AppResponse)
|
|
|
+def get_app_detail(payload: AppDetailRequest, db: DbSession) -> AppResponse:
|
|
|
+ entity = AppDefinitionRepository(db).get_by_id(app_id=payload.app_id)
|
|
|
+ if entity is None:
|
|
|
+ raise HTTPException(status_code=404, detail=f"app not found: {payload.app_id}")
|
|
|
+ return AppResponse.from_entity(entity)
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/apps/update", response_model=AppResponse)
|
|
|
+def update_app(payload: AppUpdateRequest, db: DbSession) -> AppResponse:
|
|
|
+ entity = AppDefinitionRepository(db).update(
|
|
|
+ app_id=payload.app_id,
|
|
|
+ name=payload.name,
|
|
|
+ description=payload.description,
|
|
|
+ target_type=payload.target_type,
|
|
|
+ target_id=payload.target_id,
|
|
|
+ settings_json=payload.settings_json)
|
|
|
+ if entity is None:
|
|
|
+ raise HTTPException(status_code=404, detail=f"app not found: {payload.app_id}")
|
|
|
+ return AppResponse.from_entity(entity)
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/apps/status", response_model=AppResponse)
|
|
|
+def update_app_status(payload: AppStatusUpdateRequest, db: DbSession) -> AppResponse:
|
|
|
+ entity = AppDefinitionRepository(db).update_status(app_id=payload.app_id, status=payload.status)
|
|
|
+ if entity is None:
|
|
|
+ raise HTTPException(status_code=404, detail=f"app not found: {payload.app_id}")
|
|
|
+ return AppResponse.from_entity(entity)
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/apps/{app_id}/api-keys", response_model=AppApiKeyCreateResponse)
|
|
|
+def create_app_api_key(app_id: str, payload: AppApiKeyCreateRequest, db: DbSession) -> AppApiKeyCreateResponse:
|
|
|
+ app_entity = AppDefinitionRepository(db).get_by_id(app_id=app_id)
|
|
|
+ if app_entity is None:
|
|
|
+ raise HTTPException(status_code=404, detail=f"app not found: {app_id}")
|
|
|
+ api_key = generate_api_key()
|
|
|
+ entity = AppApiKeyRepository(db).create(
|
|
|
+ app_id=app_id,
|
|
|
+ name=payload.name,
|
|
|
+ key_prefix=get_api_key_prefix(api_key),
|
|
|
+ key_hash=hash_api_key(api_key),
|
|
|
+ scopes=payload.scopes,
|
|
|
+ expires_time=payload.expires_time)
|
|
|
+ return AppApiKeyCreateResponse(
|
|
|
+ id=entity.id,
|
|
|
+ app_id=entity.app_id,
|
|
|
+ name=entity.name,
|
|
|
+ key_prefix=entity.key_prefix,
|
|
|
+ api_key=api_key,
|
|
|
+ status=entity.status,
|
|
|
+ scopes=entity.scopes,
|
|
|
+ expires_time=entity.expires_time,
|
|
|
+ created_time=entity.created_time)
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/apps/{app_id}/api-keys/list", response_model=list[AppApiKeyResponse])
|
|
|
+def list_app_api_keys(app_id: str, payload: AppApiKeyListRequest, db: DbSession) -> list[AppApiKeyResponse]:
|
|
|
+ return [AppApiKeyResponse.from_entity(e) for e in AppApiKeyRepository(db).list_by_app(app_id=app_id)]
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/apps/{app_id}/api-keys/status", response_model=AppApiKeyResponse)
|
|
|
+def update_app_api_key_status(app_id: str, payload: AppApiKeyStatusUpdateRequest, db: DbSession) -> AppApiKeyResponse:
|
|
|
+ entity = AppApiKeyRepository(db).update_status(api_key_id=payload.api_key_id, status=payload.status)
|
|
|
+ if entity is None:
|
|
|
+ raise HTTPException(status_code=404, detail=f"api key not found: {payload.api_key_id}")
|
|
|
+ return AppApiKeyResponse.from_entity(entity)
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/apps/{app_id}/audits", response_model=list[AppInvocationAuditResponse])
|
|
|
+def list_app_audits(app_id: str, payload: AppAuditListRequest, db: DbSession) -> list[AppInvocationAuditResponse]:
|
|
|
+ return [
|
|
|
+ AppInvocationAuditResponse.from_entity(e)
|
|
|
+ for e in AppInvocationAuditRepository(db).list_by_app(app_id=app_id, limit=payload.limit)
|
|
|
+ ]
|
|
|
+
|
|
|
+
|
|
|
+# ── OpenAPI External Invocation ──────────────────────────────────────────────
|
|
|
+
|
|
|
+
|
|
|
+def _authenticate_app_api_key(request: Request, db: Session):
|
|
|
+ settings = ApiGatewaySettings()
|
|
|
+ token: str | None = None
|
|
|
+ authorization = request.headers.get("authorization")
|
|
|
+ if authorization:
|
|
|
+ scheme, _, t = authorization.partition(" ")
|
|
|
+ if scheme.lower() == "bearer" and t.strip():
|
|
|
+ token = t.strip()
|
|
|
+ if token is None:
|
|
|
+ token = request.headers.get(settings.api_key_header_name)
|
|
|
+ if not token:
|
|
|
+ raise HTTPException(status_code=401, detail="missing bearer token or api key")
|
|
|
+
|
|
|
+ key_hash = hash_api_key(token)
|
|
|
+ key_entity = AppApiKeyRepository(db).get_active_by_hash(key_hash=key_hash)
|
|
|
+ if key_entity is None:
|
|
|
+ raise HTTPException(status_code=401, detail="invalid api key")
|
|
|
+ if key_entity.expires_time is not None and key_entity.expires_time <= datetime.utcnow():
|
|
|
+ raise HTTPException(status_code=401, detail="api key expired")
|
|
|
+
|
|
|
+ app_entity = AppDefinitionRepository(db).get_by_id(app_id=key_entity.app_id)
|
|
|
+ if app_entity is None:
|
|
|
+ raise HTTPException(status_code=401, detail="app not found")
|
|
|
+ if app_entity.status != "published":
|
|
|
+ raise HTTPException(status_code=403, detail=f"app is {app_entity.status}, not published")
|
|
|
+
|
|
|
+ AppApiKeyRepository(db).touch_last_used_time(api_key_id=key_entity.id)
|
|
|
+ return key_entity, app_entity
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/openapi/apps/{app_code}/chat", response_model=OpenApiChatResponse)
|
|
|
+async def openapi_chat(app_code: str, payload: OpenApiChatRequest, request: Request, db: DbSession):
|
|
|
+ start = perf_counter()
|
|
|
+ request_id = str(uuid4())
|
|
|
+ key_entity, app_entity = _authenticate_app_api_key(request, db)
|
|
|
+ if app_entity.code != app_code:
|
|
|
+ raise HTTPException(status_code=403, detail="api key does not belong to this app")
|
|
|
+
|
|
|
+ targets = build_proxy_targets(ApiGatewaySettings())
|
|
|
+ session_target = targets["session-service"]
|
|
|
+ agent_target = targets["agent-service"]
|
|
|
+ team_target = targets["team-service"]
|
|
|
+ headers = _build_internal_headers(request, ApiGatewaySettings())
|
|
|
+
|
|
|
+ async with httpx_client(ApiGatewaySettings().proxy_timeout_seconds) as client:
|
|
|
+ session_id = payload.session_id
|
|
|
+ if not session_id:
|
|
|
+ session_data = await _post_json(
|
|
|
+ client=client, target=session_target, path="",
|
|
|
+ payload={
|
|
|
+ "app_id": app_entity.id,
|
|
|
+ "user_id": payload.user_id or "openapi",
|
|
|
+ "channel_type": "openapi",
|
|
|
+ "runtime_target_type": app_entity.target_type,
|
|
|
+ "runtime_target_id": app_entity.target_id,
|
|
|
+ }, headers=headers)
|
|
|
+ session_id = _get_string(session_data, "id")
|
|
|
+
|
|
|
+ run_request_payload = {
|
|
|
+ "target_type": app_entity.target_type,
|
|
|
+ "target_id": app_entity.target_id,
|
|
|
+ "mode": "production",
|
|
|
+ "input_text": payload.message,
|
|
|
+ }
|
|
|
+ run_request = await _post_json(
|
|
|
+ client=client, target=session_target, path="run-requests",
|
|
|
+ payload={
|
|
|
+ "session_id": session_id,
|
|
|
+ "app_config_id": app_entity.target_id,
|
|
|
+ "workflow_config_id": app_entity.target_id,
|
|
|
+ "trigger_type": "chat",
|
|
|
+ "request_payload_json": run_request_payload,
|
|
|
+ "request_status": "accepted",
|
|
|
+ }, headers=headers)
|
|
|
+ run_request_id = _get_string(run_request, "id")
|
|
|
+
|
|
|
+ user_message = await _post_json(
|
|
|
+ client=client, target=session_target, path="messages",
|
|
|
+ payload={
|
|
|
+ "session_id": session_id, "turn_id": run_request_id,
|
|
|
+ "role": "user", "content_type": "text",
|
|
|
+ "content_text": payload.message, "content_json": {},
|
|
|
+ }, headers=headers)
|
|
|
+
|
|
|
+ await _post_json(
|
|
|
+ client=client, target=session_target, path="run-requests/update",
|
|
|
+ payload={
|
|
|
+ "run_request_id": run_request_id, "request_status": "running",
|
|
|
+ "request_payload_json": {**run_request_payload, "user_message_id": _get_string(user_message, "id")},
|
|
|
+ }, headers=headers)
|
|
|
+
|
|
|
+ output_text: str | None = None
|
|
|
+ error_message: str | None = None
|
|
|
+ request_status = "completed"
|
|
|
+
|
|
|
+ try:
|
|
|
+ if app_entity.target_type == "agent":
|
|
|
+ agent_run = await _post_json(
|
|
|
+ client=client, target=agent_target, path="runs",
|
|
|
+ payload={
|
|
|
+ "agent_id": app_entity.target_id,
|
|
|
+ "session_id": session_id,
|
|
|
+ "input_text": payload.message,
|
|
|
+ "input_json": {"source": "openapi", "run_request_id": run_request_id},
|
|
|
+ }, headers=headers)
|
|
|
+ agent_run_id = _get_string(agent_run, "id")
|
|
|
+ execute_result = await _post_json(
|
|
|
+ client=client, target=agent_target, path="runs/execute",
|
|
|
+ payload={"agent_run_id": agent_run_id, "dry_run": False}, headers=headers)
|
|
|
+ run_data = _get_dict(execute_result, "run")
|
|
|
+ output_text = _resolve_output_text(run_data)
|
|
|
+ error_message = _get_optional_string(run_data, "error_message")
|
|
|
+ else:
|
|
|
+ team_run = await _post_json(
|
|
|
+ client=client, target=team_target, path="runs",
|
|
|
+ payload={
|
|
|
+ "team_id": app_entity.target_id,
|
|
|
+ "session_id": session_id,
|
|
|
+ "input_text": payload.message,
|
|
|
+ "input_json": {"source": "openapi", "run_request_id": run_request_id},
|
|
|
+ "enqueue": True,
|
|
|
+ }, headers=headers)
|
|
|
+ team_run_id = _get_string(team_run, "id")
|
|
|
+ execute_result = await _post_json(
|
|
|
+ client=client, target=team_target, path=f"runs/{team_run_id}/execute",
|
|
|
+ payload={"dry_run": False}, headers=headers)
|
|
|
+ run_data = _get_dict(execute_result, "run")
|
|
|
+ output_text = _resolve_output_text(run_data)
|
|
|
+ error_message = _get_optional_string(run_data, "error_message")
|
|
|
+
|
|
|
+ if error_message:
|
|
|
+ request_status = "failed"
|
|
|
+
|
|
|
+ if output_text:
|
|
|
+ await _post_json(
|
|
|
+ client=client, target=session_target, path="messages",
|
|
|
+ payload={
|
|
|
+ "session_id": session_id, "turn_id": run_request_id,
|
|
|
+ "role": "assistant", "content_type": "text",
|
|
|
+ "content_text": output_text, "content_json": {},
|
|
|
+ }, headers=headers)
|
|
|
+ except HTTPException as exc:
|
|
|
+ request_status = "failed"
|
|
|
+ error_message = exc.detail if isinstance(exc.detail, str) else json.dumps(exc.detail, ensure_ascii=False)
|
|
|
+
|
|
|
+ await _post_json(
|
|
|
+ client=client, target=session_target, path="run-requests/update",
|
|
|
+ payload={
|
|
|
+ "run_request_id": run_request_id, "request_status": request_status,
|
|
|
+ "request_payload_json": {
|
|
|
+ **run_request_payload,
|
|
|
+ "user_message_id": _get_string(user_message, "id"),
|
|
|
+ "output_text": output_text, "error_message": error_message,
|
|
|
+ },
|
|
|
+ }, headers=headers)
|
|
|
+
|
|
|
+ duration_ms = int((perf_counter() - start) * 1000)
|
|
|
+ AppInvocationAuditRepository(db).create(
|
|
|
+ app_id=app_entity.id,
|
|
|
+ api_key_prefix=key_entity.key_prefix,
|
|
|
+ request_id=request_id,
|
|
|
+ session_id=session_id,
|
|
|
+ run_request_id=run_request_id,
|
|
|
+ target_type=app_entity.target_type,
|
|
|
+ target_id=app_entity.target_id,
|
|
|
+ invoke_type="sync",
|
|
|
+ status=request_status,
|
|
|
+ duration_ms=duration_ms,
|
|
|
+ error_message=error_message,
|
|
|
+ client_metadata_json=json.dumps(payload.metadata) if payload.metadata else None)
|
|
|
+
|
|
|
+ return OpenApiChatResponse(
|
|
|
+ request_id=request_id,
|
|
|
+ app_code=app_entity.code,
|
|
|
+ session_id=session_id,
|
|
|
+ run_request_id=run_request_id,
|
|
|
+ target_type=app_entity.target_type,
|
|
|
+ target_id=app_entity.target_id,
|
|
|
+ status=request_status,
|
|
|
+ output_text=output_text,
|
|
|
+ error=error_message)
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/openapi/apps/{app_code}/chat/stream")
|
|
|
+async def openapi_chat_stream(app_code: str, payload: OpenApiChatRequest, request: Request):
|
|
|
+ settings = ApiGatewaySettings()
|
|
|
+ session_factory = request.app.state.session_factory
|
|
|
+
|
|
|
+ auth_db = session_factory()
|
|
|
+ try:
|
|
|
+ key_entity, app_entity = _authenticate_app_api_key(request, auth_db)
|
|
|
+ except HTTPException as exc:
|
|
|
+ auth_db.close()
|
|
|
+ detail = exc.detail if isinstance(exc.detail, str) else json.dumps(exc.detail, ensure_ascii=False)
|
|
|
+ return StreamingResponse(
|
|
|
+ _single_sse("failed", {"status": "failed", "error_code": "auth_error", "error_message": detail}),
|
|
|
+ media_type="text/event-stream",
|
|
|
+ headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
|
|
|
+ finally:
|
|
|
+ auth_db.close()
|
|
|
+
|
|
|
+ if app_entity.code != app_code:
|
|
|
+ return StreamingResponse(
|
|
|
+ _single_sse("failed", {"status": "failed", "error_code": "forbidden", "error_message": "api key does not belong to this app"}),
|
|
|
+ media_type="text/event-stream",
|
|
|
+ headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
|
|
|
+ if app_entity.target_type != "agent":
|
|
|
+ return StreamingResponse(
|
|
|
+ _single_sse("failed", {"status": "failed", "error_code": "unsupported", "error_message": "streaming is only supported for agent targets in V0.1"}),
|
|
|
+ media_type="text/event-stream",
|
|
|
+ headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
|
|
|
+
|
|
|
+ return StreamingResponse(
|
|
|
+ _stream_openapi_chat(app_code, payload, request, key_entity, app_entity, session_factory, settings),
|
|
|
+ media_type="text/event-stream",
|
|
|
+ headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
|
|
|
+
|
|
|
+
|
|
|
+def _single_sse(event: str, data: dict) -> AsyncIterator[str]:
|
|
|
+ async def _gen():
|
|
|
+ yield _sse(event, data)
|
|
|
+ return _gen()
|
|
|
+
|
|
|
+
|
|
|
+async def _stream_openapi_chat(
|
|
|
+ app_code: str,
|
|
|
+ payload: OpenApiChatRequest,
|
|
|
+ request: Request,
|
|
|
+ key_entity,
|
|
|
+ app_entity,
|
|
|
+ session_factory,
|
|
|
+ settings: ApiGatewaySettings):
|
|
|
+ start = perf_counter()
|
|
|
+ request_id = str(uuid4())
|
|
|
+
|
|
|
+ targets = build_proxy_targets(settings)
|
|
|
+ session_target = targets["session-service"]
|
|
|
+ agent_target = targets["agent-service"]
|
|
|
+ headers = _build_internal_headers(request, settings)
|
|
|
+ client = httpx.AsyncClient(timeout=settings.proxy_timeout_seconds)
|
|
|
+
|
|
|
+ output_text = ""
|
|
|
+ error_message: str | None = None
|
|
|
+ session_id: str | None = None
|
|
|
+ run_request_id: str | None = None
|
|
|
+ request_status = "failed"
|
|
|
+
|
|
|
+ try:
|
|
|
+ session_id = payload.session_id
|
|
|
+ if not session_id:
|
|
|
+ session_data = await _post_json(
|
|
|
+ client=client, target=session_target, path="",
|
|
|
+ payload={
|
|
|
+ "app_id": app_entity.id,
|
|
|
+ "user_id": payload.user_id or "openapi",
|
|
|
+ "channel_type": "openapi",
|
|
|
+ "runtime_target_type": app_entity.target_type,
|
|
|
+ "runtime_target_id": app_entity.target_id,
|
|
|
+ }, headers=headers)
|
|
|
+ session_id = _get_string(session_data, "id")
|
|
|
+
|
|
|
+ run_request_payload = {
|
|
|
+ "target_type": app_entity.target_type,
|
|
|
+ "target_id": app_entity.target_id,
|
|
|
+ "mode": "production",
|
|
|
+ "input_text": payload.message,
|
|
|
+ }
|
|
|
+ run_request = await _post_json(
|
|
|
+ client=client, target=session_target, path="run-requests",
|
|
|
+ payload={
|
|
|
+ "session_id": session_id,
|
|
|
+ "app_config_id": app_entity.target_id,
|
|
|
+ "workflow_config_id": app_entity.target_id,
|
|
|
+ "trigger_type": "chat",
|
|
|
+ "request_payload_json": run_request_payload,
|
|
|
+ "request_status": "accepted",
|
|
|
+ }, headers=headers)
|
|
|
+ run_request_id = _get_string(run_request, "id")
|
|
|
+
|
|
|
+ user_message = await _post_json(
|
|
|
+ client=client, target=session_target, path="messages",
|
|
|
+ payload={
|
|
|
+ "session_id": session_id, "turn_id": run_request_id,
|
|
|
+ "role": "user", "content_type": "text",
|
|
|
+ "content_text": payload.message, "content_json": {},
|
|
|
+ }, headers=headers)
|
|
|
+ user_message_id = _get_string(user_message, "id")
|
|
|
+
|
|
|
+ await _post_json(
|
|
|
+ client=client, target=session_target, path="run-requests/update",
|
|
|
+ payload={
|
|
|
+ "run_request_id": run_request_id, "request_status": "running",
|
|
|
+ "request_payload_json": {**run_request_payload, "user_message_id": user_message_id},
|
|
|
+ }, headers=headers)
|
|
|
+
|
|
|
+ yield _sse("started", {
|
|
|
+ "request_id": request_id,
|
|
|
+ "session_id": session_id,
|
|
|
+ "run_request_id": run_request_id})
|
|
|
+
|
|
|
+ agent_run = await _post_json(
|
|
|
+ client=client, target=agent_target, path="runs",
|
|
|
+ payload={
|
|
|
+ "agent_id": app_entity.target_id,
|
|
|
+ "session_id": session_id,
|
|
|
+ "input_text": payload.message,
|
|
|
+ "input_json": {"source": "openapi", "run_request_id": run_request_id},
|
|
|
+ }, headers=headers)
|
|
|
+ agent_run_id = _get_string(agent_run, "id")
|
|
|
+
|
|
|
+ stream_url = _target_url(agent_target, f"runs/{agent_run_id}/execute-stream")
|
|
|
+ async with client.stream("POST", stream_url, headers=headers, json={"dry_run": False}) as resp:
|
|
|
+ if not resp.is_success:
|
|
|
+ error_message = await _read_stream_error(resp)
|
|
|
+ else:
|
|
|
+ async for ev_name, ev_data in _parse_sse(resp):
|
|
|
+ data = json.loads(ev_data)
|
|
|
+ if ev_name == "agent.run.delta":
|
|
|
+ text_chunk = data.get("text", "")
|
|
|
+ yield _sse("delta", {"text": text_chunk})
|
|
|
+ if isinstance(text_chunk, str):
|
|
|
+ output_text += text_chunk
|
|
|
+ elif ev_name == "agent.run.completed":
|
|
|
+ run_data = data.get("run", data)
|
|
|
+ final_text = _get_optional_string(run_data, "output_text")
|
|
|
+ if not output_text and final_text:
|
|
|
+ output_text = final_text
|
|
|
+ yield _sse("completed", {"status": "completed", "output_text": output_text})
|
|
|
+ elif ev_name == "agent.run.failed":
|
|
|
+ msg = data.get("error_message", "Agent execution failed")
|
|
|
+ if not isinstance(msg, str):
|
|
|
+ msg = "Agent execution failed"
|
|
|
+ error_message = msg
|
|
|
+ yield _sse("failed", {"status": "failed", "error_code": "agent_error", "error_message": msg})
|
|
|
+ else:
|
|
|
+ yield _sse(ev_name, data)
|
|
|
+
|
|
|
+ request_status = "failed" if error_message else "completed"
|
|
|
+
|
|
|
+ if output_text:
|
|
|
+ await _post_json(
|
|
|
+ client=client, target=session_target, path="messages",
|
|
|
+ payload={
|
|
|
+ "session_id": session_id, "turn_id": run_request_id,
|
|
|
+ "role": "assistant", "content_type": "text",
|
|
|
+ "content_text": output_text, "content_json": {},
|
|
|
+ }, headers=headers)
|
|
|
+
|
|
|
+ await _post_json(
|
|
|
+ client=client, target=session_target, path="run-requests/update",
|
|
|
+ payload={
|
|
|
+ "run_request_id": run_request_id, "request_status": request_status,
|
|
|
+ "request_payload_json": {
|
|
|
+ **run_request_payload, "user_message_id": user_message_id,
|
|
|
+ "output_text": output_text, "error_message": error_message,
|
|
|
+ },
|
|
|
+ }, headers=headers)
|
|
|
+
|
|
|
+ except HTTPException as exc:
|
|
|
+ detail = exc.detail if isinstance(exc.detail, str) else json.dumps(exc.detail, ensure_ascii=False)
|
|
|
+ yield _sse("failed", {"status": "failed", "error_code": "gateway_error", "error_message": detail})
|
|
|
+ except Exception as exc:
|
|
|
+ yield _sse("failed", {"status": "failed", "error_code": "internal_error", "error_message": str(exc)})
|
|
|
+ finally:
|
|
|
+ await client.aclose()
|
|
|
+ duration_ms = int((perf_counter() - start) * 1000)
|
|
|
+ audit_db = session_factory()
|
|
|
+ try:
|
|
|
+ AppInvocationAuditRepository(audit_db).create(
|
|
|
+ app_id=app_entity.id,
|
|
|
+ api_key_prefix=key_entity.key_prefix,
|
|
|
+ request_id=request_id,
|
|
|
+ session_id=session_id,
|
|
|
+ run_request_id=run_request_id,
|
|
|
+ target_type=app_entity.target_type,
|
|
|
+ target_id=app_entity.target_id,
|
|
|
+ invoke_type="stream",
|
|
|
+ status=request_status,
|
|
|
+ duration_ms=duration_ms,
|
|
|
+ error_message=error_message,
|
|
|
+ client_metadata_json=json.dumps(payload.metadata) if payload.metadata else None)
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+ finally:
|
|
|
+ audit_db.close()
|
|
|
+
|
|
|
+
|
|
|
@router.api_route(
|
|
|
"/gateway/sessions",
|
|
|
methods=["GET", "POST", "PUT", "PATCH", "DELETE"])
|
|
|
@@ -477,3 +1382,126 @@ async def proxy_code_runner_service(
|
|
|
request=request,
|
|
|
target=build_proxy_targets(settings)["code-runner-service"],
|
|
|
path=path)
|
|
|
+
|
|
|
+
|
|
|
+def _build_internal_headers(request: Request, settings: ApiGatewaySettings) -> dict[str, str]:
|
|
|
+ headers = build_internal_service_headers(settings)
|
|
|
+ authorization = request.headers.get("authorization")
|
|
|
+ user_id = request.headers.get("x-user-id")
|
|
|
+ if authorization:
|
|
|
+ headers["authorization"] = authorization
|
|
|
+ if user_id:
|
|
|
+ headers["x-user-id"] = user_id
|
|
|
+ return headers
|
|
|
+
|
|
|
+
|
|
|
+def httpx_client(timeout_seconds: float) -> httpx.AsyncClient:
|
|
|
+ return httpx.AsyncClient(timeout=timeout_seconds)
|
|
|
+
|
|
|
+
|
|
|
+async def _post_json(
|
|
|
+ *,
|
|
|
+ client: httpx.AsyncClient,
|
|
|
+ target: ProxyTarget,
|
|
|
+ path: str,
|
|
|
+ payload: dict[str, object],
|
|
|
+ headers: dict[str, str]) -> dict[str, object]:
|
|
|
+ url = _target_url(target, path)
|
|
|
+ try:
|
|
|
+ response = await client.post(url, headers=headers, json=payload)
|
|
|
+ except httpx.HTTPError as exc:
|
|
|
+ raise HTTPException(status_code=502, detail=f"{target.service_name} request failed: {exc}") from exc
|
|
|
+ if not response.is_success:
|
|
|
+ raise HTTPException(status_code=response.status_code, detail=_error_detail(response))
|
|
|
+ data = response.json()
|
|
|
+ if not isinstance(data, dict):
|
|
|
+ raise HTTPException(status_code=502, detail=f"{target.service_name} returned unexpected response")
|
|
|
+ return data
|
|
|
+
|
|
|
+
|
|
|
+def _target_url(target: ProxyTarget, path: str) -> str:
|
|
|
+ normalized_path = path.strip("/")
|
|
|
+ if normalized_path:
|
|
|
+ return f"{target.base_url.rstrip('/')}{target.path_prefix}/{normalized_path}"
|
|
|
+ return f"{target.base_url.rstrip('/')}{target.path_prefix}"
|
|
|
+
|
|
|
+
|
|
|
+def _error_detail(response: httpx.Response) -> str:
|
|
|
+ try:
|
|
|
+ payload = response.json()
|
|
|
+ except ValueError:
|
|
|
+ return response.text or f"downstream request failed with {response.status_code}"
|
|
|
+ if isinstance(payload, dict):
|
|
|
+ detail = payload.get("detail")
|
|
|
+ if isinstance(detail, str):
|
|
|
+ return detail
|
|
|
+ error = payload.get("error")
|
|
|
+ if isinstance(error, dict):
|
|
|
+ message = error.get("message")
|
|
|
+ if isinstance(message, str):
|
|
|
+ return message
|
|
|
+ return response.text or f"downstream request failed with {response.status_code}"
|
|
|
+
|
|
|
+
|
|
|
+def _get_string(payload: dict[str, object], key: str) -> str:
|
|
|
+ value = payload.get(key)
|
|
|
+ if not isinstance(value, str) or not value:
|
|
|
+ raise HTTPException(status_code=502, detail=f"downstream response missing {key}")
|
|
|
+ return value
|
|
|
+
|
|
|
+
|
|
|
+def _get_optional_string(payload: dict[str, object], key: str) -> str | None:
|
|
|
+ value = payload.get(key)
|
|
|
+ return value if isinstance(value, str) and value else None
|
|
|
+
|
|
|
+
|
|
|
+def _get_dict(payload: dict[str, object], key: str) -> dict[str, object]:
|
|
|
+ value = payload.get(key)
|
|
|
+ if not isinstance(value, dict):
|
|
|
+ raise HTTPException(status_code=502, detail=f"downstream response missing {key}")
|
|
|
+ return value
|
|
|
+
|
|
|
+
|
|
|
+def _resolve_output_text(run_payload: dict[str, object]) -> str | None:
|
|
|
+ output_text = _get_optional_string(run_payload, "output_text")
|
|
|
+ if output_text:
|
|
|
+ return output_text
|
|
|
+ output_json = run_payload.get("output_json")
|
|
|
+ if isinstance(output_json, dict) and output_json:
|
|
|
+ return json.dumps(output_json, ensure_ascii=False)
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+def _sse(event: str, data: dict) -> str:
|
|
|
+ return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
|
|
|
+
|
|
|
+
|
|
|
+async def _parse_sse(response: httpx.Response):
|
|
|
+ current_event = "message"
|
|
|
+ current_data = ""
|
|
|
+ async for line in response.aiter_lines():
|
|
|
+ if line.startswith("event:"):
|
|
|
+ current_event = line[6:].strip()
|
|
|
+ elif line.startswith("data:"):
|
|
|
+ current_data = line[5:].strip()
|
|
|
+ elif line == "":
|
|
|
+ if current_data:
|
|
|
+ yield current_event, current_data
|
|
|
+ current_event = "message"
|
|
|
+ current_data = ""
|
|
|
+ if current_data:
|
|
|
+ yield current_event, current_data
|
|
|
+
|
|
|
+
|
|
|
+async def _read_stream_error(response: httpx.Response) -> str:
|
|
|
+ body = await response.aread()
|
|
|
+ text = body.decode(errors="replace")
|
|
|
+ try:
|
|
|
+ data = json.loads(text)
|
|
|
+ if isinstance(data, dict):
|
|
|
+ detail = data.get("detail")
|
|
|
+ if isinstance(detail, str):
|
|
|
+ return detail
|
|
|
+ except (ValueError, UnicodeDecodeError):
|
|
|
+ pass
|
|
|
+ return text or f"downstream error {response.status_code}"
|