test exceptions during recreation, see #2336

fix two bugs:
- resumeChildren should only check the perpetrator if
  inResponseToFailure is true
- handleInvokeFailure must not suspend the survivors in case of an
  exception in postRestart
This commit is contained in:
Roland 2012-08-02 16:59:11 +02:00
parent a19df590a7
commit 2c9ddeb629
5 changed files with 134 additions and 89 deletions

View file

@ -5,19 +5,15 @@
package akka.actor
import language.postfixOps
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import scala.concurrent.Await
import scala.concurrent.util.Duration
import scala.concurrent.util.duration.intToDurationInt
import scala.math.BigInt.int2bigInt
import scala.util.Random
import scala.util.control.NoStackTrace
import com.typesafe.config.{ ConfigFactory, Config }
import SupervisorStrategy.{ Resume, Restart, Directive }
import SupervisorStrategy.{ Resume, Restart, Stop, Directive }
import akka.actor.SupervisorStrategy.seqThrowable2Decider
import akka.dispatch.{ MessageDispatcher, DispatcherPrerequisites, DispatcherConfigurator, Dispatcher }
import akka.pattern.ask
@ -56,9 +52,11 @@ object SupervisorHierarchySpec {
case class Children(refs: Vector[ActorRef])
case class Event(msg: Any) { val time: Long = System.nanoTime }
case class ErrorLog(msg: String, log: Vector[Event])
case class Failure(directive: Directive, log: Map[ActorRef, Vector[Event]], depth: Int) extends RuntimeException with NoStackTrace {
override def toString = "Failure(" + directive + ", " + depth + ")"
case class Failure(directive: Directive, log: Map[ActorRef, Vector[Event]], depth: Int, var failPre: Int, var failPost: Int)
extends RuntimeException with NoStackTrace {
override def toString = "Failure(" + directive + ", " + depth + ", " + failPre + ", " + failPost + ")"
}
case class Dump(level: Int)
val config = ConfigFactory.parseString("""
hierarchy {
@ -82,14 +80,14 @@ object SupervisorHierarchySpec {
override def suspend(cell: ActorCell): Unit = {
val a = cell.actor.asInstanceOf[Hierarchy]
a.log :+= Event("suspended")
a.log :+= Event("suspended " + cell.mailbox.status / 4)
super.suspend(cell)
}
override def resume(cell: ActorCell): Unit = {
val a = cell.actor.asInstanceOf[Hierarchy]
a.log :+= Event("resumed")
super.resume(cell)
a.log :+= Event("resumed " + cell.mailbox.status / 4)
}
}
@ -99,6 +97,24 @@ object SupervisorHierarchySpec {
class Hierarchy(size: Int, breadth: Int, listener: ActorRef) extends Actor {
var failed = false
var suspended = false
var stopping = false
def abort(msg: String) {
listener ! ErrorLog(msg, log)
stopping = true
context stop self
}
def setFlags(directive: Directive): Unit = directive match {
case Restart failed = true
case Resume suspended = true
case _
}
def suspendCount = context.asInstanceOf[ActorCell].mailbox.status / 4
override def preStart {
val s = size - 1 // subtract myself
if (s > 0) {
@ -110,33 +126,6 @@ object SupervisorHierarchySpec {
context.watch(context.actorOf(Props(new Hierarchy(kidSize, breadth, listener)).withDispatcher("hierarchy")))
} else context.parent ! Children(Vector(self))
}
override def postRestart(cause: Throwable) {
cause match {
case Failure(_, l, _) log = l get self getOrElse Vector(Event("log lost"))
}
log :+= Event("restarted")
}
override val supervisorStrategy = OneForOneStrategy() {
case Failure(directive, _, 0)
log :+= Event("applying " + directive + " to " + sender)
directive
case f @ Failure(directive, logs, x)
import SupervisorStrategy._
directive match {
case Restart failed = true
case Resume suspended = true
case _
}
log :+= Event("escalating " + f)
throw Failure(directive, logs + (self -> log), x - 1)
}
def abort(msg: String) {
listener ! ErrorLog(msg, log)
stopping = true
context stop self
}
var preRestartCalled = false
override def preRestart(cause: Throwable, msg: Option[Any]): Unit = {
@ -144,6 +133,37 @@ object SupervisorHierarchySpec {
if (preRestartCalled) abort("preRestart called twice")
log :+= Event("preRestart")
preRestartCalled = true
cause match {
case f @ Failure(_, _, _, x, _) if x > 0 f.failPre = x - 1; throw f
case _
}
}
override val supervisorStrategy = OneForOneStrategy() {
case Failure(directive, _, 0, _, _)
log :+= Event("applying " + directive + " to " + sender)
directive
case OriginalRestartException(f: Failure)
log :+= Event("re-applying " + f.directive + " to " + sender)
f.directive
case f @ Failure(directive, logs, x, _, _)
import SupervisorStrategy._
setFlags(directive)
log :+= Event("escalating " + f)
throw f.copy(log = logs + (self -> log), depth = x - 1)
}
override def postRestart(cause: Throwable) {
cause match {
case f: Failure log = f.log get self getOrElse Vector(Event("log lost"))
case OriginalRestartException(f: Failure) log = f.log get self getOrElse Vector(Event("log lost"))
case _
}
log :+= Event("restarted " + suspendCount)
cause match {
case f @ Failure(_, _, _, _, x) if x > 0 f.failPost = x - 1; throw f
case _
}
}
override def postStop {
@ -152,9 +172,6 @@ object SupervisorHierarchySpec {
}
}
var failed = false
var suspended = false
var stopping = false
var log = Vector.empty[Event]
def check(msg: Any): Boolean = {
suspended = false
@ -163,6 +180,9 @@ object SupervisorHierarchySpec {
abort("processing message while failed")
failed = false
false
} else if (context.asInstanceOf[ActorCell].mailbox.isSuspended) {
abort("processing message while suspended")
false
} else true
}
@ -171,10 +191,11 @@ object SupervisorHierarchySpec {
def receive = new Receive {
val handler: Receive = {
case f @ Failure(Resume, _, _) suspended = true; throw f.copy(log = Map(self -> log))
case f: Failure failed = true; throw f.copy(log = Map(self -> log))
case "ping" Thread.sleep((Random.nextFloat * 1.03).toLong); sender ! "pong"
case Terminated(_) abort("terminating")
case f: Failure setFlags(f.directive); throw f.copy(log = Map(self -> log))
case "ping" Thread.sleep((Random.nextFloat * 1.03).toLong); sender ! "pong"
case Dump(0) context stop self
case Dump(level) context.children foreach (_ ! Dump(level - 1))
case Terminated(_) abort("terminating")
case Children(refs)
kids ++= refs
gotKidsFrom += sender
@ -185,7 +206,7 @@ object SupervisorHierarchySpec {
}
}
case class Work(n: Int)
case object Work
sealed trait Action
case class Ping(ref: ActorRef) extends Action
@ -250,17 +271,16 @@ object SupervisorHierarchySpec {
* all are terminated, transfer them to a WeakHashMap and verify that
* they are indeed GCed
*
* TODO RK: also test exceptions during recreate
*
* TODO RK: also test recreate including terminating children
*/
class StressTest(testActor: ActorRef, depth: Int, breadth: Int) extends Actor with LoggingFSM[State, Null] {
class StressTest(testActor: ActorRef, depth: Int, breadth: Int) extends Actor with LoggingFSM[State, Int] {
import context.system
// dont escalate from this one!
override val supervisorStrategy = OneForOneStrategy() {
case Failure(directive, _, _) directive
case f: Failure f.directive
case OriginalRestartException(f: Failure) f.directive
}
var children = Vector.empty[ActorRef]
@ -295,7 +315,8 @@ object SupervisorHierarchySpec {
testActor ! "stressTestStopped"
}
startWith(Idle, null)
// number of Work packages to execute for the test
startWith(Idle, 500000)
when(Idle) {
case Event(Init, _)
@ -321,14 +342,14 @@ object SupervisorHierarchySpec {
onTransition {
case Init -> Stress
self ! Work(500000)
self ! Work
// set timeout for completion of the whole test (i.e. including Finishing and Stopping)
setTimer("phase", StateTimeout, 30.seconds.dilated, false)
}
val workSchedule = 250.millis
private def randomDepth: Int = Random.nextFloat match {
private def random012: Int = Random.nextFloat match {
case x if x > 0.1 0
case x if x > 0.03 1
case _ 2
@ -337,22 +358,30 @@ object SupervisorHierarchySpec {
var ignoreNotResumedLogs = true
when(Stress) {
case Event(w: Work, _) if idleChildren.isEmpty
case Event(Work, _) if idleChildren.isEmpty
ignoreNotResumedLogs = false
context stop hierarchy
goto(Failed)
case Event(Work(x), _) if x > 0
case Event(Work, x) if x > 0
nextJob.next match {
case Ping(ref) ref ! "ping"
case Fail(ref, dir) ref ! Failure(dir, Map.empty, randomDepth)
case Fail(ref, dir) ref ! Failure(dir, Map.empty, random012, random012, random012)
}
if (idleChildren.nonEmpty) self ! Work(x - 1)
else context.system.scheduler.scheduleOnce(workSchedule, self, Work(x - 1))
stay
case Event(Work(_), _) if (pingChildren.isEmpty) goto(LastPing) else goto(Finishing)
if (idleChildren.nonEmpty) self ! Work
else context.system.scheduler.scheduleOnce(workSchedule, self, Work)
stay using (x - 1)
case Event(Work, _) if (pingChildren.isEmpty) goto(LastPing) else goto(Finishing)
case Event("pong", _)
pingChildren -= sender
idleChildren :+= sender
stay
case Event(StateTimeout, todo)
log.info("dumping state due to StateTimeout")
log.info("children: " + children.size + " pinged: " + pingChildren.size + " idle: " + idleChildren.size + " work: " + todo)
println(system.asInstanceOf[ActorSystemImpl].printTree)
ignoreNotResumedLogs = false
hierarchy ! Dump(2)
goto(Failed)
}
when(Finishing) {
@ -403,11 +432,25 @@ object SupervisorHierarchySpec {
testActor ! "stressTestFailed"
stop
case Event(StateTimeout, _)
getErrors()
printErrors()
testActor ! "timeout in Failed"
stop
case Event("pong", _) stay // dont care?
case Event(w: Work, _) stay
case Event("pong", _) stay // dont care?
case Event(Work, _) stay
}
def getErrors() = {
def rec(target: ActorRef, depth: Int): Unit = {
target match {
case l: LocalActorRef
errors :+= target -> ErrorLog("forced", l.underlying.actor.asInstanceOf[Hierarchy].log)
if (depth > 0) {
l.underlying.children foreach (rec(_, depth - 1))
}
}
}
rec(hierarchy, 2)
}
def printErrors(): Unit = {
@ -534,7 +577,7 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w
}
"survive being stressed" in {
system.eventStream.publish(Mute(EventFilter[Failure]()))
system.eventStream.publish(Mute(EventFilter[Failure](), EventFilter[PreRestartException](), EventFilter[PostRestartException]()))
system.eventStream.publish(Mute(EventFilter.warning(start = "received dead letter")))
val fsm = system.actorOf(Props(new StressTest(testActor, depth = 6, breadth = 6)), "stressTest")

View file

@ -314,7 +314,7 @@ private[akka] class ActorCell(
case NoMessage // only here to suppress warning
}
} catch {
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, "error while processing " + message)
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(Nil, e, "error while processing " + message)
}
//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
@ -327,7 +327,7 @@ private[akka] class ActorCell(
}
currentMessage = null // reset current message after successful invocation
} catch {
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, e.getMessage)
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(Nil, e, e.getMessage)
} finally {
checkReceiveTimeout // Reschedule receive timeout
}

View file

@ -119,10 +119,10 @@ private[akka] trait Children { this: ActorCell ⇒
case _
}
protected def resumeChildren(perp: ActorRef): Unit =
protected def resumeChildren(inResponseToFailure: Boolean, perp: ActorRef): Unit =
childrenRefs.stats foreach {
case ChildRestartStats(child: InternalActorRef, _, _)
child.resume(inResponseToFailure = perp == child)
child.resume(inResponseToFailure = inResponseToFailure && perp == child)
}
def getChildByName(name: String): Option[ChildRestartStats] = childrenRefs.getByName(name)

View file

@ -91,7 +91,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
// must happen atomically
try resumeNonRecursive()
finally if (inResponseToFailure) clearFailed()
resumeChildren(perp)
resumeChildren(inResponseToFailure, perp)
}
protected def terminate() {
@ -117,7 +117,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
}
}
final def handleInvokeFailure(t: Throwable, message: String): Unit = {
final def handleInvokeFailure(childrenNotToSuspend: Iterable[ActorRef], t: Throwable, message: String): Unit = {
publish(Error(t, self.path.toString, clazz(actor), message))
// prevent any further messages to be processed until the actor has been restarted
if (!isFailed) try {
@ -127,7 +127,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
case Envelope(Failed(_), child) setFailed(child); Set(child)
case _ setFailed(self); Set.empty
}
suspendChildren(skip)
suspendChildren(skip ++ childrenNotToSuspend)
// tell supervisor
t match { // Wrap InterruptedExceptions and rethrow
case _: InterruptedException parent.tell(Failed(new ActorInterruptedException(t)), self); throw t
@ -156,30 +156,32 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
}
}
private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = try {
try resumeNonRecursive()
finally clearFailed() // must happen in any case, so that failure is propagated
private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = {
// need to keep a snapshot of the surviving children before the new actor instance creates new ones
val survivors = children
val freshActor = newActor()
actor = freshActor // this must happen before postRestart has a chance to fail
if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.
try {
try resumeNonRecursive()
finally clearFailed() // must happen in any case, so that failure is propagated
freshActor.postRestart(cause)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
val freshActor = newActor()
actor = freshActor // this must happen before postRestart has a chance to fail
if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.
// only after parent is up and running again do restart the children which were not stopped
survivors foreach (child
try child.asInstanceOf[InternalActorRef].restart(cause)
catch {
case NonFatal(e) publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child))
})
} catch {
case NonFatal(e)
clearActorFields(actor) // in order to prevent preRestart() from happening again
handleInvokeFailure(new PostRestartException(self, e, cause), e.getMessage)
freshActor.postRestart(cause)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
// only after parent is up and running again do restart the children which were not stopped
survivors foreach (child
try child.asInstanceOf[InternalActorRef].restart(cause)
catch {
case NonFatal(e) publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child))
})
} catch {
case NonFatal(e)
clearActorFields(actor) // in order to prevent preRestart() from happening again
handleInvokeFailure(survivors, new PostRestartException(self, e, cause), e.getMessage)
}
}
final protected def handleFailure(child: ActorRef, cause: Throwable): Unit = getChildByRef(child) match {
@ -196,7 +198,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
*/
try actor.supervisorStrategy.handleChildTerminated(this, child, children)
catch {
case NonFatal(e) handleInvokeFailure(e, "handleChildTerminated failed")
case NonFatal(e) handleInvokeFailure(Nil, e, "handleChildTerminated failed")
}
/*
* if the removal changed the state of the (terminating) children container,

View file

@ -271,7 +271,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
}
}
// if something happened while processing, fail this actor (most probable: exception in supervisorStrategy)
if (failure ne null) actor.handleInvokeFailure(failure, failure.getMessage)
if (failure ne null) actor.handleInvokeFailure(Nil, failure, failure.getMessage)
}
/**