diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java new file mode 100644 index 0000000000..191f8f7a54 --- /dev/null +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java @@ -0,0 +1,37 @@ +package jdocs.akka.typed.supervision; + +import akka.actor.typed.Behavior; +import akka.actor.typed.SupervisorStrategy; +import akka.actor.typed.javadsl.Behaviors; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; + +public class SupervisionCompileOnlyTest { + public static Behavior behavior = Behaviors.empty(); + + public void supervision() { + //#restart + Behaviors.supervise(behavior) + .onFailure(IllegalStateException.class, SupervisorStrategy.restart()); + //#restart + + //#resume + Behaviors.supervise(behavior) + .onFailure(IllegalStateException.class, SupervisorStrategy.resume()); + //#resume + + //#restart-limit + Behaviors.supervise(behavior) + .onFailure(IllegalStateException.class, SupervisorStrategy.restartWithLimit( + 10, FiniteDuration.apply(10, TimeUnit.SECONDS) + )); + //#restart-limit + + //#multiple + Behaviors.supervise(Behaviors.supervise(behavior) + .onFailure(IllegalStateException.class, SupervisorStrategy.restart())) + .onFailure(IllegalArgumentException.class, SupervisorStrategy.stop()); + //#multiple + } +} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index 3706ef08e6..232ca9f4b0 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -3,6 +3,8 @@ */ package akka.actor.typed +import java.io.IOException + import akka.actor.typed.scaladsl.Behaviors import scala.concurrent.duration._ @@ -234,7 +236,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers { } } -class SupervisionSpec extends TestKit("SupervisionSpec") with TypedAkkaSpecWithShutdown { +class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown { import SupervisionSpec._ private val nameCounter = Iterator.from(0) @@ -252,12 +254,37 @@ class SupervisionSpec extends TestKit("SupervisionSpec") with TypedAkkaSpecWithS probe.expectMsg(Pong) } + "stop when strategy is stop" in { + val probe = TestProbe[Event]("evt") + val behv = Behaviors.supervise(targetBehavior(probe.ref)) + .onFailure[Throwable](SupervisorStrategy.stop) + val ref = spawn(behv) + ref ! Throw(new Exc3) + probe.expectMsg(GotSignal(PostStop)) + } + + "support nesting exceptions with different strategies" in { + val probe = TestProbe[Event]("evt") + val behv = + supervise( + supervise(targetBehavior(probe.ref)) + .onFailure[RuntimeException](SupervisorStrategy.stop) + ).onFailure[Exception](SupervisorStrategy.restart) + + val ref = spawn(behv) + + ref ! Throw(new IOException()) + probe.expectMsg(GotSignal(PreRestart)) + + ref ! Throw(new IllegalArgumentException("cat")) + probe.expectMsg(GotSignal(PostStop)) + } + "stop when not supervised" in { val probe = TestProbe[Event]("evt") val behv = targetBehavior(probe.ref) val ref = spawn(behv) ref ! Throw(new Exc3) - probe.expectMsg(GotSignal(PostStop)) } diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnlyTest.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnlyTest.scala new file mode 100644 index 0000000000..7fab56e3a2 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnlyTest.scala @@ -0,0 +1,33 @@ +package docs.akka.typed.supervision + +import akka.actor.typed.SupervisorStrategy +import akka.actor.typed.scaladsl.Behaviors +import scala.concurrent.duration._ + +object SupervisionCompileOnlyTest { + + val behavior = Behaviors.empty[String] + + //#restart + Behaviors.supervise(behavior) + .onFailure[IllegalStateException](SupervisorStrategy.restart) + //#restart + + //#resume + Behaviors.supervise(behavior) + .onFailure[IllegalStateException](SupervisorStrategy.resume) + //#resume + + //#restart-limit + Behaviors.supervise(behavior) + .onFailure[IllegalStateException](SupervisorStrategy.restartWithLimit( + maxNrOfRetries = 10, withinTimeRange = 10.seconds + )) + //#restart-limit + + //#multiple + Behaviors.supervise(Behaviors.supervise(behavior) + .onFailure[IllegalStateException](SupervisorStrategy.restart)) + .onFailure[IllegalArgumentException](SupervisorStrategy.stop) + //#multiple +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala index 90c67bc52b..437a8f0b97 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala @@ -20,6 +20,11 @@ object SupervisorStrategy { */ val restart: SupervisorStrategy = Restart(-1, Duration.Zero, loggingEnabled = true) + /** + * Stop the actor + */ + val stop: SupervisorStrategy = Stop(loggingEnabled = true) + /** * Restart with a limit of number of restart retries. * The number of restarts are limited to a number of restart attempts (`maxNrOfRetries`) @@ -71,6 +76,14 @@ object SupervisorStrategy { copy(loggingEnabled = enabled) } + /** + * INTERNAL API + */ + @InternalApi private[akka] case class Stop(loggingEnabled: Boolean) extends SupervisorStrategy { + override def withLoggingEnabled(on: Boolean) = + copy(loggingEnabled = on) + } + /** * INTERNAL API */ diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala index 627de1b4f9..9102dd1c8c 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala @@ -12,25 +12,18 @@ import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag import scala.util.control.Exception.Catcher import scala.util.control.NonFatal - import akka.actor.DeadLetterSuppression import akka.annotation.InternalApi import akka.event.Logging -import akka.actor.typed.ActorContext -import akka.actor.typed.Behavior import akka.actor.typed.Behavior.DeferredBehavior -import akka.actor.typed.ExtensibleBehavior -import akka.actor.typed.PreRestart -import akka.actor.typed.Signal import akka.actor.typed.SupervisorStrategy._ -import akka.actor.typed.scaladsl.Behaviors._ import akka.util.OptionVal import akka.actor.typed.scaladsl.Behaviors /** * INTERNAL API */ -@InternalApi private[akka] object Restarter { +@InternalApi private[akka] object Supervisor { def apply[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], strategy: SupervisorStrategy): Behavior[T] = Behaviors.deferred[T] { ctx ⇒ val c = ctx.asInstanceOf[akka.actor.typed.ActorContext[T]] @@ -41,6 +34,7 @@ import akka.actor.typed.scaladsl.Behaviors case r: Restart ⇒ new LimitedRestarter(initialBehavior, startedBehavior, r, retries = 0, deadline = OptionVal.None) case Resume(loggingEnabled) ⇒ new Resumer(startedBehavior, loggingEnabled) + case Stop(loggingEnabled) ⇒ new Stopper(startedBehavior, loggingEnabled) case b: Backoff ⇒ val backoffRestarter = new BackoffRestarter( @@ -72,7 +66,7 @@ import akka.actor.typed.scaladsl.Behaviors */ protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Supervisor[T, Thr] - protected def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Supervisor[T, Thr]] + protected def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Behavior[T]] protected def restart(ctx: ActorContext[T], initialBehavior: Behavior[T], startedBehavior: Behavior[T]): Supervisor[T, Thr] = { try Behavior.interpretSignal(startedBehavior, ctx, PreRestart) catch { @@ -80,7 +74,7 @@ import akka.actor.typed.scaladsl.Behaviors "failure during PreRestart")) } // no need to canonicalize, it's done in the calling methods - wrap(Restarter.initialUndefer(ctx, initialBehavior), afterException = true) + wrap(Supervisor.initialUndefer(ctx, initialBehavior), afterException = true) } protected final def supervise(nextBehavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] = @@ -126,6 +120,22 @@ import akka.actor.typed.scaladsl.Behaviors } +/** + * INTERNAL API + */ +@InternalApi private[akka] final class Stopper[T, Thr <: Throwable: ClassTag]( + override val behavior: Behavior[T], override val loggingEnabled: Boolean) extends Supervisor[T, Thr] { + + override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Behavior[T]] = { + case NonFatal(ex: Thr) ⇒ + log(ctx, ex) + Behaviors.stopped + } + + override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Supervisor[T, Thr] = + new Stopper[T, Thr](nextBehavior, loggingEnabled) +} + /** * INTERNAL API */ @@ -229,7 +239,7 @@ import akka.actor.typed.scaladsl.Behaviors msg match { case ScheduledRestart ⇒ // actual restart after scheduled backoff delay - val restartedBehavior = Restarter.initialUndefer(ctx, initialBehavior) + val restartedBehavior = Supervisor.initialUndefer(ctx, initialBehavior) ctx.asScala.schedule(strategy.resetBackoffAfter, ctx.asScala.self, ResetRestartCount(restartCount)) new BackoffRestarter[T, Thr](initialBehavior, restartedBehavior, strategy, restartCount, blackhole = false) case ResetRestartCount(current) ⇒ diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala index 3b75aedfc5..15fba3a969 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala @@ -6,7 +6,6 @@ package akka.actor.typed.javadsl import java.util.function.{ Function ⇒ JFunction } import scala.reflect.ClassTag - import akka.util.OptionVal import akka.japi.function.{ Function2 ⇒ JapiFunction2 } import akka.japi.function.{ Procedure, Procedure2 } @@ -17,9 +16,7 @@ import akka.actor.typed.Signal import akka.actor.typed.ActorRef import akka.actor.typed.SupervisorStrategy import akka.actor.typed.scaladsl.{ ActorContext ⇒ SAC } -import akka.actor.typed.internal.BehaviorImpl -import akka.actor.typed.internal.Restarter -import akka.actor.typed.internal.TimerSchedulerImpl +import akka.actor.typed.internal.{ BehaviorImpl, Supervisor, TimerSchedulerImpl } import akka.annotation.ApiMayChange /** @@ -261,12 +258,12 @@ object Behaviors { final class Supervise[T] private[akka] (wrapped: Behavior[T]) { /** - * Specify the [[SupervisorStrategy]] to be invoked when the wrapped behaior throws. + * Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws. * * Only exceptions of the given type (and their subclasses) will be handled by this supervision behavior. */ def onFailure[Thr <: Throwable](clazz: Class[Thr], strategy: SupervisorStrategy): Behavior[T] = - akka.actor.typed.internal.Restarter(Behavior.validateAsInitial(wrapped), strategy)(ClassTag(clazz)) + Supervisor(Behavior.validateAsInitial(wrapped), strategy)(ClassTag(clazz)) /** * Specify the [[SupervisorStrategy]] to be invoked when the wrapped behaior throws. diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala index 8075cabf23..bcec5a4e6d 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala @@ -254,7 +254,7 @@ object Behaviors { def onFailure[Thr <: Throwable: ClassTag](strategy: SupervisorStrategy): Behavior[T] = { val tag = implicitly[ClassTag[Thr]] val effectiveTag = if (tag == NothingClassTag) ThrowableClassTag else tag - akka.actor.typed.internal.Restarter(Behavior.validateAsInitial(wrapped), strategy)(effectiveTag) + Supervisor(Behavior.validateAsInitial(wrapped), strategy)(effectiveTag) } } diff --git a/akka-docs/src/main/paradox/fault-tolerance-typed.md b/akka-docs/src/main/paradox/fault-tolerance-typed.md index c01a36508f..ac5c4a94a7 100644 --- a/akka-docs/src/main/paradox/fault-tolerance-typed.md +++ b/akka-docs/src/main/paradox/fault-tolerance-typed.md @@ -1,20 +1,40 @@ # Fault Tolerance -As explained in @ref:[Actor Systems](general/actor-systems.md) each actor is the supervisor of its -children, and as such each actor defines fault handling supervisor strategy. -This strategy cannot be changed afterwards as it is an integral part of the -actor system’s structure. +The default supervision strategy is for the Actor be stopped. However that can be modified by wrapping behaviors in a call to `Behaviors.supervise` +for example to restart on `IllegalStateExceptions`: -## Creating a Supervisor Strategy +Scala +: @@snip [SupervisionCompileOnlyTest.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnlyTest.scala) { #restart } -TODO +Java +: @@snip [SupervisionCompileOnlyTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java) { #restart } -### Default Supervisor Strategy +Or to resume instead: -### Restart Supervisor Strategy +Scala +: @@snip [SupervisionCompileOnlyTest.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnlyTest.scala) { #resume } -### Stopping Supervisor Strategy +Java +: @@snip [SupervisionCompileOnlyTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java) { #resume } -### Logging of Actor Failures +More complicated restart strategies can be used e.g. to restart no more than 10 +times in a 10 second period: + +Scala +: @@snip [SupervisionCompileOnlyTest.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnlyTest.scala) { #restart-limit } + +Java +: @@snip [SupervisionCompileOnlyTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java) { #restart-limit } + +To handle different exceptions with different strategies calls to `supervise` +can be nested: + +Scala +: @@snip [SupervisionCompileOnlyTest.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnlyTest.scala) { #multiple } + +Java +: @@snip [SupervisionCompileOnlyTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java) { #multiple } + +For a full list of strategies see the public methods on `SupervisorStrategy`