|
|
@@ -1,6 +1,9 @@
|
|
|
import asyncio
|
|
|
import json
|
|
|
-from typing import Annotated
|
|
|
+from datetime import datetime
|
|
|
+from time import perf_counter
|
|
|
+from typing import Annotated, AsyncIterator
|
|
|
+from uuid import uuid4
|
|
|
|
|
|
from core_domain import ServiceDescriptor, ServiceHealth
|
|
|
import httpx
|
|
|
@@ -11,7 +14,13 @@ from pydantic import BaseModel
|
|
|
|
|
|
from app.bootstrap.settings import ApiGatewaySettings
|
|
|
from app.db.session import get_db
|
|
|
-from app.domain.repositories import ApiKeyRepository, GatewayRequestAuditRepository
|
|
|
+from app.domain.repositories import (
|
|
|
+ ApiKeyRepository,
|
|
|
+ AppApiKeyRepository,
|
|
|
+ AppDefinitionRepository,
|
|
|
+ AppInvocationAuditRepository,
|
|
|
+ GatewayRequestAuditRepository,
|
|
|
+)
|
|
|
from app.infrastructure.api_keys import generate_api_key, get_api_key_prefix, hash_api_key
|
|
|
from app.infrastructure.proxy import ProxyServiceName, ProxyTarget, ServiceProxy
|
|
|
from core_shared.security import build_internal_service_headers
|
|
|
@@ -22,10 +31,25 @@ from app.schemas.gateway import (
|
|
|
ApiKeyResponse,
|
|
|
ApiKeyStatusPostRequest,
|
|
|
ApiKeyStatusUpdateRequest,
|
|
|
+ AppApiKeyCreateRequest,
|
|
|
+ AppApiKeyCreateResponse,
|
|
|
+ AppApiKeyListRequest,
|
|
|
+ AppApiKeyResponse,
|
|
|
+ AppApiKeyStatusUpdateRequest,
|
|
|
+ AppAuditListRequest,
|
|
|
+ AppCreateRequest,
|
|
|
+ AppDetailRequest,
|
|
|
+ AppInvocationAuditResponse,
|
|
|
+ AppListRequest,
|
|
|
+ AppResponse,
|
|
|
+ AppStatusUpdateRequest,
|
|
|
+ AppUpdateRequest,
|
|
|
GatewayAuditServiceStats,
|
|
|
GatewayAuditStatsResponse,
|
|
|
GatewayRequestAuditResponse,
|
|
|
GatewayServicesHealthResponse,
|
|
|
+ OpenApiChatRequest,
|
|
|
+ OpenApiChatResponse,
|
|
|
)
|
|
|
|
|
|
router = APIRouter()
|
|
|
@@ -632,6 +656,496 @@ async def _stream_session_execute(
|
|
|
await client.aclose()
|
|
|
|
|
|
|
|
|
+# ── Application Admin Routes ─────────────────────────────────────────────────
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/apps", response_model=AppResponse)
|
|
|
+def create_app(payload: AppCreateRequest, db: DbSession) -> AppResponse:
|
|
|
+ existing = AppDefinitionRepository(db).get_by_code(code=payload.code)
|
|
|
+ if existing is not None:
|
|
|
+ raise HTTPException(status_code=409, detail=f"app code already exists: {payload.code}")
|
|
|
+ entity = AppDefinitionRepository(db).create(
|
|
|
+ code=payload.code,
|
|
|
+ name=payload.name,
|
|
|
+ description=payload.description,
|
|
|
+ target_type=payload.target_type,
|
|
|
+ target_id=payload.target_id,
|
|
|
+ owner_user_id=payload.owner_user_id,
|
|
|
+ settings_json=payload.settings_json)
|
|
|
+ return AppResponse.from_entity(entity)
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/apps/list", response_model=list[AppResponse])
|
|
|
+def list_apps(payload: AppListRequest, db: DbSession) -> list[AppResponse]:
|
|
|
+ return [AppResponse.from_entity(e) for e in AppDefinitionRepository(db).list_all()]
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/apps/detail", response_model=AppResponse)
|
|
|
+def get_app_detail(payload: AppDetailRequest, db: DbSession) -> AppResponse:
|
|
|
+ entity = AppDefinitionRepository(db).get_by_id(app_id=payload.app_id)
|
|
|
+ if entity is None:
|
|
|
+ raise HTTPException(status_code=404, detail=f"app not found: {payload.app_id}")
|
|
|
+ return AppResponse.from_entity(entity)
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/apps/update", response_model=AppResponse)
|
|
|
+def update_app(payload: AppUpdateRequest, db: DbSession) -> AppResponse:
|
|
|
+ entity = AppDefinitionRepository(db).update(
|
|
|
+ app_id=payload.app_id,
|
|
|
+ name=payload.name,
|
|
|
+ description=payload.description,
|
|
|
+ target_type=payload.target_type,
|
|
|
+ target_id=payload.target_id,
|
|
|
+ settings_json=payload.settings_json)
|
|
|
+ if entity is None:
|
|
|
+ raise HTTPException(status_code=404, detail=f"app not found: {payload.app_id}")
|
|
|
+ return AppResponse.from_entity(entity)
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/apps/status", response_model=AppResponse)
|
|
|
+def update_app_status(payload: AppStatusUpdateRequest, db: DbSession) -> AppResponse:
|
|
|
+ entity = AppDefinitionRepository(db).update_status(app_id=payload.app_id, status=payload.status)
|
|
|
+ if entity is None:
|
|
|
+ raise HTTPException(status_code=404, detail=f"app not found: {payload.app_id}")
|
|
|
+ return AppResponse.from_entity(entity)
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/apps/{app_id}/api-keys", response_model=AppApiKeyCreateResponse)
|
|
|
+def create_app_api_key(app_id: str, payload: AppApiKeyCreateRequest, db: DbSession) -> AppApiKeyCreateResponse:
|
|
|
+ app_entity = AppDefinitionRepository(db).get_by_id(app_id=app_id)
|
|
|
+ if app_entity is None:
|
|
|
+ raise HTTPException(status_code=404, detail=f"app not found: {app_id}")
|
|
|
+ api_key = generate_api_key()
|
|
|
+ entity = AppApiKeyRepository(db).create(
|
|
|
+ app_id=app_id,
|
|
|
+ name=payload.name,
|
|
|
+ key_prefix=get_api_key_prefix(api_key),
|
|
|
+ key_hash=hash_api_key(api_key),
|
|
|
+ scopes=payload.scopes,
|
|
|
+ expires_time=payload.expires_time)
|
|
|
+ return AppApiKeyCreateResponse(
|
|
|
+ id=entity.id,
|
|
|
+ app_id=entity.app_id,
|
|
|
+ name=entity.name,
|
|
|
+ key_prefix=entity.key_prefix,
|
|
|
+ api_key=api_key,
|
|
|
+ status=entity.status,
|
|
|
+ scopes=entity.scopes,
|
|
|
+ expires_time=entity.expires_time,
|
|
|
+ created_time=entity.created_time)
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/apps/{app_id}/api-keys/list", response_model=list[AppApiKeyResponse])
|
|
|
+def list_app_api_keys(app_id: str, payload: AppApiKeyListRequest, db: DbSession) -> list[AppApiKeyResponse]:
|
|
|
+ return [AppApiKeyResponse.from_entity(e) for e in AppApiKeyRepository(db).list_by_app(app_id=app_id)]
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/apps/{app_id}/api-keys/status", response_model=AppApiKeyResponse)
|
|
|
+def update_app_api_key_status(app_id: str, payload: AppApiKeyStatusUpdateRequest, db: DbSession) -> AppApiKeyResponse:
|
|
|
+ entity = AppApiKeyRepository(db).update_status(api_key_id=payload.api_key_id, status=payload.status)
|
|
|
+ if entity is None:
|
|
|
+ raise HTTPException(status_code=404, detail=f"api key not found: {payload.api_key_id}")
|
|
|
+ return AppApiKeyResponse.from_entity(entity)
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/apps/{app_id}/audits", response_model=list[AppInvocationAuditResponse])
|
|
|
+def list_app_audits(app_id: str, payload: AppAuditListRequest, db: DbSession) -> list[AppInvocationAuditResponse]:
|
|
|
+ return [
|
|
|
+ AppInvocationAuditResponse.from_entity(e)
|
|
|
+ for e in AppInvocationAuditRepository(db).list_by_app(app_id=app_id, limit=payload.limit)
|
|
|
+ ]
|
|
|
+
|
|
|
+
|
|
|
+# ── OpenAPI External Invocation ──────────────────────────────────────────────
|
|
|
+
|
|
|
+
|
|
|
+def _authenticate_app_api_key(request: Request, db: Session):
|
|
|
+ settings = ApiGatewaySettings()
|
|
|
+ token: str | None = None
|
|
|
+ authorization = request.headers.get("authorization")
|
|
|
+ if authorization:
|
|
|
+ scheme, _, t = authorization.partition(" ")
|
|
|
+ if scheme.lower() == "bearer" and t.strip():
|
|
|
+ token = t.strip()
|
|
|
+ if token is None:
|
|
|
+ token = request.headers.get(settings.api_key_header_name)
|
|
|
+ if not token:
|
|
|
+ raise HTTPException(status_code=401, detail="missing bearer token or api key")
|
|
|
+
|
|
|
+ key_hash = hash_api_key(token)
|
|
|
+ key_entity = AppApiKeyRepository(db).get_active_by_hash(key_hash=key_hash)
|
|
|
+ if key_entity is None:
|
|
|
+ raise HTTPException(status_code=401, detail="invalid api key")
|
|
|
+ if key_entity.expires_time is not None and key_entity.expires_time <= datetime.utcnow():
|
|
|
+ raise HTTPException(status_code=401, detail="api key expired")
|
|
|
+
|
|
|
+ app_entity = AppDefinitionRepository(db).get_by_id(app_id=key_entity.app_id)
|
|
|
+ if app_entity is None:
|
|
|
+ raise HTTPException(status_code=401, detail="app not found")
|
|
|
+ if app_entity.status != "published":
|
|
|
+ raise HTTPException(status_code=403, detail=f"app is {app_entity.status}, not published")
|
|
|
+
|
|
|
+ AppApiKeyRepository(db).touch_last_used_time(api_key_id=key_entity.id)
|
|
|
+ return key_entity, app_entity
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/openapi/apps/{app_code}/chat", response_model=OpenApiChatResponse)
|
|
|
+async def openapi_chat(app_code: str, payload: OpenApiChatRequest, request: Request, db: DbSession):
|
|
|
+ start = perf_counter()
|
|
|
+ request_id = str(uuid4())
|
|
|
+ key_entity, app_entity = _authenticate_app_api_key(request, db)
|
|
|
+ if app_entity.code != app_code:
|
|
|
+ raise HTTPException(status_code=403, detail="api key does not belong to this app")
|
|
|
+
|
|
|
+ targets = build_proxy_targets(ApiGatewaySettings())
|
|
|
+ session_target = targets["session-service"]
|
|
|
+ agent_target = targets["agent-service"]
|
|
|
+ team_target = targets["team-service"]
|
|
|
+ headers = _build_internal_headers(request, ApiGatewaySettings())
|
|
|
+
|
|
|
+ async with httpx_client(ApiGatewaySettings().proxy_timeout_seconds) as client:
|
|
|
+ session_id = payload.session_id
|
|
|
+ if not session_id:
|
|
|
+ session_data = await _post_json(
|
|
|
+ client=client, target=session_target, path="",
|
|
|
+ payload={
|
|
|
+ "app_id": app_entity.id,
|
|
|
+ "user_id": payload.user_id or "openapi",
|
|
|
+ "channel_type": "openapi",
|
|
|
+ "runtime_target_type": app_entity.target_type,
|
|
|
+ "runtime_target_id": app_entity.target_id,
|
|
|
+ }, headers=headers)
|
|
|
+ session_id = _get_string(session_data, "id")
|
|
|
+
|
|
|
+ run_request_payload = {
|
|
|
+ "target_type": app_entity.target_type,
|
|
|
+ "target_id": app_entity.target_id,
|
|
|
+ "mode": "production",
|
|
|
+ "input_text": payload.message,
|
|
|
+ }
|
|
|
+ run_request = await _post_json(
|
|
|
+ client=client, target=session_target, path="run-requests",
|
|
|
+ payload={
|
|
|
+ "session_id": session_id,
|
|
|
+ "app_config_id": app_entity.target_id,
|
|
|
+ "workflow_config_id": app_entity.target_id,
|
|
|
+ "trigger_type": "chat",
|
|
|
+ "request_payload_json": run_request_payload,
|
|
|
+ "request_status": "accepted",
|
|
|
+ }, headers=headers)
|
|
|
+ run_request_id = _get_string(run_request, "id")
|
|
|
+
|
|
|
+ user_message = await _post_json(
|
|
|
+ client=client, target=session_target, path="messages",
|
|
|
+ payload={
|
|
|
+ "session_id": session_id, "turn_id": run_request_id,
|
|
|
+ "role": "user", "content_type": "text",
|
|
|
+ "content_text": payload.message, "content_json": {},
|
|
|
+ }, headers=headers)
|
|
|
+
|
|
|
+ await _post_json(
|
|
|
+ client=client, target=session_target, path="run-requests/update",
|
|
|
+ payload={
|
|
|
+ "run_request_id": run_request_id, "request_status": "running",
|
|
|
+ "request_payload_json": {**run_request_payload, "user_message_id": _get_string(user_message, "id")},
|
|
|
+ }, headers=headers)
|
|
|
+
|
|
|
+ output_text: str | None = None
|
|
|
+ error_message: str | None = None
|
|
|
+ request_status = "completed"
|
|
|
+
|
|
|
+ try:
|
|
|
+ if app_entity.target_type == "agent":
|
|
|
+ agent_run = await _post_json(
|
|
|
+ client=client, target=agent_target, path="runs",
|
|
|
+ payload={
|
|
|
+ "agent_id": app_entity.target_id,
|
|
|
+ "session_id": session_id,
|
|
|
+ "input_text": payload.message,
|
|
|
+ "input_json": {"source": "openapi", "run_request_id": run_request_id},
|
|
|
+ }, headers=headers)
|
|
|
+ agent_run_id = _get_string(agent_run, "id")
|
|
|
+ execute_result = await _post_json(
|
|
|
+ client=client, target=agent_target, path="runs/execute",
|
|
|
+ payload={"agent_run_id": agent_run_id, "dry_run": False}, headers=headers)
|
|
|
+ run_data = _get_dict(execute_result, "run")
|
|
|
+ output_text = _resolve_output_text(run_data)
|
|
|
+ error_message = _get_optional_string(run_data, "error_message")
|
|
|
+ else:
|
|
|
+ team_run = await _post_json(
|
|
|
+ client=client, target=team_target, path="runs",
|
|
|
+ payload={
|
|
|
+ "team_id": app_entity.target_id,
|
|
|
+ "session_id": session_id,
|
|
|
+ "input_text": payload.message,
|
|
|
+ "input_json": {"source": "openapi", "run_request_id": run_request_id},
|
|
|
+ "enqueue": True,
|
|
|
+ }, headers=headers)
|
|
|
+ team_run_id = _get_string(team_run, "id")
|
|
|
+ execute_result = await _post_json(
|
|
|
+ client=client, target=team_target, path=f"runs/{team_run_id}/execute",
|
|
|
+ payload={"dry_run": False}, headers=headers)
|
|
|
+ run_data = _get_dict(execute_result, "run")
|
|
|
+ output_text = _resolve_output_text(run_data)
|
|
|
+ error_message = _get_optional_string(run_data, "error_message")
|
|
|
+
|
|
|
+ if error_message:
|
|
|
+ request_status = "failed"
|
|
|
+
|
|
|
+ if output_text:
|
|
|
+ await _post_json(
|
|
|
+ client=client, target=session_target, path="messages",
|
|
|
+ payload={
|
|
|
+ "session_id": session_id, "turn_id": run_request_id,
|
|
|
+ "role": "assistant", "content_type": "text",
|
|
|
+ "content_text": output_text, "content_json": {},
|
|
|
+ }, headers=headers)
|
|
|
+ except HTTPException as exc:
|
|
|
+ request_status = "failed"
|
|
|
+ error_message = exc.detail if isinstance(exc.detail, str) else json.dumps(exc.detail, ensure_ascii=False)
|
|
|
+
|
|
|
+ await _post_json(
|
|
|
+ client=client, target=session_target, path="run-requests/update",
|
|
|
+ payload={
|
|
|
+ "run_request_id": run_request_id, "request_status": request_status,
|
|
|
+ "request_payload_json": {
|
|
|
+ **run_request_payload,
|
|
|
+ "user_message_id": _get_string(user_message, "id"),
|
|
|
+ "output_text": output_text, "error_message": error_message,
|
|
|
+ },
|
|
|
+ }, headers=headers)
|
|
|
+
|
|
|
+ duration_ms = int((perf_counter() - start) * 1000)
|
|
|
+ AppInvocationAuditRepository(db).create(
|
|
|
+ app_id=app_entity.id,
|
|
|
+ api_key_prefix=key_entity.key_prefix,
|
|
|
+ request_id=request_id,
|
|
|
+ session_id=session_id,
|
|
|
+ run_request_id=run_request_id,
|
|
|
+ target_type=app_entity.target_type,
|
|
|
+ target_id=app_entity.target_id,
|
|
|
+ invoke_type="sync",
|
|
|
+ status=request_status,
|
|
|
+ duration_ms=duration_ms,
|
|
|
+ error_message=error_message,
|
|
|
+ client_metadata_json=json.dumps(payload.metadata) if payload.metadata else None)
|
|
|
+
|
|
|
+ return OpenApiChatResponse(
|
|
|
+ request_id=request_id,
|
|
|
+ app_code=app_entity.code,
|
|
|
+ session_id=session_id,
|
|
|
+ run_request_id=run_request_id,
|
|
|
+ target_type=app_entity.target_type,
|
|
|
+ target_id=app_entity.target_id,
|
|
|
+ status=request_status,
|
|
|
+ output_text=output_text,
|
|
|
+ error=error_message)
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/gateway/openapi/apps/{app_code}/chat/stream")
|
|
|
+async def openapi_chat_stream(app_code: str, payload: OpenApiChatRequest, request: Request):
|
|
|
+ settings = ApiGatewaySettings()
|
|
|
+ session_factory = request.app.state.session_factory
|
|
|
+
|
|
|
+ auth_db = session_factory()
|
|
|
+ try:
|
|
|
+ key_entity, app_entity = _authenticate_app_api_key(request, auth_db)
|
|
|
+ except HTTPException as exc:
|
|
|
+ auth_db.close()
|
|
|
+ detail = exc.detail if isinstance(exc.detail, str) else json.dumps(exc.detail, ensure_ascii=False)
|
|
|
+ return StreamingResponse(
|
|
|
+ _single_sse("failed", {"status": "failed", "error_code": "auth_error", "error_message": detail}),
|
|
|
+ media_type="text/event-stream",
|
|
|
+ headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
|
|
|
+ finally:
|
|
|
+ auth_db.close()
|
|
|
+
|
|
|
+ if app_entity.code != app_code:
|
|
|
+ return StreamingResponse(
|
|
|
+ _single_sse("failed", {"status": "failed", "error_code": "forbidden", "error_message": "api key does not belong to this app"}),
|
|
|
+ media_type="text/event-stream",
|
|
|
+ headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
|
|
|
+ if app_entity.target_type != "agent":
|
|
|
+ return StreamingResponse(
|
|
|
+ _single_sse("failed", {"status": "failed", "error_code": "unsupported", "error_message": "streaming is only supported for agent targets in V0.1"}),
|
|
|
+ media_type="text/event-stream",
|
|
|
+ headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
|
|
|
+
|
|
|
+ return StreamingResponse(
|
|
|
+ _stream_openapi_chat(app_code, payload, request, key_entity, app_entity, session_factory, settings),
|
|
|
+ media_type="text/event-stream",
|
|
|
+ headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
|
|
|
+
|
|
|
+
|
|
|
+def _single_sse(event: str, data: dict) -> AsyncIterator[str]:
|
|
|
+ async def _gen():
|
|
|
+ yield _sse(event, data)
|
|
|
+ return _gen()
|
|
|
+
|
|
|
+
|
|
|
+async def _stream_openapi_chat(
|
|
|
+ app_code: str,
|
|
|
+ payload: OpenApiChatRequest,
|
|
|
+ request: Request,
|
|
|
+ key_entity,
|
|
|
+ app_entity,
|
|
|
+ session_factory,
|
|
|
+ settings: ApiGatewaySettings):
|
|
|
+ start = perf_counter()
|
|
|
+ request_id = str(uuid4())
|
|
|
+
|
|
|
+ targets = build_proxy_targets(settings)
|
|
|
+ session_target = targets["session-service"]
|
|
|
+ agent_target = targets["agent-service"]
|
|
|
+ headers = _build_internal_headers(request, settings)
|
|
|
+ client = httpx.AsyncClient(timeout=settings.proxy_timeout_seconds)
|
|
|
+
|
|
|
+ output_text = ""
|
|
|
+ error_message: str | None = None
|
|
|
+ session_id: str | None = None
|
|
|
+ run_request_id: str | None = None
|
|
|
+ request_status = "failed"
|
|
|
+
|
|
|
+ try:
|
|
|
+ session_id = payload.session_id
|
|
|
+ if not session_id:
|
|
|
+ session_data = await _post_json(
|
|
|
+ client=client, target=session_target, path="",
|
|
|
+ payload={
|
|
|
+ "app_id": app_entity.id,
|
|
|
+ "user_id": payload.user_id or "openapi",
|
|
|
+ "channel_type": "openapi",
|
|
|
+ "runtime_target_type": app_entity.target_type,
|
|
|
+ "runtime_target_id": app_entity.target_id,
|
|
|
+ }, headers=headers)
|
|
|
+ session_id = _get_string(session_data, "id")
|
|
|
+
|
|
|
+ run_request_payload = {
|
|
|
+ "target_type": app_entity.target_type,
|
|
|
+ "target_id": app_entity.target_id,
|
|
|
+ "mode": "production",
|
|
|
+ "input_text": payload.message,
|
|
|
+ }
|
|
|
+ run_request = await _post_json(
|
|
|
+ client=client, target=session_target, path="run-requests",
|
|
|
+ payload={
|
|
|
+ "session_id": session_id,
|
|
|
+ "app_config_id": app_entity.target_id,
|
|
|
+ "workflow_config_id": app_entity.target_id,
|
|
|
+ "trigger_type": "chat",
|
|
|
+ "request_payload_json": run_request_payload,
|
|
|
+ "request_status": "accepted",
|
|
|
+ }, headers=headers)
|
|
|
+ run_request_id = _get_string(run_request, "id")
|
|
|
+
|
|
|
+ user_message = await _post_json(
|
|
|
+ client=client, target=session_target, path="messages",
|
|
|
+ payload={
|
|
|
+ "session_id": session_id, "turn_id": run_request_id,
|
|
|
+ "role": "user", "content_type": "text",
|
|
|
+ "content_text": payload.message, "content_json": {},
|
|
|
+ }, headers=headers)
|
|
|
+ user_message_id = _get_string(user_message, "id")
|
|
|
+
|
|
|
+ await _post_json(
|
|
|
+ client=client, target=session_target, path="run-requests/update",
|
|
|
+ payload={
|
|
|
+ "run_request_id": run_request_id, "request_status": "running",
|
|
|
+ "request_payload_json": {**run_request_payload, "user_message_id": user_message_id},
|
|
|
+ }, headers=headers)
|
|
|
+
|
|
|
+ yield _sse("started", {
|
|
|
+ "request_id": request_id,
|
|
|
+ "session_id": session_id,
|
|
|
+ "run_request_id": run_request_id})
|
|
|
+
|
|
|
+ agent_run = await _post_json(
|
|
|
+ client=client, target=agent_target, path="runs",
|
|
|
+ payload={
|
|
|
+ "agent_id": app_entity.target_id,
|
|
|
+ "session_id": session_id,
|
|
|
+ "input_text": payload.message,
|
|
|
+ "input_json": {"source": "openapi", "run_request_id": run_request_id},
|
|
|
+ }, headers=headers)
|
|
|
+ agent_run_id = _get_string(agent_run, "id")
|
|
|
+
|
|
|
+ stream_url = _target_url(agent_target, f"runs/{agent_run_id}/execute-stream")
|
|
|
+ async with client.stream("POST", stream_url, headers=headers, json={"dry_run": False}) as resp:
|
|
|
+ if not resp.is_success:
|
|
|
+ error_message = await _read_stream_error(resp)
|
|
|
+ else:
|
|
|
+ async for ev_name, ev_data in _parse_sse(resp):
|
|
|
+ data = json.loads(ev_data)
|
|
|
+ if ev_name == "agent.run.delta":
|
|
|
+ text_chunk = data.get("text", "")
|
|
|
+ yield _sse("delta", {"text": text_chunk})
|
|
|
+ if isinstance(text_chunk, str):
|
|
|
+ output_text += text_chunk
|
|
|
+ elif ev_name == "agent.run.completed":
|
|
|
+ run_data = data.get("run", data)
|
|
|
+ final_text = _get_optional_string(run_data, "output_text")
|
|
|
+ if not output_text and final_text:
|
|
|
+ output_text = final_text
|
|
|
+ yield _sse("completed", {"status": "completed", "output_text": output_text})
|
|
|
+ elif ev_name == "agent.run.failed":
|
|
|
+ msg = data.get("error_message", "Agent execution failed")
|
|
|
+ if not isinstance(msg, str):
|
|
|
+ msg = "Agent execution failed"
|
|
|
+ error_message = msg
|
|
|
+ yield _sse("failed", {"status": "failed", "error_code": "agent_error", "error_message": msg})
|
|
|
+ else:
|
|
|
+ yield _sse(ev_name, data)
|
|
|
+
|
|
|
+ request_status = "failed" if error_message else "completed"
|
|
|
+
|
|
|
+ if output_text:
|
|
|
+ await _post_json(
|
|
|
+ client=client, target=session_target, path="messages",
|
|
|
+ payload={
|
|
|
+ "session_id": session_id, "turn_id": run_request_id,
|
|
|
+ "role": "assistant", "content_type": "text",
|
|
|
+ "content_text": output_text, "content_json": {},
|
|
|
+ }, headers=headers)
|
|
|
+
|
|
|
+ await _post_json(
|
|
|
+ client=client, target=session_target, path="run-requests/update",
|
|
|
+ payload={
|
|
|
+ "run_request_id": run_request_id, "request_status": request_status,
|
|
|
+ "request_payload_json": {
|
|
|
+ **run_request_payload, "user_message_id": user_message_id,
|
|
|
+ "output_text": output_text, "error_message": error_message,
|
|
|
+ },
|
|
|
+ }, headers=headers)
|
|
|
+
|
|
|
+ except HTTPException as exc:
|
|
|
+ detail = exc.detail if isinstance(exc.detail, str) else json.dumps(exc.detail, ensure_ascii=False)
|
|
|
+ yield _sse("failed", {"status": "failed", "error_code": "gateway_error", "error_message": detail})
|
|
|
+ except Exception as exc:
|
|
|
+ yield _sse("failed", {"status": "failed", "error_code": "internal_error", "error_message": str(exc)})
|
|
|
+ finally:
|
|
|
+ await client.aclose()
|
|
|
+ duration_ms = int((perf_counter() - start) * 1000)
|
|
|
+ audit_db = session_factory()
|
|
|
+ try:
|
|
|
+ AppInvocationAuditRepository(audit_db).create(
|
|
|
+ app_id=app_entity.id,
|
|
|
+ api_key_prefix=key_entity.key_prefix,
|
|
|
+ request_id=request_id,
|
|
|
+ session_id=session_id,
|
|
|
+ run_request_id=run_request_id,
|
|
|
+ target_type=app_entity.target_type,
|
|
|
+ target_id=app_entity.target_id,
|
|
|
+ invoke_type="stream",
|
|
|
+ status=request_status,
|
|
|
+ duration_ms=duration_ms,
|
|
|
+ error_message=error_message,
|
|
|
+ client_metadata_json=json.dumps(payload.metadata) if payload.metadata else None)
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+ finally:
|
|
|
+ audit_db.close()
|
|
|
+
|
|
|
+
|
|
|
@router.api_route(
|
|
|
"/gateway/sessions",
|
|
|
methods=["GET", "POST", "PUT", "PATCH", "DELETE"])
|