Skip to content
août 23 14

Tester un système Apache Hive

by Julien

An english version of this post is available here.

Vue d’ensemble

Tester un système d’intelligence d’affaire basé sur Hive demande quelques efforts. Certains projets peuvent se contenter de Hive en mode embarqué. Mais parfois, cela ne suffit pas. Par exemple, lorsque le système comprends des outils externes à la JVM, comme une suite d’intelligence d’affaire.

Ce billet explique la solution que nous avons fini par adopter chez Triton Digital.

Client Hadoop

Pour tester le lancement des tâches mapred, la première chose à mettre en place est un client Hadoop. En effet, Hive appelle directement le script “hadoop” de la distribution Hadoop.

Un peu de travail dans SBT permet de se passer d’une distribution complète. Commençons par les jars nécessaires:

import java.io.PrintWriter

resolvers += "Cloudera repo" at "https://repository.cloudera.com/artifactory/cloudera-repos"

ivyConfigurations += config("hadoop")

libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-client"   % "2.3.0-mr1-cdh5.0.0"  % "hadoop",
  "org.apache.hive"   % "hive-exec"       % "0.12.0-cdh5.0.0"     % "hadoop"
)

val dumpHadoopClasspath = TaskKey[Unit]("dump-hadoop-classpath", "Dumps hadoop classpath in a file")

dumpHadoopClasspath := {
  val printer = new PrintWriter("hadoop.classpath")
  printer.print(update.value.select(configurationFilter("hadoop")).map(_.getCanonicalPath).mkString(":"))
  printer.close()
}

test in Test <<= (test in Test) dependsOn dumpHadoopClasspath

Comme nous n’avons pas besoin de ces jars dans les classes du projet, nous plaçons les dépendances dans une configuration ivy spéciale nommée “hadoop”.

Nous nous assurons également de construire le classpath résultant dans un fichier pour qu’il puisse être utilisé dans notre version personnalisée du script “hadoop”.

Script “hadoop”

Nous partons du script régulier que l’on peut trouver dans n’importe quelle distribution tarball d’Hadoop. Nous modifions simplement le début pour y utiliser notre classpath construit plus haut:

# This one is commented since we manually 'take over' the config below
#. "$bin"/hadoop-config.sh

export HADOOP_HOME=$(pwd)
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
HADOOP_USER_CLASSPATH_FIRST=true
HADOOP_CLASSPATH=$(cat hadoop.classpath)

A partir de là, nous devrions avoir une version du client Hadoop qui fonctionne:

> sbt update
[info] Loading global plugins from /Users/jletrouit/.sbt/0.13/plugins
[info] Loading project definition from /Users/jletrouit/Documents/perso/hivetest/project
[info] Set current project to hivetest (in build file:/Users/jletrouit/Documents/perso/hivetest/)
[info] Updating {file:/Users/jletrouit/Documents/perso/hivetest/}hivetest...
[info] Resolving org.apache.velocity#velocity;1.7 ...
[info] Done updating.
[success] Total time: 9 s, completed 1-Jun-2014 3:30:09 PM

> ./hadoop version
Hadoop 2.3.0-cdh5.0.0
Subversion git://github.sf.cloudera.com/CDH/cdh.git -r 8e266e052e423af592871e2dfe09d54c03f6a0e8
Compiled by jenkins on 2014-03-28T04:29Z
Compiled with protoc 2.5.0
From source with checksum fae92214f92a3313887764456097e0

Serveur Hive de test

La prochaine étape est un peu plus lourde. Pour démarrer une instance Hive de test, nous avons besoin d’un certain nombre de dépendances:

libraryDependencies ++= Seq(
  "org.apache.hive"   % "hive-jdbc"     % "0.12.0-cdh5.0.0" excludeAll(
    ExclusionRule(organization = "junit"),
    ExclusionRule(organization = "org.apache.avro")
  ),
  "org.apache.hadoop" % "hadoop-common"   % "2.3.0-cdh5.0.0",
  "org.apache.hadoop" % "hadoop-hdfs"     % "2.3.0-cdh5.0.0",
  "org.apache.hadoop" % "hadoop-common"   % "2.3.0-cdh5.0.0"      % "test" classifier("tests"),
  "org.apache.hadoop" % "hadoop-hdfs"     % "2.3.0-cdh5.0.0"      % "test" classifier("tests"),
  "org.apache.hadoop" % "hadoop-test"     % "2.3.0-mr1-cdh5.0.0"  % "test" exclude("net.java.dev.jets3t", "jets3t"),
  "org.apache.hadoop" % "hadoop-auth"     % "2.3.0-cdh5.0.0"      % "test",
  "org.specs2"        %% "specs2"         % "2.3.12"              % "test"
)

Ces dépendances nous permettent de démarrer un mini HDFS et un mini cluster MapReduce:

  val currentDir = new File(".").getCanonicalPath()
  val conf = new Configuration()

  // This is making sure we are not picking up locally installed hadoop libraries and stay isolated
  System.setProperty("java.library.path","")
  // We could use a temporary directory, but those logs can be useful for debugging a test failing
  System.setProperty("hadoop.log.dir", "logs/hadoop") // MAPREDUCE-2785

  conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, createTempDirectory("dfs_base"))
  conf.set("hadoop.home.dir", currentDir)
  conf.set("dfs.permissions", "false")
  conf.set("hadoop.security.authorization", "false")

  val miniDFS = new MiniDFSCluster.Builder(conf).build()

  val miniMR = new MiniMRCluster(
    1,      // numTaskTrackers
    miniDFS.getFileSystem().getUri().toString(),
    1,      // numTaskTrackerDirectories
    null,   // racks
    null,   // hosts
    new JobConf(conf))

  // Save those for later
  val jt = miniMR.createJobConf(new JobConf(conf)).get("mapred.job.tracker")
  val warehouseDir = "file" + File.pathSeparator + createTempDirectory("hive_warehouse")

Et enfin un serveur Hive:

  val hiveConf = new HiveConf(getClass())

  hiveConf.set(HiveConf.ConfVars.HADOOPJT.varname, jt)
  hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouseDir)
  // Hive still need to use a hadoop command line tool. This one bundled with the project is pointing to the
  // minimal hadoop client jars we are downloading through SBT in the "hadoop" ivy config.
  hiveConf.set(HiveConf.ConfVars.HADOOPBIN.varname, s"$currentDir/hadoop")

  val server = new HiveServer2()
  server.init(hiveConf)
  server.start()

Cependant, nous avons besoin de certains ajustements supplémentaires.

Transferrer la configuration Hive à la session JDBC

La configuration Hive n’est pas transferrée à la session JDBC du serveur. A la place, une nouvelle instance est créée. Dans une distribution normale, ce n’est pas un problème, la configuration fraiche étant chargée du fichier hive-site.xml. Dans notre cas, nous créons notre configuration de manière programmatique, en particulier afin d’assigner les ports et les adresses de notre namenode et tasktracker. Un petit hack permet de réinjecter cette configuration dans la session JDBC:

val hiveConf = new HiveConf(getClass())
configureHive(hiveConf)
hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_SESSION_HOOK.varname, classOf[TestHiveSessionHook].getCanonicalName())

val server = new HiveServer2()
server.init(hiveConf)
server.start()

def configureHive(conf: HiveConf) {
  conf.set(HiveConf.ConfVars.HADOOPJT.varname, jt)
  conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouseDir)
  conf.set(HiveConf.ConfVars.HADOOPBIN.varname, s"$currentDir/hadoop")
}

class TestHiveSessionHook extends HiveSessionHook {
  def run(ctx: HiveSessionHookContext) {
    TestHiveServer2.configureHive(ctx.getSessionConf)
  }
}

Faire taire les timeouts JDBC

D’ordinaire, les différents composants résident dans des process différents. Mais dans notre cas ou tout réside dans la même JVM, les timeouts des composants satellites de Hive se marchent sur les pieds, puisque le timeout dans le DriverManager de JDBC est un champ statique partagé. Nous nous assurons qu’il reste à une valeur convenable pour que nos requêtes de test aient le temps de se compléter:

conf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT.varname, "1200")
conf.set(HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT.varname, "1200")

Finalement, le test

Nous pouvons maintenant écrire un test créant une véritable connection JDBC:

using(TestHiveServer2.createConnection) { con =>

  val smt = con.createStatement()

  smt.executeUpdate("create table ngram (ngram string, year int, occurences int, documents int) row format delimited fields terminated by '\\t' stored as textfile")
  smt.executeUpdate(s"load data local inpath '$dir/googlebooks-eng-all-1gram-20120701-z-sample.tsv' overwrite into table ngram")

  using(smt.executeQuery("select ngram, SUM(documents) as total_documents from ngram group by ngram")) { rs =>
    List(
      {rs.next(); rs.getString(1)} -> rs.getInt(2),
      {rs.next(); rs.getString(1)} -> rs.getInt(2)
    ) mustEqual List(
      "zenith" -> 426197,
      "zooplankton" -> 24939
    )
  }
}

Contrairement au mode embarqué, vous pouvez faire une pause au milieu de votre test (par exemple avec StdIn.readLine()), et inspecter votre base de donnée Hive avec le client beeline, en vous connectant à “jdbc:hive2://localhost”.

Code source

Vous trouverez un exemple complet de cette technique sur GitHub: https://github.com/jletroui/hivetest

Bon tests!

nov. 14 11

Migrer vers le Event Sourcing, partie 9: outils et patrons additionnels

by Julien

An english version of this post is available here.

Je vais essayer de passer brièvement sur quelques possibilités que les systèmes centrés sur les événements peuvent offrir. Chaque sujet mériterait probablement un billet complet, ou même un livre. Si vous souhaitez plus d’informations, il y a beaucoup de matériel disponible dans le groupe d’usager DDD / CQRS. Greg Young va également prochainement sortir un ouvrage sur ces thèmes.

Clichés

Évidemment, le event sourcing pur va rencontrer rapidement des limitations quand les agrégats vont avoir plusieurs milliers d’événements. Vous allez donc probablement devoir implémenter un système de clichés. L’idée est de demander à l’agrégat de nous fournir un cliché de son état, et de stocker ce cliché. Lorsque l’on charge un agrégat dans l’application, nous demandons alors simplement le dernier cliché ainsi que les événements publiés depuis que ce cliché ai été stocké. Les clichés peuvent simplement être stocker dans une table comprenant l’identifiant de l’agrégat, sa version actuelle, le cliché courant et la version du cliché.

Vous devriez implémenter la génération des clichés dans un système dédié. En effet, c’est une optimisation qui n’est en rien la responsabilité de la couche d’affaire. Le processus créant les clichés a simplement besoin de faire une requête sur la table des clichés pour tous les agrégats qui n’ont pas eu de cliché depuis un certain nombre de commits. Il suffit alors de charger les dits agrégats, puis de leur demander un cliché qui servira en retour à mettre à jour la même table.

Versionnement

Qu’arrive-t-il lorsque vous avez besoin de modifier les informations portées par un type d’événement suite à l’évolution de votre modèle d’affaire?

Vous pouvez tout simplement créer une nouvelle version de cet événement (par exemple: CustomerAddressCorrected_v2) et modifier l’agrégat pour ne consommer que ce dernier événement. Il n’est pas nécessaire de mettre à jour tous les événements dans votre dépôt d’événements. Par contre, vous pouvez insérer un convertisseur aprés la requête au dépôt de façon à ce que les agrégats ne recoivent que les dernière versions des types d’événements dans leur constructeur.
Comment convertir un événement d’un ancien type vers sa nouvelle version? Prenons l’exemple d’un champ qui aurait été ajouté. Vous allez devoir prendre la même décision que lorsque vous rajoutez une colonne dans une table de base de données. Il va vous falloir trouver une valeur par défaut.
Bien sûr, il va y avoir des situations ou la conversion sera impossible. Selon Greg Young, ce genre de cas indique souvent que le nouveau type d’événement n’est pas une nouvelle version de cet événement mais bien un nouveau type d’événement. Dans ce cas, la conversion n’a pas de sens.

Rinat Abdulin couvre (en anglais) quelques exemples et donne plus de détails dans sa page sur le versionnement.

Modèle d’affichage / vues

Puisque nous ne sommes plus contraints par le modèle transactionnel pour nos vues, nous avons la liberté d’utiliser des systèmes plus efficaces. En particulier, pourquoi ne pas stocker directement une version sérialisée de la vue dans la base de données? Bien sûr, on pourrait utiliser une colonne de type blob pour ce faire, ainsi que quelque colonnes servant à indexer chaque ligne pour les recherches. Mais on peut aussi utiliser une base de donnée documentaire qui existe précisément pour ça. En .NET, je vous recommanderais RavenDb. Sur les autres plate-formes, il y a beaucoup de choix, comme par exemple MongoDb. Quelque unes de ces solution produisent même directement du JSON, que vous pouvez donc transférer, sans traitement de votre part, à votre application AJAX.

Pour ce sujet encore, Rinat Abdulin a publié une lecture intéressante.

Fusion intelligente de commits

Jusqu’à présent, notre gestion de concurrence fut assez faible: nous réessayons une commande lorsque nous détectons un conflit. C’est suffisant pour la majorité des applications. Mais si nécessaire, nous pouvons tirer partie du fait que les événements renferment énormément d’informations contextuelles sur ce qui s’est passé. Lorsque nous rencontrons un conflit, nous pouvons charger les événements commités dans intervalle du traitement de la commande en court, et les examiner. Nous pouvons avoir des règles qui pour chaque paire d’événements commité / à commiter indique s’il y a un réel conflit. Le plus souvent, il n’y a pas de conflit si les événements sont de type différent. Ce peut être la règle par défaut. Mais des règles plus fines peuvent être appliquées au cas par cas. Par exemple, si 2 changements de nom changent pour le même nom, alors il n’y a pas de conflit.

Cette stratégie de fusion est particulièrement utile dans les applications fortement collaboratives. Mais elle ouvre également d’autre possibilités dans le cas des applications déconnectées. Certains clients d’applications ne peuvent être connectés à leur serveur. C’est le cas par exemple dans les domaines de la prospection minière, ou encore pour la police. Dans ces cas, le client peut embarquer une copie du dépôt d’événement et fonctionner à partir de cette copie. Lorsque le client se reconnecte au serveur, tous les commits sont fusionnés. Seuls les événements rentrant en conflit devront être fusionnés à la main par l’utilisateur. C’est d’ailleurs la technique utilisée par la plupart des systèmes relationnels de base de données à travers leur journal des transactions.

Intégration

C’est un sujet énorme. L’intégration de systèmes intra- et inter- entreprise est habituellement un casse tête qui réclame beaucoup d’argent. Il y a beaucoup de facteurs en jeux. L’un d’eux est le fait que les applications ne sont pas toujours conçues pour être intégrées. Heureusement, cela change avec la popularisation de SOA. Un autre problème est le fait que la plupart des technologies d’intégration sur le marché sont synchrone: REST, Web Services, CORBA, RMI, WCF, etc… C’est un problème car les communications synchrones ne peuvent évoluer en charge. Imaginez un service qui a besoin d’appeler un service qui a besoin d’appeler un 3e service, qui… Le délai des appels réseau augmente rapidement. Pire, plus vous ajoutez de composants à la chaîne, plus les risques de rupture d’une transaction augmentent. Et je ne parles même pas du cauchemar que représente la maintenance d’une telle toile d’araignée.

Event sourcing offre une approche propre à de nombreux de ces problèmes. Combiné avec de la publication /abonnement, les applications deviennent déconnectées dans le temps, et découplées dans leur responsabilités. Si d’autres applications souhaitent s’intégrer à la votre, elles ont simplement besoin de s’abonner à votre flux d’événements. Vous pouvez pour ce faire publier votre flux dans un bus de service qui va router vos événements automatiquement à tous les abonnés. Votre application n’a donc même plus besoin de savoir qui est abonné. Et de nouvelles applications peuvent s’abonner sans même que vous ayez besoin de le savoir. Ces applications peuvent alors extraire toute l’information dont elle ont besoin à partir du flux de vos événements.

Cela résous le problème de la faillibilité d’un composant de la chaîne. En effet, ce genre d’intégration oblige les applications à supporter des workflows asynchrone. Si une application tombe, les messages vont simplement s’accumuler dans le bus de service jusqu’à ce qu’elle revienne en ligne et traite les messages. De même, si vous choisissez attentivement votre format de sérialisation, les problèmes de maintenabilité sont fortement réduit. En effet, votre application peut emmètre de nouveaux types de messages sans impacter les applications abonnées. Ces dernières seront responsable de mettre à jour leur logique d’intégration si elles souhaitent profiter des dernières versions des événements.

Cohérence retardée (eventual consistency)

On parle de cohérence retardée lorsqu’il y a un délai entre un commit d’un agrégat et la mise à jour des vues. Ou plus précisément, lorsque ces 2 processus utilisent des transactions différentes. Celà veut dire que vous ne commitez que les événements résultant d’une commande dans le dépôt d’événements. Un processus différent est responsable de propager ce nouveau commit aux gestionnaires de vues. Si vous avez déjà un bus de service, vous pouvez l’utiliser directement pour cette fonction. D’ailleurs, on pourrait quasiment considérer le modèle d’affichage comme un service distinct de la logique d’affaire, presque un “bounded context” différent.

Ceci dit, dans la plupart des cas, ce délai n’est pas nécessaire. Tant que vous n’en avez pas besoin, je vous conseillerais de l’éviter, et de continuer à mettre à jour vos vues en même temps que le dépôt d’événements.

Mais alors quand est ce qu’il est utile d’introduire ce délai? Nous verrons dans la section suivante qu’un cas d’utilisation est la montée en charge. Un autre cas est la performance. Une partie de vos vues peuvent être lourdes à mettre à jours, par exemple la mise à jours d’un cube OLAP pour vos rapports. Il peut alors être nécessaire d’isoler la mise à jours de cette partie dans un processus séparé, afin de préserver une bonne performance dans le traitement des commandes.

Monter en charge les lectures et rapports

Une fois que vous avez en place un système de cohérence retardée, il est aisé de faire évoluer l’aspect “lecture” de votre application. C’est une bonne nouvelle car la plupart des applications ont un volume de lectures beaucoup plus important que leur volume d’écritures. Tout ce que vous avez à faire est de créer plusieurs copies de votre modèle d’affichage. Vous pouvez même en avoir une copie sur chacune de vos machines web. Chaque copie serait bien évidemment abonnée au flux d’événements:

De cette manière, vous avez une évolutivité quasiment linéaire en ajoutant simplement des serveurs d’application. L’autre avantage de cette configuration est que maintenant votre modèle d’affichage est hébergé localement sur votre serveur d’application. Vous n’avez plus besoin d’un appel réseau pour retourner votre page.

Si vous voulez évoluer à de très large volumes, votre problème est réduit à faire évoluer votre service de publication / abonnement, ce qui est en général un problème plus simple.

Monter en charge les écritures

La première étape consiste à rendre le traitement des commandes asynchrones. Au lieu de traiter immédiatement la commande, le serveur d’application l’envoie simplement dans une queue. Bien entendu, cela signifie que l’expérience usager sera modifiée. Prenez Amazon par exemple: lorsque vous soumettez une commande, la page suivante ne vous indique pas si la commande fut un succès ou non. A la place, vous avez un message du style “Votre commande à été prise en compte. Vous recevrez un courriel de confirmation sous peu.”.

Ceci dit, si vous avez vraiment besoin, il existe des techniques pour recevoir le résultat de l’exécution d’une commande de façon asynchrone.

Plusieurs serveurs de logique d’affaire peuvent alors concourir pour vider la queue des commandes. Mais cela ne permet que de faire évoluer le traitement de la logique d’affaire. Le dépôt d’événements devient alors le goulot d’étranglement. Heureusement, de par leur nature, les événements et les commandes peuvent être facilement partitionnés (“sharding”) selon l’identifiant de l’agrégat. Vous pouvez alors router les commandes au serveur de logique d’affaire concerné (grâce au fait que les agrégats soient un “consistency boundary”):

Liste des billets “migrer vers le Event Sourcing”:

nov. 10 11

Développement mobile: applications natives ou HTML5?

by Julien

An english version of this post is available here.

Ayant testé dernièrement le développement d’application HTML pour iPad, je réalise que beaucoup de personnes ont des idées préconçues sur le HTML5 comparé à des applications natives. Voici quelques observations que j’ai pu faire:

App store

Les sites web AJAX peuvent être “bookmarkés” directement sur l’accueil de l’appareil. Cependant, il semble que la plupart des usagers préfèrent aller chercher leurs applications sur le app store (bien qu’il y ai des exceptions).

En fait, vous pouvez parfaitement envelopper votre site web HTML5 dans un conteneur qui va le rendre accessible dans chaque app store.

Résultat: égalité.

Capacité hors ligne

Avec le stockage local d’HTML5, vous pouvez facilement mettre vos données en cache sur l’appareil et rentre votre site web disponible hors ligne.

Résultat: égalité.

Feeling “Natif”

Les personnes qui m’ont opposés cet argument m’ont en fait comparé de belles applications natives avec des sites HTML5 pauvrement conçus. Sur des sites HTML5 bien designés, il est impossible pour l’utilisateur de réaliser que l’application n’est pas native. En particulier, l’interface est aussi réactive au toucher que sur les applications natives.

Résultat: égalité.

Vidéos

Les navigateurs mobiles supportent pour la plupart l’élément HTML5 <video>. Vous pouvez donc par exemple visualiser les vidéos sur Vimeo.
Il y a cependant des limitations. Le lecteur sera celui du navigateur. Vous ne pouvez pas ajouter de fonctions spécifiques à votre lecteur comme des informations supplémentaires, des boutons ou encore insérer de la publicité.

Résultat: avantage aux applications natives.

Effets graphiques, animations

Les navigateurs mobiles supportent les effets et animations 3D accélérées par le matériel. Votre “glissé” sera donc parfaitement fluide dans un site HTML5.
Mais il y a des limitations. Trop de transparence, ou encore des ombres portées vont saccader les animations et rendre le toucher moins réactif. Et je ne suis pas sûr que les constructeurs aient intérêt à lever tous ces problèmes de performances, ils sont donc là pour durer.

Résultat: avantage aux applications natives.

Portabilité

Votre application HTML5 sera portable avec très peu de travail sur toutes les plate-formes mobiles. De l’autre coté, vous devrait redévelopper votre application native de zéro sur chaque plate-forme, qui plus est avec des technologies et langages de programmation différents.

Résultat: avantage au HTML5.

Délai de mise en marché, coût de développement

C’est sur ce point en particulier que le HTML5 brille particulièrement. Vous aurez besoin de plus grosses équipes de développement, avec des compétences plus difficiles à trouver, pour atteindre un resultat comparable sur une application native. Avec une équipe comparable, vous irez plusieurs fois plus rapidement avec un site HTML5.

Résultat: avantage au HTML5.

Magasin embarqué

A moins que reverser 30% de vos revenus à Apple ne vous gêne pas, vous devrez avoir une application HTML5 pure.

Résultat: avantage au HTML5.

Conclusions

Chaque méthode de développement a ses avantages et inconvénients. Coté HTML5 vous avez des limitations techniques que vous n’avez pas avec les applications natives. Cependant, ces inconvénients n’impactent pas la plupart des applications orientées contenu ou données. Pour les autres, la liberté technique se paie au prix fort en terme de coût et de réactivité.

Donc, choisissez une application native si vous avez besoin d’un contrôle absolu sur le graphisme ou les vidéos (jeux, vidéo à la demande, etc…) ou encore que le prix ne soit pas un problème pour votre compagnie.
HTML5 sera dans la plupart des cas un meilleur choix si vous avez une équipe limitée ou que vous ayez besoin d’un magasin embarqué ou encore que votre réactivité aux changements soit primordiale.

nov. 7 11

Migrer vers le Event Sourcing, partie 8: supprimer la base relationnelle, devenir “Event Sourced”

by Julien

An english version of this post is available here.

Vue d’ensemble

Voici maintenant la dernière étape. Après tous ces changement, il devient étonnement facile de charger les agrégats à partir de leurs événements.

Avantages

Séparation complète des responsabilités

Charger nos agrégats à partir des événements nous permet de “libérer” les tables de persistance qui font pour l’instant office de modèle d’affichage. Le modèle d’affichage peut maintenant évoluer complètement indépendamment du modèle transactionnel. Comme toujours, l’absence de couplage permet une plus grande maintenabilité. Cela devrait accélérer l’ajout de nouvelles fonctionnalités.

Mais cela veut aussi dire que la partie transactionnelle de l’application n’est plus liée à un modèle relationnel dans la base de données. Elle devient donc également plus facile à faire évoluer. D’un point de vue DDD, cela permet d’abaisser la barrière de complexité qui empêche certaines équipes de refactoriser leur modèle de domaine à chaque fois qu’elles gagnent en compréhension du domaine d’affaire, ce qui est pourtant fondamental pour espérer récolter les fruits promis par le DDD.

Implémentation

Ce qui change

Nous avons besoin de changer la façon dont nous chargeons les agrégats dans le repository:

public T ById(Guid key)
{
    var events = eventStore.LoadEventHistory(key);

    var resVal = (T)constructor.Invoke(new Object[] { events });
    AddToContext(resVal);
    return resVal;
}

Nous chargeons dans une première étape l’historique complet des événements de cet agrégat, puis appelons un nouveau constructueur qui se nourrit de ces événements.
Regardons d’abord comment les événements sont chargés:

public IEventInputStream LoadEventHistory(Guid aggregateId)
{
    using (var reader = persistenceManager.ExecuteQuery("SELECT [Version], [Data] FROM [Events] WHERE aggregate_id = @AggregateId ORDER BY [Version] ASC",
        new { AggregateId = aggregateId }))
    {
        var events = new List<IEvent>();
        var version = 0;

        while (reader.Read())
        {
            version = reader.GetInt32(0);
            var data = ((SqlDataReader)reader).GetSqlBinary(1).Value;
            events.AddRange(Deserialize(data));
        }

        return new SimpleEventInputStream(events, version, aggregateId);
    }
}

Cela consiste simplement à concaténer les événements des commits de cet agrégat.

Le nouveau constructeur de l’agrégat quant à lui appelle simplement les méthodes “Apply()” que nous avons définis dans la partie 4:

public AggregateRoot(IEventInputStream events) : this(events.AggregateId)
{
    Version = events.Version;

    foreach (var evt in events.Events)
    {
        ExecuteHandler(evt);
    }
}

private void ExecuteHandler<T>(T evt) where T : IEvent
{
    var handler = applyDelegates[this.GetType()][evt.GetType()] as Action<AggregateRoot, IEvent>;
    if (handler != null) handler(this, evt);
}

Et c’est tout. Vous avez maintenant une application CQRS / Event Sourced!

Démarrer l’application exemple

Les sources pour toute la série de billets sont disponible sur GitHub à http://github.com/jletroui/TransitioningToEventSourcing
.

Vous aurez simplement besoin de créer une base de données vide intitulée “DDDPart6″ dans SQLExpress avant de lancer l’application.
Les 3 dernières étapes de cette série étant très simples, l’exemple “partie 6″ couvre en fait les parties 6 à 8.

Liste des billets “migrer vers le Event Sourcing”:

nov. 7 11

Migrer vers le Event Sourcing, partie 7: construire un modèle de lecture

by Julien

An english version of this post is available here.

Vue d’ensemble

Dans cette étape, nous nous concentrons sur la partie “requêtage” de CQRS. Nous allons pouvoir utiliser des vues conçues pour des pages spécifiques de l’application.

Avantages

Simplicité

Les tables SQL des vues sont uniquement responsables de l’affichage. Vous pouvez dé-normaliser autant que vous voulez. Vous n’êtes plus contrains par l’aspect transactionnel de votre application. Vous n’avez qu’à vous soucier de l’affichage et de la recherche. Cela signifie que ces tables sont plus simples, qu’elles contiennent moins de clefs étrangères, et qu’elle sont plus isolées (moins de dépendances). Le code source et le schéma de données résultants seront moins complexe, et donc plus maintenable. Cela accélérera probablement votre équipe lorsque vous ajouterez des fonctionnalités à votre application.

Performance

Habituellement, cette étape se débarrasse de la plupart des problèmes de performance dont votre application soufrait avec une simple modèle de données partagé. Avec une application classique, votre modèle de données transactionnel résultait habituellement en un modèle “toile d’araignée”, très intriqué. Cela signifie en général que pour afficher certaines pages, vous avez besoin de grosse requêtes inefficaces, comprenant beaucoup de jointures et autres opérations coûteuses pour la performance. Avec un modèle d’affichage conçu spécifiquement pour une page données, ces problèmes disparaissent. Vous pouvez par exemple pré calculer vos rapports en temps réel (est ce que cela a une valeur d’affaire pour votre compagnie?).

Implémentation

Ce qui a changé

Rien! Ce qui est sympathique, c’est que nous avons déjà un modèle d’affichage: l’ancien modèle transactionnel qui nous persistons avec les gestionnaires d’événements. Nos DTOs sont déjà récupérés à partir de ces tables. Alors bien sûr, ce n’est pas le modèle d’affichage le plus optimisé, mais c’est un bon point de départ. Pour l’instant, nous ne pouvons toujours pas le modifier, car nous utilisons toujours les tables pour charger nos entités. Cette limitation sautera lors de la prochaine étape, et en attendant, rien ne nous empêche de créer d’autres tables spécifiques à nos vues. En fait, nous pouvions le faire depuis que nous persistons les entités via nos gestionnaires d’événements.

Démarrer l’application exemple

Les sources pour toute la série de billets sont disponible sur GitHub à http://github.com/jletroui/TransitioningToEventSourcing
.

Vous aurez simplement besoin de créer une base de données vide intitulée “DDDPart6″ dans SQLExpress avant de lancer l’application.
Les 3 dernières étapes de cette série étant très simples, l’exemple “partie 6″ couvre en fait les parties 6 à 8.

Liste des billets “migrer vers le Event Sourcing”:

nov. 6 11

Migrer vers le Event Sourcing, partie 6: stocker les événements

by Julien

An english version of this post is available here.

Vue d’ensemble

Le travail difficile est maintenant terminé. La dernière partie essentielle manquante est celle consistant à stocker les événements. Ce qui est trivial.

Avantages

Journal

A cette étape, nous ne faisons pas grand chose avec le flux d’événements stockés. Malgré tout, il y a déjà une valeur d’affaire substantielle: un journal complet et parfait de tout ce qui s’est passé dans votre application. Nous verrons dans l’étape suivante ce que nous pouvons faire avec.

A l’échelle de votre entreprise cependant, c’est une mine d’or. En effet, cela vous permet une intégration parfaite et “gratuite” pour les autres services / applications de votre organisation. Par “parfaite”, je veux dire que peu importe ce que les autres applications ont besoin de votre part, elles le trouveront dans le journal, puisqu’il retrace exactement tout ce qui s’est passé. En s’abonnant au flux d’événements, elles peuvent même réagir à se qui se passe. Par “gratuite”, je veux dire que votre équipe n’a rien besoin de rajouter ou modifier dans votre application lorsqu’une nouvelle application veut s’y connecter, ou qu’une application déjà connectée a besoin d’information additionnelle. Les autres applications ont juste besoin de s’abonner au flux, et d’en dériver n’importe quelle information sans aucun changement de votre part. Bye bye les web services ou services REST que vous aviez besoin de maintenir vous même.

Je détaillerais les solutions techniques pour permettre ceci dans le dernier billet de cette série.

Implémentation

Ce qui change

Nous avons ajouté un composant crucial dans l’infrastructure:

Les aggregats

Pour pouvoir gérer les problèmes de concurrence, nous avons ajouté un champ dans les agrégats:

public interface IAggregateRoot
{
    Guid Id { get; }
    IEnumerable<IEvent> UncommitedEvents { get; }
    int Version { get; } // New!
}

L’entrepôt d’événementsr

L’entrepôt d’événements est responsable de stocker et d’indexer les événements.

public interface IEventStore
{
    void PersistUncommitedEvents(IAggregateRoot aggregate);
}

L’implémentation est plutôt simple J’ai choisi de stocker les événements dans une table de la base de données. Chaque tuple de la table contient tous les événements emmis par un agrégat lors d’un commit. La version correspond simplement aux nombre de commits sur cet agrégat.

La gestion de la conccurrence est gérée par une contrainte d’unicité sur l’identifiant de l’agrégat et sa version. Si la contrainte est brisée au moment de l’insertion du commit, cela signifie qu’une autre commande vous a devançé pendant que vous traitiez votre logique. J’ai emprunté cette technique à l’entrepôt d’événements CQRS de Jonathan Oliver.

public void PersistUncommitedEvents(IAggregateRoot aggregate)
{
    try
    {
        persistenceManager.ExecuteNonQuery(
            "INSERT INTO [Events] (Id, aggregate_id, version, data) VALUES (@Id, @AggregateId, @Version, @Data)",
            new
            {
                Id = Guid.NewGuid(),
                Version = aggregate.Version + 1,
                AggregateId = aggregate.Id,
                Data = Serialize(aggregate.UncommitedEvents)
            });
    }
    catch (SqlException se)
    {
        // Thanks Jonathan Oliver's CQRS Event Store
        if (se.Number == UniqueKeyViolation) throw new ConcurrencyException();
        throw;
    }
}

Comment faut il réagir lorsqu’un problème de concurrence est détecté? Il suffit simplement de réessayer la commande!

Bus de commandes

Le bus est responsable dans mon exemple de réessayer la commande lorsque cela arrive:

public void Send<T>(T cmd) where T : ICommand
{
    var handler = container.Build<IHandleCommand<T>>();
    var handled = false;

    while (handled == false)
    {
        try
        {
            handler.Handle(cmd);
            // Trigger persistence and concurrency checking.
            persistenceManager.Commit();
            handled = true;
        }
        catch (ConcurrencyException)
        {
            // Hit a concurrency exception, must retry the command.
        }
    }
}

Persistence manager

La dernière piece du puzzle se situe dans le gestionnaire de persistence. Il faut naturellement rajouter un appel à l’entrepôt au moment du commit:

using (var tx = connection.BeginTransaction(IsolationLevel.ReadCommitted))
{
    context[TRANSACTION_KEY] = tx;

    try
    {
        foreach (var ar in aggregates)
        {
            lazyEventStore.PersistUncommitedEvents(ar);
        }
    }
    catch (ConcurrencyException)
    {
        tx.Rollback();
        context[TRANSACTION_KEY] = null;
        throw;
    }

    // At this stage, no concurrency issues, so pass on to the event handlers
    foreach (var ar in aggregates)
    {
        foreach (var evt in ar.UncommitedEvents)
        {
            eventBus.Publish(evt);
        }
    }

    context[TRANSACTION_KEY] = null;
    tx.Commit();

    context[AGGREGATE_KEY] = null;
}

Et voilà. Notez que nous n’avons pas eu besoin de toucher au domaine, ou à la couche de présentation. Cela n’a nécessité qu’un changement mineur dans l’infrastructure.

Démarrer l’application exemple

Les sources pour toute la série de billets sont disponible sur GitHub à http://github.com/jletroui/TransitioningToEventSourcing
.

Vous aurez simplement besoin de créer une base de données vide intitulée “DDDPart6″ dans SQLExpress avant de lancer l’application.
Les 3 dernières étapes de cette série étant très simples, l’exemple “partie 6″ couvre en fait les parties 6 à 8.

Liste des billets “migrer vers le Event Sourcing”:

oct. 15 11

Que se passe-t-il?

by Julien

Si vous me suivez depuis plus d’un an, vous avez peut être remarqué que j’ai arrêté ma série “Migrer vers le Event Sourcing” il y a déjà un certain temps. Je penses pouvoir la finir bientôt. La raison de cet arrêt est que j’ai depuis quelque mois appris et expérimenté avec beaucoup de nouveaux langages et technologies, me laissant peu d’énergie pour ce blog. La période “folle” étant terminée, j’espère pouvoir vous en faire profiter dans les mois qui viennent.

sept. 27 11

Surveillance de serveurs Ubuntu avec Nagios

by Julien

An english version of this post is available here.

Nagios est considéré par beaucoup comme le système de choix pour surveiller de 1 serveur à une ferme de taille raisonnable. Il a une bonne documentation, et pour les chanceux travaillant avec Ubuntu / Debian, les packages permettent une installation simple. Malheureusement les packages Ubuntu ne placent pas les choses au même endroit que la documentation officielle, et il y a trés peu de resources sur l’installation d’un système complet de monitoring pour les complets débutants comme moi. J’essaie donc dans ce post de combler ce manque.

Vue d’ensemble

L’idée est d’avoir un server central Nagios surveillant un certain nombre de serveurs distants. Le plugin ‘Nagios Remote Plugin Executor’ (exécuteur de plugin Nagios à distance) permet de faire cela simplement. Le plugin consiste en 2 parties:

  1. Un démon installer sur chaque serveur distant qui accepte les commands de plugin du server Nagios central.
  2. Le plugin en tant que tel sur le serveur Nagios qui peut envoyer des commandes aux serveurs distants.

Installer le démon sur les serveurs distants

Voici les quelques commandes nécessaires:

YOUR_NAGIOS_HOST_IP=<insérer votre IP de serveur Nagios central ici>
sudo apt-get -y install nagios-nrpe-server
sudo sed -i.bak -r -e 's/allowed_hosts=127.0.0.1/allowed_hosts=$YOUR_NAGIOS_HOST_IP/g' /etc/nagios/nrpe.cfg
sudo /etc/init.d/nagios-nrpe-server restart

La seconde ligne installe le démon. La 3e modifie la configuration du démon pour n’accepter des requête que de votre serveur central (précaution de sécurité minimale). Finallement, la dernière ligne redémarre le démon pour prendre en compte les changements.

Note: lorsqu’on exécute des plugins à distance, il est préferrable de ne pas autoriser le passage de paramètres pour des raisons de sécurité. C’est le défaut lorsqu’on installe le démon. Cela oblige donc à spécifier les paramères en dur sur chaque serveur distant. Vous pouvez ajuster les comandes acceptées à la fin du fichier de configuration nrpe.cfg. Voici par exemple les commandes par défaut:

command[check_users]=/usr/lib/nagios/plugins/check_users -w 5 -c 10
command[check_load]=/usr/lib/nagios/plugins/check_load -w 15,10,5 -c 30,25,20
command[check_hda1]=/usr/lib/nagios/plugins/check_disk -w 20% -c 10% -p /dev/hda1
command[check_zombie_procs]=/usr/lib/nagios/plugins/check_procs -w 5 -c 10 -s Z
command[check_total_procs]=/usr/lib/nagios/plugins/check_procs -w 150 -c 200

Installer le serveur central Nagios

Voici la commande:

sudo apt-get -y install nagios3 nagios-nrpe-plugin

Le package va installer postfix (pour envoyer les notifications par email), apache, et configurer l’utilisateur ‘nagiosadmin’ pour vous. Vous n’avez plus qu’à configurer les objets Nagios dans /etc/nagios3/conf.d en suivant la documentation régulière, trés complète sur ce sujet. Voici un exemple de configuration d’hôte distant, ainsi qu’un service appelant une commande sur ce serveur:

define host{
        use                     generic-host            ; Name of host template to use
        host_name               myServer1
        alias                   My Server 1
        address                 <ip of myServer1 here>
        }
define service{
        use                             generic-service         ; Name of service template to use
        host_name                       myServer1
        service_description             Disk Space
        check_command                   check_nrpe_1arg!check_hda1
        }

Une fois votre configuration terminée, vous pouvez la tester avec la commande suivante:

sudo /usr/sbin/nagios3 -v /etc/nagios3/nagios.cfg

Puis si tout va bien, redémarrer Nagios pour charger la nouvelle configuration:

sudo /etc/init.d/nagios3 restart

Finallement, vous pouvez ouvrir un navigateur à l’adresse http://IPDeVotreServeurNagios/nagios3. L’utilisateur est nagiosadmin et le mot de passe est celui que vous avez choisi durant installation.

Voilà, cela devrait vous permettre de mettre en place facilement votre première surveillance avec Nagios. Tous les commentaires sont évidemment les bienvenus.

mars 7 11

Annonce locale: Jackson Harper à devLAB ce mercredi à Montréal

by Julien

Jackson a créé un nouveau framework web pour mono / .NET dans les lignes de Sinatra ou Node.js, ce qui est relativement rare dans le paysage .NET et nous change d’ASP.NET MVC. Vous pouvez en lire plus à ce sujet sur son blog. Les détail de sa présentation de mercredi sont ici.

mars 7 11

Visual Studio Talk Show sur CQRS

by Julien

Le podcast Visual Studio Talk Show (qui est un podcast en français en dépit de son nom) m’a invité pour parler de CQRS et Event Sourcing. Vous pouvez écouter l’émission ici.