From 90cba9ce0de54418152b45cbc0c2061f16b0231d Mon Sep 17 00:00:00 2001 From: Kailuo Wang Date: Fri, 21 Aug 2015 21:03:57 -0400 Subject: [PATCH] +act #18356 Metrics based resizer for router --- .../routing/MetricsBasedResizerSpec.scala | 345 ++++++++++++++++++ .../test/scala/akka/routing/ResizerSpec.scala | 49 ++- akka-actor/src/main/resources/reference.conf | 57 +++ .../main/scala/akka/routing/Broadcast.scala | 2 +- .../akka/routing/ConsistentHashing.scala | 2 +- .../routing/OptimalSizeExploringResizer.scala | 275 ++++++++++++++ .../src/main/scala/akka/routing/Random.scala | 2 +- .../src/main/scala/akka/routing/Resizer.scala | 39 +- .../main/scala/akka/routing/RoundRobin.scala | 2 +- .../scala/akka/routing/RoutedActorCell.scala | 1 - .../routing/ScatterGatherFirstCompleted.scala | 2 +- .../scala/akka/routing/SmallestMailbox.scala | 2 +- .../scala/akka/routing/TailChopping.scala | 2 +- .../akka/util/JavaDurationConverters.scala | 18 + .../SurviveNetworkInstabilitySpec.scala | 2 +- .../code/docs/jrouting/RouterDocTest.java | 8 +- akka-docs/rst/java/routing.rst | 46 ++- .../code/docs/routing/RouterDocSpec.scala | 18 + akka-docs/rst/scala/routing.rst | 49 ++- 19 files changed, 904 insertions(+), 17 deletions(-) create mode 100644 akka-actor-tests/src/test/scala/akka/routing/MetricsBasedResizerSpec.scala create mode 100644 akka-actor/src/main/scala/akka/routing/OptimalSizeExploringResizer.scala create mode 100644 akka-actor/src/main/scala/akka/util/JavaDurationConverters.scala diff --git a/akka-actor-tests/src/test/scala/akka/routing/MetricsBasedResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/MetricsBasedResizerSpec.scala new file mode 100644 index 0000000000..731a1116e9 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/routing/MetricsBasedResizerSpec.scala @@ -0,0 +1,345 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. 
+ */ +package akka.routing + +import java.time.LocalDateTime + +import akka.actor._ +import akka.testkit._ +import akka.testkit.TestEvent._ + +import OptimalSizeExploringResizer._ +import MetricsBasedResizerSpec._ +import akka.util.Timeout + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.{ Try, Random } +import akka.pattern.ask + +object MetricsBasedResizerSpec { + + class TestLatchingActor(implicit timeout: Timeout) extends Actor { + import context.system + + def receive = { + case latch: TestLatch ⇒ + Try(Await.ready(latch, timeout.duration)) + } + } + + def routee(implicit system: ActorSystem, timeout: Timeout): ActorRefRoutee = + ActorRefRoutee(system.actorOf(Props(new TestLatchingActor))) + + def routees(num: Int = 10)(implicit system: ActorSystem, timeout: Timeout) = (1 to num).map(_ ⇒ routee).toVector + + case class TestRouter(routees: Vector[ActorRefRoutee], resizer: Resizer)(implicit system: ActorSystem, timeout: Timeout) { + + system.registerOnTermination(close()) + + var msgs: Set[TestLatch] = Set() + + def mockSend(l: TestLatch = TestLatch(), + routeeIdx: Int = Random.nextInt(routees.length), + wait: Boolean = true)(implicit sender: ActorRef): TestLatch = { + val target = routees(routeeIdx) + target.send(l, sender) + msgs = msgs + l + if (wait) waitForMessageToArrive() + l + } + + def waitForMessageToArrive(): Unit = Thread.sleep(1.milliseconds.dilated.toMillis) + + def close(): Unit = msgs.foreach(_.open()) + + def sendToAll()(implicit sender: ActorRef): Seq[TestLatch] = { + val sentMessages = (0 until routees.length).map(i ⇒ mockSend(routeeIdx = i, wait = false)) + waitForMessageToArrive() + sentMessages + } + + } + +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class MetricsBasedResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with ImplicitSender { + + override def atStartup: Unit = { + // when shutting down some Resize messages might hang around + 
system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*Resize"))) + } + + "MetricsBasedResizer isTimeForResize" must { + + "be true with empty history" in { + val resizer = DefaultOptimalSizeExploringResizer() + resizer.record = ResizeRecord(checkTime = 0) + resizer.isTimeForResize(0) should ===(true) + } + + "be false if the last resize is too close within actionInterval enough history" in { + val resizer = DefaultOptimalSizeExploringResizer(actionInterval = 10.seconds) + resizer.record = ResizeRecord(checkTime = System.nanoTime() - 8.seconds.toNanos) + + resizer.isTimeForResize(100) should ===(false) + } + + "be true if the last resize is before actionInterval ago" in { + val resizer = DefaultOptimalSizeExploringResizer(actionInterval = 10.seconds) + resizer.record = ResizeRecord(checkTime = System.nanoTime() - 11.seconds.toNanos) + + resizer.isTimeForResize(100) should ===(true) + } + + } + + "MetricsBasedResizer reportMessageCount" must { + + "record last messageCounter correctly" in { + val resizer = DefaultOptimalSizeExploringResizer() + resizer.reportMessageCount(Vector(routee), 3) + resizer.record.messageCount shouldBe 3 + } + + "record last totalQueueLength correctly" in { + val resizer = DefaultOptimalSizeExploringResizer() + val router = TestRouter(routees(2), resizer) + + resizer.reportMessageCount(router.routees, router.msgs.size) + + resizer.record.totalQueueLength shouldBe 0 + + router.mockSend() + router.mockSend() + + resizer.reportMessageCount(router.routees, router.msgs.size) + resizer.record.totalQueueLength shouldBe 2 + + } + + "start an underutilizationStreak when not fully utilized" in { + val resizer = DefaultOptimalSizeExploringResizer() + resizer.reportMessageCount(routees(2), 0) + resizer.record.underutilizationStreak should not be empty + resizer.record.underutilizationStreak.get.start.isBefore(LocalDateTime.now.plusSeconds(1)) shouldBe true + 
resizer.record.underutilizationStreak.get.start.isAfter(LocalDateTime.now.minusSeconds(1)) shouldBe true + } + + "stop an underutilizationStreak when fully utilized" in { + val resizer = DefaultOptimalSizeExploringResizer() + resizer.record = ResizeRecord( + underutilizationStreak = Some(UnderUtilizationStreak(start = LocalDateTime.now.minusHours(1), highestUtilization = 1))) + + val router = TestRouter(routees(2), resizer) + router.sendToAll() + + resizer.reportMessageCount(router.routees, router.msgs.size) + resizer.record.underutilizationStreak shouldBe empty + } + + "leave the underutilizationStreak start date unchanged when not fully utilized" in { + val start: LocalDateTime = LocalDateTime.now.minusHours(1) + val resizer = DefaultOptimalSizeExploringResizer() + resizer.record = ResizeRecord( + underutilizationStreak = Some(UnderUtilizationStreak(start = start, highestUtilization = 1))) + + resizer.reportMessageCount(routees(2), 0) + resizer.record.underutilizationStreak.get.start shouldBe start + } + + "leave the underutilizationStreak highestUtilization unchanged if current utilization is lower" in { + val resizer = DefaultOptimalSizeExploringResizer() + resizer.record = ResizeRecord( + underutilizationStreak = Some(UnderUtilizationStreak(start = LocalDateTime.now, highestUtilization = 2))) + + val router = TestRouter(routees(2), resizer) + router.mockSend() + + resizer.reportMessageCount(router.routees, router.msgs.size) + resizer.record.underutilizationStreak.get.highestUtilization shouldBe 2 + + } + + "update the underutilizationStreak highestUtilization if current utilization is higher" in { + val resizer = DefaultOptimalSizeExploringResizer() + resizer.record = ResizeRecord( + underutilizationStreak = Some(UnderUtilizationStreak(start = LocalDateTime.now, highestUtilization = 1))) + + val router = TestRouter(routees(3), resizer) + router.mockSend(routeeIdx = 0) + router.mockSend(routeeIdx = 1) + + resizer.reportMessageCount(router.routees, 
router.msgs.size) + resizer.record.underutilizationStreak.get.highestUtilization shouldBe 2 + + } + + "not record a performance log when it's not fully utilized in two consecutive checks" in { + val resizer = DefaultOptimalSizeExploringResizer() + val router = TestRouter(routees(2), resizer) + resizer.reportMessageCount(router.routees, router.msgs.size) + + router.sendToAll() + resizer.reportMessageCount(router.routees, router.msgs.size) + + resizer.performanceLog shouldBe empty + } + + "not record the performance log when no message is processed" in { + val resizer = DefaultOptimalSizeExploringResizer() + resizer.record = ResizeRecord( + totalQueueLength = 2, + messageCount = 2, + checkTime = System.nanoTime()) + + val router = TestRouter(routees(2), resizer) + + router.sendToAll() + + resizer.reportMessageCount(router.routees, router.msgs.size) + + resizer.performanceLog shouldBe empty + } + + "record the performance log with the correct pool size" in { + val resizer = DefaultOptimalSizeExploringResizer() + val router = TestRouter(routees(2), resizer) + val msgs = router.sendToAll() + resizer.reportMessageCount(router.routees, router.msgs.size) + msgs.head.open() + + router.sendToAll() + resizer.reportMessageCount(router.routees, router.msgs.size) + resizer.performanceLog.get(2) should not be empty + } + + "record the performance log with the correct process speed" in { + val resizer = DefaultOptimalSizeExploringResizer() + val router = TestRouter(routees(2), resizer) + val msgs = router.sendToAll() + router.sendToAll() //make sure the routees are still busy after the first batch of messages get processed. 
+ + val before = LocalDateTime.now + resizer.reportMessageCount(router.routees, router.msgs.size) //updates the records + + msgs.foreach(_.open()) //process two messages + + Thread.sleep(1) // wait for routees to update their mail boxes + + resizer.reportMessageCount(router.routees, router.msgs.size) + + val after = LocalDateTime.now + resizer.performanceLog(2).toMillis shouldBe (java.time.Duration.between(before, after).toMillis / 2 +- 1) + } + + "update the old performance log entry with updated speed " in { + val oldSpeed = 50 + val resizer = DefaultOptimalSizeExploringResizer( + weightOfLatestMetric = 0.5) + + resizer.performanceLog = Map(2 → oldSpeed.milliseconds) + + val router = TestRouter(routees(2), resizer) + val msgs = router.sendToAll() + + router.sendToAll() //make sure the routees are still busy after the first batch of messages get processed. + + val before = LocalDateTime.now + resizer.reportMessageCount(router.routees, router.msgs.size) //updates the records + + msgs.foreach(_.open()) //process two messages + + Thread.sleep(1) // wait for routees to update their mail boxes + + resizer.reportMessageCount(router.routees, router.msgs.size) + + val after = LocalDateTime.now + val newSpeed = java.time.Duration.between(before, after).toMillis / 2 + + resizer.performanceLog(2).toMillis shouldBe ((newSpeed + oldSpeed) / 2 +- 1) + } + + } + + "MetricsBasedResizer resize" must { + "downsize to close to the highest retention when a streak of underutilization started downsizeAfterUnderutilizedFor" in { + val resizer = DefaultOptimalSizeExploringResizer( + downsizeAfterUnderutilizedFor = 72.hours, + downsizeRatio = 0.5) + + resizer.record = ResizeRecord(underutilizationStreak = Some( + UnderUtilizationStreak(start = LocalDateTime.now.minusHours(73), highestUtilization = 8))) + resizer.resize(routees(20)) should be(4 - 20) + } + + "does not downsize on empty history" in { + val resizer = DefaultOptimalSizeExploringResizer() + resizer.resize(routees()) should 
be(0) + } + + "always go to lowerBound if below it" in { + val resizer = DefaultOptimalSizeExploringResizer(lowerBound = 50, upperBound = 100) + resizer.resize(routees(20)) should be(30) + } + + "always go to uppperBound if above it" in { + val resizer = DefaultOptimalSizeExploringResizer(upperBound = 50) + resizer.resize(routees(80)) should be(-30) + } + + "explore when there is performance log but not go beyond exploreStepSize" in { + val resizer = DefaultOptimalSizeExploringResizer( + exploreStepSize = 0.3, + explorationProbability = 1) + resizer.performanceLog = Map(11 → 1.milli, 13 → 1.millis, 12 → 3.millis) + + val exploreSamples = (1 to 100).map(_ ⇒ resizer.resize(routees(10))) + exploreSamples.forall(change ⇒ Math.abs(change) >= 1 && Math.abs(change) <= (10 * 0.3)) should be(true) + + } + } + + "MetricsBasedResizer optimize" must { + "optimize towards the fastest pool size" in { + val resizer = DefaultOptimalSizeExploringResizer(explorationProbability = 0) + resizer.performanceLog = Map(7 → 5.millis, 10 → 3.millis, 11 → 2.millis, 12 → 4.millis) + resizer.resize(routees(10)) should be(1) + resizer.resize(routees(12)) should be(-1) + resizer.resize(routees(7)) should be(2) + } + + "ignore further away sample data when optmizing" in { + val resizer = DefaultOptimalSizeExploringResizer(explorationProbability = 0, numOfAdjacentSizesToConsiderDuringOptimization = 4) + resizer.performanceLog = Map( + 7 → 5.millis, + 8 → 2.millis, + 10 → 3.millis, + 11 → 4.millis, + 12 → 3.millis, + 13 → 1.millis) + + resizer.resize(routees(10)) should be(-1) + } + } + + "MetricsBasedResizer" must { + + def poolSize(router: ActorRef): Int = + Await.result(router ? 
GetRoutees, timeout.duration).asInstanceOf[Routees].routees.size + + "start with lowerbound pool size" in { + + val resizer = DefaultOptimalSizeExploringResizer(lowerBound = 2) + val router = system.actorOf(RoundRobinPool(nrOfInstances = 0, resizer = Some(resizer)).props(Props(new TestLatchingActor))) + Thread.sleep(10) + + poolSize(router) shouldBe resizer.lowerBound + + } + + } + +} diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index 7cd4d5f88a..c46b7ddb63 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -3,14 +3,14 @@ */ package akka.routing +import com.typesafe.config.{ Config, ConfigFactory } + import language.postfixOps -import akka.actor.Actor +import akka.actor.{ ActorSystem, Actor, Props, ActorRef } import akka.testkit._ import akka.testkit.TestEvent._ -import akka.actor.Props import scala.concurrent.Await import scala.concurrent.duration._ -import akka.actor.ActorRef import akka.pattern.ask import scala.util.Try @@ -50,6 +50,49 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with def routeeSize(router: ActorRef): Int = Await.result(router ? 
GetRoutees, timeout.duration).asInstanceOf[Routees].routees.size + "Resizer fromConfig" must { + def parseCfg(cfgString: String): Config = { + val referenceCfg = ConfigFactory.defaultReference(ActorSystem.findClassLoader()) + ConfigFactory.parseString(cfgString).withFallback(referenceCfg.getConfig("akka.actor.deployment.default")) + } + + "load DefaultResizer from config when resizer is enabled" in { + val cfg = parseCfg(""" + resizer { + enabled = on + } + """) + Resizer.fromConfig(cfg).get shouldBe a[DefaultResizer] + } + + "load MetricsBasedResizer from config when optimal-size-exploring-resizer is enabled" in { + val cfg = parseCfg(""" + optimal-size-exploring-resizer { + enabled = on + } + """) + Resizer.fromConfig(cfg).get shouldBe a[DefaultOptimalSizeExploringResizer] + } + + "throws exception when both resizer and optimal-size-exploring-resizer is enabled" in { + val cfg = parseCfg(""" + optimal-size-exploring-resizer { + enabled = on + } + resizer { + enabled = on + } + """) + intercept[ResizerInitializationException] { + Resizer.fromConfig(cfg) + } + } + + "return None if neither resizer is enabled which is default" in { + Resizer.fromConfig(parseCfg("")) shouldBe empty + } + } + "DefaultResizer" must { "use settings to evaluate capacity" in { diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index acf6fc71af..9154e6caf7 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -237,6 +237,63 @@ akka { # Use 1 to resize before each message. messages-per-resize = 10 } + + # Routers with dynamically resizable number of routees based on + # performance metrics. + # This feature is enabled by including (parts of) this section in + # the deployment, cannot be enabled together with default resizer. + optimal-size-exploring-resizer { + + enabled = off + + # The fewest number of routees the router should ever have. 
+ lower-bound = 1 + + # The maximum number of routees the router should ever have. + # Must be greater than or equal to lower-bound. + upper-bound = 10 + + # Probability of ramping down when all routees are busy + # during exploration. + chance-of-ramping-down-when-full = 0.2 + + # Interval between each resize attempt + action-interval = 5s + + # If the routees have not been fully utilized (i.e. all routees busy) + # for this long, the resizer will downsize the pool. + downsize-after-underutilized-for = 72h + + # During exploration, the ratio between the largest step size and + # current pool size. E.g. if the current pool size is 50, and the + # explore-step-size is 0.1, the maximum pool size change during + # exploration will be +- 5 + explore-step-size = 0.1 + + # Probability of doing an exploration vs. an optimization. + chance-of-exploration = 0.4 + + # When downsizing after a long streak of underutilization, the resizer + # will downsize the pool to the highest utilization multiplied by + # a downsize ratio. This downsize ratio determines the new pool size + # in comparison to the highest utilization. + # E.g. if the highest utilization is 10, and the downsize ratio + # is 0.8, the pool will be downsized to 8 + downsize-ratio = 0.8 + + # When optimizing, the resizer only considers the sizes adjacent to the + # current size. This number indicates how many adjacent sizes to consider. + optimization-range = 16 + + # The weight of the latest metric over old metrics when collecting + # performance metrics. + # E.g. if the last processing speed is 10 millis per message at pool + # size 5, and the new processing speed collected is 6 millis per + # message at pool size 5, then given a weight of 0.3, the metric + # representing pool size 5 will be 6 * 0.3 + 10 * 0.7, i.e. 8.8 millis. + # Obviously, this number should be between 0 and 1.
+ weight-of-latest-metric = 0.5 + } } /IO-DNS/inet-address { diff --git a/akka-actor/src/main/scala/akka/routing/Broadcast.scala b/akka-actor/src/main/scala/akka/routing/Broadcast.scala index 77fe2506b4..135ceaffee 100644 --- a/akka-actor/src/main/scala/akka/routing/Broadcast.scala +++ b/akka-actor/src/main/scala/akka/routing/Broadcast.scala @@ -67,7 +67,7 @@ final case class BroadcastPool( def this(config: Config) = this( nrOfInstances = config.getInt("nr-of-instances"), - resizer = DefaultResizer.fromConfig(config), + resizer = Resizer.fromConfig(config), usePoolDispatcher = config.hasPath("pool-dispatcher")) /** diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala index eb29ef72ee..b6f530c8dc 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala @@ -272,7 +272,7 @@ final case class ConsistentHashingPool( def this(config: Config) = this( nrOfInstances = config.getInt("nr-of-instances"), - resizer = DefaultResizer.fromConfig(config), + resizer = Resizer.fromConfig(config), usePoolDispatcher = config.hasPath("pool-dispatcher")) /** diff --git a/akka-actor/src/main/scala/akka/routing/OptimalSizeExploringResizer.scala b/akka-actor/src/main/scala/akka/routing/OptimalSizeExploringResizer.scala new file mode 100644 index 0000000000..075631b5fb --- /dev/null +++ b/akka-actor/src/main/scala/akka/routing/OptimalSizeExploringResizer.scala @@ -0,0 +1,275 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. 
+ */ +package akka.routing + +import java.time.LocalDateTime + +import scala.collection.immutable +import scala.concurrent.forkjoin.ThreadLocalRandom +import scala.concurrent.duration._ + +import com.typesafe.config.Config + +import akka.actor._ +import akka.util.JavaDurationConverters._ + +import OptimalSizeExploringResizer._ + +trait OptimalSizeExploringResizer extends Resizer { + /** + * Report the messageCount as well as current routees so that the + * it can collect metrics. + * Caution: this method is not thread safe. + * + * @param currentRoutees + * @param messageCounter + */ + def reportMessageCount(currentRoutees: immutable.IndexedSeq[Routee], messageCounter: Long): Unit +} + +case object OptimalSizeExploringResizer { + /** + * INTERNAL API + */ + private[routing]type PoolSize = Int + + /** + * INTERNAL API + */ + private[routing] case class UnderUtilizationStreak(start: LocalDateTime, highestUtilization: Int) + + /** + * INTERNAL API + */ + private[routing] case class ResizeRecord( + underutilizationStreak: Option[UnderUtilizationStreak] = None, + messageCount: Long = 0, + totalQueueLength: Int = 0, + checkTime: Long = 0) + + /** + * INTERNAL API + */ + private[routing]type PerformanceLog = Map[PoolSize, Duration] + + def apply(resizerCfg: Config): OptimalSizeExploringResizer = + DefaultOptimalSizeExploringResizer( + lowerBound = resizerCfg.getInt("lower-bound"), + upperBound = resizerCfg.getInt("upper-bound"), + chanceOfScalingDownWhenFull = resizerCfg.getDouble("chance-of-ramping-down-when-full"), + actionInterval = resizerCfg.getDuration("action-interval").asScala, + downsizeAfterUnderutilizedFor = resizerCfg.getDuration("downsize-after-underutilized-for").asScala, + numOfAdjacentSizesToConsiderDuringOptimization = resizerCfg.getInt("optimization-range"), + exploreStepSize = resizerCfg.getDouble("explore-step-size"), + explorationProbability = resizerCfg.getDouble("chance-of-exploration"), + weightOfLatestMetric = 
resizerCfg.getDouble("weight-of-latest-metric"), + downsizeRatio = resizerCfg.getDouble("downsize-ratio")) + +} + +/** + * This resizer resizes the pool to an optimal size that provides + * the most message throughput. + * + * This resizer works best when you expect the pool size to + * performance function to be a convex function. + * + * For example, when you have CPU-bound tasks, the optimal + * size is bound to the number of CPU cores. + * When your task is IO bound, the optimal size is bound to the + * optimal number of concurrent connections to that IO service - + * e.g. a 4 node elastic search cluster may handle 4-8 + * concurrent requests at optimal speed. + * + * It achieves this by keeping track of message throughput at + * each pool size and performing the following three + * resizing operations (one at a time) periodically: + * + * * Downsize if it hasn't seen all routees ever fully + * utilized for a period of time. + * * Explore to a random nearby pool size to try and + * collect throughput metrics. + * * Optimize to a nearby pool size with better (than any other + * nearby sizes) throughput metrics. + * + * When the pool is fully-utilized (i.e. all routees are busy), + * it randomly chooses between exploring and optimizing. + * When the pool has not been fully-utilized for a period of time, + * it will downsize the pool to the last seen max utilization + * multiplied by a configurable ratio. + * + * By constantly exploring and optimizing, the resizer will + * eventually walk to the optimal size and remain nearby. + * When the optimal size changes it will start walking towards + * the new one. + * + * It keeps a performance log so it's stateful as well as + * having a larger memory footprint than the default [[Resizer]]. + * The memory usage is O(n) where n is the number of sizes + * you allow, i.e. upperBound - lowerBound.
+ * + * For documentation about the the parameters, see the reference.conf - + * akka.actor.deployment.default.optimal-size-exploring-resizer + * + */ +@SerialVersionUID(1L) +case class DefaultOptimalSizeExploringResizer( + lowerBound: PoolSize = 1, + upperBound: PoolSize = 30, + chanceOfScalingDownWhenFull: Double = 0.2, + actionInterval: Duration = 5.seconds, + numOfAdjacentSizesToConsiderDuringOptimization: Int = 16, + exploreStepSize: Double = 0.1, + downsizeRatio: Double = 0.8, + downsizeAfterUnderutilizedFor: Duration = 72.hours, + explorationProbability: Double = 0.4, + weightOfLatestMetric: Double = 0.5) extends OptimalSizeExploringResizer { + /** + * Leave package accessible for testing purpose + */ + private[routing] var performanceLog: PerformanceLog = Map.empty + /** + * Leave package accessible for testing purpose + */ + private[routing] var record: ResizeRecord = ResizeRecord() + + /** + * Leave package accessible for testing purpose + */ + private[routing] var stopExploring = false + + private def random = ThreadLocalRandom.current() + + private def checkParamAsProbability(value: Double, paramName: String): Unit = + if (value < 0 || value > 1) throw new IllegalArgumentException(s"$paramName must be between 0 and 1 (inclusive), was: [%s]".format(value)) + + private def checkParamAsPositiveNum(value: Double, paramName: String): Unit = checkParamLowerBound(value, 0, paramName) + + private def checkParamLowerBound(value: Double, lowerBound: Double, paramName: String): Unit = + if (value < lowerBound) throw new IllegalArgumentException(s"$paramName must be >= $lowerBound, was: [%s]".format(value)) + + checkParamAsPositiveNum(lowerBound, "lowerBound") + checkParamAsPositiveNum(upperBound, "upperBound") + if (upperBound < lowerBound) throw new IllegalArgumentException("upperBound must be >= lowerBound, was: [%s] < [%s]".format(upperBound, lowerBound)) + + checkParamLowerBound(numOfAdjacentSizesToConsiderDuringOptimization, 2, 
"numOfAdjacentSizesToConsiderDuringOptimization") + checkParamAsProbability(chanceOfScalingDownWhenFull, "chanceOfScalingDownWhenFull") + checkParamAsPositiveNum(numOfAdjacentSizesToConsiderDuringOptimization, "numOfAdjacentSizesToConsiderDuringOptimization") + checkParamAsPositiveNum(exploreStepSize, "exploreStepSize") + checkParamAsPositiveNum(downsizeRatio, "downsizeRatio") + checkParamAsProbability(explorationProbability, "explorationProbability") + checkParamAsProbability(weightOfLatestMetric, "weightOfLatestMetric") + + private val actionInternalNanos = actionInterval.toNanos + + def isTimeForResize(messageCounter: Long): Boolean = { + System.nanoTime() > record.checkTime + actionInternalNanos + } + + def reportMessageCount(currentRoutees: immutable.IndexedSeq[Routee], messageCounter: Long): Unit = { + val (newPerfLog, newRecord) = updatedStats(currentRoutees, messageCounter) + + performanceLog = newPerfLog + record = newRecord + } + + private[routing] def updatedStats(currentRoutees: immutable.IndexedSeq[Routee], messageCounter: Long): (PerformanceLog, ResizeRecord) = { + val now = LocalDateTime.now + val currentSize = currentRoutees.length + + val messagesInRoutees = currentRoutees map { + case ActorRefRoutee(a: ActorRefWithCell) ⇒ + a.underlying match { + case cell: ActorCell ⇒ + cell.mailbox.numberOfMessages + (if (cell.currentMessage != null) 1 else 0) + case cell ⇒ cell.numberOfMessages + } + case x ⇒ 0 + } + + val totalQueueLength = messagesInRoutees.sum + val utilized = messagesInRoutees.count(_ > 0) + + val fullyUtilized = utilized == currentSize + + val newUnderutilizationStreak = if (fullyUtilized) + None + else + Some(UnderUtilizationStreak( + record.underutilizationStreak.fold(now)(_.start), + Math.max(record.underutilizationStreak.fold(0)(_.highestUtilization), utilized))) + + val newPerformanceLog: PerformanceLog = + if (fullyUtilized && record.underutilizationStreak.isEmpty && record.checkTime > 0) { + val totalMessageReceived = messageCounter 
- record.messageCount + val queueSizeChange = record.totalQueueLength - totalQueueLength + val totalProcessed = queueSizeChange + totalMessageReceived + if (totalProcessed > 0) { + val duration = Duration.fromNanos(System.nanoTime() - record.checkTime) + val last: Duration = duration / totalProcessed + //exponentially decrease the weight of old last metrics data + val toUpdate = performanceLog.get(currentSize).fold(last) { oldSpeed ⇒ + (oldSpeed * (1.0 - weightOfLatestMetric)) + (last * weightOfLatestMetric) + } + performanceLog + (currentSize → toUpdate) + } else performanceLog + } else performanceLog + + val newRecord = record.copy( + underutilizationStreak = newUnderutilizationStreak, + messageCount = messageCounter, + totalQueueLength = totalQueueLength, + checkTime = System.nanoTime()) + + (newPerformanceLog, newRecord) + + } + + def resize(currentRoutees: immutable.IndexedSeq[Routee]): Int = { + val currentSize = currentRoutees.length + val now = LocalDateTime.now + val proposedChange = + if (record.underutilizationStreak.fold(false)(_.start.isBefore(now.minus(downsizeAfterUnderutilizedFor.asJava)))) { + val downsizeTo = (record.underutilizationStreak.get.highestUtilization * downsizeRatio).toInt + Math.min(downsizeTo - currentSize, 0) + } else if (performanceLog.isEmpty || record.underutilizationStreak.isDefined) { + 0 + } else { + if (!stopExploring && random.nextDouble() < explorationProbability) + explore(currentSize) + else + optimize(currentSize) + } + Math.max(lowerBound, Math.min(proposedChange + currentSize, upperBound)) - currentSize + } + + private def optimize(currentSize: PoolSize): Int = { + + val adjacentDispatchWaits: Map[PoolSize, Duration] = { + def adjacency = (size: Int) ⇒ Math.abs(currentSize - size) + val sizes = performanceLog.keys.toSeq + val numOfSizesEachSide = numOfAdjacentSizesToConsiderDuringOptimization / 2 + val leftBoundary = sizes.filter(_ < 
currentSize).sortBy(adjacency).take(numOfSizesEachSide).lastOption.getOrElse(currentSize) + val rightBoundary = sizes.filter(_ >= currentSize).sortBy(adjacency).take(numOfSizesEachSide).lastOption.getOrElse(currentSize) + performanceLog.filter { case (size, _) ⇒ size >= leftBoundary && size <= rightBoundary } + } + + val optimalSize = adjacentDispatchWaits.minBy(_._2)._1 + val movement = (optimalSize - currentSize) / 2.0 + if (movement < 0) + Math.floor(movement).toInt + else + Math.ceil(movement).toInt + + } + + private def explore(currentSize: PoolSize): Int = { + val change = Math.max(1, random.nextInt(Math.ceil(currentSize * exploreStepSize).toInt)) + if (random.nextDouble() < chanceOfScalingDownWhenFull) + -change + else + change + } + +} diff --git a/akka-actor/src/main/scala/akka/routing/Random.scala b/akka-actor/src/main/scala/akka/routing/Random.scala index bf99c7261b..6123a41f61 100644 --- a/akka-actor/src/main/scala/akka/routing/Random.scala +++ b/akka-actor/src/main/scala/akka/routing/Random.scala @@ -68,7 +68,7 @@ final case class RandomPool( def this(config: Config) = this( nrOfInstances = config.getInt("nr-of-instances"), - resizer = DefaultResizer.fromConfig(config), + resizer = Resizer.fromConfig(config), usePoolDispatcher = config.hasPath("pool-dispatcher")) /** diff --git a/akka-actor/src/main/scala/akka/routing/Resizer.scala b/akka-actor/src/main/scala/akka/routing/Resizer.scala index 37b2f93127..72a18b9847 100644 --- a/akka-actor/src/main/scala/akka/routing/Resizer.scala +++ b/akka-actor/src/main/scala/akka/routing/Resizer.scala @@ -7,6 +7,9 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicLong +import akka.AkkaException +import akka.event.Logging.Error.NoCause + import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.duration.FiniteDuration @@ -54,8 +57,26 @@ trait Resizer { * This method is invoked only in the context of the 
Router actor. */ def resize(currentRoutees: immutable.IndexedSeq[Routee]): Int + } +object Resizer { + def fromConfig(parentConfig: Config): Option[Resizer] = { + val defaultResizerConfig = parentConfig.getConfig("resizer") + val metricsBasedResizerConfig = parentConfig.getConfig("optimal-size-exploring-resizer") + (defaultResizerConfig.getBoolean("enabled"), metricsBasedResizerConfig.getBoolean("enabled")) match { + case (true, false) ⇒ Some(DefaultResizer(defaultResizerConfig)) + case (false, true) ⇒ Some(OptimalSizeExploringResizer(metricsBasedResizerConfig)) + case (false, false) ⇒ None + case (true, true) ⇒ + throw new ResizerInitializationException(s"cannot enable both resizer and optimal-size-exploring-resizer", null) + } + } +} + +@SerialVersionUID(1L) +class ResizerInitializationException(message: String, cause: Throwable) extends AkkaException(message, cause) + case object DefaultResizer { /** @@ -273,11 +294,13 @@ private[akka] final class ResizablePoolCell( resizer.isTimeForResize(resizeCounter.getAndIncrement()) && resizeInProgress.compareAndSet(false, true)) { super.sendMessage(Envelope(ResizablePoolActor.Resize, self, system)) } + super.sendMessage(envelope) } private[akka] def resize(initial: Boolean): Unit = { if (resizeInProgress.get || initial) try { + tryReportMessageCount() val requestedCapacity = resizer.resize(router.routees) if (requestedCapacity > 0) { val newRoutees = Vector.fill(requestedCapacity)(pool.newRoutee(routeeProps, this)) @@ -290,6 +313,16 @@ private[akka] final class ResizablePoolCell( } finally resizeInProgress.set(false) } + /** + * This approach is chosen for binary compatibility + */ + private def tryReportMessageCount(): Unit = { + resizer match { + case r: OptimalSizeExploringResizer ⇒ r.reportMessageCount(router.routees, resizeCounter.get()) + case _ ⇒ //ignore + } + } + } /** @@ -309,10 +342,12 @@ private[akka] class ResizablePoolActor(supervisorStrategy: SupervisorStrategy) val resizerCell = context match { case x: 
ResizablePoolCell ⇒ x case _ ⇒ - throw ActorInitializationException("Router actor can only be used in RoutedActorRef, not in " + context.getClass) + throw ActorInitializationException("Resizable router actor can only be used when resizer is defined, not in " + context.getClass) } override def receive = ({ - case Resize ⇒ resizerCell.resize(initial = false) + case Resize ⇒ + resizerCell.resize(initial = false) }: Actor.Receive) orElse super.receive + } diff --git a/akka-actor/src/main/scala/akka/routing/RoundRobin.scala b/akka-actor/src/main/scala/akka/routing/RoundRobin.scala index 134e6319e2..399b35b5ac 100644 --- a/akka-actor/src/main/scala/akka/routing/RoundRobin.scala +++ b/akka-actor/src/main/scala/akka/routing/RoundRobin.scala @@ -73,7 +73,7 @@ final case class RoundRobinPool( def this(config: Config) = this(nrOfInstances = config.getInt("nr-of-instances"), - resizer = DefaultResizer.fromConfig(config), + resizer = Resizer.fromConfig(config), usePoolDispatcher = config.hasPath("pool-dispatcher")) /** diff --git a/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala b/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala index 427924d0a5..9c9f955717 100644 --- a/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala +++ b/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala @@ -147,7 +147,6 @@ private[akka] class RoutedActorCell( * INTERNAL API */ private[akka] class RouterActor extends Actor { - val cell = context match { case x: RoutedActorCell ⇒ x case _ ⇒ diff --git a/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala b/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala index 77391a4504..6823e990a6 100644 --- a/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala +++ b/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala @@ -110,7 +110,7 @@ final case class ScatterGatherFirstCompletedPool( this( nrOfInstances = config.getInt("nr-of-instances"), 
within = config.getMillisDuration("within"), - resizer = DefaultResizer.fromConfig(config), + resizer = Resizer.fromConfig(config), usePoolDispatcher = config.hasPath("pool-dispatcher")) /** diff --git a/akka-actor/src/main/scala/akka/routing/SmallestMailbox.scala b/akka-actor/src/main/scala/akka/routing/SmallestMailbox.scala index e212844fb0..0ad4849815 100644 --- a/akka-actor/src/main/scala/akka/routing/SmallestMailbox.scala +++ b/akka-actor/src/main/scala/akka/routing/SmallestMailbox.scala @@ -183,7 +183,7 @@ final case class SmallestMailboxPool( def this(config: Config) = this( nrOfInstances = config.getInt("nr-of-instances"), - resizer = DefaultResizer.fromConfig(config), + resizer = Resizer.fromConfig(config), usePoolDispatcher = config.hasPath("pool-dispatcher")) /** diff --git a/akka-actor/src/main/scala/akka/routing/TailChopping.scala b/akka-actor/src/main/scala/akka/routing/TailChopping.scala index 88ef3d0aa0..2cbbe7fa9e 100644 --- a/akka-actor/src/main/scala/akka/routing/TailChopping.scala +++ b/akka-actor/src/main/scala/akka/routing/TailChopping.scala @@ -154,7 +154,7 @@ final case class TailChoppingPool( nrOfInstances = config.getInt("nr-of-instances"), within = config.getMillisDuration("within"), interval = config.getMillisDuration("tail-chopping-router.interval"), - resizer = DefaultResizer.fromConfig(config), + resizer = Resizer.fromConfig(config), usePoolDispatcher = config.hasPath("pool-dispatcher")) /** diff --git a/akka-actor/src/main/scala/akka/util/JavaDurationConverters.scala b/akka-actor/src/main/scala/akka/util/JavaDurationConverters.scala new file mode 100644 index 0000000000..32b61e5fd9 --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/JavaDurationConverters.scala @@ -0,0 +1,18 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. 
+ */ +package akka.util +import java.time.{ Duration ⇒ JDuration } +import scala.concurrent.duration.Duration +/** + * INTERNAL API + */ +private[akka] object JavaDurationConverters { + final implicit class JavaDurationOps(val self: JDuration) extends AnyVal { + def asScala: Duration = Duration.fromNanos(self.toNanos) + } + + final implicit class ScalaDurationOps(val self: Duration) extends AnyVal { + def asJava: JDuration = JDuration.ofNanos(self.toNanos) + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala index 2988827714..c43138112b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala @@ -226,7 +226,7 @@ abstract class SurviveNetworkInstabilitySpec cluster.join(first) // let them join and stabilize heartbeating - Thread.sleep(5000) + Thread.sleep(5000.millis.dilated.toMillis) } enterBarrier("joined-5") diff --git a/akka-docs/rst/java/code/docs/jrouting/RouterDocTest.java b/akka-docs/rst/java/code/docs/jrouting/RouterDocTest.java index d08651d7e3..9f92400105 100644 --- a/akka-docs/rst/java/code/docs/jrouting/RouterDocTest.java +++ b/akka-docs/rst/java/code/docs/jrouting/RouterDocTest.java @@ -338,7 +338,13 @@ public class RouterDocTest { getContext().actorOf(new RoundRobinPool(5).withResizer(resizer).props( Props.create(Worker.class)), "router30"); //#resize-pool-2 - + + //#optimal-size-exploring-resize-pool + ActorRef router31 = + getContext().actorOf(FromConfig.getInstance().props( + Props.create(Worker.class)), "router31"); + //#optimal-size-exploring-resize-pool + public void onReceive(Object msg) {} } diff --git a/akka-docs/rst/java/routing.rst b/akka-docs/rst/java/routing.rst index 2a1ad6a71f..4cfd8e08da 100644 --- a/akka-docs/rst/java/routing.rst +++ b/akka-docs/rst/java/routing.rst @@ -594,7 
+594,16 @@ Dynamically Resizable Pool All pools can be used with a fixed number of routees or with a resize strategy to adjust the number of routees dynamically. -Pool with resizer defined in configuration: +There are two types of resizers: the default ``Resizer`` and the ``OptimalSizeExploringResizer``. + +Default Resizer +--------------- + +The default resizer ramps up and down pool size based on pressure, measured by the percentage of busy routees +in the pool. It ramps up pool size if the pressure is higher than a certain threshold and backs off if the +pressure is lower than a certain threshold. Both thresholds are configurable. + +Pool with default resizer defined in configuration: .. includecode:: ../scala/code/docs/routing/RouterDocSpec.scala#config-resize-pool @@ -610,6 +619,41 @@ Pool with resizer defined in code: *It is also worth pointing out that if you define the ``router`` in the configuration file then this value will be used instead of any programmatically sent parameters.* +Optimal Size Exploring Resizer +------------------------------ + +The ``OptimalSizeExploringResizer`` resizes the pool to an optimal size that provides the most message throughput. + +It achieves this by keeping track of message throughput at each pool size and performing one of the following +three resizing operations periodically: + +* Downsize if it hasn't seen all routees fully utilized for a period of time. +* Explore to a random nearby pool size to try and collect throughput metrics. +* Optimize to a nearby pool size with better (than any other nearby sizes) throughput metrics. + +When the pool is fully-utilized (i.e. all routees are busy), it randomly chooses between exploring and optimizing. +When the pool has not been fully-utilized for a period of time, it will downsize the pool to the last seen max +utilization multiplied by a configurable ratio. + +By constantly exploring and optimizing, the resizer will eventually walk to the optimal size and +remain nearby.
When the optimal size changes it will start walking towards the new one. +This resizer works best when you expect the pool size to performance function to be a convex function. +For example, when you have CPU-bound tasks, the optimal size is bound to the number of CPU cores. +When your task is IO bound, the optimal size is bound to the optimal number of concurrent connections to that IO service - +e.g. a 4 node elastic search cluster may handle 4-8 concurrent requests at optimal speed. + +It keeps a performance log so it's stateful as well as having a larger memory footprint than the default ``Resizer``. +The memory usage is O(n) where n is the number of sizes you allow, i.e. upperBound - lowerBound. + +Pool with ``OptimalSizeExploringResizer`` defined in configuration: + +.. includecode:: ../scala/code/docs/routing/RouterDocSpec.scala#config-optimal-size-exploring-resize-pool + +.. includecode:: code/docs/jrouting/RouterDocTest.java#optimal-size-exploring-resize-pool + +Several more configuration options are available and described in ``akka.actor.deployment.default.optimal-size-exploring-resizer`` +section of the reference :ref:`configuration`. + ..
note:: Resizing is triggered by sending messages to the actor pool, but it is not diff --git a/akka-docs/rst/scala/code/docs/routing/RouterDocSpec.scala b/akka-docs/rst/scala/code/docs/routing/RouterDocSpec.scala index df2b9baa3d..d0627a50af 100644 --- a/akka-docs/rst/scala/code/docs/routing/RouterDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/routing/RouterDocSpec.scala @@ -227,6 +227,19 @@ akka.actor.deployment { } #//#config-resize-pool +#//#config-optimal-size-exploring-resize-pool +akka.actor.deployment { + /parent/router31 { + router = round-robin-pool + optimal-size-exploring-resizer { + enabled = on + action-interval = 5s + downsize-after-underutilized-for = 72h + } + } +} +#//#config-optimal-size-exploring-resize-pool + #//#config-pool-dispatcher akka.actor.deployment { /poolWithDispatcher { @@ -462,6 +475,11 @@ router-dispatcher {} "router30") //#resize-pool-2 + //#optimal-size-exploring-resize-pool + val router31: ActorRef = + context.actorOf(FromConfig.props(Props[Worker]), "router31") + //#optimal-size-exploring-resize-pool + def receive = { case _ => } diff --git a/akka-docs/rst/scala/routing.rst b/akka-docs/rst/scala/routing.rst index e884de3bff..9e4b674248 100644 --- a/akka-docs/rst/scala/routing.rst +++ b/akka-docs/rst/scala/routing.rst @@ -593,7 +593,16 @@ Dynamically Resizable Pool Most pools can be used with a fixed number of routees or with a resize strategy to adjust the number of routees dynamically. -Pool with resizer defined in configuration: +There are two types of resizers: the default ``Resizer`` and the ``OptimalSizeExploringResizer``. + +Default Resizer +--------------- + +The default resizer ramps up and down pool size based on pressure, measured by the percentage of busy routees +in the pool. It ramps up pool size if the pressure is higher than a certain threshold and backs off if the +pressure is lower than certain threshold. Both thresholds are configurable. + +Pool with default resizer defined in configuration: .. 
includecode:: code/docs/routing/RouterDocSpec.scala#config-resize-pool @@ -609,6 +618,43 @@ Pool with resizer defined in code: *It is also worth pointing out that if you define the ``router`` in the configuration file then this value will be used instead of any programmatically sent parameters.* + +Optimal Size Exploring Resizer +------------------------------ + +The ``OptimalSizeExploringResizer`` resizes the pool to an optimal size that provides the most message throughput. + +This resizer works best when you expect the pool size to performance function to be a convex function. +For example, when you have CPU-bound tasks, the optimal size is bound to the number of CPU cores. +When your task is IO bound, the optimal size is bound to the optimal number of concurrent connections to that IO service - +e.g. a 4 node elastic search cluster may handle 4-8 concurrent requests at optimal speed. + +It achieves this by keeping track of message throughput at each pool size and performing the following +three resizing operations (one at a time) periodically: + +* Downsize if it hasn't seen all routees fully utilized for a period of time. +* Explore to a random nearby pool size to try and collect throughput metrics. +* Optimize to a nearby pool size with better (than any other nearby sizes) throughput metrics. + +When the pool is fully-utilized (i.e. all routees are busy), it randomly chooses between exploring and optimizing. +When the pool has not been fully-utilized for a period of time, it will downsize the pool to the last seen max +utilization multiplied by a configurable ratio. + +By constantly exploring and optimizing, the resizer will eventually walk to the optimal size and +remain nearby. When the optimal size changes it will start walking towards the new one. + +It keeps a performance log so it's stateful as well as having a larger memory footprint than the default ``Resizer``. +The memory usage is O(n) where n is the number of sizes you allow, i.e.
upperBound - lowerBound. + +Pool with ``OptimalSizeExploringResizer`` defined in configuration: + +.. includecode:: code/docs/routing/RouterDocSpec.scala#config-optimal-size-exploring-resize-pool + +.. includecode:: code/docs/routing/RouterDocSpec.scala#optimal-size-exploring-resize-pool + +Several more configuration options are available and described in ``akka.actor.deployment.default.optimal-size-exploring-resizer`` +section of the reference :ref:`configuration`. + .. note:: Resizing is triggered by sending messages to the actor pool, but it is not @@ -619,6 +665,7 @@ will be used instead of any programmatically sent parameters.* this, configure the pool to use a balancing dispatcher, see `Configuring Dispatchers`_ for more information. + .. _router-design-scala: How Routing is Designed within Akka