diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index b1d84d8d21..617bfa5b5f 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -10,6 +10,7 @@ import java.util.concurrent.{ CountDownLatch, TimeUnit } import akka.testkit._ import akka.util.duration._ import akka.dispatch.Await +import com.typesafe.config.ConfigFactory object RoutingSpec { @@ -29,7 +30,18 @@ object RoutingSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { +class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" + akka { + actor { + deployment { + /a1 { + router = round-robin + nr-of-instances = 3 + } + } + } + } + """)) with DefaultTimeout with ImplicitSender { val impl = system.asInstanceOf[ActorSystemImpl] @@ -59,6 +71,31 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { expectMsg(Terminated(router)) } + "be able to send their routees" in { + val doneLatch = new CountDownLatch(1) + + class TheActor extends Actor { + val routee1 = context.actorOf(Props[TestActor], "routee1") + val routee2 = context.actorOf(Props[TestActor], "routee2") + val routee3 = context.actorOf(Props[TestActor], "routee3") + val router = context.actorOf(Props[TestActor].withRouter(ScatterGatherFirstCompletedRouter(routees = List(routee1, routee2, routee3)))) + + def receive = { + case RouterRoutees(iterable) ⇒ + iterable.exists(_.path.name == "routee1") must be(true) + iterable.exists(_.path.name == "routee2") must be(true) + iterable.exists(_.path.name == "routee3") must be(true) + doneLatch.countDown() + case "doIt" ⇒ + router ! CurrentRoutees + } + } + + val theActor = system.actorOf(Props(new TheActor), "theActor") + theActor ! "doIt" + doneLatch.await(1, TimeUnit.SECONDS) must be(true) + } + } "no router" must { diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 0ce9e9adfb..7f9394fdc5 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -4,11 +4,8 @@ package akka.routing import akka.actor._ -import akka.japi.Creator -import akka.config.ConfigurationException import java.util.concurrent.atomic.AtomicInteger -import akka.util.{ ReflectiveAccess, Timeout } -import akka.AkkaException +import akka.util.Timeout import scala.collection.JavaConversions._ import java.util.concurrent.TimeUnit @@ -32,6 +29,9 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup def applyRoute(sender: ActorRef, message: Any): Iterable[Destination] = message match { case _: AutoReceivedMessage ⇒ Nil case Terminated(_) ⇒ Nil + case CurrentRoutees ⇒ + sender ! RouterRoutees(_routees) + Nil case _ ⇒ if (route.isDefinedAt(sender, message)) route(sender, message) else Nil @@ -147,6 +147,18 @@ trait Router extends Actor { */ case class Broadcast(message: Any) +/** + * Sending this message to a router will make it send back its currently used routees. + * A RouterRoutees message is sent asynchronously to the "requester" containing information + * about what routees the router is routing over. + */ +case object CurrentRoutees + +/** + * Message used to carry information about what routees the router is currently using. + */ +case class RouterRoutees(routees: Iterable[ActorRef]) + /** * For every message sent to a router, its route determines a set of destinations, * where for each recipient a different sender may be specified; typically the