Always intercept TimerMsg, also when restarted, #26556 (#26650)

* To avoid ClassCastException of TimerMsg if TimerMsg is already enqueued
  in mailbox and there is a restart with intiial behavior that is not using withTimers
* let ActorAdapter be responsible of intercepting TimerMsg
  * instead of trying to keep the TimerInterceptor when restarting
* more conistent cancelation of timers when exception/restart
This commit is contained in:
Patrik Nordwall 2019-04-02 18:26:15 +02:00 committed by GitHub
parent c1a9a691a6
commit 9dae4050eb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 175 additions and 71 deletions

View file

@ -306,4 +306,5 @@ private[akka] final class FunctionRef[-T](override val path: ActorPath, send: (T
def clearUnhandled(): Unit = unhandled = Nil
override private[akka] def currentBehavior: Behavior[T] = currentBehaviorProvider()
}

View file

@ -18,6 +18,26 @@ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
object InterceptSpec {
final case class Msg(hello: String, replyTo: ActorRef[String])
case object MyPoisonPill
class SameTypeInterceptor extends BehaviorInterceptor[String, String] {
import BehaviorInterceptor._
override def aroundReceive(
context: TypedActorContext[String],
message: String,
target: ReceiveTarget[String]): Behavior[String] = {
target(context, message)
}
override def aroundSignal(
context: TypedActorContext[String],
signal: Signal,
target: SignalTarget[String]): Behavior[String] = {
target(context, signal)
}
override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean =
other.isInstanceOf[SameTypeInterceptor]
}
}
class InterceptSpec extends ScalaTestWithActorTestKit("""
@ -371,6 +391,33 @@ class InterceptSpec extends ScalaTestWithActorTestKit("""
probe.expectMessage("callback-post-stop")
}
}
"not grow stack when nesting same interceptor" in {
def next(n: Int, p: ActorRef[Array[StackTraceElement]]): Behavior[String] = {
Behaviors.intercept(new SameTypeInterceptor) {
Behaviors.receiveMessage { _ =>
if (n == 20) {
val e = new RuntimeException().fillInStackTrace()
val trace = e.getStackTrace
p ! trace
Behaviors.stopped
} else {
next(n + 1, p)
}
}
}
}
val probe = TestProbe[Array[StackTraceElement]]()
val ref = spawn(next(0, probe.ref))
(1 to 21).foreach { n =>
ref ! n.toString
}
val elements = probe.receiveMessage()
if (elements.count(_.getClassName == "SameTypeInterceptor") > 1)
fail(s"Stack contains SameTypeInterceptor more than once: \n${elements.mkString("\n\t")}")
}
}
}

View file

@ -12,6 +12,7 @@ import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.actor.DeadLetter
import akka.actor.testkit.typed.TestException
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.TimerScheduler
@ -87,6 +88,7 @@ class TimerSpec extends ScalaTestWithActorTestKit("""
}
"A timer" must {
"schedule non-repeated ticks" taggedAs TimingTest in {
val probe = TestProbe[Event]("evt")
val behv = Behaviors.withTimers[Command] { timer =>
@ -297,28 +299,6 @@ class TimerSpec extends ScalaTestWithActorTestKit("""
ref ! "stop"
}
"not grow stack when nesting withTimers" in {
def next(n: Int, probe: ActorRef[Array[StackTraceElement]]): Behavior[String] = Behaviors.withTimers { timers =>
timers.startSingleTimer("key", "tick", 1.millis)
Behaviors.receiveMessage { message =>
if (n == 20) {
val e = new RuntimeException().fillInStackTrace()
val trace = e.getStackTrace
probe ! trace
Behaviors.stopped
} else {
next(n + 1, probe)
}
}
}
val probe = TestProbe[Array[StackTraceElement]]()
spawn(next(0, probe.ref))
val elements = probe.receiveMessage()
if (elements.count(_.getClassName == "TimerInterceptor") > 1)
fail(s"Stack contains TimerInterceptor more than once: \n${elements.mkString("\n\t")}")
}
"not leak timers when PostStop is used" in {
val probe = TestProbe[Any]()
val ref = spawn(Behaviors.withTimers[String] { timers =>
@ -337,4 +317,77 @@ class TimerSpec extends ScalaTestWithActorTestKit("""
probe.expectNoMessage(1.second)
}
}
"discard TimerMsg on restart" in {
// reproducer of similar issue as #26556, ClassCastException TimerMsg
val probe = TestProbe[Event]("evt")
def behv: Behavior[Command] =
Behaviors.receiveMessage[Command] {
case Tick(-1) =>
probe.ref ! Tock(-1)
Behaviors.withTimers[Command] { timer =>
timer.startSingleTimer("T0", Tick(0), 5.millis)
Behaviors.receiveMessage[Command] {
case Tick(0) =>
probe.ref ! Tock(0)
timer.startSingleTimer("T1", Tick(1), 5.millis)
// let Tick(0) arrive in mailbox, test will not fail if it arrives later
Thread.sleep(100)
throw TestException("boom")
}
}
case Tick(n) =>
probe.ref ! Tock(n)
Behaviors.same
case End =>
Behaviors.stopped
}
EventFilter[TestException](occurrences = 1).intercept {
val ref = spawn(Behaviors.supervise(behv).onFailure[TestException](SupervisorStrategy.restart))
ref ! Tick(-1)
probe.expectMessage(Tock(-1))
probe.expectMessage(Tock(0))
probe.expectNoMessage()
// confirm that it was restarted, and not stopped due to ClassCastException of TimerMsg
ref ! Tick(100)
probe.expectMessage(Tock(100))
ref ! End
}
}
"discard TimerMsg when exception from withTimers block" in {
val probe = TestProbe[Event]("evt")
val behv = Behaviors.receiveMessage[Command] {
case Tick(-1) =>
probe.ref ! Tock(-1)
Behaviors.withTimers[Command] { timer =>
timer.startSingleTimer("T0", Tick(0), 5.millis)
// let Tick(0) arrive in mailbox, test will not fail if it arrives later
Thread.sleep(100)
throw TestException("boom")
}
case Tick(n) =>
probe.ref ! Tock(n)
Behaviors.same
case End =>
Behaviors.stopped
}
EventFilter[TestException](occurrences = 1).intercept {
val ref = spawn(Behaviors.supervise(behv).onFailure[TestException](SupervisorStrategy.restart))
ref ! Tick(-1)
probe.expectMessage(Tock(-1))
probe.expectNoMessage()
// confirm that it was restarted, and not stopped due to ClassCastException of TimerMsg
ref ! Tick(100)
probe.expectMessage(Tock(100))
ref ! End
}
}
}

View file

@ -42,6 +42,13 @@ import akka.util.JavaDurationConverters._
timer
}
override private[akka] def hasTimer: Boolean = _timer.isDefined
override private[akka] def cancelAllTimers(): Unit = {
if (hasTimer)
timer.cancelAll()
}
override def asJava: javadsl.ActorContext[T] = this
override def asScala: scaladsl.ActorContext[T] = this

View file

@ -255,6 +255,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
ctx: TypedActorContext[O],
@unused target: PreStartTarget[T]): Catcher[Behavior[T]] = {
case NonFatal(t) if isInstanceOfTheThrowableClass(t) =>
ctx.asScala.cancelAllTimers()
strategy match {
case _: Restart =>
// if unlimited restarts then don't restart if starting fails as it would likely be an infinite restart loop
@ -288,6 +289,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
private def handleException(ctx: TypedActorContext[O], signalRestart: Throwable => Unit): Catcher[Behavior[T]] = {
case NonFatal(t) if isInstanceOfTheThrowableClass(t) =>
ctx.asScala.cancelAllTimers()
if (strategy.maxRestarts != -1 && restartCount >= strategy.maxRestarts && deadlineHasTimeLeft) {
strategy match {
case _: Restart => throw t
@ -336,6 +338,9 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
}
private def restartCompleted(ctx: TypedActorContext[O]): Behavior[T] = {
// probably already done, but doesn't hurt to make sure they are canceled
ctx.asScala.cancelAllTimers()
strategy match {
case backoff: Backoff =>
gotScheduledRestart = false

View file

@ -5,16 +5,16 @@
package akka.actor.typed
package internal
import akka.actor.typed.ActorRef.ActorRefOps
import scala.concurrent.duration.FiniteDuration
import akka.actor.Cancellable
import akka.actor.NotInfluenceReceiveTimeout
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.{ typed, Cancellable, NotInfluenceReceiveTimeout }
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.util.JavaDurationConverters._
import akka.util.OptionVal
import scala.concurrent.duration.FiniteDuration
/**
* INTERNAL API
*/
@ -32,8 +32,7 @@ import scala.concurrent.duration.FiniteDuration
ctx match {
case ctxImpl: ActorContextImpl[T] =>
val timerScheduler = ctxImpl.timer
val behavior = factory(timerScheduler)
timerScheduler.intercept(behavior)
factory(timerScheduler)
case _ => throw new IllegalArgumentException(s"timers not supported with [${ctx.getClass}]")
}
@ -142,44 +141,4 @@ import scala.concurrent.duration.FiniteDuration
}
}
def intercept(behavior: Behavior[T]): Behavior[T] = {
// The scheduled TimerMsg is intercepted to guard against old messages enqueued
// in mailbox before timer was canceled.
// Intercept some signals to cancel timers when restarting and stopping.
BehaviorImpl.intercept(new TimerInterceptor(this))(behavior)
}
}
/**
* INTERNAL API
*/
@InternalApi
private final class TimerInterceptor[T](timerSchedulerImpl: TimerSchedulerImpl[T]) extends BehaviorInterceptor[T, T] {
import TimerSchedulerImpl._
import BehaviorInterceptor._
override def aroundReceive(ctx: typed.TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = {
val maybeIntercepted = msg match {
case msg: TimerMsg => timerSchedulerImpl.interceptTimerMsg(ctx.asScala.log, msg)
case msg => OptionVal.Some(msg)
}
maybeIntercepted match {
case OptionVal.None => Behavior.same // None means not applicable
case OptionVal.Some(intercepted) => target(ctx, intercepted)
}
}
override def aroundSignal(ctx: typed.TypedActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
signal match {
case PreRestart | PostStop => timerSchedulerImpl.cancelAll()
case _ => // unhandled
}
target(ctx, signal)
}
override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean =
// only one timer interceptor per behavior stack is needed
other.isInstanceOf[TimerInterceptor[_]]
}

View file

@ -14,15 +14,16 @@ import akka.actor.typed.Behavior.DeferredBehavior
import akka.actor.typed.Behavior.StoppedBehavior
import akka.actor.typed.internal.adapter.ActorAdapter.TypedActorFailedException
import akka.annotation.InternalApi
import scala.annotation.tailrec
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.util.control.Exception.Catcher
import scala.annotation.switch
import akka.actor.typed.internal.TimerSchedulerImpl.TimerMsg
import akka.util.OptionVal
/**
* INTERNAL API
*/
@ -102,7 +103,21 @@ import scala.annotation.switch
private def handleMessage(msg: T): Unit = {
try {
next(Behavior.interpretMessage(behavior, ctx, msg), msg)
val c = ctx
if (c.hasTimer) {
msg match {
case timerMsg: TimerMsg =>
c.timer.interceptTimerMsg(ctx.log, timerMsg) match {
case OptionVal.None => // means TimerMsg not applicable, discard
case OptionVal.Some(m) =>
next(Behavior.interpretMessage(behavior, c, m), m)
}
case _ =>
next(Behavior.interpretMessage(behavior, c, msg), msg)
}
} else {
next(Behavior.interpretMessage(behavior, c, msg), msg)
}
} catch handleUnstashException
}
@ -215,16 +230,19 @@ import scala.annotation.switch
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
ctx.cancelAllTimers()
Behavior.interpretSignal(behavior, ctx, PreRestart)
behavior = Behavior.stopped
}
override def postRestart(reason: Throwable): Unit = {
ctx.cancelAllTimers()
behavior = validateAsInitial(Behavior.start(behavior, ctx))
if (!isAlive(behavior)) context.stop(self)
}
override def postStop(): Unit = {
ctx.cancelAllTimers()
behavior match {
case _: DeferredBehavior[_] =>
// Do not undefer a DeferredBehavior as that may cause creation side-effects, which we do not want on termination.
@ -232,6 +250,7 @@ import scala.annotation.switch
}
behavior = Behavior.stopped
}
}
/**

View file

@ -308,6 +308,19 @@ trait ActorContext[T] extends TypedActorContext[T] {
/**
* INTERNAL API
*/
@InternalApi
private[akka] def currentBehavior: Behavior[T]
/**
* INTERNAL API
*/
@InternalApi
private[akka] def hasTimer: Boolean
/**
* INTERNAL API
*/
@InternalApi
private[akka] def cancelAllTimers(): Unit
}