InterruptedException changes based on discussion. See #2385
This commit is contained in:
parent
a249e3e49b
commit
1aa2319023
4 changed files with 14 additions and 52 deletions
|
|
@ -244,9 +244,8 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
|
|||
* already dequeued message to deadLetters.
|
||||
*/
|
||||
final def processAllSystemMessages() {
|
||||
var failure: Throwable = null
|
||||
var interruption: Throwable = null
|
||||
var nextMessage = systemDrain(null)
|
||||
var shouldStop = false
|
||||
while ((nextMessage ne null) && !isClosed) {
|
||||
val msg = nextMessage
|
||||
nextMessage = nextMessage.next
|
||||
|
|
@ -255,21 +254,12 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
|
|||
try {
|
||||
actor systemInvoke msg
|
||||
} catch {
|
||||
// we know here that systemInvoke ensures that only InterruptedException and "fatal" exceptions get rethrown
|
||||
case e: InterruptedException ⇒
|
||||
if (failure eq null) failure = e
|
||||
actor.system.eventStream.publish(Error(e, actor.self.path.toString, this.getClass, "exception during processing system message " + msg + ": " + e.getMessage))
|
||||
if (nextMessage ne null) {
|
||||
val putBack = SystemMessage.reverse(nextMessage)
|
||||
systemAppend(putBack)
|
||||
nextMessage = null
|
||||
shouldStop = true
|
||||
}
|
||||
case NonFatal(e) ⇒
|
||||
if (failure eq null) failure = e
|
||||
actor.system.eventStream.publish(Error(e, actor.self.path.toString, this.getClass, "exception during processing system message " + msg + ": " + e.getMessage))
|
||||
if (interruption eq null) interruption = e
|
||||
}
|
||||
// don’t ever execute normal message when system message present!
|
||||
if ((nextMessage eq null) && !shouldStop && !isClosed) nextMessage = systemDrain(null)
|
||||
if ((nextMessage eq null) && !isClosed) nextMessage = systemDrain(null)
|
||||
}
|
||||
/*
|
||||
* if we closed the mailbox, we must dump the remaining system messages
|
||||
|
|
@ -282,12 +272,17 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
|
|||
msg.next = null
|
||||
try dlm.systemEnqueue(actor.self, msg)
|
||||
catch {
|
||||
case e: InterruptedException ⇒
|
||||
if (interruption eq null) interruption = e
|
||||
case NonFatal(e) ⇒ actor.system.eventStream.publish(
|
||||
Error(e, actor.self.path.toString, this.getClass, "error while enqueuing " + msg + " to deadLetters: " + e.getMessage))
|
||||
}
|
||||
}
|
||||
// if something happened while processing, fail this actor (most probable: exception in supervisorStrategy)
|
||||
if (failure ne null) actor.handleInvokeFailure(Nil, failure, failure.getMessage)
|
||||
// if we got an interrupted exception while handling system messages, then rethrow it
|
||||
if (interruption ne null) {
|
||||
Thread.interrupted() // clear interrupted flag on the thread
|
||||
throw interruption
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -363,12 +358,6 @@ private[akka] trait SystemMessageQueue {
|
|||
*/
|
||||
def systemDrain(newContents: SystemMessage): SystemMessage
|
||||
|
||||
/**
|
||||
* Append a system message (possibly a list) to the queue. Should only be called from within failure handling code
|
||||
* that is already "owning" the system messages.
|
||||
*/
|
||||
def systemAppend(message: SystemMessage): Unit
|
||||
|
||||
def hasSystemMessages: Boolean
|
||||
}
|
||||
|
||||
|
|
@ -399,32 +388,6 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒
|
|||
}
|
||||
}
|
||||
|
||||
@tailrec
|
||||
final def systemAppend(message: SystemMessage): Unit = {
|
||||
if (Mailbox.debug) println("appending " + message)
|
||||
val head = systemQueueGet
|
||||
assert(head != NoMessage)
|
||||
if (head ne null) {
|
||||
var last = head
|
||||
var curr = head.next
|
||||
while (curr ne null) {
|
||||
last = curr
|
||||
curr = curr.next
|
||||
}
|
||||
last.next = message
|
||||
} else {
|
||||
/*
|
||||
* this write is safely published by the compareAndSet contained within
|
||||
* systemQueuePut; “Intra-Thread Semantics” on page 12 of the JSR133 spec
|
||||
* guarantees that “head” uses the value obtained from systemQueueGet above.
|
||||
* Hence, SystemMessage.next does not need to be volatile.
|
||||
*/
|
||||
if (!systemQueuePut(head, message)) {
|
||||
systemAppend(message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@tailrec
|
||||
final def systemDrain(newContents: SystemMessage): SystemMessage = {
|
||||
val head = systemQueueGet
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue