17. Abstractions pour les programmes concurrents▲
Ce chapitre passe en revue les modèles de conception les plus courants pour la programmation concurrente et montre leur implémentation en Scala.
17-1. Signaux et moniteurs▲
Exemple 17-1-1 : Un moniteur est l'outil de base pour garantir l'exclusion mutuelle des processus en Scala. Toute instance de la classe AnyRef peut servir de moniteur grâce à l'invocation d'une ou plusieurs des méthodes suivantes :
2.
3.
4.
5.
def
synchronized[A]
(
e: =>
A): A
def
wait
(
)
def
wait
(
msec: Long
)
def
notify
(
)
def
notifyAll
(
)
La méthode synchronized exécute le traitement qui lui est passé en argument en exclusion mutuelle : à un instant précis, il ne peut y avoir qu'un seul thread qui exécute le paramètre synchronized d'un moniteur donné.
Les threads peuvent être suspendus dans un moniteur dans l'attente d'un signal. Les threads qui appellent la méthode wait attendent jusqu'à ce qu'un autre thread appelle la méthode notify du même objet. Les appels à notify alors qu'aucun thread n'attend le signal sont ignorés.
Il existe également une forme de wait avec délai, qui suspend le thread en attente d'un signal ou pendant le délai indiqué (en millisecondes). En outre, la méthode notifyAll débloque tous les threads qui attendent le signal. En Scala, ces méthodes, ainsi que la classe Monitor, sont des primitives - elles sont implémentées dans l'environnement d'exécution sous-jacent.
Le plus souvent, un thread attend qu'une certaine condition soit vérifiée. Si celle-ci ne l'est pas lors de l'appel à wait, le thread se bloque jusqu'à ce qu'un autre thread établisse cette condition. C'est cet autre thread qui est responsable du réveil des threads bloqués via un appel à notify ou notifyAll. Notez cependant qu'il n'est pas garanti qu'un thread bloqué redémarre immédiatement après l'appel à notify : cet appel pourrait réveiller d'abord un autre thread qui pourrait invalider à nouveau la condition. La forme correcte de l'attente d'une condition C consiste donc à utiliser une boucle while :
while
(!
C) wait
(
)
Pour illustrer l'utilisation des moniteurs, prenons l'exemple d'une classe tampon de taille bornée :
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
class
BoundedBuffer[A]
(
N: Int
) {
var
in =
0
, out =
0
, n =
0
val
elems =
new
Array
[A]
(
N)
def
put
(
x: A) =
synchronized {
while
(
n >=
N) wait
(
)
elems
(
in) =
x ; in =
(
in +
1
) %
N ; n =
n +
1
if
(
n ==
1
) notifyAll
(
)
}
def
get: A =
synchronized {
while
(
n ==
0
) wait
(
)
val
x =
elems
(
out) ; out =
(
out +
1
) %
N ; n =
n -
1
if
(
n ==
N -
1
) notifyAll
(
) x
}
}
Voici un programme utilisant un tampon borné pour communiquer entre un processus producteur et un processus consommateur :
2.
3.
4.
5.
6.
import
scala.concurrent.ops._
...
val
buf =
new
BoundedBuffer[String]
(
10
)
spawn {
while
(
true
) {
val
s =
produceString ; buf.put
(
s) }
}
spawn {
while
(
true
) {
val
s =
buf.get ; consumeString
(
s) }
}
}
La méthode spawn crée un thread qui exécute l'expression contenue dans son paramètre. Cette méthode est définie de la façon suivante dans l'objet concurrent.ops :
2.
3.
def
spawn
(
p: =>
Unit
) {
val
t =
new
Thread
(
) {
override
def
run
(
) =
p }
t.start
(
)
17-2. Variables de synchronisation▲
Une variable synchronisée (ou « syncvar ») fournit les opérations get et put pour lire et modifier le contenu de la variable. Les opérations get sont bloquantes tant que la variable n'a pas été définie. L'opération unset réinitialise la variable et la place dans l'état indéfini.
Voici l'implémentation standard des variables synchronisées :
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
package
scala.concurrent class
SyncVar[A]
{
private
var
isDefined: Boolean
=
false
private
var
value: A =
_
def
get =
synchronized {
while
(!
isDefined) wait
(
)
value
}
def
set
(
x: A) =
synchronized {
value =
x; isDefined =
true
; notifyAll
(
)
}
def
isSet: Boolean
=
synchronized {
isDefined
}
def
unset =
synchronized {
isDefined =
false
}
}
17-3. Futures▲
Une future est une valeur qui est traitée en parallèle d'un autre thread client et qui pourra être utilisée ultérieurement par un thread client. Les futures permettent de mieux utiliser les ressources de traitement parallèle. Leur utilisation classique est de la forme :
2.
3.
4.
5.
6.
import
scala.concurrent.ops._
...
val
x =
future
(
calculTrèsLong)
unAutreCalculTrèsLong
val
y =
f
(
x
(
)) +
g
(
x
(
))
}
La méthode future est définie de la façon suivante dans l'objet scala.concurrent.ops :
2.
3.
4.
5.
def
future[A]
(
p: =>
A): Unit
=>
A =
{
val
result =
new
SyncVar[A]
fork {
result.set
(
p) }
((
) =>
result.get)
}
Cette méthode prend en paramètre un traitement p à réaliser. Le type de ce traitement est quelconque - il est représenté par le paramètre de type A de future. La méthode utilise variable synchronisée result pour stocker le résultat du traitement. Puis, elle crée un thread pour effectuer le traitement et affecter son résultat à result. En parallèle de ce thread, la fonction renvoie une fonction anonyme de type A qui, lorsqu'elle est appelée, attend le résultat du traitement et le renvoie. Lorsque cette fonction sera appelée de nouveau, elle renverra immédiatement le résultat déjà calculé.
17-4. Traitements parallèles▲
L'exemple suivant présente la fonction par qui prend en paramètre une paire de traitements et qui renvoie une paire contenant leurs résultats. Ces deux traitements s'effectuent en parallèle.
Cette fonction est définie de la façon suivante dans l'objet scala.concurrent.ops :
2.
3.
4.
5.
def
par[A, B]
(
xp: =>
A, yp: =>
B): (
A, B) =
{
val
y =
new
SyncVar[B]
spawn {
y set yp }
(
xp, y.get)
}
Au même endroit est définie la fonction replicate qui exécute plusieurs fois le même traitement en parallèle. Chaque réplique reçoit en paramètre un nombre entier qui l'identifie :
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
def
replicate
(
start: Int
, end: Int
)(
p: Int
=>
Unit
) {
if
(
start ==
end)
(
)
else
if
(
start +
1
==
end)
p
(
start)
else
{
val
mid =
(
start +
end) /
2
spawn {
replicate
(
start, mid)(
p) }
replicate
(
mid, end)(
p)
}
}
La fonction suivante utilise replicate pour traiter en parallèle tous les éléments d'un tableau :
2.
3.
4.
5.
def
parMap[A,B]
(
f: A =>
B, xs: Array
[A]
): Array
[B]
=
{
val
results =
new
Array
[B]
(
xs.length)
replicate
(
0
, xs.length) {
i =>
results
(
i) =
f
(
xs
(
i)) }
results
}
17-5. Sémaphores▲
Les verrous (ou sémaphores) sont un mécanisme classique de synchronisation des processus. Un verrou a deux actions atomiques : acquire et release. Voici l'implémentation d'un verrou en Scala :
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
package
scala.concurrent
class
Lock {
var
available =
true
def
acquire =
synchronized {
while
(!
available) wait
(
)
available =
false
}
def
release =
synchronized {
available =
true
notify
(
)
}
}
17-6. Lecteurs/rédacteurs▲
Une forme de synchronisation plus complexe fait la distinction entre les lecteurs qui accèdent à une ressource commune sans la modifier et les rédacteurs qui peuvent à la fois y accéder en lecture et la modifier. Pour synchroniser les lecteurs et les rédacteurs, il faut implémenter les opérations startRead, startWrite, endRead et endWrite telles que :
- il peut y avoir plusieurs lecteurs concurrents ;
- il ne peut y avoir qu'un seul rédacteur à la fois ;
- les demandes d'écriture en attente ont priorité sur les demandes de lecture, mais ne préemptent pas les opérations de lecture en cours.
L'implémentation suivante d'un verrou lecteurs/rédacteurs repose sur le concept de boîte aux lettres (voir la section 17.10Boîtes aux lettres) :
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
import
scala.concurrent._
class
ReadersWriters {
val
m =
new
MailBox
private
case
class
Writers
(
n: Int
), Readers
(
n: Int
) {
m send this
}
Writers
(
0
); Readers
(
0
)
def
startRead =
m receive {
case
Writers
(
n) if
n ==
0
=>
m receive {
case
Readers
(
n) =>
Writers
(
0
); Readers
(
n+
1
)
}
}
def
startWrite =
m receive {
case
Writers
(
n) =>
Writers
(
n+
1
)
m receive {
case
Readers
(
n) if
n ==
0
=>
}
}
def
endRead =
m receive {
case
Readers
(
n) =>
Readers
(
n-
1
)
}
def
endWrite =
m receive {
case
Writers
(
n) =>
Writers
(
n-
1
); if
(
n ==
0
) Readers
(
0
)
}
}
17-7. Canaux asynchrones▲
Les canaux asynchrones sont un mécanisme fondamental de communication interprocessus. Leur implémentation utilise une classe simple de listes chaînées :
2.
3.
4.
class
LinkedList[A]
{
var
elem: A =
_
var
next: LinkedList[A]
=
null
}
Pour faciliter l'insertion et la suppression des éléments dans les listes chaînées, chaque référence de la liste pointe vers le nœud qui précède le nœud qui, conceptuellement, forme le début de la liste. Les listes chaînées vides commencent par un nœud factice dont le successeur est null.
La classe Channel utilise une liste chaînée pour stocker les données qui ont été envoyées, mais non encore lue. À l'autre extrémité, les threads qui veulent lire dans un canal vide enregistrent leur présence en incrémentant le champ nreaders et attendent d'être prévenus.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
package
scala.concurrent
class
Channel[A]
{
class
LinkedList[A]
{
var
elem: A =
_
var
next: LinkedList[A]
=
null
}
private
var
written =
new
LinkedList[A]
private
var
lastWritten =
written
private
var
nreaders =
0
def
write
(
x: A) =
synchronized {
lastWritten.elem =
x
lastWritten.next =
new
LinkedList[A]
lastWritten =
lastWritten.next
if
(
nreaders >
0
) notify
(
)
}
def
read: A =
synchronized {
if
(
written.next ==
null
) {
nreaders =
nreaders +
1
; wait
(
); nreaders =
nreaders -
1
}
val
x =
written.elem
written =
written.next
x
}
}
17-8. Canaux synchrones▲
Voici une implémentation de canaux synchrones, où l'expéditeur d'un message est bloqué tant que son message n'a pas été reçu. Les canaux synchrones n'ont besoin que d'une simple variable pour stocker les messages en transit, mais de trois signaux pour coordonner les processus lecteur et rédacteur.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
package
scala.concurrent
class
SyncChannel[A]
{
private
var
data: A =
_
private
var
reading =
false
private
var
writing =
false
def
write
(
x: A) =
synchronized {
while
(
writing) wait
(
)
data =
x
writing =
true
if
(
reading) notifyAll
(
)
else
while
(!
reading) wait
(
)
}
def
read: A =
synchronized {
while
(
reading) wait
(
)
reading =
true
while
(!
writing) wait
(
)
val
x =
data
writing =
false
reading =
false
notifyAll
(
)
x
}
}
17-9. Travailleurs▲
Voici une implémentation d'un serveur de calculs en Scala. Ce serveur implémente une méthode future qui évalue une expression en parallèle de son appelant. À la différence de l'implémentation de la section 17.3Futures, le serveur ne traite les futures qu'avec un nombre prédéfini de threads. Une implémentation possible de ce serveur pourrait, par exemple, exécuter chaque thread sur un processeur différent et ainsi éviter le surcoût inhérent au changement de contexte qui intervient lorsque plusieurs threads doivent se partager le même processeur.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
import
scala.concurrent._, scala.concurrent.ops._
class
ComputeServer
(
n: Int
) {
private
abstract
class
Job {
type
T
def
task: T
def
ret
(
x: T)
}
private
val
openJobs =
new
Channel[Job]
(
)
private
def
processor
(
i: Int
) {
while
(
true
) {
val
job =
openJobs.read
job.ret
(
job.task)
}
}
def
future[A]
(
p: =>
A): (
) =>
A =
{
val
reply =
new
SyncVar[A]
(
)
openJobs.write {
new
Job {
type
T =
A
def
task =
p
def
ret
(
x: A) =
reply.set
(
x)
}
}
(
) =>
reply.get
}
spawn
(
replicate
(
0
, n) {
processor }
)
}
Les expressions à calculer (c'est-à-dire les paramètres passés à future) sont écrites dans le canal openJobs. Un job est un objet avec :
- un type abstrait T qui décrit le résultat de l'expression à calculer ;
- une méthode task sans paramètre, renvoyant un résultat de type T qui représente l'expression à calculer ;
- une méthode ret qui consomme le résultat lorsqu'il a été calculé.
Le serveur de calcul crée n processus processor au cours de son initialisation. Chacun de ces processus consomme sans fin un job ouvert, évalue la méthode task de ce job et passe le résultat à la méthode ret du job. La méthode future polymorphe crée un nouveau job dont la méthode ret est implémentée par une variable de synchronisation reply ; elle insère le job dans l'ensemble des jobs ouverts puis attend que la variable de synchronisation correspondante soit appelée.
Cet exemple montre comment utiliser les types abstraits. Le type T mémorise le type du résultat d'un job, qui peut varier d'un job à l'autre. Sans les types abstraits, il serait impossible d'implémenter la même classe avec un typage statique et l'utilisateur devrait tester dynamiquement les types et utiliser des coercitions de types.
Voici un exemple d'utilisation du serveur de calcul pour évaluer l'expression 41 + 1 :
2.
3.
4.
5.
object
Test with
Executable {
val
server =
new
ComputeServer
(
1
)
val
f =
server.future
(
41
+
1
)
println
(
f
(
))
}
17-10. Boîtes aux lettres▲
Les boîtes aux lettres sont des constructions souples et de haut niveau permettant de synchroniser des processus et de les faire communiquer. Elles permettent d'envoyer et de recevoir des messages qui, dans ce contexte, sont des objets quelconques. Le message spécial TIMEOUT sert à signaler l'expiration d'un délai :
case
object
TIMEOUT
Les boîtes aux lettres ont la signature suivante :
2.
3.
4.
5.
class
MailBox {
def
send
(
msg: Any
)
def
receive[A]
(
f: PartialFunction
[Any, A]
): A
def
receiveWithin[A]
(
msec: Long
)(
f: PartialFunction
[Any, A]
): A
}
L'état d'une boîte aux lettres est constitué d'un multiset de messages. Les messages sont ajoutés à la boîte par sa méthode send et supprimés par la méthode receive qui reçoit en paramètre un traitement de message f - une fonction partielle des messages vers un type de résultat quelconque. Généralement, cette méthode est implémentée par une reconnaissance de motif : elle se bloque jusqu'à ce qu'il y ait un message dans la boîte qui corresponde à l'un des motifs. Le message en question est alors supprimé de la boîte et le thread bloqué est relancé en appliquant le traitement au message. Les envois et les réceptions de messages sont ordonnés dans le temps ; un récepteur r n'est appliqué à un message correspondant m que s'il n'y a pas d'autre paire {message, récepteur} antérieure à m,r.
À titre d'exemple, considérons un tampon à une case :
2.
3.
4.
5.
6.
7.
8.
9.
10.
class
OnePlaceBuffer {
private
val
m =
new
MailBox // Boîte aux lettres interne
private
case
class
Empty, Full
(
x: Int
) // Types des messages gérés
m send Empty // Initialisation
def
write
(
x: Int
) {
m receive {
case
Empty =>
m send Full
(
x) }
}
def
read: Int
=
m receive {
case
Full
(
x) =>
m send Empty; x }
}
Voici comment peut être implémentée la classe MailBox :
2.
3.
4.
5.
class
MailBox {
private
abstract
class
Receiver extends
Signal {
def
isDefined
(
msg: Any
): Boolean
var
msg =
null
}
Nous définissons une classe interne pour les récepteurs, dotée d'une méthode de test isDefined, qui indique si le récepteur est défini pour un message donné. Cette classe hérite de la classe Signal la méthode notify qui sert à réveiller un thread récepteur. Lorsque ce dernier est réveillé, le message auquel il doit s'appliquer est stocké dans la variable msg de Receiver.
2.
3.
4.
private
val
sent =
new
LinkedList[Any]
private
var
lastSent =
sent
private
val
receivers =
new
LinkedList[Receiver]
private
var
lastReceiver =
receivers
La classe MailBox gère deux listes chaînées, une pour les messages envoyés, mais non encore consommés, l'autre pour les récepteurs en attente.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
def
send
(
msg: Any
) =
synchronized {
var
r =
receivers, r1 =
r.next
while
(
r1 !=
null
&&
!
r1.elem.isDefined
(
msg)) {
r =
r1; r1 =
r1.next
}
if
(
r1 !=
null
) {
r.next =
r1.next; r1.elem.msg =
msg; r1.elem.notify
}
else
{
lastSent =
insert
(
lastSent, msg)
}
}
La méthode send commence par tester si un récepteur en attente peut s'appliquer au message envoyé. En ce cas, ce récepteur est prévenu par notify. Sinon, le message est ajouté à la fin de la liste des messages envoyés.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
def
receive[A]
(
f: PartialFunction
[Any, A]
): A =
{
val
msg: Any
=
synchronized {
var
s =
sent, s1 =
s.next
while
(
s1 !=
null
&&
!
f.isDefinedAt
(
s1.elem)) {
s =
s1; s1 =
s1.next
}
if
(
s1 !=
null
) {
s.next =
s1.next; s1.elem
}
else
{
val
r =
insert
(
lastReceiver, new
Receiver {
def
isDefined
(
msg: Any
) =
f.isDefinedAt
(
msg)
}
)
lastReceiver =
r
r.elem.wait
(
)
r.elem.msg
}
}
f
(
msg)
}
La méthode receive teste d'abord si la fonction f de traitement du message peut s'appliquer à un message qui a déjà été envoyé, mais qui n'a pas encore été consommé. En ce cas, le thread se poursuit en appliquant f au message. Sinon, la méthode crée un nouveau récepteur et le place dans la liste des récepteurs, puis le thread se bloque et attend d'être réveillé par ce récepteur. Lorsqu'il se réveille, le thread applique f au message qui était stocké dans le récepteur. La méthode insert des listes chaînées est définie de la façon suivante :
2.
3.
4.
5.
6.
def
insert
(
l: LinkedList[A]
, x: A): LinkedList[A]
=
{
l.next =
new
LinkedList[A]
l.next.elem =
x
l.next.next =
l.next
l
}
La classe MailBox définit également une méthode receiveWithin qui ne bloque le thread que pendant le temps maximal indiqué. Si aucun message n'est reçu dans cet intervalle de temps (exprimé en millisecondes), le traitement du message est débloqué par le message spécial TIMEOUT. L'implémentation de receiveWithin ressemble beaucoup à celle de receive :
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
def
receiveWithin[A]
(
msec: Long
)(
f: PartialFunction
[Any, A]
): A =
{
val
msg: Any
=
synchronized {
var
s =
sent, s1 =
s.next
while
(
s1 !=
null
&&
!
f.isDefinedAt
(
s1.elem)) {
s =
s1; s1 =
s1.next
}
if
(
s1 !=
null
) {
s.next =
s1.next; s1.elem
}
else
{
val
r =
insert
(
lastReceiver, new
Receiver {
def
isDefined
(
msg: Any
) =
f.isDefinedAt
(
msg)
}
)
lastReceiver =
r
r.elem.wait
(
msec)
if
(
r.elem.msg ==
null
) r.elem.msg =
TIMEOUT
r.elem.msg
}
}
f
(
msg)
}
}
// Fin de MailBox
Les seules différences entre les deux méthodes sont l'appel temporisé à wait et l'instruction qui le suit.
17-11. Acteurs▲
Le chapitre 3Programmer avec des acteurs et des messages a présenté un exemple de programme implémentant un service d'enchères. Ce service reposait sur des processus acteurs de haut niveau qui fonctionnaient en inspectant les messages de leurs boîtes aux lettres à l'aide de reconnaissance de motif. Le paquetage scala.actors contient une implémentation améliorée et optimisée des acteurs, mais nous n'en présenterons ici qu'une version simplifiée.
Un acteur simplifié est simplement un thread qui utilise une boîte aux lettres pour communiquer. Il peut donc être vu comme une composition par mixin assemblant la classe Thread standard de Java à la classe MailBox. Nous redéfinissons également la méthode run de la classe Thread pour qu'elle ait le comportement de l'acteur défini par sa méthode act. La méthode ! appelle simplement la méthode send de la classe MailBox :
2.
3.
4.
5.
abstract
class
Actor
extends
Thread with
MailBox {
def
act
(
): Unit
override
def
run
(
): Unit
=
act
(
)
def
!(
msg: Any
) =
send
(
msg)
}