diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala
index d87d688231..6ccad2a95f 100644
--- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala
@@ -135,15 +135,15 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
val router = system.actorOf(Props[BusyActor].withRouter(RoundRobinRouter(resizer = Some(resizer))))
val latch1 = new TestLatch(1)
- router.!((latch1, busy))
+ router ! (latch1, busy)
Await.ready(latch1, 2 seconds)
val latch2 = new TestLatch(1)
- router.!((latch2, busy))
+ router ! (latch2, busy)
Await.ready(latch2, 2 seconds)
val latch3 = new TestLatch(1)
- router.!((latch3, busy))
+ router ! (latch3, busy)
Await.ready(latch3, 2 seconds)
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3)
diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
index 9811313688..077e69e5d9 100644
--- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
@@ -267,28 +267,33 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
val usedActors = new ConcurrentHashMap[Int, String]()
val router = system.actorOf(Props(new Actor {
def receive = {
- case busy: TestLatch ⇒
+ case (busy: TestLatch, receivedLatch: TestLatch) ⇒
usedActors.put(0, self.path.toString)
+ self ! "another in busy mailbox"
+ receivedLatch.countDown()
Await.ready(busy, TestLatch.DefaultTimeout)
case (msg: Int, receivedLatch: TestLatch) ⇒
usedActors.put(msg, self.path.toString)
receivedLatch.countDown()
+ case s: String ⇒
}
}).withRouter(SmallestMailboxRouter(3)))
val busy = TestLatch(1)
- router ! busy
+ val received0 = TestLatch(1)
+ router ! (busy, received0)
+ Await.ready(received0, TestLatch.DefaultTimeout)
val received1 = TestLatch(1)
- router.!((1, received1))
+ router ! (1, received1)
Await.ready(received1, TestLatch.DefaultTimeout)
val received2 = TestLatch(1)
- router.!((2, received2))
+ router ! (2, received2)
Await.ready(received2, TestLatch.DefaultTimeout)
val received3 = TestLatch(1)
- router.!((3, received3))
+ router ! (3, received3)
Await.ready(received3, TestLatch.DefaultTimeout)
busy.countDown()
diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala
index f021eb867a..f3065788ec 100644
--- a/akka-actor/src/main/scala/akka/routing/Routing.scala
+++ b/akka-actor/src/main/scala/akka/routing/Routing.scala
@@ -290,8 +290,11 @@ object RoundRobinRouter {
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
*
* The configuration parameter trumps the constructor arguments. This means that
- * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
- * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
+ * if you provide either 'nrOfInstances' or 'routees' during instantiation they will
+ * be ignored if the router is defined in the configuration file for the actor being used.
+ *
+ * @param routees string representation of the actor paths of the routees that will be looked up
+ * using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
extends RouterConfig with RoundRobinLike {
@@ -307,9 +310,11 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] =
/**
* Constructor that sets the routees to be used.
* Java API
+ * @param routeePaths string representation of the actor paths of the routees that will be looked up
+ * using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
- def this(t: java.lang.Iterable[String]) = {
- this(routees = iterableAsScalaIterable(t))
+ def this(routeePaths: java.lang.Iterable[String]) = {
+ this(routees = iterableAsScalaIterable(routeePaths))
}
/**
@@ -365,8 +370,11 @@ object RandomRouter {
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
*
* The configuration parameter trumps the constructor arguments. This means that
- * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
- * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
+ * if you provide either 'nrOfInstances' or 'routees' during instantiation they will
+ * be ignored if the router is defined in the configuration file for the actor being used.
+ *
+ * @param routees string representation of the actor paths of the routees that will be looked up
+ * using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
extends RouterConfig with RandomLike {
@@ -382,9 +390,11 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
/**
* Constructor that sets the routees to be used.
* Java API
+ * @param routeePaths string representation of the actor paths of the routees that will be looked up
+ * using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
- def this(t: java.lang.Iterable[String]) = {
- this(routees = iterableAsScalaIterable(t))
+ def this(routeePaths: java.lang.Iterable[String]) = {
+ this(routees = iterableAsScalaIterable(routeePaths))
}
/**
@@ -436,7 +446,7 @@ object SmallestMailboxRouter {
}
}
/**
- * A Router that tries to send to the routee with fewest messages in mailbox.
+ * A Router that tries to send to the non-suspended routee with fewest messages in mailbox.
* The selection is done in this order:
*
* - pick any idle routee (not processing message) with empty mailbox
@@ -452,8 +462,11 @@ object SmallestMailboxRouter {
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
*
* The configuration parameter trumps the constructor arguments. This means that
- * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
- * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
+ * if you provide either 'nrOfInstances' or 'routees' during instantiation they will
+ * be ignored if the router is defined in the configuration file for the actor being used.
+ *
+ * @param routees string representation of the actor paths of the routees that will be looked up
+ * using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
extends RouterConfig with SmallestMailboxLike {
@@ -469,9 +482,11 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin
/**
* Constructor that sets the routees to be used.
* Java API
+ * @param routeePaths string representation of the actor paths of the routees that will be looked up
+ * using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
- def this(t: java.lang.Iterable[String]) = {
- this(routees = iterableAsScalaIterable(t))
+ def this(routeePaths: java.lang.Iterable[String]) = {
+ this(routees = iterableAsScalaIterable(routeePaths))
}
/**
@@ -518,6 +533,19 @@ trait SmallestMailboxLike { this: RouterConfig ⇒
case _ ⇒ false
}
+ /**
+ * Returns true if the actor is currently suspended.
+ * It will always return false for remote actors.
+ * Method is exposed to subclasses to be able to implement custom
+ * routers based on mailbox and actor internal state.
+ */
+ protected def isSuspended(a: ActorRef): Boolean = a match {
+ case x: LocalActorRef ⇒
+ val cell = x.underlying
+ cell.mailbox.isSuspended
+ case _ ⇒ false
+ }
+
/**
* Returns the number of pending messages in the mailbox of the actor.
* It will always return 0 for remote actors.
@@ -535,16 +563,14 @@ trait SmallestMailboxLike { this: RouterConfig ⇒
def getNext(): ActorRef = {
// non-local actors mailbox size is unknown, so consider them lowest priority
- val local: IndexedSeq[LocalActorRef] = for (a ← ref.routees if a.isInstanceOf[LocalActorRef]) yield a.asInstanceOf[LocalActorRef]
- // anyone not processing message and with empty mailbox
- val idle = local.find(a ⇒ !isProcessingMessage(a) && !hasMessages(a))
- idle getOrElse {
- // anyone with empty mailbox
- val emptyMailbox = local.find(a ⇒ !hasMessages(a))
- emptyMailbox getOrElse {
- // sort on mailbox size
- local.sortBy(a ⇒ numberOfMessages(a)).headOption getOrElse {
- // no locals, just pick one, random
+ val activeLocal = ref.routees collect { case l: LocalActorRef if !isSuspended(l) ⇒ l }
+ // 1. anyone not processing message and with empty mailbox
+ activeLocal.find(a ⇒ !isProcessingMessage(a) && !hasMessages(a)) getOrElse {
+ // 2. anyone with empty mailbox
+ activeLocal.find(a ⇒ !hasMessages(a)) getOrElse {
+ // 3. sort on mailbox size
+ activeLocal.sortBy(a ⇒ numberOfMessages(a)).headOption getOrElse {
+ // 4. no locals, just pick one, random
ref.routees(random.get.nextInt(ref.routees.size))
}
}
@@ -580,8 +606,11 @@ object BroadcastRouter {
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
*
* The configuration parameter trumps the constructor arguments. This means that
- * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
- * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
+ * if you provide either 'nrOfInstances' or 'routees' during instantiation they will
+ * be ignored if the router is defined in the configuration file for the actor being used.
+ *
+ * @param routees string representation of the actor paths of the routees that will be looked up
+ * using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
extends RouterConfig with BroadcastLike {
@@ -597,9 +626,11 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N
/**
* Constructor that sets the routees to be used.
* Java API
+ * @param routeePaths string representation of the actor paths of the routees that will be looked up
+ * using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
- def this(t: java.lang.Iterable[String]) = {
- this(routees = iterableAsScalaIterable(t))
+ def this(routeePaths: java.lang.Iterable[String]) = {
+ this(routees = iterableAsScalaIterable(routeePaths))
}
/**
@@ -648,8 +679,11 @@ object ScatterGatherFirstCompletedRouter {
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
*
* The configuration parameter trumps the constructor arguments. This means that
- * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
- * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
+ * if you provide either 'nrOfInstances' or 'routees' during instantiation they will
+ * be ignored if the router is defined in the configuration file for the actor being used.
+ *
+ * @param routees string representation of the actor paths of the routees that will be looked up
+ * using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration,
override val resizer: Option[Resizer] = None)
@@ -666,9 +700,11 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It
/**
* Constructor that sets the routees to be used.
* Java API
+ * @param routeePaths string representation of the actor paths of the routees that will be looked up
+ * using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
- def this(t: java.lang.Iterable[String], w: Duration) = {
- this(routees = iterableAsScalaIterable(t), within = w)
+ def this(routeePaths: java.lang.Iterable[String], w: Duration) = {
+ this(routees = iterableAsScalaIterable(routeePaths), within = w)
}
/**
diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst
index 8cc5b94260..cdcc869b2a 100644
--- a/akka-docs/java/routing.rst
+++ b/akka-docs/java/routing.rst
@@ -45,9 +45,8 @@ You can also give the router already created routees as in:
When you create a router programatically you define the number of routees *or* you pass already created routees to it.
If you send both parameters to the router *only* the latter will be used, i.e. ``nrOfInstances`` is disregarded.
-*It is also worth pointing out that if you define the number of routees (``nr-of-instances`` or ``routees``) in
-the configuration file then this value will be used instead of any programmatically sent parameters, but you must
-also define the ``router`` property in the configuration.*
+*It is also worth pointing out that if you define the ``router`` in the configuration file then this value will be used
+instead of any programmatically sent parameters.*
Once you have the router actor it is just to send messages to it as you would to any actor:
@@ -125,7 +124,7 @@ Try to run it a couple of times to verify its behavior if you don't trust us.
SmallestMailboxRouter
*********************
-A Router that tries to send to the routee with fewest messages in mailbox.
+A Router that tries to send to the non-suspended routee with fewest messages in mailbox.
The selection is done in this order:
* pick any idle routee (not processing message) with empty mailbox
diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst
index 8c0e0f7366..4e75be8798 100644
--- a/akka-docs/scala/routing.rst
+++ b/akka-docs/scala/routing.rst
@@ -45,9 +45,8 @@ You can also give the router already created routees as in:
When you create a router programatically you define the number of routees *or* you pass already created routees to it.
If you send both parameters to the router *only* the latter will be used, i.e. ``nrOfInstances`` is disregarded.
-*It is also worth pointing out that if you define the number of routees (``nr-of-instances`` or ``routees``) in
-the configuration file then this value will be used instead of any programmatically sent parameters, but you must
-also define the ``router`` property in the configuration.*
+*It is also worth pointing out that if you define the ``router`` in the configuration file then this value will be used
+instead of any programmatically sent parameters.*
Once you have the router actor it is just to send messages to it as you would to any actor:
@@ -126,7 +125,7 @@ Try to run it a couple of times to verify its behavior if you don't trust us.
SmallestMailboxRouter
*********************
-A Router that tries to send to the routee with fewest messages in mailbox.
+A Router that tries to send to the non-suspended routee with fewest messages in mailbox.
The selection is done in this order:
* pick any idle routee (not processing message) with empty mailbox