diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index fec850df33..b36bde739e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -5,19 +5,15 @@ package akka.actor import language.postfixOps - import java.util.concurrent.{ TimeUnit, CountDownLatch } - import scala.concurrent.Await import scala.concurrent.util.Duration import scala.concurrent.util.duration.intToDurationInt import scala.math.BigInt.int2bigInt import scala.util.Random import scala.util.control.NoStackTrace - import com.typesafe.config.{ ConfigFactory, Config } - -import SupervisorStrategy.{ Resume, Restart, Directive } +import SupervisorStrategy.{ Resume, Restart, Stop, Directive } import akka.actor.SupervisorStrategy.seqThrowable2Decider import akka.dispatch.{ MessageDispatcher, DispatcherPrerequisites, DispatcherConfigurator, Dispatcher } import akka.pattern.ask @@ -56,9 +52,11 @@ object SupervisorHierarchySpec { case class Children(refs: Vector[ActorRef]) case class Event(msg: Any) { val time: Long = System.nanoTime } case class ErrorLog(msg: String, log: Vector[Event]) - case class Failure(directive: Directive, log: Map[ActorRef, Vector[Event]], depth: Int) extends RuntimeException with NoStackTrace { - override def toString = "Failure(" + directive + ", " + depth + ")" + case class Failure(directive: Directive, log: Map[ActorRef, Vector[Event]], depth: Int, var failPre: Int, var failPost: Int) + extends RuntimeException with NoStackTrace { + override def toString = "Failure(" + directive + ", " + depth + ", " + failPre + ", " + failPost + ")" } + case class Dump(level: Int) val config = ConfigFactory.parseString(""" hierarchy { @@ -82,14 +80,14 @@ object SupervisorHierarchySpec { override def suspend(cell: ActorCell): Unit = { val a = cell.actor.asInstanceOf[Hierarchy] - a.log :+= Event("suspended") + a.log :+= Event("suspended " + cell.mailbox.status / 4) super.suspend(cell) } override def resume(cell: ActorCell): Unit = { val a = cell.actor.asInstanceOf[Hierarchy] - a.log :+= Event("resumed") super.resume(cell) + a.log :+= Event("resumed " + cell.mailbox.status / 4) } } @@ -99,6 +97,24 @@ object SupervisorHierarchySpec { class Hierarchy(size: Int, breadth: Int, listener: ActorRef) extends Actor { + var failed = false + var suspended = false + var stopping = false + + def abort(msg: String) { + listener ! ErrorLog(msg, log) + stopping = true + context stop self + } + + def setFlags(directive: Directive): Unit = directive match { + case Restart ⇒ failed = true + case Resume ⇒ suspended = true + case _ ⇒ + } + + def suspendCount = context.asInstanceOf[ActorCell].mailbox.status / 4 + override def preStart { val s = size - 1 // subtract myself if (s > 0) { @@ -110,33 +126,6 @@ object SupervisorHierarchySpec { context.watch(context.actorOf(Props(new Hierarchy(kidSize, breadth, listener)).withDispatcher("hierarchy"))) } else context.parent ! Children(Vector(self)) } - override def postRestart(cause: Throwable) { - cause match { - case Failure(_, l, _) ⇒ log = l get self getOrElse Vector(Event("log lost")) - } - log :+= Event("restarted") - } - - override val supervisorStrategy = OneForOneStrategy() { - case Failure(directive, _, 0) ⇒ - log :+= Event("applying " + directive + " to " + sender) - directive - case f @ Failure(directive, logs, x) ⇒ - import SupervisorStrategy._ - directive match { - case Restart ⇒ failed = true - case Resume ⇒ suspended = true - case _ ⇒ - } - log :+= Event("escalating " + f) - throw Failure(directive, logs + (self -> log), x - 1) - } - - def abort(msg: String) { - listener ! ErrorLog(msg, log) - stopping = true - context stop self - } var preRestartCalled = false override def preRestart(cause: Throwable, msg: Option[Any]): Unit = { @@ -144,6 +133,37 @@ object SupervisorHierarchySpec { if (preRestartCalled) abort("preRestart called twice") log :+= Event("preRestart") preRestartCalled = true + cause match { + case f @ Failure(_, _, _, x, _) if x > 0 ⇒ f.failPre = x - 1; throw f + case _ ⇒ + } + } + + override val supervisorStrategy = OneForOneStrategy() { + case Failure(directive, _, 0, _, _) ⇒ + log :+= Event("applying " + directive + " to " + sender) + directive + case OriginalRestartException(f: Failure) ⇒ + log :+= Event("re-applying " + f.directive + " to " + sender) + f.directive + case f @ Failure(directive, logs, x, _, _) ⇒ + import SupervisorStrategy._ + setFlags(directive) + log :+= Event("escalating " + f) + throw f.copy(log = logs + (self -> log), depth = x - 1) + } + + override def postRestart(cause: Throwable) { + cause match { + case f: Failure ⇒ log = f.log get self getOrElse Vector(Event("log lost")) + case OriginalRestartException(f: Failure) ⇒ log = f.log get self getOrElse Vector(Event("log lost")) + case _ ⇒ + } + log :+= Event("restarted " + suspendCount) + cause match { + case f @ Failure(_, _, _, _, x) if x > 0 ⇒ f.failPost = x - 1; throw f + case _ ⇒ + } } override def postStop { @@ -152,9 +172,6 @@ object SupervisorHierarchySpec { } } - var failed = false - var suspended = false - var stopping = false var log = Vector.empty[Event] def check(msg: Any): Boolean = { suspended = false @@ -163,6 +180,9 @@ object SupervisorHierarchySpec { abort("processing message while failed") failed = false false + } else if (context.asInstanceOf[ActorCell].mailbox.isSuspended) { + abort("processing message while suspended") + false } else true } @@ -171,10 +191,11 @@ object SupervisorHierarchySpec { def receive = new Receive { val handler: Receive = { - case f @ Failure(Resume, _, _) ⇒ suspended = true; throw f.copy(log = Map(self -> log)) - case f: Failure ⇒ failed = true; throw f.copy(log = Map(self -> log)) - case "ping" ⇒ Thread.sleep((Random.nextFloat * 1.03).toLong); sender ! "pong" - case Terminated(_) ⇒ abort("terminating") + case f: Failure ⇒ setFlags(f.directive); throw f.copy(log = Map(self -> log)) + case "ping" ⇒ Thread.sleep((Random.nextFloat * 1.03).toLong); sender ! "pong" + case Dump(0) ⇒ context stop self + case Dump(level) ⇒ context.children foreach (_ ! Dump(level - 1)) + case Terminated(_) ⇒ abort("terminating") case Children(refs) ⇒ kids ++= refs gotKidsFrom += sender @@ -185,7 +206,7 @@ object SupervisorHierarchySpec { } } - case class Work(n: Int) + case object Work sealed trait Action case class Ping(ref: ActorRef) extends Action @@ -250,17 +271,16 @@ object SupervisorHierarchySpec { * all are terminated, transfer them to a WeakHashMap and verify that * they are indeed GCed * - * TODO RK: also test exceptions during recreate - * * TODO RK: also test recreate including terminating children */ - class StressTest(testActor: ActorRef, depth: Int, breadth: Int) extends Actor with LoggingFSM[State, Null] { + class StressTest(testActor: ActorRef, depth: Int, breadth: Int) extends Actor with LoggingFSM[State, Int] { import context.system // don’t escalate from this one! override val supervisorStrategy = OneForOneStrategy() { - case Failure(directive, _, _) ⇒ directive + case f: Failure ⇒ f.directive + case OriginalRestartException(f: Failure) ⇒ f.directive } var children = Vector.empty[ActorRef] @@ -295,7 +315,8 @@ object SupervisorHierarchySpec { testActor ! "stressTestStopped" } - startWith(Idle, null) + // number of Work packages to execute for the test + startWith(Idle, 500000) when(Idle) { case Event(Init, _) ⇒ @@ -321,14 +342,14 @@ object SupervisorHierarchySpec { onTransition { case Init -> Stress ⇒ - self ! Work(500000) + self ! Work // set timeout for completion of the whole test (i.e. including Finishing and Stopping) setTimer("phase", StateTimeout, 30.seconds.dilated, false) } val workSchedule = 250.millis - private def randomDepth: Int = Random.nextFloat match { + private def random012: Int = Random.nextFloat match { case x if x > 0.1 ⇒ 0 case x if x > 0.03 ⇒ 1 case _ ⇒ 2 @@ -337,22 +358,30 @@ object SupervisorHierarchySpec { var ignoreNotResumedLogs = true when(Stress) { - case Event(w: Work, _) if idleChildren.isEmpty ⇒ + case Event(Work, _) if idleChildren.isEmpty ⇒ + ignoreNotResumedLogs = false context stop hierarchy goto(Failed) - case Event(Work(x), _) if x > 0 ⇒ + case Event(Work, x) if x > 0 ⇒ nextJob.next match { case Ping(ref) ⇒ ref ! "ping" - case Fail(ref, dir) ⇒ ref ! Failure(dir, Map.empty, randomDepth) + case Fail(ref, dir) ⇒ ref ! Failure(dir, Map.empty, random012, random012, random012) } - if (idleChildren.nonEmpty) self ! Work(x - 1) - else context.system.scheduler.scheduleOnce(workSchedule, self, Work(x - 1)) - stay - case Event(Work(_), _) ⇒ if (pingChildren.isEmpty) goto(LastPing) else goto(Finishing) + if (idleChildren.nonEmpty) self ! Work + else context.system.scheduler.scheduleOnce(workSchedule, self, Work) + stay using (x - 1) + case Event(Work, _) ⇒ if (pingChildren.isEmpty) goto(LastPing) else goto(Finishing) case Event("pong", _) ⇒ pingChildren -= sender idleChildren :+= sender stay + case Event(StateTimeout, todo) ⇒ + log.info("dumping state due to StateTimeout") + log.info("children: " + children.size + " pinged: " + pingChildren.size + " idle: " + idleChildren.size + " work: " + todo) + println(system.asInstanceOf[ActorSystemImpl].printTree) + ignoreNotResumedLogs = false + hierarchy ! Dump(2) + goto(Failed) } when(Finishing) { @@ -403,11 +432,25 @@ object SupervisorHierarchySpec { testActor ! "stressTestFailed" stop case Event(StateTimeout, _) ⇒ + getErrors() printErrors() testActor ! "timeout in Failed" stop - case Event("pong", _) ⇒ stay // don’t care? - case Event(w: Work, _) ⇒ stay + case Event("pong", _) ⇒ stay // don’t care? + case Event(Work, _) ⇒ stay + } + + def getErrors() = { + def rec(target: ActorRef, depth: Int): Unit = { + target match { + case l: LocalActorRef ⇒ + errors :+= target -> ErrorLog("forced", l.underlying.actor.asInstanceOf[Hierarchy].log) + if (depth > 0) { + l.underlying.children foreach (rec(_, depth - 1)) + } + } + } + rec(hierarchy, 2) } def printErrors(): Unit = { @@ -534,7 +577,7 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w } "survive being stressed" in { - system.eventStream.publish(Mute(EventFilter[Failure]())) + system.eventStream.publish(Mute(EventFilter[Failure](), EventFilter[PreRestartException](), EventFilter[PostRestartException]())) system.eventStream.publish(Mute(EventFilter.warning(start = "received dead letter"))) val fsm = system.actorOf(Props(new StressTest(testActor, depth = 6, breadth = 6)), "stressTest") diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 020ea211fd..a5667f2634 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -314,7 +314,7 @@ private[akka] class ActorCell( case NoMessage ⇒ // only here to suppress warning } } catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(e, "error while processing " + message) + case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(Nil, e, "error while processing " + message) } //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status @@ -327,7 +327,7 @@ private[akka] class ActorCell( } currentMessage = null // reset current message after successful invocation } catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(e, e.getMessage) + case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(Nil, e, e.getMessage) } finally { checkReceiveTimeout // Reschedule receive timeout } diff --git a/akka-actor/src/main/scala/akka/actor/cell/Children.scala b/akka-actor/src/main/scala/akka/actor/cell/Children.scala index 6e77672f39..4716795c8b 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/Children.scala @@ -119,10 +119,10 @@ private[akka] trait Children { this: ActorCell ⇒ case _ ⇒ } - protected def resumeChildren(perp: ActorRef): Unit = + protected def resumeChildren(inResponseToFailure: Boolean, perp: ActorRef): Unit = childrenRefs.stats foreach { case ChildRestartStats(child: InternalActorRef, _, _) ⇒ - child.resume(inResponseToFailure = perp == child) + child.resume(inResponseToFailure = inResponseToFailure && perp == child) } def getChildByName(name: String): Option[ChildRestartStats] = childrenRefs.getByName(name) diff --git a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala index bb6beebbce..9907f036a9 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala @@ -91,7 +91,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ // must happen “atomically” try resumeNonRecursive() finally if (inResponseToFailure) clearFailed() - resumeChildren(perp) + resumeChildren(inResponseToFailure, perp) } protected def terminate() { @@ -117,7 +117,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ } } - final def handleInvokeFailure(t: Throwable, message: String): Unit = { + final def handleInvokeFailure(childrenNotToSuspend: Iterable[ActorRef], t: Throwable, message: String): Unit = { publish(Error(t, self.path.toString, clazz(actor), message)) // prevent any further messages to be processed until the actor has been restarted if (!isFailed) try { @@ -127,7 +127,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ case Envelope(Failed(_), child) ⇒ setFailed(child); Set(child) case _ ⇒ setFailed(self); Set.empty } - suspendChildren(skip) + suspendChildren(skip ++ childrenNotToSuspend) // tell supervisor t match { // Wrap InterruptedExceptions and rethrow case _: InterruptedException ⇒ parent.tell(Failed(new ActorInterruptedException(t)), self); throw t @@ -156,30 +156,32 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ } } - private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = try { - try resumeNonRecursive() - finally clearFailed() // must happen in any case, so that failure is propagated - + private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = { // need to keep a snapshot of the surviving children before the new actor instance creates new ones val survivors = children - val freshActor = newActor() - actor = freshActor // this must happen before postRestart has a chance to fail - if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields. + try { + try resumeNonRecursive() + finally clearFailed() // must happen in any case, so that failure is propagated - freshActor.postRestart(cause) - if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted")) + val freshActor = newActor() + actor = freshActor // this must happen before postRestart has a chance to fail + if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields. - // only after parent is up and running again do restart the children which were not stopped - survivors foreach (child ⇒ - try child.asInstanceOf[InternalActorRef].restart(cause) - catch { - case NonFatal(e) ⇒ publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child)) - }) - } catch { - case NonFatal(e) ⇒ - clearActorFields(actor) // in order to prevent preRestart() from happening again - handleInvokeFailure(new PostRestartException(self, e, cause), e.getMessage) + freshActor.postRestart(cause) + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted")) + + // only after parent is up and running again do restart the children which were not stopped + survivors foreach (child ⇒ + try child.asInstanceOf[InternalActorRef].restart(cause) + catch { + case NonFatal(e) ⇒ publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child)) + }) + } catch { + case NonFatal(e) ⇒ + clearActorFields(actor) // in order to prevent preRestart() from happening again + handleInvokeFailure(survivors, new PostRestartException(self, e, cause), e.getMessage) + } } final protected def handleFailure(child: ActorRef, cause: Throwable): Unit = getChildByRef(child) match { @@ -196,7 +198,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ */ try actor.supervisorStrategy.handleChildTerminated(this, child, children) catch { - case NonFatal(e) ⇒ handleInvokeFailure(e, "handleChildTerminated failed") + case NonFatal(e) ⇒ handleInvokeFailure(Nil, e, "handleChildTerminated failed") } /* * if the removal changed the state of the (terminating) children container, diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index f7062d5138..f1f887068b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -271,7 +271,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) } } // if something happened while processing, fail this actor (most probable: exception in supervisorStrategy) - if (failure ne null) actor.handleInvokeFailure(failure, failure.getMessage) + if (failure ne null) actor.handleInvokeFailure(Nil, failure, failure.getMessage) } /**