Illustration des threads
Utilisation de wait()/notify() ou notifyAll()
La classe Object de java possède trois "final" méthodes permettant aux threads de communiquer en se servant de "synchronized". Ces trois méthodes "final" sont wait(), notify() et notifyAll(). Dans l'exemple ci-dessous on va se servir de ces méthodes pour traiter le problème du Producer/Consumer;
ProducerConsumerSolution import java.util.Vector; import java.util.logging.Level; import java.util.logging.Logger; /** * Java program to solve Producer Consumer problem using wait and notify * method in Java. Producer Consumer is also a popular concurrency design pattern. * * @author OMARA */ public class ProducerConsumerSolution { public static void main(String args[]) { Vector sharedQueue = new Vector(); int size = 4; Thread prodThread = new Thread(new Producer(sharedQueue, size), "Producer"); Thread consThread = new Thread(new Consumer(sharedQueue, size), "Consumer"); prodThread.start(); consThread.start(); } } class Producer implements Runnable { private final Vector sharedQueue; private final int SIZE; public Producer(Vector sharedQueue, int size) { this.sharedQueue = sharedQueue; this.SIZE = size; } @Override public void run() { for (int i = 0; i < 7; i++) { System.out.println("Produced: " + i); try { produce(i); } catch (InterruptedException ex) { Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex); } } } private void produce(int i) throws InterruptedException { //wait if queue is full while (sharedQueue.size() == SIZE) { synchronized (sharedQueue) { System.out.println("Queue is full " + Thread.currentThread().getName() + " is waiting , size: " + sharedQueue.size()); sharedQueue.wait(); } } //producing element and notify consumers synchronized (sharedQueue) { sharedQueue.add(i); sharedQueue.notifyAll(); } } } class Consumer implements Runnable { private final Vector sharedQueue; private final int SIZE; public Consumer(Vector sharedQueue, int size) { this.sharedQueue = sharedQueue; this.SIZE = size; } @Override public void run() { while (true) { try { System.out.println("Consumed: " + consume()); Thread.sleep(50); } catch (InterruptedException ex) { Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex); } } } private int consume() throws InterruptedException { //wait if queue is empty while (sharedQueue.isEmpty()) { synchronized (sharedQueue) { System.out.println("Queue is empty " + Thread.currentThread().getName() + " is waiting , size: " + sharedQueue.size()); sharedQueue.wait(); } } //Otherwise consume element and notify waiting producer synchronized (sharedQueue) { sharedQueue.notifyAll(); return (Integer) sharedQueue.remove(0); } } }
Tout le cœur du programme se trouve dans la méthode "produce(int i)" pour le Thread Producer et la méthode "consume()" pour Thread Consumer:
Fonctionnement du Thread Producer
- On a la méthode "produce(i) au sein de la méthode "run()" ( coeur de la thread), qui s'exécute au sein d'une boucle for(i=0; i<7; i++ ) ; avec cette boucle on remplit le vecteur qui a comme taille maximum 4 éléments; si "i=0", on a "produce(0)" et donc on n'entre pas dans la boucle while de de la méthode "produce(0)"; On ajoute seulement un élément dans le vecteur et on notifie à tous les threads bloqués mis en attente sur l'objet shardeQueue . Au démarrage de la méthode consume() il n'y avait rien dans le vecteur donc la méthode "consume()" affiche juste message convenu à cet effet et lance la méthode" wait()".
- La méthode "produce ()" est codée conformément à la façon décrite ci-dessus pour coder une section de code synchronisée utilisant "wait()" et "notify()".
Fonctionnement du Thread Consumer
- La méthode "consume()" de la classe Consumer est exécutée au sein de la méthode run() (cœur du thread) dans une boucle infinie; si la Queue est vide, on met un verrou sur l'objet "sharedQueue" , on affiche un message et la méthode wait() est appelée; les verrous sur l'objet sharedQueue dans le thread Consumer est levé et c'est le Thread Producer qui prend la main sur la partie contenant la méthode notifyAll() et un élément est ajouté dans la Queue.
- A ce niveau la méthode "wait()" dans consume() se termine; on sort de la boucle "tant que la Queue est vide) et le Consumer enlève un élément de la "Queue" et appelle la méthode "notifyAll()"; en effet la dernière fois que "la méthode "wait()" fut appelée dans le thread Producer (le producteur) ( ce dernier était en train d'attendre).
- Producer après avoir reçu la notification ajoute un élément dans la Queue ( voir la logique de fonctionnement de la méthode wait()).
Affichage du résultat donné lors de l'exécution du programme
Produced: 0 Queue is empty Consumer is waiting , size: 0 Produced: 1 Produced: 2 Produced: 3 Consumed: 0 Produced: 4 Produced: 5 Queue is full Producer is waiting , size: 4 Consumed: 1 Produced: 6 Queue is full Producer is waiting , size: 4 Consumed: 2 Consumed: 3 Consumed: 4 Consumed: 5 Consumed: 6 Queue is empty Consumer is waiting , size: 0 ....
...
Utilisation de Blocking Queue pour implémenter le Pattern Producer Consumer
Producteur / consommateur est un problème classique pour la programmation concurrente; cette approche où on a un thread qui produit et un autre qui consomme permet de voir clairement le fonctionnement des threads; ici on va se servir de "Blocking Queue" (voir signification) pour contrôler comment les deux threads Producteur et le Consommateur vont travailler ensemble tout en ayant chacun son timing.
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; public class ProducerConsumerPattern { public static void main(String args[]){ //Creating shared object BlockingQueue sharedQueue = new LinkedBlockingQueue(); //Creating Producer and Consumer Thread Thread prodThread = new Thread(new Producer(sharedQueue)); Thread consThread = new Thread(new Consumer(sharedQueue)); //Starting producer and Consumer thread prodThread.start(); consThread.start(); } } //Producer Producer implements Runnable { private final sharedQueue; public Producer(BlockingQueue sharedQueue) { this.sharedQueue = sharedQueue; } @Override public void run() { for(int i=0; i<10; i++){ try { System.out.println("Produced: " + i); sharedQueue.put(i); } catch (InterruptedException ex) {
Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex); } } } } //Consumer Class in Java class Consumer implements Runnable{ private final BlockingQueue sharedQueue; public Consumer (BlockingQueue sharedQueue) { this.sharedQueue = sharedQueue; } @Override public void run() { while(true){ try { System.out.println("Consumed: "+ sharedQueue.take()); } catch (InterruptedException ex) { Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex); } } } }
Affichage du résultat donné lors de l'exécution du programme
Produced: 0 Produced: 1 Produced: 2 Consumed: 0 Produced: 3 Consumed: 1 Produced: 4 Consumed: 2 Produced: 5 Consumed: 3 Produced: 6 Consumed: 4 Produced: 7 Consumed: 5 Produced: 8 Consumed: 6 Produced: 9 Consumed: 7 Consumed: 8 Consumed: 9 ....
...
Illustration Inter-blocage
...
public class Interblocage { public static void main(String[] args) { final int[] tab1 = { 1, 2, 3, 4 }; final int[] tab2 = { 0, 1, 0, 1 }; /* tab1 et tab2 sont susceptibles d'etre modifiés par d'autres threads */ final int[] tabAdd = new int[4]; final int[] tabSub = new int[4]; // réalise l'addition tabAdd = tab1 + tab2 Thread tacheAdd = new Thread() { public void run() { synchronized(tab1) { System.out.println("Thread tacheAdd lock tab1"); travailHarassant(); synchronized(tab2) { System.out.println("Thread tacheAdd lock tab2"); for (int i=0; i<4 ; i++) tabAdd[i] = tab1[i] + tab2[i]; } } } }; // réalise la soustraction tabAdd = tab1 - tab2 Thread tacheSub = new Thread() { public void run() { synchronized(tab2) { System.out.println("Thread tacheSub lock tab2"); travailHarassant(); synchronized(tab1) { System.out.println("Thread tacheSub lock tab1"); for (int i=0; i<4 ; i++) tabAdd[i] = tab1[i] - tab2[i]; } } } }; tacheAdd.start(); tacheSub.start(); } static void travailHarassant() { try { Thread.sleep((int)(Math.random()*50+25)); } catch (InterruptedException e) {} } }
......