| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508 |
- import asyncio
- import json
- from datetime import datetime
- from time import perf_counter
- from typing import Annotated, AsyncIterator
- from uuid import uuid4
- from core_domain import ServiceDescriptor, ServiceHealth
- import httpx
- from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response, StreamingResponse
- from sqlalchemy import text
- from sqlalchemy.orm import Session
- from pydantic import BaseModel
- from app.bootstrap.settings import ApiGatewaySettings
- from app.db.session import get_db
- from app.domain.repositories import (
- ApiKeyRepository,
- AppApiKeyRepository,
- AppDefinitionRepository,
- AppInvocationAuditRepository,
- GatewayRequestAuditRepository,
- )
- from app.infrastructure.api_keys import generate_api_key, get_api_key_prefix, hash_api_key
- from app.infrastructure.proxy import ProxyServiceName, ProxyTarget, ServiceProxy
- from core_shared import error_detail
- from core_shared.security import build_internal_service_headers
- from app.schemas.gateway import (
- ApiKeyCreateRequest,
- ApiKeyCreateResponse,
- ApiKeyListRequest,
- ApiKeyResponse,
- ApiKeyStatusPostRequest,
- ApiKeyStatusUpdateRequest,
- AppApiKeyCreateRequest,
- AppApiKeyCreateResponse,
- AppApiKeyListRequest,
- AppApiKeyResponse,
- AppApiKeyStatusUpdateRequest,
- AppAuditListRequest,
- AppCreateRequest,
- AppDetailRequest,
- AppInvocationAuditResponse,
- AppListRequest,
- AppResponse,
- AppStatusUpdateRequest,
- AppUpdateRequest,
- GatewayAuditServiceStats,
- GatewayAuditStatsResponse,
- GatewayRequestAuditResponse,
- GatewayServicesHealthResponse,
- OpenApiChatRequest,
- OpenApiChatResponse,
- )
# Router on which every endpoint in this module is registered.
router = APIRouter()
# Dependency alias: a SQLAlchemy Session that FastAPI resolves through `get_db`.
DbSession = Annotated[Session, Depends(get_db)]
# Request body accepted by POST /gateway/sessions/execute.
class SessionExecuteRequest(BaseModel):
    # Session to run; forwarded to the session service as "session_id".
    session_id: str
    # User input; stored as the user message and passed on as the run's input text.
    message_text: str
    # When true, the endpoint answers with a text/event-stream response instead of JSON.
    stream: bool = False
# JSON result returned by the non-streaming path of POST /gateway/sessions/execute.
class SessionExecuteResponse(BaseModel):
    # Echo of the session id from the request.
    session_id: str
    # "id" of the run request created in the session service for this turn.
    run_request_id: str
    # Runtime target read from the session detail: "agent" or "team".
    target_type: str
    target_id: str
    # Optional configuration id of the runtime target.
    target_config_id: str | None = None
    # "completed" or "failed".
    request_status: str
    # "id" of the stored user message.
    user_message_id: str
    # "id" of the stored assistant message; only set when the run produced output text.
    assistant_message_id: str | None = None
    # Set when the target is an agent.
    agent_run_id: str | None = None
    # Set when the target is a team.
    team_run_id: str | None = None
    output_text: str | None = None
    error_message: str | None = None
@router.get("/health", response_model=ServiceDescriptor)
def health_check(db: DbSession) -> ServiceDescriptor:
    """Run a trivial query against the database and describe this service."""
    probe = text("SELECT 1")
    db.execute(probe)
    descriptor = ServiceDescriptor(name="api-gateway")
    return descriptor
@router.get("/ready", response_model=ServiceHealth)
def readiness_check(db: DbSession) -> ServiceHealth:
    """Run a trivial query against the database and report the gateway as ready."""
    probe = text("SELECT 1")
    db.execute(probe)
    report = ServiceHealth(service="api-gateway", status="ok", database="ok")
    return report
@router.post("/gateway/api-keys", response_model=ApiKeyCreateResponse)
def create_api_key(
    payload: ApiKeyCreateRequest,
    db: DbSession) -> ApiKeyCreateResponse:
    """Generate a new gateway API key, persist its prefix and hash, and return it.

    The plaintext key is only present in this response; the repository is
    given the prefix and the hash, not the key itself.
    """
    plaintext_key = generate_api_key()
    repository = ApiKeyRepository(db)
    record = repository.create(
        name=payload.name,
        key_prefix=get_api_key_prefix(plaintext_key),
        key_hash=hash_api_key(plaintext_key),
        scopes=payload.scopes,
        expires_time=payload.expires_time,
    )
    response = ApiKeyCreateResponse(
        id=record.id,
        name=record.name,
        key_prefix=record.key_prefix,
        api_key=plaintext_key,
        status=record.status,
        scopes=record.scopes,
        expires_time=record.expires_time,
        created_time=record.created_time,
    )
    return response
@router.get("/gateway/api-keys", response_model=list[ApiKeyResponse])
def list_api_keys(
    db: DbSession) -> list[ApiKeyResponse]:
    """Return every stored gateway API key as a response model."""
    records = ApiKeyRepository(db).list_all()
    return list(map(ApiKeyResponse.from_entity, records))
@router.post("/gateway/api-keys/list", response_model=list[ApiKeyResponse])
def list_api_keys_post(
    payload: ApiKeyListRequest,
    db: DbSession) -> list[ApiKeyResponse]:
    """POST variant of the API key listing.

    The request body is validated but none of its fields are read here.
    """
    records = ApiKeyRepository(db).list_all()
    return list(map(ApiKeyResponse.from_entity, records))
@router.patch("/gateway/api-keys/{api_key_id}/status", response_model=ApiKeyResponse)
def update_api_key_status(
    api_key_id: str,
    payload: ApiKeyStatusUpdateRequest,
    db: DbSession) -> ApiKeyResponse:
    """Set the status of the API key named in the path.

    Raises:
        HTTPException: 404 when the repository finds no key with that id.
    """
    updated = ApiKeyRepository(db).update_status(
        api_key_id=api_key_id,
        status=payload.status,
    )
    if updated is not None:
        return ApiKeyResponse.from_entity(updated)
    missing = error_detail("error.api_key.not_found", id=api_key_id)
    raise HTTPException(status_code=404, detail=missing)
@router.post("/gateway/api-keys/status", response_model=ApiKeyResponse)
def update_api_key_status_post(
    payload: ApiKeyStatusPostRequest,
    db: DbSession) -> ApiKeyResponse:
    """Set the status of the API key named in the request body.

    Raises:
        HTTPException: 404 when the repository finds no key with that id.
    """
    key_id = payload.api_key_id
    updated = ApiKeyRepository(db).update_status(
        api_key_id=key_id,
        status=payload.status,
    )
    if updated is not None:
        return ApiKeyResponse.from_entity(updated)
    missing = error_detail("error.api_key.not_found", id=key_id)
    raise HTTPException(status_code=404, detail=missing)
@router.get("/gateway/audits", response_model=list[GatewayRequestAuditResponse])
def list_gateway_audits(
    db: DbSession,
    request_id: Annotated[str | None, Query()] = None,
    target_service: Annotated[str | None, Query()] = None,
    limit: Annotated[int, Query(ge=1, le=500)] = 100) -> list[GatewayRequestAuditResponse]:
    """List gateway request audits, optionally filtered by request id or target service.

    `limit` is validated to lie in 1..500 and defaults to 100.
    """
    repository = GatewayRequestAuditRepository(db)
    audits = repository.list_by_scope(
        request_id=request_id,
        target_service=target_service,
        limit=limit,
    )
    return list(map(GatewayRequestAuditResponse.from_entity, audits))
@router.get("/gateway/audits/stats", response_model=GatewayAuditStatsResponse)
def gateway_audit_stats(
    db: DbSession) -> GatewayAuditStatsResponse:
    """Summarise gateway audit rows per target service plus overall totals.

    Each row from `stats_by_service()` is unpacked as
    (target_service, request_count, error_count, average_duration_ms).
    """
    rows = GatewayRequestAuditRepository(db).stats_by_service()
    services = [
        GatewayAuditServiceStats(
            target_service=target_service,
            # Aggregates can come back as None (e.g. an average over rows with
            # no recorded duration); treat that as zero instead of letting
            # round()/sum() raise TypeError.
            request_count=request_count or 0,
            error_count=error_count or 0,
            average_duration_ms=round(average_duration_ms or 0.0, 2),
        )
        for target_service, request_count, error_count, average_duration_ms in rows
    ]
    return GatewayAuditStatsResponse(
        total_request_count=sum(item.request_count for item in services),
        total_error_count=sum(item.error_count for item in services),
        services=services,
    )
def get_gateway_settings() -> ApiGatewaySettings:
    """Dependency provider: build a new ApiGatewaySettings on every call."""
    gateway_settings = ApiGatewaySettings()
    return gateway_settings
def get_service_proxy(
    settings: Annotated[ApiGatewaySettings, Depends(get_gateway_settings)]) -> ServiceProxy:
    """Dependency provider: build a ServiceProxy using the configured proxy timeout."""
    timeout = settings.proxy_timeout_seconds
    proxy = ServiceProxy(settings=settings, timeout_seconds=timeout)
    return proxy
# Dependency alias: gateway settings resolved through `get_gateway_settings`.
GatewaySettingsDep = Annotated[ApiGatewaySettings, Depends(get_gateway_settings)]
# Dependency alias: a ServiceProxy resolved through `get_service_proxy`.
ServiceProxyDep = Annotated[ServiceProxy, Depends(get_service_proxy)]
def build_proxy_targets(settings: ApiGatewaySettings) -> dict[ProxyServiceName, ProxyTarget]:
    """Map every downstream service name to its ProxyTarget.

    Base URLs are read from `settings`; path prefixes and health paths are
    fixed. Two entries deliberately deviate from the "own URL" pattern and are
    kept exactly as configured: "model-provider-service" shares the model
    gateway's base URL and health path, and "identity-service" uses
    `settings.auth_service_url`.
    """
    # (service name, base URL, path prefix, health path), in registry order.
    specs = (
        ("session-service", settings.session_service_url, "/sessions", "/sessions/health"),
        ("tool-service", settings.tool_service_url, "/tools", "/tools/health"),
        ("model-gateway-service", settings.model_gateway_service_url, "/models", "/models/health"),
        ("model-provider-service", settings.model_gateway_service_url, "/models/providers", "/models/health"),
        ("code-runner-service", settings.code_runner_service_url, "/code", "/code/health"),
        ("agent-service", settings.agent_service_url, "/agents", "/agents/health"),
        ("memory-service", settings.memory_service_url, "/memories", "/memories/health"),
        ("team-service", settings.team_service_url, "/teams", "/teams/health"),
        ("skill-service", settings.skill_service_url, "/skills", "/skills/health"),
        ("human-service", settings.human_service_url, "/human", "/human/health"),
        ("knowledge-service", settings.knowledge_service_url, "/knowledge", "/knowledge/health"),
        ("event-service", settings.event_service_url, "/events", "/events/health"),
        ("identity-service", settings.auth_service_url, "/identity", "/identity/health"),
        ("scheduler-service", settings.scheduler_service_url, "/scheduler", "/scheduler/health"),
    )
    registry = {}
    for service_name, base_url, path_prefix, health_path in specs:
        registry[service_name] = ProxyTarget(
            service_name=service_name,
            base_url=base_url,
            path_prefix=path_prefix,
            health_path=health_path,
        )
    return registry
@router.get("/gateway/services/health", response_model=GatewayServicesHealthResponse)
async def downstream_health_check(
    settings: GatewaySettingsDep) -> GatewayServicesHealthResponse:
    """Probe every proxy target concurrently and aggregate the results.

    The overall status is "ok" only when every downstream result reports
    "ok"; otherwise it is "degraded". A dedicated ServiceProxy is built with
    the downstream health timeout rather than the regular proxy timeout.
    """
    proxy_targets = build_proxy_targets(settings)
    prober = ServiceProxy(
        settings=settings,
        timeout_seconds=settings.downstream_health_timeout_seconds,
    )
    pending = [prober.check_health(target) for target in proxy_targets.values()]
    results = await asyncio.gather(*pending)
    any_unhealthy = any(result.status != "ok" for result in results)
    overall = "degraded" if any_unhealthy else "ok"
    return GatewayServicesHealthResponse(
        status=overall,
        downstream_services=results,
    )
@router.post("/gateway/sessions/execute")
async def execute_session(
    payload: SessionExecuteRequest,
    request: Request,
    settings: GatewaySettingsDep):
    """Run one chat turn of a session against its configured agent or team.

    With `payload.stream` set, the work is delegated to
    `_stream_session_execute` and a text/event-stream response is returned.
    Otherwise the steps run in order: load the session detail, create a run
    request, store the user message, mark the run request "running", create
    and execute the agent/team run, store the assistant message when there is
    output text, write the final run-request status, and return a
    SessionExecuteResponse.

    Raises:
        HTTPException: 422 when the session's runtime target type is not
            "agent"/"team" or its target id is empty. An HTTPException raised
            by a step before the `try` block also propagates; one raised
            inside it is recorded as a failed request instead.
    """
    if payload.stream:
        # Streaming mode: the generator performs the whole flow and emits SSE frames.
        return StreamingResponse(
            _stream_session_execute(payload, request, settings),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
    targets = build_proxy_targets(settings)
    session_target = targets["session-service"]
    agent_target = targets["agent-service"]
    team_target = targets["team-service"]
    headers = _build_internal_headers(request, settings)
    # NOTE(review): `httpx_client` is not defined in the visible part of this file;
    # presumably an async context manager yielding an HTTP client — confirm.
    async with httpx_client(settings.proxy_timeout_seconds) as client:
        # 1. Resolve which runtime (agent or team) this session is bound to.
        session = await _post_json(
            client=client,
            target=session_target,
            path="detail",
            payload={"session_id": payload.session_id},
            headers=headers)
        target_type = _get_string(session, "runtime_target_type")
        target_id = _get_string(session, "runtime_target_id")
        target_config_id = _get_optional_string(session, "runtime_target_config_id")
        if target_type not in {"agent", "team"} or not target_id:
            raise HTTPException(status_code=422, detail=error_detail("error.session.runtime_not_configured"))
        # Base payload stored on the run request; later updates extend it with ids and results.
        run_request_payload = {
            "target_type": target_type,
            "target_id": target_id,
            "target_config_id": target_config_id,
            "mode": "production",
            "input_text": payload.message_text,
        }
        # 2. Register the run request in state "accepted".
        run_request = await _post_json(
            client=client,
            target=session_target,
            path="run-requests",
            payload={
                "session_id": payload.session_id,
                "app_config_id": target_config_id or target_id,
                "workflow_config_id": target_id,
                "trigger_type": "chat",
                "request_payload_json": run_request_payload,
                "request_status": "accepted",
            },
            headers=headers)
        run_request_id = _get_string(run_request, "id")
        # 3. Persist the user's message; the run request id doubles as the turn id.
        user_message = await _post_json(
            client=client,
            target=session_target,
            path="messages",
            payload={
                "session_id": payload.session_id,
                "turn_id": run_request_id,
                "role": "user",
                "content_type": "text",
                "content_text": payload.message_text,
                "content_json": {},
            },
            headers=headers)
        user_message_id = _get_string(user_message, "id")
        # 4. Move the run request to "running" before calling the runtime.
        await _post_json(
            client=client,
            target=session_target,
            path="run-requests/update",
            payload={
                "run_request_id": run_request_id,
                "request_status": "running",
                "request_payload_json": {
                    **run_request_payload,
                    "user_message_id": user_message_id,
                },
            },
            headers=headers)
        # Result fields; they keep these defaults for whichever steps do not run.
        assistant_message_id: str | None = None
        agent_run_id: str | None = None
        team_run_id: str | None = None
        output_text: str | None = None
        error_message: str | None = None
        request_status = "completed"
        try:
            if target_type == "agent":
                # 5a. Create the agent run, then execute it.
                agent_run = await _post_json(
                    client=client,
                    target=agent_target,
                    path="runs",
                    payload={
                        "agent_id": target_id,
                        "agent_config_id": target_config_id,
                        "session_id": payload.session_id,
                        "input_text": payload.message_text,
                        "input_json": {
                            "source": "session",
                            "run_request_id": run_request_id,
                        },
                    },
                    headers=headers)
                agent_run_id = _get_string(agent_run, "id")
                execute_result = await _post_json(
                    client=client,
                    target=agent_target,
                    path="runs/execute",
                    payload={
                        "agent_run_id": agent_run_id,
                        "dry_run": False,
                    },
                    headers=headers)
                run_data = _get_dict(execute_result, "run")
                output_text = _resolve_output_text(run_data)
                error_message = _get_optional_string(run_data, "error_message")
            else:
                # 5b. Create the team run, then execute it. Unlike the agent
                # service, the run id is part of the execute path here.
                team_run = await _post_json(
                    client=client,
                    target=team_target,
                    path="runs",
                    payload={
                        "team_id": target_id,
                        "team_config_id": target_config_id,
                        "session_id": payload.session_id,
                        "input_text": payload.message_text,
                        "input_json": {
                            "source": "session",
                            "run_request_id": run_request_id,
                        },
                        "enqueue": True,
                    },
                    headers=headers)
                team_run_id = _get_string(team_run, "id")
                execute_result = await _post_json(
                    client=client,
                    target=team_target,
                    path=f"runs/{team_run_id}/execute",
                    payload={
                        "dry_run": False,
                    },
                    headers=headers)
                run_data = _get_dict(execute_result, "run")
                output_text = _resolve_output_text(run_data)
                error_message = _get_optional_string(run_data, "error_message")
            # A run that reports an error message counts as failed even though the call succeeded.
            if error_message:
                request_status = "failed"
            # 6. Persist the assistant reply only when the run produced text.
            if output_text:
                assistant_message = await _post_json(
                    client=client,
                    target=session_target,
                    path="messages",
                    payload={
                        "session_id": payload.session_id,
                        "turn_id": run_request_id,
                        "role": "assistant",
                        "content_type": "text",
                        "content_text": output_text,
                        "content_json": {},
                    },
                    headers=headers)
                assistant_message_id = _get_string(assistant_message, "id")
        except HTTPException as exc:
            # A downstream failure is recorded on the run request and returned
            # in the response body instead of being re-raised.
            request_status = "failed"
            error_message = exc.detail if isinstance(exc.detail, str) else json.dumps(exc.detail, ensure_ascii=False)
        # 7. Write the final status and all collected ids/results to the run request.
        await _post_json(
            client=client,
            target=session_target,
            path="run-requests/update",
            payload={
                "run_request_id": run_request_id,
                "request_status": request_status,
                "request_payload_json": {
                    **run_request_payload,
                    "user_message_id": user_message_id,
                    "assistant_message_id": assistant_message_id,
                    "agent_run_id": agent_run_id,
                    "team_run_id": team_run_id,
                    "output_text": output_text,
                    "error_message": error_message,
                },
            },
            headers=headers)
        return SessionExecuteResponse(
            session_id=payload.session_id,
            run_request_id=run_request_id,
            target_type=target_type,
            target_id=target_id,
            target_config_id=target_config_id,
            request_status=request_status,
            user_message_id=user_message_id,
            assistant_message_id=assistant_message_id,
            agent_run_id=agent_run_id,
            team_run_id=team_run_id,
            output_text=output_text,
            error_message=error_message)
async def _record_stream_failure(client, session_target, headers, run_request_id, request_payload_json):
    """Best-effort: set a run request to "failed" after a streaming error."""
    import logging

    try:
        await _post_json(
            client=client, target=session_target, path="run-requests/update",
            payload={
                "run_request_id": run_request_id,
                "request_status": "failed",
                "request_payload_json": request_payload_json,
            }, headers=headers)
    except Exception:
        # The caller is already reporting the original error to the client;
        # a bookkeeping failure must not replace it, so it is only logged.
        logging.getLogger(__name__).warning(
            "could not mark run request %s as failed", run_request_id, exc_info=True)


async def _stream_session_execute(
    payload: SessionExecuteRequest,
    request: Request,
    settings: ApiGatewaySettings):
    """Async generator behind the streaming mode of the session execute endpoint.

    Performs the same steps as the non-streaming path (session detail, run
    request, user message, agent/team run, assistant message, final status)
    and yields server-sent-event frames built with `_sse`:

    * "session.execute.started" once the run request is marked "running";
    * every event received from the runtime's execute-stream endpoint,
      forwarded under its own name;
    * "session.execute.completed" after the final run-request update;
    * "session.execute.failed" when any step raises.

    When a step raises after the run request was created, the run request is
    additionally updated to "failed" (best effort) so it does not remain in
    "running" state, matching what the non-streaming path records.
    """
    targets = build_proxy_targets(settings)
    session_target = targets["session-service"]
    agent_target = targets["agent-service"]
    team_target = targets["team-service"]
    headers = _build_internal_headers(request, settings)
    client = httpx.AsyncClient(timeout=settings.proxy_timeout_seconds)
    # State shared with the error handler below; initialised up front so the
    # handler can report whatever had been collected when the failure occurred.
    run_request_id: str | None = None
    run_request_payload: dict = {}
    user_message_id: str | None = None
    assistant_message_id: str | None = None
    agent_run_id: str | None = None
    team_run_id: str | None = None
    output_text = ""
    error_message: str | None = None
    finalized = False  # True once the final run-request status has been written.
    try:
        session = await _post_json(
            client=client, target=session_target, path="detail",
            payload={"session_id": payload.session_id}, headers=headers)
        target_type = _get_string(session, "runtime_target_type")
        target_id = _get_string(session, "runtime_target_id")
        target_config_id = _get_optional_string(session, "runtime_target_config_id")
        if target_type not in {"agent", "team"} or not target_id:
            raise HTTPException(status_code=422, detail=error_detail("error.session.runtime_not_configured"))
        run_request_payload = {
            "target_type": target_type, "target_id": target_id,
            "target_config_id": target_config_id, "mode": "production",
            "input_text": payload.message_text,
        }
        run_request = await _post_json(
            client=client, target=session_target, path="run-requests",
            payload={
                "session_id": payload.session_id,
                "app_config_id": target_config_id or target_id,
                "workflow_config_id": target_id,
                "trigger_type": "chat",
                "request_payload_json": run_request_payload,
                "request_status": "accepted",
            }, headers=headers)
        run_request_id = _get_string(run_request, "id")
        user_message = await _post_json(
            client=client, target=session_target, path="messages",
            payload={
                "session_id": payload.session_id, "turn_id": run_request_id,
                "role": "user", "content_type": "text",
                "content_text": payload.message_text, "content_json": {},
            }, headers=headers)
        user_message_id = _get_string(user_message, "id")
        await _post_json(
            client=client, target=session_target, path="run-requests/update",
            payload={
                "run_request_id": run_request_id, "request_status": "running",
                "request_payload_json": {**run_request_payload, "user_message_id": user_message_id},
            }, headers=headers)
        yield _sse("session.execute.started", {
            "run_request_id": run_request_id, "user_message_id": user_message_id,
            "target_type": target_type, "target_id": target_id,
        })

        # The agent and team flows differ only in target, payload keys, event
        # prefix and fallback failure text; select those once and share the rest.
        run_input = {"source": "session", "run_request_id": run_request_id}
        if target_type == "agent":
            run_target = agent_target
            event_prefix = "agent.run"
            default_failure = "Agent execution failed"
            create_payload = {
                "agent_id": target_id, "agent_config_id": target_config_id,
                "session_id": payload.session_id, "input_text": payload.message_text,
                "input_json": run_input,
            }
        else:
            run_target = team_target
            event_prefix = "team.run"
            default_failure = "Team execution failed"
            create_payload = {
                "team_id": target_id, "team_config_id": target_config_id,
                "session_id": payload.session_id, "input_text": payload.message_text,
                "input_json": run_input,
                "enqueue": True,
            }
        created_run = await _post_json(
            client=client, target=run_target, path="runs",
            payload=create_payload, headers=headers)
        run_id = _get_string(created_run, "id")
        if target_type == "agent":
            agent_run_id = run_id
        else:
            team_run_id = run_id

        delta_event = f"{event_prefix}.delta"
        completed_event = f"{event_prefix}.completed"
        failed_event = f"{event_prefix}.failed"
        stream_url = _target_url(run_target, f"runs/{run_id}/execute-stream")
        async with client.stream("POST", stream_url, headers=headers, json={"dry_run": False}) as resp:
            if not resp.is_success:
                error_message = await _read_stream_error(resp)
            else:
                async for ev_name, ev_data in _parse_sse(resp):
                    data = json.loads(ev_data)
                    # Forward the upstream event unchanged before interpreting it.
                    yield _sse(ev_name, data)
                    if ev_name == delta_event and isinstance(data.get("text"), str):
                        output_text += data["text"]
                    elif ev_name == completed_event:
                        # Use the final output only when no deltas were streamed.
                        run_data = data.get("run", data)
                        if not output_text and isinstance(run_data.get("output_text"), str):
                            output_text = run_data["output_text"]
                    elif ev_name == failed_event:
                        error_message = data.get("error_message", default_failure)
                        if not isinstance(error_message, str):
                            error_message = default_failure

        request_status = "failed" if error_message else "completed"
        if output_text:
            assistant_message = await _post_json(
                client=client, target=session_target, path="messages",
                payload={
                    "session_id": payload.session_id, "turn_id": run_request_id,
                    "role": "assistant", "content_type": "text",
                    "content_text": output_text, "content_json": {},
                }, headers=headers)
            assistant_message_id = _get_string(assistant_message, "id")
        await _post_json(
            client=client, target=session_target, path="run-requests/update",
            payload={
                "run_request_id": run_request_id, "request_status": request_status,
                "request_payload_json": {
                    **run_request_payload, "user_message_id": user_message_id,
                    "assistant_message_id": assistant_message_id,
                    "agent_run_id": agent_run_id, "team_run_id": team_run_id,
                    "output_text": output_text, "error_message": error_message,
                },
            }, headers=headers)
        finalized = True
        yield _sse("session.execute.completed", {
            "run_request_id": run_request_id, "request_status": request_status,
            "assistant_message_id": assistant_message_id, "output_text": output_text,
            "error_message": error_message,
        })
    except Exception as exc:
        # Top-level boundary of the stream: every failure becomes a
        # "session.execute.failed" event rather than an aborted response.
        if isinstance(exc, HTTPException):
            detail = exc.detail if isinstance(exc.detail, str) else json.dumps(exc.detail, ensure_ascii=False)
        else:
            detail = str(exc)
        # Record the failure before yielding: if the client has already gone
        # away, the yield below may never complete.
        if run_request_id is not None and not finalized:
            await _record_stream_failure(
                client, session_target, headers, run_request_id,
                {
                    **run_request_payload, "user_message_id": user_message_id,
                    "assistant_message_id": assistant_message_id,
                    "agent_run_id": agent_run_id, "team_run_id": team_run_id,
                    "output_text": output_text, "error_message": detail,
                })
        yield _sse("session.execute.failed", {"error_message": detail})
    finally:
        await client.aclose()
- # ── Application Admin Routes ─────────────────────────────────────────────────
@router.post("/gateway/apps", response_model=AppResponse)
def create_app(payload: AppCreateRequest, db: DbSession) -> AppResponse:
    """Create an application definition with a unique code.

    Raises:
        HTTPException: 409 when an application with the same code already exists.
    """
    repository = AppDefinitionRepository(db)
    duplicate = repository.get_by_code(code=payload.code)
    if duplicate is not None:
        conflict = error_detail("error.app.code_exists", code=payload.code)
        raise HTTPException(status_code=409, detail=conflict)
    created = repository.create(
        code=payload.code,
        name=payload.name,
        description=payload.description,
        target_type=payload.target_type,
        target_id=payload.target_id,
        owner_user_id=payload.owner_user_id,
        settings_json=payload.settings_json,
    )
    return AppResponse.from_entity(created)
@router.post("/gateway/apps/list", response_model=list[AppResponse])
def list_apps(payload: AppListRequest, db: DbSession) -> list[AppResponse]:
    """Return every application definition; the request body is not read here."""
    definitions = AppDefinitionRepository(db).list_all()
    return list(map(AppResponse.from_entity, definitions))
@router.post("/gateway/apps/detail", response_model=AppResponse)
def get_app_detail(payload: AppDetailRequest, db: DbSession) -> AppResponse:
    """Fetch one application definition by id.

    Raises:
        HTTPException: 404 when no application has the requested id.
    """
    found = AppDefinitionRepository(db).get_by_id(app_id=payload.app_id)
    if found is not None:
        return AppResponse.from_entity(found)
    missing = error_detail("error.app.not_found", id=payload.app_id)
    raise HTTPException(status_code=404, detail=missing)
@router.post("/gateway/apps/update", response_model=AppResponse)
def update_app(payload: AppUpdateRequest, db: DbSession) -> AppResponse:
    """Update the editable fields of an application definition.

    Raises:
        HTTPException: 404 when the repository finds no application with that id.
    """
    repository = AppDefinitionRepository(db)
    updated = repository.update(
        app_id=payload.app_id,
        name=payload.name,
        description=payload.description,
        target_type=payload.target_type,
        target_id=payload.target_id,
        settings_json=payload.settings_json,
    )
    if updated is not None:
        return AppResponse.from_entity(updated)
    missing = error_detail("error.app.not_found", id=payload.app_id)
    raise HTTPException(status_code=404, detail=missing)
@router.post("/gateway/apps/status", response_model=AppResponse)
def update_app_status(payload: AppStatusUpdateRequest, db: DbSession) -> AppResponse:
    """Change the status of an application definition.

    Raises:
        HTTPException: 404 when the repository finds no application with that id.
    """
    repository = AppDefinitionRepository(db)
    updated = repository.update_status(app_id=payload.app_id, status=payload.status)
    if updated is not None:
        return AppResponse.from_entity(updated)
    missing = error_detail("error.app.not_found", id=payload.app_id)
    raise HTTPException(status_code=404, detail=missing)
@router.post("/gateway/apps/{app_id}/api-keys", response_model=AppApiKeyCreateResponse)
def create_app_api_key(app_id: str, payload: AppApiKeyCreateRequest, db: DbSession) -> AppApiKeyCreateResponse:
    """Issue a new API key for an existing application.

    The plaintext key is only present in this response; the repository is
    given the prefix and the hash, not the key itself.

    Raises:
        HTTPException: 404 when the application does not exist.
    """
    owning_app = AppDefinitionRepository(db).get_by_id(app_id=app_id)
    if owning_app is None:
        missing = error_detail("error.app.not_found", id=app_id)
        raise HTTPException(status_code=404, detail=missing)
    plaintext_key = generate_api_key()
    key_repository = AppApiKeyRepository(db)
    record = key_repository.create(
        app_id=app_id,
        name=payload.name,
        key_prefix=get_api_key_prefix(plaintext_key),
        key_hash=hash_api_key(plaintext_key),
        scopes=payload.scopes,
        expires_time=payload.expires_time,
    )
    response = AppApiKeyCreateResponse(
        id=record.id,
        app_id=record.app_id,
        name=record.name,
        key_prefix=record.key_prefix,
        api_key=plaintext_key,
        status=record.status,
        scopes=record.scopes,
        expires_time=record.expires_time,
        created_time=record.created_time,
    )
    return response
@router.post("/gateway/apps/{app_id}/api-keys/list", response_model=list[AppApiKeyResponse])
def list_app_api_keys(app_id: str, payload: AppApiKeyListRequest, db: DbSession) -> list[AppApiKeyResponse]:
    """List the API keys of one application; the request body is not read here."""
    keys = AppApiKeyRepository(db).list_by_app(app_id=app_id)
    return list(map(AppApiKeyResponse.from_entity, keys))
@router.post("/gateway/apps/{app_id}/api-keys/status", response_model=AppApiKeyResponse)
def update_app_api_key_status(app_id: str, payload: AppApiKeyStatusUpdateRequest, db: DbSession) -> AppApiKeyResponse:
    """Change the status of an API key that belongs to the application in the path.

    The key must be one of the keys listed for `app_id`; previously the path
    parameter was ignored, so a key of any application could be modified
    through any application's URL.

    Raises:
        HTTPException: 404 when the key does not exist or is not one of this
            application's keys.
    """
    repository = AppApiKeyRepository(db)
    wanted_id = str(payload.api_key_id)
    # NOTE(review): assumes list_by_app returns keys of every status (it backs
    # the admin listing); if it filters out disabled keys, re-enabling a key
    # through this route would be rejected — confirm against the repository.
    owned_by_app = any(
        str(item.id) == wanted_id
        for item in repository.list_by_app(app_id=app_id)
    )
    if not owned_by_app:
        raise HTTPException(status_code=404, detail=error_detail("error.api_key.not_found", id=payload.api_key_id))
    entity = repository.update_status(api_key_id=payload.api_key_id, status=payload.status)
    if entity is None:
        raise HTTPException(status_code=404, detail=error_detail("error.api_key.not_found", id=payload.api_key_id))
    return AppApiKeyResponse.from_entity(entity)
@router.post("/gateway/apps/{app_id}/audits", response_model=list[AppInvocationAuditResponse])
def list_app_audits(app_id: str, payload: AppAuditListRequest, db: DbSession) -> list[AppInvocationAuditResponse]:
    """List invocation audits of one application, capped at `payload.limit` entries."""
    repository = AppInvocationAuditRepository(db)
    audits = repository.list_by_app(app_id=app_id, limit=payload.limit)
    return list(map(AppInvocationAuditResponse.from_entity, audits))
- # ── OpenAPI External Invocation ──────────────────────────────────────────────
def _authenticate_app_api_key(request: Request, db: Session):
    """Authenticate an external caller by application API key.

    The token is taken from an `Authorization: Bearer <token>` header, or,
    when that yields nothing, from the header named by
    `settings.api_key_header_name`. Its hash is looked up among the active
    application keys.

    Returns:
        A `(key_entity, app_entity)` tuple for the authenticated key and the
        application that owns it.

    Raises:
        HTTPException: 401 when the token is missing, unknown, or expired, or
            when the owning application no longer exists; 403 when the
            application's status is not "published".
    """
    # Local import: the module header only brings in `datetime` itself.
    from datetime import timezone

    settings = ApiGatewaySettings()
    token: str | None = None
    authorization = request.headers.get("authorization")
    if authorization:
        scheme, _, credentials = authorization.partition(" ")
        credentials = credentials.strip()
        if scheme.lower() == "bearer" and credentials:
            token = credentials
    if token is None:
        token = request.headers.get(settings.api_key_header_name)
    if not token:
        raise HTTPException(status_code=401, detail=error_detail("error.auth.missing_token"))

    key_repository = AppApiKeyRepository(db)
    key_entity = key_repository.get_active_by_hash(key_hash=hash_api_key(token))
    if key_entity is None:
        raise HTTPException(status_code=401, detail=error_detail("error.api_key.invalid"))

    expires_time = key_entity.expires_time
    if expires_time is not None:
        # Compare in aware UTC. A naive stored value is interpreted as UTC,
        # which is what the former `datetime.utcnow()` comparison implied; an
        # aware stored value used to raise TypeError against the naive "now".
        if expires_time.tzinfo is None:
            expires_time = expires_time.replace(tzinfo=timezone.utc)
        if expires_time <= datetime.now(timezone.utc):
            raise HTTPException(status_code=401, detail=error_detail("error.api_key.expired"))

    app_entity = AppDefinitionRepository(db).get_by_id(app_id=key_entity.app_id)
    if app_entity is None:
        raise HTTPException(status_code=401, detail=error_detail("error.app.not_found"))
    if app_entity.status != "published":
        raise HTTPException(status_code=403, detail=error_detail("error.app.not_published", status=app_entity.status))

    # Record the successful use only after every check has passed.
    key_repository.touch_last_used_time(api_key_id=key_entity.id)
    return key_entity, app_entity
@router.post("/gateway/openapi/apps/{app_code}/chat", response_model=OpenApiChatResponse)
async def openapi_chat(app_code: str, payload: OpenApiChatRequest, request: Request, db: DbSession):
    # Blocking (non-streaming) chat invocation of a published app.
    #
    # Flow: authenticate the API key, check that it belongs to `app_code`,
    # create a session when the caller supplied none, register the run request
    # and the user message in session-service, execute the agent or team run,
    # store the assistant reply, close the run request and write an audit row.
    #
    # Raises HTTPException from authentication (401/403), 403 when the key's
    # app code differs from `app_code`, and whatever _post_json/_get_string
    # raise for the bookkeeping calls made outside the try block. An
    # HTTPException raised while executing the run is reported in the response
    # body (status="failed") instead of being re-raised.
    start = perf_counter()
    request_id = str(uuid4())
    key_entity, app_entity = _authenticate_app_api_key(request, db)
    if app_entity.code != app_code:
        raise HTTPException(status_code=403, detail=error_detail("error.api_key.unauthorized"))

    # Build the settings once and reuse them for targets, headers and timeout.
    settings = ApiGatewaySettings()
    targets = build_proxy_targets(settings)
    session_target = targets["session-service"]
    agent_target = targets["agent-service"]
    team_target = targets["team-service"]
    headers = _build_internal_headers(request, settings)

    async with httpx_client(settings.proxy_timeout_seconds) as client:
        session_id = payload.session_id
        if not session_id:
            # No session supplied by the caller: open one bound to this app.
            session_data = await _post_json(
                client=client,
                target=session_target,
                path="",
                payload={
                    "app_id": app_entity.id,
                    "user_id": payload.user_id or "openapi",
                    "channel_type": "openapi",
                    "runtime_target_type": app_entity.target_type,
                    "runtime_target_id": app_entity.target_id,
                },
                headers=headers,
            )
            session_id = _get_string(session_data, "id")

        run_request_payload = {
            "target_type": app_entity.target_type,
            "target_id": app_entity.target_id,
            "mode": "production",
            "input_text": payload.message,
        }
        run_request = await _post_json(
            client=client,
            target=session_target,
            path="run-requests",
            payload={
                "session_id": session_id,
                "app_config_id": app_entity.target_id,
                "workflow_config_id": app_entity.target_id,
                "trigger_type": "chat",
                "request_payload_json": run_request_payload,
                "request_status": "accepted",
            },
            headers=headers,
        )
        run_request_id = _get_string(run_request, "id")

        user_message = await _post_json(
            client=client,
            target=session_target,
            path="messages",
            payload={
                "session_id": session_id,
                "turn_id": run_request_id,
                "role": "user",
                "content_type": "text",
                "content_text": payload.message,
                "content_json": {},
            },
            headers=headers,
        )
        user_message_id = _get_string(user_message, "id")

        await _post_json(
            client=client,
            target=session_target,
            path="run-requests/update",
            payload={
                "run_request_id": run_request_id,
                "request_status": "running",
                "request_payload_json": {**run_request_payload, "user_message_id": user_message_id},
            },
            headers=headers,
        )

        output_text: str | None = None
        error_message: str | None = None
        request_status = "completed"
        try:
            # Agent and team runs share the create -> execute -> read "run"
            # sequence; only the target, payload keys and execute path differ.
            is_agent = app_entity.target_type == "agent"
            if is_agent:
                run_target = agent_target
                create_payload = {
                    "agent_id": app_entity.target_id,
                    "session_id": session_id,
                    "input_text": payload.message,
                    "input_json": {"source": "openapi", "run_request_id": run_request_id},
                }
            else:
                run_target = team_target
                create_payload = {
                    "team_id": app_entity.target_id,
                    "session_id": session_id,
                    "input_text": payload.message,
                    "input_json": {"source": "openapi", "run_request_id": run_request_id},
                    "enqueue": True,
                }
            created_run = await _post_json(
                client=client, target=run_target, path="runs",
                payload=create_payload, headers=headers,
            )
            run_id = _get_string(created_run, "id")

            if is_agent:
                execute_path = "runs/execute"
                execute_payload = {"agent_run_id": run_id, "dry_run": False}
            else:
                execute_path = f"runs/{run_id}/execute"
                execute_payload = {"dry_run": False}
            execute_result = await _post_json(
                client=client, target=run_target, path=execute_path,
                payload=execute_payload, headers=headers,
            )

            run_data = _get_dict(execute_result, "run")
            output_text = _resolve_output_text(run_data)
            error_message = _get_optional_string(run_data, "error_message")
            if error_message:
                request_status = "failed"

            if output_text:
                await _post_json(
                    client=client,
                    target=session_target,
                    path="messages",
                    payload={
                        "session_id": session_id,
                        "turn_id": run_request_id,
                        "role": "assistant",
                        "content_type": "text",
                        "content_text": output_text,
                        "content_json": {},
                    },
                    headers=headers,
                )
        except HTTPException as exc:
            # Downstream failure during execution: surface it in the response
            # body and still close the run request below.
            request_status = "failed"
            error_message = exc.detail if isinstance(exc.detail, str) else json.dumps(exc.detail, ensure_ascii=False)

        await _post_json(
            client=client,
            target=session_target,
            path="run-requests/update",
            payload={
                "run_request_id": run_request_id,
                "request_status": request_status,
                "request_payload_json": {
                    **run_request_payload,
                    "user_message_id": user_message_id,
                    "output_text": output_text,
                    "error_message": error_message,
                },
            },
            headers=headers,
        )

    duration_ms = int((perf_counter() - start) * 1000)
    AppInvocationAuditRepository(db).create(
        app_id=app_entity.id,
        api_key_prefix=key_entity.key_prefix,
        request_id=request_id,
        session_id=session_id,
        run_request_id=run_request_id,
        target_type=app_entity.target_type,
        target_id=app_entity.target_id,
        invoke_type="sync",
        status=request_status,
        duration_ms=duration_ms,
        error_message=error_message,
        client_metadata_json=json.dumps(payload.metadata) if payload.metadata else None,
    )
    return OpenApiChatResponse(
        request_id=request_id,
        app_code=app_entity.code,
        session_id=session_id,
        run_request_id=run_request_id,
        target_type=app_entity.target_type,
        target_id=app_entity.target_id,
        status=request_status,
        output_text=output_text,
        error=error_message,
    )
@router.post("/gateway/openapi/apps/{app_code}/chat/stream")
async def openapi_chat_stream(app_code: str, payload: OpenApiChatRequest, request: Request):
    # Streaming (SSE) chat invocation of a published app.
    #
    # Every outcome, including authentication and validation failures, is
    # delivered as a text/event-stream response: rejections are a single
    # "failed" event, success hands over to _stream_openapi_chat.
    #
    # Authentication uses its own short-lived DB session taken from
    # request.app.state.session_factory; it is closed exactly once, in the
    # `finally` clause, before any response is produced.
    # NOTE(review): key_entity/app_entity are read after that session is
    # closed — presumably the session factory keeps loaded attributes
    # available on detached instances; confirm.
    settings = ApiGatewaySettings()
    session_factory = request.app.state.session_factory

    def _as_event_stream(frames) -> StreamingResponse:
        # Shared SSE framing: no caching, no proxy buffering.
        return StreamingResponse(
            frames,
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
        )

    def _reject(error_code: str, error_message: str) -> StreamingResponse:
        # One-event stream describing why the request was refused.
        return _as_event_stream(
            _single_sse(
                "failed",
                {"status": "failed", "error_code": error_code, "error_message": error_message},
            )
        )

    auth_db = session_factory()
    try:
        key_entity, app_entity = _authenticate_app_api_key(request, auth_db)
    except HTTPException as exc:
        detail = exc.detail if isinstance(exc.detail, str) else json.dumps(exc.detail, ensure_ascii=False)
        return _reject("auth_error", detail)
    finally:
        auth_db.close()

    if app_entity.code != app_code:
        return _reject("forbidden", "api key does not belong to this app")
    if app_entity.target_type != "agent":
        return _reject("unsupported", "streaming is only supported for agent targets in V0.1")

    return _as_event_stream(
        _stream_openapi_chat(app_code, payload, request, key_entity, app_entity, session_factory, settings)
    )
def _single_sse(event: str, data: dict) -> AsyncIterator[str]:
    """Return an async iterator that yields exactly one SSE frame.

    The frame is built with ``_sse(event, data)`` when the iterator is
    consumed, not when this function is called.
    """

    async def _emit_once():
        frame = _sse(event, data)
        yield frame

    return _emit_once()
async def _stream_openapi_chat(
    app_code: str,
    payload: OpenApiChatRequest,
    request: Request,
    key_entity,
    app_entity,
    session_factory,
    settings: ApiGatewaySettings,
):
    """Drive one streamed agent invocation and yield SSE frames for the client.

    Steps: create a session when none was supplied, register the run request
    and user message in session-service, emit ``started``, create the agent
    run, relay the agent's execute-stream events (``delta`` / ``completed`` /
    ``failed``; unknown events are passed through unchanged), then persist the
    assistant message and the final run-request status.

    Failures never propagate as exceptions to the caller of the generator:
    they are reported as a ``failed`` event (``agent_error`` for agent-side
    failures, ``gateway_error`` for HTTPException, ``internal_error`` for
    anything else). An audit row is always attempted in ``finally`` using a
    fresh session from *session_factory*; a failure to write it is logged and
    otherwise ignored.

    Args:
        app_code: Application code from the route (not used in the body).
        payload: Chat request (message, optional session_id/user_id/metadata).
        request: Incoming request; used to derive internal headers.
        key_entity: Authenticated API key record (``key_prefix`` is audited).
        app_entity: Target application record.
        session_factory: Callable returning a DB session for the audit write.
        settings: Gateway settings (proxy targets, timeout).

    Yields:
        SSE-formatted strings produced by ``_sse``.
    """
    # Function-scope import: the module-level import block is outside the
    # visible part of the file.
    import logging

    start = perf_counter()
    request_id = str(uuid4())
    targets = build_proxy_targets(settings)
    session_target = targets["session-service"]
    agent_target = targets["agent-service"]
    headers = _build_internal_headers(request, settings)
    client = httpx.AsyncClient(timeout=settings.proxy_timeout_seconds)

    output_text = ""
    error_message: str | None = None
    session_id: str | None = None
    run_request_id: str | None = None
    # Pessimistic default: only a fully processed stream flips this.
    request_status = "failed"

    try:
        session_id = payload.session_id
        if not session_id:
            session_data = await _post_json(
                client=client,
                target=session_target,
                path="",
                payload={
                    "app_id": app_entity.id,
                    "user_id": payload.user_id or "openapi",
                    "channel_type": "openapi",
                    "runtime_target_type": app_entity.target_type,
                    "runtime_target_id": app_entity.target_id,
                },
                headers=headers,
            )
            session_id = _get_string(session_data, "id")

        run_request_payload = {
            "target_type": app_entity.target_type,
            "target_id": app_entity.target_id,
            "mode": "production",
            "input_text": payload.message,
        }
        run_request = await _post_json(
            client=client,
            target=session_target,
            path="run-requests",
            payload={
                "session_id": session_id,
                "app_config_id": app_entity.target_id,
                "workflow_config_id": app_entity.target_id,
                "trigger_type": "chat",
                "request_payload_json": run_request_payload,
                "request_status": "accepted",
            },
            headers=headers,
        )
        run_request_id = _get_string(run_request, "id")

        user_message = await _post_json(
            client=client,
            target=session_target,
            path="messages",
            payload={
                "session_id": session_id,
                "turn_id": run_request_id,
                "role": "user",
                "content_type": "text",
                "content_text": payload.message,
                "content_json": {},
            },
            headers=headers,
        )
        user_message_id = _get_string(user_message, "id")

        await _post_json(
            client=client,
            target=session_target,
            path="run-requests/update",
            payload={
                "run_request_id": run_request_id,
                "request_status": "running",
                "request_payload_json": {**run_request_payload, "user_message_id": user_message_id},
            },
            headers=headers,
        )

        yield _sse("started", {
            "request_id": request_id,
            "session_id": session_id,
            "run_request_id": run_request_id,
        })

        agent_run = await _post_json(
            client=client,
            target=agent_target,
            path="runs",
            payload={
                "agent_id": app_entity.target_id,
                "session_id": session_id,
                "input_text": payload.message,
                "input_json": {"source": "openapi", "run_request_id": run_request_id},
            },
            headers=headers,
        )
        agent_run_id = _get_string(agent_run, "id")

        stream_url = _target_url(agent_target, f"runs/{agent_run_id}/execute-stream")
        async with client.stream("POST", stream_url, headers=headers, json={"dry_run": False}) as resp:
            if not resp.is_success:
                error_message = await _read_stream_error(resp)
                # Tell the client as well; previously the stream just ended
                # after "started" with no indication of the failure.
                yield _sse("failed", {"status": "failed", "error_code": "agent_error", "error_message": error_message})
            else:
                async for ev_name, ev_data in _parse_sse(resp):
                    data = json.loads(ev_data)
                    if ev_name == "agent.run.delta":
                        text_chunk = data.get("text", "")
                        yield _sse("delta", {"text": text_chunk})
                        if isinstance(text_chunk, str):
                            output_text += text_chunk
                    elif ev_name == "agent.run.completed":
                        run_data = data.get("run", data)
                        final_text = _get_optional_string(run_data, "output_text")
                        # Use the final text only when no deltas were received.
                        if not output_text and final_text:
                            output_text = final_text
                        yield _sse("completed", {"status": "completed", "output_text": output_text})
                    elif ev_name == "agent.run.failed":
                        msg = data.get("error_message", "Agent execution failed")
                        if not isinstance(msg, str):
                            msg = "Agent execution failed"
                        error_message = msg
                        yield _sse("failed", {"status": "failed", "error_code": "agent_error", "error_message": msg})
                    else:
                        # Unknown event types are relayed untouched.
                        yield _sse(ev_name, data)

        request_status = "failed" if error_message else "completed"

        if output_text:
            await _post_json(
                client=client,
                target=session_target,
                path="messages",
                payload={
                    "session_id": session_id,
                    "turn_id": run_request_id,
                    "role": "assistant",
                    "content_type": "text",
                    "content_text": output_text,
                    "content_json": {},
                },
                headers=headers,
            )

        await _post_json(
            client=client,
            target=session_target,
            path="run-requests/update",
            payload={
                "run_request_id": run_request_id,
                "request_status": request_status,
                "request_payload_json": {
                    **run_request_payload,
                    "user_message_id": user_message_id,
                    "output_text": output_text,
                    "error_message": error_message,
                },
            },
            headers=headers,
        )
    except HTTPException as exc:
        detail = exc.detail if isinstance(exc.detail, str) else json.dumps(exc.detail, ensure_ascii=False)
        # Keep the audit row consistent with the "failed" event sent below;
        # an earlier agent error message takes precedence.
        request_status = "failed"
        error_message = error_message or detail
        yield _sse("failed", {"status": "failed", "error_code": "gateway_error", "error_message": detail})
    except Exception as exc:
        request_status = "failed"
        error_message = error_message or str(exc)
        yield _sse("failed", {"status": "failed", "error_code": "internal_error", "error_message": str(exc)})
    finally:
        await client.aclose()
        duration_ms = int((perf_counter() - start) * 1000)
        audit_db = session_factory()
        try:
            AppInvocationAuditRepository(audit_db).create(
                app_id=app_entity.id,
                api_key_prefix=key_entity.key_prefix,
                request_id=request_id,
                session_id=session_id,
                run_request_id=run_request_id,
                target_type=app_entity.target_type,
                target_id=app_entity.target_id,
                invoke_type="stream",
                status=request_status,
                duration_ms=duration_ms,
                error_message=error_message,
                client_metadata_json=json.dumps(payload.metadata) if payload.metadata else None,
            )
        except Exception:
            # Auditing is best-effort and must not break the stream, but a
            # lost audit row should at least leave a trace in the logs.
            logging.getLogger(__name__).exception(
                "failed to write stream invocation audit (request_id=%s)", request_id
            )
        finally:
            audit_db.close()
# Reverse-proxy route: relays requests under /gateway/sessions to the
# "session-service" target. `path` is the part after the prefix ("" when the
# bare prefix is requested).
@router.api_route(
    "/gateway/sessions",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/sessions/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_session_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["session-service"]
    return await proxy.forward(request=request, target=upstream, path=path)
# Reverse-proxy route: relays requests under /gateway/agents to the
# "agent-service" target. `path` is the part after the prefix ("" when the
# bare prefix is requested).
@router.api_route(
    "/gateway/agents",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/agents/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_agent_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["agent-service"]
    return await proxy.forward(request=request, target=upstream, path=path)
# Reverse-proxy route: relays requests under /gateway/memories to the
# "memory-service" target. `path` is the part after the prefix ("" when the
# bare prefix is requested).
@router.api_route(
    "/gateway/memories",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/memories/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_memory_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["memory-service"]
    return await proxy.forward(request=request, target=upstream, path=path)
# Reverse-proxy route: relays requests under /gateway/teams to the
# "team-service" target. `path` is the part after the prefix ("" when the
# bare prefix is requested).
@router.api_route(
    "/gateway/teams",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/teams/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_team_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["team-service"]
    return await proxy.forward(request=request, target=upstream, path=path)
# Reverse-proxy route: relays requests under /gateway/skills to the
# "skill-service" target. `path` is the part after the prefix ("" when the
# bare prefix is requested).
@router.api_route(
    "/gateway/skills",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/skills/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_skill_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["skill-service"]
    return await proxy.forward(request=request, target=upstream, path=path)
# Reverse-proxy route: relays requests under /gateway/human to the
# "human-service" target. `path` is the part after the prefix ("" when the
# bare prefix is requested).
@router.api_route(
    "/gateway/human",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/human/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_human_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["human-service"]
    return await proxy.forward(request=request, target=upstream, path=path)
# Reverse-proxy route: relays requests under /gateway/knowledge to the
# "knowledge-service" target. `path` is the part after the prefix ("" when
# the bare prefix is requested).
@router.api_route(
    "/gateway/knowledge",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/knowledge/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_knowledge_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["knowledge-service"]
    return await proxy.forward(request=request, target=upstream, path=path)
# Reverse-proxy route: relays requests under /gateway/events to the
# "event-service" target. `path` is the part after the prefix ("" when the
# bare prefix is requested).
@router.api_route(
    "/gateway/events",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/events/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_event_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["event-service"]
    return await proxy.forward(request=request, target=upstream, path=path)
# Reverse-proxy route: relays POST requests under /gateway/identity to the
# "identity-service" target (POST is the only method registered). `path` is
# the part after the prefix ("" when the bare prefix is requested).
@router.api_route(
    "/gateway/identity",
    methods=["POST"],
)
@router.api_route(
    "/gateway/identity/{path:path}",
    methods=["POST"],
)
async def proxy_identity_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["identity-service"]
    return await proxy.forward(request=request, target=upstream, path=path)
# Reverse-proxy route: relays requests under /gateway/scheduler to the
# "scheduler-service" target. `path` is the part after the prefix ("" when
# the bare prefix is requested).
@router.api_route(
    "/gateway/scheduler",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/scheduler/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_scheduler_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["scheduler-service"]
    return await proxy.forward(request=request, target=upstream, path=path)
# Reverse-proxy route: relays requests under /gateway/tools to the
# "tool-service" target. `path` is the part after the prefix ("" when the
# bare prefix is requested).
@router.api_route(
    "/gateway/tools",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/tools/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_tool_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["tool-service"]
    return await proxy.forward(request=request, target=upstream, path=path)
# Reverse-proxy route: relays requests under /gateway/models to the
# "model-gateway-service" target. `path` is the part after the prefix ("" when
# the bare prefix is requested).
@router.api_route(
    "/gateway/models",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/models/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_model_gateway_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["model-gateway-service"]
    return await proxy.forward(request=request, target=upstream, path=path)
# Reverse-proxy route: relays POST requests under /gateway/model-providers to
# the "model-provider-service" target (POST is the only method registered).
# `path` is the part after the prefix ("" when the bare prefix is requested).
@router.api_route(
    "/gateway/model-providers",
    methods=["POST"],
)
@router.api_route(
    "/gateway/model-providers/{path:path}",
    methods=["POST"],
)
async def proxy_model_provider_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["model-provider-service"]
    return await proxy.forward(request=request, target=upstream, path=path)
# Reverse-proxy route: relays requests under /gateway/code to the
# "code-runner-service" target. `path` is the part after the prefix ("" when
# the bare prefix is requested).
@router.api_route(
    "/gateway/code",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/code/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_code_runner_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["code-runner-service"]
    return await proxy.forward(request=request, target=upstream, path=path)
def _build_internal_headers(request: Request, settings: ApiGatewaySettings) -> dict[str, str]:
    """Build the headers sent on gateway-to-service calls.

    Starts from ``build_internal_service_headers(settings)`` and copies the
    caller's ``authorization`` and ``x-user-id`` headers when they are present
    and non-empty.
    """
    outbound = build_internal_service_headers(settings)
    for header_name in ("authorization", "x-user-id"):
        forwarded_value = request.headers.get(header_name)
        if forwarded_value:
            outbound[header_name] = forwarded_value
    return outbound
def httpx_client(timeout_seconds: float) -> httpx.AsyncClient:
    """Create a new ``httpx.AsyncClient`` using *timeout_seconds* as its timeout."""
    client = httpx.AsyncClient(timeout=timeout_seconds)
    return client
async def _post_json(
    *,
    client: httpx.AsyncClient,
    target: ProxyTarget,
    path: str,
    payload: dict[str, object],
    headers: dict[str, str],
) -> dict[str, object]:
    """POST *payload* as JSON to a downstream service and return its JSON object.

    Args:
        client: Async HTTP client used for the call.
        target: Downstream service; its URL is built with ``_target_url``.
        path: Path relative to the target's prefix ("" for the prefix itself).
        payload: JSON-serialisable request body.
        headers: Headers to send with the request.

    Returns:
        The decoded response body, guaranteed to be a dict.

    Raises:
        HTTPException: 502 when the request itself fails (``httpx.HTTPError``),
            when a successful response is not valid JSON, or when it is JSON
            but not an object; the downstream status code (with the detail
            extracted by ``_error_detail``) when the response is not 2xx.
    """
    url = _target_url(target, path)
    try:
        response = await client.post(url, headers=headers, json=payload)
    except httpx.HTTPError as exc:
        raise HTTPException(
            status_code=502,
            detail=error_detail("error.downstream.request_failed", service=target.service_name, error=str(exc)),
        ) from exc

    if not response.is_success:
        raise HTTPException(status_code=response.status_code, detail=_error_detail(response))

    try:
        data = response.json()
    except ValueError as exc:
        # A 2xx reply with a non-JSON body is a contract violation by the
        # downstream service, not an internal error of the gateway.
        raise HTTPException(
            status_code=502,
            detail=error_detail("error.downstream.unexpected_response", service=target.service_name),
        ) from exc
    if not isinstance(data, dict):
        raise HTTPException(
            status_code=502,
            detail=error_detail("error.downstream.unexpected_response", service=target.service_name),
        )
    return data
- def _target_url(target: ProxyTarget, path: str) -> str:
- normalized_path = path.strip("/")
- if normalized_path:
- return f"{target.base_url.rstrip('/')}{target.path_prefix}/{normalized_path}"
- return f"{target.base_url.rstrip('/')}{target.path_prefix}"
- def _error_detail(response: httpx.Response):
- try:
- payload = response.json()
- except ValueError:
- return error_detail("error.downstream.generic_failure", status=str(response.status_code))
- if isinstance(payload, dict):
- detail = payload.get("detail")
- if isinstance(detail, str):
- return detail
- error = payload.get("error")
- if isinstance(error, dict):
- message = error.get("message")
- if isinstance(message, str):
- return message
- return error_detail("error.downstream.generic_failure", status=str(response.status_code))
- def _get_string(payload: dict[str, object], key: str) -> str:
- value = payload.get(key)
- if not isinstance(value, str) or not value:
- raise HTTPException(status_code=502, detail=error_detail("error.downstream.missing_field", field=key))
- return value
- def _get_optional_string(payload: dict[str, object], key: str) -> str | None:
- value = payload.get(key)
- return value if isinstance(value, str) and value else None
- def _get_dict(payload: dict[str, object], key: str) -> dict[str, object]:
- value = payload.get(key)
- if not isinstance(value, dict):
- raise HTTPException(status_code=502, detail=error_detail("error.downstream.missing_field", field=key))
- return value
def _resolve_output_text(run_payload: dict[str, object]) -> str | None:
    """Pick the textual output of a run.

    Prefers a non-empty ``output_text``; otherwise serialises a non-empty
    ``output_json`` dict to a JSON string; otherwise returns ``None``.
    """
    plain_text = _get_optional_string(run_payload, "output_text")
    if plain_text:
        return plain_text

    structured = run_payload.get("output_json")
    if not isinstance(structured, dict) or not structured:
        return None
    return json.dumps(structured, ensure_ascii=False)
- def _sse(event: str, data: dict) -> str:
- return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
- async def _parse_sse(response: httpx.Response):
- current_event = "message"
- current_data = ""
- async for line in response.aiter_lines():
- if line.startswith("event:"):
- current_event = line[6:].strip()
- elif line.startswith("data:"):
- current_data = line[5:].strip()
- elif line == "":
- if current_data:
- yield current_event, current_data
- current_event = "message"
- current_data = ""
- if current_data:
- yield current_event, current_data
- async def _read_stream_error(response: httpx.Response) -> str:
- body = await response.aread()
- text = body.decode(errors="replace")
- try:
- data = json.loads(text)
- if isinstance(data, dict):
- detail = data.get("detail")
- if isinstance(detail, str):
- return detail
- except (ValueError, UnicodeDecodeError):
- pass
- return text or f"downstream error {response.status_code}"
|