From 778bab40eadb90ad2ab3e62725f4da8527126727 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 28 Feb 2012 00:23:47 +0100 Subject: [PATCH 1/2] Removing dataflow docs for Java, as there is no such thing --- akka-docs/java/dataflow.rst | 195 ----------------------------------- akka-docs/java/index.rst | 1 - akka-docs/scala/dataflow.rst | 11 +- 3 files changed, 4 insertions(+), 203 deletions(-) delete mode 100644 akka-docs/java/dataflow.rst diff --git a/akka-docs/java/dataflow.rst b/akka-docs/java/dataflow.rst deleted file mode 100644 index fc2121792a..0000000000 --- a/akka-docs/java/dataflow.rst +++ /dev/null @@ -1,195 +0,0 @@ -Dataflow Concurrency (Java) -=========================== - -.. sidebar:: Contents - - .. contents:: :local: - -Introduction ------------- - -**IMPORTANT: As of Akka 1.1, Akka Future, Promise and DefaultPromise have all the functionality of DataFlowVariables, they also support non-blocking composition and advanced features like fold and reduce, Akka DataFlowVariable is therefor deprecated and will probably resurface in the following release as a DSL on top of Futures.** - -Akka implements `Oz-style dataflow concurrency `_ through dataflow (single assignment) variables and lightweight (event-based) processes/threads. - -Dataflow concurrency is deterministic. This means that it will always behave the same. If you run it once and it yields output 5 then it will do that **every time**, run it 10 million times, same result. If it on the other hand deadlocks the first time you run it, then it will deadlock **every single time** you run it. Also, there is **no difference** between sequential code and concurrent code. These properties makes it very easy to reason about concurrency. The limitation is that the code needs to be side-effect free, e.g. deterministic. You can't use exceptions, time, random etc., but need to treat the part of your program that uses dataflow concurrency as a pure function with input and output. - -The best way to learn how to program with dataflow variables is to read the fantastic book `Concepts, Techniques, and Models of Computer Programming `_. By Peter Van Roy and Seif Haridi. - -The documentation is not as complete as it should be, something we will improve shortly. For now, besides above listed resources on dataflow concurrency, I recommend you to read the documentation for the GPars implementation, which is heavily influenced by the Akka implementation: - -* ``_ -* ``_ - -Dataflow Variables ------------------- - -Dataflow Variable defines three different operations: - -1. Define a Dataflow Variable - -.. code-block:: java - - import static akka.dataflow.DataFlow.*; - - DataFlowVariable x = new DataFlowVariable(); - -2. Wait for Dataflow Variable to be bound - -.. code-block:: java - - x.get(); - -3. Bind Dataflow Variable - -.. code-block:: java - - x.set(3); - -A Dataflow Variable can only be bound once. Subsequent attempts to bind the variable will throw an exception. - -You can also shutdown a dataflow variable like this: - -.. code-block:: java - - x.shutdown(); - -Threads -------- - -You can easily create millions lightweight (event-driven) threads on a regular workstation. - -.. code-block:: java - - import static akka.dataflow.DataFlow.*; - import akka.japi.Effect; - - thread(new Effect() { - public void apply() { ... } - }); - -You can also set the thread to a reference to be able to control its life-cycle: - -.. code-block:: java - - import static akka.dataflow.DataFlow.*; - import akka.japi.Effect; - - ActorRef t = thread(new Effect() { - public void apply() { ... } - }); - - ... // time passes - - t.tell(new Exit()); // shut down the thread - -Examples --------- - -Most of these examples are taken from the `Oz wikipedia page `_ - -Simple DataFlowVariable example -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -This example is from Oz wikipedia page: http://en.wikipedia.org/wiki/Oz_(programming_language). -Sort of the "Hello World" of dataflow concurrency. - -Example in Oz: - -.. code-block:: ruby - - thread - Z = X+Y % will wait until both X and Y are bound to a value. - {Browse Z} % shows the value of Z. - end - thread X = 40 end - thread Y = 2 end - -Example in Akka: - -.. code-block:: java - - import static akka.dataflow.DataFlow.*; - import akka.japi.Effect; - - DataFlowVariable x = new DataFlowVariable(); - DataFlowVariable y = new DataFlowVariable(); - DataFlowVariable z = new DataFlowVariable(); - - thread(new Effect() { - public void apply() { - z.set(x.get() + y.get()); - System.out.println("z = " + z.get()); - } - }); - - thread(new Effect() { - public void apply() { - x.set(40); - } - }); - - thread(new Effect() { - public void apply() { - y.set(40); - } - }); - -Example on life-cycle management of DataFlowVariables -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -Shows how to shutdown dataflow variables and bind threads to values to be able to interact with them (exit etc.). - -Example in Akka: - -.. code-block:: java - - import static akka.dataflow.DataFlow.*; - import akka.japi.Effect; - - // create four 'int' data flow variables - DataFlowVariable x = new DataFlowVariable(); - DataFlowVariable y = new DataFlowVariable(); - DataFlowVariable z = new DataFlowVariable(); - DataFlowVariable v = new DataFlowVariable(); - - ActorRef main = thread(new Effect() { - public void apply() { - System.out.println("Thread 'main'") - if (x.get() > y.get()) { - z.set(x); - System.out.println("'z' set to 'x': " + z.get()); - } else { - z.set(y); - System.out.println("'z' set to 'y': " + z.get()); - } - - // main completed, shut down the data flow variables - x.shutdown(); - y.shutdown(); - z.shutdown(); - v.shutdown(); - } - }); - - ActorRef setY = thread(new Effect() { - public void apply() { - System.out.println("Thread 'setY', sleeping..."); - Thread.sleep(5000); - y.set(2); - System.out.println("'y' set to: " + y.get()); - } - }); - - ActorRef setV = thread(new Effect() { - public void apply() { - System.out.println("Thread 'setV'"); - y.set(2); - System.out.println("'v' set to y: " + v.get()); - } - }); - - // shut down the threads - main.tell(new Exit()); - setY.tell(new Exit()); - setV.tell(new Exit()); diff --git a/akka-docs/java/index.rst b/akka-docs/java/index.rst index 9ed521e5e7..4b7dcb5ebf 100644 --- a/akka-docs/java/index.rst +++ b/akka-docs/java/index.rst @@ -12,7 +12,6 @@ Java API event-bus scheduler futures - dataflow fault-tolerance dispatchers routing diff --git a/akka-docs/scala/dataflow.rst b/akka-docs/scala/dataflow.rst index 20ef9616ca..5c318e676b 100644 --- a/akka-docs/scala/dataflow.rst +++ b/akka-docs/scala/dataflow.rst @@ -26,12 +26,9 @@ Scala's Delimited Continuations plugin is required to use the Dataflow API. To e .. code-block:: scala - import sbt._ - - class MyAkkaProject(info: ProjectInfo) extends DefaultProject(info) with AkkaProject with AutoCompilerPlugins { - val continuationsPlugin = compilerPlugin("org.scala-lang.plugins" % "continuations" % "2.9.1") - override def compileOptions = super.compileOptions ++ compileOptions("-P:continuations:enable") - } + autoCompilerPlugins := true, + libraryDependencies <+= scalaVersion { v => compilerPlugin("org.scala-lang.plugins" % "continuations" % ) }, + scalacOptions += "-P:continuations:enable", Dataflow Variables ------------------ @@ -117,7 +114,7 @@ To run these examples: :: - Welcome to Scala version 2.9.0 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_25). + Welcome to Scala version 2.9.1 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_25). Type in expressions to have them evaluated. Type :help for more information. From b730c0b427de7888b77313b0141184a8ae179c25 Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Mon, 27 Feb 2012 20:01:08 -0700 Subject: [PATCH 2/2] Capture early exception within Future.flow, fixes #1869 --- .../src/test/scala/akka/dispatch/FutureSpec.scala | 7 +++++++ akka-actor/src/main/scala/akka/dispatch/Future.scala | 6 +++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 1a8930437e..7c8cb8948d 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -895,6 +895,13 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } Await.ready(complex, timeout.duration) must be('completed) } + + "should capture first exception with dataflow" in { + import Future.flow + val f1 = flow { 40 / 0 } + intercept[java.lang.ArithmeticException](Await result (f1, TestLatch.DefaultTimeout)) + } + } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index d7b4f17922..b67feaf11a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -307,7 +307,11 @@ object Future { def flow[A](body: ⇒ A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = { val p = Promise[A] dispatchTask({ () ⇒ - (reify(body) foreachFull (p success, p failure): Future[Any]) onFailure { + try { + (reify(body) foreachFull (p success, p failure): Future[Any]) onFailure { + case NonFatal(e) ⇒ p tryComplete Left(e) + } + } catch { case NonFatal(e) ⇒ p tryComplete Left(e) } }, true)