Day 13 — Week 2

Stream Processing & Event Sourcing

Instead of storing current state, event sourcing stores every change as an immutable event. The current state is derived by replaying the event log — enabling time travel, audit trails, and multiple read models.

Key Concepts
📜
Event Sourcing
Never UPDATE rows — only APPEND events. To get current state, replay all events from the beginning (or from a snapshot). Events are immutable facts: "OrderPlaced", "PaymentReceived", "OrderShipped".
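The append-only idea can be sketched in a few lines of Python — a toy in-memory event store (names and event shapes are illustrative, not a real library API) where current state is always a fold over the log:

```python
from typing import List

class EventStore:
    """Minimal append-only log: events are immutable facts, never updated."""
    def __init__(self):
        self._log: List[dict] = []

    def append(self, event: dict) -> None:
        self._log.append(event)          # the only write operation

    def read(self, from_version: int = 0) -> List[dict]:
        return self._log[from_version:]  # replay from a version (or 0)

store = EventStore()
store.append({"type": "OrderPlaced", "total": 42.50})
store.append({"type": "PaymentReceived"})
store.append({"type": "OrderShipped"})

# Current state is derived, not stored: fold over the events.
status = "new"
for e in store.read():
    status = {"OrderPlaced": "placed", "PaymentReceived": "paid",
              "OrderShipped": "shipped"}.get(e["type"], status)
print(status)  # shipped
```

Because the log is the source of truth, replaying `store.read(n)` from any version gives you the state at that point in time for free.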
✂️
CQRS
Command Query Responsibility Segregation: separate write models (events) from read models (projections). Writes append events; reads query from a pre-computed read model optimized for the query pattern.
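A minimal in-memory sketch of that split, assuming toy command handlers and a synchronous projector (in production the projector would typically consume events asynchronously from the store):

```python
# Write model: append-only event log. Read model: query-optimized summaries.
events: list[dict] = []
read_model: dict[str, dict] = {}   # order_id -> denormalized order summary

def project(event: dict) -> None:
    """Projector: fold each event into the read model."""
    if event["type"] == "OrderPlaced":
        read_model[event["order_id"]] = {"status": "placed",
                                         "total": event["total"]}
    elif event["type"] == "PaymentReceived":
        read_model[event["order_id"]]["status"] = "paid"

def handle_place_order(order_id: str, total: float) -> None:
    """Command side: validate, then append an event (never update in place)."""
    events.append({"type": "OrderPlaced", "order_id": order_id, "total": total})
    project(events[-1])

def handle_payment(order_id: str) -> None:
    events.append({"type": "PaymentReceived", "order_id": order_id})
    project(events[-1])

handle_place_order("o-1", 42.50)
handle_payment("o-1")
print(read_model["o-1"])  # {'status': 'paid', 'total': 42.5}
```

Queries never touch the event log — they read the pre-computed `read_model`, which can live in a completely different store (Redis, Postgres, Elasticsearch) shaped for the query pattern.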
📷
Snapshots
Periodic checkpoint of aggregate state. Instead of replaying 1M events from the beginning, load the last snapshot (event #950K) and replay only the last 50K events. Crucial for performance at scale.
Stream Processing
Process events as they flow in real time (Kafka Streams, Apache Flink). Apply transformations, aggregations, and joins over time windows. Output to read models, alerts, or downstream topics.
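A toy version of what Kafka Streams or Flink do at scale — a tumbling-window aggregation in plain Python (window size and the event stream are illustrative):

```python
from collections import defaultdict

WINDOW = 60  # tumbling window size in seconds

def window_counts(events):
    """Count events per (window_start, event_type) bucket.

    events: iterable of (timestamp_seconds, event_type) pairs.
    """
    counts = defaultdict(int)
    for ts, etype in events:
        window_start = int(ts // WINDOW) * WINDOW  # bucket the timestamp
        counts[(window_start, etype)] += 1
    return dict(counts)

stream = [(0, "OrderPlaced"), (10, "OrderPlaced"), (65, "PaymentReceived"),
          (70, "OrderPlaced"), (130, "OrderShipped")]
print(window_counts(stream))
# {(0, 'OrderPlaced'): 2, (60, 'PaymentReceived'): 1,
#  (60, 'OrderPlaced'): 1, (120, 'OrderShipped'): 1}
```

Real stream processors add what this sketch omits: handling out-of-order and late events (watermarks), fault-tolerant state, and joins across streams.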
Interactive Simulation — Event Log Replay
(The original page includes an interactive demo here: append events to an order's event log and watch the read model — order status, total, item count, payment state, and events-replayed count — rebuild from the log.)
Architecture

Commands (writes) → Event Store (Kafka / EventStoreDB) → Projector → Read Model (Redis / Postgres) → Query API
| Aspect      | Event Sourcing                                   | Traditional CRUD                    |
|-------------|--------------------------------------------------|-------------------------------------|
| Audit trail | Complete — every change recorded                 | Manual — needs an extra audit table |
| Time travel | Yes — replay to any point in time                | No — only current state             |
| Complexity  | Higher — schema evolution is tricky              | Lower — simpler mental model        |
| Performance | Writes fast; reads via projections               | Reads fast with indexes             |
| Best for    | Financial systems, audit-heavy, complex domains  | Simple CRUD, most web apps          |
Code Example — Event Sourcing with Snapshots
from dataclasses import dataclass, field
from typing import List
import time

@dataclass
class Event:
    type: str
    data: dict
    timestamp: float = field(default_factory=time.time)
    version: int = 0

class OrderAggregate:
    def __init__(self):
        self.events: List[Event] = []
        self.status = "new"
        self.total = 0.0
        self.version = 0

    def apply(self, event: Event):
        """Mutate state from event — deterministic."""
        if event.type == "OrderPlaced":
            self.status = "placed"
            self.total = event.data["total"]
        elif event.type == "PaymentReceived":
            self.status = "paid"
        elif event.type == "OrderShipped":
            self.status = "shipped"
        elif event.type == "OrderCancelled":
            self.status = "cancelled"
        self.version += 1

    def replay(self, events: List[Event]):
        """Rebuild state from event history."""
        for e in events:
            self.apply(e)

    def snapshot(self) -> dict:
        """Checkpoint current state so replay can start here, not at event #0."""
        return {"status": self.status, "total": self.total, "version": self.version}

    @classmethod
    def from_snapshot(cls, snap: dict, events_since: List[Event]) -> "OrderAggregate":
        """Restore from a checkpoint, then replay only the events after it."""
        agg = cls()
        agg.status = snap["status"]
        agg.total = snap["total"]
        agg.version = snap["version"]
        agg.replay(events_since)
        return agg
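A self-contained usage sketch of the pattern above (the classes are re-declared in compact form so the snippet runs standalone; event data is illustrative): replay part of the log, take a snapshot, then rebuild a fresh aggregate from the snapshot plus only the tail of the log.

```python
from dataclasses import dataclass, field
from typing import List
import time

@dataclass
class Event:
    type: str
    data: dict
    timestamp: float = field(default_factory=time.time)

class OrderAggregate:
    def __init__(self):
        self.status, self.total, self.version = "new", 0.0, 0

    def apply(self, event: Event):
        if event.type == "OrderPlaced":
            self.status = "placed"
            self.total = event.data["total"]
        elif event.type == "PaymentReceived":
            self.status = "paid"
        elif event.type == "OrderShipped":
            self.status = "shipped"
        self.version += 1

    def replay(self, events: List[Event]):
        for e in events:
            self.apply(e)

    def snapshot(self) -> dict:
        return {"status": self.status, "total": self.total, "version": self.version}

# Full history for one order.
log = [
    Event("OrderPlaced", {"total": 42.50}),
    Event("PaymentReceived", {}),
    Event("OrderShipped", {}),
]

# Replay the first two events, then checkpoint.
agg = OrderAggregate()
agg.replay(log[:2])
snap = agg.snapshot()  # {'status': 'paid', 'total': 42.5, 'version': 2}

# Later: restore state from the snapshot and replay only the tail of the log.
restored = OrderAggregate()
restored.status = snap["status"]
restored.total = snap["total"]
restored.version = snap["version"]
restored.replay(log[2:])
print(restored.status, restored.version)  # shipped 3
```

At scale this is the whole point of snapshots: the restore path touches one checkpoint plus the events appended since it, instead of the entire log.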
Quiz
1. In event sourcing, what is the "source of truth"?
2. What problem does CQRS solve?
3. Why are snapshots important in event sourcing at scale?
4. What is the key difference between stream processing and batch processing?
5. Your event sourcing system had a bug in the "OrderShipped" event handler that miscalculated delivery_fee. How do you fix the historical data?