Dec 17, 2015

Mapreduce: Analyse customer feedback stored in text file about mobile phone and separate out positive & negative feedback in separate files

MapReduce, a data processing framework (engine), can be used to analyse many kinds of data sources (logs, feedback, sales details, etc.). In the previous post, we analysed time-temperature statistics and generated a report with the max/min temperature for various cities. In this post we will analyse customer feedback/review comments for various mobile phones, stored in a text file, and work out which mobile could be a good buy.
Note:- The data used for the sample program is fictitious, ONLY for educational purposes, and does not convey any message about whether a product is good or bad.
Problem statement:- Analyse a text file storing customer feedback about various mobile phones from various vendors using MapReduce, and write the positive and negative comments to separate files, grouped by mobile phone and price. For each mobile phone, also display the total number of comments. Download sample input file.

Input schema:- <Mobile_set_detail><TAB><Price><TAB><Vendor><TAB><Comment>
Example:-  Lenovo A6000 Plus Rs. 7,499.00 Flipkart Satisfied with  phone.
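To make the schema concrete, here is a minimal plain-Java sketch (no Hadoop needed) that splits one record into its four tab-separated fields and builds the "<mobile>:<price>" key used to group comments. The class FeedbackRecord and its methods are hypothetical names introduced only for this illustration, and returning null for malformed lines is an assumption, not something the sample program does.

```java
// Hypothetical helper, not part of the sample program: parses one input record.
class FeedbackRecord {
    final String mobile;
    final String price;
    final String vendor;
    final String comment;

    FeedbackRecord(String mobile, String price, String vendor, String comment) {
        this.mobile = mobile;
        this.price = price;
        this.vendor = vendor;
        this.comment = comment;
    }

    // Returns null when the line does not have exactly four tab-separated fields
    // (an assumption made for this sketch).
    static FeedbackRecord parse(String line) {
        String[] fields = line.split("\t", -1);
        if (fields.length != 4) {
            return null;
        }
        return new FeedbackRecord(fields[0], fields[1], fields[2], fields[3]);
    }

    // Key under which comments are grouped: "<mobile>:<price>".
    String groupKey() {
        return mobile + ":" + price;
    }

    public static void main(String[] args) {
        String line = "Lenovo A6000 Plus\tRs. 7,499.00\tFlipkart\tSatisfied with  phone.";
        FeedbackRecord record = parse(line);
        System.out.println(record.groupKey()); // Lenovo A6000 Plus:Rs. 7,499.00
        System.out.println(record.vendor);     // Flipkart
    }
}
```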

Expected output:-
In positiveFeedback_file : <Mobile_detail><TAB><Comment_count><TAB><All_comments_separated by ||>
Apple Iphone 4s - 16 Gb:Rs. 12,243.00 Comments(2) Amazingly smooth and has a much better battery life. || good for style and long term uses. || 
In negativeFeedback_file : <Mobile_detail><TAB><Comment_count><TAB><All_comments_separated by ||>
Lenovo VIBE P1m (Black, 16 GB):Rs. 7,999 Comments(3)  Poor service so do not buy. ||Poor service so do not buy. ||Do not prefer and not reccomend ||
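The output layout above can be sketched in plain Java as follows. OutputLineFormatter is a hypothetical helper written only for this illustration; it assumes the key and the comment string are separated by a tab and that every comment is followed by " || ", as in the positive file sample.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the sample program: builds one output line.
class OutputLineFormatter {

    // Produces "<mobile>:<price> Comments(n)<TAB>c1 || c2 || ".
    static String format(String mobileAndPrice, List<String> comments) {
        StringBuilder joined = new StringBuilder();
        for (String comment : comments) {
            joined.append(comment).append(" || ");
        }
        return mobileAndPrice + " Comments(" + comments.size() + ")\t" + joined;
    }

    public static void main(String[] args) {
        List<String> comments = Arrays.asList(
                "Amazingly smooth and has a much better battery life.",
                "good for style and long term uses.");
        System.out.println(format("Apple Iphone 4s - 16 Gb:Rs. 12,243.00", comments));
    }
}
```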

As part of solving this use case we will learn about -
1. Life cycle of the mapper and reducer classes - setup() -> map()/reduce() -> cleanup()
For each mapper/reducer task these three methods run in the same order: setup() runs once before the first record, map()/reduce() runs once per input record (or once per key in the reducer), and cleanup() runs once at the end. The setup() method is the place to prepare or load supporting data for the mapper or reducer class.
In the cleanup() method, resources can be released.
2. MultipleOutputs - a reducer can write to more than one output file using MultipleOutputs
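The life cycle in point 1 can be imitated in a few lines of plain Java. This is only a sketch of the calling order: LifecycleTask and LifecycleDemo are hypothetical stand-ins and are not Hadoop classes, and the log list exists only so the order can be printed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo task: records each life-cycle call in a log.
class LifecycleDemo extends LifecycleTask {
    @Override
    protected void process(String record) {
        log.add("map(" + record + ")");
    }

    public static void main(String[] args) {
        LifecycleDemo task = new LifecycleDemo();
        task.run(Arrays.asList("line1", "line2"));
        System.out.println(task.log); // [setup, map(line1), map(line2), cleanup]
    }
}

// Hypothetical stand-in for a mapper/reducer base class (not a Hadoop class).
abstract class LifecycleTask {
    final List<String> log = new ArrayList<>();

    protected void setup() {
        log.add("setup");
    }

    protected abstract void process(String record);

    protected void cleanup() {
        log.add("cleanup");
    }

    // setup once, then one call per record, then cleanup once.
    final void run(List<String> records) {
        setup();
        try {
            for (String record : records) {
                process(record);
            }
        } finally {
            cleanup();
        }
    }
}
```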

Sample code for mapper, reducer and driver class

Mapper class :- In the mapper class below, the input file is read and the map method is executed for each line. It parses the input line and writes a key (mobile detail and price) and a value (the comment) to the context. Both key and value are of type Text.
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
* Mapper executes for each task in sequence : setup -> map -> cleanup.
* setup and cleanup are not overridden here.
*/
class ReviewMapperClass extends Mapper<Object, Text, Text, Text> {
@Override
protected void map(Object key, Text value, Context context)
  throws IOException, InterruptedException {
 // Split once: <Mobile_set_detail> TAB <Price> TAB <Vendor> TAB <Comment>
 String[] fields = value.toString().split("\\t");
 if (fields.length < 4) {
  return; // skip malformed lines instead of failing the task
 }
 String productId = fields[0];
 String price = fields[1];
 String feedback = fields[3];
 // Group comments by "<mobile detail>:<price>"
 String mapperKey = productId + ":" + price;
 context.write(new Text(mapperKey), new Text(feedback));
}
}

Reducer class:- In the reducer class, the setup() method builds the positive and negative word lists (positive and negative comments are separated based on whether these words appear in them).
In the reduce method, the comments for a key are iterated and each one is matched against POS_QUALIFY_PATTERN, which is created using wordList.get(0) (the positive comment words), and against NEG_QUALIFY_PATTERN, which is created using wordList.get(1) (the negative comment words). If a match is found, the comment is appended to the corresponding comment string (sbfPos/sbfNeg) and its count is incremented.
Once the for loop terminates, both files (positiveReview and negativeReview) are written with the comment count and comment string.
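import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

/*
* Reducer executes on mapper output in sequence : setup -> reduce -> cleanup.
* Both setup and cleanup are overridden below.
*/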
class ReviewReducerClass extends Reducer<Text, Text, Text, Text> {
MultipleOutputs<Text, Text> multiOutput;
List<String> wordList = new LinkedList<String>();

@Override
protected void setup(Context context) {
 multiOutput = new MultipleOutputs<Text, Text>(context);
 Configuration conf = context.getConfiguration();
 wordList.add(conf.get("positiveWords"));
 wordList.add(conf.get("negativeWords"));
}

@Override
public void reduce(Text key, Iterable<Text> feedbackList, Context con) {
 Matcher matcherQualifyPositive;
 Matcher matcherQualifyNegative;
 final String POS_QUALIFY_PATTERN = "(?)(.*)(" + wordList.get(0)
   + ")(.*)";
 final String NEG_QUALIFY_PATTERN = "(?)(.*)(" + wordList.get(1)
   + ")(.*)";
 Pattern posQualifyPattern = Pattern.compile(POS_QUALIFY_PATTERN,
   Pattern.CASE_INSENSITIVE);
 Pattern negQualifyPattern = Pattern.compile(NEG_QUALIFY_PATTERN,
   Pattern.CASE_INSENSITIVE);

 int countPos = 0;
 int countNeg = 0;
 try {
  StringBuffer sbfPos = new StringBuffer("");
  StringBuffer sbfNeg = new StringBuffer("");
  for (Text strVal : feedbackList) {
   matcherQualifyPositive = posQualifyPattern.matcher(strVal
     .toString());
   matcherQualifyNegative = negQualifyPattern.matcher(strVal
     .toString());
   // Check the negative list first: phrases such as "not good" also
   // contain a positive word and must not be counted as positive.
   if (matcherQualifyNegative.find()) {
    sbfNeg.append(strVal).append("||");
    countNeg++;
   } else if (matcherQualifyPositive.find()) {
    sbfPos.append(strVal).append(" || ");
    countPos++;
   }
  }
  /* Write to both positive and negative feedback files */
  if (countPos != 0 && sbfPos.length() > 0) {
   multiOutput.write(PositiveAndNegativeReview.positiveReview,
   new Text(key.toString() + " Comments("+ countPos + ")"),
     new Text(sbfPos.toString()));
  }
  if (countNeg != 0 && sbfNeg.length() > 0) {
   multiOutput.write(PositiveAndNegativeReview.negativeReview,
   new Text(key.toString() + " Comments("+ countNeg + ")"),
     new Text(sbfNeg.toString()));
  }
  System.out.println(sbfNeg.toString());
  System.out.println(sbfPos.toString());
 } catch (IOException e) {
  e.printStackTrace();
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
}

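@Override
protected void cleanup(Context context) throws IOException,
  InterruptedException {
 // Close MultipleOutputs so the named output files are flushed
 multiOutput.close();
 wordList = null;
 multiOutput = null;
}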
}
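The matching step can be tried outside Hadoop with a small plain-Java sketch. FeedbackClassifier is a hypothetical class written for this illustration; the word lists are shortened versions of the ones passed through the job configuration, and the "neutral" label for comments that match neither list is an assumption. The sketch tests the negative list first, which is one way to keep a comment such as "Not good at all." from being counted as positive because it contains "good ".

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of the sample program: classifies one comment.
class FeedbackClassifier {
    // Shortened word lists, assumed for illustration; note the trailing spaces.
    static final Pattern POSITIVE = Pattern.compile(
            "good |satisfied |smooth |stylish ", Pattern.CASE_INSENSITIVE);
    static final Pattern NEGATIVE = Pattern.compile(
            "not good |do not |poor |not satisfied ", Pattern.CASE_INSENSITIVE);

    // Negative words win over positive ones; anything else is "neutral".
    static String classify(String comment) {
        if (NEGATIVE.matcher(comment).find()) {
            return "negative";
        }
        if (POSITIVE.matcher(comment).find()) {
            return "positive";
        }
        return "neutral";
    }

    public static void main(String[] args) {
        System.out.println(classify("Good phone. "));                // positive
        System.out.println(classify("Poor service so do not buy.")); // negative
        System.out.println(classify("Not good at all."));            // negative
        System.out.println(classify("Arrived on time."));            // neutral
    }
}
```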

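Driver class:-
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class PositiveAndNegativeReview {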
public static String positiveReview = "positiveReview";
public static String negativeReview = "negativeReview";

/**
 * Driver: configures the job, passes the word lists to the reducer through
 * the Configuration (read in setup) and registers the two named outputs.
 */
public static void main(String[] args) {
 final String POSITIVE_WORD = "good |satisfied |classic|class|happy |thanks |"
   + "recommend |good to go|best |rocking |yo |fancy |stylish |must buy |"
   + "amazing |smooth |awesome |damn good ";
 final String NEGATIVE_WORD = "not good |Do not |donot |poor |"
   + "not satisfied |very poor|not happy |worst |"
   + "not recommend |do not buy|not-satisfied|waste |bad |"
   + "false |not stylish |should not buy |not amazing |"
   + "not smooth |wasted |damn bad ";

 Configuration conf = new Configuration();
 conf.set("positiveWords", POSITIVE_WORD);
 conf.set("negativeWords", NEGATIVE_WORD);
 try {
  Job job = Job.getInstance(conf, "Filter file with good feedback!!");
  job.setMapperClass(ReviewMapperClass.class);
  job.setReducerClass(ReviewReducerClass.class);
  job.setJarByClass(PositiveAndNegativeReview.class);
  /*
   * Set the four properties below carefully, otherwise the job fails
   * silently after the first context.write
   */
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  /* Optional, it's good to set */
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  /* Multiple output setting */
  MultipleOutputs.addNamedOutput(job, negativeReview,
    TextOutputFormat.class, Text.class, Text.class);
  MultipleOutputs.addNamedOutput(job, positiveReview,
    TextOutputFormat.class, Text.class, Text.class);

  Path pathInput = new Path(
  "hdfs://localhost:54310/user/hduser1/feedbackPosNeg.txt");
  Path pathOutputDir = new Path(
  "hdfs://localhost:54310/user/hduser1/testfs/output_dir_feedback");
  FileInputFormat.setInputPaths(job, pathInput);
  FileOutputFormat.setOutputPath(job, pathOutputDir);
  // Exit with 0 on success and 1 on failure
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 } catch (IOException e) {
  e.printStackTrace();
 } catch (ClassNotFoundException e) {
  e.printStackTrace();
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
}
}
Start the Hadoop services (./start-all.sh from the sbin directory) and execute the driver program. Verify the output directory - it should contain two files (negativeReview-r-00000 and positiveReview-r-00000). Download sample output file.
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop fs -cat /user/hduser1/testfs/output_dir_feedback/positiveReview-r-00000
Apple Iphone 4s - 16 Gb - Black:Rs. 12,617.00 Comments(2) Yo like it.  || Amazingly smooth and has a much better battery life. || 
Apple iPhone 5s 40 16GB 41:Rs. 38,269.00 Comments(1) Good phone.  || 
Lenovo A2010 (Black, 8 GB):Rs. 4,990 Comments(4) Very stylish and fancy.  || Very stylish and fancy.  || Good phone.  || Very good in low end.  || 
