Stabilize BalancingSpec (#30673)

* Stabilize BalancingSpec

It seems previously in some cases (despite the 'sleep'
in the first worker) it could happen that one worker
would never get a message. This more explicitly avoids
that scenario.

* Remove dead statement

* scalafmt

* Remove accidentally-committed printlns
This commit is contained in:
Arnout Engelen 2021-09-10 15:44:26 +02:00 committed by GitHub
parent 68a65813a2
commit 64cd855584
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -6,10 +6,8 @@ package akka.routing
import java.net.URLEncoder
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.duration._
import akka.actor.ActorRef
import akka.actor.Actor
import akka.actor.Props
@ -22,24 +20,28 @@ import org.scalatest.BeforeAndAfterEach
object BalancingSpec {
val counter = new AtomicInteger(1)
class Worker(latch: TestLatch) extends Actor {
class Worker(latch: TestLatch, startOthers: Future[Unit]) extends Actor {
lazy val id = counter.getAndIncrement()
override def preStart(): Unit = latch.countDown()
def receive = {
case msg: Int =>
if (id != 1)
Await.ready(latch, 1.minute)
else if (msg <= 10)
Thread.sleep(50) // dispatch to other routees
case _: Int =>
latch.countDown()
if (id == 1) {
// wait for all routees to receive a message before processing
Await.result(latch, 1.minute)
} else {
// wait for the first worker to process messages before also processing
Await.result(startOthers, 1.minute)
}
sender() ! id
}
}
class Parent extends Actor {
val pool =
context.actorOf(BalancingPool(2).props(routeeProps = Props(classOf[Worker], TestLatch(0)(context.system))))
context.actorOf(
BalancingPool(2).props(
routeeProps = Props(classOf[Worker], TestLatch(0)(context.system), Future.successful(()))))
def receive = {
case msg => pool.forward(msg)
@ -73,11 +75,7 @@ class BalancingSpec extends AkkaSpec("""
counter.set(1)
}
def test(pool: ActorRef, latch: TestLatch): Unit = {
// wait until all routees have started
Await.ready(latch, remainingOrDefault)
latch.reset()
def test(pool: ActorRef, startOthers: Promise[Unit]): Unit = {
val iterationCount = 100
for (i <- 1 to iterationCount) {
@ -91,7 +89,8 @@ class BalancingSpec extends AkkaSpec("""
log.warning(lastSender.toString)
expectNoMessage(1.second)
latch.open()
// Now unblock the other workers from also making progress
startOthers.success(())
val replies2 = receiveN(poolSize - 1)
// the remaining replies come from the blocked
replies2.toSet should be((2 to poolSize).toSet)
@ -103,24 +102,31 @@ class BalancingSpec extends AkkaSpec("""
"deliver messages in a balancing fashion when defined programatically" in {
val latch = TestLatch(poolSize)
val startOthers = Promise[Unit]()
val pool = system.actorOf(
BalancingPool(poolSize).props(routeeProps = Props(classOf[Worker], latch)),
BalancingPool(poolSize).props(routeeProps = Props(classOf[Worker], latch, startOthers.future)),
name = "balancingPool-1")
test(pool, latch)
test(pool, startOthers)
}
"deliver messages in a balancing fashion when defined in config" taggedAs GHExcludeTest in {
val latch = TestLatch(poolSize)
val startOthers = Promise[Unit]()
val pool =
system.actorOf(FromConfig().props(routeeProps = Props(classOf[Worker], latch)), name = "balancingPool-2")
test(pool, latch)
system.actorOf(
FromConfig().props(routeeProps = Props(classOf[Worker], latch, startOthers.future)),
name = "balancingPool-2")
test(pool, startOthers)
}
"deliver messages in a balancing fashion when overridden in config" taggedAs GHExcludeTest in {
val latch = TestLatch(poolSize)
val startOthers = Promise[Unit]()
val pool =
system.actorOf(BalancingPool(1).props(routeeProps = Props(classOf[Worker], latch)), name = "balancingPool-3")
test(pool, latch)
system.actorOf(
BalancingPool(1).props(routeeProps = Props(classOf[Worker], latch, startOthers.future)),
name = "balancingPool-3")
test(pool, startOthers)
}
"work with anonymous actor names" in {