Supervise also deferred initialization, #24052

* Initial test coverage

* Working except for the restart with exponential backoff

* All lights green

* Update after latest discussion

* Just an extra testcase to cover something I had a gut feeling could be wrong

* Some review and rebase fixes

* Alias and class name conflict

* Stopper implementing init()

* Some final minor fixes

* Rebased, and all exception logs silenced/checked for
This commit is contained in:
Johan Andrén 2018-02-01 19:41:40 +01:00 committed by Patrik Nordwall
parent 3b54f238ea
commit 9e459f0c04
5 changed files with 319 additions and 82 deletions

View file

@ -4,16 +4,19 @@
package akka.actor.typed
import java.io.IOException
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorInitializationException
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Behaviors._
import akka.testkit.EventFilter
import akka.testkit.typed.scaladsl._
import akka.testkit.typed._
import com.typesafe.config.ConfigFactory
import org.scalatest.{ Matchers, WordSpec }
import scala.concurrent.duration._
import akka.actor.typed.scaladsl.Behaviors._
import akka.testkit.typed.{ BehaviorTestkit, TestInbox, TestKit, TestKitSettings }
import scala.util.control.NoStackTrace
import akka.testkit.typed.scaladsl._
import org.scalatest.{ Matchers, WordSpec, fixture }
object SupervisionSpec {
@ -29,6 +32,7 @@ object SupervisionSpec {
case class GotSignal(signal: Signal) extends Event
case class State(n: Int, children: Map[String, ActorRef[Command]]) extends Event
case object Started extends Event
case object StartFailed extends Event
class Exc1(msg: String = "exc-1") extends RuntimeException(msg) with NoStackTrace
class Exc2 extends Exc1("exc-2")
@ -236,13 +240,55 @@ class StubbedSupervisionSpec extends WordSpec with Matchers {
}
}
class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown {
class SupervisionSpec extends TestKit("SupervisionSpec", ConfigFactory.parseString(
"""
akka.loggers = [akka.testkit.TestEventListener]
""")) with TypedAkkaSpecWithShutdown {
import SupervisionSpec._
private val nameCounter = Iterator.from(0)
private def nextName(prefix: String = "a"): String = s"$prefix-${nameCounter.next()}"
implicit val testSettings = TestKitSettings(system)
// FIXME eventfilter support in typed testkit
import scaladsl.adapter._
implicit val untypedSystem = system.toUntyped
class FailingConstructorTestSetup(failCount: Int) {
val failCounter = new AtomicInteger(0)
class FailingConstructor(monitor: ActorRef[Event]) extends MutableBehavior[Command] {
monitor ! Started
if (failCounter.getAndIncrement() < failCount) {
throw TE("simulated exc from constructor")
}
override def onMessage(msg: Command): Behavior[Command] = {
monitor ! Pong
Behaviors.same
}
}
}
class FailingDeferredTestSetup(failCount: Int, strategy: SupervisorStrategy) {
val probe = TestProbe[AnyRef]("evt")
val failCounter = new AtomicInteger(0)
def behv = supervise(deferred[Command] { _
val count = failCounter.getAndIncrement()
if (count < failCount) {
probe.ref ! StartFailed
throw TE(s"construction ${count} failed")
} else {
probe.ref ! Started
Behaviors.empty
}
}).onFailure[TE](strategy)
}
class FailingUnhandledTestSetup(strategy: SupervisorStrategy) {
val probe = TestProbe[AnyRef]("evt")
def behv = supervise(deferred[Command] { _
probe.ref ! StartFailed
throw new TE("construction failed")
}).onFailure[IllegalArgumentException](strategy)
}
"A supervised actor" must {
"receive message" in {
@ -259,9 +305,11 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown {
val behv = Behaviors.supervise(targetBehavior(probe.ref))
.onFailure[Throwable](SupervisorStrategy.stop)
val ref = spawn(behv)
EventFilter[Exc3](occurrences = 1).intercept {
ref ! Throw(new Exc3)
probe.expectMessage(GotSignal(PostStop))
}
}
"support nesting exceptions with different strategies" in {
val probe = TestProbe[Event]("evt")
@ -273,29 +321,37 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown {
val ref = spawn(behv)
EventFilter[IOException](occurrences = 1).intercept {
ref ! Throw(new IOException())
probe.expectMessage(GotSignal(PreRestart))
}
EventFilter[IllegalArgumentException](occurrences = 1).intercept {
ref ! Throw(new IllegalArgumentException("cat"))
probe.expectMessage(GotSignal(PostStop))
}
}
"stop when not supervised" in {
val probe = TestProbe[Event]("evt")
val behv = targetBehavior(probe.ref)
val ref = spawn(behv)
EventFilter[Exc3](occurrences = 1).intercept {
ref ! Throw(new Exc3)
probe.expectMessage(GotSignal(PostStop))
}
}
"stop when unhandled exception" in {
val probe = TestProbe[Event]("evt")
val behv = Behaviors.supervise(targetBehavior(probe.ref))
.onFailure[Exc1](SupervisorStrategy.restart)
val ref = spawn(behv)
EventFilter[Exc3](occurrences = 1).intercept {
ref ! Throw(new Exc3)
probe.expectMessage(GotSignal(PostStop))
}
}
"restart when handled exception" in {
val probe = TestProbe[Event]("evt")
@ -306,8 +362,10 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown {
ref ! GetState
probe.expectMessage(State(1, Map.empty))
EventFilter[Exc2](occurrences = 1).intercept {
ref ! Throw(new Exc2)
probe.expectMessage(GotSignal(PreRestart))
}
ref ! GetState
probe.expectMessage(State(0, Map.empty))
}
@ -324,9 +382,11 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown {
ref ! GetState
parentProbe.expectMessageType[State].children.keySet should contain(childName)
EventFilter[Exc1](occurrences = 1).intercept {
ref ! Throw(new Exc1)
parentProbe.expectMessage(GotSignal(PreRestart))
ref ! GetState
}
// TODO document this difference compared to classic actors, and that
// children can be stopped if needed in PreRestart
parentProbe.expectMessageType[State].children.keySet should contain(childName)
@ -341,10 +401,12 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown {
ref ! GetState
probe.expectMessage(State(1, Map.empty))
EventFilter[Exc2](occurrences = 1).intercept {
ref ! Throw(new Exc2)
ref ! GetState
probe.expectMessage(State(1, Map.empty))
}
}
"support nesting to handle different exceptions" in {
val probe = TestProbe[Event]("evt")
@ -358,21 +420,27 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown {
probe.expectMessage(State(1, Map.empty))
// resume
EventFilter[Exc2](occurrences = 1).intercept {
ref ! Throw(new Exc2)
probe.expectNoMessage()
ref ! GetState
probe.expectMessage(State(1, Map.empty))
}
// restart
EventFilter[Exc3](occurrences = 1).intercept {
ref ! Throw(new Exc3)
probe.expectMessage(GotSignal(PreRestart))
ref ! GetState
probe.expectMessage(State(0, Map.empty))
}
// stop
EventFilter[Exc1](occurrences = 1).intercept {
ref ! Throw(new Exc1)
probe.expectMessage(GotSignal(PostStop))
}
}
"restart after exponential backoff" in {
val probe = TestProbe[Event]("evt")
@ -387,11 +455,13 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown {
}).onFailure[Exception](strategy)
val ref = spawn(behv)
EventFilter[Exc1](occurrences = 1).intercept {
startedProbe.expectMessage(Started)
ref ! IncrementState
ref ! Throw(new Exc1)
probe.expectMessage(GotSignal(PreRestart))
ref ! Ping // dropped due to backoff
}
startedProbe.expectNoMessage(minBackoff - 100.millis)
probe.expectNoMessage(minBackoff + 100.millis)
@ -400,10 +470,12 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown {
probe.expectMessage(State(0, Map.empty))
// one more time
EventFilter[Exc1](occurrences = 1).intercept {
ref ! IncrementState
ref ! Throw(new Exc1)
probe.expectMessage(GotSignal(PreRestart))
ref ! Ping // dropped due to backoff
}
startedProbe.expectNoMessage((minBackoff * 2) - 100.millis)
probe.expectNoMessage((minBackoff * 2) + 100.millis)
@ -420,21 +492,25 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown {
val behv = supervise(targetBehavior(probe.ref)).onFailure[Exc1](strategy)
val ref = spawn(behv)
EventFilter[Exc1](occurrences = 1).intercept {
ref ! IncrementState
ref ! Throw(new Exc1)
probe.expectMessage(GotSignal(PreRestart))
ref ! Ping // dropped due to backoff
}
probe.expectNoMessage(minBackoff + 100.millis.dilated)
ref ! GetState
probe.expectMessage(State(0, Map.empty))
// one more time after the reset timeout
EventFilter[Exc1](occurrences = 1).intercept {
probe.expectNoMessage(strategy.resetBackoffAfter + 100.millis.dilated)
ref ! IncrementState
ref ! Throw(new Exc1)
probe.expectMessage(GotSignal(PreRestart))
ref ! Ping // dropped due to backoff
}
// backoff was reset, so restarted after the minBackoff
probe.expectNoMessage(minBackoff + 100.millis.dilated)
@ -454,14 +530,118 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown {
probe.expectMessage(Started)
}
"stop when exception from MutableBehavior constructor" in {
"fail instead of restart when deferred factory throws" in new FailingDeferredTestSetup(
failCount = 1, strategy = SupervisorStrategy.restart) {
EventFilter[ActorInitializationException](occurrences = 1).intercept {
spawn(behv)
}
}
"fail to restart when deferred factory throws unhandled" in new FailingUnhandledTestSetup(
strategy = SupervisorStrategy.restart) {
EventFilter[ActorInitializationException](occurrences = 1).intercept {
spawn(behv)
}
}
"fail to resume when deferred factory throws" in new FailingDeferredTestSetup(
failCount = 1,
strategy = SupervisorStrategy.resume
) {
EventFilter[ActorInitializationException](occurrences = 1).intercept {
spawn(behv)
}
}
"restart with exponential backoff when deferred factory throws" in new FailingDeferredTestSetup(
failCount = 1,
strategy = SupervisorStrategy.restartWithBackoff(minBackoff = 100.millis.dilated, maxBackoff = 1.second, 0)
) {
EventFilter[TE](occurrences = 1).intercept {
spawn(behv)
probe.expectMessage(StartFailed)
// restarted after a delay when first start failed
probe.expectNoMessage(100.millis)
probe.expectMessage(Started)
}
}
"fail instead of restart with exponential backoff when deferred factory throws unhandled" in new FailingUnhandledTestSetup(
strategy = SupervisorStrategy.restartWithBackoff(minBackoff = 100.millis.dilated, maxBackoff = 1.second, 0)) {
EventFilter[ActorInitializationException](occurrences = 1).intercept {
spawn(behv)
probe.expectMessage(StartFailed)
}
}
"restartWithLimit when deferred factory throws" in new FailingDeferredTestSetup(
failCount = 1,
strategy = SupervisorStrategy.restartWithLimit(3, 1.second)
) {
EventFilter[TE](occurrences = 1).intercept {
spawn(behv)
probe.expectMessage(StartFailed)
probe.expectMessage(Started)
}
}
"fail after more than limit in restartWithLimit when deferred factory throws" in new FailingDeferredTestSetup(
failCount = 3,
strategy = SupervisorStrategy.restartWithLimit(2, 1.second)
) {
EventFilter[ActorInitializationException](occurrences = 1).intercept {
EventFilter[TE](occurrences = 2).intercept {
spawn(behv)
// restarted 2 times before it gave up
probe.expectMessage(StartFailed)
probe.expectMessage(StartFailed)
probe.expectNoMessage(100.millis)
}
}
}
"fail instead of restart with limit when deferred factory throws unhandled" in new FailingUnhandledTestSetup(
strategy = SupervisorStrategy.restartWithLimit(3, 1.second)) {
EventFilter[ActorInitializationException](occurrences = 1).intercept {
spawn(behv)
probe.expectMessage(StartFailed)
}
}
"fail when exception from MutableBehavior constructor" in new FailingConstructorTestSetup(failCount = 1) {
val probe = TestProbe[Event]("evt")
val behv = supervise(mutable[Command](_ new FailingConstructor(probe.ref)))
.onFailure[Exception](SupervisorStrategy.restart)
EventFilter[ActorInitializationException](occurrences = 1).intercept {
val ref = spawn(behv)
probe.expectMessage(Started) // first one before failure
}
}
"work with nested supervisions and defers" in {
val strategy = SupervisorStrategy.restartWithLimit(3, 1.second)
val probe = TestProbe[AnyRef]("p")
val beh = supervise[String](deferred(ctx
supervise[String](deferred { ctx
probe.ref ! Started
scaladsl.Behaviors.empty[String]
}).onFailure[RuntimeException](strategy)
)).onFailure[Exception](strategy)
spawn(beh)
probe.expectMessage(Started)
ref ! Ping
probe.expectNoMessage()
}
}
}
}

View file

@ -12,11 +12,17 @@ object SupervisorStrategy {
/**
* Resume means keeping the same state as before the exception was
* thrown and is thus less safe than `restart`.
*
* If the actor behavior is deferred and throws an exception on startup the actor is stopped
* (restarting would be dangerous as it could lead to an infinite restart-loop)
*/
val resume: SupervisorStrategy = Resume(loggingEnabled = true)
/**
* Restart immediately without any limit on number of restart retries.
*
* If the actor behavior is deferred and throws an exception on startup the actor is stopped
* (restarting would be dangerous as it could lead to an infinite restart-loop)
*/
val restart: SupervisorStrategy = Restart(-1, Duration.Zero, loggingEnabled = true)
@ -31,6 +37,9 @@ object SupervisorStrategy {
* within a time range (`withinTimeRange`). When the time window has elapsed without reaching
* `maxNrOfRetries` the restart count is reset.
*
* The strategy is applied also if the actor behavior is deferred and throws an exception during
* startup.
*
* @param maxNrOfRetries the number of times a child actor is allowed to be restarted,
* if the limit is exceeded the child actor is stopped
* @param withinTimeRange duration of the time window for maxNrOfRetries
@ -55,6 +64,9 @@ object SupervisorStrategy {
* If no new exception occurs within the `minBackoff` duration the exponentially
* increased back-off timeout is reset.
*
* The strategy is applied also if the actor behavior is deferred and throws an exception during
* startup.
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration

View file

@ -6,19 +6,17 @@ package internal
import java.util.concurrent.ThreadLocalRandom
import scala.annotation.tailrec
import scala.concurrent.duration.Deadline
import scala.concurrent.duration.FiniteDuration
import akka.actor.DeadLetterSuppression
import akka.actor.typed.SupervisorStrategy._
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.event.Logging
import akka.util.OptionVal
import scala.concurrent.duration.{ Deadline, FiniteDuration }
import scala.reflect.ClassTag
import scala.util.control.Exception.Catcher
import scala.util.control.NonFatal
import akka.actor.DeadLetterSuppression
import akka.annotation.InternalApi
import akka.event.Logging
import akka.actor.typed.Behavior.DeferredBehavior
import akka.actor.typed.SupervisorStrategy._
import akka.util.OptionVal
import akka.actor.typed.scaladsl.Behaviors
/**
* INTERNAL API
@ -27,26 +25,26 @@ import akka.actor.typed.scaladsl.Behaviors
def apply[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], strategy: SupervisorStrategy): Behavior[T] =
Behaviors.deferred[T] { ctx
val c = ctx.asInstanceOf[akka.actor.typed.ActorContext[T]]
val startedBehavior = initialUndefer(c, initialBehavior)
strategy match {
val supervisor: Supervisor[T, Thr] = strategy match {
case Restart(-1, _, loggingEnabled)
new Restarter(initialBehavior, startedBehavior, loggingEnabled)
new Restarter(initialBehavior, initialBehavior, loggingEnabled)
case r: Restart
new LimitedRestarter(initialBehavior, startedBehavior, r, retries = 0, deadline = OptionVal.None)
case Resume(loggingEnabled) new Resumer(startedBehavior, loggingEnabled)
case Stop(loggingEnabled) new Stopper(startedBehavior, loggingEnabled)
new LimitedRestarter(initialBehavior, initialBehavior, r, retries = 0, deadline = OptionVal.None)
case Resume(loggingEnabled) new Resumer(initialBehavior, loggingEnabled)
case Stop(loggingEnabled) new Stopper(initialBehavior, loggingEnabled)
case b: Backoff
val backoffRestarter =
new BackoffRestarter(
initialBehavior.asInstanceOf[Behavior[Any]],
startedBehavior.asInstanceOf[Behavior[Any]],
initialBehavior.asInstanceOf[Behavior[Any]],
b, restartCount = 0, blackhole = false)
backoffRestarter.asInstanceOf[Behavior[T]]
}
backoffRestarter
.asInstanceOf[Supervisor[T, Thr]]
}
supervisor.init(c)
}
def initialUndefer[T](ctx: ActorContext[T], initialBehavior: Behavior[T]): Behavior[T] =
Behavior.validateAsInitial(Behavior.undefer(initialBehavior, ctx))
}
/**
@ -56,6 +54,14 @@ import akka.actor.typed.scaladsl.Behaviors
protected def loggingEnabled: Boolean
/**
* Invoked when the actor is created (or re-created on restart) this is where a restarter implementation
* can provide logic for dealing with exceptions thrown when running any actor initialization logic (undeferring).
*
* @return The initial behavior of the actor after undeferring if needed
*/
def init(ctx: ActorContext[T]): Supervisor[T, Thr]
/**
* Current behavior
*/
@ -72,8 +78,7 @@ import akka.actor.typed.scaladsl.Behaviors
try Behavior.interpretSignal(startedBehavior, ctx, PreRestart) catch {
case NonFatal(ex) ctx.asScala.log.error(ex, "failure during PreRestart")
}
// no need to canonicalize, it's done in the calling methods
wrap(Supervisor.initialUndefer(ctx, initialBehavior), afterException = true)
wrap(initialBehavior, afterException = true).init(ctx)
}
protected final def supervise(nextBehavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] =
@ -105,6 +110,10 @@ import akka.actor.typed.scaladsl.Behaviors
@InternalApi private[akka] final class Resumer[T, Thr <: Throwable: ClassTag](
override val behavior: Behavior[T], override val loggingEnabled: Boolean) extends Supervisor[T, Thr] {
def init(ctx: ActorContext[T]) =
// no handling of errors for Resume as that could lead to infinite restart-loop
wrap(Behavior.validateAsInitial(Behavior.undefer(behavior, ctx)), afterException = false)
override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Supervisor[T, Thr]] = {
case NonFatal(ex: Thr)
log(ctx, ex)
@ -122,6 +131,9 @@ import akka.actor.typed.scaladsl.Behaviors
@InternalApi private[akka] final class Stopper[T, Thr <: Throwable: ClassTag](
override val behavior: Behavior[T], override val loggingEnabled: Boolean) extends Supervisor[T, Thr] {
def init(ctx: ActorContext[T]): Supervisor[T, Thr] =
wrap(Behavior.validateAsInitial(Behavior.undefer(behavior, ctx)), false)
override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Behavior[T]] = {
case NonFatal(ex: Thr)
log(ctx, ex)
@ -130,6 +142,7 @@ import akka.actor.typed.scaladsl.Behaviors
override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Supervisor[T, Thr] =
new Stopper[T, Thr](nextBehavior, loggingEnabled)
}
/**
@ -139,6 +152,10 @@ import akka.actor.typed.scaladsl.Behaviors
initialBehavior: Behavior[T], override val behavior: Behavior[T],
override val loggingEnabled: Boolean) extends Supervisor[T, Thr] {
override def init(ctx: ActorContext[T]) =
// no handling of errors for Restart as that could lead to infinite restart-loop
wrap(Behavior.validateAsInitial(Behavior.undefer(behavior, ctx)), afterException = false)
override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Supervisor[T, Thr]] = {
case NonFatal(ex: Thr)
log(ctx, ex)
@ -158,6 +175,17 @@ import akka.actor.typed.scaladsl.Behaviors
override def loggingEnabled: Boolean = strategy.loggingEnabled
override def init(ctx: ActorContext[T]) =
try {
wrap(Behavior.validateAsInitial(Behavior.undefer(behavior, ctx)), afterException = false)
} catch {
case NonFatal(ex: Thr)
log(ctx, ex)
// we haven't actually wrapped and increased retries yet, so need to compare with +1
if (deadlineHasTimeLeft && (retries + 1) >= strategy.maxNrOfRetries) throw ex
else wrap(initialBehavior, afterException = true).init(ctx)
}
private def deadlineHasTimeLeft: Boolean = deadline match {
case OptionVal.None true
case OptionVal.Some(d) d.hasTimeLeft
@ -222,6 +250,18 @@ import akka.actor.typed.scaladsl.Behaviors
override def loggingEnabled: Boolean = strategy.loggingEnabled
def init(ctx: ActorContext[Any]): Supervisor[Any, Thr] =
try {
val startedBehavior = Behavior.validateAsInitial(Behavior.undefer(initialBehavior, ctx))
new BackoffRestarter(initialBehavior, startedBehavior, strategy, restartCount, blackhole)
} catch {
case NonFatal(ex: Thr)
log(ctx, ex)
val restartDelay = calculateDelay(restartCount, strategy.minBackoff, strategy.maxBackoff, strategy.randomFactor)
ctx.asScala.schedule(restartDelay, ctx.asScala.self, ScheduledRestart)
new BackoffRestarter[T, Thr](initialBehavior, initialBehavior, strategy, restartCount + 1, blackhole = true)
}
override def receiveSignal(ctx: ActorContext[Any], signal: Signal): Behavior[Any] = {
if (blackhole) {
import scaladsl.adapter._
@ -236,9 +276,8 @@ import akka.actor.typed.scaladsl.Behaviors
msg match {
case ScheduledRestart
// actual restart after scheduled backoff delay
val restartedBehavior = Supervisor.initialUndefer(ctx, initialBehavior)
ctx.asScala.schedule(strategy.resetBackoffAfter, ctx.asScala.self, ResetRestartCount(restartCount))
new BackoffRestarter[T, Thr](initialBehavior, restartedBehavior, strategy, restartCount, blackhole = false)
new BackoffRestarter[T, Thr](initialBehavior, initialBehavior, strategy, restartCount, blackhole = false).init(ctx)
case ResetRestartCount(current)
if (current == restartCount)
new BackoffRestarter[T, Thr](initialBehavior, behavior, strategy, restartCount = 0, blackhole)

View file

@ -10,6 +10,12 @@ import com.typesafe.config.Config
import scala.concurrent.duration._
import scala.concurrent.{ Await, TimeoutException }
import scala.util.control.NoStackTrace
/**
* Exception without stack trace to use for verifying exceptions in tests
*/
case class TE(message: String) extends RuntimeException(message) with NoStackTrace
object TestKit {