| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- from __future__ import annotations
- from fastapi import Request, Response
- from starlette.responses import JSONResponse
- from core_shared.rate_limit import RateLimitDecision, RateLimiter
- from app.bootstrap.settings import ApiGatewaySettings
# Window length, in seconds, passed as ``window_seconds`` to every
# ``limiter.check`` call made by ``enforce_gateway_rate_limit``. The settings
# fields it is paired with are named ``*_per_minute``, hence 60.
RATE_LIMIT_WINDOW_SECONDS = 60
# Response header names written by ``apply_rate_limit_headers``; they carry the
# decision's ``limit``, ``remaining`` and ``reset_epoch_seconds`` respectively.
RATE_LIMIT_LIMIT_HEADER = "x-ratelimit-limit"
RATE_LIMIT_REMAINING_HEADER = "x-ratelimit-remaining"
RATE_LIMIT_RESET_HEADER = "x-ratelimit-reset"
def enforce_gateway_rate_limit(
    *,
    request: Request,
    settings: ApiGatewaySettings,
    limiter: RateLimiter,
) -> Response | None:
    """Apply tenant and API-key rate limits to a ``/gateway/`` request.

    Args:
        request: Incoming request; its URL path selects whether limiting
            applies and its ``state`` receives the winning decision.
        settings: Supplies ``rate_limit_enabled`` and the per-minute limits.
        limiter: Object whose ``check`` method yields a ``RateLimitDecision``.

    Returns:
        ``None`` when limiting is disabled, the path is outside ``/gateway/``,
        or every check is allowed. Otherwise a 429 ``JSONResponse`` built from
        the first denied decision, with rate-limit headers applied.
    """
    # Out of scope: feature switched off, or not a gateway route. The path is
    # only inspected when limiting is enabled.
    if not settings.rate_limit_enabled or not request.url.path.startswith("/gateway/"):
        return None

    # Imported lazily, only once limiting applies.
    # NOTE(review): presumably deferred to dodge a circular import - confirm.
    from app.infrastructure.request_context import get_gateway_request_context

    ctx = get_gateway_request_context(request)

    # The tenant check always runs; configured limits are clamped to >= 1.
    decisions: list[RateLimitDecision] = [
        limiter.check(
            key=f"tenant:{ctx.tenant_id}",
            limit=max(settings.tenant_rate_limit_per_minute, 1),
            window_seconds=RATE_LIMIT_WINDOW_SECONDS,
        )
    ]

    # The API-key check runs in addition whenever the request carries a key,
    # regardless of the tenant outcome.
    if context_key := ctx.api_key_id if ctx.api_key_id is not None else None:
        pass
    if ctx.api_key_id is not None:
        decisions.append(
            limiter.check(
                key=f"api-key:{ctx.api_key_id}",
                limit=max(settings.api_key_rate_limit_per_minute, 1),
                window_seconds=RATE_LIMIT_WINDOW_SECONDS,
            )
        )

    # The first denied decision (tenant before API key) drives the rejection.
    for verdict in decisions:
        if verdict.allowed:
            continue
        payload = {
            "detail": "rate limit exceeded",
            "limit": verdict.limit,
            "reset_epoch_seconds": verdict.reset_epoch_seconds,
        }
        rejection = JSONResponse(status_code=429, content=payload)
        apply_rate_limit_headers(rejection, verdict)
        return rejection

    # Everything passed: remember the decision with the least remaining quota
    # so the response headers can report it later. ``decisions`` is never
    # empty here because the tenant decision is always present.
    request.state.gateway_rate_limit_decision = min(
        decisions,
        key=lambda entry: entry.remaining,
    )
    return None
def apply_gateway_rate_limit_headers(
    *,
    response: Response,
    request: Request,
    settings: ApiGatewaySettings,
    limiter: RateLimiter,
) -> None:
    """Copy the stored rate-limit decision onto an outgoing gateway response.

    Args:
        response: Response whose headers are updated in place.
        request: Request whose ``state.gateway_rate_limit_decision`` is read.
        settings: Supplies the ``rate_limit_enabled`` switch.
        limiter: Accepted for signature compatibility; not used here.

    Does nothing when limiting is disabled, the path is outside ``/gateway/``,
    or no ``RateLimitDecision`` was stored on the request state.
    """
    # Same scope test as enforcement; the path is only read when enabled.
    applies = settings.rate_limit_enabled and request.url.path.startswith("/gateway/")
    if not applies:
        return

    stored = getattr(request.state, "gateway_rate_limit_decision", None)
    if not isinstance(stored, RateLimitDecision):
        # Nothing recorded (or something unexpected): leave headers alone.
        return
    apply_rate_limit_headers(response, stored)
def apply_rate_limit_headers(response: Response, decision: RateLimitDecision) -> None:
    """Write the decision's limit, remaining and reset values as headers.

    Args:
        response: Response whose ``headers`` mapping is mutated in place.
        decision: Source of ``limit``, ``remaining`` and
            ``reset_epoch_seconds``; each is stringified before being set.
    """
    header_fields = (
        (RATE_LIMIT_LIMIT_HEADER, "limit"),
        (RATE_LIMIT_REMAINING_HEADER, "remaining"),
        (RATE_LIMIT_RESET_HEADER, "reset_epoch_seconds"),
    )
    # Attributes are read one at a time, immediately before each header is
    # set, in limit / remaining / reset order.
    for header_name, field_name in header_fields:
        response.headers[header_name] = str(getattr(decision, field_name))
|