Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Viktor Klang 2011-12-20 12:33:17 +01:00
commit f3406acae3
2 changed files with 54 additions and 5 deletions

View file

@ -10,6 +10,7 @@ import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.testkit._ import akka.testkit._
import akka.util.duration._ import akka.util.duration._
import akka.dispatch.Await import akka.dispatch.Await
import com.typesafe.config.ConfigFactory
object RoutingSpec { object RoutingSpec {
@ -29,7 +30,18 @@ object RoutingSpec {
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
akka {
actor {
deployment {
/a1 {
router = round-robin
nr-of-instances = 3
}
}
}
}
""")) with DefaultTimeout with ImplicitSender {
val impl = system.asInstanceOf[ActorSystemImpl] val impl = system.asInstanceOf[ActorSystemImpl]
@ -59,6 +71,31 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
expectMsg(Terminated(router)) expectMsg(Terminated(router))
} }
"be able to send their routees" in {
val doneLatch = new CountDownLatch(1)
class TheActor extends Actor {
val routee1 = context.actorOf(Props[TestActor], "routee1")
val routee2 = context.actorOf(Props[TestActor], "routee2")
val routee3 = context.actorOf(Props[TestActor], "routee3")
val router = context.actorOf(Props[TestActor].withRouter(ScatterGatherFirstCompletedRouter(routees = List(routee1, routee2, routee3))))
def receive = {
case RouterRoutees(iterable)
iterable.exists(_.path.name == "routee1") must be(true)
iterable.exists(_.path.name == "routee2") must be(true)
iterable.exists(_.path.name == "routee3") must be(true)
doneLatch.countDown()
case "doIt"
router ! CurrentRoutees
}
}
val theActor = system.actorOf(Props(new TheActor), "theActor")
theActor ! "doIt"
doneLatch.await(1, TimeUnit.SECONDS) must be(true)
}
} }
"no router" must { "no router" must {

View file

@ -4,11 +4,8 @@
package akka.routing package akka.routing
import akka.actor._ import akka.actor._
import akka.japi.Creator
import akka.config.ConfigurationException
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import akka.util.{ ReflectiveAccess, Timeout } import akka.util.Timeout
import akka.AkkaException
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@ -32,6 +29,9 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
def applyRoute(sender: ActorRef, message: Any): Iterable[Destination] = message match { def applyRoute(sender: ActorRef, message: Any): Iterable[Destination] = message match {
case _: AutoReceivedMessage Nil case _: AutoReceivedMessage Nil
case Terminated(_) Nil case Terminated(_) Nil
case CurrentRoutees
sender ! RouterRoutees(_routees)
Nil
case _ case _
if (route.isDefinedAt(sender, message)) route(sender, message) if (route.isDefinedAt(sender, message)) route(sender, message)
else Nil else Nil
@ -147,6 +147,18 @@ trait Router extends Actor {
*/ */
case class Broadcast(message: Any) case class Broadcast(message: Any)
/**
* Sending this message to a router will make it send back its currently used routees.
* A RouterRoutees message is sent asynchronously to the "requester" containing information
* about what routees the router is routing over.
*/
case object CurrentRoutees
/**
* Message used to carry information about what routees the router is currently using.
*/
case class RouterRoutees(routees: Iterable[ActorRef])
/** /**
* For every message sent to a router, its route determines a set of destinations, * For every message sent to a router, its route determines a set of destinations,
* where for each recipient a different sender may be specified; typically the * where for each recipient a different sender may be specified; typically the