Harden BalancingSpec, #30686 (#30785)

* I think it was a thread starvation problem because next test step could
  start before the previous pool had been terminated.
* Many (at least 5) threads are blocked in this test and AkkaSpec defines a max of 8.
This commit is contained in:
Patrik Nordwall 2021-10-20 10:26:18 +02:00 committed by GitHub
parent 4ef9b31d8e
commit 96264446cd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -6,8 +6,10 @@ package akka.routing
import java.net.URLEncoder import java.net.URLEncoder
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor.{ Actor, ActorLogging, ActorRef, PoisonPill, Props } import akka.actor.{ Actor, ActorLogging, ActorRef, PoisonPill, Props }
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.testkit.GHExcludeTest import akka.testkit.GHExcludeTest
@ -15,6 +17,8 @@ import akka.testkit.ImplicitSender
import akka.testkit.TestLatch import akka.testkit.TestLatch
import org.scalatest.BeforeAndAfterEach import org.scalatest.BeforeAndAfterEach
import akka.testkit.TestProbe
object BalancingSpec { object BalancingSpec {
val counter = new AtomicInteger(1) val counter = new AtomicInteger(1)
@ -83,28 +87,37 @@ class BalancingSpec extends AkkaSpec("""
counter.set(1) counter.set(1)
} }
def test(pool: ActorRef, startOthers: Promise[Unit]): Unit = { def test(pool: ActorRef, startOthers: Promise[Unit], latch: TestLatch): Unit = {
val iterationCount = 100 val probe = TestProbe()
try {
val iterationCount = 100
for (i <- 1 to iterationCount) { for (i <- 1 to iterationCount) {
pool ! i pool.tell(i, probe.ref)
}
// all but one worker are blocked
val replies1 = probe.receiveN(iterationCount - poolSize + 1)
// all replies from the unblocked worker so far
replies1.toSet should be(Set(1))
log.debug("worker one: [{}]", probe.lastSender)
probe.expectNoMessage(1.second)
// Now unblock the other workers from also making progress
startOthers.success(())
val replies2 = probe.receiveN(poolSize - 1)
// the remaining replies come from the blocked
replies2.toSet should be((2 to poolSize).toSet)
probe.expectNoMessage(500.millis)
} finally {
// careful cleanup since threads may be blocked
probe.watch(pool)
// make sure the latch and promise are not blocking actor threads
startOthers.trySuccess(())
latch.open()
pool ! PoisonPill
probe.expectTerminated(pool)
} }
// all but one worker are blocked
val replies1 = receiveN(iterationCount - poolSize + 1)
// all replies from the unblocked worker so far
replies1.toSet should be(Set(1))
log.debug("worker one: [{}]", lastSender)
expectNoMessage(1.second)
// Now unblock the other workers from also making progress
startOthers.success(())
val replies2 = receiveN(poolSize - 1)
// the remaining replies come from the blocked
replies2.toSet should be((2 to poolSize).toSet)
expectNoMessage(500.millis)
pool ! PoisonPill
} }
"balancing pool" must { "balancing pool" must {
@ -115,7 +128,7 @@ class BalancingSpec extends AkkaSpec("""
val pool = system.actorOf( val pool = system.actorOf(
BalancingPool(poolSize).props(routeeProps = Props(classOf[Worker], latch, startOthers.future)), BalancingPool(poolSize).props(routeeProps = Props(classOf[Worker], latch, startOthers.future)),
name = "balancingPool-1") name = "balancingPool-1")
test(pool, startOthers) test(pool, startOthers, latch)
} }
"deliver messages in a balancing fashion when defined in config" taggedAs GHExcludeTest in { "deliver messages in a balancing fashion when defined in config" taggedAs GHExcludeTest in {
@ -125,7 +138,7 @@ class BalancingSpec extends AkkaSpec("""
system.actorOf( system.actorOf(
FromConfig().props(routeeProps = Props(classOf[Worker], latch, startOthers.future)), FromConfig().props(routeeProps = Props(classOf[Worker], latch, startOthers.future)),
name = "balancingPool-2") name = "balancingPool-2")
test(pool, startOthers) test(pool, startOthers, latch)
} }
"deliver messages in a balancing fashion when overridden in config" taggedAs GHExcludeTest in { "deliver messages in a balancing fashion when overridden in config" taggedAs GHExcludeTest in {
@ -135,7 +148,7 @@ class BalancingSpec extends AkkaSpec("""
system.actorOf( system.actorOf(
BalancingPool(1).props(routeeProps = Props(classOf[Worker], latch, startOthers.future)), BalancingPool(1).props(routeeProps = Props(classOf[Worker], latch, startOthers.future)),
name = "balancingPool-3") name = "balancingPool-3")
test(pool, startOthers) test(pool, startOthers, latch)
} }
"work with anonymous actor names" in { "work with anonymous actor names" in {