make cleanUp of systemMessages atomic

- extend systemDrain to take the new contents which shall be switched in
- make NoMessage placeholder which will signal final closing of the
  mailbox
- put that in when cleaning up, and check it when enqueuing
This commit is contained in:
Roland 2012-06-04 12:18:30 +02:00
parent 1821927023
commit fd1d0ce121
5 changed files with 40 additions and 35 deletions

View file

@ -374,7 +374,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ll.self.path compareTo rr.self.path } def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ll.self.path compareTo rr.self.path }
} foreach { } foreach {
case cell: ActorCell case cell: ActorCell
System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain())) System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain(null)))
} }
System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages) System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages)

View file

@ -545,7 +545,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
becomeClosed() becomeClosed()
def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit = def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit =
deadLetters ! DeadLetter(handle, receiver, receiver) deadLetters ! DeadLetter(handle, receiver, receiver)
def systemDrain(): SystemMessage = null def systemDrain(newContents: SystemMessage): SystemMessage = null
def hasSystemMessages = false def hasSystemMessages = false
} }

View file

@ -107,6 +107,10 @@ private[akka] case class Watch(watchee: ActorRef, watcher: ActorRef) extends Sys
* INTERNAL API * INTERNAL API
*/ */
private[akka] case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to tear down a DeathWatch private[akka] case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to tear down a DeathWatch
/**
* INTERNAL API
*/
private[akka] case object NoMessage extends SystemMessage // switched into the mailbox to signal termination
final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () Unit) extends Runnable { final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () Unit) extends Runnable {
def run(): Unit = def run(): Unit =

View file

@ -52,8 +52,7 @@ class BalancingDispatcher(
override def cleanUp(): Unit = { override def cleanUp(): Unit = {
val dlq = actor.systemImpl.deadLetterMailbox val dlq = actor.systemImpl.deadLetterMailbox
//Don't call the original implementation of this since it scraps all messages, and we don't want to do that //Don't call the original implementation of this since it scraps all messages, and we don't want to do that
while (hasSystemMessages) { var message = systemDrain(NoMessage)
var message = systemDrain()
while (message ne null) { while (message ne null) {
// message must be virgin before being able to systemEnqueue again // message must be virgin before being able to systemEnqueue again
val next = message.next val next = message.next
@ -63,7 +62,6 @@ class BalancingDispatcher(
} }
} }
} }
}
protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor, messageQueue) protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor, messageQueue)

View file

@ -169,6 +169,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
*/ */
protected final def systemQueueGet: SystemMessage = protected final def systemQueueGet: SystemMessage =
Unsafe.instance.getObjectVolatile(this, AbstractMailbox.systemMessageOffset).asInstanceOf[SystemMessage] Unsafe.instance.getObjectVolatile(this, AbstractMailbox.systemMessageOffset).asInstanceOf[SystemMessage]
protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean = protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean =
Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old, _new) Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old, _new)
@ -208,14 +209,14 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
} }
final def processAllSystemMessages() { final def processAllSystemMessages() {
var nextMessage = systemDrain() var nextMessage = systemDrain(null)
try { try {
while ((nextMessage ne null) && !isClosed) { while ((nextMessage ne null) && !isClosed) {
if (debug) println(actor.self + " processing system message " + nextMessage + " with " + actor.childrenRefs) if (debug) println(actor.self + " processing system message " + nextMessage + " with " + actor.childrenRefs)
actor systemInvoke nextMessage actor systemInvoke nextMessage
nextMessage = nextMessage.next nextMessage = nextMessage.next
// dont ever execute normal message when system message present! // dont ever execute normal message when system message present!
if (nextMessage eq null) nextMessage = systemDrain() if (nextMessage eq null) nextMessage = systemDrain(null)
} }
} catch { } catch {
case NonFatal(e) case NonFatal(e)
@ -235,8 +236,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
protected[dispatch] def cleanUp(): Unit = protected[dispatch] def cleanUp(): Unit =
if (actor ne null) { // actor is null for the deadLetterMailbox if (actor ne null) { // actor is null for the deadLetterMailbox
val dlm = actor.systemImpl.deadLetterMailbox val dlm = actor.systemImpl.deadLetterMailbox
while (hasSystemMessages) { var message = systemDrain(NoMessage)
var message = systemDrain()
while (message ne null) { while (message ne null) {
// message must be virgin before being able to systemEnqueue again // message must be virgin before being able to systemEnqueue again
val next = message.next val next = message.next
@ -244,7 +244,6 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
dlm.systemEnqueue(actor.self, message) dlm.systemEnqueue(actor.self, message)
message = next message = next
} }
}
if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run() if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run()
messageQueue.cleanUp(actor, actor.systemImpl.deadLetterQueue) messageQueue.cleanUp(actor, actor.systemImpl.deadLetterQueue)
@ -300,7 +299,7 @@ private[akka] trait SystemMessageQueue {
/** /**
* Dequeue all messages from system queue and return them as single-linked list. * Dequeue all messages from system queue and return them as single-linked list.
*/ */
def systemDrain(): SystemMessage def systemDrain(newContents: SystemMessage): SystemMessage
def hasSystemMessages: Boolean def hasSystemMessages: Boolean
} }
@ -315,6 +314,8 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒
assert(message.next eq null) assert(message.next eq null)
if (Mailbox.debug) println(actor.self + " having enqueued " + message) if (Mailbox.debug) println(actor.self + " having enqueued " + message)
val head = systemQueueGet val head = systemQueueGet
if (head == NoMessage) actor.system.deadLetterMailbox.systemEnqueue(receiver, message)
else {
/* /*
* this write is safely published by the compareAndSet contained within * this write is safely published by the compareAndSet contained within
* systemQueuePut; Intra-Thread Semantics on page 12 of the JSR133 spec * systemQueuePut; Intra-Thread Semantics on page 12 of the JSR133 spec
@ -327,14 +328,16 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒
systemEnqueue(receiver, message) systemEnqueue(receiver, message)
} }
} }
}
@tailrec @tailrec
final def systemDrain(): SystemMessage = { final def systemDrain(newContents: SystemMessage): SystemMessage = {
val head = systemQueueGet val head = systemQueueGet
if (systemQueuePut(head, null)) SystemMessage.reverse(head) else systemDrain() if (systemQueuePut(head, newContents)) SystemMessage.reverse(head) else systemDrain(newContents)
} }
def hasSystemMessages: Boolean = systemQueueGet ne null def hasSystemMessages: Boolean = systemQueueGet ne null
} }
/** /**