From 4cb2b2e09ff9e7e30d350141971852b360c6de52 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 8 Jul 2022 10:58:43 +0200 Subject: [PATCH] Avoid ClassCastException for nested restartWithBackoff, #31461 (#31462) * When nesting two (or more) restartWithBackoff there can be a ClassCastException when the internal ResetRestartCount is passed in to the user Behavior. * Reason is that a new instance of RestartSupervisor is created from a first exception via the interceptor. The ResetRestartCount is still scheduled and will not have a matching owner. * Changed ResetRestartCount and ScheduledRestart to signals instead. Signal types are open so it should be fine to add a new signal (that can't be handled). * It's probably not often this will happen, but we have seen it. --- .../akka/actor/typed/SupervisionSpec.scala | 36 ++++++++ .../actor/typed/internal/Supervision.scala | 85 ++++++++++--------- 2 files changed, 80 insertions(+), 41 deletions(-) diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index d9c322c3cd..6ded780d34 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -970,6 +970,42 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(""" probe.expectMessage(State(0, Map.empty)) } + // issue #31461 + "handle reset backoff count for more than one nested restartWithBackoff" in { + val probe = TestProbe[Event]("evt") + val minBackoff = 20.millis + val strategy1 = + SupervisorStrategy.restartWithBackoff(minBackoff, 10.seconds, 0.0).withResetBackoffAfter(900.millis) + + val strategy2 = + SupervisorStrategy.restartWithBackoff(minBackoff, 10.seconds, 0.0).withResetBackoffAfter(1100.millis) + + val behv = supervise(supervise(targetBehavior(probe.ref)).onFailure[Exc1](strategy1)).onFailure[Exc3](strategy2) + val ref = spawn(behv) + + ref ! IncrementState + ref ! Throw(new Exc1) + probe.expectMessage(ReceivedSignal(PreRestart)) + ref ! GetState + probe.expectMessage(State(0, Map.empty)) + + ref ! IncrementState + ref ! Throw(new Exc3) + probe.expectMessage(ReceivedSignal(PreRestart)) + ref ! GetState + probe.expectMessage(State(0, Map.empty)) + + // no matching owner for the scheduled ResetRestartCount so it will be passed through, + // but ok since it's a signal + probe.expectMessageType[ReceivedSignal].signal.getClass.getName should endWith("ResetRestartCount") + probe.expectNoMessage() + + // still alive + ref ! IncrementState + ref ! GetState + probe.expectMessage(State(1, Map.empty)) + } + "create underlying deferred behavior immediately" in { val probe = TestProbe[Event]("evt") val behv = supervise(setup[Command] { _ => diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala index a5037ceea7..46d3ac9355 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala @@ -185,9 +185,12 @@ private object RestartSupervisor { } } - final case class ScheduledRestart(owner: RestartSupervisor[_, _ <: Throwable]) extends DeadLetterSuppression + final case class ScheduledRestart(owner: RestartSupervisor[_, _ <: Throwable]) + extends Signal + with DeadLetterSuppression final case class ResetRestartCount(current: Int, owner: RestartSupervisor[_, _ <: Throwable]) - extends DeadLetterSuppression + extends Signal + with DeadLetterSuppression } private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: RestartOrBackoff) @@ -205,32 +208,7 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior } override def aroundSignal(ctx: TypedActorContext[Any], signal: Signal, target: SignalTarget[T]): Behavior[T] = { - restartingInProgress match { - case OptionVal.Some((stashBuffer, children)) => - signal match { - case Terminated(ref) if strategy.stopChildren && children(ref) => - val remainingChildren = children - ref - if (remainingChildren.isEmpty && gotScheduledRestart) { - restartCompleted(ctx) - } else { - restartingInProgress = OptionVal.Some((stashBuffer, remainingChildren)) - Behaviors.same - } - - case _ => - if (stashBuffer.isFull) - dropped(ctx, signal) - else - stashBuffer.stash(signal) - Behaviors.same - } - case _ => - super.aroundSignal(ctx, signal, target) - } - } - - override def aroundReceive(ctx: TypedActorContext[Any], msg: Any, target: ReceiveTarget[T]): Behavior[T] = { - msg match { + signal match { case ScheduledRestart(owner) => if (owner eq this) { restartingInProgress match { @@ -247,7 +225,7 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior } } else { // ScheduledRestart from nested Backoff strategy - target(ctx, msg.asInstanceOf[T]) + super.aroundSignal(ctx, signal, target) } case ResetRestartCount(current, owner) => @@ -258,26 +236,51 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior BehaviorImpl.same } else { // ResetRestartCount from nested Backoff strategy - target(ctx, msg.asInstanceOf[T]) + super.aroundSignal(ctx, signal, target) } - case msg => - val m = msg.asInstanceOf[T] + case _ => restartingInProgress match { - case OptionVal.Some((stashBuffer, _)) => - if (stashBuffer.isFull) - dropped(ctx, m) - else - stashBuffer.stash(m) - Behaviors.same + case OptionVal.Some((stashBuffer, children)) => + signal match { + case Terminated(ref) if strategy.stopChildren && children(ref) => + val remainingChildren = children - ref + if (remainingChildren.isEmpty && gotScheduledRestart) { + restartCompleted(ctx) + } else { + restartingInProgress = OptionVal.Some((stashBuffer, remainingChildren)) + Behaviors.same + } + + case _ => + if (stashBuffer.isFull) + dropped(ctx, signal) + else + stashBuffer.stash(signal) + Behaviors.same + } case _ => - try { - target(ctx, m) - } catch handleReceiveException(ctx, target) + super.aroundSignal(ctx, signal, target) } } } + override def aroundReceive(ctx: TypedActorContext[Any], msg: Any, target: ReceiveTarget[T]): Behavior[T] = { + val m = msg.asInstanceOf[T] + restartingInProgress match { + case OptionVal.Some((stashBuffer, _)) => + if (stashBuffer.isFull) + dropped(ctx, m) + else + stashBuffer.stash(m) + Behaviors.same + case _ => + try { + target(ctx, m) + } catch handleReceiveException(ctx, target) + } + } + override protected def handleExceptionOnStart( ctx: TypedActorContext[Any], @unused target: PreStartTarget[T]): Catcher[Behavior[T]] = {