make sure that InterruptExceptions are not swallowed, see #2963

also remove an old work-around in CallingThreadDispatcherModelSpec and
describe the rules for interrupting in the testing docs
This commit is contained in:
Roland 2013-01-27 12:56:35 +01:00
parent afb5740d74
commit b881963907
5 changed files with 75 additions and 51 deletions

View file

@ -415,9 +415,11 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
val a = newTestActor(dispatcher.id) val a = newTestActor(dispatcher.id)
val f1 = a ? Reply("foo") val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar") val f2 = a ? Reply("bar")
val f3 = try { a ? Interrupt } catch { case ie: InterruptedException Promise.failed(new ActorInterruptedException(ie)).future } val f3 = a ? Interrupt
Thread.interrupted() // CallingThreadDispatcher may necessitate this
val f4 = a ? Reply("foo2") val f4 = a ? Reply("foo2")
val f5 = try { a ? Interrupt } catch { case ie: InterruptedException Promise.failed(new ActorInterruptedException(ie)).future } val f5 = a ? Interrupt
Thread.interrupted() // CallingThreadDispatcher may necessitate this
val f6 = a ? Reply("bar2") val f6 = a ? Reply("bar2")
val c = system.scheduler.scheduleOnce(2.seconds) { val c = system.scheduler.scheduleOnce(2.seconds) {

View file

@ -12,6 +12,7 @@ import scala.util.control.NonFatal
import akka.dispatch.NullMessage import akka.dispatch.NullMessage
import akka.actor.{ NoSerializationVerificationNeeded, InvalidMessageException, ActorRef, ActorCell } import akka.actor.{ NoSerializationVerificationNeeded, InvalidMessageException, ActorRef, ActorCell }
import akka.serialization.SerializationExtension import akka.serialization.SerializationExtension
import scala.util.control.Exception.Catcher
private[akka] trait Dispatch { this: ActorCell private[akka] trait Dispatch { this: ActorCell
@ -71,37 +72,25 @@ private[akka] trait Dispatch { this: ActorCell ⇒
this this
} }
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ private def handleException: Catcher[Unit] = {
final def suspend(): Unit = case e: InterruptedException
try dispatcher.systemDispatch(this, Suspend()) system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "interrupted during message send"))
catch { Thread.currentThread.interrupt()
case e @ (_: InterruptedException | NonFatal(_)) case NonFatal(e)
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send"))
} }
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def resume(causedByFailure: Throwable): Unit = final def suspend(): Unit = try dispatcher.systemDispatch(this, Suspend()) catch handleException
try dispatcher.systemDispatch(this, Resume(causedByFailure))
catch {
case e @ (_: InterruptedException | NonFatal(_))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send"))
}
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def restart(cause: Throwable): Unit = final def resume(causedByFailure: Throwable): Unit = try dispatcher.systemDispatch(this, Resume(causedByFailure)) catch handleException
try dispatcher.systemDispatch(this, Recreate(cause))
catch {
case e @ (_: InterruptedException | NonFatal(_))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send"))
}
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def stop(): Unit = final def restart(cause: Throwable): Unit = try dispatcher.systemDispatch(this, Recreate(cause)) catch handleException
try dispatcher.systemDispatch(this, Terminate())
catch { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
case e @ (_: InterruptedException | NonFatal(_)) final def stop(): Unit = try dispatcher.systemDispatch(this, Terminate()) catch handleException
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send"))
}
def sendMessage(msg: Envelope): Unit = def sendMessage(msg: Envelope): Unit =
try { try {
@ -112,16 +101,8 @@ private[akka] trait Dispatch { this: ActorCell ⇒
s.deserialize(s.serialize(m).get, m.getClass).get s.deserialize(s.serialize(m).get, m.getClass).get
} }
dispatcher.dispatch(this, msg) dispatcher.dispatch(this, msg)
} catch { } catch handleException
case e @ (_: InterruptedException | NonFatal(_))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send"))
}
override def sendSystemMessage(message: SystemMessage): Unit = override def sendSystemMessage(message: SystemMessage): Unit = try dispatcher.systemDispatch(this, message) catch handleException
try dispatcher.systemDispatch(this, message)
catch {
case e @ (_: InterruptedException | NonFatal(_))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send"))
}
} }

View file

@ -546,6 +546,28 @@ production.
scenarios, but keep in mind that it has may give false negatives as well as scenarios, but keep in mind that it has may give false negatives as well as
false positives. false positives.
Thread Interruptions
--------------------
If the CallingThreadDispatcher sees that the current thread has its
``isInterrupted()`` flag set when message processing returns, it will throw an
:class:`InterruptedException` after finishing all its processing (i.e. all
messages which need processing as described above are processed before this
happens). As :meth:`tell` cannot throw exceptions due to its contract, this
exception will then be caught and logged, and the threads interrupted status
will be set again.
If during message processing an :class:`InterruptedException` is thrown then it
will be caught inside the CallingThreadDispatchers message handling loop, the
threads interrupted flag will be set and processing continues normally.
.. note::
The summary of these two paragraphs is that if the current thread is
interrupted while doing work under the CallingThreadDispatcher, then that
will result in the ``isInterrupted`` flag to be ``true`` when the message
send returns and no :class:`InterruptedException` will be thrown.
Benefits Benefits
-------- --------

View file

@ -614,6 +614,28 @@ production.
scenarios, but keep in mind that it has may give false negatives as well as scenarios, but keep in mind that it has may give false negatives as well as
false positives. false positives.
Thread Interruptions
--------------------
If the CallingThreadDispatcher sees that the current thread has its
``isInterrupted()`` flag set when message processing returns, it will throw an
:class:`InterruptedException` after finishing all its processing (i.e. all
messages which need processing as described above are processed before this
happens). As :meth:`tell` cannot throw exceptions due to its contract, this
exception will then be caught and logged, and the threads interrupted status
will be set again.
If during message processing an :class:`InterruptedException` is thrown then it
will be caught inside the CallingThreadDispatchers message handling loop, the
threads interrupted flag will be set and processing continues normally.
.. note::
The summary of these two paragraphs is that if the current thread is
interrupted while doing work under the CallingThreadDispatcher, then that
will result in the ``isInterrupted`` flag to be ``true`` when the message
send returns and no :class:`InterruptedException` will be thrown.
Benefits Benefits
-------- --------

View file

@ -13,7 +13,7 @@ import akka.pattern.pipe
import scala.concurrent.Future import scala.concurrent.Future
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.actor.SystemGuardian.{ TerminationHookDone, TerminationHook, RegisterTerminationHook } import akka.actor.SystemGuardian.{ TerminationHookDone, TerminationHook, RegisterTerminationHook }
import scala.throws import scala.util.control.Exception.Catcher
object RemoteActorRefProvider { object RemoteActorRefProvider {
private case class Internals(transport: RemoteTransport, serialization: Serialization, remoteDaemon: InternalActorRef) private case class Internals(transport: RemoteTransport, serialization: Serialization, remoteDaemon: InternalActorRef)
@ -339,21 +339,18 @@ private[akka] class RemoteActorRef private[akka] (
def isTerminated: Boolean = false def isTerminated: Boolean = false
def sendSystemMessage(message: SystemMessage): Unit = private def handleException: Catcher[Unit] = {
try remote.send(message, None, this) case e: InterruptedException
catch { remote.system.eventStream.publish(Error(e, path.toString, getClass, "interrupted during message send"))
case e @ (_: InterruptedException | NonFatal(_)) Thread.currentThread.interrupt()
remote.system.eventStream.publish(Error(e, path.toString, classOf[RemoteActorRef], "swallowing exception during message send")) case NonFatal(e)
provider.deadLetters ! message remote.system.eventStream.publish(Error(e, path.toString, getClass, "swallowing exception during message send"))
} }
def sendSystemMessage(message: SystemMessage): Unit = try remote.send(message, None, this) catch handleException
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit =
try remote.send(message, Option(sender), this) try remote.send(message, Option(sender), this) catch handleException
catch {
case e @ (_: InterruptedException | NonFatal(_))
remote.system.eventStream.publish(Error(e, path.toString, classOf[RemoteActorRef], "swallowing exception during message send"))
provider.deadLetters ! message
}
def start(): Unit = if (props.isDefined && deploy.isDefined) provider.useActorOnNode(path, props.get, deploy.get, getParent) def start(): Unit = if (props.isDefined && deploy.isDefined) provider.useActorOnNode(path, props.get, deploy.get, getParent)