Skip to content

Migrer vers le Event Sourcing, partie 6: stocker les événements

by Julien on novembre 6th, 2011

An english version of this post is available here.

Vue d’ensemble

Le travail difficile est maintenant terminé. La dernière partie essentielle manquante est celle consistant à stocker les événements. Ce qui est trivial.

Avantages

Journal

A cette étape, nous ne faisons pas grand chose avec le flux d’événements stockés. Malgré tout, il y a déjà une valeur d’affaire substantielle: un journal complet et parfait de tout ce qui s’est passé dans votre application. Nous verrons dans l’étape suivante ce que nous pouvons faire avec.

A l’échelle de votre entreprise cependant, c’est une mine d’or. En effet, cela vous permet une intégration parfaite et “gratuite” pour les autres services / applications de votre organisation. Par “parfaite”, je veux dire que peu importe ce que les autres applications ont besoin de votre part, elles le trouveront dans le journal, puisqu’il retrace exactement tout ce qui s’est passé. En s’abonnant au flux d’événements, elles peuvent même réagir à se qui se passe. Par “gratuite”, je veux dire que votre équipe n’a rien besoin de rajouter ou modifier dans votre application lorsqu’une nouvelle application veut s’y connecter, ou qu’une application déjà connectée a besoin d’information additionnelle. Les autres applications ont juste besoin de s’abonner au flux, et d’en dériver n’importe quelle information sans aucun changement de votre part. Bye bye les web services ou services REST que vous aviez besoin de maintenir vous même.

Je détaillerais les solutions techniques pour permettre ceci dans le dernier billet de cette série.

Implémentation

Ce qui change

Nous avons ajouté un composant crucial dans l’infrastructure:

Les aggregats

Pour pouvoir gérer les problèmes de concurrence, nous avons ajouté un champ dans les agrégats:

public interface IAggregateRoot
{
    Guid Id { get; }
    IEnumerable<IEvent> UncommitedEvents { get; }
    int Version { get; } // New!
}

L’entrepôt d’événementsr

L’entrepôt d’événements est responsable de stocker et d’indexer les événements.

public interface IEventStore
{
    void PersistUncommitedEvents(IAggregateRoot aggregate);
}

L’implémentation est plutôt simple J’ai choisi de stocker les événements dans une table de la base de données. Chaque tuple de la table contient tous les événements emmis par un agrégat lors d’un commit. La version correspond simplement aux nombre de commits sur cet agrégat.

La gestion de la conccurrence est gérée par une contrainte d’unicité sur l’identifiant de l’agrégat et sa version. Si la contrainte est brisée au moment de l’insertion du commit, cela signifie qu’une autre commande vous a devançé pendant que vous traitiez votre logique. J’ai emprunté cette technique à l’entrepôt d’événements CQRS de Jonathan Oliver.

public void PersistUncommitedEvents(IAggregateRoot aggregate)
{
    try
    {
        persistenceManager.ExecuteNonQuery(
            "INSERT INTO [Events] (Id, aggregate_id, version, data) VALUES (@Id, @AggregateId, @Version, @Data)",
            new
            {
                Id = Guid.NewGuid(),
                Version = aggregate.Version + 1,
                AggregateId = aggregate.Id,
                Data = Serialize(aggregate.UncommitedEvents)
            });
    }
    catch (SqlException se)
    {
        // Thanks Jonathan Oliver's CQRS Event Store
        if (se.Number == UniqueKeyViolation) throw new ConcurrencyException();
        throw;
    }
}

Comment faut il réagir lorsqu’un problème de concurrence est détecté? Il suffit simplement de réessayer la commande!

Bus de commandes

Le bus est responsable dans mon exemple de réessayer la commande lorsque cela arrive:

public void Send<T>(T cmd) where T : ICommand
{
    var handler = container.Build<IHandleCommand<T>>();
    var handled = false;

    while (handled == false)
    {
        try
        {
            handler.Handle(cmd);
            // Trigger persistence and concurrency checking.
            persistenceManager.Commit();
            handled = true;
        }
        catch (ConcurrencyException)
        {
            // Hit a concurrency exception, must retry the command.
        }
    }
}

Persistence manager

La dernière piece du puzzle se situe dans le gestionnaire de persistence. Il faut naturellement rajouter un appel à l’entrepôt au moment du commit:

using (var tx = connection.BeginTransaction(IsolationLevel.ReadCommitted))
{
    context[TRANSACTION_KEY] = tx;

    try
    {
        foreach (var ar in aggregates)
        {
            lazyEventStore.PersistUncommitedEvents(ar);
        }
    }
    catch (ConcurrencyException)
    {
        tx.Rollback();
        context[TRANSACTION_KEY] = null;
        throw;
    }

    // At this stage, no concurrency issues, so pass on to the event handlers
    foreach (var ar in aggregates)
    {
        foreach (var evt in ar.UncommitedEvents)
        {
            eventBus.Publish(evt);
        }
    }

    context[TRANSACTION_KEY] = null;
    tx.Commit();

    context[AGGREGATE_KEY] = null;
}

Et voilà. Notez que nous n’avons pas eu besoin de toucher au domaine, ou à la couche de présentation. Cela n’a nécessité qu’un changement mineur dans l’infrastructure.

Démarrer l’application exemple

Les sources pour toute la série de billets sont disponible sur GitHub à http://github.com/jletroui/TransitioningToEventSourcing
.

Vous aurez simplement besoin de créer une base de données vide intitulée “DDDPart6″ dans SQLExpress avant de lancer l’application.
Les 3 dernières étapes de cette série étant très simples, l’exemple “partie 6″ couvre en fait les parties 6 à 8.

Liste des billets “migrer vers le Event Sourcing”:

From → Event Sourcing

8 Comments
  1. I believe System.Transactions will roll back the transaction if there’s a unique constraint exception from the database, causing your ‘Rollback’ to throw InvalidOperationException…

  2. To be clear: my comment applies in the case that you’re using System.Transactions, not SqlTransaction, but would apply with an inner SqlTransaction within a TransactionScope.

Trackbacks & Pingbacks

  1. Transitioning your DDD “light” application to CQRS and Event Sourcing | Julien's blog
  2. Transitioning to Event Sourcing, part 1: the DDD “light” application | Julien's blog
  3. Transitioning to Event Sourcing, part 3: commands | Julien's blog
  4. Transitioning to Event Sourcing, part 4: track state changes | Julien's blog
  5. Transitioning to Event Sourcing, part 2: go CQRS with DTOs | Julien's blog
  6. Transitioning to Event Sourcing, part 5: use events for updating your domain database | Julien's blog

Leave a Reply

Note: XHTML is allowed. Your email address will never be published.

Subscribe to this comment feed via RSS