Rate limiting protects your API/service from abuse, ensures fair resource allocation, and prevents system overload. Different algorithms suit different requirements.
For Senior Engineers: Know when to use each algorithm, their trade-offs, and how to implement them both in-memory and distributed (Redis).
| Algorithm | Memory | Accuracy | Burst Handling | Best For |
|---|---|---|---|---|
| Token Bucket | O(1) | High | Allows bursts | API rate limiting with burst tolerance |
| Leaky Bucket | O(1) | High | Smooths bursts | Traffic shaping, constant rate output |
| Fixed Window | O(1) | Low (boundary issues) | Poor | Simple quotas, analytics |
| Sliding Window Log | O(N) per user | Very High | Good | Strict rate limiting |
| Sliding Window Counter | O(1) | High | Good | Best balance (production systems) |
Tokens are added to a bucket at a fixed rate. Each request consumes a token. If bucket is empty, request is rejected.
Key Feature: Allows traffic bursts up to bucket capacity, then enforces rate limit
import time
import threading
class TokenBucket:
    """Token Bucket Rate Limiter.

    Tokens accrue at a fixed rate up to ``capacity``; each request
    consumes one or more tokens and is rejected when too few remain.
    Allows bursts up to the bucket capacity. Thread-safe.

    Args:
        capacity: Maximum tokens in bucket
        refill_rate: Tokens added per second
    """

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = float(capacity)  # current balance; may be fractional
        # Monotonic clock: immune to wall-clock jumps (NTP, DST), which
        # with time.time() could over-refill or stall the bucket.
        self.last_refill = time.monotonic()
        self.lock = threading.Lock()

    def _refill(self):
        """Add tokens proportional to elapsed time (caller holds lock)."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

    def allow_request(self, tokens: int = 1) -> bool:
        """Check if request is allowed (consumes tokens if yes).

        Args:
            tokens: Number of tokens to consume (for weighted limiting)
        Returns:
            True if allowed, False if rate limited
        """
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def get_available_tokens(self) -> float:
        """Current token count without consuming any."""
        with self.lock:
            self._refill()
            return self.tokens
# Usage Example
limiter = TokenBucket(capacity=10, refill_rate=2.0) # 10 tokens, +2/sec
# Simulate requests: the first ~10 drain the initial burst capacity;
# after that only ~0.2 tokens refill per 100 ms sleep, so most of the
# remaining requests are rejected.
for i in range(15):
    if limiter.allow_request():
        print(f"Request {i+1}: Allowed (tokens: {limiter.get_available_tokens():.2f})")
    else:
        print(f"Request {i+1}: Rate Limited")
    time.sleep(0.1) # 100ms between requests
# Weighted Token Bucket (different costs per request)
class WeightedTokenBucket(TokenBucket):
    """Token bucket where each operation type consumes a different
    number of tokens, so expensive operations are throttled harder."""

    # Token price per operation type.
    _COST_READ = 1
    _COST_WRITE = 5
    _COST_SEARCH = 10

    def allow_read(self) -> bool:
        """Reads cost 1 token"""
        return self.allow_request(tokens=self._COST_READ)

    def allow_write(self) -> bool:
        """Writes cost 5 tokens (more expensive)"""
        return self.allow_request(tokens=self._COST_WRITE)

    def allow_search(self) -> bool:
        """Complex searches cost 10 tokens"""
        return self.allow_request(tokens=self._COST_SEARCH)
# Usage: one shared budget (100 tokens, +10/sec) consumed at different
# prices depending on the operation type.
weighted_limiter = WeightedTokenBucket(capacity=100, refill_rate=10.0)
print(weighted_limiter.allow_read()) # Cost: 1
print(weighted_limiter.allow_write()) # Cost: 5
print(weighted_limiter.allow_search()) # Cost: 10
Requests enter a queue (bucket) and are processed at a constant rate. If queue is full, new requests are rejected.
Key Feature: Smooths bursty traffic into constant rate output (traffic shaping)
import time
import threading
from collections import deque
from typing import Optional
class LeakyBucket:
    """Leaky Bucket Rate Limiter.

    Incoming requests join a FIFO queue (the bucket) and drain ("leak")
    at a constant rate, smoothing bursts into steady output. When the
    queue is full, new requests are rejected. Thread-safe.

    Args:
        capacity: Maximum queue size
        leak_rate: Requests processed per second
    """

    def __init__(self, capacity: int, leak_rate: float):
        self.capacity = capacity
        self.leak_rate = leak_rate
        self.queue = deque()  # arrival times of queued requests
        # Monotonic clock: wall-clock jumps must not speed up or stall draining.
        self.last_leak = time.monotonic()
        self.lock = threading.Lock()

    def _leak(self):
        """Drain whole requests that should have leaked since last call.

        Caller must hold the lock. ``last_leak`` is advanced only by the
        time accounted for by the whole requests drained, preserving the
        fractional remainder. (The original reset ``last_leak`` to *now*
        unconditionally, so when polled faster than ``1/leak_rate`` the
        fraction was discarded every call and nothing ever leaked.)
        """
        now = time.monotonic()
        leaks = int((now - self.last_leak) * self.leak_rate)
        if leaks > 0:
            for _ in range(min(leaks, len(self.queue))):
                self.queue.popleft()
            # Credit exactly the whole-request intervals consumed.
            self.last_leak += leaks / self.leak_rate

    def allow_request(self) -> bool:
        """Add request to queue if space available.

        Returns:
            True if queued, False if bucket is full
        """
        with self.lock:
            self._leak()
            if len(self.queue) < self.capacity:
                self.queue.append(time.monotonic())
                return True
            return False

    def get_queue_size(self) -> int:
        """Current queue size (after draining due requests)."""
        with self.lock:
            self._leak()
            return len(self.queue)
# Usage: queue holds 10, drains 2/sec; at ~10 requests/sec the queue
# fills quickly and later arrivals are rejected.
limiter = LeakyBucket(capacity=10, leak_rate=2.0) # Queue 10, process 2/sec
for i in range(20):
    if limiter.allow_request():
        print(f"Request {i+1}: Queued (queue size: {limiter.get_queue_size()})")
    else:
        print(f"Request {i+1}: Rejected (bucket full)")
    time.sleep(0.1)
# Leaky Bucket as Meter (no queue, just metering)
class LeakyBucketMeter:
    """Simplified leaky bucket that stores only the last request time.

    Enforces a minimum spacing of ``1 / rate`` seconds between
    consecutive allowed requests.
    """

    def __init__(self, rate: float):
        """
        Args:
            rate: Maximum requests per second
        """
        self.min_interval = 1.0 / rate
        self.last_request = 0
        self.lock = threading.Lock()

    def allow_request(self) -> bool:
        """Return True iff at least ``min_interval`` elapsed since the
        last allowed request (and record this one if so)."""
        with self.lock:
            current = time.time()
            if current - self.last_request < self.min_interval:
                return False
            self.last_request = current
            return True

    def wait_time(self) -> float:
        """Seconds remaining until the next request would be allowed."""
        with self.lock:
            remaining = self.min_interval - (time.time() - self.last_request)
            return max(0, remaining)
# Usage: 5 req/sec means 200 ms minimum spacing, so with 100 ms sleeps
# roughly every other request is allowed.
meter = LeakyBucketMeter(rate=5.0) # Max 5 req/sec
for i in range(10):
    if meter.allow_request():
        print(f"Request {i+1}: Allowed")
    else:
        wait = meter.wait_time()
        print(f"Request {i+1}: Wait {wait:.2f}s")
    time.sleep(0.1)
Count requests in fixed time windows (e.g., per minute). Reset counter at window boundary.
Window 1 (00:00-00:59) Window 2 (01:00-01:59)
+────────────────────+ +────────────────────+
│ Count: 0 - 1 - 2...│ - │ Count: 0 (RESET) │
│ Limit: 100 │ │ Limit: 100 │
+────────────────────+ +────────────────────+
Problem: Boundary burst!
00:30-01:00: 100 requests
01:00-01:30: 100 requests
→ 200 requests in 1 minute! (should be 100)
import time
import threading
from typing import Dict
class FixedWindowCounter:
    """Fixed Window Rate Limiter.

    Counts requests per user in fixed, epoch-aligned windows. Simple and
    O(1) per user, but suffers boundary bursts: up to 2x the limit can
    pass within one window length straddling a boundary.

    Args:
        limit: Maximum requests per window
        window_size: Window duration in seconds
    """

    def __init__(self, limit: int, window_size: int):
        self.limit = limit
        self.window_size = window_size
        self.counters: Dict[str, int] = {}  # "{user_id}:{window_start}" -> count
        self.lock = threading.Lock()

    def _get_window_key(self) -> str:
        """Return the current window's start time as a string key part."""
        now = int(time.time())
        window_start = (now // self.window_size) * self.window_size
        return str(window_start)

    def allow_request(self, user_id: str) -> bool:
        """Check if request is allowed for user.

        Args:
            user_id: User identifier
        Returns:
            True if allowed, False if rate limited
        """
        with self.lock:
            key = f"{user_id}:{self._get_window_key()}"
            count = self.counters.get(key, 0)
            if count < self.limit:
                self.counters[key] = count + 1
                return True
            return False

    def get_count(self, user_id: str) -> int:
        """Get current count for user in this window."""
        with self.lock:
            key = f"{user_id}:{self._get_window_key()}"
            return self.counters.get(key, 0)

    def cleanup_old_windows(self):
        """Remove counters from past windows (call periodically).

        Matches the ':{window_start}' suffix rather than a bare
        endswith on the digits: the original could wrongly retain a
        stale key whose trailing digits happened to match the current
        window start (e.g. old key '...:2100' vs current window '100').
        """
        with self.lock:
            suffix = f":{self._get_window_key()}"
            self.counters = {
                k: v for k, v in self.counters.items()
                if k.endswith(suffix)
            }
# Usage
limiter = FixedWindowCounter(limit=5, window_size=10) # 5 req per 10 sec
user_id = "user_123"
# One request per second: the first 5 within a window pass, the rest are
# limited until the 10-second window rolls over.
for i in range(10):
    if limiter.allow_request(user_id):
        count = limiter.get_count(user_id)
        print(f"Request {i+1}: Allowed (count: {count}/{limiter.limit})")
    else:
        print(f"Request {i+1}: Rate Limited")
    time.sleep(1)

# Demonstrate boundary issue: a burst at the end of one window plus a
# burst at the start of the next yields 2x the limit in ~1 second.
# NOTE(review): windows are epoch-aligned, so these sleeps only
# approximate the boundary — the demo is timing-dependent.
print("\n--- Boundary Issue Demo ---")
limiter2 = FixedWindowCounter(limit=3, window_size=5)
# Send 3 requests at end of window
time.sleep(4) # Wait until near end of window
for i in range(3):
    limiter2.allow_request(user_id)
print(f"End of window 1: {limiter2.get_count(user_id)} requests")
# Window resets, send 3 more immediately
time.sleep(2) # Cross window boundary
for i in range(3):
    limiter2.allow_request(user_id)
print(f"Start of window 2: {limiter2.get_count(user_id)} requests")
print("Total in ~1 second: 6 requests (limit was 3!)")
At window boundaries, users can send up to 2x the limit within a short burst — as the demo above shows, 6 requests pass in about a second against a limit of 3.
Store timestamp of each request. Count requests in last N seconds. Most accurate but memory-intensive.
Sliding window (60 seconds)
Timestamps: [10:00:05, 10:00:15, 10:00:45, 10:00:55, 10:01:02]
^
now
Window: [now - 60s, now] = [10:00:02, 10:01:02]
Count requests in window:
10:00:05 ✓ (in window)
10:00:15 ✓
10:00:45 ✓
10:00:55 ✓
10:01:02 ✓
Total: 5 requests
import time
import threading
from collections import deque
from typing import Deque
class SlidingWindowLog:
    """Sliding Window Log Rate Limiter.

    Stores one timestamp per accepted request, so the count over the
    last ``window_size`` seconds is exact. Most accurate algorithm, but
    memory is O(N) per user (N up to ``limit``).

    Args:
        limit: Maximum requests in window
        window_size: Window duration in seconds
    """

    def __init__(self, limit: int, window_size: int):
        self.limit = limit
        self.window_size = window_size
        self.logs: Dict[str, Deque[float]] = {}  # user_id -> accepted timestamps
        self.lock = threading.Lock()

    def _prune(self, log: Deque[float], now: float) -> None:
        """Drop timestamps older than the window (caller holds the lock)."""
        cutoff = now - self.window_size
        while log and log[0] < cutoff:
            log.popleft()

    def allow_request(self, user_id: str) -> bool:
        """Check if request is allowed.

        Args:
            user_id: User identifier
        Returns:
            True if allowed, False if rate limited
        """
        with self.lock:
            now = time.time()
            log = self.logs.setdefault(user_id, deque())
            self._prune(log, now)
            if len(log) < self.limit:
                log.append(now)
                return True
            return False

    def get_count(self, user_id: str) -> int:
        """Exact request count for user within the current window."""
        with self.lock:
            log = self.logs.get(user_id)
            if log is None:
                return 0
            self._prune(log, time.time())
            return len(log)

    def get_memory_usage(self) -> int:
        """Total timestamps currently stored across all users.

        Takes the lock: the original iterated ``self.logs`` unguarded,
        racing concurrent deque mutation in ``allow_request``.
        """
        with self.lock:
            return sum(len(log) for log in self.logs.values())
# Usage
limiter = SlidingWindowLog(limit=5, window_size=10) # 5 req per 10 sec
user_id = "user_123"
# 1.5 s spacing: once 5 timestamps are logged, requests are rejected
# until the oldest timestamp ages past the 10-second window.
for i in range(10):
    if limiter.allow_request(user_id):
        count = limiter.get_count(user_id)
        print(f"Request {i+1}: Allowed (count: {count}/{limiter.limit})")
    else:
        count = limiter.get_count(user_id)
        print(f"Request {i+1}: Rate Limited (count: {count}/{limiter.limit})")
    time.sleep(1.5)
print(f"\nMemory usage: {limiter.get_memory_usage()} timestamps stored")

# Demonstrate accuracy (no boundary issue): the window slides with
# time, so there is no counter reset to exploit.
print("\n--- No Boundary Issue ---")
limiter2 = SlidingWindowLog(limit=3, window_size=5)
# Send requests
for i in range(3):
    limiter2.allow_request(user_id)
time.sleep(1)
# Try to send more (should be rejected, window hasn't fully passed)
if limiter2.allow_request(user_id):
    print("Request 4: Allowed")
else:
    print("Request 4: Rejected (accurate sliding window!)")
Hybrid of fixed window and sliding log. Approximates sliding window using two counters. O(1) memory with good accuracy.
Previous Window Current Window
[10:00-10:59] [10:01-10:01:59]
Count: 80 ^
Now: 10:01:30
Sliding window estimate:
Weight of prev window = (60 - 30) / 60 = 0.5
Estimate = (80 x 0.5) + current_count
= 40 + current_count
Approximates true sliding window with O(1) memory!
import time
import threading
from typing import Dict, Tuple
class SlidingWindowCounter:
    """Sliding Window Counter Rate Limiter.

    Approximates a true sliding window by weighting the previous fixed
    window's count by its remaining overlap with the sliding window.
    O(1) memory per user, high accuracy, no boundary bursts — the usual
    production choice.

    Args:
        limit: Maximum requests in window
        window_size: Window duration in seconds
    """

    def __init__(self, limit: int, window_size: int):
        self.limit = limit
        self.window_size = window_size
        # user_id -> (prev_count, prev_window_start, curr_count, curr_window_start)
        self.windows: Dict[str, Tuple[int, int, int, int]] = {}
        self.lock = threading.Lock()

    def _get_window_start(self, timestamp: float) -> int:
        """Floor the timestamp to the start of its fixed window."""
        return int(timestamp // self.window_size) * self.window_size

    def _shift(self, prev_count: int, curr_count: int,
               stored_window: int, curr_window: int) -> Tuple[int, int]:
        """Roll stored counters forward to ``curr_window``.

        If exactly one window elapsed, the old current count becomes the
        previous count; if more than one elapsed, both windows are stale
        and the previous count is 0. (The original promoted the stale
        current count even after a multi-window idle gap, overestimating
        the rate on the first requests after the gap.)
        """
        if curr_window == stored_window:
            return prev_count, curr_count
        if stored_window == curr_window - self.window_size:
            return curr_count, 0
        return 0, 0

    def allow_request(self, user_id: str) -> bool:
        """Check if request is allowed.

        Uses the weighted count from the previous + current windows.

        Args:
            user_id: User identifier
        Returns:
            True if allowed, False if rate limited
        """
        with self.lock:
            now = time.time()
            curr_window = self._get_window_start(now)
            if user_id not in self.windows:
                self.windows[user_id] = (0, curr_window - self.window_size, 1, curr_window)
                return True
            prev_count, _, curr_count, stored_window = self.windows[user_id]
            prev_count, curr_count = self._shift(prev_count, curr_count,
                                                 stored_window, curr_window)
            # Weight the previous window by its overlap with the sliding window.
            weight_prev = (self.window_size - (now - curr_window)) / self.window_size
            estimated = prev_count * weight_prev + curr_count
            if estimated < self.limit:
                curr_count += 1
                allowed = True
            else:
                allowed = False
            # Persist the shifted state even on rejection so it stays fresh.
            self.windows[user_id] = (prev_count, curr_window - self.window_size,
                                     curr_count, curr_window)
            return allowed

    def get_estimated_count(self, user_id: str) -> float:
        """Estimated request count within the sliding window."""
        with self.lock:
            if user_id not in self.windows:
                return 0.0
            now = time.time()
            curr_window = self._get_window_start(now)
            prev_count, _, curr_count, stored_window = self.windows[user_id]
            prev_count, curr_count = self._shift(prev_count, curr_count,
                                                 stored_window, curr_window)
            weight_prev = (self.window_size - (now - curr_window)) / self.window_size
            return prev_count * weight_prev + curr_count
# Usage
limiter = SlidingWindowCounter(limit=5, window_size=10) # 5 req per 10 sec
user_id = "user_123"
# The printed estimate is the weighted previous-window count plus the
# current-window count, so it can be fractional.
for i in range(10):
    if limiter.allow_request(user_id):
        estimate = limiter.get_estimated_count(user_id)
        print(f"Request {i+1}: Allowed (estimated: {estimate:.2f}/{limiter.limit})")
    else:
        estimate = limiter.get_estimated_count(user_id)
        print(f"Request {i+1}: Rate Limited (estimated: {estimate:.2f}/{limiter.limit})")
    time.sleep(1.5)

# Compare with Fixed Window
print("\n--- Comparison: Sliding vs Fixed Window ---")
fixed = FixedWindowCounter(limit=5, window_size=10)
sliding = SlidingWindowCounter(limit=5, window_size=10)
# Send burst at window boundary
# NOTE(review): windows are epoch-aligned, so these sleeps only
# approximate the boundary — timing-dependent demo.
time.sleep(8) # Near end of window
for i in range(5):
    fixed.allow_request(user_id)
    sliding.allow_request(user_id)
time.sleep(3) # Cross boundary
for i in range(5):
    fixed_ok = fixed.allow_request(user_id)
    sliding_ok = sliding.allow_request(user_id)
    print(f"Request {i+1}: Fixed={fixed_ok}, Sliding={sliding_ok}")
print("\nResult: Sliding window prevents boundary burst!")
For distributed systems (multiple servers), use Redis for centralized rate limiting state.
import redis
import time
from typing import Optional
class RedisTokenBucket:
    """Distributed Token Bucket using Redis

    Atomically implemented using Lua script for correctness: the whole
    read-refill-consume sequence runs server-side in a single script
    call, so multiple app servers share one consistent bucket per user.
    """

    # Lua script for atomic token bucket operation.
    # State lives in a Redis hash {tokens, last_refill}; a missing key
    # behaves as a full bucket. The key gets a 1-hour TTL on each grant.
    # NOTE(review): HMSET is deprecated since Redis 4.0 in favor of
    # HSET — it still works, but worth migrating.
    # NOTE(review): 'now' is supplied by the caller, so clock skew
    # between app servers slightly skews refill — confirm acceptable.
    LUA_SCRIPT = """
    local key = KEYS[1]
    local capacity = tonumber(ARGV[1])
    local refill_rate = tonumber(ARGV[2])
    local tokens_requested = tonumber(ARGV[3])
    local now = tonumber(ARGV[4])
    -- Get current state
    local state = redis.call('HMGET', key, 'tokens', 'last_refill')
    local tokens = tonumber(state[1]) or capacity
    local last_refill = tonumber(state[2]) or now
    -- Refill tokens
    local elapsed = now - last_refill
    local tokens_to_add = elapsed * refill_rate
    tokens = math.min(capacity, tokens + tokens_to_add)
    -- Check if request allowed
    if tokens >= tokens_requested then
    tokens = tokens - tokens_requested
    redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
    redis.call('EXPIRE', key, 3600) -- 1 hour TTL
    return 1 -- Allowed
    else
    return 0 -- Rejected
    end
    """

    def __init__(self, redis_client: redis.Redis, capacity: int, refill_rate: float):
        self.redis = redis_client
        self.capacity = capacity
        self.refill_rate = refill_rate
        # register_script uploads the script once; the returned callable
        # invokes it via EVALSHA thereafter.
        self.script = self.redis.register_script(self.LUA_SCRIPT)

    def allow_request(self, user_id: str, tokens: int = 1) -> bool:
        """Check if request allowed (distributed across servers)

        Args:
            user_id: User identifier
            tokens: Tokens to consume
        Returns:
            True if allowed, False if rate limited
        """
        key = f"rate_limit:token_bucket:{user_id}"
        now = time.time()
        # Script returns 1 (allowed) or 0 (rejected).
        result = self.script(
            keys=[key],
            args=[self.capacity, self.refill_rate, tokens, now]
        )
        return bool(result)
# Redis Fixed Window Counter (simpler)
class RedisFixedWindow:
    """Simple Redis-based fixed window counter

    One counter key per user per epoch-aligned window; shares the same
    boundary-burst weakness as the in-memory FixedWindowCounter.
    """

    def __init__(self, redis_client: redis.Redis, limit: int, window_size: int):
        self.redis = redis_client
        self.limit = limit
        self.window_size = window_size

    def allow_request(self, user_id: str) -> bool:
        """Check if request allowed

        Uses Redis INCR + EXPIRE for atomic counter
        """
        now = int(time.time())
        # Align the key to the start of the current fixed window.
        window = (now // self.window_size) * self.window_size
        key = f"rate_limit:fixed_window:{user_id}:{window}"
        # Atomic increment
        count = self.redis.incr(key)
        if count == 1:
            # First request in this window - set expiry
            # NOTE(review): INCR and EXPIRE are two round trips; if the
            # client dies between them the key never expires. Consider a
            # Lua script (or EXPIRE ... NX on Redis >= 7) to close this.
            self.redis.expire(key, self.window_size * 2)
        return count <= self.limit
# Redis Sliding Window Counter (production-grade)
class RedisSlidingWindow:
    """Production-grade sliding window using Redis

    Keeps one counter per fixed window per user and estimates the
    sliding-window count as prev_count * overlap_weight + curr_count,
    all computed atomically inside a Lua script.
    """

    # NOTE(review): curr_window/prev_window are Lua numbers produced by
    # math.floor; confirm their string form in the concatenated key is
    # stable across Redis/Lua versions (no trailing '.0').
    LUA_SCRIPT = """
    local key = KEYS[1]
    local limit = tonumber(ARGV[1])
    local window = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])
    local curr_window = math.floor(now / window) * window
    local prev_window = curr_window - window
    local prev_key = key .. ':' .. prev_window
    local curr_key = key .. ':' .. curr_window
    local prev_count = tonumber(redis.call('GET', prev_key) or 0)
    local curr_count = tonumber(redis.call('GET', curr_key) or 0)
    -- Calculate weight
    local time_in_curr = now - curr_window
    local weight = (window - time_in_curr) / window
    local estimated = (prev_count * weight) + curr_count
    if estimated < limit then
    redis.call('INCR', curr_key)
    redis.call('EXPIRE', curr_key, window * 2)
    return 1
    else
    return 0
    end
    """

    def __init__(self, redis_client: redis.Redis, limit: int, window_size: int):
        self.redis = redis_client
        self.limit = limit
        self.window_size = window_size
        # Uploaded once; invoked via EVALSHA on each allow_request call.
        self.script = self.redis.register_script(self.LUA_SCRIPT)

    def allow_request(self, user_id: str) -> bool:
        """Return True if the estimated sliding-window count is under limit."""
        key = f"rate_limit:sliding:{user_id}"
        now = time.time()
        result = self.script(
            keys=[key],
            args=[self.limit, self.window_size, now]
        )
        return bool(result)
# Usage with Redis (requires a Redis server listening on localhost:6379)
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# Token Bucket
token_limiter = RedisTokenBucket(r, capacity=100, refill_rate=10.0)
print(token_limiter.allow_request("user_123"))
# Fixed Window
fixed_limiter = RedisFixedWindow(r, limit=100, window_size=60)
print(fixed_limiter.allow_request("user_456"))
# Sliding Window (RECOMMENDED)
sliding_limiter = RedisSlidingWindow(r, limit=100, window_size=60)
print(sliding_limiter.allow_request("user_789"))
from functools import wraps
from flask import Flask, request, jsonify
import redis

# Flask application and a shared Redis connection holding rate-limit state.
app = Flask(__name__)
r = redis.Redis(decode_responses=True)
def rate_limit(limit: int, window: int):
    """Decorator for rate limiting Flask routes.

    Args:
        limit: Max requests per window
        window: Window size in seconds
    Returns:
        A decorator that rejects over-limit callers with HTTP 429.
    """
    def decorator(func):
        # Build the limiter once per decorated route, not per request:
        # RedisSlidingWindow registers its Lua script on construction,
        # so the original per-request construction paid an extra Redis
        # round trip on every call.
        limiter = RedisSlidingWindow(r, limit, window)

        @wraps(func)
        def wrapper(*args, **kwargs):
            # Identify the caller: trusted header if present, else client IP.
            user_id = request.headers.get('X-User-ID', request.remote_addr)
            if not limiter.allow_request(user_id):
                return jsonify({
                    'error': 'Rate limit exceeded',
                    'retry_after': window
                }), 429 # Too Many Requests
            return func(*args, **kwargs)
        return wrapper
    return decorator
# Usage in API
@app.route('/api/search')
@rate_limit(limit=10, window=60) # 10 requests per minute
def search():
    """Rate-limited search endpoint (10 requests/minute per caller)."""
    query = request.args.get('q')
    # ... expensive search operation
    return jsonify({'results': []})

@app.route('/api/upload')
@rate_limit(limit=5, window=3600) # 5 uploads per hour
def upload():
    """Rate-limited upload endpoint (5 uploads/hour per caller)."""
    # ... handle file upload
    return jsonify({'status': 'uploaded'})
| Scenario | Recommended Algorithm | Why |
|---|---|---|
| Production API (distributed) | Redis Sliding Window Counter | O(1) memory, accurate, no boundary issues, distributed |
| Single server, high performance | Token Bucket (in-memory) | Fast, allows bursts, simple |
| Traffic shaping, constant rate | Leaky Bucket | Smooths bursts, constant output |
| Strict enforcement needed | Sliding Window Log | Most accurate (if memory isn't issue) |
| Simple quotas, analytics | Fixed Window Counter | Simplest implementation |
| Weighted requests (different costs) | Token Bucket | Naturally supports different token costs |