Dec 12, 2015

MapReduce: Remove duplicate records from input file on HDFS

In the previous post we learned how to analyse time-temperature statistics and generate a report with max/min temperature for each city (using MultipleOutputs<Text, Text>). In this post we will write a sample MapReduce program to understand how to remove duplicate records from a file. Download the sample input file.
Mapper class:- In the map method we read the input file line by line, make the whole line the key of the mapper output and NullWritable.get() the value, and write both to the context object.
con.write(row, NullWritable.get());
After shuffle and grouping, the mapper output will appear something like:
key value list
<unique_employee_record_1> <NW, NW, NW>
<unique_employee_record_2> <NW, NW>
<unique_employee_record_3> <NW, NW, NW, NW, NW>
<unique_employee_record_4> <NW, NW, NW>
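The grouping step above is what actually removes the duplicates: once every identical line becomes a key, each distinct line survives exactly once. Outside Hadoop, the same effect can be sketched in plain Java with a set (a hypothetical illustration of the idea, not part of the MapReduce job; the sample records are made up):

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DedupSketch {
    public static void main(String[] args) {
        // Made-up employee records with duplicates
        List<String> rows = Arrays.asList(
                "101,John,Sales", "102,Asha,HR",
                "101,John,Sales", "102,Asha,HR");
        // A LinkedHashSet keeps one copy per distinct row,
        // just as the shuffle keeps one group per distinct key
        Set<String> unique = new LinkedHashSet<>(rows);
        unique.forEach(System.out::println);
    }
}
```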

Reducer class
:- In the reduce method each unique_employee_record key is written to the context as the key with NullWritable.get() as the value.
con.write(<unique_employee_record_1>, NullWritable.get());

Sample code of the mapper, reducer and driver classes

Mapper class 
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RemoveDuplicateMapper extends
  Mapper<Object, Text, Text, NullWritable> {
 @Override
 public void map(Object key, Text row, Context con) {
  try {
   // Emit the whole line as the key; duplicate lines
   // collapse into a single group during shuffle/sort
   con.write(row, NullWritable.get());
  } catch (IOException e) {
   e.printStackTrace();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }
}
Reducer class :-
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RemoveDuplicateReducer extends
  Reducer<Text, NullWritable, Text, NullWritable> {
 @Override
 public void reduce(Text key, Iterable<NullWritable> values, Context con) {
  try {
   // Each distinct record arrives exactly once as a key; write it out once
   con.write(key, NullWritable.get());
  } catch (IOException e) {
   e.printStackTrace();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }
}
Driver class :-
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RemoveDuplicateRecordsDriver {
 public static void main(String[] str) {
  Configuration conf = new Configuration();
  try {
   Job job = Job.getInstance(conf, "Duplicate removal");
   job.setMapperClass(RemoveDuplicateMapper.class);
   job.setReducerClass(RemoveDuplicateReducer.class);
   job.setJarByClass(RemoveDuplicateRecordsDriver.class);
   job.setMapOutputKeyClass(Text.class);
   job.setMapOutputValueClass(NullWritable.class);
   job.setOutputKeyClass(Text.class);
   job.setOutputValueClass(NullWritable.class);
   FileInputFormat.addInputPath(job,new Path(
    "hdfs://localhost:54310/user/hduser1/employee_records_duplicates"));
   FileOutputFormat.setOutputPath(job,new Path(
    "hdfs://localhost:54310/user/hduser1/testfs/output_employee1"));
   System.exit(job.waitForCompletion(true) ? 0 : 1);
  } catch (IOException e) {
   e.printStackTrace();
  } catch (ClassNotFoundException e) {
   e.printStackTrace();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }

 }
}
Execute the driver class (with Hadoop services running and the input file location modified as per your convenience). It will create an output directory containing the file part-r-00000 with no duplicate records.
To upload the input file from the local file system to HDFS:
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop fs -put /home/zytham/workspaceJuno/MapreduceSampleProject/employee_records_duplicates /user/hduser1/
Check the size of both the input and output files to verify that duplicate records have been removed: the input file is 13.1 K and the output file is 7.9 K.
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop fs -ls -h /user/hduser1/employee_records_duplicates
-rw-r--r--   1 hduser1 supergroup     13.1 K 2015-12-12 22:50 /user/hduser1/employee_records_duplicates
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop fs -ls -h /user/hduser1/testfs/output_employee1/part-r-00000
-rw-r--r--   3 zytham supergroup      7.9 K 2015-12-12 22:56 /user/hduser1/testfs/output_employee1/part-r-00000

Let's modify this problem: remove duplicate rows based on employee_id as the key (i.e. keep only the first record with a given employee_id).
In the map method, make employee_id the key and the whole row the value, and write them to the context.
con.write(<emp_id>, row);
After grouping, the mapper output looks like:
key value list
<unique_employee_id_1> <emp_record, emp_record, emp_record>
<unique_employee_id_2> <emp_record, emp_record>
<unique_employee_id_3> <emp_record, emp_record,emp_record,emp_record>
<unique_employee_id_4> <emp_record, emp_record,emp_record>
In the reduce method, iterate over each key's value list, take the first emp_record and write it to the context (note the value type is Text, not TextWritable):
for (Text val : values) {
 con.write(val, NullWritable.get());
 break;
}
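The keep-first behaviour of that reducer can be sketched in plain Java with putIfAbsent on a LinkedHashMap keyed by employee_id (a hypothetical illustration, assuming comma-separated records with the id in the first field; the sample records are made up):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeepFirstPerId {
    public static void main(String[] args) {
        // Made-up records: two rows share employee_id 101
        List<String> rows = Arrays.asList(
                "101,John,Sales", "101,John,Marketing", "102,Asha,HR");
        // putIfAbsent keeps only the first record seen for each employee_id,
        // mirroring the reducer that writes one value per key and breaks
        Map<String, String> firstById = new LinkedHashMap<>();
        for (String row : rows) {
            String id = row.split(",")[0];
            firstById.putIfAbsent(id, row);
        }
        firstById.values().forEach(System.out::println);
    }
}
```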
