#19246 Auto and manual reset and BackoffOptions.
Also moved `TransparantExponentialBackoffSupervisor` to `akka.pattern` (renamed to `BackoffOnRestartSupervisor`)
This commit is contained in:
parent
2bade93c31
commit
2404a9da01
12 changed files with 808 additions and 441 deletions
|
|
@ -0,0 +1,122 @@
|
|||
/**
|
||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.pattern
|
||||
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit.TestProbe
|
||||
import akka.testkit.filterException
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor._
|
||||
import scala.language.postfixOps
|
||||
|
||||
object TestActor {
|
||||
class TestException(msg: String) extends Exception(msg)
|
||||
class StoppingException extends TestException("stopping exception")
|
||||
class NormalException extends TestException("normal exception")
|
||||
def props(probe: ActorRef): Props = Props(new TestActor(probe))
|
||||
}
|
||||
|
||||
class TestActor(probe: ActorRef) extends Actor {
|
||||
import context.dispatcher
|
||||
|
||||
probe ! "STARTED"
|
||||
|
||||
def receive = {
|
||||
case "DIE" ⇒ context.stop(self)
|
||||
case "THROW" ⇒ throw new TestActor.NormalException
|
||||
case "THROW_STOPPING_EXCEPTION" ⇒ throw new TestActor.StoppingException
|
||||
case ("TO_PARENT", msg) ⇒ context.parent ! msg
|
||||
case other ⇒ probe ! other
|
||||
}
|
||||
}
|
||||
|
||||
object TestParentActor {
|
||||
def props(probe: ActorRef, supervisorProps: Props): Props =
|
||||
Props(new TestParentActor(probe, supervisorProps))
|
||||
}
|
||||
class TestParentActor(probe: ActorRef, supervisorProps: Props) extends Actor {
|
||||
val supervisor = context.actorOf(supervisorProps)
|
||||
|
||||
def receive = {
|
||||
case other ⇒ probe.forward(other)
|
||||
}
|
||||
}
|
||||
|
||||
class BackoffOnRestartSupervisorSpec extends AkkaSpec {
|
||||
|
||||
def supervisorProps(probeRef: ActorRef) = {
|
||||
val options = Backoff.onFailure(TestActor.props(probeRef), "someChildName", 200 millis, 10 seconds, 0.0)
|
||||
.withSupervisorStrategy(OneForOneStrategy() {
|
||||
case _: TestActor.StoppingException ⇒ SupervisorStrategy.Stop
|
||||
})
|
||||
BackoffSupervisor.props(options)
|
||||
}
|
||||
|
||||
trait Setup {
|
||||
val probe = TestProbe()
|
||||
val supervisor = system.actorOf(supervisorProps(probe.ref))
|
||||
probe.expectMsg("STARTED")
|
||||
}
|
||||
|
||||
trait Setup2 {
|
||||
val probe = TestProbe()
|
||||
val parent = system.actorOf(TestParentActor.props(probe.ref, supervisorProps(probe.ref)))
|
||||
probe.expectMsg("STARTED")
|
||||
val child = probe.lastSender
|
||||
}
|
||||
|
||||
"BackoffOnRestartSupervisor" must {
|
||||
"terminate when child terminates" in new Setup {
|
||||
filterException[TestActor.TestException] {
|
||||
probe.watch(supervisor)
|
||||
supervisor ! "DIE"
|
||||
probe.expectTerminated(supervisor)
|
||||
}
|
||||
}
|
||||
|
||||
"restart the child with an exponential back off" in new Setup {
|
||||
filterException[TestActor.TestException] {
|
||||
// Exponential back off restart test
|
||||
probe.within(1.4 seconds, 2 seconds) {
|
||||
supervisor ! "THROW"
|
||||
// numRestart = 0 ~ 200 millis
|
||||
probe.expectMsg(300 millis, "STARTED")
|
||||
|
||||
supervisor ! "THROW"
|
||||
// numRestart = 1 ~ 400 millis
|
||||
probe.expectMsg(500 millis, "STARTED")
|
||||
|
||||
supervisor ! "THROW"
|
||||
// numRestart = 2 ~ 800 millis
|
||||
probe.expectMsg(900 millis, "STARTED")
|
||||
}
|
||||
|
||||
// Verify that we only have one child at this point by selecting all the children
|
||||
// under the supervisor and broadcasting to them.
|
||||
// If there exists more than one child, we will get more than one reply.
|
||||
val supervisorChildSelection = system.actorSelection(supervisor.path / "*")
|
||||
supervisorChildSelection.tell("testmsg", probe.ref)
|
||||
probe.expectMsg("testmsg")
|
||||
probe.expectNoMsg
|
||||
}
|
||||
}
|
||||
|
||||
"stop on exceptions as dictated by the supervisor strategy" in new Setup {
|
||||
filterException[TestActor.TestException] {
|
||||
probe.watch(supervisor)
|
||||
// This should cause the supervisor to stop the child actor and then
|
||||
// subsequently stop itself.
|
||||
supervisor ! "THROW_STOPPING_EXCEPTION"
|
||||
probe.expectTerminated(supervisor)
|
||||
}
|
||||
}
|
||||
|
||||
"forward messages from the child to the parent of the supervisor" in new Setup2 {
|
||||
child ! (("TO_PARENT", "TEST_MESSAGE"))
|
||||
probe.expectMsg("TEST_MESSAGE")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -25,15 +25,32 @@ object BackoffSupervisorSpec {
|
|||
case msg ⇒ probe ! msg
|
||||
}
|
||||
}
|
||||
|
||||
object ManualChild {
|
||||
def props(probe: ActorRef): Props =
|
||||
Props(new ManualChild(probe))
|
||||
}
|
||||
|
||||
class ManualChild(probe: ActorRef) extends Actor {
|
||||
def receive = {
|
||||
case "boom" ⇒ throw new TestException
|
||||
case msg ⇒
|
||||
probe ! msg
|
||||
context.parent ! BackoffSupervisor.Reset
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender {
|
||||
import BackoffSupervisorSpec._
|
||||
|
||||
def onStopOptions(props: Props = Child.props(testActor)) = Backoff.onStop(props, "c1", 100.millis, 3.seconds, 0.2)
|
||||
def onFailureOptions(props: Props = Child.props(testActor)) = Backoff.onFailure(props, "c1", 100.millis, 3.seconds, 0.2)
|
||||
def create(options: BackoffOptions) = system.actorOf(BackoffSupervisor.props(options))
|
||||
|
||||
"BackoffSupervisor" must {
|
||||
"start child again when it stops" in {
|
||||
val supervisor = system.actorOf(
|
||||
BackoffSupervisor.props(Child.props(testActor), "c1", 100.millis, 3.seconds, 0.2))
|
||||
"start child again when it stops when using `Backoff.onStop`" in {
|
||||
val supervisor = create(onStopOptions())
|
||||
supervisor ! BackoffSupervisor.GetCurrentChild
|
||||
val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get
|
||||
watch(c1)
|
||||
|
|
@ -47,27 +64,111 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender {
|
|||
}
|
||||
|
||||
"forward messages to the child" in {
|
||||
val supervisor = system.actorOf(
|
||||
BackoffSupervisor.props(Child.props(testActor), "c2", 100.millis, 3.seconds, 0.2))
|
||||
supervisor ! "hello"
|
||||
expectMsg("hello")
|
||||
def assertForward(supervisor: ActorRef) = {
|
||||
supervisor ! "hello"
|
||||
expectMsg("hello")
|
||||
}
|
||||
assertForward(create(onStopOptions()))
|
||||
assertForward(create(onFailureOptions()))
|
||||
}
|
||||
|
||||
"support custom supervision decider" in {
|
||||
val supervisor = system.actorOf(
|
||||
BackoffSupervisor.propsWithSupervisorStrategy(Child.props(testActor), "c1", 100.millis, 3.seconds, 0.2,
|
||||
OneForOneStrategy() {
|
||||
case _: TestException ⇒ SupervisorStrategy.Stop
|
||||
}))
|
||||
supervisor ! BackoffSupervisor.GetCurrentChild
|
||||
val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get
|
||||
watch(c1)
|
||||
c1 ! "boom"
|
||||
expectTerminated(c1)
|
||||
awaitAssert {
|
||||
"support custom supervision strategy" in {
|
||||
def assertCustomStrategy(supervisor: ActorRef) = {
|
||||
supervisor ! BackoffSupervisor.GetCurrentChild
|
||||
// new instance
|
||||
expectMsgType[BackoffSupervisor.CurrentChild].ref.get should !==(c1)
|
||||
val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get
|
||||
watch(c1)
|
||||
c1 ! "boom"
|
||||
expectTerminated(c1)
|
||||
awaitAssert {
|
||||
supervisor ! BackoffSupervisor.GetCurrentChild
|
||||
// new instance
|
||||
expectMsgType[BackoffSupervisor.CurrentChild].ref.get should !==(c1)
|
||||
}
|
||||
}
|
||||
filterException[TestException] {
|
||||
val stoppingStrategy = OneForOneStrategy() {
|
||||
case _: TestException ⇒ SupervisorStrategy.Stop
|
||||
}
|
||||
val restartingStrategy = OneForOneStrategy() {
|
||||
case _: TestException ⇒ SupervisorStrategy.Restart
|
||||
}
|
||||
|
||||
assertCustomStrategy(
|
||||
create(onStopOptions()
|
||||
.withSupervisorStrategy(stoppingStrategy)))
|
||||
|
||||
assertCustomStrategy(
|
||||
create(onFailureOptions()
|
||||
.withSupervisorStrategy(restartingStrategy)))
|
||||
}
|
||||
}
|
||||
|
||||
"support default stopping strategy when using `Backoff.onStop`" in {
|
||||
filterException[TestException] {
|
||||
val supervisor = create(onStopOptions().withDefaultStoppingStrategy)
|
||||
supervisor ! BackoffSupervisor.GetCurrentChild
|
||||
val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get
|
||||
watch(c1)
|
||||
supervisor ! BackoffSupervisor.GetRestartCount
|
||||
expectMsg(BackoffSupervisor.RestartCount(0))
|
||||
|
||||
c1 ! "boom"
|
||||
expectTerminated(c1)
|
||||
awaitAssert {
|
||||
supervisor ! BackoffSupervisor.GetCurrentChild
|
||||
// new instance
|
||||
expectMsgType[BackoffSupervisor.CurrentChild].ref.get should !==(c1)
|
||||
}
|
||||
supervisor ! BackoffSupervisor.GetRestartCount
|
||||
expectMsg(BackoffSupervisor.RestartCount(1))
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
"support manual reset" in {
|
||||
filterException[TestException] {
|
||||
def assertManualReset(supervisor: ActorRef) = {
|
||||
supervisor ! BackoffSupervisor.GetCurrentChild
|
||||
val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get
|
||||
watch(c1)
|
||||
c1 ! "boom"
|
||||
expectTerminated(c1)
|
||||
supervisor ! BackoffSupervisor.GetRestartCount
|
||||
expectMsg(BackoffSupervisor.RestartCount(1))
|
||||
|
||||
awaitAssert {
|
||||
supervisor ! BackoffSupervisor.GetCurrentChild
|
||||
// new instance
|
||||
expectMsgType[BackoffSupervisor.CurrentChild].ref.get should !==(c1)
|
||||
}
|
||||
|
||||
supervisor ! "hello"
|
||||
expectMsg("hello")
|
||||
|
||||
// making sure the Reset is handled by supervisor
|
||||
supervisor ! "hello"
|
||||
expectMsg("hello")
|
||||
|
||||
supervisor ! BackoffSupervisor.GetRestartCount
|
||||
expectMsg(BackoffSupervisor.RestartCount(0))
|
||||
}
|
||||
|
||||
val stoppingStrategy = OneForOneStrategy() {
|
||||
case _: TestException ⇒ SupervisorStrategy.Stop
|
||||
}
|
||||
val restartingStrategy = OneForOneStrategy() {
|
||||
case _: TestException ⇒ SupervisorStrategy.Restart
|
||||
}
|
||||
|
||||
assertManualReset(
|
||||
create(onStopOptions(ManualChild.props(testActor))
|
||||
.withManualReset
|
||||
.withSupervisorStrategy(stoppingStrategy)))
|
||||
|
||||
assertManualReset(
|
||||
create(onFailureOptions(ManualChild.props(testActor))
|
||||
.withManualReset
|
||||
.withSupervisorStrategy(restartingStrategy)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,63 @@
|
|||
/**
|
||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.pattern
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor._
|
||||
import akka.actor.OneForOneStrategy
|
||||
import akka.actor.SupervisorStrategy._
|
||||
|
||||
/**
|
||||
* Back-off supervisor that stops and starts a child actor when the child actor restarts.
|
||||
* This back-off supervisor is created by using ``akka.pattern.BackoffSupervisor.props``
|
||||
* with ``akka.pattern.Backoff.onFailure``.
|
||||
*/
|
||||
private class BackoffOnRestartSupervisor(
|
||||
val childProps: Props,
|
||||
val childName: String,
|
||||
minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration,
|
||||
val reset: BackoffReset,
|
||||
randomFactor: Double,
|
||||
strategy: OneForOneStrategy)
|
||||
extends Actor with HandleBackoff
|
||||
with ActorLogging {
|
||||
|
||||
import context._
|
||||
import BackoffSupervisor._
|
||||
override val supervisorStrategy = OneForOneStrategy(strategy.maxNrOfRetries, strategy.withinTimeRange, strategy.loggingEnabled) {
|
||||
case ex ⇒
|
||||
val defaultDirective: Directive =
|
||||
super.supervisorStrategy.decider.applyOrElse(ex, (_: Any) ⇒ Escalate)
|
||||
|
||||
strategy.decider.applyOrElse(ex, (_: Any) ⇒ defaultDirective) match {
|
||||
|
||||
// Whatever the final Directive is, we will translate all Restarts
|
||||
// to our own Restarts, which involves stopping the child.
|
||||
// directive match {
|
||||
case Restart ⇒
|
||||
val childRef = sender()
|
||||
become({
|
||||
case Terminated(`childRef`) ⇒
|
||||
unbecome()
|
||||
child = None
|
||||
val restartDelay = BackoffSupervisor.calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor)
|
||||
context.system.scheduler.scheduleOnce(restartDelay, self, BackoffSupervisor.StartChild)
|
||||
restartCount += 1
|
||||
case _ ⇒ // ignore
|
||||
}, discardOld = false)
|
||||
Stop
|
||||
case other ⇒ other
|
||||
}
|
||||
}
|
||||
|
||||
def onTerminated: Receive = {
|
||||
case Terminated(child) ⇒
|
||||
log.debug(s"Terminating, because child [$child] terminated itself")
|
||||
stop(self)
|
||||
}
|
||||
|
||||
def receive = onTerminated orElse handleBackoff
|
||||
}
|
||||
228
akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala
Normal file
228
akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala
Normal file
|
|
@ -0,0 +1,228 @@
|
|||
/**
|
||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.pattern
|
||||
|
||||
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||
import akka.actor.{ Props, OneForOneStrategy, SupervisorStrategy }
|
||||
import akka.actor.SupervisorStrategy.{ Decider, JDecider }
|
||||
|
||||
/**
|
||||
* Builds back-off options for creating a back-off supervisor.
|
||||
* You can pass `BackoffOptions` to `akka.pattern.BackoffSupervisor.props`.
|
||||
* An example of creating back-off options:
|
||||
* {{{
|
||||
* Backoff.onFailure(childProps, childName, minBackoff, maxBackoff, randomFactor)
|
||||
* .withManualReset
|
||||
* .withSupervisorStrategy(
|
||||
* OneforOneStrategy(){
|
||||
* case e: GivingUpException => Stop
|
||||
* case e: RetryableException => Restart
|
||||
* }
|
||||
* )
|
||||
*
|
||||
* }}}
|
||||
*/
|
||||
object Backoff {
|
||||
/**
|
||||
* Back-off options for creating a back-off supervisor actor that expects a child actor to restart on failure.
|
||||
*
|
||||
* This explicit supervisor behaves similarly to the normal implicit supervision where
|
||||
* if an actor throws an exception, the decider on the supervisor will decide when to
|
||||
* `Stop`, `Restart`, `Escalate`, `Resume` the child actor.
|
||||
*
|
||||
* When the `Restart` directive is specified, the supervisor will delay the restart
|
||||
* using an exponential back off strategy (bounded by minBackoff and maxBackoff).
|
||||
*
|
||||
* This supervisor is intended to be transparent to both the child actor and external actors.
|
||||
* Where external actors can send messages to the supervisor as if it was the child and the
|
||||
* messages will be forwarded. And when the child is `Terminated`, the supervisor is also
|
||||
* `Terminated`.
|
||||
* Transparent to the child means that the child does not have to be aware that it is being
|
||||
* supervised specifically by this actor. Just like it does
|
||||
* not need to know when it is being supervised by the usual implicit supervisors.
|
||||
* The only caveat is that the `ActorRef` of the child is not stable, so any user storing the
|
||||
* `sender()` `ActorRef` from the child response may eventually not be able to communicate with
|
||||
* the stored `ActorRef`. In general all messages to the child should be directed through this actor.
|
||||
*
|
||||
* An example of where this supervisor might be used is when you may have an actor that is
|
||||
* responsible for continuously polling on a server for some resource that sometimes may be down.
|
||||
* Instead of hammering the server continuously when the resource is unavailable, the actor will
|
||||
* be restarted with an exponentially increasing back off until the resource is available again.
|
||||
*
|
||||
* '''***
|
||||
* This supervisor should not be used with `Akka Persistence` child actors.
|
||||
* `Akka Persistence` actors shutdown unconditionally on `persistFailure()`s rather
|
||||
* than throw an exception on a failure like normal actors.
|
||||
* [[#onStop]] should be used instead for cases where the child actor
|
||||
* terminates itself as a failure signal instead of the normal behavior of throwing an exception.
|
||||
* ***'''
|
||||
* You can define another
|
||||
* supervision strategy by using `akka.pattern.BackoffOptions.withSupervisorStrategy` on [[akka.pattern.BackoffOptions]].
|
||||
*
|
||||
* @param childProps the [[akka.actor.Props]] of the child actor that
|
||||
* will be started and supervised
|
||||
* @param childName name of the child actor
|
||||
* @param minBackoff minimum (initial) duration until the child actor will
|
||||
* started again, if it is terminated
|
||||
* @param maxBackoff the exponential back-off is capped to this duration
|
||||
* @param randomFactor after calculation of the exponential back-off an additional
|
||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||
* In order to skip this additional delay pass in `0`.
|
||||
*/
|
||||
def onFailure(
|
||||
childProps: Props,
|
||||
childName: String,
|
||||
minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration,
|
||||
randomFactor: Double): BackoffOptions =
|
||||
BackoffOptionsImpl(RestartImpliesFailure, childProps, childName, minBackoff, maxBackoff, randomFactor)
|
||||
|
||||
/**
|
||||
* Back-off options for creating a back-off supervisor actor that expects a child actor to stop on failure.
|
||||
*
|
||||
* This actor can be used to supervise a child actor and start it again
|
||||
* after a back-off duration if the child actor is stopped.
|
||||
*
|
||||
* This is useful in situations where the re-start of the child actor should be
|
||||
* delayed e.g. in order to give an external resource time to recover before the
|
||||
* child actor tries contacting it again (after being restarted).
|
||||
*
|
||||
* Specifically this pattern is useful for for persistent actors,
|
||||
* which are stopped in case of persistence failures.
|
||||
* Just restarting them immediately would probably fail again (since the data
|
||||
* store is probably unavailable). It is better to try again after a delay.
|
||||
*
|
||||
* It supports exponential back-off between the given `minBackoff` and
|
||||
* `maxBackoff` durations. For example, if `minBackoff` is 3 seconds and
|
||||
* `maxBackoff` 30 seconds the start attempts will be delayed with
|
||||
* 3, 6, 12, 24, 30, 30 seconds. The exponential back-off counter is reset
|
||||
* if the actor is not terminated within the `minBackoff` duration.
|
||||
*
|
||||
* In addition to the calculated exponential back-off an additional
|
||||
* random delay based the given `randomFactor` is added, e.g. 0.2 adds up to 20%
|
||||
* delay. The reason for adding a random delay is to avoid that all failing
|
||||
* actors hit the backend resource at the same time.
|
||||
*
|
||||
* You can retrieve the current child `ActorRef` by sending `BackoffSupervisor.GetCurrentChild`
|
||||
* message to this actor and it will reply with [[akka.pattern.BackoffSupervisor.CurrentChild]]
|
||||
* containing the `ActorRef` of the current child, if any.
|
||||
*
|
||||
* The `BackoffSupervisor`delegates all messages from the child to the parent of the
|
||||
* `BackoffSupervisor`, with the supervisor as sender.
|
||||
*
|
||||
* The `BackoffSupervisor` forwards all other messages to the child, if it is currently running.
|
||||
*
|
||||
* The child can stop itself and send a [[akka.actor.PoisonPill]] to the parent supervisor
|
||||
* if it wants to do an intentional stop.
|
||||
*
|
||||
* Exceptions in the child are handled with the default supervisionStrategy, which can be changed by using
|
||||
* [[BackoffOptions#withSupervisorStrategy]] or [[BackoffOptions#withDefaultStoppingStrategy]]. A
|
||||
* `Restart` will perform a normal immediate restart of the child. A `Stop` will
|
||||
* stop the child, but it will be started again after the back-off duration.
|
||||
*
|
||||
* @param childProps the [[akka.actor.Props]] of the child actor that
|
||||
* will be started and supervised
|
||||
* @param childName name of the child actor
|
||||
* @param minBackoff minimum (initial) duration until the child actor will
|
||||
* started again, if it is terminated
|
||||
* @param maxBackoff the exponential back-off is capped to this duration
|
||||
* @param randomFactor after calculation of the exponential back-off an additional
|
||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||
* In order to skip this additional delay pass in `0`.
|
||||
*/
|
||||
def onStop(
|
||||
childProps: Props,
|
||||
childName: String,
|
||||
minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration,
|
||||
randomFactor: Double): BackoffOptions =
|
||||
BackoffOptionsImpl(StopImpliesFailure, childProps, childName, minBackoff, maxBackoff, randomFactor)
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures a back-off supervisor actor. Start with `Backoff.onStop` or `Backoff.onFailure`.
|
||||
* BackoffOptions is immutable, so be sure to chain methods like:
|
||||
* {{{
|
||||
* val options = Backoff.onFailure(childProps, childName, minBackoff, maxBackoff, randomFactor)
|
||||
* .withManualReset
|
||||
* context.actorOf(BackoffSupervisor.props(options), name)
|
||||
* }}}
|
||||
*/
|
||||
trait BackoffOptions {
|
||||
/**
|
||||
* Returns a new BackoffOptions with automatic back-off reset.
|
||||
* The back-off algorithm is reset if the child does not crash within the specified `resetBackoff`.
|
||||
* @param resetBackoff The back-off is reset if the child does not crash within this duration.
|
||||
*/
|
||||
def withAutoReset(resetBackoff: FiniteDuration): BackoffOptions
|
||||
|
||||
/**
|
||||
* Returns a new BackoffOptions with manual back-off reset. The back-off is only reset
|
||||
* if the child sends a `BackoffSupervisor.Reset` to its parent (the backoff-supervisor actor).
|
||||
*/
|
||||
def withManualReset: BackoffOptions
|
||||
|
||||
/**
|
||||
* Returns a new BackoffOptions with the supervisorStrategy.
|
||||
* @param supervisorStrategy the supervisorStrategy that the back-off supervisor will use.
|
||||
* The default supervisor strategy is used as fallback if the specified supervisorStrategy (its decider)
|
||||
* does not explicitly handle an exception.
|
||||
*/
|
||||
def withSupervisorStrategy(supervisorStrategy: OneForOneStrategy): BackoffOptions
|
||||
|
||||
/**
|
||||
* Returns a new BackoffOptions with a default `SupervisorStrategy.stoppingStrategy`.
|
||||
* The default supervisor strategy is used as fallback for throwables not handled by `SupervisorStrategy.stoppingStrategy`.
|
||||
*/
|
||||
def withDefaultStoppingStrategy: BackoffOptions
|
||||
|
||||
/**
|
||||
* Returns the props to create the back-off supervisor.
|
||||
*/
|
||||
private[akka] def props: Props
|
||||
}
|
||||
|
||||
private final case class BackoffOptionsImpl(
|
||||
backoffType: BackoffType = RestartImpliesFailure,
|
||||
childProps: Props,
|
||||
childName: String,
|
||||
minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration,
|
||||
randomFactor: Double,
|
||||
reset: Option[BackoffReset] = None,
|
||||
supervisorStrategy: OneForOneStrategy = OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider)) extends BackoffOptions {
|
||||
|
||||
val backoffReset = reset.getOrElse(AutoReset(minBackoff))
|
||||
|
||||
def withAutoReset(resetBackoff: FiniteDuration) = copy(reset = Some(AutoReset(resetBackoff)))
|
||||
def withManualReset = copy(reset = Some(ManualReset))
|
||||
def withSupervisorStrategy(supervisorStrategy: OneForOneStrategy) = copy(supervisorStrategy = supervisorStrategy)
|
||||
def withDefaultStoppingStrategy = copy(supervisorStrategy = OneForOneStrategy()(SupervisorStrategy.stoppingStrategy.decider))
|
||||
|
||||
def props = {
|
||||
require(minBackoff > Duration.Zero, "minBackoff must be > 0")
|
||||
require(maxBackoff >= minBackoff, "maxBackoff must be >= minBackoff")
|
||||
require(0.0 <= randomFactor && randomFactor <= 1.0, "randomFactor must be between 0.0 and 1.0")
|
||||
backoffReset match {
|
||||
case AutoReset(resetBackoff) ⇒
|
||||
require(minBackoff <= resetBackoff && resetBackoff <= maxBackoff)
|
||||
case _ ⇒ // ignore
|
||||
}
|
||||
|
||||
backoffType match {
|
||||
case RestartImpliesFailure ⇒
|
||||
Props(new BackoffOnRestartSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy))
|
||||
case StopImpliesFailure ⇒
|
||||
Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private sealed trait BackoffType
|
||||
private final case object StopImpliesFailure extends BackoffType
|
||||
private final case object RestartImpliesFailure extends BackoffType
|
||||
|
||||
private[akka] sealed trait BackoffReset
|
||||
private[akka] final case object ManualReset extends BackoffReset
|
||||
private[akka] final case class AutoReset(resetBackoff: FiniteDuration) extends BackoffReset
|
||||
|
|
@ -4,22 +4,24 @@
|
|||
package akka.pattern
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.concurrent.duration.Duration
|
||||
import java.util.concurrent.ThreadLocalRandom
|
||||
import java.util.Optional
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.DeadLetterSuppression
|
||||
import akka.actor.Props
|
||||
import akka.actor.Terminated
|
||||
import java.util.Optional
|
||||
import scala.concurrent.duration.Duration
|
||||
import akka.actor.SupervisorStrategy.Decider
|
||||
import akka.actor.SupervisorStrategy.Directive
|
||||
import akka.actor.SupervisorStrategy.Escalate
|
||||
import akka.actor.OneForOneStrategy
|
||||
import akka.actor.SupervisorStrategy
|
||||
|
||||
object BackoffSupervisor {
|
||||
|
||||
/**
|
||||
* Props for creating an [[BackoffSupervisor]] actor.
|
||||
* Props for creating a [[BackoffSupervisor]] actor.
|
||||
*
|
||||
* Exceptions in the child are handled with the default supervision strategy, i.e.
|
||||
* most exceptions will immediately restart the child. You can define another
|
||||
|
|
@ -45,7 +47,7 @@ object BackoffSupervisor {
|
|||
}
|
||||
|
||||
/**
|
||||
* Props for creating an [[BackoffSupervisor]] actor with a custom
|
||||
* Props for creating a [[BackoffSupervisor]] actor with a custom
|
||||
* supervision strategy.
|
||||
*
|
||||
* Exceptions in the child are handled with the given `supervisionStrategy`. A
|
||||
|
|
@ -77,6 +79,12 @@ object BackoffSupervisor {
|
|||
Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, randomFactor, strategy))
|
||||
}
|
||||
|
||||
/**
|
||||
* Props for creating a [[BackoffSupervisor]] actor from [[BackoffOptions]].
|
||||
* @param options the [[BackoffOptions]] that specify how to construct a backoff-supervisor.
|
||||
*/
|
||||
def props(options: BackoffOptions): Props = options.props
|
||||
|
||||
/**
|
||||
* Send this message to the [[BackoffSupervisor]] and it will reply with
|
||||
* [[BackoffSupervisor.CurrentChild]] containing the `ActorRef` of the current child, if any.
|
||||
|
|
@ -89,6 +97,10 @@ object BackoffSupervisor {
|
|||
*/
|
||||
def getCurrentChild = GetCurrentChild
|
||||
|
||||
/**
|
||||
* Send this message to the [[BackoffSupervisor]] and it will reply with
|
||||
* [[BackoffSupervisor.CurrentChild]] containing the `ActorRef` of the current child, if any.
|
||||
*/
|
||||
final case class CurrentChild(ref: Option[ActorRef]) {
|
||||
/**
|
||||
* Java API: The `ActorRef` of the current child, if any
|
||||
|
|
@ -96,8 +108,36 @@ object BackoffSupervisor {
|
|||
def getRef: Optional[ActorRef] = Optional.ofNullable(ref.orNull)
|
||||
}
|
||||
|
||||
private case object StartChild extends DeadLetterSuppression
|
||||
private case class ResetRestartCount(current: Int) extends DeadLetterSuppression
|
||||
/**
|
||||
* Send this message to the [[BackoffSupervisor]] and it will reset the back-off.
|
||||
* This should be used in conjunction with `withManualReset` in [[BackoffOptions]].
|
||||
*/
|
||||
final case object Reset
|
||||
|
||||
/**
|
||||
* Java API: Send this message to the [[BackoffSupervisor]] and it will reset the back-off.
|
||||
* This should be used in conjunction with `withManualReset` in [[BackoffOptions]].
|
||||
*/
|
||||
def reset = Reset
|
||||
|
||||
/**
|
||||
* Send this message to the [[BackoffSupervisor]] and it will reply with
|
||||
* [[BackoffSupervisor.RestartCount]] containing the current restart count.
|
||||
*/
|
||||
final case object GetRestartCount
|
||||
|
||||
/**
|
||||
* Java API: Send this message to the [[BackoffSupervisor]] and it will reply with
|
||||
* [[BackoffSupervisor.RestartCount]] containing the current restart count.
|
||||
*/
|
||||
def getRestartCount = GetRestartCount
|
||||
|
||||
final case class RestartCount(count: Int)
|
||||
|
||||
private[akka] final case object StartChild extends DeadLetterSuppression
|
||||
|
||||
// not final for binary compatibility with 2.4.1
|
||||
private[akka] case class ResetRestartCount(current: Int) extends DeadLetterSuppression
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -121,59 +161,45 @@ object BackoffSupervisor {
|
|||
}
|
||||
|
||||
/**
|
||||
* This actor can be used to supervise a child actor and start it again
|
||||
* after a back-off duration if the child actor is stopped.
|
||||
*
|
||||
* This is useful in situations where the re-start of the child actor should be
|
||||
* delayed e.g. in order to give an external resource time to recover before the
|
||||
* child actor tries contacting it again (after being restarted).
|
||||
*
|
||||
* Specifically this pattern is useful for for persistent actors,
|
||||
* which are stopped in case of persistence failures.
|
||||
* Just restarting them immediately would probably fail again (since the data
|
||||
* store is probably unavailable). It is better to try again after a delay.
|
||||
*
|
||||
* It supports exponential back-off between the given `minBackoff` and
|
||||
* `maxBackoff` durations. For example, if `minBackoff` is 3 seconds and
|
||||
* `maxBackoff` 30 seconds the start attempts will be delayed with
|
||||
* 3, 6, 12, 24, 30, 30 seconds. The exponential back-off counter is reset
|
||||
* if the actor is not terminated within the `minBackoff` duration.
|
||||
*
|
||||
* In addition to the calculated exponential back-off an additional
|
||||
* random delay based the given `randomFactor` is added, e.g. 0.2 adds up to 20%
|
||||
* delay. The reason for adding a random delay is to avoid that all failing
|
||||
* actors hit the backend resource at the same time.
|
||||
*
|
||||
* You can retrieve the current child `ActorRef` by sending `BackoffSupervisor.GetCurrentChild`
|
||||
* message to this actor and it will reply with [[akka.pattern.BackoffSupervisor.CurrentChild]]
|
||||
* containing the `ActorRef` of the current child, if any.
|
||||
*
|
||||
* The `BackoffSupervisor`delegates all messages from the child to the parent of the
|
||||
* `BackoffSupervisor`, with the supervisor as sender.
|
||||
*
|
||||
* The `BackoffSupervisor` forwards all other messages to the child, if it is currently running.
|
||||
*
|
||||
* The child can stop itself and send a [[akka.actor.PoisonPill]] to the parent supervisor
|
||||
* if it wants to do an intentional stop.
|
||||
*
|
||||
* Exceptions in the child are handled with the given `supervisionStrategy`. A
|
||||
* `Restart` will perform a normal immediate restart of the child. A `Stop` will
|
||||
* stop the child, but it will be started again after the back-off duration.
|
||||
* Back-off supervisor that stops and starts a child actor using a back-off algorithm when the child actor stops.
|
||||
* This back-off supervisor is created by using `akka.pattern.BackoffSupervisor.props`
|
||||
* with `Backoff.onStop`.
|
||||
*/
|
||||
final class BackoffSupervisor(
|
||||
childProps: Props,
|
||||
childName: String,
|
||||
val childProps: Props,
|
||||
val childName: String,
|
||||
minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration,
|
||||
val reset: BackoffReset,
|
||||
randomFactor: Double,
|
||||
override val supervisorStrategy: SupervisorStrategy)
|
||||
extends Actor {
|
||||
strategy: SupervisorStrategy)
|
||||
extends Actor with HandleBackoff {
|
||||
|
||||
import BackoffSupervisor._
|
||||
import context.dispatcher
|
||||
|
||||
private var child: Option[ActorRef] = None
|
||||
private var restartCount = 0
|
||||
// to keep binary compatibility with 2.4.1
|
||||
override val supervisorStrategy = strategy match {
|
||||
case oneForOne: OneForOneStrategy ⇒
|
||||
OneForOneStrategy(oneForOne.maxNrOfRetries, oneForOne.withinTimeRange, oneForOne.loggingEnabled) {
|
||||
case ex ⇒
|
||||
val defaultDirective: Directive =
|
||||
super.supervisorStrategy.decider.applyOrElse(ex, (_: Any) ⇒ Escalate)
|
||||
|
||||
strategy.decider.applyOrElse(ex, (_: Any) ⇒ defaultDirective)
|
||||
}
|
||||
case s ⇒ s
|
||||
}
|
||||
|
||||
// for binary compatibility with 2.4.1
|
||||
def this(
|
||||
childProps: Props,
|
||||
childName: String,
|
||||
minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration,
|
||||
randomFactor: Double,
|
||||
supervisorStrategy: SupervisorStrategy) =
|
||||
this(childProps, childName, minBackoff, maxBackoff, AutoReset(minBackoff), randomFactor, supervisorStrategy)
|
||||
|
||||
// for binary compatibility with 2.4.0
|
||||
def this(
|
||||
|
|
@ -184,29 +210,57 @@ final class BackoffSupervisor(
|
|||
randomFactor: Double) =
|
||||
this(childProps, childName, minBackoff, maxBackoff, randomFactor, SupervisorStrategy.defaultStrategy)
|
||||
|
||||
override def preStart(): Unit =
|
||||
startChild()
|
||||
def onTerminated: Receive = {
|
||||
case Terminated(ref) if child.contains(ref) ⇒
|
||||
child = None
|
||||
val restartDelay = calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor)
|
||||
context.system.scheduler.scheduleOnce(restartDelay, self, StartChild)
|
||||
restartCount += 1
|
||||
}
|
||||
|
||||
def receive = onTerminated orElse handleBackoff
|
||||
}
|
||||
|
||||
private[akka] trait HandleBackoff { this: Actor ⇒
|
||||
def childProps: Props
|
||||
def childName: String
|
||||
def reset: BackoffReset
|
||||
|
||||
var child: Option[ActorRef] = None
|
||||
var restartCount = 0
|
||||
|
||||
import BackoffSupervisor._
|
||||
import context.dispatcher
|
||||
|
||||
override def preStart(): Unit = startChild()
|
||||
|
||||
def startChild(): Unit =
|
||||
if (child.isEmpty) {
|
||||
child = Some(context.watch(context.actorOf(childProps, childName)))
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case Terminated(ref) if child.contains(ref) ⇒
|
||||
child = None
|
||||
val restartDelay = calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor)
|
||||
context.system.scheduler.scheduleOnce(restartDelay, self, StartChild)
|
||||
restartCount += 1
|
||||
|
||||
def handleBackoff: Receive = {
|
||||
case StartChild ⇒
|
||||
startChild()
|
||||
context.system.scheduler.scheduleOnce(minBackoff, self, ResetRestartCount(restartCount))
|
||||
reset match {
|
||||
case AutoReset(resetBackoff) ⇒
|
||||
val _ = context.system.scheduler.scheduleOnce(resetBackoff, self, ResetRestartCount(restartCount))
|
||||
case _ ⇒ // ignore
|
||||
}
|
||||
|
||||
case Reset ⇒
|
||||
reset match {
|
||||
case ManualReset ⇒ restartCount = 0
|
||||
case msg ⇒ unhandled(msg)
|
||||
}
|
||||
|
||||
case ResetRestartCount(current) ⇒
|
||||
if (current == restartCount)
|
||||
restartCount = 0
|
||||
|
||||
case GetRestartCount ⇒
|
||||
sender() ! RestartCount(restartCount)
|
||||
|
||||
case GetCurrentChild ⇒
|
||||
sender() ! CurrentChild(child)
|
||||
|
||||
|
|
@ -219,5 +273,4 @@ final class BackoffSupervisor(
|
|||
case None ⇒ context.system.deadLetters.forward(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,203 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.contrib.pattern
|
||||
|
||||
import akka.actor._
|
||||
import akka.actor.OneForOneStrategy
|
||||
import akka.actor.SupervisorStrategy._
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object TransparentExponentialBackoffSupervisor {
|
||||
private case class ScheduleRestart(childRef: ActorRef) extends DeadLetterSuppression
|
||||
private case object StartChild extends DeadLetterSuppression
|
||||
private case class ResetRestartCount(lastNumRestarts: Int) extends DeadLetterSuppression
|
||||
|
||||
/**
|
||||
* Props for creating a [[TransparentExponentialBackoffSupervisor]] with a decider.
|
||||
*
|
||||
* @param childProps the [[akka.actor.Props]] of the child to be supervised.
|
||||
* @param childName the name of the child actor.
|
||||
* @param minBackoff the min time before the child is restarted.
|
||||
* @param maxBackoff the max time (upperbound) for a child restart.
|
||||
* @param randomFactor a random delay factor to add on top of the calculated exponential
|
||||
* back off.
|
||||
* The calculation is equivalent to:
|
||||
* {{{
|
||||
* final_delay = min(
|
||||
* maxBackoff,
|
||||
* (random_delay_factor * calculated_backoff) + calculated_backoff)
|
||||
* }}}
|
||||
* @param decider a `Decider` to specify how the supervisor
|
||||
* should behave for different exceptions. If no cases are matched, the default decider of
|
||||
* [[akka.actor.Actor]] is used. When the `Restart` directive
|
||||
* is returned by the decider, this supervisor will apply an exponential back off restart.
|
||||
*/
|
||||
def propsWithDecider(
|
||||
childProps: Props,
|
||||
childName: String,
|
||||
minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration,
|
||||
randomFactor: Double)(decider: Decider): Props = {
|
||||
Props(
|
||||
new TransparentExponentialBackoffSupervisor(
|
||||
childProps,
|
||||
childName,
|
||||
Some(decider),
|
||||
minBackoff,
|
||||
maxBackoff,
|
||||
randomFactor))
|
||||
}
|
||||
|
||||
/**
|
||||
* Props for creating a [[TransparentExponentialBackoffSupervisor]] using the
|
||||
* default [[akka.actor.Actor]] decider.
|
||||
*
|
||||
* @param childProps the [[akka.actor.Props]] of the child to be supervised.
|
||||
* @param childName the name of the child actor.
|
||||
* @param minBackoff the min time before the child is restarted.
|
||||
* @param maxBackoff the max time (upperbound) for a child restart.
|
||||
* @param randomFactor a random delay factor to add on top of the calculated exponential
|
||||
* back off.
|
||||
* The calculation is equivalent to:
|
||||
* {{{
|
||||
* final_delay = min(
|
||||
* maxBackoff,
|
||||
* (random_delay_factor * calculated_backoff) + calculated_backoff)
|
||||
* }}}
|
||||
*/
|
||||
def props(
|
||||
childProps: Props,
|
||||
childName: String,
|
||||
minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration,
|
||||
randomFactor: Double): Props = {
|
||||
Props(
|
||||
new TransparentExponentialBackoffSupervisor(
|
||||
childProps,
|
||||
childName,
|
||||
None,
|
||||
minBackoff,
|
||||
maxBackoff,
|
||||
randomFactor))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A supervising actor that restarts a child actor with an exponential back off.
|
||||
*
|
||||
* This explicit supervisor behaves similarly to the normal implicit supervision where
|
||||
* if an actor throws an exception, the decider on the supervisor will decide when to
|
||||
* `Stop`, `Restart`, `Escalate`, `Resume` the child actor.
|
||||
*
|
||||
* When the `Restart` directive is specified, the supervisor will delay the restart
|
||||
* using an exponential back off strategy (bounded by minBackoff and maxBackoff).
|
||||
*
|
||||
* This supervisor is intended to be transparent to both the child actor and external actors.
|
||||
* Where external actors can send messages to the supervisor as if it was the child and the
|
||||
* messages will be forwarded. And when the child is `Terminated`, the supervisor is also
|
||||
* `Terminated`.
|
||||
* Transparent to the child means that the child does not have to be aware that it is being
|
||||
* supervised specifically by the [[TransparentExponentialBackoffSupervisor]]. Just like it does
|
||||
* not need to know when it is being supervised by the usual implicit supervisors.
|
||||
* The only caveat is that the `ActorRef` of the child is not stable, so any user storing the
|
||||
* `sender()` `ActorRef` from the child response may eventually not be able to communicate with
|
||||
* the stored `ActorRef`. In general all messages to the child should be directed through the
|
||||
* [[TransparentExponentialBackoffSupervisor]].
|
||||
*
|
||||
* An example of where this supervisor might be used is when you may have an actor that is
|
||||
* responsible for continuously polling on a server for some resource that sometimes may be down.
|
||||
* Instead of hammering the server continuously when the resource is unavailable, the actor will
|
||||
* be restarted with an exponentially increasing back off until the resource is available again.
|
||||
*
|
||||
* '''***
|
||||
* This supervisor should not be used with `Akka Persistence` child actors.
|
||||
* `Akka Persistence` actors, currently, shutdown unconditionally on `persistFailure()`s rather
|
||||
* than throw an exception on a failure like normal actors.
|
||||
* [[akka.pattern.BackoffSupervisor]] should be used instead for cases where the child actor
|
||||
* terminates itself as a failure signal instead of the normal behavior of throwing an exception.
|
||||
* ***'''
|
||||
*/
|
||||
class TransparentExponentialBackoffSupervisor(
|
||||
props: Props,
|
||||
childName: String,
|
||||
decider: Option[Decider],
|
||||
minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration,
|
||||
randomFactor: Double)
|
||||
extends Actor
|
||||
with Stash
|
||||
with ActorLogging {
|
||||
|
||||
import TransparentExponentialBackoffSupervisor._
|
||||
import context._
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy() {
|
||||
case ex ⇒
|
||||
val defaultDirective: Directive =
|
||||
super.supervisorStrategy.decider.applyOrElse(ex, (_: Any) ⇒ Escalate)
|
||||
val maybeDirective: Option[Directive] = decider
|
||||
.map(_.applyOrElse(ex, (_: Any) ⇒ defaultDirective))
|
||||
|
||||
// Get the directive from the specified decider or fallback to
|
||||
// the default decider.
|
||||
// Whatever the final Directive is, we will translate all Restarts
|
||||
// to our own Restarts, which involves stopping the child.
|
||||
maybeDirective.getOrElse(defaultDirective) match {
|
||||
case Restart ⇒
|
||||
val childRef = sender
|
||||
become({
|
||||
case Terminated(`childRef`) ⇒
|
||||
unbecome()
|
||||
self ! ScheduleRestart(childRef)
|
||||
case _ ⇒
|
||||
stash()
|
||||
}, discardOld = false)
|
||||
Stop
|
||||
case other ⇒ other
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize by starting up and watching the child
|
||||
self ! StartChild
|
||||
|
||||
def receive = waitingToStart(-1, false)
|
||||
|
||||
def waitingToStart(numRestarts: Int, scheduleCounterReset: Boolean): Receive = {
|
||||
case StartChild ⇒
|
||||
val childRef = actorOf(props, childName)
|
||||
watch(childRef)
|
||||
unstashAll()
|
||||
if (scheduleCounterReset) {
|
||||
system.scheduler.scheduleOnce(minBackoff, self, ResetRestartCount(numRestarts + 1))
|
||||
}
|
||||
become(watching(childRef, numRestarts + 1))
|
||||
case _ ⇒ stash()
|
||||
}
|
||||
|
||||
// Steady state
|
||||
def watching(childRef: ActorRef, numRestarts: Int): Receive = {
|
||||
case ScheduleRestart(`childRef`) ⇒
|
||||
val delay = akka.pattern.BackoffSupervisor.calculateDelay(
|
||||
numRestarts, minBackoff, maxBackoff, randomFactor)
|
||||
system.scheduler.scheduleOnce(delay, self, StartChild)
|
||||
become(waitingToStart(numRestarts, true))
|
||||
log.info(s"Restarting child in: $delay; numRestarts: $numRestarts")
|
||||
case ResetRestartCount(last) ⇒
|
||||
if (last == numRestarts) {
|
||||
log.debug(s"Last restart count [$last] matches current count; resetting")
|
||||
become(watching(childRef, 0))
|
||||
} else {
|
||||
log.debug(s"Last restart count [$last] does not match the current count [$numRestarts]")
|
||||
}
|
||||
case Terminated(`childRef`) ⇒
|
||||
log.debug(s"Terminating, because child [$childRef] terminated itself")
|
||||
stop(self)
|
||||
case msg if sender() == childRef ⇒
|
||||
// use the supervisor as sender
|
||||
context.parent ! msg
|
||||
case msg ⇒
|
||||
childRef.forward(msg)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,119 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.contrib.pattern
|
||||
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit.TestProbe
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor._
|
||||
import scala.language.postfixOps
|
||||
|
||||
object TestActor {
|
||||
class StoppingException extends Exception("stopping exception")
|
||||
def props(probe: ActorRef): Props = Props(new TestActor(probe))
|
||||
}
|
||||
|
||||
class TestActor(probe: ActorRef) extends Actor {
|
||||
import context.dispatcher
|
||||
|
||||
probe ! "STARTED"
|
||||
|
||||
def receive = {
|
||||
case "DIE" ⇒ context.stop(self)
|
||||
case "THROW" ⇒ throw new Exception("normal exception")
|
||||
case "THROW_STOPPING_EXCEPTION" ⇒ throw new TestActor.StoppingException
|
||||
case ("TO_PARENT", msg) ⇒ context.parent ! msg
|
||||
case other ⇒ probe ! other
|
||||
}
|
||||
}
|
||||
|
||||
object TestParentActor {
|
||||
def props(probe: ActorRef, supervisorProps: Props): Props =
|
||||
Props(new TestParentActor(probe, supervisorProps))
|
||||
}
|
||||
class TestParentActor(probe: ActorRef, supervisorProps: Props) extends Actor {
|
||||
val supervisor = context.actorOf(supervisorProps)
|
||||
|
||||
def receive = {
|
||||
case other ⇒ probe.forward(other)
|
||||
}
|
||||
}
|
||||
|
||||
class TransparentExponentialBackoffSupervisorSpec extends AkkaSpec {
|
||||
|
||||
def supervisorProps(probeRef: ActorRef) = TransparentExponentialBackoffSupervisor.propsWithDecider(
|
||||
TestActor.props(probeRef),
|
||||
"someChildName",
|
||||
200 millis,
|
||||
10 seconds,
|
||||
0.0) {
|
||||
case _: TestActor.StoppingException ⇒ SupervisorStrategy.Stop
|
||||
}
|
||||
|
||||
trait Setup {
|
||||
val probe = TestProbe()
|
||||
val supervisor = system.actorOf(supervisorProps(probe.ref))
|
||||
probe.expectMsg("STARTED")
|
||||
}
|
||||
|
||||
trait Setup2 {
|
||||
val probe = TestProbe()
|
||||
val parent = system.actorOf(TestParentActor.props(probe.ref, supervisorProps(probe.ref)))
|
||||
probe.expectMsg("STARTED")
|
||||
val child = probe.lastSender
|
||||
}
|
||||
|
||||
"TransparentExponentialBackoffSupervisor" must {
|
||||
"forward messages to child" in new Setup {
|
||||
supervisor ! "some message"
|
||||
probe.expectMsg("some message")
|
||||
}
|
||||
|
||||
"terminate when child terminates" in new Setup {
|
||||
probe.watch(supervisor)
|
||||
supervisor ! "DIE"
|
||||
probe.expectTerminated(supervisor)
|
||||
}
|
||||
|
||||
"restart the child with an exponential back off" in new Setup {
|
||||
// Exponential back off restart test
|
||||
probe.within(1.4 seconds, 2 seconds) {
|
||||
supervisor ! "THROW"
|
||||
// numRestart = 0 ~ 200 millis
|
||||
probe.expectMsg(300 millis, "STARTED")
|
||||
|
||||
supervisor ! "THROW"
|
||||
// numRestart = 1 ~ 400 millis
|
||||
probe.expectMsg(500 millis, "STARTED")
|
||||
|
||||
supervisor ! "THROW"
|
||||
// numRestart = 2 ~ 800 millis
|
||||
probe.expectMsg(900 millis, "STARTED")
|
||||
}
|
||||
|
||||
// Verify that we only have one child at this point by selecting all the children
|
||||
// under the supervisor and broadcasting to them.
|
||||
// If there exists more than one child, we will get more than one reply.
|
||||
val supervisorChildSelection = system.actorSelection(supervisor.path / "*")
|
||||
supervisorChildSelection.tell("testmsg", probe.ref)
|
||||
probe.expectMsg("testmsg")
|
||||
probe.expectNoMsg
|
||||
}
|
||||
|
||||
"stop on exceptions as dictated by the decider" in new Setup {
|
||||
probe.watch(supervisor)
|
||||
// This should cause the supervisor to stop the child actor and then
|
||||
// subsequently stop itself.
|
||||
supervisor ! "THROW_STOPPING_EXCEPTION"
|
||||
probe.expectTerminated(supervisor)
|
||||
}
|
||||
|
||||
"forward messages from the child to the parent of the supervisor" in new Setup2 {
|
||||
child ! (("TO_PARENT", "TEST_MESSAGE"))
|
||||
probe.expectMsg("TEST_MESSAGE")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -203,25 +203,26 @@ terminates a child by way of the ``system.stop(child)`` method or sending a
|
|||
|
||||
Delayed restarts with the BackoffSupervisor pattern
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
Provided as a build-in pattern the ``akka.pattern.BackoffSupervisor`` actor implements the so-called
|
||||
*exponential backoff supervision strategy*, which can be used to death-watch an actor,
|
||||
and when it terminates try to start it again, each time with a growing time delay between those restarts.
|
||||
Provided as a built-in pattern the ``akka.pattern.BackoffSupervisor`` implements the so-called
|
||||
*exponential backoff supervision strategy*, starting a child actor again when it fails, each time with a growing time delay between restarts.
|
||||
|
||||
This pattern is useful when the started actor fails because some external resource is not available,
|
||||
This pattern is useful when the started actor fails [#]_ because some external resource is not available,
|
||||
and we need to give it some time to start-up again. One of the prime examples when this is useful is
|
||||
when a :ref:`PersistentActor <persistence-scala>` fails with an persistence failure - which indicates that
|
||||
when a :ref:`PersistentActor <persistence-scala>` fails (by stopping) with a persistence failure - which indicates that
|
||||
the database may be down or overloaded, in such situations it makes most sense to give it a little bit of time
|
||||
to recover before the peristent actor is restarted.
|
||||
to recover before the peristent actor is started.
|
||||
|
||||
The following Scala snippet shows how to create a backoff supervisor which will start the given echo actor
|
||||
in increasing intervals of 3, 6, 12, 24 and finally 30 seconds:
|
||||
.. [#] A failure can be indicated in two different ways; by an actor stopping or crashing.
|
||||
|
||||
.. includecode:: ../scala/code/docs/pattern/BackoffSupervisorDocSpec.scala#backoff
|
||||
The following Scala snippet shows how to create a backoff supervisor which will start the given echo actor after it has stopped
|
||||
because of a failure, in increasing intervals of 3, 6, 12, 24 and finally 30 seconds:
|
||||
|
||||
.. includecode:: ../scala/code/docs/pattern/BackoffSupervisorDocSpec.scala#backoff-stop
|
||||
|
||||
The above is equivalent to this Java code:
|
||||
|
||||
.. includecode:: ../java/code/docs/pattern/BackoffSupervisorDocTest.java#backoff-imports
|
||||
.. includecode:: ../java/code/docs/pattern/BackoffSupervisorDocTest.java#backoff
|
||||
.. includecode:: ../java/code/docs/pattern/BackoffSupervisorDocTest.java#backoff-stop
|
||||
|
||||
Using a ``randomFactor`` to add a little bit of additional variance to the backoff intervals
|
||||
is highly recommended, in order to avoid multiple actors re-start at the exact same point in time,
|
||||
|
|
@ -230,6 +231,32 @@ and re-starting after the same configured interval. By adding additional randomn
|
|||
re-start intervals the actors will start in slightly different points in time, thus avoiding
|
||||
large spikes of traffic hitting the recovering shared database or other resource that they all need to contact.
|
||||
|
||||
The ``akka.pattern.BackoffSupervisor`` actor can also be configured to restart the actor after a delay when the actor
|
||||
crashes and the supervision strategy decides that it should restart.
|
||||
|
||||
The following Scala snippet shows how to create a backoff supervisor which will start the given echo actor after it has crashed
|
||||
because of some exception, in increasing intervals of 3, 6, 12, 24 and finally 30 seconds:
|
||||
|
||||
.. includecode:: ../scala/code/docs/pattern/BackoffSupervisorDocSpec.scala#backoff-fail
|
||||
|
||||
The above is equivalent to this Java code:
|
||||
|
||||
.. includecode:: ../java/code/docs/pattern/BackoffSupervisorDocTest.java#backoff-imports
|
||||
.. includecode:: ../java/code/docs/pattern/BackoffSupervisorDocTest.java#backoff-fail
|
||||
|
||||
The ``akka.pattern.BackoffOptions`` can be used to customize the behavior of the back-off supervisor actor, below are some examples:
|
||||
|
||||
.. includecode:: ../scala/code/docs/pattern/BackoffSupervisorDocSpec.scala#backoff-custom-stop
|
||||
|
||||
The above code sets up a back-off supervisor that requires the child actor to send a ``akka.pattern.BackoffSupervisor.Reset`` message
|
||||
to its parent when a message is successfully processed, resetting the back-off. It also uses a default stopping strategy, any exception
|
||||
will cause the child to stop.
|
||||
|
||||
.. includecode:: ../scala/code/docs/pattern/BackoffSupervisorDocSpec.scala#backoff-custom-fail
|
||||
|
||||
The above code sets up a back-off supervisor that restarts the child after back-off if MyException is thrown, any other exception will be
|
||||
escalated. The back-off is automatically reset if the child does not throw any errors within 10 seconds.
|
||||
|
||||
One-For-One Strategy vs. All-For-One Strategy
|
||||
---------------------------------------------
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
package docs.pattern;
|
||||
|
||||
import akka.actor.*;
|
||||
import akka.pattern.Backoff;
|
||||
import akka.pattern.BackoffSupervisor;
|
||||
import akka.testkit.TestActors.EchoActor;
|
||||
//#backoff-imports
|
||||
|
|
@ -14,19 +15,35 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
public class BackoffSupervisorDocTest {
|
||||
|
||||
void example (ActorSystem system) {
|
||||
//#backoff
|
||||
void exampleStop (ActorSystem system) {
|
||||
//#backoff-stop
|
||||
final Props childProps = Props.create(EchoActor.class);
|
||||
|
||||
final Props supervisorProps = BackoffSupervisor.props(
|
||||
childProps,
|
||||
"myEcho",
|
||||
Duration.create(3, TimeUnit.SECONDS),
|
||||
Duration.create(30, TimeUnit.SECONDS),
|
||||
0.2); // adds 20% "noise" to vary the intervals slightly
|
||||
Backoff.onStop(
|
||||
childProps,
|
||||
"myEcho",
|
||||
Duration.create(3, TimeUnit.SECONDS),
|
||||
Duration.create(30, TimeUnit.SECONDS),
|
||||
0.2)); // adds 20% "noise" to vary the intervals slightly
|
||||
|
||||
system.actorOf(supervisorProps, "echoSupervisor");
|
||||
//#backoff
|
||||
//#backoff-stop
|
||||
}
|
||||
|
||||
void exampleFailure (ActorSystem system) {
|
||||
//#backoff-fail
|
||||
final Props childProps = Props.create(EchoActor.class);
|
||||
|
||||
final Props supervisorProps = BackoffSupervisor.props(
|
||||
Backoff.onFailure(
|
||||
childProps,
|
||||
"myEcho",
|
||||
Duration.create(3, TimeUnit.SECONDS),
|
||||
Duration.create(30, TimeUnit.SECONDS),
|
||||
0.2)); // adds 20% "noise" to vary the intervals slightly
|
||||
|
||||
system.actorOf(supervisorProps, "echoSupervisor");
|
||||
//#backoff-fail
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,28 +4,99 @@
|
|||
|
||||
package docs.pattern
|
||||
|
||||
import akka.actor.{ ActorSystem, Props }
|
||||
import akka.pattern.BackoffSupervisor
|
||||
import akka.actor.{ ActorSystem, Props, OneForOneStrategy, SupervisorStrategy }
|
||||
import akka.pattern.{ Backoff, BackoffSupervisor }
|
||||
import akka.testkit.TestActors.EchoActor
|
||||
|
||||
class BackoffSupervisorDocSpec {
|
||||
|
||||
class BackoffSupervisorDocSpecExample {
|
||||
class BackoffSupervisorDocSpecExampleStop {
|
||||
val system: ActorSystem = ???
|
||||
import scala.concurrent.duration._
|
||||
|
||||
//#backoff
|
||||
//#backoff-stop
|
||||
val childProps = Props(classOf[EchoActor])
|
||||
|
||||
val supervisor = BackoffSupervisor.props(
|
||||
childProps,
|
||||
childName = "myEcho",
|
||||
minBackoff = 3.seconds,
|
||||
maxBackoff = 30.seconds,
|
||||
randomFactor = 0.2) // adds 20% "noise" to vary the intervals slightly
|
||||
Backoff.onStop(
|
||||
childProps,
|
||||
childName = "myEcho",
|
||||
minBackoff = 3.seconds,
|
||||
maxBackoff = 30.seconds,
|
||||
randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
|
||||
))
|
||||
|
||||
system.actorOf(supervisor, name = "echoSupervisor")
|
||||
//#backoff
|
||||
//#backoff-stop
|
||||
}
|
||||
|
||||
class BackoffSupervisorDocSpecExampleFail {
|
||||
val system: ActorSystem = ???
|
||||
import scala.concurrent.duration._
|
||||
|
||||
//#backoff-fail
|
||||
val childProps = Props(classOf[EchoActor])
|
||||
|
||||
val supervisor = BackoffSupervisor.props(
|
||||
Backoff.onFailure(
|
||||
childProps,
|
||||
childName = "myEcho",
|
||||
minBackoff = 3.seconds,
|
||||
maxBackoff = 30.seconds,
|
||||
randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
|
||||
))
|
||||
|
||||
system.actorOf(supervisor, name = "echoSupervisor")
|
||||
//#backoff-fail
|
||||
}
|
||||
|
||||
class BackoffSupervisorDocSpecExampleStopOptions {
|
||||
val system: ActorSystem = ???
|
||||
import scala.concurrent.duration._
|
||||
|
||||
val childProps = Props(classOf[EchoActor])
|
||||
|
||||
//#backoff-custom-stop
|
||||
val supervisor = BackoffSupervisor.props(
|
||||
Backoff.onStop(
|
||||
childProps,
|
||||
childName = "myEcho",
|
||||
minBackoff = 3.seconds,
|
||||
maxBackoff = 30.seconds,
|
||||
randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
|
||||
).withManualReset // the child must send BackoffSupervisor.Reset to its parent
|
||||
.withDefaultStoppingStrategy // Stop at any Exception thrown
|
||||
)
|
||||
//#backoff-custom-stop
|
||||
|
||||
system.actorOf(supervisor, name = "echoSupervisor")
|
||||
}
|
||||
|
||||
class BackoffSupervisorDocSpecExampleFailureOptions {
|
||||
val system: ActorSystem = ???
|
||||
import scala.concurrent.duration._
|
||||
|
||||
val childProps = Props(classOf[EchoActor])
|
||||
|
||||
//#backoff-custom-fail
|
||||
val supervisor = BackoffSupervisor.props(
|
||||
Backoff.onFailure(
|
||||
childProps,
|
||||
childName = "myEcho",
|
||||
minBackoff = 3.seconds,
|
||||
maxBackoff = 30.seconds,
|
||||
randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
|
||||
).withAutoReset(10.seconds) // the child must send BackoffSupervisor.Reset to its parent
|
||||
.withSupervisorStrategy(
|
||||
OneForOneStrategy() {
|
||||
case _: MyException ⇒ SupervisorStrategy.Restart
|
||||
case _ ⇒ SupervisorStrategy.Escalate
|
||||
}))
|
||||
//#backoff-custom-fail
|
||||
|
||||
system.actorOf(supervisor, name = "echoSupervisor")
|
||||
}
|
||||
|
||||
case class MyException(msg: String) extends Exception(msg)
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
package docs.persistence
|
||||
|
||||
import akka.actor._
|
||||
import akka.pattern.BackoffSupervisor
|
||||
import akka.pattern.{ Backoff, BackoffSupervisor }
|
||||
import akka.persistence._
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.scaladsl.{ Source, Sink, Flow }
|
||||
|
|
@ -85,17 +85,18 @@ object PersistenceDocSpec {
|
|||
}
|
||||
}
|
||||
|
||||
object Backoff {
|
||||
object BackoffOnStop {
|
||||
abstract class MyActor extends Actor {
|
||||
import PersistAsync.MyPersistentActor
|
||||
//#backoff
|
||||
val childProps = Props[MyPersistentActor]
|
||||
val props = BackoffSupervisor.props(
|
||||
childProps,
|
||||
childName = "myActor",
|
||||
minBackoff = 3.seconds,
|
||||
maxBackoff = 30.seconds,
|
||||
randomFactor = 0.2)
|
||||
Backoff.onStop(
|
||||
childProps,
|
||||
childName = "myActor",
|
||||
minBackoff = 3.seconds,
|
||||
maxBackoff = 30.seconds,
|
||||
randomFactor = 0.2))
|
||||
context.actorOf(props, name = "mySupervisor")
|
||||
//#backoff
|
||||
}
|
||||
|
|
|
|||
|
|
@ -597,7 +597,13 @@ object MiMa extends AutoPlugin {
|
|||
ProblemFilters.exclude[MissingTypesProblem]("akka.remote.InvalidAssociation$"),
|
||||
ProblemFilters.exclude[MissingMethodProblem]("akka.remote.InvalidAssociation.apply"),
|
||||
ProblemFilters.exclude[MissingMethodProblem]("akka.remote.InvalidAssociation.copy"),
|
||||
ProblemFilters.exclude[MissingMethodProblem]("akka.remote.InvalidAssociation.this")
|
||||
ProblemFilters.exclude[MissingMethodProblem]("akka.remote.InvalidAssociation.this"),
|
||||
|
||||
// #19281 BackoffSupervisor updates
|
||||
ProblemFilters.exclude[MissingMethodProblem]("akka.pattern.BackoffSupervisor.akka$pattern$BackoffSupervisor$$child_="),
|
||||
ProblemFilters.exclude[MissingMethodProblem]("akka.pattern.BackoffSupervisor.akka$pattern$BackoffSupervisor$$restartCount"),
|
||||
ProblemFilters.exclude[MissingMethodProblem]("akka.pattern.BackoffSupervisor.akka$pattern$BackoffSupervisor$$restartCount_="),
|
||||
ProblemFilters.exclude[MissingMethodProblem]("akka.pattern.BackoffSupervisor.akka$pattern$BackoffSupervisor$$child")
|
||||
)
|
||||
)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue