Aug 23 14

Testing Apache Hive systems

by Julien

A French version of this post is available here.

Context

Testing a BI system based on Hive requires a bit of effort. Some projects can simply use Hive's embedded mode, but sometimes this is not enough, in particular when you want to include external BI tools in your tests.

This post explains the solution we ended up using at Triton Digital for such a project.

Hadoop client

To test the launch of mapred tasks, we need a Hadoop client: Hive directly uses the “hadoop” script to launch them.

A few lines of SBT allow us to bypass the Hadoop distribution. Let’s begin with the necessary dependencies:

import java.io.PrintWriter

resolvers += "Cloudera repo" at "https://repository.cloudera.com/artifactory/cloudera-repos"

ivyConfigurations += config("hadoop")

libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-client"   % "2.3.0-mr1-cdh5.0.0"  % "hadoop",
  "org.apache.hive"   % "hive-exec"       % "0.12.0-cdh5.0.0"     % "hadoop"
)

val dumpHadoopClasspath = TaskKey[Unit]("dump-hadoop-classpath", "Dumps hadoop classpath in a file")

dumpHadoopClasspath := {
  val printer = new PrintWriter("hadoop.classpath")
  printer.print(update.value.select(configurationFilter("hadoop")).map(_.getCanonicalPath).mkString(":"))
  printer.close()
}

test in Test <<= (test in Test) dependsOn dumpHadoopClasspath

Since we don’t need those jars on the classpath of our project, we place them in an isolated Ivy configuration labeled “hadoop”.

We also write the resulting classpath to a file so we can use it in a customized hadoop startup script.

“hadoop” script

We start from the regular hadoop script that one can find in the bin directory of any hadoop distribution. We just modify the start of the script to use the classpath we created above:

# This one is commented since we manually 'take over' the config below
#. "$bin"/hadoop-config.sh

export HADOOP_HOME=$(pwd)
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
HADOOP_USER_CLASSPATH_FIRST=true
HADOOP_CLASSPATH=$(cat hadoop.classpath)

At this point, we should have a working hadoop client:

> sbt update
[info] Loading global plugins from /Users/jletrouit/.sbt/0.13/plugins
[info] Loading project definition from /Users/jletrouit/Documents/perso/hivetest/project
[info] Set current project to hivetest (in build file:/Users/jletrouit/Documents/perso/hivetest/)
[info] Updating {file:/Users/jletrouit/Documents/perso/hivetest/}hivetest...
[info] Resolving org.apache.velocity#velocity;1.7 ...
[info] Done updating.
[success] Total time: 9 s, completed 1-Jun-2014 3:30:09 PM

> ./hadoop version
Hadoop 2.3.0-cdh5.0.0
Subversion git://github.sf.cloudera.com/CDH/cdh.git -r 8e266e052e423af592871e2dfe09d54c03f6a0e8
Compiled by jenkins on 2014-03-28T04:29Z
Compiled with protoc 2.5.0
From source with checksum fae92214f92a3313887764456097e0

Hive test server

Next we need to start a Hive instance in our tests. For that, we have to first start a hadoop cluster. Here are the dependencies you will need:

libraryDependencies ++= Seq(
  "org.apache.hive"   % "hive-jdbc"     % "0.12.0-cdh5.0.0" excludeAll(
    ExclusionRule(organization = "junit"),
    ExclusionRule(organization = "org.apache.avro")
  ),
  "org.apache.hadoop" % "hadoop-common"   % "2.3.0-cdh5.0.0",
  "org.apache.hadoop" % "hadoop-hdfs"     % "2.3.0-cdh5.0.0",
  "org.apache.hadoop" % "hadoop-common"   % "2.3.0-cdh5.0.0"      % "test" classifier("tests"),
  "org.apache.hadoop" % "hadoop-hdfs"     % "2.3.0-cdh5.0.0"      % "test" classifier("tests"),
  "org.apache.hadoop" % "hadoop-test"     % "2.3.0-mr1-cdh5.0.0"  % "test" exclude("net.java.dev.jets3t", "jets3t"),
  "org.apache.hadoop" % "hadoop-auth"     % "2.3.0-cdh5.0.0"      % "test",
  "org.specs2"        %% "specs2"         % "2.3.12"              % "test"
)

The Hadoop test dependencies allow us to start a mini cluster:

  val currentDir = new File(".").getCanonicalPath()
  val conf = new Configuration()

  // This is making sure we are not picking up locally installed hadoop libraries and stay isolated
  System.setProperty("java.library.path","")
  // We could use a temporary directory, but those logs can be useful for debugging a test failing
  System.setProperty("hadoop.log.dir", "logs/hadoop") // MAPREDUCE-2785

  conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, createTempDirectory("dfs_base"))
  conf.set("hadoop.home.dir", currentDir)
  conf.set("dfs.permissions", "false")
  conf.set("hadoop.security.authorization", "false")

  val miniDFS = new MiniDFSCluster.Builder(conf).build()

  val miniMR = new MiniMRCluster(
    1,      // numTaskTrackers
    miniDFS.getFileSystem().getUri().toString(),
    1,      // numTaskTrackerDirectories
    null,   // racks
    null,   // hosts
    new JobConf(conf))

  // Save those for later
  val jt = miniMR.createJobConf(new JobConf(conf)).get("mapred.job.tracker")
  val warehouseDir = "file:" + createTempDirectory("hive_warehouse")

We can then start a hive server based on the previous cluster:

  val hiveConf = new HiveConf(getClass())

  hiveConf.set(HiveConf.ConfVars.HADOOPJT.varname, jt)
  hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouseDir)
  // Hive still needs a hadoop command line tool. The one bundled with the project points to the
  // minimal hadoop client jars we are downloading through SBT in the "hadoop" ivy config.
  hiveConf.set(HiveConf.ConfVars.HADOOPBIN.varname, s"$currentDir/hadoop")

  val server = new HiveServer2()
  server.init(hiveConf)
  server.start()

However, we will need a couple of adjustments to make everything work together.

Transfer the custom configuration to Hive sessions

The configuration of the HiveServer2 is not passed to the internal Hive session that is created when connecting through JDBC. Instead, the server implementation creates a brand new configuration instance. This works well on a standard deployment, where the new instance picks up the hive-site.xml configuration file, but it obviously does not pick up our programmatically customized configuration. A small hack allows us to inject this configuration into the JDBC session:

val hiveConf = new HiveConf(getClass())
configureHive(hiveConf)
hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_SESSION_HOOK.varname, classOf[TestHiveSessionHook].getCanonicalName())

val server = new HiveServer2()
server.init(hiveConf)
server.start()

def configureHive(conf: HiveConf) {
  conf.set(HiveConf.ConfVars.HADOOPJT.varname, jt)
  conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouseDir)
  conf.set(HiveConf.ConfVars.HADOOPBIN.varname, s"$currentDir/hadoop")
}

class TestHiveSessionHook extends HiveSessionHook {
  def run(ctx: HiveSessionHookContext) {
    TestHiveServer2.configureHive(ctx.getSessionConf)
  }
}

Tame down timeouts

Usually, all Hive components reside in separate JVMs, so even though the JDBC DriverManager is static, the JDBC timeouts of the satellite components don’t step on each other. In our case everything runs in a single JVM, so we need to make sure all of them are high enough to allow our requests to complete:

conf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT.varname, "1200")
conf.set(HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT.varname, "1200")

We can now create a genuine JDBC connection:

using(TestHiveServer2.createConnection) { con =>

  val smt = con.createStatement()

  smt.executeUpdate("create table ngram (ngram string, year int, occurences int, documents int) row format delimited fields terminated by '\\t' stored as textfile")
  smt.executeUpdate(s"load data local inpath '$dir/googlebooks-eng-all-1gram-20120701-z-sample.tsv' overwrite into table ngram")

  using(smt.executeQuery("select ngram, SUM(documents) as total_documents from ngram group by ngram")) { rs =>
    List(
      {rs.next(); rs.getString(1)} -> rs.getInt(2),
      {rs.next(); rs.getString(1)} -> rs.getInt(2)
    ) mustEqual List(
      "zenith" -> 426197,
      "zooplankton" -> 24939
    )
  }
}

And since it is a genuine, full-fledged Hive instance, we can even pause the tests with something like:

StdIn.readLine()

and inspect the Hive database using the beeline command line tool by connecting to “jdbc:hive2://localhost”.

You can find a fully working sample project on GitHub: https://github.com/jletroui/hivetest

Happy testing!

Nov 14 11

Transitioning to Event Sourcing, part 9: additional tools and patterns

by Julien

A French version of this post is available here.

I will briefly go over topics of interest and additional features a CQRS / Event Sourcing infrastructure could implement. Each of these topics would probably deserve its own post, or even a book. If you need more information, there is a lot of material on these subjects in discussion groups. Greg Young will also soon release an entire book on them. So here are brief introductions:

Snapshots

Obviously, pure event sourcing as we implemented it in part 8 has its limitations. It is fine for aggregates with a few thousand events, but beyond that, we need to implement snapshotting.
The idea is to allow the aggregate to create a snapshot of its state. When loading an aggregate, we only fetch the latest snapshot and the events that occurred after it was taken. Snapshots are stored in a table that contains the latest version of the aggregate, the snapshot data, and the snapshot version.
You should delegate snapshotting to a standalone process, so the application itself does not have to manage snapshot creation. The event store just “consumes” the snapshots created by the snapshotter. The snapshotter only needs to query the snapshot table for aggregates that had, say, 100 commits or more since their last snapshot. It can then simply load each of those aggregates and ask it for a new snapshot.
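
The loading path and the snapshotter's selection rule described above can be sketched as follows. This is only a minimal illustration in Python (the series itself uses C#), not the implementation of any particular event store. `Snapshot`, `Account`, `load_account` and `needs_snapshot` are hypothetical names, and for simplicity the version counts individual events rather than commits.

```python
from dataclasses import dataclass


@dataclass
class Snapshot:
    version: int   # aggregate version at which the snapshot was taken
    state: dict    # serialized aggregate state


class Account:
    """Hypothetical aggregate: a balance changed by 'deposited' events."""

    def __init__(self):
        self.version = 0
        self.balance = 0

    def restore(self, snapshot):
        # Jump straight to the state captured by the snapshot.
        self.version = snapshot.version
        self.balance = snapshot.state["balance"]

    def apply(self, event):
        kind, amount = event
        if kind == "deposited":
            self.balance += amount
        self.version += 1

    def take_snapshot(self):
        return Snapshot(self.version, {"balance": self.balance})


def load_account(snapshot, events):
    """Rebuild the aggregate from an optional snapshot plus later events.

    `events` is the full ordered history; only the events that come after
    the snapshot version are replayed.
    """
    account = Account()
    if snapshot is not None:
        account.restore(snapshot)
    for event in events[account.version:]:
        account.apply(event)
    return account


def needs_snapshot(aggregate_version, snapshot_version, threshold=100):
    """Rule a standalone snapshotter could use to select aggregates."""
    return aggregate_version - snapshot_version >= threshold
```

A real event store would only query the events after the snapshot version instead of slicing a full list in memory.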

Versioning

What happens when you need to change an event? In most cases, you don’t want to change the events already committed to the event store. You simply create a new event version (for example CustomerAddressCorrectedv2), and make the aggregate consume this event instead of the old one. You then insert a converter at the output of the event store that converts old events to new ones before passing them to the aggregate constructor.
How do you convert an old event to a new one? Let’s say you have a new field in the event. You have to make the same kind of decision as when you add a new column to a table: you have to compute a default value for that new field.
What happens when this is completely impossible? As explained by Greg Young, this probably means that the new event is not a new version of the same type of event, but actually a new event type. In that case, no upconversion is possible.
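
To make the converter idea concrete, here is a minimal Python sketch of an upconverter sitting at the output of the event store. The event shape (plain dictionaries), the `country` field and its `"unknown"` default are assumptions made up for the illustration; only the CustomerAddressCorrectedv2 name comes from the example above.

```python
def upconvert(event):
    """Convert an old event version to the latest one.

    Events that are already at the latest version are returned unchanged.
    """
    if event["type"] == "CustomerAddressCorrected":
        # Hypothetical change: v2 added a 'country' field. As with a new
        # table column, the converter has to pick a default value.
        return {
            "type": "CustomerAddressCorrectedv2",
            "customer_id": event["customer_id"],
            "address": event["address"],
            "country": "unknown",
        }
    return event


def load_events(stored_events):
    """Upconvert every stored event before handing it to the aggregate."""
    return [upconvert(event) for event in stored_events]
```

With such a converter in place, the aggregate only ever has to handle the latest version of each event, while the stored events stay untouched.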

Rinat Abdulin covers a few examples and gives more details on his versioning bliki page.

View model

Now that your views are not tied to the transactional model, you have much more freedom. In particular, why not directly store the object you need to display in a page instead of going through a cumbersome paradigm shift to and from a relational database? Databases that store unrelated object trees are called document databases. On .NET, I would highly recommend RavenDb. On other platforms, there are numerous choices such as MongoDb. Some of them even produce JSON output, so you can transfer the object directly to your AJAX application without any transformation taking place in your application layer.

Again, Rinat Abdulin provides an interesting read on the subject.

Smart merging strategies

So far, our concurrency strategy has been quite dumb: we retry the entire command when we detect a conflict. This is fine for most applications. But if needed, we could also fetch the events that have been committed in between and look at them. We may be able to commit our events even if the command has been raced by another one, provided the other events are not incompatible with our commit. Since events capture what happened in detail and at a high level of meaning, it is usually simple to create compatibility rules between pairs of events. A good starting rule is: if the same kind of event appears in both commits, we consider it a real concurrency issue. But we can add smarter rules that actually look at the content of the events.
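
The starting rule, plus a hook for content-aware rules, could look like the following Python sketch. It assumes events are dictionaries with a `type` key; `can_merge` and the shape of the extra rules are hypothetical and only illustrate the idea.

```python
def can_merge(our_events, their_events, rules=()):
    """Return True when our events can be committed on top of theirs.

    Starting rule: an event type present in both commits is a real conflict.
    `rules` holds extra predicates (ours, theirs) -> bool; each one flags a
    conflict by looking at the content of a pair of events.
    """
    for ours in our_events:
        for theirs in their_events:
            if ours["type"] == theirs["type"]:
                return False
            if any(rule(ours, theirs) for rule in rules):
                return False
    return True
```

When `can_merge` returns False, the command bus would fall back to the plain strategy and retry the whole command.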

This is particularly useful for highly collaborative applications. But it also opens up additional possibilities for disconnected systems, as found for example in mining exploration or police work. In those cases, the client can carry a copy of the event store and work from it. When the client reconnects to the server later, the commits are merged automatically, and only conflicting events have to be merged manually by the user. By the way, this is how most relational database systems implement replication, through their transaction log.

Service integration

This is a big topic, and I will only scratch the surface here. An immense amount of money is spent worldwide integrating services within and across companies. There are multiple reasons why integration is hard. Most of the time, applications are not designed to be integrated, and are implemented as if they were isolated components. Fortunately, with the trend towards SOA, integration is now more of a concern. Another problem is that most of today’s integration technologies are synchronous by default: REST, web services, CORBA, RMI, WCF and so on. All this synchronous communication does not scale. First, network latency kicks in when a service needs to call another service that needs to call another service that… You get the idea. Then, the more components you add to the chain, the more likely you are to get a failure along the way. And I am not even talking about the maintenance nightmare that such a web of interconnected services produces.

Event sourcing allows for a very clean approach. Combined with publish / subscribe, applications become disconnected in time and decoupled in responsibilities. If other applications need to integrate with yours, just ask them to subscribe to your event stream. You can publish your event stream with components called service buses, which allow precisely that. Once you are publishing to a service bus, you don’t even need to be aware that a new application is now integrated with yours. All that application has to do is download the history of your event stream, subscribe to the stream, and then derive whatever information it needs from it, or react to events in your application in very little time.

This is solving the “always up” issue. If one or the other application goes down, the events can be queued in the service bus until the consumer system goes back up.
If you choose a serialization format carefully, it also solves most of the maintainability issues. The source application can create new types of event without impacting the consumer applications. Since the applications are responsible for projecting what they need from the event stream, they will be responsible for taking into account (or not) the new types of event. Finally, consumer applications will not need new features or data from your application, since the event stream already includes everything that ever happened in your system.
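
The "download the history, then subscribe" pattern can be illustrated with a tiny in-memory stand-in for a service bus. This Python sketch is an assumption-laden toy: `EventStream` is a hypothetical class, everything runs in one process, and a real bus would add queuing, persistence and delivery guarantees.

```python
class EventStream:
    """In-memory stand-in for an event stream published on a service bus."""

    def __init__(self):
        self._history = []
        self._subscribers = []

    def publish(self, event):
        # Keep the event so that late subscribers can catch up on it.
        self._history.append(event)
        for handler in self._subscribers:
            handler(event)

    def subscribe(self, handler):
        """Replay the full history to the new consumer, then deliver live events."""
        for event in self._history:
            handler(event)
        self._subscribers.append(handler)
```

Note that the publisher never learns who subscribed: a new consuming application only calls `subscribe` and decides by itself which event types it cares about.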

Eventual consistency

Eventual consistency is when there is a delay between the commit of the aggregate and the corresponding refresh of your view model. Or more precisely, when those 2 processes happen in different transactions. Concretely, you only commit events at commit time, and a different process is picking them up to execute the view model handlers. If you already have a service bus (see “Service integration”), you can use it. And indeed, the view model can be viewed as a separate service, almost a different bounded context if you will.

However, most applications don’t need such a separation. Until you need it, you should commit the changes to your view model along with the events.

When do you need to split things that way? One reason is scalability, and we will talk about this one later. One other factor could be performance. Your view model might be huge, or have parts of it that are slow to update. You may want to isolate the update of those parts in a separate process, in order to keep a good performance when executing commands.

Scaling the query side

Once you have eventual consistency, it is quite trivial to scale the read side. This is good news, because most applications usually have a lot more reads than writes. All you have to do is put a copy of your view model on each web node. Each of those view models subscribes to the event stream.

That way, you have an almost linear scalability of your read side: just add more nodes. The other benefit is the gained performance due to the local access to the view model. You don’t need a network call anymore because your web layer sits on the same node as its view model.

Now, if you want to scale to very large levels, your problem is reduced to scaling the publish / subscribe middleware.

Scaling the write side

For scaling this one, you will first need to process commands asynchronously. The UI would need to send the command and not wait for the result. This usually triggers a very different kind of user experience. Think of it as ordering something on Amazon: what you get when you are clicking on the “submit” button is not a success or failure statement, but something more along the lines of “Your order has been taken into account. You will receive a confirmation email shortly”.

If you need it, there are however ways for the UI to asynchronously fetch a command result.

Once you are sending commands to a queue, you can have competing nodes processing those commands. But that only scales the business logic processing, not the event storage. Luckily, by its structure, the event store is also quite simple to shard by aggregate id. Since commands contain the aggregate id, you can route each command to the right queue.
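
Routing by aggregate id can be as simple as the following Python sketch. It assumes commands are dictionaries carrying an `aggregate_id` GUID and that queues are plain lists; `shard_for` and `route` are hypothetical helpers, and a modulo over the GUID is just one possible stable mapping.

```python
import uuid


def shard_for(aggregate_id, shard_count):
    """Map an aggregate id (a GUID) to a stable shard index."""
    return uuid.UUID(str(aggregate_id)).int % shard_count


def route(command, queues):
    """Append the command to the queue owning its aggregate."""
    index = shard_for(command["aggregate_id"], len(queues))
    queues[index].append(command)
```

Because the mapping only depends on the aggregate id, all the commands (and therefore all the events) of a given aggregate end up on the same shard, which keeps the per-aggregate version check local to that shard. Changing the number of shards would require re-partitioning, which this sketch does not address.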


Nov 10 11

Mobile development: going HTML5 or native?

by Julien

A French version of this post is available here.

Having tested mobile HTML5 development, I have heard a lot of misconceptions about HTML5 applications on mobile devices. Here are some points of interest:

App store

Natively, HTML5 AJAX websites can be bookmarked on the device’s home screen. However, it seems most users prefer to search for and install apps from the app store (although there are exceptions). I have heard this argument in favor of native apps over and over.

It turns out you can perfectly well embed your mobile HTML5 website in a wrapper that makes it available on each app store.

Advantage: this is a tie.

Offline capabilities

With the HTML5 local storage, you can cache and store data like a native app.

Advantage: this is a tie.

“Native” feeling

It turns out that proponents of native apps often compare very well designed native apps to poorly designed HTML5 websites. On well designed websites, it is frankly impossible for users to tell the difference. The touch interfaces are as reactive as on native apps.

Advantage: this is a tie.

Videos

The iOS Safari browser supports the <video> tag, so you can embed videos in your HTML page.
There are limitations though: the player is the browser’s built-in one. You cannot add specific features to the video player, such as overlay information, advertisement insertion, additional buttons, etc.

Advantage: native apps.

Graphical effects, animations

iOS Safari supports hardware-accelerated 3D animations and transitions, so your sliding will be perfectly smooth in an HTML5 app. But some things are less well supported, such as drop shadows, which slow down transitions and touch reactivity.

Advantage: native apps.

Portability

Your HTML5 application will be portable to all mobile platforms with minimal work. On the other hand, you will need to redevelop your native application from scratch on each platform, with different technologies and languages.

Advantage: HTML5

Time to market, cost of development

HTML5 is clearly the winner here. You will need much bigger teams, with harder-to-find skills, to achieve the same result with native apps. At comparable team size, you will be able to go several times faster with HTML5.

Advantage: HTML5

In app store

Unless you accept that Apple takes 30% of your income, you will need to make your app purely HTML5.

Advantage: HTML5

Conclusion

We can see that both methods have their pros and cons. Generally speaking, native apps give you a bit more control, especially for applications that are a bit off the beaten track, that need very specific customization, or that are heavy on graphics. But this comes at a price: you will need hard-to-find talent, and much more of it, or your time to market will be significantly impacted.

So, choose native apps if you need absolute control over graphics and video.
Choose HTML5 if your application is data or content oriented, your budget is limited, you need an in-app store, or you need to react extremely quickly to changes.

Nov 7 11

Transitioning to Event Sourcing, part 8: remove the domain database, go event sourced

by Julien

A French version of this post is available here.

Overview

We can finally finish this refactoring. It is now trivial to use events to load our aggregates.

Benefits

Complete responsibility segregation

Loading our aggregates from the events allows us to “free” the persistent tables that are still playing the role of view model. This view model can now evolve independently, since it is no longer coupled to the transactional part of the application. And as always, less coupling means easier maintenance, and should speed up the addition of new features.

It also means that the transactional part of your application is no longer tied to a relational model. It is therefore much easier to make it evolve. From the DDD point of view, it lowers the complexity barrier that pushes some teams not to refactor their domain model as their understanding of the business evolves.

Implementation

What changed

What we need to change is the way we are loading our aggregates in the repository:

public T ById(Guid key)
{
    var events = eventStore.LoadEventHistory(key);

    var resVal = (T)constructor.Invoke(new Object[] { events });
    AddToContext(resVal);
    return resVal;
}

As you can see, we first load the entire event history of the aggregate, and then pass it to a new constructor on the aggregate.
Let’s look at the new event store method first:

public IEventInputStream LoadEventHistory(Guid aggregateId)
{
    using (var reader = persistenceManager.ExecuteQuery("SELECT [Version], [Data] FROM [Events] WHERE aggregate_id = @AggregateId ORDER BY [Version] ASC",
        new { AggregateId = aggregateId }))
    {
        var events = new List<IEvent>();
        var version = 0;

        while (reader.Read())
        {
            version = reader.GetInt32(0);
            var data = ((SqlDataReader)reader).GetSqlBinary(1).Value;
            events.AddRange(Deserialize(data));
        }

        return new SimpleEventInputStream(events, version, aggregateId);
    }
}

It basically concatenates the events of all the commits of the given aggregate.

The new constructor in the aggregate root is even simpler. It just calls the aggregate’s “Apply()” methods we already defined in part 4:

public AggregateRoot(IEventInputStream events) : this(events.AggregateId)
{
    Version = events.Version;

    foreach (var evt in events.Events)
    {
        ExecuteHandler(evt);
    }
}

private void ExecuteHandler<T>(T evt) where T : IEvent
{
    var handler = applyDelegates[this.GetType()][evt.GetType()] as Action<AggregateRoot, IEvent>;
    if (handler != null) handler(this, evt);
}

And that is it. You now have a fully working CQRS / Event Sourced application!

Running the sample

The sources for the entire series are available on GitHub: http://github.com/jletroui/TransitioningToEventSourcing

To run this sample, simply create a new “DDDPart6” database in SQLExpress before launching it.
Since the last 3 steps of this series are very small and compose well together, the single “part 6” sample actually covers parts 6 to 8.


Nov 7 11

Transitioning to Event Sourcing, part 7: build a view model

by Julien

A French version of this post is available here.

Overview

In this part, we concentrate on the query side of CQRS. We will now be able to get pre-calculated views built specifically for our pages.

Benefits

Simplicity

Your SQL tables for the views are now solely responsible for display. Denormalize as much as you want. You are not constrained by the transactional model anymore, so you can focus on a single responsibility. It means individual, simpler tables or documents. It means less entanglement (no more foreign keys needed!) and fewer dependencies. Your code and data model will become less complex, and thus more maintainable. It will probably speed up your team when implementing new features.

Performance

This step usually gets rid of many of the performance problems that an application with a single shared data model experiences. In a classical application, transactional behavior results in a very entangled, normalized database. It means displaying a page in your application requires big, inefficient queries with lots of joins and other performance-crippling SQL operations. With a view model specifically tailored for a given page, this problem disappears. You can now pre-calculate your reports in real time (would that bring any business value?) just by handling the events.
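
As an illustration of a view pre-calculated "just by handling the events", here is a small Python sketch of a denormalized projection. The `CustomerListView` class and the `CustomerCreated` / `OrderPlaced` event names are made up for the example and do not come from the sample application.

```python
class CustomerListView:
    """Hypothetical denormalized view: one ready-to-display row per customer."""

    def __init__(self):
        self.rows = {}

    def handle(self, event):
        # Each handler keeps the row up to date, so displaying the page
        # later needs no join and no aggregation.
        if event["type"] == "CustomerCreated":
            self.rows[event["customer_id"]] = {
                "name": event["name"],
                "order_count": 0,
            }
        elif event["type"] == "OrderPlaced":
            self.rows[event["customer_id"]]["order_count"] += 1
        # Event types this view does not care about are simply ignored.
```

The page then reads `rows` directly; the equivalent query on a normalized model would need a join between customers and orders plus a count.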

Implementation

What changed

Nothing! The cool thing about this step is that we already have a view model: the old entities persisted through our persisting event handlers. Our DTOs are already querying these tables. So no, this is not the most optimized view model, but it is still a perfectly valid starting point. For now, we are still using this model to load our aggregates, so we can’t modify it yet, but we can add views nonetheless. We can do that because we are persisting changes through the event handlers.

Running the sample

The sources for the entire series are available on GitHub: http://github.com/jletroui/TransitioningToEventSourcing

To run this sample, simply create a new “DDDPart6” database in SQLExpress before launching it.
Since the last 3 steps of this series are very small and compose well together, the single “part 6” sample actually covers parts 6 to 8.


Nov 6 11

Transitioning to Event Sourcing, part 6: store events

by Julien

A French version of this post is available here.

Overview

All the hard work is behind us now. The most crucial missing part for going event sourced is actually storing the events, which is quite trivial.

Benefits

Log

At this stage, we are not doing a lot with the events, but storing them gives your business great value: a complete, perfect log of what happened in the application. We’ll see in part 7 that you can derive all sorts of valuable information from that event stream.

It also provides an invaluable asset: perfect, automatic integration with any other application in your enterprise. By “perfect”, I mean that whatever the client application needs from yours, everything is in the event log. By listening to this log, the other application can extract whatever information it needs, and also react to whatever happens. By “automatic”, I mean your team does not need to do anything special to accommodate a new integrating application. The integrating application just subscribes to your event stream and extracts whatever it needs by itself. Your team does not have to add features to a web service or a REST API to give access to something the other team needs.

I will detail a bit more how you can achieve that from the event log in the last post of this series.

Implementation

What changed

We’ve added a crucial new component to the infrastructure: the event store.

The aggregates

In order to deal with concurrency, we have added a small property in aggregates:

public interface IAggregateRoot
{
    Guid Id { get; }
    IEnumerable<IEvent> UncommitedEvents { get; }
    int Version { get; } // New!
}

The event store

The event store is the component that will be responsible for storing events:

public interface IEventStore
{
    void PersistUncommitedEvents(IAggregateRoot aggregate);
}

The implementation is pretty straightforward. I chose to store the events in an “Events” table in SQL. Each row contains all the events emitted during one commit for a given aggregate. Concurrency control is achieved by adding a unique constraint on the aggregate id and the version. If you violate this constraint when committing, it means that another command raced you while you were processing this one. I borrowed this technique from Jonathan Oliver’s Event Store.

public void PersistUncommitedEvents(IAggregateRoot aggregate)
{
    try
    {
        persistenceManager.ExecuteNonQuery(
            "INSERT INTO [Events] (Id, aggregate_id, version, data) VALUES (@Id, @AggregateId, @Version, @Data)",
            new
            {
                Id = Guid.NewGuid(),
                Version = aggregate.Version + 1,
                AggregateId = aggregate.Id,
                Data = Serialize(aggregate.UncommitedEvents)
            });
    }
    catch (SqlException se)
    {
        // Thanks Jonathan Oliver's CQRS Event Store
        if (se.Number == UniqueKeyViolation) throw new ConcurrencyException();
        throw;
    }
}

So what do you do when you detect a concurrency issue? Simple: you just need to retry your command!

Command bus

The command bus is responsible for retrying a failed attempt when such a concurrency conflict occurs. It reloads the aggregate and starts over:

public void Send<T>(T cmd) where T : ICommand
{
    var handler = container.Build<IHandleCommand<T>>();
    var handled = false;

    while (handled == false)
    {
        try
        {
            handler.Handle(cmd);
            // Trigger persistence and concurrency checking.
            persistenceManager.Commit();
            handled = true;
        }
        catch (ConcurrencyException)
        {
            // Hit a concurrency exception, must retry the command.
        }
    }
}
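
The combination of a unique constraint and a retry loop can be tried end to end with SQLite. The following Python sketch is an illustration under stated assumptions, not a port of the sample: the table layout mirrors the “Events” table described above, but `create_store`, `current_version`, `persist` and `send` are hypothetical helpers, the data column holds plain text, and the retry is bounded by a `max_attempts` parameter whereas the command bus above retries indefinitely.

```python
import sqlite3
import uuid


class ConcurrencyException(Exception):
    """Raised when another commit already took the expected version."""


def create_store():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE events ("
        " id TEXT PRIMARY KEY, aggregate_id TEXT NOT NULL,"
        " version INTEGER NOT NULL, data TEXT NOT NULL,"
        " UNIQUE (aggregate_id, version))"
    )
    return conn


def current_version(conn, aggregate_id):
    row = conn.execute(
        "SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = ?",
        (aggregate_id,),
    ).fetchone()
    return row[0]


def persist(conn, aggregate_id, loaded_version, data):
    """Insert one commit; the unique constraint detects a raced command."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO events (id, aggregate_id, version, data)"
                " VALUES (?, ?, ?, ?)",
                (str(uuid.uuid4()), aggregate_id, loaded_version + 1, data),
            )
    except sqlite3.IntegrityError:
        raise ConcurrencyException()


def send(conn, aggregate_id, data, max_attempts=3):
    """Reload the current version and retry when a conflict is detected."""
    for _ in range(max_attempts):
        try:
            persist(conn, aggregate_id, current_version(conn, aggregate_id), data)
            return
        except ConcurrencyException:
            continue
    raise ConcurrencyException()
```

Bounding the number of attempts is a design choice of this sketch: it avoids spinning forever on a heavily contended aggregate, at the cost of having to surface the failure to the caller.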

Persistence manager

The final bit is calling the event store while committing in the persistence manager:

using (var tx = connection.BeginTransaction(IsolationLevel.ReadCommitted))
{
    context[TRANSACTION_KEY] = tx;

    try
    {
        foreach (var ar in aggregates)
        {
            lazyEventStore.PersistUncommitedEvents(ar);
        }
    }
    catch (ConcurrencyException)
    {
        tx.Rollback();
        context[TRANSACTION_KEY] = null;
        throw;
    }

    // At this stage, no concurrency issues, so pass on to the event handlers
    foreach (var ar in aggregates)
    {
        foreach (var evt in ar.UncommitedEvents)
        {
            eventBus.Publish(evt);
        }
    }

    context[TRANSACTION_KEY] = null;
    tx.Commit();

    context[AGGREGATE_KEY] = null;
}

And that is it. We don’t need to change anything in the domain or presentation layer, this was purely an infrastructure change.

Running the sample

The sources for the entire series are available on GitHub: http://github.com/jletroui/TransitioningToEventSourcing

To run this sample, simply create a new “DDDPart6” database in SQLExpress before launching it.
Since the last 3 steps of this series are very small and compose well together, the single “part 6” sample actually covers parts 6 to 8.


Oct 15 11

Quick update

by Julien

If you have been following me for more than a year, you may have noticed I stopped my series “Transitioning to Event Sourcing” a while ago. I intend to finish it really soon. The reason I have been silent for so long is that I have been learning and experimenting intensively with new platforms, technologies and languages. I hope to give back some of what I learned on this blog in the coming months. So keep reading!

Sep 27 11

Monitoring Ubuntu nodes with Nagios

by Julien

A French version of this post is available here.

Many people consider Nagios the number 1 tool for monitoring anything from a single server to a reasonably sized server farm. It has good documentation and, luckily for people like me who deal with Ubuntu / Debian servers, the Ubuntu packages make installation simple. The only problem is that the Ubuntu packages put things in different places than the official “from the source” installation guides do, and there are not many tutorials on installing a full monitoring system for total noobs like me. So here is my attempt at explaining it.

Overview

The idea is to have one central Nagios host that monitors a bunch of remote hosts. This is easily achieved through a plugin called Nagios Remote Plugin Executor (NRPE for short), whose name pretty much explains what it does. This plugin has 2 parts:

  1. A daemon that you install on each remote host and that accepts plugin commands from the Nagios host
  2. A plugin on the Nagios host that knows how to send commands to each of the remote hosts

Setting up the remote hosts

This part is quite easy. Here are the commands:

YOUR_NAGIOS_HOST_IP=<put your central Nagios host ip here>
sudo apt-get -y install nagios-nrpe-server
sudo sed -i.bak -r -e "s/allowed_hosts=127\.0\.0\.1/allowed_hosts=$YOUR_NAGIOS_HOST_IP/g" /etc/nagios/nrpe.cfg
sudo /etc/init.d/nagios-nrpe-server restart

The second line installs the daemon. The next one instructs the daemon to accept commands only from your central Nagios host (this is a minimal security precaution). Finally, you restart the daemon so that it takes the config change into account.
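
One detail worth checking in the sed line: the shell expands $YOUR_NAGIOS_HOST_IP only inside double quotes, so a single-quoted expression would write the literal variable name into the file. Here is a minimal sketch that tries the substitution on a scratch file rather than on the real /etc/nagios/nrpe.cfg. The temporary file and the 10.0.0.5 address are placeholders of mine, not values from a real setup.

```shell
#!/bin/sh
# Placeholder address: replace with your central Nagios host IP.
YOUR_NAGIOS_HOST_IP=10.0.0.5

# Scratch file standing in for /etc/nagios/nrpe.cfg.
cfg=$(mktemp)
printf '# scratch copy\nallowed_hosts=127.0.0.1\n' > "$cfg"

# Double quotes let the shell expand the variable before sed sees it.
# The dots are escaped so that they only match literal dots.
sed -i.bak -r -e "s/allowed_hosts=127\.0\.0\.1/allowed_hosts=$YOUR_NAGIOS_HOST_IP/g" "$cfg"

# Show the resulting line.
grep '^allowed_hosts=' "$cfg"
```

The untouched original is kept next to the scratch file with a .bak suffix, which is also what happens to nrpe.cfg in the real command.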

Note: when executing plugins remotely, it is more secure not to let the Nagios host pass arguments to the plugins, since this prevents script injection and the like. This is the default setting in the config file, and it means you have to hard-code the arguments of each command on the remote host. You can see the default ones and add some more at the end of the nrpe.cfg file. Here are the default commands:

command[check_users]=/usr/lib/nagios/plugins/check_users -w 5 -c 10
command[check_load]=/usr/lib/nagios/plugins/check_load -w 15,10,5 -c 30,25,20
command[check_hda1]=/usr/lib/nagios/plugins/check_disk -w 20% -c 10% -p /dev/hda1
command[check_zombie_procs]=/usr/lib/nagios/plugins/check_procs -w 5 -c 10 -s Z
command[check_total_procs]=/usr/lib/nagios/plugins/check_procs -w 150 -c 200
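
Adding a check of your own follows the same pattern as the defaults. As a sketch, the snippet below appends a hypothetical check_root_disk command to a scratch file standing in for nrpe.cfg. The command name is mine; check_disk and its -w, -c and -p options are the ones already used by the default check_hda1 entry. The append is skipped when the entry already exists, so the script can be re-run safely.

```shell
#!/bin/sh
# Scratch file standing in for /etc/nagios/nrpe.cfg.
cfg=$(mktemp)
printf 'command[check_users]=/usr/lib/nagios/plugins/check_users -w 5 -c 10\n' > "$cfg"

# Hypothetical command name, checking the root filesystem.
name=check_root_disk
line="command[$name]=/usr/lib/nagios/plugins/check_disk -w 20% -c 10% -p /"

add_command() {
    # -F: fixed string, so the brackets are not read as a regex class.
    if ! grep -qF "command[$name]=" "$cfg"; then
        printf '%s\n' "$line" >> "$cfg"
    fi
}

add_command
add_command   # the second call is a no-op

cat "$cfg"
```

On a real remote host you would point cfg at /etc/nagios/nrpe.cfg, run the script with sudo and restart the daemon afterwards.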

Setting up the central Nagios host

Here is the command:

sudo apt-get -y install nagios3 nagios-nrpe-plugin

And that is it! The package will install postfix (required for sending email notifications) and apache, and configure the ‘nagiosadmin’ user for you. All you have to do is configure the objects in the /etc/nagios3/conf.d directory according to the documentation. Here is an example configuration of a remote host as well as a service calling the default remote hda1 check:

define host{
        use                     generic-host            ; Name of host template to use
        host_name               myServer1
        alias                   My Server 1
        address                 <ip of myServer1 here>
        }
define service{
        use                             generic-service         ; Name of service template to use
        host_name                       myServer1
        service_description             Disk Space
        check_command                   check_nrpe_1arg!check_hda1
        }
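
Before relying on the service definition, it can be handy to run the remote check by hand from the Nagios host. Nagios plugins report their state through their exit code (0 OK, 1 WARNING, 2 CRITICAL, 3 UNKNOWN). The sketch below wraps any check command and prints the matching state. run_check and status_name are helper names I made up, and the check_nrpe path in the comment is where I would expect the Ubuntu package to install it, so verify it on your system.

```shell
#!/bin/sh
# Map a plugin exit code to the Nagios state name.
status_name() {
    case "$1" in
        0) echo OK ;;
        1) echo WARNING ;;
        2) echo CRITICAL ;;
        *) echo UNKNOWN ;;
    esac
}

# Run a check command, let its output through, then report the state.
run_check() {
    "$@"
    code=$?
    echo "state: $(status_name "$code")"
    return "$code"
}

# Stand-in for a real check so that the sketch runs anywhere.
# On the Nagios host you would rather try something like:
#   run_check /usr/lib/nagios/plugins/check_nrpe -H <ip of myServer1> -c check_hda1
result=$(run_check sh -c 'echo "DISK CRITICAL - fake output"; exit 2') || true
echo "$result"
```

If the manual check answers as expected, any remaining problem is more likely in the host or service definition than in the NRPE daemon.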

You can test the configuration with:

sudo /usr/sbin/nagios3 -v /etc/nagios3/nagios.cfg

If everything is fine, you can restart Nagios with:

sudo /etc/init.d/nagios3 restart

Finally, you can open a browser and go to http://YourNagiosHostIp/nagios3. The user is nagiosadmin and the password is the one you chose during installation.

This should give you a head start for working with Nagios.

Mar 7 11

Local announcement: Jackson Harper at devLAB Montreal this Wednesday

by Julien

Jackson has created a brand new web framework running on Mono, inspired by Sinatra, Node.js, and the like. You can read more about the buzz on his blog. Talk details here.

Mar 7 11

Visual Studio Talk Show on CQRS

by Julien

The Visual Studio Talk Show (which, despite its name, is a French-speaking podcast) invited me to talk about CQRS. You can listen to the show here.