
Testing Apache Hive systems

by Julien on August 23rd, 2014

A French version of this post is available here.

Context

Testing a BI system based on Hive requires a bit of effort. Some projects can get away with Hive's embedded mode, but sometimes that is not enough, in particular when you want to integrate external BI tools into your tests.
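
For the simple cases, embedded mode is just a JDBC URL away; here is a minimal sketch, assuming the hive-jdbc driver is on the test classpath:

import java.sql.DriverManager

// A "jdbc:hive2://" URL with no host runs HiveServer2 embedded in the current JVM,
// with a local metastore and warehouse: often enough for simple test suites.
Class.forName("org.apache.hive.jdbc.HiveDriver")
val con = DriverManager.getConnection("jdbc:hive2://", "", "")
try {
  val rs = con.createStatement().executeQuery("show databases")
  while (rs.next()) println(rs.getString(1))
} finally {
  con.close()
}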

This post explains the solution we ended up using at Triton Digital for such a project.

Hadoop client

To test the launching of MapReduce jobs, we need a Hadoop client: Hive invokes the "hadoop" script directly to launch them.

A few lines of SBT allow us to bypass a full Hadoop distribution. Let's begin with the necessary dependencies:

import java.io.PrintWriter

resolvers += "Cloudera repo" at "https://repository.cloudera.com/artifactory/cloudera-repos"

ivyConfigurations += config("hadoop")

libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-client"   % "2.3.0-mr1-cdh5.0.0"  % "hadoop",
  "org.apache.hive"   % "hive-exec"       % "0.12.0-cdh5.0.0"     % "hadoop"
)

val dumpHadoopClasspath = TaskKey[Unit]("dump-hadoop-classpath", "Dumps hadoop classpath in a file")

dumpHadoopClasspath := {
  val printer = new PrintWriter("hadoop.classpath")
  printer.print(update.value.select(configurationFilter("hadoop")).map(_.getCanonicalPath).mkString(":"))
  printer.close()
}

test in Test <<= (test in Test) dependsOn dumpHadoopClasspath

Since we don’t need those jars on our project’s compile classpath, we place them in an isolated Ivy configuration labeled “hadoop”.

We also dump the resulting classpath to a file so we can use it in a custom hadoop startup script.

“hadoop” script

We start from the regular hadoop script found in the bin directory of any Hadoop distribution, and simply modify its beginning to use the classpath we generated above:

# This one is commented since we manually 'take over' the config below
#. "$bin"/hadoop-config.sh

export HADOOP_HOME=$(pwd)
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
HADOOP_USER_CLASSPATH_FIRST=true
HADOOP_CLASSPATH=$(cat hadoop.classpath)

At this point, we should have a working hadoop client:

> sbt update
[info] Loading global plugins from /Users/jletrouit/.sbt/0.13/plugins
[info] Loading project definition from /Users/jletrouit/Documents/perso/hivetest/project
[info] Set current project to hivetest (in build file:/Users/jletrouit/Documents/perso/hivetest/)
[info] Updating {file:/Users/jletrouit/Documents/perso/hivetest/}hivetest...
[info] Resolving org.apache.velocity#velocity;1.7 ...
[info] Done updating.
[success] Total time: 9 s, completed 1-Jun-2014 3:30:09 PM

> ./hadoop version
Hadoop 2.3.0-cdh5.0.0
Subversion git://github.sf.cloudera.com/CDH/cdh.git -r 8e266e052e423af592871e2dfe09d54c03f6a0e8
Compiled by jenkins on 2014-03-28T04:29Z
Compiled with protoc 2.5.0
From source with checksum fae92214f92a3313887764456097e0

Hive test server

Next, we need to start a Hive instance in our tests. For that, we first have to start a Hadoop cluster. Here are the dependencies you will need:

libraryDependencies ++= Seq(
  "org.apache.hive"   % "hive-jdbc"     % "0.12.0-cdh5.0.0" excludeAll(
    ExclusionRule(organization = "junit"),
    ExclusionRule(organization = "org.apache.avro")
  ),
  "org.apache.hadoop" % "hadoop-common"   % "2.3.0-cdh5.0.0",
  "org.apache.hadoop" % "hadoop-hdfs"     % "2.3.0-cdh5.0.0",
  "org.apache.hadoop" % "hadoop-common"   % "2.3.0-cdh5.0.0"      % "test" classifier("tests"),
  "org.apache.hadoop" % "hadoop-hdfs"     % "2.3.0-cdh5.0.0"      % "test" classifier("tests"),
  "org.apache.hadoop" % "hadoop-test"     % "2.3.0-mr1-cdh5.0.0"  % "test" exclude("net.java.dev.jets3t", "jets3t"),
  "org.apache.hadoop" % "hadoop-auth"     % "2.3.0-cdh5.0.0"      % "test",
  "org.specs2"        %% "specs2"         % "2.3.12"              % "test"
)

The Hadoop test dependencies allow us to start a mini cluster:

  val currentDir = new File(".").getCanonicalPath()
  val conf = new Configuration()

  // Make sure we are not picking up locally installed Hadoop native libraries and stay isolated
  System.setProperty("java.library.path","")
  // We could use a temporary directory, but these logs can be useful when debugging a failing test
  System.setProperty("hadoop.log.dir", "logs/hadoop") // MAPREDUCE-2785

  conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, createTempDirectory("dfs_base"))
  conf.set("hadoop.home.dir", currentDir)
  conf.set("dfs.permissions", "false")
  conf.set("hadoop.security.authorization", "false")

  val miniDFS = new MiniDFSCluster.Builder(conf).build()

  val miniMR = new MiniMRCluster(
    1,      // numTaskTrackers
    miniDFS.getFileSystem().getUri().toString(),
    1,      // numTaskTrackerDirectories
    null,   // racks
    null,   // hosts
    new JobConf(conf))

  // Save those for later
  val jt = miniMR.createJobConf(new JobConf(conf)).get("mapred.job.tracker")
  val warehouseDir = "file://" + createTempDirectory("hive_warehouse")
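
The createTempDirectory helper is not shown in this post; here is a minimal sketch of a possible implementation (only the name comes from the snippets above):

  import java.nio.file.Files

  // Create a unique directory under java.io.tmpdir and return its absolute path.
  def createTempDirectory(prefix: String): String =
    Files.createTempDirectory(prefix).toFile.getCanonicalPath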

We can then start a Hive server on top of the previous cluster:

  val hiveConf = new HiveConf(getClass())

  hiveConf.set(HiveConf.ConfVars.HADOOPJT.varname, jt)
  hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouseDir)
  // Hive still needs to shell out to the hadoop command line tool. The one bundled with the project
  // points to the minimal Hadoop client jars we download through SBT in the "hadoop" ivy config.
  hiveConf.set(HiveConf.ConfVars.HADOOPBIN.varname, s"$currentDir/hadoop")

  val server = new HiveServer2()
  server.init(hiveConf)
  server.start()

However, we will need a couple of adjustments to make everything work together.

Transfer the custom configuration to Hive sessions

The configuration of the HiveServer2 instance is not passed to the internal Hive session that is created when connecting through JDBC. Instead, the server implementation creates a brand new instance of the configuration. This works fine on a standard deployment, where the new instance picks up the hive-site.xml configuration file, but it obviously does not pick up our programmatically customized configuration. A small hack, a session hook, allows us to inject this config into the JDBC session:

val hiveConf = new HiveConf(getClass())
configureHive(hiveConf)
hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_SESSION_HOOK.varname, classOf[TestHiveSessionHook].getCanonicalName())

val server = new HiveServer2()
server.init(hiveConf)
server.start()

def configureHive(conf: HiveConf) {
  conf.set(HiveConf.ConfVars.HADOOPJT.varname, jt)
  conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouseDir)
  conf.set(HiveConf.ConfVars.HADOOPBIN.varname, s"$currentDir/hadoop")
}

class TestHiveSessionHook extends HiveSessionHook {
  def run(ctx: HiveSessionHookContext) {
    TestHiveServer2.configureHive(ctx.getSessionConf)
  }
}

Raise the timeouts

Usually, the various Hive components reside in separate JVMs, so even though the JDBC DriverManager is static, the JDBC timeouts of the satellite components don't step on each other. In our tests everything runs in a single JVM, so we need to make sure all of them are reasonably high to let our requests complete:

conf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT.varname, "1200")
conf.set(HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT.varname, "1200")

We can now create a genuine JDBC connection:

using(TestHiveServer2.createConnection) { con =>

  val smt = con.createStatement()

  smt.executeUpdate("create table ngram (ngram string, year int, occurences int, documents int) row format delimited fields terminated by '\\t' stored as textfile")
  smt.executeUpdate(s"load data local inpath '$dir/googlebooks-eng-all-1gram-20120701-z-sample.tsv' overwrite into table ngram")

  using(smt.executeQuery("select ngram, SUM(documents) as total_documents from ngram group by ngram")) { rs =>
    List(
      {rs.next(); rs.getString(1)} -> rs.getInt(2),
      {rs.next(); rs.getString(1)} -> rs.getInt(2)
    ) mustEqual List(
      "zenith" -> 426197,
      "zooplankton" -> 24939
    )
  }
}
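
The using and createConnection helpers above come from the sample project linked at the end of this post; here is a minimal sketch of what they could look like (the signatures are assumed):

import java.sql.{Connection, DriverManager}

// Loan pattern helper: hand the resource to the block and always close it afterwards.
def using[A <: AutoCloseable, B](resource: A)(block: A => B): B =
  try block(resource) finally resource.close()

// Could live on the TestHiveServer2 object next to configureHive.
// The hive-jdbc dependency declared earlier provides the driver.
def createConnection: Connection = {
  Class.forName("org.apache.hive.jdbc.HiveDriver")
  DriverManager.getConnection("jdbc:hive2://localhost", "", "")
}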

And since it is a genuine, full-fledged Hive instance, we can even pause the tests with something like:

StdIn.readLine()

and inspect the Hive database with the beeline command line tool by connecting to “jdbc:hive2://localhost”.

You can find a fully working sample project on GitHub: https://github.com/jletroui/hivetest

Happy testing!

One Comment
  1. Julien, thanks for putting this together, good work!

    However, the challenge for me is that my shop isn’t quite into Scala just yet, so I needed to go with a Java solution.

    http://www.lopakalogic.com/articles/hadoop-articles/hive-testing/

    It's an alternative to the Scala approach.
