diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 8c949f8776..46bf609c7a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -50,6 +50,8 @@ object ActorModelSpec { case object Restart extends ActorModelMessage + case object DoubleStop extends ActorModelMessage + case class ThrowException(e: Throwable) extends ActorModelMessage val Ping = "Ping" @@ -86,6 +88,7 @@ object ActorModelSpec { case Restart ⇒ ack; busy.switchOff(); throw new Exception("Restart requested") case Interrupt ⇒ ack; sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(); throw new InterruptedException("Ping!") case ThrowException(e: Throwable) ⇒ ack; busy.switchOff(); throw e + case DoubleStop ⇒ ack; context.stop(self); context.stop(self); busy.switchOff } } @@ -190,13 +193,13 @@ object ActorModelSpec { } def assertRef(actorRef: ActorRef, dispatcher: MessageDispatcher = null)( - suspensions: Long = statsFor(actorRef).suspensions.get(), - resumes: Long = statsFor(actorRef).resumes.get(), - registers: Long = statsFor(actorRef).registers.get(), - unregisters: Long = statsFor(actorRef).unregisters.get(), - msgsReceived: Long = statsFor(actorRef).msgsReceived.get(), - msgsProcessed: Long = statsFor(actorRef).msgsProcessed.get(), - restarts: Long = statsFor(actorRef).restarts.get())(implicit system: ActorSystem) { + suspensions: Long = statsFor(actorRef, dispatcher).suspensions.get(), + resumes: Long = statsFor(actorRef, dispatcher).resumes.get(), + registers: Long = statsFor(actorRef, dispatcher).registers.get(), + unregisters: Long = statsFor(actorRef, dispatcher).unregisters.get(), + msgsReceived: Long = statsFor(actorRef, dispatcher).msgsReceived.get(), + msgsProcessed: Long = statsFor(actorRef, dispatcher).msgsProcessed.get(), + restarts: Long = statsFor(actorRef, dispatcher).restarts.get())(implicit system: ActorSystem) { val stats = statsFor(actorRef, Option(dispatcher).getOrElse(actorRef.asInstanceOf[LocalActorRef].underlying.dispatcher)) val deadline = System.currentTimeMillis + 1000 try { @@ -426,6 +429,14 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa assert(f5.value.isEmpty) } } + + "not double-deregister" in { + implicit val dispatcher = interceptedDispatcher() + val a = newTestActor(dispatcher.id) + a ! DoubleStop + awaitCond(statsFor(a, dispatcher).registers.get == 1) + awaitCond(statsFor(a, dispatcher).unregisters.get == 1) + } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 27853b49db..cc15ae2173 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -189,7 +189,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue final def processAllSystemMessages() { var nextMessage = systemDrain() try { - while (nextMessage ne null) { + while ((nextMessage ne null) && !isClosed) { if (debug) println(actor.self + " processing system message " + nextMessage + " with children " + actor.childrenRefs) actor systemInvoke nextMessage nextMessage = nextMessage.next diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index c200be0871..c0bd0df251 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -16,6 +16,7 @@ import scala.collection.JavaConversions.iterableAsScalaIterable import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean } import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock import akka.jsr166y.ThreadLocalRandom @@ -30,14 +31,77 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup _supervisor, _path) { - private val routeeProps = _props.copy(routerConfig = NoRouter) - private val resizeProgress = new AtomicBoolean + /* + * CAUTION: RoutedActorRef is PROBLEMATIC + * ====================================== + * + * We are constructing/assembling the children outside of the scope of the + * Router actor, inserting them in its childrenRef list, which is not at all + * synchronized. This is done exactly once at start-up, all other accesses + * are done from the Router actor. This means that the only thing which is + * really hairy is making sure that the Router does not touch its childrenRefs + * before we are done with them: create a locked latch really early (hence the + * override of newActorCell) and use that to block the Router constructor for + * as long as it takes to setup the RoutedActorRef itself. + */ + private[akka] var routeReady: ReentrantLock = _ + override def newActorCell( + system: ActorSystemImpl, + ref: InternalActorRef, + props: Props, + supervisor: InternalActorRef, + receiveTimeout: Option[Duration]): ActorCell = { + /* + * TODO RK: check that this really sticks, since this is executed before + * the constructor of RoutedActorRef is executed (invoked from + * LocalActorRef); works on HotSpot and JRockit. + */ + routeReady = new ReentrantLock + routeReady.lock() + super.newActorCell(system, ref, props, supervisor, receiveTimeout) + } + + private[akka] val routerConfig = _props.routerConfig + private[akka] val routeeProps = _props.copy(routerConfig = NoRouter) + private[akka] val resizeProgress = new AtomicBoolean private val resizeCounter = new AtomicLong @volatile private var _routees: IndexedSeq[ActorRef] = IndexedSeq.empty[ActorRef] // this MUST be initialized during createRoute def routees = _routees + private[akka] var routeeProvider: RouteeProvider = _ + val route = + try { + routeeProvider = routerConfig.createRouteeProvider(actorContext) + val r = routerConfig.createRoute(routeeProps, routeeProvider) + // initial resize, before message send + resize() + r + } finally routeReady.unlock() // unblock Router’s constructor + + if (routerConfig.resizer.isEmpty && _routees.isEmpty) + throw new ActorInitializationException("router " + routerConfig + " did not register routees!") + + _routees match { + case x ⇒ _routees = x // volatile write to publish the route before sending messages + } + + /* + * end of construction + */ + + def applyRoute(sender: ActorRef, message: Any): Iterable[Destination] = message match { + case _: AutoReceivedMessage ⇒ Nil + case Terminated(_) ⇒ Nil + case CurrentRoutees ⇒ + sender ! RouterRoutees(_routees) + Nil + case _ ⇒ + if (route.isDefinedAt(sender, message)) route(sender, message) + else Nil + } + /** * Adds the routees to existing routees. * Adds death watch of the routees so that they are removed when terminated. @@ -61,29 +125,6 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup abandonedRoutees foreach underlying.unwatch } - private val routeeProvider = _props.routerConfig.createRouteeProvider(actorContext) - val route = _props.routerConfig.createRoute(routeeProps, routeeProvider) - // initial resize, before message send - resize() - - def applyRoute(sender: ActorRef, message: Any): Iterable[Destination] = message match { - case _: AutoReceivedMessage ⇒ Nil - case Terminated(_) ⇒ Nil - case CurrentRoutees ⇒ - sender ! RouterRoutees(_routees) - Nil - case _ ⇒ - if (route.isDefinedAt(sender, message)) route(sender, message) - else Nil - } - - if (_props.routerConfig.resizer.isEmpty && _routees.isEmpty) - throw new ActorInitializationException("router " + _props.routerConfig + " did not register routees!") - - _routees match { - case x ⇒ _routees = x // volatile write to publish the route before sending messages - } - override def !(message: Any)(implicit sender: ActorRef = null): Unit = { resize() @@ -101,14 +142,9 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup } def resize() { - for (r ← _props.routerConfig.resizer) { - if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeProgress.compareAndSet(false, true)) { - try { - r.resize(routeeProps, routeeProvider) - } finally { - resizeProgress.set(false) - } - } + for (r ← routerConfig.resizer) { + if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeProgress.compareAndSet(false, true)) + super.!(Router.Resize) } } } @@ -251,8 +287,18 @@ trait Router extends Actor { case _ ⇒ throw new ActorInitializationException("Router actor can only be used in RoutedActorRef") } + // make sure that we synchronize properly to get the childrenRefs into our CPU cache + ref.routeReady.lock() + try if (context.children.isEmpty) + throw new ActorInitializationException("RouterConfig did not create any children") + finally ref.routeReady.unlock() + final def receive = ({ + case Router.Resize ⇒ + try ref.routerConfig.resizer foreach (_.resize(ref.routeeProps, ref.routeeProvider)) + finally ref.resizeProgress.set(false) + case Terminated(child) ⇒ ref.removeRoutees(IndexedSeq(child)) if (ref.routees.isEmpty) context.stop(self) @@ -264,6 +310,10 @@ trait Router extends Actor { } } +object Router { + case object Resize +} + /** * Used to broadcast a message to all connections in a router; only the * contained message will be forwarded, i.e. the `Broadcast(...)` @@ -795,14 +845,22 @@ trait Resizer { * for the initial resize and continues with 1 for the first message. Make sure to perform * initial resize before first message (messageCounter == 0), because there is no guarantee * that resize will be done when concurrent messages are in play. + * + * CAUTION: this method is invoked from the thread which tries to send a + * message to the pool, i.e. the ActorRef.!() method, hence it may be called + * concurrently. */ def isTimeForResize(messageCounter: Long): Boolean + /** * Decide if the capacity of the router need to be changed. Will be invoked when `isTimeForResize` * returns true and no other resize is in progress. * Create and register more routees with `routeeProvider.registerRoutees(newRoutees) * or remove routees with `routeeProvider.unregisterRoutees(abandonedRoutees)` and * sending [[akka.actor.PoisonPill]] to them. + * + * This method is invoked only in the context of the Router actor in order to safely + * create/stop children. */ def resize(props: Props, routeeProvider: RouteeProvider) }