From 85a4597e4ae10e2cedcda1fdd14d8837253e8bbc Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Wed, 19 Sep 2018 18:33:12 +0100 Subject: [PATCH 1/7] wip --- .../actor/testkit/typed/TestKitSettings.scala | 2 +- .../akka/actor/typed/SupervisionSpec.scala | 43 +++- .../scala/akka/actor/typed/Behavior.scala | 1 + .../actor/typed/BehaviorInterceptor.scala | 4 +- .../typed/internal/InterceptorImpl.scala | 7 +- .../akka/actor/typed/internal/Restarter.scala | 221 +++++++++++++++--- .../akka/actor/typed/scaladsl/Behaviors.scala | 13 +- 7 files changed, 254 insertions(+), 37 deletions(-) diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestKitSettings.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestKitSettings.scala index 62bb15cb97..0a9bea09c5 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestKitSettings.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestKitSettings.scala @@ -15,7 +15,7 @@ import scala.util.control.NoStackTrace /** * Exception without stack trace to use for verifying exceptions in tests */ -final case class TE(message: String) extends RuntimeException(message) with NoStackTrace +final case class TE(message: String) extends RuntimeException(message) object TestKitSettings { /** diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index 9e1472ad48..76a7d2d912 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -255,6 +255,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( // FIXME eventfilter support in typed testkit import scaladsl.adapter._ + implicit val untypedSystem = system.toUntyped class FailingConstructorTestSetup(failCount: Int) { @@ -315,6 +316,18 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( } } + "stop when strategy is stop - exception in setup" in { + val probe = TestProbe[Event]("evt") + val failedSetup = Behaviors.setup[Command](_ ⇒ { + throw new Exc3() + targetBehavior(probe.ref) + }) + val behv = Behaviors.supervise(failedSetup).onFailure[Throwable](SupervisorStrategy.stop) + EventFilter[Exc3](occurrences = 1).intercept { + spawn(behv) + } + } + "support nesting exceptions with different strategies" in { val probe = TestProbe[Event]("evt") val behv = @@ -374,6 +387,27 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( probe.expectMessage(State(0, Map.empty)) } + "stop when restart limit is hit" in { + val probe = TestProbe[Event]("evt") + val behv = Behaviors.supervise(targetBehavior(probe.ref)) + .onFailure[Exc1](SupervisorStrategy.restartWithLimit(2, 1.minute)) + val ref = spawn(behv) + ref ! IncrementState + ref ! GetState + probe.expectMessage(State(1, Map.empty)) + + EventFilter[Exc2](occurrences = 3).intercept { + ref ! Throw(new Exc2) + probe.expectMessage(GotSignal(PreRestart)) + ref ! Throw(new Exc2) + probe.expectMessage(GotSignal(PreRestart)) + ref ! Throw(new Exc2) + probe.expectMessage(GotSignal(PostStop)) + } + ref ! GetState + probe.expectNoMessage() + } + "NOT stop children when restarting" in { val parentProbe = TestProbe[Event]("evt") val behv = Behaviors.supervise(targetBehavior(parentProbe.ref)) @@ -684,9 +718,9 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( actor ! "give me stacktrace" val stacktrace = probe.expectMessageType[Vector[StackTraceElement]] - // supervisor receive is used for every supervision instance, only wrapped in one supervisor for RuntimeException + // InterceptorImpl receive is used for every supervision instance, only wrapped in one supervisor for RuntimeException // and then the IllegalArgument one is kept since it has a different throwable - stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.Supervisor.receive")) should ===(2) + stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.InterceptorImpl.receive")) should ===(2) } "replace supervision when new returned behavior catches same exception nested in other behaviors" in { @@ -737,9 +771,8 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( actor ! "give me stacktrace" val stacktrace = probe.expectMessageType[Vector[StackTraceElement]] - // supervisor receive is used for every supervision instance, only wrapped in one supervisor for RuntimeException - // and then the IllegalArgument one is kept since it has a different throwable - stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.Supervisor.receive")) should ===(2) + stacktrace.foreach(println) + stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.SimpleSupervisor.aroundReceive")) should ===(2) } "replace backoff supervision duplicate when behavior is created in a setup" in { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index bc6199fb3b..618794531d 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -130,6 +130,7 @@ object Behavior { * that is not necessary. */ def same[T]: Behavior[T] = SameBehavior.asInstanceOf[Behavior[T]] + /** * Return this behavior from message processing in order to advise the * system to reuse the previous behavior, including the hint that the diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala index 71c4f88b8a..502237cd82 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala @@ -23,7 +23,7 @@ abstract class BehaviorInterceptor[O, I] { * @return The returned behavior will be the "started" behavior of the actor used to accept * the next message or signal. */ - def preStart(ctx: ActorContext[I], target: PreStartTarget[I]): Behavior[I] = + def preStart(ctx: ActorContext[O], target: PreStartTarget[I]): Behavior[I] = target.start(ctx) /** @@ -72,6 +72,8 @@ object BehaviorInterceptor { @DoNotInherit trait ReceiveTarget[T] { def apply(ctx: ActorContext[_], msg: T): Behavior[T] + def current(): Behavior[T] + def signal(ctx: ActorContext[_], signal: Signal): Behavior[T] } /** diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala index 6e21664a7f..0b9ccffeed 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala @@ -47,6 +47,11 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce private val receiveTarget: ReceiveTarget[I] = new ReceiveTarget[I] { override def apply(ctx: ActorContext[_], msg: I): Behavior[I] = Behavior.interpretMessage(nestedBehavior, ctx.asInstanceOf[ActorContext[I]], msg) + + override def signal(ctx: ActorContext[_], signal: Signal): Behavior[I] = + Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[ActorContext[I]], signal) + + override def current(): Behavior[I] = nestedBehavior } private val signalTarget = new SignalTarget[I] { @@ -56,7 +61,7 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce // invoked pre-start to start/de-duplicate the initial behavior stack def preStart(ctx: typed.ActorContext[O]): Behavior[O] = { - val started = interceptor.preStart(ctx.asInstanceOf[ActorContext[I]], preStartTarget) + val started = interceptor.preStart(ctx, preStartTarget) deduplicate(started, ctx) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala index 56785a0265..88a9183cf2 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala @@ -8,7 +8,9 @@ package internal import java.util.concurrent.ThreadLocalRandom import akka.actor.DeadLetterSuppression +import akka.actor.typed.BehaviorInterceptor.{ ReceiveTarget, SignalTarget } import akka.actor.typed.SupervisorStrategy._ +import akka.actor.typed.internal.BackoffRestarter.ScheduledRestart import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi import akka.util.{ OptionVal, PrettyDuration } @@ -17,6 +19,7 @@ import scala.concurrent.duration.{ Deadline, FiniteDuration } import scala.reflect.ClassTag import scala.util.control.Exception.Catcher import scala.util.control.NonFatal +import scala.concurrent.duration._ /** * INTERNAL API @@ -29,8 +32,7 @@ import scala.util.control.NonFatal new Restarter(initialBehavior, initialBehavior, loggingEnabled) case r: Restart ⇒ new LimitedRestarter(initialBehavior, initialBehavior, r, retries = 0, deadline = OptionVal.None) - case Resume(loggingEnabled) ⇒ new Resumer(initialBehavior, loggingEnabled) - case Stop(loggingEnabled) ⇒ new Stopper(initialBehavior, loggingEnabled) + case Stop(loggingEnabled) ⇒ new Stopper(initialBehavior, loggingEnabled) case b: Backoff ⇒ val backoffRestarter = new BackoffRestarter( @@ -46,6 +48,195 @@ import scala.util.control.NonFatal } +abstract class AbstractSupervisor[O, I, Thr <: Throwable](ss: SupervisorStrategy)(implicit ev: ClassTag[Thr]) extends BehaviorInterceptor[O, I] { + + private val throwableClass = implicitly[ClassTag[Thr]].runtimeClass + + override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = { + other match { + case as: AbstractSupervisor[_, _, Thr] if throwableClass == as.throwableClass ⇒ true + case _ ⇒ false + } + } + + override def preStart(ctx: ActorContext[O], target: BehaviorInterceptor.PreStartTarget[I]): Behavior[I] = { + try { + target.start(ctx) + } catch handleExceptionOnStart(ctx) + } + + def aroundSignal(ctx: ActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] = { + try { + target(ctx, signal) + } catch handleSignalException(ctx, target) + } + + def log(ctx: ActorContext[_], t: Throwable): Unit = { + if (ss.loggingEnabled) { + ctx.asScala.log.error(t, "Supervisor [{}] saw failure: {}", this, t.getMessage) + } + } + + protected def handleExceptionOnStart(ctx: ActorContext[O]): Catcher[Behavior[I]] + protected def handleSignalException(ctx: ActorContext[O], target: BehaviorInterceptor.SignalTarget[I]): Catcher[Behavior[I]] + protected def handleReceiveException(ctx: ActorContext[O], target: BehaviorInterceptor.ReceiveTarget[I]): Catcher[Behavior[I]] +} + +/** + * For cases where O == I for BehaviorInterceptor. + */ +abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: SupervisorStrategy) extends AbstractSupervisor[T, T, Thr](ss) { + + override def aroundReceive(ctx: ActorContext[T], msg: T, target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = { + try { + target(ctx, msg) + } catch handleReceiveException(ctx, target) + } + + protected def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = { + case NonFatal(_: Thr) ⇒ + Behaviors.stopped + } + + // convenience if target not required to handle exception + protected def handleExceptionOnStart(ctx: ActorContext[T]): Catcher[Behavior[T]] = + handleException(ctx) + protected def handleSignalException(ctx: ActorContext[T], target: BehaviorInterceptor.SignalTarget[T]): Catcher[Behavior[T]] = + handleException(ctx) + protected def handleReceiveException(ctx: ActorContext[T], target: BehaviorInterceptor.ReceiveTarget[T]): Catcher[Behavior[T]] = + handleException(ctx) +} + +class StopSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: Stop) extends SimpleSupervisor[T, Thr](strategy) { + override def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + log(ctx, t) + Behaviors.stopped + } +} + +class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extends SimpleSupervisor[T, Thr](ss) { + override protected def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + log(ctx, t) + Behaviors.same + } +} + +// FIXME tests + impl of resetting time +class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strategy: Restart)(implicit ev: ClassTag[Thr]) extends SimpleSupervisor[T, Thr](strategy) { + + var restarts = 0 + + override def preStart(ctx: ActorContext[T], target: BehaviorInterceptor.PreStartTarget[T]): Behavior[T] = { + try { + target.start(ctx) + } catch { + case NonFatal(t: Thr) ⇒ + log(ctx, t) + restarts += 1 + // if unlimited restarts then don't restart if starting fails as it would likely be an infinite restart loop + if (restarts != strategy.maxNrOfRetries && strategy.maxNrOfRetries != -1) { + preStart(ctx, target) + } else { + throw t + } + } + } + + private def handleException(ctx: ActorContext[T], signalTarget: Signal ⇒ Behavior[T]): Catcher[Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + log(ctx, t) + restarts += 1 + signalTarget(PreRestart) + if (restarts != strategy.maxNrOfRetries) { + // TODO what about exceptions here? + Behavior.validateAsInitial(Behavior.start(initial, ctx)) + } else { + throw t + } + } + + + override protected def handleSignalException(ctx: ActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = { + handleException(ctx, s ⇒ target(ctx, s)) + } + override protected def handleReceiveException(ctx: ActorContext[T], target: BehaviorInterceptor.ReceiveTarget[T]): Catcher[Behavior[T]] = { + handleException(ctx, s ⇒ target.signal(ctx, s)) + } +} + +class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], b: Backoff) extends AbstractSupervisor[AnyRef, T, Thr](b) { + + import BackoffRestarter._ + + var blackhole = false + var restartCount: Int = 0 + + override def aroundReceive(ctx: ActorContext[AnyRef], msg: AnyRef, target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = { + try { + msg match { + case ScheduledRestart ⇒ + blackhole = false + // TODO do we need to start it? + ctx.asScala.log.info("Scheduled restart") + ctx.asScala.schedule(b.resetBackoffAfter, ctx.asScala.self, ResetRestartCount(restartCount)) + + try { + Behavior.validateAsInitial(Behavior.start(initial, ctx.asInstanceOf[ActorContext[T]])) + } catch { + case NonFatal(ex: Thr) ⇒ + val restartDelay = calculateDelay(restartCount, b.minBackoff, b.maxBackoff, b.randomFactor) + ctx.asScala.log.info("Failure during initialisation") + ctx.asScala.schedule(restartDelay, ctx.asScala.self, ScheduledRestart) + restartCount += 1 + blackhole = true + Behaviors.empty + } + case ResetRestartCount(current) ⇒ + println("Reset restart count: " + current) + if (current == restartCount) { + println("Resetting") + restartCount = 0 + } + Behavior.same + case _ ⇒ + // TODO publish dropped message + target(ctx, msg.asInstanceOf[T]) + } + } catch handleReceiveException(ctx, target) + } + + protected def handleExceptionOnStart(ctx: ActorContext[AnyRef]): Catcher[akka.actor.typed.Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + log(ctx, t) + scheduleRestart(ctx) + } + + protected def handleReceiveException(ctx: akka.actor.typed.ActorContext[AnyRef], target: BehaviorInterceptor.ReceiveTarget[T]): util.control.Exception.Catcher[akka.actor.typed.Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + target.signal(ctx, PreRestart) + log(ctx, t) + scheduleRestart(ctx) + } + + protected def handleSignalException(ctx: ActorContext[AnyRef], target: BehaviorInterceptor.SignalTarget[T]): Catcher[akka.actor.typed.Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + target(ctx, PreRestart) + log(ctx, t) + scheduleRestart(ctx) + } + + private def scheduleRestart(ctx: ActorContext[AnyRef]): Behavior[T] = { + val restartDelay = calculateDelay(restartCount, b.minBackoff, b.maxBackoff, b.randomFactor) + ctx.asScala.schedule(restartDelay, ctx.asScala.self, ScheduledRestart) + restartCount += 1 + blackhole = true + Behaviors.empty + } + +} + /** * INTERNAL API */ @@ -123,32 +314,6 @@ import scala.util.control.NonFatal } } -/** - * INTERNAL API - */ -@InternalApi private[akka] final class Resumer[T, Thr <: Throwable: ClassTag]( - override val behavior: Behavior[T], override val loggingEnabled: Boolean) extends Supervisor[T, Thr] { - - def init(ctx: ActorContext[T]) = { - // no handling of errors for Resume as that could lead to infinite restart-loop - val started = Behavior.validateAsInitial(Behavior.start(behavior, ctx)) - if (Behavior.isAlive(started)) wrap(started, afterException = false) - else started - } - - override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Behavior[T]] = { - case NonFatal(ex: Thr) ⇒ - log(ctx, ex) - wrap(startedBehavior, afterException = true) - } - - override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Behavior[T] = - new Resumer[T, Thr](nextBehavior, loggingEnabled) - - override def toString = "resume" - -} - /** * INTERNAL API */ diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala index be68c470f0..dd0cfb28ac 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala @@ -5,6 +5,7 @@ package akka.actor.typed package scaladsl +import akka.actor.typed.SupervisorStrategy.{ Backoff, Restart, Resume, Stop } import akka.annotation.{ ApiMayChange, DoNotInherit, InternalApi } import akka.actor.typed.internal._ @@ -187,12 +188,22 @@ object Behaviors { private final val NothingClassTag = ClassTag(classOf[Nothing]) private final val ThrowableClassTag = ClassTag(classOf[Throwable]) + final class Supervise[T] private[akka] (val wrapped: Behavior[T]) extends AnyVal { /** Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws. */ def onFailure[Thr <: Throwable: ClassTag](strategy: SupervisorStrategy): Behavior[T] = { val tag = implicitly[ClassTag[Thr]] val effectiveTag = if (tag == NothingClassTag) ThrowableClassTag else tag - Supervisor(Behavior.validateAsInitial(wrapped), strategy)(effectiveTag) + strategy match { + case r: Resume ⇒ + Behaviors.intercept[T, T](new ResumeSupervisor(r)(effectiveTag))(wrapped) + case r: Restart ⇒ + Behaviors.intercept[T, T](new RestartSupervisor(wrapped, r)(effectiveTag))(wrapped) + case r: Stop ⇒ + Behaviors.intercept[T, T](new StopSupervisor(wrapped, r)(effectiveTag))(wrapped) + case r: Backoff ⇒ + Behaviors.intercept[AnyRef, T](new BackoffSupervisor(wrapped, r)(effectiveTag))(wrapped).asInstanceOf[Behavior[T]] + } } } From ea454a733d7714d98aa81e63122da6ec055af045 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Thu, 20 Sep 2018 08:32:56 +0100 Subject: [PATCH 2/7] Reset timeout for fixed restarts --- .../testkit/typed/scaladsl/ActorTestKit.scala | 2 +- .../akka/actor/typed/InterceptSpec.scala | 2 +- .../akka/actor/typed/SupervisionSpec.scala | 28 +++++++++- .../actor/typed/BehaviorInterceptor.scala | 3 +- .../typed/internal/InterceptorImpl.scala | 4 +- .../akka/actor/typed/internal/Restarter.scala | 52 ++++++++++++------- .../internal/WithMdcBehaviorInterceptor.scala | 2 +- .../scala/akka/testkit/TestActorRefSpec.scala | 2 +- 8 files changed, 65 insertions(+), 30 deletions(-) diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala index efacac2a76..6f38c3cd22 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala @@ -6,7 +6,7 @@ package akka.actor.testkit.typed.scaladsl import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props } -import akka.annotation.{ ApiMayChange, InternalApi } +import akka.annotation.ApiMayChange import akka.actor.testkit.typed.TestKitSettings import akka.actor.testkit.typed.internal.{ ActorTestKitGuardian, TestKitUtils } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala index 731bd2fe90..85b1c010ad 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala @@ -176,7 +176,7 @@ class InterceptSpec extends ScalaTestWithActorTestKit( "allow an interceptor to replace started behavior" in { val interceptor = new BehaviorInterceptor[String, String] { - override def preStart(ctx: ActorContext[String], target: PreStartTarget[String]): Behavior[String] = { + override def aroundStart(ctx: ActorContext[String], target: PreStartTarget[String]): Behavior[String] = { Behaviors.stopped } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index 76a7d2d912..d72fbfc04b 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -389,8 +389,9 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( "stop when restart limit is hit" in { val probe = TestProbe[Event]("evt") + val resetTimeout = 500.millis val behv = Behaviors.supervise(targetBehavior(probe.ref)) - .onFailure[Exc1](SupervisorStrategy.restartWithLimit(2, 1.minute)) + .onFailure[Exc1](SupervisorStrategy.restartWithLimit(2, resetTimeout)) val ref = spawn(behv) ref ! IncrementState ref ! GetState @@ -408,6 +409,31 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( probe.expectNoMessage() } + "reset fixed limit after timeout" in { + val probe = TestProbe[Event]("evt") + val resetTimeout = 500.millis + val behv = Behaviors.supervise(targetBehavior(probe.ref)) + .onFailure[Exc1](SupervisorStrategy.restartWithLimit(2, resetTimeout)) + val ref = spawn(behv) + ref ! IncrementState + ref ! GetState + probe.expectMessage(State(1, Map.empty)) + + EventFilter[Exc2](occurrences = 3).intercept { + ref ! Throw(new Exc2) + probe.expectMessage(GotSignal(PreRestart)) + ref ! Throw(new Exc2) + probe.expectMessage(GotSignal(PreRestart)) + + probe.expectNoMessage(resetTimeout + 50.millis) + + ref ! Throw(new Exc2) + probe.expectMessage(GotSignal(PreRestart)) + } + ref ! GetState + probe.expectMessage(State(0, Map.empty)) + } + "NOT stop children when restarting" in { val parentProbe = TestProbe[Event]("evt") val behv = Behaviors.supervise(targetBehavior(parentProbe.ref)) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala index 502237cd82..ec4067da55 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala @@ -23,7 +23,7 @@ abstract class BehaviorInterceptor[O, I] { * @return The returned behavior will be the "started" behavior of the actor used to accept * the next message or signal. */ - def preStart(ctx: ActorContext[O], target: PreStartTarget[I]): Behavior[I] = + def aroundStart(ctx: ActorContext[O], target: PreStartTarget[I]): Behavior[I] = target.start(ctx) /** @@ -72,7 +72,6 @@ object BehaviorInterceptor { @DoNotInherit trait ReceiveTarget[T] { def apply(ctx: ActorContext[_], msg: T): Behavior[T] - def current(): Behavior[T] def signal(ctx: ActorContext[_], signal: Signal): Behavior[T] } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala index 0b9ccffeed..08784bb12d 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala @@ -50,8 +50,6 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce override def signal(ctx: ActorContext[_], signal: Signal): Behavior[I] = Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[ActorContext[I]], signal) - - override def current(): Behavior[I] = nestedBehavior } private val signalTarget = new SignalTarget[I] { @@ -61,7 +59,7 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce // invoked pre-start to start/de-duplicate the initial behavior stack def preStart(ctx: typed.ActorContext[O]): Behavior[O] = { - val started = interceptor.preStart(ctx, preStartTarget) + val started = interceptor.aroundStart(ctx, preStartTarget) deduplicate(started, ctx) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala index 88a9183cf2..63ee00ad30 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala @@ -8,9 +8,8 @@ package internal import java.util.concurrent.ThreadLocalRandom import akka.actor.DeadLetterSuppression -import akka.actor.typed.BehaviorInterceptor.{ ReceiveTarget, SignalTarget } +import akka.actor.typed.BehaviorInterceptor.SignalTarget import akka.actor.typed.SupervisorStrategy._ -import akka.actor.typed.internal.BackoffRestarter.ScheduledRestart import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi import akka.util.{ OptionVal, PrettyDuration } @@ -59,7 +58,7 @@ abstract class AbstractSupervisor[O, I, Thr <: Throwable](ss: SupervisorStrategy } } - override def preStart(ctx: ActorContext[O], target: BehaviorInterceptor.PreStartTarget[I]): Behavior[I] = { + override def aroundStart(ctx: ActorContext[O], target: BehaviorInterceptor.PreStartTarget[I]): Behavior[I] = { try { target.start(ctx) } catch handleExceptionOnStart(ctx) @@ -127,36 +126,49 @@ class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extends Simple class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strategy: Restart)(implicit ev: ClassTag[Thr]) extends SimpleSupervisor[T, Thr](strategy) { var restarts = 0 + var deadline: OptionVal[Deadline] = OptionVal.None - override def preStart(ctx: ActorContext[T], target: BehaviorInterceptor.PreStartTarget[T]): Behavior[T] = { + private def deadlineHasTimeLeft: Boolean = deadline match { + case OptionVal.None ⇒ true + case OptionVal.Some(d) ⇒ d.hasTimeLeft + } + + override def aroundStart(ctx: ActorContext[T], target: BehaviorInterceptor.PreStartTarget[T]): Behavior[T] = { try { target.start(ctx) } catch { case NonFatal(t: Thr) ⇒ - log(ctx, t) - restarts += 1 // if unlimited restarts then don't restart if starting fails as it would likely be an infinite restart loop - if (restarts != strategy.maxNrOfRetries && strategy.maxNrOfRetries != -1) { - preStart(ctx, target) - } else { + log(ctx, t) + if (((restarts + 1) >= strategy.maxNrOfRetries && deadlineHasTimeLeft) || strategy.maxNrOfRetries == -1) { throw t + } else { + ctx.asScala.log.info("Trying restart") + restart(ctx, t) + aroundStart(ctx, target) } } } - private def handleException(ctx: ActorContext[T], signalTarget: Signal ⇒ Behavior[T]): Catcher[Behavior[T]] = { - case NonFatal(t: Thr) ⇒ - log(ctx, t) - restarts += 1 - signalTarget(PreRestart) - if (restarts != strategy.maxNrOfRetries) { - // TODO what about exceptions here? - Behavior.validateAsInitial(Behavior.start(initial, ctx)) - } else { - throw t - } + private def restart(ctx: ActorContext[_], t: Throwable) = { + val timeLeft = deadlineHasTimeLeft + val newDeadline = if (deadline.isDefined && timeLeft) deadline else OptionVal.Some(Deadline.now + strategy.withinTimeRange) + restarts = if (timeLeft) restarts + 1 else 0 + deadline = newDeadline } + private def handleException(ctx: ActorContext[T], signalTarget: Signal ⇒ Behavior[T]): Catcher[Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + if (strategy.maxNrOfRetries != -1 && restarts >= strategy.maxNrOfRetries && deadlineHasTimeLeft) { + throw t + } else { + signalTarget(PreRestart) + log(ctx, t) + restart(ctx, t) + // TODO what about exceptions here? + Behavior.validateAsInitial(Behavior.start(initial, ctx)) + } + } override protected def handleSignalException(ctx: ActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = { handleException(ctx, s ⇒ target(ctx, s)) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehaviorInterceptor.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehaviorInterceptor.scala index 0c6cd82dcc..1bf0269ed4 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehaviorInterceptor.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehaviorInterceptor.scala @@ -38,7 +38,7 @@ import scala.collection.immutable.HashMap import BehaviorInterceptor._ - override def preStart(ctx: ActorContext[T], target: PreStartTarget[T]): Behavior[T] = { + override def aroundStart(ctx: ActorContext[T], target: PreStartTarget[T]): Behavior[T] = { // when declaring we expect the outermost to win // for example with // val behavior = ... diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 462969d564..1e8a2eb859 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -170,7 +170,7 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA "stop when sent a poison pill" in { EventFilter[ActorKilledException]() intercept { val a = TestActorRef(Props[WorkerActor]) - val forwarder = system.actorOf(Props(new Actor { + system.actorOf(Props(new Actor { context.watch(a) def receive = { case t: Terminated ⇒ testActor forward WrappedTerminated(t) From 6a8092480eb967a6cbd62e9b42caed79c624772c Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Thu, 20 Sep 2018 11:29:09 +0100 Subject: [PATCH 3/7] Remove old restarters --- .../testkit/typed/scaladsl/ActorTestKit.scala | 5 +- .../actor/typed/BehaviorInterceptor.scala | 14 +- .../akka/actor/typed/SupervisorStrategy.scala | 2 + .../typed/internal/InterceptorImpl.scala | 6 +- .../akka/actor/typed/internal/Restarter.scala | 565 ------------------ .../actor/typed/internal/Supervision.scala | 271 +++++++++ .../akka/actor/typed/scaladsl/Behaviors.scala | 16 +- 7 files changed, 291 insertions(+), 588 deletions(-) delete mode 100644 akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala create mode 100644 akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala index 6f38c3cd22..667eb089fb 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala @@ -6,14 +6,13 @@ package akka.actor.testkit.typed.scaladsl import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props } -import akka.annotation.ApiMayChange +import akka.annotation.{ ApiMayChange, InternalApi } import akka.actor.testkit.typed.TestKitSettings - import akka.actor.testkit.typed.internal.{ ActorTestKitGuardian, TestKitUtils } import com.typesafe.config.{ Config, ConfigFactory } + import scala.concurrent.Await import scala.concurrent.duration._ - import akka.actor.Scheduler import akka.util.Timeout diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala index ec4067da55..913687d3f2 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala @@ -4,7 +4,7 @@ package akka.actor.typed -import akka.annotation.DoNotInherit +import akka.annotation.{ DoNotInherit, InternalApi } /** * A behavior interceptor allows for intercepting message and signal reception and perform arbitrary logic - @@ -72,7 +72,17 @@ object BehaviorInterceptor { @DoNotInherit trait ReceiveTarget[T] { def apply(ctx: ActorContext[_], msg: T): Behavior[T] - def signal(ctx: ActorContext[_], signal: Signal): Behavior[T] + + /** + * INTERNAL API + * + * Signal that the received message will result in a simulated restart + * by the [[BehaviorInterceptor]]. A [[PreRestart]] will be sent to the + * current behavior but the returned Behavior is ignored as a restart + * is taking place. + */ + @InternalApi + private[akka] def signalRestart(ctx: ActorContext[_]): Unit } /** diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala index 9d52c7a200..c2608e9d1d 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala @@ -158,6 +158,8 @@ object SupervisorStrategy { override def withLoggingEnabled(enabled: Boolean): SupervisorStrategy = copy(loggingEnabled = enabled) + + def unlimitedRestarts(): Boolean = maxNrOfRetries == -1 } /** diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala index 08784bb12d..17bd00fc0d 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala @@ -7,7 +7,7 @@ package akka.actor.typed.internal import akka.actor.typed import akka.actor.typed.Behavior.{ SameBehavior, UnhandledBehavior } import akka.actor.typed.internal.TimerSchedulerImpl.TimerMsg -import akka.actor.typed.{ ActorContext, ActorRef, Behavior, BehaviorInterceptor, ExtensibleBehavior, Signal } +import akka.actor.typed.{ ActorContext, ActorRef, Behavior, BehaviorInterceptor, ExtensibleBehavior, PreRestart, Signal } import akka.annotation.InternalApi import akka.util.LineNumbers @@ -48,8 +48,8 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce override def apply(ctx: ActorContext[_], msg: I): Behavior[I] = Behavior.interpretMessage(nestedBehavior, ctx.asInstanceOf[ActorContext[I]], msg) - override def signal(ctx: ActorContext[_], signal: Signal): Behavior[I] = - Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[ActorContext[I]], signal) + override def signalRestart(ctx: ActorContext[_]): Unit = + Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[ActorContext[I]], PreRestart) } private val signalTarget = new SignalTarget[I] { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala deleted file mode 100644 index 63ee00ad30..0000000000 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala +++ /dev/null @@ -1,565 +0,0 @@ -/** - * Copyright (C) 2016-2018 Lightbend Inc. - */ - -package akka.actor.typed -package internal - -import java.util.concurrent.ThreadLocalRandom - -import akka.actor.DeadLetterSuppression -import akka.actor.typed.BehaviorInterceptor.SignalTarget -import akka.actor.typed.SupervisorStrategy._ -import akka.actor.typed.scaladsl.Behaviors -import akka.annotation.InternalApi -import akka.util.{ OptionVal, PrettyDuration } - -import scala.concurrent.duration.{ Deadline, FiniteDuration } -import scala.reflect.ClassTag -import scala.util.control.Exception.Catcher -import scala.util.control.NonFatal -import scala.concurrent.duration._ - -/** - * INTERNAL API - */ -@InternalApi private[akka] object Supervisor { - def apply[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], strategy: SupervisorStrategy): Behavior[T] = - Behaviors.setup[T] { ctx ⇒ - val supervisor: Supervisor[T, Thr] = strategy match { - case Restart(-1, _, loggingEnabled) ⇒ - new Restarter(initialBehavior, initialBehavior, loggingEnabled) - case r: Restart ⇒ - new LimitedRestarter(initialBehavior, initialBehavior, r, retries = 0, deadline = OptionVal.None) - case Stop(loggingEnabled) ⇒ new Stopper(initialBehavior, loggingEnabled) - case b: Backoff ⇒ - val backoffRestarter = - new BackoffRestarter( - initialBehavior.asInstanceOf[Behavior[Any]], - initialBehavior.asInstanceOf[Behavior[Any]], - b, restartCount = 0, blackhole = false) - backoffRestarter - .asInstanceOf[Supervisor[T, Thr]] - } - - supervisor.init(ctx) - } - -} - -abstract class AbstractSupervisor[O, I, Thr <: Throwable](ss: SupervisorStrategy)(implicit ev: ClassTag[Thr]) extends BehaviorInterceptor[O, I] { - - private val throwableClass = implicitly[ClassTag[Thr]].runtimeClass - - override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = { - other match { - case as: AbstractSupervisor[_, _, Thr] if throwableClass == as.throwableClass ⇒ true - case _ ⇒ false - } - } - - override def aroundStart(ctx: ActorContext[O], target: BehaviorInterceptor.PreStartTarget[I]): Behavior[I] = { - try { - target.start(ctx) - } catch handleExceptionOnStart(ctx) - } - - def aroundSignal(ctx: ActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] = { - try { - target(ctx, signal) - } catch handleSignalException(ctx, target) - } - - def log(ctx: ActorContext[_], t: Throwable): Unit = { - if (ss.loggingEnabled) { - ctx.asScala.log.error(t, "Supervisor [{}] saw failure: {}", this, t.getMessage) - } - } - - protected def handleExceptionOnStart(ctx: ActorContext[O]): Catcher[Behavior[I]] - protected def handleSignalException(ctx: ActorContext[O], target: BehaviorInterceptor.SignalTarget[I]): Catcher[Behavior[I]] - protected def handleReceiveException(ctx: ActorContext[O], target: BehaviorInterceptor.ReceiveTarget[I]): Catcher[Behavior[I]] -} - -/** - * For cases where O == I for BehaviorInterceptor. - */ -abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: SupervisorStrategy) extends AbstractSupervisor[T, T, Thr](ss) { - - override def aroundReceive(ctx: ActorContext[T], msg: T, target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = { - try { - target(ctx, msg) - } catch handleReceiveException(ctx, target) - } - - protected def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = { - case NonFatal(_: Thr) ⇒ - Behaviors.stopped - } - - // convenience if target not required to handle exception - protected def handleExceptionOnStart(ctx: ActorContext[T]): Catcher[Behavior[T]] = - handleException(ctx) - protected def handleSignalException(ctx: ActorContext[T], target: BehaviorInterceptor.SignalTarget[T]): Catcher[Behavior[T]] = - handleException(ctx) - protected def handleReceiveException(ctx: ActorContext[T], target: BehaviorInterceptor.ReceiveTarget[T]): Catcher[Behavior[T]] = - handleException(ctx) -} - -class StopSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: Stop) extends SimpleSupervisor[T, Thr](strategy) { - override def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = { - case NonFatal(t: Thr) ⇒ - log(ctx, t) - Behaviors.stopped - } -} - -class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extends SimpleSupervisor[T, Thr](ss) { - override protected def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = { - case NonFatal(t: Thr) ⇒ - log(ctx, t) - Behaviors.same - } -} - -// FIXME tests + impl of resetting time -class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strategy: Restart)(implicit ev: ClassTag[Thr]) extends SimpleSupervisor[T, Thr](strategy) { - - var restarts = 0 - var deadline: OptionVal[Deadline] = OptionVal.None - - private def deadlineHasTimeLeft: Boolean = deadline match { - case OptionVal.None ⇒ true - case OptionVal.Some(d) ⇒ d.hasTimeLeft - } - - override def aroundStart(ctx: ActorContext[T], target: BehaviorInterceptor.PreStartTarget[T]): Behavior[T] = { - try { - target.start(ctx) - } catch { - case NonFatal(t: Thr) ⇒ - // if unlimited restarts then don't restart if starting fails as it would likely be an infinite restart loop - log(ctx, t) - if (((restarts + 1) >= strategy.maxNrOfRetries && deadlineHasTimeLeft) || strategy.maxNrOfRetries == -1) { - throw t - } else { - ctx.asScala.log.info("Trying restart") - restart(ctx, t) - aroundStart(ctx, target) - } - } - } - - private def restart(ctx: ActorContext[_], t: Throwable) = { - val timeLeft = deadlineHasTimeLeft - val newDeadline = if (deadline.isDefined && timeLeft) deadline else OptionVal.Some(Deadline.now + strategy.withinTimeRange) - restarts = if (timeLeft) restarts + 1 else 0 - deadline = newDeadline - } - - private def handleException(ctx: ActorContext[T], signalTarget: Signal ⇒ Behavior[T]): Catcher[Behavior[T]] = { - case NonFatal(t: Thr) ⇒ - if (strategy.maxNrOfRetries != -1 && restarts >= strategy.maxNrOfRetries && deadlineHasTimeLeft) { - throw t - } else { - signalTarget(PreRestart) - log(ctx, t) - restart(ctx, t) - // TODO what about exceptions here? - Behavior.validateAsInitial(Behavior.start(initial, ctx)) - } - } - - override protected def handleSignalException(ctx: ActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = { - handleException(ctx, s ⇒ target(ctx, s)) - } - override protected def handleReceiveException(ctx: ActorContext[T], target: BehaviorInterceptor.ReceiveTarget[T]): Catcher[Behavior[T]] = { - handleException(ctx, s ⇒ target.signal(ctx, s)) - } -} - -class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], b: Backoff) extends AbstractSupervisor[AnyRef, T, Thr](b) { - - import BackoffRestarter._ - - var blackhole = false - var restartCount: Int = 0 - - override def aroundReceive(ctx: ActorContext[AnyRef], msg: AnyRef, target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = { - try { - msg match { - case ScheduledRestart ⇒ - blackhole = false - // TODO do we need to start it? - ctx.asScala.log.info("Scheduled restart") - ctx.asScala.schedule(b.resetBackoffAfter, ctx.asScala.self, ResetRestartCount(restartCount)) - - try { - Behavior.validateAsInitial(Behavior.start(initial, ctx.asInstanceOf[ActorContext[T]])) - } catch { - case NonFatal(ex: Thr) ⇒ - val restartDelay = calculateDelay(restartCount, b.minBackoff, b.maxBackoff, b.randomFactor) - ctx.asScala.log.info("Failure during initialisation") - ctx.asScala.schedule(restartDelay, ctx.asScala.self, ScheduledRestart) - restartCount += 1 - blackhole = true - Behaviors.empty - } - case ResetRestartCount(current) ⇒ - println("Reset restart count: " + current) - if (current == restartCount) { - println("Resetting") - restartCount = 0 - } - Behavior.same - case _ ⇒ - // TODO publish dropped message - target(ctx, msg.asInstanceOf[T]) - } - } catch handleReceiveException(ctx, target) - } - - protected def handleExceptionOnStart(ctx: ActorContext[AnyRef]): Catcher[akka.actor.typed.Behavior[T]] = { - case NonFatal(t: Thr) ⇒ - log(ctx, t) - scheduleRestart(ctx) - } - - protected def handleReceiveException(ctx: akka.actor.typed.ActorContext[AnyRef], target: BehaviorInterceptor.ReceiveTarget[T]): util.control.Exception.Catcher[akka.actor.typed.Behavior[T]] = { - case NonFatal(t: Thr) ⇒ - target.signal(ctx, PreRestart) - log(ctx, t) - scheduleRestart(ctx) - } - - protected def handleSignalException(ctx: ActorContext[AnyRef], target: BehaviorInterceptor.SignalTarget[T]): Catcher[akka.actor.typed.Behavior[T]] = { - case NonFatal(t: Thr) ⇒ - target(ctx, PreRestart) - log(ctx, t) - scheduleRestart(ctx) - } - - private def scheduleRestart(ctx: ActorContext[AnyRef]): Behavior[T] = { - val restartDelay = calculateDelay(restartCount, b.minBackoff, b.maxBackoff, b.randomFactor) - ctx.asScala.schedule(restartDelay, ctx.asScala.self, ScheduledRestart) - restartCount += 1 - blackhole = true - Behaviors.empty - } - -} - -/** - * INTERNAL API - */ -@InternalApi private[akka] abstract class Supervisor[T, Thr <: Throwable: ClassTag] extends ExtensibleBehavior[T] with WrappingBehavior[T, T] { - - private[akka] def throwableClass = implicitly[ClassTag[Thr]].runtimeClass - - protected def loggingEnabled: Boolean - - /** - * Invoked when the actor is created (or re-created on restart) this is where a restarter implementation - * can provide logic for dealing with exceptions thrown when running any actor initialization logic (undeferring). - * - * Note that the logic must take care to not wrap StoppedBehavior to avoid creating zombie behaviors that keep - * running although stopped. - * - * @return The initial behavior of the actor after undeferring if needed - */ - def init(ctx: ActorContext[T]): Behavior[T] - - /** - * Current behavior - */ - protected def behavior: Behavior[T] - - def nestedBehavior: Behavior[T] = behavior - - def replaceNested(newNested: Behavior[T]): Behavior[T] = wrap(newNested, afterException = false) - - /** - * Wrap next behavior in a concrete restarter again. - */ - protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Behavior[T] - - protected def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Behavior[T]] - - protected def restart(ctx: ActorContext[T], initialBehavior: Behavior[T], startedBehavior: Behavior[T]): Behavior[T] = { - try Behavior.interpretSignal(startedBehavior, ctx, PreRestart) catch { - case NonFatal(ex) ⇒ ctx.asScala.log.error(ex, "failure during PreRestart") - } - wrap(initialBehavior, afterException = true) match { - case s: Supervisor[T, Thr] ⇒ s.init(ctx) - case b ⇒ b - } - } - - protected final def supervise(nextBehavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] = { - val started = Behavior.start(nextBehavior, ctx) - - val throwableAlreadyHandled = Behavior.existsInStack(started) { - case s: Supervisor[T, Thr] if s.throwableClass == throwableClass ⇒ true - case _ ⇒ false - } - if (throwableAlreadyHandled) started - else Behavior.wrap[T, T](behavior, started, ctx)(wrap(_, afterException = false)) - } - - override def receiveSignal(ctx: ActorContext[T], signal: Signal): Behavior[T] = { - try { - val b = Behavior.interpretSignal(behavior, ctx, signal) - supervise(b, ctx) - } catch handleException(ctx, behavior) - } - - override def receive(ctx: ActorContext[T], msg: T): Behavior[T] = { - try { - val b = Behavior.interpretMessage(behavior, ctx, msg) - supervise(b, ctx) - } catch handleException(ctx, behavior) - } - - protected def log(ctx: ActorContext[T], ex: Thr): Unit = { - if (loggingEnabled) - ctx.asScala.log.error(ex, "Supervisor [{}] saw failure: {}", this, ex.getMessage) - } -} - -/** - * INTERNAL API - */ -@InternalApi private[akka] final class Stopper[T, Thr <: Throwable: ClassTag]( - override val behavior: Behavior[T], override val loggingEnabled: Boolean) extends Supervisor[T, Thr] { - - def init(ctx: ActorContext[T]): Behavior[T] = { - try { - val started = Behavior.validateAsInitial(Behavior.start(behavior, ctx)) - if (Behavior.isAlive(started)) wrap(started, false) - else started - } catch { - case NonFatal(ex: Thr) ⇒ - log(ctx, ex) - Behavior.stopped - } - } - - override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Behavior[T]] = { - case NonFatal(ex: Thr) ⇒ - log(ctx, ex) - Behaviors.stopped - } - - override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Behavior[T] = - new Stopper[T, Thr](nextBehavior, loggingEnabled) - - override def toString = "stop" - -} - -/** - * INTERNAL API - */ -@InternalApi private[akka] final class Restarter[T, Thr <: Throwable: ClassTag]( - initialBehavior: Behavior[T], override val behavior: Behavior[T], - override val loggingEnabled: Boolean) extends Supervisor[T, Thr] { - - override def init(ctx: ActorContext[T]) = { - // no handling of errors for Restart as that could lead to infinite restart-loop - val started = Behavior.validateAsInitial(Behavior.start(behavior, ctx)) - if (Behavior.isAlive(started)) wrap(started, afterException = false) - else started - } - - override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Behavior[T]] = { - case NonFatal(ex: Thr) ⇒ - log(ctx, ex) - restart(ctx, initialBehavior, startedBehavior) - } - - override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Behavior[T] = - new Restarter[T, Thr](initialBehavior, nextBehavior, loggingEnabled) - - override def toString = "restart" -} - -/** - * INTERNAL API - */ -@InternalApi private[akka] final class LimitedRestarter[T, Thr <: Throwable: ClassTag]( - initialBehavior: Behavior[T], override val behavior: Behavior[T], - strategy: Restart, retries: Int, deadline: OptionVal[Deadline]) extends Supervisor[T, Thr] { - - override def loggingEnabled: Boolean = strategy.loggingEnabled - - override def init(ctx: ActorContext[T]) = - try { - val started = Behavior.validateAsInitial(Behavior.start(behavior, ctx)) - if (Behavior.isAlive(started)) wrap(started, afterException = false) - else started - } catch { - case NonFatal(ex: Thr) ⇒ - log(ctx, ex) - // we haven't actually wrapped and increased retries yet, so need to compare with +1 - if (deadlineHasTimeLeft && (retries + 1) >= strategy.maxNrOfRetries) throw ex - else { - wrap(initialBehavior, afterException = true) match { - case s: Supervisor[T, Thr] ⇒ s.init(ctx) - case b ⇒ b - } - } - } - - private def deadlineHasTimeLeft: Boolean = deadline match { - case OptionVal.None ⇒ true - case OptionVal.Some(d) ⇒ d.hasTimeLeft - } - - override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Behavior[T]] = { - case NonFatal(ex: Thr) ⇒ - log(ctx, ex) - if (deadlineHasTimeLeft && retries >= strategy.maxNrOfRetries) - throw ex - else - restart(ctx, initialBehavior, startedBehavior) - } - - override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Behavior[T] = { - val restarter = if (afterException) { - val timeLeft = deadlineHasTimeLeft - val newRetries = if (timeLeft) retries + 1 else 1 - val newDeadline = if (deadline.isDefined && timeLeft) deadline else OptionVal.Some(Deadline.now + strategy.withinTimeRange) - new LimitedRestarter[T, Thr](initialBehavior, nextBehavior, strategy, newRetries, newDeadline) - } else - new LimitedRestarter[T, Thr](initialBehavior, nextBehavior, strategy, retries, deadline) - - restarter - } - - override def toString = s"restartWithLimit(${strategy.maxNrOfRetries}, ${PrettyDuration.format(strategy.withinTimeRange)})" -} - -/** - * INTERNAL API - */ -@InternalApi private[akka] object BackoffRestarter { - /** - * Calculates an exponential back off delay. - */ - def calculateDelay( - restartCount: Int, - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double): FiniteDuration = { - val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor - if (restartCount >= 30) // Duration overflow protection (> 100 years) - maxBackoff - else - maxBackoff.min(minBackoff * math.pow(2, restartCount)) * rnd match { - case f: FiniteDuration ⇒ f - case _ ⇒ maxBackoff - } - } - - case object ScheduledRestart - final case class ResetRestartCount(current: Int) extends DeadLetterSuppression -} - -/** - * INTERNAL API - */ -@InternalApi private[akka] final class BackoffRestarter[T, Thr <: Throwable: ClassTag]( - initialBehavior: Behavior[Any], override val behavior: Behavior[Any], - strategy: Backoff, restartCount: Int, blackhole: Boolean) extends Supervisor[Any, Thr] { - - // TODO using Any here because the scheduled messages can't be of type T. - - import BackoffRestarter._ - - override def loggingEnabled: Boolean = strategy.loggingEnabled - - // FIXME weird hack here to avoid having any Behavior.start call start the supervised behavior early when we are backing off - // maybe we can solve this in a less strange way? - override def nestedBehavior: Behavior[Any] = - if (blackhole) { - // if we are currently backing off, we don't want someone outside to start any deferred behaviors - Behaviors.empty - } else { - behavior - } - - override def replaceNested(newNested: Behavior[Any]): Behavior[Any] = { - if (blackhole) { - // if we are currently backing off, we don't want someone outside to replace our inner behavior - this - } else { - super.replaceNested(newNested) - } - } - - def init(ctx: ActorContext[Any]) = - try { - val started = Behavior.validateAsInitial(Behavior.start(initialBehavior, ctx)) - if (Behavior.isAlive(started)) wrap(started, afterException = false) - else started - } catch { - case NonFatal(ex: Thr) ⇒ - log(ctx, ex) - val restartDelay = calculateDelay(restartCount, strategy.minBackoff, strategy.maxBackoff, strategy.randomFactor) - ctx.asScala.schedule(restartDelay, ctx.asScala.self, ScheduledRestart) - new BackoffRestarter[T, Thr](initialBehavior, initialBehavior, strategy, restartCount + 1, blackhole = true) - } - - override def receiveSignal(ctx: ActorContext[Any], signal: Signal): Behavior[Any] = { - if (blackhole) { - import scaladsl.adapter._ - ctx.asScala.system.toUntyped.eventStream.publish(Dropped(signal, ctx.asScala.self)) - Behavior.same - } else - super.receiveSignal(ctx, signal) - } - - override def receive(ctx: ActorContext[Any], msg: Any): Behavior[Any] = { - // intercept the scheduled messages and drop incoming messages if we are in backoff mode - msg match { - case ScheduledRestart ⇒ - // actual restart after scheduled backoff delay - ctx.asScala.schedule(strategy.resetBackoffAfter, ctx.asScala.self, ResetRestartCount(restartCount)) - new BackoffRestarter[T, Thr](initialBehavior, initialBehavior, strategy, restartCount, blackhole = false).init(ctx) - case ResetRestartCount(current) ⇒ - if (current == restartCount) - new BackoffRestarter[T, Thr](initialBehavior, behavior, strategy, restartCount = 0, blackhole) - else - Behavior.same - case _ ⇒ - if (blackhole) { - import scaladsl.adapter._ - ctx.asScala.system.toUntyped.eventStream.publish(Dropped(msg, ctx.asScala.self)) - Behavior.same - } else - super.receive(ctx, msg) - } - } - - override def handleException(ctx: ActorContext[Any], startedBehavior: Behavior[Any]): Catcher[Supervisor[Any, Thr]] = { - case NonFatal(ex: Thr) ⇒ - log(ctx, ex) - // actual restart happens after the scheduled backoff delay - try Behavior.interpretSignal(behavior, ctx, PreRestart) catch { - case NonFatal(ex2) ⇒ ctx.asScala.log.error(ex2, "failure during PreRestart") - } - val restartDelay = calculateDelay(restartCount, strategy.minBackoff, strategy.maxBackoff, strategy.randomFactor) - ctx.asScala.schedule(restartDelay, ctx.asScala.self, ScheduledRestart) - new BackoffRestarter[T, Thr](initialBehavior, startedBehavior, strategy, restartCount + 1, blackhole = true) - } - - override protected def wrap(nextBehavior: Behavior[Any], afterException: Boolean): Behavior[Any] = { - if (afterException) - throw new IllegalStateException("wrap not expected afterException in BackoffRestarter") - else - new BackoffRestarter[T, Thr](initialBehavior, nextBehavior, strategy, restartCount, blackhole) - } - - override def toString = s"restartWithBackoff(${PrettyDuration.format(strategy.minBackoff)}, ${PrettyDuration.format(strategy.maxBackoff)}, ${strategy.randomFactor})" -} - diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala new file mode 100644 index 0000000000..536503c22b --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala @@ -0,0 +1,271 @@ +/** + * Copyright (C) 2016-2018 Lightbend Inc. + */ + +package akka.actor.typed +package internal + +import java.util.concurrent.ThreadLocalRandom + +import akka.actor.DeadLetterSuppression +import akka.actor.typed.BehaviorInterceptor.SignalTarget +import akka.actor.typed.SupervisorStrategy._ +import akka.actor.typed.scaladsl.Behaviors +import akka.annotation.InternalApi +import akka.util.OptionVal + +import scala.concurrent.duration.{ Deadline, FiniteDuration } +import scala.reflect.ClassTag +import scala.util.control.Exception.Catcher +import scala.util.control.NonFatal + +/** + * INTERNAL API + */ +@InternalApi private[akka] object Supervisor { + def apply[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], strategy: SupervisorStrategy): Behavior[T] = { + strategy match { + case r: Resume ⇒ + Behaviors.intercept[T, T](new ResumeSupervisor(r))(initialBehavior) + case r: Restart ⇒ + Behaviors.intercept[T, T](new RestartSupervisor(initialBehavior, r))(initialBehavior) + case r: Stop ⇒ + Behaviors.intercept[T, T](new StopSupervisor(initialBehavior, r))(initialBehavior) + case r: Backoff ⇒ + Behaviors.intercept[AnyRef, T](new BackoffSupervisor(initialBehavior, r))(initialBehavior).asInstanceOf[Behavior[T]] + } + } +} + +abstract class AbstractSupervisor[O, I, Thr <: Throwable](ss: SupervisorStrategy)(implicit ev: ClassTag[Thr]) extends BehaviorInterceptor[O, I] { + + private val throwableClass = implicitly[ClassTag[Thr]].runtimeClass + + override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = { + other match { + case as: AbstractSupervisor[_, _, Thr] if throwableClass == as.throwableClass ⇒ true + case _ ⇒ false + } + } + + override def aroundStart(ctx: ActorContext[O], target: BehaviorInterceptor.PreStartTarget[I]): Behavior[I] = { + try { + target.start(ctx) + } catch handleExceptionOnStart(ctx) + } + + def aroundSignal(ctx: ActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] = { + try { + target(ctx, signal) + } catch handleSignalException(ctx, target) + } + + def log(ctx: ActorContext[_], t: Throwable): Unit = { + if (ss.loggingEnabled) { + ctx.asScala.log.error(t, "Supervisor [{}] saw failure: {}", this, t.getMessage) + } + } + + protected def handleExceptionOnStart(ctx: ActorContext[O]): Catcher[Behavior[I]] + protected def handleSignalException(ctx: ActorContext[O], target: BehaviorInterceptor.SignalTarget[I]): Catcher[Behavior[I]] + protected def handleReceiveException(ctx: ActorContext[O], target: BehaviorInterceptor.ReceiveTarget[I]): Catcher[Behavior[I]] +} + +/** + * For cases where O == I for BehaviorInterceptor. + */ +abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: SupervisorStrategy) extends AbstractSupervisor[T, T, Thr](ss) { + + override def aroundReceive(ctx: ActorContext[T], msg: T, target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = { + try { + target(ctx, msg) + } catch handleReceiveException(ctx, target) + } + + protected def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = { + case NonFatal(_: Thr) ⇒ + Behaviors.stopped + } + + // convenience if target not required to handle exception + protected def handleExceptionOnStart(ctx: ActorContext[T]): Catcher[Behavior[T]] = + handleException(ctx) + protected def handleSignalException(ctx: ActorContext[T], target: BehaviorInterceptor.SignalTarget[T]): Catcher[Behavior[T]] = + handleException(ctx) + protected def handleReceiveException(ctx: ActorContext[T], target: BehaviorInterceptor.ReceiveTarget[T]): Catcher[Behavior[T]] = + handleException(ctx) +} + +class StopSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: Stop) extends SimpleSupervisor[T, Thr](strategy) { + override def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + log(ctx, t) + Behaviors.stopped + } +} + +class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extends SimpleSupervisor[T, Thr](ss) { + override protected def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + log(ctx, t) + Behaviors.same + } +} + +class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strategy: Restart)(implicit ev: ClassTag[Thr]) extends SimpleSupervisor[T, Thr](strategy) { + + var restarts = 0 + var deadline: OptionVal[Deadline] = OptionVal.None + + private def deadlineHasTimeLeft: Boolean = deadline match { + case OptionVal.None ⇒ true + case OptionVal.Some(d) ⇒ d.hasTimeLeft + } + + override def aroundStart(ctx: ActorContext[T], target: BehaviorInterceptor.PreStartTarget[T]): Behavior[T] = { + try { + target.start(ctx) + } catch { + case NonFatal(t: Thr) ⇒ + // if unlimited restarts then don't restart if starting fails as it would likely be an infinite restart loop + log(ctx, t) + if (strategy.unlimitedRestarts() || ((restarts + 1) >= strategy.maxNrOfRetries && deadlineHasTimeLeft)) { + throw t + } else { + restart(ctx, t) + aroundStart(ctx, target) + } + } + } + + private def restart(ctx: ActorContext[_], t: Throwable) = { + val timeLeft = deadlineHasTimeLeft + val newDeadline = if (deadline.isDefined && timeLeft) deadline else OptionVal.Some(Deadline.now + strategy.withinTimeRange) + restarts = if (timeLeft) restarts + 1 else 0 + deadline = newDeadline + } + + private def handleException(ctx: ActorContext[T], signalRestart: () ⇒ Unit): Catcher[Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + if (strategy.maxNrOfRetries != -1 && restarts >= strategy.maxNrOfRetries && deadlineHasTimeLeft) { + throw t + } else { + try { + signalRestart() + } catch { + case NonFatal(ex) ⇒ ctx.asScala.log.error(ex, "failure during PreRestart") + } + log(ctx, t) + restart(ctx, t) + Behavior.validateAsInitial(Behavior.start(initial, ctx)) + } + } + + override protected def handleSignalException(ctx: ActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = { + handleException(ctx, () ⇒ target(ctx, PreRestart)) + } + override protected def handleReceiveException(ctx: ActorContext[T], target: BehaviorInterceptor.ReceiveTarget[T]): Catcher[Behavior[T]] = { + handleException(ctx, () ⇒ target.signalRestart(ctx)) + } +} + +class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], b: Backoff) extends AbstractSupervisor[AnyRef, T, Thr](b) { + + import BackoffSupervisor._ + + var blackhole = false + var restartCount: Int = 0 + + override def aroundReceive(ctx: ActorContext[AnyRef], msg: AnyRef, target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = { + try { + msg match { + case ScheduledRestart ⇒ + blackhole = false + ctx.asScala.schedule(b.resetBackoffAfter, ctx.asScala.self, ResetRestartCount(restartCount)) + try { + Behavior.validateAsInitial(Behavior.start(initial, ctx.asInstanceOf[ActorContext[T]])) + } catch { + case NonFatal(ex: Thr) ⇒ + log(ctx, ex) + val restartDelay = BackoffSupervisor.calculateDelay(restartCount, b.minBackoff, b.maxBackoff, b.randomFactor) + ctx.asScala.schedule(restartDelay, ctx.asScala.self, ScheduledRestart) + restartCount += 1 + blackhole = true + Behaviors.empty + } + case ResetRestartCount(current) ⇒ + println("Reset restart count: " + current) + if (current == restartCount) { + println("Resetting") + restartCount = 0 + } + Behavior.same + case _ ⇒ + // TODO publish dropped message + target(ctx, msg.asInstanceOf[T]) + } + } catch handleReceiveException(ctx, target) + } + + protected def handleExceptionOnStart(ctx: ActorContext[AnyRef]): Catcher[Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + scheduleRestart(ctx, t) + } + + protected def handleReceiveException(ctx: akka.actor.typed.ActorContext[AnyRef], target: BehaviorInterceptor.ReceiveTarget[T]): util.control.Exception.Catcher[akka.actor.typed.Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + try { + target.signalRestart(ctx) + } catch { + case NonFatal(t) ⇒ ctx.asScala.log.error(t, "failure during PreRestart") + } + scheduleRestart(ctx, t) + } + + protected def handleSignalException(ctx: ActorContext[AnyRef], target: BehaviorInterceptor.SignalTarget[T]): Catcher[akka.actor.typed.Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + try { + target(ctx, PreRestart) + } catch { + case NonFatal(t) ⇒ ctx.asScala.log.error(t, "failure during PreRestart") + } + scheduleRestart(ctx, t) + } + + private def scheduleRestart(ctx: ActorContext[AnyRef], reason: Throwable): Behavior[T] = { + log(ctx, reason) + val restartDelay = calculateDelay(restartCount, b.minBackoff, b.maxBackoff, b.randomFactor) + ctx.asScala.schedule(restartDelay, ctx.asScala.self, ScheduledRestart) + restartCount += 1 + blackhole = true + Behaviors.empty + } + +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] object BackoffSupervisor { + /** + * Calculates an exponential back off delay. + */ + def calculateDelay( + restartCount: Int, + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double): FiniteDuration = { + val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor + if (restartCount >= 30) // Duration overflow protection (> 100 years) + maxBackoff + else + maxBackoff.min(minBackoff * math.pow(2, restartCount)) * rnd match { + case f: FiniteDuration ⇒ f + case _ ⇒ maxBackoff + } + } + + case object ScheduledRestart + final case class ResetRestartCount(current: Int) extends DeadLetterSuppression +} + diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala index dd0cfb28ac..cb24d655d8 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala @@ -5,7 +5,6 @@ package akka.actor.typed package scaladsl -import akka.actor.typed.SupervisorStrategy.{ Backoff, Restart, Resume, Stop } import akka.annotation.{ ApiMayChange, DoNotInherit, InternalApi } import akka.actor.typed.internal._ @@ -17,9 +16,6 @@ import scala.reflect.ClassTag @ApiMayChange object Behaviors { - private val _unitFunction = (_: ActorContext[Any], _: Any) ⇒ () - private def unitFunction[T] = _unitFunction.asInstanceOf[((ActorContext[T], Signal) ⇒ Unit)] - /** * `setup` is a factory for a behavior. Creation of the behavior instance is deferred until * the actor is started, as opposed to [[Behaviors.receive]] that creates the behavior instance @@ -188,22 +184,12 @@ object Behaviors { private final val NothingClassTag = ClassTag(classOf[Nothing]) private final val ThrowableClassTag = ClassTag(classOf[Throwable]) - final class Supervise[T] private[akka] (val wrapped: Behavior[T]) extends AnyVal { /** Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws. */ def onFailure[Thr <: Throwable: ClassTag](strategy: SupervisorStrategy): Behavior[T] = { val tag = implicitly[ClassTag[Thr]] val effectiveTag = if (tag == NothingClassTag) ThrowableClassTag else tag - strategy match { - case r: Resume ⇒ - Behaviors.intercept[T, T](new ResumeSupervisor(r)(effectiveTag))(wrapped) - case r: Restart ⇒ - Behaviors.intercept[T, T](new RestartSupervisor(wrapped, r)(effectiveTag))(wrapped) - case r: Stop ⇒ - Behaviors.intercept[T, T](new StopSupervisor(wrapped, r)(effectiveTag))(wrapped) - case r: Backoff ⇒ - Behaviors.intercept[AnyRef, T](new BackoffSupervisor(wrapped, r)(effectiveTag))(wrapped).asInstanceOf[Behavior[T]] - } + Supervisor(Behavior.validateAsInitial(wrapped), strategy)(effectiveTag) } } From 79c6320bfcfa904fcaa310c0706e5d48bc6e942d Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Thu, 20 Sep 2018 11:44:45 +0100 Subject: [PATCH 4/7] Dropped messages when backing off --- .../actor/testkit/typed/TestKitSettings.scala | 2 +- .../akka/actor/typed/SupervisionSpec.scala | 23 ++++++++++++++++ .../actor/typed/internal/Supervision.scala | 26 +++++++++++++++---- 3 files changed, 45 insertions(+), 6 deletions(-) diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestKitSettings.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestKitSettings.scala index 0a9bea09c5..62bb15cb97 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestKitSettings.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestKitSettings.scala @@ -15,7 +15,7 @@ import scala.util.control.NoStackTrace /** * Exception without stack trace to use for verifying exceptions in tests */ -final case class TE(message: String) extends RuntimeException(message) +final case class TE(message: String) extends RuntimeException(message) with NoStackTrace object TestKitSettings { /** diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index d72fbfc04b..a7dd41e862 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -506,6 +506,29 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( } } + "publish dropped messages while backing off" in { + val probe = TestProbe[Event]("evt") + val startedProbe = TestProbe[Event]("started") + val minBackoff = 10.seconds + val strategy = SupervisorStrategy + .restartWithBackoff(minBackoff, minBackoff, 0.0) + val behv = Behaviors.supervise(Behaviors.setup[Command] { _ ⇒ + startedProbe.ref ! Started + targetBehavior(probe.ref) + }).onFailure[Exception](strategy) + + val droppedMessagesProbe = TestProbe[Dropped]() + system.toUntyped.eventStream.subscribe(droppedMessagesProbe.ref.toUntyped, classOf[Dropped]) + val ref = spawn(behv) + EventFilter[Exc1](occurrences = 1).intercept { + startedProbe.expectMessage(Started) + ref ! Throw(new Exc1) + probe.expectMessage(GotSignal(PreRestart)) + } + ref ! Ping + droppedMessagesProbe.expectMessage(Dropped(Ping, ref)) + } + "restart after exponential backoff" in { val probe = TestProbe[Event]("evt") val startedProbe = TestProbe[Event]("started") diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala index 536503c22b..23fa6d87ba 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala @@ -141,12 +141,13 @@ class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strategy: Res private def restart(ctx: ActorContext[_], t: Throwable) = { val timeLeft = deadlineHasTimeLeft val newDeadline = if (deadline.isDefined && timeLeft) deadline else OptionVal.Some(Deadline.now + strategy.withinTimeRange) - restarts = if (timeLeft) restarts + 1 else 0 + restarts = if (timeLeft) restarts + 1 else 1 deadline = newDeadline } private def handleException(ctx: ActorContext[T], signalRestart: () ⇒ Unit): Catcher[Behavior[T]] = { case NonFatal(t: Thr) ⇒ + println(s"ex: $t. $restarts $deadlineHasTimeLeft $strategy") if (strategy.maxNrOfRetries != -1 && restarts >= strategy.maxNrOfRetries && deadlineHasTimeLeft) { throw t } else { @@ -176,6 +177,16 @@ class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], b: var blackhole = false var restartCount: Int = 0 + override def aroundSignal(ctx: ActorContext[AnyRef], signal: Signal, target: SignalTarget[T]): Behavior[T] = { + if (blackhole) { + import akka.actor.typed.scaladsl.adapter._ + ctx.asScala.system.toUntyped.eventStream.publish(Dropped(signal, ctx.asScala.self)) + Behaviors.same + } else { + super.aroundSignal(ctx, signal, target) + } + } + override def aroundReceive(ctx: ActorContext[AnyRef], msg: AnyRef, target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = { try { msg match { @@ -201,8 +212,13 @@ class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], b: } Behavior.same case _ ⇒ - // TODO publish dropped message - target(ctx, msg.asInstanceOf[T]) + if (blackhole) { + import akka.actor.typed.scaladsl.adapter._ + ctx.asScala.system.toUntyped.eventStream.publish(Dropped(msg, ctx.asScala.self)) + Behaviors.same + } else { + target(ctx, msg.asInstanceOf[T]) + } } } catch handleReceiveException(ctx, target) } @@ -217,7 +233,7 @@ class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], b: try { target.signalRestart(ctx) } catch { - case NonFatal(t) ⇒ ctx.asScala.log.error(t, "failure during PreRestart") + case NonFatal(ex) ⇒ ctx.asScala.log.error(ex, "failure during PreRestart") } scheduleRestart(ctx, t) } @@ -227,7 +243,7 @@ class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], b: try { target(ctx, PreRestart) } catch { - case NonFatal(t) ⇒ ctx.asScala.log.error(t, "failure during PreRestart") + case NonFatal(ex) ⇒ ctx.asScala.log.error(ex, "failure during PreRestart") } scheduleRestart(ctx, t) } From 58330ad03e782daf59210512b1faf0f3daf76d84 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Thu, 20 Sep 2018 12:49:37 +0100 Subject: [PATCH 5/7] Private all the things --- .../akka/actor/typed/SupervisionSpec.scala | 1 - .../actor/typed/internal/Supervision.scala | 26 +++++++------------ 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index a7dd41e862..07bbabad99 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -820,7 +820,6 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( actor ! "give me stacktrace" val stacktrace = probe.expectMessageType[Vector[StackTraceElement]] - stacktrace.foreach(println) stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.SimpleSupervisor.aroundReceive")) should ===(2) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala index 23fa6d87ba..5fff701827 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala @@ -37,7 +37,7 @@ import scala.util.control.NonFatal } } -abstract class AbstractSupervisor[O, I, Thr <: Throwable](ss: SupervisorStrategy)(implicit ev: ClassTag[Thr]) extends BehaviorInterceptor[O, I] { +private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: SupervisorStrategy)(implicit ev: ClassTag[Thr]) extends BehaviorInterceptor[O, I] { private val throwableClass = implicitly[ClassTag[Thr]].runtimeClass @@ -61,7 +61,7 @@ abstract class AbstractSupervisor[O, I, Thr <: Throwable](ss: SupervisorStrategy } def log(ctx: ActorContext[_], t: Throwable): Unit = { - if (ss.loggingEnabled) { + if (strategy.loggingEnabled) { ctx.asScala.log.error(t, "Supervisor [{}] saw failure: {}", this, t.getMessage) } } @@ -74,7 +74,7 @@ abstract class AbstractSupervisor[O, I, Thr <: Throwable](ss: SupervisorStrategy /** * For cases where O == I for BehaviorInterceptor. */ -abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: SupervisorStrategy) extends AbstractSupervisor[T, T, Thr](ss) { +private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: SupervisorStrategy) extends AbstractSupervisor[T, T, Thr](ss) { override def aroundReceive(ctx: ActorContext[T], msg: T, target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = { try { @@ -96,7 +96,7 @@ abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: SupervisorStr handleException(ctx) } -class StopSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: Stop) extends SimpleSupervisor[T, Thr](strategy) { +private class StopSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: Stop) extends SimpleSupervisor[T, Thr](strategy) { override def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = { case NonFatal(t: Thr) ⇒ log(ctx, t) @@ -104,7 +104,7 @@ class StopSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], strate } } -class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extends SimpleSupervisor[T, Thr](ss) { +private class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extends SimpleSupervisor[T, Thr](ss) { override protected def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = { case NonFatal(t: Thr) ⇒ log(ctx, t) @@ -112,10 +112,10 @@ class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extends Simple } } -class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strategy: Restart)(implicit ev: ClassTag[Thr]) extends SimpleSupervisor[T, Thr](strategy) { +private class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strategy: Restart)(implicit ev: ClassTag[Thr]) extends SimpleSupervisor[T, Thr](strategy) { - var restarts = 0 - var deadline: OptionVal[Deadline] = OptionVal.None + private var restarts = 0 + private var deadline: OptionVal[Deadline] = OptionVal.None private def deadlineHasTimeLeft: Boolean = deadline match { case OptionVal.None ⇒ true @@ -147,7 +147,6 @@ class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strategy: Res private def handleException(ctx: ActorContext[T], signalRestart: () ⇒ Unit): Catcher[Behavior[T]] = { case NonFatal(t: Thr) ⇒ - println(s"ex: $t. $restarts $deadlineHasTimeLeft $strategy") if (strategy.maxNrOfRetries != -1 && restarts >= strategy.maxNrOfRetries && deadlineHasTimeLeft) { throw t } else { @@ -170,7 +169,7 @@ class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strategy: Res } } -class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], b: Backoff) extends AbstractSupervisor[AnyRef, T, Thr](b) { +private class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], b: Backoff) extends AbstractSupervisor[AnyRef, T, Thr](b) { import BackoffSupervisor._ @@ -205,9 +204,7 @@ class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], b: Behaviors.empty } case ResetRestartCount(current) ⇒ - println("Reset restart count: " + current) if (current == restartCount) { - println("Resetting") restartCount = 0 } Behavior.same @@ -259,10 +256,7 @@ class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], b: } -/** - * INTERNAL API - */ -@InternalApi private[akka] object BackoffSupervisor { +private object BackoffSupervisor { /** * Calculates an exponential back off delay. */ From a7d778ad503ea334295608598ad71c210ab586e5 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Thu, 20 Sep 2018 15:44:12 +0100 Subject: [PATCH 6/7] Review feedback --- .../main/scala/akka/actor/typed/internal/Supervision.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala index 5fff701827..ed28b72571 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala @@ -37,6 +37,10 @@ import scala.util.control.NonFatal } } +/** + * INTERNAL API + */ +@InternalApi private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: SupervisorStrategy)(implicit ev: ClassTag[Thr]) extends BehaviorInterceptor[O, I] { private val throwableClass = implicitly[ClassTag[Thr]].runtimeClass @@ -62,7 +66,7 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe def log(ctx: ActorContext[_], t: Throwable): Unit = { if (strategy.loggingEnabled) { - ctx.asScala.log.error(t, "Supervisor [{}] saw failure: {}", this, t.getMessage) + ctx.asScala.log.error(t, "Supervisor {} saw failure: {}", this, t.getMessage) } } From 5ad6f2c56807985cbe5d6135ae20cd640ac3d48a Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Thu, 20 Sep 2018 16:37:26 +0100 Subject: [PATCH 7/7] Review: Don't double log exception --- .../src/test/scala/akka/actor/typed/SupervisionSpec.scala | 2 +- .../main/scala/akka/actor/typed/internal/Supervision.scala | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index 07bbabad99..99cc90e7bb 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -685,7 +685,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( ) { EventFilter[ActorInitializationException](occurrences = 1).intercept { - EventFilter[TE](occurrences = 2).intercept { + EventFilter[TE](occurrences = 1).intercept { spawn(behv) // restarted 2 times before it gave up diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala index ed28b72571..f8dc06968b 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala @@ -38,8 +38,8 @@ import scala.util.control.NonFatal } /** - * INTERNAL API - */ + * INTERNAL API + */ @InternalApi private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: SupervisorStrategy)(implicit ev: ClassTag[Thr]) extends BehaviorInterceptor[O, I] { @@ -132,10 +132,11 @@ private class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strat } catch { case NonFatal(t: Thr) ⇒ // if unlimited restarts then don't restart if starting fails as it would likely be an infinite restart loop - log(ctx, t) if (strategy.unlimitedRestarts() || ((restarts + 1) >= strategy.maxNrOfRetries && deadlineHasTimeLeft)) { + // don't log here as it'll be logged as ActorInitializationException throw t } else { + log(ctx, t) restart(ctx, t) aroundStart(ctx, target) }