Normalization, Indexing, Query Optimization, and Performance Tuning
| Normal Form | Rule | Purpose |
|---|---|---|
| 1NF | Atomic values, no repeating groups | Eliminate duplicate columns and ensure atomic values |
| 2NF | 1NF + No partial dependencies | Remove subsets of data that apply to multiple rows |
| 3NF | 2NF + No transitive dependencies | Remove columns not dependent on primary key |
| BCNF | 3NF + Every determinant is a candidate key | Handle anomalies not covered by 3NF |
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
from enum import Enum
# ❌ Denormalized - Poor Design
class DenormalizedOrder:
    """Counter-example: an order record with everything crammed in.

    Problems:
    - Customer info is repeated on every order (update anomalies)
    - The item list is a single comma-joined string (non-atomic values)
    - No way to reference customers or products independently
    """

    def __init__(self, order_id, customer_name, customer_email,
                 customer_address, items_string, total):
        # Identity and money first, duplicated customer data after.
        self.order_id = order_id
        self.total = total
        self.items = items_string  # e.g. "Laptop,Mouse,Keyboard"
        self.customer_name = customer_name
        self.customer_email = customer_email
        self.customer_address = customer_address
# ✅ Normalized to 3NF
@dataclass
class Customer:
    """1NF: Atomic values, unique identifier"""
    customer_id: int        # surrogate primary key
    name: str
    email: str              # NOTE(review): uniqueness not enforced here — confirm at DB level
    address: str            # kept as one atomic field (not split into street/city/...)
    created_at: datetime    # row creation timestamp
@dataclass
class Product:
    """1NF: Each product is a separate entity"""
    product_id: int     # primary key
    name: str
    description: str
    price: float        # current list price; orders snapshot it (see OrderItem.price_at_order)
    stock: int          # on-hand quantity
@dataclass
class Order:
    """
    2NF: No partial dependencies
    - All attributes depend on entire primary key (order_id)
    - customer_id is foreign key, not duplicated customer data
    """
    order_id: int
    customer_id: int        # Foreign key to Customer
    order_date: datetime
    total: float            # sum of line items, computed at order time
    status: str             # e.g. 'pending' (see DatabaseOperations.create_order)
@dataclass
class OrderItem:
    """
    3NF: No transitive dependencies
    - Logically identified by (order_id, product_id); order_item_id is a
      surrogate key on top of that composite
    - product_price stored at order time (historical snapshot, a
      deliberate denormalization, not a 3NF violation)
    - quantity is order-specific
    """
    order_item_id: int      # surrogate primary key
    order_id: int           # Foreign key to Order
    product_id: int         # Foreign key to Product
    quantity: int
    price_at_order: float   # Snapshot of price when ordered
# Database operations with normalized schema
class DatabaseOperations:
    """CRUD helpers that work against the normalized schema above."""

    def __init__(self, db_connection):
        # Any DB-API 2.0 connection using %s placeholders (psycopg2-style)
        # — TODO confirm against the actual driver in use.
        self.conn = db_connection

    def create_order(self, customer_id: int, items: List[tuple]) -> int:
        """
        Create an order plus its order_items rows.

        items: List of (product_id, quantity) tuples
        Returns: the new order_id
        Raises: ValueError if a product_id does not exist

        Fix vs. previous version: each product price is now read ONCE and
        the same snapshot feeds both the order total and
        order_items.price_at_order. The old code queried prices twice per
        product, so a concurrent price change could make the stored total
        disagree with the line items (and it doubled the round trips).
        """
        cursor = self.conn.cursor()
        # Snapshot current prices (one query per product).
        priced_items = []
        total = 0
        for product_id, quantity in items:
            cursor.execute(
                "SELECT price FROM products WHERE product_id = %s",
                (product_id,)
            )
            row = cursor.fetchone()
            if row is None:
                raise ValueError(f"Unknown product_id: {product_id}")
            price = row[0]
            priced_items.append((product_id, quantity, price))
            total += price * quantity
        # Insert the order header.
        cursor.execute("""
            INSERT INTO orders (customer_id, order_date, total, status)
            VALUES (%s, %s, %s, %s)
            RETURNING order_id
        """, (customer_id, datetime.now(), total, 'pending'))
        order_id = cursor.fetchone()[0]
        # Insert line items using the prices snapshotted above.
        for product_id, quantity, price_at_order in priced_items:
            cursor.execute("""
                INSERT INTO order_items
                (order_id, product_id, quantity, price_at_order)
                VALUES (%s, %s, %s, %s)
            """, (order_id, product_id, quantity, price_at_order))
        self.conn.commit()
        return order_id

    def get_order_details(self, order_id: int):
        """
        Retrieve one order's rows via JOIN across the normalized tables.
        Returns one row per order line item (order/customer data repeated).
        """
        cursor = self.conn.cursor()
        query = """
            SELECT
            o.order_id,
            o.order_date,
            o.total,
            o.status,
            c.name AS customer_name,
            c.email AS customer_email,
            p.name AS product_name,
            oi.quantity,
            oi.price_at_order
            FROM orders o
            JOIN customers c ON o.customer_id = c.customer_id
            JOIN order_items oi ON o.order_id = oi.order_id
            JOIN products p ON oi.product_id = p.product_id
            WHERE o.order_id = %s
        """
        cursor.execute(query, (order_id,))
        return cursor.fetchall()
class DenormalizationStrategies:
    """DDL generators illustrating deliberate denormalization.

    Breaking normal forms can pay off for:
    1. Read-heavy workloads (avoid expensive JOINs)
    2. Reporting/Analytics tables
    3. Caching computed values
    4. Historical snapshots
    """

    def create_order_summary_table(self):
        """DDL for a per-customer summary table for fast reporting.

        Aggregates are precomputed so reads avoid real-time calculation.
        """
        return """
        CREATE TABLE order_summaries (
        customer_id INT,
        customer_name VARCHAR(255), -- Denormalized
        customer_email VARCHAR(255), -- Denormalized
        total_orders INT, -- Computed
        total_spent DECIMAL(10,2), -- Computed
        last_order_date TIMESTAMP, -- Computed
        avg_order_value DECIMAL(10,2), -- Computed
        PRIMARY KEY (customer_id)
        );
        -- Materialized view or maintained by triggers
        -- Updated when orders change
        """

    def create_product_stats_table(self):
        """DDL for precomputed product statistics.

        Intended to be refreshed periodically (e.g. an hourly cron job).
        """
        return """
        CREATE TABLE product_statistics (
        product_id INT PRIMARY KEY,
        product_name VARCHAR(255), -- Denormalized
        total_sold INT, -- Computed
        revenue DECIMAL(12,2), -- Computed
        avg_rating DECIMAL(3,2), -- Computed
        last_updated TIMESTAMP
        );
        """

    def cache_computed_column(self):
        """DDL adding a cached column plus the trigger that maintains it.

        Trade-off: slower writes in exchange for faster reads.
        """
        return """
        ALTER TABLE orders
        ADD COLUMN item_count INT;
        -- Maintained by trigger
        CREATE TRIGGER update_item_count
        AFTER INSERT OR UPDATE OR DELETE ON order_items
        FOR EACH ROW
        EXECUTE FUNCTION update_order_item_count();
        """
A database index is a separate data structure that stores a subset of table data in a sorted, searchable format. Think of it like a book's index - instead of scanning every page (table scan), you look up the topic in the index and jump directly to the relevant pages.
Key Insight: Indexes trade disk space and write performance for dramatically faster reads.
B-Tree (Balanced Tree) is the default index type in PostgreSQL, MySQL, and most databases. Here's why it's so effective:
B-Tree for: SELECT * FROM users WHERE user_id = 12345
[10000, 20000, 30000] ← Root (Level 1)
/ | \
[5K,7.5K,10K] [15K,17.5K,20K] [25K,27.5K,30K] ← Internal (Level 2)
/ | \ / | \ / | \
[1-5K] [5-7.5K] ... [10-15K] ... [27.5-30K] ... ← Leaf nodes (Level 3)
↓ ↓ ↓ ← Point to table rows
Searching for user_id = 12345:
1. Root: 10000 < 12345 < 20000, follow the pointer between keys 10000 and 20000
2. Internal [15K,17.5K,20K]: 12345 < 15000, follow the leftmost pointer
3. Leaf [10-15K]: Found! Jump to row location
Node visits: 3 — only ~log(n) work (a flat binary search would need log₂(1,000,000) ≈ 20 comparisons)
Without index: ~500,000 comparisons (table scan)
Cardinality = Number of unique values in a column
Selectivity = Cardinality / Total Rows
| Column | Cardinality | Selectivity | Index Worth It? | Reason |
|---|---|---|---|---|
| user_id (PK) | 1,000,000 | 100% | ✅ Excellent | Every lookup finds ~1 row |
| email | 950,000 | 95% | ✅ Excellent | High uniqueness, frequent lookups |
| last_name | 50,000 | 5% | ✅ Good | Lookup finds ~20 rows, much better than scan |
| country | 200 | 0.02% | ⚠️ Maybe | Lookup finds ~5,000 rows, marginal benefit |
| gender | 3 | 0.0003% | ❌ Poor | Lookup finds ~333,333 rows, nearly a table scan! |
| is_active | 2 | 0.0002% | ❌ Very Poor | Returns 50% of table, slower than scan + filter |
-- ❌ BAD: Full index on low-cardinality column
CREATE INDEX idx_orders_status ON orders(status);
-- Problem: If 95% of orders are 'completed', index doesn't help much
-- ✅ GOOD: Partial index on minority status
CREATE INDEX idx_active_orders ON orders(customer_id, status)
WHERE status IN ('pending', 'processing', 'shipping');
-- Only indexes 5% of rows, fast for active order queries
-- Example query that benefits:
SELECT * FROM orders
WHERE customer_id = 12345
AND status = 'pending';
-- Uses partial index, scans only ~50 rows instead of 1M
A composite index on (A, B, C) is like a phone book sorted by (Last Name, First Name, City):
-- Example: E-commerce orders table
CREATE INDEX idx_orders_composite ON orders(customer_id, status, order_date);
-- ✅ Uses index efficiently:
SELECT * FROM orders WHERE customer_id = 123;
-- Uses leftmost column
SELECT * FROM orders WHERE customer_id = 123 AND status = 'shipped';
-- Uses first 2 columns
SELECT * FROM orders
WHERE customer_id = 123 AND status = 'shipped' AND order_date > '2024-01-01';
-- Uses all 3 columns (perfect!)
-- ⚠️ Uses index inefficiently:
SELECT * FROM orders WHERE status = 'shipped';
-- Can't use index, must scan all rows
SELECT * FROM orders WHERE order_date > '2024-01-01';
-- Can't use index, must scan all rows
-- ✅ Partial use:
SELECT * FROM orders
WHERE customer_id = 123 AND order_date > '2024-01-01';
-- Uses customer_id from index, but scans within that subset
-- Still better than full table scan!
-- Example: Finding user activity
-- Query: SELECT * FROM activity
-- WHERE user_id = 123 -- Equality, very selective
-- AND event_type = 'purchase' -- Equality, less selective
-- AND timestamp > '2024-01-01' -- Range filter
-- ORDER BY timestamp DESC;
-- ✅ GOOD column order:
CREATE INDEX idx_activity ON activity(user_id, event_type, timestamp DESC);
-- Reasoning:
-- 1. user_id first (equality, most selective - 1 in 100K)
-- 2. event_type second (equality, less selective - 1 in 10)
-- 3. timestamp last (range filter + sort column)
-- ❌ BAD column order:
CREATE INDEX idx_activity_bad ON activity(timestamp, event_type, user_id);
-- timestamp is range filter, can't use equality optimization
| Index Type | Data Structure | Best For | Complexity | Operators |
|---|---|---|---|---|
| B-Tree | Balanced tree | Range queries, sorting, most use cases | O(log n) | =, <, >, <=, >=, BETWEEN, IN, LIKE 'prefix%' |
| Hash | Hash table | Exact equality only, very fast | O(1) | = only |
| GiST | Generalized search tree | Geometric, full-text, custom types | Varies | &&, @>, <@, etc (geometry) |
| GIN | Generalized inverted index | Full-text search, JSON, arrays | O(log n) | @@, @>, ?, ?&, ?| |
| BRIN | Block range index | Very large tables, naturally ordered data | O(n) but small | =, <, >, <=, >=, BETWEEN |
| SP-GiST | Space-partitioned GiST | Non-balanced structures (quad-trees) | Varies | Specialized operators |
-- 1. B-Tree (default, 99% of use cases)
CREATE INDEX idx_orders_customer ON orders(customer_id);
CREATE INDEX idx_orders_date ON orders(order_date);
-- When: Almost always. Works for =, <, >, sorting, uniqueness
-- 2. Hash Index (PostgreSQL 10+, for equality only)
CREATE INDEX idx_products_sku ON products USING HASH(sku);
-- When: Large tables, only doing = lookups, never <, >, sorting
-- Slightly faster than B-tree for =, but limited use cases
-- 3. GIN Index (Inverted index for full-text search, JSON, arrays)
CREATE INDEX idx_products_search ON products
USING GIN(to_tsvector('english', name || ' ' || description));
-- Search products:
SELECT * FROM products
WHERE to_tsvector('english', name || ' ' || description) @@ to_tsquery('laptop');
-- JSON search:
CREATE INDEX idx_users_metadata ON users USING GIN(metadata jsonb_path_ops);
SELECT * FROM users WHERE metadata @> '{"premium": true}';
-- Array search:
CREATE INDEX idx_posts_tags ON posts USING GIN(tags);
SELECT * FROM posts WHERE tags @> ARRAY['postgresql', 'performance'];
-- 4. GiST Index (Geometric data, custom types)
CREATE INDEX idx_locations ON stores USING GIST(location);
-- Find nearby stores:
SELECT * FROM stores
WHERE location <-> point(40.7128, -74.0060) < 10; -- Within 10 units
-- 5. BRIN Index (Block Range Index - very small, for huge tables)
CREATE INDEX idx_logs_timestamp ON logs USING BRIN(timestamp);
-- When:
-- - Tables > 1GB with naturally ordered data (timestamps, IDs)
-- - Index size: ~1% of B-tree size
-- - Slight scan overhead but massive space savings
-- - Perfect for append-only logs, time-series data
-- 6. Partial Index (Only index subset of rows)
CREATE INDEX idx_active_users ON users(user_id)
WHERE deleted_at IS NULL AND is_active = true;
-- 10x smaller than full index if only 10% of users are active
-- 7. Expression Index (Index on computed value)
CREATE INDEX idx_users_lower_email ON users(LOWER(email));
-- Now case-insensitive search uses index:
SELECT * FROM users WHERE LOWER(email) = 'user@example.com';
-- 8. Covering Index (INCLUDE clause - avoid table lookup)
CREATE INDEX idx_orders_covering ON orders(customer_id)
INCLUDE (order_date, total, status);
-- Query uses index-only scan (never touches table!):
SELECT order_date, total, status
FROM orders
WHERE customer_id = 12345;
-- 9. Unique Index (Enforces uniqueness)
CREATE UNIQUE INDEX idx_users_email ON users(email);
-- Faster than UNIQUE constraint + creates index
-- 10. Multicolumn/Composite Index
CREATE INDEX idx_orders_customer_status ON orders(customer_id, status, order_date);
-- Supports queries on:
-- - (customer_id)
-- - (customer_id, status)
-- - (customer_id, status, order_date)
-- But NOT: (status) alone or (order_date) alone
An index that contains all columns needed for a query, so PostgreSQL never needs to look up the actual table rows.
Performance Impact: Can be 10-100x faster by eliminating random I/O to table.
-- Without covering index:
-- 1. Scan index to find matching rows (sequential I/O)
-- 2. Look up each row in table (random I/O - SLOW!)
-- With covering index:
-- 1. Scan index (sequential I/O)
-- 2. Done! All data is in the index.
-- Example: User dashboard query
SELECT user_id, last_login, total_orders, account_balance
FROM users
WHERE account_status = 'active'
AND last_login > '2024-01-01'
ORDER BY last_login DESC
LIMIT 100;
-- ❌ Regular index - still needs table lookups
CREATE INDEX idx_users_status_login ON users(account_status, last_login);
-- Index provides WHERE and ORDER BY
-- But SELECT columns require 100 table lookups
-- ✅ Covering index - no table lookups!
CREATE INDEX idx_users_covering ON users(account_status, last_login DESC)
INCLUDE (user_id, total_orders, account_balance);
-- Index contains everything: WHERE, ORDER BY, SELECT
-- Query uses "Index Only Scan" - 10x faster!
-- Check if query uses covering index:
EXPLAIN (ANALYZE, BUFFERS) SELECT ...;
-- Look for: "Index Only Scan" instead of "Index Scan"
Storage Cost: Indexes typically consume 20-30% of table size
Write Cost: Every INSERT/UPDATE/DELETE must update all indexes
Maintenance Cost: Indexes can become bloated and need REINDEX
-- Table with 5 indexes:
-- 1. PRIMARY KEY on id
-- 2. INDEX on customer_id
-- 3. INDEX on status
-- 4. INDEX on order_date
-- 5. INDEX on (customer_id, status)
-- Single INSERT:
INSERT INTO orders (customer_id, status, order_date, total)
VALUES (12345, 'pending', '2024-01-01', 99.99);
-- What actually happens:
-- 1. Insert row into table heap (1 write)
-- 2. Update PRIMARY KEY index (1 write)
-- 3. Update customer_id index (1 write)
-- 4. Update status index (1 write)
-- 5. Update order_date index (1 write)
-- 6. Update composite index (1 write)
-- Total: 6 writes for 1 INSERT!
-- UPDATE is even worse:
UPDATE orders SET status = 'shipped' WHERE order_id = 123;
-- Must update: table + status index + composite index = 3 writes
import psycopg2
from typing import List, Dict
import json
class IndexAnalyzer:
    """
    Analyze and optimize PostgreSQL indexes.

    Fixes vs. previous version:
    - Statistics queries referenced columns `tablename` / `indexname`,
      which do not exist in pg_stat_user_indexes / pg_stat_user_tables;
      the real columns are relname / indexrelname (aliased back here).
    - All report methods now return rows as dicts, matching their
      declared List[Dict] types (callers index rows by column name).
    """

    def __init__(self, conn):
        self.conn = conn

    @staticmethod
    def _rows_as_dicts(cursor) -> List[Dict]:
        """Materialize a finished cursor's result set as a list of dicts."""
        columns = [desc[0] for desc in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]

    def explain_query(self, query: str, params: tuple = None) -> Dict:
        """
        Run EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) and return the plan.
        NOTE: ANALYZE actually executes the query — don't pass destructive SQL.
        """
        cursor = self.conn.cursor()
        explain_query = f"EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {query}"
        cursor.execute(explain_query, params)
        result = cursor.fetchone()[0]
        return result[0]

    def find_unused_indexes(self) -> List[Dict]:
        """
        Find indexes that are never used (candidates for removal).
        Uses the built-in pg_stat_user_indexes view (no extension needed).
        """
        cursor = self.conn.cursor()
        query = """
        SELECT
        schemaname,
        relname AS tablename,
        indexrelname AS indexname,
        idx_scan,
        idx_tup_read,
        idx_tup_fetch,
        pg_size_pretty(pg_relation_size(indexrelid)) AS index_size
        FROM pg_stat_user_indexes
        WHERE idx_scan = 0 -- Never used
        AND indexrelname NOT LIKE 'pk_%' -- Skip primary keys
        ORDER BY pg_relation_size(indexrelid) DESC;
        """
        cursor.execute(query)
        return self._rows_as_dicts(cursor)

    def find_duplicate_indexes(self) -> List[Dict]:
        """
        Find duplicate or redundant indexes (same table, columns,
        expressions, and predicate).
        """
        cursor = self.conn.cursor()
        query = """
        SELECT
        pg_size_pretty(SUM(pg_relation_size(idx))::BIGINT) AS total_size,
        (array_agg(idx))[1] AS idx1,
        (array_agg(idx))[2] AS idx2,
        (array_agg(idx))[3] AS idx3
        FROM (
        SELECT
        indexrelid::regclass AS idx,
        (indrelid::text ||E'\n'|| indclass::text ||E'\n'||
        indkey::text ||E'\n'|| COALESCE(indexprs::text,'')||E'\n'||
        COALESCE(indpred::text,'')) AS key
        FROM pg_index
        ) sub
        GROUP BY key
        HAVING COUNT(*) > 1
        ORDER BY SUM(pg_relation_size(idx)) DESC;
        """
        cursor.execute(query)
        return self._rows_as_dicts(cursor)

    def suggest_missing_indexes(self, min_seq_scans: int = 1000) -> List[Dict]:
        """
        Suggest index candidates: tables with many sequential scans and
        fewer index scans than sequential scans.
        """
        cursor = self.conn.cursor()
        query = """
        SELECT
        schemaname,
        relname AS tablename,
        seq_scan,
        seq_tup_read,
        idx_scan,
        seq_scan - COALESCE(idx_scan, 0) AS too_much_seq,
        pg_size_pretty(pg_relation_size(relid)) AS table_size
        FROM pg_stat_user_tables
        WHERE seq_scan > %s
        AND (seq_scan - COALESCE(idx_scan, 0)) > 0
        ORDER BY seq_scan DESC;
        """
        cursor.execute(query, (min_seq_scans,))
        return self._rows_as_dicts(cursor)

    def check_index_bloat(self) -> List[Dict]:
        """
        List large indexes with a rough bloat figure.

        NOTE(review): pg_relation_size(x) already measures the 'main'
        fork, so the bloat_pct expression below always yields 0; a real
        estimate requires the pgstattuple extension. The column is kept
        for output compatibility — treat bloat_pct as a placeholder.
        """
        cursor = self.conn.cursor()
        query = """
        SELECT
        schemaname,
        relname AS tablename,
        indexrelname AS indexname,
        pg_size_pretty(pg_relation_size(indexrelid)) AS index_size,
        ROUND(100 * (pg_relation_size(indexrelid) -
        pg_relation_size(indexrelid, 'main'))::numeric /
        NULLIF(pg_relation_size(indexrelid), 0), 2) AS bloat_pct
        FROM pg_stat_user_indexes
        WHERE pg_relation_size(indexrelid) > 10000000 -- > 10MB
        ORDER BY bloat_pct DESC;
        """
        cursor.execute(query)
        return self._rows_as_dicts(cursor)
# Example usage
def optimize_indexes():
    """Run the IndexAnalyzer reports against a local database.

    Fix vs. previous version: the connection is now always closed
    (try/finally), even if one of the reports raises.
    """
    conn = psycopg2.connect(
        host="localhost",
        database="mydb",
        user="user",
        password="password"
    )
    try:
        analyzer = IndexAnalyzer(conn)
        # Analyze a slow query
        query = """
        SELECT o.*, c.name, c.email
        FROM orders o
        JOIN customers c ON o.customer_id = c.customer_id
        WHERE o.status = 'pending'
        AND o.order_date > '2024-01-01'
        """
        explain = analyzer.explain_query(query)
        print("Query Plan:")
        print(json.dumps(explain, indent=2))
        # Find unused indexes
        unused = analyzer.find_unused_indexes()
        print(f"\nUnused indexes: {len(unused)}")
        for idx in unused:
            print(f" - {idx['indexname']} ({idx['index_size']})")
        # Suggest missing indexes
        suggestions = analyzer.suggest_missing_indexes(min_seq_scans=1000)
        print(f"\nTables needing indexes: {len(suggestions)}")
        for table in suggestions:
            print(f" - {table['tablename']}: {table['seq_scan']} seq scans")
    finally:
        conn.close()
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship, joinedload, selectinload
from typing import List
import time
Base = declarative_base()
class Customer(Base):
    """SQLAlchemy ORM mapping for the customers table.

    NOTE(review): shadows the `Customer` dataclass defined earlier in
    this file — acceptable only because this is tutorial material.
    """
    __tablename__ = 'customers'
    customer_id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)
    # One-to-many: a customer owns multiple orders (see Order.customer).
    orders = relationship("Order", back_populates="customer")
class Order(Base):
    """SQLAlchemy ORM mapping for the orders table.

    NOTE(review): shadows the `Order` dataclass defined earlier in this file.
    """
    __tablename__ = 'orders'
    order_id = Column(Integer, primary_key=True)
    # FK backing the Order.customer relationship below.
    customer_id = Column(Integer, ForeignKey('customers.customer_id'))
    total = Column(Integer)  # NOTE(review): Integer total — presumably cents? confirm
    customer = relationship("Customer", back_populates="orders")
# ❌ N+1 Problem - Issues N+1 queries for N orders
def bad_get_orders_with_customers():
    """
    Anti-example: the N+1 query problem.
    - 1 query to get all orders
    - N lazy-load queries, one per order.customer access
    For 1000 orders = 1001 queries!

    Fix vs. previous version: the session is now closed even if the
    loop raises (try/finally).
    """
    # NOTE(review): assumes a module-level Session factory (sessionmaker)
    # is configured elsewhere — it is not defined in this file.
    session = Session()
    try:
        # Query 1: Get all orders
        orders = session.query(Order).all()
        # Queries 2..N+1: each attribute access lazily loads the customer
        for order in orders:
            print(f"Order {order.order_id}: {order.customer.name}")
            # This triggers a separate SELECT for each order!
    finally:
        session.close()
# ✅ Solution 1: Eager Loading with JOIN
def good_get_orders_with_join():
    """
    Eager-load customers with a JOIN — a single SQL query total.

    Fix vs. previous version: the session is now closed even if the
    loop raises (try/finally).
    """
    session = Session()
    try:
        # Single query with JOIN via joinedload
        orders = session.query(Order)\
            .options(joinedload(Order.customer))\
            .all()
        for order in orders:
            print(f"Order {order.order_id}: {order.customer.name}")
            # No additional queries!
    finally:
        session.close()
# ✅ Solution 2: Separate SELECT with IN clause
def good_get_orders_with_selectin():
    """
    Eager-load with a second SELECT ... WHERE customer_id IN (...):
    1. Get all orders
    2. Get all referenced customers in one IN query
    Often better than a JOIN for one-to-many with large result sets.

    Fix vs. previous version: the session is now closed even if the
    loop raises (try/finally).
    """
    session = Session()
    try:
        orders = session.query(Order)\
            .options(selectinload(Order.customer))\
            .all()
        for order in orders:
            print(f"Order {order.order_id}: {order.customer.name}")
    finally:
        session.close()
# ✅ Solution 3: Raw SQL with JOIN
def manual_optimized_query():
    """
    Full control with raw SQL.

    Fixes vs. previous version:
    - the SQL string is wrapped in text(): SQLAlchemy 1.4 deprecated and
      2.0 removed passing bare strings to Session.execute()
    - the session is closed in a finally block
    """
    from sqlalchemy import text  # local import keeps the demo self-contained
    session = Session()
    try:
        query = text("""
        SELECT
        o.order_id,
        o.total,
        c.customer_id,
        c.name,
        c.email
        FROM orders o
        JOIN customers c ON o.customer_id = c.customer_id
        """)
        results = session.execute(query).fetchall()
        for row in results:
            print(f"Order {row.order_id}: {row.name}")
    finally:
        session.close()
# Performance comparison
def benchmark_queries():
    """Time the three loading strategies and print elapsed seconds."""
    import time
    scenarios = (
        ("N+1 Problem (BAD):", bad_get_orders_with_customers),
        ("Joined Load (GOOD):", good_get_orders_with_join),
        ("Select In Load (GOOD):", good_get_orders_with_selectin),
    )
    # Same labels and output format as before, driven by a table.
    for label, run in scenarios:
        print(label)
        began = time.time()
        run()
        print(f"Time: {time.time() - began:.2f}s\n")
# Django ORM Example
class DjangoNPlusOneProblem:
    """
    N+1 problem in Django.

    Each method is a self-contained demo; the model import is local so
    this module can be read without a configured Django project.
    """

    @staticmethod
    def bad_approach():
        """❌ N+1 queries"""
        from myapp.models import Order
        orders = Order.objects.all()
        for order in orders:
            print(order.customer.name)  # Triggers query per order!

    @staticmethod
    def good_approach():
        """✅ Single query with JOIN"""
        from myapp.models import Order
        # select_related() follows the FK inside the same SQL query (JOIN).
        orders = Order.objects.select_related('customer').all()
        for order in orders:
            print(order.customer.name)  # No additional queries!

    @staticmethod
    def many_to_many_case():
        """✅ Use prefetch_related for many-to-many"""
        from myapp.models import Order
        # prefetch_related() runs a second query and stitches the results
        # together in Python — required for M2M / reverse FK relations.
        orders = Order.objects.prefetch_related('items').all()
        for order in orders:
            for item in order.items.all():  # No N+1!
                print(item.name)
class QueryOptimizationTechniques:
    """
    Advanced query optimization patterns.

    NOTE(review): most methods here are illustrative — they bind example
    SQL to local variables (query_bad / query_good) purely for
    side-by-side reading and never execute it. batch_updates() is the
    exception: it runs real statements against self.conn.
    """

    def __init__(self, conn):
        # DB-API connection; placeholders below use psycopg2-style %s.
        self.conn = conn

    def use_covering_index(self):
        """
        Use covering index to avoid table lookup
        """
        # ❌ Without covering index: Index Scan + Table Lookup
        query_bad = """
        SELECT customer_id, name, email, total_orders
        FROM customers
        WHERE customer_id = 12345;
        """
        # ✅ With covering index: Index-Only Scan
        # CREATE INDEX idx_customers_covering
        # ON customers(customer_id) INCLUDE (name, email, total_orders);
        query_good = """
        SELECT customer_id, name, email, total_orders
        FROM customers
        WHERE customer_id = 12345;
        -- Uses index-only scan, no table access needed!
        """

    def avoid_function_on_indexed_column(self):
        """
        Functions on indexed columns prevent index usage
        """
        # ❌ Bad: Function prevents index usage
        query_bad = """
        SELECT * FROM orders
        WHERE YEAR(order_date) = 2024;
        -- Full table scan!
        """
        # ✅ Good: Use range query instead (sargable predicate)
        query_good = """
        SELECT * FROM orders
        WHERE order_date >= '2024-01-01'
        AND order_date < '2025-01-01';
        -- Uses index on order_date
        """

    def use_limit_with_index(self):
        """
        LIMIT with proper index is very fast
        """
        # ✅ Fast: Index scan with LIMIT
        query = """
        SELECT * FROM orders
        WHERE customer_id = 12345
        ORDER BY order_date DESC
        LIMIT 10;
        -- With index on (customer_id, order_date), this is O(log n + 10)
        """

    def batch_updates(self):
        """
        Batch updates instead of row-by-row.

        NOTE(review): this method actually executes BOTH variants against
        self.conn to contrast them, and the two branches touch different
        row sets (ids 1-999 vs. up to 1000 'processing' rows) — demo only.
        """
        # ❌ Bad: Multiple round trips (999 separate UPDATE statements)
        cursor = self.conn.cursor()
        for order_id in range(1, 1000):
            cursor.execute(
                "UPDATE orders SET status = 'shipped' WHERE order_id = %s",
                (order_id,)
            )
        self.conn.commit()
        # ✅ Good: Single query
        cursor = self.conn.cursor()
        cursor.execute("""
        UPDATE orders
        SET status = 'shipped'
        WHERE order_id IN (
        SELECT order_id FROM orders
        WHERE status = 'processing'
        LIMIT 1000
        )
        """)
        self.conn.commit()

    def use_exists_instead_of_in(self):
        """
        EXISTS can be faster than IN for subqueries
        """
        # ❌ IN with subquery (may be slow)
        query_in = """
        SELECT * FROM customers
        WHERE customer_id IN (
        SELECT customer_id FROM orders
        WHERE order_date > '2024-01-01'
        );
        """
        # ✅ EXISTS (often faster, stops at first match)
        query_exists = """
        SELECT * FROM customers c
        WHERE EXISTS (
        SELECT 1 FROM orders o
        WHERE o.customer_id = c.customer_id
        AND o.order_date > '2024-01-01'
        );
        """

    def partition_large_tables(self):
        """
        Use table partitioning for very large tables.
        Returns the DDL string (PostgreSQL declarative range partitioning).
        """
        query = """
        -- Parent table
        CREATE TABLE orders_partitioned (
        order_id BIGSERIAL,
        customer_id INT,
        order_date DATE,
        total DECIMAL(10,2),
        status VARCHAR(20)
        ) PARTITION BY RANGE (order_date);
        -- Partitions by month
        CREATE TABLE orders_2024_01 PARTITION OF orders_partitioned
        FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
        CREATE TABLE orders_2024_02 PARTITION OF orders_partitioned
        FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
        -- Query automatically uses correct partition
        -- SELECT * FROM orders_partitioned
        -- WHERE order_date = '2024-01-15'
        -- Only scans orders_2024_01 partition!
        """
        return query

    def use_union_all_not_union(self):
        """
        UNION ALL is faster than UNION (no duplicate removal)
        """
        # ❌ UNION: Removes duplicates (slower)
        query_union = """
        SELECT customer_id, name FROM customers_us
        UNION
        SELECT customer_id, name FROM customers_eu;
        -- Sorts and removes duplicates
        """
        # ✅ UNION ALL: No deduplication (faster)
        query_union_all = """
        SELECT customer_id, name FROM customers_us
        UNION ALL
        SELECT customer_id, name FROM customers_eu;
        -- If you know there are no duplicates, use UNION ALL
        """
from psycopg2 import pool
from contextlib import contextmanager
import threading
import time
class DatabaseConnectionPool:
    """
    Connection pooling for better performance.

    Why pooling?
    - Creating connections is expensive (TCP handshake, auth, etc.)
    - Reusing connections amortizes connection cost
    - Limits max connections to the database

    Fixes vs. previous version: connection parameters are no longer
    hard-coded (overridable via kwargs, same defaults), `raise e` was
    replaced with a bare `raise` to preserve the traceback, and the
    cursor in execute_query is now closed.
    """

    def __init__(self, min_conn=5, max_conn=20, **conn_kwargs):
        """conn_kwargs override the default connection parameters
        (host/database/user/password); defaults match the old behavior."""
        params = {
            "host": "localhost",
            "database": "mydb",
            "user": "user",
            "password": "password",
        }
        params.update(conn_kwargs)
        self.pool = pool.ThreadedConnectionPool(min_conn, max_conn, **params)

    @contextmanager
    def get_connection(self):
        """
        Context manager for connection handling.
        Commits on success, rolls back on error, and always returns the
        connection to the pool.
        """
        conn = self.pool.getconn()
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise  # re-raise with the original traceback (was `raise e`)
        finally:
            self.pool.putconn(conn)

    def execute_query(self, query, params=None):
        """
        Run a query on a pooled connection and return all rows.
        NOTE: fetchall() assumes a row-returning statement (SELECT/RETURNING).
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            try:
                cursor.execute(query, params)
                return cursor.fetchall()
            finally:
                cursor.close()

    def close_all(self):
        """
        Close all connections in the pool.
        """
        self.pool.closeall()
# SQLAlchemy connection pooling
from sqlalchemy import create_engine, pool as sa_pool
class SQLAlchemyPooling:
    """Examples of SQLAlchemy's built-in connection pooling."""

    def create_engine_with_pooling(self):
        """Build an engine with explicitly tuned QueuePool settings."""
        pool_opts = dict(
            poolclass=sa_pool.QueuePool,  # default pool type
            pool_size=10,                 # number of connections to maintain
            max_overflow=20,              # max connections beyond pool_size
            pool_timeout=30,              # seconds to wait for a connection
            pool_recycle=3600,            # recycle connections after 1 hour
            pool_pre_ping=True,           # test connections before use
        )
        return create_engine(
            'postgresql://user:password@localhost/mydb',
            # Performance settings
            echo=False,                       # don't log SQL (production)
            pool_reset_on_return='rollback',  # reset connection state
            **pool_opts,
        )

    def create_nullpool_for_serverless(self):
        """NullPool: a fresh connection per use — suited to serverless/lambda."""
        return create_engine(
            'postgresql://user:password@localhost/mydb',
            poolclass=sa_pool.NullPool,  # no pooling at all
        )
# Django connection pooling with django-db-pool
class DjangoPoolingSettings:
    """
    Django database settings with connection pooling.

    Uses the third-party django-db-pool backend; POOL_OPTIONS keys mirror
    SQLAlchemy QueuePool parameters.
    """
    # Drop-in value for settings.DATABASES in a Django settings module.
    DATABASES = {
        'default': {
            'ENGINE': 'django_db_pool.backends.postgresql',
            'NAME': 'mydb',
            'USER': 'user',
            'PASSWORD': 'password',
            'HOST': 'localhost',
            'PORT': '5432',
            # Pool settings
            'POOL_OPTIONS': {
                'POOL_SIZE': 10,        # persistent connections kept open
                'MAX_OVERFLOW': 10,     # extra connections allowed under load
                'RECYCLE': 3600,        # recycle connections after 1 hour
                'TIMEOUT': 30,          # seconds to wait for a free connection
            }
        }
    }
# Connection pool monitoring
class PoolMonitor:
    """Report health statistics for a SQLAlchemy engine's connection pool."""

    def __init__(self, engine):
        self.engine = engine

    def get_pool_status(self):
        """Snapshot the current pool counters as a dict."""
        p = self.engine.pool
        size = p.size()              # current pool size
        available = p.checkedin()    # idle connections
        busy = p.checkedout()        # in-use connections
        extra = p.overflow()         # overflow connections
        return {
            'size': size,
            'checked_in': available,
            'checked_out': busy,
            'overflow': extra,
            'total': size + extra,
        }

    def log_pool_stats(self):
        """Print the pool counters in a human-readable form."""
        stats = self.get_pool_status()
        print(f"""
Pool Statistics:
- Total connections: {stats['total']}
- Available: {stats['checked_in']}
- In use: {stats['checked_out']}
- Overflow: {stats['overflow']}
""")
| Isolation Level | Dirty Read | Non-Repeatable Read | Phantom Read | Use Case |
|---|---|---|---|---|
| Read Uncommitted | Possible | Possible | Possible | Analytics, approximate queries |
| Read Committed | Prevented | Possible | Possible | Default for most databases |
| Repeatable Read | Prevented | Prevented | Possible | Financial reports, consistency needed |
| Serializable | Prevented | Prevented | Prevented | Critical transactions (bank transfers) |
import psycopg2
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_READ_COMMITTED, \
ISOLATION_LEVEL_REPEATABLE_READ, ISOLATION_LEVEL_SERIALIZABLE
class TransactionIsolation:
    """
    Demonstrate PostgreSQL transaction isolation levels via psycopg2.

    Fix vs. previous version: serializable_bank_transfer retried through
    unbounded recursion, which can exhaust the stack under sustained
    contention; it now retries in a loop (optional max_retries cap).
    """

    def __init__(self, conn):
        self.conn = conn

    def read_committed_example(self):
        """
        Read Committed (default):
        - Sees only committed data
        - Each statement sees the latest committed data, so two reads in
          the same transaction may disagree (non-repeatable read)
        """
        self.conn.set_isolation_level(ISOLATION_LEVEL_READ_COMMITTED)
        cursor = self.conn.cursor()
        # Read 1
        cursor.execute("SELECT balance FROM accounts WHERE account_id = 1")
        balance1 = cursor.fetchone()[0]
        print(f"Balance read 1: {balance1}")
        # (Another transaction may commit a change here)
        time.sleep(1)
        # Read 2 (may see a different value!)
        cursor.execute("SELECT balance FROM accounts WHERE account_id = 1")
        balance2 = cursor.fetchone()[0]
        print(f"Balance read 2: {balance2}")
        # Non-repeatable read: balance1 may differ from balance2

    def repeatable_read_example(self):
        """
        Repeatable Read:
        - Works against a snapshot taken at the transaction's first query
        - The same query returns the same result within the transaction
        """
        self.conn.set_isolation_level(ISOLATION_LEVEL_REPEATABLE_READ)
        cursor = self.conn.cursor()
        cursor.execute("BEGIN")
        # Read 1
        cursor.execute("SELECT balance FROM accounts WHERE account_id = 1")
        balance1 = cursor.fetchone()[0]
        print(f"Balance read 1: {balance1}")
        # (Another transaction commits a change here)
        time.sleep(1)
        # Read 2 (same result as read 1!)
        cursor.execute("SELECT balance FROM accounts WHERE account_id = 1")
        balance2 = cursor.fetchone()[0]
        print(f"Balance read 2: {balance2}")
        assert balance1 == balance2  # snapshot guarantees equality
        self.conn.commit()

    def serializable_bank_transfer(self, from_account, to_account, amount,
                                   max_retries=None):
        """
        Transfer money under SERIALIZABLE isolation (prevents all anomalies).

        max_retries=None retries forever on serialization conflicts,
        matching the original behavior; pass an int to cap the retries
        (the last TransactionRollbackError is re-raised when exceeded).
        Raises ValueError on insufficient funds.
        """
        self.conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
        attempts = 0
        while True:
            cursor = self.conn.cursor()
            try:
                # Check balance (FOR UPDATE locks the row)
                cursor.execute(
                    "SELECT balance FROM accounts WHERE account_id = %s FOR UPDATE",
                    (from_account,)
                )
                balance = cursor.fetchone()[0]
                if balance < amount:
                    raise ValueError("Insufficient funds")
                # Debit from account
                cursor.execute("""
                    UPDATE accounts
                    SET balance = balance - %s
                    WHERE account_id = %s
                """, (amount, from_account))
                # Credit to account
                cursor.execute("""
                    UPDATE accounts
                    SET balance = balance + %s
                    WHERE account_id = %s
                """, (amount, to_account))
                self.conn.commit()
                print(f"Transfer complete: ${amount} from {from_account} to {to_account}")
                return
            except psycopg2.extensions.TransactionRollbackError:
                # Serialization failure - retry transaction
                self.conn.rollback()
                attempts += 1
                if max_retries is not None and attempts > max_retries:
                    raise
                print("Serialization conflict - retrying...")
                time.sleep(0.1)
            except Exception:
                self.conn.rollback()
                raise  # preserve original traceback (was `raise e`)
class OptimisticLocking:
    """
    Optimistic locking with a version column.
    Better for low-contention scenarios.

    Fixes vs. previous version: a missing product no longer crashes with
    TypeError (fetchone() returning None), the transaction is rolled back
    before raising on a conflict, and the unused price read was dropped.
    """

    def __init__(self, conn):
        self.conn = conn

    def update_with_version(self, product_id, new_price):
        """
        Update a product's price using the version column.

        Raises ValueError if the product does not exist or was modified
        concurrently (caller should re-read and retry).
        """
        cursor = self.conn.cursor()
        # Read current version
        cursor.execute("""
            SELECT version
            FROM products
            WHERE product_id = %s
        """, (product_id,))
        row = cursor.fetchone()
        if row is None:
            self.conn.rollback()
            raise ValueError(f"Product {product_id} not found")
        current_version = row[0]
        # Update only if the version hasn't changed in the meantime
        cursor.execute("""
            UPDATE products
            SET price = %s, version = version + 1
            WHERE product_id = %s
            AND version = %s
        """, (new_price, product_id, current_version))
        if cursor.rowcount == 0:
            # Someone else bumped the version first — release the txn.
            self.conn.rollback()
            raise ValueError("Concurrent modification detected - retry")
        self.conn.commit()
        print(f"Updated product {product_id} to ${new_price}")
class PessimisticLocking:
    """
    Pessimistic locking with SELECT FOR UPDATE.
    Better for high-contention scenarios.

    Fixes vs. previous version: LockNotAvailable is exposed under
    psycopg2.errors (psycopg2 >= 2.8), not psycopg2.extensions, so the
    old except clause raised AttributeError; all failure paths now roll
    back so the row lock is released; a missing seat raises ValueError
    instead of crashing on fetchone() returning None.
    """

    def __init__(self, conn):
        self.conn = conn

    def reserve_seat(self, seat_id, user_id):
        """
        Lock the seat row during reservation (prevents double-booking).
        Raises ValueError if the seat is missing or already reserved;
        re-raises LockNotAvailable if another transaction holds the lock.
        """
        cursor = self.conn.cursor()
        try:
            # Lock the row; NOWAIT fails fast instead of blocking.
            cursor.execute("""
                SELECT status
                FROM seats
                WHERE seat_id = %s
                FOR UPDATE NOWAIT
            """, (seat_id,))
            row = cursor.fetchone()
            if row is None:
                raise ValueError("Seat does not exist")
            if row[0] != 'available':
                raise ValueError("Seat already reserved")
            # Reserve the seat
            cursor.execute("""
                UPDATE seats
                SET status = 'reserved', user_id = %s
                WHERE seat_id = %s
            """, (user_id, seat_id))
            self.conn.commit()
            print(f"Seat {seat_id} reserved for user {user_id}")
        except psycopg2.errors.LockNotAvailable:
            self.conn.rollback()
            print("Seat is being reserved by another user")
            raise
        except Exception:
            # Release the row lock if the reservation failed for any reason.
            self.conn.rollback()
            raise