Merge pull request #27624 from akka/wip-24717-doc-apply-style12-patriknw

doc: stylish fault-tolerance.md, #24717
This commit is contained in:
Patrik Nordwall 2019-09-05 14:46:30 +02:00 committed by GitHub
commit 8765a4fbe6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 290 additions and 172 deletions

View file

@ -4,83 +4,143 @@
package jdocs.akka.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;
// #bubbling-example
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.DeathPactException;
import akka.actor.typed.SupervisorStrategy;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
// #bubbling-example
public class BubblingSample {
interface Message {}
// #bubbling-example
public interface Protocol {
public interface Command {}
public static class Fail implements Message {
public final String text;
public static class Fail implements Command {
public final String text;
public Fail(String text) {
this.text = text;
public Fail(String text) {
this.text = text;
}
}
public static class Hello implements Command {
public final String text;
public final ActorRef<String> replyTo;
public Hello(String text, ActorRef<String> replyTo) {
this.text = text;
this.replyTo = replyTo;
}
}
}
public static Behavior<Message> failingChildBehavior =
Behaviors.receive(Message.class)
.onMessage(
Fail.class,
message -> {
throw new RuntimeException(message.text);
})
public static class Worker extends AbstractBehavior<Protocol.Command> {
public static Behavior<Protocol.Command> create() {
return Behaviors.setup(context -> new Worker());
}
@Override
public Receive<Protocol.Command> createReceive() {
return newReceiveBuilder()
.onMessage(Protocol.Fail.class, this::onFail)
.onMessage(Protocol.Hello.class, this::onHello)
.build();
}
public static Behavior<Message> middleManagementBehavior =
Behaviors.setup(
(context) -> {
context.getLog().info("Middle management starting up");
final ActorRef<Message> child = context.spawn(failingChildBehavior, "child");
// we want to know when the child terminates, but since we do not handle
// the Terminated signal, we will in turn fail on child termination
context.watch(child);
private Behavior<Protocol.Command> onFail(Protocol.Fail message) {
throw new RuntimeException(message.text);
}
// here we don't handle Terminated at all which means that
// when the child fails or stops gracefully this actor will
// fail with a DeathWatchException
return Behaviors.receive(Message.class)
.onMessage(
Message.class,
message -> {
// just pass messages on to the child
child.tell(message);
return Behaviors.same();
})
.build();
});
private Behavior<Protocol.Command> onHello(Protocol.Hello message) {
message.replyTo.tell(message.text);
return this;
}
}
public static Behavior<Message> bossBehavior =
Behaviors.setup(
(context) -> {
context.getLog().info("Boss starting up");
final ActorRef<Message> middleManagement =
context.spawn(middleManagementBehavior, "middle-management");
context.watch(middleManagement);
public static class MiddleManagement extends AbstractBehavior<Protocol.Command> {
// here we don't handle Terminated at all which means that
// when middle management fails with a DeathWatchException
// this actor will also fail
return Behaviors.receive(Message.class)
.onMessage(
Message.class,
message -> {
// just pass messages on to the child
middleManagement.tell(message);
return Behaviors.same();
})
.build();
});
public static Behavior<Protocol.Command> create() {
return Behaviors.setup(MiddleManagement::new);
}
private final ActorContext<Protocol.Command> context;
private final ActorRef<Protocol.Command> child;
private MiddleManagement(ActorContext<Protocol.Command> context) {
this.context = context;
context.getLog().info("Middle management starting up");
// default supervision of child, meaning that it will stop on failure
child = context.spawn(Worker.create(), "child");
// we want to know when the child terminates, but since we do not handle
// the Terminated signal, we will in turn fail on child termination
context.watch(child);
}
@Override
public Receive<Protocol.Command> createReceive() {
// here we don't handle Terminated at all which means that
// when the child fails or stops gracefully this actor will
// fail with a DeathWatchException
return newReceiveBuilder().onMessage(Protocol.Command.class, this::onCommand).build();
}
private Behavior<Protocol.Command> onCommand(Protocol.Command message) {
// just pass messages on to the child
child.tell(message);
return this;
}
}
public static class Boss extends AbstractBehavior<Protocol.Command> {
public static Behavior<Protocol.Command> create() {
return Behaviors.supervise(Behaviors.setup(Boss::new))
.onFailure(DeathPactException.class, SupervisorStrategy.restart());
}
private final ActorContext<Protocol.Command> context;
private final ActorRef<Protocol.Command> middleManagement;
private Boss(ActorContext<Protocol.Command> context) {
this.context = context;
context.getLog().info("Boss starting up");
// default supervision of child, meaning that it will stop on failure
middleManagement = context.spawn(MiddleManagement.create(), "middle-management");
context.watch(middleManagement);
}
@Override
public Receive<Protocol.Command> createReceive() {
// here we don't handle Terminated at all which means that
// when middle management fails with a DeathWatchException
// this actor will also fail
return newReceiveBuilder().onMessage(Protocol.Command.class, this::onCommand).build();
}
private Behavior<Protocol.Command> onCommand(Protocol.Command message) {
// just pass messages on to the child
middleManagement.tell(message);
return this;
}
}
// #bubbling-example
public static void main(String[] args) {
final ActorSystem<Message> system = ActorSystem.create(bossBehavior, "boss");
final ActorSystem<Protocol.Command> system = ActorSystem.create(Boss.create(), "boss");
system.tell(new Fail("boom"));
// this will now bubble up all the way to the boss and as that is the user guardian it means
// the entire actor system will stop
system.tell(new Protocol.Fail("boom"));
// this will now bubble up all the way to the boss, which will be restarted
}
}
// #bubbling-example

View file

@ -4,36 +4,39 @@
package jdocs.akka.typed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.internal.adapter.ActorSystemAdapter;
import akka.testkit.javadsl.EventFilter;
import com.typesafe.config.ConfigFactory;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import java.util.concurrent.TimeUnit;
import java.time.Duration;
import static jdocs.akka.typed.BubblingSample.Boss;
import static jdocs.akka.typed.BubblingSample.Protocol;
public class BubblingSampleTest extends JUnitSuite {
@ClassRule
public static final TestKitJunitResource testKit =
new TestKitJunitResource("akka.loglevel = off");
@Test
public void testBubblingSample() throws Exception {
ActorRef<Protocol.Command> boss = testKit.spawn(Boss.create(), "upper-management");
TestProbe<String> replyProbe = testKit.createTestProbe(String.class);
boss.tell(new Protocol.Hello("hi 1", replyProbe.getRef()));
replyProbe.expectMessage("hi 1");
boss.tell(new Protocol.Fail("ping"));
final ActorSystem<BubblingSample.Message> system =
ActorSystem.create(
BubblingSample.bossBehavior,
"boss",
ConfigFactory.parseString(
"akka.loggers = [ akka.testkit.TestEventListener ]\n" + "akka.loglevel=warning"));
// actual exception and then the deathpacts
new EventFilter(Exception.class, ActorSystemAdapter.toClassic(system))
.occurrences(4)
.intercept(
() -> {
system.tell(new BubblingSample.Fail("boom"));
return null;
});
system.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS);
// message may be lost when MiddleManagement is stopped, but eventually it will be functional
// again
replyProbe.awaitAssert(
() -> {
boss.tell(new Protocol.Hello("hi 2", replyProbe.getRef()));
replyProbe.expectMessage(Duration.ofMillis(200), "hi 2");
return null;
});
}
}

View file

@ -14,36 +14,48 @@ import java.util.concurrent.TimeUnit;
public class SupervisionCompileOnlyTest {
// #wrap
interface CounterMessage {}
public static class Counter {
public interface Command {}
public static final class Increase implements CounterMessage {}
public static final class Increase implements Command {}
public static final class Get implements CounterMessage {
final ActorRef<Got> sender;
public static final class Get implements Command {
public final ActorRef<Got> replyTo;
public Get(ActorRef<Got> sender) {
this.sender = sender;
public Get(ActorRef<Got> replyTo) {
this.replyTo = replyTo;
}
}
}
public static final class Got {
final int n;
public static final class Got {
public final int n;
public Got(int n) {
this.n = n;
public Got(int n) {
this.n = n;
}
}
}
public static Behavior<CounterMessage> counter(int currentValue) {
return Behaviors.receive(CounterMessage.class)
.onMessage(Increase.class, o -> counter(currentValue + 1))
.onMessage(
Get.class,
o -> {
o.sender.tell(new Got(currentValue));
return Behaviors.same();
})
.build();
// #top-level
public static Behavior<Command> create() {
return Behaviors.supervise(counter(1)).onFailure(SupervisorStrategy.restart());
}
// #top-level
private static Behavior<Command> counter(int currentValue) {
return Behaviors.receive(Command.class)
.onMessage(Increase.class, o -> onIncrease(currentValue))
.onMessage(Get.class, command -> onGet(currentValue, command))
.build();
}
private static Behavior<Command> onIncrease(int currentValue) {
return counter(currentValue + 1);
}
private static Behavior<Command> onGet(int currentValue, Get command) {
command.replyTo.tell(new Got(currentValue));
return Behaviors.same();
}
}
// #wrap
@ -74,10 +86,6 @@ public class SupervisionCompileOnlyTest {
.onFailure(IllegalArgumentException.class, SupervisorStrategy.stop());
// #multiple
// #top-level
Behaviors.supervise(counter(1));
// #top-level
}
// #restart-stop-children

View file

@ -4,69 +4,101 @@
package docs.akka.typed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ DeathPactException, SupervisorStrategy }
import scala.concurrent.duration._
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.WordSpecLike
import com.github.ghik.silencer.silent
import org.scalatest.WordSpecLike
object FaultToleranceDocSpec {
// #bubbling-example
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.DeathPactException
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors
object Protocol {
sealed trait Command
case class Fail(text: String) extends Command
case class Hello(text: String, replyTo: ActorRef[String]) extends Command
}
import Protocol._
object Worker {
def apply(): Behavior[Command] =
Behaviors.receiveMessage {
case Fail(text) =>
throw new RuntimeException(text)
case Hello(text, replyTo) =>
replyTo ! text
Behaviors.same
}
}
object MiddleManagement {
def apply(): Behavior[Command] =
Behaviors.setup[Command] { context =>
context.log.info("Middle management starting up")
// default supervision of child, meaning that it will stop on failure
val child = context.spawn(Worker(), "child")
// we want to know when the child terminates, but since we do not handle
// the Terminated signal, we will in turn fail on child termination
context.watch(child)
// here we don't handle Terminated at all which means that
// when the child fails or stops gracefully this actor will
// fail with a DeathWatchException
Behaviors.receiveMessage { message =>
child ! message
Behaviors.same
}
}
}
object Boss {
def apply(): Behavior[Command] =
Behaviors
.supervise(Behaviors.setup[Command] { context =>
context.log.info("Boss starting up")
// default supervision of child, meaning that it will stop on failure
val middleManagement = context.spawn(MiddleManagement(), "middle-management")
context.watch(middleManagement)
// here we don't handle Terminated at all which means that
// when middle management fails with a DeathWatchException
// this actor will also fail
Behaviors.receiveMessage[Command] { message =>
middleManagement ! message
Behaviors.same
}
})
.onFailure[DeathPactException](SupervisorStrategy.restart)
}
// #bubbling-example
}
@silent("never used")
class FaultToleranceDocSpec extends ScalaTestWithActorTestKit("""
# silenced to not put noise in test logs
akka.loglevel = off
""") with WordSpecLike {
import FaultToleranceDocSpec._
"Bubbling of failures" must {
"have an example for the docs" in {
val boss = spawn(Boss(), "upper-management")
val replyProbe = createTestProbe[String]()
boss ! Protocol.Hello("hi 1", replyProbe.ref)
replyProbe.expectMessage("hi 1")
boss ! Protocol.Fail("ping")
// FIXME I think we could have much better examples of this but I'm stuck so this will have to do for now
// #bubbling-example
sealed trait Message
case class Fail(text: String) extends Message
val worker = Behaviors.receive[Message] { (context, message) =>
message match {
case Fail(text) => throw new RuntimeException(text)
}
// message may be lost when MiddleManagement is stopped, but eventually it will be functional again
eventually {
boss ! Protocol.Hello("hi 2", replyProbe.ref)
replyProbe.expectMessage(200.millis, "hi 2")
}
val middleManagementBehavior = Behaviors.setup[Message] { context =>
context.log.info("Middle management starting up")
val child = context.spawn(worker, "child")
context.watch(child)
// here we don't handle Terminated at all which means that
// when the child fails or stops gracefully this actor will
// fail with a DeathWatchException
Behaviors.receive[Message] { (context, message) =>
child ! message
Behaviors.same
}
}
val bossBehavior = Behaviors
.supervise(Behaviors.setup[Message] { context =>
context.log.info("Boss starting up")
val middleManagement = context.spawn(middleManagementBehavior, "middle-management")
context.watch(middleManagement)
// here we don't handle Terminated at all which means that
// when middle management fails with a DeathWatchException
// this actor will also fail
Behaviors.receiveMessage[Message] { message =>
middleManagement ! message
Behaviors.same
}
})
.onFailure[DeathPactException](SupervisorStrategy.restart)
// (spawn comes from the testkit)
val boss = spawn(bossBehavior, "upper-management")
boss ! Fail("ping")
// #bubbling-example
}
}

View file

@ -35,23 +35,27 @@ object SupervisionCompileOnly {
//#multiple
//#wrap
sealed trait Command
case class Increment(nr: Int) extends Command
case class GetCount(replyTo: ActorRef[Int]) extends Command
object Counter {
sealed trait Command
case class Increment(nr: Int) extends Command
case class GetCount(replyTo: ActorRef[Int]) extends Command
def counter(count: Int): Behavior[Command] = Behaviors.receiveMessage[Command] {
case Increment(nr: Int) =>
counter(count + nr)
case GetCount(replyTo) =>
replyTo ! count
Behaviors.same
//#top-level
def apply(): Behavior[Command] =
Behaviors.supervise(counter(1)).onFailure(SupervisorStrategy.restart)
//#top-level
private def counter(count: Int): Behavior[Command] =
Behaviors.receiveMessage[Command] {
case Increment(nr: Int) =>
counter(count + nr)
case GetCount(replyTo) =>
replyTo ! count
Behaviors.same
}
}
//#wrap
//#top-level
Behaviors.supervise(counter(1))
//#top-level
//#restart-stop-children
def child(size: Long): Behavior[String] =
Behaviors.receiveMessage(msg => child(size + msg.length))

View file

@ -61,11 +61,22 @@ Scala
Java
: @@snip [SupervisionCompileOnlyTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java) { #multiple }
For a full list of strategies see the public methods on `SupervisorStrategy`
For a full list of strategies see the public methods on @apidoc[akka.actor.typed.SupervisorStrategy].
@@@ note
When the behavior is restarted the original `Behavior` that was given to `Behaviors.supervise` is re-installed,
which means that if it contains mutable state it must be a factory via `Behaviors.setup`. When using the
object-oriented style with a class extending `AbstractBehavior` it's always recommended to create it via
`Behaviors.setup` as described in @ref:[Behavior factory method](style-guide.md#behavior-factory-method).
For the function style there is typically no need for the factory if the state is captured in immutable
parameters.
@@@
### Wrapping behaviors
It is very common to store state by changing behavior e.g.
With the @ref:[functional style](style-guide.md#functional-versus-object-oriented-style) it is very common
to store state by changing behavior e.g.
Scala
: @@snip [SupervisionCompileOnly.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnly.scala) { #wrap }