Build Your Own: Redis & Message Queue

Production-style implementations, built from scratch for learning

Back to Study Guide

1. Build Redis - In-Memory Data Store

Implement core Redis features: GET/SET, expiration, LRU eviction, pub/sub.

import time
import threading
from typing import Any, Optional, Dict, Set, List
from collections import OrderedDict
from dataclasses import dataclass

@dataclass
class CacheEntry:
    """A stored value together with its optional expiry timestamp."""
    value: Any
    # Absolute Unix time after which the entry is stale; None = never expires.
    expires_at: Optional[float] = None

class MiniRedis:
    """
    Redis-like in-memory data store.

    Features:
    - GET/SET with optional TTL
    - DELETE, EXISTS
    - LRU eviction when max_size reached
    - Background expiration cleanup
    - Pub/Sub messaging
    - Atomic operations

    GET/SET/DELETE/EXISTS are O(1) average; the background sweep is O(n)
    in the number of keys, and publish is O(n) in subscribers.

    Thread safety: every public method takes self.lock (an RLock, so
    methods may safely call each other while holding it).
    """

    def __init__(self, max_size: int = 1000):
        # OrderedDict doubles as the LRU list: least-recently-used at the
        # front, most-recently-used at the back.
        self.store: OrderedDict[str, CacheEntry] = OrderedDict()
        self.max_size = max_size
        self.lock = threading.RLock()  # Reentrant lock for thread safety

        # Pub/Sub: channel name -> set of 1-arg subscriber callables.
        # (Annotation fixed: `callable` is a builtin function, not a type.)
        self.subscribers: Dict[str, Set[Any]] = {}

        # Background thread that sweeps expired keys once per second.
        self.running = True
        self.cleanup_thread = threading.Thread(target=self._cleanup_expired, daemon=True)
        self.cleanup_thread.start()

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """
        Set key to value with optional TTL (time-to-live in seconds).

        Returns: True (the operation always succeeds; the bool is kept
        for API compatibility)
        """
        with self.lock:
            # BUGFIX: the old truthiness test (`if ttl`) silently turned
            # ttl=0 into "never expires"; ttl=0 now expires immediately.
            expires_at = time.time() + ttl if ttl is not None else None

            # Evict the LRU entry only when inserting a NEW key at
            # capacity — overwriting an existing key never evicts.
            if len(self.store) >= self.max_size and key not in self.store:
                evicted_key, _ = self.store.popitem(last=False)
                print(f"  [EVICT] Removed {evicted_key} (LRU)")

            self.store[key] = CacheEntry(value=value, expires_at=expires_at)

            # Mark as most recently used.
            self.store.move_to_end(key)

            return True

    def get(self, key: str) -> Optional[Any]:
        """
        Get value by key.

        Returns: Value if exists and not expired, None otherwise
        """
        with self.lock:
            if key not in self.store:
                return None

            entry = self.store[key]

            # Lazy expiration: drop the entry on read if it is stale.
            # (`is not None` rather than truthiness, matching set().)
            if entry.expires_at is not None and time.time() > entry.expires_at:
                del self.store[key]
                return None

            # Mark as most recently used for LRU ordering.
            self.store.move_to_end(key)

            return entry.value

    def delete(self, key: str) -> bool:
        """Delete key. Returns True if deleted, False if not found."""
        with self.lock:
            # EAFP: one dict lookup instead of `in` followed by `del`.
            try:
                del self.store[key]
                return True
            except KeyError:
                return False

    def exists(self, key: str) -> bool:
        """Check if key exists and is not expired.

        Note: implemented via get(), so it also refreshes the key's LRU
        recency as a side effect.
        """
        return self.get(key) is not None

    def ttl(self, key: str) -> int:
        """Get remaining TTL in seconds (Redis return convention).

        Returns:
            -2 if the key does not exist (or has already expired),
            -1 if the key exists but has no expiration,
            otherwise the remaining whole seconds (>= 0).
        """
        with self.lock:
            if key not in self.store:
                return -2  # Key doesn't exist

            entry = self.store[key]
            if entry.expires_at is None:
                return -1  # No expiration

            remaining = entry.expires_at - time.time()
            # BUGFIX: an expired-but-unswept key used to report 0; report
            # -2 as Redis does for a missing key.
            if remaining <= 0:
                return -2
            return int(remaining)

    def incr(self, key: str, amount: int = 1) -> int:
        """Atomic increment. Returns new value.

        A missing key is treated as 0. NOTE: unlike real Redis, this
        clears any TTL on the key, because it rewrites it via set().
        Raises ValueError if the current value is not int-convertible.
        """
        with self.lock:
            # Safe under the RLock: get/set re-acquire it reentrantly.
            current = self.get(key) or 0
            new_value = int(current) + amount
            self.set(key, new_value)
            return new_value

    def subscribe(self, channel: str, callback: Any):
        """Subscribe to channel with callback (a 1-arg callable)."""
        with self.lock:
            self.subscribers.setdefault(channel, set()).add(callback)

    def publish(self, channel: str, message: Any) -> int:
        """Publish message to channel. Returns number of successful deliveries."""
        with self.lock:
            if channel not in self.subscribers:
                return 0

            count = 0
            # BUGFIX: iterate over a snapshot so a callback may
            # subscribe/unsubscribe during delivery without a
            # "set changed size during iteration" RuntimeError
            # (re-entrant calls are fine: the lock is an RLock).
            for callback in list(self.subscribers[channel]):
                try:
                    callback(message)
                    count += 1
                except Exception as e:
                    # Best-effort delivery: one failing subscriber must
                    # not block delivery to the rest.
                    print(f"  [ERROR] Subscriber error: {e}")

            return count

    def _cleanup_expired(self):
        """Background thread to remove expired entries (1-second sweep)."""
        while self.running:
            time.sleep(1)  # Check every second

            with self.lock:
                now = time.time()
                # Collect first, then delete: never mutate the dict
                # while iterating it.
                expired_keys = [
                    key for key, entry in self.store.items()
                    if entry.expires_at is not None and now > entry.expires_at
                ]

                for key in expired_keys:
                    del self.store[key]
                    print(f"  [EXPIRE] Removed {key}")

    def shutdown(self):
        """Stop the cleanup thread (blocks up to one sweep interval)."""
        self.running = False
        self.cleanup_thread.join()

# Example: Redis usage
print("=== MiniRedis Example ===\n")

cache = MiniRedis(max_size=3)

# Basic GET/SET
cache.set("user:1:name", "Alice")
cache.set("user:1:score", 100)
print(f"Name: {cache.get('user:1:name')}")
print(f"Score: {cache.get('user:1:score')}")

# TTL (expiration): token lives for 2 seconds, then vanishes.
print("\n--- TTL Test ---")
cache.set("temp:token", "abc123", ttl=2)
print(f"Token: {cache.get('temp:token')}")
print(f"TTL: {cache.ttl('temp:token')} seconds")
time.sleep(3)
print(f"After expiration: {cache.get('temp:token')}")

# LRU Eviction: inserting a 4th key at max_size=3 evicts the oldest.
print("\n--- LRU Eviction Test ---")
for n in range(1, 5):
    cache.set(f"key{n}", f"value{n}")

# Atomic Increment
print("\n--- Atomic Increment ---")
cache.set("counter", 0)
for _ in range(5):
    print(f"  Counter: {cache.incr('counter')}")

# Pub/Sub
print("\n--- Pub/Sub Test ---")
def handle_notification(msg):
    print(f"  Received: {msg}")

cache.subscribe("notifications", handle_notification)
delivered = cache.publish("notifications", "Hello subscribers!")
print(f"  Delivered to {delivered} subscribers")

cache.shutdown()

2. Build Message Queue - Kafka-like System

Implement producer/consumer pattern with acknowledgments and dead letter queue.

import threading
import time
import uuid

from collections import deque
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional

class MessageStatus(Enum):
    """Lifecycle states a queued message moves through."""
    PENDING = "pending"          # waiting in the queue
    IN_PROGRESS = "in_progress"  # claimed by a consumer, awaiting ack/nack
    COMPLETED = "completed"      # acknowledged successfully
    FAILED = "failed"            # retries exhausted, parked in the DLQ

@dataclass
class Message:
    """A single unit of work flowing through the queue.

    Attributes:
        id: Unique identifier (uuid4 string).
        payload: Arbitrary application data.
        created_at: Unix timestamp set at produce() time.
        attempts: How many times a consumer has claimed this message.
        status: Current lifecycle state.
        retry_count: How many nack()-driven requeues have occurred.
    """
    id: str
    # BUGFIX: annotation was the builtin function `any`; the intended
    # type is typing.Any.
    payload: Any
    created_at: float
    attempts: int = 0
    status: MessageStatus = MessageStatus.PENDING
    retry_count: int = 0

class MessageQueue:
    """
    Kafka-like message queue with:
    - Producer/Consumer pattern
    - Acknowledgments (at-least-once delivery)
    - Retry logic (immediate requeue, no backoff)
    - Dead letter queue for failed messages
    - Competing consumers on one queue (load balancing)

    Used for: Async task processing, event streaming, microservices communication

    Thread safety: all state is guarded by self.lock; self.condition is
    built on that same lock so consumers can block until work arrives.
    """

    def __init__(self, name: str, max_retries: int = 3):
        self.name = name
        # Maximum total processing attempts before a message is routed
        # to the dead letter queue.
        self.max_retries = max_retries

        # Message storage, bucketed by lifecycle stage.
        self.pending: deque[Message] = deque()
        self.in_progress: Dict[str, Message] = {}
        self.completed: List[Message] = []
        self.dead_letter: List[Message] = []

        # RLock + Condition sharing it: produce()/nack() can notify()
        # while already holding the lock.
        self.lock = threading.RLock()
        self.condition = threading.Condition(self.lock)

        # Observed by consumer workers; cleared by shutdown().
        self.running = True

    def produce(self, payload: Any) -> str:
        """
        Produce message to queue and wake one waiting consumer.

        (BUGFIX: the payload annotation was the builtin `any`, not
        typing.Any.)

        Returns: Message ID
        """
        with self.lock:
            msg = Message(
                id=str(uuid.uuid4()),
                payload=payload,
                created_at=time.time()
            )

            self.pending.append(msg)
            print(f"[PRODUCE] {msg.id[:8]}: {payload}")

            # Wake exactly one blocked consume() call.
            self.condition.notify()

            return msg.id

    def consume(self, consumer_id: str, timeout: float = 5.0) -> Optional[Message]:
        """
        Consume message from queue (blocking with timeout).

        Consumer must call ack() or nack() when done processing; until
        then the message is tracked in self.in_progress.

        Returns: the claimed Message, or None on timeout/shutdown.
        """
        with self.condition:
            # Re-check the predicate after every wakeup: wait() can
            # return spuriously or via notify_all() from shutdown().
            deadline = time.time() + timeout
            while not self.pending and self.running:
                remaining = deadline - time.time()
                if remaining <= 0:
                    return None  # Timeout

                self.condition.wait(timeout=remaining)

            # Shutdown may have been requested with nothing pending.
            if not self.pending:
                return None

            # Claim the oldest pending message (FIFO).
            msg = self.pending.popleft()
            msg.status = MessageStatus.IN_PROGRESS
            msg.attempts += 1

            # Track as in-progress until ack()/nack().
            self.in_progress[msg.id] = msg

            print(f"[CONSUME] {consumer_id} got {msg.id[:8]}: {msg.payload}")

            return msg

    def ack(self, message_id: str):
        """
        Acknowledge successful processing.

        Message is moved to completed. Unknown IDs (already acked, or
        never consumed) are logged and ignored.
        """
        with self.lock:
            if message_id not in self.in_progress:
                print(f"[WARN] ACK for unknown message {message_id[:8]}")
                return

            msg = self.in_progress.pop(message_id)
            msg.status = MessageStatus.COMPLETED

            self.completed.append(msg)
            print(f"[ACK] {message_id[:8]} completed")

    def nack(self, message_id: str, requeue: bool = True):
        """
        Negative acknowledgment (processing failed).

        requeue=True: Retry message (until max_retries attempts are used)
        requeue=False: Move to dead letter queue immediately
        """
        with self.lock:
            if message_id not in self.in_progress:
                print(f"[WARN] NACK for unknown message {message_id[:8]}")
                return

            msg = self.in_progress.pop(message_id)
            msg.retry_count += 1

            if requeue and msg.retry_count < self.max_retries:
                # Requeue at the tail for another attempt.
                msg.status = MessageStatus.PENDING
                self.pending.append(msg)
                print(f"[NACK] {message_id[:8]} requeued (attempt {msg.retry_count + 1}/{self.max_retries})")

                # Wake a consumer to pick up the retry.
                self.condition.notify()
            else:
                # Max retries exceeded or caller opted out of requeueing.
                msg.status = MessageStatus.FAILED
                self.dead_letter.append(msg)
                print(f"[DLQ] {message_id[:8]} moved to dead letter queue")

    def consumer_worker(self, consumer_id: str, handler: Callable):
        """
        Consumer worker loop: consume -> handle -> ack/nack, until
        shutdown() clears self.running.

        handler: Function that processes a payload (should raise an
        exception on failure; any exception triggers a nack/retry).
        """
        print(f"[START] Consumer {consumer_id} started")

        while self.running:
            # Short timeout so the loop re-checks self.running often.
            msg = self.consume(consumer_id, timeout=1.0)

            if msg is None:
                continue

            try:
                # Process message
                handler(msg.payload)

                # Success - acknowledge
                self.ack(msg.id)

            except Exception as e:
                # Failure - negative acknowledge (requeues until the
                # attempt budget is exhausted, then DLQ).
                print(f"[ERROR] {consumer_id} failed: {e}")
                self.nack(msg.id, requeue=True)

        print(f"[STOP] Consumer {consumer_id} stopped")

    def start_consumer(self, consumer_id: str, handler: Callable) -> threading.Thread:
        """Start consumer_worker in a daemon thread and return it."""
        thread = threading.Thread(
            target=self.consumer_worker,
            args=(consumer_id, handler),
            daemon=True
        )
        thread.start()
        return thread

    def shutdown(self):
        """Signal all consumers to stop and wake any blocked in consume()."""
        with self.condition:
            self.running = False
            self.condition.notify_all()

    def stats(self) -> Dict:
        """Get a snapshot of queue depth per lifecycle stage."""
        with self.lock:
            return {
                "pending": len(self.pending),
                "in_progress": len(self.in_progress),
                "completed": len(self.completed),
                "dead_letter": len(self.dead_letter)
            }

# Example: Message Queue usage
print("\n=== Message Queue Example ===\n")

queue = MessageQueue("tasks", max_retries=3)

# Shared success counter, updated by the handler below.
processed_count = 0

def process_task(payload):
    """Handler: raises on 'fail' tasks so the retry/DLQ path is exercised."""
    global processed_count
    task_type, data = payload

    if task_type == "fail":
        raise Exception("Simulated failure")

    time.sleep(0.1)  # simulate work
    processed_count += 1
    print(f"    Processed {task_type}: {data}")

# Two competing consumers share the queue (load balancing).
workers = [queue.start_consumer(f"consumer-{n}", process_task) for n in (1, 2)]

# Produce messages
print("--- Producing messages ---")
for task in [
    ("email", "Send welcome email"),
    ("payment", "Process payment $100"),
    ("fail", "This will fail"),  # exhausts its attempts, lands in the DLQ
    ("notification", "Push notification"),
]:
    queue.produce(task)

# Give the consumers time to drain the queue.
time.sleep(2)

# Check stats
print("\n--- Queue Stats ---")
for key, value in queue.stats().items():
    print(f"  {key}: {value}")

print(f"\nTotal processed successfully: {processed_count}")

# Shutdown
queue.shutdown()
for worker in workers:
    worker.join()
What You've Built:
MiniRedis: In-memory cache with TTL, LRU eviction, pub/sub
MessageQueue: Producer/consumer with acknowledgments, retries, DLQ

Production Features:
- Thread-safe with proper locking
- At-least-once delivery guarantee
- Automatic retry up to a configurable attempt limit (requeued immediately; no backoff between attempts)
- Dead letter queue for failed messages
- Competing consumers on a shared queue for load balancing
Back to Study Guide