Reset timeout for fixed restarts

This commit is contained in:
Christopher Batey 2018-09-20 08:32:56 +01:00
parent 85a4597e4a
commit ea454a733d
8 changed files with 65 additions and 30 deletions

View file

@ -6,7 +6,7 @@ package akka.actor.testkit.typed.scaladsl
import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props } import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props }
import akka.annotation.{ ApiMayChange, InternalApi } import akka.annotation.ApiMayChange
import akka.actor.testkit.typed.TestKitSettings import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.internal.{ ActorTestKitGuardian, TestKitUtils } import akka.actor.testkit.typed.internal.{ ActorTestKitGuardian, TestKitUtils }

View file

@ -176,7 +176,7 @@ class InterceptSpec extends ScalaTestWithActorTestKit(
"allow an interceptor to replace started behavior" in { "allow an interceptor to replace started behavior" in {
val interceptor = new BehaviorInterceptor[String, String] { val interceptor = new BehaviorInterceptor[String, String] {
override def preStart(ctx: ActorContext[String], target: PreStartTarget[String]): Behavior[String] = { override def aroundStart(ctx: ActorContext[String], target: PreStartTarget[String]): Behavior[String] = {
Behaviors.stopped Behaviors.stopped
} }

View file

@ -389,8 +389,9 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
"stop when restart limit is hit" in { "stop when restart limit is hit" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val resetTimeout = 500.millis
val behv = Behaviors.supervise(targetBehavior(probe.ref)) val behv = Behaviors.supervise(targetBehavior(probe.ref))
.onFailure[Exc1](SupervisorStrategy.restartWithLimit(2, 1.minute)) .onFailure[Exc1](SupervisorStrategy.restartWithLimit(2, resetTimeout))
val ref = spawn(behv) val ref = spawn(behv)
ref ! IncrementState ref ! IncrementState
ref ! GetState ref ! GetState
@ -408,6 +409,31 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
probe.expectNoMessage() probe.expectNoMessage()
} }
"reset fixed limit after timeout" in {
val probe = TestProbe[Event]("evt")
val resetTimeout = 500.millis
val behv = Behaviors.supervise(targetBehavior(probe.ref))
.onFailure[Exc1](SupervisorStrategy.restartWithLimit(2, resetTimeout))
val ref = spawn(behv)
ref ! IncrementState
ref ! GetState
probe.expectMessage(State(1, Map.empty))
EventFilter[Exc2](occurrences = 3).intercept {
ref ! Throw(new Exc2)
probe.expectMessage(GotSignal(PreRestart))
ref ! Throw(new Exc2)
probe.expectMessage(GotSignal(PreRestart))
probe.expectNoMessage(resetTimeout + 50.millis)
ref ! Throw(new Exc2)
probe.expectMessage(GotSignal(PreRestart))
}
ref ! GetState
probe.expectMessage(State(0, Map.empty))
}
"NOT stop children when restarting" in { "NOT stop children when restarting" in {
val parentProbe = TestProbe[Event]("evt") val parentProbe = TestProbe[Event]("evt")
val behv = Behaviors.supervise(targetBehavior(parentProbe.ref)) val behv = Behaviors.supervise(targetBehavior(parentProbe.ref))

View file

@ -23,7 +23,7 @@ abstract class BehaviorInterceptor[O, I] {
* @return The returned behavior will be the "started" behavior of the actor used to accept * @return The returned behavior will be the "started" behavior of the actor used to accept
* the next message or signal. * the next message or signal.
*/ */
def preStart(ctx: ActorContext[O], target: PreStartTarget[I]): Behavior[I] = def aroundStart(ctx: ActorContext[O], target: PreStartTarget[I]): Behavior[I] =
target.start(ctx) target.start(ctx)
/** /**
@ -72,7 +72,6 @@ object BehaviorInterceptor {
@DoNotInherit @DoNotInherit
trait ReceiveTarget[T] { trait ReceiveTarget[T] {
def apply(ctx: ActorContext[_], msg: T): Behavior[T] def apply(ctx: ActorContext[_], msg: T): Behavior[T]
def current(): Behavior[T]
def signal(ctx: ActorContext[_], signal: Signal): Behavior[T] def signal(ctx: ActorContext[_], signal: Signal): Behavior[T]
} }

View file

@ -50,8 +50,6 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce
override def signal(ctx: ActorContext[_], signal: Signal): Behavior[I] = override def signal(ctx: ActorContext[_], signal: Signal): Behavior[I] =
Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[ActorContext[I]], signal) Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[ActorContext[I]], signal)
override def current(): Behavior[I] = nestedBehavior
} }
private val signalTarget = new SignalTarget[I] { private val signalTarget = new SignalTarget[I] {
@ -61,7 +59,7 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce
// invoked pre-start to start/de-duplicate the initial behavior stack // invoked pre-start to start/de-duplicate the initial behavior stack
def preStart(ctx: typed.ActorContext[O]): Behavior[O] = { def preStart(ctx: typed.ActorContext[O]): Behavior[O] = {
val started = interceptor.preStart(ctx, preStartTarget) val started = interceptor.aroundStart(ctx, preStartTarget)
deduplicate(started, ctx) deduplicate(started, ctx)
} }

View file

@ -8,9 +8,8 @@ package internal
import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.ThreadLocalRandom
import akka.actor.DeadLetterSuppression import akka.actor.DeadLetterSuppression
import akka.actor.typed.BehaviorInterceptor.{ ReceiveTarget, SignalTarget } import akka.actor.typed.BehaviorInterceptor.SignalTarget
import akka.actor.typed.SupervisorStrategy._ import akka.actor.typed.SupervisorStrategy._
import akka.actor.typed.internal.BackoffRestarter.ScheduledRestart
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.util.{ OptionVal, PrettyDuration } import akka.util.{ OptionVal, PrettyDuration }
@ -59,7 +58,7 @@ abstract class AbstractSupervisor[O, I, Thr <: Throwable](ss: SupervisorStrategy
} }
} }
override def preStart(ctx: ActorContext[O], target: BehaviorInterceptor.PreStartTarget[I]): Behavior[I] = { override def aroundStart(ctx: ActorContext[O], target: BehaviorInterceptor.PreStartTarget[I]): Behavior[I] = {
try { try {
target.start(ctx) target.start(ctx)
} catch handleExceptionOnStart(ctx) } catch handleExceptionOnStart(ctx)
@ -127,36 +126,49 @@ class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extends Simple
class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strategy: Restart)(implicit ev: ClassTag[Thr]) extends SimpleSupervisor[T, Thr](strategy) { class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strategy: Restart)(implicit ev: ClassTag[Thr]) extends SimpleSupervisor[T, Thr](strategy) {
var restarts = 0 var restarts = 0
var deadline: OptionVal[Deadline] = OptionVal.None
override def preStart(ctx: ActorContext[T], target: BehaviorInterceptor.PreStartTarget[T]): Behavior[T] = { private def deadlineHasTimeLeft: Boolean = deadline match {
case OptionVal.None true
case OptionVal.Some(d) d.hasTimeLeft
}
override def aroundStart(ctx: ActorContext[T], target: BehaviorInterceptor.PreStartTarget[T]): Behavior[T] = {
try { try {
target.start(ctx) target.start(ctx)
} catch { } catch {
case NonFatal(t: Thr) case NonFatal(t: Thr)
log(ctx, t)
restarts += 1
// if unlimited restarts then don't restart if starting fails as it would likely be an infinite restart loop // if unlimited restarts then don't restart if starting fails as it would likely be an infinite restart loop
if (restarts != strategy.maxNrOfRetries && strategy.maxNrOfRetries != -1) { log(ctx, t)
preStart(ctx, target) if (((restarts + 1) >= strategy.maxNrOfRetries && deadlineHasTimeLeft) || strategy.maxNrOfRetries == -1) {
} else {
throw t throw t
} else {
ctx.asScala.log.info("Trying restart")
restart(ctx, t)
aroundStart(ctx, target)
} }
} }
} }
private def handleException(ctx: ActorContext[T], signalTarget: Signal Behavior[T]): Catcher[Behavior[T]] = { private def restart(ctx: ActorContext[_], t: Throwable) = {
case NonFatal(t: Thr) val timeLeft = deadlineHasTimeLeft
log(ctx, t) val newDeadline = if (deadline.isDefined && timeLeft) deadline else OptionVal.Some(Deadline.now + strategy.withinTimeRange)
restarts += 1 restarts = if (timeLeft) restarts + 1 else 0
signalTarget(PreRestart) deadline = newDeadline
if (restarts != strategy.maxNrOfRetries) {
// TODO what about exceptions here?
Behavior.validateAsInitial(Behavior.start(initial, ctx))
} else {
throw t
}
} }
private def handleException(ctx: ActorContext[T], signalTarget: Signal Behavior[T]): Catcher[Behavior[T]] = {
case NonFatal(t: Thr)
if (strategy.maxNrOfRetries != -1 && restarts >= strategy.maxNrOfRetries && deadlineHasTimeLeft) {
throw t
} else {
signalTarget(PreRestart)
log(ctx, t)
restart(ctx, t)
// TODO what about exceptions here?
Behavior.validateAsInitial(Behavior.start(initial, ctx))
}
}
override protected def handleSignalException(ctx: ActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = { override protected def handleSignalException(ctx: ActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = {
handleException(ctx, s target(ctx, s)) handleException(ctx, s target(ctx, s))

View file

@ -38,7 +38,7 @@ import scala.collection.immutable.HashMap
import BehaviorInterceptor._ import BehaviorInterceptor._
override def preStart(ctx: ActorContext[T], target: PreStartTarget[T]): Behavior[T] = { override def aroundStart(ctx: ActorContext[T], target: PreStartTarget[T]): Behavior[T] = {
// when declaring we expect the outermost to win // when declaring we expect the outermost to win
// for example with // for example with
// val behavior = ... // val behavior = ...

View file

@ -170,7 +170,7 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA
"stop when sent a poison pill" in { "stop when sent a poison pill" in {
EventFilter[ActorKilledException]() intercept { EventFilter[ActorKilledException]() intercept {
val a = TestActorRef(Props[WorkerActor]) val a = TestActorRef(Props[WorkerActor])
val forwarder = system.actorOf(Props(new Actor { system.actorOf(Props(new Actor {
context.watch(a) context.watch(a)
def receive = { def receive = {
case t: Terminated testActor forward WrappedTerminated(t) case t: Terminated testActor forward WrappedTerminated(t)