Architectural Patterns - Senior/Distinguished Engineer Guide

Architectural patterns are high-level strategies for organizing code and systems. While design patterns (GoF) focus on object-level solutions, architectural patterns address system-level concerns like scalability, maintainability, and distributed computing.

For Senior/Distinguished Engineers: You should be able to explain when to use each pattern, their trade-offs, and how they combine in real systems.

1. Microservices Architecture

Overview

Break down application into small, independently deployable services that communicate over network.

graph TB Client["CLIENT
(Web/Mobile App)"] Client --> Gateway["API Gateway
(Routing, Auth, Rate Limiting)"] Gateway --> UserSvc["User Service
(PostgreSQL)"] Gateway --> OrderSvc["Order Service
(MongoDB)"] Gateway --> ProductSvc["Product Service
(Elasticsearch)"] UserSvc --> MQ["Message Queue
(Kafka)"] OrderSvc --> MQ ProductSvc --> MQ MQ -.->|"Async Events"| UserSvc MQ -.->|"Async Events"| OrderSvc MQ -.->|"Async Events"| ProductSvc style Client fill:#3b4252,stroke:#88c0d0,stroke-width:3px,color:#e0e0e0 style Gateway fill:#3b4252,stroke:#ebcb8b,stroke-width:2px,color:#e0e0e0 style UserSvc fill:#3b4252,stroke:#a3be8c,stroke-width:2px,color:#e0e0e0 style OrderSvc fill:#3b4252,stroke:#a3be8c,stroke-width:2px,color:#e0e0e0 style ProductSvc fill:#3b4252,stroke:#a3be8c,stroke-width:2px,color:#e0e0e0 style MQ fill:#3b4252,stroke:#b48ead,stroke-width:2px,color:#e0e0e0

When to Use

Trade-offs

Pros: Cons:
+ Independent deployment
+ Technology flexibility
+ Team autonomy
+ Fault isolation
+ Scalability per service
- Distributed system complexity
- Network latency
- Data consistency challenges
- Testing complexity
- Operational overhead

Python Example - Service Structure

# User Service (Flask microservice)
from flask import Flask, jsonify, request
from sqlalchemy import create_engine
import requests

app = Flask(__name__)
db = create_engine('postgresql://localhost/users')

@app.route('/users/', methods=['GET'])
def get_user(user_id):
    """Get user details"""
    # Query local database
    user = db.execute(f"SELECT * FROM users WHERE id = {user_id}").fetchone()

    # Call other service if needed (sync)
    # orders = requests.get(f'http://order-service/orders?user_id={user_id}')

    return jsonify({
        'id': user.id,
        'name': user.name,
        'email': user.email
    })

@app.route('/users', methods=['POST'])
def create_user():
    """Create new user"""
    data = request.json
    db.execute(
        "INSERT INTO users (name, email) VALUES (:name, :email)",
        name=data['name'],
        email=data['email']
    )

    # Publish event for other services (async)
    publish_event('user.created', data)

    return jsonify({'status': 'created'}), 201

def publish_event(event_type, data):
    """Publish to message queue"""
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    producer.send('events', {
        'type': event_type,
        'data': data
    })

if __name__ == '__main__':
    app.run(port=5001)


# Order Service (separate process)
@app.route('/orders', methods=['GET'])
def get_orders():
    user_id = request.args.get('user_id')
    orders = db.execute(f"SELECT * FROM orders WHERE user_id = {user_id}").fetchall()
    return jsonify([dict(o) for o in orders])
Real-world examples: Netflix, Uber, Amazon, Spotify
Key patterns used: API Gateway, Service Discovery, Circuit Breaker, Saga

2. Event-Driven Architecture (EDA)

Overview

Services communicate through events rather than direct calls. Producers emit events, consumers react to them.

+──────────────+         +───────────────────+
│   Producer   │ ──────> │   Event Bus       │
│  (Service A) │  emit   │  (Kafka/RabbitMQ) │
+──────────────+         +────────┬──────────+
                                  │
                    +─────────────┼─────────────+
                    │             │             │
              +─────▼──+    +─────▼──+    +─────▼──+
              │Consumer│    │Consumer│    │Consumer│
              │   B    │    │   C    │    │   D    │
              +────────+    +────────+    +────────+
              (subscribe)   (subscribe)   (subscribe)
            

When to Use

Python Example - Event-Driven System

# Event Publisher
from dataclasses import dataclass
from datetime import datetime
from kafka import KafkaProducer
import json

@dataclass
class Event:
    event_type: str
    aggregate_id: str
    data: dict
    timestamp: datetime = datetime.utcnow()

class EventPublisher:
    def __init__(self, broker='localhost:9092'):
        self.producer = KafkaProducer(
            bootstrap_servers=broker,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    def publish(self, event: Event):
        self.producer.send('events', {
            'type': event.event_type,
            'aggregate_id': event.aggregate_id,
            'data': event.data,
            'timestamp': event.timestamp.isoformat()
        })
        print(f"Published: {event.event_type}")


# Event Consumer
from kafka import KafkaConsumer
from abc import ABC, abstractmethod

class EventHandler(ABC):
    @abstractmethod
    def can_handle(self, event_type: str) -> bool:
        pass

    @abstractmethod
    def handle(self, event: dict):
        pass

class OrderCreatedHandler(EventHandler):
    def can_handle(self, event_type: str) -> bool:
        return event_type == 'order.created'

    def handle(self, event: dict):
        print(f"Processing order: {event['aggregate_id']}")
        # Send email
        # Update inventory
        # Create shipping label
        return "Order processed"

class PaymentReceivedHandler(EventHandler):
    def can_handle(self, event_type: str) -> bool:
        return event_type == 'payment.received'

    def handle(self, event: dict):
        print(f"Payment received: {event['data']['amount']}")
        # Update order status
        # Send receipt
        return "Payment processed"

class EventConsumer:
    def __init__(self, handlers: list[EventHandler], broker='localhost:9092'):
        self.consumer = KafkaConsumer(
            'events',
            bootstrap_servers=broker,
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.handlers = handlers

    def start(self):
        print("Starting event consumer...")
        for message in self.consumer:
            event = message.value
            event_type = event['type']

            for handler in self.handlers:
                if handler.can_handle(event_type):
                    handler.handle(event)


# Usage
if __name__ == '__main__':
    # Producer side
    publisher = EventPublisher()
    publisher.publish(Event(
        event_type='order.created',
        aggregate_id='order-123',
        data={'user_id': 'user-456', 'total': 99.99}
    ))

    # Consumer side (separate process)
    consumer = EventConsumer([
        OrderCreatedHandler(),
        PaymentReceivedHandler()
    ])
    # consumer.start()  # Blocks and listens
Real-world examples: AWS EventBridge, Apache Kafka streams
Benefits: Scalability, resilience, time decoupling

3. CQRS (Command Query Responsibility Segregation)

Overview

Separate read and write operations into different models. Commands change state, queries return data.

+──────────+                           +──────────+
│  Client  │                           │  Client  │
+────┬─────+                           +────┬─────+
     │ Command (Write)                      │ Query (Read)
     │                                      │
+────▼─────────+                      +────▼──────────+
│Command Model │                      │  Query Model  │
│ (Write DB)   │                      │  (Read DB)    │
│ PostgreSQL   │ ─────sync/async────> │  Elasticsearch│
│ (normalized) │     replication      │  (denormalized)│
+──────────────+                      +───────────────+
                                       (optimized for reads)
            

When to Use

Python Example - CQRS Implementation

# Command Side (Write)
from dataclasses import dataclass
from abc import ABC, abstractmethod

@dataclass
class Command(ABC):
    """Base command"""
    pass

@dataclass
class CreateOrderCommand(Command):
    user_id: str
    items: list
    total: float

@dataclass
class CancelOrderCommand(Command):
    order_id: str
    reason: str

class CommandHandler(ABC):
    @abstractmethod
    def handle(self, command: Command):
        pass

class CreateOrderHandler(CommandHandler):
    def __init__(self, write_db, event_bus):
        self.write_db = write_db
        self.event_bus = event_bus

    def handle(self, command: CreateOrderCommand):
        # Validate
        if command.total <= 0:
            raise ValueError("Invalid total")

        # Write to database (source of truth)
        order_id = self.write_db.execute("""
            INSERT INTO orders (user_id, items, total, status)
            VALUES (:user_id, :items, :total, 'pending')
            RETURNING id
        """, **command.__dict__).fetchone()[0]

        # Publish event for read model sync
        self.event_bus.publish({
            'type': 'OrderCreated',
            'order_id': order_id,
            'user_id': command.user_id,
            'total': command.total
        })

        return order_id


# Query Side (Read)
@dataclass
class Query(ABC):
    """Base query"""
    pass

@dataclass
class GetOrdersByUserQuery(Query):
    user_id: str

@dataclass
class GetOrderDetailsQuery(Query):
    order_id: str

class QueryHandler(ABC):
    @abstractmethod
    def handle(self, query: Query):
        pass

class GetOrdersByUserHandler(QueryHandler):
    def __init__(self, read_db):
        self.read_db = read_db  # Optimized for reads (e.g., Elasticsearch)

    def handle(self, query: GetOrdersByUserQuery):
        # Query denormalized, optimized read model
        return self.read_db.search({
            'query': {
                'match': {'user_id': query.user_id}
            },
            'sort': [{'created_at': 'desc'}]
        })


# Read Model Projector (syncs write → read)
class OrderProjector:
    """Updates read model when events occur"""
    def __init__(self, read_db):
        self.read_db = read_db

    def project_order_created(self, event):
        """Denormalize and optimize for reads"""
        # Combine data from multiple sources
        user = self.fetch_user_info(event['user_id'])

        self.read_db.index('orders', {
            'order_id': event['order_id'],
            'user_name': user['name'],
            'user_email': user['email'],
            'total': event['total'],
            'created_at': event['timestamp'],
            # ... optimized structure for queries
        })


# Application Service (coordinates)
class OrderService:
    def __init__(self, command_handlers, query_handlers):
        self.command_handlers = command_handlers
        self.query_handlers = query_handlers

    def create_order(self, user_id, items, total):
        """Command"""
        command = CreateOrderCommand(user_id, items, total)
        handler = self.command_handlers[CreateOrderCommand]
        return handler.handle(command)

    def get_user_orders(self, user_id):
        """Query"""
        query = GetOrdersByUserQuery(user_id)
        handler = self.query_handlers[GetOrdersByUserQuery]
        return handler.handle(query)

Trade-offs

Pros: Cons:
+ Independent scaling (read/write)
+ Optimized data models
+ Complex queries don't impact writes
+ Clear separation of concerns
- Eventual consistency
- Code complexity
- Duplication of data
- Synchronization overhead

4. Saga Pattern (Distributed Transactions)

Overview

Manage data consistency across microservices using sequence of local transactions with compensating actions.

Choreography Saga (Event-driven):

Order Service ──[OrderCreated]──> Payment Service ──[PaymentSuccess]──> Inventory Service
     │                                   │                                      │
     │                             [PaymentFailed]                    [InventoryReserved]
     │                                   │                                      │
     │                                   ▼                                      │
     +────────────────────── [CancelOrder] <─────────────────────────────────-+
                            (Compensating action)


Orchestration Saga (Coordinator-based):

                            +─────────────────+
                            │ Saga Coordinator│
                            +────────┬────────+
                                     │
                    +────────────────┼────────────────+
                    │                │                │
              +─────▼──────+  +─────▼──────+  +─────▼──────+
              │   Order    │  │  Payment   │  │ Inventory  │
              │  Service   │  │  Service   │  │  Service   │
              +────────────+  +────────────+  +────────────+
            

Python Example - Orchestration Saga

# Saga Coordinator Pattern
from enum import Enum
from dataclasses import dataclass
from typing import Callable, List

class SagaStatus(Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"

@dataclass
class SagaStep:
    """Each step in the saga"""
    name: str
    action: Callable          # Forward action
    compensation: Callable    # Rollback action
    completed: bool = False

class SagaOrchestrator:
    """Coordinates distributed transaction"""
    def __init__(self):
        self.steps: List[SagaStep] = []
        self.status = SagaStatus.PENDING
        self.completed_steps: List[SagaStep] = []

    def add_step(self, name: str, action: Callable, compensation: Callable):
        self.steps.append(SagaStep(name, action, compensation))
        return self

    def execute(self, context: dict):
        """Execute saga - all or nothing"""
        print(f"Starting saga with {len(self.steps)} steps")

        try:
            # Execute each step
            for step in self.steps:
                print(f"  Executing: {step.name}")
                step.action(context)
                step.completed = True
                self.completed_steps.append(step)

            self.status = SagaStatus.COMPLETED
            print("Saga completed successfully")
            return True

        except Exception as e:
            print(f"Saga failed at {step.name}: {e}")
            self.status = SagaStatus.FAILED
            self._compensate(context)
            return False

    def _compensate(self, context: dict):
        """Rollback completed steps in reverse order"""
        self.status = SagaStatus.COMPENSATING
        print("Starting compensation (rollback)...")

        for step in reversed(self.completed_steps):
            print(f"  Compensating: {step.name}")
            try:
                step.compensation(context)
            except Exception as e:
                print(f"  Warning: Compensation failed for {step.name}: {e}")

        self.status = SagaStatus.COMPENSATED
        print("Compensation completed")


# Example: Order Processing Saga
class OrderService:
    @staticmethod
    def create_order(context):
        order_id = f"order-{context['user_id']}"
        context['order_id'] = order_id
        print(f"    Created order: {order_id}")

    @staticmethod
    def cancel_order(context):
        print(f"    Cancelled order: {context.get('order_id')}")

class PaymentService:
    @staticmethod
    def charge_payment(context):
        if context.get('simulate_payment_failure'):
            raise Exception("Payment declined")
        context['payment_id'] = "payment-123"
        print(f"    Charged ${context['amount']}")

    @staticmethod
    def refund_payment(context):
        if 'payment_id' in context:
            print(f"    Refunded payment: {context['payment_id']}")

class InventoryService:
    @staticmethod
    def reserve_inventory(context):
        print(f"    Reserved {context['quantity']} items")
        context['reservation_id'] = "res-456"

    @staticmethod
    def release_inventory(context):
        if 'reservation_id' in context:
            print(f"    Released reservation: {context['reservation_id']}")

class ShippingService:
    @staticmethod
    def create_shipment(context):
        print(f"    Created shipment for order: {context['order_id']}")
        context['shipment_id'] = "ship-789"

    @staticmethod
    def cancel_shipment(context):
        if 'shipment_id' in context:
            print(f"    Cancelled shipment: {context['shipment_id']}")


# Usage
if __name__ == '__main__':
    # Successful saga
    print("\n" + "="*60)
    print("SAGA 1: Successful Order")
    print("="*60)

    saga = SagaOrchestrator()
    saga.add_step("Create Order",
                  OrderService.create_order,
                  OrderService.cancel_order)
    saga.add_step("Process Payment",
                  PaymentService.charge_payment,
                  PaymentService.refund_payment)
    saga.add_step("Reserve Inventory",
                  InventoryService.reserve_inventory,
                  InventoryService.release_inventory)
    saga.add_step("Create Shipment",
                  ShippingService.create_shipment,
                  ShippingService.cancel_shipment)

    context = {'user_id': 'user-123', 'amount': 99.99, 'quantity': 2}
    success = saga.execute(context)
    print(f"Final context: {context}")

    # Failed saga (triggers compensation)
    print("\n" + "="*60)
    print("SAGA 2: Failed Payment (will compensate)")
    print("="*60)

    saga2 = SagaOrchestrator()
    saga2.add_step("Create Order",
                   OrderService.create_order,
                   OrderService.cancel_order)
    saga2.add_step("Process Payment",
                   PaymentService.charge_payment,
                   PaymentService.refund_payment)
    saga2.add_step("Reserve Inventory",
                   InventoryService.reserve_inventory,
                   InventoryService.release_inventory)

    context2 = {
        'user_id': 'user-456',
        'amount': 199.99,
        'quantity': 5,
        'simulate_payment_failure': True  # Trigger failure
    }
    success = saga2.execute(context2)
    print(f"Saga status: {saga2.status}")
Key Points:

5. Hexagonal Architecture (Ports and Adapters)

Overview

Isolate core business logic from external concerns (UI, database, APIs) using ports and adapters.

       Adapters (Infrastructure)         Core (Business Logic)

+─────────────+                    +────────────────────+
│ REST API    │ ────────────────> │                    │
│ (FastAPI)   │     Port (IOrderRepo)    Domain Model   │
+─────────────+                    │  (Business Rules)  │
                                   │                    │
+─────────────+                    │                    │
│  GraphQL    │ ────────────────> │                    │
│   API       │     Port           +────────────────────+
+─────────────+                              │
                                             │ Port (IOrderRepo)
+─────────────+                              ▼
│ PostgreSQL  │ <──────────── Adapter ────────────────
│  Database   │               (implements port)
+─────────────+

+─────────────+
│   Kafka     │ <──────────── Event Adapter
│  Message    │
+─────────────+
            

Python Example - Hexagonal Architecture

# Core Domain (Business Logic) - No dependencies
from dataclasses import dataclass
from abc import ABC, abstractmethod
from datetime import datetime
from typing import List

@dataclass
class Order:
    """Domain Entity"""
    id: str
    user_id: str
    items: List[dict]
    total: float
    status: str
    created_at: datetime

    def can_cancel(self) -> bool:
        """Business rule"""
        return self.status in ['pending', 'confirmed']

    def cancel(self):
        """Business logic"""
        if not self.can_cancel():
            raise ValueError(f"Cannot cancel order in {self.status} status")
        self.status = 'cancelled'


# Ports (Interfaces) - Define what core needs
class IOrderRepository(ABC):
    """Output port - persistence"""
    @abstractmethod
    def save(self, order: Order):
        pass

    @abstractmethod
    def find_by_id(self, order_id: str) -> Order:
        pass

class INotificationService(ABC):
    """Output port - external service"""
    @abstractmethod
    def send_order_confirmation(self, order: Order):
        pass


# Core Application Service (Use Cases)
class OrderService:
    """Core business logic - depends only on ports"""
    def __init__(self,
                 repository: IOrderRepository,
                 notification: INotificationService):
        self.repository = repository
        self.notification = notification

    def create_order(self, user_id: str, items: List[dict], total: float) -> Order:
        """Use case: Create Order"""
        # Business logic
        order = Order(
            id=self._generate_id(),
            user_id=user_id,
            items=items,
            total=total,
            status='pending',
            created_at=datetime.now()
        )

        # Validate business rules
        if total <= 0:
            raise ValueError("Total must be positive")

        # Use port to persist
        self.repository.save(order)

        # Use port to notify
        self.notification.send_order_confirmation(order)

        return order

    def cancel_order(self, order_id: str):
        """Use case: Cancel Order"""
        order = self.repository.find_by_id(order_id)
        order.cancel()  # Domain logic
        self.repository.save(order)

    def _generate_id(self) -> str:
        import uuid
        return str(uuid.uuid4())


# Adapters (Infrastructure) - Implement ports
class PostgreSQLOrderRepository(IOrderRepository):
    """Adapter for PostgreSQL"""
    def __init__(self, connection_string):
        from sqlalchemy import create_engine
        self.engine = create_engine(connection_string)

    def save(self, order: Order):
        self.engine.execute("""
            INSERT INTO orders (id, user_id, items, total, status, created_at)
            VALUES (:id, :user_id, :items, :total, :status, :created_at)
            ON CONFLICT (id) DO UPDATE
            SET status = :status
        """, **order.__dict__)

    def find_by_id(self, order_id: str) -> Order:
        row = self.engine.execute(
            "SELECT * FROM orders WHERE id = :id",
            id=order_id
        ).fetchone()
        return Order(**dict(row))

class InMemoryOrderRepository(IOrderRepository):
    """Adapter for in-memory (testing)"""
    def __init__(self):
        self.orders = {}

    def save(self, order: Order):
        self.orders[order.id] = order

    def find_by_id(self, order_id: str) -> Order:
        return self.orders.get(order_id)

class EmailNotificationService(INotificationService):
    """Adapter for email notifications"""
    def send_order_confirmation(self, order: Order):
        # Send email via SMTP
        print(f"Sending email for order {order.id} to user {order.user_id}")

class KafkaNotificationService(INotificationService):
    """Adapter for Kafka events"""
    def __init__(self, broker):
        from kafka import KafkaProducer
        self.producer = KafkaProducer(bootstrap_servers=broker)

    def send_order_confirmation(self, order: Order):
        self.producer.send('order-events', {
            'type': 'OrderCreated',
            'order_id': order.id
        })


# Input Adapters (Controllers)
from fastapi import FastAPI

class OrderController:
    """REST API adapter (input port)"""
    def __init__(self, order_service: OrderService):
        self.order_service = order_service
        self.app = FastAPI()
        self._setup_routes()

    def _setup_routes(self):
        @self.app.post("/orders")
        def create_order(user_id: str, items: list, total: float):
            order = self.order_service.create_order(user_id, items, total)
            return {"order_id": order.id, "status": order.status}

        @self.app.delete("/orders/{order_id}")
        def cancel_order(order_id: str):
            self.order_service.cancel_order(order_id)
            return {"status": "cancelled"}


# Dependency Injection (Wire it up)
if __name__ == '__main__':
    # Choose adapters (easily swappable!)
    repo = InMemoryOrderRepository()  # or PostgreSQLOrderRepository(...)
    notifier = EmailNotificationService()  # or KafkaNotificationService(...)

    # Inject dependencies
    order_service = OrderService(repo, notifier)

    # Create input adapter
    controller = OrderController(order_service)

    # Run
    # uvicorn.run(controller.app)

Benefits

Pattern Comparison Table

Pattern Best For Complexity When NOT to Use
Microservices Large teams, independent deployment Very High Small apps, tight coupling needed
Event-Driven Async workflows, decoupling High Immediate consistency required
CQRS Different read/write loads High Simple CRUD, strong consistency needed
Saga Distributed transactions High Can use 2PC, single database
Hexagonal Clean architecture, testability Medium Very simple apps, rapid prototyping

Interview Tips

For Senior/Distinguished Engineers