fix CTD vs. RepointableRef by swallowing exceptions during send

- it was always intended that tell() (and sendSystemMessage()) shall not
  throw any exceptions
- this is implemented by swallowing in ActorCell
  (suspend/resume/restart/stop/!/sendSystemMessage) and in
  RemoteActorRef (!/sendSystemMessage)
- current implementation uses a normal method, which adds overhead but
  keeps the code in one place (ActorCell.catchingSend); this is a great
  opportunity for making use of macros
This commit is contained in:
Roland 2012-08-15 21:46:05 +02:00
parent f7ea9bf3dd
commit c1c05ef95e
8 changed files with 43 additions and 28 deletions

View file

@ -269,8 +269,8 @@ class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf)
val a = system.actorOf(Props[FooActor])
Await.result(a ? "pigdog", timeout.duration) must be("pigdog")
intercept[NotSerializableException] {
Await.result(a ? new AnyRef, timeout.duration)
EventFilter[NotSerializableException](occurrences = 1) intercept {
a ! (new AnyRef)
}
system stop a
}

View file

@ -217,7 +217,7 @@ case class DeathPactException private[akka] (dead: ActorRef)
* avoid cascading interrupts to other threads than the originally interrupted one.
*/
@SerialVersionUID(1L)
class ActorInterruptedException private[akka] (cause: Throwable) extends AkkaException(cause.getMessage, cause) with NoStackTrace
class ActorInterruptedException private[akka] (cause: Throwable) extends AkkaException(cause.getMessage, cause)
/**
* This message is published to the EventStream whenever an Actor receives a message it doesn't understand

View file

@ -182,19 +182,19 @@ private[akka] trait Cell {
*/
def systemImpl: ActorSystemImpl
/**
* Recursively suspend this actor and all its children.
* Recursively suspend this actor and all its children. Must not throw exceptions.
*/
def suspend(): Unit
/**
* Recursively resume this actor and all its children.
* Recursively resume this actor and all its children. Must not throw exceptions.
*/
def resume(causedByFailure: Throwable): Unit
/**
* Restart this actor (will recursively restart or stop all children).
* Restart this actor (will recursively restart or stop all children). Must not throw exceptions.
*/
def restart(cause: Throwable): Unit
/**
* Recursively terminate this actor and all its children.
* Recursively terminate this actor and all its children. Must not throw exceptions.
*/
def stop(): Unit
/**
@ -217,11 +217,13 @@ private[akka] trait Cell {
/**
* Enqueue a message to be sent to the actor; may or may not actually
* schedule the actor to run, depending on which type of cell it is.
* Must not throw exceptions.
*/
def tell(message: Any, sender: ActorRef): Unit
/**
* Enqueue a message to be sent to the actor; may or may not actually
* schedule the actor to run, depending on which type of cell it is.
* Must not throw exceptions.
*/
def sendSystemMessage(msg: SystemMessage): Unit
/**
@ -259,6 +261,14 @@ private[akka] object ActorCell {
final val emptyBehaviorStack: List[Actor.Receive] = Nil
final val emptyActorRefSet: Set[ActorRef] = TreeSet.empty
final def catchingSend(system: ActorSystem, source: String, clazz: Class[_], code: Unit): Unit = {
try code
catch {
case e @ (_: InterruptedException | NonFatal(_))
system.eventStream.publish(Error(e, source, clazz, "swallowing exception during message send"))
}
}
}
//ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit)

View file

@ -417,7 +417,11 @@ class LocalActorRefProvider(
/**
* Overridable supervision strategy to be used by the /user guardian.
*/
protected def rootGuardianStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy
protected def rootGuardianStrategy: SupervisorStrategy = OneForOneStrategy() {
case ex
log.error(ex, "guardian failed, shutting down system")
SupervisorStrategy.Stop
}
/**
* Overridable supervision strategy to be used by the /user guardian.

View file

@ -147,24 +147,16 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep
* lock, double-tap (well, N-tap, really); concurrent modification is
* still not possible because were the only thread accessing the queues.
*/
var interrupted = false
while (systemQueue.nonEmpty || queue.nonEmpty) {
while (systemQueue.nonEmpty) {
val msg = systemQueue.dequeue()
try cell.sendSystemMessage(msg)
catch {
case _: InterruptedException interrupted = true
}
cell.sendSystemMessage(msg)
}
if (queue.nonEmpty) {
val envelope = queue.dequeue()
try cell.tell(envelope.message, envelope.sender)
catch {
case _: InterruptedException interrupted = true
cell.tell(envelope.message, envelope.sender)
}
}
}
if (interrupted) throw new InterruptedException
} finally try
self.swapCell(cell)
finally try

View file

@ -63,20 +63,26 @@ private[akka] trait Dispatch { this: ActorCell ⇒
}
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def suspend(): Unit = dispatcher.systemDispatch(this, Suspend())
final def suspend(): Unit =
ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, Suspend()))
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def resume(causedByFailure: Throwable): Unit = dispatcher.systemDispatch(this, Resume(causedByFailure))
final def resume(causedByFailure: Throwable): Unit =
ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, Resume(causedByFailure)))
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause))
final def restart(cause: Throwable): Unit =
ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, Recreate(cause)))
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def stop(): Unit = dispatcher.systemDispatch(this, Terminate())
final def stop(): Unit =
ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, Terminate()))
def tell(message: Any, sender: ActorRef): Unit =
dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system))
ActorCell.catchingSend(system, self.path.toString, clazz(actor),
dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system)))
override def sendSystemMessage(message: SystemMessage): Unit = dispatcher.systemDispatch(this, message)
override def sendSystemMessage(message: SystemMessage): Unit =
ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, message))
}

View file

@ -229,9 +229,11 @@ private[akka] class RemoteActorRef private[akka] (
def isTerminated: Boolean = !running
def sendSystemMessage(message: SystemMessage): Unit = remote.send(message, None, this)
def sendSystemMessage(message: SystemMessage): Unit =
ActorCell.catchingSend(remote.system, path.toString, classOf[RemoteActorRef], remote.send(message, None, this))
override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this)
override def !(message: Any)(implicit sender: ActorRef = null): Unit =
ActorCell.catchingSend(remote.system, path.toString, classOf[RemoteActorRef], remote.send(message, Option(sender), this))
def suspend(): Unit = sendSystemMessage(Suspend())

View file

@ -14,6 +14,7 @@ import java.lang.{ Iterable ⇒ JIterable }
import scala.collection.JavaConverters
import scala.concurrent.util.Duration
import scala.reflect.ClassTag
import akka.actor.NoSerializationVerificationNeeded
/**
* Implementation helpers of the EventFilter facilities: send `Mute`
@ -39,7 +40,7 @@ object TestEvent {
object Mute {
def apply(filter: EventFilter, filters: EventFilter*): Mute = new Mute(filter +: filters.toSeq)
}
case class Mute(filters: Seq[EventFilter]) extends TestEvent {
case class Mute(filters: Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded {
/**
* Java API
*/
@ -48,7 +49,7 @@ object TestEvent {
object UnMute {
def apply(filter: EventFilter, filters: EventFilter*): UnMute = new UnMute(filter +: filters.toSeq)
}
case class UnMute(filters: Seq[EventFilter]) extends TestEvent {
case class UnMute(filters: Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded {
/**
* Java API
*/