import asyncio
import json
from datetime import datetime
from time import perf_counter
from typing import Annotated, AsyncIterator
from uuid import uuid4

from core_domain import ServiceDescriptor, ServiceHealth
import httpx
from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response
# StreamingResponse is not re-exported by the top-level ``fastapi`` package;
# importing it from there raises ImportError, so take it from its real module.
from fastapi.responses import StreamingResponse
from sqlalchemy import text
from sqlalchemy.orm import Session
from pydantic import BaseModel

from app.bootstrap.settings import ApiGatewaySettings
from app.db.session import get_db
from app.domain.repositories import (
    ApiKeyRepository,
    AppApiKeyRepository,
    AppDefinitionRepository,
    AppInvocationAuditRepository,
    GatewayRequestAuditRepository,
)
from app.infrastructure.api_keys import generate_api_key, get_api_key_prefix, hash_api_key
from app.infrastructure.proxy import ProxyServiceName, ProxyTarget, ServiceProxy
from core_shared import error_detail
from core_shared.security import build_internal_service_headers
from app.schemas.gateway import (
    ApiKeyCreateRequest,
    ApiKeyCreateResponse,
    ApiKeyListRequest,
    ApiKeyResponse,
    ApiKeyStatusPostRequest,
    ApiKeyStatusUpdateRequest,
    AppApiKeyCreateRequest,
    AppApiKeyCreateResponse,
    AppApiKeyListRequest,
    AppApiKeyResponse,
    AppApiKeyStatusUpdateRequest,
    AppAuditListRequest,
    AppCreateRequest,
    AppDetailRequest,
    AppInvocationAuditResponse,
    AppListRequest,
    AppResponse,
    AppStatusUpdateRequest,
    AppUpdateRequest,
    GatewayAuditServiceStats,
    GatewayAuditStatsResponse,
    GatewayRequestAuditResponse,
    GatewayServicesHealthResponse,
    OpenApiChatRequest,
    OpenApiChatResponse,
)

router = APIRouter()

# Request-scoped SQLAlchemy session injected through the ``get_db`` dependency.
DbSession = Annotated[Session, Depends(get_db)]


class SessionExecuteRequest(BaseModel):
    """Body of ``POST /gateway/sessions/execute``."""

    # Session whose configured runtime target should handle the message.
    session_id: str
    # User message text forwarded to the target as its input.
    message_text: str
    # When true the endpoint answers with a server-sent-event stream.
    stream: bool = False


class SessionExecuteResponse(BaseModel):
    """Result returned by the non-streaming session execution endpoint."""

    session_id: str
    run_request_id: str
    # "agent" or "team" (the only values the endpoint accepts).
    target_type: str
    target_id: str
    target_config_id: str | None = None
    # "completed" or "failed".
    request_status: str
    user_message_id: str
    # Set only when the run produced output text that was stored as a message.
    assistant_message_id: str | None = None
    # Exactly one of the two run ids is filled, depending on ``target_type``.
    agent_run_id: str | None = None
    team_run_id: str | None = None
    output_text: str | None = None
    error_message: str | None = None


@router.get("/health", response_model=ServiceDescriptor)
def health_check(db: DbSession) -> ServiceDescriptor:
    """Run ``SELECT 1`` against the database and return the service descriptor."""
    db.execute(text("SELECT 1"))
    return ServiceDescriptor(name="api-gateway")


@router.get("/ready", response_model=ServiceHealth)
def readiness_check(db: DbSession) -> ServiceHealth:
    """Run ``SELECT 1`` against the database and report the gateway as ready."""
    db.execute(text("SELECT 1"))
    return ServiceHealth(service="api-gateway", status="ok", database="ok")


@router.post("/gateway/api-keys", response_model=ApiKeyCreateResponse)
def create_api_key(payload: ApiKeyCreateRequest, db: DbSession) -> ApiKeyCreateResponse:
    """Create a gateway API key.

    Only the key's prefix and hash are handed to the repository; the plaintext
    key is included in this response.
    """
    api_key = generate_api_key()
    entity = ApiKeyRepository(db).create(
        name=payload.name,
        key_prefix=get_api_key_prefix(api_key),
        key_hash=hash_api_key(api_key),
        scopes=payload.scopes,
        expires_time=payload.expires_time,
    )
    return ApiKeyCreateResponse(
        id=entity.id,
        name=entity.name,
        key_prefix=entity.key_prefix,
        api_key=api_key,
        status=entity.status,
        scopes=entity.scopes,
        expires_time=entity.expires_time,
        created_time=entity.created_time,
    )


@router.get("/gateway/api-keys", response_model=list[ApiKeyResponse])
def list_api_keys(db: DbSession) -> list[ApiKeyResponse]:
    """Return every gateway API key known to the repository."""
    return [ApiKeyResponse.from_entity(item) for item in ApiKeyRepository(db).list_all()]


@router.post("/gateway/api-keys/list", response_model=list[ApiKeyResponse])
def list_api_keys_post(payload: ApiKeyListRequest, db: DbSession) -> list[ApiKeyResponse]:
    """POST variant of the API key listing.

    The request body is validated by FastAPI but not used for filtering.
    """
    return list_api_keys(db)


@router.patch("/gateway/api-keys/{api_key_id}/status", response_model=ApiKeyResponse)
def update_api_key_status(
    api_key_id: str,
    payload: ApiKeyStatusUpdateRequest,
    db: DbSession,
) -> ApiKeyResponse:
    """Update the status of the gateway API key named in the path.

    Responds with 404 when the repository reports no matching key.
    """
    entity = ApiKeyRepository(db).update_status(api_key_id=api_key_id, status=payload.status)
    if entity is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.api_key.not_found", id=api_key_id),
        )
    return ApiKeyResponse.from_entity(entity)
@router.post("/gateway/api-keys/status", response_model=ApiKeyResponse) def update_api_key_status_post( payload: ApiKeyStatusPostRequest, db: DbSession) -> ApiKeyResponse: entity = ApiKeyRepository(db).update_status( api_key_id=payload.api_key_id, status=payload.status) if entity is None: raise HTTPException(status_code=404, detail=error_detail("error.api_key.not_found", id=payload.api_key_id)) return ApiKeyResponse.from_entity(entity) @router.get("/gateway/audits", response_model=list[GatewayRequestAuditResponse]) def list_gateway_audits( db: DbSession, request_id: Annotated[str | None, Query()] = None, target_service: Annotated[str | None, Query()] = None, limit: Annotated[int, Query(ge=1, le=500)] = 100) -> list[GatewayRequestAuditResponse]: items = GatewayRequestAuditRepository(db).list_by_scope( request_id=request_id, target_service=target_service, limit=limit) return [GatewayRequestAuditResponse.from_entity(item) for item in items] @router.get("/gateway/audits/stats", response_model=GatewayAuditStatsResponse) def gateway_audit_stats( db: DbSession) -> GatewayAuditStatsResponse: rows = GatewayRequestAuditRepository(db).stats_by_service() services = [ GatewayAuditServiceStats( target_service=target_service, request_count=request_count, error_count=error_count, average_duration_ms=round(average_duration_ms, 2)) for target_service, request_count, error_count, average_duration_ms in rows ] return GatewayAuditStatsResponse( total_request_count=sum(item.request_count for item in services), total_error_count=sum(item.error_count for item in services), services=services) def get_gateway_settings() -> ApiGatewaySettings: return ApiGatewaySettings() def get_service_proxy( settings: Annotated[ApiGatewaySettings, Depends(get_gateway_settings)]) -> ServiceProxy: return ServiceProxy(settings=settings, timeout_seconds=settings.proxy_timeout_seconds) GatewaySettingsDep = Annotated[ApiGatewaySettings, Depends(get_gateway_settings)] ServiceProxyDep = Annotated[ServiceProxy, 
Depends(get_service_proxy)] def build_proxy_targets(settings: ApiGatewaySettings) -> dict[ProxyServiceName, ProxyTarget]: return { "session-service": ProxyTarget( service_name="session-service", base_url=settings.session_service_url, path_prefix="/sessions", health_path="/sessions/health"), "tool-service": ProxyTarget( service_name="tool-service", base_url=settings.tool_service_url, path_prefix="/tools", health_path="/tools/health"), "model-gateway-service": ProxyTarget( service_name="model-gateway-service", base_url=settings.model_gateway_service_url, path_prefix="/models", health_path="/models/health"), "model-provider-service": ProxyTarget( service_name="model-provider-service", base_url=settings.model_gateway_service_url, path_prefix="/models/providers", health_path="/models/health"), "code-runner-service": ProxyTarget( service_name="code-runner-service", base_url=settings.code_runner_service_url, path_prefix="/code", health_path="/code/health"), "agent-service": ProxyTarget( service_name="agent-service", base_url=settings.agent_service_url, path_prefix="/agents", health_path="/agents/health"), "memory-service": ProxyTarget( service_name="memory-service", base_url=settings.memory_service_url, path_prefix="/memories", health_path="/memories/health"), "team-service": ProxyTarget( service_name="team-service", base_url=settings.team_service_url, path_prefix="/teams", health_path="/teams/health"), "skill-service": ProxyTarget( service_name="skill-service", base_url=settings.skill_service_url, path_prefix="/skills", health_path="/skills/health"), "human-service": ProxyTarget( service_name="human-service", base_url=settings.human_service_url, path_prefix="/human", health_path="/human/health"), "knowledge-service": ProxyTarget( service_name="knowledge-service", base_url=settings.knowledge_service_url, path_prefix="/knowledge", health_path="/knowledge/health"), "event-service": ProxyTarget( service_name="event-service", base_url=settings.event_service_url, 
path_prefix="/events", health_path="/events/health"), "identity-service": ProxyTarget( service_name="identity-service", base_url=settings.auth_service_url, path_prefix="/identity", health_path="/identity/health"), "scheduler-service": ProxyTarget( service_name="scheduler-service", base_url=settings.scheduler_service_url, path_prefix="/scheduler", health_path="/scheduler/health"), } @router.get("/gateway/services/health", response_model=GatewayServicesHealthResponse) async def downstream_health_check( settings: GatewaySettingsDep) -> GatewayServicesHealthResponse: targets = build_proxy_targets(settings) health_proxy = ServiceProxy( settings=settings, timeout_seconds=settings.downstream_health_timeout_seconds) downstream_services = await asyncio.gather( *[health_proxy.check_health(target) for target in targets.values()] ) status = "ok" if all(item.status == "ok" for item in downstream_services) else "degraded" return GatewayServicesHealthResponse( status=status, downstream_services=downstream_services) @router.post("/gateway/sessions/execute") async def execute_session( payload: SessionExecuteRequest, request: Request, settings: GatewaySettingsDep): if payload.stream: return StreamingResponse( _stream_session_execute(payload, request, settings), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) targets = build_proxy_targets(settings) session_target = targets["session-service"] agent_target = targets["agent-service"] team_target = targets["team-service"] headers = _build_internal_headers(request, settings) async with httpx_client(settings.proxy_timeout_seconds) as client: session = await _post_json( client=client, target=session_target, path="detail", payload={"session_id": payload.session_id}, headers=headers) target_type = _get_string(session, "runtime_target_type") target_id = _get_string(session, "runtime_target_id") target_config_id = _get_optional_string(session, "runtime_target_config_id") if target_type not in 
{"agent", "team"} or not target_id: raise HTTPException(status_code=422, detail=error_detail("error.session.runtime_not_configured")) run_request_payload = { "target_type": target_type, "target_id": target_id, "target_config_id": target_config_id, "mode": "production", "input_text": payload.message_text, } run_request = await _post_json( client=client, target=session_target, path="run-requests", payload={ "session_id": payload.session_id, "app_config_id": target_config_id or target_id, "workflow_config_id": target_id, "trigger_type": "chat", "request_payload_json": run_request_payload, "request_status": "accepted", }, headers=headers) run_request_id = _get_string(run_request, "id") user_message = await _post_json( client=client, target=session_target, path="messages", payload={ "session_id": payload.session_id, "turn_id": run_request_id, "role": "user", "content_type": "text", "content_text": payload.message_text, "content_json": {}, }, headers=headers) user_message_id = _get_string(user_message, "id") await _post_json( client=client, target=session_target, path="run-requests/update", payload={ "run_request_id": run_request_id, "request_status": "running", "request_payload_json": { **run_request_payload, "user_message_id": user_message_id, }, }, headers=headers) assistant_message_id: str | None = None agent_run_id: str | None = None team_run_id: str | None = None output_text: str | None = None error_message: str | None = None request_status = "completed" try: if target_type == "agent": agent_run = await _post_json( client=client, target=agent_target, path="runs", payload={ "agent_id": target_id, "agent_config_id": target_config_id, "session_id": payload.session_id, "input_text": payload.message_text, "input_json": { "source": "session", "run_request_id": run_request_id, }, }, headers=headers) agent_run_id = _get_string(agent_run, "id") execute_result = await _post_json( client=client, target=agent_target, path="runs/execute", payload={ "agent_run_id": agent_run_id, 
"dry_run": False, }, headers=headers) run_data = _get_dict(execute_result, "run") output_text = _resolve_output_text(run_data) error_message = _get_optional_string(run_data, "error_message") else: team_run = await _post_json( client=client, target=team_target, path="runs", payload={ "team_id": target_id, "team_config_id": target_config_id, "session_id": payload.session_id, "input_text": payload.message_text, "input_json": { "source": "session", "run_request_id": run_request_id, }, "enqueue": True, }, headers=headers) team_run_id = _get_string(team_run, "id") execute_result = await _post_json( client=client, target=team_target, path=f"runs/{team_run_id}/execute", payload={ "dry_run": False, }, headers=headers) run_data = _get_dict(execute_result, "run") output_text = _resolve_output_text(run_data) error_message = _get_optional_string(run_data, "error_message") if error_message: request_status = "failed" if output_text: assistant_message = await _post_json( client=client, target=session_target, path="messages", payload={ "session_id": payload.session_id, "turn_id": run_request_id, "role": "assistant", "content_type": "text", "content_text": output_text, "content_json": {}, }, headers=headers) assistant_message_id = _get_string(assistant_message, "id") except HTTPException as exc: request_status = "failed" error_message = exc.detail if isinstance(exc.detail, str) else json.dumps(exc.detail, ensure_ascii=False) await _post_json( client=client, target=session_target, path="run-requests/update", payload={ "run_request_id": run_request_id, "request_status": request_status, "request_payload_json": { **run_request_payload, "user_message_id": user_message_id, "assistant_message_id": assistant_message_id, "agent_run_id": agent_run_id, "team_run_id": team_run_id, "output_text": output_text, "error_message": error_message, }, }, headers=headers) return SessionExecuteResponse( session_id=payload.session_id, run_request_id=run_request_id, target_type=target_type, 
target_id=target_id, target_config_id=target_config_id, request_status=request_status, user_message_id=user_message_id, assistant_message_id=assistant_message_id, agent_run_id=agent_run_id, team_run_id=team_run_id, output_text=output_text, error_message=error_message) async def _stream_session_execute( payload: SessionExecuteRequest, request: Request, settings: ApiGatewaySettings): targets = build_proxy_targets(settings) session_target = targets["session-service"] agent_target = targets["agent-service"] team_target = targets["team-service"] headers = _build_internal_headers(request, settings) client = httpx.AsyncClient(timeout=settings.proxy_timeout_seconds) try: session = await _post_json( client=client, target=session_target, path="detail", payload={"session_id": payload.session_id}, headers=headers) target_type = _get_string(session, "runtime_target_type") target_id = _get_string(session, "runtime_target_id") target_config_id = _get_optional_string(session, "runtime_target_config_id") if target_type not in {"agent", "team"} or not target_id: raise HTTPException(status_code=422, detail=error_detail("error.session.runtime_not_configured")) run_request_payload = { "target_type": target_type, "target_id": target_id, "target_config_id": target_config_id, "mode": "production", "input_text": payload.message_text, } run_request = await _post_json( client=client, target=session_target, path="run-requests", payload={ "session_id": payload.session_id, "app_config_id": target_config_id or target_id, "workflow_config_id": target_id, "trigger_type": "chat", "request_payload_json": run_request_payload, "request_status": "accepted", }, headers=headers) run_request_id = _get_string(run_request, "id") user_message = await _post_json( client=client, target=session_target, path="messages", payload={ "session_id": payload.session_id, "turn_id": run_request_id, "role": "user", "content_type": "text", "content_text": payload.message_text, "content_json": {}, }, headers=headers) 
user_message_id = _get_string(user_message, "id") await _post_json( client=client, target=session_target, path="run-requests/update", payload={ "run_request_id": run_request_id, "request_status": "running", "request_payload_json": {**run_request_payload, "user_message_id": user_message_id}, }, headers=headers) yield _sse("session.execute.started", { "run_request_id": run_request_id, "user_message_id": user_message_id, "target_type": target_type, "target_id": target_id, }) output_text = "" error_message: str | None = None agent_run_id: str | None = None team_run_id: str | None = None if target_type == "agent": agent_run = await _post_json( client=client, target=agent_target, path="runs", payload={ "agent_id": target_id, "agent_config_id": target_config_id, "session_id": payload.session_id, "input_text": payload.message_text, "input_json": {"source": "session", "run_request_id": run_request_id}, }, headers=headers) agent_run_id = _get_string(agent_run, "id") stream_url = _target_url(agent_target, f"runs/{agent_run_id}/execute-stream") async with client.stream("POST", stream_url, headers=headers, json={"dry_run": False}) as resp: if not resp.is_success: error_message = await _read_stream_error(resp) else: async for ev_name, ev_data in _parse_sse(resp): data = json.loads(ev_data) yield _sse(ev_name, data) if ev_name == "agent.run.delta" and isinstance(data.get("text"), str): output_text += data["text"] elif ev_name == "agent.run.completed": run_data = data.get("run", data) if not output_text and isinstance(run_data.get("output_text"), str): output_text = run_data["output_text"] elif ev_name == "agent.run.failed": error_message = data.get("error_message", "Agent execution failed") if not isinstance(error_message, str): error_message = "Agent execution failed" else: team_run = await _post_json( client=client, target=team_target, path="runs", payload={ "team_id": target_id, "team_config_id": target_config_id, "session_id": payload.session_id, "input_text": 
payload.message_text, "input_json": {"source": "session", "run_request_id": run_request_id}, "enqueue": True, }, headers=headers) team_run_id = _get_string(team_run, "id") stream_url = _target_url(team_target, f"runs/{team_run_id}/execute-stream") async with client.stream("POST", stream_url, headers=headers, json={"dry_run": False}) as resp: if not resp.is_success: error_message = await _read_stream_error(resp) else: async for ev_name, ev_data in _parse_sse(resp): data = json.loads(ev_data) yield _sse(ev_name, data) if ev_name == "team.run.delta" and isinstance(data.get("text"), str): output_text += data["text"] elif ev_name == "team.run.completed": run_data = data.get("run", data) if not output_text and isinstance(run_data.get("output_text"), str): output_text = run_data["output_text"] elif ev_name == "team.run.failed": error_message = data.get("error_message", "Team execution failed") if not isinstance(error_message, str): error_message = "Team execution failed" request_status = "failed" if error_message else "completed" assistant_message_id: str | None = None if output_text: assistant_message = await _post_json( client=client, target=session_target, path="messages", payload={ "session_id": payload.session_id, "turn_id": run_request_id, "role": "assistant", "content_type": "text", "content_text": output_text, "content_json": {}, }, headers=headers) assistant_message_id = _get_string(assistant_message, "id") await _post_json( client=client, target=session_target, path="run-requests/update", payload={ "run_request_id": run_request_id, "request_status": request_status, "request_payload_json": { **run_request_payload, "user_message_id": user_message_id, "assistant_message_id": assistant_message_id, "agent_run_id": agent_run_id, "team_run_id": team_run_id, "output_text": output_text, "error_message": error_message, }, }, headers=headers) yield _sse("session.execute.completed", { "run_request_id": run_request_id, "request_status": request_status, 
"assistant_message_id": assistant_message_id, "output_text": output_text, "error_message": error_message, }) except HTTPException as exc: detail = exc.detail if isinstance(exc.detail, str) else json.dumps(exc.detail, ensure_ascii=False) yield _sse("session.execute.failed", {"error_message": detail}) except Exception as exc: yield _sse("session.execute.failed", {"error_message": str(exc)}) finally: await client.aclose() # ── Application Admin Routes ───────────────────────────────────────────────── @router.post("/gateway/apps", response_model=AppResponse) def create_app(payload: AppCreateRequest, db: DbSession) -> AppResponse: existing = AppDefinitionRepository(db).get_by_code(code=payload.code) if existing is not None: raise HTTPException(status_code=409, detail=error_detail("error.app.code_exists", code=payload.code)) entity = AppDefinitionRepository(db).create( code=payload.code, name=payload.name, description=payload.description, target_type=payload.target_type, target_id=payload.target_id, owner_user_id=payload.owner_user_id, settings_json=payload.settings_json) return AppResponse.from_entity(entity) @router.post("/gateway/apps/list", response_model=list[AppResponse]) def list_apps(payload: AppListRequest, db: DbSession) -> list[AppResponse]: return [AppResponse.from_entity(e) for e in AppDefinitionRepository(db).list_all()] @router.post("/gateway/apps/detail", response_model=AppResponse) def get_app_detail(payload: AppDetailRequest, db: DbSession) -> AppResponse: entity = AppDefinitionRepository(db).get_by_id(app_id=payload.app_id) if entity is None: raise HTTPException(status_code=404, detail=error_detail("error.app.not_found", id=payload.app_id)) return AppResponse.from_entity(entity) @router.post("/gateway/apps/update", response_model=AppResponse) def update_app(payload: AppUpdateRequest, db: DbSession) -> AppResponse: entity = AppDefinitionRepository(db).update( app_id=payload.app_id, name=payload.name, description=payload.description, 
target_type=payload.target_type, target_id=payload.target_id, settings_json=payload.settings_json) if entity is None: raise HTTPException(status_code=404, detail=error_detail("error.app.not_found", id=payload.app_id)) return AppResponse.from_entity(entity) @router.post("/gateway/apps/status", response_model=AppResponse) def update_app_status(payload: AppStatusUpdateRequest, db: DbSession) -> AppResponse: entity = AppDefinitionRepository(db).update_status(app_id=payload.app_id, status=payload.status) if entity is None: raise HTTPException(status_code=404, detail=error_detail("error.app.not_found", id=payload.app_id)) return AppResponse.from_entity(entity) @router.post("/gateway/apps/{app_id}/api-keys", response_model=AppApiKeyCreateResponse) def create_app_api_key(app_id: str, payload: AppApiKeyCreateRequest, db: DbSession) -> AppApiKeyCreateResponse: app_entity = AppDefinitionRepository(db).get_by_id(app_id=app_id) if app_entity is None: raise HTTPException(status_code=404, detail=error_detail("error.app.not_found", id=app_id)) api_key = generate_api_key() entity = AppApiKeyRepository(db).create( app_id=app_id, name=payload.name, key_prefix=get_api_key_prefix(api_key), key_hash=hash_api_key(api_key), scopes=payload.scopes, expires_time=payload.expires_time) return AppApiKeyCreateResponse( id=entity.id, app_id=entity.app_id, name=entity.name, key_prefix=entity.key_prefix, api_key=api_key, status=entity.status, scopes=entity.scopes, expires_time=entity.expires_time, created_time=entity.created_time) @router.post("/gateway/apps/{app_id}/api-keys/list", response_model=list[AppApiKeyResponse]) def list_app_api_keys(app_id: str, payload: AppApiKeyListRequest, db: DbSession) -> list[AppApiKeyResponse]: return [AppApiKeyResponse.from_entity(e) for e in AppApiKeyRepository(db).list_by_app(app_id=app_id)] @router.post("/gateway/apps/{app_id}/api-keys/status", response_model=AppApiKeyResponse) def update_app_api_key_status(app_id: str, payload: AppApiKeyStatusUpdateRequest, 
db: DbSession) -> AppApiKeyResponse: entity = AppApiKeyRepository(db).update_status(api_key_id=payload.api_key_id, status=payload.status) if entity is None: raise HTTPException(status_code=404, detail=error_detail("error.api_key.not_found", id=payload.api_key_id)) return AppApiKeyResponse.from_entity(entity) @router.post("/gateway/apps/{app_id}/audits", response_model=list[AppInvocationAuditResponse]) def list_app_audits(app_id: str, payload: AppAuditListRequest, db: DbSession) -> list[AppInvocationAuditResponse]: return [ AppInvocationAuditResponse.from_entity(e) for e in AppInvocationAuditRepository(db).list_by_app(app_id=app_id, limit=payload.limit) ] # ── OpenAPI External Invocation ────────────────────────────────────────────── def _authenticate_app_api_key(request: Request, db: Session): settings = ApiGatewaySettings() token: str | None = None authorization = request.headers.get("authorization") if authorization: scheme, _, t = authorization.partition(" ") if scheme.lower() == "bearer" and t.strip(): token = t.strip() if token is None: token = request.headers.get(settings.api_key_header_name) if not token: raise HTTPException(status_code=401, detail=error_detail("error.auth.missing_token")) key_hash = hash_api_key(token) key_entity = AppApiKeyRepository(db).get_active_by_hash(key_hash=key_hash) if key_entity is None: raise HTTPException(status_code=401, detail=error_detail("error.api_key.invalid")) if key_entity.expires_time is not None and key_entity.expires_time <= datetime.utcnow(): raise HTTPException(status_code=401, detail=error_detail("error.api_key.expired")) app_entity = AppDefinitionRepository(db).get_by_id(app_id=key_entity.app_id) if app_entity is None: raise HTTPException(status_code=401, detail=error_detail("error.app.not_found")) if app_entity.status != "published": raise HTTPException(status_code=403, detail=error_detail("error.app.not_published", status=app_entity.status)) AppApiKeyRepository(db).touch_last_used_time(api_key_id=key_entity.id) 
return key_entity, app_entity @router.post("/gateway/openapi/apps/{app_code}/chat", response_model=OpenApiChatResponse) async def openapi_chat(app_code: str, payload: OpenApiChatRequest, request: Request, db: DbSession): start = perf_counter() request_id = str(uuid4()) key_entity, app_entity = _authenticate_app_api_key(request, db) if app_entity.code != app_code: raise HTTPException(status_code=403, detail=error_detail("error.api_key.unauthorized")) targets = build_proxy_targets(ApiGatewaySettings()) session_target = targets["session-service"] agent_target = targets["agent-service"] team_target = targets["team-service"] headers = _build_internal_headers(request, ApiGatewaySettings()) async with httpx_client(ApiGatewaySettings().proxy_timeout_seconds) as client: session_id = payload.session_id if not session_id: session_data = await _post_json( client=client, target=session_target, path="", payload={ "app_id": app_entity.id, "user_id": payload.user_id or "openapi", "channel_type": "openapi", "runtime_target_type": app_entity.target_type, "runtime_target_id": app_entity.target_id, }, headers=headers) session_id = _get_string(session_data, "id") run_request_payload = { "target_type": app_entity.target_type, "target_id": app_entity.target_id, "mode": "production", "input_text": payload.message, } run_request = await _post_json( client=client, target=session_target, path="run-requests", payload={ "session_id": session_id, "app_config_id": app_entity.target_id, "workflow_config_id": app_entity.target_id, "trigger_type": "chat", "request_payload_json": run_request_payload, "request_status": "accepted", }, headers=headers) run_request_id = _get_string(run_request, "id") user_message = await _post_json( client=client, target=session_target, path="messages", payload={ "session_id": session_id, "turn_id": run_request_id, "role": "user", "content_type": "text", "content_text": payload.message, "content_json": {}, }, headers=headers) await _post_json( client=client, 
target=session_target, path="run-requests/update", payload={ "run_request_id": run_request_id, "request_status": "running", "request_payload_json": {**run_request_payload, "user_message_id": _get_string(user_message, "id")}, }, headers=headers) output_text: str | None = None error_message: str | None = None request_status = "completed" try: if app_entity.target_type == "agent": agent_run = await _post_json( client=client, target=agent_target, path="runs", payload={ "agent_id": app_entity.target_id, "session_id": session_id, "input_text": payload.message, "input_json": {"source": "openapi", "run_request_id": run_request_id}, }, headers=headers) agent_run_id = _get_string(agent_run, "id") execute_result = await _post_json( client=client, target=agent_target, path="runs/execute", payload={"agent_run_id": agent_run_id, "dry_run": False}, headers=headers) run_data = _get_dict(execute_result, "run") output_text = _resolve_output_text(run_data) error_message = _get_optional_string(run_data, "error_message") else: team_run = await _post_json( client=client, target=team_target, path="runs", payload={ "team_id": app_entity.target_id, "session_id": session_id, "input_text": payload.message, "input_json": {"source": "openapi", "run_request_id": run_request_id}, "enqueue": True, }, headers=headers) team_run_id = _get_string(team_run, "id") execute_result = await _post_json( client=client, target=team_target, path=f"runs/{team_run_id}/execute", payload={"dry_run": False}, headers=headers) run_data = _get_dict(execute_result, "run") output_text = _resolve_output_text(run_data) error_message = _get_optional_string(run_data, "error_message") if error_message: request_status = "failed" if output_text: await _post_json( client=client, target=session_target, path="messages", payload={ "session_id": session_id, "turn_id": run_request_id, "role": "assistant", "content_type": "text", "content_text": output_text, "content_json": {}, }, headers=headers) except HTTPException as exc: 
request_status = "failed" error_message = exc.detail if isinstance(exc.detail, str) else json.dumps(exc.detail, ensure_ascii=False) await _post_json( client=client, target=session_target, path="run-requests/update", payload={ "run_request_id": run_request_id, "request_status": request_status, "request_payload_json": { **run_request_payload, "user_message_id": _get_string(user_message, "id"), "output_text": output_text, "error_message": error_message, }, }, headers=headers) duration_ms = int((perf_counter() - start) * 1000) AppInvocationAuditRepository(db).create( app_id=app_entity.id, api_key_prefix=key_entity.key_prefix, request_id=request_id, session_id=session_id, run_request_id=run_request_id, target_type=app_entity.target_type, target_id=app_entity.target_id, invoke_type="sync", status=request_status, duration_ms=duration_ms, error_message=error_message, client_metadata_json=json.dumps(payload.metadata) if payload.metadata else None) return OpenApiChatResponse( request_id=request_id, app_code=app_entity.code, session_id=session_id, run_request_id=run_request_id, target_type=app_entity.target_type, target_id=app_entity.target_id, status=request_status, output_text=output_text, error=error_message) @router.post("/gateway/openapi/apps/{app_code}/chat/stream") async def openapi_chat_stream(app_code: str, payload: OpenApiChatRequest, request: Request): settings = ApiGatewaySettings() session_factory = request.app.state.session_factory auth_db = session_factory() try: key_entity, app_entity = _authenticate_app_api_key(request, auth_db) except HTTPException as exc: auth_db.close() detail = exc.detail if isinstance(exc.detail, str) else json.dumps(exc.detail, ensure_ascii=False) return StreamingResponse( _single_sse("failed", {"status": "failed", "error_code": "auth_error", "error_message": detail}), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) finally: auth_db.close() if app_entity.code != app_code: return 
def _single_sse(event: str, data: dict) -> AsyncIterator[str]:
    """Return an async iterator that yields exactly one SSE frame.

    Args:
        event: SSE event name.
        data: Payload serialised into the frame by ``_sse``.
    """

    async def _one_frame() -> AsyncIterator[str]:
        yield _sse(event, data)

    return _one_frame()


async def _stream_openapi_chat(
    app_code: str,
    payload: OpenApiChatRequest,
    request: Request,
    key_entity,
    app_entity,
    session_factory,
    settings: ApiGatewaySettings,
) -> AsyncIterator[str]:
    """Run one streamed chat turn against an agent and yield SSE frames.

    Flow, as implemented below:
      1. Reuse ``payload.session_id`` or create a session in session-service.
      2. Register a run request and the user message, then mark the run
         request ``running`` and emit ``started``.
      3. Create an agent run and relay its ``execute-stream`` events as
         ``delta`` / ``completed`` / ``failed`` (other events pass through).
      4. Persist the assistant message (when there is output) and the final
         run-request status.
      5. Always close the HTTP client and write an invocation audit record.

    Args:
        app_code: App code from the URL (not used inside the generator).
        payload: Chat request body.
        request: Incoming request; its headers feed the internal headers.
        key_entity: Authenticated API key entity (``key_prefix`` is audited).
        app_entity: App entity providing ``id``, ``target_type``, ``target_id``.
        session_factory: Callable returning a DB session for the audit write.
        settings: Gateway settings (proxy targets, timeout).

    Yields:
        SSE frames as strings.
    """
    import logging  # function-scope import: the module header is not edited here

    logger = logging.getLogger(__name__)

    start = perf_counter()
    request_id = str(uuid4())
    targets = build_proxy_targets(settings)
    session_target = targets["session-service"]
    agent_target = targets["agent-service"]
    headers = _build_internal_headers(request, settings)
    client = httpx.AsyncClient(timeout=settings.proxy_timeout_seconds)

    output_text = ""
    error_message: str | None = None
    session_id: str | None = None
    run_request_id: str | None = None
    # Pessimistic default: only a fully successful pass flips this to "completed".
    request_status = "failed"

    try:
        session_id = payload.session_id
        if not session_id:
            session_data = await _post_json(
                client=client,
                target=session_target,
                path="",
                payload={
                    "app_id": app_entity.id,
                    "user_id": payload.user_id or "openapi",
                    "channel_type": "openapi",
                    "runtime_target_type": app_entity.target_type,
                    "runtime_target_id": app_entity.target_id,
                },
                headers=headers,
            )
            session_id = _get_string(session_data, "id")

        run_request_payload = {
            "target_type": app_entity.target_type,
            "target_id": app_entity.target_id,
            "mode": "production",
            "input_text": payload.message,
        }
        run_request = await _post_json(
            client=client,
            target=session_target,
            path="run-requests",
            payload={
                "session_id": session_id,
                "app_config_id": app_entity.target_id,
                "workflow_config_id": app_entity.target_id,
                "trigger_type": "chat",
                "request_payload_json": run_request_payload,
                "request_status": "accepted",
            },
            headers=headers,
        )
        run_request_id = _get_string(run_request, "id")

        user_message = await _post_json(
            client=client,
            target=session_target,
            path="messages",
            payload={
                "session_id": session_id,
                "turn_id": run_request_id,
                "role": "user",
                "content_type": "text",
                "content_text": payload.message,
                "content_json": {},
            },
            headers=headers,
        )
        user_message_id = _get_string(user_message, "id")

        await _post_json(
            client=client,
            target=session_target,
            path="run-requests/update",
            payload={
                "run_request_id": run_request_id,
                "request_status": "running",
                "request_payload_json": {
                    **run_request_payload,
                    "user_message_id": user_message_id,
                },
            },
            headers=headers,
        )

        yield _sse(
            "started",
            {
                "request_id": request_id,
                "session_id": session_id,
                "run_request_id": run_request_id,
            },
        )

        agent_run = await _post_json(
            client=client,
            target=agent_target,
            path="runs",
            payload={
                "agent_id": app_entity.target_id,
                "session_id": session_id,
                "input_text": payload.message,
                "input_json": {"source": "openapi", "run_request_id": run_request_id},
            },
            headers=headers,
        )
        agent_run_id = _get_string(agent_run, "id")

        stream_url = _target_url(agent_target, f"runs/{agent_run_id}/execute-stream")
        async with client.stream(
            "POST", stream_url, headers=headers, json={"dry_run": False}
        ) as resp:
            if not resp.is_success:
                error_message = await _read_stream_error(resp)
                # Previously the client saw "started" and then a silent end of
                # stream; always send a terminal event for this failure.
                yield _sse(
                    "failed",
                    {
                        "status": "failed",
                        "error_code": "agent_error",
                        "error_message": error_message,
                    },
                )
            else:
                async for ev_name, ev_data in _parse_sse(resp):
                    data = json.loads(ev_data)
                    if ev_name == "agent.run.delta":
                        text_chunk = data.get("text", "")
                        yield _sse("delta", {"text": text_chunk})
                        if isinstance(text_chunk, str):
                            output_text += text_chunk
                    elif ev_name == "agent.run.completed":
                        run_data = data.get("run", data)
                        final_text = _get_optional_string(run_data, "output_text")
                        # Deltas win; the final text is only a fallback.
                        if not output_text and final_text:
                            output_text = final_text
                        yield _sse(
                            "completed",
                            {"status": "completed", "output_text": output_text},
                        )
                    elif ev_name == "agent.run.failed":
                        msg = data.get("error_message", "Agent execution failed")
                        if not isinstance(msg, str):
                            msg = "Agent execution failed"
                        error_message = msg
                        yield _sse(
                            "failed",
                            {
                                "status": "failed",
                                "error_code": "agent_error",
                                "error_message": msg,
                            },
                        )
                    else:
                        # Unknown agent events are relayed unchanged.
                        yield _sse(ev_name, data)

        request_status = "failed" if error_message else "completed"

        if output_text:
            await _post_json(
                client=client,
                target=session_target,
                path="messages",
                payload={
                    "session_id": session_id,
                    "turn_id": run_request_id,
                    "role": "assistant",
                    "content_type": "text",
                    "content_text": output_text,
                    "content_json": {},
                },
                headers=headers,
            )

        await _post_json(
            client=client,
            target=session_target,
            path="run-requests/update",
            payload={
                "run_request_id": run_request_id,
                "request_status": request_status,
                "request_payload_json": {
                    **run_request_payload,
                    "user_message_id": user_message_id,
                    "output_text": output_text,
                    "error_message": error_message,
                },
            },
            headers=headers,
        )
    except HTTPException as exc:
        detail = (
            exc.detail
            if isinstance(exc.detail, str)
            else json.dumps(exc.detail, ensure_ascii=False)
        )
        # Record the failure before yielding so the audit below never reports
        # "completed" for a turn whose client received a "failed" event.
        request_status = "failed"
        error_message = error_message or detail
        # NOTE(review): the run request is not updated on this path, so it may
        # stay "running" in session-service — confirm whether that is intended.
        yield _sse(
            "failed",
            {"status": "failed", "error_code": "gateway_error", "error_message": detail},
        )
    except Exception as exc:
        request_status = "failed"
        error_message = error_message or str(exc)
        yield _sse(
            "failed",
            {"status": "failed", "error_code": "internal_error", "error_message": str(exc)},
        )
    finally:
        await client.aclose()
        duration_ms = int((perf_counter() - start) * 1000)
        audit_db = session_factory()
        try:
            AppInvocationAuditRepository(audit_db).create(
                app_id=app_entity.id,
                api_key_prefix=key_entity.key_prefix,
                request_id=request_id,
                session_id=session_id,
                run_request_id=run_request_id,
                target_type=app_entity.target_type,
                target_id=app_entity.target_id,
                invoke_type="stream",
                status=request_status,
                duration_ms=duration_ms,
                error_message=error_message,
                client_metadata_json=json.dumps(payload.metadata) if payload.metadata else None,
            )
        except Exception:
            # Auditing stays best-effort, but a failed write is now logged
            # instead of disappearing silently.
            logger.exception(
                "failed to write app invocation audit (request_id=%s)", request_id
            )
        finally:
            audit_db.close()
# --- Service proxy routes -------------------------------------------------
# Each handler below is registered for a bare prefix and for any sub-path
# under it, looks up its target in ``build_proxy_targets(settings)`` and hands
# the request to ``proxy.forward``. Comments are used instead of docstrings so
# the generated OpenAPI descriptions stay unchanged.


# /gateway/sessions -> "session-service"
@router.api_route(
    "/gateway/sessions",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/sessions/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_session_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["session-service"]
    return await proxy.forward(request=request, target=upstream, path=path)


# /gateway/agents -> "agent-service"
@router.api_route(
    "/gateway/agents",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/agents/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_agent_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["agent-service"]
    return await proxy.forward(request=request, target=upstream, path=path)


# /gateway/memories -> "memory-service"
@router.api_route(
    "/gateway/memories",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/memories/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_memory_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["memory-service"]
    return await proxy.forward(request=request, target=upstream, path=path)


# /gateway/teams -> "team-service"
@router.api_route(
    "/gateway/teams",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/teams/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_team_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["team-service"]
    return await proxy.forward(request=request, target=upstream, path=path)


# /gateway/skills -> "skill-service"
@router.api_route(
    "/gateway/skills",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/skills/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_skill_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["skill-service"]
    return await proxy.forward(request=request, target=upstream, path=path)


# /gateway/human -> "human-service"
@router.api_route(
    "/gateway/human",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/human/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_human_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["human-service"]
    return await proxy.forward(request=request, target=upstream, path=path)


# /gateway/knowledge -> "knowledge-service"
@router.api_route(
    "/gateway/knowledge",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/knowledge/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_knowledge_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["knowledge-service"]
    return await proxy.forward(request=request, target=upstream, path=path)


# /gateway/events -> "event-service"
@router.api_route(
    "/gateway/events",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/events/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_event_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["event-service"]
    return await proxy.forward(request=request, target=upstream, path=path)


# /gateway/identity -> "identity-service" (registered for POST only)
@router.api_route(
    "/gateway/identity",
    methods=["POST"],
)
@router.api_route(
    "/gateway/identity/{path:path}",
    methods=["POST"],
)
async def proxy_identity_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["identity-service"]
    return await proxy.forward(request=request, target=upstream, path=path)
# Proxy routes: each handler is registered for a bare prefix and for any
# sub-path under it, resolves its target from ``build_proxy_targets(settings)``
# and passes the request to ``proxy.forward``. Comments are used instead of
# docstrings so the generated OpenAPI descriptions stay unchanged.


# /gateway/scheduler -> "scheduler-service"
@router.api_route(
    "/gateway/scheduler",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/scheduler/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_scheduler_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["scheduler-service"]
    return await proxy.forward(request=request, target=upstream, path=path)


# /gateway/tools -> "tool-service"
@router.api_route(
    "/gateway/tools",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/tools/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_tool_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["tool-service"]
    return await proxy.forward(request=request, target=upstream, path=path)


# /gateway/models -> "model-gateway-service"
@router.api_route(
    "/gateway/models",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/models/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_model_gateway_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["model-gateway-service"]
    return await proxy.forward(request=request, target=upstream, path=path)


# /gateway/model-providers -> "model-provider-service" (registered for POST only)
@router.api_route(
    "/gateway/model-providers",
    methods=["POST"],
)
@router.api_route(
    "/gateway/model-providers/{path:path}",
    methods=["POST"],
)
async def proxy_model_provider_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["model-provider-service"]
    return await proxy.forward(request=request, target=upstream, path=path)


# /gateway/code -> "code-runner-service"
@router.api_route(
    "/gateway/code",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
@router.api_route(
    "/gateway/code/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy_code_runner_service(
    request: Request,
    settings: GatewaySettingsDep,
    proxy: ServiceProxyDep,
    path: str = "",
) -> Response:
    upstream = build_proxy_targets(settings)["code-runner-service"]
    return await proxy.forward(request=request, target=upstream, path=path)
def _build_internal_headers(request: Request, settings: ApiGatewaySettings) -> dict[str, str]:
    """Build the headers sent on gateway-originated downstream calls.

    Starts from ``build_internal_service_headers(settings)`` and copies the
    caller's ``authorization`` and ``x-user-id`` headers when they are present
    and non-empty.
    """
    headers = build_internal_service_headers(settings)
    for name in ("authorization", "x-user-id"):
        value = request.headers.get(name)
        if value:
            headers[name] = value
    return headers


def httpx_client(timeout_seconds: float) -> httpx.AsyncClient:
    """Create an ``httpx.AsyncClient`` with the given timeout (caller closes it)."""
    return httpx.AsyncClient(timeout=timeout_seconds)


async def _post_json(
    *,
    client: httpx.AsyncClient,
    target: ProxyTarget,
    path: str,
    payload: dict[str, object],
    headers: dict[str, str],
) -> dict[str, object]:
    """POST ``payload`` as JSON to a downstream service and return its JSON object.

    Args:
        client: Async HTTP client used for the call.
        target: Downstream service description (base URL, prefix, name).
        path: Path below the target's prefix; may be empty.
        payload: JSON body.
        headers: Request headers.

    Returns:
        The decoded response body, guaranteed to be a ``dict``.

    Raises:
        HTTPException: 502 when the request fails at transport level or the
            reply is not a JSON object; the downstream status code when the
            reply is not a success.
    """
    url = _target_url(target, path)
    try:
        response = await client.post(url, headers=headers, json=payload)
    except httpx.HTTPError as exc:
        raise HTTPException(
            status_code=502,
            detail=error_detail(
                "error.downstream.request_failed",
                service=target.service_name,
                error=str(exc),
            ),
        ) from exc

    if not response.is_success:
        raise HTTPException(
            status_code=response.status_code,
            detail=_error_detail(response),
        )

    try:
        data = response.json()
    except ValueError as exc:
        # A 2xx reply with an empty or non-JSON body used to escape as an
        # unhandled ValueError; report it like any other malformed reply.
        raise HTTPException(
            status_code=502,
            detail=error_detail(
                "error.downstream.unexpected_response",
                service=target.service_name,
            ),
        ) from exc

    if not isinstance(data, dict):
        raise HTTPException(
            status_code=502,
            detail=error_detail(
                "error.downstream.unexpected_response",
                service=target.service_name,
            ),
        )
    return data


def _target_url(target: ProxyTarget, path: str) -> str:
    """Join the target's base URL, its path prefix and ``path``.

    Trailing slashes on the base URL and slashes around ``path`` are removed;
    an empty ``path`` yields the prefix URL itself.
    """
    prefix_url = f"{target.base_url.rstrip('/')}{target.path_prefix}"
    suffix = path.strip("/")
    return f"{prefix_url}/{suffix}" if suffix else prefix_url


def _error_detail(response: httpx.Response):
    """Extract an error detail from a failed downstream response.

    Preference order: a string ``detail`` field, then a string
    ``error.message`` field, then a generic failure detail carrying the
    status code (also used when the body is not JSON).
    """

    def _generic():
        return error_detail(
            "error.downstream.generic_failure",
            status=str(response.status_code),
        )

    try:
        body = response.json()
    except ValueError:
        return _generic()

    if isinstance(body, dict):
        detail = body.get("detail")
        if isinstance(detail, str):
            return detail
        error = body.get("error")
        if isinstance(error, dict):
            message = error.get("message")
            if isinstance(message, str):
                return message
    return _generic()
def _get_string(payload: dict[str, object], key: str) -> str:
    """Return ``payload[key]`` when it is a non-empty string.

    Raises:
        HTTPException: 502 (``error.downstream.missing_field``) otherwise.
    """
    value = payload.get(key)
    if isinstance(value, str) and value:
        return value
    raise HTTPException(
        status_code=502,
        detail=error_detail("error.downstream.missing_field", field=key),
    )


def _get_optional_string(payload: dict[str, object], key: str) -> str | None:
    """Return ``payload[key]`` when it is a non-empty string, else ``None``."""
    value = payload.get(key)
    if isinstance(value, str) and value:
        return value
    return None


def _get_dict(payload: dict[str, object], key: str) -> dict[str, object]:
    """Return ``payload[key]`` when it is a dict.

    Raises:
        HTTPException: 502 (``error.downstream.missing_field``) otherwise.
    """
    value = payload.get(key)
    if isinstance(value, dict):
        return value
    raise HTTPException(
        status_code=502,
        detail=error_detail("error.downstream.missing_field", field=key),
    )


def _resolve_output_text(run_payload: dict[str, object]) -> str | None:
    """Pick the textual output of a run payload.

    Prefers a non-empty ``output_text``; otherwise serialises a non-empty
    ``output_json`` dict; returns ``None`` when neither is available.
    """
    text_value = _get_optional_string(run_payload, "output_text")
    if text_value:
        return text_value
    structured = run_payload.get("output_json")
    if isinstance(structured, dict) and structured:
        return json.dumps(structured, ensure_ascii=False)
    return None


def _sse(event: str, data: dict) -> str:
    """Format one SSE frame: an ``event:`` line, a JSON ``data:`` line, a blank line."""
    body = json.dumps(data, ensure_ascii=False)
    return f"event: {event}\ndata: {body}\n\n"


async def _parse_sse(response: httpx.Response) -> AsyncIterator[tuple[str, str]]:
    """Yield ``(event_name, data)`` pairs parsed from a streamed SSE response.

    The event name defaults to ``"message"``. Several ``data:`` lines that
    belong to one event are joined with ``"\\n"`` (they used to overwrite each
    other, which truncated multi-line payloads). Events without any non-empty
    data are skipped; lines that are neither ``event:``, ``data:`` nor blank
    are ignored.
    """
    event_name = "message"
    data_lines: list[str] = []
    async for line in response.aiter_lines():
        if line.startswith("event:"):
            event_name = line[6:].strip()
        elif line.startswith("data:"):
            data_lines.append(line[5:].strip())
        elif line == "":
            # Blank line ends the current event.
            if any(data_lines):
                yield event_name, "\n".join(data_lines)
            event_name = "message"
            data_lines = []
    # Flush an event whose terminating blank line never arrived.
    if any(data_lines):
        yield event_name, "\n".join(data_lines)


async def _read_stream_error(response: httpx.Response) -> str:
    """Read the body of a failed streaming response and derive an error message.

    Returns the string ``detail`` field of a JSON-object body when present,
    otherwise the raw body text, otherwise ``"downstream error <status>"``.
    """
    raw = await response.aread()
    # Named ``body_text`` so the module-level ``text`` import is not shadowed.
    body_text = raw.decode(errors="replace")
    try:
        parsed = json.loads(body_text)
    except ValueError:
        # Not JSON: fall through to the raw text.
        parsed = None
    if isinstance(parsed, dict):
        detail = parsed.get("detail")
        if isinstance(detail, str):
            return detail
    return body_text or f"downstream error {response.status_code}"