Erlangen

Distributed, asynchronous message passing system for Clozure Common Lisp.

Warning: Erlangen is immature, experimental software, subject to bugs and changes.

Documentation

Getting Started

First, make sure you have Clozure Common Lisp and Quicklisp installed (if you use a Debian-based OS, you can install CCL via this package). Next, clone Erlangen into quicklisp/local-projects.

You can now start Clozure Common Lisp and load Erlangen:

(ql:quickload :erlangen)
(use-package :erlangen)

Alternatively, you can build Erlangen as a stand-alone executable that boots into a REPL that uses the packages erlangen and erlangen.management:

cd ~/quicklisp/local-projects/erlangen
make bin/erlangen-kernel && bin/erlangen-kernel

Programming a Parallelized Map with Asynchronous Agents and Message Passing

Let’s jump straight into a practical example. The Erlangen repository contains the erlangen.examples package, which implements parallel-map, a parallelized map function. It’s like Common Lisp’s map over a single vector, except that it spreads the work across multiple concurrent agents (fancy processes/native threads).

parallel-map returns the results of calling a function on the elements in a vector in a new vector. It uses up to level agents to parallelize the workload. Optionally, a result-type can be specified as the element type of the result vector. Here is roughly how parallel-map works:

  1. it spawns some worker agents and attaches to them in monitor mode, so that it will receive an exit notification for each worker
  2. it sends each worker agent a message with a chunk of work
  3. it waits for and receives the exit notification of each worker, which contains that chunk’s result, and inserts each result into the final result vector

The worker agents initially do nothing at all. They each just wait to receive a function to execute, and quit when they are done. Note that we use the Trivia pattern matching library to match received messages.

(defun worker ()
  (ematch (receive)
    ((and (type function) function)
     (funcall function))))

Eventually, each worker will receive a closure created by map-chunk, which maps the mapping function over a chunk of the vector bounded by start and end.

(defun map-chunk (function vector start end result-type)
  (lambda ()
    (let ((results (make-array (- end start) :element-type result-type)))
      (loop for i from start below end do
           (setf (aref results (- i start))
                 (funcall function (aref vector i))))
      (values start end results))))
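Because map-chunk only builds a closure, it can be tried out without any agents. The sketch below repeats the definition from above so that it loads on its own, then calls the closure directly in the current thread. The variable *chunk-values* is a hypothetical name used only for this illustration.

```lisp
;; map-chunk as defined above, repeated so this snippet loads on its own.
(defun map-chunk (function vector start end result-type)
  (lambda ()
    (let ((results (make-array (- end start) :element-type result-type)))
      (loop for i from start below end do
           (setf (aref results (- i start))
                 (funcall function (aref vector i))))
      (values start end results))))

;; Build the closure for the elements at indices 2 and 3, run it right
;; away, and collect its three return values into a list.
;; *chunk-values* is a hypothetical name, not part of Erlangen.
(defparameter *chunk-values*
  (multiple-value-list
   (funcall (map-chunk #'1+ #(2 4 6 8 10 12 14) 2 4 t))))

;; *chunk-values* is now (2 4 #(7 9))
```

The three values (start, end and the chunk's results) are exactly what parallel-map later expects to find in a worker's exit notification.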

Now let’s look at parallel-map. To distribute the work, it computes

  • length—the length of our input vector
  • n-chunks—the number of chunks we will divide the work up into
  • chunk-size—the minimum length (in elements) of a chunk

and spawns a worker agent for each chunk.

(defun parallel-map (function vector &key (level 2) (result-type t))
  (let* ((length (length vector))
         (n-chunks (min level length))
         (chunk-size (floor length n-chunks))
         (workers (loop for i from 1 to n-chunks collect
                       (spawn 'worker :attach :monitor))))

Next, it sends each worker a closure for the chunk it should process. It divides the work into n-chunks intervals of at least chunk-size length that together fully cover the vector.

    (loop for worker in workers
          for chunk from 1
          for start from 0 by chunk-size
          for end = (if (< chunk n-chunks)
                        (+ start chunk-size)
                        length)
       do (send (map-chunk function vector start end result-type) worker))
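To see which intervals this loop produces, the same arithmetic can be run on its own. The following is a minimal sketch in plain Common Lisp. chunk-bounds is a hypothetical helper that is not part of Erlangen, and it assumes a non-empty vector and a level of at least 1 (an empty vector would make n-chunks zero and the floor call divide by zero).

```lisp
;; chunk-bounds is a hypothetical helper for illustration only.
;; It mirrors the interval computation of parallel-map but collects
;; the (start end) pairs instead of sending closures to workers.
(defun chunk-bounds (length level)
  "Return a list of (START END) pairs covering indices 0 to LENGTH."
  (let* ((n-chunks (min level length))
         (chunk-size (floor length n-chunks)))
    (loop for chunk from 1 to n-chunks
          for start from 0 by chunk-size
          for end = (if (< chunk n-chunks)
                        (+ start chunk-size)
                        length)
          collect (list start end))))

(chunk-bounds 7 3)   ; → ((0 2) (2 4) (4 7))
(chunk-bounds 10 4)  ; → ((0 2) (2 4) (4 6) (6 10))
(chunk-bounds 3 8)   ; → ((0 1) (1 2) (2 3))
```

Note that the last chunk absorbs the remainder of the division, so it can be noticeably larger than the others (four elements versus two in the second call).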

Finally, it allocates a vector to store the results in, and waits to receive each chunk result. If any worker exits unexpectedly, parallel-map exits with that worker’s exit reason. Again, because we attached to the worker agents in monitor mode, all remaining workers will also receive the exit signal and shut down.

    (loop with results = (make-array length :element-type result-type)
          for worker in workers do
         (ematch (receive)
           ((list (type agent) :ok start end chunk-result)
            (replace results chunk-result :start1 start :end1 end))
           ((list (type agent) :exit reason)
            (exit reason)))
       finally (return results))))
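Since every exit notification carries its own start and end, the order in which workers finish does not matter. The sketch below illustrates this in plain Common Lisp, without agents: the three chunk results are made-up stand-ins for received messages, processed deliberately out of order, and *assembled* is a hypothetical name used only here.

```lisp
;; Simulate three chunk results arriving out of order and copy each one
;; into its slot of the result vector, as the receive loop above does.
;; The message contents and *assembled* are illustrative assumptions.
(defparameter *assembled*
  (let ((results (make-array 7 :initial-element nil)))
    (dolist (message (list (list 4 7 (vector 11 13 15))
                           (list 0 2 (vector 3 5))
                           (list 2 4 (vector 7 9))))
      (destructuring-bind (start end chunk-result) message
        ;; replace copies chunk-result into results[start, end).
        (replace results chunk-result :start1 start :end1 end)))
    results))

;; *assembled* is now #(3 5 7 9 11 13 15)
```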

Now we can spawn parallel-map agents like this:

(spawn '(parallel-map 1+ #(2 4 6 8 10 12 14) :level 3) :attach :monitor)
(receive)
→ (#<AGENT #x302002A191ED> :OK #(3 5 7 9 11 13 15))

Getting Distributed

What fun are agents if they aren’t distributed over a network? Erlangen comes with support for distribution via TCP/IP built in. Each instance of Erlangen can act as a node and talk to other Erlangen nodes. To facilitate port discovery of remote nodes, a port mapper needs to be running on the host. To build and run the Erlangen port mapper, execute the commands

make bin/erlangen-port-mapper
bin/erlangen-port-mapper localhost &

in a shell in the root of the Erlangen repository. Now build the Erlangen kernel in the same way to conveniently run additional Erlangen instances, and use it to start a node named map-node.

make bin/erlangen-kernel
bin/erlangen-kernel -n -e '(node :host "localhost" :name "map-node")'

Hint: if you use Emacs, you can start a new Erlangen instance with Slime via C-u M-x slime RET /path/to/erlangen-kernel.

Finally, we can also make our initial Erlangen instance a node, and offload some work to map-node:

(spawn '(node :host "localhost"))
(spawn '(erlangen.examples:parallel-map 1+ #(2 4 6 8 10 12 14))
       :attach :monitor
       :node "map-node")
(receive)
→ ("localhost/map-node/0" :OK #(3 5 7 9 11 13 15))

What happened? We spawned an agent on the remote map-node instance to run parallel-map, and received its exit notification transparently over the network.

README Source: eugeneia/erlangen