Merge pull request #25653 from chbatey/supervision-interceptor

Supervision with a BehaviorInterceptor
This commit is contained in:
Johan Andrén 2018-09-24 09:44:36 +02:00 committed by GitHub
commit 83b30f1ed2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 398 additions and 406 deletions

View file

@ -8,12 +8,11 @@ import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props }
import akka.annotation.{ ApiMayChange, InternalApi }
import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.internal.{ ActorTestKitGuardian, TestKitUtils }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.Scheduler
import akka.util.Timeout

View file

@ -176,7 +176,7 @@ class InterceptSpec extends ScalaTestWithActorTestKit(
"allow an interceptor to replace started behavior" in {
val interceptor = new BehaviorInterceptor[String, String] {
override def preStart(ctx: ActorContext[String], target: PreStartTarget[String]): Behavior[String] = {
override def aroundStart(ctx: ActorContext[String], target: PreStartTarget[String]): Behavior[String] = {
Behaviors.stopped
}

View file

@ -255,6 +255,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
// FIXME eventfilter support in typed testkit
import scaladsl.adapter._
implicit val untypedSystem = system.toUntyped
class FailingConstructorTestSetup(failCount: Int) {
@ -315,6 +316,18 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
}
}
"stop when strategy is stop - exception in setup" in {
val probe = TestProbe[Event]("evt")
val failedSetup = Behaviors.setup[Command](_ {
throw new Exc3()
targetBehavior(probe.ref)
})
val behv = Behaviors.supervise(failedSetup).onFailure[Throwable](SupervisorStrategy.stop)
EventFilter[Exc3](occurrences = 1).intercept {
spawn(behv)
}
}
"support nesting exceptions with different strategies" in {
val probe = TestProbe[Event]("evt")
val behv =
@ -374,6 +387,53 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
probe.expectMessage(State(0, Map.empty))
}
"stop when restart limit is hit" in {
val probe = TestProbe[Event]("evt")
val resetTimeout = 500.millis
val behv = Behaviors.supervise(targetBehavior(probe.ref))
.onFailure[Exc1](SupervisorStrategy.restartWithLimit(2, resetTimeout))
val ref = spawn(behv)
ref ! IncrementState
ref ! GetState
probe.expectMessage(State(1, Map.empty))
EventFilter[Exc2](occurrences = 3).intercept {
ref ! Throw(new Exc2)
probe.expectMessage(GotSignal(PreRestart))
ref ! Throw(new Exc2)
probe.expectMessage(GotSignal(PreRestart))
ref ! Throw(new Exc2)
probe.expectMessage(GotSignal(PostStop))
}
ref ! GetState
probe.expectNoMessage()
}
"reset fixed limit after timeout" in {
val probe = TestProbe[Event]("evt")
val resetTimeout = 500.millis
val behv = Behaviors.supervise(targetBehavior(probe.ref))
.onFailure[Exc1](SupervisorStrategy.restartWithLimit(2, resetTimeout))
val ref = spawn(behv)
ref ! IncrementState
ref ! GetState
probe.expectMessage(State(1, Map.empty))
EventFilter[Exc2](occurrences = 3).intercept {
ref ! Throw(new Exc2)
probe.expectMessage(GotSignal(PreRestart))
ref ! Throw(new Exc2)
probe.expectMessage(GotSignal(PreRestart))
probe.expectNoMessage(resetTimeout + 50.millis)
ref ! Throw(new Exc2)
probe.expectMessage(GotSignal(PreRestart))
}
ref ! GetState
probe.expectMessage(State(0, Map.empty))
}
"NOT stop children when restarting" in {
val parentProbe = TestProbe[Event]("evt")
val behv = Behaviors.supervise(targetBehavior(parentProbe.ref))
@ -446,6 +506,29 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
}
}
"publish dropped messages while backing off" in {
val probe = TestProbe[Event]("evt")
val startedProbe = TestProbe[Event]("started")
val minBackoff = 10.seconds
val strategy = SupervisorStrategy
.restartWithBackoff(minBackoff, minBackoff, 0.0)
val behv = Behaviors.supervise(Behaviors.setup[Command] { _
startedProbe.ref ! Started
targetBehavior(probe.ref)
}).onFailure[Exception](strategy)
val droppedMessagesProbe = TestProbe[Dropped]()
system.toUntyped.eventStream.subscribe(droppedMessagesProbe.ref.toUntyped, classOf[Dropped])
val ref = spawn(behv)
EventFilter[Exc1](occurrences = 1).intercept {
startedProbe.expectMessage(Started)
ref ! Throw(new Exc1)
probe.expectMessage(GotSignal(PreRestart))
}
ref ! Ping
droppedMessagesProbe.expectMessage(Dropped(Ping, ref))
}
"restart after exponential backoff" in {
val probe = TestProbe[Event]("evt")
val startedProbe = TestProbe[Event]("started")
@ -602,7 +685,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
) {
EventFilter[ActorInitializationException](occurrences = 1).intercept {
EventFilter[TE](occurrences = 2).intercept {
EventFilter[TE](occurrences = 1).intercept {
spawn(behv)
// restarted 2 times before it gave up
@ -684,9 +767,9 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
actor ! "give me stacktrace"
val stacktrace = probe.expectMessageType[Vector[StackTraceElement]]
// supervisor receive is used for every supervision instance, only wrapped in one supervisor for RuntimeException
// InterceptorImpl receive is used for every supervision instance, only wrapped in one supervisor for RuntimeException
// and then the IllegalArgument one is kept since it has a different throwable
stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.Supervisor.receive")) should ===(2)
stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.InterceptorImpl.receive")) should ===(2)
}
"replace supervision when new returned behavior catches same exception nested in other behaviors" in {
@ -737,9 +820,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
actor ! "give me stacktrace"
val stacktrace = probe.expectMessageType[Vector[StackTraceElement]]
// supervisor receive is used for every supervision instance, only wrapped in one supervisor for RuntimeException
// and then the IllegalArgument one is kept since it has a different throwable
stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.Supervisor.receive")) should ===(2)
stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.SimpleSupervisor.aroundReceive")) should ===(2)
}
"replace backoff supervision duplicate when behavior is created in a setup" in {

View file

@ -130,6 +130,7 @@ object Behavior {
* that is not necessary.
*/
def same[T]: Behavior[T] = SameBehavior.asInstanceOf[Behavior[T]]
/**
* Return this behavior from message processing in order to advise the
* system to reuse the previous behavior, including the hint that the

View file

@ -4,7 +4,7 @@
package akka.actor.typed
import akka.annotation.DoNotInherit
import akka.annotation.{ DoNotInherit, InternalApi }
/**
* A behavior interceptor allows for intercepting message and signal reception and perform arbitrary logic -
@ -23,7 +23,7 @@ abstract class BehaviorInterceptor[O, I] {
* @return The returned behavior will be the "started" behavior of the actor used to accept
* the next message or signal.
*/
def preStart(ctx: ActorContext[I], target: PreStartTarget[I]): Behavior[I] =
def aroundStart(ctx: ActorContext[O], target: PreStartTarget[I]): Behavior[I] =
target.start(ctx)
/**
@ -72,6 +72,17 @@ object BehaviorInterceptor {
@DoNotInherit
trait ReceiveTarget[T] {
def apply(ctx: ActorContext[_], msg: T): Behavior[T]
/**
* INTERNAL API
*
* Signal that the received message will result in a simulated restart
* by the [[BehaviorInterceptor]]. A [[PreRestart]] will be sent to the
* current behavior but the returned Behavior is ignored as a restart
* is taking place.
*/
@InternalApi
private[akka] def signalRestart(ctx: ActorContext[_]): Unit
}
/**

View file

@ -158,6 +158,8 @@ object SupervisorStrategy {
override def withLoggingEnabled(enabled: Boolean): SupervisorStrategy =
copy(loggingEnabled = enabled)
def unlimitedRestarts(): Boolean = maxNrOfRetries == -1
}
/**

View file

@ -7,7 +7,7 @@ package akka.actor.typed.internal
import akka.actor.typed
import akka.actor.typed.Behavior.{ SameBehavior, UnhandledBehavior }
import akka.actor.typed.internal.TimerSchedulerImpl.TimerMsg
import akka.actor.typed.{ ActorContext, ActorRef, Behavior, BehaviorInterceptor, ExtensibleBehavior, Signal }
import akka.actor.typed.{ ActorContext, ActorRef, Behavior, BehaviorInterceptor, ExtensibleBehavior, PreRestart, Signal }
import akka.annotation.InternalApi
import akka.util.LineNumbers
@ -47,6 +47,9 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce
private val receiveTarget: ReceiveTarget[I] = new ReceiveTarget[I] {
override def apply(ctx: ActorContext[_], msg: I): Behavior[I] =
Behavior.interpretMessage(nestedBehavior, ctx.asInstanceOf[ActorContext[I]], msg)
override def signalRestart(ctx: ActorContext[_]): Unit =
Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[ActorContext[I]], PreRestart)
}
private val signalTarget = new SignalTarget[I] {
@ -56,7 +59,7 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce
// invoked pre-start to start/de-duplicate the initial behavior stack
def preStart(ctx: typed.ActorContext[O]): Behavior[O] = {
val started = interceptor.preStart(ctx.asInstanceOf[ActorContext[I]], preStartTarget)
val started = interceptor.aroundStart(ctx, preStartTarget)
deduplicate(started, ctx)
}

View file

@ -1,388 +0,0 @@
/**
* Copyright (C) 2016-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed
package internal
import java.util.concurrent.ThreadLocalRandom
import akka.actor.DeadLetterSuppression
import akka.actor.typed.SupervisorStrategy._
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.util.{ OptionVal, PrettyDuration }
import scala.concurrent.duration.{ Deadline, FiniteDuration }
import scala.reflect.ClassTag
import scala.util.control.Exception.Catcher
import scala.util.control.NonFatal
/**
* INTERNAL API
*/
@InternalApi private[akka] object Supervisor {
def apply[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], strategy: SupervisorStrategy): Behavior[T] =
Behaviors.setup[T] { ctx
val supervisor: Supervisor[T, Thr] = strategy match {
case Restart(-1, _, loggingEnabled)
new Restarter(initialBehavior, initialBehavior, loggingEnabled)
case r: Restart
new LimitedRestarter(initialBehavior, initialBehavior, r, retries = 0, deadline = OptionVal.None)
case Resume(loggingEnabled) new Resumer(initialBehavior, loggingEnabled)
case Stop(loggingEnabled) new Stopper(initialBehavior, loggingEnabled)
case b: Backoff
val backoffRestarter =
new BackoffRestarter(
initialBehavior.asInstanceOf[Behavior[Any]],
initialBehavior.asInstanceOf[Behavior[Any]],
b, restartCount = 0, blackhole = false)
backoffRestarter
.asInstanceOf[Supervisor[T, Thr]]
}
supervisor.init(ctx)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] abstract class Supervisor[T, Thr <: Throwable: ClassTag] extends ExtensibleBehavior[T] with WrappingBehavior[T, T] {
private[akka] def throwableClass = implicitly[ClassTag[Thr]].runtimeClass
protected def loggingEnabled: Boolean
/**
* Invoked when the actor is created (or re-created on restart) this is where a restarter implementation
* can provide logic for dealing with exceptions thrown when running any actor initialization logic (undeferring).
*
* Note that the logic must take care to not wrap StoppedBehavior to avoid creating zombie behaviors that keep
* running although stopped.
*
* @return The initial behavior of the actor after undeferring if needed
*/
def init(ctx: ActorContext[T]): Behavior[T]
/**
* Current behavior
*/
protected def behavior: Behavior[T]
def nestedBehavior: Behavior[T] = behavior
def replaceNested(newNested: Behavior[T]): Behavior[T] = wrap(newNested, afterException = false)
/**
* Wrap next behavior in a concrete restarter again.
*/
protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Behavior[T]
protected def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Behavior[T]]
protected def restart(ctx: ActorContext[T], initialBehavior: Behavior[T], startedBehavior: Behavior[T]): Behavior[T] = {
try Behavior.interpretSignal(startedBehavior, ctx, PreRestart) catch {
case NonFatal(ex) ctx.asScala.log.error(ex, "failure during PreRestart")
}
wrap(initialBehavior, afterException = true) match {
case s: Supervisor[T, Thr] s.init(ctx)
case b b
}
}
protected final def supervise(nextBehavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] = {
val started = Behavior.start(nextBehavior, ctx)
val throwableAlreadyHandled = Behavior.existsInStack(started) {
case s: Supervisor[T, Thr] if s.throwableClass == throwableClass true
case _ false
}
if (throwableAlreadyHandled) started
else Behavior.wrap[T, T](behavior, started, ctx)(wrap(_, afterException = false))
}
override def receiveSignal(ctx: ActorContext[T], signal: Signal): Behavior[T] = {
try {
val b = Behavior.interpretSignal(behavior, ctx, signal)
supervise(b, ctx)
} catch handleException(ctx, behavior)
}
override def receive(ctx: ActorContext[T], msg: T): Behavior[T] = {
try {
val b = Behavior.interpretMessage(behavior, ctx, msg)
supervise(b, ctx)
} catch handleException(ctx, behavior)
}
protected def log(ctx: ActorContext[T], ex: Thr): Unit = {
if (loggingEnabled)
ctx.asScala.log.error(ex, "Supervisor [{}] saw failure: {}", this, ex.getMessage)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class Resumer[T, Thr <: Throwable: ClassTag](
override val behavior: Behavior[T], override val loggingEnabled: Boolean) extends Supervisor[T, Thr] {
def init(ctx: ActorContext[T]) = {
// no handling of errors for Resume as that could lead to infinite restart-loop
val started = Behavior.validateAsInitial(Behavior.start(behavior, ctx))
if (Behavior.isAlive(started)) wrap(started, afterException = false)
else started
}
override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Behavior[T]] = {
case NonFatal(ex: Thr)
log(ctx, ex)
wrap(startedBehavior, afterException = true)
}
override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Behavior[T] =
new Resumer[T, Thr](nextBehavior, loggingEnabled)
override def toString = "resume"
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class Stopper[T, Thr <: Throwable: ClassTag](
override val behavior: Behavior[T], override val loggingEnabled: Boolean) extends Supervisor[T, Thr] {
def init(ctx: ActorContext[T]): Behavior[T] = {
try {
val started = Behavior.validateAsInitial(Behavior.start(behavior, ctx))
if (Behavior.isAlive(started)) wrap(started, false)
else started
} catch {
case NonFatal(ex: Thr)
log(ctx, ex)
Behavior.stopped
}
}
override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Behavior[T]] = {
case NonFatal(ex: Thr)
log(ctx, ex)
Behaviors.stopped
}
override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Behavior[T] =
new Stopper[T, Thr](nextBehavior, loggingEnabled)
override def toString = "stop"
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class Restarter[T, Thr <: Throwable: ClassTag](
initialBehavior: Behavior[T], override val behavior: Behavior[T],
override val loggingEnabled: Boolean) extends Supervisor[T, Thr] {
override def init(ctx: ActorContext[T]) = {
// no handling of errors for Restart as that could lead to infinite restart-loop
val started = Behavior.validateAsInitial(Behavior.start(behavior, ctx))
if (Behavior.isAlive(started)) wrap(started, afterException = false)
else started
}
override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Behavior[T]] = {
case NonFatal(ex: Thr)
log(ctx, ex)
restart(ctx, initialBehavior, startedBehavior)
}
override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Behavior[T] =
new Restarter[T, Thr](initialBehavior, nextBehavior, loggingEnabled)
override def toString = "restart"
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class LimitedRestarter[T, Thr <: Throwable: ClassTag](
initialBehavior: Behavior[T], override val behavior: Behavior[T],
strategy: Restart, retries: Int, deadline: OptionVal[Deadline]) extends Supervisor[T, Thr] {
override def loggingEnabled: Boolean = strategy.loggingEnabled
override def init(ctx: ActorContext[T]) =
try {
val started = Behavior.validateAsInitial(Behavior.start(behavior, ctx))
if (Behavior.isAlive(started)) wrap(started, afterException = false)
else started
} catch {
case NonFatal(ex: Thr)
log(ctx, ex)
// we haven't actually wrapped and increased retries yet, so need to compare with +1
if (deadlineHasTimeLeft && (retries + 1) >= strategy.maxNrOfRetries) throw ex
else {
wrap(initialBehavior, afterException = true) match {
case s: Supervisor[T, Thr] s.init(ctx)
case b b
}
}
}
private def deadlineHasTimeLeft: Boolean = deadline match {
case OptionVal.None true
case OptionVal.Some(d) d.hasTimeLeft
}
override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Behavior[T]] = {
case NonFatal(ex: Thr)
log(ctx, ex)
if (deadlineHasTimeLeft && retries >= strategy.maxNrOfRetries)
throw ex
else
restart(ctx, initialBehavior, startedBehavior)
}
override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Behavior[T] = {
val restarter = if (afterException) {
val timeLeft = deadlineHasTimeLeft
val newRetries = if (timeLeft) retries + 1 else 1
val newDeadline = if (deadline.isDefined && timeLeft) deadline else OptionVal.Some(Deadline.now + strategy.withinTimeRange)
new LimitedRestarter[T, Thr](initialBehavior, nextBehavior, strategy, newRetries, newDeadline)
} else
new LimitedRestarter[T, Thr](initialBehavior, nextBehavior, strategy, retries, deadline)
restarter
}
override def toString = s"restartWithLimit(${strategy.maxNrOfRetries}, ${PrettyDuration.format(strategy.withinTimeRange)})"
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object BackoffRestarter {
/**
* Calculates an exponential back off delay.
*/
def calculateDelay(
restartCount: Int,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double): FiniteDuration = {
val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor
if (restartCount >= 30) // Duration overflow protection (> 100 years)
maxBackoff
else
maxBackoff.min(minBackoff * math.pow(2, restartCount)) * rnd match {
case f: FiniteDuration f
case _ maxBackoff
}
}
case object ScheduledRestart
final case class ResetRestartCount(current: Int) extends DeadLetterSuppression
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class BackoffRestarter[T, Thr <: Throwable: ClassTag](
initialBehavior: Behavior[Any], override val behavior: Behavior[Any],
strategy: Backoff, restartCount: Int, blackhole: Boolean) extends Supervisor[Any, Thr] {
// TODO using Any here because the scheduled messages can't be of type T.
import BackoffRestarter._
override def loggingEnabled: Boolean = strategy.loggingEnabled
// FIXME weird hack here to avoid having any Behavior.start call start the supervised behavior early when we are backing off
// maybe we can solve this in a less strange way?
override def nestedBehavior: Behavior[Any] =
if (blackhole) {
// if we are currently backing off, we don't want someone outside to start any deferred behaviors
Behaviors.empty
} else {
behavior
}
override def replaceNested(newNested: Behavior[Any]): Behavior[Any] = {
if (blackhole) {
// if we are currently backing off, we don't want someone outside to replace our inner behavior
this
} else {
super.replaceNested(newNested)
}
}
def init(ctx: ActorContext[Any]) =
try {
val started = Behavior.validateAsInitial(Behavior.start(initialBehavior, ctx))
if (Behavior.isAlive(started)) wrap(started, afterException = false)
else started
} catch {
case NonFatal(ex: Thr)
log(ctx, ex)
val restartDelay = calculateDelay(restartCount, strategy.minBackoff, strategy.maxBackoff, strategy.randomFactor)
ctx.asScala.schedule(restartDelay, ctx.asScala.self, ScheduledRestart)
new BackoffRestarter[T, Thr](initialBehavior, initialBehavior, strategy, restartCount + 1, blackhole = true)
}
override def receiveSignal(ctx: ActorContext[Any], signal: Signal): Behavior[Any] = {
if (blackhole) {
import scaladsl.adapter._
ctx.asScala.system.toUntyped.eventStream.publish(Dropped(signal, ctx.asScala.self))
Behavior.same
} else
super.receiveSignal(ctx, signal)
}
override def receive(ctx: ActorContext[Any], msg: Any): Behavior[Any] = {
// intercept the scheduled messages and drop incoming messages if we are in backoff mode
msg match {
case ScheduledRestart
// actual restart after scheduled backoff delay
ctx.asScala.schedule(strategy.resetBackoffAfter, ctx.asScala.self, ResetRestartCount(restartCount))
new BackoffRestarter[T, Thr](initialBehavior, initialBehavior, strategy, restartCount, blackhole = false).init(ctx)
case ResetRestartCount(current)
if (current == restartCount)
new BackoffRestarter[T, Thr](initialBehavior, behavior, strategy, restartCount = 0, blackhole)
else
Behavior.same
case _
if (blackhole) {
import scaladsl.adapter._
ctx.asScala.system.toUntyped.eventStream.publish(Dropped(msg, ctx.asScala.self))
Behavior.same
} else
super.receive(ctx, msg)
}
}
override def handleException(ctx: ActorContext[Any], startedBehavior: Behavior[Any]): Catcher[Supervisor[Any, Thr]] = {
case NonFatal(ex: Thr)
log(ctx, ex)
// actual restart happens after the scheduled backoff delay
try Behavior.interpretSignal(behavior, ctx, PreRestart) catch {
case NonFatal(ex2) ctx.asScala.log.error(ex2, "failure during PreRestart")
}
val restartDelay = calculateDelay(restartCount, strategy.minBackoff, strategy.maxBackoff, strategy.randomFactor)
ctx.asScala.schedule(restartDelay, ctx.asScala.self, ScheduledRestart)
new BackoffRestarter[T, Thr](initialBehavior, startedBehavior, strategy, restartCount + 1, blackhole = true)
}
override protected def wrap(nextBehavior: Behavior[Any], afterException: Boolean): Behavior[Any] = {
if (afterException)
throw new IllegalStateException("wrap not expected afterException in BackoffRestarter")
else
new BackoffRestarter[T, Thr](initialBehavior, nextBehavior, strategy, restartCount, blackhole)
}
override def toString = s"restartWithBackoff(${PrettyDuration.format(strategy.minBackoff)}, ${PrettyDuration.format(strategy.maxBackoff)}, ${strategy.randomFactor})"
}

View file

@ -0,0 +1,286 @@
/**
* Copyright (C) 2016-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed
package internal
import java.util.concurrent.ThreadLocalRandom
import akka.actor.DeadLetterSuppression
import akka.actor.typed.BehaviorInterceptor.SignalTarget
import akka.actor.typed.SupervisorStrategy._
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.util.OptionVal
import scala.concurrent.duration.{ Deadline, FiniteDuration }
import scala.reflect.ClassTag
import scala.util.control.Exception.Catcher
import scala.util.control.NonFatal
/**
* INTERNAL API
*/
@InternalApi private[akka] object Supervisor {
def apply[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], strategy: SupervisorStrategy): Behavior[T] = {
strategy match {
case r: Resume
Behaviors.intercept[T, T](new ResumeSupervisor(r))(initialBehavior)
case r: Restart
Behaviors.intercept[T, T](new RestartSupervisor(initialBehavior, r))(initialBehavior)
case r: Stop
Behaviors.intercept[T, T](new StopSupervisor(initialBehavior, r))(initialBehavior)
case r: Backoff
Behaviors.intercept[AnyRef, T](new BackoffSupervisor(initialBehavior, r))(initialBehavior).asInstanceOf[Behavior[T]]
}
}
}
/**
* INTERNAL API
*/
@InternalApi
private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: SupervisorStrategy)(implicit ev: ClassTag[Thr]) extends BehaviorInterceptor[O, I] {
private val throwableClass = implicitly[ClassTag[Thr]].runtimeClass
override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = {
other match {
case as: AbstractSupervisor[_, _, Thr] if throwableClass == as.throwableClass true
case _ false
}
}
override def aroundStart(ctx: ActorContext[O], target: BehaviorInterceptor.PreStartTarget[I]): Behavior[I] = {
try {
target.start(ctx)
} catch handleExceptionOnStart(ctx)
}
def aroundSignal(ctx: ActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] = {
try {
target(ctx, signal)
} catch handleSignalException(ctx, target)
}
def log(ctx: ActorContext[_], t: Throwable): Unit = {
if (strategy.loggingEnabled) {
ctx.asScala.log.error(t, "Supervisor {} saw failure: {}", this, t.getMessage)
}
}
protected def handleExceptionOnStart(ctx: ActorContext[O]): Catcher[Behavior[I]]
protected def handleSignalException(ctx: ActorContext[O], target: BehaviorInterceptor.SignalTarget[I]): Catcher[Behavior[I]]
protected def handleReceiveException(ctx: ActorContext[O], target: BehaviorInterceptor.ReceiveTarget[I]): Catcher[Behavior[I]]
}
/**
* For cases where O == I for BehaviorInterceptor.
*/
private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: SupervisorStrategy) extends AbstractSupervisor[T, T, Thr](ss) {
override def aroundReceive(ctx: ActorContext[T], msg: T, target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = {
try {
target(ctx, msg)
} catch handleReceiveException(ctx, target)
}
protected def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = {
case NonFatal(_: Thr)
Behaviors.stopped
}
// convenience if target not required to handle exception
protected def handleExceptionOnStart(ctx: ActorContext[T]): Catcher[Behavior[T]] =
handleException(ctx)
protected def handleSignalException(ctx: ActorContext[T], target: BehaviorInterceptor.SignalTarget[T]): Catcher[Behavior[T]] =
handleException(ctx)
protected def handleReceiveException(ctx: ActorContext[T], target: BehaviorInterceptor.ReceiveTarget[T]): Catcher[Behavior[T]] =
handleException(ctx)
}
private class StopSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: Stop) extends SimpleSupervisor[T, Thr](strategy) {
override def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = {
case NonFatal(t: Thr)
log(ctx, t)
Behaviors.stopped
}
}
private class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extends SimpleSupervisor[T, Thr](ss) {
override protected def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = {
case NonFatal(t: Thr)
log(ctx, t)
Behaviors.same
}
}
private class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strategy: Restart)(implicit ev: ClassTag[Thr]) extends SimpleSupervisor[T, Thr](strategy) {
private var restarts = 0
private var deadline: OptionVal[Deadline] = OptionVal.None
private def deadlineHasTimeLeft: Boolean = deadline match {
case OptionVal.None true
case OptionVal.Some(d) d.hasTimeLeft
}
override def aroundStart(ctx: ActorContext[T], target: BehaviorInterceptor.PreStartTarget[T]): Behavior[T] = {
try {
target.start(ctx)
} catch {
case NonFatal(t: Thr)
// if unlimited restarts then don't restart if starting fails as it would likely be an infinite restart loop
if (strategy.unlimitedRestarts() || ((restarts + 1) >= strategy.maxNrOfRetries && deadlineHasTimeLeft)) {
// don't log here as it'll be logged as ActorInitializationException
throw t
} else {
log(ctx, t)
restart(ctx, t)
aroundStart(ctx, target)
}
}
}
private def restart(ctx: ActorContext[_], t: Throwable) = {
val timeLeft = deadlineHasTimeLeft
val newDeadline = if (deadline.isDefined && timeLeft) deadline else OptionVal.Some(Deadline.now + strategy.withinTimeRange)
restarts = if (timeLeft) restarts + 1 else 1
deadline = newDeadline
}
private def handleException(ctx: ActorContext[T], signalRestart: () Unit): Catcher[Behavior[T]] = {
case NonFatal(t: Thr)
if (strategy.maxNrOfRetries != -1 && restarts >= strategy.maxNrOfRetries && deadlineHasTimeLeft) {
throw t
} else {
try {
signalRestart()
} catch {
case NonFatal(ex) ctx.asScala.log.error(ex, "failure during PreRestart")
}
log(ctx, t)
restart(ctx, t)
Behavior.validateAsInitial(Behavior.start(initial, ctx))
}
}
override protected def handleSignalException(ctx: ActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = {
handleException(ctx, () target(ctx, PreRestart))
}
override protected def handleReceiveException(ctx: ActorContext[T], target: BehaviorInterceptor.ReceiveTarget[T]): Catcher[Behavior[T]] = {
handleException(ctx, () target.signalRestart(ctx))
}
}
private class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], b: Backoff) extends AbstractSupervisor[AnyRef, T, Thr](b) {
import BackoffSupervisor._
var blackhole = false
var restartCount: Int = 0
override def aroundSignal(ctx: ActorContext[AnyRef], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
if (blackhole) {
import akka.actor.typed.scaladsl.adapter._
ctx.asScala.system.toUntyped.eventStream.publish(Dropped(signal, ctx.asScala.self))
Behaviors.same
} else {
super.aroundSignal(ctx, signal, target)
}
}
override def aroundReceive(ctx: ActorContext[AnyRef], msg: AnyRef, target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = {
try {
msg match {
case ScheduledRestart
blackhole = false
ctx.asScala.schedule(b.resetBackoffAfter, ctx.asScala.self, ResetRestartCount(restartCount))
try {
Behavior.validateAsInitial(Behavior.start(initial, ctx.asInstanceOf[ActorContext[T]]))
} catch {
case NonFatal(ex: Thr)
log(ctx, ex)
val restartDelay = BackoffSupervisor.calculateDelay(restartCount, b.minBackoff, b.maxBackoff, b.randomFactor)
ctx.asScala.schedule(restartDelay, ctx.asScala.self, ScheduledRestart)
restartCount += 1
blackhole = true
Behaviors.empty
}
case ResetRestartCount(current)
if (current == restartCount) {
restartCount = 0
}
Behavior.same
case _
if (blackhole) {
import akka.actor.typed.scaladsl.adapter._
ctx.asScala.system.toUntyped.eventStream.publish(Dropped(msg, ctx.asScala.self))
Behaviors.same
} else {
target(ctx, msg.asInstanceOf[T])
}
}
} catch handleReceiveException(ctx, target)
}
protected def handleExceptionOnStart(ctx: ActorContext[AnyRef]): Catcher[Behavior[T]] = {
case NonFatal(t: Thr)
scheduleRestart(ctx, t)
}
protected def handleReceiveException(ctx: akka.actor.typed.ActorContext[AnyRef], target: BehaviorInterceptor.ReceiveTarget[T]): util.control.Exception.Catcher[akka.actor.typed.Behavior[T]] = {
case NonFatal(t: Thr)
try {
target.signalRestart(ctx)
} catch {
case NonFatal(ex) ctx.asScala.log.error(ex, "failure during PreRestart")
}
scheduleRestart(ctx, t)
}
protected def handleSignalException(ctx: ActorContext[AnyRef], target: BehaviorInterceptor.SignalTarget[T]): Catcher[akka.actor.typed.Behavior[T]] = {
case NonFatal(t: Thr)
try {
target(ctx, PreRestart)
} catch {
case NonFatal(ex) ctx.asScala.log.error(ex, "failure during PreRestart")
}
scheduleRestart(ctx, t)
}
private def scheduleRestart(ctx: ActorContext[AnyRef], reason: Throwable): Behavior[T] = {
log(ctx, reason)
val restartDelay = calculateDelay(restartCount, b.minBackoff, b.maxBackoff, b.randomFactor)
ctx.asScala.schedule(restartDelay, ctx.asScala.self, ScheduledRestart)
restartCount += 1
blackhole = true
Behaviors.empty
}
}
private object BackoffSupervisor {
/**
* Calculates an exponential back off delay.
*/
def calculateDelay(
restartCount: Int,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double): FiniteDuration = {
val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor
if (restartCount >= 30) // Duration overflow protection (> 100 years)
maxBackoff
else
maxBackoff.min(minBackoff * math.pow(2, restartCount)) * rnd match {
case f: FiniteDuration f
case _ maxBackoff
}
}
case object ScheduledRestart
final case class ResetRestartCount(current: Int) extends DeadLetterSuppression
}

View file

@ -38,7 +38,7 @@ import scala.collection.immutable.HashMap
import BehaviorInterceptor._
override def preStart(ctx: ActorContext[T], target: PreStartTarget[T]): Behavior[T] = {
override def aroundStart(ctx: ActorContext[T], target: PreStartTarget[T]): Behavior[T] = {
// when declaring we expect the outermost to win
// for example with
// val behavior = ...

View file

@ -16,9 +16,6 @@ import scala.reflect.ClassTag
@ApiMayChange
object Behaviors {
private val _unitFunction = (_: ActorContext[Any], _: Any) ()
private def unitFunction[T] = _unitFunction.asInstanceOf[((ActorContext[T], Signal) Unit)]
/**
* `setup` is a factory for a behavior. Creation of the behavior instance is deferred until
* the actor is started, as opposed to [[Behaviors.receive]] that creates the behavior instance

View file

@ -170,7 +170,7 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA
"stop when sent a poison pill" in {
EventFilter[ActorKilledException]() intercept {
val a = TestActorRef(Props[WorkerActor])
val forwarder = system.actorOf(Props(new Actor {
system.actorOf(Props(new Actor {
context.watch(a)
def receive = {
case t: Terminated testActor forward WrappedTerminated(t)