Merge pull request #19874 from akka/wip-19873-MetricsBasedResizerSpec-RK
make MetricsBasedResizerSpec deterministic, fixes #19873
This commit is contained in:
commit
19d8da60ee
1 changed files with 70 additions and 50 deletions
|
|
@ -20,11 +20,18 @@ import akka.pattern.ask
|
|||
|
||||
object MetricsBasedResizerSpec {
|
||||
|
||||
case class Latches(first: TestLatch, second: TestLatch)
|
||||
|
||||
/**
|
||||
* The point of these Actors is that their mailbox size will be queried
|
||||
* by the resizer.
|
||||
*/
|
||||
class TestLatchingActor(implicit timeout: Timeout) extends Actor {
|
||||
|
||||
def receive = {
|
||||
case latch: TestLatch ⇒
|
||||
Try(Await.ready(latch, timeout.duration))
|
||||
case Latches(first, second) ⇒
|
||||
first.countDown()
|
||||
Try(Await.ready(second, timeout.duration))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -33,29 +40,26 @@ object MetricsBasedResizerSpec {
|
|||
|
||||
def routees(num: Int = 10)(implicit system: ActorSystem, timeout: Timeout) = (1 to num).map(_ ⇒ routee).toVector
|
||||
|
||||
case class TestRouter(routees: Vector[ActorRefRoutee], resizer: Resizer)(implicit system: ActorSystem, timeout: Timeout) {
|
||||
|
||||
system.registerOnTermination(close())
|
||||
case class TestRouter(routees: Vector[ActorRefRoutee])(implicit system: ActorSystem, timeout: Timeout) {
|
||||
|
||||
var msgs: Set[TestLatch] = Set()
|
||||
|
||||
def mockSend(l: TestLatch = TestLatch(),
|
||||
routeeIdx: Int = Random.nextInt(routees.length),
|
||||
wait: Boolean = true)(implicit sender: ActorRef): TestLatch = {
|
||||
def mockSend(await: Boolean,
|
||||
l: TestLatch = TestLatch(),
|
||||
routeeIdx: Int = Random.nextInt(routees.length)): Latches = {
|
||||
val target = routees(routeeIdx)
|
||||
target.send(l, sender)
|
||||
val first = TestLatch()
|
||||
val latches = Latches(first, l)
|
||||
target.send(latches, Actor.noSender)
|
||||
msgs = msgs + l
|
||||
if (wait) waitForMessageToArrive()
|
||||
l
|
||||
if (await) Await.ready(first, timeout.duration)
|
||||
latches
|
||||
}
|
||||
|
||||
def waitForMessageToArrive(): Unit = Thread.sleep(1.milliseconds.dilated.toMillis)
|
||||
|
||||
def close(): Unit = msgs.foreach(_.open())
|
||||
|
||||
def sendToAll()(implicit sender: ActorRef): Seq[TestLatch] = {
|
||||
val sentMessages = (0 until routees.length).map(i ⇒ mockSend(routeeIdx = i, wait = false))
|
||||
waitForMessageToArrive()
|
||||
def sendToAll(await: Boolean): Seq[Latches] = {
|
||||
val sentMessages = (0 until routees.length).map(i ⇒ mockSend(await, routeeIdx = i))
|
||||
sentMessages
|
||||
}
|
||||
|
||||
|
|
@ -105,18 +109,18 @@ class MetricsBasedResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultT
|
|||
|
||||
"record last totalQueueLength correctly" in {
|
||||
val resizer = DefaultOptimalSizeExploringResizer()
|
||||
val router = TestRouter(routees(2), resizer)
|
||||
val router = TestRouter(routees(2))
|
||||
|
||||
resizer.reportMessageCount(router.routees, router.msgs.size)
|
||||
|
||||
resizer.record.totalQueueLength shouldBe 0
|
||||
|
||||
router.mockSend()
|
||||
router.mockSend()
|
||||
router.sendToAll(await = true)
|
||||
router.mockSend(await = false) // test one message in mailbox and one in each ActorCell
|
||||
|
||||
resizer.reportMessageCount(router.routees, router.msgs.size)
|
||||
resizer.record.totalQueueLength shouldBe 2
|
||||
resizer.record.totalQueueLength shouldBe 3
|
||||
|
||||
router.close()
|
||||
}
|
||||
|
||||
"start an underutilizationStreak when not fully utilized" in {
|
||||
|
|
@ -132,11 +136,13 @@ class MetricsBasedResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultT
|
|||
resizer.record = ResizeRecord(
|
||||
underutilizationStreak = Some(UnderUtilizationStreak(start = LocalDateTime.now.minusHours(1), highestUtilization = 1)))
|
||||
|
||||
val router = TestRouter(routees(2), resizer)
|
||||
router.sendToAll()
|
||||
val router = TestRouter(routees(2))
|
||||
router.sendToAll(await = true)
|
||||
|
||||
resizer.reportMessageCount(router.routees, router.msgs.size)
|
||||
resizer.record.underutilizationStreak shouldBe empty
|
||||
|
||||
router.close()
|
||||
}
|
||||
|
||||
"leave the underutilizationStreak start date unchanged when not fully utilized" in {
|
||||
|
|
@ -154,12 +160,13 @@ class MetricsBasedResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultT
|
|||
resizer.record = ResizeRecord(
|
||||
underutilizationStreak = Some(UnderUtilizationStreak(start = LocalDateTime.now, highestUtilization = 2)))
|
||||
|
||||
val router = TestRouter(routees(2), resizer)
|
||||
router.mockSend()
|
||||
val router = TestRouter(routees(2))
|
||||
router.mockSend(await = true)
|
||||
|
||||
resizer.reportMessageCount(router.routees, router.msgs.size)
|
||||
resizer.record.underutilizationStreak.get.highestUtilization shouldBe 2
|
||||
|
||||
router.close()
|
||||
}
|
||||
|
||||
"update the underutilizationStreak highestUtilization if current utilization is higher" in {
|
||||
|
|
@ -167,24 +174,27 @@ class MetricsBasedResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultT
|
|||
resizer.record = ResizeRecord(
|
||||
underutilizationStreak = Some(UnderUtilizationStreak(start = LocalDateTime.now, highestUtilization = 1)))
|
||||
|
||||
val router = TestRouter(routees(3), resizer)
|
||||
router.mockSend(routeeIdx = 0)
|
||||
router.mockSend(routeeIdx = 1)
|
||||
val router = TestRouter(routees(3))
|
||||
router.mockSend(await = true, routeeIdx = 0)
|
||||
router.mockSend(await = true, routeeIdx = 1)
|
||||
|
||||
resizer.reportMessageCount(router.routees, router.msgs.size)
|
||||
resizer.record.underutilizationStreak.get.highestUtilization shouldBe 2
|
||||
|
||||
router.close()
|
||||
}
|
||||
|
||||
"not record a performance log when it's not fully utilized in two consecutive checks" in {
|
||||
val resizer = DefaultOptimalSizeExploringResizer()
|
||||
val router = TestRouter(routees(2), resizer)
|
||||
val router = TestRouter(routees(2))
|
||||
resizer.reportMessageCount(router.routees, router.msgs.size)
|
||||
|
||||
router.sendToAll()
|
||||
router.sendToAll(await = true)
|
||||
resizer.reportMessageCount(router.routees, router.msgs.size)
|
||||
|
||||
resizer.performanceLog shouldBe empty
|
||||
|
||||
router.close()
|
||||
}
|
||||
|
||||
"not record the performance log when no message is processed" in {
|
||||
|
|
@ -194,44 +204,51 @@ class MetricsBasedResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultT
|
|||
messageCount = 2,
|
||||
checkTime = System.nanoTime())
|
||||
|
||||
val router = TestRouter(routees(2), resizer)
|
||||
|
||||
router.sendToAll()
|
||||
val router = TestRouter(routees(2))
|
||||
|
||||
router.sendToAll(await = true)
|
||||
resizer.reportMessageCount(router.routees, router.msgs.size)
|
||||
|
||||
resizer.performanceLog shouldBe empty
|
||||
|
||||
router.close()
|
||||
}
|
||||
|
||||
"record the performance log with the correct pool size" in {
|
||||
val resizer = DefaultOptimalSizeExploringResizer()
|
||||
val router = TestRouter(routees(2), resizer)
|
||||
val msgs = router.sendToAll()
|
||||
val router = TestRouter(routees(2))
|
||||
val msgs = router.sendToAll(await = true)
|
||||
resizer.reportMessageCount(router.routees, router.msgs.size)
|
||||
msgs.head.open()
|
||||
msgs.head.second.open()
|
||||
|
||||
router.sendToAll()
|
||||
router.mockSend(await = true, routeeIdx = 0)
|
||||
router.mockSend(await = false, routeeIdx = 1)
|
||||
resizer.reportMessageCount(router.routees, router.msgs.size)
|
||||
resizer.performanceLog.get(2) should not be empty
|
||||
|
||||
router.close()
|
||||
}
|
||||
|
||||
"record the performance log with the correct process speed" in {
|
||||
val resizer = DefaultOptimalSizeExploringResizer()
|
||||
val router = TestRouter(routees(2), resizer)
|
||||
val msgs = router.sendToAll()
|
||||
router.sendToAll() //make sure the routees are still busy after the first batch of messages get processed.
|
||||
val router = TestRouter(routees(2))
|
||||
val msgs1 = router.sendToAll(await = true)
|
||||
val msgs2 = router.sendToAll(await = false) //make sure the routees are still busy after the first batch of messages get processed.
|
||||
|
||||
val before = LocalDateTime.now
|
||||
resizer.reportMessageCount(router.routees, router.msgs.size) //updates the records
|
||||
|
||||
msgs.foreach(_.open()) //process two messages
|
||||
msgs1.foreach(_.second.open()) //process two messages
|
||||
|
||||
Thread.sleep(1) // wait for routees to update their mail boxes
|
||||
// wait for routees to update their mail boxes
|
||||
msgs2.foreach(l ⇒ Await.ready(l.first, timeout.duration))
|
||||
|
||||
resizer.reportMessageCount(router.routees, router.msgs.size)
|
||||
|
||||
val after = LocalDateTime.now
|
||||
resizer.performanceLog(2).toMillis shouldBe (java.time.Duration.between(before, after).toMillis / 2 +- 1)
|
||||
|
||||
router.close()
|
||||
}
|
||||
|
||||
"update the old performance log entry with updated speed " in {
|
||||
|
|
@ -241,17 +258,17 @@ class MetricsBasedResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultT
|
|||
|
||||
resizer.performanceLog = Map(2 → oldSpeed.milliseconds)
|
||||
|
||||
val router = TestRouter(routees(2), resizer)
|
||||
val msgs = router.sendToAll()
|
||||
|
||||
router.sendToAll() //make sure the routees are still busy after the first batch of messages get processed.
|
||||
val router = TestRouter(routees(2))
|
||||
val msgs1 = router.sendToAll(await = true)
|
||||
val msgs2 = router.sendToAll(await = false) //make sure the routees are still busy after the first batch of messages get processed.
|
||||
|
||||
val before = LocalDateTime.now
|
||||
resizer.reportMessageCount(router.routees, router.msgs.size) //updates the records
|
||||
|
||||
msgs.foreach(_.open()) //process two messages
|
||||
msgs1.foreach(_.second.open()) //process two messages
|
||||
|
||||
Thread.sleep(1) // wait for routees to update their mail boxes
|
||||
// wait for routees to update their mail boxes
|
||||
msgs2.foreach(l ⇒ Await.ready(l.first, timeout.duration))
|
||||
|
||||
resizer.reportMessageCount(router.routees, router.msgs.size)
|
||||
|
||||
|
|
@ -259,6 +276,8 @@ class MetricsBasedResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultT
|
|||
val newSpeed = java.time.Duration.between(before, after).toMillis / 2
|
||||
|
||||
resizer.performanceLog(2).toMillis shouldBe ((newSpeed + oldSpeed) / 2 +- 1)
|
||||
|
||||
router.close()
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -333,10 +352,11 @@ class MetricsBasedResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultT
|
|||
|
||||
val resizer = DefaultOptimalSizeExploringResizer(lowerBound = 2)
|
||||
val router = system.actorOf(RoundRobinPool(nrOfInstances = 0, resizer = Some(resizer)).props(Props(new TestLatchingActor)))
|
||||
Thread.sleep(10)
|
||||
val latches = Latches(TestLatch(), TestLatch(0))
|
||||
router ! latches
|
||||
Await.ready(latches.first, timeout.duration)
|
||||
|
||||
poolSize(router) shouldBe resizer.lowerBound
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue