I. Bases▲
La plupart des collections basiques de Java ne supportent pas la concurrence. En effet, la gestion de la concurrence a un coût sur les performances et il n'y a pas de nécessité à ce que la concurrence soit gérée en permanence. Ainsi des classes telles que Vector et Hashtable sont quasiment considérées comme dépréciées.
L'API des collections introduit cependant de la concept de fail-fast qui consiste à offrir des mécanismes simples et non-sûr (principe de « pour le mieux ») de détection de modification concurrente. Il s'agit principalement de faire échouer la navigation par Iterator lorsqu'intervient une modification structurelle sur la collection.
II. Synchronisation d'une collection▲
La classe utilitaire Collections possède un certain nombre de méthodes préfixées par synchronized afin d'encapsuler une instance non thread-safe dans une instance qui l'est :
//com.developpez.lmauzaize.java.concurrence.collections.CollectionsSynchronizedDemo
class Tache implements Runnable {
int nbThreads;
int nbPerThread;
List<String> values;
Tache(int nbThreads, int nbPerThread) {
this.nbThreads = nbThreads;
this.nbPerThread = nbPerThread;
}
public void run() {
for (int i = 0; i < nbPerThread; i++) {
values.add(Thread.currentThread().getName());
}
}
public void execute(boolean sync) throws InterruptedException {
values = new ArrayList<>(1);
if (sync) {
values = Collections.synchronizedList(values);
}
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < nbThreads; i++) {
executor.submit(this);
}
executor.shutdown();
if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {
throw new CancellationException();
}
Logger.println("Synchronisé=%-5b Attendu=%,06d, Réel=%,06d", sync, nbThreads*nbPerThread, values.size());
}
};
Tache tache = new Tache(20_000, 10);
tache.execute(false);
tache.execute(true);00:00:00.018 [main ] Synchronisé=false Attendu=200 000, Réel=195 059
00:00:00.075 [main ] Synchronisé=true Attendu=200 000, Réel=200 000Le fait de synchroniser une collection permet seulement de protéger chaque appel de méthode, mais ne protège pas le bloc qui englobe ses appels. Si devez effectuer différents appels sur une collection et que ces appels dépendent les uns des autres, vous devez utiliser un système de synchronisation externe !
//com.developpez.lmauzaize.java.concurrence.collections.CollectionsSynchronizedInstanceVsBloc
class Tache implements Runnable {
int nbThreads;
int nbPerThread;
List<Integer> values;
boolean sync;
Tache(int nbThreads, int nbPerThread) {
this.nbThreads = nbThreads;
this.nbPerThread = nbPerThread;
}
public void run() {
for (int i = 0; i < nbPerThread; i++) {
if (sync) {
syncAdd();
} else {
add();
}
}
}
void add() {
values.add(values.size());
}
void syncAdd() {
synchronized (values) {
add();
}
}
void newList() {
values = new ArrayList<>(1);
if (!sync) {
values = Collections.synchronizedList(values);
}
}
int check() {
int check = values.size();
for (int i = 0; i < values.size(); i++) {
if (i != values.get(i)) {
check--;
}
}
return check;
}
Tache execute(boolean sync) throws InterruptedException {
this.sync = sync;
newList();
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < nbThreads; i++) {
executor.submit(this);
}
executor.shutdown();
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
throw new CancellationException();
}
Logger.println("Synchronisé=%-5b Attendu=%,06d, Réel=%,06d/%,06d", sync, nbThreads*nbPerThread, check(), values.size());
return this;
}
};
new Tache(20_000, 10).execute(false).execute(true);00:00:00.019 [main ] Synchronisé=false Attendu=200 000, Réel=195 419/200 000
00:00:00.122 [main ] Synchronisé=true Attendu=200 000, Réel=200 000/200 000Pour toutes les instances renvoyées par ces méthodes, le verrou utilisé est l'instance elle-même (ni un verrou interne, ni la collection encapsulée), les Iterators ne sont pas protégés. Le verrou est également partagé par les vues. Voici la liste des interfaces supportées (ainsi que la liste des vues associées) :
III. Files bloquantes▲
III-A. Interfaces▲
Il est commun dans un environnement de traitement parallèle d'accumuler des données d'un côté et de les dépiler de l'autre. C'est le principe du modèle producteur-consommateur, du pattern « Pipes and Filters » ou des pools d'exécution. La structure de données qui se prête le mieux à ces manipulations est la file (queue, First In First Out). En Java, il existe également l'interface Deque (prononcé « dèc ») qui permet également de gérer une pile (Last In First Out). Les variantes « bloquantes » sont représentées par les interfaces BlockingQueue et BlockingDeque.
Une file bloquante offre différents types d'opérations :
-
non-bloquante : si la file n'est pas en mesure de répondre à la demande (vide ou pleine), celle-ci est rejetée. Il existe alors deux formes de retour.
-
bloquante : la demande est mise en attente jusqu'à ce que l'état de la file le permette, il n'est en revanche pas possible d'examiner comme avec les méthodes element() et peek(). Il existe deux formes d'attente :
III-B. SynchronousQueue▲
SynchronousQueue est la plus simple des files bloquantes. En réalité, il ne s'agit pas d'une file puisqu'elle a une capacité de 0 :
//com.developpez.lmauzaize.java.concurrence.collections.SynchronousQueueDemo
SynchronousQueue<String> queue = new SynchronousQueue<>();
//Non-bloquant avec exception
try {
queue.add("A");
} catch (Exception e) {
Logger.println("Ajout refusé: %s", e);
}
//Non-bloquant avec valeur
Logger.println("Ajout? %b", queue.offer("A"));
new Thread("Consommateur") {
public void run() {
Logger.println("Lecture non-bloquante : %s", queue.poll());
Thread.sleep(1_500);
Logger.println("Lecture non-bloquante : %s", queue.poll());
Logger.println("Attente");
Logger.println("Lecture bloquante : %s", queue.take());
}
}.start();
new Thread("Producteur") {
public void run() {
Thread.sleep(1_000);
Logger.println("Ajout bloquant : A");
queue.put("A");
Thread.sleep(1_000);
Logger.println("Ajout non-bloquant : B");
queue.add("B");
}
}.start();00:00:00.021 [main ] Ajout refusé: java.lang.IllegalStateException: Queue full
00:00:00.029 [main ] Ajout? false
00:00:00.030 [Consommateur ] Lecture non-bloquante : null
00:00:01.031 [Producteur ] Ajout bloquant : A
00:00:01.530 [Consommateur ] Lecture non-bloquante : A
00:00:01.530 [Consommateur ] Attente
00:00:02.530 [Producteur ] Ajout non-bloquant : B
00:00:02.530 [Consommateur ] Lecture bloquante : BComme il s'agit d'une file sans stockage, l'itération sur une telle file ne renvoie jamais d'élément, même si un ajout est en attente :
//com.developpez.lmauzaize.java.concurrence.collections.SynchronousQueueIteration
SynchronousQueue<String> queue = new SynchronousQueue<>();
new Thread("Producteur") {
public void run() {
Logger.println("Ajout");
queue.put(getName());
Logger.println("Ajouté");
}
}.start();
Thread.sleep(1_000);
int i = 0;
Logger.println("début");
for (String element : queue) {
Logger.println("[%02d] %s", i++, element);
}
Logger.println("fin");
Logger.println("%s", queue.poll());00:00:00.020 [Producteur ] Ajout
00:00:00.999 [main ] début
00:00:00.999 [main ] fin
00:00:01.000 [main ] Producteur
00:00:01.000 [Producteur ] AjoutéIII-C. Linked/Array BlockingQueue▲
LinkedBlockingQueue est une implémentation basée sur les listes chaînées. L'implémentation jumelle est ArrayBlockingQueue qui est elle basée sur une table fixe. Globalement, les ArrayBlockingQueue offrent de meilleurs performances (voir exemple ci-dessous) mais ont l'inconvénient de fixer leur utilisation mémoire sur la taille maximale tandis que les LinkedBlockingQueue auront une utilisation mémoire proportionnelle à la taille réelle de la file.
//com.developpez.lmauzaize.java.concurrence.collections.BlockingQueueLinkedVsArray
class BlockingQueueLinkedVsArray {
// Actions
//// Remplir/vider la file
void remplir();
void vider();
//// Lances/attends des threads qui lisent la file jusqu'à épuisement.
void lancerConsommateurs();
void attendreConsommateurs();
//// Lances/attends des threads qui remplissent la file
void lancerProducteurs();
void attendreProducteurs();
// Bench
//// Exécutes des actions avant/après chaque mesure
void avant();
void après();
//// Action à mesurer
void run();
//// Joue le scénario plusieurs fois pour chaque implémentation, puis affiche le temps moyen d'exécution
void bench();
}//com.developpez.lmauzaize.java.concurrence.collections.BlockingQueueLinkedVsArray
// Remplis la file en utilisant un seul thread
new BlockingQueueLinkedVsArray() {
{ action = "AjoutSéquentiel"; }
void run() {
remplir();
}
}.bench();
// Remplis la file en utilisant plusieurs threads
new BlockingQueueLinkedVsArray() {
{ action = "AjoutParallèle"; }
void run() throws Exception {
lancerProducteurs();
attendreProducteurs();
}
}.bench();
// Vides la file en utilisant un seul thread
new BlockingQueueLinkedVsArray() {
{ action = "RetraitSéquentiel"; }
void avant() {
remplir();
}
void run() throws Exception {
vider();
}
}.bench();
// Vides la file en utilisant plusieurs threads
new BlockingQueueLinkedVsArray() {
{ action = "RetraitParallèle"; }
void avant() {
remplir();
}
void run() throws Exception {
lancerConsommateurs();
attendreConsommateurs();
}
}.bench();
// Vides et remplis la file en utilisant plusieurs threads
new BlockingQueueLinkedVsArray() {
{ action = "Parallèle"; }
void run() throws Exception {
lancerConsommateurs();
lancerProducteurs();
attendreProducteurs();
attendreConsommateurs();
}
}.bench();Paramètres :
- Itérations par implémentation : 10
- Nombre de producteurs : 1 000
- Nombre de consommateurs : 1 000
- Nombre total d'éléments : 2 000 000
Test: AjoutSéquentiel , File: LinkedBlockingQueue, Temps: 00:00:00.317
Test: AjoutSéquentiel , File: ArrayBlockingQueue, Temps: 00:00:00.055
Test: AjoutParallèle , File: LinkedBlockingQueue, Temps: 00:00:00.201
Test: AjoutParallèle , File: ArrayBlockingQueue, Temps: 00:00:00.075
Test: RetraitSéquentiel, File: LinkedBlockingQueue, Temps: 00:00:00.071
Test: RetraitSéquentiel, File: ArrayBlockingQueue, Temps: 00:00:00.055
Test: RetraitParallèle , File: LinkedBlockingQueue, Temps: 00:00:00.138
Test: RetraitParallèle , File: ArrayBlockingQueue, Temps: 00:00:00.090
Test: Parallèle , File: LinkedBlockingQueue, Temps: 00:00:00.342
Test: Parallèle , File: ArrayBlockingQueue, Temps: 00:00:00.194Contrairement aux collections « standard » (et SynchronousQueue), il est tout à fait possible d'itérer sur la collection même si des modifications interviennent entre temps. Cependant, les modifications peuvent ou pas être visibles lors de l'itération.
//com.developpez.lmauzaize.java.concurrence.collections.ArrayBlockingQueueIteration
BlockingQueue<String> file = new ArrayBlockingQueue<>(100);
for (int i = 0; i < 3; i++) {
new Thread("Producteur-" + i) {
public void run() {
for (int i = 0; i < 4; i++) {
String e = getName() + "-" + i;
Logger.println("Ajout %s", e);
file.put(e);
Thread.sleep(40);
}
}
}.start();
}
int i = 0;
Thread.sleep(50);
Logger.println("début");
for (String élément : file) {
Logger.println("[%02d] %s", i++, élément);
Thread.sleep(50);
}
Logger.println("fin");00:00:00.022 [Producteur-0 ] Ajout Producteur-0-0
00:00:00.022 [Producteur-1 ] Ajout Producteur-1-0
00:00:00.022 [Producteur-2 ] Ajout Producteur-2-0
00:00:00.049 [main ] début
00:00:00.049 [main ] [00] Producteur-0-0
00:00:00.071 [Producteur-2 ] Ajout Producteur-2-1
00:00:00.071 [Producteur-0 ] Ajout Producteur-0-1
00:00:00.071 [Producteur-1 ] Ajout Producteur-1-1
00:00:00.100 [main ] [01] Producteur-1-0
00:00:00.111 [Producteur-2 ] Ajout Producteur-2-2
00:00:00.111 [Producteur-0 ] Ajout Producteur-0-2
00:00:00.111 [Producteur-1 ] Ajout Producteur-1-2
00:00:00.150 [main ] [02] Producteur-2-0
00:00:00.151 [Producteur-0 ] Ajout Producteur-0-3
00:00:00.151 [Producteur-2 ] Ajout Producteur-2-3
00:00:00.151 [Producteur-1 ] Ajout Producteur-1-3
00:00:00.200 [main ] [03] Producteur-2-1
00:00:00.250 [main ] [04] Producteur-0-1
00:00:00.300 [main ] [05] Producteur-1-1
00:00:00.350 [main ] [06] Producteur-2-2
00:00:00.400 [main ] [07] Producteur-0-2
00:00:00.450 [main ] [08] Producteur-1-2
00:00:00.500 [main ] [09] Producteur-0-3
00:00:00.550 [main ] [10] Producteur-2-3
00:00:00.600 [main ] [11] Producteur-1-3
00:00:00.650 [main ] finIII-D. PriorityBlockingQueue▲
La file bloquante à priorité (PriorityBlockingQueue) est la variante concurrente de PriorityQueue. La priorité est géré soit par l'ordre naturel, soit un ordre spécifique.
//com.developpez.lmauzaize.java.concurrence.collections.PriorityBlockingQueueDemo
class Élément implements Comparable<Élément> {
String nom;
int priorité;
Élément(String nom, int priorité) {
this.nom = nom;
this.priorité = priorité;
}
public int compareTo(Élément that) {
return this.priorité - that.priorité;
}
@Override
public String toString() {
return String.format("{%s:%02d}", nom, priorité);
}
}
PriorityBlockingQueue<Élément> file = new PriorityBlockingQueue<>();
file.offer(new Élément("A", 10));
file.offer(new Élément("B", 0));
file.offer(new Élément("C", 20));
file.offer(new Élément("D", 10));
file.offer(new Élément("E", 10));
file.offer(new Élément("F", 10));
Logger.println("file =%s", file);
List<Élément> éléments = new ArrayList<>(file.size());
file.drainTo(éléments);
Logger.println("éléments=%s", éléments);00:00:00.020 [main ] file =[{B:00}, {A:10}, {F:10}, {D:10}, {E:10}, {C:20}]
00:00:00.028 [main ] éléments=[{B:00}, {A:10}, {E:10}, {D:10}, {F:10}, {C:20}]L'itération sur ce type de file ne tient pas compte de la priorité des éléments (idem pour sa représentation sous forme de chaîne de caractères). Si vous souhaitez manipuler cette liste conformément aux priorités, vous devez utiliser les méthodes spécifiques aux files (ex : peek, poll, drainTo).
Ce sont les éléments avec la plus petite priorité qui sont retirés en premier. A priorité égale l'ordre n'est pas spécifié.
III-E. DelayQueue▲
Les files différées (DelayQueue) permettent de retarder le traitement des éléments qui la composent. En effet, elles se composent uniquement d'éléments différées (Delayed) qui expirent si le délai est inférieur et égal à zéro.
Tant qu'aucun élément n'a expiré, la file ne possède pas de tête à tirer ; c'est-à-dire que :
- peek() renvoient un élément qui n'a pas encore expiré.
- element() renvoient un élément qui n'a pas encore expiré.
- poll() renvoient null.
- remove() lancent une NoSuchElementException.
- drainTo() ne retirent aucun élément.
- take() attend le premier élément à expirer.
//com.developpez.lmauzaize.java.concurrence.collections.DelayQueueDemo
class Différé implements Delayed {
long expiration;
String toString;
Différé(long temps, TimeUnit unité) {
long now = System.nanoTime();
long nanos = unité.toNanos(temps);
expiration = now + nanos;
toString = temps + " " + unité;
}
public long getDelay(TimeUnit unité) {
long now = System.nanoTime();
long temps = expiration - now;
long délai = unité.convert(temps, TimeUnit.NANOSECONDS);
return délai;
}
public int compareTo(Delayed o) {
Différé d = (Différé) o;
long diff = this.expiration - d.expiration;
if (diff > Integer.MAX_VALUE) return Integer.MAX_VALUE;
if (diff < Integer.MIN_VALUE) return Integer.MIN_VALUE;
return (int) diff;
}
public String toString() {
return toString;
}
}
DelayQueue<Différé> file = new DelayQueue<>();
for (long délai : new long[] { 5, 2, 1, 3, 4, 6 }) {
Différé d = new Différé(délai, TimeUnit.SECONDS);
file.add(d);
}
Runnable afficherFile = () -> {
Logger.println("File : %s", file);
Logger.println(" %-10s= %s", "size" , file.size());
Logger.println(" %-10s= %s", "iterator", new ArrayList<>(file));
Logger.println(" %-10s= %s", "array", Arrays.toString(file.toArray()));
Logger.println(" %-10s= %s", "peek" , file.peek());
Logger.println(" %-10s= %s", "element" , file.element());
Logger.println(" ***");
Logger.println(" %-10s= %s", "poll" , file.poll());
Object remove;
try {
remove = file.remove();
} catch (NoSuchElementException e) {
remove = e;
}
Logger.println(" %-10s= %s", "remove" , remove);
List<Différé> liste = new ArrayList<>();
file.drainTo(liste);
Logger.println(" %-10s= %s", "drainTo", liste);
};
afficherFile.run();
Logger.println("");
Logger.println("Sleep");
Logger.println("");
TimeUnit.SECONDS.sleep(5);
afficherFile.run();00:00:00.021 [main ] File : [1 SECONDS, 3 SECONDS, 2 SECONDS, 5 SECONDS, 4 SECONDS, 6 SECONDS]
00:00:00.030 [main ] size = 6
00:00:00.030 [main ] iterator = [1 SECONDS, 3 SECONDS, 2 SECONDS, 5 SECONDS, 4 SECONDS, 6 SECONDS]
00:00:00.030 [main ] array = [1 SECONDS, 3 SECONDS, 2 SECONDS, 5 SECONDS, 4 SECONDS, 6 SECONDS]
00:00:00.031 [main ] peek = 1 SECONDS
00:00:00.031 [main ] element = 1 SECONDS
00:00:00.031 [main ] ***
00:00:00.031 [main ] poll = null
00:00:00.032 [main ] remove = java.util.NoSuchElementException
00:00:00.032 [main ] drainTo = []
00:00:00.032 [main ]
00:00:00.032 [main ] Sleep
00:00:00.033 [main ]
00:00:05.033 [main ] File : [1 SECONDS, 3 SECONDS, 2 SECONDS, 5 SECONDS, 4 SECONDS, 6 SECONDS]
00:00:05.033 [main ] size = 6
00:00:05.034 [main ] iterator = [1 SECONDS, 3 SECONDS, 2 SECONDS, 5 SECONDS, 4 SECONDS, 6 SECONDS]
00:00:05.034 [main ] array = [1 SECONDS, 3 SECONDS, 2 SECONDS, 5 SECONDS, 4 SECONDS, 6 SECONDS]
00:00:05.035 [main ] peek = 1 SECONDS
00:00:05.035 [main ] element = 1 SECONDS
00:00:05.036 [main ] ***
00:00:05.036 [main ] poll = 1 SECONDS
00:00:05.037 [main ] remove = 2 SECONDS
00:00:05.038 [main ] drainTo = [3 SECONDS, 4 SECONDS, 5 SECONDS]III-F. TransferQueue (1.7+)▲
Les files de transfert (TransferQueue) sont des files bloquantes qui permettent à un « producteur » d'attendre que sa ressource soit « consommée ». L'interface demeure la même pour le consommateur mais le producteur peut :
- tenter de faire consommer sa ressource immédiatement via tryTransfer(E). La méthode renvoie un booléen indiquant si l'élément a été transféré,
- tenter de faire consommer sa ressource pendant un certain délai via tryTransfer(E, long, TimeUnit). La méthode renvoie un booléen indiquant si l'élément a été transféré,
- attendre que sa ressource soit consommée via transfer(E). La méthode attend jusqu'au traitement de la ressource ou l'interruption du thread.
La seule implémentation de cette interface est la classe LinkedTransferQueue, comme son nom l'indique elle repose sur une liste chaînée.
//com.developpez.lmauzaize.java.concurrence.collections.TransferQueueDémo
class TransferQueueDémo {
TransferQueue<String> file = new LinkedTransferQueue<>();
void tryTransfer(String valeur) {
Logger.println(" résultat=%b", file.tryTransfer(valeur));
};
void tryTransferAttente(String valeur) {
Logger.println(" résultat=%b", file.tryTransfer(valeur, 5, TimeUnit.SECONDS));
};
void transfer(String valeur) {
file.transfer(valeur);
Logger.println(" transfert terminé");
};
void avecConsommateur() {
new Thread("Consommateur") {
public void run() {
Logger.println(" pause");
TimeUnit.SECONDS.sleep(2);
Logger.println(" lecture: %s", file.take());
}
}.start();
};
}
TransferQueueDémo démo = new TransferQueueDémo();
// tryTransfer - immédiat
Logger.println("tryTransfer: A");
démo.tryTransfer("A");
Logger.println("");
// tryTransfer - expiration
Logger.println("tryTransfer: B, attente 5s");
démo.tryTransferAttente("B");
Logger.println("");
// tryTransfer - attente
Logger.println("tryTransfer: C, attente 5s");
démo.avecConsommateur();
démo.tryTransferAttente("C");
Logger.println("");
// transfer
Logger.println("transfer: D");
démo.avecConsommateur();
démo.transfer("D");00:00:00.021 [main ] tryTransfer: A
00:00:00.030 [main ] résultat=false
00:00:00.030 [main ]
00:00:00.031 [main ] tryTransfer: B, attente 5s
00:00:05.032 [main ] résultat=false
00:00:05.032 [main ]
00:00:05.033 [main ] tryTransfer: C, attente 5s
00:00:05.035 [Consommateur ] pause
00:00:07.036 [Consommateur ] lecture: C
00:00:07.036 [main ] résultat=true
00:00:07.037 [main ]
00:00:07.038 [main ] transfer: D
00:00:07.039 [Consommateur ] pause
00:00:09.039 [Consommateur ] lecture: D
00:00:09.039 [main ] transfert terminéIV. CopyOnWrite List/Set▲
Les collections CopyOnWrite* sont des collections concurrentes qui stockent leurs éléments dans un tableau. Chaque modification entraîne la création d'un nouveau tableau en effectuant une copie de l'original.
Les itérateurs conservent une référence du tableau utilisé lors de leur création. Ils sont hermétiques aux modifications qui surviennent sur la collection source et ne supportent pas les opérations de modifications (remove, add et set).
Ces collections sont à utiliser si le nombre de modification est peu important et/ou que le nombre d'élément n'est pas important. Les modifications sont très coûteuses mais ces collections se révèlent efficaces dès lors que les parcours surpassement largement les modifications. La classe de base est CopyOnWriteArrayList, il existe une variante CopyOnWriteArraySet qui implémente l'interface Set ; cette dernière encapsule une instance de CopyOnWriteArrayList pour stocker les données.
//com.developpez.lmauzaize.java.concurrence.collections.CopyOnWriteArrayListDemo
class CopyOnWriteArrayListDemo {
// Pour chaque implémentation, initialise une instance, puis joue le scénario 10 fois et enfin affiche le temps moyen par itération.
void test(String action);
// Actions : créer une tâche qui sera exécutée N fois en parallèle avec une pause optionnelle avant.
// - Effectues "n" insertions par thread
CopyOnWriteArrayListDemo add(thread, int n, int délai);
// - Itères la collection à chaque thread
CopyOnWriteArrayListDemo iterator(int thread, final int délai);
// - Effectues "n" retrait par thread
CopyOnWriteArrayListDemo remove(int thread, int quantité, int délai);
}//com.developpez.lmauzaize.java.concurrence.collections.CopyOnWriteArrayListDemo
final CopyOnWriteArrayListDemo démo = new CopyOnWriteArrayListDemo();
int thread, quantité, remplissage, délai = 0;
démo.add(thread= 1, quantité=10_000, délai).test(remplissage=0, "Ajout linéaire");
démo.add(thread=100, quantité= 100, délai).test(remplissage=0, "Ajout parallèle");
démo.iterator(thread=1_000, délai).test(remplissage=1_000, "Parcours [1k/1k]");
démo.iterator(thread=5_000, délai).test(remplissage=1_000, "Parcours [5k/1k]");
démo.iterator(thread=1_000, délai).test(remplissage=3_000, "Parcours [1k/3k]");
démo.iterator(thread=3_000, délai).test(remplissage=3_000, "Parcours [3k/3k]");
for (int i = 0; i < 3; i++) {
délai = i*100;
démo
.add (thread= 100, quantité=10, délai)
.iterator(thread=1_000, délai)
.remove (thread= 100, quantité= 5, délai);
}
démo.test(remplissage=1_000, "Concurrence");Test: Ajout linéaire , File: CopyOnWriteArrayQueue, Temps: 00:00:00.035
Test: Ajout linéaire , File: ArrayBlockingQueue, Temps: 00:00:00.000
Test: Ajout parallèle , File: CopyOnWriteArrayQueue, Temps: 00:00:00.040
Test: Ajout parallèle , File: ArrayBlockingQueue, Temps: 00:00:00.001
Test: Parcours [1k/1k] , File: CopyOnWriteArrayQueue, Temps: 00:00:00.005
Test: Parcours [1k/1k] , File: ArrayBlockingQueue, Temps: 00:00:00.038
Test: Parcours [5k/1k] , File: CopyOnWriteArrayQueue, Temps: 00:00:00.007
Test: Parcours [5k/1k] , File: ArrayBlockingQueue, Temps: 00:00:00.172
Test: Parcours [1k/3k] , File: CopyOnWriteArrayQueue, Temps: 00:00:00.003
Test: Parcours [1k/3k] , File: ArrayBlockingQueue, Temps: 00:00:00.085
Test: Parcours [3k/3k] , File: CopyOnWriteArrayQueue, Temps: 00:00:00.005
Test: Parcours [3k/3k] , File: ArrayBlockingQueue, Temps: 00:00:00.263
Test: Concurrence , File: CopyOnWriteArrayQueue, Temps: 00:00:00.331
Test: Concurrence , File: ArrayBlockingQueue, Temps: 00:00:00.409V. Maps concurrentes▲
V-A. ConcurrentSkipList▲
Une liste à enjambements (Skip-list) est une structure de données triées qui utilise plusieurs niveaux de listes chaînées pour accéder plus rapidement à un élément. Voici un schéma montrant la composition interne d'une telle structure :
1
1-----4---6
1---3-4---6-----9
1-2-3-4-5-6-7-8-9-10Pour localiser un élément, on procède couche par couche. On recherche le plus petit élément correspondant dans la couche courante, puis on passe dans la couche suivante par mouvement vertical (en réalité, un pointeur sur le nœud correspondant de la couche suivante). Il s'agit donc d'une sorte de structure arborescente dont un nœud est chaîné à la fois à son voisin et à son équivalent dans la couche inférieure.
En Java, cette structure est représentée par la classe ConcurrentSkipListMap qui gère également les accès concurrents. Voici une liste de points clés à retenir :
- l'itération (en concurrence avec les modifications) est supportée même si les modifications postérieures peuvent ne pas être visibles ;
- Les paires clé-valeur renvoyées ne sont pas modifiables ;
- déterminer la taille du dictionnaire nécessite de le parcourir ;
- les opérations massives (ex : putAll, ou même equals) ne sont pas nécessairement atomiques ;
- la clé null n'est pas supportée ;
- les opérations « par ordre décroissant » (ex : descendingMap) sont moins performantes que les opérations classiques.
ConcurrentSkipListSet est une version implémentant l'interface Set ; elle utilise une ConcurrentSkipListMap en interne.
//com.developpez.lmauzaize.java.concurrence.collections.ConcurrentSkipListMapDemo
class ConcurrentSkipListMapDemo {
// Modifies le nom du thread courant et exécutes séquentiellement ses actions
class Activité implements Runnable {
String nom;
List<Runnable> actions = new ArrayList<>();
// Actions
/** Insère la clé "<nom>-<i>" **/
Activité put(int i);
/** Supprime la clé **/
Activité remove(String clé);
/** Crée une map content "n" clé ("<nom>-<i>") et l'insère massivement **/
Activité putAll(int début, int n);
/** Itère sur les entrées et fait une pause entre chaque **/
Activité parcours(TimeUnit unité, long temps);
/** Effectue une pause **/
Activité pause(TimeUnit unité, int temps);
}
ConcurrentSkipListMap<String, String> map = new ConcurrentSkipListMap<>();
List<Activité> activités = new ArrayList<>();
List<Runnable> après = Collections.emptyList();
List<Runnable> avant = Collections.emptyList();
// Pré/post actions
/** Indique les actions avant "exécuter" **/
ConcurrentSkipListMapDemo avant();
/** Indique les actions après "exécuter" **/
ConcurrentSkipListMapDemo après();
/** Insère "n" clés **/
ConcurrentSkipListMapDemo remplir(int n);
/** Affiche les noms de threads consécutifs **/
ConcurrentSkipListMapDemo verifierChangementThread();
/** Créé une nouvelle activité. Les activités sont exécutées en parallèle **/
Activité activité(String nom);
/** Exécutes les actions "avant", les activités en parallèle, puis actions "après". Enfin, réinitialise l'instance. **/
void exécuter();
}//com.developpez.lmauzaize.java.concurrence.collections.ConcurrentSkipListMapDemo
ConcurrentSkipListMapDemo demo = new ConcurrentSkipListMapDemo();
Logger.println("---------------------------------");
Logger.println("Itération");
demo.avant().remplir(5);
demo.activité("iter0") .parcours(TimeUnit.MILLISECONDS, 100);
demo.activité("iter1") .pause(TimeUnit.MILLISECONDS, 200).parcours(TimeUnit.MILLISECONDS, 100);
demo.activité("put_10").pause(TimeUnit.MILLISECONDS, 500).put(10);
demo.activité("rmv_02").pause(TimeUnit.MILLISECONDS, 400).remove("main-02");
demo.activité("iter2") .pause(TimeUnit.MILLISECONDS, 800).parcours(TimeUnit.MILLISECONDS, 0);
demo.exécuter();
Logger.println("---------------------------------");
Logger.println("Ajout massif");
for (int i = 0; i < 5; i++) {
demo.activité("putAll" + i).putAll(0, 200_000);
}
demo.après().verifierChangementThread();
demo.exécuter();00:00:00.022 [main ] ---------------------------------
00:00:00.031 [main ] Itération
00:00:00.139 [iter0 ] main-00=main
00:00:00.239 [iter0 ] main-01=main
00:00:00.339 [iter0 ] main-02=main
00:00:00.339 [iter1 ] main-00=main
00:00:00.439 [iter0 ] main-03=main
00:00:00.439 [rmv_02 ] Retrait main-02
00:00:00.440 [iter1 ] main-01=main
00:00:00.539 [iter0 ] main-04=main
00:00:00.539 [put_10 ] Ajout put_10-0010
00:00:00.540 [iter1 ] main-02=main
00:00:00.640 [iter1 ] main-03=main
00:00:00.740 [iter1 ] main-04=main
00:00:00.839 [iter2 ] main-00=main
00:00:00.839 [iter2 ] main-01=main
00:00:00.840 [iter1 ] put_10-0010=put_10
00:00:00.840 [iter2 ] main-03=main
00:00:00.841 [iter2 ] main-04=main
00:00:00.841 [iter2 ] put_10-0010=put_10
00:00:00.842 [main ] ---------------------------------
00:00:00.843 [main ] Ajout massif
00:00:02.306 [main ] [putAll0 x 200 000, putAll1 x 200 000, putAll2 x 200 000, putAll3 x 200 000, putAll4 x 200 000]V-B. ConcurrentHashMap▲
Contrairement aux « Skip-List », ce type de dictionnaire repose sur l'identité et l'empreinte d'un objet (comme HashMap, avec la gestion de la concurrence en plus). Cette implémentation est à privilégier par rapport à Hashtable. En effet, Hashtable offre de moins bonnes performances et ne supporte pas l'itération en concurrence avec les modifications (elle implémente le principe fail-fast). Pour améliorer les performances, ConcurrentHashMap repose sur deux principes :
- des lectures non-bloquantes (la plupart du temps) ;
- le partitionnement de sa structure interne.
Lors de modification, seule la partition concernée est verrouillée. Le nombre de partition se contrôle à l'aide du paramètre concurrencyLevel. En réalité, le nombre de partition correspondra à la première puissance de 2 supérieure (ou égale), sans pouvoir excéder 65 536. Ce nombre doit correspondre aux nombres de threads qui peuvent potentiellement modifier simultanément la Map. Ainsi si un seul thread à la fois est censé écrire dans la Map (et quelque soit le nombre de lecture concurrente), une valeur de 1 est parfaitement adéquate. S'il n'est pas nécessaire d'avoir une estimation exacte de la concurrence, il est nécessaire d'avoir un ordre de grandeur proche sous peine de dégrader les performances de la Map (temps d'exécution des opérations et consommation mémoire) que ce nombre soit sous-estimé ou sur-estimé.
Comme pour la plupart des Maps, il est également recommandé de « tailler » correctement la Map, c'est-à-dire de positionner la taille (initial capacity) et le facteur de charge (load factor).
Les mesures qui suivent concernent l'impact du niveau de concurrence sur les performances de la Map. Ce test consiste à exécuter 16 threads qui vont ajouter 1 000 000 clés aléatoires (parmis 1 024 clés possibles). On fait varier le niveau de concurrence à chaque scénario.
//com.developpez.lmauzaize.java.concurrence.collections.ConcurrentHashMapConcurrence
// Paramètres
int thread = 16;
int taille = 2_000_000;
int limite = 1_024;
// Génération des valeurs possibles
String[] cache = IntStream.range(0, limite).mapToObj(String::valueOf).toArray(String[]::new);
// Génère une liste fixe de tâches pour tous les scénarios
List<Callable<String>> tasks = new ArrayList<>(thread);
Random random = new Random();
for (int i = 0; i < thread; i++) {
List<String> list = random.ints(0, limite).limit(taille).mapToObj(key -> cache[key]).collect(Collectors.toCollection(() -> new ArrayList<>(taille)));
Callable<String> task = () -> {
String name = Thread.currentThread().getName();
for (String item : list) {
map.put(item, name);
}
return name;
};
tasks.add(task);
}
ExecutorService threadpool = Executors.newFixedThreadPool(thread);
for (int concurrence = 1; concurrence <= 256; concurrence*=2) {
map = new ConcurrentHashMap<>(limite, 1.0f, concurrence);
int count = 0;
int total = 0;
for (int i = 0; i < 50; i++) {
map.clear();
long start = System.currentTimeMillis();
threadpool.invokeAll(tasks);
long spent = System.currentTimeMillis() - start;
if (i >= 20) {
count++;
total += spent;
}
}
Logger.println("test: %04d, classe: %17s temps: %tT.%<tL", concurrence, map.getClass().getSimpleName(), new Date(total / count));
}
threadpool.shutdownNow();00:00:00.016 [main ] test: 0001, classe: ConcurrentHashMap temps: 00:00:00.639
00:00:32.648 [main ] test: 0002, classe: ConcurrentHashMap temps: 00:00:00.637
00:01:06.252 [main ] test: 0004, classe: ConcurrentHashMap temps: 00:00:00.664
00:01:40.124 [main ] test: 0008, classe: ConcurrentHashMap temps: 00:00:00.682
00:02:13.029 [main ] test: 0016, classe: ConcurrentHashMap temps: 00:00:00.665
00:02:49.347 [main ] test: 0032, classe: ConcurrentHashMap temps: 00:00:00.698
00:03:22.275 [main ] test: 0064, classe: ConcurrentHashMap temps: 00:00:00.666
00:03:56.879 [main ] test: 0128, classe: ConcurrentHashMap temps: 00:00:00.667
00:04:32.157 [main ] test: 0256, classe: ConcurrentHashMap temps: 00:00:00.699Dans cet exemple, on observe aucun impact significatif du niveau de concurrence. Ces mesures ont été effectuées sur un Intel Core i5-3470 @ 3.20 GHz (4 coeurs / 4 threads) et un impact plus significatif pourrait être observé sur une machine avec plus de coeurs.
Pour réaliser les mesures suivantes, nous allons utiliser cette classe utilitaire :
//com.developpez.lmauzaize.java.concurrence.collections.ConcurrentHashMapDemo
class ConcurrentHashMapDemo {
// Type de map utilisable
static enum TypeMap {
CONCURRENT, HASHTABLE;
}
TypeMap[] typeMaps = TypeMap.values();
// Paramètres des maps
int taille = 16, concurrence = 16;
float charge = 0.75f;
// Tâches
// - Ajoute n clés au hasard
Runnable ajout(int n);
// - Lis n clés au hasard
Runnable lecture(int n);
// Exécute la liste de tâches programmées en plusieurs itérations et indique le temps d'exécution moyen
void bench();
}La seconde mesure consiste à comparer les performances avec une Hashtable. Une première série de scénario consiste à observer l'impact du nombre de thread, on conserve cependant un nombre de constant d'insertion totale. Le dernier scénario consiste à observer l'impact des lectures concurrentes.
//com.developpez.lmauzaize.java.concurrence.collections.ConcurrentHashMapVsHashtable
ConcurrentHashMapDemo demo = new ConcurrentHashMapDemo();
demo.tache( 3, demo.ajout(2_000_000));
demo.bench();
Logger.println("");
demo.tache( 30, demo.ajout( 200_000));
demo.bench();
Logger.println("");
demo.tache( 300, demo.ajout( 20_000));
demo.bench();
Logger.println("");
demo.tache( 3, demo.ajout(2_000_000));
demo.tache(1_000, demo.lecture(100));
demo.bench();
Logger.println("");00:00:00.062 [main ] test: [3*put(2M)], classe: ConcurrentHashMap temps: 00:00:00.907
00:02:14.320 [main ] test: [3*put(2M)], classe: Hashtable temps: 00:00:02.620
00:02:14.321 [main ]
00:02:58.471 [main ] test: [30*put(200k)], classe: ConcurrentHashMap temps: 00:00:00.792
00:05:12.714 [main ] test: [30*put(200k)], classe: Hashtable temps: 00:00:02.693
00:05:12.715 [main ]
00:05:55.347 [main ] test: [300*put(20k)], classe: ConcurrentHashMap temps: 00:00:00.822
00:08:11.216 [main ] test: [300*put(20k)], classe: Hashtable temps: 00:00:02.722
00:08:11.217 [main ]
00:09:04.250 [main ] test: [3*put(2M), 1k*get(100)], classe: ConcurrentHashMap temps: 00:00:00.982
00:11:16.949 [main ] test: [3*put(2M), 1k*get(100)], classe: Hashtable temps: 00:00:02.643
00:11:16.950 [main ]En premier lieu, on observe de bien meilleures performances (d'un facteur 3) de la ConcurrentHashMap par rapport à la Hashtable.
Autre fait remarquable, le nombre de thread n'a pas d'incidence sur le temps de traitement global. Ceci peut-être dû comme précédemment au nombre de coeurs de la machine de test.
Enfin, le dernier scénario met en lumière le faible impact des lectures concurrentes.
VI. Conclusion▲
Au cours de ce troisième article, nous avons vu comment gérer le stockage de vos données pour une utilisation concurrente. Le prochain article sera dédié aux différents types de verrou.
VII. Remerciements▲
Je remercie tous les contributeurs de l'API Java et les packages java.util.concurrent.* et plus particulièrement Doug Lea qui est l'un des principaux auteurs.
Je remercie également Mickael Baron, Thierry Leriche-Dessirier alias thierryler et Claude Leloup pour leur relecture attentive, leurs remarques et leurs bons conseils.
Je tiens aussi à remercier la communauté Developpez.com qui a mis en place tous les outils, procédures et l'hébergement nécessaires à la publication de cet article.
Enfin mon épouse et mes enfants pour leur patience et leur tolérance durant les nombreuses heures qui ont été nécessaires à la rédaction de cet article.
VIII. Annexes▲
VIII-A. Sources des exemples▲
Tous les exemples donnés dans cet article sont disponibles sous la forme d'un projet Maven hébergé sous GitHub. Tous les exemples cités contiennent une première ligne commentaire indiquant l'emplacement du fichier dans les sources.
Si vous ne savez pas comment importer le projet, je vous invite à consulter l'article « Importer un projet Maven dans Eclipse en 5 minutes ».
VIII-B. Java Concurrent Animated▲
Java Concurrent Animated est un projet Swing visant à montrer graphiquement le comportement de différents composants de l'API concurrente de Java.


