diff --git a/akka-actor-tests/src/test/scala/akka/ticket/Ticket1111Spec.scala b/akka-actor-tests/src/test/scala/akka/ticket/Ticket1111Spec.scala index 6f6d5b0dac..f10093c613 100644 --- a/akka-actor-tests/src/test/scala/akka/ticket/Ticket1111Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/ticket/Ticket1111Spec.scala @@ -8,7 +8,7 @@ import akka.actor.{ ActorRef, Actor } import java.util.concurrent.atomic.AtomicInteger import collection.mutable.LinkedList import akka.routing.Routing.Broadcast -import java.util.concurrent.{ CountDownLatch, TimeUnit } +import akka.testkit._ class Ticket1111Spec extends WordSpec with MustMatchers { @@ -16,7 +16,7 @@ class Ticket1111Spec extends WordSpec with MustMatchers { "return response, even if one of the connections has stopped" in { - val shutdownLatch = new CountDownLatch(1) + val shutdownLatch = new TestLatch(1) val props = RoutedProps() .withConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch)))) @@ -26,14 +26,14 @@ class Ticket1111Spec extends WordSpec with MustMatchers { actor ! Broadcast(Stop(Some(0))) - shutdownLatch.await(5, TimeUnit.SECONDS) must be(true) + shutdownLatch.await (actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1) } "throw an exception, if all the connections have stopped" in { - val shutdownLatch = new CountDownLatch(2) + val shutdownLatch = new TestLatch(2) val props = RoutedProps() .withConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch)))) @@ -43,7 +43,7 @@ class Ticket1111Spec extends WordSpec with MustMatchers { actor ! Broadcast(Stop()) - shutdownLatch.await(5, TimeUnit.SECONDS) must be(true) + shutdownLatch.await (intercept[RoutingException] { actor ? Broadcast(0) @@ -100,7 +100,7 @@ class Ticket1111Spec extends WordSpec with MustMatchers { "deliver one-way messages in a round robin fashion" in { val connectionCount = 10 val iterationCount = 10 - val doneLatch = new CountDownLatch(connectionCount) + val doneLatch = new TestLatch(connectionCount) var connections = new LinkedList[ActorRef] var counters = new LinkedList[AtomicInteger] @@ -130,7 +130,7 @@ class Ticket1111Spec extends WordSpec with MustMatchers { actor ! Broadcast("end") - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + doneLatch.await for (i ← 0 until connectionCount) { val counter = counters.get(i).get @@ -139,7 +139,7 @@ class Ticket1111Spec extends WordSpec with MustMatchers { } "deliver a broadcast message using the !" in { - val doneLatch = new CountDownLatch(2) + val doneLatch = new TestLatch(2) val counter1 = new AtomicInteger val connection1 = actorOf(new Actor { @@ -166,7 +166,7 @@ class Ticket1111Spec extends WordSpec with MustMatchers { actor ! Broadcast(1) actor ! Broadcast("end") - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + doneLatch.await counter1.get must be(1) counter2.get must be(1) @@ -174,13 +174,17 @@ class Ticket1111Spec extends WordSpec with MustMatchers { case class Stop(id: Option[Int] = None) - def newActor(id: Int, shudownLatch: Option[CountDownLatch] = None) = actorOf(new Actor { + def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = actorOf(new Actor { def receive = { - case Stop(None) ⇒ self.stop(); shudownLatch.map(_.countDown()) - case Stop(Some(_id)) if (_id == id) ⇒ self.stop(); shudownLatch.map(_.countDown()) + case Stop(None) ⇒ self.stop() + case Stop(Some(_id)) if (_id == id) ⇒ self.stop() case _id: Int if (_id == id) ⇒ case _ ⇒ Thread sleep 100 * id; tryReply(id) } + + override def postStop = { + shudownLatch foreach (_.countDown()) + } }) } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index c8a2e2261a..7618883f5b 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -467,6 +467,7 @@ trait ScatterGatherRouter extends BasicRouter with Serializable { private def scatterGather[S, G >: S](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[G] = { val responses = connections.versionedIterable.iterable.flatMap { actor ⇒ try { + if (actor.isShutdown) throw new ActorInitializationException("For compatability - check death first") Some(actor.?(message, timeout)(sender).asInstanceOf[Future[S]]) } catch { case e: Exception ⇒