Hide backoff on stop supervisor implementation #26156

This commit is contained in:
Nicolas Vollmar 2018-12-20 07:26:38 +01:00
parent d0a5ced319
commit 551bbdec77
3 changed files with 107 additions and 85 deletions

View file

@ -0,0 +1,92 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.pattern
import akka.actor.SupervisorStrategy.{ Directive, Escalate }
import akka.actor.{ Actor, ActorLogging, OneForOneStrategy, Props, SupervisorStrategy, Terminated }
import scala.concurrent.duration.FiniteDuration
/**
* Back-off supervisor that stops and starts a child actor using a back-off algorithm when the child actor stops.
* This back-off supervisor is created by using `akka.pattern.BackoffSupervisor.props`
* with `Backoff.onStop`.
*/
private[akka] class BackoffOnStopSupervisor(
val childProps: Props,
val childName: String,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
val reset: BackoffReset,
randomFactor: Double,
strategy: SupervisorStrategy,
replyWhileStopped: Option[Any],
finalStopMessage: Option[Any Boolean])
extends Actor with HandleBackoff
with ActorLogging {
import BackoffSupervisor._
import context.dispatcher
// to keep binary compatibility with 2.4.1
override val supervisorStrategy = strategy match {
case oneForOne: OneForOneStrategy
OneForOneStrategy(oneForOne.maxNrOfRetries, oneForOne.withinTimeRange, oneForOne.loggingEnabled) {
case ex
val defaultDirective: Directive =
super.supervisorStrategy.decider.applyOrElse(ex, (_: Any) Escalate)
strategy.decider.applyOrElse(ex, (_: Any) defaultDirective)
}
case s s
}
def onTerminated: Receive = {
case Terminated(ref) if child.contains(ref)
child = None
if (finalStopMessageReceived) {
context.stop(self)
} else {
val maxNrOfRetries = strategy match {
case oneForOne: OneForOneStrategy oneForOne.maxNrOfRetries
case _ -1
}
val nextRestartCount = restartCount + 1
if (maxNrOfRetries == -1 || nextRestartCount <= maxNrOfRetries) {
val restartDelay = calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor)
context.system.scheduler.scheduleOnce(restartDelay, self, StartChild)
restartCount = nextRestartCount
} else {
log.debug(s"Terminating on restart #{} which exceeds max allowed restarts ({})", nextRestartCount, maxNrOfRetries)
context.stop(self)
}
}
}
def receive: Receive = onTerminated orElse handleBackoff
protected def handleMessageToChild(msg: Any): Unit = child match {
case Some(c)
c.forward(msg)
if (!finalStopMessageReceived && finalStopMessage.isDefined) {
finalStopMessageReceived = finalStopMessage.get.apply(msg)
}
case None
replyWhileStopped match {
case None context.system.deadLetters.forward(msg)
case Some(r) sender() ! r
}
finalStopMessage match {
case None
case Some(fsm)
if (fsm(msg)) {
context.stop(self)
}
}
}
}

View file

@ -632,7 +632,7 @@ private final case class BackoffOptionsImpl(
Props(new BackoffOnRestartSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy, replyWhileStopped))
//onStop method in companion object
case StopImpliesFailure
Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy, replyWhileStopped, finalStopMessage))
Props(new BackoffOnStopSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy, replyWhileStopped, finalStopMessage))
}
}
}

View file

@ -7,8 +7,7 @@ package akka.pattern
import java.util.Optional
import java.util.concurrent.ThreadLocalRandom
import akka.actor.SupervisorStrategy.{ Directive, Escalate }
import akka.actor.{ Actor, ActorLogging, ActorRef, DeadLetterSuppression, OneForOneStrategy, Props, SupervisorStrategy, Terminated }
import akka.actor.{ ActorRef, DeadLetterSuppression, OneForOneStrategy, Props, SupervisorStrategy }
import akka.util.JavaDurationConverters._
import scala.concurrent.duration.{ Duration, FiniteDuration }
@ -163,7 +162,7 @@ object BackoffSupervisor {
require(minBackoff > Duration.Zero, "minBackoff must be > 0")
require(maxBackoff >= minBackoff, "maxBackoff must be >= minBackoff")
require(0.0 <= randomFactor && randomFactor <= 1.0, "randomFactor must be between 0.0 and 1.0")
Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, randomFactor, strategy))
Props(new BackoffOnStopSupervisor(childProps, childName, minBackoff, maxBackoff, AutoReset(minBackoff), randomFactor, strategy, None, None))
}
/**
@ -278,41 +277,19 @@ object BackoffSupervisor {
}
}
/**
* Back-off supervisor that stops and starts a child actor using a back-off algorithm when the child actor stops.
* This back-off supervisor is created by using `akka.pattern.BackoffSupervisor.props`
* with `Backoff.onStop`.
*/
// for backwards compability
@deprecated("Use BackoffSupervisor props method instead", since = "2.5.20")
final class BackoffSupervisor(
val childProps: Props,
val childName: String,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
val reset: BackoffReset,
randomFactor: Double,
strategy: SupervisorStrategy,
@deprecated("Should be internal", since = "2.5.19") // since removed from HandleBackoff the val can be removed
val replyWhileStopped: Option[Any],
@deprecated("Should be internal", since = "2.5.19") // since removed from HandleBackoff the val can be removed
val finalStopMessage: Option[Any Boolean])
extends Actor with HandleBackoff
with ActorLogging {
import BackoffSupervisor._
import context.dispatcher
// to keep binary compatibility with 2.4.1
override val supervisorStrategy = strategy match {
case oneForOne: OneForOneStrategy
OneForOneStrategy(oneForOne.maxNrOfRetries, oneForOne.withinTimeRange, oneForOne.loggingEnabled) {
case ex
val defaultDirective: Directive =
super.supervisorStrategy.decider.applyOrElse(ex, (_: Any) Escalate)
strategy.decider.applyOrElse(ex, (_: Any) defaultDirective)
}
case s s
}
override val childProps: Props,
override val childName: String,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
override val reset: BackoffReset,
randomFactor: Double,
strategy: SupervisorStrategy,
val replyWhileStopped: Option[Any],
val finalStopMessage: Option[Any Boolean])
extends BackoffOnStopSupervisor(childProps, childName, minBackoff, maxBackoff, reset, randomFactor, strategy, replyWhileStopped, finalStopMessage) {
// for binary compatibility with 2.5.18
def this(
@ -343,51 +320,4 @@ final class BackoffSupervisor(
maxBackoff: FiniteDuration,
randomFactor: Double) =
this(childProps, childName, minBackoff, maxBackoff, randomFactor, SupervisorStrategy.defaultStrategy)
def onTerminated: Receive = {
case Terminated(ref) if child.contains(ref)
child = None
if (finalStopMessageReceived) {
context.stop(self)
} else {
val maxNrOfRetries = strategy match {
case oneForOne: OneForOneStrategy oneForOne.maxNrOfRetries
case _ -1
}
val nextRestartCount = restartCount + 1
if (maxNrOfRetries == -1 || nextRestartCount <= maxNrOfRetries) {
val restartDelay = calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor)
context.system.scheduler.scheduleOnce(restartDelay, self, StartChild)
restartCount = nextRestartCount
} else {
log.debug(s"Terminating on restart #{} which exceeds max allowed restarts ({})", nextRestartCount, maxNrOfRetries)
context.stop(self)
}
}
}
def receive: Receive = onTerminated orElse handleBackoff
protected def handleMessageToChild(msg: Any): Unit = child match {
case Some(c)
c.forward(msg)
if (!finalStopMessageReceived && finalStopMessage.isDefined) {
finalStopMessageReceived = finalStopMessage.get.apply(msg)
}
case None
replyWhileStopped match {
case None context.system.deadLetters.forward(msg)
case Some(r) sender() ! r
}
finalStopMessage match {
case None
case Some(fsm)
if (fsm(msg)) {
context.stop(self)
}
}
}
}