Distributed systems introduce unique challenges that don't exist in single-machine systems. Understanding these classic problems and their solutions is essential for senior engineers.
Interview Tip: When designing distributed systems, proactively mention these problems and how you'd address them.
Definition: Multiple processes/threads wake up simultaneously to handle an event, but only one can proceed. All others waste CPU cycles.
Common Scenario: Cache stampede - cached item expires, thousands of requests simultaneously hit the database to regenerate it.
import math
import random
import threading
import time
from functools import wraps

import redis
# Shared Redis client for all cache examples below.
# decode_responses=True makes get() return str instead of bytes.
r = redis.Redis(decode_responses=True)
# Solution 1: Distributed Lock (only one regenerates)
def cache_with_lock(key, ttl=300):
    """Decorator: cache func's result in Redis under `key` for `ttl` seconds.

    On a cache miss only the caller that wins a short-lived Redis lock
    recomputes; everyone else sleeps briefly and re-checks the cache.

    Fix over the original: the retry path called `wrapper` recursively, so
    sustained lock contention could exhaust the interpreter's recursion
    limit. This version loops instead - same observable behavior.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            lock_key = f"lock:{key}"
            while True:
                # Re-check cache on every pass (the winner may have filled it)
                cached = r.get(key)
                if cached:
                    return cached
                # Acquire lock to prevent stampede (NX = set only if absent)
                if r.set(lock_key, "1", nx=True, ex=10):  # 10-sec lock
                    # This caller regenerates
                    try:
                        value = func(*args, **kwargs)
                        r.setex(key, ttl, value)
                        return value
                    finally:
                        # NOTE(review): if func() outlives the 10s lock TTL,
                        # this delete may remove another caller's lock; a
                        # unique token + compare-and-delete would be safer.
                        r.delete(lock_key)
                # Another caller is regenerating - wait, then retry
                time.sleep(0.1)
        return wrapper
    return decorator
@cache_with_lock("expensive_data", ttl=300)
def get_expensive_data():
    """Demo workload: a slow computation guarded by the stampede lock."""
    worker = threading.current_thread().name
    print(f"[{worker}] Computing expensive operation...")
    time.sleep(2)  # Simulate expensive DB query
    return "computed_value"
# Solution 2: Probabilistic Early Expiration (XFetch)
def cache_with_xfetch(key, ttl=300, beta=1.0):
    """Decorator implementing XFetch probabilistic early recomputation.

    While a key is cached it is usually served directly, but with a
    probability that rises as expiry approaches (scaled by the measured
    recomputation cost and `beta`; beta > 1 recomputes more eagerly) a
    caller recomputes early, so the key never expires for everyone at once.

    Fix over the original: the old test `age * beta * random() < ttl`
    compared the entry's AGE against the ttl, so for beta <= 1 early
    recomputation could never fire before the key simply expired. This
    uses the canonical XFetch criterion:
        recompute when  now - delta * beta * log(rand) >= expiry_time
    where delta is the time the last recomputation took.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            cached_data = r.get(key)
            cache_time_key = f"{key}:time"
            delta_key = f"{key}:delta"
            cache_time = r.get(cache_time_key)
            delta = r.get(delta_key)
            if cached_data and cache_time and delta:
                expiry_time = float(cache_time) + ttl
                # log(1 - u) is distributed like log(u) for u ~ U[0, 1)
                # but avoids log(0) when random() returns exactly 0.0.
                gap = float(delta) * beta * math.log(1.0 - random.random())
                if time.time() - gap < expiry_time:
                    return cached_data
            # Regenerate, recording when we started and how long it took
            started = time.time()
            value = func(*args, **kwargs)
            compute_seconds = time.time() - started
            r.setex(key, ttl, value)
            r.setex(cache_time_key, ttl, str(started))
            r.setex(delta_key, ttl, str(compute_seconds))
            return value
        return wrapper
    return decorator
# Solution 3: Stale-While-Revalidate
class StaleWhileRevalidate:
    """Serve a stale cached value instantly while refreshing in the background.

    Keeps two Redis entries per key: `<key>:fresh` (short TTL) and
    `<key>:stale` (long TTL). A fresh hit returns immediately; a stale hit
    returns the old value and kicks off one background recomputation; a
    full miss computes synchronously.

    Fix over the original: every request arriving during the stale window
    used to spawn its own revalidation thread, recreating the stampede on
    the backend. A non-blocking lock now ensures at most one refresh is in
    flight per instance.
    """

    def __init__(self, redis_client, stale_ttl=3600):
        self.redis = redis_client
        self.stale_ttl = stale_ttl
        # Guards background refreshes. Instance-wide, so one refresh at a
        # time across all keys. NOTE(review): per-key locks would let
        # independent keys refresh concurrently.
        self._refresh_lock = threading.Lock()

    def get_or_compute(self, key, compute_fn, fresh_ttl=300):
        """Return the value for `key`, invoking `compute_fn` as needed."""
        # Try fresh cache first
        fresh_key = f"{key}:fresh"
        fresh = self.redis.get(fresh_key)
        if fresh:
            return fresh

        # Fall back to the stale copy
        stale_key = f"{key}:stale"
        stale = self.redis.get(stale_key)

        def store(value):
            # Write both tiers so the next stale window has a fallback.
            self.redis.setex(fresh_key, fresh_ttl, value)
            self.redis.setex(stale_key, self.stale_ttl, value)

        if stale:
            # Return stale immediately; refresh in the background, but only
            # if no other refresh is already running.
            if self._refresh_lock.acquire(blocking=False):
                def revalidate():
                    try:
                        store(compute_fn())
                    finally:
                        self._refresh_lock.release()
                threading.Thread(target=revalidate, daemon=True).start()
            return stale

        # No cache at all - compute synchronously
        value = compute_fn()
        store(value)
        return value
# Usage
swr = StaleWhileRevalidate(r)
# NOTE(review): fetch_from_db is not defined in this snippet; supply a real
# loader before running this example (the lambda defers the NameError until
# a cache miss actually triggers computation).
value = swr.get_or_compute("user:profile:123", lambda: fetch_from_db())
Definition: Achieving consensus in a distributed system where some participants may be faulty or malicious (Byzantine faults).
Scenario: Multiple generals must agree on attack/retreat, but some may be traitors sending conflicting messages.
Key Insight: Byzantine Fault Tolerance requires at least 3f + 1 nodes to tolerate f malicious actors.
from dataclasses import dataclass
from typing import List, Dict
from collections import Counter
@dataclass
class Message:
    """One vote relayed between generals (nodes) in the BFT simulation."""
    sender: str  # node_id of the originating general
    value: str  # the order being relayed ("ATTACK" / "RETREAT")
    signatures: List[str]  # Chain of signatures
class ByzantineNode:
    """Simplified Byzantine Fault Tolerant Node"""

    def __init__(self, node_id: str, is_byzantine: bool = False):
        self.node_id = node_id
        self.is_byzantine = is_byzantine
        self.received_messages: List[Message] = []

    def broadcast(self, value: str, nodes: List['ByzantineNode']):
        """Send a value to every other node.

        An honest node sends `value` unchanged to all peers; a Byzantine
        node ignores `value` and alternates ATTACK/RETREAT by recipient
        position in the node list.
        """
        for position, peer in enumerate(nodes):
            if peer == self:
                continue
            if self.is_byzantine:
                # Traitor: different recipients get contradictory orders
                payload = "ATTACK" if position % 2 == 0 else "RETREAT"
            else:
                payload = value
            peer.receive(Message(self.node_id, payload, [self.node_id]))

    def receive(self, message: Message):
        """Record an incoming message for the later decide() tally."""
        self.received_messages.append(message)

    def decide(self, total_nodes: int) -> str:
        """Return the value backed by at least 2/3 of the cluster, if any."""
        if not self.received_messages:
            return "UNKNOWN"
        tally = Counter(msg.value for msg in self.received_messages)
        threshold = (2 * total_nodes) // 3  # BFT tolerates < 1/3 traitors
        for candidate, votes in tally.most_common():
            if votes >= threshold:
                return candidate
        return "NO_CONSENSUS"
# Simulation
def simulate_byzantine_fault_tolerance():
    """Run the Byzantine generals demo: 7 nodes, 1 traitor, quorum decision.

    Fix over the original demo: after the commander's broadcast, every
    lieutenant now relays the order it received (the "oral messages"
    second round). Without that relay each node held only two messages,
    which can never meet decide()'s 2/3 threshold (4 of 7), so every node
    reported NO_CONSENSUS while the script claimed success.
    """
    print("=" * 60)
    print("BYZANTINE GENERALS PROBLEM SIMULATION")
    print("=" * 60)
    # Create 7 nodes (can tolerate 2 Byzantine with 3f + 1 = 7)
    nodes = [
        ByzantineNode("General_A", is_byzantine=False),  # Commander
        ByzantineNode("General_B", is_byzantine=False),
        ByzantineNode("General_C", is_byzantine=True),   # Traitor!
        ByzantineNode("General_D", is_byzantine=False),
        ByzantineNode("General_E", is_byzantine=False),
        ByzantineNode("General_F", is_byzantine=False),
        ByzantineNode("General_G", is_byzantine=False),
    ]
    # Round 1: commander broadcasts the order
    print("\nCommander (General_A) orders: ATTACK")
    nodes[0].broadcast("ATTACK", nodes)
    print("General_C (Traitor) sends conflicting messages!")
    # Round 2: each lieutenant relays the order it received from the
    # commander. The traitor's broadcast() ignores the value and sends
    # conflicting orders; honest lieutenants relay faithfully.
    for lieutenant in nodes[1:]:
        order = next(
            (m.value for m in lieutenant.received_messages
             if m.sender == "General_A"),
            "RETREAT",  # fallback if the commander's message was lost
        )
        lieutenant.broadcast(order, nodes)
    # Each node decides
    print("\nDecisions:")
    for node in nodes:
        if not node.is_byzantine:
            decision = node.decide(len(nodes))
            print(f"{node.node_id}: {decision} (received {len(node.received_messages)} msgs)")
    print("\nResult: Honest nodes reach consensus despite traitor!")
simulate_byzantine_fault_tolerance()
Definition: Network partition causes cluster to split into multiple groups, each thinking they're the primary. Results in conflicting writes.
Scenario: Active-active databases with network partition - both sides accept writes, causing data divergence.
Problem: Both partitions accept writes, causing irreconcilable data divergence
from dataclasses import dataclass
from typing import List, Set
import time
@dataclass
class Node:
    """Cluster member for the quorum / split-brain simulation."""
    node_id: str
    is_leader: bool = False
    term: int = 0  # Election term (like Raft)
    # None sentinel: a mutable set cannot be a plain dataclass default;
    # __post_init__ replaces None with {node_id}.
    reachable_nodes: Set[str] = None
    def __post_init__(self):
        # A node can always reach itself.
        if self.reachable_nodes is None:
            self.reachable_nodes = {self.node_id}
class QuorumBasedCluster:
    """Prevents split brain using quorum"""

    def __init__(self, total_nodes: int):
        self.total_nodes = total_nodes
        self.majority = total_nodes // 2 + 1

    def can_be_leader(self, node: Node) -> bool:
        """Node can only be leader if it can reach majority"""
        return len(node.reachable_nodes) >= self.majority

    def simulate_partition(self):
        """Simulate network partition"""
        banner = "=" * 60
        print(banner)
        print("SPLIT BRAIN PREVENTION - QUORUM SIMULATION")
        print(banner)

        # 5-node cluster, A starts as leader, everyone reaches everyone
        all_ids = {'A', 'B', 'C', 'D', 'E'}
        nodes = {
            nid: Node(nid, is_leader=(nid == 'A'), reachable_nodes=set(all_ids))
            for nid in 'ABCDE'
        }
        print("\nInitial state: Node A is leader")
        print(f"Majority required: {self.majority}/{self.total_nodes}")

        # Partition the cluster: {A, B} | {C, D, E}
        print("\nNetwork partition occurs!")
        print("Partition 1: A, B")
        print("Partition 2: C, D, E")
        minority_side = {'A', 'B'}
        majority_side = {'C', 'D', 'E'}
        for nid, node in nodes.items():
            node.reachable_nodes = set(
                minority_side if nid in minority_side else majority_side
            )

        # Who can still claim leadership?
        print("\n--- Checking Quorum ---")
        for nid, node in nodes.items():
            print(f"Node {nid}: Reaches {len(node.reachable_nodes)} nodes - "
                  f"Can be leader: {self.can_be_leader(node)}")

        # The old leader loses quorum and steps down
        if not self.can_be_leader(nodes['A']):
            print("\nNode A steps down (can't reach majority)")
            nodes['A'].is_leader = False

        # The majority partition elects a replacement
        if all(self.can_be_leader(nodes[nid]) for nid in ('C', 'D', 'E')):
            nodes['C'].is_leader = True
            nodes['C'].term += 1
            print(f"Node C becomes new leader (term {nodes['C'].term})")

        print("\n--- Final State ---")
        for nid, node in nodes.items():
            role = "LEADER" if node.is_leader else "follower"
            print(f"Node {nid}: {role} (term {node.term})")
        print("\nSplit brain prevented! Only majority partition operates.")
# Run simulation
# Five nodes -> quorum of 3; the scenario splits the cluster 2 | 3, so only
# the three-node partition may elect a leader.
cluster = QuorumBasedCluster(total_nodes=5)
cluster.simulate_partition()
# Fencing Token Pattern (prevent stale writes)
class FencedStorage:
    """Key-value store that rejects writes carrying an outdated fencing token."""

    def __init__(self):
        self.data = {}
        self.current_token = 0  # highest fencing token seen so far

    def write(self, key: str, value: str, token: int):
        """Store `value` under `key` if `token` is not older than the latest seen.

        Raises Exception for a stale token - this is what stops a deposed
        leader (e.g. after a partition heals) from clobbering newer data.
        """
        if token < self.current_token:
            raise Exception(f"Stale token {token} < {self.current_token}. "
                            f"Rejecting write (split brain protection)")
        if token > self.current_token:
            self.current_token = token
        self.data[key] = value
        print(f"Write accepted with token {token}")

    def read(self, key: str) -> str:
        """Return the current value for `key`, or None if absent."""
        return self.data.get(key)
# Usage
storage = FencedStorage()
# Old leader (partition 1) writes with token 1.
# Note: this first write is ACCEPTED (1 >= initial token 0) - the storage
# has not yet seen anything newer; the try/except matters on the retry below.
try:
    storage.write("key1", "value_from_old_leader", token=1)
except Exception as e:
    print(f"Error: {e}")
# New leader (partition 2) writes with new token
storage.write("key1", "value_from_new_leader", token=2)
# Old leader's retried write is rejected (token 1 < current token 2)
try:
    storage.write("key1", "value_from_old_leader_retry", token=1)
except Exception as e:
    print(f"Error: {e}")
print(f"Final value: {storage.read('key1')}")
Definition: Proving that perfect consensus is impossible over an unreliable network. Two generals must coordinate attack but can only communicate via unreliable messengers.
Insight: This is a fundamental impossibility result - you cannot guarantee consensus with message loss.
General A General B
| |
|-------- "Attack at dawn" ------->| (Message might be lost!)
| |
|<------- "ACK received" -----------| (ACK might be lost!)
| |
|-------- "ACK-ACK" --------------->| (Infinite regression!)
| |
Neither can be 100% certain the other received the message!
Accept impossibility: Use timeouts and retry with acknowledgments. Accept that perfect coordination is impossible.
Common patterns: retry with exponential backoff, idempotent operations so duplicate deliveries are harmless, and acknowledgment timeouts that bound how long either side waits.
Definition: Determining the order of events in a distributed system without a global clock.
Scenario: Server A's clock says 10:00:01, Server B's clock says 10:00:00. Which event happened first?
class LamportClock:
    """Logical clock that preserves causal ordering"""

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.time = 0

    def tick(self):
        """Local event - increment clock"""
        self.time += 1
        return self.time

    def send_message(self, message: dict) -> dict:
        """Attach timestamp to outgoing message"""
        # Sending is itself an event, so tick first.
        message['timestamp'] = self.tick()
        message['from'] = self.node_id
        print(f"[{self.node_id}] Send at T={self.time}: {message.get('data')}")
        return message

    def receive_message(self, message: dict):
        """Update clock on message receipt"""
        # Jump past both our own history and the sender's timestamp.
        incoming = message['timestamp']
        self.time = max(self.time, incoming) + 1
        print(f"[{self.node_id}] Receive at T={self.time}: {message.get('data')} "
              f"(from {message['from']} at T={incoming})")
# Simulation: two nodes exchange messages; Lamport timestamps guarantee that
# every receive is numbered after the corresponding send.
node_a = LamportClock("A")
node_b = LamportClock("B")
# Events
node_a.tick() # A: local event (T=1)
msg1 = node_a.send_message({'data': 'Hello'}) # A sends (T=2)
node_b.receive_message(msg1) # B receives (T=3, max(0,2)+1)
node_b.tick() # B: local event (T=4)
msg2 = node_b.send_message({'data': 'Reply'}) # B sends (T=5)
node_a.receive_message(msg2) # A receives (T=6, max(2,5)+1)
print("\nResult: Causally ordered events preserved!")
# Vector Clock (detects concurrency)
class VectorClock:
    """Vector clock: detects whether two events are causal or concurrent.

    node_id must look like "<name>_<index>" (e.g. "node_0"); the numeric
    suffix selects this node's component in the vector.

    Fixes over the original: the happens-before comparison was duplicated
    between an instance method and a static helper (now one shared
    implementation), and concurrent() reported two EQUAL vectors as
    concurrent even though equal vectors describe the same causal history.
    """

    def __init__(self, node_id: str, num_nodes: int):
        self.node_id = node_id
        # Component owned by this node, parsed from the id's numeric suffix.
        self.node_index = int(node_id.split('_')[1])
        self.vector = [0] * num_nodes

    def tick(self):
        """Local event: increment own component."""
        self.vector[self.node_index] += 1

    def send(self) -> list:
        """Tick, then return a snapshot to attach to an outgoing message."""
        self.tick()
        return self.vector.copy()

    def receive(self, other_vector: list):
        """Merge: component-wise max with the incoming vector, then tick."""
        self.vector = [max(a, b) for a, b in zip(self.vector, other_vector)]
        self.tick()

    @staticmethod
    def _dominates(v1: list, v2: list) -> bool:
        """True iff v1 happened-before v2: v1 <= v2 everywhere, < somewhere."""
        return (all(a <= b for a, b in zip(v1, v2))
                and any(a < b for a, b in zip(v1, v2)))

    def happens_before(self, other_vector: list) -> bool:
        """Check if self's current state happened before other_vector."""
        return self._dominates(self.vector, other_vector)

    def concurrent(self, other_vector: list) -> bool:
        """True iff neither ordering holds and the vectors differ.

        Equal vectors are the same causal state, so they are NOT concurrent.
        """
        return (self.vector != list(other_vector)
                and not self._dominates(self.vector, other_vector)
                and not self._dominates(other_vector, self.vector))

    @staticmethod
    def _happens_before_static(v1, v2):
        """Kept for backward compatibility; delegates to _dominates."""
        return VectorClock._dominates(v1, v2)

    def __repr__(self):
        return f"{self.node_id}: {self.vector}"
Fischer-Lynch-Paterson (FLP) Theorem: It's impossible to guarantee consensus in an asynchronous system with even one faulty process.
Implication: Any consensus algorithm (Paxos, Raft) must sacrifice either safety or liveness.
Definition: Circular wait condition across distributed nodes, each holding resources others need.
from collections import defaultdict
from typing import Set, Dict
class DistributedDeadlockDetector:
    """Wait-for graph based deadlock detection"""

    def __init__(self):
        self.wait_for_graph: Dict[str, Set[str]] = defaultdict(set)

    def add_wait(self, process: str, waiting_for: str):
        """Record that `process` is blocked on a resource held by `waiting_for`."""
        self.wait_for_graph[process].add(waiting_for)

    def remove_wait(self, process: str, was_waiting_for: str):
        """Clear a previously recorded wait edge (no-op if absent)."""
        if process in self.wait_for_graph:
            self.wait_for_graph[process].discard(was_waiting_for)

    def has_cycle(self) -> bool:
        """Return True iff the wait-for graph contains a cycle (deadlock)."""
        # Classic three-color DFS: white = unvisited, grey = on the current
        # path, black = fully explored. A grey successor is a back edge.
        white, grey, black = 0, 1, 2
        color = {}

        def visit(vertex: str) -> bool:
            color[vertex] = grey
            for successor in self.wait_for_graph.get(vertex, ()):
                state = color.get(successor, white)
                if state == grey:
                    return True  # back edge -> cycle
                if state == white and visit(successor):
                    return True
            color[vertex] = black
            return False

        # Snapshot the keys so visit() can't be confused by dict growth.
        return any(
            color.get(vertex, white) == white and visit(vertex)
            for vertex in list(self.wait_for_graph)
        )
# Example: build a three-process circular wait and detect it.
detector = DistributedDeadlockDetector()
# Process A waits for B, B waits for C, C waits for A (cycle!)
detector.add_wait("process_A", "process_B")
detector.add_wait("process_B", "process_C")
detector.add_wait("process_C", "process_A")
# Typical resolution: abort/restart one participant to break the cycle.
if detector.has_cycle():
    print("Deadlock detected!")
| Problem | Core Issue | Solution |
|---|---|---|
| Thundering Herd | Simultaneous cache miss overwhelms DB | Locks, probabilistic expiration, coalescing |
| Byzantine Generals | Consensus with malicious actors | BFT (3f+1 nodes), Proof of Work/Stake |
| Split Brain | Multiple primaries after partition | Quorum, fencing tokens, STONITH |
| Two Generals | Perfect consensus impossible (unreliable network) | Accept impossibility, use retries + idempotency |
| Clock Sync | No global time in distributed systems | Lamport clocks, vector clocks, HLC |
| FLP Impossibility | Async consensus impossible with faults | Add timeouts (partial sync), leader-based |
| Distributed Deadlock | Circular waits across nodes | Global ordering, timeouts, cycle detection |