Message Queues, Pub/Sub, Event Streaming & AWS Services
Pattern: One producer → Queue → One consumer
Examples: SQS, RabbitMQ queues
Pattern: One producer → Topic → Many subscribers
Examples: SNS, Redis Pub/Sub, RabbitMQ exchanges
Pattern: Append-only log of events
Examples: Kafka, Kinesis
| System | AWS Equivalent | Pattern | Throughput | Latency | Ordering | Persistence | Best For |
|---|---|---|---|---|---|---|---|
| RabbitMQ | Amazon MQ | Queue + Pub/Sub | 10K-100K msg/s | ~1-10 ms | Per queue | Optional | Traditional messaging, complex routing |
| Apache Kafka | Amazon MSK | Event streaming | 1M+ msg/s | ~5-15 ms | Per partition | Always (configurable) | High-throughput streaming, event sourcing |
| Redis Pub/Sub | ElastiCache | Pub/Sub | 100K+ msg/s | ~1 ms | None | None (fire-forget) | Real-time notifications, chat |
| AWS SQS | Native AWS | Queue | 3K-30K msg/s | ~10-100 ms | FIFO available | 4-14 days | Decoupling, serverless, simple queues |
| AWS SNS | Native AWS | Pub/Sub | 30K+ msg/s | ~10-50 ms | None | None (immediate delivery) | Fan-out, mobile push, email/SMS |
| AWS Kinesis | Native AWS | Event streaming | 1M+ msg/s | ~70-200 ms | Per shard | 1-365 days | Real-time analytics, log aggregation |
| EventBridge | Native AWS | Event bus | 10K+ msg/s | ~50-500 ms | None | Archive available | Event-driven architectures, SaaS integrations |
| Feature | Standard Queue | FIFO Queue |
|---|---|---|
| Throughput | Unlimited (nearly) | 3,000 msg/s (batched: 30,000) |
| Ordering | Best-effort | Guaranteed FIFO |
| Delivery | At-least-once (duplicates possible) | Exactly-once processing |
| Use Case | High throughput, order not critical | Order matters (payments, bookings) |
import boto3
import json
class SQSMessaging:
    """Helper wrapping the most common AWS SQS producer/consumer patterns."""

    def __init__(self):
        # Region pinned to us-east-1 to match the other examples in this file.
        self.sqs = boto3.client('sqs', region_name='us-east-1')

    def send_message(self, queue_url: str, message: dict):
        """Publish one JSON-encoded message to a standard queue; return its id."""
        sent = self.sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(message),
            MessageAttributes={
                'Priority': {
                    'StringValue': 'High',
                    'DataType': 'String'
                }
            }
        )
        return sent['MessageId']

    def send_message_fifo(self, queue_url: str, message: dict,
                          deduplication_id: str, group_id: str):
        """
        Publish to a FIFO queue.

        MessageGroupId orders messages within one group; MessageDeduplicationId
        suppresses duplicates inside SQS's 5-minute deduplication window.
        """
        sent = self.sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(message),
            MessageGroupId=group_id,
            MessageDeduplicationId=deduplication_id
        )
        return sent['MessageId']

    def receive_messages(self, queue_url: str, max_messages: int = 10):
        """
        Long-poll the queue, process each message, and delete it on success.

        A message that raises during processing is NOT deleted, so it becomes
        visible again after VisibilityTimeout; pair with a DLQ in production.
        """
        polled = self.sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=max_messages,  # API accepts 1-10
            WaitTimeSeconds=20,                # long polling: fewer empty responses
            VisibilityTimeout=30,              # hidden from other consumers for 30s
            MessageAttributeNames=['All']
        )
        for msg in polled.get('Messages', []):
            try:
                payload = json.loads(msg['Body'])
                self.process_message(payload)
                # Delete only after successful processing (the "ack").
                self.sqs.delete_message(
                    QueueUrl=queue_url,
                    ReceiptHandle=msg['ReceiptHandle']
                )
            except Exception as e:
                print(f"Processing failed: {e}")
                # Left on the queue; it reappears after VisibilityTimeout.

    def batch_send(self, queue_url: str, messages: list):
        """Send up to 10 messages in one batch call (cheaper than N sends)."""
        batch = []
        for index, payload in enumerate(messages[:10]):
            batch.append({
                'Id': str(index),
                'MessageBody': json.dumps(payload)
            })
        outcome = self.sqs.send_message_batch(
            QueueUrl=queue_url,
            Entries=batch
        )
        return {
            'successful': len(outcome.get('Successful', [])),
            'failed': len(outcome.get('Failed', []))
        }

    def process_message(self, message: dict):
        """Application-specific processing hook."""
        print(f"Processing: {message}")
# Dead Letter Queue Pattern
def setup_dlq():
    """
    Create a main queue wired to a dead-letter queue (DLQ).

    SQS moves a message to the DLQ once it has been received
    maxReceiveCount times without being deleted.
    """
    sqs = boto3.client('sqs')
    # The DLQ comes first: the main queue's RedrivePolicy needs its ARN.
    dlq_url = sqs.create_queue(QueueName='failed-messages-dlq')['QueueUrl']
    dlq_arn = sqs.get_queue_attributes(
        QueueUrl=dlq_url,
        AttributeNames=['QueueArn']
    )['Attributes']['QueueArn']
    redrive_policy = json.dumps({
        'deadLetterTargetArn': dlq_arn,
        'maxReceiveCount': '3'  # three failed receive attempts, then DLQ
    })
    main_queue = sqs.create_queue(
        QueueName='main-queue',
        Attributes={
            'RedrivePolicy': redrive_policy,
            'VisibilityTimeout': '30',
            'MessageRetentionPeriod': '345600'  # 4 days, in seconds
        }
    )
    return main_queue['QueueUrl'], dlq_url
import boto3
import json
class SNSMessaging:
    """AWS SNS usage patterns: publish, topics, subscriptions, filtering.

    Fix vs. original: ``publish_to_topic`` no longer sends ``Subject=None``
    to ``sns.publish`` — the API requires Subject, when present, to be a
    string, so it is only included when the caller supplies one.
    """

    def __init__(self):
        self.sns = boto3.client('sns', region_name='us-east-1')

    def publish_to_topic(self, topic_arn: str, message: dict, subject: str = None):
        """
        Publish a message to an SNS topic; every subscriber receives it.

        MessageStructure='json' lets each protocol (email, SQS, Lambda)
        receive its own rendering of the message.
        """
        publish_kwargs = {
            'TopicArn': topic_arn,
            'Message': json.dumps({
                'default': json.dumps(message),  # Default message
                'email': f"Subject: {subject}\n\n{message.get('text', '')}",  # Email format
                'sqs': json.dumps(message),  # SQS format
                'lambda': json.dumps(message)  # Lambda format
            }),
            'MessageStructure': 'json',  # Different formats per protocol
            'MessageAttributes': {
                'event_type': {
                    'DataType': 'String',
                    'StringValue': message.get('type', 'generic')
                }
            }
        }
        # Subject must be omitted entirely when not provided (None is rejected).
        if subject is not None:
            publish_kwargs['Subject'] = subject
        response = self.sns.publish(**publish_kwargs)
        return response['MessageId']

    def create_topic(self, topic_name: str):
        """Create (idempotently) a standard SNS topic; return its ARN."""
        response = self.sns.create_topic(
            Name=topic_name,
            Attributes={
                'DisplayName': topic_name,
                'FifoTopic': 'false'  # or 'true' for FIFO
            }
        )
        return response['TopicArn']

    def subscribe_email(self, topic_arn: str, email: str):
        """Subscribe an email address; the user must confirm via email."""
        response = self.sns.subscribe(
            TopicArn=topic_arn,
            Protocol='email',
            Endpoint=email
        )
        return response['SubscriptionArn']

    def subscribe_sqs(self, topic_arn: str, queue_arn: str):
        """
        Subscribe an SQS queue to an SNS topic (fan-out pattern).

        Benefits:
        - Durability: SNS delivers immediately, SQS stores
        - Multiple processing: each subscriber gets its own queue
        - Decoupling: add/remove subscribers without affecting others
        """
        response = self.sns.subscribe(
            TopicArn=topic_arn,
            Protocol='sqs',
            Endpoint=queue_arn
        )
        return response['SubscriptionArn']

    def subscribe_lambda(self, topic_arn: str, lambda_arn: str):
        """Subscribe a Lambda function to the topic."""
        response = self.sns.subscribe(
            TopicArn=topic_arn,
            Protocol='lambda',
            Endpoint=lambda_arn
        )
        return response['SubscriptionArn']

    def subscribe_with_filter(self, topic_arn: str, queue_arn: str, filter_policy: dict):
        """
        Subscribe with message filtering: only messages whose attributes
        match filter_policy are delivered to this subscriber.
        """
        response = self.sns.subscribe(
            TopicArn=topic_arn,
            Protocol='sqs',
            Endpoint=queue_arn,
            Attributes={
                'FilterPolicy': json.dumps(filter_policy)
            }
        )
        return response['SubscriptionArn']
# SNS + SQS Fan-Out Pattern
def fanout_pattern_example():
    """
    Pattern: SNS → Multiple SQS Queues.

    Use case: an "order placed" event triggers the email notification,
    inventory update, and analytics services — each with its own queue.

    Fix vs. original: ``sqs.create_queue`` returns only ``QueueUrl``; the
    queue ARN required by ``sns.subscribe`` must be fetched separately via
    ``get_queue_attributes`` (the original indexed a non-existent
    ``'QueueArn'`` key and would raise KeyError).
    """
    sns = boto3.client('sns')
    sqs = boto3.client('sqs')
    # Create SNS topic
    topic_arn = sns.create_topic(Name='order-events')['TopicArn']

    def _create_queue_and_get_arn(queue_name: str) -> str:
        # create_queue only returns the URL; resolve the ARN explicitly.
        queue_url = sqs.create_queue(QueueName=queue_name)['QueueUrl']
        attrs = sqs.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=['QueueArn']
        )
        return attrs['Attributes']['QueueArn']

    email_arn = _create_queue_and_get_arn('email-notifications')
    inventory_arn = _create_queue_and_get_arn('inventory-updates')
    analytics_arn = _create_queue_and_get_arn('analytics-events')

    # Subscribe queues to topic with filters
    sns.subscribe(
        TopicArn=topic_arn,
        Protocol='sqs',
        Endpoint=email_arn,
        Attributes={
            'FilterPolicy': json.dumps({
                'event_type': ['order_placed', 'order_shipped']
            })
        }
    )
    sns.subscribe(
        TopicArn=topic_arn,
        Protocol='sqs',
        Endpoint=inventory_arn,
        Attributes={
            'FilterPolicy': json.dumps({
                'event_type': ['order_placed', 'order_cancelled']
            })
        }
    )
    # Analytics gets all events (no filter)
    sns.subscribe(
        TopicArn=topic_arn,
        Protocol='sqs',
        Endpoint=analytics_arn
    )
    # Publish an event; the event_type attribute drives the filters above.
    sns.publish(
        TopicArn=topic_arn,
        Message=json.dumps({
            'order_id': '12345',
            'customer_id': '67890',
            'total': 99.99
        }),
        MessageAttributes={
            'event_type': {
                'DataType': 'String',
                'StringValue': 'order_placed'
            }
        }
    )
| Concept | Description |
|---|---|
| Topic | Category/feed of messages (e.g., "user-events", "orders") |
| Partition | Topics split into partitions for parallelism. Messages in a partition are ordered. |
| Offset | Unique ID of message within partition (sequential). Consumer tracks its offset. |
| Producer | Publishes messages to topics. Chooses partition via key or round-robin. |
| Consumer | Reads messages from topics. Part of consumer group for load balancing. |
| Consumer Group | Group of consumers. Each partition consumed by one consumer in group. |
| Broker | Kafka server. Cluster has multiple brokers for fault tolerance. |
from kafka import KafkaProducer, KafkaConsumer
import json
class KafkaMessaging:
    """Apache Kafka / Amazon MSK usage patterns.

    Fixes vs. original:
    - ``send_message`` reuses one cached producer instead of building and
      closing a brand-new KafkaProducer (TCP connections + metadata fetch)
      for every single message.
    - ``consume_from_offset`` now reads exactly 100 messages (the original
      off-by-one read 101).
    """

    def __init__(self, bootstrap_servers: list):
        """
        bootstrap_servers: ['localhost:9092'] or MSK cluster endpoints
        """
        self.bootstrap_servers = bootstrap_servers
        self._producer = None  # created lazily, reused across sends

    def create_producer(self):
        """Build a new KafkaProducer with durable, batched settings."""
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',  # wait for all in-sync replicas to acknowledge
            retries=3,
            compression_type='gzip',  # reduce network usage
            max_in_flight_requests_per_connection=5,
            batch_size=16384,  # batch messages for efficiency
            linger_ms=10  # wait up to 10ms to batch more messages
        )

    def send_message(self, topic: str, message: dict, key: str = None):
        """
        Send one message synchronously and return its placement.

        Key determines partition:
        - Same key → same partition → guaranteed order
        - No key → round-robin across partitions
        """
        if self._producer is None:
            self._producer = self.create_producer()
        future = self._producer.send(
            topic,
            value=message,
            key=key  # messages with same key land on the same partition
        )
        record_metadata = future.get(timeout=10)  # block for the broker ack
        self._producer.flush()
        return {
            'topic': record_metadata.topic,
            'partition': record_metadata.partition,
            'offset': record_metadata.offset
        }

    def close(self):
        """Release the cached producer (call once on shutdown)."""
        if self._producer is not None:
            self._producer.close()
            self._producer = None

    def create_consumer(self, topics: list, group_id: str):
        """
        Create a Kafka consumer in a consumer group.

        Partitions are distributed among the group's consumers:
        - 3 partitions, 3 consumers → each gets 1 partition
        - 3 partitions, 2 consumers → one gets 2 partitions, other gets 1
        - 3 partitions, 5 consumers → 3 consumers active, 2 idle
        """
        consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=self.bootstrap_servers,
            group_id=group_id,  # all consumers in the group share the load
            auto_offset_reset='earliest',  # 'earliest' or 'latest'
            enable_auto_commit=True,  # auto-commit offset every 5s
            auto_commit_interval_ms=5000,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            max_poll_records=500,  # max messages per poll
            session_timeout_ms=30000,  # no heartbeat for 30s → rebalance
        )
        return consumer

    def consume_messages(self, topics: list, group_id: str):
        """Consume messages until interrupted; always closes the consumer."""
        consumer = self.create_consumer(topics, group_id)
        try:
            for message in consumer:
                print(f"Partition: {message.partition}, "
                      f"Offset: {message.offset}, "
                      f"Key: {message.key}, "
                      f"Value: {message.value}")
                self.process_message(message.value)
                # Manual commit (if auto_commit=False)
                # consumer.commit()
        except KeyboardInterrupt:
            pass
        finally:
            consumer.close()

    def consume_from_offset(self, topic: str, partition: int, offset: int):
        """
        Replay messages from a specific offset — Kafka's log retention
        makes it possible to reprocess historical data.
        """
        from kafka import TopicPartition
        consumer = KafkaConsumer(
            bootstrap_servers=self.bootstrap_servers,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='earliest'
        )
        # Assign the exact partition and seek to the requested offset.
        tp = TopicPartition(topic, partition)
        consumer.assign([tp])
        consumer.seek(tp, offset)
        for message in consumer:
            print(f"Offset {message.offset}: {message.value}")
            if message.offset >= offset + 99:  # stop after 100 messages
                break
        consumer.close()

    def process_message(self, message: dict):
        """Application-specific processing hook."""
        print(f"Processing: {message}")
# Amazon MSK (Managed Streaming for Kafka) Example
def msk_example():
    """
    Amazon MSK: fully managed Kafka service.

    Compared with self-hosted Kafka you get automatic patching and updates,
    built-in CloudWatch monitoring, multi-AZ replication, AWS IAM
    integration, and automatic scaling.
    """
    # Broker endpoints come from the MSK page in the AWS Console.
    brokers = [
        'b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9092',
        'b-2.mycluster.abc123.kafka.us-east-1.amazonaws.com:9092',
        'b-3.mycluster.abc123.kafka.us-east-1.amazonaws.com:9092'
    ]
    client = KafkaMessaging(brokers)
    # Produce: keying by user id pins every event for that user to one
    # partition, preserving per-user ordering.
    client.send_message(
        topic='user-events',
        message={
            'user_id': '12345',
            'event': 'login',
            'timestamp': '2025-01-01T10:00:00Z'
        },
        key='12345'
    )
    # Consume as part of a consumer group.
    client.consume_messages(
        topics=['user-events'],
        group_id='analytics-service'
    )
| Feature | Kafka / MSK | SQS + SNS |
|---|---|---|
| Throughput | Millions of messages/sec | Thousands of messages/sec |
| Latency | 5-15 ms (p99) | 10-100 ms |
| Message Retention | Days to years (configurable) | 4-14 days (SQS), immediate (SNS) |
| Replay | Yes (seek to any offset) | No (once consumed, gone) |
| Consumer Groups | Multiple independent groups | SQS: one consumer per message |
| Operations | Requires management (or MSK) | Fully managed, serverless |
| Cost | Higher (broker instances) | Lower (pay per request) |
| Use Case | High-throughput streaming | Simple queues, serverless |
| Service | Purpose | Use Case |
|---|---|---|
| Kinesis Data Streams | Real-time data streaming | Clickstreams, logs, IoT data |
| Kinesis Data Firehose | Load streams into S3, Redshift, Elasticsearch/OpenSearch | Data lake ingestion, log archival |
| Kinesis Data Analytics | SQL/Apache Flink on streams | Real-time dashboards, anomaly detection |
| Kinesis Video Streams | Video streaming and analysis | Security cameras, ML video analysis |
import boto3
import json
import time
class KinesisMessaging:
    """AWS Kinesis Data Streams producer/consumer helpers.

    Fixes vs. original:
    - ``put_records_batch`` computed the success count from ``len(records)``
      even though only the first 500 records are sent, overstating successes
      for larger inputs; it now counts the entries actually submitted.
    - ``consume_records`` stops when the shard is closed (NextShardIterator
      becomes null) instead of calling get_records with an invalid iterator.
    """

    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis', region_name='us-east-1')
        self.stream_name = stream_name

    def put_record(self, data: dict, partition_key: str):
        """
        Put a single record on the stream.

        partition_key: its hash selects the shard (same role as a Kafka key).
        """
        response = self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(data),
            PartitionKey=partition_key
        )
        return {
            'shard_id': response['ShardId'],
            'sequence_number': response['SequenceNumber']
        }

    def put_records_batch(self, records: list):
        """
        Batch-put up to 500 records (5 MB total) — cheaper than single puts.

        Records beyond the first 500 are ignored by this call; the returned
        counts cover only the entries actually submitted.
        """
        entries = [
            {
                'Data': json.dumps(record['data']),
                'PartitionKey': record['partition_key']
            }
            for record in records[:500]
        ]
        response = self.kinesis.put_records(
            StreamName=self.stream_name,
            Records=entries
        )
        # Count against what was submitted, not the caller's full list.
        return {
            'successful': len(entries) - response['FailedRecordCount'],
            'failed': response['FailedRecordCount']
        }

    def get_shard_iterator(self, shard_id: str, iterator_type: str = 'LATEST'):
        """
        Get a shard iterator for reading.

        iterator_type:
        - LATEST: start from newest records
        - TRIM_HORIZON: start from oldest available
        - AT_SEQUENCE_NUMBER / AFTER_SEQUENCE_NUMBER: relative to a sequence
        - AT_TIMESTAMP: start from a timestamp
        """
        response = self.kinesis.get_shard_iterator(
            StreamName=self.stream_name,
            ShardId=shard_id,
            ShardIteratorType=iterator_type
        )
        return response['ShardIterator']

    def consume_records(self, shard_id: str):
        """
        Poll a shard and process its records until the shard closes.

        NOTE: for production use the Kinesis Client Library (KCL), which
        handles shard discovery, checkpointing, and load balancing.
        """
        shard_iterator = self.get_shard_iterator(shard_id)
        while shard_iterator:
            response = self.kinesis.get_records(
                ShardIterator=shard_iterator,
                Limit=100  # API max is 10,000 records or 10 MB
            )
            records = response['Records']
            for record in records:
                data = json.loads(record['Data'])
                print(f"Partition Key: {record['PartitionKey']}, "
                      f"Sequence: {record['SequenceNumber']}, "
                      f"Data: {data}")
                self.process_record(data)
            # Null NextShardIterator means the shard was closed (resharding).
            shard_iterator = response.get('NextShardIterator')
            if not records:
                time.sleep(1)  # empty poll: wait before asking again

    def process_record(self, data: dict):
        """Application-specific processing hook."""
        pass
# Kinesis Data Firehose Example
def firehose_to_s3():
    """
    Kinesis Firehose: the easy way to load streaming data into S3.

    Benefits: fully managed (no shards), auto-scaling, built-in Lambda
    transformation, compression (gzip, snappy), and encryption.
    """
    firehose = boto3.client('firehose')
    clickstream_event = {
        'timestamp': '2025-01-01T10:00:00Z',
        'user_id': '12345',
        'action': 'page_view',
        'page': '/products/123'
    }
    # Hand one record to the delivery stream; Firehose does the rest.
    firehose.put_record(
        DeliveryStreamName='logs-to-s3',
        Record={
            'Data': json.dumps(clickstream_event)
        }
    )
    # Firehose then automatically:
    # 1. buffers records (60s or 1 MB by default),
    # 2. optionally transforms them via Lambda,
    # 3. compresses (if configured),
    # 4. writes to S3 in batches,
    # 5. and can also target Redshift, Elasticsearch/OpenSearch, Splunk,
    #    or HTTP endpoints.
# Lambda consumer for Kinesis
def lambda_kinesis_consumer(event, context):
    """
    Lambda handler triggered by a Kinesis stream.

    Lambda polls the stream and invokes this function with a batch of
    records; checkpointing is handled automatically by the service.

    Fix vs. original: the base64 import is hoisted out of the record loop
    (it was re-executed on every iteration).
    """
    import base64  # Kinesis record payloads arrive base64-encoded

    for record in event['Records']:
        payload = json.loads(base64.b64decode(record['kinesis']['data']))
        print(f"Processing: {payload}")
        # Process record here.
    return {'statusCode': 200}
| Feature | Kinesis Data Streams | Apache Kafka / MSK |
|---|---|---|
| Management | Fully managed, serverless shards | MSK: Managed brokers (still configure cluster) |
| Scaling | Manual shard splits/merges or on-demand | Add brokers and partitions |
| Retention | 1-365 days | Days to years (unlimited with tiered storage) |
| Throughput | 1 MB/s write, 2 MB/s read per shard | 10s-100s MB/s per broker |
| Latency | 70-200 ms | 5-15 ms |
| Ordering | Per shard | Per partition |
| Pricing | Per shard-hour + PUT payload units | Per broker-hour + storage |
| AWS Integration | Native (Lambda, Firehose, Analytics) | Via connectors |
| Ecosystem | AWS-specific | Large open-source ecosystem |
import pika
import json
class RabbitMQMessaging:
    """RabbitMQ / Amazon MQ messaging patterns: queues, exchanges, DLX."""

    def __init__(self, host: str = 'localhost', port: int = 5672):
        """
        Open a blocking connection and channel.

        For Amazon MQ, use the broker endpoint from the AWS Console
        (e.g. 'b-xxx.mq.us-east-1.amazonaws.com') and its credentials.
        """
        params = pika.ConnectionParameters(host=host, port=port)
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()

    def simple_queue(self, queue_name: str, message: dict):
        """Point-to-point: declare a durable queue and publish one message."""
        # Declaration is idempotent — safe to repeat.
        self.channel.queue_declare(
            queue=queue_name,
            durable=True,  # queue definition survives broker restart
            arguments={
                'x-message-ttl': 86400000,  # messages expire after 24h (ms)
                'x-max-length': 10000  # cap queue depth at 10K messages
            }
        )
        props = pika.BasicProperties(
            delivery_mode=2,  # persist the message to disk
            content_type='application/json',
            priority=5  # 0-9; honored only if the queue supports priorities
        )
        self.channel.basic_publish(
            exchange='',  # default exchange routes by queue name
            routing_key=queue_name,
            body=json.dumps(message),
            properties=props
        )

    def consume_queue(self, queue_name: str):
        """Consume with manual acks; failed messages are nack'd to the DLX."""
        self.channel.queue_declare(queue=queue_name, durable=True)
        # At most 10 unacknowledged messages in flight per consumer.
        self.channel.basic_qos(prefetch_count=10)

        def _on_message(ch, method, properties, body):
            decoded = json.loads(body)
            print(f"Received: {decoded}")
            try:
                self.process_message(decoded)
                # Ack removes the message from the queue.
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception:
                # requeue=False sends it to the dead-letter exchange instead.
                ch.basic_nack(
                    delivery_tag=method.delivery_tag,
                    requeue=False
                )

        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=_on_message,
            auto_ack=False  # explicit acks only
        )
        print('Waiting for messages...')
        self.channel.start_consuming()

    def topic_exchange(self, exchange_name: str, routing_key: str, message: dict):
        """
        Publish through a topic exchange (pattern-matched routing).

        routing_key patterns on the binding side:
        - "user.created" matches exactly "user.created"
        - "user.*" matches "user.created", "user.deleted"
        - "user.#" matches "user.created", "user.profile.updated"
        """
        self.channel.exchange_declare(
            exchange=exchange_name,
            exchange_type='topic',
            durable=True
        )
        self.channel.basic_publish(
            exchange=exchange_name,
            routing_key=routing_key,  # e.g. "order.created"
            body=json.dumps(message)
        )

    def subscribe_to_topic(self, exchange_name: str, binding_key: str, queue_name: str):
        """Bind a durable queue to a topic exchange with a routing pattern."""
        self.channel.exchange_declare(
            exchange=exchange_name,
            exchange_type='topic',
            durable=True
        )
        self.channel.queue_declare(queue=queue_name, durable=True)
        self.channel.queue_bind(
            exchange=exchange_name,
            queue=queue_name,
            routing_key=binding_key  # e.g. "order.*" or "order.created"
        )

    def fanout_exchange(self, exchange_name: str, message: dict):
        """Broadcast to every bound queue (SNS-like semantics)."""
        self.channel.exchange_declare(
            exchange=exchange_name,
            exchange_type='fanout',
            durable=True
        )
        self.channel.basic_publish(
            exchange=exchange_name,
            routing_key='',  # fanout ignores the routing key
            body=json.dumps(message)
        )

    def dead_letter_exchange(self, main_queue: str, dlx_queue: str):
        """Wire a main queue to a dead-letter exchange (like an SQS DLQ)."""
        self.channel.exchange_declare(
            exchange='dlx',
            exchange_type='direct'
        )
        self.channel.queue_declare(queue=dlx_queue, durable=True)
        self.channel.queue_bind(exchange='dlx', queue=dlx_queue, routing_key='failed')
        # Main queue: expired or nack'd messages are re-routed to 'dlx'.
        self.channel.queue_declare(
            queue=main_queue,
            durable=True,
            arguments={
                'x-dead-letter-exchange': 'dlx',
                'x-dead-letter-routing-key': 'failed',
                'x-message-ttl': 60000  # 60s TTL
            }
        )

    def process_message(self, message: dict):
        """Application-specific processing hook."""
        pass

    def close(self):
        self.connection.close()
# Amazon MQ Example
def amazon_mq_example():
    """
    Amazon MQ: managed RabbitMQ and ActiveMQ brokers.

    Choose it when migrating from on-prem RabbitMQ/ActiveMQ, when you need
    the AMQP/MQTT/STOMP protocols, or for complex routing requirements.
    For new AWS-native projects, prefer SQS/SNS (simpler, cheaper).

    NOTE(review): 5671 is the TLS listener — a real connection would also
    need pika SSL options configured; confirm before using verbatim.
    """
    broker = RabbitMQMessaging(
        host='b-xxx-yyy.mq.us-east-1.amazonaws.com',
        port=5671  # SSL port
    )
    # From here it behaves like any RabbitMQ broker.
    broker.simple_queue('tasks', {'task': 'process_order', 'order_id': 123})
import boto3
import json
class EventBridgeMessaging:
    """AWS EventBridge: publish events and wire routing rules."""

    def __init__(self):
        self.events = boto3.client('events', region_name='us-east-1')

    def put_event(self, source: str, detail_type: str, detail: dict):
        """Emit a single event onto the default event bus."""
        entry = {
            'Source': source,  # e.g. 'myapp.orders'
            'DetailType': detail_type,  # e.g. 'Order Placed'
            'Detail': json.dumps(detail),
            'EventBusName': 'default'
        }
        response = self.events.put_events(Entries=[entry])
        return response['Entries'][0]

    def create_rule(self, rule_name: str, event_pattern: dict, target_arn: str):
        """
        Create an ENABLED rule that routes matching events to a target.

        event_pattern: filter deciding which events trigger the rule.
        target_arn: Lambda, SQS, SNS, Step Functions, etc.
        """
        self.events.put_rule(
            Name=rule_name,
            EventPattern=json.dumps(event_pattern),
            State='ENABLED',
            Description='Route order events to processing queue'
        )
        # Attach the delivery target to the freshly created rule.
        self.events.put_targets(
            Rule=rule_name,
            Targets=[
                {
                    'Id': '1',
                    'Arn': target_arn
                }
            ]
        )
# EventBridge Pattern Examples
def eventbridge_patterns():
    """
    Illustrative EventBridge event patterns (filtering). Only the
    high-value-orders pattern is wired to a rule; the rest are examples.
    """
    eb = EventBridgeMessaging()
    # Match a specific source and detail type
    by_source_and_type = {
        "source": ["myapp.orders"],
        "detail-type": ["Order Placed"]
    }
    # Content filtering: only orders over $100
    high_value_orders = {
        "source": ["myapp.orders"],
        "detail-type": ["Order Placed"],
        "detail": {
            "total": [{"numeric": [">", 100]}]
        }
    }
    # Match several sources / detail types at once
    multi_source = {
        "source": ["myapp.orders", "myapp.inventory"],
        "detail-type": ["Order Placed", "Inventory Updated"]
    }
    # AWS service events: EC2 instance terminations
    ec2_terminations = {
        "source": ["aws.ec2"],
        "detail-type": ["EC2 Instance State-change Notification"],
        "detail": {
            "state": ["terminated"]
        }
    }
    # Route high-value orders to a dedicated Lambda
    eb.create_rule(
        rule_name='high-value-orders',
        event_pattern=high_value_orders,
        target_arn='arn:aws:lambda:us-east-1:123456789012:function:ProcessHighValueOrder'
    )
# EventBridge vs SNS
def eventbridge_vs_sns():
    """
    EventBridge vs SNS — choosing between the two AWS pub/sub services.

    EventBridge:
    + Rich event filtering (JSON path, numeric, prefix, etc.)
    + Schema registry and discovery
    + Event archive and replay
    + SaaS integrations
    + Cross-account routing
    - Higher latency (50-500ms)
    - Lower throughput (10K events/sec default)

    SNS:
    + Lower latency (10-50ms)
    + Higher throughput (30K+ msg/sec)
    + Simple pub/sub
    + Mobile push, SMS, email
    - Basic filtering only (message attributes)
    - No replay

    Use EventBridge for: event-driven apps, complex routing, AWS service events.
    Use SNS for: simple pub/sub, mobile notifications, high throughput.
    """
    pass
import redis
class RedisPubSub:
    """Ephemeral real-time messaging over Redis Pub/Sub (no persistence)."""

    def __init__(self, host: str = 'localhost', port: int = 6379):
        self.redis_client = redis.Redis(host=host, port=port, decode_responses=True)

    def publish(self, channel: str, message: str):
        """Publish and return how many subscribers received the message."""
        return self.redis_client.publish(channel, message)

    def subscribe(self, channels: list):
        """Block forever, handling messages from the given channels."""
        listener = self.redis_client.pubsub()
        listener.subscribe(*channels)
        print(f"Subscribed to {channels}...")
        for event in listener.listen():
            if event['type'] != 'message':
                continue  # skip subscribe confirmations etc.
            print(f"Channel: {event['channel']}, "
                  f"Message: {event['data']}")
            self.process_message(event['channel'], event['data'])

    def pattern_subscribe(self, pattern: str):
        """
        Subscribe to every channel matching a glob pattern, e.g.
        "user:*" → "user:123", "user:456";
        "event:*:created" → "event:order:created", "event:user:created".
        """
        listener = self.redis_client.pubsub()
        listener.psubscribe(pattern)
        for event in listener.listen():
            if event['type'] != 'pmessage':
                continue
            print(f"Pattern: {event['pattern']}, "
                  f"Channel: {event['channel']}, "
                  f"Message: {event['data']}")

    def process_message(self, channel: str, message: str):
        """Application-specific processing hook."""
        pass
# Use case: Cache invalidation across multiple servers
def cache_invalidation_example():
    """
    Broadcast cache invalidations with Redis Pub/Sub.

    When a user profile changes: (1) write the database, (2) publish an
    invalidation event, (3) every app server receives it and clears the
    matching entry from its local cache.
    """
    bus = RedisPubSub()

    # Writer side: persist the change, then tell everyone to drop the key.
    def update_user(user_id: int, updates: dict):
        update_database(user_id, updates)
        bus.publish('cache:invalidate', f'user:{user_id}')

    # Reader side (runs on every other app server): drop invalidated keys.
    def cache_invalidation_listener():
        listener = bus.redis_client.pubsub()
        listener.subscribe('cache:invalidate')
        for event in listener.listen():
            if event['type'] == 'message':
                stale_key = event['data']
                print(f"Invalidating cache: {stale_key}")
                local_cache.delete(stale_key)
# Redis Streams (Alternative to Pub/Sub with persistence)
def redis_streams_example():
    """
    Redis Streams: pub/sub WITH persistence — a Kafka-lite inside Redis.

    Prefer Streams over plain Pub/Sub when you need message persistence,
    consumer groups, or acknowledgments.
    """
    client = redis.Redis()
    # Append one entry; maxlen trims the stream to its newest 10K entries.
    entry_id = client.xadd(
        'events',
        {
            'user_id': '123',
            'action': 'login',
            'timestamp': '2025-01-01T10:00:00Z'
        },
        maxlen=10000
    )
    # Plain read from the beginning of the stream.
    batch = client.xread({'events': '0'}, count=10)
    for stream_name, entries in batch:
        for entry_id, fields in entries:
            print(f"ID: {entry_id}, Data: {fields}")
    # Consumer groups give Kafka-style load balancing with acks.
    client.xgroup_create('events', 'analytics-group', id='0', mkstream=True)
    batch = client.xreadgroup(
        'analytics-group',
        'consumer-1',
        {'events': '>'},
        count=10
    )
| Requirement | Recommended System | Why |
|---|---|---|
| Simple task queue, serverless | SQS | Fully managed, zero ops, Lambda integration |
| Fan-out notifications | SNS | Native pub/sub, multiple protocols (email, SMS, HTTP) |
| High-throughput streaming (>100K/s) | Kafka/MSK | Highest throughput, lowest latency, replay capability |
| Real-time analytics in AWS | Kinesis | Native AWS, Firehose integration, simpler than Kafka |
| Event-driven architecture, SaaS integration | EventBridge | Rich filtering, schema registry, cross-account routing |
| Complex routing (topic, headers) | RabbitMQ/Amazon MQ | Flexible exchanges, priority queues, RPC patterns |
| Real-time chat/notifications | Redis Pub/Sub | Ultra-low latency, simple, ephemeral messages |
| Message replay needed | Kafka or Kinesis | Persistent log, seek to any offset |
| Exactly-once processing | SQS FIFO or Kafka | Deduplication and ordering guarantees |
| Load balancing tasks across workers | SQS or RabbitMQ | Competing consumers pattern |
| Log aggregation at scale | Kinesis Firehose | Easy S3/Redshift loading, auto-scaling |
| Migrating from on-prem RabbitMQ | Amazon MQ | Managed RabbitMQ, minimal code changes |
"""
Pattern: SQS between microservices
Order Service → SQS → Inventory Service
↘ SQS → Email Service
↘ SQS → Analytics Service
Benefits:
- Services don't need to know about each other
- Async processing (Order Service responds immediately)
- Automatic retries and DLQ
- Independent scaling
"""
# Order Service
def place_order(order: dict):
    """Persist the order, then fan out follow-up work via SQS queues."""
    # Synchronous write; everything below happens asynchronously downstream.
    save_order(order)
    order_id = order['id']
    sqs.send_message('inventory-queue', {'action': 'reserve', 'order_id': order_id})
    sqs.send_message('email-queue', {'action': 'send_confirmation', 'order_id': order_id})
    sqs.send_message('analytics-queue', {'event': 'order_placed', 'order_id': order_id})
    # Respond immediately — consumers process the queued tasks on their own.
    return {'status': 'success', 'order_id': order_id}
"""
Pattern: Event sourcing with Kafka
All state changes stored as immutable events
Can rebuild state by replaying events
User Events Topic: user_created, user_updated, user_deleted
Order Events Topic: order_placed, order_shipped, order_cancelled
Benefits:
- Full audit trail
- Time travel (replay to any point)
- Multiple read models from same events
- Easy debugging (reproduce bugs by replaying)
"""
def event_sourcing_pattern():
    """
    Event sourcing with Kafka: record every state change as an immutable
    event, then project it into a read model.

    Fix vs. original: ``datetime`` was referenced without being imported
    anywhere in the file, so ``update_user`` raised NameError at runtime.
    """
    from datetime import datetime
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

    # Record every state change as an event before touching the read model.
    def update_user(user_id: int, updates: dict):
        event = {
            'event_type': 'user_updated',
            'user_id': user_id,
            'updates': updates,
            'timestamp': datetime.utcnow().isoformat()
        }
        # Store event in Kafka (immutable log)
        producer.send('user-events', value=json.dumps(event).encode())
        # Update read model (projection)
        apply_event_to_database(event)
# Rebuild state by replaying all events
def rebuild_user_state(user_id: int):
    """Replay the user-events topic from the start to reconstruct one user."""
    from kafka import KafkaConsumer
    consumer = KafkaConsumer('user-events', auto_offset_reset='earliest')
    state = {}
    for record in consumer:
        event = json.loads(record.value)
        if event['user_id'] != user_id:
            continue  # someone else's event
        kind = event['event_type']
        if kind == 'user_created':
            state = event['data']
        elif kind == 'user_updated':
            state.update(event['updates'])
        elif kind == 'user_deleted':
            state = None
            break  # nothing after a delete can matter
    return state
"""
Pattern: SNS Topic → Multiple SQS Queues
Order Placed Event (SNS)
→ Email Queue (SQS) → Email Service
→ Inventory Queue (SQS) → Inventory Service
→ Warehouse Queue (SQS) → Warehouse Service
→ Analytics Queue (SQS) → Analytics Service
Benefits:
- Each service has its own queue (independent processing)
- SNS provides instant fan-out
- SQS provides durability and retry
- Easy to add new subscribers
"""
def setup_fanout():
    """Create an SNS topic plus one subscribed, policy-enabled SQS queue per service."""
    sns = boto3.client('sns')
    sqs = boto3.client('sqs')
    # One topic fans out to every service's queue.
    topic_arn = sns.create_topic(Name='order-events')['TopicArn']
    for service in ('email', 'inventory', 'warehouse', 'analytics'):
        # Create the queue and resolve its ARN (create_queue returns only the URL).
        queue_url = sqs.create_queue(QueueName=f'{service}-queue')['QueueUrl']
        queue_arn = sqs.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=['QueueArn']
        )['Attributes']['QueueArn']
        # Queue policy allowing this topic — and only this topic — to deliver.
        access_policy = {
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "sns.amazonaws.com"},
                "Action": "sqs:SendMessage",
                "Resource": queue_arn,
                "Condition": {
                    "ArnEquals": {"aws:SourceArn": topic_arn}
                }
            }]
        }
        sqs.set_queue_attributes(
            QueueUrl=queue_url,
            Attributes={'Policy': json.dumps(access_policy)}
        )
        # Finally, wire the queue to the topic.
        sns.subscribe(
            TopicArn=topic_arn,
            Protocol='sqs',
            Endpoint=queue_arn
        )