diff --git a/akka-actor-tests/src/test/scala/akka/routing/BalancingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/BalancingSpec.scala index d86666ab00..f76087d0fc 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/BalancingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/BalancingSpec.scala @@ -6,10 +6,8 @@ package akka.routing import java.net.URLEncoder import java.util.concurrent.atomic.AtomicInteger - -import scala.concurrent.Await +import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.duration._ - import akka.actor.ActorRef import akka.actor.Actor import akka.actor.Props @@ -22,24 +20,28 @@ import org.scalatest.BeforeAndAfterEach object BalancingSpec { val counter = new AtomicInteger(1) - class Worker(latch: TestLatch) extends Actor { + class Worker(latch: TestLatch, startOthers: Future[Unit]) extends Actor { lazy val id = counter.getAndIncrement() - override def preStart(): Unit = latch.countDown() - def receive = { - case msg: Int => - if (id != 1) - Await.ready(latch, 1.minute) - else if (msg <= 10) - Thread.sleep(50) // dispatch to other routees + case _: Int => + latch.countDown() + if (id == 1) { + // wait for all routees to receive a message before processing + Await.result(latch, 1.minute) + } else { + // wait for the first worker to process messages before also processing + Await.result(startOthers, 1.minute) + } sender() ! id } } class Parent extends Actor { val pool = - context.actorOf(BalancingPool(2).props(routeeProps = Props(classOf[Worker], TestLatch(0)(context.system)))) + context.actorOf( + BalancingPool(2).props( + routeeProps = Props(classOf[Worker], TestLatch(0)(context.system), Future.successful(())))) def receive = { case msg => pool.forward(msg) @@ -73,11 +75,7 @@ class BalancingSpec extends AkkaSpec(""" counter.set(1) } - def test(pool: ActorRef, latch: TestLatch): Unit = { - // wait until all routees have started - Await.ready(latch, remainingOrDefault) - - latch.reset() + def test(pool: ActorRef, startOthers: Promise[Unit]): Unit = { val iterationCount = 100 for (i <- 1 to iterationCount) { @@ -91,7 +89,8 @@ class BalancingSpec extends AkkaSpec(""" log.warning(lastSender.toString) expectNoMessage(1.second) - latch.open() + // Now unblock the other workers from also making progress + startOthers.success(()) val replies2 = receiveN(poolSize - 1) // the remaining replies come from the blocked replies2.toSet should be((2 to poolSize).toSet) @@ -103,24 +102,31 @@ class BalancingSpec extends AkkaSpec(""" "deliver messages in a balancing fashion when defined programatically" in { val latch = TestLatch(poolSize) + val startOthers = Promise[Unit]() val pool = system.actorOf( - BalancingPool(poolSize).props(routeeProps = Props(classOf[Worker], latch)), + BalancingPool(poolSize).props(routeeProps = Props(classOf[Worker], latch, startOthers.future)), name = "balancingPool-1") - test(pool, latch) + test(pool, startOthers) } "deliver messages in a balancing fashion when defined in config" taggedAs GHExcludeTest in { val latch = TestLatch(poolSize) + val startOthers = Promise[Unit]() val pool = - system.actorOf(FromConfig().props(routeeProps = Props(classOf[Worker], latch)), name = "balancingPool-2") - test(pool, latch) + system.actorOf( + FromConfig().props(routeeProps = Props(classOf[Worker], latch, startOthers.future)), + name = "balancingPool-2") + test(pool, startOthers) } "deliver messages in a balancing fashion when overridden in config" taggedAs GHExcludeTest in { val latch = TestLatch(poolSize) + val startOthers = Promise[Unit]() val pool = - system.actorOf(BalancingPool(1).props(routeeProps = Props(classOf[Worker], latch)), name = "balancingPool-3") - test(pool, latch) + system.actorOf( + BalancingPool(1).props(routeeProps = Props(classOf[Worker], latch, startOthers.future)), + name = "balancingPool-3") + test(pool, startOthers) } "work with anonymous actor names" in {