diff --git a/akka-dataflow/src/main/scala/akka/dataflow/package.scala b/akka-dataflow/src/main/scala/akka/dataflow/package.scala deleted file mode 100644 index 25f3327f15..0000000000 --- a/akka-dataflow/src/main/scala/akka/dataflow/package.scala +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka - -import language.implicitConversions - -import scala.util.continuations._ -import scala.concurrent.{ Promise, Future, ExecutionContext } -import scala.util.control.NonFatal -import scala.util.Failure - -package object dataflow { - /** - * Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited - * Continuations plugin. - * - * Within the block, the result of a Future may be accessed by calling Future.apply. At that point - * execution is suspended with the rest of the block being stored in a continuation until the result - * of the Future is available. If an Exception is thrown while processing, it will be contained - * within the resulting Future. - * - * This allows working with Futures in an imperative style without blocking for each result. - * - * Completing a Future using 'Promise << Future' will also suspend execution until the - * value of the other Future is available. - * - * The Delimited Continuations compiler plugin must be enabled in order to use this method. - */ - @deprecated("dataflow is deprecated, superseded by Scala Async", "2.3") - def flow[A](body: ⇒ A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = { - val p = Promise[A] - executor.execute( - new Runnable { - def run = try { - (reify(body) foreachFull (r ⇒ p.success(r).future, f ⇒ p.failure(f).future): Future[Any]) onFailure { - case NonFatal(e) ⇒ p tryComplete Failure(e) - } - } catch { - case NonFatal(e) ⇒ p tryComplete Failure(e) - } - }) - p.future - } - - @deprecated("dataflow is deprecated, superseded by Scala Async", "2.3") - implicit class DataflowPromise[T](val promise: Promise[T]) extends AnyVal { - - /** - * Completes the Promise with the specified value or throws an exception if already - * completed. See Promise.success(value) for semantics. - * - * @param value The value which denotes the successful value of the Promise - * @return This Promise's Future - */ - final def <<(value: T): Future[T] @cps[Future[Any]] = shift { - cont: (Future[T] ⇒ Future[Any]) ⇒ cont(promise.success(value).future) - } - - /** - * Completes this Promise with the value of the specified Future when/if it completes. - * - * @param other The Future whose value will be transferred to this Promise upon completion - * @param ec An ExecutionContext which will be used to execute callbacks registered in this method - * @return A Future representing the result of this operation - */ - final def <<(other: Future[T])(implicit ec: ExecutionContext): Future[T] @cps[Future[Any]] = shift { - cont: (Future[T] ⇒ Future[Any]) ⇒ - val fr = Promise[Any]() - (promise completeWith other).future onComplete { - v ⇒ try { fr completeWith cont(promise.future) } catch { case NonFatal(e) ⇒ fr failure e } - } - fr.future - } - - /** - * Completes this Promise with the value of the specified Promise when/if it completes. - * - * @param other The Promise whose value will be transferred to this Promise upon completion - * @param ec An ExecutionContext which will be used to execute callbacks registered in this method - * @return A Future representing the result of this operation - */ - final def <<(other: Promise[T])(implicit ec: ExecutionContext): Future[T] @cps[Future[Any]] = <<(other.future) - - /** - * For use only within a flow block or another compatible Delimited Continuations reset block. - * - * Returns the result of this Promise without blocking, by suspending execution and storing it as a - * continuation until the result is available. - */ - final def apply()(implicit ec: ExecutionContext): T @cps[Future[Any]] = shift(promise.future flatMap (_: T ⇒ Future[Any])) - } - - @deprecated("dataflow is deprecated, superseded by Scala Async", "2.3") - implicit class DataflowFuture[T](val future: Future[T]) extends AnyVal { - /** - * For use only within a Future.flow block or another compatible Delimited Continuations reset block. - * - * Returns the result of this Future without blocking, by suspending execution and storing it as a - * continuation until the result is available. - */ - final def apply()(implicit ec: ExecutionContext): T @cps[Future[Any]] = shift(future flatMap (_: T ⇒ Future[Any])) - } -} diff --git a/akka-dataflow/src/test/scala/akka/dataflow/DataflowSpec.scala b/akka-dataflow/src/test/scala/akka/dataflow/DataflowSpec.scala deleted file mode 100644 index 624a3b70fa..0000000000 --- a/akka-dataflow/src/test/scala/akka/dataflow/DataflowSpec.scala +++ /dev/null @@ -1,304 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.dataflow - -import language.postfixOps - -import scala.reflect.{ ClassTag, classTag } -import akka.actor.{ Actor, Status, Props } -import akka.actor.Status._ -import akka.pattern.ask -import akka.testkit.{ EventFilter, filterEvents, filterException } -import scala.concurrent.{ Await, Promise, Future } -import scala.concurrent.duration._ -import akka.testkit.{ DefaultTimeout, TestLatch, AkkaSpec } -import java.util.concurrent.TimeoutException - -object DataflowSpec { - class TestActor extends Actor { - def receive = { - case "Hello" ⇒ sender ! "World" - case "Failure" ⇒ - sender ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance")) - case "NoReply" ⇒ - } - } - - class TestDelayActor(await: TestLatch) extends Actor { - def receive = { - case "Hello" ⇒ - Await.ready(await, TestLatch.DefaultTimeout) - sender ! "World" - case "NoReply" ⇒ - Await.ready(await, TestLatch.DefaultTimeout) - case "Failure" ⇒ - Await.ready(await, TestLatch.DefaultTimeout) - sender ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance")) - } - } -} - -class DataflowSpec extends AkkaSpec with DefaultTimeout { - import DataflowSpec._ - import system.dispatcher - "Dataflow API" must { - "futureComposingWithContinuations" in { - - val actor = system.actorOf(Props[TestActor]) - - val x = Future("Hello") - val y = x flatMap (actor ? _) mapTo classTag[String] - - val r = flow(x() + " " + y() + "!") - - assert(Await.result(r, timeout.duration) === "Hello World!") - - system.stop(actor) - } - - "futureComposingWithContinuationsFailureDivideZero" in { - filterException[ArithmeticException] { - - val x = Future("Hello") - val y = x map (_.length) - - val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply) - - intercept[java.lang.ArithmeticException](Await.result(r, timeout.duration)) - } - } - - "futureComposingWithContinuationsFailureCastInt" in { - filterException[ClassCastException] { - - val actor = system.actorOf(Props[TestActor]) - - val x = Future(3) - val y = (actor ? "Hello").mapTo[Int] - - val r = flow(x() + y()) - - intercept[ClassCastException](Await.result(r, timeout.duration)) - } - } - - "futureComposingWithContinuationsFailureCastNothing" in { - pending - filterException[ClassCastException] { - - val actor = system.actorOf(Props[TestActor]) - - val x = Future("Hello") - val y = (actor ? "Hello").mapTo[Nothing] - - val r = flow(x() + y()) - - intercept[ClassCastException](Await.result(r, timeout.duration)) - } - } - - "futureCompletingWithContinuations" in { - - val x, y, z = Promise[Int]() - val ly, lz = new TestLatch - - val result = flow { - y completeWith x.future - ly.open() // not within continuation - - z << x - lz.open() // within continuation, will wait for 'z' to complete - z() + y() - } - - Await.ready(ly, 100 milliseconds) - intercept[TimeoutException] { Await.ready(lz, 100 milliseconds) } - - flow { x << 5 } - - assert(Await.result(y.future, timeout.duration) === 5) - assert(Await.result(z.future, timeout.duration) === 5) - Await.ready(lz, timeout.duration) - assert(Await.result(result, timeout.duration) === 10) - - val a, b, c = Promise[Int]() - - val result2 = flow { - val n = (a << c).value.get.get + 10 - b << (c() - 2) - a() + n * b() - } - - c completeWith Future(5) - - assert(Await.result(a.future, timeout.duration) === 5) - assert(Await.result(b.future, timeout.duration) === 3) - assert(Await.result(result2, timeout.duration) === 50) - } - - "futureDataFlowShouldEmulateBlocking1" in { - - val one, two = Promise[Int]() - val simpleResult = flow { - one() + two() - } - - assert(Seq(one.future, two.future, simpleResult).forall(_.isCompleted == false)) - - flow { one << 1 } - - Await.ready(one.future, 1 minute) - - assert(one.isCompleted) - assert(Seq(two.future, simpleResult).forall(_.isCompleted == false)) - - flow { two << 9 } - - Await.ready(two.future, 1 minute) - - assert(Seq(one, two).forall(_.isCompleted == true)) - assert(Await.result(simpleResult, timeout.duration) === 10) - - } - - "futureDataFlowShouldEmulateBlocking2" in { - - val x1, x2, y1, y2 = Promise[Int]() - val lx, ly, lz = new TestLatch - val result = flow { - lx.open() - x1 << y1 - ly.open() - x2 << y2 - lz.open() - x1() + x2() - } - Await.ready(lx, 2 seconds) - assert(!ly.isOpen) - assert(!lz.isOpen) - assert(List(x1, x2, y1, y2).forall(_.isCompleted == false)) - - flow { y1 << 1 } // When this is set, it should cascade down the line - - Await.ready(ly, 2 seconds) - assert(Await.result(x1.future, 1 minute) === 1) - assert(!lz.isOpen) - - flow { y2 << 9 } // When this is set, it should cascade down the line - - Await.ready(lz, 2 seconds) - assert(Await.result(x2.future, 1 minute) === 9) - - assert(List(x1, x2, y1, y2).forall(_.isCompleted)) - - assert(Await.result(result, 1 minute) === 10) - } - - "dataFlowAPIshouldbeSlick" in { - - val i1, i2, s1, s2 = new TestLatch - - val callService1 = Future { i1.open(); Await.ready(s1, TestLatch.DefaultTimeout); 1 } - val callService2 = Future { i2.open(); Await.ready(s2, TestLatch.DefaultTimeout); 9 } - - val result = flow { callService1() + callService2() } - - assert(!s1.isOpen) - assert(!s2.isOpen) - assert(!result.isCompleted) - Await.ready(i1, 2 seconds) - Await.ready(i2, 2 seconds) - s1.open() - s2.open() - assert(Await.result(result, timeout.duration) === 10) - } - - "futureCompletingWithContinuationsFailure" in { - filterException[ArithmeticException] { - - val x, y, z = Promise[Int]() - val ly, lz = new TestLatch - - val result = flow { - y << x - ly.open() - val oops = 1 / 0 - z << x - lz.open() - z() + y() + oops - } - intercept[TimeoutException] { Await.ready(ly, 100 milliseconds) } - intercept[TimeoutException] { Await.ready(lz, 100 milliseconds) } - flow { x << 5 } - - assert(Await.result(y.future, timeout.duration) === 5) - intercept[java.lang.ArithmeticException](Await.result(result, timeout.duration)) - assert(z.future.value === None) - assert(!lz.isOpen) - } - } - - "futureContinuationsShouldNotBlock" in { - - val latch = new TestLatch - val future = Future { - Await.ready(latch, TestLatch.DefaultTimeout) - "Hello" - } - - val result = flow { - Some(future()).filter(_ == "Hello") - } - - assert(!result.isCompleted) - - latch.open() - - assert(Await.result(result, timeout.duration) === Some("Hello")) - } - - "futureFlowShouldBeTypeSafe" in { - - val rString = flow { - val x = Future(5) - x().toString - } - - val rInt = flow { - val x = rString.apply - val y = Future(5) - x.length + y() - } - - assert(checkType(rString, classTag[String])) - assert(checkType(rInt, classTag[Int])) - assert(!checkType(rInt, classTag[String])) - assert(!checkType(rInt, classTag[Nothing])) - assert(!checkType(rInt, classTag[Any])) - - Await.result(rString, timeout.duration) - Await.result(rInt, timeout.duration) - } - - "futureFlowSimpleAssign" in { - val x, y, z = Promise[Int]() - - flow { - z << x() + y() - } - flow { x << 40 } - flow { y << 2 } - - assert(Await.result(z.future, timeout.duration) === 42) - } - - "capture first exception with dataflow" in { - val f1 = flow { 40 / 0 } - intercept[java.lang.ArithmeticException](Await result (f1, TestLatch.DefaultTimeout)) - } - } - - def checkType[A: ClassTag, B](in: Future[A], ref: ClassTag[B]): Boolean = implicitly[ClassTag[A]].runtimeClass eq ref.runtimeClass - -} diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index f8fa8f00c7..a140d7cd9f 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -18,3 +18,11 @@ In earlier versions of Akka `TestKit.remaining` returned the default timeout con "akka.test.single-expect-default". This was a bit confusing and thus it has been changed to throw an AssertionError if called outside of within. The old behavior however can still be achieved by calling `TestKit.remainingOrDefault` instead. + +Removed Deprecated Features +=========================== + +The following, previously deprecated, features have been removed: + +* akka-dataflow + diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 80d610d4a7..67b5f68f1b 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -75,7 +75,7 @@ object AkkaBuild extends Build { // add reportBinaryIssues to validatePullRequest on minor version maintenance branch validatePullRequest <<= (Unidoc.unidoc, SphinxSupport.generate in Sphinx in docs) map { (_, _) => } ), - aggregate = Seq(actor, testkit, actorTests, dataflow, remote, remoteTests, camel, cluster, slf4j, agent, transactor, + aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, agent, transactor, persistence, mailboxes, zeroMQ, kernel, osgi, docs, contrib, samples, multiNodeTestkit) ) @@ -83,7 +83,7 @@ object AkkaBuild extends Build { id = "akka-scala-nightly", base = file("akka-scala-nightly"), // remove dependencies that we have to build ourselves (Scala STM, ZeroMQ Scala Bindings) - // samples and dataflow don't work with dbuild right now + // samples don't work with dbuild right now aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, persistence, mailboxes, kernel, osgi, contrib, multiNodeTestkit) ) @@ -155,25 +155,6 @@ object AkkaBuild extends Build { ) ) - val cpsPlugin = Seq( - libraryDependencies <++= scalaVersion { v => - if (v.startsWith("2.10.")) Seq(compilerPlugin("org.scala-lang.plugins" % "continuations" % v)) - else Seq( - compilerPlugin("org.scala-lang.plugins" %% "scala-continuations-plugin" % Dependencies.Versions.scalaContinuationsVersion), - "org.scala-lang.plugins" %% "scala-continuations-library" % Dependencies.Versions.scalaContinuationsVersion) - }, - scalacOptions += "-P:continuations:enable" - ) - - lazy val dataflow = Project( - id = "akka-dataflow", - base = file("akka-dataflow"), - dependencies = Seq(testkit % "test->test"), - settings = defaultSettings ++ formatSettings ++ scaladocSettingsNoVerificationOfDiagrams ++ OSGi.dataflow ++ cpsPlugin ++ Seq( - previousArtifact := akkaPreviousArtifact("akka-dataflow") - ) - ) - lazy val testkit = Project( id = "akka-testkit", base = file("akka-testkit"), @@ -565,7 +546,7 @@ object AkkaBuild extends Build { dependencies = Seq(actor, testkit % "test->test", remote % "compile;test->test", cluster, slf4j, agent, zeroMQ, camel, osgi, persistence % "compile;test->test"), - settings = defaultSettings ++ docFormatSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ cpsPlugin ++ Seq( + settings = defaultSettings ++ docFormatSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ Seq( sourceDirectory in Sphinx <<= baseDirectory / "rst", sphinxPackages in Sphinx <+= baseDirectory { _ / "_sphinx" / "pygments" }, // copy akka-contrib/docs into our rst_preprocess/contrib (and apply substitutions) @@ -1048,8 +1029,6 @@ object AkkaBuild extends Build { val slf4j = exports(Seq("akka.event.slf4j.*")) - val dataflow = exports(Seq("akka.dataflow.*")) - val transactor = exports(Seq("akka.transactor.*")) val persistence = exports(Seq("akka.persistence.*"), imports = Seq(protobufImport()))