Use Python on HDFS or Impala With High Availability

You can use Python on HDFS or Impala with the high availability option. This option finds the active NameNode for HDFS and connects to Impala through a randomly selected active DataNode, which prevents your job from failing when a single node is unavailable. Selecting the DataNode at random also distributes the workload across all DataNodes.

  1. Import the HDFS client from the `hdfs` package (install the package first if it is not already available):

    from hdfs import InsecureClient
  2. Run the following lines of code to get the active NameNode:

    def get_url_active_namenode(user, list_name_nodes, port_hdfs="50070"):
        """Return the URL of the first NameNode that answers an HDFS status request.

        Each candidate is probed in order: an ``InsecureClient`` is built on
        ``url + ':' + port_hdfs`` and ``status('/')`` is called on it. A
        candidate whose probe raises is treated as not active and the next
        one is tried.

        Args:
            user: User name passed to ``InsecureClient``.
            list_name_nodes: NameNode URLs to probe, without the port.
            port_hdfs: Port appended to each URL (defaults to "50070").

        Returns:
            The URL, exactly as given in ``list_name_nodes`` (without the
            port), of the first NameNode whose probe succeeded.

        Raises:
            NameError: If no NameNode answered, including when
                ``list_name_nodes`` is empty.
        """
        for url in list_name_nodes:
            # Build an HDFS client for this candidate.
            hadoop = InsecureClient(url + ':' + str(port_hdfs), user=user)
            try:
                # The status request on the root path is the actual probe.
                hadoop.status('/')
            except Exception:
                # The probe failed, so this NameNode is considered not active;
                # move on to the next candidate.
                continue
            return url

        # Every candidate failed (or none was given): HDFS is not reachable.
        raise NameError("No NameNode available")
    Example of the function returning the active NameNode
    # HDFS user to connect as, and the NameNode candidates to probe.
    user = "user_test"
    list_name_nodes = ["<url_of_namenode_1>", "<url_of_namenode_2>"]
    url = get_url_active_namenode(user=user, list_name_nodes=list_name_nodes)
  3. Run the following lines of code to get an HDFS client:

    def return_client_hdfs(user, list_name_nodes, port_hdfs="50070"):
        """Return an HDFS client connected to the active NameNode.

        Args:
            user: User name passed to ``InsecureClient``.
            list_name_nodes: NameNode URLs to probe, without the port.
            port_hdfs: Port used both to probe the NameNodes and to build the
                returned client (defaults to "50070").

        Returns:
            An ``InsecureClient`` built on ``url + ':' + port_hdfs``, where
            ``url`` is the value returned by ``get_url_active_namenode``.

        Raises:
            NameError: Propagated from ``get_url_active_namenode`` when no
                NameNode is available.
        """
        url = get_url_active_namenode(user, list_name_nodes, port_hdfs)
        # get_url_active_namenode returns the URL without the port, so append
        # it here exactly as the probe does; otherwise port_hdfs is ignored.
        client_hdfs = InsecureClient(url + ':' + str(port_hdfs), user=user)
        return client_hdfs
    Example of the function returning an HDFS client
    # HDFS user to connect as, and the NameNode candidates to probe.
    user = "user_test"
    list_name_nodes = ["<url_of_namenode_1>", "<url_of_namenode_2>"]
    client_hdfs = return_client_hdfs(user, list_name_nodes)
  4. Run the following lines of code to get an active random DataNode for HDFS or Impala:

    # Both ports must be set as strings (os.environ only accepts str values), for example:
    # os.environ['PORT_HDFS'] = '50070'
    # os.environ['PORT_IMPALA'] = '21050'
    
    def get_client_ibis_impala(user, user_password, list_name_nodes, list_data_nodes):
        """Return an Ibis Impala client connected to a random active DataNode.

        The HDFS and Impala ports are read from the ``PORT_HDFS`` and
        ``PORT_IMPALA`` environment variables. The ``ibis`` module is
        referenced as a global and must already be imported where this
        function is defined.

        Args:
            user: User name used for the NameNode probe and the Impala
                connection.
            user_password: Password used for the Impala connection.
            list_name_nodes: NameNode URLs passed to
                ``get_url_active_namenode``.
            list_data_nodes: DataNode hosts to try. The list is not modified.

        Returns:
            The first Ibis Impala client for which ``list_databases()``
            succeeded.

        Raises:
            KeyError: If ``PORT_HDFS`` or ``PORT_IMPALA`` is not set.
            ValueError: If either port is not an integer string.
            NameError: If no NameNode is available (propagated from
                ``get_url_active_namenode``) or if no DataNode accepted the
                connection.
        """
        import os
        import random

        # Read both ports up front so a missing variable surfaces as a
        # KeyError instead of being mistaken for an unreachable DataNode.
        hdfs_port = os.environ['PORT_HDFS']
        impala_port = int(os.environ['PORT_IMPALA'])

        # To get the active NameNode.
        url_hdfs = get_url_active_namenode(user, list_name_nodes, port_hdfs=hdfs_port)

        # Use the URL of the active NameNode to create an Ibis HDFS client.
        ibis_hdfs = ibis.impala.hdfs_connect(host=url_hdfs, port=int(hdfs_port))

        # Work on a copy so the caller's list is left untouched.
        candidates = list(list_data_nodes)

        # Try DataNodes in random order until one accepts the connection.
        while candidates:
            data_node = random.choice(candidates)
            try:
                # To test if the DataNode is active.
                ibis_client = ibis.impala.connect(host=data_node, port=impala_port,
                                                  hdfs_client=ibis_hdfs,
                                                  user=user,
                                                  password=user_password,
                                                  auth_mechanism='PLAIN',
                                                  timeout=0.5)
                ibis_client.list_databases()
                return ibis_client
            except Exception:
                # The DataNode is not active or does not exist. Drop it so it
                # is not tested twice and the loop is guaranteed to end.
                candidates.remove(data_node)

        raise NameError("No DataNode available for Impala")
    Example of the function returning an active random DataNode
    # Credentials, then the NameNode and DataNode candidates to try.
    user = "user_test"
    pwd = "<pwd_of_user_test>"
    list_data_nodes = ["<url_of_datanode_1>", "<url_of_datanode_2>", "<url_of_datanode_3>"]
    list_name_nodes = ["<url_of_namenode_1>", "<url_of_namenode_2>"]
    client_impala = get_client_ibis_impala(user, pwd, list_name_nodes, list_data_nodes)