now that was a nice journey (related to #1804)

- first, fix quite some data races in RoutedActorRef wrt. the contained
  ActorCell’s childrenRef field (which is not even @volatile)
- then notice that there still are double-deregistrations happening in
  the dispatcher
- coming finally to the conclusion that the Mailbox should not really
  process all system messages in processAllSystemMessages(): we should
  really really stop after having closed the mailbox ;-)
- added simple test case which stops self twice to keep this fixed
This commit is contained in:
Roland 2012-02-09 16:59:16 +01:00
parent 34d9714e22
commit ca3deb4007
3 changed files with 110 additions and 41 deletions

View file

@ -50,6 +50,8 @@ object ActorModelSpec {
case object Restart extends ActorModelMessage
case object DoubleStop extends ActorModelMessage
case class ThrowException(e: Throwable) extends ActorModelMessage
val Ping = "Ping"
@ -86,6 +88,7 @@ object ActorModelSpec {
case Restart ack; busy.switchOff(); throw new Exception("Restart requested")
case Interrupt ack; sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(); throw new InterruptedException("Ping!")
case ThrowException(e: Throwable) ack; busy.switchOff(); throw e
case DoubleStop ack; context.stop(self); context.stop(self); busy.switchOff
}
}
@ -190,13 +193,13 @@ object ActorModelSpec {
}
def assertRef(actorRef: ActorRef, dispatcher: MessageDispatcher = null)(
suspensions: Long = statsFor(actorRef).suspensions.get(),
resumes: Long = statsFor(actorRef).resumes.get(),
registers: Long = statsFor(actorRef).registers.get(),
unregisters: Long = statsFor(actorRef).unregisters.get(),
msgsReceived: Long = statsFor(actorRef).msgsReceived.get(),
msgsProcessed: Long = statsFor(actorRef).msgsProcessed.get(),
restarts: Long = statsFor(actorRef).restarts.get())(implicit system: ActorSystem) {
suspensions: Long = statsFor(actorRef, dispatcher).suspensions.get(),
resumes: Long = statsFor(actorRef, dispatcher).resumes.get(),
registers: Long = statsFor(actorRef, dispatcher).registers.get(),
unregisters: Long = statsFor(actorRef, dispatcher).unregisters.get(),
msgsReceived: Long = statsFor(actorRef, dispatcher).msgsReceived.get(),
msgsProcessed: Long = statsFor(actorRef, dispatcher).msgsProcessed.get(),
restarts: Long = statsFor(actorRef, dispatcher).restarts.get())(implicit system: ActorSystem) {
val stats = statsFor(actorRef, Option(dispatcher).getOrElse(actorRef.asInstanceOf[LocalActorRef].underlying.dispatcher))
val deadline = System.currentTimeMillis + 1000
try {
@ -426,6 +429,14 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
assert(f5.value.isEmpty)
}
}
"not double-deregister" in {
implicit val dispatcher = interceptedDispatcher()
val a = newTestActor(dispatcher.id)
a ! DoubleStop
awaitCond(statsFor(a, dispatcher).registers.get == 1)
awaitCond(statsFor(a, dispatcher).unregisters.get == 1)
}
}
}

View file

@ -189,7 +189,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue
final def processAllSystemMessages() {
var nextMessage = systemDrain()
try {
while (nextMessage ne null) {
while ((nextMessage ne null) && !isClosed) {
if (debug) println(actor.self + " processing system message " + nextMessage + " with children " + actor.childrenRefs)
actor systemInvoke nextMessage
nextMessage = nextMessage.next

View file

@ -16,6 +16,7 @@ import scala.collection.JavaConversions.iterableAsScalaIterable
import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean }
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import akka.jsr166y.ThreadLocalRandom
@ -30,14 +31,77 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
_supervisor,
_path) {
private val routeeProps = _props.copy(routerConfig = NoRouter)
private val resizeProgress = new AtomicBoolean
/*
* CAUTION: RoutedActorRef is PROBLEMATIC
* ======================================
*
* We are constructing/assembling the children outside of the scope of the
* Router actor, inserting them in its childrenRef list, which is not at all
* synchronized. This is done exactly once at start-up, all other accesses
* are done from the Router actor. This means that the only thing which is
* really hairy is making sure that the Router does not touch its childrenRefs
* before we are done with them: create a locked latch really early (hence the
* override of newActorCell) and use that to block the Router constructor for
* as long as it takes to setup the RoutedActorRef itself.
*/
private[akka] var routeReady: ReentrantLock = _
override def newActorCell(
system: ActorSystemImpl,
ref: InternalActorRef,
props: Props,
supervisor: InternalActorRef,
receiveTimeout: Option[Duration]): ActorCell = {
/*
* TODO RK: check that this really sticks, since this is executed before
* the constructor of RoutedActorRef is executed (invoked from
* LocalActorRef); works on HotSpot and JRockit.
*/
routeReady = new ReentrantLock
routeReady.lock()
super.newActorCell(system, ref, props, supervisor, receiveTimeout)
}
private[akka] val routerConfig = _props.routerConfig
private[akka] val routeeProps = _props.copy(routerConfig = NoRouter)
private[akka] val resizeProgress = new AtomicBoolean
private val resizeCounter = new AtomicLong
@volatile
private var _routees: IndexedSeq[ActorRef] = IndexedSeq.empty[ActorRef] // this MUST be initialized during createRoute
def routees = _routees
private[akka] var routeeProvider: RouteeProvider = _
val route =
try {
routeeProvider = routerConfig.createRouteeProvider(actorContext)
val r = routerConfig.createRoute(routeeProps, routeeProvider)
// initial resize, before message send
resize()
r
} finally routeReady.unlock() // unblock Routers constructor
if (routerConfig.resizer.isEmpty && _routees.isEmpty)
throw new ActorInitializationException("router " + routerConfig + " did not register routees!")
_routees match {
case x _routees = x // volatile write to publish the route before sending messages
}
/*
* end of construction
*/
def applyRoute(sender: ActorRef, message: Any): Iterable[Destination] = message match {
case _: AutoReceivedMessage Nil
case Terminated(_) Nil
case CurrentRoutees
sender ! RouterRoutees(_routees)
Nil
case _
if (route.isDefinedAt(sender, message)) route(sender, message)
else Nil
}
/**
* Adds the routees to existing routees.
* Adds death watch of the routees so that they are removed when terminated.
@ -61,29 +125,6 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
abandonedRoutees foreach underlying.unwatch
}
private val routeeProvider = _props.routerConfig.createRouteeProvider(actorContext)
val route = _props.routerConfig.createRoute(routeeProps, routeeProvider)
// initial resize, before message send
resize()
def applyRoute(sender: ActorRef, message: Any): Iterable[Destination] = message match {
case _: AutoReceivedMessage Nil
case Terminated(_) Nil
case CurrentRoutees
sender ! RouterRoutees(_routees)
Nil
case _
if (route.isDefinedAt(sender, message)) route(sender, message)
else Nil
}
if (_props.routerConfig.resizer.isEmpty && _routees.isEmpty)
throw new ActorInitializationException("router " + _props.routerConfig + " did not register routees!")
_routees match {
case x _routees = x // volatile write to publish the route before sending messages
}
override def !(message: Any)(implicit sender: ActorRef = null): Unit = {
resize()
@ -101,14 +142,9 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
}
def resize() {
for (r _props.routerConfig.resizer) {
if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeProgress.compareAndSet(false, true)) {
try {
r.resize(routeeProps, routeeProvider)
} finally {
resizeProgress.set(false)
}
}
for (r routerConfig.resizer) {
if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeProgress.compareAndSet(false, true))
super.!(Router.Resize)
}
}
}
@ -251,8 +287,18 @@ trait Router extends Actor {
case _ throw new ActorInitializationException("Router actor can only be used in RoutedActorRef")
}
// make sure that we synchronize properly to get the childrenRefs into our CPU cache
ref.routeReady.lock()
try if (context.children.isEmpty)
throw new ActorInitializationException("RouterConfig did not create any children")
finally ref.routeReady.unlock()
final def receive = ({
case Router.Resize
try ref.routerConfig.resizer foreach (_.resize(ref.routeeProps, ref.routeeProvider))
finally ref.resizeProgress.set(false)
case Terminated(child)
ref.removeRoutees(IndexedSeq(child))
if (ref.routees.isEmpty) context.stop(self)
@ -264,6 +310,10 @@ trait Router extends Actor {
}
}
object Router {
case object Resize
}
/**
* Used to broadcast a message to all connections in a router; only the
* contained message will be forwarded, i.e. the `Broadcast(...)`
@ -795,14 +845,22 @@ trait Resizer {
* for the initial resize and continues with 1 for the first message. Make sure to perform
* initial resize before first message (messageCounter == 0), because there is no guarantee
* that resize will be done when concurrent messages are in play.
*
* CAUTION: this method is invoked from the thread which tries to send a
* message to the pool, i.e. the ActorRef.!() method, hence it may be called
* concurrently.
*/
def isTimeForResize(messageCounter: Long): Boolean
/**
* Decide if the capacity of the router need to be changed. Will be invoked when `isTimeForResize`
* returns true and no other resize is in progress.
* Create and register more routees with `routeeProvider.registerRoutees(newRoutees)
* or remove routees with `routeeProvider.unregisterRoutees(abandonedRoutees)` and
* sending [[akka.actor.PoisonPill]] to them.
*
* This method is invoked only in the context of the Router actor in order to safely
* create/stop children.
*/
def resize(props: Props, routeeProvider: RouteeProvider)
}