Support config of custom router. See #1623
This commit is contained in:
parent
1f3926fa0e
commit
8d10d44929
7 changed files with 69 additions and 13 deletions
|
|
@ -178,7 +178,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
|
||||||
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2)
|
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2)
|
||||||
|
|
||||||
def loop(loops: Int, t: Int, latch: TestLatch, count: AtomicInteger) = {
|
def loop(loops: Int, t: Int, latch: TestLatch, count: AtomicInteger) = {
|
||||||
count.set(0)
|
(10 millis).dilated.sleep
|
||||||
for (m ← 0 until loops) {
|
for (m ← 0 until loops) {
|
||||||
router.!((t, latch, count))
|
router.!((t, latch, count))
|
||||||
(10 millis).dilated.sleep
|
(10 millis).dilated.sleep
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ import akka.util.Duration
|
||||||
import akka.config.ConfigurationException
|
import akka.config.ConfigurationException
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
import com.typesafe.config.Config
|
||||||
|
|
||||||
object RoutingSpec {
|
object RoutingSpec {
|
||||||
|
|
||||||
|
|
@ -22,6 +23,10 @@ object RoutingSpec {
|
||||||
router = round-robin
|
router = round-robin
|
||||||
nr-of-instances = 3
|
nr-of-instances = 3
|
||||||
}
|
}
|
||||||
|
/myrouter {
|
||||||
|
router = "akka.routing.RoutingSpec$MyRouter"
|
||||||
|
foo = bar
|
||||||
|
}
|
||||||
}
|
}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
@ -38,6 +43,18 @@ object RoutingSpec {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class MyRouter(config: Config) extends RouterConfig {
|
||||||
|
val foo = config.getString("foo")
|
||||||
|
def createRoute(routeeProps: Props, actorContext: ActorContext): Route = {
|
||||||
|
val routees = IndexedSeq(actorContext.actorOf(Props[Echo]))
|
||||||
|
registerRoutees(actorContext, routees)
|
||||||
|
|
||||||
|
{
|
||||||
|
case (sender, message) ⇒ Nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
|
|
@ -465,6 +482,10 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
|
||||||
sys.shutdown()
|
sys.shutdown()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
"support custom router" in {
|
||||||
|
val myrouter = system.actorOf(Props().withRouter(FromConfig), "myrouter")
|
||||||
|
myrouter.isTerminated must be(false)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
"custom router" must {
|
"custom router" must {
|
||||||
|
|
|
||||||
|
|
@ -65,7 +65,9 @@ akka {
|
||||||
|
|
||||||
# routing (load-balance) scheme to use
|
# routing (load-balance) scheme to use
|
||||||
# available: "from-code", "round-robin", "random", "smallest-mailbox", "scatter-gather", "broadcast"
|
# available: "from-code", "round-robin", "random", "smallest-mailbox", "scatter-gather", "broadcast"
|
||||||
# or: fully qualified class name of the router class
|
# or: Fully qualified class name of the router class.
|
||||||
|
# The router class must extend akka.routing.CustomRouterConfig and and have constructor
|
||||||
|
# with com.typesafe.config.Config parameter.
|
||||||
# default is "from-code";
|
# default is "from-code";
|
||||||
# Whether or not an actor is transformed to a Router is decided in code only (Props.withRouter).
|
# Whether or not an actor is transformed to a Router is decided in code only (Props.withRouter).
|
||||||
# The type of router can be overridden in the configuration; specifying "from-code" means
|
# The type of router can be overridden in the configuration; specifying "from-code" means
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ import akka.event.EventStream
|
||||||
import com.typesafe.config._
|
import com.typesafe.config._
|
||||||
import akka.routing._
|
import akka.routing._
|
||||||
import java.util.concurrent.{ TimeUnit, ConcurrentHashMap }
|
import java.util.concurrent.{ TimeUnit, ConcurrentHashMap }
|
||||||
|
import akka.util.ReflectiveAccess
|
||||||
|
|
||||||
case class Deploy(path: String, config: Config, recipe: Option[ActorRecipe] = None, routing: RouterConfig = NoRouter, scope: Scope = LocalScope)
|
case class Deploy(path: String, config: Config, recipe: Option[ActorRecipe] = None, routing: RouterConfig = NoRouter, scope: Scope = LocalScope)
|
||||||
|
|
||||||
|
|
@ -56,16 +57,7 @@ class Deployer(val settings: ActorSystem.Settings) {
|
||||||
val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS)
|
val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS)
|
||||||
|
|
||||||
val resizer: Option[Resizer] = if (config.hasPath("resizer")) {
|
val resizer: Option[Resizer] = if (config.hasPath("resizer")) {
|
||||||
val resizerConfig = deployment.getConfig("resizer")
|
Some(DefaultResizer(deployment.getConfig("resizer")))
|
||||||
Some(DefaultResizer(
|
|
||||||
lowerBound = resizerConfig.getInt("lower-bound"),
|
|
||||||
upperBound = resizerConfig.getInt("upper-bound"),
|
|
||||||
pressureThreshold = resizerConfig.getInt("pressure-threshold"),
|
|
||||||
rampupRate = resizerConfig.getDouble("rampup-rate"),
|
|
||||||
backoffThreshold = resizerConfig.getDouble("backoff-threshold"),
|
|
||||||
backoffRate = resizerConfig.getDouble("backoff-rate"),
|
|
||||||
stopDelay = Duration(resizerConfig.getMilliseconds("stop-delay"), TimeUnit.MILLISECONDS),
|
|
||||||
messagesPerResize = resizerConfig.getInt("messages-per-resize")))
|
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
@ -77,7 +69,17 @@ class Deployer(val settings: ActorSystem.Settings) {
|
||||||
case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer)
|
case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer)
|
||||||
case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer)
|
case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer)
|
||||||
case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer)
|
case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer)
|
||||||
case x ⇒ throw new ConfigurationException("unknown router type " + x + " for path " + key)
|
case fqn ⇒
|
||||||
|
val constructorSignature = Array[Class[_]](classOf[Config])
|
||||||
|
ReflectiveAccess.createInstance[RouterConfig](fqn, constructorSignature, Array[AnyRef](deployment)) match {
|
||||||
|
case Right(router) ⇒ router
|
||||||
|
case Left(exception) ⇒
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
("Cannot instantiate router [%s], defined in [%s], " +
|
||||||
|
"make sure it extends [akka.routing.RouterConfig] and has constructor with " +
|
||||||
|
"[com.typesafe.config.Config] parameter")
|
||||||
|
.format(fqn, key), exception)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val recipe: Option[ActorRecipe] =
|
val recipe: Option[ActorRecipe] =
|
||||||
|
|
|
||||||
|
|
@ -7,8 +7,10 @@ import akka.actor._
|
||||||
import akka.dispatch.Future
|
import akka.dispatch.Future
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
import java.util.concurrent.atomic.AtomicBoolean
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
import akka.util.{ Duration, Timeout }
|
import akka.util.{ Duration, Timeout }
|
||||||
import akka.util.duration._
|
import akka.util.duration._
|
||||||
|
import com.typesafe.config.Config
|
||||||
import akka.config.ConfigurationException
|
import akka.config.ConfigurationException
|
||||||
import scala.collection.JavaConversions.iterableAsScalaIterable
|
import scala.collection.JavaConversions.iterableAsScalaIterable
|
||||||
|
|
||||||
|
|
@ -760,6 +762,19 @@ trait Resizer {
|
||||||
def resize(props: Props, actorContext: ActorContext, currentRoutees: IndexedSeq[ActorRef], routerConfig: RouterConfig)
|
def resize(props: Props, actorContext: ActorContext, currentRoutees: IndexedSeq[ActorRef], routerConfig: RouterConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case object DefaultResizer {
|
||||||
|
def apply(resizerConfig: Config): DefaultResizer =
|
||||||
|
DefaultResizer(
|
||||||
|
lowerBound = resizerConfig.getInt("lower-bound"),
|
||||||
|
upperBound = resizerConfig.getInt("upper-bound"),
|
||||||
|
pressureThreshold = resizerConfig.getInt("pressure-threshold"),
|
||||||
|
rampupRate = resizerConfig.getDouble("rampup-rate"),
|
||||||
|
backoffThreshold = resizerConfig.getDouble("backoff-threshold"),
|
||||||
|
backoffRate = resizerConfig.getDouble("backoff-rate"),
|
||||||
|
stopDelay = Duration(resizerConfig.getMilliseconds("stop-delay"), TimeUnit.MILLISECONDS),
|
||||||
|
messagesPerResize = resizerConfig.getInt("messages-per-resize"))
|
||||||
|
}
|
||||||
|
|
||||||
case class DefaultResizer(
|
case class DefaultResizer(
|
||||||
/**
|
/**
|
||||||
* The fewest number of routees the router should ever have.
|
* The fewest number of routees the router should ever have.
|
||||||
|
|
|
||||||
|
|
@ -256,6 +256,14 @@ If you are interested in how to use the VoteCountRouter it looks like this:
|
||||||
|
|
||||||
.. includecode:: code/akka/docs/jrouting/CustomRouterDocTestBase.java#crTest
|
.. includecode:: code/akka/docs/jrouting/CustomRouterDocTestBase.java#crTest
|
||||||
|
|
||||||
|
Configured Custom Router
|
||||||
|
************************
|
||||||
|
|
||||||
|
It is possible to define configuration properties for custom routers. In the ``router`` property of the deployment
|
||||||
|
configuration you define the fully qualified class name of the router class. The router class must extend
|
||||||
|
``akka.routing.CustomRouterConfig`` and and have constructor with ``com.typesafe.config.Config`` parameter.
|
||||||
|
The deployment section of the configuration is passed to the constructor.
|
||||||
|
|
||||||
Custom Resizer
|
Custom Resizer
|
||||||
**************
|
**************
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -255,6 +255,14 @@ All in all the custom router looks like this:
|
||||||
If you are interested in how to use the VoteCountRouter you can have a look at the test class
|
If you are interested in how to use the VoteCountRouter you can have a look at the test class
|
||||||
`RoutingSpec <https://github.com/jboner/akka/blob/master/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala>`_
|
`RoutingSpec <https://github.com/jboner/akka/blob/master/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala>`_
|
||||||
|
|
||||||
|
Configured Custom Router
|
||||||
|
************************
|
||||||
|
|
||||||
|
It is possible to define configuration properties for custom routers. In the ``router`` property of the deployment
|
||||||
|
configuration you define the fully qualified class name of the router class. The router class must extend
|
||||||
|
``akka.routing.RouterConfig`` and and have constructor with ``com.typesafe.config.Config`` parameter.
|
||||||
|
The deployment section of the configuration is passed to the constructor.
|
||||||
|
|
||||||
Custom Resizer
|
Custom Resizer
|
||||||
**************
|
**************
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue