Idempotency Patterns

What is Idempotency?

Definition: An operation is idempotent if performing it multiple times has the same effect as performing it once.

Why it matters:

Real-world impact without idempotency:

Why Was This Needed?

The Problem (1990s-2000s): As systems moved from monolithic architectures to distributed systems, a fundamental problem emerged: networks are unreliable. When a client sends a request and the network times out, the client can't know if:

The Turning Point (2000s):

The Solution Evolution:

Famous Incidents That Drove Adoption:

Key Insight: The shift from "exactly-once delivery" (impossible in distributed systems due to Two Generals Problem) to "at-least-once delivery + idempotent processing" was a fundamental paradigm shift. Modern distributed systems assume messages/requests will be duplicated and design for it, rather than trying to prevent it (which is mathematically impossible in the presence of network failures).

Why Multiple Patterns? Different problems need different solutions:

HTTP Method Idempotency

HTTP Method Idempotent? Why? Example
GET ✓ Yes Read-only, doesn't modify state GET /users/123 - Always returns same user
PUT ✓ Yes Replaces entire resource with same values PUT /users/123 {name: "Alice"} - Sets name to Alice (same result if repeated)
DELETE ✓ Yes Deleting same resource multiple times → still deleted DELETE /users/123 - User deleted (subsequent calls get 404, but state unchanged)
PATCH ⚠ Depends Idempotent if setting absolute values, not if incrementing PATCH {age: 30} ✓ idempotent | PATCH {age: +1} ✗ not idempotent
POST ✗ No Creates new resources each time POST /orders - Creates new order every time (unless you implement idempotency)

Implementation Patterns

1. Idempotency Keys (Client-Generated Tokens)

How it works: The client generates a unique key (UUID) for each logical operation and sends it with the request. The server stores processed keys and rejects/returns cached results for duplicate keys. The server checks if this key has been seen before: if yes, return the cached result; if no, process the request, store the result with the key, and return the result.

When to use: Payment APIs (Stripe), order creation, any POST operation that should only happen once

Python Implementation:

import uuid
from datetime import datetime, timedelta
from typing import Dict, Any, Optional

class IdempotencyStore:
    """In-memory store for idempotency keys (use Redis in production)"""
    def __init__(self, ttl_hours: int = 24):
        self.store: Dict[str, Dict[str, Any]] = {}
        self.ttl_hours = ttl_hours

    def get(self, key: str) -> Optional[Dict[str, Any]]:
        """Retrieve cached response for an idempotency key"""
        if key in self.store:
            record = self.store[key]
            # Check if expired
            if datetime.now() < record['expires_at']:
                return record['response']
            else:
                del self.store[key]
        return None

    def set(self, key: str, response: Dict[str, Any]) -> None:
        """Store response with expiration"""
        self.store[key] = {
            'response': response,
            'expires_at': datetime.now() + timedelta(hours=self.ttl_hours),
            'created_at': datetime.now()
        }

# Usage in API endpoint
idempotency_store = IdempotencyStore()

def create_payment(amount: float, idempotency_key: str) -> Dict[str, Any]:
    """
    Process a payment with idempotency protection

    How it works:
    1. Check if idempotency_key exists in store
    2. If exists, return cached response (request is duplicate)
    3. If not exists, process payment and cache result
    """
    # Check for existing result
    cached_response = idempotency_store.get(idempotency_key)
    if cached_response:
        print(f"Duplicate request detected for key {idempotency_key}")
        return cached_response

    # Process payment (simulate)
    payment_id = str(uuid.uuid4())
    response = {
        'payment_id': payment_id,
        'amount': amount,
        'status': 'completed',
        'timestamp': datetime.now().isoformat()
    }

    # Store result
    idempotency_store.set(idempotency_key, response)

    print(f"Payment processed: {payment_id}")
    return response

# Example usage
if __name__ == "__main__":
    # Client generates idempotency key
    client_key = str(uuid.uuid4())

    # First request - processes payment
    result1 = create_payment(100.00, client_key)
    print(f"First request: {result1}")

    # Retry (network timeout, user clicked twice, etc.) - returns cached result
    result2 = create_payment(100.00, client_key)
    print(f"Second request (duplicate): {result2}")

    # Verify same payment_id
    assert result1['payment_id'] == result2['payment_id']
    print("✓ Idempotency verified - same payment ID returned")
⚠ Important Considerations:
  • Key generation: Client must generate cryptographically random UUIDs
  • TTL: Keys should expire (24 hours typical) to prevent infinite storage growth
  • Storage: Use Redis or database, not in-memory (must survive server restarts)
  • Race conditions: Use database locks or SETNX in Redis to handle concurrent requests
  • Key format: Consider including client_id in key for multi-tenant systems

2. Natural Idempotency (Unique Constraints)

How it works: Use the business data itself as the uniqueness constraint. Create a unique database constraint on natural business identifiers (e.g., user_id + order_date + product_id). If the same request comes twice, the database will reject the duplicate with a constraint violation. Catch the exception and return success (since the desired state already exists).

When to use: When operations have natural unique identifiers (e.g., "user X can only have one active subscription," "order number must be unique")

Python Implementation with SQLAlchemy:

from sqlalchemy import create_engine, Column, String, Integer, Float, UniqueConstraint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError
import uuid

Base = declarative_base()

class Order(Base):
    """
    Order with natural idempotency via unique constraint

    How it works: The database enforces that each customer can only have
    one order with the same external_order_id. Duplicate requests will
    fail at the database level, preventing duplicate orders.
    """
    __tablename__ = 'orders'

    id = Column(Integer, primary_key=True)
    customer_id = Column(String, nullable=False)
    external_order_id = Column(String, nullable=False)  # From external system
    product_id = Column(String, nullable=False)
    amount = Column(Float, nullable=False)

    # Natural idempotency constraint - each customer + external_order_id is unique
    __table_args__ = (
        UniqueConstraint('customer_id', 'external_order_id', name='uq_customer_order'),
    )

# Database setup
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

def create_order(customer_id: str, external_order_id: str,
                product_id: str, amount: float) -> dict:
    """
    Create an order with natural idempotency

    How it works:
    1. Try to insert order with unique constraint
    2. If IntegrityError (duplicate), query and return existing order
    3. This is idempotent because multiple calls with same data → same result
    """
    session = Session()

    try:
        order = Order(
            customer_id=customer_id,
            external_order_id=external_order_id,
            product_id=product_id,
            amount=amount
        )
        session.add(order)
        session.commit()

        result = {
            'order_id': order.id,
            'customer_id': order.customer_id,
            'external_order_id': order.external_order_id,
            'status': 'created'
        }
        print(f"Order created: {result}")
        return result

    except IntegrityError:
        session.rollback()
        # Duplicate detected - fetch existing order
        existing_order = session.query(Order).filter_by(
            customer_id=customer_id,
            external_order_id=external_order_id
        ).first()

        result = {
            'order_id': existing_order.id,
            'customer_id': existing_order.customer_id,
            'external_order_id': existing_order.external_order_id,
            'status': 'already_exists'
        }
        print(f"Duplicate order detected, returning existing: {result}")
        return result

    finally:
        session.close()

# Example usage
if __name__ == "__main__":
    customer = "customer_123"
    ext_order = "ext_order_789"

    # First request - creates order
    result1 = create_order(customer, ext_order, "product_A", 99.99)

    # Duplicate request - returns existing order
    result2 = create_order(customer, ext_order, "product_A", 99.99)

    # Verify same order_id
    assert result1['order_id'] == result2['order_id']
    print("✓ Natural idempotency verified - same order returned")
⚠ Limitations:
  • Requires natural key: Not all operations have suitable unique business identifiers
  • Partial updates: Doesn't work well if different requests might update different fields
  • Compound keys: Can get complex with multi-column unique constraints
  • Error handling: Must properly catch and handle constraint violations

3. State Machine Pattern

How it works: Track the state of an entity and only allow valid state transitions. If a request would move from state A→B but the entity is already in state B, the operation is a no-op (already done). Use database row-level locking to prevent race conditions. Only process requests that result in valid state transitions.

When to use: Order processing, workflow systems, payment processing, any system with clear states and transitions

Python Implementation:

from enum import Enum
from typing import Optional
from dataclasses import dataclass
from datetime import datetime

class OrderState(Enum):
    """Order lifecycle states"""
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    CANCELLED = "cancelled"
    REFUNDED = "refunded"

# Valid state transitions
VALID_TRANSITIONS = {
    OrderState.PENDING: [OrderState.PROCESSING, OrderState.CANCELLED],
    OrderState.PROCESSING: [OrderState.COMPLETED, OrderState.CANCELLED],
    OrderState.COMPLETED: [OrderState.REFUNDED],
    OrderState.CANCELLED: [],  # Terminal state
    OrderState.REFUNDED: []    # Terminal state
}

@dataclass
class Order:
    order_id: str
    state: OrderState
    updated_at: datetime

class OrderStateMachine:
    """
    Idempotent order state management using state machine pattern

    How it works: State transitions are idempotent because:
    - If already in target state, do nothing (return success)
    - If transition invalid, reject with error
    - Only valid transitions are processed exactly once
    """

    def __init__(self):
        self.orders = {}  # order_id -> Order (use database in production)

    def create_order(self, order_id: str) -> Order:
        """Create new order in PENDING state"""
        if order_id in self.orders:
            # Idempotent - return existing order
            print(f"Order {order_id} already exists")
            return self.orders[order_id]

        order = Order(
            order_id=order_id,
            state=OrderState.PENDING,
            updated_at=datetime.now()
        )
        self.orders[order_id] = order
        print(f"Order {order_id} created in state {order.state.value}")
        return order

    def transition(self, order_id: str, target_state: OrderState) -> dict:
        """
        Attempt state transition with idempotency

        How it works:
        1. Check if order exists
        2. If already in target state → idempotent success (already done)
        3. If transition is valid → process and update state
        4. If transition is invalid → reject with error
        """
        if order_id not in self.orders:
            return {'success': False, 'error': 'Order not found'}

        order = self.orders[order_id]
        current_state = order.state

        # IDEMPOTENCY: Already in target state
        if current_state == target_state:
            print(f"Order {order_id} already in state {target_state.value} - idempotent success")
            return {
                'success': True,
                'order_id': order_id,
                'state': current_state.value,
                'message': 'Already in target state (idempotent)'
            }

        # Validate transition
        if target_state not in VALID_TRANSITIONS.get(current_state, []):
            return {
                'success': False,
                'error': f'Invalid transition from {current_state.value} to {target_state.value}'
            }

        # Process transition
        order.state = target_state
        order.updated_at = datetime.now()
        print(f"Order {order_id} transitioned: {current_state.value} → {target_state.value}")

        return {
            'success': True,
            'order_id': order_id,
            'previous_state': current_state.value,
            'current_state': target_state.value
        }

# Example usage
if __name__ == "__main__":
    sm = OrderStateMachine()

    # Create order
    order = sm.create_order("order_123")

    # Start processing
    result1 = sm.transition("order_123", OrderState.PROCESSING)
    print(f"First transition: {result1}")

    # Retry (duplicate request) - idempotent
    result2 = sm.transition("order_123", OrderState.PROCESSING)
    print(f"Duplicate transition: {result2}")
    assert result2['message'] == 'Already in target state (idempotent)'

    # Complete order
    result3 = sm.transition("order_123", OrderState.COMPLETED)
    print(f"Complete order: {result3}")

    # Try to process completed order (invalid transition)
    result4 = sm.transition("order_123", OrderState.PROCESSING)
    print(f"Invalid transition: {result4}")

    print("\n✓ State machine idempotency verified")
⚠ Production Considerations:
  • Locking: Use SELECT FOR UPDATE or optimistic locking to prevent race conditions
  • Audit trail: Log all state transitions for debugging and compliance
  • Event sourcing: Consider storing all transition attempts, not just current state
  • Rollback: Define compensation logic for failed operations

4. Database Transactions with Conditional Updates

How it works: Use database transactions with WHERE clauses that check current state. The UPDATE only succeeds if the condition matches. If 0 rows are updated, the condition wasn't met (operation already done or invalid). Use optimistic locking with version numbers to detect concurrent modifications.

When to use: Balance updates, inventory management, any operation that modifies existing records based on current state

Python Implementation:

from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class Account(Base):
    """
    Bank account with version-based optimistic locking

    How it works: The version field increments with each update.
    Updates include the expected version in the WHERE clause.
    If version doesn't match, update fails (concurrent modification detected).
    """
    __tablename__ = 'accounts'

    id = Column(Integer, primary_key=True)
    account_number = Column(String, unique=True, nullable=False)
    balance = Column(Float, nullable=False, default=0.0)
    version = Column(Integer, nullable=False, default=0)

# Database setup
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

def deposit_money(account_number: str, amount: float, transaction_id: str) -> dict:
    """
    Idempotent deposit using optimistic locking

    How it works:
    1. Read current balance and version
    2. Check if transaction already applied (stored in processed_transactions)
    3. If not, update balance WHERE version matches expected
    4. If update affects 0 rows, retry (concurrent modification)
    """
    session = Session()

    try:
        # Get current account state
        account = session.query(Account).filter_by(
            account_number=account_number
        ).first()

        if not account:
            return {'success': False, 'error': 'Account not found'}

        current_version = account.version
        current_balance = account.balance
        new_balance = current_balance + amount

        # Conditional update: only succeeds if version hasn't changed
        # This prevents race conditions and makes operation idempotent
        rows_updated = session.query(Account).filter(
            Account.account_number == account_number,
            Account.version == current_version  # Optimistic lock
        ).update({
            'balance': new_balance,
            'version': current_version + 1
        }, synchronize_session=False)

        if rows_updated == 0:
            # Version changed - concurrent modification
            session.rollback()
            return {
                'success': False,
                'error': 'Concurrent modification detected, please retry'
            }

        session.commit()

        return {
            'success': True,
            'account_number': account_number,
            'previous_balance': current_balance,
            'new_balance': new_balance,
            'version': current_version + 1
        }

    except Exception as e:
        session.rollback()
        return {'success': False, 'error': str(e)}

    finally:
        session.close()

def withdraw_money_safe(account_number: str, amount: float) -> dict:
    """
    Idempotent withdrawal with balance check

    How it works: UPDATE only executes if balance >= amount AND version matches.
    If conditions not met, 0 rows updated → operation rejected.
    This is idempotent at the SQL level.
    """
    session = Session()

    try:
        account = session.query(Account).filter_by(
            account_number=account_number
        ).first()

        if not account:
            return {'success': False, 'error': 'Account not found'}

        current_version = account.version
        current_balance = account.balance

        if current_balance < amount:
            return {'success': False, 'error': 'Insufficient funds'}

        # Conditional update with multiple conditions
        rows_updated = session.query(Account).filter(
            Account.account_number == account_number,
            Account.balance >= amount,  # Safety check
            Account.version == current_version  # Optimistic lock
        ).update({
            'balance': Account.balance - amount,
            'version': current_version + 1
        }, synchronize_session=False)

        if rows_updated == 0:
            session.rollback()
            return {'success': False, 'error': 'Concurrent modification or insufficient funds'}

        session.commit()

        new_balance = current_balance - amount
        return {
            'success': True,
            'account_number': account_number,
            'previous_balance': current_balance,
            'new_balance': new_balance,
            'amount_withdrawn': amount
        }

    except Exception as e:
        session.rollback()
        return {'success': False, 'error': str(e)}

    finally:
        session.close()

# Example usage
if __name__ == "__main__":
    session = Session()

    # Create test account
    account = Account(account_number="ACC123", balance=1000.0, version=0)
    session.add(account)
    session.commit()
    session.close()

    # Deposit money
    result1 = deposit_money("ACC123", 500.0, "txn_001")
    print(f"Deposit: {result1}")

    # Withdraw money
    result2 = withdraw_money_safe("ACC123", 200.0)
    print(f"Withdrawal: {result2}")

    # Try to overdraw
    result3 = withdraw_money_safe("ACC123", 5000.0)
    print(f"Overdraw attempt: {result3}")

    print("\n✓ Conditional update idempotency verified")

5. Request Deduplication with Message Queues

How it works: Store a hash of the message content (or message ID from queue) in a processed messages table. Before processing, check if hash/ID exists. If exists, skip processing (already done). If not exists, process and store hash/ID atomically. Use database transactions to ensure atomicity of check-and-process.

When to use: Kafka consumers, SQS message processing, webhook handlers, any at-least-once delivery system

Python Implementation:

import hashlib
import json
from datetime import datetime
from typing import Dict, Any, Optional

class ProcessedMessageStore:
    """
    Store for tracking processed messages (use database in production)

    How it works: Before processing a message, check if its ID or content hash
    has been processed. This prevents duplicate processing in at-least-once
    delivery systems.
    """
    def __init__(self):
        self.processed = {}  # message_id -> processing_result

    def is_processed(self, message_id: str) -> bool:
        """Check if message has been processed"""
        return message_id in self.processed

    def mark_processed(self, message_id: str, result: Any) -> None:
        """Mark message as processed with result"""
        self.processed[message_id] = {
            'result': result,
            'processed_at': datetime.now().isoformat()
        }

    def get_result(self, message_id: str) -> Optional[Any]:
        """Get cached result for processed message"""
        if message_id in self.processed:
            return self.processed[message_id]['result']
        return None

def compute_message_hash(message: Dict[str, Any]) -> str:
    """
    Compute deterministic hash of message content

    How it works: Convert message to JSON (sorted keys for determinism),
    then compute SHA-256 hash. Same content always produces same hash,
    enabling deduplication even if message_id not available.
    """
    # Sort keys for deterministic JSON
    message_json = json.dumps(message, sort_keys=True)
    return hashlib.sha256(message_json.encode()).hexdigest()

def process_message_idempotent(message: Dict[str, Any],
                               message_id: Optional[str] = None) -> dict:
    """
    Process message with idempotency using either message_id or content hash

    How it works:
    1. Use message_id if available (from SQS, Kafka offset, etc.)
    2. Otherwise compute content hash for deduplication
    3. Check if already processed
    4. If yes, return cached result (idempotent)
    5. If no, process and cache result
    """
    store = ProcessedMessageStore()

    # Use message_id or content hash as dedup key
    dedup_key = message_id if message_id else compute_message_hash(message)

    # Check if already processed
    if store.is_processed(dedup_key):
        cached_result = store.get_result(dedup_key)
        print(f"Message {dedup_key[:16]}... already processed (cached result)")
        return {
            'status': 'duplicate',
            'result': cached_result,
            'message': 'Already processed'
        }

    # Process message (simulate)
    print(f"Processing new message {dedup_key[:16]}...")
    result = {
        'message_id': dedup_key,
        'action': message.get('action'),
        'status': 'completed',
        'timestamp': datetime.now().isoformat()
    }

    # Perform actual business logic here
    if message.get('action') == 'send_email':
        result['email_sent'] = True
        result['recipient'] = message.get('recipient')

    # Mark as processed
    store.mark_processed(dedup_key, result)

    return {
        'status': 'processed',
        'result': result
    }

# Example: Kafka/SQS message processing
if __name__ == "__main__":
    # Simulate receiving same message twice (at-least-once delivery)
    message = {
        'action': 'send_email',
        'recipient': 'user@example.com',
        'template': 'welcome_email'
    }

    # First delivery - processes message
    result1 = process_message_idempotent(message, message_id="msg_123")
    print(f"First delivery: {result1}\n")

    # Duplicate delivery (network retry, rebalance, etc.)
    result2 = process_message_idempotent(message, message_id="msg_123")
    print(f"Second delivery (duplicate): {result2}\n")

    # Content-based deduplication (no message_id)
    result3 = process_message_idempotent(message)
    print(f"Content hash dedup: {result3}\n")

    print("✓ Message deduplication idempotency verified")
⚠ Production Considerations:
  • Atomicity: Use database transactions to check and mark processed atomically
  • TTL: Processed messages should expire to prevent unbounded growth
  • Hash collisions: Use SHA-256 or better; collisions extremely unlikely
  • Message ordering: Deduplication doesn't guarantee order; use sequence numbers if needed
  • Side effects: Ensure business logic is truly idempotent (e.g., "send email" cached as "sent")

6. Distributed Lock Pattern

How it works: Acquire a distributed lock (using Redis, ZooKeeper, or database) with the idempotency key as the lock identifier. Only the first request acquires the lock and processes. Subsequent requests fail to acquire lock or wait for first to complete. Release lock after processing and storing result. Failed processes should release locks (use TTL as safety).

When to use: Critical sections in distributed systems, preventing concurrent execution of same operation

Python Implementation with Redis:

import redis
import time
import uuid
from typing import Optional, Callable, Any

class RedisDistributedLock:
    """
    Distributed lock using Redis for idempotency

    How it works: Use Redis SETNX (set if not exists) to acquire lock.
    Only one process can acquire lock for a given key. Others must wait
    or fail. Lock has TTL to prevent deadlocks if process crashes.
    """

    def __init__(self, redis_client: redis.Redis, lock_timeout: int = 30):
        self.redis = redis_client
        self.lock_timeout = lock_timeout

    def acquire_lock(self, lock_key: str, lock_value: str) -> bool:
        """
        Acquire distributed lock

        How it works:
        - SETNX sets key only if it doesn't exist (atomic operation)
        - Returns True if lock acquired, False if already held
        - Lock expires after lock_timeout seconds (prevents deadlock)
        """
        # SET key value NX EX timeout
        # NX = only set if not exists
        # EX = expire after N seconds
        result = self.redis.set(
            lock_key,
            lock_value,
            nx=True,  # Only set if not exists
            ex=self.lock_timeout  # Expire after timeout
        )
        return result is not None

    def release_lock(self, lock_key: str, lock_value: str) -> bool:
        """
        Release distributed lock (only if we own it)

        How it works: Use Lua script to atomically check value and delete.
        This prevents releasing someone else's lock.
        """
        # Lua script for atomic check-and-delete
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        result = self.redis.eval(lua_script, 1, lock_key, lock_value)
        return result == 1

def process_with_distributed_lock(operation_id: str,
                                  operation: Callable[[], Any],
                                  redis_client: redis.Redis) -> dict:
    """
    Execute operation with distributed lock for idempotency

    How it works:
    1. Try to acquire lock with operation_id as key
    2. If acquired, execute operation and release lock
    3. If not acquired, operation already in progress or completed
    4. This ensures only one process executes the operation
    """
    lock_manager = RedisDistributedLock(redis_client)
    lock_key = f"lock:operation:{operation_id}"
    lock_value = str(uuid.uuid4())  # Unique value to identify our lock

    # Try to acquire lock
    if lock_manager.acquire_lock(lock_key, lock_value):
        try:
            print(f"Lock acquired for {operation_id}, executing operation...")

            # Check if result already exists (operation completed in past)
            result_key = f"result:operation:{operation_id}"
            cached_result = redis_client.get(result_key)

            if cached_result:
                print(f"Operation {operation_id} already completed (cached result)")
                return {
                    'status': 'completed',
                    'result': cached_result.decode(),
                    'source': 'cache'
                }

            # Execute operation
            result = operation()

            # Cache result (with expiration)
            redis_client.setex(result_key, 3600, str(result))  # 1 hour TTL

            print(f"Operation {operation_id} completed successfully")
            return {
                'status': 'completed',
                'result': result,
                'source': 'executed'
            }

        finally:
            # Always release lock
            lock_manager.release_lock(lock_key, lock_value)
    else:
        # Lock not acquired - operation already in progress or recently completed
        print(f"Lock not acquired for {operation_id}, checking for cached result...")

        # Wait briefly and check for result (operation might be completing)
        for _ in range(5):
            time.sleep(0.5)
            result_key = f"result:operation:{operation_id}"
            cached_result = redis_client.get(result_key)
            if cached_result:
                return {
                    'status': 'completed',
                    'result': cached_result.decode(),
                    'source': 'cache_after_wait'
                }

        return {
            'status': 'in_progress',
            'message': 'Operation currently being processed by another instance'
        }

# Example usage
if __name__ == "__main__":
    # Setup Redis (in production, use actual Redis server)
    # redis_client = redis.Redis(host='localhost', port=6379, db=0)

    # Mock Redis for demo
    class MockRedis:
        def __init__(self):
            self.data = {}

        def set(self, key, value, nx=False, ex=None):
            if nx and key in self.data:
                return None
            self.data[key] = value
            return True

        def get(self, key):
            return self.data.get(key)

        def setex(self, key, time, value):
            self.data[key] = value

        def eval(self, script, num_keys, key, value):
            if self.data.get(key) == value:
                del self.data[key]
                return 1
            return 0

    redis_client = MockRedis()

    # Define operation
    def expensive_operation():
        """Simulate expensive operation (payment processing, etc.)"""
        time.sleep(1)
        return f"payment_processed_{uuid.uuid4()}"

    # First request - acquires lock and executes
    operation_id = "payment_12345"
    result1 = process_with_distributed_lock(operation_id, expensive_operation, redis_client)
    print(f"First request: {result1}\n")

    # Duplicate request - uses cached result
    result2 = process_with_distributed_lock(operation_id, expensive_operation, redis_client)
    print(f"Duplicate request: {result2}\n")

    print("✓ Distributed lock idempotency verified")

Pattern Comparison

Pattern Complexity Storage Needed Best For Limitations
Idempotency Keys Medium High (stores all results) APIs, payment processing Requires client cooperation, storage grows
Natural Idempotency Low Low (just unique constraints) Operations with natural keys Not always available, inflexible
State Machine Medium-High Low (current state only) Workflows, order processing Requires clear state model, complex logic
Conditional Updates Medium Low (version field) Balance updates, inventory Requires careful WHERE clauses, retries
Message Deduplication Low-Medium Medium (processed IDs/hashes) Queue consumers, webhooks Doesn't prevent execution, only dedup
Distributed Locks High Low (lock state) Critical sections, singleton tasks Requires Redis/ZooKeeper, failure handling

Choosing the Right Pattern

Use Idempotency Keys when:

  • ✓ Building public APIs (Stripe-style)
  • ✓ Need to cache full response
  • ✓ Client can generate unique IDs
  • ✓ Want simple implementation
  • ✗ Storage can grow large
  • ✗ Requires client cooperation

Use Natural Idempotency when:

  • ✓ Have unique business identifiers
  • ✓ Want database-enforced guarantees
  • ✓ Simplest possible implementation
  • ✗ Not all operations have natural keys
  • ✗ Can't handle partial updates well

Use State Machine when:

  • ✓ Clear state transitions
  • ✓ Need audit trail of changes
  • ✓ Complex business workflows
  • ✗ Requires upfront state modeling
  • ✗ Can become complex with many states

Use Conditional Updates when:

  • ✓ Updating numeric values (balance, inventory)
  • ✓ Want optimistic locking
  • ✓ High concurrency scenarios
  • ✗ Requires retry logic for conflicts
  • ✗ Can have high contention

Use Message Deduplication when:

  • ✓ Processing Kafka/SQS messages
  • ✓ At-least-once delivery systems
  • ✓ Webhook handling
  • ✗ Doesn't prevent concurrent execution
  • ✗ Storage grows with processed messages

Use Distributed Locks when:

  • ✓ Critical sections must execute once
  • ✓ Singleton jobs in cluster
  • ✓ Need strong guarantees
  • ✗ Complex failure scenarios
  • ✗ Requires external service (Redis)
  • ✗ Potential for deadlocks

Real-World Examples

Stripe Payments: Idempotency Keys

# Client code
import stripe
import uuid

stripe.api_key = 'sk_test_...'

# Client generates idempotency key
idempotency_key = str(uuid.uuid4())

# Create payment charge
try:
    charge = stripe.Charge.create(
        amount=2000,
        currency='usd',
        source='tok_visa',
        description='Product purchase',
        idempotency_key=idempotency_key  # Same key = same charge
    )
except stripe.error.StripeError as e:
    # Network timeout? Retry with SAME key
    charge = stripe.Charge.create(
        amount=2000,
        currency='usd',
        source='tok_visa',
        description='Product purchase',
        idempotency_key=idempotency_key  # Same result guaranteed
    )

E-commerce Order: Natural Idempotency + State Machine

"""
Combine patterns for robust idempotency

How it works:
1. Natural idempotency via unique constraint (user_id + cart_id)
2. State machine prevents invalid transitions
3. Conditional updates with version for concurrency
"""

class Order:
    def __init__(self, user_id, cart_id):
        self.user_id = user_id
        self.cart_id = cart_id  # Unique per user
        self.state = 'pending'
        self.version = 0

    def checkout(self):
        """
        Idempotent checkout:
        - Unique constraint on user_id + cart_id
        - State machine: pending → processing only
        - Version check prevents concurrent checkouts
        """
        if self.state != 'pending':
            return {'status': 'already_processed', 'order_id': self.id}

        # Process payment, update inventory, etc.
        self.state = 'processing'
        self.version += 1

        return {'status': 'success', 'order_id': self.id}

⚠ Common Pitfalls

  1. Non-idempotent side effects: Sending emails, webhooks, external API calls must be handled carefully
  2. Partial failures: Database updated but external API call failed - need compensation logic
  3. Race conditions: Two requests with same key arrive simultaneously - need atomic operations
  4. TTL too short: Idempotency key expires before retry window - can cause duplicates
  5. Forgetting to include amounts: Same key with different amounts should be rejected
  6. Not persisting state: In-memory stores lose data on restart - use database/Redis

Best Practices

  1. Always validate key uniqueness: UUID v4 is recommended; don't use sequential IDs
  2. Include request parameters in validation: Same key with different amounts = error
  3. Set appropriate TTLs: 24 hours typical; consider retry windows
  4. Use database transactions: Check and process atomically
  5. Return same response: Cached responses must match original exactly
  6. Handle concurrent requests: Use SELECT FOR UPDATE or Redis SETNX
  7. Log idempotency hits: Track duplicate requests for monitoring
  8. Document behavior: Clear API docs on idempotency guarantees
  9. Test edge cases: Concurrent requests, expired keys, partial failures
  10. Monitor performance: Idempotency checks add latency - optimize lookups

📺 Video Resources

Recommended YouTube Channels & Videos

Idempotency & Distributed Systems

Related System Design Concepts