Extend backoff supervision with custom message handling #29082 (#29083)

This commit is contained in:
Nicolas Vollmar 2020-05-28 13:15:46 +02:00 committed by GitHub
parent 2a536d7065
commit cfe4443d25
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 162 additions and 75 deletions

View file

@ -31,7 +31,7 @@ class TestActor(probe: ActorRef) extends Actor {
probe ! "STARTED" probe ! "STARTED"
def receive = { def receive: Receive = {
case "DIE" => context.stop(self) case "DIE" => context.stop(self)
case "THROW" => throw new TestActor.NormalException case "THROW" => throw new TestActor.NormalException
case "THROW_STOPPING_EXCEPTION" => throw new TestActor.StoppingException case "THROW_STOPPING_EXCEPTION" => throw new TestActor.StoppingException
@ -46,9 +46,9 @@ object TestParentActor {
} }
class TestParentActor(probe: ActorRef, supervisorProps: Props) extends Actor { class TestParentActor(probe: ActorRef, supervisorProps: Props) extends Actor {
val supervisor = context.actorOf(supervisorProps) val supervisor: ActorRef = context.actorOf(supervisorProps)
def receive = { def receive: Receive = {
case other => probe.forward(other) case other => probe.forward(other)
} }
} }
@ -58,10 +58,10 @@ class BackoffOnRestartSupervisorSpec extends AkkaSpec("""
akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
""") with WithLogCapturing with ImplicitSender { """) with WithLogCapturing with ImplicitSender {
@silent
def supervisorProps(probeRef: ActorRef) = { def supervisorProps(probeRef: ActorRef) = {
val options = Backoff val options = BackoffOpts
.onFailure(TestActor.props(probeRef), "someChildName", 200 millis, 10 seconds, 0.0, maxNrOfRetries = -1) .onFailure(TestActor.props(probeRef), "someChildName", 200 millis, 10 seconds, 0.0)
.withMaxNrOfRetries(-1)
.withSupervisorStrategy(OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 30 seconds) { .withSupervisorStrategy(OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 30 seconds) {
case _: TestActor.StoppingException => SupervisorStrategy.Stop case _: TestActor.StoppingException => SupervisorStrategy.Stop
}) })
@ -69,16 +69,16 @@ class BackoffOnRestartSupervisorSpec extends AkkaSpec("""
} }
trait Setup { trait Setup {
val probe = TestProbe() val probe: TestProbe = TestProbe()
val supervisor = system.actorOf(supervisorProps(probe.ref)) val supervisor: ActorRef = system.actorOf(supervisorProps(probe.ref))
probe.expectMsg("STARTED") probe.expectMsg("STARTED")
} }
trait Setup2 { trait Setup2 {
val probe = TestProbe() val probe: TestProbe = TestProbe()
val parent = system.actorOf(TestParentActor.props(probe.ref, supervisorProps(probe.ref))) val parent: ActorRef = system.actorOf(TestParentActor.props(probe.ref, supervisorProps(probe.ref)))
probe.expectMsg("STARTED") probe.expectMsg("STARTED")
val child = probe.lastSender val child: ActorRef = probe.lastSender
} }
"BackoffOnRestartSupervisor" must { "BackoffOnRestartSupervisor" must {
@ -139,7 +139,7 @@ class BackoffOnRestartSupervisorSpec extends AkkaSpec("""
} }
class SlowlyFailingActor(latch: CountDownLatch) extends Actor { class SlowlyFailingActor(latch: CountDownLatch) extends Actor {
def receive = { def receive: Receive = {
case "THROW" => case "THROW" =>
sender ! "THROWN" sender ! "THROWN"
throw new NormalException throw new NormalException
@ -155,18 +155,12 @@ class BackoffOnRestartSupervisorSpec extends AkkaSpec("""
"accept commands while child is terminating" in { "accept commands while child is terminating" in {
val postStopLatch = new CountDownLatch(1) val postStopLatch = new CountDownLatch(1)
@silent @silent
val options = Backoff val options = BackoffOpts
.onFailure( .onFailure(Props(new SlowlyFailingActor(postStopLatch)), "someChildName", 1 nanos, 1 nanos, 0.0)
Props(new SlowlyFailingActor(postStopLatch)), .withMaxNrOfRetries(-1)
"someChildName",
1 nanos,
1 nanos,
0.0,
maxNrOfRetries = -1)
.withSupervisorStrategy(OneForOneStrategy(loggingEnabled = false) { .withSupervisorStrategy(OneForOneStrategy(loggingEnabled = false) {
case _: TestActor.StoppingException => SupervisorStrategy.Stop case _: TestActor.StoppingException => SupervisorStrategy.Stop
}) })
@silent
val supervisor = system.actorOf(BackoffSupervisor.props(options)) val supervisor = system.actorOf(BackoffSupervisor.props(options))
supervisor ! BackoffSupervisor.GetCurrentChild supervisor ! BackoffSupervisor.GetCurrentChild
@ -221,13 +215,12 @@ class BackoffOnRestartSupervisorSpec extends AkkaSpec("""
// withinTimeRange indicates the time range in which maxNrOfRetries will cause the child to // withinTimeRange indicates the time range in which maxNrOfRetries will cause the child to
// stop. IE: If we restart more than maxNrOfRetries in a time range longer than withinTimeRange // stop. IE: If we restart more than maxNrOfRetries in a time range longer than withinTimeRange
// that is acceptable. // that is acceptable.
@silent val options = BackoffOpts
val options = Backoff .onFailure(TestActor.props(probe.ref), "someChildName", 300.millis, 10.seconds, 0.0)
.onFailure(TestActor.props(probe.ref), "someChildName", 300.millis, 10.seconds, 0.0, maxNrOfRetries = -1) .withMaxNrOfRetries(-1)
.withSupervisorStrategy(OneForOneStrategy(withinTimeRange = 1 seconds, maxNrOfRetries = 3) { .withSupervisorStrategy(OneForOneStrategy(withinTimeRange = 1 seconds, maxNrOfRetries = 3) {
case _: TestActor.StoppingException => SupervisorStrategy.Stop case _: TestActor.StoppingException => SupervisorStrategy.Stop
}) })
@silent
val supervisor = system.actorOf(BackoffSupervisor.props(options)) val supervisor = system.actorOf(BackoffSupervisor.props(options))
probe.expectMsg("STARTED") probe.expectMsg("STARTED")
filterException[TestActor.TestException] { filterException[TestActor.TestException] {

View file

@ -7,7 +7,6 @@ package akka.pattern
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import com.github.ghik.silencer.silent
import org.scalatest.concurrent.Eventually import org.scalatest.concurrent.Eventually
import org.scalatest.prop.TableDrivenPropertyChecks._ import org.scalatest.prop.TableDrivenPropertyChecks._
@ -24,7 +23,7 @@ object BackoffSupervisorSpec {
} }
class Child(probe: ActorRef) extends Actor { class Child(probe: ActorRef) extends Actor {
def receive = { def receive: Receive = {
case "boom" => throw new TestException case "boom" => throw new TestException
case msg => probe ! msg case msg => probe ! msg
} }
@ -36,7 +35,7 @@ object BackoffSupervisorSpec {
} }
class ManualChild(probe: ActorRef) extends Actor { class ManualChild(probe: ActorRef) extends Actor {
def receive = { def receive: Receive = {
case "boom" => throw new TestException case "boom" => throw new TestException
case msg => case msg =>
probe ! msg probe ! msg
@ -48,14 +47,13 @@ object BackoffSupervisorSpec {
class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually { class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually {
import BackoffSupervisorSpec._ import BackoffSupervisorSpec._
@silent("deprecated") def onStopOptions(props: Props = Child.props(testActor), maxNrOfRetries: Int = -1): BackoffOnStopOptions =
def onStopOptions(props: Props = Child.props(testActor), maxNrOfRetries: Int = -1) = BackoffOpts.onStop(props, "c1", 100.millis, 3.seconds, 0.2).withMaxNrOfRetries(maxNrOfRetries)
Backoff.onStop(props, "c1", 100.millis, 3.seconds, 0.2, maxNrOfRetries) def onFailureOptions(props: Props = Child.props(testActor), maxNrOfRetries: Int = -1): BackoffOnFailureOptions =
@silent("deprecated") BackoffOpts.onFailure(props, "c1", 100.millis, 3.seconds, 0.2).withMaxNrOfRetries(maxNrOfRetries)
def onFailureOptions(props: Props = Child.props(testActor), maxNrOfRetries: Int = -1) =
Backoff.onFailure(props, "c1", 100.millis, 3.seconds, 0.2, maxNrOfRetries) def create(options: BackoffOnStopOptions): ActorRef = system.actorOf(BackoffSupervisor.props(options))
@silent("deprecated") def create(options: BackoffOnFailureOptions): ActorRef = system.actorOf(BackoffSupervisor.props(options))
def create(options: BackoffOptions) = system.actorOf(BackoffSupervisor.props(options))
"BackoffSupervisor" must { "BackoffSupervisor" must {
"start child again when it stops when using `Backoff.onStop`" in { "start child again when it stops when using `Backoff.onStop`" in {
@ -179,10 +177,10 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually
"reply to sender if replyWhileStopped is specified" in { "reply to sender if replyWhileStopped is specified" in {
filterException[TestException] { filterException[TestException] {
@silent("deprecated")
val supervisor = create( val supervisor = create(
Backoff BackoffOpts
.onFailure(Child.props(testActor), "c1", 100.seconds, 300.seconds, 0.2, maxNrOfRetries = -1) .onFailure(Child.props(testActor), "c1", 100.seconds, 300.seconds, 0.2)
.withMaxNrOfRetries(-1)
.withReplyWhileStopped("child was stopped")) .withReplyWhileStopped("child was stopped"))
supervisor ! BackoffSupervisor.GetCurrentChild supervisor ! BackoffSupervisor.GetCurrentChild
val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get
@ -203,11 +201,43 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually
} }
} }
"use provided actor while stopped and withHandlerWhileStopped is specified" in {
val handler = system.actorOf(Props(new Actor {
override def receive: Receive = {
case "still there?" =>
sender() ! "not here!"
}
}))
filterException[TestException] {
val supervisor = create(
BackoffOpts
.onFailure(Child.props(testActor), "c1", 100.seconds, 300.seconds, 0.2)
.withMaxNrOfRetries(-1)
.withHandlerWhileStopped(handler))
supervisor ! BackoffSupervisor.GetCurrentChild
val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get
watch(c1)
supervisor ! BackoffSupervisor.GetRestartCount
expectMsg(BackoffSupervisor.RestartCount(0))
c1 ! "boom"
expectTerminated(c1)
awaitAssert {
supervisor ! BackoffSupervisor.GetRestartCount
expectMsg(BackoffSupervisor.RestartCount(1))
}
supervisor ! "still there?"
expectMsg("not here!")
}
}
"not reply to sender if replyWhileStopped is NOT specified" in { "not reply to sender if replyWhileStopped is NOT specified" in {
filterException[TestException] { filterException[TestException] {
@silent("deprecated")
val supervisor = val supervisor =
create(Backoff.onFailure(Child.props(testActor), "c1", 100.seconds, 300.seconds, 0.2, maxNrOfRetries = -1)) create(
BackoffOpts.onFailure(Child.props(testActor), "c1", 100.seconds, 300.seconds, 0.2).withMaxNrOfRetries(-1))
supervisor ! BackoffSupervisor.GetCurrentChild supervisor ! BackoffSupervisor.GetCurrentChild
val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get
watch(c1) watch(c1)
@ -382,7 +412,7 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually
c1 ! PoisonPill c1 ! PoisonPill
expectTerminated(c1) expectTerminated(c1)
// since actor stopped we can expect the two messages to end up in dead letters // since actor stopped we can expect the two messages to end up in dead letters
EventFilter.warning(pattern = ".*(ping|stop).*", occurrences = 2).intercept { EventFilter.warning(pattern = ".*(ping|stop).*", occurrences = 1).intercept {
supervisor ! "ping" supervisor ! "ping"
supervisorWatcher.expectNoMessage(20.millis) // supervisor must not terminate supervisorWatcher.expectNoMessage(20.millis) // supervisor must not terminate

View file

@ -0,0 +1,27 @@
# Internals changed
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.ExtendedBackoffOptions.withHandlerWhileStopped")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnFailureOptionsImpl.<init>$default$8")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.BackoffOnFailureOptionsImpl.apply")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnFailureOptionsImpl.apply$default$8")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnStopOptionsImpl.<init>$default$8")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.BackoffOnStopOptionsImpl.apply")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnStopOptionsImpl.apply$default$8")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnFailureOptionsImpl.<init>$default$8")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnFailureOptionsImpl.apply$default$8")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.BackoffOnFailureOptionsImpl.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOnFailureOptionsImpl.replyWhileStopped")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.BackoffOnFailureOptionsImpl.copy")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnFailureOptionsImpl.copy$default$8")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.BackoffOnFailureOptionsImpl.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnStopOptionsImpl.<init>$default$8")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnStopOptionsImpl.apply$default$8")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.BackoffOnStopOptionsImpl.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOnStopOptionsImpl.replyWhileStopped")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.BackoffOnStopOptionsImpl.copy")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnStopOptionsImpl.copy$default$8")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.BackoffOnStopOptionsImpl.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.internal.BackoffOnRestartSupervisor.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.internal.BackoffOnStopSupervisor.this")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.pattern.BackoffOnFailureOptionsImpl.unapply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.pattern.BackoffOnStopOptionsImpl.unapply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.pattern.BackoffOnFailureOptionsImpl.unapply")

View file

@ -617,7 +617,7 @@ private final case class BackoffOptionsImpl(
backoffReset, backoffReset,
randomFactor, randomFactor,
supervisorStrategy, supervisorStrategy,
replyWhileStopped)) replyWhileStopped.map(msg => ReplyWith(msg)).getOrElse(ForwardDeathLetters)))
//onStop method in companion object //onStop method in companion object
case StopImpliesFailure => case StopImpliesFailure =>
Props( Props(
@ -629,7 +629,7 @@ private final case class BackoffOptionsImpl(
backoffReset, backoffReset,
randomFactor, randomFactor,
supervisorStrategy, supervisorStrategy,
replyWhileStopped, replyWhileStopped.map(msg => ReplyWith(msg)).getOrElse(ForwardDeathLetters),
finalStopMessage)) finalStopMessage))
} }
} }

View file

@ -5,9 +5,8 @@
package akka.pattern package akka.pattern
import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.concurrent.duration.{ Duration, FiniteDuration }
import akka.actor.{ ActorRef, OneForOneStrategy, Props, SupervisorStrategy }
import akka.actor.{ OneForOneStrategy, Props, SupervisorStrategy } import akka.annotation.{ DoNotInherit, InternalApi }
import akka.annotation.DoNotInherit
import akka.pattern.internal.{ BackoffOnRestartSupervisor, BackoffOnStopSupervisor } import akka.pattern.internal.{ BackoffOnRestartSupervisor, BackoffOnStopSupervisor }
import akka.util.JavaDurationConverters._ import akka.util.JavaDurationConverters._
@ -299,6 +298,15 @@ private[akka] sealed trait ExtendedBackoffOptions[T <: ExtendedBackoffOptions[T]
*/ */
def withReplyWhileStopped(replyWhileStopped: Any): T def withReplyWhileStopped(replyWhileStopped: Any): T
/**
* Returns a new BackoffOptions with a custom handler for messages that the supervisor receives while its child is stopped.
* By default, a message received while the child is stopped is forwarded to `deadLetters`.
* Essentially, this handler replaces `deadLetters` allowing to implement custom handling instead of a static reply.
*
* @param handler PartialFunction of the received message and sender
*/
def withHandlerWhileStopped(handler: ActorRef): T
/** /**
* Returns the props to create the back-off supervisor. * Returns the props to create the back-off supervisor.
*/ */
@ -334,7 +342,7 @@ private final case class BackoffOnStopOptionsImpl[T](
randomFactor: Double, randomFactor: Double,
reset: Option[BackoffReset] = None, reset: Option[BackoffReset] = None,
supervisorStrategy: OneForOneStrategy = OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider), supervisorStrategy: OneForOneStrategy = OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider),
replyWhileStopped: Option[Any] = None, handlingWhileStopped: HandlingWhileStopped = ForwardDeathLetters,
finalStopMessage: Option[Any => Boolean] = None) finalStopMessage: Option[Any => Boolean] = None)
extends BackoffOnStopOptions { extends BackoffOnStopOptions {
@ -344,7 +352,9 @@ private final case class BackoffOnStopOptionsImpl[T](
def withAutoReset(resetBackoff: FiniteDuration) = copy(reset = Some(AutoReset(resetBackoff))) def withAutoReset(resetBackoff: FiniteDuration) = copy(reset = Some(AutoReset(resetBackoff)))
def withManualReset = copy(reset = Some(ManualReset)) def withManualReset = copy(reset = Some(ManualReset))
def withSupervisorStrategy(supervisorStrategy: OneForOneStrategy) = copy(supervisorStrategy = supervisorStrategy) def withSupervisorStrategy(supervisorStrategy: OneForOneStrategy) = copy(supervisorStrategy = supervisorStrategy)
def withReplyWhileStopped(replyWhileStopped: Any) = copy(replyWhileStopped = Some(replyWhileStopped)) def withReplyWhileStopped(replyWhileStopped: Any) = copy(handlingWhileStopped = ReplyWith(replyWhileStopped))
def withHandlerWhileStopped(handlerWhileStopped: ActorRef) =
copy(handlingWhileStopped = ForwardTo(handlerWhileStopped))
def withMaxNrOfRetries(maxNrOfRetries: Int) = def withMaxNrOfRetries(maxNrOfRetries: Int) =
copy(supervisorStrategy = supervisorStrategy.withMaxNrOfRetries(maxNrOfRetries)) copy(supervisorStrategy = supervisorStrategy.withMaxNrOfRetries(maxNrOfRetries))
@ -374,7 +384,7 @@ private final case class BackoffOnStopOptionsImpl[T](
backoffReset, backoffReset,
randomFactor, randomFactor,
supervisorStrategy, supervisorStrategy,
replyWhileStopped, handlingWhileStopped,
finalStopMessage)) finalStopMessage))
} }
} }
@ -387,7 +397,7 @@ private final case class BackoffOnFailureOptionsImpl[T](
randomFactor: Double, randomFactor: Double,
reset: Option[BackoffReset] = None, reset: Option[BackoffReset] = None,
supervisorStrategy: OneForOneStrategy = OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider), supervisorStrategy: OneForOneStrategy = OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider),
replyWhileStopped: Option[Any] = None) handlingWhileStopped: HandlingWhileStopped = ForwardDeathLetters)
extends BackoffOnFailureOptions { extends BackoffOnFailureOptions {
private val backoffReset = reset.getOrElse(AutoReset(minBackoff)) private val backoffReset = reset.getOrElse(AutoReset(minBackoff))
@ -396,7 +406,9 @@ private final case class BackoffOnFailureOptionsImpl[T](
def withAutoReset(resetBackoff: FiniteDuration) = copy(reset = Some(AutoReset(resetBackoff))) def withAutoReset(resetBackoff: FiniteDuration) = copy(reset = Some(AutoReset(resetBackoff)))
def withManualReset = copy(reset = Some(ManualReset)) def withManualReset = copy(reset = Some(ManualReset))
def withSupervisorStrategy(supervisorStrategy: OneForOneStrategy) = copy(supervisorStrategy = supervisorStrategy) def withSupervisorStrategy(supervisorStrategy: OneForOneStrategy) = copy(supervisorStrategy = supervisorStrategy)
def withReplyWhileStopped(replyWhileStopped: Any) = copy(replyWhileStopped = Some(replyWhileStopped)) def withReplyWhileStopped(replyWhileStopped: Any) = copy(handlingWhileStopped = ReplyWith(replyWhileStopped))
def withHandlerWhileStopped(handlerWhileStopped: ActorRef) =
copy(handlingWhileStopped = ForwardTo(handlerWhileStopped))
def withMaxNrOfRetries(maxNrOfRetries: Int) = def withMaxNrOfRetries(maxNrOfRetries: Int) =
copy(supervisorStrategy = supervisorStrategy.withMaxNrOfRetries(maxNrOfRetries)) copy(supervisorStrategy = supervisorStrategy.withMaxNrOfRetries(maxNrOfRetries))
@ -419,10 +431,17 @@ private final case class BackoffOnFailureOptionsImpl[T](
backoffReset, backoffReset,
randomFactor, randomFactor,
supervisorStrategy, supervisorStrategy,
replyWhileStopped)) handlingWhileStopped))
} }
} }
@InternalApi
private[akka] sealed trait BackoffReset private[akka] sealed trait BackoffReset
private[akka] case object ManualReset extends BackoffReset private[akka] case object ManualReset extends BackoffReset
private[akka] final case class AutoReset(resetBackoff: FiniteDuration) extends BackoffReset private[akka] final case class AutoReset(resetBackoff: FiniteDuration) extends BackoffReset
@InternalApi
private[akka] sealed trait HandlingWhileStopped
private[akka] case object ForwardDeathLetters extends HandlingWhileStopped
private[akka] case class ForwardTo(handler: ActorRef) extends HandlingWhileStopped
private[akka] case class ReplyWith(msg: Any) extends HandlingWhileStopped

View file

@ -184,7 +184,7 @@ object BackoffSupervisor {
AutoReset(minBackoff), AutoReset(minBackoff),
randomFactor, randomFactor,
strategy, strategy,
None, ForwardDeathLetters,
None)) None))
} }
@ -341,7 +341,7 @@ final class BackoffSupervisor @deprecated("Use `BackoffSupervisor.props` method
reset, reset,
randomFactor, randomFactor,
strategy, strategy,
replyWhileStopped, replyWhileStopped.map(msg => ReplyWith(msg)).getOrElse(ForwardDeathLetters),
finalStopMessage) { finalStopMessage) {
// for binary compatibility with 2.5.18 // for binary compatibility with 2.5.18

View file

@ -4,12 +4,20 @@
package akka.pattern.internal package akka.pattern.internal
import scala.concurrent.duration._
import akka.actor.{ OneForOneStrategy, _ }
import akka.actor.SupervisorStrategy._ import akka.actor.SupervisorStrategy._
import akka.actor.{ OneForOneStrategy, _ }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.pattern.{ BackoffReset, BackoffSupervisor, HandleBackoff } import akka.pattern.{
BackoffReset,
BackoffSupervisor,
ForwardDeathLetters,
ForwardTo,
HandleBackoff,
HandlingWhileStopped,
ReplyWith
}
import scala.concurrent.duration._
/** /**
* INTERNAL API * INTERNAL API
@ -26,7 +34,7 @@ import akka.pattern.{ BackoffReset, BackoffSupervisor, HandleBackoff }
val reset: BackoffReset, val reset: BackoffReset,
randomFactor: Double, randomFactor: Double,
strategy: OneForOneStrategy, strategy: OneForOneStrategy,
replyWhileStopped: Option[Any]) handlingWhileStopped: HandlingWhileStopped)
extends Actor extends Actor
with HandleBackoff with HandleBackoff
with ActorLogging { with ActorLogging {
@ -34,7 +42,7 @@ import akka.pattern.{ BackoffReset, BackoffSupervisor, HandleBackoff }
import BackoffSupervisor._ import BackoffSupervisor._
import context._ import context._
override val supervisorStrategy = override val supervisorStrategy: OneForOneStrategy =
OneForOneStrategy(strategy.maxNrOfRetries, strategy.withinTimeRange, strategy.loggingEnabled) { OneForOneStrategy(strategy.maxNrOfRetries, strategy.withinTimeRange, strategy.loggingEnabled) {
case ex => case ex =>
val defaultDirective: Directive = val defaultDirective: Directive =
@ -94,9 +102,10 @@ import akka.pattern.{ BackoffReset, BackoffSupervisor, HandleBackoff }
case Some(c) => case Some(c) =>
c.forward(msg) c.forward(msg)
case None => case None =>
replyWhileStopped match { handlingWhileStopped match {
case None => context.system.deadLetters.forward(msg) case ForwardDeathLetters => context.system.deadLetters.forward(msg)
case Some(r) => sender() ! r case ForwardTo(h) => h.forward(msg)
case ReplyWith(r) => sender() ! r
} }
} }
} }

View file

@ -4,12 +4,20 @@
package akka.pattern.internal package akka.pattern.internal
import scala.concurrent.duration.FiniteDuration
import akka.actor.{ Actor, ActorLogging, OneForOneStrategy, Props, SupervisorStrategy, Terminated }
import akka.actor.SupervisorStrategy.{ Directive, Escalate } import akka.actor.SupervisorStrategy.{ Directive, Escalate }
import akka.actor.{ Actor, ActorLogging, OneForOneStrategy, Props, SupervisorStrategy, Terminated }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.pattern.{ BackoffReset, BackoffSupervisor, HandleBackoff } import akka.pattern.{
BackoffReset,
BackoffSupervisor,
ForwardDeathLetters,
ForwardTo,
HandleBackoff,
HandlingWhileStopped,
ReplyWith
}
import scala.concurrent.duration.FiniteDuration
/** /**
* INTERNAL API * INTERNAL API
@ -26,7 +34,7 @@ import akka.pattern.{ BackoffReset, BackoffSupervisor, HandleBackoff }
val reset: BackoffReset, val reset: BackoffReset,
randomFactor: Double, randomFactor: Double,
strategy: SupervisorStrategy, strategy: SupervisorStrategy,
replyWhileStopped: Option[Any], handlingWhileStopped: HandlingWhileStopped,
finalStopMessage: Option[Any => Boolean]) finalStopMessage: Option[Any => Boolean])
extends Actor extends Actor
with HandleBackoff with HandleBackoff
@ -35,7 +43,7 @@ import akka.pattern.{ BackoffReset, BackoffSupervisor, HandleBackoff }
import BackoffSupervisor._ import BackoffSupervisor._
import context.dispatcher import context.dispatcher
override val supervisorStrategy = strategy match { override val supervisorStrategy: SupervisorStrategy = strategy match {
case oneForOne: OneForOneStrategy => case oneForOne: OneForOneStrategy =>
OneForOneStrategy(oneForOne.maxNrOfRetries, oneForOne.withinTimeRange, oneForOne.loggingEnabled) { OneForOneStrategy(oneForOne.maxNrOfRetries, oneForOne.withinTimeRange, oneForOne.loggingEnabled) {
case ex => case ex =>
@ -84,13 +92,14 @@ import akka.pattern.{ BackoffReset, BackoffSupervisor, HandleBackoff }
case None => case None =>
} }
case None => case None =>
replyWhileStopped match {
case Some(r) => sender() ! r
case None => context.system.deadLetters.forward(msg)
}
finalStopMessage match { finalStopMessage match {
case Some(fsm) if fsm(msg) => context.stop(self) case Some(fsm) if fsm(msg) => context.stop(self)
case _ => case _ =>
handlingWhileStopped match {
case ForwardDeathLetters => context.system.deadLetters.forward(msg)
case ForwardTo(h) => h.forward(msg)
case ReplyWith(r) => sender() ! r
}
} }
} }
} }