Database Design & Optimization

Normalization, Indexing, Query Optimization, and Performance Tuning

1. Database Normalization

1.1 Normal Forms Overview

| Normal Form | Rule | Purpose |
|---|---|---|
| 1NF | Atomic values, no repeating groups | Eliminate duplicate columns and ensure atomic values |
| 2NF | 1NF + no partial dependencies | Remove attributes that depend on only part of a composite key |
| 3NF | 2NF + no transitive dependencies | Remove columns not dependent on the primary key |
| BCNF | 3NF + every determinant is a candidate key | Handle anomalies not covered by 3NF |

Example: Denormalized to 3NF

from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
from enum import Enum

# ❌ Denormalized - Poor Design
class DenormalizedOrder:
    """
    Problems:
    - Repeating customer info for each order
    - No atomic values (multiple items in one field)
    - Update anomalies (customer info duplicated)
    """
    def __init__(self, order_id, customer_name, customer_email,
                 customer_address, items_string, total):
        self.order_id = order_id
        self.customer_name = customer_name
        self.customer_email = customer_email
        self.customer_address = customer_address
        self.items = items_string  # "Laptop,Mouse,Keyboard"
        self.total = total

# ✅ Normalized to 3NF
@dataclass
class Customer:
    """1NF: Atomic values, unique identifier"""
    customer_id: int
    name: str
    email: str
    address: str
    created_at: datetime

@dataclass
class Product:
    """1NF: Each product is a separate entity"""
    product_id: int
    name: str
    description: str
    price: float
    stock: int

@dataclass
class Order:
    """
    2NF: No partial dependencies
    - All attributes depend on entire primary key (order_id)
    - customer_id is foreign key, not duplicated customer data
    """
    order_id: int
    customer_id: int  # Foreign key
    order_date: datetime
    total: float
    status: str

@dataclass
class OrderItem:
    """
    3NF: No transitive dependencies
    - Depends only on (order_id, product_id) composite key
    - product_price stored at order time (historical snapshot)
    - quantity is order-specific
    """
    order_item_id: int
    order_id: int     # Foreign key
    product_id: int   # Foreign key
    quantity: int
    price_at_order: float  # Snapshot of price when ordered


# Database operations with normalized schema
class DatabaseOperations:
    def __init__(self, db_connection):
        self.conn = db_connection

    def create_order(self, customer_id: int, items: List[tuple]) -> int:
        """
        Create order with proper normalization
        items: List of (product_id, quantity) tuples
        """
        cursor = self.conn.cursor()

        # Calculate total from current product prices
        total = 0
        for product_id, quantity in items:
            cursor.execute(
                "SELECT price FROM products WHERE product_id = %s",
                (product_id,)
            )
            price = cursor.fetchone()[0]
            total += price * quantity

        # Insert into orders table
        cursor.execute("""
            INSERT INTO orders (customer_id, order_date, total, status)
            VALUES (%s, %s, %s, %s)
            RETURNING order_id
        """, (customer_id, datetime.now(), total, 'pending'))

        order_id = cursor.fetchone()[0]

        # Insert into order_items table
        for product_id, quantity in items:
            cursor.execute(
                "SELECT price FROM products WHERE product_id = %s",
                (product_id,)
            )
            price_at_order = cursor.fetchone()[0]

            cursor.execute("""
                INSERT INTO order_items
                (order_id, product_id, quantity, price_at_order)
                VALUES (%s, %s, %s, %s)
            """, (order_id, product_id, quantity, price_at_order))

        self.conn.commit()
        return order_id

    def get_order_details(self, order_id: int):
        """
        Retrieve order with JOIN across normalized tables
        """
        cursor = self.conn.cursor()

        query = """
        SELECT
            o.order_id,
            o.order_date,
            o.total,
            o.status,
            c.name AS customer_name,
            c.email AS customer_email,
            p.name AS product_name,
            oi.quantity,
            oi.price_at_order
        FROM orders o
        JOIN customers c ON o.customer_id = c.customer_id
        JOIN order_items oi ON o.order_id = oi.order_id
        JOIN products p ON oi.product_id = p.product_id
        WHERE o.order_id = %s
        """

        cursor.execute(query, (order_id,))
        return cursor.fetchall()
Benefits of Normalization:

- No update anomalies: customer data lives in one place, so a change is a single UPDATE
- No insert anomalies: a customer can exist before placing any orders
- No delete anomalies: deleting an order does not erase the only copy of customer data
- Less redundancy: smaller storage footprint and no conflicting copies of the same fact
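The update-anomaly point is easy to demonstrate with plain Python dictionaries standing in for rows (a toy sketch, not real database code):

```python
# Denormalized: customer email duplicated into every order row
denorm_orders = [
    {"order_id": 1, "customer": "Ada", "email": "ada@old.example"},
    {"order_id": 2, "customer": "Ada", "email": "ada@old.example"},
]
denorm_orders[0]["email"] = "ada@new.example"  # forgot row 2!
print({row["email"] for row in denorm_orders})  # two conflicting emails

# Normalized: one customers row; orders hold only the foreign key
customers = {1: {"name": "Ada", "email": "ada@old.example"}}
orders = [{"order_id": 1, "customer_id": 1},
          {"order_id": 2, "customer_id": 1}]
customers[1]["email"] = "ada@new.example"  # single point of update
print({customers[o["customer_id"]]["email"] for o in orders})  # one consistent value
```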

1.2 When to Denormalize

Strategic Denormalization for Performance:
class DenormalizationStrategies:
    """
    Sometimes breaking normalization rules improves performance
    Use cases:
    1. Read-heavy workloads (avoid expensive JOINs)
    2. Reporting/Analytics tables
    3. Caching computed values
    4. Historical snapshots
    """

    def create_order_summary_table(self):
        """
        Denormalized table for fast reporting
        Stores aggregated data to avoid real-time calculations
        """
        query = """
        CREATE TABLE order_summaries (
            customer_id INT,
            customer_name VARCHAR(255),      -- Denormalized
            customer_email VARCHAR(255),     -- Denormalized
            total_orders INT,                -- Computed
            total_spent DECIMAL(10,2),       -- Computed
            last_order_date TIMESTAMP,       -- Computed
            avg_order_value DECIMAL(10,2),   -- Computed
            PRIMARY KEY (customer_id)
        );

        -- Materialized view or maintained by triggers
        -- Updated when orders change
        """
        return query

    def create_product_stats_table(self):
        """
        Store computed statistics for fast access
        Updated periodically (e.g., hourly cron job)
        """
        query = """
        CREATE TABLE product_statistics (
            product_id INT PRIMARY KEY,
            product_name VARCHAR(255),       -- Denormalized
            total_sold INT,                  -- Computed
            revenue DECIMAL(12,2),           -- Computed
            avg_rating DECIMAL(3,2),         -- Computed
            last_updated TIMESTAMP
        );
        """
        return query

    def cache_computed_column(self):
        """
        Add computed column to avoid repeated calculations
        Trade-off: Write performance for read performance
        """
        query = """
        ALTER TABLE orders
        ADD COLUMN item_count INT;

        -- Maintained by trigger
        CREATE TRIGGER update_item_count
        AFTER INSERT OR UPDATE OR DELETE ON order_items
        FOR EACH ROW
        EXECUTE FUNCTION update_order_item_count();
        """
        return query

2. Database Indexes: Complete Guide

What is an Index?

A database index is a separate data structure that stores a subset of table data in a sorted, searchable format. Think of it like a book's index: instead of scanning every page (a table scan), you look up the topic in the index and jump directly to the relevant pages.

Key Insight: Indexes trade disk space and write performance for dramatically faster reads.
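The trade-off can be made concrete in Python: a dict behaves like a hash index over a list of rows (illustrative only; real index internals differ):

```python
import timeit

# 100,000 "rows" and a hash-index-like dict keyed on user_id
rows = [{"user_id": i, "name": f"user{i}"} for i in range(100_000)]
by_user_id = {row["user_id"]: row for row in rows}

# Table scan: examine rows one by one until the match is found
scan_time = timeit.timeit(
    lambda: next(r for r in rows if r["user_id"] == 99_999), number=50)

# Indexed lookup: jump straight to the row
index_time = timeit.timeit(lambda: by_user_id[99_999], number=50)

print(f"scan {scan_time:.4f}s vs indexed lookup {index_time:.6f}s")
```

The dict costs extra memory and must be maintained on every insert, mirroring the space-and-write-cost trade-off described above.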

2.1 How Indexes Work: B-Tree Deep Dive

B-Tree Structure (Default Index Type)

B-Tree (Balanced Tree) is the default index type in PostgreSQL, MySQL, and most databases. Here's why it's so effective:

B-Tree for: SELECT * FROM users WHERE user_id = 12345

                    [10000, 20000, 30000]          ← Root (Level 1)
                   /         |           \
        [5K,7.5K,10K]  [15K,17.5K,20K]  [25K,27.5K,30K]  ← Internal (Level 2)
         /    |    \      /    |    \      /    |    \
    [1-5K] [5-7.5K] ... [10-15K] ...  [27.5-30K] ...  ← Leaf nodes (Level 3)
      ↓       ↓              ↓                          ← Point to table rows

Searching for user_id = 12345:
1. Root: 12345 < 20000, go left
2. Internal: 12345 > 10000, go middle
3. Leaf: Found! Jump to row location

Comparisons: ~3 node visits (the tree height); even a flat binary search over 1M rows needs log₂(1,000,000) ≈ 20 comparisons
Without an index: ~500,000 comparisons on average (full table scan)
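These comparison counts can be sanity-checked with Python's bisect module, which performs the same kind of ordered search a B-tree does at each node (a sketch, not a real B-tree):

```python
import bisect
import math

keys = list(range(1, 1_000_001))  # one million sorted user_ids
target = 12_345

# Ordered (index-style) search: at most ~log2(n) comparisons
pos = bisect.bisect_left(keys, target)
assert keys[pos] == target
print(math.ceil(math.log2(len(keys))))  # 20 - comparison upper bound

# Unindexed scan: rows examined = position of the match
print(keys.index(target) + 1)  # 12345 comparisons for this lookup
```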
            

Why B-Trees Are Fast

- Balanced: every leaf sits at the same depth, so lookups are a guaranteed O(log n)
- High fanout: hundreds of keys per page keep the tree only 3-4 levels deep, even for millions of rows
- Sorted leaves: range predicates (<, >, BETWEEN) and ORDER BY read leaf pages sequentially
- Linked leaves: range scans walk sibling pages without re-traversing the tree

2.2 Index Selectivity & Cardinality

Not All Columns Make Good Indexes!

Cardinality = Number of unique values in a column

Selectivity = Cardinality / Total Rows

| Column | Cardinality | Selectivity | Index Worth It? | Reason |
|---|---|---|---|---|
| user_id (PK) | 1,000,000 | 100% | ✅ Excellent | Every lookup finds ~1 row |
| email | 950,000 | 95% | ✅ Excellent | High uniqueness, frequent lookups |
| last_name | 50,000 | 5% | ✅ Good | Lookup finds ~20 rows, much better than a scan |
| country | 200 | 0.02% | ⚠️ Maybe | Lookup finds ~5,000 rows, marginal benefit |
| gender | 3 | 0.0003% | ❌ Poor | Lookup finds ~333,333 rows, nearly a table scan |
| is_active | 2 | 0.0002% | ❌ Very Poor | Returns ~50% of the table, slower than scan + filter |
Rule of Thumb:

- Index high-cardinality columns that appear in WHERE, JOIN, or ORDER BY clauses
- Skip full indexes on columns with a handful of distinct values; a lookup returning a large fraction of the table is often slower than a sequential scan
- For low-cardinality columns where queries target a small minority value, use a partial index
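Assuming evenly distributed values, the numbers in the table fall out of two small formulas (a back-of-envelope helper, not a database API):

```python
def selectivity(distinct_values: int, total_rows: int) -> float:
    """Cardinality / total rows - higher means a more useful index."""
    return distinct_values / total_rows

def rows_per_lookup(distinct_values: int, total_rows: int) -> float:
    """Average rows an equality predicate returns (uniform distribution)."""
    return total_rows / distinct_values

total = 1_000_000
for column, distinct in [("user_id", 1_000_000), ("last_name", 50_000),
                         ("country", 200), ("is_active", 2)]:
    print(f"{column}: selectivity={selectivity(distinct, total):.4%}, "
          f"~{rows_per_lookup(distinct, total):,.0f} rows per lookup")
```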

When Low Cardinality Indexes Work: Partial Indexes

-- ❌ BAD: Full index on low-cardinality column
CREATE INDEX idx_orders_status ON orders(status);
-- Problem: If 95% of orders are 'completed', index doesn't help much

-- ✅ GOOD: Partial index on minority status
CREATE INDEX idx_active_orders ON orders(customer_id, status)
WHERE status IN ('pending', 'processing', 'shipping');
-- Only indexes 5% of rows, fast for active order queries

-- Example query that benefits:
SELECT * FROM orders
WHERE customer_id = 12345
AND status = 'pending';
-- Uses partial index, scans only ~50 rows instead of 1M

2.3 Composite Indexes & The Leftmost Prefix Rule

Critical Concept: Column Order Matters!

A composite index on (A, B, C) is like a phone book sorted by (Last Name, First Name, City):

-- Example: E-commerce orders table
CREATE INDEX idx_orders_composite ON orders(customer_id, status, order_date);

-- ✅ Uses index efficiently:
SELECT * FROM orders WHERE customer_id = 123;
-- Uses leftmost column

SELECT * FROM orders WHERE customer_id = 123 AND status = 'shipped';
-- Uses first 2 columns

SELECT * FROM orders
WHERE customer_id = 123 AND status = 'shipped' AND order_date > '2024-01-01';
-- Uses all 3 columns (perfect!)

-- ❌ Cannot use the index (missing the leftmost column):
SELECT * FROM orders WHERE status = 'shipped';
-- Can't use index, must scan all rows

SELECT * FROM orders WHERE order_date > '2024-01-01';
-- Can't use index, must scan all rows

-- ✅ Partial use:
SELECT * FROM orders
WHERE customer_id = 123 AND order_date > '2024-01-01';
-- Uses customer_id from index, but scans within that subset
-- Still better than full table scan!
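The phone-book analogy maps directly onto Python tuples: a list sorted by (customer_id, status, order_date) supports fast prefix lookups, but not lookups on a trailing column alone (toy illustration):

```python
from bisect import bisect_left

# Sorted the way the composite index is: (customer_id, status, order_date)
rows = sorted([
    (123, 'pending', '2024-02-01'),
    (123, 'shipped', '2024-01-05'),
    (123, 'shipped', '2024-03-01'),
    (456, 'shipped', '2024-01-10'),
])

# Leftmost prefix customer_id = 123: matches form one contiguous run
lo, hi = bisect_left(rows, (123,)), bisect_left(rows, (124,))
print(hi - lo)  # 3 orders found by binary search

# Prefix (customer_id, status) is still contiguous
# ('shipped\x00' is the immediate successor of 'shipped' in sort order)
lo = bisect_left(rows, (123, 'shipped'))
hi = bisect_left(rows, (123, 'shipped\x00'))
print(hi - lo)  # 2

# status alone: matches are scattered, binary search cannot help - full scan
print(sum(1 for r in rows if r[1] == 'shipped'))  # 3
```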

Choosing Column Order for Composite Indexes

Priority Order (most important first):
  1. Equality Filters: Columns with = or IN
  2. Selectivity: More selective (higher cardinality) first
  3. Range Filters: Columns with <, >, BETWEEN last
  4. Sort Order: Columns in ORDER BY clause
-- Example: Finding user activity
-- Query: SELECT * FROM activity
--        WHERE user_id = 123        -- Equality, very selective
--        AND event_type = 'purchase'  -- Equality, less selective
--        AND timestamp > '2024-01-01'  -- Range filter
--        ORDER BY timestamp DESC;

-- ✅ GOOD column order:
CREATE INDEX idx_activity ON activity(user_id, event_type, timestamp DESC);
-- Reasoning:
-- 1. user_id first (equality, most selective - 1 in 100K)
-- 2. event_type second (equality, less selective - 1 in 10)
-- 3. timestamp last (range filter + sort column)

-- ❌ BAD column order:
CREATE INDEX idx_activity_bad ON activity(timestamp, event_type, user_id);
-- timestamp is range filter, can't use equality optimization

2.4 All Index Types Explained

| Index Type | Data Structure | Best For | Complexity | Operators |
|---|---|---|---|---|
| B-Tree | Balanced tree | Range queries, sorting, most use cases | O(log n) | =, <, >, <=, >=, BETWEEN, IN, LIKE 'prefix%' |
| Hash | Hash table | Exact equality only, very fast | O(1) | = only |
| GiST | Generalized search tree | Geometric, full-text, custom types | Varies | &&, @>, <@, etc. (geometry) |
| GIN | Generalized inverted index | Full-text search, JSON, arrays | O(log n) | @@, @>, ?, ?&, ?\| |
| BRIN | Block range index | Very large tables, naturally ordered data | O(n) but small | =, <, >, <=, >=, BETWEEN |
| SP-GiST | Space-partitioned GiST | Non-balanced structures (quad-trees) | Varies | Specialized operators |

Detailed Examples of Each Index Type

-- 1. B-Tree (default, 99% of use cases)
CREATE INDEX idx_orders_customer ON orders(customer_id);
CREATE INDEX idx_orders_date ON orders(order_date);
-- When: Almost always. Works for =, <, >, sorting, uniqueness

-- 2. Hash Index (PostgreSQL 10+, for equality only)
CREATE INDEX idx_products_sku ON products USING HASH(sku);
-- When: Large tables, only doing = lookups, never <, >, sorting
-- Slightly faster than B-tree for =, but limited use cases

-- 3. GIN Index (Inverted index for full-text search, JSON, arrays)
CREATE INDEX idx_products_search ON products
USING GIN(to_tsvector('english', name || ' ' || description));

-- Search products:
SELECT * FROM products
WHERE to_tsvector('english', name || ' ' || description) @@ to_tsquery('laptop');

-- JSON search:
CREATE INDEX idx_users_metadata ON users USING GIN(metadata jsonb_path_ops);
SELECT * FROM users WHERE metadata @> '{"premium": true}';

-- Array search:
CREATE INDEX idx_posts_tags ON posts USING GIN(tags);
SELECT * FROM posts WHERE tags @> ARRAY['postgresql', 'performance'];

-- 4. GiST Index (Geometric data, custom types)
CREATE INDEX idx_locations ON stores USING GIST(location);
-- Find nearby stores:
SELECT * FROM stores
WHERE location <-> point(40.7128, -74.0060) < 10;  -- Within 10 units

-- 5. BRIN Index (Block Range Index - very small, for huge tables)
CREATE INDEX idx_logs_timestamp ON logs USING BRIN(timestamp);
-- When:
-- - Tables > 1GB with naturally ordered data (timestamps, IDs)
-- - Index size: ~1% of B-tree size
-- - Slight scan overhead but massive space savings
-- - Perfect for append-only logs, time-series data

-- 6. Partial Index (Only index subset of rows)
CREATE INDEX idx_active_users ON users(user_id)
WHERE deleted_at IS NULL AND is_active = true;
-- 10x smaller than full index if only 10% of users are active

-- 7. Expression Index (Index on computed value)
CREATE INDEX idx_users_lower_email ON users(LOWER(email));
-- Now case-insensitive search uses index:
SELECT * FROM users WHERE LOWER(email) = 'user@example.com';

-- 8. Covering Index (INCLUDE clause - avoid table lookup)
CREATE INDEX idx_orders_covering ON orders(customer_id)
INCLUDE (order_date, total, status);
-- Query uses index-only scan (never touches table!):
SELECT order_date, total, status
FROM orders
WHERE customer_id = 12345;

-- 9. Unique Index (Enforces uniqueness)
CREATE UNIQUE INDEX idx_users_email ON users(email);
-- Equivalent to a UNIQUE constraint, which PostgreSQL enforces with a unique index under the hood

-- 10. Multicolumn/Composite Index
CREATE INDEX idx_orders_customer_status ON orders(customer_id, status, order_date);
-- Supports queries on:
-- - (customer_id)
-- - (customer_id, status)
-- - (customer_id, status, order_date)
-- But NOT: (status) alone or (order_date) alone

2.5 Covering Indexes (Index-Only Scans)

What is a Covering Index?

An index that contains all columns needed for a query, so PostgreSQL never needs to look up the actual table rows.

Performance Impact: Can be 10-100x faster by eliminating random I/O to table.

-- Without covering index:
-- 1. Scan index to find matching rows (sequential I/O)
-- 2. Look up each row in table (random I/O - SLOW!)

-- With covering index:
-- 1. Scan index (sequential I/O)
-- 2. Done! All data is in the index.

-- Example: User dashboard query
SELECT user_id, last_login, total_orders, account_balance
FROM users
WHERE account_status = 'active'
AND last_login > '2024-01-01'
ORDER BY last_login DESC
LIMIT 100;

-- ❌ Regular index - still needs table lookups
CREATE INDEX idx_users_status_login ON users(account_status, last_login);
-- Index provides WHERE and ORDER BY
-- But SELECT columns require 100 table lookups

-- ✅ Covering index - no table lookups!
CREATE INDEX idx_users_covering ON users(account_status, last_login DESC)
INCLUDE (user_id, total_orders, account_balance);
-- Index contains everything: WHERE, ORDER BY, SELECT
-- Query uses "Index Only Scan" - 10x faster!

-- Check if query uses covering index:
EXPLAIN (ANALYZE, BUFFERS) SELECT ...;
-- Look for: "Index Only Scan" instead of "Index Scan"

2.6 Index Maintenance & Overhead

Indexes Are Not Free!

Storage Cost: Indexes typically consume 20-30% of table size

Write Cost: Every INSERT/UPDATE/DELETE must update all indexes

Maintenance Cost: Indexes can become bloated and need REINDEX

Write Penalty Example

-- Table with 5 indexes:
-- 1. PRIMARY KEY on id
-- 2. INDEX on customer_id
-- 3. INDEX on status
-- 4. INDEX on order_date
-- 5. INDEX on (customer_id, status)

-- Single INSERT:
INSERT INTO orders (customer_id, status, order_date, total)
VALUES (12345, 'pending', '2024-01-01', 99.99);

-- What actually happens:
-- 1. Insert row into table heap (1 write)
-- 2. Update PRIMARY KEY index (1 write)
-- 3. Update customer_id index (1 write)
-- 4. Update status index (1 write)
-- 5. Update order_date index (1 write)
-- 6. Update composite index (1 write)
-- Total: 6 writes for 1 INSERT!

-- UPDATE is even worse:
UPDATE orders SET status = 'shipped' WHERE order_id = 123;
-- Must update: table + status index + composite index = 3 writes
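The cost model behind this is simple arithmetic (a rough sketch; PostgreSQL HOT updates can sometimes skip index maintenance entirely):

```python
def writes_per_insert(index_count: int) -> int:
    # One heap write plus one entry added to every index on the table
    return 1 + index_count

def writes_per_update(affected_index_count: int) -> int:
    # Heap write plus each index whose indexed columns changed
    return 1 + affected_index_count

print(writes_per_insert(5))   # the 5-index orders table: 6 writes per INSERT
print(writes_per_update(2))   # status change hits status + composite index: 3
```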
Index Best Practices:

- Index columns used in WHERE, JOIN, and ORDER BY; drop indexes that statistics show are never scanned
- Prefer one well-ordered composite index over several overlapping single-column indexes
- Keep write-heavy tables lean: every extra index multiplies the cost of INSERT/UPDATE/DELETE
- Watch for index bloat and rebuild with REINDEX (or REINDEX CONCURRENTLY) when it grows

Python Code for Index Analysis

import psycopg2
from typing import List, Dict
import json

class IndexAnalyzer:
    """
    Analyze and optimize database indexes
    """

    def __init__(self, conn):
        self.conn = conn

    def explain_query(self, query: str, params: tuple = None) -> Dict:
        """
        Use EXPLAIN ANALYZE to understand query performance
        """
        cursor = self.conn.cursor()

        explain_query = f"EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {query}"
        cursor.execute(explain_query, params)

        result = cursor.fetchone()[0]
        return result[0]

    def find_unused_indexes(self) -> List[Dict]:
        """
        Find indexes that are never used (candidates for removal)
        Reads the built-in pg_stat_user_indexes statistics view
        """
        cursor = self.conn.cursor()

        query = """
        SELECT
            schemaname,
            relname AS tablename,
            indexrelname AS indexname,
            idx_scan,
            idx_tup_read,
            idx_tup_fetch,
            pg_size_pretty(pg_relation_size(indexrelid)) AS index_size
        FROM pg_stat_user_indexes
        WHERE idx_scan = 0  -- Never used since the last stats reset
        AND indexrelname NOT LIKE '%_pkey'  -- Skip primary keys
        ORDER BY pg_relation_size(indexrelid) DESC;
        """

        cursor.execute(query)
        return cursor.fetchall()

    def find_duplicate_indexes(self) -> List[Dict]:
        """
        Find exact duplicate indexes (same columns, operator classes,
        expressions, and predicate). Note: a redundant prefix index
        such as (a) alongside (a, b) is NOT detected by this query.
        """
        cursor = self.conn.cursor()

        query = """
        SELECT
            pg_size_pretty(SUM(pg_relation_size(idx))::BIGINT) AS total_size,
            (array_agg(idx))[1] AS idx1,
            (array_agg(idx))[2] AS idx2,
            (array_agg(idx))[3] AS idx3
        FROM (
            SELECT
                indexrelid::regclass AS idx,
                (indrelid::text ||E'\\n'|| indclass::text ||E'\\n'||
                 indkey::text ||E'\\n'|| COALESCE(indexprs::text,'')||E'\\n'||
                 COALESCE(indpred::text,'')) AS key
            FROM pg_index
        ) sub
        GROUP BY key
        HAVING COUNT(*) > 1
        ORDER BY SUM(pg_relation_size(idx)) DESC;
        """

        cursor.execute(query)
        return cursor.fetchall()

    def suggest_missing_indexes(self, min_seq_scans: int = 1000) -> List[str]:
        """
        Suggest indexes for tables with many sequential scans
        """
        cursor = self.conn.cursor()

        query = """
        SELECT
            schemaname,
            relname AS tablename,
            seq_scan,
            seq_tup_read,
            idx_scan,
            seq_scan - COALESCE(idx_scan, 0) AS too_much_seq,
            pg_size_pretty(pg_relation_size(relid)) AS table_size
        FROM pg_stat_user_tables
        WHERE seq_scan > %s
        AND (seq_scan - COALESCE(idx_scan, 0)) > 0
        ORDER BY seq_scan DESC;
        """

        cursor.execute(query, (min_seq_scans,))
        return cursor.fetchall()

    def check_index_bloat(self) -> List[Dict]:
        """
        Roughly estimate B-tree index bloat (candidates for REINDEX)
        Requires the pgstattuple extension (CREATE EXTENSION pgstattuple)
        """
        cursor = self.conn.cursor()

        query = """
        SELECT
            i.schemaname,
            i.relname AS tablename,
            i.indexrelname AS indexname,
            pg_size_pretty(pg_relation_size(i.indexrelid)) AS index_size,
            ROUND((100 - s.avg_leaf_density)::numeric, 2) AS bloat_pct
        FROM pg_stat_user_indexes i
        JOIN pg_class c ON c.oid = i.indexrelid
        JOIN pg_am am ON am.oid = c.relam AND am.amname = 'btree'
        CROSS JOIN LATERAL pgstatindex(i.indexrelid) s
        WHERE pg_relation_size(i.indexrelid) > 10 * 1024 * 1024  -- > 10 MB
        ORDER BY bloat_pct DESC;
        """

        cursor.execute(query)
        return cursor.fetchall()


# Example usage
def optimize_indexes():
    import psycopg2.extras

    conn = psycopg2.connect(
        host="localhost",
        database="mydb",
        user="user",
        password="password",
        # Return rows as dicts so results can be read by column name
        cursor_factory=psycopg2.extras.RealDictCursor
    )

    analyzer = IndexAnalyzer(conn)

    # Analyze a slow query
    query = """
    SELECT o.*, c.name, c.email
    FROM orders o
    JOIN customers c ON o.customer_id = c.customer_id
    WHERE o.status = 'pending'
    AND o.order_date > '2024-01-01'
    """

    explain = analyzer.explain_query(query)
    print("Query Plan:")
    print(json.dumps(explain, indent=2))

    # Find unused indexes
    unused = analyzer.find_unused_indexes()
    print(f"\nUnused indexes: {len(unused)}")
    for idx in unused:
        print(f"  - {idx['indexname']} ({idx['index_size']})")

    # Suggest missing indexes
    suggestions = analyzer.suggest_missing_indexes(min_seq_scans=1000)
    print(f"\nTables needing indexes: {len(suggestions)}")
    for table in suggestions:
        print(f"  - {table['tablename']}: {table['seq_scan']} seq scans")

3. Query Optimization

3.1 N+1 Query Problem

Common Performance Killer in ORMs
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import (declarative_base, sessionmaker, relationship,
                            joinedload, selectinload)

Base = declarative_base()
engine = create_engine('postgresql://user:password@localhost/mydb')
Session = sessionmaker(bind=engine)

class Customer(Base):
    __tablename__ = 'customers'

    customer_id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)

    orders = relationship("Order", back_populates="customer")

class Order(Base):
    __tablename__ = 'orders'

    order_id = Column(Integer, primary_key=True)
    customer_id = Column(Integer, ForeignKey('customers.customer_id'))
    total = Column(Integer)

    customer = relationship("Customer", back_populates="orders")


# ❌ N+1 Problem - Issues N+1 queries for N orders
def bad_get_orders_with_customers():
    """
    Performance disaster:
    - 1 query to get all orders
    - N queries to get customer for each order

    For 1000 orders = 1001 queries!
    """
    session = Session()

    # Query 1: Get all orders
    orders = session.query(Order).all()

    # Queries 2-1001: Get customer for each order
    for order in orders:
        print(f"Order {order.order_id}: {order.customer.name}")
        # This triggers a separate SELECT for each order!

    session.close()


# ✅ Solution 1: Eager Loading with JOIN
def good_get_orders_with_join():
    """
    Single query with JOIN
    """
    session = Session()

    # Single query with JOIN
    orders = session.query(Order)\
        .options(joinedload(Order.customer))\
        .all()

    for order in orders:
        print(f"Order {order.order_id}: {order.customer.name}")
        # No additional queries!

    session.close()


# ✅ Solution 2: Separate SELECT with IN clause
def good_get_orders_with_selectin():
    """
    Two queries total:
    1. Get all orders
    2. Get all customers WHERE customer_id IN (...)

    Better than JOIN for one-to-many with large result sets
    """
    session = Session()

    orders = session.query(Order)\
        .options(selectinload(Order.customer))\
        .all()

    for order in orders:
        print(f"Order {order.order_id}: {order.customer.name}")

    session.close()


# ✅ Solution 3: Raw SQL with JOIN
def manual_optimized_query():
    """
    Full control with raw SQL
    """
    session = Session()

    from sqlalchemy import text  # raw SQL strings must be wrapped in text()

    query = text("""
    SELECT
        o.order_id,
        o.total,
        c.customer_id,
        c.name,
        c.email
    FROM orders o
    JOIN customers c ON o.customer_id = c.customer_id
    """)

    results = session.execute(query).fetchall()

    for row in results:
        print(f"Order {row.order_id}: {row.name}")

    session.close()


# Performance comparison
def benchmark_queries():
    """
    Benchmark different approaches
    """
    import time

    print("N+1 Problem (BAD):")
    start = time.time()
    bad_get_orders_with_customers()
    print(f"Time: {time.time() - start:.2f}s\n")

    print("Joined Load (GOOD):")
    start = time.time()
    good_get_orders_with_join()
    print(f"Time: {time.time() - start:.2f}s\n")

    print("Select In Load (GOOD):")
    start = time.time()
    good_get_orders_with_selectin()
    print(f"Time: {time.time() - start:.2f}s\n")


# Django ORM Example
class DjangoNPlusOneProblem:
    """
    N+1 problem in Django
    """

    @staticmethod
    def bad_approach():
        """❌ N+1 queries"""
        from myapp.models import Order

        orders = Order.objects.all()
        for order in orders:
            print(order.customer.name)  # Triggers query per order!

    @staticmethod
    def good_approach():
        """✅ Single query with JOIN"""
        from myapp.models import Order

        orders = Order.objects.select_related('customer').all()
        for order in orders:
            print(order.customer.name)  # No additional queries!

    @staticmethod
    def many_to_many_case():
        """✅ Use prefetch_related for many-to-many"""
        from myapp.models import Order

        # Get orders with all items (many-to-many)
        orders = Order.objects.prefetch_related('items').all()
        for order in orders:
            for item in order.items.all():  # No N+1!
                print(item.name)

3.2 Query Optimization Techniques

class QueryOptimizationTechniques:
    """
    Advanced query optimization patterns
    """

    def __init__(self, conn):
        self.conn = conn

    def use_covering_index(self):
        """
        Use covering index to avoid table lookup
        """
        # ❌ Without covering index: Index Scan + Table Lookup
        query_bad = """
        SELECT customer_id, name, email, total_orders
        FROM customers
        WHERE customer_id = 12345;
        """

        # ✅ With covering index: Index-Only Scan
        # CREATE INDEX idx_customers_covering
        # ON customers(customer_id) INCLUDE (name, email, total_orders);
        query_good = """
        SELECT customer_id, name, email, total_orders
        FROM customers
        WHERE customer_id = 12345;
        -- Uses index-only scan, no table access needed!
        """

    def avoid_function_on_indexed_column(self):
        """
        Functions on indexed columns prevent index usage
        """
        # ❌ Bad: Function on the indexed column prevents index usage
        query_bad = """
        SELECT * FROM orders
        WHERE EXTRACT(YEAR FROM order_date) = 2024;
        -- (YEAR(order_date) in MySQL) Full table scan!
        """

        # ✅ Good: Use range query instead
        query_good = """
        SELECT * FROM orders
        WHERE order_date >= '2024-01-01'
        AND order_date < '2025-01-01';
        -- Uses index on order_date
        """

    def use_limit_with_index(self):
        """
        LIMIT with proper index is very fast
        """
        # ✅ Fast: Index scan with LIMIT
        query = """
        SELECT * FROM orders
        WHERE customer_id = 12345
        ORDER BY order_date DESC
        LIMIT 10;
        -- With index on (customer_id, order_date), this is O(log n + 10)
        """

    def batch_updates(self):
        """
        Batch updates instead of row-by-row
        """
        # ❌ Bad: 999 round trips to the server
        cursor = self.conn.cursor()
        for order_id in range(1, 1000):
            cursor.execute(
                "UPDATE orders SET status = 'shipped' WHERE order_id = %s",
                (order_id,)
            )
        self.conn.commit()

        # ✅ Good: One statement updates the same rows in a single round trip
        cursor = self.conn.cursor()
        cursor.execute(
            "UPDATE orders SET status = 'shipped' WHERE order_id = ANY(%s)",
            (list(range(1, 1000)),)
        )
        self.conn.commit()

    def use_exists_instead_of_in(self):
        """
        EXISTS can be faster than IN for subqueries
        """
        # ❌ IN with subquery (may be slow)
        query_in = """
        SELECT * FROM customers
        WHERE customer_id IN (
            SELECT customer_id FROM orders
            WHERE order_date > '2024-01-01'
        );
        """

        # ✅ EXISTS (often faster, stops at first match)
        query_exists = """
        SELECT * FROM customers c
        WHERE EXISTS (
            SELECT 1 FROM orders o
            WHERE o.customer_id = c.customer_id
            AND o.order_date > '2024-01-01'
        );
        """

    def partition_large_tables(self):
        """
        Use table partitioning for very large tables
        """
        # Create partitioned table
        query = """
        -- Parent table
        CREATE TABLE orders_partitioned (
            order_id BIGSERIAL,
            customer_id INT,
            order_date DATE,
            total DECIMAL(10,2),
            status VARCHAR(20)
        ) PARTITION BY RANGE (order_date);

        -- Partitions by month
        CREATE TABLE orders_2024_01 PARTITION OF orders_partitioned
            FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

        CREATE TABLE orders_2024_02 PARTITION OF orders_partitioned
            FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');

        -- Query automatically uses correct partition
        -- SELECT * FROM orders_partitioned
        -- WHERE order_date = '2024-01-15'
        -- Only scans orders_2024_01 partition!
        """
        return query

    def use_union_all_not_union(self):
        """
        UNION ALL is faster than UNION (no duplicate removal)
        """
        # ❌ UNION: Removes duplicates (slower)
        query_union = """
        SELECT customer_id, name FROM customers_us
        UNION
        SELECT customer_id, name FROM customers_eu;
        -- Sorts and removes duplicates
        """

        # ✅ UNION ALL: No deduplication (faster)
        query_union_all = """
        SELECT customer_id, name FROM customers_us
        UNION ALL
        SELECT customer_id, name FROM customers_eu;
        -- If you know there are no duplicates, use UNION ALL
        """

4. Connection Pooling

from psycopg2 import pool
from contextlib import contextmanager
import threading
import time

class DatabaseConnectionPool:
    """
    Connection pooling for better performance

    Why pooling?
    - Creating connections is expensive (TCP handshake, auth, etc.)
    - Reusing connections amortizes connection cost
    - Limits max connections to database
    """

    def __init__(self, min_conn=5, max_conn=20):
        self.pool = pool.ThreadedConnectionPool(
            min_conn,
            max_conn,
            host="localhost",
            database="mydb",
            user="user",
            password="password"
        )

    @contextmanager
    def get_connection(self):
        """
        Context manager for connection handling
        Automatically returns connection to pool
        """
        conn = self.pool.getconn()
        try:
            yield conn
            conn.commit()
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            self.pool.putconn(conn)

    def execute_query(self, query, params=None):
        """
        Execute query using pooled connection
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            return cursor.fetchall()

    def close_all(self):
        """
        Close all connections in pool
        """
        self.pool.closeall()

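The mechanics behind such a pool can be sketched without psycopg2 or a running database. A minimal, dependency-free version using `queue.Queue` (class and parameter names here are illustrative, not a real library API):

```python
import queue
import sqlite3
from contextlib import contextmanager

class TinyPool:
    """Minimal thread-safe pool: pre-create connections, hand them out, take them back."""

    def __init__(self, factory, size=5):
        self._q = queue.Queue(maxsize=size)
        for _ in range(size):
            self._q.put(factory())  # pay connection cost once, up front

    @contextmanager
    def connection(self, timeout=5):
        conn = self._q.get(timeout=timeout)  # blocks if pool is exhausted
        try:
            yield conn
        finally:
            self._q.put(conn)  # always return the connection to the pool

pool = TinyPool(lambda: sqlite3.connect(":memory:", check_same_thread=False), size=2)
with pool.connection() as conn:
    result = conn.execute("SELECT 1").fetchone()
print(result)  # (1,)
```

The blocking `get` is also what enforces the "limits max connections" property: a caller that wants a sixth connection from a five-connection pool waits instead of opening a new one.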

# SQLAlchemy connection pooling
from sqlalchemy import create_engine, pool as sa_pool

class SQLAlchemyPooling:
    """
    SQLAlchemy has built-in connection pooling
    """

    def create_engine_with_pooling(self):
        """
        Create engine with custom pool settings
        """
        engine = create_engine(
            'postgresql://user:password@localhost/mydb',

            # Pool settings
            poolclass=sa_pool.QueuePool,  # Default pool type
            pool_size=10,                 # Number of connections to maintain
            max_overflow=20,              # Max connections beyond pool_size
            pool_timeout=30,              # Seconds to wait for connection
            pool_recycle=3600,            # Recycle connections after 1 hour
            pool_pre_ping=True,           # Test connections before use

            # Performance settings
            echo=False,                   # Don't log SQL (production)
            pool_reset_on_return='rollback',  # Reset connection state
        )
        return engine

    def create_nullpool_for_serverless(self):
        """
        NullPool for serverless/lambda (no persistent connections)
        """
        engine = create_engine(
            'postgresql://user:password@localhost/mydb',
            poolclass=sa_pool.NullPool  # No pooling, create new connection each time
        )
        return engine


# Django connection pooling with django-db-pool
class DjangoPoolingSettings:
    """
    Django database settings with connection pooling
    """

    DATABASES = {
        'default': {
            'ENGINE': 'django_db_pool.backends.postgresql',
            'NAME': 'mydb',
            'USER': 'user',
            'PASSWORD': 'password',
            'HOST': 'localhost',
            'PORT': '5432',

            # Pool settings
            'POOL_OPTIONS': {
                'POOL_SIZE': 10,
                'MAX_OVERFLOW': 10,
                'RECYCLE': 3600,
                'TIMEOUT': 30,
            }
        }
    }


# Connection pool monitoring
class PoolMonitor:
    """
    Monitor connection pool health
    """

    def __init__(self, engine):
        self.engine = engine

    def get_pool_status(self):
        """
        Get current pool statistics
        """
        pool = self.engine.pool

        return {
            'size': pool.size(),              # Current pool size
            'checked_in': pool.checkedin(),   # Available connections
            'checked_out': pool.checkedout(), # In-use connections
            'overflow': pool.overflow(),      # Overflow connections
            'total': pool.size() + pool.overflow()
        }

    def log_pool_stats(self):
        """
        Log pool statistics for monitoring
        """
        stats = self.get_pool_status()
        print(f"""
        Pool Statistics:
        - Total connections: {stats['total']}
        - Available: {stats['checked_in']}
        - In use: {stats['checked_out']}
        - Overflow: {stats['overflow']}
        """)
Connection Pool Best Practices:
  - Always return connections to the pool (use a context manager, as above)
  - Enable pool_pre_ping (or equivalent) so stale connections are detected before use
  - Recycle connections periodically (pool_recycle) to avoid server-side idle timeouts
  - Size the pool to the workload; more connections is not always faster
  - Use NullPool (no pooling) in serverless environments with short-lived processes

5. Transaction Isolation Levels

Isolation Level   | Dirty Read | Non-Repeatable Read | Phantom Read | Use Case
Read Uncommitted  | Possible   | Possible            | Possible     | Analytics, approximate queries
Read Committed    | Prevented  | Possible            | Possible     | Default for most databases
Repeatable Read   | Prevented  | Prevented           | Possible     | Financial reports, consistency needed
Serializable      | Prevented  | Prevented           | Prevented    | Critical transactions (bank transfers)

import time

import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_READ_COMMITTED, \
    ISOLATION_LEVEL_REPEATABLE_READ, ISOLATION_LEVEL_SERIALIZABLE

class TransactionIsolation:
    """
    Demonstrate transaction isolation levels
    """

    def __init__(self, conn):
        self.conn = conn

    def read_committed_example(self):
        """
        Read Committed (default):
        - Sees only committed data
        - Each query sees latest committed data
        """
        self.conn.set_isolation_level(ISOLATION_LEVEL_READ_COMMITTED)

        cursor = self.conn.cursor()

        # Transaction 1: Read account balance
        cursor.execute("SELECT balance FROM accounts WHERE account_id = 1")
        balance1 = cursor.fetchone()[0]
        print(f"Balance read 1: {balance1}")

        # (Another transaction commits a change here)
        time.sleep(1)

        # Transaction 1: Read again (may see different value!)
        cursor.execute("SELECT balance FROM accounts WHERE account_id = 1")
        balance2 = cursor.fetchone()[0]
        print(f"Balance read 2: {balance2}")

        # Non-repeatable read: balance1 != balance2

    def repeatable_read_example(self):
        """
        Repeatable Read:
        - Snapshot of data at transaction start
        - Same query returns same results within transaction
        """
        self.conn.set_isolation_level(ISOLATION_LEVEL_REPEATABLE_READ)

        cursor = self.conn.cursor()

        # Start transaction
        cursor.execute("BEGIN")

        # Read 1
        cursor.execute("SELECT balance FROM accounts WHERE account_id = 1")
        balance1 = cursor.fetchone()[0]
        print(f"Balance read 1: {balance1}")

        # (Another transaction commits a change here)
        time.sleep(1)

        # Read 2 (same result as read 1!)
        cursor.execute("SELECT balance FROM accounts WHERE account_id = 1")
        balance2 = cursor.fetchone()[0]
        print(f"Balance read 2: {balance2}")

        assert balance1 == balance2  # Always true!

        self.conn.commit()

    def serializable_bank_transfer(self, from_account, to_account, amount):
        """
        Serializable: Full isolation, prevents all anomalies
        Use for critical operations like money transfers
        """
        self.conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)

        cursor = self.conn.cursor()

        try:
            # Check balance
            cursor.execute(
                "SELECT balance FROM accounts WHERE account_id = %s FOR UPDATE",
                (from_account,)
            )
            balance = cursor.fetchone()[0]

            if balance < amount:
                raise ValueError("Insufficient funds")

            # Debit from account
            cursor.execute("""
                UPDATE accounts
                SET balance = balance - %s
                WHERE account_id = %s
            """, (amount, from_account))

            # Credit to account
            cursor.execute("""
                UPDATE accounts
                SET balance = balance + %s
                WHERE account_id = %s
            """, (amount, to_account))

            self.conn.commit()
            print(f"Transfer complete: ${amount} from {from_account} to {to_account}")

        except psycopg2.extensions.TransactionRollbackError:
            # Serialization failure - retry transaction
            # (in production, bound the number of retries and add backoff/jitter)
            self.conn.rollback()
            print("Serialization conflict - retrying...")
            time.sleep(0.1)
            return self.serializable_bank_transfer(from_account, to_account, amount)

        except Exception as e:
            self.conn.rollback()
            raise e


class OptimisticLocking:
    """
    Optimistic locking with version column
    Better for low-contention scenarios
    """

    def __init__(self, conn):
        self.conn = conn

    def update_with_version(self, product_id, new_price):
        """
        Update using version column for optimistic locking
        """
        cursor = self.conn.cursor()

        # Read current version
        cursor.execute("""
            SELECT price, version
            FROM products
            WHERE product_id = %s
        """, (product_id,))

        current_price, current_version = cursor.fetchone()

        # Update only if version hasn't changed
        cursor.execute("""
            UPDATE products
            SET price = %s, version = version + 1
            WHERE product_id = %s
            AND version = %s
        """, (new_price, product_id, current_version))

        if cursor.rowcount == 0:
            raise ValueError("Concurrent modification detected - retry")

        self.conn.commit()
        print(f"Updated product {product_id} to ${new_price}")

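A caller is expected to retry when `update_with_version` detects a conflict. A retry wrapper can be sketched and exercised end-to-end with an in-memory SQLite table (the function name and schema here are illustrative; the version-check pattern is the same one used above):

```python
import sqlite3

def update_price_optimistic(conn, product_id, new_price, max_retries=3):
    """Optimistic update: re-read the row and retry when the version check fails."""
    for _ in range(max_retries):
        (version,) = conn.execute(
            "SELECT version FROM products WHERE product_id = ?",
            (product_id,)).fetchone()
        cur = conn.execute(
            "UPDATE products SET price = ?, version = version + 1 "
            "WHERE product_id = ? AND version = ?",
            (new_price, product_id, version))
        if cur.rowcount == 1:  # version matched: our update won
            conn.commit()
            return version + 1
        # another writer bumped the version first: loop and re-read
    raise RuntimeError("Gave up after repeated concurrent modifications")

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (product_id INTEGER PRIMARY KEY, "
             "price REAL, version INTEGER)")
conn.execute("INSERT INTO products VALUES (1, 9.99, 0)")
new_version = update_price_optimistic(conn, 1, 12.50)
print(new_version)  # 1
```

No rows are ever locked: conflicting writers simply lose the `WHERE version = ?` race and re-read, which is why this approach shines when conflicts are rare.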

class PessimisticLocking:
    """
    Pessimistic locking with SELECT FOR UPDATE
    Better for high-contention scenarios
    """

    def __init__(self, conn):
        self.conn = conn

    def reserve_seat(self, seat_id, user_id):
        """
        Lock row during reservation (prevents double-booking)
        """
        cursor = self.conn.cursor()

        try:
            # Lock the row
            cursor.execute("""
                SELECT status
                FROM seats
                WHERE seat_id = %s
                FOR UPDATE NOWAIT
            """, (seat_id,))

            status = cursor.fetchone()[0]

            if status != 'available':
                raise ValueError("Seat already reserved")

            # Reserve the seat
            cursor.execute("""
                UPDATE seats
                SET status = 'reserved', user_id = %s
                WHERE seat_id = %s
            """, (user_id, seat_id))

            self.conn.commit()
            print(f"Seat {seat_id} reserved for user {user_id}")

        except psycopg2.errors.LockNotAvailable:
            # NOWAIT raised immediately instead of blocking on the lock
            self.conn.rollback()
            print("Seat is being reserved by another user")
            raise

6. Interview Tips

Key Topics to Mention:
  - Normalization trade-offs (3NF vs deliberate denormalization for read performance)
  - Indexing strategy: composite index column order, covering indexes
  - Transaction isolation levels and which anomalies each prevents
  - Connection pooling and why creating connections is expensive
  - Optimistic vs pessimistic locking and when each fits

Common Mistakes to Avoid:
  - Over-indexing: every index slows down writes
  - UNION where UNION ALL would do (unnecessary sort and deduplication)
  - N+1 query patterns instead of joins or eager loading
  - Holding transactions open longer than necessary
  - Leaking connections so the pool is eventually exhausted
Performance Checklist:
  1. Add indexes on foreign keys and frequently filtered columns
  2. Use composite indexes for multi-column filters (order matters!)
  3. Implement connection pooling (min=5, max=20 is good starting point)
  4. Use eager loading to avoid N+1 queries
  5. Monitor slow query log and optimize top offenders
  6. Use covering indexes for frequently accessed column sets
  7. Partition very large tables (> 100GB)
  8. Use appropriate isolation level (don't default to SERIALIZABLE)
  9. Implement caching layer (Redis) for frequently read data
  10. Use read replicas for read-heavy workloads
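Item 4 (the N+1 problem) is worth being able to demonstrate concretely. A minimal sketch with an in-memory SQLite database (table names are illustrative) that counts round trips both ways:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customers (customer_id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE orders (order_id INTEGER PRIMARY KEY,
                         customer_id INTEGER, total REAL);
    INSERT INTO customers VALUES (1, 'Ada'), (2, 'Bob');
    INSERT INTO orders VALUES (10, 1, 50.0), (11, 1, 20.0), (12, 2, 75.0);
""")

# ❌ N+1: one query for the customers, then one query per customer
queries = 0
customers = conn.execute("SELECT customer_id, name FROM customers").fetchall()
queries += 1
for customer_id, name in customers:
    conn.execute("SELECT total FROM orders WHERE customer_id = ?",
                 (customer_id,)).fetchall()
    queries += 1
print(queries)  # 3 queries for 2 customers; grows linearly with N

# ✅ Single JOIN: one round trip regardless of customer count
rows = conn.execute("""
    SELECT c.name, o.total
    FROM customers c
    JOIN orders o ON o.customer_id = c.customer_id
""").fetchall()
print(len(rows))  # all 3 order rows fetched in one query
```

ORMs hide the loop inside lazy relationship loading, which is why eager loading (e.g. a join-based fetch) is the standard fix.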