diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffOnStopSupervisor.scala b/akka-actor/src/main/scala/akka/pattern/BackoffOnStopSupervisor.scala new file mode 100644 index 0000000000..a2526fb8d4 --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/BackoffOnStopSupervisor.scala @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2018-2019 Lightbend Inc. + */ + +package akka.pattern + +import akka.actor.SupervisorStrategy.{ Directive, Escalate } +import akka.actor.{ Actor, ActorLogging, OneForOneStrategy, Props, SupervisorStrategy, Terminated } + +import scala.concurrent.duration.FiniteDuration + +/** + * Back-off supervisor that stops and starts a child actor using a back-off algorithm when the child actor stops. + * This back-off supervisor is created by using `akka.pattern.BackoffSupervisor.props` + * with `Backoff.onStop`. + */ +private[akka] class BackoffOnStopSupervisor( + val childProps: Props, + val childName: String, + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + val reset: BackoffReset, + randomFactor: Double, + strategy: SupervisorStrategy, + replyWhileStopped: Option[Any], + finalStopMessage: Option[Any ⇒ Boolean]) + extends Actor with HandleBackoff + with ActorLogging { + + import BackoffSupervisor._ + import context.dispatcher + + // to keep binary compatibility with 2.4.1 + override val supervisorStrategy = strategy match { + case oneForOne: OneForOneStrategy ⇒ + OneForOneStrategy(oneForOne.maxNrOfRetries, oneForOne.withinTimeRange, oneForOne.loggingEnabled) { + case ex ⇒ + val defaultDirective: Directive = + super.supervisorStrategy.decider.applyOrElse(ex, (_: Any) ⇒ Escalate) + + strategy.decider.applyOrElse(ex, (_: Any) ⇒ defaultDirective) + } + case s ⇒ s + } + + def onTerminated: Receive = { + case Terminated(ref) if child.contains(ref) ⇒ + child = None + if (finalStopMessageReceived) { + context.stop(self) + } else { + val maxNrOfRetries = strategy match { + case oneForOne: OneForOneStrategy ⇒ oneForOne.maxNrOfRetries + case _ ⇒ -1 + } + + val nextRestartCount = restartCount + 1 + + if (maxNrOfRetries == -1 || nextRestartCount <= maxNrOfRetries) { + val restartDelay = calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor) + context.system.scheduler.scheduleOnce(restartDelay, self, StartChild) + restartCount = nextRestartCount + } else { + log.debug(s"Terminating on restart #{} which exceeds max allowed restarts ({})", nextRestartCount, maxNrOfRetries) + context.stop(self) + } + } + + } + + def receive: Receive = onTerminated orElse handleBackoff + + protected def handleMessageToChild(msg: Any): Unit = child match { + case Some(c) ⇒ + c.forward(msg) + if (!finalStopMessageReceived && finalStopMessage.isDefined) { + finalStopMessageReceived = finalStopMessage.get.apply(msg) + } + case None ⇒ + replyWhileStopped match { + case None ⇒ context.system.deadLetters.forward(msg) + case Some(r) ⇒ sender() ! r + } + finalStopMessage match { + case None ⇒ + case Some(fsm) ⇒ + if (fsm(msg)) { + context.stop(self) + } + } + } +} diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala b/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala index e8192ad4ad..a629b4eaa7 100644 --- a/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala +++ b/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala @@ -632,7 +632,7 @@ private final case class BackoffOptionsImpl( Props(new BackoffOnRestartSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy, replyWhileStopped)) //onStop method in companion object case StopImpliesFailure ⇒ - Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy, replyWhileStopped, finalStopMessage)) + Props(new BackoffOnStopSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy, replyWhileStopped, finalStopMessage)) } } } diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala b/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala index 37e020f23d..b88f012f39 100644 --- a/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala +++ b/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala @@ -7,8 +7,7 @@ package akka.pattern import java.util.Optional import java.util.concurrent.ThreadLocalRandom -import akka.actor.SupervisorStrategy.{ Directive, Escalate } -import akka.actor.{ Actor, ActorLogging, ActorRef, DeadLetterSuppression, OneForOneStrategy, Props, SupervisorStrategy, Terminated } +import akka.actor.{ ActorRef, DeadLetterSuppression, OneForOneStrategy, Props, SupervisorStrategy } import akka.util.JavaDurationConverters._ import scala.concurrent.duration.{ Duration, FiniteDuration } @@ -163,7 +162,7 @@ object BackoffSupervisor { require(minBackoff > Duration.Zero, "minBackoff must be > 0") require(maxBackoff >= minBackoff, "maxBackoff must be >= minBackoff") require(0.0 <= randomFactor && randomFactor <= 1.0, "randomFactor must be between 0.0 and 1.0") - Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, randomFactor, strategy)) + Props(new BackoffOnStopSupervisor(childProps, childName, minBackoff, maxBackoff, AutoReset(minBackoff), randomFactor, strategy, None, None)) } /** @@ -278,41 +277,19 @@ object BackoffSupervisor { } } -/** - * Back-off supervisor that stops and starts a child actor using a back-off algorithm when the child actor stops. - * This back-off supervisor is created by using `akka.pattern.BackoffSupervisor.props` - * with `Backoff.onStop`. - */ +// for backwards compability +@deprecated("Use BackoffSupervisor props method instead", since = "2.5.20") final class BackoffSupervisor( - val childProps: Props, - val childName: String, - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - val reset: BackoffReset, - randomFactor: Double, - strategy: SupervisorStrategy, - @deprecated("Should be internal", since = "2.5.19") // since removed from HandleBackoff the val can be removed - val replyWhileStopped: Option[Any], - @deprecated("Should be internal", since = "2.5.19") // since removed from HandleBackoff the val can be removed - val finalStopMessage: Option[Any ⇒ Boolean]) - extends Actor with HandleBackoff - with ActorLogging { - - import BackoffSupervisor._ - import context.dispatcher - - // to keep binary compatibility with 2.4.1 - override val supervisorStrategy = strategy match { - case oneForOne: OneForOneStrategy ⇒ - OneForOneStrategy(oneForOne.maxNrOfRetries, oneForOne.withinTimeRange, oneForOne.loggingEnabled) { - case ex ⇒ - val defaultDirective: Directive = - super.supervisorStrategy.decider.applyOrElse(ex, (_: Any) ⇒ Escalate) - - strategy.decider.applyOrElse(ex, (_: Any) ⇒ defaultDirective) - } - case s ⇒ s - } + override val childProps: Props, + override val childName: String, + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + override val reset: BackoffReset, + randomFactor: Double, + strategy: SupervisorStrategy, + val replyWhileStopped: Option[Any], + val finalStopMessage: Option[Any ⇒ Boolean]) + extends BackoffOnStopSupervisor(childProps, childName, minBackoff, maxBackoff, reset, randomFactor, strategy, replyWhileStopped, finalStopMessage) { // for binary compatibility with 2.5.18 def this( @@ -343,51 +320,4 @@ final class BackoffSupervisor( maxBackoff: FiniteDuration, randomFactor: Double) = this(childProps, childName, minBackoff, maxBackoff, randomFactor, SupervisorStrategy.defaultStrategy) - - def onTerminated: Receive = { - case Terminated(ref) if child.contains(ref) ⇒ - child = None - if (finalStopMessageReceived) { - context.stop(self) - } else { - val maxNrOfRetries = strategy match { - case oneForOne: OneForOneStrategy ⇒ oneForOne.maxNrOfRetries - case _ ⇒ -1 - } - - val nextRestartCount = restartCount + 1 - - if (maxNrOfRetries == -1 || nextRestartCount <= maxNrOfRetries) { - val restartDelay = calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor) - context.system.scheduler.scheduleOnce(restartDelay, self, StartChild) - restartCount = nextRestartCount - } else { - log.debug(s"Terminating on restart #{} which exceeds max allowed restarts ({})", nextRestartCount, maxNrOfRetries) - context.stop(self) - } - } - - } - - def receive: Receive = onTerminated orElse handleBackoff - - protected def handleMessageToChild(msg: Any): Unit = child match { - case Some(c) ⇒ - c.forward(msg) - if (!finalStopMessageReceived && finalStopMessage.isDefined) { - finalStopMessageReceived = finalStopMessage.get.apply(msg) - } - case None ⇒ - replyWhileStopped match { - case None ⇒ context.system.deadLetters.forward(msg) - case Some(r) ⇒ sender() ! r - } - finalStopMessage match { - case None ⇒ - case Some(fsm) ⇒ - if (fsm(msg)) { - context.stop(self) - } - } - } }