From 9dae4050ebbd311cb4663de9df9ee19864fb8112 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 2 Apr 2019 18:26:15 +0200 Subject: [PATCH] Always intercept TimerMsg, also when restarted, #26556 (#26650) * To avoid ClassCastException of TimerMsg if TimerMsg is already enqueued in mailbox and there is a restart with intiial behavior that is not using withTimers * let ActorAdapter be responsible of intercepting TimerMsg * instead of trying to keep the TimerInterceptor when restarting * more conistent cancelation of timers when exception/restart --- .../typed/internal/StubbedActorContext.scala | 1 + .../akka/actor/typed/InterceptSpec.scala | 47 +++++++++ .../scala/akka/actor/typed/TimerSpec.scala | 97 ++++++++++++++----- .../typed/internal/ActorContextImpl.scala | 7 ++ .../actor/typed/internal/Supervision.scala | 5 + .../typed/internal/TimerSchedulerImpl.scala | 51 +--------- .../typed/internal/adapter/ActorAdapter.scala | 25 ++++- .../actor/typed/scaladsl/ActorContext.scala | 13 +++ 8 files changed, 175 insertions(+), 71 deletions(-) diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/StubbedActorContext.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/StubbedActorContext.scala index 060133a0f2..78fe313e39 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/StubbedActorContext.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/StubbedActorContext.scala @@ -306,4 +306,5 @@ private[akka] final class FunctionRef[-T](override val path: ActorPath, send: (T def clearUnhandled(): Unit = unhandled = Nil override private[akka] def currentBehavior: Behavior[T] = currentBehaviorProvider() + } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala index f675386381..b82da40350 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala @@ -18,6 +18,26 @@ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit object InterceptSpec { final case class Msg(hello: String, replyTo: ActorRef[String]) case object MyPoisonPill + + class SameTypeInterceptor extends BehaviorInterceptor[String, String] { + import BehaviorInterceptor._ + override def aroundReceive( + context: TypedActorContext[String], + message: String, + target: ReceiveTarget[String]): Behavior[String] = { + target(context, message) + } + + override def aroundSignal( + context: TypedActorContext[String], + signal: Signal, + target: SignalTarget[String]): Behavior[String] = { + target(context, signal) + } + + override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = + other.isInstanceOf[SameTypeInterceptor] + } } class InterceptSpec extends ScalaTestWithActorTestKit(""" @@ -371,6 +391,33 @@ class InterceptSpec extends ScalaTestWithActorTestKit(""" probe.expectMessage("callback-post-stop") } } + + "not grow stack when nesting same interceptor" in { + def next(n: Int, p: ActorRef[Array[StackTraceElement]]): Behavior[String] = { + Behaviors.intercept(new SameTypeInterceptor) { + + Behaviors.receiveMessage { _ => + if (n == 20) { + val e = new RuntimeException().fillInStackTrace() + val trace = e.getStackTrace + p ! trace + Behaviors.stopped + } else { + next(n + 1, p) + } + } + } + } + + val probe = TestProbe[Array[StackTraceElement]]() + val ref = spawn(next(0, probe.ref)) + (1 to 21).foreach { n => + ref ! n.toString + } + val elements = probe.receiveMessage() + if (elements.count(_.getClassName == "SameTypeInterceptor") > 1) + fail(s"Stack contains SameTypeInterceptor more than once: \n${elements.mkString("\n\t")}") + } } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala index 8ee3d467ea..b2d9688a65 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala @@ -12,6 +12,7 @@ import scala.concurrent.duration._ import scala.util.control.NoStackTrace import akka.actor.DeadLetter +import akka.actor.testkit.typed.TestException import akka.actor.testkit.typed.scaladsl._ import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.TimerScheduler @@ -87,6 +88,7 @@ class TimerSpec extends ScalaTestWithActorTestKit(""" } "A timer" must { + "schedule non-repeated ticks" taggedAs TimingTest in { val probe = TestProbe[Event]("evt") val behv = Behaviors.withTimers[Command] { timer => @@ -297,28 +299,6 @@ class TimerSpec extends ScalaTestWithActorTestKit(""" ref ! "stop" } - "not grow stack when nesting withTimers" in { - def next(n: Int, probe: ActorRef[Array[StackTraceElement]]): Behavior[String] = Behaviors.withTimers { timers => - timers.startSingleTimer("key", "tick", 1.millis) - Behaviors.receiveMessage { message => - if (n == 20) { - val e = new RuntimeException().fillInStackTrace() - val trace = e.getStackTrace - probe ! trace - Behaviors.stopped - } else { - next(n + 1, probe) - } - } - } - - val probe = TestProbe[Array[StackTraceElement]]() - spawn(next(0, probe.ref)) - val elements = probe.receiveMessage() - if (elements.count(_.getClassName == "TimerInterceptor") > 1) - fail(s"Stack contains TimerInterceptor more than once: \n${elements.mkString("\n\t")}") - } - "not leak timers when PostStop is used" in { val probe = TestProbe[Any]() val ref = spawn(Behaviors.withTimers[String] { timers => @@ -337,4 +317,77 @@ class TimerSpec extends ScalaTestWithActorTestKit(""" probe.expectNoMessage(1.second) } } + + "discard TimerMsg on restart" in { + // reproducer of similar issue as #26556, ClassCastException TimerMsg + val probe = TestProbe[Event]("evt") + + def behv: Behavior[Command] = + Behaviors.receiveMessage[Command] { + case Tick(-1) => + probe.ref ! Tock(-1) + Behaviors.withTimers[Command] { timer => + timer.startSingleTimer("T0", Tick(0), 5.millis) + Behaviors.receiveMessage[Command] { + case Tick(0) => + probe.ref ! Tock(0) + timer.startSingleTimer("T1", Tick(1), 5.millis) + // let Tick(0) arrive in mailbox, test will not fail if it arrives later + Thread.sleep(100) + throw TestException("boom") + } + } + case Tick(n) => + probe.ref ! Tock(n) + Behaviors.same + case End => + Behaviors.stopped + } + + EventFilter[TestException](occurrences = 1).intercept { + val ref = spawn(Behaviors.supervise(behv).onFailure[TestException](SupervisorStrategy.restart)) + ref ! Tick(-1) + probe.expectMessage(Tock(-1)) + probe.expectMessage(Tock(0)) + probe.expectNoMessage() + + // confirm that it was restarted, and not stopped due to ClassCastException of TimerMsg + ref ! Tick(100) + probe.expectMessage(Tock(100)) + + ref ! End + } + } + + "discard TimerMsg when exception from withTimers block" in { + val probe = TestProbe[Event]("evt") + val behv = Behaviors.receiveMessage[Command] { + case Tick(-1) => + probe.ref ! Tock(-1) + Behaviors.withTimers[Command] { timer => + timer.startSingleTimer("T0", Tick(0), 5.millis) + // let Tick(0) arrive in mailbox, test will not fail if it arrives later + Thread.sleep(100) + throw TestException("boom") + } + case Tick(n) => + probe.ref ! Tock(n) + Behaviors.same + case End => + Behaviors.stopped + } + + EventFilter[TestException](occurrences = 1).intercept { + val ref = spawn(Behaviors.supervise(behv).onFailure[TestException](SupervisorStrategy.restart)) + ref ! Tick(-1) + probe.expectMessage(Tock(-1)) + probe.expectNoMessage() + + // confirm that it was restarted, and not stopped due to ClassCastException of TimerMsg + ref ! Tick(100) + probe.expectMessage(Tock(100)) + + ref ! End + } + } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala index 3de4f64583..893568b344 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala @@ -42,6 +42,13 @@ import akka.util.JavaDurationConverters._ timer } + override private[akka] def hasTimer: Boolean = _timer.isDefined + + override private[akka] def cancelAllTimers(): Unit = { + if (hasTimer) + timer.cancelAll() + } + override def asJava: javadsl.ActorContext[T] = this override def asScala: scaladsl.ActorContext[T] = this diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala index 67c921c431..a95e6b3915 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala @@ -255,6 +255,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav ctx: TypedActorContext[O], @unused target: PreStartTarget[T]): Catcher[Behavior[T]] = { case NonFatal(t) if isInstanceOfTheThrowableClass(t) => + ctx.asScala.cancelAllTimers() strategy match { case _: Restart => // if unlimited restarts then don't restart if starting fails as it would likely be an infinite restart loop @@ -288,6 +289,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav private def handleException(ctx: TypedActorContext[O], signalRestart: Throwable => Unit): Catcher[Behavior[T]] = { case NonFatal(t) if isInstanceOfTheThrowableClass(t) => + ctx.asScala.cancelAllTimers() if (strategy.maxRestarts != -1 && restartCount >= strategy.maxRestarts && deadlineHasTimeLeft) { strategy match { case _: Restart => throw t @@ -336,6 +338,9 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav } private def restartCompleted(ctx: TypedActorContext[O]): Behavior[T] = { + // probably already done, but doesn't hurt to make sure they are canceled + ctx.asScala.cancelAllTimers() + strategy match { case backoff: Backoff => gotScheduledRestart = false diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala index 0d91471604..c556973f3f 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala @@ -5,16 +5,16 @@ package akka.actor.typed package internal -import akka.actor.typed.ActorRef.ActorRefOps +import scala.concurrent.duration.FiniteDuration + +import akka.actor.Cancellable +import akka.actor.NotInfluenceReceiveTimeout import akka.actor.typed.scaladsl.ActorContext -import akka.actor.{ typed, Cancellable, NotInfluenceReceiveTimeout } import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts import akka.util.JavaDurationConverters._ import akka.util.OptionVal -import scala.concurrent.duration.FiniteDuration - /** * INTERNAL API */ @@ -32,8 +32,7 @@ import scala.concurrent.duration.FiniteDuration ctx match { case ctxImpl: ActorContextImpl[T] => val timerScheduler = ctxImpl.timer - val behavior = factory(timerScheduler) - timerScheduler.intercept(behavior) + factory(timerScheduler) case _ => throw new IllegalArgumentException(s"timers not supported with [${ctx.getClass}]") } @@ -142,44 +141,4 @@ import scala.concurrent.duration.FiniteDuration } } - def intercept(behavior: Behavior[T]): Behavior[T] = { - // The scheduled TimerMsg is intercepted to guard against old messages enqueued - // in mailbox before timer was canceled. - // Intercept some signals to cancel timers when restarting and stopping. - BehaviorImpl.intercept(new TimerInterceptor(this))(behavior) - } - -} - -/** - * INTERNAL API - */ -@InternalApi -private final class TimerInterceptor[T](timerSchedulerImpl: TimerSchedulerImpl[T]) extends BehaviorInterceptor[T, T] { - import TimerSchedulerImpl._ - import BehaviorInterceptor._ - - override def aroundReceive(ctx: typed.TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = { - val maybeIntercepted = msg match { - case msg: TimerMsg => timerSchedulerImpl.interceptTimerMsg(ctx.asScala.log, msg) - case msg => OptionVal.Some(msg) - } - - maybeIntercepted match { - case OptionVal.None => Behavior.same // None means not applicable - case OptionVal.Some(intercepted) => target(ctx, intercepted) - } - } - - override def aroundSignal(ctx: typed.TypedActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = { - signal match { - case PreRestart | PostStop => timerSchedulerImpl.cancelAll() - case _ => // unhandled - } - target(ctx, signal) - } - - override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = - // only one timer interceptor per behavior stack is needed - other.isInstanceOf[TimerInterceptor[_]] } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala index 3da2372d2f..234f8bf865 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala @@ -14,15 +14,16 @@ import akka.actor.typed.Behavior.DeferredBehavior import akka.actor.typed.Behavior.StoppedBehavior import akka.actor.typed.internal.adapter.ActorAdapter.TypedActorFailedException import akka.annotation.InternalApi - import scala.annotation.tailrec import scala.util.Failure import scala.util.Success import scala.util.Try import scala.util.control.Exception.Catcher - import scala.annotation.switch +import akka.actor.typed.internal.TimerSchedulerImpl.TimerMsg +import akka.util.OptionVal + /** * INTERNAL API */ @@ -102,7 +103,21 @@ import scala.annotation.switch private def handleMessage(msg: T): Unit = { try { - next(Behavior.interpretMessage(behavior, ctx, msg), msg) + val c = ctx + if (c.hasTimer) { + msg match { + case timerMsg: TimerMsg => + c.timer.interceptTimerMsg(ctx.log, timerMsg) match { + case OptionVal.None => // means TimerMsg not applicable, discard + case OptionVal.Some(m) => + next(Behavior.interpretMessage(behavior, c, m), m) + } + case _ => + next(Behavior.interpretMessage(behavior, c, msg), msg) + } + } else { + next(Behavior.interpretMessage(behavior, c, msg), msg) + } } catch handleUnstashException } @@ -215,16 +230,19 @@ import scala.annotation.switch } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { + ctx.cancelAllTimers() Behavior.interpretSignal(behavior, ctx, PreRestart) behavior = Behavior.stopped } override def postRestart(reason: Throwable): Unit = { + ctx.cancelAllTimers() behavior = validateAsInitial(Behavior.start(behavior, ctx)) if (!isAlive(behavior)) context.stop(self) } override def postStop(): Unit = { + ctx.cancelAllTimers() behavior match { case _: DeferredBehavior[_] => // Do not undefer a DeferredBehavior as that may cause creation side-effects, which we do not want on termination. @@ -232,6 +250,7 @@ import scala.annotation.switch } behavior = Behavior.stopped } + } /** diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala index 4ed2e6c6dc..61777c0c5c 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala @@ -308,6 +308,19 @@ trait ActorContext[T] extends TypedActorContext[T] { /** * INTERNAL API */ + @InternalApi private[akka] def currentBehavior: Behavior[T] + /** + * INTERNAL API + */ + @InternalApi + private[akka] def hasTimer: Boolean + + /** + * INTERNAL API + */ + @InternalApi + private[akka] def cancelAllTimers(): Unit + }