This commit is contained in:
Christopher Batey 2018-09-19 18:33:12 +01:00
parent bf3c11464e
commit 85a4597e4a
7 changed files with 254 additions and 37 deletions

View file

@ -15,7 +15,7 @@ import scala.util.control.NoStackTrace
/**
* Exception without stack trace to use for verifying exceptions in tests
*/
final case class TE(message: String) extends RuntimeException(message) with NoStackTrace
final case class TE(message: String) extends RuntimeException(message)
object TestKitSettings {
/**

View file

@ -255,6 +255,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
// FIXME eventfilter support in typed testkit
import scaladsl.adapter._
implicit val untypedSystem = system.toUntyped
class FailingConstructorTestSetup(failCount: Int) {
@ -315,6 +316,18 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
}
}
"stop when strategy is stop - exception in setup" in {
val probe = TestProbe[Event]("evt")
val failedSetup = Behaviors.setup[Command](_ {
throw new Exc3()
targetBehavior(probe.ref)
})
val behv = Behaviors.supervise(failedSetup).onFailure[Throwable](SupervisorStrategy.stop)
EventFilter[Exc3](occurrences = 1).intercept {
spawn(behv)
}
}
"support nesting exceptions with different strategies" in {
val probe = TestProbe[Event]("evt")
val behv =
@ -374,6 +387,27 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
probe.expectMessage(State(0, Map.empty))
}
"stop when restart limit is hit" in {
val probe = TestProbe[Event]("evt")
val behv = Behaviors.supervise(targetBehavior(probe.ref))
.onFailure[Exc1](SupervisorStrategy.restartWithLimit(2, 1.minute))
val ref = spawn(behv)
ref ! IncrementState
ref ! GetState
probe.expectMessage(State(1, Map.empty))
EventFilter[Exc2](occurrences = 3).intercept {
ref ! Throw(new Exc2)
probe.expectMessage(GotSignal(PreRestart))
ref ! Throw(new Exc2)
probe.expectMessage(GotSignal(PreRestart))
ref ! Throw(new Exc2)
probe.expectMessage(GotSignal(PostStop))
}
ref ! GetState
probe.expectNoMessage()
}
"NOT stop children when restarting" in {
val parentProbe = TestProbe[Event]("evt")
val behv = Behaviors.supervise(targetBehavior(parentProbe.ref))
@ -684,9 +718,9 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
actor ! "give me stacktrace"
val stacktrace = probe.expectMessageType[Vector[StackTraceElement]]
// supervisor receive is used for every supervision instance, only wrapped in one supervisor for RuntimeException
// InterceptorImpl receive is used for every supervision instance, only wrapped in one supervisor for RuntimeException
// and then the IllegalArgument one is kept since it has a different throwable
stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.Supervisor.receive")) should ===(2)
stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.InterceptorImpl.receive")) should ===(2)
}
"replace supervision when new returned behavior catches same exception nested in other behaviors" in {
@ -737,9 +771,8 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
actor ! "give me stacktrace"
val stacktrace = probe.expectMessageType[Vector[StackTraceElement]]
// supervisor receive is used for every supervision instance, only wrapped in one supervisor for RuntimeException
// and then the IllegalArgument one is kept since it has a different throwable
stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.Supervisor.receive")) should ===(2)
stacktrace.foreach(println)
stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.SimpleSupervisor.aroundReceive")) should ===(2)
}
"replace backoff supervision duplicate when behavior is created in a setup" in {

View file

@ -130,6 +130,7 @@ object Behavior {
* that is not necessary.
*/
def same[T]: Behavior[T] = SameBehavior.asInstanceOf[Behavior[T]]
/**
* Return this behavior from message processing in order to advise the
* system to reuse the previous behavior, including the hint that the

View file

@ -23,7 +23,7 @@ abstract class BehaviorInterceptor[O, I] {
* @return The returned behavior will be the "started" behavior of the actor used to accept
* the next message or signal.
*/
def preStart(ctx: ActorContext[I], target: PreStartTarget[I]): Behavior[I] =
def preStart(ctx: ActorContext[O], target: PreStartTarget[I]): Behavior[I] =
target.start(ctx)
/**
@ -72,6 +72,8 @@ object BehaviorInterceptor {
@DoNotInherit
trait ReceiveTarget[T] {
def apply(ctx: ActorContext[_], msg: T): Behavior[T]
def current(): Behavior[T]
def signal(ctx: ActorContext[_], signal: Signal): Behavior[T]
}
/**

View file

@ -47,6 +47,11 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce
private val receiveTarget: ReceiveTarget[I] = new ReceiveTarget[I] {
override def apply(ctx: ActorContext[_], msg: I): Behavior[I] =
Behavior.interpretMessage(nestedBehavior, ctx.asInstanceOf[ActorContext[I]], msg)
override def signal(ctx: ActorContext[_], signal: Signal): Behavior[I] =
Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[ActorContext[I]], signal)
override def current(): Behavior[I] = nestedBehavior
}
private val signalTarget = new SignalTarget[I] {
@ -56,7 +61,7 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce
// invoked pre-start to start/de-duplicate the initial behavior stack
def preStart(ctx: typed.ActorContext[O]): Behavior[O] = {
val started = interceptor.preStart(ctx.asInstanceOf[ActorContext[I]], preStartTarget)
val started = interceptor.preStart(ctx, preStartTarget)
deduplicate(started, ctx)
}

View file

@ -8,7 +8,9 @@ package internal
import java.util.concurrent.ThreadLocalRandom
import akka.actor.DeadLetterSuppression
import akka.actor.typed.BehaviorInterceptor.{ ReceiveTarget, SignalTarget }
import akka.actor.typed.SupervisorStrategy._
import akka.actor.typed.internal.BackoffRestarter.ScheduledRestart
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.util.{ OptionVal, PrettyDuration }
@ -17,6 +19,7 @@ import scala.concurrent.duration.{ Deadline, FiniteDuration }
import scala.reflect.ClassTag
import scala.util.control.Exception.Catcher
import scala.util.control.NonFatal
import scala.concurrent.duration._
/**
* INTERNAL API
@ -29,8 +32,7 @@ import scala.util.control.NonFatal
new Restarter(initialBehavior, initialBehavior, loggingEnabled)
case r: Restart
new LimitedRestarter(initialBehavior, initialBehavior, r, retries = 0, deadline = OptionVal.None)
case Resume(loggingEnabled) new Resumer(initialBehavior, loggingEnabled)
case Stop(loggingEnabled) new Stopper(initialBehavior, loggingEnabled)
case Stop(loggingEnabled) new Stopper(initialBehavior, loggingEnabled)
case b: Backoff
val backoffRestarter =
new BackoffRestarter(
@ -46,6 +48,195 @@ import scala.util.control.NonFatal
}
abstract class AbstractSupervisor[O, I, Thr <: Throwable](ss: SupervisorStrategy)(implicit ev: ClassTag[Thr]) extends BehaviorInterceptor[O, I] {
private val throwableClass = implicitly[ClassTag[Thr]].runtimeClass
override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = {
other match {
case as: AbstractSupervisor[_, _, Thr] if throwableClass == as.throwableClass true
case _ false
}
}
override def preStart(ctx: ActorContext[O], target: BehaviorInterceptor.PreStartTarget[I]): Behavior[I] = {
try {
target.start(ctx)
} catch handleExceptionOnStart(ctx)
}
def aroundSignal(ctx: ActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] = {
try {
target(ctx, signal)
} catch handleSignalException(ctx, target)
}
def log(ctx: ActorContext[_], t: Throwable): Unit = {
if (ss.loggingEnabled) {
ctx.asScala.log.error(t, "Supervisor [{}] saw failure: {}", this, t.getMessage)
}
}
protected def handleExceptionOnStart(ctx: ActorContext[O]): Catcher[Behavior[I]]
protected def handleSignalException(ctx: ActorContext[O], target: BehaviorInterceptor.SignalTarget[I]): Catcher[Behavior[I]]
protected def handleReceiveException(ctx: ActorContext[O], target: BehaviorInterceptor.ReceiveTarget[I]): Catcher[Behavior[I]]
}
/**
* For cases where O == I for BehaviorInterceptor.
*/
abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: SupervisorStrategy) extends AbstractSupervisor[T, T, Thr](ss) {
override def aroundReceive(ctx: ActorContext[T], msg: T, target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = {
try {
target(ctx, msg)
} catch handleReceiveException(ctx, target)
}
protected def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = {
case NonFatal(_: Thr)
Behaviors.stopped
}
// convenience if target not required to handle exception
protected def handleExceptionOnStart(ctx: ActorContext[T]): Catcher[Behavior[T]] =
handleException(ctx)
protected def handleSignalException(ctx: ActorContext[T], target: BehaviorInterceptor.SignalTarget[T]): Catcher[Behavior[T]] =
handleException(ctx)
protected def handleReceiveException(ctx: ActorContext[T], target: BehaviorInterceptor.ReceiveTarget[T]): Catcher[Behavior[T]] =
handleException(ctx)
}
class StopSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: Stop) extends SimpleSupervisor[T, Thr](strategy) {
override def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = {
case NonFatal(t: Thr)
log(ctx, t)
Behaviors.stopped
}
}
class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extends SimpleSupervisor[T, Thr](ss) {
override protected def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = {
case NonFatal(t: Thr)
log(ctx, t)
Behaviors.same
}
}
// FIXME tests + impl of resetting time
class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strategy: Restart)(implicit ev: ClassTag[Thr]) extends SimpleSupervisor[T, Thr](strategy) {
var restarts = 0
override def preStart(ctx: ActorContext[T], target: BehaviorInterceptor.PreStartTarget[T]): Behavior[T] = {
try {
target.start(ctx)
} catch {
case NonFatal(t: Thr)
log(ctx, t)
restarts += 1
// if unlimited restarts then don't restart if starting fails as it would likely be an infinite restart loop
if (restarts != strategy.maxNrOfRetries && strategy.maxNrOfRetries != -1) {
preStart(ctx, target)
} else {
throw t
}
}
}
private def handleException(ctx: ActorContext[T], signalTarget: Signal Behavior[T]): Catcher[Behavior[T]] = {
case NonFatal(t: Thr)
log(ctx, t)
restarts += 1
signalTarget(PreRestart)
if (restarts != strategy.maxNrOfRetries) {
// TODO what about exceptions here?
Behavior.validateAsInitial(Behavior.start(initial, ctx))
} else {
throw t
}
}
override protected def handleSignalException(ctx: ActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = {
handleException(ctx, s target(ctx, s))
}
override protected def handleReceiveException(ctx: ActorContext[T], target: BehaviorInterceptor.ReceiveTarget[T]): Catcher[Behavior[T]] = {
handleException(ctx, s target.signal(ctx, s))
}
}
class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], b: Backoff) extends AbstractSupervisor[AnyRef, T, Thr](b) {
import BackoffRestarter._
var blackhole = false
var restartCount: Int = 0
override def aroundReceive(ctx: ActorContext[AnyRef], msg: AnyRef, target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = {
try {
msg match {
case ScheduledRestart
blackhole = false
// TODO do we need to start it?
ctx.asScala.log.info("Scheduled restart")
ctx.asScala.schedule(b.resetBackoffAfter, ctx.asScala.self, ResetRestartCount(restartCount))
try {
Behavior.validateAsInitial(Behavior.start(initial, ctx.asInstanceOf[ActorContext[T]]))
} catch {
case NonFatal(ex: Thr)
val restartDelay = calculateDelay(restartCount, b.minBackoff, b.maxBackoff, b.randomFactor)
ctx.asScala.log.info("Failure during initialisation")
ctx.asScala.schedule(restartDelay, ctx.asScala.self, ScheduledRestart)
restartCount += 1
blackhole = true
Behaviors.empty
}
case ResetRestartCount(current)
println("Reset restart count: " + current)
if (current == restartCount) {
println("Resetting")
restartCount = 0
}
Behavior.same
case _
// TODO publish dropped message
target(ctx, msg.asInstanceOf[T])
}
} catch handleReceiveException(ctx, target)
}
protected def handleExceptionOnStart(ctx: ActorContext[AnyRef]): Catcher[akka.actor.typed.Behavior[T]] = {
case NonFatal(t: Thr)
log(ctx, t)
scheduleRestart(ctx)
}
protected def handleReceiveException(ctx: akka.actor.typed.ActorContext[AnyRef], target: BehaviorInterceptor.ReceiveTarget[T]): util.control.Exception.Catcher[akka.actor.typed.Behavior[T]] = {
case NonFatal(t: Thr)
target.signal(ctx, PreRestart)
log(ctx, t)
scheduleRestart(ctx)
}
protected def handleSignalException(ctx: ActorContext[AnyRef], target: BehaviorInterceptor.SignalTarget[T]): Catcher[akka.actor.typed.Behavior[T]] = {
case NonFatal(t: Thr)
target(ctx, PreRestart)
log(ctx, t)
scheduleRestart(ctx)
}
private def scheduleRestart(ctx: ActorContext[AnyRef]): Behavior[T] = {
val restartDelay = calculateDelay(restartCount, b.minBackoff, b.maxBackoff, b.randomFactor)
ctx.asScala.schedule(restartDelay, ctx.asScala.self, ScheduledRestart)
restartCount += 1
blackhole = true
Behaviors.empty
}
}
/**
* INTERNAL API
*/
@ -123,32 +314,6 @@ import scala.util.control.NonFatal
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class Resumer[T, Thr <: Throwable: ClassTag](
override val behavior: Behavior[T], override val loggingEnabled: Boolean) extends Supervisor[T, Thr] {
def init(ctx: ActorContext[T]) = {
// no handling of errors for Resume as that could lead to infinite restart-loop
val started = Behavior.validateAsInitial(Behavior.start(behavior, ctx))
if (Behavior.isAlive(started)) wrap(started, afterException = false)
else started
}
override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Behavior[T]] = {
case NonFatal(ex: Thr)
log(ctx, ex)
wrap(startedBehavior, afterException = true)
}
override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Behavior[T] =
new Resumer[T, Thr](nextBehavior, loggingEnabled)
override def toString = "resume"
}
/**
* INTERNAL API
*/

View file

@ -5,6 +5,7 @@
package akka.actor.typed
package scaladsl
import akka.actor.typed.SupervisorStrategy.{ Backoff, Restart, Resume, Stop }
import akka.annotation.{ ApiMayChange, DoNotInherit, InternalApi }
import akka.actor.typed.internal._
@ -187,12 +188,22 @@ object Behaviors {
private final val NothingClassTag = ClassTag(classOf[Nothing])
private final val ThrowableClassTag = ClassTag(classOf[Throwable])
final class Supervise[T] private[akka] (val wrapped: Behavior[T]) extends AnyVal {
/** Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws. */
def onFailure[Thr <: Throwable: ClassTag](strategy: SupervisorStrategy): Behavior[T] = {
val tag = implicitly[ClassTag[Thr]]
val effectiveTag = if (tag == NothingClassTag) ThrowableClassTag else tag
Supervisor(Behavior.validateAsInitial(wrapped), strategy)(effectiveTag)
strategy match {
case r: Resume
Behaviors.intercept[T, T](new ResumeSupervisor(r)(effectiveTag))(wrapped)
case r: Restart
Behaviors.intercept[T, T](new RestartSupervisor(wrapped, r)(effectiveTag))(wrapped)
case r: Stop
Behaviors.intercept[T, T](new StopSupervisor(wrapped, r)(effectiveTag))(wrapped)
case r: Backoff
Behaviors.intercept[AnyRef, T](new BackoffSupervisor(wrapped, r)(effectiveTag))(wrapped).asInstanceOf[Behavior[T]]
}
}
}