
Event-Driven Architecture Tooling: CQRS, Event Sourcing, and Saga Patterns

Architecture · 2026-02-09 · 8 min read · event-driven · cqrs · event-sourcing · architecture · patterns


Event-driven architecture (EDA) replaces direct service-to-service calls with events -- immutable facts about things that happened. Instead of OrderService calling InventoryService calling PaymentService in a chain, OrderService publishes an OrderPlaced event, and the other services react independently. This decoupling makes systems more resilient, scalable, and easier to evolve.

But EDA introduces complexity. You need event buses, serialization standards, idempotent consumers, saga coordination, and often CQRS or event sourcing to manage state. This guide covers the tools and patterns that make event-driven systems practical, with real TypeScript and Go examples.

Core Patterns

Event Bus / Message Broker

The event bus is the backbone. Events are published to topics, and consumers subscribe. The three most common options are:

Broker         | Ordering      | Retention                      | Consumer Groups              | Best For
Apache Kafka   | Per-partition | Configurable (days to forever) | Yes                          | High-throughput, event log
NATS JetStream | Per-stream    | Configurable                   | Yes                          | Low-latency, simple operations
RabbitMQ       | Per-queue     | Until consumed (or TTL)        | No (use competing consumers) | Task distribution, RPC

Publishing Events (TypeScript + Kafka)

import { Kafka, Partitioners } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka-1:9092', 'kafka-2:9092'],
});

const producer = kafka.producer({
  createPartitioner: Partitioners.DefaultPartitioner,
  idempotent: true,  // Prevents duplicate writes when the producer retries
});

// kafkajs requires an explicit connect before the first send (top-level await in an ES module)
await producer.connect();

interface OrderPlacedEvent {
  type: 'order.placed';
  orderId: string;
  customerId: string;
  items: Array<{ productId: string; quantity: number; price: number }>;
  total: number;
  occurredAt: string;
}

async function publishOrderPlaced(order: Order): Promise<void> {
  const event: OrderPlacedEvent = {
    type: 'order.placed',
    orderId: order.id,
    customerId: order.customerId,
    items: order.items,
    total: order.total,
    occurredAt: new Date().toISOString(),
  };

  await producer.send({
    topic: 'orders',
    messages: [{
      key: order.id,          // Partition by order ID for ordering
      value: JSON.stringify(event),
      headers: {
        'event-type': 'order.placed',
        'correlation-id': order.correlationId,
      },
    }],
  });
}

Consuming Events (Go + Kafka)

package main

import (
    "context"
    "encoding/json"
    "log"

    "github.com/segmentio/kafka-go"
)

type OrderPlacedEvent struct {
    Type       string      `json:"type"`
    OrderID    string      `json:"orderId"`
    CustomerID string      `json:"customerId"`
    Items      []OrderItem `json:"items"`
    Total      float64     `json:"total"`
    OccurredAt string      `json:"occurredAt"`
}

type OrderItem struct {
    ProductID string  `json:"productId"`
    Quantity  int     `json:"quantity"`
    Price     float64 `json:"price"`
}

func main() {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  []string{"kafka-1:9092", "kafka-2:9092"},
        GroupID:  "inventory-service",
        Topic:    "orders",
        MinBytes: 1,
        MaxBytes: 10e6,
    })
    defer reader.Close()

    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            log.Fatalf("read error: %v", err)
        }

        var event OrderPlacedEvent
        if err := json.Unmarshal(msg.Value, &event); err != nil {
            log.Printf("unmarshal error: %v", err)
            continue
        }

        if err := handleOrderPlaced(event); err != nil {
            log.Printf("handler error for order %s: %v", event.OrderID, err)
            // In production: dead-letter queue, retry with backoff
        }
    }
}

func handleOrderPlaced(event OrderPlacedEvent) error {
    // Reserve inventory for each item
    for _, item := range event.Items {
        if err := reserveStock(item.ProductID, item.Quantity); err != nil {
            return err
        }
    }
    return nil
}
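
Kafka consumer groups give at-least-once delivery, so the handler above may see the same event twice, for example after a rebalance or a retried batch. This is why the introduction called out idempotent consumers. Back in TypeScript, here is a minimal sketch of one way to deduplicate, assuming a processed_events table whose primary key is event_id; the service name and the natural key are illustrative, not part of the examples above.

import { Kafka } from 'kafkajs';
import { Pool } from 'pg';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const kafka = new Kafka({ clientId: 'analytics-service', brokers: ['kafka-1:9092'] });
const consumer = kafka.consumer({ groupId: 'analytics-service' });

// Claim the event ID before doing any work. A redelivered message hits the
// primary-key conflict and is skipped, so the side effect runs at most once.
async function handleOnce(eventId: string, handler: () => Promise<void>): Promise<void> {
  const claimed = await pool.query(
    'INSERT INTO processed_events (event_id) VALUES ($1) ON CONFLICT DO NOTHING',
    [eventId],
  );
  if (claimed.rowCount === 0) return; // duplicate delivery, already handled

  try {
    await handler();
  } catch (err) {
    // Release the claim so the event can be processed again on redelivery.
    await pool.query('DELETE FROM processed_events WHERE event_id = $1', [eventId]);
    throw err;
  }
}

await consumer.connect();
await consumer.subscribe({ topic: 'orders' });
await consumer.run({
  eachMessage: async ({ message }) => {
    const event = JSON.parse(message.value!.toString()) as OrderPlacedEvent;
    // Event type plus order ID serves as a natural deduplication key here.
    await handleOnce(`${event.type}:${event.orderId}`, async () => {
      // apply the business effect: update a read model, send a notification, etc.
    });
  },
});

A unique event ID carried in the event itself (or in a Kafka header) is a more robust key when the same order can legitimately produce repeated events of the same type.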

CQRS: Command Query Responsibility Segregation

CQRS separates the write model (commands that change state) from the read model (queries that return data). This lets you optimize each side independently -- a normalized write model for consistency, and denormalized read models tailored to specific UI needs.

TypeScript CQRS Implementation

// Domain events
interface DomainEvent {
  aggregateId: string;
  type: string;
  data: unknown;
  metadata: {
    correlationId: string;
    occurredAt: string;
    version: number;
  };
}

// Command side: handle commands, emit events
interface Command {
  type: string;
  payload: unknown;
}

class OrderCommandHandler {
  constructor(
    private readonly repository: OrderRepository,
    private readonly eventBus: EventBus,
  ) {}

  async handle(command: PlaceOrderCommand): Promise<string> {
    // Validate
    if (command.items.length === 0) {
      throw new Error('Order must have at least one item');
    }

    // Create aggregate
    const order = Order.create({
      customerId: command.customerId,
      items: command.items,
    });

    // Persist
    await this.repository.save(order);

    // Publish events
    for (const event of order.uncommittedEvents) {
      await this.eventBus.publish(event);
    }

    return order.id;
  }
}

// Query side: maintain denormalized read models.
// These handlers receive events in the DomainEvent envelope above
// (aggregateId / data / metadata), not the flat Kafka wire format from the first example.
class OrderProjection {
  constructor(private readonly readDb: ReadDatabase) {}

  async onOrderPlaced(event: OrderPlacedEvent): Promise<void> {
    await this.readDb.upsert('order_summaries', {
      orderId: event.aggregateId,
      customerId: event.data.customerId,
      itemCount: event.data.items.length,
      total: event.data.total,
      status: 'placed',
      placedAt: event.metadata.occurredAt,
    });
  }

  async onOrderShipped(event: OrderShippedEvent): Promise<void> {
    await this.readDb.update('order_summaries', event.aggregateId, {
      status: 'shipped',
      trackingNumber: event.data.trackingNumber,
      shippedAt: event.metadata.occurredAt,
    });
  }
}

// Query handler: read from optimized read model
class OrderQueryHandler {
  constructor(private readonly readDb: ReadDatabase) {}

  async getCustomerOrders(customerId: string): Promise<OrderSummary[]> {
    return this.readDb.query('order_summaries', {
      customerId,
      orderBy: 'placedAt DESC',
      limit: 50,
    });
  }
}
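
Wiring the two sides together looks roughly like this. It is a sketch under assumptions the snippets above leave open: concrete eventBus, orderRepository, and readDb instances exist, the event bus exposes a subscribe(type, handler) method, and PlaceOrderCommand carries customerId and items. The point is the shape of the flow: commands go through the write model, projections update the read model asynchronously, and queries never touch the write side.

const commandHandler = new OrderCommandHandler(orderRepository, eventBus);
const projection = new OrderProjection(readDb);
const queryHandler = new OrderQueryHandler(readDb);

// The projection keeps the denormalized read model current as events arrive.
eventBus.subscribe('order.placed', (event) => projection.onOrderPlaced(event));
eventBus.subscribe('order.shipped', (event) => projection.onOrderShipped(event));

// Write path: returns as soon as the command is accepted and events are published.
const orderId = await commandHandler.handle({
  type: 'order.place',
  customerId: 'cust-456',
  items: [{ productId: 'prod-1', quantity: 2, price: 29.99 }],
});

// Read path: served from the read model, which is eventually consistent with the
// write side -- a freshly placed order may take a moment to appear.
const orders = await queryHandler.getCustomerOrders('cust-456');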

Event Sourcing

Event sourcing stores the full history of state changes as a sequence of events, rather than storing only the current state. The current state is derived by replaying events. This gives you a complete audit trail and the ability to reconstruct state at any point in time.

Event Store Implementation

import { Pool } from 'pg';  // node-postgres connection pool used by the store below

// Event store interface
interface EventStore {
  append(streamId: string, events: DomainEvent[], expectedVersion: number): Promise<void>;
  read(streamId: string, fromVersion?: number): Promise<DomainEvent[]>;
  subscribe(eventTypes: string[], handler: (event: DomainEvent) => Promise<void>): void;
}

class ConcurrencyError extends Error {}

// PostgreSQL-backed event store
class PostgresEventStore implements EventStore {
  constructor(private readonly pool: Pool) {}

  async append(
    streamId: string,
    events: DomainEvent[],
    expectedVersion: number,
  ): Promise<void> {
    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');

      // Optimistic concurrency check
      const result = await client.query(
        'SELECT MAX(version) as current_version FROM events WHERE stream_id = $1',
        [streamId],
      );
      const currentVersion = result.rows[0]?.current_version ?? 0;

      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, got ${currentVersion}`,
        );
      }

      // Append events
      let version = expectedVersion;
      for (const event of events) {
        version++;
        await client.query(
          `INSERT INTO events (stream_id, version, event_type, data, metadata, occurred_at)
           VALUES ($1, $2, $3, $4, $5, $6)`,
          [streamId, version, event.type, JSON.stringify(event.data),
           JSON.stringify(event.metadata), event.metadata.occurredAt],
        );
      }

      await client.query('COMMIT');
    } catch (err) {
      await client.query('ROLLBACK');
      throw err;
    } finally {
      client.release();
    }
  }

  async read(streamId: string, fromVersion = 0): Promise<DomainEvent[]> {
    const result = await this.pool.query(
      `SELECT * FROM events WHERE stream_id = $1 AND version > $2 ORDER BY version`,
      [streamId, fromVersion],
    );
    return result.rows.map(row => ({
      aggregateId: row.stream_id,
      type: row.event_type,
      data: row.data,
      metadata: { ...row.metadata, version: row.version },
    }));
  }
}

// Aggregate that rebuilds from events
class Order {
  private state: OrderState = { status: 'draft', items: [], total: 0 };
  private version = 0;
  public uncommittedEvents: DomainEvent[] = [];

  static async load(eventStore: EventStore, orderId: string): Promise<Order> {
    const order = new Order();
    const events = await eventStore.read(orderId);
    for (const event of events) {
      order.apply(event);
      order.version = event.metadata.version;
    }
    return order;
  }

  private apply(event: DomainEvent): void {
    switch (event.type) {
      case 'order.placed':
        this.state.status = 'placed';
        this.state.items = event.data.items;
        this.state.total = event.data.total;
        break;
      case 'order.shipped':
        this.state.status = 'shipped';
        break;
      case 'order.cancelled':
        this.state.status = 'cancelled';
        break;
    }
  }
}
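
To see how the expectedVersion argument is used end to end, here is a hypothetical command flow; the order.shipped payload, the shipOrder function, and randomUUID for correlation IDs are illustrative choices, not something the store requires. It reads the stream, takes the last event's version as the expected version, and appends the new event:

import { randomUUID } from 'node:crypto';

async function shipOrder(
  eventStore: EventStore,
  orderId: string,
  trackingNumber: string,
): Promise<void> {
  // Read the stream once; the last event's version is the version we expect on append.
  const history = await eventStore.read(orderId);
  const currentVersion =
    history.length > 0 ? history[history.length - 1].metadata.version : 0;

  // (Business validation against the replayed state would go here, e.g. via Order.load.)

  const event: DomainEvent = {
    aggregateId: orderId,
    type: 'order.shipped',
    data: { trackingNumber },
    metadata: {
      correlationId: randomUUID(),
      occurredAt: new Date().toISOString(),
      version: currentVersion + 1,
    },
  };

  // If another writer appended in the meantime, the append fails -- either with
  // ConcurrencyError from the version check or on the unique constraint -- and
  // the caller can re-read the stream and retry.
  await eventStore.append(orderId, [event], currentVersion);
}

The UNIQUE (stream_id, version) constraint in the schema below is the backstop: even two writers that pass the version check cannot both commit the same version number.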

Event Store Schema

CREATE TABLE events (
  id          BIGSERIAL PRIMARY KEY,
  stream_id   TEXT NOT NULL,
  version     INTEGER NOT NULL,
  event_type  TEXT NOT NULL,
  data        JSONB NOT NULL,
  metadata    JSONB NOT NULL DEFAULT '{}',
  occurred_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  UNIQUE (stream_id, version)
);

CREATE INDEX idx_events_stream ON events (stream_id, version);
CREATE INDEX idx_events_type ON events (event_type);
CREATE INDEX idx_events_occurred ON events (occurred_at);

Saga Pattern: Coordinating Multi-Service Transactions

When a business process spans multiple services, you cannot wrap it in a single database transaction. The saga pattern coordinates the multi-step process with events and pairs each step with a compensating action that undoes it if a later step fails.

Orchestrated Saga (TypeScript)

class SagaFailedError extends Error {
  constructor(public readonly step: string, public readonly cause: unknown) {
    super(`Saga step "${step}" failed`);
  }
}

interface SagaStep<T> {
  name: string;
  execute: (context: T) => Promise<void>;
  compensate: (context: T) => Promise<void>;
}

class SagaOrchestrator<T> {
  private steps: SagaStep<T>[] = [];
  private completedSteps: SagaStep<T>[] = [];

  addStep(step: SagaStep<T>): this {
    this.steps.push(step);
    return this;
  }

  async execute(context: T): Promise<void> {
    for (const step of this.steps) {
      try {
        await step.execute(context);
        this.completedSteps.push(step);
      } catch (error) {
        console.error(`Saga step "${step.name}" failed:`, error);
        await this.compensate(context);
        throw new SagaFailedError(step.name, error);
      }
    }
  }

  private async compensate(context: T): Promise<void> {
    // Compensate in reverse order
    for (const step of [...this.completedSteps].reverse()) {
      try {
        await step.compensate(context);
      } catch (error) {
        console.error(`Compensation for "${step.name}" failed:`, error);
        // Log for manual intervention -- compensation failures are serious
      }
    }
  }
}

// Usage: order fulfillment saga
interface OrderContext {
  orderId: string;
  customerId: string;
  items: OrderItem[];
  total: number;
  paymentId?: string;
  shipmentId?: string;
}

const orderSaga = new SagaOrchestrator<OrderContext>()
  .addStep({
    name: 'reserve-inventory',
    execute: async (ctx) => {
      await inventoryService.reserve(ctx.orderId, ctx.items);
    },
    compensate: async (ctx) => {
      await inventoryService.release(ctx.orderId, ctx.items);
    },
  })
  .addStep({
    name: 'process-payment',
    execute: async (ctx) => {
      const payment = await paymentService.charge(ctx.customerId, ctx.total);
      ctx.paymentId = payment.id;
    },
    compensate: async (ctx) => {
      if (ctx.paymentId) {
        await paymentService.refund(ctx.paymentId);
      }
    },
  })
  .addStep({
    name: 'create-shipment',
    execute: async (ctx) => {
      const shipment = await shippingService.create(ctx.orderId, ctx.items);
      ctx.shipmentId = shipment.id;
    },
    compensate: async (ctx) => {
      if (ctx.shipmentId) {
        await shippingService.cancel(ctx.shipmentId);
      }
    },
  });

// Execute the saga
await orderSaga.execute({
  orderId: 'order-123',
  customerId: 'cust-456',
  items: [{ productId: 'prod-1', quantity: 2, price: 29.99 }],
  total: 59.98,
});

Choreography-Based Saga (Go)

In choreography, there is no orchestrator. Each service listens for events and publishes its own events. The process emerges from the interactions.

// inventory-service/handler.go
func (h *InventoryHandler) HandleOrderPlaced(ctx context.Context, event OrderPlacedEvent) error {
    err := h.repo.ReserveStock(ctx, event.OrderID, event.Items)
    if err != nil {
        // Publish failure event -- payment service will not proceed
        return h.publisher.Publish(ctx, "inventory", InventoryReservationFailed{
            OrderID: event.OrderID,
            Reason:  err.Error(),
        })
    }

    return h.publisher.Publish(ctx, "inventory", InventoryReserved{
        OrderID: event.OrderID,
        Items:   event.Items,
    })
}

// payment-service/handler.go
func (h *PaymentHandler) HandleInventoryReserved(ctx context.Context, event InventoryReserved) error {
    order, err := h.orderClient.GetOrder(ctx, event.OrderID)
    if err != nil {
        return err
    }

    payment, err := h.processor.Charge(ctx, order.CustomerID, order.Total)
    if err != nil {
        // Publish failure -- inventory service will compensate
        return h.publisher.Publish(ctx, "payments", PaymentFailed{
            OrderID: event.OrderID,
            Reason:  err.Error(),
        })
    }

    return h.publisher.Publish(ctx, "payments", PaymentProcessed{
        OrderID:   event.OrderID,
        PaymentID: payment.ID,
    })
}

// inventory-service/handler.go -- compensation
func (h *InventoryHandler) HandlePaymentFailed(ctx context.Context, event PaymentFailed) error {
    return h.repo.ReleaseStock(ctx, event.OrderID)
}

Frameworks and Libraries

Tool           | Language             | Pattern                       | Notes
EventStoreDB   | Any (gRPC)           | Event Sourcing                | Purpose-built event store with projections
Axon Framework | Java/Kotlin          | CQRS + Event Sourcing + Sagas | Full framework, opinionated
Temporal       | Any (Go SDK, TS SDK) | Durable workflows / Sagas     | Not EDA-specific but excellent for sagas
Watermill      | Go                   | Event-driven messaging        | Clean abstractions over Kafka, NATS, etc.
NestJS CQRS    | TypeScript           | CQRS + Event Sourcing         | Official @nestjs/cqrs module for the NestJS framework
MassTransit    | C#                   | Messaging + Sagas             | Mature .NET library

When to Use (and When Not To)

Use event-driven architecture when:

- Several services need to react to the same business fact without being coupled to the producer.
- Synchronous call chains between services have become a reliability or scaling bottleneck.
- You need an audit trail of state changes, or the ability to rebuild state and read models (event sourcing).
- Business processes span multiple services and need coordinated compensation (sagas).

Do not use event-driven architecture when:

- You are still in a monolith and synchronous calls are not causing pain.
- The domain is simple CRUD, where eventual consistency adds confusion without benefit.
- The team cannot yet absorb the operational load of running brokers, monitoring consumer lag, and debugging asynchronous flows.

The biggest mistake teams make with EDA is adopting it too early. Start with a monolith. Extract services when you feel the pain. Add events when synchronous calls between services become a bottleneck. The patterns in this guide are powerful, but they are solutions to specific problems -- not starting points for every project.