Messaging Systems

Message Queues, Pub/Sub, Event Streaming & AWS Services

1. Messaging Patterns Overview

Point-to-Point (Queue)

Pattern: One producer → Queue → One consumer

  • Message consumed by exactly one consumer
  • Message deleted after consumption
  • Load balancing across consumers
  • Good for: Task distribution, job queues

Examples: SQS, RabbitMQ queues

Publish/Subscribe (Pub/Sub)

Pattern: One producer → Topic → Many subscribers

  • Message delivered to all subscribers
  • Decoupled producers and consumers
  • Fan-out pattern
  • Good for: Notifications, events

Examples: SNS, Redis Pub/Sub, RabbitMQ exchanges

Event Streaming

Pattern: Append-only log of events

  • Messages retained for configurable time
  • Multiple consumers at different offsets
  • Replay capability
  • Good for: Event sourcing, analytics

Examples: Kafka, Kinesis
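The three patterns can be contrasted with tiny in-memory stand-ins (illustration only; the class names are hypothetical): a queue consumes each message once, a topic copies each message to every subscriber, and a log retains everything so consumers can replay from any offset.

```python
from collections import deque

class Queue:
    """Point-to-point: each message is consumed by exactly one consumer."""
    def __init__(self):
        self._q = deque()
    def send(self, msg):
        self._q.append(msg)
    def receive(self):
        return self._q.popleft() if self._q else None  # consumed once, then gone

class Topic:
    """Pub/Sub: every subscriber receives every message; nothing is stored."""
    def __init__(self):
        self._subs = []
    def subscribe(self, callback):
        self._subs.append(callback)
    def publish(self, msg):
        for cb in self._subs:
            cb(msg)

class Log:
    """Event streaming: append-only log; consumers track their own offsets."""
    def __init__(self):
        self._events = []
    def append(self, event):
        self._events.append(event)
    def read(self, offset):
        return self._events[offset:]  # replay from any offset
```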

2. Comprehensive Comparison

| System | AWS Equivalent | Pattern | Throughput | Latency | Ordering | Persistence | Best For |
|---|---|---|---|---|---|---|---|
| RabbitMQ | Amazon MQ | Queue + Pub/Sub | 10K-100K msg/s | ~1-10 ms | Per queue | Optional | Traditional messaging, complex routing |
| Apache Kafka | Amazon MSK | Event streaming | 1M+ msg/s | ~5-15 ms | Per partition | Always (configurable) | High-throughput streaming, event sourcing |
| Redis Pub/Sub | ElastiCache | Pub/Sub | 100K+ msg/s | ~1 ms | None | None (fire-and-forget) | Real-time notifications, chat |
| AWS SQS | Native AWS | Queue | 3K-30K msg/s | ~10-100 ms | FIFO available | 4-14 days | Decoupling, serverless, simple queues |
| AWS SNS | Native AWS | Pub/Sub | 30K+ msg/s | ~10-50 ms | None | None (immediate delivery) | Fan-out, mobile push, email/SMS |
| AWS Kinesis | Native AWS | Event streaming | 1M+ msg/s | ~70-200 ms | Per shard | 1-365 days | Real-time analytics, log aggregation |
| EventBridge | Native AWS | Event bus | 10K+ msg/s | ~50-500 ms | None | Archive available | Event-driven architectures, SaaS integrations |

3. AWS SQS (Simple Queue Service)

When to Use:

  • Decoupling microservices and absorbing traffic bursts
  • Serverless pipelines (native Lambda integration)
  • Simple, fully managed queues with pay-per-request pricing

3.1 Standard Queue vs FIFO Queue

| Feature | Standard Queue | FIFO Queue |
|---|---|---|
| Throughput | Nearly unlimited | 3,000 msg/s (30,000 with batching) |
| Ordering | Best-effort | Guaranteed FIFO |
| Delivery | At-least-once (duplicates possible) | Exactly-once processing |
| Use Case | High throughput, order not critical | Order matters (payments, bookings) |
import boto3
import json

class SQSMessaging:
    """AWS SQS Usage Patterns"""

    def __init__(self):
        self.sqs = boto3.client('sqs', region_name='us-east-1')

    def send_message(self, queue_url: str, message: dict):
        """
        Send message to SQS queue
        """
        response = self.sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(message),
            MessageAttributes={
                'Priority': {
                    'StringValue': 'High',
                    'DataType': 'String'
                }
            }
        )
        return response['MessageId']

    def send_message_fifo(self, queue_url: str, message: dict,
                         deduplication_id: str, group_id: str):
        """
        Send message to FIFO queue

        - MessageGroupId: Messages in same group are processed in order
        - MessageDeduplicationId: Prevents duplicates within 5-minute window
        """
        response = self.sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(message),
            MessageGroupId=group_id,  # Orders messages within this group
            MessageDeduplicationId=deduplication_id  # Prevents duplicates
        )
        return response['MessageId']

    def receive_messages(self, queue_url: str, max_messages: int = 10):
        """
        Receive and process messages (long polling)
        """
        response = self.sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=max_messages,  # 1-10
            WaitTimeSeconds=20,  # Long polling (reduces empty responses)
            VisibilityTimeout=30,  # Hide message for 30s while processing
            MessageAttributeNames=['All']
        )

        messages = response.get('Messages', [])

        for message in messages:
            try:
                # Process message
                body = json.loads(message['Body'])
                self.process_message(body)

                # Delete message after successful processing
                self.sqs.delete_message(
                    QueueUrl=queue_url,
                    ReceiptHandle=message['ReceiptHandle']
                )
            except Exception as e:
                print(f"Processing failed: {e}")
                # Message will become visible again after VisibilityTimeout
                # Consider using Dead Letter Queue (DLQ) for failed messages

    def batch_send(self, queue_url: str, messages: list):
        """
        Send up to 10 messages in batch (more efficient)
        """
        entries = [
            {
                'Id': str(i),
                'MessageBody': json.dumps(msg)
            }
            for i, msg in enumerate(messages[:10])
        ]

        response = self.sqs.send_message_batch(
            QueueUrl=queue_url,
            Entries=entries
        )

        return {
            'successful': len(response.get('Successful', [])),
            'failed': len(response.get('Failed', []))
        }

    def process_message(self, message: dict):
        """Process message logic"""
        print(f"Processing: {message}")


# Dead Letter Queue Pattern
def setup_dlq():
    """
    Dead Letter Queue: Capture failed messages

    After maxReceiveCount attempts, move message to DLQ
    """
    sqs = boto3.client('sqs')

    # Create DLQ
    dlq_response = sqs.create_queue(
        QueueName='failed-messages-dlq'
    )
    dlq_url = dlq_response['QueueUrl']

    # Get DLQ ARN
    dlq_attrs = sqs.get_queue_attributes(
        QueueUrl=dlq_url,
        AttributeNames=['QueueArn']
    )
    dlq_arn = dlq_attrs['Attributes']['QueueArn']

    # Create main queue with DLQ
    main_queue = sqs.create_queue(
        QueueName='main-queue',
        Attributes={
            'RedrivePolicy': json.dumps({
                'deadLetterTargetArn': dlq_arn,
                'maxReceiveCount': '3'  # After 3 failed attempts → DLQ
            }),
            'VisibilityTimeout': '30',
            'MessageRetentionPeriod': '345600'  # 4 days
        }
    )

    return main_queue['QueueUrl'], dlq_url

3.2 SQS Best Practices

  • Use long polling (WaitTimeSeconds=20) to reduce empty responses and cost
  • Set VisibilityTimeout longer than your worst-case processing time
  • Batch sends and receives (up to 10 messages) to cut API calls
  • Configure a Dead Letter Queue to capture repeatedly failing messages
  • Make consumers idempotent: standard queues can deliver duplicates
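Since standard queues deliver at-least-once, the same message can arrive more than once; consumers should be idempotent. A minimal sketch, assuming a hypothetical in-memory `processed_ids` store (in production, use DynamoDB or Redis with a TTL at least as long as the deduplication window you need):

```python
import json

# Hypothetical dedupe store; replace with DynamoDB/Redis in production
processed_ids = set()

def handle_sqs_message(message: dict) -> bool:
    """Process an SQS message at most once per MessageId.

    Returns True if the message was processed, False if it was skipped
    as a duplicate delivery (safe to delete from the queue either way).
    """
    msg_id = message['MessageId']
    if msg_id in processed_ids:
        return False  # already handled; duplicate delivery

    body = json.loads(message['Body'])
    # ... business logic on `body` goes here ...

    processed_ids.add(msg_id)
    return True
```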

4. AWS SNS (Simple Notification Service)

When to Use:

  • Fan-out: deliver one event to many subscribers (often SNS → SQS)
  • Mobile push, SMS, and email notifications
  • Simple, low-latency pub/sub at high throughput
import boto3
import json

class SNSMessaging:
    """AWS SNS Usage Patterns"""

    def __init__(self):
        self.sns = boto3.client('sns', region_name='us-east-1')

    def publish_to_topic(self, topic_arn: str, message: dict, subject: str = 'Notification'):
        """
        Publish message to SNS topic
        All subscribers receive the message
        """
        response = self.sns.publish(
            TopicArn=topic_arn,
            Message=json.dumps({
                'default': json.dumps(message),  # Default message
                'email': f"Subject: {subject}\n\n{message.get('text', '')}",  # Email format
                'sqs': json.dumps(message),  # SQS format
                'lambda': json.dumps(message)  # Lambda format
            }),
            Subject=subject,
            MessageStructure='json',  # Different formats per protocol
            MessageAttributes={
                'event_type': {
                    'DataType': 'String',
                    'StringValue': message.get('type', 'generic')
                }
            }
        )
        return response['MessageId']

    def create_topic(self, topic_name: str):
        """Create SNS topic"""
        response = self.sns.create_topic(
            Name=topic_name,
            Attributes={
                'DisplayName': topic_name,
                'FifoTopic': 'false'  # or 'true' for FIFO
            }
        )
        return response['TopicArn']

    def subscribe_email(self, topic_arn: str, email: str):
        """Subscribe email to topic"""
        response = self.sns.subscribe(
            TopicArn=topic_arn,
            Protocol='email',
            Endpoint=email
        )
        # User must confirm subscription via email
        return response['SubscriptionArn']

    def subscribe_sqs(self, topic_arn: str, queue_arn: str):
        """
        Subscribe SQS queue to SNS topic (fan-out pattern)

        Benefits:
        - Durability: SNS delivers immediately, SQS stores
        - Multiple processing: Each subscriber gets own queue
        - Decoupling: Add/remove subscribers without affecting others
        """
        response = self.sns.subscribe(
            TopicArn=topic_arn,
            Protocol='sqs',
            Endpoint=queue_arn
        )
        return response['SubscriptionArn']

    def subscribe_lambda(self, topic_arn: str, lambda_arn: str):
        """Subscribe Lambda function to topic"""
        response = self.sns.subscribe(
            TopicArn=topic_arn,
            Protocol='lambda',
            Endpoint=lambda_arn
        )
        return response['SubscriptionArn']

    def subscribe_with_filter(self, topic_arn: str, queue_arn: str, filter_policy: dict):
        """
        Subscribe with message filtering

        Only messages matching filter are delivered to subscriber
        """
        response = self.sns.subscribe(
            TopicArn=topic_arn,
            Protocol='sqs',
            Endpoint=queue_arn,
            Attributes={
                'FilterPolicy': json.dumps(filter_policy)
            }
        )
        return response['SubscriptionArn']


# SNS + SQS Fan-Out Pattern
def fanout_pattern_example():
    """
    Pattern: SNS → Multiple SQS Queues

    Use case: Order placed event triggers:
    - Email notification service
    - Inventory update service
    - Analytics service
    Each service gets its own queue
    """
    sns = boto3.client('sns')
    sqs = boto3.client('sqs')

    # Create SNS topic
    topic = sns.create_topic(Name='order-events')
    topic_arn = topic['TopicArn']

    # Create SQS queues for different services. Note: create_queue returns
    # only the QueueUrl; the ARN needed for subscribing must be looked up.
    def queue_arn(queue_url):
        attrs = sqs.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=['QueueArn']
        )
        return attrs['Attributes']['QueueArn']

    email_url = sqs.create_queue(QueueName='email-notifications')['QueueUrl']
    inventory_url = sqs.create_queue(QueueName='inventory-updates')['QueueUrl']
    analytics_url = sqs.create_queue(QueueName='analytics-events')['QueueUrl']

    # NOTE: each queue also needs an access policy allowing this topic
    # to perform sqs:SendMessage (omitted here for brevity)

    # Subscribe queues to topic with filters
    sns.subscribe(
        TopicArn=topic_arn,
        Protocol='sqs',
        Endpoint=queue_arn(email_url),
        Attributes={
            'FilterPolicy': json.dumps({
                'event_type': ['order_placed', 'order_shipped']
            })
        }
    )

    sns.subscribe(
        TopicArn=topic_arn,
        Protocol='sqs',
        Endpoint=queue_arn(inventory_url),
        Attributes={
            'FilterPolicy': json.dumps({
                'event_type': ['order_placed', 'order_cancelled']
            })
        }
    )

    # Analytics gets all events (no filter)
    sns.subscribe(
        TopicArn=topic_arn,
        Protocol='sqs',
        Endpoint=queue_arn(analytics_url)
    )

    # Publish event
    sns.publish(
        TopicArn=topic_arn,
        Message=json.dumps({
            'order_id': '12345',
            'customer_id': '67890',
            'total': 99.99
        }),
        MessageAttributes={
            'event_type': {
                'DataType': 'String',
                'StringValue': 'order_placed'
            }
        }
    )
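One subtlety on the consuming side: unless "raw message delivery" is enabled on the subscription, SNS wraps the published payload in a JSON envelope, so the SQS consumer must unwrap it. A minimal sketch:

```python
import json

def unwrap_sns_envelope(sqs_body: str) -> dict:
    """Extract the original payload from an SQS message delivered via SNS.

    Without raw message delivery, SNS stores a JSON envelope in the SQS
    body; the published payload is the string under 'Message'.
    """
    envelope = json.loads(sqs_body)
    if envelope.get('Type') == 'Notification' and 'Message' in envelope:
        return json.loads(envelope['Message'])
    return envelope  # raw delivery enabled: body is already the payload
```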

5. Apache Kafka & Amazon MSK

When to Use:

  • High-throughput event streaming (millions of messages/sec)
  • Event sourcing and replayable, long-retention logs
  • Multiple independent consumer groups reading the same stream

5.1 Kafka Core Concepts

| Concept | Description |
|---|---|
| Topic | Category/feed of messages (e.g., "user-events", "orders") |
| Partition | Topics split into partitions for parallelism; messages within a partition are ordered |
| Offset | Sequential ID of a message within a partition; each consumer tracks its own offset |
| Producer | Publishes messages to topics; chooses a partition via key or round-robin |
| Consumer | Reads messages from topics; joins a consumer group for load balancing |
| Consumer Group | Group of consumers; each partition is consumed by exactly one consumer in the group |
| Broker | A Kafka server; a cluster runs multiple brokers for fault tolerance |
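The key-to-partition mapping can be sketched as a deterministic hash modulo the partition count. This is a simplified stand-in (Kafka's default partitioner actually uses murmur2), but it shows why the same key always preserves per-key ordering:

```python
import hashlib

def partition_for(key: str, num_partitions: int) -> int:
    """Deterministic key -> partition mapping (simplified stand-in for
    Kafka's murmur2-based default partitioner)."""
    digest = hashlib.md5(key.encode('utf-8')).digest()
    return int.from_bytes(digest[:4], 'big') % num_partitions

# Same key always lands on the same partition, so all events for a given
# user are read in the order they were produced:
assert partition_for('user-12345', 6) == partition_for('user-12345', 6)
```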
from kafka import KafkaProducer, KafkaConsumer
import json

class KafkaMessaging:
    """Apache Kafka / Amazon MSK Usage"""

    def __init__(self, bootstrap_servers: list):
        """
        bootstrap_servers: ['localhost:9092'] or MSK cluster endpoints
        """
        self.bootstrap_servers = bootstrap_servers

    def create_producer(self):
        """
        Create Kafka producer
        """
        producer = KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',  # Wait for all replicas to acknowledge
            retries=3,
            compression_type='gzip',  # Reduce network usage
            max_in_flight_requests_per_connection=5,  # >1 can reorder on retry; use 1 for strict ordering
            batch_size=16384,  # Batch messages for efficiency
            linger_ms=10  # Wait 10ms to batch more messages
        )
        return producer

    def send_message(self, topic: str, message: dict, key: str = None):
        """
        Send message to Kafka topic

        Key determines partition:
        - Same key → same partition → guaranteed order
        - No key → round-robin across partitions
        """
        producer = self.create_producer()

        future = producer.send(
            topic,
            value=message,
            key=key  # Messages with same key go to same partition
        )

        # Synchronous send (wait for confirmation)
        record_metadata = future.get(timeout=10)

        producer.flush()
        producer.close()

        return {
            'topic': record_metadata.topic,
            'partition': record_metadata.partition,
            'offset': record_metadata.offset
        }

    def create_consumer(self, topics: list, group_id: str):
        """
        Create Kafka consumer

        Consumer group: Partitions distributed among consumers
        - 3 partitions, 3 consumers → each gets 1 partition
        - 3 partitions, 2 consumers → one gets 2 partitions, other gets 1
        - 3 partitions, 5 consumers → 3 consumers active, 2 idle
        """
        consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=self.bootstrap_servers,
            group_id=group_id,  # All consumers in group share load
            auto_offset_reset='earliest',  # 'earliest' or 'latest'
            enable_auto_commit=True,  # Auto-commit offset every 5s
            auto_commit_interval_ms=5000,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            max_poll_records=500,  # Max messages per poll
            session_timeout_ms=30000,  # If no heartbeat for 30s, rebalance
        )
        return consumer

    def consume_messages(self, topics: list, group_id: str):
        """
        Consume messages from Kafka
        """
        consumer = self.create_consumer(topics, group_id)

        try:
            for message in consumer:
                print(f"Partition: {message.partition}, "
                      f"Offset: {message.offset}, "
                      f"Key: {message.key}, "
                      f"Value: {message.value}")

                # Process message
                self.process_message(message.value)

                # Manual commit (if auto_commit=False)
                # consumer.commit()

        except KeyboardInterrupt:
            pass
        finally:
            consumer.close()

    def consume_from_offset(self, topic: str, partition: int, offset: int):
        """
        Replay messages from specific offset

        Powerful feature: Can reprocess historical data
        """
        consumer = KafkaConsumer(
            bootstrap_servers=self.bootstrap_servers,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='earliest'
        )

        # Assign specific partition
        from kafka import TopicPartition
        tp = TopicPartition(topic, partition)
        consumer.assign([tp])

        # Seek to specific offset
        consumer.seek(tp, offset)

        # Read messages from that offset onwards
        for message in consumer:
            print(f"Offset {message.offset}: {message.value}")
            if message.offset >= offset + 99:  # Stop after 100 messages
                break

        consumer.close()

    def process_message(self, message: dict):
        """Process message logic"""
        print(f"Processing: {message}")


# Amazon MSK (Managed Streaming for Kafka) Example
def msk_example():
    """
    Amazon MSK: Fully managed Kafka service

    Benefits over self-hosted Kafka:
    - Automatic patching and updates
    - Built-in monitoring with CloudWatch
    - Multi-AZ replication
    - Integration with AWS IAM
    - Automatic scaling
    """

    # MSK cluster endpoints (from AWS Console)
    bootstrap_servers = [
        'b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9092',
        'b-2.mycluster.abc123.kafka.us-east-1.amazonaws.com:9092',
        'b-3.mycluster.abc123.kafka.us-east-1.amazonaws.com:9092'
    ]

    kafka = KafkaMessaging(bootstrap_servers)

    # Producer
    kafka.send_message(
        topic='user-events',
        message={
            'user_id': '12345',
            'event': 'login',
            'timestamp': '2025-01-01T10:00:00Z'
        },
        key='12345'  # All events for user 12345 go to same partition
    )

    # Consumer
    kafka.consume_messages(
        topics=['user-events'],
        group_id='analytics-service'
    )

5.2 Kafka vs SQS/SNS

Feature Kafka / MSK SQS + SNS
Throughput Millions of messages/sec Thousands of messages/sec
Latency 5-15 ms (p99) 10-100 ms
Message Retention Days to years (configurable) 4-14 days (SQS), immediate (SNS)
Replay Yes (seek to any offset) No (once consumed, gone)
Consumer Groups Multiple independent groups SQS: one consumer per message
Operations Requires management (or MSK) Fully managed, serverless
Cost Higher (broker instances) Lower (pay per request)
Use Case High-throughput streaming Simple queues, serverless
Choose Kafka/MSK when: Choose SQS/SNS when:

6. AWS Kinesis

When to Use:

  • Real-time analytics over clickstreams, logs, and IoT data
  • Streaming ingestion into AWS (Lambda, Firehose, S3, Redshift)
  • Kafka-style streaming without running your own cluster

6.1 Kinesis Services

| Service | Purpose | Use Case |
|---|---|---|
| Kinesis Data Streams | Real-time data streaming | Clickstreams, logs, IoT data |
| Kinesis Data Firehose | Load streams into S3, Redshift, OpenSearch | Data lake ingestion, log archival |
| Kinesis Data Analytics | SQL/Apache Flink on streams | Real-time dashboards, anomaly detection |
| Kinesis Video Streams | Video streaming and analysis | Security cameras, ML video analysis |
import boto3
import json
import time

class KinesisMessaging:
    """AWS Kinesis Data Streams"""

    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis', region_name='us-east-1')
        self.stream_name = stream_name

    def put_record(self, data: dict, partition_key: str):
        """
        Put single record to stream

        partition_key: Determines which shard (same as Kafka key)
        """
        response = self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(data),
            PartitionKey=partition_key  # Hash determines shard
        )

        return {
            'shard_id': response['ShardId'],
            'sequence_number': response['SequenceNumber']
        }

    def put_records_batch(self, records: list):
        """
        Batch put up to 500 records (5 MB total)
        More efficient than individual puts
        """
        entries = [
            {
                'Data': json.dumps(record['data']),
                'PartitionKey': record['partition_key']
            }
            for record in records[:500]
        ]

        response = self.kinesis.put_records(
            StreamName=self.stream_name,
            Records=entries
        )

        return {
            'successful': len(records) - response['FailedRecordCount'],
            'failed': response['FailedRecordCount']
        }

    def get_shard_iterator(self, shard_id: str, iterator_type: str = 'LATEST'):
        """
        Get shard iterator for reading

        iterator_type:
        - LATEST: Start from newest records
        - TRIM_HORIZON: Start from oldest available
        - AT_SEQUENCE_NUMBER: Start from specific sequence
        - AFTER_SEQUENCE_NUMBER: Start after specific sequence
        - AT_TIMESTAMP: Start from timestamp
        """
        response = self.kinesis.get_shard_iterator(
            StreamName=self.stream_name,
            ShardId=shard_id,
            ShardIteratorType=iterator_type
        )

        return response['ShardIterator']

    def consume_records(self, shard_id: str):
        """
        Consume records from shard

        NOTE: For production, use Kinesis Client Library (KCL)
        which handles shard discovery, checkpointing, load balancing
        """
        shard_iterator = self.get_shard_iterator(shard_id)

        while True:
            response = self.kinesis.get_records(
                ShardIterator=shard_iterator,
                Limit=100  # Max 10,000 records or 10 MB
            )

            records = response['Records']

            for record in records:
                data = json.loads(record['Data'])
                print(f"Partition Key: {record['PartitionKey']}, "
                      f"Sequence: {record['SequenceNumber']}, "
                      f"Data: {data}")

                # Process record
                self.process_record(data)

            # Move to next batch
            shard_iterator = response['NextShardIterator']

            if not records:
                time.sleep(1)  # No data, wait before polling again

    def process_record(self, data: dict):
        """Process record logic"""
        pass


# Kinesis Data Firehose Example
def firehose_to_s3():
    """
    Kinesis Firehose: Easy way to load streaming data into S3

    Benefits:
    - Fully managed (no shards to manage)
    - Auto-scaling
    - Built-in transformation (Lambda)
    - Compression (gzip, snappy)
    - Encryption
    """
    firehose = boto3.client('firehose')

    # Put record to Firehose delivery stream
    firehose.put_record(
        DeliveryStreamName='logs-to-s3',
        Record={
            'Data': json.dumps({
                'timestamp': '2025-01-01T10:00:00Z',
                'user_id': '12345',
                'action': 'page_view',
                'page': '/products/123'
            })
        }
    )

    # Firehose automatically:
    # 1. Buffers records by size or time interval (configurable, e.g. 5 MB / 300 s for S3)
    # 2. Optionally transforms via Lambda
    # 3. Compresses (if configured)
    # 4. Writes to S3 in batches
    # 5. Can also write to Redshift, OpenSearch, Splunk, HTTP endpoints


# Lambda consumer for Kinesis
def lambda_kinesis_consumer(event, context):
    """
    Lambda function triggered by Kinesis stream

    Lambda polls stream and invokes function with batch of records
    """
    for record in event['Records']:
        # Kinesis data is base64 encoded
        import base64
        payload = json.loads(base64.b64decode(record['kinesis']['data']))

        print(f"Processing: {payload}")

        # Process record
        # Lambda automatically handles checkpointing

    return {'statusCode': 200}
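For reference, a sketch of the event shape such a handler receives, abbreviated to the fields read above (record data arrives base64-encoded, so the decode step mirrors the handler's):

```python
import base64
import json

# Abbreviated Kinesis-trigger event as Lambda would deliver it
sample_event = {
    'Records': [
        {
            'kinesis': {
                'partitionKey': '12345',
                'data': base64.b64encode(
                    json.dumps({'user_id': '12345', 'event': 'login'}).encode('utf-8')
                ).decode('ascii')
            }
        }
    ]
}

# Decoding mirrors what the handler does for each record
for record in sample_event['Records']:
    payload = json.loads(base64.b64decode(record['kinesis']['data']))
    assert payload['event'] == 'login'
```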

6.2 Kinesis vs Kafka Comparison

| Feature | Kinesis Data Streams | Apache Kafka / MSK |
|---|---|---|
| Management | Fully managed, serverless shards | MSK: managed brokers (you still configure the cluster) |
| Scaling | Shard splits/merges, or on-demand mode | Add brokers and partitions |
| Retention | 1-365 days | Days to years (unlimited with tiered storage) |
| Throughput | 1 MB/s write, 2 MB/s read per shard | 10s-100s of MB/s per broker |
| Latency | 70-200 ms | 5-15 ms |
| Ordering | Per shard | Per partition |
| Pricing | Per shard-hour + PUT payload units | Per broker-hour + storage |
| AWS Integration | Native (Lambda, Firehose, Analytics) | Via connectors |
| Ecosystem | AWS-specific | Large open-source ecosystem |

Choose Kinesis when: you want serverless operation, native AWS integrations, and no brokers to manage.

Choose Kafka/MSK when: you need lower latency, higher throughput, longer retention, or the open-source Kafka ecosystem (Connect, Streams).

7. RabbitMQ & Amazon MQ

When to Use:

  • Complex routing via topic, fanout, and header exchanges
  • Migrating existing AMQP/MQTT/STOMP workloads (via Amazon MQ)
  • Traditional messaging with per-queue ordering and message priorities
import pika
import json

class RabbitMQMessaging:
    """RabbitMQ / Amazon MQ Usage"""

    def __init__(self, host: str = 'localhost', port: int = 5672):
        """
        For Amazon MQ:
        host = 'b-xxx.mq.us-east-1.amazonaws.com'
        Use credentials from AWS Console
        """
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=host, port=port)
        )
        self.channel = self.connection.channel()

    def simple_queue(self, queue_name: str, message: dict):
        """
        Simple queue pattern (point-to-point)
        """
        # Declare queue (idempotent)
        self.channel.queue_declare(
            queue=queue_name,
            durable=True,  # Survives broker restart
            arguments={
                'x-message-ttl': 86400000,  # 24 hours in ms
                'x-max-length': 10000  # Max 10K messages
            }
        )

        # Publish message
        self.channel.basic_publish(
            exchange='',  # Default exchange
            routing_key=queue_name,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # Persistent message
                content_type='application/json',
                priority=5  # Priority 0-9 (if queue configured)
            )
        )

    def consume_queue(self, queue_name: str):
        """
        Consume from queue
        """
        self.channel.queue_declare(queue=queue_name, durable=True)

        # Prefetch: Only get 10 unacked messages at a time
        self.channel.basic_qos(prefetch_count=10)

        def callback(ch, method, properties, body):
            message = json.loads(body)
            print(f"Received: {message}")

            # Process message
            try:
                self.process_message(message)

                # Acknowledge (remove from queue)
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                # Reject and requeue (or send to DLX)
                ch.basic_nack(
                    delivery_tag=method.delivery_tag,
                    requeue=False  # Send to dead-letter exchange
                )

        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=callback,
            auto_ack=False  # Manual ack
        )

        print('Waiting for messages...')
        self.channel.start_consuming()

    def topic_exchange(self, exchange_name: str, routing_key: str, message: dict):
        """
        Topic exchange: Route by pattern matching

        routing_key patterns:
        - "user.created" matches "user.created"
        - "user.*" matches "user.created", "user.deleted"
        - "user.#" matches "user.created", "user.profile.updated"

        Use case: Different services subscribe to different event types
        """
        # Declare exchange
        self.channel.exchange_declare(
            exchange=exchange_name,
            exchange_type='topic',
            durable=True
        )

        # Publish
        self.channel.basic_publish(
            exchange=exchange_name,
            routing_key=routing_key,  # e.g., "order.created"
            body=json.dumps(message)
        )

    def subscribe_to_topic(self, exchange_name: str, binding_key: str, queue_name: str):
        """
        Subscribe queue to topic exchange with pattern
        """
        # Declare exchange
        self.channel.exchange_declare(
            exchange=exchange_name,
            exchange_type='topic',
            durable=True
        )

        # Declare queue
        self.channel.queue_declare(queue=queue_name, durable=True)

        # Bind queue to exchange with routing key pattern
        self.channel.queue_bind(
            exchange=exchange_name,
            queue=queue_name,
            routing_key=binding_key  # e.g., "order.*" or "order.created"
        )

    def fanout_exchange(self, exchange_name: str, message: dict):
        """
        Fanout exchange: Broadcast to all bound queues

        Like SNS - all subscribers get the message
        """
        self.channel.exchange_declare(
            exchange=exchange_name,
            exchange_type='fanout',
            durable=True
        )

        self.channel.basic_publish(
            exchange=exchange_name,
            routing_key='',  # Ignored for fanout
            body=json.dumps(message)
        )

    def dead_letter_exchange(self, main_queue: str, dlx_queue: str):
        """
        Dead Letter Exchange (DLX): Failed messages go here

        Similar to SQS DLQ
        """
        # Declare DLX
        self.channel.exchange_declare(
            exchange='dlx',
            exchange_type='direct'
        )

        # Declare DLX queue
        self.channel.queue_declare(queue=dlx_queue, durable=True)
        self.channel.queue_bind(exchange='dlx', queue=dlx_queue, routing_key='failed')

        # Declare main queue with DLX
        self.channel.queue_declare(
            queue=main_queue,
            durable=True,
            arguments={
                'x-dead-letter-exchange': 'dlx',
                'x-dead-letter-routing-key': 'failed',
                'x-message-ttl': 60000  # 60s TTL
            }
        )

    def process_message(self, message: dict):
        """Process message logic"""
        pass

    def close(self):
        self.connection.close()


# Amazon MQ Example
def amazon_mq_example():
    """
    Amazon MQ: Managed RabbitMQ and ActiveMQ

    Use when:
    - Migrating from on-prem RabbitMQ/ActiveMQ
    - Need AMQP, MQTT, STOMP protocols
    - Complex routing requirements

    Note: For new projects in AWS, prefer SQS/SNS (simpler, cheaper)
    """

    # Connect to Amazon MQ broker
    rabbitmq = RabbitMQMessaging(
        host='b-xxx-yyy.mq.us-east-1.amazonaws.com',
        port=5671  # SSL port
    )

    # Use like regular RabbitMQ
    rabbitmq.simple_queue('tasks', {'task': 'process_order', 'order_id': 123})
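The `*`/`#` wildcard semantics that topic exchanges use for binding keys can be sketched as a small matcher (illustration only, not part of the pika API): `*` matches exactly one dot-separated word, `#` matches zero or more words.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Sketch of AMQP topic-exchange matching:
    '*' matches exactly one dot-separated word, '#' matches zero or more."""
    def match(pattern, words):
        if not pattern:
            return not words  # pattern exhausted: match only if words are too
        head, rest = pattern[0], pattern[1:]
        if head == '#':
            # '#' may absorb zero or more words
            return any(match(rest, words[i:]) for i in range(len(words) + 1))
        if not words:
            return False
        if head == '*' or head == words[0]:
            return match(rest, words[1:])
        return False
    return match(binding_key.split('.'), routing_key.split('.'))

assert topic_matches('user.*', 'user.created')
assert not topic_matches('user.*', 'user.profile.updated')
assert topic_matches('user.#', 'user.profile.updated')
```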

8. AWS EventBridge

When to Use:

  • Event-driven architectures with content-based routing
  • Reacting to AWS service events and third-party SaaS events
  • Workflows that benefit from event archive and replay
import boto3
import json

class EventBridgeMessaging:
    """AWS EventBridge"""

    def __init__(self):
        self.events = boto3.client('events', region_name='us-east-1')

    def put_event(self, source: str, detail_type: str, detail: dict):
        """
        Put event to default event bus
        """
        response = self.events.put_events(
            Entries=[
                {
                    'Source': source,  # e.g., 'myapp.orders'
                    'DetailType': detail_type,  # e.g., 'Order Placed'
                    'Detail': json.dumps(detail),
                    'EventBusName': 'default'
                }
            ]
        )

        return response['Entries'][0]

    def create_rule(self, rule_name: str, event_pattern: dict, target_arn: str):
        """
        Create rule to route events to target

        event_pattern: Filter which events trigger this rule
        target_arn: Lambda, SQS, SNS, Step Functions, etc.
        """
        # Create rule
        self.events.put_rule(
            Name=rule_name,
            EventPattern=json.dumps(event_pattern),
            State='ENABLED',
            Description='Route order events to processing queue'
        )

        # Add target to rule
        self.events.put_targets(
            Rule=rule_name,
            Targets=[
                {
                    'Id': '1',
                    'Arn': target_arn
                }
            ]
        )


# EventBridge Pattern Examples
def eventbridge_patterns():
    """
    EventBridge event patterns (filtering)
    """
    eb = EventBridgeMessaging()

    # Pattern 1: Match specific source and detail type
    pattern1 = {
        "source": ["myapp.orders"],
        "detail-type": ["Order Placed"]
    }

    # Pattern 2: Match with content filtering
    pattern2 = {
        "source": ["myapp.orders"],
        "detail-type": ["Order Placed"],
        "detail": {
            "total": [{"numeric": [">", 100]}]  # Orders > $100
        }
    }

    # Pattern 3: Match multiple sources
    pattern3 = {
        "source": ["myapp.orders", "myapp.inventory"],
        "detail-type": ["Order Placed", "Inventory Updated"]
    }

    # Pattern 4: AWS service events
    pattern4 = {
        "source": ["aws.ec2"],
        "detail-type": ["EC2 Instance State-change Notification"],
        "detail": {
            "state": ["terminated"]
        }
    }

    # Create rule
    eb.create_rule(
        rule_name='high-value-orders',
        event_pattern=pattern2,
        target_arn='arn:aws:lambda:us-east-1:123456789012:function:ProcessHighValueOrder'
    )
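
Patterns like these can be validated before deploying a rule: the `events` client exposes `test_event_pattern`, which checks a candidate event against a pattern. The helper below builds the full event envelope that API expects; the envelope values other than source, detail-type, and detail are placeholders.

```python
import json

def make_test_event(source: str, detail_type: str, detail: dict) -> str:
    """Build the full event JSON that test_event_pattern expects.

    Envelope fields besides source/detail-type/detail are placeholders;
    the API only requires that they be present.
    """
    return json.dumps({
        'id': '00000000-0000-0000-0000-000000000000',
        'detail-type': detail_type,
        'source': source,
        'account': '123456789012',
        'time': '2025-01-01T10:00:00Z',
        'region': 'us-east-1',
        'resources': [],
        'detail': detail,
    })

# With real AWS credentials (uncomment to run):
# import boto3
# events = boto3.client('events', region_name='us-east-1')
# resp = events.test_event_pattern(
#     EventPattern=json.dumps({'source': ['myapp.orders'],
#                              'detail-type': ['Order Placed'],
#                              'detail': {'total': [{'numeric': ['>', 100]}]}}),
#     Event=make_test_event('myapp.orders', 'Order Placed', {'total': 250}),
# )
# resp['Result'] tells you whether the event matched
```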


# EventBridge vs SNS
def eventbridge_vs_sns():
    """
    EventBridge vs SNS

    EventBridge:
    + Rich event filtering (JSON path, numeric, prefix, etc.)
    + Schema registry and discovery
    + Event archive and replay
    + SaaS integrations
    + Cross-account routing
    - Higher latency (50-500ms)
    - Lower throughput (10K events/sec default)

    SNS:
    + Lower latency (10-50ms)
    + Higher throughput (30K+ msg/sec)
    + Simple pub/sub
    + Mobile push, SMS, email
    - Basic filtering only (message attributes)
    - No replay

    Use EventBridge for: Event-driven apps, complex routing, AWS service events
    Use SNS for: Simple pub/sub, mobile notifications, high throughput
    """
    pass
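
The filtering gap called out above can be made concrete. SNS filters (by default) on message attributes via a subscription filter policy, while EventBridge matches fields inside the event body itself. The dictionaries below are illustrative policies; the ARN in the comment is a placeholder.

```python
import json

# SNS: filter policy attached to a subscription; by default it matches
# message ATTRIBUTES, not the message body
sns_filter_policy = {
    'event_type': ['order_placed'],
    'total': [{'numeric': ['>', 100]}],  # attribute must be published as Number
}

# EventBridge: pattern matches fields inside the event BODY ('detail')
eventbridge_pattern = {
    'source': ['myapp.orders'],
    'detail-type': ['Order Placed'],
    'detail': {'total': [{'numeric': ['>', 100]}]},
}

# Attaching the SNS policy (placeholder ARN; requires AWS credentials):
# sns.set_subscription_attributes(
#     SubscriptionArn='arn:aws:sns:us-east-1:123456789012:order-events:<sub-id>',
#     AttributeName='FilterPolicy',
#     AttributeValue=json.dumps(sns_filter_policy),
# )
```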

9. Redis Pub/Sub

When to Use:
- Real-time notifications and chat where sub-millisecond latency matters
- Cache invalidation broadcasts across app servers
- Ephemeral signals where an occasional lost message is acceptable

Important Limitations:
- Fire-and-forget: no persistence; messages published to channels with no subscribers are dropped
- No acknowledgments, retries, or consumer groups (use Redis Streams for those)
- A disconnected subscriber misses everything published while it was offline
import redis

class RedisPubSub:
    """Redis Pub/Sub for real-time messaging"""

    def __init__(self, host: str = 'localhost', port: int = 6379):
        self.redis_client = redis.Redis(host=host, port=port, decode_responses=True)

    def publish(self, channel: str, message: str):
        """
        Publish message to channel

        Returns: Number of subscribers that received the message
        """
        subscribers = self.redis_client.publish(channel, message)
        return subscribers

    def subscribe(self, channels: list):
        """
        Subscribe to channels and listen for messages
        """
        pubsub = self.redis_client.pubsub()
        pubsub.subscribe(*channels)

        print(f"Subscribed to {channels}...")

        for message in pubsub.listen():
            if message['type'] == 'message':
                print(f"Channel: {message['channel']}, "
                      f"Message: {message['data']}")

                # Process message
                self.process_message(message['channel'], message['data'])

    def pattern_subscribe(self, pattern: str):
        """
        Subscribe to channels matching pattern

        pattern examples:
        - "user:*" matches "user:123", "user:456"
        - "event:*:created" matches "event:order:created", "event:user:created"
        """
        pubsub = self.redis_client.pubsub()
        pubsub.psubscribe(pattern)

        for message in pubsub.listen():
            if message['type'] == 'pmessage':
                print(f"Pattern: {message['pattern']}, "
                      f"Channel: {message['channel']}, "
                      f"Message: {message['data']}")

    def process_message(self, channel: str, message: str):
        """Process message logic"""
        pass


# Use case: Cache invalidation across multiple servers
def cache_invalidation_example():
    """
    Use Redis Pub/Sub to invalidate cache across multiple app servers

    When user updates profile:
    1. Update database
    2. Publish cache invalidation event
    3. All servers receive event and clear local cache
    """
    redis_ps = RedisPubSub()

    # Server 1: Updates user and publishes invalidation
    def update_user(user_id: int, updates: dict):
        # Update database
        update_database(user_id, updates)

        # Publish cache invalidation
        redis_ps.publish('cache:invalidate', f'user:{user_id}')

    # Server 2, 3, 4: Subscribe to invalidation events
    def cache_invalidation_listener():
        pubsub = redis_ps.redis_client.pubsub()
        pubsub.subscribe('cache:invalidate')

        for message in pubsub.listen():
            if message['type'] == 'message':
                cache_key = message['data']
                print(f"Invalidating cache: {cache_key}")

                # Clear from local cache
                local_cache.delete(cache_key)


# Redis Streams (Alternative to Pub/Sub with persistence)
def redis_streams_example():
    """
    Redis Streams: Pub/Sub WITH persistence

    Better than Pub/Sub when:
    - Need message persistence
    - Need consumer groups
    - Need acknowledgments

    Like Kafka-lite in Redis
    """
    r = redis.Redis()

    # Add message to stream
    message_id = r.xadd(
        'events',
        {
            'user_id': '123',
            'action': 'login',
            'timestamp': '2025-01-01T10:00:00Z'
        },
        maxlen=10000  # Keep last 10K messages
    )

    # Read messages from stream
    messages = r.xread({'events': '0'}, count=10)

    for stream_name, stream_messages in messages:
        for message_id, message_data in stream_messages:
            print(f"ID: {message_id}, Data: {message_data}")

    # Consumer groups (like Kafka)
    r.xgroup_create('events', 'analytics-group', id='0', mkstream=True)

    # Read as consumer group
    messages = r.xreadgroup(
        'analytics-group',
        'consumer-1',
        {'events': '>'},
        count=10
    )

    # Acknowledge processed messages so they leave the group's pending list
    for stream_name, stream_messages in messages:
        for message_id, message_data in stream_messages:
            r.xack('events', 'analytics-group', message_id)

10. Decision Matrix: Which System to Use?

Requirement | Recommended System | Why
Simple task queue, serverless | SQS | Fully managed, zero ops, Lambda integration
Fan-out notifications | SNS | Native pub/sub, multiple protocols (email, SMS, HTTP)
High-throughput streaming (>100K/s) | Kafka/MSK | Highest throughput, lowest latency, replay capability
Real-time analytics in AWS | Kinesis | Native AWS, Firehose integration, simpler than Kafka
Event-driven architecture, SaaS integration | EventBridge | Rich filtering, schema registry, cross-account routing
Complex routing (topic, headers) | RabbitMQ/Amazon MQ | Flexible exchanges, priority queues, RPC patterns
Real-time chat/notifications | Redis Pub/Sub | Ultra-low latency, simple, ephemeral messages
Message replay needed | Kafka or Kinesis | Persistent log, seek to any offset
Exactly-once processing | SQS FIFO or Kafka | Deduplication and ordering guarantees
Load balancing tasks across workers | SQS or RabbitMQ | Competing consumers pattern
Log aggregation at scale | Kinesis Firehose | Easy S3/Redshift loading, auto-scaling
Migrating from on-prem RabbitMQ | Amazon MQ | Managed RabbitMQ, minimal code changes

11. Common Architecture Patterns

11.1 Microservices Decoupling

"""
Pattern: SQS between microservices

Order Service → SQS → Inventory Service
             ↘ SQS → Email Service
             ↘ SQS → Analytics Service

Benefits:
- Services don't need to know about each other
- Async processing (Order Service responds immediately)
- Automatic retries and DLQ
- Independent scaling
"""

# Order Service
def place_order(order: dict):
    # Save order to database
    save_order(order)

    # Queue tasks asynchronously
    sqs.send_message('inventory-queue', {'action': 'reserve', 'order_id': order['id']})
    sqs.send_message('email-queue', {'action': 'send_confirmation', 'order_id': order['id']})
    sqs.send_message('analytics-queue', {'event': 'order_placed', 'order_id': order['id']})

    return {'status': 'success', 'order_id': order['id']}
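
The `sqs.send_message` calls above assume a thin wrapper; with raw boto3, a queue is addressed by URL and the message body must be a string. A minimal sketch of the equivalent call (queue URL and payload are illustrative):

```python
import json

def build_send_kwargs(queue_url: str, payload: dict) -> dict:
    """Build kwargs for boto3's sqs.send_message (bodies must be strings)."""
    return {
        'QueueUrl': queue_url,
        'MessageBody': json.dumps(payload),
    }

# With a real client (requires AWS credentials):
# import boto3
# sqs = boto3.client('sqs', region_name='us-east-1')
# sqs.send_message(**build_send_kwargs(
#     'https://sqs.us-east-1.amazonaws.com/123456789012/inventory-queue',
#     {'action': 'reserve', 'order_id': 42}))
```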

11.2 Event Sourcing with Kafka

"""
Pattern: Event sourcing with Kafka

All state changes stored as immutable events
Can rebuild state by replaying events

User Events Topic: user_created, user_updated, user_deleted
Order Events Topic: order_placed, order_shipped, order_cancelled

Benefits:
- Full audit trail
- Time travel (replay to any point)
- Multiple read models from same events
- Easy debugging (reproduce bugs by replaying)
"""

def event_sourcing_pattern():
    from kafka import KafkaProducer
    from datetime import datetime

    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

    # Record every state change as event
    def update_user(user_id: int, updates: dict):
        event = {
            'event_type': 'user_updated',
            'user_id': user_id,
            'updates': updates,
            'timestamp': datetime.utcnow().isoformat()
        }

        # Store event in Kafka (immutable log)
        producer.send('user-events', value=json.dumps(event).encode())

        # Update read model (projection)
        apply_event_to_database(event)

    # Rebuild state by replaying all events
    def rebuild_user_state(user_id: int):
        from kafka import KafkaConsumer

        consumer = KafkaConsumer(
            'user-events',
            bootstrap_servers=['localhost:9092'],
            auto_offset_reset='earliest',
            consumer_timeout_ms=10000  # stop iterating once caught up
        )

        user_state = {}

        for message in consumer:
            event = json.loads(message.value)

            if event['user_id'] == user_id:
                # Apply event to rebuild state
                if event['event_type'] == 'user_created':
                    user_state = event.get('data', {})  # creation events carry the full initial state
                elif event['event_type'] == 'user_updated':
                    user_state.update(event['updates'])
                elif event['event_type'] == 'user_deleted':
                    user_state = None
                    break

        return user_state

11.3 Fan-Out with SNS + SQS

"""
Pattern: SNS Topic → Multiple SQS Queues

Order Placed Event (SNS)
    → Email Queue (SQS) → Email Service
    → Inventory Queue (SQS) → Inventory Service
    → Warehouse Queue (SQS) → Warehouse Service
    → Analytics Queue (SQS) → Analytics Service

Benefits:
- Each service has its own queue (independent processing)
- SNS provides instant fan-out
- SQS provides durability and retry
- Easy to add new subscribers
"""

def setup_fanout():
    sns = boto3.client('sns')
    sqs = boto3.client('sqs')

    # Create SNS topic
    topic = sns.create_topic(Name='order-events')
    topic_arn = topic['TopicArn']

    # Create queues for each service
    services = ['email', 'inventory', 'warehouse', 'analytics']

    for service in services:
        # Create queue
        queue = sqs.create_queue(QueueName=f'{service}-queue')
        queue_url = queue['QueueUrl']

        # Get queue ARN
        attrs = sqs.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=['QueueArn']
        )
        queue_arn = attrs['Attributes']['QueueArn']

        # Allow SNS to send to queue
        policy = {
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "sns.amazonaws.com"},
                "Action": "sqs:SendMessage",
                "Resource": queue_arn,
                "Condition": {
                    "ArnEquals": {"aws:SourceArn": topic_arn}
                }
            }]
        }

        sqs.set_queue_attributes(
            QueueUrl=queue_url,
            Attributes={'Policy': json.dumps(policy)}
        )

        # Subscribe queue to topic (add Attributes={'RawMessageDelivery': 'true'}
        # to deliver the raw payload instead of the SNS JSON envelope)
        sns.subscribe(
            TopicArn=topic_arn,
            Protocol='sqs',
            Endpoint=queue_arn
        )
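
One detail the setup above leaves implicit: unless a subscription enables RawMessageDelivery, each SQS message body is an SNS envelope (JSON with `Type`, `TopicArn`, `Message`, etc.), so consumers must unwrap it. A minimal unwrapping helper, with the envelope below constructed to mirror what SQS would receive:

```python
import json

def unwrap_sns_envelope(sqs_body: str) -> dict:
    """Extract the original payload from an SNS-wrapped SQS message body.

    Without RawMessageDelivery, SNS delivers a JSON envelope whose
    'Message' field holds the originally published string.
    """
    envelope = json.loads(sqs_body)
    return json.loads(envelope['Message'])

# Example envelope as a queue consumer would see it:
body = json.dumps({
    'Type': 'Notification',
    'TopicArn': 'arn:aws:sns:us-east-1:123456789012:order-events',
    'Message': json.dumps({'event': 'order_placed', 'order_id': 42}),
})
payload = unwrap_sns_envelope(body)  # {'event': 'order_placed', 'order_id': 42}
```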

12. Best Practices Summary

General Messaging Best Practices:
- Make consumers idempotent; most of these systems deliver at-least-once
- Configure dead-letter queues and alert on DLQ depth
- Keep messages small; pass references (e.g., S3 keys) for large payloads
- Monitor queue depth, consumer lag, and message age
- Use batch send/receive APIs to raise throughput and cut cost

Common Pitfalls:
- Assuming exactly-once delivery without deduplication logic
- Setting visibility timeout shorter than processing time (duplicate processing)
- Relying on ordering in systems that don't guarantee it (SQS standard, SNS)
- Using Redis Pub/Sub where message loss is unacceptable
- Unbounded retries with no DLQ, creating poison-message loops
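
A practice worth making concrete: most of the systems covered here deliver at-least-once, so handlers must tolerate redelivery. A minimal in-memory sketch of an idempotent consumer that deduplicates on a stable message ID (in production the seen-set would live in Redis or a database, not process memory):

```python
class IdempotentConsumer:
    """Process each message at most once by tracking processed IDs.

    At-least-once systems (SQS standard, Kafka, Kinesis) can redeliver;
    deduplicating on a stable message ID makes the handler safe to retry.
    """

    def __init__(self, handler):
        self.handler = handler
        self.seen = set()  # processed message IDs (in-memory for illustration)

    def process(self, message_id: str, payload: dict) -> bool:
        if message_id in self.seen:
            return False  # duplicate delivery; skip side effects
        self.handler(payload)
        self.seen.add(message_id)  # mark done only after the handler succeeds
        return True

# Redelivery of the same message ID is a no-op:
results = []
consumer = IdempotentConsumer(handler=results.append)
consumer.process('msg-1', {'order_id': 42})
consumer.process('msg-1', {'order_id': 42})  # duplicate, ignored
len(results)  # 1
```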