diff --git a/akka-actor-tests/src/test/scala/akka/routing/BalancingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/BalancingSpec.scala index 4f772a74a6..33b6509f3b 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/BalancingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/BalancingSpec.scala @@ -6,8 +6,10 @@ package akka.routing import java.net.URLEncoder import java.util.concurrent.atomic.AtomicInteger + import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.duration._ + import akka.actor.{ Actor, ActorLogging, ActorRef, PoisonPill, Props } import akka.testkit.AkkaSpec import akka.testkit.GHExcludeTest @@ -15,6 +17,8 @@ import akka.testkit.ImplicitSender import akka.testkit.TestLatch import org.scalatest.BeforeAndAfterEach +import akka.testkit.TestProbe + object BalancingSpec { val counter = new AtomicInteger(1) @@ -83,28 +87,37 @@ class BalancingSpec extends AkkaSpec(""" counter.set(1) } - def test(pool: ActorRef, startOthers: Promise[Unit]): Unit = { - val iterationCount = 100 + def test(pool: ActorRef, startOthers: Promise[Unit], latch: TestLatch): Unit = { + val probe = TestProbe() + try { + val iterationCount = 100 - for (i <- 1 to iterationCount) { - pool ! i + for (i <- 1 to iterationCount) { + pool.tell(i, probe.ref) + } + + // all but one worker are blocked + val replies1 = probe.receiveN(iterationCount - poolSize + 1) + // all replies from the unblocked worker so far + replies1.toSet should be(Set(1)) + log.debug("worker one: [{}]", probe.lastSender) + probe.expectNoMessage(1.second) + + // Now unblock the other workers from also making progress + startOthers.success(()) + val replies2 = probe.receiveN(poolSize - 1) + // the remaining replies come from the blocked + replies2.toSet should be((2 to poolSize).toSet) + probe.expectNoMessage(500.millis) + } finally { + // careful cleanup since threads may be blocked + probe.watch(pool) + // make sure the latch and promise are not blocking actor threads + startOthers.trySuccess(()) + latch.open() + pool ! PoisonPill + probe.expectTerminated(pool) } - - // all but one worker are blocked - val replies1 = receiveN(iterationCount - poolSize + 1) - // all replies from the unblocked worker so far - replies1.toSet should be(Set(1)) - log.debug("worker one: [{}]", lastSender) - expectNoMessage(1.second) - - // Now unblock the other workers from also making progress - startOthers.success(()) - val replies2 = receiveN(poolSize - 1) - // the remaining replies come from the blocked - replies2.toSet should be((2 to poolSize).toSet) - expectNoMessage(500.millis) - - pool ! PoisonPill } "balancing pool" must { @@ -115,7 +128,7 @@ class BalancingSpec extends AkkaSpec(""" val pool = system.actorOf( BalancingPool(poolSize).props(routeeProps = Props(classOf[Worker], latch, startOthers.future)), name = "balancingPool-1") - test(pool, startOthers) + test(pool, startOthers, latch) } "deliver messages in a balancing fashion when defined in config" taggedAs GHExcludeTest in { @@ -125,7 +138,7 @@ class BalancingSpec extends AkkaSpec(""" system.actorOf( FromConfig().props(routeeProps = Props(classOf[Worker], latch, startOthers.future)), name = "balancingPool-2") - test(pool, startOthers) + test(pool, startOthers, latch) } "deliver messages in a balancing fashion when overridden in config" taggedAs GHExcludeTest in { @@ -135,7 +148,7 @@ class BalancingSpec extends AkkaSpec(""" system.actorOf( BalancingPool(1).props(routeeProps = Props(classOf[Worker], latch, startOthers.future)), name = "balancingPool-3") - test(pool, startOthers) + test(pool, startOthers, latch) } "work with anonymous actor names" in {