Aug 8, 2015

Producer consumer problem using BlockingQueue

BlockingQueue was introduced in Java 5. Its distinguishing feature is the blocking behaviour of its take() and put() methods. If the queue is empty, take() blocks the calling thread until the operation can succeed (i.e. until at least one element has been added to the queue). Similarly, if the queue is full, put(e) blocks the calling thread until space becomes available in the queue.
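As a rough illustration of the blocking behaviour described above, the sketch below starts a consumer on an empty queue, checks that it has not yet returned from take(), and then releases it with a single put(). The class name TakeBlocksDemo and the method demo() are made up for this example; only the java.util.concurrent calls are real API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original sample.
class TakeBlocksDemo {

    // Starts a consumer on an empty queue, checks that it has not returned
    // from take() after a short pause, then puts one element to release it.
    static String demo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        AtomicReference<String> received = new AtomicReference<>();

        Thread consumer = new Thread(() -> {
            try {
                received.set(queue.take()); // blocks while the queue is empty
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();

        try {
            Thread.sleep(200); // give the consumer time to reach take()
            boolean stillBlocked = consumer.isAlive() && received.get() == null;

            queue.put("hello"); // makes take() return in the consumer
            consumer.join();    // wait until the consumer has stored the value
            return stillBlocked + ":" + received.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true:hello
    }
}
```

The consumer cannot finish before the put() because nothing else adds to the queue, so the first half of the result is always true.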
Because of this blocking behaviour, BlockingQueue is a natural fit for the producer-consumer problem. With respect to capacity, BlockingQueue implementations fall into two categories:
1. Bounded BlockingQueue: created with a fixed capacity.
     BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(10);
2. Unbounded BlockingQueue: created without an explicit capacity, so the capacity defaults to Integer.MAX_VALUE. LinkedBlockingQueue supports this; ArrayBlockingQueue always requires a capacity.
    BlockingQueue<String> bQueue = new LinkedBlockingQueue<String>();
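The difference between the two categories can be checked with remainingCapacity() and the non-blocking offer(). This is a minimal sketch; the class name CapacityDemo and its method names are invented for illustration. Note that ArrayBlockingQueue has no no-argument constructor, so the unbounded case uses LinkedBlockingQueue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the original sample.
class CapacityDemo {

    // Fills a bounded queue of capacity 2 and reports the space left.
    static int boundedRemaining() {
        BlockingQueue<String> bounded = new ArrayBlockingQueue<>(2);
        bounded.offer("a");
        bounded.offer("b");
        return bounded.remainingCapacity(); // 0, the queue is full
    }

    // offer() returns false instead of blocking when the queue is full.
    static boolean offerToFullQueue() {
        BlockingQueue<String> bounded = new ArrayBlockingQueue<>(1);
        bounded.offer("a");
        return bounded.offer("b");
    }

    // A LinkedBlockingQueue created without a capacity is effectively unbounded.
    static int unboundedRemaining() {
        BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        return unbounded.remainingCapacity(); // Integer.MAX_VALUE
    }

    public static void main(String[] args) {
        System.out.println(boundedRemaining());
        System.out.println(offerToFullQueue());
        System.out.println(unboundedRemaining() == Integer.MAX_VALUE);
    }
}
```

With an unbounded queue, put() effectively never blocks, so a fast producer can exhaust memory; a bounded queue is usually the safer choice for producer-consumer setups.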

The sample code below shows how to use BlockingQueue to solve the producer-consumer problem.
package com.devinline.thread;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerUsingBlockingQueue {
 private static Random random = new Random();

 static class Producer implements Runnable {

  private final BlockingQueue<Integer> queue;

  Producer(BlockingQueue<Integer> q) {
   queue = q;
  }

  public void run() {
   try {
    while (true) {
     queue.put(produce());
     Thread.sleep(50);
    }
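   } catch (InterruptedException ex) {
    // Restore the interrupt status instead of silently swallowing it
    Thread.currentThread().interrupt();
   }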
  }

  int produce() {
   int retVal = random.nextInt();
   System.out.println("Thread " + Thread.currentThread().getName()
     + " produced " + retVal);
   return retVal;
  }
 }

 static class Consumer implements Runnable {
  private final BlockingQueue<Integer> queue;

  Consumer(BlockingQueue<Integer> q) {
   queue = q;
  }

  public void run() {
   try {
    while (true) {
     consume(queue.take());
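     Thread.sleep(200); // Not required; added only to slow down the output
    }
   } catch (InterruptedException ex) {
    // Restore the interrupt status instead of silently swallowing it
    Thread.currentThread().interrupt();
   }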
  }

  void consume(Object x) {
   System.out.println("Thread " + Thread.currentThread().getName()
     + " consumed " + x);
  }
 }

 public static void main(String[] args) {
  // create a bounded blocking queue with max capacity 3
  BlockingQueue<Integer> boundedQueue = new ArrayBlockingQueue<Integer>(3);
  Producer p = new Producer(boundedQueue);
  Consumer c1 = new Consumer(boundedQueue);
  Consumer c2 = new Consumer(boundedQueue);
  new Thread(p, "Producer-1").start();
  new Thread(c1, "consumer-1").start();
  new Thread(c2, "consumer-2").start();
 }
}
====== Sample output====
Thread Producer-1 produced -940622800
Thread consumer-1 consumed -940622800
Thread Producer-1 produced 1787661584
Thread consumer-2 consumed 1787661584
Thread Producer-1 produced 1405767574
Thread Producer-1 produced 1741766637
Thread Producer-1 produced -331991439
Thread consumer-1 consumed 1405767574
Thread Producer-1 produced -1515288115
Thread consumer-2 consumed 1741766637
........
=====================
Question: What are the other ways to solve the producer-consumer problem?
1. Using the synchronized keyword with the wait()/notify() methods.
2. Using a semaphore and a mutex.
3. Using the Java 5 Lock interface and Condition variables.
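To give an idea of what the first alternative involves, here is a minimal sketch of a bounded buffer built with synchronized, wait() and notifyAll(). It is an illustration only, not the implementation BlockingQueue uses internally; the class WaitNotifyBuffer and the helper transfer() are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical hand-rolled bounded buffer, for illustration only.
class WaitNotifyBuffer<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;

    WaitNotifyBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Waits while the buffer is full, then adds the item.
    synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) {
            wait(); // releases the monitor until another thread calls notifyAll()
        }
        items.add(item);
        notifyAll(); // wake up consumers waiting for an element
    }

    // Waits while the buffer is empty, then removes the oldest item.
    synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.remove();
        notifyAll(); // wake up producers waiting for free space
        return item;
    }

    // Sends 1..count through a buffer of capacity 2 and returns the sum
    // of the values as seen by the consumer thread.
    static int transfer(int count) {
        WaitNotifyBuffer<Integer> buffer = new WaitNotifyBuffer<>(2);
        int[] sum = {0};
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    sum[0] += buffer.take();
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();
        try {
            for (int i = 1; i <= count; i++) {
                buffer.put(i);
            }
            consumer.join(); // join() makes the consumer's writes to sum visible
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(transfer(10)); // prints 55
    }
}
```

The wait() calls sit inside while loops rather than if statements so that a thread re-checks the condition after every wake-up. BlockingQueue hides this bookkeeping, which is why the version shown earlier is so much shorter.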