Dec 13, 2015

Join operation in MapReduce - Join two files (one in HDFS, the other cached)

Join is a very commonly used operation in relational and non-relational databases. It gives us the flexibility to combine different result sets and derive other meaningful results. In this post we will understand how to use the distributed cache in Hadoop and write sample code for performing a join operation on records present in two different locations. Download the input files - employee_record and department_record.
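Before diving into Hadoop, the idea of a lookup-based join can be illustrated with a minimal plain-Java sketch (class name and sample data below are invented for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class SimpleJoin {
    // Build a small department lookup table: deptId -> deptName (sample data)
    static Map<String, String> deptTable() {
        Map<String, String> dept = new HashMap<>();
        dept.put("50", "Shipping");
        dept.put("10", "Administration");
        return dept;
    }

    // Inner-join one employee record with the department table
    static String join(String empName, String deptId, Map<String, String> dept) {
        return empName + " " + dept.get(deptId);
    }

    public static void main(String[] args) {
        Map<String, String> dept = deptTable();
        System.out.println(join("Donald", "50", dept));   // Donald Shipping
        System.out.println(join("Jennifer", "10", dept)); // Jennifer Administration
    }
}
```

The MapReduce version in this post does exactly this, except the lookup table is built from a cached file and the employee records stream in from HDFS.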

Distributed cache:- 
DistributedCache is a facility provided by the Map-Reduce framework to cache files (text, archives, jars, etc.) needed by applications. Once we cache a file for our job, the Hadoop framework makes it available on every data node (in the local file system) where our map/reduce tasks are running. Files are copied only once per job, and cached files are accessible to all mappers and reducers.
DistributedCache has been deprecated in Hadoop 2.x (from 2.2 onwards). So what is the alternative in Hadoop 2.x? The caching facility has been moved to the Job class, and Job internally calls DistributedCache to add resources. The code below is from the Job class:
/**
* Add a file to be localized
* @param uri The uri of the cache to be localized
*/
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }
How to cache resources with Job and access them in mapper/reducer:-
In the Driver class:
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.addCacheFile(new URI("/DIR/file.text"));
For accessing the cached file in mapper/reducer:
URI cachedFileURI = context.getCacheFiles()[0];
Path path = new Path(cachedFileURI.toString());

Sample code for Mapper, Reducer and Driver class

Mapper class:-
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class JoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
Map<String, String> deptInputMap = new HashMap<String, String>();

/* This method is executed once per mapper task, before the map method */
protected void setup(Context context) throws IOException,
  InterruptedException {
 Path path = null;
 try {
  if (context.getCacheFiles() != null
    && context.getCacheFiles().length > 0) {
   URI cachedFileURI = context.getCacheFiles()[0];
   if (cachedFileURI != null) {
    System.out.println("Mapping File: "
      + cachedFileURI.toString());
    path = new Path(cachedFileURI.toString());
    BufferedReader br = new BufferedReader(
      new InputStreamReader(new FileInputStream(
        path.toString())));
    String inputLine = "";
    while ((inputLine = br.readLine()) != null) {
     String[] deptParts = inputLine.split("\\t");
     String deptId = deptParts[0];
     String deptName = deptParts[1];
     String deptLocation = deptParts[3];
     deptInputMap.put(deptId, deptName + " " + deptLocation);
    }
    br.close();
   } else {
    System.out.println("No mapping file exists!!");
   }
  } else {
   System.out.println("No cached file exists!!");
  }
 } catch (Exception e) {
  e.printStackTrace();
 }

}

/*
 * get dept detail for each deptid from hashmap and append with emp record,
 * write in context
 */
public void map(LongWritable key, Text value, Context context)
  throws IOException, InterruptedException {
 try {
  String[] empInputs = value.toString().split("\\t");
  String deptDetail = deptInputMap.get(empInputs[4]);
  String empDeptJoinDetail = value + " " + deptDetail;
  System.out.println(deptDetail);
  context.write(new Text(empDeptJoinDetail), NullWritable.get());
 } catch (Exception e) {
  // Skip malformed employee records rather than failing the task
  e.printStackTrace();
 }
}
}
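Stripped of the Hadoop API, the core join logic of the setup()/map() pair above can be sketched in plain Java (the class name and sample records below are invented for illustration; the column positions match the mapper above):

```java
import java.util.HashMap;
import java.util.Map;

public class JoinLogicSketch {
    final Map<String, String> deptInputMap = new HashMap<>();

    // Mirrors setup(): parse one tab-separated department record.
    // Column positions (0 = deptId, 1 = deptName, 3 = deptLocation)
    // follow the mapper above.
    void cacheDeptRecord(String line) {
        String[] deptParts = line.split("\t");
        deptInputMap.put(deptParts[0], deptParts[1] + " " + deptParts[3]);
    }

    // Mirrors map(): look up dept detail for the employee's deptId
    // (column 4 in the employee record) and append it to the record.
    String joinEmployee(String line) {
        String[] empInputs = line.split("\t");
        return line + " " + deptInputMap.get(empInputs[4]);
    }

    public static void main(String[] args) {
        JoinLogicSketch sketch = new JoinLogicSketch();
        sketch.cacheDeptRecord("50\tShipping\t124\t1500");
        System.out.println(
            sketch.joinEmployee("198\tDonald\tOConnell\tDOCONNEL\t50"));
    }
}
```

Because the department table lives entirely in the mapper's memory, the join completes on the map side; the reducer only deduplicates and emits the joined records.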

Reducer class:-
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends
 Reducer<Text, NullWritable, Text, NullWritable> {
@Override
public void reduce(Text key, Iterable<NullWritable> values, Context con) {
 try {
  con.write(key, NullWritable.get());
 } catch (IOException e) {
  e.printStackTrace();
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
}
}

Driver class:-
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
* @author http://www.devinline.com
*/
public class JoinInHadoopSampleExample {

public static void main(String[] args) {
 Configuration conf = new Configuration();
 Job job;
 try {
  job = Job.getInstance(conf,
    "join operation- Employee and Dept files");
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(NullWritable.class);

  job.setMapperClass(JoinMapper.class);
  job.setReducerClass(JoinReducer.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  FileInputFormat.addInputPath(job,new Path(
   "hdfs://localhost:54310/user/hduser1/employee_records.txt"));
  try {
   job.addCacheFile(new URI(args[0])); // Cached file passed as program argument
  } catch (URISyntaxException e) {
   e.printStackTrace();
  }
  Path pathOutputDir = new Path(
    "hdfs://localhost:54310/user/hduser1/testfs/output_join1");
  FileOutputFormat.setOutputPath(job, pathOutputDir);
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 } catch (IOException e) {
  e.printStackTrace();
 } catch (ClassNotFoundException e) {
  e.printStackTrace();
 } catch (InterruptedException e) {
  e.printStackTrace();
 }

}

}
Start the Hadoop services, place the input file (employee_record) in HDFS, and pass the cached file (department_record) via a program argument. Execute the driver class above and verify that the department detail is appended to each record in the output file.
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop fs -tail /user/hduser1/testfs/output_join1/part-r-00000
198 Donald OConnell DOCONNEL 50 650.507.9833 21-JUN-07 SH_CLERK 2600  124 Shipping 1500
199 Douglas Grant DGRANT 50 650.507.9844 13-JAN-08 SH_CLERK 2600  124 Shipping 1500
200 Jennifer Whalen JWHALEN 10 515.123.4444 17-SEP-03 AD_ASST4400  101 Administration 1700
201 Michael Hartstein MHARTSTE 20 515.123.5555 17-FEB-04 MK_MAN 13000  100 Marketing 1800
202 Pat Fay PFAY 20 603.123.6666 17-AUG-05 MK_REP 6000 201 Marketing 1800
203 Susan Mavris SMAVRIS 40 515.123.7777 07-JUN-02 HR_REP 6500 101 Human Resources 2400
204 Hermann Baer HBAER 70 515.123.8888 07-JUN-02 PR_REP 10000 101 Public Relations 2700