diff --git a/akka-actor-tests/src/test/scala/akka/pattern/BackoffOnRestartSupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/BackoffOnRestartSupervisorSpec.scala index 4be5f55f34..e6fa3b5a78 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/BackoffOnRestartSupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/BackoffOnRestartSupervisorSpec.scala @@ -31,7 +31,7 @@ class TestActor(probe: ActorRef) extends Actor { probe ! "STARTED" - def receive = { + def receive: Receive = { case "DIE" => context.stop(self) case "THROW" => throw new TestActor.NormalException case "THROW_STOPPING_EXCEPTION" => throw new TestActor.StoppingException @@ -46,9 +46,9 @@ object TestParentActor { } class TestParentActor(probe: ActorRef, supervisorProps: Props) extends Actor { - val supervisor = context.actorOf(supervisorProps) + val supervisor: ActorRef = context.actorOf(supervisorProps) - def receive = { + def receive: Receive = { case other => probe.forward(other) } } @@ -58,10 +58,10 @@ class BackoffOnRestartSupervisorSpec extends AkkaSpec(""" akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] """) with WithLogCapturing with ImplicitSender { - @silent def supervisorProps(probeRef: ActorRef) = { - val options = Backoff - .onFailure(TestActor.props(probeRef), "someChildName", 200 millis, 10 seconds, 0.0, maxNrOfRetries = -1) + val options = BackoffOpts + .onFailure(TestActor.props(probeRef), "someChildName", 200 millis, 10 seconds, 0.0) + .withMaxNrOfRetries(-1) .withSupervisorStrategy(OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 30 seconds) { case _: TestActor.StoppingException => SupervisorStrategy.Stop }) @@ -69,16 +69,16 @@ class BackoffOnRestartSupervisorSpec extends AkkaSpec(""" } trait Setup { - val probe = TestProbe() - val supervisor = system.actorOf(supervisorProps(probe.ref)) + val probe: TestProbe = TestProbe() + val supervisor: ActorRef = system.actorOf(supervisorProps(probe.ref)) probe.expectMsg("STARTED") } trait Setup2 { - val probe = TestProbe() - val parent = system.actorOf(TestParentActor.props(probe.ref, supervisorProps(probe.ref))) + val probe: TestProbe = TestProbe() + val parent: ActorRef = system.actorOf(TestParentActor.props(probe.ref, supervisorProps(probe.ref))) probe.expectMsg("STARTED") - val child = probe.lastSender + val child: ActorRef = probe.lastSender } "BackoffOnRestartSupervisor" must { @@ -139,7 +139,7 @@ class BackoffOnRestartSupervisorSpec extends AkkaSpec(""" } class SlowlyFailingActor(latch: CountDownLatch) extends Actor { - def receive = { + def receive: Receive = { case "THROW" => sender ! "THROWN" throw new NormalException @@ -155,18 +155,12 @@ class BackoffOnRestartSupervisorSpec extends AkkaSpec(""" "accept commands while child is terminating" in { val postStopLatch = new CountDownLatch(1) @silent - val options = Backoff - .onFailure( - Props(new SlowlyFailingActor(postStopLatch)), - "someChildName", - 1 nanos, - 1 nanos, - 0.0, - maxNrOfRetries = -1) + val options = BackoffOpts + .onFailure(Props(new SlowlyFailingActor(postStopLatch)), "someChildName", 1 nanos, 1 nanos, 0.0) + .withMaxNrOfRetries(-1) .withSupervisorStrategy(OneForOneStrategy(loggingEnabled = false) { case _: TestActor.StoppingException => SupervisorStrategy.Stop }) - @silent val supervisor = system.actorOf(BackoffSupervisor.props(options)) supervisor ! BackoffSupervisor.GetCurrentChild @@ -221,13 +215,12 @@ class BackoffOnRestartSupervisorSpec extends AkkaSpec(""" // withinTimeRange indicates the time range in which maxNrOfRetries will cause the child to // stop. IE: If we restart more than maxNrOfRetries in a time range longer than withinTimeRange // that is acceptable. - @silent - val options = Backoff - .onFailure(TestActor.props(probe.ref), "someChildName", 300.millis, 10.seconds, 0.0, maxNrOfRetries = -1) + val options = BackoffOpts + .onFailure(TestActor.props(probe.ref), "someChildName", 300.millis, 10.seconds, 0.0) + .withMaxNrOfRetries(-1) .withSupervisorStrategy(OneForOneStrategy(withinTimeRange = 1 seconds, maxNrOfRetries = 3) { case _: TestActor.StoppingException => SupervisorStrategy.Stop }) - @silent val supervisor = system.actorOf(BackoffSupervisor.props(options)) probe.expectMsg("STARTED") filterException[TestActor.TestException] { diff --git a/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala index 48039c90fe..dcccd99b1b 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala @@ -7,7 +7,6 @@ package akka.pattern import scala.concurrent.duration._ import scala.util.control.NoStackTrace -import com.github.ghik.silencer.silent import org.scalatest.concurrent.Eventually import org.scalatest.prop.TableDrivenPropertyChecks._ @@ -24,7 +23,7 @@ object BackoffSupervisorSpec { } class Child(probe: ActorRef) extends Actor { - def receive = { + def receive: Receive = { case "boom" => throw new TestException case msg => probe ! msg } @@ -36,7 +35,7 @@ object BackoffSupervisorSpec { } class ManualChild(probe: ActorRef) extends Actor { - def receive = { + def receive: Receive = { case "boom" => throw new TestException case msg => probe ! msg @@ -48,14 +47,13 @@ object BackoffSupervisorSpec { class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually { import BackoffSupervisorSpec._ - @silent("deprecated") - def onStopOptions(props: Props = Child.props(testActor), maxNrOfRetries: Int = -1) = - Backoff.onStop(props, "c1", 100.millis, 3.seconds, 0.2, maxNrOfRetries) - @silent("deprecated") - def onFailureOptions(props: Props = Child.props(testActor), maxNrOfRetries: Int = -1) = - Backoff.onFailure(props, "c1", 100.millis, 3.seconds, 0.2, maxNrOfRetries) - @silent("deprecated") - def create(options: BackoffOptions) = system.actorOf(BackoffSupervisor.props(options)) + def onStopOptions(props: Props = Child.props(testActor), maxNrOfRetries: Int = -1): BackoffOnStopOptions = + BackoffOpts.onStop(props, "c1", 100.millis, 3.seconds, 0.2).withMaxNrOfRetries(maxNrOfRetries) + def onFailureOptions(props: Props = Child.props(testActor), maxNrOfRetries: Int = -1): BackoffOnFailureOptions = + BackoffOpts.onFailure(props, "c1", 100.millis, 3.seconds, 0.2).withMaxNrOfRetries(maxNrOfRetries) + + def create(options: BackoffOnStopOptions): ActorRef = system.actorOf(BackoffSupervisor.props(options)) + def create(options: BackoffOnFailureOptions): ActorRef = system.actorOf(BackoffSupervisor.props(options)) "BackoffSupervisor" must { "start child again when it stops when using `Backoff.onStop`" in { @@ -179,10 +177,10 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually "reply to sender if replyWhileStopped is specified" in { filterException[TestException] { - @silent("deprecated") val supervisor = create( - Backoff - .onFailure(Child.props(testActor), "c1", 100.seconds, 300.seconds, 0.2, maxNrOfRetries = -1) + BackoffOpts + .onFailure(Child.props(testActor), "c1", 100.seconds, 300.seconds, 0.2) + .withMaxNrOfRetries(-1) .withReplyWhileStopped("child was stopped")) supervisor ! BackoffSupervisor.GetCurrentChild val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get @@ -203,11 +201,43 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually } } + "use provided actor while stopped and withHandlerWhileStopped is specified" in { + val handler = system.actorOf(Props(new Actor { + override def receive: Receive = { + case "still there?" => + sender() ! "not here!" + } + })) + filterException[TestException] { + val supervisor = create( + BackoffOpts + .onFailure(Child.props(testActor), "c1", 100.seconds, 300.seconds, 0.2) + .withMaxNrOfRetries(-1) + .withHandlerWhileStopped(handler)) + supervisor ! BackoffSupervisor.GetCurrentChild + val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get + watch(c1) + supervisor ! BackoffSupervisor.GetRestartCount + expectMsg(BackoffSupervisor.RestartCount(0)) + + c1 ! "boom" + expectTerminated(c1) + + awaitAssert { + supervisor ! BackoffSupervisor.GetRestartCount + expectMsg(BackoffSupervisor.RestartCount(1)) + } + + supervisor ! "still there?" + expectMsg("not here!") + } + } + "not reply to sender if replyWhileStopped is NOT specified" in { filterException[TestException] { - @silent("deprecated") val supervisor = - create(Backoff.onFailure(Child.props(testActor), "c1", 100.seconds, 300.seconds, 0.2, maxNrOfRetries = -1)) + create( + BackoffOpts.onFailure(Child.props(testActor), "c1", 100.seconds, 300.seconds, 0.2).withMaxNrOfRetries(-1)) supervisor ! BackoffSupervisor.GetCurrentChild val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get watch(c1) @@ -382,7 +412,7 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually c1 ! PoisonPill expectTerminated(c1) // since actor stopped we can expect the two messages to end up in dead letters - EventFilter.warning(pattern = ".*(ping|stop).*", occurrences = 2).intercept { + EventFilter.warning(pattern = ".*(ping|stop).*", occurrences = 1).intercept { supervisor ! "ping" supervisorWatcher.expectNoMessage(20.millis) // supervisor must not terminate diff --git a/akka-actor/src/main/mima-filters/2.6.5.backwards.excludes/29082-backoff-reply.excludes b/akka-actor/src/main/mima-filters/2.6.5.backwards.excludes/29082-backoff-reply.excludes new file mode 100644 index 0000000000..a653bde9fb --- /dev/null +++ b/akka-actor/src/main/mima-filters/2.6.5.backwards.excludes/29082-backoff-reply.excludes @@ -0,0 +1,27 @@ +# Internals changed +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.ExtendedBackoffOptions.withHandlerWhileStopped") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnFailureOptionsImpl.$default$8") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.BackoffOnFailureOptionsImpl.apply") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnFailureOptionsImpl.apply$default$8") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnStopOptionsImpl.$default$8") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.BackoffOnStopOptionsImpl.apply") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnStopOptionsImpl.apply$default$8") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnFailureOptionsImpl.$default$8") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnFailureOptionsImpl.apply$default$8") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.BackoffOnFailureOptionsImpl.apply") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOnFailureOptionsImpl.replyWhileStopped") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.BackoffOnFailureOptionsImpl.copy") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnFailureOptionsImpl.copy$default$8") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.BackoffOnFailureOptionsImpl.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnStopOptionsImpl.$default$8") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnStopOptionsImpl.apply$default$8") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.BackoffOnStopOptionsImpl.apply") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOnStopOptionsImpl.replyWhileStopped") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.BackoffOnStopOptionsImpl.copy") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.BackoffOnStopOptionsImpl.copy$default$8") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.BackoffOnStopOptionsImpl.this") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.internal.BackoffOnRestartSupervisor.this") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.pattern.internal.BackoffOnStopSupervisor.this") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.pattern.BackoffOnFailureOptionsImpl.unapply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.pattern.BackoffOnStopOptionsImpl.unapply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.pattern.BackoffOnFailureOptionsImpl.unapply") \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/pattern/Backoff.scala b/akka-actor/src/main/scala/akka/pattern/Backoff.scala index 45f628959c..6cc78aeed6 100644 --- a/akka-actor/src/main/scala/akka/pattern/Backoff.scala +++ b/akka-actor/src/main/scala/akka/pattern/Backoff.scala @@ -617,7 +617,7 @@ private final case class BackoffOptionsImpl( backoffReset, randomFactor, supervisorStrategy, - replyWhileStopped)) + replyWhileStopped.map(msg => ReplyWith(msg)).getOrElse(ForwardDeathLetters))) //onStop method in companion object case StopImpliesFailure => Props( @@ -629,7 +629,7 @@ private final case class BackoffOptionsImpl( backoffReset, randomFactor, supervisorStrategy, - replyWhileStopped, + replyWhileStopped.map(msg => ReplyWith(msg)).getOrElse(ForwardDeathLetters), finalStopMessage)) } } diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala b/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala index 6e893978ac..8b2c5e61e8 100644 --- a/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala +++ b/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala @@ -5,9 +5,8 @@ package akka.pattern import scala.concurrent.duration.{ Duration, FiniteDuration } - -import akka.actor.{ OneForOneStrategy, Props, SupervisorStrategy } -import akka.annotation.DoNotInherit +import akka.actor.{ ActorRef, OneForOneStrategy, Props, SupervisorStrategy } +import akka.annotation.{ DoNotInherit, InternalApi } import akka.pattern.internal.{ BackoffOnRestartSupervisor, BackoffOnStopSupervisor } import akka.util.JavaDurationConverters._ @@ -299,6 +298,15 @@ private[akka] sealed trait ExtendedBackoffOptions[T <: ExtendedBackoffOptions[T] */ def withReplyWhileStopped(replyWhileStopped: Any): T + /** + * Returns a new BackoffOptions with a custom handler for messages that the supervisor receives while its child is stopped. + * By default, a message received while the child is stopped is forwarded to `deadLetters`. + * Essentially, this handler replaces `deadLetters` allowing to implement custom handling instead of a static reply. + * + * @param handler PartialFunction of the received message and sender + */ + def withHandlerWhileStopped(handler: ActorRef): T + /** * Returns the props to create the back-off supervisor. */ @@ -334,7 +342,7 @@ private final case class BackoffOnStopOptionsImpl[T]( randomFactor: Double, reset: Option[BackoffReset] = None, supervisorStrategy: OneForOneStrategy = OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider), - replyWhileStopped: Option[Any] = None, + handlingWhileStopped: HandlingWhileStopped = ForwardDeathLetters, finalStopMessage: Option[Any => Boolean] = None) extends BackoffOnStopOptions { @@ -344,7 +352,9 @@ private final case class BackoffOnStopOptionsImpl[T]( def withAutoReset(resetBackoff: FiniteDuration) = copy(reset = Some(AutoReset(resetBackoff))) def withManualReset = copy(reset = Some(ManualReset)) def withSupervisorStrategy(supervisorStrategy: OneForOneStrategy) = copy(supervisorStrategy = supervisorStrategy) - def withReplyWhileStopped(replyWhileStopped: Any) = copy(replyWhileStopped = Some(replyWhileStopped)) + def withReplyWhileStopped(replyWhileStopped: Any) = copy(handlingWhileStopped = ReplyWith(replyWhileStopped)) + def withHandlerWhileStopped(handlerWhileStopped: ActorRef) = + copy(handlingWhileStopped = ForwardTo(handlerWhileStopped)) def withMaxNrOfRetries(maxNrOfRetries: Int) = copy(supervisorStrategy = supervisorStrategy.withMaxNrOfRetries(maxNrOfRetries)) @@ -374,7 +384,7 @@ private final case class BackoffOnStopOptionsImpl[T]( backoffReset, randomFactor, supervisorStrategy, - replyWhileStopped, + handlingWhileStopped, finalStopMessage)) } } @@ -387,7 +397,7 @@ private final case class BackoffOnFailureOptionsImpl[T]( randomFactor: Double, reset: Option[BackoffReset] = None, supervisorStrategy: OneForOneStrategy = OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider), - replyWhileStopped: Option[Any] = None) + handlingWhileStopped: HandlingWhileStopped = ForwardDeathLetters) extends BackoffOnFailureOptions { private val backoffReset = reset.getOrElse(AutoReset(minBackoff)) @@ -396,7 +406,9 @@ private final case class BackoffOnFailureOptionsImpl[T]( def withAutoReset(resetBackoff: FiniteDuration) = copy(reset = Some(AutoReset(resetBackoff))) def withManualReset = copy(reset = Some(ManualReset)) def withSupervisorStrategy(supervisorStrategy: OneForOneStrategy) = copy(supervisorStrategy = supervisorStrategy) - def withReplyWhileStopped(replyWhileStopped: Any) = copy(replyWhileStopped = Some(replyWhileStopped)) + def withReplyWhileStopped(replyWhileStopped: Any) = copy(handlingWhileStopped = ReplyWith(replyWhileStopped)) + def withHandlerWhileStopped(handlerWhileStopped: ActorRef) = + copy(handlingWhileStopped = ForwardTo(handlerWhileStopped)) def withMaxNrOfRetries(maxNrOfRetries: Int) = copy(supervisorStrategy = supervisorStrategy.withMaxNrOfRetries(maxNrOfRetries)) @@ -419,10 +431,17 @@ private final case class BackoffOnFailureOptionsImpl[T]( backoffReset, randomFactor, supervisorStrategy, - replyWhileStopped)) + handlingWhileStopped)) } } +@InternalApi private[akka] sealed trait BackoffReset private[akka] case object ManualReset extends BackoffReset private[akka] final case class AutoReset(resetBackoff: FiniteDuration) extends BackoffReset + +@InternalApi +private[akka] sealed trait HandlingWhileStopped +private[akka] case object ForwardDeathLetters extends HandlingWhileStopped +private[akka] case class ForwardTo(handler: ActorRef) extends HandlingWhileStopped +private[akka] case class ReplyWith(msg: Any) extends HandlingWhileStopped diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala b/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala index e85fe90dbe..112406f47a 100644 --- a/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala +++ b/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala @@ -184,7 +184,7 @@ object BackoffSupervisor { AutoReset(minBackoff), randomFactor, strategy, - None, + ForwardDeathLetters, None)) } @@ -341,7 +341,7 @@ final class BackoffSupervisor @deprecated("Use `BackoffSupervisor.props` method reset, randomFactor, strategy, - replyWhileStopped, + replyWhileStopped.map(msg => ReplyWith(msg)).getOrElse(ForwardDeathLetters), finalStopMessage) { // for binary compatibility with 2.5.18 diff --git a/akka-actor/src/main/scala/akka/pattern/internal/BackoffOnRestartSupervisor.scala b/akka-actor/src/main/scala/akka/pattern/internal/BackoffOnRestartSupervisor.scala index 3f799dc361..e7e5bf8de3 100644 --- a/akka-actor/src/main/scala/akka/pattern/internal/BackoffOnRestartSupervisor.scala +++ b/akka-actor/src/main/scala/akka/pattern/internal/BackoffOnRestartSupervisor.scala @@ -4,12 +4,20 @@ package akka.pattern.internal -import scala.concurrent.duration._ - -import akka.actor.{ OneForOneStrategy, _ } import akka.actor.SupervisorStrategy._ +import akka.actor.{ OneForOneStrategy, _ } import akka.annotation.InternalApi -import akka.pattern.{ BackoffReset, BackoffSupervisor, HandleBackoff } +import akka.pattern.{ + BackoffReset, + BackoffSupervisor, + ForwardDeathLetters, + ForwardTo, + HandleBackoff, + HandlingWhileStopped, + ReplyWith +} + +import scala.concurrent.duration._ /** * INTERNAL API @@ -26,7 +34,7 @@ import akka.pattern.{ BackoffReset, BackoffSupervisor, HandleBackoff } val reset: BackoffReset, randomFactor: Double, strategy: OneForOneStrategy, - replyWhileStopped: Option[Any]) + handlingWhileStopped: HandlingWhileStopped) extends Actor with HandleBackoff with ActorLogging { @@ -34,7 +42,7 @@ import akka.pattern.{ BackoffReset, BackoffSupervisor, HandleBackoff } import BackoffSupervisor._ import context._ - override val supervisorStrategy = + override val supervisorStrategy: OneForOneStrategy = OneForOneStrategy(strategy.maxNrOfRetries, strategy.withinTimeRange, strategy.loggingEnabled) { case ex => val defaultDirective: Directive = @@ -94,9 +102,10 @@ import akka.pattern.{ BackoffReset, BackoffSupervisor, HandleBackoff } case Some(c) => c.forward(msg) case None => - replyWhileStopped match { - case None => context.system.deadLetters.forward(msg) - case Some(r) => sender() ! r + handlingWhileStopped match { + case ForwardDeathLetters => context.system.deadLetters.forward(msg) + case ForwardTo(h) => h.forward(msg) + case ReplyWith(r) => sender() ! r } } } diff --git a/akka-actor/src/main/scala/akka/pattern/internal/BackoffOnStopSupervisor.scala b/akka-actor/src/main/scala/akka/pattern/internal/BackoffOnStopSupervisor.scala index af94d4fa57..492f939bf5 100644 --- a/akka-actor/src/main/scala/akka/pattern/internal/BackoffOnStopSupervisor.scala +++ b/akka-actor/src/main/scala/akka/pattern/internal/BackoffOnStopSupervisor.scala @@ -4,12 +4,20 @@ package akka.pattern.internal -import scala.concurrent.duration.FiniteDuration - -import akka.actor.{ Actor, ActorLogging, OneForOneStrategy, Props, SupervisorStrategy, Terminated } import akka.actor.SupervisorStrategy.{ Directive, Escalate } +import akka.actor.{ Actor, ActorLogging, OneForOneStrategy, Props, SupervisorStrategy, Terminated } import akka.annotation.InternalApi -import akka.pattern.{ BackoffReset, BackoffSupervisor, HandleBackoff } +import akka.pattern.{ + BackoffReset, + BackoffSupervisor, + ForwardDeathLetters, + ForwardTo, + HandleBackoff, + HandlingWhileStopped, + ReplyWith +} + +import scala.concurrent.duration.FiniteDuration /** * INTERNAL API @@ -26,7 +34,7 @@ import akka.pattern.{ BackoffReset, BackoffSupervisor, HandleBackoff } val reset: BackoffReset, randomFactor: Double, strategy: SupervisorStrategy, - replyWhileStopped: Option[Any], + handlingWhileStopped: HandlingWhileStopped, finalStopMessage: Option[Any => Boolean]) extends Actor with HandleBackoff @@ -35,7 +43,7 @@ import akka.pattern.{ BackoffReset, BackoffSupervisor, HandleBackoff } import BackoffSupervisor._ import context.dispatcher - override val supervisorStrategy = strategy match { + override val supervisorStrategy: SupervisorStrategy = strategy match { case oneForOne: OneForOneStrategy => OneForOneStrategy(oneForOne.maxNrOfRetries, oneForOne.withinTimeRange, oneForOne.loggingEnabled) { case ex => @@ -84,13 +92,14 @@ import akka.pattern.{ BackoffReset, BackoffSupervisor, HandleBackoff } case None => } case None => - replyWhileStopped match { - case Some(r) => sender() ! r - case None => context.system.deadLetters.forward(msg) - } finalStopMessage match { case Some(fsm) if fsm(msg) => context.stop(self) - case _ => + case _ => + handlingWhileStopped match { + case ForwardDeathLetters => context.system.deadLetters.forward(msg) + case ForwardTo(h) => h.forward(msg) + case ReplyWith(r) => sender() ! r + } } } }