diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala index 1fa9a1ff03..049a5891e2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala @@ -147,7 +147,7 @@ object FSMTimingSpec { } def resume(actorRef: ActorRef): Unit = actorRef match { - case l: ActorRefWithCell ⇒ l.resume(inResponseToFailure = false) + case l: ActorRefWithCell ⇒ l.resume(inResponseToFailure = null) case _ ⇒ } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index ce35fd8c82..d455104378 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -22,6 +22,7 @@ import akka.testkit.{ filterException, duration2TestDuration, TestLatch } import akka.testkit.TestEvent.Mute import java.util.concurrent.ConcurrentHashMap import java.lang.ref.WeakReference +import akka.event.Logging object SupervisorHierarchySpec { class FireWorkerException(msg: String) extends Exception(msg) @@ -54,9 +55,11 @@ object SupervisorHierarchySpec { case class Ready(ref: ActorRef) case class Died(ref: ActorRef) case object Abort + case object PingOfDeath + case object PongOfDeath case class Event(msg: Any) { val time: Long = System.nanoTime } case class ErrorLog(msg: String, log: Vector[Event]) - case class Failure(directive: Directive, stop: Boolean, depth: Int, var failPre: Int, var failPost: Int, stopKids: Int) + case class Failure(directive: Directive, stop: Boolean, depth: Int, var failPre: Int, var failPost: Int, val failConstr: Int, stopKids: Int) extends RuntimeException with NoStackTrace { override def toString = productPrefix + productIterator.mkString("(", ",", ")") } @@ -83,15 +86,19 @@ object SupervisorHierarchySpec { Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) { override def suspend(cell: ActorCell): Unit = { - val a = cell.actor.asInstanceOf[Hierarchy] - a.log :+= Event("suspended " + cell.mailbox.status / 4) + cell.actor match { + case h: Hierarchy ⇒ h.log :+= Event("suspended " + cell.mailbox.suspendCount) + case _ ⇒ + } super.suspend(cell) } override def resume(cell: ActorCell): Unit = { - val a = cell.actor.asInstanceOf[Hierarchy] super.resume(cell) - a.log :+= Event("resumed " + cell.mailbox.status / 4) + cell.actor match { + case h: Hierarchy ⇒ h.log :+= Event("resumed " + cell.mailbox.suspendCount) + case _ ⇒ + } } } @@ -104,11 +111,20 @@ object SupervisorHierarchySpec { * upon Restart or would have to be managed by the highest supervisor (which * is undesirable). */ - case class HierarchyState(log: Vector[Event], kids: Map[ActorRef, Int]) + case class HierarchyState(log: Vector[Event], kids: Map[ActorRef, Int], failConstr: Failure) val stateCache = new ConcurrentHashMap[ActorRef, HierarchyState]() class Hierarchy(size: Int, breadth: Int, listener: ActorRef, myLevel: Int) extends Actor { + var log = Vector.empty[Event] + + stateCache.get(self) match { + case hs @ HierarchyState(_, _, f: Failure) if f.failConstr > 0 ⇒ + stateCache.put(self, hs.copy(failConstr = f.copy(failConstr = f.failConstr - 1))) + throw f + case _ ⇒ + } + var failed = false var suspended = false @@ -125,7 +141,7 @@ object SupervisorHierarchySpec { case _ ⇒ } - def suspendCount = context.asInstanceOf[ActorCell].mailbox.status / 4 + def suspendCount = context.asInstanceOf[ActorCell].mailbox.suspendCount override def preStart { log :+= Event("started") @@ -143,7 +159,7 @@ object SupervisorHierarchySpec { (context.watch(context.actorOf(props, id.toString)), kidSize) }(collection.breakOut) } else Map() - stateCache.put(self, HierarchyState(log, kidInfo)) + stateCache.put(self, HierarchyState(log, kidInfo, null)) } var preRestartCalled = false @@ -170,20 +186,31 @@ object SupervisorHierarchySpec { } } - override val supervisorStrategy = OneForOneStrategy() { - case Failure(directive, stop, 0, _, failPost, _) ⇒ - log :+= Event("applying (" + directive + ", " + stop + ", " + failPost + ") to " + sender) - if (myLevel > 3 && failPost == 0 && stop) Stop else directive - case PostRestartException(_, Failure(directive, stop, 0, _, failPost, _), _) ⇒ - log :+= Event("re-applying (" + directive + ", " + stop + ", " + failPost + ") to " + sender) - if (myLevel > 3 && failPost == 0 && stop) Stop else directive - case f: Failure ⇒ - import SupervisorStrategy._ - setFlags(f.directive) - log :+= Event("escalating " + f) - throw f.copy(depth = f.depth - 1) - case _ ⇒ sender ! Dump(0); Resume + val unwrap: PartialFunction[Throwable, (Throwable, Throwable)] = { + case x @ PostRestartException(_, f: Failure, _) ⇒ (f, x) + case x @ ActorInitializationException(_, _, f: Failure) ⇒ (f, x) + case x ⇒ (x, x) } + override val supervisorStrategy = OneForOneStrategy()(unwrap andThen { + case _ if pongsToGo > 0 ⇒ Resume + case (f: Failure, orig) ⇒ + if (f.depth > 0) { + setFlags(f.directive) + log :+= Event("escalating " + f) + throw f.copy(depth = f.depth - 1) + } + val prefix = orig match { + case f: Failure ⇒ "applying " + case _ ⇒ "re-applying " + } + log :+= Event(prefix + f + " to " + sender) + if (myLevel > 3 && f.failPost == 0 && f.stop) Stop else f.directive + case (_, x) ⇒ + log :+= Event("unhandled exception" + Logging.stackTraceFor(x)) + sender ! Dump(0) + context.system.scheduler.scheduleOnce(1 second, self, Dump(0)) + Resume + }) override def postRestart(cause: Throwable) { val state = stateCache.get(self) @@ -194,7 +221,7 @@ object SupervisorHierarchySpec { val name = child.path.name if (context.actorFor(name).isTerminated) { listener ! Died(child) - val props = Props(new Hierarchy(kidSize, breadth, listener, myLevel + 1)) + val props = Props(new Hierarchy(kidSize, breadth, listener, myLevel + 1)).withDispatcher("hierarchy") context.watch(context.actorOf(props, name)) } } @@ -208,10 +235,11 @@ object SupervisorHierarchySpec { override def postStop { if (failed || suspended) { listener ! ErrorLog("not resumed (" + failed + ", " + suspended + ")", log) + } else { + stateCache.put(self, HierarchyState(log, Map(), null)) } } - var log = Vector.empty[Event] def check(msg: Any): Boolean = { suspended = false log :+= Event(msg) @@ -222,12 +250,21 @@ object SupervisorHierarchySpec { } else if (context.asInstanceOf[ActorCell].mailbox.isSuspended) { abort("processing message while suspended") false + } else if (!Thread.currentThread.getName.startsWith("SupervisorHierarchySpec-hierarchy")) { + abort("running on wrong thread " + Thread.currentThread + " dispatcher=" + context.props.dispatcher + "=>" + + context.asInstanceOf[ActorCell].dispatcher.id) + false } else true } + var pongsToGo = 0 + def receive = new Receive { val handler: Receive = { - case f: Failure ⇒ setFlags(f.directive); throw f + case f: Failure ⇒ + setFlags(f.directive) + stateCache.put(self, stateCache.get(self).copy(failConstr = f.copy())) + throw f case "ping" ⇒ Thread.sleep((Random.nextFloat * 1.03).toLong); sender ! "pong" case Dump(0) ⇒ abort("dump") case Dump(level) ⇒ context.children foreach (_ ! Dump(level - 1)) @@ -237,13 +274,27 @@ object SupervisorHierarchySpec { * (if the unwatch() came too late), so just ignore in this case. */ val name = ref.path.name - if (context.actorFor(name).isTerminated) { + if (pongsToGo == 0 && context.actorFor(name).isTerminated) { listener ! Died(ref) val kids = stateCache.get(self).kids(ref) - val props = Props(new Hierarchy(kids, breadth, listener, myLevel + 1)) + val props = Props(new Hierarchy(kids, breadth, listener, myLevel + 1)).withDispatcher("hierarchy") context.watch(context.actorOf(props, name)) } case Abort ⇒ abort("terminating") + case PingOfDeath ⇒ + if (size > 1) { + pongsToGo = context.children.size + context.children foreach (_ ! PingOfDeath) + } else { + context stop self + context.parent ! PongOfDeath + } + case PongOfDeath ⇒ + pongsToGo -= 1 + if (pongsToGo == 0) { + context stop self + context.parent ! PongOfDeath + } } override def isDefinedAt(msg: Any) = handler.isDefinedAt(msg) override def apply(msg: Any) = { if (check(msg)) handler(msg) } @@ -289,6 +340,9 @@ object SupervisorHierarchySpec { * 100 millis * - when receiving a Work() while all actors are "pinged", stop the * hierarchy and go to the Stopping state + * - make sure to remove all actors which die in the course of the test + * from the pinged and idle sets (others will be spawned from within the + * hierarchy) * * Finishing: * - after dealing out the last action, wait for the outstanding "pong" @@ -320,8 +374,6 @@ object SupervisorHierarchySpec { * happen that the new guy sends his Ready before the FSM has gotten the * Terminated, which would screw things up big time. Solution is to let the * supervisor do all, including notifying the FSM of the death of the guy. - * - * TODO RK: test exceptions in constructor */ class StressTest(testActor: ActorRef, size: Int, breadth: Int) extends Actor with LoggingFSM[State, Int] { @@ -329,9 +381,10 @@ object SupervisorHierarchySpec { // don’t escalate from this one! override val supervisorStrategy = OneForOneStrategy() { - case f: Failure ⇒ f.directive - case OriginalRestartException(f: Failure) ⇒ f.directive - case _ ⇒ Stop + case f: Failure ⇒ f.directive + case OriginalRestartException(f: Failure) ⇒ f.directive + case ActorInitializationException(f: Failure) ⇒ f.directive + case _ ⇒ Stop } var children = Vector.empty[ActorRef] @@ -424,7 +477,7 @@ object SupervisorHierarchySpec { nextJob.next match { case Ping(ref) ⇒ ref ! "ping" case Fail(ref, dir) ⇒ - val f = Failure(dir, stop = random012 > 0, depth = random012, failPre = random012, failPost = random012, + val f = Failure(dir, stop = random012 > 0, depth = random012, failPre = random012, failPost = random012, failConstr = random012, stopKids = random012 match { case 0 ⇒ 0 case 1 ⇒ Random.nextInt(breadth / 2) @@ -482,10 +535,11 @@ object SupervisorHierarchySpec { onTransition { case _ -> Stopping ⇒ ignoreNotResumedLogs = false - context stop hierarchy + hierarchy ! PingOfDeath } when(Stopping, stateTimeout = 5 seconds) { + case Event(PongOfDeath, _) ⇒ stay case Event(Terminated(r), _) if r == hierarchy ⇒ val undead = children filterNot (_.isTerminated) if (undead.nonEmpty) { @@ -548,34 +602,50 @@ object SupervisorHierarchySpec { testActor ! "stressTestFailed" stop case Event(StateTimeout, _) ⇒ - getErrors() + getErrors(hierarchy, 2) printErrors() testActor ! "timeout in Failed" stop - case Event("pong", _) ⇒ stay // don’t care? - case Event(Work, _) ⇒ stay + case Event("pong", _) ⇒ stay // don’t care? + case Event(Work, _) ⇒ stay + case Event(Died(_), _) ⇒ stay } - def getErrors() = { - def rec(target: ActorRef, depth: Int): Unit = { - target match { - case l: LocalActorRef ⇒ - errors :+= target -> ErrorLog("forced", l.underlying.actor.asInstanceOf[Hierarchy].log) - if (depth > 0) { - l.underlying.children foreach (rec(_, depth - 1)) - } - } + def getErrors(target: ActorRef, depth: Int): Unit = { + target match { + case l: LocalActorRef ⇒ + l.underlying.actor match { + case h: Hierarchy ⇒ errors :+= target -> ErrorLog("forced", h.log) + case _ ⇒ errors :+= target -> ErrorLog("fetched", stateCache.get(target).log) + } + if (depth > 0) { + l.underlying.children foreach (getErrors(_, depth - 1)) + } + } + } + + def getErrorsUp(target: ActorRef): Unit = { + target match { + case l: LocalActorRef ⇒ + l.underlying.actor match { + case h: Hierarchy ⇒ errors :+= target -> ErrorLog("forced", h.log) + case _ ⇒ errors :+= target -> ErrorLog("fetched", stateCache.get(target).log) + } + if (target != hierarchy) getErrorsUp(l.getParent) } - rec(hierarchy, 2) } def printErrors(): Unit = { + errors collect { + case (origin, ErrorLog("dump", _)) ⇒ getErrors(origin, 1) + case (origin, ErrorLog(msg, _)) if msg startsWith "not resumed" ⇒ getErrorsUp(origin) + } val merged = errors.sortBy(_._1.toString) flatMap { case (ref, ErrorLog(msg, log)) ⇒ println(ref + " " + msg) log map (l ⇒ (l.time, ref, l.msg.toString)) } - merged.sorted foreach println + merged.sorted.distinct foreach println } whenUnhandled { @@ -704,8 +774,12 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w } "survive being stressed" in { - system.eventStream.publish(Mute(EventFilter[Failure](), EventFilter[PreRestartException](), EventFilter[PostRestartException]())) - system.eventStream.publish(Mute(EventFilter.warning(start = "received dead "))) + system.eventStream.publish(Mute( + EventFilter[Failure](), + EventFilter[ActorInitializationException](), + EventFilter[NoSuchElementException]("head of empty list"), + EventFilter.error(start = "changing Resume into Restart"), + EventFilter.warning(start = "received dead "))) val fsm = system.actorOf(Props(new StressTest(testActor, size = 500, breadth = 6)), "stressTest") diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index b5d284d7af..52b3686aa1 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -348,7 +348,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa assertNoCountDown(done, 1000, "Should not process messages while suspended") assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1) - a.resume(inResponseToFailure = false) + a.resume(inResponseToFailure = null) assertCountDown(done, 3.seconds.dilated.toMillis, "Should resume processing of messages when resumed") assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1, suspensions = 1, resumes = 1) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala index 1d0f3ec416..41ce7db15f 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala @@ -65,7 +65,7 @@ class PriorityDispatcherSpec extends AkkaSpec(PriorityDispatcherSpec.config) wit val msgs = (1 to 100).toList for (m ← msgs) actor ! m - actor.resume(inResponseToFailure = false) //Signal the actor to start treating it's message backlog + actor.resume(inResponseToFailure = null) //Signal the actor to start treating it's message backlog Await.result(actor.?('Result).mapTo[List[Int]], timeout.duration) must be === msgs.reverse } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index ea78bb705c..86f910ac08 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -153,7 +153,8 @@ object ActorInitializationException { * @param msg is the message which was optionally passed into preRestart() */ case class PreRestartException private[akka] (actor: ActorRef, cause: Throwable, origCause: Throwable, msg: Option[Any]) - extends ActorInitializationException(actor, "exception in preRestart(" + origCause.getClass + ", " + msg.map(_.getClass) + ")", cause) + extends ActorInitializationException(actor, + "exception in preRestart(" + (if (origCause == null) "null" else origCause.getClass) + ", " + msg.map(_.getClass) + ")", cause) /** * A PostRestartException is thrown when constructor or postRestart() method @@ -164,7 +165,8 @@ case class PreRestartException private[akka] (actor: ActorRef, cause: Throwable, * @param origCause is the exception which caused the restart in the first place */ case class PostRestartException private[akka] (actor: ActorRef, cause: Throwable, origCause: Throwable) - extends ActorInitializationException(actor, "exception post restart (" + origCause.getClass + ")", cause) + extends ActorInitializationException(actor, + "exception post restart (" + (if (origCause == null) "null" else origCause.getClass) + ")", cause) /** * This is an extractor for retrieving the original cause (i.e. the first diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index e7808b2cc9..09523fbab3 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -188,7 +188,7 @@ private[akka] trait Cell { /** * Recursively resume this actor and all its children. */ - def resume(inResponseToFailure: Boolean): Unit + def resume(inResponseToFailure: Throwable): Unit /** * Restart this actor (will recursively restart or stop all children). */ diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 00a84f956a..e8cdf2955e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -191,7 +191,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe /* * Actor life-cycle management, invoked only internally (in response to user requests via ActorContext). */ - def resume(inResponseToFailure: Boolean): Unit + def resume(inResponseToFailure: Throwable): Unit def suspend(): Unit def restart(cause: Throwable): Unit def stop(): Unit @@ -288,7 +288,7 @@ private[akka] class LocalActorRef private[akka] ( /** * Resumes a suspended actor. */ - override def resume(inResponseToFailure: Boolean): Unit = actorCell.resume(inResponseToFailure) + override def resume(inResponseToFailure: Throwable): Unit = actorCell.resume(inResponseToFailure) /** * Shuts down the actor and its message queue @@ -388,7 +388,7 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef { override def getChild(names: Iterator[String]): InternalActorRef = if (names.forall(_.isEmpty)) this else Nobody override def suspend(): Unit = () - override def resume(inResponseToFailure: Boolean): Unit = () + override def resume(inResponseToFailure: Throwable): Unit = () override def stop(): Unit = () override def isTerminated = false diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 7b1da320b6..64a4c2c7b1 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -253,7 +253,7 @@ abstract class SupervisorStrategy { def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { val directive = if (decider.isDefinedAt(cause)) decider(cause) else Escalate //FIXME applyOrElse in Scala 2.10 directive match { - case Resume ⇒ resumeChild(child); true + case Resume ⇒ resumeChild(child, cause); true case Restart ⇒ processFailure(context, true, child, cause, stats, children); true case Stop ⇒ processFailure(context, false, child, cause, stats, children); true case Escalate ⇒ false @@ -265,7 +265,7 @@ abstract class SupervisorStrategy { * is not the currently failing child. Suspend/resume needs to be done in * matching pairs, otherwise actors will wake up too soon or never at all. */ - final def resumeChild(child: ActorRef): Unit = child.asInstanceOf[InternalActorRef].resume(inResponseToFailure = true) + final def resumeChild(child: ActorRef, cause: Throwable): Unit = child.asInstanceOf[InternalActorRef].resume(inResponseToFailure = cause) /** * Restart the given child, possibly suspending it first. diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index caad67503a..7977be5e5b 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -81,7 +81,7 @@ private[akka] class RepointableActorRef( def suspend(): Unit = underlying.suspend() - def resume(inResponseToFailure: Boolean): Unit = underlying.resume(inResponseToFailure) + def resume(inResponseToFailure: Throwable): Unit = underlying.resume(inResponseToFailure) def stop(): Unit = underlying.stop() @@ -171,7 +171,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep def system: ActorSystem = systemImpl def suspend(): Unit = { lock.lock(); try suspendCount += 1 finally lock.unlock() } - def resume(inResponseToFailure: Boolean): Unit = { lock.lock(); try suspendCount -= 1 finally lock.unlock() } + def resume(inResponseToFailure: Throwable): Unit = { lock.lock(); try suspendCount -= 1 finally lock.unlock() } def restart(cause: Throwable): Unit = { lock.lock(); try suspendCount -= 1 finally lock.unlock() } def stop(): Unit = sendSystemMessage(Terminate()) def isTerminated: Boolean = false diff --git a/akka-actor/src/main/scala/akka/actor/cell/Children.scala b/akka-actor/src/main/scala/akka/actor/cell/Children.scala index 202c5e4016..900bc60a0c 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/Children.scala @@ -124,10 +124,10 @@ private[akka] trait Children { this: ActorCell ⇒ case _ ⇒ } - protected def resumeChildren(inResponseToFailure: Boolean, perp: ActorRef): Unit = + protected def resumeChildren(inResponseToFailure: Throwable, perp: ActorRef): Unit = childrenRefs.stats foreach { case ChildRestartStats(child: InternalActorRef, _, _) ⇒ - child.resume(inResponseToFailure = inResponseToFailure && perp == child) + child.resume(if (perp == child) inResponseToFailure else null) } def getChildByName(name: String): Option[ChildRestartStats] = childrenRefs.getByName(name) @@ -188,6 +188,8 @@ private[akka] trait Children { this: ActorCell ⇒ unreserveChild(name) throw e } + // mailbox==null during RoutedActorCell constructor, where suspends are queued otherwise + if (mailbox ne null) for (_ ← 1 to mailbox.suspendCount) actor.suspend() addChild(actor) actor } diff --git a/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala index 8c849366d8..a473e4df66 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala @@ -62,7 +62,7 @@ private[akka] trait Dispatch { this: ActorCell ⇒ final def suspend(): Unit = dispatcher.systemDispatch(this, Suspend()) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def resume(inResponseToFailure: Boolean): Unit = dispatcher.systemDispatch(this, Resume(inResponseToFailure)) + final def resume(inResponseToFailure: Throwable): Unit = dispatcher.systemDispatch(this, Resume(inResponseToFailure)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ final def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause)) diff --git a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala index 9cdfdb2d37..feabe3104d 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala @@ -10,6 +10,7 @@ import akka.dispatch.{ Envelope, ChildTerminated } import akka.event.Logging.{ Warning, Error, Debug } import scala.util.control.NonFatal import akka.dispatch.SystemMessage +import akka.event.Logging private[akka] trait FaultHandling { this: ActorCell ⇒ @@ -66,7 +67,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ if (!setChildrenTerminationReason(ChildrenContainer.Recreation(cause))) finishRecreate(cause, failedActor) } else { // need to keep that suspend counter balanced - faultResume(inResponseToFailure = false) + faultResume(inResponseToFailure = null) } /** @@ -85,13 +86,19 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ * @param inResponseToFailure signifies if it was our own failure which * prompted this action. */ - protected def faultResume(inResponseToFailure: Boolean): Unit = { - val perp = perpetrator - // done always to keep that suspend counter balanced - // must happen “atomically” - try resumeNonRecursive() - finally if (inResponseToFailure) clearFailed() - resumeChildren(inResponseToFailure, perp) + protected def faultResume(inResponseToFailure: Throwable): Unit = { + if ((actor == null || actor.context == null) && inResponseToFailure != null) { + system.eventStream.publish(Error(self.path.toString, clazz(actor), + "changing Resume into Restart after " + inResponseToFailure)) + faultRecreate(inResponseToFailure) + } else { + val perp = perpetrator + // done always to keep that suspend counter balanced + // must happen “atomically” + try resumeNonRecursive() + finally if (inResponseToFailure != null) clearFailed() + resumeChildren(inResponseToFailure, perp) + } } protected def terminate() { @@ -135,7 +142,8 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ } } catch { case NonFatal(e) ⇒ - publish(Error(e, self.path.toString, clazz(actor), "emergency stop: exception in failure handling")) + publish(Error(e, self.path.toString, clazz(actor), + "emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t))) try children foreach stop finally finishTerminate() } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index fff56a3776..34047677aa 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -90,7 +90,7 @@ private[akka] case class Suspend() extends SystemMessage // sent to self from Ac /** * INTERNAL API */ -private[akka] case class Resume(inResponseToFailure: Boolean) extends SystemMessage // sent to self from ActorCell.resume +private[akka] case class Resume(inResponseToFailure: Throwable) extends SystemMessage // sent to self from ActorCell.resume /** * INTERNAL API */ diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index f1f887068b..fd7057a963 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -107,6 +107,9 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) @inline final def shouldProcessMessage: Boolean = (status & shouldNotProcessMask) == 0 + @inline + final def suspendCount: Int = status / suspendUnit + @inline final def isSuspended: Boolean = (status & suspendMask) != 0 diff --git a/akka-agent/src/main/scala/akka/agent/Agent.scala b/akka-agent/src/main/scala/akka/agent/Agent.scala index bd77c1d9ff..03ba363c89 100644 --- a/akka-agent/src/main/scala/akka/agent/Agent.scala +++ b/akka-agent/src/main/scala/akka/agent/Agent.scala @@ -214,7 +214,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { /** * Resumes processing of `send` actions for the agent. */ - def resume(): Unit = updater.resume(inResponseToFailure = false) + def resume(): Unit = updater.resume(inResponseToFailure = null) /** * Closes the agents and makes it eligible for garbage collection. diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index cff9dda274..910c57502a 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -235,7 +235,7 @@ private[akka] class RemoteActorRef private[akka] ( def suspend(): Unit = sendSystemMessage(Suspend()) - def resume(inResponseToFailure: Boolean): Unit = sendSystemMessage(Resume(inResponseToFailure)) + def resume(inResponseToFailure: Throwable): Unit = sendSystemMessage(Resume(inResponseToFailure)) def stop(): Unit = sendSystemMessage(Terminate()) diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index d25a156917..5028ab7dc9 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -171,7 +171,7 @@ class CallingThreadDispatcher( if (switched && !wasActive) { runQueue(mbox, queue) } - case m ⇒ m.systemEnqueue(actor.self, Resume(false)) + case m ⇒ m.systemEnqueue(actor.self, Resume(inResponseToFailure = null)) } }