rate_limit.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. from __future__ import annotations
  2. from core_shared.rate_limit import RateLimitDecision, RateLimiter
  3. from fastapi import Request, Response
  4. from starlette.responses import JSONResponse
  5. from app.bootstrap.settings import ApiGatewaySettings
  6. RATE_LIMIT_WINDOW_SECONDS = 60
  7. RATE_LIMIT_LIMIT_HEADER = "x-ratelimit-limit"
  8. RATE_LIMIT_REMAINING_HEADER = "x-ratelimit-remaining"
  9. RATE_LIMIT_RESET_HEADER = "x-ratelimit-reset"
  10. def enforce_gateway_rate_limit(
  11. *,
  12. request: Request,
  13. settings: ApiGatewaySettings,
  14. limiter: RateLimiter) -> Response | None:
  15. if not settings.rate_limit_enabled:
  16. return None
  17. if not request.url.path.startswith("/gateway/"):
  18. return None
  19. checks: list[RateLimitDecision] = []
  20. global_limit = max(settings.global_rate_limit_per_minute, 1)
  21. checks.append(
  22. limiter.check(
  23. key="global",
  24. limit=global_limit,
  25. window_seconds=RATE_LIMIT_WINDOW_SECONDS)
  26. )
  27. from app.infrastructure.request_context import get_gateway_request_context
  28. context = get_gateway_request_context(request)
  29. if context.api_key_id is not None:
  30. api_key_limit = max(settings.api_key_rate_limit_per_minute, 1)
  31. checks.append(
  32. limiter.check(
  33. key=f"api-key:{context.api_key_id}",
  34. limit=api_key_limit,
  35. window_seconds=RATE_LIMIT_WINDOW_SECONDS)
  36. )
  37. denied = next((item for item in checks if not item.allowed), None)
  38. if denied is None:
  39. if checks:
  40. request.state.gateway_rate_limit_decision = min(
  41. checks,
  42. key=lambda item: item.remaining)
  43. return None
  44. response = JSONResponse(
  45. status_code=429,
  46. content={
  47. "detail": "rate limit exceeded",
  48. "limit": denied.limit,
  49. "reset_epoch_seconds": denied.reset_epoch_seconds,
  50. })
  51. apply_rate_limit_headers(response, denied)
  52. return response
  53. def apply_gateway_rate_limit_headers(
  54. *,
  55. response: Response,
  56. request: Request,
  57. settings: ApiGatewaySettings,
  58. limiter: RateLimiter) -> None:
  59. if not settings.rate_limit_enabled:
  60. return
  61. if not request.url.path.startswith("/gateway/"):
  62. return
  63. decision = getattr(request.state, "gateway_rate_limit_decision", None)
  64. if isinstance(decision, RateLimitDecision):
  65. apply_rate_limit_headers(response, decision)
  66. def apply_rate_limit_headers(response: Response, decision: RateLimitDecision) -> None:
  67. response.headers[RATE_LIMIT_LIMIT_HEADER] = str(decision.limit)
  68. response.headers[RATE_LIMIT_REMAINING_HEADER] = str(decision.remaining)
  69. response.headers[RATE_LIMIT_RESET_HEADER] = str(decision.reset_epoch_seconds)