From b4fa591d64b580882e4b9ef7522057147ec77140 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 17 Jan 2019 16:48:22 +0100 Subject: [PATCH] Support stop or keep of child actors when parent is restarted, #25556 * stash messages and signals while waiting for children to be stopped * handle watch of other actors * exception from unstash * exception from first setup * merge RestartSupervisor and BackoffSupervisor * API change: restartWithLimit => restart.withLimit * remove unused PreStart * docs * move BubblingSample to separate class * fix: fail after more than limit in restart.withLimit when deferred factory throws * match case RestartOrBackoff instead --- .../actor/testkit/typed/TestException.scala | 2 +- .../actor/testkit/typed/TestKitSettings.scala | 5 - .../actor/typed/javadsl/ActorCompile.java | 2 +- .../java/jdocs/akka/typed/BubblingSample.java | 86 ++++ .../jdocs/akka/typed/BubblingSampleTest.java | 39 ++ .../akka/typed/FaultToleranceDocTest.java | 118 ------ .../SupervisionCompileOnlyTest.java | 51 ++- .../akka/actor/typed/SupervisionSpec.scala | 375 +++++++++++++---- .../akka/actor/typed/scaladsl/StopSpec.scala | 5 +- .../typed/scaladsl/adapter/AdapterSpec.scala | 14 +- .../supervision/SupervisionCompileOnly.scala | 47 ++- .../src/main/resources/reference.conf | 6 + .../scala/akka/actor/typed/ActorSystem.scala | 48 ++- .../akka/actor/typed/MessageAndSignals.scala | 21 +- .../akka/actor/typed/SupervisorStrategy.scala | 164 ++++++-- .../actor/typed/internal/Supervision.scala | 380 ++++++++++-------- .../src/main/paradox/typed/fault-tolerance.md | 40 +- .../javadsl/EventSourcedActorFailureTest.java | 4 +- .../EventSourcedBehaviorFailureSpec.scala | 16 +- .../typed/scaladsl/PerformanceSpec.scala | 4 +- 20 files changed, 976 insertions(+), 451 deletions(-) create mode 100644 akka-actor-typed-tests/src/test/java/jdocs/akka/typed/BubblingSample.java create mode 100644 akka-actor-typed-tests/src/test/java/jdocs/akka/typed/BubblingSampleTest.java delete mode 100644 akka-actor-typed-tests/src/test/java/jdocs/akka/typed/FaultToleranceDocTest.java diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestException.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestException.scala index 1da6218de0..d4c47f8515 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestException.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestException.scala @@ -9,5 +9,5 @@ import scala.util.control.NoStackTrace /** * A predefined exception that can be used in tests. It doesn't include a stack trace. */ -class TestException(message: String) extends RuntimeException(message) with NoStackTrace +final case class TestException(message: String) extends RuntimeException(message) with NoStackTrace diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestKitSettings.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestKitSettings.scala index 920d765ae3..84d4b2d8e4 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestKitSettings.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestKitSettings.scala @@ -13,11 +13,6 @@ import akka.actor.typed.ActorSystem import scala.util.control.NoStackTrace -/** - * Exception without stack trace to use for verifying exceptions in tests - */ -final case class TE(message: String) extends RuntimeException(message) with NoStackTrace - object TestKitSettings { /** * Reads configuration settings from `akka.actor.testkit.typed` section. diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java index 4485ff04c9..bf230d0e7b 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java @@ -115,7 +115,7 @@ public class ActorCompile { SupervisorStrategy strategy1 = SupervisorStrategy.restart(); SupervisorStrategy strategy2 = SupervisorStrategy.restart().withLoggingEnabled(false); SupervisorStrategy strategy3 = SupervisorStrategy.resume(); - SupervisorStrategy strategy4 = SupervisorStrategy.restartWithLimit(3, Duration.ofSeconds(1)); + SupervisorStrategy strategy4 = SupervisorStrategy.restart().withLimit(3, Duration.ofSeconds(1)); SupervisorStrategy strategy5 = SupervisorStrategy.restartWithBackoff(Duration.ofMillis(200), Duration.ofSeconds(10), 0.1); diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/BubblingSample.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/BubblingSample.java new file mode 100644 index 0000000000..d2b2200251 --- /dev/null +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/BubblingSample.java @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2018-2019 Lightbend Inc. + */ + +package jdocs.akka.typed; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.Behaviors; + +// #bubbling-example +public class BubblingSample { + interface Message {} + + public static class Fail implements Message { + public final String text; + + public Fail(String text) { + this.text = text; + } + } + + public static Behavior failingChildBehavior = + Behaviors.receive(Message.class) + .onMessage( + Fail.class, + (context, message) -> { + throw new RuntimeException(message.text); + }) + .build(); + + public static Behavior middleManagementBehavior = + Behaviors.setup( + (context) -> { + context.getLog().info("Middle management starting up"); + final ActorRef child = context.spawn(failingChildBehavior, "child"); + // we want to know when the child terminates, but since we do not handle + // the Terminated signal, we will in turn fail on child termination + context.watch(child); + + // here we don't handle Terminated at all which means that + // when the child fails or stops gracefully this actor will + // fail with a DeathWatchException + return Behaviors.receive(Message.class) + .onMessage( + Message.class, + (innerCtx, message) -> { + // just pass messages on to the child + child.tell(message); + return Behaviors.same(); + }) + .build(); + }); + + public static Behavior bossBehavior = + Behaviors.setup( + (context) -> { + context.getLog().info("Boss starting up"); + final ActorRef middleManagement = + context.spawn(middleManagementBehavior, "middle-management"); + context.watch(middleManagement); + + // here we don't handle Terminated at all which means that + // when middle management fails with a DeathWatchException + // this actor will also fail + return Behaviors.receive(Message.class) + .onMessage( + Message.class, + (innerCtx, message) -> { + // just pass messages on to the child + middleManagement.tell(message); + return Behaviors.same(); + }) + .build(); + }); + + public static void main(String[] args) { + final ActorSystem system = ActorSystem.create(bossBehavior, "boss"); + + system.tell(new Fail("boom")); + // this will now bubble up all the way to the boss and as that is the user guardian it means + // the entire actor system will stop + } +} +// #bubbling-example diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/BubblingSampleTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/BubblingSampleTest.java new file mode 100644 index 0000000000..c485d177ec --- /dev/null +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/BubblingSampleTest.java @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package jdocs.akka.typed; + +import akka.actor.typed.ActorSystem; +import akka.actor.typed.internal.adapter.ActorSystemAdapter; +import akka.testkit.javadsl.EventFilter; +import com.typesafe.config.ConfigFactory; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; + +import java.util.concurrent.TimeUnit; + +public class BubblingSampleTest extends JUnitSuite { + + @Test + public void testBubblingSample() throws Exception { + + final ActorSystem system = + ActorSystem.create( + BubblingSample.bossBehavior, + "boss", + ConfigFactory.parseString( + "akka.loggers = [ akka.testkit.TestEventListener ]\n" + "akka.loglevel=warning")); + + // actual exception and then the deathpacts + new EventFilter(Exception.class, ActorSystemAdapter.toUntyped(system)) + .occurrences(4) + .intercept( + () -> { + system.tell(new BubblingSample.Fail("boom")); + return null; + }); + + system.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS); + } +} diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/FaultToleranceDocTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/FaultToleranceDocTest.java deleted file mode 100644 index de41bf5b61..0000000000 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/FaultToleranceDocTest.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package jdocs.akka.typed; - -import akka.actor.typed.ActorRef; -import akka.actor.typed.ActorSystem; -import akka.actor.typed.Behavior; -import akka.actor.typed.internal.adapter.ActorSystemAdapter; -import akka.actor.typed.javadsl.Behaviors; -import akka.testkit.javadsl.EventFilter; -import com.typesafe.config.ConfigFactory; -import org.junit.Test; -import org.scalatest.junit.JUnitSuite; - -public class FaultToleranceDocTest extends JUnitSuite { - // #bubbling-example - interface Message {} - - class Fail implements Message { - public final String text; - - Fail(String text) { - this.text = text; - } - } - - // #bubbling-example - - @Test - public void bubblingSample() { - // #bubbling-example - final Behavior failingChildBehavior = - Behaviors.receive(Message.class) - .onMessage( - Fail.class, - (context, message) -> { - throw new RuntimeException(message.text); - }) - .build(); - - Behavior middleManagementBehavior = - Behaviors.setup( - (context) -> { - context.getLog().info("Middle management starting up"); - final ActorRef child = context.spawn(failingChildBehavior, "child"); - // we want to know when the child terminates, but since we do not handle - // the Terminated signal, we will in turn fail on child termination - context.watch(child); - - // here we don't handle Terminated at all which means that - // when the child fails or stops gracefully this actor will - // fail with a DeathWatchException - return Behaviors.receive(Message.class) - .onMessage( - Message.class, - (innerCtx, message) -> { - // just pass messages on to the child - child.tell(message); - return Behaviors.same(); - }) - .build(); - }); - - Behavior bossBehavior = - Behaviors.setup( - (context) -> { - context.getLog().info("Boss starting up"); - final ActorRef middleManagement = - context.spawn(middleManagementBehavior, "middle-management"); - context.watch(middleManagement); - - // here we don't handle Terminated at all which means that - // when middle management fails with a DeathWatchException - // this actor will also fail - return Behaviors.receive(Message.class) - .onMessage( - Message.class, - (innerCtx, message) -> { - // just pass messages on to the child - middleManagement.tell(message); - return Behaviors.same(); - }) - .build(); - }); - - { - // #bubbling-example - final ActorSystem system = ActorSystem.create(bossBehavior, "boss"); - // #bubbling-example - } - final ActorSystem system = - ActorSystem.create( - bossBehavior, - "boss", - ConfigFactory.parseString( - "akka.loggers = [ akka.testkit.TestEventListener ]\n" + "akka.loglevel=warning")); - - // #bubbling-example - // actual exception and thent the deathpacts - new EventFilter(Exception.class, ActorSystemAdapter.toUntyped(system)) - .occurrences(4) - .intercept( - () -> { - // #bubbling-example - system.tell(new Fail("boom")); - // #bubbling-example - return null; - }); - // #bubbling-example - // this will now bubble up all the way to the boss and as that is the user guardian it means - // the entire actor system will stop - - // #bubbling-example - - } -} diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java index 1a0ec4fa4b..0cb01f2870 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java @@ -68,7 +68,7 @@ public class SupervisionCompileOnlyTest { Behaviors.supervise(behavior) .onFailure( IllegalStateException.class, - SupervisorStrategy.restartWithLimit(10, FiniteDuration.apply(10, TimeUnit.SECONDS))); + SupervisorStrategy.restart().withLimit(10, FiniteDuration.apply(10, TimeUnit.SECONDS))); // #restart-limit // #multiple @@ -81,5 +81,54 @@ public class SupervisionCompileOnlyTest { // #top-level Behaviors.supervise(counter(1)); // #top-level + } + + // #restart-stop-children + static Behavior child(long size) { + return Behaviors.receiveMessage(msg -> child(size + msg.length())); + } + + static Behavior parent() { + return Behaviors.supervise( + Behaviors.setup( + ctx -> { + final ActorRef child1 = ctx.spawn(child(0), "child1"); + final ActorRef child2 = ctx.spawn(child(0), "child2"); + + return Behaviors.receiveMessage( + msg -> { + // there might be bugs here... + String[] parts = msg.split(" "); + child1.tell(parts[0]); + child2.tell(parts[1]); + return Behaviors.same(); + }); + })) + .onFailure(SupervisorStrategy.restart()); + } + // #restart-stop-children + + // #restart-keep-children + static Behavior parent2() { + return Behaviors.setup( + ctx -> { + final ActorRef child1 = ctx.spawn(child(0), "child1"); + final ActorRef child2 = ctx.spawn(child(0), "child2"); + + // supervision strategy inside the setup to not recreate children on restart + return Behaviors.supervise( + Behaviors.receiveMessage( + msg -> { + // there might be bugs here... + String[] parts = msg.split(" "); + child1.tell(parts[0]); + child2.tell(parts[1]); + return Behaviors.same(); + })) + .onFailure(SupervisorStrategy.restart().withStopChildren(false)); + }); + } + // #restart-keep-children + } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index 59305e6be8..66c9be7480 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -5,33 +5,36 @@ package akka.actor.typed import java.io.IOException +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } import akka.actor.ActorInitializationException -import akka.actor.typed.scaladsl.{ Behaviors, AbstractBehavior } +import akka.actor.typed.scaladsl.{ AbstractBehavior, Behaviors } import akka.actor.typed.scaladsl.Behaviors._ import akka.testkit.EventFilter import akka.actor.testkit.typed.scaladsl._ import akka.actor.testkit.typed._ import org.scalatest.{ Matchers, WordSpec, WordSpecLike } - import scala.util.control.NoStackTrace import scala.concurrent.duration._ + import akka.actor.typed.SupervisorStrategy.Resume object SupervisionSpec { sealed trait Command - case object Ping extends Command - case class Throw(e: Throwable) extends Command + final case class Ping(n: Int) extends Command + final case class Throw(e: Throwable) extends Command case object IncrementState extends Command case object GetState extends Command - case class CreateChild[T](behavior: Behavior[T], name: String) extends Command + final case class CreateChild[T](behavior: Behavior[T], name: String) extends Command + final case class Watch(ref: ActorRef[_]) extends Command sealed trait Event - case object Pong extends Event - case class GotSignal(signal: Signal) extends Event - case class State(n: Int, children: Map[String, ActorRef[Command]]) extends Event + final case class Pong(n: Int) extends Event + final case class GotSignal(signal: Signal) extends Event + final case class State(n: Int, children: Map[String, ActorRef[Command]]) extends Event case object Started extends Event case object StartFailed extends Event @@ -39,14 +42,14 @@ object SupervisionSpec { class Exc2 extends Exc1("exc-2") class Exc3(message: String = "exc-3") extends RuntimeException(message) with NoStackTrace - def targetBehavior(monitor: ActorRef[Event], state: State = State(0, Map.empty)): Behavior[Command] = + def targetBehavior(monitor: ActorRef[Event], state: State = State(0, Map.empty), slowStop: Option[CountDownLatch] = None): Behavior[Command] = receive[Command] { (context, cmd) ⇒ cmd match { - case Ping ⇒ - monitor ! Pong + case Ping(n) ⇒ + monitor ! Pong(n) Behaviors.same case IncrementState ⇒ - targetBehavior(monitor, state.copy(n = state.n + 1)) + targetBehavior(monitor, state.copy(n = state.n + 1), slowStop) case GetState ⇒ val reply = state.copy(children = context.children.map(c ⇒ c.path.name → c.unsafeUpcast[Command]).toMap) monitor ! reply @@ -54,11 +57,16 @@ object SupervisionSpec { case CreateChild(childBehv, childName) ⇒ context.spawn(childBehv, childName) Behaviors.same + case Watch(ref) ⇒ + context.watch(ref) + Behaviors.same case Throw(e) ⇒ throw e } } receiveSignal { case (_, sig) ⇒ + if (sig == PostStop) + slowStop.foreach(latch ⇒ latch.await(10, TimeUnit.SECONDS)) monitor ! GotSignal(sig) Behaviors.same } @@ -68,7 +76,7 @@ object SupervisionSpec { throw new RuntimeException("simulated exc from constructor") with NoStackTrace override def onMessage(message: Command): Behavior[Command] = { - monitor ! Pong + monitor ! Pong(0) Behaviors.same } } @@ -83,8 +91,8 @@ class StubbedSupervisionSpec extends WordSpec with Matchers { val inbox = TestInbox[Event]("evt") val behv = supervise(targetBehavior(inbox.ref)).onFailure[Throwable](SupervisorStrategy.restart) val testkit = BehaviorTestKit(behv) - testkit.run(Ping) - inbox.receiveMessage() should ===(Pong) + testkit.run(Ping(1)) + inbox.receiveMessage() should ===(Pong(1)) } "stop when no supervise" in { @@ -178,7 +186,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers { "stop after restart retries limit" in { val inbox = TestInbox[Event]("evt") - val strategy = SupervisorStrategy.restartWithLimit(maxNrOfRetries = 2, withinTimeRange = 1.minute) + val strategy = SupervisorStrategy.restart.withLimit(maxNrOfRetries = 2, withinTimeRange = 1.minute) val behv = supervise(targetBehavior(inbox.ref)).onFailure[Exc1](strategy) val testkit = BehaviorTestKit(behv) testkit.run(Throw(new Exc1)) @@ -194,7 +202,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers { "reset retry limit after withinTimeRange" in { val inbox = TestInbox[Event]("evt") val withinTimeRange = 2.seconds - val strategy = SupervisorStrategy.restartWithLimit(maxNrOfRetries = 2, withinTimeRange) + val strategy = SupervisorStrategy.restart.withLimit(maxNrOfRetries = 2, withinTimeRange) val behv = supervise(targetBehavior(inbox.ref)).onFailure[Exc1](strategy) val testkit = BehaviorTestKit(behv) testkit.run(Throw(new Exc1)) @@ -215,7 +223,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers { "stop at first exception when restart retries limit is 0" in { val inbox = TestInbox[Event]("evt") - val strategy = SupervisorStrategy.restartWithLimit(maxNrOfRetries = 0, withinTimeRange = 1.minute) + val strategy = SupervisorStrategy.restart.withLimit(maxNrOfRetries = 0, withinTimeRange = 1.minute) val behv = supervise(targetBehavior(inbox.ref)) .onFailure[Exc1](strategy) val testkit = BehaviorTestKit(behv) @@ -241,6 +249,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers { class SupervisionSpec extends ScalaTestWithActorTestKit( """ akka.loggers = [akka.testkit.TestEventListener] + akka.log-dead-letters = off """) with WordSpecLike { import SupervisionSpec._ @@ -259,10 +268,10 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( class FailingConstructor(monitor: ActorRef[Event]) extends AbstractBehavior[Command] { monitor ! Started if (failCounter.getAndIncrement() < failCount) { - throw TE("simulated exc from constructor") + throw TestException("simulated exc from constructor") } override def onMessage(message: Command): Behavior[Command] = { - monitor ! Pong + monitor ! Pong(0) Behaviors.same } } @@ -275,19 +284,19 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( val count = failCounter.getAndIncrement() if (count < failCount) { probe.ref ! StartFailed - throw TE(s"construction ${count} failed") + throw TestException(s"construction ${count} failed") } else { probe.ref ! Started Behaviors.empty } - }).onFailure[TE](strategy) + }).onFailure[TestException](strategy) } class FailingUnhandledTestSetup(strategy: SupervisorStrategy) { val probe = TestProbe[AnyRef]("evt") def behv = supervise(setup[Command] { _ ⇒ probe.ref ! StartFailed - throw new TE("construction failed") + throw new TestException("construction failed") }).onFailure[IllegalArgumentException](strategy) } @@ -297,8 +306,8 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( val behv = Behaviors.supervise(targetBehavior(probe.ref)) .onFailure[Throwable](SupervisorStrategy.restart) val ref = spawn(behv) - ref ! Ping - probe.expectMessage(Pong) + ref ! Ping(1) + probe.expectMessage(Pong(1)) } "stop when strategy is stop" in { @@ -387,7 +396,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( val probe = TestProbe[Event]("evt") val resetTimeout = 500.millis val behv = Behaviors.supervise(targetBehavior(probe.ref)) - .onFailure[Exc1](SupervisorStrategy.restartWithLimit(2, resetTimeout)) + .onFailure[Exc1](SupervisorStrategy.restart.withLimit(2, resetTimeout)) val ref = spawn(behv) ref ! IncrementState ref ! GetState @@ -401,15 +410,17 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( ref ! Throw(new Exc2) probe.expectMessage(GotSignal(PostStop)) } - ref ! GetState - probe.expectNoMessage() + EventFilter.warning(start = "received dead letter", occurrences = 1).intercept { + ref ! GetState + probe.expectNoMessage() + } } "reset fixed limit after timeout" in { val probe = TestProbe[Event]("evt") val resetTimeout = 500.millis val behv = Behaviors.supervise(targetBehavior(probe.ref)) - .onFailure[Exc1](SupervisorStrategy.restartWithLimit(2, resetTimeout)) + .onFailure[Exc1](SupervisorStrategy.restart.withLimit(2, resetTimeout)) val ref = spawn(behv) ref ! IncrementState ref ! GetState @@ -430,10 +441,63 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( probe.expectMessage(State(0, Map.empty)) } - "NOT stop children when restarting" in { + "stop children when restarting" in { + testStopChildren(strategy = SupervisorStrategy.restart) + } + + "stop children when backoff" in { + testStopChildren(strategy = SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0)) + } + + def testStopChildren(strategy: SupervisorStrategy): Unit = { val parentProbe = TestProbe[Event]("evt") val behv = Behaviors.supervise(targetBehavior(parentProbe.ref)) - .onFailure[Exc1](SupervisorStrategy.restart) + .onFailure[Exc1](strategy) + val ref = spawn(behv) + + val anotherProbe = TestProbe[String]("another") + ref ! Watch(anotherProbe.ref) + + val childProbe = TestProbe[Event]("childEvt") + val slowStop = new CountDownLatch(1) + val child1Name = nextName() + val child2Name = nextName() + ref ! CreateChild(targetBehavior(childProbe.ref, slowStop = Some(slowStop)), child1Name) + ref ! CreateChild(targetBehavior(childProbe.ref, slowStop = Some(slowStop)), child2Name) + ref ! GetState + parentProbe.expectMessageType[State].children.keySet should ===(Set(child1Name, child2Name)) + + EventFilter[Exc1](occurrences = 1).intercept { + ref ! Throw(new Exc1) + parentProbe.expectMessage(GotSignal(PreRestart)) + ref ! GetState + anotherProbe.stop() + } + + // waiting for children to stop, GetState stashed + parentProbe.expectNoMessage() + slowStop.countDown() + + childProbe.expectMessage(GotSignal(PostStop)) + childProbe.expectMessage(GotSignal(PostStop)) + parentProbe.expectMessageType[State].children.keySet should ===(Set.empty) + // anotherProbe was stopped, Terminated signal stashed and delivered to new behavior + parentProbe.expectMessage(GotSignal(Terminated(anotherProbe.ref))) + } + + "optionally NOT stop children when restarting" in { + testNotStopChildren(strategy = SupervisorStrategy.restart.withStopChildren(enabled = false)) + } + + "optionally NOT stop children when backoff" in { + testNotStopChildren(strategy = SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0) + .withStopChildren(enabled = false)) + } + + def testNotStopChildren(strategy: SupervisorStrategy): Unit = { + val parentProbe = TestProbe[Event]("evt") + val behv = Behaviors.supervise(targetBehavior(parentProbe.ref)) + .onFailure[Exc1](strategy) val ref = spawn(behv) val childProbe = TestProbe[Event]("childEvt") @@ -447,12 +511,154 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( parentProbe.expectMessage(GotSignal(PreRestart)) ref ! GetState } - // TODO document this difference compared to classic actors, and that - // children can be stopped if needed in PreRestart parentProbe.expectMessageType[State].children.keySet should contain(childName) childProbe.expectNoMessage() } + "stop children when restarting second time during unstash" in { + testStopChildrenWhenExceptionFromUnstash(SupervisorStrategy.restart) + } + + "stop children when backoff second time during unstash" in { + testStopChildrenWhenExceptionFromUnstash( + SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0)) + } + + def testStopChildrenWhenExceptionFromUnstash(strategy: SupervisorStrategy): Unit = { + val parentProbe = TestProbe[Event]("evt") + val behv = Behaviors.supervise(targetBehavior(parentProbe.ref)) + .onFailure[Exc1](strategy) + val ref = spawn(behv) + + val childProbe = TestProbe[Event]("childEvt") + val slowStop = new CountDownLatch(1) + val child1Name = nextName() + ref ! CreateChild(targetBehavior(childProbe.ref, slowStop = Some(slowStop)), child1Name) + ref ! GetState + parentProbe.expectMessageType[State].children.keySet should ===(Set(child1Name)) + + val child2Name = nextName() + + EventFilter[Exc1](occurrences = 1).intercept { + ref ! Throw(new Exc1) + parentProbe.expectMessage(GotSignal(PreRestart)) + ref ! GetState + ref ! CreateChild(targetBehavior(childProbe.ref), child2Name) + ref ! GetState + ref ! Throw(new Exc1) + } + + EventFilter[Exc1](occurrences = 1).intercept { + slowStop.countDown() + childProbe.expectMessage(GotSignal(PostStop)) // child1 + parentProbe.expectMessageType[State].children.keySet should ===(Set.empty) + parentProbe.expectMessageType[State].children.keySet should ===(Set(child2Name)) + // the stashed Throw is causing another restart and stop of child2 + childProbe.expectMessage(GotSignal(PostStop)) // child2 + } + + ref ! GetState + parentProbe.expectMessageType[State].children.keySet should ===(Set.empty) + } + + "stop children when restart (with limit) from exception in first setup" in { + testStopChildrenWhenExceptionFromFirstSetup(SupervisorStrategy.restart.withLimit(10, 1.second)) + } + + "stop children when backoff from exception in first setup" in { + testStopChildrenWhenExceptionFromFirstSetup(SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0)) + } + + def testStopChildrenWhenExceptionFromFirstSetup(strategy: SupervisorStrategy): Unit = { + val parentProbe = TestProbe[Event]("evt") + val child1Probe = TestProbe[Event]("childEvt") + val child2Probe = TestProbe[Event]("childEvt") + val slowStop1 = new CountDownLatch(1) + val slowStop2 = new CountDownLatch(1) + val throwFromSetup = new AtomicBoolean(true) + val behv = Behaviors.supervise { + Behaviors.setup[Command] { ctx ⇒ + ctx.spawn(targetBehavior(child1Probe.ref, slowStop = Some(slowStop1)), "child1") + if (throwFromSetup.get()) { + // note that this second child waiting on slowStop2 will prevent a restart loop that could exhaust the + // limit before throwFromSetup is set back to false + ctx.spawn(targetBehavior(child2Probe.ref, slowStop = Some(slowStop2)), "child2") + throw TestException("exc from setup") + } + + targetBehavior(parentProbe.ref) + } + } + .onFailure[RuntimeException](strategy) + + EventFilter[TestException](occurrences = 1).intercept { + val ref = spawn(behv) + slowStop1.countDown() + child1Probe.expectMessage(GotSignal(PostStop)) + throwFromSetup.set(false) + slowStop2.countDown() + child2Probe.expectMessage(GotSignal(PostStop)) + + ref ! GetState + parentProbe.expectMessageType[State].children.keySet should ===(Set("child1")) + } + } + + "stop children when restart (with limit) from exception in later setup" in { + testStopChildrenWhenExceptionFromLaterSetup(SupervisorStrategy.restart.withLimit(10, 1.second)) + } + + "stop children when backoff from exception in later setup" in { + testStopChildrenWhenExceptionFromLaterSetup(SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0)) + } + + def testStopChildrenWhenExceptionFromLaterSetup(strategy: SupervisorStrategy): Unit = { + val parentProbe = TestProbe[Event]("evt") + val child1Probe = TestProbe[Event]("childEvt") + val child2Probe = TestProbe[Event]("childEvt") + val slowStop1 = new CountDownLatch(1) + val slowStop2 = new CountDownLatch(1) + val throwFromSetup = new AtomicBoolean(false) + val behv = Behaviors.supervise { + Behaviors.setup[Command] { ctx ⇒ + ctx.spawn(targetBehavior(child1Probe.ref, slowStop = Some(slowStop1)), "child1") + if (throwFromSetup.get()) { + // note that this second child waiting on slowStop2 will prevent a restart loop that could exhaust the + // limit before throwFromSetup is set back to false + ctx.spawn(targetBehavior(child2Probe.ref, slowStop = Some(slowStop2)), "child2") + throw TestException("exc from setup") + } + + targetBehavior(parentProbe.ref) + } + } + .onFailure[RuntimeException](strategy) + + val ref = spawn(behv) + + ref ! GetState + parentProbe.expectMessageType[State].children.keySet should ===(Set("child1")) + + throwFromSetup.set(true) + + EventFilter[Exc1](occurrences = 1).intercept { + ref ! Throw(new Exc1) + parentProbe.expectMessage(GotSignal(PreRestart)) + } + + EventFilter[TestException](occurrences = 1).intercept { + slowStop1.countDown() + child1Probe.expectMessage(GotSignal(PostStop)) + child1Probe.expectMessage(GotSignal(PostStop)) + throwFromSetup.set(false) + slowStop2.countDown() + child2Probe.expectMessage(GotSignal(PostStop)) + } + + ref ! GetState + parentProbe.expectMessageType[State].children.keySet should ===(Set("child1")) + } + "resume when handled exception" in { val probe = TestProbe[Event]("evt") val behv = supervise(targetBehavior(probe.ref)).onFailure[Exc1](SupervisorStrategy.resume) @@ -502,12 +708,12 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( } } - "publish dropped messages while backing off" in { + "publish dropped messages while backing off and stash is full" in { val probe = TestProbe[Event]("evt") val startedProbe = TestProbe[Event]("started") - val minBackoff = 10.seconds + val minBackoff = 1.seconds val strategy = SupervisorStrategy - .restartWithBackoff(minBackoff, minBackoff, 0.0) + .restartWithBackoff(minBackoff, minBackoff, 0.0).withStashCapacity(2) val behv = Behaviors.supervise(Behaviors.setup[Command] { _ ⇒ startedProbe.ref ! Started targetBehavior(probe.ref) @@ -521,8 +727,14 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( ref ! Throw(new Exc1) probe.expectMessage(GotSignal(PreRestart)) } - ref ! Ping - droppedMessagesProbe.expectMessage(Dropped(Ping, ref)) + ref ! Ping(1) + ref ! Ping(2) + ref ! Ping(3) + ref ! Ping(4) + probe.expectMessage(Pong(1)) + probe.expectMessage(Pong(2)) + droppedMessagesProbe.expectMessage(Dropped(Ping(3), ref)) + droppedMessagesProbe.expectMessage(Dropped(Ping(4), ref)) } "restart after exponential backoff" in { @@ -532,6 +744,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( val strategy = SupervisorStrategy .restartWithBackoff(minBackoff, 10.seconds, 0.0) .withResetBackoffAfter(10.seconds) + .withStashCapacity(0) val behv = Behaviors.supervise(Behaviors.setup[Command] { _ ⇒ startedProbe.ref ! Started targetBehavior(probe.ref) @@ -543,7 +756,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( ref ! IncrementState ref ! Throw(new Exc1) probe.expectMessage(GotSignal(PreRestart)) - ref ! Ping // dropped due to backoff + ref ! Ping(1) // dropped due to backoff, no stashing } startedProbe.expectNoMessage(minBackoff - 100.millis) @@ -557,7 +770,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( ref ! IncrementState ref ! Throw(new Exc1) probe.expectMessage(GotSignal(PreRestart)) - ref ! Ping // dropped due to backoff + ref ! Ping(2) // dropped due to backoff, no stashing } startedProbe.expectNoMessage((minBackoff * 2) - 100.millis) @@ -578,18 +791,18 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( val alreadyStarted = new AtomicBoolean(false) val behv = Behaviors.supervise(Behaviors.setup[Command] { _ ⇒ - if (alreadyStarted.get()) throw TE("failure to restart") + if (alreadyStarted.get()) throw TestException("failure to restart") alreadyStarted.set(true) startedProbe.ref ! Started - Behaviors.receiveMessage { + Behaviors.receiveMessagePartial { case Throw(boom) ⇒ throw boom } }).onFailure[Exception](strategy) val ref = spawn(behv) EventFilter[Exc1](occurrences = 1).intercept { - EventFilter[TE](occurrences = 2).intercept { + EventFilter[TestException](occurrences = 2).intercept { startedProbe.expectMessage(Started) ref ! Throw(new Exc1) probe.expectTerminated(ref, 3.seconds) @@ -602,6 +815,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( val minBackoff = 1.seconds val strategy = SupervisorStrategy.restartWithBackoff(minBackoff, 10.seconds, 0.0) .withResetBackoffAfter(100.millis) + .withStashCapacity(0) val behv = supervise(targetBehavior(probe.ref)).onFailure[Exc1](strategy) val ref = spawn(behv) @@ -609,7 +823,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( ref ! IncrementState ref ! Throw(new Exc1) probe.expectMessage(GotSignal(PreRestart)) - ref ! Ping // dropped due to backoff + ref ! Ping(1) // dropped due to backoff, no stash } probe.expectNoMessage(minBackoff + 100.millis.dilated) @@ -622,7 +836,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( ref ! IncrementState ref ! Throw(new Exc1) probe.expectMessage(GotSignal(PreRestart)) - ref ! Ping // dropped due to backoff + ref ! Ping(2) // dropped due to backoff } // backoff was reset, so restarted after the minBackoff @@ -663,8 +877,10 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( failCount = 1, strategy = SupervisorStrategy.resume ) { - EventFilter[ActorInitializationException](occurrences = 1).intercept { - spawn(behv) + EventFilter[TestException](occurrences = 1).intercept { + EventFilter[ActorInitializationException](occurrences = 1).intercept { + spawn(behv) + } } } @@ -673,7 +889,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( strategy = SupervisorStrategy.restartWithBackoff(minBackoff = 100.millis.dilated, maxBackoff = 1.second, 0) ) { - EventFilter[TE](occurrences = 1).intercept { + EventFilter[TestException](occurrences = 1).intercept { spawn(behv) probe.expectMessage(StartFailed) @@ -692,12 +908,12 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( } } - "restartWithLimit when deferred factory throws" in new FailingDeferredTestSetup( + "restart.withLimit when deferred factory throws" in new FailingDeferredTestSetup( failCount = 1, - strategy = SupervisorStrategy.restartWithLimit(3, 1.second) + strategy = SupervisorStrategy.restart.withLimit(3, 1.second) ) { - EventFilter[TE](occurrences = 1).intercept { + EventFilter[TestException](occurrences = 1).intercept { spawn(behv) probe.expectMessage(StartFailed) @@ -705,16 +921,18 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( } } - "fail after more than limit in restartWithLimit when deferred factory throws" in new FailingDeferredTestSetup( - failCount = 3, - strategy = SupervisorStrategy.restartWithLimit(2, 1.second) + "fail after more than limit in restart.withLimit when deferred factory throws" in new FailingDeferredTestSetup( + failCount = 20, + strategy = SupervisorStrategy.restart.withLimit(2, 1.second) ) { EventFilter[ActorInitializationException](occurrences = 1).intercept { - EventFilter[TE](occurrences = 1).intercept { + EventFilter[TestException](occurrences = 2).intercept { spawn(behv) - // restarted 2 times before it gave up + // first one from initial setup + probe.expectMessage(StartFailed) + // and then restarted 2 times before it gave up probe.expectMessage(StartFailed) probe.expectMessage(StartFailed) probe.expectNoMessage(100.millis) @@ -723,7 +941,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( } "fail instead of restart with limit when deferred factory throws unhandled" in new FailingUnhandledTestSetup( - strategy = SupervisorStrategy.restartWithLimit(3, 1.second)) { + strategy = SupervisorStrategy.restart.withLimit(3, 1.second)) { EventFilter[ActorInitializationException](occurrences = 1).intercept { spawn(behv) @@ -743,10 +961,10 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( } "work with nested supervisions and defers" in { - val strategy = SupervisorStrategy.restartWithLimit(3, 1.second) + val strategy = SupervisorStrategy.restart.withLimit(3, 1.second) val probe = TestProbe[AnyRef]("p") - val beh = supervise[String](setup(context ⇒ - supervise[String](setup { context ⇒ + val beh = supervise[String](setup(_ ⇒ + supervise[String](setup { _ ⇒ probe.ref ! Started Behaviors.empty[String] }).onFailure[RuntimeException](strategy) @@ -759,7 +977,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( "replace supervision when new returned behavior catches same exception" in { val probe = TestProbe[AnyRef]("probeMcProbeFace") val behv = supervise[String](Behaviors.receiveMessage { - case "boom" ⇒ throw TE("boom indeed") + case "boom" ⇒ throw TestException("boom indeed") case "switch" ⇒ supervise[String]( supervise[String]( @@ -767,7 +985,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( supervise[String]( supervise[String]( Behaviors.receiveMessage { - case "boom" ⇒ throw TE("boom indeed") + case "boom" ⇒ throw TestException("boom indeed") case "ping" ⇒ probe.ref ! "pong" Behaviors.same @@ -776,7 +994,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( Behaviors.stopped }).onFailure[RuntimeException](SupervisorStrategy.resume) ).onFailure[RuntimeException](SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 23D)) - ).onFailure[RuntimeException](SupervisorStrategy.restartWithLimit(23, 10.seconds)) + ).onFailure[RuntimeException](SupervisorStrategy.restart.withLimit(23, 10.seconds)) ).onFailure[IllegalArgumentException](SupervisorStrategy.restart) ).onFailure[RuntimeException](SupervisorStrategy.restart) }).onFailure[RuntimeException](SupervisorStrategy.stop) @@ -812,15 +1030,15 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( } val behv = supervise[String](Behaviors.receiveMessage { - case "boom" ⇒ throw TE("boom indeed") + case "boom" ⇒ throw TestException("boom indeed") case "switch" ⇒ supervise[String]( - setup(context ⇒ + setup(_ ⇒ supervise[String]( Behaviors.intercept(whateverInterceptor)( supervise[String]( Behaviors.receiveMessage { - case "boom" ⇒ throw TE("boom indeed") + case "boom" ⇒ throw TestException("boom indeed") case "ping" ⇒ probe.ref ! "pong" Behaviors.same @@ -829,7 +1047,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( Behaviors.stopped }).onFailure[RuntimeException](SupervisorStrategy.resume) ) - ).onFailure[IllegalArgumentException](SupervisorStrategy.restartWithLimit(23, 10.seconds)) + ).onFailure[IllegalArgumentException](SupervisorStrategy.restart.withLimit(23, 10.seconds)) ) ).onFailure[RuntimeException](SupervisorStrategy.restart) }).onFailure[RuntimeException](SupervisorStrategy.stop) @@ -846,14 +1064,14 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( actor ! "give me stacktrace" val stacktrace = probe.expectMessageType[Vector[StackTraceElement]] - stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.SimpleSupervisor.aroundReceive")) should ===(2) + stacktrace.count(_.toString.contains("Supervisor.aroundReceive")) should ===(2) } "replace backoff supervision duplicate when behavior is created in a setup" in { val probe = TestProbe[AnyRef]("probeMcProbeFace") val restartCount = new AtomicInteger(0) val behv = supervise[String]( - Behaviors.setup { context ⇒ + Behaviors.setup { _ ⇒ // a bit superficial, but just to be complete if (restartCount.incrementAndGet() == 1) { @@ -861,7 +1079,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( Behaviors.receiveMessage { case "boom" ⇒ probe.ref ! "crashing 1" - throw TE("boom indeed") + throw TestException("boom indeed") case "ping" ⇒ probe.ref ! "pong 1" Behaviors.same @@ -872,12 +1090,12 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( Behaviors.receiveMessage { case "boom" ⇒ probe.ref ! "crashing 2" - throw TE("boom indeed") + throw TestException("boom indeed") case "ping" ⇒ probe.ref ! "pong 2" Behaviors.same } - ).onFailure[TE](SupervisorStrategy.resume) + ).onFailure[TestException](SupervisorStrategy.resume) } } ).onFailure(SupervisorStrategy.restartWithBackoff(100.millis, 1.second, 0)) @@ -886,16 +1104,17 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( probe.expectMessage("started 1") ref ! "ping" probe.expectMessage("pong 1") - EventFilter[TE](occurrences = 1).intercept { + EventFilter[TestException](occurrences = 1).intercept { ref ! "boom" probe.expectMessage("crashing 1") ref ! "ping" probe.expectNoMessage(100.millis) } probe.expectMessage("started 2") + probe.expectMessage("pong 2") // from "ping" that was stashed ref ! "ping" probe.expectMessage("pong 2") - EventFilter[TE](occurrences = 1).intercept { + EventFilter[TestException](occurrences = 1).intercept { ref ! "boom" // now we should have replaced supervision with the resuming one probe.expectMessage("crashing 2") } @@ -941,7 +1160,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( SupervisorStrategy.restart, SupervisorStrategy.resume, SupervisorStrategy.restartWithBackoff(1.millis, 100.millis, 2D), - SupervisorStrategy.restartWithLimit(1, 100.millis) + SupervisorStrategy.restart.withLimit(1, 100.millis) ) allStrategies.foreach { strategy ⇒ @@ -978,13 +1197,13 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( } else { stopInSetup.set(true) Behaviors.receiveMessage { - case "boom" ⇒ throw TE("boom") + case "boom" ⇒ throw TestException("boom") } } - }).onFailure[TE](strategy) + }).onFailure[TestException](strategy) ) - EventFilter[TE](occurrences = 1).intercept { + EventFilter[TestException](occurrences = 1).intercept { actor ! "boom" } createTestProbe().expectTerminated(actor, 3.second) diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala index 97da2656a5..0eb05aca20 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala @@ -5,8 +5,9 @@ package akka.actor.typed.scaladsl import scala.concurrent.Promise + import akka.Done -import akka.actor.testkit.typed.TE +import akka.actor.testkit.typed.TestException import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed import akka.actor.typed.Behavior @@ -76,7 +77,7 @@ class StopSpec extends ScalaTestWithActorTestKit with WordSpecLike { Behaviors.stopped( // illegal: Behaviors.setup[String] { _ ⇒ - throw TE("boom!") + throw TestException("boom!") } ) } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala index 6b32e34425..e7cdd3be51 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala @@ -6,12 +6,18 @@ package akka.actor.typed.scaladsl.adapter import scala.concurrent.duration._ import scala.util.control.NoStackTrace -import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Terminated } + import akka.actor.InvalidMessageException -import akka.actor.testkit.typed.TE +import akka.actor.testkit.typed.TestException import akka.actor.typed.scaladsl.Behaviors -import akka.{ Done, NotUsed, actor ⇒ untyped } +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.Terminated import akka.testkit._ +import akka.Done +import akka.NotUsed +import akka.{ actor ⇒ untyped } object AdapterSpec { val untyped1: untyped.Props = untyped.Props(new Untyped1) @@ -69,7 +75,7 @@ object AdapterSpec { def unhappyTyped(msg: String): Behavior[String] = Behaviors.setup[String] { ctx ⇒ val child = ctx.spawnAnonymous(Behaviors.receiveMessage[String] { _ ⇒ - throw TE(msg) + throw TestException(msg) }) child ! "throw please" Behaviors.empty diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnly.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnly.scala index 2f6bbd059a..7dc1b03775 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnly.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnly.scala @@ -7,9 +7,10 @@ package docs.akka.typed.supervision import akka.actor.typed.ActorRef import akka.actor.typed.{ Behavior, SupervisorStrategy } import akka.actor.typed.scaladsl.Behaviors - import scala.concurrent.duration._ +import akka.actor.TypedActor.PreRestart + object SupervisionCompileOnly { val behavior = Behaviors.empty[String] @@ -26,7 +27,7 @@ object SupervisionCompileOnly { //#restart-limit Behaviors.supervise(behavior) - .onFailure[IllegalStateException](SupervisorStrategy.restartWithLimit( + .onFailure[IllegalStateException](SupervisorStrategy.restart.withLimit( maxNrOfRetries = 10, withinTimeRange = 10.seconds )) //#restart-limit @@ -54,4 +55,46 @@ object SupervisionCompileOnly { //#top-level Behaviors.supervise(counter(1)) //#top-level + + //#restart-stop-children + def child(size: Long): Behavior[String] = + Behaviors.receiveMessage(msg ⇒ child(size + msg.length)) + + def parent: Behavior[String] = { + Behaviors.supervise[String] { + Behaviors.setup { ctx ⇒ + val child1 = ctx.spawn(child(0), "child1") + val child2 = ctx.spawn(child(0), "child2") + + Behaviors.receiveMessage[String] { msg ⇒ + // there might be bugs here... + val parts = msg.split(" ") + child1 ! parts(0) + child2 ! parts(1) + Behaviors.same + } + } + }.onFailure(SupervisorStrategy.restart) + } + //#restart-stop-children + + //#restart-keep-children + def parent2: Behavior[String] = { + Behaviors.setup { ctx ⇒ + val child1 = ctx.spawn(child(0), "child1") + val child2 = ctx.spawn(child(0), "child2") + + // supervision strategy inside the setup to not recreate children on restart + Behaviors.supervise { + Behaviors.receiveMessage[String] { msg ⇒ + // there might be bugs here... + val parts = msg.split(" ") + child1 ! parts(0) + child2 ! parts(1) + Behaviors.same + } + }.onFailure(SupervisorStrategy.restart.withStopChildren(false)) + } + } + //#restart-keep-children } diff --git a/akka-actor-typed/src/main/resources/reference.conf b/akka-actor-typed/src/main/resources/reference.conf index 24f8bffc7d..30d424ccff 100644 --- a/akka-actor-typed/src/main/resources/reference.conf +++ b/akka-actor-typed/src/main/resources/reference.conf @@ -17,6 +17,12 @@ akka.actor.typed { # Receptionist is started eagerly to allow clustered receptionist to gather remote registrations early on. library-extensions += "akka.actor.typed.receptionist.Receptionist" + + # While an actor is restarted (waiting for backoff to expire and children to stop) + # incoming messages and signals are stashed, and delivered later to the newly restarted + # behavior. This property defines the capacity in number of messages of the stash + # buffer. If the capacity is exceed then additional incoming messages are dropped. + restart-stash-capacity = 1000 } # Load typed extensions by an untyped unextension. diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala index a175ab7e24..e23271c9d0 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala @@ -5,20 +5,25 @@ package akka.actor.typed import akka.{ actor ⇒ untyped } -import java.util.concurrent.{ CompletionStage, ThreadFactory } +import java.util.concurrent.CompletionStage +import java.util.concurrent.ThreadFactory -import akka.actor.setup.ActorSystemSetup -import com.typesafe.config.{ Config, ConfigFactory } -import scala.concurrent.{ ExecutionContextExecutor, Future } +import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.Future -import akka.actor.typed.internal.adapter.{ ActorSystemAdapter, PropsAdapter } -import akka.util.Timeout -import akka.annotation.DoNotInherit -import akka.annotation.ApiMayChange import akka.actor.BootstrapSetup +import akka.actor.setup.ActorSystemSetup import akka.actor.typed.internal.InternalRecipientRef import akka.actor.typed.internal.adapter.GuardianActorAdapter +import akka.actor.typed.internal.adapter.ActorSystemAdapter +import akka.actor.typed.internal.adapter.PropsAdapter import akka.actor.typed.receptionist.Receptionist +import akka.annotation.ApiMayChange +import akka.annotation.DoNotInherit +import akka.util.Helpers.Requiring +import akka.util.Timeout +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory /** * An ActorSystem is home to a hierarchy of Actors. It is created using @@ -237,7 +242,8 @@ object ActorSystem { * Wrap an untyped [[akka.actor.ActorSystem]] such that it can be used from * Akka Typed [[Behavior]]. */ - def wrap(system: untyped.ActorSystem): ActorSystem[Nothing] = ActorSystemAdapter.AdapterExtension(system.asInstanceOf[untyped.ActorSystemImpl]).adapter + def wrap(system: untyped.ActorSystem): ActorSystem[Nothing] = + ActorSystemAdapter.AdapterExtension(system.asInstanceOf[untyped.ActorSystemImpl]).adapter } /** @@ -245,19 +251,23 @@ object ActorSystem { * This class is immutable. */ final class Settings(val config: Config, val untypedSettings: untyped.ActorSystem.Settings, val name: String) { - def this(_cl: ClassLoader, _config: Config, name: String) = this({ - val config = _config.withFallback(ConfigFactory.defaultReference(_cl)) - config.checkValid(ConfigFactory.defaultReference(_cl), "akka") - config - }, new untyped.ActorSystem.Settings(_cl, _config, name), name) + def this(classLoader: ClassLoader, config: Config, name: String) = this({ + val cfg = config.withFallback(ConfigFactory.defaultReference(classLoader)) + cfg.checkValid(ConfigFactory.defaultReference(classLoader), "akka") + cfg + }, new untyped.ActorSystem.Settings(classLoader, config, name), name) def this(settings: untyped.ActorSystem.Settings) = this(settings.config, settings, settings.name) - private var foundSettings = List.empty[String] - - foundSettings = foundSettings.reverse - def setup: ActorSystemSetup = untypedSettings.setup - override def toString: String = s"Settings($name,\n ${foundSettings.mkString("\n ")})" + /** + * Returns the String representation of the Config that this Settings is backed by + */ + override def toString: String = config.root.render + + private val typedConfig = config.getConfig("akka.actor.typed") + + val RestartStashCapacity: Int = typedConfig.getInt("restart-stash-capacity") + .requiring(_ >= 0, "restart-stash-capacity must be >= 0") } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala index 5b4ccc900a..b87ef3a965 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala @@ -34,8 +34,7 @@ trait Signal /** * Lifecycle signal that is fired upon restart of the Actor before replacing * the behavior with the fresh one (i.e. this signal is received within the - * behavior that failed). The replacement behavior will receive PreStart as its - * first signal. + * behavior that failed). */ sealed abstract class PreRestart extends Signal case object PreRestart extends PreRestart { @@ -80,6 +79,15 @@ object Terminated { sealed class Terminated(val ref: ActorRef[Nothing]) extends Signal { /** Java API: The actor that was watched and got terminated */ def getRef(): ActorRef[Void] = ref.asInstanceOf[ActorRef[Void]] + + override def toString: String = s"Terminated($ref)" + + override def hashCode(): Int = ref.hashCode() + + override def equals(obj: Any): Boolean = obj match { + case Terminated(`ref`) ⇒ true + case _ ⇒ false + } } object ChildFailed { @@ -96,4 +104,13 @@ final class ChildFailed(ref: ActorRef[Nothing], val cause: Throwable) extends Te * Java API */ def getCause(): Throwable = cause + + override def toString: String = s"ChildFailed($ref,${cause.getClass.getName})" + + override def hashCode(): Int = ref.hashCode() + + override def equals(obj: Any): Boolean = obj match { + case ChildFailed(`ref`, `cause`) ⇒ true + case _ ⇒ false + } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala index dc927e7780..0057475ed1 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala @@ -22,50 +22,20 @@ object SupervisorStrategy { val resume: SupervisorStrategy = Resume(loggingEnabled = true) /** - * Restart immediately without any limit on number of restart retries. + * Restart immediately without any limit on number of restart retries. A limit can be + * added with [[RestartSupervisorStrategy.withLimit]]. * * If the actor behavior is deferred and throws an exception on startup the actor is stopped * (restarting would be dangerous as it could lead to an infinite restart-loop) */ - val restart: SupervisorStrategy = Restart(-1, Duration.Zero, loggingEnabled = true) + val restart: RestartSupervisorStrategy = + Restart(maxRestarts = -1, withinTimeRange = Duration.Zero) /** * Stop the actor */ val stop: SupervisorStrategy = Stop(loggingEnabled = true) - /** - * Scala API: Restart with a limit of number of restart retries. - * The number of restarts are limited to a number of restart attempts (`maxNrOfRetries`) - * within a time range (`withinTimeRange`). When the time window has elapsed without reaching - * `maxNrOfRetries` the restart count is reset. - * - * The strategy is applied also if the actor behavior is deferred and throws an exception during - * startup. - * - * @param maxNrOfRetries the number of times a child actor is allowed to be restarted, - * if the limit is exceeded the child actor is stopped - * @param withinTimeRange duration of the time window for maxNrOfRetries - */ - def restartWithLimit(maxNrOfRetries: Int, withinTimeRange: FiniteDuration): SupervisorStrategy = - Restart(maxNrOfRetries, withinTimeRange, loggingEnabled = true) - - /** - * Java API: Restart with a limit of number of restart retries. - * The number of restarts are limited to a number of restart attempts (`maxNrOfRetries`) - * within a time range (`withinTimeRange`). When the time window has elapsed without reaching - * `maxNrOfRetries` the restart count is reset. - * - * The strategy is applied also if the actor behavior is deferred and throws an exception during - * startup. - * - * @param maxNrOfRetries the number of times a child actor is allowed to be restarted, - * if the limit is exceeded the child actor is stopped - * @param withinTimeRange duration of the time window for maxNrOfRetries - */ - def restartWithLimit(maxNrOfRetries: Int, withinTimeRange: java.time.Duration): SupervisorStrategy = - restartWithLimit(maxNrOfRetries, withinTimeRange.asScala) - /** * Scala API: It supports exponential back-off between the given `minBackoff` and * `maxBackoff` durations. For example, if `minBackoff` is 3 seconds and @@ -99,7 +69,7 @@ object SupervisorStrategy { minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double): BackoffSupervisorStrategy = - Backoff(minBackoff, maxBackoff, randomFactor, resetBackoffAfter = minBackoff, loggingEnabled = true, maxRestarts = -1) + Backoff(minBackoff, maxBackoff, randomFactor, resetBackoffAfter = minBackoff) /** * Java API: It supports exponential back-off between the given `minBackoff` and @@ -153,15 +123,40 @@ object SupervisorStrategy { /** * INTERNAL API */ - @InternalApi private[akka] final case class Restart( - maxNrOfRetries: Int, - withinTimeRange: FiniteDuration, - loggingEnabled: Boolean) extends SupervisorStrategy { + @InternalApi private[akka] sealed trait RestartOrBackoff extends SupervisorStrategy { + def maxRestarts: Int + def stopChildren: Boolean + def stashCapacity: Int + def loggingEnabled: Boolean - override def withLoggingEnabled(enabled: Boolean): SupervisorStrategy = + def unlimitedRestarts(): Boolean = maxRestarts == -1 + } + + /** + * INTERNAL API + */ + @InternalApi private[akka] final case class Restart( + maxRestarts: Int, + withinTimeRange: FiniteDuration, + loggingEnabled: Boolean = true, + stopChildren: Boolean = true, + stashCapacity: Int = -1) extends RestartSupervisorStrategy with RestartOrBackoff { + + override def withLimit(maxNrOfRetries: Int, withinTimeRange: FiniteDuration): RestartSupervisorStrategy = + copy(maxNrOfRetries, withinTimeRange) + + override def withLimit(maxNrOfRetries: Int, withinTimeRange: java.time.Duration): RestartSupervisorStrategy = + copy(maxNrOfRetries, withinTimeRange.asScala) + + override def withStopChildren(enabled: Boolean): RestartSupervisorStrategy = + copy(stopChildren = enabled) + + override def withStashCapacity(capacity: Int): RestartSupervisorStrategy = + copy(stashCapacity = capacity) + + override def withLoggingEnabled(enabled: Boolean): RestartSupervisorStrategy = copy(loggingEnabled = enabled) - def unlimitedRestarts(): Boolean = maxNrOfRetries == -1 } /** @@ -172,10 +167,12 @@ object SupervisorStrategy { maxBackoff: FiniteDuration, randomFactor: Double, resetBackoffAfter: FiniteDuration, - loggingEnabled: Boolean, - maxRestarts: Int) extends BackoffSupervisorStrategy { + loggingEnabled: Boolean = true, + maxRestarts: Int = -1, + stopChildren: Boolean = true, + stashCapacity: Int = -1) extends BackoffSupervisorStrategy with RestartOrBackoff { - override def withLoggingEnabled(enabled: Boolean): SupervisorStrategy = + override def withLoggingEnabled(enabled: Boolean): BackoffSupervisorStrategy = copy(loggingEnabled = enabled) override def withResetBackoffAfter(timeout: FiniteDuration): BackoffSupervisorStrategy = @@ -188,6 +185,12 @@ object SupervisorStrategy { override def withMaxRestarts(maxRestarts: Int): BackoffSupervisorStrategy = copy(maxRestarts = maxRestarts) + + override def withStopChildren(enabled: Boolean): BackoffSupervisorStrategy = + copy(stopChildren = enabled) + + override def withStashCapacity(capacity: Int): BackoffSupervisorStrategy = + copy(stashCapacity = capacity) } } @@ -197,6 +200,59 @@ sealed abstract class SupervisorStrategy { def withLoggingEnabled(on: Boolean): SupervisorStrategy } +sealed abstract class RestartSupervisorStrategy extends SupervisorStrategy { + + /** + * Scala API: Restart with a limit of number of restart retries. + * The number of restarts are limited to a number of restart attempts (`maxNrOfRetries`) + * within a time range (`withinTimeRange`). When the time window has elapsed without reaching + * `maxNrOfRetries` the restart count is reset. + * + * The strategy is applied also if the actor behavior is deferred and throws an exception during + * startup. + * + * @param maxNrOfRetries the number of times a child actor is allowed to be restarted, + * if the limit is exceeded the child actor is stopped + * @param withinTimeRange duration of the time window for maxNrOfRetries + */ + def withLimit(maxNrOfRetries: Int, withinTimeRange: FiniteDuration): RestartSupervisorStrategy + + /** + * Java API: Restart with a limit of number of restart retries. + * The number of restarts are limited to a number of restart attempts (`maxNrOfRetries`) + * within a time range (`withinTimeRange`). When the time window has elapsed without reaching + * `maxNrOfRetries` the restart count is reset. + * + * The strategy is applied also if the actor behavior is deferred and throws an exception during + * startup. + * + * @param maxNrOfRetries the number of times a child actor is allowed to be restarted, + * if the limit is exceeded the child actor is stopped + * @param withinTimeRange duration of the time window for maxNrOfRetries + */ + def withLimit(maxNrOfRetries: Int, withinTimeRange: java.time.Duration): RestartSupervisorStrategy + + /** + * Stop or keep child actors when the parent actor is restarted. + * By default child actors are stopped when parent is restarted. + * @param enabled if `true` then child actors are stopped, otherwise they are kept + */ + def withStopChildren(enabled: Boolean): RestartSupervisorStrategy + + /** + * While restarting (waiting for children to stop) incoming messages and signals are + * stashed, and delivered later to the newly restarted behavior. This property defines + * the capacity in number of messages of the stash buffer. If the capacity is exceed + * then additional incoming messages are dropped. + * + * By default the capacity is defined by config property `akka.actor.typed.restart-stash-capacity`. + */ + def withStashCapacity(capacity: Int): RestartSupervisorStrategy + + override def withLoggingEnabled(enabled: Boolean): RestartSupervisorStrategy + +} + sealed abstract class BackoffSupervisorStrategy extends SupervisorStrategy { def resetBackoffAfter: FiniteDuration @@ -221,4 +277,24 @@ sealed abstract class BackoffSupervisorStrategy extends SupervisorStrategy { * the upper limit on restarts (and is the default) */ def withMaxRestarts(maxRestarts: Int): BackoffSupervisorStrategy + + /** + * Stop or keep child actors when the parent actor is restarted. + * By default child actors are stopped when parent is restarted. + * @param enabled if `true` then child actors are stopped, otherwise they are kept + */ + def withStopChildren(enabled: Boolean): BackoffSupervisorStrategy + + /** + * While restarting (waiting for backoff to expire and children to stop) incoming + * messages and signals are stashed, and delivered later to the newly restarted + * behavior. This property defines the capacity in number of messages of the stash + * buffer. If the capacity is exceed then additional incoming messages are dropped. + * + * By default the capacity is defined by config property `akka.actor.typed.restart-stash-capacity`. + */ + def withStashCapacity(capacity: Int): BackoffSupervisorStrategy + + override def withLoggingEnabled(enabled: Boolean): BackoffSupervisorStrategy + } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala index 1180f3b6db..a2d2dac4ef 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala @@ -7,32 +7,36 @@ package internal import java.util.concurrent.ThreadLocalRandom -import akka.actor.DeadLetterSuppression -import akka.actor.typed.BehaviorInterceptor.{ PreStartTarget, ReceiveTarget, SignalTarget } -import akka.actor.typed.SupervisorStrategy._ -import akka.actor.typed.scaladsl.Behaviors -import akka.annotation.InternalApi -import akka.util.{ OptionVal, unused } - -import scala.concurrent.duration.{ Deadline, FiniteDuration } +import scala.concurrent.duration.Deadline +import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag import scala.util.control.Exception.Catcher import scala.util.control.NonFatal +import akka.actor.DeadLetterSuppression +import akka.actor.typed.BehaviorInterceptor.PreStartTarget +import akka.actor.typed.BehaviorInterceptor.ReceiveTarget +import akka.actor.typed.BehaviorInterceptor.SignalTarget +import akka.actor.typed.SupervisorStrategy._ +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.StashBuffer +import akka.annotation.InternalApi +import akka.event.Logging +import akka.util.OptionVal +import akka.util.unused + /** * INTERNAL API */ @InternalApi private[akka] object Supervisor { def apply[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], strategy: SupervisorStrategy): Behavior[T] = { strategy match { + case r: RestartOrBackoff ⇒ + Behaviors.intercept[T, T](new RestartSupervisor(initialBehavior, r))(initialBehavior) case r: Resume ⇒ Behaviors.intercept[T, T](new ResumeSupervisor(r))(initialBehavior) - case r: Restart ⇒ - Behaviors.intercept[T, T](new RestartSupervisor(initialBehavior, r))(initialBehavior) case r: Stop ⇒ - Behaviors.intercept[T, T](new StopSupervisor(r))(initialBehavior) - case r: Backoff ⇒ - Behaviors.intercept[T, T](new BackoffSupervisor(initialBehavior, r))(initialBehavior) + Behaviors.intercept[T, T](new StopSupervisor(initialBehavior, r))(initialBehavior) } } } @@ -55,7 +59,7 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe override def aroundStart(ctx: TypedActorContext[O], target: PreStartTarget[I]): Behavior[I] = { try { target.start(ctx) - } catch handleExceptionOnStart(ctx) + } catch handleExceptionOnStart(ctx, target) } def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] = { @@ -70,9 +74,16 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe } } - protected def handleExceptionOnStart(ctx: TypedActorContext[O]): Catcher[Behavior[I]] + def dropped(ctx: TypedActorContext[_], signalOrMessage: Any): Unit = { + import akka.actor.typed.scaladsl.adapter._ + ctx.asScala.system.toUntyped.eventStream.publish(Dropped(signalOrMessage, ctx.asScala.self)) + } + + protected def handleExceptionOnStart(ctx: TypedActorContext[O], target: PreStartTarget[I]): Catcher[Behavior[I]] protected def handleSignalException(ctx: TypedActorContext[O], target: SignalTarget[I]): Catcher[Behavior[I]] protected def handleReceiveException(ctx: TypedActorContext[O], target: ReceiveTarget[I]): Catcher[Behavior[I]] + + override def toString: String = Logging.simpleName(getClass) } /** @@ -92,7 +103,7 @@ private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: Super } // convenience if target not required to handle exception - protected def handleExceptionOnStart(ctx: TypedActorContext[T]): Catcher[Behavior[T]] = + protected def handleExceptionOnStart(ctx: TypedActorContext[T], target: PreStartTarget[T]): Catcher[Behavior[T]] = handleException(ctx) protected def handleSignalException(ctx: TypedActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = handleException(ctx) @@ -100,7 +111,8 @@ private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: Super handleException(ctx) } -private class StopSupervisor[T, Thr <: Throwable: ClassTag](strategy: Stop) extends SimpleSupervisor[T, Thr](strategy) { +private class StopSupervisor[T, Thr <: Throwable: ClassTag](@unused initial: Behavior[T], strategy: Stop) + extends SimpleSupervisor[T, Thr](strategy) { override def handleException(ctx: TypedActorContext[T]): Catcher[Behavior[T]] = { case NonFatal(t: Thr) ⇒ log(ctx, t) @@ -116,149 +128,7 @@ private class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extend } } -private class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strategy: Restart)(implicit ev: ClassTag[Thr]) extends SimpleSupervisor[T, Thr](strategy) { - - private var restarts = 0 - private var deadline: OptionVal[Deadline] = OptionVal.None - - private def deadlineHasTimeLeft: Boolean = deadline match { - case OptionVal.None ⇒ true - case OptionVal.Some(d) ⇒ d.hasTimeLeft - } - - override def aroundStart(ctx: TypedActorContext[T], target: PreStartTarget[T]): Behavior[T] = { - try { - target.start(ctx) - } catch { - case NonFatal(t: Thr) ⇒ - // if unlimited restarts then don't restart if starting fails as it would likely be an infinite restart loop - if (strategy.unlimitedRestarts() || ((restarts + 1) >= strategy.maxNrOfRetries && deadlineHasTimeLeft)) { - // don't log here as it'll be logged as ActorInitializationException - throw t - } else { - log(ctx, t) - restart() - aroundStart(ctx, target) - } - } - } - - private def restart() = { - val timeLeft = deadlineHasTimeLeft - val newDeadline = if (deadline.isDefined && timeLeft) deadline else OptionVal.Some(Deadline.now + strategy.withinTimeRange) - restarts = if (timeLeft) restarts + 1 else 1 - deadline = newDeadline - } - - private def handleException(ctx: TypedActorContext[T], signalRestart: () ⇒ Unit): Catcher[Behavior[T]] = { - case NonFatal(t: Thr) ⇒ - if (strategy.maxNrOfRetries != -1 && restarts >= strategy.maxNrOfRetries && deadlineHasTimeLeft) { - throw t - } else { - try { - signalRestart() - } catch { - case NonFatal(ex) ⇒ ctx.asScala.log.error(ex, "failure during PreRestart") - } - log(ctx, t) - restart() - Behavior.validateAsInitial(Behavior.start(initial, ctx)) - } - } - - override protected def handleSignalException(ctx: TypedActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = { - handleException(ctx, () ⇒ target(ctx, PreRestart)) - } - override protected def handleReceiveException(ctx: TypedActorContext[T], target: ReceiveTarget[T]): Catcher[Behavior[T]] = { - handleException(ctx, () ⇒ target.signalRestart(ctx)) - } -} - -private class BackoffSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behavior[T], b: Backoff) extends AbstractSupervisor[O, T, Thr](b) { - - import BackoffSupervisor._ - - var blackhole = false - var restartCount: Int = 0 - - override def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[T]): Behavior[T] = { - if (blackhole) { - import akka.actor.typed.scaladsl.adapter._ - ctx.asScala.system.toUntyped.eventStream.publish(Dropped(signal, ctx.asScala.self)) - Behaviors.same - } else { - super.aroundSignal(ctx, signal, target) - } - } - - override def aroundReceive(ctx: TypedActorContext[O], msg: O, target: ReceiveTarget[T]): Behavior[T] = { - try { - msg.asInstanceOf[Any] match { - case ScheduledRestart ⇒ - blackhole = false - ctx.asScala.scheduleOnce(b.resetBackoffAfter, ctx.asScala.self.unsafeUpcast[Any], ResetRestartCount(restartCount)) - try { - Behavior.validateAsInitial(Behavior.start(initial, ctx.asInstanceOf[TypedActorContext[T]])) - } catch { - case NonFatal(ex: Thr) if b.maxRestarts > 0 && restartCount >= b.maxRestarts ⇒ - log(ctx, ex) - Behavior.failed(ex) - case NonFatal(ex: Thr) ⇒ scheduleRestart(ctx, ex) - } - case ResetRestartCount(current) ⇒ - if (current == restartCount) { - restartCount = 0 - } - Behavior.same - case _ ⇒ - if (blackhole) { - import akka.actor.typed.scaladsl.adapter._ - ctx.asScala.system.toUntyped.eventStream.publish(Dropped(msg, ctx.asScala.self.unsafeUpcast[Any])) - Behaviors.same - } else { - target(ctx, msg.asInstanceOf[T]) - } - } - } catch handleReceiveException(ctx, target) - } - - protected def handleExceptionOnStart(ctx: TypedActorContext[O]): Catcher[Behavior[T]] = { - case NonFatal(t: Thr) ⇒ - scheduleRestart(ctx, t) - } - - protected def handleReceiveException(ctx: TypedActorContext[O], target: ReceiveTarget[T]): Catcher[Behavior[T]] = { - case NonFatal(t: Thr) ⇒ - try { - target.signalRestart(ctx) - } catch { - case NonFatal(ex) ⇒ ctx.asScala.log.error(ex, "failure during PreRestart") - } - scheduleRestart(ctx, t) - } - - protected def handleSignalException(ctx: TypedActorContext[O], target: SignalTarget[T]): Catcher[Behavior[T]] = { - case NonFatal(t: Thr) ⇒ - try { - target(ctx, PreRestart) - } catch { - case NonFatal(ex) ⇒ ctx.asScala.log.error(ex, "failure during PreRestart") - } - scheduleRestart(ctx, t) - } - - private def scheduleRestart(ctx: TypedActorContext[O], reason: Throwable): Behavior[T] = { - log(ctx, reason) - val restartDelay = calculateDelay(restartCount, b.minBackoff, b.maxBackoff, b.randomFactor) - ctx.asScala.scheduleOnce(restartDelay, ctx.asScala.self.unsafeUpcast[Any], ScheduledRestart) - restartCount += 1 - blackhole = true - Behaviors.empty - } - -} - -private object BackoffSupervisor { +private object RestartSupervisor { /** * Calculates an exponential back off delay. */ @@ -281,3 +151,193 @@ private object BackoffSupervisor { final case class ResetRestartCount(current: Int) extends DeadLetterSuppression } +private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: RestartOrBackoff) + extends AbstractSupervisor[O, T, Thr](strategy) { + import RestartSupervisor._ + + private var restartingInProgress: OptionVal[(StashBuffer[Any], Set[ActorRef[Nothing]])] = OptionVal.None + private var restartCount: Int = 0 + private var gotScheduledRestart = true + private var deadline: OptionVal[Deadline] = OptionVal.None + + private def deadlineHasTimeLeft: Boolean = deadline match { + case OptionVal.None ⇒ true + case OptionVal.Some(d) ⇒ d.hasTimeLeft + } + + override def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[T]): Behavior[T] = { + restartingInProgress match { + case OptionVal.None ⇒ + super.aroundSignal(ctx, signal, target) + case OptionVal.Some((stashBuffer, children)) ⇒ + signal match { + case Terminated(ref) if strategy.stopChildren && children(ref) ⇒ + val remainingChildren = children - ref + if (remainingChildren.isEmpty && gotScheduledRestart) { + restartCompleted(ctx) + } else { + restartingInProgress = OptionVal.Some((stashBuffer, remainingChildren)) + Behaviors.same + } + + case _ ⇒ + if (stashBuffer.isFull) + dropped(ctx, signal) + else + stashBuffer.stash(signal) + Behaviors.same + } + } + } + + override def aroundReceive(ctx: TypedActorContext[O], msg: O, target: ReceiveTarget[T]): Behavior[T] = { + msg.asInstanceOf[Any] match { + case ScheduledRestart ⇒ + restartingInProgress match { + case OptionVal.Some((_, children)) ⇒ + if (strategy.stopChildren && children.nonEmpty) { + // still waiting for children to stop + gotScheduledRestart = true + Behaviors.same + } else + restartCompleted(ctx) + + case OptionVal.None ⇒ + throw new IllegalStateException("Unexpected ScheduledRestart when restart not in progress") + } + + case ResetRestartCount(current) ⇒ + if (current == restartCount) { + restartCount = 0 + } + Behavior.same + + case m: T @unchecked ⇒ + restartingInProgress match { + case OptionVal.None ⇒ + try { + target(ctx, m) + } catch handleReceiveException(ctx, target) + case OptionVal.Some((stashBuffer, _)) ⇒ + if (stashBuffer.isFull) + dropped(ctx, m) + else + stashBuffer.stash(m) + Behaviors.same + } + } + } + + override protected def handleExceptionOnStart(ctx: TypedActorContext[O], @unused target: PreStartTarget[T]): Catcher[Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + strategy match { + case _: Restart ⇒ + // if unlimited restarts then don't restart if starting fails as it would likely be an infinite restart loop + if (strategy.unlimitedRestarts() || ((restartCount + 1) >= strategy.maxRestarts && deadlineHasTimeLeft)) { + // don't log here as it'll be logged as ActorInitializationException + throw t + } else { + prepareRestart(ctx, t) + } + case _: Backoff ⇒ + prepareRestart(ctx, t) + } + } + + override protected def handleSignalException(ctx: TypedActorContext[O], target: SignalTarget[T]): Catcher[Behavior[T]] = { + handleException(ctx, () ⇒ target(ctx, PreRestart)) + } + override protected def handleReceiveException(ctx: TypedActorContext[O], target: ReceiveTarget[T]): Catcher[Behavior[T]] = { + handleException(ctx, () ⇒ target.signalRestart(ctx)) + } + + private def handleException(ctx: TypedActorContext[O], signalRestart: () ⇒ Unit): Catcher[Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + if (strategy.maxRestarts != -1 && restartCount >= strategy.maxRestarts && deadlineHasTimeLeft) { + strategy match { + case _: Restart ⇒ throw t + case _: Backoff ⇒ + log(ctx, t) + Behavior.failed(t) + } + + } else { + try signalRestart() catch { + case NonFatal(ex) ⇒ ctx.asScala.log.error(ex, "failure during PreRestart") + } + + prepareRestart(ctx, t) + } + } + + private def prepareRestart(ctx: TypedActorContext[O], reason: Throwable): Behavior[T] = { + log(ctx, reason) + + val currentRestartCount = restartCount + updateRestartCount() + + val childrenToStop = if (strategy.stopChildren) ctx.asScala.children.toSet else Set.empty[ActorRef[Nothing]] + stopChildren(ctx, childrenToStop) + + val stashCapacity = + if (strategy.stashCapacity >= 0) strategy.stashCapacity + else ctx.asScala.system.settings.RestartStashCapacity + restartingInProgress = OptionVal.Some((StashBuffer[Any](stashCapacity), childrenToStop)) + + strategy match { + case backoff: Backoff ⇒ + val restartDelay = calculateDelay(currentRestartCount, backoff.minBackoff, backoff.maxBackoff, backoff.randomFactor) + gotScheduledRestart = false + ctx.asScala.scheduleOnce(restartDelay, ctx.asScala.self.unsafeUpcast[Any], ScheduledRestart) + Behaviors.empty + case _: Restart ⇒ + if (childrenToStop.isEmpty) + restartCompleted(ctx) + else + Behaviors.empty // wait for termination of children + } + } + + private def restartCompleted(ctx: TypedActorContext[O]): Behavior[T] = { + strategy match { + case backoff: Backoff ⇒ + gotScheduledRestart = false + ctx.asScala.scheduleOnce(backoff.resetBackoffAfter, ctx.asScala.self.unsafeUpcast[Any], ResetRestartCount(restartCount)) + case _: Restart ⇒ + } + + try { + val newBehavior = Behavior.validateAsInitial(Behavior.start(initial, ctx.asInstanceOf[TypedActorContext[T]])) + val nextBehavior = restartingInProgress match { + case OptionVal.None ⇒ newBehavior + case OptionVal.Some((stashBuffer, _)) ⇒ + restartingInProgress = OptionVal.None + stashBuffer.unstashAll(ctx.asScala.asInstanceOf[scaladsl.ActorContext[Any]], newBehavior.unsafeCast) + } + nextBehavior.narrow + } catch handleException(ctx, signalRestart = () ⇒ ()) + // FIXME signal Restart is not done if unstashAll throws, unstash of each message may return a new behavior and + // it's the failing one that should receive the signal + } + + private def stopChildren(ctx: TypedActorContext[_], children: Set[ActorRef[Nothing]]): Unit = { + children.foreach { child ⇒ + ctx.asScala.watch(child) + ctx.asScala.stop(child) + } + } + + private def updateRestartCount(): Unit = { + strategy match { + case restart: Restart ⇒ + val timeLeft = deadlineHasTimeLeft + val newDeadline = if (deadline.isDefined && timeLeft) deadline else OptionVal.Some(Deadline.now + restart.withinTimeRange) + restartCount = if (timeLeft) restartCount + 1 else 1 + deadline = newDeadline + case _: Backoff ⇒ + restartCount += 1 + } + } + +} + diff --git a/akka-docs/src/main/paradox/typed/fault-tolerance.md b/akka-docs/src/main/paradox/typed/fault-tolerance.md index 82c6a21fc2..5081b644a7 100644 --- a/akka-docs/src/main/paradox/typed/fault-tolerance.md +++ b/akka-docs/src/main/paradox/typed/fault-tolerance.md @@ -1,7 +1,16 @@ # Fault Tolerance When an actor throws an unexpected exception, a failure, while processing a message or during initialization, the actor -will by default be stopped. Note that there is an important distinction between failures and validation errors: +will by default be stopped. + +@@@ note + +An important difference between Typed and Untyped actors is that Typed actors are by default stopped if +an exception is thrown and no supervision strategy is defined while in Untyped they are restarted. + +@@@ + +Note that there is an important distinction between failures and validation errors: A validation error means that the data of a command sent to an actor is not valid, this should rather be modelled as a part of the actor protocol than make the actor throw exceptions. @@ -74,6 +83,33 @@ Java Each returned behavior will be re-wrapped automatically with the supervisor. +## Child actors are stopped when parent is restarting + +Child actors are often started in a `setup` block that is run again when the parent actor is restarted. +The child actors are stopped to avoid resource leaks of creating new child actors each time the parent is restarted. + +Scala +: @@snip [SupervisionCompileOnly.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnly.scala) { #restart-stop-children } + +Java +: @@snip [SupervisionCompileOnlyTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java) { #restart-stop-children } + +It is possible to override this so that child actors are not influenced when the parent actor is restarted. +The restarted parent instance will then have the same children as before the failure. + +If child actors are created from `setup` like in the previous example and they should remain intact (not stopped) +when parent is restarted the `supervise` should be placed inside the `setup` and using +@scala[`SupervisorStrategy.restart.withStopChildren(false)`]@java[`SupervisorStrategy.restart().withStopChildren(false)`] +like this: + +Scala +: @@snip [SupervisionCompileOnly.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnly.scala) { #restart-keep-children } + +Java +: @@snip [SupervisionCompileOnlyTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java) { #restart-keep-children } + +That means that the `setup` block will only be run when the parent actor is first started, and not when it is +restarted. ## Bubble failures up through the hierarchy @@ -98,4 +134,4 @@ Scala : @@snip [FaultToleranceDocSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/FaultToleranceDocSpec.scala) { #bubbling-example } Java -: @@snip [SupervisionCompileOnlyTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/FaultToleranceDocTest.java) { #bubbling-example } +: @@snip [SupervisionCompileOnlyTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/BubblingSample.java) { #bubbling-example } diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/EventSourcedActorFailureTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/EventSourcedActorFailureTest.java index 60f56cec94..1c1f1e501a 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/EventSourcedActorFailureTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/EventSourcedActorFailureTest.java @@ -4,7 +4,7 @@ package akka.persistence.typed.javadsl; -import akka.actor.testkit.typed.TE; +import akka.actor.testkit.typed.TestException; import akka.actor.testkit.typed.javadsl.TestKitJunitResource; import akka.actor.testkit.typed.javadsl.TestProbe; import akka.actor.typed.ActorRef; @@ -92,7 +92,7 @@ public class EventSourcedActorFailureTest extends JUnitSuite { Behavior p1 = fail(new PersistenceId("fail-recovery-once"), probe.ref(), recoveryFailureProbe.ref()); testKit.spawn(p1); - recoveryFailureProbe.expectMessageClass(TE.class); + recoveryFailureProbe.expectMessageClass(TestException.class); } @Test diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala index 262ae3f3c5..3aa8c88177 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala @@ -8,17 +8,17 @@ import akka.actor.testkit.typed.TestKitSettings import akka.actor.testkit.typed.scaladsl._ import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.{ ActorRef, SupervisorStrategy } -import akka.actor.testkit.typed.TE import akka.persistence.AtomicWrite import akka.persistence.journal.inmem.InmemJournal import akka.persistence.typed.EventRejectedException import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike - import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Try + +import akka.actor.testkit.typed.TestException import akka.persistence.typed.PersistenceId class ChaosJournal extends InmemJournal { @@ -30,11 +30,11 @@ class ChaosJournal extends InmemJournal { val pid = messages.head.persistenceId if (pid == "fail-first-2" && count < 2) { count += 1 - Future.failed(TE("database says no")) + Future.failed(TestException("database says no")) } else if (pid == "reject-first" && reject) { reject = false Future.successful(messages.map(aw ⇒ Try { - throw TE("I don't like it") + throw TestException("I don't like it") })) } else { super.asyncWriteMessages(messages) @@ -44,9 +44,9 @@ class ChaosJournal extends InmemJournal { override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = { if (persistenceId == "fail-recovery-once" && failRecovery) { failRecovery = false - Future.failed(TE("Nah")) + Future.failed(TestException("Nah")) } else if (persistenceId == "fail-recovery") { - Future.failed(TE("Nope")) + Future.failed(TestException("Nope")) } else { super.asyncReadHighestSequenceNr(persistenceId, fromSequenceNr) } @@ -93,13 +93,13 @@ class EventSourcedBehaviorFailureSpec extends ScalaTestWithActorTestKit(EventSou spawn(failingPersistentActor(PersistenceId("fail-recovery")) .onRecoveryFailure(t ⇒ probe.ref ! t)) - probe.expectMessageType[TE].message shouldEqual "Nope" + probe.expectMessageType[TestException].message shouldEqual "Nope" } "handle exceptions in onRecoveryFailure" in { val probe = TestProbe[String]() val pa = spawn(failingPersistentActor(PersistenceId("fail-recovery-twice"), probe.ref) - .onRecoveryFailure(t ⇒ throw TE("recovery call back failure"))) + .onRecoveryFailure(t ⇒ throw TestException("recovery call back failure"))) pa ! "one" probe.expectMessage("starting") probe.expectMessage("persisting") diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala index 5ace4c672d..fb9d8b91e2 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala @@ -9,11 +9,11 @@ import java.util.UUID import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.{ ActorRef, SupervisorStrategy } import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler -import akka.actor.testkit.typed.TE import akka.actor.testkit.typed.scaladsl.TestProbe import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ +import akka.actor.testkit.typed.TestException import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.persistence.typed.PersistenceId import org.scalatest.WordSpecLike @@ -85,7 +85,7 @@ object PerformanceSpec { Effect.persist(evt).thenRun(_ ⇒ { parameters.persistCalls += 1 if (parameters.every(1000)) print(".") - if (parameters.shouldFail) throw TE("boom") + if (parameters.shouldFail) throw TestException("boom") }) case _ ⇒ Effect.none }