1. Build Redis - In-Memory Data Store
Implement core Redis features: GET/SET, expiration, LRU eviction, pub/sub.
import time
import threading
from typing import Any, Optional, Dict, Set, List
from collections import OrderedDict
from dataclasses import dataclass
@dataclass
class CacheEntry:
    """A stored value plus its optional expiration timestamp."""
    value: Any
    expires_at: Optional[float] = None  # Unix timestamp, None = never expires


class MiniRedis:
    """
    Redis-like in-memory data store.

    Features:
    - GET/SET with optional TTL
    - DELETE, EXISTS
    - LRU eviction when max_size reached
    - Background expiration cleanup
    - Pub/Sub messaging
    - Atomic operations (incr)

    Time Complexity: O(1) for individual operations; the background
    expiration sweep is O(n) but runs off the caller's thread.
    """

    def __init__(self, max_size: int = 1000):
        # OrderedDict provides O(1) LRU bookkeeping via move_to_end()/popitem().
        self.store: OrderedDict[str, CacheEntry] = OrderedDict()
        self.max_size = max_size
        # Reentrant lock: incr() calls get()/set() while already holding it.
        self.lock = threading.RLock()
        # Pub/Sub: channel name -> set of subscriber callbacks
        self.subscribers: Dict[str, Set[callable]] = {}
        # Background cleanup thread. The Event makes shutdown() prompt instead
        # of waiting out a full one-second sleep tick.
        self.running = True
        self._stop_event = threading.Event()
        self.cleanup_thread = threading.Thread(target=self._cleanup_expired, daemon=True)
        self.cleanup_thread.start()

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """
        Set key to value with optional TTL (time-to-live in seconds).

        Returns: True if set, False if eviction failed
        """
        with self.lock:
            # BUGFIX: the original `if ttl` treated ttl=0 as "never expires".
            # An explicit None check makes ttl=0 mean "expires immediately".
            expires_at = time.time() + ttl if ttl is not None else None
            # Evict only when inserting a NEW key at capacity; overwriting an
            # existing key never grows the store.
            if len(self.store) >= self.max_size and key not in self.store:
                # LRU eviction: the first item is the least recently used.
                evicted_key, _ = self.store.popitem(last=False)
                print(f" [EVICT] Removed {evicted_key} (LRU)")
            # Store entry
            self.store[key] = CacheEntry(value=value, expires_at=expires_at)
            # Move to end (most recently used)
            self.store.move_to_end(key)
            return True

    def get(self, key: str) -> Optional[Any]:
        """
        Get value by key.

        Returns: Value if exists and not expired, None otherwise
        """
        with self.lock:
            if key not in self.store:
                return None
            entry = self.store[key]
            # Lazy expiration: drop the entry on read if its TTL has passed.
            if entry.expires_at and time.time() > entry.expires_at:
                del self.store[key]
                return None
            # Move to end (mark as recently used for LRU)
            self.store.move_to_end(key)
            return entry.value

    def delete(self, key: str) -> bool:
        """Delete key. Returns True if deleted, False if not found."""
        with self.lock:
            if key in self.store:
                del self.store[key]
                return True
            return False

    def exists(self, key: str) -> bool:
        """Check if key exists and is not expired."""
        return self.get(key) is not None

    def ttl(self, key: str) -> int:
        """
        Get remaining TTL in seconds, using Redis conventions:
        -2 if the key does not exist, -1 if it has no expiration,
        otherwise the non-negative number of whole seconds remaining.
        """
        with self.lock:
            if key not in self.store:
                return -2  # Key doesn't exist
            entry = self.store[key]
            if entry.expires_at is None:
                return -1  # No expiration
            remaining = entry.expires_at - time.time()
            return max(0, int(remaining))

    def incr(self, key: str, amount: int = 1) -> int:
        """Atomic increment. Returns new value (missing keys start at 0)."""
        with self.lock:
            # Nested get()/set() are safe because self.lock is reentrant.
            current = self.get(key) or 0
            new_value = int(current) + amount
            self.set(key, new_value)
            return new_value

    def subscribe(self, channel: str, callback: callable):
        """Subscribe to channel with callback."""
        with self.lock:
            # setdefault creates the subscriber set on first use.
            self.subscribers.setdefault(channel, set()).add(callback)

    def publish(self, channel: str, message: Any) -> int:
        """Publish message to channel. Returns number of subscribers reached."""
        with self.lock:
            if channel not in self.subscribers:
                return 0
            count = 0
            # BUGFIX: iterate a snapshot so a callback that subscribes or
            # unsubscribes does not mutate the set mid-iteration.
            for callback in list(self.subscribers[channel]):
                try:
                    callback(message)
                    count += 1
                except Exception as e:
                    # A failing subscriber must not block delivery to the rest.
                    print(f" [ERROR] Subscriber error: {e}")
            return count

    def _cleanup_expired(self):
        """Background thread that removes expired entries about once a second."""
        # Event.wait() doubles as the one-second pacing sleep and returns True
        # immediately when shutdown() sets the event.
        while self.running and not self._stop_event.wait(timeout=1):
            with self.lock:
                now = time.time()
                expired_keys = [
                    key for key, entry in self.store.items()
                    if entry.expires_at and now > entry.expires_at
                ]
                for key in expired_keys:
                    del self.store[key]
                    print(f" [EXPIRE] Removed {key}")

    def shutdown(self):
        """Shutdown cache and cleanup thread."""
        self.running = False
        self._stop_event.set()  # Wake the cleanup thread immediately
        self.cleanup_thread.join()
# Example: Redis usage
print("=== MiniRedis Example ===\n")
cache = MiniRedis(max_size=3)

# Basic GET/SET
cache.set("user:1:name", "Alice")
cache.set("user:1:score", 100)
print(f"Name: {cache.get('user:1:name')}")
print(f"Score: {cache.get('user:1:score')}")

# TTL (expiration)
print("\n--- TTL Test ---")
cache.set("temp:token", "abc123", ttl=2)
print(f"Token: {cache.get('temp:token')}")
print(f"TTL: {cache.ttl('temp:token')} seconds")
time.sleep(3)
print(f"After expiration: {cache.get('temp:token')}")

# LRU Eviction: inserting a fourth key evicts the least recently used one
print("\n--- LRU Eviction Test ---")
for idx in (1, 2, 3, 4):
    cache.set(f"key{idx}", f"value{idx}")

# Atomic Increment
print("\n--- Atomic Increment ---")
cache.set("counter", 0)
for _ in range(5):
    print(f" Counter: {cache.incr('counter')}")

# Pub/Sub
print("\n--- Pub/Sub Test ---")
def on_message(msg):
    print(f" Received: {msg}")

cache.subscribe("notifications", on_message)
delivered = cache.publish("notifications", "Hello subscribers!")
print(f" Delivered to {delivered} subscribers")

cache.shutdown()
2. Build Message Queue - Kafka-like System
Implement producer/consumer pattern with acknowledgments and dead letter queue.
import threading
import time
import uuid
from collections import deque
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
class MessageStatus(Enum):
    """Lifecycle states of a queued message."""
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class Message:
    """Message in queue."""
    id: str
    payload: Any  # BUGFIX: was `any` (the builtin function), not typing.Any
    created_at: float
    attempts: int = 0  # times handed to a consumer
    status: MessageStatus = MessageStatus.PENDING
    retry_count: int = 0  # times nack()ed


class MessageQueue:
    """
    Kafka-like message queue with:
    - Producer/Consumer pattern
    - Acknowledgments (at-least-once delivery)
    - Retry logic
    - Dead letter queue for failed messages
    - Consumer groups (load balancing)

    Used for: Async task processing, event streaming, microservices communication
    """

    def __init__(self, name: str, max_retries: int = 3):
        self.name = name
        self.max_retries = max_retries
        # Message storage, partitioned by lifecycle state
        self.pending: deque[Message] = deque()
        self.in_progress: Dict[str, Message] = {}
        self.completed: List[Message] = []
        self.dead_letter: List[Message] = []
        # Thread safety: the Condition wraps the same RLock, so holding
        # self.lock is sufficient to call notify()/wait().
        self.lock = threading.RLock()
        self.condition = threading.Condition(self.lock)
        # Consumer management
        self.running = True

    def produce(self, payload: Any) -> str:
        """
        Produce message to queue.

        Returns: Message ID
        """
        with self.lock:
            msg = Message(
                id=str(uuid.uuid4()),
                payload=payload,
                created_at=time.time()
            )
            self.pending.append(msg)
            print(f"[PRODUCE] {msg.id[:8]}: {payload}")
            # Wake one consumer blocked in consume()
            self.condition.notify()
            return msg.id

    def consume(self, consumer_id: str, timeout: float = 5.0) -> Optional[Message]:
        """
        Consume message from queue (blocking with timeout).
        Consumer must call ack() or nack() when done processing.

        Returns: A Message marked IN_PROGRESS, or None on timeout/shutdown.
        """
        with self.condition:
            # Wait for a message, re-checking after every wakeup (spurious
            # wakeups and races with other consumers are possible).
            deadline = time.time() + timeout
            while not self.pending and self.running:
                remaining = deadline - time.time()
                if remaining <= 0:
                    return None  # Timeout
                self.condition.wait(timeout=remaining)
            if not self.pending:
                return None
            # Pop message from pending and track it as in-progress until
            # the consumer acks or nacks it (at-least-once delivery).
            msg = self.pending.popleft()
            msg.status = MessageStatus.IN_PROGRESS
            msg.attempts += 1
            self.in_progress[msg.id] = msg
            print(f"[CONSUME] {consumer_id} got {msg.id[:8]}: {msg.payload}")
            return msg

    def ack(self, message_id: str):
        """
        Acknowledge successful processing.
        Message is moved to completed.
        """
        with self.lock:
            if message_id not in self.in_progress:
                print(f"[WARN] ACK for unknown message {message_id[:8]}")
                return
            msg = self.in_progress.pop(message_id)
            msg.status = MessageStatus.COMPLETED
            self.completed.append(msg)
            print(f"[ACK] {message_id[:8]} completed")

    def nack(self, message_id: str, requeue: bool = True):
        """
        Negative acknowledgment (processing failed).

        requeue=True: Retry message (up to max_retries)
        requeue=False: Move to dead letter queue immediately
        """
        with self.lock:
            if message_id not in self.in_progress:
                print(f"[WARN] NACK for unknown message {message_id[:8]}")
                return
            msg = self.in_progress.pop(message_id)
            msg.retry_count += 1
            if requeue and msg.retry_count < self.max_retries:
                # Retry: back to the pending queue
                msg.status = MessageStatus.PENDING
                self.pending.append(msg)
                print(f"[NACK] {message_id[:8]} requeued (attempt {msg.retry_count + 1}/{self.max_retries})")
                # Wake a consumer to pick up the retry
                self.condition.notify()
            else:
                # Max retries exceeded or explicit no-requeue
                msg.status = MessageStatus.FAILED
                self.dead_letter.append(msg)
                print(f"[DLQ] {message_id[:8]} moved to dead letter queue")

    def consumer_worker(self, consumer_id: str, handler: Callable):
        """
        Consumer worker loop: consume -> handle -> ack/nack until shutdown.

        handler: Function that processes a payload (should raise on failure)
        """
        print(f"[START] Consumer {consumer_id} started")
        while self.running:
            msg = self.consume(consumer_id, timeout=1.0)
            if msg is None:
                continue  # timed out; loop re-checks self.running
            try:
                handler(msg.payload)
                # Success - acknowledge
                self.ack(msg.id)
            except Exception as e:
                # Failure - negative acknowledge (triggers retry/DLQ logic)
                print(f"[ERROR] {consumer_id} failed: {e}")
                self.nack(msg.id, requeue=True)
        print(f"[STOP] Consumer {consumer_id} stopped")

    def start_consumer(self, consumer_id: str, handler: Callable) -> threading.Thread:
        """Start consumer in background thread. Returns the thread for join()."""
        thread = threading.Thread(
            target=self.consumer_worker,
            args=(consumer_id, handler),
            daemon=True
        )
        thread.start()
        return thread

    def shutdown(self):
        """Shutdown queue and wake all blocked consumers so they can exit."""
        with self.condition:
            self.running = False
            self.condition.notify_all()

    def stats(self) -> Dict:
        """Get queue statistics: message counts per lifecycle state."""
        with self.lock:
            return {
                "pending": len(self.pending),
                "in_progress": len(self.in_progress),
                "completed": len(self.completed),
                "dead_letter": len(self.dead_letter)
            }
# Example: Message Queue usage
print("\n=== Message Queue Example ===\n")
task_queue = MessageQueue("tasks", max_retries=3)

# Define message handler
processed_count = 0

def process_task(payload):
    global processed_count
    task_type, data = payload
    if task_type == "fail":
        raise Exception("Simulated failure")
    time.sleep(0.1)  # Simulate processing
    processed_count += 1
    print(f" Processed {task_type}: {data}")

# Start 2 consumers (load balancing)
workers = [task_queue.start_consumer(f"consumer-{n}", process_task) for n in (1, 2)]

# Produce messages
print("--- Producing messages ---")
for job in (
    ("email", "Send welcome email"),
    ("payment", "Process payment $100"),
    ("fail", "This will fail"),  # Will retry 3 times
    ("notification", "Push notification"),
):
    task_queue.produce(job)

# Wait for processing
time.sleep(2)

# Check stats
print("\n--- Queue Stats ---")
for stat_name, stat_value in task_queue.stats().items():
    print(f" {stat_name}: {stat_value}")
print(f"\nTotal processed successfully: {processed_count}")

# Shutdown
task_queue.shutdown()
for worker in workers:
    worker.join()
What You've Built:
MiniRedis: In-memory cache with TTL, LRU eviction, pub/sub
MessageQueue: Producer/consumer with acknowledgments, retries, DLQ
Production Features:
- Thread-safe with proper locking
- At-least-once delivery guarantee
- Automatic retry up to a configurable max_retries limit (retries are immediate; no backoff delay)
- Dead letter queue for failed messages
- Consumer groups for load balancing
Back to Study Guide
MiniRedis: In-memory cache with TTL, LRU eviction, pub/sub
MessageQueue: Producer/consumer with acknowledgments, retries, DLQ
Production Features:
- Thread-safe with proper locking
- At-least-once delivery guarantee
- Automatic retry up to a configurable max_retries limit (retries are immediate; no backoff delay)
- Dead letter queue for failed messages
- Consumer groups for load balancing