make routers monitor their children

This commit is contained in:
Roland 2011-12-13 16:05:56 +01:00
parent 040f3076bc
commit 134fac4bfe
2 changed files with 96 additions and 28 deletions

View file

@ -8,25 +8,58 @@ import akka.actor._
import collection.mutable.LinkedList
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.testkit._
import akka.util.duration._
object RoutingSpec {
class TestActor extends Actor with Serializable {
class TestActor extends Actor {
def receive = {
case _
println("Hello")
}
}
class Echo extends Actor {
def receive = {
case _ sender ! self
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RoutingSpec extends AkkaSpec with DefaultTimeout {
class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
val impl = system.asInstanceOf[ActorSystemImpl]
import akka.routing.RoutingSpec._
"routers in general" must {
"evict terminated routees" in {
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2)))
router ! ""
router ! ""
val c1, c2 = expectMsgType[ActorRef]
watch(router)
watch(c2)
c2.stop()
expectMsg(Terminated(c2))
// it might take a while until the Router has actually processed the Terminated message
awaitCond {
router ! ""
router ! ""
val res = receiveWhile(100 millis, messages = 2) {
case x: ActorRef x
}
res == Seq(c1, c1)
}
c1.stop()
expectMsg(Terminated(router))
}
}
"no router" must {
"be started when constructed" in {
val routedActor = system.actorOf(Props(new TestActor).withRouter(NoRouter))

View file

@ -25,10 +25,26 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
_supervisor,
_path) {
val route: Route = ({
case (_, _: AutoReceivedMessage) Nil
}: Route) orElse _props.routerConfig.createRoute(_props.creator, actorContext) orElse {
case _ Nil
@volatile
private[akka] var _routees: Vector[ActorRef] = _ // this MUST be initialized during createRoute
def routees = _routees
val route = _props.routerConfig.createRoute(_props.copy(routerConfig = NoRouter), actorContext, this)
def applyRoute(sender: ActorRef, message: Any): Iterable[Destination] = message match {
case _: AutoReceivedMessage Nil
case Terminated(_) Nil
case _
if (route.isDefinedAt(sender, message)) route(sender, message)
else Nil
}
_routees match {
case null throw new ActorInitializationException("router " + _props.routerConfig + " did not register routees!")
case x
_routees = x // volatile write to publish the route before sending messages
// subscribe to Terminated messages for all route destinations, to be handled by Router actor
_routees foreach underlying.watch
}
override def !(message: Any)(implicit sender: ActorRef = null): Unit = {
@ -39,7 +55,7 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
case m m
}
route(s, message) match {
applyRoute(s, message) match {
case Nil super.!(message)(s)
case refs refs foreach (p p.recipient.!(msg)(p.sender))
}
@ -70,7 +86,7 @@ trait RouterConfig {
def targets: Iterable[String]
def createRoute(creator: () Actor, actorContext: ActorContext): Route
def createRoute(props: Props, actorContext: ActorContext, ref: RoutedActorRef): Route
def createActor(): Router = new Router {}
@ -89,6 +105,15 @@ trait RouterConfig {
case (x, Nil) (1 to x).map(_ context.actorOf(props))(scala.collection.breakOut)
case (_, xs) xs.map(context.actorFor(_))(scala.collection.breakOut)
}
protected def createAndRegisterRoutees(props: Props, context: ActorContext, nrOfInstances: Int, targets: Iterable[String]): Unit = {
val routees = createRoutees(props, context, nrOfInstances, targets)
registerRoutees(context, routees)
}
protected def registerRoutees(context: ActorContext, routees: Vector[ActorRef]): Unit = {
context.self.asInstanceOf[RoutedActorRef]._routees = routees
}
}
/**
@ -97,8 +122,22 @@ trait RouterConfig {
* through by returning an empty route.
*/
trait Router extends Actor {
final def receive = {
val ref = self match {
case x: RoutedActorRef x
case _ throw new ActorInitializationException("Router actor can only be used in RoutedActorRef")
}
final def receive = ({
case Terminated(child)
ref._routees = ref._routees filterNot (_ == child)
if (ref.routees.isEmpty) self.stop()
}: Receive) orElse routerReceive
def routerReceive: Receive = {
case _
}
}
@ -118,7 +157,7 @@ case class Broadcast(message: Any)
case object NoRouter extends RouterConfig {
def nrOfInstances: Int = 0
def targets: Iterable[String] = Nil
def createRoute(creator: () Actor, actorContext: ActorContext): Route = null
def createRoute(props: Props, actorContext: ActorContext, ref: RoutedActorRef): Route = null
}
object RoundRobinRouter {
@ -155,20 +194,19 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[String] =
}
trait RoundRobinLike { this: RouterConfig
def createRoute(creator: () Actor, context: ActorContext): Route = {
val routees: Vector[ActorRef] =
createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets)
def createRoute(props: Props, context: ActorContext, ref: RoutedActorRef): Route = {
createAndRegisterRoutees(props, context, nrOfInstances, targets)
val next = new AtomicInteger(0)
def getNext(): ActorRef = {
routees(next.getAndIncrement % routees.size)
ref.routees(next.getAndIncrement % ref.routees.size)
}
{
case (sender, message)
message match {
case Broadcast(msg) toAll(sender, routees)
case Broadcast(msg) toAll(sender, ref.routees)
case msg List(Destination(sender, getNext()))
}
}
@ -216,18 +254,17 @@ trait RandomLike { this: RouterConfig ⇒
override def initialValue = SecureRandom.getInstance("SHA1PRNG")
}
def createRoute(creator: () Actor, context: ActorContext): Route = {
val routees: Vector[ActorRef] =
createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets)
def createRoute(props: Props, context: ActorContext, ref: RoutedActorRef): Route = {
createAndRegisterRoutees(props, context, nrOfInstances, targets)
def getNext(): ActorRef = {
routees(random.get.nextInt(routees.size))
ref.routees(random.get.nextInt(ref.routees.size))
}
{
case (sender, message)
message match {
case Broadcast(msg) toAll(sender, routees)
case Broadcast(msg) toAll(sender, ref.routees)
case msg List(Destination(sender, getNext()))
}
}
@ -268,14 +305,13 @@ case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[String] = N
}
trait BroadcastLike { this: RouterConfig
def createRoute(creator: () Actor, context: ActorContext): Route = {
val routees: Vector[ActorRef] =
createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets)
def createRoute(props: Props, context: ActorContext, ref: RoutedActorRef): Route = {
createAndRegisterRoutees(props, context, nrOfInstances, targets)
{
case (sender, message)
message match {
case _ toAll(sender, routees)
case _ toAll(sender, ref.routees)
}
}
}
@ -316,16 +352,15 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, targets: It
}
trait ScatterGatherFirstCompletedLike { this: RouterConfig
def createRoute(creator: () Actor, context: ActorContext): Route = {
val routees: Vector[ActorRef] =
createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets)
def createRoute(props: Props, context: ActorContext, ref: RoutedActorRef): Route = {
createAndRegisterRoutees(props, context, nrOfInstances, targets)
{
case (sender, message)
val asker = context.asInstanceOf[ActorCell].systemImpl.provider.ask(Timeout(5, TimeUnit.SECONDS)).get // FIXME, NO REALLY FIXME!
asker.result.pipeTo(sender)
message match {
case _ toAll(asker, routees)
case _ toAll(asker, ref.routees)
}
}
}