diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala index da26490a42..bdefe3e7b7 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala @@ -67,18 +67,22 @@ class ActorDSLSpec extends AkkaSpec { "have a maximum queue size" in { val i = inbox() system.eventStream.subscribe(testActor, classOf[Warning]) - for (_ ← 1 to 1000) i.receiver ! 0 - expectNoMsg(1 second) - EventFilter.warning(start = "dropping message", occurrences = 1) intercept { + try { + for (_ ← 1 to 1000) i.receiver ! 0 + expectNoMsg(1 second) + EventFilter.warning(start = "dropping message", occurrences = 1) intercept { + i.receiver ! 42 + } + expectMsgType[Warning] i.receiver ! 42 - } - expectMsgType[Warning] - i.receiver ! 42 - expectNoMsg(1 second) - val gotit = for (_ ← 1 to 1000) yield i.receive() - gotit must be((1 to 1000) map (_ ⇒ 0)) - intercept[TimeoutException] { - i.receive(1 second) + expectNoMsg(1 second) + val gotit = for (_ ← 1 to 1000) yield i.receive() + gotit must be((1 to 1000) map (_ ⇒ 0)) + intercept[TimeoutException] { + i.receive(1 second) + } + } finally { + system.eventStream.unsubscribe(testActor, classOf[Warning]) } } @@ -188,8 +192,8 @@ class ActorDSLSpec extends AkkaSpec { } }) a ! testActor - EventFilter[Exception](occurrences = 1) intercept { - a ! new Exception + EventFilter.warning("hi", occurrences = 1) intercept { + a ! new Exception("hi") } expectNoMsg(1 second) EventFilter[Exception]("hello", occurrences = 1) intercept { diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 095305dce4..7200c4cd7e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -233,8 +233,8 @@ object SupervisorHierarchySpec { abort("invariant violated: " + state.kids.size + " != " + context.children.size) } cause match { - case f: Failure if f.failPost > 0 ⇒ f.failPost -= 1; throw f - case PostRestartException(`self`, f: Failure, _) if f.failPost > 0 ⇒ f.failPost -= 1; throw f + case f: Failure if f.failPost > 0 ⇒ { f.failPost -= 1; throw f } + case PostRestartException(`self`, f: Failure, _) if f.failPost > 0 ⇒ { f.failPost -= 1; throw f } case _ ⇒ } } @@ -272,12 +272,12 @@ object SupervisorHierarchySpec { setFlags(f.directive) stateCache.put(self, stateCache.get(self).copy(failConstr = f.copy())) throw f - case "ping" ⇒ Thread.sleep((Random.nextFloat * 1.03).toLong); sender ! "pong" + case "ping" ⇒ { Thread.sleep((Random.nextFloat * 1.03).toLong); sender ! "pong" } case Dump(0) ⇒ abort("dump") case Dump(level) ⇒ context.children foreach (_ ! Dump(level - 1)) case Terminated(ref) ⇒ /* - * It might be that we acted upon this death already in postRestart + * It might be that we acted upon this death already in postRestart * (if the unwatch() came too late), so just ignore in this case. */ val name = ref.path.name @@ -332,17 +332,17 @@ object SupervisorHierarchySpec { * This stress test will construct a supervision hierarchy of configurable * depth and breadth and then randomly fail and check its actors. The actors * perform certain checks internally (verifying that they do not run when - * suspended, for example), and they are checked for health by the test + * suspended, for example), and they are checked for health by the test * procedure. - * + * * Execution happens in phases (which is the reason for FSM): - * + * * Idle: * - upon reception of Init message, construct hierary and go to Init state - * + * * Init: * - receive refs of all contained actors - * + * * Stress: * - deal out actions (Fail or "ping"), keeping the hierarchy busy * - whenever all actors are in the "pinged" list (i.e. have not yet @@ -353,29 +353,29 @@ object SupervisorHierarchySpec { * - make sure to remove all actors which die in the course of the test * from the pinged and idle sets (others will be spawned from within the * hierarchy) - * + * * Finishing: * - after dealing out the last action, wait for the outstanding "pong" * messages * - when last "pong" is received, goto LastPing state * - upon state timeout, stop the hierarchy and go to the Failed state - * + * * LastPing: * - upon entering this state, send a "ping" to all actors * - when last "pong" is received, goto Stopping state * - upon state timeout, stop the hierarchy and go to the Failed state - * + * * Stopping: * - upon entering this state, stop the hierarchy * - upon termination of the hierarchy send back successful result - * + * * Whenever an ErrorLog is received, goto Failed state - * + * * Failed: * - accumulate ErrorLog messages * - upon termination of the hierarchy send back failed result and print * the logs, merged and in chronological order. - * + * * Remark about test failures which lead to stopping: * The FSM needs to know not the send more things to the dead guy, but it * also must not watch all targets, because the dead guy’s supervisor also @@ -558,10 +558,10 @@ object SupervisorHierarchySpec { stop } else if (false) { /* - * This part of the test is normally disabled, because it does not + * This part of the test is normally disabled, because it does not * work reliably: even though I found only these weak references * using YourKit just now, GC wouldn’t collect them and the test - * failed. I’m leaving this code in so that manual inspection remains + * failed. I’m leaving this code in so that manual inspection remains * an option (by setting the above condition to “true”). */ val weak = children map (new WeakReference(_)) @@ -756,7 +756,7 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w val worker = expectMsgType[ActorRef] worker ! "ping" expectMsg("pong") - EventFilter[Exception]("expected", occurrences = 1) intercept { + EventFilter.warning("expected", occurrences = 1) intercept { middle ! "fail" } middle ! "ping" @@ -781,13 +781,13 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w val worker = expectMsgType[ActorRef] worker ! "ping" expectMsg("pong") - EventFilter[Exception]("expected", occurrences = 1) intercept { + EventFilter.warning("expected", occurrences = 1) intercept { boss ! "fail" + awaitCond(worker.asInstanceOf[LocalActorRef].underlying.mailbox.isSuspended) + worker ! "ping" + expectNoMsg(2 seconds) + latch.countDown() } - awaitCond(worker.asInstanceOf[LocalActorRef].underlying.mailbox.isSuspended) - worker ! "ping" - expectNoMsg(2 seconds) - latch.countDown() expectMsg("pong") } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index 7b7c36e0a5..10576187d2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -171,7 +171,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende override def preStart() { preStarts += 1; testActor ! ("preStart" + preStarts) } override def postStop() { postStops += 1; testActor ! ("postStop" + postStops) } def receive = { - case "crash" ⇒ testActor ! "crashed"; throw new RuntimeException("Expected") + case "crash" ⇒ { testActor ! "crashed"; throw new RuntimeException("Expected") } case "ping" ⇒ sender ! "pong" } } @@ -385,7 +385,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende val child = context.watch(context.actorOf(Props(new Actor { override def postRestart(reason: Throwable): Unit = testActor ! "child restarted" def receive = { - case l: TestLatch ⇒ Await.ready(l, 5 seconds); throw new IllegalStateException("OHNOES") + case l: TestLatch ⇒ { Await.ready(l, 5 seconds); throw new IllegalStateException("OHNOES") } case "test" ⇒ sender ! "child green" } }), "child")) @@ -403,7 +403,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende val latch = TestLatch() parent ! latch parent ! "testchild" - EventFilter[IllegalStateException]("OHNOES", occurrences = 2) intercept { + EventFilter[IllegalStateException]("OHNOES", occurrences = 1) intercept { latch.countDown() } expectMsg("parent restarted") diff --git a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala index d61fd1496b..5fb2092d86 100644 --- a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala @@ -192,9 +192,9 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd EventFilter[ActorKilledException](occurrences = 1) intercept { actor ! Kill val set = receiveWhile(messages = 3) { - case Logging.Error(_: ActorKilledException, `aname`, `aclass`, "Kill") ⇒ 1 - case Logging.Debug(`aname`, `aclass`, "restarting") ⇒ 2 - case Logging.Debug(`aname`, `aclass`, "restarted") ⇒ 3 + case Logging.Error(_: ActorKilledException, `aname`, _, "Kill") ⇒ 1 + case Logging.Debug(`aname`, `aclass`, "restarting") ⇒ 2 + case Logging.Debug(`aname`, `aclass`, "restarted") ⇒ 3 }.toSet expectNoMsg(Duration.Zero) assert(set == Set(1, 2, 3), set + " was not Set(1, 2, 3)") diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 23445ae1a0..f2b913d962 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -169,14 +169,14 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with RoundRobinRouter(1, supervisorStrategy = escalator))) //#supervision router ! CurrentRoutees - EventFilter[ActorKilledException](occurrences = 2) intercept { + EventFilter[ActorKilledException](occurrences = 1) intercept { expectMsgType[RouterRoutees].routees.head ! Kill } expectMsgType[ActorKilledException] val router2 = system.actorOf(Props.empty.withRouter(RoundRobinRouter(1).withSupervisorStrategy(escalator))) router2 ! CurrentRoutees - EventFilter[ActorKilledException](occurrences = 2) intercept { + EventFilter[ActorKilledException](occurrences = 1) intercept { expectMsgType[RouterRoutees].routees.head ! Kill } expectMsgType[ActorKilledException] @@ -194,7 +194,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with override def postRestart(reason: Throwable): Unit = testActor ! "restarted" }).withRouter(RoundRobinRouter(3)) val router = expectMsgType[ActorRef] - EventFilter[Exception]("die", occurrences = 2) intercept { + EventFilter[Exception]("die", occurrences = 1) intercept { router ! "die" } expectMsgType[Exception].getMessage must be("die") diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index a48d5c4c45..a1101f1936 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -383,7 +383,7 @@ private[akka] class ActorCell( case NoMessage ⇒ // only here to suppress warning } } catch handleNonFatalOrInterruptedException { e ⇒ - handleInvokeFailure(Nil, e, "error while processing " + message) + handleInvokeFailure(Nil, e) } if (todo != null) systemInvoke(todo) } @@ -398,7 +398,7 @@ private[akka] class ActorCell( } currentMessage = null // reset current message after successful invocation } catch handleNonFatalOrInterruptedException { e ⇒ - handleInvokeFailure(Nil, e, e.getMessage) + handleInvokeFailure(Nil, e) } finally { checkReceiveTimeout // Reschedule receive timeout } diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 0249f8da9d..171290b4d1 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -4,13 +4,16 @@ package akka.actor import language.implicitConversions - import java.lang.{ Iterable ⇒ JIterable } import java.util.concurrent.TimeUnit import akka.japi.Util.immutableSeq import scala.collection.mutable.ArrayBuffer import scala.collection.immutable import scala.concurrent.duration.Duration +import akka.event.Logging.LogEvent +import akka.event.Logging.Error +import akka.event.Logging.Warning +import scala.util.control.NonFatal /** * INTERNAL API @@ -35,7 +38,7 @@ case class ChildRestartStats(child: ActorRef, var maxNrOfRetriesCount: Int = 0, def requestRestartPermission(retriesWindow: (Option[Int], Option[Int])): Boolean = retriesWindow match { case (Some(retries), _) if retries < 1 ⇒ false - case (Some(retries), None) ⇒ maxNrOfRetriesCount += 1; maxNrOfRetriesCount <= retries + case (Some(retries), None) ⇒ { maxNrOfRetriesCount += 1; maxNrOfRetriesCount <= retries } case (x, Some(window)) ⇒ retriesInWindowOkay(if (x.isDefined) x.get else 1, window) case (None, _) ⇒ true } @@ -273,18 +276,64 @@ abstract class SupervisorStrategy { * failure, which will lead to this actor re-throwing the exception which * caused the failure. The exception will not be wrapped. * + * This method calls [[akka.actor.SupervisorStrategy#logFailure]], which will + * log the failure unless it is escalated. You can customize the logging by + * setting [[akka.actor.SupervisorStrategy#loggingEnabled]] to `false` and + * do the logging inside the `decider` or override the `logFailure` method. + * * @param children is a lazy collection (a view) */ def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { val directive = decider.applyOrElse(cause, escalateDefault) directive match { - case Resume ⇒ resumeChild(child, cause); true - case Restart ⇒ processFailure(context, true, child, cause, stats, children); true - case Stop ⇒ processFailure(context, false, child, cause, stats, children); true - case Escalate ⇒ false + case d @ Resume ⇒ + logFailure(context, child, cause, d) + resumeChild(child, cause) + true + case d @ Restart ⇒ + logFailure(context, child, cause, d) + processFailure(context, true, child, cause, stats, children) + true + case d @ Stop ⇒ + logFailure(context, child, cause, d) + processFailure(context, false, child, cause, stats, children) + true + case d @ Escalate ⇒ + logFailure(context, child, cause, d) + false } } + /** + * Logging of actor failures is done when this is `true`. + */ + protected def loggingEnabled: Boolean = true + + /** + * Default logging of actor failures when + * [[akka.actor.SupervisorStrategy#loggingEnabled]] is `true`. + * `Escalate` failures are not logged here, since they are supposed + * to be handled at a level higher up in the hierarchy. + * `Resume` failures are logged at `Warning` level. + * `Stop` and `Restart` failures are logged at `Error` level. + */ + protected def logFailure(context: ActorContext, child: ActorRef, cause: Throwable, decision: Directive): Unit = + if (loggingEnabled) { + val logMessage = cause match { + case e: ActorInitializationException ⇒ e.getCause.getMessage + case e ⇒ e.getMessage + } + decision match { + case Resume ⇒ publish(context, Warning(child.path.toString, getClass, logMessage)) + case Escalate ⇒ // don't log here + case _ ⇒ publish(context, Error(cause, child.path.toString, getClass, logMessage)) + } + } + + // logging is not the main purpose, and if it fails there’s nothing we can do + private def publish(context: ActorContext, logEvent: LogEvent): Unit = + try context.system.eventStream.publish(logEvent) catch { case NonFatal(_) ⇒ } + /** * Resume the previously failed child: do never apply this to a child which * is not the currently failing child. Suspend/resume needs to be done in @@ -319,12 +368,19 @@ abstract class SupervisorStrategy { * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window * @param decider mapping from Throwable to [[akka.actor.SupervisorStrategy.Directive]], you can also use a * `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates. + * @param loggingEnabled the strategy logs the failure if this is enabled (true), by default it is enabled */ -case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider) +case class AllForOneStrategy( + maxNrOfRetries: Int = -1, + withinTimeRange: Duration = Duration.Inf, + override val loggingEnabled: Boolean = true)(val decider: SupervisorStrategy.Decider) extends SupervisorStrategy { import SupervisorStrategy._ + def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider, loggingEnabled: Boolean) = + this(maxNrOfRetries, withinTimeRange, loggingEnabled)(SupervisorStrategy.makeDecider(decider)) + def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider) = this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(decider)) @@ -358,10 +414,17 @@ case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window * @param decider mapping from Throwable to [[akka.actor.SupervisorStrategy.Directive]], you can also use a * `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates. + * @param loggingEnabled the strategy logs the failure if this is enabled (true), by default it is enabled */ -case class OneForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider) +case class OneForOneStrategy( + maxNrOfRetries: Int = -1, + withinTimeRange: Duration = Duration.Inf, + override val loggingEnabled: Boolean = true)(val decider: SupervisorStrategy.Decider) extends SupervisorStrategy { + def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider, loggingEnabled: Boolean) = + this(maxNrOfRetries, withinTimeRange, loggingEnabled)(SupervisorStrategy.makeDecider(decider)) + def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider) = this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(decider)) diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala index 053765296a..cfe2dbf9c7 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala @@ -24,11 +24,11 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ /* ================= * T H E R U L E S * ================= - * + * * Actors can be suspended for two reasons: * - they fail * - their supervisor gets suspended - * + * * In particular they are not suspended multiple times because of cascading * own failures, i.e. while currentlyFailed() they do not fail again. In case * of a restart, failures in constructor/preStart count as new failures. @@ -163,15 +163,14 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ } } - final def handleInvokeFailure(childrenNotToSuspend: immutable.Iterable[ActorRef], t: Throwable, message: String): Unit = { - publish(Error(t, self.path.toString, clazz(actor), message)) + final def handleInvokeFailure(childrenNotToSuspend: immutable.Iterable[ActorRef], t: Throwable): Unit = { // prevent any further messages to be processed until the actor has been restarted if (!isFailed) try { suspendNonRecursive() // suspend children val skip: Set[ActorRef] = currentMessage match { - case Envelope(Failed(_, _), child) ⇒ setFailed(child); Set(child) - case _ ⇒ setFailed(self); Set.empty + case Envelope(Failed(_, _), child) ⇒ { setFailed(child); Set(child) } + case _ ⇒ { setFailed(self); Set.empty } } suspendChildren(exceptFor = skip ++ childrenNotToSuspend) t match { @@ -233,7 +232,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ }) } catch handleNonFatalOrInterruptedException { e ⇒ clearActorFields(actor) // in order to prevent preRestart() from happening again - handleInvokeFailure(survivors, new PostRestartException(self, e, cause), e.getMessage) + handleInvokeFailure(survivors, new PostRestartException(self, e, cause)) } } @@ -256,14 +255,15 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ final protected def handleChildTerminated(child: ActorRef): SystemMessage = { val status = removeChildAndGetStateChange(child) /* - * if this fails, we do nothing in case of terminating/restarting state, + * if this fails, we do nothing in case of terminating/restarting state, * otherwise tell the supervisor etc. (in that second case, the match * below will hit the empty default case, too) */ if (actor != null) { try actor.supervisorStrategy.handleChildTerminated(this, child, children) catch handleNonFatalOrInterruptedException { e ⇒ - handleInvokeFailure(Nil, e, "handleChildTerminated failed") + publish(Error(e, self.path.toString, clazz(actor), "handleChildTerminated failed")) + handleInvokeFailure(Nil, e) } } /* @@ -271,9 +271,9 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ * then we are continuing the previously suspended recreate/create/terminate action */ status match { - case Some(c @ ChildrenContainer.Recreation(cause)) ⇒ finishRecreate(cause, actor); c.dequeueAll() - case Some(c @ ChildrenContainer.Creation()) ⇒ finishCreate(); c.dequeueAll() - case Some(ChildrenContainer.Termination) ⇒ finishTerminate(); null + case Some(c @ ChildrenContainer.Recreation(cause)) ⇒ { finishRecreate(cause, actor); c.dequeueAll() } + case Some(c @ ChildrenContainer.Creation()) ⇒ { finishCreate(); c.dequeueAll() } + case Some(ChildrenContainer.Termination) ⇒ { finishTerminate(); null } case _ ⇒ null } } diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala index 4d2c3e7b48..d84e722fed 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala @@ -29,7 +29,7 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC implicit def ec: ExecutionContext = system.dispatcher "Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in { - filterEvents(EventFilter[ActorActivationException](occurrences = 1)) { + filterEvents(EventFilter.warning(pattern = "failed to activate.*", occurrences = 1)) { val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri")), "invalidActor") intercept[FailedToCreateRouteException] { Await.result(camel.activationFutureFor(actorRef), defaultTimeoutDuration) diff --git a/akka-docs/rst/java/fault-tolerance.rst b/akka-docs/rst/java/fault-tolerance.rst index 74b5a0cb5f..1c93bbdba3 100644 --- a/akka-docs/rst/java/fault-tolerance.rst +++ b/akka-docs/rst/java/fault-tolerance.rst @@ -52,6 +52,13 @@ restarts per minute. ``-1`` and ``Duration.Inf()`` means that the respective lim does not apply, leaving the possibility to specify an absolute upper limit on the restarts or to make the restarts work infinitely. +.. note:: + + If the strategy is declared inside the supervising actor (as opposed to + a separate class) its decider has access to all internal state of + the actor in a thread-safe fashion, including obtaining a reference to the + currently failed child (available as the ``getSender`` of the failure message). + Default Supervisor Strategy ^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -78,6 +85,22 @@ loss of the child. This strategy is also provided pre-packaged as :class:`StoppingSupervisorStrategy` configurator to be used when you want the ``"/user"`` guardian to apply it. +Logging of Actor Failures +^^^^^^^^^^^^^^^^^^^^^^^^^ + +By default the ``SupervisorStrategy`` logs failures unless they are escalated. +Escalated failures are supposed to be handled, and potentially logged, at a level +higher in the hierarchy. + +You can mute the default logging of a ``SupervisorStrategy`` by setting +``loggingEnabled`` to ``false`` when instantiating it. Customized logging +can be done inside the ``Decider``. Note that the reference to the currently +failed child is available as the ``getSender`` when the ``SupervisorStrategy`` is +declared inside the supervising actor. + +You may also customize the logging in your own ``SupervisorStrategy`` implementation +by overriding the ``logFailure`` method. + Supervision of Top-Level Actors ------------------------------- diff --git a/akka-docs/rst/java/untyped-actors.rst b/akka-docs/rst/java/untyped-actors.rst index ab01e31216..5365aa5130 100644 --- a/akka-docs/rst/java/untyped-actors.rst +++ b/akka-docs/rst/java/untyped-actors.rst @@ -739,7 +739,7 @@ Please note, that the child actors are *still restarted*, but no new ``ActorRef` the same principles for the children, ensuring that their ``preStart()`` method is called only at the creation of their refs. -For more information see :ref:`what-restarting-means-scala`. +For more information see :ref:`supervision-restart`. Initialization via message passing ---------------------------------- diff --git a/akka-docs/rst/scala/actors.rst b/akka-docs/rst/scala/actors.rst index fcf7ad0763..8ec241d046 100644 --- a/akka-docs/rst/scala/actors.rst +++ b/akka-docs/rst/scala/actors.rst @@ -882,7 +882,7 @@ Please note, that the child actors are *still restarted*, but no new ``ActorRef` the same principles for the children, ensuring that their ``preStart()`` method is called only at the creation of their refs. -For more information see :ref:`what-restarting-means-scala`. +For more information see :ref:`supervision-restart`. Initialization via message passing ---------------------------------- diff --git a/akka-docs/rst/scala/code/docs/actor/FaultHandlingDocSpec.scala b/akka-docs/rst/scala/code/docs/actor/FaultHandlingDocSpec.scala index aa029823a4..fe1e5b87de 100644 --- a/akka-docs/rst/scala/code/docs/actor/FaultHandlingDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/actor/FaultHandlingDocSpec.scala @@ -93,7 +93,7 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender { supervisor ! Props[Child] val child = expectMsgType[ActorRef] // retrieve answer from TestKit’s testActor //#create - EventFilter[ArithmeticException](occurrences = 1) intercept { + EventFilter.warning(occurrences = 1) intercept { //#resume child ! 42 // set state to 42 child ! "get" @@ -121,7 +121,7 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender { child.isTerminated must be(true) //#stop } - EventFilter[Exception]("CRASH", occurrences = 4) intercept { + EventFilter[Exception]("CRASH", occurrences = 2) intercept { //#escalate-kill supervisor ! Props[Child] // create new child val child2 = expectMsgType[ActorRef] diff --git a/akka-docs/rst/scala/fault-tolerance.rst b/akka-docs/rst/scala/fault-tolerance.rst index 9a2d36fbe1..1e705b4b0b 100644 --- a/akka-docs/rst/scala/fault-tolerance.rst +++ b/akka-docs/rst/scala/fault-tolerance.rst @@ -82,6 +82,22 @@ loss of the child. This strategy is also provided pre-packaged as :class:`StoppingSupervisorStrategy` configurator to be used when you want the ``"/user"`` guardian to apply it. +Logging of Actor Failures +^^^^^^^^^^^^^^^^^^^^^^^^^ + +By default the ``SupervisorStrategy`` logs failures unless they are escalated. +Escalated failures are supposed to be handled, and potentially logged, at a level +higher in the hierarchy. + +You can mute the default logging of a ``SupervisorStrategy`` by setting +``loggingEnabled`` to ``false`` when instantiating it. Customized logging +can be done inside the ``Decider``. Note that the reference to the currently +failed child is available as the ``sender`` when the ``SupervisorStrategy`` is +declared inside the supervising actor. + +You may also customize the logging in your own ``SupervisorStrategy`` implementation +by overriding the ``logFailure`` method. + Supervision of Top-Level Actors ------------------------------- diff --git a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala index b33b67025e..696a29f24b 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala @@ -240,9 +240,10 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { expectMsg(ToClient(Done)) b ! Remove(B) b ! Remove(A) - EventFilter[BarrierEmpty](occurrences = 1) intercept { + EventFilter.warning(start = "cannot remove", occurrences = 1) intercept { b ! Remove(A) } + Thread.sleep(5000) } } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala index 7b029d3b4f..a993511b21 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala @@ -210,9 +210,7 @@ akka.actor.deployment { router ! CurrentRoutees EventFilter[ActorKilledException](occurrences = 1) intercept { - EventFilter[ActorKilledException](occurrences = 1).intercept { - expectMsgType[RouterRoutees].routees.head ! Kill - }(otherSystem) + expectMsgType[RouterRoutees].routees.head ! Kill } expectMsgType[ActorKilledException] } diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index 6284eb0780..e1f22f1379 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -197,7 +197,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D expectMsg(42) EventFilter[Exception]("crash", occurrences = 1).intercept { r ! new Exception("crash") - }(other) + } expectMsg("preRestart") r ! 42 expectMsg(42) @@ -242,7 +242,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D expectMsg(42) EventFilter[Exception]("crash", occurrences = 1).intercept { r ! new Exception("crash") - }(other) + } expectMsg("preRestart") r ! 42 expectMsg(42) @@ -258,7 +258,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D expectMsg(10.seconds, 42) EventFilter[Exception]("crash", occurrences = 1).intercept { r ! new Exception("crash") - }(other) + } expectMsg("preRestart") r ! 42 expectMsg(42) @@ -274,7 +274,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D expectMsg(10.seconds, 42) EventFilter[Exception]("crash", occurrences = 1).intercept { r ! new Exception("crash") - }(other) + } expectMsg("preRestart") r ! 42 expectMsg(42)