From b8eb128fdaec8ec026abce467a1ee7ff279da89c Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 3 Aug 2012 14:19:42 +0200 Subject: [PATCH] =?UTF-8?q?clean=20up=20hierarchy=E2=80=99s=20log=20stashi?= =?UTF-8?q?ng=20in=20preparation=20for=20remaining=20TODOs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../akka/actor/SupervisorHierarchySpec.scala | 85 ++++++++++++------- 1 file changed, 52 insertions(+), 33 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index b36bde739e..5e41955562 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -20,6 +20,7 @@ import akka.pattern.ask import akka.testkit.{ ImplicitSender, EventFilter, DefaultTimeout, AkkaSpec } import akka.testkit.{ filterException, duration2TestDuration, TestLatch } import akka.testkit.TestEvent.Mute +import java.util.concurrent.ConcurrentHashMap object SupervisorHierarchySpec { class FireWorkerException(msg: String) extends Exception(msg) @@ -52,9 +53,8 @@ object SupervisorHierarchySpec { case class Children(refs: Vector[ActorRef]) case class Event(msg: Any) { val time: Long = System.nanoTime } case class ErrorLog(msg: String, log: Vector[Event]) - case class Failure(directive: Directive, log: Map[ActorRef, Vector[Event]], depth: Int, var failPre: Int, var failPost: Int) - extends RuntimeException with NoStackTrace { - override def toString = "Failure(" + directive + ", " + depth + ", " + failPre + ", " + failPost + ")" + case class Failure(directive: Directive, depth: Int, var failPre: Int, var failPost: Int) extends RuntimeException with NoStackTrace { + override def toString = productPrefix + productIterator.mkString("(", ",", ")") } case class Dump(level: Int) @@ -95,15 +95,22 @@ object SupervisorHierarchySpec { override def dispatcher(): MessageDispatcher = instance } + /* + * This stores structural data of the hierarchy which would otherwise be lost + * upon Restart or would have to be managed by the highest supervisor (which + * is undesirable). + */ + case class HierarchyState(log: Vector[Event], kids: Map[String, Int]) + val stateCache = new ConcurrentHashMap[ActorRef, HierarchyState]() + class Hierarchy(size: Int, breadth: Int, listener: ActorRef) extends Actor { var failed = false var suspended = false - var stopping = false def abort(msg: String) { listener ! ErrorLog(msg, log) - stopping = true + log = Vector(Event("log sent")) context stop self } @@ -116,58 +123,68 @@ object SupervisorHierarchySpec { def suspendCount = context.asInstanceOf[ActorCell].mailbox.status / 4 override def preStart { + log :+= Event("started") val s = size - 1 // subtract myself - if (s > 0) { - val kids = Random.nextInt(Math.min(breadth, s)) + 1 - val sizes = s / kids - var rest = s % kids - def kidSize = if (rest > 0) { rest -= 1; sizes + 1 } else sizes - for (_ ← 1 to kids) - context.watch(context.actorOf(Props(new Hierarchy(kidSize, breadth, listener)).withDispatcher("hierarchy"))) - } else context.parent ! Children(Vector(self)) + val kidInfo: Map[String, Int] = + if (s > 0) { + val kids = Random.nextInt(Math.min(breadth, s)) + 1 + val sizes = s / kids + var rest = s % kids + val propsTemplate = Props.empty.withDispatcher("hierarchy") + (1 to kids).map { (id) ⇒ + val kidSize = if (rest > 0) { rest -= 1; sizes + 1 } else sizes + val props = propsTemplate.withCreator(new Hierarchy(kidSize, breadth, listener)) + val name = id.toString + context.watch(context.actorOf(props, name)) + (name, kidSize) + }(collection.breakOut) + } else { + context.parent ! Children(Vector(self)) + Map() + } + stateCache.put(self, HierarchyState(log, kidInfo)) } var preRestartCalled = false override def preRestart(cause: Throwable, msg: Option[Any]): Unit = { // do not scrap children if (preRestartCalled) abort("preRestart called twice") - log :+= Event("preRestart") - preRestartCalled = true - cause match { - case f @ Failure(_, _, _, x, _) if x > 0 ⇒ f.failPre = x - 1; throw f - case _ ⇒ + else { + log :+= Event("preRestart") + stateCache.put(self, stateCache.get(self).copy(log = log)) + preRestartCalled = true + cause match { + case f @ Failure(_, _, x, _) if x > 0 ⇒ f.failPre = x - 1; throw f + case _ ⇒ + } } } override val supervisorStrategy = OneForOneStrategy() { - case Failure(directive, _, 0, _, _) ⇒ + case Failure(directive, 0, _, _) ⇒ log :+= Event("applying " + directive + " to " + sender) directive case OriginalRestartException(f: Failure) ⇒ log :+= Event("re-applying " + f.directive + " to " + sender) f.directive - case f @ Failure(directive, logs, x, _, _) ⇒ + case f @ Failure(directive, x, _, _) ⇒ import SupervisorStrategy._ setFlags(directive) log :+= Event("escalating " + f) - throw f.copy(log = logs + (self -> log), depth = x - 1) + throw f.copy(depth = x - 1) } override def postRestart(cause: Throwable) { - cause match { - case f: Failure ⇒ log = f.log get self getOrElse Vector(Event("log lost")) - case OriginalRestartException(f: Failure) ⇒ log = f.log get self getOrElse Vector(Event("log lost")) - case _ ⇒ - } + log = stateCache.get(self).log log :+= Event("restarted " + suspendCount) cause match { - case f @ Failure(_, _, _, _, x) if x > 0 ⇒ f.failPost = x - 1; throw f - case _ ⇒ + case f @ Failure(_, _, _, x) if x > 0 ⇒ f.failPost = x - 1; throw f + case _ ⇒ } } override def postStop { - if (failed || (suspended && !stopping)) { + if (failed || suspended) { listener ! ErrorLog("not resumed (" + failed + ", " + suspended + ")", log) } } @@ -191,7 +208,7 @@ object SupervisorHierarchySpec { def receive = new Receive { val handler: Receive = { - case f: Failure ⇒ setFlags(f.directive); throw f.copy(log = Map(self -> log)) + case f: Failure ⇒ setFlags(f.directive); throw f case "ping" ⇒ Thread.sleep((Random.nextFloat * 1.03).toLong); sender ! "pong" case Dump(0) ⇒ context stop self case Dump(level) ⇒ context.children foreach (_ ! Dump(level - 1)) @@ -272,6 +289,8 @@ object SupervisorHierarchySpec { * they are indeed GCed * * TODO RK: also test recreate including terminating children + * + * TODO RK: test exceptions in constructor */ class StressTest(testActor: ActorRef, depth: Int, breadth: Int) extends Actor with LoggingFSM[State, Int] { @@ -320,7 +339,7 @@ object SupervisorHierarchySpec { when(Idle) { case Event(Init, _) ⇒ - hierarchy = context.watch(context.actorOf(Props(new Hierarchy(size = 500, breadth = breadth, self)).withDispatcher("hierarchy"))) + hierarchy = context.watch(context.actorOf(Props(new Hierarchy(size = 500, breadth = breadth, self)).withDispatcher("hierarchy"), "head")) setTimer("phase", StateTimeout, 5 seconds, false) goto(Init) } @@ -344,7 +363,7 @@ object SupervisorHierarchySpec { case Init -> Stress ⇒ self ! Work // set timeout for completion of the whole test (i.e. including Finishing and Stopping) - setTimer("phase", StateTimeout, 30.seconds.dilated, false) + setTimer("phase", StateTimeout, 50.seconds.dilated, false) } val workSchedule = 250.millis @@ -365,7 +384,7 @@ object SupervisorHierarchySpec { case Event(Work, x) if x > 0 ⇒ nextJob.next match { case Ping(ref) ⇒ ref ! "ping" - case Fail(ref, dir) ⇒ ref ! Failure(dir, Map.empty, random012, random012, random012) + case Fail(ref, dir) ⇒ ref ! Failure(dir, random012, random012, random012) } if (idleChildren.nonEmpty) self ! Work else context.system.scheduler.scheduleOnce(workSchedule, self, Work)