Better handling of interrupted exception. See #2385

This commit is contained in:
Björn Antonsson 2012-08-15 15:17:39 +02:00
parent ab3c8e7ee4
commit 39723fa765
4 changed files with 60 additions and 8 deletions

View file

@ -560,6 +560,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit =
deadLetters ! DeadLetter(handle, receiver, receiver)
def systemDrain(newContents: SystemMessage): SystemMessage = null
def systemAppend(message: SystemMessage): Unit = {}
def hasSystemMessages = false
}

View file

@ -153,18 +153,21 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep
val msg = systemQueue.dequeue()
try cell.sendSystemMessage(msg)
catch {
case _: InterruptedException interrupted = true
case _: InterruptedException interrupted = true; Thread.interrupted()
}
}
if (queue.nonEmpty) {
val envelope = queue.dequeue()
try cell.tell(envelope.message, envelope.sender)
catch {
case _: InterruptedException interrupted = true
case _: InterruptedException interrupted = true; Thread.interrupted()
}
}
}
if (interrupted) throw new InterruptedException
if (interrupted) {
Thread.interrupted() // clear interrupted flag on the thread
throw new InterruptedException
}
} finally try
self.swapCell(cell)
finally try

View file

@ -136,8 +136,11 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
}
suspendChildren(exceptFor = skip ++ childrenNotToSuspend)
// tell supervisor
t match { // Wrap InterruptedExceptions and rethrow
case _: InterruptedException parent.tell(Failed(new ActorInterruptedException(t), uid), self); throw t
t match { // Wrap InterruptedExceptions and, clear the flag and rethrow
case _: InterruptedException
parent.tell(Failed(new ActorInterruptedException(t), uid), self)
Thread.interrupted()
throw t
case _ parent.tell(Failed(t, uid), self)
}
} catch {

View file

@ -228,8 +228,11 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
if (Mailbox.debug) println(actor.self + " processing message " + next)
actor invoke next
processAllSystemMessages()
if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0)) {
processMailbox(left - 1, deadlineNs)
} else if (Thread.interrupted()) {
throw new InterruptedException("Interrupted while processing actor messages")
}
}
}
@ -243,6 +246,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
final def processAllSystemMessages() {
var failure: Throwable = null
var nextMessage = systemDrain(null)
var shouldStop = false
while ((nextMessage ne null) && !isClosed) {
val msg = nextMessage
nextMessage = nextMessage.next
@ -251,12 +255,21 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
try {
actor systemInvoke msg
} catch {
case e: InterruptedException
if (failure eq null) failure = e
actor.system.eventStream.publish(Error(e, actor.self.path.toString, this.getClass, "exception during processing system message " + msg + ": " + e.getMessage))
if (nextMessage ne null) {
val putBack = SystemMessage.reverse(nextMessage)
systemAppend(putBack)
nextMessage = null
shouldStop = true
}
case NonFatal(e)
if (failure eq null) failure = e
actor.system.eventStream.publish(Error(e, actor.self.path.toString, this.getClass, "exception during processing system message " + msg + ": " + e.getMessage))
}
// dont ever execute normal message when system message present!
if ((nextMessage eq null) && !isClosed) nextMessage = systemDrain(null)
if ((nextMessage eq null) && !shouldStop && !isClosed) nextMessage = systemDrain(null)
}
/*
* if we closed the mailbox, we must dump the remaining system messages
@ -350,6 +363,12 @@ private[akka] trait SystemMessageQueue {
*/
def systemDrain(newContents: SystemMessage): SystemMessage
/**
* Append a system message (possibly a list) to the queue. Should only be called from within failure handling code
* that is already "owning" the system messages.
*/
def systemAppend(message: SystemMessage): Unit
def hasSystemMessages: Boolean
}
@ -380,6 +399,32 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒
}
}
@tailrec
final def systemAppend(message: SystemMessage): Unit = {
if (Mailbox.debug) println("appending " + message)
val head = systemQueueGet
assert(head != NoMessage)
if (head ne null) {
var last = head
var curr = head.next
while (curr ne null) {
last = curr
curr = curr.next
}
last.next = message
} else {
/*
* this write is safely published by the compareAndSet contained within
* systemQueuePut; Intra-Thread Semantics on page 12 of the JSR133 spec
* guarantees that head uses the value obtained from systemQueueGet above.
* Hence, SystemMessage.next does not need to be volatile.
*/
if (!systemQueuePut(head, message)) {
systemAppend(message)
}
}
}
@tailrec
final def systemDrain(newContents: SystemMessage): SystemMessage = {
val head = systemQueueGet