from __future__ import annotations from fastapi import Request, Response from starlette.responses import JSONResponse from core_shared.rate_limit import RateLimitDecision, RateLimiter from app.bootstrap.settings import ApiGatewaySettings RATE_LIMIT_WINDOW_SECONDS = 60 RATE_LIMIT_LIMIT_HEADER = "x-ratelimit-limit" RATE_LIMIT_REMAINING_HEADER = "x-ratelimit-remaining" RATE_LIMIT_RESET_HEADER = "x-ratelimit-reset" def enforce_gateway_rate_limit( *, request: Request, settings: ApiGatewaySettings, limiter: RateLimiter, ) -> Response | None: if not settings.rate_limit_enabled: return None if not request.url.path.startswith("/gateway/"): return None from app.infrastructure.request_context import get_gateway_request_context context = get_gateway_request_context(request) checks: list[RateLimitDecision] = [] tenant_limit = max(settings.tenant_rate_limit_per_minute, 1) checks.append( limiter.check( key=f"tenant:{context.tenant_id}", limit=tenant_limit, window_seconds=RATE_LIMIT_WINDOW_SECONDS, ) ) if context.api_key_id is not None: api_key_limit = max(settings.api_key_rate_limit_per_minute, 1) checks.append( limiter.check( key=f"api-key:{context.api_key_id}", limit=api_key_limit, window_seconds=RATE_LIMIT_WINDOW_SECONDS, ) ) denied = next((item for item in checks if not item.allowed), None) if denied is None: if checks: request.state.gateway_rate_limit_decision = min( checks, key=lambda item: item.remaining, ) return None response = JSONResponse( status_code=429, content={ "detail": "rate limit exceeded", "limit": denied.limit, "reset_epoch_seconds": denied.reset_epoch_seconds, }, ) apply_rate_limit_headers(response, denied) return response def apply_gateway_rate_limit_headers( *, response: Response, request: Request, settings: ApiGatewaySettings, limiter: RateLimiter, ) -> None: if not settings.rate_limit_enabled: return if not request.url.path.startswith("/gateway/"): return decision = getattr(request.state, "gateway_rate_limit_decision", None) if isinstance(decision, RateLimitDecision): apply_rate_limit_headers(response, decision) def apply_rate_limit_headers(response: Response, decision: RateLimitDecision) -> None: response.headers[RATE_LIMIT_LIMIT_HEADER] = str(decision.limit) response.headers[RATE_LIMIT_REMAINING_HEADER] = str(decision.remaining) response.headers[RATE_LIMIT_RESET_HEADER] = str(decision.reset_epoch_seconds)