diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index a78b11f1d3..84eddf5ef7 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -54,7 +54,7 @@ public class JavaFutureTests { } }); - assertEquals("Hello World",Block.sync(f2, timeout)); + assertEquals("Hello World", Await.result(f2, timeout)); } @Test @@ -71,7 +71,7 @@ public class JavaFutureTests { cf.success("foo"); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); - assertEquals(Block.sync(f, timeout), "foo"); + assertEquals(Await.result(f, timeout), "foo"); } @Test @@ -105,7 +105,7 @@ public class JavaFutureTests { cf.success("foo"); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); - assertEquals(Block.sync(f, timeout), "foo"); + assertEquals(Await.result(f, timeout), "foo"); } @Test @@ -121,7 +121,7 @@ public class JavaFutureTests { cf.success("foo"); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); - assertEquals(Block.sync(f, timeout), "foo"); + assertEquals(Await.result(f, timeout), "foo"); } @Test @@ -139,8 +139,8 @@ public class JavaFutureTests { } }); - assertEquals(Block.sync(f, timeout), "1000"); - assertEquals(Block.sync(r, timeout).intValue(), 1000); + assertEquals(Await.result(f, timeout), "1000"); + assertEquals(Await.result(r, timeout).intValue(), 1000); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); } @@ -158,8 +158,8 @@ public class JavaFutureTests { cf.success("foo"); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); - assertEquals(Block.sync(f, timeout), "foo"); - assertEquals(Block.sync(r, timeout), "foo"); + assertEquals(Await.result(f, timeout), "foo"); + assertEquals(Await.result(r, timeout), "foo"); } // TODO: Improve this test, perhaps with an Actor @@ -179,7 +179,7 @@ public class JavaFutureTests { Future> futureList = Futures.sequence(listFutures, system.dispatcher()); - assertEquals(Block.sync(futureList, timeout), listExpected); + assertEquals(Await.result(futureList, timeout), listExpected); } // TODO: Improve this test, perhaps with an Actor @@ -203,7 +203,7 @@ public class JavaFutureTests { } }, system.dispatcher()); - assertEquals(Block.sync(result, timeout), expected.toString()); + assertEquals(Await.result(result, timeout), expected.toString()); } @Test @@ -226,7 +226,7 @@ public class JavaFutureTests { } }, system.dispatcher()); - assertEquals(Block.sync(result, timeout), expected.toString()); + assertEquals(Await.result(result, timeout), expected.toString()); } @Test @@ -249,7 +249,7 @@ public class JavaFutureTests { } }, system.dispatcher()); - assertEquals(Block.sync(result, timeout), expectedStrings); + assertEquals(Await.result(result, timeout), expectedStrings); } @Test @@ -270,7 +270,7 @@ public class JavaFutureTests { } }, system.dispatcher()); - assertEquals(expect, Block.sync(f, timeout)); + assertEquals(expect, Await.result(f, timeout)); } @Test @@ -278,7 +278,7 @@ public class JavaFutureTests { Promise p = Futures.promise(system.dispatcher()); Duration d = Duration.create(1, TimeUnit.SECONDS); p.success("foo"); - Block.on(p, d); - assertEquals(Block.sync(p, d), "foo"); + Await.ready(p, d); + assertEquals(Await.result(p, d), "foo"); } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala index 1ba3792f37..51f0bbb3fc 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala @@ -7,7 +7,7 @@ package akka.actor import akka.testkit._ import org.scalatest.BeforeAndAfterEach import akka.util.duration._ -import akka.dispatch.Block +import akka.dispatch.Await object ActorFireForgetRequestReplySpec { @@ -81,7 +81,7 @@ class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach w "should shutdown crashed temporary actor" in { filterEvents(EventFilter[Exception]("Expected exception")) { val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(0)))) - val actor = Block.sync((supervisor ? Props[CrashingActor]).mapTo[ActorRef], timeout.duration) + val actor = Await.result((supervisor ? Props[CrashingActor]).mapTo[ActorRef], timeout.duration) actor.isTerminated must be(false) actor ! "Die" state.finished.await diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala index 2aea4aa9f0..43ca8bc7b3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala @@ -11,7 +11,7 @@ import akka.actor.Actor._ import akka.testkit._ import akka.util.duration._ import java.util.concurrent.atomic._ -import akka.dispatch.Block +import akka.dispatch.Await object ActorLifeCycleSpec { @@ -41,7 +41,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") } override def postRestart(reason: Throwable) { report("postRestart") } }) - val restarter = Block.sync((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration) + val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration) expectMsg(("preStart", id, 0)) restarter ! Kill @@ -72,7 +72,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3)))) val gen = new AtomicInteger(0) val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen)) - val restarter = Block.sync((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration) + val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration) expectMsg(("preStart", id, 0)) restarter ! Kill @@ -102,7 +102,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3)))) val gen = new AtomicInteger(0) val props = Props(new LifeCycleTestActor(testActor, id, gen)) - val a = Block.sync((supervisor ? props).mapTo[ActorRef], timeout.duration) + val a = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration) expectMsg(("preStart", id, 0)) a ! "status" expectMsg(("OK", id, 0)) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala index 3a2fdb1bec..59eb5d2fb8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala @@ -5,7 +5,7 @@ package akka.actor import akka.testkit._ import akka.util.duration._ -import akka.dispatch.Block +import akka.dispatch.Await object ActorLookupSpec { @@ -37,7 +37,7 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { val c1 = system.actorOf(p, "c1") val c2 = system.actorOf(p, "c2") - val c21 = Block.sync((c2 ? Create("c21")).mapTo[ActorRef], timeout.duration) + val c21 = Await.result((c2 ? Create("c21")).mapTo[ActorRef], timeout.duration) val user = system.asInstanceOf[ActorSystemImpl].guardian val syst = system.asInstanceOf[ActorSystemImpl].systemGuardian @@ -123,7 +123,7 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { f.isCompleted must be === false a ! 42 f.isCompleted must be === true - Block.sync(f, timeout.duration) must be === 42 + Await.result(f, timeout.duration) must be === 42 // clean-up is run as onComplete callback, i.e. dispatched on another thread awaitCond(system.actorFor(a.path) == system.deadLetters, 1 second) } @@ -136,7 +136,7 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { "find actors by looking up their path" in { def check(looker: ActorRef, pathOf: ActorRef, result: ActorRef) { - Block.sync(looker ? LookupPath(pathOf.path), timeout.duration) must be === result + Await.result(looker ? LookupPath(pathOf.path), timeout.duration) must be === result } for { looker ← all @@ -146,8 +146,8 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { "find actors by looking up their string representation" in { def check(looker: ActorRef, pathOf: ActorRef, result: ActorRef) { - Block.sync(looker ? LookupString(pathOf.path.toString), timeout.duration) must be === result - Block.sync(looker ? LookupString(pathOf.path.toString + "/"), timeout.duration) must be === result + Await.result(looker ? LookupString(pathOf.path.toString), timeout.duration) must be === result + Await.result(looker ? LookupString(pathOf.path.toString + "/"), timeout.duration) must be === result } for { looker ← all @@ -157,8 +157,8 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { "find actors by looking up their root-anchored relative path" in { def check(looker: ActorRef, pathOf: ActorRef, result: ActorRef) { - Block.sync(looker ? LookupString(pathOf.path.elements.mkString("/", "/", "")), timeout.duration) must be === result - Block.sync(looker ? LookupString(pathOf.path.elements.mkString("/", "/", "/")), timeout.duration) must be === result + Await.result(looker ? LookupString(pathOf.path.elements.mkString("/", "/", "")), timeout.duration) must be === result + Await.result(looker ? LookupString(pathOf.path.elements.mkString("/", "/", "/")), timeout.duration) must be === result } for { looker ← all @@ -168,9 +168,9 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { "find actors by looking up their relative path" in { def check(looker: ActorRef, result: ActorRef, elems: String*) { - Block.sync(looker ? LookupElems(elems), timeout.duration) must be === result - Block.sync(looker ? LookupString(elems mkString "/"), timeout.duration) must be === result - Block.sync(looker ? LookupString(elems mkString ("", "/", "/")), timeout.duration) must be === result + Await.result(looker ? LookupElems(elems), timeout.duration) must be === result + Await.result(looker ? LookupString(elems mkString "/"), timeout.duration) must be === result + Await.result(looker ? LookupString(elems mkString ("", "/", "/")), timeout.duration) must be === result } check(c1, user, "..") for { @@ -185,11 +185,11 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { "find system-generated actors" in { def check(target: ActorRef) { for (looker ← all) { - Block.sync(looker ? LookupPath(target.path), timeout.duration) must be === target - Block.sync(looker ? LookupString(target.path.toString), timeout.duration) must be === target - Block.sync(looker ? LookupString(target.path.toString + "/"), timeout.duration) must be === target - Block.sync(looker ? LookupString(target.path.elements.mkString("/", "/", "")), timeout.duration) must be === target - if (target != root) Block.sync(looker ? LookupString(target.path.elements.mkString("/", "/", "/")), timeout.duration) must be === target + Await.result(looker ? LookupPath(target.path), timeout.duration) must be === target + Await.result(looker ? LookupString(target.path.toString), timeout.duration) must be === target + Await.result(looker ? LookupString(target.path.toString + "/"), timeout.duration) must be === target + Await.result(looker ? LookupString(target.path.elements.mkString("/", "/", "")), timeout.duration) must be === target + if (target != root) Await.result(looker ? LookupString(target.path.elements.mkString("/", "/", "/")), timeout.duration) must be === target } } for (target ← Seq(root, syst, user, system.deadLetters)) check(target) @@ -199,7 +199,7 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { import scala.collection.JavaConverters._ def checkOne(looker: ActorRef, query: Query) { - Block.sync(looker ? query, timeout.duration) must be === system.deadLetters + Await.result(looker ? query, timeout.duration) must be === system.deadLetters } def check(looker: ActorRef) { Seq(LookupString("a/b/c"), @@ -218,21 +218,21 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { val f = c1 ? GetSender(testActor) val a = expectMsgType[ActorRef] a.path.elements.head must be === "temp" - Block.sync(c2 ? LookupPath(a.path), timeout.duration) must be === a - Block.sync(c2 ? LookupString(a.path.toString), timeout.duration) must be === a - Block.sync(c2 ? LookupString(a.path.elements.mkString("/", "/", "")), timeout.duration) must be === a - Block.sync(c2 ? LookupString("../../" + a.path.elements.mkString("/")), timeout.duration) must be === a - Block.sync(c2 ? LookupString(a.path.toString + "/"), timeout.duration) must be === a - Block.sync(c2 ? LookupString(a.path.elements.mkString("/", "/", "") + "/"), timeout.duration) must be === a - Block.sync(c2 ? LookupString("../../" + a.path.elements.mkString("/") + "/"), timeout.duration) must be === a - Block.sync(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements), timeout.duration) must be === a - Block.sync(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements :+ ""), timeout.duration) must be === a + Await.result(c2 ? LookupPath(a.path), timeout.duration) must be === a + Await.result(c2 ? LookupString(a.path.toString), timeout.duration) must be === a + Await.result(c2 ? LookupString(a.path.elements.mkString("/", "/", "")), timeout.duration) must be === a + Await.result(c2 ? LookupString("../../" + a.path.elements.mkString("/")), timeout.duration) must be === a + Await.result(c2 ? LookupString(a.path.toString + "/"), timeout.duration) must be === a + Await.result(c2 ? LookupString(a.path.elements.mkString("/", "/", "") + "/"), timeout.duration) must be === a + Await.result(c2 ? LookupString("../../" + a.path.elements.mkString("/") + "/"), timeout.duration) must be === a + Await.result(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements), timeout.duration) must be === a + Await.result(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements :+ ""), timeout.duration) must be === a f.isCompleted must be === false a ! 42 f.isCompleted must be === true - Block.sync(f, timeout.duration) must be === 42 + Await.result(f, timeout.duration) must be === 42 // clean-up is run as onComplete callback, i.e. dispatched on another thread - awaitCond(Block.sync(c2 ? LookupPath(a.path), timeout.duration) == system.deadLetters, 1 second) + awaitCond(Await.result(c2 ? LookupPath(a.path), timeout.duration) == system.deadLetters, 1 second) } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 7d5ccea0a2..4e42c6d9d0 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -13,7 +13,7 @@ import java.lang.IllegalStateException import akka.util.ReflectiveAccess import akka.serialization.Serialization import java.util.concurrent.{ CountDownLatch, TimeUnit } -import akka.dispatch.{ Block, DefaultPromise, Promise, Future } +import akka.dispatch.{ Await, DefaultPromise, Promise, Future } object ActorRefSpec { @@ -128,7 +128,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { def wrap[T](f: Promise[Actor] ⇒ T): T = { val result = Promise[Actor]() val r = f(result) - Block.sync(result, 1 minute) + Await.result(result, 1 minute) r } @@ -306,7 +306,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { def receive = { case _ ⇒ sender ! nested } }) - val nested = Block.sync((a ? "any").mapTo[ActorRef], timeout.duration) + val nested = Await.result((a ? "any").mapTo[ActorRef], timeout.duration) a must not be null nested must not be null (a ne nested) must be === true @@ -314,13 +314,13 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { "support advanced nested actorOfs" in { val a = system.actorOf(Props(new OuterActor(system.actorOf(Props(new InnerActor))))) - val inner = Block.sync(a ? "innerself", timeout.duration) + val inner = Await.result(a ? "innerself", timeout.duration) - Block.sync(a ? a, timeout.duration) must be(a) - Block.sync(a ? "self", timeout.duration) must be(a) + Await.result(a ? a, timeout.duration) must be(a) + Await.result(a ? "self", timeout.duration) must be(a) inner must not be a - Block.sync(a ? "msg", timeout.duration) must be === "msg" + Await.result(a ? "msg", timeout.duration) must be === "msg" } "support reply via sender" in { @@ -361,8 +361,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { val fnull = (ref ? (null, timeout)).mapTo[String] ref ! PoisonPill - Block.sync(ffive, timeout.duration) must be("five") - Block.sync(fnull, timeout.duration) must be("null") + Await.result(ffive, timeout.duration) must be("five") + Await.result(fnull, timeout.duration) must be("null") awaitCond(ref.isTerminated, 2000 millis) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala index ddd040b2d6..0d77a75e56 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala @@ -8,7 +8,7 @@ import akka.util.duration._ import akka.testkit.AkkaSpec import akka.testkit.DefaultTimeout import java.util.concurrent.TimeoutException -import akka.dispatch.Block +import akka.dispatch.Await @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeout { @@ -29,7 +29,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo val echo = actorWithTimeout(Timeout(12)) try { val f = echo ? "hallo" - intercept[TimeoutException] { Block.on(f, system.settings.ActorTimeout.duration) } + intercept[TimeoutException] { Await.ready(f, system.settings.ActorTimeout.duration) } } finally { echo.stop } } } @@ -40,7 +40,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo val echo = actorWithTimeout(Props.defaultTimeout) try { val f = (echo ? "hallo").mapTo[String] - intercept[TimeoutException] { Block.on(f, timeout.duration) } + intercept[TimeoutException] { Await.ready(f, timeout.duration) } f.value must be(None) } finally { echo.stop } } @@ -51,7 +51,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo val echo = actorWithTimeout(Props.defaultTimeout) val f = echo.?("hallo", testTimeout) try { - intercept[TimeoutException] { Block.on(f, testTimeout) } + intercept[TimeoutException] { Await.ready(f, testTimeout) } f.value must be === None } finally { echo.stop } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 45b93c9444..9431c582c9 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -8,7 +8,7 @@ import org.scalatest.BeforeAndAfterEach import akka.testkit._ import akka.util.duration._ import java.util.concurrent.atomic._ -import akka.dispatch.Block +import akka.dispatch.Await @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender with DefaultTimeout { @@ -79,13 +79,13 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende filterException[ActorKilledException] { val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(2)))) val terminalProps = Props(context ⇒ { case x ⇒ context.sender ! x }) - val terminal = Block.sync((supervisor ? terminalProps).mapTo[ActorRef], timeout.duration) + val terminal = Await.result((supervisor ? terminalProps).mapTo[ActorRef], timeout.duration) val monitor = startWatching(terminal) terminal ! Kill terminal ! Kill - Block.sync(terminal ? "foo", timeout.duration) must be === "foo" + Await.result(terminal ? "foo", timeout.duration) must be === "foo" terminal ! Kill expectTerminationOf(terminal) @@ -106,8 +106,8 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende } })) - val failed = Block.sync((supervisor ? Props.empty).mapTo[ActorRef], timeout.duration) - val brother = Block.sync((supervisor ? Props(new Actor { + val failed = Await.result((supervisor ? Props.empty).mapTo[ActorRef], timeout.duration) + val brother = Await.result((supervisor ? Props(new Actor { context.watch(failed) def receive = Actor.emptyBehavior })).mapTo[ActorRef], timeout.duration) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala index bbad543de7..031563f5c7 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala @@ -8,7 +8,7 @@ import akka.testkit._ import akka.util.duration._ import Actor._ import akka.util.Duration -import akka.dispatch.Block +import akka.dispatch.Await object ForwardActorSpec { val ExpectedMessage = "FOO" diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index 893994866d..42e323321c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -10,7 +10,7 @@ import akka.util.ByteString import akka.util.cps._ import scala.util.continuations._ import akka.testkit._ -import akka.dispatch.{ Block, Future } +import akka.dispatch.{ Await, Future } object IOActorSpec { import IO._ @@ -193,9 +193,9 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val f1 = client ? ByteString("Hello World!1") val f2 = client ? ByteString("Hello World!2") val f3 = client ? ByteString("Hello World!3") - Block.sync(f1, timeout.duration) must equal(ByteString("Hello World!1")) - Block.sync(f2, timeout.duration) must equal(ByteString("Hello World!2")) - Block.sync(f3, timeout.duration) must equal(ByteString("Hello World!3")) + Await.result(f1, timeout.duration) must equal(ByteString("Hello World!1")) + Await.result(f2, timeout.duration) must equal(ByteString("Hello World!2")) + Await.result(f3, timeout.duration) must equal(ByteString("Hello World!3")) client.stop server.stop ioManager.stop @@ -209,7 +209,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val client = system.actorOf(new SimpleEchoClient("localhost", 8065, ioManager)) val list = List.range(0, 1000) val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString)) - assert(Block.sync(f, timeout.duration).size === 1000) + assert(Await.result(f, timeout.duration).size === 1000) client.stop server.stop ioManager.stop @@ -223,7 +223,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val client = system.actorOf(new SimpleEchoClient("localhost", 8066, ioManager)) val list = List.range(0, 1000) val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString)) - assert(Block.sync(f, timeout.duration).size === 1000) + assert(Await.result(f, timeout.duration).size === 1000) client.stop server.stop ioManager.stop @@ -239,17 +239,17 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val f1 = client1 ? (('set, "hello", ByteString("World"))) val f2 = client1 ? (('set, "test", ByteString("No one will read me"))) val f3 = client1 ? (('get, "hello")) - Block.on(f2, timeout.duration) + Await.ready(f2, timeout.duration) val f4 = client2 ? (('set, "test", ByteString("I'm a test!"))) - Block.on(f4, timeout.duration) + Await.ready(f4, timeout.duration) val f5 = client1 ? (('get, "test")) val f6 = client2 ? 'getall - Block.sync(f1, timeout.duration) must equal("OK") - Block.sync(f2, timeout.duration) must equal("OK") - Block.sync(f3, timeout.duration) must equal(ByteString("World")) - Block.sync(f4, timeout.duration) must equal("OK") - Block.sync(f5, timeout.duration) must equal(ByteString("I'm a test!")) - Block.sync(f6, timeout.duration) must equal(Map("hello" -> ByteString("World"), "test" -> ByteString("I'm a test!"))) + Await.result(f1, timeout.duration) must equal("OK") + Await.result(f2, timeout.duration) must equal("OK") + Await.result(f3, timeout.duration) must equal(ByteString("World")) + Await.result(f4, timeout.duration) must equal("OK") + Await.result(f5, timeout.duration) must equal(ByteString("I'm a test!")) + Await.result(f6, timeout.duration) must equal(Map("hello" -> ByteString("World"), "test" -> ByteString("I'm a test!"))) client1.stop client2.stop server.stop diff --git a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala index ecef8daf65..1abc6896f9 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala @@ -6,7 +6,7 @@ package akka.actor import akka.testkit._ import akka.util.duration._ -import akka.dispatch.{ Block, Future } +import akka.dispatch.{ Await, Future } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class LocalActorRefProviderSpec extends AkkaSpec { @@ -32,7 +32,7 @@ class LocalActorRefProviderSpec extends AkkaSpec { val address = "new-actor" + i implicit val timeout = Timeout(5 seconds) val actors = for (j ← 1 to 4) yield Future(system.actorOf(Props(c ⇒ { case _ ⇒ }), address)) - val set = Set() ++ actors.map(a ⇒ Block.on(a, timeout.duration).value match { + val set = Set() ++ actors.map(a ⇒ Await.ready(a, timeout.duration).value match { case Some(Right(a: ActorRef)) ⇒ 1 case Some(Left(ex: InvalidActorNameException)) ⇒ 2 case x ⇒ x diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index a5df8f4f8c..44f678d0a3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -6,7 +6,7 @@ package akka.actor import java.lang.Thread.sleep import org.scalatest.BeforeAndAfterAll -import akka.dispatch.Block +import akka.dispatch.Await import akka.testkit.TestEvent._ import akka.testkit.EventFilter import java.util.concurrent.{ TimeUnit, CountDownLatch } @@ -52,7 +52,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { stopLatch.open } }) - val slave = Block.sync((boss ? slaveProps).mapTo[ActorRef], timeout.duration) + val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration) slave ! Ping slave ! Crash @@ -87,7 +87,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { countDownLatch.countDown() } }) - val slave = Block.sync((boss ? slaveProps).mapTo[ActorRef], timeout.duration) + val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration) (1 to 100) foreach { _ ⇒ slave ! Crash } assert(countDownLatch.await(120, TimeUnit.SECONDS)) @@ -125,7 +125,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { } } }) - val slave = Block.sync((boss ? slaveProps).mapTo[ActorRef], timeout.duration) + val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration) slave ! Ping slave ! Crash @@ -176,7 +176,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { stopLatch.open } }) - val slave = Block.sync((boss ? slaveProps).mapTo[ActorRef], timeout.duration) + val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration) slave ! Ping slave ! Crash @@ -228,7 +228,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { stopLatch.open } }) - val slave = Block.sync((boss ? slaveProps).mapTo[ActorRef], timeout.duration) + val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration) slave ! Ping slave ! Crash diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 7c585116db..501a9ab43c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -7,7 +7,7 @@ import akka.testkit.EventFilter import akka.util.duration._ import java.util.concurrent.{ CountDownLatch, ConcurrentLinkedQueue, TimeUnit } import akka.testkit.DefaultTimeout -import akka.dispatch.Block +import akka.dispatch.Await @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { @@ -114,7 +114,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout override def postRestart(reason: Throwable) = restartLatch.open }) - val actor = Block.sync((supervisor ? props).mapTo[ActorRef], timeout.duration) + val actor = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration) collectCancellable(system.scheduler.schedule(500 milliseconds, 500 milliseconds, actor, Ping)) // appx 2 pings before crash diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 78ce792def..b466978fa8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -7,7 +7,7 @@ package akka.actor import akka.testkit._ import java.util.concurrent.{ TimeUnit, CountDownLatch } -import akka.dispatch.Block +import akka.dispatch.Await object SupervisorHierarchySpec { class FireWorkerException(msg: String) extends Exception(msg) @@ -34,10 +34,10 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout { val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), None, None))) val managerProps = Props(new CountDownActor(countDown)).withFaultHandler(AllForOneStrategy(List(), None, None)) - val manager = Block.sync((boss ? managerProps).mapTo[ActorRef], timeout.duration) + val manager = Await.result((boss ? managerProps).mapTo[ActorRef], timeout.duration) val workerProps = Props(new CountDownActor(countDown)) - val workerOne, workerTwo, workerThree = Block.sync((manager ? workerProps).mapTo[ActorRef], timeout.duration) + val workerOne, workerTwo, workerThree = Await.result((manager ? workerProps).mapTo[ActorRef], timeout.duration) filterException[ActorKilledException] { workerOne ! Kill diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala index bc3b54a020..9885a6db26 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala @@ -4,7 +4,7 @@ package akka.actor import akka.testkit.{ filterEvents, EventFilter } -import akka.dispatch.{ PinnedDispatcher, Dispatchers, Block } +import akka.dispatch.{ PinnedDispatcher, Dispatchers, Await } import java.util.concurrent.{ TimeUnit, CountDownLatch } import akka.testkit.AkkaSpec import akka.testkit.DefaultTimeout @@ -28,11 +28,11 @@ class SupervisorMiscSpec extends AkkaSpec with DefaultTimeout { } }) - val actor1, actor2 = Block.sync((supervisor ? workerProps.withDispatcher(system.dispatcherFactory.newPinnedDispatcher("pinned"))).mapTo[ActorRef], timeout.duration) + val actor1, actor2 = Await.result((supervisor ? workerProps.withDispatcher(system.dispatcherFactory.newPinnedDispatcher("pinned"))).mapTo[ActorRef], timeout.duration) - val actor3 = Block.sync((supervisor ? workerProps.withDispatcher(system.dispatcherFactory.newDispatcher("test").build)).mapTo[ActorRef], timeout.duration) + val actor3 = Await.result((supervisor ? workerProps.withDispatcher(system.dispatcherFactory.newDispatcher("test").build)).mapTo[ActorRef], timeout.duration) - val actor4 = Block.sync((supervisor ? workerProps.withDispatcher(system.dispatcherFactory.newPinnedDispatcher("pinned"))).mapTo[ActorRef], timeout.duration) + val actor4 = Await.result((supervisor ? workerProps.withDispatcher(system.dispatcherFactory.newPinnedDispatcher("pinned"))).mapTo[ActorRef], timeout.duration) actor1 ! Kill actor2 ! Kill @@ -40,10 +40,10 @@ class SupervisorMiscSpec extends AkkaSpec with DefaultTimeout { actor4 ! Kill countDownLatch.await(10, TimeUnit.SECONDS) - assert(Block.sync(actor1 ? "status", timeout.duration) == "OK", "actor1 is shutdown") - assert(Block.sync(actor2 ? "status", timeout.duration) == "OK", "actor2 is shutdown") - assert(Block.sync(actor3 ? "status", timeout.duration) == "OK", "actor3 is shutdown") - assert(Block.sync(actor4 ? "status", timeout.duration) == "OK", "actor4 is shutdown") + assert(Await.result(actor1 ? "status", timeout.duration) == "OK", "actor1 is shutdown") + assert(Await.result(actor2 ? "status", timeout.duration) == "OK", "actor2 is shutdown") + assert(Await.result(actor3 ? "status", timeout.duration) == "OK", "actor3 is shutdown") + assert(Await.result(actor4 ? "status", timeout.duration) == "OK", "actor4 is shutdown") } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index d64ebe9632..2bd4cda5f9 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -10,7 +10,7 @@ import akka.{ Die, Ping } import akka.testkit.TestEvent._ import akka.testkit._ import java.util.concurrent.atomic.AtomicInteger -import akka.dispatch.Block +import akka.dispatch.Await object SupervisorSpec { val Timeout = 5 seconds @@ -72,7 +72,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende // Creating actors and supervisors // ===================================================== - private def child(supervisor: ActorRef, props: Props): ActorRef = Block.sync((supervisor ? props).mapTo[ActorRef], props.timeout.duration) + private def child(supervisor: ActorRef, props: Props): ActorRef = Await.result((supervisor ? props).mapTo[ActorRef], props.timeout.duration) def temporaryActorAllForOne = { val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), Some(0)))) @@ -128,14 +128,14 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende } def ping(pingPongActor: ActorRef) = { - Block.sync(pingPongActor.?(Ping, TimeoutMillis), TimeoutMillis millis) must be === PongMessage + Await.result(pingPongActor.?(Ping, TimeoutMillis), TimeoutMillis millis) must be === PongMessage expectMsg(Timeout, PingMessage) } def kill(pingPongActor: ActorRef) = { val result = (pingPongActor ? (DieReply, TimeoutMillis)) expectMsg(Timeout, ExceptionMessage) - intercept[RuntimeException] { Block.sync(result, TimeoutMillis millis) } + intercept[RuntimeException] { Await.result(result, TimeoutMillis millis) } } "A supervisor" must { @@ -151,7 +151,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende "not restart temporary actor" in { val (temporaryActor, _) = temporaryActorAllForOne - intercept[RuntimeException] { Block.sync(temporaryActor.?(DieReply, TimeoutMillis), TimeoutMillis millis) } + intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply, TimeoutMillis), TimeoutMillis millis) } expectNoMsg(1 second) } @@ -292,16 +292,16 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende throw e } }) - val dyingActor = Block.sync((supervisor ? dyingProps).mapTo[ActorRef], timeout.duration) + val dyingActor = Await.result((supervisor ? dyingProps).mapTo[ActorRef], timeout.duration) filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1), EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) { intercept[RuntimeException] { - Block.sync(dyingActor.?(DieReply, TimeoutMillis), TimeoutMillis millis) + Await.result(dyingActor.?(DieReply, TimeoutMillis), TimeoutMillis millis) } } - Block.sync(dyingActor.?(Ping, TimeoutMillis), TimeoutMillis millis) must be === PongMessage + Await.result(dyingActor.?(Ping, TimeoutMillis), TimeoutMillis millis) must be === PongMessage inits.get must be(3) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala index aaa39db326..ceefe8f43d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala @@ -11,7 +11,7 @@ import akka.testkit.{ TestKit, EventFilter, filterEvents, filterException } import akka.testkit.AkkaSpec import akka.testkit.ImplicitSender import akka.testkit.DefaultTimeout -import akka.dispatch.{ Block, Dispatchers } +import akka.dispatch.{ Await, Dispatchers } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeout { @@ -28,8 +28,8 @@ class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeou override def preRestart(cause: Throwable, msg: Option[Any]) { testActor ! self.path } }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 1000)) val headActor = system.actorOf(p) - val middleActor = Block.sync((headActor ? p).mapTo[ActorRef], timeout.duration) - val lastActor = Block.sync((middleActor ? p).mapTo[ActorRef], timeout.duration) + val middleActor = Await.result((headActor ? p).mapTo[ActorRef], timeout.duration) + val lastActor = Await.result((middleActor ? p).mapTo[ActorRef], timeout.duration) middleActor ! Kill expectMsg(middleActor.path) diff --git a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala index b8a6954fe9..5f47f97bf6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala @@ -10,7 +10,7 @@ import akka.testkit.{ TestKit, filterEvents, EventFilter } import akka.testkit.AkkaSpec import akka.testkit.ImplicitSender import akka.testkit.DefaultTimeout -import akka.dispatch.Block +import akka.dispatch.Await @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender with DefaultTimeout { @@ -25,7 +25,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender "be able to reply on failure during preRestart" in { filterEvents(EventFilter[Exception]("test", occurrences = 1)) { val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 5, 10000))) - val supervised = Block.sync((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration) + val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration) supervised.!("test")(testActor) expectMsg("failure1") @@ -36,7 +36,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender "be able to reply on failure during postStop" in { filterEvents(EventFilter[Exception]("test", occurrences = 1)) { val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), Some(0), None))) - val supervised = Block.sync((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration) + val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration) supervised.!("test")(testActor) expectMsg("failure2") diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 816603d079..1637354b7f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -16,7 +16,7 @@ import akka.actor.TypedActor.{ PostRestart, PreRestart, PostStop, PreStart } import java.util.concurrent.{ TimeUnit, CountDownLatch } import akka.japi.{ Creator, Option ⇒ JOption } import akka.testkit.DefaultTimeout -import akka.dispatch.{ Block, Dispatchers, Future, KeptPromise } +import akka.dispatch.{ Await, Dispatchers, Future, Promise } object TypedActorSpec { @@ -85,7 +85,7 @@ object TypedActorSpec { def pigdog = "Pigdog" - def futurePigdog(): Future[String] = new KeptPromise(Right(pigdog)) + def futurePigdog(): Future[String] = Promise.successful(pigdog) def futurePigdog(delay: Long): Future[String] = { Thread.sleep(delay) @@ -94,7 +94,7 @@ object TypedActorSpec { def futurePigdog(delay: Long, numbered: Int): Future[String] = { Thread.sleep(delay) - new KeptPromise(Right(pigdog + numbered)) + Promise.successful(pigdog + numbered) } def futureComposePigdogFrom(foo: Foo): Future[String] = { @@ -247,7 +247,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte val t = newFooBar val f = t.futurePigdog(200) f.isCompleted must be(false) - Block.sync(f, timeout.duration) must be("Pigdog") + Await.result(f, timeout.duration) must be("Pigdog") mustStop(t) } @@ -255,7 +255,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte val t = newFooBar val futures = for (i ← 1 to 20) yield (i, t.futurePigdog(20, i)) for ((i, f) ← futures) { - Block.sync(f, timeout.duration) must be("Pigdog" + i) + Await.result(f, timeout.duration) must be("Pigdog" + i) } mustStop(t) } @@ -278,7 +278,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte val t, t2 = newFooBar(Duration(2, "s")) val f = t.futureComposePigdogFrom(t2) f.isCompleted must be(false) - Block.sync(f, timeout.duration) must equal("PIGDOG") + Await.result(f, timeout.duration) must equal("PIGDOG") mustStop(t) mustStop(t2) } @@ -290,13 +290,13 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte }).withFaultHandler(OneForOneStrategy { case e: IllegalStateException if e.getMessage == "expected" ⇒ FaultHandlingStrategy.Resume })) - val t = Block.sync((boss ? Props().withTimeout(2 seconds)).mapTo[Foo], timeout.duration) + val t = Await.result((boss ? Props().withTimeout(2 seconds)).mapTo[Foo], timeout.duration) t.incr() t.failingPigdog() t.read() must be(1) //Make sure state is not reset after failure - intercept[IllegalStateException] { Block.sync(t.failingFuturePigdog, 2 seconds) }.getMessage must be("expected") + intercept[IllegalStateException] { Await.result(t.failingFuturePigdog, 2 seconds) }.getMessage must be("expected") t.read() must be(1) //Make sure state is not reset after failure (intercept[IllegalStateException] { t.failingJOptionPigdog }).getMessage must be("expected") @@ -323,7 +323,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte val f2 = t.futurePigdog(0) f2.isCompleted must be(false) f.isCompleted must be(false) - Block.sync(f, timeout.duration) must equal(Block.sync(f2, timeout.duration)) + Await.result(f, timeout.duration) must equal(Await.result(f2, timeout.duration)) mustStop(t) } @@ -348,7 +348,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte val results = for (i ← 1 to 120) yield (i, iterator.next.futurePigdog(200L, i)) - for ((i, r) ← results) Block.sync(r, timeout.duration) must be("Pigdog" + i) + for ((i, r) ← results) Await.result(r, timeout.duration) must be("Pigdog" + i) for (t ← thais) mustStop(t) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 6361324b29..82cabd800b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -31,7 +31,7 @@ object ActorModelSpec { case class Increment(counter: AtomicLong) extends ActorModelMessage - case class Await(latch: CountDownLatch) extends ActorModelMessage + case class AwaitLatch(latch: CountDownLatch) extends ActorModelMessage case class Meet(acknowledge: CountDownLatch, waitFor: CountDownLatch) extends ActorModelMessage @@ -68,7 +68,7 @@ object ActorModelSpec { } def receive = { - case Await(latch) ⇒ ack; latch.await(); busy.switchOff() + case AwaitLatch(latch) ⇒ ack; latch.await(); busy.switchOff() case Meet(sign, wait) ⇒ ack; sign.countDown(); wait.await(); busy.switchOff() case Wait(time) ⇒ ack; Thread.sleep(time); busy.switchOff() case WaitAck(time, l) ⇒ ack; Thread.sleep(time); l.countDown(); busy.switchOff() @@ -385,17 +385,17 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout { val a = newTestActor(dispatcher) val f1 = a ? Reply("foo") val f2 = a ? Reply("bar") - val f3 = try { a ? Interrupt } catch { case ie: InterruptedException ⇒ new KeptPromise(Left(ActorInterruptedException(ie))) } + val f3 = try { a ? Interrupt } catch { case ie: InterruptedException ⇒ Promise.failed(ActorInterruptedException(ie)) } val f4 = a ? Reply("foo2") - val f5 = try { a ? Interrupt } catch { case ie: InterruptedException ⇒ new KeptPromise(Left(ActorInterruptedException(ie))) } + val f5 = try { a ? Interrupt } catch { case ie: InterruptedException ⇒ Promise.failed(ActorInterruptedException(ie)) } val f6 = a ? Reply("bar2") - assert(Block.sync(f1, timeout.duration) === "foo") - assert(Block.sync(f2, timeout.duration) === "bar") - assert(Block.sync(f4, timeout.duration) === "foo2") - assert(intercept[ActorInterruptedException](Block.sync(f3, timeout.duration)).getMessage === "Ping!") - assert(Block.sync(f6, timeout.duration) === "bar2") - assert(intercept[ActorInterruptedException](Block.sync(f5, timeout.duration)).getMessage === "Ping!") + assert(Await.result(f1, timeout.duration) === "foo") + assert(Await.result(f2, timeout.duration) === "bar") + assert(Await.result(f4, timeout.duration) === "foo2") + assert(intercept[ActorInterruptedException](Await.result(f3, timeout.duration)).getMessage === "Ping!") + assert(Await.result(f6, timeout.duration) === "bar2") + assert(intercept[ActorInterruptedException](Await.result(f5, timeout.duration)).getMessage === "Ping!") } } @@ -410,10 +410,10 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout { val f5 = a ? ThrowException(new RemoteException("RemoteException")) val f6 = a ? Reply("bar2") - assert(Block.sync(f1, timeout.duration) === "foo") - assert(Block.sync(f2, timeout.duration) === "bar") - assert(Block.sync(f4, timeout.duration) === "foo2") - assert(Block.sync(f6, timeout.duration) === "bar2") + assert(Await.result(f1, timeout.duration) === "foo") + assert(Await.result(f2, timeout.duration) === "bar") + assert(Await.result(f4, timeout.duration) === "foo2") + assert(Await.result(f6, timeout.duration) === "bar2") assert(f3.value.isEmpty) assert(f5.value.isEmpty) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala index 0f3ff874d3..a30ba18c2d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala @@ -7,7 +7,7 @@ import akka.actor.{ Props, Actor } import akka.util.Duration import akka.util.duration._ import akka.testkit.DefaultTimeout -import akka.dispatch.{ Block, PinnedDispatcher, Dispatchers, Dispatcher } +import akka.dispatch.{ Await, PinnedDispatcher, Dispatchers, Dispatcher } object DispatcherActorSpec { class TestActor extends Actor { @@ -44,7 +44,7 @@ class DispatcherActorSpec extends AkkaSpec with DefaultTimeout { "support ask/reply" in { val actor = system.actorOf(Props[TestActor].withDispatcher(system.dispatcherFactory.newDispatcher("test").build)) - assert("World" === Block.sync(actor ? "Hello", timeout.duration)) + assert("World" === Await.result(actor ? "Hello", timeout.duration)) actor.stop() } @@ -66,7 +66,7 @@ class DispatcherActorSpec extends AkkaSpec with DefaultTimeout { case "ping" ⇒ if (works.get) latch.countDown() }).withDispatcher(throughputDispatcher)) - assert(Block.sync(slowOne ? "hogexecutor", timeout.duration) === "OK") + assert(Await.result(slowOne ? "hogexecutor", timeout.duration) === "OK") (1 to 100) foreach { _ ⇒ slowOne ! "ping" } fastOne ! "sabotage" start.countDown() diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala index ed4a003f25..b3832fa754 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala @@ -6,7 +6,7 @@ import akka.testkit._ import akka.actor.{ Props, Actor } import akka.testkit.AkkaSpec import org.scalatest.BeforeAndAfterEach -import akka.dispatch.{ Block, PinnedDispatcher, Dispatchers } +import akka.dispatch.{ Await, PinnedDispatcher, Dispatchers } object PinnedActorSpec { class TestActor extends Actor { @@ -35,7 +35,7 @@ class PinnedActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeo "support ask/reply" in { val actor = system.actorOf(Props[TestActor].withDispatcher(system.dispatcherFactory.newPinnedDispatcher("test"))) - assert("World" === Block.sync(actor ? "Hello", timeout.duration)) + assert("World" === Await.result(actor ? "Hello", timeout.duration)) actor.stop() } } diff --git a/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala b/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala index ee5b1c68fd..5d24b9678f 100644 --- a/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala +++ b/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala @@ -4,7 +4,7 @@ package akka.dataflow import akka.actor.{ Actor, Props } -import akka.dispatch.{ Future, Block } +import akka.dispatch.{ Future, Await } import akka.actor.future2actor import akka.util.duration._ import akka.testkit.AkkaSpec @@ -26,9 +26,9 @@ class Future2ActorSpec extends AkkaSpec with DefaultTimeout { case "ex" ⇒ Future(throw new AssertionError) pipeTo context.sender } })) - Block.sync(actor ? "do", timeout.duration) must be(31) + Await.result(actor ? "do", timeout.duration) must be(31) intercept[AssertionError] { - Block.sync(actor ? "ex", timeout.duration) + Await.result(actor ? "ex", timeout.duration) } } } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 4126fce6a1..45de4bb3bd 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -48,9 +48,9 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "never completed" must { behave like emptyFuture(_(Promise())) "return supplied value on timeout" in { - val timedOut = new KeptPromise[String](Right("Timedout")) + val timedOut = Promise.successful[String]("Timedout") val promise = Promise[String]() orElse timedOut - Block.sync(promise, timeout.duration) must be("Timedout") + Await.result(promise, timeout.duration) must be("Timedout") } } "completed with a result" must { @@ -77,7 +77,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } test(future) latch.open - Block.on(future, timeout.duration) + Await.ready(future, timeout.duration) } } "is completed" must { @@ -89,7 +89,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa result } latch.open - Block.on(future, timeout.duration) + Await.ready(future, timeout.duration) test(future, result) } } @@ -98,8 +98,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa filterException[ArithmeticException] { check({ (future: Future[Int], actions: List[FutureAction]) ⇒ val result = (future /: actions)(_ /: _) - val expected = (Block.on(future, timeout.duration).value.get /: actions)(_ /: _) - ((Block.on(result, timeout.duration).value.get, expected) match { + val expected = (Await.ready(future, timeout.duration).value.get /: actions)(_ /: _) + ((Await.ready(result, timeout.duration).value.get, expected) match { case (Right(a), Right(b)) ⇒ a == b case (Left(a), Left(b)) if a.toString == b.toString ⇒ true case (Left(a), Left(b)) if a.getStackTrace.isEmpty || b.getStackTrace.isEmpty ⇒ @@ -117,7 +117,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa behave like futureWithResult { test ⇒ val actor = system.actorOf[TestActor] val future = actor ? "Hello" - Block.on(future, timeout.duration) + Await.ready(future, timeout.duration) test(future, "World") actor.stop() } @@ -127,7 +127,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa filterException[RuntimeException] { val actor = system.actorOf[TestActor] val future = actor ? "Failure" - Block.on(future, timeout.duration) + Await.ready(future, timeout.duration) test(future, "Expected exception; to test fault-tolerance") actor.stop() } @@ -141,7 +141,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val actor1 = system.actorOf[TestActor] val actor2 = system.actorOf(new Actor { def receive = { case s: String ⇒ sender ! s.toUpperCase } }) val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s } - Block.on(future, timeout.duration) + Await.ready(future, timeout.duration) test(future, "WORLD") actor1.stop() actor2.stop() @@ -153,7 +153,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val actor1 = system.actorOf[TestActor] val actor2 = system.actorOf(new Actor { def receive = { case s: String ⇒ sender ! Status.Failure(new ArithmeticException("/ by zero")) } }) val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s } - Block.on(future, timeout.duration) + Await.ready(future, timeout.duration) test(future, "/ by zero") actor1.stop() actor2.stop() @@ -166,7 +166,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val actor1 = system.actorOf[TestActor] val actor2 = system.actorOf(new Actor { def receive = { case s: String ⇒ sender ! s.toUpperCase } }) val future = actor1 ? "Hello" flatMap { case i: Int ⇒ actor2 ? i } - Block.on(future, timeout.duration) + Await.ready(future, timeout.duration) test(future, "World (of class java.lang.String)") actor1.stop() actor2.stop() @@ -200,9 +200,9 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa c ← (actor ? 7).mapTo[String] } yield b + "-" + c - Block.sync(future1, timeout.duration) must be("10-14") + Await.result(future1, timeout.duration) must be("10-14") assert(checkType(future1, manifest[String])) - intercept[ClassCastException] { Block.sync(future2, timeout.duration) } + intercept[ClassCastException] { Await.result(future2, timeout.duration) } actor.stop() } } @@ -230,8 +230,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa Res(c: Int) ← actor ? Req(7) } yield b + "-" + c - Block.sync(future1, timeout.duration) must be("10-14") - intercept[MatchError] { Block.sync(future2, timeout.duration) } + Await.result(future1, timeout.duration) must be("10-14") + intercept[MatchError] { Await.result(future2, timeout.duration) } actor.stop() } } @@ -267,34 +267,34 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } val future11 = actor ? "Failure" recover { case _ ⇒ "Oops!" } - Block.sync(future1, timeout.duration) must be(5) - intercept[ArithmeticException] { Block.sync(future2, timeout.duration) } - intercept[ArithmeticException] { Block.sync(future3, timeout.duration) } - Block.sync(future4, timeout.duration) must be("5") - Block.sync(future5, timeout.duration) must be("0") - intercept[ArithmeticException] { Block.sync(future6, timeout.duration) } - Block.sync(future7, timeout.duration) must be("You got ERROR") - intercept[RuntimeException] { Block.sync(future8, timeout.duration) } - Block.sync(future9, timeout.duration) must be("FAIL!") - Block.sync(future10, timeout.duration) must be("World") - Block.sync(future11, timeout.duration) must be("Oops!") + Await.result(future1, timeout.duration) must be(5) + intercept[ArithmeticException] { Await.result(future2, timeout.duration) } + intercept[ArithmeticException] { Await.result(future3, timeout.duration) } + Await.result(future4, timeout.duration) must be("5") + Await.result(future5, timeout.duration) must be("0") + intercept[ArithmeticException] { Await.result(future6, timeout.duration) } + Await.result(future7, timeout.duration) must be("You got ERROR") + intercept[RuntimeException] { Await.result(future8, timeout.duration) } + Await.result(future9, timeout.duration) must be("FAIL!") + Await.result(future10, timeout.duration) must be("World") + Await.result(future11, timeout.duration) must be("Oops!") actor.stop() } } "firstCompletedOf" in { - val futures = Vector.fill[Future[Int]](10)(Promise[Int]()) :+ new KeptPromise[Int](Right(5)) - Block.sync(Future.firstCompletedOf(futures), timeout.duration) must be(5) + val futures = Vector.fill[Future[Int]](10)(Promise[Int]()) :+ Promise.successful[Int](5) + Await.result(Future.firstCompletedOf(futures), timeout.duration) must be(5) } "find" in { val futures = for (i ← 1 to 10) yield Future { i } val result = Future.find[Int](futures)(_ == 3) - Block.sync(result, timeout.duration) must be(Some(3)) + Await.result(result, timeout.duration) must be(Some(3)) val notFound = Future.find[Int](futures)(_ == 11) - Block.sync(notFound, timeout.duration) must be(None) + Await.result(notFound, timeout.duration) must be(None) } "fold" in { @@ -305,7 +305,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] } - Block.sync(Future.fold(futures)(0)(_ + _), timeout millis) must be(45) + Await.result(Future.fold(futures)(0)(_ + _), timeout millis) must be(45) } "fold by composing" in { @@ -315,7 +315,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa }) } def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), 10000).mapTo[Int] } - Block.sync(futures.foldLeft(Future(0))((fr, fa) ⇒ for (r ← fr; a ← fa) yield (r + a)), timeout.duration) must be(45) + Await.result(futures.foldLeft(Future(0))((fr, fa) ⇒ for (r ← fr; a ← fa) yield (r + a)), timeout.duration) must be(45) } "fold with an exception" in { @@ -332,7 +332,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } - intercept[Throwable] { Block.sync(Future.fold(futures)(0)(_ + _), timeout millis) }.getMessage must be("shouldFoldResultsWithException: expected") + intercept[Throwable] { Await.result(Future.fold(futures)(0)(_ + _), timeout millis) }.getMessage must be("shouldFoldResultsWithException: expected") } } @@ -344,7 +344,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa case (l, i) if i % 2 == 0 ⇒ l += i.asInstanceOf[AnyRef] case (l, _) ⇒ l } - val result = Block.sync(f.mapTo[ArrayBuffer[Int]], 10000 millis).sum + val result = Await.result(f.mapTo[ArrayBuffer[Int]], 10000 millis).sum assert(result === 250500) } @@ -353,7 +353,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } "return zero value if folding empty list" in { - Block.sync(Future.fold(List[Future[Int]]())(0)(_ + _), timeout.duration) must be(0) + Await.result(Future.fold(List[Future[Int]]())(0)(_ + _), timeout.duration) must be(0) } "shouldReduceResults" in { @@ -364,7 +364,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] } - assert(Block.sync(Future.reduce(futures)(_ + _), timeout millis) === 45) + assert(Await.result(Future.reduce(futures)(_ + _), timeout millis) === 45) } "shouldReduceResultsWithException" in { @@ -381,13 +381,13 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } - intercept[Throwable] { Block.sync(Future.reduce(futures)(_ + _), timeout millis) }.getMessage must be === "shouldFoldResultsWithException: expected" + intercept[Throwable] { Await.result(Future.reduce(futures)(_ + _), timeout millis) }.getMessage must be === "shouldFoldResultsWithException: expected" } } "shouldReduceThrowIAEOnEmptyInput" in { filterException[IllegalArgumentException] { - intercept[UnsupportedOperationException] { Block.sync(Future.reduce(List[Future[Int]]())(_ + _), timeout.duration) } + intercept[UnsupportedOperationException] { Await.result(Future.reduce(List[Future[Int]]())(_ + _), timeout.duration) } } } @@ -410,11 +410,11 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa }) val oddFutures = List.fill(100)(oddActor ? 'GetNext mapTo manifest[Int]) - assert(Block.sync(Future.sequence(oddFutures), timeout.duration).sum === 10000) + assert(Await.result(Future.sequence(oddFutures), timeout.duration).sum === 10000) oddActor.stop() val list = (1 to 100).toList - assert(Block.sync(Future.traverse(list)(x ⇒ Future(x * 2 - 1)), timeout.duration).sum === 10000) + assert(Await.result(Future.traverse(list)(x ⇒ Future(x * 2 - 1)), timeout.duration).sum === 10000) } "shouldHandleThrowables" in { @@ -422,7 +422,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa filterException[ThrowableTest] { val f1 = Future[Any] { throw new ThrowableTest("test") } - intercept[ThrowableTest] { Block.sync(f1, timeout.duration) } + intercept[ThrowableTest] { Await.result(f1, timeout.duration) } val latch = new StandardLatch val f2 = Future { latch.tryAwait(5, TimeUnit.SECONDS); "success" } @@ -430,10 +430,10 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa f2 onSuccess { case _ ⇒ throw new ThrowableTest("dispatcher receive") } val f3 = f2 map (s ⇒ s.toUpperCase) latch.open - assert(Block.sync(f2, timeout.duration) === "success") + assert(Await.result(f2, timeout.duration) === "success") f2 foreach (_ ⇒ throw new ThrowableTest("current thread foreach")) f2 onSuccess { case _ ⇒ throw new ThrowableTest("current thread receive") } - assert(Block.sync(f3, timeout.duration) === "SUCCESS") + assert(Await.result(f3, timeout.duration) === "SUCCESS") } } @@ -441,14 +441,14 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val latch = new StandardLatch val f = Future { latch.await; 5 } - val f2 = Future { Block.sync(f, timeout.duration) + 5 } + val f2 = Future { Await.result(f, timeout.duration) + 5 } - intercept[TimeoutException](Block.on(f2, 100 millis)) + intercept[TimeoutException](Await.ready(f2, 100 millis)) latch.open - assert(Block.sync(f2, timeout.duration) === 10) + assert(Await.result(f2, timeout.duration) === 10) val f3 = Future { Thread.sleep(100); 5 } - filterException[TimeoutException] { intercept[TimeoutException] { Block.on(f3, 0 millis) } } + filterException[TimeoutException] { intercept[TimeoutException] { Await.ready(f3, 0 millis) } } } "futureComposingWithContinuations" in { @@ -461,7 +461,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val r = flow(x() + " " + y() + "!") - assert(Block.sync(r, timeout.duration) === "Hello World!") + assert(Await.result(r, timeout.duration) === "Hello World!") actor.stop } @@ -475,7 +475,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply, 100) - intercept[java.lang.ArithmeticException](Block.sync(r, timeout.duration)) + intercept[java.lang.ArithmeticException](Await.result(r, timeout.duration)) } } @@ -490,7 +490,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val r = flow(x() + y(), 100) - intercept[ClassCastException](Block.sync(r, timeout.duration)) + intercept[ClassCastException](Await.result(r, timeout.duration)) } } @@ -505,7 +505,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val r = flow(x() + y()) - intercept[ClassCastException](Block.sync(r, timeout.duration)) + intercept[ClassCastException](Await.result(r, timeout.duration)) } } @@ -529,10 +529,10 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa flow { x << 5 } - assert(Block.sync(y, timeout.duration) === 5) - assert(Block.sync(z, timeout.duration) === 5) + assert(Await.result(y, timeout.duration) === 5) + assert(Await.result(z, timeout.duration) === 5) assert(lz.isOpen) - assert(Block.sync(result, timeout.duration) === 10) + assert(Await.result(result, timeout.duration) === 10) val a, b, c = Promise[Int]() @@ -544,9 +544,9 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa c completeWith Future(5) - assert(Block.sync(a, timeout.duration) === 5) - assert(Block.sync(b, timeout.duration) === 3) - assert(Block.sync(result2, timeout.duration) === 50) + assert(Await.result(a, timeout.duration) === 5) + assert(Await.result(b, timeout.duration) === 3) + assert(Await.result(result2, timeout.duration) === 50) } "futureDataFlowShouldEmulateBlocking1" in { @@ -561,17 +561,17 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa flow { one << 1 } - Block.on(one, 1 minute) + Await.ready(one, 1 minute) assert(one.isCompleted) assert(List(two, simpleResult).forall(_.isCompleted == false)) flow { two << 9 } - Block.on(two, 1 minute) + Await.ready(two, 1 minute) assert(List(one, two).forall(_.isCompleted == true)) - assert(Block.sync(simpleResult, timeout.duration) === 10) + assert(Await.result(simpleResult, timeout.duration) === 10) } @@ -595,17 +595,17 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa flow { y1 << 1 } // When this is set, it should cascade down the line assert(ly.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS)) - assert(Block.sync(x1, 1 minute) === 1) + assert(Await.result(x1, 1 minute) === 1) assert(!lz.isOpen) flow { y2 << 9 } // When this is set, it should cascade down the line assert(lz.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS)) - assert(Block.sync(x2, 1 minute) === 9) + assert(Await.result(x2, 1 minute) === 9) assert(List(x1, x2, y1, y2).forall(_.isCompleted)) - assert(Block.sync(result, 1 minute) === 10) + assert(Await.result(result, 1 minute) === 10) } "dataFlowAPIshouldbeSlick" in { @@ -625,7 +625,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa assert(i2.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS)) s1.open s2.open - assert(Block.sync(result, timeout.duration) === 10) + assert(Await.result(result, timeout.duration) === 10) } "futureCompletingWithContinuationsFailure" in { @@ -649,8 +649,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa flow { x << 5 } - assert(Block.sync(y, timeout.duration) === 5) - intercept[java.lang.ArithmeticException](Block.sync(result, timeout.duration)) + assert(Await.result(y, timeout.duration) === 5) + intercept[java.lang.ArithmeticException](Await.result(result, timeout.duration)) assert(z.value === None) assert(!lz.isOpen) } @@ -673,7 +673,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa latch.open - assert(Block.sync(result, timeout.duration) === Some("Hello")) + assert(Await.result(result, timeout.duration) === Some("Hello")) } "futureFlowShouldBeTypeSafe" in { @@ -696,8 +696,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa assert(!checkType(rInt, manifest[Nothing])) assert(!checkType(rInt, manifest[Any])) - Block.sync(rString, timeout.duration) - Block.sync(rInt, timeout.duration) + Await.result(rString, timeout.duration) + Await.result(rInt, timeout.duration) } "futureFlowSimpleAssign" in { @@ -711,7 +711,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa flow { x << 40 } flow { y << 2 } - assert(Block.sync(z, timeout.duration) === 42) + assert(Await.result(z, timeout.duration) === 42) } "futureFlowLoops" in { @@ -733,7 +733,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa var i = 0 promises foreach { p ⇒ - assert(Block.sync(p, timeout.duration) === i) + assert(Await.result(p, timeout.duration) === i) i += 1 } @@ -789,12 +789,12 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa latch(8).open latch(9).await - Block.on(f4, timeout.duration) must be('completed) + Await.ready(f4, timeout.duration) must be('completed) } "should not deadlock with nested await (ticket 1313)" in { - val simple = Future() map (_ ⇒ Block.sync((Future(()) map (_ ⇒ ())), timeout.duration)) - Block.on(simple, timeout.duration) must be('completed) + val simple = Future() map (_ ⇒ Await.result((Future(()) map (_ ⇒ ())), timeout.duration)) + Await.ready(simple, timeout.duration) must be('completed) val l1, l2 = new StandardLatch val complex = Future() map { _ ⇒ @@ -805,7 +805,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa nested foreach (_ ⇒ l2.open) l2.await } - Block.on(complex, timeout.duration) must be('completed) + Await.ready(complex, timeout.duration) must be('completed) } } } @@ -818,39 +818,39 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa def futureWithResult(f: ((Future[Any], Any) ⇒ Unit) ⇒ Unit) { "be completed" in { f((future, _) ⇒ future must be('completed)) } "contain a value" in { f((future, result) ⇒ future.value must be(Some(Right(result)))) } - "return result with 'get'" in { f((future, result) ⇒ Block.sync(future, timeout.duration) must be(result)) } - "return result with 'Block.sync'" in { f((future, result) ⇒ Block.sync(future, timeout.duration) must be(result)) } - "not timeout" in { f((future, _) ⇒ Block.on(future, 0 millis)) } + "return result with 'get'" in { f((future, result) ⇒ Await.result(future, timeout.duration) must be(result)) } + "return result with 'Await.sync'" in { f((future, result) ⇒ Await.result(future, timeout.duration) must be(result)) } + "not timeout" in { f((future, _) ⇒ Await.ready(future, 0 millis)) } "filter result" in { f { (future, result) ⇒ - Block.sync((future filter (_ ⇒ true)), timeout.duration) must be(result) - (evaluating { Block.sync((future filter (_ ⇒ false)), timeout.duration) } must produce[MatchError]).getMessage must startWith(result.toString) + Await.result((future filter (_ ⇒ true)), timeout.duration) must be(result) + (evaluating { Await.result((future filter (_ ⇒ false)), timeout.duration) } must produce[MatchError]).getMessage must startWith(result.toString) } } - "transform result with map" in { f((future, result) ⇒ Block.sync((future map (_.toString.length)), timeout.duration) must be(result.toString.length)) } + "transform result with map" in { f((future, result) ⇒ Await.result((future map (_.toString.length)), timeout.duration) must be(result.toString.length)) } "compose result with flatMap" in { f { (future, result) ⇒ val r = for (r ← future; p ← Promise.successful("foo")) yield r.toString + p - Block.sync(r, timeout.duration) must be(result.toString + "foo") + Await.result(r, timeout.duration) must be(result.toString + "foo") } } "perform action with foreach" in { f { (future, result) ⇒ val p = Promise[Any]() future foreach p.success - Block.sync(p, timeout.duration) must be(result) + Await.result(p, timeout.duration) must be(result) } } - "not recover from exception" in { f((future, result) ⇒ Block.sync(future.recover({ case _ ⇒ "pigdog" }), timeout.duration) must be(result)) } + "not recover from exception" in { f((future, result) ⇒ Await.result(future.recover({ case _ ⇒ "pigdog" }), timeout.duration) must be(result)) } "perform action on result" in { f { (future, result) ⇒ val p = Promise[Any]() future.onSuccess { case x ⇒ p.success(x) } - Block.sync(p, timeout.duration) must be(result) + Await.result(p, timeout.duration) must be(result) } } "not perform action on exception" is pending - "cast using mapTo" in { f((future, result) ⇒ Block.sync(future.mapTo[Boolean].recover({ case _: ClassCastException ⇒ false }), timeout.duration) must be(false)) } + "cast using mapTo" in { f((future, result) ⇒ Await.result(future.mapTo[Boolean].recover({ case _: ClassCastException ⇒ false }), timeout.duration) must be(false)) } } def futureWithException[E <: Throwable: Manifest](f: ((Future[Any], String) ⇒ Unit) ⇒ Unit) { @@ -862,27 +862,27 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa future.value.get.left.get.getMessage must be(message) }) } - "throw exception with 'get'" in { f((future, message) ⇒ (evaluating { Block.sync(future, timeout.duration) } must produce[E]).getMessage must be(message)) } - "throw exception with 'Block.sync'" in { f((future, message) ⇒ (evaluating { Block.sync(future, timeout.duration) } must produce[E]).getMessage must be(message)) } + "throw exception with 'get'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } must produce[E]).getMessage must be(message)) } + "throw exception with 'Await.sync'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } must produce[E]).getMessage must be(message)) } "retain exception with filter" in { f { (future, message) ⇒ - (evaluating { Block.sync(future filter (_ ⇒ true), timeout.duration) } must produce[E]).getMessage must be(message) - (evaluating { Block.sync(future filter (_ ⇒ false), timeout.duration) } must produce[E]).getMessage must be(message) + (evaluating { Await.result(future filter (_ ⇒ true), timeout.duration) } must produce[E]).getMessage must be(message) + (evaluating { Await.result(future filter (_ ⇒ false), timeout.duration) } must produce[E]).getMessage must be(message) } } - "retain exception with map" in { f((future, message) ⇒ (evaluating { Block.sync(future map (_.toString.length), timeout.duration) } must produce[E]).getMessage must be(message)) } - "retain exception with flatMap" in { f((future, message) ⇒ (evaluating { Block.sync(future flatMap (_ ⇒ Promise.successful[Any]("foo")), timeout.duration) } must produce[E]).getMessage must be(message)) } + "retain exception with map" in { f((future, message) ⇒ (evaluating { Await.result(future map (_.toString.length), timeout.duration) } must produce[E]).getMessage must be(message)) } + "retain exception with flatMap" in { f((future, message) ⇒ (evaluating { Await.result(future flatMap (_ ⇒ Promise.successful[Any]("foo")), timeout.duration) } must produce[E]).getMessage must be(message)) } "not perform action with foreach" is pending - "recover from exception" in { f((future, message) ⇒ Block.sync(future.recover({ case e if e.getMessage == message ⇒ "pigdog" }), timeout.duration) must be("pigdog")) } + "recover from exception" in { f((future, message) ⇒ Await.result(future.recover({ case e if e.getMessage == message ⇒ "pigdog" }), timeout.duration) must be("pigdog")) } "not perform action on result" is pending "perform action on exception" in { f { (future, message) ⇒ val p = Promise[Any]() future.onFailure { case _ ⇒ p.success(message) } - Block.sync(p, timeout.duration) must be(message) + Await.result(p, timeout.duration) must be(message) } } - "always cast successfully using mapTo" in { f((future, message) ⇒ (evaluating { Block.sync(future.mapTo[java.lang.Thread], timeout.duration) } must produce[E]).getMessage must be(message)) } + "always cast successfully using mapTo" in { f((future, message) ⇒ (evaluating { Await.result(future.mapTo[java.lang.Thread], timeout.duration) } must produce[E]).getMessage must be(message)) } } sealed trait IntAction { def apply(that: Int): Int } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 0cd04e5c60..d0c2053243 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -19,7 +19,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn val f = spawn { q.dequeue } - Block.sync(f, 1 second) must be(null) + Await.result(f, 1 second) must be(null) } "create a bounded mailbox with 10 capacity and with push timeout" in { @@ -115,8 +115,8 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn val consumers = for (i ← (1 to 4).toList) yield createConsumer - val ps = producers.map(Block.sync(_, within)) - val cs = consumers.map(Block.sync(_, within)) + val ps = producers.map(Await.result(_, within)) + val cs = consumers.map(Await.result(_, within)) ps.map(_.size).sum must be === totalMessages //Must have produced 1000 messages cs.map(_.size).sum must be === totalMessages //Must have consumed all produced messages diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala index b37bc0f75c..ccc632c6be 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala @@ -43,7 +43,7 @@ class PriorityDispatcherSpec extends AkkaSpec with DefaultTimeout { actor.resume //Signal the actor to start treating it's message backlog - Block.sync(actor.?('Result).mapTo[List[Int]], timeout.duration) must be === msgs.reverse + Await.result(actor.?('Result).mapTo[List[Int]], timeout.duration) must be === msgs.reverse } } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala index 5edffe3e0b..e41dc9c4cd 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala @@ -21,9 +21,9 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout { b << q c << q() } - assert(Block.sync(a, timeout.duration) === 1) - assert(Block.sync(b, timeout.duration) === 2) - assert(Block.sync(c, timeout.duration) === 3) + assert(Await.result(a, timeout.duration) === 1) + assert(Await.result(b, timeout.duration) === 2) + assert(Await.result(c, timeout.duration) === 3) } "pend" in { @@ -35,9 +35,9 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout { c << q } flow { q <<< List(1, 2, 3) } - assert(Block.sync(a, timeout.duration) === 1) - assert(Block.sync(b, timeout.duration) === 2) - assert(Block.sync(c, timeout.duration) === 3) + assert(Await.result(a, timeout.duration) === 1) + assert(Await.result(b, timeout.duration) === 2) + assert(Await.result(c, timeout.duration) === 3) } "pend again" in { @@ -54,10 +54,10 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout { c << q1 d << q1 } - assert(Block.sync(a, timeout.duration) === 1) - assert(Block.sync(b, timeout.duration) === 2) - assert(Block.sync(c, timeout.duration) === 3) - assert(Block.sync(d, timeout.duration) === 4) + assert(Await.result(a, timeout.duration) === 1) + assert(Await.result(b, timeout.duration) === 2) + assert(Await.result(c, timeout.duration) === 3) + assert(Await.result(d, timeout.duration) === 4) } "enque" in { @@ -71,10 +71,10 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout { } q ++= List(1, 2, 3, 4) - assert(Block.sync(a, timeout.duration) === 1) - assert(Block.sync(b, timeout.duration) === 2) - assert(Block.sync(c, timeout.duration) === 3) - assert(Block.sync(d, timeout.duration) === 4) + assert(Await.result(a, timeout.duration) === 1) + assert(Await.result(b, timeout.duration) === 2) + assert(Await.result(c, timeout.duration) === 3) + assert(Await.result(d, timeout.duration) === 4) } "map" in { @@ -90,9 +90,9 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout { flow { qs << ("Hello", "World!", "Test") } - assert(Block.sync(a, timeout.duration) === 5) - assert(Block.sync(b, timeout.duration) === "World!") - assert(Block.sync(c, timeout.duration) === 4) + assert(Await.result(a, timeout.duration) === 5) + assert(Await.result(b, timeout.duration) === "World!") + assert(Await.result(c, timeout.duration) === 4) } "not fail under concurrent stress" in { @@ -128,7 +128,7 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout { } } - assert(Block.sync(future, timeout.duration) === (1L to 100000L).sum) + assert(Await.result(future, timeout.duration) === (1L to 100000L).sum) } } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala index d5c7106ea9..bbab00c194 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala @@ -5,7 +5,7 @@ import akka.testkit._ import akka.util.duration._ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } import akka.testkit.AkkaSpec -import akka.dispatch.{ Block, KeptPromise, Future } +import akka.dispatch.{ Await, Promise, Future } object ActorPoolSpec { @@ -17,7 +17,7 @@ object ActorPoolSpec { import TypedActor.dispatcher def sq(x: Int, sleep: Long): Future[Int] = { if (sleep > 0) Thread.sleep(sleep) - new KeptPromise(Right(x * x)) + Promise.successful(x * x) } } @@ -47,7 +47,7 @@ class TypedActorPoolSpec extends AkkaSpec with DefaultTimeout { val results = for (i ← 1 to 100) yield (i, pool.sq(i, 0)) for ((i, r) ← results) - Block.sync(r, timeout.duration) must equal(i * i) + Await.result(r, timeout.duration) must equal(i * i) ta.stop(pool) } @@ -97,7 +97,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { count.get must be(2) - Block.sync((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2) + Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2) pool.stop() } @@ -126,7 +126,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { try { (for (count ← 1 to 500) yield pool.?("Test", 20 seconds)) foreach { - Block.sync(_, 20 seconds) must be("Response") + Await.result(_, 20 seconds) must be("Response") } } finally { pool.stop() @@ -163,7 +163,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { pool ! 1 - Block.sync((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2) + Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2) var loops = 0 def loop(t: Int) = { @@ -183,7 +183,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { latch.await count.get must be(loops) - Block.sync((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2) + Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2) // a whole bunch should max it out @@ -192,7 +192,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { latch.await count.get must be(loops) - Block.sync((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(4) + Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(4) pool.stop() } @@ -239,7 +239,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { latch.await count.get must be(loops) - Block.sync((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2) + Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2) // send a bunch over the threshold and observe an increment loops = 15 @@ -248,7 +248,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { latch.await(10 seconds) count.get must be(loops) - Block.sync((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be >= (3) + Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be >= (3) pool.stop() } @@ -342,7 +342,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { (5 millis).dilated.sleep - val z = Block.sync((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size + val z = Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size z must be >= (2) @@ -353,7 +353,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { (500 millis).dilated.sleep } - Block.sync((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be <= (z) + Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be <= (z) pool.stop() } diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala index 3f10f8541a..90d15d6141 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -8,7 +8,7 @@ import akka.testkit.AkkaSpec import akka.actor.DeploymentConfig._ import akka.routing.Routing.Broadcast import akka.testkit.DefaultTimeout -import akka.dispatch.Block +import akka.dispatch.Await @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout { @@ -83,7 +83,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout { for (i ← 0 until iterationCount) { for (k ← 0 until connectionCount) { - val id = Block.sync((actor ? "hit").mapTo[Int], timeout.duration) + val id = Await.result((actor ? "hit").mapTo[Int], timeout.duration) replies = replies + (id -> (replies(id) + 1)) } } @@ -194,7 +194,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout { for (i ← 0 until iterationCount) { for (k ← 0 until connectionCount) { - val id = Block.sync((actor ? "hit").mapTo[Int], timeout.duration) + val id = Await.result((actor ? "hit").mapTo[Int], timeout.duration) replies = replies + (id -> (replies(id) + 1)) } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index b00fb02880..e64cc306d6 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -8,7 +8,7 @@ import collection.mutable.LinkedList import akka.routing.Routing.Broadcast import java.util.concurrent.{ CountDownLatch, TimeUnit } import akka.testkit._ -import akka.dispatch.Block +import akka.dispatch.Await object RoutingSpec { @@ -271,7 +271,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { shutdownLatch.await - Block.sync(actor ? Broadcast(0), timeout.duration).asInstanceOf[Int] must be(1) + Await.result(actor ? Broadcast(0), timeout.duration).asInstanceOf[Int] must be(1) } "throw an exception, if all the connections have stopped" in { @@ -298,7 +298,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - Block.sync(actor ? Broadcast("Hi!"), timeout.duration).asInstanceOf[Int] must be(0) + Await.result(actor ? Broadcast("Hi!"), timeout.duration).asInstanceOf[Int] must be(0) } "return the first response from connections, when some of them failed to reply" in { @@ -306,7 +306,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - Block.sync(actor ? Broadcast(0), timeout.duration).asInstanceOf[Int] must be(1) + Await.result(actor ? Broadcast(0), timeout.duration).asInstanceOf[Int] must be(1) } "be started when constructed" in { diff --git a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala index 8feb284be4..f51beb7617 100644 --- a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala @@ -3,7 +3,7 @@ package akka.ticket import akka.actor._ import akka.routing._ import akka.testkit.AkkaSpec -import akka.dispatch.Block +import akka.dispatch.Await import akka.util.duration._ @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -28,7 +28,7 @@ class Ticket703Spec extends AkkaSpec { } })) }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 5, 1000))) - Block.sync(actorPool.?("Ping", 10000), 10 seconds) must be === "Response" + Await.result(actorPool.?("Ping", 10000), 10 seconds) must be === "Response" } } } diff --git a/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala b/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala index 1d72f502ae..9f869fe907 100644 --- a/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala @@ -4,7 +4,7 @@ package akka.util import org.scalatest.matchers.MustMatchers -import akka.dispatch.{ Future, Block } +import akka.dispatch.{ Future, Await } import akka.testkit.AkkaSpec import scala.util.Random import akka.testkit.DefaultTimeout @@ -125,7 +125,7 @@ class IndexSpec extends AkkaSpec with MustMatchers with DefaultTimeout { val tasks = List.fill(nrOfTasks)(executeRandomTask) - tasks.foreach(Block.sync(_, timeout.duration)) + tasks.foreach(Await.result(_, timeout.duration)) } } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 07e03d42cc..0ec3de6132 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -383,7 +383,7 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef { private[akka] def init(dispatcher: MessageDispatcher, rootPath: ActorPath) { _path = rootPath / "null" - brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef - promises are always broken.")))(dispatcher) + brokenPromise = Promise.failed(new ActorKilledException("In DeadLetterActorRef - promises are always broken."))(dispatcher) } override def isTerminated(): Boolean = true @@ -425,7 +425,7 @@ class AskActorRef( } override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = - new KeptPromise[Any](Left(new UnsupportedOperationException("Ask/? is not supported for %s".format(getClass.getName))))(dispatcher) + Promise.failed(new UnsupportedOperationException("Ask/? is not supported for %s".format(getClass.getName)))(dispatcher) override def isTerminated = result.isCompleted diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index cc39e9f634..b0b69122e6 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -339,7 +339,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor private[akka] def systemActorOf(props: Props, name: String): ActorRef = { implicit val timeout = settings.CreationTimeout - Block.sync(systemGuardian ? CreateChild(props, name), timeout.duration) match { + Await.result(systemGuardian ? CreateChild(props, name), timeout.duration) match { case ref: ActorRef ⇒ ref case ex: Exception ⇒ throw ex } @@ -347,7 +347,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor def actorOf(props: Props, name: String): ActorRef = { implicit val timeout = settings.CreationTimeout - Block.sync(guardian ? CreateChild(props, name), timeout.duration) match { + Await.result(guardian ? CreateChild(props, name), timeout.duration) match { case ref: ActorRef ⇒ ref case ex: Exception ⇒ throw ex } @@ -355,7 +355,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor def actorOf(props: Props): ActorRef = { implicit val timeout = settings.CreationTimeout - Block.sync(guardian ? CreateRandomNameChild(props), timeout.duration) match { + Await.result(guardian ? CreateRandomNameChild(props), timeout.duration) match { case ref: ActorRef ⇒ ref case ex: Exception ⇒ throw ex } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 5242e98235..549cac1424 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -410,12 +410,12 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi case m if m.returnsFuture_? ⇒ actor.?(m, timeout) case m if m.returnsJOption_? || m.returnsOption_? ⇒ val f = actor.?(m, timeout) - (try { Block.on(f, timeout.duration).value } catch { case _: TimeoutException ⇒ None }) match { + (try { Await.ready(f, timeout.duration).value } catch { case _: TimeoutException ⇒ None }) match { case None | Some(Right(null)) ⇒ if (m.returnsJOption_?) JOption.none[Any] else None case Some(Right(joption: AnyRef)) ⇒ joption case Some(Left(ex)) ⇒ throw ex } - case m ⇒ Block.sync(actor.?(m, timeout), timeout.duration).asInstanceOf[AnyRef] + case m ⇒ Await.result(actor.?(m, timeout), timeout.duration).asInstanceOf[AnyRef] } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index e043cca761..6e0691b1cf 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -22,27 +22,27 @@ import scala.collection.mutable.Stack import akka.util.{ Switch, Duration, BoxedType } import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean } import java.util.concurrent.{ TimeoutException, ConcurrentLinkedQueue, TimeUnit, Callable } -import akka.dispatch.Block.CanBlock +import akka.dispatch.Await.CanAwait -object Block { - sealed trait CanBlock +object Await { + sealed trait CanAwait - trait Blockable[+T] { + trait Awaitable[+T] { /** * Should throw java.util.concurrent.TimeoutException if times out */ - def block(atMost: Duration)(implicit permit: CanBlock): this.type + def ready(atMost: Duration)(implicit permit: CanAwait): this.type /** * Throws exceptions if cannot produce a T within the specified time */ - def sync(atMost: Duration)(implicit permit: CanBlock): T + def result(atMost: Duration)(implicit permit: CanAwait): T } - private implicit val permit = new CanBlock {} + private implicit val permit = new CanAwait {} - def on[T <: Blockable[_]](block: T, atMost: Duration /* = Duration.Inf*/ ): T = block.block(atMost) - def sync[T](block: Blockable[T], atMost: Duration): T = block.sync(atMost) + def ready[T <: Awaitable[_]](awaitable: T, atMost: Duration): T = awaitable.ready(atMost) + def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost) } object Futures { @@ -147,7 +147,7 @@ object Future { * Useful for reducing many Futures into a single Future. */ def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], dispatcher: MessageDispatcher): Future[M[A]] = - in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) + in.foldLeft(Promise.successful(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) /** * Returns a Future to the result of the first future in the list that is completed @@ -165,7 +165,7 @@ object Future { * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate */ def find[T](futures: Iterable[Future[T]])(predicate: T ⇒ Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] = { - if (futures.isEmpty) new KeptPromise[Option[T]](Right(None)) + if (futures.isEmpty) Promise.successful[Option[T]](None) else { val result = Promise[Option[T]]() val ref = new AtomicInteger(futures.size) @@ -196,9 +196,8 @@ object Future { * */ def fold[T, R](futures: Iterable[Future[T]])(zero: R)(foldFun: (R, T) ⇒ R)(implicit dispatcher: MessageDispatcher): Future[R] = { - if (futures.isEmpty) { - new KeptPromise[R](Right(zero)) - } else { + if (futures.isEmpty) Promise.successful(zero) + else { val result = Promise[R]() val results = new ConcurrentLinkedQueue[T]() val done = new Switch(false) @@ -245,8 +244,7 @@ object Future { * */ def reduce[T, R >: T](futures: Iterable[Future[T]])(op: (R, T) ⇒ T)(implicit dispatcher: MessageDispatcher): Future[R] = { - if (futures.isEmpty) - new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left"))) + if (futures.isEmpty) Promise[R].failure(new UnsupportedOperationException("empty reduce left")) else { val result = Promise[R]() val seedFound = new AtomicBoolean(false) @@ -271,7 +269,7 @@ object Future { * */ def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], dispatcher: MessageDispatcher): Future[M[B]] = - in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[B, M[B]]]) { (fr, a) ⇒ + in.foldLeft(Promise.successful(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) ⇒ val fb = fn(a.asInstanceOf[A]) for (r ← fr; b ← fb) yield (r += b) }.map(_.result) @@ -364,7 +362,7 @@ object Future { } } -sealed trait Future[+T] extends japi.Future[T] with Block.Blockable[T] { +sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { implicit def dispatcher: MessageDispatcher @@ -713,12 +711,12 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue) } - def block(atMost: Duration)(implicit permit: CanBlock): this.type = + def ready(atMost: Duration)(implicit permit: CanAwait): this.type = if (value.isDefined || tryAwait(atMost)) this else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds") - def sync(atMost: Duration)(implicit permit: CanBlock): T = - block(atMost).value.get match { + def result(atMost: Duration)(implicit permit: CanAwait): T = + ready(atMost).value.get match { case Left(e) ⇒ throw e case Right(r) ⇒ r } @@ -797,8 +795,8 @@ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dis this } - def block(atMost: Duration)(implicit permit: CanBlock): this.type = this - def sync(atMost: Duration)(implicit permit: CanBlock): T = value.get match { + def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this + def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match { case Left(e) ⇒ throw e case Right(r) ⇒ r } diff --git a/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala b/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala index 6460e1e1aa..4ec0aaf300 100644 --- a/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala +++ b/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala @@ -183,7 +183,7 @@ class PromiseStream[A](implicit val dispatcher: MessageDispatcher, val timeout: if (eo eq null) dequeue() else { if (eo.nonEmpty) { - if (_elemOut.compareAndSet(eo, eo.tail)) new KeptPromise(Right(eo.head)) + if (_elemOut.compareAndSet(eo, eo.tail)) Promise.successful(eo.head) else dequeue() } else dequeue(Promise[A]) } diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index f71d03c5d8..2bcd6b762b 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -15,7 +15,7 @@ import java.util.concurrent.atomic.AtomicInteger import akka.actor.ActorRefProvider import scala.util.control.NoStackTrace import java.util.concurrent.TimeoutException -import akka.dispatch.Block +import akka.dispatch.Await object LoggingBus { implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream @@ -147,7 +147,7 @@ trait LoggingBus extends ActorEventBus { val name = "log" + Extension(system).id() + "-" + simpleName(clazz) val actor = system.systemActorOf(Props(clazz), name) implicit val timeout = Timeout(3 seconds) - val response = try Block.sync(actor ? InitializeLogger(this), timeout.duration) catch { + val response = try Await.result(actor ? InitializeLogger(this), timeout.duration) catch { case _: TimeoutException ⇒ publish(Warning(simpleName(this), "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)")) } diff --git a/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerPublishRequestorTest.scala b/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerPublishRequestorTest.scala index 638419814f..086bc2aef6 100644 --- a/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerPublishRequestorTest.scala +++ b/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerPublishRequestorTest.scala @@ -8,7 +8,7 @@ import akka.util.duration._ import akka.actor._ import akka.actor.Actor._ import akka.camel.TypedCamelTestSupport.{ SetExpectedMessageCount ⇒ SetExpectedTestMessageCount, _ } -import akka.dispatch.Block +import akka.dispatch.Await class TypedConsumerPublishRequestorTest extends JUnitSuite { import TypedConsumerPublishRequestorTest._ @@ -40,10 +40,10 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite { @Test def shouldReceiveOneConsumerMethodRegisteredEvent = { Actor.registry.addListener(requestor) - val latch = Block.sync((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 3 seconds) + val latch = Await.result((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 3 seconds) val obj = TypedActor.typedActorOf(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl], Props()) assert(latch.await(5000, TimeUnit.MILLISECONDS)) - val event = Block.sync((publisher ? GetRetainedMessage).mapTo[ConsumerMethodRegistered], 3 seconds) + val event = Await.result((publisher ? GetRetainedMessage).mapTo[ConsumerMethodRegistered], 3 seconds) assert(event.endpointUri === "direct:foo") assert(event.typedActor === obj) assert(event.methodName === "foo") @@ -51,21 +51,21 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite { @Test def shouldReceiveOneConsumerMethodUnregisteredEvent = { - val latch = Block.sync((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 3 seconds) + val latch = Await.result((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 3 seconds) Actor.registry.addListener(requestor) val obj = TypedActor.typedActorOf(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl], Props()) assert(latch.await(5000, TimeUnit.MILLISECONDS)) - val ignorableEvent = Block.sync((publisher ? GetRetainedMessage).mapTo[ConsumerMethodRegistered], 3 seconds) + val ignorableEvent = Await.result((publisher ? GetRetainedMessage).mapTo[ConsumerMethodRegistered], 3 seconds) - val latch2 = Block.sync((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 3 seconds) + val latch2 = Await.result((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 3 seconds) TypedActor.stop(obj) assert(latch2.await(5000, TimeUnit.MILLISECONDS)) - val event = Block.sync((publisher ? GetRetainedMessage).mapTo[ConsumerMethodUnregistered], 3 seconds) + val event = Await.result((publisher ? GetRetainedMessage).mapTo[ConsumerMethodUnregistered], 3 seconds) assert(event.endpointUri === "direct:foo") assert(event.typedActor === obj) @@ -75,23 +75,23 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite { @Test def shouldReceiveThreeConsumerMethodRegisteredEvents = { Actor.registry.addListener(requestor) - val latch = Block.sync((publisher ? SetExpectedTestMessageCount(3)).mapTo[CountDownLatch], 3 seconds) + val latch = Await.result((publisher ? SetExpectedTestMessageCount(3)).mapTo[CountDownLatch], 3 seconds) val obj = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], Props()) assert(latch.await(5000, TimeUnit.MILLISECONDS)) val request = GetRetainedMessages(_.isInstanceOf[ConsumerMethodRegistered]) - val events = Block.sync((publisher ? request).mapTo[List[ConsumerMethodRegistered]], 3 seconds) + val events = Await.result((publisher ? request).mapTo[List[ConsumerMethodRegistered]], 3 seconds) assert(events.map(_.method.getName).sortWith(_ < _) === List("m2", "m3", "m4")) } @Test def shouldReceiveThreeConsumerMethodUnregisteredEvents = { val obj = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], Props()) - val latch = Block.sync((publisher ? SetExpectedTestMessageCount(3)).mapTo[CountDownLatch], 3 seconds) + val latch = Await.result((publisher ? SetExpectedTestMessageCount(3)).mapTo[CountDownLatch], 3 seconds) Actor.registry.addListener(requestor) TypedActor.stop(obj) assert(latch.await(5000, TimeUnit.MILLISECONDS)) val request = GetRetainedMessages(_.isInstanceOf[ConsumerMethodUnregistered]) - val events = Block.sync((publisher ? request).mapTo[List[ConsumerMethodUnregistered]], 3 seconds) + val events = Await.result((publisher ? request).mapTo[List[ConsumerMethodUnregistered]], 3 seconds) assert(events.map(_.method.getName).sortWith(_ < _) === List("m2", "m3", "m4")) } } diff --git a/akka-camel/src/main/scala/akka/camel/CamelService.scala b/akka-camel/src/main/scala/akka/camel/CamelService.scala index 673fa65853..0b8a2aece0 100644 --- a/akka-camel/src/main/scala/akka/camel/CamelService.scala +++ b/akka-camel/src/main/scala/akka/camel/CamelService.scala @@ -14,7 +14,7 @@ import akka.japi.{ SideEffect, Option ⇒ JOption } import akka.util.Bootable import TypedCamelAccess._ -import akka.dispatch.Block +import akka.dispatch.Await /** * Publishes consumer actors at their Camel endpoints. Consumer actors are published asynchronously when @@ -165,7 +165,7 @@ trait CamelService extends Bootable { * activations that occurred in the past are not considered. */ private def expectEndpointActivationCount(count: Int): CountDownLatch = - Block.sync((activationTracker ? SetExpectedActivationCount(count)).mapTo[CountDownLatch], 3 seconds) + Await.result((activationTracker ? SetExpectedActivationCount(count)).mapTo[CountDownLatch], 3 seconds) /** * Sets an expectation on the number of upcoming endpoint de-activations and returns @@ -173,7 +173,7 @@ trait CamelService extends Bootable { * de-activations that occurred in the past are not considered. */ private def expectEndpointDeactivationCount(count: Int): CountDownLatch = - Block.sync((activationTracker ? SetExpectedDeactivationCount(count)).mapTo[CountDownLatch], 3 seconds) + Await.result((activationTracker ? SetExpectedDeactivationCount(count)).mapTo[CountDownLatch], 3 seconds) private[camel] def registerPublishRequestor: Unit = Actor.registry.addListener(publishRequestor) diff --git a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala index b0bb4614e8..c0d0281ab3 100644 --- a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala @@ -172,7 +172,7 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) with Asyn private def sendSync(exchange: Exchange) = { val actor = target(exchange) - val result: Any = try { Some(Block.sync((actor ? requestFor(exchange), 5 seconds)) } catch { case e ⇒ Some(Failure(e)) } + val result: Any = try { Some(Await.result((actor ? requestFor(exchange), 5 seconds)) } catch { case e ⇒ Some(Failure(e)) } result match { case Some(Ack) ⇒ { /* no response message to set */ } @@ -294,7 +294,7 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall } def ?(message: Any)(implicit timeout: Timeout): Future[Any] = - new KeptPromise[Any](Left(new UnsupportedOperationException("Ask/? is not supported for %s".format(getClass.getName)))) + Promise.failed(new UnsupportedOperationException("Ask/? is not supported for %s".format(getClass.getName))) def restart(reason: Throwable): Unit = unsupported private def unsupported = throw new UnsupportedOperationException("Not supported for %s" format classOf[AsyncCallbackAdapter].getName) diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala index fcebfcb4d6..c675c14cf6 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala @@ -8,7 +8,7 @@ import org.scalatest.junit.JUnitSuite import akka.actor._ import akka.actor.Actor._ import akka.camel.CamelTestSupport.{ SetExpectedMessageCount ⇒ SetExpectedTestMessageCount, _ } -import akka.dispatch.Block +import akka.dispatch.Await class ConsumerPublishRequestorTest extends JUnitSuite { import ConsumerPublishRequestorTest._ @@ -36,19 +36,19 @@ class ConsumerPublishRequestorTest extends JUnitSuite { @Test def shouldReceiveOneConsumerRegisteredEvent = { - val latch = Block.sync((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 5 seconds) + val latch = Await.result((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 5 seconds) requestor ! ActorRegistered(consumer.address, consumer) assert(latch.await(5000, TimeUnit.MILLISECONDS)) - assert(Block.sync(publisher ? GetRetainedMessage, 5 seconds) === + assert(Await.result(publisher ? GetRetainedMessage, 5 seconds) === ConsumerActorRegistered(consumer, consumer.underlyingActorInstance.asInstanceOf[Consumer])) } @Test def shouldReceiveOneConsumerUnregisteredEvent = { - val latch = Block.sync((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 5 seconds) + val latch = Await.result((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 5 seconds) requestor ! ActorUnregistered(consumer.address, consumer) assert(latch.await(5000, TimeUnit.MILLISECONDS)) - assert(Block.sync(publisher ? GetRetainedMessage, 5 seconds) === + assert(Await.result(publisher ? GetRetainedMessage, 5 seconds) === ConsumerActorUnregistered(consumer, consumer.underlyingActorInstance.asInstanceOf[Consumer])) } } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 6330cb39f9..555ecbfe15 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -51,7 +51,7 @@ import RemoteSystemDaemonMessageType._ import com.eaio.uuid.UUID import com.google.protobuf.ByteString -import akka.dispatch.{Block, Dispatchers, Future, PinnedDispatcher} +import akka.dispatch.{Await, Dispatchers, Future, PinnedDispatcher} // FIXME add watch for each node that when the entry for the node is removed then the node shuts itself down @@ -1156,7 +1156,7 @@ class DefaultClusterNode private[akka] ( connection ! command } else { try { - Block.sync(connection ? (command, remoteDaemonAckTimeout), 10 seconds).asInstanceOf[Status] match { + Await.result(connection ? (command, remoteDaemonAckTimeout), 10 seconds).asInstanceOf[Status] match { case Success(status) ⇒ EventHandler.debug(this, "Remote command sent to [%s] successfully received".format(status)) case Failure(cause) ⇒ diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmSpec.scala index aabbe6ff63..50b7741758 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmSpec.scala @@ -85,7 +85,7 @@ class LocalMetricsMultiJvmNode1 extends MasterClusterTestNode { }) - Block.sync(monitorReponse, 5 seconds) must be("Too much memory is used!") + Await.result(monitorReponse, 5 seconds) must be("Too much memory is used!") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/DirectRoutingFailoverMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/DirectRoutingFailoverMultiJvmSpec.scala index 7718fb8e59..443bf29364 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/DirectRoutingFailoverMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/DirectRoutingFailoverMultiJvmSpec.scala @@ -11,7 +11,7 @@ import akka.testkit.{ EventFilter, TestEvent } import java.net.ConnectException import java.nio.channels.NotYetConnectedException import akka.cluster.LocalCluster -import akka.dispatch.Block +import akka.dispatch.Await object DirectRoutingFailoverMultiJvmSpec { @@ -49,7 +49,7 @@ class DirectRoutingFailoverMultiJvmNode1 extends MasterClusterTestNode { } LocalCluster.barrier("verify-actor", NrOfNodes) { - Block.sync(actor ? "identify", timeout.duration) must equal("node2") + Await.result(actor ? "identify", timeout.duration) must equal("node2") } val timer = Timer(30.seconds, true) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmSpec.scala index 5a29882f31..d46f346a05 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmSpec.scala @@ -11,7 +11,7 @@ import java.util.{ Collections, Set ⇒ JSet } import java.net.ConnectException import java.nio.channels.NotYetConnectedException import akka.cluster.LocalCluster._ -import akka.dispatch.Block +import akka.dispatch.Await object RandomFailoverMultiJvmSpec { @@ -92,7 +92,7 @@ class RandomFailoverMultiJvmNode1 extends MasterClusterTestNode { def identifyConnections(actor: ActorRef): JSet[String] = { val set = new java.util.HashSet[String] for (i ← 0 until 100) { // we should get hits from both nodes in 100 attempts, if not then not very random - val value = Block.sync(actor ? "identify", timeout.duration).asInstanceOf[String] + val value = Await.result(actor ? "identify", timeout.duration).asInstanceOf[String] set.add(value) } set diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/replicationfactor_3/Random3ReplicasMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/replicationfactor_3/Random3ReplicasMultiJvmSpec.scala index 6faf1e6f75..d042ffe182 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/replicationfactor_3/Random3ReplicasMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/replicationfactor_3/Random3ReplicasMultiJvmSpec.scala @@ -9,7 +9,7 @@ import akka.actor._ import akka.config.Config import Cluster._ import akka.cluster.LocalCluster._ -import akka.dispatch.Block +import akka.dispatch.Await /** * When a MultiJvmNode is started, will it automatically be part of the cluster (so will it automatically be eligible @@ -79,7 +79,7 @@ class Random3ReplicasMultiJvmNode2 extends ClusterTestNode { } for (i ← 0 until 1000) { - count(Block.sync((hello ? "Hello").mapTo[String], 10 seconds)) + count(Await.result((hello ? "Hello").mapTo[String], 10 seconds)) } val repliesNode1 = replies("World from node [node1]") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmSpec.scala index 1277980b5f..93ea64ab4a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmSpec.scala @@ -12,7 +12,7 @@ import java.net.ConnectException import java.nio.channels.NotYetConnectedException import java.lang.Thread import akka.cluster.LocalCluster._ -import akka.dispatch.Block +import akka.dispatch.Await object RoundRobinFailoverMultiJvmSpec { @@ -95,7 +95,7 @@ class RoundRobinFailoverMultiJvmNode1 extends MasterClusterTestNode { def identifyConnections(actor: ActorRef): JSet[String] = { val set = new java.util.HashSet[String] for (i ← 0 until 100) { - val value = Block.sync(actor ? "identify", timeout.duration).asInstanceOf[String] + val value = Await.result(actor ? "identify", timeout.duration).asInstanceOf[String] set.add(value) } set diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/replicationfactor_2/RoundRobin2ReplicasMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/replicationfactor_2/RoundRobin2ReplicasMultiJvmSpec.scala index fcf0638983..0e595e5111 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/replicationfactor_2/RoundRobin2ReplicasMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/replicationfactor_2/RoundRobin2ReplicasMultiJvmSpec.scala @@ -20,7 +20,7 @@ import akka.cluster.LocalCluster._ import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.ConcurrentHashMap -import akka.dispatch.Block +import akka.dispatch.Await /** * When a MultiJvmNode is started, will it automatically be part of the cluster (so will it automatically be eligible @@ -109,7 +109,7 @@ class RoundRobin2ReplicasMultiJvmNode2 extends ClusterTestNode { implicit val timeout = Timeout(Duration(20, "seconds")) for(i <- 1 to 8) - count(Block.sync((hello ? "Hello").mapTo[String], timeout.duration)) + count(Await.result((hello ? "Hello").mapTo[String], timeout.duration)) replies.get("World from node [node1]").get must equal(4) replies.get("World from node [node2]").get must equal(4) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/scattergather/failover/ScatterGatherFailoverMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/scattergather/failover/ScatterGatherFailoverMultiJvmSpec.scala index e25838f67b..92b95d6274 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/scattergather/failover/ScatterGatherFailoverMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/scattergather/failover/ScatterGatherFailoverMultiJvmSpec.scala @@ -11,7 +11,7 @@ import java.nio.channels.NotYetConnectedException import java.lang.Thread import akka.routing.Routing.Broadcast import akka.cluster.LocalCluster._ -import akka.dispatch.Block +import akka.dispatch.Await object ScatterGatherFailoverMultiJvmSpec { @@ -85,7 +85,7 @@ class ScatterGatherFailoverMultiJvmNode1 extends MasterClusterTestNode { def identifyConnections(actor: ActorRef): JSet[String] = { val set = new java.util.HashSet[String] for (i ← 0 until NrOfNodes * 2) { - val value = Block.sync(actor ? "foo", timeout.duration).asInstanceOf[String] + val value = Await.result(actor ? "foo", timeout.duration).asInstanceOf[String] set.add(value) } set diff --git a/akka-cluster/src/test/scala/akka/cluster/sample/ComputeGridSample.scala b/akka-cluster/src/test/scala/akka/cluster/sample/ComputeGridSample.scala index 4811b8c9d4..4031fa1b1e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/sample/ComputeGridSample.scala +++ b/akka-cluster/src/test/scala/akka/cluster/sample/ComputeGridSample.scala @@ -49,7 +49,7 @@ object ComputeGridSample { val fun = () ⇒ "AKKA ROCKS" val futures = local send (fun, 2) // send and invoke function on to two cluster nodes and get result - val result = Block.sync(Futures.fold("")(futures)(_ + " - " + _), timeout) + val result = Await.sync(Futures.fold("")(futures)(_ + " - " + _), timeout) println("===================>>> Cluster says [" + result + "]") local.stop @@ -83,7 +83,7 @@ object ComputeGridSample { val future2 = local send (fun, 2, 1) head // send and invoke function on one cluster node and get result // grab the result from the first one that returns - val result = Block.sync(Futures.firstCompletedOf(List(future1, future2)), timeout) + val result = Await.sync(Futures.firstCompletedOf(List(future1, future2)), timeout) println("===================>>> Cluster says [" + result + "]") local.stop diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index c54c91ab33..6e1c28219d 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -10,7 +10,7 @@ import org.bson.collection._ import akka.actor.ActorCell import akka.event.Logging import akka.actor.ActorRef -import akka.dispatch.{ Block, Promise, Envelope, DefaultPromise } +import akka.dispatch.{ Await, Promise, Envelope, DefaultPromise } import java.util.concurrent.TimeoutException class MongoBasedMailboxException(message: String) extends AkkaException(message) @@ -50,7 +50,7 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) { case Left(t) ⇒ result.failure(t) } }) - Block.on(result, settings.WriteTimeout) + Await.ready(result, settings.WriteTimeout) } def dequeue(): Envelope = withErrorHandling { @@ -75,13 +75,13 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) { () } } - try { Block.sync(envelopePromise, settings.ReadTimeout) } catch { case _: TimeoutException ⇒ null } + try { Await.result(envelopePromise, settings.ReadTimeout) } catch { case _: TimeoutException ⇒ null } } def numberOfMessages: Int = { val count = Promise[Int]()(dispatcher) mongo.count()(count.success) - try { Block.sync(count, settings.ReadTimeout).asInstanceOf[Int] } catch { case _: Exception ⇒ -1 } + try { Await.result(count, settings.ReadTimeout).asInstanceOf[Int] } catch { case _: Exception ⇒ -1 } } //TODO review find other solution, this will be very expensive diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index 64bad38c79..444f6a0724 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -24,7 +24,7 @@ import scala.annotation.tailrec import com.google.protobuf.ByteString import java.util.concurrent.TimeoutException -import akka.dispatch.Block +import akka.dispatch.Await /** * Interface for node membership change listener. @@ -248,7 +248,7 @@ class Gossiper(remote: Remote) { try { val t = remoteExtension.RemoteSystemDaemonAckTimeout - Block.sync(connection ? (toRemoteMessage(newGossip), t), t) match { + Await.result(connection ? (toRemoteMessage(newGossip), t), t) match { case Success(receiver) ⇒ log.debug("Gossip sent to [{}] was successfully received", receiver) case Failure(cause) ⇒ log.error(cause, cause.toString) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 66dc71f35b..b07d2a5cdf 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -168,7 +168,7 @@ class RemoteActorRefProvider( actors.replace(path.toString, creationPromise, actor) actor case actor: InternalActorRef ⇒ actor - case future: Future[_] ⇒ Block.sync(future, system.settings.ActorTimeout.duration).asInstanceOf[InternalActorRef] + case future: Future[_] ⇒ Await.result(future, system.settings.ActorTimeout.duration).asInstanceOf[InternalActorRef] } } @@ -224,7 +224,7 @@ class RemoteActorRefProvider( if (withACK) { try { val f = connection ? (command, remoteExtension.RemoteSystemDaemonAckTimeout) - (try Block.on(f, remoteExtension.RemoteSystemDaemonAckTimeout).value catch { case _: TimeoutException ⇒ None }) match { + (try Await.ready(f, remoteExtension.RemoteSystemDaemonAckTimeout).value catch { case _: TimeoutException ⇒ None }) match { case Some(Right(receiver)) ⇒ log.debug("Remote system command sent to [{}] successfully received", receiver) diff --git a/akka-stm/src/main/scala/akka/agent/Agent.scala b/akka-stm/src/main/scala/akka/agent/Agent.scala index 25c0d6f188..cbda6b16f7 100644 --- a/akka-stm/src/main/scala/akka/agent/Agent.scala +++ b/akka-stm/src/main/scala/akka/agent/Agent.scala @@ -186,7 +186,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { /** * Gets this agent's value after all currently queued updates have completed. */ - def await(implicit timeout: Timeout): T = Block.sync(future, timeout.duration) + def await(implicit timeout: Timeout): T = Await.result(future, timeout.duration) /** * Map this agent to a new agent, applying the function to the internal state. diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java b/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java index 1c36eaac45..0e655ce807 100644 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java +++ b/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java @@ -3,9 +3,8 @@ package akka.transactor.example; import akka.actor.ActorSystem; import akka.actor.ActorRef; import akka.actor.Props; -import akka.dispatch.Block; +import akka.dispatch.Await; import akka.dispatch.Future; -import akka.japi.Procedure; import akka.testkit.AkkaSpec; import akka.transactor.Coordinated; @@ -30,9 +29,9 @@ public class UntypedCoordinatedExample { Future future1 = counter1.ask("GetCount", timeout); Future future2 = counter2.ask("GetCount", timeout); - int count1 = (Integer)Block.sync(future1, d); + int count1 = (Integer) Await.result(future1, d); System.out.println("counter 1: " + count1); - int count2 = (Integer)Block.sync(future2, d); + int count2 = (Integer) Await.result(future2, d); System.out.println("counter 1: " + count2); app.stop(); diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java b/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java index 85f9de1784..d3c68c5294 100644 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java +++ b/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java @@ -3,7 +3,7 @@ package akka.transactor.example; import akka.actor.ActorSystem; import akka.actor.ActorRef; import akka.actor.Props; -import akka.dispatch.Block; +import akka.dispatch.Await; import akka.dispatch.Future; import akka.testkit.AkkaSpec; import akka.util.Duration; @@ -28,9 +28,9 @@ public class UntypedTransactorExample { Future future1 = counter1.ask("GetCount", timeout); Future future2 = counter2.ask("GetCount", timeout); - int count1 = (Integer)Block.sync(future1, d); + int count1 = (Integer) Await.result(future1, d); System.out.println("counter 1: " + count1); - int count2 = (Integer)Block.sync(future2, d); + int count2 = (Integer) Await.result(future2, d); System.out.println("counter 1: " + count2); app.stop(); diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java index 1afcd16a62..0f994b05c2 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java @@ -2,7 +2,7 @@ package akka.transactor.test; import static org.junit.Assert.*; -import akka.dispatch.Block; +import akka.dispatch.Await; import akka.util.Duration; import org.junit.After; import org.junit.AfterClass; @@ -82,7 +82,7 @@ public class UntypedCoordinatedIncrementTest { } for (ActorRef counter : counters) { Future future = counter.ask("GetCount", askTimeout); - assertEquals(1, ((Integer) Block.sync(future, Duration.create(timeout, TimeUnit.SECONDS))).intValue()); + assertEquals(1, ((Integer) Await.result(future, Duration.create(timeout, TimeUnit.SECONDS))).intValue()); } } @@ -103,7 +103,7 @@ public class UntypedCoordinatedIncrementTest { } for (ActorRef counter : counters) { Futurefuture = counter.ask("GetCount", askTimeout); - assertEquals(0,((Integer)Block.sync(future, Duration.create(timeout, TimeUnit.SECONDS))).intValue()); + assertEquals(0,((Integer) Await.result(future, Duration.create(timeout, TimeUnit.SECONDS))).intValue()); } } diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java b/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java index 408df14420..c0bc2c53f6 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java @@ -2,7 +2,7 @@ package akka.transactor.test; import static org.junit.Assert.*; -import akka.dispatch.Block; +import akka.dispatch.Await; import akka.util.Duration; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -11,7 +11,6 @@ import org.junit.Before; import akka.actor.ActorSystem; import akka.actor.ActorRef; -import akka.actor.Actors; import akka.actor.Props; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; @@ -27,7 +26,6 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import scala.Option; import scala.collection.JavaConverters; import scala.collection.Seq; import akka.testkit.AkkaSpec; @@ -80,7 +78,7 @@ public class UntypedTransactorTest { } for (ActorRef counter : counters) { Future future = counter.ask("GetCount", askTimeout); - int count = (Integer)Block.sync(future, Duration.create(askTimeout, TimeUnit.MILLISECONDS)); + int count = (Integer) Await.result(future, Duration.create(askTimeout, TimeUnit.MILLISECONDS)); assertEquals(1, count); } } @@ -102,7 +100,7 @@ public class UntypedTransactorTest { } for (ActorRef counter : counters) { Future future = counter.ask("GetCount", askTimeout); - int count = (Integer)Block.sync(future, Duration.create(askTimeout, TimeUnit.MILLISECONDS)); + int count = (Integer) Await.result(future, Duration.create(askTimeout, TimeUnit.MILLISECONDS)); assertEquals(0, count); } } diff --git a/akka-stm/src/test/scala/akka/agent/test/AgentSpec.scala b/akka-stm/src/test/scala/akka/agent/test/AgentSpec.scala index d23768b276..901e45cd8a 100644 --- a/akka-stm/src/test/scala/akka/agent/test/AgentSpec.scala +++ b/akka-stm/src/test/scala/akka/agent/test/AgentSpec.scala @@ -11,7 +11,7 @@ import akka.util.duration._ import java.util.concurrent.CountDownLatch import akka.testkit.AkkaSpec import akka.testkit._ -import akka.dispatch.Block +import akka.dispatch.Await class CountDownFunction[A](num: Int = 1) extends Function1[A, A] { val latch = new CountDownLatch(num) @@ -63,9 +63,9 @@ class AgentSpec extends AkkaSpec { val r2 = agent.alterOff((s: String) ⇒ { Thread.sleep(2000); s + "c" })(5000) val r3 = agent.alter(_ + "d")(5000) - Block.sync(r1, 5 seconds) must be === "ab" - Block.sync(r2, 5 seconds) must be === "abc" - Block.sync(r3, 5 seconds) must be === "abcd" + Await.result(r1, 5 seconds) must be === "ab" + Await.result(r2, 5 seconds) must be === "abc" + Await.result(r3, 5 seconds) must be === "abcd" agent() must be("abcd") @@ -141,7 +141,7 @@ class AgentSpec extends AkkaSpec { agent send (_ + "b") agent send (_ + "c") - Block.sync(agent.future, timeout.duration) must be("abc") + Await.result(agent.future, timeout.duration) must be("abc") agent.close() } diff --git a/akka-stm/src/test/scala/akka/transactor/test/CoordinatedIncrementSpec.scala b/akka-stm/src/test/scala/akka/transactor/test/CoordinatedIncrementSpec.scala index 439e03f72a..26ed0f1034 100644 --- a/akka-stm/src/test/scala/akka/transactor/test/CoordinatedIncrementSpec.scala +++ b/akka-stm/src/test/scala/akka/transactor/test/CoordinatedIncrementSpec.scala @@ -7,7 +7,7 @@ import akka.actor._ import akka.stm.{ Ref, TransactionFactory } import akka.util.duration._ import akka.testkit._ -import akka.dispatch.Block +import akka.dispatch.Await object CoordinatedIncrement { case class Increment(friends: Seq[ActorRef]) @@ -73,7 +73,7 @@ class CoordinatedIncrementSpec extends AkkaSpec with BeforeAndAfterAll { counters(0) ! coordinated(Increment(counters.tail)) coordinated.await for (counter ← counters) { - Block.sync((counter ? GetCount).mapTo[Int], timeout.duration) must be === 1 + Await.result((counter ? GetCount).mapTo[Int], timeout.duration) must be === 1 } counters foreach (_.stop()) failer.stop() @@ -90,7 +90,7 @@ class CoordinatedIncrementSpec extends AkkaSpec with BeforeAndAfterAll { counters(0) ! Coordinated(Increment(counters.tail :+ failer)) coordinated.await for (counter ← counters) { - Block.sync(counter ? GetCount, timeout.duration) must be === 0 + Await.result(counter ? GetCount, timeout.duration) must be === 0 } counters foreach (_.stop()) failer.stop() diff --git a/akka-stm/src/test/scala/akka/transactor/test/FickleFriendsSpec.scala b/akka-stm/src/test/scala/akka/transactor/test/FickleFriendsSpec.scala index e229ae794f..c7774920da 100644 --- a/akka-stm/src/test/scala/akka/transactor/test/FickleFriendsSpec.scala +++ b/akka-stm/src/test/scala/akka/transactor/test/FickleFriendsSpec.scala @@ -11,7 +11,7 @@ import akka.testkit._ import scala.util.Random.{ nextInt ⇒ random } import java.util.concurrent.CountDownLatch import akka.testkit.TestEvent.Mute -import akka.dispatch.Block +import akka.dispatch.Await object FickleFriends { case class FriendlyIncrement(friends: Seq[ActorRef], latch: CountDownLatch) @@ -120,9 +120,9 @@ class FickleFriendsSpec extends AkkaSpec with BeforeAndAfterAll { val latch = new CountDownLatch(1) coordinator ! FriendlyIncrement(counters, latch) latch.await // this could take a while - Block.sync(coordinator ? GetCount, timeout.duration) must be === 1 + Await.result(coordinator ? GetCount, timeout.duration) must be === 1 for (counter ← counters) { - Block.sync(counter ? GetCount, timeout.duration) must be === 1 + Await.result(counter ? GetCount, timeout.duration) must be === 1 } counters foreach (_.stop()) coordinator.stop() diff --git a/akka-stm/src/test/scala/akka/transactor/test/TransactorSpec.scala b/akka-stm/src/test/scala/akka/transactor/test/TransactorSpec.scala index c72778df06..9ad8fabad4 100644 --- a/akka-stm/src/test/scala/akka/transactor/test/TransactorSpec.scala +++ b/akka-stm/src/test/scala/akka/transactor/test/TransactorSpec.scala @@ -8,7 +8,7 @@ import akka.actor._ import akka.stm._ import akka.util.duration._ import akka.testkit._ -import akka.dispatch.Block +import akka.dispatch.Await object TransactorIncrement { case class Increment(friends: Seq[ActorRef], latch: TestLatch) @@ -96,7 +96,7 @@ class TransactorSpec extends AkkaSpec { counters(0) ! Increment(counters.tail, incrementLatch) incrementLatch.await for (counter ← counters) { - Block.sync(counter ? GetCount, timeout.duration) must be === 1 + Await.result(counter ? GetCount, timeout.duration) must be === 1 } counters foreach (_.stop()) failer.stop() @@ -113,7 +113,7 @@ class TransactorSpec extends AkkaSpec { counters(0) ! Increment(counters.tail :+ failer, failLatch) failLatch.await for (counter ← counters) { - Block.sync(counter ? GetCount, timeout.duration) must be === 0 + Await.result(counter ? GetCount, timeout.duration) must be === 0 } counters foreach (_.stop()) failer.stop() diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index b524114046..d037f602af 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -141,7 +141,7 @@ class TestKit(_system: ActorSystem) { def msgAvailable = !queue.isEmpty /** - * Block until the given condition evaluates to `true` or the timeout + * Await until the given condition evaluates to `true` or the timeout * expires, whichever comes first. * * If no timeout is given, take it from the innermost enclosing `within` @@ -536,7 +536,7 @@ object TestKit { private[testkit] val testActorId = new AtomicInteger(0) /** - * Block until the given condition evaluates to `true` or the timeout + * Await until the given condition evaluates to `true` or the timeout * expires, whichever comes first. * * If no timeout is given, take it from the innermost enclosing `within` diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 5083dd85f2..f84174a4ba 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -15,7 +15,7 @@ import akka.actor.PoisonPill import akka.actor.CreateChild import akka.actor.DeadLetter import java.util.concurrent.TimeoutException -import akka.dispatch.{ Block, MessageDispatcher } +import akka.dispatch.{ Await, MessageDispatcher } object TimingTest extends Tag("timing") @@ -64,7 +64,7 @@ abstract class AkkaSpec(_system: ActorSystem) final override def afterAll { system.stop() - try Block.on(system.asInstanceOf[ActorSystemImpl].terminationFuture, 5 seconds) catch { + try Await.ready(system.asInstanceOf[ActorSystemImpl].terminationFuture, 5 seconds) catch { case _: TimeoutException ⇒ system.log.warning("Failed to stop [{}] within 5 seconds", system.name) } atTermination() @@ -140,7 +140,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { system.registerOnTermination(latch.countDown()) system.stop() latch.await(2 seconds) - Block.sync(davyJones ? "Die!", timeout.duration) must be === "finally gone" + Await.result(davyJones ? "Die!", timeout.duration) must be === "finally gone" // this will typically also contain log messages which were sent after the logger shutdown locker must contain(DeadLetter(42, davyJones, probe.ref)) diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 8466fa25b3..d525f2a91e 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -7,7 +7,7 @@ import org.scalatest.matchers.MustMatchers import org.scalatest.{ BeforeAndAfterEach, WordSpec } import akka.actor._ import akka.event.Logging.Warning -import akka.dispatch.{ Future, Promise, Block } +import akka.dispatch.{ Future, Promise, Await } import akka.util.duration._ import akka.actor.ActorSystem @@ -110,7 +110,7 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTime def receive = { case _ ⇒ sender ! nested } })) a must not be (null) - val nested = Block.sync((a ? "any").mapTo[ActorRef], timeout.duration) + val nested = Await.result((a ? "any").mapTo[ActorRef], timeout.duration) nested must not be (null) a must not be theSameInstanceAs(nested) } @@ -121,7 +121,7 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTime def receive = { case _ ⇒ sender ! nested } })) a must not be (null) - val nested = Block.sync((a ? "any").mapTo[ActorRef], timeout.duration) + val nested = Await.result((a ? "any").mapTo[ActorRef], timeout.duration) nested must not be (null) a must not be theSameInstanceAs(nested) } @@ -195,7 +195,7 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTime val f = a ? "work" // CallingThreadDispatcher means that there is no delay f must be('completed) - Block.sync(f, timeout.duration) must equal("workDone") + Await.result(f, timeout.duration) must equal("workDone") } } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala index 6669c70f64..4723070299 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala @@ -5,7 +5,7 @@ import org.scalatest.matchers.MustMatchers import org.scalatest.{ BeforeAndAfterEach, WordSpec } import akka.actor._ import akka.util.duration._ -import akka.dispatch.{ Block, Future } +import akka.dispatch.{ Await, Future } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class TestProbeSpec extends AkkaSpec with DefaultTimeout { @@ -18,7 +18,7 @@ class TestProbeSpec extends AkkaSpec with DefaultTimeout { tk.expectMsg(0 millis, "hello") // TestActor runs on CallingThreadDispatcher tk.lastMessage.sender ! "world" future must be('completed) - Block.sync(future, timeout.duration) must equal("world") + Await.result(future, timeout.duration) must equal("world") } "reply to messages" in {