diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index 8ebee80595..a78b11f1d3 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -24,6 +24,8 @@ public class JavaFutureTests { private static ActorSystem system; private static Timeout t; + + private final Duration timeout = Duration.create(5, TimeUnit.SECONDS); @BeforeClass public static void beforeAll() { @@ -52,7 +54,7 @@ public class JavaFutureTests { } }); - assertEquals("Hello World", f2.get()); + assertEquals("Hello World",Block.sync(f2, timeout)); } @Test @@ -61,15 +63,15 @@ public class JavaFutureTests { Promise cf = Futures.promise(system.dispatcher()); Future f = cf; f.onSuccess(new Procedure() { - public void apply(String result) { - if (result.equals("foo")) - latch.countDown(); - } + public void apply(String result) { + if (result.equals("foo")) + latch.countDown(); + } }); cf.success("foo"); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); - assertEquals(f.get(), "foo"); + assertEquals(Block.sync(f, timeout), "foo"); } @Test @@ -78,10 +80,10 @@ public class JavaFutureTests { Promise cf = Futures.promise(system.dispatcher()); Future f = cf; f.onFailure(new Procedure() { - public void apply(Throwable t) { - if (t instanceof NullPointerException) - latch.countDown(); - } + public void apply(Throwable t) { + if (t instanceof NullPointerException) + latch.countDown(); + } }); Throwable exception = new NullPointerException(); @@ -103,7 +105,7 @@ public class JavaFutureTests { cf.success("foo"); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); - assertEquals(f.get(), "foo"); + assertEquals(Block.sync(f, timeout), "foo"); } @Test @@ -119,7 +121,7 @@ public class JavaFutureTests { cf.success("foo"); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); - assertEquals(f.get(), "foo"); + assertEquals(Block.sync(f, timeout), "foo"); } @Test @@ -137,8 +139,8 @@ public class JavaFutureTests { } }); - assertEquals(f.get(), "1000"); - assertEquals(r.get().intValue(), 1000); + assertEquals(Block.sync(f, timeout), "1000"); + assertEquals(Block.sync(r, timeout).intValue(), 1000); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); } @@ -156,8 +158,8 @@ public class JavaFutureTests { cf.success("foo"); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); - assertEquals(f.get(), "foo"); - assertEquals(r.get(), "foo"); + assertEquals(Block.sync(f, timeout), "foo"); + assertEquals(Block.sync(r, timeout), "foo"); } // TODO: Improve this test, perhaps with an Actor @@ -177,7 +179,7 @@ public class JavaFutureTests { Future> futureList = Futures.sequence(listFutures, system.dispatcher()); - assertEquals(futureList.get(), listExpected); + assertEquals(Block.sync(futureList, timeout), listExpected); } // TODO: Improve this test, perhaps with an Actor @@ -201,7 +203,7 @@ public class JavaFutureTests { } }, system.dispatcher()); - assertEquals(result.get(), expected.toString()); + assertEquals(Block.sync(result, timeout), expected.toString()); } @Test @@ -224,7 +226,7 @@ public class JavaFutureTests { } }, system.dispatcher()); - assertEquals(result.get(), expected.toString()); + assertEquals(Block.sync(result, timeout), expected.toString()); } @Test @@ -247,7 +249,7 @@ public class JavaFutureTests { } }, system.dispatcher()); - assertEquals(result.get(), expectedStrings); + assertEquals(Block.sync(result, timeout), expectedStrings); } @Test @@ -268,7 +270,7 @@ public class JavaFutureTests { } }, system.dispatcher()); - assertEquals(expect, Block.sync(f, Duration.create(5, TimeUnit.SECONDS))); + assertEquals(expect, Block.sync(f, timeout)); } @Test diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala index df33fd2a19..3a2fdb1bec 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala @@ -123,7 +123,7 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { f.isCompleted must be === false a ! 42 f.isCompleted must be === true - f.get must be === 42 + Block.sync(f, timeout.duration) must be === 42 // clean-up is run as onComplete callback, i.e. dispatched on another thread awaitCond(system.actorFor(a.path) == system.deadLetters, 1 second) } @@ -136,7 +136,7 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { "find actors by looking up their path" in { def check(looker: ActorRef, pathOf: ActorRef, result: ActorRef) { - (looker ? LookupPath(pathOf.path)).get must be === result + Block.sync(looker ? LookupPath(pathOf.path), timeout.duration) must be === result } for { looker ← all @@ -146,8 +146,8 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { "find actors by looking up their string representation" in { def check(looker: ActorRef, pathOf: ActorRef, result: ActorRef) { - (looker ? LookupString(pathOf.path.toString)).get must be === result - (looker ? LookupString(pathOf.path.toString + "/")).get must be === result + Block.sync(looker ? LookupString(pathOf.path.toString), timeout.duration) must be === result + Block.sync(looker ? LookupString(pathOf.path.toString + "/"), timeout.duration) must be === result } for { looker ← all @@ -157,8 +157,8 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { "find actors by looking up their root-anchored relative path" in { def check(looker: ActorRef, pathOf: ActorRef, result: ActorRef) { - (looker ? LookupString(pathOf.path.elements.mkString("/", "/", ""))).get must be === result - (looker ? LookupString(pathOf.path.elements.mkString("/", "/", "/"))).get must be === result + Block.sync(looker ? LookupString(pathOf.path.elements.mkString("/", "/", "")), timeout.duration) must be === result + Block.sync(looker ? LookupString(pathOf.path.elements.mkString("/", "/", "/")), timeout.duration) must be === result } for { looker ← all @@ -168,9 +168,9 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { "find actors by looking up their relative path" in { def check(looker: ActorRef, result: ActorRef, elems: String*) { - (looker ? LookupElems(elems)).get must be === result - (looker ? LookupString(elems mkString "/")).get must be === result - (looker ? LookupString(elems mkString ("", "/", "/"))).get must be === result + Block.sync(looker ? LookupElems(elems), timeout.duration) must be === result + Block.sync(looker ? LookupString(elems mkString "/"), timeout.duration) must be === result + Block.sync(looker ? LookupString(elems mkString ("", "/", "/")), timeout.duration) must be === result } check(c1, user, "..") for { @@ -185,11 +185,11 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { "find system-generated actors" in { def check(target: ActorRef) { for (looker ← all) { - (looker ? LookupPath(target.path)).get must be === target - (looker ? LookupString(target.path.toString)).get must be === target - (looker ? LookupString(target.path.toString + "/")).get must be === target - (looker ? LookupString(target.path.elements.mkString("/", "/", ""))).get must be === target - if (target != root) (looker ? LookupString(target.path.elements.mkString("/", "/", "/"))).get must be === target + Block.sync(looker ? LookupPath(target.path), timeout.duration) must be === target + Block.sync(looker ? LookupString(target.path.toString), timeout.duration) must be === target + Block.sync(looker ? LookupString(target.path.toString + "/"), timeout.duration) must be === target + Block.sync(looker ? LookupString(target.path.elements.mkString("/", "/", "")), timeout.duration) must be === target + if (target != root) Block.sync(looker ? LookupString(target.path.elements.mkString("/", "/", "/")), timeout.duration) must be === target } } for (target ← Seq(root, syst, user, system.deadLetters)) check(target) @@ -199,7 +199,7 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { import scala.collection.JavaConverters._ def checkOne(looker: ActorRef, query: Query) { - (looker ? query).get must be === system.deadLetters + Block.sync(looker ? query, timeout.duration) must be === system.deadLetters } def check(looker: ActorRef) { Seq(LookupString("a/b/c"), @@ -218,21 +218,21 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { val f = c1 ? GetSender(testActor) val a = expectMsgType[ActorRef] a.path.elements.head must be === "temp" - (c2 ? LookupPath(a.path)).get must be === a - (c2 ? LookupString(a.path.toString)).get must be === a - (c2 ? LookupString(a.path.elements.mkString("/", "/", ""))).get must be === a - (c2 ? LookupString("../../" + a.path.elements.mkString("/"))).get must be === a - (c2 ? LookupString(a.path.toString + "/")).get must be === a - (c2 ? LookupString(a.path.elements.mkString("/", "/", "") + "/")).get must be === a - (c2 ? LookupString("../../" + a.path.elements.mkString("/") + "/")).get must be === a - (c2 ? LookupElems(Seq("..", "..") ++ a.path.elements)).get must be === a - (c2 ? LookupElems(Seq("..", "..") ++ a.path.elements :+ "")).get must be === a + Block.sync(c2 ? LookupPath(a.path), timeout.duration) must be === a + Block.sync(c2 ? LookupString(a.path.toString), timeout.duration) must be === a + Block.sync(c2 ? LookupString(a.path.elements.mkString("/", "/", "")), timeout.duration) must be === a + Block.sync(c2 ? LookupString("../../" + a.path.elements.mkString("/")), timeout.duration) must be === a + Block.sync(c2 ? LookupString(a.path.toString + "/"), timeout.duration) must be === a + Block.sync(c2 ? LookupString(a.path.elements.mkString("/", "/", "") + "/"), timeout.duration) must be === a + Block.sync(c2 ? LookupString("../../" + a.path.elements.mkString("/") + "/"), timeout.duration) must be === a + Block.sync(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements), timeout.duration) must be === a + Block.sync(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements :+ ""), timeout.duration) must be === a f.isCompleted must be === false a ! 42 f.isCompleted must be === true - f.get must be === 42 + Block.sync(f, timeout.duration) must be === 42 // clean-up is run as onComplete callback, i.e. dispatched on another thread - awaitCond((c2 ? LookupPath(a.path)).get == system.deadLetters, 1 second) + awaitCond(Block.sync(c2 ? LookupPath(a.path), timeout.duration) == system.deadLetters, 1 second) } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 7622c597ac..7d5ccea0a2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -361,8 +361,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { val fnull = (ref ? (null, timeout)).mapTo[String] ref ! PoisonPill - ffive.get must be("five") - fnull.get must be("null") + Block.sync(ffive, timeout.duration) must be("five") + Block.sync(fnull, timeout.duration) must be("null") awaitCond(ref.isTerminated, 2000 millis) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala index 86af471d13..bbad543de7 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala @@ -8,6 +8,7 @@ import akka.testkit._ import akka.util.duration._ import Actor._ import akka.util.Duration +import akka.dispatch.Block object ForwardActorSpec { val ExpectedMessage = "FOO" @@ -32,20 +33,21 @@ class ForwardActorSpec extends AkkaSpec { "A Forward Actor" must { - "forward actor reference when invoking forward on bang" in { + "forward actor reference when invoking forward on tell" in { val latch = new TestLatch(1) - val replyTo = system.actorOf(new Actor { def receive = { case ExpectedMessage ⇒ latch.countDown() } }) + val replyTo = system.actorOf(new Actor { def receive = { case ExpectedMessage ⇒ testActor ! ExpectedMessage } }) val chain = createForwardingChain(system) chain.tell(ExpectedMessage, replyTo) - latch.await(Duration(5, "s")) must be === true + expectMsg(5 seconds, ExpectedMessage) } - "forward actor reference when invoking forward on bang bang" in { + "forward actor reference when invoking forward on ask" in { val chain = createForwardingChain(system) - chain.ask(ExpectedMessage, 5000).get must be === ExpectedMessage + chain.ask(ExpectedMessage, 5000) onSuccess { case ExpectedMessage ⇒ testActor ! ExpectedMessage } + expectMsg(5 seconds, ExpectedMessage) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index 3fd3b32578..893994866d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -193,9 +193,9 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val f1 = client ? ByteString("Hello World!1") val f2 = client ? ByteString("Hello World!2") val f3 = client ? ByteString("Hello World!3") - f1.get must equal(ByteString("Hello World!1")) - f2.get must equal(ByteString("Hello World!2")) - f3.get must equal(ByteString("Hello World!3")) + Block.sync(f1, timeout.duration) must equal(ByteString("Hello World!1")) + Block.sync(f2, timeout.duration) must equal(ByteString("Hello World!2")) + Block.sync(f3, timeout.duration) must equal(ByteString("Hello World!3")) client.stop server.stop ioManager.stop @@ -209,7 +209,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val client = system.actorOf(new SimpleEchoClient("localhost", 8065, ioManager)) val list = List.range(0, 1000) val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString)) - assert(f.get.size === 1000) + assert(Block.sync(f, timeout.duration).size === 1000) client.stop server.stop ioManager.stop @@ -223,7 +223,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val client = system.actorOf(new SimpleEchoClient("localhost", 8066, ioManager)) val list = List.range(0, 1000) val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString)) - assert(f.get.size === 1000) + assert(Block.sync(f, timeout.duration).size === 1000) client.stop server.stop ioManager.stop @@ -244,12 +244,12 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { Block.on(f4, timeout.duration) val f5 = client1 ? (('get, "test")) val f6 = client2 ? 'getall - f1.get must equal("OK") - f2.get must equal("OK") - f3.get must equal(ByteString("World")) - f4.get must equal("OK") - f5.get must equal(ByteString("I'm a test!")) - f6.get must equal(Map("hello" -> ByteString("World"), "test" -> ByteString("I'm a test!"))) + Block.sync(f1, timeout.duration) must equal("OK") + Block.sync(f2, timeout.duration) must equal("OK") + Block.sync(f3, timeout.duration) must equal(ByteString("World")) + Block.sync(f4, timeout.duration) must equal("OK") + Block.sync(f5, timeout.duration) must equal(ByteString("I'm a test!")) + Block.sync(f6, timeout.duration) must equal(Map("hello" -> ByteString("World"), "test" -> ByteString("I'm a test!"))) client1.stop client2.stop server.stop diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index 408a7f02ff..d64ebe9632 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -7,10 +7,10 @@ package akka.actor import org.scalatest.BeforeAndAfterEach import akka.util.duration._ import akka.{ Die, Ping } -import akka.dispatch.Block import akka.testkit.TestEvent._ import akka.testkit._ import java.util.concurrent.atomic.AtomicInteger +import akka.dispatch.Block object SupervisorSpec { val Timeout = 5 seconds @@ -151,7 +151,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende "not restart temporary actor" in { val (temporaryActor, _) = temporaryActorAllForOne - intercept[RuntimeException] { (temporaryActor.?(DieReply, TimeoutMillis)).get } + intercept[RuntimeException] { Block.sync(temporaryActor.?(DieReply, TimeoutMillis), TimeoutMillis millis) } expectNoMsg(1 second) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 44053d6757..816603d079 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -247,7 +247,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte val t = newFooBar val f = t.futurePigdog(200) f.isCompleted must be(false) - f.get must be("Pigdog") + Block.sync(f, timeout.duration) must be("Pigdog") mustStop(t) } @@ -255,7 +255,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte val t = newFooBar val futures = for (i ← 1 to 20) yield (i, t.futurePigdog(20, i)) for ((i, f) ← futures) { - f.get must be("Pigdog" + i) + Block.sync(f, timeout.duration) must be("Pigdog" + i) } mustStop(t) } @@ -278,7 +278,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte val t, t2 = newFooBar(Duration(2, "s")) val f = t.futureComposePigdogFrom(t2) f.isCompleted must be(false) - f.get must equal("PIGDOG") + Block.sync(f, timeout.duration) must equal("PIGDOG") mustStop(t) mustStop(t2) } @@ -323,7 +323,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte val f2 = t.futurePigdog(0) f2.isCompleted must be(false) f.isCompleted must be(false) - f.get must equal(f2.get) + Block.sync(f, timeout.duration) must equal(Block.sync(f2, timeout.duration)) mustStop(t) } @@ -348,7 +348,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte val results = for (i ← 1 to 120) yield (i, iterator.next.futurePigdog(200L, i)) - for ((i, r) ← results) r.get must be("Pigdog" + i) + for ((i, r) ← results) Block.sync(r, timeout.duration) must be("Pigdog" + i) for (t ← thais) mustStop(t) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 8a84af703d..6361324b29 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -390,12 +390,12 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout { val f5 = try { a ? Interrupt } catch { case ie: InterruptedException ⇒ new KeptPromise(Left(ActorInterruptedException(ie))) } val f6 = a ? Reply("bar2") - assert(f1.get === "foo") - assert(f2.get === "bar") - assert(f4.get === "foo2") - assert(intercept[ActorInterruptedException](f3.get).getMessage === "Ping!") - assert(f6.get === "bar2") - assert(intercept[ActorInterruptedException](f5.get).getMessage === "Ping!") + assert(Block.sync(f1, timeout.duration) === "foo") + assert(Block.sync(f2, timeout.duration) === "bar") + assert(Block.sync(f4, timeout.duration) === "foo2") + assert(intercept[ActorInterruptedException](Block.sync(f3, timeout.duration)).getMessage === "Ping!") + assert(Block.sync(f6, timeout.duration) === "bar2") + assert(intercept[ActorInterruptedException](Block.sync(f5, timeout.duration)).getMessage === "Ping!") } } @@ -410,10 +410,10 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout { val f5 = a ? ThrowException(new RemoteException("RemoteException")) val f6 = a ? Reply("bar2") - assert(f1.get === "foo") - assert(f2.get === "bar") - assert(f4.get === "foo2") - assert(f6.get === "bar2") + assert(Block.sync(f1, timeout.duration) === "foo") + assert(Block.sync(f2, timeout.duration) === "bar") + assert(Block.sync(f4, timeout.duration) === "foo2") + assert(Block.sync(f6, timeout.duration) === "bar2") assert(f3.value.isEmpty) assert(f5.value.isEmpty) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala index d1f2f36aa3..0f3ff874d3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala @@ -66,7 +66,7 @@ class DispatcherActorSpec extends AkkaSpec with DefaultTimeout { case "ping" ⇒ if (works.get) latch.countDown() }).withDispatcher(throughputDispatcher)) - assert((slowOne ? "hogexecutor").get === "OK") + assert(Block.sync(slowOne ? "hogexecutor", timeout.duration) === "OK") (1 to 100) foreach { _ ⇒ slowOne ! "ping" } fastOne ! "sabotage" start.countDown() diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index c5742cc364..4126fce6a1 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -50,7 +50,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "return supplied value on timeout" in { val timedOut = new KeptPromise[String](Right("Timedout")) val promise = Promise[String]() orElse timedOut - promise.get must be("Timedout") + Block.sync(promise, timeout.duration) must be("Timedout") } } "completed with a result" must { @@ -200,9 +200,9 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa c ← (actor ? 7).mapTo[String] } yield b + "-" + c - future1.get must be("10-14") + Block.sync(future1, timeout.duration) must be("10-14") assert(checkType(future1, manifest[String])) - intercept[ClassCastException] { future2.get } + intercept[ClassCastException] { Block.sync(future2, timeout.duration) } actor.stop() } } @@ -230,8 +230,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa Res(c: Int) ← actor ? Req(7) } yield b + "-" + c - future1.get must be("10-14") - intercept[MatchError] { future2.get } + Block.sync(future1, timeout.duration) must be("10-14") + intercept[MatchError] { Block.sync(future2, timeout.duration) } actor.stop() } } @@ -267,17 +267,17 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } val future11 = actor ? "Failure" recover { case _ ⇒ "Oops!" } - future1.get must be(5) - intercept[ArithmeticException] { future2.get } - intercept[ArithmeticException] { future3.get } - future4.get must be("5") - future5.get must be("0") - intercept[ArithmeticException] { future6.get } - future7.get must be("You got ERROR") - intercept[RuntimeException] { future8.get } - future9.get must be("FAIL!") - future10.get must be("World") - future11.get must be("Oops!") + Block.sync(future1, timeout.duration) must be(5) + intercept[ArithmeticException] { Block.sync(future2, timeout.duration) } + intercept[ArithmeticException] { Block.sync(future3, timeout.duration) } + Block.sync(future4, timeout.duration) must be("5") + Block.sync(future5, timeout.duration) must be("0") + intercept[ArithmeticException] { Block.sync(future6, timeout.duration) } + Block.sync(future7, timeout.duration) must be("You got ERROR") + intercept[RuntimeException] { Block.sync(future8, timeout.duration) } + Block.sync(future9, timeout.duration) must be("FAIL!") + Block.sync(future10, timeout.duration) must be("World") + Block.sync(future11, timeout.duration) must be("Oops!") actor.stop() } @@ -285,16 +285,16 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "firstCompletedOf" in { val futures = Vector.fill[Future[Int]](10)(Promise[Int]()) :+ new KeptPromise[Int](Right(5)) - Future.firstCompletedOf(futures).get must be(5) + Block.sync(Future.firstCompletedOf(futures), timeout.duration) must be(5) } "find" in { val futures = for (i ← 1 to 10) yield Future { i } val result = Future.find[Int](futures)(_ == 3) - result.get must be(Some(3)) + Block.sync(result, timeout.duration) must be(Some(3)) val notFound = Future.find[Int](futures)(_ == 11) - notFound.get must be(None) + Block.sync(notFound, timeout.duration) must be(None) } "fold" in { @@ -315,7 +315,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa }) } def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), 10000).mapTo[Int] } - futures.foldLeft(Future(0))((fr, fa) ⇒ for (r ← fr; a ← fa) yield (r + a)).get must be(45) + Block.sync(futures.foldLeft(Future(0))((fr, fa) ⇒ for (r ← fr; a ← fa) yield (r + a)), timeout.duration) must be(45) } "fold with an exception" in { @@ -353,7 +353,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } "return zero value if folding empty list" in { - Future.fold(List[Future[Int]]())(0)(_ + _).get must be(0) + Block.sync(Future.fold(List[Future[Int]]())(0)(_ + _), timeout.duration) must be(0) } "shouldReduceResults" in { @@ -410,11 +410,11 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa }) val oddFutures = List.fill(100)(oddActor ? 'GetNext mapTo manifest[Int]) - assert(Future.sequence(oddFutures).get.sum === 10000) + assert(Block.sync(Future.sequence(oddFutures), timeout.duration).sum === 10000) oddActor.stop() val list = (1 to 100).toList - assert(Future.traverse(list)(x ⇒ Future(x * 2 - 1)).get.sum === 10000) + assert(Block.sync(Future.traverse(list)(x ⇒ Future(x * 2 - 1)), timeout.duration).sum === 10000) } "shouldHandleThrowables" in { @@ -461,7 +461,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val r = flow(x() + " " + y() + "!") - assert(r.get === "Hello World!") + assert(Block.sync(r, timeout.duration) === "Hello World!") actor.stop } @@ -475,7 +475,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply, 100) - intercept[java.lang.ArithmeticException](r.get) + intercept[java.lang.ArithmeticException](Block.sync(r, timeout.duration)) } } @@ -490,7 +490,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val r = flow(x() + y(), 100) - intercept[ClassCastException](r.get) + intercept[ClassCastException](Block.sync(r, timeout.duration)) } } @@ -505,7 +505,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val r = flow(x() + y()) - intercept[ClassCastException](r.get) + intercept[ClassCastException](Block.sync(r, timeout.duration)) } } @@ -529,10 +529,10 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa flow { x << 5 } - assert(y.get === 5) - assert(z.get === 5) + assert(Block.sync(y, timeout.duration) === 5) + assert(Block.sync(z, timeout.duration) === 5) assert(lz.isOpen) - assert(result.get === 10) + assert(Block.sync(result, timeout.duration) === 10) val a, b, c = Promise[Int]() @@ -544,9 +544,9 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa c completeWith Future(5) - assert(a.get === 5) - assert(b.get === 3) - assert(result2.get === 50) + assert(Block.sync(a, timeout.duration) === 5) + assert(Block.sync(b, timeout.duration) === 3) + assert(Block.sync(result2, timeout.duration) === 50) } "futureDataFlowShouldEmulateBlocking1" in { @@ -571,7 +571,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa Block.on(two, 1 minute) assert(List(one, two).forall(_.isCompleted == true)) - assert(simpleResult.get === 10) + assert(Block.sync(simpleResult, timeout.duration) === 10) } @@ -625,7 +625,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa assert(i2.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS)) s1.open s2.open - assert(result.get === 10) + assert(Block.sync(result, timeout.duration) === 10) } "futureCompletingWithContinuationsFailure" in { @@ -649,8 +649,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa flow { x << 5 } - assert(y.get === 5) - intercept[java.lang.ArithmeticException](result.get) + assert(Block.sync(y, timeout.duration) === 5) + intercept[java.lang.ArithmeticException](Block.sync(result, timeout.duration)) assert(z.value === None) assert(!lz.isOpen) } @@ -673,7 +673,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa latch.open - assert(result.get === Some("Hello")) + assert(Block.sync(result, timeout.duration) === Some("Hello")) } "futureFlowShouldBeTypeSafe" in { @@ -711,7 +711,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa flow { x << 40 } flow { y << 2 } - assert(z.get === 42) + assert(Block.sync(z, timeout.duration) === 42) } "futureFlowLoops" in { @@ -733,7 +733,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa var i = 0 promises foreach { p ⇒ - assert(p.get === i) + assert(Block.sync(p, timeout.duration) === i) i += 1 } @@ -793,7 +793,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } "should not deadlock with nested await (ticket 1313)" in { - val simple = Future() map (_ ⇒ (Future(()) map (_ ⇒ ())).get) + val simple = Future() map (_ ⇒ Block.sync((Future(()) map (_ ⇒ ())), timeout.duration)) Block.on(simple, timeout.duration) must be('completed) val l1, l2 = new StandardLatch @@ -818,16 +818,16 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa def futureWithResult(f: ((Future[Any], Any) ⇒ Unit) ⇒ Unit) { "be completed" in { f((future, _) ⇒ future must be('completed)) } "contain a value" in { f((future, result) ⇒ future.value must be(Some(Right(result)))) } - "return result with 'get'" in { f((future, result) ⇒ future.get must be(result)) } + "return result with 'get'" in { f((future, result) ⇒ Block.sync(future, timeout.duration) must be(result)) } "return result with 'Block.sync'" in { f((future, result) ⇒ Block.sync(future, timeout.duration) must be(result)) } "not timeout" in { f((future, _) ⇒ Block.on(future, 0 millis)) } "filter result" in { f { (future, result) ⇒ - (future filter (_ ⇒ true)).get must be(result) - (evaluating { (future filter (_ ⇒ false)).get } must produce[MatchError]).getMessage must startWith(result.toString) + Block.sync((future filter (_ ⇒ true)), timeout.duration) must be(result) + (evaluating { Block.sync((future filter (_ ⇒ false)), timeout.duration) } must produce[MatchError]).getMessage must startWith(result.toString) } } - "transform result with map" in { f((future, result) ⇒ (future map (_.toString.length)).get must be(result.toString.length)) } + "transform result with map" in { f((future, result) ⇒ Block.sync((future map (_.toString.length)), timeout.duration) must be(result.toString.length)) } "compose result with flatMap" in { f { (future, result) ⇒ val r = for (r ← future; p ← Promise.successful("foo")) yield r.toString + p @@ -862,12 +862,12 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa future.value.get.left.get.getMessage must be(message) }) } - "throw exception with 'get'" in { f((future, message) ⇒ (evaluating { future.get } must produce[E]).getMessage must be(message)) } + "throw exception with 'get'" in { f((future, message) ⇒ (evaluating { Block.sync(future, timeout.duration) } must produce[E]).getMessage must be(message)) } "throw exception with 'Block.sync'" in { f((future, message) ⇒ (evaluating { Block.sync(future, timeout.duration) } must produce[E]).getMessage must be(message)) } "retain exception with filter" in { f { (future, message) ⇒ - (evaluating { (future filter (_ ⇒ true)).get } must produce[E]).getMessage must be(message) - (evaluating { (future filter (_ ⇒ false)).get } must produce[E]).getMessage must be(message) + (evaluating { Block.sync(future filter (_ ⇒ true), timeout.duration) } must produce[E]).getMessage must be(message) + (evaluating { Block.sync(future filter (_ ⇒ false), timeout.duration) } must produce[E]).getMessage must be(message) } } "retain exception with map" in { f((future, message) ⇒ (evaluating { Block.sync(future map (_.toString.length), timeout.duration) } must produce[E]).getMessage must be(message)) } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala index bb2d857aae..5edffe3e0b 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala @@ -21,9 +21,9 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout { b << q c << q() } - assert(a.get === 1) - assert(b.get === 2) - assert(c.get === 3) + assert(Block.sync(a, timeout.duration) === 1) + assert(Block.sync(b, timeout.duration) === 2) + assert(Block.sync(c, timeout.duration) === 3) } "pend" in { @@ -35,9 +35,9 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout { c << q } flow { q <<< List(1, 2, 3) } - assert(a.get === 1) - assert(b.get === 2) - assert(c.get === 3) + assert(Block.sync(a, timeout.duration) === 1) + assert(Block.sync(b, timeout.duration) === 2) + assert(Block.sync(c, timeout.duration) === 3) } "pend again" in { @@ -54,10 +54,10 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout { c << q1 d << q1 } - assert(a.get === 1) - assert(b.get === 2) - assert(c.get === 3) - assert(d.get === 4) + assert(Block.sync(a, timeout.duration) === 1) + assert(Block.sync(b, timeout.duration) === 2) + assert(Block.sync(c, timeout.duration) === 3) + assert(Block.sync(d, timeout.duration) === 4) } "enque" in { @@ -71,10 +71,10 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout { } q ++= List(1, 2, 3, 4) - assert(a.get === 1) - assert(b.get === 2) - assert(c.get === 3) - assert(d.get === 4) + assert(Block.sync(a, timeout.duration) === 1) + assert(Block.sync(b, timeout.duration) === 2) + assert(Block.sync(c, timeout.duration) === 3) + assert(Block.sync(d, timeout.duration) === 4) } "map" in { @@ -90,9 +90,9 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout { flow { qs << ("Hello", "World!", "Test") } - assert(a.get === 5) - assert(b.get === "World!") - assert(c.get === 4) + assert(Block.sync(a, timeout.duration) === 5) + assert(Block.sync(b, timeout.duration) === "World!") + assert(Block.sync(c, timeout.duration) === 4) } "not fail under concurrent stress" in { @@ -128,8 +128,7 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout { } } - val result = future.get - assert(result === (1L to 100000L).sum) + assert(Block.sync(future, timeout.duration) === (1L to 100000L).sum) } } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index fd51501142..b00fb02880 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -8,6 +8,7 @@ import collection.mutable.LinkedList import akka.routing.Routing.Broadcast import java.util.concurrent.{ CountDownLatch, TimeUnit } import akka.testkit._ +import akka.dispatch.Block object RoutingSpec { @@ -270,7 +271,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { shutdownLatch.await - (actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1) + Block.sync(actor ? Broadcast(0), timeout.duration).asInstanceOf[Int] must be(1) } "throw an exception, if all the connections have stopped" in { @@ -297,8 +298,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - (actor ? Broadcast("Hi!")).get.asInstanceOf[Int] must be(0) - + Block.sync(actor ? Broadcast("Hi!"), timeout.duration).asInstanceOf[Int] must be(0) } "return the first response from connections, when some of them failed to reply" in { @@ -306,7 +306,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - (actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1) + Block.sync(actor ? Broadcast(0), timeout.duration).asInstanceOf[Int] must be(1) } "be started when constructed" in { diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 31caf6083b..cc39e9f634 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -339,7 +339,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor private[akka] def systemActorOf(props: Props, name: String): ActorRef = { implicit val timeout = settings.CreationTimeout - (systemGuardian ? CreateChild(props, name)).get match { + Block.sync(systemGuardian ? CreateChild(props, name), timeout.duration) match { case ref: ActorRef ⇒ ref case ex: Exception ⇒ throw ex } @@ -347,7 +347,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor def actorOf(props: Props, name: String): ActorRef = { implicit val timeout = settings.CreationTimeout - (guardian ? CreateChild(props, name)).get match { + Block.sync(guardian ? CreateChild(props, name), timeout.duration) match { case ref: ActorRef ⇒ ref case ex: Exception ⇒ throw ex } @@ -355,7 +355,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor def actorOf(props: Props): ActorRef = { implicit val timeout = settings.CreationTimeout - (guardian ? CreateRandomNameChild(props)).get match { + Block.sync(guardian ? CreateRandomNameChild(props), timeout.duration) match { case ref: ActorRef ⇒ ref case ex: Exception ⇒ throw ex } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 83c1eb371c..5242e98235 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -415,7 +415,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi case Some(Right(joption: AnyRef)) ⇒ joption case Some(Left(ex)) ⇒ throw ex } - case m ⇒ (actor.?(m, timeout)).get.asInstanceOf[AnyRef] + case m ⇒ Block.sync(actor.?(m, timeout), timeout.duration).asInstanceOf[AnyRef] } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index d2b6e3b9c4..e043cca761 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -376,24 +376,6 @@ sealed trait Future[+T] extends japi.Future[T] with Block.Blockable[T] { */ def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T ⇒ Future[Any])) - //Removed - /*def as[A](implicit m: Manifest[A]): Option[A] = { - try Block.on(this, Duration.Inf) catch { case _: TimeoutException ⇒ } - value match { - case None ⇒ None - case Some(Left(ex)) ⇒ throw ex - case Some(Right(v)) ⇒ - try { Some(BoxedType(m.erasure).cast(v).asInstanceOf[A]) } catch { - case c: ClassCastException ⇒ - if (v.asInstanceOf[AnyRef] eq null) throw new ClassCastException("null cannot be cast to " + m.erasure) - else throw new ClassCastException("'" + v + "' of class " + v.asInstanceOf[AnyRef].getClass + " cannot be cast to " + m.erasure) - } - } - }*/ - - @deprecated("Used Block.on(future, timeoutDuration)") - def get: T = Block.sync(this, Duration.Inf) - /** * Tests whether this Future has been completed. */ diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 2e80efabe3..f71d03c5d8 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -15,6 +15,7 @@ import java.util.concurrent.atomic.AtomicInteger import akka.actor.ActorRefProvider import scala.util.control.NoStackTrace import java.util.concurrent.TimeoutException +import akka.dispatch.Block object LoggingBus { implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream @@ -146,7 +147,7 @@ trait LoggingBus extends ActorEventBus { val name = "log" + Extension(system).id() + "-" + simpleName(clazz) val actor = system.systemActorOf(Props(clazz), name) implicit val timeout = Timeout(3 seconds) - val response = try actor ? InitializeLogger(this) get catch { + val response = try Block.sync(actor ? InitializeLogger(this), timeout.duration) catch { case _: TimeoutException ⇒ publish(Warning(simpleName(this), "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)")) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmSpec.scala index fe7a8f1908..aabbe6ff63 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmSpec.scala @@ -85,7 +85,7 @@ class LocalMetricsMultiJvmNode1 extends MasterClusterTestNode { }) - monitorReponse.get must be("Too much memory is used!") + Block.sync(monitorReponse, 5 seconds) must be("Too much memory is used!") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/DirectRoutingFailoverMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/DirectRoutingFailoverMultiJvmSpec.scala index 46463f6537..7718fb8e59 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/DirectRoutingFailoverMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/DirectRoutingFailoverMultiJvmSpec.scala @@ -11,6 +11,7 @@ import akka.testkit.{ EventFilter, TestEvent } import java.net.ConnectException import java.nio.channels.NotYetConnectedException import akka.cluster.LocalCluster +import akka.dispatch.Block object DirectRoutingFailoverMultiJvmSpec { @@ -48,7 +49,7 @@ class DirectRoutingFailoverMultiJvmNode1 extends MasterClusterTestNode { } LocalCluster.barrier("verify-actor", NrOfNodes) { - (actor ? "identify").get must equal("node2") + Block.sync(actor ? "identify", timeout.duration) must equal("node2") } val timer = Timer(30.seconds, true) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmSpec.scala index e13688f2dd..5a29882f31 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmSpec.scala @@ -11,6 +11,7 @@ import java.util.{ Collections, Set ⇒ JSet } import java.net.ConnectException import java.nio.channels.NotYetConnectedException import akka.cluster.LocalCluster._ +import akka.dispatch.Block object RandomFailoverMultiJvmSpec { @@ -91,7 +92,7 @@ class RandomFailoverMultiJvmNode1 extends MasterClusterTestNode { def identifyConnections(actor: ActorRef): JSet[String] = { val set = new java.util.HashSet[String] for (i ← 0 until 100) { // we should get hits from both nodes in 100 attempts, if not then not very random - val value = (actor ? "identify").get.asInstanceOf[String] + val value = Block.sync(actor ? "identify", timeout.duration).asInstanceOf[String] set.add(value) } set diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmSpec.scala index 5b8791231d..1277980b5f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmSpec.scala @@ -12,6 +12,7 @@ import java.net.ConnectException import java.nio.channels.NotYetConnectedException import java.lang.Thread import akka.cluster.LocalCluster._ +import akka.dispatch.Block object RoundRobinFailoverMultiJvmSpec { @@ -94,7 +95,7 @@ class RoundRobinFailoverMultiJvmNode1 extends MasterClusterTestNode { def identifyConnections(actor: ActorRef): JSet[String] = { val set = new java.util.HashSet[String] for (i ← 0 until 100) { - val value = (actor ? "identify").get.asInstanceOf[String] + val value = Block.sync(actor ? "identify", timeout.duration).asInstanceOf[String] set.add(value) } set diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/scattergather/failover/ScatterGatherFailoverMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/scattergather/failover/ScatterGatherFailoverMultiJvmSpec.scala index b19571f5a4..e25838f67b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/scattergather/failover/ScatterGatherFailoverMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/scattergather/failover/ScatterGatherFailoverMultiJvmSpec.scala @@ -11,6 +11,7 @@ import java.nio.channels.NotYetConnectedException import java.lang.Thread import akka.routing.Routing.Broadcast import akka.cluster.LocalCluster._ +import akka.dispatch.Block object ScatterGatherFailoverMultiJvmSpec { @@ -84,7 +85,7 @@ class ScatterGatherFailoverMultiJvmNode1 extends MasterClusterTestNode { def identifyConnections(actor: ActorRef): JSet[String] = { val set = new java.util.HashSet[String] for (i ← 0 until NrOfNodes * 2) { - val value = (actor ? "foo").get.asInstanceOf[String] + val value = Block.sync(actor ? "foo", timeout.duration).asInstanceOf[String] set.add(value) } set diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java b/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java index 65ec9e1dae..85f9de1784 100644 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java +++ b/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java @@ -25,8 +25,8 @@ public class UntypedTransactorExample { long timeout = 5000; Duration d = Duration.create(timeout, TimeUnit.MILLISECONDS); - Future future1 = counter1.ask("GetCount", timeout); - Future future2 = counter2.ask("GetCount", timeout); + Future future1 = counter1.ask("GetCount", timeout); + Future future2 = counter2.ask("GetCount", timeout); int count1 = (Integer)Block.sync(future1, d); System.out.println("counter 1: " + count1); diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java index 9258a05073..1afcd16a62 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java @@ -2,6 +2,8 @@ package akka.transactor.test; import static org.junit.Assert.*; +import akka.dispatch.Block; +import akka.util.Duration; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -10,7 +12,6 @@ import org.junit.Before; import akka.actor.ActorSystem; import akka.transactor.Coordinated; -import akka.actor.Actors; import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.UntypedActor; @@ -28,7 +29,6 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import scala.Option; import scala.collection.JavaConverters; import scala.collection.Seq; @@ -82,7 +82,7 @@ public class UntypedCoordinatedIncrementTest { } for (ActorRef counter : counters) { Future future = counter.ask("GetCount", askTimeout); - assertEquals(1, ((Integer) future.get()).intValue()); + assertEquals(1, ((Integer) Block.sync(future, Duration.create(timeout, TimeUnit.SECONDS))).intValue()); } } @@ -102,8 +102,8 @@ public class UntypedCoordinatedIncrementTest { } catch (InterruptedException exception) { } for (ActorRef counter : counters) { - Future future = counter.ask("GetCount", askTimeout); - assertEquals(0, ((Integer) future.get()).intValue()); + Futurefuture = counter.ask("GetCount", askTimeout); + assertEquals(0,((Integer)Block.sync(future, Duration.create(timeout, TimeUnit.SECONDS))).intValue()); } } diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java b/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java index b2db2e387a..408df14420 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java @@ -101,7 +101,7 @@ public class UntypedTransactorTest { } catch (InterruptedException exception) { } for (ActorRef counter : counters) { - Future future = counter.ask("GetCount", askTimeout); + Future future = counter.ask("GetCount", askTimeout); int count = (Integer)Block.sync(future, Duration.create(askTimeout, TimeUnit.MILLISECONDS)); assertEquals(0, count); } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index c64ee6dd15..5083dd85f2 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -140,7 +140,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { system.registerOnTermination(latch.countDown()) system.stop() latch.await(2 seconds) - (davyJones ? "Die!").get must be === "finally gone" + Block.sync(davyJones ? "Die!", timeout.duration) must be === "finally gone" // this will typically also contain log messages which were sent after the logger shutdown locker must contain(DeadLetter(42, davyJones, probe.ref)) diff --git a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala index 5e2d775195..6669c70f64 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala @@ -4,8 +4,8 @@ import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import org.scalatest.{ BeforeAndAfterEach, WordSpec } import akka.actor._ -import akka.dispatch.Future import akka.util.duration._ +import akka.dispatch.{ Block, Future } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class TestProbeSpec extends AkkaSpec with DefaultTimeout { @@ -18,7 +18,7 @@ class TestProbeSpec extends AkkaSpec with DefaultTimeout { tk.expectMsg(0 millis, "hello") // TestActor runs on CallingThreadDispatcher tk.lastMessage.sender ! "world" future must be('completed) - future.get must equal("world") + Block.sync(future, timeout.duration) must equal("world") } "reply to messages" in {