The input data is a tab-separated text file. In the schema, sex is at the 4th position and salary at the 9th position. Download the sample input file.
100 Steven King M SKING 515.123.4567 17-JUN-03 AD_PRES 25798.9 90
Expected output:-
F Total: 291800.0 :: Average: 7117.073
M Total: 424363.34 :: Average: 6333.7812
We can think of this problem in terms of the database SQL query "SELECT SEX, SUM(SALARY), AVG(SALARY) FROM EMPLOYEES1 GROUP BY SEX", and the same can be solved with HQL in Hive. In the context of map/reduce, we have to write a mapper class (map method) and a reducer class (reduce method).
In the map method, the input file is processed line by line: each line is split on tabs, and sex and salary are extracted. The extracted sex and salary are written to the context object. The output of the mapper has the sex (M or F) as key and the salary as value; the framework then groups these by key, so the reducer receives, for each sex, a list of salaries:
<M, sal1, sal2, sal3, ...>
<F, sal1, sal2, sal3, ...>
In the reduce method, the salary list is iterated over, and the total and average are computed. The total and average salary are written to the context as Text against the sex key (M or F), as implemented in the sample code below.
Note:- Between the map and reduce tasks, the Hadoop framework performs a shuffle and sort based on the key. This can be verified from the output of this map/reduce program: in the output file, the record for F is followed by the record for M (F comes first in lexicographical order).
Sample Code:-
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author http://www.devinline.com
 */
public class AverageAndTotalSalaryCompute {
    /*
     * Data schema (tab separated):
     * 100 Steven King M SKING 515.123.4567 17-JUN-03 AD_PRES 25798.9 90
     * Sex is at the 4th position and salary at the 9th position.
     */
    public static class MapperClass extends
            Mapper<LongWritable, Text, Text, FloatWritable> {
        public void map(LongWritable key, Text empRecord, Context con)
                throws IOException, InterruptedException {
            // Split the record on tabs and pick out sex (index 3) and salary (index 8)
            String[] word = empRecord.toString().split("\\t");
            String sex = word[3];
            try {
                float salary = Float.parseFloat(word[8]);
                // Emit (sex, salary) for every employee record
                con.write(new Text(sex), new FloatWritable(salary));
            } catch (Exception e) {
                // Skip malformed records
                e.printStackTrace();
            }
        }
    }

    public static class ReducerClass extends
            Reducer<Text, FloatWritable, Text, Text> {
        public void reduce(Text key, Iterable<FloatWritable> valueList,
                Context con) throws IOException, InterruptedException {
            try {
                float total = 0;
                int count = 0;
                // Sum all salaries for this sex and count the records
                for (FloatWritable var : valueList) {
                    total += var.get();
                    count++;
                }
                float avg = total / count;
                String out = "Total: " + total + " :: " + "Average: " + avg;
                con.write(key, new Text(out));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf, "FindAverageAndTotalSalary");
            job.setJarByClass(AverageAndTotalSalaryCompute.class);
            job.setMapperClass(MapperClass.class);
            job.setReducerClass(ReducerClass.class);
            // Mapper emits (Text, FloatWritable); reducer emits (Text, Text)
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FloatWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // Path p1 = new Path(args[0]);
            // Path p2 = new Path(args[1]);
            // FileInputFormat.addInputPath(job, p1);
            // FileOutputFormat.setOutputPath(job, p2);
            Path pathInput = new Path(
                    "hdfs://192.168.213.133:54310/user/hduser1/employee_records.txt");
            Path pathOutputDir = new Path(
                    "hdfs://192.168.213.133:54310/user/hduser1/testfs/output_mapred00");
            FileInputFormat.addInputPath(job, pathInput);
            FileOutputFormat.setOutputPath(job, pathOutputDir);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
In the main method, the Job object uses input and output directories on HDFS, so start the Hadoop services (<hadoop_home>/sbin/start-all.sh). Copy the input file from the local file system to HDFS and change the input location accordingly, or uncomment the 4 commented lines in the main method and pass input and output locations on the local file system (commenting out the HDFS file references).
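For reference, the copy step might look like the following, assuming the sample file is saved locally as employee_records.txt and the same HDFS user directory as in the code (adjust paths to your own setup):
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop fs -mkdir -p /user/hduser1
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop fs -put employee_records.txt /user/hduser1/employee_records.txt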
Execute the above program unit (Right click -> Run -> Run as hadoop) and verify the output using the following command.
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop fs -cat /user/hduser1/testfs/output_mapred00/part-r-00000
F Total: 291800.0 :: Average: 7117.073
M Total: 424363.34 :: Average: 6333.7812
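Alternatively, instead of running from Eclipse, the program can be packaged into a jar and submitted from the command line, provided the commented lines in the main method are uncommented so that the input and output paths are read from args. The jar name and output directory below are only examples (the output directory must not already exist):
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop jar AverageAndTotalSalaryCompute.jar AverageAndTotalSalaryCompute /user/hduser1/employee_records.txt /user/hduser1/testfs/output_mapred01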