PostgreSQL • MySQL • MongoDB • Redis • RocksDB • etcd • Cassandra • DynamoDB • Elasticsearch
| Datastore | Type | Use Cases | Performance | Scalability |
|---|---|---|---|---|
| PostgreSQL | RDBMS | Transactional data, complex queries, ACID | 10k QPS | Vertical + read replicas |
| MySQL | RDBMS | Web apps, read-heavy workloads | 15k QPS | Vertical + read replicas |
| MongoDB | Document | Flexible schema, rapid iteration | 20k QPS | Horizontal sharding |
| Redis | In-memory KV | Caching, sessions, real-time, queues | 100k+ QPS | Horizontal cluster |
| RocksDB | Embedded KV | Storage engine, high write throughput | 50k+ writes/s | Single node (embedded) |
| etcd | Distributed KV | Config mgmt, service discovery, coordination | 10k writes/s | Horizontal (Raft cluster) |
| Cassandra | Wide Column | Time-series, massive writes, multi-DC | 100k+ writes/s | Horizontal (excellent) |
| DynamoDB | Managed KV | Serverless, auto-scaling, AWS | Scales with provisioned/on-demand capacity | Auto (AWS managed) |
| Elasticsearch | Search Engine | Full-text search, log analytics | 10k searches/s | Horizontal sharding |
Redis (Remote Dictionary Server) is an in-memory key-value store known for blazing-fast performance (100k+ ops/sec). Unlike simple caches, Redis supports rich data structures: strings, hashes, lists, sets, sorted sets, bitmaps, hyperloglogs, and streams.
Architecture: Single-threaded event loop (no locks needed), persistence options (RDB snapshots + AOF log), clustering for horizontal scaling, pub/sub messaging.
Use Case: Session storage, API response caching, rate limiting counters
```python
import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Session storage
def store_session(session_id, user_data):
    """Store user session with TTL"""
    r.setex(
        f"session:{session_id}",
        3600,  # 1 hour expiration
        json.dumps(user_data)
    )

def get_session(session_id):
    """Retrieve session"""
    data = r.get(f"session:{session_id}")
    return json.loads(data) if data else None

# Rate limiting
def is_rate_limited(user_id, max_requests=100, window=60):
    """Allow max_requests per window (seconds)"""
    key = f"rate_limit:{user_id}"
    current = r.incr(key)
    if current == 1:
        r.expire(key, window)
    return current > max_requests

# API response caching
def cached_api_call(endpoint, ttl=300):
    """Cache API responses"""
    cache_key = f"api_cache:{endpoint}"
    # Try cache first
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    # Cache miss - fetch and store
    response = fetch_from_api(endpoint)
    r.setex(cache_key, ttl, json.dumps(response))
    return response
```
Use Case: User profiles, product details, configuration settings
```python
# Store user profile as hash
def save_user_profile(user_id, profile_data):
    """Store user profile efficiently"""
    r.hset(
        f"user:{user_id}",
        mapping={
            "name": profile_data["name"],
            "email": profile_data["email"],
            "age": profile_data["age"],
            "premium": "true" if profile_data["premium"] else "false"
        }
    )

def get_user_profile(user_id):
    """Get entire profile"""
    return r.hgetall(f"user:{user_id}")

def get_user_field(user_id, field):
    """Get specific field only"""
    return r.hget(f"user:{user_id}", field)

def increment_user_stat(user_id, stat_name, amount=1):
    """Atomically increment stats"""
    r.hincrby(f"user:{user_id}:stats", stat_name, amount)

# Example: Shopping cart
def add_to_cart(user_id, product_id, quantity):
    """Add product to cart"""
    r.hset(f"cart:{user_id}", product_id, quantity)

def get_cart(user_id):
    """Get entire cart"""
    return r.hgetall(f"cart:{user_id}")
```
Use Case: Task queues, activity feeds, recent items, message queues
```python
# Task queue (producer-consumer)
def enqueue_task(queue_name, task_data):
    """Add task to queue (FIFO)"""
    r.lpush(queue_name, json.dumps(task_data))

def dequeue_task(queue_name, timeout=0):
    """Blocking pop from queue"""
    result = r.brpop(queue_name, timeout)
    if result:
        _, task_data = result
        return json.loads(task_data)
    return None

# Activity feed (recent N items)
def add_to_feed(user_id, activity):
    """Add activity to user feed"""
    feed_key = f"feed:{user_id}"
    r.lpush(feed_key, json.dumps(activity))
    r.ltrim(feed_key, 0, 99)  # Keep only 100 most recent

def get_feed(user_id, page=0, page_size=20):
    """Get paginated feed"""
    start = page * page_size
    end = start + page_size - 1
    activities = r.lrange(f"feed:{user_id}", start, end)
    return [json.loads(a) for a in activities]

# Recent search history
def add_search(user_id, query):
    """Store recent search"""
    key = f"search_history:{user_id}"
    r.lpush(key, query)
    r.ltrim(key, 0, 9)  # Keep last 10 searches
    r.expire(key, 86400 * 30)  # Expire after 30 days
```
Use Case: Tagging, followers/following, online users, unique visitors
```python
from datetime import datetime

# Tagging system
def add_tags(article_id, *tags):
    """Add tags to article"""
    r.sadd(f"article:{article_id}:tags", *tags)

def get_articles_by_tag(tag):
    """Find all articles with tag"""
    # Requires maintaining a reverse index (tag -> articles)
    return r.smembers(f"tag:{tag}:articles")

def get_common_tags(article1_id, article2_id):
    """Find common tags between articles"""
    return r.sinter(
        f"article:{article1_id}:tags",
        f"article:{article2_id}:tags"
    )

# Social following
def follow_user(user_id, target_id):
    """User follows target"""
    r.sadd(f"user:{user_id}:following", target_id)
    r.sadd(f"user:{target_id}:followers", user_id)

def get_mutual_friends(user1_id, user2_id):
    """Find mutual connections"""
    return r.sinter(
        f"user:{user1_id}:following",
        f"user:{user2_id}:following"
    )

# Online users
def mark_online(user_id):
    """Add to online users set"""
    r.sadd("users:online", user_id)
    r.expire("users:online", 300)  # Clean up after 5 min (TTL applies to the whole set, not per member)

def get_online_count():
    """Count online users"""
    return r.scard("users:online")

# Unique daily visitors (HyperLogLog for memory efficiency)
def track_visitor(ip_address):
    """Track unique visitor"""
    date = datetime.now().strftime("%Y-%m-%d")
    r.pfadd(f"visitors:{date}", ip_address)

def get_unique_visitors(date):
    """Get unique visitor count"""
    return r.pfcount(f"visitors:{date}")
```
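PFADD/PFCOUNT keep memory roughly constant (about 12 KB per key) no matter how many distinct items are added. A toy, single-register sketch of the underlying idea, probabilistic leading-zero counting, is shown below; real HyperLogLog averages thousands of registers with bias correction, so treat this as illustration only:

```python
import hashlib

def _rank(h: int, bits: int = 64) -> int:
    """Position of the first 1-bit in a hash (1-indexed)."""
    for i in range(1, bits + 1):
        if h & (1 << (bits - i)):
            return i
    return bits

def estimate_cardinality(items) -> int:
    """Toy estimate: 2**(max rank seen). A rare long run of leading
    zeros implies many distinct hashes were observed. Real HLL splits
    hashes across many registers and takes a harmonic mean to cut the
    huge variance this single-register version has."""
    max_rank = 0
    for item in items:
        h = int.from_bytes(hashlib.sha1(item.encode()).digest()[:8], "big")
        max_rank = max(max_rank, _rank(h))
    return 2 ** max_rank

# Duplicates never change the estimate - only distinct hashes matter,
# which is exactly why PFADD of a repeated visitor costs nothing.
print(estimate_cardinality(["a", "b", "c", "a", "a"]))
```

The key property carried over from the real structure: adding a duplicate leaves the estimate untouched, so memory and accuracy depend only on distinct elements.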
Use Case: Leaderboards, priority queues, time-series, trending items
```python
import time

# Game leaderboard
def update_score(user_id, score):
    """Update player score"""
    r.zadd("leaderboard", {user_id: score})

def get_top_players(n=10):
    """Get top N players"""
    return r.zrevrange("leaderboard", 0, n - 1, withscores=True)

def get_player_rank(user_id):
    """Get player's rank (1-indexed)"""
    rank = r.zrevrank("leaderboard", user_id)  # zrevrank is 0-indexed
    return rank + 1 if rank is not None else None

def get_players_in_range(min_score, max_score):
    """Get players within score range"""
    return r.zrangebyscore("leaderboard", min_score, max_score)

# Priority queue
def add_task_priority(task_id, priority):
    """Add task with priority (lower = higher priority)"""
    r.zadd("task_queue", {task_id: priority})

def pop_highest_priority_task():
    """Get and remove highest priority task"""
    result = r.zpopmin("task_queue", 1)
    return result[0] if result else None

# Trending articles (time-decay scoring)
def upvote_article(article_id):
    """Upvote with time decay"""
    score = time.time()  # More recent = higher score
    r.zadd("trending", {article_id: score}, gt=True)

def get_trending(hours=24):
    """Get trending articles from last N hours"""
    cutoff_time = time.time() - (hours * 3600)
    return r.zrevrangebyscore(
        "trending",
        "+inf",
        cutoff_time,
        start=0,
        num=10
    )

# Scheduled tasks
def schedule_task(task_id, execute_at):
    """Schedule task for future execution"""
    r.zadd("scheduled_tasks", {task_id: execute_at.timestamp()})

def get_due_tasks():
    """Get tasks due for execution"""
    now = time.time()
    tasks = r.zrangebyscore("scheduled_tasks", 0, now)
    # Remove from scheduled
    if tasks:
        r.zrem("scheduled_tasks", *tasks)
    return tasks
```
Use Case: Chat, notifications, live updates, event broadcasting
```python
import json
import time

# Publisher
def publish_notification(channel, message):
    """Broadcast notification"""
    r.publish(channel, json.dumps(message))

# Subscriber
def listen_to_notifications(channel):
    """Subscribe to notifications"""
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    for message in pubsub.listen():
        if message['type'] == 'message':
            data = json.loads(message['data'])
            handle_notification(data)  # app-specific handler

# Chat room
def send_message(room_id, user_id, message):
    """Send chat message"""
    payload = {
        "user_id": user_id,
        "message": message,
        "timestamp": time.time()
    }
    r.publish(f"chat:{room_id}", json.dumps(payload))

# Live dashboard updates
def broadcast_metrics(metrics):
    """Broadcast system metrics"""
    r.publish("dashboard:metrics", json.dumps(metrics))
```
Use Case: Event logs, message queues with consumer groups, audit trails
```python
# Add events to stream
def log_event(stream_name, event_data):
    """Add event to stream"""
    r.xadd(stream_name, event_data, maxlen=10000)  # Keep last 10k

# Consumer group (multiple workers)
def create_consumer_group(stream_name, group_name):
    """Create consumer group"""
    try:
        r.xgroup_create(stream_name, group_name, id='0', mkstream=True)
    except redis.exceptions.ResponseError:
        pass  # Group already exists

def consume_events(stream_name, group_name, consumer_name):
    """Process events as consumer group member"""
    while True:
        messages = r.xreadgroup(
            group_name,
            consumer_name,
            {stream_name: '>'},
            count=10,
            block=1000
        )
        for stream, events in messages:
            for event_id, event_data in events:
                try:
                    process_event(event_data)
                    # Acknowledge processing
                    r.xack(stream_name, group_name, event_id)
                except Exception as e:
                    print(f"Failed to process {event_id}: {e}")

# Activity log
def log_user_activity(user_id, action, details):
    """Log user activity"""
    r.xadd(
        f"activity:{user_id}",
        {
            "action": action,
            "details": json.dumps(details),
            "timestamp": time.time()
        }
    )

def get_user_activity(user_id, count=100):
    """Get recent user activity"""
    return r.xrevrange(f"activity:{user_id}", '+', '-', count=count)
```
| Use Case | Data Structure | Why Redis? |
|---|---|---|
| Caching | String, Hash | 100k+ QPS, automatic expiration (TTL) |
| Session Storage | String, Hash | Fast access, automatic expiration, shared across servers |
| Rate Limiting | String (INCR) | Atomic operations, sliding window with TTL |
| Task Queue | List, Stream | Blocking operations, reliability with streams |
| Leaderboards | Sorted Set | O(log N) updates, range queries, rankings |
| Real-time Chat | Pub/Sub, Stream | Sub-millisecond latency, fan-out |
| Analytics | HyperLogLog, Bitmap | Memory-efficient cardinality, unique counts |
| Geospatial | Geo (Sorted Set) | Radius queries, distance calculations |
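The Geospatial row is the only one without an example above. Redis GEOADD packs longitude/latitude into a 52-bit geohash stored as a sorted-set score, and GEODIST reports great-circle distance between two members. The distance math itself is plain haversine, sketched here in Python (the city coordinates are illustrative):

```python
import math

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km - the same quantity GEODIST reports."""
    R = 6371.0  # mean Earth radius, km
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2)
    return 2 * R * math.asin(math.sqrt(a))

# Paris -> London, roughly 344 km as the crow flies
print(round(haversine_km(48.8566, 2.3522, 51.5074, -0.1278)))
```

Because members live in an ordinary sorted set, radius queries (GEOSEARCH) reduce to range scans over geohash prefixes plus this distance check.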
RocksDB is an embedded key-value store optimized for fast storage (SSD/NVMe). Developed at Facebook, it began as a fork of Google's LevelDB and adds substantial production features (multithreaded compaction, column families, transactions). It uses an LSM tree to optimize writes.
Architecture: Log-Structured Merge tree, write-ahead log (WAL), memtable + SSTables, compaction, bloom filters for read optimization.
Key Feature: Unlike Redis (network service), RocksDB is embedded directly in your application - no network overhead!
Who Uses It: MySQL (the MyRocks storage engine), MongoDB (the MongoRocks engine), CockroachDB, TiDB
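The write path in the architecture note above (WAL, memtable, SSTable flush) can be modeled in a few lines of plain Python. This is a deliberately minimal sketch with none of RocksDB's compaction, bloom filters, or persistence:

```python
class TinyLSM:
    """Toy LSM tree: writes land in an in-memory memtable; when it
    fills, it is flushed as an immutable sorted run (an 'SSTable').
    Reads check the memtable first, then runs from newest to oldest."""

    def __init__(self, memtable_limit=4):
        self.memtable = {}
        self.sstables = []  # sorted immutable runs, newest last
        self.limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value  # O(1) write, no random disk I/O
        if len(self.memtable) >= self.limit:
            self._flush()

    def _flush(self):
        run = dict(sorted(self.memtable.items()))  # sorted run
        self.sstables.append(run)
        self.memtable = {}

    def get(self, key):
        if key in self.memtable:
            return self.memtable[key]
        for run in reversed(self.sstables):  # newest write wins
            if key in run:
                return run[key]
        return None
```

Real compaction merges overlapping runs so reads touch fewer files, and bloom filters let most runs be skipped without a lookup; this sketch only shows why writes are cheap and reads may consult several levels.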
```python
import rocksdb

# Initialize RocksDB
opts = rocksdb.Options()
opts.create_if_missing = True
opts.max_open_files = 300000
opts.write_buffer_size = 67108864  # 64MB
opts.max_write_buffer_number = 3
opts.target_file_size_base = 67108864

db = rocksdb.DB("./mydb", opts)

# Basic operations
def put_data(key, value):
    """Write to RocksDB"""
    db.put(key.encode(), value.encode())

def get_data(key):
    """Read from RocksDB"""
    value = db.get(key.encode())
    return value.decode() if value else None

def delete_data(key):
    """Delete from RocksDB"""
    db.delete(key.encode())

# Batch writes (atomic)
def batch_write(operations):
    """Batch multiple operations"""
    batch = rocksdb.WriteBatch()
    for op_type, key, value in operations:
        if op_type == 'put':
            batch.put(key.encode(), value.encode())
        elif op_type == 'delete':
            batch.delete(key.encode())
    db.write(batch)
```
Why RocksDB: Write-optimized LSM tree perfect for append-heavy workloads
```python
from datetime import datetime
import struct

# Store metrics with timestamp keys
def log_metric(metric_name, value, timestamp=None):
    """Store time-series metric"""
    if timestamp is None:
        timestamp = datetime.now()
    # Key: metric_name + timestamp (big-endian doubles sort correctly)
    ts_bytes = struct.pack('>d', timestamp.timestamp())
    key = f"{metric_name}:".encode() + ts_bytes
    # Value: metric value
    value_bytes = struct.pack('>d', value)
    db.put(key, value_bytes)

def get_metrics_range(metric_name, start_time, end_time):
    """Query metrics in time range"""
    start_key = f"{metric_name}:".encode() + struct.pack('>d', start_time.timestamp())
    end_key = f"{metric_name}:".encode() + struct.pack('>d', end_time.timestamp())
    it = db.iteritems()
    it.seek(start_key)
    results = []
    for key, value in it:
        if key > end_key:
            break
        # Parse timestamp and value
        ts = struct.unpack('>d', key[-8:])[0]
        val = struct.unpack('>d', value)[0]
        results.append((datetime.fromtimestamp(ts), val))
    return results
```
Who Uses It: blockchain node implementations (Parity/OpenEthereum used RocksDB; Bitcoin Core and go-ethereum use the closely related LevelDB)
Why: Need to store massive state (accounts, balances, smart contracts) with fast lookups and writes
```python
# Blockchain state storage
def store_account_balance(address, balance, block_number):
    """Store account balance at block height"""
    # Note: zero-pad block_number in real systems so keys sort numerically
    key = f"balance:{address}:{block_number}".encode()
    value = str(balance).encode()
    db.put(key, value)

def get_account_balance(address, block_number=None):
    """Get account balance at specific block"""
    if block_number is None:
        # Get latest: position on the last key at or below the prefix's upper bound
        prefix = f"balance:{address}:".encode()
        it = db.iteritems()
        it.seek_for_prev(prefix + b'\xff' * 10)
        for key, value in it:
            if key.startswith(prefix):
                return int(value.decode())
            break  # positioned key is outside this account's range
        return 0
    key = f"balance:{address}:{block_number}".encode()
    value = db.get(key)
    return int(value.decode()) if value else 0

# Transaction indexing
def index_transaction(tx_hash, tx_data):
    """Index transaction by hash"""
    db.put(f"tx:{tx_hash}".encode(), tx_data)

def get_transaction(tx_hash):
    """Retrieve transaction"""
    return db.get(f"tx:{tx_hash}".encode())
```
Who Uses It: Some graph databases use RocksDB as their storage layer (e.g., ArangoDB)
```python
import json

# Graph storage with RocksDB
def add_edge(from_node, to_node, edge_type, properties):
    """Add edge between nodes"""
    # Store outgoing edge
    out_key = f"out:{from_node}:{edge_type}:{to_node}".encode()
    db.put(out_key, json.dumps(properties).encode())
    # Store incoming edge (for reverse traversal)
    in_key = f"in:{to_node}:{edge_type}:{from_node}".encode()
    db.put(in_key, json.dumps(properties).encode())

def get_outgoing_edges(node_id, edge_type=None):
    """Get all outgoing edges from node"""
    if edge_type:
        prefix = f"out:{node_id}:{edge_type}:".encode()
    else:
        prefix = f"out:{node_id}:".encode()
    it = db.iteritems()
    it.seek(prefix)
    edges = []
    for key, value in it:
        if not key.startswith(prefix):
            break
        # Parse edge key: out:<from>:<type>:<to>
        parts = key.decode().split(':')
        edges.append({
            'to': parts[-1],
            'type': parts[2],
            'properties': json.loads(value.decode())
        })
    return edges
```
Why RocksDB: Fast local storage, no network overhead, ACID transactions
```python
# Application configuration store
def save_config(namespace, key, value):
    """Save configuration"""
    db.put(f"config:{namespace}:{key}".encode(), value.encode())

def load_config(namespace):
    """Load all config for namespace"""
    prefix = f"config:{namespace}:".encode()
    it = db.iteritems()
    it.seek(prefix)
    config = {}
    for key, value in it:
        if not key.startswith(prefix):
            break
        config_key = key.decode().split(':', 2)[2]
        config[config_key] = value.decode()
    return config

# Feature flags
def set_feature_flag(flag_name, enabled):
    """Set feature flag"""
    db.put(f"feature:{flag_name}".encode(), b'1' if enabled else b'0')

def is_feature_enabled(flag_name):
    """Check feature flag"""
    return db.get(f"feature:{flag_name}".encode()) == b'1'
```
| Aspect | Redis | RocksDB |
|---|---|---|
| Deployment | Standalone service (network) | Embedded library (in-process) |
| Data Size | Limited by RAM (expensive) | Limited by disk (cheap, TB+) |
| Latency | Sub-ms (network + memory) | Microseconds (no network) |
| Throughput | 100k+ ops/sec | 50k-200k writes, 100k-500k reads |
| Persistence | Optional (RDB + AOF) | Always persistent |
| Data Structures | Rich (lists, sets, sorted sets, etc.) | Key-value only |
| Sharing | Multiple clients/servers | Single process only |
| Use Cases | Caching, sessions, queues, real-time | Storage engine, time-series, logs |
etcd is a distributed, strongly consistent key-value store that provides a reliable way to store data across a cluster of machines. It uses the Raft consensus algorithm to ensure data consistency and is designed for configuration management, service discovery, and distributed coordination.
Architecture: Raft consensus (leader election), distributed cluster (3-5 nodes typical), strongly consistent (CP in CAP theorem), watch mechanism for real-time updates, lease-based TTLs.
Key Feature: Unlike Redis (in-memory, AP), etcd prioritizes consistency and is the backbone of Kubernetes and other cloud-native systems.
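The 3-5 node sizing above follows directly from Raft's majority rule: a write commits once floor(N/2)+1 nodes have it, so a cluster stays writable through floor((N-1)/2) failures. The arithmetic in a few lines:

```python
def quorum(n: int) -> int:
    """Nodes that must acknowledge a write (Raft majority)."""
    return n // 2 + 1

def fault_tolerance(n: int) -> int:
    """Node failures the cluster survives while staying writable."""
    return (n - 1) // 2

for n in (1, 3, 4, 5):
    print(f"{n} nodes: quorum={quorum(n)}, tolerates {fault_tolerance(n)} failure(s)")
# A 4-node cluster tolerates the same single failure as a 3-node one,
# while adding a node that every commit must still reach a majority of -
# which is why etcd clusters are almost always odd-sized.
```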
The Primary Use Case: etcd is the backing store for all Kubernetes cluster data
```python
import etcd3
import json

# Initialize etcd client
etcd = etcd3.client(host='localhost', port=2379)

# Store Kubernetes-style configuration
def store_pod_config(namespace, pod_name, config):
    """Store pod configuration in etcd (similar to K8s)"""
    key = f"/registry/pods/{namespace}/{pod_name}"
    etcd.put(key, json.dumps(config))

def get_pod_config(namespace, pod_name):
    """Retrieve pod configuration"""
    key = f"/registry/pods/{namespace}/{pod_name}"
    value, metadata = etcd.get(key)
    return json.loads(value) if value else None

# Example: Store pod configuration
pod_config = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {
        "name": "nginx-pod",
        "namespace": "default"
    },
    "spec": {
        "containers": [{
            "name": "nginx",
            "image": "nginx:1.21"
        }]
    }
}
store_pod_config("default", "nginx-pod", pod_config)

# Watch for changes (K8s controllers use this)
def watch_pods(namespace):
    """Watch for pod changes in namespace"""
    watch_key = f"/registry/pods/{namespace}/"
    events_iterator, cancel = etcd.watch_prefix(watch_key)
    for event in events_iterator:
        print(f"Event: {event}")
        if isinstance(event, etcd3.events.PutEvent):
            print(f"Pod created/updated: {event.key}")
        elif isinstance(event, etcd3.events.DeleteEvent):
            print(f"Pod deleted: {event.key}")

# Example usage
# watch_pods("default")  # Runs indefinitely
```
Why etcd: Services register themselves with TTLs. Clients watch for changes and auto-update.
```python
import etcd3
import time
import json
from threading import Thread

# Service registration with heartbeat
class ServiceRegistry:
    def __init__(self, etcd_client, service_name, instance_id, ttl=10):
        self.etcd = etcd_client
        self.service_name = service_name
        self.instance_id = instance_id
        self.ttl = ttl
        self.lease = None

    def register(self, host, port):
        """Register service instance with TTL"""
        # Create lease with TTL
        self.lease = self.etcd.lease(self.ttl)
        # Store service info
        key = f"/services/{self.service_name}/{self.instance_id}"
        value = json.dumps({
            "host": host,
            "port": port,
            "registered_at": time.time()
        })
        self.etcd.put(key, value, lease=self.lease)
        print(f"Registered {self.service_name}:{self.instance_id} at {host}:{port}")
        # Keep alive in background
        Thread(target=self._keep_alive, daemon=True).start()

    def _keep_alive(self):
        """Maintain lease with periodic refresh"""
        while True:
            time.sleep(self.ttl // 2)
            try:
                self.lease.refresh()
                print(f"Heartbeat: {self.service_name}:{self.instance_id}")
            except Exception as e:
                print(f"Failed to refresh lease: {e}")
                break

    def deregister(self):
        """Remove service from registry"""
        key = f"/services/{self.service_name}/{self.instance_id}"
        self.etcd.delete(key)
        if self.lease:
            self.lease.revoke()

# Service discovery client
class ServiceDiscovery:
    def __init__(self, etcd_client):
        self.etcd = etcd_client

    def discover(self, service_name):
        """Find all instances of a service"""
        prefix = f"/services/{service_name}/"
        instances = []
        for value, metadata in self.etcd.get_prefix(prefix):
            if value:
                instance_data = json.loads(value)
                instances.append(instance_data)
        return instances

    def watch_service(self, service_name, callback):
        """Watch for service changes"""
        prefix = f"/services/{service_name}/"
        events_iterator, cancel = self.etcd.watch_prefix(prefix)
        for event in events_iterator:
            if isinstance(event, etcd3.events.PutEvent):
                instance = json.loads(event.value)
                callback("added", instance)
            elif isinstance(event, etcd3.events.DeleteEvent):
                callback("removed", event.key)

# Example usage
etcd_client = etcd3.client()

# Register service instances
registry = ServiceRegistry(etcd_client, "api-service", "instance-1")
registry.register("10.0.1.5", 8080)

# Discover services
discovery = ServiceDiscovery(etcd_client)
instances = discovery.discover("api-service")
print(f"Found instances: {instances}")

# Watch for changes
def on_service_change(event_type, data):
    print(f"Service {event_type}: {data}")

# discovery.watch_service("api-service", on_service_change)
```
Use Case: Leader election, mutual exclusion, coordination
```python
import etcd3
import time
from contextlib import contextmanager

class DistributedLock:
    """
    Distributed lock using etcd

    Use for:
    - Leader election
    - Ensuring only one process runs a task
    - Coordinating work across distributed systems
    """
    def __init__(self, etcd_client, lock_name, ttl=30):
        self.etcd = etcd_client
        self.lock_name = lock_name
        self.ttl = ttl
        self.lease = None

    @contextmanager
    def acquire(self, timeout=10):
        """
        Acquire distributed lock

        Uses an etcd transaction to ensure atomicity:
        - Only succeeds if key doesn't exist (first to acquire wins)
        - Automatically releases on lease expiry
        """
        lock_key = f"/locks/{self.lock_name}"
        # Create lease
        self.lease = self.etcd.lease(self.ttl)
        # Try to acquire lock (atomic operation)
        start_time = time.time()
        while time.time() - start_time < timeout:
            # Transaction: create key only if it doesn't exist.
            # transaction() returns (succeeded, responses)
            succeeded, _ = self.etcd.transaction(
                compare=[
                    self.etcd.transactions.version(lock_key) == 0
                ],
                success=[
                    self.etcd.transactions.put(
                        lock_key,
                        "locked",
                        lease=self.lease
                    )
                ],
                failure=[]
            )
            if succeeded:
                print(f"Acquired lock: {self.lock_name}")
                try:
                    yield
                finally:
                    self._release(lock_key)
                return
            time.sleep(0.1)
        raise TimeoutError(f"Failed to acquire lock: {self.lock_name}")

    def _release(self, lock_key):
        """Release the lock"""
        self.etcd.delete(lock_key)
        if self.lease:
            self.lease.revoke()
        print(f"Released lock: {self.lock_name}")

# Example: Leader election
class LeaderElection:
    """
    Leader election using etcd

    Multiple processes compete, one becomes leader.
    If the leader dies (lease expires), a new election happens.
    """
    def __init__(self, etcd_client, election_name, instance_id):
        self.etcd = etcd_client
        self.election_name = election_name
        self.instance_id = instance_id

    def run_for_leader(self, on_elected, on_lost):
        """
        Campaign for leadership

        on_elected: callback when this instance becomes leader
        on_lost: callback when leadership is lost
        """
        lock_key = f"/elections/{self.election_name}/leader"
        lease = self.etcd.lease(30)
        while True:
            # Try to become leader
            succeeded, _ = self.etcd.transaction(
                compare=[
                    self.etcd.transactions.version(lock_key) == 0
                ],
                success=[
                    self.etcd.transactions.put(
                        lock_key,
                        self.instance_id,
                        lease=lease
                    )
                ],
                failure=[]
            )
            if succeeded:
                print(f"{self.instance_id} became leader!")
                on_elected()
                # Keep refreshing lease while leader
                try:
                    while True:
                        time.sleep(10)
                        lease.refresh()
                except Exception as e:
                    print(f"Lost leadership: {e}")
                    on_lost()
            else:
                # Wait for current leader to expire
                print(f"{self.instance_id} waiting for leader...")
                time.sleep(5)

# Example usage
etcd_client = etcd3.client()

# Distributed lock for critical section
lock = DistributedLock(etcd_client, "process-data-job")
with lock.acquire():
    print("Processing data... (only one instance does this)")
    time.sleep(5)
    print("Done processing")

# Leader election example
def do_leader_work():
    """Work that only leader should do"""
    print("I am the leader, doing leader work...")

def on_lost_leadership():
    """Cleanup when leadership is lost"""
    print("No longer leader, stopping work...")

election = LeaderElection(etcd_client, "main-controller", "instance-1")
# election.run_for_leader(do_leader_work, on_lost_leadership)
```
Use Case: Dynamic configuration updates, feature flags, application settings
```python
import etcd3
import json

class ConfigManager:
    """
    Configuration management with etcd

    Features:
    - Centralized config storage
    - Real-time updates (watch for changes)
    - Versioned configs
    - Environment-specific configs
    """
    def __init__(self, etcd_client, app_name, environment):
        self.etcd = etcd_client
        self.app_name = app_name
        self.environment = environment
        self.config_cache = {}

    def set_config(self, key, value):
        """Set configuration value"""
        full_key = f"/config/{self.app_name}/{self.environment}/{key}"
        self.etcd.put(full_key, json.dumps(value))

    def get_config(self, key, default=None):
        """Get configuration value"""
        full_key = f"/config/{self.app_name}/{self.environment}/{key}"
        # Try cache first
        if key in self.config_cache:
            return self.config_cache[key]
        # Fetch from etcd
        value, metadata = self.etcd.get(full_key)
        if value:
            parsed = json.loads(value)
            self.config_cache[key] = parsed
            return parsed
        return default

    def get_all_config(self):
        """Get all configuration for this app/environment"""
        prefix = f"/config/{self.app_name}/{self.environment}/"
        config = {}
        for value, metadata in self.etcd.get_prefix(prefix):
            if value:
                # Extract key name from full path
                key_name = metadata.key.decode().split('/')[-1]
                config[key_name] = json.loads(value)
        return config

    def watch_config(self, callback):
        """Watch for configuration changes"""
        prefix = f"/config/{self.app_name}/{self.environment}/"
        events_iterator, cancel = self.etcd.watch_prefix(prefix)
        for event in events_iterator:
            if isinstance(event, etcd3.events.PutEvent):
                key_name = event.key.decode().split('/')[-1]
                new_value = json.loads(event.value)
                # Update cache
                self.config_cache[key_name] = new_value
                # Notify application
                callback(key_name, new_value)

# Example usage
etcd_client = etcd3.client()
config_mgr = ConfigManager(etcd_client, "my-api", "production")

# Set configuration
config_mgr.set_config("max_connections", 100)
config_mgr.set_config("timeout_seconds", 30)
config_mgr.set_config("feature_flags", {
    "new_ui": True,
    "beta_features": False
})

# Get configuration
max_conn = config_mgr.get_config("max_connections")
print(f"Max connections: {max_conn}")

# Get all config
all_config = config_mgr.get_all_config()
print(f"All config: {all_config}")

# Watch for changes (hot reload)
def on_config_change(key, new_value):
    print(f"Config changed: {key} = {new_value}")
    # Reload application settings (app-specific hook)
    reload_app_config(key, new_value)

# config_mgr.watch_config(on_config_change)
```
| Aspect | etcd | Redis | Zookeeper |
|---|---|---|---|
| Consensus | Raft (easier to understand) | None (AP system) | ZAB (Zookeeper Atomic Broadcast) |
| Consistency | Linearizable (CP) | Eventual (AP) | Linearizable (CP) |
| Performance | 10k writes/s, 100k+ reads/s | 100k+ ops/s | Similar to etcd |
| API | gRPC + HTTP | RESP protocol | Custom protocol |
| Watch | Built-in, efficient | Pub/Sub, keyspace notifications | Watchers (less efficient) |
| Use Case | Config, coordination, K8s | Caching, sessions, queues | Coordination (legacy) |
| Adoption | Kubernetes, Cloud-native | Everywhere (caching) | Hadoop, legacy systems |
Best For: Complex queries, ACID transactions, relational data, JSON + full-text search
Use Cases: E-commerce orders, financial transactions, user accounts, any structured data with relationships
Performance: 10k-20k QPS, vertical scaling + read replicas
When to Use: Need ACID, complex JOINs, data integrity
Best For: Flexible schema, rapid iteration, nested documents
Use Cases: Content management, product catalogs, user profiles, IoT data
Performance: 20k-50k QPS, horizontal sharding
When to Use: Schema changes frequently, denormalized data, JSON-heavy
Best For: Time-series, IoT, write-heavy workloads, multi-DC
Use Cases: Sensor data, event logging, messaging history, activity streams
Performance: 100k+ writes/sec, linear scaling
When to Use: Massive write volume, eventual consistency OK, multi-region
Best For: Full-text search, log analytics, aggregations
Use Cases: Product search, log analysis, metrics dashboards, APM
Performance: 10k searches/sec, complex aggregations
When to Use: Need fuzzy search, text analysis, real-time analytics