diff --git a/akka-actor-tests/src/test/scala/akka/routing/MetricsBasedResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/MetricsBasedResizerSpec.scala index b89d41a791..d8cd86fe33 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/MetricsBasedResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/MetricsBasedResizerSpec.scala @@ -20,11 +20,18 @@ import akka.pattern.ask object MetricsBasedResizerSpec { + case class Latches(first: TestLatch, second: TestLatch) + + /** + * The point of these Actors is that their mailbox size will be queried + * by the resizer. + */ class TestLatchingActor(implicit timeout: Timeout) extends Actor { def receive = { - case latch: TestLatch ⇒ - Try(Await.ready(latch, timeout.duration)) + case Latches(first, second) ⇒ + first.countDown() + Try(Await.ready(second, timeout.duration)) } } @@ -33,29 +40,26 @@ object MetricsBasedResizerSpec { def routees(num: Int = 10)(implicit system: ActorSystem, timeout: Timeout) = (1 to num).map(_ ⇒ routee).toVector - case class TestRouter(routees: Vector[ActorRefRoutee], resizer: Resizer)(implicit system: ActorSystem, timeout: Timeout) { - - system.registerOnTermination(close()) + case class TestRouter(routees: Vector[ActorRefRoutee])(implicit system: ActorSystem, timeout: Timeout) { var msgs: Set[TestLatch] = Set() - def mockSend(l: TestLatch = TestLatch(), - routeeIdx: Int = Random.nextInt(routees.length), - wait: Boolean = true)(implicit sender: ActorRef): TestLatch = { + def mockSend(await: Boolean, + l: TestLatch = TestLatch(), + routeeIdx: Int = Random.nextInt(routees.length)): Latches = { val target = routees(routeeIdx) - target.send(l, sender) + val first = TestLatch() + val latches = Latches(first, l) + target.send(latches, Actor.noSender) msgs = msgs + l - if (wait) waitForMessageToArrive() - l + if (await) Await.ready(first, timeout.duration) + latches } - def waitForMessageToArrive(): Unit = Thread.sleep(1.milliseconds.dilated.toMillis) - def close(): Unit = msgs.foreach(_.open()) - def sendToAll()(implicit sender: ActorRef): Seq[TestLatch] = { - val sentMessages = (0 until routees.length).map(i ⇒ mockSend(routeeIdx = i, wait = false)) - waitForMessageToArrive() + def sendToAll(await: Boolean): Seq[Latches] = { + val sentMessages = (0 until routees.length).map(i ⇒ mockSend(await, routeeIdx = i)) sentMessages } @@ -105,18 +109,18 @@ class MetricsBasedResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultT "record last totalQueueLength correctly" in { val resizer = DefaultOptimalSizeExploringResizer() - val router = TestRouter(routees(2), resizer) + val router = TestRouter(routees(2)) resizer.reportMessageCount(router.routees, router.msgs.size) - resizer.record.totalQueueLength shouldBe 0 - router.mockSend() - router.mockSend() + router.sendToAll(await = true) + router.mockSend(await = false) // test one message in mailbox and one in each ActorCell resizer.reportMessageCount(router.routees, router.msgs.size) - resizer.record.totalQueueLength shouldBe 2 + resizer.record.totalQueueLength shouldBe 3 + router.close() } "start an underutilizationStreak when not fully utilized" in { @@ -132,11 +136,13 @@ class MetricsBasedResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultT resizer.record = ResizeRecord( underutilizationStreak = Some(UnderUtilizationStreak(start = LocalDateTime.now.minusHours(1), highestUtilization = 1))) - val router = TestRouter(routees(2), resizer) - router.sendToAll() + val router = TestRouter(routees(2)) + router.sendToAll(await = true) resizer.reportMessageCount(router.routees, router.msgs.size) resizer.record.underutilizationStreak shouldBe empty + + router.close() } "leave the underutilizationStreak start date unchanged when not fully utilized" in { @@ -154,12 +160,13 @@ class MetricsBasedResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultT resizer.record = ResizeRecord( underutilizationStreak = Some(UnderUtilizationStreak(start = LocalDateTime.now, highestUtilization = 2))) - val router = TestRouter(routees(2), resizer) - router.mockSend() + val router = TestRouter(routees(2)) + router.mockSend(await = true) resizer.reportMessageCount(router.routees, router.msgs.size) resizer.record.underutilizationStreak.get.highestUtilization shouldBe 2 + router.close() } "update the underutilizationStreak highestUtilization if current utilization is higher" in { @@ -167,24 +174,27 @@ class MetricsBasedResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultT resizer.record = ResizeRecord( underutilizationStreak = Some(UnderUtilizationStreak(start = LocalDateTime.now, highestUtilization = 1))) - val router = TestRouter(routees(3), resizer) - router.mockSend(routeeIdx = 0) - router.mockSend(routeeIdx = 1) + val router = TestRouter(routees(3)) + router.mockSend(await = true, routeeIdx = 0) + router.mockSend(await = true, routeeIdx = 1) resizer.reportMessageCount(router.routees, router.msgs.size) resizer.record.underutilizationStreak.get.highestUtilization shouldBe 2 + router.close() } "not record a performance log when it's not fully utilized in two consecutive checks" in { val resizer = DefaultOptimalSizeExploringResizer() - val router = TestRouter(routees(2), resizer) + val router = TestRouter(routees(2)) resizer.reportMessageCount(router.routees, router.msgs.size) - router.sendToAll() + router.sendToAll(await = true) resizer.reportMessageCount(router.routees, router.msgs.size) resizer.performanceLog shouldBe empty + + router.close() } "not record the performance log when no message is processed" in { @@ -194,44 +204,51 @@ class MetricsBasedResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultT messageCount = 2, checkTime = System.nanoTime()) - val router = TestRouter(routees(2), resizer) - - router.sendToAll() + val router = TestRouter(routees(2)) + router.sendToAll(await = true) resizer.reportMessageCount(router.routees, router.msgs.size) resizer.performanceLog shouldBe empty + + router.close() } "record the performance log with the correct pool size" in { val resizer = DefaultOptimalSizeExploringResizer() - val router = TestRouter(routees(2), resizer) - val msgs = router.sendToAll() + val router = TestRouter(routees(2)) + val msgs = router.sendToAll(await = true) resizer.reportMessageCount(router.routees, router.msgs.size) - msgs.head.open() + msgs.head.second.open() - router.sendToAll() + router.mockSend(await = true, routeeIdx = 0) + router.mockSend(await = false, routeeIdx = 1) resizer.reportMessageCount(router.routees, router.msgs.size) resizer.performanceLog.get(2) should not be empty + + router.close() } "record the performance log with the correct process speed" in { val resizer = DefaultOptimalSizeExploringResizer() - val router = TestRouter(routees(2), resizer) - val msgs = router.sendToAll() - router.sendToAll() //make sure the routees are still busy after the first batch of messages get processed. + val router = TestRouter(routees(2)) + val msgs1 = router.sendToAll(await = true) + val msgs2 = router.sendToAll(await = false) //make sure the routees are still busy after the first batch of messages get processed. val before = LocalDateTime.now resizer.reportMessageCount(router.routees, router.msgs.size) //updates the records - msgs.foreach(_.open()) //process two messages + msgs1.foreach(_.second.open()) //process two messages - Thread.sleep(1) // wait for routees to update their mail boxes + // wait for routees to update their mail boxes + msgs2.foreach(l ⇒ Await.ready(l.first, timeout.duration)) resizer.reportMessageCount(router.routees, router.msgs.size) val after = LocalDateTime.now resizer.performanceLog(2).toMillis shouldBe (java.time.Duration.between(before, after).toMillis / 2 +- 1) + + router.close() } "update the old performance log entry with updated speed " in { @@ -241,17 +258,17 @@ class MetricsBasedResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultT resizer.performanceLog = Map(2 → oldSpeed.milliseconds) - val router = TestRouter(routees(2), resizer) - val msgs = router.sendToAll() - - router.sendToAll() //make sure the routees are still busy after the first batch of messages get processed. + val router = TestRouter(routees(2)) + val msgs1 = router.sendToAll(await = true) + val msgs2 = router.sendToAll(await = false) //make sure the routees are still busy after the first batch of messages get processed. val before = LocalDateTime.now resizer.reportMessageCount(router.routees, router.msgs.size) //updates the records - msgs.foreach(_.open()) //process two messages + msgs1.foreach(_.second.open()) //process two messages - Thread.sleep(1) // wait for routees to update their mail boxes + // wait for routees to update their mail boxes + msgs2.foreach(l ⇒ Await.ready(l.first, timeout.duration)) resizer.reportMessageCount(router.routees, router.msgs.size) @@ -259,6 +276,8 @@ class MetricsBasedResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultT val newSpeed = java.time.Duration.between(before, after).toMillis / 2 resizer.performanceLog(2).toMillis shouldBe ((newSpeed + oldSpeed) / 2 +- 1) + + router.close() } } @@ -333,10 +352,11 @@ class MetricsBasedResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultT val resizer = DefaultOptimalSizeExploringResizer(lowerBound = 2) val router = system.actorOf(RoundRobinPool(nrOfInstances = 0, resizer = Some(resizer)).props(Props(new TestLatchingActor))) - Thread.sleep(10) + val latches = Latches(TestLatch(), TestLatch(0)) + router ! latches + Await.ready(latches.first, timeout.duration) poolSize(router) shouldBe resizer.lowerBound - } }