diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 67b6cdb18b..d10f7ba29c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -560,7 +560,6 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit = deadLetters ! DeadLetter(handle, receiver, receiver) def systemDrain(newContents: SystemMessage): SystemMessage = null - def systemAppend(message: SystemMessage): Unit = {} def hasSystemMessages = false } diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index fc03911405..3ceaec4e91 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -153,14 +153,14 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep val msg = systemQueue.dequeue() try cell.sendSystemMessage(msg) catch { - case _: InterruptedException ⇒ interrupted = true; Thread.interrupted() + case _: InterruptedException ⇒ interrupted = true; Thread.interrupted() // clear interrupted flag on the thread } } if (queue.nonEmpty) { val envelope = queue.dequeue() try cell.tell(envelope.message, envelope.sender) catch { - case _: InterruptedException ⇒ interrupted = true; Thread.interrupted() + case _: InterruptedException ⇒ interrupted = true; Thread.interrupted() // clear interrupted flag on the thread } } } diff --git a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala index 059027f0b0..128be2568a 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala @@ -139,7 +139,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ t match { // Wrap InterruptedExceptions and, clear the flag and rethrow case _: InterruptedException ⇒ parent.tell(Failed(new ActorInterruptedException(t), uid), self) - Thread.interrupted() + Thread.interrupted() // clear interrupted flag on the thread throw t case _ ⇒ parent.tell(Failed(t, uid), self) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 8977a7e3c0..8bb5f0ace2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -244,9 +244,8 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) * already dequeued message to deadLetters. */ final def processAllSystemMessages() { - var failure: Throwable = null + var interruption: Throwable = null var nextMessage = systemDrain(null) - var shouldStop = false while ((nextMessage ne null) && !isClosed) { val msg = nextMessage nextMessage = nextMessage.next @@ -255,21 +254,12 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) try { actor systemInvoke msg } catch { + // we know here that systemInvoke ensures that only InterruptedException and "fatal" exceptions get rethrown case e: InterruptedException ⇒ - if (failure eq null) failure = e - actor.system.eventStream.publish(Error(e, actor.self.path.toString, this.getClass, "exception during processing system message " + msg + ": " + e.getMessage)) - if (nextMessage ne null) { - val putBack = SystemMessage.reverse(nextMessage) - systemAppend(putBack) - nextMessage = null - shouldStop = true - } - case NonFatal(e) ⇒ - if (failure eq null) failure = e - actor.system.eventStream.publish(Error(e, actor.self.path.toString, this.getClass, "exception during processing system message " + msg + ": " + e.getMessage)) + if (interruption eq null) interruption = e } // don’t ever execute normal message when system message present! - if ((nextMessage eq null) && !shouldStop && !isClosed) nextMessage = systemDrain(null) + if ((nextMessage eq null) && !isClosed) nextMessage = systemDrain(null) } /* * if we closed the mailbox, we must dump the remaining system messages @@ -282,12 +272,17 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) msg.next = null try dlm.systemEnqueue(actor.self, msg) catch { + case e: InterruptedException ⇒ + if (interruption eq null) interruption = e case NonFatal(e) ⇒ actor.system.eventStream.publish( Error(e, actor.self.path.toString, this.getClass, "error while enqueuing " + msg + " to deadLetters: " + e.getMessage)) } } - // if something happened while processing, fail this actor (most probable: exception in supervisorStrategy) - if (failure ne null) actor.handleInvokeFailure(Nil, failure, failure.getMessage) + // if we got an interrupted exception while handling system messages, then rethrow it + if (interruption ne null) { + Thread.interrupted() // clear interrupted flag on the thread + throw interruption + } } /** @@ -363,12 +358,6 @@ private[akka] trait SystemMessageQueue { */ def systemDrain(newContents: SystemMessage): SystemMessage - /** - * Append a system message (possibly a list) to the queue. Should only be called from within failure handling code - * that is already "owning" the system messages. - */ - def systemAppend(message: SystemMessage): Unit - def hasSystemMessages: Boolean } @@ -399,32 +388,6 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒ } } - @tailrec - final def systemAppend(message: SystemMessage): Unit = { - if (Mailbox.debug) println("appending " + message) - val head = systemQueueGet - assert(head != NoMessage) - if (head ne null) { - var last = head - var curr = head.next - while (curr ne null) { - last = curr - curr = curr.next - } - last.next = message - } else { - /* - * this write is safely published by the compareAndSet contained within - * systemQueuePut; “Intra-Thread Semantics” on page 12 of the JSR133 spec - * guarantees that “head” uses the value obtained from systemQueueGet above. - * Hence, SystemMessage.next does not need to be volatile. - */ - if (!systemQueuePut(head, message)) { - systemAppend(message) - } - } - } - @tailrec final def systemDrain(newContents: SystemMessage): SystemMessage = { val head = systemQueueGet