Add typed supervisor strategy and some basic docs (#24341)

This commit is contained in:
Christopher Batey 2018-01-19 10:33:49 +00:00 committed by Johan Andrén
parent e81f350b2f
commit 4f6af8d856
8 changed files with 167 additions and 30 deletions

View file

@ -0,0 +1,37 @@
package jdocs.akka.typed.supervision;
import akka.actor.typed.Behavior;
import akka.actor.typed.SupervisorStrategy;
import akka.actor.typed.javadsl.Behaviors;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;
public class SupervisionCompileOnlyTest {
public static Behavior<String> behavior = Behaviors.empty();
public void supervision() {
//#restart
Behaviors.supervise(behavior)
.onFailure(IllegalStateException.class, SupervisorStrategy.restart());
//#restart
//#resume
Behaviors.supervise(behavior)
.onFailure(IllegalStateException.class, SupervisorStrategy.resume());
//#resume
//#restart-limit
Behaviors.supervise(behavior)
.onFailure(IllegalStateException.class, SupervisorStrategy.restartWithLimit(
10, FiniteDuration.apply(10, TimeUnit.SECONDS)
));
//#restart-limit
//#multiple
Behaviors.supervise(Behaviors.supervise(behavior)
.onFailure(IllegalStateException.class, SupervisorStrategy.restart()))
.onFailure(IllegalArgumentException.class, SupervisorStrategy.stop());
//#multiple
}
}

View file

@ -3,6 +3,8 @@
*/
package akka.actor.typed
import java.io.IOException
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._
@ -234,7 +236,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers {
}
}
class SupervisionSpec extends TestKit("SupervisionSpec") with TypedAkkaSpecWithShutdown {
class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown {
import SupervisionSpec._
private val nameCounter = Iterator.from(0)
@ -252,12 +254,37 @@ class SupervisionSpec extends TestKit("SupervisionSpec") with TypedAkkaSpecWithS
probe.expectMsg(Pong)
}
"stop when strategy is stop" in {
val probe = TestProbe[Event]("evt")
val behv = Behaviors.supervise(targetBehavior(probe.ref))
.onFailure[Throwable](SupervisorStrategy.stop)
val ref = spawn(behv)
ref ! Throw(new Exc3)
probe.expectMsg(GotSignal(PostStop))
}
"support nesting exceptions with different strategies" in {
val probe = TestProbe[Event]("evt")
val behv =
supervise(
supervise(targetBehavior(probe.ref))
.onFailure[RuntimeException](SupervisorStrategy.stop)
).onFailure[Exception](SupervisorStrategy.restart)
val ref = spawn(behv)
ref ! Throw(new IOException())
probe.expectMsg(GotSignal(PreRestart))
ref ! Throw(new IllegalArgumentException("cat"))
probe.expectMsg(GotSignal(PostStop))
}
"stop when not supervised" in {
val probe = TestProbe[Event]("evt")
val behv = targetBehavior(probe.ref)
val ref = spawn(behv)
ref ! Throw(new Exc3)
probe.expectMsg(GotSignal(PostStop))
}

View file

@ -0,0 +1,33 @@
package docs.akka.typed.supervision
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._
object SupervisionCompileOnlyTest {
val behavior = Behaviors.empty[String]
//#restart
Behaviors.supervise(behavior)
.onFailure[IllegalStateException](SupervisorStrategy.restart)
//#restart
//#resume
Behaviors.supervise(behavior)
.onFailure[IllegalStateException](SupervisorStrategy.resume)
//#resume
//#restart-limit
Behaviors.supervise(behavior)
.onFailure[IllegalStateException](SupervisorStrategy.restartWithLimit(
maxNrOfRetries = 10, withinTimeRange = 10.seconds
))
//#restart-limit
//#multiple
Behaviors.supervise(Behaviors.supervise(behavior)
.onFailure[IllegalStateException](SupervisorStrategy.restart))
.onFailure[IllegalArgumentException](SupervisorStrategy.stop)
//#multiple
}

View file

@ -20,6 +20,11 @@ object SupervisorStrategy {
*/
val restart: SupervisorStrategy = Restart(-1, Duration.Zero, loggingEnabled = true)
/**
* Stop the actor
*/
val stop: SupervisorStrategy = Stop(loggingEnabled = true)
/**
* Restart with a limit of number of restart retries.
* The number of restarts are limited to a number of restart attempts (`maxNrOfRetries`)
@ -71,6 +76,14 @@ object SupervisorStrategy {
copy(loggingEnabled = enabled)
}
/**
* INTERNAL API
*/
@InternalApi private[akka] case class Stop(loggingEnabled: Boolean) extends SupervisorStrategy {
override def withLoggingEnabled(on: Boolean) =
copy(loggingEnabled = on)
}
/**
* INTERNAL API
*/

View file

@ -12,25 +12,18 @@ import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
import scala.util.control.Exception.Catcher
import scala.util.control.NonFatal
import akka.actor.DeadLetterSuppression
import akka.annotation.InternalApi
import akka.event.Logging
import akka.actor.typed.ActorContext
import akka.actor.typed.Behavior
import akka.actor.typed.Behavior.DeferredBehavior
import akka.actor.typed.ExtensibleBehavior
import akka.actor.typed.PreRestart
import akka.actor.typed.Signal
import akka.actor.typed.SupervisorStrategy._
import akka.actor.typed.scaladsl.Behaviors._
import akka.util.OptionVal
import akka.actor.typed.scaladsl.Behaviors
/**
* INTERNAL API
*/
@InternalApi private[akka] object Restarter {
@InternalApi private[akka] object Supervisor {
def apply[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], strategy: SupervisorStrategy): Behavior[T] =
Behaviors.deferred[T] { ctx
val c = ctx.asInstanceOf[akka.actor.typed.ActorContext[T]]
@ -41,6 +34,7 @@ import akka.actor.typed.scaladsl.Behaviors
case r: Restart
new LimitedRestarter(initialBehavior, startedBehavior, r, retries = 0, deadline = OptionVal.None)
case Resume(loggingEnabled) new Resumer(startedBehavior, loggingEnabled)
case Stop(loggingEnabled) new Stopper(startedBehavior, loggingEnabled)
case b: Backoff
val backoffRestarter =
new BackoffRestarter(
@ -72,7 +66,7 @@ import akka.actor.typed.scaladsl.Behaviors
*/
protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Supervisor[T, Thr]
protected def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Supervisor[T, Thr]]
protected def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Behavior[T]]
protected def restart(ctx: ActorContext[T], initialBehavior: Behavior[T], startedBehavior: Behavior[T]): Supervisor[T, Thr] = {
try Behavior.interpretSignal(startedBehavior, ctx, PreRestart) catch {
@ -80,7 +74,7 @@ import akka.actor.typed.scaladsl.Behaviors
"failure during PreRestart"))
}
// no need to canonicalize, it's done in the calling methods
wrap(Restarter.initialUndefer(ctx, initialBehavior), afterException = true)
wrap(Supervisor.initialUndefer(ctx, initialBehavior), afterException = true)
}
protected final def supervise(nextBehavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] =
@ -126,6 +120,22 @@ import akka.actor.typed.scaladsl.Behaviors
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class Stopper[T, Thr <: Throwable: ClassTag](
override val behavior: Behavior[T], override val loggingEnabled: Boolean) extends Supervisor[T, Thr] {
override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Behavior[T]] = {
case NonFatal(ex: Thr)
log(ctx, ex)
Behaviors.stopped
}
override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Supervisor[T, Thr] =
new Stopper[T, Thr](nextBehavior, loggingEnabled)
}
/**
* INTERNAL API
*/
@ -229,7 +239,7 @@ import akka.actor.typed.scaladsl.Behaviors
msg match {
case ScheduledRestart
// actual restart after scheduled backoff delay
val restartedBehavior = Restarter.initialUndefer(ctx, initialBehavior)
val restartedBehavior = Supervisor.initialUndefer(ctx, initialBehavior)
ctx.asScala.schedule(strategy.resetBackoffAfter, ctx.asScala.self, ResetRestartCount(restartCount))
new BackoffRestarter[T, Thr](initialBehavior, restartedBehavior, strategy, restartCount, blackhole = false)
case ResetRestartCount(current)

View file

@ -6,7 +6,6 @@ package akka.actor.typed.javadsl
import java.util.function.{ Function JFunction }
import scala.reflect.ClassTag
import akka.util.OptionVal
import akka.japi.function.{ Function2 JapiFunction2 }
import akka.japi.function.{ Procedure, Procedure2 }
@ -17,9 +16,7 @@ import akka.actor.typed.Signal
import akka.actor.typed.ActorRef
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.{ ActorContext SAC }
import akka.actor.typed.internal.BehaviorImpl
import akka.actor.typed.internal.Restarter
import akka.actor.typed.internal.TimerSchedulerImpl
import akka.actor.typed.internal.{ BehaviorImpl, Supervisor, TimerSchedulerImpl }
import akka.annotation.ApiMayChange
/**
@ -261,12 +258,12 @@ object Behaviors {
final class Supervise[T] private[akka] (wrapped: Behavior[T]) {
/**
* Specify the [[SupervisorStrategy]] to be invoked when the wrapped behaior throws.
* Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws.
*
* Only exceptions of the given type (and their subclasses) will be handled by this supervision behavior.
*/
def onFailure[Thr <: Throwable](clazz: Class[Thr], strategy: SupervisorStrategy): Behavior[T] =
akka.actor.typed.internal.Restarter(Behavior.validateAsInitial(wrapped), strategy)(ClassTag(clazz))
Supervisor(Behavior.validateAsInitial(wrapped), strategy)(ClassTag(clazz))
/**
* Specify the [[SupervisorStrategy]] to be invoked when the wrapped behaior throws.

View file

@ -254,7 +254,7 @@ object Behaviors {
def onFailure[Thr <: Throwable: ClassTag](strategy: SupervisorStrategy): Behavior[T] = {
val tag = implicitly[ClassTag[Thr]]
val effectiveTag = if (tag == NothingClassTag) ThrowableClassTag else tag
akka.actor.typed.internal.Restarter(Behavior.validateAsInitial(wrapped), strategy)(effectiveTag)
Supervisor(Behavior.validateAsInitial(wrapped), strategy)(effectiveTag)
}
}

View file

@ -1,20 +1,40 @@
# Fault Tolerance
As explained in @ref:[Actor Systems](general/actor-systems.md) each actor is the supervisor of its
children, and as such each actor defines fault handling supervisor strategy.
This strategy cannot be changed afterwards as it is an integral part of the
actor systems structure.
The default supervision strategy is for the Actor be stopped. However that can be modified by wrapping behaviors in a call to `Behaviors.supervise`
for example to restart on `IllegalStateExceptions`:
## Creating a Supervisor Strategy
Scala
: @@snip [SupervisionCompileOnlyTest.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnlyTest.scala) { #restart }
TODO
Java
: @@snip [SupervisionCompileOnlyTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java) { #restart }
### Default Supervisor Strategy
Or to resume instead:
### Restart Supervisor Strategy
Scala
: @@snip [SupervisionCompileOnlyTest.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnlyTest.scala) { #resume }
### Stopping Supervisor Strategy
Java
: @@snip [SupervisionCompileOnlyTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java) { #resume }
### Logging of Actor Failures
More complicated restart strategies can be used e.g. to restart no more than 10
times in a 10 second period:
Scala
: @@snip [SupervisionCompileOnlyTest.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnlyTest.scala) { #restart-limit }
Java
: @@snip [SupervisionCompileOnlyTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java) { #restart-limit }
To handle different exceptions with different strategies calls to `supervise`
can be nested:
Scala
: @@snip [SupervisionCompileOnlyTest.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/supervision/SupervisionCompileOnlyTest.scala) { #multiple }
Java
: @@snip [SupervisionCompileOnlyTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/supervision/SupervisionCompileOnlyTest.java) { #multiple }
For a full list of strategies see the public methods on `SupervisorStrategy`