|
@@ -1,16 +1,20 @@
|
|
|
import asyncio
|
|
import asyncio
|
|
|
|
|
+import json
|
|
|
from typing import Annotated
|
|
from typing import Annotated
|
|
|
|
|
|
|
|
from core_domain import ServiceDescriptor, ServiceHealth
|
|
from core_domain import ServiceDescriptor, ServiceHealth
|
|
|
-from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response
|
|
|
|
|
|
|
+import httpx
|
|
|
|
|
+from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response, StreamingResponse
|
|
|
from sqlalchemy import text
|
|
from sqlalchemy import text
|
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy.orm import Session
|
|
|
|
|
+from pydantic import BaseModel
|
|
|
|
|
|
|
|
from app.bootstrap.settings import ApiGatewaySettings
|
|
from app.bootstrap.settings import ApiGatewaySettings
|
|
|
from app.db.session import get_db
|
|
from app.db.session import get_db
|
|
|
from app.domain.repositories import ApiKeyRepository, GatewayRequestAuditRepository
|
|
from app.domain.repositories import ApiKeyRepository, GatewayRequestAuditRepository
|
|
|
from app.infrastructure.api_keys import generate_api_key, get_api_key_prefix, hash_api_key
|
|
from app.infrastructure.api_keys import generate_api_key, get_api_key_prefix, hash_api_key
|
|
|
from app.infrastructure.proxy import ProxyServiceName, ProxyTarget, ServiceProxy
|
|
from app.infrastructure.proxy import ProxyServiceName, ProxyTarget, ServiceProxy
|
|
|
|
|
+from core_shared.security import build_internal_service_headers
|
|
|
from app.schemas.gateway import (
|
|
from app.schemas.gateway import (
|
|
|
ApiKeyCreateRequest,
|
|
ApiKeyCreateRequest,
|
|
|
ApiKeyCreateResponse,
|
|
ApiKeyCreateResponse,
|
|
@@ -28,6 +32,27 @@ router = APIRouter()
|
|
|
DbSession = Annotated[Session, Depends(get_db)]
|
|
DbSession = Annotated[Session, Depends(get_db)]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+class SessionExecuteRequest(BaseModel):
|
|
|
|
|
+ session_id: str
|
|
|
|
|
+ message_text: str
|
|
|
|
|
+ stream: bool = False
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class SessionExecuteResponse(BaseModel):
|
|
|
|
|
+ session_id: str
|
|
|
|
|
+ run_request_id: str
|
|
|
|
|
+ target_type: str
|
|
|
|
|
+ target_id: str
|
|
|
|
|
+ target_config_id: str | None = None
|
|
|
|
|
+ request_status: str
|
|
|
|
|
+ user_message_id: str
|
|
|
|
|
+ assistant_message_id: str | None = None
|
|
|
|
|
+ agent_run_id: str | None = None
|
|
|
|
|
+ team_run_id: str | None = None
|
|
|
|
|
+ output_text: str | None = None
|
|
|
|
|
+ error_message: str | None = None
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
@router.get("/health", response_model=ServiceDescriptor)
|
|
@router.get("/health", response_model=ServiceDescriptor)
|
|
|
def health_check(db: DbSession) -> ServiceDescriptor:
|
|
def health_check(db: DbSession) -> ServiceDescriptor:
|
|
|
db.execute(text("SELECT 1"))
|
|
db.execute(text("SELECT 1"))
|
|
@@ -241,6 +266,372 @@ async def downstream_health_check(
|
|
|
downstream_services=downstream_services)
|
|
downstream_services=downstream_services)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+@router.post("/gateway/sessions/execute")
|
|
|
|
|
+async def execute_session(
|
|
|
|
|
+ payload: SessionExecuteRequest,
|
|
|
|
|
+ request: Request,
|
|
|
|
|
+ settings: GatewaySettingsDep):
|
|
|
|
|
+ if payload.stream:
|
|
|
|
|
+ return StreamingResponse(
|
|
|
|
|
+ _stream_session_execute(payload, request, settings),
|
|
|
|
|
+ media_type="text/event-stream",
|
|
|
|
|
+ headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
|
|
|
|
|
+
|
|
|
|
|
+ targets = build_proxy_targets(settings)
|
|
|
|
|
+ session_target = targets["session-service"]
|
|
|
|
|
+ agent_target = targets["agent-service"]
|
|
|
|
|
+ team_target = targets["team-service"]
|
|
|
|
|
+ headers = _build_internal_headers(request, settings)
|
|
|
|
|
+
|
|
|
|
|
+ async with httpx_client(settings.proxy_timeout_seconds) as client:
|
|
|
|
|
+ session = await _post_json(
|
|
|
|
|
+ client=client,
|
|
|
|
|
+ target=session_target,
|
|
|
|
|
+ path="detail",
|
|
|
|
|
+ payload={"session_id": payload.session_id},
|
|
|
|
|
+ headers=headers)
|
|
|
|
|
+
|
|
|
|
|
+ target_type = _get_string(session, "runtime_target_type")
|
|
|
|
|
+ target_id = _get_string(session, "runtime_target_id")
|
|
|
|
|
+ target_config_id = _get_optional_string(session, "runtime_target_config_id")
|
|
|
|
|
+ if target_type not in {"agent", "team"} or not target_id:
|
|
|
|
|
+ raise HTTPException(status_code=422, detail="session runtime target is not configured")
|
|
|
|
|
+
|
|
|
|
|
+ run_request_payload = {
|
|
|
|
|
+ "target_type": target_type,
|
|
|
|
|
+ "target_id": target_id,
|
|
|
|
|
+ "target_config_id": target_config_id,
|
|
|
|
|
+ "mode": "production",
|
|
|
|
|
+ "input_text": payload.message_text,
|
|
|
|
|
+ }
|
|
|
|
|
+ run_request = await _post_json(
|
|
|
|
|
+ client=client,
|
|
|
|
|
+ target=session_target,
|
|
|
|
|
+ path="run-requests",
|
|
|
|
|
+ payload={
|
|
|
|
|
+ "session_id": payload.session_id,
|
|
|
|
|
+ "app_config_id": target_config_id or target_id,
|
|
|
|
|
+ "workflow_config_id": target_id,
|
|
|
|
|
+ "trigger_type": "chat",
|
|
|
|
|
+ "request_payload_json": run_request_payload,
|
|
|
|
|
+ "request_status": "accepted",
|
|
|
|
|
+ },
|
|
|
|
|
+ headers=headers)
|
|
|
|
|
+ run_request_id = _get_string(run_request, "id")
|
|
|
|
|
+
|
|
|
|
|
+ user_message = await _post_json(
|
|
|
|
|
+ client=client,
|
|
|
|
|
+ target=session_target,
|
|
|
|
|
+ path="messages",
|
|
|
|
|
+ payload={
|
|
|
|
|
+ "session_id": payload.session_id,
|
|
|
|
|
+ "turn_id": run_request_id,
|
|
|
|
|
+ "role": "user",
|
|
|
|
|
+ "content_type": "text",
|
|
|
|
|
+ "content_text": payload.message_text,
|
|
|
|
|
+ "content_json": {},
|
|
|
|
|
+ },
|
|
|
|
|
+ headers=headers)
|
|
|
|
|
+ user_message_id = _get_string(user_message, "id")
|
|
|
|
|
+
|
|
|
|
|
+ await _post_json(
|
|
|
|
|
+ client=client,
|
|
|
|
|
+ target=session_target,
|
|
|
|
|
+ path="run-requests/update",
|
|
|
|
|
+ payload={
|
|
|
|
|
+ "run_request_id": run_request_id,
|
|
|
|
|
+ "request_status": "running",
|
|
|
|
|
+ "request_payload_json": {
|
|
|
|
|
+ **run_request_payload,
|
|
|
|
|
+ "user_message_id": user_message_id,
|
|
|
|
|
+ },
|
|
|
|
|
+ },
|
|
|
|
|
+ headers=headers)
|
|
|
|
|
+
|
|
|
|
|
+ assistant_message_id: str | None = None
|
|
|
|
|
+ agent_run_id: str | None = None
|
|
|
|
|
+ team_run_id: str | None = None
|
|
|
|
|
+ output_text: str | None = None
|
|
|
|
|
+ error_message: str | None = None
|
|
|
|
|
+ request_status = "completed"
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ if target_type == "agent":
|
|
|
|
|
+ agent_run = await _post_json(
|
|
|
|
|
+ client=client,
|
|
|
|
|
+ target=agent_target,
|
|
|
|
|
+ path="runs",
|
|
|
|
|
+ payload={
|
|
|
|
|
+ "agent_id": target_id,
|
|
|
|
|
+ "agent_config_id": target_config_id,
|
|
|
|
|
+ "session_id": payload.session_id,
|
|
|
|
|
+ "input_text": payload.message_text,
|
|
|
|
|
+ "input_json": {
|
|
|
|
|
+ "source": "session",
|
|
|
|
|
+ "run_request_id": run_request_id,
|
|
|
|
|
+ },
|
|
|
|
|
+ },
|
|
|
|
|
+ headers=headers)
|
|
|
|
|
+ agent_run_id = _get_string(agent_run, "id")
|
|
|
|
|
+ execute_result = await _post_json(
|
|
|
|
|
+ client=client,
|
|
|
|
|
+ target=agent_target,
|
|
|
|
|
+ path="runs/execute",
|
|
|
|
|
+ payload={
|
|
|
|
|
+ "agent_run_id": agent_run_id,
|
|
|
|
|
+ "dry_run": False,
|
|
|
|
|
+ },
|
|
|
|
|
+ headers=headers)
|
|
|
|
|
+ run_data = _get_dict(execute_result, "run")
|
|
|
|
|
+ output_text = _resolve_output_text(run_data)
|
|
|
|
|
+ error_message = _get_optional_string(run_data, "error_message")
|
|
|
|
|
+ else:
|
|
|
|
|
+ team_run = await _post_json(
|
|
|
|
|
+ client=client,
|
|
|
|
|
+ target=team_target,
|
|
|
|
|
+ path="runs",
|
|
|
|
|
+ payload={
|
|
|
|
|
+ "team_id": target_id,
|
|
|
|
|
+ "team_config_id": target_config_id,
|
|
|
|
|
+ "session_id": payload.session_id,
|
|
|
|
|
+ "input_text": payload.message_text,
|
|
|
|
|
+ "input_json": {
|
|
|
|
|
+ "source": "session",
|
|
|
|
|
+ "run_request_id": run_request_id,
|
|
|
|
|
+ },
|
|
|
|
|
+ "enqueue": True,
|
|
|
|
|
+ },
|
|
|
|
|
+ headers=headers)
|
|
|
|
|
+ team_run_id = _get_string(team_run, "id")
|
|
|
|
|
+ execute_result = await _post_json(
|
|
|
|
|
+ client=client,
|
|
|
|
|
+ target=team_target,
|
|
|
|
|
+ path=f"runs/{team_run_id}/execute",
|
|
|
|
|
+ payload={
|
|
|
|
|
+ "dry_run": False,
|
|
|
|
|
+ },
|
|
|
|
|
+ headers=headers)
|
|
|
|
|
+ run_data = _get_dict(execute_result, "run")
|
|
|
|
|
+ output_text = _resolve_output_text(run_data)
|
|
|
|
|
+ error_message = _get_optional_string(run_data, "error_message")
|
|
|
|
|
+
|
|
|
|
|
+ if error_message:
|
|
|
|
|
+ request_status = "failed"
|
|
|
|
|
+
|
|
|
|
|
+ if output_text:
|
|
|
|
|
+ assistant_message = await _post_json(
|
|
|
|
|
+ client=client,
|
|
|
|
|
+ target=session_target,
|
|
|
|
|
+ path="messages",
|
|
|
|
|
+ payload={
|
|
|
|
|
+ "session_id": payload.session_id,
|
|
|
|
|
+ "turn_id": run_request_id,
|
|
|
|
|
+ "role": "assistant",
|
|
|
|
|
+ "content_type": "text",
|
|
|
|
|
+ "content_text": output_text,
|
|
|
|
|
+ "content_json": {},
|
|
|
|
|
+ },
|
|
|
|
|
+ headers=headers)
|
|
|
|
|
+ assistant_message_id = _get_string(assistant_message, "id")
|
|
|
|
|
+ except HTTPException as exc:
|
|
|
|
|
+ request_status = "failed"
|
|
|
|
|
+ error_message = exc.detail if isinstance(exc.detail, str) else json.dumps(exc.detail, ensure_ascii=False)
|
|
|
|
|
+
|
|
|
|
|
+ await _post_json(
|
|
|
|
|
+ client=client,
|
|
|
|
|
+ target=session_target,
|
|
|
|
|
+ path="run-requests/update",
|
|
|
|
|
+ payload={
|
|
|
|
|
+ "run_request_id": run_request_id,
|
|
|
|
|
+ "request_status": request_status,
|
|
|
|
|
+ "request_payload_json": {
|
|
|
|
|
+ **run_request_payload,
|
|
|
|
|
+ "user_message_id": user_message_id,
|
|
|
|
|
+ "assistant_message_id": assistant_message_id,
|
|
|
|
|
+ "agent_run_id": agent_run_id,
|
|
|
|
|
+ "team_run_id": team_run_id,
|
|
|
|
|
+ "output_text": output_text,
|
|
|
|
|
+ "error_message": error_message,
|
|
|
|
|
+ },
|
|
|
|
|
+ },
|
|
|
|
|
+ headers=headers)
|
|
|
|
|
+
|
|
|
|
|
+ return SessionExecuteResponse(
|
|
|
|
|
+ session_id=payload.session_id,
|
|
|
|
|
+ run_request_id=run_request_id,
|
|
|
|
|
+ target_type=target_type,
|
|
|
|
|
+ target_id=target_id,
|
|
|
|
|
+ target_config_id=target_config_id,
|
|
|
|
|
+ request_status=request_status,
|
|
|
|
|
+ user_message_id=user_message_id,
|
|
|
|
|
+ assistant_message_id=assistant_message_id,
|
|
|
|
|
+ agent_run_id=agent_run_id,
|
|
|
|
|
+ team_run_id=team_run_id,
|
|
|
|
|
+ output_text=output_text,
|
|
|
|
|
+ error_message=error_message)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+async def _stream_session_execute(
|
|
|
|
|
+ payload: SessionExecuteRequest,
|
|
|
|
|
+ request: Request,
|
|
|
|
|
+ settings: ApiGatewaySettings):
|
|
|
|
|
+ targets = build_proxy_targets(settings)
|
|
|
|
|
+ session_target = targets["session-service"]
|
|
|
|
|
+ agent_target = targets["agent-service"]
|
|
|
|
|
+ team_target = targets["team-service"]
|
|
|
|
|
+ headers = _build_internal_headers(request, settings)
|
|
|
|
|
+ client = httpx.AsyncClient(timeout=settings.proxy_timeout_seconds)
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ session = await _post_json(
|
|
|
|
|
+ client=client, target=session_target, path="detail",
|
|
|
|
|
+ payload={"session_id": payload.session_id}, headers=headers)
|
|
|
|
|
+ target_type = _get_string(session, "runtime_target_type")
|
|
|
|
|
+ target_id = _get_string(session, "runtime_target_id")
|
|
|
|
|
+ target_config_id = _get_optional_string(session, "runtime_target_config_id")
|
|
|
|
|
+ if target_type not in {"agent", "team"} or not target_id:
|
|
|
|
|
+ raise HTTPException(status_code=422, detail="session runtime target is not configured")
|
|
|
|
|
+
|
|
|
|
|
+ run_request_payload = {
|
|
|
|
|
+ "target_type": target_type, "target_id": target_id,
|
|
|
|
|
+ "target_config_id": target_config_id, "mode": "production",
|
|
|
|
|
+ "input_text": payload.message_text,
|
|
|
|
|
+ }
|
|
|
|
|
+ run_request = await _post_json(
|
|
|
|
|
+ client=client, target=session_target, path="run-requests",
|
|
|
|
|
+ payload={
|
|
|
|
|
+ "session_id": payload.session_id,
|
|
|
|
|
+ "app_config_id": target_config_id or target_id,
|
|
|
|
|
+ "workflow_config_id": target_id,
|
|
|
|
|
+ "trigger_type": "chat",
|
|
|
|
|
+ "request_payload_json": run_request_payload,
|
|
|
|
|
+ "request_status": "accepted",
|
|
|
|
|
+ }, headers=headers)
|
|
|
|
|
+ run_request_id = _get_string(run_request, "id")
|
|
|
|
|
+
|
|
|
|
|
+ user_message = await _post_json(
|
|
|
|
|
+ client=client, target=session_target, path="messages",
|
|
|
|
|
+ payload={
|
|
|
|
|
+ "session_id": payload.session_id, "turn_id": run_request_id,
|
|
|
|
|
+ "role": "user", "content_type": "text",
|
|
|
|
|
+ "content_text": payload.message_text, "content_json": {},
|
|
|
|
|
+ }, headers=headers)
|
|
|
|
|
+ user_message_id = _get_string(user_message, "id")
|
|
|
|
|
+
|
|
|
|
|
+ await _post_json(
|
|
|
|
|
+ client=client, target=session_target, path="run-requests/update",
|
|
|
|
|
+ payload={
|
|
|
|
|
+ "run_request_id": run_request_id, "request_status": "running",
|
|
|
|
|
+ "request_payload_json": {**run_request_payload, "user_message_id": user_message_id},
|
|
|
|
|
+ }, headers=headers)
|
|
|
|
|
+
|
|
|
|
|
+ yield _sse("session.execute.started", {
|
|
|
|
|
+ "run_request_id": run_request_id, "user_message_id": user_message_id,
|
|
|
|
|
+ "target_type": target_type, "target_id": target_id,
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ output_text = ""
|
|
|
|
|
+ error_message: str | None = None
|
|
|
|
|
+ agent_run_id: str | None = None
|
|
|
|
|
+ team_run_id: str | None = None
|
|
|
|
|
+
|
|
|
|
|
+ if target_type == "agent":
|
|
|
|
|
+ agent_run = await _post_json(
|
|
|
|
|
+ client=client, target=agent_target, path="runs",
|
|
|
|
|
+ payload={
|
|
|
|
|
+ "agent_id": target_id, "agent_config_id": target_config_id,
|
|
|
|
|
+ "session_id": payload.session_id, "input_text": payload.message_text,
|
|
|
|
|
+ "input_json": {"source": "session", "run_request_id": run_request_id},
|
|
|
|
|
+ }, headers=headers)
|
|
|
|
|
+ agent_run_id = _get_string(agent_run, "id")
|
|
|
|
|
+
|
|
|
|
|
+ stream_url = _target_url(agent_target, f"runs/{agent_run_id}/execute-stream")
|
|
|
|
|
+ async with client.stream("POST", stream_url, headers=headers, json={"dry_run": False}) as resp:
|
|
|
|
|
+ if not resp.is_success:
|
|
|
|
|
+ error_message = await _read_stream_error(resp)
|
|
|
|
|
+ else:
|
|
|
|
|
+ async for ev_name, ev_data in _parse_sse(resp):
|
|
|
|
|
+ data = json.loads(ev_data)
|
|
|
|
|
+ yield _sse(ev_name, data)
|
|
|
|
|
+ if ev_name == "agent.run.delta" and isinstance(data.get("text"), str):
|
|
|
|
|
+ output_text += data["text"]
|
|
|
|
|
+ elif ev_name == "agent.run.completed":
|
|
|
|
|
+ run_data = data.get("run", data)
|
|
|
|
|
+ if not output_text and isinstance(run_data.get("output_text"), str):
|
|
|
|
|
+ output_text = run_data["output_text"]
|
|
|
|
|
+ elif ev_name == "agent.run.failed":
|
|
|
|
|
+ error_message = data.get("error_message", "Agent execution failed")
|
|
|
|
|
+ if not isinstance(error_message, str):
|
|
|
|
|
+ error_message = "Agent execution failed"
|
|
|
|
|
+ else:
|
|
|
|
|
+ team_run = await _post_json(
|
|
|
|
|
+ client=client, target=team_target, path="runs",
|
|
|
|
|
+ payload={
|
|
|
|
|
+ "team_id": target_id, "team_config_id": target_config_id,
|
|
|
|
|
+ "session_id": payload.session_id, "input_text": payload.message_text,
|
|
|
|
|
+ "input_json": {"source": "session", "run_request_id": run_request_id},
|
|
|
|
|
+ "enqueue": True,
|
|
|
|
|
+ }, headers=headers)
|
|
|
|
|
+ team_run_id = _get_string(team_run, "id")
|
|
|
|
|
+
|
|
|
|
|
+ stream_url = _target_url(team_target, f"runs/{team_run_id}/execute-stream")
|
|
|
|
|
+ async with client.stream("POST", stream_url, headers=headers, json={"dry_run": False}) as resp:
|
|
|
|
|
+ if not resp.is_success:
|
|
|
|
|
+ error_message = await _read_stream_error(resp)
|
|
|
|
|
+ else:
|
|
|
|
|
+ async for ev_name, ev_data in _parse_sse(resp):
|
|
|
|
|
+ data = json.loads(ev_data)
|
|
|
|
|
+ yield _sse(ev_name, data)
|
|
|
|
|
+ if ev_name == "team.run.delta" and isinstance(data.get("text"), str):
|
|
|
|
|
+ output_text += data["text"]
|
|
|
|
|
+ elif ev_name == "team.run.completed":
|
|
|
|
|
+ run_data = data.get("run", data)
|
|
|
|
|
+ if not output_text and isinstance(run_data.get("output_text"), str):
|
|
|
|
|
+ output_text = run_data["output_text"]
|
|
|
|
|
+ elif ev_name == "team.run.failed":
|
|
|
|
|
+ error_message = data.get("error_message", "Team execution failed")
|
|
|
|
|
+ if not isinstance(error_message, str):
|
|
|
|
|
+ error_message = "Team execution failed"
|
|
|
|
|
+
|
|
|
|
|
+ request_status = "failed" if error_message else "completed"
|
|
|
|
|
+ assistant_message_id: str | None = None
|
|
|
|
|
+ if output_text:
|
|
|
|
|
+ assistant_message = await _post_json(
|
|
|
|
|
+ client=client, target=session_target, path="messages",
|
|
|
|
|
+ payload={
|
|
|
|
|
+ "session_id": payload.session_id, "turn_id": run_request_id,
|
|
|
|
|
+ "role": "assistant", "content_type": "text",
|
|
|
|
|
+ "content_text": output_text, "content_json": {},
|
|
|
|
|
+ }, headers=headers)
|
|
|
|
|
+ assistant_message_id = _get_string(assistant_message, "id")
|
|
|
|
|
+
|
|
|
|
|
+ await _post_json(
|
|
|
|
|
+ client=client, target=session_target, path="run-requests/update",
|
|
|
|
|
+ payload={
|
|
|
|
|
+ "run_request_id": run_request_id, "request_status": request_status,
|
|
|
|
|
+ "request_payload_json": {
|
|
|
|
|
+ **run_request_payload, "user_message_id": user_message_id,
|
|
|
|
|
+ "assistant_message_id": assistant_message_id,
|
|
|
|
|
+ "agent_run_id": agent_run_id, "team_run_id": team_run_id,
|
|
|
|
|
+ "output_text": output_text, "error_message": error_message,
|
|
|
|
|
+ },
|
|
|
|
|
+ }, headers=headers)
|
|
|
|
|
+
|
|
|
|
|
+ yield _sse("session.execute.completed", {
|
|
|
|
|
+ "run_request_id": run_request_id, "request_status": request_status,
|
|
|
|
|
+ "assistant_message_id": assistant_message_id, "output_text": output_text,
|
|
|
|
|
+ "error_message": error_message,
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ except HTTPException as exc:
|
|
|
|
|
+ detail = exc.detail if isinstance(exc.detail, str) else json.dumps(exc.detail, ensure_ascii=False)
|
|
|
|
|
+ yield _sse("session.execute.failed", {"error_message": detail})
|
|
|
|
|
+ except Exception as exc:
|
|
|
|
|
+ yield _sse("session.execute.failed", {"error_message": str(exc)})
|
|
|
|
|
+ finally:
|
|
|
|
|
+ await client.aclose()
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
@router.api_route(
|
|
@router.api_route(
|
|
|
"/gateway/sessions",
|
|
"/gateway/sessions",
|
|
|
methods=["GET", "POST", "PUT", "PATCH", "DELETE"])
|
|
methods=["GET", "POST", "PUT", "PATCH", "DELETE"])
|
|
@@ -477,3 +868,126 @@ async def proxy_code_runner_service(
|
|
|
request=request,
|
|
request=request,
|
|
|
target=build_proxy_targets(settings)["code-runner-service"],
|
|
target=build_proxy_targets(settings)["code-runner-service"],
|
|
|
path=path)
|
|
path=path)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _build_internal_headers(request: Request, settings: ApiGatewaySettings) -> dict[str, str]:
|
|
|
|
|
+ headers = build_internal_service_headers(settings)
|
|
|
|
|
+ authorization = request.headers.get("authorization")
|
|
|
|
|
+ user_id = request.headers.get("x-user-id")
|
|
|
|
|
+ if authorization:
|
|
|
|
|
+ headers["authorization"] = authorization
|
|
|
|
|
+ if user_id:
|
|
|
|
|
+ headers["x-user-id"] = user_id
|
|
|
|
|
+ return headers
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def httpx_client(timeout_seconds: float) -> httpx.AsyncClient:
|
|
|
|
|
+ return httpx.AsyncClient(timeout=timeout_seconds)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+async def _post_json(
|
|
|
|
|
+ *,
|
|
|
|
|
+ client: httpx.AsyncClient,
|
|
|
|
|
+ target: ProxyTarget,
|
|
|
|
|
+ path: str,
|
|
|
|
|
+ payload: dict[str, object],
|
|
|
|
|
+ headers: dict[str, str]) -> dict[str, object]:
|
|
|
|
|
+ url = _target_url(target, path)
|
|
|
|
|
+ try:
|
|
|
|
|
+ response = await client.post(url, headers=headers, json=payload)
|
|
|
|
|
+ except httpx.HTTPError as exc:
|
|
|
|
|
+ raise HTTPException(status_code=502, detail=f"{target.service_name} request failed: {exc}") from exc
|
|
|
|
|
+ if not response.is_success:
|
|
|
|
|
+ raise HTTPException(status_code=response.status_code, detail=_error_detail(response))
|
|
|
|
|
+ data = response.json()
|
|
|
|
|
+ if not isinstance(data, dict):
|
|
|
|
|
+ raise HTTPException(status_code=502, detail=f"{target.service_name} returned unexpected response")
|
|
|
|
|
+ return data
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _target_url(target: ProxyTarget, path: str) -> str:
|
|
|
|
|
+ normalized_path = path.strip("/")
|
|
|
|
|
+ if normalized_path:
|
|
|
|
|
+ return f"{target.base_url.rstrip('/')}{target.path_prefix}/{normalized_path}"
|
|
|
|
|
+ return f"{target.base_url.rstrip('/')}{target.path_prefix}"
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _error_detail(response: httpx.Response) -> str:
|
|
|
|
|
+ try:
|
|
|
|
|
+ payload = response.json()
|
|
|
|
|
+ except ValueError:
|
|
|
|
|
+ return response.text or f"downstream request failed with {response.status_code}"
|
|
|
|
|
+ if isinstance(payload, dict):
|
|
|
|
|
+ detail = payload.get("detail")
|
|
|
|
|
+ if isinstance(detail, str):
|
|
|
|
|
+ return detail
|
|
|
|
|
+ error = payload.get("error")
|
|
|
|
|
+ if isinstance(error, dict):
|
|
|
|
|
+ message = error.get("message")
|
|
|
|
|
+ if isinstance(message, str):
|
|
|
|
|
+ return message
|
|
|
|
|
+ return response.text or f"downstream request failed with {response.status_code}"
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _get_string(payload: dict[str, object], key: str) -> str:
|
|
|
|
|
+ value = payload.get(key)
|
|
|
|
|
+ if not isinstance(value, str) or not value:
|
|
|
|
|
+ raise HTTPException(status_code=502, detail=f"downstream response missing {key}")
|
|
|
|
|
+ return value
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _get_optional_string(payload: dict[str, object], key: str) -> str | None:
|
|
|
|
|
+ value = payload.get(key)
|
|
|
|
|
+ return value if isinstance(value, str) and value else None
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _get_dict(payload: dict[str, object], key: str) -> dict[str, object]:
|
|
|
|
|
+ value = payload.get(key)
|
|
|
|
|
+ if not isinstance(value, dict):
|
|
|
|
|
+ raise HTTPException(status_code=502, detail=f"downstream response missing {key}")
|
|
|
|
|
+ return value
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _resolve_output_text(run_payload: dict[str, object]) -> str | None:
|
|
|
|
|
+ output_text = _get_optional_string(run_payload, "output_text")
|
|
|
|
|
+ if output_text:
|
|
|
|
|
+ return output_text
|
|
|
|
|
+ output_json = run_payload.get("output_json")
|
|
|
|
|
+ if isinstance(output_json, dict) and output_json:
|
|
|
|
|
+ return json.dumps(output_json, ensure_ascii=False)
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _sse(event: str, data: dict) -> str:
|
|
|
|
|
+ return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+async def _parse_sse(response: httpx.Response):
|
|
|
|
|
+ current_event = "message"
|
|
|
|
|
+ current_data = ""
|
|
|
|
|
+ async for line in response.aiter_lines():
|
|
|
|
|
+ if line.startswith("event:"):
|
|
|
|
|
+ current_event = line[6:].strip()
|
|
|
|
|
+ elif line.startswith("data:"):
|
|
|
|
|
+ current_data = line[5:].strip()
|
|
|
|
|
+ elif line == "":
|
|
|
|
|
+ if current_data:
|
|
|
|
|
+ yield current_event, current_data
|
|
|
|
|
+ current_event = "message"
|
|
|
|
|
+ current_data = ""
|
|
|
|
|
+ if current_data:
|
|
|
|
|
+ yield current_event, current_data
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+async def _read_stream_error(response: httpx.Response) -> str:
|
|
|
|
|
+ body = await response.aread()
|
|
|
|
|
+ text = body.decode(errors="replace")
|
|
|
|
|
+ try:
|
|
|
|
|
+ data = json.loads(text)
|
|
|
|
|
+ if isinstance(data, dict):
|
|
|
|
|
+ detail = data.get("detail")
|
|
|
|
|
+ if isinstance(detail, str):
|
|
|
|
|
+ return detail
|
|
|
|
|
+ except (ValueError, UnicodeDecodeError):
|
|
|
|
|
+ pass
|
|
|
|
|
+ return text or f"downstream error {response.status_code}"
|