Dec 6, 2015

Find total and average salary of employees - MapReduce sample example

Problem statement:- Compute the total and average salary of organization XYZ, grouped by sex (male or female).
The input data is a tab-separated text file. Input schema: sex is the 4th field and salary is the 9th field. Download the sample input file. A sample record:
100 Steven King M SKING 515.123.4567 17-JUN-03 AD_PRES 25798.9 90
Expected output:-
F Total: 291800.0 :: Average: 7117.073
M Total: 424363.34 :: Average: 6333.7812

We can think of this problem in terms of the database SQL query "SELECT SUM(SALARY), AVG(SALARY) FROM EMPLOYEES1 GROUP BY SEX", and the same can be solved with HQL in Hive. In the MapReduce context, we have to write a mapper (map method) and a reducer (reduce method) class.
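For reference, the equivalent Hive query would look like the following (a minimal sketch, assuming the input file is loaded into a Hive table named employees1 with columns sex and salary; the table and column names here are illustrative):
SELECT sex, SUM(salary), AVG(salary)
FROM employees1
GROUP BY sex;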
In the map method, process the input file line by line: split each input line, extract sex and salary, and write them to the context object. The mapper output key is sex (M or F) and the value is an employee's salary, so after grouping the reducer receives, for each key, a list of salaries:
<M sal1, sal2, sal3, .....>
<F sal1, sal2, sal3, .......>
In the reduce method, the salary list is iterated over, and the total and average are computed. The total and average salary are written to the context as Text, keyed by sex (M or F).
Note:- Between the map and reduce tasks, the Hadoop framework performs a shuffle and sort based on the key. This can be verified from the output of this MapReduce program: in the output file, the record for F is followed by the record for M (F comes first in lexicographical order).

Sample Code:- 
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
* @author http://www.devinline.com
*/
public class AverageAndTotalSalaryCompute {
/*
 * Data schema (tab separated):
 * 100 Steven King M SKING 515.123.4567 17-JUN-03 AD_PRES 25798.9 90
 * Sex is the 4th field and salary is the 9th field.
 */
public static class MapperClass extends
  Mapper<LongWritable, Text, Text, FloatWritable> {
 @Override
 public void map(LongWritable key, Text empRecord, Context con)
   throws IOException, InterruptedException {
  // Split the tab-separated employee record into fields
  String[] word = empRecord.toString().split("\\t");
  try {
   String sex = word[3]; // 4th field: sex (M or F)
   // 9th field: salary; emit <sex, salary>
   float salary = Float.parseFloat(word[8]);
   con.write(new Text(sex), new FloatWritable(salary));
  } catch (Exception e) {
   // Log and skip malformed records
   e.printStackTrace();
  }
 }
}

public static class ReducerClass extends
  Reducer<Text, FloatWritable, Text, Text> {
 @Override
 public void reduce(Text key, Iterable<FloatWritable> valueList,
   Context con) throws IOException, InterruptedException {
  try {
   float total = 0f;
   int count = 0;
   // Sum the salaries for this key (sex) and count them
   for (FloatWritable var : valueList) {
    total += var.get();
    count++;
   }
   float avg = total / count;
   String out = "Total: " + total + " :: " + "Average: " + avg;
   con.write(key, new Text(out));
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

public static void main(String[] args) {
 Configuration conf = new Configuration();
 try {
  Job job = Job.getInstance(conf, "FindAverageAndTotalSalary");
  job.setJarByClass(AverageAndTotalSalaryCompute.class);
  job.setMapperClass(MapperClass.class);
  job.setReducerClass(ReducerClass.class);
  // The mapper's output value class (FloatWritable) differs from the
  // reducer's (Text), so the map output classes are set explicitly
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(FloatWritable.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  // Path p1 = new Path(args[0]);
  // Path p2 = new Path(args[1]);
  // FileInputFormat.addInputPath(job, p1);
  // FileOutputFormat.setOutputPath(job, p2);
  Path pathInput = new Path(
    "hdfs://192.168.213.133:54310/user/hduser1/employee_records.txt");
  Path pathOutputDir = new Path(
    "hdfs://192.168.213.133:54310/user/hduser1/testfs/output_mapred00");
  FileInputFormat.addInputPath(job, pathInput);
  FileOutputFormat.setOutputPath(job, pathOutputDir);
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 } catch (IOException | ClassNotFoundException | InterruptedException e) {
  e.printStackTrace();
 }
}
}

In the main method, the Job object uses input and output directories on HDFS, so start the Hadoop services (<hadoop_home>/sbin/start-all.sh). Copy the input file from the local file system to HDFS and change the input location accordingly, or uncomment the 4 commented lines in the main method and pass input and output locations on the local file system (commenting out the HDFS file references).
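For example, the input file can be copied to HDFS as follows (assuming the sample file was downloaded to /home/hduser1; that local path is illustrative, adjust it for your setup):
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop fs -copyFromLocal /home/hduser1/employee_records.txt /user/hduser1/employee_records.txt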
Execute the above program unit (right click -> Run As -> Run on Hadoop) and verify the output using the following command.
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop fs -cat /user/hduser1/testfs/output_mapred00/part-r-00000
F Total: 291800.0 :: Average: 7117.073
M Total: 424363.34 :: Average: 6333.7812
Notice the output: the F record is followed by the M record due to the intermediate shuffle and sort operation performed by the Hadoop framework between the map and reduce phases. Now modify the input file to mark some rows with sex value T, execute the above sample program again, and verify the output. The records appear in lexicographically sorted order:
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop fs -cat /user/hduser1/testfs/output_mapred00/part-r-00000
F Total: 282200.0 :: Average: 7055.0
M Total: 412063.34 :: Average: 6438.4897
T Total: 21900.0 :: Average: 5475.0
