add stress test, keep count of suspend/resume, and fix resulting bugs

This commit is contained in:
Roland 2012-06-19 11:02:06 +02:00
parent 2e459f5f1d
commit 1148e20dbb
4 changed files with 401 additions and 23 deletions

View file

@ -4,10 +4,16 @@
package akka.actor package akka.actor
import akka.testkit._ import java.lang.Exception
import java.util.concurrent.{ TimeUnit, CountDownLatch } import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.dispatch.Await import scala.util.Random
import scala.util.control.NoStackTrace
import com.typesafe.config.{ ConfigFactory, Config }
import SupervisorStrategy.{ Resume, Restart, Directive }
import akka.dispatch.{ MessageDispatcher, DispatcherPrerequisites, DispatcherConfigurator, Dispatcher, Await }
import akka.pattern.ask import akka.pattern.ask
import akka.testkit._
import akka.testkit.TestEvent.Mute
import akka.util.Duration import akka.util.Duration
import akka.util.duration._ import akka.util.duration._
@ -38,10 +44,350 @@ object SupervisorHierarchySpec {
case "ping" sender ! "pong" case "ping" sender ! "pong"
} }
} }
/** One entry of an actor's history log; `time` is taken from `System.nanoTime` when the entry is created. */
case class Event(msg: Any) { val time: Long = System.nanoTime }
/** Error report sent to the listener: a short description plus the reporting actor's event log. */
case class ErrorLog(msg: String, log: Vector[Event])
/**
 * Exception that doubles as the message instructing a Hierarchy actor to fail.
 * It carries the directive the supervisor strategy should answer with and the
 * failing actor's event log; no stack trace is recorded (NoStackTrace).
 */
case class Failure(directive: Directive, log: Vector[Event]) extends RuntimeException with NoStackTrace {
// only the directive is printed, the attached log is left out of the string form
override def toString = "Failure(" + directive + ")"
}
val strategy = OneForOneStrategy() { case Failure(directive, _) directive }
val config = ConfigFactory.parseString("""
hierarchy {
type = "akka.actor.SupervisorHierarchySpec$MyDispatcherConfigurator"
}
akka.loglevel = INFO
akka.actor.debug.fsm = on
""")
/**
 * Dispatcher configurator for the "hierarchy" dispatcher used by this spec.
 * It builds a single Dispatcher from the given config and returns that same
 * instance from every `dispatcher()` call. The dispatcher records each
 * suspend/resume in the affected actor's event log before delegating to the
 * standard implementation.
 */
class MyDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends DispatcherConfigurator(config, prerequisites) {
private val instance: MessageDispatcher =
new Dispatcher(prerequisites,
config.getString("id"),
config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType,
configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) {
// NOTE(review): the cast assumes every actor on this dispatcher is a Hierarchy
// and that cell.actor is non-null at this point — confirm against ActorCell
override def suspend(cell: ActorCell): Unit = {
val a = cell.actor.asInstanceOf[Hierarchy]
a.log :+= Event("suspended")
super.suspend(cell)
}
// same assumption as in suspend(): the cell's actor must be a Hierarchy
override def resume(cell: ActorCell): Unit = {
val a = cell.actor.asInstanceOf[Hierarchy]
a.log :+= Event("resumed")
super.resume(cell)
}
}
// always hand out the one shared instance created above
override def dispatcher(): MessageDispatcher = instance
}
/**
 * One node of the supervision tree under test. A node with depth > 1 creates
 * `breadth` children of depth - 1 on the "hierarchy" dispatcher and watches
 * them; every node announces itself to `listener`. The actor keeps an event
 * log plus `failed`/`suspended` flags and reports inconsistencies to the
 * listener as ErrorLog messages.
 *
 * @param depth    remaining levels of the tree including this node
 * @param breadth  number of children created by each non-leaf node
 * @param listener receives this actor's ref on start and any ErrorLog
 */
class Hierarchy(depth: Int, breadth: Int, listener: ActorRef) extends Actor {
// build the subtree below this node (if any), then report own ref to the listener
override def preStart {
if (depth > 1)
for (_ 1 to breadth)
context.watch(context.actorOf(Props(new Hierarchy(depth - 1, breadth, listener)).withDispatcher("hierarchy")))
listener ! self
}
// take over the log carried by the Failure which caused the restart and note the restart
// NOTE(review): the match has a single case, so any other cause raises a MatchError
override def postRestart(cause: Throwable) {
cause match {
case Failure(_, l) log = l
}
log :+= Event("restarted")
}
override def supervisorStrategy = strategy
// intentionally empty so that the children are kept across a restart
override def preRestart(cause: Throwable, msg: Option[Any]): Unit = {
// do not scrap children
}
// being stopped while still marked failed or suspended is reported as an error
override def postStop {
if (failed || suspended) {
listener ! ErrorLog("not resumed (" + failed + ", " + suspended + ")", log)
}
}
// set right before throwing a non-Resume Failure
var failed = false
// set right before throwing a Failure(Resume); cleared by the next processed message
var suspended = false
// chronological history of this actor, also appended to by the dispatcher on suspend/resume
var log = Vector.empty[Event]
// bookkeeping run before every handled message: log it, and if the actor is still
// flagged as failed report that to the listener and stop
def check(msg: Any) = {
suspended = false
log :+= Event(msg)
if (failed) {
listener ! ErrorLog("processing message while failed", log)
failed = false
context stop self
}
}
// wraps `handler` so that check() runs before each message the handler is defined for
def receive = new Receive {
val handler: Receive = {
// failure requests: flag the state, then throw a copy carrying the current log
case f @ Failure(Resume, _) suspended = true; throw f.copy(log = log)
case f: Failure failed = true; throw f.copy(log = log)
// the sleep argument truncates to 0 or 1 millisecond
case "ping" Thread.sleep((Random.nextFloat * 1.03).toLong); sender ! "pong"
// a watched child terminated: report it and stop this actor as well
case Terminated(_) listener ! ErrorLog("terminating", log); context stop self
}
override def isDefinedAt(msg: Any) = handler.isDefinedAt(msg)
override def apply(msg: Any) = { check(msg); handler(msg) }
}
}
case class Work(n: Int)
sealed trait Action
case class Ping(ref: ActorRef) extends Action
case class Fail(ref: ActorRef, directive: Directive) extends Action
sealed trait State
case object Idle extends State
case object Init extends State
case object Stress extends State
case object Finishing extends State
case object LastPing extends State
case object Stopping extends State
case object Failed extends State
/*
* This stress test will construct a supervision hierarchy of configurable
* depth and breadth and then randomly fail and check its actors. The actors
* perform certain checks internally (verifying that they do not run when
* suspended, for example), and they are checked for health by the test
* procedure.
*
* Execution happens in phases (which is the reason for FSM):
*
* Idle:
* - upon reception of Init message, construct hierarchy and go to Init state
*
* Init:
* - receive refs of all contained actors
*
* Stress:
* - deal out actions (Fail or "ping"), keeping the hierarchy busy
* - whenever all actors are in the "pinged" list (i.e. have not yet
* answered with a "pong"), delay processing of the next Work() by
* 250 millis (see workSchedule)
* - when receiving a Work() while all actors are "pinged", stop the
* hierarchy and go to the Failed state
*
* Finishing:
* - after dealing out the last action, wait for the outstanding "pong"
* messages
* - when last "pong" is received, goto LastPing state
* - upon state timeout, stop the hierarchy and go to the Failed state
*
* LastPing:
* - upon entering this state, send a "ping" to all actors
* - when last "pong" is received, goto Stopping state
* - upon state timeout, stop the hierarchy and go to the Failed state
*
* Stopping:
* - upon entering this state, stop the hierarchy
* - upon termination of the hierarchy send back successful result
*
* Whenever an ErrorLog is received, goto Failed state
*
* Failed:
* - accumulate ErrorLog messages
* - upon termination of the hierarchy send back failed result and print
* the logs, merged and in chronological order.
*
* TODO RK: also test Stop directive, and keep a complete list of all
* actors ever created, then verify after stop()ping the hierarchy that
* all are terminated, transfer them to a WeakHashMap and verify that
* they are indeed GCed
*
* TODO RK: make hierarchy construction stochastic so that it includes
* different breadth (including the degenerate breadth-1 case).
*
* TODO RK: also test Escalate by adding an exception with a `var depth`
* which gets decremented within the supervisor and gets handled when zero
* is reached (Restart resolution)
*
* TODO RK: also test exceptions during recreate
*
* TODO RK: also test recreate including terminating children
*/
class StressTest(testActor: ActorRef, depth: Int, breadth: Int) extends Actor with LoggingFSM[State, Null] {
import context.system
override def supervisorStrategy = strategy
var children = Vector.empty[ActorRef]
var idleChildren = Vector.empty[ActorRef]
var pingChildren = Set.empty[ActorRef]
/*
 * Endless stream of randomly chosen actions. One random float x in [0, 1)
 * selects both the kind of action and the target:
 *   [0.5, 1.0)  -> Ping an idle child (which is moved to pingChildren)
 *   [0.25, 0.5) -> Fail any child with Restart
 *   [0.0, 0.25) -> Fail any child with Resume
 * All sub-ranges are half-open, so the scaled value stays strictly below the
 * collection size. With strict `>` comparisons the exactly representable
 * values 0.5 and 0.25 scaled to 1.0 and produced an index equal to
 * children.size (IndexOutOfBoundsException).
 * Pinging requires idleChildren to be non-empty; the caller must check that.
 */
val nextJob = Iterator.continually(Random.nextFloat match {
  case x if x >= 0.5 =>
    // ping one child
    val pick = ((x - 0.5) * 2 * idleChildren.size).toInt
    val ref = idleChildren(pick)
    idleChildren = idleChildren.take(pick) ++ idleChildren.drop(pick + 1)
    pingChildren += ref
    Ping(ref)
  case x =>
    // fail one child
    val pick = ((if (x >= 0.25) x - 0.25 else x) * 4 * children.size).toInt
    Fail(children(pick), if (x >= 0.25) Restart else Resume)
})
// total number of actors in the tree: 1 + breadth + ... + breadth^(depth - 1).
// The closed form divides by (1 - breadth), which is zero for breadth == 1;
// in that degenerate case the tree is a chain of `depth` actors.
val familySize =
  if (breadth == 1) depth
  else ((1 - BigInt(breadth).pow(depth)) / (1 - breadth)).toInt
var hierarchy: ActorRef = _
// the test driver must never be restarted: fail loudly if a restart is attempted
override def preRestart(cause: Throwable, msg: Option[Any]) {
throw new ActorKilledException("I want to DIE")
}
// second line of defence against a restart of the test driver: throw again
override def postRestart(cause: Throwable) {
throw new ActorKilledException("I said I wanted to DIE, dammit!")
}
// tell the test that the driver actor has stopped, whatever the outcome was
override def postStop {
testActor ! "stressTestStopped"
}
startWith(Idle, null)
when(Idle) {
case Event(Init, _)
hierarchy = context.watch(context.actorOf(Props(new Hierarchy(depth, breadth, self)).withDispatcher("hierarchy")))
setTimer("phase", StateTimeout, 5 seconds, false)
goto(Init)
}
when(Init) {
case Event(ref: ActorRef, _)
if (idleChildren.nonEmpty || pingChildren.nonEmpty)
throw new IllegalStateException("received unexpected child " + children.size)
children :+= ref
if (children.size == familySize) {
idleChildren = children
goto(Stress)
} else stay
case Event(StateTimeout, _)
testActor ! "only got %d out of %d refs".format(children.size, familySize)
stop()
}
onTransition {
case Init -> Stress
self ! Work(familySize * 1000)
// set timeout for completion of the whole test (i.e. including Finishing and Stopping)
setTimer("phase", StateTimeout, 60 seconds, false)
}
val workSchedule = 250.millis
when(Stress) {
case Event(w: Work, _) if idleChildren.isEmpty
context stop hierarchy
goto(Failed)
case Event(Work(x), _) if x > 0
nextJob.next match {
case Ping(ref) ref ! "ping"
case Fail(ref, dir) ref ! Failure(dir, Vector.empty)
}
if (idleChildren.nonEmpty) self ! Work(x - 1)
else context.system.scheduler.scheduleOnce(workSchedule, self, Work(x - 1))
stay
case Event(Work(_), _) if (pingChildren.isEmpty) goto(LastPing) else goto(Finishing)
case Event("pong", _)
pingChildren -= sender
idleChildren :+= sender
stay
}
when(Finishing) {
case Event("pong", _)
pingChildren -= sender
idleChildren :+= sender
if (pingChildren.isEmpty) goto(LastPing) else stay
}
onTransition {
case _ -> LastPing
idleChildren foreach (_ ! "ping")
pingChildren ++= idleChildren
idleChildren = Vector.empty
}
when(LastPing) {
case Event("pong", _)
pingChildren -= sender
idleChildren :+= sender
if (pingChildren.isEmpty) goto(Stopping) else stay
}
onTransition {
case _ -> Stopping context stop hierarchy
}
when(Stopping, stateTimeout = 5 seconds) {
case Event(Terminated(r), _) if r == hierarchy
testActor ! "stressTestSuccessful"
stop
case Event(StateTimeout, _)
testActor ! "timeout in Stopping"
stop
}
var errors = Vector.empty[(ActorRef, ErrorLog)]
when(Failed, stateTimeout = 5 seconds) {
case Event(e: ErrorLog, _)
errors :+= sender -> e
stay
case Event(Terminated(r), _) if r == hierarchy
printErrors()
testActor ! "stressTestFailed"
stop
case Event(StateTimeout, _)
printErrors()
testActor ! "timeout in Failed"
stop
case Event("pong", _) stay // dont care?
}
def printErrors(): Unit = {
val merged = errors flatMap {
case (ref, ErrorLog(msg, log))
println(ref + " " + msg)
log map (l (l.time, ref, l.msg.toString))
}
merged.sorted foreach println
}
whenUnhandled {
case Event(e: ErrorLog, _)
errors :+= sender -> e
// don't stop the hierarchy, that is going to happen all by itself and in the right order
goto(Failed)
case Event(StateTimeout, _)
println("pingChildren:\n" + pingChildren.mkString("\n"))
context stop hierarchy
goto(Failed)
case Event(msg, _)
testActor ! ("received unexpected msg: " + msg)
stop
}
initialize
}
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout with ImplicitSender { class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) with DefaultTimeout with ImplicitSender {
import SupervisorHierarchySpec._ import SupervisorHierarchySpec._
"A Supervisor Hierarchy" must { "A Supervisor Hierarchy" must {
@ -132,6 +478,25 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout with Implicit
latch.countDown() latch.countDown()
expectMsg("pong") expectMsg("pong")
} }
"survive being stressed" in {
system.eventStream.publish(Mute(EventFilter[Failure]()))
system.eventStream.publish(Mute(EventFilter.warning(start = "received dead letter")))
val fsm = system.actorOf(Props(new StressTest(testActor, 6, 3)), "stressTest")
fsm ! FSM.SubscribeTransitionCallBack(system.actorOf(Props(new Actor {
def receive = {
case s: FSM.CurrentState[_] log.info("{}", s)
case t: FSM.Transition[_] log.info("{}", t)
}
})))
fsm ! Init
expectMsg(70 seconds, "stressTestSuccessful")
expectMsg("stressTestStopped")
}
} }
} }

View file

@ -349,7 +349,8 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
throw e throw e
} }
}) })
val dyingActor = Await.result((supervisor ? dyingProps).mapTo[ActorRef], timeout.duration) supervisor ! dyingProps
val dyingActor = expectMsgType[ActorRef]
filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1), filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1),
EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) { EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) {
@ -358,7 +359,8 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
} }
} }
Await.result(dyingActor.?(Ping)(DilatedTimeout), DilatedTimeout) must be === PongMessage dyingActor ! Ping
expectMsg(PongMessage)
inits.get must be(3) inits.get must be(3)

View file

@ -682,6 +682,11 @@ private[akka] class ActorCell(
// prevent any further messages to be processed until the actor has been restarted // prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this) dispatcher.suspend(this)
if (actor ne null) actor.supervisorStrategy.handleSupervisorFailing(self, children) if (actor ne null) actor.supervisorStrategy.handleSupervisorFailing(self, children)
// now we may just have suspended the poor guy which made us fail by way of Escalate, so adjust the score
currentMessage match {
case Envelope(Failed(`t`), child) child.asInstanceOf[InternalActorRef].resume()
case _
}
} finally { } finally {
t match { // Wrap InterruptedExceptions and rethrow t match { // Wrap InterruptedExceptions and rethrow
case _: InterruptedException parent.tell(Failed(new ActorInterruptedException(t)), self); throw t case _: InterruptedException parent.tell(Failed(new ActorInterruptedException(t)), self); throw t

View file

@ -23,12 +23,16 @@ private[akka] object Mailbox {
* the following assigned numbers CANNOT be changed without looking at the code which uses them! * the following assigned numbers CANNOT be changed without looking at the code which uses them!
*/ */
// primary status: only first three // primary status
final val Open = 0 // _status is not initialized in AbstractMailbox, so default must be zero! Deliberately without type ascription to make it a compile-time constant final val Open = 0 // _status is not initialized in AbstractMailbox, so default must be zero! Deliberately without type ascription to make it a compile-time constant
final val Suspended = 1 // Deliberately without type ascription to make it a compile-time constant final val Closed = 1 // Deliberately without type ascription to make it a compile-time constant
final val Closed = 2 // Deliberately without type ascription to make it a compile-time constant
// secondary status: Scheduled bit may be added to Open/Suspended // secondary status: Scheduled bit may be added to Open/Suspended
final val Scheduled = 4 // Deliberately without type ascription to make it a compile-time constant final val Scheduled = 2 // Deliberately without type ascription to make it a compile-time constant
// shifted by 2: the suspend count!
final val shouldScheduleMask = 3
final val shouldProcessMask = ~2
final val suspendMask = ~3
final val suspendUnit = 4
// mailbox debugging helper using println (see below) // mailbox debugging helper using println (see below)
// since this is a compile-time constant, scalac will elide code behind if (Mailbox.debug) (RK checked with 2.9.1) // since this is a compile-time constant, scalac will elide code behind if (Mailbox.debug) (RK checked with 2.9.1)
@ -78,10 +82,10 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
final def status: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset) final def status: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)
@inline @inline
final def shouldProcessMessage: Boolean = (status & 3) == Open final def shouldProcessMessage: Boolean = (status & shouldProcessMask) == 0
@inline @inline
final def isSuspended: Boolean = (status & 3) == Suspended final def isSuspended: Boolean = (status & suspendMask) != 0
@inline @inline
final def isClosed: Boolean = status == Closed final def isClosed: Boolean = status == Closed
@ -100,21 +104,30 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
/** /**
* set new primary status Open. Caller does not need to worry about whether * set new primary status Open. Caller does not need to worry about whether
* status was Scheduled or not. * status was Scheduled or not.
*
* @return true if the suspend count reached zero
*/ */
@tailrec @tailrec
final def becomeOpen(): Boolean = status match { final def becomeOpen(): Boolean = status match {
case Closed setStatus(Closed); false case Closed setStatus(Closed); false
case s updateStatus(s, Open | s & Scheduled) || becomeOpen() case s
val next = if (s < suspendUnit) s else s - suspendUnit
if (updateStatus(s, next)) next < suspendUnit
else becomeOpen()
} }
/** /**
* set new primary status Suspended. Caller does not need to worry about whether * set new primary status Suspended. Caller does not need to worry about whether
* status was Scheduled or not. * status was Scheduled or not.
*
* @return true if the previous suspend count was zero
*/ */
@tailrec @tailrec
final def becomeSuspended(): Boolean = status match { final def becomeSuspended(): Boolean = status match {
case Closed setStatus(Closed); false case Closed setStatus(Closed); false
case s updateStatus(s, Suspended | s & Scheduled) || becomeSuspended() case s
if (updateStatus(s, s + suspendUnit)) s < suspendUnit
else becomeSuspended()
} }
/** /**
@ -135,11 +148,10 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
val s = status val s = status
/* /*
* only try to add Scheduled bit if pure Open/Suspended, not Closed or with * only try to add Scheduled bit if pure Open/Suspended, not Closed or with
* Scheduled bit already set (this is one of the reasons why the numbers * Scheduled bit already set
* cannot be changed in object Mailbox above)
*/ */
if (s <= Suspended) updateStatus(s, s | Scheduled) || setAsScheduled() if ((s & shouldScheduleMask) != Open) false
else false else updateStatus(s, s | Scheduled) || setAsScheduled()
} }
/** /**
@ -148,12 +160,6 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
@tailrec @tailrec
final def setAsIdle(): Boolean = { final def setAsIdle(): Boolean = {
val s = status val s = status
/*
* only try to remove Scheduled bit if currently Scheduled, not Closed or
* without Scheduled bit set (this is one of the reasons why the numbers
* cannot be changed in object Mailbox above)
*/
updateStatus(s, s & ~Scheduled) || setAsIdle() updateStatus(s, s & ~Scheduled) || setAsIdle()
} }