🎲 1. Probabilistic Data Structures
Space-efficient structures that trade perfect accuracy for massive memory savings. Essential for large-scale systems.
Bloom Filter - Membership Testing
Use Case: Check if username exists (avoid DB query), spam detection, cache filtering, distributed systems deduplication
import hashlib
from typing import List
class BloomFilter:
"""
Bloom Filter: Probabilistic set membership testing.
Properties:
- False positives possible (says "yes" when should be "no")
- False negatives NEVER (if says "no", definitely not in set)
- Space-efficient: ~10 bits per element for 1% false positive rate
Time Complexity: O(k) where k = number of hash functions
Space Complexity: O(m) where m = bit array size
    Real-world: Bitcoin uses Bloom filters in SPV wallets, Medium uses them
    for read history, Chrome for malicious URL detection
"""
def __init__(self, size: int, num_hash_functions: int):
"""
size: Size of bit array (larger = fewer false positives)
num_hash_functions: Number of hash functions (optimal k = m/n * ln(2))
Rule of thumb:
- For 1% FP rate: size = 10 * expected_items
- For 0.1% FP rate: size = 15 * expected_items
"""
self.size = size
self.num_hash_functions = num_hash_functions
self.bit_array = [0] * size
def _hash(self, item: str, seed: int) -> int:
"""Generate hash with seed for multiple hash functions"""
h = hashlib.md5(f"{item}{seed}".encode())
return int(h.hexdigest(), 16) % self.size
def add(self, item: str) -> None:
"""Add item to the filter"""
for i in range(self.num_hash_functions):
index = self._hash(item, i)
self.bit_array[index] = 1
def contains(self, item: str) -> bool:
"""
Check if item might be in the set.
Returns:
- True: Maybe in set (could be false positive)
- False: Definitely NOT in set (100% accurate)
"""
for i in range(self.num_hash_functions):
index = self._hash(item, i)
if self.bit_array[index] == 0:
return False # Definitely not in set
return True # Probably in set
def false_positive_rate(self, n: int) -> float:
"""
Calculate theoretical false positive rate.
Formula: (1 - e^(-kn/m))^k
where k = hash functions, n = items, m = size
"""
import math
k = self.num_hash_functions
m = self.size
return (1 - math.exp(-k * n / m)) ** k
# Example: Check if username is taken
print("=== Bloom Filter Example ===")
bf = BloomFilter(size=1000, num_hash_functions=3)
# Add existing usernames
existing_users = ["alice", "bob", "charlie", "david", "eve"]
for user in existing_users:
bf.add(user)
# Check membership
print(f"'alice' exists: {bf.contains('alice')}") # True (correct)
print(f"'frank' exists: {bf.contains('frank')}") # False (correct)
print(f"'xyz' exists: {bf.contains('xyz')}") # Might be True (false positive)
# Calculate FP rate with 5 items
fp_rate = bf.false_positive_rate(5)
print(f"\nFalse positive rate with 5 items: {fp_rate*100:.2f}%")
# Practical usage
def check_username_available(username: str, bloom: BloomFilter) -> bool:
"""
Two-stage check:
1. Bloom filter (fast, in-memory)
2. Database (slow, if Bloom says "maybe exists")
"""
if not bloom.contains(username):
# Definitely available! No DB query needed
return True
else:
# Maybe taken - need to check DB
# (This is where false positives cost a DB query)
return check_database(username) # Actual DB query
def check_database(username: str) -> bool:
# Simulated DB query
return username not in existing_users
# Test the two-stage check
print("\nTwo-stage username check:")
print(f"'newuser' available: {check_username_available('newuser', bf)}") # No DB hit!
print(f"'alice' available: {check_username_available('alice', bf)}") # DB hit
Memory Savings Example:
1 million usernames in a hash set: ~64MB (~64 bytes per entry: pointer, hash, and string data)
1 million usernames in Bloom filter (1% FP): ~1.2MB (10 bits per element)
Result: 98% memory reduction! Cost: 1% of lookups hit DB unnecessarily
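The rules of thumb above ("10 bits per element for 1% FP") follow from the standard Bloom filter sizing formulas. A small helper sketch (hypothetical, not part of the `BloomFilter` class) that derives the bit-array size and hash count from the expected item count and a target false-positive rate:

```python
import math

def bloom_parameters(n: int, p: float) -> tuple:
    """Optimal bit-array size m and hash count k for n items at FP rate p.

    m = -n * ln(p) / (ln 2)^2
    k = (m / n) * ln 2
    """
    m = math.ceil(-n * math.log(p) / (math.log(2) ** 2))
    k = round((m / n) * math.log(2))
    return m, max(1, k)

# 1 million items at 1% FP -> roughly 9.6 bits per item, 7 hash functions
m, k = bloom_parameters(1_000_000, 0.01)
print(f"m = {m} bits ({m / 1_000_000:.1f} bits/item), k = {k}")
```

This is where the "size = 10 * expected_items" shortcut comes from: 9.6 bits per item, rounded up.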
Count-Min Sketch - Frequency Estimation
Use Case: Twitter trending topics, top-K heavy hitters, network traffic analysis, DDoS detection
import hashlib
from typing import List
class CountMinSketch:
"""
Count-Min Sketch: Estimate frequency of items in a stream.
Properties:
- Overestimates frequency (never underestimates)
- Error bound: ε (epsilon) with probability δ (delta)
- Space-efficient: much smaller than exact counting
Time Complexity: O(k) for update and query
Space Complexity: O(w × d) where w = width, d = depth
    Real-world: Twitter uses it for trending topics, network routers for
    traffic analysis, and Redis (RedisBloom module) for approximate counting
"""
def __init__(self, width: int, depth: int):
"""
width: Number of counters per hash function (w)
depth: Number of hash functions (d)
Rule of thumb:
- width = ceil(e / epsilon) where e = 2.718
- depth = ceil(ln(1/delta))
Example: For ε=0.01 (1% error), δ=0.01 (99% confidence):
- width = 272, depth = 5
"""
self.width = width
self.depth = depth
self.table = [[0] * width for _ in range(depth)]
def _hash(self, item: str, seed: int) -> int:
"""Generate hash for item with seed"""
h = hashlib.md5(f"{item}{seed}".encode())
return int(h.hexdigest(), 16) % self.width
def update(self, item: str, count: int = 1) -> None:
"""Increment count for item"""
for i in range(self.depth):
index = self._hash(item, i)
self.table[i][index] += count
def query(self, item: str) -> int:
"""
Estimate frequency of item.
Returns minimum across all hash functions (reduces overcount error).
"""
min_count = float('inf')
for i in range(self.depth):
index = self._hash(item, i)
min_count = min(min_count, self.table[i][index])
return min_count
def top_k(self, items: List[str], k: int) -> List[tuple]:
"""
Find top-K most frequent items.
Note: Need to track candidates separately, then query their counts.
"""
counts = [(item, self.query(item)) for item in set(items)]
counts.sort(key=lambda x: x[1], reverse=True)
return counts[:k]
# Example: Twitter trending hashtags
print("\n=== Count-Min Sketch Example ===")
cms = CountMinSketch(width=100, depth=5)
# Simulate tweet stream with hashtags
tweet_stream = [
"#python", "#python", "#javascript", "#python",
"#java", "#python", "#go", "#javascript",
"#rust", "#python", "#javascript", "#python",
"#typescript", "#python", "#javascript",
]
# Update counts
for hashtag in tweet_stream:
cms.update(hashtag)
# Query frequencies
print("Hashtag frequencies:")
unique_tags = set(tweet_stream)
for tag in sorted(unique_tags):
estimated = cms.query(tag)
actual = tweet_stream.count(tag)
print(f" {tag}: estimated={estimated}, actual={actual}")
# Find top-3 trending
top_3 = cms.top_k(list(unique_tags), 3)
print(f"\nTop 3 trending:")
for tag, count in top_3:
print(f" {tag}: ~{count} tweets")
Count-Min Sketch vs Exact Counter:
Exact: HashMap storing every item → O(n) space for n unique items
Count-Min: Fixed size regardless of unique items → O(w×d) space
Trade-off: Slight overestimation for massive space savings
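The width/depth rule of thumb in the docstring can be wrapped in a small sizing helper (a sketch; `cms_parameters` is not part of the class above):

```python
import math

def cms_parameters(epsilon: float, delta: float) -> tuple:
    """Width and depth for error bound epsilon with failure probability delta.

    width = ceil(e / epsilon), depth = ceil(ln(1 / delta))
    """
    width = math.ceil(math.e / epsilon)
    depth = math.ceil(math.log(1 / delta))
    return width, depth

# epsilon=0.01 (1% error), delta=0.01 (99% confidence)
print(cms_parameters(0.01, 0.01))  # (272, 5), matching the docstring example
```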
🔗 2. URL Shortener (bit.ly)
Use Case: bit.ly, TinyURL, goo.gl - convert long URLs to short codes
import string
import hashlib
from typing import Optional, Dict
class URLShortener:
"""
URL Shortener like bit.ly.
Requirements:
- Generate unique short codes (e.g., bit.ly/a3x9F)
- Redirect short -> long URL
- Handle billions of URLs
- Short codes should be 6-8 characters
Approaches:
1. Random generation + collision check
2. Hash-based (MD5) + truncate
3. Counter-based (base62 encoding) ← Most common
"""
# Base62: [0-9a-zA-Z] = 62 characters
BASE62 = string.digits + string.ascii_lowercase + string.ascii_uppercase
def __init__(self):
self.url_to_code: Dict[str, str] = {}
self.code_to_url: Dict[str, str] = {}
        self.counter = 62 ** 5  # Start at 62^5 (916,132,832), the first 6-char base62 value
def shorten_counter_based(self, long_url: str) -> str:
"""
Counter-based approach (most scalable).
Algorithm:
1. Increment global counter
2. Encode counter in base62
3. Store mapping
Pros: Guaranteed unique, predictable length
Cons: Sequential (can predict next URL)
        Capacity: 62^6 ≈ 56.8 billion unique URLs with 6 characters
"""
# Check if already shortened
if long_url in self.url_to_code:
return self.url_to_code[long_url]
# Generate short code
short_code = self._encode_base62(self.counter)
self.counter += 1
# Store bidirectional mapping
self.url_to_code[long_url] = short_code
self.code_to_url[short_code] = long_url
return short_code
def shorten_hash_based(self, long_url: str) -> str:
"""
Hash-based approach.
Algorithm:
1. Hash URL (MD5)
2. Take first 6 characters of base62-encoded hash
3. Handle collisions with counter suffix
Pros: Non-sequential, deterministic for same URL
Cons: Collision handling needed
"""
# MD5 hash
hash_digest = hashlib.md5(long_url.encode()).hexdigest()
# Take first 6 hex chars and convert to base62
hash_int = int(hash_digest[:8], 16)
short_code = self._encode_base62(hash_int)[:6]
# Handle collision
original_code = short_code
suffix = 0
while short_code in self.code_to_url:
suffix += 1
short_code = original_code + str(suffix)
# Store mapping
self.url_to_code[long_url] = short_code
self.code_to_url[short_code] = long_url
return short_code
def expand(self, short_code: str) -> Optional[str]:
"""Expand short code to original URL"""
return self.code_to_url.get(short_code)
def _encode_base62(self, num: int) -> str:
"""
Encode integer to base62 string.
        Example: 12345 -> "3d7" in base62
"""
if num == 0:
return self.BASE62[0]
result = []
while num > 0:
result.append(self.BASE62[num % 62])
num //= 62
return ''.join(reversed(result))
def _decode_base62(self, code: str) -> int:
"""Decode base62 string to integer"""
num = 0
for char in code:
num = num * 62 + self.BASE62.index(char)
return num
# Example usage
print("\n=== URL Shortener Example ===")
shortener = URLShortener()
# Shorten URLs
long_url1 = "https://www.example.com/very/long/path/to/resource?param=value"
long_url2 = "https://github.com/user/repo/pull/12345"
short1 = shortener.shorten_counter_based(long_url1)
short2 = shortener.shorten_counter_based(long_url2)
print(f"Original: {long_url1}")
print(f"Shortened: bit.ly/{short1}")
print()
print(f"Original: {long_url2}")
print(f"Shortened: bit.ly/{short2}")
# Expand
print(f"\nExpanding bit.ly/{short1}:")
print(f"-> {shortener.expand(short1)}")
# Calculate capacity
print("\n=== Capacity Analysis ===")
chars = len(URLShortener.BASE62)
for length in range(4, 9):
capacity = chars ** length
print(f"{length} characters: {capacity:,} unique URLs")
Time Complexity: O(1) for shorten and expand
Space Complexity: O(n) where n = number of URLs
Production Considerations:
- Use distributed counter (Redis INCR, DB sequence, Snowflake ID)
- Add expiration (delete old URLs)
- Rate limiting (prevent abuse)
- Analytics (track clicks)
- Custom aliases (vanity URLs)
Distributed URL Shortener
import time
class SnowflakeID:
"""
Twitter Snowflake ID generator for distributed systems.
64-bit ID structure:
- 1 bit: unused (always 0)
- 41 bits: timestamp (milliseconds since epoch) → 69 years
- 10 bits: machine/datacenter ID → 1024 machines
- 12 bits: sequence number → 4096 IDs per ms per machine
Guarantees:
- Globally unique across all machines
- Roughly time-ordered (sortable)
- No coordination needed between machines
"""
def __init__(self, machine_id: int, epoch: int = 1609459200000):
"""
machine_id: Unique ID for this machine (0-1023)
epoch: Custom epoch (default: Jan 1, 2021)
"""
self.machine_id = machine_id & 0x3FF # 10 bits
self.epoch = epoch
self.sequence = 0
self.last_timestamp = -1
def generate(self) -> int:
"""Generate unique 64-bit ID"""
timestamp = int(time.time() * 1000) # milliseconds
if timestamp < self.last_timestamp:
raise Exception("Clock moved backwards!")
if timestamp == self.last_timestamp:
# Same millisecond - increment sequence
self.sequence = (self.sequence + 1) & 0xFFF # 12 bits
if self.sequence == 0:
# Sequence exhausted - wait for next ms
while timestamp <= self.last_timestamp:
timestamp = int(time.time() * 1000)
else:
# New millisecond - reset sequence
self.sequence = 0
self.last_timestamp = timestamp
# Construct ID: timestamp | machine_id | sequence
return ((timestamp - self.epoch) << 22) | (self.machine_id << 12) | self.sequence
# Example: Distributed URL shortener
class DistributedURLShortener:
"""URL Shortener for distributed system with multiple servers"""
BASE62 = string.digits + string.ascii_lowercase + string.ascii_uppercase
def __init__(self, machine_id: int):
self.id_generator = SnowflakeID(machine_id)
self.code_to_url: Dict[str, str] = {}
def shorten(self, long_url: str) -> str:
"""Generate short code using Snowflake ID"""
unique_id = self.id_generator.generate()
        short_code = self._encode_base62(unique_id)[:7]  # NOTE: truncation drops the uniqueness guarantee; store the full encoding in production
self.code_to_url[short_code] = long_url
return short_code
def _encode_base62(self, num: int) -> str:
if num == 0:
return self.BASE62[0]
result = []
while num > 0:
result.append(self.BASE62[num % 62])
num //= 62
return ''.join(reversed(result))
# Simulate multiple servers
print("\n=== Distributed URL Shortener ===")
server1 = DistributedURLShortener(machine_id=1)
server2 = DistributedURLShortener(machine_id=2)
url = "https://example.com/page"
code1 = server1.shorten(url)
code2 = server2.shorten(url)
print(f"Server 1 generated: {code1}")
print(f"Server 2 generated: {code2}")
print(f"Unique: {code1 != code2}") # True - different servers generate different IDs
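The 64-bit layout (41/10/12 bits) can be checked by unpacking an ID back into its fields. A small sketch; `unpack_snowflake` is a hypothetical helper, not part of the `SnowflakeID` class:

```python
def unpack_snowflake(snowflake_id: int) -> dict:
    """Split a 64-bit Snowflake ID back into its timestamp, machine, and sequence fields."""
    return {
        "timestamp_ms": snowflake_id >> 22,          # 41 bits (milliseconds since custom epoch)
        "machine_id": (snowflake_id >> 12) & 0x3FF,  # 10 bits
        "sequence": snowflake_id & 0xFFF,            # 12 bits
    }

# Rebuild an ID from known fields (same packing as SnowflakeID.generate) and round-trip it
fields = {"timestamp_ms": 123456789, "machine_id": 42, "sequence": 7}
packed = (fields["timestamp_ms"] << 22) | (fields["machine_id"] << 12) | fields["sequence"]
print(unpack_snowflake(packed) == fields)  # True
```

Because the timestamp occupies the high bits, sorting IDs numerically also sorts them by creation time.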
🚦 3. Rate Limiting Strategies
Protect APIs from abuse and ensure fair resource allocation.
Token Bucket Algorithm
Use Case: AWS API Gateway, Stripe API, most RESTful APIs - allows bursts
import time
from typing import Dict
class TokenBucket:
"""
Token Bucket rate limiter.
Concept: Bucket holds tokens, refilled at constant rate.
- Each request consumes 1 token
- If no tokens available, request denied
- Allows bursts up to bucket capacity
Example: 100 requests/minute with burst of 20
- Capacity: 20 tokens
- Refill rate: 100/60 = 1.67 tokens/second
Pros: Simple, allows bursts, smooth rate limiting
Cons: Requires state per user
"""
def __init__(self, capacity: int, refill_rate: float):
"""
capacity: Maximum tokens in bucket (burst size)
refill_rate: Tokens added per second
"""
self.capacity = capacity
self.refill_rate = refill_rate
self.tokens = capacity
self.last_refill = time.time()
def allow_request(self) -> bool:
"""
Try to consume a token for this request.
Returns: True if allowed, False if rate limited
"""
# Refill tokens based on time elapsed
now = time.time()
elapsed = now - self.last_refill
self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
self.last_refill = now
# Try to consume a token
if self.tokens >= 1:
self.tokens -= 1
return True
else:
return False
class DistributedTokenBucket:
"""
Token Bucket with Redis for distributed systems.
Uses Redis to store:
- tokens: Current token count
- last_refill: Last refill timestamp
All servers share same bucket state.
"""
def __init__(self, user_id: str, capacity: int, refill_rate: float):
self.user_id = user_id
self.capacity = capacity
self.refill_rate = refill_rate
# In production: self.redis = redis.Redis()
def allow_request(self) -> bool:
"""Check rate limit using Redis"""
# Lua script for atomic refill + consume (prevents race conditions)
lua_script = """
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local tokens = tonumber(redis.call('GET', KEYS[1]) or capacity)
local last_refill = tonumber(redis.call('GET', KEYS[2]) or now)
local elapsed = now - last_refill
tokens = math.min(capacity, tokens + elapsed * refill_rate)
if tokens >= 1 then
tokens = tokens - 1
redis.call('SET', KEYS[1], tokens)
redis.call('SET', KEYS[2], now)
return 1
else
return 0
end
"""
# In production: result = redis.eval(lua_script, ...)
# Simulated for example
return True
# Example: API rate limiting
print("\n=== Token Bucket Example ===")
bucket = TokenBucket(capacity=5, refill_rate=1.0) # 5 burst, 1 token/sec
print("Making 7 rapid requests:")
for i in range(7):
allowed = bucket.allow_request()
status = "✓ Allowed" if allowed else "✗ Rate Limited"
print(f" Request {i+1}: {status} ({bucket.tokens:.2f} tokens remaining)")
print("\nWaiting 3 seconds for refill...")
time.sleep(3)
print("\nAfter refill:")
allowed = bucket.allow_request()
print(f" Request: {'✓ Allowed' if allowed else '✗ Rate Limited'} ({bucket.tokens:.2f} tokens)")
Sliding Window Log
Use Case: More accurate than fixed window, used when burst control is critical
import time
from collections import deque
from typing import Deque
class SlidingWindowLog:
"""
Sliding Window Log rate limiter.
Concept: Keep log of all request timestamps, count requests in window.
- Window slides with time (not fixed intervals)
- More accurate than fixed window
- No boundary issues
Example: 100 requests per minute
- At any point in time, count requests in last 60 seconds
- If < 100, allow; else deny
Pros: Very accurate, no boundary effects
Cons: High memory (stores all timestamps)
"""
def __init__(self, max_requests: int, window_seconds: int):
"""
max_requests: Maximum requests allowed in window
window_seconds: Size of sliding window
"""
self.max_requests = max_requests
self.window_seconds = window_seconds
self.request_log: Deque[float] = deque()
def allow_request(self) -> bool:
"""Check if request is allowed"""
now = time.time()
window_start = now - self.window_seconds
# Remove old requests outside window
while self.request_log and self.request_log[0] < window_start:
self.request_log.popleft()
# Check if we're under limit
if len(self.request_log) < self.max_requests:
self.request_log.append(now)
return True
else:
return False
class SlidingWindowCounter:
"""
Sliding Window Counter (hybrid approach).
Concept: Combine current + previous window with weighted average.
- Less memory than log (only 2 counters)
- More accurate than fixed window
Formula:
count = prev_window_count * overlap_percent + current_window_count
Example: Limit 100 req/min, window size 1 min
- At 12:00:30 (30 sec into current minute):
- Previous minute (11:59-12:00): had 80 requests
- Current minute (12:00-12:01): has 40 requests
- Overlap: 50% of previous window
- Estimated count: 80 * 0.5 + 40 = 80 requests
"""
def __init__(self, max_requests: int, window_seconds: int):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.current_window_count = 0
self.current_window_start = time.time()
self.prev_window_count = 0
def allow_request(self) -> bool:
now = time.time()
elapsed = now - self.current_window_start
if elapsed >= self.window_seconds:
# Move to next window
self.prev_window_count = self.current_window_count
self.current_window_count = 0
self.current_window_start = now
elapsed = 0
# Calculate weighted count
overlap_percent = (self.window_seconds - elapsed) / self.window_seconds
estimated_count = (self.prev_window_count * overlap_percent +
self.current_window_count)
if estimated_count < self.max_requests:
self.current_window_count += 1
return True
else:
return False
# Example comparison
print("\n=== Sliding Window Comparison ===")
# Sliding Window Log (accurate but memory-intensive)
swl = SlidingWindowLog(max_requests=5, window_seconds=10)
print("Sliding Window Log (5 requests per 10 seconds):")
for i in range(8):
allowed = swl.allow_request()
print(f" Request {i+1}: {'✓' if allowed else '✗'} ({len(swl.request_log)} in window)")
if i == 4:
print(" Waiting 6 seconds...")
time.sleep(6)
Rate Limiter Comparison:
| Algorithm | Memory | Accuracy | Bursts | Use Case |
|---|---|---|---|---|
| Token Bucket | O(1) | Good | Allows | APIs, AWS Gateway |
| Leaky Bucket | O(1) | Good | Smooths | Network traffic shaping |
| Fixed Window | O(1) | Poor | Boundary issues | Simple rate limiting |
| Sliding Window Log | O(n) | Excellent | Controlled | Accurate limiting needed |
| Sliding Window Counter | O(1) | Very Good | Controlled | Best balance (Redis) |
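The Leaky Bucket row in the table has no implementation in this section. A minimal sketch of the meter variant (class and parameter names are illustrative): requests fill the bucket, which drains at a constant rate, so output is smoothed rather than bursty.

```python
import time

class LeakyBucket:
    """Leaky Bucket (meter variant): each request adds one unit of 'water';
    the bucket drains at a constant rate, and arrivals that would overflow
    are rejected. Unlike Token Bucket, bursts are not allowed."""

    def __init__(self, capacity: float, leak_rate: float):
        self.capacity = capacity    # bucket size
        self.leak_rate = leak_rate  # units drained per second
        self.water = 0.0
        self.last_leak = time.time()

    def allow_request(self) -> bool:
        now = time.time()
        # Drain based on elapsed time since the last check
        self.water = max(0.0, self.water - (now - self.last_leak) * self.leak_rate)
        self.last_leak = now
        if self.water + 1 <= self.capacity:
            self.water += 1
            return True
        return False

bucket = LeakyBucket(capacity=3, leak_rate=1.0)
results = [bucket.allow_request() for _ in range(5)]
print(results)  # first 3 allowed, then rejected until the bucket drains
```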
📰 4. News Feed / Fan-out Service
Use Case: Twitter timeline, Facebook news feed, Instagram feed
from typing import List, Dict, Set
from dataclasses import dataclass
from datetime import datetime
import heapq
@dataclass
class Post:
post_id: int
author_id: int
content: str
timestamp: datetime
def __lt__(self, other):
return self.timestamp > other.timestamp # Reverse for max heap
class FanoutService:
"""
News Feed implementation with two strategies:
1. Fan-out on Write (Push model):
- When user posts, immediately push to all followers' feeds
- Pros: Fast reads (feed pre-generated)
- Cons: Slow writes (celebrity with 100M followers = 100M writes)
2. Fan-out on Read (Pull model):
- When user requests feed, fetch from followed users
- Pros: Fast writes, no wasted work
- Cons: Slow reads (need to query many users)
Hybrid: Use push for normal users, pull for celebrities
"""
def __init__(self):
# User -> Set of followers
self.followers: Dict[int, Set[int]] = {}
# User -> Set of users they follow
self.following: Dict[int, Set[int]] = {}
# User -> Posts they created
self.user_posts: Dict[int, List[Post]] = {}
# User -> Pre-generated feed (fan-out on write)
self.user_feeds: Dict[int, List[Post]] = {}
def follow(self, follower_id: int, followee_id: int):
"""User follows another user"""
if followee_id not in self.followers:
self.followers[followee_id] = set()
self.followers[followee_id].add(follower_id)
if follower_id not in self.following:
self.following[follower_id] = set()
self.following[follower_id].add(followee_id)
def post_fan_out_on_write(self, user_id: int, content: str, post_id: int):
"""
Fan-out on Write: Push post to all followers' feeds.
Process:
1. Create post
2. Get all followers
3. Add post to each follower's feed
Time Complexity: O(F) where F = number of followers
"""
post = Post(post_id, user_id, content, datetime.now())
# Store in user's posts
if user_id not in self.user_posts:
self.user_posts[user_id] = []
self.user_posts[user_id].append(post)
# Fan-out: Push to all followers
followers = self.followers.get(user_id, set())
for follower_id in followers:
if follower_id not in self.user_feeds:
self.user_feeds[follower_id] = []
self.user_feeds[follower_id].append(post)
# Also add to own feed
if user_id not in self.user_feeds:
self.user_feeds[user_id] = []
self.user_feeds[user_id].append(post)
def get_feed_push_model(self, user_id: int, limit: int = 10) -> List[Post]:
"""
Get feed using push model (fan-out on write).
Time Complexity: O(F log F) where F = feed size (for sorting)
Very fast! Feed already generated.
"""
feed = self.user_feeds.get(user_id, [])
feed.sort(key=lambda p: p.timestamp, reverse=True)
return feed[:limit]
def get_feed_pull_model(self, user_id: int, limit: int = 10) -> List[Post]:
"""
Get feed using pull model (fan-out on read).
Process:
1. Get all users this user follows
2. Fetch recent posts from each
3. Merge and sort (k-way merge)
        Time Complexity: O(P log P) where P = total posts fetched
        (a heap-based k-way merge would reduce this to O(P log N))
        Slower reads, but no write amplification
"""
following = self.following.get(user_id, set())
# Collect recent posts from all followed users (including self)
all_posts = []
for followed_id in following | {user_id}:
posts = self.user_posts.get(followed_id, [])
all_posts.extend(posts)
# Sort by timestamp and return top N
all_posts.sort(key=lambda p: p.timestamp, reverse=True)
return all_posts[:limit]
def get_feed_hybrid(self, user_id: int, limit: int = 10) -> List[Post]:
"""
Hybrid approach (Twitter/Facebook style).
Strategy:
- Normal users: Fan-out on write (push model)
- Celebrities (>1M followers): Pull model
- Mix results using merge
This balances:
- Write cost: Don't fan-out to millions
- Read performance: Pre-generate for most users
"""
CELEBRITY_THRESHOLD = 1000000
feed_posts = []
celebrity_posts = []
following = self.following.get(user_id, set())
for followed_id in following:
follower_count = len(self.followers.get(followed_id, set()))
if follower_count > CELEBRITY_THRESHOLD:
# Celebrity: Pull their posts
posts = self.user_posts.get(followed_id, [])
celebrity_posts.extend(posts)
# else: Their posts already in our feed (pushed)
# Merge pre-generated feed + celebrity posts
feed_posts = self.user_feeds.get(user_id, [])
all_posts = feed_posts + celebrity_posts
all_posts.sort(key=lambda p: p.timestamp, reverse=True)
return all_posts[:limit]
# Example: Twitter-like feed
print("\n=== News Feed Example ===")
feed_service = FanoutService()
# Users
alice_id = 1
bob_id = 2
charlie_id = 3
# Follow relationships
feed_service.follow(alice_id, bob_id) # Alice follows Bob
feed_service.follow(alice_id, charlie_id) # Alice follows Charlie
feed_service.follow(charlie_id, bob_id) # Charlie follows Bob
# Posts (fan-out on write)
feed_service.post_fan_out_on_write(bob_id, "Hello from Bob!", post_id=101)
feed_service.post_fan_out_on_write(charlie_id, "Charlie's update", post_id=102)
feed_service.post_fan_out_on_write(bob_id, "Bob again!", post_id=103)
# Alice's feed
print("Alice's feed (push model):")
alice_feed = feed_service.get_feed_push_model(alice_id, limit=5)
for post in alice_feed:
print(f" @user{post.author_id}: {post.content}")
print("\nAlice's feed (pull model):")
alice_feed_pull = feed_service.get_feed_pull_model(alice_id, limit=5)
for post in alice_feed_pull:
print(f" @user{post.author_id}: {post.content}")
Celebrity Problem (Fan-out on Write):
Elon Musk posts tweet → 100M followers
= 100M feed writes = database overload!
Solution: Don't fan-out celebrities. When users request feed, merge:
1. Pre-generated feed (from normal follows)
2. Real-time pull from celebrities
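The k-way merge mentioned in the pull model can be done lazily with `heapq.merge` instead of collecting and sorting everything. A sketch over plain `(timestamp, content)` tuples rather than the `Post` class above; the data is illustrative:

```python
import heapq

# Each followed user's posts, kept newest-first (as a per-user feed store would)
bob_posts = [(103, "Bob again!"), (101, "Hello from Bob!")]
charlie_posts = [(102, "Charlie's update")]

# heapq.merge lazily merges k sorted streams in O(total * log k);
# negating the timestamp in the key keeps newest posts first
merged = heapq.merge(bob_posts, charlie_posts, key=lambda p: -p[0])
top_2 = [content for _, content in list(merged)[:2]]
print(top_2)  # ['Bob again!', "Charlie's update"]
```

Since the merge is lazy, the feed service can stop after pulling `limit` posts instead of materializing every follower's history.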
🔤 5. Autocomplete / Trie
Use Case: Google search autocomplete, IDE autocomplete, spell checker, IP routing
from typing import List, Optional, Dict
class TrieNode:
def __init__(self):
self.children: Dict[str, TrieNode] = {}
self.is_end_of_word = False
self.frequency = 0 # How often this word is searched
class Trie:
"""
Trie (Prefix Tree) for efficient autocomplete.
Properties:
- Each node represents a character
- Path from root to node = prefix
- Words sharing prefix share path
Time Complexity:
- Insert: O(L) where L = word length
- Search: O(L)
- Autocomplete: O(P + N) where P = prefix length, N = results
Space Complexity: O(ALPHABET_SIZE × N × L) where N = words
"""
def __init__(self):
self.root = TrieNode()
def insert(self, word: str, frequency: int = 1):
"""Insert word into trie with frequency (for ranking)"""
node = self.root
for char in word:
if char not in node.children:
node.children[char] = TrieNode()
node = node.children[char]
node.is_end_of_word = True
node.frequency += frequency
def search(self, word: str) -> bool:
"""Check if word exists"""
node = self._find_node(word)
return node is not None and node.is_end_of_word
def starts_with(self, prefix: str) -> bool:
"""Check if any word starts with prefix"""
return self._find_node(prefix) is not None
def autocomplete(self, prefix: str, limit: int = 10) -> List[tuple]:
"""
Get top autocomplete suggestions for prefix.
Returns: List of (word, frequency) tuples, sorted by frequency
"""
# Find node for prefix
node = self._find_node(prefix)
if node is None:
return []
# DFS to find all words with this prefix
results = []
self._dfs_words(node, prefix, results)
# Sort by frequency (descending) and return top N
results.sort(key=lambda x: x[1], reverse=True)
return results[:limit]
def _find_node(self, prefix: str) -> Optional[TrieNode]:
"""Navigate to node representing prefix"""
node = self.root
for char in prefix:
if char not in node.children:
return None
node = node.children[char]
return node
def _dfs_words(self, node: TrieNode, current_word: str, results: List[tuple]):
"""DFS to collect all words from this node"""
if node.is_end_of_word:
results.append((current_word, node.frequency))
for char, child_node in node.children.items():
self._dfs_words(child_node, current_word + char, results)
class AutocompleteSystem:
"""
Production autocomplete system with caching.
Optimizations:
1. Cache top results for popular prefixes
2. Limit search depth
3. Pre-compute top suggestions
"""
def __init__(self):
self.trie = Trie()
self.cache: Dict[str, List[tuple]] = {}
def add_search(self, query: str):
"""Add search query (increments frequency)"""
self.trie.insert(query.lower(), frequency=1)
# Invalidate cache for this prefix
self._invalidate_cache(query.lower())
def suggest(self, prefix: str, limit: int = 10) -> List[str]:
"""Get autocomplete suggestions"""
prefix = prefix.lower()
# Check cache
if prefix in self.cache:
return [word for word, _ in self.cache[prefix][:limit]]
# Query trie
results = self.trie.autocomplete(prefix, limit)
# Cache results
self.cache[prefix] = results
return [word for word, _ in results]
def _invalidate_cache(self, query: str):
"""Invalidate cache for all prefixes of this query"""
for i in range(1, len(query) + 1):
prefix = query[:i]
if prefix in self.cache:
del self.cache[prefix]
# Example: Google-style autocomplete
print("\n=== Autocomplete System Example ===")
autocomplete = AutocompleteSystem()
# Simulate search history
searches = [
("python tutorial", 1000),
("python interview questions", 800),
("python list comprehension", 500),
("java tutorial", 700),
("javascript promises", 600),
("python decorators", 400),
]
for query, freq in searches:
for _ in range(freq):
autocomplete.add_search(query)
# User types "pyth"
print("User types 'pyth':")
suggestions = autocomplete.suggest("pyth", limit=5)
for i, suggestion in enumerate(suggestions, 1):
print(f" {i}. {suggestion}")
print("\nUser types 'python ':")
suggestions = autocomplete.suggest("python ", limit=5)
for i, suggestion in enumerate(suggestions, 1):
print(f" {i}. {suggestion}")
Real-World Optimizations:
1. Google: Pre-computes top 10 suggestions for every prefix, stores in cache
2. Personalization: Boost suggestions based on user history
3. Geographic: Different suggestions by location
4. Trending: Real-time injection of trending topics
5. Distributed: Sharded trie across multiple servers by prefix hash
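Point 5 (sharding the trie by prefix hash) can be sketched as a simple router. `shard_for` is a hypothetical helper, not part of `AutocompleteSystem`; the key idea is that all extensions of a short prefix must land on the same shard so one server can answer the whole suggestion query:

```python
import hashlib

def shard_for(prefix: str, num_shards: int) -> int:
    """Route a query prefix to the server holding that subtree of the trie.

    Hashing only the first few characters guarantees 'py', 'pyt', and
    'python' all map to the same shard."""
    head = prefix[:2]  # route on the first 2 chars; longer prefixes share the shard
    digest = hashlib.md5(head.encode()).hexdigest()
    return int(digest, 16) % num_shards

# All "py..." queries land on the same shard
print(shard_for("py", 8) == shard_for("python", 8))  # True
```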
🌐 6. Distributed Systems Patterns
Sharding / Partitioning
Use Case: Distribute data across multiple databases for horizontal scaling
import hashlib
from typing import Any, List, Dict
class ShardingStrategy:
"""Base class for sharding strategies"""
def get_shard(self, key: str) -> int:
raise NotImplementedError
class HashBasedSharding(ShardingStrategy):
"""
Hash-based sharding: hash(key) % num_shards
Pros: Even distribution
Cons: Adding/removing shards requires rehashing all data
"""
def __init__(self, num_shards: int):
self.num_shards = num_shards
def get_shard(self, key: str) -> int:
hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
return hash_value % self.num_shards
class RangeBasedSharding(ShardingStrategy):
"""
Range-based sharding: key ranges mapped to shards
Example: user_id 0-1M → shard 0, 1M-2M → shard 1
Pros: Easy to add shards, range queries efficient
Cons: Can lead to hotspots if data not evenly distributed
"""
def __init__(self, ranges: List[tuple]):
"""ranges: [(min, max, shard_id), ...]"""
self.ranges = sorted(ranges, key=lambda x: x[0])
def get_shard(self, key: str) -> int:
# Assume key is numeric for range-based
key_int = int(key)
for min_val, max_val, shard_id in self.ranges:
if min_val <= key_int < max_val:
return shard_id
return 0 # Default shard
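The linear scan in `get_shard` is O(number of ranges). For many ranges, a binary search over sorted upper bounds using the stdlib `bisect` module gives O(log n) lookups; this is a sketch with illustrative boundaries, not a drop-in replacement.

```python
import bisect

# Upper bounds (exclusive) of shards 0..2; anything >= 75 falls to shard 3
boundaries = [25, 50, 75]
shard_ids = [0, 1, 2, 3]

def get_shard(key_int: int) -> int:
    # bisect_right finds how many boundaries the key has passed
    return shard_ids[bisect.bisect_right(boundaries, key_int)]

print(get_shard(10))  # 0
print(get_shard(25))  # 1  (ranges are half-open, [min, max))
print(get_shard(99))  # 3
```

This matches the half-open `[min, max)` semantics of the ranges in the example above.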
class GeoBasedSharding(ShardingStrategy):
"""
Geography-based sharding: route by location
Example: US users → US datacenter, EU users → EU datacenter
Pros: Low latency (data close to users)
Cons: Uneven distribution, cross-region queries complex
"""
def __init__(self, geo_mapping: Dict[str, int]):
"""geo_mapping: {'US': 0, 'EU': 1, 'ASIA': 2}"""
self.geo_mapping = geo_mapping
def get_shard(self, key: str) -> int:
# Extract region from key (e.g., "user_US_12345")
parts = key.split('_')
region = parts[1] if len(parts) > 1 else 'US'
return self.geo_mapping.get(region, 0)
class ShardedDatabase:
"""Simulated sharded database"""
def __init__(self, num_shards: int, strategy: ShardingStrategy):
self.shards = [{} for _ in range(num_shards)]
self.strategy = strategy
def put(self, key: str, value: Any):
"""Store key-value in appropriate shard"""
shard_id = self.strategy.get_shard(key)
self.shards[shard_id][key] = value
def get(self, key: str) -> Any:
"""Retrieve value from appropriate shard"""
shard_id = self.strategy.get_shard(key)
return self.shards[shard_id].get(key)
def get_shard_stats(self) -> List[int]:
"""Get number of items in each shard"""
return [len(shard) for shard in self.shards]
# Example: Sharding user data
print("\n=== Database Sharding Example ===")
# Hash-based sharding
hash_strategy = HashBasedSharding(num_shards=4)
db = ShardedDatabase(num_shards=4, strategy=hash_strategy)
# Insert users
users = [f"user_{i}" for i in range(100)]
for user in users:
db.put(user, {"name": user, "email": f"{user}@example.com"})
print("Hash-based sharding distribution:")
stats = db.get_shard_stats()
for i, count in enumerate(stats):
print(f" Shard {i}: {count} users ({count/100*100:.1f}%)")
# Range-based sharding
print("\nRange-based sharding:")
ranges = [(0, 25, 0), (25, 50, 1), (50, 75, 2), (75, 100, 3)]
range_strategy = RangeBasedSharding(ranges)
db_range = ShardedDatabase(num_shards=4, strategy=range_strategy)
for i in range(100):
db_range.put(str(i), f"user_{i}")
stats = db_range.get_shard_stats()
for i, count in enumerate(stats):
print(f" Shard {i}: {count} users ({count/100*100:.1f}%)")
Idempotency Key Pattern
Use Case: Stripe payments, ensure exactly-once processing despite retries
import uuid
from typing import Any, Dict
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class IdempotentResult:
"""Cached result of idempotent operation"""
request_id: str
result: Any
timestamp: datetime
class IdempotencyManager:
"""
Idempotency key manager for exactly-once processing.
Pattern:
1. Client generates unique request ID (UUID)
2. Server checks if request ID already processed
3. If yes: return cached result
4. If no: process request, cache result with request ID
Prevents:
- Duplicate charges (payment retries)
- Duplicate messages (network retries)
- Duplicate database writes
TTL: Results cached for 24 hours
"""
def __init__(self, ttl_hours: int = 24):
self.cache: Dict[str, IdempotentResult] = {}
self.ttl = timedelta(hours=ttl_hours)
def execute(self, idempotency_key: str, operation: callable) -> Any:
"""
Execute operation idempotently.
Args:
idempotency_key: Unique key for this request
operation: Function to execute (only if not cached)
Returns: Result (from cache or fresh execution)
"""
# Check cache
if idempotency_key in self.cache:
cached = self.cache[idempotency_key]
# Check TTL
if datetime.now() - cached.timestamp < self.ttl:
print(f" [CACHE HIT] Returning cached result for {idempotency_key}")
return cached.result
else:
# Expired - remove from cache
del self.cache[idempotency_key]
# Execute operation
print(f" [CACHE MISS] Executing operation for {idempotency_key}")
result = operation()
# Cache result
self.cache[idempotency_key] = IdempotentResult(
request_id=idempotency_key,
result=result,
timestamp=datetime.now()
)
return result
def cleanup_expired(self):
"""Remove expired entries"""
now = datetime.now()
expired_keys = [
key for key, cached in self.cache.items()
if now - cached.timestamp >= self.ttl
]
for key in expired_keys:
del self.cache[key]
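One caveat worth noting: the check-then-execute sequence in `execute` is racy under concurrency, since two requests with the same key could both miss the cache and run the operation twice. A minimal fix is to make check-and-set atomic with a lock (production systems get the same effect with an atomic store operation, e.g. Redis `SET NX`). A sketch, with simplified types:

```python
import threading

class ThreadSafeIdempotency:
    """Idempotency cache with an atomic check-and-set (no TTL, for brevity)."""

    def __init__(self):
        self._cache = {}
        self._lock = threading.Lock()

    def execute(self, key, operation):
        with self._lock:
            if key in self._cache:
                return self._cache[key]       # duplicate request: cached result
            result = operation()              # held under lock for simplicity
            self._cache[key] = result
            return result

mgr = ThreadSafeIdempotency()
calls = []
for _ in range(3):
    result = mgr.execute("req-1", lambda: calls.append(1) or "done")
print(len(calls))  # operation ran once despite three calls
```

Holding the lock across `operation()` serializes all requests; a finer-grained design would use per-key locks or an "in progress" marker.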
# Example: Stripe-style payment processing
class PaymentProcessor:
"""Simulated payment processor with idempotency"""
def __init__(self):
self.idempotency = IdempotencyManager()
self.payment_count = 0
def charge(self, idempotency_key: str, amount: float, customer: str) -> Dict:
"""
Process payment idempotently.
If client retries with same key, returns original result.
"""
def process_charge():
self.payment_count += 1
charge_id = f"ch_{uuid.uuid4().hex[:16]}"
print(f" Processing charge: ${amount} for {customer}")
return {
"charge_id": charge_id,
"amount": amount,
"customer": customer,
"status": "succeeded"
}
return self.idempotency.execute(idempotency_key, process_charge)
# Example usage
print("\n=== Idempotency Pattern Example ===")
processor = PaymentProcessor()
# Client generates idempotency key
request_id = str(uuid.uuid4())
# Original request
print("Request 1 (original):")
result1 = processor.charge(request_id, 100.0, "customer_123")
print(f" Charge ID: {result1['charge_id']}")
# Network fails, client retries with SAME idempotency key
print("\nRequest 2 (retry with same key):")
result2 = processor.charge(request_id, 100.0, "customer_123")
print(f" Charge ID: {result2['charge_id']}")
# Verify only ONE charge was processed
print(f"\nTotal charges processed: {processor.payment_count}")
print(f"Same result: {result1['charge_id'] == result2['charge_id']}")
Stripe Implementation Details:
1. Client sends an Idempotency-Key header with a unique UUID
2. Stripe stores (key → result) in Redis with a 24hr TTL
3. Retry with the same key returns the cached result
4. A different key means a new operation
5. Prevents duplicate charges from network retries/timeouts
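Step 1 on the client side can be sketched with the stdlib: attach the key as an HTTP header on the request. The URL and payload below are illustrative placeholders, not a real Stripe endpoint, and the request is built but not sent.

```python
import uuid
import urllib.request

# Build (but do not send) a POST request carrying the idempotency key
req = urllib.request.Request(
    url="https://api.example.com/v1/charges",
    data=b"amount=1000&currency=usd",
    headers={"Idempotency-Key": str(uuid.uuid4())},
    method="POST",
)
# urllib normalizes header names to capitalized form internally
print(req.get_header("Idempotency-key"))
```

The client must persist the key across retries of the same logical request; generating a fresh UUID per retry would defeat the whole pattern.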
🎓 Complete System Design Implementation Coverage
Part 1: Dijkstra, A*, TSP, Geohash, QuadTree, LRU Cache, Consistent Hashing
Part 2: Bloom Filter, Count-Min Sketch, URL Shortener, Rate Limiters, News Feed Fan-out, Autocomplete Trie, Sharding, Idempotency
All implementations include complexity analysis, real-world use cases, and production considerations for Distinguished Engineer interviews.