clean up hierarchy’s log stashing in preparation for remaining TODOs
This commit is contained in:
parent
2c9ddeb629
commit
b8eb128fda
1 changed files with 52 additions and 33 deletions
|
|
@ -20,6 +20,7 @@ import akka.pattern.ask
|
|||
import akka.testkit.{ ImplicitSender, EventFilter, DefaultTimeout, AkkaSpec }
|
||||
import akka.testkit.{ filterException, duration2TestDuration, TestLatch }
|
||||
import akka.testkit.TestEvent.Mute
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
object SupervisorHierarchySpec {
|
||||
class FireWorkerException(msg: String) extends Exception(msg)
|
||||
|
|
@ -52,9 +53,8 @@ object SupervisorHierarchySpec {
|
|||
case class Children(refs: Vector[ActorRef])
|
||||
case class Event(msg: Any) { val time: Long = System.nanoTime }
|
||||
case class ErrorLog(msg: String, log: Vector[Event])
|
||||
case class Failure(directive: Directive, log: Map[ActorRef, Vector[Event]], depth: Int, var failPre: Int, var failPost: Int)
|
||||
extends RuntimeException with NoStackTrace {
|
||||
override def toString = "Failure(" + directive + ", " + depth + ", " + failPre + ", " + failPost + ")"
|
||||
case class Failure(directive: Directive, depth: Int, var failPre: Int, var failPost: Int) extends RuntimeException with NoStackTrace {
|
||||
override def toString = productPrefix + productIterator.mkString("(", ",", ")")
|
||||
}
|
||||
case class Dump(level: Int)
|
||||
|
||||
|
|
@ -95,15 +95,22 @@ object SupervisorHierarchySpec {
|
|||
override def dispatcher(): MessageDispatcher = instance
|
||||
}
|
||||
|
||||
/*
|
||||
* This stores structural data of the hierarchy which would otherwise be lost
|
||||
* upon Restart or would have to be managed by the highest supervisor (which
|
||||
* is undesirable).
|
||||
*/
|
||||
case class HierarchyState(log: Vector[Event], kids: Map[String, Int])
|
||||
val stateCache = new ConcurrentHashMap[ActorRef, HierarchyState]()
|
||||
|
||||
class Hierarchy(size: Int, breadth: Int, listener: ActorRef) extends Actor {
|
||||
|
||||
var failed = false
|
||||
var suspended = false
|
||||
var stopping = false
|
||||
|
||||
def abort(msg: String) {
|
||||
listener ! ErrorLog(msg, log)
|
||||
stopping = true
|
||||
log = Vector(Event("log sent"))
|
||||
context stop self
|
||||
}
|
||||
|
||||
|
|
@ -116,58 +123,68 @@ object SupervisorHierarchySpec {
|
|||
def suspendCount = context.asInstanceOf[ActorCell].mailbox.status / 4
|
||||
|
||||
override def preStart {
|
||||
log :+= Event("started")
|
||||
val s = size - 1 // subtract myself
|
||||
if (s > 0) {
|
||||
val kids = Random.nextInt(Math.min(breadth, s)) + 1
|
||||
val sizes = s / kids
|
||||
var rest = s % kids
|
||||
def kidSize = if (rest > 0) { rest -= 1; sizes + 1 } else sizes
|
||||
for (_ ← 1 to kids)
|
||||
context.watch(context.actorOf(Props(new Hierarchy(kidSize, breadth, listener)).withDispatcher("hierarchy")))
|
||||
} else context.parent ! Children(Vector(self))
|
||||
val kidInfo: Map[String, Int] =
|
||||
if (s > 0) {
|
||||
val kids = Random.nextInt(Math.min(breadth, s)) + 1
|
||||
val sizes = s / kids
|
||||
var rest = s % kids
|
||||
val propsTemplate = Props.empty.withDispatcher("hierarchy")
|
||||
(1 to kids).map { (id) ⇒
|
||||
val kidSize = if (rest > 0) { rest -= 1; sizes + 1 } else sizes
|
||||
val props = propsTemplate.withCreator(new Hierarchy(kidSize, breadth, listener))
|
||||
val name = id.toString
|
||||
context.watch(context.actorOf(props, name))
|
||||
(name, kidSize)
|
||||
}(collection.breakOut)
|
||||
} else {
|
||||
context.parent ! Children(Vector(self))
|
||||
Map()
|
||||
}
|
||||
stateCache.put(self, HierarchyState(log, kidInfo))
|
||||
}
|
||||
|
||||
var preRestartCalled = false
|
||||
override def preRestart(cause: Throwable, msg: Option[Any]): Unit = {
|
||||
// do not scrap children
|
||||
if (preRestartCalled) abort("preRestart called twice")
|
||||
log :+= Event("preRestart")
|
||||
preRestartCalled = true
|
||||
cause match {
|
||||
case f @ Failure(_, _, _, x, _) if x > 0 ⇒ f.failPre = x - 1; throw f
|
||||
case _ ⇒
|
||||
else {
|
||||
log :+= Event("preRestart")
|
||||
stateCache.put(self, stateCache.get(self).copy(log = log))
|
||||
preRestartCalled = true
|
||||
cause match {
|
||||
case f @ Failure(_, _, x, _) if x > 0 ⇒ f.failPre = x - 1; throw f
|
||||
case _ ⇒
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy() {
|
||||
case Failure(directive, _, 0, _, _) ⇒
|
||||
case Failure(directive, 0, _, _) ⇒
|
||||
log :+= Event("applying " + directive + " to " + sender)
|
||||
directive
|
||||
case OriginalRestartException(f: Failure) ⇒
|
||||
log :+= Event("re-applying " + f.directive + " to " + sender)
|
||||
f.directive
|
||||
case f @ Failure(directive, logs, x, _, _) ⇒
|
||||
case f @ Failure(directive, x, _, _) ⇒
|
||||
import SupervisorStrategy._
|
||||
setFlags(directive)
|
||||
log :+= Event("escalating " + f)
|
||||
throw f.copy(log = logs + (self -> log), depth = x - 1)
|
||||
throw f.copy(depth = x - 1)
|
||||
}
|
||||
|
||||
override def postRestart(cause: Throwable) {
|
||||
cause match {
|
||||
case f: Failure ⇒ log = f.log get self getOrElse Vector(Event("log lost"))
|
||||
case OriginalRestartException(f: Failure) ⇒ log = f.log get self getOrElse Vector(Event("log lost"))
|
||||
case _ ⇒
|
||||
}
|
||||
log = stateCache.get(self).log
|
||||
log :+= Event("restarted " + suspendCount)
|
||||
cause match {
|
||||
case f @ Failure(_, _, _, _, x) if x > 0 ⇒ f.failPost = x - 1; throw f
|
||||
case _ ⇒
|
||||
case f @ Failure(_, _, _, x) if x > 0 ⇒ f.failPost = x - 1; throw f
|
||||
case _ ⇒
|
||||
}
|
||||
}
|
||||
|
||||
override def postStop {
|
||||
if (failed || (suspended && !stopping)) {
|
||||
if (failed || suspended) {
|
||||
listener ! ErrorLog("not resumed (" + failed + ", " + suspended + ")", log)
|
||||
}
|
||||
}
|
||||
|
|
@ -191,7 +208,7 @@ object SupervisorHierarchySpec {
|
|||
|
||||
def receive = new Receive {
|
||||
val handler: Receive = {
|
||||
case f: Failure ⇒ setFlags(f.directive); throw f.copy(log = Map(self -> log))
|
||||
case f: Failure ⇒ setFlags(f.directive); throw f
|
||||
case "ping" ⇒ Thread.sleep((Random.nextFloat * 1.03).toLong); sender ! "pong"
|
||||
case Dump(0) ⇒ context stop self
|
||||
case Dump(level) ⇒ context.children foreach (_ ! Dump(level - 1))
|
||||
|
|
@ -272,6 +289,8 @@ object SupervisorHierarchySpec {
|
|||
* they are indeed GCed
|
||||
*
|
||||
* TODO RK: also test recreate including terminating children
|
||||
*
|
||||
* TODO RK: test exceptions in constructor
|
||||
*/
|
||||
|
||||
class StressTest(testActor: ActorRef, depth: Int, breadth: Int) extends Actor with LoggingFSM[State, Int] {
|
||||
|
|
@ -320,7 +339,7 @@ object SupervisorHierarchySpec {
|
|||
|
||||
when(Idle) {
|
||||
case Event(Init, _) ⇒
|
||||
hierarchy = context.watch(context.actorOf(Props(new Hierarchy(size = 500, breadth = breadth, self)).withDispatcher("hierarchy")))
|
||||
hierarchy = context.watch(context.actorOf(Props(new Hierarchy(size = 500, breadth = breadth, self)).withDispatcher("hierarchy"), "head"))
|
||||
setTimer("phase", StateTimeout, 5 seconds, false)
|
||||
goto(Init)
|
||||
}
|
||||
|
|
@ -344,7 +363,7 @@ object SupervisorHierarchySpec {
|
|||
case Init -> Stress ⇒
|
||||
self ! Work
|
||||
// set timeout for completion of the whole test (i.e. including Finishing and Stopping)
|
||||
setTimer("phase", StateTimeout, 30.seconds.dilated, false)
|
||||
setTimer("phase", StateTimeout, 50.seconds.dilated, false)
|
||||
}
|
||||
|
||||
val workSchedule = 250.millis
|
||||
|
|
@ -365,7 +384,7 @@ object SupervisorHierarchySpec {
|
|||
case Event(Work, x) if x > 0 ⇒
|
||||
nextJob.next match {
|
||||
case Ping(ref) ⇒ ref ! "ping"
|
||||
case Fail(ref, dir) ⇒ ref ! Failure(dir, Map.empty, random012, random012, random012)
|
||||
case Fail(ref, dir) ⇒ ref ! Failure(dir, random012, random012, random012)
|
||||
}
|
||||
if (idleChildren.nonEmpty) self ! Work
|
||||
else context.system.scheduler.scheduleOnce(workSchedule, self, Work)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue