Producer consumer problem using wait() and notify() - multiple producers and multiple consumers

Producer consumer problem:- The problem describes two processes, the producer and the consumer, which share a common, fixed-size buffer used as a queue. The producer's job is to generate data, put it into the buffer, and start again. At the same time, the consumer is consuming the data (i.e., removing it from the buffer), one piece at a time. The problem is to make sure that the producer won't try to add data into the buffer if it's full and that the consumer won't try to remove data from an empty buffer. (Source: Wikipedia)
It is also known as the "bounded-buffer problem".

The producer consumer problem can be solved in various ways: using a BlockingQueue, using the Java 5 concurrency Lock interface with the Condition class, or using a semaphore and a mutex. In this post we will see how to solve this bounded-buffer problem using wait()/notify() and a synchronized block.

A synchronized block in Java allows only one thread at a time to enter a critical section (code that could leave shared state inconsistent if several threads executed it at the same time). Every object in Java has an implicit monitor, which means only one thread at a time can execute inside a block synchronized on that particular object.
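To make the monitor idea concrete, here is a minimal sketch of a counter guarded by a synchronized block. The class SynchronizedCounterSketch, its lock field, and the runDemo helper are hypothetical names chosen for this illustration; they are not part of the solution later in this post.

```java
// Hypothetical illustration: a counter whose updates are protected
// by a synchronized block on a private lock object.
class SynchronizedCounterSketch {
    private final Object lock = new Object();
    private int count = 0;

    void increment() {
        synchronized (lock) { // only one thread at a time runs this block
            count++;
        }
    }

    int get() {
        synchronized (lock) { // reads use the same monitor as writes
            return count;
        }
    }

    // Starts the given number of threads, each incrementing the counter
    // incrementsPerThread times, and returns the final count.
    static int runDemo(int threads, int incrementsPerThread) {
        SynchronizedCounterSketch counter = new SynchronizedCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers) {
                t.join(); // wait for every worker to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 10000)); // prints 40000
    }
}
```

Without the synchronized block, count++ is a read-modify-write sequence that threads can interleave, so the final value could come out lower than expected.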
wait()/notify() - These are typically used to wait for some other thread to accomplish a task, or to wait until a certain condition is satisfied. Consider a system with two threads and a shared resource. When a thread, say thread-1, enters a synchronized block that takes an object, say sharedObject, as its argument, thread-1 acquires the monitor of that object, and no other thread can enter a synchronized block on the same monitor until thread-1 releases it.
wait() relinquishes the monitor and suspends the calling thread until another thread signals, using notify() or notifyAll(), that it has completed its task. Both wait() and notify()/notifyAll() must be called while holding the object's monitor; otherwise an IllegalMonitorStateException is thrown. The following diagram summarises the above discussion.
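The hand-off described above can be sketched as follows. This is a minimal illustration, assuming one thread waits for a result that a second thread supplies; WaitNotifySketch, taskDone, awaitResult, and complete are hypothetical names introduced only for this example.

```java
// Hypothetical illustration of the wait()/notifyAll() hand-off on a
// shared monitor object.
class WaitNotifySketch {
    private final Object sharedObject = new Object();
    private boolean taskDone = false; // the condition being waited on
    private String result;

    // Called by the waiting thread: blocks until complete() has run.
    String awaitResult() throws InterruptedException {
        synchronized (sharedObject) {
            // Re-check the condition in a loop: a thread may wake up
            // spuriously or find the condition changed again.
            while (!taskDone) {
                sharedObject.wait(); // releases the monitor while waiting
            }
            return result;
        }
    }

    // Called by the other thread once its task is finished.
    void complete(String value) {
        synchronized (sharedObject) {
            result = value;
            taskDone = true;
            sharedObject.notifyAll(); // wake up every waiting thread
        }
    }

    // Runs the hand-off once and returns the value received by the waiter.
    static String runDemo() {
        WaitNotifySketch sketch = new WaitNotifySketch();
        Thread worker = new Thread(() -> sketch.complete("done"), "thread-2");
        worker.start();
        try {
            String received = sketch.awaitResult();
            worker.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints done
    }
}
```

The important detail is that the condition is tested inside the synchronized block and inside a while loop, so the waiter cannot miss a notification sent between its check and its call to wait().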

Producer consumer problem solution using - wait()/notify()/notifyAll()
package com.dev.thread;

import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;

/**
 * Producer Consumer problem using wait and notify method in Java.
 * by devinline.com
 */
public class ProducerConsumerExample {
 static final int MAX_QUEUE_SIZE = 10;

 static class ProducerConsumerImplementaion {
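  // Instance fields: each implementation object owns its queue, which
  // also serves as the monitor for wait()/notifyAll()
  private final Queue<Integer> sharedQueue;
  private final Random random;

  public ProducerConsumerImplementaion() {
   sharedQueue = new LinkedList<Integer>();
   random = new Random();
  }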

  private void produce() {
   // The size check, the wait and the insert all happen under one lock,
   // so no other producer can fill the queue between check and insert
   synchronized (sharedQueue) {
    // wait while queue is full; the condition is re-checked after
    // every wake-up
    while (sharedQueue.size() == ProducerConsumerExample.MAX_QUEUE_SIZE) {
     System.out.println("Queue is full "
       + Thread.currentThread().getName()
       + " is waiting , size: " + sharedQueue.size());
     try {
      sharedQueue.wait();
     } catch (InterruptedException e) {
      // restore the interrupt flag and give up this attempt
      Thread.currentThread().interrupt();
      return;
     }
    }

    // producing element and notify consumers
    int nextval = random.nextInt();
    boolean added = sharedQueue.offer(nextval);
    if (added) {
     System.out.println("Thread "
       + Thread.currentThread().getName()
       + " produced  value " + nextval
       + " in shared Queue ");
     sharedQueue.notifyAll();
    }
   }
  }

  private void consume() throws InterruptedException {
   // The emptiness check, the wait and the removal all happen under one
   // lock, so no other consumer can drain the queue in between
   synchronized (sharedQueue) {
    // wait while queue is empty; the condition is re-checked after
    // every wake-up
    while (sharedQueue.isEmpty()) {
     System.out.println("Queue is empty "
       + Thread.currentThread().getName()
       + " is waiting , size: " + sharedQueue.size());
     sharedQueue.wait();
    }

    // Otherwise consume element and notify waiting producers
    Integer returnVal = sharedQueue.poll();
    if (returnVal != null) {
     System.out.println("Thread "
       + Thread.currentThread().getName()
       + " consumed value " + returnVal
       + " in shared Queue ");
     sharedQueue.notifyAll();
    }
   }
  }
 }

 static class Producer implements Runnable {
  ProducerConsumerImplementaion pci;

  public Producer(ProducerConsumerImplementaion pci) {
   this.pci = pci;
  }

  public void run() {
   while (true) {
    pci.produce();
    try {
     Thread.sleep(60); // Added just to simulate the queue
        // full/empty scenario
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  }

 }

 static class Consumer implements Runnable {

  ProducerConsumerImplementaion pci;

  public Consumer(ProducerConsumerImplementaion pci) {
   this.pci = pci;
  }

  public void run() {
   while (true) {
    try {
     pci.consume();
     Thread.sleep(30);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  }

 }

 public static void main(String args[]) {
  ProducerConsumerImplementaion pci = new ProducerConsumerImplementaion();

  Thread prodThread1 = new Thread(new Producer(pci), "Producer-1");
  Thread prodThread2 = new Thread(new Producer(pci), "Producer-2");
  Thread consThread1 = new Thread(new Consumer(pci), "Consumer-1");
  Thread consThread2 = new Thread(new Consumer(pci), "Consumer-2");

  prodThread1.start();
  consThread1.start();
  prodThread2.start();
  consThread2.start();
 }
}
==========Sample output======
Thread Producer-1 produced  value -449384893 in shared Queue
Thread Consumer-2 consumed value -449384893 in shared Queue
Queue is empty Consumer-1 is waiting , size: 0
Thread Producer-2 produced  value 1799162552 in shared Queue
Thread Consumer-1 consumed value 1799162552 in shared Queue
Queue is empty Consumer-1 is waiting , size: 0
Queue is empty Consumer-2 is waiting , size: 0
Thread Producer-1 produced  value -603658666 in shared Queue
Thread Consumer-2 consumed value -603658666 in shared Queue
Queue is empty Consumer-1 is waiting , size: 0
Thread Producer-2 produced  value 1818829212 in shared Queue
Thread Consumer-1 consumed value 1818829212 in shared Queue
Queue is empty Consumer-1 is waiting , size: 0
Queue is empty Consumer-2 is waiting , size: 0
Thread Producer-2 produced  value 121278833 in shared Queue
Thread Producer-1 produced  value 2105959988 in shared Queue
Thread Consumer-1 consumed value 121278833 in shared Queue
Thread Consumer-2 consumed value 2105959988 in shared Queue
.......
=========================
Question:- What are other ways to solve the producer consumer problem?
1. Using a semaphore and a mutex.
2. Using the concurrent queue interface BlockingQueue.
3. Using the Java 5 Lock interface and Condition variables.
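As an illustration of option 2, here is a minimal sketch using java.util.concurrent.ArrayBlockingQueue, whose put() blocks while the queue is full and whose take() blocks while it is empty. The class BlockingQueueSketch, the runDemo helper, and the choice of two producers and two consumers with a capacity of 10 are assumptions made for this example only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration: two producers and two consumers sharing a
// bounded BlockingQueue instead of hand-written wait()/notify() code.
class BlockingQueueSketch {

    // Each producer puts the values 1..itemsPerThread; each consumer takes
    // itemsPerThread values. Returns the sum of everything consumed.
    static long runDemo(int itemsPerThread) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        AtomicLong consumedSum = new AtomicLong();

        Runnable producer = () -> {
            try {
                for (int i = 1; i <= itemsPerThread; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Runnable consumer = () -> {
            try {
                for (int i = 0; i < itemsPerThread; i++) {
                    consumedSum.addAndGet(queue.take()); // blocks while empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        Thread[] threads = {
            new Thread(producer, "Producer-1"),
            new Thread(producer, "Producer-2"),
            new Thread(consumer, "Consumer-1"),
            new Thread(consumer, "Consumer-2")
        };
        for (Thread t : threads) {
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumedSum.get();
    }

    public static void main(String[] args) {
        // Two producers each put 1..100, so the consumed sum is 2 * 5050.
        System.out.println(runDemo(100)); // prints 10100
    }
}
```

The queue handles all locking and signalling internally, which is why this variant needs no synchronized block, wait(), or notifyAll() in user code.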
