From 134fac4bfe8d6f8b4e9b96dfbfdf532d48ae3c86 Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 13 Dec 2011 16:05:56 +0100 Subject: [PATCH] make routers monitor their children --- .../test/scala/akka/routing/RoutingSpec.scala | 37 +++++++- .../src/main/scala/akka/routing/Routing.scala | 87 +++++++++++++------ 2 files changed, 96 insertions(+), 28 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 135890cb16..fec56431e5 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -8,25 +8,58 @@ import akka.actor._ import collection.mutable.LinkedList import java.util.concurrent.{ CountDownLatch, TimeUnit } import akka.testkit._ +import akka.util.duration._ object RoutingSpec { - class TestActor extends Actor with Serializable { + class TestActor extends Actor { def receive = { case _ ⇒ println("Hello") } } + class Echo extends Actor { + def receive = { + case _ ⇒ sender ! self + } + } + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class RoutingSpec extends AkkaSpec with DefaultTimeout { +class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { val impl = system.asInstanceOf[ActorSystemImpl] import akka.routing.RoutingSpec._ + "routers in general" must { + + "evict terminated routees" in { + val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))) + router ! "" + router ! "" + val c1, c2 = expectMsgType[ActorRef] + watch(router) + watch(c2) + c2.stop() + expectMsg(Terminated(c2)) + // it might take a while until the Router has actually processed the Terminated message + awaitCond { + router ! "" + router ! "" + val res = receiveWhile(100 millis, messages = 2) { + case x: ActorRef ⇒ x + } + res == Seq(c1, c1) + } + c1.stop() + expectMsg(Terminated(router)) + } + + } + "no router" must { "be started when constructed" in { val routedActor = system.actorOf(Props(new TestActor).withRouter(NoRouter)) diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index d4ac57b998..179a3bf23f 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -25,10 +25,26 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup _supervisor, _path) { - val route: Route = ({ - case (_, _: AutoReceivedMessage) ⇒ Nil - }: Route) orElse _props.routerConfig.createRoute(_props.creator, actorContext) orElse { - case _ ⇒ Nil + @volatile + private[akka] var _routees: Vector[ActorRef] = _ // this MUST be initialized during createRoute + def routees = _routees + + val route = _props.routerConfig.createRoute(_props.copy(routerConfig = NoRouter), actorContext, this) + + def applyRoute(sender: ActorRef, message: Any): Iterable[Destination] = message match { + case _: AutoReceivedMessage ⇒ Nil + case Terminated(_) ⇒ Nil + case _ ⇒ + if (route.isDefinedAt(sender, message)) route(sender, message) + else Nil + } + + _routees match { + case null ⇒ throw new ActorInitializationException("router " + _props.routerConfig + " did not register routees!") + case x ⇒ + _routees = x // volatile write to publish the route before sending messages + // subscribe to Terminated messages for all route destinations, to be handled by Router actor + _routees foreach underlying.watch } override def !(message: Any)(implicit sender: ActorRef = null): Unit = { @@ -39,7 +55,7 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup case m ⇒ m } - route(s, message) match { + applyRoute(s, message) match { case Nil ⇒ super.!(message)(s) case refs ⇒ refs foreach (p ⇒ p.recipient.!(msg)(p.sender)) } @@ -70,7 +86,7 @@ trait RouterConfig { def targets: Iterable[String] - def createRoute(creator: () ⇒ Actor, actorContext: ActorContext): Route + def createRoute(props: Props, actorContext: ActorContext, ref: RoutedActorRef): Route def createActor(): Router = new Router {} @@ -89,6 +105,15 @@ trait RouterConfig { case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(props))(scala.collection.breakOut) case (_, xs) ⇒ xs.map(context.actorFor(_))(scala.collection.breakOut) } + + protected def createAndRegisterRoutees(props: Props, context: ActorContext, nrOfInstances: Int, targets: Iterable[String]): Unit = { + val routees = createRoutees(props, context, nrOfInstances, targets) + registerRoutees(context, routees) + } + + protected def registerRoutees(context: ActorContext, routees: Vector[ActorRef]): Unit = { + context.self.asInstanceOf[RoutedActorRef]._routees = routees + } } /** @@ -97,8 +122,22 @@ trait RouterConfig { * through by returning an empty route. */ trait Router extends Actor { - final def receive = { + + val ref = self match { + case x: RoutedActorRef ⇒ x + case _ ⇒ throw new ActorInitializationException("Router actor can only be used in RoutedActorRef") + } + + final def receive = ({ + case Terminated(child) ⇒ + ref._routees = ref._routees filterNot (_ == child) + if (ref.routees.isEmpty) self.stop() + + }: Receive) orElse routerReceive + + def routerReceive: Receive = { + case _ ⇒ } } @@ -118,7 +157,7 @@ case class Broadcast(message: Any) case object NoRouter extends RouterConfig { def nrOfInstances: Int = 0 def targets: Iterable[String] = Nil - def createRoute(creator: () ⇒ Actor, actorContext: ActorContext): Route = null + def createRoute(props: Props, actorContext: ActorContext, ref: RoutedActorRef): Route = null } object RoundRobinRouter { @@ -155,20 +194,19 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[String] = } trait RoundRobinLike { this: RouterConfig ⇒ - def createRoute(creator: () ⇒ Actor, context: ActorContext): Route = { - val routees: Vector[ActorRef] = - createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets) + def createRoute(props: Props, context: ActorContext, ref: RoutedActorRef): Route = { + createAndRegisterRoutees(props, context, nrOfInstances, targets) val next = new AtomicInteger(0) def getNext(): ActorRef = { - routees(next.getAndIncrement % routees.size) + ref.routees(next.getAndIncrement % ref.routees.size) } { case (sender, message) ⇒ message match { - case Broadcast(msg) ⇒ toAll(sender, routees) + case Broadcast(msg) ⇒ toAll(sender, ref.routees) case msg ⇒ List(Destination(sender, getNext())) } } @@ -216,18 +254,17 @@ trait RandomLike { this: RouterConfig ⇒ override def initialValue = SecureRandom.getInstance("SHA1PRNG") } - def createRoute(creator: () ⇒ Actor, context: ActorContext): Route = { - val routees: Vector[ActorRef] = - createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets) + def createRoute(props: Props, context: ActorContext, ref: RoutedActorRef): Route = { + createAndRegisterRoutees(props, context, nrOfInstances, targets) def getNext(): ActorRef = { - routees(random.get.nextInt(routees.size)) + ref.routees(random.get.nextInt(ref.routees.size)) } { case (sender, message) ⇒ message match { - case Broadcast(msg) ⇒ toAll(sender, routees) + case Broadcast(msg) ⇒ toAll(sender, ref.routees) case msg ⇒ List(Destination(sender, getNext())) } } @@ -268,14 +305,13 @@ case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[String] = N } trait BroadcastLike { this: RouterConfig ⇒ - def createRoute(creator: () ⇒ Actor, context: ActorContext): Route = { - val routees: Vector[ActorRef] = - createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets) + def createRoute(props: Props, context: ActorContext, ref: RoutedActorRef): Route = { + createAndRegisterRoutees(props, context, nrOfInstances, targets) { case (sender, message) ⇒ message match { - case _ ⇒ toAll(sender, routees) + case _ ⇒ toAll(sender, ref.routees) } } } @@ -316,16 +352,15 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, targets: It } trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ - def createRoute(creator: () ⇒ Actor, context: ActorContext): Route = { - val routees: Vector[ActorRef] = - createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets) + def createRoute(props: Props, context: ActorContext, ref: RoutedActorRef): Route = { + createAndRegisterRoutees(props, context, nrOfInstances, targets) { case (sender, message) ⇒ val asker = context.asInstanceOf[ActorCell].systemImpl.provider.ask(Timeout(5, TimeUnit.SECONDS)).get // FIXME, NO REALLY FIXME! asker.result.pipeTo(sender) message match { - case _ ⇒ toAll(asker, routees) + case _ ⇒ toAll(asker, ref.routees) } } }