Skip to content

Tester un système Apache Hive

by Julien on août 23rd, 2014

An english version of this post is available here.

Vue d’ensemble

Tester un système d’intelligence d’affaire basé sur Hive demande quelques efforts. Certains projets peuvent se contenter de Hive en mode embarqué. Mais parfois, cela ne suffit pas. Par exemple, lorsque le système comprends des outils externes à la JVM, comme une suite d’intelligence d’affaire.

Ce billet explique la solution que nous avons fini par adopter chez Triton Digital.

Client Hadoop

Pour tester le lancement des tâches mapred, la première chose à mettre en place est un client Hadoop. En effet, Hive appelle directement le script “hadoop” de la distribution Hadoop.

Un peu de travail dans SBT permet de se passer d’une distribution complète. Commençons par les jars nécessaires:

import java.io.PrintWriter

resolvers += "Cloudera repo" at "https://repository.cloudera.com/artifactory/cloudera-repos"

ivyConfigurations += config("hadoop")

libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-client"   % "2.3.0-mr1-cdh5.0.0"  % "hadoop",
  "org.apache.hive"   % "hive-exec"       % "0.12.0-cdh5.0.0"     % "hadoop"
)

val dumpHadoopClasspath = TaskKey[Unit]("dump-hadoop-classpath", "Dumps hadoop classpath in a file")

dumpHadoopClasspath := {
  val printer = new PrintWriter("hadoop.classpath")
  printer.print(update.value.select(configurationFilter("hadoop")).map(_.getCanonicalPath).mkString(":"))
  printer.close()
}

test in Test <<= (test in Test) dependsOn dumpHadoopClasspath

Comme nous n’avons pas besoin de ces jars dans les classes du projet, nous plaçons les dépendances dans une configuration ivy spéciale nommée “hadoop”.

Nous nous assurons également de construire le classpath résultant dans un fichier pour qu’il puisse être utilisé dans notre version personnalisée du script “hadoop”.

Script “hadoop”

Nous partons du script régulier que l’on peut trouver dans n’importe quelle distribution tarball d’Hadoop. Nous modifions simplement le début pour y utiliser notre classpath construit plus haut:

# This one is commented since we manually 'take over' the config below
#. "$bin"/hadoop-config.sh

export HADOOP_HOME=$(pwd)
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
HADOOP_USER_CLASSPATH_FIRST=true
HADOOP_CLASSPATH=$(cat hadoop.classpath)

A partir de là, nous devrions avoir une version du client Hadoop qui fonctionne:

> sbt update
[info] Loading global plugins from /Users/jletrouit/.sbt/0.13/plugins
[info] Loading project definition from /Users/jletrouit/Documents/perso/hivetest/project
[info] Set current project to hivetest (in build file:/Users/jletrouit/Documents/perso/hivetest/)
[info] Updating {file:/Users/jletrouit/Documents/perso/hivetest/}hivetest...
[info] Resolving org.apache.velocity#velocity;1.7 ...
[info] Done updating.
[success] Total time: 9 s, completed 1-Jun-2014 3:30:09 PM

> ./hadoop version
Hadoop 2.3.0-cdh5.0.0
Subversion git://github.sf.cloudera.com/CDH/cdh.git -r 8e266e052e423af592871e2dfe09d54c03f6a0e8
Compiled by jenkins on 2014-03-28T04:29Z
Compiled with protoc 2.5.0
From source with checksum fae92214f92a3313887764456097e0

Serveur Hive de test

La prochaine étape est un peu plus lourde. Pour démarrer une instance Hive de test, nous avons besoin d’un certain nombre de dépendances:

libraryDependencies ++= Seq(
  "org.apache.hive"   % "hive-jdbc"     % "0.12.0-cdh5.0.0" excludeAll(
    ExclusionRule(organization = "junit"),
    ExclusionRule(organization = "org.apache.avro")
  ),
  "org.apache.hadoop" % "hadoop-common"   % "2.3.0-cdh5.0.0",
  "org.apache.hadoop" % "hadoop-hdfs"     % "2.3.0-cdh5.0.0",
  "org.apache.hadoop" % "hadoop-common"   % "2.3.0-cdh5.0.0"      % "test" classifier("tests"),
  "org.apache.hadoop" % "hadoop-hdfs"     % "2.3.0-cdh5.0.0"      % "test" classifier("tests"),
  "org.apache.hadoop" % "hadoop-test"     % "2.3.0-mr1-cdh5.0.0"  % "test" exclude("net.java.dev.jets3t", "jets3t"),
  "org.apache.hadoop" % "hadoop-auth"     % "2.3.0-cdh5.0.0"      % "test",
  "org.specs2"        %% "specs2"         % "2.3.12"              % "test"
)

Ces dépendances nous permettent de démarrer un mini HDFS et un mini cluster MapReduce:

  val currentDir = new File(".").getCanonicalPath()
  val conf = new Configuration()

  // This is making sure we are not picking up locally installed hadoop libraries and stay isolated
  System.setProperty("java.library.path","")
  // We could use a temporary directory, but those logs can be useful for debugging a test failing
  System.setProperty("hadoop.log.dir", "logs/hadoop") // MAPREDUCE-2785

  conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, createTempDirectory("dfs_base"))
  conf.set("hadoop.home.dir", currentDir)
  conf.set("dfs.permissions", "false")
  conf.set("hadoop.security.authorization", "false")

  val miniDFS = new MiniDFSCluster.Builder(conf).build()

  val miniMR = new MiniMRCluster(
    1,      // numTaskTrackers
    miniDFS.getFileSystem().getUri().toString(),
    1,      // numTaskTrackerDirectories
    null,   // racks
    null,   // hosts
    new JobConf(conf))

  // Save those for later
  val jt = miniMR.createJobConf(new JobConf(conf)).get("mapred.job.tracker")
  val warehouseDir = "file" + File.pathSeparator + createTempDirectory("hive_warehouse")

Et enfin un serveur Hive:

  val hiveConf = new HiveConf(getClass())

  hiveConf.set(HiveConf.ConfVars.HADOOPJT.varname, jt)
  hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouseDir)
  // Hive still need to use a hadoop command line tool. This one bundled with the project is pointing to the
  // minimal hadoop client jars we are downloading through SBT in the "hadoop" ivy config.
  hiveConf.set(HiveConf.ConfVars.HADOOPBIN.varname, s"$currentDir/hadoop")

  val server = new HiveServer2()
  server.init(hiveConf)
  server.start()

Cependant, nous avons besoin de certains ajustements supplémentaires.

Transferrer la configuration Hive à la session JDBC

La configuration Hive n’est pas transferrée à la session JDBC du serveur. A la place, une nouvelle instance est créée. Dans une distribution normale, ce n’est pas un problème, la configuration fraiche étant chargée du fichier hive-site.xml. Dans notre cas, nous créons notre configuration de manière programmatique, en particulier afin d’assigner les ports et les adresses de notre namenode et tasktracker. Un petit hack permet de réinjecter cette configuration dans la session JDBC:

val hiveConf = new HiveConf(getClass())
configureHive(hiveConf)
hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_SESSION_HOOK.varname, classOf[TestHiveSessionHook].getCanonicalName())

val server = new HiveServer2()
server.init(hiveConf)
server.start()

def configureHive(conf: HiveConf) {
  conf.set(HiveConf.ConfVars.HADOOPJT.varname, jt)
  conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouseDir)
  conf.set(HiveConf.ConfVars.HADOOPBIN.varname, s"$currentDir/hadoop")
}

class TestHiveSessionHook extends HiveSessionHook {
  def run(ctx: HiveSessionHookContext) {
    TestHiveServer2.configureHive(ctx.getSessionConf)
  }
}

Faire taire les timeouts JDBC

D’ordinaire, les différents composants résident dans des process différents. Mais dans notre cas ou tout réside dans la même JVM, les timeouts des composants satellites de Hive se marchent sur les pieds, puisque le timeout dans le DriverManager de JDBC est un champ statique partagé. Nous nous assurons qu’il reste à une valeur convenable pour que nos requêtes de test aient le temps de se compléter:

conf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT.varname, "1200")
conf.set(HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT.varname, "1200")

Finalement, le test

Nous pouvons maintenant écrire un test créant une véritable connection JDBC:

using(TestHiveServer2.createConnection) { con =>

  val smt = con.createStatement()

  smt.executeUpdate("create table ngram (ngram string, year int, occurences int, documents int) row format delimited fields terminated by '\\t' stored as textfile")
  smt.executeUpdate(s"load data local inpath '$dir/googlebooks-eng-all-1gram-20120701-z-sample.tsv' overwrite into table ngram")

  using(smt.executeQuery("select ngram, SUM(documents) as total_documents from ngram group by ngram")) { rs =>
    List(
      {rs.next(); rs.getString(1)} -> rs.getInt(2),
      {rs.next(); rs.getString(1)} -> rs.getInt(2)
    ) mustEqual List(
      "zenith" -> 426197,
      "zooplankton" -> 24939
    )
  }
}

Contrairement au mode embarqué, vous pouvez faire une pause au milieu de votre test (par exemple avec StdIn.readLine()), et inspecter votre base de donnée Hive avec le client beeline, en vous connectant à “jdbc:hive2://localhost”.

Code source

Vous trouverez un exemple complet de cette technique sur GitHub: https://github.com/jletroui/hivetest

Bon tests!

One Comment
  1. Julien, thanks for putting this together, good work!

    However, the challenge for me is that my shop isn’t quite into Scala just yet, so I needed to go with a Java solution.

    http://www.lopakalogic.com/articles/hadoop-articles/hive-testing/

    Its an alternative to the Scala approach

Leave a Reply

Note: XHTML is allowed. Your email address will never be published.

Subscribe to this comment feed via RSS