Dec 31, 2015

Oracle database: Can't create table from package - ORA-01031: insufficient privileges

A database user having the DBA role is not able to create a table from a package. It is hard to believe at first, but this is how it works.
When a (definer's rights) procedure, or a procedure inside a package, is executed, roles are disabled automatically, and so are all grants associated with those roles. With the role disabled, the CREATE TABLE privilege is effectively revoked too, which is why the procedure throws an insufficient privileges error.
Consider the following example,
Package specification:-
create or replace package pl_sql_examples as 
  Procedure create_table(v_in IN  varchar2);
end pl_sql_examples;
Package body:-
create or replace PACKAGE BODY PL_SQL_EXAMPLES AS

  Procedure create_table(v_in IN varchar2) AS
  create_tbl_stmt varchar2(32767);
  BEGIN
    create_tbl_stmt := 'CREATE TABLE customers_dump( customer_id number(10) NOT NULL,
                            customer_name varchar2(50) NOT NULL,city varchar2(50))';
    execute immediate create_tbl_stmt;
  END create_table;

END PL_SQL_EXAMPLES;
Now execute the above procedure as the HR user (HR has the DBA role assigned) from the SQL Developer/SQL*Plus client:
execute PL_SQL_EXAMPLES.create_table('dummy');
Error starting at line : 1 in command -
execute PL_SQL_EXAMPLES.create_table('111') Error report -
ORA-01031: insufficient privileges
ORA-06512: at "HR.PL_SQL_EXAMPLES", line 8
ORA-06512: at line 1
01031. 00000 -  "insufficient privileges"
*Cause:    An attempt was made to perform a database operation without
           the necessary privileges.
*Action:   Ask your database administrator or designated security
           administrator to grant you the necessary privileges

The error states that the user does not have sufficient privileges to create a table (strange, since HR has the DBA role). The reason: when a procedure (or a procedure inside a package) is executed, roles are disabled automatically, and so are all grants associated with the role; the CREATE TABLE privilege is missing for HR because it is granted only through the role.

Solution:- Grant the CREATE TABLE privilege to HR directly instead of through the role (a direct GRANT CREATE TABLE is enough for tables in HR's own schema; the command below uses the broader CREATE ANY TABLE). Execute the following command as the SYSTEM user:
GRANT CREATE ANY TABLE TO HR;

Now execute the above procedure again; it should create the table customers_dump successfully.
execute PL_SQL_EXAMPLES.create_table('dummy');
anonymous block completed

Note:- Creating tables from a package is not recommended; avoid it where possible. Sometimes, however, it is unavoidable.

Dec 28, 2015

Producer consumer problem - Using Java 5 Lock interface and Condition variables

Java provides two locking mechanisms to deal with multiple threads accessing shared resources in a concurrent environment: implicit locking and explicit locking. Refer this for more detail about explicit and implicit locking in Java. The main agenda of this post is to solve the producer-consumer problem using the Java 5 Lock interface and Condition variables.
Lock:- Java provides a concrete implementation of the Lock interface in the form of the ReentrantLock class; every thread acquires it before entering the critical section. Read custom implementation of Lock interface.
Condition variables:- These are instances of java.util.concurrent.locks.Condition, which provides inter-thread communication methods similar to wait, notify and notifyAll: await(), signal() and signalAll().
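The canonical usage pattern, sketched below in a minimal self-contained example (the BoundedCounter class, its fields and methods are illustrative and not part of the producer-consumer program that follows), is: acquire the lock, call await() inside a while loop that re-checks the guarding condition, do the work, and release the lock in a finally block; the other side calls signalAll() after it changes the shared state.
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedCounter {
 private final Lock lock = new ReentrantLock();
 private final Condition belowLimit = lock.newCondition();
 private int count = 0;

 void increment(int limit) throws InterruptedException {
  lock.lock(); // acquire the lock before touching shared state
  try {
   while (count >= limit) { // always re-check the condition in a loop
    belowLimit.await(); // releases the lock while waiting, re-acquires on wake-up
   }
   count++; // safe: lock is held and the condition holds
  } finally {
   lock.unlock(); // always release the lock
  }
 }

 void decrement() {
  lock.lock();
  try {
   if (count > 0) {
    count--;
    belowLimit.signalAll(); // wake up threads waiting in increment()
   }
  } finally {
   lock.unlock();
  }
 }
}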
The sample program below shows how two condition variables are used for inter-thread communication: when a producer thread finds the queue full, it waits until at least one slot is vacant; similarly, when a consumer thread finds the queue empty, it waits until the queue has an element. After changing the queue, each side notifies the waiting thread via signal()/signalAll().
package com.dev.thread;

import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumerUsingLockAndCondition {
 //Shared resources used by all threads in the system 
 static class SharedResource {
  private static Queue<Integer> queue;
  private static int MAX_QUEUE_SIZE;
  private static Random random;

  public SharedResource() {
   queue = new LinkedList<Integer>();
   random = new Random();
   MAX_QUEUE_SIZE = 10;
  }
 }

 static class ProducerConsumerImplementation {
  // create lock instance followed by condition variable
  private final Lock lock = new ReentrantLock();
  private final Condition bufferFull = lock.newCondition();
  private final Condition bufferEmpty = lock.newCondition();

  public void put() {
   try {
    lock.lock(); // Acquire lock and block other threads
    while (SharedResource.queue.size() == SharedResource.MAX_QUEUE_SIZE) {
     System.out.println("Size of buffer is "
       + SharedResource.queue.size());
     bufferFull.await(); // buffer is full, wait until a consumer frees a slot
    }// while close
    int nextval = (Integer) SharedResource.random.nextInt();
    boolean status = (Boolean) SharedResource.queue.offer(nextval);
    if (status) {
     System.out.println("Thread "
       + Thread.currentThread().getName()
       + " added value " + nextval + "in queue ");
     bufferEmpty.signalAll();// similar to notifyAll -
           // communicate waiting thread
           // that queue is not empty now

    }
   } catch (InterruptedException e) {
    e.printStackTrace();
   } finally {
    lock.unlock();// Release lock
   }
  }

  public void get() {
   Integer returnVal = Integer.MIN_VALUE;
   try {
    lock.lock();// acquire lock
    while (SharedResource.queue.size() == 0) {
     System.out
       .println("No element in Buffer, wait at least one element is available");
     bufferEmpty.await();
    }
    System.out.println("Size of buffer is "
      + SharedResource.queue.size());
    returnVal = (Integer) SharedResource.queue.poll();
    if (returnVal != null) {
     System.out.println("Thread "
       + Thread.currentThread().getName()
       + " consumed value " + returnVal + " in queue ");

     bufferFull.signalAll();
    }
   } catch (InterruptedException e) {
    e.printStackTrace();
   } finally {
    lock.unlock();// release lock
   }
  }
 }

 static class Producer implements Runnable {
  SharedResource sharedObj;
  ProducerConsumerImplementation pci;

  public Producer(SharedResource sharedObj,
    ProducerConsumerImplementation pci) {
   this.sharedObj = sharedObj;
   this.pci = pci;
  }

  public void run() {
   int i = 0;
   while (true) {
    pci.put();
    i++;
   }
  }
 }

 static class Consumer implements Runnable {
  SharedResource sharedObj;
  ProducerConsumerImplementation pci;

  public Consumer(SharedResource sharedObj,
    ProducerConsumerImplementation pci) {
   this.sharedObj = sharedObj;
   this.pci = pci;
  }

  public void run() {
   int i = 0;
   while (true) {
    pci.get();
    i++;
   }
  }
 }

 public static void main(String[] args) {
  SharedResource sharedObj = new SharedResource();
  ProducerConsumerImplementation pci = new ProducerConsumerImplementation();
  Thread tp1 = new Thread(new Producer(sharedObj, pci), "producer-1");
  Thread tc1 = new Thread(new Consumer(sharedObj, pci), "consumer-1");
  // Thread tp2 = new Thread(new Producer(sharedObj, pci), "producer-2");
  // Thread tc2 = new Thread(new Consumer(sharedObj, pci), "consumer-2");
  tc1.start();
  tp1.start();
  // tc2.start();
  // tp2.start();
 }
}
Sample Output:-
========
No element in Buffer, wait at least one element is available
Thread producer-1 added value -1270291669in queue
Thread producer-1 added value -614340306in queue
Thread producer-1 added value -1628691633in queue
Thread producer-1 added value 1590227267in queue
Thread producer-1 added value -843619989in queue
Thread producer-1 added value 567297268in queue
Thread producer-1 added value -532278952in queue
Thread producer-1 added value -1183713861in queue
Thread producer-1 added value -875729542in queue
Thread producer-1 added value 1805541117in queue
Size of buffer is 10
Size of buffer is 10
Thread consumer-1 consumed value -1270291669 in queue
Thread producer-1 added value -1255541427in queue
Size of buffer is 10
Size of buffer is 10
Thread consumer-1 consumed value -614340306 in queue
Thread producer-1 added value 1312484504in queue
Size of buffer is 10
Size of buffer is 10
Thread consumer-1 consumed value -1628691633 in queue
Thread producer-1 added value 1749418542in queue
Size of buffer is 10
Size of buffer is 10
Thread consumer-1 consumed value 1590227267 in queue
Thread producer-1 added value -572591107in queue
Size of buffer is 10
......
==========
Question:- What are other ways to solve the producer-consumer problem?
1. Using synchronized keyword and wait()/notify() methods
2. Using concurrent queue interface BlockingQueue (see the sketch below)
3. Producer consumer problem with semaphore and mutex
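For illustration, below is a minimal sketch of option 2 using java.util.concurrent.BlockingQueue; the class name, queue capacity and loop bound are arbitrary choices for this example. ArrayBlockingQueue performs the waiting and signalling internally, so no explicit Lock/Condition code is needed: put() blocks while the queue is full and take() blocks while it is empty, which is exactly the behaviour implemented manually above with bufferFull/bufferEmpty.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerUsingBlockingQueue {
 public static void main(String[] args) {
  final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);

  Thread producer = new Thread(new Runnable() {
   public void run() {
    try {
     for (int i = 0; i < 100; i++) {
      queue.put(i); // blocks while the queue is full
     }
    } catch (InterruptedException e) {
     Thread.currentThread().interrupt();
    }
   }
  }, "producer-1");

  Thread consumer = new Thread(new Runnable() {
   public void run() {
    try {
     while (true) {
      Integer val = queue.take(); // blocks while the queue is empty
      System.out.println("Thread " + Thread.currentThread().getName()
        + " consumed value " + val);
     }
    } catch (InterruptedException e) {
     Thread.currentThread().interrupt();
    }
   }
  }, "consumer-1");

  producer.start();
  consumer.start();
 }
}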

Dec 17, 2015

Mapreduce: Analyse customer feedback about mobile phones stored in a text file and separate out positive & negative feedback into separate files

Mapreduce, a data processing framework (engine), can be used to analyse various kinds of data sources (logs, feedback, sales details, etc.). In the previous post, we analysed time-temperature statistics and generated a report with max/min temperature for various cities. In this post we will analyse customer feedback/review comments for various mobile phones, stored in a text file, and conclude which mobile could be a good buy.
Note:- The data used in the sample program is fictitious, ONLY for educational purposes, and it does not convey any message about a product being good or bad.
Problem statement:- Using mapreduce, analyse a text file storing customer feedback about various mobile phones from various vendors, and separate out positive & negative comments into separate files, one entry per mobile phone with its price. For each mobile, also display the total number of comments. Download sample input file.

Input schema:- <Mobile_set_detail><TAB><Price><TAB><Vendor><TAB><Comment>
Example:-  Lenovo A6000 Plus Rs. 7,499.00 Flipkart Satisfied with  phone.

Expected output:-
In positiveFeedback_file : <Mobile_detail><TAB><Comment_count><TAB><All_comments_separated by ||>
Apple Iphone 4s - 16 Gb:Rs. 12,243.00 Comments(2) Amazingly smooth and has a much better battery life. || good for style and long term uses. || 
 In negativeFeedback_file : <Mobile_detail><TAB><Comment_count><TAB><All_comments_separated by ||>
Lenovo VIBE P1m (Black, 16 GB):Rs. 7,999 Comments(3)  Poor service so do not buy. ||Poor service so do not buy. ||Do not prefer and not reccomend ||

As part of solving this use case we will learn about:
1. The life cycle of the Mapper and Reducer classes: setup() -> map()/reduce() -> cleanup().
For each mapper/reducer task the order of execution of these three methods is the same. The setup() method provides an opportunity to prepare or modify input or supporting data for the mapper or reducer class, and resources can be released in cleanup(). A minimal skeleton is sketched after this list.
2. MultipleOutputs - more than one reducer output file can be generated using MultipleOutputs.
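For reference, here is a minimal skeleton of that lifecycle (the class name and key/value types are illustrative, not taken from the feedback example): setup() runs once per task before the first record, map() runs once per input record, and cleanup() runs once after the last record. The reducer lifecycle is analogous, with reduce() in place of map().
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

class LifecycleDemoMapper extends Mapper<LongWritable, Text, Text, Text> {
 @Override
 protected void setup(Context context) {
  // Runs once per task, before any map() call: load or prepare supporting data here.
 }

 @Override
 protected void map(LongWritable key, Text value, Context context)
   throws IOException, InterruptedException {
  // Runs once per input record.
  context.write(new Text("demo"), value);
 }

 @Override
 protected void cleanup(Context context) {
  // Runs once per task, after the last map() call: release resources here.
 }
}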

Sample code for mapper, reducer and driver class

Mapper class:- In the mapper class below, the input file is read and the map method is executed for each line. It parses the line and writes the product/price key and the feedback value to the context; both key and value are of type Text.
/*
* Mapper executes for each task in the sequence: setup -> map -> cleanup
*/
class ReviewMapperClass extends Mapper<Object, Text, Text, Text> {
@Override
protected void map(Object key, Text value, Context context) {
 try {
  String inputLine = value.toString();
  String feedback = inputLine.split("\\t")[3];
  String productId = inputLine.split("\\t")[0];
  String price = inputLine.split("\\t")[1];
  String mapperKey = productId + ":" + price;
  context.write(new Text(mapperKey), new Text(feedback));
 } catch (IOException e) {
  e.printStackTrace();
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
}
}

Reducer class:- In the reducer class, the setup() method builds the positive and negative word lists (based on these words, positive and negative comments are separated).
In the reduce method, the feedback list is iterated and each comment is matched against POS_QUALIFY_PATTERN, built from wordList.get(0) (the positive comment words), and NEG_QUALIFY_PATTERN, built from wordList.get(1) (the negative comment words). When a match is found, the corresponding comment string (sbfPos/sbfNeg) and count are updated.
Once the loop terminates, both files (positiveReview and negativeReview) are written with the comment count and the comment string.
/*
* Reducer processes mapper output in the sequence: setup -> reduce -> cleanup.
* Here both setup() and cleanup() are overridden.
*/
class ReviewReducerClass extends Reducer<Text, Text, Text, Text> {
MultipleOutputs<Text, Text> multiOutput;
List<String> wordList = new LinkedList<String>();

@Override
protected void setup(Context context) {
 multiOutput = new MultipleOutputs<Text, Text>(context);
 Configuration conf = context.getConfiguration();
 wordList.add(conf.get("positiveWords"));
 wordList.add(conf.get("negativeWords"));
}

@Override
public void reduce(Text key, Iterable<Text> feedbackList, Context con) {
 Matcher matcherQualifyPositive;
 Matcher matcherQualifyNegative;
 final String POS_QUALIFY_PATTERN = "(.*)(" + wordList.get(0) + ")(.*)";
 final String NEG_QUALIFY_PATTERN = "(.*)(" + wordList.get(1) + ")(.*)";
 Pattern posQualifyPattern = Pattern.compile(POS_QUALIFY_PATTERN,
   Pattern.CASE_INSENSITIVE);
 Pattern negQualifyPattern = Pattern.compile(NEG_QUALIFY_PATTERN,
   Pattern.CASE_INSENSITIVE);

 int countPos = 0;
 int countNeg = 0;
 try {
  StringBuffer sbfPos = new StringBuffer("");
  StringBuffer sbfNeg = new StringBuffer("");
  for (Text strVal : feedbackList) {
   matcherQualifyPositive = posQualifyPattern.matcher(strVal
     .toString());
   matcherQualifyNegative = negQualifyPattern.matcher(strVal
     .toString());
   if (matcherQualifyPositive.find()) {
    if (!matcherQualifyNegative.find()) {
     sbfPos.append(strVal).append(" || ");
     countPos++;
    }
   } else if (matcherQualifyNegative.find()) {
    sbfNeg.append(strVal).append("||");
    countNeg++;
   }
  }
  /* Write on both positive and negative feedback file */
   if (countPos != 0 && sbfPos.length() > 0) {
   multiOutput.write(PositiveAndNegativeReview.positiveReview,
   new Text(key.toString() + " Comments("+ countPos + ")"),
     new Text(sbfPos.toString()));
  }
   if (countNeg != 0 && sbfNeg.length() > 0) {
   multiOutput.write(PositiveAndNegativeReview.negativeReview,
   new Text(key.toString() + " Comments("+ countNeg + ")"),
     new Text(sbfNeg.toString()));
  }
  System.out.println(sbfNeg.toString());
  System.out.println(sbfPos.toString());
 } catch (IOException e) {
  e.printStackTrace();
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
 multiOutput.close(); // close MultipleOutputs so that all output files are flushed
 wordList = null;
}
}

Driver class:-
public class PositiveAndNegativeReview {
public static String positiveReview = "positiveReview";
public static String negativeReview = "negativeReview";

/**
 * Demonstrates the use of setup() and cleanup() in the reducer, and MultipleOutputs.
 */
public static void main(String[] args) {
 final String POSITIVE_WORD = "good |satisfied |classic|class|happy |thanks |"
   + "recommend |good to go|best |rocking |yo |fancy |stylish |must buy |"
   + "amazing |smooth |awesome |damn good ";
 final String NEGATIVE_WORD = "not good |Do not |donot |poor |"
   + "not satisfied |very poor|not happy |worst |"
   + "not recommend |do noy buy|not-satisfied|waste |bad |"
   + "false |not stylish |should not buy |not amazing |"
   + "not smooth |wasted |damn bad ";

 Configuration conf = new Configuration();
 conf.set("positiveWords", POSITIVE_WORD);
 conf.set("negativeWords", NEGATIVE_WORD);
 try {
  Job job = Job.getInstance(conf, "Filer file with good feedback!!");
  job.setMapperClass(ReviewMapperClass.class);
  job.setReducerClass(ReviewReducerClass.class);
  job.setJarByClass(PositiveAndNegativeReview.class);
  /*
   * Set below four property carefully otherwise job fails silently
   * after first context.write
   */
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  /* Optional, it's good to set */
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  /* Multiple output setting */
  MultipleOutputs.addNamedOutput(job, negativeReview,
    TextOutputFormat.class, Text.class, Text.class);
  MultipleOutputs.addNamedOutput(job, positiveReview,
    TextOutputFormat.class, Text.class, Text.class);

  Path pathInput = new Path(
  "hdfs://localhost:54310/user/hduser1/feedbackPosNeg.txt");
  Path pathOutputDir = new Path(
  "hdfs://localhost:54310/user/hduser1/testfs/output_dir_feedback");
  FileInputFormat.setInputPaths(job, pathInput);
  FileOutputFormat.setOutputPath(job, pathOutputDir);
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 } catch (IOException e) {
  e.printStackTrace();
 } catch (ClassNotFoundException e) {
  e.printStackTrace();
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
}
}
Start the hadoop services (./start-all.sh from the sbin directory) and execute the driver program. Verify the output directory: it should contain two files (negativeReview-r-00000 and positiveReview-r-00000). Download sample output file.
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop fs -cat /user/hduser1/testfs/output_dir_feedback/positiveReview-r-00000
Apple Iphone 4s - 16 Gb - Black:Rs. 12,617.00 Comments(2) Yo like it.  || Amazingly smooth and has a much better battery life. || 
Apple iPhone 5s 40 16GB 41:Rs. 38,269.00 Comments(1) Good phone.  || 
Lenovo A2010 (Black, 8 GB):Rs. 4,990 Comments(4) Very stylish and fancy.  || Very stylish and fancy.  || Good phone.  || Very good in low end.  || 

Dec 13, 2015


Apache Spark setup in Windows 7 - standalone mode

Apache Spark is a general-purpose cluster computing system for processing big data workloads. Spark can be used with Hadoop HDFS, Amazon EC2 and other persistent storage systems, including the local file system. For learning Apache Spark, it is easiest to set it up in standalone mode and start executing Spark APIs in the Scala, Python or R shell. In this post we will set up Spark and execute some Spark APIs.

Download Apache Spark:-
Download a pre-built version of Apache Spark and unzip it into some directory. I have placed it in the following location: E:\spark-1.5.2-bin-hadoop2.6.
Note:- It is also possible to download the source code and build it using Maven or SBT. Refer this for other download options.

Download and install Scala:-
Download the Scala binaries and install them. Scala is a prerequisite for working with Apache Spark, since Spark is written in Scala. Scala is installed at "C:\Program Files (x86)\scala".

Set up SCALA_HOME and HADOOP_HOME:-
Once we are done with the installation of Spark and Scala, configure the environment variables SCALA_HOME and HADOOP_HOME.
SCALA_HOME  =  C:\Program Files (x86)\scala

As of now we do not want to set up a full Hadoop installation, we just want to learn Apache Spark, so we only need to download winutils.exe and configure it via HADOOP_HOME. Unzip it and set the directory containing bin as HADOOP_HOME.
HADOOP_HOME = E:\dev\hadoop\hadoop-common-2.2.0-bin-master

Update PATH environment variable :- 
Add the Spark bin directory to the PATH environment variable so that the Scala or Python shell can be started without changing into the bin directory every time.

Start Spark’s shells (Scala or Python version):-
Python version :  pyspark
Scala version :     spark-shell 
Open cmd, type pyspark and press enter. If we have followed the steps properly, it should open the Python version of the Spark shell.

Similarly, we can start the Scala version of the Spark shell by typing spark-shell and pressing enter in cmd.
Note:- We may get some errors on the console regarding write permission on the hive directory; these can be ignored, and we can start executing Spark APIs and learning Apache Spark.

Sample API execution in the Python or Scala shell:-
Create an RDD, display the total number of lines in a file, followed by the first line of that file.
In Python version of the Spark shell
>>> lines = sc.textFile("README.md") # Create an RDD called lines
>>> lines.count() # Count the number of items in this RDD
98
>>> lines.first() # First item in this RDD, i.e. first line of README.md
u'# Apache Spark'

Note:- If you execute the same set of commands, the console will be flooded with INFO log lines. I have suppressed them by changing the log level to warning.

Join operation in MapReduce - Join two files (one in HDFS and the other one cached)

Join is a very commonly used operation in relational and non-relational databases. It gives the flexibility to combine different result sets and obtain other meaningful results. In this post we will understand how to use the distributed cache in hadoop and write sample code for performing a join on records present in two different locations. Download the input files: employee_record and department_record.

Distributed cache:- 
DistributedCache is a facility provided by the Map-Reduce framework to cache files (text, archives, jars, etc.) needed by applications. Once we cache a file for our job, the hadoop framework makes it available on every data node where our map/reduce tasks are running. Files are copied only once per job, and distributed cache files are accessible to all mappers and reducers.
DistributedCache has been deprecated in hadoop 2.x (from 2.2 onwards). So what is the alternative in hadoop 2.x? In hadoop 2.x the caching facility has moved to the Job class, and Job internally calls DistributedCache to add resources. The code below is from the Job class:
/**
* Add a file to be localized
* @param uri The uri of the cache to be localized
*/
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }
How to cache resources with Job and access them in a mapper/reducer:-
In the driver class:
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.addCacheFile(new URI("/DIR/file.text"));
To access the cached file in a mapper/reducer:
URI cachedFileURI = context.getCacheFiles()[0];
Path path = new Path(cachedFileURI.toString());

Sample code for Mapper, Reducer and Driver class

Mapper class:-
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class JoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
Map<String, String> deptInputMap = new HashMap<String, String>();

/* This method is executed for each mapper task before map method */
protected void setup(Context context) throws IOException,
  InterruptedException {
 Path path = null;
 try {
  if (context.getCacheFiles() != null
    && context.getCacheFiles().length > 0) {
   URI cachedFileURI = context.getCacheFiles()[0];
   if (cachedFileURI != null) {
    System.out.println("Mapping File: "
      + cachedFileURI.toString());
    path = new Path(cachedFileURI.toString());
    BufferedReader br = new BufferedReader(
      new InputStreamReader(new FileInputStream(
        path.toString())));
    String inputLine = "";
    while ((inputLine = br.readLine()) != null) {
     String[] deptParts = inputLine.split("\\t");
     String deptId = deptParts[0];
     String deptName = deptParts[1];
     String deptLocation = deptParts[3];
     deptInputMap.put(deptId, deptName + " " + deptLocation);
    }
    br.close();
   } else {
    System.out.println("No mapping file exist!!");
   }
  } else {
   System.out.println("No cached file exist!!");
  }
 } catch (Exception e) {
  e.printStackTrace();
 }

}

/*
 * get dept detail for each deptid from hashmap and append with emp record,
 * write in context
 */
public void map(LongWritable key, Text value, Context context)
  throws IOException, InterruptedException {
 try {
  String[] empInputs = value.toString().split("\\t");
  String deptDetail = deptInputMap.get(empInputs[4]);
  String empDeptJoinDetail = value + " " + deptDetail;
  System.out.println(deptDetail);
  context.write(new Text(empDeptJoinDetail), NullWritable.get());
  } catch (Exception e) {
   e.printStackTrace();
  }
}
}

Reducer class:-
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends
 Reducer<Text, NullWritable, Text, NullWritable> {
@Override
public void reduce(Text key, Iterable<NullWritable> values, Context con) {
 try {
  con.write(key, NullWritable.get());
 } catch (IOException e) {
  e.printStackTrace();
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
}
}

Driver class:-
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
* @author http://www.devinline.com
*/
public class JoinInHadoopSampleExample {

public static void main(String[] args) {
 Configuration conf = new Configuration();
 Job job;
 try {
  job = Job.getInstance(conf,
    "join operation- Employee and Dept files");
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(NullWritable.class);

  job.setMapperClass(JoinMapper.class);
  job.setReducerClass(JoinReducer.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  FileInputFormat.addInputPath(job,new Path(
   "hdfs://localhost:54310/user/hduser1/employee_records.txt"));
  try {
   job.addCacheFile(new URI(args[0]));//Cached file passed as program argument
  } catch (URISyntaxException e) {
   e.printStackTrace();
  }
  Path pathOutputDir = new Path(
    "hdfs://localhost:54310/user/hduser1/testfs/output_join1");
  FileOutputFormat.setOutputPath(job, pathOutputDir);
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 } catch (IOException e) {
  e.printStackTrace();
 } catch (ClassNotFoundException e) {
  e.printStackTrace();
 } catch (InterruptedException e) {
  e.printStackTrace();
 }

}

}
Run the hadoop services, place the input file (employee_record) in HDFS and pass the cached file (department_record) via a program argument. Execute the above driver class and verify that the department details are appended in the output file.
hduser1@ubuntu:/usr/local/hadoop2.6.1/bin$ ./hadoop fs -tail /user/hduser1/testfs/output_join1/part-r-00000
198 Donald OConnell DOCONNEL 50 650.507.9833 21-JUN-07 SH_CLERK 2600  124 Shipping 1500
199 Douglas Grant DGRANT 50 650.507.9844 13-JAN-08 SH_CLERK 2600  124 Shipping 1500
200 Jennifer Whalen JWHALEN 10 515.123.4444 17-SEP-03 AD_ASST4400  101 Administration 1700
201 Michael Hartstein MHARTSTE 20 515.123.5555 17-FEB-04 MK_MAN 13000  100 Marketing 1800
202 Pat Fay PFAY 20 603.123.6666 17-AUG-05 MK_REP 6000 201 Marketing 1800
203 Susan Mavris SMAVRIS 40 515.123.7777 07-JUN-02 HR_REP 6500 101 Human Resources 2400
204 Hermann Baer HBAER 70 515.123.8888 07-JUN-02 PR_REP 10000 101 Public Relations 2700