diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 0c57b61f8c..4cd5a876d5 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -269,8 +269,8 @@ class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf) val a = system.actorOf(Props[FooActor]) Await.result(a ? "pigdog", timeout.duration) must be("pigdog") - intercept[NotSerializableException] { - Await.result(a ? new AnyRef, timeout.duration) + EventFilter[NotSerializableException](occurrences = 1) intercept { + a ! (new AnyRef) } system stop a } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 5df5932b21..6a9bdb0801 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -217,7 +217,7 @@ case class DeathPactException private[akka] (dead: ActorRef) * avoid cascading interrupts to other threads than the originally interrupted one. */ @SerialVersionUID(1L) -class ActorInterruptedException private[akka] (cause: Throwable) extends AkkaException(cause.getMessage, cause) with NoStackTrace +class ActorInterruptedException private[akka] (cause: Throwable) extends AkkaException(cause.getMessage, cause) /** * This message is published to the EventStream whenever an Actor receives a message it doesn't understand diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 209ed70478..59e740af2e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -182,19 +182,19 @@ private[akka] trait Cell { */ def systemImpl: ActorSystemImpl /** - * Recursively suspend this actor and all its children. + * Recursively suspend this actor and all its children. Must not throw exceptions. */ def suspend(): Unit /** - * Recursively resume this actor and all its children. + * Recursively resume this actor and all its children. Must not throw exceptions. */ def resume(causedByFailure: Throwable): Unit /** - * Restart this actor (will recursively restart or stop all children). + * Restart this actor (will recursively restart or stop all children). Must not throw exceptions. */ def restart(cause: Throwable): Unit /** - * Recursively terminate this actor and all its children. + * Recursively terminate this actor and all its children. Must not throw exceptions. */ def stop(): Unit /** @@ -217,11 +217,13 @@ private[akka] trait Cell { /** * Enqueue a message to be sent to the actor; may or may not actually * schedule the actor to run, depending on which type of cell it is. + * Must not throw exceptions. */ def tell(message: Any, sender: ActorRef): Unit /** * Enqueue a message to be sent to the actor; may or may not actually * schedule the actor to run, depending on which type of cell it is. + * Must not throw exceptions. */ def sendSystemMessage(msg: SystemMessage): Unit /** @@ -259,6 +261,14 @@ private[akka] object ActorCell { final val emptyBehaviorStack: List[Actor.Receive] = Nil final val emptyActorRefSet: Set[ActorRef] = TreeSet.empty + + final def catchingSend(system: ActorSystem, source: String, clazz: Class[_], code: ⇒ Unit): Unit = { + try code + catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ + system.eventStream.publish(Error(e, source, clazz, "swallowing exception during message send")) + } + } } //ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 422636c1ed..079651d216 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -417,7 +417,11 @@ class LocalActorRefProvider( /** * Overridable supervision strategy to be used by the “/user” guardian. */ - protected def rootGuardianStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy + protected def rootGuardianStrategy: SupervisorStrategy = OneForOneStrategy() { + case ex ⇒ + log.error(ex, "guardian failed, shutting down system") + SupervisorStrategy.Stop + } /** * Overridable supervision strategy to be used by the “/user” guardian. diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 7c38c0cff5..d3a1a66432 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -147,24 +147,16 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep * lock, double-tap (well, N-tap, really); concurrent modification is * still not possible because we’re the only thread accessing the queues. */ - var interrupted = false while (systemQueue.nonEmpty || queue.nonEmpty) { while (systemQueue.nonEmpty) { val msg = systemQueue.dequeue() - try cell.sendSystemMessage(msg) - catch { - case _: InterruptedException ⇒ interrupted = true - } + cell.sendSystemMessage(msg) } if (queue.nonEmpty) { val envelope = queue.dequeue() - try cell.tell(envelope.message, envelope.sender) - catch { - case _: InterruptedException ⇒ interrupted = true - } + cell.tell(envelope.message, envelope.sender) } } - if (interrupted) throw new InterruptedException } finally try self.swapCell(cell) finally try diff --git a/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala index 6565eb7fe7..435a0f97bb 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala @@ -63,20 +63,26 @@ private[akka] trait Dispatch { this: ActorCell ⇒ } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def suspend(): Unit = dispatcher.systemDispatch(this, Suspend()) + final def suspend(): Unit = + ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, Suspend())) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def resume(causedByFailure: Throwable): Unit = dispatcher.systemDispatch(this, Resume(causedByFailure)) + final def resume(causedByFailure: Throwable): Unit = + ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, Resume(causedByFailure))) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause)) + final def restart(cause: Throwable): Unit = + ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, Recreate(cause))) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def stop(): Unit = dispatcher.systemDispatch(this, Terminate()) + final def stop(): Unit = + ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, Terminate())) def tell(message: Any, sender: ActorRef): Unit = - dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system)) + ActorCell.catchingSend(system, self.path.toString, clazz(actor), + dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system))) - override def sendSystemMessage(message: SystemMessage): Unit = dispatcher.systemDispatch(this, message) + override def sendSystemMessage(message: SystemMessage): Unit = + ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, message)) } \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index be029881ed..a35fd81042 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -229,9 +229,11 @@ private[akka] class RemoteActorRef private[akka] ( def isTerminated: Boolean = !running - def sendSystemMessage(message: SystemMessage): Unit = remote.send(message, None, this) + def sendSystemMessage(message: SystemMessage): Unit = + ActorCell.catchingSend(remote.system, path.toString, classOf[RemoteActorRef], remote.send(message, None, this)) - override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this) + override def !(message: Any)(implicit sender: ActorRef = null): Unit = + ActorCell.catchingSend(remote.system, path.toString, classOf[RemoteActorRef], remote.send(message, Option(sender), this)) def suspend(): Unit = sendSystemMessage(Suspend()) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 6c0eb7c993..2987ede478 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -14,6 +14,7 @@ import java.lang.{ Iterable ⇒ JIterable } import scala.collection.JavaConverters import scala.concurrent.util.Duration import scala.reflect.ClassTag +import akka.actor.NoSerializationVerificationNeeded /** * Implementation helpers of the EventFilter facilities: send `Mute` @@ -39,7 +40,7 @@ object TestEvent { object Mute { def apply(filter: EventFilter, filters: EventFilter*): Mute = new Mute(filter +: filters.toSeq) } - case class Mute(filters: Seq[EventFilter]) extends TestEvent { + case class Mute(filters: Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded { /** * Java API */ @@ -48,7 +49,7 @@ object TestEvent { object UnMute { def apply(filter: EventFilter, filters: EventFilter*): UnMute = new UnMute(filter +: filters.toSeq) } - case class UnMute(filters: Seq[EventFilter]) extends TestEvent { + case class UnMute(filters: Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded { /** * Java API */