Caching Strategies

Cache Patterns, Invalidation, and Performance Optimization

"There are only two hard things in Computer Science: cache invalidation and naming things."
— Phil Karlton

1. Cache Hierarchy

| Level | Location | Latency | Size | Use Case |
|-------|----------|---------|------|----------|
| L1 Cache | CPU core | ~1 ns | 32-64 KB | Instructions, hottest data |
| L2 Cache | CPU core | ~3-10 ns | 256 KB - 1 MB | Recently accessed data |
| L3 Cache | CPU package | ~10-20 ns | 8-64 MB | Shared across cores |
| RAM | System memory | ~100 ns | 8-128 GB | Active application data |
| Application Cache | Redis/Memcached | ~1 ms | 1-100 GB | Hot data, sessions, computed results |
| CDN Cache | Edge servers | ~10-50 ms | 1-10 TB | Static assets, images, videos |
| Database | Disk + DB cache | ~10-100 ms | 100 GB - 10 TB | Persistent storage |
flowchart LR
    subgraph "Request Path (Latency)"
        A[Client] --> B[CDN<br/>10-50ms]
        B --> C[Load Balancer]
        C --> D[App Server]
        D --> E[Redis Cache<br/>~1ms]
        E --> F[Database<br/>10-100ms]
    end
    style A fill:#5e81ac,color:#eceff4
    style B fill:#a3be8c,color:#2e3440
    style C fill:#88c0d0,color:#2e3440
    style D fill:#88c0d0,color:#2e3440
    style E fill:#a3be8c,color:#2e3440
    style F fill:#bf616a,color:#eceff4
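The hierarchy above also gives a quick back-of-envelope rule: the effective read latency of a cached system is the hit-rate-weighted average of the cache and backing-store latencies. A minimal sketch, using illustrative numbers from the table (~1 ms Redis, ~50 ms database):

```python
def effective_latency_ms(hit_rate: float,
                         cache_ms: float = 1.0,
                         db_ms: float = 50.0) -> float:
    """Expected read latency for a two-tier cache + database setup."""
    return hit_rate * cache_ms + (1.0 - hit_rate) * db_ms

# At a 90% hit rate: 0.9 * 1 + 0.1 * 50 = 5.9 ms on average,
# nearly a 10x improvement over hitting the database every time.
print(effective_latency_ms(0.9))
```

The arithmetic shows why hit rate dominates: improving the hit rate from 90% to 99% does more for tail latency than making the cache itself faster.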

2. Caching Patterns

Understanding the Names

The pattern names describe where the cache sits relative to the write path or when operations happen:

flowchart LR
    subgraph WT["Write-THROUGH"]
        direction LR
        WT_A[App] -->|write| WT_C[Cache]
        WT_C -->|write| WT_D[DB]
    end
    style WT_A fill:#5e81ac,color:#eceff4
    style WT_C fill:#a3be8c,color:#2e3440
    style WT_D fill:#88c0d0,color:#2e3440

Write flows THROUGH the cache to DB (inline, synchronous)

flowchart LR
    subgraph WB["Write-BEHIND"]
        direction LR
        WB_A[App] -->|write| WB_C[Cache]
        WB_C -.->|later| WB_D[DB]
    end
    style WB_A fill:#5e81ac,color:#eceff4
    style WB_C fill:#a3be8c,color:#2e3440
    style WB_D fill:#88c0d0,color:#2e3440

DB write happens BEHIND (after) the cache write

flowchart LR
    subgraph CA["Cache-ASIDE"]
        direction LR
        CA_A[App] -->|write| CA_D[DB]
        CA_A <-->|manage| CA_C[Cache]
        CA_D -.->|read back| CA_C
    end
    style CA_A fill:#5e81ac,color:#eceff4
    style CA_C fill:#a3be8c,color:#2e3440
    style CA_D fill:#88c0d0,color:#2e3440

Cache is ASIDE from the main path, app manages it manually

flowchart RL
    subgraph RT["Read-THROUGH"]
        direction RL
        RT_D[DB] -->|fetch| RT_C[Cache]
        RT_C -->|return| RT_A[App]
    end
    style RT_A fill:#5e81ac,color:#eceff4
    style RT_C fill:#a3be8c,color:#2e3440
    style RT_D fill:#88c0d0,color:#2e3440

Read flows THROUGH the cache (cache auto-fetches on miss)

What Each Name Implies

| Pattern | Preposition Meaning | Cache Position |
|---------|---------------------|----------------|
| Write-Through | Data passes *through* the cache | Inline, synchronous gatekeeper |
| Write-Behind | DB write comes *behind* (after) | Cache first, DB catches up later |
| Write-Back | Data written *back* to DB | Same as write-behind (alias) |
| Cache-Aside | Cache is to the *side* | Out of band, app manages explicitly |
| Read-Through | Reads pass *through* cache | Cache auto-fetches on miss |
| Refresh-Ahead | Refresh *ahead* of expiry | Proactive, before TTL runs out |
The Key Insight: the preposition tells you where the cache sits relative to the data path. "Through" means inline and synchronous, "behind" means deferred, "aside" means out of band, and "ahead" means proactive.

Pattern Comparison

| Pattern | Read Latency | Write Latency | Consistency | Best For |
|---------|--------------|---------------|-------------|----------|
| Cache-Aside | Miss penalty | Fast (DB only) | Eventual | General purpose, read-heavy |
| Write-Through | Always fast | Slow (2 writes) | Strong | Data integrity critical |
| Write-Behind | Always fast | Very fast | Eventual | Write-heavy, can tolerate loss |
| Refresh-Ahead | Always fast (hot data) | N/A | Near real-time | Predictable hot data |

2.1 Cache-Aside (Lazy Loading)

Most Common Pattern: Application manages cache directly.

Cache HIT

sequenceDiagram
    participant App
    participant Cache
    participant DB
    App->>Cache: GET user:123
    Cache-->>App: ✓ Found (JSON data)
    Note over App: Return cached data

Cache MISS

sequenceDiagram
    participant App
    participant Cache
    participant DB
    App->>Cache: GET user:123
    Cache-->>App: ✗ Not found
    App->>DB: SELECT * FROM users
    DB-->>App: User data
    App->>Cache: SETEX user:123 (TTL)
    Note over App: Return data

sequenceDiagram
    participant App
    participant Cache
    participant DB
    Note over App,DB: Write Path (Invalidation)
    App->>DB: UPDATE users SET name='...'
    DB-->>App: OK
    App->>Cache: DELETE user:123
    Note over Cache: Next read will repopulate
import redis
import json
from typing import Optional, Any
from functools import wraps

class CacheAside:
    """
    Cache-Aside (Lazy Loading) Pattern

    Pros:
    + Only caches what's actually requested (no waste)
    + Resilient (cache failure doesn't break app)
    + Works with any data store

    Cons:
    - Cache miss penalty (3 round trips: cache check, DB read, cache write)
    - Stale data possible (until TTL expires)
    - Cache stampede risk
    """

    def __init__(self, redis_client: redis.Redis, ttl: int = 3600):
        self.cache = redis_client
        self.ttl = ttl

    def get_user(self, user_id: int) -> Optional[dict]:
        """
        Read with cache-aside pattern
        """
        cache_key = f"user:{user_id}"

        # 1. Try cache first
        cached = self.cache.get(cache_key)
        if cached:
            print(f"Cache HIT for {cache_key}")
            return json.loads(cached)

        # 2. Cache miss - fetch from database
        print(f"Cache MISS for {cache_key}")
        user = self._fetch_from_db(user_id)

        if user:
            # 3. Store in cache for future requests
            self.cache.setex(
                cache_key,
                self.ttl,
                json.dumps(user)
            )

        return user

    def update_user(self, user_id: int, updates: dict):
        """
        Write with cache invalidation
        """
        # 1. Update database
        self._update_db(user_id, updates)

        # 2. Invalidate cache (let next read repopulate)
        cache_key = f"user:{user_id}"
        self.cache.delete(cache_key)

        # Alternative: Update cache immediately
        # self.cache.setex(cache_key, self.ttl, json.dumps(updated_user))

    def _fetch_from_db(self, user_id: int) -> Optional[dict]:
        """Simulate database fetch"""
        # In production: SELECT * FROM users WHERE id = user_id
        return {"id": user_id, "name": "John Doe", "email": "john@example.com"}

    def _update_db(self, user_id: int, updates: dict):
        """Simulate database update"""
        # In production: UPDATE users SET ... WHERE id = user_id
        pass


# Decorator pattern for caching
def cache_aside(cache_key_fn, ttl=3600):
    """
    Decorator for cache-aside pattern

    Usage:
    @cache_aside(lambda user_id: f"user:{user_id}", ttl=3600)
    def get_user(user_id):
        return db.query("SELECT * FROM users WHERE id = %s", (user_id,))
    """
    def decorator(func):
        # Create one client per decorated function; opening a new
        # connection on every call would be wasteful
        cache = redis.Redis(host='localhost', port=6379, db=0)

        @wraps(func)
        def wrapper(*args, **kwargs):

            # Generate cache key
            cache_key = cache_key_fn(*args, **kwargs)

            # Try cache
            cached = cache.get(cache_key)
            if cached:
                return json.loads(cached)

            # Cache miss - call function
            result = func(*args, **kwargs)

            # Store in cache
            if result is not None:
                cache.setex(cache_key, ttl, json.dumps(result))

            return result

        return wrapper
    return decorator


# Usage
@cache_aside(lambda user_id: f"user:{user_id}", ttl=3600)
def get_user(user_id: int):
    """This function is now cached"""
    print(f"Fetching user {user_id} from database...")
    return {"id": user_id, "name": "John Doe"}

2.2 Write-Through Cache

Synchronous Write: Every write goes to both the cache and the database before the call returns.
sequenceDiagram
    participant App
    participant Cache
    participant DB
    Note over App,DB: Write-Through: Both writes synchronous
    App->>DB: INSERT INTO users...
    DB-->>App: OK
    App->>Cache: SETEX user:123
    Cache-->>App: OK
    Note over App: Both succeed or rollback
    rect rgb(46, 52, 64)
        Note over App,DB: Read Path (always in cache)
        App->>Cache: GET user:123
        Cache-->>App: ✓ Found
    end
class WriteThroughCache:
    """
    Write-Through Cache Pattern

    Pros:
    + Cache always consistent with database
    + No stale data
    + Read latency always low (data always in cache)

    Cons:
    - Write latency higher (two writes)
    - Wasted cache space (caches everything, even rarely accessed)
    - Cache failure breaks writes
    """

    def __init__(self, redis_client: redis.Redis):
        self.cache = redis_client

    def create_user(self, user_id: int, user_data: dict):
        """
        Write-through: Write to cache AND database
        Both writes must succeed, or the first is rolled back
        (a compensating action, not true two-phase commit)
        """
        try:
            # 1. Write to database first
            self._write_to_db(user_id, user_data)

            # 2. Write to cache (synchronously)
            cache_key = f"user:{user_id}"
            self.cache.setex(
                cache_key,
                3600,
                json.dumps(user_data)
            )
        except Exception as e:
            # If cache write fails, rollback DB write
            self._delete_from_db(user_id)
            raise Exception(f"Write-through failed: {e}. Rolled back DB write.")

    def update_user(self, user_id: int, updates: dict):
        """
        Update in both cache and database
        Both writes must succeed or rollback
        """
        # Get original data for rollback
        original_user = self._get_from_db(user_id)

        try:
            # 1. Update database
            updated_user = self._update_db(user_id, updates)

            # 2. Update cache
            cache_key = f"user:{user_id}"
            self.cache.setex(
                cache_key,
                3600,
                json.dumps(updated_user)
            )
        except Exception as e:
            # If cache write fails, rollback DB update
            if original_user:
                self._update_db(user_id, original_user)
            raise Exception(f"Write-through update failed: {e}. Rolled back DB update.")

    def get_user(self, user_id: int) -> Optional[dict]:
        """
        Read is simple - always in cache
        """
        cache_key = f"user:{user_id}"
        cached = self.cache.get(cache_key)

        if cached:
            return json.loads(cached)

        # Cache miss should be rare (only on cold start)
        return None

    def _write_to_db(self, user_id: int, user_data: dict):
        """Database write"""
        # In production: INSERT INTO users (id, ...) VALUES (user_id, ...)
        pass

    def _update_db(self, user_id: int, updates: dict) -> dict:
        """Database update, returns updated record"""
        # In production: UPDATE users SET ... WHERE id = user_id
        return {"id": user_id, **updates}

    def _delete_from_db(self, user_id: int):
        """Database delete (for rollback)"""
        # In production: DELETE FROM users WHERE id = user_id
        pass

    def _get_from_db(self, user_id: int) -> Optional[dict]:
        """Database fetch (for rollback)"""
        # In production: SELECT * FROM users WHERE id = user_id
        return {"id": user_id, "name": "John Doe"}

2.3 Write-Behind (Write-Back) Cache

sequenceDiagram
    participant App
    participant Cache
    participant Queue
    participant Worker
    participant DB
    Note over App,DB: Write-Behind: Async DB writes
    App->>Cache: SETEX user:123
    Cache-->>App: OK (fast!)
    App->>Queue: Enqueue write
    App-->>App: Return immediately
    Note over Worker,DB: Background (async)
    loop Batch Processing
        Worker->>Queue: Dequeue batch
        Worker->>DB: Batch INSERT/UPDATE
        DB-->>Worker: OK
    end
Risk: Data loss if the cache fails before the background write completes. Use for non-critical data, or pair with cache persistence (e.g., Redis AOF).
import queue
import threading
import time

class WriteBehindCache:
    """
    Write-Behind (Write-Back) Cache Pattern

    Writes go to cache first, database updated asynchronously

    Pros:
    + Very fast writes (cache only)
    + Batch database writes (efficiency)
    + Reduces database load

    Cons:
    - Data loss risk (cache failure before DB write)
    - Complex implementation
    - Eventual consistency
    """

    def __init__(self, redis_client: redis.Redis, batch_size: int = 100):
        self.cache = redis_client
        self.write_queue = queue.Queue()
        self.batch_size = batch_size

        # Start background worker
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()

    def update_user(self, user_id: int, updates: dict):
        """
        Write to cache immediately, queue DB write
        """
        # 1. Update cache (fast)
        cache_key = f"user:{user_id}"
        user_data = {"id": user_id, **updates}
        self.cache.setex(cache_key, 3600, json.dumps(user_data))

        # 2. Queue database write (asynchronous)
        self.write_queue.put(('update', user_id, updates))

    def _worker(self):
        """
        Background worker processes write queue
        """
        batch = []

        while True:
            try:
                # Collect batch of writes
                while len(batch) < self.batch_size:
                    try:
                        item = self.write_queue.get(timeout=1)
                        batch.append(item)
                    except queue.Empty:
                        break

                # Process batch
                if batch:
                    self._flush_batch(batch)
                    batch = []

            except Exception as e:
                print(f"Write-behind error: {e}")

    def _flush_batch(self, batch: list):
        """
        Write batch to database
        """
        # Batch database operations for efficiency
        # UPDATE users SET ... WHERE id IN (...)
        print(f"Flushing {len(batch)} writes to database")

        for operation, user_id, updates in batch:
            if operation == 'update':
                self._update_db(user_id, updates)

    def _update_db(self, user_id: int, updates: dict):
        """Database update"""
        # Actual database write
        pass

2.4 Refresh-Ahead Cache

sequenceDiagram
    participant App
    participant Cache
    participant BG as Background Thread
    participant DB
    Note over App,DB: Refresh-Ahead: Proactive refresh before expiry
    App->>Cache: GET user:123
    Cache-->>App: ✓ Found (TTL: 20% remaining)
    Note over App: Check: TTL < threshold?
    App->>BG: Trigger async refresh
    par Return immediately
        App-->>App: Return cached data
    and Background refresh
        BG->>DB: SELECT * FROM users
        DB-->>BG: Fresh data
        BG->>Cache: SETEX user:123 (reset TTL)
    end
    Note over Cache: Cache refreshed before expiry!
import time
from datetime import datetime, timedelta

class RefreshAheadCache:
    """
    Refresh-Ahead Cache Pattern

    Proactively refresh hot data before expiration

    Pros:
    + Always fresh data for popular items
    + No cache miss penalty for hot data
    + Predictable latency

    Cons:
    - Wasted refreshes for cold data
    - Complex to implement
    - Needs accurate prediction of hot data
    """

    def __init__(self, redis_client: redis.Redis, ttl: int = 3600):
        self.cache = redis_client
        self.ttl = ttl
        self.refresh_threshold = 0.8  # Refresh when 80% of TTL passed

    def get_user(self, user_id: int) -> Optional[dict]:
        """
        Get with automatic refresh-ahead
        """
        cache_key = f"user:{user_id}"

        cached = self.cache.get(cache_key)
        if cached:
            # Check if nearing expiration
            ttl_remaining = self.cache.ttl(cache_key)

            # ttl() returns -2 if the key is gone and -1 if it has no expiry,
            # so only refresh when a real, positive TTL is running low
            if 0 < ttl_remaining < (self.ttl * (1 - self.refresh_threshold)):
                # Refresh in background (async)
                threading.Thread(
                    target=self._refresh_cache,
                    args=(user_id,)
                ).start()

            return json.loads(cached)

        # Cache miss - load and cache
        user = self._fetch_from_db(user_id)
        if user:
            self.cache.setex(cache_key, self.ttl, json.dumps(user))

        return user

    def _refresh_cache(self, user_id: int):
        """
        Background refresh of cache
        """
        print(f"Refreshing cache for user:{user_id}")

        user = self._fetch_from_db(user_id)
        if user:
            cache_key = f"user:{user_id}"
            self.cache.setex(cache_key, self.ttl, json.dumps(user))

    def _fetch_from_db(self, user_id: int) -> Optional[dict]:
        """Database fetch"""
        return {"id": user_id, "name": "John Doe"}

3. Cache Invalidation Strategies

| Strategy | How It Works | Pros | Cons |
|----------|--------------|------|------|
| TTL (Time-To-Live) | Cache expires after X seconds | Simple, automatic | Stale data possible, cache stampede |
| Event-Based | Invalidate on write events | Always fresh | Complex, distributed coordination |
| Tag-Based | Group caches by tags, invalidate tag | Bulk invalidation | Complex tag management |
| Versioning | Include version in cache key | No invalidation needed | Memory waste (multiple versions) |
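Versioning, the last strategy above, is the only one with no explicit invalidation step: writers bump a per-entity version counter, and since the version is embedded in the cache key, readers simply never see old entries again; orphaned versions age out via TTL or eviction. A minimal in-memory sketch (with Redis, the counter would live under its own key and be bumped atomically with INCR):

```python
class VersionedCache:
    """Versioned cache keys: writers bump the version, readers miss on old keys."""

    def __init__(self):
        self.versions = {}  # entity id -> current version
        self.store = {}     # versioned key -> cached value

    def _key(self, entity_id):
        return f"user:{entity_id}:v{self.versions.get(entity_id, 0)}"

    def get(self, entity_id):
        return self.store.get(self._key(entity_id))

    def put(self, entity_id, value):
        self.store[self._key(entity_id)] = value

    def invalidate(self, entity_id):
        # No deletes needed: bumping the version orphans the old entry
        self.versions[entity_id] = self.versions.get(entity_id, 0) + 1
```

In Redis terms a read becomes two commands (GET the version, then GET the versioned key), which is still far simpler than coordinating deletes across cache nodes.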

3.1 TTL-Based Expiration

class TTLCache:
    """
    TTL (Time-To-Live) based cache expiration

    Simplest invalidation strategy
    """

    def __init__(self, redis_client: redis.Redis):
        self.cache = redis_client

    def set_with_ttl(self, key: str, value: Any, ttl: int):
        """
        Set cache with TTL

        TTL selection guide:
        - Static data (configs): 1 hour - 1 day
        - User profiles: 5-15 minutes
        - API responses: 1-5 minutes
        - Real-time data: 10-60 seconds
        """
        self.cache.setex(key, ttl, json.dumps(value))

    def set_with_adaptive_ttl(self, key: str, value: Any, access_count: int):
        """
        Adaptive TTL based on access patterns

        Hot data → longer TTL
        Cold data → shorter TTL
        """
        # More accesses = longer TTL
        if access_count > 1000:
            ttl = 3600  # 1 hour
        elif access_count > 100:
            ttl = 1800  # 30 minutes
        else:
            ttl = 300   # 5 minutes

        self.cache.setex(key, ttl, json.dumps(value))

3.2 Cache Stampede Prevention

Cache Stampede: When cache expires, multiple requests simultaneously hit the database to regenerate the same data.

The Problem: Stampede

sequenceDiagram
    participant R1 as Request 1
    participant R2 as Request 2
    participant R3 as Request 3
    participant Cache
    participant DB
    Note over Cache: Cache EXPIRED!
    R1->>Cache: GET (miss)
    R2->>Cache: GET (miss)
    R3->>Cache: GET (miss)
    par All hit DB simultaneously!
        R1->>DB: SELECT...
        R2->>DB: SELECT...
        R3->>DB: SELECT...
    end
    Note over DB: OVERLOADED!

Solution: Lock-Based

sequenceDiagram
    participant R1 as Request 1
    participant R2 as Request 2
    participant Cache
    participant DB
    Note over Cache: Cache EXPIRED!
    R1->>Cache: GET (miss)
    R1->>Cache: SET lock:key NX
    Cache-->>R1: ✓ Got lock
    R2->>Cache: GET (miss)
    R2->>Cache: SET lock:key NX
    Cache-->>R2: ✗ Lock exists
    R2->>R2: Wait...
    R1->>DB: SELECT...
    DB-->>R1: Data
    R1->>Cache: SETEX key
    R1->>Cache: DEL lock:key
    R2->>Cache: GET
    Cache-->>R2: ✓ Found!
import hashlib

class CacheStampedeProtection:
    """
    Prevent cache stampede (thundering herd)

    Problem:
    - Cache expires
    - 1000 requests simultaneously hit DB
    - Database overload
    - Slow response

    Solutions:
    1. Lock-based: First request locks, others wait
    2. Probabilistic early expiration
    3. Background refresh
    """

    def __init__(self, redis_client: redis.Redis):
        self.cache = redis_client

    def get_with_lock(self, key: str, fetch_fn, ttl: int = 3600):
        """
        Solution 1: Lock-based stampede prevention

        First request gets lock and regenerates cache
        Others wait for first request to complete
        """
        # Try cache
        cached = self.cache.get(key)
        if cached:
            return json.loads(cached)

        # Cache miss - try to acquire lock
        lock_key = f"lock:{key}"
        lock = self.cache.set(lock_key, "1", nx=True, ex=10)  # 10s lock

        if lock:
            # Got lock - regenerate cache
            try:
                value = fetch_fn()
                self.cache.setex(key, ttl, json.dumps(value))
                return value
            finally:
                # Release lock. In production, store a unique token and delete
                # only if it still matches, so a holder whose lock expired
                # cannot remove another request's lock.
                self.cache.delete(lock_key)
        else:
            # Didn't get lock - wait and retry
            time.sleep(0.1)  # Brief wait
            cached = self.cache.get(key)
            if cached:
                return json.loads(cached)

            # Lock holder failed - try again
            return self.get_with_lock(key, fetch_fn, ttl)

    def get_with_probabilistic_expiration(self, key: str, fetch_fn, ttl: int = 3600):
        """
        Solution 2: Probabilistic early expiration (a simplified
        variant of the XFetch algorithm)

        Randomly refresh before TTL expires
        Probability increases as expiration approaches

        P(refresh) = β * e^(-(TTL_remaining / δ))

        Where:
        - β: beta, adjusts probability
        - δ: delta, time before expiration to start refreshing
        """
        cached_data = self.cache.get(key)

        if cached_data:
            # Check TTL
            ttl_remaining = self.cache.ttl(key)

            # Calculate refresh probability
            delta = ttl * 0.2  # Start considering refresh at 20% TTL remaining
            beta = 1.0

            if ttl_remaining < delta:
                import random
                import math

                probability = beta * math.exp(-ttl_remaining / delta)

                if random.random() < probability:
                    # Probabilistically refresh
                    threading.Thread(
                        target=self._background_refresh,
                        args=(key, fetch_fn, ttl)
                    ).start()

            return json.loads(cached_data)

        # Cache miss - normal fetch
        value = fetch_fn()
        self.cache.setex(key, ttl, json.dumps(value))
        return value

    def _background_refresh(self, key: str, fetch_fn, ttl: int):
        """Refresh cache in background"""
        try:
            value = fetch_fn()
            self.cache.setex(key, ttl, json.dumps(value))
        except Exception as e:
            print(f"Background refresh failed: {e}")

3.3 Event-Based Invalidation

sequenceDiagram
    participant Writer as Writer Service
    participant DB
    participant PubSub as Redis Pub/Sub
    participant C1 as Cache Node 1
    participant C2 as Cache Node 2
    participant C3 as Cache Node 3
    Note over Writer,C3: Event-Based: Pub/Sub Invalidation
    Writer->>DB: UPDATE users SET...
    DB-->>Writer: OK
    Writer->>PubSub: PUBLISH cache:invalidate {user:123}
    par Broadcast to all nodes
        PubSub-->>C1: Message
        PubSub-->>C2: Message
        PubSub-->>C3: Message
    end
    C1->>C1: DELETE user:123
    C2->>C2: DELETE user:123
    C3->>C3: DELETE user:123
    Note over C1,C3: All caches invalidated!
class EventBasedInvalidation:
    """
    Event-based cache invalidation using Pub/Sub

    When data changes, publish invalidation event
    All cache instances subscribe and invalidate
    """

    def __init__(self, redis_client: redis.Redis):
        self.cache = redis_client
        self.pubsub = redis_client.pubsub()

        # Subscribe to invalidation events
        self.pubsub.subscribe('cache:invalidate')

        # Start listener thread
        self.listener_thread = threading.Thread(
            target=self._listen_for_invalidations,
            daemon=True
        )
        self.listener_thread.start()

    def update_user(self, user_id: int, updates: dict):
        """
        Update user and publish invalidation event
        """
        # 1. Update database
        self._update_db(user_id, updates)

        # 2. Publish invalidation event
        # Caution: the glob 'user:123*' also matches 'user:1234'; in
        # production, publish the exact key plus a delimited prefix
        # such as 'user:123:*'
        self.cache.publish('cache:invalidate', json.dumps({
            'type': 'user',
            'id': user_id,
            'pattern': f'user:{user_id}*'  # Invalidate all related caches
        }))

    def _listen_for_invalidations(self):
        """
        Listen for invalidation events
        """
        for message in self.pubsub.listen():
            if message['type'] == 'message':
                event = json.loads(message['data'])

                # Invalidate matching cache keys
                pattern = event['pattern']
                for key in self.cache.scan_iter(match=pattern):
                    self.cache.delete(key)
                    print(f"Invalidated cache: {key}")

    def _update_db(self, user_id: int, updates: dict):
        """Database update"""
        pass

4. Redis Caching Patterns

4.1 String Caching (Simple Values)

class RedisStringCache:
    """
    Redis string operations for caching

    Use for: Simple key-value caching
    """

    def __init__(self, redis_client: redis.Redis):
        self.cache = redis_client

    def cache_user(self, user_id: int, user_data: dict):
        """Cache user as JSON string"""
        key = f"user:{user_id}"
        self.cache.setex(key, 3600, json.dumps(user_data))

    def cache_counter(self, key: str):
        """Atomic counter (views, likes, etc.)"""
        self.cache.incr(key)

    def cache_with_multiple_keys(self, data: dict):
        """Batch set multiple keys (pipeline for efficiency)"""
        pipe = self.cache.pipeline()

        for key, value in data.items():
            pipe.setex(key, 3600, json.dumps(value))

        pipe.execute()

4.2 Hash Caching (Structured Data)

class RedisHashCache:
    """
    Redis hash operations for structured data

    Use for: Objects with multiple fields
    Benefits: Update individual fields without fetching entire object
    Note: Assumes redis.Redis(decode_responses=True) so hash fields
    come back as str rather than bytes
    """

    def __init__(self, redis_client: redis.Redis):
        self.cache = redis_client

    def cache_user_as_hash(self, user_id: int, user_data: dict):
        """
        Store user as hash (more efficient for partial updates)
        """
        key = f"user:{user_id}"

        # Store entire user as hash
        self.cache.hset(key, mapping=user_data)
        self.cache.expire(key, 3600)

    def update_user_field(self, user_id: int, field: str, value: str):
        """
        Update single field without fetching entire user
        """
        key = f"user:{user_id}"
        self.cache.hset(key, field, value)

    def get_user_field(self, user_id: int, field: str) -> Optional[str]:
        """
        Get single field
        """
        key = f"user:{user_id}"
        return self.cache.hget(key, field)

    def get_user(self, user_id: int) -> dict:
        """
        Get all fields as dictionary
        """
        key = f"user:{user_id}"
        return self.cache.hgetall(key)

4.3 List/Set Caching (Collections)

class RedisCollectionCache:
    """
    Redis list and set operations

    Lists: Ordered collections, recent items, queues
    Sets: Unique values, tags, followers
    """

    def __init__(self, redis_client: redis.Redis):
        self.cache = redis_client

    def cache_recent_orders(self, user_id: int, order_id: int):
        """
        Maintain list of recent orders (FIFO, max 100)
        """
        key = f"user:{user_id}:recent_orders"

        # Add to front of list
        self.cache.lpush(key, order_id)

        # Trim to keep only 100 most recent
        self.cache.ltrim(key, 0, 99)

        # Set expiration
        self.cache.expire(key, 86400)  # 24 hours

    def get_recent_orders(self, user_id: int) -> list:
        """Get recent orders"""
        key = f"user:{user_id}:recent_orders"
        return self.cache.lrange(key, 0, -1)

    def cache_user_tags(self, user_id: int, tags: list):
        """
        Store user tags as set (unique values)
        """
        key = f"user:{user_id}:tags"

        # Add multiple tags
        self.cache.sadd(key, *tags)
        self.cache.expire(key, 3600)

    def get_common_tags(self, user_id1: int, user_id2: int) -> set:
        """
        Find common tags between users (set intersection)
        """
        key1 = f"user:{user_id1}:tags"
        key2 = f"user:{user_id2}:tags"

        return self.cache.sinter(key1, key2)

5. LRU Cache Implementation

flowchart LR
    subgraph "LRU Cache: Doubly Linked List + HashMap"
        direction TB
        HM[HashMap<br/>key → Node ptr]
        subgraph DLL["Doubly Linked List (order)"]
            direction LR
            H[HEAD] <--> N1[key:A<br/>MRU] <--> N2[key:B] <--> N3[key:C<br/>LRU] <--> T[TAIL]
        end
    end
    HM -.-> N1
    HM -.-> N2
    HM -.-> N3
    style H fill:#434c5e,color:#e0e0e0
    style T fill:#434c5e,color:#e0e0e0
    style N1 fill:#a3be8c,color:#2e3440
    style N2 fill:#88c0d0,color:#2e3440
    style N3 fill:#bf616a,color:#eceff4
    style HM fill:#5e81ac,color:#eceff4
sequenceDiagram
    participant Client
    participant HashMap
    participant DLL as Doubly Linked List
    Note over Client,DLL: GET key:B (cache hit)
    Client->>HashMap: Lookup key:B
    HashMap-->>Client: Node pointer
    Client->>DLL: Remove node from current position
    Client->>DLL: Add node after HEAD (MRU)
    Note over Client,DLL: PUT key:D (new key, cache full)
    Client->>HashMap: Check if exists
    HashMap-->>Client: Not found
    Client->>DLL: Create new node after HEAD
    Client->>HashMap: Store key:D → node ptr
    Note over DLL: Capacity exceeded!
    Client->>DLL: Remove node before TAIL (LRU)
    Client->>HashMap: Delete evicted key
from collections import OrderedDict

class LRUCache:
    """
    LRU (Least Recently Used) Cache

    Evicts least recently used items when capacity reached

    Time Complexity:
    - Get: O(1)
    - Put: O(1)
    - Space: O(capacity)

    Implementation: OrderedDict (maintains insertion order)
    """

    def __init__(self, capacity: int):
        self.cache = OrderedDict()
        self.capacity = capacity

    def get(self, key: str) -> Optional[Any]:
        """
        Get value and mark as recently used
        """
        if key not in self.cache:
            return None

        # Move to end (most recently used)
        self.cache.move_to_end(key)
        return self.cache[key]

    def put(self, key: str, value: Any):
        """
        Put value in cache
        """
        if key in self.cache:
            # Update existing key
            self.cache.move_to_end(key)

        self.cache[key] = value

        # Evict LRU if capacity exceeded
        if len(self.cache) > self.capacity:
            # Remove first item (least recently used)
            self.cache.popitem(last=False)


# LRU with manual implementation (interview question)
class LRUCacheManual:
    """
    LRU Cache with Doubly Linked List + HashMap

    Interview favorite - implement from scratch
    """

    class Node:
        def __init__(self, key, value):
            self.key = key
            self.value = value
            self.prev = None
            self.next = None

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.cache = {}  # key -> Node

        # Dummy head and tail for easier manipulation
        self.head = self.Node(0, 0)
        self.tail = self.Node(0, 0)
        self.head.next = self.tail
        self.tail.prev = self.head

    def get(self, key: str) -> Optional[Any]:
        if key not in self.cache:
            return None

        node = self.cache[key]

        # Move to front (most recently used)
        self._remove(node)
        self._add_to_front(node)

        return node.value

    def put(self, key: str, value: Any):
        if key in self.cache:
            # Update existing
            self._remove(self.cache[key])

        node = self.Node(key, value)
        self.cache[key] = node
        self._add_to_front(node)

        # Evict LRU if needed
        if len(self.cache) > self.capacity:
            lru = self.tail.prev
            self._remove(lru)
            del self.cache[lru.key]

    def _remove(self, node):
        """Remove node from linked list"""
        node.prev.next = node.next
        node.next.prev = node.prev

    def _add_to_front(self, node):
        """Add node right after head (most recent)"""
        node.next = self.head.next
        node.prev = self.head
        self.head.next.prev = node
        self.head.next = node
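To make the eviction order concrete, here is a short runnable walkthrough; it inlines a compact OrderedDict version of the cache above so the snippet is self-contained:

```python
from collections import OrderedDict

class LRU:
    def __init__(self, capacity):
        self.d, self.cap = OrderedDict(), capacity

    def get(self, k):
        if k not in self.d:
            return None
        self.d.move_to_end(k)           # mark as most recently used
        return self.d[k]

    def put(self, k, v):
        if k in self.d:
            self.d.move_to_end(k)
        self.d[k] = v
        if len(self.d) > self.cap:
            self.d.popitem(last=False)  # evict least recently used

cache = LRU(2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")       # "a" is now the most recently used key
cache.put("c", 3)    # capacity exceeded: evicts "b", not "a"
assert cache.get("b") is None
assert cache.get("a") == 1 and cache.get("c") == 3
```

Note that the read on "a" is what saves it: without it, "a" would have been the eviction victim instead of "b".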

6. CDN Caching

sequenceDiagram
    participant User
    participant Edge as CDN Edge (NYC)
    participant Origin as Origin Server
    Note over User,Origin: First Request (Cache Miss)
    User->>Edge: GET /static/logo.png
    Edge->>Edge: Check cache
    Note over Edge: Miss!
    Edge->>Origin: GET /static/logo.png
    Origin-->>Edge: 200 OK + Cache-Control: max-age=31536000
    Edge->>Edge: Store in cache
    Edge-->>User: 200 OK
    Note over User,Origin: Subsequent Requests (Cache Hit)
    User->>Edge: GET /static/logo.png
    Edge->>Edge: Check cache
    Note over Edge: Hit!
    Edge-->>User: 200 OK (from cache)
    Note over Origin: Origin not contacted!
flowchart TB
    subgraph "Cache-Control Header Flow"
        A[Origin Response] --> B{Cache-Control?}
        B -->|public, max-age=3600| C[CDN + Browser cache]
        B -->|private, max-age=3600| D[Browser cache only]
        B -->|no-cache| E[Must revalidate every time]
        B -->|no-store| F[Never cache]
    end
    style A fill:#5e81ac,color:#eceff4
    style C fill:#a3be8c,color:#2e3440
    style D fill:#88c0d0,color:#2e3440
    style E fill:#ebcb8b,color:#2e3440
    style F fill:#bf616a,color:#eceff4
class CDNCachingStrategy:
    """
    CDN (Content Delivery Network) caching

    Cache static assets at edge locations near users

    Cache-Control headers:
    - public: Can be cached by CDN
    - private: Only browser cache (not CDN)
    - max-age: TTL in seconds
    - s-maxage: TTL for shared caches (CDN)
    - no-cache: Must revalidate
    - no-store: Don't cache at all
    """

    @staticmethod
    def static_asset_headers():
        """
        Headers for static assets (images, CSS, JS)

        Long TTL with versioning (cache busting)
        """
        return {
            'Cache-Control': 'public, max-age=31536000, immutable',  # 1 year
            'ETag': 'hash-of-content',
            # Filename versioning: style.v123.css (cache busting)
        }

    @staticmethod
    def dynamic_content_headers():
        """
        Headers for dynamic content that changes

        Short TTL with revalidation
        """
        return {
            'Cache-Control': 'public, max-age=300, must-revalidate',  # 5 minutes
            'ETag': 'hash-of-content',
            'Vary': 'Accept-Encoding, Accept-Language'
        }

    @staticmethod
    def api_response_headers():
        """
        Headers for API responses

        Separate TTL for browser vs CDN
        """
        return {
            'Cache-Control': 'public, max-age=60, s-maxage=300',  # Browser: 1min, CDN: 5min
            'ETag': 'hash-of-response',
            'Vary': 'Authorization'  # Different responses for different users
        }

    @staticmethod
    def no_cache_headers():
        """
        Headers for sensitive/personalized content
        """
        return {
            'Cache-Control': 'private, no-cache, no-store, must-revalidate',
            'Pragma': 'no-cache',  # HTTP/1.0
            'Expires': '0'
        }
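The ETag headers above only pay off if the server honors conditional requests: a client that already holds a fresh copy sends If-None-Match and gets a body-less 304 back. A framework-agnostic sketch of that decision (etag_for and respond are hypothetical helpers, not a real library API):

```python
import hashlib
from typing import Optional

def etag_for(body: bytes) -> str:
    """Derive a strong ETag from the response body (hypothetical helper)."""
    return '"' + hashlib.sha256(body).hexdigest()[:16] + '"'

def respond(body: bytes, if_none_match: Optional[str] = None):
    """Return (status, headers, body), honoring If-None-Match revalidation."""
    tag = etag_for(body)
    if if_none_match == tag:
        # Client's cached copy is still valid: send the 304, skip the body
        return 304, {"ETag": tag}, b""
    return 200, {"ETag": tag, "Cache-Control": "public, max-age=300"}, body
```

Usage: the first request returns 200 plus an ETag; replaying that ETag in If-None-Match returns 304 with an empty body, saving the transfer without serving stale content.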

7. Interview Tips

Key Caching Concepts:
  - Cache hit vs. miss, and the miss penalty
  - Patterns: cache-aside, read/write-through, write-behind, refresh-ahead
  - Invalidation: TTL, event-based, tag-based, versioning
  - Eviction policies (LRU and friends)
  - Cache stampede (thundering herd) and how to prevent it
Common Interview Questions:
  1. "Design a caching layer for an e-commerce site" → Cache-aside with Redis
  2. "How do you prevent cache stampede?" → Lock-based or probabilistic expiration
  3. "Implement an LRU cache" → Doubly linked list + HashMap, O(1) operations
  4. "When would you use write-through vs write-behind?" → Consistency vs performance trade-off
  5. "How do you invalidate cache?" → TTL, event-based, versioning
  6. "What's cache coherence?" → Keeping multiple cache copies consistent
  7. "CDN caching strategy?" → Long TTL for static, short for dynamic, versioning for busting
Production Caching Checklist:
  - Pick a pattern per data type (cache-aside is the safe default)
  - Set an explicit TTL on every key
  - Protect hot keys from stampede (locking or probabilistic refresh)
  - Invalidate on writes; prefer delete-then-repopulate over in-place updates
  - Configure a memory limit and eviction policy (e.g., Redis maxmemory + allkeys-lru)
  - Monitor hit rate, latency, evictions, and memory usage
  - Ensure the app degrades gracefully to the database when the cache is down