fix: keep stashBuffer when exception throw again #669 (#670)

This commit is contained in:
AndyChen 2023-10-11 15:57:38 +08:00 committed by GitHub
parent 1b1f57224b
commit dacfd3f4a1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 109 additions and 6 deletions

View file

@ -66,14 +66,15 @@ object SupervisionSpec {
def targetBehavior(
monitor: ActorRef[Event],
state: State = State(0, Map.empty),
slowStop: Option[CountDownLatch] = None): Behavior[Command] =
slowStop: Option[CountDownLatch] = None,
slowRestart: Option[CountDownLatch] = None): Behavior[Command] =
receive[Command] { (context, cmd) =>
cmd match {
case Ping(n) =>
monitor ! Pong(n)
Behaviors.same
case IncrementState =>
targetBehavior(monitor, state.copy(n = state.n + 1), slowStop)
targetBehavior(monitor, state.copy(n = state.n + 1), slowStop, slowRestart)
case GetState =>
val reply = state.copy(children = context.children.map(c => c.path.name -> c.unsafeUpcast[Command]).toMap)
monitor ! reply
@ -94,6 +95,8 @@ object SupervisionSpec {
case (_, sig) =>
if (sig == PostStop)
slowStop.foreach(latch => latch.await(10, TimeUnit.SECONDS))
else if (sig == PreRestart)
slowRestart.foreach(latch => latch.await(10, TimeUnit.SECONDS))
monitor ! ReceivedSignal(sig)
Behaviors.same
}
@ -294,6 +297,49 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
}
}
class FailingConstructorTestSetupWithChild(failCount: Int, slowStopCount: Int) {
val failCounter = new AtomicInteger(0)
val probe = TestProbe[Event]("evt")
val slowStop = new CountDownLatch(slowStopCount)
class FailingConstructor(context: ActorContext[Command], monitor: ActorRef[Event])
extends AbstractBehavior[Command](context) {
monitor ! Started
context.spawn(targetBehavior(probe.ref, slowStop = Some(slowStop)), nextName())
if (failCounter.getAndIncrement() < failCount) {
throw TestException("simulated exc from constructor")
}
override def onMessage(message: Command): Behavior[Command] = {
message match {
case Ping(i) =>
monitor ! Pong(i)
Behaviors.same
// ignore others.
case _ => Behaviors.same
}
}
}
def testMessageRetentionWhenStartException(strategy: SupervisorStrategy): Unit = {
val behv = supervise(setup[Command](ctx => new FailingConstructor(ctx, probe.ref)))
.onFailure[Exception](strategy)
val ref = spawn(behv)
probe.expectMessage(Started)
ref ! Ping(1)
ref ! Ping(2)
// unlock restart
slowStop.countDown()
probe.expectMessage(ReceivedSignal(PostStop))
probe.expectMessage(Started)
probe.expectMessage(ReceivedSignal(PostStop))
probe.expectMessage(Started)
// expect no message lost
probe.expectMessage(Pong(1))
probe.expectMessage(Pong(2))
}
}
class FailingDeferredTestSetup(failCount: Int, strategy: SupervisorStrategy) {
val probe = TestProbe[AnyRef]("evt")
val failCounter = new AtomicInteger(0)
@ -1133,6 +1179,55 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
}
}
"ensure unhandled message retention during unstash exception when restart" in {
testMessageRetentionWhenMultipleException(SupervisorStrategy.restart.withStashCapacity(4))
}
"ensure unhandled message retention during unstash exception when backoff" in {
testMessageRetentionWhenMultipleException(SupervisorStrategy.restartWithBackoff(10.millis, 10.millis,
0).withStashCapacity(4))
}
def testMessageRetentionWhenMultipleException(strategy: SupervisorStrategy): Unit = {
val probe = TestProbe[Event]("evt")
val slowRestart = new CountDownLatch(1)
val behv =
Behaviors.supervise(targetBehavior(probe.ref, slowRestart = Some(slowRestart))).onFailure[Exc1](strategy)
val ref = spawn(behv)
// restart strategy require a latch in order to afford the opportunity to stash messages
val childProbe = TestProbe[Event]("childEvt")
val childSlowStop = new CountDownLatch(1)
val childName = nextName()
ref ! CreateChild(targetBehavior(childProbe.ref, slowStop = Some(childSlowStop)), childName)
ref ! GetState
probe.expectMessageType[State].children.keySet should ===(Set(childName))
ref ! Throw(new Exc1)
ref ! Throw(new Exc1)
ref ! Ping(1)
ref ! Ping(2)
// waiting for actor to restart, Pings will stashed
probe.expectNoMessage()
slowRestart.countDown()
probe.expectMessage(ReceivedSignal(PreRestart))
childSlowStop.countDown()
probe.expectMessage(ReceivedSignal(PreRestart))
probe.expectMessage(Pong(1))
probe.expectMessage(Pong(2))
}
"ensure stash message retention on start exception when restart" in new FailingConstructorTestSetupWithChild(
failCount = 2, slowStopCount = 1) {
testMessageRetentionWhenStartException(SupervisorStrategy.restart.withStashCapacity(4).withLimit(4, 10.seconds))
}
"ensure stash message retention on start exception when backoff" in new FailingConstructorTestSetupWithChild(
failCount = 2, slowStopCount = 1) {
testMessageRetentionWhenStartException(SupervisorStrategy.restartWithBackoff(10.millis, 10.millis,
0).withStashCapacity(4))
}
"work with nested supervisions and defers" in {
val strategy = SupervisorStrategy.restart.withLimit(3, 1.second)
val probe = TestProbe[AnyRef]("p")

View file

@ -365,9 +365,15 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
val stashCapacity =
if (strategy.stashCapacity >= 0) strategy.stashCapacity
else ctx.asScala.system.settings.RestartStashCapacity
restartingInProgress = OptionVal.Some(
(StashBuffer[Any](ctx.asScala.asInstanceOf[scaladsl.ActorContext[Any]], stashCapacity), childrenToStop))
// new generate only if first time or there has been reset to None
restartingInProgress = restartingInProgress match {
case OptionVal.Some((stashBuffer, _)) =>
// keep stashBuffer when exception throw again
OptionVal.Some((stashBuffer, childrenToStop))
case _ =>
OptionVal.Some(
(StashBuffer[Any](ctx.asScala.asInstanceOf[scaladsl.ActorContext[Any]], stashCapacity), childrenToStop))
}
strategy match {
case backoff: Backoff =>
val restartDelay =
@ -398,8 +404,10 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
val newBehavior = Behavior.validateAsInitial(Behavior.start(initial, ctx.asInstanceOf[TypedActorContext[T]]))
val nextBehavior = restartingInProgress match {
case OptionVal.Some((stashBuffer, _)) =>
val behavior = stashBuffer.unstashAll(newBehavior.unsafeCast)
// reset stash state if not more exception throw
restartingInProgress = OptionVal.None
stashBuffer.unstashAll(newBehavior.unsafeCast)
behavior
case _ => newBehavior
}
nextBehavior.narrow