Database Internals - Deep Dive

B-Trees, MVCC, WAL, Replication - How databases really work

Purpose: Understand how production databases like PostgreSQL, MySQL, and MongoDB work internally. Essential knowledge for Distinguished Engineers designing data-intensive systems.

1. B-Tree and B+ Tree - Database Indexes

The fundamental data structure behind virtually all database indexes.

Why B-Trees?

Problem: A binary search tree gives O(log n) node visits, but databases live on disk, and each node visit can cost a disk seek.
Disk access: ~10ms per seek vs ~100ns for RAM = 100,000x slower!
Solution: B-Trees store many keys per node, so each disk read eliminates a large fraction of the search space and the tree stays only a few levels deep.
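The effect of a large fan-out on tree depth is easy to quantify. A quick sketch (the numbers are illustrative; `btree_height` is a helper defined here, not a library function):

```python
import math

def btree_height(num_keys: int, fanout: int) -> int:
    """Worst-case number of levels needed to index num_keys entries
    when every node has roughly `fanout` children."""
    return math.ceil(math.log(num_keys, fanout))

# A binary tree over 1 billion keys needs ~30 levels (30 disk seeks);
# a B-Tree with fan-out 256 needs only 4.
print(btree_height(1_000_000_000, 2))    # 30
print(btree_height(1_000_000_000, 256))  # 4
```

Four seeks at ~10ms each is ~40ms per lookup, versus ~300ms for the binary tree, and in practice the top levels are cached in RAM.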

B-Tree Properties

graph TD
    Root["[50, 100]"] --> N1["[20, 30, 40]"]
    Root --> N2["[60, 70, 80]"]
    Root --> N3["[110, 120]"]
    N1 --> L1["10-19"]
    N1 --> L2["20-29"]
    N1 --> L3["30-39"]
    N1 --> L4["40-49"]
    N2 --> L5["50-59"]
    N2 --> L6["60-69"]
    N2 --> L7["70-79"]
    N2 --> L8["80-99"]
    N3 --> L9["100-109"]
    N3 --> L10["110-119"]
    N3 --> L11["120+"]
    style Root fill:#5e81ac,color:#fff
    style N1 fill:#a3be8c,color:#2e3440
    style N2 fill:#a3be8c,color:#2e3440
    style N3 fill:#a3be8c,color:#2e3440

B+ Tree (Most Common)

Difference from B-Tree:
- All data stored in leaf nodes (internal nodes only have keys)
- Leaf nodes linked together (enables range scans)
- Used by: PostgreSQL, MySQL InnoDB, SQLite
from typing import Optional, List, Any
from dataclasses import dataclass

@dataclass
class BTreeNode:
    """B+ Tree node for database index"""
    keys: List[int]
    children: List['BTreeNode']
    values: List[Any]  # Only populated in leaf nodes
    is_leaf: bool
    next_leaf: Optional['BTreeNode'] = None  # Link to next leaf (for range scans)

class BPlusTree:
    """
    B+ Tree implementation for database indexing.

    Properties:
    - All data in leaf nodes
    - Internal nodes only store keys for navigation
    - Leaf nodes linked (enables efficient range queries)

    Time Complexity:
    - Search: O(log n)
    - Insert: O(log n)
    - Range query: O(log n + k) where k = results

    Space: O(n)

    Real-world: PostgreSQL uses B+ Trees for all indexes
    """

    def __init__(self, order: int = 4):
        """
        order: Maximum number of children per node

        For disk: order = page_size / (key_size + pointer_size)
        Example: 4KB page, 8-byte key, 8-byte pointer = 4096/16 = 256 children
        """
        self.order = order
        self.root = BTreeNode(keys=[], children=[], values=[], is_leaf=True)

    def search(self, key: int) -> Optional[Any]:
        """Search for key in B+ Tree"""
        return self._search_recursive(self.root, key)

    def _search_recursive(self, node: BTreeNode, key: int) -> Optional[Any]:
        """Recursive search"""
        if node.is_leaf:
            # Leaf node: linear search in keys
            for i, k in enumerate(node.keys):
                if k == key:
                    return node.values[i]
            return None

        # Internal node: find correct child
        i = 0
        while i < len(node.keys) and key >= node.keys[i]:
            i += 1

        return self._search_recursive(node.children[i], key)

    def range_query(self, start_key: int, end_key: int) -> List[tuple]:
        """
        Range query: Find all keys in [start_key, end_key].

        This is why databases love B+ Trees!
        1. Navigate to start_key (O(log n))
        2. Follow leaf links until end_key (O(k))
        """
        results = []

        # Find starting leaf
        leaf = self._find_leaf(self.root, start_key)

        # Scan through linked leaves
        while leaf:
            for i, key in enumerate(leaf.keys):
                if start_key <= key <= end_key:
                    results.append((key, leaf.values[i]))
                elif key > end_key:
                    return results  # Done

            leaf = leaf.next_leaf  # Move to next leaf

        return results

    def _find_leaf(self, node: BTreeNode, key: int) -> BTreeNode:
        """Navigate to leaf that would contain key"""
        if node.is_leaf:
            return node

        # Find correct child
        i = 0
        while i < len(node.keys) and key >= node.keys[i]:
            i += 1

        return self._find_leaf(node.children[i], key)

    def insert(self, key: int, value: Any):
        """Insert key-value pair"""
        root = self.root

        # If root is full, split it
        if len(root.keys) >= self.order - 1:
            new_root = BTreeNode(keys=[], children=[root], values=[], is_leaf=False)
            self._split_child(new_root, 0)
            self.root = new_root

        self._insert_non_full(self.root, key, value)

    def _insert_non_full(self, node: BTreeNode, key: int, value: Any):
        """Insert into node that's not full"""
        if node.is_leaf:
            # Insert into sorted position
            i = 0
            while i < len(node.keys) and key > node.keys[i]:
                i += 1

            node.keys.insert(i, key)
            node.values.insert(i, value)
        else:
            # Find child to descend into (>= matches search: equal keys go right)
            i = 0
            while i < len(node.keys) and key >= node.keys[i]:
                i += 1

            # Split child if full
            if len(node.children[i].keys) >= self.order - 1:
                self._split_child(node, i)
                if key >= node.keys[i]:
                    i += 1

            self._insert_non_full(node.children[i], key, value)

    def _split_child(self, parent: BTreeNode, index: int):
        """Split full child node"""
        child = parent.children[index]
        mid = (self.order - 1) // 2

        if child.is_leaf:
            # Leaf split: the right half keeps its keys AND values
            # (in a B+ Tree all data stays in leaves; the parent gets
            # a COPY of the separator key)
            new_node = BTreeNode(
                keys=child.keys[mid:],
                children=[],
                values=child.values[mid:],
                is_leaf=True
            )
            # Maintain the leaf chain for range scans
            new_node.next_leaf = child.next_leaf
            child.next_leaf = new_node

            child.keys = child.keys[:mid]
            child.values = child.values[:mid]
            separator = new_node.keys[0]
        else:
            # Internal split: the middle key moves UP into the parent
            # (capture it BEFORE truncating the child's key list)
            separator = child.keys[mid]
            new_node = BTreeNode(
                keys=child.keys[mid+1:],
                children=child.children[mid+1:],
                values=[],
                is_leaf=False
            )
            child.keys = child.keys[:mid]
            child.children = child.children[:mid+1]

        parent.keys.insert(index, separator)
        parent.children.insert(index + 1, new_node)

# Example: Database index simulation
print("=== B+ Tree Example ===")
btree = BPlusTree(order=4)

# Insert records (simulating database rows)
data = [(10, "User A"), (20, "User B"), (5, "User C"),
        (15, "User D"), (25, "User E"), (30, "User F")]

for key, value in data:
    btree.insert(key, value)

# Point query
print("Search for key 15:")
result = btree.search(15)
print(f"  Found: {result}")

# Range query (this is what makes B+ Trees perfect for databases!)
print("\nRange query [10, 25]:")
results = btree.range_query(10, 25)
for key, value in results:
    print(f"  Key {key}: {value}")
B+ Tree vs Hash Index:

| Operation     | B+ Tree               | Hash Index             |
|---------------|-----------------------|------------------------|
| Point query   | O(log n)              | O(1)                   |
| Range query   | O(log n + k)          | O(n) - must scan all!  |
| ORDER BY      | Free (already sorted) | O(n log n)             |
| Prefix search | Supported             | Not supported          |

Winner: B+ Tree for databases (range queries are essential)
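The range-query difference is easy to see with Python stand-ins: a dict plays the hash index, and a sorted key list with `bisect` plays the B+ Tree leaf level (names and data are illustrative):

```python
import bisect

rows = {k: f"row-{k}" for k in range(0, 1000, 7)}  # the hash "index"

# Hash index: a range query must examine EVERY key — O(n)
hash_hits = sorted(k for k in rows if 100 <= k <= 150)

# B+ Tree: sorted keys allow a binary search to the start of the range,
# then a sequential scan of just the matches — O(log n + k)
sorted_keys = sorted(rows)
lo = bisect.bisect_left(sorted_keys, 100)
hi = bisect.bisect_right(sorted_keys, 150)
tree_hits = sorted_keys[lo:hi]

assert hash_hits == tree_hits
print(tree_hits)  # [105, 112, 119, 126, 133, 140, 147]
```

Same answer, but the hash version touched all ~143 keys while the sorted version touched ~10 plus the 7 matches.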

2. MVCC - Multi-Version Concurrency Control

How PostgreSQL allows readers and writers to work simultaneously without blocking.

The Problem: Lock-Based Concurrency

Traditional Approach (MySQL MyISAM):
- Writers block readers
- Readers block writers
- Result: Poor concurrency, lots of waiting
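The blocking above can be reproduced with a toy table-level lock (MyISAM-style); this is a deliberately crude sketch, with one `threading.Lock` standing in for the whole table:

```python
import threading
import time

table_lock = threading.Lock()  # one big lock per table (illustrative)
waited = {}

def writer():
    with table_lock:       # writer takes the table lock...
        time.sleep(0.2)    # ...and holds it for the whole "update"

def reader():
    t0 = time.time()
    with table_lock:       # reader must wait until the writer finishes
        waited['seconds'] = time.time() - t0

w = threading.Thread(target=writer)
w.start()
time.sleep(0.05)           # ensure the writer grabs the lock first
r = threading.Thread(target=reader)
r.start()
w.join()
r.join()
print(f"Reader blocked for {waited['seconds']:.2f}s")
```

A plain SELECT had to wait out the entire write. MVCC's goal is to make that wait disappear.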

MVCC Solution

Key Idea: Keep multiple versions of each row.
- Writers create new versions (don't modify in-place)
- Readers see snapshot at their transaction start time
- No locking between readers and writers!
sequenceDiagram
    participant T1 as Transaction 1 (xid=100)
    participant T2 as Transaction 2 (xid=101)
    participant DB as Database
    T1->>DB: BEGIN (snapshot: xid=100)
    T1->>DB: SELECT balance WHERE id=1
    DB->>T1: balance=1000 (xmin=95, xmax=inf)
    T2->>DB: BEGIN (snapshot: xid=101)
    T2->>DB: UPDATE balance=500 WHERE id=1
    Note right of DB: Creates new version:<br/>xmin=101, xmax=inf<br/>Old version: xmin=95, xmax=101
    T2->>DB: COMMIT
    T1->>DB: SELECT balance WHERE id=1
    DB->>T1: balance=1000 (still sees old version!)
    Note right of T1: T1 started at xid=100,<br/>can't see xid=101
    T1->>DB: COMMIT
from typing import Optional, List
from dataclasses import dataclass
from datetime import datetime

@dataclass
class RowVersion:
    """A single version of a database row"""
    data: dict
    xmin: int    # Transaction ID that created this version
    xmax: float  # Transaction ID that deleted/updated it (float('inf') = current)

class MVCCTable:
    """
    Simplified MVCC implementation (PostgreSQL-style).

    Key concepts:
    - Each row can have multiple versions
    - Each version tagged with creating transaction (xmin)
    - When updated, old version marked with deleting transaction (xmax)
    - Transactions see snapshot based on their start time

    This is how PostgreSQL achieves high concurrency!
    """

    def __init__(self):
        # row_id -> list of versions (sorted by xmin)
        self.versions: dict[int, List[RowVersion]] = {}
        self.next_xid = 1  # Transaction ID counter
        self.active_transactions: dict[int, int] = {}  # xid -> snapshot_xid

    def begin_transaction(self) -> int:
        """Start a new transaction, return transaction ID"""
        xid = self.next_xid
        self.next_xid += 1

        # Snapshot = all transactions < this xid are "in the past"
        self.active_transactions[xid] = xid

        print(f"[TX {xid}] BEGIN (snapshot: xid < {xid})")
        return xid

    def commit_transaction(self, xid: int):
        """Commit transaction"""
        if xid in self.active_transactions:
            del self.active_transactions[xid]
            print(f"[TX {xid}] COMMIT")

    def insert(self, xid: int, row_id: int, data: dict):
        """Insert new row"""
        version = RowVersion(data=data, xmin=xid, xmax=float('inf'))

        if row_id not in self.versions:
            self.versions[row_id] = []

        self.versions[row_id].append(version)
        print(f"[TX {xid}] INSERT row {row_id}: {data}")

    def update(self, xid: int, row_id: int, new_data: dict):
        """
        Update row (MVCC-style).

        Process:
        1. Mark current version as deleted (set xmax)
        2. Insert new version (with xmin = current xid)
        """
        if row_id not in self.versions:
            print(f"[TX {xid}] ERROR: Row {row_id} not found")
            return

        # Find current version (xmax = inf)
        for version in self.versions[row_id]:
            if version.xmax == float('inf'):
                # Mark as deleted by this transaction
                version.xmax = xid
                break

        # Create new version
        new_version = RowVersion(data=new_data, xmin=xid, xmax=float('inf'))
        self.versions[row_id].append(new_version)

        print(f"[TX {xid}] UPDATE row {row_id}: {new_data}")

    def select(self, xid: int, row_id: int) -> Optional[dict]:
        """
        Select the row version visible to this transaction.

        Visibility rules (simplified - real PostgreSQL also checks whether
        the xmin/xmax transactions actually committed):
        - Created by us, or by a transaction before our snapshot
        - Not deleted by us, nor by a transaction before our snapshot
        """
        if row_id not in self.versions:
            return None

        snapshot_xid = self.active_transactions.get(xid, xid)

        # Find visible version
        for version in self.versions[row_id]:
            # Created by us, or before our snapshot?
            if version.xmin != xid and version.xmin >= snapshot_xid:
                continue

            # Deleted in a way we can see (by us, or before our snapshot)?
            deleted = (version.xmax != float('inf') and
                       (version.xmax == xid or version.xmax < snapshot_xid))
            if not deleted:
                print(f"[TX {xid}] SELECT row {row_id}: {version.data} " +
                      f"(xmin={version.xmin}, xmax={version.xmax})")
                return version.data

        print(f"[TX {xid}] SELECT row {row_id}: NOT FOUND (no visible version)")
        return None

# Example: PostgreSQL-style MVCC
print("\n=== MVCC Example ===")
table = MVCCTable()

# Initial data
tx0 = table.begin_transaction()
table.insert(tx0, row_id=1, data={"balance": 1000})
table.commit_transaction(tx0)

# Transaction 1: Long-running query
tx1 = table.begin_transaction()
result = table.select(tx1, row_id=1)
print(f"  -> Balance: {result['balance']}\n")

# Transaction 2: Update balance
tx2 = table.begin_transaction()
table.update(tx2, row_id=1, new_data={"balance": 500})
table.commit_transaction(tx2)
print()

# Transaction 1: Still sees old version!
result = table.select(tx1, row_id=1)
print(f"  -> Balance: {result['balance']} (MVCC isolation!)\n")
table.commit_transaction(tx1)

# Transaction 3: Sees new version
tx3 = table.begin_transaction()
result = table.select(tx3, row_id=1)
print(f"  -> Balance: {result['balance']}")
table.commit_transaction(tx3)
MVCC Benefits:
- Readers never block writers
- Writers never block readers
- Consistent snapshots (repeatable reads)
- High concurrency

Costs:
- Multiple versions consume space
- Old versions must be cleaned up with VACUUM (PostgreSQL)
- Write amplification (every update creates a new version)
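The VACUUM step can be sketched against the version format used above (restating `RowVersion` so the sketch is self-contained; the xids are made up, and commit status is ignored for simplicity): a version is dead once it was deleted by a transaction older than every active snapshot.

```python
from dataclasses import dataclass

@dataclass
class RowVersion:
    data: dict
    xmin: int    # transaction that created this version
    xmax: float  # transaction that deleted it; float('inf') = still live

def vacuum(versions: list, oldest_active_xid: int) -> list:
    """Reclaim versions no transaction can see anymore: those whose
    deletion (xmax) predates the oldest active snapshot."""
    return [v for v in versions if v.xmax >= oldest_active_xid]

history = [
    RowVersion({"balance": 1000}, xmin=95,  xmax=101),
    RowVersion({"balance": 500},  xmin=101, xmax=104),
    RowVersion({"balance": 750},  xmin=104, xmax=float('inf')),
]

# Oldest running transaction is xid 103: the first version (deleted at 101)
# is invisible to everyone and can be reclaimed; the other two survive.
live = vacuum(history, oldest_active_xid=103)
print(len(live))  # 2
```

Real VACUUM does much more (frees pages, updates the free-space map, freezes old xids), but this is the core visibility test it applies.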

3. Write-Ahead Logging (WAL)

How databases ensure durability - your data survives crashes.

The Durability Problem

Naive Approach: Write directly to data files.
Problem: A crash mid-write leaves corrupted data!
Example: Transfer $100 (debit account A, credit account B).
A crash after the debit but before the credit makes $100 vanish!

WAL Solution

Rule: Write to log BEFORE modifying data
1. Write change to append-only log (fast, sequential)
2. Acknowledge to client ("committed")
3. Apply changes to data files (async, can replay from log if crash)
import os
import json
from typing import Dict, Any
from dataclasses import dataclass, asdict

@dataclass
class WALEntry:
    """Write-Ahead Log entry"""
    lsn: int  # Log Sequence Number (monotonically increasing)
    operation: str  # "INSERT", "UPDATE", "DELETE"
    table: str
    row_id: int
    data: dict

class WriteAheadLog:
    """
    Simplified Write-Ahead Logging (PostgreSQL-style).

    Properties:
    - All changes written to log BEFORE data files
    - Log is append-only (fast, sequential writes)
    - Can replay log to recover from crashes
    - Periodic checkpoints flush dirty pages to disk

    This is how databases guarantee durability (the D in ACID)!
    """

    def __init__(self, log_file: str = "wal.log", data_file: str = "data.json"):
        self.log_file = log_file
        self.data_file = data_file
        self.lsn = 0  # Log Sequence Number
        self.data: Dict[str, Dict[int, dict]] = {}  # table -> row_id -> data
        self.dirty_pages = set()  # Pages modified but not flushed to disk

        # Load existing data (JSON stringifies int keys; convert them back
        # so checkpointed rows and WAL-replayed rows share the same keys)
        if os.path.exists(data_file):
            with open(data_file, 'r') as f:
                raw = json.load(f)
            self.data = {table: {int(row_id): row for row_id, row in rows.items()}
                         for table, rows in raw.items()}

        # Replay WAL if exists (crash recovery)
        if os.path.exists(log_file):
            print("=== Crash Recovery: Replaying WAL ===")
            self._replay_wal()

    def _write_log(self, entry: WALEntry):
        """Append entry to WAL (durable write)"""
        with open(self.log_file, 'a') as f:
            f.write(json.dumps(asdict(entry)) + '\n')
            f.flush()  # Force OS to write to disk
            os.fsync(f.fileno())  # Ensure disk write completed

        print(f"[WAL] LSN={entry.lsn} {entry.operation} {entry.table}#{entry.row_id}")

    def _apply_to_memory(self, entry: WALEntry):
        """Apply change to in-memory data"""
        table = entry.table
        row_id = entry.row_id

        if table not in self.data:
            self.data[table] = {}

        if entry.operation == "INSERT" or entry.operation == "UPDATE":
            self.data[table][row_id] = entry.data
        elif entry.operation == "DELETE":
            if row_id in self.data[table]:
                del self.data[table][row_id]

        self.dirty_pages.add((table, row_id))

    def insert(self, table: str, row_id: int, data: dict):
        """Insert with WAL"""
        # 1. Write to WAL first (MUST complete before acknowledging)
        entry = WALEntry(lsn=self.lsn, operation="INSERT",
                        table=table, row_id=row_id, data=data)
        self.lsn += 1
        self._write_log(entry)

        # 2. Apply to in-memory data
        self._apply_to_memory(entry)

        print(f"  -> Insert committed (durable via WAL)")

    def update(self, table: str, row_id: int, data: dict):
        """Update with WAL"""
        entry = WALEntry(lsn=self.lsn, operation="UPDATE",
                        table=table, row_id=row_id, data=data)
        self.lsn += 1
        self._write_log(entry)
        self._apply_to_memory(entry)
        print(f"  -> Update committed (durable via WAL)")

    def checkpoint(self):
        """
        Checkpoint: Flush dirty pages to disk.

        After checkpoint, can truncate WAL (recovery only needs changes since last checkpoint).
        """
        print("\n=== Checkpoint: Flushing dirty pages to disk ===")

        # Write data file
        with open(self.data_file, 'w') as f:
            json.dump(self.data, f, indent=2)
            f.flush()
            os.fsync(f.fileno())

        self.dirty_pages.clear()

        # In real DB: can now truncate WAL (keep only recent entries)
        print(f"  -> Checkpoint complete. Data file synced to disk.\n")

    def _replay_wal(self):
        """Replay WAL for crash recovery"""
        with open(self.log_file, 'r') as f:
            for line in f:
                entry_dict = json.loads(line.strip())
                entry = WALEntry(**entry_dict)

                print(f"  Replaying LSN={entry.lsn} {entry.operation}")
                self._apply_to_memory(entry)
                self.lsn = max(self.lsn, entry.lsn + 1)

        print(f"  -> Recovery complete. Replayed to LSN={self.lsn}\n")

# Example: WAL in action
print("\n=== Write-Ahead Logging Example ===")

# Clean up old files
for f in ["wal.log", "data.json"]:
    if os.path.exists(f):
        os.remove(f)

db = WriteAheadLog()

# Transactions
db.insert("users", 1, {"name": "Alice", "balance": 1000})
db.insert("users", 2, {"name": "Bob", "balance": 500})

# Checkpoint
db.checkpoint()

# More changes
db.update("users", 1, {"name": "Alice", "balance": 900})
db.update("users", 2, {"name": "Bob", "balance": 600})

# Simulate crash (no checkpoint!)
print("=== Simulating crash... ===\n")

# Restart database - WAL replay will restore state
db2 = WriteAheadLog()  # Automatically replays WAL

print("After recovery:")
print(f"  User 1: {db2.data['users'][1]}")
print(f"  User 2: {db2.data['users'][2]}")

# Clean up
os.remove("wal.log")
os.remove("data.json")
WAL Performance:
- Append-only writes: Sequential I/O ~100MB/s (vs random ~5MB/s)
- Fsync every commit: ~1000 commits/sec (can batch for higher throughput)
- Checkpoint interval: Balance between recovery time and write overhead
- PostgreSQL: Checkpoints every 5 minutes or when WAL reaches size limit
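The "batch for higher throughput" point is group commit: one fsync durably commits a whole batch of concurrent transactions instead of one. A rough throughput model (the 1 ms fsync figure is an assumption, not a measurement):

```python
def commits_per_second(fsync_ms: float, batch_size: int) -> float:
    """Throughput when each fsync (taking fsync_ms) commits batch_size
    transactions at once (group commit)."""
    return batch_size / (fsync_ms / 1000)

# With a 1 ms fsync: flushing per-commit caps throughput at ~1,000/sec,
# but grouping 50 concurrent commits per flush reaches ~50,000/sec.
print(commits_per_second(1.0, 1))   # 1000.0
print(commits_per_second(1.0, 50))  # 50000.0
```

This is why databases under concurrent load batch WAL flushes (PostgreSQL's `commit_delay`, for example, exists to encourage exactly this grouping).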

4. Database Replication

How databases achieve high availability and read scalability.

Replication Strategies

| Strategy              | Description                             | Use Case                        | Consistency                               |
|-----------------------|-----------------------------------------|---------------------------------|-------------------------------------------|
| Streaming replication | Stream WAL from primary to replicas     | PostgreSQL, MySQL               | Async (eventual) or sync (strong)         |
| Logical replication   | Replicate logical (row-level) changes   | Cross-version, selective tables | Eventual                                  |
| Statement-based       | Replay SQL statements on replicas       | MySQL (old default)             | Can diverge (non-deterministic functions) |
| Row-based             | Replicate actual row changes            | MySQL (current default)         | Exact replication                         |
import time
import threading
from typing import Any, List
from queue import Queue

class ReplicationLog:
    """
    Simplified database replication (PostgreSQL-style streaming replication).

    Architecture:
    - Primary: Accepts writes, appends to WAL
    - Replicas: Stream WAL from primary, apply changes

    Modes:
    - Async: Primary doesn't wait for replica acknowledgment (fast, eventual consistency)
    - Sync: Primary waits for replica (slower, strong consistency)
    """

    def __init__(self, node_id: str, is_primary: bool = False):
        self.node_id = node_id
        self.is_primary = is_primary
        self.wal: List[dict] = []
        self.data: dict = {}
        self.lsn = 0
        self.replica_queues: List[Queue] = []  # For async replication

    def add_replica(self, replica_queue: Queue):
        """Register a replica to receive WAL stream"""
        self.replica_queues.append(replica_queue)

    def write(self, key: str, value: Any, sync: bool = False):
        """
        Write to primary (with replication).

        sync=False: Async replication (don't wait for replicas)
        sync=True: Sync replication (wait for at least 1 replica)
        """
        if not self.is_primary:
            print(f"[{self.node_id}] ERROR: Cannot write to replica")
            return

        # Create WAL entry
        entry = {
            "lsn": self.lsn,
            "operation": "UPDATE",
            "key": key,
            "value": value,
            "timestamp": time.time()
        }
        self.lsn += 1

        # Apply locally
        self.wal.append(entry)
        self.data[key] = value

        print(f"[{self.node_id}] WRITE {key}={value} (LSN={entry['lsn']})")

        # Replicate to followers
        for queue in self.replica_queues:
            queue.put(entry)

        if sync and self.replica_queues:
            # Sync mode: wait for acknowledgment (simplified - wait 100ms)
            time.sleep(0.1)
            print(f"  -> Sync replication: Waited for replica acknowledgment")

    def read(self, key: str) -> Any:
        """Read from node (can be primary or replica)"""
        value = self.data.get(key, "NOT_FOUND")
        print(f"[{self.node_id}] READ {key}={value}")
        return value

    def start_replication(self, primary_queue: Queue):
        """Start replication thread (for replica nodes)"""
        if self.is_primary:
            return

        def replication_loop():
            while True:
                entry = primary_queue.get()

                # Apply WAL entry
                self.wal.append(entry)
                self.data[entry['key']] = entry['value']
                self.lsn = entry['lsn'] + 1

                print(f"[{self.node_id}] REPLICATED {entry['key']}={entry['value']} (LSN={entry['lsn']})")

        thread = threading.Thread(target=replication_loop, daemon=True)
        thread.start()

# Example: PostgreSQL-style streaming replication
print("\n=== Database Replication Example ===")

# Create primary and 2 replicas
primary = ReplicationLog("PRIMARY", is_primary=True)
replica1 = ReplicationLog("REPLICA-1")
replica2 = ReplicationLog("REPLICA-2")

# Setup replication streams
queue1 = Queue()
queue2 = Queue()
primary.add_replica(queue1)
primary.add_replica(queue2)

replica1.start_replication(queue1)
replica2.start_replication(queue2)

# Write to primary
primary.write("user:1:balance", 1000)
primary.write("user:2:balance", 500)

# Wait for replication
time.sleep(0.2)

# Read from replica (eventual consistency)
print("\n--- Reading from replica ---")
replica1.read("user:1:balance")
replica2.read("user:2:balance")

# Update on primary
print("\n--- Update on primary ---")
primary.write("user:1:balance", 900)

time.sleep(0.2)

# Read from replicas (should see update)
print("\n--- Reading updated value from replicas ---")
replica1.read("user:1:balance")
replica2.read("user:1:balance")
Replication Trade-offs:

Async Replication (Default):
- (+) Fast writes (no waiting)
- (+) High availability (primary doesn't depend on replicas)
- (-) Replication lag (replicas may be behind)
- (-) Data loss possible (if primary crashes before replicating)

Sync Replication:
- (+) Strong consistency (replicas always up-to-date)
- (+) No data loss (committed = replicated)
- (-) Slower writes (wait for network + replica)
- (-) Availability impact (primary blocks if a replica is down)
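A common middle ground in production (PostgreSQL's `synchronous_standby_names`, MySQL semi-synchronous replication) is to wait for only k of n replicas. A sketch of that commit rule; `commit`, `replicas`, and `stores` are illustrative names, not any library's API:

```python
import queue
import threading

def commit(entry: dict, replicas: list, quorum: int) -> bool:
    """Ship a WAL entry to every replica, but acknowledge the commit
    as soon as `quorum` replicas have confirmed it (semi-sync)."""
    acks: "queue.Queue[bool]" = queue.Queue()

    for apply_fn in replicas:
        # Each replica applies the entry and acknowledges on its own thread
        threading.Thread(
            target=lambda f=apply_fn: (f(entry), acks.put(True)),
            daemon=True,
        ).start()

    for _ in range(quorum):
        acks.get(timeout=1.0)  # raises queue.Empty if quorum is unreachable
    return True

stores = [{}, {}, {}]  # three replica "databases"
replicas = [lambda e, s=s: s.update({e["key"]: e["value"]}) for s in stores]

ok = commit({"key": "user:1:balance", "value": 900}, replicas, quorum=2)
print(ok)  # acknowledged once 2 of 3 confirmed; the third catches up async
```

The slowest replica no longer sits on the commit path, yet a committed write survives the loss of any single node.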

Key Takeaways

- B+ Trees: Minimize disk I/O, enable efficient range queries
- MVCC: Readers and writers don't block each other - high concurrency
- WAL: Write to the log first - durability despite crashes
- Replication: Async for performance, sync for consistency