diff --git a/akka-actor-tests/src/test/scala/akka/pattern/BackoffOnRestartSupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/BackoffOnRestartSupervisorSpec.scala new file mode 100644 index 0000000000..389986b6e0 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/pattern/BackoffOnRestartSupervisorSpec.scala @@ -0,0 +1,122 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ + +package akka.pattern + +import akka.testkit.AkkaSpec +import akka.testkit.TestProbe +import akka.testkit.filterException +import scala.concurrent.Future +import scala.concurrent.duration._ +import akka.actor._ +import scala.language.postfixOps + +object TestActor { + class TestException(msg: String) extends Exception(msg) + class StoppingException extends TestException("stopping exception") + class NormalException extends TestException("normal exception") + def props(probe: ActorRef): Props = Props(new TestActor(probe)) +} + +class TestActor(probe: ActorRef) extends Actor { + import context.dispatcher + + probe ! "STARTED" + + def receive = { + case "DIE" ⇒ context.stop(self) + case "THROW" ⇒ throw new TestActor.NormalException + case "THROW_STOPPING_EXCEPTION" ⇒ throw new TestActor.StoppingException + case ("TO_PARENT", msg) ⇒ context.parent ! msg + case other ⇒ probe ! other + } +} + +object TestParentActor { + def props(probe: ActorRef, supervisorProps: Props): Props = + Props(new TestParentActor(probe, supervisorProps)) +} +class TestParentActor(probe: ActorRef, supervisorProps: Props) extends Actor { + val supervisor = context.actorOf(supervisorProps) + + def receive = { + case other ⇒ probe.forward(other) + } +} + +class BackoffOnRestartSupervisorSpec extends AkkaSpec { + + def supervisorProps(probeRef: ActorRef) = { + val options = Backoff.onFailure(TestActor.props(probeRef), "someChildName", 200 millis, 10 seconds, 0.0) + .withSupervisorStrategy(OneForOneStrategy() { + case _: TestActor.StoppingException ⇒ SupervisorStrategy.Stop + }) + BackoffSupervisor.props(options) + } + + trait Setup { + val probe = TestProbe() + val supervisor = system.actorOf(supervisorProps(probe.ref)) + probe.expectMsg("STARTED") + } + + trait Setup2 { + val probe = TestProbe() + val parent = system.actorOf(TestParentActor.props(probe.ref, supervisorProps(probe.ref))) + probe.expectMsg("STARTED") + val child = probe.lastSender + } + + "BackoffOnRestartSupervisor" must { + "terminate when child terminates" in new Setup { + filterException[TestActor.TestException] { + probe.watch(supervisor) + supervisor ! "DIE" + probe.expectTerminated(supervisor) + } + } + + "restart the child with an exponential back off" in new Setup { + filterException[TestActor.TestException] { + // Exponential back off restart test + probe.within(1.4 seconds, 2 seconds) { + supervisor ! "THROW" + // numRestart = 0 ~ 200 millis + probe.expectMsg(300 millis, "STARTED") + + supervisor ! "THROW" + // numRestart = 1 ~ 400 millis + probe.expectMsg(500 millis, "STARTED") + + supervisor ! "THROW" + // numRestart = 2 ~ 800 millis + probe.expectMsg(900 millis, "STARTED") + } + + // Verify that we only have one child at this point by selecting all the children + // under the supervisor and broadcasting to them. + // If there exists more than one child, we will get more than one reply. + val supervisorChildSelection = system.actorSelection(supervisor.path / "*") + supervisorChildSelection.tell("testmsg", probe.ref) + probe.expectMsg("testmsg") + probe.expectNoMsg + } + } + + "stop on exceptions as dictated by the supervisor strategy" in new Setup { + filterException[TestActor.TestException] { + probe.watch(supervisor) + // This should cause the supervisor to stop the child actor and then + // subsequently stop itself. + supervisor ! "THROW_STOPPING_EXCEPTION" + probe.expectTerminated(supervisor) + } + } + + "forward messages from the child to the parent of the supervisor" in new Setup2 { + child ! (("TO_PARENT", "TEST_MESSAGE")) + probe.expectMsg("TEST_MESSAGE") + } + } +} diff --git a/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala index bc72785fa7..fe8f2604c9 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala @@ -25,15 +25,32 @@ object BackoffSupervisorSpec { case msg ⇒ probe ! msg } } + + object ManualChild { + def props(probe: ActorRef): Props = + Props(new ManualChild(probe)) + } + + class ManualChild(probe: ActorRef) extends Actor { + def receive = { + case "boom" ⇒ throw new TestException + case msg ⇒ + probe ! msg + context.parent ! BackoffSupervisor.Reset + } + } } class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender { import BackoffSupervisorSpec._ + def onStopOptions(props: Props = Child.props(testActor)) = Backoff.onStop(props, "c1", 100.millis, 3.seconds, 0.2) + def onFailureOptions(props: Props = Child.props(testActor)) = Backoff.onFailure(props, "c1", 100.millis, 3.seconds, 0.2) + def create(options: BackoffOptions) = system.actorOf(BackoffSupervisor.props(options)) + "BackoffSupervisor" must { - "start child again when it stops" in { - val supervisor = system.actorOf( - BackoffSupervisor.props(Child.props(testActor), "c1", 100.millis, 3.seconds, 0.2)) + "start child again when it stops when using `Backoff.onStop`" in { + val supervisor = create(onStopOptions()) supervisor ! BackoffSupervisor.GetCurrentChild val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get watch(c1) @@ -47,27 +64,111 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender { } "forward messages to the child" in { - val supervisor = system.actorOf( - BackoffSupervisor.props(Child.props(testActor), "c2", 100.millis, 3.seconds, 0.2)) - supervisor ! "hello" - expectMsg("hello") + def assertForward(supervisor: ActorRef) = { + supervisor ! "hello" + expectMsg("hello") + } + assertForward(create(onStopOptions())) + assertForward(create(onFailureOptions())) } - "support custom supervision decider" in { - val supervisor = system.actorOf( - BackoffSupervisor.propsWithSupervisorStrategy(Child.props(testActor), "c1", 100.millis, 3.seconds, 0.2, - OneForOneStrategy() { - case _: TestException ⇒ SupervisorStrategy.Stop - })) - supervisor ! BackoffSupervisor.GetCurrentChild - val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get - watch(c1) - c1 ! "boom" - expectTerminated(c1) - awaitAssert { + "support custom supervision strategy" in { + def assertCustomStrategy(supervisor: ActorRef) = { supervisor ! BackoffSupervisor.GetCurrentChild - // new instance - expectMsgType[BackoffSupervisor.CurrentChild].ref.get should !==(c1) + val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get + watch(c1) + c1 ! "boom" + expectTerminated(c1) + awaitAssert { + supervisor ! BackoffSupervisor.GetCurrentChild + // new instance + expectMsgType[BackoffSupervisor.CurrentChild].ref.get should !==(c1) + } + } + filterException[TestException] { + val stoppingStrategy = OneForOneStrategy() { + case _: TestException ⇒ SupervisorStrategy.Stop + } + val restartingStrategy = OneForOneStrategy() { + case _: TestException ⇒ SupervisorStrategy.Restart + } + + assertCustomStrategy( + create(onStopOptions() + .withSupervisorStrategy(stoppingStrategy))) + + assertCustomStrategy( + create(onFailureOptions() + .withSupervisorStrategy(restartingStrategy))) + } + } + + "support default stopping strategy when using `Backoff.onStop`" in { + filterException[TestException] { + val supervisor = create(onStopOptions().withDefaultStoppingStrategy) + supervisor ! BackoffSupervisor.GetCurrentChild + val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get + watch(c1) + supervisor ! BackoffSupervisor.GetRestartCount + expectMsg(BackoffSupervisor.RestartCount(0)) + + c1 ! "boom" + expectTerminated(c1) + awaitAssert { + supervisor ! BackoffSupervisor.GetCurrentChild + // new instance + expectMsgType[BackoffSupervisor.CurrentChild].ref.get should !==(c1) + } + supervisor ! BackoffSupervisor.GetRestartCount + expectMsg(BackoffSupervisor.RestartCount(1)) + + } + } + + "support manual reset" in { + filterException[TestException] { + def assertManualReset(supervisor: ActorRef) = { + supervisor ! BackoffSupervisor.GetCurrentChild + val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get + watch(c1) + c1 ! "boom" + expectTerminated(c1) + supervisor ! BackoffSupervisor.GetRestartCount + expectMsg(BackoffSupervisor.RestartCount(1)) + + awaitAssert { + supervisor ! BackoffSupervisor.GetCurrentChild + // new instance + expectMsgType[BackoffSupervisor.CurrentChild].ref.get should !==(c1) + } + + supervisor ! "hello" + expectMsg("hello") + + // making sure the Reset is handled by supervisor + supervisor ! "hello" + expectMsg("hello") + + supervisor ! BackoffSupervisor.GetRestartCount + expectMsg(BackoffSupervisor.RestartCount(0)) + } + + val stoppingStrategy = OneForOneStrategy() { + case _: TestException ⇒ SupervisorStrategy.Stop + } + val restartingStrategy = OneForOneStrategy() { + case _: TestException ⇒ SupervisorStrategy.Restart + } + + assertManualReset( + create(onStopOptions(ManualChild.props(testActor)) + .withManualReset + .withSupervisorStrategy(stoppingStrategy))) + + assertManualReset( + create(onFailureOptions(ManualChild.props(testActor)) + .withManualReset + .withSupervisorStrategy(restartingStrategy))) } } } diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffOnRestartSupervisor.scala b/akka-actor/src/main/scala/akka/pattern/BackoffOnRestartSupervisor.scala new file mode 100644 index 0000000000..362674d1b1 --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/BackoffOnRestartSupervisor.scala @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.pattern + +import scala.concurrent.duration._ + +import akka.actor._ +import akka.actor.OneForOneStrategy +import akka.actor.SupervisorStrategy._ + +/** + * Back-off supervisor that stops and starts a child actor when the child actor restarts. + * This back-off supervisor is created by using ``akka.pattern.BackoffSupervisor.props`` + * with ``akka.pattern.Backoff.onFailure``. + */ +private class BackoffOnRestartSupervisor( + val childProps: Props, + val childName: String, + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + val reset: BackoffReset, + randomFactor: Double, + strategy: OneForOneStrategy) + extends Actor with HandleBackoff + with ActorLogging { + + import context._ + import BackoffSupervisor._ + override val supervisorStrategy = OneForOneStrategy(strategy.maxNrOfRetries, strategy.withinTimeRange, strategy.loggingEnabled) { + case ex ⇒ + val defaultDirective: Directive = + super.supervisorStrategy.decider.applyOrElse(ex, (_: Any) ⇒ Escalate) + + strategy.decider.applyOrElse(ex, (_: Any) ⇒ defaultDirective) match { + + // Whatever the final Directive is, we will translate all Restarts + // to our own Restarts, which involves stopping the child. + // directive match { + case Restart ⇒ + val childRef = sender() + become({ + case Terminated(`childRef`) ⇒ + unbecome() + child = None + val restartDelay = BackoffSupervisor.calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor) + context.system.scheduler.scheduleOnce(restartDelay, self, BackoffSupervisor.StartChild) + restartCount += 1 + case _ ⇒ // ignore + }, discardOld = false) + Stop + case other ⇒ other + } + } + + def onTerminated: Receive = { + case Terminated(child) ⇒ + log.debug(s"Terminating, because child [$child] terminated itself") + stop(self) + } + + def receive = onTerminated orElse handleBackoff +} diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala b/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala new file mode 100644 index 0000000000..3dad0069c7 --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala @@ -0,0 +1,228 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.pattern + +import scala.concurrent.duration.{ Duration, FiniteDuration } +import akka.actor.{ Props, OneForOneStrategy, SupervisorStrategy } +import akka.actor.SupervisorStrategy.{ Decider, JDecider } + +/** + * Builds back-off options for creating a back-off supervisor. + * You can pass `BackoffOptions` to `akka.pattern.BackoffSupervisor.props`. + * An example of creating back-off options: + * {{{ + * Backoff.onFailure(childProps, childName, minBackoff, maxBackoff, randomFactor) + * .withManualReset + * .withSupervisorStrategy( + * OneforOneStrategy(){ + * case e: GivingUpException => Stop + * case e: RetryableException => Restart + * } + * ) + * + * }}} + */ +object Backoff { + /** + * Back-off options for creating a back-off supervisor actor that expects a child actor to restart on failure. + * + * This explicit supervisor behaves similarly to the normal implicit supervision where + * if an actor throws an exception, the decider on the supervisor will decide when to + * `Stop`, `Restart`, `Escalate`, `Resume` the child actor. + * + * When the `Restart` directive is specified, the supervisor will delay the restart + * using an exponential back off strategy (bounded by minBackoff and maxBackoff). + * + * This supervisor is intended to be transparent to both the child actor and external actors. + * Where external actors can send messages to the supervisor as if it was the child and the + * messages will be forwarded. And when the child is `Terminated`, the supervisor is also + * `Terminated`. + * Transparent to the child means that the child does not have to be aware that it is being + * supervised specifically by this actor. Just like it does + * not need to know when it is being supervised by the usual implicit supervisors. + * The only caveat is that the `ActorRef` of the child is not stable, so any user storing the + * `sender()` `ActorRef` from the child response may eventually not be able to communicate with + * the stored `ActorRef`. In general all messages to the child should be directed through this actor. + * + * An example of where this supervisor might be used is when you may have an actor that is + * responsible for continuously polling on a server for some resource that sometimes may be down. + * Instead of hammering the server continuously when the resource is unavailable, the actor will + * be restarted with an exponentially increasing back off until the resource is available again. + * + * '''*** + * This supervisor should not be used with `Akka Persistence` child actors. + * `Akka Persistence` actors shutdown unconditionally on `persistFailure()`s rather + * than throw an exception on a failure like normal actors. + * [[#onStop]] should be used instead for cases where the child actor + * terminates itself as a failure signal instead of the normal behavior of throwing an exception. + * ***''' + * You can define another + * supervision strategy by using `akka.pattern.BackoffOptions.withSupervisorStrategy` on [[akka.pattern.BackoffOptions]]. + * + * @param childProps the [[akka.actor.Props]] of the child actor that + * will be started and supervised + * @param childName name of the child actor + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + */ + def onFailure( + childProps: Props, + childName: String, + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double): BackoffOptions = + BackoffOptionsImpl(RestartImpliesFailure, childProps, childName, minBackoff, maxBackoff, randomFactor) + + /** + * Back-off options for creating a back-off supervisor actor that expects a child actor to stop on failure. + * + * This actor can be used to supervise a child actor and start it again + * after a back-off duration if the child actor is stopped. + * + * This is useful in situations where the re-start of the child actor should be + * delayed e.g. in order to give an external resource time to recover before the + * child actor tries contacting it again (after being restarted). + * + * Specifically this pattern is useful for for persistent actors, + * which are stopped in case of persistence failures. + * Just restarting them immediately would probably fail again (since the data + * store is probably unavailable). It is better to try again after a delay. + * + * It supports exponential back-off between the given `minBackoff` and + * `maxBackoff` durations. For example, if `minBackoff` is 3 seconds and + * `maxBackoff` 30 seconds the start attempts will be delayed with + * 3, 6, 12, 24, 30, 30 seconds. The exponential back-off counter is reset + * if the actor is not terminated within the `minBackoff` duration. + * + * In addition to the calculated exponential back-off an additional + * random delay based the given `randomFactor` is added, e.g. 0.2 adds up to 20% + * delay. The reason for adding a random delay is to avoid that all failing + * actors hit the backend resource at the same time. + * + * You can retrieve the current child `ActorRef` by sending `BackoffSupervisor.GetCurrentChild` + * message to this actor and it will reply with [[akka.pattern.BackoffSupervisor.CurrentChild]] + * containing the `ActorRef` of the current child, if any. + * + * The `BackoffSupervisor`delegates all messages from the child to the parent of the + * `BackoffSupervisor`, with the supervisor as sender. + * + * The `BackoffSupervisor` forwards all other messages to the child, if it is currently running. + * + * The child can stop itself and send a [[akka.actor.PoisonPill]] to the parent supervisor + * if it wants to do an intentional stop. + * + * Exceptions in the child are handled with the default supervisionStrategy, which can be changed by using + * [[BackoffOptions#withSupervisorStrategy]] or [[BackoffOptions#withDefaultStoppingStrategy]]. A + * `Restart` will perform a normal immediate restart of the child. A `Stop` will + * stop the child, but it will be started again after the back-off duration. + * + * @param childProps the [[akka.actor.Props]] of the child actor that + * will be started and supervised + * @param childName name of the child actor + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + */ + def onStop( + childProps: Props, + childName: String, + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double): BackoffOptions = + BackoffOptionsImpl(StopImpliesFailure, childProps, childName, minBackoff, maxBackoff, randomFactor) +} + +/** + * Configures a back-off supervisor actor. Start with `Backoff.onStop` or `Backoff.onFailure`. + * BackoffOptions is immutable, so be sure to chain methods like: + * {{{ + * val options = Backoff.onFailure(childProps, childName, minBackoff, maxBackoff, randomFactor) + * .withManualReset + * context.actorOf(BackoffSupervisor.props(options), name) + * }}} + */ +trait BackoffOptions { + /** + * Returns a new BackoffOptions with automatic back-off reset. + * The back-off algorithm is reset if the child does not crash within the specified `resetBackoff`. + * @param resetBackoff The back-off is reset if the child does not crash within this duration. + */ + def withAutoReset(resetBackoff: FiniteDuration): BackoffOptions + + /** + * Returns a new BackoffOptions with manual back-off reset. The back-off is only reset + * if the child sends a `BackoffSupervisor.Reset` to its parent (the backoff-supervisor actor). + */ + def withManualReset: BackoffOptions + + /** + * Returns a new BackoffOptions with the supervisorStrategy. + * @param supervisorStrategy the supervisorStrategy that the back-off supervisor will use. + * The default supervisor strategy is used as fallback if the specified supervisorStrategy (its decider) + * does not explicitly handle an exception. + */ + def withSupervisorStrategy(supervisorStrategy: OneForOneStrategy): BackoffOptions + + /** + * Returns a new BackoffOptions with a default `SupervisorStrategy.stoppingStrategy`. + * The default supervisor strategy is used as fallback for throwables not handled by `SupervisorStrategy.stoppingStrategy`. + */ + def withDefaultStoppingStrategy: BackoffOptions + + /** + * Returns the props to create the back-off supervisor. + */ + private[akka] def props: Props +} + +private final case class BackoffOptionsImpl( + backoffType: BackoffType = RestartImpliesFailure, + childProps: Props, + childName: String, + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + reset: Option[BackoffReset] = None, + supervisorStrategy: OneForOneStrategy = OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider)) extends BackoffOptions { + + val backoffReset = reset.getOrElse(AutoReset(minBackoff)) + + def withAutoReset(resetBackoff: FiniteDuration) = copy(reset = Some(AutoReset(resetBackoff))) + def withManualReset = copy(reset = Some(ManualReset)) + def withSupervisorStrategy(supervisorStrategy: OneForOneStrategy) = copy(supervisorStrategy = supervisorStrategy) + def withDefaultStoppingStrategy = copy(supervisorStrategy = OneForOneStrategy()(SupervisorStrategy.stoppingStrategy.decider)) + + def props = { + require(minBackoff > Duration.Zero, "minBackoff must be > 0") + require(maxBackoff >= minBackoff, "maxBackoff must be >= minBackoff") + require(0.0 <= randomFactor && randomFactor <= 1.0, "randomFactor must be between 0.0 and 1.0") + backoffReset match { + case AutoReset(resetBackoff) ⇒ + require(minBackoff <= resetBackoff && resetBackoff <= maxBackoff) + case _ ⇒ // ignore + } + + backoffType match { + case RestartImpliesFailure ⇒ + Props(new BackoffOnRestartSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy)) + case StopImpliesFailure ⇒ + Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy)) + } + } +} + +private sealed trait BackoffType +private final case object StopImpliesFailure extends BackoffType +private final case object RestartImpliesFailure extends BackoffType + +private[akka] sealed trait BackoffReset +private[akka] final case object ManualReset extends BackoffReset +private[akka] final case class AutoReset(resetBackoff: FiniteDuration) extends BackoffReset diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala b/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala index 65c7f794fc..d589ce20e9 100644 --- a/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala +++ b/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala @@ -4,22 +4,24 @@ package akka.pattern import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.Duration import java.util.concurrent.ThreadLocalRandom +import java.util.Optional import akka.actor.Actor import akka.actor.ActorRef import akka.actor.DeadLetterSuppression import akka.actor.Props import akka.actor.Terminated -import java.util.Optional -import scala.concurrent.duration.Duration import akka.actor.SupervisorStrategy.Decider +import akka.actor.SupervisorStrategy.Directive +import akka.actor.SupervisorStrategy.Escalate import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy object BackoffSupervisor { /** - * Props for creating an [[BackoffSupervisor]] actor. + * Props for creating a [[BackoffSupervisor]] actor. * * Exceptions in the child are handled with the default supervision strategy, i.e. * most exceptions will immediately restart the child. You can define another @@ -45,7 +47,7 @@ object BackoffSupervisor { } /** - * Props for creating an [[BackoffSupervisor]] actor with a custom + * Props for creating a [[BackoffSupervisor]] actor with a custom * supervision strategy. * * Exceptions in the child are handled with the given `supervisionStrategy`. A @@ -77,6 +79,12 @@ object BackoffSupervisor { Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, randomFactor, strategy)) } + /** + * Props for creating a [[BackoffSupervisor]] actor from [[BackoffOptions]]. + * @param options the [[BackoffOptions]] that specify how to construct a backoff-supervisor. + */ + def props(options: BackoffOptions): Props = options.props + /** * Send this message to the [[BackoffSupervisor]] and it will reply with * [[BackoffSupervisor.CurrentChild]] containing the `ActorRef` of the current child, if any. @@ -89,6 +97,10 @@ object BackoffSupervisor { */ def getCurrentChild = GetCurrentChild + /** + * Send this message to the [[BackoffSupervisor]] and it will reply with + * [[BackoffSupervisor.CurrentChild]] containing the `ActorRef` of the current child, if any. + */ final case class CurrentChild(ref: Option[ActorRef]) { /** * Java API: The `ActorRef` of the current child, if any @@ -96,8 +108,36 @@ object BackoffSupervisor { def getRef: Optional[ActorRef] = Optional.ofNullable(ref.orNull) } - private case object StartChild extends DeadLetterSuppression - private case class ResetRestartCount(current: Int) extends DeadLetterSuppression + /** + * Send this message to the [[BackoffSupervisor]] and it will reset the back-off. + * This should be used in conjunction with `withManualReset` in [[BackoffOptions]]. + */ + final case object Reset + + /** + * Java API: Send this message to the [[BackoffSupervisor]] and it will reset the back-off. + * This should be used in conjunction with `withManualReset` in [[BackoffOptions]]. + */ + def reset = Reset + + /** + * Send this message to the [[BackoffSupervisor]] and it will reply with + * [[BackoffSupervisor.RestartCount]] containing the current restart count. + */ + final case object GetRestartCount + + /** + * Java API: Send this message to the [[BackoffSupervisor]] and it will reply with + * [[BackoffSupervisor.RestartCount]] containing the current restart count. + */ + def getRestartCount = GetRestartCount + + final case class RestartCount(count: Int) + + private[akka] final case object StartChild extends DeadLetterSuppression + + // not final for binary compatibility with 2.4.1 + private[akka] case class ResetRestartCount(current: Int) extends DeadLetterSuppression /** * INTERNAL API @@ -121,59 +161,45 @@ object BackoffSupervisor { } /** - * This actor can be used to supervise a child actor and start it again - * after a back-off duration if the child actor is stopped. - * - * This is useful in situations where the re-start of the child actor should be - * delayed e.g. in order to give an external resource time to recover before the - * child actor tries contacting it again (after being restarted). - * - * Specifically this pattern is useful for for persistent actors, - * which are stopped in case of persistence failures. - * Just restarting them immediately would probably fail again (since the data - * store is probably unavailable). It is better to try again after a delay. - * - * It supports exponential back-off between the given `minBackoff` and - * `maxBackoff` durations. For example, if `minBackoff` is 3 seconds and - * `maxBackoff` 30 seconds the start attempts will be delayed with - * 3, 6, 12, 24, 30, 30 seconds. The exponential back-off counter is reset - * if the actor is not terminated within the `minBackoff` duration. - * - * In addition to the calculated exponential back-off an additional - * random delay based the given `randomFactor` is added, e.g. 0.2 adds up to 20% - * delay. The reason for adding a random delay is to avoid that all failing - * actors hit the backend resource at the same time. - * - * You can retrieve the current child `ActorRef` by sending `BackoffSupervisor.GetCurrentChild` - * message to this actor and it will reply with [[akka.pattern.BackoffSupervisor.CurrentChild]] - * containing the `ActorRef` of the current child, if any. - * - * The `BackoffSupervisor`delegates all messages from the child to the parent of the - * `BackoffSupervisor`, with the supervisor as sender. - * - * The `BackoffSupervisor` forwards all other messages to the child, if it is currently running. - * - * The child can stop itself and send a [[akka.actor.PoisonPill]] to the parent supervisor - * if it wants to do an intentional stop. - * - * Exceptions in the child are handled with the given `supervisionStrategy`. A - * `Restart` will perform a normal immediate restart of the child. A `Stop` will - * stop the child, but it will be started again after the back-off duration. + * Back-off supervisor that stops and starts a child actor using a back-off algorithm when the child actor stops. + * This back-off supervisor is created by using `akka.pattern.BackoffSupervisor.props` + * with `Backoff.onStop`. */ final class BackoffSupervisor( - childProps: Props, - childName: String, + val childProps: Props, + val childName: String, minBackoff: FiniteDuration, maxBackoff: FiniteDuration, + val reset: BackoffReset, randomFactor: Double, - override val supervisorStrategy: SupervisorStrategy) - extends Actor { + strategy: SupervisorStrategy) + extends Actor with HandleBackoff { import BackoffSupervisor._ import context.dispatcher - private var child: Option[ActorRef] = None - private var restartCount = 0 + // to keep binary compatibility with 2.4.1 + override val supervisorStrategy = strategy match { + case oneForOne: OneForOneStrategy ⇒ + OneForOneStrategy(oneForOne.maxNrOfRetries, oneForOne.withinTimeRange, oneForOne.loggingEnabled) { + case ex ⇒ + val defaultDirective: Directive = + super.supervisorStrategy.decider.applyOrElse(ex, (_: Any) ⇒ Escalate) + + strategy.decider.applyOrElse(ex, (_: Any) ⇒ defaultDirective) + } + case s ⇒ s + } + + // for binary compatibility with 2.4.1 + def this( + childProps: Props, + childName: String, + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + supervisorStrategy: SupervisorStrategy) = + this(childProps, childName, minBackoff, maxBackoff, AutoReset(minBackoff), randomFactor, supervisorStrategy) // for binary compatibility with 2.4.0 def this( @@ -184,29 +210,57 @@ final class BackoffSupervisor( randomFactor: Double) = this(childProps, childName, minBackoff, maxBackoff, randomFactor, SupervisorStrategy.defaultStrategy) - override def preStart(): Unit = - startChild() + def onTerminated: Receive = { + case Terminated(ref) if child.contains(ref) ⇒ + child = None + val restartDelay = calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor) + context.system.scheduler.scheduleOnce(restartDelay, self, StartChild) + restartCount += 1 + } + + def receive = onTerminated orElse handleBackoff +} + +private[akka] trait HandleBackoff { this: Actor ⇒ + def childProps: Props + def childName: String + def reset: BackoffReset + + var child: Option[ActorRef] = None + var restartCount = 0 + + import BackoffSupervisor._ + import context.dispatcher + + override def preStart(): Unit = startChild() def startChild(): Unit = if (child.isEmpty) { child = Some(context.watch(context.actorOf(childProps, childName))) } - def receive = { - case Terminated(ref) if child.contains(ref) ⇒ - child = None - val restartDelay = calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor) - context.system.scheduler.scheduleOnce(restartDelay, self, StartChild) - restartCount += 1 - + def handleBackoff: Receive = { case StartChild ⇒ startChild() - context.system.scheduler.scheduleOnce(minBackoff, self, ResetRestartCount(restartCount)) + reset match { + case AutoReset(resetBackoff) ⇒ + val _ = context.system.scheduler.scheduleOnce(resetBackoff, self, ResetRestartCount(restartCount)) + case _ ⇒ // ignore + } + + case Reset ⇒ + reset match { + case ManualReset ⇒ restartCount = 0 + case msg ⇒ unhandled(msg) + } case ResetRestartCount(current) ⇒ if (current == restartCount) restartCount = 0 + case GetRestartCount ⇒ + sender() ! RestartCount(restartCount) + case GetCurrentChild ⇒ sender() ! CurrentChild(child) @@ -219,5 +273,4 @@ final class BackoffSupervisor( case None ⇒ context.system.deadLetters.forward(msg) } } -} - +} \ No newline at end of file diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/TransparentExponentialBackoffSupervisor.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/TransparentExponentialBackoffSupervisor.scala deleted file mode 100644 index eac61896d0..0000000000 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/TransparentExponentialBackoffSupervisor.scala +++ /dev/null @@ -1,203 +0,0 @@ -/** - * Copyright (C) 2015 Typesafe Inc. - */ - -package akka.contrib.pattern - -import akka.actor._ -import akka.actor.OneForOneStrategy -import akka.actor.SupervisorStrategy._ -import scala.concurrent.duration._ - -object TransparentExponentialBackoffSupervisor { - private case class ScheduleRestart(childRef: ActorRef) extends DeadLetterSuppression - private case object StartChild extends DeadLetterSuppression - private case class ResetRestartCount(lastNumRestarts: Int) extends DeadLetterSuppression - - /** - * Props for creating a [[TransparentExponentialBackoffSupervisor]] with a decider. - * - * @param childProps the [[akka.actor.Props]] of the child to be supervised. - * @param childName the name of the child actor. - * @param minBackoff the min time before the child is restarted. - * @param maxBackoff the max time (upperbound) for a child restart. - * @param randomFactor a random delay factor to add on top of the calculated exponential - * back off. - * The calculation is equivalent to: - * {{{ - * final_delay = min( - * maxBackoff, - * (random_delay_factor * calculated_backoff) + calculated_backoff) - * }}} - * @param decider a `Decider` to specify how the supervisor - * should behave for different exceptions. If no cases are matched, the default decider of - * [[akka.actor.Actor]] is used. When the `Restart` directive - * is returned by the decider, this supervisor will apply an exponential back off restart. - */ - def propsWithDecider( - childProps: Props, - childName: String, - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double)(decider: Decider): Props = { - Props( - new TransparentExponentialBackoffSupervisor( - childProps, - childName, - Some(decider), - minBackoff, - maxBackoff, - randomFactor)) - } - - /** - * Props for creating a [[TransparentExponentialBackoffSupervisor]] using the - * default [[akka.actor.Actor]] decider. - * - * @param childProps the [[akka.actor.Props]] of the child to be supervised. - * @param childName the name of the child actor. - * @param minBackoff the min time before the child is restarted. - * @param maxBackoff the max time (upperbound) for a child restart. - * @param randomFactor a random delay factor to add on top of the calculated exponential - * back off. - * The calculation is equivalent to: - * {{{ - * final_delay = min( - * maxBackoff, - * (random_delay_factor * calculated_backoff) + calculated_backoff) - * }}} - */ - def props( - childProps: Props, - childName: String, - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double): Props = { - Props( - new TransparentExponentialBackoffSupervisor( - childProps, - childName, - None, - minBackoff, - maxBackoff, - randomFactor)) - } -} - -/** - * A supervising actor that restarts a child actor with an exponential back off. - * - * This explicit supervisor behaves similarly to the normal implicit supervision where - * if an actor throws an exception, the decider on the supervisor will decide when to - * `Stop`, `Restart`, `Escalate`, `Resume` the child actor. - * - * When the `Restart` directive is specified, the supervisor will delay the restart - * using an exponential back off strategy (bounded by minBackoff and maxBackoff). - * - * This supervisor is intended to be transparent to both the child actor and external actors. - * Where external actors can send messages to the supervisor as if it was the child and the - * messages will be forwarded. And when the child is `Terminated`, the supervisor is also - * `Terminated`. - * Transparent to the child means that the child does not have to be aware that it is being - * supervised specifically by the [[TransparentExponentialBackoffSupervisor]]. Just like it does - * not need to know when it is being supervised by the usual implicit supervisors. - * The only caveat is that the `ActorRef` of the child is not stable, so any user storing the - * `sender()` `ActorRef` from the child response may eventually not be able to communicate with - * the stored `ActorRef`. In general all messages to the child should be directed through the - * [[TransparentExponentialBackoffSupervisor]]. - * - * An example of where this supervisor might be used is when you may have an actor that is - * responsible for continuously polling on a server for some resource that sometimes may be down. - * Instead of hammering the server continuously when the resource is unavailable, the actor will - * be restarted with an exponentially increasing back off until the resource is available again. - * - * '''*** - * This supervisor should not be used with `Akka Persistence` child actors. - * `Akka Persistence` actors, currently, shutdown unconditionally on `persistFailure()`s rather - * than throw an exception on a failure like normal actors. - * [[akka.pattern.BackoffSupervisor]] should be used instead for cases where the child actor - * terminates itself as a failure signal instead of the normal behavior of throwing an exception. - * ***''' - */ -class TransparentExponentialBackoffSupervisor( - props: Props, - childName: String, - decider: Option[Decider], - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double) - extends Actor - with Stash - with ActorLogging { - - import TransparentExponentialBackoffSupervisor._ - import context._ - - override val supervisorStrategy = OneForOneStrategy() { - case ex ⇒ - val defaultDirective: Directive = - super.supervisorStrategy.decider.applyOrElse(ex, (_: Any) ⇒ Escalate) - val maybeDirective: Option[Directive] = decider - .map(_.applyOrElse(ex, (_: Any) ⇒ defaultDirective)) - - // Get the directive from the specified decider or fallback to - // the default decider. - // Whatever the final Directive is, we will translate all Restarts - // to our own Restarts, which involves stopping the child. - maybeDirective.getOrElse(defaultDirective) match { - case Restart ⇒ - val childRef = sender - become({ - case Terminated(`childRef`) ⇒ - unbecome() - self ! ScheduleRestart(childRef) - case _ ⇒ - stash() - }, discardOld = false) - Stop - case other ⇒ other - } - } - - // Initialize by starting up and watching the child - self ! StartChild - - def receive = waitingToStart(-1, false) - - def waitingToStart(numRestarts: Int, scheduleCounterReset: Boolean): Receive = { - case StartChild ⇒ - val childRef = actorOf(props, childName) - watch(childRef) - unstashAll() - if (scheduleCounterReset) { - system.scheduler.scheduleOnce(minBackoff, self, ResetRestartCount(numRestarts + 1)) - } - become(watching(childRef, numRestarts + 1)) - case _ ⇒ stash() - } - - // Steady state - def watching(childRef: ActorRef, numRestarts: Int): Receive = { - case ScheduleRestart(`childRef`) ⇒ - val delay = akka.pattern.BackoffSupervisor.calculateDelay( - numRestarts, minBackoff, maxBackoff, randomFactor) - system.scheduler.scheduleOnce(delay, self, StartChild) - become(waitingToStart(numRestarts, true)) - log.info(s"Restarting child in: $delay; numRestarts: $numRestarts") - case ResetRestartCount(last) ⇒ - if (last == numRestarts) { - log.debug(s"Last restart count [$last] matches current count; resetting") - become(watching(childRef, 0)) - } else { - log.debug(s"Last restart count [$last] does not match the current count [$numRestarts]") - } - case Terminated(`childRef`) ⇒ - log.debug(s"Terminating, because child [$childRef] terminated itself") - stop(self) - case msg if sender() == childRef ⇒ - // use the supervisor as sender - context.parent ! msg - case msg ⇒ - childRef.forward(msg) - } -} diff --git a/akka-contrib/src/test/scala/akka/contrib/pattern/TransparentExponentialBackoffSupervisorSpec.scala b/akka-contrib/src/test/scala/akka/contrib/pattern/TransparentExponentialBackoffSupervisorSpec.scala deleted file mode 100644 index 07441cb66f..0000000000 --- a/akka-contrib/src/test/scala/akka/contrib/pattern/TransparentExponentialBackoffSupervisorSpec.scala +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Copyright (C) 2015 Typesafe Inc. - */ - -package akka.contrib.pattern - -import akka.testkit.AkkaSpec -import akka.testkit.TestProbe -import scala.concurrent.Future -import scala.concurrent.duration._ -import akka.actor._ -import scala.language.postfixOps - -object TestActor { - class StoppingException extends Exception("stopping exception") - def props(probe: ActorRef): Props = Props(new TestActor(probe)) -} - -class TestActor(probe: ActorRef) extends Actor { - import context.dispatcher - - probe ! "STARTED" - - def receive = { - case "DIE" ⇒ context.stop(self) - case "THROW" ⇒ throw new Exception("normal exception") - case "THROW_STOPPING_EXCEPTION" ⇒ throw new TestActor.StoppingException - case ("TO_PARENT", msg) ⇒ context.parent ! msg - case other ⇒ probe ! other - } -} - -object TestParentActor { - def props(probe: ActorRef, supervisorProps: Props): Props = - Props(new TestParentActor(probe, supervisorProps)) -} -class TestParentActor(probe: ActorRef, supervisorProps: Props) extends Actor { - val supervisor = context.actorOf(supervisorProps) - - def receive = { - case other ⇒ probe.forward(other) - } -} - -class TransparentExponentialBackoffSupervisorSpec extends AkkaSpec { - - def supervisorProps(probeRef: ActorRef) = TransparentExponentialBackoffSupervisor.propsWithDecider( - TestActor.props(probeRef), - "someChildName", - 200 millis, - 10 seconds, - 0.0) { - case _: TestActor.StoppingException ⇒ SupervisorStrategy.Stop - } - - trait Setup { - val probe = TestProbe() - val supervisor = system.actorOf(supervisorProps(probe.ref)) - probe.expectMsg("STARTED") - } - - trait Setup2 { - val probe = TestProbe() - val parent = system.actorOf(TestParentActor.props(probe.ref, supervisorProps(probe.ref))) - probe.expectMsg("STARTED") - val child = probe.lastSender - } - - "TransparentExponentialBackoffSupervisor" must { - "forward messages to child" in new Setup { - supervisor ! "some message" - probe.expectMsg("some message") - } - - "terminate when child terminates" in new Setup { - probe.watch(supervisor) - supervisor ! "DIE" - probe.expectTerminated(supervisor) - } - - "restart the child with an exponential back off" in new Setup { - // Exponential back off restart test - probe.within(1.4 seconds, 2 seconds) { - supervisor ! "THROW" - // numRestart = 0 ~ 200 millis - probe.expectMsg(300 millis, "STARTED") - - supervisor ! "THROW" - // numRestart = 1 ~ 400 millis - probe.expectMsg(500 millis, "STARTED") - - supervisor ! "THROW" - // numRestart = 2 ~ 800 millis - probe.expectMsg(900 millis, "STARTED") - } - - // Verify that we only have one child at this point by selecting all the children - // under the supervisor and broadcasting to them. - // If there exists more than one child, we will get more than one reply. - val supervisorChildSelection = system.actorSelection(supervisor.path / "*") - supervisorChildSelection.tell("testmsg", probe.ref) - probe.expectMsg("testmsg") - probe.expectNoMsg - } - - "stop on exceptions as dictated by the decider" in new Setup { - probe.watch(supervisor) - // This should cause the supervisor to stop the child actor and then - // subsequently stop itself. - supervisor ! "THROW_STOPPING_EXCEPTION" - probe.expectTerminated(supervisor) - } - - "forward messages from the child to the parent of the supervisor" in new Setup2 { - child ! (("TO_PARENT", "TEST_MESSAGE")) - probe.expectMsg("TEST_MESSAGE") - } - } -} diff --git a/akka-docs/rst/general/supervision.rst b/akka-docs/rst/general/supervision.rst index a047fdb111..46ef7c15e8 100644 --- a/akka-docs/rst/general/supervision.rst +++ b/akka-docs/rst/general/supervision.rst @@ -203,25 +203,26 @@ terminates a child by way of the ``system.stop(child)`` method or sending a Delayed restarts with the BackoffSupervisor pattern ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Provided as a build-in pattern the ``akka.pattern.BackoffSupervisor`` actor implements the so-called -*exponential backoff supervision strategy*, which can be used to death-watch an actor, -and when it terminates try to start it again, each time with a growing time delay between those restarts. +Provided as a built-in pattern the ``akka.pattern.BackoffSupervisor`` implements the so-called +*exponential backoff supervision strategy*, starting a child actor again when it fails, each time with a growing time delay between restarts. -This pattern is useful when the started actor fails because some external resource is not available, +This pattern is useful when the started actor fails [#]_ because some external resource is not available, and we need to give it some time to start-up again. One of the prime examples when this is useful is -when a :ref:`PersistentActor ` fails with an persistence failure - which indicates that +when a :ref:`PersistentActor ` fails (by stopping) with a persistence failure - which indicates that the database may be down or overloaded, in such situations it makes most sense to give it a little bit of time -to recover before the peristent actor is restarted. +to recover before the peristent actor is started. -The following Scala snippet shows how to create a backoff supervisor which will start the given echo actor -in increasing intervals of 3, 6, 12, 24 and finally 30 seconds: +.. [#] A failure can be indicated in two different ways; by an actor stopping or crashing. -.. includecode:: ../scala/code/docs/pattern/BackoffSupervisorDocSpec.scala#backoff +The following Scala snippet shows how to create a backoff supervisor which will start the given echo actor after it has stopped +because of a failure, in increasing intervals of 3, 6, 12, 24 and finally 30 seconds: + +.. includecode:: ../scala/code/docs/pattern/BackoffSupervisorDocSpec.scala#backoff-stop The above is equivalent to this Java code: .. includecode:: ../java/code/docs/pattern/BackoffSupervisorDocTest.java#backoff-imports -.. includecode:: ../java/code/docs/pattern/BackoffSupervisorDocTest.java#backoff +.. includecode:: ../java/code/docs/pattern/BackoffSupervisorDocTest.java#backoff-stop Using a ``randomFactor`` to add a little bit of additional variance to the backoff intervals is highly recommended, in order to avoid multiple actors re-start at the exact same point in time, @@ -230,6 +231,32 @@ and re-starting after the same configured interval. By adding additional randomn re-start intervals the actors will start in slightly different points in time, thus avoiding large spikes of traffic hitting the recovering shared database or other resource that they all need to contact. +The ``akka.pattern.BackoffSupervisor`` actor can also be configured to restart the actor after a delay when the actor +crashes and the supervision strategy decides that it should restart. + +The following Scala snippet shows how to create a backoff supervisor which will start the given echo actor after it has crashed +because of some exception, in increasing intervals of 3, 6, 12, 24 and finally 30 seconds: + +.. includecode:: ../scala/code/docs/pattern/BackoffSupervisorDocSpec.scala#backoff-fail + +The above is equivalent to this Java code: + +.. includecode:: ../java/code/docs/pattern/BackoffSupervisorDocTest.java#backoff-imports +.. includecode:: ../java/code/docs/pattern/BackoffSupervisorDocTest.java#backoff-fail + +The ``akka.pattern.BackoffOptions`` can be used to customize the behavior of the back-off supervisor actor, below are some examples: + +.. includecode:: ../scala/code/docs/pattern/BackoffSupervisorDocSpec.scala#backoff-custom-stop + +The above code sets up a back-off supervisor that requires the child actor to send a ``akka.pattern.BackoffSupervisor.Reset`` message +to its parent when a message is successfully processed, resetting the back-off. It also uses a default stopping strategy, any exception +will cause the child to stop. + +.. includecode:: ../scala/code/docs/pattern/BackoffSupervisorDocSpec.scala#backoff-custom-fail + +The above code sets up a back-off supervisor that restarts the child after back-off if MyException is thrown, any other exception will be +escalated. The back-off is automatically reset if the child does not throw any errors within 10 seconds. + One-For-One Strategy vs. All-For-One Strategy --------------------------------------------- diff --git a/akka-docs/rst/java/code/docs/pattern/BackoffSupervisorDocTest.java b/akka-docs/rst/java/code/docs/pattern/BackoffSupervisorDocTest.java index a5c103f746..7d09ec18b4 100644 --- a/akka-docs/rst/java/code/docs/pattern/BackoffSupervisorDocTest.java +++ b/akka-docs/rst/java/code/docs/pattern/BackoffSupervisorDocTest.java @@ -4,6 +4,7 @@ package docs.pattern; import akka.actor.*; +import akka.pattern.Backoff; import akka.pattern.BackoffSupervisor; import akka.testkit.TestActors.EchoActor; //#backoff-imports @@ -14,19 +15,35 @@ import java.util.concurrent.TimeUnit; public class BackoffSupervisorDocTest { - void example (ActorSystem system) { - //#backoff + void exampleStop (ActorSystem system) { + //#backoff-stop final Props childProps = Props.create(EchoActor.class); final Props supervisorProps = BackoffSupervisor.props( - childProps, - "myEcho", - Duration.create(3, TimeUnit.SECONDS), - Duration.create(30, TimeUnit.SECONDS), - 0.2); // adds 20% "noise" to vary the intervals slightly + Backoff.onStop( + childProps, + "myEcho", + Duration.create(3, TimeUnit.SECONDS), + Duration.create(30, TimeUnit.SECONDS), + 0.2)); // adds 20% "noise" to vary the intervals slightly system.actorOf(supervisorProps, "echoSupervisor"); - //#backoff + //#backoff-stop } + void exampleFailure (ActorSystem system) { + //#backoff-fail + final Props childProps = Props.create(EchoActor.class); + + final Props supervisorProps = BackoffSupervisor.props( + Backoff.onFailure( + childProps, + "myEcho", + Duration.create(3, TimeUnit.SECONDS), + Duration.create(30, TimeUnit.SECONDS), + 0.2)); // adds 20% "noise" to vary the intervals slightly + + system.actorOf(supervisorProps, "echoSupervisor"); + //#backoff-fail + } } diff --git a/akka-docs/rst/scala/code/docs/pattern/BackoffSupervisorDocSpec.scala b/akka-docs/rst/scala/code/docs/pattern/BackoffSupervisorDocSpec.scala index ec70d37ada..52c29be33f 100644 --- a/akka-docs/rst/scala/code/docs/pattern/BackoffSupervisorDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/pattern/BackoffSupervisorDocSpec.scala @@ -4,28 +4,99 @@ package docs.pattern -import akka.actor.{ ActorSystem, Props } -import akka.pattern.BackoffSupervisor +import akka.actor.{ ActorSystem, Props, OneForOneStrategy, SupervisorStrategy } +import akka.pattern.{ Backoff, BackoffSupervisor } import akka.testkit.TestActors.EchoActor class BackoffSupervisorDocSpec { - class BackoffSupervisorDocSpecExample { + class BackoffSupervisorDocSpecExampleStop { val system: ActorSystem = ??? import scala.concurrent.duration._ - //#backoff + //#backoff-stop val childProps = Props(classOf[EchoActor]) val supervisor = BackoffSupervisor.props( - childProps, - childName = "myEcho", - minBackoff = 3.seconds, - maxBackoff = 30.seconds, - randomFactor = 0.2) // adds 20% "noise" to vary the intervals slightly + Backoff.onStop( + childProps, + childName = "myEcho", + minBackoff = 3.seconds, + maxBackoff = 30.seconds, + randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly + )) system.actorOf(supervisor, name = "echoSupervisor") - //#backoff + //#backoff-stop } + class BackoffSupervisorDocSpecExampleFail { + val system: ActorSystem = ??? + import scala.concurrent.duration._ + + //#backoff-fail + val childProps = Props(classOf[EchoActor]) + + val supervisor = BackoffSupervisor.props( + Backoff.onFailure( + childProps, + childName = "myEcho", + minBackoff = 3.seconds, + maxBackoff = 30.seconds, + randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly + )) + + system.actorOf(supervisor, name = "echoSupervisor") + //#backoff-fail + } + + class BackoffSupervisorDocSpecExampleStopOptions { + val system: ActorSystem = ??? + import scala.concurrent.duration._ + + val childProps = Props(classOf[EchoActor]) + + //#backoff-custom-stop + val supervisor = BackoffSupervisor.props( + Backoff.onStop( + childProps, + childName = "myEcho", + minBackoff = 3.seconds, + maxBackoff = 30.seconds, + randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly + ).withManualReset // the child must send BackoffSupervisor.Reset to its parent + .withDefaultStoppingStrategy // Stop at any Exception thrown + ) + //#backoff-custom-stop + + system.actorOf(supervisor, name = "echoSupervisor") + } + + class BackoffSupervisorDocSpecExampleFailureOptions { + val system: ActorSystem = ??? + import scala.concurrent.duration._ + + val childProps = Props(classOf[EchoActor]) + + //#backoff-custom-fail + val supervisor = BackoffSupervisor.props( + Backoff.onFailure( + childProps, + childName = "myEcho", + minBackoff = 3.seconds, + maxBackoff = 30.seconds, + randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly + ).withAutoReset(10.seconds) // the child must send BackoffSupervisor.Reset to its parent + .withSupervisorStrategy( + OneForOneStrategy() { + case _: MyException ⇒ SupervisorStrategy.Restart + case _ ⇒ SupervisorStrategy.Escalate + })) + //#backoff-custom-fail + + system.actorOf(supervisor, name = "echoSupervisor") + } + + case class MyException(msg: String) extends Exception(msg) + } diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 1c60ac7916..186d3a7949 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -5,7 +5,7 @@ package docs.persistence import akka.actor._ -import akka.pattern.BackoffSupervisor +import akka.pattern.{ Backoff, BackoffSupervisor } import akka.persistence._ import akka.stream.ActorMaterializer import akka.stream.scaladsl.{ Source, Sink, Flow } @@ -85,17 +85,18 @@ object PersistenceDocSpec { } } - object Backoff { + object BackoffOnStop { abstract class MyActor extends Actor { import PersistAsync.MyPersistentActor //#backoff val childProps = Props[MyPersistentActor] val props = BackoffSupervisor.props( - childProps, - childName = "myActor", - minBackoff = 3.seconds, - maxBackoff = 30.seconds, - randomFactor = 0.2) + Backoff.onStop( + childProps, + childName = "myActor", + minBackoff = 3.seconds, + maxBackoff = 30.seconds, + randomFactor = 0.2)) context.actorOf(props, name = "mySupervisor") //#backoff } diff --git a/project/MiMa.scala b/project/MiMa.scala index 33cc2f50f2..aae3626cef 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -597,7 +597,13 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[MissingTypesProblem]("akka.remote.InvalidAssociation$"), ProblemFilters.exclude[MissingMethodProblem]("akka.remote.InvalidAssociation.apply"), ProblemFilters.exclude[MissingMethodProblem]("akka.remote.InvalidAssociation.copy"), - ProblemFilters.exclude[MissingMethodProblem]("akka.remote.InvalidAssociation.this") + ProblemFilters.exclude[MissingMethodProblem]("akka.remote.InvalidAssociation.this"), + + // #19281 BackoffSupervisor updates + ProblemFilters.exclude[MissingMethodProblem]("akka.pattern.BackoffSupervisor.akka$pattern$BackoffSupervisor$$child_="), + ProblemFilters.exclude[MissingMethodProblem]("akka.pattern.BackoffSupervisor.akka$pattern$BackoffSupervisor$$restartCount"), + ProblemFilters.exclude[MissingMethodProblem]("akka.pattern.BackoffSupervisor.akka$pattern$BackoffSupervisor$$restartCount_="), + ProblemFilters.exclude[MissingMethodProblem]("akka.pattern.BackoffSupervisor.akka$pattern$BackoffSupervisor$$child") ) ) }