diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index 7a684460ff..a97da8fe56 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -89,7 +89,7 @@ public class JavaFutureTests { Throwable exception = new NullPointerException(); cf.completeWithException(exception); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); - assertEquals(f.exception().get(), exception); + assertEquals(f.value().get().left().get(), exception); } @Test diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 3d73e7d089..44053d6757 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -296,7 +296,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte t.failingPigdog() t.read() must be(1) //Make sure state is not reset after failure - Block.on(t.failingFuturePigdog, 2 seconds).exception.get.getMessage must be("expected") + intercept[IllegalStateException] { Block.sync(t.failingFuturePigdog, 2 seconds) }.getMessage must be("expected") t.read() must be(1) //Make sure state is not reset after failure (intercept[IllegalStateException] { t.failingJOptionPigdog }).getMessage must be("expected") diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index bffa5bac82..8a84af703d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -414,8 +414,8 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout { assert(f2.get === "bar") assert(f4.get === "foo2") assert(f6.get === "bar2") - assert(f3.result === None) - assert(f5.result === None) + assert(f3.value.isEmpty) + assert(f5.value.isEmpty) } } } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 5193d982e4..7109c804af 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -537,7 +537,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val a, b, c = Promise[Int]() val result2 = flow { - val n = (a << c).result.get + 10 + val n = (a << c).value.get.right.get + 10 b << (c() - 2) a() + n * b() } @@ -813,15 +813,11 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa def emptyFuture(f: (Future[Any] ⇒ Unit) ⇒ Unit) { "not be completed" in { f(_ must not be ('completed)) } "not contain a value" in { f(_.value must be(None)) } - "not contain a result" in { f(_.result must be(None)) } - "not contain an exception" in { f(_.exception must be(None)) } } def futureWithResult(f: ((Future[Any], Any) ⇒ Unit) ⇒ Unit) { "be completed" in { f((future, _) ⇒ future must be('completed)) } "contain a value" in { f((future, result) ⇒ future.value must be(Some(Right(result)))) } - "contain a result" in { f((future, result) ⇒ future.result must be(Some(result))) } - "not contain an exception" in { f((future, _) ⇒ future.exception must be(None)) } "return result with 'get'" in { f((future, result) ⇒ future.get must be(result)) } "return result with 'Block.sync'" in { f((future, result) ⇒ Block.sync(future, timeout.duration) must be(result)) } "not timeout" in { f((future, _) ⇒ Block.on(future, 0 millis)) } @@ -843,9 +839,13 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa def futureWithException[E <: Throwable: Manifest](f: ((Future[Any], String) ⇒ Unit) ⇒ Unit) { "be completed" in { f((future, _) ⇒ future must be('completed)) } - "contain a value" in { f((future, _) ⇒ future.value must be('defined)) } - "not contain a result" in { f((future, _) ⇒ future.result must be(None)) } - "contain an exception" in { f((future, message) ⇒ future.exception.get.getMessage must be(message)) } + "contain a value" in { + f((future, message) ⇒ { + future.value must be('defined) + future.value.get must be('left) + future.value.get.left.get.getMessage must be(message) + }) + } "throw exception with 'get'" in { f((future, message) ⇒ (evaluating { future.get } must produce[E]).getMessage must be(message)) } "throw exception with 'Block.sync'" in { f((future, message) ⇒ (evaluating { Block.sync(future, timeout.duration) } must produce[E]).getMessage must be(message)) } "retain exception with filter" in { diff --git a/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala b/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala index 056f7d7897..1d72f502ae 100644 --- a/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala @@ -125,8 +125,7 @@ class IndexSpec extends AkkaSpec with MustMatchers with DefaultTimeout { val tasks = List.fill(nrOfTasks)(executeRandomTask) - tasks.foreach(Block.on(_, timeout.duration)) - tasks.foreach(_.exception.map(throw _)) + tasks.foreach(Block.sync(_, timeout.duration)) } } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 6805269c21..c8fdc70e08 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -561,7 +561,7 @@ class LocalActorRefProvider( new RoutedActorRef(system, props, supervisor, name) } - private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch + private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch(1024) private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = { import akka.dispatch.DefaultPromise @@ -584,9 +584,7 @@ class LocalActorRefProvider( } } -class LocalDeathWatch extends DeathWatch with ActorClassification { - - def mapSize = 1024 +class LocalDeathWatch(val mapSize: Int) extends DeathWatch with ActorClassification { override def publish(event: Event): Unit = { val monitors = dissociate(classify(event)) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 4cf3d273be..e4ff9d1a14 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -176,11 +176,15 @@ object Future { val result = Promise[Option[T]]() val ref = new AtomicInteger(futures.size) val search: Future[T] ⇒ Unit = f ⇒ try { - f.result.filter(predicate).foreach(r ⇒ result completeWithResult Some(r)) + f.value.get match { + case Right(r) ⇒ if (predicate(r)) result completeWithResult Some(r) + case _ ⇒ + } } finally { if (ref.decrementAndGet == 0) result completeWithResult None } + futures.foreach(_ onComplete search) result @@ -417,14 +421,6 @@ sealed trait Future[+T] extends japi.Future[T] with Block.Blockable[T] { case _ ⇒ None } - /** - * Returns the contained exception of this Future if it exists. - */ - final def exception: Option[Throwable] = value match { - case Some(Left(e)) ⇒ Some(e) - case _ ⇒ None - } - /** * When this Future is completed, apply the provided function to the * Future. If the Future has already been completed, this will apply diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala index 767e556901..3eb62217d5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala +++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala @@ -549,10 +549,10 @@ object TransactionLog { } private[akka] def await[T](future: Promise[T]): T = { - future.await - if (future.result.isDefined) future.result.get - else if (future.exception.isDefined) handleError(future.exception.get) - else handleError(new ReplicationException("No result from async read of entries for transaction log")) + future.await.value.get match { + case Right(result) => result + case Left(throwable) => handleError(throwable) + } } private[akka] def handleError(e: Throwable): Nothing = { diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java b/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java index 3f67895f32..1c36eaac45 100644 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java +++ b/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java @@ -5,6 +5,7 @@ import akka.actor.ActorRef; import akka.actor.Props; import akka.dispatch.Block; import akka.dispatch.Future; +import akka.japi.Procedure; import akka.testkit.AkkaSpec; import akka.transactor.Coordinated; @@ -14,10 +15,10 @@ import java.util.concurrent.TimeUnit; public class UntypedCoordinatedExample { public static void main(String[] args) throws InterruptedException { - ActorSystem application = ActorSystem.create("UntypedCoordinatedExample", AkkaSpec.testConf()); + ActorSystem app = ActorSystem.create("UntypedCoordinatedExample", AkkaSpec.testConf()); - ActorRef counter1 = application.actorOf(new Props().withCreator(UntypedCoordinatedCounter.class)); - ActorRef counter2 = application.actorOf(new Props().withCreator(UntypedCoordinatedCounter.class)); + ActorRef counter1 = app.actorOf(new Props().withCreator(UntypedCoordinatedCounter.class)); + ActorRef counter2 = app.actorOf(new Props().withCreator(UntypedCoordinatedCounter.class)); counter1.tell(new Coordinated(new Increment(counter2))); @@ -26,28 +27,14 @@ public class UntypedCoordinatedExample { long timeout = 5000; Duration d = Duration.create(timeout, TimeUnit.MILLISECONDS); - Future future1 = counter1.ask("GetCount", timeout); - Future future2 = counter2.ask("GetCount", timeout); + Future future1 = counter1.ask("GetCount", timeout); + Future future2 = counter2.ask("GetCount", timeout); - Block.on(future1, d); - if (future1.isCompleted()) { - if (future1.result().isDefined()) { - int result = (Integer) future1.result().get(); - System.out.println("counter 1: " + result); - } - } + int count1 = (Integer)Block.sync(future1, d); + System.out.println("counter 1: " + count1); + int count2 = (Integer)Block.sync(future2, d); + System.out.println("counter 1: " + count2); - Block.on(future2, d); - if (future2.isCompleted()) { - if (future2.result().isDefined()) { - int result = (Integer) future2.result().get(); - System.out.println("counter 2: " + result); - } - } - - counter1.stop(); - counter2.stop(); - - application.stop(); + app.stop(); } } diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java index a90e0a1952..9258a05073 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java @@ -81,7 +81,7 @@ public class UntypedCoordinatedIncrementTest { } catch (InterruptedException exception) { } for (ActorRef counter : counters) { - Future future = counter.ask("GetCount", askTimeout); + Future future = counter.ask("GetCount", askTimeout); assertEquals(1, ((Integer) future.get()).intValue()); } }