From bb40c1ae307bdb5147b4de03efb7f239f8d2d424 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 13 Feb 2012 14:43:20 +0100 Subject: [PATCH] tweak ResizerSpec to work better with async Resize(), see #1814 - previously relied on resize() being invoked before enqueueing to the mailbox, which is not at all guaranteed any longer. --- .../test/scala/akka/routing/ResizerSpec.scala | 52 +++++++++---------- .../src/main/scala/akka/routing/Routing.scala | 3 +- 2 files changed, 26 insertions(+), 29 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index 26b5021c18..b9765c8e92 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -11,6 +11,7 @@ import akka.util.duration._ import akka.actor.ActorRef import java.util.concurrent.atomic.AtomicInteger import akka.pattern.ask +import akka.util.Duration object ResizerSpec { @@ -161,53 +162,48 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with // as influenced by the backlog of blocking pooled actors val resizer = DefaultResizer( - lowerBound = 2, - upperBound = 4, + lowerBound = 3, + upperBound = 5, rampupRate = 0.1, + backoffRate = 0.0, pressureThreshold = 1, messagesPerResize = 1, backoffThreshold = 0.0) val router = system.actorOf(Props(new Actor { def receive = { - case (n: Int, latch: TestLatch, count: AtomicInteger) ⇒ - (n millis).dilated.sleep - count.incrementAndGet - latch.countDown() + case d: Duration ⇒ d.dilated.sleep; sender ! "done" + case "echo" ⇒ sender ! "reply" } }).withRouter(RoundRobinRouter(resizer = Some(resizer)))) // first message should create the minimum number of routees - router ! 1 + router ! "echo" + expectMsg("reply") - Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2) + def routees(r: ActorRef): Int = { + r ! CurrentRoutees + expectMsgType[RouterRoutees].routees.size + } - def loop(loops: Int, t: Int, latch: TestLatch, count: AtomicInteger) = { - (100 millis).dilated.sleep - for (m ← 0 until loops) { - router.!((t, latch, count)) - (100 millis).dilated.sleep - } + routees(router) must be(3) + + def loop(loops: Int, d: Duration) = { + for (m ← 0 until loops) router ! d + for (m ← 0 until loops) expectMsg(d * 2, "done") } // 2 more should go thru without triggering more - val count1 = new AtomicInteger - val latch1 = TestLatch(2) - loop(2, 200, latch1, count1) - Await.ready(latch1, TestLatch.DefaultTimeout) - count1.get must be(2) + loop(2, 200 millis) - Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2) + routees(router) must be(3) // a whole bunch should max it out - val count2 = new AtomicInteger - val latch2 = TestLatch(10) - loop(10, 500, latch2, count2) - Await.ready(latch2, TestLatch.DefaultTimeout) - count2.get must be(10) - - Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(4) + loop(4, 500 millis) + awaitCond(routees(router) == 4) + loop(10, 500 millis) + awaitCond(routees(router) == 5) } "backoff" in { @@ -240,7 +236,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with (300 millis).dilated.sleep // let it cool down - for (m ← 0 to 3) { + for (m ← 0 to 5) { router ! 1 (500 millis).dilated.sleep } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index da2c81d1a7..b050a21b53 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -1028,7 +1028,8 @@ case class DefaultResizer( */ def capacity(routees: IndexedSeq[ActorRef]): Int = { val currentSize = routees.size - val delta = filter(pressure(routees), currentSize) + val press = pressure(routees) + val delta = filter(press, currentSize) val proposed = currentSize + delta if (proposed < lowerBound) delta + (lowerBound - proposed)