Document failure bubbling in Akka Typed, #22665

* also reworking the supervision thing a bit
This commit is contained in:
Johan Andrén 2018-02-02 16:34:58 +01:00 committed by Patrik Nordwall
parent 7dfd05eaf3
commit 819bb09228
7 changed files with 304 additions and 39 deletions

View file

@ -0,0 +1,79 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
*/
package jdocs.akka.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
public class FaultToleranceDocTest extends JUnitSuite {
// #bubbling-example
interface Message {}
class Fail implements Message {
public final String text;
Fail(String text) {
this.text = text;
}
}
// #bubbling-example
@Test
public void bubblingSample() {
// #bubbling-example
final Behavior<Message> failingChildBehavior = Behaviors.immutable(Message.class)
.onMessage(Fail.class, (ctx, message) -> {
throw new RuntimeException(message.text);
})
.build();
Behavior<Message> middleManagementBehavior = Behaviors.deferred((ctx) -> {
ctx.getLog().info("Middle management starting up");
final ActorRef<Message> child = ctx.spawn(failingChildBehavior, "child");
// we want to know when the child terminates, but since we do not handle
// the Terminated signal, we will in turn fail on child termination
ctx.watch(child);
// here we don't handle Terminated at all which means that
// when the child fails or stops gracefully this actor will
// fail with a DeathWatchException
return Behaviors.immutable(Message.class)
.onMessage(Message.class, (innerCtx, msg) -> {
// just pass messages on to the child
child.tell(msg);
return Behaviors.same();
}).build();
});
Behavior<Message> bossBehavior = Behaviors.deferred((ctx) -> {
ctx.getLog().info("Boss starting up");
final ActorRef<Message> middleManagement = ctx.spawn(middleManagementBehavior, "middle-management");
ctx.watch(middleManagement);
// here we don't handle Terminated at all which means that
// when middle management fails with a DeathWatchException
// this actor will also fail
return Behaviors.immutable(Message.class)
.onMessage(Message.class, (innerCtx, msg) -> {
// just pass messages on to the child
middleManagement.tell(msg);
return Behaviors.same();
}).build();
});
final ActorSystem<Message> system =
ActorSystem.create(bossBehavior, "boss");
system.tell(new Fail("boom"));
// this will now bubble up all the way to the boss and as that is the user guardian it means
// the entire actor system will stop
// #bubbling-example
}
}

View file

@ -303,8 +303,10 @@ object ActorContextSpec {
case (ctx, t @ Terminated(test))
outstanding get test match {
case Some(reply)
if (t.failure eq null) reply ! Success
else reply ! Failed(t.failure)
t.failure match {
case None reply ! Success
case Some(ex) reply ! Failed(ex)
}
guardian(outstanding - test)
case None same
}

View file

@ -38,33 +38,103 @@ class WatchSpec extends TestKit("WordSpec", WatchSpec.config)
import WatchSpec._
"Actor monitoring" must {
class WatchSetup {
val terminator = systemActor(terminatorBehavior)
val receivedTerminationSignal: Promise[ActorRef[Nothing]] = Promise()
val watchProbe = TestProbe[Done]()
class WatchSetup {
val terminator = systemActor(terminatorBehavior)
val receivedTerminationSignal: Promise[Terminated] = Promise()
val watchProbe = TestProbe[Done]()
val watcher = systemActor(
Behaviors.supervise(
Behaviors.immutable[StartWatching] {
case (ctx, StartWatching(watchee))
ctx.watch(watchee)
watchProbe.ref ! Done
Behaviors.same
}.onSignal {
case (_, Terminated(stopped))
receivedTerminationSignal.success(stopped)
Behaviors.stopped
}
).onFailure[Throwable](SupervisorStrategy.stop))
}
"get notified of actor termination" in new WatchSetup {
val watcher = systemActor(
Behaviors.supervise(
Behaviors.immutable[StartWatching] {
case (ctx, StartWatching(watchee))
ctx.watch(watchee)
watchProbe.ref ! Done
Behaviors.same
}.onSignal {
case (_, t: Terminated)
receivedTerminationSignal.success(t)
Behaviors.stopped
}
).onFailure[Throwable](SupervisorStrategy.stop))
}
"Actor monitoring" must {
"get notified of graceful actor termination" in new WatchSetup {
watcher ! StartWatching(terminator)
watchProbe.expectMessage(Done)
terminator ! Stop
receivedTerminationSignal.future.futureValue shouldEqual terminator
val termination = receivedTerminationSignal.future.futureValue
termination.ref shouldEqual terminator
termination.failure shouldBe empty
}
"notify a parent of child termination because of failure" in {
case class Failed(t: Terminated) // we need to wrap it as it is handled specially
val probe = TestProbe[Any]()
val ex = new TestException("boom")
val parent = spawn(Behaviors.deferred[Any] { ctx
val child = ctx.spawn(Behaviors.immutable[Any]((ctx, msg)
throw ex
), "child")
ctx.watch(child)
Behaviors.immutable[Any] { (ctx, msg)
child ! msg
Behaviors.same
}.onSignal {
case (_, t: Terminated)
probe.ref ! Failed(t)
Behaviors.same
}
}, "parent")
EventFilter[TestException](occurrences = 1).intercept {
parent ! "boom"
}
val terminated = probe.expectMessageType[Failed].t
terminated.failure should ===(Some(ex)) // here we get the exception from the child
}
"fail the actor itself with DeathPact if it does not accept Terminated" in {
case class Failed(t: Terminated) // we need to wrap it as it is handled specially
val probe = TestProbe[Any]()
val ex = new TestException("boom")
val grossoBosso = spawn(Behaviors.deferred[Any] { ctx
val middleManagement = ctx.spawn(Behaviors.deferred[Any] { ctx
val sixPackJoe = ctx.spawn(Behaviors.immutable[Any]((ctx, msg)
throw ex
), "joe")
ctx.watch(sixPackJoe)
Behaviors.immutable[Any] { (ctx, msg)
sixPackJoe ! msg
Behaviors.same
} // no handling of terminated, even though we watched!!!
}, "middle-management")
ctx.watch(middleManagement)
Behaviors.immutable[Any] { (ctx, msg)
middleManagement ! msg
Behaviors.same
}.onSignal {
case (_, t: Terminated)
probe.ref ! Failed(t)
Behaviors.stopped
}
}, "grosso-bosso")
EventFilter[TestException](occurrences = 1).intercept {
EventFilter[DeathPactException](occurrences = 1).intercept {
grossoBosso ! "boom"
}
}
val terminated = probe.expectMessageType[Failed].t
terminated.failure.isDefined should ===(true)
terminated.failure.get shouldBe a[DeathPactException]
}
"allow idempotent invocations of watch" in new WatchSetup {
watcher ! StartWatching(terminator)
watchProbe.expectMessage(Done)
@ -72,8 +142,7 @@ class WatchSpec extends TestKit("WordSpec", WatchSpec.config)
watcher ! StartWatching(terminator)
watchProbe.expectMessage(Done)
terminator ! Stop
receivedTerminationSignal.future.futureValue shouldEqual terminator
receivedTerminationSignal.future.futureValue.ref shouldEqual terminator
}
class WatchWithSetup {

View file

@ -0,0 +1,69 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
*/
package docs.akka.typed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ DeathPactException, SupervisorStrategy, TypedAkkaSpecWithShutdown }
import akka.testkit.typed.TestKit
import com.typesafe.config.ConfigFactory
class FaultToleranceDocSpec extends TestKit(ConfigFactory.parseString(
"""
# silenced to not put noise in test logs
akka.loglevel = OFF
""")) with TypedAkkaSpecWithShutdown {
"Bubbling of failures" must {
"have an example for the docs" in {
// FIXME I think we could have much better examples of this but I'm stuck so this will have to do for now
// #bubbling-example
sealed trait Message
case class Fail(text: String) extends Message
val worker = Behaviors.immutable[Message] { (ctx, msg)
msg match {
case Fail(text) throw new RuntimeException(text)
}
}
val middleManagementBehavior = Behaviors.deferred[Message] { ctx
ctx.log.info("Middle management starting up")
val child = ctx.spawn(worker, "child")
ctx.watch(child)
// here we don't handle Terminated at all which means that
// when the child fails or stops gracefully this actor will
// fail with a DeathWatchException
Behaviors.immutable[Message] { (ctx, msg)
child ! msg
Behaviors.same
}
}
val bossBehavior = Behaviors.supervise(Behaviors.deferred[Message] { ctx
ctx.log.info("Boss starting up")
val middleManagment = ctx.spawn(middleManagementBehavior, "middle-management")
ctx.watch(middleManagment)
// here we don't handle Terminated at all which means that
// when middle management fails with a DeathWatchException
// this actor will also fail
Behaviors.immutable[Message] { (ctx, msg)
middleManagment ! msg
Behaviors.same
}
}).onFailure[DeathPactException](SupervisorStrategy.restart)
// (spawn comes from the testkit)
val boss = spawn(bossBehavior, "upper-management")
boss ! Fail("ping")
// #bubbling-example
}
}
}

View file

@ -3,6 +3,8 @@
*/
package akka.actor.typed
import java.util.Optional
/**
* Envelope that is published on the eventStream for every message that is
* dropped due to overfull queues.
@ -57,20 +59,30 @@ case object PostStop extends PostStop {
/**
* Lifecycle signal that is fired when an Actor that was watched has terminated.
* Watching is performed by invoking the
* [[akka.actor.typed.scaladsl.ActorContext.watch]] . The DeathWatch service is
* [[akka.actor.typed.scaladsl.ActorContext.watch]]. The DeathWatch service is
* idempotent, meaning that registering twice has the same effect as registering
* once. Registration does not need to happen before the Actor terminates, a
* notification is guaranteed to arrive after both registration and termination
* have occurred. This message is also sent when the watched actor is on a node
* that has been removed from the cluster when using akka-cluster or has been
* marked unreachable when using akka-remote directly.
*
* @param ref Scala API: the `ActorRef` for the terminated actor
*/
final case class Terminated(ref: ActorRef[Nothing])(failed: Throwable) extends Signal {
def wasFailed: Boolean = failed ne null
def failure: Throwable = failed
def failureOption: Option[Throwable] = Option(failed)
/**
* Scala API: If the watched actor is a direct child, and was stopped because it failed, this will contain the
* Exception it failed with, for all other cases it will be `None`.
*/
def failure: Option[Throwable] = Option(failed)
/** Java API */
/** Java API: The actor that was watched and got terminated */
def getRef(): ActorRef[Void] = ref.asInstanceOf[ActorRef[Void]]
/**
* Java API: If the watched actor is a direct child, and was stopped because it failed, this will contain the
* Exception it failed with, for all other cases it will be an empty `Optional`.
*/
def getFailure: Optional[Throwable] = Optional.ofNullable(failed)
}

View file

@ -19,14 +19,18 @@ import scala.util.control.NonFatal
import Behavior._
import ActorRefAdapter.toUntyped
var behavior: Behavior[T] = _initialBehavior
protected var behavior: Behavior[T] = _initialBehavior
var _ctx: ActorContextAdapter[T] = _
private var _ctx: ActorContextAdapter[T] = _
def ctx: ActorContextAdapter[T] =
if (_ctx ne null) _ctx
else throw new IllegalStateException("Context was accessed before typed actor was started.")
var failures: Map[a.ActorRef, Throwable] = Map.empty
/**
* Failures from failed children, that were stopped through untyped supervision, this is what allows us to pass
* child exception in Terminated for direct children.
*/
private var failures: Map[a.ActorRef, Throwable] = Map.empty
def receive = running
@ -108,9 +112,9 @@ import scala.util.control.NonFatal
}
override def unhandled(msg: Any): Unit = msg match {
case Terminated(ref) throw a.DeathPactException(toUntyped(ref))
case msg: Signal // that's ok
case other super.unhandled(other)
case t @ Terminated(ref) throw DeathPactException(ref)
case msg: Signal // that's ok
case other super.unhandled(other)
}
override val supervisorStrategy = a.OneForOneStrategy() {

View file

@ -1,7 +1,14 @@
# Fault Tolerance
The default supervision strategy is for the Actor be stopped. However that can be modified by wrapping behaviors in a call to `Behaviors.supervise`
for example to restart on `IllegalStateExceptions`:
When an actor throws an unexpected exception, a failure, while processing a message or during initialization, the actor will by default be stopped. Note that there is an important distinction between failures and validation errors:
A validation error means that the data of a command sent to an actor is not valid, this should rather be modelled as a part of the actor protocol than make the actor throw exceptions.
A failure is instead something unexpected or outside the control of the actor itself, for example a database connection that broke. Opposite to validation errors, it is seldom useful to model such as parts of the protocol as a sending actor very seldom can do anything useful about it.
For failures it is useful to apply the "let it crash" philosophy: instead of mixing fine grained recovery and correction of internal state that may have become partially invalid because of the failure with the business logic we move that responsibility somewhere else. For many cases the resolution can then be to "crash" the actor, and start a new one, with a fresh state that we know is valid.
In Akka Typed this "somewhere else" is called supervision. Supervision allows you to delaratively describe what should happen when a certain type of exceptions are thrown inside an actor. To use supervision the actual Actor behavior is wrapped using `Behaviors.supervise`, for example to restart on `IllegalStateExceptions`:
Scala
: @@snip [SupervisionCompileOnlyTest.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnlyTest.scala) { #restart }
@ -9,7 +16,7 @@ Scala
Java
: @@snip [SupervisionCompileOnlyTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java) { #restart }
Or to resume instead:
Or to resume, ignore the failure and process the next message, instead:
Scala
: @@snip [SupervisionCompileOnlyTest.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnlyTest.scala) { #resume }
@ -38,3 +45,26 @@ Java
For a full list of strategies see the public methods on `SupervisorStrategy`
## Bubble failures up through the hierarchy
In some scenarios it may be useful to push the decision about what to do on a failure upwards in the Actor hierarchy
and let the parent actor handle what should happen on failures (in untyped Akka Actors this is how it works by default).
For a parent to be notified when a child is terminated it has to `watch` the child. If the child was stopped because of
a failure this will be included in the `Terminated` signal in the `failed` field.
If the parent in turn does not handle the `Terminated` message it will itself fail with an `akka.actor.typed.DeathPactException`. Note that `DeathPactException` cannot be supervised.
This means that a hierarchy of actors can have a child failure bubble up making each actor on the way stop but informing the
top-most parent that there was a failure and how to deal with it, however, the original exception that caused the failure
will only be available to the immediate parent out of the box (this is most often a good thing, not leaking implementation details).
There might be cases when you want the original exception to bubble up the hierarchy, this can be done by handling the
`Terminated` signal, and rethrowing the exception in each actor.
Scala
: @@snip [FaultToleranceDocSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/FaultToleranceDocSpec.scala) { #bubbling-example }
Java
: @@snip [SupervisionCompileOnlyTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/FaultToleranceDocTest.java) { #bubbling-example }