Producer-consumer problem - Using the Java 5 Lock interface and Condition variables

Java provides two locking mechanisms for coordinating multiple threads that access shared resources in a concurrent environment: implicit locking (the synchronized keyword) and explicit locking (the Lock interface). See the post on explicit and implicit locking in Java for more detail. This post solves the producer-consumer problem using the Java 5 Lock interface and Condition variables.
Lock:- Java provides a concrete implementation of the Lock interface in the form of the ReentrantLock class. Every thread must acquire the same lock instance before entering the critical section and release it on exit. See also the post on a custom implementation of the Lock interface.
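To make the acquire/release pattern concrete, here is a minimal sketch of a counter guarded by a ReentrantLock. The class name LockedCounter and its methods are hypothetical, chosen only for illustration, and the lambdas in main assume Java 8 or later.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class for illustration; not part of the program in this post.
class LockedCounter {
    private final Lock lock = new ReentrantLock();
    private int count = 0;

    void increment() {
        lock.lock();           // acquire before entering the critical section
        try {
            count++;           // critical section
        } finally {
            lock.unlock();     // always release, even if the critical section throws
        }
    }

    int get() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockedCounter counter = new LockedCounter();
        Runnable task = () -> {
            for (int i = 0; i < 10_000; i++) {
                counter.increment();
            }
        };
        Thread t1 = new Thread(task, "worker-1");
        Thread t2 = new Thread(task, "worker-2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("Final count: " + counter.get()); // prints "Final count: 20000"
    }
}
```

Calling lock() just before the try block, rather than inside it, means the finally clause only ever unlocks a lock that was actually acquired.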
Condition variables:- These are instances of the java.util.concurrent.locks.Condition interface, obtained from a lock via newCondition(). A Condition provides the inter-thread communication methods await(), signal() and signalAll(), which are analogous to wait(), notify() and notifyAll().
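As a small sketch of await() and signalAll() on their own, the hypothetical ReadyGate class below lets one thread wait until another marks a flag. The class and method names are assumptions made for this example, and the lambda in main assumes Java 8 or later.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical one-shot gate for illustration; not part of the program in this post.
class ReadyGate {
    private final Lock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean isReady = false;

    void awaitReady() throws InterruptedException {
        lock.lock();
        try {
            while (!isReady) {   // re-check in a loop to guard against spurious wakeups
                ready.await();   // releases the lock while waiting, reacquires before returning
            }
        } finally {
            lock.unlock();
        }
    }

    void markReady() {
        lock.lock();
        try {
            isReady = true;
            ready.signalAll();   // wake every thread waiting on this condition
        } finally {
            lock.unlock();
        }
    }

    boolean isReady() {
        lock.lock();
        try {
            return isReady;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReadyGate gate = new ReadyGate();
        Thread waiter = new Thread(() -> {
            try {
                gate.awaitReady();
                System.out.println("Waiter proceeding");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "waiter-1");
        waiter.start();
        gate.markReady();
        waiter.join();
    }
}
```

Both await() and signalAll() must be called while holding the lock that created the condition; otherwise they throw IllegalMonitorStateException.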
The sample program below shows how two condition variables are used for inter-thread communication. When a thread (say, the producer) finds the queue full, it waits until a slot becomes free; the consumer signals it after removing an element. Similarly, when the other thread (the consumer) finds the queue empty, it waits until the producer adds an element and notifies it via signal()/signalAll().
package com.dev.thread;

import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumerUsingLockAndCondition {
 //Shared resources used by all threads in the system 
 static class SharedResource {
  private static Queue<Integer> queue;
  private static int MAX_QUEUE_SIZE;
  private static Random random;

  public SharedResource() {
   queue = new LinkedList<Integer>();
   random = new Random();
   MAX_QUEUE_SIZE = 10;
  }
 }

 static class ProducerConsumerImplementation {
  // create lock instance followed by condition variable
  private final Lock lock = new ReentrantLock();
  private final Condition bufferFull = lock.newCondition();
  private final Condition bufferEmpty = lock.newCondition();

  public void put() {
   lock.lock(); // Acquire lock before try, so finally never unlocks a lock we do not hold
   try {
    while (SharedResource.queue.size() == SharedResource.MAX_QUEUE_SIZE) {
     System.out.println("Size of buffer is "
       + SharedResource.queue.size());
     bufferFull.await(); // wait while buffer is full; await() releases
          // the lock until a consumer signals
    }// while close
    int nextval = SharedResource.random.nextInt();
    boolean status = SharedResource.queue.offer(nextval);
    if (status) {
     System.out.println("Thread "
       + Thread.currentThread().getName()
       + " added value " + nextval + "in queue ");
     bufferEmpty.signalAll();// similar to notifyAll -
           // communicate waiting thread
           // that queue is not empty now

    }
   } catch (InterruptedException e) {
    e.printStackTrace();
   } finally {
    lock.unlock();// Release lock
   }
  }

  public void get() {
   Integer returnVal = Integer.MIN_VALUE;
   lock.lock();// acquire lock before try, so finally never unlocks a lock we do not hold
   try {
    while (SharedResource.queue.size() == 0) {
     System.out
       .println("No element in Buffer, wait at least one element is available");
     bufferEmpty.await();
    }
    System.out.println("Size of buffer is "
      + SharedResource.queue.size());
    returnVal = SharedResource.queue.poll();
    if (returnVal != null) {
     System.out.println("Thread "
       + Thread.currentThread().getName()
       + " consumed value " + returnVal + " in queue ");

     bufferFull.signalAll();
    }
   } catch (InterruptedException e) {
    e.printStackTrace();
   } finally {
    lock.unlock();// release lock
   }
  }
 }

 static class Producer implements Runnable {
  SharedResource sharedObj;
  ProducerConsumerImplementation pci;

  public Producer(SharedResource sharedObj,
    ProducerConsumerImplementation pci) {
   this.sharedObj = sharedObj;
   this.pci = pci;
  }

  public void run() {
   int i = 0;
   while (true) {
    pci.put();
    i++;
   }
  }
 }

 static class Consumer implements Runnable {
  SharedResource sharedObj;
  ProducerConsumerImplementation pci;

  public Consumer(SharedResource sharedObj,
    ProducerConsumerImplementation pci) {
   this.sharedObj = sharedObj;
   this.pci = pci;
  }

  public void run() {
   int i = 0;
   while (true) {
    pci.get();
    i++;
   }
  }
 }

 public static void main(String[] args) {
  SharedResource sharedObj = new SharedResource();
  ProducerConsumerImplementation pci = new ProducerConsumerImplementation();
  Thread tp1 = new Thread(new Producer(sharedObj, pci), "producer-1");
  Thread tc1 = new Thread(new Consumer(sharedObj, pci), "consumer-1");
  // Thread tp2 = new Thread(new Producer(sharedObj, pci), "producer-2");
  // Thread tc2 = new Thread(new Consumer(sharedObj, pci), "consumer-2");
  tc1.start();
  tp1.start();
  // tc2.start();
  // tp2.start();
 }
}
Sample Output:-
========
No element in Buffer, wait at least one element is available
Thread producer-1 added value -1270291669in queue
Thread producer-1 added value -614340306in queue
Thread producer-1 added value -1628691633in queue
Thread producer-1 added value 1590227267in queue
Thread producer-1 added value -843619989in queue
Thread producer-1 added value 567297268in queue
Thread producer-1 added value -532278952in queue
Thread producer-1 added value -1183713861in queue
Thread producer-1 added value -875729542in queue
Thread producer-1 added value 1805541117in queue
Size of buffer is 10
Size of buffer is 10
Thread consumer-1 consumed value -1270291669 in queue
Thread producer-1 added value -1255541427in queue
Size of buffer is 10
Size of buffer is 10
Thread consumer-1 consumed value -614340306 in queue
Thread producer-1 added value 1312484504in queue
Size of buffer is 10
Size of buffer is 10
Thread consumer-1 consumed value -1628691633 in queue
Thread producer-1 added value 1749418542in queue
Size of buffer is 10
Size of buffer is 10
Thread consumer-1 consumed value 1590227267 in queue
Thread producer-1 added value -572591107in queue
Size of buffer is 10
......
==========
Question:- What are other ways to solve the producer-consumer problem?
1. Using synchronized keyword and wait()/notify() methods
2. Using concurrent queue interface BlockingQueue.
3. Producer consumer problem with semaphore and mutex.
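
As a rough sketch of option 2, a BlockingQueue handles the locking and waiting internally: put() blocks while the queue is full and take() blocks while it is empty. The class name BlockingQueueDemo, the run() helper and the bounded item count are assumptions made for this example (the program in this post loops forever instead), and the lambda assumes Java 8 or later.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class for illustration; not part of the program in this post.
class BlockingQueueDemo {

    // Produces the values 1..items on one thread, consumes them on the
    // calling thread, and returns the sum of the consumed values.
    static int run(int capacity, int items) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= items; i++) {
                    queue.put(i);        // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer-1");
        producer.start();

        int sum = 0;
        for (int i = 0; i < items; i++) {
            sum += queue.take();         // blocks while the queue is empty
        }
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        // Capacity 2 forces the producer to wait for the consumer several times.
        System.out.println("Sum of consumed values: " + run(2, 5)); // prints "Sum of consumed values: 15"
    }
}
```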
