Dec 10, 2015

Read, Write and Delete operations in HDFS using the FileSystem API (Java)

HDFS (Hadoop Distributed File System) is the most commonly used storage layer in the Hadoop ecosystem, and read and write operations are routine when working with it. Along with the file system shell commands, Hadoop provides the FileSystem API to perform read/write/delete operations programmatically. In this post we will see how to read a file from HDFS, write/create a file on HDFS, and delete a file or directory from HDFS.

Read operation on HDFS

In order to read a file from HDFS, create a Configuration object, then a FileSystem object by passing the configuration object to it, and add core-site.xml as a resource to the configuration object.
Note:-  The Apache Hadoop documentation states that configurations are specified by resources, and Hadoop by default loads two resources, in order, from the classpath: core-default.xml and core-site.xml.
Now create a Path object from pathString - a fully qualified file name - open this file using the FileSystem object, and read it until the end of the file is reached.

public static void readHDFSFile(String srcFileLocation) throws IOException {
 /*
  * fully qualified name = HDFS location(ip address + port) +
  * fileLocation hdfs://192.168.213.133:54310/<fileLocation>
  */
 String pathString = "hdfs://192.168.213.133:54310/" + srcFileLocation;

 // Create configuration object - get config files from classpath
 Configuration conf = new Configuration();

 /*
  * Add configuration file core-site.xml to configuration object.
  * core-site.xml is available in <Hadoop_HOME>/etc/hadoop/core-site.xml
  */
 conf.addResource(new Path(
   "/usr/local/hadoop2.6.1/etc/hadoop/core-site.xml"));

 FileSystem fs = null;
 try {
  // create a FileSystem object passing configuration object config
  fs = FileSystem.get(conf);

  // Create path object and check for its existence
  Path path = new Path(pathString);
  if (fs.exists(path)) {
   BufferedReader br = new BufferedReader(new InputStreamReader(
     fs.open(path)));
   String line;
   line = br.readLine();
   while (line != null) {
    System.out.println(line);
    line = br.readLine();
   }
   br.close();
  } else {
   System.out.println("File does not exist on HDFS");
  }

 } catch (Exception e) {
  e.printStackTrace();
 } finally {
  // Close file descriptors
  if (fs != null)
   fs.close();
 }
}
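The line-reading loop above is plain java.io and does not depend on Hadoop itself. As a quick sanity check, the same pattern can be exercised against any InputStream; in the sketch below (class and method names are illustrative, not from the original program) an in-memory stream stands in for fs.open(path):

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class ReadLoopDemo {
    // Same read-and-print loop as readHDFSFile, with a pluggable stream source
    public static int printLines(InputStream in) throws IOException {
        int count = 0;
        try (BufferedReader br = new BufferedReader(new InputStreamReader(in))) {
            String line = br.readLine();
            while (line != null) {
                System.out.println(line);
                count++;
                line = br.readLine();
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        // In the HDFS version, this stream would come from fs.open(path)
        InputStream in = new ByteArrayInputStream("line1\nline2".getBytes());
        System.out.println("lines read: " + printLines(in));
    }
}
```

Testing the loop this way, against an in-memory stream, avoids needing a running cluster.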

Write operation on HDFS

In the write operation, we create a file in HDFS and copy content from a source file that resides in the local file system. Similar to the read operation, create a Configuration object followed by a FileSystem object and a Path object. Here we create the file only if it does not already exist on HDFS (there is an option to overwrite by passing the overwrite flag as true to the create method - fs.create(path, overwrite)).
/*
 * Create file in HDFS. 
 * srcFileLocation - source file(fully qualified) is in local file system 
 * dstFileLocation - relative path with respect to node where HDFS exist
 */
public static void writeFileInHDFS(String dstFileLocation,
  String srcFileLocation) throws IOException {

 /*
  * fully qualified name = HDFS location(ip address + port) +
  * fileLocation hdfs://192.168.213.133:54310/<dstFileLocation>
  */
 String dstPathString = "hdfs://192.168.213.133:54310/"
   + dstFileLocation;

 // Create configuration object - get config files from classpath
 Configuration conf = new Configuration();

 /*
  * Add configuration file core-site.xml to configuration object
  * core-site.xml is available in <Hadoop_HOME>/etc/hadoop/core-site.xml
  */
 conf.addResource(new Path(
   "/usr/local/hadoop2.6.1/etc/hadoop/core-site.xml"));

 // create a FileSystem object passing configuration object config
 FileSystem fs = null;
 FSDataOutputStream out = null;
 InputStream in = null;
 try {
  fs = FileSystem.get(conf);
  File sourceFileObj = new File(srcFileLocation);
  // Check source file exist, then only create file on HDFS
  if (sourceFileObj.exists()) {
   // Create path object and check for its existence
   Path dstPathObj = new Path(dstPathString);
   // Create file on HDFS if it does not exist
   if (!fs.exists(dstPathObj)) {
    System.out.println("-----Write operation in progress(check write "+
     "permission on the given location in HDFS)----");
    out = fs.create(dstPathObj);
    in = new BufferedInputStream(new FileInputStream(
      sourceFileObj));
    byte[] b = new byte[1024];
    int numBytes = 0;
    while ((numBytes = in.read(b)) > 0) {
     out.write(b, 0, numBytes);
    }

   } else {
    System.out.println("File already exists in HDFS !!");
    return;
   }
   // Check for existence of the newly created file
   if (fs.exists(dstPathObj)) {
    System.out.println("File created successfully in HDFS "
      + fs.getFileChecksum(dstPathObj));
   }
  } else {
   System.out.println("Source file does not exist in local file system !!");
  }

 } catch (Exception e) {
  e.printStackTrace();
 } finally {
  // Close file descriptors
  if (in != null)
   in.close();
  if (out != null)
   out.close();
  if (fs != null)
   fs.close();
 }
}
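The copy loop inside writeFileInHDFS is likewise ordinary stream-to-stream copying. A minimal standalone sketch of the same 1 KB buffered loop (the class name and in-memory streams here are chosen for testability; in the HDFS version out comes from fs.create and in from the local file):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class CopyLoopDemo {
    // Same 1 KB buffered copy loop as writeFileInHDFS; returns bytes copied
    public static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] b = new byte[1024];
        int numBytes;
        long total = 0;
        while ((numBytes = in.read(b)) > 0) {
            out.write(b, 0, numBytes);
            total += numBytes;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayInputStream in = new ByteArrayInputStream("hello HDFS".getBytes());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long n = copy(in, out);
        System.out.println(n + " bytes copied, content: " + out.toString());
    }
}
```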

Delete operation on HDFS

In order to delete a file or directory from HDFS we follow similar steps as in the read and write operations.
To delete a file we use fs.delete(path, false); false indicates that files are not deleted recursively.
To delete directories and files recursively, pass true instead of false.
/* Delete a file or directory from HDFS */
public static boolean deleteFileFromHDFS(String fileLocation)
  throws IOException {
 /*
  * fully qualified name = HDFS location(ip address + port) +
  * fileLocation hdfs://192.168.213.133:54310/<fileLocation>
  */
 String pathString = "hdfs://192.168.213.133:54310/" + fileLocation;

 // Create configuration object - get config files from class path
 Configuration conf = new Configuration();

 /*
  * Add configuration file core-site.xml to configuration object
  * core-site.xml is available in <Hadoop_HOME>/etc/hadoop/core-site.xml
  */
 conf.addResource(new Path(
   "/usr/local/hadoop2.6.1/etc/hadoop/core-site.xml"));

 FileSystem fs = null;
 boolean status = false;
 try {
  // create a FileSystem object passing configuration object config
  fs = FileSystem.get(conf);

  // Create path object and check for its existence
  Path path = new Path(pathString);
  if (fs.exists(path)) {
   // false indicates delete is not recursive
   status = fs.delete(path, false);
   
  } else {
   System.out.println("File does not exist on HDFS");
   status = false;
  }

 } catch (Exception e) {
  e.printStackTrace();
 } finally {
  // Close file descriptors
  if (fs != null)
   fs.close();
 }
 return status;
}

Complete sample program - Read, Write and Delete on HDFS

Create a MapReduce project in Eclipse and create a class file named ReadWriteDeleteOperationInHDFS.java. Copy the following code and paste it into the created class file.

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * @author devinline
 */
public class ReadWriteDeleteOperationInHDFS {

public static void readHDFSFile(String srcFileLocation) throws IOException {
 /*
  * fully qualified name = HDFS location(ip address + port) +
  * fileLocation hdfs://192.168.213.133:54310/<fileLocation>
  */
 String pathString = "hdfs://192.168.213.133:54310/" + srcFileLocation;

 // Create configuration object - get config files from classpath
 Configuration conf = new Configuration();

 /*
  * Add configuration file core-site.xml to configuration object
  * core-site.xml is available in <Hadoop_HOME>/etc/hadoop/core-site.xml
  */
 conf.addResource(new Path(
   "/usr/local/hadoop2.6.1/etc/hadoop/core-site.xml"));

 FileSystem fs = null;
 try {
  // create a FileSystem object passing configuration object config
  fs = FileSystem.get(conf);

  // Create path object and check for its existence
  Path path = new Path(pathString);
  if (fs.exists(path)) {
   BufferedReader br = new BufferedReader(new InputStreamReader(
     fs.open(path)));
   String line;
   line = br.readLine();
   while (line != null) {
    System.out.println(line);
    line = br.readLine();
   }
   br.close();
  } else {
   System.out.println("File does not exist on HDFS");
  }

 } catch (Exception e) {
  e.printStackTrace();
 } finally {
  // Close file descriptors
  if (fs != null)
   fs.close();
 }
}

/*
 * Create file in HDFS. 
 * srcFileLocation - source file(fully qualified) is in local file system 
 * dstFileLocation - relative path with respect to node where HDFS exist
 */
public static void writeFileInHDFS(String dstFileLocation,
  String srcFileLocation) throws IOException {

 /*
  * fully qualified name = HDFS location(ip address + port) +
  * fileLocation hdfs://192.168.213.133:54310/<dstFileLocation>
  */
 String dstPathString = "hdfs://192.168.213.133:54310/"
   + dstFileLocation;

 // Create configuration object - get config files from classpath
 Configuration conf = new Configuration();

 /*
  * Add configuration file core-site.xml to configuration object
  * core-site.xml is available in <Hadoop_HOME>/etc/hadoop/core-site.xml
  */
 conf.addResource(new Path(
   "/usr/local/hadoop2.6.1/etc/hadoop/core-site.xml"));

 // create a FileSystem object passing configuration object config
 FileSystem fs = null;
 FSDataOutputStream out = null;
 InputStream in = null;
 try {
  fs = FileSystem.get(conf);
  File sourceFileObj = new File(srcFileLocation);
  // Check source file exist, then only create file on HDFS
  if (sourceFileObj.exists()) {
   // Create path object and check for its existence
   Path dstPathObj = new Path(dstPathString);
   // Create file on HDFS if it does not exist
   if (!fs.exists(dstPathObj)) {
    System.out.println("-----Write operation in progress(check write "+
        "permission on the given location in HDFS)----");
    out = fs.create(dstPathObj);
    
    in = new BufferedInputStream(new FileInputStream(
      sourceFileObj));
    byte[] b = new byte[1024];
    int numBytes = 0;
    while ((numBytes = in.read(b)) > 0) {
     out.write(b, 0, numBytes);
    }

   } else {
    System.out.println("File already exists in HDFS !!");
    return;
   }
   // Check for existence of the newly created file
   if (fs.exists(dstPathObj)) {
    System.out.println("File created successfully in HDFS "
      + fs.getFileChecksum(dstPathObj));
   }
  } else {
   System.out.println("Source file does not exist in local file system !!");
  }

 } catch (Exception e) {
  e.printStackTrace();
 } finally {
  // Close file descriptors
  if (in != null)
   in.close();
  if (out != null)
   out.close();
  if (fs != null)
   fs.close();
 }
}

/* Delete a file or directory from HDFS */
public static boolean deleteFileFromHDFS(String fileLocation)
  throws IOException {
 /*
  * fully qualified name = HDFS location(ip address + port) +
  * fileLocation hdfs://192.168.213.133:54310/<fileLocation>
  */
 String pathString = "hdfs://192.168.213.133:54310/" + fileLocation;

 // Create configuration object - get config files from class path
 Configuration conf = new Configuration();

 /*
  * Add configuration file core-site.xml to configuration object
  * core-site.xml is available in <Hadoop_HOME>/etc/hadoop/core-site.xml
  */
 conf.addResource(new Path(
   "/usr/local/hadoop2.6.1/etc/hadoop/core-site.xml"));

 FileSystem fs = null;
 boolean status = false;
 try {
  // create a FileSystem object passing configuration object config
  fs = FileSystem.get(conf);

  // Create path object and check for its existence
  Path path = new Path(pathString);
  if (fs.exists(path)) {
   status = fs.delete(path, false); // false indicates delete is not recursive
   
  } else {
   System.out.println("File does not exist on HDFS");
   status = false;
  }

 } catch (Exception e) {
  e.printStackTrace();
 } finally {
  // Close file descriptors
  if (fs != null)
   fs.close();
 }
 return status;
}

public static void main(String[] args) throws Exception {
 Scanner scn = new Scanner(System.in);
 System.out.println("Enter input 1-3:(1.Read 2.Write 3. Delete)");
 int option = scn.nextInt();
 switch (option) {
 case 1:
  ReadWriteDeleteOperationInHDFS
    .readHDFSFile("wheatherInputData/input_temp.txt");
  break;
 case 2:
  ReadWriteDeleteOperationInHDFS.writeFileInHDFS(
    "/user/hduser1/testfs/output.txt",
    "/home/zytham/hadoop_poc/input.txt");
  break;
 case 3:
  boolean status = ReadWriteDeleteOperationInHDFS
    .deleteFileFromHDFS("/user/hduser1/testfs/output.txt");
  System.out.println("File delete status is " + status);
  break;

 }
 scn.close();

}
} 
Start the Hadoop services (dfs and yarn) by executing a single command (./start-all.sh) from <hadoop_home>/sbin as follows.
hduser1@ubuntu:/usr/local/hadoop2.6.1/sbin$ ./start-all.sh



For the above sample program we have created a file "/wheatherInputData/input_temp.txt" in HDFS and given write permission to all users on the HDFS directory "/user/hduser1/testfs". Refer to the commands below.
Copy a file from the local file system to HDFS for the read operation
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop fs -put /home/zytham/input_temp.txt /wheatherInputData/
Give write permission to all users
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop fs -chmod -R 777 /user/hduser1/testfs/

Now execute the above program: Right click -> Run As -> Run on Hadoop
========Sample output==================
Enter input 1-3:(1.Read 2.Write 3. Delete)
1
CA_25-Jan-2014 00:12:345 15.7 01:19:345 23.1 02:34:542 12.3 03:12:187 16 04:00:093
CA_26-Jan-2014 00:54:245 15.7 01:19:345 23.1 02:34:542 12.3 03:12:187 16 04:00:093

Enter input 1-3:(1.Read 2.Write 3. Delete)
2
-----Write operation in progress(check write permission on the given location in HDFS)----
File created successfully in HDFS MD5-of-0MD5-of-0CRC32:70bc8f4b72a86921468bf8e8441dce51

Enter input 1-3:(1.Read 2.Write 3. Delete)
3
File delete status is true

Enter input 1-3:(1.Read 2.Write 3. Delete)
3
File does not exist on HDFS
File delete status is false
==================================

Debugging

1. java.lang.IllegalArgumentException: Wrong FS: hdfs://192.168.213.133:54310/wheatherInputData/input_temp.txt, expected: file:///
Reason:-  core-site.xml was not added as a resource to the configuration object and was not found on the classpath, so fs.defaultFS falls back to the local file system (file:///).
Solution:- Add the configuration file core-site.xml to the configuration object. core-site.xml is available in <Hadoop_HOME>/etc/hadoop/core-site.xml
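An alternative workaround (not from the original post, but fs.defaultFS is the standard Hadoop 2.x property name) is to set the default file system directly on the Configuration object instead of loading the XML file:

```java
Configuration conf = new Configuration();
// Equivalent of what core-site.xml provides; the URI must match your NameNode
conf.set("fs.defaultFS", "hdfs://192.168.213.133:54310");
```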
2. org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=zytham, access=WRITE, inode="/user/hduser1/testfs":hduser1:supergroup:drwxrwx--x
Reason:- The client executing the write operation does not have write permission on the given directory.
Solution:- Give write permission to the client (user) by executing the following command.
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop fs -chmod -R 777 /user/hduser1/testfs/
