From 3b1330c6d7d0c8eb8b0eb13d7436b5091b4d107b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 11 Dec 2011 00:40:52 +0100 Subject: [PATCH] Tests are green with new Futures, consider this a half-way-there marker --- .../java/akka/dispatch/JavaFutureTests.java | 57 +-- .../test/scala/akka/actor/ActorRefSpec.scala | 6 +- .../scala/akka/actor/ActorTimeoutSpec.scala | 13 +- .../src/test/scala/akka/actor/IOActor.scala | 6 +- .../actor/LocalActorRefProviderSpec.scala | 4 +- .../scala/akka/actor/TypedActorSpec.scala | 4 +- .../test/scala/akka/dispatch/FutureSpec.scala | 115 ++--- .../akka/dispatch/MailboxConfigSpec.scala | 16 +- .../akka/dispatch/PromiseStreamSpec.scala | 34 -- .../trading/system/MatchingEngine.scala | 3 - .../scala/akka/routing/ActorPoolSpec.scala | 6 +- .../scala/akka/ticket/Ticket703Spec.scala | 4 +- .../src/test/scala/akka/util/IndexSpec.scala | 4 +- .../src/main/scala/akka/actor/ActorRef.scala | 15 +- .../scala/akka/actor/ActorRefProvider.scala | 16 +- .../main/scala/akka/actor/TypedActor.scala | 3 +- .../src/main/scala/akka/dispatch/Future.scala | 422 ++++-------------- .../scala/akka/dispatch/PromiseStream.scala | 4 +- .../scala/akka/dispatch/japi/Future.scala | 15 +- .../src/main/scala/akka/event/Logging.scala | 4 +- .../scala/akka/cluster/TransactionLog.scala | 6 +- .../local/LocalMetricsMultiJvmSpec.scala | 2 +- akka-docs/.history | 1 + .../intro/getting-started-first-java.rst | 2 +- .../actor/mailbox/MongoBasedMailbox.scala | 21 +- .../akka/remote/RemoteActorRefProvider.scala | 8 +- .../scala/TypedActorSpringFeatureTest.scala | 7 +- .../src/main/scala/akka/agent/Agent.scala | 10 +- .../example/UntypedCoordinatedExample.java | 9 +- .../example/UntypedTransactorExample.java | 9 +- .../test/UntypedTransactorTest.java | 8 +- .../scala/akka/agent/test/AgentSpec.scala | 45 +- .../test/scala/akka/testkit/AkkaSpec.scala | 9 +- 33 files changed, 287 insertions(+), 601 deletions(-) create mode 100644 akka-docs/.history diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index d534d87103..a7f1d09fbc 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -22,14 +22,14 @@ import akka.testkit.AkkaSpec; public class JavaFutureTests { private static ActorSystem system; - private static FutureFactory ff; + private volatile static FutureFactory ff; private static Timeout t; @BeforeClass public static void beforeAll() { system = ActorSystem.create("JavaFutureTests", AkkaSpec.testConf()); t = system.settings().ActorTimeout(); - ff = new FutureFactory(system.dispatcher(), t); + ff = new FutureFactory(system.dispatcher()); } @AfterClass @@ -51,7 +51,7 @@ public class JavaFutureTests { public String apply(String s) { return s + " World"; } - }, t); + }); assertEquals("Hello World", f2.get()); } @@ -59,8 +59,7 @@ public class JavaFutureTests { @Test public void mustBeAbleToExecuteAnOnResultCallback() throws Throwable { final CountDownLatch latch = new CountDownLatch(1); - Promise cf = new akka.dispatch.DefaultPromise(1000, TimeUnit.MILLISECONDS, system - .dispatcherFactory().defaultGlobalDispatcher()); + Promise cf = ff.promise(); Future f = cf; f.onResult(new Procedure() { public void apply(String result) { @@ -77,8 +76,7 @@ public class JavaFutureTests { @Test public void mustBeAbleToExecuteAnOnExceptionCallback() throws Throwable { final CountDownLatch latch = new CountDownLatch(1); - Promise cf = new akka.dispatch.DefaultPromise(1000, TimeUnit.MILLISECONDS, system - .dispatcherFactory().defaultGlobalDispatcher()); + Promise cf = ff.promise(); Future f = cf; f.onException(new Procedure() { public void apply(Throwable t) { @@ -93,27 +91,10 @@ public class JavaFutureTests { assertEquals(f.exception().get(), exception); } - @Test - public void mustBeAbleToExecuteAnOnTimeoutCallback() throws Throwable { - final CountDownLatch latch = new CountDownLatch(1); - Promise cf = new akka.dispatch.DefaultPromise(1000, TimeUnit.MILLISECONDS, system - .dispatcherFactory().defaultGlobalDispatcher()); - Future f = cf; - f.onTimeout(new Procedure>() { - public void apply(Future future) { - latch.countDown(); - } - }); - - assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); - assertTrue(f.value().isEmpty()); - } - @Test public void mustBeAbleToExecuteAnOnCompleteCallback() throws Throwable { final CountDownLatch latch = new CountDownLatch(1); - Promise cf = new akka.dispatch.DefaultPromise(1000, TimeUnit.MILLISECONDS, system - .dispatcherFactory().defaultGlobalDispatcher()); + Promise cf = ff.promise(); Future f = cf; f.onComplete(new Procedure>() { public void apply(akka.dispatch.Future future) { @@ -129,8 +110,7 @@ public class JavaFutureTests { @Test public void mustBeAbleToForeachAFuture() throws Throwable { final CountDownLatch latch = new CountDownLatch(1); - Promise cf = new akka.dispatch.DefaultPromise(1000, TimeUnit.MILLISECONDS, system - .dispatcherFactory().defaultGlobalDispatcher()); + Promise cf = ff.promise(); Future f = cf; f.foreach(new Procedure() { public void apply(String future) { @@ -146,19 +126,17 @@ public class JavaFutureTests { @Test public void mustBeAbleToFlatMapAFuture() throws Throwable { final CountDownLatch latch = new CountDownLatch(1); - Promise cf = new akka.dispatch.DefaultPromise(1000, TimeUnit.MILLISECONDS, system - .dispatcherFactory().defaultGlobalDispatcher()); + Promise cf = ff.promise(); cf.completeWithResult("1000"); Future f = cf; Future r = f.flatMap(new Function>() { public Future apply(String r) { latch.countDown(); - Promise cf = new akka.dispatch.DefaultPromise(1000, TimeUnit.MILLISECONDS, system - .dispatcherFactory().defaultGlobalDispatcher()); + Promise cf = ff.promise(); cf.completeWithResult(Integer.parseInt(r)); return cf; } - }, t); + }); assertEquals(f.get(), "1000"); assertEquals(r.get().intValue(), 1000); @@ -168,15 +146,14 @@ public class JavaFutureTests { @Test public void mustBeAbleToFilterAFuture() throws Throwable { final CountDownLatch latch = new CountDownLatch(1); - Promise cf = new akka.dispatch.DefaultPromise(1000, TimeUnit.MILLISECONDS, system - .dispatcherFactory().defaultGlobalDispatcher()); + Promise cf = ff.promise(); Future f = cf; Future r = f.filter(new Function() { public Boolean apply(String r) { latch.countDown(); return r.equals("foo"); } - }, t); + }); cf.completeWithResult("foo"); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); @@ -199,7 +176,7 @@ public class JavaFutureTests { })); } - Future> futureList = ff.sequence(listFutures, t); + Future> futureList = ff.sequence(listFutures); assertEquals(futureList.get(), listExpected); } @@ -219,7 +196,7 @@ public class JavaFutureTests { })); } - Future result = ff.fold("", 15000, listFutures, new Function2() { + Future result = ff.fold("", listFutures, new Function2() { public String apply(String r, String t) { return r + t; } @@ -242,7 +219,7 @@ public class JavaFutureTests { })); } - Future result = ff.reduce(listFutures, 15000, new Function2() { + Future result = ff.reduce(listFutures, new Function2() { public String apply(String r, String t) { return r + t; } @@ -261,7 +238,7 @@ public class JavaFutureTests { listStrings.add("test"); } - Future> result = ff.traverse(listStrings, t, new Function>() { + Future> result = ff.traverse(listStrings, new Function>() { public Future apply(final String r) { return ff.future(new Callable() { public String call() { @@ -290,7 +267,7 @@ public class JavaFutureTests { public Boolean apply(Integer i) { return i == 5; } - }, t); + }); final Integer got = f.get().get(); assertEquals(expect, got); diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index b3b8ece741..dd130bc0ad 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -11,9 +11,9 @@ import akka.testkit._ import akka.util.duration._ import java.lang.IllegalStateException import akka.util.ReflectiveAccess -import akka.dispatch.{ DefaultPromise, Promise, Future } import akka.serialization.Serialization import java.util.concurrent.{ CountDownLatch, TimeUnit } +import akka.dispatch.{ Block, DefaultPromise, Promise, Future } object ActorRefSpec { @@ -126,9 +126,9 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { } def wrap[T](f: Promise[Actor] ⇒ T): T = { - val result = new DefaultPromise[Actor](10 * 60 * 1000) + val result = Promise[Actor]() val r = f(result) - result.get + Block.on(result, 1 minute).resultOrException r } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala index 735867bc97..ddd040b2d6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala @@ -4,10 +4,11 @@ package akka.actor import org.scalatest.BeforeAndAfterAll -import akka.dispatch.FutureTimeoutException import akka.util.duration._ import akka.testkit.AkkaSpec import akka.testkit.DefaultTimeout +import java.util.concurrent.TimeoutException +import akka.dispatch.Block @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeout { @@ -28,7 +29,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo val echo = actorWithTimeout(Timeout(12)) try { val f = echo ? "hallo" - intercept[FutureTimeoutException] { f.await } + intercept[TimeoutException] { Block.on(f, system.settings.ActorTimeout.duration) } } finally { echo.stop } } } @@ -39,7 +40,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo val echo = actorWithTimeout(Props.defaultTimeout) try { val f = (echo ? "hallo").mapTo[String] - intercept[FutureTimeoutException] { f.await } + intercept[TimeoutException] { Block.on(f, timeout.duration) } f.value must be(None) } finally { echo.stop } } @@ -48,7 +49,11 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo "use explicitly supplied timeout" in { within(testTimeout - 100.millis, testTimeout + 300.millis) { val echo = actorWithTimeout(Props.defaultTimeout) - try { (echo.?("hallo", testTimeout)).as[String] must be(None) } finally { echo.stop } + val f = echo.?("hallo", testTimeout) + try { + intercept[TimeoutException] { Block.on(f, testTimeout) } + f.value must be === None + } finally { echo.stop } } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index 8e15f5fbbe..3fd3b32578 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -8,9 +8,9 @@ import org.scalatest.BeforeAndAfterEach import akka.util.ByteString import akka.util.cps._ -import akka.dispatch.Future import scala.util.continuations._ import akka.testkit._ +import akka.dispatch.{ Block, Future } object IOActorSpec { import IO._ @@ -239,9 +239,9 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val f1 = client1 ? (('set, "hello", ByteString("World"))) val f2 = client1 ? (('set, "test", ByteString("No one will read me"))) val f3 = client1 ? (('get, "hello")) - f2.await + Block.on(f2, timeout.duration) val f4 = client2 ? (('set, "test", ByteString("I'm a test!"))) - f4.await + Block.on(f4, timeout.duration) val f5 = client1 ? (('get, "test")) val f6 = client2 ? 'getall f1.get must equal("OK") diff --git a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala index 991332871c..ecef8daf65 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala @@ -6,7 +6,7 @@ package akka.actor import akka.testkit._ import akka.util.duration._ -import akka.dispatch.Future +import akka.dispatch.{ Block, Future } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class LocalActorRefProviderSpec extends AkkaSpec { @@ -32,7 +32,7 @@ class LocalActorRefProviderSpec extends AkkaSpec { val address = "new-actor" + i implicit val timeout = Timeout(5 seconds) val actors = for (j ← 1 to 4) yield Future(system.actorOf(Props(c ⇒ { case _ ⇒ }), address)) - val set = Set() ++ actors.map(_.await.value match { + val set = Set() ++ actors.map(a ⇒ Block.on(a, timeout.duration).value match { case Some(Right(a: ActorRef)) ⇒ 1 case Some(Left(ex: InvalidActorNameException)) ⇒ 2 case x ⇒ x diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 3cc54ea6bb..17185c2ff4 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -7,7 +7,6 @@ package akka.actor import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import akka.util.Duration import akka.util.duration._ -import akka.dispatch.{ Dispatchers, Future, KeptPromise } import akka.serialization.Serialization import java.util.concurrent.atomic.AtomicReference import annotation.tailrec @@ -17,6 +16,7 @@ import akka.actor.TypedActor.{ PostRestart, PreRestart, PostStop, PreStart } import java.util.concurrent.{ TimeUnit, CountDownLatch } import akka.japi.{ Creator, Option ⇒ JOption } import akka.testkit.DefaultTimeout +import akka.dispatch.{ Block, Dispatchers, Future, KeptPromise } object TypedActorSpec { @@ -296,7 +296,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte t.failingPigdog() t.read() must be(1) //Make sure state is not reset after failure - t.failingFuturePigdog.await.exception.get.getMessage must be("expected") + Block.on(t.failingFuturePigdog, 2 seconds).exception.get.getMessage must be("expected") t.read() must be(1) //Make sure state is not reset after failure (intercept[IllegalStateException] { t.failingJOptionPigdog }).getMessage must be("expected") diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 2ef735b05f..c5d9801e00 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -10,11 +10,11 @@ import akka.actor.{ Actor, ActorRef, Status } import akka.testkit.{ EventFilter, filterEvents, filterException } import akka.util.duration._ import org.multiverse.api.latches.StandardLatch -import java.util.concurrent.{ TimeUnit, CountDownLatch } import akka.testkit.AkkaSpec import org.scalatest.junit.JUnitSuite import java.lang.ArithmeticException import akka.testkit.DefaultTimeout +import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch } object FutureSpec { class TestActor extends Actor { @@ -47,7 +47,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "never completed" must { behave like emptyFuture(_(Promise())) "return supplied value on timeout" in { - val promise = Promise[String](100) orElse "Timedout" + val timedOut = new KeptPromise[String](Right("Timedout")) + val promise = Promise[String]() orElse timedOut promise.get must be("Timedout") } } @@ -61,9 +62,6 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val future = Promise[String]().complete(Left(new RuntimeException(message))) behave like futureWithException[RuntimeException](_(future, message)) } - "expired" must { - behave like expiredFuture(_(Promise(0))) - } } "A Future" when { @@ -78,7 +76,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } test(future) latch.open - future.await + Block.on(future, timeout.duration) } } "is completed" must { @@ -90,7 +88,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa result } latch.open - future.await + Block.on(future, timeout.duration) test(future, result) } } @@ -99,8 +97,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa filterException[ArithmeticException] { check({ (future: Future[Int], actions: List[FutureAction]) ⇒ val result = (future /: actions)(_ /: _) - val expected = (future.await.value.get /: actions)(_ /: _) - ((result.await.value.get, expected) match { + val expected = (Block.on(future, timeout.duration).value.get /: actions)(_ /: _) + ((Block.on(result, timeout.duration).value.get, expected) match { case (Right(a), Right(b)) ⇒ a == b case (Left(a), Left(b)) if a.toString == b.toString ⇒ true case (Left(a), Left(b)) if a.getStackTrace.isEmpty || b.getStackTrace.isEmpty ⇒ @@ -118,7 +116,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa behave like futureWithResult { test ⇒ val actor = system.actorOf[TestActor] val future = actor ? "Hello" - future.await + Block.on(future, timeout.duration) test(future, "World") actor.stop() } @@ -128,7 +126,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa filterException[RuntimeException] { val actor = system.actorOf[TestActor] val future = actor ? "Failure" - future.await + Block.on(future, timeout.duration) test(future, "Expected exception; to test fault-tolerance") actor.stop() } @@ -142,7 +140,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val actor1 = system.actorOf[TestActor] val actor2 = system.actorOf(new Actor { def receive = { case s: String ⇒ sender ! s.toUpperCase } }) val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s } - future.await + Block.on(future, timeout.duration) test(future, "WORLD") actor1.stop() actor2.stop() @@ -154,7 +152,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val actor1 = system.actorOf[TestActor] val actor2 = system.actorOf(new Actor { def receive = { case s: String ⇒ sender ! Status.Failure(new ArithmeticException("/ by zero")) } }) val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s } - future.await + Block.on(future, timeout.duration) test(future, "/ by zero") actor1.stop() actor2.stop() @@ -167,7 +165,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val actor1 = system.actorOf[TestActor] val actor2 = system.actorOf(new Actor { def receive = { case s: String ⇒ sender ! s.toUpperCase } }) val future = actor1 ? "Hello" flatMap { case i: Int ⇒ actor2 ? i } - future.await + Block.on(future, timeout.duration) test(future, "World (of class java.lang.String)") actor1.stop() actor2.stop() @@ -285,7 +283,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } "firstCompletedOf" in { - val futures = Vector.fill[Future[Int]](10)(new DefaultPromise[Int]()) :+ new KeptPromise[Int](Right(5)) + val futures = Vector.fill[Future[Int]](10)(Promise[Int]()) :+ new KeptPromise[Int](Right(5)) Future.firstCompletedOf(futures).get must be(5) } @@ -306,7 +304,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] } - Future.fold(futures, timeout)(0)(_ + _).get must be(45) + Block.on(Future.fold(futures)(0)(_ + _), timeout millis).resultOrException.get must be(45) } "fold by composing" in { @@ -333,18 +331,19 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } - Future.fold(futures, timeout)(0)(_ + _).await.exception.get.getMessage must be("shouldFoldResultsWithException: expected") + Block.on(Future.fold(futures)(0)(_ + _), timeout millis).exception.get.getMessage must be("shouldFoldResultsWithException: expected") } } "fold mutable zeroes safely" in { import scala.collection.mutable.ArrayBuffer def test(testNumber: Int) { - val fs = (0 to 1000) map (i ⇒ Future(i, 10000)) - val result = Future.fold(fs, 10000)(ArrayBuffer.empty[AnyRef]) { + val fs = (0 to 1000) map (i ⇒ Future(i)) + val f = Future.fold(fs)(ArrayBuffer.empty[AnyRef]) { case (l, i) if i % 2 == 0 ⇒ l += i.asInstanceOf[AnyRef] case (l, _) ⇒ l - }.get.asInstanceOf[ArrayBuffer[Int]].sum + } + val result = Block.on(f.mapTo[ArrayBuffer[Int]], 10000 millis).resultOrException.get.sum assert(result === 250500) } @@ -364,7 +363,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] } - assert(Future.reduce(futures, timeout)(_ + _).get === 45) + assert(Block.on(Future.reduce(futures)(_ + _), timeout millis).resultOrException.get === 45) } "shouldReduceResultsWithException" in { @@ -381,7 +380,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } - assert(Future.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected") + assert(Block.on(Future.reduce(futures)(_ + _), timeout millis).exception.get.getMessage === "shouldFoldResultsWithException: expected") } } @@ -421,9 +420,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa class ThrowableTest(m: String) extends Throwable(m) filterException[ThrowableTest] { - val f1 = Future { throw new ThrowableTest("test") } - f1.await - intercept[ThrowableTest] { f1.get } + val f1 = Future[Any] { throw new ThrowableTest("test") } + intercept[ThrowableTest] { Block.on(f1, timeout.duration).resultOrException.get } val latch = new StandardLatch val f2 = Future { latch.tryAwait(5, TimeUnit.SECONDS); "success" } @@ -431,12 +429,10 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa f2 onResult { case _ ⇒ throw new ThrowableTest("dispatcher receive") } val f3 = f2 map (s ⇒ s.toUpperCase) latch.open - f2.await - assert(f2.get === "success") + assert(Block.on(f2, timeout.duration).resultOrException.get === "success") f2 foreach (_ ⇒ throw new ThrowableTest("current thread foreach")) f2 onResult { case _ ⇒ throw new ThrowableTest("current thread receive") } - f3.await - assert(f3.get === "SUCCESS") + assert(Block.on(f3, timeout.duration).resultOrException.get === "SUCCESS") } } @@ -450,10 +446,10 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa latch.open assert(f2.get === 10) - val f3 = Future({ Thread.sleep(10); 5 }, 10 millis) - filterException[FutureTimeoutException] { - intercept[FutureTimeoutException] { - f3.get + val f3 = Future({ Thread.sleep(100); 5 }) + filterException[TimeoutException] { + intercept[TimeoutException] { + Block.on(f3, 0 millis) } } } @@ -556,24 +552,10 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa assert(result2.get === 50) } - "shouldNotAddOrRunCallbacksAfterFailureToBeCompletedBeforeExpiry" in { - val latch = new StandardLatch - val f = Promise[Int](0) - Thread.sleep(25) - f.onComplete(_ ⇒ latch.open) //Shouldn't throw any exception here - - assert(f.isExpired) //Should be expired - - f.complete(Right(1)) //Shouldn't complete the Future since it is expired - - assert(f.value.isEmpty) //Shouldn't be completed - assert(!latch.isOpen) //Shouldn't run the listener - } - "futureDataFlowShouldEmulateBlocking1" in { import Future.flow - val one, two = Promise[Int](1000 * 60) + val one, two = Promise[Int]() val simpleResult = flow { one() + two() } @@ -582,14 +564,14 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa flow { one << 1 } - one.await + Block.on(one, 1 minute) assert(one.isCompleted) assert(List(two, simpleResult).forall(_.isCompleted == false)) flow { two << 9 } - two.await + Block.on(two, 1 minute) assert(List(one, two).forall(_.isCompleted == true)) assert(simpleResult.get === 10) @@ -598,7 +580,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "futureDataFlowShouldEmulateBlocking2" in { import Future.flow - val x1, x2, y1, y2 = Promise[Int](1000 * 60) + val x1, x2, y1, y2 = Promise[Int]() val lx, ly, lz = new StandardLatch val result = flow { lx.open() @@ -616,17 +598,17 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa flow { y1 << 1 } // When this is set, it should cascade down the line assert(ly.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS)) - assert(x1.get === 1) + assert(Block.on(x1, 1 minute).resultOrException.get === 1) assert(!lz.isOpen) flow { y2 << 9 } // When this is set, it should cascade down the line assert(lz.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS)) - assert(x2.get === 9) + assert(Block.on(x2, 1 minute).resultOrException.get === 9) assert(List(x1, x2, y1, y2).forall(_.isCompleted == true)) - assert(result.get === 10) + assert(Block.on(result, 1 minute).resultOrException.get === 10) } "dataFlowAPIshouldbeSlick" in { @@ -717,8 +699,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa assert(!checkType(rInt, manifest[Nothing])) assert(!checkType(rInt, manifest[Any])) - rString.await - rInt.await + Block.on(rString, timeout.duration).resultOrException + Block.on(rInt, timeout.duration).resultOrException } "futureFlowSimpleAssign" in { @@ -810,30 +792,29 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa latch(8).open latch(9).await - f4.await must be('completed) + Block.on(f4, timeout.duration) must be('completed) } "should not deadlock with nested await (ticket 1313)" in { val simple = Future() map (_ ⇒ (Future(()) map (_ ⇒ ())).get) - simple.await must be('completed) + Block.on(simple, timeout.duration) must be('completed) val l1, l2 = new StandardLatch val complex = Future() map { _ ⇒ - Future.blocking() - val nested = Future() + Future.blocking(system.dispatcher) + val nested = Future(()) nested foreach (_ ⇒ l1.open) l1.await // make sure nested is completed nested foreach (_ ⇒ l2.open) l2.await } - assert(complex.await.isCompleted) + assert(Block.on(complex, timeout.duration).isCompleted) } } } def emptyFuture(f: (Future[Any] ⇒ Unit) ⇒ Unit) { "not be completed" in { f(_ must not be ('completed)) } - "not be expired" in { f(_ must not be ('expired)) } "not contain a value" in { f(_.value must be(None)) } "not contain a result" in { f(_.result must be(None)) } "not contain an exception" in { f(_.exception must be(None)) } @@ -841,13 +822,12 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa def futureWithResult(f: ((Future[Any], Any) ⇒ Unit) ⇒ Unit) { "be completed" in { f((future, _) ⇒ future must be('completed)) } - "not be expired" in { f((future, _) ⇒ future must not be ('expired)) } "contain a value" in { f((future, result) ⇒ future.value must be(Some(Right(result)))) } "contain a result" in { f((future, result) ⇒ future.result must be(Some(result))) } "not contain an exception" in { f((future, _) ⇒ future.exception must be(None)) } "return result with 'get'" in { f((future, result) ⇒ future.get must be(result)) } "return result with 'resultOrException'" in { f((future, result) ⇒ future.resultOrException must be(Some(result))) } - "not timeout" in { f((future, _) ⇒ future.await) } + "not timeout" in { f((future, _) ⇒ Block.on(future, 0 millis)) } "filter result" in { f { (future, result) ⇒ (future filter (_ ⇒ true)).get must be(result) @@ -866,13 +846,11 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa def futureWithException[E <: Throwable: Manifest](f: ((Future[Any], String) ⇒ Unit) ⇒ Unit) { "be completed" in { f((future, _) ⇒ future must be('completed)) } - "not be expired" in { f((future, _) ⇒ future must not be ('expired)) } "contain a value" in { f((future, _) ⇒ future.value must be('defined)) } "not contain a result" in { f((future, _) ⇒ future.result must be(None)) } "contain an exception" in { f((future, message) ⇒ future.exception.get.getMessage must be(message)) } "throw exception with 'get'" in { f((future, message) ⇒ (evaluating { future.get } must produce[E]).getMessage must be(message)) } "throw exception with 'resultOrException'" in { f((future, message) ⇒ (evaluating { future.resultOrException } must produce[E]).getMessage must be(message)) } - "not timeout" in { f((future, _) ⇒ future.await) } "retain exception with filter" in { f { (future, message) ⇒ (evaluating { (future filter (_ ⇒ true)).get } must produce[E]).getMessage must be(message) @@ -889,11 +867,6 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "always cast successfully using mapTo" is pending } - def expiredFuture(f: (Future[Any] ⇒ Unit) ⇒ Unit) { - "not be completed" in { f(_ must not be ('completed)) } - "be expired" in { f(_ must be('expired)) } - } - sealed trait IntAction { def apply(that: Int): Int } case class IntAdd(n: Int) extends IntAction { def apply(that: Int) = that + n } case class IntSub(n: Int) extends IntAction { def apply(that: Int) = that - n } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 7af8f057d8..7693a85632 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -17,13 +17,9 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn val q = factory(config) ensureInitialMailboxState(config, q) - implicit val within = 1 second + val f = spawn { q.dequeue } - val f = spawn { - q.dequeue - } - - f.await.resultOrException must be === Some(null) + Block.on(f, 1 second).resultOrException must be === Some(null) } "create a bounded mailbox with 10 capacity and with push timeout" in { @@ -61,8 +57,8 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn } //CANDIDATE FOR TESTKIT - def spawn[T <: AnyRef](fun: ⇒ T)(implicit within: Duration): Future[T] = { - val result = new DefaultPromise[T](within.length, within.unit) + def spawn[T <: AnyRef](fun: ⇒ T): Future[T] = { + val result = Promise[T]() val t = new Thread(new Runnable { def run = try { result.completeWithResult(fun) @@ -119,8 +115,8 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn val consumers = for (i ← (1 to 4).toList) yield createConsumer - val ps = producers.map(_.await.resultOrException.get) - val cs = consumers.map(_.await.resultOrException.get) + val ps = producers.map(Block.on(_, within).resultOrException.get) + val cs = consumers.map(Block.on(_, within).resultOrException.get) ps.map(_.size).sum must be === totalMessages //Must have produced 1000 messages cs.map(_.size).sum must be === totalMessages //Must have consumed all produced messages diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala index 724beba6bb..bb2d857aae 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala @@ -40,40 +40,6 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout { assert(c.get === 3) } - "timeout" in { - val a, c = Promise[Int]() - val b = Promise[Int](0) - val q = PromiseStream[Int](1000) - flow { - a << q() - b << q() - c << q() - } - Thread.sleep(10) - flow { - q << (1, 2) - q << 3 - } - assert(a.get === 1) - intercept[FutureTimeoutException] { b.get } - assert(c.get === 3) - } - - "timeout again" in { - val q = PromiseStream[Int](500) - val a = q.dequeue() - val b = q.dequeue() - q += 1 - Thread.sleep(500) - q += (2, 3) - val c = q.dequeue() - val d = q.dequeue() - assert(a.get === 1) - intercept[FutureTimeoutException] { b.get } - assert(c.get === 2) - assert(d.get === 3) - } - "pend again" in { val a, b, c, d = Promise[Int]() val q1, q2 = PromiseStream[Int]() diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/system/MatchingEngine.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/system/MatchingEngine.scala index cabf890488..cf5142f0ba 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/system/MatchingEngine.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/system/MatchingEngine.scala @@ -2,9 +2,6 @@ package akka.performance.trading.system import akka.performance.trading.domain._ import akka.actor._ -import akka.dispatch.Future -import akka.dispatch.FutureTimeoutException -import akka.dispatch.MessageDispatcher trait MatchingEngine { val meId: String diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala index 943718848a..6b724d4d74 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala @@ -1,11 +1,11 @@ package akka.routing -import akka.dispatch.{ KeptPromise, Future } import akka.actor._ import akka.testkit._ import akka.util.duration._ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } import akka.testkit.AkkaSpec +import akka.dispatch.{ Block, KeptPromise, Future } object ActorPoolSpec { @@ -125,8 +125,8 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { }).withFaultHandler(faultHandler)) try { - (for (count ← 1 to 500) yield pool.?("Test", 20000)) foreach { - _.await.resultOrException.get must be("Response") + (for (count ← 1 to 500) yield pool.?("Test", 20 seconds)) foreach { + Block.on(_, 20 seconds).resultOrException.get must be("Response") } } finally { pool.stop() diff --git a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala index 09b5f5e24c..2c11150d72 100644 --- a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala @@ -3,6 +3,8 @@ package akka.ticket import akka.actor._ import akka.routing._ import akka.testkit.AkkaSpec +import akka.dispatch.Block +import akka.util.duration._ @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class Ticket703Spec extends AkkaSpec { @@ -26,7 +28,7 @@ class Ticket703Spec extends AkkaSpec { } })) }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 5, 1000))) - (actorPool.?("Ping", 10000)).await.result must be === Some("Response") + Block.on(actorPool.?("Ping", 10000), 10 seconds).resultOrException.get must be === "Response" } } } diff --git a/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala b/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala index 271f8a9ac0..056f7d7897 100644 --- a/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala @@ -4,7 +4,7 @@ package akka.util import org.scalatest.matchers.MustMatchers -import akka.dispatch.Future +import akka.dispatch.{ Future, Block } import akka.testkit.AkkaSpec import scala.util.Random import akka.testkit.DefaultTimeout @@ -125,7 +125,7 @@ class IndexSpec extends AkkaSpec with MustMatchers with DefaultTimeout { val tasks = List.fill(nrOfTasks)(executeRandomTask) - tasks.foreach(_.await) + tasks.foreach(Block.on(_, timeout.duration)) tasks.foreach(_.exception.map(throw _)) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 648e671c50..8d7f3edba3 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -11,10 +11,10 @@ import java.lang.{ UnsupportedOperationException, IllegalStateException } import akka.serialization.Serialization import java.net.InetSocketAddress import akka.remote.RemoteAddress -import java.util.concurrent.TimeUnit import akka.event.EventStream import akka.event.DeathWatch import scala.annotation.tailrec +import java.util.concurrent.{ TimeoutException, TimeUnit } /** * ActorRef is an immutable and serializable handle to an Actor. @@ -407,18 +407,9 @@ class AskActorRef( val path: ActorPath, override val getParent: InternalActorRef, deathWatch: DeathWatch, - timeout: Timeout, val dispatcher: MessageDispatcher) extends MinimalActorRef { - final val result = new DefaultPromise[Any](timeout)(dispatcher) - - { - val callback: Future[Any] ⇒ Unit = { _ ⇒ deathWatch.publish(Terminated(AskActorRef.this)); whenDone() } - result onComplete callback - result onTimeout callback - } - - protected def whenDone(): Unit = () + final val result = Promise[Any]()(dispatcher) override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match { case Status.Success(r) ⇒ result.completeWithResult(r) @@ -434,7 +425,7 @@ class AskActorRef( override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = new KeptPromise[Any](Left(new UnsupportedOperationException("Ask/? is not supported for %s".format(getClass.getName))))(dispatcher) - override def isTerminated = result.isCompleted || result.isExpired + override def isTerminated = result.isCompleted override def stop(): Unit = if (!isTerminated) result.completeWithException(new ActorKilledException("Stopped")) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index d68a1349f0..0b0e6a4a2f 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -442,7 +442,7 @@ class LocalActorRefProvider( def dispatcher: MessageDispatcher = system.dispatcher - lazy val terminationFuture: DefaultPromise[Unit] = new DefaultPromise[Unit](Timeout.never)(dispatcher) + lazy val terminationFuture: Promise[Unit] = Promise[Unit]()(dispatcher) lazy val rootGuardian: InternalActorRef = new LocalActorRef(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath, true) { override def getParent: InternalActorRef = this @@ -567,16 +567,20 @@ class LocalActorRefProvider( import akka.dispatch.DefaultPromise (if (within == null) settings.ActorTimeout else within) match { case t if t.duration.length <= 0 ⇒ - new DefaultPromise[Any](0)(dispatcher) //Abort early if nonsensical timeout + Promise[Any]()(dispatcher) //Abort early if nonsensical timeout case t ⇒ val path = tempPath() val name = path.name - val a = new AskActorRef(path, tempContainer, deathWatch, t, dispatcher) { - override def whenDone() { - tempContainer.children.remove(name) + val a = new AskActorRef(path, tempContainer, deathWatch, dispatcher) + tempContainer.children.put(name, a) + val f = dispatcher.prerequisites.scheduler.scheduleOnce(t.duration) { tempContainer.children.remove(name) } + a.result onComplete { _ ⇒ + try { f.cancel() } + finally { + try { tempContainer.children.remove(name) } + finally { deathWatch.publish(Terminated(a)) } } } - tempContainer.children.put(name, a) recipient.tell(message, a) a.result } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 711a4ac235..83c1eb371c 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -11,6 +11,7 @@ import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar } import akka.serialization.{ Serializer, Serialization } import akka.dispatch._ import akka.serialization.SerializationExtension +import java.util.concurrent.TimeoutException trait TypedActorFactory { @@ -409,7 +410,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi case m if m.returnsFuture_? ⇒ actor.?(m, timeout) case m if m.returnsJOption_? || m.returnsOption_? ⇒ val f = actor.?(m, timeout) - (try { f.await.value } catch { case _: FutureTimeoutException ⇒ None }) match { + (try { Block.on(f, timeout.duration).value } catch { case _: TimeoutException ⇒ None }) match { case None | Some(Right(null)) ⇒ if (m.returnsJOption_?) JOption.none[Any] else None case Some(Right(joption: AnyRef)) ⇒ joption case Some(Left(ex)) ⇒ throw ex diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index a09f28f6a9..fee1d3458a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -13,7 +13,6 @@ import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption } import scala.util.continuations._ -import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable } import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS } import java.lang.{ Iterable ⇒ JIterable } import java.util.{ LinkedList ⇒ JLinkedList } @@ -22,68 +21,59 @@ import scala.annotation.tailrec import scala.collection.mutable.Stack import akka.util.{ Switch, Duration, BoxedType } import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean } +import java.util.concurrent.{ TimeoutException, ConcurrentLinkedQueue, TimeUnit, Callable } +import akka.dispatch.Block.CanBlock -class FutureTimeoutException(message: String, cause: Throwable = null) extends AkkaException(message, cause) { - def this(message: String) = this(message, null) +object Block { + sealed trait CanBlock + + trait Blockable { + + /** + * Should throw java.util.concurrent.TimeoutException if times out + */ + def block(atMost: Duration)(implicit permit: CanBlock): this.type + } + + private implicit val permit = new CanBlock {} + + def on[T <: Blockable](block: T, atMost: Duration /* = Duration.Inf*/ ): T = block.block(atMost) } -class FutureFactory()(implicit dispatcher: MessageDispatcher, timeout: Timeout) { +class FutureFactory(implicit dispatcher: MessageDispatcher) { /** * Java API, equivalent to Future.apply */ def future[T](body: Callable[T]): Future[T] = - Future(body.call, timeout) - - /** - * Java API, equivalent to Future.apply - */ - def future[T](body: Callable[T], timeout: Timeout): Future[T] = - Future(body.call, timeout) - - /** - * Java API, equivalent to Future.apply - */ - def future[T](body: Callable[T], timeout: Long): Future[T] = - Future(body.call, timeout) + Future(body.call) /** * Java API, equivalent to Future.apply */ def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] = - Future(body.call)(dispatcher, timeout) + Future(body.call)(dispatcher) /** - * Java API, equivalent to Future.apply + * Java API, equivalent to Promise.apply */ - def future[T](body: Callable[T], timeout: Timeout, dispatcher: MessageDispatcher): Future[T] = - Future(body.call)(dispatcher, timeout) - - /** - * Java API, equivalent to Future.apply - */ - def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] = - Future(body.call)(dispatcher, timeout) + def promise[T](): Promise[T] = Promise[T]() /** * Java API. * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate */ - def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], timeout: Timeout): Future[JOption[T]] = { + def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean]): Future[JOption[T]] = { val pred: T ⇒ Boolean = predicate.apply(_) - Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)), timeout)(pred).map(JOption.fromScalaOption(_))(timeout) + Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)))(pred).map(JOption.fromScalaOption(_)) } - def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean]): Future[JOption[T]] = find(futures, predicate, timeout) - /** * Java API. * Returns a Future to the result of the first future in the list that is completed */ - def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], timeout: Timeout): Future[T] = - Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout) - - def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]]): Future[T] = firstCompletedOf(futures, timeout) + def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]]): Future[T] = + Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures)) /** * Java API @@ -92,31 +82,22 @@ class FutureFactory()(implicit dispatcher: MessageDispatcher, timeout: Timeout) * the result will be the first failure of any of the futures, or any failure in the actual fold, * or the result of the fold. */ - def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Timeout, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = - Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(zero)(fun.apply _) - - def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout: Timeout, futures, fun) - - def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout, futures, fun) + def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = + Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(zero)(fun.apply _) /** * Java API. * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first */ - def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Timeout, fun: akka.japi.Function2[R, T, T]): Future[R] = - Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _) - - def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout: Timeout, fun) - - def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout, fun) + def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, T]): Future[R] = + Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _) /** * Java API. * Simple version of Future.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]]. * Useful for reducing many Futures into a single Future. */ - def sequence[A](in: JIterable[Future[A]], timeout: Timeout): Future[JIterable[A]] = { - implicit val t = timeout + def sequence[A](in: JIterable[Future[A]]): Future[JIterable[A]] = { scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa) ⇒ for (r ← fr; a ← fa) yield { r add a @@ -124,27 +105,18 @@ class FutureFactory()(implicit dispatcher: MessageDispatcher, timeout: Timeout) }) } - def sequence[A](in: JIterable[Future[A]]): Future[JIterable[A]] = sequence(in, timeout) - /** * Java API. * Transforms a java.lang.Iterable[A] into a Future[java.lang.Iterable[B]] using the provided Function A ⇒ Future[B]. * This is useful for performing a parallel map. For example, to apply a function to all items of a list * in parallel. */ - def traverse[A, B](in: JIterable[A], timeout: Timeout, fn: JFunc[A, Future[B]]): Future[JIterable[B]] = { - implicit val t = timeout + def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JIterable[B]] = { scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())) { (fr, a) ⇒ val fb = fn(a) - for (r ← fr; b ← fb) yield { - r add b - r - } + for (r ← fr; b ← fb) yield { r add b; r } } } - - def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JIterable[B]] = traverse(in, timeout, fn) - } object Future { @@ -153,8 +125,8 @@ object Future { * This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body * The execution is performed by the specified Dispatcher. */ - def apply[T](body: ⇒ T)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[T] = { - val promise = new DefaultPromise[T](timeout) + def apply[T](body: ⇒ T)(implicit dispatcher: MessageDispatcher): Future[T] = { + val promise = Promise[T]() dispatcher dispatchTask { () ⇒ promise complete { try { @@ -168,15 +140,6 @@ object Future { promise } - def apply[T](body: ⇒ T, timeout: Timeout)(implicit dispatcher: MessageDispatcher): Future[T] = - apply(body)(dispatcher, timeout) - - def apply[T](body: ⇒ T, timeout: Duration)(implicit dispatcher: MessageDispatcher): Future[T] = - apply(body)(dispatcher, timeout) - - def apply[T](body: ⇒ T, timeout: Long)(implicit dispatcher: MessageDispatcher): Future[T] = - apply(body)(dispatcher, timeout) - import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom @@ -184,17 +147,14 @@ object Future { * Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]]. * Useful for reducing many Futures into a single Future. */ - def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], timeout: Timeout, dispatcher: MessageDispatcher): Future[M[A]] = + def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], dispatcher: MessageDispatcher): Future[M[A]] = in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) - def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Timeout)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], dispatcher: MessageDispatcher): Future[M[A]] = - sequence(in)(cbf, timeout, dispatcher) - /** * Returns a Future to the result of the first future in the list that is completed */ - def firstCompletedOf[T](futures: Iterable[Future[T]])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[T] = { - val futureResult = new DefaultPromise[T](timeout) + def firstCompletedOf[T](futures: Iterable[Future[T]])(implicit dispatcher: MessageDispatcher): Future[T] = { + val futureResult = Promise[T]() val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _) futures.foreach(_ onComplete completeFirst) @@ -202,16 +162,13 @@ object Future { futureResult } - def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout)(implicit dispatcher: MessageDispatcher): Future[T] = - firstCompletedOf(futures)(dispatcher, timeout) - /** * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate */ - def find[T](futures: Iterable[Future[T]])(predicate: T ⇒ Boolean)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[Option[T]] = { + def find[T](futures: Iterable[Future[T]])(predicate: T ⇒ Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] = { if (futures.isEmpty) new KeptPromise[Option[T]](Right(None)) else { - val result = new DefaultPromise[Option[T]](timeout) + val result = Promise[Option[T]]() val ref = new AtomicInteger(futures.size) val search: Future[T] ⇒ Unit = f ⇒ try { f.result.filter(predicate).foreach(r ⇒ result completeWithResult Some(r)) @@ -225,9 +182,6 @@ object Future { } } - def find[T](futures: Iterable[Future[T]], timeout: Timeout)(predicate: T ⇒ Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] = - find(futures)(predicate)(dispatcher, timeout) - /** * A non-blocking fold over the specified futures. * The fold is performed on the thread where the last future is completed, @@ -238,16 +192,16 @@ object Future { * val result = Futures.fold(0)(futures)(_ + _).await.result * */ - def fold[T, R](futures: Iterable[Future[T]])(zero: R)(foldFun: (R, T) ⇒ R)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[R] = { + def fold[T, R](futures: Iterable[Future[T]])(zero: R)(foldFun: (R, T) ⇒ R)(implicit dispatcher: MessageDispatcher): Future[R] = { if (futures.isEmpty) { new KeptPromise[R](Right(zero)) } else { - val result = new DefaultPromise[R](timeout) + val result = Promise[R]() val results = new ConcurrentLinkedQueue[T]() val done = new Switch(false) val allDone = futures.size - val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature? + val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { f.value.get match { case Right(value) ⇒ val added = results add value @@ -280,9 +234,6 @@ object Future { } } - def fold[T, R](futures: Iterable[Future[T]], timeout: Timeout)(zero: R)(foldFun: (R, T) ⇒ R)(implicit dispatcher: MessageDispatcher): Future[R] = - fold(futures)(zero)(foldFun)(dispatcher, timeout) - /** * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first * Example: @@ -290,11 +241,11 @@ object Future { * val result = Futures.reduce(futures)(_ + _).await.result * */ - def reduce[T, R >: T](futures: Iterable[Future[T]])(op: (R, T) ⇒ T)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[R] = { + def reduce[T, R >: T](futures: Iterable[Future[T]])(op: (R, T) ⇒ T)(implicit dispatcher: MessageDispatcher): Future[R] = { if (futures.isEmpty) new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left"))) else { - val result = new DefaultPromise[R](timeout) + val result = Promise[R]() val seedFound = new AtomicBoolean(false) val seedFold: Future[T] ⇒ Unit = f ⇒ { if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold @@ -308,10 +259,6 @@ object Future { result } } - - def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout)(op: (R, T) ⇒ T)(implicit dispatcher: MessageDispatcher): Future[R] = - reduce(futures)(op)(dispatcher, timeout) - /** * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A ⇒ Future[B]. * This is useful for performing a parallel map. For example, to apply a function to all items of a list @@ -320,15 +267,12 @@ object Future { * val myFutureList = Futures.traverse(myList)(x ⇒ Future(myFunc(x))) * */ - def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], timeout: Timeout, dispatcher: MessageDispatcher): Future[M[B]] = + def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], dispatcher: MessageDispatcher): Future[M[B]] = in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[B, M[B]]]) { (fr, a) ⇒ val fb = fn(a.asInstanceOf[A]) for (r ← fr; b ← fb) yield (r += b) }.map(_.result) - def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Timeout)(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], dispatcher: MessageDispatcher): Future[M[B]] = - traverse(in)(fn)(cbf, timeout, dispatcher) - /** * Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited * Continuations plugin. @@ -345,8 +289,8 @@ object Future { * * The Delimited Continuations compiler plugin must be enabled in order to use this method. */ - def flow[A](body: ⇒ A @cps[Future[Any]])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[A] = { - val future = Promise[A](timeout) + def flow[A](body: ⇒ A @cps[Future[Any]])(implicit dispatcher: MessageDispatcher): Future[A] = { + val future = Promise[A] dispatchTask({ () ⇒ (reify(body) foreachFull (future completeWithResult, future completeWithException): Future[Any]) onException { case e: Exception ⇒ future completeWithException e @@ -355,8 +299,6 @@ object Future { future } - // TODO make variant of flow(timeout)(body) which does NOT break type inference - /** * Assures that any Future tasks initiated in the current thread will be * executed asynchronously, including any tasks currently queued to be @@ -381,7 +323,7 @@ object Future { * } * */ - def blocking()(implicit dispatcher: MessageDispatcher): Unit = + def blocking(implicit dispatcher: MessageDispatcher): Unit = _taskStack.get match { case Some(taskStack) if taskStack.nonEmpty ⇒ val tasks = taskStack.elems @@ -419,7 +361,7 @@ object Future { } } -sealed trait Future[+T] extends japi.Future[T] { +sealed trait Future[+T] extends japi.Future[T] with Block.Blockable { implicit def dispatcher: MessageDispatcher @@ -429,35 +371,7 @@ sealed trait Future[+T] extends japi.Future[T] { * Returns the result of this Future without blocking, by suspending execution and storing it as a * continuation until the result is available. */ - def apply()(implicit timeout: Timeout): T @cps[Future[Any]] = shift(this flatMap (_: T ⇒ Future[Any])) - - /** - * Blocks awaiting completion of this Future, then returns the resulting value, - * or throws the completed exception - * - * Scala & Java API - * - * throws FutureTimeoutException if this Future times out when waiting for completion - */ - def get: T = this.await.resultOrException.get - - /** - * Blocks the current thread until the Future has been completed or the - * timeout has expired. In the case of the timeout expiring a - * FutureTimeoutException will be thrown. - */ - def await: Future[T] - - /** - * Blocks the current thread until the Future has been completed or the - * timeout has expired, additionally bounding the waiting period according to - * the atMost parameter. The timeout will be the lesser value of - * 'atMost' and the timeout supplied at the constructuion of this Future. In - * the case of the timeout expiring a FutureTimeoutException will be thrown. - * Other callers of this method are not affected by the additional bound - * imposed by atMost. - */ - def await(atMost: Duration): Future[T] + def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T ⇒ Future[Any])) /** * Await completion of this Future and return its value if it conforms to A's @@ -466,7 +380,7 @@ sealed trait Future[+T] extends japi.Future[T] { * in case of a timeout. */ def as[A](implicit m: Manifest[A]): Option[A] = { - try await catch { case _: FutureTimeoutException ⇒ } + try Block.on(this, Duration.Inf) catch { case _: TimeoutException ⇒ } value match { case None ⇒ None case Some(Left(ex)) ⇒ throw ex @@ -479,42 +393,14 @@ sealed trait Future[+T] extends japi.Future[T] { } } - /** - * Await completion of this Future and return its value if it conforms to A's - * erased type, None otherwise. Will throw any exception the Future was - * completed with. Will return None in case of a timeout. - */ - def asSilently[A](implicit m: Manifest[A]): Option[A] = { - try await catch { case _: FutureTimeoutException ⇒ } - value match { - case None ⇒ None - case Some(Left(ex)) ⇒ throw ex - case Some(Right(v)) ⇒ - try Some(BoxedType(m.erasure).cast(v).asInstanceOf[A]) - catch { case _: ClassCastException ⇒ None } - } - } + @deprecated("Used Block.on(future, timeoutDuration)") + def get: T = Block.on(this, Duration.Inf).resultOrException.get /** * Tests whether this Future has been completed. */ final def isCompleted: Boolean = value.isDefined - /** - * Tests whether this Future's timeout has expired. - * - * Note that an expired Future may still contain a value, or it may be - * completed with a value. - */ - def isExpired: Boolean - - def timeout: Timeout - - /** - * This Future's timeout in nanoseconds. - */ - def timeoutInNanos = if (timeout.duration.isFinite) timeout.duration.toNanos else Long.MaxValue - /** * The contained value of this Future. Before this Future is completed * the value will be None. After completion the value will be Some(Right(t)) @@ -542,8 +428,7 @@ sealed trait Future[+T] extends japi.Future[T] { /** * When this Future is completed, apply the provided function to the * Future. If the Future has already been completed, this will apply - * immediately. Will not be called in case of a timeout, which also holds if - * corresponding Promise is attempted to complete after expiry. Multiple + * immediately. Multiple * callbacks may be registered; there is no guarantee that they will be * executed in a particular order. */ @@ -582,9 +467,11 @@ sealed trait Future[+T] extends japi.Future[T] { } } - def onTimeout(func: Future[T] ⇒ Unit): this.type - - def orElse[A >: T](fallback: ⇒ A): Future[A] + /** + * Creates a Future that will be the result of the first completed Future of this and the Future that was passed into this. + * This is semantically the same as: Future.firstCompletedOf(Seq(this, that)) + */ + def orElse[A >: T](that: Future[A]): Future[A] = Future.firstCompletedOf(List(this, that)) //TODO Optimize /** * Creates a new Future that will handle any matching Throwable that this @@ -597,8 +484,8 @@ sealed trait Future[+T] extends japi.Future[T] { * Future(6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3 * */ - final def recover[A >: T](pf: PartialFunction[Throwable, A])(implicit timeout: Timeout): Future[A] = { - val future = new DefaultPromise[A](timeout) + final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { + val future = Promise[A]() onComplete { _.value.get match { case Left(e) if pf isDefinedAt e ⇒ future.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) }) @@ -621,8 +508,8 @@ sealed trait Future[+T] extends japi.Future[T] { * } yield b + "-" + c * */ - final def map[A](f: T ⇒ A)(implicit timeout: Timeout): Future[A] = { - val future = new DefaultPromise[A](timeout) + final def map[A](f: T ⇒ A): Future[A] = { + val future = Promise[A]() onComplete { _.value.get match { case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]] @@ -643,8 +530,8 @@ sealed trait Future[+T] extends japi.Future[T] { * Creates a new Future[A] which is completed with this Future's result if * that conforms to A's erased type or a ClassCastException otherwise. */ - final def mapTo[A](implicit m: Manifest[A], timeout: Timeout = this.timeout): Future[A] = { - val fa = new DefaultPromise[A](timeout) + final def mapTo[A](implicit m: Manifest[A]): Future[A] = { + val fa = Promise[A]() onComplete { ft ⇒ fa complete (ft.value.get match { case l: Left[_, _] ⇒ l.asInstanceOf[Either[Throwable, A]] @@ -673,8 +560,8 @@ sealed trait Future[+T] extends japi.Future[T] { * } yield b + "-" + c * */ - final def flatMap[A](f: T ⇒ Future[A])(implicit timeout: Timeout): Future[A] = { - val future = new DefaultPromise[A](timeout) + final def flatMap[A](f: T ⇒ Future[A]): Future[A] = { + val future = Promise[A]() onComplete { _.value.get match { @@ -698,17 +585,17 @@ sealed trait Future[+T] extends japi.Future[T] { } } - final def withFilter(p: T ⇒ Boolean)(implicit timeout: Timeout) = new FutureWithFilter[T](this, p) + final def withFilter(p: T ⇒ Boolean) = new FutureWithFilter[T](this, p) - final class FutureWithFilter[+A](self: Future[A], p: A ⇒ Boolean)(implicit timeout: Timeout) { + final class FutureWithFilter[+A](self: Future[A], p: A ⇒ Boolean) { def foreach(f: A ⇒ Unit): Unit = self filter p foreach f def map[B](f: A ⇒ B): Future[B] = self filter p map f def flatMap[B](f: A ⇒ Future[B]): Future[B] = self filter p flatMap f def withFilter(q: A ⇒ Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x)) } - final def filter(p: T ⇒ Boolean)(implicit timeout: Timeout): Future[T] = { - val future = new DefaultPromise[T](timeout) + final def filter(p: T ⇒ Boolean): Future[T] = { + val future = Promise[T]() onComplete { _.value.get match { case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, T]] @@ -735,16 +622,10 @@ sealed trait Future[+T] extends japi.Future[T] { } object Promise { - /** - * Creates a non-completed, new, Promise with the supplied timeout in milliseconds + * Creates a non-completed, new, Promise */ - def apply[A](timeout: Timeout)(implicit dispatcher: MessageDispatcher): Promise[A] = new DefaultPromise[A](timeout) - - /** - * Creates a non-completed, new, Promise with the default timeout (akka.actor.timeout in conf) - */ - def apply[A]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): Promise[A] = apply(timeout) + def apply[A]()(implicit dispatcher: MessageDispatcher): Promise[A] = new DefaultPromise[A]() } /** @@ -782,7 +663,7 @@ trait Promise[T] extends Future[T] { final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ cont(complete(Right(value))) } final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ - val fr = new DefaultPromise[Any](this.timeout) + val fr = Promise[Any]() this completeWith other onComplete { f ⇒ try { fr completeWith cont(f) @@ -796,7 +677,7 @@ trait Promise[T] extends Future[T] { } final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ - val fr = new DefaultPromise[Any](this.timeout) + val fr = Promise[Any]() stream.dequeue(this).onComplete { f ⇒ try { fr completeWith cont(f) @@ -828,81 +709,50 @@ private[dispatch] object DefaultPromise { case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { def exception: Throwable = value.get.left.get } - case object Expired extends FState[Nothing] { - def value: Option[Either[Throwable, Nothing]] = None - } private val emptyPendingValue = Pending[Nothing](Nil) } /** * The default concrete Future implementation. */ -class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDispatcher) extends AbstractPromise with Promise[T] { +class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends AbstractPromise with Promise[T] { self ⇒ - import DefaultPromise.{ FState, Success, Failure, Pending, Expired } + import DefaultPromise.{ FState, Success, Failure, Pending } - def this()(implicit dispatcher: MessageDispatcher, timeout: Timeout) = this(timeout) + def block(atMost: Duration)(implicit permit: CanBlock): this.type = if (value.isDefined) this else { + Future.blocking + val start = MILLISECONDS.toNanos(System.currentTimeMillis) - def this(timeout: Long)(implicit dispatcher: MessageDispatcher) = this(Timeout(timeout)) + @tailrec + def awaitUnsafe(waitTimeNanos: Long): Boolean = { + if (value.isEmpty && waitTimeNanos > 0) { + val ms = NANOSECONDS.toMillis(waitTimeNanos) + val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec + try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ } - def this(timeout: Long, timeunit: TimeUnit)(implicit dispatcher: MessageDispatcher) = this(Timeout(timeout, timeunit)) - - private val _startTimeInNanos = currentTimeInNanos - - @tailrec - private def awaitUnsafe(waitTimeNanos: Long): Boolean = { - if (value.isEmpty && waitTimeNanos > 0) { - val ms = NANOSECONDS.toMillis(waitTimeNanos) - val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec - val start = currentTimeInNanos - try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ } - - awaitUnsafe(waitTimeNanos - (currentTimeInNanos - start)) - } else { - value.isDefined + awaitUnsafe(waitTimeNanos - (MILLISECONDS.toNanos(System.currentTimeMillis) - start)) + } else { + value.isDefined + } } - } - def await(atMost: Duration): this.type = if (value.isDefined) this else { - Future.blocking() - - val waitNanos = - if (timeout.duration.isFinite && atMost.isFinite) - atMost.toNanos min timeLeft() - else if (atMost.isFinite) - atMost.toNanos - else if (timeout.duration.isFinite) - timeLeft() - else Long.MaxValue //If both are infinite, use Long.MaxValue + val waitNanos = if (atMost.isFinite) atMost.toNanos else Long.MaxValue if (awaitUnsafe(waitNanos)) this - else throw new FutureTimeoutException("Futures timed out after [" + NANOSECONDS.toMillis(waitNanos) + "] milliseconds") + else throw new TimeoutException("Futures timed out after [" + NANOSECONDS.toMillis(waitNanos) + "] milliseconds") } - def await = await(timeout.duration) - - def isExpired: Boolean = if (timeout.duration.isFinite) timeLeft() <= 0 else false - def value: Option[Either[Throwable, T]] = getState.value @inline - protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = - AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].compareAndSet(this, oldState, newState) + private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]] @inline - protected final def getState: FState[T] = { + protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState) - @tailrec - def read(): FState[T] = { - val cur = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].get(this) - if (cur.isInstanceOf[Pending[_]] && isExpired) { - if (updateState(cur, Expired)) Expired else read() - } else cur - } - - read() - } + @inline + protected final def getState: FState[T] = updater.get(this) def complete(value: Either[Throwable, T]): this.type = { val callbacks = { @@ -935,11 +785,9 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val cur = getState cur match { case _: Success[_] | _: Failure[_] ⇒ true - case Expired ⇒ false case p: Pending[_] ⇒ val pt = p.asInstanceOf[Pending[T]] - if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false - else tryAddCallback() + if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback() } } @@ -948,83 +796,17 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi this } - def onTimeout(func: Future[T] ⇒ Unit): this.type = { - val runNow = - if (!timeout.duration.isFinite) false //Not possible - else if (value.isEmpty) { - if (!isExpired) { - val runnable = new Runnable { - def run() { - if (!isCompleted) { - if (!isExpired) - try dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS), this) - catch { - case _: IllegalStateException ⇒ func(DefaultPromise.this) - } - else func(DefaultPromise.this) - } - } - } - val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeft(), NANOSECONDS), runnable) - onComplete(_ ⇒ timeoutFuture.cancel()) - false - } else true - } else false - - if (runNow) Future.dispatchTask(() ⇒ notifyCompleted(func)) - - this - } - - final def orElse[A >: T](fallback: ⇒ A): Future[A] = - if (timeout.duration.isFinite) { - getState match { - case _: Success[_] | _: Failure[_] ⇒ this - case Expired ⇒ Future[A](fallback, timeout) - case _: Pending[_] ⇒ - val promise = new DefaultPromise[A](Timeout.never) //TODO FIXME We can't have infinite timeout here, doesn't make sense. - promise completeWith this - val runnable = new Runnable { - def run() { - if (!isCompleted) { - val done = - if (!isExpired) - try { - dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS), this) - true - } catch { - case _: IllegalStateException ⇒ false - } - else false - if (!done) - promise complete (try { Right(fallback) } catch { case e ⇒ Left(e) }) // FIXME catching all and continue isn't good for OOME, ticket #1418 - } - } - } - dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeft(), NANOSECONDS), runnable) - promise - } - } else this - private def notifyCompleted(func: Future[T] ⇒ Unit) { // FIXME catching all and continue isn't good for OOME, ticket #1418 try { func(this) } catch { case e ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really? } - - @inline - private def currentTimeInNanos: Long = MILLISECONDS.toNanos(System.currentTimeMillis) //TODO Switch to math.abs(System.nanoTime)? - //TODO: the danger of Math.abs is that it could break the ordering of time. So I would not recommend an abs. - @inline - private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) - - private def timeLeftNoinline(): Long = timeLeft() } /** * An already completed Future is seeded with it's result at creation, is useful for when you are participating in * a Future-composition but you already have a value to contribute. */ -sealed class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dispatcher: MessageDispatcher) extends Promise[T] { +final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dispatcher: MessageDispatcher) extends Promise[T] { val value = Some(suppliedValue) def complete(value: Either[Throwable, T]): this.type = this @@ -1032,12 +814,6 @@ sealed class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val di Future dispatchTask (() ⇒ func(this)) this } - def await(atMost: Duration): this.type = this - def await: this.type = this - def isExpired: Boolean = true - def timeout: Timeout = Timeout.zero - - final def onTimeout(func: Future[T] ⇒ Unit): this.type = this - final def orElse[A >: T](fallback: ⇒ A): Future[A] = this + def block(atMost: Duration)(implicit permit: CanBlock): this.type = this } diff --git a/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala b/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala index 4356cbaff3..918eb7c080 100644 --- a/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala +++ b/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala @@ -185,9 +185,9 @@ class PromiseStream[A](implicit val dispatcher: MessageDispatcher, val timeout: if (eo.nonEmpty) { if (_elemOut.compareAndSet(eo, eo.tail)) new KeptPromise(Right(eo.head)) else dequeue() - } else dequeue(Promise[A](timeout)) + } else dequeue(Promise[A]) } - } else dequeue(Promise[A](timeout)) + } else dequeue(Promise[A]) @tailrec final def dequeue(promise: Promise[A]): Future[A] = _state.get match { diff --git a/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala b/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala index e923dd6c18..8871050adb 100644 --- a/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala @@ -8,22 +8,13 @@ import akka.actor.Timeout /* Java API */ trait Future[+T] { self: akka.dispatch.Future[T] ⇒ - private[japi] final def onTimeout[A >: T](proc: Procedure[akka.dispatch.Future[A]]): this.type = self.onTimeout(proc(_)) private[japi] final def onResult[A >: T](proc: Procedure[A]): this.type = self.onResult({ case r ⇒ proc(r.asInstanceOf[A]) }: PartialFunction[T, Unit]) private[japi] final def onException(proc: Procedure[Throwable]): this.type = self.onException({ case t: Throwable ⇒ proc(t) }: PartialFunction[Throwable, Unit]) private[japi] final def onComplete[A >: T](proc: Procedure[akka.dispatch.Future[A]]): this.type = self.onComplete(proc(_)) - private[japi] final def map[A >: T, B](f: JFunc[A, B], timeout: Timeout): akka.dispatch.Future[B] = { - implicit val t = timeout - self.map(f(_)) - } - private[japi] final def flatMap[A >: T, B](f: JFunc[A, akka.dispatch.Future[B]], timeout: Timeout): akka.dispatch.Future[B] = { - implicit val t = timeout - self.flatMap(f(_)) - } + private[japi] final def map[A >: T, B](f: JFunc[A, B]): akka.dispatch.Future[B] = self.map(f(_)) + private[japi] final def flatMap[A >: T, B](f: JFunc[A, akka.dispatch.Future[B]]): akka.dispatch.Future[B] = self.flatMap(f(_)) private[japi] final def foreach[A >: T](proc: Procedure[A]): Unit = self.foreach(proc(_)) - private[japi] final def filter[A >: T](p: JFunc[A, java.lang.Boolean], timeout: Timeout): akka.dispatch.Future[A] = { - implicit val t = timeout + private[japi] final def filter[A >: T](p: JFunc[A, java.lang.Boolean]): akka.dispatch.Future[A] = self.filter((a: Any) ⇒ p(a.asInstanceOf[A])).asInstanceOf[akka.dispatch.Future[A]] - } } diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index d6e71aa586..2e80efabe3 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -11,10 +11,10 @@ import akka.config.ConfigurationException import akka.util.ReentrantGuard import akka.util.duration._ import akka.actor.Timeout -import akka.dispatch.FutureTimeoutException import java.util.concurrent.atomic.AtomicInteger import akka.actor.ActorRefProvider import scala.util.control.NoStackTrace +import java.util.concurrent.TimeoutException object LoggingBus { implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream @@ -147,7 +147,7 @@ trait LoggingBus extends ActorEventBus { val actor = system.systemActorOf(Props(clazz), name) implicit val timeout = Timeout(3 seconds) val response = try actor ? InitializeLogger(this) get catch { - case _: FutureTimeoutException ⇒ + case _: TimeoutException ⇒ publish(Warning(simpleName(this), "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)")) } if (response != LoggerInitialized) diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala index 7643a0bd31..767e556901 100644 --- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala +++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala @@ -202,7 +202,7 @@ class TransactionLog private ( EventHandler.debug(this, "Reading entries [%s -> %s] for log [%s]".format(from, to, logId)) if (isAsync) { - val future = new DefaultPromise[Vector[Array[Byte]]](timeout) + val future = Promise[Vector[Array[Byte]]]() ledger.asyncReadEntries( from, to, new AsyncCallback.ReadCallback { @@ -464,7 +464,7 @@ object TransactionLog { } } - val future = new DefaultPromise[LedgerHandle](timeout) + val future = Promise[LedgerHandle]() if (isAsync) { bookieClient.asyncCreateLedger( ensembleSize, quorumSize, digestType, password, @@ -526,7 +526,7 @@ object TransactionLog { val ledger = try { if (isAsync) { - val future = new DefaultPromise[LedgerHandle](timeout) + val future = Promise[LedgerHandle]() bookieClient.asyncOpenLedger( logId, digestType, password, new AsyncCallback.OpenCallback { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmSpec.scala index 456fd4f65a..548fb149b5 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmSpec.scala @@ -73,7 +73,7 @@ class LocalMetricsMultiJvmNode1 extends MasterClusterTestNode { } "allow to track JVM state and bind handles through MetricsAlterationMonitors" in { - val monitorReponse = new DefaultPromise[String] + val monitorReponse = Promise[String]() node.metricsManager.addMonitor(new LocalMetricsAlterationMonitor { diff --git a/akka-docs/.history b/akka-docs/.history new file mode 100644 index 0000000000..a3abe50906 --- /dev/null +++ b/akka-docs/.history @@ -0,0 +1 @@ +exit diff --git a/akka-docs/intro/getting-started-first-java.rst b/akka-docs/intro/getting-started-first-java.rst index ee890d723d..74182b5002 100644 --- a/akka-docs/intro/getting-started-first-java.rst +++ b/akka-docs/intro/getting-started-first-java.rst @@ -360,7 +360,7 @@ One thing to note is that we used two different versions of the ``actorOf`` meth The actor's life-cycle is: -- Created & Started -- ``Actor.actorOf[MyActor]`` -- can receive messages +- Created & Started -- ``actorOf(MyActor.class)`` -- can receive messages - Stopped -- ``actorRef.stop()`` -- can **not** receive messages Once the actor has been stopped it is dead and can not be started again. diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index d010a1ef6a..63f59d2d6c 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -8,10 +8,10 @@ import com.mongodb.async._ import com.mongodb.async.futures.RequestFutures import org.bson.collection._ import akka.actor.ActorCell -import akka.dispatch.Envelope import akka.event.Logging -import akka.dispatch.DefaultPromise import akka.actor.ActorRef +import akka.dispatch.{ Block, Promise, Envelope, DefaultPromise } +import java.util.concurrent.TimeoutException class MongoBasedMailboxException(message: String) extends AkkaException(message) @@ -43,15 +43,14 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) { /* TODO - Test if a BSON serializer is registered for the message and only if not, use toByteString? */ val durableMessage = MongoDurableMessage(ownerPathString, envelope.message, envelope.sender) // todo - do we need to filter the actor name at all for safe collection naming? - val result = new DefaultPromise[Boolean](settings.WriteTimeout)(dispatcher) + val result = Promise[Boolean]()(dispatcher) mongo.insert(durableMessage, false)(RequestFutures.write { wr: Either[Throwable, (Option[AnyRef], WriteResult)] ⇒ wr match { case Right((oid, wr)) ⇒ result.completeWithResult(true) case Left(t) ⇒ result.completeWithException(t) } }) - - result.as[Boolean].orNull + Block.on(result, settings.WriteTimeout) } def dequeue(): Envelope = withErrorHandling { @@ -62,7 +61,7 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) { * TODO - Should we have a specific query in place? Which way do we sort? * TODO - Error handling version! */ - val envelopePromise = new DefaultPromise[Envelope](settings.ReadTimeout)(dispatcher) + val envelopePromise = Promise[Envelope]()(dispatcher) mongo.findAndRemove(Document.empty) { doc: Option[MongoDurableMessage] ⇒ doc match { case Some(msg) ⇒ { @@ -71,18 +70,16 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) { log.debug("DEQUEUING messageInvocation in mongo-based mailbox [{}]", envelopePromise) } case None ⇒ - { - log.info("No matching document found. Not an error, just an empty queue.") - envelopePromise.completeWithResult(null) - } + log.info("No matching document found. Not an error, just an empty queue.") + envelopePromise.completeWithResult(null) () } } - envelopePromise.as[Envelope].orNull + try { Block.on(envelopePromise, settings.ReadTimeout).resultOrException.orNull } catch { case _: TimeoutException ⇒ null } } def numberOfMessages: Int = { - val count = new DefaultPromise[Int](settings.ReadTimeout)(dispatcher) + val count = Promise[Int]()(dispatcher) mongo.count()(count.completeWithResult) count.as[Int].getOrElse(-1) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 44b756dfba..638901a02e 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -19,10 +19,10 @@ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ import com.google.protobuf.ByteString import java.util.concurrent.atomic.AtomicBoolean import akka.event.EventStream -import java.util.concurrent.ConcurrentHashMap import akka.dispatch.Promise import java.net.InetAddress import akka.serialization.SerializationExtension +import java.util.concurrent.{ TimeoutException, ConcurrentHashMap } /** * Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it. @@ -84,7 +84,7 @@ class RemoteActorRefProvider( if (systemService) local.actorOf(system, props, supervisor, name, systemService) else { val path = supervisor.path / name - val newFuture = Promise[ActorRef](system.settings.ActorTimeout)(dispatcher) + val newFuture = Promise[ActorRef]()(dispatcher) actors.putIfAbsent(path.toString, newFuture) match { // we won the race -- create the actor and resolve the future case null ⇒ @@ -168,7 +168,7 @@ class RemoteActorRefProvider( actors.replace(path.toString, newFuture, actor) actor case actor: InternalActorRef ⇒ actor - case future: Future[_] ⇒ future.get.asInstanceOf[InternalActorRef] + case future: Future[_] ⇒ Block.on(future, system.settings.ActorTimeout.duration).resultOrException.get.asInstanceOf[InternalActorRef] } } @@ -224,7 +224,7 @@ class RemoteActorRefProvider( if (withACK) { try { val f = connection ? (command, remoteExtension.RemoteSystemDaemonAckTimeout) - (try f.await.value catch { case _: FutureTimeoutException ⇒ None }) match { + (try Block.on(f, remoteExtension.RemoteSystemDaemonAckTimeout).value catch { case _: TimeoutException ⇒ None }) match { case Some(Right(receiver)) ⇒ log.debug("Remote system command sent to [{}] successfully received", receiver) diff --git a/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala b/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala index f2cc5a288d..1fa4874408 100644 --- a/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala @@ -4,7 +4,6 @@ package akka.spring import foo.{ PingActor, IMyPojo, MyPojo } -import akka.dispatch.FutureTimeoutException import org.scalatest.matchers.ShouldMatchers import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith @@ -14,10 +13,10 @@ import org.springframework.context.ApplicationContext import org.springframework.context.support.ClassPathXmlApplicationContext import org.springframework.core.io.{ ClassPathResource, Resource } import org.scalatest.{ BeforeAndAfterAll, FeatureSpec } -import java.util.concurrent.CountDownLatch import akka.remote.netty.NettyRemoteSupport import akka.actor._ import akka.actor.Actor._ +import java.util.concurrent.{TimeoutException, CountDownLatch} object RemoteTypedActorLog { import java.util.concurrent.{ LinkedBlockingQueue, TimeUnit, BlockingQueue } @@ -89,9 +88,9 @@ class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with B assert(MyPojo.lastOneWayMessage === "hello 1") } - scenario("FutureTimeoutException when timed out") { + scenario("TimeoutException when timed out") { val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor") - evaluating { myPojo.longRunning() } should produce[FutureTimeoutException] + evaluating { myPojo.longRunning() } should produce[TimeoutException] } scenario("typed-actor with timeout") { diff --git a/akka-stm/src/main/scala/akka/agent/Agent.scala b/akka-stm/src/main/scala/akka/agent/Agent.scala index cfe618ce47..33530c8406 100644 --- a/akka-stm/src/main/scala/akka/agent/Agent.scala +++ b/akka-stm/src/main/scala/akka/agent/Agent.scala @@ -8,7 +8,7 @@ import akka.actor.ActorSystem import akka.actor._ import akka.stm._ import akka.japi.{ Function ⇒ JFunc, Procedure ⇒ JProc } -import akka.dispatch.{ PinnedDispatcher, UnboundedMailbox, DefaultPromise, Dispatchers, Future } +import akka.dispatch._ /** * Used internally to send functions. @@ -123,7 +123,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { def alter(f: T ⇒ T)(timeout: Timeout): Future[T] = { def dispatch = updater.?(Update(f), timeout).asInstanceOf[Future[T]] if (Stm.activeTransaction) { - val result = new DefaultPromise[T](timeout)(system.dispatcher) + val result = Promise[T]()(system.dispatcher) get //Join xa deferred { result completeWith dispatch } //Attach deferred-block to current transaction result @@ -134,7 +134,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { * Dispatch a new value for the internal state. Behaves the same * as sending a function (x => newValue). */ - def send(newValue: T): Unit = send(x ⇒ newValue) + def send(newValue: T): Unit = send(_ ⇒ newValue) /** * Dispatch a new value for the internal state. Behaves the same @@ -166,7 +166,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { * still be executed in order. */ def alterOff(f: T ⇒ T)(timeout: Timeout): Future[T] = { - val result = new DefaultPromise[T](timeout)(system.dispatcher) + val result = Promise[T]()(system.dispatcher) send((value: T) ⇒ { suspend() val pinnedDispatcher = new PinnedDispatcher(system.dispatcherFactory.prerequisites, null, "agent-alter-off", UnboundedMailbox(), system.settings.ActorTimeout.duration) @@ -186,7 +186,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { /** * Gets this agent's value after all currently queued updates have completed. */ - def await(implicit timeout: Timeout): T = future.await.result.get + def await(implicit timeout: Timeout): T = Block.on(future, timeout.duration).result.get /** * Map this agent to a new agent, applying the function to the internal state. diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java b/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java index 9baf0f1485..3f67895f32 100644 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java +++ b/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java @@ -3,10 +3,14 @@ package akka.transactor.example; import akka.actor.ActorSystem; import akka.actor.ActorRef; import akka.actor.Props; +import akka.dispatch.Block; import akka.dispatch.Future; import akka.testkit.AkkaSpec; import akka.transactor.Coordinated; +import akka.util.Duration; +import java.util.concurrent.TimeUnit; + public class UntypedCoordinatedExample { public static void main(String[] args) throws InterruptedException { @@ -20,11 +24,12 @@ public class UntypedCoordinatedExample { Thread.sleep(3000); long timeout = 5000; + Duration d = Duration.create(timeout, TimeUnit.MILLISECONDS); Future future1 = counter1.ask("GetCount", timeout); Future future2 = counter2.ask("GetCount", timeout); - future1.await(); + Block.on(future1, d); if (future1.isCompleted()) { if (future1.result().isDefined()) { int result = (Integer) future1.result().get(); @@ -32,7 +37,7 @@ public class UntypedCoordinatedExample { } } - future2.await(); + Block.on(future2, d); if (future2.isCompleted()) { if (future2.result().isDefined()) { int result = (Integer) future2.result().get(); diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java b/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java index 55e28f872f..2c6a5b5e7b 100644 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java +++ b/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java @@ -3,8 +3,12 @@ package akka.transactor.example; import akka.actor.ActorSystem; import akka.actor.ActorRef; import akka.actor.Props; +import akka.dispatch.Block; import akka.dispatch.Future; import akka.testkit.AkkaSpec; +import akka.util.Duration; + +import java.util.concurrent.TimeUnit; public class UntypedTransactorExample { public static void main(String[] args) throws InterruptedException { @@ -19,11 +23,12 @@ public class UntypedTransactorExample { Thread.sleep(3000); long timeout = 5000; + Duration d = Duration.create(timeout, TimeUnit.MILLISECONDS); Future future1 = counter1.ask("GetCount", timeout); Future future2 = counter2.ask("GetCount", timeout); - future1.await(); + Block.on(future1, d); if (future1.isCompleted()) { if (future1.result().isDefined()) { int result = (Integer) future1.result().get(); @@ -31,7 +36,7 @@ public class UntypedTransactorExample { } } - future2.await(); + Block.on(future2, d); if (future2.isCompleted()) { if (future2.result().isDefined()) { int result = (Integer) future2.result().get(); diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java b/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java index 528a2a14f8..d93b1d2389 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java @@ -2,6 +2,8 @@ package akka.transactor.test; import static org.junit.Assert.*; +import akka.dispatch.Block; +import akka.util.Duration; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -78,8 +80,8 @@ public class UntypedTransactorTest { } for (ActorRef counter : counters) { Future future = counter.ask("GetCount", askTimeout); - future.await(); - if (future.isCompleted()) { + Block.on(future, Duration.create(askTimeout, TimeUnit.MILLISECONDS)); + if (future.isCompleted()) { Option resultOption = future.result(); if (resultOption.isDefined()) { Object result = resultOption.get(); @@ -107,7 +109,7 @@ public class UntypedTransactorTest { } for (ActorRef counter : counters) { Future future = counter.ask("GetCount", askTimeout); - future.await(); + Block.on(future, Duration.create(askTimeout, TimeUnit.MILLISECONDS)); if (future.isCompleted()) { Option resultOption = future.result(); if (resultOption.isDefined()) { diff --git a/akka-stm/src/test/scala/akka/agent/test/AgentSpec.scala b/akka-stm/src/test/scala/akka/agent/test/AgentSpec.scala index 9ef95594be..affd6d4c35 100644 --- a/akka-stm/src/test/scala/akka/agent/test/AgentSpec.scala +++ b/akka-stm/src/test/scala/akka/agent/test/AgentSpec.scala @@ -11,6 +11,7 @@ import akka.util.duration._ import java.util.concurrent.CountDownLatch import akka.testkit.AkkaSpec import akka.testkit._ +import akka.dispatch.Block class CountDownFunction[A](num: Int = 1) extends Function1[A, A] { val latch = new CountDownLatch(num) @@ -35,7 +36,7 @@ class AgentSpec extends AkkaSpec { countDown.await(5 seconds) agent() must be("abcd") - agent.close + agent.close() } "maintain order between send and sendOff" in { @@ -51,7 +52,7 @@ class AgentSpec extends AkkaSpec { countDown.await(5 seconds) agent() must be("abcd") - agent.close + agent.close() } "maintain order between alter and alterOff" in { @@ -62,13 +63,13 @@ class AgentSpec extends AkkaSpec { val r2 = agent.alterOff((s: String) ⇒ { Thread.sleep(2000); s + "c" })(5000) val r3 = agent.alter(_ + "d")(5000) - r1.await.resultOrException.get must be === "ab" - r2.await.resultOrException.get must be === "abc" - r3.await.resultOrException.get must be === "abcd" + Block.on(r1, 5 seconds).resultOrException.get must be === "ab" + Block.on(r2, 5 seconds).resultOrException.get must be === "abc" + Block.on(r3, 5 seconds).resultOrException.get must be === "abcd" agent() must be("abcd") - agent.close + agent.close() } "be immediately readable" in { @@ -90,14 +91,14 @@ class AgentSpec extends AkkaSpec { read must be(5) agent() must be(10) - agent.close + agent.close() } "be readable within a transaction" in { val agent = Agent(5) val value = atomic { agent() } value must be(5) - agent.close + agent.close() } "dispatch sends in successful transactions" in { @@ -112,7 +113,7 @@ class AgentSpec extends AkkaSpec { countDown.await(5 seconds) agent() must be(10) - agent.close + agent.close() } "not dispatch sends in aborted transactions" in { @@ -132,7 +133,7 @@ class AgentSpec extends AkkaSpec { countDown.await(5 seconds) agent() must be(5) - agent.close + agent.close() } "be able to return a 'queued' future" in { @@ -140,11 +141,9 @@ class AgentSpec extends AkkaSpec { agent send (_ + "b") agent send (_ + "c") - val future = agent.future + Block.on(agent.future, timeout.duration).resultOrException.get must be("abc") - future.await.result.get must be("abc") - - agent.close + agent.close() } "be able to await the value after updates have completed" in { @@ -154,7 +153,7 @@ class AgentSpec extends AkkaSpec { agent.await must be("abc") - agent.close + agent.close() } "be able to be mapped" in { @@ -164,8 +163,8 @@ class AgentSpec extends AkkaSpec { agent1() must be(5) agent2() must be(10) - agent1.close - agent2.close + agent1.close() + agent2.close() } "be able to be used in a 'foreach' for comprehension" in { @@ -178,7 +177,7 @@ class AgentSpec extends AkkaSpec { result must be(3) - agent.close + agent.close() } "be able to be used in a 'map' for comprehension" in { @@ -188,8 +187,8 @@ class AgentSpec extends AkkaSpec { agent1() must be(5) agent2() must be(10) - agent1.close - agent2.close + agent1.close() + agent2.close() } "be able to be used in a 'flatMap' for comprehension" in { @@ -205,9 +204,9 @@ class AgentSpec extends AkkaSpec { agent2() must be(2) agent3() must be(3) - agent1.close - agent2.close - agent3.close + agent1.close() + agent2.close() + agent3.close() } } } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 1fdbaee7d7..c64ee6dd15 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -7,16 +7,15 @@ import org.scalatest.{ WordSpec, BeforeAndAfterAll, Tag } import org.scalatest.matchers.MustMatchers import akka.actor.{ ActorSystem, ActorSystemImpl } import akka.actor.{ Actor, ActorRef, Props } -import akka.dispatch.MessageDispatcher import akka.event.{ Logging, LoggingAdapter } import akka.util.duration._ -import akka.dispatch.FutureTimeoutException import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import akka.actor.PoisonPill -import java.util.concurrent.LinkedBlockingQueue import akka.actor.CreateChild import akka.actor.DeadLetter +import java.util.concurrent.TimeoutException +import akka.dispatch.{ Block, MessageDispatcher } object TimingTest extends Tag("timing") @@ -65,8 +64,8 @@ abstract class AkkaSpec(_system: ActorSystem) final override def afterAll { system.stop() - try system.asInstanceOf[ActorSystemImpl].terminationFuture.await(5 seconds) catch { - case _: FutureTimeoutException ⇒ system.log.warning("Failed to stop [{}] within 5 seconds", system.name) + try Block.on(system.asInstanceOf[ActorSystemImpl].terminationFuture, 5 seconds) catch { + case _: TimeoutException ⇒ system.log.warning("Failed to stop [{}] within 5 seconds", system.name) } atTermination() }