diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index f9e9ab42b1..33387443bb 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -4,10 +4,16 @@ package akka.actor -import akka.testkit._ +import java.lang.Exception import java.util.concurrent.{ TimeUnit, CountDownLatch } -import akka.dispatch.Await +import scala.util.Random +import scala.util.control.NoStackTrace +import com.typesafe.config.{ ConfigFactory, Config } +import SupervisorStrategy.{ Resume, Restart, Directive } +import akka.dispatch.{ MessageDispatcher, DispatcherPrerequisites, DispatcherConfigurator, Dispatcher, Await } import akka.pattern.ask +import akka.testkit._ +import akka.testkit.TestEvent.Mute import akka.util.Duration import akka.util.duration._ @@ -38,10 +44,350 @@ object SupervisorHierarchySpec { case "ping" ⇒ sender ! "pong" } } + + case class Event(msg: Any) { val time: Long = System.nanoTime } + case class ErrorLog(msg: String, log: Vector[Event]) + case class Failure(directive: Directive, log: Vector[Event]) extends RuntimeException with NoStackTrace { + override def toString = "Failure(" + directive + ")" + } + val strategy = OneForOneStrategy() { case Failure(directive, _) ⇒ directive } + + val config = ConfigFactory.parseString(""" + hierarchy { + type = "akka.actor.SupervisorHierarchySpec$MyDispatcherConfigurator" + } + akka.loglevel = INFO + akka.actor.debug.fsm = on + """) + + class MyDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) + extends DispatcherConfigurator(config, prerequisites) { + + private val instance: MessageDispatcher = + new Dispatcher(prerequisites, + config.getString("id"), + config.getInt("throughput"), + Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), + mailboxType, + configureExecutor(), + Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) { + + override def suspend(cell: ActorCell): Unit = { + val a = cell.actor.asInstanceOf[Hierarchy] + a.log :+= Event("suspended") + super.suspend(cell) + } + + override def resume(cell: ActorCell): Unit = { + val a = cell.actor.asInstanceOf[Hierarchy] + a.log :+= Event("resumed") + super.resume(cell) + } + + } + + override def dispatcher(): MessageDispatcher = instance + } + + class Hierarchy(depth: Int, breadth: Int, listener: ActorRef) extends Actor { + + override def preStart { + if (depth > 1) + for (_ ← 1 to breadth) + context.watch(context.actorOf(Props(new Hierarchy(depth - 1, breadth, listener)).withDispatcher("hierarchy"))) + listener ! self + } + override def postRestart(cause: Throwable) { + cause match { + case Failure(_, l) ⇒ log = l + } + log :+= Event("restarted") + } + + override def supervisorStrategy = strategy + override def preRestart(cause: Throwable, msg: Option[Any]): Unit = { + // do not scrap children + } + + override def postStop { + if (failed || suspended) { + listener ! ErrorLog("not resumed (" + failed + ", " + suspended + ")", log) + } + } + + var failed = false + var suspended = false + var log = Vector.empty[Event] + def check(msg: Any) = { + suspended = false + log :+= Event(msg) + if (failed) { + listener ! ErrorLog("processing message while failed", log) + failed = false + context stop self + } + } + + def receive = new Receive { + val handler: Receive = { + case f @ Failure(Resume, _) ⇒ suspended = true; throw f.copy(log = log) + case f: Failure ⇒ failed = true; throw f.copy(log = log) + case "ping" ⇒ Thread.sleep((Random.nextFloat * 1.03).toLong); sender ! "pong" + case Terminated(_) ⇒ listener ! ErrorLog("terminating", log); context stop self + } + override def isDefinedAt(msg: Any) = handler.isDefinedAt(msg) + override def apply(msg: Any) = { check(msg); handler(msg) } + } + } + + case class Work(n: Int) + + sealed trait Action + case class Ping(ref: ActorRef) extends Action + case class Fail(ref: ActorRef, directive: Directive) extends Action + + sealed trait State + case object Idle extends State + case object Init extends State + case object Stress extends State + case object Finishing extends State + case object LastPing extends State + case object Stopping extends State + case object Failed extends State + + /* + * This stress test will construct a supervision hierarchy of configurable + * depth and breadth and then randomly fail and check its actors. The actors + * perform certain checks internally (verifying that they do not run when + * suspended, for example), and they are checked for health by the test + * procedure. + * + * Execution happens in phases (which is the reason for FSM): + * + * Idle: + * - upon reception of Init message, construct hierary and go to Init state + * + * Init: + * - receive refs of all contained actors + * + * Stress: + * - deal out actions (Fail or "ping"), keeping the hierarchy busy + * - whenever all actors are in the "pinged" list (i.e. have not yet + * answered with a "pong"), delay processing of the next Work() by + * 100 millis + * - when receiving a Work() while all actors are "pinged", stop the + * hierarchy and go to the Stopping state + * + * Finishing: + * - after dealing out the last action, wait for the outstanding "pong" + * messages + * - when last "pong" is received, goto LastPing state + * - upon state timeout, stop the hierarchy and go to the Failed state + * + * LastPing: + * - upon entering this state, send a "ping" to all actors + * - when last "pong" is received, goto Stopping state + * - upon state timeout, stop the hierarchy and go to the Failed state + * + * Stopping: + * - upon entering this state, stop the hierarchy + * - upon termination of the hierarchy send back successful result + * + * Whenever an ErrorLog is received, goto Failed state + * + * Failed: + * - accumulate ErrorLog messages + * - upon termination of the hierarchy send back failed result and print + * the logs, merged and in chronological order. + * + * TODO RK: also test Stop directive, and keep a complete list of all + * actors ever created, then verify after stop()ping the hierarchy that + * all are terminated, transfer them to a WeakHashMap and verify that + * they are indeed GCed + * + * TODO RK: make hierarchy construction stochastic so that it includes + * different breadth (including the degenerate breadth-1 case). + * + * TODO RK: also test Escalate by adding an exception with a `var depth` + * which gets decremented within the supervisor and gets handled when zero + * is reached (Restart resolution) + * + * TODO RK: also test exceptions during recreate + * + * TODO RK: also test recreate including terminating children + */ + + class StressTest(testActor: ActorRef, depth: Int, breadth: Int) extends Actor with LoggingFSM[State, Null] { + import context.system + + override def supervisorStrategy = strategy + + var children = Vector.empty[ActorRef] + var idleChildren = Vector.empty[ActorRef] + var pingChildren = Set.empty[ActorRef] + + val nextJob = Iterator.continually(Random.nextFloat match { + case x if x > 0.5 ⇒ + // ping one child + val pick = ((x - 0.5) * 2 * idleChildren.size).toInt + val ref = idleChildren(pick) + idleChildren = idleChildren.take(pick) ++ idleChildren.drop(pick + 1) + pingChildren += ref + Ping(ref) + case x ⇒ + // fail one child + val pick = ((if (x > 0.25) x - 0.25 else x) * 4 * children.size).toInt + Fail(children(pick), if (x > 0.25) Restart else Resume) + }) + + val familySize = ((1 - BigInt(breadth).pow(depth)) / (1 - breadth)).toInt + var hierarchy: ActorRef = _ + + override def preRestart(cause: Throwable, msg: Option[Any]) { + throw new ActorKilledException("I want to DIE") + } + + override def postRestart(cause: Throwable) { + throw new ActorKilledException("I said I wanted to DIE, dammit!") + } + + override def postStop { + testActor ! "stressTestStopped" + } + + startWith(Idle, null) + + when(Idle) { + case Event(Init, _) ⇒ + hierarchy = context.watch(context.actorOf(Props(new Hierarchy(depth, breadth, self)).withDispatcher("hierarchy"))) + setTimer("phase", StateTimeout, 5 seconds, false) + goto(Init) + } + + when(Init) { + case Event(ref: ActorRef, _) ⇒ + if (idleChildren.nonEmpty || pingChildren.nonEmpty) + throw new IllegalStateException("received unexpected child " + children.size) + children :+= ref + if (children.size == familySize) { + idleChildren = children + goto(Stress) + } else stay + case Event(StateTimeout, _) ⇒ + testActor ! "only got %d out of %d refs".format(children.size, familySize) + stop() + } + + onTransition { + case Init -> Stress ⇒ + self ! Work(familySize * 1000) + // set timeout for completion of the whole test (i.e. including Finishing and Stopping) + setTimer("phase", StateTimeout, 60 seconds, false) + } + + val workSchedule = 250.millis + + when(Stress) { + case Event(w: Work, _) if idleChildren.isEmpty ⇒ + context stop hierarchy + goto(Failed) + case Event(Work(x), _) if x > 0 ⇒ + nextJob.next match { + case Ping(ref) ⇒ ref ! "ping" + case Fail(ref, dir) ⇒ ref ! Failure(dir, Vector.empty) + } + if (idleChildren.nonEmpty) self ! Work(x - 1) + else context.system.scheduler.scheduleOnce(workSchedule, self, Work(x - 1)) + stay + case Event(Work(_), _) ⇒ if (pingChildren.isEmpty) goto(LastPing) else goto(Finishing) + case Event("pong", _) ⇒ + pingChildren -= sender + idleChildren :+= sender + stay + } + + when(Finishing) { + case Event("pong", _) ⇒ + pingChildren -= sender + idleChildren :+= sender + if (pingChildren.isEmpty) goto(LastPing) else stay + } + + onTransition { + case _ -> LastPing ⇒ + idleChildren foreach (_ ! "ping") + pingChildren ++= idleChildren + idleChildren = Vector.empty + } + + when(LastPing) { + case Event("pong", _) ⇒ + pingChildren -= sender + idleChildren :+= sender + if (pingChildren.isEmpty) goto(Stopping) else stay + } + + onTransition { + case _ -> Stopping ⇒ context stop hierarchy + } + + when(Stopping, stateTimeout = 5 seconds) { + case Event(Terminated(r), _) if r == hierarchy ⇒ + testActor ! "stressTestSuccessful" + stop + case Event(StateTimeout, _) ⇒ + testActor ! "timeout in Stopping" + stop + } + + var errors = Vector.empty[(ActorRef, ErrorLog)] + + when(Failed, stateTimeout = 5 seconds) { + case Event(e: ErrorLog, _) ⇒ + errors :+= sender -> e + stay + case Event(Terminated(r), _) if r == hierarchy ⇒ + printErrors() + testActor ! "stressTestFailed" + stop + case Event(StateTimeout, _) ⇒ + printErrors() + testActor ! "timeout in Failed" + stop + case Event("pong", _) ⇒ stay // don’t care? + } + + def printErrors(): Unit = { + val merged = errors flatMap { + case (ref, ErrorLog(msg, log)) ⇒ + println(ref + " " + msg) + log map (l ⇒ (l.time, ref, l.msg.toString)) + } + merged.sorted foreach println + } + + whenUnhandled { + case Event(e: ErrorLog, _) ⇒ + errors :+= sender -> e + // don’t stop the hierarchy, that is going to happen all by itself and in the right order + goto(Failed) + case Event(StateTimeout, _) ⇒ + println("pingChildren:\n" + pingChildren.mkString("\n")) + context stop hierarchy + goto(Failed) + case Event(msg, _) ⇒ + testActor ! ("received unexpected msg: " + msg) + stop + } + + initialize + + } + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout with ImplicitSender { +class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) with DefaultTimeout with ImplicitSender { import SupervisorHierarchySpec._ "A Supervisor Hierarchy" must { @@ -132,6 +478,25 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout with Implicit latch.countDown() expectMsg("pong") } + + "survive being stressed" in { + system.eventStream.publish(Mute(EventFilter[Failure]())) + system.eventStream.publish(Mute(EventFilter.warning(start = "received dead letter"))) + + val fsm = system.actorOf(Props(new StressTest(testActor, 6, 3)), "stressTest") + + fsm ! FSM.SubscribeTransitionCallBack(system.actorOf(Props(new Actor { + def receive = { + case s: FSM.CurrentState[_] ⇒ log.info("{}", s) + case t: FSM.Transition[_] ⇒ log.info("{}", t) + } + }))) + + fsm ! Init + + expectMsg(70 seconds, "stressTestSuccessful") + expectMsg("stressTestStopped") + } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index 3db5b5b5dc..ba0314714e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -349,7 +349,8 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende throw e } }) - val dyingActor = Await.result((supervisor ? dyingProps).mapTo[ActorRef], timeout.duration) + supervisor ! dyingProps + val dyingActor = expectMsgType[ActorRef] filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1), EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) { @@ -358,7 +359,8 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende } } - Await.result(dyingActor.?(Ping)(DilatedTimeout), DilatedTimeout) must be === PongMessage + dyingActor ! Ping + expectMsg(PongMessage) inits.get must be(3) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index c5d2afb3fa..7931e5428e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -682,6 +682,11 @@ private[akka] class ActorCell( // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) if (actor ne null) actor.supervisorStrategy.handleSupervisorFailing(self, children) + // now we may just have suspended the poor guy which made us fail by way of Escalate, so adjust the score + currentMessage match { + case Envelope(Failed(`t`), child) ⇒ child.asInstanceOf[InternalActorRef].resume() + case _ ⇒ + } } finally { t match { // Wrap InterruptedExceptions and rethrow case _: InterruptedException ⇒ parent.tell(Failed(new ActorInterruptedException(t)), self); throw t diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 25fc0250af..d66b16cc27 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -23,12 +23,16 @@ private[akka] object Mailbox { * the following assigned numbers CANNOT be changed without looking at the code which uses them! */ - // primary status: only first three + // primary status final val Open = 0 // _status is not initialized in AbstractMailbox, so default must be zero! Deliberately without type ascription to make it a compile-time constant - final val Suspended = 1 // Deliberately without type ascription to make it a compile-time constant - final val Closed = 2 // Deliberately without type ascription to make it a compile-time constant + final val Closed = 1 // Deliberately without type ascription to make it a compile-time constant // secondary status: Scheduled bit may be added to Open/Suspended - final val Scheduled = 4 // Deliberately without type ascription to make it a compile-time constant + final val Scheduled = 2 // Deliberately without type ascription to make it a compile-time constant + // shifted by 2: the suspend count! + final val shouldScheduleMask = 3 + final val shouldProcessMask = ~2 + final val suspendMask = ~3 + final val suspendUnit = 4 // mailbox debugging helper using println (see below) // since this is a compile-time constant, scalac will elide code behind if (Mailbox.debug) (RK checked with 2.9.1) @@ -78,10 +82,10 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes final def status: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset) @inline - final def shouldProcessMessage: Boolean = (status & 3) == Open + final def shouldProcessMessage: Boolean = (status & shouldProcessMask) == 0 @inline - final def isSuspended: Boolean = (status & 3) == Suspended + final def isSuspended: Boolean = (status & suspendMask) != 0 @inline final def isClosed: Boolean = status == Closed @@ -100,21 +104,30 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes /** * set new primary status Open. Caller does not need to worry about whether * status was Scheduled or not. + * + * @returns true if the suspend count reached zero */ @tailrec final def becomeOpen(): Boolean = status match { case Closed ⇒ setStatus(Closed); false - case s ⇒ updateStatus(s, Open | s & Scheduled) || becomeOpen() + case s ⇒ + val next = if (s < suspendUnit) s else s - suspendUnit + if (updateStatus(s, next)) next < suspendUnit + else becomeOpen() } /** * set new primary status Suspended. Caller does not need to worry about whether * status was Scheduled or not. + * + * @returns true if the previous suspend count was zero */ @tailrec final def becomeSuspended(): Boolean = status match { case Closed ⇒ setStatus(Closed); false - case s ⇒ updateStatus(s, Suspended | s & Scheduled) || becomeSuspended() + case s ⇒ + if (updateStatus(s, s + suspendUnit)) s < suspendUnit + else becomeSuspended() } /** @@ -135,11 +148,10 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes val s = status /* * only try to add Scheduled bit if pure Open/Suspended, not Closed or with - * Scheduled bit already set (this is one of the reasons why the numbers - * cannot be changed in object Mailbox above) + * Scheduled bit already set */ - if (s <= Suspended) updateStatus(s, s | Scheduled) || setAsScheduled() - else false + if ((s & shouldScheduleMask) != Open) false + else updateStatus(s, s | Scheduled) || setAsScheduled() } /** @@ -148,12 +160,6 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes @tailrec final def setAsIdle(): Boolean = { val s = status - /* - * only try to remove Scheduled bit if currently Scheduled, not Closed or - * without Scheduled bit set (this is one of the reasons why the numbers - * cannot be changed in object Mailbox above) - */ - updateStatus(s, s & ~Scheduled) || setAsIdle() }