+act #18356 Metrics based resizer for router

This commit is contained in:
Kailuo Wang 2015-08-21 21:03:57 -04:00
parent fb80ac8acb
commit 90cba9ce0d
19 changed files with 904 additions and 17 deletions

View file

@ -0,0 +1,345 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
import java.time.LocalDateTime
import akka.actor._
import akka.testkit._
import akka.testkit.TestEvent._
import OptimalSizeExploringResizer._
import MetricsBasedResizerSpec._
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{ Try, Random }
import akka.pattern.ask
/**
 * Shared fixtures for the resizer spec: a routee actor that blocks on latches it
 * receives, and a fake router that sends such latches directly to routees.
 */
object MetricsBasedResizerSpec {

  /**
   * Actor that blocks on every `TestLatch` it receives until the latch is opened
   * or `timeout` elapses; this keeps the routee "busy" for as long as a test needs.
   */
  class TestLatchingActor(implicit timeout: Timeout) extends Actor {
    import context.system

    def receive = {
      case latch: TestLatch =>
        // Try swallows the timeout failure so an unopened latch does not crash the actor.
        Try(Await.ready(latch, timeout.duration))
    }
  }

  /** Creates a single routee backed by a fresh `TestLatchingActor`. */
  def routee(implicit system: ActorSystem, timeout: Timeout): ActorRefRoutee =
    ActorRefRoutee(system.actorOf(Props(new TestLatchingActor)))

  /** Creates `num` independent routees (10 by default). */
  def routees(num: Int = 10)(implicit system: ActorSystem, timeout: Timeout) =
    (1 to num).map(_ => routee).toVector

  /**
   * Minimal stand-in for a router: sends latches to chosen routees and remembers
   * every latch sent so they can all be released when the actor system terminates.
   */
  case class TestRouter(routees: Vector[ActorRefRoutee], resizer: Resizer)(implicit system: ActorSystem, timeout: Timeout) {

    // Release every blocked routee on shutdown.
    system.registerOnTermination(close())

    /** All latches sent so far; its size is used by the tests as the message counter. */
    var msgs: Set[TestLatch] = Set()

    /**
     * Sends latch `l` to the routee at `routeeIdx` (random by default) and records it.
     * When `wait` is true, pauses briefly so the message can reach the routee.
     */
    def mockSend(l: TestLatch = TestLatch(),
                 routeeIdx: Int = Random.nextInt(routees.length),
                 wait: Boolean = true)(implicit sender: ActorRef): TestLatch = {
      val target = routees(routeeIdx)
      target.send(l, sender)
      msgs = msgs + l
      if (wait) waitForMessageToArrive()
      l
    }

    /** Sleeps for one (dilated) millisecond to let sent messages arrive. */
    def waitForMessageToArrive(): Unit = Thread.sleep(1.milliseconds.dilated.toMillis)

    /** Opens every latch sent so far, unblocking all routees. */
    def close(): Unit = msgs.foreach(_.open())

    /** Sends one new latch to each routee, then waits once for delivery. */
    def sendToAll()(implicit sender: ActorRef): Seq[TestLatch] = {
      val sentMessages = (0 until routees.length).map(i => mockSend(routeeIdx = i, wait = false))
      waitForMessageToArrive()
      sentMessages
    }
  }
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricsBasedResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with ImplicitSender {
override def atStartup: Unit = {
// when shutting down some Resize messages might hang around
system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*Resize")))
}
"MetricsBasedResizer isTimeForResize" must {
"be true with empty history" in {
val resizer = DefaultOptimalSizeExploringResizer()
resizer.record = ResizeRecord(checkTime = 0)
resizer.isTimeForResize(0) should ===(true)
}
"be false if the last resize is too close within actionInterval enough history" in {
val resizer = DefaultOptimalSizeExploringResizer(actionInterval = 10.seconds)
resizer.record = ResizeRecord(checkTime = System.nanoTime() - 8.seconds.toNanos)
resizer.isTimeForResize(100) should ===(false)
}
"be true if the last resize is before actionInterval ago" in {
val resizer = DefaultOptimalSizeExploringResizer(actionInterval = 10.seconds)
resizer.record = ResizeRecord(checkTime = System.nanoTime() - 11.seconds.toNanos)
resizer.isTimeForResize(100) should ===(true)
}
}
"MetricsBasedResizer reportMessageCount" must {
"record last messageCounter correctly" in {
val resizer = DefaultOptimalSizeExploringResizer()
resizer.reportMessageCount(Vector(routee), 3)
resizer.record.messageCount shouldBe 3
}
"record last totalQueueLength correctly" in {
val resizer = DefaultOptimalSizeExploringResizer()
val router = TestRouter(routees(2), resizer)
resizer.reportMessageCount(router.routees, router.msgs.size)
resizer.record.totalQueueLength shouldBe 0
router.mockSend()
router.mockSend()
resizer.reportMessageCount(router.routees, router.msgs.size)
resizer.record.totalQueueLength shouldBe 2
}
"start an underutilizationStreak when not fully utilized" in {
val resizer = DefaultOptimalSizeExploringResizer()
resizer.reportMessageCount(routees(2), 0)
resizer.record.underutilizationStreak should not be empty
resizer.record.underutilizationStreak.get.start.isBefore(LocalDateTime.now.plusSeconds(1)) shouldBe true
resizer.record.underutilizationStreak.get.start.isAfter(LocalDateTime.now.minusSeconds(1)) shouldBe true
}
"stop an underutilizationStreak when fully utilized" in {
val resizer = DefaultOptimalSizeExploringResizer()
resizer.record = ResizeRecord(
underutilizationStreak = Some(UnderUtilizationStreak(start = LocalDateTime.now.minusHours(1), highestUtilization = 1)))
val router = TestRouter(routees(2), resizer)
router.sendToAll()
resizer.reportMessageCount(router.routees, router.msgs.size)
resizer.record.underutilizationStreak shouldBe empty
}
"leave the underutilizationStreak start date unchanged when not fully utilized" in {
val start: LocalDateTime = LocalDateTime.now.minusHours(1)
val resizer = DefaultOptimalSizeExploringResizer()
resizer.record = ResizeRecord(
underutilizationStreak = Some(UnderUtilizationStreak(start = start, highestUtilization = 1)))
resizer.reportMessageCount(routees(2), 0)
resizer.record.underutilizationStreak.get.start shouldBe start
}
"leave the underutilizationStreak highestUtilization unchanged if current utilization is lower" in {
val resizer = DefaultOptimalSizeExploringResizer()
resizer.record = ResizeRecord(
underutilizationStreak = Some(UnderUtilizationStreak(start = LocalDateTime.now, highestUtilization = 2)))
val router = TestRouter(routees(2), resizer)
router.mockSend()
resizer.reportMessageCount(router.routees, router.msgs.size)
resizer.record.underutilizationStreak.get.highestUtilization shouldBe 2
}
"update the underutilizationStreak highestUtilization if current utilization is higher" in {
val resizer = DefaultOptimalSizeExploringResizer()
resizer.record = ResizeRecord(
underutilizationStreak = Some(UnderUtilizationStreak(start = LocalDateTime.now, highestUtilization = 1)))
val router = TestRouter(routees(3), resizer)
router.mockSend(routeeIdx = 0)
router.mockSend(routeeIdx = 1)
resizer.reportMessageCount(router.routees, router.msgs.size)
resizer.record.underutilizationStreak.get.highestUtilization shouldBe 2
}
"not record a performance log when it's not fully utilized in two consecutive checks" in {
val resizer = DefaultOptimalSizeExploringResizer()
val router = TestRouter(routees(2), resizer)
resizer.reportMessageCount(router.routees, router.msgs.size)
router.sendToAll()
resizer.reportMessageCount(router.routees, router.msgs.size)
resizer.performanceLog shouldBe empty
}
"not record the performance log when no message is processed" in {
val resizer = DefaultOptimalSizeExploringResizer()
resizer.record = ResizeRecord(
totalQueueLength = 2,
messageCount = 2,
checkTime = System.nanoTime())
val router = TestRouter(routees(2), resizer)
router.sendToAll()
resizer.reportMessageCount(router.routees, router.msgs.size)
resizer.performanceLog shouldBe empty
}
"record the performance log with the correct pool size" in {
val resizer = DefaultOptimalSizeExploringResizer()
val router = TestRouter(routees(2), resizer)
val msgs = router.sendToAll()
resizer.reportMessageCount(router.routees, router.msgs.size)
msgs.head.open()
router.sendToAll()
resizer.reportMessageCount(router.routees, router.msgs.size)
resizer.performanceLog.get(2) should not be empty
}
// Verifies that the logged per-message duration for pool size 2 is roughly the elapsed
// wall-clock time between the two reports divided by the two messages processed in between.
"record the performance log with the correct process speed" in {
val resizer = DefaultOptimalSizeExploringResizer()
val router = TestRouter(routees(2), resizer)
val msgs = router.sendToAll()
router.sendToAll() //make sure the routees are still busy after the first batch of messages get processed.
val before = LocalDateTime.now
// first report establishes the baseline record (message count, queue length, check time)
resizer.reportMessageCount(router.routees, router.msgs.size) //updates the records
msgs.foreach(_.open()) //process two messages
Thread.sleep(1) // wait for routees to update their mail boxes
// second report sees two fewer queued messages and logs the speed for pool size 2
resizer.reportMessageCount(router.routees, router.msgs.size)
val after = LocalDateTime.now
// tolerance of 1 ms on the expected average time per message
resizer.performanceLog(2).toMillis shouldBe (java.time.Duration.between(before, after).toMillis / 2 +- 1)
}
// Verifies that an existing log entry is blended with the newly measured speed using
// weightOfLatestMetric (0.5 here, so the expected value is the plain average of old and new).
"update the old performance log entry with updated speed " in {
  val oldSpeed = 50
  val resizer = DefaultOptimalSizeExploringResizer(
    weightOfLatestMetric = 0.5)
  // seed the log with a known speed for pool size 2
  resizer.performanceLog = Map(2 -> oldSpeed.milliseconds)
  val router = TestRouter(routees(2), resizer)
  val msgs = router.sendToAll()
  router.sendToAll() //make sure the routees are still busy after the first batch of messages get processed.
  val before = LocalDateTime.now
  resizer.reportMessageCount(router.routees, router.msgs.size) //updates the records
  msgs.foreach(_.open()) //process two messages
  Thread.sleep(1) // wait for routees to update their mail boxes
  resizer.reportMessageCount(router.routees, router.msgs.size)
  val after = LocalDateTime.now
  // two messages were processed between the two reports
  val newSpeed = java.time.Duration.between(before, after).toMillis / 2
  resizer.performanceLog(2).toMillis shouldBe ((newSpeed + oldSpeed) / 2 +- 1)
}
}
"MetricsBasedResizer resize" must {
"downsize to close to the highest retention when a streak of underutilization started downsizeAfterUnderutilizedFor" in {
val resizer = DefaultOptimalSizeExploringResizer(
downsizeAfterUnderutilizedFor = 72.hours,
downsizeRatio = 0.5)
resizer.record = ResizeRecord(underutilizationStreak = Some(
UnderUtilizationStreak(start = LocalDateTime.now.minusHours(73), highestUtilization = 8)))
resizer.resize(routees(20)) should be(4 - 20)
}
"does not downsize on empty history" in {
val resizer = DefaultOptimalSizeExploringResizer()
resizer.resize(routees()) should be(0)
}
"always go to lowerBound if below it" in {
val resizer = DefaultOptimalSizeExploringResizer(lowerBound = 50, upperBound = 100)
resizer.resize(routees(20)) should be(30)
}
"always go to uppperBound if above it" in {
val resizer = DefaultOptimalSizeExploringResizer(upperBound = 50)
resizer.resize(routees(80)) should be(-30)
}
// With explorationProbability = 1 every resize is an exploration; each proposed change
// must be at least 1 routee and at most currentSize * exploreStepSize (10 * 0.3) in magnitude.
"explore when there is performance log but not go beyond exploreStepSize" in {
  val resizer = DefaultOptimalSizeExploringResizer(
    exploreStepSize = 0.3,
    explorationProbability = 1)
  resizer.performanceLog = Map(11 -> 1.milli, 13 -> 1.millis, 12 -> 3.millis)
  // resize only looks at the number of routees, so one pool of 10 is reused for all samples
  // instead of spawning 10 new actors per sample.
  val pool = routees(10)
  val exploreSamples = (1 to 100).map(_ => resizer.resize(pool))
  exploreSamples.forall(change => Math.abs(change) >= 1 && Math.abs(change) <= (10 * 0.3)) should be(true)
}
}
"MetricsBasedResizer optimize" must {
// With exploration disabled, resize must move half way (rounded away from zero)
// towards the pool size with the shortest logged per-message duration (11 here).
"optimize towards the fastest pool size" in {
  val resizer = DefaultOptimalSizeExploringResizer(explorationProbability = 0)
  resizer.performanceLog = Map(7 -> 5.millis, 10 -> 3.millis, 11 -> 2.millis, 12 -> 4.millis)
  resizer.resize(routees(10)) should be(1)
  resizer.resize(routees(12)) should be(-1)
  resizer.resize(routees(7)) should be(2)
}
// Only the nearest sizes on each side of the current size are considered (4 in total here),
// so the globally fastest entry at size 13 is out of range and size 8 wins instead.
"ignore further away sample data when optmizing" in {
  val resizer = DefaultOptimalSizeExploringResizer(explorationProbability = 0, numOfAdjacentSizesToConsiderDuringOptimization = 4)
  resizer.performanceLog = Map(
    7 -> 5.millis,
    8 -> 2.millis,
    10 -> 3.millis,
    11 -> 4.millis,
    12 -> 3.millis,
    13 -> 1.millis)
  resizer.resize(routees(10)) should be(-1)
}
}
"MetricsBasedResizer" must {
def poolSize(router: ActorRef): Int =
Await.result(router ? GetRoutees, timeout.duration).asInstanceOf[Routees].routees.size
"start with lowerbound pool size" in {
val resizer = DefaultOptimalSizeExploringResizer(lowerBound = 2)
val router = system.actorOf(RoundRobinPool(nrOfInstances = 0, resizer = Some(resizer)).props(Props(new TestLatchingActor)))
Thread.sleep(10)
poolSize(router) shouldBe resizer.lowerBound
}
}
}

View file

@ -3,14 +3,14 @@
*/
package akka.routing
import com.typesafe.config.{ Config, ConfigFactory }
import language.postfixOps
import akka.actor.Actor
import akka.actor.{ ActorSystem, Actor, Props, ActorRef }
import akka.testkit._
import akka.testkit.TestEvent._
import akka.actor.Props
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorRef
import akka.pattern.ask
import scala.util.Try
@ -50,6 +50,49 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
def routeeSize(router: ActorRef): Int =
Await.result(router ? GetRoutees, timeout.duration).asInstanceOf[Routees].routees.size
"Resizer fromConfig" must {
// Parses `cfgString` and falls back to the `akka.actor.deployment.default` section of the
// reference configuration, so each test only has to spell out the settings it overrides.
def parseCfg(cfgString: String): Config = {
val referenceCfg = ConfigFactory.defaultReference(ActorSystem.findClassLoader())
ConfigFactory.parseString(cfgString).withFallback(referenceCfg.getConfig("akka.actor.deployment.default"))
}
"load DefaultResizer from config when resizer is enabled" in {
val cfg = parseCfg("""
resizer {
enabled = on
}
""")
Resizer.fromConfig(cfg).get shouldBe a[DefaultResizer]
}
"load MetricsBasedResizer from config when optimal-size-exploring-resizer is enabled" in {
val cfg = parseCfg("""
optimal-size-exploring-resizer {
enabled = on
}
""")
Resizer.fromConfig(cfg).get shouldBe a[DefaultOptimalSizeExploringResizer]
}
"throws exception when both resizer and optimal-size-exploring-resizer is enabled" in {
val cfg = parseCfg("""
optimal-size-exploring-resizer {
enabled = on
}
resizer {
enabled = on
}
""")
intercept[ResizerInitializationException] {
Resizer.fromConfig(cfg)
}
}
"return None if neither resizer is enabled which is default" in {
Resizer.fromConfig(parseCfg("")) shouldBe empty
}
}
"DefaultResizer" must {
"use settings to evaluate capacity" in {

View file

@ -237,6 +237,63 @@ akka {
# Use 1 to resize before each message.
messages-per-resize = 10
}
# Routers with dynamically resizable number of routees based on
# performance metrics.
# This feature is enabled by including (parts of) this section in
# the deployment, cannot be enabled together with default resizer.
optimal-size-exploring-resizer {
enabled = off
# The fewest number of routees the router should ever have.
lower-bound = 1
# The most number of routees the router should ever have.
# Must be greater than or equal to lower-bound.
upper-bound = 10
# Probability of ramping down (instead of up) when all routees are busy
# during exploration.
chance-of-ramping-down-when-full = 0.2
# Interval between each resize attempt
action-interval = 5s
# If the routees have not been fully utilized (i.e. all routees busy)
# for such length, the resizer will downsize the pool.
downsize-after-underutilized-for = 72h
# During exploration, the ratio between the largest step size and
# current pool size. E.g. if the current pool size is 50, and the
# explore-step-size is 0.1, the maximum pool size change during
# exploration will be +- 5
explore-step-size = 0.1
# Probability of doing an exploration vs. an optimization.
chance-of-exploration = 0.4
# When downsizing after a long streak of underutilization, the resizer
# will downsize the pool to the highest utilization multiplied by
# a downsize ratio. This downsize ratio determines the new pool size
# in comparison to the highest utilization.
# E.g. if the highest utilization is 10, and the down size ratio
# is 0.8, the pool will be downsized to 8
downsize-ratio = 0.8
# When optimizing, the resizer only considers the sizes adjacent to the
# current size. This number indicates how many adjacent sizes to consider.
optimization-range = 16
# The weight of the latest metric over old metrics when collecting
# performance metrics.
# E.g. if the last processing speed is 10 millis per message at pool
# size 5, and if the new processing speed collected is 6 millis per
# message at pool size 5. Given a weight of 0.3, the metrics
# representing pool size 5 will be 6 * 0.3 + 10 * 0.7, i.e. 8.8 millis
# Obviously, this number should be between 0 and 1.
weight-of-latest-metric = 0.5
}
}
/IO-DNS/inet-address {

View file

@ -67,7 +67,7 @@ final case class BroadcastPool(
def this(config: Config) =
this(
nrOfInstances = config.getInt("nr-of-instances"),
resizer = DefaultResizer.fromConfig(config),
resizer = Resizer.fromConfig(config),
usePoolDispatcher = config.hasPath("pool-dispatcher"))
/**

View file

@ -272,7 +272,7 @@ final case class ConsistentHashingPool(
def this(config: Config) =
this(
nrOfInstances = config.getInt("nr-of-instances"),
resizer = DefaultResizer.fromConfig(config),
resizer = Resizer.fromConfig(config),
usePoolDispatcher = config.hasPath("pool-dispatcher"))
/**

View file

@ -0,0 +1,275 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
import java.time.LocalDateTime
import scala.collection.immutable
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.concurrent.duration._
import com.typesafe.config.Config
import akka.actor._
import akka.util.JavaDurationConverters._
import OptimalSizeExploringResizer._
trait OptimalSizeExploringResizer extends Resizer {
/**
* Report the message count as well as the current routees so that the
* resizer can collect metrics.
* Caution: this method is not thread safe.
*
* @param currentRoutees the routees currently in the pool
* @param messageCounter running count of messages seen by the router; the default
*                       implementation compares it with the previously reported value
*/
def reportMessageCount(currentRoutees: immutable.IndexedSeq[Routee], messageCounter: Long): Unit
}
case object OptimalSizeExploringResizer {

  /**
   * INTERNAL API
   *
   * Number of routees in a pool.
   */
  private[routing] type PoolSize = Int

  /**
   * INTERNAL API
   *
   * A period during which the pool was never fully utilized: when it started and the
   * highest number of simultaneously busy routees observed since then.
   */
  private[routing] case class UnderUtilizationStreak(start: LocalDateTime, highestUtilization: Int)

  /**
   * INTERNAL API
   *
   * Snapshot taken at the last metrics report: the ongoing underutilization streak (if any),
   * the message counter, the total number of queued messages and the `System.nanoTime`
   * reading at which the snapshot was taken (0 means no report has happened yet).
   */
  private[routing] case class ResizeRecord(
    underutilizationStreak: Option[UnderUtilizationStreak] = None,
    messageCount: Long = 0,
    totalQueueLength: Int = 0,
    checkTime: Long = 0)

  /**
   * INTERNAL API
   *
   * Average time spent per message, keyed by the pool size at which it was measured.
   */
  private[routing] type PerformanceLog = Map[PoolSize, Duration]

  /**
   * Builds the default resizer from an `optimal-size-exploring-resizer` configuration section.
   */
  def apply(resizerCfg: Config): OptimalSizeExploringResizer = {
    def int(path: String): Int = resizerCfg.getInt(path)
    def double(path: String): Double = resizerCfg.getDouble(path)
    def duration(path: String): Duration = resizerCfg.getDuration(path).asScala

    DefaultOptimalSizeExploringResizer(
      lowerBound = int("lower-bound"),
      upperBound = int("upper-bound"),
      chanceOfScalingDownWhenFull = double("chance-of-ramping-down-when-full"),
      actionInterval = duration("action-interval"),
      downsizeAfterUnderutilizedFor = duration("downsize-after-underutilized-for"),
      numOfAdjacentSizesToConsiderDuringOptimization = int("optimization-range"),
      exploreStepSize = double("explore-step-size"),
      explorationProbability = double("chance-of-exploration"),
      weightOfLatestMetric = double("weight-of-latest-metric"),
      downsizeRatio = double("downsize-ratio"))
  }
}
/**
* This resizer resizes the pool to an optimal size that provides
* the most message throughput.
*
* This resizer works best when you expect the pool size to
* performance function to be a convex function.
*
* For example, when you have CPU bound tasks, the optimal
* size is bound to the number of CPU cores.
* When your task is IO bound, the optimal size is bound to
* optimal number of concurrent connections to that IO service -
* e.g. a 4 node elastic search cluster may handle 4-8
* concurrent requests at optimal speed.
*
* It achieves this by keeping track of message throughput at
* each pool size and performing the following three
* resizing operations (one at a time) periodically:
*
* * Downsize if it hasn't seen all routees ever fully
* utilized for a period of time.
* * Explore to a random nearby pool size to try and
* collect throughput metrics.
* * Optimize to a nearby pool size with a better (than any other
* nearby sizes) throughput metrics.
*
* When the pool is fully-utilized (i.e. all routees are busy),
* it randomly chooses between exploring and optimizing.
* When the pool has not been fully-utilized for a period of time,
* it will downsize the pool to the last seen max utilization
* multiplied by a configurable ratio.
*
* By constantly exploring and optimizing, the resizer will
* eventually walk to the optimal size and remain nearby.
* When the optimal size changes it will start walking towards
* the new one.
*
* It keeps a performance log so it's stateful as well as
* having a larger memory footprint than the default [[Resizer]].
* The memory usage is O(n) where n is the number of sizes
* you allow, i.e. upperBound - lowerBound.
*
* For documentation about the parameters, see the reference.conf -
* akka.actor.deployment.default.optimal-size-exploring-resizer
*
*/
@SerialVersionUID(1L)
case class DefaultOptimalSizeExploringResizer(
lowerBound: PoolSize = 1,
upperBound: PoolSize = 30,
chanceOfScalingDownWhenFull: Double = 0.2,
actionInterval: Duration = 5.seconds,
numOfAdjacentSizesToConsiderDuringOptimization: Int = 16,
exploreStepSize: Double = 0.1,
downsizeRatio: Double = 0.8,
downsizeAfterUnderutilizedFor: Duration = 72.hours,
explorationProbability: Double = 0.4,
weightOfLatestMetric: Double = 0.5) extends OptimalSizeExploringResizer {
/**
* Leave package accessible for testing purpose
*/
private[routing] var performanceLog: PerformanceLog = Map.empty
/**
* Leave package accessible for testing purpose
*/
private[routing] var record: ResizeRecord = ResizeRecord()
/**
* Leave package accessible for testing purpose
*/
private[routing] var stopExploring = false
private def random = ThreadLocalRandom.current()
// --- constructor-time parameter validation -------------------------------------------------

/** Fails with IllegalArgumentException unless `value` lies in [0, 1]. */
private def checkParamAsProbability(value: Double, paramName: String): Unit =
  if (value < 0 || value > 1)
    throw new IllegalArgumentException(s"$paramName must be between 0 and 1 (inclusive), was: [$value]")

/** Fails with IllegalArgumentException if `value` is negative (zero is accepted). */
private def checkParamAsPositiveNum(value: Double, paramName: String): Unit =
  checkParamLowerBound(value, 0, paramName)

/** Fails with IllegalArgumentException if `value` is below `lowerBound`. */
private def checkParamLowerBound(value: Double, lowerBound: Double, paramName: String): Unit =
  if (value < lowerBound)
    throw new IllegalArgumentException(s"$paramName must be >= $lowerBound, was: [$value]")

checkParamAsPositiveNum(lowerBound, "lowerBound")
checkParamAsPositiveNum(upperBound, "upperBound")
if (upperBound < lowerBound)
  throw new IllegalArgumentException(s"upperBound must be >= lowerBound, was: [$upperBound] < [$lowerBound]")
// At least 2 so that one adjacent size on each side of the current size can be considered;
// this also covers the non-negative requirement, so no separate check is needed.
checkParamLowerBound(numOfAdjacentSizesToConsiderDuringOptimization, 2, "numOfAdjacentSizesToConsiderDuringOptimization")
checkParamAsProbability(chanceOfScalingDownWhenFull, "chanceOfScalingDownWhenFull")
checkParamAsPositiveNum(exploreStepSize, "exploreStepSize")
checkParamAsPositiveNum(downsizeRatio, "downsizeRatio")
checkParamAsProbability(explorationProbability, "explorationProbability")
checkParamAsProbability(weightOfLatestMetric, "weightOfLatestMetric")
// actionInterval in nanoseconds (name kept as-is so the serialized form does not change).
private val actionInternalNanos = actionInterval.toNanos

/**
 * Whether more than `actionInterval` has elapsed since the last metrics report.
 *
 * The elapsed time is computed by subtraction, which stays correct even when
 * `checkTime + actionInterval` would overflow a Long.
 *
 * @param messageCounter ignored by this implementation; resizing is purely time based
 */
def isTimeForResize(messageCounter: Long): Boolean =
  System.nanoTime() - record.checkTime > actionInternalNanos
/**
 * Recomputes the statistics for the given routees and message counter and stores
 * the resulting performance log and record on this resizer.
 */
def reportMessageCount(currentRoutees: immutable.IndexedSeq[Routee], messageCounter: Long): Unit =
  updatedStats(currentRoutees, messageCounter) match {
    case (freshLog, freshRecord) =>
      performanceLog = freshLog
      record = freshRecord
  }
/**
 * Computes, without mutating this resizer, the performance log and resize record that
 * result from observing `currentRoutees` at message count `messageCounter`.
 *
 * A performance-log entry for the current pool size is only produced when the pool is
 * fully utilized now, was not in an underutilization streak at the previous report,
 * a previous report exists, and at least one message was processed in between.
 *
 * @return the (possibly updated) performance log and the new record
 */
private[routing] def updatedStats(currentRoutees: immutable.IndexedSeq[Routee], messageCounter: Long): (PerformanceLog, ResizeRecord) = {
  val now = LocalDateTime.now
  val currentSize = currentRoutees.length

  // Messages held by each routee: mailbox content plus the one currently being processed.
  val messagesInRoutees = currentRoutees map {
    case ActorRefRoutee(a: ActorRefWithCell) =>
      a.underlying match {
        case cell: ActorCell =>
          cell.mailbox.numberOfMessages + (if (cell.currentMessage != null) 1 else 0)
        case cell => cell.numberOfMessages
      }
    case _ => 0 // routees whose mailbox cannot be inspected count as idle
  }

  val totalQueueLength = messagesInRoutees.sum
  val utilized = messagesInRoutees.count(_ > 0)
  val fullyUtilized = utilized == currentSize

  // A streak ends when fully utilized; otherwise keep its start and track the peak utilization.
  val newUnderutilizationStreak = if (fullyUtilized)
    None
  else
    Some(UnderUtilizationStreak(
      record.underutilizationStreak.fold(now)(_.start),
      Math.max(record.underutilizationStreak.fold(0)(_.highestUtilization), utilized)))

  val newPerformanceLog: PerformanceLog =
    if (fullyUtilized && record.underutilizationStreak.isEmpty && record.checkTime > 0) {
      val totalMessageReceived = messageCounter - record.messageCount
      val queueSizeChange = record.totalQueueLength - totalQueueLength
      // processed = received + how much the queues shrank (or minus how much they grew)
      val totalProcessed = queueSizeChange + totalMessageReceived
      if (totalProcessed > 0) {
        val duration = Duration.fromNanos(System.nanoTime() - record.checkTime)
        val last: Duration = duration / totalProcessed
        //exponentially decrease the weight of old last metrics data
        val toUpdate = performanceLog.get(currentSize).fold(last) { oldSpeed =>
          (oldSpeed * (1.0 - weightOfLatestMetric)) + (last * weightOfLatestMetric)
        }
        performanceLog + (currentSize -> toUpdate)
      } else performanceLog
    } else performanceLog

  val newRecord = record.copy(
    underutilizationStreak = newUnderutilizationStreak,
    messageCount = messageCounter,
    totalQueueLength = totalQueueLength,
    checkTime = System.nanoTime())

  (newPerformanceLog, newRecord)
}
/**
 * Proposes a change in pool size (positive grows, negative shrinks, 0 keeps the size).
 *
 *  - An underutilization streak older than `downsizeAfterUnderutilizedFor` downsizes to
 *    the streak's highest utilization times `downsizeRatio` (never upsizes).
 *  - A younger streak, or an empty performance log, proposes no change.
 *  - Otherwise it randomly explores or optimizes.
 *
 * The resulting size is always clamped to [lowerBound, upperBound].
 */
def resize(currentRoutees: immutable.IndexedSeq[Routee]): Int = {
  val currentSize = currentRoutees.length

  val proposedChange = record.underutilizationStreak match {
    case Some(streak) if streak.start.isBefore(LocalDateTime.now.minus(downsizeAfterUnderutilizedFor.asJava)) =>
      val downsizeTo = (streak.highestUtilization * downsizeRatio).toInt
      Math.min(downsizeTo - currentSize, 0)
    case Some(_) =>
      0
    case None if performanceLog.isEmpty =>
      0
    case None =>
      val shouldExplore = !stopExploring && random.nextDouble() < explorationProbability
      if (shouldExplore) explore(currentSize) else optimize(currentSize)
  }

  val targetSize = Math.max(lowerBound, Math.min(proposedChange + currentSize, upperBound))
  targetSize - currentSize
}
/**
 * Proposes a move half way (rounded away from zero) from `currentSize` towards the
 * logged pool size with the shortest per-message duration, looking only at the nearest
 * `numOfAdjacentSizesToConsiderDuringOptimization / 2` logged sizes on each side.
 *
 * Expects a non-empty performance log (`minBy` fails on an empty map).
 */
private def optimize(currentSize: PoolSize): Int = {
  val adjacentDispatchWaits: Map[PoolSize, Duration] = {
    def adjacency = (size: Int) => Math.abs(currentSize - size)
    val sizes = performanceLog.keys.toSeq
    val numOfSizesEachSide = numOfAdjacentSizesToConsiderDuringOptimization / 2
    // furthest logged size still in range on each side; falls back to currentSize when none exists
    val leftBoundary = sizes.filter(_ < currentSize).sortBy(adjacency).take(numOfSizesEachSide).lastOption.getOrElse(currentSize)
    val rightBoundary = sizes.filter(_ >= currentSize).sortBy(adjacency).take(numOfSizesEachSide).lastOption.getOrElse(currentSize)
    performanceLog.filter { case (size, _) => size >= leftBoundary && size <= rightBoundary }
  }

  val optimalSize = adjacentDispatchWaits.minBy(_._2)._1
  val movement = (optimalSize - currentSize) / 2.0
  // round away from zero so that a distance of 1 still results in a move
  if (movement < 0)
    Math.floor(movement).toInt
  else
    Math.ceil(movement).toInt
}
/**
 * Proposes a random step of at least one routee and less than
 * `ceil(currentSize * exploreStepSize)` (when that bound exceeds 1); the step is negative
 * with probability `chanceOfScalingDownWhenFull`, positive otherwise.
 */
private def explore(currentSize: PoolSize): Int = {
  val maxStep = Math.ceil(currentSize * exploreStepSize).toInt
  // nextInt requires a strictly positive bound; with an empty pool or a zero
  // exploreStepSize the bound is 0, so fall back to the minimum step of 1.
  val change = if (maxStep < 1) 1 else Math.max(1, random.nextInt(maxStep))
  if (random.nextDouble() < chanceOfScalingDownWhenFull)
    -change
  else
    change
}
}

View file

@ -68,7 +68,7 @@ final case class RandomPool(
def this(config: Config) =
this(
nrOfInstances = config.getInt("nr-of-instances"),
resizer = DefaultResizer.fromConfig(config),
resizer = Resizer.fromConfig(config),
usePoolDispatcher = config.hasPath("pool-dispatcher"))
/**

View file

@ -7,6 +7,9 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
import akka.AkkaException
import akka.event.Logging.Error.NoCause
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration
@ -54,8 +57,26 @@ trait Resizer {
* This method is invoked only in the context of the Router actor.
*/
def resize(currentRoutees: immutable.IndexedSeq[Routee]): Int
}
object Resizer {

  /**
   * Creates the resizer enabled in a router deployment configuration.
   *
   * @param parentConfig deployment section containing both the `resizer` and the
   *                     `optimal-size-exploring-resizer` sub-sections
   * @return the enabled resizer, or `None` when neither is enabled
   * @throws ResizerInitializationException when both resizers are enabled
   */
  def fromConfig(parentConfig: Config): Option[Resizer] = {
    val defaultResizerConfig = parentConfig.getConfig("resizer")
    val metricsBasedResizerConfig = parentConfig.getConfig("optimal-size-exploring-resizer")
    (defaultResizerConfig.getBoolean("enabled"), metricsBasedResizerConfig.getBoolean("enabled")) match {
      case (true, false)  => Some(DefaultResizer(defaultResizerConfig))
      case (false, true)  => Some(OptimalSizeExploringResizer(metricsBasedResizerConfig))
      case (false, false) => None
      case (true, true) =>
        throw new ResizerInitializationException("cannot enable both resizer and optimal-size-exploring-resizer", null)
    }
  }
}
/**
 * Signals an invalid resizer setup; thrown by `Resizer.fromConfig` when both the
 * default resizer and the optimal-size-exploring resizer are enabled.
 */
@SerialVersionUID(1L)
class ResizerInitializationException(message: String, cause: Throwable) extends AkkaException(message, cause)
case object DefaultResizer {
/**
@ -273,11 +294,13 @@ private[akka] final class ResizablePoolCell(
resizer.isTimeForResize(resizeCounter.getAndIncrement()) && resizeInProgress.compareAndSet(false, true)) {
super.sendMessage(Envelope(ResizablePoolActor.Resize, self, system))
}
super.sendMessage(envelope)
}
private[akka] def resize(initial: Boolean): Unit = {
if (resizeInProgress.get || initial) try {
tryReportMessageCount()
val requestedCapacity = resizer.resize(router.routees)
if (requestedCapacity > 0) {
val newRoutees = Vector.fill(requestedCapacity)(pool.newRoutee(routeeProps, this))
@ -290,6 +313,16 @@ private[akka] final class ResizablePoolCell(
} finally resizeInProgress.set(false)
}
/**
 * Feeds the current routees and message counter to the resizer if it is an
 * [[OptimalSizeExploringResizer]]; other resizers are left untouched.
 *
 * This approach (type match instead of a new method on Resizer) is chosen
 * for binary compatibility.
 */
private def tryReportMessageCount(): Unit = {
  resizer match {
    case r: OptimalSizeExploringResizer => r.reportMessageCount(router.routees, resizeCounter.get())
    case _                              => //ignore
  }
}
}
/**
@ -309,10 +342,12 @@ private[akka] class ResizablePoolActor(supervisorStrategy: SupervisorStrategy)
val resizerCell = context match {
case x: ResizablePoolCell x
case _
throw ActorInitializationException("Router actor can only be used in RoutedActorRef, not in " + context.getClass)
throw ActorInitializationException("Resizable router actor can only be used when resizer is defined, not in " + context.getClass)
}
override def receive = ({
case Resize resizerCell.resize(initial = false)
case Resize
resizerCell.resize(initial = false)
}: Actor.Receive) orElse super.receive
}

View file

@ -73,7 +73,7 @@ final case class RoundRobinPool(
def this(config: Config) =
this(nrOfInstances = config.getInt("nr-of-instances"),
resizer = DefaultResizer.fromConfig(config),
resizer = Resizer.fromConfig(config),
usePoolDispatcher = config.hasPath("pool-dispatcher"))
/**

View file

@ -147,7 +147,6 @@ private[akka] class RoutedActorCell(
* INTERNAL API
*/
private[akka] class RouterActor extends Actor {
val cell = context match {
case x: RoutedActorCell x
case _

View file

@ -110,7 +110,7 @@ final case class ScatterGatherFirstCompletedPool(
this(
nrOfInstances = config.getInt("nr-of-instances"),
within = config.getMillisDuration("within"),
resizer = DefaultResizer.fromConfig(config),
resizer = Resizer.fromConfig(config),
usePoolDispatcher = config.hasPath("pool-dispatcher"))
/**

View file

@ -183,7 +183,7 @@ final case class SmallestMailboxPool(
def this(config: Config) =
this(
nrOfInstances = config.getInt("nr-of-instances"),
resizer = DefaultResizer.fromConfig(config),
resizer = Resizer.fromConfig(config),
usePoolDispatcher = config.hasPath("pool-dispatcher"))
/**

View file

@ -154,7 +154,7 @@ final case class TailChoppingPool(
nrOfInstances = config.getInt("nr-of-instances"),
within = config.getMillisDuration("within"),
interval = config.getMillisDuration("tail-chopping-router.interval"),
resizer = DefaultResizer.fromConfig(config),
resizer = Resizer.fromConfig(config),
usePoolDispatcher = config.hasPath("pool-dispatcher"))
/**

View file

@ -0,0 +1,18 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util
import java.time.{ Duration JDuration }
import scala.concurrent.duration.Duration
/**
* INTERNAL API
*/
private[akka] object JavaDurationConverters {
/** Adds `asScala` to `java.time.Duration`, converting through its nanosecond count. */
final implicit class JavaDurationOps(val self: JDuration) extends AnyVal {
def asScala: Duration = Duration.fromNanos(self.toNanos)
}
/**
 * Adds `asJava` to `scala.concurrent.duration.Duration`, converting through its nanosecond count.
 * NOTE(review): presumably `toNanos` fails for non-finite durations (e.g. `Duration.Inf`) -
 * confirm callers only pass finite values.
 */
final implicit class ScalaDurationOps(val self: Duration) extends AnyVal {
def asJava: JDuration = JDuration.ofNanos(self.toNanos)
}
}

View file

@ -226,7 +226,7 @@ abstract class SurviveNetworkInstabilitySpec
cluster.join(first)
// let them join and stabilize heartbeating
Thread.sleep(5000)
Thread.sleep(5000.millis.dilated.toMillis)
}
enterBarrier("joined-5")

View file

@ -338,7 +338,13 @@ public class RouterDocTest {
getContext().actorOf(new RoundRobinPool(5).withResizer(resizer).props(
Props.create(Worker.class)), "router30");
//#resize-pool-2
//#optimal-size-exploring-resize-pool
ActorRef router31 =
getContext().actorOf(FromConfig.getInstance().props(
Props.create(Worker.class)), "router31");
//#optimal-size-exploring-resize-pool
public void onReceive(Object msg) {}
}

View file

@ -594,7 +594,16 @@ Dynamically Resizable Pool
All pools can be used with a fixed number of routees or with a resize strategy to adjust the number
of routees dynamically.
Pool with resizer defined in configuration:
There are two types of resizers: the default ``Resizer`` and the ``OptimalSizeExploringResizer``.
Default Resizer
---------------
The default resizer ramps up and down pool size based on pressure, measured by the percentage of busy routees
in the pool. It ramps up pool size if the pressure is higher than a certain threshold and backs off if the
pressure is lower than certain threshold. Both thresholds are configurable.
Pool with default resizer defined in configuration:
.. includecode:: ../scala/code/docs/routing/RouterDocSpec.scala#config-resize-pool
@ -610,6 +619,41 @@ Pool with resizer defined in code:
*It is also worth pointing out that if you define the ``router`` in the configuration file then this value
will be used instead of any programmatically sent parameters.*
Optimal Size Exploring Resizer
------------------------------
The ``OptimalSizeExploringResizer`` resizes the pool to an optimal size that provides the most message throughput.
It achieves this by keeping track of message throughput at each pool size and performing one of the following
three resizing operations periodically:
* Downsize if it hasn't seen all routees fully utilized for a period of time.
* Explore a random nearby pool size to collect throughput metrics.
* Optimize to a nearby pool size with better throughput metrics than any other nearby size.
When the pool is fully-utilized (i.e. all routees are busy), it randomly chooses between exploring and optimizing.
When the pool has not been fully-utilized for a period of time, it will downsize the pool to the last seen max
utilization multiplied by a configurable ratio.
By constantly exploring and optimizing, the resizer will eventually walk to the optimal size and
remain nearby. When the optimal size changes it will start walking towards the new one.
This resizer works best when you expect the pool-size-to-performance function to be a convex function.
For example, when you have CPU-bound tasks, the optimal size is bound to the number of CPU cores.
When your tasks are IO-bound, the optimal size is bound to the optimal number of concurrent connections to that IO service -
e.g. a 4-node Elasticsearch cluster may handle 4-8 concurrent requests at optimal speed.
It keeps a performance log so it's stateful as well as having a larger memory footprint than the default ``Resizer``.
The memory usage is O(n) where n is the number of sizes you allow, i.e. upperBound - lowerBound.
Pool with ``OptimalSizeExploringResizer`` defined in configuration:
.. includecode:: ../scala/code/docs/routing/RouterDocSpec.scala#config-optimal-size-exploring-resize-pool
.. includecode:: code/docs/routing/RouterDocTest.java#optimal-size-exploring-resize-pool
Several more configuration options are available and described in the ``akka.actor.deployment.default.optimal-size-exploring-resizer``
section of the reference :ref:`configuration`.
.. note::
Resizing is triggered by sending messages to the actor pool, but it is not

View file

@ -227,6 +227,19 @@ akka.actor.deployment {
}
#//#config-resize-pool
#//#config-optimal-size-exploring-resize-pool
akka.actor.deployment {
/parent/router31 {
router = round-robin-pool
optimal-size-exploring-resizer {
enabled = on
action-interval = 5s
downsize-after-underutilized-for = 72h
}
}
}
#//#config-optimal-size-exploring-resize-pool
#//#config-pool-dispatcher
akka.actor.deployment {
/poolWithDispatcher {
@ -462,6 +475,11 @@ router-dispatcher {}
"router30")
//#resize-pool-2
//#optimal-size-exploring-resize-pool
val router31: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router31")
//#optimal-size-exploring-resize-pool
// Catch-all handler: matches every message and does nothing with it.
def receive = {
case _ =>
}

View file

@ -593,7 +593,16 @@ Dynamically Resizable Pool
Most pools can be used with a fixed number of routees or with a resize strategy to adjust the number
of routees dynamically.
Pool with resizer defined in configuration:
There are two types of resizers: the default ``Resizer`` and the ``OptimalSizeExploringResizer``.
Default Resizer
---------------
The default resizer ramps the pool size up and down based on pressure, measured by the percentage of busy routees
in the pool. It ramps up the pool size if the pressure is higher than a certain threshold and backs off if the
pressure is lower than a certain threshold. Both thresholds are configurable.
Pool with default resizer defined in configuration:
.. includecode:: code/docs/routing/RouterDocSpec.scala#config-resize-pool
@ -609,6 +618,43 @@ Pool with resizer defined in code:
*It is also worth pointing out that if you define the ``router`` in the configuration file then this value
will be used instead of any programmatically sent parameters.*
Optimal Size Exploring Resizer
------------------------------
The ``OptimalSizeExploringResizer`` resizes the pool to an optimal size that provides the most message throughput.
This resizer works best when you expect the pool-size-to-performance function to be a convex function.
For example, when you have CPU-bound tasks, the optimal size is bound to the number of CPU cores.
When your tasks are IO-bound, the optimal size is bound to the optimal number of concurrent connections to that IO service -
e.g. a 4-node Elasticsearch cluster may handle 4-8 concurrent requests at optimal speed.
It achieves this by keeping track of message throughput at each pool size and performing the following
three resizing operations (one at a time) periodically:
* Downsize if it hasn't seen all routees fully utilized for a period of time.
* Explore a random nearby pool size to collect throughput metrics.
* Optimize to a nearby pool size with better throughput metrics than any other nearby size.
When the pool is fully-utilized (i.e. all routees are busy), it randomly chooses between exploring and optimizing.
When the pool has not been fully-utilized for a period of time, it will downsize the pool to the last seen max
utilization multiplied by a configurable ratio.
By constantly exploring and optimizing, the resizer will eventually walk to the optimal size and
remain nearby. When the optimal size changes it will start walking towards the new one.
It keeps a performance log so it's stateful as well as having a larger memory footprint than the default ``Resizer``.
The memory usage is O(n) where n is the number of sizes you allow, i.e. upperBound - lowerBound.
Pool with ``OptimalSizeExploringResizer`` defined in configuration:
.. includecode:: code/docs/routing/RouterDocSpec.scala#config-optimal-size-exploring-resize-pool
.. includecode:: code/docs/routing/RouterDocSpec.scala#optimal-size-exploring-resize-pool
Several more configuration options are available and described in the ``akka.actor.deployment.default.optimal-size-exploring-resizer``
section of the reference :ref:`configuration`.
.. note::
Resizing is triggered by sending messages to the actor pool, but it is not
@ -619,6 +665,7 @@ will be used instead of any programmatically sent parameters.*
this, configure the pool to use a balancing dispatcher, see `Configuring
Dispatchers`_ for more information.
.. _router-design-scala:
How Routing is Designed within Akka