Blue Flower

Chercher

Illustration des threads

Utilisation de wait()/notify() ou notifyAll()

La classe Object de java possède trois "final" méthodes permettant  aux threads de communiquer en se servant de "synchronized". Ces trois méthodes "final" sont wait(), notify() et notifyAll(). Dans l'exemple ci-dessous on va se servir de ces méthodes pour traiter le problème du Producer/Consumer;

ProducerConsumerSolution
 import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Java program to solve Producer Consumer problem using wait and notify
 * method in Java. Producer Consumer is also a popular concurrency design pattern.
 *
 * @author OMARA
 */
public class ProducerConsumerSolution {
    public static void main(String args[]) {
        Vector sharedQueue = new Vector();
        int size = 4;
        Thread prodThread = new Thread(new Producer(sharedQueue, size), "Producer");
        Thread consThread = new Thread(new Consumer(sharedQueue, size), "Consumer");
        prodThread.start();
        consThread.start();
    }
}
class Producer implements Runnable {
    private final Vector sharedQueue;
    private final int SIZE;
    public Producer(Vector sharedQueue, int size) {
        this.sharedQueue = sharedQueue;
        this.SIZE = size;
    }
    @Override
    public void run() {
        for (int i = 0; i < 7; i++) {
            System.out.println("Produced: " + i);
            try {
                produce(i);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
    private void produce(int i) throws InterruptedException {
        //wait if queue is full
        while (sharedQueue.size() == SIZE) {
            synchronized (sharedQueue) {
                System.out.println("Queue is full " + Thread.currentThread().getName()
                                    + " is waiting , size: " + sharedQueue.size());
                sharedQueue.wait();
            }
        }

        //producing element and notify consumers
        synchronized (sharedQueue) {
            sharedQueue.add(i);
            sharedQueue.notifyAll();
        }
    }
}
class Consumer implements Runnable {
    private final Vector sharedQueue;
    private final int SIZE;
    public Consumer(Vector sharedQueue, int size) {
        this.sharedQueue = sharedQueue;
        this.SIZE = size;
    }
    @Override
    public void run() {
        while (true) {
            try {
                System.out.println("Consumed: " + consume());
                Thread.sleep(50);
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
    private int consume() throws InterruptedException {
        //wait if queue is empty
        while (sharedQueue.isEmpty()) {
            synchronized (sharedQueue) {
                System.out.println("Queue is empty " + Thread.currentThread().getName()
                                    + " is waiting , size: " + sharedQueue.size());
                sharedQueue.wait();
            }
        }
        //Otherwise consume element and notify waiting producer
        synchronized (sharedQueue) {
            sharedQueue.notifyAll();
            return (Integer) sharedQueue.remove(0);
        }
    }
}

Tout le cœur du programme se trouve dans la méthode "produce(int i)"  pour le Thread Producer et la méthode  "consume()" pour Thread Consumer:

Fonctionnement du Thread Producer

  1. On a la méthode "produce(i) au sein de la méthode "run()" ( coeur de la thread), qui s'exécute au sein d'une boucle  for(i=0; i<7; i++ ) ; avec cette boucle on remplit le vecteur qui a comme taille maximum 4 éléments; si "i=0", on a "produce(0)" et donc on n'entre pas dans la boucle while de de la méthode "produce(0)";  On  ajoute seulement un élément dans le vecteur et on notifie à tous les threads bloqués mis en attente sur l'objet shardeQueue . Au démarrage de la méthode consume() il n'y avait rien dans le vecteur donc la méthode "consume()" affiche juste message convenu à cet effet   et lance la méthode" wait()".
  2. La méthode "produce ()" est codée conformément à la façon décrite ci-dessus pour coder une section de code synchronisée utilisant "wait()" et "notify()".

Fonctionnement du Thread Consumer

  1. La méthode "consume()" de la classe Consumer  est exécutée au sein de la méthode  run() (cœur du thread)  dans  une boucle infinie;  si la Queue est vide, on met un verrou sur l'objet "sharedQueue" , on affiche un message et la méthode wait() est appelée; les verrous sur l'objet sharedQueue dans le thread Consumer est levé et c'est le Thread Producer qui prend la main sur la partie contenant la méthode notifyAll() et un élément est ajouté dans la Queue.
  2. A ce niveau  la méthode "wait()" dans consume() se termine; on sort de la boucle "tant que la Queue est vide) et le Consumer  enlève un élément de la "Queue" et appelle la méthode "notifyAll()"; en effet la dernière fois que "la méthode "wait()" fut appelée dans  le thread Producer (le producteur) ( ce dernier était en train d'attendre).
  3. Producer après avoir reçu la notification ajoute un élément dans la Queue ( voir la logique de fonctionnement de la méthode wait()).

Affichage du résultat donné lors de l'exécution du programme

Produced: 0
Queue is empty Consumer is waiting , size: 0
Produced: 1
Produced: 2
Produced: 3
Consumed: 0
Produced: 4
Produced: 5
Queue is full Producer is waiting , size: 4
 Consumed: 1
Produced: 6
Queue is full Producer is waiting , size: 4
Consumed: 2
Consumed: 3
Consumed: 4
Consumed: 5
Consumed: 6
Queue is empty Consumer is waiting , size: 0
....

...


Utilisation de Blocking Queue pour implémenter le Pattern  Producer Consumer

Producteur / consommateur est un problème classique pour la programmation concurrente; cette approche où on a un thread qui produit et un autre qui consomme permet de voir clairement le fonctionnement des threads; ici on va se servir de "Blocking Queue" (voir signification) pour contrôler comment les deux threads  Producteur et le Consommateur   vont travailler ensemble tout en ayant chacun son timing.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ProducerConsumerPattern {
    public static void main(String args[]){
      //Creating shared object
     BlockingQueue sharedQueue = new LinkedBlockingQueue();
     //Creating Producer and Consumer Thread
     Thread prodThread = new Thread(new Producer(sharedQueue));
     Thread consThread = new Thread(new Consumer(sharedQueue));
     //Starting producer and Consumer thread
     prodThread.start();
     consThread.start();
   }
}
//Producer 
Producer implements Runnable {
    private final  sharedQueue;
 public Producer(BlockingQueue sharedQueue) {
  this.sharedQueue = sharedQueue;
 }
    @Override
    public void run() {
        for(int i=0; i<10; i++){
            try {
                System.out.println("Produced: " + i);                
                sharedQueue.put(i);
            } catch (InterruptedException ex) {
               Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);             }         }     } } //Consumer Class in Java class Consumer implements Runnable{   private final BlockingQueue sharedQueue;     public Consumer (BlockingQueue sharedQueue) {   this.sharedQueue = sharedQueue;     }     @Override     public void run() {        while(true){             try {               System.out.println("Consumed: "+ sharedQueue.take());            } catch (InterruptedException ex) {                 Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);             }         }     } }

Affichage du résultat donné lors de l'exécution du programme

Produced: 0
Produced: 1
Produced: 2
Consumed: 0
Produced: 3
Consumed: 1
Produced: 4
Consumed: 2
Produced: 5
Consumed: 3
Produced: 6
Consumed: 4
Produced: 7
Consumed: 5
Produced: 8
Consumed: 6
Produced: 9
Consumed: 7
Consumed: 8
Consumed: 9
....

 ...


Illustration Inter-blocage

...

  public class Interblocage {
  public static void main(String[] args) {
    final int[] tab1 = { 1, 2, 3, 4 };
    final int[] tab2 = { 0, 1, 0, 1 };
    /* tab1 et tab2 sont susceptibles d'etre modifiés 
       par d'autres threads */
    final int[] tabAdd = new int[4];
    final int[] tabSub = new int[4];

    // réalise l'addition tabAdd = tab1 + tab2
    Thread tacheAdd = new Thread() {
      public void run() {
        synchronized(tab1) {
          System.out.println("Thread tacheAdd lock tab1");
          travailHarassant();
          synchronized(tab2) {
            System.out.println("Thread tacheAdd lock tab2");
            for (int i=0; i<4 ; i++)
              tabAdd[i] = tab1[i] + tab2[i];
          }
        }
      }
    };

    // réalise la soustraction tabAdd = tab1 - tab2
    Thread tacheSub = new Thread() {
      public void run() {
        synchronized(tab2) {
          System.out.println("Thread tacheSub lock tab2");
          travailHarassant();
          synchronized(tab1) {
            System.out.println("Thread tacheSub lock tab1");
            for (int i=0; i<4 ; i++)
              tabAdd[i] = tab1[i] - tab2[i];
          }
        }
      }
    };
    tacheAdd.start(); 
    tacheSub.start();
  }
  static void travailHarassant() {
    try { 
      Thread.sleep((int)(Math.random()*50+25));
    }
    catch (InterruptedException e) {}
  }
}

......

précédent