* When nesting two (or more) restartWithBackoff there can be a ClassCastException when the internal ResetRestartCount is passed in to the user Behavior. * Reason is that a new instance of RestartSupervisor is created from a first exception via the interceptor. The ResetRestartCount is still scheduled and will not have a matching owner. * Changed ResetRestartCount and ScheduledRestart to signals instead. Signal types are open so it should be fine to add a new signal (that can't be handled). * It's probably not often this will happen, but we have seen it.
This commit is contained in:
parent
67db6145cc
commit
4cb2b2e09f
2 changed files with 80 additions and 41 deletions
|
|
@ -970,6 +970,42 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
|
|||
probe.expectMessage(State(0, Map.empty))
|
||||
}
|
||||
|
||||
// issue #31461
|
||||
"handle reset backoff count for more than one nested restartWithBackoff" in {
|
||||
val probe = TestProbe[Event]("evt")
|
||||
val minBackoff = 20.millis
|
||||
val strategy1 =
|
||||
SupervisorStrategy.restartWithBackoff(minBackoff, 10.seconds, 0.0).withResetBackoffAfter(900.millis)
|
||||
|
||||
val strategy2 =
|
||||
SupervisorStrategy.restartWithBackoff(minBackoff, 10.seconds, 0.0).withResetBackoffAfter(1100.millis)
|
||||
|
||||
val behv = supervise(supervise(targetBehavior(probe.ref)).onFailure[Exc1](strategy1)).onFailure[Exc3](strategy2)
|
||||
val ref = spawn(behv)
|
||||
|
||||
ref ! IncrementState
|
||||
ref ! Throw(new Exc1)
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
ref ! GetState
|
||||
probe.expectMessage(State(0, Map.empty))
|
||||
|
||||
ref ! IncrementState
|
||||
ref ! Throw(new Exc3)
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
ref ! GetState
|
||||
probe.expectMessage(State(0, Map.empty))
|
||||
|
||||
// no matching owner for the scheduled ResetRestartCount so it will be passed through,
|
||||
// but ok since it's a signal
|
||||
probe.expectMessageType[ReceivedSignal].signal.getClass.getName should endWith("ResetRestartCount")
|
||||
probe.expectNoMessage()
|
||||
|
||||
// still alive
|
||||
ref ! IncrementState
|
||||
ref ! GetState
|
||||
probe.expectMessage(State(1, Map.empty))
|
||||
}
|
||||
|
||||
"create underlying deferred behavior immediately" in {
|
||||
val probe = TestProbe[Event]("evt")
|
||||
val behv = supervise(setup[Command] { _ =>
|
||||
|
|
|
|||
|
|
@ -185,9 +185,12 @@ private object RestartSupervisor {
|
|||
}
|
||||
}
|
||||
|
||||
final case class ScheduledRestart(owner: RestartSupervisor[_, _ <: Throwable]) extends DeadLetterSuppression
|
||||
final case class ScheduledRestart(owner: RestartSupervisor[_, _ <: Throwable])
|
||||
extends Signal
|
||||
with DeadLetterSuppression
|
||||
final case class ResetRestartCount(current: Int, owner: RestartSupervisor[_, _ <: Throwable])
|
||||
extends DeadLetterSuppression
|
||||
extends Signal
|
||||
with DeadLetterSuppression
|
||||
}
|
||||
|
||||
private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: RestartOrBackoff)
|
||||
|
|
@ -205,32 +208,7 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
|
|||
}
|
||||
|
||||
override def aroundSignal(ctx: TypedActorContext[Any], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
|
||||
restartingInProgress match {
|
||||
case OptionVal.Some((stashBuffer, children)) =>
|
||||
signal match {
|
||||
case Terminated(ref) if strategy.stopChildren && children(ref) =>
|
||||
val remainingChildren = children - ref
|
||||
if (remainingChildren.isEmpty && gotScheduledRestart) {
|
||||
restartCompleted(ctx)
|
||||
} else {
|
||||
restartingInProgress = OptionVal.Some((stashBuffer, remainingChildren))
|
||||
Behaviors.same
|
||||
}
|
||||
|
||||
case _ =>
|
||||
if (stashBuffer.isFull)
|
||||
dropped(ctx, signal)
|
||||
else
|
||||
stashBuffer.stash(signal)
|
||||
Behaviors.same
|
||||
}
|
||||
case _ =>
|
||||
super.aroundSignal(ctx, signal, target)
|
||||
}
|
||||
}
|
||||
|
||||
override def aroundReceive(ctx: TypedActorContext[Any], msg: Any, target: ReceiveTarget[T]): Behavior[T] = {
|
||||
msg match {
|
||||
signal match {
|
||||
case ScheduledRestart(owner) =>
|
||||
if (owner eq this) {
|
||||
restartingInProgress match {
|
||||
|
|
@ -247,7 +225,7 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
|
|||
}
|
||||
} else {
|
||||
// ScheduledRestart from nested Backoff strategy
|
||||
target(ctx, msg.asInstanceOf[T])
|
||||
super.aroundSignal(ctx, signal, target)
|
||||
}
|
||||
|
||||
case ResetRestartCount(current, owner) =>
|
||||
|
|
@ -258,26 +236,51 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
|
|||
BehaviorImpl.same
|
||||
} else {
|
||||
// ResetRestartCount from nested Backoff strategy
|
||||
target(ctx, msg.asInstanceOf[T])
|
||||
super.aroundSignal(ctx, signal, target)
|
||||
}
|
||||
|
||||
case msg =>
|
||||
val m = msg.asInstanceOf[T]
|
||||
case _ =>
|
||||
restartingInProgress match {
|
||||
case OptionVal.Some((stashBuffer, _)) =>
|
||||
if (stashBuffer.isFull)
|
||||
dropped(ctx, m)
|
||||
else
|
||||
stashBuffer.stash(m)
|
||||
Behaviors.same
|
||||
case OptionVal.Some((stashBuffer, children)) =>
|
||||
signal match {
|
||||
case Terminated(ref) if strategy.stopChildren && children(ref) =>
|
||||
val remainingChildren = children - ref
|
||||
if (remainingChildren.isEmpty && gotScheduledRestart) {
|
||||
restartCompleted(ctx)
|
||||
} else {
|
||||
restartingInProgress = OptionVal.Some((stashBuffer, remainingChildren))
|
||||
Behaviors.same
|
||||
}
|
||||
|
||||
case _ =>
|
||||
if (stashBuffer.isFull)
|
||||
dropped(ctx, signal)
|
||||
else
|
||||
stashBuffer.stash(signal)
|
||||
Behaviors.same
|
||||
}
|
||||
case _ =>
|
||||
try {
|
||||
target(ctx, m)
|
||||
} catch handleReceiveException(ctx, target)
|
||||
super.aroundSignal(ctx, signal, target)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def aroundReceive(ctx: TypedActorContext[Any], msg: Any, target: ReceiveTarget[T]): Behavior[T] = {
|
||||
val m = msg.asInstanceOf[T]
|
||||
restartingInProgress match {
|
||||
case OptionVal.Some((stashBuffer, _)) =>
|
||||
if (stashBuffer.isFull)
|
||||
dropped(ctx, m)
|
||||
else
|
||||
stashBuffer.stash(m)
|
||||
Behaviors.same
|
||||
case _ =>
|
||||
try {
|
||||
target(ctx, m)
|
||||
} catch handleReceiveException(ctx, target)
|
||||
}
|
||||
}
|
||||
|
||||
override protected def handleExceptionOnStart(
|
||||
ctx: TypedActorContext[Any],
|
||||
@unused target: PreStartTarget[T]): Catcher[Behavior[T]] = {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue