another round of fixes due to suspend counting, see #2212

- always suspend/resume for Suspend/Resume/Recreate, no matter which
  state the actor is in, to keep the counter balanced
- preRestart failures are logged but otherwise ignored; there’s nothing
  else (apart from terminating the actor) which we could do at that
  point
- preRestart/postRestart exceptions have their own distinguishable
  subtype of ActorKilledException now
- fix some race conditions in tests to make them produce fewer false
  failures
- remove cruft from SupervisorStrategy and add methods which can
  actually be used to implement your own (with proper warning signs)
This commit is contained in:
Roland 2012-07-04 09:20:17 +02:00
parent 1148e20dbb
commit 78a39198f1
15 changed files with 243 additions and 137 deletions

View file

@ -227,7 +227,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
contextStackMustBeEmpty
}
filterException[java.lang.IllegalStateException] {
EventFilter[ActorInitializationException](occurrences = 1) intercept {
(intercept[java.lang.IllegalStateException] {
wrap(result
actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept({ throw new IllegalStateException("Ur state be b0rked"); new InnerActor })(result)))))))

View file

@ -145,7 +145,7 @@ object FSMTimingSpec {
}
def resume(actorRef: ActorRef): Unit = actorRef match {
case l: LocalActorRef l.resume()
case l: LocalActorRef l.resume(inResponseToFailure = false)
case _
}

View file

@ -215,6 +215,8 @@ object SupervisorHierarchySpec {
* TODO RK: also test exceptions during recreate
*
* TODO RK: also test recreate including terminating children
*
* TODO RK: also verify that preRestart is not called more than once per instance
*/
class StressTest(testActor: ActorRef, depth: Int, breadth: Int) extends Actor with LoggingFSM[State, Null] {

View file

@ -339,7 +339,12 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 10 seconds)(classOf[Exception] :: Nil))))
val dyingProps = Props(new Actor {
if (inits.incrementAndGet % 2 == 0) throw new IllegalStateException("Don't wanna!")
val init = inits.getAndIncrement()
if (init % 3 == 1) throw new IllegalStateException("Don't wanna!")
override def preRestart(cause: Throwable, msg: Option[Any]) {
if (init % 3 == 0) throw new IllegalStateException("Don't wanna!")
}
def receive = {
case Ping sender ! PongMessage
@ -352,8 +357,10 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
supervisor ! dyingProps
val dyingActor = expectMsgType[ActorRef]
filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1),
EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) {
filterEvents(
EventFilter[RuntimeException]("Expected", occurrences = 1),
EventFilter[PreRestartException]("Don't wanna!", occurrences = 1),
EventFilter[PostRestartException]("Don't wanna!", occurrences = 1)) {
intercept[RuntimeException] {
Await.result(dyingActor.?(DieReply)(DilatedTimeout), DilatedTimeout)
}
@ -376,8 +383,8 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
val child = context.watch(context.actorOf(Props(new Actor {
override def postRestart(reason: Throwable): Unit = testActor ! "child restarted"
def receive = {
case "die" throw new IllegalStateException("OHNOES")
case "test" sender ! "child green"
case l: TestLatch Await.ready(l, 5 seconds); throw new IllegalStateException("OHNOES")
case "test" sender ! "child green"
}
}), "child"))
@ -385,14 +392,18 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
def receive = {
case t @ Terminated(`child`) testActor ! "child terminated"
case "die" child ! "die"
case l: TestLatch child ! l
case "test" sender ! "green"
case "testchild" child forward "test"
}
}))
parent ! "die"
val latch = TestLatch()
parent ! latch
parent ! "testchild"
EventFilter[IllegalStateException]("OHNOES", occurrences = 2) intercept {
latch.countDown()
}
expectMsg("parent restarted")
expectMsg("child terminated")
parent ! "test"

View file

@ -335,7 +335,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
assertNoCountDown(done, 1000, "Should not process messages while suspended")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1)
a.resume
a.resume(inResponseToFailure = false)
assertCountDown(done, 3.seconds.dilated.toMillis, "Should resume processing of messages when resumed")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1,
suspensions = 1, resumes = 1)

View file

@ -61,7 +61,7 @@ class PriorityDispatcherSpec extends AkkaSpec(PriorityDispatcherSpec.config) wit
val msgs = (1 to 100).toList
for (m msgs) actor ! m
actor.resume //Signal the actor to start treating it's message backlog
actor.resume(inResponseToFailure = false) //Signal the actor to start treating it's message backlog
Await.result(actor.?('Result).mapTo[List[Int]], timeout.duration) must be === msgs.reverse
}

View file

@ -123,12 +123,36 @@ class InvalidActorNameException(message: String) extends AkkaException(message)
/**
* An ActorInitializationException is thrown when the the initialization logic for an Actor fails.
*/
class ActorInitializationException private[akka] (actor: ActorRef, message: String, cause: Throwable)
extends AkkaException(message, cause) /*with NoStackTrace*/ {
class ActorInitializationException private[akka] (val actor: ActorRef, message: String, cause: Throwable)
extends AkkaException(message, cause) {
def this(msg: String) = this(null, msg, null)
def this(actor: ActorRef, msg: String) = this(actor, msg, null)
}
/**
* A PreRestartException is thrown when the preRestart() method failed.
*
* @param actor is the actor whose preRestart() hook failed
* @param cause is the exception thrown by that actor within preRestart()
* @param origCause is the exception which caused the restart in the first place
* @param msg is the message which was optionally passed into preRestart()
*/
class PreRestartException private[akka] (actor: ActorRef, cause: Throwable, val origCause: Throwable, val msg: Option[Any])
extends ActorInitializationException(actor, "exception in preRestart(" + origCause + ", " + msg + ")", cause) {
}
/**
* A PostRestartException is thrown when constructor or postRestart() method
* fails during a restart attempt.
*
* @param actor is the actor whose constructor or postRestart() hook failed
* @param cause is the exception thrown by that actor within preRestart()
* @param origCause is the exception which caused the restart in the first place
*/
class PostRestartException private[akka] (actor: ActorRef, cause: Throwable, val origCause: Throwable)
extends ActorInitializationException(actor, "exception post restart (" + origCause + ")", cause) {
}
/**
* InvalidMessageException is thrown when an invalid message is sent to an Actor.
* Technically it's only "null" which is an InvalidMessageException but who knows,

View file

@ -12,7 +12,7 @@ import akka.event.Logging.{ Debug, Warning, Error }
import akka.japi.Procedure
import java.io.{ NotSerializableException, ObjectOutputStream }
import akka.serialization.SerializationExtension
import akka.event.Logging.LogEventException
import akka.event.Logging.{ LogEventException, LogEvent }
import collection.immutable.{ TreeSet, Stack, TreeMap }
import akka.util.{ Unsafe, Duration, Helpers, NonFatal }
@ -362,6 +362,7 @@ private[akka] class ActorCell(
}
private def isNormal = childrenRefs match {
case TerminatingChildrenContainer(_, _, Termination | _: Recreation) false
case TerminatedChildrenContainer false
case _ true
}
@ -414,6 +415,17 @@ private[akka] class ActorCell(
var watching: Set[ActorRef] = emptyActorRefSet
var watchedBy: Set[ActorRef] = emptyActorRefSet
/*
* have we told our supervisor that we Failed() and have not yet heard back?
* (actually: we might have heard back but not yet acted upon it, in case of
* a restart with dying children)
* might well be replaced by ref to a Cancellable in the future (see #2299)
*/
private var _failed = false
def currentlyFailed: Boolean = _failed
def setFailed(): Unit = _failed = true
def setNotFailed(): Unit = _failed = false
//Not thread safe, so should only be used inside the actor that inhabits this ActorCell
final protected def randomName(): String = {
val n = nextNameSequence
@ -469,7 +481,7 @@ private[akka] class ActorCell(
final def suspend(): Unit = dispatcher.systemDispatch(this, Suspend())
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def resume(): Unit = dispatcher.systemDispatch(this, Resume())
final def resume(inResponseToFailure: Boolean): Unit = dispatcher.systemDispatch(this, Resume(inResponseToFailure))
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def stop(): Unit = dispatcher.systemDispatch(this, Terminate())
@ -492,6 +504,23 @@ private[akka] class ActorCell(
a
}
/* =================
* T H E R U L E S
* =================
*
* Actors can be suspended for two reasons:
* - they fail
* - their supervisor gets suspended
*
* In particular they are not suspended multiple times because of cascading
* own failures, i.e. while currentlyFailed() they do not fail again. In case
* of a restart, failures in constructor/preStart count as new failures.
*/
private def suspendNonRecursive(): Unit = dispatcher suspend this
private def resumeNonRecursive(): Unit = dispatcher resume this
final def children: Iterable[ActorRef] = childrenRefs.children
/**
@ -542,7 +571,7 @@ private[akka] class ActorCell(
actor = created
created.preStart()
checkReceiveTimeout
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
} catch {
case NonFatal(i: InstantiationException)
throw new ActorInitializationException(self,
@ -554,41 +583,46 @@ private[akka] class ActorCell(
}
}
def recreate(cause: Throwable): Unit = if (isNormal) {
try {
def recreate(cause: Throwable): Unit =
if (isNormal) {
val failedActor = actor
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(failedActor), "restarting"))
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(failedActor), "restarting"))
if (failedActor ne null) {
val c = currentMessage //One read only plz
try {
if (failedActor.context ne null) failedActor.preRestart(cause, if (c ne null) Some(c.message) else None)
// if the actor fails in preRestart, we can do nothing but log it: its best-effort
if (failedActor.context ne null) failedActor.preRestart(cause, Option(currentMessage))
} catch {
case NonFatal(e)
val ex = new PreRestartException(self, e, cause, Option(currentMessage))
publish(Error(ex, self.path.toString, clazz(failedActor), e.getMessage))
} finally {
clearActorFields(failedActor)
}
}
assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.status)
childrenRefs match {
case ct: TerminatingChildrenContainer
childrenRefs = ct.copy(reason = Recreation(cause))
dispatcher suspend this
case _
doRecreate(cause, failedActor)
}
} catch {
case NonFatal(e) throw new ActorInitializationException(self, "exception during creation", e match {
case i: InstantiationException i.getCause
case other other
})
} else {
// need to keep that suspend counter balanced
doResume(inResponseToFailure = false)
}
}
def suspend(): Unit = if (isNormal) {
dispatcher suspend this
def doSuspend(): Unit = {
// done always to keep that suspend counter balanced
suspendNonRecursive()
children foreach (_.asInstanceOf[InternalActorRef].suspend())
}
def resume(): Unit = if (isNormal) {
dispatcher resume this
children foreach (_.asInstanceOf[InternalActorRef].resume())
def doResume(inResponseToFailure: Boolean): Unit = {
// done always to keep that suspend counter balanced
// must happen atomically
try resumeNonRecursive()
finally if (inResponseToFailure) setNotFailed()
children foreach (_.asInstanceOf[InternalActorRef].resume(inResponseToFailure = false))
}
def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
@ -598,12 +632,12 @@ private[akka] class ActorCell(
if (watcheeSelf && !watcherSelf) {
if (!watchedBy.contains(watcher)) {
watchedBy += watcher
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + watcher))
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now monitoring " + watcher))
}
} else if (!watcheeSelf && watcherSelf) {
watch(watchee)
} else {
system.eventStream.publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, self)))
publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, self)))
}
}
@ -614,12 +648,12 @@ private[akka] class ActorCell(
if (watcheeSelf && !watcherSelf) {
if (watchedBy.contains(watcher)) {
watchedBy -= watcher
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + watcher))
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + watcher))
}
} else if (!watcheeSelf && watcherSelf) {
unwatch(watchee)
} else {
system.eventStream.publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, self)))
publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, self)))
}
}
@ -634,15 +668,15 @@ private[akka] class ActorCell(
case ct: TerminatingChildrenContainer
childrenRefs = ct.copy(reason = Termination)
// do not process normal messages while waiting for all children to terminate
dispatcher suspend this
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping"))
suspendNonRecursive()
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopping"))
case _ doTerminate()
}
}
def supervise(child: ActorRef): Unit = if (!isTerminating) {
if (childrenRefs.getByRef(child).isEmpty) childrenRefs = childrenRefs.add(child)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
}
try {
@ -651,11 +685,12 @@ private[akka] class ActorCell(
case Recreate(cause) recreate(cause)
case Watch(watchee, watcher) addWatcher(watchee, watcher)
case Unwatch(watchee, watcher) remWatcher(watchee, watcher)
case Suspend() suspend()
case Resume() resume()
case Suspend() doSuspend()
case Resume(inRespToFailure) doResume(inRespToFailure)
case Terminate() terminate()
case Supervise(child) supervise(child)
case ChildTerminated(child) handleChildTerminated(child)
case NoMessage // to shut up the exhaustiveness warning
}
} catch {
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, "error while processing " + message)
@ -677,20 +712,30 @@ private[akka] class ActorCell(
checkReceiveTimeout // Reschedule receive timeout
}
final def handleInvokeFailure(t: Throwable, message: String): Unit = try {
dispatcher.reportFailure(new LogEventException(Error(t, self.path.toString, clazz(actor), message), t))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
if (actor ne null) actor.supervisorStrategy.handleSupervisorFailing(self, children)
// now we may just have suspended the poor guy which made us fail by way of Escalate, so adjust the score
currentMessage match {
case Envelope(Failed(`t`), child) child.asInstanceOf[InternalActorRef].resume()
case _
final def handleInvokeFailure(t: Throwable, message: String): Unit = {
try {
dispatcher.reportFailure(new LogEventException(Error(t, self.path.toString, clazz(actor), message), t))
} catch {
case NonFatal(_) // no sense logging if logging does not work
}
} finally {
t match { // Wrap InterruptedExceptions and rethrow
case _: InterruptedException parent.tell(Failed(new ActorInterruptedException(t)), self); throw t
case _ parent.tell(Failed(t), self)
// prevent any further messages to be processed until the actor has been restarted
if (!currentlyFailed) {
// suspend self; these two must happen atomically
try suspendNonRecursive()
finally setFailed()
// suspend children
val skip: Set[ActorRef] = currentMessage match {
case Envelope(Failed(`t`), child) Set(child)
case _ Set.empty
}
childrenRefs.stats collect {
case ChildRestartStats(child, _, _) if !(skip contains child) child
} foreach (_.asInstanceOf[InternalActorRef].suspend())
// tell supervisor
t match { // Wrap InterruptedExceptions and rethrow
case _: InterruptedException parent.tell(Failed(new ActorInterruptedException(t)), self); throw t
case _ parent.tell(Failed(t), self)
}
}
}
@ -718,7 +763,7 @@ private[akka] class ActorCell(
def autoReceiveMessage(msg: Envelope): Unit = {
if (system.settings.DebugAutoReceive)
system.eventStream.publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
msg.message match {
case Failed(cause) handleFailure(sender, cause)
@ -738,73 +783,69 @@ private[akka] class ActorCell(
private def doTerminate() {
val a = actor
try {
try {
if (a ne null) a.postStop()
} finally {
dispatcher.detach(this)
}
} finally {
try {
parent.sendSystemMessage(ChildTerminated(self))
if (!watchedBy.isEmpty) {
val terminated = Terminated(self)(existenceConfirmed = true)
try {
watchedBy foreach {
watcher
try watcher.tell(terminated, self) catch {
case NonFatal(t) system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch"))
}
}
} finally watchedBy = emptyActorRefSet
}
if (!watching.isEmpty) {
try {
watching foreach { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
case watchee: InternalActorRef try watchee.sendSystemMessage(Unwatch(watchee, self)) catch {
case NonFatal(t) system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch"))
try if (a ne null) a.postStop()
finally try dispatcher.detach(this)
finally try parent.sendSystemMessage(ChildTerminated(self))
finally try
if (!watchedBy.isEmpty) {
val terminated = Terminated(self)(existenceConfirmed = true)
try {
watchedBy foreach {
watcher
try watcher.tell(terminated, self) catch {
case NonFatal(t) publish(Error(t, self.path.toString, clazz(a), "deathwatch"))
}
}
} finally watching = emptyActorRefSet
}
if (system.settings.DebugLifecycle)
system.eventStream.publish(Debug(self.path.toString, clazz(a), "stopped"))
} finally {
behaviorStack = behaviorStackPlaceHolder
clearActorFields(a)
actor = null
}
} finally watchedBy = emptyActorRefSet
}
finally try
if (!watching.isEmpty) {
try {
watching foreach { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
case watchee: InternalActorRef try watchee.sendSystemMessage(Unwatch(watchee, self)) catch {
case NonFatal(t) publish(Error(t, self.path.toString, clazz(a), "deathwatch"))
}
}
} finally watching = emptyActorRefSet
}
finally {
if (system.settings.DebugLifecycle)
publish(Debug(self.path.toString, clazz(a), "stopped"))
behaviorStack = behaviorStackPlaceHolder
clearActorFields(a)
actor = null
}
}
private def doRecreate(cause: Throwable, failedActor: Actor): Unit = try {
// after all killed children have terminated, recreate the rest, then go on to start the new instance
actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children)
// must happen atomically
try resumeNonRecursive()
finally setNotFailed()
val survivors = children
val freshActor = newActor()
actor = freshActor // this must happen before postRestart has a chance to fail
if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.
freshActor.postRestart(cause)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
dispatcher.resume(this)
// only after parent is up and running again do restart the children which were not stopped
survivors foreach (child
try child.asInstanceOf[InternalActorRef].restart(cause)
catch {
case NonFatal(e) publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child))
})
} catch {
case NonFatal(e) try {
dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
actor.supervisorStrategy.handleSupervisorFailing(self, children) // FIXME Should this be called on actor or failedActor?
clearActorFields(actor) // If this fails, we need to ensure that preRestart isn't called.
} finally {
parent.tell(Failed(new ActorInitializationException(self, "exception during re-creation", e)), self)
}
case NonFatal(e)
clearActorFields(actor) // in order to prevent preRestart() from happening again
handleInvokeFailure(new PostRestartException(self, e, cause), e.getMessage)
}
final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.getByRef(child) match {
case Some(stats) if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, childrenRefs.stats)) throw cause
case None system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child))
case None publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child))
}
final def handleChildTerminated(child: ActorRef): Unit = try {
@ -823,13 +864,7 @@ private[akka] class ActorCell(
actor.supervisorStrategy.handleChildTerminated(this, child, children)
}
} catch {
case NonFatal(e)
try {
dispatcher suspend this
actor.supervisorStrategy.handleSupervisorFailing(self, children)
} finally {
parent.tell(Failed(e), self)
}
case NonFatal(e) handleInvokeFailure(e, "handleChildTerminated failed")
}
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
@ -881,6 +916,9 @@ private[akka] class ActorCell(
}
}
// logging is not the main purpose, and if it fails theres nothing we can do
private final def publish(e: LogEvent): Unit = try system.eventStream.publish(e) catch { case NonFatal(_) }
private final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass
}

View file

@ -177,7 +177,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe
/*
* Actor life-cycle management, invoked only internally (in response to user requests via ActorContext).
*/
def resume(): Unit
def resume(inResponseToFailure: Boolean): Unit
def suspend(): Unit
def restart(cause: Throwable): Unit
def stop(): Unit
@ -267,7 +267,7 @@ private[akka] class LocalActorRef private[akka] (
/**
* Resumes a suspended actor.
*/
override def resume(): Unit = actorCell.resume()
override def resume(inResponseToFailure: Boolean): Unit = actorCell.resume(inResponseToFailure)
/**
* Shuts down the actor and its message queue
@ -367,7 +367,7 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef {
override def getChild(names: Iterator[String]): InternalActorRef = if (names.forall(_.isEmpty)) this else Nobody
override def suspend(): Unit = ()
override def resume(): Unit = ()
override def resume(inResponseToFailure: Boolean): Unit = ()
override def stop(): Unit = ()
override def isTerminated = false

View file

@ -196,19 +196,30 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
}
/**
* An Akka SupervisorStrategy is the policy to apply for crashing children
* An Akka SupervisorStrategy is the policy to apply for crashing children.
*
* <b>IMPORTANT:</b>
*
* You should not normally need to create new subclasses, instead use the
* existing [[akka.actor.OneForOneStrategy]] or [[akka.actor.AllForOneStrategy]],
* but if you do, please read the docs of the methods below carefully, as
* incorrect implementations may lead to blocked actor systems (i.e.
* permanently suspended actors).
*/
abstract class SupervisorStrategy {
import SupervisorStrategy._
/**
* Returns the Decider that is associated with this SupervisorStrategy
* Returns the Decider that is associated with this SupervisorStrategy.
* The Decider is invoked by the default implementation of `handleFailure`
* to obtain the Directive to be applied.
*/
def decider: Decider
/**
* This method is called after the child has been removed from the set of children.
* It does not need to do anything special.
*/
def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit
@ -217,27 +228,46 @@ abstract class SupervisorStrategy {
*/
def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit
//FIXME docs
def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit =
if (children.nonEmpty) children.foreach(_.asInstanceOf[InternalActorRef].suspend())
//FIXME docs
def handleSupervisorRestarted(cause: Throwable, supervisor: ActorRef, children: Iterable[ActorRef]): Unit =
if (children.nonEmpty) children.foreach(_.asInstanceOf[InternalActorRef].restart(cause))
/**
* Returns whether it processed the failure or not
* This is the main entry point: in case of a childs failure, this method
* must try to handle the failure by resuming, restarting or stopping the
* child (and returning `true`), or it returns `false` to escalate the
* failure, which will lead to this actor re-throwing the exception which
* caused the failure. The exception will not be wrapped.
*/
def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
val directive = if (decider.isDefinedAt(cause)) decider(cause) else Escalate //FIXME applyOrElse in Scala 2.10
directive match {
case Resume child.asInstanceOf[InternalActorRef].resume(); true
case Resume resumeChild(child); true
case Restart processFailure(context, true, child, cause, stats, children); true
case Stop processFailure(context, false, child, cause, stats, children); true
case Escalate false
}
}
/**
* Resume the previously failed child: <b>do never apply this to a child which
* is not the currently failing child</b>. Suspend/resume needs to be done in
* matching pairs, otherwise actors will wake up too soon or never at all.
*/
final def resumeChild(child: ActorRef): Unit = child.asInstanceOf[InternalActorRef].resume(inResponseToFailure = true)
/**
* Restart the given child, possibly suspending it first.
*
* <b>IMPORTANT:</b>
*
* If the child is the currently failing one, it will already have been
* suspended, hence `suspendFirst` is false. If the child is not the
* currently failing one, then it did not request this treatment and is
* therefore not prepared to be resumed without prior suspend.
*/
final def restartChild(child: ActorRef, cause: Throwable, suspendFirst: Boolean): Unit = {
val c = child.asInstanceOf[InternalActorRef]
if (suspendFirst) c.suspend()
c.restart(cause)
}
}
/**
@ -276,7 +306,7 @@ case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration
def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
if (children.nonEmpty) {
if (restart && children.forall(_.requestRestartPermission(retriesWindow)))
children.foreach(_.child.asInstanceOf[InternalActorRef].restart(cause))
children foreach (crs restartChild(crs.child, cause, suspendFirst = (crs.child != child)))
else
for (c children) context.stop(c.child)
}
@ -318,7 +348,7 @@ case class OneForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration
def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
if (restart && stats.requestRestartPermission(retriesWindow))
child.asInstanceOf[InternalActorRef].restart(cause)
restartChild(child, cause, suspendFirst = false)
else
context.stop(child) //TODO optimization to drop child here already?
}

View file

@ -86,7 +86,7 @@ private[akka] case class Suspend() extends SystemMessage // sent to self from Ac
/**
* INTERNAL API
*/
private[akka] case class Resume() extends SystemMessage // sent to self from ActorCell.resume
private[akka] case class Resume(inResponseToFailure: Boolean) extends SystemMessage // sent to self from ActorCell.resume
/**
* INTERNAL API
*/

View file

@ -30,7 +30,7 @@ private[akka] object Mailbox {
final val Scheduled = 2 // Deliberately without type ascription to make it a compile-time constant
// shifted by 2: the suspend count!
final val shouldScheduleMask = 3
final val shouldProcessMask = ~2
final val shouldNotProcessMask = ~2
final val suspendMask = ~3
final val suspendUnit = 4
@ -82,7 +82,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
final def status: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)
@inline
final def shouldProcessMessage: Boolean = (status & shouldProcessMask) == 0
final def shouldProcessMessage: Boolean = (status & shouldNotProcessMask) == 0
@inline
final def isSuspended: Boolean = (status & suspendMask) != 0

View file

@ -214,7 +214,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
/**
* Resumes processing of `send` actions for the agent.
*/
def resume(): Unit = updater.resume()
def resume(): Unit = updater.resume(inResponseToFailure = false)
/**
* Closes the agents and makes it eligible for garbage collection.

View file

@ -236,7 +236,7 @@ private[akka] class RemoteActorRef private[akka] (
def suspend(): Unit = sendSystemMessage(Suspend())
def resume(): Unit = sendSystemMessage(Resume())
def resume(inResponseToFailure: Boolean): Unit = sendSystemMessage(Resume(inResponseToFailure))
def stop(): Unit = sendSystemMessage(Terminate())

View file

@ -151,7 +151,7 @@ class CallingThreadDispatcher(
override def suspend(actor: ActorCell) {
actor.mailbox match {
case m: CallingThreadMailbox m.suspendSwitch.switchOn
case m: CallingThreadMailbox m.suspendSwitch.switchOn; m.becomeSuspended()
case m m.systemEnqueue(actor.self, Suspend())
}
}
@ -163,11 +163,12 @@ class CallingThreadDispatcher(
val wasActive = queue.isActive
val switched = mbox.suspendSwitch.switchOff {
CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(mbox, queue)
mbox.becomeOpen()
}
if (switched && !wasActive) {
runQueue(mbox, queue)
}
case m m.systemEnqueue(actor.self, Resume())
case m m.systemEnqueue(actor.self, Resume(false))
}
}