I. Bases▲
La plupart des collections basiques de Java ne supportent pas la concurrence. En effet, la gestion de la concurrence a un coût sur les performances et il n'y a pas de nécessité à ce que la concurrence soit gérée en permanence. Ainsi des classes telles que Vector et Hashtable sont quasiment considérées comme dépréciées.
L'API des collections introduit cependant de la concept de fail-fast qui consiste à offrir des mécanismes simples et non-sûr (principe de « pour le mieux ») de détection de modification concurrente. Il s'agit principalement de faire échouer la navigation par Iterator lorsqu'intervient une modification structurelle sur la collection.
II. Synchronisation d'une collection▲
La classe utilitaire Collections possède un certain nombre de méthodes préfixées par synchronized afin d'encapsuler une instance non thread-safe dans une instance qui l'est :
//
com.developpez.lmauzaize.java.concurrence.collections.CollectionsSynchronizedDemo
class
Tache implements
Runnable {
int
nbThreads;
int
nbPerThread;
List<
String>
values;
Tache
(
int
nbThreads, int
nbPerThread) {
this
.nbThreads =
nbThreads;
this
.nbPerThread =
nbPerThread;
}
public
void
run
(
) {
for
(
int
i =
0
; i <
nbPerThread; i+
+
) {
values.add
(
Thread.currentThread
(
).getName
(
));
}
}
public
void
execute
(
boolean
sync) throws
InterruptedException {
values =
new
ArrayList<
>
(
1
);
if
(
sync) {
values =
Collections.synchronizedList
(
values);
}
ExecutorService executor =
Executors.newCachedThreadPool
(
);
for
(
int
i =
0
; i <
nbThreads; i+
+
) {
executor.submit
(
this
);
}
executor.shutdown
(
);
if
(
!
executor.awaitTermination
(
2
, TimeUnit.SECONDS)) {
throw
new
CancellationException
(
);
}
Logger.println
(
"
Synchronisé=%-5b
Attendu=%,06d,
Réel=%,06d
"
, sync, nbThreads*
nbPerThread, values.size
(
));
}
}
;
Tache tache =
new
Tache
(
20_000, 10
);
tache.execute
(
false
);
tache.execute
(
true
);
00:00:00.018 [main ] Synchronisé=false Attendu=200 000, Réel=195 059
00:00:00.075 [main ] Synchronisé=true Attendu=200 000, Réel=200 000
Le fait de synchroniser une collection permet seulement de protéger chaque appel de méthode, mais ne protège pas le bloc qui englobe ses appels. Si devez effectuer différents appels sur une collection et que ces appels dépendent les uns des autres, vous devez utiliser un système de synchronisation externe !
//
com.developpez.lmauzaize.java.concurrence.collections.CollectionsSynchronizedInstanceVsBloc
class
Tache implements
Runnable {
int
nbThreads;
int
nbPerThread;
List<
Integer>
values;
boolean
sync;
Tache
(
int
nbThreads, int
nbPerThread) {
this
.nbThreads =
nbThreads;
this
.nbPerThread =
nbPerThread;
}
public
void
run
(
) {
for
(
int
i =
0
; i <
nbPerThread; i+
+
) {
if
(
sync) {
syncAdd
(
);
}
else
{
add
(
);
}
}
}
void
add
(
) {
values.add
(
values.size
(
));
}
void
syncAdd
(
) {
synchronized
(
values) {
add
(
);
}
}
void
newList
(
) {
values =
new
ArrayList<
>
(
1
);
if
(
!
sync) {
values =
Collections.synchronizedList
(
values);
}
}
int
check
(
) {
int
check =
values.size
(
);
for
(
int
i =
0
; i <
values.size
(
); i+
+
) {
if
(
i !
=
values.get
(
i)) {
check-
-
;
}
}
return
check;
}
Tache execute
(
boolean
sync) throws
InterruptedException {
this
.sync =
sync;
newList
(
);
ExecutorService executor =
Executors.newCachedThreadPool
(
);
for
(
int
i =
0
; i <
nbThreads; i+
+
) {
executor.submit
(
this
);
}
executor.shutdown
(
);
if
(
!
executor.awaitTermination
(
5
, TimeUnit.SECONDS)) {
throw
new
CancellationException
(
);
}
Logger.println
(
"
Synchronisé=%-5b
Attendu=%,06d,
Réel=%,06d/%,06d
"
, sync, nbThreads*
nbPerThread, check
(
), values.size
(
));
return
this
;
}
}
;
new
Tache
(
20_000, 10
).execute
(
false
).execute
(
true
);
00:00:00.019 [main ] Synchronisé=false Attendu=200 000, Réel=195 419/200 000
00:00:00.122 [main ] Synchronisé=true Attendu=200 000, Réel=200 000/200 000
Pour toutes les instances renvoyées par ces méthodes, le verrou utilisé est l'instance elle-même (ni un verrou interne, ni la collection encapsulée), les Iterators ne sont pas protégés. Le verrou est également partagé par les vues. Voici la liste des interfaces supportées (ainsi que la liste des vues associées) :
III. Files bloquantes▲
III-A. Interfaces▲
Il est commun dans un environnement de traitement parallèle d'accumuler des données d'un côté et de les dépiler de l'autre. C'est le principe du modèle producteur-consommateur, du pattern « Pipes and Filters » ou des pools d'exécution. La structure de données qui se prête le mieux à ces manipulations est la file (queue, First In First Out). En Java, il existe également l'interface Deque (prononcé « dèc ») qui permet également de gérer une pile (Last In First Out). Les variantes « bloquantes » sont représentées par les interfaces BlockingQueue et BlockingDeque.
Une file bloquante offre différents types d'opérations :
-
non-bloquante : si la file n'est pas en mesure de répondre à la demande (vide ou pleine), celle-ci est rejetée. Il existe alors deux formes de retour.
-
bloquante : la demande est mise en attente jusqu'à ce que l'état de la file le permette, il n'est en revanche pas possible d'examiner comme avec les méthodes element() et peek(). Il existe deux formes d'attente :
III-B. SynchronousQueue▲
SynchronousQueue est la plus simple des files bloquantes. En réalité, il ne s'agit pas d'une file puisqu'elle a une capacité de 0 :
//
com.developpez.lmauzaize.java.concurrence.collections.SynchronousQueueDemo
SynchronousQueue<
String>
queue =
new
SynchronousQueue<
>
(
);
//
Non-bloquant
avec
exception
try
{
queue.add
(
"
A
"
);
}
catch
(
Exception e) {
Logger.println
(
"
Ajout
refusé:
%s
"
, e);
}
//
Non-bloquant
avec
valeur
Logger.println
(
"
Ajout?
%b
"
, queue.offer
(
"
A
"
));
new
Thread
(
"
Consommateur
"
) {
public
void
run
(
) {
Logger.println
(
"
Lecture
non-bloquante
:
%s
"
, queue.poll
(
));
Thread.sleep
(
1_500);
Logger.println
(
"
Lecture
non-bloquante
:
%s
"
, queue.poll
(
));
Logger.println
(
"
Attente
"
);
Logger.println
(
"
Lecture
bloquante
:
%s
"
, queue.take
(
));
}
}
.start
(
);
new
Thread
(
"
Producteur
"
) {
public
void
run
(
) {
Thread.sleep
(
1_000);
Logger.println
(
"
Ajout
bloquant
:
A
"
);
queue.put
(
"
A
"
);
Thread.sleep
(
1_000);
Logger.println
(
"
Ajout
non-bloquant
:
B
"
);
queue.add
(
"
B
"
);
}
}
.start
(
);
00:00:00.021 [main ] Ajout refusé: java.lang.IllegalStateException: Queue full
00:00:00.029 [main ] Ajout? false
00:00:00.030 [Consommateur ] Lecture non-bloquante : null
00:00:01.031 [Producteur ] Ajout bloquant : A
00:00:01.530 [Consommateur ] Lecture non-bloquante : A
00:00:01.530 [Consommateur ] Attente
00:00:02.530 [Producteur ] Ajout non-bloquant : B
00:00:02.530 [Consommateur ] Lecture bloquante : B
Comme il s'agit d'une file sans stockage, l'itération sur une telle file ne renvoie jamais d'élément, même si un ajout est en attente :
//
com.developpez.lmauzaize.java.concurrence.collections.SynchronousQueueIteration
SynchronousQueue<
String>
queue =
new
SynchronousQueue<
>
(
);
new
Thread
(
"
Producteur
"
) {
public
void
run
(
) {
Logger.println
(
"
Ajout
"
);
queue.put
(
getName
(
));
Logger.println
(
"
Ajouté
"
);
}
}
.start
(
);
Thread.sleep
(
1_000);
int
i =
0
;
Logger.println
(
"
début
"
);
for
(
String element : queue) {
Logger.println
(
"
[%02d]
%s
"
, i+
+
, element);
}
Logger.println
(
"
fin
"
);
Logger.println
(
"
%s
"
, queue.poll
(
));
00:00:00.020 [Producteur ] Ajout
00:00:00.999 [main ] début
00:00:00.999 [main ] fin
00:00:01.000 [main ] Producteur
00:00:01.000 [Producteur ] Ajouté
III-C. Linked/Array BlockingQueue▲
LinkedBlockingQueue est une implémentation basée sur les listes chaînées. L'implémentation jumelle est ArrayBlockingQueue qui est elle basée sur une table fixe. Globalement, les ArrayBlockingQueue offrent de meilleurs performances (voir exemple ci-dessous) mais ont l'inconvénient de fixer leur utilisation mémoire sur la taille maximale tandis que les LinkedBlockingQueue auront une utilisation mémoire proportionnelle à la taille réelle de la file.
//
com.developpez.lmauzaize.java.concurrence.collections.BlockingQueueLinkedVsArray
class
BlockingQueueLinkedVsArray {
//
Actions
//
//
Remplir/vider
la
file
void
remplir
(
);
void
vider
(
);
//
//
Lances/attends
des
threads
qui
lisent
la
file
jusqu'à
épuisement.
void
lancerConsommateurs
(
);
void
attendreConsommateurs
(
);
//
//
Lances/attends
des
threads
qui
remplissent
la
file
void
lancerProducteurs
(
);
void
attendreProducteurs
(
);
//
Bench
//
//
Exécutes
des
actions
avant/après
chaque
mesure
void
avant
(
);
void
après
(
);
//
//
Action
à
mesurer
void
run
(
);
//
//
Joue
le
scénario
plusieurs
fois
pour
chaque
implémentation,
puis
affiche
le
temps
moyen
d'exécution
void
bench
(
);
}
//
com.developpez.lmauzaize.java.concurrence.collections.BlockingQueueLinkedVsArray
//
Remplis
la
file
en
utilisant
un
seul
thread
new
BlockingQueueLinkedVsArray
(
) {
{
action =
"
AjoutSéquentiel
"
; }
void
run
(
) {
remplir
(
);
}
}
.bench
(
);
//
Remplis
la
file
en
utilisant
plusieurs
threads
new
BlockingQueueLinkedVsArray
(
) {
{
action =
"
AjoutParallèle
"
; }
void
run
(
) throws
Exception {
lancerProducteurs
(
);
attendreProducteurs
(
);
}
}
.bench
(
);
//
Vides
la
file
en
utilisant
un
seul
thread
new
BlockingQueueLinkedVsArray
(
) {
{
action =
"
RetraitSéquentiel
"
; }
void
avant
(
) {
remplir
(
);
}
void
run
(
) throws
Exception {
vider
(
);
}
}
.bench
(
);
//
Vides
la
file
en
utilisant
plusieurs
threads
new
BlockingQueueLinkedVsArray
(
) {
{
action =
"
RetraitParallèle
"
; }
void
avant
(
) {
remplir
(
);
}
void
run
(
) throws
Exception {
lancerConsommateurs
(
);
attendreConsommateurs
(
);
}
}
.bench
(
);
//
Vides
et
remplis
la
file
en
utilisant
plusieurs
threads
new
BlockingQueueLinkedVsArray
(
) {
{
action =
"
Parallèle
"
; }
void
run
(
) throws
Exception {
lancerConsommateurs
(
);
lancerProducteurs
(
);
attendreProducteurs
(
);
attendreConsommateurs
(
);
}
}
.bench
(
);
Paramètres :
- Itérations par implémentation : 10
- Nombre de producteurs : 1 000
- Nombre de consommateurs : 1 000
- Nombre total d'éléments : 2 000 000
Test: AjoutSéquentiel , File: LinkedBlockingQueue, Temps: 00:00:00.317
Test: AjoutSéquentiel , File: ArrayBlockingQueue, Temps: 00:00:00.055
Test: AjoutParallèle , File: LinkedBlockingQueue, Temps: 00:00:00.201
Test: AjoutParallèle , File: ArrayBlockingQueue, Temps: 00:00:00.075
Test: RetraitSéquentiel, File: LinkedBlockingQueue, Temps: 00:00:00.071
Test: RetraitSéquentiel, File: ArrayBlockingQueue, Temps: 00:00:00.055
Test: RetraitParallèle , File: LinkedBlockingQueue, Temps: 00:00:00.138
Test: RetraitParallèle , File: ArrayBlockingQueue, Temps: 00:00:00.090
Test: Parallèle , File: LinkedBlockingQueue, Temps: 00:00:00.342
Test: Parallèle , File: ArrayBlockingQueue, Temps: 00:00:00.194
Contrairement aux collections « standard » (et SynchronousQueue), il est tout à fait possible d'itérer sur la collection même si des modifications interviennent entre temps. Cependant, les modifications peuvent ou pas être visibles lors de l'itération.
//
com.developpez.lmauzaize.java.concurrence.collections.ArrayBlockingQueueIteration
BlockingQueue<
String>
file =
new
ArrayBlockingQueue<
>
(
100
);
for
(
int
i =
0
; i <
3
; i+
+
) {
new
Thread
(
"
Producteur-
"
+
i) {
public
void
run
(
) {
for
(
int
i =
0
; i <
4
; i+
+
) {
String e =
getName
(
) +
"
-
"
+
i;
Logger.println
(
"
Ajout
%s
"
, e);
file.put
(
e);
Thread.sleep
(
40
);
}
}
}
.start
(
);
}
int
i =
0
;
Thread.sleep
(
50
);
Logger.println
(
"
début
"
);
for
(
String élément : file) {
Logger.println
(
"
[%02d]
%s
"
, i+
+
, élément);
Thread.sleep
(
50
);
}
Logger.println
(
"
fin
"
);
00:00:00.022 [Producteur-0 ] Ajout Producteur-0-0
00:00:00.022 [Producteur-1 ] Ajout Producteur-1-0
00:00:00.022 [Producteur-2 ] Ajout Producteur-2-0
00:00:00.049 [main ] début
00:00:00.049 [main ] [00] Producteur-0-0
00:00:00.071 [Producteur-2 ] Ajout Producteur-2-1
00:00:00.071 [Producteur-0 ] Ajout Producteur-0-1
00:00:00.071 [Producteur-1 ] Ajout Producteur-1-1
00:00:00.100 [main ] [01] Producteur-1-0
00:00:00.111 [Producteur-2 ] Ajout Producteur-2-2
00:00:00.111 [Producteur-0 ] Ajout Producteur-0-2
00:00:00.111 [Producteur-1 ] Ajout Producteur-1-2
00:00:00.150 [main ] [02] Producteur-2-0
00:00:00.151 [Producteur-0 ] Ajout Producteur-0-3
00:00:00.151 [Producteur-2 ] Ajout Producteur-2-3
00:00:00.151 [Producteur-1 ] Ajout Producteur-1-3
00:00:00.200 [main ] [03] Producteur-2-1
00:00:00.250 [main ] [04] Producteur-0-1
00:00:00.300 [main ] [05] Producteur-1-1
00:00:00.350 [main ] [06] Producteur-2-2
00:00:00.400 [main ] [07] Producteur-0-2
00:00:00.450 [main ] [08] Producteur-1-2
00:00:00.500 [main ] [09] Producteur-0-3
00:00:00.550 [main ] [10] Producteur-2-3
00:00:00.600 [main ] [11] Producteur-1-3
00:00:00.650 [main ] fin
III-D. PriorityBlockingQueue▲
La file bloquante à priorité (PriorityBlockingQueue) est la variante concurrente de PriorityQueue. La priorité est géré soit par l'ordre naturel, soit un ordre spécifique.
//
com.developpez.lmauzaize.java.concurrence.collections.PriorityBlockingQueueDemo
class
Élément implements
Comparable<
Élément>
{
String nom;
int
priorité;
Élément
(
String nom, int
priorité) {
this
.nom =
nom;
this
.priorité =
priorité;
}
public
int
compareTo
(
Élément that) {
return
this
.priorité -
that.priorité;
}
@
Override
public
String toString
(
) {
return
String.format
(
"
{%s:%02d}
"
, nom, priorité);
}
}
PriorityBlockingQueue<
Élément>
file =
new
PriorityBlockingQueue<
>
(
);
file.offer
(
new
Élément
(
"
A
"
, 10
));
file.offer
(
new
Élément
(
"
B
"
, 0
));
file.offer
(
new
Élément
(
"
C
"
, 20
));
file.offer
(
new
Élément
(
"
D
"
, 10
));
file.offer
(
new
Élément
(
"
E
"
, 10
));
file.offer
(
new
Élément
(
"
F
"
, 10
));
Logger.println
(
"
file
=%s
"
, file);
List<
Élément>
éléments =
new
ArrayList<
>
(
file.size
(
));
file.drainTo
(
éléments);
Logger.println
(
"
éléments=%s
"
, éléments);
00:00:00.020 [main ] file =[{B:00}, {A:10}, {F:10}, {D:10}, {E:10}, {C:20}]
00:00:00.028 [main ] éléments=[{B:00}, {A:10}, {E:10}, {D:10}, {F:10}, {C:20}]
L'itération sur ce type de file ne tient pas compte de la priorité des éléments (idem pour sa représentation sous forme de chaîne de caractères). Si vous souhaitez manipuler cette liste conformément aux priorités, vous devez utiliser les méthodes spécifiques aux files (ex : peek, poll, drainTo).
Ce sont les éléments avec la plus petite priorité qui sont retirés en premier. A priorité égale l'ordre n'est pas spécifié.
III-E. DelayQueue▲
Les files différées (DelayQueue) permettent de retarder le traitement des éléments qui la composent. En effet, elles se composent uniquement d'éléments différées (Delayed) qui expirent si le délai est inférieur et égal à zéro.
Tant qu'aucun élément n'a expiré, la file ne possède pas de tête à tirer ; c'est-à-dire que :
- peek() renvoient un élément qui n'a pas encore expiré.
- element() renvoient un élément qui n'a pas encore expiré.
- poll() renvoient null.
- remove() lancent une NoSuchElementException.
- drainTo() ne retirent aucun élément.
- take() attend le premier élément à expirer.
//
com.developpez.lmauzaize.java.concurrence.collections.DelayQueueDemo
class
Différé implements
Delayed {
long
expiration;
String toString;
Différé
(
long
temps, TimeUnit unité) {
long
now =
System.nanoTime
(
);
long
nanos =
unité.toNanos
(
temps);
expiration =
now +
nanos;
toString =
temps +
"
"
+
unité;
}
public
long
getDelay
(
TimeUnit unité) {
long
now =
System.nanoTime
(
);
long
temps =
expiration -
now;
long
délai =
unité.convert
(
temps, TimeUnit.NANOSECONDS);
return
délai;
}
public
int
compareTo
(
Delayed o) {
Différé d =
(
Différé) o;
long
diff =
this
.expiration -
d.expiration;
if
(
diff >
Integer.MAX_VALUE) return
Integer.MAX_VALUE;
if
(
diff <
Integer.MIN_VALUE) return
Integer.MIN_VALUE;
return
(
int
) diff;
}
public
String toString
(
) {
return
toString;
}
}
DelayQueue<
Différé>
file =
new
DelayQueue<
>
(
);
for
(
long
délai : new
long
[] {
5
, 2
, 1
, 3
, 4
, 6
}
) {
Différé d =
new
Différé
(
délai, TimeUnit.SECONDS);
file.add
(
d);
}
Runnable afficherFile =
(
) -
>
{
Logger.println
(
"
File
:
%s
"
, file);
Logger.println
(
"
%-10s=
%s
"
, "
size
"
, file.size
(
));
Logger.println
(
"
%-10s=
%s
"
, "
iterator
"
, new
ArrayList<
>
(
file));
Logger.println
(
"
%-10s=
%s
"
, "
array
"
, Arrays.toString
(
file.toArray
(
)));
Logger.println
(
"
%-10s=
%s
"
, "
peek
"
, file.peek
(
));
Logger.println
(
"
%-10s=
%s
"
, "
element
"
, file.element
(
));
Logger.println
(
"
***
"
);
Logger.println
(
"
%-10s=
%s
"
, "
poll
"
, file.poll
(
));
Object remove;
try
{
remove =
file.remove
(
);
}
catch
(
NoSuchElementException e) {
remove =
e;
}
Logger.println
(
"
%-10s=
%s
"
, "
remove
"
, remove);
List<
Différé>
liste =
new
ArrayList<
>
(
);
file.drainTo
(
liste);
Logger.println
(
"
%-10s=
%s
"
, "
drainTo
"
, liste);
}
;
afficherFile.run
(
);
Logger.println
(
"
"
);
Logger.println
(
"
Sleep
"
);
Logger.println
(
"
"
);
TimeUnit.SECONDS.sleep
(
5
);
afficherFile.run
(
);
00:00:00.021 [main ] File : [1 SECONDS, 3 SECONDS, 2 SECONDS, 5 SECONDS, 4 SECONDS, 6 SECONDS]
00:00:00.030 [main ] size = 6
00:00:00.030 [main ] iterator = [1 SECONDS, 3 SECONDS, 2 SECONDS, 5 SECONDS, 4 SECONDS, 6 SECONDS]
00:00:00.030 [main ] array = [1 SECONDS, 3 SECONDS, 2 SECONDS, 5 SECONDS, 4 SECONDS, 6 SECONDS]
00:00:00.031 [main ] peek = 1 SECONDS
00:00:00.031 [main ] element = 1 SECONDS
00:00:00.031 [main ] ***
00:00:00.031 [main ] poll = null
00:00:00.032 [main ] remove = java.util.NoSuchElementException
00:00:00.032 [main ] drainTo = []
00:00:00.032 [main ]
00:00:00.032 [main ] Sleep
00:00:00.033 [main ]
00:00:05.033 [main ] File : [1 SECONDS, 3 SECONDS, 2 SECONDS, 5 SECONDS, 4 SECONDS, 6 SECONDS]
00:00:05.033 [main ] size = 6
00:00:05.034 [main ] iterator = [1 SECONDS, 3 SECONDS, 2 SECONDS, 5 SECONDS, 4 SECONDS, 6 SECONDS]
00:00:05.034 [main ] array = [1 SECONDS, 3 SECONDS, 2 SECONDS, 5 SECONDS, 4 SECONDS, 6 SECONDS]
00:00:05.035 [main ] peek = 1 SECONDS
00:00:05.035 [main ] element = 1 SECONDS
00:00:05.036 [main ] ***
00:00:05.036 [main ] poll = 1 SECONDS
00:00:05.037 [main ] remove = 2 SECONDS
00:00:05.038 [main ] drainTo = [3 SECONDS, 4 SECONDS, 5 SECONDS]
III-F. TransferQueue (1.7+)▲
Les files de transfert (TransferQueue) sont des files bloquantes qui permettent à un « producteur » d'attendre que sa ressource soit « consommée ». L'interface demeure la même pour le consommateur mais le producteur peut :
- tenter de faire consommer sa ressource immédiatement via tryTransfer(E). La méthode renvoie un booléen indiquant si l'élément a été transféré,
- tenter de faire consommer sa ressource pendant un certain délai via tryTransfer(E, long, TimeUnit). La méthode renvoie un booléen indiquant si l'élément a été transféré,
- attendre que sa ressource soit consommée via transfer(E). La méthode attend jusqu'au traitement de la ressource ou l'interruption du thread.
La seule implémentation de cette interface est la classe LinkedTransferQueue, comme son nom l'indique elle repose sur une liste chaînée.
//
com.developpez.lmauzaize.java.concurrence.collections.TransferQueueDémo
class
TransferQueueDémo {
TransferQueue<
String>
file =
new
LinkedTransferQueue<
>
(
);
void
tryTransfer
(
String valeur) {
Logger.println
(
"
résultat=%b
"
, file.tryTransfer
(
valeur));
}
;
void
tryTransferAttente
(
String valeur) {
Logger.println
(
"
résultat=%b
"
, file.tryTransfer
(
valeur, 5
, TimeUnit.SECONDS));
}
;
void
transfer
(
String valeur) {
file.transfer
(
valeur);
Logger.println
(
"
transfert
terminé
"
);
}
;
void
avecConsommateur
(
) {
new
Thread
(
"
Consommateur
"
) {
public
void
run
(
) {
Logger.println
(
"
pause
"
);
TimeUnit.SECONDS.sleep
(
2
);
Logger.println
(
"
lecture:
%s
"
, file.take
(
));
}
}
.start
(
);
}
;
}
TransferQueueDémo démo =
new
TransferQueueDémo
(
);
//
tryTransfer
-
immédiat
Logger.println
(
"
tryTransfer:
A
"
);
démo.tryTransfer
(
"
A
"
);
Logger.println
(
"
"
);
//
tryTransfer
-
expiration
Logger.println
(
"
tryTransfer:
B,
attente
5s
"
);
démo.tryTransferAttente
(
"
B
"
);
Logger.println
(
"
"
);
//
tryTransfer
-
attente
Logger.println
(
"
tryTransfer:
C,
attente
5s
"
);
démo.avecConsommateur
(
);
démo.tryTransferAttente
(
"
C
"
);
Logger.println
(
"
"
);
//
transfer
Logger.println
(
"
transfer:
D
"
);
démo.avecConsommateur
(
);
démo.transfer
(
"
D
"
);
00:00:00.021 [main ] tryTransfer: A
00:00:00.030 [main ] résultat=false
00:00:00.030 [main ]
00:00:00.031 [main ] tryTransfer: B, attente 5s
00:00:05.032 [main ] résultat=false
00:00:05.032 [main ]
00:00:05.033 [main ] tryTransfer: C, attente 5s
00:00:05.035 [Consommateur ] pause
00:00:07.036 [Consommateur ] lecture: C
00:00:07.036 [main ] résultat=true
00:00:07.037 [main ]
00:00:07.038 [main ] transfer: D
00:00:07.039 [Consommateur ] pause
00:00:09.039 [Consommateur ] lecture: D
00:00:09.039 [main ] transfert terminé
IV. CopyOnWrite List/Set▲
Les collections CopyOnWrite* sont des collections concurrentes qui stockent leurs éléments dans un tableau. Chaque modification entraîne la création d'un nouveau tableau en effectuant une copie de l'original.
Les itérateurs conservent une référence du tableau utilisé lors de leur création. Ils sont hermétiques aux modifications qui surviennent sur la collection source et ne supportent pas les opérations de modifications (remove, add et set).
Ces collections sont à utiliser si le nombre de modification est peu important et/ou que le nombre d'élément n'est pas important. Les modifications sont très coûteuses mais ces collections se révèlent efficaces dès lors que les parcours surpassement largement les modifications. La classe de base est CopyOnWriteArrayList, il existe une variante CopyOnWriteArraySet qui implémente l'interface Set ; cette dernière encapsule une instance de CopyOnWriteArrayList pour stocker les données.
//
com.developpez.lmauzaize.java.concurrence.collections.CopyOnWriteArrayListDemo
class
CopyOnWriteArrayListDemo {
//
Pour
chaque
implémentation,
initialise
une
instance,
puis
joue
le
scénario
10
fois
et
enfin
affiche
le
temps
moyen
par
itération.
void
test
(
String action);
//
Actions
:
créer
une
tâche
qui
sera
exécutée
N
fois
en
parallèle
avec
une
pause
optionnelle
avant.
//
-
Effectues
"n"
insertions
par
thread
CopyOnWriteArrayListDemo add
(
thread, int
n, int
délai);
//
-
Itères
la
collection
à
chaque
thread
CopyOnWriteArrayListDemo iterator
(
int
thread, final
int
délai);
//
-
Effectues
"n"
retrait
par
thread
CopyOnWriteArrayListDemo remove
(
int
thread, int
quantité, int
délai);
}
//
com.developpez.lmauzaize.java.concurrence.collections.CopyOnWriteArrayListDemo
final
CopyOnWriteArrayListDemo démo =
new
CopyOnWriteArrayListDemo
(
);
int
thread, quantité, remplissage, délai =
0
;
démo.add
(
thread=
1
, quantité=
10_000, délai).test
(
remplissage=
0
, "
Ajout
linéaire
"
);
démo.add
(
thread=
100
, quantité=
100
, délai).test
(
remplissage=
0
, "
Ajout
parallèle
"
);
démo.iterator
(
thread=
1_000, délai).test
(
remplissage=
1_000, "
Parcours
[1k/1k]
"
);
démo.iterator
(
thread=
5_000, délai).test
(
remplissage=
1_000, "
Parcours
[5k/1k]
"
);
démo.iterator
(
thread=
1_000, délai).test
(
remplissage=
3_000, "
Parcours
[1k/3k]
"
);
démo.iterator
(
thread=
3_000, délai).test
(
remplissage=
3_000, "
Parcours
[3k/3k]
"
);
for
(
int
i =
0
; i <
3
; i+
+
) {
délai =
i*
100
;
démo
.add (
thread=
100
, quantité=
10
, délai)
.iterator
(
thread=
1_000, délai)
.remove (
thread=
100
, quantité=
5
, délai);
}
démo.test
(
remplissage=
1_000, "
Concurrence
"
);
Test: Ajout linéaire , File: CopyOnWriteArrayQueue, Temps: 00:00:00.035
Test: Ajout linéaire , File: ArrayBlockingQueue, Temps: 00:00:00.000
Test: Ajout parallèle , File: CopyOnWriteArrayQueue, Temps: 00:00:00.040
Test: Ajout parallèle , File: ArrayBlockingQueue, Temps: 00:00:00.001
Test: Parcours [1k/1k] , File: CopyOnWriteArrayQueue, Temps: 00:00:00.005
Test: Parcours [1k/1k] , File: ArrayBlockingQueue, Temps: 00:00:00.038
Test: Parcours [5k/1k] , File: CopyOnWriteArrayQueue, Temps: 00:00:00.007
Test: Parcours [5k/1k] , File: ArrayBlockingQueue, Temps: 00:00:00.172
Test: Parcours [1k/3k] , File: CopyOnWriteArrayQueue, Temps: 00:00:00.003
Test: Parcours [1k/3k] , File: ArrayBlockingQueue, Temps: 00:00:00.085
Test: Parcours [3k/3k] , File: CopyOnWriteArrayQueue, Temps: 00:00:00.005
Test: Parcours [3k/3k] , File: ArrayBlockingQueue, Temps: 00:00:00.263
Test: Concurrence , File: CopyOnWriteArrayQueue, Temps: 00:00:00.331
Test: Concurrence , File: ArrayBlockingQueue, Temps: 00:00:00.409
V. Maps concurrentes▲
V-A. ConcurrentSkipList▲
Une liste à enjambements (Skip-list) est une structure de données triées qui utilise plusieurs niveaux de listes chaînées pour accéder plus rapidement à un élément. Voici un schéma montrant la composition interne d'une telle structure :
1
1-----4---6
1---3-4---6-----9
1-2-3-4-5-6-7-8-9-10
Pour localiser un élément, on procède couche par couche. On recherche le plus petit élément correspondant dans la couche courante, puis on passe dans la couche suivante par mouvement vertical (en réalité, un pointeur sur le nœud correspondant de la couche suivante). Il s'agit donc d'une sorte de structure arborescente dont un nœud est chaîné à la fois à son voisin et à son équivalent dans la couche inférieure.
En Java, cette structure est représentée par la classe ConcurrentSkipListMap qui gère également les accès concurrents. Voici une liste de points clés à retenir :
- l'itération (en concurrence avec les modifications) est supportée même si les modifications postérieures peuvent ne pas être visibles ;
- Les paires clé-valeur renvoyées ne sont pas modifiables ;
- déterminer la taille du dictionnaire nécessite de le parcourir ;
- les opérations massives (ex : putAll, ou même equals) ne sont pas nécessairement atomiques ;
- la clé null n'est pas supportée ;
- les opérations « par ordre décroissant » (ex : descendingMap) sont moins performantes que les opérations classiques.
ConcurrentSkipListSet est une version implémentant l'interface Set ; elle utilise une ConcurrentSkipListMap en interne.
//
com.developpez.lmauzaize.java.concurrence.collections.ConcurrentSkipListMapDemo
class
ConcurrentSkipListMapDemo {
//
Modifies
le
nom
du
thread
courant
et
exécutes
séquentiellement
ses
actions
class
Activité implements
Runnable {
String nom;
List<
Runnable>
actions =
new
ArrayList<
>
(
);
//
Actions
/**
Insère
la
clé
"
<
nom
>
-
<
i
>
"
*
*/
Activité put
(
int
i);
/**
Supprime
la
clé
*
*/
Activité remove
(
String clé);
/**
Crée
une
map
content
"
n
"
clé
(
"
<
nom
>
-
<
i
>
"
)
et
l
'
insère
massivement
*
*/
Activité putAll
(
int
début, int
n);
/**
Itère
sur
les
entrées
et
fait
une
pause
entre
chaque
*
*/
Activité parcours
(
TimeUnit unité, long
temps);
/**
Effectue
une
pause
*
*/
Activité pause
(
TimeUnit unité, int
temps);
}
ConcurrentSkipListMap<
String, String>
map =
new
ConcurrentSkipListMap<
>
(
);
List<
Activité>
activités =
new
ArrayList<
>
(
);
List<
Runnable>
après =
Collections.emptyList
(
);
List<
Runnable>
avant =
Collections.emptyList
(
);
//
Pré/post
actions
/**
Indique
les
actions
avant
"
exécuter
"
*
*/
ConcurrentSkipListMapDemo avant
(
);
/**
Indique
les
actions
après
"
exécuter
"
*
*/
ConcurrentSkipListMapDemo après
(
);
/**
Insère
"
n
"
clés
*
*/
ConcurrentSkipListMapDemo remplir
(
int
n);
/**
Affiche
les
noms
de
threads
consécutifs
*
*/
ConcurrentSkipListMapDemo verifierChangementThread
(
);
/**
Créé
une
nouvelle
activité
.
Les
activités
sont
exécutées
en
parallèle
*
*/
Activité activité
(
String nom);
/**
Exécutes
les
actions
"
avant
"
,
les
activités
en
parallèle
,
puis
actions
"
après
"
.
Enfin
,
réinitialise
l
'
instance
.
*
*/
void
exécuter
(
);
}
//
com.developpez.lmauzaize.java.concurrence.collections.ConcurrentSkipListMapDemo
ConcurrentSkipListMapDemo demo =
new
ConcurrentSkipListMapDemo
(
);
Logger.println
(
"
---------------------------------
"
);
Logger.println
(
"
Itération
"
);
demo.avant
(
).remplir
(
5
);
demo.activité
(
"
iter0
"
) .parcours
(
TimeUnit.MILLISECONDS, 100
);
demo.activité
(
"
iter1
"
) .pause
(
TimeUnit.MILLISECONDS, 200
).parcours
(
TimeUnit.MILLISECONDS, 100
);
demo.activité
(
"
put_10
"
).pause
(
TimeUnit.MILLISECONDS, 500
).put
(
10
);
demo.activité
(
"
rmv_02
"
).pause
(
TimeUnit.MILLISECONDS, 400
).remove
(
"
main-02
"
);
demo.activité
(
"
iter2
"
) .pause
(
TimeUnit.MILLISECONDS, 800
).parcours
(
TimeUnit.MILLISECONDS, 0
);
demo.exécuter
(
);
Logger.println
(
"
---------------------------------
"
);
Logger.println
(
"
Ajout
massif
"
);
for
(
int
i =
0
; i <
5
; i+
+
) {
demo.activité
(
"
putAll
"
+
i).putAll
(
0
, 200_000);
}
demo.après
(
).verifierChangementThread
(
);
demo.exécuter
(
);
00:00:00.022 [main ] ---------------------------------
00:00:00.031 [main ] Itération
00:00:00.139 [iter0 ] main-00=main
00:00:00.239 [iter0 ] main-01=main
00:00:00.339 [iter0 ] main-02=main
00:00:00.339 [iter1 ] main-00=main
00:00:00.439 [iter0 ] main-03=main
00:00:00.439 [rmv_02 ] Retrait main-02
00:00:00.440 [iter1 ] main-01=main
00:00:00.539 [iter0 ] main-04=main
00:00:00.539 [put_10 ] Ajout put_10-0010
00:00:00.540 [iter1 ] main-02=main
00:00:00.640 [iter1 ] main-03=main
00:00:00.740 [iter1 ] main-04=main
00:00:00.839 [iter2 ] main-00=main
00:00:00.839 [iter2 ] main-01=main
00:00:00.840 [iter1 ] put_10-0010=put_10
00:00:00.840 [iter2 ] main-03=main
00:00:00.841 [iter2 ] main-04=main
00:00:00.841 [iter2 ] put_10-0010=put_10
00:00:00.842 [main ] ---------------------------------
00:00:00.843 [main ] Ajout massif
00:00:02.306 [main ] [putAll0 x 200 000, putAll1 x 200 000, putAll2 x 200 000, putAll3 x 200 000, putAll4 x 200 000]
V-B. ConcurrentHashMap▲
Contrairement aux « Skip-List », ce type de dictionnaire repose sur l'identité et l'empreinte d'un objet (comme HashMap, avec la gestion de la concurrence en plus). Cette implémentation est à privilégier par rapport à Hashtable. En effet, Hashtable offre de moins bonnes performances et ne supporte pas l'itération en concurrence avec les modifications (elle implémente le principe fail-fast). Pour améliorer les performances, ConcurrentHashMap repose sur deux principes :
- des lectures non-bloquantes (la plupart du temps) ;
- le partitionnement de sa structure interne.
Lors de modification, seule la partition concernée est verrouillée. Le nombre de partition se contrôle à l'aide du paramètre concurrencyLevel. En réalité, le nombre de partition correspondra à la première puissance de 2 supérieure (ou égale), sans pouvoir excéder 65 536. Ce nombre doit correspondre aux nombres de threads qui peuvent potentiellement modifier simultanément la Map. Ainsi si un seul thread à la fois est censé écrire dans la Map (et quelque soit le nombre de lecture concurrente), une valeur de 1 est parfaitement adéquate. S'il n'est pas nécessaire d'avoir une estimation exacte de la concurrence, il est nécessaire d'avoir un ordre de grandeur proche sous peine de dégrader les performances de la Map (temps d'exécution des opérations et consommation mémoire) que ce nombre soit sous-estimé ou sur-estimé.
Comme pour la plupart des Maps, il est également recommandé de « tailler » correctement la Map, c'est-à-dire de positionner la taille (initial capacity) et le facteur de charge (load factor).
Les mesures qui suivent concernent l'impact du niveau de concurrence sur les performances de la Map. Ce test consiste à exécuter 16 threads qui vont ajouter 1 000 000 clés aléatoires (parmis 1 024 clés possibles). On fait varier le niveau de concurrence à chaque scénario.
//
com.developpez.lmauzaize.java.concurrence.collections.ConcurrentHashMapConcurrence
//
Paramètres
int
thread =
16
;
int
taille =
2_000_000;
int
limite =
1_024;
//
Génération
des
valeurs
possibles
String[] cache =
IntStream.range
(
0
, limite).mapToObj
(
String::valueOf).toArray
(
String[]::new
);
//
Génère
une
liste
fixe
de
tâches
pour
tous
les
scénarios
List<
Callable<
String>
>
tasks =
new
ArrayList<
>
(
thread);
Random random =
new
Random
(
);
for
(
int
i =
0
; i <
thread; i+
+
) {
List<
String>
list =
random.ints
(
0
, limite).limit
(
taille).mapToObj
(
key -
>
cache[key]).collect
(
Collectors.toCollection
(
(
) -
>
new
ArrayList<
>
(
taille)));
Callable<
String>
task =
(
) -
>
{
String name =
Thread.currentThread
(
).getName
(
);
for
(
String item : list) {
map.put
(
item, name);
}
return
name;
}
;
tasks.add
(
task);
}
ExecutorService threadpool =
Executors.newFixedThreadPool
(
thread);
for
(
int
concurrence =
1
; concurrence <=
256
; concurrence*
=
2
) {
map =
new
ConcurrentHashMap<
>
(
limite, 1
.0f
, concurrence);
int
count =
0
;
int
total =
0
;
for
(
int
i =
0
; i <
50
; i+
+
) {
map.clear
(
);
long
start =
System.currentTimeMillis
(
);
threadpool.invokeAll
(
tasks);
long
spent =
System.currentTimeMillis
(
) -
start;
if
(
i >=
20
) {
count+
+
;
total +
=
spent;
}
}
Logger.println
(
"
test:
%04d,
classe:
%17s
temps:
%tT.%<tL
"
, concurrence, map.getClass
(
).getSimpleName
(
), new
Date
(
total /
count));
}
threadpool.shutdownNow
(
);
00:00:00.016 [main ] test: 0001, classe: ConcurrentHashMap temps: 00:00:00.639
00:00:32.648 [main ] test: 0002, classe: ConcurrentHashMap temps: 00:00:00.637
00:01:06.252 [main ] test: 0004, classe: ConcurrentHashMap temps: 00:00:00.664
00:01:40.124 [main ] test: 0008, classe: ConcurrentHashMap temps: 00:00:00.682
00:02:13.029 [main ] test: 0016, classe: ConcurrentHashMap temps: 00:00:00.665
00:02:49.347 [main ] test: 0032, classe: ConcurrentHashMap temps: 00:00:00.698
00:03:22.275 [main ] test: 0064, classe: ConcurrentHashMap temps: 00:00:00.666
00:03:56.879 [main ] test: 0128, classe: ConcurrentHashMap temps: 00:00:00.667
00:04:32.157 [main ] test: 0256, classe: ConcurrentHashMap temps: 00:00:00.699
Dans cet exemple, on observe aucun impact significatif du niveau de concurrence. Ces mesures ont été effectuées sur un Intel Core i5-3470 @ 3.20 GHz (4 coeurs / 4 threads) et un impact plus significatif pourrait être observé sur une machine avec plus de coeurs.
Pour réaliser les mesures suivantes, nous allons utiliser cette classe utilitaire :
//
com.developpez.lmauzaize.java.concurrence.collections.ConcurrentHashMapDemo
class
ConcurrentHashMapDemo {
//
Type
de
map
utilisable
static
enum
TypeMap {
CONCURRENT, HASHTABLE;
}
TypeMap[] typeMaps =
TypeMap.values
(
);
//
Paramètres
des
maps
int
taille =
16
, concurrence =
16
;
float
charge =
0
.75f
;
//
Tâches
//
-
Ajoute
n
clés
au
hasard
Runnable ajout
(
int
n);
//
-
Lis
n
clés
au
hasard
Runnable lecture
(
int
n);
//
Exécute
la
liste
de
tâches
programmées
en
plusieurs
itérations
et
indique
le
temps
d'exécution
moyen
void
bench
(
);
}
La seconde mesure consiste à comparer les performances avec une Hashtable. Une première série de scénario consiste à observer l'impact du nombre de thread, on conserve cependant un nombre de constant d'insertion totale. Le dernier scénario consiste à observer l'impact des lectures concurrentes.
//
com.developpez.lmauzaize.java.concurrence.collections.ConcurrentHashMapVsHashtable
ConcurrentHashMapDemo demo =
new
ConcurrentHashMapDemo
(
);
demo.tache
(
3
, demo.ajout
(
2_000_000));
demo.bench
(
);
Logger.println
(
"
"
);
demo.tache
(
30
, demo.ajout
(
200_000));
demo.bench
(
);
Logger.println
(
"
"
);
demo.tache
(
300
, demo.ajout
(
20_000));
demo.bench
(
);
Logger.println
(
"
"
);
demo.tache
(
3
, demo.ajout
(
2_000_000));
demo.tache
(
1_000, demo.lecture
(
100
));
demo.bench
(
);
Logger.println
(
"
"
);
00:00:00.062 [main ] test: [3*put(2M)], classe: ConcurrentHashMap temps: 00:00:00.907
00:02:14.320 [main ] test: [3*put(2M)], classe: Hashtable temps: 00:00:02.620
00:02:14.321 [main ]
00:02:58.471 [main ] test: [30*put(200k)], classe: ConcurrentHashMap temps: 00:00:00.792
00:05:12.714 [main ] test: [30*put(200k)], classe: Hashtable temps: 00:00:02.693
00:05:12.715 [main ]
00:05:55.347 [main ] test: [300*put(20k)], classe: ConcurrentHashMap temps: 00:00:00.822
00:08:11.216 [main ] test: [300*put(20k)], classe: Hashtable temps: 00:00:02.722
00:08:11.217 [main ]
00:09:04.250 [main ] test: [3*put(2M), 1k*get(100)], classe: ConcurrentHashMap temps: 00:00:00.982
00:11:16.949 [main ] test: [3*put(2M), 1k*get(100)], classe: Hashtable temps: 00:00:02.643
00:11:16.950 [main ]
En premier lieu, on observe de bien meilleures performances (d'un facteur 3) de la ConcurrentHashMap par rapport à la Hashtable.
Autre fait remarquable, le nombre de thread n'a pas d'incidence sur le temps de traitement global. Ceci peut-être dû comme précédemment au nombre de coeurs de la machine de test.
Enfin, le dernier scénario met en lumière le faible impact des lectures concurrentes.
VI. Conclusion▲
Au cours de ce troisième article, nous avons vu comment gérer le stockage de vos données pour une utilisation concurrente. Le prochain article sera dédié aux différents types de verrou.
VII. Remerciements▲
Je remercie tous les contributeurs de l'API Java et les packages java.util.concurrent.* et plus particulièrement Doug Lea qui est l'un des principaux auteurs.
Je remercie également Mickael Baron, Thierry Leriche-Dessirier alias thierryler et Claude Leloup pour leur relecture attentive, leurs remarques et leurs bons conseils.
Je tiens aussi à remercier la communauté Developpez.com qui a mis en place tous les outils, procédures et l'hébergement nécessaires à la publication de cet article.
Enfin mon épouse et mes enfants pour leur patience et leur tolérance durant les nombreuses heures qui ont été nécessaires à la rédaction de cet article.
VIII. Annexes▲
VIII-A. Sources des exemples▲
Tous les exemples donnés dans cet article sont disponibles sous la forme d'un projet Maven hébergé sous GitHub. Tous les exemples cités contiennent une première ligne commentaire indiquant l'emplacement du fichier dans les sources.
Si vous ne savez pas comment importer le projet, je vous invite à consulter l'article « Importer un projet Maven dans Eclipse en 5 minutes ».
VIII-B. Java Concurrent Animated▲
Java Concurrent Animated est un projet Swing visant à montrer graphiquement le comportement de différents composants de l'API concurrente de Java.