Improvements and finalization of dynamically resizable routers, replaces ActorPool. See 1557

* resize on nth message instead of always each message
* improved pressure evaluation
* more tests
* documentation
* removed ActorPool
This commit is contained in:
Patrik Nordwall 2012-01-10 15:53:27 +01:00
parent 8b71bf5bea
commit 19845d93e8
21 changed files with 591 additions and 1226 deletions

View file

@ -38,9 +38,9 @@ object DeployerSpec {
router = scatter-gather router = scatter-gather
within = 2 seconds within = 2 seconds
} }
/service-pool { /service-resizer {
router = round-robin router = round-robin
pool { resizer {
lower-bound = 1 lower-bound = 1
upper-bound = 10 upper-bound = 10
} }
@ -128,9 +128,9 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
assertRouting(ScatterGatherFirstCompletedRouter(nrOfInstances = 1, within = 2 seconds), "/service-scatter-gather") assertRouting(ScatterGatherFirstCompletedRouter(nrOfInstances = 1, within = 2 seconds), "/service-scatter-gather")
} }
"be able to parse 'akka.actor.deployment._' with router pool" in { "be able to parse 'akka.actor.deployment._' with router resizer" in {
val pool = DefaultRouterPool() val resizer = DefaultResizer()
assertRouting(RoundRobinRouter(pool = Some(pool)), "/service-pool") assertRouting(RoundRobinRouter(resizer = Some(resizer)), "/service-resizer")
} }
def assertRouting(expected: RouterConfig, service: String) { def assertRouting(expected: RouterConfig, service: String) {
@ -139,7 +139,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
deployment.get.path must be(service) deployment.get.path must be(service)
deployment.get.recipe must be(None) deployment.get.recipe must be(None)
deployment.get.routing.getClass must be(expected.getClass) deployment.get.routing.getClass must be(expected.getClass)
deployment.get.routing.pool must be(expected.pool) deployment.get.routing.resizer must be(expected.resizer)
deployment.get.scope must be(LocalScope) deployment.get.scope must be(LocalScope)
} }

View file

@ -1,361 +0,0 @@
package akka.routing
import akka.actor._
import akka.testkit._
import akka.util.duration._
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
import akka.testkit.AkkaSpec
import akka.dispatch.{ Await, Promise, Future }
object ActorPoolSpec {
trait Foo {
def sq(x: Int, sleep: Long): Future[Int]
}
class FooImpl extends Foo {
import TypedActor.dispatcher
def sq(x: Int, sleep: Long): Future[Int] = {
if (sleep > 0) Thread.sleep(sleep)
Promise.successful(x * x)
}
}
val faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, 1000)
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TypedActorPoolSpec extends AkkaSpec with DefaultTimeout {
import ActorPoolSpec._
"Actor Pool (2)" must {
"support typed actors" in {
val ta = TypedActor(system)
val pool = ta.createProxy[Foo](new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
val typedActor = TypedActor(context)
def lowerBound = 1
def upperBound = 5
def pressureThreshold = 1
def partialFill = true
def selectionCount = 1
def rampupRate = 0.1
def backoffRate = 0.50
def backoffThreshold = 0.50
def instance(p: Props) = typedActor.getActorRefFor(typedActor.typedActorOf[Foo, FooImpl](props = p.withTimeout(10 seconds)))
def receive = _route
}, Props().withTimeout(10 seconds).withFaultHandler(faultHandler))
val results = for (i 1 to 100) yield (i, pool.sq(i, 0))
for ((i, r) results)
Await.result(r, timeout.duration) must equal(i * i)
ta.stop(pool)
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
import ActorPoolSpec._
"Actor Pool" must {
"have expected capacity" in {
val latch = TestLatch(2)
val count = new AtomicInteger(0)
val pool = system.actorOf(
Props(new Actor with DefaultActorPool with FixedCapacityStrategy with SmallestMailboxSelector {
def instance(p: Props) = system.actorOf(p.withCreator(new Actor {
def receive = {
case _
count.incrementAndGet
latch.countDown()
sender.tell("success")
}
}))
def limit = 2
def selectionCount = 1
def partialFill = true
def receive = _route
}).withFaultHandler(faultHandler))
val successes = TestLatch(2)
val successCounter = system.actorOf(Props(new Actor {
def receive = {
case "success" successes.countDown()
}
}))
implicit val replyTo = successCounter
pool ! "a"
pool ! "b"
Await.ready(latch, TestLatch.DefaultTimeout)
Await.ready(successes, TestLatch.DefaultTimeout)
count.get must be(2)
Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2)
system.stop(pool)
}
"pass ticket #705" in {
val pool = system.actorOf(
Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 20
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def receive = _route
def pressureThreshold = 1
def instance(p: Props) = system.actorOf(p.withCreator(new Actor {
def receive = {
case req: String {
(10 millis).dilated.sleep
sender.tell("Response")
}
}
}))
}).withFaultHandler(faultHandler))
try {
(for (count 1 to 500) yield pool.?("Test", 20 seconds)) foreach {
Await.result(_, 20 seconds) must be("Response")
}
} finally {
system.stop(pool)
}
}
"grow as needed under pressure" in {
// make sure the pool starts at the expected lower limit and grows to the upper as needed
// as influenced by the backlog of blocking pooled actors
var latch = TestLatch(3)
val count = new AtomicInteger(0)
val pool = system.actorOf(
Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveActorsPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
def instance(p: Props) = system.actorOf(p.withCreator(new Actor {
def receive = {
case n: Int
(n millis).dilated.sleep
count.incrementAndGet
latch.countDown()
}
}))
def lowerBound = 2
def upperBound = 4
def rampupRate = 0.1
def partialFill = true
def selectionCount = 1
def receive = _route
}).withFaultHandler(faultHandler))
// first message should create the minimum number of delgates
pool ! 1
Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2)
var loops = 0
def loop(t: Int) = {
latch = TestLatch(loops)
count.set(0)
for (m 0 until loops) {
pool ? t
(50 millis).dilated.sleep
}
}
// 2 more should go thru without triggering more
loops = 2
loop(500)
Await.ready(latch, TestLatch.DefaultTimeout)
count.get must be(loops)
Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2)
// a whole bunch should max it out
loops = 10
loop(500)
Await.ready(latch, TestLatch.DefaultTimeout)
count.get must be(loops)
Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(4)
system.stop(pool)
}
"grow as needed under mailbox pressure" in {
// make sure the pool starts at the expected lower limit and grows to the upper as needed
// as influenced by the backlog of messages in the delegate mailboxes
var latch = TestLatch(3)
val count = new AtomicInteger(0)
val pool = system.actorOf(
Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
def instance(p: Props) = system.actorOf(p.withCreator(new Actor {
def receive = {
case n: Int
(n millis).dilated.sleep
count.incrementAndGet
latch.countDown()
}
}))
def lowerBound = 2
def upperBound = 4
def pressureThreshold = 3
def rampupRate = 0.1
def partialFill = true
def selectionCount = 1
def receive = _route
}).withFaultHandler(faultHandler))
var loops = 0
def loop(t: Int) = {
latch = TestLatch(loops)
count.set(0)
for (m 0 until loops) {
pool ! t
}
}
// send a few messages and observe pool at its lower bound
loops = 3
loop(500)
Await.ready(latch, TestLatch.DefaultTimeout)
count.get must be(loops)
Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2)
// send a bunch over the threshold and observe an increment
loops = 15
loop(500)
Await.ready(latch, 10 seconds)
count.get must be(loops)
Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be >= (3)
system.stop(pool)
}
"round robin" in {
val latch1 = TestLatch(2)
val delegates = new java.util.concurrent.ConcurrentHashMap[String, String]
val pool1 = system.actorOf(
Props(new Actor with DefaultActorPool with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
def instance(p: Props): ActorRef = system.actorOf(p.withCreator(new Actor {
def receive = {
case _
delegates put (self.path.toString, "")
latch1.countDown()
}
}))
def limit = 1
def selectionCount = 1
def rampupRate = 0.1
def partialFill = true
def receive = _route
}).withFaultHandler(faultHandler))
pool1 ! "a"
pool1 ! "b"
Await.ready(latch1, TestLatch.DefaultTimeout)
delegates.size must be(1)
system.stop(pool1)
val latch2 = TestLatch(2)
delegates.clear()
val pool2 = system.actorOf(
Props(new Actor with DefaultActorPool with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
def instance(p: Props) = system.actorOf(p.withCreator(new Actor {
def receive = {
case _
delegates put (self.path.toString, "")
latch2.countDown()
}
}))
def limit = 2
def selectionCount = 1
def rampupRate = 0.1
def partialFill = false
def receive = _route
}).withFaultHandler(faultHandler))
pool2 ! "a"
pool2 ! "b"
Await.ready(latch2, TestLatch.DefaultTimeout)
delegates.size must be(2)
system.stop(pool2)
}
"backoff" in {
val latch = TestLatch(10)
val pool = system.actorOf(
Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
def instance(p: Props) = system.actorOf(p.withCreator(new Actor {
def receive = {
case n: Int
(n millis).dilated.sleep
latch.countDown()
}
}))
def lowerBound = 1
def upperBound = 5
def pressureThreshold = 1
def partialFill = true
def selectionCount = 1
def rampupRate = 0.1
def backoffRate = 0.50
def backoffThreshold = 0.50
def receive = _route
}).withFaultHandler(faultHandler))
// put some pressure on the pool
for (m 0 to 10) pool ! 250
(5 millis).dilated.sleep
val z = Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size
z must be >= (2)
// let it cool down
for (m 0 to 3) {
pool ! 1
(500 millis).dilated.sleep
}
Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be <= (z)
system.stop(pool)
}
}
}

View file

@ -0,0 +1,249 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
import akka.actor.Actor
import akka.testkit._
import akka.actor.Props
import akka.dispatch.Await
import akka.util.duration._
import akka.actor.ActorRef
import java.util.concurrent.atomic.AtomicInteger
object ResizerSpec {
val config = """
akka.actor.deployment {
/router1 {
router = round-robin
resizer {
lower-bound = 2
upper-bound = 3
}
}
}
"""
class TestActor extends Actor {
def receive = {
case latch: TestLatch latch.countDown()
}
}
class BusyActor extends Actor {
def receive = {
case (latch: TestLatch, busy: TestLatch)
latch.countDown()
Await.ready(busy, 5 seconds)
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with ImplicitSender {
import akka.routing.ResizerSpec._
"DefaultResizer" must {
"use settings to evaluate capacity" in {
val resizer = DefaultResizer(
lowerBound = 2,
upperBound = 3)
val c1 = resizer.capacity(IndexedSeq.empty[ActorRef])
c1 must be(2)
val current = IndexedSeq(system.actorOf(Props[TestActor]), system.actorOf(Props[TestActor]))
val c2 = resizer.capacity(current)
c2 must be(0)
}
"use settings to evaluate rampUp" in {
val resizer = DefaultResizer(
lowerBound = 2,
upperBound = 10,
rampupRate = 0.2)
resizer.rampup(pressure = 9, capacity = 10) must be(0)
resizer.rampup(pressure = 5, capacity = 5) must be(1)
resizer.rampup(pressure = 6, capacity = 6) must be(2)
}
"use settings to evaluate backoff" in {
val resizer = DefaultResizer(
lowerBound = 2,
upperBound = 10,
backoffThreshold = 0.3,
backoffRate = 0.1)
resizer.backoff(pressure = 10, capacity = 10) must be(0)
resizer.backoff(pressure = 4, capacity = 10) must be(0)
resizer.backoff(pressure = 3, capacity = 10) must be(0)
resizer.backoff(pressure = 2, capacity = 10) must be(-1)
resizer.backoff(pressure = 0, capacity = 10) must be(-1)
resizer.backoff(pressure = 1, capacity = 9) must be(-1)
resizer.backoff(pressure = 0, capacity = 9) must be(-1)
}
"be possible to define programatically" in {
val latch = new TestLatch(3)
val resizer = DefaultResizer(
lowerBound = 2,
upperBound = 3)
val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(resizer = Some(resizer))))
router ! latch
router ! latch
router ! latch
Await.ready(latch, 5 seconds)
val current = Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees]
current.routees.size must be(2)
}
"be possible to define in configuration" in {
val latch = new TestLatch(3)
val router = system.actorOf(Props[TestActor].withRouter(FromConfig()), "router1")
router ! latch
router ! latch
router ! latch
Await.ready(latch, 5 seconds)
val current = Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees]
current.routees.size must be(2)
}
"resize when busy" in {
val busy = new TestLatch(1)
val resizer = DefaultResizer(
lowerBound = 1,
upperBound = 3,
pressureThreshold = 0,
resizeOnNthMessage = 1)
val router = system.actorOf(Props[BusyActor].withRouter(RoundRobinRouter(resizer = Some(resizer))))
val latch1 = new TestLatch(1)
router.!((latch1, busy))
Await.ready(latch1, 2 seconds)
val latch2 = new TestLatch(1)
router.!((latch2, busy))
Await.ready(latch2, 2 seconds)
val latch3 = new TestLatch(1)
router.!((latch3, busy))
Await.ready(latch3, 2 seconds)
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3)
busy.countDown()
}
"grow as needed under pressure" in {
// make sure the pool starts at the expected lower limit and grows to the upper as needed
// as influenced by the backlog of blocking pooled actors
var latch = TestLatch(3)
val count = new AtomicInteger(0)
val resizer = DefaultResizer(
lowerBound = 2,
upperBound = 4,
rampupRate = 0.1,
pressureThreshold = 1,
resizeOnNthMessage = 1,
backoffThreshold = 0.0)
val router = system.actorOf(Props(new Actor {
def receive = {
case n: Int
(n millis).dilated.sleep
count.incrementAndGet
latch.countDown()
}
}).withRouter(RoundRobinRouter(resizer = Some(resizer))))
// first message should create the minimum number of routees
router ! 1
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2)
def loop(loops: Int, t: Int) = {
latch = TestLatch(loops)
count.set(0)
for (m 0 until loops) {
router ! t
(10 millis).dilated.sleep
}
}
// 2 more should go thru without triggering more
loop(2, 200)
Await.ready(latch, TestLatch.DefaultTimeout)
count.get must be(2)
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2)
// a whole bunch should max it out
loop(10, 200)
Await.ready(latch, TestLatch.DefaultTimeout)
count.get must be(10)
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(4)
}
"backoff" in {
val resizer = DefaultResizer(
lowerBound = 1,
upperBound = 5,
rampupRate = 1.0,
backoffRate = 1.0,
backoffThreshold = 0.20,
pressureThreshold = 1,
resizeOnNthMessage = 1)
val router = system.actorOf(Props(new Actor {
def receive = {
case n: Int
(n millis).dilated.sleep
}
}).withRouter(RoundRobinRouter(resizer = Some(resizer))))
// put some pressure on the router
for (m 0 to 5) {
router ! 100
(5 millis).dilated.sleep
}
val z = Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size
z must be >= (2)
(300 millis).dilated.sleep
// let it cool down
for (m 0 to 3) {
router ! 1
(200 millis).dilated.sleep
}
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be < (z)
}
}
}

View file

@ -1,93 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
import akka.actor.Actor
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
import akka.testkit.ImplicitSender
import akka.testkit.TestLatch
import akka.actor.Props
import akka.dispatch.Await
import akka.util.duration._
import akka.actor.ActorRef
object RouterPoolSpec {
val config = """
akka.actor.deployment {
/router1 {
router = round-robin
pool {
lower-bound = 2
upper-bound = 3
}
}
}
"""
class TestActor extends Actor {
def receive = {
case latch: TestLatch latch.countDown()
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RouterPoolSpec extends AkkaSpec(RouterPoolSpec.config) with DefaultTimeout with ImplicitSender {
import akka.routing.RouterPoolSpec._
"DefaultRouterPool" must {
"use settings to evaluate capacity" in {
val pool = DefaultRouterPool(
lowerBound = 2,
upperBound = 3)
val c1 = pool.capacity(IndexedSeq.empty[ActorRef])
c1 must be(2)
val current = IndexedSeq(system.actorOf(Props[TestActor]), system.actorOf(Props[TestActor]))
val c2 = pool.capacity(current)
c2 must be(0)
}
"be possible to define programatically" in {
val latch = new TestLatch(3)
val pool = DefaultRouterPool(
lowerBound = 2,
upperBound = 3)
val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(pool = Some(pool))))
router ! latch
router ! latch
router ! latch
Await.ready(latch, 5 seconds)
val current = Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees]
current.routees.size must be(2)
}
"be possible to define in configuration" in {
val latch = new TestLatch(3)
val router = system.actorOf(Props[TestActor].withRouter(FromConfig()), "router1")
router ! latch
router ! latch
router ! latch
Await.ready(latch, 5 seconds)
val current = Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees]
current.routees.size must be(2)
}
}
}

View file

@ -1,34 +0,0 @@
package akka.ticket
import akka.actor._
import akka.routing._
import akka.testkit.AkkaSpec
import akka.dispatch.Await
import akka.util.duration._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket703Spec extends AkkaSpec {
"A ? call to an actor pool" should {
"reuse the proper timeout" in {
val actorPool = system.actorOf(
Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
def lowerBound = 2
def upperBound = 20
def rampupRate = 0.1
def partialFill = true
def selectionCount = 1
def receive = _route
def pressureThreshold = 1
def instance(p: Props) = system.actorOf(p.withCreator(new Actor {
def receive = {
case req: String
Thread.sleep(6000L)
sender.tell("Response")
}
}))
}).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 5, 1000)))
Await.result(actorPool.?("Ping", 10000), 10 seconds) must be === "Response"
}
}
}

View file

@ -98,16 +98,54 @@ akka {
paths = [] paths = []
} }
# FIXME document pool settings # Routers with dynamically resizable number of routees
pool { resizer {
# The fewest number of routees the router should ever have.
lower-bound = 1 lower-bound = 1
# The most number of routees the router should ever have.
# Must be greater than or equal to lower-bound.
upper-bound = 10 upper-bound = 10
pressure-threshold = 3
# Threshold to evaluate if routee is considered to be busy (under pressure).
# Implementation depends on this value (default is 1).
# 0: number of routees currently processing a message.
# 1: number of routees currently processing a message has
# some messages in mailbox.
# > 1: number of routees with at least the configured pressure-threshold
# messages in their mailbox. Note that estimating mailbox size of
# default UnboundedMailbox is O(N) operation.
pressure-threshold = 1
# Percentage to increase capacity whenever all routees are busy.
# For example, 0.2 would increase 20% (rounded up), i.e. if current
# capacity is 6 it will request an increase of 2 more routees.
rampup-rate = 0.2 rampup-rate = 0.2
backoff-threshold = 0.7
# Minimum fraction of busy routees before backing off.
# For example, if this is 0.3, then we'll remove some routees only when
# less than 30% of routees are busy, i.e. if current capacity is 10 and
# 3 are busy then the capacity is unchanged, but if 2 or less are busy
# the capacity is decreased.
# Use 0.0 or negative to avoid removal of routees.
backoff-threshold = 0.3
# Fraction of routees to be removed when the resizer reaches the
# backoffThreshold.
# For example, 0.1 would decrease 10% (rounded up), i.e. if current
# capacity is 9 it will request an decrease of 1 routee.
backoff-rate = 0.1 backoff-rate = 0.1
# When the pool shrink the abandoned actors are stopped with PoisonPill after this delay
stop-delay = 1 second # When the resizer reduce the capacity the abandoned routee actors are stopped
# with PoisonPill after this delay. The reason for the delay is to give concurrent
# messages a chance to be placed in mailbox before sending PoisonPill.
# Use 0s to skip delay.
stop-delay = 1s
# Number of messages between resize operation.
# Use 1 to resize before each message.
resize-on-nth-message = 10
} }
} }
} }

View file

@ -55,26 +55,27 @@ class Deployer(val settings: ActorSystem.Settings) {
val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS) val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS)
val pool: Option[RouterPool] = if (config.hasPath("pool")) { val resizer: Option[Resizer] = if (config.hasPath("resizer")) {
val poolConfig = deployment.getConfig("pool") val resizerConfig = deployment.getConfig("resizer")
Some(DefaultRouterPool( Some(DefaultResizer(
lowerBound = poolConfig.getInt("lower-bound"), lowerBound = resizerConfig.getInt("lower-bound"),
upperBound = poolConfig.getInt("upper-bound"), upperBound = resizerConfig.getInt("upper-bound"),
pressureThreshold = poolConfig.getInt("pressure-threshold"), pressureThreshold = resizerConfig.getInt("pressure-threshold"),
rampupRate = poolConfig.getDouble("rampup-rate"), rampupRate = resizerConfig.getDouble("rampup-rate"),
backoffThreshold = poolConfig.getDouble("backoff-threshold"), backoffThreshold = resizerConfig.getDouble("backoff-threshold"),
backoffRate = poolConfig.getDouble("backoff-rate"), backoffRate = resizerConfig.getDouble("backoff-rate"),
stopDelay = Duration(poolConfig.getMilliseconds("stop-delay"), TimeUnit.MILLISECONDS))) stopDelay = Duration(resizerConfig.getMilliseconds("stop-delay"), TimeUnit.MILLISECONDS),
resizeOnNthMessage = resizerConfig.getInt("resize-on-nth-message")))
} else { } else {
None None
} }
val router: RouterConfig = deployment.getString("router") match { val router: RouterConfig = deployment.getString("router") match {
case "from-code" NoRouter case "from-code" NoRouter
case "round-robin" RoundRobinRouter(nrOfInstances, routees, pool) case "round-robin" RoundRobinRouter(nrOfInstances, routees, resizer)
case "random" RandomRouter(nrOfInstances, routees, pool) case "random" RandomRouter(nrOfInstances, routees, resizer)
case "scatter-gather" ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, pool) case "scatter-gather" ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer)
case "broadcast" BroadcastRouter(nrOfInstances, routees, pool) case "broadcast" BroadcastRouter(nrOfInstances, routees, resizer)
case x throw new ConfigurationException("unknown router type " + x + " for path " + key) case x throw new ConfigurationException("unknown router type " + x + " for path " + key)
} }

View file

@ -1,492 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
import akka.dispatch.{ Promise }
import akka.actor._
/**
* Actor pooling
*
* An actor pool is an message router for a set of delegate actors. The pool is an actor itself.
* There are a handful of basic concepts that need to be understood when working with and defining your pool.
*
* Selectors - A selector is a trait that determines how and how many pooled actors will receive an incoming message.
* Capacitors - A capacitor is a trait that influences the size of pool. There are effectively two types.
* The first determines the size itself - either fixed or bounded.
* The second determines how to adjust of the pool according to some internal pressure characteristic.
* Filters - A filter can be used to refine the raw pressure value returned from a capacitor.
*
* It should be pointed out that all actors in the pool are treated as essentially equivalent. This is not to say
* that one couldn't instance different classes within the pool, only that the pool, when selecting and routing,
* will not take any type information into consideration.
*
* @author Garrick Evans
*/
object ActorPool {
case object Stat
case class Stats(size: Int)
}
/**
* Defines the nature of an actor pool.
*/
trait ActorPool {
/**
* Adds a new actor to the pool. The DefaultActorPool implementation will start and link (supervise) this actor.
* This method is invoked whenever the pool determines it must boost capacity.
* @return A new actor for the pool
*/
def instance(defaults: Props): ActorRef
/**
* This method gets called when a delegate is to be evicted, by default it sends a PoisonPill to the delegate
*/
def evict(delegate: ActorRef): Unit = delegate ! PoisonPill
/**
* Returns the overall desired change in pool capacity. This method is used by non-static pools as the means
* for the capacity strategy to influence the pool.
* @param _delegates The current sequence of pooled actors
* @return the number of delegates by which the pool should be adjusted (positive, negative or zero)
*/
def capacity(delegates: Seq[ActorRef]): Int
/**
* Provides the results of the selector, one or more actors, to which an incoming message is forwarded.
* This method returns an iterator since a selector might return more than one actor to handle the message.
* You might want to do this to perform redundant processing of particularly error-prone messages.
* @param _delegates The current sequence of pooled actors
* @return a list of actors to which the message will be delivered
*/
def select(delegates: Seq[ActorRef]): Seq[ActorRef]
}
/**
* A default implementation of a pool that:
* First, invokes the pool's capacitor that tells it, based on the current delegate count
* and its own heuristic by how many delegates the pool should be resized. Resizing can
* can be incremental, decremental or flat. If there is a change to capacity, new delegates
* are added or existing ones are removed. Removed actors are sent the PoisonPill message.
* New actors are automatically started and linked. The pool supervises the actors and will
* use the fault handling strategy specified by the mixed-in ActorPoolSupervisionConfig.
* Pooled actors may be any lifecycle. If you're testing pool sizes during runtime, take a
* look at the unit tests... Any delegate with a <b>Permanent</b> lifecycle will be
* restarted and the pool size will be level with what it was prior to the fault. In just
* about every other case, e.g. the delegates are <b>Temporary</b> or the delegate cannot be
* restarted within the time interval specified in the fault handling strategy, the pool will
* be temporarily shy by that actor (it will have been removed by not back-filled). The
* back-fill if any is required, will occur on the next message [as usual].
*
* Second, invokes the pool's selector that returns a list of delegates that are to receive
* the incoming message. Selectors may return more than one actor. If <i>partialFill</i>
* is true then it might also the case that fewer than number of desired actors will be
* returned. If <i>partialFill</i> is false, the selector may return duplicate actors to
* reach the desired <i>selectionCount</i>.
*
* Lastly, routes by forwarding the incoming message to each delegate in the selected set.
*/
trait DefaultActorPool extends ActorPool { this: Actor
import ActorPool._
protected[akka] var _delegates = Vector[ActorRef]()
val defaultProps: Props = Props.default.withDispatcher(this.context.dispatcher.id)
override def preStart() {
resizeIfAppropriate()
}
override def postStop() {
_delegates foreach evict
_delegates = Vector.empty
}
protected def _route(): Actor.Receive = {
// for testing...
case Stat
sender ! Stats(_delegates length)
case Terminated(victim)
_delegates = _delegates filterNot { victim == }
case msg
resizeIfAppropriate()
select(_delegates) foreach { _ forward msg }
}
protected def resizeIfAppropriate() {
val requestedCapacity = capacity(_delegates)
val newDelegates = requestedCapacity match {
case qty if qty > 0
_delegates ++ Vector.fill(requestedCapacity)(context.watch(instance(defaultProps)))
case qty if qty < 0
_delegates.splitAt(_delegates.length + requestedCapacity) match {
case (keep, abandon)
abandon foreach evict
keep
}
case _ _delegates //No change
}
_delegates = newDelegates
}
}
/**
* Selectors
*
* These traits define how, when a message needs to be routed, delegate(s) are chosen from the pool.
* Note that it's acceptable to return more than one actor to handle a given message.
*/
/**
* Returns the set of delegates with the least amount of message backlog.
*/
trait SmallestMailboxSelector {
/**
* @return the number of delegates that will receive each message
*/
def selectionCount: Int
/**
* If there aren't enough delegates to provide the selectionCount, either
* send the message to fewer, or send the message selectionCount times
* including more than once to some of the delegates. This setting does
* not matter if you configure selectionCount to always be less than or
* equal to the number of delegates in the pool.
* @return true to send to fewer delegates or false to send to duplicate delegates
*/
def partialFill: Boolean
def select(delegates: Seq[ActorRef]): Seq[ActorRef] = {
var set: Seq[ActorRef] = Nil
var take = if (partialFill) math.min(selectionCount, delegates.length) else selectionCount
def mailboxSize(a: ActorRef): Int = a match {
case l: LocalActorRef l.underlying.mailbox.numberOfMessages
case _ Int.MaxValue //Non-local actors mailbox size is unknown, so consider them lowest priority
}
while (take > 0) {
set = delegates.sortWith((a, b) mailboxSize(a) < mailboxSize(b)).take(take) ++ set //Question, doesn't this risk selecting the same actor multiple times?
take -= set.size
}
set
}
}
/**
* Returns the set of delegates that occur sequentially 'after' the last delegate from the previous selection
*/
trait RoundRobinSelector {
private var _last: Int = -1;
/**
* @return the number of delegates that will receive each message
*/
def selectionCount: Int
/**
* If there aren't enough delegates to provide the selectionCount, either
* send the message to fewer, or send the message selectionCount times
* including more than once to some of the delegates. This setting does
* not matter if you configure selectionCount to always be less than or
* equal to the number of delegates in the pool.
* @return true to send to fewer delegates or false to send to duplicate delegates
*/
def partialFill: Boolean
def select(delegates: Seq[ActorRef]): Seq[ActorRef] = {
val length = delegates.length
val take = if (partialFill) math.min(selectionCount, length)
else selectionCount
val set =
for (i 0 until take) yield {
_last = (_last + 1) % length
delegates(_last)
}
set
}
}
/**
* Capacitors
*
* These traits define how to alter the size of the pool according to some desired behavior.
* Capacitors are required (minimally) by the pool to establish bounds on the number of delegates
* that may exist in the pool.
*/
/**
* Ensures a fixed number of delegates in the pool
*/
trait FixedSizeCapacitor {
/**
* @return the fixed number of delegates the pool should have
*/
def limit: Int
def capacity(delegates: Seq[ActorRef]): Int = (limit - delegates.size) max 0
}
/**
* Constrains the number of delegates to a bounded range.
* You probably don't want to use this trait directly,
* instead look at [[akka.routing.CapacityStrategy]] and [[akka.routing.BoundedCapacityStrategy]].
* To use this trait you have to implement _eval() which is provided by
* [[akka.routing.BoundedCapacityStrategy]] in terms of pressure() and filter()
* methods.
*/
trait BoundedCapacitor {
/**
* @return the fewest delegates the pool should ever have
*/
def lowerBound: Int
/**
* @return the most delegates the pool should ever have
*/
def upperBound: Int
def capacity(delegates: Seq[ActorRef]): Int = {
val current = delegates length
val delta = _eval(delegates)
val proposed = current + delta
if (proposed < lowerBound) delta + (lowerBound - proposed)
else if (proposed > upperBound) delta - (proposed - upperBound)
else delta
}
/**
* This method is defined when you mix in [[akka.routing.CapacityStrategy]]; it
* returns the "raw" proposed delta which is then clamped by
* lowerBound and upperBound.
* @return proposed delta ignoring bounds
*/
protected def _eval(delegates: Seq[ActorRef]): Int
}
/**
* Implements pressure() to return the number of delegates with overly-full mailboxes,
* where the pressureThreshold method defines what counts as overly-full.
*/
trait MailboxPressureCapacitor {
/**
* The pressure will be the number of delegates with at least
* pressureThreshold messages in their mailbox.
* @return mailbox size that counts as pressure
*/
def pressureThreshold: Int
def pressure(delegates: Seq[ActorRef]): Int =
delegates count {
case a: LocalActorRef a.underlying.mailbox.numberOfMessages > pressureThreshold
case _ false
}
}
/**
* Implements pressure() to return the number of actors currently processing a
* message.
* In other words, this capacitor counts how many
* delegates are tied up actively processing a message
*/
trait ActiveActorsPressureCapacitor {
def pressure(delegates: Seq[ActorRef]): Int =
delegates count {
case a: LocalActorRef
val cell = a.underlying
cell.mailbox.isScheduled && cell.currentMessage != null
case _ false
}
}
/**
* A [[akka.routing.CapacityStrategy]] implements methods pressure() and filter(), where
* pressure() returns the number of "busy" delegates, and filter() computes
* a proposed delta (positive, negative, or zero) in the size of the delegate
* pool.
*/
trait CapacityStrategy {
import ActorPool._
/**
* This method returns the number of delegates considered busy, or 'pressure level',
* which will be fed into the capacitor and evaluated against the established threshhold.
* For instance, in general, if the current pressure level exceeds the capacity of the
* pool, new delegates will be added.
* @param delegates the current pool of delegates
* @return number of busy delegates, between 0 and delegates.length
*/
def pressure(delegates: Seq[ActorRef]): Int
/**
* This method can be used to smooth the response of the capacitor by considering
* the current pressure and current capacity.
*
* @param pressure current number of busy delegates
* @param capacity current number of delegates
* @return proposed change in the capacity
*/
def filter(pressure: Int, capacity: Int): Int
/**
* Overrides the _eval() method in [[akka.routing.BoundedCapacity]],
* using filter and pressure to compute a proposed delta.
* @param delegates current delegates
* @return proposed delta in capacity
*/
protected def _eval(delegates: Seq[ActorRef]): Int = filter(pressure(delegates), delegates.size)
}
/**
* Use this trait to setup a pool that uses a fixed delegate count.
*/
trait FixedCapacityStrategy extends FixedSizeCapacitor
/**
* Use this trait to setup a pool that may have a variable number of
* delegates but always within an established upper and lower limit.
*
* If mix this into your pool implementation, you must also provide a
* PressureCapacitor and a Filter.
*/
trait BoundedCapacityStrategy extends CapacityStrategy with BoundedCapacitor
/**
* Filters
* These traits compute a proposed capacity delta from the pressure (pressure
* is the number of busy delegates) and the current capacity.
*/
/**
* The basic filter trait that composes ramp-up and and back-off subfiltering.
* filter() is defined to be the sum of rampup() and backoff().
*/
trait Filter {
/**
* Computes a proposed positive (or zero) capacity delta.
* @param pressure the current number of busy delegates
* @param capacity the current number of total delegates
* @return proposed increase in capacity
*/
def rampup(pressure: Int, capacity: Int): Int
/**
* Computes a proposed negative (or zero) capacity delta.
* @param pressure the current number of busy delegates
* @param capacity the current number of total delegates
* @return proposed decrease in capacity (as a negative number)
*/
def backoff(pressure: Int, capacity: Int): Int
// pass through both filters just to be sure any internal counters
// are updated consistently. ramping up is always + and backing off
// is always - and each should return 0 otherwise...
def filter(pressure: Int, capacity: Int): Int =
rampup(pressure, capacity) + backoff(pressure, capacity)
}
/**
* This trait is a convenient shorthand to use the [[akka.routing.BasicRampup]]
* and [[akka.routing.BasicBackoff]] subfilters together.
*/
trait BasicFilter extends Filter with BasicRampup with BasicBackoff
/**
* Filter performs steady incremental growth using only the basic ramp-up subfilter.
* The pool of delegates never gets smaller, only larger.
*/
trait BasicNoBackoffFilter extends BasicRampup {
def filter(pressure: Int, capacity: Int): Int = rampup(pressure, capacity)
}
/**
* Basic incremental growth as a percentage of the current pool capacity.
* Whenever pressure reaches capacity (i.e. all delegates are busy),
* the capacity is increased by a percentage.
*/
trait BasicRampup {
/**
* Percentage to increase capacity whenever all delegates are busy.
* For example, 0.2 would increase 20%, etc.
* @return percentage increase in capacity when delegates are all busy.
*/
def rampupRate: Double
def rampup(pressure: Int, capacity: Int): Int =
if (pressure < capacity) 0 else math.ceil(rampupRate * capacity) toInt
}
/**
* Basic decrement as a percentage of the current pool capacity.
* Whenever pressure as a percentage of capacity falls below the
* backoffThreshold, capacity is reduced by the backoffRate.
*/
trait BasicBackoff {
/**
* Fraction of capacity the pool has to fall below before backing off.
* For example, if this is 0.7, then we'll remove some delegates when
* less than 70% of delegates are busy.
* @return fraction of busy delegates where we start to backoff
*/
def backoffThreshold: Double
/**
* Fraction of delegates to be removed when the pool reaches the
* backoffThreshold.
* @return percentage of delegates to remove
*/
def backoffRate: Double
def backoff(pressure: Int, capacity: Int): Int =
if (capacity > 0 && pressure / capacity < backoffThreshold) math.ceil(-1.0 * backoffRate * capacity) toInt else 0
}
/**
* This filter tracks the average pressure over the lifetime of the pool (or since last reset) and
* will begin to reduce capacity once this value drops below the provided threshold. The number of
* delegates to cull from the pool is determined by some scaling factor (the backoffRate) multiplied
* by the difference in capacity and pressure.
*
* In essence, [[akka.routing.RunningMeanBackoff]] works the same way as [[akka.routing.BasicBackoff]]
* except that it uses
* a running mean pressure and capacity rather than the current pressure and capacity.
*/
trait RunningMeanBackoff {
/**
* Fraction of mean capacity the pool has to fall below before backing off.
* For example, if this is 0.7, then we'll remove some delegates when
* less than 70% of delegates are busy on average.
* @return fraction of busy delegates where we start to backoff
*/
def backoffThreshold: Double
/**
* The fraction of delegates to be removed when the running mean reaches the
* backoffThreshold.
* @return percentage reduction in capacity
*/
def backoffRate: Double
private var _pressure: Double = 0.0
private var _capacity: Double = 0.0
def backoff(pressure: Int, capacity: Int): Int = {
_pressure += pressure
_capacity += capacity
if (capacity > 0 && pressure / capacity < backoffThreshold
&& _capacity > 0 && _pressure / _capacity < backoffThreshold) //Why does the entire clause need to be true?
math.floor(-1.0 * backoffRate * (capacity - pressure)).toInt
else 0
}
/**
* Resets the running mean pressure and capacity.
* This is never invoked by the library, you have to do
* it by hand if there are points in time where it makes
* sense.
*/
def backoffReset {
_pressure = 0.0
_capacity = 0.0
}
}

View file

@ -36,7 +36,7 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
} }
def removeRoutees(abandonedRoutees: IndexedSeq[ActorRef]) { def removeRoutees(abandonedRoutees: IndexedSeq[ActorRef]) {
_routees = _routees filterNot (x abandonedRoutees.contains(x)) _routees = _routees diff abandonedRoutees
abandonedRoutees foreach underlying.unwatch abandonedRoutees foreach underlying.unwatch
} }
@ -53,7 +53,7 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
else Nil else Nil
} }
if (_props.routerConfig.pool.isEmpty && _routees.isEmpty) if (_props.routerConfig.resizer.isEmpty && _routees.isEmpty)
throw new ActorInitializationException("router " + _props.routerConfig + " did not register routees!") throw new ActorInitializationException("router " + _props.routerConfig + " did not register routees!")
_routees match { _routees match {
@ -61,7 +61,7 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
} }
override def !(message: Any)(implicit sender: ActorRef = null): Unit = { override def !(message: Any)(implicit sender: ActorRef = null): Unit = {
_props.routerConfig.resizePool(routeeProps, actorContext, routees) _props.routerConfig.resize(routeeProps, actorContext, routees)
val s = if (sender eq null) underlying.system.deadLetters else sender val s = if (sender eq null) underlying.system.deadLetters else sender
@ -77,7 +77,7 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
} }
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
_props.routerConfig.resizePool(routeeProps, actorContext, routees) _props.routerConfig.resize(routeeProps, actorContext, routees)
super.?(message)(timeout) super.?(message)(timeout)
} }
} }
@ -123,9 +123,9 @@ trait RouterConfig {
} }
protected def createAndRegisterRoutees(props: Props, context: ActorContext, nrOfInstances: Int, routees: Iterable[String]): Unit = { protected def createAndRegisterRoutees(props: Props, context: ActorContext, nrOfInstances: Int, routees: Iterable[String]): Unit = {
pool match { resizer match {
case None registerRoutees(context, createRoutees(props, context, nrOfInstances, routees)) case None registerRoutees(context, createRoutees(props, context, nrOfInstances, routees))
case Some(p) resizePool(props, context, context.self.asInstanceOf[RoutedActorRef].routees) case Some(p) resize(props, context, context.self.asInstanceOf[RoutedActorRef].routees)
} }
} }
@ -143,17 +143,18 @@ trait RouterConfig {
context.self.asInstanceOf[RoutedActorRef].removeRoutees(routees) context.self.asInstanceOf[RoutedActorRef].removeRoutees(routees)
} }
def pool: Option[RouterPool] = None def resizer: Option[Resizer] = None
private val resizePoolInProgress = new AtomicBoolean private val resizeProgress = new AtomicBoolean
private val resizeCounter = new AtomicLong
def resizePool(props: Props, context: ActorContext, currentRoutees: IndexedSeq[ActorRef]) { def resize(props: Props, context: ActorContext, currentRoutees: IndexedSeq[ActorRef]) {
for (p pool) { for (r resizer) {
if (resizePoolInProgress.compareAndSet(false, true)) { if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeProgress.compareAndSet(false, true)) {
try { try {
p.resize(props, context, currentRoutees, this) r.resize(props, context, currentRoutees, this)
} finally { } finally {
resizePoolInProgress.set(false) resizeProgress.set(false)
} }
} }
} }
@ -287,7 +288,7 @@ object RoundRobinRouter {
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/ */
case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val pool: Option[RouterPool] = None) case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
extends RouterConfig with RoundRobinLike { extends RouterConfig with RoundRobinLike {
/** /**
@ -307,10 +308,10 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] =
} }
/** /**
* Constructor that sets the pool to be used. * Constructor that sets the resizer to be used.
* Java API * Java API
*/ */
def this(pool: RouterPool) = this(pool = Some(pool)) def this(resizer: Resizer) = this(resizer = Some(resizer))
} }
trait RoundRobinLike { this: RouterConfig trait RoundRobinLike { this: RouterConfig
@ -361,7 +362,7 @@ object RandomRouter {
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/ */
case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val pool: Option[RouterPool] = None) case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
extends RouterConfig with RandomLike { extends RouterConfig with RandomLike {
/** /**
@ -381,10 +382,10 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
} }
/** /**
* Constructor that sets the pool to be used. * Constructor that sets the resizer to be used.
* Java API * Java API
*/ */
def this(pool: RouterPool) = this(pool = Some(pool)) def this(resizer: Resizer) = this(resizer = Some(resizer))
} }
trait RandomLike { this: RouterConfig trait RandomLike { this: RouterConfig
@ -438,7 +439,7 @@ object BroadcastRouter {
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/ */
case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val pool: Option[RouterPool] = None) case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
extends RouterConfig with BroadcastLike { extends RouterConfig with BroadcastLike {
/** /**
@ -458,10 +459,10 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N
} }
/** /**
* Constructor that sets the pool to be used. * Constructor that sets the resizer to be used.
* Java API * Java API
*/ */
def this(pool: RouterPool) = this(pool = Some(pool)) def this(resizer: Resizer) = this(resizer = Some(resizer))
} }
@ -506,7 +507,7 @@ object ScatterGatherFirstCompletedRouter {
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/ */
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration, case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration,
override val pool: Option[RouterPool] = None) override val resizer: Option[Resizer] = None)
extends RouterConfig with ScatterGatherFirstCompletedLike { extends RouterConfig with ScatterGatherFirstCompletedLike {
/** /**
@ -526,10 +527,10 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It
} }
/** /**
* Constructor that sets the pool to be used. * Constructor that sets the resizer to be used.
* Java API * Java API
*/ */
def this(pool: RouterPool, w: Duration) = this(pool = Some(pool), within = w) def this(resizer: Resizer, w: Duration) = this(resizer = Some(resizer), within = w)
} }
trait ScatterGatherFirstCompletedLike { this: RouterConfig trait ScatterGatherFirstCompletedLike { this: RouterConfig
@ -555,55 +556,101 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒
} }
/** /**
* Routers with dynamically resizable number of routees is implemented by providing a pool * Routers with dynamically resizable number of routees is implemented by providing a Resizer
* implementation in [[akka.routing.RouterConfig]]. When the resize method is invoked you can * implementation in [[akka.routing.RouterConfig]].
* create and register more routees with `routerConfig.registerRoutees(actorContext, newRoutees)
* or remove routees with `routerConfig.unregisterRoutees(actorContext, abandonedRoutees)` and
* sending [[akka.actor.PoisonPill]] to them.
*/ */
trait RouterPool { trait Resizer {
/**
* Is it time for resizing. Typically implemented with modulo of nth message, but
* could be based on elapsed time or something else. The messageCounter starts with 0
* for the initial resize and continues with 1 for the first message. Make sure to perform
* initial resize before first message (messageCounter == 0), because there is no guarantee
* that resize will be done when concurrent messages are in play.
*/
def isTimeForResize(messageCounter: Long): Boolean
/**
* Decide if the capacity of the router need to be changed. Will be invoked when `isTimeForResize`
* returns true and no other resize is in progress.
* Create and register more routees with `routerConfig.registerRoutees(actorContext, newRoutees)
* or remove routees with `routerConfig.unregisterRoutees(actorContext, abandonedRoutees)` and
* sending [[akka.actor.PoisonPill]] to them.
*/
def resize(props: Props, actorContext: ActorContext, currentRoutees: IndexedSeq[ActorRef], routerConfig: RouterConfig) def resize(props: Props, actorContext: ActorContext, currentRoutees: IndexedSeq[ActorRef], routerConfig: RouterConfig)
} }
case class DefaultRouterPool( case class DefaultResizer(
/** /**
* The fewest number of routees the pool should ever have * The fewest number of routees the router should ever have.
*/ */
lowerBound: Int = 1, lowerBound: Int = 1,
/** /**
* The most number of routees the pool should ever have * The most number of routees the router should ever have.
* Must be greater than or equal to `lowerBound`.
*/ */
upperBound: Int = 10, upperBound: Int = 10,
/** /**
* A routee is considered to be busy (under pressure) when * Threshold to evaluate if routee is considered to be busy (under pressure).
* it has at least this number of messages in its mailbox. * Implementation depends on this value (default is 1).
* When pressureThreshold is defined as 0 the routee * <ul>
* is considered busy when it is currently processing a * <li> 0: number of routees currently processing a message.</li>
* message. * <li> 1: number of routees currently processing a message has
* some messages in mailbox.</li>
* <li> > 1: number of routees with at least the configured `pressureThreshold`
* messages in their mailbox. Note that estimating mailbox size of
* default UnboundedMailbox is O(N) operation.</li>
* </ul>
*/ */
pressureThreshold: Int = 3, pressureThreshold: Int = 1,
/** /**
* Percentage to increase capacity whenever all routees are busy. * Percentage to increase capacity whenever all routees are busy.
* For example, 0.2 would increase 20%, etc. * For example, 0.2 would increase 20% (rounded up), i.e. if current
* capacity is 6 it will request an increase of 2 more routees.
*/ */
rampupRate: Double = 0.2, rampupRate: Double = 0.2,
/** /**
* Fraction of capacity the pool has to fall below before backing off. * Minimum fraction of busy routees before backing off.
* For example, if this is 0.7, then we'll remove some routees when * For example, if this is 0.3, then we'll remove some routees only when
* less than 70% of routees are busy. * less than 30% of routees are busy, i.e. if current capacity is 10 and
* Use 0.0 to avoid removal of routees. * 3 are busy then the capacity is unchanged, but if 2 or less are busy
* the capacity is decreased.
*
* Use 0.0 or negative to avoid removal of routees.
*/ */
backoffThreshold: Double = 0.7, backoffThreshold: Double = 0.3,
/** /**
* Fraction of routees to be removed when the pool reaches the * Fraction of routees to be removed when the resizer reaches the
* backoffThreshold. * backoffThreshold.
* Use 0.0 to avoid removal of routees. * For example, 0.1 would decrease 10% (rounded up), i.e. if current
* capacity is 9 it will request an decrease of 1 routee.
*/ */
backoffRate: Double = 0.1, backoffRate: Double = 0.1,
/** /**
* When the pool shrink the abandoned actors are stopped with PoisonPill after this delay * When the resizer reduce the capacity the abandoned routee actors are stopped
* with PoisonPill after this delay. The reason for the delay is to give concurrent
* messages a chance to be placed in mailbox before sending PoisonPill.
* Use 0 seconds to skip delay.
*/ */
stopDelay: Duration = 1.second) extends RouterPool { stopDelay: Duration = 1.second,
/**
* Number of messages between resize operation.
* Use 1 to resize before each message.
*/
resizeOnNthMessage: Int = 10) extends Resizer {
/**
* Java API constructor for default values except bounds.
*/
def this(lower: Int, upper: Int) = this(lowerBound = lower, upperBound = upper)
if (lowerBound < 0) throw new IllegalArgumentException("lowerBound must be >= 0, was: [%s]".format(lowerBound))
if (upperBound < 0) throw new IllegalArgumentException("upperBound must be >= 0, was: [%s]".format(upperBound))
if (upperBound < lowerBound) throw new IllegalArgumentException("upperBound must be >= lowerBound, was: [%s] < [%s]".format(upperBound, lowerBound))
if (rampupRate < 0.0) throw new IllegalArgumentException("rampupRate must be >= 0.0, was [%s]".format(rampupRate))
if (backoffThreshold > 1.0) throw new IllegalArgumentException("backoffThreshold must be <= 1.0, was [%s]".format(backoffThreshold))
if (backoffRate < 0.0) throw new IllegalArgumentException("backoffRate must be >= 0.0, was [%s]".format(backoffRate))
if (resizeOnNthMessage <= 0) throw new IllegalArgumentException("resizeOnNthMessage must be > 0, was [%s]".format(resizeOnNthMessage))
def isTimeForResize(messageCounter: Long): Boolean = (messageCounter % resizeOnNthMessage == 0)
def resize(props: Props, actorContext: ActorContext, currentRoutees: IndexedSeq[ActorRef], routerConfig: RouterConfig) { def resize(props: Props, actorContext: ActorContext, currentRoutees: IndexedSeq[ActorRef], routerConfig: RouterConfig) {
val requestedCapacity = capacity(currentRoutees) val requestedCapacity = capacity(currentRoutees)
@ -623,17 +670,23 @@ case class DefaultRouterPool(
* sending PoisonPill. * sending PoisonPill.
*/ */
protected def delayedStop(scheduler: Scheduler, abandon: IndexedSeq[ActorRef]) { protected def delayedStop(scheduler: Scheduler, abandon: IndexedSeq[ActorRef]) {
scheduler.scheduleOnce(stopDelay) { if (abandon.nonEmpty) {
abandon foreach (_ ! PoisonPill) if (stopDelay <= Duration.Zero) {
abandon foreach (_ ! PoisonPill)
} else {
scheduler.scheduleOnce(stopDelay) {
abandon foreach (_ ! PoisonPill)
}
}
} }
} }
/** /**
* Returns the overall desired change in pool capacity. Positive value will * Returns the overall desired change in resizer capacity. Positive value will
* add routees to the pool. Negative value will remove routees from the * add routees to the resizer. Negative value will remove routees from the
* pool. * resizer.
* @param routees The current actor in the pool * @param routees The current actor in the resizer
* @return the number of routees by which the pool should be adjusted (positive, negative or zero) * @return the number of routees by which the resizer should be adjusted (positive, negative or zero)
*/ */
def capacity(routees: IndexedSeq[ActorRef]): Int = { def capacity(routees: IndexedSeq[ActorRef]): Int = {
val currentSize = routees.size val currentSize = routees.size
@ -648,27 +701,41 @@ case class DefaultRouterPool(
/** /**
* Number of routees considered busy, or above 'pressure level'. * Number of routees considered busy, or above 'pressure level'.
* *
* Default implementation: * Implementation depends on the value of `pressureThreshold`
* When `pressureThreshold` > 0 the number of routees with at least * (default is 1).
* the configured `pressureThreshold` messages in their mailbox, * <ul>
* otherwise number of routees currently processing a * <li> 0: number of routees currently processing a message.</li>
* message. * <li> 1: number of routees currently processing a message has
* some messages in mailbox.</li>
* <li> > 1: number of routees with at least the configured `pressureThreshold`
* messages in their mailbox. Note that estimating mailbox size of
* default UnboundedMailbox is O(N) operation.</li>
* </ul>
* *
* @param routees the current pool of routees * @param routees the current resizer of routees
* @return number of busy routees, between 0 and routees.size * @return number of busy routees, between 0 and routees.size
*/ */
def pressure(routees: Seq[ActorRef]): Int = { def pressure(routees: IndexedSeq[ActorRef]): Int = {
if (pressureThreshold > 0) { if (pressureThreshold == 1) {
routees count {
case a: LocalActorRef
val cell = a.underlying
a.underlying.mailbox.isScheduled && cell.currentMessage != null && a.underlying.mailbox.hasMessages
case x
false
}
} else if (pressureThreshold > 1) {
routees count { routees count {
case a: LocalActorRef a.underlying.mailbox.numberOfMessages >= pressureThreshold case a: LocalActorRef a.underlying.mailbox.numberOfMessages >= pressureThreshold
case _ false case x false
} }
} else { } else {
routees count { routees count {
case a: LocalActorRef case a: LocalActorRef
val cell = a.underlying val cell = a.underlying
cell.mailbox.isScheduled && cell.currentMessage != null a.underlying.mailbox.isScheduled && cell.currentMessage != null
case _ false case x
false
} }
} }
} }
@ -704,7 +771,7 @@ case class DefaultRouterPool(
*/ */
def backoff(pressure: Int, capacity: Int): Int = def backoff(pressure: Int, capacity: Int): Int =
if (backoffThreshold > 0.0 && backoffRate > 0.0 && capacity > 0 && pressure.toDouble / capacity < backoffThreshold) if (backoffThreshold > 0.0 && backoffRate > 0.0 && capacity > 0 && pressure.toDouble / capacity < backoffThreshold)
math.ceil(-1.0 * backoffRate * capacity) toInt math.floor(-1.0 * backoffRate * capacity) toInt
else 0 else 0
} }

View file

@ -47,5 +47,12 @@ public class RouterViaConfigExample {
for (int i = 1; i <= 10; i++) { for (int i = 1; i <= 10; i++) {
router.tell(new ExampleActor.Message(i)); router.tell(new ExampleActor.Message(i));
} }
//#configurableRoutingWithResizer
ActorRef router2 = system.actorOf(new Props(ExampleActor.class).withRouter(new FromConfig()), "router2");
//#configurableRoutingWithResizer
for (int i = 1; i <= 10; i++) {
router2.tell(new ExampleActor.Message(i));
}
} }
} }

View file

@ -4,6 +4,7 @@
package akka.docs.jrouting; package akka.docs.jrouting;
import akka.routing.RoundRobinRouter; import akka.routing.RoundRobinRouter;
import akka.routing.DefaultResizer;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
@ -56,5 +57,15 @@ public class RouterViaProgramExample {
for (int i = 1; i <= 6; i++) { for (int i = 1; i <= 6; i++) {
router2.tell(new ExampleActor.Message(i)); router2.tell(new ExampleActor.Message(i));
} }
//#programmaticRoutingWithResizer
int lowerBound = 2;
int upperBound = 15;
DefaultResizer resizer = new DefaultResizer(lowerBound, upperBound);
ActorRef router3 = system.actorOf(new Props(ExampleActor.class).withRouter(new RoundRobinRouter(nrOfInstances)));
//#programmaticRoutingWithResizer
for (int i = 1; i <= 6; i++) {
router3.tell(new ExampleActor.Message(i));
}
} }
} }

View file

@ -170,6 +170,25 @@ This message is called ``Broadcast`` and is used in the following manner:
Only the actual message is forwarded to the routees, i.e. "Watch out for Davy Jones' locker" in the example above. Only the actual message is forwarded to the routees, i.e. "Watch out for Davy Jones' locker" in the example above.
It is up to the routee implementation whether to handle the broadcast message or not. It is up to the routee implementation whether to handle the broadcast message or not.
Dynamically Resizable Routers
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
All routers can be used with a fixed number of routees or with a resize strategy to adjust the number
of routees dynamically.
This is an example of how to create a resizable router that is defined in configuration:
.. includecode:: ../scala/code/akka/docs/routing/RouterViaConfigExample.scala#config-resize
.. includecode:: code/akka/docs/jrouting/RouterViaConfigExample.java#configurableRoutingWithResizer
Several more configuration options are availble and described in ``akka.actor.deployment.default.resizer``
section of the reference :ref:`configuration`.
This is an example of how to programatically create a resizable router:
.. includecode:: code/akka/docs/jrouting/RouterViaProgramExample.java#programmaticRoutingWithResizer
Custom Router Custom Router
^^^^^^^^^^^^^ ^^^^^^^^^^^^^
@ -218,4 +237,10 @@ If you are interested in how to use the VoteCountRouter it looks like this:
.. includecode:: code/akka/docs/jrouting/CustomRouterDocTestBase.java#crTest .. includecode:: code/akka/docs/jrouting/CustomRouterDocTestBase.java#crTest
Custom Resizer
**************
A router with dynamically resizable number of routees is implemented by providing a ``akka.routing.Resizer``
in ``resizer`` method of the ``RouterConfig``. See ``akka.routing.DefaultResizer`` for inspiration
of how to write your own resize strategy.

View file

@ -27,3 +27,8 @@ determines when the actor will stop itself and hence closes the window for a
reply to be received; it is independent of the timeout applied when awaiting reply to be received; it is independent of the timeout applied when awaiting
completion of the :class:`Future`, however, the actor will complete the completion of the :class:`Future`, however, the actor will complete the
:class:`Future` with an :class:`AskTimeoutException` when it stops itself. :class:`Future` with an :class:`AskTimeoutException` when it stops itself.
ActorPool
---------
The ActorPool has been replaced by dynamically resizable routers.

View file

@ -1,24 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.routing
import akka.routing.{ BasicNoBackoffFilter, SmallestMailboxSelector, DefaultActorPool }
import akka.actor.{ ActorRef, Props, Actor }
//#testPool
class TestPool extends Actor with DefaultActorPool with SmallestMailboxSelector with BasicNoBackoffFilter {
def capacity(delegates: Seq[ActorRef]) = 5
protected def receive = _route
def rampupRate = 0.1
def selectionCount = 1
def partialFill = true
def instance(defaults: Props) = context.actorOf(defaults.withCreator(new Actor {
def receive = {
case _ // do something
}
}))
}
//#testPool

View file

@ -1,26 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.routing
import akka.actor.ActorRef
//#boundedCapacitor
trait BoundedCapacitor {
def lowerBound: Int
def upperBound: Int
def capacity(delegates: Seq[ActorRef]): Int = {
val current = delegates length
var delta = _eval(delegates)
val proposed = current + delta
if (proposed < lowerBound) delta += (lowerBound - proposed)
else if (proposed > upperBound) delta -= (proposed - upperBound)
delta
}
protected def _eval(delegates: Seq[ActorRef]): Int
}
//#boundedCapacitor

View file

@ -1,19 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.routing
import akka.routing.ActorPool
import akka.actor.ActorRef
//#capacityStrategy
trait CapacityStrategy {
import ActorPool._
def pressure(delegates: Seq[ActorRef]): Int
def filter(pressure: Int, capacity: Int): Int
protected def _eval(delegates: Seq[ActorRef]): Int =
filter(pressure(delegates), delegates.size)
}
//#capacityStrategy

View file

@ -25,6 +25,17 @@ object RouterWithConfigExample extends App {
} }
} }
//#config //#config
//#config-resize
akka.actor.deployment {
/router2 {
router = round-robin
resizer {
lower-bound = 2
upper-bound = 15
}
}
}
//#config-resize
""") """)
val system = ActorSystem("Example", config) val system = ActorSystem("Example", config)
//#configurableRouting //#configurableRouting
@ -32,4 +43,10 @@ object RouterWithConfigExample extends App {
"router") "router")
//#configurableRouting //#configurableRouting
1 to 10 foreach { i router ! Message(i) } 1 to 10 foreach { i router ! Message(i) }
//#configurableRoutingWithResizer
val router2 = system.actorOf(Props[ExampleActor].withRouter(FromConfig()),
"router2")
//#configurableRoutingWithResizer
1 to 10 foreach { i router2 ! Message(i) }
} }

View file

@ -5,6 +5,7 @@ package akka.docs.routing
import akka.routing.RoundRobinRouter import akka.routing.RoundRobinRouter
import akka.actor.{ ActorRef, Props, Actor, ActorSystem } import akka.actor.{ ActorRef, Props, Actor, ActorSystem }
import akka.routing.DefaultResizer
case class Message1(nbr: Int) case class Message1(nbr: Int)
@ -31,4 +32,12 @@ object RoutingProgrammaticallyExample extends App {
RoundRobinRouter(routees = routees))) RoundRobinRouter(routees = routees)))
//#programmaticRoutingRoutees //#programmaticRoutingRoutees
1 to 6 foreach { i router2 ! Message1(i) } 1 to 6 foreach { i router2 ! Message1(i) }
//#programmaticRoutingWithResizer
val resizer = DefaultResizer(lowerBound = 2, upperBound = 15)
val router3 = system.actorOf(Props[ExampleActor1].withRouter(
RoundRobinRouter(resizer = Some(resizer))))
//#programmaticRoutingWithResizer
1 to 6 foreach { i router3 ! Message1(i) }
} }

View file

@ -171,6 +171,26 @@ This message is called ``Broadcast`` and is used in the following manner:
Only the actual message is forwarded to the routees, i.e. "Watch out for Davy Jones' locker" in the example above. Only the actual message is forwarded to the routees, i.e. "Watch out for Davy Jones' locker" in the example above.
It is up to the routee implementation whether to handle the broadcast message or not. It is up to the routee implementation whether to handle the broadcast message or not.
Dynamically Resizable Routers
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
All routers can be used with a fixed number of routees or with a resize strategy to adjust the number
of routees dynamically.
This is an example of how to create a resizable router that is defined in configuration:
.. includecode:: code/akka/docs/routing/RouterViaConfigExample.scala#config-resize
.. includecode:: code/akka/docs/routing/RouterViaConfigExample.scala#configurableRoutingWithResizer
Several more configuration options are availble and described in ``akka.actor.deployment.default.resizer``
section of the reference :ref:`configuration`.
This is an example of how to programatically create a resizable router:
.. includecode:: code/akka/docs/routing/RouterViaProgramExample.scala#programmaticRoutingWithResizer
Custom Router Custom Router
^^^^^^^^^^^^^ ^^^^^^^^^^^^^
@ -217,73 +237,10 @@ All in all the custom router looks like this:
If you are interested in how to use the VoteCountRouter you can have a look at the test class If you are interested in how to use the VoteCountRouter you can have a look at the test class
`RoutingSpec <https://github.com/jboner/akka/blob/master/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala>`_ `RoutingSpec <https://github.com/jboner/akka/blob/master/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala>`_
Actor Pool Custom Resizer
---------- **************
An actor pool routes incoming messages to other actors. It has different semantics however when it comes to how those A router with dynamically resizable number of routees is implemented by providing a ``akka.routing.Resizer``
actors are managed and selected for dispatch. Therein lies the difference. The pool manages, from start to shutdown, in ``resizer`` method of the ``RouterConfig``. See ``akka.routing.DefaultResizer`` for inspiration
the lifecycle of all delegated actors. The number of actors in a pool can be fixed or grow and shrink over time. of how to write your own resize strategy.
Also, messages can be routed to more than one actor in the pool if so desired. This is a useful little feature for
accounting for expected failure - especially with remoting - where you can invoke the same request of multiple
actors and just take the first, best response.
The actor pool is built around three concepts: capacity, filtering and selection.
Selection
^^^^^^^^^
All pools require a ``Selector`` to be mixed-in. This trait controls how and how many actors in the pool will
receive the incoming message. Define *selectionCount* to some positive number greater than one to route to
multiple actors. Currently two are provided:
* `SmallestMailboxSelector <https://github.com/jboner/akka/blob/master/akka-actor/src/main/scala/akka/routing/Pool.scala#L148>`_ - Using the exact same logic as the iterator of the same name, the pooled actor with the fewest number of pending messages will be chosen.
* `RoundRobinSelector <https://github.com/jboner/akka/blob/master/akka-actor/src/main/scala/akka/routing/Pool.scala#L184>`_ - Performs a very simple index-based selection, wrapping around the end of the list, very much like the CyclicIterator does.
Partial Fills
*************
When selecting more than one pooled actor, its possible that in order to fulfill the requested amount,
the selection set must contain duplicates. By setting ``partialFill`` to ``true``, you instruct the selector to
return only unique actors from the pool.
Capacity
^^^^^^^^
As you'd expect, capacity traits determine how the pool is funded with actors. There are two types of strategies that can be employed:
* `FixedCapacityStrategy <https://github.com/jboner/akka/blob/master/akka-actor/src/main/scala/akka/routing/Pool.scala#L346>`_ - When you mix this into your actor pool, you define a pool size and when the pool is started, it will have that number of actors within to which messages will be delegated.
* `BoundedCapacityStrategy <https://github.com/jboner/akka/blob/master/akka-actor/src/main/scala/akka/routing/Pool.scala#L355>`_ - When you mix this into your actor pool, you define upper and lower bounds, and when the pool is started, it will have the minimum number of actors in place to handle messages. You must also mix-in a Capacitor and a Filter when using this strategy (see below).
The *BoundedCapacityStrategy* requires additional logic to function. Specifically it requires a *Capacitor* and a *Filter*.
Capacitors are used to determine the pressure that the pool is under and provide a (usually) raw reading of this information.
Currently we provide for the use of either mailbox backlog or active futures count as a means of evaluating pool pressure.
Each expresses itself as a simple number - a reading of the number of actors either with mailbox sizes over a certain threshold
or blocking a thread waiting on a future to complete or expire.
Filtering
^^^^^^^^^
A *Filter* is a trait that modifies the raw pressure reading returned from a Capacitor such that it drives the
adjustment of the pool capacity to a desired end. More simply, if we just used the pressure reading alone,
we might only ever increase the size of the pool (to respond to overload) or we might only have a single
mechanism for reducing the pool size when/if it became necessary. This behavior is fully under your control
through the use of *Filters*. Let's take a look at some code to see how this works:
.. includecode:: code/akka/docs/routing/BoundedCapacitorExample.scala#boundedCapacitor
.. includecode:: code/akka/docs/routing/CapacityStrategyExample.scala#capacityStrategy
Here we see how the filter function will have the chance to modify the pressure reading to influence the capacity change.
You are free to implement filter() however you like. We provide a
`Filter <https://github.com/jboner/akka/blob/master/akka-actor/src/main/scala/akka/routing/Pool.scala#L279>`_ trait that
evaluates both a rampup and a backoff subfilter to determine how to use the pressure reading to alter the pool capacity.
There are several sub filters available to use, though again you may create whatever makes the most sense for you pool:
* `BasicRampup <https://github.com/jboner/akka/blob/master/akka-actor/src/main/scala/akka/routing/Pool.scala#L409>`_ - When pressure exceeds current capacity, increase the number of actors in the pool by some factor (*rampupRate*) of the current pool size.
* `BasicBackoff <https://github.com/jboner/akka/blob/master/akka-actor/src/main/scala/akka/routing/Pool.scala#L426>`_ - When the pressure ratio falls under some predefined amount (*backoffThreshold*), decrease the number of actors in the pool by some factor of the current pool size.
* `RunningMeanBackoff <https://github.com/jboner/akka/blob/master/akka-actor/src/main/scala/akka/routing/Pool.scala#L454>`_ - This filter tracks the average pressure-to-capacity over the lifetime of the pool (or since the last time the filter was reset) and will begin to reduce capacity once this mean falls below some predefined amount. The number of actors that will be stopped is determined by some factor of the difference between the current capacity and pressure. The idea behind this filter is to reduce the likelihood of "thrashing" (removing then immediately creating...) pool actors by delaying the backoff until some quiescent stage of the pool. Put another way, use this subfilter to allow quick rampup to handle load and more subtle backoff as that decreases over time.
Example Usage
^^^^^^^^^^^^^
.. includecode:: code/akka/docs/routing/ActorPoolExample.scala#testPool

View file

@ -26,10 +26,10 @@ class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings
if (nodes.isEmpty || deploy.routing == NoRouter) d if (nodes.isEmpty || deploy.routing == NoRouter) d
else { else {
val r = deploy.routing match { val r = deploy.routing match {
case RoundRobinRouter(x, _) RemoteRoundRobinRouter(x, nodes) case RoundRobinRouter(x, _, resizer) RemoteRoundRobinRouter(x, nodes, resizer)
case RandomRouter(x, _) RemoteRandomRouter(x, nodes) case RandomRouter(x, _, resizer) RemoteRandomRouter(x, nodes, resizer)
case BroadcastRouter(x, _) RemoteBroadcastRouter(x, nodes) case BroadcastRouter(x, _, resizer) RemoteBroadcastRouter(x, nodes, resizer)
case ScatterGatherFirstCompletedRouter(x, _, w) RemoteScatterGatherFirstCompletedRouter(x, nodes, w) case ScatterGatherFirstCompletedRouter(x, _, w, resizer) RemoteScatterGatherFirstCompletedRouter(x, nodes, w, resizer)
} }
Some(deploy.copy(routing = r)) Some(deploy.copy(routing = r))
} }

View file

@ -11,7 +11,7 @@ import akka.config.ConfigurationException
import akka.util.Duration import akka.util.Duration
trait RemoteRouterConfig extends RouterConfig { trait RemoteRouterConfig extends RouterConfig {
override protected def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, routees: Iterable[String]): Vector[ActorRef] = (nrOfInstances, routees) match { override def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, routees: Iterable[String]): IndexedSeq[ActorRef] = (nrOfInstances, routees) match {
case (_, Nil) throw new ConfigurationException("must specify list of remote nodes") case (_, Nil) throw new ConfigurationException("must specify list of remote nodes")
case (n, xs) case (n, xs)
val nodes = routees map { val nodes = routees map {
@ -20,7 +20,7 @@ trait RemoteRouterConfig extends RouterConfig {
} }
val node = Stream.continually(nodes).flatten.iterator val node = Stream.continually(nodes).flatten.iterator
val impl = context.system.asInstanceOf[ActorSystemImpl] //FIXME should we rely on this cast to work here? val impl = context.system.asInstanceOf[ActorSystemImpl] //FIXME should we rely on this cast to work here?
Vector.empty[ActorRef] ++ (for (i 1 to nrOfInstances) yield { IndexedSeq.empty[ActorRef] ++ (for (i 1 to nrOfInstances) yield {
val name = "c" + i val name = "c" + i
val deploy = Deploy("", ConfigFactory.empty(), None, props.routerConfig, RemoteScope(node.next)) val deploy = Deploy("", ConfigFactory.empty(), None, props.routerConfig, RemoteScope(node.next))
impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, false, Some(deploy)) impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, false, Some(deploy))
@ -39,13 +39,20 @@ trait RemoteRouterConfig extends RouterConfig {
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/ */
case class RemoteRoundRobinRouter(nrOfInstances: Int, routees: Iterable[String]) extends RemoteRouterConfig with RoundRobinLike { case class RemoteRoundRobinRouter(nrOfInstances: Int, routees: Iterable[String], override val resizer: Option[Resizer] = None)
extends RemoteRouterConfig with RoundRobinLike {
/** /**
* Constructor that sets the routees to be used. * Constructor that sets the routees to be used.
* Java API * Java API
*/ */
def this(n: Int, t: java.util.Collection[String]) = this(n, t.asScala) def this(n: Int, t: java.lang.Iterable[String]) = this(n, t.asScala)
/**
* Constructor that sets the resizer to be used.
* Java API
*/
def this(resizer: Resizer) = this(0, Nil, Some(resizer))
} }
/** /**
@ -59,13 +66,20 @@ case class RemoteRoundRobinRouter(nrOfInstances: Int, routees: Iterable[String])
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/ */
case class RemoteRandomRouter(nrOfInstances: Int, routees: Iterable[String]) extends RemoteRouterConfig with RandomLike { case class RemoteRandomRouter(nrOfInstances: Int, routees: Iterable[String], override val resizer: Option[Resizer] = None)
extends RemoteRouterConfig with RandomLike {
/** /**
* Constructor that sets the routees to be used. * Constructor that sets the routees to be used.
* Java API * Java API
*/ */
def this(n: Int, t: java.util.Collection[String]) = this(n, t.asScala) def this(n: Int, t: java.lang.Iterable[String]) = this(n, t.asScala)
/**
* Constructor that sets the resizer to be used.
* Java API
*/
def this(resizer: Resizer) = this(0, Nil, Some(resizer))
} }
/** /**
@ -79,13 +93,20 @@ case class RemoteRandomRouter(nrOfInstances: Int, routees: Iterable[String]) ext
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/ */
case class RemoteBroadcastRouter(nrOfInstances: Int, routees: Iterable[String]) extends RemoteRouterConfig with BroadcastLike { case class RemoteBroadcastRouter(nrOfInstances: Int, routees: Iterable[String], override val resizer: Option[Resizer] = None)
extends RemoteRouterConfig with BroadcastLike {
/** /**
* Constructor that sets the routees to be used. * Constructor that sets the routees to be used.
* Java API * Java API
*/ */
def this(n: Int, t: java.util.Collection[String]) = this(n, t.asScala) def this(n: Int, t: java.lang.Iterable[String]) = this(n, t.asScala)
/**
* Constructor that sets the resizer to be used.
* Java API
*/
def this(resizer: Resizer) = this(0, Nil, Some(resizer))
} }
/** /**
@ -99,12 +120,19 @@ case class RemoteBroadcastRouter(nrOfInstances: Int, routees: Iterable[String])
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/ */
case class RemoteScatterGatherFirstCompletedRouter(nrOfInstances: Int, routees: Iterable[String], within: Duration) case class RemoteScatterGatherFirstCompletedRouter(nrOfInstances: Int, routees: Iterable[String], within: Duration,
override val resizer: Option[Resizer] = None)
extends RemoteRouterConfig with ScatterGatherFirstCompletedLike { extends RemoteRouterConfig with ScatterGatherFirstCompletedLike {
/** /**
* Constructor that sets the routees to be used. * Constructor that sets the routees to be used.
* Java API * Java API
*/ */
def this(n: Int, t: java.util.Collection[String], w: Duration) = this(n, t.asScala, w) def this(n: Int, t: java.lang.Iterable[String], w: Duration) = this(n, t.asScala, w)
/**
* Constructor that sets the resizer to be used.
* Java API
*/
def this(resizer: Resizer, w: Duration) = this(0, Nil, w, Some(resizer))
} }