Merge pull request #17925 from ktoso/wip-move-BackoffSupervisor-ktoso
!per +act #17842 move BackoffSupervisor to akka.pattern
This commit is contained in:
commit
bbd5b2c739
13 changed files with 123 additions and 15 deletions
|
|
@ -1,12 +1,11 @@
|
||||||
/**
|
/**
|
||||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||||
*/
|
*/
|
||||||
package akka.persistence
|
package akka.pattern
|
||||||
|
|
||||||
import scala.concurrent.duration.FiniteDuration
|
import scala.concurrent.duration.FiniteDuration
|
||||||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||||
import akka.actor.Actor
|
import akka.actor.Actor
|
||||||
import akka.actor.ActorLogging
|
|
||||||
import akka.actor.ActorRef
|
import akka.actor.ActorRef
|
||||||
import akka.actor.DeadLetterSuppression
|
import akka.actor.DeadLetterSuppression
|
||||||
import akka.actor.Props
|
import akka.actor.Props
|
||||||
|
|
@ -17,6 +16,8 @@ import scala.concurrent.duration.Duration
|
||||||
object BackoffSupervisor {
|
object BackoffSupervisor {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* Props for creating an [[BackoffSupervisor]] actor.
|
||||||
|
*
|
||||||
* @param childProps the [[akka.actor.Props]] of the child actor that
|
* @param childProps the [[akka.actor.Props]] of the child actor that
|
||||||
* will be started and supervised
|
* will be started and supervised
|
||||||
* @param childName name of the child actor
|
* @param childName name of the child actor
|
||||||
|
|
@ -24,8 +25,8 @@ object BackoffSupervisor {
|
||||||
* started again, if it is terminated
|
* started again, if it is terminated
|
||||||
* @param maxBackoff the exponential back-off is capped to this duration
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
* @param randomFactor after calculation of the exponential back-off an additional
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
* random delay based on this factor is added, e.g. 0.2 adds up to 20%
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
* delay
|
* In order to skip this additional delay pass in `0`.
|
||||||
*/
|
*/
|
||||||
def props(
|
def props(
|
||||||
childProps: Props,
|
childProps: Props,
|
||||||
|
|
@ -64,8 +65,14 @@ object BackoffSupervisor {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This actor can be used to supervise a child actor and start it again
|
* This actor can be used to supervise a child actor and start it again
|
||||||
* after a back-off duration if the child actor is stopped. This is useful
|
* after a back-off duration if the child actor is stopped.
|
||||||
* for persistent actors, which are stopped in case of persistence failures.
|
*
|
||||||
|
* This is useful in situations where the re-start of the child actor should be
|
||||||
|
* delayed e.g. in order to give an external resource time to recover before the
|
||||||
|
* child actor tries contacting it again (after being restarted).
|
||||||
|
*
|
||||||
|
* Specifically this pattern is useful for for persistent actors,
|
||||||
|
* which are stopped in case of persistence failures.
|
||||||
* Just restarting them immediately would probably fail again (since the data
|
* Just restarting them immediately would probably fail again (since the data
|
||||||
* store is probably unavailable). It is better to try again after a delay.
|
* store is probably unavailable). It is better to try again after a delay.
|
||||||
*
|
*
|
||||||
|
|
@ -81,13 +88,13 @@ object BackoffSupervisor {
|
||||||
* actors hit the backend resource at the same time.
|
* actors hit the backend resource at the same time.
|
||||||
*
|
*
|
||||||
* You can retrieve the current child `ActorRef` by sending `BackoffSupervisor.GetCurrentChild`
|
* You can retrieve the current child `ActorRef` by sending `BackoffSupervisor.GetCurrentChild`
|
||||||
* message to this actor and it will reply with [[BackoffSupervisor.CurrentChild]] containing the
|
* message to this actor and it will reply with [[akka.pattern.BackoffSupervisor.CurrentChild]]
|
||||||
* `ActorRef` of the current child, if any.
|
* containing the `ActorRef` of the current child, if any.
|
||||||
*
|
*
|
||||||
* The `BackoffSupervisor` forwards all other messages to the child, if it is currently running.
|
* The `BackoffSupervisor` forwards all other messages to the child, if it is currently running.
|
||||||
*
|
*
|
||||||
* The child can stop itself and send a [[akka.actor.PoisonPill]] to the parent supervisor
|
* The child can stop itself and send a [[akka.actor.PoisonPill]] to the parent supervisor
|
||||||
* if it want to do an intentional stop.
|
* if it wants to do an intentional stop.
|
||||||
*/
|
*/
|
||||||
final class BackoffSupervisor(
|
final class BackoffSupervisor(
|
||||||
childProps: Props,
|
childProps: Props,
|
||||||
|
|
@ -119,7 +126,7 @@ final class BackoffSupervisor(
|
||||||
if (restartCount >= 30) // Duration overflow protection (> 100 years)
|
if (restartCount >= 30) // Duration overflow protection (> 100 years)
|
||||||
maxBackoff
|
maxBackoff
|
||||||
else
|
else
|
||||||
(maxBackoff.min(minBackoff * math.pow(2, restartCount)) * rnd) match {
|
maxBackoff.min(minBackoff * math.pow(2, restartCount)) * rnd match {
|
||||||
case f: FiniteDuration ⇒ f
|
case f: FiniteDuration ⇒ f
|
||||||
case _ ⇒ maxBackoff
|
case _ ⇒ maxBackoff
|
||||||
}
|
}
|
||||||
|
|
@ -20,7 +20,7 @@ import akka.actor.PoisonPill
|
||||||
import akka.actor.Props
|
import akka.actor.Props
|
||||||
import akka.cluster.Cluster
|
import akka.cluster.Cluster
|
||||||
import akka.cluster.singleton.ClusterSingletonManager
|
import akka.cluster.singleton.ClusterSingletonManager
|
||||||
import akka.persistence.BackoffSupervisor
|
import akka.pattern.BackoffSupervisor
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
import akka.pattern.ask
|
import akka.pattern.ask
|
||||||
import akka.dispatch.Dispatchers
|
import akka.dispatch.Dispatchers
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ import language.postfixOps
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
|
import akka.pattern.BackoffSupervisor
|
||||||
import akka.cluster.Cluster
|
import akka.cluster.Cluster
|
||||||
import akka.persistence.PersistentActor
|
import akka.persistence.PersistentActor
|
||||||
import akka.persistence.Persistence
|
import akka.persistence.Persistence
|
||||||
|
|
@ -26,7 +27,7 @@ import java.io.File
|
||||||
import org.apache.commons.io.FileUtils
|
import org.apache.commons.io.FileUtils
|
||||||
import akka.cluster.singleton.ClusterSingletonManager
|
import akka.cluster.singleton.ClusterSingletonManager
|
||||||
import akka.cluster.singleton.ClusterSingletonManagerSettings
|
import akka.cluster.singleton.ClusterSingletonManagerSettings
|
||||||
import akka.persistence.BackoffSupervisor
|
import akka.pattern.BackoffSupervisor
|
||||||
|
|
||||||
object ClusterShardingSpec extends MultiNodeConfig {
|
object ClusterShardingSpec extends MultiNodeConfig {
|
||||||
val controller = role("controller")
|
val controller = role("controller")
|
||||||
|
|
|
||||||
|
|
@ -199,6 +199,37 @@ external resource, which may also be one of its own children. If a third party
|
||||||
terminates a child by way of the ``system.stop(child)`` method or sending a
|
terminates a child by way of the ``system.stop(child)`` method or sending a
|
||||||
:class:`PoisonPill`, the supervisor might well be affected.
|
:class:`PoisonPill`, the supervisor might well be affected.
|
||||||
|
|
||||||
|
.. _backoff-supervisor:
|
||||||
|
|
||||||
|
Delayed restarts with the BackoffSupervisor pattern
|
||||||
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
Provided as a build-in pattern the ``akka.pattern.BackoffSupervisor`` actor implements the so-called
|
||||||
|
*exponential backoff supervision strategy*, which can be used to death-watch an actor,
|
||||||
|
and when it terminates try to start it again, each time with a growing time delay between those restarts.
|
||||||
|
|
||||||
|
This pattern is useful when the started actor fails because some external resource is not available,
|
||||||
|
and we need to give it some time to start-up again. One of the prime examples when this is useful is
|
||||||
|
when a :ref:`PersistentActor <persistence-scala>` fails with an persistence failure - which indicates that
|
||||||
|
the database may be down or overloaded, in such situations it makes most sense to give it a little bit of time
|
||||||
|
to recover before the peristent actor is restarted.
|
||||||
|
|
||||||
|
The following Scala snippet shows how to create a backoff supervisor which will start the given echo actor
|
||||||
|
in increasing intervals of 3, 6, 12, 24 and finally 30 seconds:
|
||||||
|
|
||||||
|
.. includecode:: ../scala/code/docs/pattern/BackoffSupervisorDocSpec.scala#backoff
|
||||||
|
|
||||||
|
The above is equivalent to this Java code:
|
||||||
|
|
||||||
|
.. includecode:: ../java/code/docs/pattern/BackoffSupervisorDocTest.java#backoff-imports
|
||||||
|
.. includecode:: ../java/code/docs/pattern/BackoffSupervisorDocTest.java#backoff
|
||||||
|
|
||||||
|
Using a ``randomFactor`` to add a little bit of additional variance to the backoff intervals
|
||||||
|
is highly recommended, in order to avoid multiple actors re-start at the exact same point in time,
|
||||||
|
for example because they were stopped due to a shared resource such as a database going down
|
||||||
|
and re-starting after the same configured interval. By adding additional randomness to the
|
||||||
|
re-start intervals the actors will start in slightly different points in time, thus avoiding
|
||||||
|
large spikes of traffic hitting the recovering shared database or other resource that they all need to contact.
|
||||||
|
|
||||||
One-For-One Strategy vs. All-For-One Strategy
|
One-For-One Strategy vs. All-For-One Strategy
|
||||||
---------------------------------------------
|
---------------------------------------------
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,32 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package docs.pattern;
|
||||||
|
|
||||||
|
import akka.actor.*;
|
||||||
|
import akka.pattern.BackoffSupervisor;
|
||||||
|
import akka.testkit.TestActors.EchoActor;
|
||||||
|
//#backoff-imports
|
||||||
|
import scala.concurrent.duration.Duration;
|
||||||
|
//#backoff-imports
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class BackoffSupervisorDocTest {
|
||||||
|
|
||||||
|
void example (ActorSystem system) {
|
||||||
|
//#backoff
|
||||||
|
final Props childProps = Props.create(EchoActor.class);
|
||||||
|
|
||||||
|
final Props supervisorProps = BackoffSupervisor.props(
|
||||||
|
childProps,
|
||||||
|
"myEcho",
|
||||||
|
Duration.create(3, TimeUnit.SECONDS),
|
||||||
|
Duration.create(30, TimeUnit.SECONDS),
|
||||||
|
0.2); // adds 20% "noise" to vary the intervals slightly
|
||||||
|
|
||||||
|
system.actorOf(supervisorProps, "echoSupervisor");
|
||||||
|
//#backoff
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -10,6 +10,7 @@ import akka.actor.ActorSystem;
|
||||||
import akka.actor.Props;
|
import akka.actor.Props;
|
||||||
import akka.japi.Procedure;
|
import akka.japi.Procedure;
|
||||||
import akka.japi.pf.ReceiveBuilder;
|
import akka.japi.pf.ReceiveBuilder;
|
||||||
|
import akka.pattern.BackoffSupervisor;
|
||||||
import akka.persistence.*;
|
import akka.persistence.*;
|
||||||
import scala.Option;
|
import scala.Option;
|
||||||
import scala.concurrent.duration.Duration;
|
import scala.concurrent.duration.Duration;
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,8 @@
|
||||||
package docs.persistence;
|
package docs.persistence;
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import akka.pattern.BackoffSupervisor;
|
||||||
import scala.concurrent.duration.Duration;
|
import scala.concurrent.duration.Duration;
|
||||||
import akka.actor.ActorPath;
|
import akka.actor.ActorPath;
|
||||||
import akka.actor.ActorRef;
|
import akka.actor.ActorRef;
|
||||||
|
|
|
||||||
|
|
@ -304,7 +304,7 @@ and the actor will unconditionally be stopped.
|
||||||
The reason that it cannot resume when persist fails is that it is unknown if the even was actually
|
The reason that it cannot resume when persist fails is that it is unknown if the even was actually
|
||||||
persisted or not, and therefore it is in an inconsistent state. Restarting on persistent failures
|
persisted or not, and therefore it is in an inconsistent state. Restarting on persistent failures
|
||||||
will most likely fail anyway, since the journal is probably unavailable. It is better to stop the
|
will most likely fail anyway, since the journal is probably unavailable. It is better to stop the
|
||||||
actor and after a back-off timeout start it again. The ``akka.persistence.BackoffSupervisor`` actor
|
actor and after a back-off timeout start it again. The ``akka.pattern.BackoffSupervisor`` actor
|
||||||
is provided to support such restarts.
|
is provided to support such restarts.
|
||||||
|
|
||||||
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#backoff
|
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#backoff
|
||||||
|
|
|
||||||
|
|
@ -307,7 +307,7 @@ and the actor will unconditionally be stopped.
|
||||||
The reason that it cannot resume when persist fails is that it is unknown if the even was actually
|
The reason that it cannot resume when persist fails is that it is unknown if the even was actually
|
||||||
persisted or not, and therefore it is in an inconsistent state. Restarting on persistent failures
|
persisted or not, and therefore it is in an inconsistent state. Restarting on persistent failures
|
||||||
will most likely fail anyway, since the journal is probably unavailable. It is better to stop the
|
will most likely fail anyway, since the journal is probably unavailable. It is better to stop the
|
||||||
actor and after a back-off timeout start it again. The ``akka.persistence.BackoffSupervisor`` actor
|
actor and after a back-off timeout start it again. The ``akka.pattern.BackoffSupervisor`` actor
|
||||||
is provided to support such restarts.
|
is provided to support such restarts.
|
||||||
|
|
||||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#backoff
|
.. includecode:: code/docs/persistence/PersistenceDocTest.java#backoff
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,31 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.pattern
|
||||||
|
|
||||||
|
import akka.actor.{ActorSystem, Props}
|
||||||
|
import akka.pattern.BackoffSupervisor
|
||||||
|
import akka.testkit.TestActors.EchoActor
|
||||||
|
|
||||||
|
class BackoffSupervisorDocSpec {
|
||||||
|
|
||||||
|
class BackoffSupervisorDocSpecExample {
|
||||||
|
val system: ActorSystem = ???
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
//#backoff
|
||||||
|
val childProps = Props(classOf[EchoActor])
|
||||||
|
|
||||||
|
val supervisor = BackoffSupervisor.props(
|
||||||
|
childProps,
|
||||||
|
childName = "myEcho",
|
||||||
|
minBackoff = 3.seconds,
|
||||||
|
maxBackoff = 30.seconds,
|
||||||
|
randomFactor = 0.2) // adds 20% "noise" to vary the intervals slightly
|
||||||
|
|
||||||
|
system.actorOf(supervisor, name = "echoSupervisor")
|
||||||
|
//#backoff
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -5,6 +5,7 @@
|
||||||
package docs.persistence
|
package docs.persistence
|
||||||
|
|
||||||
import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
|
import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
|
||||||
|
import akka.pattern.BackoffSupervisor
|
||||||
import akka.persistence._
|
import akka.persistence._
|
||||||
|
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
|
||||||
|
|
@ -293,7 +293,7 @@ and the actor will unconditionally be stopped.
|
||||||
The reason that it cannot resume when persist fails is that it is unknown if the even was actually
|
The reason that it cannot resume when persist fails is that it is unknown if the even was actually
|
||||||
persisted or not, and therefore it is in an inconsistent state. Restarting on persistent failures
|
persisted or not, and therefore it is in an inconsistent state. Restarting on persistent failures
|
||||||
will most likely fail anyway, since the journal is probably unavailable. It is better to stop the
|
will most likely fail anyway, since the journal is probably unavailable. It is better to stop the
|
||||||
actor and after a back-off timeout start it again. The ``akka.persistence.BackoffSupervisor`` actor
|
actor and after a back-off timeout start it again. The ``akka.pattern.BackoffSupervisor`` actor
|
||||||
is provided to support such restarts.
|
is provided to support such restarts.
|
||||||
|
|
||||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#backoff
|
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#backoff
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,8 @@
|
||||||
|
|
||||||
package akka.persistence
|
package akka.persistence
|
||||||
|
|
||||||
|
import akka.pattern.BackoffSupervisor
|
||||||
|
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.testkit._
|
import akka.testkit._
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue