Merge pull request #18944 from akka/wip-18487-BackofSupervisor-decider-patriknw
+act #18487 Support custom supervision strategy in BackoffSupervisor
This commit is contained in:
commit
054e292563
2 changed files with 84 additions and 3 deletions
|
|
@ -7,8 +7,13 @@ package akka.pattern
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.testkit._
|
import akka.testkit._
|
||||||
|
import scala.util.control.NoStackTrace
|
||||||
|
import akka.actor.SupervisorStrategy
|
||||||
|
|
||||||
object BackoffSupervisorSpec {
|
object BackoffSupervisorSpec {
|
||||||
|
|
||||||
|
class TestException extends RuntimeException with NoStackTrace
|
||||||
|
|
||||||
object Child {
|
object Child {
|
||||||
def props(probe: ActorRef): Props =
|
def props(probe: ActorRef): Props =
|
||||||
Props(new Child(probe))
|
Props(new Child(probe))
|
||||||
|
|
@ -16,7 +21,8 @@ object BackoffSupervisorSpec {
|
||||||
|
|
||||||
class Child(probe: ActorRef) extends Actor {
|
class Child(probe: ActorRef) extends Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case msg ⇒ probe ! msg
|
case "boom" ⇒ throw new TestException
|
||||||
|
case msg ⇒ probe ! msg
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -46,5 +52,23 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender {
|
||||||
supervisor ! "hello"
|
supervisor ! "hello"
|
||||||
expectMsg("hello")
|
expectMsg("hello")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"support custom supervision decider" in {
|
||||||
|
val supervisor = system.actorOf(
|
||||||
|
BackoffSupervisor.propsWithSupervisorStrategy(Child.props(testActor), "c1", 100.millis, 3.seconds, 0.2,
|
||||||
|
OneForOneStrategy() {
|
||||||
|
case _: TestException ⇒ SupervisorStrategy.Stop
|
||||||
|
}))
|
||||||
|
supervisor ! BackoffSupervisor.GetCurrentChild
|
||||||
|
val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get
|
||||||
|
watch(c1)
|
||||||
|
c1 ! "boom"
|
||||||
|
expectTerminated(c1)
|
||||||
|
awaitAssert {
|
||||||
|
supervisor ! BackoffSupervisor.GetCurrentChild
|
||||||
|
// new instance
|
||||||
|
expectMsgType[BackoffSupervisor.CurrentChild].ref.get should !==(c1)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,12 +12,19 @@ import akka.actor.Props
|
||||||
import akka.actor.Terminated
|
import akka.actor.Terminated
|
||||||
import java.util.Optional
|
import java.util.Optional
|
||||||
import scala.concurrent.duration.Duration
|
import scala.concurrent.duration.Duration
|
||||||
|
import akka.actor.SupervisorStrategy.Decider
|
||||||
|
import akka.actor.OneForOneStrategy
|
||||||
|
import akka.actor.SupervisorStrategy
|
||||||
|
|
||||||
object BackoffSupervisor {
|
object BackoffSupervisor {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Props for creating an [[BackoffSupervisor]] actor.
|
* Props for creating an [[BackoffSupervisor]] actor.
|
||||||
*
|
*
|
||||||
|
* Exceptions in the child are handled with the default supervision strategy, i.e.
|
||||||
|
* most exceptions will immediately restart the child. You can define another
|
||||||
|
* supervision strategy by using [[#propsWithSupervisorStrategy]].
|
||||||
|
*
|
||||||
* @param childProps the [[akka.actor.Props]] of the child actor that
|
* @param childProps the [[akka.actor.Props]] of the child actor that
|
||||||
* will be started and supervised
|
* will be started and supervised
|
||||||
* @param childName name of the child actor
|
* @param childName name of the child actor
|
||||||
|
|
@ -34,10 +41,40 @@ object BackoffSupervisor {
|
||||||
minBackoff: FiniteDuration,
|
minBackoff: FiniteDuration,
|
||||||
maxBackoff: FiniteDuration,
|
maxBackoff: FiniteDuration,
|
||||||
randomFactor: Double): Props = {
|
randomFactor: Double): Props = {
|
||||||
|
propsWithSupervisorStrategy(childProps, childName, minBackoff, maxBackoff, randomFactor, SupervisorStrategy.defaultStrategy)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Props for creating an [[BackoffSupervisor]] actor with a custom
|
||||||
|
* supervision strategy.
|
||||||
|
*
|
||||||
|
* Exceptions in the child are handled with the given `supervisionStrategy`. A
|
||||||
|
* `Restart` will perform a normal immediate restart of the child. A `Stop` will
|
||||||
|
* stop the child, but it will be started again after the back-off duration.
|
||||||
|
*
|
||||||
|
* @param childProps the [[akka.actor.Props]] of the child actor that
|
||||||
|
* will be started and supervised
|
||||||
|
* @param childName name of the child actor
|
||||||
|
* @param minBackoff minimum (initial) duration until the child actor will
|
||||||
|
* started again, if it is terminated
|
||||||
|
* @param maxBackoff the exponential back-off is capped to this duration
|
||||||
|
* @param randomFactor after calculation of the exponential back-off an additional
|
||||||
|
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||||
|
* In order to skip this additional delay pass in `0`.
|
||||||
|
* @param strategy the supervision strategy to use for handling exceptions
|
||||||
|
* in the child
|
||||||
|
*/
|
||||||
|
def propsWithSupervisorStrategy(
|
||||||
|
childProps: Props,
|
||||||
|
childName: String,
|
||||||
|
minBackoff: FiniteDuration,
|
||||||
|
maxBackoff: FiniteDuration,
|
||||||
|
randomFactor: Double,
|
||||||
|
strategy: SupervisorStrategy): Props = {
|
||||||
require(minBackoff > Duration.Zero, "minBackoff must be > 0")
|
require(minBackoff > Duration.Zero, "minBackoff must be > 0")
|
||||||
require(maxBackoff >= minBackoff, "maxBackoff must be >= minBackoff")
|
require(maxBackoff >= minBackoff, "maxBackoff must be >= minBackoff")
|
||||||
require(0.0 <= randomFactor && randomFactor <= 1.0, "randomFactor must be between 0.0 and 1.0")
|
require(0.0 <= randomFactor && randomFactor <= 1.0, "randomFactor must be between 0.0 and 1.0")
|
||||||
Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, randomFactor))
|
Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, randomFactor, strategy))
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -111,17 +148,25 @@ object BackoffSupervisor {
|
||||||
* message to this actor and it will reply with [[akka.pattern.BackoffSupervisor.CurrentChild]]
|
* message to this actor and it will reply with [[akka.pattern.BackoffSupervisor.CurrentChild]]
|
||||||
* containing the `ActorRef` of the current child, if any.
|
* containing the `ActorRef` of the current child, if any.
|
||||||
*
|
*
|
||||||
|
* The `BackoffSupervisor` forwards all messages from the child to the parent of the
|
||||||
|
* `BackoffSupervisor`.
|
||||||
|
*
|
||||||
* The `BackoffSupervisor` forwards all other messages to the child, if it is currently running.
|
* The `BackoffSupervisor` forwards all other messages to the child, if it is currently running.
|
||||||
*
|
*
|
||||||
* The child can stop itself and send a [[akka.actor.PoisonPill]] to the parent supervisor
|
* The child can stop itself and send a [[akka.actor.PoisonPill]] to the parent supervisor
|
||||||
* if it wants to do an intentional stop.
|
* if it wants to do an intentional stop.
|
||||||
|
*
|
||||||
|
* Exceptions in the child are handled with the given `supervisionStrategy`. A
|
||||||
|
* `Restart` will perform a normal immediate restart of the child. A `Stop` will
|
||||||
|
* stop the child, but it will be started again after the back-off duration.
|
||||||
*/
|
*/
|
||||||
final class BackoffSupervisor(
|
final class BackoffSupervisor(
|
||||||
childProps: Props,
|
childProps: Props,
|
||||||
childName: String,
|
childName: String,
|
||||||
minBackoff: FiniteDuration,
|
minBackoff: FiniteDuration,
|
||||||
maxBackoff: FiniteDuration,
|
maxBackoff: FiniteDuration,
|
||||||
randomFactor: Double)
|
randomFactor: Double,
|
||||||
|
override val supervisorStrategy: SupervisorStrategy)
|
||||||
extends Actor {
|
extends Actor {
|
||||||
|
|
||||||
import BackoffSupervisor._
|
import BackoffSupervisor._
|
||||||
|
|
@ -130,6 +175,15 @@ final class BackoffSupervisor(
|
||||||
private var child: Option[ActorRef] = None
|
private var child: Option[ActorRef] = None
|
||||||
private var restartCount = 0
|
private var restartCount = 0
|
||||||
|
|
||||||
|
// for binary compatibility with 2.4.0
|
||||||
|
def this(
|
||||||
|
childProps: Props,
|
||||||
|
childName: String,
|
||||||
|
minBackoff: FiniteDuration,
|
||||||
|
maxBackoff: FiniteDuration,
|
||||||
|
randomFactor: Double) =
|
||||||
|
this(childProps, childName, minBackoff, maxBackoff, randomFactor, SupervisorStrategy.defaultStrategy)
|
||||||
|
|
||||||
override def preStart(): Unit =
|
override def preStart(): Unit =
|
||||||
startChild()
|
startChild()
|
||||||
|
|
||||||
|
|
@ -156,6 +210,9 @@ final class BackoffSupervisor(
|
||||||
case GetCurrentChild ⇒
|
case GetCurrentChild ⇒
|
||||||
sender() ! CurrentChild(child)
|
sender() ! CurrentChild(child)
|
||||||
|
|
||||||
|
case msg if child.contains(sender()) ⇒
|
||||||
|
context.parent.forward(msg)
|
||||||
|
|
||||||
case msg ⇒ child match {
|
case msg ⇒ child match {
|
||||||
case Some(c) ⇒ c.forward(msg)
|
case Some(c) ⇒ c.forward(msg)
|
||||||
case None ⇒ context.system.deadLetters.forward(msg)
|
case None ⇒ context.system.deadLetters.forward(msg)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue