diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index 6ccad2a95f..35cc429fa6 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -178,7 +178,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2) def loop(loops: Int, t: Int, latch: TestLatch, count: AtomicInteger) = { - count.set(0) + (10 millis).dilated.sleep for (m ← 0 until loops) { router.!((t, latch, count)) (10 millis).dilated.sleep diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 077e69e5d9..a9ec39ff6e 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -13,6 +13,7 @@ import akka.util.Duration import akka.config.ConfigurationException import com.typesafe.config.ConfigFactory import java.util.concurrent.ConcurrentHashMap +import com.typesafe.config.Config object RoutingSpec { @@ -22,6 +23,10 @@ object RoutingSpec { router = round-robin nr-of-instances = 3 } + /myrouter { + router = "akka.routing.RoutingSpec$MyRouter" + foo = bar + } } """ @@ -38,6 +43,18 @@ object RoutingSpec { } } + class MyRouter(config: Config) extends RouterConfig { + val foo = config.getString("foo") + def createRoute(routeeProps: Props, actorContext: ActorContext): Route = { + val routees = IndexedSeq(actorContext.actorOf(Props[Echo])) + registerRoutees(actorContext, routees) + + { + case (sender, message) ⇒ Nil + } + } + } + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -465,6 +482,10 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with sys.shutdown() } } + "support custom router" in { + val myrouter = system.actorOf(Props().withRouter(FromConfig), "myrouter") + myrouter.isTerminated must be(false) + } } "custom router" must { diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 07e363fca9..02d1a49035 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -65,7 +65,9 @@ akka { # routing (load-balance) scheme to use # available: "from-code", "round-robin", "random", "smallest-mailbox", "scatter-gather", "broadcast" - # or: fully qualified class name of the router class + # or: Fully qualified class name of the router class. + # The router class must extend akka.routing.CustomRouterConfig and and have constructor + # with com.typesafe.config.Config parameter. # default is "from-code"; # Whether or not an actor is transformed to a Router is decided in code only (Props.withRouter). # The type of router can be overridden in the configuration; specifying "from-code" means diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 23c6da6661..5ac4c13391 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -13,6 +13,7 @@ import akka.event.EventStream import com.typesafe.config._ import akka.routing._ import java.util.concurrent.{ TimeUnit, ConcurrentHashMap } +import akka.util.ReflectiveAccess case class Deploy(path: String, config: Config, recipe: Option[ActorRecipe] = None, routing: RouterConfig = NoRouter, scope: Scope = LocalScope) @@ -56,16 +57,7 @@ class Deployer(val settings: ActorSystem.Settings) { val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS) val resizer: Option[Resizer] = if (config.hasPath("resizer")) { - val resizerConfig = deployment.getConfig("resizer") - Some(DefaultResizer( - lowerBound = resizerConfig.getInt("lower-bound"), - upperBound = resizerConfig.getInt("upper-bound"), - pressureThreshold = resizerConfig.getInt("pressure-threshold"), - rampupRate = resizerConfig.getDouble("rampup-rate"), - backoffThreshold = resizerConfig.getDouble("backoff-threshold"), - backoffRate = resizerConfig.getDouble("backoff-rate"), - stopDelay = Duration(resizerConfig.getMilliseconds("stop-delay"), TimeUnit.MILLISECONDS), - messagesPerResize = resizerConfig.getInt("messages-per-resize"))) + Some(DefaultResizer(deployment.getConfig("resizer"))) } else { None } @@ -77,7 +69,17 @@ class Deployer(val settings: ActorSystem.Settings) { case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer) case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) - case x ⇒ throw new ConfigurationException("unknown router type " + x + " for path " + key) + case fqn ⇒ + val constructorSignature = Array[Class[_]](classOf[Config]) + ReflectiveAccess.createInstance[RouterConfig](fqn, constructorSignature, Array[AnyRef](deployment)) match { + case Right(router) ⇒ router + case Left(exception) ⇒ + throw new IllegalArgumentException( + ("Cannot instantiate router [%s], defined in [%s], " + + "make sure it extends [akka.routing.RouterConfig] and has constructor with " + + "[com.typesafe.config.Config] parameter") + .format(fqn, key), exception) + } } val recipe: Option[ActorRecipe] = diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index f3065788ec..0473f99fd6 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -7,8 +7,10 @@ import akka.actor._ import akka.dispatch.Future import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.TimeUnit import akka.util.{ Duration, Timeout } import akka.util.duration._ +import com.typesafe.config.Config import akka.config.ConfigurationException import scala.collection.JavaConversions.iterableAsScalaIterable @@ -760,6 +762,19 @@ trait Resizer { def resize(props: Props, actorContext: ActorContext, currentRoutees: IndexedSeq[ActorRef], routerConfig: RouterConfig) } +case object DefaultResizer { + def apply(resizerConfig: Config): DefaultResizer = + DefaultResizer( + lowerBound = resizerConfig.getInt("lower-bound"), + upperBound = resizerConfig.getInt("upper-bound"), + pressureThreshold = resizerConfig.getInt("pressure-threshold"), + rampupRate = resizerConfig.getDouble("rampup-rate"), + backoffThreshold = resizerConfig.getDouble("backoff-threshold"), + backoffRate = resizerConfig.getDouble("backoff-rate"), + stopDelay = Duration(resizerConfig.getMilliseconds("stop-delay"), TimeUnit.MILLISECONDS), + messagesPerResize = resizerConfig.getInt("messages-per-resize")) +} + case class DefaultResizer( /** * The fewest number of routees the router should ever have. diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index cdcc869b2a..42ad1108ea 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -256,6 +256,14 @@ If you are interested in how to use the VoteCountRouter it looks like this: .. includecode:: code/akka/docs/jrouting/CustomRouterDocTestBase.java#crTest +Configured Custom Router +************************ + +It is possible to define configuration properties for custom routers. In the ``router`` property of the deployment +configuration you define the fully qualified class name of the router class. The router class must extend +``akka.routing.CustomRouterConfig`` and and have constructor with ``com.typesafe.config.Config`` parameter. +The deployment section of the configuration is passed to the constructor. + Custom Resizer ************** diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index 4e75be8798..5b2ed24d28 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -255,6 +255,14 @@ All in all the custom router looks like this: If you are interested in how to use the VoteCountRouter you can have a look at the test class `RoutingSpec `_ +Configured Custom Router +************************ + +It is possible to define configuration properties for custom routers. In the ``router`` property of the deployment +configuration you define the fully qualified class name of the router class. The router class must extend +``akka.routing.RouterConfig`` and and have constructor with ``com.typesafe.config.Config`` parameter. +The deployment section of the configuration is passed to the constructor. + Custom Resizer **************