diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestKitSettings.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestKitSettings.scala index 62bb15cb97..0a9bea09c5 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestKitSettings.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/TestKitSettings.scala @@ -15,7 +15,7 @@ import scala.util.control.NoStackTrace /** * Exception without stack trace to use for verifying exceptions in tests */ -final case class TE(message: String) extends RuntimeException(message) with NoStackTrace +final case class TE(message: String) extends RuntimeException(message) object TestKitSettings { /** diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index 9e1472ad48..76a7d2d912 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -255,6 +255,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( // FIXME eventfilter support in typed testkit import scaladsl.adapter._ + implicit val untypedSystem = system.toUntyped class FailingConstructorTestSetup(failCount: Int) { @@ -315,6 +316,18 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( } } + "stop when strategy is stop - exception in setup" in { + val probe = TestProbe[Event]("evt") + val failedSetup = Behaviors.setup[Command](_ ⇒ { + throw new Exc3() + targetBehavior(probe.ref) + }) + val behv = Behaviors.supervise(failedSetup).onFailure[Throwable](SupervisorStrategy.stop) + EventFilter[Exc3](occurrences = 1).intercept { + spawn(behv) + } + } + "support nesting exceptions with different strategies" in { val probe = TestProbe[Event]("evt") val behv = @@ -374,6 +387,27 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( probe.expectMessage(State(0, Map.empty)) } + "stop when restart limit is hit" in { + val probe = TestProbe[Event]("evt") + val behv = Behaviors.supervise(targetBehavior(probe.ref)) + .onFailure[Exc1](SupervisorStrategy.restartWithLimit(2, 1.minute)) + val ref = spawn(behv) + ref ! IncrementState + ref ! GetState + probe.expectMessage(State(1, Map.empty)) + + EventFilter[Exc2](occurrences = 3).intercept { + ref ! Throw(new Exc2) + probe.expectMessage(GotSignal(PreRestart)) + ref ! Throw(new Exc2) + probe.expectMessage(GotSignal(PreRestart)) + ref ! Throw(new Exc2) + probe.expectMessage(GotSignal(PostStop)) + } + ref ! GetState + probe.expectNoMessage() + } + "NOT stop children when restarting" in { val parentProbe = TestProbe[Event]("evt") val behv = Behaviors.supervise(targetBehavior(parentProbe.ref)) @@ -684,9 +718,9 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( actor ! "give me stacktrace" val stacktrace = probe.expectMessageType[Vector[StackTraceElement]] - // supervisor receive is used for every supervision instance, only wrapped in one supervisor for RuntimeException + // InterceptorImpl receive is used for every supervision instance, only wrapped in one supervisor for RuntimeException // and then the IllegalArgument one is kept since it has a different throwable - stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.Supervisor.receive")) should ===(2) + stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.InterceptorImpl.receive")) should ===(2) } "replace supervision when new returned behavior catches same exception nested in other behaviors" in { @@ -737,9 +771,8 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( actor ! "give me stacktrace" val stacktrace = probe.expectMessageType[Vector[StackTraceElement]] - // supervisor receive is used for every supervision instance, only wrapped in one supervisor for RuntimeException - // and then the IllegalArgument one is kept since it has a different throwable - stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.Supervisor.receive")) should ===(2) + stacktrace.foreach(println) + stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.SimpleSupervisor.aroundReceive")) should ===(2) } "replace backoff supervision duplicate when behavior is created in a setup" in { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index bc6199fb3b..618794531d 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -130,6 +130,7 @@ object Behavior { * that is not necessary. */ def same[T]: Behavior[T] = SameBehavior.asInstanceOf[Behavior[T]] + /** * Return this behavior from message processing in order to advise the * system to reuse the previous behavior, including the hint that the diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala index 71c4f88b8a..502237cd82 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala @@ -23,7 +23,7 @@ abstract class BehaviorInterceptor[O, I] { * @return The returned behavior will be the "started" behavior of the actor used to accept * the next message or signal. */ - def preStart(ctx: ActorContext[I], target: PreStartTarget[I]): Behavior[I] = + def preStart(ctx: ActorContext[O], target: PreStartTarget[I]): Behavior[I] = target.start(ctx) /** @@ -72,6 +72,8 @@ object BehaviorInterceptor { @DoNotInherit trait ReceiveTarget[T] { def apply(ctx: ActorContext[_], msg: T): Behavior[T] + def current(): Behavior[T] + def signal(ctx: ActorContext[_], signal: Signal): Behavior[T] } /** diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala index 6e21664a7f..0b9ccffeed 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala @@ -47,6 +47,11 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce private val receiveTarget: ReceiveTarget[I] = new ReceiveTarget[I] { override def apply(ctx: ActorContext[_], msg: I): Behavior[I] = Behavior.interpretMessage(nestedBehavior, ctx.asInstanceOf[ActorContext[I]], msg) + + override def signal(ctx: ActorContext[_], signal: Signal): Behavior[I] = + Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[ActorContext[I]], signal) + + override def current(): Behavior[I] = nestedBehavior } private val signalTarget = new SignalTarget[I] { @@ -56,7 +61,7 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce // invoked pre-start to start/de-duplicate the initial behavior stack def preStart(ctx: typed.ActorContext[O]): Behavior[O] = { - val started = interceptor.preStart(ctx.asInstanceOf[ActorContext[I]], preStartTarget) + val started = interceptor.preStart(ctx, preStartTarget) deduplicate(started, ctx) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala index 56785a0265..88a9183cf2 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala @@ -8,7 +8,9 @@ package internal import java.util.concurrent.ThreadLocalRandom import akka.actor.DeadLetterSuppression +import akka.actor.typed.BehaviorInterceptor.{ ReceiveTarget, SignalTarget } import akka.actor.typed.SupervisorStrategy._ +import akka.actor.typed.internal.BackoffRestarter.ScheduledRestart import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi import akka.util.{ OptionVal, PrettyDuration } @@ -17,6 +19,7 @@ import scala.concurrent.duration.{ Deadline, FiniteDuration } import scala.reflect.ClassTag import scala.util.control.Exception.Catcher import scala.util.control.NonFatal +import scala.concurrent.duration._ /** * INTERNAL API @@ -29,8 +32,7 @@ import scala.util.control.NonFatal new Restarter(initialBehavior, initialBehavior, loggingEnabled) case r: Restart ⇒ new LimitedRestarter(initialBehavior, initialBehavior, r, retries = 0, deadline = OptionVal.None) - case Resume(loggingEnabled) ⇒ new Resumer(initialBehavior, loggingEnabled) - case Stop(loggingEnabled) ⇒ new Stopper(initialBehavior, loggingEnabled) + case Stop(loggingEnabled) ⇒ new Stopper(initialBehavior, loggingEnabled) case b: Backoff ⇒ val backoffRestarter = new BackoffRestarter( @@ -46,6 +48,195 @@ import scala.util.control.NonFatal } +abstract class AbstractSupervisor[O, I, Thr <: Throwable](ss: SupervisorStrategy)(implicit ev: ClassTag[Thr]) extends BehaviorInterceptor[O, I] { + + private val throwableClass = implicitly[ClassTag[Thr]].runtimeClass + + override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = { + other match { + case as: AbstractSupervisor[_, _, Thr] if throwableClass == as.throwableClass ⇒ true + case _ ⇒ false + } + } + + override def preStart(ctx: ActorContext[O], target: BehaviorInterceptor.PreStartTarget[I]): Behavior[I] = { + try { + target.start(ctx) + } catch handleExceptionOnStart(ctx) + } + + def aroundSignal(ctx: ActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] = { + try { + target(ctx, signal) + } catch handleSignalException(ctx, target) + } + + def log(ctx: ActorContext[_], t: Throwable): Unit = { + if (ss.loggingEnabled) { + ctx.asScala.log.error(t, "Supervisor [{}] saw failure: {}", this, t.getMessage) + } + } + + protected def handleExceptionOnStart(ctx: ActorContext[O]): Catcher[Behavior[I]] + protected def handleSignalException(ctx: ActorContext[O], target: BehaviorInterceptor.SignalTarget[I]): Catcher[Behavior[I]] + protected def handleReceiveException(ctx: ActorContext[O], target: BehaviorInterceptor.ReceiveTarget[I]): Catcher[Behavior[I]] +} + +/** + * For cases where O == I for BehaviorInterceptor. + */ +abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: SupervisorStrategy) extends AbstractSupervisor[T, T, Thr](ss) { + + override def aroundReceive(ctx: ActorContext[T], msg: T, target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = { + try { + target(ctx, msg) + } catch handleReceiveException(ctx, target) + } + + protected def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = { + case NonFatal(_: Thr) ⇒ + Behaviors.stopped + } + + // convenience if target not required to handle exception + protected def handleExceptionOnStart(ctx: ActorContext[T]): Catcher[Behavior[T]] = + handleException(ctx) + protected def handleSignalException(ctx: ActorContext[T], target: BehaviorInterceptor.SignalTarget[T]): Catcher[Behavior[T]] = + handleException(ctx) + protected def handleReceiveException(ctx: ActorContext[T], target: BehaviorInterceptor.ReceiveTarget[T]): Catcher[Behavior[T]] = + handleException(ctx) +} + +class StopSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: Stop) extends SimpleSupervisor[T, Thr](strategy) { + override def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + log(ctx, t) + Behaviors.stopped + } +} + +class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extends SimpleSupervisor[T, Thr](ss) { + override protected def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + log(ctx, t) + Behaviors.same + } +} + +// FIXME tests + impl of resetting time +class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strategy: Restart)(implicit ev: ClassTag[Thr]) extends SimpleSupervisor[T, Thr](strategy) { + + var restarts = 0 + + override def preStart(ctx: ActorContext[T], target: BehaviorInterceptor.PreStartTarget[T]): Behavior[T] = { + try { + target.start(ctx) + } catch { + case NonFatal(t: Thr) ⇒ + log(ctx, t) + restarts += 1 + // if unlimited restarts then don't restart if starting fails as it would likely be an infinite restart loop + if (restarts != strategy.maxNrOfRetries && strategy.maxNrOfRetries != -1) { + preStart(ctx, target) + } else { + throw t + } + } + } + + private def handleException(ctx: ActorContext[T], signalTarget: Signal ⇒ Behavior[T]): Catcher[Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + log(ctx, t) + restarts += 1 + signalTarget(PreRestart) + if (restarts != strategy.maxNrOfRetries) { + // TODO what about exceptions here? + Behavior.validateAsInitial(Behavior.start(initial, ctx)) + } else { + throw t + } + } + + + override protected def handleSignalException(ctx: ActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = { + handleException(ctx, s ⇒ target(ctx, s)) + } + override protected def handleReceiveException(ctx: ActorContext[T], target: BehaviorInterceptor.ReceiveTarget[T]): Catcher[Behavior[T]] = { + handleException(ctx, s ⇒ target.signal(ctx, s)) + } +} + +class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], b: Backoff) extends AbstractSupervisor[AnyRef, T, Thr](b) { + + import BackoffRestarter._ + + var blackhole = false + var restartCount: Int = 0 + + override def aroundReceive(ctx: ActorContext[AnyRef], msg: AnyRef, target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = { + try { + msg match { + case ScheduledRestart ⇒ + blackhole = false + // TODO do we need to start it? + ctx.asScala.log.info("Scheduled restart") + ctx.asScala.schedule(b.resetBackoffAfter, ctx.asScala.self, ResetRestartCount(restartCount)) + + try { + Behavior.validateAsInitial(Behavior.start(initial, ctx.asInstanceOf[ActorContext[T]])) + } catch { + case NonFatal(ex: Thr) ⇒ + val restartDelay = calculateDelay(restartCount, b.minBackoff, b.maxBackoff, b.randomFactor) + ctx.asScala.log.info("Failure during initialisation") + ctx.asScala.schedule(restartDelay, ctx.asScala.self, ScheduledRestart) + restartCount += 1 + blackhole = true + Behaviors.empty + } + case ResetRestartCount(current) ⇒ + println("Reset restart count: " + current) + if (current == restartCount) { + println("Resetting") + restartCount = 0 + } + Behavior.same + case _ ⇒ + // TODO publish dropped message + target(ctx, msg.asInstanceOf[T]) + } + } catch handleReceiveException(ctx, target) + } + + protected def handleExceptionOnStart(ctx: ActorContext[AnyRef]): Catcher[akka.actor.typed.Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + log(ctx, t) + scheduleRestart(ctx) + } + + protected def handleReceiveException(ctx: akka.actor.typed.ActorContext[AnyRef], target: BehaviorInterceptor.ReceiveTarget[T]): util.control.Exception.Catcher[akka.actor.typed.Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + target.signal(ctx, PreRestart) + log(ctx, t) + scheduleRestart(ctx) + } + + protected def handleSignalException(ctx: ActorContext[AnyRef], target: BehaviorInterceptor.SignalTarget[T]): Catcher[akka.actor.typed.Behavior[T]] = { + case NonFatal(t: Thr) ⇒ + target(ctx, PreRestart) + log(ctx, t) + scheduleRestart(ctx) + } + + private def scheduleRestart(ctx: ActorContext[AnyRef]): Behavior[T] = { + val restartDelay = calculateDelay(restartCount, b.minBackoff, b.maxBackoff, b.randomFactor) + ctx.asScala.schedule(restartDelay, ctx.asScala.self, ScheduledRestart) + restartCount += 1 + blackhole = true + Behaviors.empty + } + +} + /** * INTERNAL API */ @@ -123,32 +314,6 @@ import scala.util.control.NonFatal } } -/** - * INTERNAL API - */ -@InternalApi private[akka] final class Resumer[T, Thr <: Throwable: ClassTag]( - override val behavior: Behavior[T], override val loggingEnabled: Boolean) extends Supervisor[T, Thr] { - - def init(ctx: ActorContext[T]) = { - // no handling of errors for Resume as that could lead to infinite restart-loop - val started = Behavior.validateAsInitial(Behavior.start(behavior, ctx)) - if (Behavior.isAlive(started)) wrap(started, afterException = false) - else started - } - - override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Behavior[T]] = { - case NonFatal(ex: Thr) ⇒ - log(ctx, ex) - wrap(startedBehavior, afterException = true) - } - - override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Behavior[T] = - new Resumer[T, Thr](nextBehavior, loggingEnabled) - - override def toString = "resume" - -} - /** * INTERNAL API */ diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala index be68c470f0..dd0cfb28ac 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala @@ -5,6 +5,7 @@ package akka.actor.typed package scaladsl +import akka.actor.typed.SupervisorStrategy.{ Backoff, Restart, Resume, Stop } import akka.annotation.{ ApiMayChange, DoNotInherit, InternalApi } import akka.actor.typed.internal._ @@ -187,12 +188,22 @@ object Behaviors { private final val NothingClassTag = ClassTag(classOf[Nothing]) private final val ThrowableClassTag = ClassTag(classOf[Throwable]) + final class Supervise[T] private[akka] (val wrapped: Behavior[T]) extends AnyVal { /** Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws. */ def onFailure[Thr <: Throwable: ClassTag](strategy: SupervisorStrategy): Behavior[T] = { val tag = implicitly[ClassTag[Thr]] val effectiveTag = if (tag == NothingClassTag) ThrowableClassTag else tag - Supervisor(Behavior.validateAsInitial(wrapped), strategy)(effectiveTag) + strategy match { + case r: Resume ⇒ + Behaviors.intercept[T, T](new ResumeSupervisor(r)(effectiveTag))(wrapped) + case r: Restart ⇒ + Behaviors.intercept[T, T](new RestartSupervisor(wrapped, r)(effectiveTag))(wrapped) + case r: Stop ⇒ + Behaviors.intercept[T, T](new StopSupervisor(wrapped, r)(effectiveTag))(wrapped) + case r: Backoff ⇒ + Behaviors.intercept[AnyRef, T](new BackoffSupervisor(wrapped, r)(effectiveTag))(wrapped).asInstanceOf[Behavior[T]] + } } }