diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index d5a2f9a027..8d12f58a46 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -210,7 +210,10 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit collectCancellable(system.scheduler.schedule(1 second, 300 milliseconds, actor, Msg)) Await.ready(ticks, 3 seconds) - (System.nanoTime() - startTime).nanos.toMillis must be(1800L plusOrMinus 199) + // LARS is a bit more aggressive in scheduling recurring tasks at the right + // frequency and may execute them a little earlier; the actual expected timing + // is 1599ms on a fast machine or 1699ms on a loaded one (plus some room for jenkins) + (System.nanoTime() - startTime).nanos.toMillis must be(1750L plusOrMinus 250) } "adjust for scheduler inaccuracy" taggedAs TimingTest in { diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 443d6f5981..75e4718718 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -415,9 +415,11 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa val a = newTestActor(dispatcher.id) val f1 = a ? Reply("foo") val f2 = a ? Reply("bar") - val f3 = try { a ? Interrupt } catch { case ie: InterruptedException ⇒ Promise.failed(new ActorInterruptedException(ie)).future } + val f3 = a ? Interrupt + Thread.interrupted() // CallingThreadDispatcher may necessitate this val f4 = a ? Reply("foo2") - val f5 = try { a ? Interrupt } catch { case ie: InterruptedException ⇒ Promise.failed(new ActorInterruptedException(ie)).future } + val f5 = a ? Interrupt + Thread.interrupted() // CallingThreadDispatcher may necessitate this val f6 = a ? Reply("bar2") val c = system.scheduler.scheduleOnce(2.seconds) { diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala index 9bce41fc4f..910ca9bf4d 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala @@ -12,6 +12,7 @@ import scala.util.control.NonFatal import akka.dispatch.NullMessage import akka.actor.{ NoSerializationVerificationNeeded, InvalidMessageException, ActorRef, ActorCell } import akka.serialization.SerializationExtension +import scala.util.control.Exception.Catcher private[akka] trait Dispatch { this: ActorCell ⇒ @@ -71,37 +72,25 @@ private[akka] trait Dispatch { this: ActorCell ⇒ this } - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def suspend(): Unit = - try dispatcher.systemDispatch(this, Suspend()) - catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ - system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) - } + private def handleException: Catcher[Unit] = { + case e: InterruptedException ⇒ + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "interrupted during message send")) + Thread.currentThread.interrupt() + case NonFatal(e) ⇒ + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) + } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def resume(causedByFailure: Throwable): Unit = - try dispatcher.systemDispatch(this, Resume(causedByFailure)) - catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ - system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) - } + final def suspend(): Unit = try dispatcher.systemDispatch(this, Suspend()) catch handleException // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def restart(cause: Throwable): Unit = - try dispatcher.systemDispatch(this, Recreate(cause)) - catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ - system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) - } + final def resume(causedByFailure: Throwable): Unit = try dispatcher.systemDispatch(this, Resume(causedByFailure)) catch handleException // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def stop(): Unit = - try dispatcher.systemDispatch(this, Terminate()) - catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ - system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) - } + final def restart(cause: Throwable): Unit = try dispatcher.systemDispatch(this, Recreate(cause)) catch handleException + + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + final def stop(): Unit = try dispatcher.systemDispatch(this, Terminate()) catch handleException def sendMessage(msg: Envelope): Unit = try { @@ -112,16 +101,8 @@ private[akka] trait Dispatch { this: ActorCell ⇒ s.deserialize(s.serialize(m).get, m.getClass).get } dispatcher.dispatch(this, msg) - } catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ - system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) - } + } catch handleException - override def sendSystemMessage(message: SystemMessage): Unit = - try dispatcher.systemDispatch(this, message) - catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ - system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) - } + override def sendSystemMessage(message: SystemMessage): Unit = try dispatcher.systemDispatch(this, message) catch handleException } diff --git a/akka-docs/rst/java/testing.rst b/akka-docs/rst/java/testing.rst index 07f0cbf60a..cfb5469c6b 100644 --- a/akka-docs/rst/java/testing.rst +++ b/akka-docs/rst/java/testing.rst @@ -546,6 +546,28 @@ production. scenarios, but keep in mind that it has may give false negatives as well as false positives. +Thread Interruptions +-------------------- + +If the CallingThreadDispatcher sees that the current thread has its +``isInterrupted()`` flag set when message processing returns, it will throw an +:class:`InterruptedException` after finishing all its processing (i.e. all +messages which need processing as described above are processed before this +happens). As :meth:`tell` cannot throw exceptions due to its contract, this +exception will then be caught and logged, and the thread’s interrupted status +will be set again. + +If during message processing an :class:`InterruptedException` is thrown then it +will be caught inside the CallingThreadDispatcher’s message handling loop, the +thread’s interrupted flag will be set and processing continues normally. + +.. note:: + + The summary of these two paragraphs is that if the current thread is + interrupted while doing work under the CallingThreadDispatcher, then that + will result in the ``isInterrupted`` flag to be ``true`` when the message + send returns and no :class:`InterruptedException` will be thrown. + Benefits -------- diff --git a/akka-docs/rst/scala/testing.rst b/akka-docs/rst/scala/testing.rst index ad9bf330f7..8841280e79 100644 --- a/akka-docs/rst/scala/testing.rst +++ b/akka-docs/rst/scala/testing.rst @@ -614,6 +614,28 @@ production. scenarios, but keep in mind that it has may give false negatives as well as false positives. +Thread Interruptions +-------------------- + +If the CallingThreadDispatcher sees that the current thread has its +``isInterrupted()`` flag set when message processing returns, it will throw an +:class:`InterruptedException` after finishing all its processing (i.e. all +messages which need processing as described above are processed before this +happens). As :meth:`tell` cannot throw exceptions due to its contract, this +exception will then be caught and logged, and the thread’s interrupted status +will be set again. + +If during message processing an :class:`InterruptedException` is thrown then it +will be caught inside the CallingThreadDispatcher’s message handling loop, the +thread’s interrupted flag will be set and processing continues normally. + +.. note:: + + The summary of these two paragraphs is that if the current thread is + interrupted while doing work under the CallingThreadDispatcher, then that + will result in the ``isInterrupted`` flag to be ``true`` when the message + send returns and no :class:`InterruptedException` will be thrown. + Benefits -------- diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 1f32d88140..a64b9b3a71 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -13,7 +13,7 @@ import akka.pattern.pipe import scala.concurrent.Future import scala.util.control.NonFatal import akka.actor.SystemGuardian.{ TerminationHookDone, TerminationHook, RegisterTerminationHook } -import scala.throws +import scala.util.control.Exception.Catcher object RemoteActorRefProvider { private case class Internals(transport: RemoteTransport, serialization: Serialization, remoteDaemon: InternalActorRef) @@ -339,21 +339,18 @@ private[akka] class RemoteActorRef private[akka] ( def isTerminated: Boolean = false - def sendSystemMessage(message: SystemMessage): Unit = - try remote.send(message, None, this) - catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ - remote.system.eventStream.publish(Error(e, path.toString, classOf[RemoteActorRef], "swallowing exception during message send")) - provider.deadLetters ! message - } + private def handleException: Catcher[Unit] = { + case e: InterruptedException ⇒ + remote.system.eventStream.publish(Error(e, path.toString, getClass, "interrupted during message send")) + Thread.currentThread.interrupt() + case NonFatal(e) ⇒ + remote.system.eventStream.publish(Error(e, path.toString, getClass, "swallowing exception during message send")) + } + + def sendSystemMessage(message: SystemMessage): Unit = try remote.send(message, None, this) catch handleException override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = - try remote.send(message, Option(sender), this) - catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ - remote.system.eventStream.publish(Error(e, path.toString, classOf[RemoteActorRef], "swallowing exception during message send")) - provider.deadLetters ! message - } + try remote.send(message, Option(sender), this) catch handleException def start(): Unit = if (props.isDefined && deploy.isDefined) provider.useActorOnNode(path, props.get, deploy.get, getParent)