Streaming Systems Deep Dive

Kafka, Event Streaming, and Real-Time Data Processing

Why Streaming Systems?

The Problem with Request-Response

Traditional: Request-Response

[Diagram: synchronous chain — Order Service → Inventory Service → Shipping Service → Notification Service, each hop an HTTP POST]
Problems with synchronous communication:
  • Tight coupling: each service must know its downstream callers
  • Cascading failures: if one service in the chain is down, the whole request fails
  • Adding a new consumer means changing producer code
  • No replay: a failed call is simply gone

The Streaming Solution

[Diagram: Order Service publishes to Kafka; Inventory, Shipping, Notification, and Analytics services each subscribe independently]
WHY event streaming is better:
  • Decoupling: the producer does not know or care who consumes
  • New consumers (e.g., Analytics) attach without touching the producer
  • Buffering: Kafka absorbs bursts; consumers process at their own pace
  • Replay: consumers can re-read history from any offset

Kafka Internals

Core Concepts

[Diagram: two producers (Order Service, Checkout Service) write to topic "orders", which has 3 partitions led by Brokers 1-3; keys route messages (order_id=123 → P0, order_id=456 → P1, order_id=789 → P2). Consumer group "order-processors" has 3 consumers, each reading exactly one partition]
  • Topic: a category of messages (e.g., "orders", "user-events"). Analogy: a database table.
  • Partition: an ordered, immutable sequence of records within a topic. Analogy: a shard of a table.
  • Offset: the unique ID of each record within a partition. Analogy: a row number.
  • Consumer Group: a set of consumers that share work (each partition → one consumer). Analogy: a worker pool.
  • Broker: a Kafka server that stores data and serves clients. Analogy: a database node.

Partition Deep Dive

How Partitions Work

[Diagram: Partition 0 as an append-only log — offsets 0-4 holding order_ids 100, 103, 106, 109, 112; the producer appends new messages at the tail while a consumer reads from offset 2 onward]
WHY partitions matter:
  • Parallelism: More partitions = more consumers can read in parallel
  • Ordering: Messages within a partition are strictly ordered
  • Scalability: Partitions can be spread across brokers

Partition Key Selection

# Messages with same key go to same partition
producer.send("orders", key="user_123", value={"order_id": 456, ...})
producer.send("orders", key="user_123", value={"order_id": 789, ...})
# Both go to same partition → ordering preserved for user_123

# How Kafka's default partitioner chooses the partition
# (the real hash is murmur2, not a generic hash function):
partition = hash(key) % num_partitions
HOW to choose partition key:
  • Pick the entity whose events must stay in order (user_id, order_id, account_id)
  • Avoid low-cardinality keys (e.g., country): they create hot partitions
  • No key → messages are spread across partitions, with no per-key ordering

Rule of thumb: Use the entity ID that needs ordering guarantees.
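The routing rule can be sketched in a few lines. This is only an illustration: Kafka's real default partitioner uses murmur2, and the toy hash below merely stands in for it so the key property (same key → same partition) is visible.

```python
# Toy key-based partition routing (Kafka's actual default uses murmur2).
def choose_partition(key: str, num_partitions: int) -> int:
    # Deterministic hash so the same key always maps to the same partition
    h = sum(ord(c) * 31**i for i, c in enumerate(key))
    return h % num_partitions

# All events for user_123 land on one partition → per-user ordering holds
assert choose_partition("user_123", 6) == choose_partition("user_123", 6)

# Any key maps to a valid partition index
assert 0 <= choose_partition("user_456", 6) < 6
```

Note the corollary: changing num_partitions changes the mapping for existing keys, which is why growing a topic's partition count breaks per-key ordering across the resize.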

Replication

[Diagram: Partition 0 replicated across 3 brokers — the leader on Broker 1 (offsets 0-100), a fully caught-up follower on Broker 2 (offsets 0-100), and a lagging follower on Broker 3 (offsets 0-98). The producer writes to and the consumer reads from the leader, which replicates to the followers]

ISR (In-Sync Replicas)

Replicas that are caught up with the leader. A message is "committed" when all ISRs have it.

  • replication.factor = 3: each partition has 3 copies
  • min.insync.replicas = 2: at least 2 replicas must acknowledge before a write is committed
  • acks = all (producer): wait for all in-sync replicas to acknowledge
WHY these settings?
  • replication.factor=3: committed data survives up to 2 broker failures
  • min.insync.replicas=2: data stays safe even if 1 replica lags (writes pause if 2 do)
  • acks=all: no acknowledged data is lost on leader failure
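The arithmetic behind these settings is worth making explicit: with acks=all, writes keep succeeding only while at least min.insync.replicas replicas are alive, so write availability tolerates replication.factor minus min.insync.replicas broker failures, while committed data survives as long as one full copy remains.

```python
def write_failure_tolerance(replication_factor: int, min_insync: int) -> int:
    """Broker failures tolerated before acks=all writes start failing."""
    return replication_factor - min_insync

def durability_tolerance(replication_factor: int) -> int:
    """Broker failures tolerated before committed data could be lost."""
    return replication_factor - 1

# RF=3, min.insync.replicas=2: writes survive 1 failure,
# committed data survives 2
assert write_failure_tolerance(3, 2) == 1
assert durability_tolerance(3) == 2
```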

Consumer Groups

[Diagram: topic "orders" with 6 partitions (P0-P5). Consumer Group A (3 consumers) splits them two each — A1: P0,P1; A2: P2,P3; A3: P4,P5 — while Consumer Group B (2 consumers) splits them three each — B1: P0-P2; B2: P3-P5. Each group independently receives every partition]
HOW consumer groups work:
  • Within a group, each partition is assigned to exactly one consumer
  • Different groups each receive every message independently
  • When a consumer joins or leaves, partitions are rebalanced across the group
  • Consumers beyond the partition count sit idle

Rule: num_partitions >= num_consumers for parallelism
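The assignment step can be sketched as a simple round-robin split (the diagram above shows a range-style split into contiguous blocks; Kafka's assignors are pluggable — range, round-robin, sticky — but the invariant is the same: each partition goes to exactly one consumer in the group).

```python
def assign_partitions(partitions: list[int],
                      consumers: list[str]) -> dict[str, list[int]]:
    """Round-robin assignment: every partition to exactly one consumer."""
    assignment: dict[str, list[int]] = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# 6 partitions, 3 consumers → 2 partitions each
a = assign_partitions(list(range(6)), ["A1", "A2", "A3"])
assert all(len(ps) == 2 for ps in a.values())

# Every partition assigned exactly once
assert sorted(p for ps in a.values() for p in ps) == [0, 1, 2, 3, 4, 5]
```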

Consumer Offset Management

# Consumer tracks its position per partition
# Stored in internal topic: __consumer_offsets

# Auto-commit (default, can lose messages)
enable.auto.commit=true
auto.commit.interval.ms=5000

# Manual commit (safer)
enable.auto.commit=false

while True:
    records = consumer.poll(timeout_ms=1000)  # {TopicPartition: [records]}
    for batch in records.values():
        for record in batch:
            process(record)  # Do work first
    consumer.commit()        # Then commit offsets
At-Least-Once Pitfall:

If you commit BEFORE processing and crash during processing → message lost!

If you commit AFTER processing and crash after processing but before commit → message reprocessed!

Make your processing idempotent to handle reprocessing safely.
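One common way to make processing idempotent is to track already-processed message IDs. A minimal sketch, using an in-memory set (in production this would be a database table or unique constraint that survives restarts):

```python
processed_ids: set[str] = set()   # in production: a durable store

def process_once(record_id: str, apply_effect) -> bool:
    """Apply the side effect only the first time this record ID is seen."""
    if record_id in processed_ids:
        return False              # redelivery → skip, no double effect
    apply_effect()
    processed_ids.add(record_id)
    return True

count = {"n": 0}
def bump():
    count["n"] += 1

process_once("order-456", bump)   # first delivery → effect applied
process_once("order-456", bump)   # reprocessed after crash → skipped
assert count["n"] == 1
```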

Exactly-Once Semantics

The Three Guarantees

  • At-Most-Once: fire and forget. Messages lost on producer crash or network failure; never duplicated.
  • At-Least-Once: retry until acknowledged. Never lost; duplicates appear on retry after timeout or consumer crash.
  • Exactly-Once: each message processed exactly once. Never lost, never duplicated.

How Kafka Achieves Exactly-Once

1. Idempotent Producer

Kafka assigns a unique ID to each producer. Retried messages with same sequence number are deduplicated.

# Enable idempotent producer
enable.idempotence=true

# Kafka tracks: (producer_id, sequence_number) per partition
# Duplicate detected if same sequence_number arrives twice
[Sequence diagram: producer sends seq=1, broker ACKs; producer sends seq=2 but the ACK is lost to a network timeout; producer retries seq=2; broker detects the duplicate sequence number, skips the append, and ACKs anyway]
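The broker-side bookkeeping can be sketched as a map from (producer_id, partition) to the highest sequence number accepted; a retry carrying an already-seen sequence is acknowledged without being appended again. (A simplification: the real broker tracks a small window of recent batches per producer, not just one number.)

```python
last_seq: dict[tuple[int, int], int] = {}  # (producer_id, partition) -> seq
log: list[str] = []                        # the partition's append-only log

def append(producer_id: int, partition: int, seq: int, msg: str) -> bool:
    """Append unless this (producer, seq) was already accepted."""
    key = (producer_id, partition)
    if seq <= last_seq.get(key, -1):
        return False          # duplicate retry → ACK without re-appending
    log.append(msg)
    last_seq[key] = seq
    return True

append(1, 0, 1, "m1")
append(1, 0, 2, "m2")
append(1, 0, 2, "m2")         # retry after lost ACK → deduplicated
assert log == ["m1", "m2"]
```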

2. Transactional Producer

Atomic writes across multiple partitions/topics. All or nothing.

producer = KafkaProducer(
    transactional_id="order-processor-1",  # Unique ID
    enable_idempotence=True
)

producer.init_transactions()

try:
    producer.begin_transaction()

    # These writes are atomic
    producer.send("orders", value=order)
    producer.send("inventory", value=inventory_update)
    producer.send("notifications", value=notification)

    producer.commit_transaction()  # All visible at once
except Exception:
    producer.abort_transaction()   # All rolled back
WHY transactional producer?
  • Consume from topic A, produce to topic B atomically
  • Update multiple topics consistently
  • Stream processing frameworks use this internally

3. Consume-Transform-Produce Pattern

[Diagram: input topic → consumer → process → producer → output topic, with the consumer's offset committed to __consumer_offsets inside the same transaction as the output write]
# Exactly-once stream processing
producer.begin_transaction()

# Read from input
records = consumer.poll()

for record in records:
    result = transform(record)

    # Write to output
    producer.send("output-topic", value=result)

# Commit offset AND output in same transaction
producer.send_offsets_to_transaction(
    consumer.position(consumer.assignment()),  # offsets just consumed
    consumer.consumer_group_metadata()
)
producer.commit_transaction()
HOW this achieves exactly-once:
  1. Consumer reads message at offset N
  2. Process and produce output
  3. Commit output + offset N atomically
  4. If crash before commit → both rolled back → reprocess from N
  5. If crash after commit → both committed → move to N+1

Stream Processing Frameworks

When to Use What

  • Kafka Streams: JVM apps, simple transformations, small state. Latency: milliseconds. Complexity: low.
  • Apache Flink: complex event processing, large state, exactly-once. Latency: milliseconds. Complexity: high.
  • Apache Spark Streaming: batch-like processing, ML integration. Latency: seconds (micro-batch). Complexity: medium.
  • ksqlDB: SQL queries on streams, quick prototyping. Latency: milliseconds. Complexity: very low.

Kafka Streams Example

// Word count streaming
StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> textLines = builder.stream("input-topic");

KTable<String, Long> wordCounts = textLines
    .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word)
    .count();

wordCounts.toStream().to("word-counts-output");

KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();

Stream Processing Concepts

Windowing

Group events by time windows for aggregation.

[Diagram: three windowing strategies — tumbling windows are non-overlapping (00:00-00:05, 00:05-00:10, 00:10-00:15); sliding windows overlap (00:00-00:05, 00:02-00:07, 00:04-00:09); session windows are gap-based (a session ends once the inactivity gap exceeds a threshold, then a new session begins)]
// Tumbling window: Count orders per 5-minute window
orders
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count();

// Sliding window: Count orders in last 5 minutes, updated every 1 minute
orders
    .groupByKey()
    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
    .count();

// Session window: Group user activity with 30-minute inactivity gap
userEvents
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
    .count();

Event Time vs Processing Time

[Diagram: an event created at 10:00:00 (event time) arrives after 5 seconds of network delay and is processed at 10:00:05 (processing time)]
Event Time vs Processing Time:
  • Definition: event time is when the event actually occurred; processing time is when it is processed.
  • Determinism: event time is deterministic and replayable; processing time depends on when processing ran.
  • Late events: event time must handle them; processing time has no such issue.
  • Use case: event time for accurate analytics; processing time for real-time monitoring.
// Handle late events with watermarks
.windowedBy(TimeWindows
    .ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
// Events up to 1 minute late will still be included in correct window
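Window assignment by event time reduces to integer arithmetic on the timestamp, and the grace period is a second comparison against the watermark. A minimal sketch (timestamps in milliseconds; the constants mirror the 5-minute window and 1-minute grace above):

```python
WINDOW_MS = 5 * 60 * 1000    # 5-minute tumbling windows
GRACE_MS = 60 * 1000         # accept events up to 1 minute late

def window_start(event_ts_ms: int) -> int:
    """Tumbling assignment: floor the event time to the window size."""
    return (event_ts_ms // WINDOW_MS) * WINDOW_MS

def accept(event_ts_ms: int, watermark_ms: int) -> bool:
    """Keep a late event while its window end + grace is past the watermark."""
    return window_start(event_ts_ms) + WINDOW_MS + GRACE_MS > watermark_ms

# Events 601s and 898s into the stream fall into the same 10:00-10:05 window
assert window_start(601_000) == window_start(898_000) == 600_000

# Event from the 0-5min window, arriving 50s late → still accepted
assert accept(299_000, 350_000) is True
# Same event arriving 100s late (past the grace period) → dropped
assert accept(299_000, 400_000) is False
```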

Stateful Processing

[Diagram: input stream → processor, which reads and writes a local RocksDB state store; every state change is also emitted to a changelog topic; processor → output stream]
WHY local state + changelog?
  • Performance: Local state (RocksDB) is fast, no network calls
  • Durability: Changelog topic backs up state to Kafka
  • Recovery: On crash, rebuild state from changelog
  • Scalability: State partitioned like input topic
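The state-plus-changelog idea fits in a few lines: every local write is mirrored to an append-only changelog, and recovery replays the changelog with last-write-wins. A sketch with a dict standing in for RocksDB and a list for the Kafka topic:

```python
state: dict[str, int] = {}           # local state (stand-in for RocksDB)
changelog: list[tuple[str, int]] = []  # stand-in for the changelog topic

def update(key: str, value: int) -> None:
    state[key] = value
    changelog.append((key, value))   # every mutation is also logged

def recover() -> dict[str, int]:
    """Rebuild local state after a crash by replaying the changelog."""
    rebuilt: dict[str, int] = {}
    for key, value in changelog:
        rebuilt[key] = value         # last write per key wins
    return rebuilt

update("user_1", 5)
update("user_1", 7)
state.clear()                        # simulate losing the local store
assert recover() == {"user_1": 7}
```

In Kafka Streams the changelog topic is also log-compacted, so it keeps roughly one record per key rather than the full history.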

Common Streaming Patterns

1. Event Sourcing

Store state as a sequence of events, not current state.

[Diagram: the event log is the source of truth — AccountCreated {id: 123} → MoneyDeposited {amount: 100} → MoneyWithdrawn {amount: 30} → MoneyDeposited {amount: 50} — folded into a materialized view: Account 123, balance $120]
# Event sourcing with Kafka
# Topic: account-events (source of truth)
{"event": "AccountCreated", "account_id": "123", "timestamp": "..."}
{"event": "MoneyDeposited", "account_id": "123", "amount": 100}
{"event": "MoneyWithdrawn", "account_id": "123", "amount": 30}
{"event": "MoneyDeposited", "account_id": "123", "amount": 50}

# Consumer rebuilds current state by replaying events
balance = 0
for event in events:
    if event.type == "MoneyDeposited":
        balance += event.amount
    elif event.type == "MoneyWithdrawn":
        balance -= event.amount
# balance = 120
WHY event sourcing?
  • Audit trail: Complete history of what happened
  • Debugging: Replay events to reproduce bugs
  • New views: Build new projections from existing events
  • Time travel: Query state at any point in time

2. CQRS (Command Query Responsibility Segregation)

[Diagram: write side — commands go through a Write API into the event store (Kafka); read side — a projector consumes events and updates a read database optimized for queries, which a Read API serves]
HOW CQRS works:
  1. Commands (writes) go to event store (Kafka)
  2. Projectors consume events and update read databases
  3. Queries read from optimized read models (can be different DBs!)
  4. Scale reads and writes independently
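The steps above can be sketched end to end: the event log plays the write side, and the projector folds it into a query-optimized read model (the event names reuse the event-sourcing example earlier).

```python
# Write side: commands have already become events in the log
event_log = [
    {"type": "AccountCreated", "id": "123"},
    {"type": "MoneyDeposited", "id": "123", "amount": 100},
    {"type": "MoneyWithdrawn", "id": "123", "amount": 30},
]

# Read side: the projector consumes events and maintains the view
def project(events: list[dict]) -> dict:
    view: dict = {}
    for e in events:
        if e["type"] == "AccountCreated":
            view[e["id"]] = {"balance": 0}
        elif e["type"] == "MoneyDeposited":
            view[e["id"]]["balance"] += e["amount"]
        elif e["type"] == "MoneyWithdrawn":
            view[e["id"]]["balance"] -= e["amount"]
    return view

read_model = project(event_log)
assert read_model["123"]["balance"] == 70   # queries hit this, not the log
```

Because projectors are just consumers, several can run side by side, each building a differently shaped read model from the same event stream.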

3. Change Data Capture (CDC)

[Diagram: the application writes to MySQL; Debezium reads the binlog and publishes change events to Kafka; downstream consumers keep Elasticsearch, a Redis cache, and a data warehouse in sync]
# Debezium CDC event example
{
  "before": {"id": 123, "name": "John", "email": "john@old.com"},
  "after": {"id": 123, "name": "John", "email": "john@new.com"},
  "source": {
    "db": "users",
    "table": "accounts",
    "ts_ms": 1699999200000
  },
  "op": "u"  // u=update, c=create, d=delete
}
WHY CDC?
  • Keeps search indexes, caches, and warehouses in sync without dual writes
  • No application changes: changes are captured from the database log
  • Ordering preserved: events arrive in commit order
  • Works with legacy systems that cannot publish events themselves
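A downstream consumer applies these events mechanically: upsert on create/update, evict on delete. A sketch using the Debezium event shape shown above, with a dict standing in for the Redis cache:

```python
cache: dict[int, dict] = {}   # stand-in for the Redis cache

def apply_cdc(event: dict) -> None:
    """Apply one Debezium-style change event to the cache."""
    op = event["op"]
    if op in ("c", "u"):                      # create/update → upsert
        row = event["after"]
        cache[row["id"]] = row
    elif op == "d":                           # delete → evict
        cache.pop(event["before"]["id"], None)

apply_cdc({"op": "c", "after": {"id": 123, "email": "john@old.com"}})
apply_cdc({"op": "u",
           "before": {"id": 123, "email": "john@old.com"},
           "after": {"id": 123, "email": "john@new.com"}})
assert cache[123]["email"] == "john@new.com"
```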

4. Saga Pattern (Distributed Transactions)

[Sequence diagram: Order Service publishes OrderCreated → Inventory Service reserves stock and publishes InventoryReserved → Payment Service publishes PaymentProcessed → Shipping Service publishes ShipmentScheduled → Order Service marks the order complete. If any step fails, compensating events undo the previous steps]
# Saga with compensating transactions
class OrderSaga:
    def on_order_created(self, event):
        # Reserve inventory
        publish("inventory-commands", {"type": "ReserveInventory", ...})

    def on_inventory_reserved(self, event):
        # Process payment
        publish("payment-commands", {"type": "ProcessPayment", ...})

    def on_payment_failed(self, event):
        # Compensate: release inventory
        publish("inventory-commands", {"type": "ReleaseInventory", ...})

    def on_inventory_reserve_failed(self, event):
        # Compensate: cancel order
        publish("order-commands", {"type": "CancelOrder", ...})

Real-World Use Cases

1. Real-Time Analytics Dashboard

User Events → Kafka → Flink (aggregate per minute) → Redis → Dashboard

-- Flink SQL
SELECT
    window_start,
    page_url,
    COUNT(*) as page_views,
    COUNT(DISTINCT user_id) as unique_visitors
FROM TABLE(
    TUMBLE(TABLE page_views, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, page_url;

2. Fraud Detection

Transactions → Kafka → Flink CEP → Alerts

// Flink CEP (pseudocode): detect 3+ transactions within 5 minutes from
// different countries. "prev" stands for the previously matched
// transaction; in real Flink this comparison needs an IterativeCondition.
Pattern<Transaction, ?> pattern = Pattern
    .<Transaction>begin("first")
    .where(tx -> true)
    .followedBy("second")
    .where(tx -> !tx.country.equals(prev.country))
    .followedBy("third")
    .where(tx -> !tx.country.equals(prev.country))
    .within(Time.minutes(5));
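The same rule can be sketched without a CEP engine by keeping a sliding window of each card's recent transactions (the `Transaction` fields here — timestamp in seconds, country code — are illustrative assumptions, not Flink's):

```python
from collections import defaultdict

WINDOW_S = 5 * 60
history: dict[str, list[tuple[int, str]]] = defaultdict(list)

def is_suspicious(card_id: str, ts: int, country: str) -> bool:
    """Flag 3+ transactions within 5 minutes spanning >1 country."""
    history[card_id].append((ts, country))
    # Keep only transactions inside the 5-minute window ending at ts
    recent = [(t, c) for t, c in history[card_id] if ts - t <= WINDOW_S]
    history[card_id] = recent
    return len(recent) >= 3 and len({c for _, c in recent}) > 1

assert not is_suspicious("c1", 0, "US")
assert not is_suspicious("c1", 60, "FR")
assert is_suspicious("c1", 120, "DE")    # 3 txns, 3 countries, 2 minutes
```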

3. Log Aggregation

App Logs → Kafka → Logstash → Elasticsearch → Kibana

# High throughput: 100K+ logs/second
# Kafka handles backpressure, Elasticsearch can catch up

4. Event-Driven Microservices

# Order placed → multiple services react
order-events:
  - OrderService publishes OrderPlaced
  - InventoryService: reserve stock
  - PaymentService: charge card
  - NotificationService: send confirmation
  - AnalyticsService: update metrics
  - RecommendationService: update user profile

Key Takeaways

When to Use Streaming

Use streaming when:
  • Multiple consumers need the same events
  • Services should be decoupled
  • Event replay is needed
  • Throughput is high and processing is async
  • You are building audit logs

Use request-response when:
  • A single consumer needs a synchronous response
  • Tight coupling is acceptable
  • There is no replay requirement
  • Throughput is low and processing is sync
  • You are doing simple CRUD operations

Kafka Configuration Cheat Sheet

  • No data loss: acks=all, min.insync.replicas=2
  • Exactly-once: enable.idempotence=true, transactional.id=X
  • High throughput: acks=1, batch.size=65536, linger.ms=5
  • Low latency: acks=1, linger.ms=0
  • Ordering: use the same partition key for related events
