!dat #3920 Remove deprecated akka-dataflow

This commit is contained in:
Patrik Nordwall 2014-03-12 11:06:35 +01:00
parent bd989751b6
commit 9f906b2de9
4 changed files with 11 additions and 433 deletions

View file

@ -1,105 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka
import language.implicitConversions
import scala.util.continuations._
import scala.concurrent.{ Promise, Future, ExecutionContext }
import scala.util.control.NonFatal
import scala.util.Failure
package object dataflow {
/**
* Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited
* Continuations plugin.
*
* Within the block, the result of a Future may be accessed by calling Future.apply. At that point
* execution is suspended with the rest of the block being stored in a continuation until the result
* of the Future is available. If an Exception is thrown while processing, it will be contained
* within the resulting Future.
*
* This allows working with Futures in an imperative style without blocking for each result.
*
* Completing a Future using 'Promise << Future' will also suspend execution until the
* value of the other Future is available.
*
* The Delimited Continuations compiler plugin must be enabled in order to use this method.
*/
@deprecated("dataflow is deprecated, superseded by Scala Async", "2.3")
def flow[A](body: A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = {
val p = Promise[A]
executor.execute(
new Runnable {
def run = try {
(reify(body) foreachFull (r p.success(r).future, f p.failure(f).future): Future[Any]) onFailure {
case NonFatal(e) p tryComplete Failure(e)
}
} catch {
case NonFatal(e) p tryComplete Failure(e)
}
})
p.future
}
@deprecated("dataflow is deprecated, superseded by Scala Async", "2.3")
implicit class DataflowPromise[T](val promise: Promise[T]) extends AnyVal {
/**
* Completes the Promise with the specified value or throws an exception if already
* completed. See Promise.success(value) for semantics.
*
* @param value The value which denotes the successful value of the Promise
* @return This Promise's Future
*/
final def <<(value: T): Future[T] @cps[Future[Any]] = shift {
cont: (Future[T] Future[Any]) cont(promise.success(value).future)
}
/**
* Completes this Promise with the value of the specified Future when/if it completes.
*
* @param other The Future whose value will be transferred to this Promise upon completion
* @param ec An ExecutionContext which will be used to execute callbacks registered in this method
* @return A Future representing the result of this operation
*/
final def <<(other: Future[T])(implicit ec: ExecutionContext): Future[T] @cps[Future[Any]] = shift {
cont: (Future[T] Future[Any])
val fr = Promise[Any]()
(promise completeWith other).future onComplete {
v try { fr completeWith cont(promise.future) } catch { case NonFatal(e) fr failure e }
}
fr.future
}
/**
* Completes this Promise with the value of the specified Promise when/if it completes.
*
* @param other The Promise whose value will be transferred to this Promise upon completion
* @param ec An ExecutionContext which will be used to execute callbacks registered in this method
* @return A Future representing the result of this operation
*/
final def <<(other: Promise[T])(implicit ec: ExecutionContext): Future[T] @cps[Future[Any]] = <<(other.future)
/**
* For use only within a flow block or another compatible Delimited Continuations reset block.
*
* Returns the result of this Promise without blocking, by suspending execution and storing it as a
* continuation until the result is available.
*/
final def apply()(implicit ec: ExecutionContext): T @cps[Future[Any]] = shift(promise.future flatMap (_: T Future[Any]))
}
@deprecated("dataflow is deprecated, superseded by Scala Async", "2.3")
implicit class DataflowFuture[T](val future: Future[T]) extends AnyVal {
/**
* For use only within a Future.flow block or another compatible Delimited Continuations reset block.
*
* Returns the result of this Future without blocking, by suspending execution and storing it as a
* continuation until the result is available.
*/
final def apply()(implicit ec: ExecutionContext): T @cps[Future[Any]] = shift(future flatMap (_: T Future[Any]))
}
}

View file

@ -1,304 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dataflow
import language.postfixOps
import scala.reflect.{ ClassTag, classTag }
import akka.actor.{ Actor, Status, Props }
import akka.actor.Status._
import akka.pattern.ask
import akka.testkit.{ EventFilter, filterEvents, filterException }
import scala.concurrent.{ Await, Promise, Future }
import scala.concurrent.duration._
import akka.testkit.{ DefaultTimeout, TestLatch, AkkaSpec }
import java.util.concurrent.TimeoutException
object DataflowSpec {
class TestActor extends Actor {
def receive = {
case "Hello" sender ! "World"
case "Failure"
sender ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance"))
case "NoReply"
}
}
class TestDelayActor(await: TestLatch) extends Actor {
def receive = {
case "Hello"
Await.ready(await, TestLatch.DefaultTimeout)
sender ! "World"
case "NoReply"
Await.ready(await, TestLatch.DefaultTimeout)
case "Failure"
Await.ready(await, TestLatch.DefaultTimeout)
sender ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance"))
}
}
}
class DataflowSpec extends AkkaSpec with DefaultTimeout {
import DataflowSpec._
import system.dispatcher
"Dataflow API" must {
"futureComposingWithContinuations" in {
val actor = system.actorOf(Props[TestActor])
val x = Future("Hello")
val y = x flatMap (actor ? _) mapTo classTag[String]
val r = flow(x() + " " + y() + "!")
assert(Await.result(r, timeout.duration) === "Hello World!")
system.stop(actor)
}
"futureComposingWithContinuationsFailureDivideZero" in {
filterException[ArithmeticException] {
val x = Future("Hello")
val y = x map (_.length)
val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply)
intercept[java.lang.ArithmeticException](Await.result(r, timeout.duration))
}
}
"futureComposingWithContinuationsFailureCastInt" in {
filterException[ClassCastException] {
val actor = system.actorOf(Props[TestActor])
val x = Future(3)
val y = (actor ? "Hello").mapTo[Int]
val r = flow(x() + y())
intercept[ClassCastException](Await.result(r, timeout.duration))
}
}
"futureComposingWithContinuationsFailureCastNothing" in {
pending
filterException[ClassCastException] {
val actor = system.actorOf(Props[TestActor])
val x = Future("Hello")
val y = (actor ? "Hello").mapTo[Nothing]
val r = flow(x() + y())
intercept[ClassCastException](Await.result(r, timeout.duration))
}
}
"futureCompletingWithContinuations" in {
val x, y, z = Promise[Int]()
val ly, lz = new TestLatch
val result = flow {
y completeWith x.future
ly.open() // not within continuation
z << x
lz.open() // within continuation, will wait for 'z' to complete
z() + y()
}
Await.ready(ly, 100 milliseconds)
intercept[TimeoutException] { Await.ready(lz, 100 milliseconds) }
flow { x << 5 }
assert(Await.result(y.future, timeout.duration) === 5)
assert(Await.result(z.future, timeout.duration) === 5)
Await.ready(lz, timeout.duration)
assert(Await.result(result, timeout.duration) === 10)
val a, b, c = Promise[Int]()
val result2 = flow {
val n = (a << c).value.get.get + 10
b << (c() - 2)
a() + n * b()
}
c completeWith Future(5)
assert(Await.result(a.future, timeout.duration) === 5)
assert(Await.result(b.future, timeout.duration) === 3)
assert(Await.result(result2, timeout.duration) === 50)
}
"futureDataFlowShouldEmulateBlocking1" in {
val one, two = Promise[Int]()
val simpleResult = flow {
one() + two()
}
assert(Seq(one.future, two.future, simpleResult).forall(_.isCompleted == false))
flow { one << 1 }
Await.ready(one.future, 1 minute)
assert(one.isCompleted)
assert(Seq(two.future, simpleResult).forall(_.isCompleted == false))
flow { two << 9 }
Await.ready(two.future, 1 minute)
assert(Seq(one, two).forall(_.isCompleted == true))
assert(Await.result(simpleResult, timeout.duration) === 10)
}
"futureDataFlowShouldEmulateBlocking2" in {
val x1, x2, y1, y2 = Promise[Int]()
val lx, ly, lz = new TestLatch
val result = flow {
lx.open()
x1 << y1
ly.open()
x2 << y2
lz.open()
x1() + x2()
}
Await.ready(lx, 2 seconds)
assert(!ly.isOpen)
assert(!lz.isOpen)
assert(List(x1, x2, y1, y2).forall(_.isCompleted == false))
flow { y1 << 1 } // When this is set, it should cascade down the line
Await.ready(ly, 2 seconds)
assert(Await.result(x1.future, 1 minute) === 1)
assert(!lz.isOpen)
flow { y2 << 9 } // When this is set, it should cascade down the line
Await.ready(lz, 2 seconds)
assert(Await.result(x2.future, 1 minute) === 9)
assert(List(x1, x2, y1, y2).forall(_.isCompleted))
assert(Await.result(result, 1 minute) === 10)
}
"dataFlowAPIshouldbeSlick" in {
val i1, i2, s1, s2 = new TestLatch
val callService1 = Future { i1.open(); Await.ready(s1, TestLatch.DefaultTimeout); 1 }
val callService2 = Future { i2.open(); Await.ready(s2, TestLatch.DefaultTimeout); 9 }
val result = flow { callService1() + callService2() }
assert(!s1.isOpen)
assert(!s2.isOpen)
assert(!result.isCompleted)
Await.ready(i1, 2 seconds)
Await.ready(i2, 2 seconds)
s1.open()
s2.open()
assert(Await.result(result, timeout.duration) === 10)
}
"futureCompletingWithContinuationsFailure" in {
filterException[ArithmeticException] {
val x, y, z = Promise[Int]()
val ly, lz = new TestLatch
val result = flow {
y << x
ly.open()
val oops = 1 / 0
z << x
lz.open()
z() + y() + oops
}
intercept[TimeoutException] { Await.ready(ly, 100 milliseconds) }
intercept[TimeoutException] { Await.ready(lz, 100 milliseconds) }
flow { x << 5 }
assert(Await.result(y.future, timeout.duration) === 5)
intercept[java.lang.ArithmeticException](Await.result(result, timeout.duration))
assert(z.future.value === None)
assert(!lz.isOpen)
}
}
"futureContinuationsShouldNotBlock" in {
val latch = new TestLatch
val future = Future {
Await.ready(latch, TestLatch.DefaultTimeout)
"Hello"
}
val result = flow {
Some(future()).filter(_ == "Hello")
}
assert(!result.isCompleted)
latch.open()
assert(Await.result(result, timeout.duration) === Some("Hello"))
}
"futureFlowShouldBeTypeSafe" in {
val rString = flow {
val x = Future(5)
x().toString
}
val rInt = flow {
val x = rString.apply
val y = Future(5)
x.length + y()
}
assert(checkType(rString, classTag[String]))
assert(checkType(rInt, classTag[Int]))
assert(!checkType(rInt, classTag[String]))
assert(!checkType(rInt, classTag[Nothing]))
assert(!checkType(rInt, classTag[Any]))
Await.result(rString, timeout.duration)
Await.result(rInt, timeout.duration)
}
"futureFlowSimpleAssign" in {
val x, y, z = Promise[Int]()
flow {
z << x() + y()
}
flow { x << 40 }
flow { y << 2 }
assert(Await.result(z.future, timeout.duration) === 42)
}
"capture first exception with dataflow" in {
val f1 = flow { 40 / 0 }
intercept[java.lang.ArithmeticException](Await result (f1, TestLatch.DefaultTimeout))
}
}
def checkType[A: ClassTag, B](in: Future[A], ref: ClassTag[B]): Boolean = implicitly[ClassTag[A]].runtimeClass eq ref.runtimeClass
}

View file

@ -18,3 +18,11 @@ In earlier versions of Akka `TestKit.remaining` returned the default timeout con
"akka.test.single-expect-default". This was a bit confusing and thus it has been changed to throw an "akka.test.single-expect-default". This was a bit confusing and thus it has been changed to throw an
AssertionError if called outside of within. The old behavior however can still be achieved by AssertionError if called outside of within. The old behavior however can still be achieved by
calling `TestKit.remainingOrDefault` instead. calling `TestKit.remainingOrDefault` instead.
Removed Deprecated Features
===========================
The following, previously deprecated, features have been removed:
* akka-dataflow

View file

@ -75,7 +75,7 @@ object AkkaBuild extends Build {
// add reportBinaryIssues to validatePullRequest on minor version maintenance branch // add reportBinaryIssues to validatePullRequest on minor version maintenance branch
validatePullRequest <<= (Unidoc.unidoc, SphinxSupport.generate in Sphinx in docs) map { (_, _) => } validatePullRequest <<= (Unidoc.unidoc, SphinxSupport.generate in Sphinx in docs) map { (_, _) => }
), ),
aggregate = Seq(actor, testkit, actorTests, dataflow, remote, remoteTests, camel, cluster, slf4j, agent, transactor, aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, agent, transactor,
persistence, mailboxes, zeroMQ, kernel, osgi, docs, contrib, samples, multiNodeTestkit) persistence, mailboxes, zeroMQ, kernel, osgi, docs, contrib, samples, multiNodeTestkit)
) )
@ -83,7 +83,7 @@ object AkkaBuild extends Build {
id = "akka-scala-nightly", id = "akka-scala-nightly",
base = file("akka-scala-nightly"), base = file("akka-scala-nightly"),
// remove dependencies that we have to build ourselves (Scala STM, ZeroMQ Scala Bindings) // remove dependencies that we have to build ourselves (Scala STM, ZeroMQ Scala Bindings)
// samples and dataflow don't work with dbuild right now // samples don't work with dbuild right now
aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j,
persistence, mailboxes, kernel, osgi, contrib, multiNodeTestkit) persistence, mailboxes, kernel, osgi, contrib, multiNodeTestkit)
) )
@ -155,25 +155,6 @@ object AkkaBuild extends Build {
) )
) )
val cpsPlugin = Seq(
libraryDependencies <++= scalaVersion { v =>
if (v.startsWith("2.10.")) Seq(compilerPlugin("org.scala-lang.plugins" % "continuations" % v))
else Seq(
compilerPlugin("org.scala-lang.plugins" %% "scala-continuations-plugin" % Dependencies.Versions.scalaContinuationsVersion),
"org.scala-lang.plugins" %% "scala-continuations-library" % Dependencies.Versions.scalaContinuationsVersion)
},
scalacOptions += "-P:continuations:enable"
)
lazy val dataflow = Project(
id = "akka-dataflow",
base = file("akka-dataflow"),
dependencies = Seq(testkit % "test->test"),
settings = defaultSettings ++ formatSettings ++ scaladocSettingsNoVerificationOfDiagrams ++ OSGi.dataflow ++ cpsPlugin ++ Seq(
previousArtifact := akkaPreviousArtifact("akka-dataflow")
)
)
lazy val testkit = Project( lazy val testkit = Project(
id = "akka-testkit", id = "akka-testkit",
base = file("akka-testkit"), base = file("akka-testkit"),
@ -565,7 +546,7 @@ object AkkaBuild extends Build {
dependencies = Seq(actor, testkit % "test->test", dependencies = Seq(actor, testkit % "test->test",
remote % "compile;test->test", cluster, slf4j, agent, zeroMQ, camel, osgi, remote % "compile;test->test", cluster, slf4j, agent, zeroMQ, camel, osgi,
persistence % "compile;test->test"), persistence % "compile;test->test"),
settings = defaultSettings ++ docFormatSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ cpsPlugin ++ Seq( settings = defaultSettings ++ docFormatSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ Seq(
sourceDirectory in Sphinx <<= baseDirectory / "rst", sourceDirectory in Sphinx <<= baseDirectory / "rst",
sphinxPackages in Sphinx <+= baseDirectory { _ / "_sphinx" / "pygments" }, sphinxPackages in Sphinx <+= baseDirectory { _ / "_sphinx" / "pygments" },
// copy akka-contrib/docs into our rst_preprocess/contrib (and apply substitutions) // copy akka-contrib/docs into our rst_preprocess/contrib (and apply substitutions)
@ -1048,8 +1029,6 @@ object AkkaBuild extends Build {
val slf4j = exports(Seq("akka.event.slf4j.*")) val slf4j = exports(Seq("akka.event.slf4j.*"))
val dataflow = exports(Seq("akka.dataflow.*"))
val transactor = exports(Seq("akka.transactor.*")) val transactor = exports(Seq("akka.transactor.*"))
val persistence = exports(Seq("akka.persistence.*"), imports = Seq(protobufImport())) val persistence = exports(Seq("akka.persistence.*"), imports = Seq(protobufImport()))