From 176b718b2a1d249767164af2269ce099f5ac1a07 Mon Sep 17 00:00:00 2001 From: Saleh Khazaei Date: Fri, 14 Sep 2018 16:52:52 +0430 Subject: [PATCH] Adding maximum restart attempts to BackoffSupervisor #24769 --- .../BackoffOnRestartSupervisorSpec.scala | 6 +- .../akka/pattern/BackoffSupervisorSpec.scala | 102 ++++++- .../mima-filters/2.5.14.backwards.excludes | 2 +- .../mima-filters/2.5.16.backwards.excludes | 3 + .../main/scala/akka/actor/FaultHandling.scala | 2 + .../scala/akka/pattern/BackoffOptions.scala | 282 +++++++++++++++++- .../akka/pattern/BackoffSupervisor.scala | 96 +++++- .../cluster/sharding/ClusterSharding.scala | 3 +- .../sharding/ClusterShardingSpec.scala | 3 +- .../pattern/BackoffSupervisorDocSpec.scala | 12 +- .../docs/persistence/PersistenceDocSpec.scala | 3 +- 11 files changed, 475 insertions(+), 39 deletions(-) create mode 100644 akka-actor/src/main/mima-filters/2.5.16.backwards.excludes diff --git a/akka-actor-tests/src/test/scala/akka/pattern/BackoffOnRestartSupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/BackoffOnRestartSupervisorSpec.scala index df698b1217..df9db17283 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/BackoffOnRestartSupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/BackoffOnRestartSupervisorSpec.scala @@ -52,7 +52,7 @@ class TestParentActor(probe: ActorRef, supervisorProps: Props) extends Actor { class BackoffOnRestartSupervisorSpec extends AkkaSpec with ImplicitSender { def supervisorProps(probeRef: ActorRef) = { - val options = Backoff.onFailure(TestActor.props(probeRef), "someChildName", 200 millis, 10 seconds, 0.0) + val options = Backoff.onFailure(TestActor.props(probeRef), "someChildName", 200 millis, 10 seconds, 0.0, maxNrOfRetries = -1) .withSupervisorStrategy(OneForOneStrategy(maxNrOfRetries = 4, withinTimeRange = 30 seconds) { case _: TestActor.StoppingException ⇒ SupervisorStrategy.Stop }) @@ -139,7 +139,7 @@ class BackoffOnRestartSupervisorSpec extends AkkaSpec with ImplicitSender { "accept commands while child is terminating" in { val postStopLatch = new CountDownLatch(1) - val options = Backoff.onFailure(Props(new SlowlyFailingActor(postStopLatch)), "someChildName", 1 nanos, 1 nanos, 0.0) + val options = Backoff.onFailure(Props(new SlowlyFailingActor(postStopLatch)), "someChildName", 1 nanos, 1 nanos, 0.0, maxNrOfRetries = -1) .withSupervisorStrategy(OneForOneStrategy(loggingEnabled = false) { case _: TestActor.StoppingException ⇒ SupervisorStrategy.Stop }) @@ -197,7 +197,7 @@ class BackoffOnRestartSupervisorSpec extends AkkaSpec with ImplicitSender { // withinTimeRange indicates the time range in which maxNrOfRetries will cause the child to // stop. IE: If we restart more than maxNrOfRetries in a time range longer than withinTimeRange // that is acceptable. - val options = Backoff.onFailure(TestActor.props(probe.ref), "someChildName", 300 millis, 10 seconds, 0.0) + val options = Backoff.onFailure(TestActor.props(probe.ref), "someChildName", 300 millis, 10 seconds, 0.0, maxNrOfRetries = -1) .withSupervisorStrategy(OneForOneStrategy(withinTimeRange = 1 seconds, maxNrOfRetries = 3) { case _: TestActor.StoppingException ⇒ SupervisorStrategy.Stop }) diff --git a/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala index 05f9570494..f194febf4f 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala @@ -6,6 +6,7 @@ package akka.pattern import akka.actor._ import akka.testkit._ +import org.scalatest.concurrent.Eventually import org.scalatest.prop.TableDrivenPropertyChecks._ import scala.concurrent.duration._ @@ -42,11 +43,11 @@ object BackoffSupervisorSpec { } } -class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender { +class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually { import BackoffSupervisorSpec._ - def onStopOptions(props: Props = Child.props(testActor)) = Backoff.onStop(props, "c1", 100.millis, 3.seconds, 0.2) - def onFailureOptions(props: Props = Child.props(testActor)) = Backoff.onFailure(props, "c1", 100.millis, 3.seconds, 0.2) + def onStopOptions(props: Props = Child.props(testActor), maxNrOfRetries: Int = -1) = Backoff.onStop(props, "c1", 100.millis, 3.seconds, 0.2, maxNrOfRetries) + def onFailureOptions(props: Props = Child.props(testActor), maxNrOfRetries: Int = -1) = Backoff.onFailure(props, "c1", 100.millis, 3.seconds, 0.2, maxNrOfRetries) def create(options: BackoffOptions) = system.actorOf(BackoffSupervisor.props(options)) "BackoffSupervisor" must { @@ -178,7 +179,7 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender { "reply to sender if replyWhileStopped is specified" in { filterException[TestException] { - val supervisor = create(Backoff.onFailure(Child.props(testActor), "c1", 100.seconds, 300.seconds, 0.2).withReplyWhileStopped("child was stopped")) + val supervisor = create(Backoff.onFailure(Child.props(testActor), "c1", 100.seconds, 300.seconds, 0.2, maxNrOfRetries = -1).withReplyWhileStopped("child was stopped")) supervisor ! BackoffSupervisor.GetCurrentChild val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get watch(c1) @@ -200,7 +201,7 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender { "not reply to sender if replyWhileStopped is NOT specified" in { filterException[TestException] { - val supervisor = create(Backoff.onFailure(Child.props(testActor), "c1", 100.seconds, 300.seconds, 0.2)) + val supervisor = create(Backoff.onFailure(Child.props(testActor), "c1", 100.seconds, 300.seconds, 0.2, maxNrOfRetries = -1)) supervisor ! BackoffSupervisor.GetCurrentChild val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get watch(c1) @@ -242,5 +243,96 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender { assert(calculatedValue === expectedResult) } } + + "stop restarting the child after reaching maxNrOfRetries limit (Backoff.onStop)" in { + val supervisor = create(onStopOptions(maxNrOfRetries = 2)) + def waitForChild: Option[ActorRef] = { + eventually(timeout(1.second), interval(50.millis)) { + supervisor ! BackoffSupervisor.GetCurrentChild + val c = expectMsgType[BackoffSupervisor.CurrentChild].ref + c.isDefined shouldBe true + } + + supervisor ! BackoffSupervisor.GetCurrentChild + expectMsgType[BackoffSupervisor.CurrentChild].ref + } + + watch(supervisor) + + supervisor ! BackoffSupervisor.GetRestartCount + expectMsg(BackoffSupervisor.RestartCount(0)) + + supervisor ! BackoffSupervisor.GetCurrentChild + val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get + watch(c1) + c1 ! PoisonPill + expectTerminated(c1) + + supervisor ! BackoffSupervisor.GetRestartCount + expectMsg(BackoffSupervisor.RestartCount(1)) + + val c2 = waitForChild.get + awaitAssert(c2 should !==(c1)) + watch(c2) + c2 ! PoisonPill + expectTerminated(c2) + + supervisor ! BackoffSupervisor.GetRestartCount + expectMsg(BackoffSupervisor.RestartCount(2)) + + val c3 = waitForChild.get + awaitAssert(c3 should !==(c2)) + watch(c3) + c3 ! PoisonPill + expectTerminated(c3) + expectTerminated(supervisor) + } + + "stop restarting the child after reaching maxNrOfRetries limit (Backoff.onFailure)" in { + filterException[TestException] { + val supervisor = create(onFailureOptions(maxNrOfRetries = 2)) + + def waitForChild: Option[ActorRef] = { + eventually(timeout(1.second), interval(50.millis)) { + supervisor ! BackoffSupervisor.GetCurrentChild + val c = expectMsgType[BackoffSupervisor.CurrentChild].ref + c.isDefined shouldBe true + } + + supervisor ! BackoffSupervisor.GetCurrentChild + expectMsgType[BackoffSupervisor.CurrentChild].ref + } + + watch(supervisor) + + supervisor ! BackoffSupervisor.GetRestartCount + expectMsg(BackoffSupervisor.RestartCount(0)) + + supervisor ! BackoffSupervisor.GetCurrentChild + val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get + watch(c1) + c1 ! "boom" + expectTerminated(c1) + + supervisor ! BackoffSupervisor.GetRestartCount + expectMsg(BackoffSupervisor.RestartCount(1)) + + val c2 = waitForChild.get + awaitAssert(c2 should !==(c1)) + watch(c2) + c2 ! "boom" + expectTerminated(c2) + + supervisor ! BackoffSupervisor.GetRestartCount + expectMsg(BackoffSupervisor.RestartCount(2)) + + val c3 = waitForChild.get + awaitAssert(c3 should !==(c2)) + watch(c3) + c3 ! "boom" + expectTerminated(c3) + expectTerminated(supervisor) + } + } } } diff --git a/akka-actor/src/main/mima-filters/2.5.14.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.14.backwards.excludes index ed067fa17b..942188caf3 100644 --- a/akka-actor/src/main/mima-filters/2.5.14.backwards.excludes +++ b/akka-actor/src/main/mima-filters/2.5.14.backwards.excludes @@ -14,4 +14,4 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.dns.internal.DnsClie ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.dns.internal.DnsClient#Answer.this") ProblemFilters.exclude[MissingTypesProblem]("akka.io.dns.internal.DnsClient$Answer$") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.dns.internal.DnsClient#Answer.apply") -ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.io.dns.internal.AsyncDnsCache.put") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.io.dns.internal.AsyncDnsCache.put") \ No newline at end of file diff --git a/akka-actor/src/main/mima-filters/2.5.16.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.16.backwards.excludes new file mode 100644 index 0000000000..5d9ad3dc39 --- /dev/null +++ b/akka-actor/src/main/mima-filters/2.5.16.backwards.excludes @@ -0,0 +1,3 @@ +# #25435 - Adding maximum restart attempts to BackoffSupervisor +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.BackoffOptions.withMaxNrOfRetries") + diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 8beeeba31f..5d864067aa 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -547,6 +547,8 @@ case class OneForOneStrategy( def this(decider: SupervisorStrategy.Decider) = this()(decider) + def withMaxNrOfRetries(maxNrOfRetries: Int): OneForOneStrategy = copy(maxNrOfRetries = maxNrOfRetries)(decider) + /* * this is a performance optimization to avoid re-allocating the pairs upon * every call to requestRestartPermission, assuming that strategies are shared diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala b/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala index 9acf8a49f8..648c788a8c 100644 --- a/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala +++ b/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala @@ -71,14 +71,19 @@ object Backoff { * @param randomFactor after calculation of the exponential back-off an additional * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. * In order to skip this additional delay pass in `0`. + * @param maxNrOfRetries maximum number of attempts to restart the child actor. + * The supervisor will terminate itself after the maxNoOfRetries is reached. + * In order to restart infinitely pass in `-1`. + * */ def onFailure( - childProps: Props, - childName: String, - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double): BackoffOptions = - BackoffOptionsImpl(RestartImpliesFailure, childProps, childName, minBackoff, maxBackoff, randomFactor) + childProps: Props, + childName: String, + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxNrOfRetries: Int): BackoffOptions = + BackoffOptionsImpl(RestartImpliesFailure, childProps, childName, minBackoff, maxBackoff, randomFactor).withMaxNrOfRetries(maxNrOfRetries) /** * Back-off options for creating a back-off supervisor actor that expects a child actor to restart on failure. @@ -126,13 +131,192 @@ object Backoff { * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. * In order to skip this additional delay pass in `0`. */ + @deprecated("Use the overloaded one which accepts maxNrOfRetries instead.", "2.5.17") + def onFailure( + childProps: Props, + childName: String, + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double): BackoffOptions = + BackoffOptionsImpl(RestartImpliesFailure, childProps, childName, minBackoff, maxBackoff, randomFactor) + + /** + * Java API: Back-off options for creating a back-off supervisor actor that expects a child actor to restart on failure. + * + * This explicit supervisor behaves similarly to the normal implicit supervision where + * if an actor throws an exception, the decider on the supervisor will decide when to + * `Stop`, `Restart`, `Escalate`, `Resume` the child actor. + * + * When the `Restart` directive is specified, the supervisor will delay the restart + * using an exponential back off strategy (bounded by minBackoff and maxBackoff). + * + * This supervisor is intended to be transparent to both the child actor and external actors. + * Where external actors can send messages to the supervisor as if it was the child and the + * messages will be forwarded. And when the child is `Terminated`, the supervisor is also + * `Terminated`. + * Transparent to the child means that the child does not have to be aware that it is being + * supervised specifically by this actor. Just like it does + * not need to know when it is being supervised by the usual implicit supervisors. + * The only caveat is that the `ActorRef` of the child is not stable, so any user storing the + * `sender()` `ActorRef` from the child response may eventually not be able to communicate with + * the stored `ActorRef`. In general all messages to the child should be directed through this actor. + * + * An example of where this supervisor might be used is when you may have an actor that is + * responsible for continuously polling on a server for some resource that sometimes may be down. + * Instead of hammering the server continuously when the resource is unavailable, the actor will + * be restarted with an exponentially increasing back off until the resource is available again. + * + * '''*** + * This supervisor should not be used with `Akka Persistence` child actors. + * `Akka Persistence` actors shutdown unconditionally on `persistFailure()`s rather + * than throw an exception on a failure like normal actors. + * [[#onStop]] should be used instead for cases where the child actor + * terminates itself as a failure signal instead of the normal behavior of throwing an exception. + * ***''' + * You can define another + * supervision strategy by using `akka.pattern.BackoffOptions.withSupervisorStrategy` on [[akka.pattern.BackoffOptions]]. + * + * @param childProps the [[akka.actor.Props]] of the child actor that + * will be started and supervised + * @param childName name of the child actor + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxNrOfRetries maximum number of attempts to restart the child actor. + * The supervisor will terminate itself after the maxNoOfRetries is reached. + * In order to restart infinitely pass in `-1`. + */ + def onFailure( + childProps: Props, + childName: String, + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double, + maxNrOfRetries: Int): BackoffOptions = + onFailure(childProps, childName, minBackoff.asScala, maxBackoff.asScala, randomFactor, maxNrOfRetries) + + /** + * Java API: Back-off options for creating a back-off supervisor actor that expects a child actor to restart on failure. + * + * This explicit supervisor behaves similarly to the normal implicit supervision where + * if an actor throws an exception, the decider on the supervisor will decide when to + * `Stop`, `Restart`, `Escalate`, `Resume` the child actor. + * + * When the `Restart` directive is specified, the supervisor will delay the restart + * using an exponential back off strategy (bounded by minBackoff and maxBackoff). + * + * This supervisor is intended to be transparent to both the child actor and external actors. + * Where external actors can send messages to the supervisor as if it was the child and the + * messages will be forwarded. And when the child is `Terminated`, the supervisor is also + * `Terminated`. + * Transparent to the child means that the child does not have to be aware that it is being + * supervised specifically by this actor. Just like it does + * not need to know when it is being supervised by the usual implicit supervisors. + * The only caveat is that the `ActorRef` of the child is not stable, so any user storing the + * `sender()` `ActorRef` from the child response may eventually not be able to communicate with + * the stored `ActorRef`. In general all messages to the child should be directed through this actor. + * + * An example of where this supervisor might be used is when you may have an actor that is + * responsible for continuously polling on a server for some resource that sometimes may be down. + * Instead of hammering the server continuously when the resource is unavailable, the actor will + * be restarted with an exponentially increasing back off until the resource is available again. + * + * '''*** + * This supervisor should not be used with `Akka Persistence` child actors. + * `Akka Persistence` actors shutdown unconditionally on `persistFailure()`s rather + * than throw an exception on a failure like normal actors. + * [[#onStop]] should be used instead for cases where the child actor + * terminates itself as a failure signal instead of the normal behavior of throwing an exception. + * ***''' + * You can define another + * supervision strategy by using `akka.pattern.BackoffOptions.withSupervisorStrategy` on [[akka.pattern.BackoffOptions]]. + * + * @param childProps the [[akka.actor.Props]] of the child actor that + * will be started and supervised + * @param childName name of the child actor + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + */ + @Deprecated def onFailure( childProps: Props, childName: String, minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double): BackoffOptions = - onFailure(childProps, childName, minBackoff.asScala, maxBackoff.asScala, randomFactor) + onFailure(childProps, childName, minBackoff.asScala, maxBackoff.asScala, randomFactor, -1) + + /** + * Back-off options for creating a back-off supervisor actor that expects a child actor to stop on failure. + * + * This actor can be used to supervise a child actor and start it again + * after a back-off duration if the child actor is stopped. + * + * This is useful in situations where the re-start of the child actor should be + * delayed e.g. in order to give an external resource time to recover before the + * child actor tries contacting it again (after being restarted). + * + * Specifically this pattern is useful for persistent actors, + * which are stopped in case of persistence failures. + * Just restarting them immediately would probably fail again (since the data + * store is probably unavailable). It is better to try again after a delay. + * + * It supports exponential back-off between the given `minBackoff` and + * `maxBackoff` durations. For example, if `minBackoff` is 3 seconds and + * `maxBackoff` 30 seconds the start attempts will be delayed with + * 3, 6, 12, 24, 30, 30 seconds. The exponential back-off counter is reset + * if the actor is not terminated within the `minBackoff` duration. + * + * In addition to the calculated exponential back-off an additional + * random delay based the given `randomFactor` is added, e.g. 0.2 adds up to 20% + * delay. The reason for adding a random delay is to avoid that all failing + * actors hit the backend resource at the same time. + * + * You can retrieve the current child `ActorRef` by sending `BackoffSupervisor.GetCurrentChild` + * message to this actor and it will reply with [[akka.pattern.BackoffSupervisor.CurrentChild]] + * containing the `ActorRef` of the current child, if any. + * + * The `BackoffSupervisor`delegates all messages from the child to the parent of the + * `BackoffSupervisor`, with the supervisor as sender. + * + * The `BackoffSupervisor` forwards all other messages to the child, if it is currently running. + * + * The child can stop itself and send a [[akka.actor.PoisonPill]] to the parent supervisor + * if it wants to do an intentional stop. + * + * Exceptions in the child are handled with the default supervisionStrategy, which can be changed by using + * [[BackoffOptions#withSupervisorStrategy]] or [[BackoffOptions#withDefaultStoppingStrategy]]. A + * `Restart` will perform a normal immediate restart of the child. A `Stop` will + * stop the child, but it will be started again after the back-off duration. + * + * @param childProps the [[akka.actor.Props]] of the child actor that + * will be started and supervised + * @param childName name of the child actor + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxNrOfRetries maximum number of attempts to restart the child actor. + * The supervisor will terminate itself after the maxNoOfRetries is reached. + * In order to restart infinitely pass in `-1`. + */ + def onStop( + childProps: Props, + childName: String, + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxNrOfRetries: Int): BackoffOptions = + BackoffOptionsImpl(StopImpliesFailure, childProps, childName, minBackoff, maxBackoff, randomFactor).withMaxNrOfRetries(maxNrOfRetries) /** * Back-off options for creating a back-off supervisor actor that expects a child actor to stop on failure. @@ -187,6 +371,7 @@ object Backoff { * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. * In order to skip this additional delay pass in `0`. */ + @deprecated("Use the overloaded one which accepts maxNrOfRetries instead.", "2.5.17") def onStop( childProps: Props, childName: String, @@ -196,7 +381,72 @@ object Backoff { BackoffOptionsImpl(StopImpliesFailure, childProps, childName, minBackoff, maxBackoff, randomFactor) /** - * Back-off options for creating a back-off supervisor actor that expects a child actor to stop on failure. + * Java API: Back-off options for creating a back-off supervisor actor that expects a child actor to stop on failure. + * + * This actor can be used to supervise a child actor and start it again + * after a back-off duration if the child actor is stopped. + * + * This is useful in situations where the re-start of the child actor should be + * delayed e.g. in order to give an external resource time to recover before the + * child actor tries contacting it again (after being restarted). + * + * Specifically this pattern is useful for persistent actors, + * which are stopped in case of persistence failures. + * Just restarting them immediately would probably fail again (since the data + * store is probably unavailable). It is better to try again after a delay. + * + * It supports exponential back-off between the given `minBackoff` and + * `maxBackoff` durations. For example, if `minBackoff` is 3 seconds and + * `maxBackoff` 30 seconds the start attempts will be delayed with + * 3, 6, 12, 24, 30, 30 seconds. The exponential back-off counter is reset + * if the actor is not terminated within the `minBackoff` duration. + * + * In addition to the calculated exponential back-off an additional + * random delay based the given `randomFactor` is added, e.g. 0.2 adds up to 20% + * delay. The reason for adding a random delay is to avoid that all failing + * actors hit the backend resource at the same time. + * + * You can retrieve the current child `ActorRef` by sending `BackoffSupervisor.GetCurrentChild` + * message to this actor and it will reply with [[akka.pattern.BackoffSupervisor.CurrentChild]] + * containing the `ActorRef` of the current child, if any. + * + * The `BackoffSupervisor`delegates all messages from the child to the parent of the + * `BackoffSupervisor`, with the supervisor as sender. + * + * The `BackoffSupervisor` forwards all other messages to the child, if it is currently running. + * + * The child can stop itself and send a [[akka.actor.PoisonPill]] to the parent supervisor + * if it wants to do an intentional stop. + * + * Exceptions in the child are handled with the default supervisionStrategy, which can be changed by using + * [[BackoffOptions#withSupervisorStrategy]] or [[BackoffOptions#withDefaultStoppingStrategy]]. A + * `Restart` will perform a normal immediate restart of the child. A `Stop` will + * stop the child, but it will be started again after the back-off duration. + * + * @param childProps the [[akka.actor.Props]] of the child actor that + * will be started and supervised + * @param childName name of the child actor + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxNrOfRetries maximum number of attempts to restart the child actor. + * The supervisor will terminate itself after the maxNoOfRetries is reached. + * In order to restart infinitely pass in `-1`. + */ + def onStop( + childProps: Props, + childName: String, + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double, + maxNrOfRetries: Int): BackoffOptions = + onStop(childProps, childName, minBackoff.asScala, maxBackoff.asScala, randomFactor, maxNrOfRetries) + + /** + * Java API: Back-off options for creating a back-off supervisor actor that expects a child actor to stop on failure. * * This actor can be used to supervise a child actor and start it again * after a back-off duration if the child actor is stopped. @@ -248,13 +498,14 @@ object Backoff { * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. * In order to skip this additional delay pass in `0`. */ + @Deprecated def onStop( childProps: Props, childName: String, minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double): BackoffOptions = - onStop(childProps, childName, minBackoff.asScala, maxBackoff.asScala, randomFactor) + onStop(childProps, childName, minBackoff.asScala, maxBackoff.asScala, randomFactor, -1) } @@ -287,6 +538,7 @@ trait BackoffOptions { * The default supervisor strategy is used as fallback if the specified supervisorStrategy (its decider) * does not explicitly handle an exception. As the BackoffSupervisor creates a separate actor to handle the * backoff process, only a [[OneForOneStrategy]] makes sense here. + * Note that changing the strategy will replace the previously defined maxNrOfRetries. */ def withSupervisorStrategy(supervisorStrategy: OneForOneStrategy): BackoffOptions @@ -305,6 +557,15 @@ trait BackoffOptions { */ def withReplyWhileStopped(replyWhileStopped: Any): BackoffOptions + /** + * Returns a new BackoffOptions with a maximum number of retries to restart the child actor. + * By default, the supervisor will retry infinitely. + * With this option, the supervisor will terminate itself after the maxNoOfRetries is reached. + * @param maxNrOfRetries the number of times a child actor is allowed to be restarted, negative value means no limit, + * if the limit is exceeded the child actor is stopped + */ + def withMaxNrOfRetries(maxNrOfRetries: Int): BackoffOptions + /** * Returns the props to create the back-off supervisor. */ @@ -327,8 +588,9 @@ private final case class BackoffOptionsImpl( def withAutoReset(resetBackoff: FiniteDuration) = copy(reset = Some(AutoReset(resetBackoff))) def withManualReset = copy(reset = Some(ManualReset)) def withSupervisorStrategy(supervisorStrategy: OneForOneStrategy) = copy(supervisorStrategy = supervisorStrategy) - def withDefaultStoppingStrategy = copy(supervisorStrategy = OneForOneStrategy()(SupervisorStrategy.stoppingStrategy.decider)) + def withDefaultStoppingStrategy = copy(supervisorStrategy = OneForOneStrategy(supervisorStrategy.maxNrOfRetries)(SupervisorStrategy.stoppingStrategy.decider)) def withReplyWhileStopped(replyWhileStopped: Any) = copy(replyWhileStopped = Some(replyWhileStopped)) + def withMaxNrOfRetries(maxNrOfRetries: Int) = copy(supervisorStrategy = supervisorStrategy.withMaxNrOfRetries(maxNrOfRetries)) def props = { require(minBackoff > Duration.Zero, "minBackoff must be > 0") diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala b/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala index 5145b43bfb..4ac0db49d4 100644 --- a/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala +++ b/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala @@ -7,15 +7,8 @@ package akka.pattern import java.util.concurrent.ThreadLocalRandom import java.util.Optional -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.DeadLetterSuppression -import akka.actor.Props -import akka.actor.Terminated -import akka.actor.SupervisorStrategy.Directive -import akka.actor.SupervisorStrategy.Escalate -import akka.actor.OneForOneStrategy -import akka.actor.SupervisorStrategy +import akka.actor.{ Actor, ActorLogging, ActorRef, DeadLetterSuppression, OneForOneStrategy, Props, SupervisorStrategy, Terminated } +import akka.actor.SupervisorStrategy.{ Directive, Escalate, Restart, Stop } import akka.util.JavaDurationConverters._ import scala.concurrent.duration.{ Duration, FiniteDuration } @@ -49,6 +42,40 @@ object BackoffSupervisor { propsWithSupervisorStrategy(childProps, childName, minBackoff, maxBackoff, randomFactor, SupervisorStrategy.defaultStrategy) } + /** + * Props for creating a [[BackoffSupervisor]] actor. + * + * Exceptions in the child are handled with the default supervision strategy, i.e. + * most exceptions will immediately restart the child. You can define another + * supervision strategy by using [[#propsWithSupervisorStrategy]]. + * + * @param childProps the [[akka.actor.Props]] of the child actor that + * will be started and supervised + * @param childName name of the child actor + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxNrOfRetries maximum number of attempts to restart the child actor. + * The supervisor will terminate itself after the maxNoOfRetries is reached. + * In order to restart infinitely pass in `-1`. + */ + def props( + childProps: Props, + childName: String, + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxNrOfRetries: Int): Props = { + val supervisionStrategy = SupervisorStrategy.defaultStrategy match { + case oneForOne: OneForOneStrategy ⇒ oneForOne.withMaxNrOfRetries(maxNrOfRetries) + case s ⇒ s + } + propsWithSupervisorStrategy(childProps, childName, minBackoff, maxBackoff, randomFactor, supervisionStrategy) + } + /** * Props for creating a [[BackoffSupervisor]] actor. * @@ -75,6 +102,36 @@ object BackoffSupervisor { props(childProps, childName, minBackoff.asScala, maxBackoff.asScala, randomFactor) } + /** + * Props for creating a [[BackoffSupervisor]] actor. + * + * Exceptions in the child are handled with the default supervision strategy, i.e. + * most exceptions will immediately restart the child. You can define another + * supervision strategy by using [[#propsWithSupervisorStrategy]]. + * + * @param childProps the [[akka.actor.Props]] of the child actor that + * will be started and supervised + * @param childName name of the child actor + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxNrOfRetries maximum number of attempts to restart the child actor. + * The supervisor will terminate itself after the maxNoOfRetries is reached. + * In order to restart infinitely pass in `-1`. + */ + def props( + childProps: Props, + childName: String, + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double, + maxNrOfRetries: Int): Props = { + props(childProps, childName, minBackoff.asScala, maxBackoff.asScala, randomFactor, maxNrOfRetries) + } + /** * Props for creating a [[BackoffSupervisor]] actor with a custom * supervision strategy. @@ -235,7 +292,8 @@ final class BackoffSupervisor( randomFactor: Double, strategy: SupervisorStrategy, val replyWhileStopped: Option[Any]) - extends Actor with HandleBackoff { + extends Actor with HandleBackoff + with ActorLogging { import BackoffSupervisor._ import context.dispatcher @@ -275,9 +333,21 @@ final class BackoffSupervisor( def onTerminated: Receive = { case Terminated(ref) if child.contains(ref) ⇒ child = None - val restartDelay = calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor) - context.system.scheduler.scheduleOnce(restartDelay, self, StartChild) - restartCount += 1 + val maxNrOfRetries = strategy match { + case oneForOne: OneForOneStrategy ⇒ oneForOne.maxNrOfRetries + case _ ⇒ -1 + } + + val nextRestartCount = restartCount + 1 + + if (maxNrOfRetries == -1 || nextRestartCount <= maxNrOfRetries) { + val restartDelay = calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor) + context.system.scheduler.scheduleOnce(restartDelay, self, StartChild) + restartCount = nextRestartCount + } else { + log.debug(s"Terminating on restart #{} which exceeds max allowed restarts ({})", nextRestartCount, maxNrOfRetries) + context.stop(self) + } } def receive = onTerminated orElse handleBackoff diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index f5685c89f8..2428debf20 100755 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -644,7 +644,8 @@ private[akka] class ClusterShardingGuardian extends Actor { childName = "coordinator", minBackoff = coordinatorFailureBackoff, maxBackoff = coordinatorFailureBackoff * 5, - randomFactor = 0.2) + randomFactor = 0.2, + maxNrOfRetries = -1) .withDeploy(Deploy.local) val singletonSettings = settings.coordinatorSingletonSettings .withSingletonName("singleton") diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index f64235d65b..9d21b11afd 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -309,7 +309,8 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu childName = "coordinator", minBackoff = 5.seconds, maxBackoff = 5.seconds, - randomFactor = 0.1).withDeploy(Deploy.local) + randomFactor = 0.1, + maxNrOfRetries = -1).withDeploy(Deploy.local) system.actorOf( ClusterSingletonManager.props( singletonProps, diff --git a/akka-docs/src/test/scala/docs/pattern/BackoffSupervisorDocSpec.scala b/akka-docs/src/test/scala/docs/pattern/BackoffSupervisorDocSpec.scala index 53daebdb07..7cfa78af2c 100644 --- a/akka-docs/src/test/scala/docs/pattern/BackoffSupervisorDocSpec.scala +++ b/akka-docs/src/test/scala/docs/pattern/BackoffSupervisorDocSpec.scala @@ -23,7 +23,8 @@ class BackoffSupervisorDocSpec { childName = "myEcho", minBackoff = 3.seconds, maxBackoff = 30.seconds, - randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly + randomFactor = 0.2, // adds 20% "noise" to vary the intervals slightly + maxNrOfRetries = -1 )) system.actorOf(supervisor, name = "echoSupervisor") @@ -43,7 +44,8 @@ class BackoffSupervisorDocSpec { childName = "myEcho", minBackoff = 3.seconds, maxBackoff = 30.seconds, - randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly + randomFactor = 0.2, // adds 20% "noise" to vary the intervals slightly + maxNrOfRetries = -1 )) system.actorOf(supervisor, name = "echoSupervisor") @@ -63,7 +65,8 @@ class BackoffSupervisorDocSpec { childName = "myEcho", minBackoff = 3.seconds, maxBackoff = 30.seconds, - randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly + randomFactor = 0.2, // adds 20% "noise" to vary the intervals slightly + maxNrOfRetries = -1 ).withManualReset // the child must send BackoffSupervisor.Reset to its parent .withDefaultStoppingStrategy // Stop at any Exception thrown ) @@ -85,7 +88,8 @@ class BackoffSupervisorDocSpec { childName = "myEcho", minBackoff = 3.seconds, maxBackoff = 30.seconds, - randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly + randomFactor = 0.2, // adds 20% "noise" to vary the intervals slightly + maxNrOfRetries = -1 ).withAutoReset(10.seconds) // reset if the child does not throw any errors within 10 seconds .withSupervisorStrategy( OneForOneStrategy() { diff --git a/akka-docs/src/test/scala/docs/persistence/PersistenceDocSpec.scala b/akka-docs/src/test/scala/docs/persistence/PersistenceDocSpec.scala index ac792b0ed6..31f4923f30 100644 --- a/akka-docs/src/test/scala/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/src/test/scala/docs/persistence/PersistenceDocSpec.scala @@ -103,7 +103,8 @@ object PersistenceDocSpec { childName = "myActor", minBackoff = 3.seconds, maxBackoff = 30.seconds, - randomFactor = 0.2)) + randomFactor = 0.2, + maxNrOfRetries = -1)) context.actorOf(props, name = "mySupervisor") //#backoff }