Use Java/Scala to Connect to Impala with High Availability
-
Add the following dependencies to your `build.sbt` file:
// Dependencies used by the connection helpers: Hive JDBC driver, SLF4J logging, Hadoop client.
libraryDependencies ++= Seq(
  "org.apache.hive"   % "hive-jdbc"     % "1.2.1",
  "org.slf4j"         % "slf4j-api"     % "1.7.5",
  "org.slf4j"         % "slf4j-simple"  % "1.7.5",
  "org.apache.hadoop" % "hadoop-client" % "2.6.0"
)
-
Declare the `get_all_active_datanode`
function to return the list of all active DataNodes by running the following lines of code:
/**
 * Returns the DataNodes that currently accept a JDBC connection.
 *
 * Each DataNode is probed by opening a connection to
 * `jdbc:hive2://<dn>:<port>/<databaseName>`, where the port is read from the
 * `PORT_IMPALA` environment variable (21050 when it is not set). The probe connection
 * is closed right away so that checking availability does not leave open sessions behind.
 *
 * @param list_datanodes The host names of the DataNodes to probe
 *                       (e.g. `"dn1;dn2;dn3".split(";")`).
 * @param user The user that can be used to connect to the database.
 * @param pwd The user's password.
 * @param databaseName The name of the database the user can access.
 * @return The DataNodes that accepted a connection, in the order they were given.
 * @throws ClassNotFoundException if the Hive JDBC driver is not on the classpath.
 * @throws Exception if no DataNode accepts a connection (including when `list_datanodes` is empty).
 */
def get_all_active_datanode(list_datanodes: Seq[String], user: String, pwd: String, databaseName: String): Seq[String] = {
  import scala.util.{Failure, Success, Try}

  val JDBC_DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver"
  Class.forName(JDBC_DRIVER_NAME)
  // Read once: the port is the same for every DataNode.
  val port = sys.env.getOrElse("PORT_IMPALA", "21050")

  val activeDataNodes = list_datanodes.filter { dn =>
    val connectionURL = s"jdbc:hive2://$dn:$port/$databaseName"
    // Try only captures non-fatal errors, so fatal ones (e.g. OutOfMemoryError) still propagate.
    Try(DriverManager.getConnection(connectionURL, user, pwd)) match {
      case Success(con) =>
        // The connection was only needed to check availability: release it immediately.
        // A failure while closing does not make the DataNode inactive, so it is ignored.
        Try(con.close())
        true
      case Failure(_) =>
        false
    }
  }

  if (activeDataNodes.isEmpty) {
    // No DataNode answered: there is no active DataNode to use.
    throw new Exception("No DataNode available")
  }
  activeDataNodes.toList
}
-
Declare the `connection_dn`
function to establish a connection to a specific DataNode by running the following lines of code:
/**
 * Opens a JDBC connection to one given DataNode.
 *
 * The URL is `jdbc:hive2://<datanode_uri>:<port>/<databaseName>`, where the port is read from
 * the `PORT_IMPALA` environment variable (21050 when it is not set).
 *
 * @param datanode_uri The host name of the DataNode to connect to.
 * @param user The user that can be used to connect to the database.
 * @param pwd The user's password.
 * @param databaseName The database to open (defaults to `default`).
 * @return `Success` with the open connection (to be closed by the caller), or `Failure`
 *         wrapping the error raised by `DriverManager.getConnection`.
 */
def connection_dn(datanode_uri: String, user: String, pwd: String, databaseName: String = "default"): Try[Connection] = {
  // The driver class is loaded outside the Try: a missing driver surfaces as a thrown
  // exception rather than as a Failure.
  Class.forName("org.apache.hive.jdbc.HiveDriver")
  val impalaPort = sys.env.getOrElse("PORT_IMPALA", "21050")
  val url = s"jdbc:hive2://$datanode_uri:$impalaPort/$databaseName"
  Try(DriverManager.getConnection(url, user, pwd))
}
-
Declare the `random_node_connect`
function to establish a connection to an available random DataNode by running the following lines of code:
/**
 * Opens a connection to one of the given DataNodes, chosen at random.
 *
 * The DataNodes are tried in a random order, each one at most once, and the first
 * connection that succeeds is returned. The caller is responsible for closing it.
 *
 * @param list_datanodes The host names of the candidate DataNodes.
 * @param user The user that can be used to connect to the database.
 * @param pwd The user's password.
 * @param databaseName The name of the database the user can access.
 * @return An open JDBC connection to the first DataNode that accepted it.
 * @throws Exception if none of the DataNodes accepts a connection (including an empty list).
 */
def random_node_connect(list_datanodes: Seq[String], user: String, pwd: String, databaseName: String): Connection = {
  // Shuffling once guarantees that every DataNode is tried exactly once, in a random order.
  // Drawing a random index into the original list instead of the remaining candidates
  // would skip some nodes and could loop forever.
  // The iterator is lazy: DataNodes after the first successful one are never contacted.
  val attempts = Random.shuffle(list_datanodes).iterator.map { dn =>
    connection_dn(dn, user, pwd, databaseName)
  }
  attempts
    .collectFirst { case Success(con) => con }
    .getOrElse(throw new Exception("No DataNode available"))
}
You can now connect either to a randomly chosen available DataNode or to a specific one.
Example 1. Example of use
val dns = sys.env.getOrElse("LIST_DATANODES", "<url_of_datanode1>;<url_of_datanode2>")
// Ignore blank entries, e.g. from a trailing ";" or spaces around the separators.
val dn_list = dns.split(";").map(_.trim).filter(_.nonEmpty)
// sys.env.get returns an Option[String], but the helper functions expect plain Strings:
// fail fast with a clear message when a required variable is missing.
val user = sys.env.getOrElse("IMPALA_USER", throw new IllegalArgumentException("IMPALA_USER is not set"))
val pwd = sys.env.getOrElse("IMPALA_PWD", throw new IllegalArgumentException("IMPALA_PWD is not set"))
val databaseName = sys.env.getOrElse("IMPALA_DB_TEST", throw new IllegalArgumentException("IMPALA_DB_TEST is not set"))

// To connect to a random active DataNode.
val con = random_node_connect(dn_list, user, pwd, databaseName)
try {
  // To initialize the statement.
  val stmt = con.createStatement()
  try {
    // To execute the metadata invalidation query.
    val sqlStatementInvalidate = "INVALIDATE METADATA"
    stmt.execute(sqlStatementInvalidate)
  } finally {
    stmt.close()
  }
} finally {
  // Release the connection once the work is done.
  con.close()
}