Use Python on HDFS or Impala With High Availability
-
Install the `hdfs` package, then import the client class:
from hdfs import InsecureClient
-
Run the following lines of code to get the active NameNode:
def get_url_active_namenode(user, list_name_nodes, port_hdfs="50070"):
    """Return the URL of the active NameNode among *list_name_nodes*.

    In an HA setup only one NameNode answers requests; probe each
    candidate with a cheap ``status('/')`` call and return the first
    one that responds.

    Parameters
    ----------
    user : str
        User name used to create the HDFS client.
    list_name_nodes : list of str
        Candidate NameNode URLs (typically 2 in an HA cluster).
    port_hdfs : str or int, optional
        WebHDFS port appended to each URL (default ``"50070"``).

    Returns
    -------
    str
        The URL (without port) of the active NameNode.

    Raises
    ------
    NameError
        If no candidate NameNode responds (HDFS unreachable).
    """
    for url in list_name_nodes:
        # Create an HDFS client against this candidate NameNode.
        hadoop = InsecureClient(url + ':' + str(port_hdfs), user=user)
        try:
            # Cheap request that only succeeds on the active NameNode.
            hadoop.status('/')
            return url
        except Exception:
            # Standby or unreachable NameNode: try the next candidate.
            # (Original code compared a counter against the undefined
            # global HDFS_URLS instead of list_name_nodes — fixed by
            # raising only after the loop exhausts every candidate.)
            continue
    # Every candidate failed: there is no active NameNode.
    raise NameError("No NameNode available")
Example of the function returning the active NameNode:
list_name_nodes = ["<url_of_namenode_1>", "<url_of_namenode_2>"]
user = "user_test"
url = get_url_active_namenode(user, list_name_nodes)
-
Run the following lines of code to get an HDFS client:
def return_client_hdfs(user, list_name_nodes, port_hdfs="50070"):
    """Return an HDFS client connected to the active NameNode.

    Parameters
    ----------
    user : str
        User name used to create the HDFS client.
    list_name_nodes : list of str
        Candidate NameNode URLs.
    port_hdfs : str or int, optional
        WebHDFS port (default ``"50070"``).

    Returns
    -------
    InsecureClient
        A client bound to the active NameNode.

    Raises
    ------
    NameError
        Propagated from ``get_url_active_namenode`` when no NameNode
        is active.
    """
    url = get_url_active_namenode(user, list_name_nodes, port_hdfs)
    # get_url_active_namenode returns the bare host URL; append the
    # WebHDFS port so the client targets the exact endpoint that was
    # probed (the original omitted the port here).
    client_hdfs = InsecureClient(url + ':' + str(port_hdfs), user=user)
    return client_hdfs
Example of the function returning an HDFS client:
list_name_nodes = ["<url_of_namenode_1>", "<url_of_namenode_2>"]
user = "user_test"
client_hdfs = return_client_hdfs(user=user, list_name_nodes=list_name_nodes)
-
Run the following lines of code to get an active random DataNode for HDFS or Impala:
# Expected environment variables, e.g.:
# os.environ['PORT_HDFS'] = "50070"
# os.environ['PORT_IMPALA'] = "21050"
def get_client_ibis_impala(user, user_password, list_name_nodes, list_data_nodes):
    """Return an Ibis Impala client connected to a random active DataNode.

    Parameters
    ----------
    user : str
        User name for both the HDFS and Impala connections.
    user_password : str
        Password for the Impala connection (PLAIN auth).
    list_name_nodes : list of str
        Candidate NameNode URLs (passed to ``get_url_active_namenode``).
    list_data_nodes : list of str
        Candidate DataNode hosts for Impala. The list is NOT mutated.

    Returns
    -------
    ibis impala client
        A client whose connectivity was verified with ``list_databases()``.

    Raises
    ------
    NameError
        If no DataNode accepts an Impala connection.
    """
    # Find the active NameNode and build an Ibis HDFS client on it.
    url_hdfs = get_url_active_namenode(
        user, list_name_nodes, port_hdfs=os.environ['PORT_HDFS'])
    ibis_hdfs = ibis.impala.hdfs_connect(
        host=url_hdfs, port=int(os.environ['PORT_HDFS']))

    # Work on a copy so the caller's list is not mutated while we
    # discard dead nodes (the original mutated — and misnamed — the
    # parameter as the undefined `data_node_list`).
    candidates = list(list_data_nodes)
    while candidates:
        # Randomly pick one of the remaining DataNodes.
        data_node = random.choice(candidates)
        try:
            ibis_client = ibis.impala.connect(
                host=data_node,
                port=int(os.environ['PORT_IMPALA']),
                hdfs_client=ibis_hdfs,
                user=user,
                password=user_password,
                auth_mechanism='PLAIN',
                timeout=0.5,
            )
            # Probe the connection: raises if the DataNode is down.
            ibis_client.list_databases()
            return ibis_client
        except Exception:
            # Inactive or nonexistent DataNode: drop it so the loop
            # terminates and never retests the same node.
            candidates.remove(data_node)
    # Only raise after EVERY DataNode has been tried (the original
    # raised inside the except block, aborting on the first failure).
    raise NameError("No DataNode available for Impala")
Example of the function returning an active random DataNode:
user = "user_test"
pwd = "<pwd_of_user_test>"
list_name_nodes = ["<url_of_namenode_1>", "<url_of_namenode_2>"]
list_data_nodes = ["<url_of_datanode_1>", "<url_of_datanode_2>", "<url_of_datanode_3>"]
client_impala = get_client_ibis_impala(user, pwd, list_name_nodes, list_data_nodes)