| 1234567891011121314151617181920212223242526272829303132333435363738 |
- from core_shared.rate_limit import InMemoryFixedWindowRateLimiter
- def test_in_memory_fixed_window_rate_limiter_blocks_after_limit() -> None:
- limiter = InMemoryFixedWindowRateLimiter()
- first = limiter.check(
- key="tenant:t1",
- limit=2,
- window_seconds=60,
- now_epoch_seconds=120,
- )
- second = limiter.check(
- key="tenant:t1",
- limit=2,
- window_seconds=60,
- now_epoch_seconds=121,
- )
- third = limiter.check(
- key="tenant:t1",
- limit=2,
- window_seconds=60,
- now_epoch_seconds=122,
- )
- next_window = limiter.check(
- key="tenant:t1",
- limit=2,
- window_seconds=60,
- now_epoch_seconds=180,
- )
- assert first.allowed is True
- assert first.remaining == 1
- assert second.allowed is True
- assert second.remaining == 0
- assert third.allowed is False
- assert third.reset_epoch_seconds == 180
- assert next_window.allowed is True
|