diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index d10f7ba29c..67b6cdb18b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -560,6 +560,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit = deadLetters ! DeadLetter(handle, receiver, receiver) def systemDrain(newContents: SystemMessage): SystemMessage = null + def systemAppend(message: SystemMessage): Unit = {} def hasSystemMessages = false } diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 450a7afc34..fc03911405 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -153,18 +153,21 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep val msg = systemQueue.dequeue() try cell.sendSystemMessage(msg) catch { - case _: InterruptedException ⇒ interrupted = true + case _: InterruptedException ⇒ interrupted = true; Thread.interrupted() } } if (queue.nonEmpty) { val envelope = queue.dequeue() try cell.tell(envelope.message, envelope.sender) catch { - case _: InterruptedException ⇒ interrupted = true + case _: InterruptedException ⇒ interrupted = true; Thread.interrupted() } } } - if (interrupted) throw new InterruptedException + if (interrupted) { + Thread.interrupted() // clear interrupted flag on the thread + throw new InterruptedException + } } finally try self.swapCell(cell) finally try diff --git a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala index 1d51777db1..059027f0b0 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala @@ -136,9 +136,12 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ } suspendChildren(exceptFor = skip ++ childrenNotToSuspend) // tell supervisor - t match { // Wrap InterruptedExceptions and rethrow - case _: InterruptedException ⇒ parent.tell(Failed(new ActorInterruptedException(t), uid), self); throw t - case _ ⇒ parent.tell(Failed(t, uid), self) + t match { // Wrap InterruptedExceptions and, clear the flag and rethrow + case _: InterruptedException ⇒ + parent.tell(Failed(new ActorInterruptedException(t), uid), self) + Thread.interrupted() + throw t + case _ ⇒ parent.tell(Failed(t, uid), self) } } catch { case NonFatal(e) ⇒ diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index fd7057a963..8977a7e3c0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -228,8 +228,11 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) if (Mailbox.debug) println(actor.self + " processing message " + next) actor invoke next processAllSystemMessages() - if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0)) + if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0)) { processMailbox(left - 1, deadlineNs) + } else if (Thread.interrupted()) { + throw new InterruptedException("Interrupted while processing actor messages") + } } } @@ -243,6 +246,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) final def processAllSystemMessages() { var failure: Throwable = null var nextMessage = systemDrain(null) + var shouldStop = false while ((nextMessage ne null) && !isClosed) { val msg = nextMessage nextMessage = nextMessage.next @@ -251,12 +255,21 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) try { actor systemInvoke msg } catch { + case e: InterruptedException ⇒ + if (failure eq null) failure = e + actor.system.eventStream.publish(Error(e, actor.self.path.toString, this.getClass, "exception during processing system message " + msg + ": " + e.getMessage)) + if (nextMessage ne null) { + val putBack = SystemMessage.reverse(nextMessage) + systemAppend(putBack) + nextMessage = null + shouldStop = true + } case NonFatal(e) ⇒ if (failure eq null) failure = e actor.system.eventStream.publish(Error(e, actor.self.path.toString, this.getClass, "exception during processing system message " + msg + ": " + e.getMessage)) } // don’t ever execute normal message when system message present! - if ((nextMessage eq null) && !isClosed) nextMessage = systemDrain(null) + if ((nextMessage eq null) && !shouldStop && !isClosed) nextMessage = systemDrain(null) } /* * if we closed the mailbox, we must dump the remaining system messages @@ -350,6 +363,12 @@ private[akka] trait SystemMessageQueue { */ def systemDrain(newContents: SystemMessage): SystemMessage + /** + * Append a system message (possibly a list) to the queue. Should only be called from within failure handling code + * that is already "owning" the system messages. + */ + def systemAppend(message: SystemMessage): Unit + def hasSystemMessages: Boolean } @@ -380,6 +399,32 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒ } } + @tailrec + final def systemAppend(message: SystemMessage): Unit = { + if (Mailbox.debug) println("appending " + message) + val head = systemQueueGet + assert(head != NoMessage) + if (head ne null) { + var last = head + var curr = head.next + while (curr ne null) { + last = curr + curr = curr.next + } + last.next = message + } else { + /* + * this write is safely published by the compareAndSet contained within + * systemQueuePut; “Intra-Thread Semantics” on page 12 of the JSR133 spec + * guarantees that “head” uses the value obtained from systemQueueGet above. + * Hence, SystemMessage.next does not need to be volatile. + */ + if (!systemQueuePut(head, message)) { + systemAppend(message) + } + } + } + @tailrec final def systemDrain(newContents: SystemMessage): SystemMessage = { val head = systemQueueGet