Transitioning to Event Sourcing, part 6: store events

by Julien on November 6th, 2011

A French version of this post is available here.

Overview

All the hard work is behind us now. The most crucial piece still missing before going event sourced is actually storing the events, which is quite trivial.

Benefits

Log

At this stage, we are not doing much with the events, but storing them already gives your business great value: a complete, perfect log of what happened in the application. We’ll see in part 7 that you can derive all sorts of valuable information from that event stream.

It also provides an invaluable asset: perfect, automatic integration with any other application in your enterprise. By “perfect”, I mean that whatever the client application needs from yours, everything is in the event log. By listening to this log, the other application can extract whatever information it needs, and also react to whatever happens. By “automatic”, I mean your team does not need to do anything special to accommodate a new integrating application. The integrating application just subscribes to your event stream and extracts whatever it needs by itself. Your team does not have to add features to a web service or a REST API to give access to something the other team would need.

I will explain in more detail how you can achieve that from the event log in the last post of this series.
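In the meantime, here is a minimal sketch of the idea, not the sample's actual code. The `CustomerRenamed` event, the `CustomerNameSubscriber` class and the in-memory list standing in for the event log are all hypothetical names made up for illustration. The point is only that the integrating application decides by itself what to keep and what to ignore.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event type; the real events come from your own domain.
public class CustomerRenamed
{
    public Guid CustomerId;
    public string NewName;
}

// Sketch of an integrating application: it remembers how far it has read
// in the log and keeps only the data it cares about.
public class CustomerNameSubscriber
{
    private int position; // index of the next event to read from the log

    public readonly Dictionary<Guid, string> Names = new Dictionary<Guid, string>();

    public void CatchUp(IReadOnlyList<object> eventLog)
    {
        for (; position < eventLog.Count; position++)
        {
            // Events this application does not care about are simply skipped.
            if (eventLog[position] is CustomerRenamed renamed)
            {
                Names[renamed.CustomerId] = renamed.NewName;
            }
        }
    }
}
```

Calling `CatchUp` again after new events were appended only processes the new ones, since the subscriber keeps its own position in the log.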

Implementation

What changed

We’ve added a crucial new component to the infrastructure: the event store.

The aggregates

In order to deal with concurrency, we have added a small property to the aggregates, a version number:

public interface IAggregateRoot
{
    Guid Id { get; }
    IEnumerable<IEvent> UncommitedEvents { get; }
    int Version { get; } // New!
}

The event store

The event store is the component that will be responsible for storing events:

public interface IEventStore
{
    void PersistUncommitedEvents(IAggregateRoot aggregate);
}

The implementation is pretty straightforward. I chose to store the events in an “Events” table in SQL. Each row will contain all the events emitted during one commit for a given aggregate. Concurrency conflicts are detected through a unique constraint on the aggregate id and the version. If you violate this constraint when committing, it means that another command raced you while you were processing this one. I borrowed this technique from Jonathan Oliver’s Event Store.

public void PersistUncommitedEvents(IAggregateRoot aggregate)
{
    try
    {
        persistenceManager.ExecuteNonQuery(
            "INSERT INTO [Events] (Id, aggregate_id, version, data) VALUES (@Id, @AggregateId, @Version, @Data)",
            new
            {
                Id = Guid.NewGuid(),
                Version = aggregate.Version + 1,
                AggregateId = aggregate.Id,
                Data = Serialize(aggregate.UncommitedEvents)
            });
    }
    catch (SqlException se)
    {
        // Thanks Jonathan Oliver's CQRS Event Store
        if (se.Number == UniqueKeyViolation) throw new ConcurrencyException();
        throw;
    }
}
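To see why the unique constraint is enough to detect a race, here is a small in-memory sketch. It is an illustration only, not the sample's code: `InMemoryEventStore` and its `Persist` method are hypothetical, and a `HashSet` plays the role of the unique constraint on the aggregate id and the version.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the exception thrown by the real event store.
public class ConcurrencyException : Exception { }

// In-memory sketch: the HashSet plays the role of the unique constraint
// on (aggregate_id, version) in the "Events" table.
public class InMemoryEventStore
{
    private readonly HashSet<(Guid AggregateId, int Version)> committed =
        new HashSet<(Guid AggregateId, int Version)>();
    private readonly object sync = new object();

    // loadedVersion is the version the aggregate had when the command loaded it.
    public void Persist(Guid aggregateId, int loadedVersion)
    {
        lock (sync)
        {
            // HashSet.Add returns false when the key already exists,
            // which mirrors a unique key violation in SQL.
            if (!committed.Add((aggregateId, loadedVersion + 1)))
            {
                throw new ConcurrencyException();
            }
        }
    }
}
```

Two commands that both loaded the aggregate at version 0 will both try to write version 1. The first one succeeds, and the second one gets the `ConcurrencyException`.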

So what do you do when you detect a concurrency issue? Simple: you just need to retry your command!

Command bus

The command bus is responsible for retrying a failed attempt when such a concurrency conflict occurs. It reloads the aggregate and starts over:

public void Send<T>(T cmd) where T : ICommand
{
    var handler = container.Build<IHandleCommand<T>>();
    var handled = false;

    while (handled == false)
    {
        try
        {
            handler.Handle(cmd);
            // Trigger persistence and concurrency checking.
            persistenceManager.Commit();
            handled = true;
        }
        catch (ConcurrencyException)
        {
            // Hit a concurrency exception, must retry the command.
        }
    }
}
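The loop above retries until the command goes through. If you would rather give up after a few attempts, a bounded variant could look like the sketch below. This is an assumption on my side, not part of the sample: the `Retry` helper and its `maxAttempts` parameter are hypothetical, and `ConcurrencyException` is redeclared only to keep the snippet self-contained.

```csharp
using System;

// Redeclared here only so the sketch compiles on its own.
public class ConcurrencyException : Exception { }

public static class Retry
{
    // Runs 'attempt' until it succeeds or maxAttempts is reached.
    // Returns the number of attempts that were needed.
    public static int OnConcurrencyConflict(Action attempt, int maxAttempts)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (var tries = 1; ; tries++)
        {
            try
            {
                attempt();
                return tries;
            }
            catch (ConcurrencyException) when (tries < maxAttempts)
            {
                // Another command won the race: loop and run the attempt again.
            }
        }
    }
}
```

On the last allowed attempt the exception filter is false, so the `ConcurrencyException` propagates to the caller instead of being swallowed.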

Persistence manager

The final bit is calling the event store while committing in the persistence manager:

using (var tx = connection.BeginTransaction(IsolationLevel.ReadCommitted))
{
    context[TRANSACTION_KEY] = tx;

    try
    {
        foreach (var ar in aggregates)
        {
            lazyEventStore.PersistUncommitedEvents(ar);
        }
    }
    catch (ConcurrencyException)
    {
        tx.Rollback();
        context[TRANSACTION_KEY] = null;
        throw;
    }

    // At this stage, no concurrency issues, so pass on to the event handlers
    foreach (var ar in aggregates)
    {
        foreach (var evt in ar.UncommitedEvents)
        {
            eventBus.Publish(evt);
        }
    }

    context[TRANSACTION_KEY] = null;
    tx.Commit();

    context[AGGREGATE_KEY] = null;
}

And that is it. We don’t need to change anything in the domain or presentation layers; this was purely an infrastructure change.

Running the sample

The sources for the entire series are available on GitHub at http://github.com/jletroui/TransitioningToEventSourcing.

To run this sample, simply create a new “DDDPart6” database in SQLExpress before launching it.
Since the last three steps of this series are very small and compose well together, the single “part 6” sample actually covers parts 6 to 8.


8 Comments
  1. I believe System.Transactions will roll back the transaction if there’s a unique constraint exception from the database, causing your ‘Rollback’ to throw InvalidOperationException…

  2. To be clear: my comment applies in the case that you’re using System.Transactions, not SqlTransaction, but would apply with an inner SqlTransaction within a TransactionScope.
