core-async

core.async est une implémentation de CSP (Communicating Sequential Processes) disponible en Clojure/Clojurescript. Non présent au coeur de Clojure, c'est une simple librairie à intégrer.

Son objectif est de fournir un moyen de communiquer entre différents processus sans partager d'état. L'abstraction proposée par core.async est l'utilisation de queues (channels) entre les producteurs de données et les consommateurs.

Le slogan de la documentation Golang résume parfaitement cette approche :

Do not communicate by sharing memory; instead, share memory by communicating.

Le moyen de communiquer est le channel pouvant être créé grâce à chan.

(def c (chan))    ;; unbuffered channel === (chan 1)
(def c (chan 10)) ;; buffered channel

L'écriture dans un channel se fait grâce aux symboles >>! (bloquant jusqu'à consommation) et >!(non bloquant)

(>>! c 10) ;; in one thread

Les valeurs contenues dans un un channel peuvent être consommées grâce à <<! (bloquant) et respectivement<!(non bloquant)

(println (<<! c)) ;; in another thread

Go blocks

Les symboles non bloquants (<! et >!) ne peuvent être utilisés que dans des go blocks. Un go block est une unité d'exécution qui sera traitée par un pool de threads. Les symboles non bloquants ne vont pas bloquer le thread mais "parquer" le go block en attente de traitement.

;; producer - writes number 1 to 100 into the channel c each second
(go
  (doseq [v (range 100)]
    (<! (timeout 1000))
    (>! c v)))

;; consumer - reads number 1 to 100 from the channel c
(go
  (loop []
    (println (<! c)) ;; do not block the thread, the go block is parked
    (recur)))

Attente d'un channel

Dans le cas où nous avons plusieurs producteurs, il peut s'avérer utile de consommer une donnée dès que celle-ci est disponible.

alts!! et respectivement alts! au sein d'un go block permettent de consommer dès que possible. alts! prend en paramètre une séquence de channels et retourne une séquence composée d'une valeur et du channel à partir duquel celle-ci a été produite.

;; creates two channels
(def one-chan (chan))
(def two-chan (chan))

;; produces 1 every second
(go
  (loop []
    (<! (timeout 1000))
    (>! one-chan 1)
    (recur)))

;; produces 2 every two seconds
(go
  (loop []
    (<! (timeout 2000))
    (>! two-chan 2)
    (recur)))

;; consumes both channels as soon as possible
(go
  (loop []
    (let [[v c] (alts! [one-chan two-chan])]
      (prn v)
      (recur))))

Go blocks & I/O bloquante

Comme le pool de threads à disposition des go blocks est borné, effectuer de l'I/O bloquante n'est pas particulièrement adapté. core.async met à disposition une fonction nommée thread qui exécute un traitement dans un nouveau thread en retournant un channel qui fournira le résultat dès résolution.

(defn blocking-io [])
    (thread
      (Thread/sleep 2000) ;; waiting for I/O
      "Response!")

(go
  (println (<! (blocking-io))))

Aller plus loin

Nous avons abordé les principales primitives de core-async, les quelques liens suivants vous permettront de creuser le sujet :

Clojure core.async Channels

core.async Walkthrough

Précédent - Suivant -

Powered by mcorbin - Available on Github