Dataflow Concurrency (Scala)
============================

Description
-----------

Akka implements `Oz-style dataflow concurrency <http://www.mozart-oz.org/documentation/tutorial/node8.html#chapter.concurrency>`_ by using a special API for :ref:`futures-scala` that allows single assignment variables and multiple lightweight (event-based) processes/threads.

Dataflow concurrency is deterministic: it always behaves the same way. If you run it once and it yields output 5, then it will do so **every time**; run it 10 million times and you get the same result. If, on the other hand, it deadlocks the first time you run it, then it will deadlock **every single time** you run it. Also, there is **no difference** in outcome between running the code sequentially and running it concurrently. These properties make it very easy to reason about concurrency. The limitation is that the code needs to be side-effect free, i.e. deterministic. You can't use exceptions, time, random numbers, etc., but need to treat the part of your program that uses dataflow concurrency as a pure function with input and output.

The best way to learn how to program with dataflow variables is to read the fantastic book `Concepts, Techniques, and Models of Computer Programming <http://www.info.ucl.ac.be/%7Epvr/book.html>`_ by Peter Van Roy and Seif Haridi.

The documentation is not as complete as it should be, something we will improve shortly. For now, besides the resources on dataflow concurrency listed above, we recommend reading the documentation for the GPars implementation, which is heavily influenced by the Akka implementation:

* `<http://gpars.codehaus.org/Dataflow>`_
* `<http://www.gpars.org/guide/guide/7.%20Dataflow%20Concurrency.html>`_

Getting Started
---------------
Scala's Delimited Continuations plugin is required to use the Dataflow API. To enable the plugin when using sbt, turn on ``autoCompilerPlugins``, add the continuations plugin as a compiler plugin dependency and pass ``-P:continuations:enable`` to the compiler, as in these project settings:

.. code-block:: scala

  autoCompilerPlugins := true,
  libraryDependencies <+= scalaVersion { v => compilerPlugin("org.scala-lang.plugins" % "continuations" % v) },
  scalacOptions += "-P:continuations:enable",

Dataflow Variables
------------------
A Dataflow Variable supports four operations:

1. Define a Dataflow Variable

   .. code-block:: scala

     val x = Promise[Int]()

2. Wait for a Dataflow Variable to be bound (must be contained within a ``Future.flow`` block as described in the next section)

   .. code-block:: scala

     x()

3. Bind a Dataflow Variable (must be contained within a ``Future.flow`` block as described in the next section)

   .. code-block:: scala

     x << 3

4. Bind a Dataflow Variable with a Future, here a ``Future[Int]`` named ``y`` (must be contained within a ``Future.flow`` block as described in the next section)

   .. code-block:: scala

     x << y

A Dataflow Variable can only be bound once. Subsequent attempts to bind the variable will be ignored.

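
To experiment with the single-assignment rule outside a ``flow`` block, the following minimal sketch uses the standard library's ``scala.concurrent.Promise`` (Scala 2.10 or later) as a stand-in for Akka's dataflow variable; this substitution is an assumption for illustration, not the Akka API. ``trySuccess`` binds the promise, and returns ``false`` while leaving the value unchanged if it is already bound, which corresponds to the "ignored" behaviour described above. The standard ``success`` method, by contrast, throws an ``IllegalStateException`` on a second bind.

.. code-block:: scala

  import scala.concurrent.{Await, Promise}
  import scala.concurrent.duration._

  // A standard-library promise used as a single-assignment variable
  // (an illustration only, not Akka's dataflow API).
  val x = Promise[Int]()

  // The first bind succeeds...
  val firstBind = x.trySuccess(3)
  // ...and any later bind is rejected, leaving the original value in place.
  val secondBind = x.trySuccess(4)

  // Read the variable, blocking until it is bound (here it already is).
  val value = Await.result(x.future, 1.second)

  println(s"first bind: $firstBind, second bind: $secondBind, value: $value")
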
Dataflow Delimiter
------------------
Dataflow is implemented in Akka using Scala's Delimited Continuations. To use the Dataflow API the code must be contained within a ``Future.flow`` block. For example:

.. code-block:: scala

  import akka.dispatch._
  import Future.flow

  implicit val dispatcher = ...

  val a = Future( ... )
  val b = Future( ... )
  val c = Promise[Int]()

  flow {
    c << (a() + b())
  }

  val result = Await.result(c, timeout)

The ``flow`` method also returns a ``Future`` for the result of the contained expression, so the previous example could also be written like this:

.. code-block:: scala

  import akka.dispatch._
  import Future.flow

  implicit val dispatcher = ...

  val a = Future( ... )
  val b = Future( ... )
  val c = flow {
    a() + b()
  }

  val result = Await.result(c, timeout)

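
Without the continuations plugin, the same dependency between ``a``, ``b`` and ``c`` can be expressed by composing futures directly. The following is a sketch using the standard ``scala.concurrent`` API rather than Akka's ``flow``; the values computed by ``a`` and ``b`` are hypothetical stand-ins for the elided ``...``, and the global execution context stands in for the elided dispatcher.

.. code-block:: scala

  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  // Hypothetical computations standing in for Future( ... ).
  val a = Future { 20 }
  val b = Future { 22 }

  // Counterpart of flow { a() + b() }: c completes once both a and b are bound.
  val c: Future[Int] = for {
    av <- a
    bv <- b
  } yield av + bv

  val result = Await.result(c, 5.seconds)
  println(result)

The for-comprehension desugars into ``flatMap`` and ``map``, so the addition runs only after both futures have completed, which is what ``a()`` and ``b()`` achieve inside a ``flow`` block.
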

Examples
--------

Most of these examples are taken from the `Oz wikipedia page <http://en.wikipedia.org/wiki/Oz_%28programming_language%29>`_.

To run these examples:

1. Start the REPL:

   ::

     $ sbt
     > project akka-actor
     > console

   ::

     Welcome to Scala version 2.9.1 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_25).
     Type in expressions to have them evaluated.
     Type :help for more information.

     scala>

2. Paste the examples (below) into the Scala REPL.

   Note: Do not try to run the Oz version; it is only there for reference.

3. Have fun.


Simple DataFlowVariable example
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This example is from the `Oz Wikipedia page <http://en.wikipedia.org/wiki/Oz_%28programming_language%29>`__. It is the "Hello World" of dataflow concurrency.

Example in Oz:

.. code-block:: ruby

  thread
    Z = X+Y     % will wait until both X and Y are bound to a value.
    {Browse Z}  % shows the value of Z.
  end
  thread X = 40 end
  thread Y = 2 end

Example in Akka:

.. code-block:: scala

  import akka.dispatch._
  import Future.flow

  implicit val dispatcher = ...

  val x, y, z = Promise[Int]()

  flow {
    z << x() + y()
    println("z = " + z())
  }
  flow { x << 40 }
  flow { y << 2 }

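
For comparison, here is a sketch of the same "Hello World" written against the standard library's ``scala.concurrent`` promises instead of Akka's dataflow API, assuming a Scala 2.10 or later REPL without the continuations plugin. ``z`` is bound from a future that waits for both ``x`` and ``y``; ``x`` and ``y`` are bound asynchronously in no particular order, yet ``z`` is always 42, which illustrates the determinism described in the introduction.

.. code-block:: scala

  import scala.concurrent.{Await, Future, Promise}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  val x, y, z = Promise[Int]()

  // "thread" 1: bind z as soon as both x and y are bound.
  z.completeWith(for (xv <- x.future; yv <- y.future) yield xv + yv)

  // "threads" 2 and 3: bind x and y asynchronously, in either order.
  Future { x.success(40) }
  Future { y.success(2) }

  // Block until z is bound, then show it.
  val zValue = Await.result(z.future, 5.seconds)
  println("z = " + zValue)
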

Example of using DataFlowVariable with recursion
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Uses dataflow variables and recursion to calculate a list of running sums. Unlike the Oz version, which produces and consumes the stream one element at a time (with a one-second delay per element), the Akka version binds each variable to a complete list.

Example in Oz:

.. code-block:: ruby

  fun {Ints N Max}
    if N == Max then nil
    else
      {Delay 1000}
      N|{Ints N+1 Max}
    end
  end

  fun {Sum S Stream}
    case Stream of nil then S
    [] H|T then S|{Sum H+S T} end
  end

  local X Y in
    thread X = {Ints 0 1000} end
    thread Y = {Sum 0 X} end
    {Browse Y}
  end

Example in Akka:

.. code-block:: scala

  import akka.dispatch._
  import Future.flow

  implicit val dispatcher = ...

  def ints(n: Int, max: Int): List[Int] = {
    if (n == max) Nil
    else n :: ints(n + 1, max)
  }

  def sum(s: Int, stream: List[Int]): List[Int] = stream match {
    case Nil => s :: Nil
    case h :: t => s :: sum(h + s, t)
  }

  val x, y = Promise[List[Int]]()

  flow { x << ints(0, 1000) }
  flow { y << sum(0, x()) }
  flow { println("List of sums: " + y()) }

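
The same producer/consumer chain can be sketched with the standard ``scala.concurrent`` API, an assumption for readers without the continuations plugin: ``ints`` and ``sum`` are copied from the Akka example, ``y`` is bound by mapping over ``x``'s future, and the global execution context replaces the elided dispatcher.

.. code-block:: scala

  import scala.concurrent.{Await, Future, Promise}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  def ints(n: Int, max: Int): List[Int] =
    if (n == max) Nil else n :: ints(n + 1, max)

  // Emits the running sum before each element, plus the final total.
  def sum(s: Int, stream: List[Int]): List[Int] = stream match {
    case Nil    => s :: Nil
    case h :: t => s :: sum(h + s, t)
  }

  val x, y = Promise[List[Int]]()

  // Consumer: y is bound from x as soon as x is bound.
  y.completeWith(x.future.map(xs => sum(0, xs)))

  // Producer: bind x asynchronously.
  Future { x.success(ints(0, 1000)) }

  val sums = Await.result(y.future, 5.seconds)
  println("List of sums: " + sums.take(5) + " ...")
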
Example using concurrent Futures
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Shows how to have a calculation run in another thread.

Example in Akka:

.. code-block:: scala

  import akka.dispatch._
  import Future.flow

  implicit val dispatcher = ...

  // create four 'Int' data flow variables
  val x, y, z, v = Promise[Int]()

  flow {
    println("Thread 'main'")

    x << 1
    println("'x' set to: " + x())

    println("Waiting for 'y' to be set...")

    if (x() > y()) {
      z << x
      println("'z' set to 'x': " + z())
    } else {
      z << y
      println("'z' set to 'y': " + z())
    }
  }

  flow {
    y << Future {
      println("Thread 'setY', sleeping")
      Thread.sleep(2000)
      2
    }
    println("'y' set to: " + y())
  }

  flow {
    println("Thread 'setV'")
    v << y
    println("'v' set to 'y': " + v())
  }
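
A sketch of the same coordination with the standard ``scala.concurrent`` API, an assumption rather than Akka's ``flow``: ``y`` is completed by a ``Future`` running on the global execution context, ``z`` is bound to the larger of ``x`` and ``y`` once both are available, and ``v`` simply follows ``y``. The sleep is shortened to 200 ms to keep the sketch quick.

.. code-block:: scala

  import scala.concurrent.{Await, Future, Promise}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  val x, y, z, v = Promise[Int]()

  x.success(1)

  // y is bound by a calculation running on another thread.
  y.completeWith(Future {
    Thread.sleep(200) // simulate slow work
    2
  })

  // z takes whichever of x and y is larger, once both are bound.
  z.completeWith(for (xv <- x.future; yv <- y.future) yield if (xv > yv) xv else yv)

  // v simply follows y.
  v.completeWith(y.future)

  val zValue = Await.result(z.future, 5.seconds)
  val vValue = Await.result(v.future, 5.seconds)
  println(s"z = $zValue, v = $vValue")
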