diff --git a/akka-docs/scala/dataflow.rst b/akka-docs/scala/dataflow.rst index 8b19b0703d..9ffbe95bdb 100644 --- a/akka-docs/scala/dataflow.rst +++ b/akka-docs/scala/dataflow.rst @@ -4,9 +4,7 @@ Dataflow Concurrency (Scala) Description ----------- -**IMPORTANT: As of Akka 1.1, Akka Future, CompletableFuture and DefaultCompletableFuture have all the functionality of DataFlowVariables, they also support non-blocking composition and advanced features like fold and reduce, Akka DataFlowVariable is therefor deprecated and will probably resurface in the following release as a DSL on top of Futures.** - -Akka implements `Oz-style dataflow concurrency `_ through dataflow (single assignment) variables and lightweight (event-based) processes/threads. +Akka implements `Oz-style dataflow concurrency `_ by using a special API for `Futures `_ that allows single assignment variables and multiple lightweight (event-based) processes/threads. Dataflow concurrency is deterministic. This means that it will always behave the same. If you run it once and it yields output 5 then it will do that **every time**, run it 10 million times, same result. If it on the other hand deadlocks the first time you run it, then it will deadlock **every single time** you run it. Also, there is **no difference** between sequential code and concurrent code. These properties makes it very easy to reason about concurrency. The limitation is that the code needs to be side-effect free, e.g. deterministic. You can't use exceptions, time, random etc., but need to treat the part of your program that uses dataflow concurrency as a pure function with input and output. @@ -17,18 +15,32 @@ The documentation is not as complete as it should be, something we will improve * ``_ * ``_ +Getting Started +--------------- + +Scala's Delimited Continuations plugin is required to use the Dataflow API. To enable the plugin when using sbt, your project must inherit the ``AutoCompilerPlugins`` trait and contain a bit of configuration as is seen in this example: + +.. code-block:: scala + + import sbt._ + + class MyAkkaProject(info: ProjectInfo) extends DefaultProject(info) with AkkaProject with AutoCompilerPlugins { + val continuationsPlugin = compilerPlugin("org.scala-lang.plugins" % "continuations" % "2.9.0.RC3") + override def compileOptions = super.compileOptions ++ compileOptions("-P:continuations:enable") + } + Dataflow Variables ------------------ Dataflow Variable defines three different operations: -1. Define a Dataflow Variable +1. Define a Dataflow Variable (with a timeout) .. code-block:: scala - val x = new DataFlowVariable[Int] + val x = new DefaultCompletableFuture[Int](5000) -2. Wait for Dataflow Variable to be bound +2. Wait for Dataflow Variable to be bound (must be contained within a ``Future.flow`` block as described in the next section) .. code-block:: scala @@ -40,32 +52,47 @@ Dataflow Variable defines three different operations: x << 3 -A Dataflow Variable can only be bound once. Subsequent attempts to bind the variable will throw an exception. - -You can also shutdown a dataflow variable like this: +4. Bind Dataflow Variable with a Future (must be contained within a ``Future.flow`` block as described in the next section) .. code-block:: scala - x.shutdown + x << y -Threads -------- +A Dataflow Variable can only be bound once. Subsequent attempts to bind the variable will be ignored. -You can easily create millions lightweight (event-driven) threads on a regular workstation. +Dataflow Delimiter +------------------ + +Dataflow is implemented in Akka using Scala's Delimited Continuations. To use the Dataflow API the code must be contained within a ``Future.flow`` block. For example: .. code-block:: scala - thread { ... } + import Future.flow -You can also set the thread to a reference to be able to control its life-cycle: + val a = Future( ... ) + val b = Future( ... ) + val c = new DefaultCompletableFuture[Int](5000) + + flow { + c << (a() + b()) + } + + val result = c.get() + +The ``flow`` method also returns a ``Future`` for the result of the contained expression, so the previous example could also be written like this: .. code-block:: scala - val t = thread { ... } + import Future.flow - ... // time passes + val a = Future( ... ) + val b = Future( ... ) - t ! 'exit // shut down the thread + val c = flow { + a() + b() + } + + val result = c.get() Examples -------- @@ -84,7 +111,7 @@ To run these examples: :: - Welcome to Scala version 2.8.0.final (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_22). + Welcome to Scala version 2.9.0.RC1 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_25). Type in expressions to have them evaluated. Type :help for more information. @@ -116,16 +143,17 @@ Example in Akka: .. code-block:: scala - import akka.dataflow.DataFlow._ + import akka.dispatch._ + import Future.flow - val x, y, z = new DataFlowVariable[Int] + val x, y, z = new DefaultCompletableFuture[Int](5000) - thread { + flow { z << x() + y() println("z = " + z()) } - thread { x << 40 } - thread { y << 2 } + x << 40 + y << 2 Example of using DataFlowVariable with recursion ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^