Waqas Ahmad — Software Architect & Technical Consultant - Available USA, Europe, Global

Event Sourcing and CQRS: Patterns, Bus, Unit of Work, and Resilience

Event Sourcing and CQRS: patterns, persistence, and tooling in .NET.

Introduction

This guidance is relevant when you are weighing Event Sourcing and CQRS for a system that needs an audit trail, replay, or independently scaled reads and writes; it fits less well when those needs are absent, because the added complexity then buys little. I’ve applied it in real projects and refined the takeaways over time (as of 2026).

Storing only current state makes audit trails and independent read/write scaling difficult when you need both. This article covers Event Sourcing (store events, rebuild state by replay) and CQRS (separate command and query models), including single vs multiple buses, Unit of Work, persistence and resilience (outbox, recovery), and full .NET code and structure. For architects and tech leads, combining Event Sourcing with CQRS matters when you need an audit trail, replay, or different read and write scaling—but it adds complexity, so the article explains when to use it and when not to.

If you are new: start with Topics covered and Event Sourcing and CQRS at a glance.

For a deeper overview of this topic, explore the full Event-Driven Architecture guide.

Decision Context

  • System scale: Varies by context; the approach in this article applies to the scales and scenarios described in the body.
  • Team size: Typically small to medium teams; ownership and clarity matter more than headcount.
  • Time / budget pressure: Applicable under delivery pressure; I’ve used it in both greenfield and incremental refactors.
  • Technical constraints: .NET and related stack where relevant; constraints are noted in the article where they affect the approach.
  • Non-goals: This article does not optimize for every possible scenario; boundaries are stated where they matter.

What is Event Sourcing and what is CQRS

Event Sourcing: You store events (e.g. OrderPlaced, PaymentReceived) in an event store. You do not overwrite state; you append events. The current state of an aggregate (e.g. Order) is computed by replaying all events for that aggregate. Benefits: audit trail, time travel, replay for new read models.

CQRS: You separate the write model (commands, consistency) from the read model (queries, scalability). Writes go to one store (e.g. event store or transactional DB); reads go to projections (e.g. SQL views, document stores) optimized for query shape. You can scale reads and writes independently.

Together: Events are the write model; projections consume events and build read models. Command handlers append events; query API reads from read models only. See Event Sourcing and CQRS at a glance.


Event Sourcing and CQRS at a glance

| Concept | What it is | Where it lives |
| --- | --- | --- |
| Command | Intent to change state (e.g. PlaceOrder) | Write side; sent to bus or handler |
| Event | Something that happened (e.g. OrderPlaced) | Event store; consumed by projections |
| Aggregate | Replays events, produces new events | Write side |
| Event store | Append-only log of events | Infrastructure |
| Bus | Delivers commands (and optionally events) to handlers | Single or multiple; see Single bus and Multiple buses |
| Projection | Subscribes to events, updates read model | Read side |
| Read model | Optimized for queries | Query API reads from here |

CQRS patterns and best practices

First steps:

  1. Define commands and events — Commands are imperative (PlaceOrder); events are past tense (OrderPlaced). Keep them in separate namespaces or projects.
  2. One handler per command — Each command type has one handler. Handler loads aggregate (or creates it), applies command (produces events), persists events.
  3. Idempotency — Use a command ID or idempotency key so duplicate submissions (e.g. retry after timeout) do not double-apply. Store processed command IDs and reject duplicates.
  4. Validation — Validate commands before loading the aggregate (e.g. required fields, bounds). Validate business rules inside the aggregate when applying the command.
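
The idempotency step above can be sketched as follows. This is a minimal in-memory illustration, not a production implementation: `IdempotentExecutor` and its members are hypothetical names, and a real system would persist processed command IDs (ideally in the same transaction as the appended events) rather than keep them in memory.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: runs a command handler at most once per command ID.
public sealed class IdempotentExecutor
{
    // Assumption: in-memory set of processed IDs; a durable store is needed in practice.
    private readonly ConcurrentDictionary<Guid, byte> _processed = new();

    // Returns true if the handler ran, false if the command ID was a duplicate.
    public bool Execute(Guid commandId, Action handler)
    {
        // TryAdd is atomic: only the first caller for a given ID wins.
        if (!_processed.TryAdd(commandId, 0))
            return false;

        try
        {
            handler();
            return true;
        }
        catch
        {
            // The command did not complete, so allow a later retry with the same ID.
            _processed.TryRemove(commandId, out _);
            throw;
        }
    }
}
```

A retry after a timeout then reuses the same command ID: the first call runs the handler, and the duplicate returns false without applying the command again.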

Best practices:

  • Thin controllers — API receives the command and sends it to the bus (or handler); no business logic in the controller.
  • Async by default — Commands can be queued; return 202 Accepted with a correlation ID when you process asynchronously.
  • Correlation ID — Attach a correlation ID to every command and event so you can trace a request across write and read side.
  • Event versioning — When you change an event schema, use version numbers and upcasters (or event adapters) to transform old events when replaying.
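
To make the event versioning point concrete, here is a minimal upcaster sketch. The event types `OrderPlacedV1` and `OrderPlacedV2`, the added `Currency` field, and the default value "USD" are all assumptions for illustration; they are not part of the example domain used later in this article.

```csharp
using System;

// Hypothetical old and new shapes of the same event.
public record OrderPlacedV1(Guid OrderId, string CustomerId);
public record OrderPlacedV2(Guid OrderId, string CustomerId, string Currency);

public static class OrderPlacedUpcaster
{
    // Transforms stored events to the latest version before they are replayed.
    // Events that are already current (or unknown to this upcaster) pass through unchanged.
    public static object Upcast(object storedEvent) => storedEvent switch
    {
        // Assumption: orders recorded before the Currency field existed were in USD.
        OrderPlacedV1 v1 => new OrderPlacedV2(v1.OrderId, v1.CustomerId, "USD"),
        _ => storedEvent
    };
}
```

With this in place the aggregate only needs an `Apply` method for the latest version; the event store (or the code that loads from it) runs every deserialized event through `Upcast` first.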

Single bus: queue limits and backpressure

Single bus: One message bus (e.g. in-process MediatR, or a queue like RabbitMQ, Azure Service Bus) for commands. All command handlers are registered with the same bus; the bus routes each command to its handler.

Queue limits: If the bus is backed by a bounded queue (e.g. Channel with a capacity, or a broker queue with maxLength), you limit how many commands can be pending. When the queue is full, you need a backpressure strategy:

| Strategy | What you do | When to use |
| --- | --- | --- |
| Reject | Return 503 Service Unavailable or 429 Too Many Requests to the client | When you want to push back immediately so clients retry later. |
| Block / wait | Caller blocks until the queue has space (with a timeout) | When you must not lose the command and can afford to wait. |
| Overflow to secondary | Write to a dead-letter or overflow queue; process later | Enterprise: avoid dropping commands; reprocess when load drops. |

Implementing a limit (e.g. in .NET):

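using System.Threading.Channels;

// Bounded channel: max 1000 pending commands
var channel = Channel.CreateBounded<CommandEnvelope>(new BoundedChannelOptions(1000)
{
    FullMode = BoundedChannelFullMode.Wait // or DropWrite, DropOldest, DropNewest (these discard items silently)
});
// Producer (wait): await channel.Writer.WriteAsync(cmd, ct); // with FullMode.Wait, waits asynchronously until there is space
// Producer (reject): channel.Writer.TryWrite(cmd) returns false when the channel is full (FullMode.Wait only)
// Consumer: await channel.Reader.ReadAsync(ct);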

Best practice: Configure a max queue size and a backpressure policy (reject, wait, or overflow) so that a burst of commands does not exhaust memory or overwhelm the event store.
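
The "overflow to secondary" strategy from the table can be sketched like this. It is an illustration under simplifying assumptions: `OverflowingQueue<T>` is a hypothetical name, and the overflow lane here is an in-memory unbounded channel, whereas a real overflow or dead-letter queue would normally be durable (a broker queue or a database table).

```csharp
using System.Threading.Channels;

// Hypothetical wrapper: a bounded primary queue with an overflow lane for bursts.
public sealed class OverflowingQueue<T>
{
    private readonly Channel<T> _primary;
    // Assumption: in-memory overflow for illustration; use a durable queue in practice.
    private readonly Channel<T> _overflow = Channel.CreateUnbounded<T>();

    public OverflowingQueue(int capacity)
    {
        // Wait mode makes TryWrite return false when the channel is full.
        _primary = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        });
    }

    public ChannelReader<T> Primary => _primary.Reader;
    public ChannelReader<T> Overflow => _overflow.Reader;

    // Returns true if the item went to the primary queue, false if it was diverted to overflow.
    public bool Enqueue(T item)
    {
        if (_primary.Writer.TryWrite(item))
            return true;

        _overflow.Writer.TryWrite(item);
        return false;
    }
}
```

A separate worker would drain `Overflow` when load drops, which keeps the command instead of rejecting it with 503.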

Single bus: full working setup (bounded queue, presentation, Program.cs)

Below is a complete setup: presentation (API controller that sends commands to the bus), bounded command bus with queue limit and backpressure, background dispatcher that processes the queue, and Program.cs wiring everything.

1. Command bus interface and envelope

// OrderES.Write/Abstractions/ICommandBus.cs
namespace OrderES.Write.Abstractions;

public interface ICommandBus
{
    /// <summary>Enqueue command. Returns false if queue full (backpressure: reject).</summary>
    ValueTask<bool> TryEnqueueAsync(object command, CancellationToken ct = default);
    /// <summary>Enqueue and wait until processed (optional; for sync 201 response).</summary>
    Task EnqueueAndWaitAsync(object command, CancellationToken ct = default);
}

public sealed class CommandEnvelope
{
    public object Command { get; init; } = null!;
    public string CorrelationId { get; init; } = null!;
    public TaskCompletionSource? Completion { get; init; }
}

2. Bounded command bus (Channel with limit, reject when full)

// OrderES.Write/Infrastructure/BoundedCommandBus.cs
using System.Threading.Channels;
using OrderES.Write.Abstractions;

namespace OrderES.Write.Infrastructure;

public sealed class BoundedCommandBus : ICommandBus
{
    private readonly Channel<CommandEnvelope> _channel;
    public const int MaxQueueLength = 1000;

    public BoundedCommandBus()
    {
        _channel = Channel.CreateBounded<CommandEnvelope>(new BoundedChannelOptions(MaxQueueLength)
        {
            // Wait mode: TryWrite returns false when the channel is full, so the caller can reject.
            // DropWrite would report success while silently discarding the command.
            FullMode = BoundedChannelFullMode.Wait
        });
    }

    internal ChannelReader<CommandEnvelope> Reader => _channel.Reader;

    public ValueTask<bool> TryEnqueueAsync(object command, CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();
        var envelope = new CommandEnvelope { Command = command, CorrelationId = Guid.NewGuid().ToString("N") };
        // Non-blocking: false means the queue is full (backpressure: reject).
        return new ValueTask<bool>(_channel.Writer.TryWrite(envelope));
    }

    public async Task EnqueueAndWaitAsync(object command, CancellationToken ct = default)
    {
        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        var envelope = new CommandEnvelope { Command = command, CorrelationId = Guid.NewGuid().ToString("N"), Completion = tcs };
        await _channel.Writer.WriteAsync(envelope, ct).ConfigureAwait(false);
        await tcs.Task.WaitAsync(ct).ConfigureAwait(false);
    }
}

3. Background service: read from channel, dispatch to handler

// OrderES.Write/Infrastructure/CommandDispatcherHostedService.cs
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace OrderES.Write.Infrastructure;

public sealed class CommandDispatcherHostedService : BackgroundService
{
    private readonly BoundedCommandBus _bus;
    private readonly IServiceProvider _services;

    public CommandDispatcherHostedService(BoundedCommandBus bus, IServiceProvider services)
    {
        _bus = bus;
        _services = services;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var envelope in _bus.Reader.ReadAllAsync(stoppingToken).ConfigureAwait(false))
        {
            try
            {
                using var scope = _services.CreateScope();
                var handler = ResolveHandler(scope.ServiceProvider, envelope.Command);
                if (handler != null)
                {
                    await handler.Invoke(envelope.Command, stoppingToken).ConfigureAwait(false);
                    envelope.Completion?.TrySetResult();
                }
                else
                    envelope.Completion?.TrySetException(new InvalidOperationException("No handler for " + envelope.Command.GetType().Name));
            }
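            catch (Exception ex)
            {
                // Fire-and-forget commands carry no Completion, so log the failure here
                // (logger omitted for brevity); otherwise it is silently swallowed.
                envelope.Completion?.TrySetException(ex);
            }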
        }
    }

    private static Func<object, CancellationToken, Task>? ResolveHandler(IServiceProvider sp, object command)
    {
        var handlerType = typeof(ICommandHandler<>).MakeGenericType(command.GetType());
        var handler = sp.GetService(handlerType);
        if (handler == null) return null;
        var method = handlerType.GetMethod("HandleAsync");
        return (cmd, ct) => (Task)method!.Invoke(handler, new[] { cmd, ct })!;
    }
}

public interface ICommandHandler<TCommand>
{
    Task HandleAsync(TCommand command, CancellationToken ct = default);
}

4. Presentation: write API controller

// OrderES.Api/Controllers/OrdersWriteController.cs
using Microsoft.AspNetCore.Mvc;
using OrderES.Write.Abstractions;
using OrderES.Write.Commands;

namespace OrderES.Api.Controllers;

[ApiController]
[Route("api/orders")]
public class OrdersWriteController : ControllerBase
{
    private readonly ICommandBus _bus;

    public OrdersWriteController(ICommandBus bus) => _bus = bus;

    [HttpPost]
    public async Task<ActionResult> PlaceOrder([FromBody] PlaceOrderRequest request, CancellationToken ct)
    {
        var items = request.Items.Select(i => new OrderLine(i.ProductId, i.Quantity, i.UnitPrice)).ToList();
        var command = new PlaceOrder(Guid.NewGuid(), request.CustomerId, items);
        var correlationId = HttpContext.Request.Headers["X-Correlation-ID"].FirstOrDefault() ?? Guid.NewGuid().ToString("N");

        // Option A: fire-and-forget, return 202 Accepted
        var enqueued = await _bus.TryEnqueueAsync(command, ct).ConfigureAwait(false);
        if (!enqueued)
            return StatusCode(503, new { error = "Command queue full", correlationId });

        Response.Headers["X-Correlation-ID"] = correlationId;
        // "Get" is the single-order action on OrdersReadController; the link is generated from its route.
        return AcceptedAtAction("Get", "OrdersRead", new { id = command.OrderId }, new { orderId = command.OrderId, correlationId });
    }

    [HttpPost("sync")]
    public async Task<ActionResult<Guid>> PlaceOrderSync([FromBody] PlaceOrderRequest request, CancellationToken ct)
    {
        var items = request.Items.Select(i => new OrderLine(i.ProductId, i.Quantity, i.UnitPrice)).ToList();
        var command = new PlaceOrder(Guid.NewGuid(), request.CustomerId, items);
        await _bus.EnqueueAndWaitAsync(command, ct).ConfigureAwait(false);
        return CreatedAtAction("Get", "OrdersRead", new { id = command.OrderId }, command.OrderId);
    }
}

public record PlaceOrderRequest(string CustomerId, IReadOnlyList<OrderLineRequest> Items);
public record OrderLineRequest(Guid ProductId, int Quantity, decimal UnitPrice);

5. Program.cs: register bus, dispatcher, handlers, event store

// OrderES.Api/Program.cs
using OrderES.EventStore;
using OrderES.Write.Abstractions;
using OrderES.Write.Commands;
using OrderES.Write.Handlers;
using OrderES.Write.Infrastructure;

var builder = WebApplication.CreateBuilder(args);

// Bounded command bus (single bus with queue limit)
builder.Services.AddSingleton<BoundedCommandBus>();
builder.Services.AddSingleton<ICommandBus>(sp => sp.GetRequiredService<BoundedCommandBus>());
builder.Services.AddHostedService<CommandDispatcherHostedService>();

// Event store and command handlers
builder.Services.AddScoped<IEventStore, SqlEventStore>();
builder.Services.AddScoped<ICommandHandler<PlaceOrder>, PlaceOrderHandler>();

builder.Services.AddControllers();
var app = builder.Build();
app.MapControllers();
app.Run();

How it works: Client POSTs to /api/orders. Controller builds PlaceOrder, calls _bus.TryEnqueueAsync. If the queue has space, the command is written to the Channel and the API returns 202 Accepted with orderId and correlationId. The CommandDispatcherHostedService reads from the channel, resolves PlaceOrderHandler, and calls HandleAsync. The handler loads the aggregate, appends events to the event store. If the queue is full, TryWrite returns false and the API returns 503. For a sync flow, use EnqueueAndWaitAsync and return 201 Created after the handler completes.


Multiple buses: enterprise level

When to use multiple buses:

  • Separate command bus and event bus — Commands go to a command bus (e.g. one queue); events (after persistence) are published to an event bus (e.g. Kafka, Event Hub) for projections and other services. So: write side publishes events; read side (and other microservices) subscribe to the event bus.
  • Priority lanes — One bus for critical commands (e.g. payment), another for bulk or low-priority (e.g. analytics). Prevents slow handlers from blocking critical ones.
  • Bounded contexts — Each domain or microservice has its own command bus; they communicate via events on a shared event bus. Clear ownership and scaling per context.

Enterprise pattern: Command bus (single or per context) + Event bus (for events after they are stored). Command handlers append to event store and then publish the same events to the event bus. Projections (and other services) subscribe to the event bus, not to the command bus. This gives you durability (event store) and decoupling (event bus for many consumers).

Multiple buses: full setup (command bus + event bus, presentation)

Below is a complete setup with two buses: a command bus (bounded channel as above) and an event bus (e.g. in-memory Channel or RabbitMQ). The command handler appends to the event store and then publishes events to the event bus. A projection worker subscribes to the event bus and updates the read model.

1. Event bus interface and in-memory implementation

// OrderES.Write/Abstractions/IEventBus.cs
namespace OrderES.Write.Abstractions;

public interface IEventBus
{
    Task PublishAsync(IReadOnlyList<object> events, CancellationToken ct = default);
}

// OrderES.Write/Infrastructure/InMemoryEventBus.cs
using System.Threading.Channels;
using OrderES.Write.Abstractions;

namespace OrderES.Write.Infrastructure;

public sealed class InMemoryEventBus : IEventBus, IAsyncEnumerable<object>
{
    private readonly Channel<object> _channel = Channel.CreateUnbounded<object>();

    public async Task PublishAsync(IReadOnlyList<object> events, CancellationToken ct = default)
    {
        foreach (var e in events)
            await _channel.Writer.WriteAsync(e, ct).ConfigureAwait(false);
    }

    public IAsyncEnumerator<object> GetAsyncEnumerator(CancellationToken ct = default)
        => _channel.Reader.ReadAllAsync(ct).GetAsyncEnumerator(ct);
}

2. Command handler: append to event store, then publish to event bus

// OrderES.Write/Handlers/PlaceOrderHandler.cs
namespace OrderES.Write.Handlers;

public class PlaceOrderHandler : ICommandHandler<PlaceOrder>
{
    private readonly IEventStore _store;
    private readonly IEventBus _eventBus;

    public PlaceOrderHandler(IEventStore store, IEventBus eventBus)
    {
        _store = store;
        _eventBus = eventBus;
    }

    public async Task HandleAsync(PlaceOrder cmd, CancellationToken ct)
    {
        var events = await _store.LoadAsync(cmd.OrderId, ct).ConfigureAwait(false);
        var agg = new OrderAggregate();
        foreach (var e in events) agg.Apply((dynamic)e);
        agg.Place(cmd);
        var newEvents = agg.GetUncommitted();
        await _store.AppendAsync(agg.Id, newEvents, ct).ConfigureAwait(false);
        agg.ClearUncommitted();
        await _eventBus.PublishAsync(newEvents, ct).ConfigureAwait(false);
    }
}

3. Projection worker: subscribe to event bus, update read model

// OrderES.Projections/EventBusProjectionHostedService.cs
namespace OrderES.Projections;

public sealed class EventBusProjectionHostedService : BackgroundService
{
    private readonly InMemoryEventBus _eventBus;
    private readonly IServiceProvider _services;

    public EventBusProjectionHostedService(InMemoryEventBus eventBus, IServiceProvider services)
    {
        _eventBus = eventBus;
        _services = services;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var evt in _eventBus.WithCancellation(stoppingToken).ConfigureAwait(false))
        {
            using var scope = _services.CreateScope();
            var readModel = scope.ServiceProvider.GetRequiredService<IOrderListReadModel>();
            if (evt is OrderPlaced placed)
                await readModel.InsertAsync(new OrderListDto(placed.OrderId, placed.CustomerId, "Placed", placed.CreatedAt), stoppingToken).ConfigureAwait(false);
        }
    }
}

4. Program.cs: register command bus, event bus, dispatcher, handler, event store, projection worker

// OrderES.Api/Program.cs (multiple buses)
builder.Services.AddSingleton<BoundedCommandBus>();
builder.Services.AddSingleton<ICommandBus>(sp => sp.GetRequiredService<BoundedCommandBus>());
builder.Services.AddSingleton<InMemoryEventBus>();
builder.Services.AddSingleton<IEventBus>(sp => sp.GetRequiredService<InMemoryEventBus>());
builder.Services.AddHostedService<CommandDispatcherHostedService>();
builder.Services.AddHostedService<EventBusProjectionHostedService>();

builder.Services.AddScoped<IEventStore, SqlEventStore>();
builder.Services.AddScoped<IOrderListReadModel, SqlOrderListReadModel>();
builder.Services.AddScoped<ICommandHandler<PlaceOrder>, PlaceOrderHandler>();

Flow: API → command bus (bounded) → PlaceOrderHandler → event store (append) → event bus (publish) → EventBusProjectionHostedService → read model. For durable event bus, replace InMemoryEventBus with RabbitMQ/Azure Service Bus and a consumer hosted service.


Assigning responsibilities

| Role | Responsibility | Layer |
| --- | --- | --- |
| API / Controller | Receives HTTP request, builds command, sends to bus (or calls handler). Returns 202 or 201. | Presentation |
| Command handler | Receives command, loads aggregate from event store (replay), calls aggregate method (e.g. PlaceOrder), gets new events, appends to event store. Optionally publishes events to event bus. | Application (write) |
| Aggregate | Holds business rules; replays past events to restore state; produces new events when command is applied. No I/O. | Domain |
| Event store | Append events; load events by aggregate ID. | Infrastructure |
| Event publisher | After append, publish events to event bus (if you use one). | Infrastructure |
| Projection | Subscribe to events (from store or event bus); update read model (e.g. SQL table, document). | Application (read) |
| Query API | Reads from read model only. No commands, no event store. | Presentation (read) |

Rule: Write side never reads from the read model for business decisions (it can for validation if needed). Read side never writes to the event store. Commands are assigned to one handler; events can have many projections.


Unit of Work with CQRS: how and why

What is Unit of Work (UoW)? A UoW groups multiple operations into one transaction: either all commit or all roll back.

With CQRS:

  • Command side: The unit of work is typically one command: load aggregate, apply command, append events. That append is a single transaction to the event store. So the “UoW” is one aggregate + one append. You do not usually span multiple aggregates in one transaction in Event Sourcing; you use events and eventual consistency between aggregates.
  • Read side: Projections often update one or more read-model tables. You can wrap a projection’s update in a transaction (UoW) so that one event’s projection either fully applies or not (e.g. one SQL transaction for OrderListProjection updating OrderList).

Why use UoW on the command side? So that appending events is atomic: if you append three events for one aggregate, either all three are stored or none. The event store implementation (e.g. one INSERT per event in a single DB transaction, or a batch append) should be transactional.
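
The all-or-nothing append described above can be sketched with an in-memory store. This is an illustration only: `InMemoryEventStore` is a hypothetical class, and the `expectedVersion` parameter is an assumption added here (a common optimistic concurrency check) that the `IEventStore` interface in this article does not have.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory event store with atomic batch append.
public sealed class InMemoryEventStore
{
    private readonly object _gate = new();
    private readonly Dictionary<Guid, List<object>> _streams = new();

    // Appends all events or none. expectedVersion is the number of events the caller replayed.
    public void Append(Guid aggregateId, int expectedVersion, IReadOnlyList<object> events)
    {
        lock (_gate)
        {
            _streams.TryGetValue(aggregateId, out var stream);
            var currentVersion = stream?.Count ?? 0;

            // Another writer appended since this aggregate was loaded: reject the whole batch.
            if (currentVersion != expectedVersion)
                throw new InvalidOperationException(
                    $"Expected version {expectedVersion}, found {currentVersion}.");

            // Validate every event before mutating anything, so a bad batch stores nothing.
            foreach (var e in events)
                if (e is null)
                    throw new ArgumentException("Events must not be null.", nameof(events));

            if (stream is null)
                _streams[aggregateId] = stream = new List<object>();
            stream.AddRange(events);
        }
    }

    public IReadOnlyList<object> Load(Guid aggregateId)
    {
        lock (_gate)
            return _streams.TryGetValue(aggregateId, out var s) ? s.ToArray() : Array.Empty<object>();
    }
}
```

In a SQL store the lock is replaced by a database transaction, and the version check by a unique key on (AggregateId, Version).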

Why use UoW on the read side? So that one projection handler updates the read model in one transaction—no partial updates if the handler fails mid-way. For multiple projections, each projection can have its own transaction; they are independent (eventual consistency).

CQRS without Event Sourcing: You might have a UoW that wraps repository saves for the write model (e.g. EF Core SaveChanges). Same idea: one transaction per command (or per aggregate).


Persistence and resilience: crashes, queues, outbox, recovery

Server crashes: If the server crashes after accepting a command but before appending to the event store, the command is lost unless you have durable command queuing.

Queuing commands:

  • Durable command queue — Accept the command and persist it to a queue (e.g. Azure Service Bus, RabbitMQ, or a DB table used as a queue). A worker (same process or separate) reads from the queue and runs the command handler. If the server crashes, the command remains in the queue and is processed after restart (or by another instance).
  • At-least-once delivery — The queue redelivers if the worker does not complete. So command handlers must be idempotent (use command ID / idempotency key and reject duplicates).

Outbox pattern: When the command handler appends events and publishes to an event bus, you risk: event stored but publish fails (or vice versa). Transactional outbox: In the same transaction as appending events, write a row to an outbox table (e.g. OutboxEvents). A relay process reads the outbox and publishes to the event bus, then marks the row as published. So: one transaction = event store + outbox; no lost events.
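
A minimal in-memory sketch of the outbox idea follows. The names `StoreWithOutbox`, `OutboxRow`, and `RelayPending` are hypothetical, and the lock stands in for the database transaction that would cover both the Events table and the outbox table in a real system.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical outbox row: one per event, marked once it has been published.
public sealed class OutboxRow
{
    public OutboxRow(object evt) => Event = evt;
    public object Event { get; }
    public bool Published { get; set; }
}

public sealed class StoreWithOutbox
{
    private readonly object _gate = new();
    private readonly List<object> _events = new();
    private readonly List<OutboxRow> _outbox = new();

    // "One transaction": events and their outbox rows are written together.
    public void AppendWithOutbox(IReadOnlyList<object> events)
    {
        lock (_gate)
        {
            _events.AddRange(events);
            _outbox.AddRange(events.Select(e => new OutboxRow(e)));
        }
    }

    // Relay: publish every unpublished row, marking each one only after publish succeeds.
    // If publish throws, the row stays pending and is retried on the next run (at-least-once).
    public int RelayPending(Action<object> publish)
    {
        List<OutboxRow> pending;
        lock (_gate) pending = _outbox.Where(r => !r.Published).ToList();

        var sent = 0;
        foreach (var row in pending)
        {
            publish(row.Event);
            lock (_gate) row.Published = true;
            sent++;
        }
        return sent;
    }
}
```

Because a crash between publishing and marking the row leads to a second publish, consumers of the event bus should treat delivery as at-least-once and handle duplicates.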

Recovery after crash:

  1. Commands in durable queue — Restart workers; they pull from the queue and process. No manual “re-queue” if the queue is external (e.g. Service Bus).
  2. Commands only in memory — Lost. Mitigation: use a durable queue or outbox for commands (e.g. API writes command to DB outbox, worker processes from outbox).
  3. Projections — Replay events from the event store to rebuild read models. Use checkpoints (last processed event position) so you can resume; run during deployment or as a dedicated job.
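
The checkpoint idea for projections can be sketched as below. `CheckpointedProjection` is a hypothetical class, the event log is modelled as a list of (position, type) pairs, and the checkpoint lives in memory; in practice the checkpoint would be stored with the read model, ideally updated in the same transaction.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical projection that counts events by type and remembers how far it has read.
public sealed class CheckpointedProjection
{
    private readonly Dictionary<string, int> _countsByType = new();

    // Position of the last processed event; 0 means nothing has been processed yet.
    public long Checkpoint { get; private set; }

    // Applies only events after the checkpoint and returns how many were applied.
    public int Project(IReadOnlyList<(long Position, string Type)> log)
    {
        var applied = 0;
        foreach (var (position, type) in log)
        {
            if (position <= Checkpoint)
                continue; // already projected before a restart or an earlier run

            _countsByType[type] = CountOf(type) + 1;
            Checkpoint = position;
            applied++;
        }
        return applied;
    }

    public int CountOf(string type) => _countsByType.TryGetValue(type, out var n) ? n : 0;
}
```

Resuming after a crash is then a matter of reading from the stored checkpoint; a full rebuild resets the checkpoint to 0 and clears the read model before replaying.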

Best practice: Persist commands (durable queue or outbox) before processing so that crashes do not lose user intent. Use idempotency and outbox for events so that publishing is reliable.


Best tools and practices

| Tool / area | Purpose | When to use |
| --- | --- | --- |
| MediatR | In-process command/query bus; one handler per command. | Single process; simple CQRS without a message broker. No built-in queue limit; you can wrap in a Channel or bounded queue. |
| NServiceBus | Durable message bus (MSMQ, RabbitMQ, Azure Service Bus). Handles retries, outbox, sagas. | Enterprise; need durable queues, multiple buses, saga/orchestration. |
| MassTransit | Durable bus (RabbitMQ, Azure Service Bus). Supports consumer, saga, outbox. | .NET; need broker-based messaging and outbox. |
| EventStoreDB | Event store (append-only, subscriptions). | Event Sourcing; dedicated event store. |
| Marten | Postgres as event store + document store for read model. | .NET; want event store + projections in one DB. |
| Kafka / Event Hub | Event bus for publishing events to many consumers. | Multiple projections or microservices consuming the same stream. |
Practices: Use correlation ID and command ID on every message. Limit queue depth and define backpressure. Use transactional outbox when publishing events. Version events and use upcasters. Snapshot aggregates when replay becomes slow.


Project structure and class diagram

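OrderES.sln
  src/OrderES.Write/           # Commands, events, aggregate, command handlers, buses
    Abstractions/ICommandBus.cs
    Abstractions/IEventBus.cs
    Abstractions/IEventStore.cs
    Commands/PlaceOrder.cs
    Events/OrderPlaced.cs
    Domain/OrderAggregate.cs
    Handlers/PlaceOrderHandler.cs
    Infrastructure/BoundedCommandBus.cs
    Infrastructure/CommandDispatcherHostedService.cs
    Infrastructure/InMemoryEventBus.cs
  src/OrderES.EventStore/      # Append/load events (e.g. SQL, EventStoreDB)
    SqlEventStore.cs
  src/OrderES.Projections/     # Subscribe to events, update read model
    OrderListProjection.cs
    EventBusProjectionHostedService.cs
  src/OrderES.Read/            # Query API, read model
    Controllers/OrdersReadController.cs
    IOrderListReadModel.cs
  src/OrderES.Api/             # Write API and composition root
    Controllers/OrdersWriteController.cs
    Program.cs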

Class structure (main types): [class diagram not rendered in this text version]

Full code example

End-to-end working code: domain, event store, command handler, presentation (write + read API), and single-bus setup (bounded queue + dispatcher). You can copy and adapt this.

1. Commands and events (domain contracts)

// OrderES.Write/Commands/PlaceOrder.cs
namespace OrderES.Write.Commands;

public record PlaceOrder(Guid OrderId, string CustomerId, IReadOnlyList<OrderLine> Items);

public record OrderLine(Guid ProductId, int Quantity, decimal UnitPrice);

// OrderES.Write/Events/OrderPlaced.cs
using OrderES.Write.Commands; // OrderLine is declared alongside PlaceOrder

namespace OrderES.Write.Events;

public record OrderPlaced(Guid OrderId, string CustomerId, IReadOnlyList<OrderLine> Items, DateTime CreatedAt);

2. Aggregate: replay events, produce new events (_uncommitted)

// OrderES.Write/Domain/OrderAggregate.cs
using OrderES.Write.Commands;
using OrderES.Write.Events;

namespace OrderES.Write.Domain;

public class OrderAggregate
{
    private readonly List<object> _uncommitted = new();

    public Guid Id { get; private set; }
    public string Status { get; private set; } = "None";
    public decimal Total { get; private set; }

    public void Apply(OrderPlaced e)
    {
        Id = e.OrderId;
        Status = "Placed";
        Total = e.Items.Sum(x => x.Quantity * x.UnitPrice);
    }

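    public void Place(PlaceOrder cmd)
    {
        // Business rule: an order can be placed only once; the replayed state decides.
        if (Status != "None")
            throw new InvalidOperationException($"Order {Id} has already been placed.");

        var evt = new OrderPlaced(cmd.OrderId, cmd.CustomerId, cmd.Items, DateTime.UtcNow);
        _uncommitted.Add(evt);
        Apply(evt);
    }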

    public IReadOnlyList<object> GetUncommitted() => _uncommitted.ToList();
    public void ClearUncommitted() => _uncommitted.Clear();
}

3. Event store interface and SQL implementation

// OrderES.Write/Abstractions/IEventStore.cs
namespace OrderES.Write.Abstractions;

public interface IEventStore
{
    Task<IReadOnlyList<object>> LoadAsync(Guid aggregateId, CancellationToken ct = default);
    Task AppendAsync(Guid aggregateId, IEnumerable<object> events, CancellationToken ct = default);
}

// OrderES.EventStore/SqlEventStore.cs
using System.Data;
using System.Text.Json;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Configuration;
using OrderES.Write.Abstractions;

namespace OrderES.EventStore;

public class SqlEventStore : IEventStore
{
    private readonly string _connectionString;

    public SqlEventStore(IConfiguration config)
        => _connectionString = config.GetConnectionString("Events")!;

    public async Task<IReadOnlyList<object>> LoadAsync(Guid aggregateId, CancellationToken ct)
    {
        await using var conn = new SqlConnection(_connectionString);
        await conn.OpenAsync(ct).ConfigureAwait(false);
        var cmd = conn.CreateCommand();
        cmd.CommandText = "SELECT Type, Payload FROM Events WHERE AggregateId = @Id ORDER BY Version";
        cmd.Parameters.Add(new SqlParameter("@Id", aggregateId));
        var list = new List<object>();
        await using var reader = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false);
        while (await reader.ReadAsync(ct).ConfigureAwait(false))
        {
            var typeName = reader.GetString(0);
            var json = reader.GetString(1);
            var type = Type.GetType(typeName)!;
            list.Add(JsonSerializer.Deserialize(json, type)!);
        }
        return list;
    }

    public async Task AppendAsync(Guid aggregateId, IEnumerable<object> events, CancellationToken ct)
    {
        await using var conn = new SqlConnection(_connectionString);
        await conn.OpenAsync(ct).ConfigureAwait(false);
        // One transaction per append: either every event is stored or none is.
        // Disposing the transaction without committing rolls it back.
        using var tx = conn.BeginTransaction();
        var version = await GetNextVersionAsync(conn, tx, aggregateId, ct).ConfigureAwait(false);
        foreach (var e in events)
        {
            var cmd = conn.CreateCommand();
            cmd.Transaction = tx;
            cmd.CommandText = "INSERT INTO Events (AggregateId, Version, Type, Payload) VALUES (@Id, @V, @Type, @Payload)";
            cmd.Parameters.AddWithValue("@Id", aggregateId);
            cmd.Parameters.AddWithValue("@V", version++);
            cmd.Parameters.AddWithValue("@Type", e.GetType().AssemblyQualifiedName);
            cmd.Parameters.AddWithValue("@Payload", JsonSerializer.Serialize(e));
            await cmd.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
        }
        // A concurrent writer that picked the same version fails on the (AggregateId, Version) primary key.
        await tx.CommitAsync(ct).ConfigureAwait(false);
    }

    private static async Task<int> GetNextVersionAsync(SqlConnection conn, SqlTransaction tx, Guid id, CancellationToken ct)
    {
        var cmd = conn.CreateCommand();
        cmd.Transaction = tx; // commands must join the connection's pending transaction
        cmd.CommandText = "SELECT COALESCE(MAX(Version), 0) FROM Events WHERE AggregateId = @Id";
        cmd.Parameters.Add(new SqlParameter("@Id", id));
        var v = await cmd.ExecuteScalarAsync(ct).ConfigureAwait(false);
        return Convert.ToInt32(v) + 1;
    }
}
}

4. Command handler (load aggregate, apply command, append, optional publish)

// OrderES.Write/Handlers/PlaceOrderHandler.cs
namespace OrderES.Write.Handlers;

public class PlaceOrderHandler : ICommandHandler<PlaceOrder>
{
    private readonly IEventStore _store;

    public PlaceOrderHandler(IEventStore store) => _store = store;

    public async Task HandleAsync(PlaceOrder cmd, CancellationToken ct)
    {
        var events = await _store.LoadAsync(cmd.OrderId, ct).ConfigureAwait(false);
        var agg = new OrderAggregate();
        foreach (var e in events) agg.Apply((dynamic)e);
        agg.Place(cmd);
        await _store.AppendAsync(agg.Id, agg.GetUncommitted(), ct).ConfigureAwait(false);
        agg.ClearUncommitted();
    }
}

5. Presentation: write controller (sends to bus) and read controller

// OrderES.Api/Controllers/OrdersWriteController.cs — see "Single bus: full working setup" above for full code.
// Summary: POST builds PlaceOrder, calls _bus.TryEnqueueAsync(command); returns 202 + correlationId or 503 if queue full.
// POST sync: _bus.EnqueueAndWaitAsync(command); returns 201 + orderId.

// OrderES.Read/Controllers/OrdersReadController.cs
[ApiController]
[Route("api/orders")]
public class OrdersReadController : ControllerBase
{
    private readonly IOrderListReadModel _readModel;

    public OrdersReadController(IOrderListReadModel readModel) => _readModel = readModel;

    [HttpGet]
    public async Task<ActionResult<IReadOnlyList<OrderListDto>>> GetAll(CancellationToken ct)
        => Ok(await _readModel.GetAllAsync(ct).ConfigureAwait(false));

    [HttpGet("{id:guid}")]
    public async Task<ActionResult<OrderListDto>> Get(Guid id, CancellationToken ct)
    {
        var order = await _readModel.GetByIdAsync(id, ct).ConfigureAwait(false);
        return order == null ? NotFound() : Ok(order);
    }
}

public record OrderListDto(Guid OrderId, string CustomerId, string Status, DateTime CreatedAt);

public interface IOrderListReadModel
{
    Task InsertAsync(OrderListDto dto, CancellationToken ct = default);
    Task<IReadOnlyList<OrderListDto>> GetAllAsync(CancellationToken ct = default);
    Task<OrderListDto?> GetByIdAsync(Guid id, CancellationToken ct = default);
}
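
To show how a projection could feed this interface, here is a minimal in-memory sketch. InMemoryOrderListReadModel, OrderListProjection, and the OrderPlaced event shape are assumptions made for illustration; OrderListDto and IOrderListReadModel are repeated only so the example compiles on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Usage: project one event, then query the read model.
var readModel = new InMemoryOrderListReadModel();
var projection = new OrderListProjection(readModel);
var orderId = Guid.NewGuid();

await projection.HandleAsync(new OrderPlaced(orderId, "customer-1", DateTime.UtcNow));
var found = await readModel.GetByIdAsync(orderId);
Console.WriteLine(found?.Status);

// Repeated from the article so the sketch is self-contained.
public record OrderListDto(Guid OrderId, string CustomerId, string Status, DateTime CreatedAt);

public interface IOrderListReadModel
{
    Task InsertAsync(OrderListDto dto, CancellationToken ct = default);
    Task<IReadOnlyList<OrderListDto>> GetAllAsync(CancellationToken ct = default);
    Task<OrderListDto?> GetByIdAsync(Guid id, CancellationToken ct = default);
}

// Hypothetical event shape.
public record OrderPlaced(Guid OrderId, string CustomerId, DateTime PlacedAt);

public sealed class InMemoryOrderListReadModel : IOrderListReadModel
{
    private readonly ConcurrentDictionary<Guid, OrderListDto> _rows = new();

    public Task InsertAsync(OrderListDto dto, CancellationToken ct = default)
    {
        // Keyed write: replaying the same event overwrites the row instead of duplicating it.
        _rows[dto.OrderId] = dto;
        return Task.CompletedTask;
    }

    public Task<IReadOnlyList<OrderListDto>> GetAllAsync(CancellationToken ct = default)
        => Task.FromResult<IReadOnlyList<OrderListDto>>(_rows.Values.OrderBy(r => r.CreatedAt).ToList());

    public Task<OrderListDto?> GetByIdAsync(Guid id, CancellationToken ct = default)
        => Task.FromResult<OrderListDto?>(_rows.TryGetValue(id, out var row) ? row : null);
}

// Translates a write-side event into a read-side row.
public sealed class OrderListProjection
{
    private readonly IOrderListReadModel _readModel;

    public OrderListProjection(IOrderListReadModel readModel) => _readModel = readModel;

    public Task HandleAsync(OrderPlaced e, CancellationToken ct = default)
        => _readModel.InsertAsync(new OrderListDto(e.OrderId, e.CustomerId, "Placed", e.PlacedAt), ct);
}
```

Writing by key keeps the projection safe under redelivery, which matters when events arrive at least once.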

6. Program.cs: single bus with bounded queue and presentation

// See "Single bus: full working setup" above for full Program.cs.
// Summary: AddSingleton<BoundedCommandBus>, AddSingleton<ICommandBus>, AddHostedService<CommandDispatcherHostedService>,
// AddScoped<IEventStore, SqlEventStore>, AddScoped<ICommandHandler<PlaceOrder>, PlaceOrderHandler>, AddControllers.

7. Database: Events table (for SQL event store)

CREATE TABLE Events (
    AggregateId UNIQUEIDENTIFIER NOT NULL,
    Version INT NOT NULL,
    Type NVARCHAR(500) NOT NULL,
    Payload NVARCHAR(MAX) NOT NULL,
    PRIMARY KEY (AggregateId, Version)
);

How the flow fits together: The client POSTs to /api/orders with a body such as { "customerId": "...", "items": [...] }. OrdersWriteController builds a PlaceOrder command and calls _bus.TryEnqueueAsync. CommandDispatcherHostedService reads from the bounded channel, resolves PlaceOrderHandler, and calls HandleAsync. The handler loads the aggregate's events, replays them into OrderAggregate, calls Place(cmd) (which produces OrderPlaced), appends the new events to the event store, and clears the uncommitted list. On the read side, GetAll and Get(id) query IOrderListReadModel, which projections keep up to date; in the multiple buses setup, the projection worker subscribes to the event bus and updates the read model.


Enterprise practices and pitfalls

Do:

  • Use idempotency keys (or command ID) and reject duplicates.
  • Use a durable queue or outbox for commands so crashes do not lose them.
  • Use transactional outbox when publishing events to an event bus.
  • Set queue limits and backpressure policy on the command bus.
  • Use correlation ID on commands and events for tracing.
  • Version events and provide upcasters for old events.
  • Snapshot aggregates when replay is slow.

Pitfalls:

  • No idempotency: retries or redelivery apply the same command twice.
  • No durable command queue: a crash loses commands that were already accepted.
  • Publishing events without an outbox: the event is stored but the publish fails, so consumers never see it.
  • No queue limit: a burst of commands exhausts memory or overloads the store.
  • Mixing the read model into the command path: the read model is only eventually consistent, so the write side must not rely on it for consistency decisions.
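
The first pitfall, duplicate application on retry, can be illustrated with a small deduplication guard keyed by command ID. This is a minimal in-memory sketch under assumptions: IdempotentHandler is a hypothetical name, and a durable version would need to persist the processed IDs, ideally together with the event append.

```csharp
using System;
using System.Collections.Concurrent;

// Usage: the same command ID delivered twice is applied once.
var handler = new IdempotentHandler();
var commandId = Guid.NewGuid();

var first = handler.Handle(commandId, () => Console.WriteLine("applied"));
var second = handler.Handle(commandId, () => Console.WriteLine("applied")); // redelivery: skipped
Console.WriteLine($"first={first}, second={second}");

public sealed class IdempotentHandler
{
    // In-memory only; this set is lost on restart, so it illustrates the check, not durability.
    private readonly ConcurrentDictionary<Guid, bool> _processed = new();

    // Returns true if the command was applied, false if it was a duplicate.
    public bool Handle(Guid commandId, Action apply)
    {
        // TryAdd is atomic: only one caller wins for a given command ID.
        if (!_processed.TryAdd(commandId, true)) return false;

        try
        {
            apply();
            return true;
        }
        catch
        {
            // The command did not take effect, so allow a later retry.
            _processed.TryRemove(commandId, out _);
            throw;
        }
    }
}
```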

Summary

Event Sourcing stores events and rebuilds state by replay; CQRS separates write (commands) and read (queries) models—together they give auditability and independent scaling but require idempotency, outbox, and clear bus design. Getting persistence or bus order wrong leads to lost commands or inconsistent read models; using queue limits, transactional outbox, and versioned events keeps systems recoverable. Next, decide whether you need full Event Sourcing + CQRS or CQRS-only, then design one command handler and one projection before scaling out.

  • Event Sourcing = store events; rebuild state by replay. CQRS = separate write (commands) and read (queries) models.
  • CQRS patterns: One handler per command; idempotency; correlation ID; thin API.
  • Single bus: Use queue limits and backpressure (reject, wait, or overflow). Multiple buses: Command bus + event bus; priority lanes or bounded contexts.
  • Responsibilities: API sends command to bus; handler loads aggregate, appends events; projections subscribe to events and update read model; query API reads read model only.
  • Unit of Work: One transaction per append on command side; one transaction per projection update on read side. Use outbox when publishing events.
  • Resilience: Durable command queue; idempotent handlers; transactional outbox; replay projections from event store for recovery.
  • Tools: MediatR (in-process), NServiceBus/MassTransit (durable bus), EventStoreDB/Marten (event store), Kafka/Azure Event Hubs (event bus).

Position & Rationale

I apply Event Sourcing when we need a full audit trail, time travel, or multiple read models from the same history; I avoid it for simple CRUD where a single current state is enough. I use CQRS with or without Event Sourcing when read and write load or shapes justify separate models; I keep the write side free of read-model dependencies for consistency. I prefer a bounded command queue with backpressure (reject, wait, or overflow) over an unbounded in-process queue—I’ve seen memory and overload issues otherwise. I use a transactional outbox when publishing events to an event bus so we never lose events after append. I reject implementing CQRS without idempotency (command ID or idempotency key); retries and at-least-once delivery make it mandatory. I avoid mixing read-model reads into the command path for business decisions; eventual consistency is explicit.


Trade-Offs & Failure Modes

  • What this sacrifices: Event Sourcing adds storage and replay cost; CQRS adds eventual consistency and operational complexity (two models, projections). You give up simple “read your writes” and single-store simplicity.
  • Where it degrades: Unbounded command queues exhaust memory under load. No outbox when publishing to an event bus risks “event stored but not published.” No idempotency leads to duplicate application of commands on retry. Slow replay (no snapshots) blocks recovery and scaling.
  • How it fails when misapplied: Using Event Sourcing for domains that don’t need audit or replay. Using CQRS without clear boundaries so the write side starts reading from the read model for consistency. No durable command queue so crashes lose accepted commands.
  • Early warning signs: “We can’t rebuild the read model”; “commands are applied twice”; “we’re not sure if the event was published”; queue depth growing without backpressure.
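
The slow-replay failure mode above is what snapshots address: load the last snapshot, then replay only the events recorded after it. The sketch below uses a deliberately simple aggregate (a running total) to show the mechanics. StoredEvent, Snapshot, LoadResult, and Loader are hypothetical names, not types from the article.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Usage: 1000 events of amount 1, with a snapshot taken at version 900.
var events = Enumerable.Range(1, 1000).Select(v => new StoredEvent(v, 1)).ToList();
var snapshot = new Snapshot(900, 900);

var full = Loader.Load(events, null);
var fast = Loader.Load(events, snapshot);
Console.WriteLine($"full replay: total {full.Total} from {full.Replayed} events");
Console.WriteLine($"from snapshot: total {fast.Total} from {fast.Replayed} events");

public record StoredEvent(int Version, int Amount);
public record Snapshot(int Version, int Total);
public record LoadResult(int Version, int Total, int Replayed);

public static class Loader
{
    public static LoadResult Load(IEnumerable<StoredEvent> events, Snapshot? snapshot)
    {
        // Start from the snapshot if there is one, otherwise from an empty aggregate.
        var fromVersion = snapshot?.Version ?? 0;
        var version = fromVersion;
        var total = snapshot?.Total ?? 0;
        var replayed = 0;

        // Replay only events newer than the snapshot, in version order.
        foreach (var stored in events.Where(x => x.Version > fromVersion).OrderBy(x => x.Version))
        {
            total += stored.Amount;
            version = stored.Version;
            replayed++;
        }

        return new LoadResult(version, total, replayed);
    }
}
```

Both loads end in the same state; the snapshot only changes how many events are read to get there. A real store would also filter by version in the query rather than in memory.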

What Most Guides Miss

Many tutorials show the happy path: one command, one handler, append events, update read model. They rarely cover durable command queuing (so a crash after accept doesn’t lose the command), the transactional outbox (so the event append and the record of what to publish commit together), or queue limits and backpressure. In practice, in-process unbounded queues blow up under burst load, and “append then publish” without an outbox loses events when publish fails. Another gap is event versioning and upcasters: schemas change, and without versioning and transformation, replay breaks. Guides also underplay the need for idempotency on every command when using at-least-once delivery.
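
As an illustration of the versioning gap, an upcaster can transform an old stored payload into the current shape before deserialization. The sketch below uses System.Text.Json.Nodes. The schema change itself is invented for the example: the "OrderPlaced.v1" and "OrderPlaced.v2" type names, the rename of Customer to CustomerId, and the Currency default are all assumptions.

```csharp
using System;
using System.Text.Json.Nodes;

// Usage: a v1 payload read from the store is upgraded to v2 before it is deserialized.
var v1Payload = "{\"OrderId\":\"42\",\"Customer\":\"acme\"}";
var upcasted = OrderPlacedUpcaster.Upcast("OrderPlaced.v1", v1Payload);
Console.WriteLine(upcasted.Type);
Console.WriteLine(upcasted.Payload);

public static class OrderPlacedUpcaster
{
    // Hypothetical change: v2 renamed "Customer" to "CustomerId" and added "Currency".
    public static (string Type, string Payload) Upcast(string type, string payload)
    {
        if (type != "OrderPlaced.v1") return (type, payload); // nothing to upgrade

        var json = JsonNode.Parse(payload)!.AsObject();

        // Copy the value out as a string, then replace the old property with the new one.
        var customer = json["Customer"]?.GetValue<string>();
        json.Remove("Customer");
        json["CustomerId"] = customer;

        // Assumed default for events written before the field existed.
        json["Currency"] = "USD";

        return ("OrderPlaced.v2", json.ToJsonString());
    }
}
```

Stored events stay untouched; the upgrade happens on read, so the aggregate and projections only need to understand the latest version.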


Decision Framework

  • If you need audit, time travel, or multiple read models from history → Use Event Sourcing; store events, replay for state; use snapshots when replay is slow.
  • If read and write load or shapes differ → Use CQRS; keep write side independent of read model for consistency; use projections for read side.
  • If you use a command bus → Prefer a bounded queue and defined backpressure (reject 503, wait, or overflow); avoid unbounded in-process queues.
  • If you publish events to an event bus → Use a transactional outbox (same transaction as append) and a relay to publish; do not publish directly after append.
  • If you accept commands over the network → Use durable command queue or outbox so crashes do not lose commands; make handlers idempotent.
  • If ownership is split (write vs read) → Assign clearly: API sends command to bus; handler appends events; projections subscribe to events; query API reads read model only.

You can also explore more patterns in the Event-Driven Architecture resource page.

Key Takeaways

  • Event Sourcing and CQRS together: events are the write model; projections build read models; design for idempotency and eventual consistency.
  • Use queue limits and backpressure on the command bus; use durable queue or outbox so accepted commands survive crashes.
  • Use transactional outbox when publishing events; version events and provide upcasters for replay.
  • Revisit “When I Would Use This Again” when choosing between CRUD, CQRS-only, or full Event Sourcing + CQRS.

When I Would Use This Again — and When I Wouldn’t

I would use Event Sourcing again when we need audit, temporal queries, or the ability to add new read models from the same history. I would use CQRS again when read and write load or shapes justify separate models and we can own projections and eventual consistency. I wouldn’t use Event Sourcing for simple CRUD or when there’s no real need for replay or audit—the cost and complexity aren’t justified. I wouldn’t adopt both without durable command queuing and idempotency; otherwise retries and crashes cause duplicate or lost work. If the team cannot own outbox, projections, and replay, I’d choose a simpler write model and add CQRS or eventing only when the pain is clear.


Frequently Asked Questions

What is Event Sourcing?

Event Sourcing stores events (things that happened) instead of current state and rebuilds state by replaying them. This gives an audit trail, time travel, and the ability to build new read models from history.

What is CQRS?

Command Query Responsibility Segregation separates the write model (commands) from the read model (queries). Each side can have its own store and scale independently. It is often used with Event Sourcing (events = write model, projections = read model).

Why use a single bus with a queue limit?

To avoid unbounded memory growth and overload. When the queue is full, apply backpressure: reject (503/429), wait, or overflow to a secondary queue.
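
The reject and wait policies can be sketched with System.Threading.Channels. This is a minimal illustration, not the article's BoundedCommandBus: BackpressureQueue and its method names are hypothetical, and the overflow policy is left out.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Usage: capacity 2 and no consumer, so the third command hits the limit.
var queue = new BackpressureQueue(2);

Console.WriteLine(queue.TryEnqueue("cmd-1")); // accepted
Console.WriteLine(queue.TryEnqueue("cmd-2")); // accepted
Console.WriteLine(queue.TryEnqueue("cmd-3")); // rejected: the API would answer 503 or 429

// Wait policy: give a consumer a short window to free a slot before giving up.
Console.WriteLine(await queue.EnqueueOrTimeoutAsync("cmd-4", TimeSpan.FromMilliseconds(50)));

public sealed class BackpressureQueue
{
    private readonly Channel<string> _channel;

    public BackpressureQueue(int capacity) =>
        _channel = Channel.CreateBounded<string>(new BoundedChannelOptions(capacity)
        {
            // With Wait, TryWrite returns false when full and WriteAsync waits for space.
            FullMode = BoundedChannelFullMode.Wait
        });

    public ChannelReader<string> Reader => _channel.Reader;

    // Reject policy: fail fast when the queue is full.
    public bool TryEnqueue(string command) => _channel.Writer.TryWrite(command);

    // Wait policy: wait up to the timeout for space, then report failure.
    public async Task<bool> EnqueueOrTimeoutAsync(string command, TimeSpan timeout)
    {
        using var cts = new CancellationTokenSource(timeout);
        try
        {
            await _channel.Writer.WriteAsync(command, cts.Token);
            return true;
        }
        catch (OperationCanceledException)
        {
            return false;
        }
    }
}
```

The full mode matters here: with the Drop modes, TryWrite reports success and silently discards an item, which hides the overload from the caller.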

When should you use multiple buses?

When you need a command bus plus an event bus (events are published after they are stored), priority lanes (critical vs bulk), or one bus per bounded context with a shared event bus.

How do you assign responsibilities in CQRS?

API sends command to bus. Handler loads aggregate, applies command, appends events. Aggregate holds business rules and produces events. Projection subscribes to events and updates read model. Query API reads read model only.

How does Unit of Work apply to CQRS?

On the command side, one transaction covers one append to the event store for one aggregate. On the read side, use one transaction per projection update. Use a transactional outbox when publishing events so the store and the publish stay consistent.

The server crashed after accepting a command: how do queued commands survive?

Use a durable command queue (e.g. Azure Service Bus, RabbitMQ, or a DB table). The API persists the command to the queue and a worker processes it. After a crash, commands remain in the queue and are processed on restart (or by another instance). No manual re-queue is needed when the queue is external to the process.

Best tools for CQRS and Event Sourcing in .NET?

In-process: MediatR. Durable bus: NServiceBus, MassTransit. Event store: EventStoreDB, Marten. Event bus: Kafka, Azure Event Hubs. Use an outbox (e.g. MassTransit outbox, NServiceBus outbox) when publishing events.

What is the outbox pattern?

Append events and in the same transaction write a row to an outbox table. A relay process reads the outbox and publishes to the event bus, then marks the row as published. Ensures events are not lost if publish fails.
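
The two halves of the pattern, the atomic write and the relay, can be sketched in memory. Everything here is a stand-in: FakeDatabase uses a lock where a real store would use a database transaction, and OutboxRow, OutboxRelay, and the publish delegate are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Usage: the first relay pass fails (bus down), the second succeeds.
var db = new FakeDatabase();
db.AppendWithOutbox("OrderPlaced:1");
var relay = new OutboxRelay(db);

relay.PublishPending(_ => throw new InvalidOperationException("bus unavailable"));
Console.WriteLine($"pending after failed pass: {db.Outbox.Count(r => !r.Published)}");

var published = new List<string>();
relay.PublishPending(published.Add);
Console.WriteLine($"published after retry: {published.Count}");

public sealed class OutboxRow
{
    public OutboxRow(string payload) => Payload = payload;
    public string Payload { get; }
    public bool Published { get; set; }
}

// Stand-in for one database; the lock plays the role of a transaction.
public sealed class FakeDatabase
{
    private readonly object _tx = new();
    public List<string> Events { get; } = new();
    public List<OutboxRow> Outbox { get; } = new();

    public void AppendWithOutbox(string evt)
    {
        lock (_tx)
        {
            // Both writes commit together, so a stored event always has an outbox row.
            Events.Add(evt);
            Outbox.Add(new OutboxRow(evt));
        }
    }
}

public sealed class OutboxRelay
{
    private readonly FakeDatabase _db;
    public OutboxRelay(FakeDatabase db) => _db = db;

    public void PublishPending(Action<string> publish)
    {
        foreach (var row in _db.Outbox.Where(r => !r.Published).ToList())
        {
            try
            {
                publish(row.Payload);
                row.Published = true; // mark only after the bus accepted the message
            }
            catch (Exception)
            {
                break; // leave the row pending and retry on the next pass, preserving order
            }
        }
    }
}
```

Because a crash between the publish and the mark causes a second publish, delivery is at least once, so consumers still need to be idempotent.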

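How do you rebuild projections after a crash or schema change?

Replay events from the event store into the projection. Store a checkpoint (the last processed position) so a replay can resume where it stopped. Run the rebuild as a deployment step or a dedicated job. Aggregate snapshots speed up aggregate loading on the command side; a projection rebuild still reads the event stream.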
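
A checkpointed catch-up loop can be sketched as follows. The event log is a plain list and the projection is a simple counter; CheckpointedProjector and its members are hypothetical names, and a real projector would persist the checkpoint together with the read-model update.

```csharp
using System;
using System.Collections.Generic;

// Usage: catch up, receive one more event, resume from the checkpoint, then rebuild from zero.
var log = new List<string> { "OrderPlaced", "OrderPlaced", "OrderCancelled" };
var projector = new CheckpointedProjector();

Console.WriteLine($"processed {projector.CatchUp(log)} events");
log.Add("OrderPlaced");
Console.WriteLine($"processed {projector.CatchUp(log)} new events");

projector.Reset(); // e.g. after a read-model schema change
Console.WriteLine($"rebuild processed {projector.CatchUp(log)} events, open orders: {projector.OpenOrders}");

public sealed class CheckpointedProjector
{
    // Position of the next event to process; 0 means nothing has been processed yet.
    public int Checkpoint { get; private set; }
    public int OpenOrders { get; private set; }

    // Applies every event after the checkpoint and returns how many were processed.
    public int CatchUp(IReadOnlyList<string> log)
    {
        var processed = 0;
        for (var position = Checkpoint; position < log.Count; position++)
        {
            Apply(log[position]);
            Checkpoint = position + 1; // advance only after the event was applied
            processed++;
        }
        return processed;
    }

    // Clears the read model and the checkpoint so the next CatchUp replays everything.
    public void Reset()
    {
        Checkpoint = 0;
        OpenOrders = 0;
    }

    private void Apply(string evt)
    {
        if (evt == "OrderPlaced") OpenOrders++;
        else if (evt == "OrderCancelled") OpenOrders--;
    }
}
```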
