diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffOnRestartSupervisor.scala b/akka-actor/src/main/scala/akka/pattern/BackoffOnRestartSupervisor.scala index 1bf8d8a03d..87ede87c63 100644 --- a/akka-actor/src/main/scala/akka/pattern/BackoffOnRestartSupervisor.scala +++ b/akka-actor/src/main/scala/akka/pattern/BackoffOnRestartSupervisor.scala @@ -15,7 +15,7 @@ import scala.concurrent.duration._ * * Back-off supervisor that stops and starts a child actor when the child actor restarts. * This back-off supervisor is created by using ``akka.pattern.BackoffSupervisor.props`` - * with ``akka.pattern.Backoff.onFailure``. + * with ``akka.pattern.BackoffOpts.onFailure``. */ @InternalApi private class BackoffOnRestartSupervisor( val childProps: Props, diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffOnStopSupervisor.scala b/akka-actor/src/main/scala/akka/pattern/BackoffOnStopSupervisor.scala index abd4623228..666d2a8293 100644 --- a/akka-actor/src/main/scala/akka/pattern/BackoffOnStopSupervisor.scala +++ b/akka-actor/src/main/scala/akka/pattern/BackoffOnStopSupervisor.scala @@ -15,7 +15,7 @@ import scala.concurrent.duration.FiniteDuration * * Back-off supervisor that stops and starts a child actor using a back-off algorithm when the child actor stops. * This back-off supervisor is created by using `akka.pattern.BackoffSupervisor.props` - * with `Backoff.onStop`. + * with `BackoffOpts.onStop`. */ @InternalApi private[akka] class BackoffOnStopSupervisor( val childProps: Props, diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala b/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala index 28dcda8c3c..6dbd839e63 100644 --- a/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala +++ b/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala @@ -333,7 +333,7 @@ private final case class BackoffOnStopOptionsImpl[T]( finalStopMessage: Option[Any ⇒ Boolean] = None ) extends BackoffOnStopOptions { - val backoffReset = reset.getOrElse(AutoReset(minBackoff)) + private val backoffReset = reset.getOrElse(AutoReset(minBackoff)) def withAutoReset(resetBackoff: FiniteDuration) = copy(reset = Some(AutoReset(resetBackoff))) def withManualReset = copy(reset = Some(ManualReset)) @@ -343,7 +343,7 @@ private final case class BackoffOnStopOptionsImpl[T]( def withMaxNrOfRetries(maxNrOfRetries: Int) = copy(supervisorStrategy = supervisorStrategy.withMaxNrOfRetries(maxNrOfRetries)) def withFinalStopMessage(action: Any ⇒ Boolean) = copy(finalStopMessage = Some(action)) - def props = { + def props: Props = { require(minBackoff > Duration.Zero, "minBackoff must be > 0") require(maxBackoff >= minBackoff, "maxBackoff must be >= minBackoff") require(0.0 <= randomFactor && randomFactor <= 1.0, "randomFactor must be between 0.0 and 1.0") @@ -368,7 +368,7 @@ private final case class BackoffOnFailureOptionsImpl[T]( replyWhileStopped: Option[Any] = None ) extends BackoffOnFailureOptions { - val backoffReset = reset.getOrElse(AutoReset(minBackoff)) + private val backoffReset = reset.getOrElse(AutoReset(minBackoff)) def withAutoReset(resetBackoff: FiniteDuration) = copy(reset = Some(AutoReset(resetBackoff))) def withManualReset = copy(reset = Some(ManualReset)) @@ -377,7 +377,7 @@ private final case class BackoffOnFailureOptionsImpl[T]( def withReplyWhileStopped(replyWhileStopped: Any) = copy(replyWhileStopped = Some(replyWhileStopped)) def withMaxNrOfRetries(maxNrOfRetries: Int) = copy(supervisorStrategy = supervisorStrategy.withMaxNrOfRetries(maxNrOfRetries)) - def props = { + def props: Props = { require(minBackoff > Duration.Zero, "minBackoff must be > 0") require(maxBackoff >= minBackoff, "maxBackoff must be >= minBackoff") require(0.0 <= randomFactor && randomFactor <= 1.0, "randomFactor must be between 0.0 and 1.0") diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/SupervisionSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/SupervisionSpec.scala index 1159f65cfb..9f37e60281 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/SupervisionSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/SupervisionSpec.scala @@ -7,7 +7,7 @@ package akka.cluster.sharding import akka.actor.{ Actor, ActorLogging, ActorRef, PoisonPill, Props } import akka.cluster.Cluster import akka.cluster.sharding.ShardRegion.Passivate -import akka.pattern.{ Backoff, BackoffSupervisor } +import akka.pattern.{ Backoff, BackoffOpts, BackoffSupervisor } import akka.testkit.{ AkkaSpec, ImplicitSender } import com.typesafe.config.ConfigFactory @@ -64,7 +64,7 @@ class SupervisionSpec extends AkkaSpec(SupervisionSpec.config) with ImplicitSend import SupervisionSpec._ - "Supervision for a sharded actor" must { + "Supervision for a sharded actor (deprecated)" must { "allow passivation" in { @@ -99,4 +99,38 @@ class SupervisionSpec extends AkkaSpec(SupervisionSpec.config) with ImplicitSend } } + "Supervision for a sharded actor" must { + + "allow passivation" in { + + val supervisedProps = BackoffSupervisor.props(BackoffOpts.onStop( + Props(new PassivatingActor()), + childName = "child", + minBackoff = 1.seconds, + maxBackoff = 30.seconds, + randomFactor = 0.2 + ).withFinalStopMessage(_ == StopMessage)) + + Cluster(system).join(Cluster(system).selfAddress) + val region = ClusterSharding(system).start( + "passy", + supervisedProps, + ClusterShardingSettings(system), + idExtractor, + shardResolver + ) + + region ! Msg(10, "hello") + val response = expectMsgType[Response](5.seconds) + watch(response.self) + + region ! Msg(10, "passivate") + expectTerminated(response.self) + + // This would fail before as sharded actor would be stuck passivating + region ! Msg(10, "hello") + expectMsgType[Response](20.seconds) + } + } + } diff --git a/akka-docs/src/main/paradox/cluster-sharding.md b/akka-docs/src/main/paradox/cluster-sharding.md index 4d7ef2b490..239e7e4fd8 100644 --- a/akka-docs/src/main/paradox/cluster-sharding.md +++ b/akka-docs/src/main/paradox/cluster-sharding.md @@ -413,6 +413,8 @@ Java Note that stopped entities will be started again when a new message is targeted to the entity. +If 'on stop' backoff supervision strategy is used, a final termination message must be set and used for passivation, see #ref:[Supervision](general/supervision.md#Sharding) + ## Graceful Shutdown You can send the @scala[`ShardRegion.GracefulShutdown`] @java[`ShardRegion.gracefulShutdownInstance`] message diff --git a/akka-docs/src/main/paradox/general/supervision.md b/akka-docs/src/main/paradox/general/supervision.md index 7f6c84daf9..4d5373101b 100644 --- a/akka-docs/src/main/paradox/general/supervision.md +++ b/akka-docs/src/main/paradox/general/supervision.md @@ -207,6 +207,25 @@ to recover before the persistent actor is started. > [1] A failure can be indicated in two different ways; by an actor stopping or crashing. +#### Supervision strategies + +There are two basic supervision strategies available for backoff: +* 'On failure': The supervisor will restart the supervised actor once it crashes, but terminate if the actor stops normaly (e.g. through `context.stop`) +* 'On stop': The supervisor will restart the supervised actor if it terminates in any way (consider this for `PersistentActor` since they stop on persistence failures instead of crashing) + +#### Sharding +If the 'on stop' strategy is used for sharded actors a final termination message should be configured and used to terminate the actor on passivation. Otherwise the supervisor will just restart the actor again. + +The termination message is configured with: + +@@snip [BackoffSupervisorDocSpec.scala](/akka-docs/src/test/scala/docs/pattern/BackoffSupervisorDocSpec.scala) { #backoff-sharded } + +And must be used for passivation: + +@@snip [BackoffSupervisorDocSpec.scala](/akka-docs/src/test/scala/docs/pattern/BackoffSupervisorDocSpec.scala) { #backoff-sharded-passivation } + +#### Simple backoff + The following Scala snippet shows how to create a backoff supervisor which will start the given echo actor after it has stopped because of a failure, in increasing intervals of 3, 6, 12, 24 and finally 30 seconds: @@ -239,7 +258,21 @@ The above is equivalent to this Java code: @@snip [BackoffSupervisorDocTest.java](/akka-docs/src/test/java/jdocs/pattern/BackoffSupervisorDocTest.java) { #backoff-fail } -The `akka.pattern.BackoffOptions` can be used to customize the behavior of the back-off supervisor actor, below are some examples: +#### Customization + +The `akka.pattern.BackoffOnFailureOptions` and `akka.pattern.BackoffOnRestartOptions` can be used to customize the behavior of the back-off supervisor actor. +Options are: +* `withAutoReset`: The backoff is reset if no failure/stop occurs within the duration. This is the default behaviour with `minBackoff` as default value +* `withManualReset`: The child must send `BackoffSupervisor.Reset` to its backoff supervisor (parent) +* `withSupervisionStrategy`: Sets a custom `OneForOneStrategy` (as each backoff supervisor only has one child). The default strategy uses the `akka.actor.SupervisorStrategy.defaultDecider` which restarts on exceptions. +* `withDefaultStoppingStrategy`: Sets a `OneForOneStrategy` with the stopping decider that stops the child on all exceptions. +* `withMaxNrOfRetries`: Sets the maximum number of retries until the supervisor will give up (`-1` is default which means no limit of retries). Note: This is set on the supervision strategy, so setting a different strategy resets the `maxNrOfRetries`. +* `withReplyWhileStopped`: By default all messages received while the child is stopped are forwarded to dead letters. With this set, the supervisor will reply to the sender instead. + +Only available on `BackoffOnStopOptions`: +* `withFinalStopMessage`: Allows to define a predicate to decide on finally stopping the child (and supervisor). Used for passivate sharded actors - see above. + +Some examples: @@snip [BackoffSupervisorDocSpec.scala](/akka-docs/src/test/scala/docs/pattern/BackoffSupervisorDocSpec.scala) { #backoff-custom-stop } diff --git a/akka-docs/src/test/java/jdocs/pattern/BackoffSupervisorDocTest.java b/akka-docs/src/test/java/jdocs/pattern/BackoffSupervisorDocTest.java index e1b68b421b..5f7755837f 100644 --- a/akka-docs/src/test/java/jdocs/pattern/BackoffSupervisorDocTest.java +++ b/akka-docs/src/test/java/jdocs/pattern/BackoffSupervisorDocTest.java @@ -6,6 +6,7 @@ package jdocs.pattern; import akka.actor.*; import akka.pattern.Backoff; +import akka.pattern.BackoffOpts; import akka.pattern.BackoffSupervisor; import akka.testkit.TestActors.EchoActor; // #backoff-imports @@ -20,7 +21,7 @@ public class BackoffSupervisorDocTest { final Props supervisorProps = BackoffSupervisor.props( - Backoff.onStop( + BackoffOpts.onStop( childProps, "myEcho", Duration.ofSeconds(3), @@ -37,7 +38,7 @@ public class BackoffSupervisorDocTest { final Props supervisorProps = BackoffSupervisor.props( - Backoff.onFailure( + BackoffOpts.onFailure( childProps, "myEcho", Duration.ofSeconds(3), diff --git a/akka-docs/src/test/scala/docs/pattern/BackoffSupervisorDocSpec.scala b/akka-docs/src/test/scala/docs/pattern/BackoffSupervisorDocSpec.scala index 0814829cf9..c2f1893bb3 100644 --- a/akka-docs/src/test/scala/docs/pattern/BackoffSupervisorDocSpec.scala +++ b/akka-docs/src/test/scala/docs/pattern/BackoffSupervisorDocSpec.scala @@ -4,7 +4,8 @@ package docs.pattern -import akka.actor.{ ActorSystem, OneForOneStrategy, Props, SupervisorStrategy } +import akka.actor.{ ActorContext, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy } +import akka.cluster.sharding.ShardRegion.Passivate import akka.pattern.{ BackoffOpts, BackoffSupervisor } import akka.testkit.TestActors.EchoActor @@ -99,4 +100,27 @@ class BackoffSupervisorDocSpec { case class MyException(msg: String) extends Exception(msg) + case object StopMessage + + class BackoffSupervisorDocSpecExampleSharding { + val system: ActorSystem = ??? + val context: ActorContext = ??? + import scala.concurrent.duration._ + + val childProps = Props(classOf[EchoActor]) + + //#backoff-sharded + val supervisor = BackoffSupervisor.props(BackoffOpts.onStop( + childProps, + childName = "myEcho", + minBackoff = 3.seconds, + maxBackoff = 30.seconds, + randomFactor = 0.2 + ).withFinalStopMessage(_ == StopMessage)) + //#backoff-sharded + + //#backoff-sharded-passivation + context.parent ! Passivate(StopMessage) + //#backoff-sharded-passivation + } }