It is also known as the "bounded-buffer problem".
The producer-consumer problem can be solved in several ways: with a BlockingQueue, with the Java 5 concurrency Lock interface and Condition class, or with a semaphore and mutex. In this post we will see how to solve this bounded-buffer problem using wait()/notify() and a synchronized block.
A synchronized block in Java allows only one thread at a time to enter a critical section (code that could leave shared state inconsistent if several threads executed it at the same time). Every object in Java has an implicit monitor, and only one thread at a time can hold the monitor of a given object, so only one thread at a time can run inside blocks synchronized on that object.
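To make the monitor idea concrete, here is a minimal sketch of a counter protected by a synchronized block. The class name SynchronizedCounterSketch, the lock field and the runDemo helper are hypothetical names chosen for illustration; they are not part of the solution presented later in this post.

```java
class SynchronizedCounterSketch {
    // Hypothetical lock object; any object can serve as a monitor.
    private final Object lock = new Object();
    private int count = 0;

    void increment() {
        // Only one thread at a time can hold lock's monitor,
        // so the read-modify-write below is never interleaved.
        synchronized (lock) {
            count++;
        }
    }

    int get() {
        synchronized (lock) {
            return count;
        }
    }

    // Starts `threads` workers that each increment `perThread` times,
    // waits for all of them, and returns the final count.
    static int runDemo(int threads, int perThread) {
        SynchronizedCounterSketch counter = new SynchronizedCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(2, 100_000)); // prints 200000
    }
}
```

Without the synchronized block in increment(), the two threads could overwrite each other's updates and the final count could come out lower than 200000.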
wait()/notify(): These methods are typically used to wait for some other thread to accomplish a task, or to wait until a certain condition is satisfied. Consider a system with two threads and a shared resource. When a thread, say thread-1, executes a synchronized block with an object, say sharedObject, as its argument, that thread acquires the monitor of that object, and no other thread can enter a synchronized block on the same monitor until thread-1 releases it.
wait() relinquishes the monitor and suspends the calling thread until another thread completes its task and signals completion using notify() or notifyAll(). All three methods must be called by a thread that currently holds the object's monitor; otherwise an IllegalMonitorStateException is thrown. The following diagram summarises the above discussion.
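In code, the same hand-off between two threads can be sketched roughly as follows. The class WaitNotifySketch, its ready flag and the runDemo helper are hypothetical names used only for this illustration. Note that the condition is re-checked in a while loop after every wake-up, because a thread may be woken before the condition it is waiting for actually holds.

```java
class WaitNotifySketch {
    private final Object sharedObject = new Object();
    private boolean ready = false; // condition guarded by sharedObject's monitor
    private String message;

    // Blocks the calling thread until publish() has been called.
    String awaitMessage() {
        synchronized (sharedObject) {
            while (!ready) {
                try {
                    // Releases the monitor while waiting and
                    // reacquires it before returning.
                    sharedObject.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", e);
                }
            }
            return message;
        }
    }

    // Sets the condition and wakes up every thread waiting on sharedObject.
    void publish(String value) {
        synchronized (sharedObject) {
            message = value;
            ready = true;
            sharedObject.notifyAll();
        }
    }

    // The calling thread plays thread-1 (the waiter); thread-2 signals completion.
    static String runDemo() {
        WaitNotifySketch sketch = new WaitNotifySketch();
        Thread publisher = new Thread(() -> sketch.publish("task done"), "thread-2");
        publisher.start();
        return sketch.awaitMessage();
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints "task done"
    }
}
```

If thread-2 happens to run first, ready is already true when the waiter takes the monitor, so it skips wait() entirely; keeping the flag under the same monitor is what prevents a missed signal.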
Producer-consumer problem solution using wait()/notify()/notifyAll()
package com.dev.thread;

import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;

/**
 * Producer Consumer problem using wait and notify method in Java.
 * by devinline.com
 */
public class ProducerConsumerExample {
    static final int MAX_QUEUE_SIZE = 10;

    static class ProducerConsumerImplementaion {
        // The queue doubles as the monitor that guards it
        private final Queue<Integer> sharedQueue = new LinkedList<Integer>();
        private final Random random = new Random();

        private void produce() throws InterruptedException {
            synchronized (sharedQueue) {
                // wait while queue is full; the size check and wait()
                // must run under the same lock to avoid a race
                while (sharedQueue.size() == MAX_QUEUE_SIZE) {
                    System.out.println("Queue is full "
                            + Thread.currentThread().getName()
                            + " is waiting , size: " + sharedQueue.size());
                    sharedQueue.wait();
                }
                // produce element and notify consumers
                int nextval = random.nextInt();
                boolean added = sharedQueue.offer(nextval);
                if (added) {
                    System.out.println("Thread "
                            + Thread.currentThread().getName()
                            + " produced value " + nextval
                            + " in shared Queue ");
                    sharedQueue.notifyAll();
                }
            }
        }

        private void consume() throws InterruptedException {
            synchronized (sharedQueue) {
                // wait while queue is empty
                while (sharedQueue.isEmpty()) {
                    System.out.println("Queue is empty "
                            + Thread.currentThread().getName()
                            + " is waiting , size: " + sharedQueue.size());
                    sharedQueue.wait();
                }
                // otherwise consume element and notify waiting producers
                Integer returnVal = sharedQueue.poll();
                if (returnVal != null) {
                    System.out.println("Thread "
                            + Thread.currentThread().getName()
                            + " consumed value " + returnVal
                            + " in shared Queue ");
                    sharedQueue.notifyAll();
                }
            }
        }
    }

    static class Producer implements Runnable {
        ProducerConsumerImplementaion pci;

        public Producer(ProducerConsumerImplementaion pci) {
            this.pci = pci;
        }

        public void run() {
            try {
                while (true) {
                    pci.produce();
                    // Added just to simulate queue is full/empty scenario
                    Thread.sleep(60);
                }
            } catch (InterruptedException e) {
                // restore the interrupt flag and let the thread exit
                Thread.currentThread().interrupt();
            }
        }
    }

    static class Consumer implements Runnable {
        ProducerConsumerImplementaion pci;

        public Consumer(ProducerConsumerImplementaion pci) {
            this.pci = pci;
        }

        public void run() {
            try {
                while (true) {
                    pci.consume();
                    Thread.sleep(30);
                }
            } catch (InterruptedException e) {
                // restore the interrupt flag and let the thread exit
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String args[]) {
        ProducerConsumerImplementaion pci = new ProducerConsumerImplementaion();
        Thread prodThread1 = new Thread(new Producer(pci), "Producer-1");
        Thread prodThread2 = new Thread(new Producer(pci), "Producer-2");
        Thread consThread1 = new Thread(new Consumer(pci), "Consumer-1");
        Thread consThread2 = new Thread(new Consumer(pci), "Consumer-2");
        prodThread1.start();
        consThread1.start();
        prodThread2.start();
        consThread2.start();
    }
}

==========Sample output======
Thread Producer-1 produced value -449384893 in shared Queue
Thread Consumer-2 consumed value -449384893 in shared Queue
Queue is empty Consumer-1 is waiting , size: 0
Thread Producer-2 produced value 1799162552 in shared Queue
Thread Consumer-1 consumed value 1799162552 in shared Queue
Queue is empty Consumer-1 is waiting , size: 0
Queue is empty Consumer-2 is waiting , size: 0
Thread Producer-1 produced value -603658666 in shared Queue
Thread Consumer-2 consumed value -603658666 in shared Queue
Queue is empty Consumer-1 is waiting , size: 0
Thread Producer-2 produced value 1818829212 in shared Queue
Thread Consumer-1 consumed value 1818829212 in shared Queue
Queue is empty Consumer-1 is waiting , size: 0
Queue is empty Consumer-2 is waiting , size: 0
Thread Producer-2 produced value 121278833 in shared Queue
Thread Producer-1 produced value 2105959988 in shared Queue
Thread Consumer-1 consumed value 121278833 in shared Queue
Thread Consumer-2 consumed value 2105959988 in shared Queue
.......
=========================
Question: What are other ways to solve the producer-consumer problem?
1. Using a semaphore and mutex.
2. Using the concurrent queue interface BlockingQueue.
3. Using the Java 5 Lock interface and Condition variables.
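As a rough illustration of the second option, the sketch below hands items from one producer to one consumer through an ArrayBlockingQueue, whose put() and take() methods block when the queue is full or empty, so no explicit wait()/notify() calls are needed. The class name BlockingQueueSketch, the POISON_PILL end marker and the runDemo helper are assumptions made for this example, not part of the solution above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueSketch {
    // Hypothetical end-of-stream marker telling the consumer to stop.
    private static final int POISON_PILL = -1;

    // Produces 1..itemCount through a bounded queue and returns the
    // sum of everything the consumer received.
    static int runDemo(int capacity, int itemCount) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int[] sum = new int[1]; // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= itemCount; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(POISON_PILL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");

        Thread consumer = new Thread(() -> {
            try {
                // take() blocks while the queue is empty
                for (int v = queue.take(); v != POISON_PILL; v = queue.take()) {
                    sum[0] += v;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // join makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(runDemo(3, 10)); // prints 55
    }
}
```

The capacity passed to ArrayBlockingQueue plays the role of MAX_QUEUE_SIZE in the wait()/notify() version.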