support nested restart and backoff supervision strategies

* problem found by new EventSourcedBehaviorStashSpec
* added tests of this in SupervisionSpec
* the scheduled ScheduledRestart and ResetRestartCount need an
  owner field so that they can be passed on if not scheduled by
  the strategy instance itself
This commit is contained in:
Patrik Nordwall 2019-01-18 12:22:41 +01:00
parent 794a8cc6ff
commit 31b700fb7b
5 changed files with 93 additions and 28 deletions

View file

@ -354,6 +354,62 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
}
}
"support nesting exceptions with outer restart and inner backoff strategies" in {
val probe = TestProbe[Event]("evt")
val behv =
supervise(
supervise(targetBehavior(probe.ref))
.onFailure[IllegalArgumentException](SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0.0))
).onFailure[IOException](SupervisorStrategy.restart)
val ref = spawn(behv)
EventFilter[Exception](occurrences = 1).intercept {
ref ! Throw(new IOException())
probe.expectMessage(GotSignal(PreRestart))
}
// verify that it's still alive and not stopped, IllegalStateException would stop it
ref ! Ping(1)
probe.expectMessage(Pong(1))
EventFilter[IllegalArgumentException](occurrences = 1).intercept {
ref ! Throw(new IllegalArgumentException("cat"))
probe.expectMessage(GotSignal(PreRestart))
}
// verify that it's still alive and not stopped, IllegalStateException would stop it
ref ! Ping(2)
probe.expectMessage(Pong(2))
}
"support nesting exceptions with inner restart and outer backoff strategies" in {
val probe = TestProbe[Event]("evt")
val behv =
supervise(
supervise(targetBehavior(probe.ref))
.onFailure[IllegalArgumentException](SupervisorStrategy.restart)
).onFailure[IOException](SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0.0))
val ref = spawn(behv)
EventFilter[Exception](occurrences = 1).intercept {
ref ! Throw(new IOException())
probe.expectMessage(GotSignal(PreRestart))
}
// verify that it's still alive and not stopped, IllegalStateException would stop it
ref ! Ping(1)
probe.expectMessage(Pong(1))
EventFilter[IllegalArgumentException](occurrences = 1).intercept {
ref ! Throw(new IllegalArgumentException("cat"))
probe.expectMessage(GotSignal(PreRestart))
}
// verify that it's still alive and not stopped, IllegalStateException would stop it
ref ! Ping(2)
probe.expectMessage(Pong(2))
}
"stop when not supervised" in {
val probe = TestProbe[Event]("evt")
val behv = targetBehavior(probe.ref)

View file

@ -172,9 +172,6 @@ object SupervisorStrategy {
stopChildren: Boolean = true,
stashCapacity: Int = -1) extends BackoffSupervisorStrategy with RestartOrBackoff {
override def withLoggingEnabled(enabled: Boolean): BackoffSupervisorStrategy =
copy(loggingEnabled = enabled)
override def withResetBackoffAfter(timeout: FiniteDuration): BackoffSupervisorStrategy =
copy(resetBackoffAfter = timeout)
@ -191,6 +188,9 @@ object SupervisorStrategy {
override def withStashCapacity(capacity: Int): BackoffSupervisorStrategy =
copy(stashCapacity = capacity)
override def withLoggingEnabled(enabled: Boolean): BackoffSupervisorStrategy =
copy(loggingEnabled = enabled)
}
}

View file

@ -147,8 +147,8 @@ private object RestartSupervisor {
}
}
case object ScheduledRestart
final case class ResetRestartCount(current: Int) extends DeadLetterSuppression
final case class ScheduledRestart(owner: RestartSupervisor[_, _, _ <: Throwable]) extends DeadLetterSuppression
final case class ResetRestartCount(current: Int, owner: RestartSupervisor[_, _, _ <: Throwable]) extends DeadLetterSuppression
}
private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: RestartOrBackoff)
@ -192,25 +192,35 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
override def aroundReceive(ctx: TypedActorContext[O], msg: O, target: ReceiveTarget[T]): Behavior[T] = {
msg.asInstanceOf[Any] match {
case ScheduledRestart
restartingInProgress match {
case OptionVal.Some((_, children))
if (strategy.stopChildren && children.nonEmpty) {
// still waiting for children to stop
gotScheduledRestart = true
Behaviors.same
} else
restartCompleted(ctx)
case ScheduledRestart(owner)
if (owner eq this) {
restartingInProgress match {
case OptionVal.Some((_, children))
if (strategy.stopChildren && children.nonEmpty) {
// still waiting for children to stop
gotScheduledRestart = true
Behaviors.same
} else
restartCompleted(ctx)
case OptionVal.None
throw new IllegalStateException("Unexpected ScheduledRestart when restart not in progress")
case OptionVal.None
throw new IllegalStateException("Unexpected ScheduledRestart when restart not in progress")
}
} else {
// ScheduledRestart from nested Backoff strategy
target(ctx, msg.asInstanceOf[T])
}
case ResetRestartCount(current)
if (current == restartCount) {
restartCount = 0
case ResetRestartCount(current, owner)
if (owner eq this) {
if (current == restartCount) {
restartCount = 0
}
Behavior.same
} else {
// ResetRestartCount from nested Backoff strategy
target(ctx, msg.asInstanceOf[T])
}
Behavior.same
case m: T @unchecked
restartingInProgress match {
@ -288,7 +298,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
case backoff: Backoff
val restartDelay = calculateDelay(currentRestartCount, backoff.minBackoff, backoff.maxBackoff, backoff.randomFactor)
gotScheduledRestart = false
ctx.asScala.scheduleOnce(restartDelay, ctx.asScala.self.unsafeUpcast[Any], ScheduledRestart)
ctx.asScala.scheduleOnce(restartDelay, ctx.asScala.self.unsafeUpcast[Any], ScheduledRestart(this))
Behaviors.empty
case _: Restart
if (childrenToStop.isEmpty)
@ -302,7 +312,8 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
strategy match {
case backoff: Backoff
gotScheduledRestart = false
ctx.asScala.scheduleOnce(backoff.resetBackoffAfter, ctx.asScala.self.unsafeUpcast[Any], ResetRestartCount(restartCount))
ctx.asScala.scheduleOnce(backoff.resetBackoffAfter, ctx.asScala.self.unsafeUpcast[Any],
ResetRestartCount(restartCount, this))
case _: Restart
}

View file

@ -4,7 +4,7 @@
package docs.akka.cluster.typed
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props, SupervisorStrategy }
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, SupervisorStrategy }
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.typed.SingletonActor

View file

@ -16,7 +16,6 @@ import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
@ -65,11 +64,10 @@ object EventSourcedBehaviorStashSpec {
def counter(persistenceId: PersistenceId): Behavior[Command[_]] =
Behaviors.supervise[Command[_]] {
Behaviors.setup(ctx counter(ctx, persistenceId))
Behaviors.setup(_ eventSourcedCounter(persistenceId))
}.onFailure(SupervisorStrategy.restart.withLoggingEnabled(enabled = false))
def counter(
ctx: ActorContext[Command[_]],
def eventSourcedCounter(
persistenceId: PersistenceId): EventSourcedBehavior[Command[_], Event, State] = {
EventSourcedBehavior.withEnforcedReplies[Command[_], Event, State](
persistenceId,
@ -128,7 +126,7 @@ object EventSourcedBehaviorStashSpec {
private def inactive(state: State, command: Command[_]): ReplyEffect[Event, State] = {
command match {
case cmd: Increment
case _: Increment
Effect.stash()
case cmd @ UpdateValue(_, value, _)
Effect.persist(ValueUpdated(value))