From d72f26e0428151df1c1ed276c57918a38b7fcd9a Mon Sep 17 00:00:00 2001 From: Dale Wijnand <344610+dwijnand@users.noreply.github.com> Date: Thu, 29 Nov 2018 10:53:25 +0000 Subject: [PATCH] Cleanup TimerSchedulerImpl & TimerInterceptor (#25891) * Replace TimerMsg hierarchy with a simple mix-in * Fix a couple of typos * Remove the need for a cast but just passing a log * Remove the need for casts by making TimerInterceptor specify it handles Ts * Prefer a sealed class to a sealed case class --- .../typed/internal/TimerSchedulerImpl.scala | 40 ++++++++----------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala index 7645e8660c..78c959bbee 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala @@ -18,16 +18,10 @@ import scala.concurrent.duration.FiniteDuration * INTERNAL API */ @InternalApi private[akka] object TimerSchedulerImpl { - sealed trait TimerMsg { - def key: Any - def generation: Int - def owner: AnyRef - } - final case class Timer[T](key: Any, msg: T, repeat: Boolean, generation: Int, task: Cancellable) - final case class InfluenceReceiveTimeoutTimerMsg(key: Any, generation: Int, owner: AnyRef) extends TimerMsg - final case class NotInfluenceReceiveTimeoutTimerMsg(key: Any, generation: Int, owner: AnyRef) - extends TimerMsg with NotInfluenceReceiveTimeout + sealed class TimerMsg(val key: Any, val generation: Int, val owner: AnyRef) { + override def toString = s"TimerMsg(key=$key, generation=$generation, owner=$owner)" + } def withTimers[T](factory: TimerSchedulerImpl[T] ⇒ Behavior[T]): Behavior[T] = { scaladsl.Behaviors.setup[T](wrapWithTimers(factory)) @@ -75,9 +69,9 @@ import scala.concurrent.duration.FiniteDuration val timerMsg = if (msg.isInstanceOf[NotInfluenceReceiveTimeout]) - NotInfluenceReceiveTimeoutTimerMsg(key, nextGen, this) + new TimerMsg(key, nextGen, this) with NotInfluenceReceiveTimeout else - InfluenceReceiveTimeoutTimerMsg(key, nextGen, this) + new TimerMsg(key, nextGen, this) val task = if (repeat) @@ -118,16 +112,16 @@ import scala.concurrent.duration.FiniteDuration timers = Map.empty } - def interceptTimerMsg(ctx: ActorContext[TimerMsg], timerMsg: TimerMsg): T = { + def interceptTimerMsg(log: Logger, timerMsg: TimerMsg): T = { timers.get(timerMsg.key) match { case None ⇒ // it was from canceled timer that was already enqueued in mailbox - ctx.log.debug("Received timer [{}] that has been removed, discarding", timerMsg.key) + log.debug("Received timer [{}] that has been removed, discarding", timerMsg.key) null.asInstanceOf[T] // message should be ignored case Some(t) ⇒ if (timerMsg.owner ne this) { // after restart, it was from an old instance that was enqueued in mailbox before canceled - ctx.log.debug("Received timer [{}] from old restarted instance, discarding", timerMsg.key) + log.debug("Received timer [{}] from old restarted instance, discarding", timerMsg.key) null.asInstanceOf[T] // message should be ignored } else if (timerMsg.generation == t.generation) { // valid timer @@ -136,8 +130,8 @@ import scala.concurrent.duration.FiniteDuration t.msg } else { // it was from an old timer that was enqueued in mailbox before canceled - ctx.log.debug( - "Received timer [{}] from from old generation [{}], expected generation [{}], discarding", + log.debug( + "Received timer [{}] from old generation [{}], expected generation [{}], discarding", timerMsg.key, timerMsg.generation, t.generation) null.asInstanceOf[T] // message should be ignored } @@ -147,8 +141,8 @@ import scala.concurrent.duration.FiniteDuration def intercept(behavior: Behavior[T]): Behavior[T] = { // The scheduled TimerMsg is intercepted to guard against old messages enqueued // in mailbox before timer was canceled. - // Intercept some signals to cancel timers when when restarting and stopping. - BehaviorImpl.intercept(new TimerInterceptor(this))(behavior).asInstanceOf[Behavior[T]] + // Intercept some signals to cancel timers when restarting and stopping. + BehaviorImpl.intercept(new TimerInterceptor(this))(behavior) } } @@ -157,14 +151,14 @@ import scala.concurrent.duration.FiniteDuration * INTERNAL API */ @InternalApi -private final class TimerInterceptor[T](timerSchedulerImpl: TimerSchedulerImpl[T]) extends BehaviorInterceptor[AnyRef, T] { +private final class TimerInterceptor[T](timerSchedulerImpl: TimerSchedulerImpl[T]) extends BehaviorInterceptor[T, T] { import TimerSchedulerImpl._ import BehaviorInterceptor._ - override def aroundReceive(ctx: typed.ActorContext[AnyRef], msg: AnyRef, target: ReceiveTarget[T]): Behavior[T] = { + override def aroundReceive(ctx: typed.ActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = { val intercepted = msg match { - case msg: TimerMsg ⇒ timerSchedulerImpl.interceptTimerMsg(ctx.asInstanceOf[ActorContext[TimerMsg]], msg) - case msg ⇒ msg.asInstanceOf[T] + case msg: TimerMsg ⇒ timerSchedulerImpl.interceptTimerMsg(ctx.asScala.log, msg) + case msg ⇒ msg } // null means not applicable @@ -172,7 +166,7 @@ private final class TimerInterceptor[T](timerSchedulerImpl: TimerSchedulerImpl[T else target(ctx, intercepted) } - override def aroundSignal(ctx: typed.ActorContext[AnyRef], signal: Signal, target: SignalTarget[T]): Behavior[T] = { + override def aroundSignal(ctx: typed.ActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = { signal match { case PreRestart | PostStop ⇒ timerSchedulerImpl.cancelAll() case _ ⇒ // unhandled