Use Java/Scala on Impala With High Availability

You can use Java/Scala on Impala with the high availability option. The high availability option connects your job to a random active DataNode, so the failure of a single node does not cause your job to fail. It also distributes the workload across all DataNodes.

  1. Add the following dependencies to your `build.sbt` file:

    libraryDependencies ++= Seq(
      "org.apache.hive" % "hive-jdbc" % "1.2.1",
      "org.slf4j" % "slf4j-api" % "1.7.5",
      "org.slf4j" % "slf4j-simple" % "1.7.5",
      "org.apache.hadoop" % "hadoop-client" % "2.6.0"
    )
  2. Declare the get_all_active_datanode function to return the list of all active DataNodes by running the following lines of code:

    import java.sql.DriverManager
    import scala.collection.mutable.ListBuffer
    
    /**
     * @param list_datanodes: The list of DataNodes (eg: Seq("dn1", "dn2", "dn3")).
     * @param user: The user that can be used to connect to the database.
     * @param pwd: The user's password.
     * @param databaseName: The name of the database the user can access.
     */
    def get_all_active_datanode(list_datanodes: Seq[String], user: String, pwd: String, databaseName: String): Seq[String] = {
      val JDBC_DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver"
      val list_active_datanodes = new ListBuffer[String]()
      Class.forName(JDBC_DRIVER_NAME)
      for (dn <- list_datanodes) {
        val connectionURL = "jdbc:hive2://" + dn + ":" + sys.env.getOrElse("PORT_IMPALA", "21050") + "/" + databaseName
        try {
          // To check if the connection works.
          val con = DriverManager.getConnection(connectionURL, user, pwd)
          con.close()
          list_active_datanodes += dn
        }
        catch {
          // This DataNode is unreachable: skip it and test the next one.
          case e: Throwable =>
        }
      }
      if (list_active_datanodes.isEmpty) {
        // If all DataNodes have been tested without success, there is no active DataNode.
        throw new Exception("No DataNode available")
      }
      list_active_datanodes.toList
    }
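
    The function above can be used to probe the cluster before distributing work. Below is a minimal sketch, not a definitive example: it assumes a reachable Impala cluster and reuses the `LIST_DATANODES`, `IMPALA_USER`, `IMPALA_PWD`, and `IMPALA_DB_TEST` environment variables that the final example of this article relies on.

    ```scala
    // Sketch only: requires a live Impala cluster and the environment
    // variables used elsewhere in this article.
    val dns = sys.env.getOrElse("LIST_DATANODES", "<url_of_datanode1>;<url_of_datanode2>")
    val active = get_all_active_datanode(
      dns.split(";").toSeq,
      sys.env("IMPALA_USER"),
      sys.env("IMPALA_PWD"),
      sys.env("IMPALA_DB_TEST")
    )
    println(s"Active DataNodes: ${active.mkString(", ")}")
    ```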
  3. Declare the connection_dn function to establish a connection to a specific DataNode by running the following lines of code:

    import java.sql.{Connection, DriverManager}
    import scala.util.Try
    
    def connection_dn(datanode_uri: String, user: String, pwd: String, databaseName: String = "default"): Try[Connection] = {
      val JDBC_DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver"
      Class.forName(JDBC_DRIVER_NAME)
      val connectionURL = "jdbc:hive2://" + datanode_uri + ":" + sys.env.getOrElse("PORT_IMPALA", "21050") + "/" + databaseName
      Try(DriverManager.getConnection(connectionURL, user, pwd))
    }
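
    Because `connection_dn` returns a `Try[Connection]`, the caller decides how to react to an unreachable node. A minimal sketch, assuming a DataNode URL in a hypothetical `DATANODE_URI` environment variable:

    ```scala
    import scala.util.{Failure, Success}
    
    // DATANODE_URI is a hypothetical variable used for illustration.
    connection_dn(sys.env.getOrElse("DATANODE_URI", "<url_of_datanode1>"),
                  sys.env("IMPALA_USER"), sys.env("IMPALA_PWD")) match {
      case Success(con) =>
        println("Connection established")
        con.close()
      case Failure(e) =>
        println(s"DataNode unreachable: ${e.getMessage}")
    }
    ```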
  4. Declare the random_node_connect function to establish a connection to an available random DataNode by running the following lines of code:

    import scala.util.{Failure, Random, Success}
    
    def random_node_connect(list_datanodes: Seq[String], user: String, pwd: String, databaseName: String): Connection = {
      val data_node_list = list_datanodes.toBuffer
      while (data_node_list.nonEmpty) {
        // Pick a random DataNode among those not tried yet.
        val r = Random.nextInt(data_node_list.size)
        val dn = data_node_list(r)
        connection_dn(dn, user, pwd, databaseName) match {
          case Success(con) => return con
          case Failure(e) => data_node_list -= dn
        }
      }
      throw new Exception("No DataNode available")
    }

    You can now connect to an available random DataNode or to a specific one.

    Example 1. Example of use
    val dns = sys.env.getOrElse("LIST_DATANODES", "<url_of_datanode1>;<url_of_datanode2>")
    val dn_list = dns.split(";").toSeq
    // sys.env(...) returns the value directly and fails fast if the variable is not set.
    val user = sys.env("IMPALA_USER")
    val pwd = sys.env("IMPALA_PWD")
    val databaseName = sys.env("IMPALA_DB_TEST")
    
    // To connect to a random active DataNode.
    val con = random_node_connect(dn_list, user, pwd, databaseName)
    
    // To initialize the statement.
    val stmt = con.createStatement()
    
    // To execute the metadata invalidation query.
    val sqlStatementInvalidate = "INVALIDATE METADATA"
    stmt.execute(sqlStatementInvalidate)
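
    After the metadata invalidation, the same connection can run regular queries. The sketch below assumes a hypothetical table named `my_table` in the target database; remember to close the statement and the connection when you are done.

    ```scala
    // my_table is a hypothetical table name used for illustration.
    val rs = stmt.executeQuery("SELECT COUNT(*) FROM my_table")
    while (rs.next()) {
      println("Row count: " + rs.getLong(1))
    }
    rs.close()
    stmt.close()
    con.close()
    ```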