Merge pull request #18776 from henrymai/master
Add TransparentExponentialBackoffSupervisor
This commit is contained in:
commit
61c257b2fe
3 changed files with 342 additions and 9 deletions
|
|
@ -61,6 +61,26 @@ object BackoffSupervisor {
|
|||
|
||||
private case object StartChild extends DeadLetterSuppression
|
||||
private case class ResetRestartCount(current: Int) extends DeadLetterSuppression
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Calculates an exponential back off delay.
|
||||
*/
|
||||
private[akka] def calculateDelay(
|
||||
restartCount: Int,
|
||||
minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration,
|
||||
randomFactor: Double): FiniteDuration = {
|
||||
val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor
|
||||
if (restartCount >= 30) // Duration overflow protection (> 100 years)
|
||||
maxBackoff
|
||||
else
|
||||
maxBackoff.min(minBackoff * math.pow(2, restartCount)) * rnd match {
|
||||
case f: FiniteDuration ⇒ f
|
||||
case _ ⇒ maxBackoff
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -121,15 +141,7 @@ final class BackoffSupervisor(
|
|||
def receive = {
|
||||
case Terminated(ref) if child.contains(ref) ⇒
|
||||
child = None
|
||||
val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor
|
||||
val restartDelay =
|
||||
if (restartCount >= 30) // Duration overflow protection (> 100 years)
|
||||
maxBackoff
|
||||
else
|
||||
maxBackoff.min(minBackoff * math.pow(2, restartCount)) * rnd match {
|
||||
case f: FiniteDuration ⇒ f
|
||||
case _ ⇒ maxBackoff
|
||||
}
|
||||
val restartDelay = calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor)
|
||||
context.system.scheduler.scheduleOnce(restartDelay, self, StartChild)
|
||||
restartCount += 1
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,202 @@
|
|||
/**
|
||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.contrib.pattern
|
||||
|
||||
import akka.actor._
|
||||
import akka.actor.OneForOneStrategy
|
||||
import akka.actor.SupervisorStrategy._
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object TransparentExponentialBackoffSupervisor {
|
||||
private case class ScheduleRestart(childRef: ActorRef) extends DeadLetterSuppression
|
||||
private case object StartChild extends DeadLetterSuppression
|
||||
private case class ResetRestartCount(lastNumRestarts: Int) extends DeadLetterSuppression
|
||||
|
||||
/**
|
||||
* Props for creating a [[TransparentExponentialBackoffSupervisor]] with a decider.
|
||||
*
|
||||
* @param childProps the [[akka.actor.Props]] of the child to be supervised.
|
||||
* @param childName the name of the child actor.
|
||||
* @param minBackoff the min time before the child is restarted.
|
||||
* @param maxBackoff the max time (upperbound) for a child restart.
|
||||
* @param randomFactor a random delay factor to add on top of the calculated exponential
|
||||
* back off.
|
||||
* The calculation is equivalent to:
|
||||
* {{{
|
||||
* final_delay = min(
|
||||
* maxBackoff,
|
||||
* (random_delay_factor * calculated_backoff) + calculated_backoff)
|
||||
* }}}
|
||||
* @param decider an [[akka.actor.SupervisorStrategy.Decider]] to specify how the supervisor
|
||||
* should behave for different exceptions. If no cases are matched, the default decider of
|
||||
* [[akka.actor.Actor]] is used. When the [[akka.actor.SupervisorStrategy.Restart]] directive
|
||||
* is returned by the decider, this supervisor will apply an exponential back off restart.
|
||||
*/
|
||||
def propsWithDecider(
|
||||
childProps: Props,
|
||||
childName: String,
|
||||
minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration,
|
||||
randomFactor: Double)(decider: Decider): Props = {
|
||||
Props(
|
||||
new TransparentExponentialBackoffSupervisor(
|
||||
childProps,
|
||||
childName,
|
||||
Some(decider),
|
||||
minBackoff,
|
||||
maxBackoff,
|
||||
randomFactor))
|
||||
}
|
||||
|
||||
/**
|
||||
* Props for creating a [[TransparentExponentialBackoffSupervisor]] using the
|
||||
* default [[akka.actor.Actor]] decider.
|
||||
*
|
||||
* @param childProps the [[akka.actor.Props]] of the child to be supervised.
|
||||
* @param childName the name of the child actor.
|
||||
* @param minBackoff the min time before the child is restarted.
|
||||
* @param maxBackoff the max time (upperbound) for a child restart.
|
||||
* @param randomFactor a random delay factor to add on top of the calculated exponential
|
||||
* back off.
|
||||
* The calculation is equivalent to:
|
||||
* {{{
|
||||
* final_delay = min(
|
||||
* maxBackoff,
|
||||
* (random_delay_factor * calculated_backoff) + calculated_backoff)
|
||||
* }}}
|
||||
*/
|
||||
def props(
|
||||
childProps: Props,
|
||||
childName: String,
|
||||
minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration,
|
||||
randomFactor: Double): Props = {
|
||||
Props(
|
||||
new TransparentExponentialBackoffSupervisor(
|
||||
childProps,
|
||||
childName,
|
||||
None,
|
||||
minBackoff,
|
||||
maxBackoff,
|
||||
randomFactor))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A supervising actor that restarts a child actor with an exponential back off.
|
||||
*
|
||||
* This explicit supervisor behaves similarly to the normal implicit supervision where
|
||||
* if an actor throws an exception, the decider on the supervisor will decide when to
|
||||
* `Stop`, `Restart`, `Escalate`, `Resume` the child actor.
|
||||
*
|
||||
* When the `Restart` directive is specified, the supervisor will delay the restart
|
||||
* using an exponential back off strategy (bounded by minBackoff and maxBackoff).
|
||||
*
|
||||
* This supervisor is intended to be transparent to both the child actor and external actors.
|
||||
* Where external actors can send messages to the supervisor as if it was the child and the
|
||||
* messages will be forwarded. And when the child is `Terminated`, the supervisor is also
|
||||
* `Terminated`.
|
||||
* Transparent to the child means that the child does not have to be aware that it is being
|
||||
* supervised specifically by the [[TransparentExponentialBackoffSupervisor]]. Just like it does
|
||||
* not need to know when it is being supervised by the usual implicit supervisors.
|
||||
* The only caveat is that the `ActorRef` of the child is not stable, so any user storing the
|
||||
* `sender()` `ActorRef` from the child response may eventually not be able to communicate with
|
||||
* the stored `ActorRef`. In general all messages to the child should be directed through the
|
||||
* [[TransparentExponentialBackoffSupervisor]].
|
||||
*
|
||||
* An example of where this supervisor might be used is when you may have an actor that is
|
||||
* responsible for continuously polling on a server for some resource that sometimes may be down.
|
||||
* Instead of hammering the server continuously when the resource is unavailable, the actor will
|
||||
* be restarted with an exponentially increasing back off until the resource is available again.
|
||||
*
|
||||
* '''***
|
||||
* This supervisor should not be used with `Akka Persistence` child actors.
|
||||
* `Akka Persistence` actors, currently, shutdown unconditionally on `persistFailure()`s rather
|
||||
* than throw an exception on a failure like normal actors.
|
||||
* [[akka.pattern.BackoffSupervisor]] should be used instead for cases where the child actor
|
||||
* terminates itself as a failure signal instead of the normal behavior of throwing an exception.
|
||||
* ***'''
|
||||
*/
|
||||
class TransparentExponentialBackoffSupervisor(
|
||||
props: Props,
|
||||
childName: String,
|
||||
decider: Option[Decider],
|
||||
minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration,
|
||||
randomFactor: Double)
|
||||
extends Actor
|
||||
with Stash
|
||||
with ActorLogging {
|
||||
|
||||
import TransparentExponentialBackoffSupervisor._
|
||||
import context._
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy() {
|
||||
case ex ⇒
|
||||
val defaultDirective: Directive =
|
||||
super.supervisorStrategy.decider.applyOrElse(ex, (_: Any) ⇒ Escalate)
|
||||
val maybeDirective: Option[Directive] = decider
|
||||
.map(_.applyOrElse(ex, (_: Any) ⇒ defaultDirective))
|
||||
|
||||
// Get the directive from the specified decider or fallback to
|
||||
// the default decider.
|
||||
// Whatever the final Directive is, we will translate all Restarts
|
||||
// to our own Restarts, which involves stopping the child.
|
||||
maybeDirective.getOrElse(defaultDirective) match {
|
||||
case Restart ⇒
|
||||
val childRef = sender
|
||||
become({
|
||||
case Terminated(`childRef`) ⇒
|
||||
unbecome()
|
||||
self ! ScheduleRestart(childRef)
|
||||
case _ ⇒
|
||||
stash()
|
||||
}, discardOld = false)
|
||||
Stop
|
||||
case other ⇒ other
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize by starting up and watching the child
|
||||
self ! StartChild
|
||||
|
||||
def receive = waitingToStart(-1, false)
|
||||
|
||||
def waitingToStart(numRestarts: Int, scheduleCounterReset: Boolean): Receive = {
|
||||
case StartChild ⇒
|
||||
val childRef = actorOf(props, childName)
|
||||
watch(childRef)
|
||||
unstashAll()
|
||||
if (scheduleCounterReset) {
|
||||
system.scheduler.scheduleOnce(minBackoff, self, ResetRestartCount(numRestarts + 1))
|
||||
}
|
||||
become(watching(childRef, numRestarts + 1))
|
||||
case _ ⇒ stash()
|
||||
}
|
||||
|
||||
// Steady state
|
||||
def watching(childRef: ActorRef, numRestarts: Int): Receive = {
|
||||
case ScheduleRestart(`childRef`) ⇒
|
||||
val delay = akka.pattern.BackoffSupervisor.calculateDelay(
|
||||
numRestarts, minBackoff, maxBackoff, randomFactor)
|
||||
system.scheduler.scheduleOnce(delay, self, StartChild)
|
||||
become(waitingToStart(numRestarts, true))
|
||||
log.info(s"Restarting child in: $delay; numRestarts: $numRestarts")
|
||||
case ResetRestartCount(last) ⇒
|
||||
if (last == numRestarts) {
|
||||
log.debug(s"Last restart count [$last] matches current count; resetting")
|
||||
become(watching(childRef, 0))
|
||||
} else {
|
||||
log.debug(s"Last restart count [$last] does not match the current count [$numRestarts]")
|
||||
}
|
||||
case Terminated(`childRef`) ⇒
|
||||
log.debug(s"Terminating, because child [$childRef] terminated itself")
|
||||
stop(self)
|
||||
case msg if sender() == childRef ⇒
|
||||
parent.forward(msg)
|
||||
case msg ⇒
|
||||
childRef.forward(msg)
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,119 @@
|
|||
/**
|
||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.contrib.pattern
|
||||
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit.TestProbe
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor._
|
||||
import scala.language.postfixOps
|
||||
|
||||
object TestActor {
|
||||
class StoppingException extends Exception("stopping exception")
|
||||
def props(probe: ActorRef): Props = Props(new TestActor(probe))
|
||||
}
|
||||
|
||||
class TestActor(probe: ActorRef) extends Actor {
|
||||
import context.dispatcher
|
||||
|
||||
probe ! "STARTED"
|
||||
|
||||
def receive = {
|
||||
case "DIE" ⇒ context.stop(self)
|
||||
case "THROW" ⇒ throw new Exception("normal exception")
|
||||
case "THROW_STOPPING_EXCEPTION" ⇒ throw new TestActor.StoppingException
|
||||
case ("TO_PARENT", msg) ⇒ context.parent ! msg
|
||||
case other ⇒ probe ! other
|
||||
}
|
||||
}
|
||||
|
||||
object TestParentActor {
|
||||
def props(probe: ActorRef, supervisorProps: Props): Props =
|
||||
Props(new TestParentActor(probe, supervisorProps))
|
||||
}
|
||||
class TestParentActor(probe: ActorRef, supervisorProps: Props) extends Actor {
|
||||
val supervisor = context.actorOf(supervisorProps)
|
||||
|
||||
def receive = {
|
||||
case other ⇒ probe.forward(other)
|
||||
}
|
||||
}
|
||||
|
||||
class TransparentExponentialBackoffSupervisorSpec extends AkkaSpec {
|
||||
|
||||
def supervisorProps(probeRef: ActorRef) = TransparentExponentialBackoffSupervisor.propsWithDecider(
|
||||
TestActor.props(probeRef),
|
||||
"someChildName",
|
||||
200 millis,
|
||||
10 seconds,
|
||||
0.0) {
|
||||
case _: TestActor.StoppingException ⇒ SupervisorStrategy.Stop
|
||||
}
|
||||
|
||||
trait Setup {
|
||||
val probe = TestProbe()
|
||||
val supervisor = system.actorOf(supervisorProps(probe.ref))
|
||||
probe.expectMsg("STARTED")
|
||||
}
|
||||
|
||||
trait Setup2 {
|
||||
val probe = TestProbe()
|
||||
val parent = system.actorOf(TestParentActor.props(probe.ref, supervisorProps(probe.ref)))
|
||||
probe.expectMsg("STARTED")
|
||||
val child = probe.lastSender
|
||||
}
|
||||
|
||||
"TransparentExponentialBackoffSupervisor" must {
|
||||
"forward messages to child" in new Setup {
|
||||
supervisor ! "some message"
|
||||
probe.expectMsg("some message")
|
||||
}
|
||||
|
||||
"terminate when child terminates" in new Setup {
|
||||
probe.watch(supervisor)
|
||||
supervisor ! "DIE"
|
||||
probe.expectTerminated(supervisor)
|
||||
}
|
||||
|
||||
"restart the child with an exponential back off" in new Setup {
|
||||
// Exponential back off restart test
|
||||
probe.within(1.4 seconds, 2 seconds) {
|
||||
supervisor ! "THROW"
|
||||
// numRestart = 0 ~ 200 millis
|
||||
probe.expectMsg(300 millis, "STARTED")
|
||||
|
||||
supervisor ! "THROW"
|
||||
// numRestart = 1 ~ 400 millis
|
||||
probe.expectMsg(500 millis, "STARTED")
|
||||
|
||||
supervisor ! "THROW"
|
||||
// numRestart = 2 ~ 800 millis
|
||||
probe.expectMsg(900 millis, "STARTED")
|
||||
}
|
||||
|
||||
// Verify that we only have one child at this point by selecting all the children
|
||||
// under the supervisor and broadcasting to them.
|
||||
// If there exists more than one child, we will get more than one reply.
|
||||
val supervisorChildSelection = system.actorSelection(supervisor.path / "*")
|
||||
supervisorChildSelection.tell("testmsg", probe.ref)
|
||||
probe.expectMsg("testmsg")
|
||||
probe.expectNoMsg
|
||||
}
|
||||
|
||||
"stop on exceptions as dictated by the decider" in new Setup {
|
||||
probe.watch(supervisor)
|
||||
// This should cause the supervisor to stop the child actor and then
|
||||
// subsequently stop itself.
|
||||
supervisor ! "THROW_STOPPING_EXCEPTION"
|
||||
probe.expectTerminated(supervisor)
|
||||
}
|
||||
|
||||
"forward messages from the child to the parent of the supervisor" in new Setup2 {
|
||||
child ! (("TO_PARENT", "TEST_MESSAGE"))
|
||||
probe.expectMsg("TEST_MESSAGE")
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue