From dacfd3f4a168ed983fe5e76fd2fb428ac22d000e Mon Sep 17 00:00:00 2001 From: AndyChen Date: Wed, 11 Oct 2023 15:57:38 +0800 Subject: [PATCH] fix: keep stashBuffer when exception throw again #669 (#670) --- .../pekko/actor/typed/SupervisionSpec.scala | 99 ++++++++++++++++++- .../actor/typed/internal/Supervision.scala | 16 ++- 2 files changed, 109 insertions(+), 6 deletions(-) diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/SupervisionSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/SupervisionSpec.scala index c5e8d5b8f1..f6288791f7 100644 --- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/SupervisionSpec.scala +++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/SupervisionSpec.scala @@ -66,14 +66,15 @@ object SupervisionSpec { def targetBehavior( monitor: ActorRef[Event], state: State = State(0, Map.empty), - slowStop: Option[CountDownLatch] = None): Behavior[Command] = + slowStop: Option[CountDownLatch] = None, + slowRestart: Option[CountDownLatch] = None): Behavior[Command] = receive[Command] { (context, cmd) => cmd match { case Ping(n) => monitor ! Pong(n) Behaviors.same case IncrementState => - targetBehavior(monitor, state.copy(n = state.n + 1), slowStop) + targetBehavior(monitor, state.copy(n = state.n + 1), slowStop, slowRestart) case GetState => val reply = state.copy(children = context.children.map(c => c.path.name -> c.unsafeUpcast[Command]).toMap) monitor ! reply @@ -94,6 +95,8 @@ object SupervisionSpec { case (_, sig) => if (sig == PostStop) slowStop.foreach(latch => latch.await(10, TimeUnit.SECONDS)) + else if (sig == PreRestart) + slowRestart.foreach(latch => latch.await(10, TimeUnit.SECONDS)) monitor ! ReceivedSignal(sig) Behaviors.same } @@ -294,6 +297,49 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(""" } } + class FailingConstructorTestSetupWithChild(failCount: Int, slowStopCount: Int) { + val failCounter = new AtomicInteger(0) + val probe = TestProbe[Event]("evt") + val slowStop = new CountDownLatch(slowStopCount) + + class FailingConstructor(context: ActorContext[Command], monitor: ActorRef[Event]) + extends AbstractBehavior[Command](context) { + monitor ! Started + context.spawn(targetBehavior(probe.ref, slowStop = Some(slowStop)), nextName()) + if (failCounter.getAndIncrement() < failCount) { + throw TestException("simulated exc from constructor") + } + + override def onMessage(message: Command): Behavior[Command] = { + message match { + case Ping(i) => + monitor ! Pong(i) + Behaviors.same + // ignore others. + case _ => Behaviors.same + } + } + } + + def testMessageRetentionWhenStartException(strategy: SupervisorStrategy): Unit = { + val behv = supervise(setup[Command](ctx => new FailingConstructor(ctx, probe.ref))) + .onFailure[Exception](strategy) + val ref = spawn(behv) + probe.expectMessage(Started) + ref ! Ping(1) + ref ! Ping(2) + // unlock restart + slowStop.countDown() + probe.expectMessage(ReceivedSignal(PostStop)) + probe.expectMessage(Started) + probe.expectMessage(ReceivedSignal(PostStop)) + probe.expectMessage(Started) + // expect no message lost + probe.expectMessage(Pong(1)) + probe.expectMessage(Pong(2)) + } + } + class FailingDeferredTestSetup(failCount: Int, strategy: SupervisorStrategy) { val probe = TestProbe[AnyRef]("evt") val failCounter = new AtomicInteger(0) @@ -1133,6 +1179,55 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(""" } } + "ensure unhandled message retention during unstash exception when restart" in { + testMessageRetentionWhenMultipleException(SupervisorStrategy.restart.withStashCapacity(4)) + } + + "ensure unhandled message retention during unstash exception when backoff" in { + testMessageRetentionWhenMultipleException(SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, + 0).withStashCapacity(4)) + } + + def testMessageRetentionWhenMultipleException(strategy: SupervisorStrategy): Unit = { + val probe = TestProbe[Event]("evt") + val slowRestart = new CountDownLatch(1) + val behv = + Behaviors.supervise(targetBehavior(probe.ref, slowRestart = Some(slowRestart))).onFailure[Exc1](strategy) + val ref = spawn(behv) + + // restart strategy require a latch in order to afford the opportunity to stash messages + val childProbe = TestProbe[Event]("childEvt") + val childSlowStop = new CountDownLatch(1) + val childName = nextName() + ref ! CreateChild(targetBehavior(childProbe.ref, slowStop = Some(childSlowStop)), childName) + ref ! GetState + probe.expectMessageType[State].children.keySet should ===(Set(childName)) + + ref ! Throw(new Exc1) + ref ! Throw(new Exc1) + ref ! Ping(1) + ref ! Ping(2) + // waiting for actor to restart, Pings will stashed + probe.expectNoMessage() + slowRestart.countDown() + probe.expectMessage(ReceivedSignal(PreRestart)) + childSlowStop.countDown() + probe.expectMessage(ReceivedSignal(PreRestart)) + probe.expectMessage(Pong(1)) + probe.expectMessage(Pong(2)) + } + + "ensure stash message retention on start exception when restart" in new FailingConstructorTestSetupWithChild( + failCount = 2, slowStopCount = 1) { + testMessageRetentionWhenStartException(SupervisorStrategy.restart.withStashCapacity(4).withLimit(4, 10.seconds)) + } + + "ensure stash message retention on start exception when backoff" in new FailingConstructorTestSetupWithChild( + failCount = 2, slowStopCount = 1) { + testMessageRetentionWhenStartException(SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, + 0).withStashCapacity(4)) + } + "work with nested supervisions and defers" in { val strategy = SupervisorStrategy.restart.withLimit(3, 1.second) val probe = TestProbe[AnyRef]("p") diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala index 44ea630956..bbc4366cb2 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala @@ -365,9 +365,15 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior val stashCapacity = if (strategy.stashCapacity >= 0) strategy.stashCapacity else ctx.asScala.system.settings.RestartStashCapacity - restartingInProgress = OptionVal.Some( - (StashBuffer[Any](ctx.asScala.asInstanceOf[scaladsl.ActorContext[Any]], stashCapacity), childrenToStop)) - + // new generate only if first time or there has been reset to None + restartingInProgress = restartingInProgress match { + case OptionVal.Some((stashBuffer, _)) => + // keep stashBuffer when exception throw again + OptionVal.Some((stashBuffer, childrenToStop)) + case _ => + OptionVal.Some( + (StashBuffer[Any](ctx.asScala.asInstanceOf[scaladsl.ActorContext[Any]], stashCapacity), childrenToStop)) + } strategy match { case backoff: Backoff => val restartDelay = @@ -398,8 +404,10 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior val newBehavior = Behavior.validateAsInitial(Behavior.start(initial, ctx.asInstanceOf[TypedActorContext[T]])) val nextBehavior = restartingInProgress match { case OptionVal.Some((stashBuffer, _)) => + val behavior = stashBuffer.unstashAll(newBehavior.unsafeCast) + // reset stash state if not more exception throw restartingInProgress = OptionVal.None - stashBuffer.unstashAll(newBehavior.unsafeCast) + behavior case _ => newBehavior } nextBehavior.narrow