Migrer vers le Event Sourcing, partie 6: stocker les événements
An english version of this post is available here.
Vue d’ensemble
Le travail difficile est maintenant terminé. La dernière partie essentielle manquante est celle consistant à stocker les événements. Ce qui est trivial.
Avantages
Journal
A cette étape, nous ne faisons pas grand chose avec le flux d’événements stockés. Malgré tout, il y a déjà une valeur d’affaire substantielle: un journal complet et parfait de tout ce qui s’est passé dans votre application. Nous verrons dans l’étape suivante ce que nous pouvons faire avec.
A l’échelle de votre entreprise cependant, c’est une mine d’or. En effet, cela vous permet une intégration parfaite et « gratuite » pour les autres services / applications de votre organisation. Par « parfaite », je veux dire que peu importe ce que les autres applications ont besoin de votre part, elles le trouveront dans le journal, puisqu’il retrace exactement tout ce qui s’est passé. En s’abonnant au flux d’événements, elles peuvent même réagir à se qui se passe. Par « gratuite », je veux dire que votre équipe n’a rien besoin de rajouter ou modifier dans votre application lorsqu’une nouvelle application veut s’y connecter, ou qu’une application déjà connectée a besoin d’information additionnelle. Les autres applications ont juste besoin de s’abonner au flux, et d’en dériver n’importe quelle information sans aucun changement de votre part. Bye bye les web services ou services REST que vous aviez besoin de maintenir vous même.
Je détaillerais les solutions techniques pour permettre ceci dans le dernier billet de cette série.
Implémentation
Ce qui change
Nous avons ajouté un composant crucial dans l’infrastructure:
Les aggregats
Pour pouvoir gérer les problèmes de concurrence, nous avons ajouté un champ dans les agrégats:
{
Guid Id { get; }
IEnumerable<IEvent> UncommitedEvents { get; }
int Version { get; } // New!
}
L’entrepôt d’événementsr
L’entrepôt d’événements est responsable de stocker et d’indexer les événements.
{
void PersistUncommitedEvents(IAggregateRoot aggregate);
}
L’implémentation est plutôt simple J’ai choisi de stocker les événements dans une table de la base de données. Chaque tuple de la table contient tous les événements emmis par un agrégat lors d’un commit. La version correspond simplement aux nombre de commits sur cet agrégat.
La gestion de la conccurrence est gérée par une contrainte d’unicité sur l’identifiant de l’agrégat et sa version. Si la contrainte est brisée au moment de l’insertion du commit, cela signifie qu’une autre commande vous a devançé pendant que vous traitiez votre logique. J’ai emprunté cette technique à l’entrepôt d’événements CQRS de Jonathan Oliver.
{
try
{
persistenceManager.ExecuteNonQuery(
"INSERT INTO [Events] (Id, aggregate_id, version, data) VALUES (@Id, @AggregateId, @Version, @Data)",
new
{
Id = Guid.NewGuid(),
Version = aggregate.Version + 1,
AggregateId = aggregate.Id,
Data = Serialize(aggregate.UncommitedEvents)
});
}
catch (SqlException se)
{
// Thanks Jonathan Oliver's CQRS Event Store
if (se.Number == UniqueKeyViolation) throw new ConcurrencyException();
throw;
}
}
Comment faut il réagir lorsqu’un problème de concurrence est détecté? Il suffit simplement de réessayer la commande!
Bus de commandes
Le bus est responsable dans mon exemple de réessayer la commande lorsque cela arrive:
{
var handler = container.Build<IHandleCommand<T>>();
var handled = false;
while (handled == false)
{
try
{
handler.Handle(cmd);
// Trigger persistence and concurrency checking.
persistenceManager.Commit();
handled = true;
}
catch (ConcurrencyException)
{
// Hit a concurrency exception, must retry the command.
}
}
}
Persistence manager
La dernière piece du puzzle se situe dans le gestionnaire de persistence. Il faut naturellement rajouter un appel à l’entrepôt au moment du commit:
{
context[TRANSACTION_KEY] = tx;
try
{
foreach (var ar in aggregates)
{
lazyEventStore.PersistUncommitedEvents(ar);
}
}
catch (ConcurrencyException)
{
tx.Rollback();
context[TRANSACTION_KEY] = null;
throw;
}
// At this stage, no concurrency issues, so pass on to the event handlers
foreach (var ar in aggregates)
{
foreach (var evt in ar.UncommitedEvents)
{
eventBus.Publish(evt);
}
}
context[TRANSACTION_KEY] = null;
tx.Commit();
context[AGGREGATE_KEY] = null;
}
Et voilà. Notez que nous n’avons pas eu besoin de toucher au domaine, ou à la couche de présentation. Cela n’a nécessité qu’un changement mineur dans l’infrastructure.
Démarrer l’application exemple
Les sources pour toute la série de billets sont disponible sur GitHub à http://github.com/jletroui/TransitioningToEventSourcing
.
Vous aurez simplement besoin de créer une base de données vide intitulée « DDDPart6″ dans SQLExpress avant de lancer l’application.
Les 3 dernières étapes de cette série étant très simples, l’exemple « partie 6″ couvre en fait les parties 6 à 8.
Liste des billets « migrer vers le Event Sourcing »:
- Partie 1: L’application DDD « légère ».
- Partie 2: Etre CQRS en utilisant des DTOs pour les requêtes.
- Partie 3: Définir les commandes de façon explicite.
- Partie 4: Garder trace des changements.
- Partie 5: Utiliser les événements pour mettre à jour la base de données.
- Partie 6: Stocker les événements.
- Partie 7: Utiliser les événements pour construire un modèle de lecture.
- Partie 8: Supprimer la base relationnelle, devenir « Event Sourced ».
- Partie 9: Passer rapidement sur les points avancés: versionement, stratégies de fusion intelligentes, consistance décalée, montée en charge coté requête, montée en charge coté transactionnel.
Trackbacks & Pingbacks
- Transitioning your DDD “light” application to CQRS and Event Sourcing | Julien's blog
- Transitioning to Event Sourcing, part 1: the DDD “light” application | Julien's blog
- Transitioning to Event Sourcing, part 3: commands | Julien's blog
- Transitioning to Event Sourcing, part 4: track state changes | Julien's blog
- Transitioning to Event Sourcing, part 2: go CQRS with DTOs | Julien's blog
- Transitioning to Event Sourcing, part 5: use events for updating your domain database | Julien's blog


I believe System.Transactions will roll back the transaction if there’s a unique constraint exception from the database, causing your ‘Rollback’ to throw InvalidOperationException…
To be clear: my comment applies in the case that you’re using System.Transactions, not SqlTransaction, but would apply with an inner SqlTransaction within a TransactionScope.