tweak ResizerSpec to work better with async Resize(), see #1814
- previously relied on resize() being invoked before enqueueing to the mailbox, which is not at all guaranteed any longer.
This commit is contained in:
parent
251a7cc7e3
commit
bb40c1ae30
2 changed files with 26 additions and 29 deletions
|
|
@ -11,6 +11,7 @@ import akka.util.duration._
|
|||
import akka.actor.ActorRef
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import akka.pattern.ask
|
||||
import akka.util.Duration
|
||||
|
||||
object ResizerSpec {
|
||||
|
||||
|
|
@ -161,53 +162,48 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
|
|||
// as influenced by the backlog of blocking pooled actors
|
||||
|
||||
val resizer = DefaultResizer(
|
||||
lowerBound = 2,
|
||||
upperBound = 4,
|
||||
lowerBound = 3,
|
||||
upperBound = 5,
|
||||
rampupRate = 0.1,
|
||||
backoffRate = 0.0,
|
||||
pressureThreshold = 1,
|
||||
messagesPerResize = 1,
|
||||
backoffThreshold = 0.0)
|
||||
|
||||
val router = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case (n: Int, latch: TestLatch, count: AtomicInteger) ⇒
|
||||
(n millis).dilated.sleep
|
||||
count.incrementAndGet
|
||||
latch.countDown()
|
||||
case d: Duration ⇒ d.dilated.sleep; sender ! "done"
|
||||
case "echo" ⇒ sender ! "reply"
|
||||
}
|
||||
}).withRouter(RoundRobinRouter(resizer = Some(resizer))))
|
||||
|
||||
// first message should create the minimum number of routees
|
||||
router ! 1
|
||||
router ! "echo"
|
||||
expectMsg("reply")
|
||||
|
||||
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2)
|
||||
def routees(r: ActorRef): Int = {
|
||||
r ! CurrentRoutees
|
||||
expectMsgType[RouterRoutees].routees.size
|
||||
}
|
||||
|
||||
def loop(loops: Int, t: Int, latch: TestLatch, count: AtomicInteger) = {
|
||||
(100 millis).dilated.sleep
|
||||
for (m ← 0 until loops) {
|
||||
router.!((t, latch, count))
|
||||
(100 millis).dilated.sleep
|
||||
}
|
||||
routees(router) must be(3)
|
||||
|
||||
def loop(loops: Int, d: Duration) = {
|
||||
for (m ← 0 until loops) router ! d
|
||||
for (m ← 0 until loops) expectMsg(d * 2, "done")
|
||||
}
|
||||
|
||||
// 2 more should go thru without triggering more
|
||||
val count1 = new AtomicInteger
|
||||
val latch1 = TestLatch(2)
|
||||
loop(2, 200, latch1, count1)
|
||||
Await.ready(latch1, TestLatch.DefaultTimeout)
|
||||
count1.get must be(2)
|
||||
loop(2, 200 millis)
|
||||
|
||||
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2)
|
||||
routees(router) must be(3)
|
||||
|
||||
// a whole bunch should max it out
|
||||
val count2 = new AtomicInteger
|
||||
val latch2 = TestLatch(10)
|
||||
loop(10, 500, latch2, count2)
|
||||
Await.ready(latch2, TestLatch.DefaultTimeout)
|
||||
count2.get must be(10)
|
||||
|
||||
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(4)
|
||||
loop(4, 500 millis)
|
||||
awaitCond(routees(router) == 4)
|
||||
|
||||
loop(10, 500 millis)
|
||||
awaitCond(routees(router) == 5)
|
||||
}
|
||||
|
||||
"backoff" in {
|
||||
|
|
@ -240,7 +236,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
|
|||
(300 millis).dilated.sleep
|
||||
|
||||
// let it cool down
|
||||
for (m ← 0 to 3) {
|
||||
for (m ← 0 to 5) {
|
||||
router ! 1
|
||||
(500 millis).dilated.sleep
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1028,7 +1028,8 @@ case class DefaultResizer(
|
|||
*/
|
||||
def capacity(routees: IndexedSeq[ActorRef]): Int = {
|
||||
val currentSize = routees.size
|
||||
val delta = filter(pressure(routees), currentSize)
|
||||
val press = pressure(routees)
|
||||
val delta = filter(press, currentSize)
|
||||
val proposed = currentSize + delta
|
||||
|
||||
if (proposed < lowerBound) delta + (lowerBound - proposed)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue