diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index 92e1f4f3ab..faae7350b6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -38,6 +38,13 @@ object DeployerSpec { router = scatter-gather within = 2 seconds } + /service-pool { + router = round-robin + pool { + lower-bound = 1 + upper-bound = 10 + } + } } """, ConfigParseOptions.defaults) @@ -121,18 +128,19 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { assertRouting(ScatterGatherFirstCompletedRouter(nrOfInstances = 1, within = 2 seconds), "/service-scatter-gather") } + "be able to parse 'akka.actor.deployment._' with router pool" in { + val pool = DefaultRouterPool() + assertRouting(RoundRobinRouter(pool = Some(pool)), "/service-pool") + } + def assertRouting(expected: RouterConfig, service: String) { val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service) deployment must be('defined) - - deployment must be(Some( - Deploy( - service, - deployment.get.config, - None, - expected, - LocalScope))) - + deployment.get.path must be(service) + deployment.get.recipe must be(None) + deployment.get.routing.getClass must be(expected.getClass) + deployment.get.routing.pool must be(expected.pool) + deployment.get.scope must be(LocalScope) } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/RouterPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RouterPoolSpec.scala new file mode 100644 index 0000000000..3ca71e3180 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/routing/RouterPoolSpec.scala @@ -0,0 +1,93 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.routing + +import akka.actor.Actor +import akka.testkit.AkkaSpec +import akka.testkit.DefaultTimeout +import akka.testkit.ImplicitSender +import akka.testkit.TestLatch +import akka.actor.Props +import akka.dispatch.Await +import akka.util.duration._ +import akka.actor.ActorRef + +object RouterPoolSpec { + + val config = """ + akka.actor.deployment { + /router1 { + router = round-robin + pool { + lower-bound = 2 + upper-bound = 3 + } + } + } + """ + + class TestActor extends Actor { + def receive = { + case latch: TestLatch ⇒ latch.countDown() + } + } + +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class RouterPoolSpec extends AkkaSpec(RouterPoolSpec.config) with DefaultTimeout with ImplicitSender { + + import akka.routing.RouterPoolSpec._ + + "DefaultRouterPool" must { + + "use settings to evaluate capacity" in { + val pool = DefaultRouterPool( + lowerBound = 2, + upperBound = 3) + + val c1 = pool.capacity(IndexedSeq.empty[ActorRef]) + c1 must be(2) + + val current = IndexedSeq(system.actorOf(Props[TestActor]), system.actorOf(Props[TestActor])) + val c2 = pool.capacity(current) + c2 must be(0) + } + + "be possible to define programatically" in { + val latch = new TestLatch(3) + + val pool = DefaultRouterPool( + lowerBound = 2, + upperBound = 3) + val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(pool = Some(pool)))) + + router ! latch + router ! latch + router ! latch + + Await.ready(latch, 5 seconds) + + val current = Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees] + current.routees.size must be(2) + } + + "be possible to define in configuration" in { + val latch = new TestLatch(3) + + val router = system.actorOf(Props[TestActor].withRouter(FromConfig()), "router1") + + router ! latch + router ! latch + router ! latch + + Await.ready(latch, 5 seconds) + + val current = Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees] + current.routees.size must be(2) + } + + } + +} diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 236eadc579..7d4800e807 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -97,6 +97,18 @@ akka { # precedence over nr-of-instances paths = [] } + + # FIXME document pool settings + pool { + lower-bound = 1 + upper-bound = 10 + pressure-threshold = 3 + rampup-rate = 0.2 + backoff-threshold = 0.7 + backoff-rate = 0.1 + # When the pool shrink the abandoned actors are stopped with PoisonPill after this delay + stop-delay = 1 second + } } } diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 988d6bf126..48475169ee 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -55,12 +55,26 @@ class Deployer(val settings: ActorSystem.Settings) { val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS) + val pool: Option[RouterPool] = if (config.hasPath("pool")) { + val poolConfig = deployment.getConfig("pool") + Some(DefaultRouterPool( + lowerBound = poolConfig.getInt("lower-bound"), + upperBound = poolConfig.getInt("upper-bound"), + pressureThreshold = poolConfig.getInt("pressure-threshold"), + rampupRate = poolConfig.getDouble("rampup-rate"), + backoffThreshold = poolConfig.getDouble("backoff-threshold"), + backoffRate = poolConfig.getDouble("backoff-rate"), + stopDelay = Duration(poolConfig.getMilliseconds("stop-delay"), TimeUnit.MILLISECONDS))) + } else { + None + } + val router: RouterConfig = deployment.getString("router") match { case "from-code" ⇒ NoRouter - case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees) - case "random" ⇒ RandomRouter(nrOfInstances, routees) - case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within) - case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees) + case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, pool) + case "random" ⇒ RandomRouter(nrOfInstances, routees, pool) + case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, pool) + case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, pool) case x ⇒ throw new ConfigurationException("unknown router type " + x + " for path " + key) } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 5ee7ca76d1..2e4e01992e 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -4,8 +4,11 @@ package akka.routing import akka.actor._ +import akka.dispatch.Future import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.AtomicBoolean import akka.util.{ Duration, Timeout } +import akka.util.duration._ import akka.config.ConfigurationException import scala.collection.JavaConversions.iterableAsScalaIterable @@ -20,11 +23,24 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup _supervisor, _path) { + private val routeeProps = _props.copy(routerConfig = NoRouter) + @volatile - private[akka] var _routees: IndexedSeq[ActorRef] = _ // this MUST be initialized during createRoute + private var _routees: IndexedSeq[ActorRef] = IndexedSeq.empty[ActorRef] // this MUST be initialized during createRoute def routees = _routees - val route = _props.routerConfig.createRoute(_props.copy(routerConfig = NoRouter), actorContext, this) + def addRoutees(newRoutees: IndexedSeq[ActorRef]) { + _routees = _routees ++ newRoutees + // subscribe to Terminated messages for all route destinations, to be handled by Router actor + newRoutees foreach underlying.watch + } + + def removeRoutees(abandonedRoutees: IndexedSeq[ActorRef]) { + _routees = _routees filterNot (x ⇒ abandonedRoutees.contains(x)) + abandonedRoutees foreach underlying.unwatch + } + + val route = _props.routerConfig.createRoute(routeeProps, actorContext, this) def applyRoute(sender: ActorRef, message: Any): Iterable[Destination] = message match { case _: AutoReceivedMessage ⇒ Nil @@ -37,15 +53,16 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup else Nil } + if (_props.routerConfig.pool.isEmpty && _routees.isEmpty) + throw new ActorInitializationException("router " + _props.routerConfig + " did not register routees!") + _routees match { - case null ⇒ throw new ActorInitializationException("router " + _props.routerConfig + " did not register routees!") - case x ⇒ - _routees = x // volatile write to publish the route before sending messages - // subscribe to Terminated messages for all route destinations, to be handled by Router actor - _routees foreach underlying.watch + case x ⇒ _routees = x // volatile write to publish the route before sending messages } override def !(message: Any)(implicit sender: ActorRef = null): Unit = { + _props.routerConfig.resizePool(routeeProps, actorContext, routees) + val s = if (sender eq null) underlying.system.deadLetters else sender val msg = message match { @@ -58,6 +75,11 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup case refs ⇒ refs foreach (p ⇒ p.recipient.!(msg)(p.sender)) } } + + override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { + _props.routerConfig.resizePool(routeeProps, actorContext, routees) + super.?(message)(timeout) + } } /** @@ -94,18 +116,47 @@ trait RouterConfig { protected def toAll(sender: ActorRef, routees: Iterable[ActorRef]): Iterable[Destination] = routees.map(Destination(sender, _)) - protected def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, routees: Iterable[String]): IndexedSeq[ActorRef] = (nrOfInstances, routees) match { + def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, routees: Iterable[String]): IndexedSeq[ActorRef] = (nrOfInstances, routees) match { case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.") case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(props))(scala.collection.breakOut) case (_, xs) ⇒ xs.map(context.actorFor(_))(scala.collection.breakOut) } protected def createAndRegisterRoutees(props: Props, context: ActorContext, nrOfInstances: Int, routees: Iterable[String]): Unit = { - registerRoutees(context, createRoutees(props, context, nrOfInstances, routees)) + pool match { + case None ⇒ registerRoutees(context, createRoutees(props, context, nrOfInstances, routees)) + case Some(p) ⇒ resizePool(props, context, context.self.asInstanceOf[RoutedActorRef].routees) + } } - protected def registerRoutees(context: ActorContext, routees: IndexedSeq[ActorRef]): Unit = { - context.self.asInstanceOf[RoutedActorRef]._routees = routees + /** + * Adds new routees to the router. + */ + def registerRoutees(context: ActorContext, routees: IndexedSeq[ActorRef]): Unit = { + context.self.asInstanceOf[RoutedActorRef].addRoutees(routees) + } + + /** + * Removes routees from the router. This method doesn't stop the routees. + */ + def unregisterRoutees(context: ActorContext, routees: IndexedSeq[ActorRef]): Unit = { + context.self.asInstanceOf[RoutedActorRef].removeRoutees(routees) + } + + def pool: Option[RouterPool] = None + + private val resizePoolInProgress = new AtomicBoolean + + def resizePool(props: Props, context: ActorContext, currentRoutees: IndexedSeq[ActorRef]) { + for (p ← pool) { + if (resizePoolInProgress.compareAndSet(false, true)) { + try { + p.resize(props, context, currentRoutees, this) + } finally { + resizePoolInProgress.set(false) + } + } + } } } @@ -151,7 +202,7 @@ trait Router extends Actor { final def receive = ({ case Terminated(child) ⇒ - ref._routees = ref._routees filterNot (_ == child) + ref.removeRoutees(IndexedSeq(child)) if (ref.routees.isEmpty) context.stop(self) }: Receive) orElse routerReceive @@ -236,7 +287,8 @@ object RoundRobinRouter { * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil) extends RouterConfig with RoundRobinLike { +case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val pool: Option[RouterPool] = None) + extends RouterConfig with RoundRobinLike { /** * Constructor that sets nrOfInstances to be created. @@ -253,6 +305,12 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = def this(t: java.lang.Iterable[String]) = { this(routees = iterableAsScalaIterable(t)) } + + /** + * Constructor that sets the pool to be used. + * Java API + */ + def this(pool: RouterPool) = this(pool = Some(pool)) } trait RoundRobinLike { this: RouterConfig ⇒ @@ -303,7 +361,8 @@ object RandomRouter { * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil) extends RouterConfig with RandomLike { +case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val pool: Option[RouterPool] = None) + extends RouterConfig with RandomLike { /** * Constructor that sets nrOfInstances to be created. @@ -320,6 +379,12 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil) def this(t: java.lang.Iterable[String]) = { this(routees = iterableAsScalaIterable(t)) } + + /** + * Constructor that sets the pool to be used. + * Java API + */ + def this(pool: RouterPool) = this(pool = Some(pool)) } trait RandomLike { this: RouterConfig ⇒ @@ -373,7 +438,8 @@ object BroadcastRouter { * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil) extends RouterConfig with BroadcastLike { +case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val pool: Option[RouterPool] = None) + extends RouterConfig with BroadcastLike { /** * Constructor that sets nrOfInstances to be created. @@ -390,6 +456,13 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N def this(t: java.lang.Iterable[String]) = { this(routees = iterableAsScalaIterable(t)) } + + /** + * Constructor that sets the pool to be used. + * Java API + */ + def this(pool: RouterPool) = this(pool = Some(pool)) + } trait BroadcastLike { this: RouterConfig ⇒ @@ -432,7 +505,8 @@ object ScatterGatherFirstCompletedRouter { * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration) +case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration, + override val pool: Option[RouterPool] = None) extends RouterConfig with ScatterGatherFirstCompletedLike { /** @@ -450,6 +524,12 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It def this(t: java.lang.Iterable[String], w: Duration) = { this(routees = iterableAsScalaIterable(t), within = w) } + + /** + * Constructor that sets the pool to be used. + * Java API + */ + def this(pool: RouterPool, w: Duration) = this(pool = Some(pool), within = w) } trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ @@ -473,3 +553,159 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ } } } + +/** + * Routers with dynamically resizable number of routees is implemented by providing a pool + * implementation in [[akka.routing.RouterConfig]]. When the resize method is invoked you can + * create and register more routees with `routerConfig.registerRoutees(actorContext, newRoutees) + * or remove routees with `routerConfig.unregisterRoutees(actorContext, abandonedRoutees)` and + * sending [[akka.actor.PoisonPill]] to them. + */ +trait RouterPool { + def resize(props: Props, actorContext: ActorContext, currentRoutees: IndexedSeq[ActorRef], routerConfig: RouterConfig) +} + +case class DefaultRouterPool( + /** + * The fewest number of routees the pool should ever have + */ + lowerBound: Int = 1, + /** + * The most number of routees the pool should ever have + */ + upperBound: Int = 10, + /** + * A routee is considered to be busy (under pressure) when + * it has at least this number of messages in its mailbox. + * When pressureThreshold is defined as 0 the routee + * is considered busy when it is currently processing a + * message. + */ + pressureThreshold: Int = 3, + /** + * Percentage to increase capacity whenever all routees are busy. + * For example, 0.2 would increase 20%, etc. + */ + rampupRate: Double = 0.2, + /** + * Fraction of capacity the pool has to fall below before backing off. + * For example, if this is 0.7, then we'll remove some routees when + * less than 70% of routees are busy. + * Use 0.0 to avoid removal of routees. + */ + backoffThreshold: Double = 0.7, + /** + * Fraction of routees to be removed when the pool reaches the + * backoffThreshold. + * Use 0.0 to avoid removal of routees. + */ + backoffRate: Double = 0.1, + /** + * When the pool shrink the abandoned actors are stopped with PoisonPill after this delay + */ + stopDelay: Duration = 1.second) extends RouterPool { + + def resize(props: Props, actorContext: ActorContext, currentRoutees: IndexedSeq[ActorRef], routerConfig: RouterConfig) { + val requestedCapacity = capacity(currentRoutees) + + if (requestedCapacity > 0) { + val newRoutees = routerConfig.createRoutees(props, actorContext, requestedCapacity, Nil) + routerConfig.registerRoutees(actorContext, newRoutees) + } else if (requestedCapacity < 0) { + val (keep, abandon) = currentRoutees.splitAt(currentRoutees.length + requestedCapacity) + routerConfig.unregisterRoutees(actorContext, abandon) + delayedStop(actorContext.system.scheduler, abandon) + } + } + + /** + * Give concurrent messages a chance to be placed in mailbox before + * sending PoisonPill. + */ + protected def delayedStop(scheduler: Scheduler, abandon: IndexedSeq[ActorRef]) { + scheduler.scheduleOnce(stopDelay) { + abandon foreach (_ ! PoisonPill) + } + } + + /** + * Returns the overall desired change in pool capacity. Positive value will + * add routees to the pool. Negative value will remove routees from the + * pool. + * @param routees The current actor in the pool + * @return the number of routees by which the pool should be adjusted (positive, negative or zero) + */ + def capacity(routees: IndexedSeq[ActorRef]): Int = { + val currentSize = routees.size + val delta = filter(pressure(routees), currentSize) + val proposed = currentSize + delta + + if (proposed < lowerBound) delta + (lowerBound - proposed) + else if (proposed > upperBound) delta - (proposed - upperBound) + else delta + } + + /** + * Number of routees considered busy, or above 'pressure level'. + * + * Default implementation: + * When `pressureThreshold` > 0 the number of routees with at least + * the configured `pressureThreshold` messages in their mailbox, + * otherwise number of routees currently processing a + * message. + * + * @param routees the current pool of routees + * @return number of busy routees, between 0 and routees.size + */ + def pressure(routees: Seq[ActorRef]): Int = { + if (pressureThreshold > 0) { + routees count { + case a: LocalActorRef ⇒ a.underlying.mailbox.numberOfMessages >= pressureThreshold + case _ ⇒ false + } + } else { + routees count { + case a: LocalActorRef ⇒ + val cell = a.underlying + cell.mailbox.isScheduled && cell.currentMessage != null + case _ ⇒ false + } + } + } + + /** + * This method can be used to smooth the capacity delta by considering + * the current pressure and current capacity. + * + * @param pressure current number of busy routees + * @param capacity current number of routees + * @return proposed change in the capacity + */ + def filter(pressure: Int, capacity: Int): Int = { + rampup(pressure, capacity) + backoff(pressure, capacity) + } + + /** + * Computes a proposed positive (or zero) capacity delta using + * the configured `rampupRate`. + * @param pressure the current number of busy routees + * @param capacity the current number of total routees + * @return proposed increase in capacity + */ + def rampup(pressure: Int, capacity: Int): Int = + if (pressure < capacity) 0 else math.ceil(rampupRate * capacity) toInt + + /** + * Computes a proposed negative (or zero) capacity delta using + * the configured `backoffThreshold` and `backoffRate` + * @param pressure the current number of busy routees + * @param capacity the current number of total routees + * @return proposed decrease in capacity (as a negative number) + */ + def backoff(pressure: Int, capacity: Int): Int = + if (backoffThreshold > 0.0 && backoffRate > 0.0 && capacity > 0 && pressure.toDouble / capacity < backoffThreshold) + math.ceil(-1.0 * backoffRate * capacity) toInt + else 0 + +} +