Tutoriel pour apprendre le langage Scala par l'exemple


précédentsommairesuivant

17. Abstractions pour les programmes concurrents

Ce chapitre passe en revue les modèles de conception les plus courants pour la programmation concurrente et montre leur implémentation en Scala.

17-1. Signaux et moniteurs

Exemple 17-1-1 : Un moniteur est l'outil de base pour garantir l'exclusion mutuelle des processus en Scala. Toute instance de la classe AnyRef peut servir de moniteur grâce à l'invocation d'une ou plusieurs des méthodes suivantes :

 
Sélectionnez
1.
2.
3.
4.
5.
def synchronized[A] (e: => A): A
def wait()
def wait(msec: Long)
def notify()
def notifyAll()

La méthode synchronized exécute le traitement qui lui est passé en argument en exclusion mutuelle : à un instant précis, il ne peut y avoir qu'un seul thread qui exécute le paramètre synchronized d'un moniteur donné.

Les threads peuvent être suspendus dans un moniteur dans l'attente d'un signal. Les threads qui appellent la méthode wait attendent jusqu'à ce qu'un autre thread appelle la méthode notify du même objet. Les appels à notify alors qu'aucun thread n'attend le signal sont ignorés.

Il existe également une forme de wait avec délai, qui suspend le thread en attente d'un signal ou pendant le délai indiqué (en millisecondes). En outre, la méthode notifyAll débloque tous les threads qui attendent le signal. En Scala, ces méthodes, ainsi que la classe Monitor, sont des primitives - elles sont implémentées dans l'environnement d'exécution sous-jacent.

Le plus souvent, un thread attend qu'une certaine condition soit vérifiée. Si celle-ci ne l'est pas lors de l'appel à wait, le thread se bloque jusqu'à ce qu'un autre thread établisse cette condition. C'est cet autre thread qui est responsable du réveil des threads bloqués via un appel à notify ou notifyAll. Notez cependant qu'il n'est pas garanti qu'un thread bloqué redémarre immédiatement après l'appel à notify : cet appel pourrait réveiller d'abord un autre thread qui pourrait invalider à nouveau la condition. La forme correcte de l'attente d'une condition C consiste donc à utiliser une boucle while :

 
Sélectionnez
while (!C) wait()

Pour illustrer l'utilisation des moniteurs, prenons l'exemple d'une classe tampon de taille bornée :

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
class BoundedBuffer[A](N: Int) {
    var in = 0, out = 0, n = 0
    val elems = new Array[A](N)

    def put(x: A) = synchronized {
        while (n >= N) wait()
        elems(in) = x ; in = (in + 1) % N ; n = n + 1
        if (n == 1) notifyAll()
    }

    def get: A = synchronized {
        while (n == 0) wait()
        val x = elems(out) ; out = (out + 1) % N ; n = n - 1
        if (n == N - 1) notifyAll() x
    }
}

Voici un programme utilisant un tampon borné pour communiquer entre un processus producteur et un processus consommateur :

 
Sélectionnez
1.
2.
3.
4.
5.
6.
import scala.concurrent.ops._
...
    val buf = new BoundedBuffer[String](10)
    spawn { while (true) { val s = produceString ; buf.put(s) } }
    spawn { while (true) { val s = buf.get ; consumeString(s) } }
}

La méthode spawn crée un thread qui exécute l'expression contenue dans son paramètre. Cette méthode est définie de la façon suivante dans l'objet concurrent.ops :

 
Sélectionnez
1.
2.
3.
def spawn(p: => Unit) {
    val t = new Thread() { override def run() = p }
    t.start()

17-2. Variables de synchronisation

Une variable synchronisée (ou « syncvar ») fournit les opérations get et put pour lire et modifier le contenu de la variable. Les opérations get sont bloquantes tant que la variable n'a pas été définie. L'opération unset réinitialise la variable et la place dans l'état indéfini.

Voici l'implémentation standard des variables synchronisées :

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
package scala.concurrent class SyncVar[A] {
    private var isDefined: Boolean = false
    private var value: A = _
    def get = synchronized {
        while (!isDefined) wait()
        value
    }
    def set(x: A) = synchronized {
        value = x; isDefined = true; notifyAll()
    }
    def isSet: Boolean = synchronized {
        isDefined
    }
    def unset = synchronized {
        isDefined = false
    }
}

17-3. Futures

Une future est une valeur qui est traitée en parallèle d'un autre thread client et qui pourra être utilisée ultérieurement par un thread client. Les futures permettent de mieux utiliser les ressources de traitement parallèle. Leur utilisation classique est de la forme :

 
Sélectionnez
1.
2.
3.
4.
5.
6.
import scala.concurrent.ops._
...
    val x = future(calculTrèsLong)
    unAutreCalculTrèsLong
    val y = f(x()) + g(x())
}

La méthode future est définie de la façon suivante dans l'objet scala.concurrent.ops :

 
Sélectionnez
1.
2.
3.
4.
5.
def future[A](p: => A): Unit => A = {
    val result = new SyncVar[A]
    fork { result.set(p) }
    (() => result.get)
}

Cette méthode prend en paramètre un traitement p à réaliser. Le type de ce traitement est quelconque - il est représenté par le paramètre de type A de future. La méthode utilise variable synchronisée result pour stocker le résultat du traitement. Puis, elle crée un thread pour effectuer le traitement et affecter son résultat à result. En parallèle de ce thread, la fonction renvoie une fonction anonyme de type A qui, lorsqu'elle est appelée, attend le résultat du traitement et le renvoie. Lorsque cette fonction sera appelée de nouveau, elle renverra immédiatement le résultat déjà calculé.

17-4. Traitements parallèles

L'exemple suivant présente la fonction par qui prend en paramètre une paire de traitements et qui renvoie une paire contenant leurs résultats. Ces deux traitements s'effectuent en parallèle.

Cette fonction est définie de la façon suivante dans l'objet scala.concurrent.ops :

 
Sélectionnez
1.
2.
3.
4.
5.
def par[A, B](xp: => A, yp: => B): (A, B) = {
    val y = new SyncVar[B]
    spawn { y set yp }
    (xp, y.get)
}

Au même endroit est définie la fonction replicate qui exécute plusieurs fois le même traitement en parallèle. Chaque réplique reçoit en paramètre un nombre entier qui l'identifie :

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
def replicate(start: Int, end: Int)(p: Int => Unit) {
    if (start == end)
        ()
    else if (start + 1 == end)
        p(start)
    else {
        val mid = (start + end) / 2
        spawn { replicate(start, mid)(p) }
        replicate(mid, end)(p)
    }
}

La fonction suivante utilise replicate pour traiter en parallèle tous les éléments d'un tableau :

 
Sélectionnez
1.
2.
3.
4.
5.
def parMap[A,B](f: A => B, xs: Array[A]): Array[B] = {
 val results = new Array[B](xs.length)
 replicate(0, xs.length) { i => results(i) = f(xs(i)) }
 results
}

17-5. Sémaphores

Les verrous (ou sémaphores) sont un mécanisme classique de synchronisation des processus. Un verrou a deux actions atomiques : acquire et release. Voici l'implémentation d'un verrou en Scala :

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
package scala.concurrent

class Lock {
    var available = true
    def acquire = synchronized {
        while (!available) wait()
        available = false
    }
    def release = synchronized {
        available = true
        notify()
    }
}

17-6. Lecteurs/rédacteurs

Une forme de synchronisation plus complexe fait la distinction entre les lecteurs qui accèdent à une ressource commune sans la modifier et les rédacteurs qui peuvent à la fois y accéder en lecture et la modifier. Pour synchroniser les lecteurs et les rédacteurs, il faut implémenter les opérations startRead, startWrite, endRead et endWrite telles que :

  • il peut y avoir plusieurs lecteurs concurrents ;
  • il ne peut y avoir qu'un seul rédacteur à la fois ;
  • les demandes d'écriture en attente ont priorité sur les demandes de lecture, mais ne préemptent pas les opérations de lecture en cours.

L'implémentation suivante d'un verrou lecteurs/rédacteurs repose sur le concept de boîte aux lettres (voir la section 17.10Boîtes aux lettres) :

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
import scala.concurrent._

class ReadersWriters {
    val m = new MailBox
    private case class Writers(n: Int), Readers(n: Int) { m send this }
    Writers(0); Readers(0)
    def startRead = m receive {
        case Writers(n) if n == 0 => m receive {
            case Readers(n) => Writers(0); Readers(n+1)
        }
    }
    def startWrite = m receive {
        case Writers(n) =>
            Writers(n+1)
            m receive { case Readers(n) if n == 0 => }
    }
    def endRead = m receive {
        case Readers(n) => Readers(n-1)
    }
    def endWrite = m receive {
        case Writers(n) => Writers(n-1); if (n == 0) Readers(0)
    }
}

17-7. Canaux asynchrones

Les canaux asynchrones sont un mécanisme fondamental de communication interprocessus. Leur implémentation utilise une classe simple de listes chaînées :

 
Sélectionnez
1.
2.
3.
4.
class LinkedList[A] {
    var elem: A = _
    var next: LinkedList[A] = null
}

Pour faciliter l'insertion et la suppression des éléments dans les listes chaînées, chaque référence de la liste pointe vers le nœud qui précède le nœud qui, conceptuellement, forme le début de la liste. Les listes chaînées vides commencent par un nœud factice dont le successeur est null.

La classe Channel utilise une liste chaînée pour stocker les données qui ont été envoyées, mais non encore lue. À l'autre extrémité, les threads qui veulent lire dans un canal vide enregistrent leur présence en incrémentant le champ nreaders et attendent d'être prévenus.

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
package scala.concurrent

class Channel[A] {
    class LinkedList[A] {
        var elem: A = _
        var next: LinkedList[A] = null
    }
    private var written = new LinkedList[A]
    private var lastWritten = written
    private var nreaders = 0

    def write(x: A) = synchronized {
        lastWritten.elem = x
        lastWritten.next = new LinkedList[A]
        lastWritten = lastWritten.next
        if (nreaders > 0) notify()
    }

    def read: A = synchronized {
        if (written.next == null) {
            nreaders = nreaders + 1; wait(); nreaders = nreaders - 1
        }
        val x = written.elem
        written = written.next
        x
    }
}

17-8. Canaux synchrones

Voici une implémentation de canaux synchrones, où l'expéditeur d'un message est bloqué tant que son message n'a pas été reçu. Les canaux synchrones n'ont besoin que d'une simple variable pour stocker les messages en transit, mais de trois signaux pour coordonner les processus lecteur et rédacteur.

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
package scala.concurrent

class SyncChannel[A] {
    private var data: A = _
    private var reading = false
    private var writing = false

    def write(x: A) = synchronized {
        while (writing) wait()
        data = x
        writing = true
        if (reading) notifyAll()
        else while (!reading) wait()
    }

    def read: A = synchronized {
        while (reading) wait()
        reading = true
        while (!writing) wait()
        val x = data
        writing = false
        reading = false
        notifyAll()
        x
    }
}

17-9. Travailleurs

Voici une implémentation d'un serveur de calculs en Scala. Ce serveur implémente une méthode future qui évalue une expression en parallèle de son appelant. À la différence de l'implémentation de la section 17.3Futures, le serveur ne traite les futures qu'avec un nombre prédéfini de threads. Une implémentation possible de ce serveur pourrait, par exemple, exécuter chaque thread sur un processeur différent et ainsi éviter le surcoût inhérent au changement de contexte qui intervient lorsque plusieurs threads doivent se partager le même processeur.

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
import scala.concurrent._, scala.concurrent.ops._

class ComputeServer(n: Int) {

    private abstract class Job {
        type T
        def task: T
        def ret(x: T)
    }

    private val openJobs = new Channel[Job]()

    private def processor(i: Int) {
        while (true) {
            val job = openJobs.read
            job.ret(job.task)
        }
    }

    def future[A](p: => A): () => A = {
        val reply = new SyncVar[A]()
        openJobs.write {
            new Job {
                type T = A
                def task = p
                def ret(x: A) = reply.set(x)
            }
        }
        () => reply.get
    }

    spawn(replicate(0, n) { processor })
}

Les expressions à calculer (c'est-à-dire les paramètres passés à future) sont écrites dans le canal openJobs. Un job est un objet avec :

  • un type abstrait T qui décrit le résultat de l'expression à calculer ;
  • une méthode task sans paramètre, renvoyant un résultat de type T qui représente l'expression à calculer ;
  • une méthode ret qui consomme le résultat lorsqu'il a été calculé.

Le serveur de calcul crée n processus processor au cours de son initialisation. Chacun de ces processus consomme sans fin un job ouvert, évalue la méthode task de ce job et passe le résultat à la méthode ret du job. La méthode future polymorphe crée un nouveau job dont la méthode ret est implémentée par une variable de synchronisation reply ; elle insère le job dans l'ensemble des jobs ouverts puis attend que la variable de synchronisation correspondante soit appelée.

Cet exemple montre comment utiliser les types abstraits. Le type T mémorise le type du résultat d'un job, qui peut varier d'un job à l'autre. Sans les types abstraits, il serait impossible d'implémenter la même classe avec un typage statique et l'utilisateur devrait tester dynamiquement les types et utiliser des coercitions de types.

Voici un exemple d'utilisation du serveur de calcul pour évaluer l'expression 41 + 1 :

 
Sélectionnez
1.
2.
3.
4.
5.
object Test with Executable {
    val server = new ComputeServer(1)
    val f = server.future(41 + 1)
    println(f())
}

17-10. Boîtes aux lettres

Les boîtes aux lettres sont des constructions souples et de haut niveau permettant de synchroniser des processus et de les faire communiquer. Elles permettent d'envoyer et de recevoir des messages qui, dans ce contexte, sont des objets quelconques. Le message spécial TIMEOUT sert à signaler l'expiration d'un délai :

 
Sélectionnez
case object TIMEOUT

Les boîtes aux lettres ont la signature suivante :

 
Sélectionnez
1.
2.
3.
4.
5.
class MailBox {
    def send(msg: Any)
    def receive[A](f: PartialFunction[Any, A]): A
    def receiveWithin[A](msec: Long)(f: PartialFunction[Any, A]): A
}

L'état d'une boîte aux lettres est constitué d'un multiset de messages. Les messages sont ajoutés à la boîte par sa méthode send et supprimés par la méthode receive qui reçoit en paramètre un traitement de message f - une fonction partielle des messages vers un type de résultat quelconque. Généralement, cette méthode est implémentée par une reconnaissance de motif : elle se bloque jusqu'à ce qu'il y ait un message dans la boîte qui corresponde à l'un des motifs. Le message en question est alors supprimé de la boîte et le thread bloqué est relancé en appliquant le traitement au message. Les envois et les réceptions de messages sont ordonnés dans le temps ; un récepteur r n'est appliqué à un message correspondant m que s'il n'y a pas d'autre paire {message, récepteur} antérieure à m,r.

À titre d'exemple, considérons un tampon à une case :

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
class OnePlaceBuffer {
    private val m = new MailBox // Boîte aux lettres interne
    private case class Empty, Full(x: Int) // Types des messages gérés
    m send Empty // Initialisation
    def write(x: Int) {
        m receive { case Empty => m send Full(x) }
    }
    def read: Int =
        m receive { case Full(x) => m send Empty; x }
}

Voici comment peut être implémentée la classe MailBox :

 
Sélectionnez
1.
2.
3.
4.
5.
class MailBox {
    private abstract class Receiver extends Signal {
        def isDefined(msg: Any): Boolean
        var msg = null
    }

Nous définissons une classe interne pour les récepteurs, dotée d'une méthode de test isDefined, qui indique si le récepteur est défini pour un message donné. Cette classe hérite de la classe Signal la méthode notify qui sert à réveiller un thread récepteur. Lorsque ce dernier est réveillé, le message auquel il doit s'appliquer est stocké dans la variable msg de Receiver.

 
Sélectionnez
1.
2.
3.
4.
    private val sent = new LinkedList[Any]
    private var lastSent = sent
    private val receivers = new LinkedList[Receiver]
    private var lastReceiver = receivers

La classe MailBox gère deux listes chaînées, une pour les messages envoyés, mais non encore consommés, l'autre pour les récepteurs en attente.

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
    def send(msg: Any) = synchronized {
        var r = receivers, r1 = r.next
        while (r1 != null && !r1.elem.isDefined(msg)) {
            r = r1; r1 = r1.next
        }
        if (r1 != null) {
            r.next = r1.next; r1.elem.msg = msg; r1.elem.notify
        } else {
            lastSent = insert(lastSent, msg)
        }
    }

La méthode send commence par tester si un récepteur en attente peut s'appliquer au message envoyé. En ce cas, ce récepteur est prévenu par notify. Sinon, le message est ajouté à la fin de la liste des messages envoyés.

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
    def receive[A](f: PartialFunction[Any, A]): A = {
        val msg: Any = synchronized {
            var s = sent, s1 = s.next
            while (s1 != null && !f.isDefinedAt(s1.elem)) {
                s = s1; s1 = s1.next
            }
            if (s1 != null) {
                s.next = s1.next; s1.elem
            } else {
                val r = insert(lastReceiver, new Receiver {
                    def isDefined(msg: Any) = f.isDefinedAt(msg)
                })
                lastReceiver = r
                r.elem.wait()
                r.elem.msg
            }
        }
        f(msg)
    }

La méthode receive teste d'abord si la fonction f de traitement du message peut s'appliquer à un message qui a déjà été envoyé, mais qui n'a pas encore été consommé. En ce cas, le thread se poursuit en appliquant f au message. Sinon, la méthode crée un nouveau récepteur et le place dans la liste des récepteurs, puis le thread se bloque et attend d'être réveillé par ce récepteur. Lorsqu'il se réveille, le thread applique f au message qui était stocké dans le récepteur. La méthode insert des listes chaînées est définie de la façon suivante :

 
Sélectionnez
1.
2.
3.
4.
5.
6.
def insert(l: LinkedList[A], x: A): LinkedList[A] = {
    l.next = new LinkedList[A]
    l.next.elem = x
    l.next.next = l.next
    l
}

La classe MailBox définit également une méthode receiveWithin qui ne bloque le thread que pendant le temps maximal indiqué. Si aucun message n'est reçu dans cet intervalle de temps (exprimé en millisecondes), le traitement du message est débloqué par le message spécial TIMEOUT. L'implémentation de receiveWithin ressemble beaucoup à celle de receive :

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
    def receiveWithin[A](msec: Long)(f: PartialFunction[Any, A]): A = {
        val msg: Any = synchronized {
            var s = sent, s1 = s.next
            while (s1 != null && !f.isDefinedAt(s1.elem)) {
                s = s1; s1 = s1.next
            }
            if (s1 != null) {
                s.next = s1.next; s1.elem
            } else {
                val r = insert(lastReceiver, new Receiver {
                    def isDefined(msg: Any) = f.isDefinedAt(msg)
                })
                lastReceiver = r
                r.elem.wait(msec)
                if (r.elem.msg == null) r.elem.msg = TIMEOUT
                r.elem.msg
            }
        }
        f(msg)
    }
} // Fin de MailBox

Les seules différences entre les deux méthodes sont l'appel temporisé à wait et l'instruction qui le suit.

17-11. Acteurs

Le chapitre 3Programmer avec des acteurs et des messages a présenté un exemple de programme implémentant un service d'enchères. Ce service reposait sur des processus acteurs de haut niveau qui fonctionnaient en inspectant les messages de leurs boîtes aux lettres à l'aide de reconnaissance de motif. Le paquetage scala.actors contient une implémentation améliorée et optimisée des acteurs, mais nous n'en présenterons ici qu'une version simplifiée.

Un acteur simplifié est simplement un thread qui utilise une boîte aux lettres pour communiquer. Il peut donc être vu comme une composition par mixin assemblant la classe Thread standard de Java à la classe MailBox. Nous redéfinissons également la méthode run de la classe Thread pour qu'elle ait le comportement de l'acteur défini par sa méthode act. La méthode ! appelle simplement la méthode send de la classe MailBox :

 
Sélectionnez
1.
2.
3.
4.
5.
abstract class Actor extends Thread with MailBox {
    def act(): Unit
    override def run(): Unit = act()
    def !(msg: Any) = send(msg)
}

précédentsommairesuivant

Vous avez aimé ce tutoriel ? Alors partagez-le en cliquant sur les boutons suivants : Viadeo Twitter Facebook Share on Google+   

  

Les sources présentées sur cette page sont libres de droits et vous pouvez les utiliser à votre convenance. Par contre, la page de présentation constitue une œuvre intellectuelle protégée par les droits d'auteur. Copyright © 2017 Martin Odersky. Aucune reproduction, même partielle, ne peut être faite de ce site et de l'ensemble de son contenu : textes, documents, images, etc. sans l'autorisation expresse de l'auteur. Sinon vous encourez selon la loi jusqu'à trois ans de prison et jusqu'à 300 000 € de dommages et intérêts.