Datastore Comparison: Deep Dive

PostgreSQL • MySQL • MongoDB • Redis • RocksDB • etcd • Cassandra • DynamoDB • Elasticsearch

Quick Reference Table

Datastore | Type | Use Cases | Performance | Scalability
PostgreSQL | RDBMS | Transactional data, complex queries, ACID | 10k QPS | Vertical + read replicas
MySQL | RDBMS | Web apps, read-heavy workloads | 15k QPS | Vertical + read replicas
MongoDB | Document | Flexible schema, rapid iteration | 20k QPS | Horizontal sharding
Redis | In-memory KV | Caching, sessions, real-time, queues | 100k+ QPS | Horizontal cluster
RocksDB | Embedded KV | Storage engine, high write throughput | 50k+ writes/s | Single node (embedded)
etcd | Distributed KV | Config mgmt, service discovery, coordination | 10k writes/s | Horizontal (Raft cluster)
Cassandra | Wide Column | Time-series, massive writes, multi-DC | 100k+ writes/s | Horizontal (excellent)
DynamoDB | Managed KV | Serverless, auto-scaling, AWS | Unlimited (managed) | Auto (AWS managed)
Elasticsearch | Search Engine | Full-text search, log analytics | 10k searches/s | Horizontal sharding

1. Redis - In-Memory Data Structure Store

What is Redis?

Redis (Remote Dictionary Server) is an in-memory key-value store known for blazing-fast performance (100k+ ops/sec). Unlike simple caches, Redis supports rich data structures: strings, hashes, lists, sets, sorted sets, bitmaps, hyperloglogs, and streams.

Architecture: Single-threaded command execution (no locks needed; Redis 6+ adds optional I/O threads), persistence options (RDB snapshots + AOF log), clustering for horizontal scaling, pub/sub messaging.

Redis Data Structures & Use Cases

1. Strings - Simple Caching

Use Case: Session storage, API response caching, rate limiting counters

import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Session storage
def store_session(session_id, user_data):
    """Store user session with TTL"""
    r.setex(
        f"session:{session_id}",
        3600,  # 1 hour expiration
        json.dumps(user_data)
    )

def get_session(session_id):
    """Retrieve session"""
    data = r.get(f"session:{session_id}")
    return json.loads(data) if data else None

# Rate limiting
def is_rate_limited(user_id, max_requests=100, window=60):
    """Allow max_requests per window (seconds)"""
    key = f"rate_limit:{user_id}"
    current = r.incr(key)

    if current == 1:
        # First request in this window: start the TTL.
        # (If strict atomicity matters, combine INCR+EXPIRE in a Lua script.)
        r.expire(key, window)

    return current > max_requests

# API response caching
def cached_api_call(endpoint, ttl=300):
    """Cache API responses"""
    cache_key = f"api_cache:{endpoint}"

    # Try cache first
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    # Cache miss - fetch and store
    response = fetch_from_api(endpoint)
    r.setex(cache_key, ttl, json.dumps(response))
    return response
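The fixed-window limiter above can admit up to double the budget around a window boundary. The usual Redis fix is a sliding window over a sorted set (ZADD the request timestamp, ZREMRANGEBYSCORE to drop old entries, ZCARD to count), typically in one pipeline. A minimal plain-Python sketch of that logic (class and parameter names are illustrative):

```python
import time

class SlidingWindowLimiter:
    """Mirror of the Redis sorted-set pattern:
    ZADD key {now: now} / ZREMRANGEBYSCORE key 0 now-window / ZCARD key."""
    def __init__(self, max_requests=100, window=60):
        self.max_requests = max_requests
        self.window = window
        self.hits = {}  # user_id -> list of request timestamps

    def is_rate_limited(self, user_id, now=None):
        now = time.time() if now is None else now
        cutoff = now - self.window
        # Drop timestamps outside the window (ZREMRANGEBYSCORE)
        recent = [t for t in self.hits.get(user_id, []) if t > cutoff]
        recent.append(now)  # ZADD
        self.hits[user_id] = recent
        return len(recent) > self.max_requests  # ZCARD check
```

In Redis the three commands would run in a pipeline (or a Lua script) so concurrent requests can't interleave between the trim and the count.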

2. Hashes - Structured Objects

Use Case: User profiles, product details, configuration settings

# Store user profile as hash
def save_user_profile(user_id, profile_data):
    """Store user profile efficiently"""
    r.hset(
        f"user:{user_id}",
        mapping={
            "name": profile_data["name"],
            "email": profile_data["email"],
            "age": profile_data["age"],
            "premium": "true" if profile_data["premium"] else "false"
        }
    )

def get_user_profile(user_id):
    """Get entire profile"""
    return r.hgetall(f"user:{user_id}")

def get_user_field(user_id, field):
    """Get specific field only"""
    return r.hget(f"user:{user_id}", field)

def increment_user_stat(user_id, stat_name, amount=1):
    """Atomically increment stats"""
    r.hincrby(f"user:{user_id}:stats", stat_name, amount)

# Example: Shopping cart
def add_to_cart(user_id, product_id, quantity):
    """Add product to cart"""
    r.hset(f"cart:{user_id}", product_id, quantity)

def get_cart(user_id):
    """Get entire cart"""
    return r.hgetall(f"cart:{user_id}")

3. Lists - Queues & Activity Feeds

Use Case: Task queues, activity feeds, recent items, message queues

# Task queue (producer-consumer)
def enqueue_task(queue_name, task_data):
    """Add task to queue (FIFO)"""
    r.lpush(queue_name, json.dumps(task_data))

def dequeue_task(queue_name, timeout=0):
    """Blocking pop from queue"""
    result = r.brpop(queue_name, timeout)
    if result:
        _, task_data = result
        return json.loads(task_data)
    return None

# Activity feed (recent N items)
def add_to_feed(user_id, activity):
    """Add activity to user feed"""
    feed_key = f"feed:{user_id}"
    r.lpush(feed_key, json.dumps(activity))
    r.ltrim(feed_key, 0, 99)  # Keep only 100 most recent

def get_feed(user_id, page=0, page_size=20):
    """Get paginated feed"""
    start = page * page_size
    end = start + page_size - 1
    activities = r.lrange(f"feed:{user_id}", start, end)
    return [json.loads(a) for a in activities]

# Recent search history
def add_search(user_id, query):
    """Store recent search"""
    key = f"search_history:{user_id}"
    r.lpush(key, query)
    r.ltrim(key, 0, 9)  # Keep last 10 searches
    r.expire(key, 86400 * 30)  # Expire after 30 days

4. Sets - Unique Collections

Use Case: Tagging, followers/following, online users, unique visitors

# Tagging system
def add_tags(article_id, *tags):
    """Add tags to article and maintain the reverse index"""
    r.sadd(f"article:{article_id}:tags", *tags)
    for tag in tags:
        r.sadd(f"tag:{tag}:articles", article_id)

def get_articles_by_tag(tag):
    """Find all articles with tag (reads the reverse index)"""
    return r.smembers(f"tag:{tag}:articles")

def get_common_tags(article1_id, article2_id):
    """Find common tags between articles"""
    return r.sinter(
        f"article:{article1_id}:tags",
        f"article:{article2_id}:tags"
    )

# Social following
def follow_user(user_id, target_id):
    """User follows target"""
    r.sadd(f"user:{user_id}:following", target_id)
    r.sadd(f"user:{target_id}:followers", user_id)

def get_mutual_friends(user1_id, user2_id):
    """Find mutual connections"""
    return r.sinter(
        f"user:{user1_id}:following",
        f"user:{user2_id}:following"
    )

# Online users
def mark_online(user_id):
    """Add to online users set"""
    r.sadd("users:online", user_id)
    # Caveat: EXPIRE applies to the whole set, so *everyone* disappears
    # 5 minutes after the last call. For per-user timeouts, use a sorted
    # set of last-seen timestamps and ZREMRANGEBYSCORE instead.
    r.expire("users:online", 300)

def get_online_count():
    """Count online users"""
    return r.scard("users:online")

# Unique daily visitors (HyperLogLog for memory efficiency)
from datetime import datetime

def track_visitor(ip_address):
    """Track unique visitor"""
    date = datetime.now().strftime("%Y-%m-%d")
    r.pfadd(f"visitors:{date}", ip_address)

def get_unique_visitors(date):
    """Get unique visitor count"""
    return r.pfcount(f"visitors:{date}")

5. Sorted Sets - Leaderboards & Rankings

Use Case: Leaderboards, priority queues, time-series, trending items

# Game leaderboard
def update_score(user_id, score):
    """Update player score"""
    r.zadd("leaderboard", {user_id: score})

def get_top_players(n=10):
    """Get top N players"""
    return r.zrevrange("leaderboard", 0, n-1, withscores=True)

def get_player_rank(user_id):
    """Get player's rank (0-indexed)"""
    rank = r.zrevrank("leaderboard", user_id)
    return rank + 1 if rank is not None else None

def get_players_in_range(min_score, max_score):
    """Get players within score range"""
    return r.zrangebyscore("leaderboard", min_score, max_score)

# Priority queue
def add_task_priority(task_id, priority):
    """Add task with priority (lower = higher priority)"""
    r.zadd("task_queue", {task_id: priority})

def pop_highest_priority_task():
    """Get and remove highest priority task"""
    result = r.zpopmin("task_queue", 1)
    return result[0] if result else None

# Trending articles (time-decay scoring)
import time

def upvote_article(article_id):
    """Upvote with recency-based score"""
    score = time.time()  # More recent = higher score
    # gt=True only updates if the new score is greater (Redis 6.2+ / redis-py 4+)
    r.zadd("trending", {article_id: score}, gt=True)

def get_trending(hours=24):
    """Get trending articles from last N hours"""
    cutoff_time = time.time() - (hours * 3600)
    return r.zrevrangebyscore(
        "trending",
        "+inf",
        cutoff_time,
        start=0,
        num=10
    )

# Scheduled tasks
def schedule_task(task_id, execute_at):
    """Schedule task for future execution"""
    r.zadd("scheduled_tasks", {task_id: execute_at.timestamp()})

def get_due_tasks():
    """Get tasks due for execution"""
    now = time.time()
    tasks = r.zrangebyscore("scheduled_tasks", 0, now)
    # Remove from scheduled
    if tasks:
        r.zrem("scheduled_tasks", *tasks)
    return tasks

6. Pub/Sub - Real-Time Messaging

Use Case: Chat, notifications, live updates, event broadcasting

# Publisher
def publish_notification(channel, message):
    """Broadcast notification"""
    r.publish(channel, json.dumps(message))

# Subscriber
def listen_to_notifications(channel):
    """Subscribe to notifications"""
    pubsub = r.pubsub()
    pubsub.subscribe(channel)

    for message in pubsub.listen():
        if message['type'] == 'message':
            data = json.loads(message['data'])
            handle_notification(data)

# Chat room
def send_message(room_id, user_id, message):
    """Send chat message"""
    payload = {
        "user_id": user_id,
        "message": message,
        "timestamp": time.time()
    }
    r.publish(f"chat:{room_id}", json.dumps(payload))

# Live dashboard updates
def broadcast_metrics(metrics):
    """Broadcast system metrics"""
    r.publish("dashboard:metrics", json.dumps(metrics))

7. Streams - Event Sourcing & Message Queues

Use Case: Event logs, message queues with consumer groups, audit trails

# Add events to stream
def log_event(stream_name, event_data):
    """Add event to stream"""
    r.xadd(stream_name, event_data, maxlen=10000)  # Keep last 10k

# Consumer group (multiple workers)
def create_consumer_group(stream_name, group_name):
    """Create consumer group"""
    try:
        r.xgroup_create(stream_name, group_name, id='0', mkstream=True)
    except redis.exceptions.ResponseError:
        pass  # Group already exists

def consume_events(stream_name, group_name, consumer_name):
    """Process events as consumer group member"""
    while True:
        messages = r.xreadgroup(
            group_name,
            consumer_name,
            {stream_name: '>'},
            count=10,
            block=1000
        )

        for stream, events in messages:
            for event_id, event_data in events:
                try:
                    process_event(event_data)
                    # Acknowledge processing
                    r.xack(stream_name, group_name, event_id)
                except Exception as e:
                    print(f"Failed to process {event_id}: {e}")

# Activity log
def log_user_activity(user_id, action, details):
    """Log user activity"""
    r.xadd(
        f"activity:{user_id}",
        {
            "action": action,
            "details": json.dumps(details),
            "timestamp": time.time()
        }
    )

def get_user_activity(user_id, count=100):
    """Get recent user activity"""
    return r.xrevrange(f"activity:{user_id}", '+', '-', count=count)

Redis Use Case Summary

Use Case | Data Structure | Why Redis?
Caching | String, Hash | 100k+ QPS, automatic expiration (TTL)
Session Storage | String, Hash | Fast access, automatic expiration, shared across servers
Rate Limiting | String (INCR) | Atomic operations, sliding window with TTL
Task Queue | List, Stream | Blocking operations, reliability with streams
Leaderboards | Sorted Set | O(log N) updates, range queries, rankings
Real-time Chat | Pub/Sub, Stream | Sub-millisecond latency, fan-out
Analytics | HyperLogLog, Bitmap | Memory-efficient cardinality, unique counts
Geospatial | Geo (Sorted Set) | Radius queries, distance calculations
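The Analytics row mentions bitmaps, which have no example in the sections above. In Redis the pattern is SETBIT active:{date} {user_id} 1 to record activity and BITCOUNT (or BITOP AND plus BITCOUNT for retention) to aggregate. A plain-Python sketch of the same idea, using an int as the bit array (class and key names are illustrative):

```python
class DailyActiveUsers:
    """Python-int stand-in for Redis SETBIT/BITCOUNT:
    one bit per numeric user id, one bitmap per day."""
    def __init__(self):
        self.bitmaps = {}  # date -> int used as a bit array

    def mark_active(self, date, user_id):
        # SETBIT active:{date} user_id 1
        self.bitmaps[date] = self.bitmaps.get(date, 0) | (1 << user_id)

    def count_active(self, date):
        # BITCOUNT active:{date}
        return bin(self.bitmaps.get(date, 0)).count("1")

    def active_both_days(self, d1, d2):
        # BITOP AND dest d1 d2, then BITCOUNT dest (simple retention metric)
        both = self.bitmaps.get(d1, 0) & self.bitmaps.get(d2, 0)
        return bin(both).count("1")
```

Since each user costs one bit, 10 million users fit in roughly 1.25MB per day.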

When NOT to Use Redis

  • Primary system of record: the dataset must fit in RAM, which gets expensive past tens of GB
  • Strict durability: even with AOF, the default everysec fsync policy can lose about a second of writes on crash
  • Complex queries: no joins, no ad-hoc secondary indexes, no SQL
  • Large multi-key transactions: MULTI/EXEC is limited compared to an RDBMS

2. RocksDB - High-Performance Embedded Storage

What is RocksDB?

RocksDB is an embedded key-value store optimized for fast storage (SSD/NVMe). Developed at Facebook, it is based on Google's LevelDB with substantial production hardening (column families, transactions, tunable compaction, multi-threaded background work). It uses LSM trees for write optimization.

Architecture: Log-Structured Merge tree, write-ahead log (WAL), memtable + SSTables, compaction, bloom filters for read optimization.

Key Feature: Unlike Redis (network service), RocksDB is embedded directly in your application - no network overhead!
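The write path just described (writes land in a memtable, which is flushed to immutable sorted SSTables, with reads consulting the newest data first) can be shown with a toy sketch. This is a teaching aid, not how RocksDB is actually implemented:

```python
class TinyLSM:
    """Toy LSM tree: in-memory memtable, flushed to immutable sorted
    'SSTables' when full; reads check newest data first."""
    def __init__(self, memtable_limit=4):
        self.memtable = {}
        self.sstables = []  # newest last; each is a sorted list of (key, value)
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value          # writes only touch memory
        if len(self.memtable) >= self.memtable_limit:
            self._flush()

    def _flush(self):
        # The full memtable becomes an immutable sorted run (an "SSTable")
        self.sstables.append(sorted(self.memtable.items()))
        self.memtable = {}

    def get(self, key):
        if key in self.memtable:            # newest data first
            return self.memtable[key]
        for table in reversed(self.sstables):  # then newest SSTable backward
            for k, v in table:
                if k == key:
                    return v
        return None
```

Real RocksDB adds a WAL for crash safety, bloom filters to skip SSTables that can't contain a key, binary search within sorted runs, and background compaction to merge runs and discard overwritten values.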

RocksDB Characteristics

Performance Profile

  • Writes: WAL append + memtable insert, so very fast (50k-200k/s per node)
  • Point reads: may touch several levels; bloom filters keep most lookups to one SSTable
  • Range scans: efficient, since SSTables are sorted
  • Trade-off: background compaction costs write amplification and extra I/O

RocksDB Use Cases

1. Storage Engine for Databases

Who Uses It: MySQL (MyRocks), MongoDB, CockroachDB, TiDB

import rocksdb

# Initialize RocksDB
opts = rocksdb.Options()
opts.create_if_missing = True
opts.max_open_files = 300000
opts.write_buffer_size = 67108864  # 64MB
opts.max_write_buffer_number = 3
opts.target_file_size_base = 67108864

db = rocksdb.DB("./mydb", opts)

# Basic operations
def put_data(key, value):
    """Write to RocksDB"""
    db.put(key.encode(), value.encode())

def get_data(key):
    """Read from RocksDB"""
    value = db.get(key.encode())
    return value.decode() if value else None

def delete_data(key):
    """Delete from RocksDB"""
    db.delete(key.encode())

# Batch writes (atomic)
def batch_write(operations):
    """Batch multiple operations"""
    batch = rocksdb.WriteBatch()

    for op_type, key, value in operations:
        if op_type == 'put':
            batch.put(key.encode(), value.encode())
        elif op_type == 'delete':
            batch.delete(key.encode())

    db.write(batch)

2. Time-Series Data & Logs

Why RocksDB: Write-optimized LSM tree perfect for append-heavy workloads

from datetime import datetime
import struct

# Store metrics with timestamp keys
def log_metric(metric_name, value, timestamp=None):
    """Store time-series metric"""
    if timestamp is None:
        timestamp = datetime.now()

    # Key: metric_name + timestamp (sortable)
    ts_bytes = struct.pack('>d', timestamp.timestamp())
    key = f"{metric_name}:".encode() + ts_bytes

    # Value: metric value
    value_bytes = struct.pack('>d', value)

    db.put(key, value_bytes)

def get_metrics_range(metric_name, start_time, end_time):
    """Query metrics in time range"""
    start_key = f"{metric_name}:".encode() + struct.pack('>d', start_time.timestamp())
    end_key = f"{metric_name}:".encode() + struct.pack('>d', end_time.timestamp())

    it = db.iteritems()
    it.seek(start_key)

    results = []
    for key, value in it:
        if key > end_key:
            break

        # Parse timestamp and value
        ts = struct.unpack('>d', key[-8:])[0]
        val = struct.unpack('>d', value)[0]
        results.append((datetime.fromtimestamp(ts), val))

    return results

3. Blockchain & Cryptocurrency

Who Uses It: Ethereum, Bitcoin Core

Why: Need to store massive state (accounts, balances, smart contracts) with fast lookups and writes

# Blockchain state storage
# Block numbers are zero-padded so lexicographic key order matches
# numeric order (otherwise b"...:10" would sort before b"...:9").
def store_account_balance(address, balance, block_number):
    """Store account balance at block height"""
    key = f"balance:{address}:{block_number:012d}".encode()
    value = str(balance).encode()
    db.put(key, value)

def get_account_balance(address, block_number=None):
    """Get account balance at specific block"""
    if block_number is None:
        # Get latest: seek to the last key with this prefix
        prefix = f"balance:{address}:".encode()
        it = db.iteritems()
        it.seek_for_prev(prefix + b'\xff' * 10)

        for key, value in it:
            if key.startswith(prefix):
                return int(value.decode())
            break  # nearest key is not ours - no balance stored
        return 0
    else:
        key = f"balance:{address}:{block_number:012d}".encode()
        value = db.get(key)
        return int(value.decode()) if value else 0

# Transaction indexing
def index_transaction(tx_hash, tx_data):
    """Index transaction by hash"""
    db.put(f"tx:{tx_hash}".encode(), tx_data)

def get_transaction(tx_hash):
    """Retrieve transaction"""
    return db.get(f"tx:{tx_hash}".encode())

4. Graph Database Storage

Who Uses It: ArangoDB builds its storage engine on RocksDB; other graph stores use the same key-layout pattern shown below

# Graph storage with RocksDB
def add_edge(from_node, to_node, edge_type, properties):
    """Add edge between nodes"""
    # Store outgoing edge
    out_key = f"out:{from_node}:{edge_type}:{to_node}".encode()
    db.put(out_key, json.dumps(properties).encode())

    # Store incoming edge (for reverse traversal)
    in_key = f"in:{to_node}:{edge_type}:{from_node}".encode()
    db.put(in_key, json.dumps(properties).encode())

def get_outgoing_edges(node_id, edge_type=None):
    """Get all outgoing edges from node"""
    if edge_type:
        prefix = f"out:{node_id}:{edge_type}:".encode()
    else:
        prefix = f"out:{node_id}:".encode()

    it = db.iteritems()
    it.seek(prefix)

    edges = []
    for key, value in it:
        if not key.startswith(prefix):
            break

        # Parse edge
        parts = key.decode().split(':')
        edges.append({
            'to': parts[-1],
            'type': parts[2],
            'properties': json.loads(value.decode())
        })

    return edges

5. Application State & Configuration

Why RocksDB: Fast local storage, no network overhead, atomic batch writes (plus optional transactions via TransactionDB)

# Application configuration store
def save_config(namespace, key, value):
    """Save configuration"""
    db.put(f"config:{namespace}:{key}".encode(), value.encode())

def load_config(namespace):
    """Load all config for namespace"""
    prefix = f"config:{namespace}:".encode()
    it = db.iteritems()
    it.seek(prefix)

    config = {}
    for key, value in it:
        if not key.startswith(prefix):
            break

        config_key = key.decode().split(':', 2)[2]
        config[config_key] = value.decode()

    return config

# Feature flags
def set_feature_flag(flag_name, enabled):
    """Set feature flag"""
    db.put(f"feature:{flag_name}".encode(), b'1' if enabled else b'0')

def is_feature_enabled(flag_name):
    """Check feature flag"""
    value = db.get(f"feature:{flag_name}".encode())
    return value == b'1' if value else False

RocksDB vs Redis: When to Use Which?

Aspect | Redis | RocksDB
Deployment | Standalone service (network) | Embedded library (in-process)
Data Size | Limited by RAM (expensive) | Limited by disk (cheap, TB+)
Latency | Sub-ms (network + memory) | Microseconds (no network)
Throughput | 100k+ ops/sec | 50k-200k writes, 100k-500k reads
Persistence | Optional (RDB + AOF) | Always persistent
Data Structures | Rich (lists, sets, sorted sets, etc.) | Key-value only
Sharing | Multiple clients/servers | Single process only
Use Cases | Caching, sessions, queues, real-time | Storage engine, time-series, logs

3. etcd - Distributed Configuration & Coordination

What is etcd?

etcd is a distributed, strongly consistent key-value store that provides a reliable way to store data across a cluster of machines. It uses the Raft consensus algorithm to ensure data consistency and is designed for configuration management, service discovery, and distributed coordination.

Architecture: Raft consensus (leader election), distributed cluster (3-5 nodes typical), strongly consistent (CP in CAP theorem), watch mechanism for real-time updates, lease-based TTLs.

Key Feature: Unlike Redis (in-memory, best-effort consistency), etcd prioritizes consistency over raw throughput and is the backbone of Kubernetes and other cloud-native systems.
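etcd's consistency guarantee rests on Raft's majority rule: a candidate becomes leader only with votes from a strict majority, and each node votes at most once per term, so two leaders cannot coexist in the same term. A toy illustration of just this vote-counting rule (not a Raft implementation - real Raft also compares log freshness before granting a vote):

```python
class VoteTracker:
    """Toy model of Raft's per-term voting rule: each node grants
    at most one vote per term; election needs a strict majority."""
    def __init__(self, cluster_size):
        self.cluster_size = cluster_size
        self.voted = {}  # (term, voter_id) -> candidate voted for

    def request_vote(self, term, voter, candidate):
        # A node grants its vote only if it hasn't voted this term
        if (term, voter) not in self.voted:
            self.voted[(term, voter)] = candidate
            return True
        return self.voted[(term, voter)] == candidate

    def is_leader(self, term, candidate):
        votes = sum(1 for (t, _), c in self.voted.items()
                    if t == term and c == candidate)
        return votes > self.cluster_size // 2  # strict majority required
```

With 5 nodes, once three of them vote for candidate A in term 1, no competing candidate can reach a majority in that term - which is exactly why a committed write survives any minority of failures.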

etcd Characteristics

Performance & Consistency Profile

  • Writes: ~10k/s, each committed through Raft (quorum acknowledgment + fsync)
  • Reads: linearizable by default; serializable reads can be served by any member and are faster
  • Latency: single-digit ms in-datacenter, dominated by disk fsync and network round trips
  • Data size: small values only (1.5MB request limit, ~2GB default storage quota)

etcd Use Cases

1. Kubernetes Configuration Storage

The Primary Use Case: etcd is the backing store for all Kubernetes cluster data

import etcd3

# Initialize etcd client
etcd = etcd3.client(host='localhost', port=2379)

# Store Kubernetes-style configuration
def store_pod_config(namespace, pod_name, config):
    """Store pod configuration in etcd (similar to K8s)"""
    key = f"/registry/pods/{namespace}/{pod_name}"
    etcd.put(key, json.dumps(config))

def get_pod_config(namespace, pod_name):
    """Retrieve pod configuration"""
    key = f"/registry/pods/{namespace}/{pod_name}"
    value, metadata = etcd.get(key)
    return json.loads(value) if value else None

# Example: Store pod configuration
pod_config = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {
        "name": "nginx-pod",
        "namespace": "default"
    },
    "spec": {
        "containers": [{
            "name": "nginx",
            "image": "nginx:1.21"
        }]
    }
}

store_pod_config("default", "nginx-pod", pod_config)

# Watch for changes (K8s controllers use this)
def watch_pods(namespace):
    """Watch for pod changes in namespace"""
    watch_key = f"/registry/pods/{namespace}/"

    events_iterator, cancel = etcd.watch_prefix(watch_key)

    for event in events_iterator:
        print(f"Event: {event}")
        if isinstance(event, etcd3.events.PutEvent):
            print(f"Pod created/updated: {event.key}")
        elif isinstance(event, etcd3.events.DeleteEvent):
            print(f"Pod deleted: {event.key}")

# Example usage
# watch_pods("default")  # Runs indefinitely

2. Service Discovery

Why etcd: Services register themselves with TTLs. Clients watch for changes and auto-update.

import etcd3
import time
import json
from threading import Thread

# Service registration with heartbeat
class ServiceRegistry:
    def __init__(self, etcd_client, service_name, instance_id, ttl=10):
        self.etcd = etcd_client
        self.service_name = service_name
        self.instance_id = instance_id
        self.ttl = ttl
        self.lease = None

    def register(self, host, port):
        """Register service instance with TTL"""
        # Create lease with TTL
        self.lease = self.etcd.lease(self.ttl)

        # Store service info
        key = f"/services/{self.service_name}/{self.instance_id}"
        value = json.dumps({
            "host": host,
            "port": port,
            "registered_at": time.time()
        })

        self.etcd.put(key, value, lease=self.lease)
        print(f"Registered {self.service_name}:{self.instance_id} at {host}:{port}")

        # Keep alive in background
        Thread(target=self._keep_alive, daemon=True).start()

    def _keep_alive(self):
        """Maintain lease with periodic refresh"""
        while True:
            time.sleep(self.ttl // 2)
            try:
                self.lease.refresh()
                print(f"Heartbeat: {self.service_name}:{self.instance_id}")
            except Exception as e:
                print(f"Failed to refresh lease: {e}")
                break

    def deregister(self):
        """Remove service from registry"""
        key = f"/services/{self.service_name}/{self.instance_id}"
        self.etcd.delete(key)
        if self.lease:
            self.lease.revoke()

# Service discovery client
class ServiceDiscovery:
    def __init__(self, etcd_client):
        self.etcd = etcd_client

    def discover(self, service_name):
        """Find all instances of a service"""
        prefix = f"/services/{service_name}/"
        instances = []

        for value, metadata in self.etcd.get_prefix(prefix):
            if value:
                instance_data = json.loads(value)
                instances.append(instance_data)

        return instances

    def watch_service(self, service_name, callback):
        """Watch for service changes"""
        prefix = f"/services/{service_name}/"
        events_iterator, cancel = self.etcd.watch_prefix(prefix)

        for event in events_iterator:
            if isinstance(event, etcd3.events.PutEvent):
                instance = json.loads(event.value)
                callback("added", instance)
            elif isinstance(event, etcd3.events.DeleteEvent):
                callback("removed", event.key)

# Example usage
etcd_client = etcd3.client()

# Register service instances
registry = ServiceRegistry(etcd_client, "api-service", "instance-1")
registry.register("10.0.1.5", 8080)

# Discover services
discovery = ServiceDiscovery(etcd_client)
instances = discovery.discover("api-service")
print(f"Found instances: {instances}")

# Watch for changes
def on_service_change(event_type, data):
    print(f"Service {event_type}: {data}")

# discovery.watch_service("api-service", on_service_change)

3. Distributed Locking

Use Case: Leader election, mutual exclusion, coordination

import time
import etcd3
from contextlib import contextmanager

class DistributedLock:
    """
    Distributed lock using etcd

    Use for:
    - Leader election
    - Ensuring only one process runs a task
    - Coordinating work across distributed systems
    """

    def __init__(self, etcd_client, lock_name, ttl=30):
        self.etcd = etcd_client
        self.lock_name = lock_name
        self.ttl = ttl
        self.lease = None

    @contextmanager
    def acquire(self, timeout=10):
        """
        Acquire distributed lock

        Uses etcd transaction to ensure atomicity:
        - Only succeeds if key doesn't exist (first to acquire wins)
        - Automatically releases on lease expiry
        """
        lock_key = f"/locks/{self.lock_name}"

        # Create lease
        self.lease = self.etcd.lease(self.ttl)

        # Try to acquire lock (atomic operation)
        start_time = time.time()
        while time.time() - start_time < timeout:
            # Transaction: create key only if it doesn't exist
            # python-etcd3's transaction() returns a (succeeded, responses) tuple
            succeeded, _ = self.etcd.transaction(
                compare=[
                    self.etcd.transactions.version(lock_key) == 0
                ],
                success=[
                    self.etcd.transactions.put(
                        lock_key,
                        "locked",
                        lease=self.lease
                    )
                ],
                failure=[]
            )

            if succeeded:
                print(f"Acquired lock: {self.lock_name}")
                try:
                    yield
                finally:
                    self._release(lock_key)
                return

            time.sleep(0.1)

        raise TimeoutError(f"Failed to acquire lock: {self.lock_name}")

    def _release(self, lock_key):
        """Release the lock"""
        self.etcd.delete(lock_key)
        if self.lease:
            self.lease.revoke()
        print(f"Released lock: {self.lock_name}")

# Example: Leader election
class LeaderElection:
    """
    Leader election using etcd

    Multiple processes compete, one becomes leader
    If leader dies (lease expires), new election happens
    """

    def __init__(self, etcd_client, election_name, instance_id):
        self.etcd = etcd_client
        self.election_name = election_name
        self.instance_id = instance_id

    def run_for_leader(self, on_elected, on_lost):
        """
        Campaign for leadership

        on_elected: callback when this instance becomes leader
        on_lost: callback when leadership is lost
        """
        lock_key = f"/elections/{self.election_name}/leader"
        lease = self.etcd.lease(30)

        while True:
            # Try to become leader
            # python-etcd3's transaction() returns a (succeeded, responses) tuple
            succeeded, _ = self.etcd.transaction(
                compare=[
                    self.etcd.transactions.version(lock_key) == 0
                ],
                success=[
                    self.etcd.transactions.put(
                        lock_key,
                        self.instance_id,
                        lease=lease
                    )
                ],
                failure=[]
            )

            if succeeded:
                print(f"{self.instance_id} became leader!")
                on_elected()

                # Keep refreshing lease while leader
                try:
                    while True:
                        time.sleep(10)
                        lease.refresh()
                except Exception as e:
                    print(f"Lost leadership: {e}")
                    on_lost()
            else:
                # Wait for current leader to expire
                print(f"{self.instance_id} waiting for leader...")
                time.sleep(5)

# Example usage
etcd_client = etcd3.client()

# Distributed lock for critical section
lock = DistributedLock(etcd_client, "process-data-job")

with lock.acquire():
    print("Processing data... (only one instance does this)")
    time.sleep(5)
    print("Done processing")

# Leader election example
def do_leader_work():
    """Work that only leader should do"""
    print("I am the leader, doing leader work...")

def on_lost_leadership():
    """Cleanup when leadership is lost"""
    print("No longer leader, stopping work...")

election = LeaderElection(etcd_client, "main-controller", "instance-1")
# election.run_for_leader(do_leader_work, on_lost_leadership)

4. Configuration Management

Use Case: Dynamic configuration updates, feature flags, application settings

import etcd3
import json

class ConfigManager:
    """
    Configuration management with etcd

    Features:
    - Centralized config storage
    - Real-time updates (watch for changes)
    - Versioned configs
    - Environment-specific configs
    """

    def __init__(self, etcd_client, app_name, environment):
        self.etcd = etcd_client
        self.app_name = app_name
        self.environment = environment
        self.config_cache = {}

    def set_config(self, key, value):
        """Set configuration value"""
        full_key = f"/config/{self.app_name}/{self.environment}/{key}"
        self.etcd.put(full_key, json.dumps(value))

    def get_config(self, key, default=None):
        """Get configuration value"""
        full_key = f"/config/{self.app_name}/{self.environment}/{key}"

        # Try cache first
        if key in self.config_cache:
            return self.config_cache[key]

        # Fetch from etcd
        value, metadata = self.etcd.get(full_key)
        if value:
            parsed = json.loads(value)
            self.config_cache[key] = parsed
            return parsed

        return default

    def get_all_config(self):
        """Get all configuration for this app/environment"""
        prefix = f"/config/{self.app_name}/{self.environment}/"
        config = {}

        for value, metadata in self.etcd.get_prefix(prefix):
            if value:
                # Extract key name from full path
                key_name = metadata.key.decode().split('/')[-1]
                config[key_name] = json.loads(value)

        return config

    def watch_config(self, callback):
        """Watch for configuration changes"""
        prefix = f"/config/{self.app_name}/{self.environment}/"
        events_iterator, cancel = self.etcd.watch_prefix(prefix)

        for event in events_iterator:
            if isinstance(event, etcd3.events.PutEvent):
                key_name = event.key.decode().split('/')[-1]
                new_value = json.loads(event.value)

                # Update cache
                self.config_cache[key_name] = new_value

                # Notify application
                callback(key_name, new_value)

# Example usage
etcd_client = etcd3.client()
config_mgr = ConfigManager(etcd_client, "my-api", "production")

# Set configuration
config_mgr.set_config("max_connections", 100)
config_mgr.set_config("timeout_seconds", 30)
config_mgr.set_config("feature_flags", {
    "new_ui": True,
    "beta_features": False
})

# Get configuration
max_conn = config_mgr.get_config("max_connections")
print(f"Max connections: {max_conn}")

# Get all config
all_config = config_mgr.get_all_config()
print(f"All config: {all_config}")

# Watch for changes (hot reload)
def on_config_change(key, new_value):
    print(f"Config changed: {key} = {new_value}")
    # Reload application settings
    reload_app_config(key, new_value)

# config_mgr.watch_config(on_config_change)

etcd vs Other Datastores

Aspect | etcd | Redis | Zookeeper
Consensus | Raft (easier to understand) | None (AP system) | ZAB (Zookeeper Atomic Broadcast)
Consistency | Linearizable (CP) | Eventual (AP) | Linearizable (CP)
Performance | 10k writes/s, 100k+ reads/s | 100k+ ops/s | Similar to etcd
API | gRPC + HTTP | RESP protocol | Custom protocol
Watch | Built-in, efficient | Pub/Sub, keyspace notifications | Watchers (less efficient)
Use Case | Config, coordination, K8s | Caching, sessions, queues | Coordination (legacy)
Adoption | Kubernetes, Cloud-native | Everywhere (caching) | Hadoop, legacy systems

When to Choose etcd

  • Need strong consistency: Critical for coordination and config
  • Distributed coordination: Leader election, distributed locks
  • Service discovery: With real-time updates via watch
  • Configuration management: Centralized, versioned config
  • Kubernetes: It's the standard (mandatory for K8s)

When NOT to Use etcd

  • High throughput caching: Use Redis instead
  • Large values: etcd has 1.5MB value size limit
  • Query capabilities: Simple key-value only, no SQL
  • High write volume: Limited by Raft consensus (~10k writes/s)

4. Quick Comparison: Other Major Datastores

PostgreSQL - Relational Powerhouse

Best For: Complex queries, ACID transactions, relational data, JSON + full-text search

Use Cases: E-commerce orders, financial transactions, user accounts, any structured data with relationships

Performance: 10k-20k QPS, vertical scaling + read replicas

When to Use: Need ACID, complex JOINs, data integrity
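To make the ACID point concrete, here is the atomicity pattern in code. sqlite3 stands in only because it ships with Python; with PostgreSQL the same shape applies through a driver such as psycopg (table and function names are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (name TEXT PRIMARY KEY, balance INTEGER)")
conn.execute("INSERT INTO accounts VALUES ('alice', 100), ('bob', 0)")
conn.commit()

def transfer(conn, src, dst, amount):
    """Move money atomically: both updates commit, or neither does."""
    try:
        with conn:  # opens a transaction; commits on success, rolls back on error
            conn.execute("UPDATE accounts SET balance = balance - ? WHERE name = ?",
                         (amount, src))
            conn.execute("UPDATE accounts SET balance = balance + ? WHERE name = ?",
                         (amount, dst))
            # Enforce the invariant inside the transaction
            (bal,) = conn.execute("SELECT balance FROM accounts WHERE name = ?",
                                  (src,)).fetchone()
            if bal < 0:
                raise ValueError("insufficient funds")
        return True
    except ValueError:
        return False  # transaction rolled back, balances unchanged
```

This read-validate-rollback pattern inside a single transaction is exactly what the key-value stores above don't give you out of the box.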

MongoDB - Flexible Documents

Best For: Flexible schema, rapid iteration, nested documents

Use Cases: Content management, product catalogs, user profiles, IoT data

Performance: 20k-50k QPS, horizontal sharding

When to Use: Schema changes frequently, denormalized data, JSON-heavy

Cassandra - Massive Write Scale

Best For: Time-series, IoT, write-heavy workloads, multi-DC

Use Cases: Sensor data, event logging, messaging history, activity streams

Performance: 100k+ writes/sec, linear scaling

When to Use: Massive write volume, eventual consistency OK, multi-region

Elasticsearch - Search & Analytics

Best For: Full-text search, log analytics, aggregations

Use Cases: Product search, log analysis, metrics dashboards, APM

Performance: 10k searches/sec, complex aggregations

When to Use: Need fuzzy search, text analysis, real-time analytics

5. Decision Tree

Which Datastore Should I Use?

  1. Need sub-millisecond latency?
    • Shared across servers → Redis
    • Single application → RocksDB
  2. Need ACID transactions? → PostgreSQL
  3. Schema changes frequently? → MongoDB
  4. Massive write throughput? → Cassandra or RocksDB
  5. Full-text search? → Elasticsearch
  6. Embedded storage (no network)? → RocksDB or SQLite
  7. Multi-region replication? → DynamoDB or Cassandra
  8. Graph relationships? → Neo4j (or RocksDB custom)
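The checks above can be sketched as a first-match-wins function (purely illustrative; the flag names are made up here):

```python
def pick_datastore(*, sub_ms=False, shared=False, acid=False,
                   flexible_schema=False, write_heavy=False,
                   full_text=False, embedded=False, multi_region=False):
    """First-match-wins encoding of the decision tree above."""
    if sub_ms:
        return "Redis" if shared else "RocksDB"
    if acid:
        return "PostgreSQL"
    if flexible_schema:
        return "MongoDB"
    if write_heavy:
        return "Cassandra or RocksDB"
    if full_text:
        return "Elasticsearch"
    if embedded:
        return "RocksDB or SQLite"
    if multi_region:
        return "DynamoDB or Cassandra"
    return "PostgreSQL"  # assumed default for general structured data
```

The order matters: earlier questions override later ones, so a workload that is both write-heavy and searchable is steered to Cassandra first and would pair Elasticsearch as a secondary index.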

6. Real-World Architecture Examples

E-Commerce Platform

PostgreSQL (orders, payments, inventory - ACID) + Redis (sessions, carts, response caching) + Elasticsearch (product search)

Social Media App

PostgreSQL (accounts, relationships) + Redis (feeds, counters, pub/sub chat) + Cassandra (activity streams, message history)

IoT / Time-Series Platform

Cassandra (sensor ingest, multi-DC) + RocksDB (embedded buffering at the edge) + Redis (hot aggregates, alerting) + Elasticsearch (dashboards, log analytics)