Simplify backoff supervision api #26156
This commit is contained in:
parent
f2708c9dd3
commit
d0a5ced319
5 changed files with 223 additions and 182 deletions
|
|
@ -0,0 +1,9 @@
|
|||
# Simplify backoff supervision API #19016
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOnRestartSupervisor.replyWhileStopped")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOnRestartSupervisor.finalStopMessage")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOnRestartSupervisor.this")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.HandleBackoff.replyWhileStopped")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.HandleBackoff.finalStopMessage")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.HandleBackoff.handleMessageToChild")
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.Backoff.onFailure")
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.Backoff.onStop")
|
||||
|
|
@ -4,11 +4,10 @@
|
|||
|
||||
package akka.pattern
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor._
|
||||
import akka.actor.OneForOneStrategy
|
||||
import akka.actor.SupervisorStrategy._
|
||||
import akka.actor.{ OneForOneStrategy, _ }
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
/**
|
||||
* Back-off supervisor that stops and starts a child actor when the child actor restarts.
|
||||
|
|
@ -23,13 +22,12 @@ private class BackoffOnRestartSupervisor(
|
|||
val reset: BackoffReset,
|
||||
randomFactor: Double,
|
||||
strategy: OneForOneStrategy,
|
||||
val replyWhileStopped: Option[Any],
|
||||
val finalStopMessage: Option[Any ⇒ Boolean])
|
||||
replyWhileStopped: Option[Any])
|
||||
extends Actor with HandleBackoff
|
||||
with ActorLogging {
|
||||
|
||||
import context._
|
||||
import BackoffSupervisor._
|
||||
import context._
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy(strategy.maxNrOfRetries, strategy.withinTimeRange, strategy.loggingEnabled) {
|
||||
case ex ⇒
|
||||
|
|
@ -81,6 +79,15 @@ private class BackoffOnRestartSupervisor(
|
|||
stop(self)
|
||||
}
|
||||
|
||||
def receive = onTerminated orElse handleBackoff
|
||||
def receive: Receive = onTerminated orElse handleBackoff
|
||||
|
||||
protected def handleMessageToChild(msg: Any): Unit = child match {
|
||||
case Some(c) ⇒
|
||||
c.forward(msg)
|
||||
case None ⇒
|
||||
replyWhileStopped match {
|
||||
case None ⇒ context.system.deadLetters.forward(msg)
|
||||
case Some(r) ⇒ sender() ! r
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ object Backoff {
|
|||
minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration,
|
||||
randomFactor: Double,
|
||||
maxNrOfRetries: Int): BackoffOptions =
|
||||
maxNrOfRetries: Int): BackoffOnFailureOptions =
|
||||
BackoffOptionsImpl(RestartImpliesFailure, childProps, childName, minBackoff, maxBackoff, randomFactor).withMaxNrOfRetries(maxNrOfRetries)
|
||||
|
||||
/**
|
||||
|
|
@ -137,7 +137,7 @@ object Backoff {
|
|||
childName: String,
|
||||
minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration,
|
||||
randomFactor: Double): BackoffOptions =
|
||||
randomFactor: Double): BackoffOnFailureOptions =
|
||||
BackoffOptionsImpl(RestartImpliesFailure, childProps, childName, minBackoff, maxBackoff, randomFactor)
|
||||
|
||||
/**
|
||||
|
|
@ -195,7 +195,7 @@ object Backoff {
|
|||
minBackoff: java.time.Duration,
|
||||
maxBackoff: java.time.Duration,
|
||||
randomFactor: Double,
|
||||
maxNrOfRetries: Int): BackoffOptions =
|
||||
maxNrOfRetries: Int): BackoffOnFailureOptions =
|
||||
onFailure(childProps, childName, minBackoff.asScala, maxBackoff.asScala, randomFactor, maxNrOfRetries)
|
||||
|
||||
/**
|
||||
|
|
@ -250,7 +250,7 @@ object Backoff {
|
|||
childName: String,
|
||||
minBackoff: java.time.Duration,
|
||||
maxBackoff: java.time.Duration,
|
||||
randomFactor: Double): BackoffOptions =
|
||||
randomFactor: Double): BackoffOnFailureOptions =
|
||||
onFailure(childProps, childName, minBackoff.asScala, maxBackoff.asScala, randomFactor, -1)
|
||||
|
||||
/**
|
||||
|
|
@ -315,7 +315,7 @@ object Backoff {
|
|||
minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration,
|
||||
randomFactor: Double,
|
||||
maxNrOfRetries: Int): BackoffOptions =
|
||||
maxNrOfRetries: Int): BackoffOnStopOptions =
|
||||
BackoffOptionsImpl(StopImpliesFailure, childProps, childName, minBackoff, maxBackoff, randomFactor).withMaxNrOfRetries(maxNrOfRetries)
|
||||
|
||||
/**
|
||||
|
|
@ -376,7 +376,7 @@ object Backoff {
|
|||
childName: String,
|
||||
minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration,
|
||||
randomFactor: Double): BackoffOptions =
|
||||
randomFactor: Double): BackoffOnStopOptions =
|
||||
BackoffOptionsImpl(StopImpliesFailure, childProps, childName, minBackoff, maxBackoff, randomFactor)
|
||||
|
||||
/**
|
||||
|
|
@ -441,7 +441,7 @@ object Backoff {
|
|||
minBackoff: java.time.Duration,
|
||||
maxBackoff: java.time.Duration,
|
||||
randomFactor: Double,
|
||||
maxNrOfRetries: Int): BackoffOptions =
|
||||
maxNrOfRetries: Int): BackoffOnStopOptions =
|
||||
onStop(childProps, childName, minBackoff.asScala, maxBackoff.asScala, randomFactor, maxNrOfRetries)
|
||||
|
||||
/**
|
||||
|
|
@ -503,7 +503,7 @@ object Backoff {
|
|||
childName: String,
|
||||
minBackoff: java.time.Duration,
|
||||
maxBackoff: java.time.Duration,
|
||||
randomFactor: Double): BackoffOptions =
|
||||
randomFactor: Double): BackoffOnStopOptions =
|
||||
onStop(childProps, childName, minBackoff.asScala, maxBackoff.asScala, randomFactor, -1)
|
||||
|
||||
}
|
||||
|
|
@ -548,23 +548,6 @@ trait BackoffOptions {
|
|||
*/
|
||||
def withDefaultStoppingStrategy: BackoffOptions
|
||||
|
||||
/**
|
||||
* Returns a new BackoffOptions with a constant reply to messages that the supervisor receives while its
|
||||
* child is stopped. By default, a message received while the child is stopped is forwarded to `deadLetters`.
|
||||
* With this option, the supervisor will reply to the sender instead.
|
||||
* @param replyWhileStopped The message that the supervisor will send in response to all messages while
|
||||
* its child is stopped.
|
||||
*/
|
||||
def withReplyWhileStopped(replyWhileStopped: Any): BackoffOptions
|
||||
|
||||
/**
|
||||
* Predicate evaluated for each message, if it returns true and the supervised actor is
|
||||
* stopped then the supervisor will stop its self. If it returns true while
|
||||
* the supervised actor is running then it will be forwarded to the supervised actor and
|
||||
* when the supervised actor stops its self the supervisor will stop its self.
|
||||
*/
|
||||
def withFinalStopMessage(isFinalStopMessage: Any ⇒ Boolean): BackoffOptions
|
||||
|
||||
/**
|
||||
* Returns a new BackoffOptions with a maximum number of retries to restart the child actor.
|
||||
* By default, the supervisor will retry infinitely.
|
||||
|
|
@ -574,12 +557,42 @@ trait BackoffOptions {
|
|||
*/
|
||||
def withMaxNrOfRetries(maxNrOfRetries: Int): BackoffOptions
|
||||
|
||||
/**
|
||||
* Returns a new BackoffOptions with a constant reply to messages that the supervisor receives while its
|
||||
* child is stopped. By default, a message received while the child is stopped is forwarded to `deadLetters`.
|
||||
* With this option, the supervisor will reply to the sender instead.
|
||||
* @param replyWhileStopped The message that the supervisor will send in response to all messages while
|
||||
* its child is stopped.
|
||||
*/
|
||||
def withReplyWhileStopped(replyWhileStopped: Any): BackoffOptions
|
||||
|
||||
// for backwards compability with 2.5.19
|
||||
@deprecated("Use through BackoffOnStopOptions instead of BackoffOptions", since = "2.5.20")
|
||||
def withFinalStopMessage(isFinalStopMessage: Any ⇒ Boolean): BackoffOptions
|
||||
|
||||
/**
|
||||
* Returns the props to create the back-off supervisor.
|
||||
*/
|
||||
private[akka] def props: Props
|
||||
}
|
||||
|
||||
sealed trait BackoffOnStopOptions extends BackoffOptions {
|
||||
|
||||
/**
|
||||
* Predicate evaluated for each message, if it returns true and the supervised actor is
|
||||
* stopped then the supervisor will stop its self. If it returns true while
|
||||
* the supervised actor is running then it will be forwarded to the supervised actor and
|
||||
* when the supervised actor stops its self the supervisor will stop its self.
|
||||
*/
|
||||
def withFinalStopMessage(isFinalStopMessage: Any ⇒ Boolean): BackoffOptions
|
||||
}
|
||||
|
||||
sealed trait BackoffOnFailureOptions extends BackoffOptions {
|
||||
// for backwards compability with 2.5.19
|
||||
@deprecated("This has no effect for backoff on failure", since = "2.5.20")
|
||||
def withFinalStopMessage(isFinalStopMessage: Any ⇒ Boolean): BackoffOptions
|
||||
}
|
||||
|
||||
private final case class BackoffOptionsImpl(
|
||||
backoffType: BackoffType = RestartImpliesFailure,
|
||||
childProps: Props,
|
||||
|
|
@ -591,7 +604,7 @@ private final case class BackoffOptionsImpl(
|
|||
supervisorStrategy: OneForOneStrategy = OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider),
|
||||
replyWhileStopped: Option[Any] = None,
|
||||
finalStopMessage: Option[Any ⇒ Boolean] = None
|
||||
) extends BackoffOptions {
|
||||
) extends BackoffOptions with BackoffOnStopOptions with BackoffOnFailureOptions {
|
||||
|
||||
val backoffReset = reset.getOrElse(AutoReset(minBackoff))
|
||||
|
||||
|
|
@ -616,7 +629,7 @@ private final case class BackoffOptionsImpl(
|
|||
backoffType match {
|
||||
//onFailure method in companion object
|
||||
case RestartImpliesFailure ⇒
|
||||
Props(new BackoffOnRestartSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy, replyWhileStopped, finalStopMessage))
|
||||
Props(new BackoffOnRestartSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy, replyWhileStopped))
|
||||
//onStop method in companion object
|
||||
case StopImpliesFailure ⇒
|
||||
Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy, replyWhileStopped, finalStopMessage))
|
||||
|
|
|
|||
|
|
@ -4,11 +4,11 @@
|
|||
|
||||
package akka.pattern
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom
|
||||
import java.util.Optional
|
||||
import java.util.concurrent.ThreadLocalRandom
|
||||
|
||||
import akka.actor.{ Actor, ActorLogging, ActorRef, DeadLetterSuppression, OneForOneStrategy, Props, SupervisorStrategy, Terminated }
|
||||
import akka.actor.SupervisorStrategy.{ Directive, Escalate }
|
||||
import akka.actor.{ Actor, ActorLogging, ActorRef, DeadLetterSuppression, OneForOneStrategy, Props, SupervisorStrategy, Terminated }
|
||||
import akka.util.JavaDurationConverters._
|
||||
|
||||
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||
|
|
@ -291,7 +291,9 @@ final class BackoffSupervisor(
|
|||
val reset: BackoffReset,
|
||||
randomFactor: Double,
|
||||
strategy: SupervisorStrategy,
|
||||
@deprecated("Should be internal", since = "2.5.19") // since removed from HandleBackoff the val can be removed
|
||||
val replyWhileStopped: Option[Any],
|
||||
@deprecated("Should be internal", since = "2.5.19") // since removed from HandleBackoff the val can be removed
|
||||
val finalStopMessage: Option[Any ⇒ Boolean])
|
||||
extends Actor with HandleBackoff
|
||||
with ActorLogging {
|
||||
|
|
@ -367,62 +369,9 @@ final class BackoffSupervisor(
|
|||
|
||||
}
|
||||
|
||||
def receive = onTerminated orElse handleBackoff
|
||||
}
|
||||
def receive: Receive = onTerminated orElse handleBackoff
|
||||
|
||||
private[akka] trait HandleBackoff { this: Actor ⇒
|
||||
def childProps: Props
|
||||
def childName: String
|
||||
def reset: BackoffReset
|
||||
def replyWhileStopped: Option[Any]
|
||||
def finalStopMessage: Option[Any ⇒ Boolean]
|
||||
|
||||
var child: Option[ActorRef] = None
|
||||
var restartCount = 0
|
||||
var finalStopMessageReceived = false
|
||||
|
||||
import BackoffSupervisor._
|
||||
import context.dispatcher
|
||||
|
||||
override def preStart(): Unit = startChild()
|
||||
|
||||
def startChild(): Unit = {
|
||||
if (child.isEmpty) {
|
||||
child = Some(context.watch(context.actorOf(childProps, childName)))
|
||||
}
|
||||
}
|
||||
|
||||
def handleBackoff: Receive = {
|
||||
case StartChild ⇒
|
||||
startChild()
|
||||
reset match {
|
||||
case AutoReset(resetBackoff) ⇒
|
||||
val _ = context.system.scheduler.scheduleOnce(resetBackoff, self, ResetRestartCount(restartCount))
|
||||
case _ ⇒ // ignore
|
||||
}
|
||||
|
||||
case Reset ⇒
|
||||
reset match {
|
||||
case ManualReset ⇒ restartCount = 0
|
||||
case msg ⇒ unhandled(msg)
|
||||
}
|
||||
|
||||
case ResetRestartCount(current) ⇒
|
||||
if (current == restartCount) {
|
||||
restartCount = 0
|
||||
}
|
||||
|
||||
case GetRestartCount ⇒
|
||||
sender() ! RestartCount(restartCount)
|
||||
|
||||
case GetCurrentChild ⇒
|
||||
sender() ! CurrentChild(child)
|
||||
|
||||
case msg if child.contains(sender()) ⇒
|
||||
// use the BackoffSupervisor as sender
|
||||
context.parent ! msg
|
||||
|
||||
case msg ⇒ child match {
|
||||
protected def handleMessageToChild(msg: Any): Unit = child match {
|
||||
case Some(c) ⇒
|
||||
c.forward(msg)
|
||||
if (!finalStopMessageReceived && finalStopMessage.isDefined) {
|
||||
|
|
@ -441,5 +390,4 @@ private[akka] trait HandleBackoff { this: Actor ⇒
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
64
akka-actor/src/main/scala/akka/pattern/HandleBackoff.scala
Normal file
64
akka-actor/src/main/scala/akka/pattern/HandleBackoff.scala
Normal file
|
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.pattern
|
||||
|
||||
import akka.actor.{ Actor, ActorRef, Props }
|
||||
|
||||
private[akka] trait HandleBackoff {
|
||||
this: Actor ⇒
|
||||
def childProps: Props
|
||||
def childName: String
|
||||
def reset: BackoffReset
|
||||
protected def handleMessageToChild(m: Any): Unit
|
||||
|
||||
var child: Option[ActorRef] = None
|
||||
var restartCount = 0
|
||||
var finalStopMessageReceived = false
|
||||
|
||||
import BackoffSupervisor._
|
||||
import context.dispatcher
|
||||
|
||||
override def preStart(): Unit = startChild()
|
||||
|
||||
def startChild(): Unit = {
|
||||
if (child.isEmpty) {
|
||||
child = Some(context.watch(context.actorOf(childProps, childName)))
|
||||
}
|
||||
}
|
||||
|
||||
def handleBackoff: Receive = {
|
||||
case StartChild ⇒
|
||||
startChild()
|
||||
reset match {
|
||||
case AutoReset(resetBackoff) ⇒
|
||||
val _ = context.system.scheduler.scheduleOnce(resetBackoff, self, ResetRestartCount(restartCount))
|
||||
case _ ⇒ // ignore
|
||||
}
|
||||
|
||||
case Reset ⇒
|
||||
reset match {
|
||||
case ManualReset ⇒ restartCount = 0
|
||||
case msg ⇒ unhandled(msg)
|
||||
}
|
||||
|
||||
case ResetRestartCount(current) ⇒
|
||||
if (current == restartCount) {
|
||||
restartCount = 0
|
||||
}
|
||||
|
||||
case GetRestartCount ⇒
|
||||
sender() ! RestartCount(restartCount)
|
||||
|
||||
case GetCurrentChild ⇒
|
||||
sender() ! CurrentChild(child)
|
||||
|
||||
case msg if child.contains(sender()) ⇒
|
||||
// use the BackoffSupervisor as sender
|
||||
context.parent ! msg
|
||||
|
||||
case msg ⇒
|
||||
handleMessageToChild(msg)
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue