diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 43af040d62..83c82e443b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -334,7 +334,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { clientRef ! "simple" clientRef ! "simple" - latch.await + Await.ready(latch, timeout.duration) latch.reset @@ -343,7 +343,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { clientRef ! "simple" clientRef ! "simple" - latch.await + Await.ready(latch, timeout.duration) system.stop(clientRef) system.stop(serverRef) @@ -370,7 +370,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { "restart when Kill:ed" in { filterException[ActorKilledException] { - val latch = new CountDownLatch(2) + val latch = TestLatch(2) val boss = system.actorOf(Props(new Actor { @@ -385,7 +385,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 1000))) boss ! "sendKill" - latch.await(5, TimeUnit.SECONDS) must be === true + Await.ready(latch, 5 seconds) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index a856c045c1..a6d6a7df98 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -8,12 +8,14 @@ import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import akka.testkit._ import TestEvent.Mute import FSM._ -import akka.util.Duration import akka.util.duration._ import akka.event._ import com.typesafe.config.ConfigFactory +import akka.dispatch.Await +import akka.util.{ Timeout, Duration } object FSMActorSpec { + val timeout = Timeout(2 seconds) class Latches(implicit system: ActorSystem) { val unlockedLatch = TestLatch() @@ -122,7 +124,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im })) lock ! SubscribeTransitionCallBack(transitionTester) - initialStateLatch.await + Await.ready(initialStateLatch, timeout.duration) lock ! '3' lock ! '3' @@ -130,14 +132,14 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im lock ! '2' lock ! '1' - unlockedLatch.await - transitionLatch.await - transitionCallBackLatch.await - lockedLatch.await + Await.ready(unlockedLatch, timeout.duration) + Await.ready(transitionLatch, timeout.duration) + Await.ready(transitionCallBackLatch, timeout.duration) + Await.ready(lockedLatch, timeout.duration) EventFilter.warning(start = "unhandled event", occurrences = 1) intercept { lock ! "not_handled" - unhandledLatch.await + Await.ready(unhandledLatch, timeout.duration) } val answerLatch = TestLatch() @@ -151,10 +153,10 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im } })) tester ! Hello - answerLatch.await + Await.ready(answerLatch, timeout.duration) tester ! Bye - terminatedLatch.await + Await.ready(terminatedLatch, timeout.duration) } "log termination" in { @@ -186,7 +188,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im } } val ref = system.actorOf(Props(fsm)) - started.await + Await.ready(started, timeout.duration) system.stop(ref) expectMsg(1 second, fsm.StopEvent(Shutdown, 1, null)) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index 9b1ba99459..757acb1fd0 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -188,7 +188,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val started = TestLatch(1) val ioManager = system.actorOf(Props(new IOManager(2))) // teeny tiny buffer val server = system.actorOf(Props(new SimpleEchoServer("localhost", 8064, ioManager, started))) - started.await + Await.ready(started, timeout.duration) val client = system.actorOf(Props(new SimpleEchoClient("localhost", 8064, ioManager))) val f1 = client ? ByteString("Hello World!1") val f2 = client ? ByteString("Hello World!2") @@ -205,7 +205,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val started = TestLatch(1) val ioManager = system.actorOf(Props(new IOManager())) val server = system.actorOf(Props(new SimpleEchoServer("localhost", 8065, ioManager, started))) - started.await + Await.ready(started, timeout.duration) val client = system.actorOf(Props(new SimpleEchoClient("localhost", 8065, ioManager))) val list = List.range(0, 1000) val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString)) @@ -219,7 +219,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val started = TestLatch(1) val ioManager = system.actorOf(Props(new IOManager(2))) val server = system.actorOf(Props(new SimpleEchoServer("localhost", 8066, ioManager, started))) - started.await + Await.ready(started, timeout.duration) val client = system.actorOf(Props(new SimpleEchoClient("localhost", 8066, ioManager))) val list = List.range(0, 1000) val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString)) @@ -233,7 +233,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val started = TestLatch(1) val ioManager = system.actorOf(Props(new IOManager(2))) // teeny tiny buffer val server = system.actorOf(Props(new KVStore("localhost", 8067, ioManager, started))) - started.await + Await.ready(started, timeout.duration) val client1 = system.actorOf(Props(new KVClient("localhost", 8067, ioManager))) val client2 = system.actorOf(Props(new KVClient("localhost", 8067, ioManager))) val f1 = client1 ? (('set, "hello", ByteString("World"))) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala index 9706a77d9f..2671fa9b9a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala @@ -8,6 +8,8 @@ import akka.testkit._ import akka.util.duration._ import java.util.concurrent.atomic.AtomicInteger +import akka.dispatch.Await +import java.util.concurrent.TimeoutException @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ReceiveTimeoutSpec extends AkkaSpec { @@ -25,7 +27,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { } })) - timeoutLatch.await + Await.ready(timeoutLatch, TestLatch.DefaultTimeout) system.stop(timeoutActor) } @@ -44,7 +46,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { timeoutActor ! Tick - timeoutLatch.await + Await.ready(timeoutLatch, TestLatch.DefaultTimeout) system.stop(timeoutActor) } @@ -67,7 +69,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { timeoutActor ! Tick - timeoutLatch.await + Await.ready(timeoutLatch, TestLatch.DefaultTimeout) count.get must be(1) system.stop(timeoutActor) } @@ -81,7 +83,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { } })) - timeoutLatch.awaitTimeout(1 second) // timeout expected + intercept[TimeoutException] { Await.ready(timeoutLatch, 1 second) } system.stop(timeoutActor) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index 789bf12e68..b627046052 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -32,7 +32,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { val restartLatch = new TestLatch val secondRestartLatch = new TestLatch - val countDownLatch = new CountDownLatch(3) + val countDownLatch = new TestLatch(3) val stopLatch = new TestLatch val slaveProps = Props(new Actor { @@ -60,23 +60,23 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { slave ! Ping // test restart and post restart ping - assert(restartLatch.await(10 seconds)) + Await.ready(restartLatch, 10 seconds) // now crash again... should not restart slave ! Crash slave ! Ping - assert(secondRestartLatch.await(10 seconds)) - assert(countDownLatch.await(10, TimeUnit.SECONDS)) + Await.ready(secondRestartLatch, 10 seconds) + Await.ready(countDownLatch, 10 seconds) slave ! Crash - assert(stopLatch.await(10 seconds)) + Await.ready(stopLatch, 10 seconds) } "ensure that slave is immortal without max restarts and time range" in { val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None))) - val countDownLatch = new CountDownLatch(100) + val countDownLatch = new TestLatch(100) val slaveProps = Props(new Actor { @@ -91,7 +91,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration) (1 to 100) foreach { _ ⇒ slave ! Crash } - assert(countDownLatch.await(120, TimeUnit.SECONDS)) + Await.ready(countDownLatch, 2 minutes) assert(!slave.isTerminated) } @@ -131,14 +131,14 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { slave ! Ping slave ! Crash - assert(restartLatch.await(10 seconds)) - assert(pingLatch.await(10 seconds)) + Await.ready(restartLatch, 10 seconds) + Await.ready(pingLatch, 10 seconds) slave ! Ping slave ! Crash - assert(secondRestartLatch.await(10 seconds)) - assert(secondPingLatch.await(10 seconds)) + Await.ready(secondRestartLatch, 10 seconds) + Await.ready(secondPingLatch, 10 seconds) // sleep to go out of the restart strategy's time range sleep(700L) @@ -147,7 +147,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { slave ! Crash slave ! Ping - assert(thirdRestartLatch.await(1 second)) + Await.ready(thirdRestartLatch, 1 second) assert(!slave.isTerminated) } @@ -157,7 +157,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { val restartLatch = new TestLatch val secondRestartLatch = new TestLatch - val countDownLatch = new CountDownLatch(3) + val countDownLatch = new TestLatch(3) val stopLatch = new TestLatch val slaveProps = Props(new Actor { @@ -184,7 +184,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { slave ! Ping // test restart and post restart ping - assert(restartLatch.await(10 seconds)) + Await.ready(restartLatch, 10 seconds) assert(!slave.isTerminated) @@ -192,20 +192,20 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { slave ! Crash slave ! Ping - assert(secondRestartLatch.await(10 seconds)) - assert(countDownLatch.await(10, TimeUnit.SECONDS)) + Await.ready(secondRestartLatch, 10 seconds) + Await.ready(countDownLatch, 10 seconds) sleep(700L) slave ! Crash - assert(stopLatch.await(10 seconds)) + Await.ready(stopLatch, 10 seconds) sleep(500L) assert(slave.isTerminated) } "ensure that slave is not restarted within time range" in { val restartLatch, stopLatch, maxNoOfRestartsLatch = new TestLatch - val countDownLatch = new CountDownLatch(2) + val countDownLatch = new TestLatch(2) val boss = system.actorOf(Props(new Actor { def receive = { @@ -236,7 +236,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { slave ! Ping // test restart and post restart ping - assert(restartLatch.await(10 seconds)) + Await.ready(restartLatch, 10 seconds) assert(!slave.isTerminated) @@ -245,14 +245,14 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { // may not be running slave ! Ping - assert(countDownLatch.await(10, TimeUnit.SECONDS)) + Await.ready(countDownLatch, 10 seconds) // may not be running slave ! Crash - assert(stopLatch.await(10 seconds)) + Await.ready(stopLatch, 10 seconds) - assert(maxNoOfRestartsLatch.await(10 seconds)) + Await.ready(maxNoOfRestartsLatch, 10 seconds) sleep(500L) assert(slave.isTerminated) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 96c67dacd7..ba06a90023 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -103,7 +103,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout object Crash val restartLatch = new TestLatch - val pingLatch = new CountDownLatch(6) + val pingLatch = new TestLatch(6) val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, 1000))) val props = Props(new Actor { @@ -122,13 +122,13 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout collectCancellable(system.scheduler.scheduleOnce(1000 milliseconds, actor, Crash)) } - assert(restartLatch.await(2 seconds)) + Await.ready(restartLatch, 2 seconds) // should be enough time for the ping countdown to recover and reach 6 pings - assert(pingLatch.await(5, TimeUnit.SECONDS)) + Await.ready(pingLatch, 5 seconds) } "never fire prematurely" in { - val ticks = new CountDownLatch(300) + val ticks = new TestLatch(300) case class Msg(ts: Long) @@ -147,11 +147,11 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout Thread.sleep(5) } - assert(ticks.await(3, TimeUnit.SECONDS) == true) + Await.ready(ticks, 3 seconds) } "schedule with different initial delay and frequency" in { - val ticks = new CountDownLatch(3) + val ticks = new TestLatch(3) case object Msg @@ -162,8 +162,8 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout })) val startTime = System.nanoTime() - val cancellable = system.scheduler.schedule(1000 milliseconds, 300 milliseconds, actor, Msg) - ticks.await(3, TimeUnit.SECONDS) + val cancellable = system.scheduler.schedule(1 second, 300 milliseconds, actor, Msg) + Await.ready(ticks, 3 seconds) val elapsedTimeMs = (System.nanoTime() - startTime) / 1000000 assert(elapsedTimeMs > 1600) diff --git a/akka-actor-tests/src/test/scala/akka/actor/routing/ListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/routing/ListenerSpec.scala index 38a57fda10..b9fc484957 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/routing/ListenerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/routing/ListenerSpec.scala @@ -5,6 +5,7 @@ import akka.actor._ import akka.actor.Actor._ import akka.routing._ import java.util.concurrent.atomic.AtomicInteger +import akka.dispatch.Await @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ListenerSpec extends AkkaSpec { @@ -45,10 +46,10 @@ class ListenerSpec extends AkkaSpec { broadcast ! WithListeners(_ ! "foo") broadcast ! "foo" - barLatch.await + Await.ready(barLatch, TestLatch.DefaultTimeout) barCount.get must be(2) - fooLatch.await + Await.ready(fooLatch, TestLatch.DefaultTimeout) for (a ← List(broadcast, a1, a2, a3)) system.stop(a) } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 08b6a766ab..abf37fdff8 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -28,10 +28,10 @@ object FutureSpec { class TestDelayActor(await: TestLatch) extends Actor { def receive = { - case "Hello" ⇒ await.await; sender ! "World" - case "NoReply" ⇒ await.await + case "Hello" ⇒ Await.ready(await, TestLatch.DefaultTimeout); sender ! "World" + case "NoReply" ⇒ Await.ready(await, TestLatch.DefaultTimeout) case "Failure" ⇒ - await.await + Await.ready(await, TestLatch.DefaultTimeout) sender ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance")) } } @@ -72,7 +72,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val latch = new TestLatch val result = "test value" val future = Future { - latch.await + Await.ready(latch, TestLatch.DefaultTimeout) result } test(future) @@ -85,7 +85,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val latch = new TestLatch val result = "test value" val future = Future { - latch.await + Await.ready(latch, TestLatch.DefaultTimeout) result } latch.open() @@ -395,7 +395,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val latch = new TestLatch val actor = system.actorOf(Props[TestActor]) actor ? "Hello" onSuccess { case "World" ⇒ latch.open() } - assert(latch.await(5 seconds)) + Await.ready(latch, 5 seconds) system.stop(actor) } @@ -426,7 +426,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa intercept[ThrowableTest] { Await.result(f1, timeout.duration) } val latch = new TestLatch - val f2 = Future { latch.await(5 seconds); "success" } + val f2 = Future { Await.ready(latch, 5 seconds); "success" } f2 foreach (_ ⇒ throw new ThrowableTest("dispatcher foreach")) f2 onSuccess { case _ ⇒ throw new ThrowableTest("dispatcher receive") } val f3 = f2 map (s ⇒ s.toUpperCase) @@ -441,7 +441,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "shouldBlockUntilResult" in { val latch = new TestLatch - val f = Future { latch.await; 5 } + val f = Future { Await.ready(latch, 5 seconds); 5 } val f2 = Future { Await.result(f, timeout.duration) + 5 } intercept[TimeoutException](Await.ready(f2, 100 millis)) @@ -525,8 +525,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa z() + y() } - assert(ly.await(100 milliseconds)) - lz.awaitTimeout(100 milliseconds) + Await.ready(ly, 100 milliseconds) + intercept[TimeoutException] { Await.ready(lz, 100 milliseconds) } flow { x << 5 } @@ -588,20 +588,20 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa lz.open() x1() + x2() } - assert(lx.await(2 seconds)) + Await.ready(lx, 2 seconds) assert(!ly.isOpen) assert(!lz.isOpen) assert(List(x1, x2, y1, y2).forall(_.isCompleted == false)) flow { y1 << 1 } // When this is set, it should cascade down the line - assert(ly.await(2 seconds)) + Await.ready(ly, 2 seconds) assert(Await.result(x1, 1 minute) === 1) assert(!lz.isOpen) flow { y2 << 9 } // When this is set, it should cascade down the line - assert(lz.await(2 seconds)) + Await.ready(lz, 2 seconds) assert(Await.result(x2, 1 minute) === 9) assert(List(x1, x2, y1, y2).forall(_.isCompleted)) @@ -614,16 +614,16 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val i1, i2, s1, s2 = new TestLatch - val callService1 = Future { i1.open(); s1.await; 1 } - val callService2 = Future { i2.open(); s2.await; 9 } + val callService1 = Future { i1.open(); Await.ready(s1, TestLatch.DefaultTimeout); 1 } + val callService2 = Future { i2.open(); Await.ready(s2, TestLatch.DefaultTimeout); 9 } val result = flow { callService1() + callService2() } assert(!s1.isOpen) assert(!s2.isOpen) assert(!result.isCompleted) - assert(i1.await(2 seconds)) - assert(i2.await(2 seconds)) + Await.ready(i1, 2 seconds) + Await.ready(i2, 2 seconds) s1.open() s2.open() assert(Await.result(result, timeout.duration) === 10) @@ -644,10 +644,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa lz.open() z() + y() + oops } - - ly.awaitTimeout(100 milliseconds) - lz.awaitTimeout(100 milliseconds) - + intercept[TimeoutException] { Await.ready(ly, 100 milliseconds) } + intercept[TimeoutException] { Await.ready(lz, 100 milliseconds) } flow { x << 5 } assert(Await.result(y, timeout.duration) === 5) @@ -662,7 +660,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val latch = new TestLatch val future = Future { - latch.await + Await.ready(latch, TestLatch.DefaultTimeout) "Hello" } @@ -745,36 +743,36 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "run callbacks async" in { val latch = Vector.fill(10)(new TestLatch) - val f1 = Future { latch(0).open(); latch(1).await; "Hello" } - val f2 = f1 map { s ⇒ latch(2).open(); latch(3).await; s.length } + val f1 = Future { latch(0).open(); Await.ready(latch(1), TestLatch.DefaultTimeout); "Hello" } + val f2 = f1 map { s ⇒ latch(2).open(); Await.ready(latch(3), TestLatch.DefaultTimeout); s.length } f2 foreach (_ ⇒ latch(4).open()) - latch(0).await + Await.ready(latch(0), TestLatch.DefaultTimeout) f1 must not be ('completed) f2 must not be ('completed) latch(1).open() - latch(2).await + Await.ready(latch(2), TestLatch.DefaultTimeout) f1 must be('completed) f2 must not be ('completed) - val f3 = f1 map { s ⇒ latch(5).open(); latch(6).await; s.length * 2 } + val f3 = f1 map { s ⇒ latch(5).open(); Await.ready(latch(6), TestLatch.DefaultTimeout); s.length * 2 } f3 foreach (_ ⇒ latch(3).open()) - latch(5).await + Await.ready(latch(5), TestLatch.DefaultTimeout) f3 must not be ('completed) latch(6).open() - latch(4).await + Await.ready(latch(4), TestLatch.DefaultTimeout) f2 must be('completed) f3 must be('completed) val p1 = Promise[String]() - val f4 = p1 map { s ⇒ latch(7).open(); latch(8).await; s.length } + val f4 = p1 map { s ⇒ latch(7).open(); Await.ready(latch(8), TestLatch.DefaultTimeout); s.length } f4 foreach (_ ⇒ latch(9).open()) p1 must not be ('completed) @@ -782,13 +780,13 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa p1 complete Right("Hello") - latch(7).await + Await.ready(latch(7), TestLatch.DefaultTimeout) p1 must be('completed) f4 must not be ('completed) latch(8).open() - latch(9).await + Await.ready(latch(9), TestLatch.DefaultTimeout) Await.ready(f4, timeout.duration) must be('completed) } @@ -802,9 +800,9 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa Future.blocking(system.dispatcher) val nested = Future(()) nested foreach (_ ⇒ l1.open()) - l1.await // make sure nested is completed + Await.ready(l1, TestLatch.DefaultTimeout) // make sure nested is completed nested foreach (_ ⇒ l2.open()) - l2.await + Await.ready(l2, TestLatch.DefaultTimeout) } Await.ready(complex, timeout.duration) must be('completed) } diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala index c87e4aeb89..f18fd2e5e1 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala @@ -92,8 +92,8 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { pool ! "a" pool ! "b" - latch.await - successes.await + Await.ready(latch, TestLatch.DefaultTimeout) + Await.ready(successes, TestLatch.DefaultTimeout) count.get must be(2) @@ -180,7 +180,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { loops = 2 loop(500) - latch.await + Await.ready(latch, TestLatch.DefaultTimeout) count.get must be(loops) Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2) @@ -189,7 +189,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { loops = 10 loop(500) - latch.await + Await.ready(latch, TestLatch.DefaultTimeout) count.get must be(loops) Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(4) @@ -236,7 +236,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { // send a few messages and observe pool at its lower bound loops = 3 loop(500) - latch.await + Await.ready(latch, TestLatch.DefaultTimeout) count.get must be(loops) Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2) @@ -245,7 +245,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { loops = 15 loop(500) - latch.await(10 seconds) + Await.ready(latch, 10 seconds) count.get must be(loops) Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be >= (3) @@ -278,7 +278,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { pool1 ! "a" pool1 ! "b" - latch1.await + Await.ready(latch1, TestLatch.DefaultTimeout) delegates.size must be(1) system.stop(pool1) @@ -306,7 +306,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { pool2 ! "a" pool2 ! "b" - latch2.await + Await.ready(latch2, TestLatch.DefaultTimeout) delegates.size must be(2) system.stop(pool2) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala index 45e86fc387..dd4e45f5cb 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -3,7 +3,6 @@ package akka.routing import akka.actor._ import akka.routing._ import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.{ CountDownLatch, TimeUnit } import akka.testkit._ import akka.util.duration._ import akka.dispatch.Await @@ -30,8 +29,8 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli "round robin router" must { "be able to shut down its instance" in { - val helloLatch = new CountDownLatch(5) - val stopLatch = new CountDownLatch(5) + val helloLatch = new TestLatch(5) + val stopLatch = new TestLatch(5) val actor = system.actorOf(Props(new Actor { def receive = { @@ -48,16 +47,16 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli actor ! "hello" actor ! "hello" actor ! "hello" - helloLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(helloLatch, 5 seconds) system.stop(actor) - stopLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(stopLatch, 5 seconds) } "deliver messages in a round robin fashion" in { val connectionCount = 10 val iterationCount = 10 - val doneLatch = new CountDownLatch(connectionCount) + val doneLatch = new TestLatch(connectionCount) val counter = new AtomicInteger var replies = Map.empty[Int, Int] @@ -83,14 +82,14 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli counter.get must be(connectionCount) actor ! Broadcast("end") - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(doneLatch, 5 seconds) replies.values foreach { _ must be(iterationCount) } } "deliver a broadcast message using the !" in { - val helloLatch = new CountDownLatch(5) - val stopLatch = new CountDownLatch(5) + val helloLatch = new TestLatch(5) + val stopLatch = new TestLatch(5) val actor = system.actorOf(Props(new Actor { def receive = { @@ -103,17 +102,17 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli }).withRouter(RoundRobinRouter(5)), "round-robin-broadcast") actor ! Broadcast("hello") - helloLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(helloLatch, 5 seconds) system.stop(actor) - stopLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(stopLatch, 5 seconds) } } "random router" must { "be able to shut down its instance" in { - val stopLatch = new CountDownLatch(7) + val stopLatch = new TestLatch(7) val actor = system.actorOf(Props(new Actor { def receive = { @@ -136,13 +135,13 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli } system.stop(actor) - stopLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(stopLatch, 5 seconds) } "deliver messages in a random fashion" in { val connectionCount = 10 val iterationCount = 10 - val doneLatch = new CountDownLatch(connectionCount) + val doneLatch = new TestLatch(connectionCount) val counter = new AtomicInteger var replies = Map.empty[Int, Int] @@ -168,15 +167,15 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli counter.get must be(connectionCount) actor ! Broadcast("end") - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(doneLatch, 5 seconds) replies.values foreach { _ must be > (0) } replies.values.sum must be === iterationCount * connectionCount } "deliver a broadcast message using the !" in { - val helloLatch = new CountDownLatch(6) - val stopLatch = new CountDownLatch(6) + val helloLatch = new TestLatch(6) + val stopLatch = new TestLatch(6) val actor = system.actorOf(Props(new Actor { def receive = { @@ -189,10 +188,10 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli }).withRouter(RandomRouter(6)), "random-broadcast") actor ! Broadcast("hello") - helloLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(helloLatch, 5 seconds) system.stop(actor) - stopLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(stopLatch, 5 seconds) } } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index d4b5d66a6b..b1d84d8d21 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -68,7 +68,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { } "send message to connection" in { - val doneLatch = new CountDownLatch(1) + val doneLatch = new TestLatch(1) val counter = new AtomicInteger(0) @@ -83,7 +83,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { routedActor ! "hello" routedActor ! "end" - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(doneLatch, 5 seconds) counter.get must be(1) } @@ -102,7 +102,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { "deliver messages in a round robin fashion" in { val connectionCount = 10 val iterationCount = 10 - val doneLatch = new CountDownLatch(connectionCount) + val doneLatch = new TestLatch(connectionCount) //lets create some connections. var actors = new LinkedList[ActorRef] @@ -130,7 +130,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { routedActor ! Broadcast("end") //now wait some and do validations. - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(doneLatch, 5 seconds) for (i ← 0 until connectionCount) { val counter = counters.get(i).get @@ -139,7 +139,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { } "deliver a broadcast message using the !" in { - val doneLatch = new CountDownLatch(2) + val doneLatch = new TestLatch(2) val counter1 = new AtomicInteger val actor1 = system.actorOf(Props(new Actor { @@ -162,7 +162,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { routedActor ! Broadcast(1) routedActor ! Broadcast("end") - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(doneLatch, 5 seconds) counter1.get must be(1) counter2.get must be(1) @@ -177,7 +177,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { } "deliver a broadcast message" in { - val doneLatch = new CountDownLatch(2) + val doneLatch = new TestLatch(2) val counter1 = new AtomicInteger val actor1 = system.actorOf(Props(new Actor { @@ -200,7 +200,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { routedActor ! Broadcast(1) routedActor ! Broadcast("end") - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(doneLatch, 5 seconds) counter1.get must be(1) counter2.get must be(1) @@ -214,7 +214,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { } "broadcast message using !" in { - val doneLatch = new CountDownLatch(2) + val doneLatch = new TestLatch(2) val counter1 = new AtomicInteger val actor1 = system.actorOf(Props(new Actor { @@ -236,14 +236,14 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { routedActor ! 1 routedActor ! "end" - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(doneLatch, 5 seconds) counter1.get must be(1) counter2.get must be(1) } "broadcast message using ?" in { - val doneLatch = new CountDownLatch(2) + val doneLatch = new TestLatch(2) val counter1 = new AtomicInteger val actor1 = system.actorOf(Props(new Actor { @@ -267,7 +267,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { routedActor ? 1 routedActor ! "end" - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(doneLatch, 5 seconds) counter1.get must be(1) counter2.get must be(1) @@ -304,7 +304,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { routedActor ! Broadcast(1) routedActor ! Broadcast("end") - doneLatch.await + Await.ready(doneLatch, TestLatch.DefaultTimeout) counter1.get must be(1) counter2.get must be(1) @@ -317,7 +317,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { val routedActor = system.actorOf(Props[TestActor].withRouter(ScatterGatherFirstCompletedRouter(routees = List(actor1, actor2)))) routedActor ! Broadcast(Stop(Some(1))) - shutdownLatch.await + Await.ready(shutdownLatch, TestLatch.DefaultTimeout) Await.result(routedActor ? Broadcast(0), timeout.duration) must be(22) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index f3697ae904..479de8731b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -41,7 +41,7 @@ object Await { def result(atMost: Duration)(implicit permit: CanAwait): T } - private implicit val permit = new CanAwait {} + private[this] implicit final val permit = new CanAwait {} def ready[T <: Awaitable[_]](awaitable: T, atMost: Duration): T = awaitable.ready(atMost) def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestLatch.scala b/akka-testkit/src/main/scala/akka/testkit/TestLatch.scala index 13bb9d84ab..37e5e607fb 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestLatch.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestLatch.scala @@ -5,11 +5,9 @@ package akka.testkit import akka.util.Duration -import java.util.concurrent.{ CountDownLatch, TimeUnit } import akka.actor.ActorSystem - -class TestLatchTimeoutException(message: String) extends RuntimeException(message) -class TestLatchNoTimeoutException(message: String) extends RuntimeException(message) +import akka.dispatch.Await.{ CanAwait, Awaitable } +import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit } /** * A count down latch wrapper for use in testing. @@ -24,34 +22,23 @@ object TestLatch { def apply(count: Int = 1)(implicit system: ActorSystem) = new TestLatch(count) } -class TestLatch(count: Int = 1)(implicit system: ActorSystem) { +class TestLatch(count: Int = 1)(implicit system: ActorSystem) extends Awaitable[Unit] { private var latch = new CountDownLatch(count) def countDown() = latch.countDown() - def isOpen: Boolean = latch.getCount == 0 - def open() = while (!isOpen) countDown() - - def await(): Boolean = await(TestLatch.DefaultTimeout) - - def await(timeout: Duration): Boolean = { - val opened = latch.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS) - if (!opened) throw new TestLatchTimeoutException( - "Timeout of %s with time factor of %s" format (timeout.toString, TestKitExtension(system).TestTimeFactor)) - opened - } - - /** - * Timeout is expected. Throws exception if latch is opened before timeout. - */ - def awaitTimeout(timeout: Duration = TestLatch.DefaultTimeout) = { - val opened = latch.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS) - if (opened) throw new TestLatchNoTimeoutException( - "Latch opened before timeout of %s with time factor of %s" format (timeout.toString, TestKitExtension(system).TestTimeFactor)) - opened - } - def reset() = latch = new CountDownLatch(count) + + def ready(atMost: Duration)(implicit permit: CanAwait) = { + val opened = latch.await(atMost.dilated.toNanos, TimeUnit.NANOSECONDS) + if (!opened) throw new TimeoutException( + "Timeout of %s with time factor of %s" format (atMost.toString, TestKitExtension(system).TestTimeFactor)) + this + } + + def result(atMost: Duration)(implicit permit: CanAwait): Unit = { + ready(atMost) + } } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index baab14b9cb..e918a15b1c 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -107,7 +107,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { system.actorFor("/") ! PoisonPill - latch.await(2 seconds) + Await.ready(latch, 2 seconds) } "must enqueue unread messages from testActor to deadLetters" in { @@ -139,7 +139,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { val latch = new TestLatch(1)(system) system.registerOnTermination(latch.countDown()) system.shutdown() - latch.await(2 seconds) + Await.ready(latch, 2 seconds) Await.result(davyJones ? "Die!", timeout.duration) must be === "finally gone" // this will typically also contain log messages which were sent after the logger shutdown