diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index bec066d97a..3d3fecdf01 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -227,7 +227,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { contextStackMustBeEmpty } - filterException[java.lang.IllegalStateException] { + EventFilter[ActorInitializationException](occurrences = 1) intercept { (intercept[java.lang.IllegalStateException] { wrap(result ⇒ actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept({ throw new IllegalStateException("Ur state be b0rked"); new InnerActor })(result))))))) diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala index df47c801bb..79c4e33a9d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala @@ -145,7 +145,7 @@ object FSMTimingSpec { } def resume(actorRef: ActorRef): Unit = actorRef match { - case l: LocalActorRef ⇒ l.resume() + case l: LocalActorRef ⇒ l.resume(inResponseToFailure = false) case _ ⇒ } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 33387443bb..42eccf2a81 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -215,6 +215,8 @@ object SupervisorHierarchySpec { * TODO RK: also test exceptions during recreate * * TODO RK: also test recreate including terminating children + * + * TODO RK: also verify that preRestart is not called more than once per instance */ class StressTest(testActor: ActorRef, depth: Int, breadth: Int) extends Actor with LoggingFSM[State, Null] { diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index ba0314714e..521f8d9f00 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -339,7 +339,12 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 10 seconds)(classOf[Exception] :: Nil)))) val dyingProps = Props(new Actor { - if (inits.incrementAndGet % 2 == 0) throw new IllegalStateException("Don't wanna!") + val init = inits.getAndIncrement() + if (init % 3 == 1) throw new IllegalStateException("Don't wanna!") + + override def preRestart(cause: Throwable, msg: Option[Any]) { + if (init % 3 == 0) throw new IllegalStateException("Don't wanna!") + } def receive = { case Ping ⇒ sender ! PongMessage @@ -352,8 +357,10 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende supervisor ! dyingProps val dyingActor = expectMsgType[ActorRef] - filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1), - EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) { + filterEvents( + EventFilter[RuntimeException]("Expected", occurrences = 1), + EventFilter[PreRestartException]("Don't wanna!", occurrences = 1), + EventFilter[PostRestartException]("Don't wanna!", occurrences = 1)) { intercept[RuntimeException] { Await.result(dyingActor.?(DieReply)(DilatedTimeout), DilatedTimeout) } @@ -376,8 +383,8 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende val child = context.watch(context.actorOf(Props(new Actor { override def postRestart(reason: Throwable): Unit = testActor ! "child restarted" def receive = { - case "die" ⇒ throw new IllegalStateException("OHNOES") - case "test" ⇒ sender ! "child green" + case l: TestLatch ⇒ Await.ready(l, 5 seconds); throw new IllegalStateException("OHNOES") + case "test" ⇒ sender ! "child green" } }), "child")) @@ -385,14 +392,18 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende def receive = { case t @ Terminated(`child`) ⇒ testActor ! "child terminated" - case "die" ⇒ child ! "die" + case l: TestLatch ⇒ child ! l case "test" ⇒ sender ! "green" case "testchild" ⇒ child forward "test" } })) - parent ! "die" + val latch = TestLatch() + parent ! latch parent ! "testchild" + EventFilter[IllegalStateException]("OHNOES", occurrences = 2) intercept { + latch.countDown() + } expectMsg("parent restarted") expectMsg("child terminated") parent ! "test" diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 4d83c85b82..0a85b0158f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -335,7 +335,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa assertNoCountDown(done, 1000, "Should not process messages while suspended") assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1) - a.resume + a.resume(inResponseToFailure = false) assertCountDown(done, 3.seconds.dilated.toMillis, "Should resume processing of messages when resumed") assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1, suspensions = 1, resumes = 1) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala index a9855fef7d..5d052330dc 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala @@ -61,7 +61,7 @@ class PriorityDispatcherSpec extends AkkaSpec(PriorityDispatcherSpec.config) wit val msgs = (1 to 100).toList for (m ← msgs) actor ! m - actor.resume //Signal the actor to start treating it's message backlog + actor.resume(inResponseToFailure = false) //Signal the actor to start treating it's message backlog Await.result(actor.?('Result).mapTo[List[Int]], timeout.duration) must be === msgs.reverse } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 2721ccffa0..327743d5d4 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -123,12 +123,36 @@ class InvalidActorNameException(message: String) extends AkkaException(message) /** * An ActorInitializationException is thrown when the the initialization logic for an Actor fails. */ -class ActorInitializationException private[akka] (actor: ActorRef, message: String, cause: Throwable) - extends AkkaException(message, cause) /*with NoStackTrace*/ { +class ActorInitializationException private[akka] (val actor: ActorRef, message: String, cause: Throwable) + extends AkkaException(message, cause) { def this(msg: String) = this(null, msg, null) def this(actor: ActorRef, msg: String) = this(actor, msg, null) } +/** + * A PreRestartException is thrown when the preRestart() method failed. + * + * @param actor is the actor whose preRestart() hook failed + * @param cause is the exception thrown by that actor within preRestart() + * @param origCause is the exception which caused the restart in the first place + * @param msg is the message which was optionally passed into preRestart() + */ +class PreRestartException private[akka] (actor: ActorRef, cause: Throwable, val origCause: Throwable, val msg: Option[Any]) + extends ActorInitializationException(actor, "exception in preRestart(" + origCause + ", " + msg + ")", cause) { +} + +/** + * A PostRestartException is thrown when constructor or postRestart() method + * fails during a restart attempt. + * + * @param actor is the actor whose constructor or postRestart() hook failed + * @param cause is the exception thrown by that actor within preRestart() + * @param origCause is the exception which caused the restart in the first place + */ +class PostRestartException private[akka] (actor: ActorRef, cause: Throwable, val origCause: Throwable) + extends ActorInitializationException(actor, "exception post restart (" + origCause + ")", cause) { +} + /** * InvalidMessageException is thrown when an invalid message is sent to an Actor. * Technically it's only "null" which is an InvalidMessageException but who knows, diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 7931e5428e..928944ca44 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -12,7 +12,7 @@ import akka.event.Logging.{ Debug, Warning, Error } import akka.japi.Procedure import java.io.{ NotSerializableException, ObjectOutputStream } import akka.serialization.SerializationExtension -import akka.event.Logging.LogEventException +import akka.event.Logging.{ LogEventException, LogEvent } import collection.immutable.{ TreeSet, Stack, TreeMap } import akka.util.{ Unsafe, Duration, Helpers, NonFatal } @@ -362,6 +362,7 @@ private[akka] class ActorCell( } private def isNormal = childrenRefs match { case TerminatingChildrenContainer(_, _, Termination | _: Recreation) ⇒ false + case TerminatedChildrenContainer ⇒ false case _ ⇒ true } @@ -414,6 +415,17 @@ private[akka] class ActorCell( var watching: Set[ActorRef] = emptyActorRefSet var watchedBy: Set[ActorRef] = emptyActorRefSet + /* + * have we told our supervisor that we Failed() and have not yet heard back? + * (actually: we might have heard back but not yet acted upon it, in case of + * a restart with dying children) + * might well be replaced by ref to a Cancellable in the future (see #2299) + */ + private var _failed = false + def currentlyFailed: Boolean = _failed + def setFailed(): Unit = _failed = true + def setNotFailed(): Unit = _failed = false + //Not thread safe, so should only be used inside the actor that inhabits this ActorCell final protected def randomName(): String = { val n = nextNameSequence @@ -469,7 +481,7 @@ private[akka] class ActorCell( final def suspend(): Unit = dispatcher.systemDispatch(this, Suspend()) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def resume(): Unit = dispatcher.systemDispatch(this, Resume()) + final def resume(inResponseToFailure: Boolean): Unit = dispatcher.systemDispatch(this, Resume(inResponseToFailure)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ final def stop(): Unit = dispatcher.systemDispatch(this, Terminate()) @@ -492,6 +504,23 @@ private[akka] class ActorCell( a } + /* ================= + * T H E R U L E S + * ================= + * + * Actors can be suspended for two reasons: + * - they fail + * - their supervisor gets suspended + * + * In particular they are not suspended multiple times because of cascading + * own failures, i.e. while currentlyFailed() they do not fail again. In case + * of a restart, failures in constructor/preStart count as new failures. + */ + + private def suspendNonRecursive(): Unit = dispatcher suspend this + + private def resumeNonRecursive(): Unit = dispatcher resume this + final def children: Iterable[ActorRef] = childrenRefs.children /** @@ -542,7 +571,7 @@ private[akka] class ActorCell( actor = created created.preStart() checkReceiveTimeout - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) } catch { case NonFatal(i: InstantiationException) ⇒ throw new ActorInitializationException(self, @@ -554,41 +583,46 @@ private[akka] class ActorCell( } } - def recreate(cause: Throwable): Unit = if (isNormal) { - try { + def recreate(cause: Throwable): Unit = + if (isNormal) { val failedActor = actor - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(failedActor), "restarting")) + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(failedActor), "restarting")) if (failedActor ne null) { - val c = currentMessage //One read only plz try { - if (failedActor.context ne null) failedActor.preRestart(cause, if (c ne null) Some(c.message) else None) + // if the actor fails in preRestart, we can do nothing but log it: it’s best-effort + if (failedActor.context ne null) failedActor.preRestart(cause, Option(currentMessage)) + } catch { + case NonFatal(e) ⇒ + val ex = new PreRestartException(self, e, cause, Option(currentMessage)) + publish(Error(ex, self.path.toString, clazz(failedActor), e.getMessage)) } finally { clearActorFields(failedActor) } } + assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.status) childrenRefs match { case ct: TerminatingChildrenContainer ⇒ childrenRefs = ct.copy(reason = Recreation(cause)) - dispatcher suspend this case _ ⇒ doRecreate(cause, failedActor) } - } catch { - case NonFatal(e) ⇒ throw new ActorInitializationException(self, "exception during creation", e match { - case i: InstantiationException ⇒ i.getCause - case other ⇒ other - }) + } else { + // need to keep that suspend counter balanced + doResume(inResponseToFailure = false) } - } - def suspend(): Unit = if (isNormal) { - dispatcher suspend this + def doSuspend(): Unit = { + // done always to keep that suspend counter balanced + suspendNonRecursive() children foreach (_.asInstanceOf[InternalActorRef].suspend()) } - def resume(): Unit = if (isNormal) { - dispatcher resume this - children foreach (_.asInstanceOf[InternalActorRef].resume()) + def doResume(inResponseToFailure: Boolean): Unit = { + // done always to keep that suspend counter balanced + // must happen “atomically” + try resumeNonRecursive() + finally if (inResponseToFailure) setNotFailed() + children foreach (_.asInstanceOf[InternalActorRef].resume(inResponseToFailure = false)) } def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = { @@ -598,12 +632,12 @@ private[akka] class ActorCell( if (watcheeSelf && !watcherSelf) { if (!watchedBy.contains(watcher)) { watchedBy += watcher - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + watcher)) + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now monitoring " + watcher)) } } else if (!watcheeSelf && watcherSelf) { watch(watchee) } else { - system.eventStream.publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, self))) + publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, self))) } } @@ -614,12 +648,12 @@ private[akka] class ActorCell( if (watcheeSelf && !watcherSelf) { if (watchedBy.contains(watcher)) { watchedBy -= watcher - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + watcher)) + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + watcher)) } } else if (!watcheeSelf && watcherSelf) { unwatch(watchee) } else { - system.eventStream.publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, self))) + publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, self))) } } @@ -634,15 +668,15 @@ private[akka] class ActorCell( case ct: TerminatingChildrenContainer ⇒ childrenRefs = ct.copy(reason = Termination) // do not process normal messages while waiting for all children to terminate - dispatcher suspend this - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping")) + suspendNonRecursive() + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopping")) case _ ⇒ doTerminate() } } def supervise(child: ActorRef): Unit = if (!isTerminating) { if (childrenRefs.getByRef(child).isEmpty) childrenRefs = childrenRefs.add(child) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) } try { @@ -651,11 +685,12 @@ private[akka] class ActorCell( case Recreate(cause) ⇒ recreate(cause) case Watch(watchee, watcher) ⇒ addWatcher(watchee, watcher) case Unwatch(watchee, watcher) ⇒ remWatcher(watchee, watcher) - case Suspend() ⇒ suspend() - case Resume() ⇒ resume() + case Suspend() ⇒ doSuspend() + case Resume(inRespToFailure) ⇒ doResume(inRespToFailure) case Terminate() ⇒ terminate() case Supervise(child) ⇒ supervise(child) case ChildTerminated(child) ⇒ handleChildTerminated(child) + case NoMessage ⇒ // to shut up the exhaustiveness warning } } catch { case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(e, "error while processing " + message) @@ -677,20 +712,30 @@ private[akka] class ActorCell( checkReceiveTimeout // Reschedule receive timeout } - final def handleInvokeFailure(t: Throwable, message: String): Unit = try { - dispatcher.reportFailure(new LogEventException(Error(t, self.path.toString, clazz(actor), message), t)) - // prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - if (actor ne null) actor.supervisorStrategy.handleSupervisorFailing(self, children) - // now we may just have suspended the poor guy which made us fail by way of Escalate, so adjust the score - currentMessage match { - case Envelope(Failed(`t`), child) ⇒ child.asInstanceOf[InternalActorRef].resume() - case _ ⇒ + final def handleInvokeFailure(t: Throwable, message: String): Unit = { + try { + dispatcher.reportFailure(new LogEventException(Error(t, self.path.toString, clazz(actor), message), t)) + } catch { + case NonFatal(_) ⇒ // no sense logging if logging does not work } - } finally { - t match { // Wrap InterruptedExceptions and rethrow - case _: InterruptedException ⇒ parent.tell(Failed(new ActorInterruptedException(t)), self); throw t - case _ ⇒ parent.tell(Failed(t), self) + // prevent any further messages to be processed until the actor has been restarted + if (!currentlyFailed) { + // suspend self; these two must happen “atomically” + try suspendNonRecursive() + finally setFailed() + // suspend children + val skip: Set[ActorRef] = currentMessage match { + case Envelope(Failed(`t`), child) ⇒ Set(child) + case _ ⇒ Set.empty + } + childrenRefs.stats collect { + case ChildRestartStats(child, _, _) if !(skip contains child) ⇒ child + } foreach (_.asInstanceOf[InternalActorRef].suspend()) + // tell supervisor + t match { // Wrap InterruptedExceptions and rethrow + case _: InterruptedException ⇒ parent.tell(Failed(new ActorInterruptedException(t)), self); throw t + case _ ⇒ parent.tell(Failed(t), self) + } } } @@ -718,7 +763,7 @@ private[akka] class ActorCell( def autoReceiveMessage(msg: Envelope): Unit = { if (system.settings.DebugAutoReceive) - system.eventStream.publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) + publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) msg.message match { case Failed(cause) ⇒ handleFailure(sender, cause) @@ -738,73 +783,69 @@ private[akka] class ActorCell( private def doTerminate() { val a = actor - try { - try { - if (a ne null) a.postStop() - } finally { - dispatcher.detach(this) - } - } finally { - try { - parent.sendSystemMessage(ChildTerminated(self)) - - if (!watchedBy.isEmpty) { - val terminated = Terminated(self)(existenceConfirmed = true) - try { - watchedBy foreach { - watcher ⇒ - try watcher.tell(terminated, self) catch { - case NonFatal(t) ⇒ system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch")) - } - } - } finally watchedBy = emptyActorRefSet - } - - if (!watching.isEmpty) { - try { - watching foreach { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - case watchee: InternalActorRef ⇒ try watchee.sendSystemMessage(Unwatch(watchee, self)) catch { - case NonFatal(t) ⇒ system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch")) + try if (a ne null) a.postStop() + finally try dispatcher.detach(this) + finally try parent.sendSystemMessage(ChildTerminated(self)) + finally try + if (!watchedBy.isEmpty) { + val terminated = Terminated(self)(existenceConfirmed = true) + try { + watchedBy foreach { + watcher ⇒ + try watcher.tell(terminated, self) catch { + case NonFatal(t) ⇒ publish(Error(t, self.path.toString, clazz(a), "deathwatch")) } - } - } finally watching = emptyActorRefSet - } - if (system.settings.DebugLifecycle) - system.eventStream.publish(Debug(self.path.toString, clazz(a), "stopped")) - } finally { - behaviorStack = behaviorStackPlaceHolder - clearActorFields(a) - actor = null + } + } finally watchedBy = emptyActorRefSet } + finally try + if (!watching.isEmpty) { + try { + watching foreach { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + case watchee: InternalActorRef ⇒ try watchee.sendSystemMessage(Unwatch(watchee, self)) catch { + case NonFatal(t) ⇒ publish(Error(t, self.path.toString, clazz(a), "deathwatch")) + } + } + } finally watching = emptyActorRefSet + } + finally { + if (system.settings.DebugLifecycle) + publish(Debug(self.path.toString, clazz(a), "stopped")) + behaviorStack = behaviorStackPlaceHolder + clearActorFields(a) + actor = null } } private def doRecreate(cause: Throwable, failedActor: Actor): Unit = try { - // after all killed children have terminated, recreate the rest, then go on to start the new instance - actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children) + // must happen “atomically” + try resumeNonRecursive() + finally setNotFailed() + + val survivors = children + val freshActor = newActor() actor = freshActor // this must happen before postRestart has a chance to fail if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields. freshActor.postRestart(cause) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted")) + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted")) - dispatcher.resume(this) + // only after parent is up and running again do restart the children which were not stopped + survivors foreach (child ⇒ + try child.asInstanceOf[InternalActorRef].restart(cause) + catch { + case NonFatal(e) ⇒ publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child)) + }) } catch { - case NonFatal(e) ⇒ try { - dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e)) - // prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - actor.supervisorStrategy.handleSupervisorFailing(self, children) // FIXME Should this be called on actor or failedActor? - clearActorFields(actor) // If this fails, we need to ensure that preRestart isn't called. - } finally { - parent.tell(Failed(new ActorInitializationException(self, "exception during re-creation", e)), self) - } + case NonFatal(e) ⇒ + clearActorFields(actor) // in order to prevent preRestart() from happening again + handleInvokeFailure(new PostRestartException(self, e, cause), e.getMessage) } final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.getByRef(child) match { case Some(stats) ⇒ if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, childrenRefs.stats)) throw cause - case None ⇒ system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) + case None ⇒ publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) } final def handleChildTerminated(child: ActorRef): Unit = try { @@ -823,13 +864,7 @@ private[akka] class ActorCell( actor.supervisorStrategy.handleChildTerminated(this, child, children) } } catch { - case NonFatal(e) ⇒ - try { - dispatcher suspend this - actor.supervisorStrategy.handleSupervisorFailing(self, children) - } finally { - parent.tell(Failed(e), self) - } + case NonFatal(e) ⇒ handleInvokeFailure(e, "handleChildTerminated failed") } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ @@ -881,6 +916,9 @@ private[akka] class ActorCell( } } + // logging is not the main purpose, and if it fails there’s nothing we can do + private final def publish(e: LogEvent): Unit = try system.eventStream.publish(e) catch { case NonFatal(_) ⇒ } + private final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 0620a73a28..5a6e101ea8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -177,7 +177,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe /* * Actor life-cycle management, invoked only internally (in response to user requests via ActorContext). */ - def resume(): Unit + def resume(inResponseToFailure: Boolean): Unit def suspend(): Unit def restart(cause: Throwable): Unit def stop(): Unit @@ -267,7 +267,7 @@ private[akka] class LocalActorRef private[akka] ( /** * Resumes a suspended actor. */ - override def resume(): Unit = actorCell.resume() + override def resume(inResponseToFailure: Boolean): Unit = actorCell.resume(inResponseToFailure) /** * Shuts down the actor and its message queue @@ -367,7 +367,7 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef { override def getChild(names: Iterator[String]): InternalActorRef = if (names.forall(_.isEmpty)) this else Nobody override def suspend(): Unit = () - override def resume(): Unit = () + override def resume(inResponseToFailure: Boolean): Unit = () override def stop(): Unit = () override def isTerminated = false diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 27a9f346db..b3737d5b57 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -196,19 +196,30 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { } /** - * An Akka SupervisorStrategy is the policy to apply for crashing children + * An Akka SupervisorStrategy is the policy to apply for crashing children. + * + * IMPORTANT: + * + * You should not normally need to create new subclasses, instead use the + * existing [[akka.actor.OneForOneStrategy]] or [[akka.actor.AllForOneStrategy]], + * but if you do, please read the docs of the methods below carefully, as + * incorrect implementations may lead to “blocked” actor systems (i.e. + * permanently suspended actors). */ abstract class SupervisorStrategy { import SupervisorStrategy._ /** - * Returns the Decider that is associated with this SupervisorStrategy + * Returns the Decider that is associated with this SupervisorStrategy. + * The Decider is invoked by the default implementation of `handleFailure` + * to obtain the Directive to be applied. */ def decider: Decider /** * This method is called after the child has been removed from the set of children. + * It does not need to do anything special. */ def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit @@ -217,27 +228,46 @@ abstract class SupervisorStrategy { */ def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit - //FIXME docs - def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = - if (children.nonEmpty) children.foreach(_.asInstanceOf[InternalActorRef].suspend()) - - //FIXME docs - def handleSupervisorRestarted(cause: Throwable, supervisor: ActorRef, children: Iterable[ActorRef]): Unit = - if (children.nonEmpty) children.foreach(_.asInstanceOf[InternalActorRef].restart(cause)) - /** - * Returns whether it processed the failure or not + * This is the main entry point: in case of a child’s failure, this method + * must try to handle the failure by resuming, restarting or stopping the + * child (and returning `true`), or it returns `false` to escalate the + * failure, which will lead to this actor re-throwing the exception which + * caused the failure. The exception will not be wrapped. */ def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { val directive = if (decider.isDefinedAt(cause)) decider(cause) else Escalate //FIXME applyOrElse in Scala 2.10 directive match { - case Resume ⇒ child.asInstanceOf[InternalActorRef].resume(); true + case Resume ⇒ resumeChild(child); true case Restart ⇒ processFailure(context, true, child, cause, stats, children); true case Stop ⇒ processFailure(context, false, child, cause, stats, children); true case Escalate ⇒ false } } + /** + * Resume the previously failed child: do never apply this to a child which + * is not the currently failing child. Suspend/resume needs to be done in + * matching pairs, otherwise actors will wake up too soon or never at all. + */ + final def resumeChild(child: ActorRef): Unit = child.asInstanceOf[InternalActorRef].resume(inResponseToFailure = true) + + /** + * Restart the given child, possibly suspending it first. + * + * IMPORTANT: + * + * If the child is the currently failing one, it will already have been + * suspended, hence `suspendFirst` is false. If the child is not the + * currently failing one, then it did not request this treatment and is + * therefore not prepared to be resumed without prior suspend. + */ + final def restartChild(child: ActorRef, cause: Throwable, suspendFirst: Boolean): Unit = { + val c = child.asInstanceOf[InternalActorRef] + if (suspendFirst) c.suspend() + c.restart(cause) + } + } /** @@ -276,7 +306,7 @@ case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { if (children.nonEmpty) { if (restart && children.forall(_.requestRestartPermission(retriesWindow))) - children.foreach(_.child.asInstanceOf[InternalActorRef].restart(cause)) + children foreach (crs ⇒ restartChild(crs.child, cause, suspendFirst = (crs.child != child))) else for (c ← children) context.stop(c.child) } @@ -318,7 +348,7 @@ case class OneForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { if (restart && stats.requestRestartPermission(retriesWindow)) - child.asInstanceOf[InternalActorRef].restart(cause) + restartChild(child, cause, suspendFirst = false) else context.stop(child) //TODO optimization to drop child here already? } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 12eea14ffc..fa3ccc8674 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -86,7 +86,7 @@ private[akka] case class Suspend() extends SystemMessage // sent to self from Ac /** * INTERNAL API */ -private[akka] case class Resume() extends SystemMessage // sent to self from ActorCell.resume +private[akka] case class Resume(inResponseToFailure: Boolean) extends SystemMessage // sent to self from ActorCell.resume /** * INTERNAL API */ diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index d66b16cc27..58f9998485 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -30,7 +30,7 @@ private[akka] object Mailbox { final val Scheduled = 2 // Deliberately without type ascription to make it a compile-time constant // shifted by 2: the suspend count! final val shouldScheduleMask = 3 - final val shouldProcessMask = ~2 + final val shouldNotProcessMask = ~2 final val suspendMask = ~3 final val suspendUnit = 4 @@ -82,7 +82,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes final def status: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset) @inline - final def shouldProcessMessage: Boolean = (status & shouldProcessMask) == 0 + final def shouldProcessMessage: Boolean = (status & shouldNotProcessMask) == 0 @inline final def isSuspended: Boolean = (status & suspendMask) != 0 diff --git a/akka-agent/src/main/scala/akka/agent/Agent.scala b/akka-agent/src/main/scala/akka/agent/Agent.scala index 64834178a8..2b4c4b0f00 100644 --- a/akka-agent/src/main/scala/akka/agent/Agent.scala +++ b/akka-agent/src/main/scala/akka/agent/Agent.scala @@ -214,7 +214,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { /** * Resumes processing of `send` actions for the agent. */ - def resume(): Unit = updater.resume() + def resume(): Unit = updater.resume(inResponseToFailure = false) /** * Closes the agents and makes it eligible for garbage collection. diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index eaecf67792..fa989fb235 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -236,7 +236,7 @@ private[akka] class RemoteActorRef private[akka] ( def suspend(): Unit = sendSystemMessage(Suspend()) - def resume(): Unit = sendSystemMessage(Resume()) + def resume(inResponseToFailure: Boolean): Unit = sendSystemMessage(Resume(inResponseToFailure)) def stop(): Unit = sendSystemMessage(Terminate()) diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 2fe664d7b6..1eba0ac23a 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -151,7 +151,7 @@ class CallingThreadDispatcher( override def suspend(actor: ActorCell) { actor.mailbox match { - case m: CallingThreadMailbox ⇒ m.suspendSwitch.switchOn + case m: CallingThreadMailbox ⇒ m.suspendSwitch.switchOn; m.becomeSuspended() case m ⇒ m.systemEnqueue(actor.self, Suspend()) } } @@ -163,11 +163,12 @@ class CallingThreadDispatcher( val wasActive = queue.isActive val switched = mbox.suspendSwitch.switchOff { CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(mbox, queue) + mbox.becomeOpen() } if (switched && !wasActive) { runQueue(mbox, queue) } - case m ⇒ m.systemEnqueue(actor.self, Resume()) + case m ⇒ m.systemEnqueue(actor.self, Resume(false)) } }