diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index 36cb52c747..c7aa2ca4d0 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -5,6 +5,8 @@ import akka.actor.ActorSystem; import akka.japi.*; import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.Promise; import scala.concurrent.util.Duration; import akka.testkit.TestKitExtension; import org.junit.AfterClass; @@ -54,7 +56,7 @@ public class JavaFutureTests { public String apply(String s) { return s + " World"; } - }); + }, system.dispatcher()); assertEquals("Hello World", Await.result(f2, timeout)); } @@ -63,13 +65,13 @@ public class JavaFutureTests { public void mustBeAbleToExecuteAnOnResultCallback() throws Throwable { final CountDownLatch latch = new CountDownLatch(1); Promise cf = Futures.promise(system.dispatcher()); - Future f = cf; + Future f = cf.future(); f.onSuccess(new OnSuccess() { public void onSuccess(String result) { if (result.equals("foo")) latch.countDown(); } - }); + }, system.dispatcher()); cf.success("foo"); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); @@ -80,13 +82,13 @@ public class JavaFutureTests { public void mustBeAbleToExecuteAnOnExceptionCallback() throws Throwable { final CountDownLatch latch = new CountDownLatch(1); Promise cf = Futures.promise(system.dispatcher()); - Future f = cf; + Future f = cf.future(); f.onFailure(new OnFailure() { public void onFailure(Throwable t) { if (t instanceof NullPointerException) latch.countDown(); } - }); + }, system.dispatcher()); Throwable exception = new NullPointerException(); cf.failure(exception); @@ -98,12 +100,12 @@ public class JavaFutureTests { public void mustBeAbleToExecuteAnOnCompleteCallback() throws Throwable { final CountDownLatch latch = new CountDownLatch(1); Promise cf = Futures.promise(system.dispatcher()); - Future f = cf; + Future f = cf.future(); f.onComplete(new OnComplete() { public void onComplete(Throwable t, String r) { latch.countDown(); } - }); + }, system.dispatcher()); cf.success("foo"); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); @@ -114,12 +116,12 @@ public class JavaFutureTests { public void mustBeAbleToForeachAFuture() throws Throwable { final CountDownLatch latch = new CountDownLatch(1); Promise cf = Futures.promise(system.dispatcher()); - Future f = cf; + Future f = cf.future(); f.foreach(new Foreach() { public void each(String future) { latch.countDown(); } - }); + },system.dispatcher()); cf.success("foo"); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); @@ -131,16 +133,16 @@ public class JavaFutureTests { final CountDownLatch latch = new CountDownLatch(1); Promise cf = Futures.promise(system.dispatcher()); cf.success("1000"); - Future f = cf; + Future f = cf.future(); Future r = f.flatMap(new Mapper>() { public Future checkedApply(String r) throws Throwable { if (false) throw new IOException("Just here to make sure this compiles."); latch.countDown(); Promise cf = Futures.promise(system.dispatcher()); cf.success(Integer.parseInt(r)); - return cf; + return cf.future(); } - }); + }, system.dispatcher()); assertEquals(Await.result(f, timeout), "1000"); assertEquals(Await.result(r, timeout).intValue(), 1000); @@ -151,13 +153,13 @@ public class JavaFutureTests { public void mustBeAbleToFilterAFuture() throws Throwable { final CountDownLatch latch = new CountDownLatch(1); Promise cf = Futures.promise(system.dispatcher()); - Future f = cf; + Future f = cf.future(); Future r = f.filter(Filter.filterOf(new Function() { public Boolean apply(String r) { latch.countDown(); return r.equals("foo"); } - })); + }), system.dispatcher()); cf.success("foo"); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); @@ -281,8 +283,8 @@ public class JavaFutureTests { Promise p = Futures.promise(system.dispatcher()); Duration d = Duration.create(1, TimeUnit.SECONDS); p.success("foo"); - Await.ready(p, d); - assertEquals(Await.result(p, d), "foo"); + Await.ready(p.future(), d); + assertEquals(Await.result(p.future(), d), "foo"); } @Test @@ -291,8 +293,8 @@ public class JavaFutureTests { Future f = p.future().mapTo(classTag(String.class)); Duration d = Duration.create(1, TimeUnit.SECONDS); p.success("foo"); - Await.ready(p, d); - assertEquals(Await.result(p, d), "foo"); + Await.ready(p.future(), d); + assertEquals(Await.result(p.future(), d), "foo"); } @Test @@ -306,7 +308,7 @@ public class JavaFutureTests { else throw t; } - }); + }, system.dispatcher()); Duration d = Duration.create(1, TimeUnit.SECONDS); p.failure(fail); assertEquals(Await.result(f, d), "foo"); @@ -323,7 +325,7 @@ public class JavaFutureTests { else throw t; } - }); + }, system.dispatcher()); Duration d = Duration.create(1, TimeUnit.SECONDS); p.failure(fail); assertEquals(Await.result(f, d), "foo"); diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 3650a54158..0f8dc392a3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -14,7 +14,7 @@ import akka.util.Timeout import scala.concurrent.util.duration._ import scala.concurrent.Await import java.lang.IllegalStateException -import akka.dispatch.Promise +import scala.concurrent.Promise import akka.pattern.ask import akka.serialization.JavaSerializer @@ -131,7 +131,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { def wrap[T](f: Promise[Actor] ⇒ T): T = { val result = Promise[Actor]() val r = f(result) - Await.result(result, 1 minute) + Await.result(result.future, 1 minute) r } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index e3f2e45012..71939192f3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -14,7 +14,7 @@ import scala.collection.JavaConverters import java.util.concurrent.{ TimeUnit, RejectedExecutionException, CountDownLatch, ConcurrentLinkedQueue } import akka.pattern.ask import akka.util.Timeout -import akka.dispatch.Future +import scala.concurrent.Future class JavaExtensionSpec extends JavaExtension with JUnitSuite diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index edab316e82..9cddba38a3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -7,12 +7,12 @@ package akka.actor import language.postfixOps import akka.util.ByteString -import scala.concurrent.{ ExecutionContext, Await } +import scala.concurrent.{ ExecutionContext, Await, Future, Promise } import scala.concurrent.util.{ Duration, Deadline } import scala.concurrent.util.duration._ import scala.util.continuations._ import akka.testkit._ -import akka.dispatch.{ Future, Promise, MessageDispatcher } +import akka.dispatch.MessageDispatcher import java.net.{ SocketAddress } import akka.pattern.ask @@ -246,7 +246,7 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout { */ def retry[T](count: Option[Int] = None, timeout: Option[Duration] = None, delay: Option[Duration] = Some(100 millis), filter: Option[Throwable ⇒ Boolean] = None)(future: ⇒ Future[T])(implicit executor: ExecutionContext): Future[T] = { - val promise = Promise[T]()(executor) + val promise = Promise[T]() val timer: Option[Deadline] = timeout match { case Some(duration) ⇒ Some(duration fromNow) @@ -271,7 +271,7 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout { run(0) - promise + promise.future } "an IO Actor" must { @@ -279,7 +279,7 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout { filterException[java.net.ConnectException] { val addressPromise = Promise[SocketAddress]() val server = system.actorOf(Props(new SimpleEchoServer(addressPromise))) - val address = Await.result(addressPromise, TestLatch.DefaultTimeout) + val address = Await.result(addressPromise.future, TestLatch.DefaultTimeout) val client = system.actorOf(Props(new SimpleEchoClient(address))) val f1 = retry() { client ? ByteString("Hello World!1") } val f2 = retry() { client ? ByteString("Hello World!2") } @@ -296,7 +296,7 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout { filterException[java.net.ConnectException] { val addressPromise = Promise[SocketAddress]() val server = system.actorOf(Props(new SimpleEchoServer(addressPromise))) - val address = Await.result(addressPromise, TestLatch.DefaultTimeout) + val address = Await.result(addressPromise.future, TestLatch.DefaultTimeout) val client = system.actorOf(Props(new SimpleEchoClient(address))) val list = List.range(0, 100) val f = Future.traverse(list)(i ⇒ retry() { client ? ByteString(i.toString) }) @@ -310,7 +310,7 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout { filterException[java.net.ConnectException] { val addressPromise = Promise[SocketAddress]() val server = system.actorOf(Props(new KVStore(addressPromise))) - val address = Await.result(addressPromise, TestLatch.DefaultTimeout) + val address = Await.result(addressPromise.future, TestLatch.DefaultTimeout) val client1 = system.actorOf(Props(new KVClient(address))) val client2 = system.actorOf(Props(new KVClient(address))) val f1 = retry() { client1 ? KVSet("hello", "World") } diff --git a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala index 9d88d51329..3894524487 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala @@ -10,7 +10,7 @@ import akka.testkit._ import scala.concurrent.Await import scala.concurrent.util.duration._ import akka.util.Timeout -import akka.dispatch.Future +import scala.concurrent.Future object LocalActorRefProviderSpec { val config = """ diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 378f9febee..61a1e84f7e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -7,16 +7,15 @@ import language.postfixOps import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import akka.util.Timeout -import scala.concurrent.Await +import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.util.Duration import scala.concurrent.util.duration._ import java.util.concurrent.atomic.AtomicReference import annotation.tailrec import akka.testkit.{ EventFilter, filterEvents, AkkaSpec } -import akka.serialization.SerializationExtension -import akka.japi.{ Creator, Option ⇒ JOption } +import akka.japi.{ Option ⇒ JOption } import akka.testkit.DefaultTimeout -import akka.dispatch.{ Dispatchers, Future, Promise } +import akka.dispatch.{ Dispatchers } import akka.pattern.ask import akka.serialization.JavaSerializer import akka.actor.TypedActor._ @@ -110,7 +109,7 @@ object TypedActorSpec { def pigdog = "Pigdog" - def futurePigdog(): Future[String] = Promise.successful(pigdog) + def futurePigdog(): Future[String] = Promise.successful(pigdog).future def futurePigdog(delay: Long): Future[String] = { Thread.sleep(delay) @@ -119,7 +118,7 @@ object TypedActorSpec { def futurePigdog(delay: Long, numbered: Int): Future[String] = { Thread.sleep(delay) - Promise.successful(pigdog + numbered) + Promise.successful(pigdog + numbered).future } def futureComposePigdogFrom(foo: Foo): Future[String] = { diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 884434fd21..030f46801d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -23,7 +23,7 @@ import akka.testkit._ import akka.util.{ Timeout, Switch } import scala.concurrent.util.duration._ import scala.concurrent.util.Duration -import scala.concurrent.Await +import scala.concurrent.{ Await, Future, Promise } import scala.annotation.tailrec object ActorModelSpec { @@ -413,9 +413,9 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa val a = newTestActor(dispatcher.id) val f1 = a ? Reply("foo") val f2 = a ? Reply("bar") - val f3 = try { a ? Interrupt } catch { case ie: InterruptedException ⇒ Promise.failed(new ActorInterruptedException(ie)) } + val f3 = try { a ? Interrupt } catch { case ie: InterruptedException ⇒ Promise.failed(new ActorInterruptedException(ie)).future } val f4 = a ? Reply("foo2") - val f5 = try { a ? Interrupt } catch { case ie: InterruptedException ⇒ Promise.failed(new ActorInterruptedException(ie)) } + val f5 = try { a ? Interrupt } catch { case ie: InterruptedException ⇒ Promise.failed(new ActorInterruptedException(ie)).future } val f6 = a ? Reply("bar2") assert(Await.result(f1, timeout.duration) === "foo") diff --git a/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala b/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala index 5ca18c75c8..4c2cd5fd1e 100644 --- a/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala +++ b/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala @@ -6,7 +6,7 @@ package akka.dataflow import language.postfixOps import akka.actor.{ Actor, Props } -import akka.dispatch.Future +import scala.concurrent.Future import scala.concurrent.Await import scala.concurrent.util.duration._ import akka.testkit.{ AkkaSpec, DefaultTimeout } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index e8f5ed9d75..99562d25b2 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -10,7 +10,7 @@ import org.scalacheck.Prop._ import org.scalacheck.Gen._ import akka.actor._ import akka.testkit.{ EventFilter, filterEvents, filterException, AkkaSpec, DefaultTimeout, TestLatch } -import scala.concurrent.Await +import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.util.duration._ import scala.concurrent.ExecutionContext import org.scalatest.junit.JUnitSuite @@ -48,12 +48,12 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "A Promise" when { "never completed" must { - behave like emptyFuture(_(Promise())) + behave like emptyFuture(_(Promise().future)) "return supplied value on timeout" in { - val failure = Promise.failed[String](new RuntimeException("br0ken")) - val otherFailure = Promise.failed[String](new RuntimeException("last")) - val empty = Promise[String]() - val timedOut = Promise.successful[String]("Timedout") + val failure = Promise.failed[String](new RuntimeException("br0ken")).future + val otherFailure = Promise.failed[String](new RuntimeException("last")).future + val empty = Promise[String]().future + val timedOut = Promise.successful[String]("Timedout").future Await.result(failure fallbackTo timedOut, timeout.duration) must be("Timedout") Await.result(timedOut fallbackTo empty, timeout.duration) must be("Timedout") @@ -65,22 +65,22 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } "completed with a result" must { val result = "test value" - val future = Promise[String]().complete(Right(result)) + val future = Promise[String]().complete(Right(result)).future behave like futureWithResult(_(future, result)) } "completed with an exception" must { val message = "Expected Exception" - val future = Promise[String]().complete(Left(new RuntimeException(message))) + val future = Promise[String]().complete(Left(new RuntimeException(message))).future behave like futureWithException[RuntimeException](_(future, message)) } "completed with an InterruptedException" must { val message = "Boxed InterruptedException" - val future = Promise[String]().complete(Left(new InterruptedException(message))) + val future = Promise[String]().complete(Left(new InterruptedException(message))).future behave like futureWithException[RuntimeException](_(future, message)) } "completed with a NonLocalReturnControl" must { val result = "test value" - val future = Promise[String]().complete(Left(new NonLocalReturnControl[String]("test", result))) + val future = Promise[String]().complete(Left(new NonLocalReturnControl[String]("test", result))).future behave like futureWithResult(_(future, result)) } @@ -94,11 +94,11 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val B = namedCtx("B") // create a promise with ctx A - val p = Promise[String]()(A) + val p = Promise[String]() // I would expect that any callback from p // is executed in the context of p - val result = p map { _ + Thread.currentThread().getName() } + val result = p.future map { _ + Thread.currentThread().getName() } p.completeWith(Future { "Hi " }(B)) try { @@ -332,15 +332,16 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "recoverWith from exceptions" in { val o = new IllegalStateException("original") val r = new IllegalStateException("recovered") + val yay = Promise.successful("yay!").future intercept[IllegalStateException] { - Await.result(Promise.failed[String](o) recoverWith { case _ if false == true ⇒ Promise.successful("yay!") }, timeout.duration) + Await.result(Promise.failed[String](o).future recoverWith { case _ if false == true ⇒ yay }, timeout.duration) } must be(o) - Await.result(Promise.failed[String](o) recoverWith { case _ ⇒ Promise.successful("yay!") }, timeout.duration) must equal("yay!") + Await.result(Promise.failed[String](o).future recoverWith { case _ ⇒ yay }, timeout.duration) must equal("yay!") intercept[IllegalStateException] { - Await.result(Promise.failed[String](o) recoverWith { case _ ⇒ Promise.failed[String](r) }, timeout.duration) + Await.result(Promise.failed[String](o).future recoverWith { case _ ⇒ Promise.failed[String](r).future }, timeout.duration) } must be(r) } @@ -356,7 +357,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } "firstCompletedOf" in { - val futures = Vector.fill[Future[Int]](10)(Promise[Int]()) :+ Promise.successful[Int](5) + val futures = Vector.fill[Future[Int]](10)(Promise[Int]().future) :+ Promise.successful[Int](5).future Await.result(Future.firstCompletedOf(futures), timeout.duration) must be(5) } @@ -384,18 +385,18 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val timeout = 10000 millis val f = new IllegalStateException("test") intercept[IllegalStateException] { - Await.result(Promise.failed[String](f) zip Promise.successful("foo"), timeout) + Await.result(Promise.failed[String](f).future zip Promise.successful("foo").future, timeout) } must be(f) intercept[IllegalStateException] { - Await.result(Promise.successful("foo") zip Promise.failed[String](f), timeout) + Await.result(Promise.successful("foo").future zip Promise.failed[String](f).future, timeout) } must be(f) intercept[IllegalStateException] { - Await.result(Promise.failed[String](f) zip Promise.failed[String](f), timeout) + Await.result(Promise.failed[String](f).future zip Promise.failed[String](f).future, timeout) } must be(f) - Await.result(Promise.successful("foo") zip Promise.successful("foo"), timeout) must be(("foo", "foo")) + Await.result(Promise.successful("foo").future zip Promise.successful("foo").future, timeout) must be(("foo", "foo")) } "fold by composing" in { @@ -542,7 +543,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa filterException[TimeoutException] { intercept[TimeoutException] { Await.ready(f3, 0 millis) } } } - "futureComposingWithContinuations" in { + //FIXME DATAFLOW + /*"futureComposingWithContinuations" in { import Future.flow val actor = system.actorOf(Props[TestActor]) @@ -835,7 +837,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa f3 must be('completed) val p1 = Promise[String]() - val f4 = p1 map { s ⇒ latch(7).open(); Await.ready(latch(8), TestLatch.DefaultTimeout); s.length } + val f4 = p1.future map { s ⇒ latch(7).open(); Await.ready(latch(8), TestLatch.DefaultTimeout); s.length } f4 foreach (_ ⇒ latch(9).open()) p1 must not be ('completed) @@ -860,7 +862,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val l1, l2 = new TestLatch val complex = Future() map { _ ⇒ - Future.blocking() + scala.concurrent.impl.InternalFutureUtil.releaseFutureStack(implicitly[ExecutionContext]) val nested = Future(()) nested foreach (_ ⇒ l1.open()) Await.ready(l1, TestLatch.DefaultTimeout) // make sure nested is completed @@ -870,11 +872,12 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa Await.ready(complex, timeout.duration) must be('completed) } - "should capture first exception with dataflow" in { + //FIXME DATAFLOW + /*"should capture first exception with dataflow" in { import Future.flow val f1 = flow { 40 / 0 } intercept[java.lang.ArithmeticException](Await result (f1, TestLatch.DefaultTimeout)) - } + }*/ } } @@ -899,7 +902,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "transform result with map" in { f((future, result) ⇒ Await.result((future map (_.toString.length)), timeout.duration) must be(result.toString.length)) } "compose result with flatMap" in { f { (future, result) ⇒ - val r = for (r ← future; p ← Promise.successful("foo")) yield r.toString + p + val r = for (r ← future; p ← Promise.successful("foo").future) yield r.toString + p Await.result(r, timeout.duration) must be(result.toString + "foo") } } @@ -907,13 +910,13 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa f { (future, result) ⇒ val p = Promise[Any]() future foreach p.success - Await.result(p, timeout.duration) must be(result) + Await.result(p.future, timeout.duration) must be(result) } } "zip properly" in { f { (future, result) ⇒ - Await.result(future zip Promise.successful("foo"), timeout.duration) must be((result, "foo")) - (evaluating { Await.result(future zip Promise.failed(new RuntimeException("ohnoes")), timeout.duration) } must produce[RuntimeException]).getMessage must be("ohnoes") + Await.result(future zip Promise.successful("foo").future, timeout.duration) must be((result, "foo")) + (evaluating { Await.result(future zip Promise.failed(new RuntimeException("ohnoes")).future, timeout.duration) } must produce[RuntimeException]).getMessage must be("ohnoes") } } "not recover from exception" in { f((future, result) ⇒ Await.result(future.recover({ case _ ⇒ "pigdog" }), timeout.duration) must be(result)) } @@ -921,7 +924,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa f { (future, result) ⇒ val p = Promise[Any]() future.onSuccess { case x ⇒ p.success(x) } - Await.result(p, timeout.duration) must be(result) + Await.result(p.future, timeout.duration) must be(result) } } "not project a failure" in { f((future, result) ⇒ (evaluating { Await.result(future.failed, timeout.duration) } must produce[NoSuchElementException]).getMessage must be("Future.failed not completed with a throwable. Instead completed with: " + result)) } @@ -947,11 +950,11 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } } "retain exception with map" in { f((future, message) ⇒ (evaluating { Await.result(future map (_.toString.length), timeout.duration) } must produce[E]).getMessage must be(message)) } - "retain exception with flatMap" in { f((future, message) ⇒ (evaluating { Await.result(future flatMap (_ ⇒ Promise.successful[Any]("foo")), timeout.duration) } must produce[E]).getMessage must be(message)) } + "retain exception with flatMap" in { f((future, message) ⇒ (evaluating { Await.result(future flatMap (_ ⇒ Promise.successful[Any]("foo").future), timeout.duration) } must produce[E]).getMessage must be(message)) } "not perform action with foreach" is pending "zip properly" in { - f { (future, message) ⇒ (evaluating { Await.result(future zip Promise.successful("foo"), timeout.duration) } must produce[E]).getMessage must be(message) } + f { (future, message) ⇒ (evaluating { Await.result(future zip Promise.successful("foo").future, timeout.duration) } must produce[E]).getMessage must be(message) } } "recover from exception" in { f((future, message) ⇒ Await.result(future.recover({ case e if e.getMessage == message ⇒ "pigdog" }), timeout.duration) must be("pigdog")) } "not perform action on result" is pending @@ -960,7 +963,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa f { (future, message) ⇒ val p = Promise[Any]() future.onFailure { case _ ⇒ p.success(message) } - Await.result(p, timeout.duration) must be(message) + Await.result(p.future, timeout.duration) must be(message) } } "always cast successfully using mapTo" in { f((future, message) ⇒ (evaluating { Await.result(future.mapTo[java.lang.Thread], timeout.duration) } must produce[E]).getMessage must be(message)) } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 78468feda9..ed93362b6f 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -10,7 +10,7 @@ import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll } import com.typesafe.config.Config import akka.actor.{ RepointableRef, Props, DeadLetter, ActorSystem, ActorRefWithCell, ActorRef, ActorCell } import akka.testkit.AkkaSpec -import scala.concurrent.Await +import scala.concurrent.{ Future, Promise, Await } import scala.concurrent.util.duration.intToDurationInt @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -76,7 +76,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn } }) t.start - result + result.future } def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters, system) diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala index 499ee2a74a..aa65dae1a5 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala @@ -6,8 +6,7 @@ package akka.pattern import akka.testkit._ import scala.concurrent.util.duration._ import org.scalatest.BeforeAndAfter -import akka.dispatch.{ Promise, Future } -import scala.concurrent.Await +import scala.concurrent.{ Promise, Future, Await } class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter { @@ -37,7 +36,7 @@ class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter { def openBreaker: Unit = { for (i ← 1 to 5) Await.result(breakers.breaker.withCircuitBreaker(Future(unreliableCall("fail"))) recoverWith { - case _ ⇒ Promise.successful("OK") + case _ ⇒ Promise.successful("OK").future }, 1.second.dilated) } @@ -62,9 +61,7 @@ class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter { val futures = for (i ← 1 to 100) yield breakers.breaker.withCircuitBreaker(Future { Thread.sleep(10); unreliableCall("success") - }) recoverWith { - case _: CircuitBreakerOpenException ⇒ Promise.successful("CBO") - } + }) recoverWith { case _: CircuitBreakerOpenException ⇒ Promise.successful("CBO").future } val futureList = Future.sequence(futures) @@ -82,9 +79,7 @@ class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter { val futures = for (i ← 1 to 100) yield breakers.breaker.withCircuitBreaker(Future { Thread.sleep(10); unreliableCall("succeed") - }) recoverWith { - case _: CircuitBreakerOpenException ⇒ Promise.successful("CBO") - } + }) recoverWith { case _: CircuitBreakerOpenException ⇒ Promise.successful("CBO").future } val futureList = Future.sequence(futures) @@ -106,7 +101,7 @@ class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter { val futures = for (i ← 1 to 100) yield breakers.breaker.withCircuitBreaker(Future { Thread.sleep(10); unreliableCall("succeed") }) recoverWith { - case _: CircuitBreakerOpenException ⇒ Promise.successful("CBO") + case _: CircuitBreakerOpenException ⇒ Promise.successful("CBO").future } val futureList = Future.sequence(futures) diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala index c683b4f5b7..d0c0fcc309 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala @@ -9,7 +9,7 @@ import language.postfixOps import scala.concurrent.util.duration._ import akka.testkit._ import org.scalatest.BeforeAndAfter -import akka.dispatch.Future +import scala.concurrent.Future import scala.concurrent.Await object CircuitBreakerSpec { diff --git a/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala index 5114deab5a..caaf0908b9 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala @@ -8,10 +8,9 @@ import language.postfixOps import akka.testkit.AkkaSpec import akka.actor.{ Props, Actor } -import scala.concurrent.Await +import scala.concurrent.{ Future, Promise, Await } import scala.concurrent.util.Duration import scala.concurrent.util.duration._ -import akka.dispatch.{ Future, Promise } object PatternSpec { case class Work(duration: Duration) @@ -50,16 +49,16 @@ class PatternSpec extends AkkaSpec { "pattern.after" must { "be completed successfully eventually" in { - val f = after(1 second, using = system.scheduler)(Promise.successful(5)) + val f = after(1 second, using = system.scheduler)(Promise.successful(5).future) - val r = Future.firstCompletedOf(Seq(Promise[Int](), f)) + val r = Future.firstCompletedOf(Seq(Promise[Int]().future, f)) Await.result(r, remaining) must be(5) } "be completed abnormally eventually" in { - val f = after(1 second, using = system.scheduler)(Promise.failed(new IllegalStateException("Mexico"))) + val f = after(1 second, using = system.scheduler)(Promise.failed(new IllegalStateException("Mexico")).future) - val r = Future.firstCompletedOf(Seq(Promise[Int](), f)) + val r = Future.firstCompletedOf(Seq(Promise[Int]().future, f)) intercept[IllegalStateException] { Await.result(r, remaining) }.getMessage must be("Mexico") } } diff --git a/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala b/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala index 842ee870a0..daae7e7a94 100644 --- a/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala @@ -4,7 +4,7 @@ package akka.util import org.scalatest.matchers.MustMatchers -import akka.dispatch.Future +import scala.concurrent.Future import akka.testkit.AkkaSpec import scala.concurrent.Await import scala.util.Random diff --git a/akka-actor/src/main/java/akka/dispatch/AbstractPromise.java b/akka-actor/src/main/java/akka/dispatch/AbstractPromise.java deleted file mode 100644 index db11e84483..0000000000 --- a/akka-actor/src/main/java/akka/dispatch/AbstractPromise.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.dispatch; - -import akka.util.Unsafe; - -abstract class AbstractPromise { - private volatile Object _ref = DefaultPromise.EmptyPending(); - - final static long _refOffset; // Memory offset to _ref field - - static { - try { - _refOffset = Unsafe.instance.objectFieldOffset(AbstractPromise.class.getDeclaredField("_ref")); - } catch(Throwable t){ - throw new ExceptionInInitializerError(t); - } - } - - protected final boolean updateState(Object oldState, Object newState) { - return Unsafe.instance.compareAndSwapObject(this, _refOffset, oldState, newState); - } - - protected final Object getState() { - return _ref; - } -} diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index bbb84144c5..3c5f57a2d8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -4,12 +4,12 @@ package akka.actor -import java.util.concurrent.atomic.AtomicLong import akka.dispatch._ import akka.routing._ -import akka.AkkaException import akka.event._ import akka.util.{ NonFatal, Switch, Helpers } +import scala.concurrent.{ Future, Promise } +import java.util.concurrent.atomic.AtomicLong /** * Interface for all ActorRef providers to implement. @@ -361,9 +361,7 @@ class LocalActorRefProvider( def provider: ActorRefProvider = LocalActorRefProvider.this - override def stop(): Unit = stopped switchOn { - terminationFuture.complete(causeOfTermination.toLeft(())) - } + override def stop(): Unit = stopped switchOn { terminationPromise.complete(causeOfTermination.toLeft(())) } override def isTerminated: Boolean = stopped.isOn @@ -458,7 +456,9 @@ class LocalActorRefProvider( def dispatcher: MessageDispatcher = system.dispatcher - lazy val terminationFuture: Promise[Unit] = Promise[Unit]()(dispatcher) + lazy val terminationPromise: Promise[Unit] = Promise[Unit]() + + def terminationFuture: Future[Unit] = terminationPromise.future @volatile private var extraNames: Map[String, InternalActorRef] = Map() diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 127d73757c..54d0870d18 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -11,7 +11,7 @@ import com.typesafe.config.{ Config, ConfigFactory } import scala.annotation.tailrec import scala.concurrent.util.Duration import java.io.Closeable -import scala.concurrent.{ Await, Awaitable, CanAwait } +import scala.concurrent.{ Await, Awaitable, CanAwait, Future } import akka.util._ import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap } import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException } diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index 77760aa433..2feee3e0bc 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -6,8 +6,7 @@ package akka.actor import language.higherKinds import language.postfixOps -import akka.dispatch.Future -import scala.concurrent.ExecutionContext +import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.util.Duration import akka.util.{ ByteString, NonFatal } import java.net.{ SocketAddress, InetSocketAddress } @@ -564,7 +563,7 @@ object IO { * A mutable reference to an [[akka.actor.IO.Iteratee]]. Not thread safe. * * Designed for use within an [[akka.actor.Actor]], although all actions - * perfomed on the Iteratee are processed within a [[akka.dispatch.Future]] + * perfomed on the Iteratee are processed within a [[scala.concurrent.Future]] * so it is not safe to refer to the Actor's state from within this Iteratee. * Messages should instead be sent to the Actor in order to modify state. * diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 76dbd46bec..92ba3e4742 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -9,10 +9,10 @@ import akka.japi.{ Creator, Option ⇒ JOption } import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy } import akka.util.{ Timeout, NonFatal } import scala.concurrent.util.Duration -import scala.concurrent.Await +import scala.concurrent.{ Await, Future } import akka.util.Reflect.instantiator -import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar } import akka.dispatch._ +import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar } import java.util.concurrent.TimeoutException import java.util.concurrent.TimeUnit.MILLISECONDS import scala.reflect.ClassTag diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 90143920f7..42f5e3030e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -4,29 +4,12 @@ package akka.dispatch -import language.implicitConversions -import language.postfixOps -import language.higherKinds - +import scala.runtime.{ BoxedUnit, AbstractPartialFunction } import akka.japi.{ Function ⇒ JFunc, Option ⇒ JOption } -import scala.util.continuations._ -import scala.reflect.ClassTag +import scala.concurrent.{ Future, Promise, ExecutionContext } import java.lang.{ Iterable ⇒ JIterable } import java.util.{ LinkedList ⇒ JLinkedList } -import scala.annotation.tailrec -import scala.collection.mutable.Stack -import akka.util.BoxedType -import akka.util.NonFatal -import akka.event.Logging.{ LogEventException, Debug, Error } -import java.util.concurrent.TimeUnit.NANOSECONDS import java.util.concurrent.{ ExecutionException, Callable, TimeoutException } -import java.util.concurrent.atomic.{ AtomicInteger } -import akka.pattern.AskTimeoutException -import scala.util.DynamicVariable -import scala.concurrent.util.Duration -import scala.concurrent.ExecutionContext -import scala.runtime.{ BoxedUnit, AbstractPartialFunction } -import scala.concurrent.{ Awaitable, Await, CanAwait } /** * Futures is the Java API for Futures and Promises @@ -41,17 +24,17 @@ object Futures { /** * Java API, equivalent to Promise.apply */ - def promise[T](executor: ExecutionContext): Promise[T] = Promise[T]()(executor) + def promise[T](executor: ExecutionContext): Promise[T] = Promise[T]() /** * Java API, creates an already completed Promise with the specified exception */ - def failed[T](exception: Throwable, executor: ExecutionContext): Promise[T] = Promise.failed(exception)(executor) + def failed[T](exception: Throwable, executor: ExecutionContext): Promise[T] = Promise.failed(exception) /** * Java API, Creates an already completed Promise with the specified result */ - def successful[T](result: T, executor: ExecutionContext): Promise[T] = Promise.successful(result)(executor) + def successful[T](result: T, executor: ExecutionContext): Promise[T] = Promise.successful(result) /** * Java API. @@ -131,767 +114,7 @@ object Futures { * Note: Calling 'Await.result(future)' or 'Await.ready(future)' will automatically trigger this method. * */ - def blocking(): Unit = Future.blocking() -} - -object Future { - - /** - * This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body - * The execution is performed by the specified Dispatcher. - */ - def apply[T](body: ⇒ T)(implicit executor: ExecutionContext): Future[T] = { - val promise = Promise[T]() - executor.execute(new Runnable { - def run = - promise complete { - try { - Right(body) - } catch { - case NonFatal(e) ⇒ - executor.reportFailure(new LogEventException(Debug("Future", getClass, e.getMessage), e)) - Left(e) - } - } - }) - promise - } - - import scala.collection.mutable.Builder - import scala.collection.generic.CanBuildFrom - - /** - * Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]]. - * Useful for reducing many Futures into a single Future. - */ - def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = - in.foldLeft(Promise.successful(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) - - /** - * Returns a Future to the result of the first future in the list that is completed - */ - def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit executor: ExecutionContext): Future[T] = { - val futureResult = Promise[T]() - - val completeFirst: Either[Throwable, T] ⇒ Unit = futureResult tryComplete _ - futures.foreach(_ onComplete completeFirst) - - futureResult - } - - /** - * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate - */ - def find[T](futures: Traversable[Future[T]])(predicate: T ⇒ Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = { - if (futures.isEmpty) Promise.successful[Option[T]](None) - else { - val result = Promise[Option[T]]() - val ref = new AtomicInteger(futures.size) - val search: Either[Throwable, T] ⇒ Unit = v ⇒ try { - v match { - case Right(r) ⇒ if (predicate(r)) result tryComplete Right(Some(r)) - case _ ⇒ - } - } finally { - if (ref.decrementAndGet == 0) - result tryComplete Right(None) - } - - futures.foreach(_ onComplete search) - - result - } - } - - /** - * A non-blocking fold over the specified futures, with the start value of the given zero. - * The fold is performed on the thread where the last future is completed, - * the result will be the first failure of any of the futures, or any failure in the actual fold, - * or the result of the fold. - * Example: - *
-   *   val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds)
-   * 
- */ - def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) ⇒ R)(implicit executor: ExecutionContext): Future[R] = { - if (futures.isEmpty) Promise.successful(zero) - else sequence(futures).map(_.foldLeft(zero)(foldFun)) - } - - /** - * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first - * Example: - *
-   *   val result = Await.result(Futures.reduce(futures)(_ + _), 5 seconds)
-   * 
- */ - def reduce[T, R >: T](futures: Traversable[Future[T]])(op: (R, T) ⇒ R)(implicit executor: ExecutionContext): Future[R] = { - if (futures.isEmpty) Promise[R].failure(new NoSuchElementException("reduce attempted on empty collection")) - else sequence(futures).map(_ reduceLeft op) - } - /** - * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A ⇒ Future[B]. - * This is useful for performing a parallel map. For example, to apply a function to all items of a list - * in parallel: - *
-   * val myFutureList = Future.traverse(myList)(x ⇒ Future(myFunc(x)))
-   * 
- */ - def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] = - in.foldLeft(Promise.successful(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) ⇒ - val fb = fn(a.asInstanceOf[A]) - for (r ← fr; b ← fb) yield (r += b) - }.map(_.result) - - /** - * Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited - * Continuations plugin. - * - * Within the block, the result of a Future may be accessed by calling Future.apply. At that point - * execution is suspended with the rest of the block being stored in a continuation until the result - * of the Future is available. If an Exception is thrown while processing, it will be contained - * within the resulting Future. - * - * This allows working with Futures in an imperative style without blocking for each result. - * - * Completing a Future using 'Promise << Future' will also suspend execution until the - * value of the other Future is available. - * - * The Delimited Continuations compiler plugin must be enabled in order to use this method. - */ - def flow[A](body: ⇒ A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = { - val p = Promise[A] - dispatchTask({ () ⇒ - try { - (reify(body) foreachFull (p success, p failure): Future[Any]) onFailure { - case NonFatal(e) ⇒ p tryComplete Left(e) - } - } catch { - case NonFatal(e) ⇒ p tryComplete Left(e) - } - }, true) - p.future - } - - /** - * Signals that the current thread of execution will potentially engage - * an action that will take a non-trivial amount of time, perhaps by using blocking.IO or using a lot of CPU time, - * giving the system a chance to spawn new threads, reuse old threads or otherwise, - * to prevent starvation and/or unfairness. - * - * Assures that any Future tasks initiated in the current thread will be - * executed asynchronously, including any tasks currently queued to be - * executed in the current thread. This is needed if the current task may - * block, causing delays in executing the remaining tasks which in some - * cases may cause a deadlock. - * - * Note: Calling 'Await.result(future)' or 'Await.ready(future)' will automatically trigger this method. - * - * For example, in the following block of code the call to 'latch.open' - * might not be executed until after the call to 'latch.await', causing - * a deadlock. By adding 'Future.blocking()' the call to 'latch.open' - * will instead be dispatched separately from the current block, allowing - * it to be run in parallel: - *
-   * val latch = new StandardLatch
-   * val future = Future() map { _ ⇒
-   *   Future.blocking()
-   *   val nested = Future()
-   *   nested foreach (_ ⇒ latch.open)
-   *   latch.await
-   * }
-   * 
- */ - def blocking(): Unit = - _taskStack.get match { - case stack if (stack ne null) && stack.nonEmpty ⇒ - val executionContext = _executionContext.value match { - case null ⇒ throw new IllegalStateException("'blocking' needs to be invoked inside a Future callback.") - case some ⇒ some - } - val tasks = stack.elems - stack.clear() - _taskStack.remove() - dispatchTask(() ⇒ _taskStack.get.elems = tasks, true)(executionContext) - case _ ⇒ _taskStack.remove() - } - - private val _taskStack = new ThreadLocal[Stack[() ⇒ Unit]]() - private val _executionContext = new DynamicVariable[ExecutionContext](null) - - /** - * Internal API, do not call - */ - private[akka] def dispatchTask(task: () ⇒ Unit, force: Boolean = false)(implicit executor: ExecutionContext): Unit = - _taskStack.get match { - case stack if (stack ne null) && (executor eq _executionContext.value) && !force ⇒ stack push task - case _ ⇒ executor.execute( - new Runnable { - def run = - try { - _executionContext.withValue(executor) { - val taskStack = Stack.empty[() ⇒ Unit] - taskStack push task - _taskStack set taskStack - - while (taskStack.nonEmpty) { - val next = taskStack.pop() - try { - next.apply() - } catch { - case NonFatal(e) ⇒ executor.reportFailure(e) - } - } - } - } finally { - _taskStack.remove() - } - }) - } - -} - -/** - * Trait representing a value that may not have been computed yet. - * - * @define asyncCallbackWarning - * - * Note: the callback function may (and probably will) run in another thread, - * and therefore should not refer to any unsynchronized state. In - * particular, if using this method from an actor, do not access - * the state of the actor from the callback function. - * [[akka.dispatch.Promise]].`completeWith`, - * [[akka.pattern.PipeToSupport.PipeableFuture]].`pipeTo`, - * and [[akka.dispatch.Future]].`fallbackTo` are some methods to consider - * using when possible, to avoid concurrent callbacks. - */ -sealed trait Future[+T] extends Awaitable[T] { - - protected implicit def executor: ExecutionContext - - protected final def resolve[X](source: Either[Throwable, X]): Either[Throwable, X] = source match { - case Left(t: scala.runtime.NonLocalReturnControl[_]) ⇒ Right(t.value.asInstanceOf[X]) - case Left(t: InterruptedException) ⇒ Left(new RuntimeException("Boxed InterruptedException", t)) - case _ ⇒ source - } - - /** - * @return a new Future that will contain a tuple containing the successful result of this and that Future. - * If this or that fail, they will race to complete the returned Future with their failure. - * The returned Future will not be completed if neither this nor that are completed. - */ - def zip[U](that: Future[U]): Future[(T, U)] = { - val p = Promise[(T, U)]() - onComplete { - case Left(t) ⇒ p failure t - case Right(r) ⇒ that onSuccess { case r2 ⇒ p success ((r, r2)) } - } - that onFailure { case f ⇒ p tryComplete Left(f) } - p.future - } - - /** - * For use only within a Future.flow block or another compatible Delimited Continuations reset block. - * - * Returns the result of this Future without blocking, by suspending execution and storing it as a - * continuation until the result is available. - */ - def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T ⇒ Future[Any])) - - /** - * Tests whether this Future has been completed. - */ - def isCompleted: Boolean - - /** - * The contained value of this Future. Before this Future is completed - * the value will be None. After completion the value will be Some(Right(t)) - * if it contains a valid result, or Some(Left(error)) if it contains - * an exception. - */ - def value: Option[Either[Throwable, T]] - - /** - * When this Future is completed, apply the provided function to the - * Future. If the Future has already been completed, this will apply - * immediately. Multiple - * callbacks may be registered; there is no guarantee that they will be - * executed in a particular order. - * - * $asyncCallbackWarning - */ - def onComplete[U](func: Either[Throwable, T] ⇒ U): this.type - - /** - * When the future is completed with a valid result, apply the provided - * PartialFunction to the result. See `onComplete` for more details. - *
-   *   future onSuccess {
-   *     case Foo ⇒ target ! "foo"
-   *     case Bar ⇒ target ! "bar"
-   *   }
-   * 
- * - * $asyncCallbackWarning - */ - final def onSuccess[U](pf: PartialFunction[T, U]): this.type = onComplete { - case Right(r) if pf isDefinedAt r ⇒ pf(r) - case _ ⇒ - } - - /** - * When the future is completed with an exception, apply the provided - * PartialFunction to the exception. See `onComplete` for more details. - *
-   *   future onFailure {
-   *     case NumberFormatException ⇒ target ! "wrong format"
-   *   }
-   * 
- * - * $asyncCallbackWarning - */ - final def onFailure[U](pf: PartialFunction[Throwable, U]): this.type = onComplete { - case Left(ex) if pf isDefinedAt ex ⇒ pf(ex) - case _ ⇒ - } - - /** - * Returns a failure projection of this Future - * If `this` becomes completed with a failure, that failure will be the success of the returned Future - * If `this` becomes completed with a result, then the returned future will fail with a NoSuchElementException - */ - final def failed: Future[Throwable] = { - val p = Promise[Throwable]() - this.onComplete { - case Left(t) ⇒ p success t - case Right(r) ⇒ p failure new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + r) - } - p.future - } - - /** - * Returns a new Future that will either hold the successful value of this Future, - * or, it this Future fails, it will hold the result of "that" Future. - */ - def fallbackTo[U >: T](that: Future[U]): Future[U] = { - val p = Promise[U]() - onComplete { - case r @ Right(_) ⇒ p complete r - case _ ⇒ p completeWith that - } - p.future - } - - /** - * Creates a new Future that will handle any matching Throwable that this - * Future might contain. If there is no match, or if this Future contains - * a valid result then the new Future will contain the same. - * Example: - *
-   * Future(6 / 0) recover { case e: ArithmeticException ⇒ 0 } // result: 0
-   * Future(6 / 0) recover { case e: NotFoundException   ⇒ 0 } // result: exception
-   * Future(6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3
-   * 
- * - * $asyncCallbackWarning - */ - final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { - val p = Promise[A]() - onComplete { - case Left(e) if pf isDefinedAt e ⇒ p.complete(try { Right(pf(e)) } catch { case NonFatal(x) ⇒ Left(x) }) - case otherwise ⇒ p complete otherwise - } - p.future - } - - /** - * Returns a new Future that will, in case this future fails, - * be completed with the resulting Future of the given PartialFunction, - * if the given PartialFunction matches the failure of the original Future. - * - * If the PartialFunction throws, that Throwable will be propagated to the returned Future. - * - * Example: - * - * {{{ - * val f = Future { Int.MaxValue } - * Future (6 / 0) recoverWith { case e: ArithmeticException => f } // result: Int.MaxValue - * }}} - * - * $asyncCallbackWarning - */ - def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]]): Future[U] = { - val p = Promise[U]() - - onComplete { - case Left(t) if pf isDefinedAt t ⇒ - try { p completeWith pf(t) } catch { case NonFatal(t) ⇒ p complete resolve(Left(t)) } - case otherwise ⇒ p complete otherwise - } - - p.future - } - - /** - * Returns a new Future that will contain the completed result of this Future, - * and which will invoke the supplied PartialFunction when completed. - * - * This allows for establishing order of side-effects. - * - * {{{ - * Future { 5 } andThen { - * case something => assert(something is awesome) - * } andThen { - * case Left(t) => handleProblem(t) - * case Right(v) => dealWithSuccess(v) - * } - * }}} - * - * $asyncCallbackWarning - */ - def andThen[U](pf: PartialFunction[Either[Throwable, T], U]): Future[T] = { - val p = Promise[T]() - onComplete { case r ⇒ try if (pf isDefinedAt r) pf(r) finally p complete r } - p.future - } - - /** - * Creates a new Future by applying a function to the successful result of - * this Future. If this Future is completed with an exception then the new - * Future will also contain this exception. - * Example: - *
-   * val future1 = for {
-   *   a: Int    <- actor ? "Hello" // returns 5
-   *   b: String <- actor ? a       // returns "10"
-   *   c: String <- actor ? 7       // returns "14"
-   * } yield b + "-" + c
-   * 
- * - * $asyncCallbackWarning - */ - final def map[A](f: T ⇒ A): Future[A] = { - val future = Promise[A]() - onComplete { - case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]] - case Right(res) ⇒ - future complete (try { - Right(f(res)) - } catch { - case NonFatal(e) ⇒ - executor.reportFailure(new LogEventException(Debug("Future", getClass, e.getMessage), e)) - Left(e) - }) - } - future - } - - /** - * Creates a new Future[A] which is completed with this Future's result if - * that conforms to A's erased type or a ClassCastException otherwise. - * - * When used from Java, to create the ClassTag, use: - * import static akka.japi.Util.classTag; - * future.mapTo(classTag(MyClass.class)); - */ - final def mapTo[A](implicit m: ClassTag[A]): Future[A] = { - val fa = Promise[A]() - onComplete { - case l: Left[_, _] ⇒ fa complete l.asInstanceOf[Either[Throwable, A]] - case Right(t) ⇒ - fa complete (try { - Right(BoxedType(m.runtimeClass).cast(t).asInstanceOf[A]) - } catch { - case e: ClassCastException ⇒ Left(e) - }) - } - fa.future - } - - /** - * Creates a new Future by applying a function to the successful result of - * this Future, and returns the result of the function as the new Future. - * If this Future is completed with an exception then the new Future will - * also contain this exception. - * Example: - *
-   * val future1 = for {
-   *   a: Int    <- actor ? "Hello" // returns 5
-   *   b: String <- actor ? a       // returns "10"
-   *   c: String <- actor ? 7       // returns "14"
-   * } yield b + "-" + c
-   * 
- * - * $asyncCallbackWarning - */ - final def flatMap[A](f: T ⇒ Future[A]): Future[A] = { - val p = Promise[A]() - - onComplete { - case l: Left[_, _] ⇒ p complete l.asInstanceOf[Either[Throwable, A]] - case Right(r) ⇒ - try { - p completeWith f(r) - } catch { - case NonFatal(e) ⇒ - executor.reportFailure(new LogEventException(Debug("Future", getClass, e.getMessage), e)) - p complete Left(e) - case t: Throwable ⇒ - p complete Left(new ExecutionException(t)); throw t - } - } - p.future - } - - /** - * Same as onSuccess { case r => f(r) } but is also used in for-comprehensions - * - * $asyncCallbackWarning - */ - final def foreach[U](f: T ⇒ U): Unit = onComplete { - case Right(r) ⇒ f(r) - case _ ⇒ - } - - /** - * Used by for-comprehensions - * - * $asyncCallbackWarning - */ - final def withFilter(p: T ⇒ Boolean) = new FutureWithFilter[T](this, p) - - final class FutureWithFilter[+A](self: Future[A], p: A ⇒ Boolean) { - def foreach(f: A ⇒ Unit): Unit = self filter p foreach f - def map[B](f: A ⇒ B): Future[B] = self filter p map f - def flatMap[B](f: A ⇒ Future[B]): Future[B] = self filter p flatMap f - def withFilter(q: A ⇒ Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x)) - } - - /** - * Returns a new Future that will hold the successful result of this Future if it matches - * the given predicate, if it doesn't match, the resulting Future will be a failed Future - * with a MatchError, of if this Future fails, that failure will be propagated to the returned Future - * - * $asyncCallbackWarning - */ - final def filter(pred: T ⇒ Boolean): Future[T] = { - val p = Promise[T]() - onComplete { - case l: Left[_, _] ⇒ p complete l.asInstanceOf[Either[Throwable, T]] - case r @ Right(res) ⇒ p complete (try { - if (pred(res)) r else Left(new MatchError(res)) - } catch { - case NonFatal(e) ⇒ - executor.reportFailure(new LogEventException(Debug("Future", getClass, e.getMessage), e)) - Left(e) - }) - } - p.future - } - -} - -object Promise { - /** - * Creates a non-completed Promise - * - * Scala API - */ - def apply[A]()(implicit executor: ExecutionContext): Promise[A] = new DefaultPromise[A]() - - /** - * Creates an already completed Promise with the specified exception - */ - def failed[T](exception: Throwable)(implicit executor: ExecutionContext): Promise[T] = new KeptPromise[T](Left(exception)) - - /** - * Creates an already completed Promise with the specified result - */ - def successful[T](result: T)(implicit executor: ExecutionContext): Promise[T] = new KeptPromise[T](Right(result)) -} - -/** - * Essentially this is the Promise (or write-side) of a Future (read-side). - */ -trait Promise[T] extends Future[T] { - - /** - * Returns the Future associated with this Promise - */ - def future: Future[T] = this - - /** - * Completes this Promise with the specified result, if not already completed. - * @return whether this call completed the Promise - */ - def tryComplete(value: Either[Throwable, T]): Boolean - - /** - * Completes this Promise with the specified result, if not already completed. - * @throws IllegalStateException if already completed, this is to aid in debugging of complete-races, - * use tryComplete to do a conditional complete. - * @return this - */ - final def complete(value: Either[Throwable, T]): this.type = - if (tryComplete(value)) this else throw new IllegalStateException("Promise already completed: " + this + " tried to complete with " + value) - - /** - * Completes this Promise with the specified result, if not already completed. - * @return this - */ - final def success(result: T): this.type = complete(Right(result)) - - /** - * Completes this Promise with the specified exception, if not already completed. - * @return this - */ - final def failure(exception: Throwable): this.type = complete(Left(exception)) - - /** - * Completes this Promise with the specified other Future, when that Future is completed, - * unless this Promise has already been completed. - * @return this. - */ - final def completeWith(other: Future[T]): this.type = { - other onComplete { tryComplete(_) } - this - } - - final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ cont(complete(Right(value))) } - - final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ - val fr = Promise[Any]() - val thisPromise = this - thisPromise completeWith other onComplete { v ⇒ - try { - fr completeWith cont(thisPromise) - } catch { - case NonFatal(e) ⇒ - executor.reportFailure(new LogEventException(Debug("Future", getClass, e.getMessage), e)) - fr failure e - } - } - fr - } -} - -//Companion object to FState, just to provide a cheap, immutable default entry -private[dispatch] object DefaultPromise { - def EmptyPending[T](): List[T] = Nil -} - -/** - * The default concrete Future implementation. - */ -class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { - self ⇒ - - protected final def tryAwait(atMost: Duration): Boolean = { - Future.blocking - - @tailrec - def awaitUnsafe(waitTimeNanos: Long): Boolean = { - if (!isCompleted && waitTimeNanos > 0) { - val ms = NANOSECONDS.toMillis(waitTimeNanos) - val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec - val start = System.nanoTime() - try { synchronized { if (!isCompleted) wait(ms, ns) } } catch { case e: InterruptedException ⇒ } - - awaitUnsafe(waitTimeNanos - (System.nanoTime() - start)) - } else isCompleted - } - awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue) - } - - @throws(classOf[TimeoutException]) - def ready(atMost: Duration)(implicit permit: CanAwait): this.type = - if (isCompleted || tryAwait(atMost)) this - else throw new TimeoutException("Futures timed out after [" + atMost + "]") - - @throws(classOf[Exception]) - def result(atMost: Duration)(implicit permit: CanAwait): T = - ready(atMost).value.get match { - case Left(e: AskTimeoutException) ⇒ throw new AskTimeoutException(e.getMessage, e) // to get meaningful stack trace - case Left(e) ⇒ throw e - case Right(r) ⇒ r - } - - def value: Option[Either[Throwable, T]] = getState match { - case _: List[_] ⇒ None - case c: Either[_, _] ⇒ Some(c.asInstanceOf[Either[Throwable, T]]) - } - - def isCompleted(): Boolean = getState match { - case _: Either[_, _] ⇒ true - case _ ⇒ false - } - - def tryComplete(value: Either[Throwable, T]): Boolean = { - val callbacks: List[Either[Throwable, T] ⇒ Unit] = { - try { - @tailrec - def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] ⇒ Unit] = { - getState match { - case raw: List[_] ⇒ - val cur = raw.asInstanceOf[List[Either[Throwable, T] ⇒ Unit]] - if (updateState(cur, v)) cur else tryComplete(v) - case _ ⇒ null - } - } - tryComplete(resolve(value)) - } finally { - synchronized { notifyAll() } //Notify any evil blockers - } - } - - callbacks match { - case null ⇒ false - case cs if cs.isEmpty ⇒ true - case cs ⇒ Future.dispatchTask(() ⇒ cs.foreach(f ⇒ notifyCompleted(f, value))); true - } - } - - def onComplete[U](func: Either[Throwable, T] ⇒ U): this.type = { - @tailrec //Returns whether the future has already been completed or not - def tryAddCallback(): Either[Throwable, T] = { - val cur = getState - cur match { - case r: Either[_, _] ⇒ r.asInstanceOf[Either[Throwable, T]] - case listeners: List[_] ⇒ if (updateState(listeners, func :: listeners)) null else tryAddCallback() - } - } - - tryAddCallback() match { - case null ⇒ this - case completed ⇒ - Future.dispatchTask(() ⇒ notifyCompleted(func, completed)) - this - } - } - - private final def notifyCompleted[U](func: Either[Throwable, T] ⇒ U, result: Either[Throwable, T]): Unit = - try func(result) catch { case NonFatal(e) ⇒ executor reportFailure e } -} - -/** - * An already completed Future is seeded with it's result at creation, is useful for when you are participating in - * a Future-composition but you already have a value to contribute. - */ -final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContext) extends Promise[T] { - val value = Some(resolve(suppliedValue)) - - def tryComplete(value: Either[Throwable, T]): Boolean = false - def onComplete[U](func: Either[Throwable, T] ⇒ U): this.type = { - val completedAs = value.get - Future dispatchTask (() ⇒ func(completedAs)) - this - } - def isCompleted(): Boolean = true - def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this - def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match { - case Left(e) ⇒ throw e - case Right(r) ⇒ r - } + def blocking(): Unit = scala.concurrent.impl.InternalFutureUtil.releaseFutureStack(ExecutionContext.defaultExecutionContext) //FIXME NOT CORRECT EC } /** diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index d029972d3e..e4134e01da 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -9,6 +9,7 @@ import java.util.concurrent.TimeoutException import annotation.tailrec import akka.actor._ import akka.dispatch._ +import scala.concurrent.{ Future, Promise } import akka.util.{ NonFatal, Timeout, Unsafe } /** @@ -43,7 +44,7 @@ trait AskSupport { implicit def ask(actorRef: ActorRef): AskableActorRef = new AskableActorRef(actorRef) /** - * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * Sends a message asynchronously and returns a [[scala.concurrent.Future]] * holding the eventual reply message; this means that the target actor * needs to send the result to the `sender` reference provided. The Future * will be completed with an [[akka.pattern.AskTimeoutException]] after the @@ -68,21 +69,21 @@ trait AskSupport { * } pipeTo nextActor * }}} * - * [see [[akka.dispatch.Future]] for a description of `flow`] + * [see [[scala.concurrent.Future]] for a description of `flow`] */ def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match { case ref: InternalActorRef if ref.isTerminated ⇒ actorRef.tell(message) - Promise.failed(new AskTimeoutException("sending to terminated ref breaks promises"))(ref.provider.dispatcher) + Promise.failed[Any](new AskTimeoutException("sending to terminated ref breaks promises")).future case ref: InternalActorRef ⇒ val provider = ref.provider if (timeout.duration.length <= 0) { actorRef.tell(message) - Promise.failed(new AskTimeoutException("not asking with negative timeout"))(provider.dispatcher) + Promise.failed[Any](new AskTimeoutException("not asking with negative timeout")).future } else { val a = PromiseActorRef(provider, timeout) actorRef.tell(message, a) - a.result + a.result.future } case _ ⇒ throw new IllegalArgumentException("incompatible ActorRef " + actorRef) } @@ -93,7 +94,7 @@ trait AskSupport { private[akka] final class AskableActorRef(val actorRef: ActorRef) { /** - * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * Sends a message asynchronously and returns a [[scala.concurrent.Future]] * holding the eventual reply message; this means that the target actor * needs to send the result to the `sender` reference provided. The Future * will be completed with an [[akka.pattern.AskTimeoutException]] after the @@ -118,12 +119,12 @@ trait AskSupport { * } pipeTo nextActor * }}} * - * [see the [[akka.dispatch.Future]] companion object for a description of `flow`] + * [see the [[scala.concurrent.Future]] companion object for a description of `flow`] */ def ask(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout) /** - * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * Sends a message asynchronously and returns a [[scala.concurrent.Future]] * holding the eventual reply message; this means that the target actor * needs to send the result to the `sender` reference provided. The Future * will be completed with an [[akka.pattern.AskTimeoutException]] after the @@ -148,7 +149,7 @@ trait AskSupport { * } pipeTo nextActor * }}} * - * [see the [[akka.dispatch.Future]] companion object for a description of `flow`] + * [see the [[scala.concurrent.Future]] companion object for a description of `flow`] */ def ?(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout) } @@ -277,7 +278,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide @tailrec override def stop(): Unit = { def ensureCompleted(): Unit = { - if (!result.isCompleted) result.tryComplete(Left(new ActorKilledException("Stopped"))) + result.tryComplete(Left(new ActorKilledException("Stopped"))) val watchers = clearWatchers() if (!watchers.isEmpty) { val termination = Terminated(this)(existenceConfirmed = true) @@ -304,10 +305,10 @@ private[akka] object PromiseActorRef { private case class StoppedWithPath(path: ActorPath) def apply(provider: ActorRefProvider, timeout: Timeout): PromiseActorRef = { - val result = Promise[Any]()(provider.dispatcher) + val result = Promise[Any]() val a = new PromiseActorRef(provider, result) val f = provider.scheduler.scheduleOnce(timeout.duration) { result.tryComplete(Left(new AskTimeoutException("Timed out"))) } - result onComplete { _ ⇒ try a.stop() finally f.cancel() } + result.future onComplete { _ ⇒ try a.stop() finally f.cancel() } a } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index d12442679c..d6d17907b9 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -6,14 +6,12 @@ package akka.pattern import java.util.concurrent.atomic.{ AtomicInteger, AtomicLong, AtomicBoolean } import akka.AkkaException import akka.actor.Scheduler -import akka.dispatch.{ Future, Promise } import akka.util.{ NonFatal, Unsafe } -import scala.concurrent.ExecutionContext -import scala.concurrent.util.duration._ -import scala.concurrent.util.{ Duration, Deadline } -import util.control.NoStackTrace +import scala.util.control.NoStackTrace import java.util.concurrent.{ Callable, CopyOnWriteArrayList } -import scala.concurrent.{ Awaitable, Await, CanAwait } +import scala.concurrent.{ ExecutionContext, Future, Promise, Awaitable, Await, CanAwait } +import scala.concurrent.util.{ Duration, Deadline } +import scala.concurrent.util.duration._ /** * Companion object providing factory methods for Circuit Breaker which runs callbacks in caller's thread @@ -111,22 +109,18 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati * * @param body Call needing protected * @tparam T return type from call - * @return [[akka.dispatch.Future]] containing the call result + * @return [[scala.concurrent.Future]] containing the call result */ - def withCircuitBreaker[T](body: ⇒ Future[T]): Future[T] = { - currentState.invoke(body) - } + def withCircuitBreaker[T](body: ⇒ Future[T]): Future[T] = currentState.invoke(body) /** * Java API for withCircuitBreaker * * @param body Call needing protected * @tparam T return type from call - * @return [[akka.dispatch.Future]] containing the call result + * @return [[scala.concurrent.Future]] containing the call result */ - def callWithCircuitBreaker[T](body: Callable[Future[T]]): Future[T] = { - withCircuitBreaker(body.call) - } + def callWithCircuitBreaker[T](body: Callable[Future[T]]): Future[T] = withCircuitBreaker(body.call) /** * Wraps invocations of synchronous calls that need to be protected @@ -137,16 +131,10 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati * @tparam T return type from call * @return The result of the call */ - def withSyncCircuitBreaker[T](body: ⇒ T): T = { - Await.result(withCircuitBreaker( - { - try - Promise.successful(body)(CircuitBreaker.syncExecutionContext) - catch { - case NonFatal(t) ⇒ Promise.failed(t)(CircuitBreaker.syncExecutionContext) - } - }), callTimeout) - } + def withSyncCircuitBreaker[T](body: ⇒ T): T = + Await.result( + withCircuitBreaker({ try Promise.successful(body) catch { case NonFatal(t) ⇒ Promise.failed(t) } }.future), + callTimeout) /** * Java API for withSyncCircuitBreaker @@ -156,9 +144,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati * @return The result of the call */ - def callWithSyncCircuitBreaker[T](body: Callable[T]): T = { - withSyncCircuitBreaker(body.call) - } + def callWithSyncCircuitBreaker[T](body: Callable[T]): T = withSyncCircuitBreaker(body.call) /** * Adds a callback to execute when circuit breaker opens @@ -181,9 +167,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation * @return CircuitBreaker for fluent usage */ - def onOpen[T](callback: Callable[T]): CircuitBreaker = { - onOpen(callback.call) - } + def onOpen[T](callback: Callable[T]): CircuitBreaker = onOpen(callback.call) /** * Adds a callback to execute when circuit breaker transitions to half-open @@ -206,9 +190,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation * @return CircuitBreaker for fluent usage */ - def onHalfOpen[T](callback: Callable[T]): CircuitBreaker = { - onHalfOpen(callback.call) - } + def onHalfOpen[T](callback: Callable[T]): CircuitBreaker = onHalfOpen(callback.call) /** * Adds a callback to execute when circuit breaker state closes @@ -231,9 +213,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation * @return CircuitBreaker for fluent usage */ - def onClose[T](callback: Callable[T]): CircuitBreaker = { - onClose(callback.call) - } + def onClose[T](callback: Callable[T]): CircuitBreaker = onClose(callback.call) /** * Retrieves current failure count. @@ -249,37 +229,30 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati * @param toState State being transitioning from * @throws IllegalStateException if an invalid transition is attempted */ - private def transition(fromState: State, toState: State): Unit = { + private def transition(fromState: State, toState: State): Unit = if (swapState(fromState, toState)) toState.enter() else throw new IllegalStateException("Illegal transition attempted from: " + fromState + " to " + toState) - } /** * Trips breaker to an open state. This is valid from Closed or Half-Open states. * * @param fromState State we're coming from (Closed or Half-Open) */ - private def tripBreaker(fromState: State): Unit = { - transition(fromState, Open) - } + private def tripBreaker(fromState: State): Unit = transition(fromState, Open) /** * Resets breaker to a closed state. This is valid from an Half-Open state only. * */ - private def resetBreaker(): Unit = { - transition(HalfOpen, Closed) - } + private def resetBreaker(): Unit = transition(HalfOpen, Closed) /** * Attempts to reset breaker by transitioning to a half-open state. This is valid from an Open state only. * */ - private def attemptReset(): Unit = { - transition(Open, HalfOpen) - } + private def attemptReset(): Unit = transition(Open, HalfOpen) /** * Internal state abstraction @@ -293,9 +266,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati * @param listener listener implementation * @tparam T return type of listener, not used - but supplied for type inference purposes */ - def addListener[T](listener: () ⇒ T) { - listeners add listener - } + def addListener[T](listener: () ⇒ T): Unit = listeners add listener /** * Test for whether listeners exist @@ -330,16 +301,12 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati */ def callThrough[T](body: ⇒ Future[T]): Future[T] = { val deadline = callTimeout.fromNow - val bodyFuture = try body catch { - case NonFatal(t) ⇒ Promise.failed(t) - } - bodyFuture onFailure { - case _ ⇒ callFails() - } onSuccess { - case _ ⇒ - if (deadline.isOverdue()) callFails() - else callSucceeds() + val bodyFuture = try body catch { case NonFatal(t) ⇒ Promise.failed(t).future } + bodyFuture onComplete { + case Right(_) if !deadline.isOverdue() ⇒ callSucceeds() + case _ ⇒ callFails() } + bodyFuture } /** @@ -392,16 +359,14 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati * @tparam T Return type of protected call * @return Future containing result of protected call */ - override def invoke[T](body: ⇒ Future[T]): Future[T] = { - callThrough(body) - } + override def invoke[T](body: ⇒ Future[T]): Future[T] = callThrough(body) /** * On successful call, the failure count is reset to 0 * * @return */ - override def callSucceeds(): Unit = { set(0) } + override def callSucceeds(): Unit = set(0) /** * On failed call, the failure count is incremented. The count is checked against the configured maxFailures, and @@ -409,27 +374,21 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati * * @return */ - override def callFails(): Unit = { - if (incrementAndGet() == maxFailures) tripBreaker(Closed) - } + override def callFails(): Unit = if (incrementAndGet() == maxFailures) tripBreaker(Closed) /** * On entry of this state, failure count is reset. * * @return */ - override def _enter(): Unit = { - set(0) - } + override def _enter(): Unit = set(0) /** * Override for more descriptive toString * * @return */ - override def toString: String = { - "Closed with failure count = " + get() - } + override def toString: String = "Closed with failure count = " + get() } /** @@ -445,44 +404,36 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati * @tparam T Return type of protected call * @return Future containing result of protected call */ - override def invoke[T](body: ⇒ Future[T]): Future[T] = { - if (compareAndSet(true, false)) - callThrough(body) - else - Promise.failed[T](new CircuitBreakerOpenException(Duration.Zero)) - } + override def invoke[T](body: ⇒ Future[T]): Future[T] = + if (compareAndSet(true, false)) callThrough(body) else Promise.failed[T](new CircuitBreakerOpenException(Duration.Zero)).future /** * Reset breaker on successful call. * * @return */ - override def callSucceeds(): Unit = { resetBreaker() } + override def callSucceeds(): Unit = resetBreaker() /** * Reopen breaker on failed call. * * @return */ - override def callFails(): Unit = { tripBreaker(HalfOpen) } + override def callFails(): Unit = tripBreaker(HalfOpen) /** * On entry, guard should be reset for that first call to get in * * @return */ - override def _enter(): Unit = { - set(true) - } + override def _enter(): Unit = set(true) /** * Override for more descriptive toString * * @return */ - override def toString: String = { - "Half-Open currently testing call for success = " + get() - } + override def toString: String = "Half-Open currently testing call for success = " + get() } /** @@ -497,9 +448,8 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati * @tparam T Return type of protected call * @return Future containing result of protected call */ - override def invoke[T](body: ⇒ Future[T]): Future[T] = { - Promise.failed[T](new CircuitBreakerOpenException(remainingTimeout().timeLeft)) - } + override def invoke[T](body: ⇒ Future[T]): Future[T] = + Promise.failed[T](new CircuitBreakerOpenException(remainingTimeout().timeLeft)).future /** * Calculate remaining timeout to inform the caller in case a backoff algorithm is useful @@ -516,14 +466,14 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati * * @return */ - override def callSucceeds(): Unit = {} + override def callSucceeds(): Unit = () /** * No-op for open, calls are never executed so cannot succeed or fail * * @return */ - override def callFails(): Unit = {} + override def callFails(): Unit = () /** * On entering this state, schedule an attempted reset via [[akka.actor.Scheduler]] and store the entry time to @@ -543,9 +493,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati * * @return */ - override def toString: String = { - "Open" - } + override def toString: String = "Open" } } diff --git a/akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala b/akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala index 8126e16b3f..b377f24691 100644 --- a/akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala @@ -5,20 +5,20 @@ package akka.pattern */ import scala.concurrent.util.Duration -import scala.concurrent.ExecutionContext +import scala.concurrent.{ ExecutionContext, Promise, Future } import akka.actor._ -import akka.dispatch.{ Promise, Future } trait FutureTimeoutSupport { /** - * Returns a [[akka.dispatch.Future]] that will be completed with the success or failure of the provided value + * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided value * after the specified duration. */ def after[T](duration: Duration, using: Scheduler)(value: ⇒ Future[T])(implicit ec: ExecutionContext): Future[T] = if (duration.isFinite() && duration.length < 1) value else { val p = Promise[T]() val c = using.scheduleOnce(duration) { p completeWith value } - p onComplete { _ ⇒ c.cancel() } - p + val f = p.future + f onComplete { _ ⇒ c.cancel() } + f } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala index 9a526eacc6..240017dd76 100644 --- a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -6,12 +6,13 @@ package akka.pattern import akka.actor._ import akka.util.{ Timeout } +import akka.dispatch.{ Unwatch, Watch } +import scala.concurrent.{ Promise, Future } import scala.concurrent.util.Duration -import akka.dispatch.{ Unwatch, Watch, Promise, Future } trait GracefulStopSupport { /** - * Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when + * Returns a [[scala.concurrent.Future]] that will be completed with success (value `true`) when * existing messages of the target actor has been processed and the actor has been * terminated. * @@ -31,22 +32,23 @@ trait GracefulStopSupport { * } * }}} * - * If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]] + * If the target actor isn't terminated within the timeout the [[scala.concurrent.Future]] * is completed with failure [[akka.pattern.AskTimeoutException]]. */ def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = { - if (target.isTerminated) Promise.successful(true) + if (target.isTerminated) Promise.successful(true).future else system match { case e: ExtendedActorSystem ⇒ val internalTarget = target.asInstanceOf[InternalActorRef] val ref = PromiseActorRef(e.provider, Timeout(timeout)) internalTarget.sendSystemMessage(Watch(target, ref)) - ref.result onComplete { // Just making sure we're not leaking here + val f = ref.result.future + f onComplete { // Just making sure we're not leaking here case Right(Terminated(`target`)) ⇒ () case _ ⇒ internalTarget.sendSystemMessage(Unwatch(target, ref)) } target ! PoisonPill - ref.result map { + f map { case Terminated(`target`) ⇒ true case _ ⇒ false } diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index dd33307524..348ae54e95 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -9,14 +9,14 @@ import java.util.concurrent.Callable object Patterns { import akka.actor.{ ActorRef, ActorSystem } - import akka.dispatch.Future import akka.pattern.{ ask ⇒ scalaAsk, pipe ⇒ scalaPipe, gracefulStop ⇒ scalaGracefulStop, after ⇒ scalaAfter } import akka.util.Timeout + import scala.concurrent.Future import scala.concurrent.util.Duration /** * Java API for `akka.pattern.ask`: - * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * Sends a message asynchronously and returns a [[scala.concurrent.Future]] * holding the eventual reply message; this means that the target actor * needs to send the result to the `sender` reference provided. The Future * will be completed with an [[akka.pattern.AskTimeoutException]] after the @@ -47,7 +47,7 @@ object Patterns { /** * Java API for `akka.pattern.ask`: - * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * Sends a message asynchronously and returns a [[scala.concurrent.Future]] * holding the eventual reply message; this means that the target actor * needs to send the result to the `sender` reference provided. The Future * will be completed with an [[akka.pattern.AskTimeoutException]] after the @@ -77,7 +77,7 @@ object Patterns { def ask(actor: ActorRef, message: Any, timeoutMillis: Long): Future[AnyRef] = scalaAsk(actor, message)(new Timeout(timeoutMillis)).asInstanceOf[Future[AnyRef]] /** - * Register an onComplete callback on this [[akka.dispatch.Future]] to send + * Register an onComplete callback on this [[scala.concurrent.Future]] to send * the result to the given actor reference. Returns the original Future to * allow method chaining. * @@ -94,27 +94,27 @@ object Patterns { def pipe[T](future: Future[T]): PipeableFuture[T] = scalaPipe(future) /** - * Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when + * Returns a [[scala.concurrent.Future]] that will be completed with success (value `true`) when * existing messages of the target actor has been processed and the actor has been * terminated. * * Useful when you need to wait for termination or compose ordered termination of several actors. * - * If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]] + * If the target actor isn't terminated within the timeout the [[scala.concurrent.Future]] * is completed with failure [[akka.pattern.AskTimeoutException]]. */ def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[java.lang.Boolean] = scalaGracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]] /** - * Returns a [[akka.dispatch.Future]] that will be completed with the success or failure of the provided Callable + * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable * after the specified duration. */ def after[T](duration: Duration, scheduler: Scheduler, context: ExecutionContext, value: Callable[Future[T]]): Future[T] = scalaAfter(duration, scheduler)(value.call())(context) /** - * Returns a [[akka.dispatch.Future]] that will be completed with the success or failure of the provided value + * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided value * after the specified duration. */ def after[T](duration: Duration, scheduler: Scheduler, context: ExecutionContext, value: Future[T]): Future[T] = diff --git a/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala b/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala index f8111b0e25..20bb944478 100644 --- a/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala @@ -5,17 +5,19 @@ package akka.pattern import language.implicitConversions -import akka.dispatch.Future +import scala.concurrent.{ Future } import akka.actor.{ Status, ActorRef } trait PipeToSupport { final class PipeableFuture[T](val future: Future[T]) { - def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = null): Future[T] = + def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = null): Future[T] = { future onComplete { case Right(r) ⇒ recipient ! r case Left(f) ⇒ recipient ! Status.Failure(f) } + future + } def to(recipient: ActorRef): PipeableFuture[T] = to(recipient, null) def to(recipient: ActorRef, sender: ActorRef): PipeableFuture[T] = { pipeTo(recipient)(sender) @@ -24,7 +26,7 @@ trait PipeToSupport { } /** - * Import this implicit conversion to gain the `pipeTo` method on [[akka.dispatch.Future]]: + * Import this implicit conversion to gain the `pipeTo` method on [[scala.concurrent.Future]]: * * {{{ * import akka.pattern.pipe diff --git a/akka-actor/src/main/scala/akka/pattern/package.scala b/akka-actor/src/main/scala/akka/pattern/package.scala index 883db42b2c..b2f232de8f 100644 --- a/akka-actor/src/main/scala/akka/pattern/package.scala +++ b/akka-actor/src/main/scala/akka/pattern/package.scala @@ -15,7 +15,7 @@ import akka.actor._ * *
    *
  • ask: create a temporary one-off actor for receiving a reply to a - * message and complete a [[akka.dispatch.Future]] with it; returns said + * message and complete a [[scala.concurrent.Future]] with it; returns said * Future.
  • *
  • pipeTo: feed eventually computed value of a future to an actor as * a message.
  • diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 49fcb6474a..5838babd11 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -16,10 +16,8 @@ import scala.collection.JavaConversions.iterableAsScalaIterable import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean } import java.util.concurrent.TimeUnit import scala.concurrent.forkjoin.ThreadLocalRandom -import akka.util.Unsafe import akka.dispatch.Dispatchers import scala.annotation.tailrec -import scala.runtime.ScalaRunTime /** * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to @@ -1078,7 +1076,7 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ case (sender, message) ⇒ val provider: ActorRefProvider = routeeProvider.context.asInstanceOf[ActorCell].systemImpl.provider val asker = akka.pattern.PromiseActorRef(provider, within) - asker.result.pipeTo(sender) + asker.result.future.pipeTo(sender) toAll(asker, routeeProvider.routees) } } diff --git a/akka-actor/src/main/scala/akka/util/ByteString.scala b/akka-actor/src/main/scala/akka/util/ByteString.scala index 76415b93ba..ac2af5c1b5 100644 --- a/akka-actor/src/main/scala/akka/util/ByteString.scala +++ b/akka-actor/src/main/scala/akka/util/ByteString.scala @@ -350,10 +350,8 @@ object CompactByteString { /** * Creates a new CompactByteString by copying a byte array. */ - def apply(bytes: Array[Byte]): CompactByteString = { - if (bytes.isEmpty) empty - else ByteString.ByteString1C(bytes.clone) - } + def apply(bytes: Array[Byte]): CompactByteString = + if (bytes.isEmpty) empty else ByteString.ByteString1C(bytes.clone) /** * Creates a new CompactByteString by copying bytes. @@ -395,10 +393,8 @@ object CompactByteString { /** * Creates a new CompactByteString by encoding a String with a charset. */ - def apply(string: String, charset: String): CompactByteString = { - if (string.isEmpty) empty - else ByteString.ByteString1C(string.getBytes(charset)) - } + def apply(string: String, charset: String): CompactByteString = + if (string.isEmpty) empty else ByteString.ByteString1C(string.getBytes(charset)) /** * Creates a new CompactByteString by copying length bytes starting at offset from @@ -452,7 +448,7 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] { this } - protected def fillByteBuffer(len: Int, byteOrder: ByteOrder)(fill: ByteBuffer ⇒ Unit): this.type = { + @inline protected final def fillByteBuffer(len: Int, byteOrder: ByteOrder)(fill: ByteBuffer ⇒ Unit): this.type = { fillArray(len) { case (array, start) ⇒ val buffer = ByteBuffer.wrap(array, start, len) diff --git a/akka-camel/src/main/scala/akka/camel/Activation.scala b/akka-camel/src/main/scala/akka/camel/Activation.scala index ef4fe0c078..e2da7f0b2f 100644 --- a/akka-camel/src/main/scala/akka/camel/Activation.scala +++ b/akka-camel/src/main/scala/akka/camel/Activation.scala @@ -6,7 +6,7 @@ package akka.camel import akka.camel.internal._ import akka.util.Timeout -import akka.dispatch.Future +import scala.concurrent.Future import java.util.concurrent.TimeoutException import akka.actor.{ ActorSystem, Props, ActorRef } import akka.pattern._ diff --git a/akka-docs/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala b/akka-docs/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala index ac085c7079..6e942a0160 100644 --- a/akka-docs/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala +++ b/akka-docs/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala @@ -8,7 +8,7 @@ package docs.circuitbreaker import scala.concurrent.util.duration._ // small d is important here import akka.pattern.CircuitBreaker import akka.actor.Actor -import akka.dispatch.Future +import scala.concurrent.Future import akka.event.Logging //#imports1 diff --git a/akka-docs/common/code/docs/circuitbreaker/DangerousJavaActor.java b/akka-docs/common/code/docs/circuitbreaker/DangerousJavaActor.java index 81e09c1fc5..4c45acf0b0 100644 --- a/akka-docs/common/code/docs/circuitbreaker/DangerousJavaActor.java +++ b/akka-docs/common/code/docs/circuitbreaker/DangerousJavaActor.java @@ -6,13 +6,13 @@ package docs.circuitbreaker; //#imports1 import akka.actor.UntypedActor; -import akka.dispatch.Future; +import scala.concurrent.Future; import akka.event.LoggingAdapter; import scala.concurrent.util.Duration; import akka.pattern.CircuitBreaker; import akka.event.Logging; -import static akka.dispatch.Futures.future; +import static scala.concurrent.Futures.future; import java.util.concurrent.Callable; diff --git a/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java b/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java index 0bc30381f6..d0b1396fef 100644 --- a/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java +++ b/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java @@ -10,8 +10,8 @@ import akka.actor.Props; //#imports //#import-future -import akka.dispatch.Future; -import akka.dispatch.Futures; +import scala.concurrent.Future; +import scala.concurrent.Futures; import akka.dispatch.Mapper; import scala.concurrent.Await; import scala.concurrent.util.Duration; @@ -33,7 +33,7 @@ import akka.actor.Terminated; //#import-gracefulStop import static akka.pattern.Patterns.gracefulStop; -import akka.dispatch.Future; +import scala.concurrent.Future; import scala.concurrent.Await; import scala.concurrent.util.Duration; import akka.pattern.AskTimeoutException; @@ -42,8 +42,8 @@ import akka.pattern.AskTimeoutException; //#import-askPipe import static akka.pattern.Patterns.ask; import static akka.pattern.Patterns.pipe; -import akka.dispatch.Future; -import akka.dispatch.Futures; +import scala.concurrent.Future; +import scala.concurrent.Futures; import scala.concurrent.util.Duration; import akka.util.Timeout; import java.util.concurrent.TimeUnit; diff --git a/akka-docs/java/code/docs/future/FutureDocTestBase.java b/akka-docs/java/code/docs/future/FutureDocTestBase.java index 6012cc715e..c6c01769ff 100644 --- a/akka-docs/java/code/docs/future/FutureDocTestBase.java +++ b/akka-docs/java/code/docs/future/FutureDocTestBase.java @@ -14,29 +14,29 @@ import akka.util.Timeout; import scala.concurrent.util.Duration; import akka.japi.Function; import java.util.concurrent.Callable; -import static akka.dispatch.Futures.future; +import static scala.concurrent.Futures.future; import static java.util.concurrent.TimeUnit.SECONDS; //#imports2 //#imports3 -import static akka.dispatch.Futures.sequence; +import static scala.concurrent.Futures.sequence; //#imports3 //#imports4 -import static akka.dispatch.Futures.traverse; +import static scala.concurrent.Futures.traverse; //#imports4 //#imports5 import akka.japi.Function2; -import static akka.dispatch.Futures.fold; +import static scala.concurrent.Futures.fold; //#imports5 //#imports6 -import static akka.dispatch.Futures.reduce; +import static scala.concurrent.Futures.reduce; //#imports6 diff --git a/akka-docs/java/code/docs/jrouting/CustomRouterDocTestBase.java b/akka-docs/java/code/docs/jrouting/CustomRouterDocTestBase.java index 883887856e..b1df65f61a 100644 --- a/akka-docs/java/code/docs/jrouting/CustomRouterDocTestBase.java +++ b/akka-docs/java/code/docs/jrouting/CustomRouterDocTestBase.java @@ -16,7 +16,7 @@ import akka.routing.*; import scala.concurrent.util.Duration; import akka.util.Timeout; import scala.concurrent.Await; -import akka.dispatch.Future; +import scala.concurrent.Future; import akka.dispatch.Dispatchers; import akka.testkit.AkkaSpec; import com.typesafe.config.ConfigFactory; diff --git a/akka-docs/java/code/docs/jrouting/ParentActor.java b/akka-docs/java/code/docs/jrouting/ParentActor.java index bf86f055e9..c21aed2dc6 100644 --- a/akka-docs/java/code/docs/jrouting/ParentActor.java +++ b/akka-docs/java/code/docs/jrouting/ParentActor.java @@ -13,7 +13,7 @@ import akka.actor.ActorRef; import akka.actor.Props; import scala.concurrent.util.Duration; import akka.util.Timeout; -import akka.dispatch.Future; +import scala.concurrent.Future; import scala.concurrent.Await; //#parentActor diff --git a/akka-docs/java/typed-actors.rst b/akka-docs/java/typed-actors.rst index 4d36872f1a..a2b4ff9a9d 100644 --- a/akka-docs/java/typed-actors.rst +++ b/akka-docs/java/typed-actors.rst @@ -97,7 +97,7 @@ Method dispatch semantics Methods returning: * ``void`` will be dispatched with ``fire-and-forget`` semantics, exactly like ``ActorRef.tell`` - * ``akka.dispatch.Future`` will use ``send-request-reply`` semantics, exactly like ``ActorRef.ask`` + * ``scala.concurrent.Future`` will use ``send-request-reply`` semantics, exactly like ``ActorRef.ask`` * ``scala.Option`` or ``akka.japi.Option`` will use ``send-request-reply`` semantics, but *will* block to wait for an answer, and return None if no answer was produced within the timeout, or scala.Some/akka.japi.Some containing the result otherwise. Any exception that was thrown during this call will be rethrown. diff --git a/akka-docs/scala/code/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/docs/actor/ActorDocSpec.scala index 0b803500ec..3526adb6cd 100644 --- a/akka-docs/scala/code/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/docs/actor/ActorDocSpec.scala @@ -12,7 +12,7 @@ import akka.event.Logging //#imports1 -import akka.dispatch.Future +import scala.concurrent.Future import akka.actor.{ ActorRef, ActorSystem } import org.scalatest.{ BeforeAndAfterAll, WordSpec } import org.scalatest.matchers.MustMatchers diff --git a/akka-docs/scala/code/docs/future/FutureDocSpec.scala b/akka-docs/scala/code/docs/future/FutureDocSpec.scala index 6f13a7a1f1..a21f52f0c2 100644 --- a/akka-docs/scala/code/docs/future/FutureDocSpec.scala +++ b/akka-docs/scala/code/docs/future/FutureDocSpec.scala @@ -79,7 +79,7 @@ class FutureDocSpec extends AkkaSpec { val msg = "hello" implicit val timeout = Timeout(5 seconds) //#map-to - import akka.dispatch.Future + import scala.concurrent.Future import akka.pattern.ask val future: Future[String] = ask(actor, msg).mapTo[String] @@ -90,7 +90,7 @@ class FutureDocSpec extends AkkaSpec { "demonstrate usage of simple future eval" in { //#future-eval import scala.concurrent.Await - import akka.dispatch.Future + import scala.concurrent.Future import scala.concurrent.util.duration._ val future = Future { diff --git a/akka-docs/scala/code/docs/testkit/TestkitDocSpec.scala b/akka-docs/scala/code/docs/testkit/TestkitDocSpec.scala index 2f0df367a9..3f45857d70 100644 --- a/akka-docs/scala/code/docs/testkit/TestkitDocSpec.scala +++ b/akka-docs/scala/code/docs/testkit/TestkitDocSpec.scala @@ -9,7 +9,7 @@ import language.postfixOps import akka.testkit.TestProbe import scala.concurrent.util.duration._ import akka.actor._ -import akka.dispatch.Futures +import scala.concurrent.Futures //#imports-test-probe diff --git a/akka-docs/scala/typed-actors.rst b/akka-docs/scala/typed-actors.rst index bd7d92f924..ce9c608e4e 100644 --- a/akka-docs/scala/typed-actors.rst +++ b/akka-docs/scala/typed-actors.rst @@ -97,7 +97,7 @@ Method dispatch semantics Methods returning: * ``Unit`` will be dispatched with ``fire-and-forget`` semantics, exactly like ``ActorRef.tell`` - * ``akka.dispatch.Future[_]`` will use ``send-request-reply`` semantics, exactly like ``ActorRef.ask`` + * ``scala.concurrent.Future[_]`` will use ``send-request-reply`` semantics, exactly like ``ActorRef.ask`` * ``scala.Option[_]`` or ``akka.japi.Option`` will use ``send-request-reply`` semantics, but *will* block to wait for an answer, and return None if no answer was produced within the timeout, or scala.Some/akka.japi.Some containing the result otherwise. Any exception that was thrown during this call will be rethrown. diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index a1cae44773..d875f34747 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -17,7 +17,7 @@ import akka.event.{ LoggingAdapter, Logging } import scala.util.control.NoStackTrace import akka.event.LoggingReceive import java.net.InetSocketAddress -import akka.dispatch.Future +import scala.concurrent.Future import akka.actor.{ OneForOneStrategy, SupervisorStrategy, Status, Address, PoisonPill } import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit.MILLISECONDS diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala index f50df526f1..273b3cf1d0 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala @@ -16,7 +16,7 @@ import com.typesafe.config.ConfigFactory import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeoutException import akka.pattern.{ ask, pipe, AskTimeoutException } -import akka.dispatch.Future +import scala.concurrent.Future import scala.util.control.NoStackTrace import akka.event.{ LoggingAdapter, Logging } import java.net.{ InetSocketAddress, ConnectException } diff --git a/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala b/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala index 87427802f6..1c09613fe0 100644 --- a/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala @@ -10,7 +10,7 @@ import akka.remote.netty.NettyRemoteTransport import akka.actor.Actor import akka.testkit.AkkaSpec import akka.testkit.DefaultTimeout -import akka.dispatch.Future +import scala.concurrent.Future import java.util.concurrent.{ TimeUnit, CountDownLatch } import java.util.concurrent.atomic.AtomicBoolean diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala index 24d3084c88..dd0c36873d 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala @@ -6,7 +6,7 @@ package akka.remote import akka.testkit._ import akka.actor._ import com.typesafe.config._ -import akka.dispatch.Future +import scala.concurrent.Future import scala.concurrent.Await import akka.pattern.ask diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala index 16d2542da5..7bd2663b07 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala @@ -8,7 +8,7 @@ import language.postfixOps import akka.testkit._ import akka.actor._ import com.typesafe.config._ -import akka.dispatch.Future +import scala.concurrent.Future import akka.pattern.ask import java.io.File import java.security.{ NoSuchAlgorithmException, SecureRandom, PrivilegedAction, AccessController } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index b4c7547e58..7df30be4c7 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -9,8 +9,7 @@ import org.scalatest.matchers.MustMatchers import org.scalatest.{ BeforeAndAfterEach, WordSpec } import akka.actor._ import akka.event.Logging.Warning -import akka.dispatch.{ Future, Promise } -import scala.concurrent.Await +import scala.concurrent.{ Future, Promise, Await } import scala.concurrent.util.duration._ import akka.actor.ActorSystem import akka.pattern.ask diff --git a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala index 7979762d7b..8173baabf3 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala @@ -6,9 +6,8 @@ import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import org.scalatest.{ BeforeAndAfterEach, WordSpec } import akka.actor._ -import scala.concurrent.Await +import scala.concurrent.{ Future, Await } import scala.concurrent.util.duration._ -import akka.dispatch.Future import akka.pattern.ask @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java index a198419682..60c4873b27 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java @@ -17,7 +17,7 @@ import akka.actor.Props; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; import scala.concurrent.Await; -import akka.dispatch.Future; +import scala.concurrent.Future; import static akka.pattern.Patterns.ask; import akka.testkit.AkkaSpec; import akka.testkit.EventFilter; diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java b/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java index 483eb52d2c..b24d000ced 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java @@ -17,7 +17,7 @@ import akka.actor.Props; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; import scala.concurrent.Await; -import akka.dispatch.Future; +import scala.concurrent.Future; import static akka.pattern.Patterns.ask; import akka.testkit.AkkaSpec; import akka.testkit.EventFilter;