Logs, Metrics, Traces, and Production Debugging
| Pillar | What | When to Use | Tools |
|---|---|---|---|
| Logs | Discrete events with timestamps | Debugging specific requests, audit trails | ELK, Loki, CloudWatch |
| Metrics | Numerical measurements over time | Monitoring system health, capacity planning | Prometheus, Grafana, Datadog |
| Traces | Request journey across services | Understanding latency, distributed debugging | Jaeger, Zipkin, OpenTelemetry |
Observability = Logs + Metrics + Traces working together
| Level | Purpose | Example |
|---|---|---|
| DEBUG | Development debugging | Variable values, function entry/exit |
| INFO | Normal operations | Request started, user logged in |
| WARNING | Unexpected but handled | Deprecated API used, retry attempt |
| ERROR | Error occurred, needs attention | Database connection failed, 500 error |
| CRITICAL | System unstable, urgent action needed | Out of memory, disk full |
import json
import logging
import traceback
from datetime import datetime, timezone
from typing import Any, Dict
class StructuredLogger:
    """
    Structured logging with JSON output.

    Why structured logs?
    - Machine-parseable (easy aggregation)
    - Consistent format
    - Easy to search/filter
    - Better for log aggregation systems (ELK, Splunk)
    """

    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)
        self.logger.setLevel(logging.DEBUG)
        # logging.getLogger() returns a process-wide singleton per name, so
        # constructing StructuredLogger twice with the same service_name
        # would otherwise attach a second handler and emit every record
        # twice. Only attach the JSON handler once.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(self.JSONFormatter())
            self.logger.addHandler(handler)

    class JSONFormatter(logging.Formatter):
        """Render each LogRecord as one JSON object per line."""

        def format(self, record: logging.LogRecord) -> str:
            log_data: Dict[str, Any] = {
                # Timezone-aware UTC; datetime.utcnow() is deprecated
                # since Python 3.12.
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'level': record.levelname,
                'service': record.name,
                'message': record.getMessage(),
                'module': record.module,
                'function': record.funcName,
                'line': record.lineno,
            }
            # Fields passed via extra={'extra': {...}} land on the record
            # as the attribute 'extra'; merge them into the output.
            if hasattr(record, 'extra'):
                log_data.update(record.extra)
            # Include full exception details when logging inside except:
            if record.exc_info:
                log_data['exception'] = {
                    'type': record.exc_info[0].__name__,
                    'message': str(record.exc_info[1]),
                    'traceback': traceback.format_exception(*record.exc_info),
                }
            return json.dumps(log_data)

    def debug(self, message: str, **kwargs):
        """Log debug with extra context."""
        self.logger.debug(message, extra={'extra': kwargs})

    def info(self, message: str, **kwargs):
        """Log info with extra context."""
        self.logger.info(message, extra={'extra': kwargs})

    def warning(self, message: str, **kwargs):
        """Log warning with extra context."""
        self.logger.warning(message, extra={'extra': kwargs})

    def error(self, message: str, **kwargs):
        """Log error with extra context."""
        self.logger.error(message, extra={'extra': kwargs})

    def critical(self, message: str, **kwargs):
        """Log critical with extra context."""
        self.logger.critical(message, extra={'extra': kwargs})
# Usage example
logger = StructuredLogger('api-service')
# Log with structured context: every keyword argument becomes a
# top-level field in the emitted JSON object.
logger.info(
    "User login successful",
    user_id=12345,
    ip_address="192.168.1.1",
    session_id="abc-123"
)
# Output:
# {
#   "timestamp": "2024-01-15T10:30:00.123456",
#   "level": "INFO",
#   "service": "api-service",
#   "message": "User login successful",
#   "user_id": 12345,
#   "ip_address": "192.168.1.1",
#   "session_id": "abc-123"
# }
# Request tracking via contextvars
import contextvars

# Per-request correlation values; ContextVar values propagate correctly
# across async tasks (unlike plain globals or thread-locals).
request_id_var = contextvars.ContextVar('request_id', default=None)
user_id_var = contextvars.ContextVar('user_id', default=None)


class RequestContextLogger:
    """
    Logger wrapper that automatically includes request context.

    Useful for tracking requests across async code: every log line is
    stamped with the current request_id/user_id without each call site
    passing them explicitly.
    """

    def __init__(self, logger: "StructuredLogger"):
        self.logger = logger

    def _with_context(self, kwargs: dict) -> dict:
        """Merge the ambient request context into explicit kwargs.

        Explicit kwargs win over the ambient values on key collision.
        """
        return {
            'request_id': request_id_var.get(),
            'user_id': user_id_var.get(),
            **kwargs,
        }

    def info(self, message: str, **kwargs):
        """Log info with request context attached."""
        self.logger.info(message, **self._with_context(kwargs))

    def warning(self, message: str, **kwargs):
        """Log warning with request context attached."""
        self.logger.warning(message, **self._with_context(kwargs))

    def error(self, message: str, **kwargs):
        """Log error with request context attached."""
        self.logger.error(message, **self._with_context(kwargs))
# Flask integration
from flask import Flask, g, request
import uuid

app = Flask(__name__)
# Structured JSON logger shared by the request hooks below
logger = StructuredLogger('api')
@app.before_request
def before_request():
    """Attach a correlation ID and start timestamp to the incoming request."""
    # Honour an upstream-provided X-Request-ID so log lines correlate
    # across services; mint a fresh UUID otherwise.
    incoming_id = request.headers.get('X-Request-ID')
    g.request_id = incoming_id if incoming_id is not None else str(uuid.uuid4())
    g.start_time = datetime.utcnow()
    logger.info(
        "Request started",
        request_id=g.request_id,
        method=request.method,
        path=request.path,
        ip=request.remote_addr,
    )
@app.after_request
def after_request(response):
    """Log request completion, including wall-clock duration."""
    elapsed = (datetime.utcnow() - g.start_time).total_seconds()
    completion_fields = {
        'request_id': g.request_id,
        'status_code': response.status_code,
        'duration_seconds': elapsed,
        'method': request.method,
        'path': request.path,
    }
    logger.info("Request completed", **completion_fields)
    # after_request hooks must hand the response back to Flask
    return response
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
from functools import wraps
class PrometheusMetrics:
    """
    Prometheus metrics for the RED method (Rate, Errors, Duration).

    Prometheus concepts:
    - Counter: Monotonically increasing (requests, errors)
    - Gauge: Can go up/down (active connections, queue size)
    - Histogram: Distribution of values (latency, request size)
    - Summary: Similar to histogram, calculates quantiles
    """

    # Rate: total requests, partitioned by method/endpoint/status
    request_count = Counter(
        'http_requests_total',
        'Total HTTP requests',
        ['method', 'endpoint', 'status_code']
    )

    # Errors: failed requests, partitioned by exception type
    error_count = Counter(
        'http_errors_total',
        'Total HTTP errors',
        ['method', 'endpoint', 'error_type']
    )

    # Duration: latency distribution
    request_duration = Histogram(
        'http_request_duration_seconds',
        'HTTP request duration',
        ['method', 'endpoint'],
        buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0]  # Latency buckets
    )

    # In-flight requests (incremented on entry, decremented on exit)
    active_requests = Gauge(
        'http_requests_active',
        'Active HTTP requests',
        ['method', 'endpoint']
    )

    @classmethod
    def track_request(cls, method: str, endpoint: str):
        """Decorator that records RED metrics around a request handler."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                labels = {'method': method, 'endpoint': endpoint}
                cls.active_requests.labels(**labels).inc()
                started = time.time()
                try:
                    outcome = func(*args, **kwargs)
                except Exception as exc:
                    # Count the failure both as an error (by type) and as
                    # a 500-status request, then let it propagate.
                    cls.error_count.labels(error_type=type(exc).__name__, **labels).inc()
                    cls.request_count.labels(status_code=500, **labels).inc()
                    raise
                else:
                    # NOTE(review): assumes every non-raising handler is a
                    # 200 — confirm if handlers can return other statuses.
                    cls.request_count.labels(status_code=200, **labels).inc()
                    return outcome
                finally:
                    # Always observe latency and release the in-flight slot
                    cls.request_duration.labels(**labels).observe(time.time() - started)
                    cls.active_requests.labels(**labels).dec()
            return wrapper
        return decorator
# Flask integration
from flask import Flask, jsonify

app = Flask(__name__)
# Start Prometheus metrics server on port 8000 (separate from the app port)
start_http_server(8000)
# The route must declare the parameter it passes to the view: the bare
# '/api/users/' pattern never bound user_id, so Flask would invoke
# get_user() without its required argument and fail with a 500.
@app.route('/api/users/<int:user_id>')
@PrometheusMetrics.track_request('GET', '/api/users/:id')
def get_user(user_id):
    """Tracked endpoint returning a stub user record."""
    # Simulate work so the latency histogram has something to observe
    time.sleep(0.1)
    return jsonify({"id": user_id, "name": "John"})
# Custom business metrics
class BusinessMetrics:
    """
    Track business-specific metrics.

    These counters/gauges feed product dashboards rather than
    infrastructure alerting.
    """

    # Revenue metrics, broken down by product and country
    orders_total = Counter(
        'orders_total',
        'Total orders',
        ['product', 'country']
    )
    # Revenue is recorded in cents (integer-safe, avoids float rounding)
    revenue_total = Counter(
        'revenue_total',
        'Total revenue in cents',
        ['product', 'country']
    )

    # User activity
    user_signups = Counter('user_signups_total', 'Total user signups')
    active_users = Gauge('active_users', 'Currently active users')

    # System metrics
    database_connections = Gauge(
        'database_connections_active',
        'Active database connections'
    )
    cache_hit_rate = Gauge('cache_hit_rate', 'Cache hit rate percentage')
import psutil
from prometheus_client import Gauge
class SystemMetrics:
    """
    System resource metrics (USE method: Utilization, Saturation, Errors).
    """

    # CPU metrics
    cpu_utilization = Gauge('cpu_utilization_percent', 'CPU utilization %')
    cpu_saturation = Gauge('cpu_load_average', 'CPU load average (1min)')

    # Memory metrics
    memory_utilization = Gauge('memory_utilization_percent', 'Memory utilization %')
    memory_available = Gauge('memory_available_bytes', 'Available memory in bytes')

    # Disk metrics
    disk_utilization = Gauge('disk_utilization_percent', 'Disk utilization %', ['device'])
    disk_io_saturation = Gauge('disk_io_wait_percent', 'Disk I/O wait %')

    # Network metrics
    network_utilization = Gauge('network_utilization_percent', 'Network utilization %', ['interface'])
    network_errors = Gauge('network_errors_total', 'Network errors', ['interface'])

    @classmethod
    def collect_metrics(cls):
        """
        Collect one sample of system metrics.

        Intended to run in a background thread every ~15 seconds.
        Note: psutil.cpu_percent(interval=1) blocks for one second.
        """
        # CPU
        cls.cpu_utilization.set(psutil.cpu_percent(interval=1))
        cls.cpu_saturation.set(psutil.getloadavg()[0])  # 1-minute load average

        # Memory
        memory = psutil.virtual_memory()
        cls.memory_utilization.set(memory.percent)
        cls.memory_available.set(memory.available)

        # Disk usage per partition; some mounts are unreadable without root
        for partition in psutil.disk_partitions():
            try:
                usage = psutil.disk_usage(partition.mountpoint)
                cls.disk_utilization.labels(device=partition.device).set(usage.percent)
            except PermissionError:
                pass

        # Disk I/O wait needs a delta between two samples, so it is
        # intentionally not recorded here (the original fetched the raw
        # counters into an unused local; that dead assignment is removed).

        # Network error counters per interface
        for interface, counters in psutil.net_io_counters(pernic=True).items():
            cls.network_errors.labels(interface=interface).set(
                counters.errin + counters.errout
            )
# Start metrics collection in background
import threading

def metrics_collector():
    """Background thread for metrics collection"""
    # Infinite sampling loop; the thread dies with the process because
    # it is started as a daemon below.
    while True:
        SystemMetrics.collect_metrics()
        time.sleep(15)  # Collect every 15 seconds

# Start collector thread (daemon=True so it never blocks interpreter exit)
collector_thread = threading.Thread(target=metrics_collector, daemon=True)
collector_thread.start()
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
import requests

# Setup tracing: every span carries the service.name resource attribute
resource = Resource.create({"service.name": "api-service"})
tracer_provider = TracerProvider(resource=resource)
# Export spans to a local Jaeger agent
jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)
# BatchSpanProcessor buffers spans and ships them off the request path
tracer_provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer(__name__)
class DistributedTracing:
    """
    Distributed tracing with OpenTelemetry.

    Traces show:
    - Request path across services
    - Time spent in each service
    - Where bottlenecks are
    - Errors in distributed system
    """

    @staticmethod
    def traced_operation(name: str):
        """Decorator that wraps a function call in a named span."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                with tracer.start_as_current_span(name) as span:
                    # Tag the span with the wrapped function's name
                    span.set_attribute("function", func.__name__)
                    try:
                        result = func(*args, **kwargs)
                        span.set_attribute("result.success", True)
                        return result
                    except Exception as e:
                        # Record the exception on the span, mark failure,
                        # and re-raise so callers still see it
                        span.record_exception(e)
                        span.set_attribute("result.success", False)
                        raise
            return wrapper
        return decorator

    @staticmethod
    def create_child_span(name: str, parent_span=None):
        """
        Create a child span as a context manager.

        Useful for:
        - Multiple operations within a function
        - Async operations
        - Custom instrumentation

        Fix: the original returned a raw Span (tracer.start_span) in the
        parent_span branch but a context manager in the other, and the raw
        span was never activated as "current", so further children would
        not nest under it. Both branches now return a
        start_as_current_span context manager.
        """
        if parent_span:
            ctx = trace.set_span_in_context(parent_span)
            return tracer.start_as_current_span(name, context=ctx)
        return tracer.start_as_current_span(name)
# Example: Multi-service trace
@DistributedTracing.traced_operation("handle_order")
def handle_order(order_id: int):
    """
    Process an order across multiple services.

    Trace will show:
    handle_order
    ├── validate_order
    ├── check_inventory (external service)
    ├── process_payment (external service)
    └── send_confirmation
    """
    # Local validation step
    with tracer.start_as_current_span("validate_order") as validation_span:
        validation_span.set_attribute("order_id", order_id)
        validate_order(order_id)

    # Inventory lookup on a downstream service; the trace context rides
    # along in the HTTP headers so the downstream spans nest under this one.
    with tracer.start_as_current_span("check_inventory") as inventory_span:
        inventory_span.set_attribute("order_id", order_id)
        response = requests.get(
            f"http://inventory-service/check/{order_id}",
            headers=inject_trace_context()
        )
        inventory_span.set_attribute("http.status_code", response.status_code)

    # Charge the customer via the payment service
    with tracer.start_as_current_span("process_payment"):
        response = requests.post(
            "http://payment-service/charge",
            json={"order_id": order_id, "amount": 100},
            headers=inject_trace_context()
        )

    # Confirmation email
    with tracer.start_as_current_span("send_confirmation"):
        send_email(order_id)
def inject_trace_context() -> dict:
    """
    Return HTTP headers carrying the current trace context.

    Propagates the active trace to downstream services (adds the
    traceparent/tracestate headers).
    """
    from opentelemetry.propagate import inject
    carrier: dict = {}
    inject(carrier)  # mutates the carrier dict in place
    return carrier
def extract_trace_context(headers: dict):
    """
    Build an OpenTelemetry context from incoming request headers.

    This continues the trace started by the upstream service.
    """
    from opentelemetry.propagate import extract
    return extract(headers)
# Flask integration
@app.route('/api/orders', methods=['POST'])
def create_order():
    """
    Traced API endpoint.

    Continues the upstream trace (if any) and annotates the span with
    order metadata before handing off to process_order.
    """
    # Pick up the caller's trace context from the request headers
    incoming_ctx = extract_trace_context(dict(request.headers))
    with tracer.start_as_current_span("create_order", context=incoming_ctx) as span:
        payload = request.get_json()
        span.set_attribute("order.items", len(payload.get('items', [])))
        span.set_attribute("order.total", payload.get('total'))
        new_order_id = process_order(payload)
        span.set_attribute("order.id", new_order_id)
        return jsonify({"order_id": new_order_id})
# Async tracing example
import asyncio

async def async_traced_operation():
    """
    Demonstrate span nesting with async/await.

    Child spans opened inside the coroutine nest under "async_parent".
    """
    with tracer.start_as_current_span("async_parent"):
        # Child span 1: load the user record
        with tracer.start_as_current_span("fetch_user"):
            user = await fetch_user_from_db(123)
        # Child span 2: load that user's orders
        with tracer.start_as_current_span("fetch_orders"):
            orders = await fetch_orders_from_db(user.id)
        return user, orders
| Term | Meaning | Example |
|---|---|---|
| SLI (Service Level Indicator) | Quantitative measure of service level | 99.9% of requests succeed; 95% of requests < 200ms |
| SLO (Service Level Objective) | Target value for an SLI | API uptime ≥ 99.9%; P95 latency < 200ms |
| SLA (Service Level Agreement) | Contract with consequences if the SLO is not met | 99.9% uptime guaranteed or 10% monthly credit |
| Error Budget | Acceptable amount of downtime | 0.1% (43.8 min/month for 99.9%) |
class SLOMonitor:
    """
    Monitor SLOs and error budgets.

    Common SLIs:
    - Availability (uptime %)
    - Latency (P50, P95, P99)
    - Error rate (%)
    - Throughput (requests/sec)
    """

    def __init__(self, redis_client):
        # Redis client used as a simple time-series store: sorted sets
        # scored by unix timestamp.
        self.cache = redis_client

    def track_request(self, success: bool, latency_ms: float):
        """
        Record one request outcome for SLI calculation.

        Fix: members now carry a uuid nonce so two requests landing on
        the same timestamp (and latency) do not collapse into a single
        sorted-set member — zadd silently overwrites identical members,
        which undercounted traffic. The format stays parse-compatible:
        '<ts>:<outcome-or-latency>:<nonce>'.
        """
        timestamp = time.time()
        nonce = uuid.uuid4().hex
        # Outcome sample
        self.cache.zadd('sli:requests', {
            f'{timestamp}:{"success" if success else "failure"}:{nonce}': timestamp
        })
        # Latency sample (successful requests only)
        if success:
            self.cache.zadd('sli:latency', {
                f'{timestamp}:{latency_ms}:{nonce}': timestamp
            })
        # Trim old data (keep last 30 days)
        cutoff = timestamp - (30 * 24 * 3600)
        self.cache.zremrangebyscore('sli:requests', '-inf', cutoff)
        self.cache.zremrangebyscore('sli:latency', '-inf', cutoff)

    def calculate_availability_sli(self, window_hours: int = 24) -> float:
        """
        Availability SLI over the window: successful / total * 100.

        Returns 100.0 when there is no traffic (vacuously meets the SLO).
        """
        cutoff = time.time() - (window_hours * 3600)
        requests = self.cache.zrangebyscore('sli:requests', cutoff, '+inf')
        if not requests:
            return 100.0
        total = len(requests)
        # 'success' cannot appear in the hex nonce, so a substring test
        # on the raw member bytes is safe.
        successful = sum(1 for r in requests if b'success' in r)
        return (successful / total) * 100

    def calculate_latency_percentile(self, percentile: int, window_hours: int = 24) -> float:
        """
        Latency percentile (P50, P95, P99) over the window, in ms.

        Returns 0.0 when no latency samples exist.
        """
        cutoff = time.time() - (window_hours * 3600)
        latencies_raw = self.cache.zrangebyscore('sli:latency', cutoff, '+inf')
        if not latencies_raw:
            return 0.0
        # Member format is '<ts>:<latency_ms>[:<nonce>]'
        latencies = sorted(
            float(l.decode().split(':')[1])
            for l in latencies_raw
        )
        index = int(len(latencies) * percentile / 100)
        return latencies[min(index, len(latencies) - 1)]

    def calculate_error_budget(self, slo_target: float = 99.9) -> dict:
        """
        Calculate the error budget over a 30-day window.

        Error budget = 100% - SLO (e.g. 99.9% SLO → 0.1% budget),
        reported both as percentages and as minutes of the window.
        """
        current_sli = self.calculate_availability_sli(window_hours=720)  # 30 days
        error_budget_percent = 100 - slo_target
        used_percent = 100 - current_sli
        # Convert to time for a 30-day window
        window_minutes = 30 * 24 * 60
        error_budget_minutes = window_minutes * (error_budget_percent / 100)
        used_minutes = window_minutes * (used_percent / 100)
        remaining_minutes = error_budget_minutes - used_minutes
        return {
            'slo_target': slo_target,
            'current_sli': current_sli,
            'error_budget_percent': error_budget_percent,
            'error_budget_minutes': error_budget_minutes,
            'used_minutes': used_minutes,
            'remaining_minutes': remaining_minutes,
            'budget_exhausted': remaining_minutes <= 0
        }
# Alerting based on SLOs
class SLOAlerting:
    """
    Escalate alerts as the error budget burns down.
    """

    def check_slo_violation(self, monitor: SLOMonitor):
        """
        Check whether the SLO is at risk and alert at graduated severity.

        Alert thresholds:
        - 50% error budget consumed: Warning
        - 75% error budget consumed: Critical
        - 100% error budget consumed: Page on-call
        """
        budget = monitor.calculate_error_budget(slo_target=99.9)
        consumed = (budget['used_minutes'] / budget['error_budget_minutes']) * 100
        remaining = budget['remaining_minutes']
        if consumed >= 100:
            # Budget fully spent: SLO violated
            self.page_oncall("SLO violated - error budget exhausted")
        elif consumed >= 75:
            self.alert_critical(f"Error budget 75% consumed ({remaining:.1f} min remaining)")
        elif consumed >= 50:
            self.alert_warning(f"Error budget 50% consumed ({remaining:.1f} min remaining)")

    def page_oncall(self, message: str):
        """Page the on-call engineer (PagerDuty, Opsgenie)."""
        print(f"🚨 PAGE: {message}")

    def alert_critical(self, message: str):
        """Send a critical alert (Slack, email)."""
        print(f"⚠️ CRITICAL: {message}")

    def alert_warning(self, message: str):
        """Send a warning."""
        print(f"⚡ WARNING: {message}")
from flask import Flask, jsonify
import psycopg2
import redis as redis_client

# Fresh Flask app for the health-check section
app = Flask(__name__)
class HealthCheck:
    """
    Health check endpoints for orchestrators (Kubernetes).

    Liveness: Is the app alive? (restart if not)
    Readiness: Is the app ready to serve traffic? (route traffic if yes)
    Startup: Has the app finished starting? (used for slow-starting apps)
    """

    def __init__(self):
        # Connection placeholders; the checks below open short-lived
        # connections of their own instead of reusing these.
        self.db_conn = None
        self.redis_conn = None

    def check_liveness(self) -> tuple[bool, dict]:
        """
        Liveness probe - is the application alive?

        Should check:
        - Application is running
        - Not deadlocked
        - Basic functionality works

        Should NOT check:
        - Database connectivity (not a liveness issue)
        - External dependencies

        If this fails: Kubernetes restarts the pod.
        """
        try:
            import threading
            thread_count = threading.active_count()
            # A runaway thread count suggests a leak or deadlock
            if thread_count > 100:
                return False, {'status': 'unhealthy', 'reason': 'too many threads'}
            return True, {'status': 'healthy', 'threads': thread_count}
        except Exception as e:
            return False, {'status': 'unhealthy', 'error': str(e)}

    def check_readiness(self) -> tuple[bool, dict]:
        """
        Readiness probe - is the application ready for traffic?

        Should check:
        - Database connectivity
        - Cache connectivity
        - Required external services
        - Configuration loaded

        If this fails: Kubernetes removes the pod from the load balancer.
        """
        checks = {}
        all_ready = True
        # Check database
        try:
            db_ready = self._check_database()
            checks['database'] = 'ready' if db_ready else 'not ready'
            all_ready = all_ready and db_ready
        except Exception as e:
            checks['database'] = f'error: {str(e)}'
            all_ready = False
        # Check cache
        try:
            cache_ready = self._check_redis()
            checks['cache'] = 'ready' if cache_ready else 'not ready'
            all_ready = all_ready and cache_ready
        except Exception as e:
            checks['cache'] = f'error: {str(e)}'
            all_ready = False
        return all_ready, {'status': 'ready' if all_ready else 'not ready', 'checks': checks}

    def check_startup(self) -> tuple[bool, dict]:
        """
        Startup probe - has the application finished starting?

        Used for slow-starting applications; Kubernetes waits for startup
        before running liveness/readiness probes (e.g. large models
        loading, database migrations).
        """
        try:
            # In a real app, check whether all resources are loaded
            initialization_complete = True  # Replace with actual check
            return initialization_complete, {
                'status': 'started' if initialization_complete else 'starting'
            }
        except Exception as e:
            return False, {'status': 'error', 'error': str(e)}

    def _check_database(self) -> bool:
        """Check database connectivity with a throwaway connection."""
        try:
            conn = psycopg2.connect("dbname=mydb user=postgres")
            try:
                cursor = conn.cursor()
                cursor.execute("SELECT 1")
                cursor.close()
            finally:
                # Fix: the original leaked the connection if execute raised
                conn.close()
            return True
        except Exception:
            # Fix: bare `except:` also swallowed SystemExit/KeyboardInterrupt
            return False

    def _check_redis(self) -> bool:
        """Check Redis connectivity with a throwaway client."""
        try:
            r = redis_client.Redis(host='localhost', port=6379)
            r.ping()
            return True
        except Exception:
            # Fix: narrowed from a bare `except:`
            return False
# Flask routes
# Single shared HealthCheck instance used by all three probe endpoints
health = HealthCheck()
@app.route('/health/live')
def liveness():
    """Liveness probe endpoint: 200 when alive, 503 to request a restart."""
    is_alive, details = health.check_liveness()
    return jsonify(details), (200 if is_alive else 503)
@app.route('/health/ready')
def readiness():
    """Readiness probe endpoint: 503 removes the pod from the load balancer."""
    is_ready, details = health.check_readiness()
    return jsonify(details), (200 if is_ready else 503)
@app.route('/health/startup')
def startup():
    """Startup probe endpoint: gates liveness/readiness until init finishes."""
    is_started, details = health.check_startup()
    return jsonify(details), (200 if is_started else 503)
# Kubernetes deployment YAML
"""
apiVersion: v1
kind: Pod
metadata:
name: api-pod
spec:
containers:
- name: api
image: api:latest
ports:
- containerPort: 5000
# Startup probe (runs first, higher failure threshold)
startupProbe:
httpGet:
path: /health/startup
port: 5000
failureThreshold: 30 # 30 * 10s = 5 min max startup time
periodSeconds: 10
# Liveness probe (restart if fails)
livenessProbe:
httpGet:
path: /health/live
port: 5000
initialDelaySeconds: 30
periodSeconds: 10
failureThreshold: 3 # Restart after 3 failures
# Readiness probe (remove from load balancer if fails)
readinessProbe:
httpGet:
path: /health/ready
port: 5000
initialDelaySeconds: 5
periodSeconds: 5
failureThreshold: 3 # Remove from LB after 3 failures
"""