diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/BubblingSample.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/BubblingSample.java index b8a99b03b2..9ebd011268 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/BubblingSample.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/BubblingSample.java @@ -4,83 +4,143 @@ package jdocs.akka.typed; -import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; -import akka.actor.typed.Behavior; -import akka.actor.typed.javadsl.Behaviors; // #bubbling-example +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.DeathPactException; +import akka.actor.typed.SupervisorStrategy; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +// #bubbling-example + public class BubblingSample { - interface Message {} + // #bubbling-example + public interface Protocol { + public interface Command {} - public static class Fail implements Message { - public final String text; + public static class Fail implements Command { + public final String text; - public Fail(String text) { - this.text = text; + public Fail(String text) { + this.text = text; + } + } + + public static class Hello implements Command { + public final String text; + public final ActorRef replyTo; + + public Hello(String text, ActorRef replyTo) { + this.text = text; + this.replyTo = replyTo; + } } } - public static Behavior failingChildBehavior = - Behaviors.receive(Message.class) - .onMessage( - Fail.class, - message -> { - throw new RuntimeException(message.text); - }) + public static class Worker extends AbstractBehavior { + + public static Behavior create() { + return Behaviors.setup(context -> new Worker()); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(Protocol.Fail.class, this::onFail) + .onMessage(Protocol.Hello.class, this::onHello) .build(); + } - public static Behavior middleManagementBehavior = - Behaviors.setup( - (context) -> { - context.getLog().info("Middle management starting up"); - final ActorRef child = context.spawn(failingChildBehavior, "child"); - // we want to know when the child terminates, but since we do not handle - // the Terminated signal, we will in turn fail on child termination - context.watch(child); + private Behavior onFail(Protocol.Fail message) { + throw new RuntimeException(message.text); + } - // here we don't handle Terminated at all which means that - // when the child fails or stops gracefully this actor will - // fail with a DeathWatchException - return Behaviors.receive(Message.class) - .onMessage( - Message.class, - message -> { - // just pass messages on to the child - child.tell(message); - return Behaviors.same(); - }) - .build(); - }); + private Behavior onHello(Protocol.Hello message) { + message.replyTo.tell(message.text); + return this; + } + } - public static Behavior bossBehavior = - Behaviors.setup( - (context) -> { - context.getLog().info("Boss starting up"); - final ActorRef middleManagement = - context.spawn(middleManagementBehavior, "middle-management"); - context.watch(middleManagement); + public static class MiddleManagement extends AbstractBehavior { - // here we don't handle Terminated at all which means that - // when middle management fails with a DeathWatchException - // this actor will also fail - return Behaviors.receive(Message.class) - .onMessage( - Message.class, - message -> { - // just pass messages on to the child - middleManagement.tell(message); - return Behaviors.same(); - }) - .build(); - }); + public static Behavior create() { + return Behaviors.setup(MiddleManagement::new); + } + + private final ActorContext context; + private final ActorRef child; + + private MiddleManagement(ActorContext context) { + this.context = context; + + context.getLog().info("Middle management starting up"); + // default supervision of child, meaning that it will stop on failure + child = context.spawn(Worker.create(), "child"); + + // we want to know when the child terminates, but since we do not handle + // the Terminated signal, we will in turn fail on child termination + context.watch(child); + } + + @Override + public Receive createReceive() { + // here we don't handle Terminated at all which means that + // when the child fails or stops gracefully this actor will + // fail with a DeathWatchException + return newReceiveBuilder().onMessage(Protocol.Command.class, this::onCommand).build(); + } + + private Behavior onCommand(Protocol.Command message) { + // just pass messages on to the child + child.tell(message); + return this; + } + } + + public static class Boss extends AbstractBehavior { + + public static Behavior create() { + return Behaviors.supervise(Behaviors.setup(Boss::new)) + .onFailure(DeathPactException.class, SupervisorStrategy.restart()); + } + + private final ActorContext context; + private final ActorRef middleManagement; + + private Boss(ActorContext context) { + this.context = context; + context.getLog().info("Boss starting up"); + // default supervision of child, meaning that it will stop on failure + middleManagement = context.spawn(MiddleManagement.create(), "middle-management"); + context.watch(middleManagement); + } + + @Override + public Receive createReceive() { + // here we don't handle Terminated at all which means that + // when middle management fails with a DeathWatchException + // this actor will also fail + return newReceiveBuilder().onMessage(Protocol.Command.class, this::onCommand).build(); + } + + private Behavior onCommand(Protocol.Command message) { + // just pass messages on to the child + middleManagement.tell(message); + return this; + } + } + + // #bubbling-example public static void main(String[] args) { - final ActorSystem system = ActorSystem.create(bossBehavior, "boss"); + final ActorSystem system = ActorSystem.create(Boss.create(), "boss"); - system.tell(new Fail("boom")); - // this will now bubble up all the way to the boss and as that is the user guardian it means - // the entire actor system will stop + system.tell(new Protocol.Fail("boom")); + // this will now bubble up all the way to the boss, which will be restarted } } -// #bubbling-example diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/BubblingSampleTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/BubblingSampleTest.java index c8fdf87960..daf4efc255 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/BubblingSampleTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/BubblingSampleTest.java @@ -4,36 +4,39 @@ package jdocs.akka.typed; -import akka.actor.typed.ActorSystem; -import akka.actor.typed.internal.adapter.ActorSystemAdapter; -import akka.testkit.javadsl.EventFilter; -import com.typesafe.config.ConfigFactory; +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.actor.typed.ActorRef; +import org.junit.ClassRule; import org.junit.Test; import org.scalatest.junit.JUnitSuite; -import java.util.concurrent.TimeUnit; +import java.time.Duration; + +import static jdocs.akka.typed.BubblingSample.Boss; +import static jdocs.akka.typed.BubblingSample.Protocol; public class BubblingSampleTest extends JUnitSuite { + @ClassRule + public static final TestKitJunitResource testKit = + new TestKitJunitResource("akka.loglevel = off"); + @Test public void testBubblingSample() throws Exception { + ActorRef boss = testKit.spawn(Boss.create(), "upper-management"); + TestProbe replyProbe = testKit.createTestProbe(String.class); + boss.tell(new Protocol.Hello("hi 1", replyProbe.getRef())); + replyProbe.expectMessage("hi 1"); + boss.tell(new Protocol.Fail("ping")); - final ActorSystem system = - ActorSystem.create( - BubblingSample.bossBehavior, - "boss", - ConfigFactory.parseString( - "akka.loggers = [ akka.testkit.TestEventListener ]\n" + "akka.loglevel=warning")); - - // actual exception and then the deathpacts - new EventFilter(Exception.class, ActorSystemAdapter.toClassic(system)) - .occurrences(4) - .intercept( - () -> { - system.tell(new BubblingSample.Fail("boom")); - return null; - }); - - system.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS); + // message may be lost when MiddleManagement is stopped, but eventually it will be functional + // again + replyProbe.awaitAssert( + () -> { + boss.tell(new Protocol.Hello("hi 2", replyProbe.getRef())); + replyProbe.expectMessage(Duration.ofMillis(200), "hi 2"); + return null; + }); } } diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java index 151a787962..56c7d528ca 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java @@ -14,36 +14,48 @@ import java.util.concurrent.TimeUnit; public class SupervisionCompileOnlyTest { // #wrap - interface CounterMessage {} + public static class Counter { + public interface Command {} - public static final class Increase implements CounterMessage {} + public static final class Increase implements Command {} - public static final class Get implements CounterMessage { - final ActorRef sender; + public static final class Get implements Command { + public final ActorRef replyTo; - public Get(ActorRef sender) { - this.sender = sender; + public Get(ActorRef replyTo) { + this.replyTo = replyTo; + } } - } - public static final class Got { - final int n; + public static final class Got { + public final int n; - public Got(int n) { - this.n = n; + public Got(int n) { + this.n = n; + } } - } - public static Behavior counter(int currentValue) { - return Behaviors.receive(CounterMessage.class) - .onMessage(Increase.class, o -> counter(currentValue + 1)) - .onMessage( - Get.class, - o -> { - o.sender.tell(new Got(currentValue)); - return Behaviors.same(); - }) - .build(); + // #top-level + public static Behavior create() { + return Behaviors.supervise(counter(1)).onFailure(SupervisorStrategy.restart()); + } + // #top-level + + private static Behavior counter(int currentValue) { + return Behaviors.receive(Command.class) + .onMessage(Increase.class, o -> onIncrease(currentValue)) + .onMessage(Get.class, command -> onGet(currentValue, command)) + .build(); + } + + private static Behavior onIncrease(int currentValue) { + return counter(currentValue + 1); + } + + private static Behavior onGet(int currentValue, Get command) { + command.replyTo.tell(new Got(currentValue)); + return Behaviors.same(); + } } // #wrap @@ -74,10 +86,6 @@ public class SupervisionCompileOnlyTest { .onFailure(IllegalArgumentException.class, SupervisorStrategy.stop()); // #multiple - // #top-level - Behaviors.supervise(counter(1)); - // #top-level - } // #restart-stop-children diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/FaultToleranceDocSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/FaultToleranceDocSpec.scala index a6188db91e..0ea5431833 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/FaultToleranceDocSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/FaultToleranceDocSpec.scala @@ -4,69 +4,101 @@ package docs.akka.typed -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.{ DeathPactException, SupervisorStrategy } +import scala.concurrent.duration._ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit -import org.scalatest.WordSpecLike import com.github.ghik.silencer.silent +import org.scalatest.WordSpecLike + +object FaultToleranceDocSpec { + // #bubbling-example + import akka.actor.typed.ActorRef + import akka.actor.typed.Behavior + import akka.actor.typed.DeathPactException + import akka.actor.typed.SupervisorStrategy + import akka.actor.typed.scaladsl.Behaviors + + object Protocol { + sealed trait Command + case class Fail(text: String) extends Command + case class Hello(text: String, replyTo: ActorRef[String]) extends Command + } + import Protocol._ + + object Worker { + def apply(): Behavior[Command] = + Behaviors.receiveMessage { + case Fail(text) => + throw new RuntimeException(text) + case Hello(text, replyTo) => + replyTo ! text + Behaviors.same + } + } + + object MiddleManagement { + def apply(): Behavior[Command] = + Behaviors.setup[Command] { context => + context.log.info("Middle management starting up") + // default supervision of child, meaning that it will stop on failure + val child = context.spawn(Worker(), "child") + // we want to know when the child terminates, but since we do not handle + // the Terminated signal, we will in turn fail on child termination + context.watch(child) + + // here we don't handle Terminated at all which means that + // when the child fails or stops gracefully this actor will + // fail with a DeathWatchException + Behaviors.receiveMessage { message => + child ! message + Behaviors.same + } + } + } + + object Boss { + def apply(): Behavior[Command] = + Behaviors + .supervise(Behaviors.setup[Command] { context => + context.log.info("Boss starting up") + // default supervision of child, meaning that it will stop on failure + val middleManagement = context.spawn(MiddleManagement(), "middle-management") + context.watch(middleManagement) + + // here we don't handle Terminated at all which means that + // when middle management fails with a DeathWatchException + // this actor will also fail + Behaviors.receiveMessage[Command] { message => + middleManagement ! message + Behaviors.same + } + }) + .onFailure[DeathPactException](SupervisorStrategy.restart) + } + // #bubbling-example +} @silent("never used") class FaultToleranceDocSpec extends ScalaTestWithActorTestKit(""" # silenced to not put noise in test logs akka.loglevel = off """) with WordSpecLike { + import FaultToleranceDocSpec._ "Bubbling of failures" must { "have an example for the docs" in { + val boss = spawn(Boss(), "upper-management") + val replyProbe = createTestProbe[String]() + boss ! Protocol.Hello("hi 1", replyProbe.ref) + replyProbe.expectMessage("hi 1") + boss ! Protocol.Fail("ping") - // FIXME I think we could have much better examples of this but I'm stuck so this will have to do for now - - // #bubbling-example - sealed trait Message - case class Fail(text: String) extends Message - - val worker = Behaviors.receive[Message] { (context, message) => - message match { - case Fail(text) => throw new RuntimeException(text) - } + // message may be lost when MiddleManagement is stopped, but eventually it will be functional again + eventually { + boss ! Protocol.Hello("hi 2", replyProbe.ref) + replyProbe.expectMessage(200.millis, "hi 2") } - val middleManagementBehavior = Behaviors.setup[Message] { context => - context.log.info("Middle management starting up") - val child = context.spawn(worker, "child") - context.watch(child) - - // here we don't handle Terminated at all which means that - // when the child fails or stops gracefully this actor will - // fail with a DeathWatchException - Behaviors.receive[Message] { (context, message) => - child ! message - Behaviors.same - } - } - - val bossBehavior = Behaviors - .supervise(Behaviors.setup[Message] { context => - context.log.info("Boss starting up") - val middleManagement = context.spawn(middleManagementBehavior, "middle-management") - context.watch(middleManagement) - - // here we don't handle Terminated at all which means that - // when middle management fails with a DeathWatchException - // this actor will also fail - Behaviors.receiveMessage[Message] { message => - middleManagement ! message - Behaviors.same - } - }) - .onFailure[DeathPactException](SupervisorStrategy.restart) - - // (spawn comes from the testkit) - val boss = spawn(bossBehavior, "upper-management") - boss ! Fail("ping") - // #bubbling-example - } } diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnly.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnly.scala index aa2b45d918..a3beb1a131 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnly.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnly.scala @@ -35,23 +35,27 @@ object SupervisionCompileOnly { //#multiple //#wrap - sealed trait Command - case class Increment(nr: Int) extends Command - case class GetCount(replyTo: ActorRef[Int]) extends Command + object Counter { + sealed trait Command + case class Increment(nr: Int) extends Command + case class GetCount(replyTo: ActorRef[Int]) extends Command - def counter(count: Int): Behavior[Command] = Behaviors.receiveMessage[Command] { - case Increment(nr: Int) => - counter(count + nr) - case GetCount(replyTo) => - replyTo ! count - Behaviors.same + //#top-level + def apply(): Behavior[Command] = + Behaviors.supervise(counter(1)).onFailure(SupervisorStrategy.restart) + //#top-level + + private def counter(count: Int): Behavior[Command] = + Behaviors.receiveMessage[Command] { + case Increment(nr: Int) => + counter(count + nr) + case GetCount(replyTo) => + replyTo ! count + Behaviors.same + } } //#wrap - //#top-level - Behaviors.supervise(counter(1)) - //#top-level - //#restart-stop-children def child(size: Long): Behavior[String] = Behaviors.receiveMessage(msg => child(size + msg.length)) diff --git a/akka-docs/src/main/paradox/typed/fault-tolerance.md b/akka-docs/src/main/paradox/typed/fault-tolerance.md index 9284d37bc7..5fbe3e64c4 100644 --- a/akka-docs/src/main/paradox/typed/fault-tolerance.md +++ b/akka-docs/src/main/paradox/typed/fault-tolerance.md @@ -61,11 +61,22 @@ Scala Java : @@snip [SupervisionCompileOnlyTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java) { #multiple } -For a full list of strategies see the public methods on `SupervisorStrategy` +For a full list of strategies see the public methods on @apidoc[akka.actor.typed.SupervisorStrategy]. + +@@@ note + +When the behavior is restarted the original `Behavior` that was given to `Behaviors.supervise` is re-installed, +which means that if it contains mutable state it must be a factory via `Behaviors.setup`. When using the +object-oriented style with a class extending `AbstractBehavior` it's always recommended to create it via +`Behaviors.setup` as described in @ref:[Behavior factory method](style-guide.md#behavior-factory-method). +For the function style there is typically no need for the factory if the state is captured in immutable +parameters. +@@@ ### Wrapping behaviors -It is very common to store state by changing behavior e.g. +With the @ref:[functional style](style-guide.md#functional-versus-object-oriented-style) it is very common +to store state by changing behavior e.g. Scala : @@snip [SupervisionCompileOnly.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnly.scala) { #wrap }