From ea454a733d7714d98aa81e63122da6ec055af045 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Thu, 20 Sep 2018 08:32:56 +0100 Subject: [PATCH] Reset timeout for fixed restarts --- .../testkit/typed/scaladsl/ActorTestKit.scala | 2 +- .../akka/actor/typed/InterceptSpec.scala | 2 +- .../akka/actor/typed/SupervisionSpec.scala | 28 +++++++++- .../actor/typed/BehaviorInterceptor.scala | 3 +- .../typed/internal/InterceptorImpl.scala | 4 +- .../akka/actor/typed/internal/Restarter.scala | 52 ++++++++++++------- .../internal/WithMdcBehaviorInterceptor.scala | 2 +- .../scala/akka/testkit/TestActorRefSpec.scala | 2 +- 8 files changed, 65 insertions(+), 30 deletions(-) diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala index efacac2a76..6f38c3cd22 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala @@ -6,7 +6,7 @@ package akka.actor.testkit.typed.scaladsl import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props } -import akka.annotation.{ ApiMayChange, InternalApi } +import akka.annotation.ApiMayChange import akka.actor.testkit.typed.TestKitSettings import akka.actor.testkit.typed.internal.{ ActorTestKitGuardian, TestKitUtils } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala index 731bd2fe90..85b1c010ad 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala @@ -176,7 +176,7 @@ class InterceptSpec extends ScalaTestWithActorTestKit( "allow an interceptor to replace started behavior" in { val interceptor = new BehaviorInterceptor[String, String] { - override def preStart(ctx: ActorContext[String], target: PreStartTarget[String]): Behavior[String] = { + override def aroundStart(ctx: ActorContext[String], target: PreStartTarget[String]): Behavior[String] = { Behaviors.stopped } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index 76a7d2d912..d72fbfc04b 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -389,8 +389,9 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( "stop when restart limit is hit" in { val probe = TestProbe[Event]("evt") + val resetTimeout = 500.millis val behv = Behaviors.supervise(targetBehavior(probe.ref)) - .onFailure[Exc1](SupervisorStrategy.restartWithLimit(2, 1.minute)) + .onFailure[Exc1](SupervisorStrategy.restartWithLimit(2, resetTimeout)) val ref = spawn(behv) ref ! IncrementState ref ! GetState @@ -408,6 +409,31 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( probe.expectNoMessage() } + "reset fixed limit after timeout" in { + val probe = TestProbe[Event]("evt") + val resetTimeout = 500.millis + val behv = Behaviors.supervise(targetBehavior(probe.ref)) + .onFailure[Exc1](SupervisorStrategy.restartWithLimit(2, resetTimeout)) + val ref = spawn(behv) + ref ! IncrementState + ref ! GetState + probe.expectMessage(State(1, Map.empty)) + + EventFilter[Exc2](occurrences = 3).intercept { + ref ! Throw(new Exc2) + probe.expectMessage(GotSignal(PreRestart)) + ref ! Throw(new Exc2) + probe.expectMessage(GotSignal(PreRestart)) + + probe.expectNoMessage(resetTimeout + 50.millis) + + ref ! Throw(new Exc2) + probe.expectMessage(GotSignal(PreRestart)) + } + ref ! GetState + probe.expectMessage(State(0, Map.empty)) + } + "NOT stop children when restarting" in { val parentProbe = TestProbe[Event]("evt") val behv = Behaviors.supervise(targetBehavior(parentProbe.ref)) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala index 502237cd82..ec4067da55 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala @@ -23,7 +23,7 @@ abstract class BehaviorInterceptor[O, I] { * @return The returned behavior will be the "started" behavior of the actor used to accept * the next message or signal. */ - def preStart(ctx: ActorContext[O], target: PreStartTarget[I]): Behavior[I] = + def aroundStart(ctx: ActorContext[O], target: PreStartTarget[I]): Behavior[I] = target.start(ctx) /** @@ -72,7 +72,6 @@ object BehaviorInterceptor { @DoNotInherit trait ReceiveTarget[T] { def apply(ctx: ActorContext[_], msg: T): Behavior[T] - def current(): Behavior[T] def signal(ctx: ActorContext[_], signal: Signal): Behavior[T] } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala index 0b9ccffeed..08784bb12d 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala @@ -50,8 +50,6 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce override def signal(ctx: ActorContext[_], signal: Signal): Behavior[I] = Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[ActorContext[I]], signal) - - override def current(): Behavior[I] = nestedBehavior } private val signalTarget = new SignalTarget[I] { @@ -61,7 +59,7 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce // invoked pre-start to start/de-duplicate the initial behavior stack def preStart(ctx: typed.ActorContext[O]): Behavior[O] = { - val started = interceptor.preStart(ctx, preStartTarget) + val started = interceptor.aroundStart(ctx, preStartTarget) deduplicate(started, ctx) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala index 88a9183cf2..63ee00ad30 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala @@ -8,9 +8,8 @@ package internal import java.util.concurrent.ThreadLocalRandom import akka.actor.DeadLetterSuppression -import akka.actor.typed.BehaviorInterceptor.{ ReceiveTarget, SignalTarget } +import akka.actor.typed.BehaviorInterceptor.SignalTarget import akka.actor.typed.SupervisorStrategy._ -import akka.actor.typed.internal.BackoffRestarter.ScheduledRestart import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi import akka.util.{ OptionVal, PrettyDuration } @@ -59,7 +58,7 @@ abstract class AbstractSupervisor[O, I, Thr <: Throwable](ss: SupervisorStrategy } } - override def preStart(ctx: ActorContext[O], target: BehaviorInterceptor.PreStartTarget[I]): Behavior[I] = { + override def aroundStart(ctx: ActorContext[O], target: BehaviorInterceptor.PreStartTarget[I]): Behavior[I] = { try { target.start(ctx) } catch handleExceptionOnStart(ctx) @@ -127,36 +126,49 @@ class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extends Simple class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strategy: Restart)(implicit ev: ClassTag[Thr]) extends SimpleSupervisor[T, Thr](strategy) { var restarts = 0 + var deadline: OptionVal[Deadline] = OptionVal.None - override def preStart(ctx: ActorContext[T], target: BehaviorInterceptor.PreStartTarget[T]): Behavior[T] = { + private def deadlineHasTimeLeft: Boolean = deadline match { + case OptionVal.None ⇒ true + case OptionVal.Some(d) ⇒ d.hasTimeLeft + } + + override def aroundStart(ctx: ActorContext[T], target: BehaviorInterceptor.PreStartTarget[T]): Behavior[T] = { try { target.start(ctx) } catch { case NonFatal(t: Thr) ⇒ - log(ctx, t) - restarts += 1 // if unlimited restarts then don't restart if starting fails as it would likely be an infinite restart loop - if (restarts != strategy.maxNrOfRetries && strategy.maxNrOfRetries != -1) { - preStart(ctx, target) - } else { + log(ctx, t) + if (((restarts + 1) >= strategy.maxNrOfRetries && deadlineHasTimeLeft) || strategy.maxNrOfRetries == -1) { throw t + } else { + ctx.asScala.log.info("Trying restart") + restart(ctx, t) + aroundStart(ctx, target) } } } - private def handleException(ctx: ActorContext[T], signalTarget: Signal ⇒ Behavior[T]): Catcher[Behavior[T]] = { - case NonFatal(t: Thr) ⇒ - log(ctx, t) - restarts += 1 - signalTarget(PreRestart) - if (restarts != strategy.maxNrOfRetries) { - // TODO what about exceptions here? - Behavior.validateAsInitial(Behavior.start(initial, ctx)) - } else { - throw t - } + private def restart(ctx: ActorContext[_], t: Throwable) = { + val timeLeft = deadlineHasTimeLeft + val newDeadline = if (deadline.isDefined && timeLeft) deadline else OptionVal.Some(Deadline.now + strategy.withinTimeRange) + restarts = if (timeLeft) restarts + 1 else 0 + deadline = newDeadline } + private def handleException(ctx: ActorContext[T], signalTarget: Signal ⇒ Behavior[T]): Catcher[Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + if (strategy.maxNrOfRetries != -1 && restarts >= strategy.maxNrOfRetries && deadlineHasTimeLeft) { + throw t + } else { + signalTarget(PreRestart) + log(ctx, t) + restart(ctx, t) + // TODO what about exceptions here? + Behavior.validateAsInitial(Behavior.start(initial, ctx)) + } + } override protected def handleSignalException(ctx: ActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = { handleException(ctx, s ⇒ target(ctx, s)) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehaviorInterceptor.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehaviorInterceptor.scala index 0c6cd82dcc..1bf0269ed4 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehaviorInterceptor.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehaviorInterceptor.scala @@ -38,7 +38,7 @@ import scala.collection.immutable.HashMap import BehaviorInterceptor._ - override def preStart(ctx: ActorContext[T], target: PreStartTarget[T]): Behavior[T] = { + override def aroundStart(ctx: ActorContext[T], target: PreStartTarget[T]): Behavior[T] = { // when declaring we expect the outermost to win // for example with // val behavior = ... diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 462969d564..1e8a2eb859 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -170,7 +170,7 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA "stop when sent a poison pill" in { EventFilter[ActorKilledException]() intercept { val a = TestActorRef(Props[WorkerActor]) - val forwarder = system.actorOf(Props(new Actor { + system.actorOf(Props(new Actor { context.watch(a) def receive = { case t: Terminated ⇒ testActor forward WrappedTerminated(t)