💻 System Design - Python Implementations Part 2

Probabilistic data structures, rate limiting, URL shortening, and more

🎲 1. Probabilistic Data Structures

Space-efficient structures that trade perfect accuracy for massive memory savings. Essential for large-scale systems.

Bloom Filter - Membership Testing

Use Case: Check if username exists (avoid DB query), spam detection, cache filtering, distributed systems deduplication
import hashlib
from typing import List

class BloomFilter:
    """
    Bloom Filter: Probabilistic set membership testing.

    Properties:
    - False positives possible (says "yes" when should be "no")
    - False negatives NEVER (if says "no", definitely not in set)
    - Space-efficient: ~10 bits per element for 1% false positive rate

    Time Complexity: O(k) where k = number of hash functions
    Space Complexity: O(m) where m = bit array size

    Real-world: Bitcoin SPV wallets use Bloom filters, Medium uses them
    for read history, and Chrome has used them for malicious URL detection
    """

    def __init__(self, size: int, num_hash_functions: int):
        """
        size: Size of bit array (larger = fewer false positives)
        num_hash_functions: Number of hash functions (optimal k = m/n * ln(2))

        Rule of thumb:
        - For 1% FP rate: size = 10 * expected_items
        - For 0.1% FP rate: size = 15 * expected_items
        """
        self.size = size
        self.num_hash_functions = num_hash_functions
        self.bit_array = [0] * size

    def _hash(self, item: str, seed: int) -> int:
        """Generate hash with seed for multiple hash functions"""
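        # Seed goes first with a separator so that e.g. ("x1", 0) and ("x", 10)
        # do not hash the same concatenated string
        h = hashlib.md5(f"{seed}:{item}".encode())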
        return int(h.hexdigest(), 16) % self.size

    def add(self, item: str) -> None:
        """Add item to the filter"""
        for i in range(self.num_hash_functions):
            index = self._hash(item, i)
            self.bit_array[index] = 1

    def contains(self, item: str) -> bool:
        """
        Check if item might be in the set.

        Returns:
        - True: Maybe in set (could be false positive)
        - False: Definitely NOT in set (100% accurate)
        """
        for i in range(self.num_hash_functions):
            index = self._hash(item, i)
            if self.bit_array[index] == 0:
                return False  # Definitely not in set
        return True  # Probably in set

    def false_positive_rate(self, n: int) -> float:
        """
        Calculate theoretical false positive rate.

        Formula: (1 - e^(-kn/m))^k
        where k = hash functions, n = items, m = size
        """
        import math
        k = self.num_hash_functions
        m = self.size
        return (1 - math.exp(-k * n / m)) ** k

# Example: Check if username is taken
print("=== Bloom Filter Example ===")
bf = BloomFilter(size=1000, num_hash_functions=3)

# Add existing usernames
existing_users = ["alice", "bob", "charlie", "david", "eve"]
for user in existing_users:
    bf.add(user)

# Check membership
print(f"'alice' exists: {bf.contains('alice')}")  # True (correct)
print(f"'frank' exists: {bf.contains('frank')}")  # False (correct)
print(f"'xyz' exists: {bf.contains('xyz')}")      # Might be True (false positive)

# Calculate FP rate with 5 items
fp_rate = bf.false_positive_rate(5)
print(f"\nFalse positive rate with 5 items: {fp_rate*100:.2f}%")

# Practical usage
def check_username_available(username: str, bloom: BloomFilter) -> bool:
    """
    Two-stage check:
    1. Bloom filter (fast, in-memory)
    2. Database (slow, if Bloom says "maybe exists")
    """
    if not bloom.contains(username):
        # Definitely available! No DB query needed
        return True
    else:
        # Maybe taken - need to check DB
        # (This is where false positives cost a DB query)
        return check_database(username)  # Actual DB query

def check_database(username: str) -> bool:
    # Simulated DB query
    return username not in existing_users

# Test the two-stage check
print("\nTwo-stage username check:")
print(f"'newuser' available: {check_username_available('newuser', bf)}")  # No DB hit!
print(f"'alice' available: {check_username_available('alice', bf)}")      # DB hit
Memory Savings Example:
1 million usernames in a hash set: ~64MB (assuming ~64 bytes per entry: 8-byte pointer plus string data and overhead)
1 million usernames in Bloom filter (1% FP): ~1.2MB (~10 bits per element)
Result: ~98% memory reduction. Cost: about 1% of lookups for usernames that are not in the set hit the DB unnecessarily
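
The sizing rules of thumb above follow from the standard Bloom filter formulas m = -n × ln(p) / (ln 2)^2 and k = (m / n) × ln 2. A minimal sketch that evaluates them is shown here; the function name optimal_bloom_parameters is illustrative and not part of the BloomFilter class above.

```python
import math
from typing import Tuple

def optimal_bloom_parameters(expected_items: int, fp_rate: float) -> Tuple[int, int]:
    """Return (bit array size m, hash function count k) for a target FP rate."""
    if expected_items <= 0 or not 0 < fp_rate < 1:
        raise ValueError("expected_items must be positive and fp_rate in (0, 1)")
    # m = -n * ln(p) / (ln 2)^2
    m = math.ceil(-expected_items * math.log(fp_rate) / (math.log(2) ** 2))
    # k = (m / n) * ln 2, rounded to the nearest whole hash function
    k = max(1, round(m / expected_items * math.log(2)))
    return m, k

m, k = optimal_bloom_parameters(1_000_000, 0.01)
print(f"bits: {m:,}  hashes: {k}  bits/item: {m / 1_000_000:.2f}  size: {m / 8 / 1e6:.2f} MB")
```

For 1 million items at a 1% target this gives roughly 9.6 bits per item and 7 hash functions, which is where the "~10 bits per element" and "~1.2MB" figures come from.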

Count-Min Sketch - Frequency Estimation

Use Case: Twitter trending topics, top-K heavy hitters, network traffic analysis, DDoS detection
import hashlib
from typing import List

class CountMinSketch:
    """
    Count-Min Sketch: Estimate frequency of items in a stream.

    Properties:
    - Overestimates frequency (never underestimates)
    - Error bound: estimate <= true count + ε × N (N = total count of all
      updates) with probability at least 1 - δ
    - Space-efficient: much smaller than exact counting

    Time Complexity: O(d) for update and query, where d = depth
    Space Complexity: O(w × d) where w = width, d = depth

    Real-world: Twitter uses for trending topics, network routers for
    traffic analysis, Redis uses for approximate counting
    """

    def __init__(self, width: int, depth: int):
        """
        width: Number of counters per hash function (w)
        depth: Number of hash functions (d)

        Rule of thumb:
        - width = ceil(e / epsilon) where e = 2.718
        - depth = ceil(ln(1/delta))

        Example: For ε=0.01 (overcount at most 1% of the total stream count), δ=0.01 (99% confidence):
        - width = 272, depth = 5
        """
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _hash(self, item: str, seed: int) -> int:
        """Generate hash for item with seed"""
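        # Seed goes first with a separator so that e.g. ("x1", 0) and ("x", 10)
        # do not hash the same concatenated string
        h = hashlib.md5(f"{seed}:{item}".encode())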
        return int(h.hexdigest(), 16) % self.width

    def update(self, item: str, count: int = 1) -> None:
        """Increment count for item"""
        for i in range(self.depth):
            index = self._hash(item, i)
            self.table[i][index] += count

    def query(self, item: str) -> int:
        """
        Estimate frequency of item.

        Returns minimum across all hash functions (reduces overcount error).
        """
        min_count = float('inf')
        for i in range(self.depth):
            index = self._hash(item, i)
            min_count = min(min_count, self.table[i][index])
        return min_count

    def top_k(self, items: List[str], k: int) -> List[tuple]:
        """
        Find top-K most frequent items.

        Note: Need to track candidates separately, then query their counts.
        """
        counts = [(item, self.query(item)) for item in set(items)]
        counts.sort(key=lambda x: x[1], reverse=True)
        return counts[:k]

# Example: Twitter trending hashtags
print("\n=== Count-Min Sketch Example ===")
cms = CountMinSketch(width=100, depth=5)

# Simulate tweet stream with hashtags
tweet_stream = [
    "#python", "#python", "#javascript", "#python",
    "#java", "#python", "#go", "#javascript",
    "#rust", "#python", "#javascript", "#python",
    "#typescript", "#python", "#javascript",
]

# Update counts
for hashtag in tweet_stream:
    cms.update(hashtag)

# Query frequencies
print("Hashtag frequencies:")
unique_tags = set(tweet_stream)
for tag in sorted(unique_tags):
    estimated = cms.query(tag)
    actual = tweet_stream.count(tag)
    print(f"  {tag}: estimated={estimated}, actual={actual}")

# Find top-3 trending
top_3 = cms.top_k(list(unique_tags), 3)
print(f"\nTop 3 trending:")
for tag, count in top_3:
    print(f"  {tag}: ~{count} tweets")
Count-Min Sketch vs Exact Counter:
Exact: HashMap storing every item → O(n) space for n unique items
Count-Min: Fixed size regardless of unique items → O(w×d) space
Trade-off: Slight overestimation for massive space savings
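
To make the "fixed size" point concrete, the width and depth can be derived from the target error ε and failure probability δ using the rule of thumb given in the constructor docstring. This is a small sketch; cms_dimensions and the stream length of 1,000,000 are hypothetical values chosen for illustration.

```python
import math
from typing import Tuple

def cms_dimensions(epsilon: float, delta: float) -> Tuple[int, int]:
    """Return (width, depth) such that, with probability >= 1 - delta,
    estimate <= true_count + epsilon * total_count."""
    if not 0 < epsilon < 1 or not 0 < delta < 1:
        raise ValueError("epsilon and delta must be in (0, 1)")
    width = math.ceil(math.e / epsilon)
    depth = math.ceil(math.log(1 / delta))
    return width, depth

epsilon, delta = 0.01, 0.01
width, depth = cms_dimensions(epsilon, delta)
total_events = 1_000_000  # hypothetical stream length

print(f"width={width}, depth={depth}, counters={width * depth}")
print(f"worst-case overcount at 99% confidence: {epsilon * total_events:,.0f} events")
```

The counter table stays at width × depth entries no matter how many distinct items appear, but the absolute error grows with the total stream volume, so rare items are estimated less reliably than heavy hitters.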

🔗 2. URL Shortener (bit.ly)

Use Case: bit.ly, TinyURL, goo.gl - convert long URLs to short codes
import string
import hashlib
from typing import Optional, Dict

class URLShortener:
    """
    URL Shortener like bit.ly.

    Requirements:
    - Generate unique short codes (e.g., bit.ly/a3x9F)
    - Redirect short -> long URL
    - Handle billions of URLs
    - Short codes should be 6-8 characters

    Approaches:
    1. Random generation + collision check
    2. Hash-based (MD5) + truncate
    3. Counter-based (base62 encoding) ← Most common
    """

    # Base62: [0-9a-zA-Z] = 62 characters
    BASE62 = string.digits + string.ascii_lowercase + string.ascii_uppercase

    def __init__(self):
        self.url_to_code: Dict[str, str] = {}
        self.code_to_url: Dict[str, str] = {}
        self.counter = 62 ** 5  # 916,132,832: smallest value that encodes to 6 base62 chars

    def shorten_counter_based(self, long_url: str) -> str:
        """
        Counter-based approach (most scalable).

        Algorithm:
        1. Increment global counter
        2. Encode counter in base62
        3. Store mapping

        Pros: Guaranteed unique, predictable length
        Cons: Sequential (can predict next URL)

        Space: 62^6 = 56 billion unique URLs with 6 characters
        """
        # Check if already shortened
        if long_url in self.url_to_code:
            return self.url_to_code[long_url]

        # Generate short code
        short_code = self._encode_base62(self.counter)
        self.counter += 1

        # Store bidirectional mapping
        self.url_to_code[long_url] = short_code
        self.code_to_url[short_code] = long_url

        return short_code

    def shorten_hash_based(self, long_url: str) -> str:
        """
        Hash-based approach.

        Algorithm:
        1. Hash URL (MD5)
        2. Take first 6 characters of base62-encoded hash
        3. Handle collisions with counter suffix

        Pros: Non-sequential, deterministic for same URL
        Cons: Collision handling needed
        """
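        # Return the existing code if this URL was already shortened,
        # so repeated calls stay deterministic
        if long_url in self.url_to_code:
            return self.url_to_code[long_url]

        # MD5 hash
        hash_digest = hashlib.md5(long_url.encode()).hexdigest()

        # Take the first 8 hex chars (32 bits) and convert to base62;
        # a 32-bit value encodes to at most 6 base62 characters
        hash_int = int(hash_digest[:8], 16)
        short_code = self._encode_base62(hash_int)[:6]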

        # Handle collision
        original_code = short_code
        suffix = 0
        while short_code in self.code_to_url:
            suffix += 1
            short_code = original_code + str(suffix)

        # Store mapping
        self.url_to_code[long_url] = short_code
        self.code_to_url[short_code] = long_url

        return short_code

    def expand(self, short_code: str) -> Optional[str]:
        """Expand short code to original URL"""
        return self.code_to_url.get(short_code)

    def _encode_base62(self, num: int) -> str:
        """
        Encode integer to base62 string.

        Example: 12345 -> "3d7" in base62 (digits, then lowercase, then uppercase)
        """
        if num == 0:
            return self.BASE62[0]

        result = []
        while num > 0:
            result.append(self.BASE62[num % 62])
            num //= 62

        return ''.join(reversed(result))

    def _decode_base62(self, code: str) -> int:
        """Decode base62 string to integer"""
        num = 0
        for char in code:
            num = num * 62 + self.BASE62.index(char)
        return num

# Example usage
print("\n=== URL Shortener Example ===")
shortener = URLShortener()

# Shorten URLs
long_url1 = "https://www.example.com/very/long/path/to/resource?param=value"
long_url2 = "https://github.com/user/repo/pull/12345"

short1 = shortener.shorten_counter_based(long_url1)
short2 = shortener.shorten_counter_based(long_url2)

print(f"Original: {long_url1}")
print(f"Shortened: bit.ly/{short1}")
print()
print(f"Original: {long_url2}")
print(f"Shortened: bit.ly/{short2}")

# Expand
print(f"\nExpanding bit.ly/{short1}:")
print(f"-> {shortener.expand(short1)}")

# Calculate capacity
print("\n=== Capacity Analysis ===")
chars = len(URLShortener.BASE62)
for length in range(4, 9):
    capacity = chars ** length
    print(f"{length} characters: {capacity:,} unique URLs")
Time Complexity: O(1) for shorten and expand
Space Complexity: O(n) where n = number of URLs
Production Considerations:
- Use distributed counter (Redis INCR, DB sequence, Snowflake ID)
- Add expiration (delete old URLs)
- Rate limiting (prevent abuse)
- Analytics (track clicks)
- Custom aliases (vanity URLs)
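
One way to approach the distributed counter item is to let each server reserve a block of counter values at a time, so most requests need no round trip to the shared store. The sketch below is an assumption-laden illustration: CounterStore is an in-memory stand-in for an atomic shared counter (the role Redis INCRBY or a DB sequence would play), and ShortCodeServer and the block size are hypothetical names and values.

```python
import string
import threading
from typing import List

BASE62 = string.digits + string.ascii_lowercase + string.ascii_uppercase

def encode_base62(num: int) -> str:
    if num == 0:
        return BASE62[0]
    digits: List[str] = []
    while num > 0:
        num, rem = divmod(num, 62)
        digits.append(BASE62[rem])
    return "".join(reversed(digits))

class CounterStore:
    """In-memory stand-in for a shared atomic counter."""

    def __init__(self, start: int = 62 ** 5):
        self._value = start
        self._lock = threading.Lock()

    def reserve(self, block_size: int) -> int:
        """Atomically reserve block_size IDs and return the first one."""
        with self._lock:
            first = self._value
            self._value += block_size
            return first

class ShortCodeServer:
    """Hands out codes from a locally cached block of counter values."""

    def __init__(self, store: CounterStore, block_size: int = 1000):
        self.store = store
        self.block_size = block_size
        self.next_id = 0
        self.block_end = 0  # exclusive; 0 forces a reservation on first use

    def next_code(self) -> str:
        if self.next_id >= self.block_end:
            self.next_id = self.store.reserve(self.block_size)
            self.block_end = self.next_id + self.block_size
        code = encode_base62(self.next_id)
        self.next_id += 1
        return code

store = CounterStore()
server_a = ShortCodeServer(store, block_size=3)
server_b = ShortCodeServer(store, block_size=3)
codes = [server_a.next_code(), server_b.next_code(), server_a.next_code(),
         server_a.next_code(), server_a.next_code()]
print(codes)
```

The trade-off is that IDs in a reserved block are lost if a server restarts, and codes are no longer strictly ordered across servers.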

Distributed URL Shortener

import string
import time
from typing import Dict

class SnowflakeID:
    """
    Twitter Snowflake ID generator for distributed systems.

    64-bit ID structure:
    - 1 bit: unused (always 0)
    - 41 bits: timestamp (milliseconds since epoch) → 69 years
    - 10 bits: machine/datacenter ID → 1024 machines
    - 12 bits: sequence number → 4096 IDs per ms per machine

    Guarantees:
    - Globally unique across all machines
    - Roughly time-ordered (sortable)
    - No coordination needed between machines
    """

    def __init__(self, machine_id: int, epoch: int = 1609459200000):
        """
        machine_id: Unique ID for this machine (0-1023)
        epoch: Custom epoch (default: Jan 1, 2021)
        """
        self.machine_id = machine_id & 0x3FF  # 10 bits
        self.epoch = epoch
        self.sequence = 0
        self.last_timestamp = -1

    def generate(self) -> int:
        """Generate unique 64-bit ID"""
        timestamp = int(time.time() * 1000)  # milliseconds

        if timestamp < self.last_timestamp:
            raise Exception("Clock moved backwards!")

        if timestamp == self.last_timestamp:
            # Same millisecond - increment sequence
            self.sequence = (self.sequence + 1) & 0xFFF  # 12 bits
            if self.sequence == 0:
                # Sequence exhausted - wait for next ms
                while timestamp <= self.last_timestamp:
                    timestamp = int(time.time() * 1000)
        else:
            # New millisecond - reset sequence
            self.sequence = 0

        self.last_timestamp = timestamp

        # Construct ID: timestamp | machine_id | sequence
        return ((timestamp - self.epoch) << 22) | (self.machine_id << 12) | self.sequence

# Example: Distributed URL shortener
class DistributedURLShortener:
    """URL Shortener for distributed system with multiple servers"""

    BASE62 = string.digits + string.ascii_lowercase + string.ascii_uppercase

    def __init__(self, machine_id: int):
        self.id_generator = SnowflakeID(machine_id)
        self.code_to_url: Dict[str, str] = {}

    def shorten(self, long_url: str) -> str:
        """Generate short code using Snowflake ID"""
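        unique_id = self.id_generator.generate()
        # Encode the full ID (about 10-11 base62 chars). Truncating the code
        # would drop the low-order machine and sequence bits, so two servers
        # could produce the same code within the same millisecond.
        short_code = self._encode_base62(unique_id)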

        self.code_to_url[short_code] = long_url
        return short_code

    def _encode_base62(self, num: int) -> str:
        if num == 0:
            return self.BASE62[0]
        result = []
        while num > 0:
            result.append(self.BASE62[num % 62])
            num //= 62
        return ''.join(reversed(result))

# Simulate multiple servers
print("\n=== Distributed URL Shortener ===")
server1 = DistributedURLShortener(machine_id=1)
server2 = DistributedURLShortener(machine_id=2)

url = "https://example.com/page"
code1 = server1.shorten(url)
code2 = server2.shorten(url)

print(f"Server 1 generated: {code1}")
print(f"Server 2 generated: {code2}")
print(f"Unique: {code1 != code2}")  # True - different servers generate different IDs
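
Because the Snowflake layout is just three bit fields, an ID can be split back into its parts, which is handy for debugging and for sorting by creation time. A minimal sketch, assuming the same 41/10/12-bit layout and the Jan 1, 2021 epoch used above; compose_snowflake, decompose_snowflake, and SnowflakeParts are illustrative names.

```python
from datetime import datetime, timezone
from typing import NamedTuple

EPOCH_MS = 1609459200000  # Jan 1, 2021 UTC, matching the default epoch above

class SnowflakeParts(NamedTuple):
    timestamp_ms: int  # absolute Unix time in milliseconds
    machine_id: int
    sequence: int

def compose_snowflake(timestamp_ms: int, machine_id: int, sequence: int) -> int:
    """Pack the three fields: timestamp | machine_id (10 bits) | sequence (12 bits)."""
    return ((timestamp_ms - EPOCH_MS) << 22) | ((machine_id & 0x3FF) << 12) | (sequence & 0xFFF)

def decompose_snowflake(snowflake_id: int) -> SnowflakeParts:
    """Unpack an ID built with compose_snowflake."""
    return SnowflakeParts(
        timestamp_ms=(snowflake_id >> 22) + EPOCH_MS,
        machine_id=(snowflake_id >> 12) & 0x3FF,
        sequence=snowflake_id & 0xFFF,
    )

example_id = compose_snowflake(timestamp_ms=1700000000000, machine_id=7, sequence=42)
parts = decompose_snowflake(example_id)
created = datetime.fromtimestamp(parts.timestamp_ms / 1000, tz=timezone.utc)
print(parts, created.isoformat())

# Same millisecond, different machines: the IDs differ only in the low 22 bits
same_ms_a = compose_snowflake(1700000000000, machine_id=1, sequence=0)
same_ms_b = compose_snowflake(1700000000000, machine_id=2, sequence=0)
print(same_ms_b - same_ms_a)
```

The last two lines show why the whole ID has to be kept in the short code: IDs from different machines in the same millisecond differ only in their low-order bits.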

🚦 3. Rate Limiting Strategies

Protect APIs from abuse and ensure fair resource allocation.

Token Bucket Algorithm

Use Case: AWS API Gateway, Stripe API, most RESTful APIs - allows bursts
import time
from typing import Dict

class TokenBucket:
    """
    Token Bucket rate limiter.

    Concept: Bucket holds tokens, refilled at constant rate.
    - Each request consumes 1 token
    - If no tokens available, request denied
    - Allows bursts up to bucket capacity

    Example: 100 requests/minute with burst of 20
    - Capacity: 20 tokens
    - Refill rate: 100/60 = 1.67 tokens/second

    Pros: Simple, allows bursts, smooth rate limiting
    Cons: Requires state per user
    """

    def __init__(self, capacity: int, refill_rate: float):
        """
        capacity: Maximum tokens in bucket (burst size)
        refill_rate: Tokens added per second
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        self.last_refill = time.time()

    def allow_request(self) -> bool:
        """
        Try to consume a token for this request.

        Returns: True if allowed, False if rate limited
        """
        # Refill tokens based on time elapsed
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

        # Try to consume a token
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        else:
            return False

class DistributedTokenBucket:
    """
    Token Bucket with Redis for distributed systems.

    Uses Redis to store:
    - tokens: Current token count
    - last_refill: Last refill timestamp

    All servers share same bucket state.
    """

    def __init__(self, user_id: str, capacity: int, refill_rate: float):
        self.user_id = user_id
        self.capacity = capacity
        self.refill_rate = refill_rate
        # In production: self.redis = redis.Redis()

    def allow_request(self) -> bool:
        """Check rate limit using Redis"""
        # Lua script for atomic refill + consume (prevents race conditions)
        lua_script = """
        local capacity = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])

        local tokens = tonumber(redis.call('GET', KEYS[1]) or capacity)
        local last_refill = tonumber(redis.call('GET', KEYS[2]) or now)

        local elapsed = now - last_refill
        tokens = math.min(capacity, tokens + elapsed * refill_rate)

        if tokens >= 1 then
            tokens = tokens - 1
            redis.call('SET', KEYS[1], tokens)
            redis.call('SET', KEYS[2], now)
            return 1
        else
            return 0
        end
        """

        # In production: result = redis.eval(lua_script, ...)
        # Simulated for example
        return True

# Example: API rate limiting
print("\n=== Token Bucket Example ===")
bucket = TokenBucket(capacity=5, refill_rate=1.0)  # 5 burst, 1 token/sec

print("Making 7 rapid requests:")
for i in range(7):
    allowed = bucket.allow_request()
    status = "✓ Allowed" if allowed else "✗ Rate Limited"
    print(f"  Request {i+1}: {status} ({bucket.tokens:.2f} tokens remaining)")

print("\nWaiting 3 seconds for refill...")
time.sleep(3)

print("\nAfter refill:")
allowed = bucket.allow_request()
print(f"  Request: {'✓ Allowed' if allowed else '✗ Rate Limited'} ({bucket.tokens:.2f} tokens)")
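
The "requires state per user" cost can be made concrete with a limiter that keeps one bucket per user ID. This is a sketch under stated assumptions: PerUserTokenBuckets is a hypothetical class, state lives in a local dict rather than Redis, and the clock is injectable so the demo runs without sleeping.

```python
import time
from typing import Callable, Dict, Tuple

class PerUserTokenBuckets:
    """One token bucket per user, stored as (tokens, last_refill) pairs."""

    def __init__(self, capacity: int, refill_rate: float,
                 clock: Callable[[], float] = time.monotonic):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.clock = clock
        self.state: Dict[str, Tuple[float, float]] = {}

    def allow_request(self, user_id: str) -> bool:
        now = self.clock()
        # Unknown users start with a full bucket
        tokens, last_refill = self.state.get(user_id, (float(self.capacity), now))
        tokens = min(self.capacity, tokens + (now - last_refill) * self.refill_rate)
        allowed = tokens >= 1
        if allowed:
            tokens -= 1
        self.state[user_id] = (tokens, now)
        return allowed

# Fake clock so the demo is deterministic
fake_now = [0.0]
limiter = PerUserTokenBuckets(capacity=2, refill_rate=1.0, clock=lambda: fake_now[0])

results = [limiter.allow_request("alice") for _ in range(3)]  # burst of 3 at t=0
bob_first = limiter.allow_request("bob")                       # separate bucket
fake_now[0] = 1.5                                              # 1.5 seconds later
after_wait = limiter.allow_request("alice")
print(results, bob_first, after_wait)
```

Memory grows with the number of active users, so a production version would also need to evict idle buckets (for example with a TTL on the stored keys).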

Sliding Window Log

Use Case: More accurate than fixed window, used when burst control is critical
import time
from collections import deque
from typing import Deque

class SlidingWindowLog:
    """
    Sliding Window Log rate limiter.

    Concept: Keep log of all request timestamps, count requests in window.
    - Window slides with time (not fixed intervals)
    - More accurate than fixed window
    - No boundary issues

    Example: 100 requests per minute
    - At any point in time, count requests in last 60 seconds
    - If < 100, allow; else deny

    Pros: Very accurate, no boundary effects
    Cons: High memory (stores all timestamps)
    """

    def __init__(self, max_requests: int, window_seconds: int):
        """
        max_requests: Maximum requests allowed in window
        window_seconds: Size of sliding window
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.request_log: Deque[float] = deque()

    def allow_request(self) -> bool:
        """Check if request is allowed"""
        now = time.time()
        window_start = now - self.window_seconds

        # Remove old requests outside window
        while self.request_log and self.request_log[0] < window_start:
            self.request_log.popleft()

        # Check if we're under limit
        if len(self.request_log) < self.max_requests:
            self.request_log.append(now)
            return True
        else:
            return False

class SlidingWindowCounter:
    """
    Sliding Window Counter (hybrid approach).

    Concept: Combine current + previous window with weighted average.
    - Less memory than log (only 2 counters)
    - More accurate than fixed window

    Formula:
    count = prev_window_count * overlap_percent + current_window_count

    Example: Limit 100 req/min, window size 1 min
    - At 12:00:30 (30 sec into current minute):
      - Previous minute (11:59-12:00): had 80 requests
      - Current minute (12:00-12:01): has 40 requests
      - Overlap: 50% of previous window
      - Estimated count: 80 * 0.5 + 40 = 80 requests
    """

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.current_window_count = 0
        self.current_window_start = time.time()
        self.prev_window_count = 0

    def allow_request(self) -> bool:
        now = time.time()
        elapsed = now - self.current_window_start

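        if elapsed >= self.window_seconds:
            # Move forward by whole windows. If more than one full window
            # passed, the previous window saw no requests.
            windows_passed = int(elapsed // self.window_seconds)
            self.prev_window_count = (
                self.current_window_count if windows_passed == 1 else 0
            )
            self.current_window_count = 0
            self.current_window_start += windows_passed * self.window_seconds
            elapsed = now - self.current_window_start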

        # Calculate weighted count
        overlap_percent = (self.window_seconds - elapsed) / self.window_seconds
        estimated_count = (self.prev_window_count * overlap_percent +
                          self.current_window_count)

        if estimated_count < self.max_requests:
            self.current_window_count += 1
            return True
        else:
            return False

# Example comparison
print("\n=== Sliding Window Comparison ===")

# Sliding Window Log (accurate but memory-intensive)
swl = SlidingWindowLog(max_requests=5, window_seconds=10)

print("Sliding Window Log (5 requests per 10 seconds):")
for i in range(8):
    allowed = swl.allow_request()
    print(f"  Request {i+1}: {'✓' if allowed else '✗'} ({len(swl.request_log)} in window)")
    if i == 4:
        print("  Waiting 6 seconds...")
        time.sleep(6)
Rate Limiter Comparison:

Algorithm              | Memory | Accuracy  | Bursts          | Use Case
Token Bucket           | O(1)   | Good      | Allows          | APIs, AWS Gateway
Leaky Bucket           | O(1)   | Good      | Smooths         | Network traffic shaping
Fixed Window           | O(1)   | Poor      | Boundary issues | Simple rate limiting
Sliding Window Log     | O(n)   | Excellent | Controlled      | Accurate limiting needed
Sliding Window Counter | O(1)   | Very Good | Controlled      | Best balance (Redis)
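
The Fixed Window "boundary issues" entry is easiest to see in code. The following sketch uses a hypothetical FixedWindowCounter with an injected fake clock; it is not one of the classes above, only an illustration of why a burst straddling a window boundary can double the effective limit.

```python
from typing import Callable

class FixedWindowCounter:
    """Fixed Window rate limiter: one counter per aligned time window."""

    def __init__(self, max_requests: int, window_seconds: int,
                 clock: Callable[[], float]):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock
        self.window_index = -1
        self.count = 0

    def allow_request(self) -> bool:
        index = int(self.clock() // self.window_seconds)
        if index != self.window_index:
            # A new aligned window started: reset the counter
            self.window_index = index
            self.count = 0
        if self.count < self.max_requests:
            self.count += 1
            return True
        return False

fake_now = [59.0]
limiter = FixedWindowCounter(max_requests=5, window_seconds=60,
                             clock=lambda: fake_now[0])

late_burst = sum(limiter.allow_request() for _ in range(5))   # t=59s, end of window 0
fake_now[0] = 61.0
early_burst = sum(limiter.allow_request() for _ in range(5))  # t=61s, start of window 1
print(f"Allowed {late_burst + early_burst} requests within 2 seconds (limit: 5 per 60 s)")
```

All 10 requests pass even though they arrive within 2 seconds, which is the gap the sliding window variants are designed to close.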

📰 4. News Feed / Fan-out Service

Use Case: Twitter timeline, Facebook news feed, Instagram feed
from typing import List, Dict, Set
from dataclasses import dataclass
from datetime import datetime
import heapq

@dataclass
class Post:
    post_id: int
    author_id: int
    content: str
    timestamp: datetime

    def __lt__(self, other):
        return self.timestamp > other.timestamp  # Reverse for max heap

class FanoutService:
    """
    News Feed implementation with two strategies:

    1. Fan-out on Write (Push model):
       - When user posts, immediately push to all followers' feeds
       - Pros: Fast reads (feed pre-generated)
       - Cons: Slow writes (celebrity with 100M followers = 100M writes)

    2. Fan-out on Read (Pull model):
       - When user requests feed, fetch from followed users
       - Pros: Fast writes, no wasted work
       - Cons: Slow reads (need to query many users)

    Hybrid: Use push for normal users, pull for celebrities
    """

    def __init__(self):
        # User -> Set of followers
        self.followers: Dict[int, Set[int]] = {}

        # User -> Set of users they follow
        self.following: Dict[int, Set[int]] = {}

        # User -> Posts they created
        self.user_posts: Dict[int, List[Post]] = {}

        # User -> Pre-generated feed (fan-out on write)
        self.user_feeds: Dict[int, List[Post]] = {}

    def follow(self, follower_id: int, followee_id: int):
        """User follows another user"""
        if followee_id not in self.followers:
            self.followers[followee_id] = set()
        self.followers[followee_id].add(follower_id)

        if follower_id not in self.following:
            self.following[follower_id] = set()
        self.following[follower_id].add(followee_id)

    def post_fan_out_on_write(self, user_id: int, content: str, post_id: int):
        """
        Fan-out on Write: Push post to all followers' feeds.

        Process:
        1. Create post
        2. Get all followers
        3. Add post to each follower's feed

        Time Complexity: O(F) where F = number of followers
        """
        post = Post(post_id, user_id, content, datetime.now())

        # Store in user's posts
        if user_id not in self.user_posts:
            self.user_posts[user_id] = []
        self.user_posts[user_id].append(post)

        # Fan-out: Push to all followers
        followers = self.followers.get(user_id, set())
        for follower_id in followers:
            if follower_id not in self.user_feeds:
                self.user_feeds[follower_id] = []
            self.user_feeds[follower_id].append(post)

        # Also add to own feed
        if user_id not in self.user_feeds:
            self.user_feeds[user_id] = []
        self.user_feeds[user_id].append(post)

    def get_feed_push_model(self, user_id: int, limit: int = 10) -> List[Post]:
        """
        Get feed using push model (fan-out on write).

        Time Complexity: O(F log F) where F = feed size (for sorting)
        Very fast! Feed already generated.
        """
        feed = self.user_feeds.get(user_id, [])
        feed.sort(key=lambda p: p.timestamp, reverse=True)
        return feed[:limit]

    def get_feed_pull_model(self, user_id: int, limit: int = 10) -> List[Post]:
        """
        Get feed using pull model (fan-out on read).

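        Process:
        1. Get all users this user follows
        2. Fetch recent posts from each
        3. Merge and sort

        Time Complexity: O(P log P) where P = total posts by followed users,
        because this simple version sorts every post. A heap-based k-way merge
        over per-user lists sorted by time would need O(N + limit × log N)
        for N followed users.
        Slower, but no write amplification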
        """
        following = self.following.get(user_id, set())

        # Collect recent posts from all followed users (including self)
        all_posts = []
        for followed_id in following | {user_id}:
            posts = self.user_posts.get(followed_id, [])
            all_posts.extend(posts)

        # Sort by timestamp and return top N
        all_posts.sort(key=lambda p: p.timestamp, reverse=True)
        return all_posts[:limit]

    def get_feed_hybrid(self, user_id: int, limit: int = 10) -> List[Post]:
        """
        Hybrid approach (Twitter/Facebook style).

        Strategy:
        - Normal users: Fan-out on write (push model)
        - Celebrities (>1M followers): Pull model
        - Mix results using merge

        This balances:
        - Write cost: Don't fan-out to millions
        - Read performance: Pre-generate for most users
        """
        CELEBRITY_THRESHOLD = 1000000

        feed_posts = []
        celebrity_posts = []

        following = self.following.get(user_id, set())

        for followed_id in following:
            follower_count = len(self.followers.get(followed_id, set()))

            if follower_count > CELEBRITY_THRESHOLD:
                # Celebrity: Pull their posts
                posts = self.user_posts.get(followed_id, [])
                celebrity_posts.extend(posts)
            # else: Their posts already in our feed (pushed)

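        # Merge pre-generated feed + celebrity posts. De-duplicate by post_id,
        # because post_fan_out_on_write above pushes every post, so a
        # celebrity's post can appear in both lists.
        feed_posts = self.user_feeds.get(user_id, [])
        unique_posts = {p.post_id: p for p in feed_posts + celebrity_posts}
        all_posts = sorted(unique_posts.values(), key=lambda p: p.timestamp, reverse=True)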

        return all_posts[:limit]

# Example: Twitter-like feed
print("\n=== News Feed Example ===")
feed_service = FanoutService()

# Users
alice_id = 1
bob_id = 2
charlie_id = 3

# Follow relationships
feed_service.follow(alice_id, bob_id)    # Alice follows Bob
feed_service.follow(alice_id, charlie_id)  # Alice follows Charlie
feed_service.follow(charlie_id, bob_id)  # Charlie follows Bob

# Posts (fan-out on write)
feed_service.post_fan_out_on_write(bob_id, "Hello from Bob!", post_id=101)
feed_service.post_fan_out_on_write(charlie_id, "Charlie's update", post_id=102)
feed_service.post_fan_out_on_write(bob_id, "Bob again!", post_id=103)

# Alice's feed
print("Alice's feed (push model):")
alice_feed = feed_service.get_feed_push_model(alice_id, limit=5)
for post in alice_feed:
    print(f"  @user{post.author_id}: {post.content}")

print("\nAlice's feed (pull model):")
alice_feed_pull = feed_service.get_feed_pull_model(alice_id, limit=5)
for post in alice_feed_pull:
    print(f"  @user{post.author_id}: {post.content}")
Celebrity Problem (Fan-out on Write):
Elon Musk posts tweet → 100M followers
= 100M feed writes = database overload!

Solution: Don't fan-out celebrities. When users request feed, merge:
1. Pre-generated feed (from normal follows)
2. Real-time pull from celebrities
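
The write side of this solution can be sketched as follows: skip the fan-out when the author is over the follower threshold, and pull those authors' posts at read time. This is a simplified illustration with assumptions labeled in the code: a tiny threshold of 2 so the demo is small, string user names, and a global sequence number in place of timestamps for ordering.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple

CELEBRITY_THRESHOLD = 2  # demo value only; the text above uses 1M followers

Post = Tuple[int, str, str]  # (sequence number, author, text)

followers: Dict[str, Set[str]] = defaultdict(set)
following: Dict[str, Set[str]] = defaultdict(set)
posts_by_author: Dict[str, List[Post]] = defaultdict(list)
pushed_feeds: Dict[str, List[Post]] = defaultdict(list)
_next_seq = 0  # stand-in for a timestamp: larger means newer

def is_celebrity(user: str) -> bool:
    return len(followers[user]) > CELEBRITY_THRESHOLD

def follow(follower: str, followee: str) -> None:
    followers[followee].add(follower)
    following[follower].add(followee)

def publish(author: str, text: str) -> int:
    """Store the post and fan out unless the author is a celebrity.
    Returns the number of feed writes performed."""
    global _next_seq
    _next_seq += 1
    post = (_next_seq, author, text)
    posts_by_author[author].append(post)
    if is_celebrity(author):
        return 0  # no fan-out: followers pull these posts at read time
    for follower in followers[author]:
        pushed_feeds[follower].append(post)
    return len(followers[author])

def read_feed(user: str, limit: int = 10) -> List[Post]:
    """Merge the pre-generated feed with posts pulled from followed celebrities."""
    pulled = [p for a in following[user] if is_celebrity(a) for p in posts_by_author[a]]
    merged = set(pushed_feeds[user]) | set(pulled)  # set union removes duplicates
    return sorted(merged, reverse=True)[:limit]

for fan in ("alice", "bob", "carol"):
    follow(fan, "star")      # "star" ends up with 3 followers: a celebrity here
follow("alice", "dave")      # "dave" has 1 follower: a normal user

writes_normal = publish("dave", "hello")
writes_celebrity = publish("star", "big news")
feed = read_feed("alice")
print(writes_normal, writes_celebrity, feed)
```

The celebrity post costs zero feed writes, and each follower pays a small extra read to merge it in.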

🔤 5. Autocomplete / Trie

Use Case: Google search autocomplete, IDE autocomplete, spell checker, IP routing
from typing import List, Optional, Dict

class TrieNode:
    def __init__(self):
        self.children: Dict[str, TrieNode] = {}
        self.is_end_of_word = False
        self.frequency = 0  # How often this word is searched

class Trie:
    """
    Trie (Prefix Tree) for efficient autocomplete.

    Properties:
    - Each node represents a character
    - Path from root to node = prefix
    - Words sharing prefix share path

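    Time Complexity:
    - Insert: O(L) where L = word length
    - Search: O(L)
    - Autocomplete: roughly O(P + S + N log N) where P = prefix length,
      S = nodes in the prefix's subtree, N = matching words (collected by
      DFS, then sorted by frequency)

    Space Complexity: O(total characters stored), at most O(N × L) for N words
    with dict-based children; O(ALPHABET_SIZE × N × L) with fixed-size child arrays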
    """

    def __init__(self):
        self.root = TrieNode()

    def insert(self, word: str, frequency: int = 1):
        """Insert word into trie with frequency (for ranking)"""
        node = self.root

        for char in word:
            if char not in node.children:
                node.children[char] = TrieNode()
            node = node.children[char]

        node.is_end_of_word = True
        node.frequency += frequency

    def search(self, word: str) -> bool:
        """Check if word exists"""
        node = self._find_node(word)
        return node is not None and node.is_end_of_word

    def starts_with(self, prefix: str) -> bool:
        """Check if any word starts with prefix"""
        return self._find_node(prefix) is not None

    def autocomplete(self, prefix: str, limit: int = 10) -> List[tuple]:
        """
        Get top autocomplete suggestions for prefix.

        Returns: List of (word, frequency) tuples, sorted by frequency
        """
        # Find node for prefix
        node = self._find_node(prefix)
        if node is None:
            return []

        # DFS to find all words with this prefix
        results = []
        self._dfs_words(node, prefix, results)

        # Sort by frequency (descending) and return top N
        results.sort(key=lambda x: x[1], reverse=True)
        return results[:limit]

    def _find_node(self, prefix: str) -> Optional[TrieNode]:
        """Navigate to node representing prefix"""
        node = self.root
        for char in prefix:
            if char not in node.children:
                return None
            node = node.children[char]
        return node

    def _dfs_words(self, node: TrieNode, current_word: str, results: List[tuple]):
        """DFS to collect all words from this node"""
        if node.is_end_of_word:
            results.append((current_word, node.frequency))

        for char, child_node in node.children.items():
            self._dfs_words(child_node, current_word + char, results)

class AutocompleteSystem:
    """
    Autocomplete system with a per-prefix result cache.

    Optimizations:
    1. Cache top results for popular prefixes (implemented below)
    2. Limit search depth (not implemented here)
    3. Pre-compute top suggestions (not implemented here)
    """

    def __init__(self):
        self.trie = Trie()
        self.cache: Dict[str, List[tuple]] = {}

    def add_search(self, query: str):
        """Add search query (increments frequency)"""
        self.trie.insert(query.lower(), frequency=1)

        # Invalidate cache for this prefix
        self._invalidate_cache(query.lower())

    def suggest(self, prefix: str, limit: int = 10) -> List[str]:
        """Get autocomplete suggestions"""
        prefix = prefix.lower()

        # Check cache; an entry stored for a smaller limit may be truncated,
        # so only use it when it holds at least `limit` results
        cached = self.cache.get(prefix)
        if cached is not None and len(cached) >= limit:
            return [word for word, _ in cached[:limit]]

        # Query trie
        results = self.trie.autocomplete(prefix, limit)

        # Cache results
        self.cache[prefix] = results

        return [word for word, _ in results]

    def _invalidate_cache(self, query: str):
        """Invalidate cache for all prefixes of this query"""
        for i in range(1, len(query) + 1):
            prefix = query[:i]
            if prefix in self.cache:
                del self.cache[prefix]

# Example: Google-style autocomplete
print("\n=== Autocomplete System Example ===")
autocomplete = AutocompleteSystem()

# Simulate search history
searches = [
    ("python tutorial", 1000),
    ("python interview questions", 800),
    ("python list comprehension", 500),
    ("java tutorial", 700),
    ("javascript promises", 600),
    ("python decorators", 400),
]

for query, freq in searches:
    for _ in range(freq):
        autocomplete.add_search(query)

# User types "pyth"
print("User types 'pyth':")
suggestions = autocomplete.suggest("pyth", limit=5)
for i, suggestion in enumerate(suggestions, 1):
    print(f"  {i}. {suggestion}")

print("\nUser types 'python ':")
suggestions = autocomplete.suggest("python ", limit=5)
for i, suggestion in enumerate(suggestions, 1):
    print(f"  {i}. {suggestion}")
Real-World Optimizations:
1. Pre-computation: Large-scale systems such as Google's are commonly described as pre-computing the top 10 suggestions for every prefix and serving them from cache
2. Personalization: Boost suggestions based on user history
3. Geographic: Different suggestions by location
4. Trending: Real-time injection of trending topics
5. Distributed: Sharded trie across multiple servers by prefix hash
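
The pre-computation idea in item 1 can be sketched as a trie whose nodes each keep their own top-k list, so a lookup only walks the prefix and never scans the subtree. This is a minimal sketch under stated assumptions: the names `TopKTrie` and `TopKNode` and the parameter `k` are hypothetical, and frequencies are assumed to only ever increase (no deletions or decay).

```python
from typing import Dict, List, Tuple


class TopKNode:
    """Trie node that remembers the k most frequent completions beneath it."""

    def __init__(self) -> None:
        self.children: Dict[str, "TopKNode"] = {}
        self.frequency = 0  # > 0 only where a complete query ends
        self.top: List[Tuple[str, int]] = []  # (word, frequency), best first


class TopKTrie:
    def __init__(self, k: int = 10) -> None:
        self.root = TopKNode()
        self.k = k

    def insert(self, word: str, frequency: int = 1) -> None:
        # Walk down the trie, remembering every node on the path (root included)
        node = self.root
        path = [node]
        for char in word:
            node = node.children.setdefault(char, TopKNode())
            path.append(node)
        node.frequency += frequency
        total = node.frequency

        # Refresh the pre-computed list of every prefix of the word
        for prefix_node in path:
            entries = [(w, f) for w, f in prefix_node.top if w != word]
            entries.append((word, total))
            # Highest frequency first; ties broken alphabetically
            entries.sort(key=lambda e: (-e[1], e[0]))
            prefix_node.top = entries[: self.k]

    def suggest(self, prefix: str) -> List[str]:
        # O(len(prefix)): the answer is already stored at the prefix node
        node = self.root
        for char in prefix:
            node = node.children.get(char)
            if node is None:
                return []
        return [word for word, _ in node.top]


trie = TopKTrie(k=2)
for query, freq in [("python tutorial", 1000), ("python decorators", 400),
                    ("python interview questions", 800), ("java tutorial", 700)]:
    trie.insert(query, freq)
print(trie.suggest("pyth"))
```

The trade-off is that each insert now costs O(L × k log k) and every node stores up to k entries, in exchange for lookups that depend only on the prefix length.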

🌐 6. Distributed Systems Patterns

Sharding / Partitioning

Use Case: Distribute data across multiple databases for horizontal scaling
import hashlib
from typing import Any, List, Dict

class ShardingStrategy:
    """Base class for sharding strategies"""

    def get_shard(self, key: str) -> int:
        raise NotImplementedError

class HashBasedSharding(ShardingStrategy):
    """
    Hash-based sharding: hash(key) % num_shards

    Pros: Even distribution
    Cons: Changing num_shards remaps most keys, so nearly all data must move
          (consistent hashing, covered in Part 1, limits this)
    """

    def __init__(self, num_shards: int):
        self.num_shards = num_shards

    def get_shard(self, key: str) -> int:
        hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
        return hash_value % self.num_shards

class RangeBasedSharding(ShardingStrategy):
    """
    Range-based sharding: key ranges mapped to shards

    Example: user_id 0-1M → shard 0, 1M-2M → shard 1

    Pros: Easy to add shards, range queries efficient
    Cons: Can lead to hotspots if data not evenly distributed
    """

    def __init__(self, ranges: List[tuple]):
        """ranges: [(min, max, shard_id), ...]"""
        self.ranges = sorted(ranges, key=lambda x: x[0])

    def get_shard(self, key: str) -> int:
        # Assume key is numeric for range-based
        key_int = int(key)

        for min_val, max_val, shard_id in self.ranges:
            if min_val <= key_int < max_val:
                return shard_id

        # Fail loudly instead of silently routing unknown keys to shard 0
        raise ValueError(f"No shard range covers key {key_int}")

class GeoBasedSharding(ShardingStrategy):
    """
    Geography-based sharding: route by location

    Example: US users → US datacenter, EU users → EU datacenter

    Pros: Low latency (data close to users)
    Cons: Uneven distribution, cross-region queries complex
    """

    def __init__(self, geo_mapping: Dict[str, int]):
        """geo_mapping: {'US': 0, 'EU': 1, 'ASIA': 2}"""
        self.geo_mapping = geo_mapping

    def get_shard(self, key: str) -> int:
        # Extract region from key (e.g., "user_US_12345")
        parts = key.split('_')
        region = parts[1] if len(parts) > 1 else 'US'
        return self.geo_mapping.get(region, 0)

class ShardedDatabase:
    """Simulated sharded database"""

    def __init__(self, num_shards: int, strategy: ShardingStrategy):
        self.shards = [{} for _ in range(num_shards)]
        self.strategy = strategy

    def put(self, key: str, value: Any):
        """Store key-value in appropriate shard"""
        shard_id = self.strategy.get_shard(key)
        self.shards[shard_id][key] = value

    def get(self, key: str) -> Any:
        """Retrieve value from appropriate shard"""
        shard_id = self.strategy.get_shard(key)
        return self.shards[shard_id].get(key)

    def get_shard_stats(self) -> List[int]:
        """Get number of items in each shard"""
        return [len(shard) for shard in self.shards]

# Example: Sharding user data
print("\n=== Database Sharding Example ===")

# Hash-based sharding
hash_strategy = HashBasedSharding(num_shards=4)
db = ShardedDatabase(num_shards=4, strategy=hash_strategy)

# Insert users
users = [f"user_{i}" for i in range(100)]
for user in users:
    db.put(user, {"name": user, "email": f"{user}@example.com"})

print("Hash-based sharding distribution:")
stats = db.get_shard_stats()
for i, count in enumerate(stats):
    print(f"  Shard {i}: {count} users ({count / sum(stats) * 100:.1f}%)")

# Range-based sharding
print("\nRange-based sharding:")
ranges = [(0, 25, 0), (25, 50, 1), (50, 75, 2), (75, 100, 3)]
range_strategy = RangeBasedSharding(ranges)
db_range = ShardedDatabase(num_shards=4, strategy=range_strategy)

for i in range(100):
    db_range.put(str(i), f"user_{i}")

stats = db_range.get_shard_stats()
for i, count in enumerate(stats):
    print(f"  Shard {i}: {count} users ({count / sum(stats) * 100:.1f}%)")
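
To make the hash-based "Cons" concrete, the following sketch counts how many keys land on a different shard when the shard count grows from 4 to 5. The helper names `shard_for` and `moved_fraction` are hypothetical, and MD5 is used only to mirror `HashBasedSharding.get_shard` above.

```python
import hashlib
from typing import Iterable


def shard_for(key: str, num_shards: int) -> int:
    """Same rule as HashBasedSharding.get_shard: md5(key) % num_shards."""
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % num_shards


def moved_fraction(keys: Iterable[str], old_shards: int, new_shards: int) -> float:
    """Fraction of keys whose shard changes when the shard count changes."""
    key_list = list(keys)
    moved = sum(
        1 for key in key_list
        if shard_for(key, old_shards) != shard_for(key, new_shards)
    )
    return moved / len(key_list)


sample_keys = [f"user_{i}" for i in range(10_000)]
fraction = moved_fraction(sample_keys, old_shards=4, new_shards=5)
print(f"Keys that move when going from 4 to 5 shards: {fraction:.1%}")
```

For a uniform hash, a key stays on its shard only when `hash % 4 == hash % 5`, which holds for 4 of every 20 residues, so roughly 80% of keys are expected to move. This is the migration cost that consistent hashing is designed to reduce.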

Idempotency Key Pattern

Use Case: Stripe payments, ensure exactly-once processing despite retries
import uuid
from typing import Any, Callable, Dict, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class IdempotentResult:
    """Cached result of idempotent operation"""
    request_id: str
    result: Any
    timestamp: datetime

class IdempotencyManager:
    """
    Idempotency key manager for exactly-once processing.

    Pattern:
    1. Client generates unique request ID (UUID)
    2. Server checks if request ID already processed
    3. If yes: return cached result
    4. If no: process request, cache result with request ID

    Prevents:
    - Duplicate charges (payment retries)
    - Duplicate messages (network retries)
    - Duplicate database writes

    TTL: Results cached for 24 hours by default (configurable via ttl_hours)
    """

    def __init__(self, ttl_hours: int = 24):
        self.cache: Dict[str, IdempotentResult] = {}
        self.ttl = timedelta(hours=ttl_hours)

    def execute(self, idempotency_key: str, operation: Callable[[], Any]) -> Any:
        """
        Execute operation idempotently.

        Args:
            idempotency_key: Unique key for this request
            operation: Function to execute (only if not cached)

        Returns: Result (from cache or fresh execution)
        """
        # Check cache
        if idempotency_key in self.cache:
            cached = self.cache[idempotency_key]

            # Check TTL
            if datetime.now() - cached.timestamp < self.ttl:
                print(f"  [CACHE HIT] Returning cached result for {idempotency_key}")
                return cached.result
            else:
                # Expired - remove from cache
                del self.cache[idempotency_key]

        # Execute operation
        print(f"  [CACHE MISS] Executing operation for {idempotency_key}")
        result = operation()

        # Cache result
        self.cache[idempotency_key] = IdempotentResult(
            request_id=idempotency_key,
            result=result,
            timestamp=datetime.now()
        )

        return result

    def cleanup_expired(self):
        """Remove expired entries"""
        now = datetime.now()
        expired_keys = [
            key for key, cached in self.cache.items()
            if now - cached.timestamp >= self.ttl
        ]
        for key in expired_keys:
            del self.cache[key]

# Example: Stripe-style payment processing
class PaymentProcessor:
    """Simulated payment processor with idempotency"""

    def __init__(self):
        self.idempotency = IdempotencyManager()
        self.payment_count = 0

    def charge(self, idempotency_key: str, amount: float, customer: str) -> Dict:
        """
        Process payment idempotently.

        If client retries with same key, returns original result.
        """
        def process_charge():
            self.payment_count += 1
            charge_id = f"ch_{uuid.uuid4().hex[:16]}"
            print(f"    Processing charge: ${amount} for {customer}")
            return {
                "charge_id": charge_id,
                "amount": amount,
                "customer": customer,
                "status": "succeeded"
            }

        return self.idempotency.execute(idempotency_key, process_charge)

# Example usage
print("\n=== Idempotency Pattern Example ===")
processor = PaymentProcessor()

# Client generates idempotency key
request_id = str(uuid.uuid4())

# Original request
print("Request 1 (original):")
result1 = processor.charge(request_id, 100.0, "customer_123")
print(f"  Charge ID: {result1['charge_id']}")

# Network fails, client retries with SAME idempotency key
print("\nRequest 2 (retry with same key):")
result2 = processor.charge(request_id, 100.0, "customer_123")
print(f"  Charge ID: {result2['charge_id']}")

# Verify only ONE charge was processed
print(f"\nTotal charges processed: {processor.payment_count}")
print(f"Same result: {result1['charge_id'] == result2['charge_id']}")
Stripe Implementation Details:
1. Client sends Idempotency-Key header with unique UUID
2. Stripe stores the result of the first request under that key; keys can be removed once they are at least 24 hours old
3. Retry with same key returns cached result
4. Different key = new operation
5. Prevents duplicate charges from network retries/timeouts
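
One detail the `IdempotencyManager` above leaves out is what happens when a key is reused with a different request body. Stripe's API documentation describes comparing the incoming parameters with those of the original request and returning an error when they differ. A minimal sketch of that check follows, assuming a hypothetical `FingerprintedIdempotencyStore` class and a SHA-256 fingerprint over the JSON-encoded parameters; both are illustrative choices rather than Stripe's actual implementation, and TTL handling is omitted for brevity.

```python
import hashlib
import json
from typing import Any, Callable, Dict, List, Tuple


class IdempotencyKeyReuseError(Exception):
    """Raised when a key is reused with different request parameters."""


class FingerprintedIdempotencyStore:
    def __init__(self) -> None:
        # key -> (fingerprint of the original params, cached result)
        self._entries: Dict[str, Tuple[str, Any]] = {}

    @staticmethod
    def _fingerprint(params: Dict[str, Any]) -> str:
        # sort_keys makes the encoding independent of dict ordering
        encoded = json.dumps(params, sort_keys=True).encode()
        return hashlib.sha256(encoded).hexdigest()

    def execute(self, key: str, params: Dict[str, Any],
                operation: Callable[[Dict[str, Any]], Any]) -> Any:
        fingerprint = self._fingerprint(params)
        if key in self._entries:
            stored_fingerprint, result = self._entries[key]
            if stored_fingerprint != fingerprint:
                raise IdempotencyKeyReuseError(
                    f"Key {key!r} was already used with different parameters"
                )
            return result  # genuine retry: replay the original result
        result = operation(params)
        self._entries[key] = (fingerprint, result)
        return result


calls: List[Dict[str, Any]] = []

def fake_charge(params: Dict[str, Any]) -> Dict[str, Any]:
    """Stand-in for a real charge; records each execution."""
    calls.append(params)
    return {"status": "succeeded", "amount": params["amount"]}

store = FingerprintedIdempotencyStore()
first = store.execute("key-1", {"amount": 100, "customer": "customer_123"}, fake_charge)
retry = store.execute("key-1", {"customer": "customer_123", "amount": 100}, fake_charge)
print(first == retry, len(calls))

try:
    store.execute("key-1", {"amount": 999, "customer": "customer_123"}, fake_charge)
except IdempotencyKeyReuseError as exc:
    print(f"Rejected: {exc}")
```

Rejecting the mismatched request, rather than silently replaying the old result, surfaces client bugs where a key is accidentally shared between unrelated operations.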

🎓 Complete System Design Implementation Coverage

Part 1: Dijkstra, A*, TSP, Geohash, QuadTree, LRU Cache, Consistent Hashing

Part 2: Bloom Filter, Count-Min Sketch, URL Shortener, Rate Limiters, News Feed Fan-out, Autocomplete Trie, Sharding, Idempotency

All implementations include complexity analysis, real-world use cases, and production considerations for Distinguished Engineer interviews.