=dat #3780 Deprecate dataflow
This commit is contained in:
parent
cdfd3f07c1
commit
66e4008494
8 changed files with 15 additions and 199 deletions
|
|
@ -66,12 +66,11 @@ trait AskSupport {
|
||||||
*
|
*
|
||||||
* {{{
|
* {{{
|
||||||
* val f = ask(worker, request)(timeout)
|
* val f = ask(worker, request)(timeout)
|
||||||
* flow {
|
* f.map { response =>
|
||||||
* EnrichedRequest(request, f())
|
* EnrichedMessage(response)
|
||||||
* } pipeTo nextActor
|
* } pipeTo nextActor
|
||||||
* }}}
|
* }}}
|
||||||
*
|
*
|
||||||
* See [[scala.concurrent.Future]] for a description of `flow`
|
|
||||||
*/
|
*/
|
||||||
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef ? message
|
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef ? message
|
||||||
|
|
||||||
|
|
@ -112,13 +111,12 @@ trait AskSupport {
|
||||||
* <b>Recommended usage:</b>
|
* <b>Recommended usage:</b>
|
||||||
*
|
*
|
||||||
* {{{
|
* {{{
|
||||||
* val f = ask(selection, request)(timeout)
|
* val f = ask(worker, request)(timeout)
|
||||||
* flow {
|
* f.map { response =>
|
||||||
* EnrichedRequest(request, f())
|
* EnrichedMessage(response)
|
||||||
* } pipeTo nextActor
|
* } pipeTo nextActor
|
||||||
* }}}
|
* }}}
|
||||||
*
|
*
|
||||||
* See [[scala.concurrent.Future]] for a description of `flow`
|
|
||||||
*/
|
*/
|
||||||
def ask(actorSelection: ActorSelection, message: Any)(implicit timeout: Timeout): Future[Any] = actorSelection ? message
|
def ask(actorSelection: ActorSelection, message: Any)(implicit timeout: Timeout): Future[Any] = actorSelection ? message
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,7 @@ package object dataflow {
|
||||||
*
|
*
|
||||||
* The Delimited Continuations compiler plugin must be enabled in order to use this method.
|
* The Delimited Continuations compiler plugin must be enabled in order to use this method.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("dataflow is deprecated, superseded by Scala Async", "2.3")
|
||||||
def flow[A](body: ⇒ A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = {
|
def flow[A](body: ⇒ A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = {
|
||||||
val p = Promise[A]
|
val p = Promise[A]
|
||||||
executor.execute(
|
executor.execute(
|
||||||
|
|
@ -43,6 +44,7 @@ package object dataflow {
|
||||||
p.future
|
p.future
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@deprecated("dataflow is deprecated, superseded by Scala Async", "2.3")
|
||||||
implicit class DataflowPromise[T](val promise: Promise[T]) extends AnyVal {
|
implicit class DataflowPromise[T](val promise: Promise[T]) extends AnyVal {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -90,6 +92,7 @@ package object dataflow {
|
||||||
final def apply()(implicit ec: ExecutionContext): T @cps[Future[Any]] = shift(promise.future flatMap (_: T ⇒ Future[Any]))
|
final def apply()(implicit ec: ExecutionContext): T @cps[Future[Any]] = shift(promise.future flatMap (_: T ⇒ Future[Any]))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@deprecated("dataflow is deprecated, superseded by Scala Async", "2.3")
|
||||||
implicit class DataflowFuture[T](val future: Future[T]) extends AnyVal {
|
implicit class DataflowFuture[T](val future: Future[T]) extends AnyVal {
|
||||||
/**
|
/**
|
||||||
* For use only within a Future.flow block or another compatible Delimited Continuations reset block.
|
* For use only within a Future.flow block or another compatible Delimited Continuations reset block.
|
||||||
|
|
|
||||||
|
|
@ -34,9 +34,6 @@ Akka is very modular and consists of several JARs containing different features.
|
||||||
|
|
||||||
- ``akka-cluster`` – Cluster membership management, elastic routers.
|
- ``akka-cluster`` – Cluster membership management, elastic routers.
|
||||||
|
|
||||||
- ``akka-dataflow`` – add-on to SIP-14 futures supporting implicit
|
|
||||||
continuation-passing style
|
|
||||||
|
|
||||||
- ``akka-file-mailbox`` – Akka durable mailbox (find more among community
|
- ``akka-file-mailbox`` – Akka durable mailbox (find more among community
|
||||||
projects)
|
projects)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -99,6 +99,12 @@ Changed cluster expected-response-after configuration
|
||||||
Configuration property ``akka.cluster.failure-detector.heartbeat-request.expected-response-after``
|
Configuration property ``akka.cluster.failure-detector.heartbeat-request.expected-response-after``
|
||||||
has been renamed to ``akka.cluster.failure-detector.expected-response-after``.
|
has been renamed to ``akka.cluster.failure-detector.expected-response-after``.
|
||||||
|
|
||||||
|
Dataflow is Deprecated
|
||||||
|
======================
|
||||||
|
|
||||||
|
Akka dataflow is superseded by `Scala Async <https://github.com/scala/async>`_.
|
||||||
|
|
||||||
|
|
||||||
Removed Deprecated Features
|
Removed Deprecated Features
|
||||||
===========================
|
===========================
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,76 +0,0 @@
|
||||||
/**
|
|
||||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
|
||||||
*/
|
|
||||||
package docs.dataflow
|
|
||||||
|
|
||||||
import language.postfixOps
|
|
||||||
|
|
||||||
import scala.concurrent.duration._
|
|
||||||
import scala.concurrent.{ Await, Future, Promise }
|
|
||||||
import org.scalatest.WordSpec
|
|
||||||
import org.scalatest.matchers.MustMatchers
|
|
||||||
import scala.util.{ Try, Failure, Success }
|
|
||||||
|
|
||||||
class DataflowDocSpec extends WordSpec with MustMatchers {
|
|
||||||
|
|
||||||
//#import-akka-dataflow
|
|
||||||
import akka.dataflow._ //to get the flow method and implicit conversions
|
|
||||||
//#import-akka-dataflow
|
|
||||||
|
|
||||||
//#import-global-implicit
|
|
||||||
import scala.concurrent.ExecutionContext.Implicits.global
|
|
||||||
//#import-global-implicit
|
|
||||||
|
|
||||||
"demonstrate flow using hello world" in {
|
|
||||||
def println[T](any: Try[T]): Unit = any.get must be === "Hello world!"
|
|
||||||
//#simplest-hello-world
|
|
||||||
flow { "Hello world!" } onComplete println
|
|
||||||
//#simplest-hello-world
|
|
||||||
|
|
||||||
//#nested-hello-world-a
|
|
||||||
flow {
|
|
||||||
val f1 = flow { "Hello" }
|
|
||||||
f1() + " world!"
|
|
||||||
} onComplete println
|
|
||||||
//#nested-hello-world-a
|
|
||||||
|
|
||||||
//#nested-hello-world-b
|
|
||||||
flow {
|
|
||||||
val f1 = flow { "Hello" }
|
|
||||||
val f2 = flow { "world!" }
|
|
||||||
f1() + " " + f2()
|
|
||||||
} onComplete println
|
|
||||||
//#nested-hello-world-b
|
|
||||||
}
|
|
||||||
|
|
||||||
"demonstrate the use of dataflow variables" in {
|
|
||||||
val result = Promise[Int]()
|
|
||||||
def println(any: Try[Int]): Unit = result.complete(any)
|
|
||||||
//#dataflow-variable-a
|
|
||||||
val v1, v2 = Promise[Int]()
|
|
||||||
flow {
|
|
||||||
// v1 will become the value of v2 + 10 when v2 gets a value
|
|
||||||
v1 << 10 + v2()
|
|
||||||
v1() + v2()
|
|
||||||
} onComplete println
|
|
||||||
flow { v2 << 5 } // As you can see, no blocking above!
|
|
||||||
//#dataflow-variable-a
|
|
||||||
Await.result(result.future, 10.seconds) must be === 20
|
|
||||||
}
|
|
||||||
|
|
||||||
"demonstrate the difference between for and flow" in {
|
|
||||||
val result = Promise[Int]()
|
|
||||||
def println(any: Try[Int]): Unit = result.tryComplete(any)
|
|
||||||
//#for-vs-flow
|
|
||||||
val f1, f2 = Future { 1 }
|
|
||||||
|
|
||||||
val usingFor = for { v1 <- f1; v2 <- f2 } yield v1 + v2
|
|
||||||
val usingFlow = flow { f1() + f2() }
|
|
||||||
|
|
||||||
usingFor onComplete println
|
|
||||||
usingFlow onComplete println
|
|
||||||
//#for-vs-flow
|
|
||||||
Await.result(result.future, 10.seconds) must be === 2
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
@ -1,111 +0,0 @@
|
||||||
Dataflow Concurrency
|
|
||||||
============================
|
|
||||||
|
|
||||||
Description
|
|
||||||
-----------
|
|
||||||
|
|
||||||
Akka implements `Oz-style dataflow concurrency <http://www.mozart-oz.org/documentation/tutorial/node8.html#chapter.concurrency>`_
|
|
||||||
by using a special API for :ref:`futures-scala` that enables a complementary way of writing synchronous-looking code that in reality is asynchronous.
|
|
||||||
|
|
||||||
The benefit of Dataflow concurrency is that it is deterministic; that means that it will always behave the same.
|
|
||||||
If you run it once and it yields output 5 then it will do that **every time**, run it 10 million times - same result.
|
|
||||||
If it on the other hand deadlocks the first time you run it, then it will deadlock **every single time** you run it.
|
|
||||||
Also, there is **no difference** between sequential code and concurrent code. These properties makes it very easy to reason about concurrency.
|
|
||||||
The limitation is that the code needs to be side-effect free, i.e. deterministic.
|
|
||||||
You can't use exceptions, time, random etc., but need to treat the part of your program that uses dataflow concurrency as a pure function with input and output.
|
|
||||||
|
|
||||||
The best way to learn how to program with dataflow variables is to read the fantastic book `Concepts, Techniques, and Models of Computer Programming <http://www.info.ucl.ac.be/%7Epvr/book.html>`_. By Peter Van Roy and Seif Haridi.
|
|
||||||
|
|
||||||
Getting Started (SBT)
|
|
||||||
---------------------
|
|
||||||
|
|
||||||
Scala's Delimited Continuations plugin is required to use the Dataflow API. To enable the plugin when using sbt, your project must inherit the ``AutoCompilerPlugins`` trait and contain a bit of configuration as is seen in this example:
|
|
||||||
|
|
||||||
.. code-block:: scala
|
|
||||||
|
|
||||||
autoCompilerPlugins := true,
|
|
||||||
libraryDependencies <+= scalaVersion {
|
|
||||||
v => compilerPlugin("org.scala-lang.plugins" % "continuations" % "@scalaVersion@")
|
|
||||||
},
|
|
||||||
scalacOptions += "-P:continuations:enable",
|
|
||||||
|
|
||||||
|
|
||||||
You will also need to include a dependency on ``akka-dataflow``:
|
|
||||||
|
|
||||||
.. code-block:: scala
|
|
||||||
|
|
||||||
"com.typesafe.akka" %% "akka-dataflow" % "@version@" @crossString@
|
|
||||||
|
|
||||||
Dataflow variables
|
|
||||||
------------------
|
|
||||||
|
|
||||||
A Dataflow variable can be read any number of times but only be written to once, which maps very well to the concept of Futures/Promises :ref:`futures-scala`.
|
|
||||||
Conversion from ``Future`` and ``Promise`` to Dataflow Variables is implicit and is invisible to the user (after importing akka.dataflow._).
|
|
||||||
|
|
||||||
The mapping from ``Promise`` and ``Future`` is as follows:
|
|
||||||
|
|
||||||
- Futures are readable-many, using the ``apply`` method, inside ``flow`` blocks.
|
|
||||||
- Promises are readable-many, just like Futures.
|
|
||||||
- Promises are writable-once, using the ``<<`` operator, inside ``flow`` blocks.
|
|
||||||
Writing to an already written Promise throws a ``java.lang.IllegalStateException``,
|
|
||||||
this has the effect that races to write a promise will be deterministic,
|
|
||||||
only one of the writers will succeed and the others will fail.
|
|
||||||
|
|
||||||
The flow
|
|
||||||
--------
|
|
||||||
|
|
||||||
The ``flow`` method acts as the delimiter of dataflow expressions (this also neatly aligns with the concept of delimited continuations),
|
|
||||||
and flow-expressions compose. At this point you might wonder what the ``flow``-construct brings to the table that for-comprehensions don't,
|
|
||||||
and that is the use of the CPS plugin that makes the *look like* it is synchronous, but in reality is asynchronous and non-blocking.
|
|
||||||
The result of a call to ``flow`` is a Future with the resulting value of the flow.
|
|
||||||
|
|
||||||
To be able to use the ``flow`` method, you need to import:
|
|
||||||
|
|
||||||
.. includecode:: code/docs/dataflow/DataflowDocSpec.scala
|
|
||||||
:include: import-akka-dataflow
|
|
||||||
|
|
||||||
The ``flow`` method will, just like Futures and Promises, require an implicit ``ExecutionContext`` in scope.
|
|
||||||
For the examples here we will use:
|
|
||||||
|
|
||||||
.. includecode:: code/docs/dataflow/DataflowDocSpec.scala
|
|
||||||
:include: import-global-implicit
|
|
||||||
|
|
||||||
Using flow
|
|
||||||
~~~~~~~~~~
|
|
||||||
|
|
||||||
First off we have the obligatory "Hello world!":
|
|
||||||
|
|
||||||
.. includecode:: code/docs/dataflow/DataflowDocSpec.scala
|
|
||||||
:include: simplest-hello-world
|
|
||||||
|
|
||||||
You can also refer to the results of other flows within flows:
|
|
||||||
|
|
||||||
.. includecode:: code/docs/dataflow/DataflowDocSpec.scala
|
|
||||||
:include: nested-hello-world-a
|
|
||||||
|
|
||||||
… or:
|
|
||||||
|
|
||||||
.. includecode:: code/docs/dataflow/DataflowDocSpec.scala
|
|
||||||
:include: nested-hello-world-b
|
|
||||||
|
|
||||||
Working with variables
|
|
||||||
~~~~~~~~~~~~~~~~~~~~~~
|
|
||||||
|
|
||||||
Inside the flow method you can use Promises as Dataflow variables:
|
|
||||||
|
|
||||||
.. includecode:: code/docs/dataflow/DataflowDocSpec.scala
|
|
||||||
:include: dataflow-variable-a
|
|
||||||
|
|
||||||
Flow compared to for
|
|
||||||
--------------------
|
|
||||||
|
|
||||||
Should I use Dataflow or for-comprehensions?
|
|
||||||
|
|
||||||
.. includecode:: code/docs/dataflow/DataflowDocSpec.scala
|
|
||||||
:include: for-vs-flow
|
|
||||||
|
|
||||||
Conclusions:
|
|
||||||
|
|
||||||
- Dataflow has a smaller code footprint and arguably is easier to reason about.
|
|
||||||
- For-comprehensions are more general than Dataflow, and can operate on a wide array of types.
|
|
||||||
|
|
||||||
|
|
@ -5,7 +5,6 @@ Futures and Agents
|
||||||
:maxdepth: 2
|
:maxdepth: 2
|
||||||
|
|
||||||
futures
|
futures
|
||||||
dataflow
|
|
||||||
stm
|
stm
|
||||||
agents
|
agents
|
||||||
transactors
|
transactors
|
||||||
|
|
|
||||||
|
|
@ -619,7 +619,7 @@ object AkkaBuild extends Build {
|
||||||
id = "akka-docs",
|
id = "akka-docs",
|
||||||
base = file("akka-docs"),
|
base = file("akka-docs"),
|
||||||
dependencies = Seq(actor, testkit % "test->test", mailboxesCommon % "compile;test->test", channels,
|
dependencies = Seq(actor, testkit % "test->test", mailboxesCommon % "compile;test->test", channels,
|
||||||
remote % "compile;test->test", cluster, slf4j, agent, dataflow, transactor, fileMailbox, zeroMQ, camel, osgi, osgiAries,
|
remote % "compile;test->test", cluster, slf4j, agent, transactor, fileMailbox, zeroMQ, camel, osgi, osgiAries,
|
||||||
persistence % "compile;test->test"),
|
persistence % "compile;test->test"),
|
||||||
settings = defaultSettings ++ docFormatSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ cpsPlugin ++ Seq(
|
settings = defaultSettings ++ docFormatSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ cpsPlugin ++ Seq(
|
||||||
sourceDirectory in Sphinx <<= baseDirectory / "rst",
|
sourceDirectory in Sphinx <<= baseDirectory / "rst",
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue