diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index 66c9be7480..9afedb1e25 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -354,6 +354,62 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( } } + "support nesting exceptions with outer restart and inner backoff strategies" in { + val probe = TestProbe[Event]("evt") + val behv = + supervise( + supervise(targetBehavior(probe.ref)) + .onFailure[IllegalArgumentException](SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0.0)) + ).onFailure[IOException](SupervisorStrategy.restart) + + val ref = spawn(behv) + + EventFilter[Exception](occurrences = 1).intercept { + ref ! Throw(new IOException()) + probe.expectMessage(GotSignal(PreRestart)) + } + // verify that it's still alive and not stopped, IllegalStateException would stop it + ref ! Ping(1) + probe.expectMessage(Pong(1)) + + EventFilter[IllegalArgumentException](occurrences = 1).intercept { + ref ! Throw(new IllegalArgumentException("cat")) + probe.expectMessage(GotSignal(PreRestart)) + } + + // verify that it's still alive and not stopped, IllegalStateException would stop it + ref ! Ping(2) + probe.expectMessage(Pong(2)) + } + + "support nesting exceptions with inner restart and outer backoff strategies" in { + val probe = TestProbe[Event]("evt") + val behv = + supervise( + supervise(targetBehavior(probe.ref)) + .onFailure[IllegalArgumentException](SupervisorStrategy.restart) + ).onFailure[IOException](SupervisorStrategy.restartWithBackoff(10.millis, 10.millis, 0.0)) + + val ref = spawn(behv) + + EventFilter[Exception](occurrences = 1).intercept { + ref ! Throw(new IOException()) + probe.expectMessage(GotSignal(PreRestart)) + } + // verify that it's still alive and not stopped, IllegalStateException would stop it + ref ! Ping(1) + probe.expectMessage(Pong(1)) + + EventFilter[IllegalArgumentException](occurrences = 1).intercept { + ref ! Throw(new IllegalArgumentException("cat")) + probe.expectMessage(GotSignal(PreRestart)) + } + + // verify that it's still alive and not stopped, IllegalStateException would stop it + ref ! Ping(2) + probe.expectMessage(Pong(2)) + } + "stop when not supervised" in { val probe = TestProbe[Event]("evt") val behv = targetBehavior(probe.ref) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala index 1cb5872964..fb14fab454 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala @@ -172,9 +172,6 @@ object SupervisorStrategy { stopChildren: Boolean = true, stashCapacity: Int = -1) extends BackoffSupervisorStrategy with RestartOrBackoff { - override def withLoggingEnabled(enabled: Boolean): BackoffSupervisorStrategy = - copy(loggingEnabled = enabled) - override def withResetBackoffAfter(timeout: FiniteDuration): BackoffSupervisorStrategy = copy(resetBackoffAfter = timeout) @@ -191,6 +188,9 @@ object SupervisorStrategy { override def withStashCapacity(capacity: Int): BackoffSupervisorStrategy = copy(stashCapacity = capacity) + + override def withLoggingEnabled(enabled: Boolean): BackoffSupervisorStrategy = + copy(loggingEnabled = enabled) } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala index a2d2dac4ef..9e2d70ba9c 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala @@ -147,8 +147,8 @@ private object RestartSupervisor { } } - case object ScheduledRestart - final case class ResetRestartCount(current: Int) extends DeadLetterSuppression + final case class ScheduledRestart(owner: RestartSupervisor[_, _, _ <: Throwable]) extends DeadLetterSuppression + final case class ResetRestartCount(current: Int, owner: RestartSupervisor[_, _, _ <: Throwable]) extends DeadLetterSuppression } private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: RestartOrBackoff) @@ -192,25 +192,35 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav override def aroundReceive(ctx: TypedActorContext[O], msg: O, target: ReceiveTarget[T]): Behavior[T] = { msg.asInstanceOf[Any] match { - case ScheduledRestart ⇒ - restartingInProgress match { - case OptionVal.Some((_, children)) ⇒ - if (strategy.stopChildren && children.nonEmpty) { - // still waiting for children to stop - gotScheduledRestart = true - Behaviors.same - } else - restartCompleted(ctx) + case ScheduledRestart(owner) ⇒ + if (owner eq this) { + restartingInProgress match { + case OptionVal.Some((_, children)) ⇒ + if (strategy.stopChildren && children.nonEmpty) { + // still waiting for children to stop + gotScheduledRestart = true + Behaviors.same + } else + restartCompleted(ctx) - case OptionVal.None ⇒ - throw new IllegalStateException("Unexpected ScheduledRestart when restart not in progress") + case OptionVal.None ⇒ + throw new IllegalStateException("Unexpected ScheduledRestart when restart not in progress") + } + } else { + // ScheduledRestart from nested Backoff strategy + target(ctx, msg.asInstanceOf[T]) } - case ResetRestartCount(current) ⇒ - if (current == restartCount) { - restartCount = 0 + case ResetRestartCount(current, owner) ⇒ + if (owner eq this) { + if (current == restartCount) { + restartCount = 0 + } + Behavior.same + } else { + // ResetRestartCount from nested Backoff strategy + target(ctx, msg.asInstanceOf[T]) } - Behavior.same case m: T @unchecked ⇒ restartingInProgress match { @@ -288,7 +298,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav case backoff: Backoff ⇒ val restartDelay = calculateDelay(currentRestartCount, backoff.minBackoff, backoff.maxBackoff, backoff.randomFactor) gotScheduledRestart = false - ctx.asScala.scheduleOnce(restartDelay, ctx.asScala.self.unsafeUpcast[Any], ScheduledRestart) + ctx.asScala.scheduleOnce(restartDelay, ctx.asScala.self.unsafeUpcast[Any], ScheduledRestart(this)) Behaviors.empty case _: Restart ⇒ if (childrenToStop.isEmpty) @@ -302,7 +312,8 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav strategy match { case backoff: Backoff ⇒ gotScheduledRestart = false - ctx.asScala.scheduleOnce(backoff.resetBackoffAfter, ctx.asScala.self.unsafeUpcast[Any], ResetRestartCount(restartCount)) + ctx.asScala.scheduleOnce(backoff.resetBackoffAfter, ctx.asScala.self.unsafeUpcast[Any], + ResetRestartCount(restartCount, this)) case _: Restart ⇒ } diff --git a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/SingletonCompileOnlySpec.scala b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/SingletonCompileOnlySpec.scala index f2d1d68d94..56e939e654 100644 --- a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/SingletonCompileOnlySpec.scala +++ b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/SingletonCompileOnlySpec.scala @@ -4,7 +4,7 @@ package docs.akka.cluster.typed -import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props, SupervisorStrategy } +import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, SupervisorStrategy } import akka.actor.typed.scaladsl.Behaviors import akka.cluster.typed.SingletonActor diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala index 89fd2546cf..5a02372d4e 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala @@ -16,7 +16,6 @@ import akka.actor.testkit.typed.scaladsl._ import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.actor.typed.SupervisorStrategy -import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.Behaviors import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId @@ -65,11 +64,10 @@ object EventSourcedBehaviorStashSpec { def counter(persistenceId: PersistenceId): Behavior[Command[_]] = Behaviors.supervise[Command[_]] { - Behaviors.setup(ctx ⇒ counter(ctx, persistenceId)) + Behaviors.setup(_ ⇒ eventSourcedCounter(persistenceId)) }.onFailure(SupervisorStrategy.restart.withLoggingEnabled(enabled = false)) - def counter( - ctx: ActorContext[Command[_]], + def eventSourcedCounter( persistenceId: PersistenceId): EventSourcedBehavior[Command[_], Event, State] = { EventSourcedBehavior.withEnforcedReplies[Command[_], Event, State]( persistenceId, @@ -128,7 +126,7 @@ object EventSourcedBehaviorStashSpec { private def inactive(state: State, command: Command[_]): ReplyEffect[Event, State] = { command match { - case cmd: Increment ⇒ + case _: Increment ⇒ Effect.stash() case cmd @ UpdateValue(_, value, _) ⇒ Effect.persist(ValueUpdated(value))