Cleanup TimerSchedulerImpl & TimerInterceptor (#25891)

* Replace TimerMsg hierarchy with a simple mix-in

* Fix a couple of typos

* Remove the need for a cast but just passing a log

* Remove the need for casts by making TimerInterceptor specify it handles Ts

* Prefer a sealed class to a sealed case class
This commit is contained in:
Dale Wijnand 2018-11-29 10:53:25 +00:00 committed by Christopher Batey
parent 68b5fbb2ff
commit d72f26e042

View file

@ -18,16 +18,10 @@ import scala.concurrent.duration.FiniteDuration
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] object TimerSchedulerImpl { @InternalApi private[akka] object TimerSchedulerImpl {
sealed trait TimerMsg {
def key: Any
def generation: Int
def owner: AnyRef
}
final case class Timer[T](key: Any, msg: T, repeat: Boolean, generation: Int, task: Cancellable) final case class Timer[T](key: Any, msg: T, repeat: Boolean, generation: Int, task: Cancellable)
final case class InfluenceReceiveTimeoutTimerMsg(key: Any, generation: Int, owner: AnyRef) extends TimerMsg sealed class TimerMsg(val key: Any, val generation: Int, val owner: AnyRef) {
final case class NotInfluenceReceiveTimeoutTimerMsg(key: Any, generation: Int, owner: AnyRef) override def toString = s"TimerMsg(key=$key, generation=$generation, owner=$owner)"
extends TimerMsg with NotInfluenceReceiveTimeout }
def withTimers[T](factory: TimerSchedulerImpl[T] Behavior[T]): Behavior[T] = { def withTimers[T](factory: TimerSchedulerImpl[T] Behavior[T]): Behavior[T] = {
scaladsl.Behaviors.setup[T](wrapWithTimers(factory)) scaladsl.Behaviors.setup[T](wrapWithTimers(factory))
@ -75,9 +69,9 @@ import scala.concurrent.duration.FiniteDuration
val timerMsg = val timerMsg =
if (msg.isInstanceOf[NotInfluenceReceiveTimeout]) if (msg.isInstanceOf[NotInfluenceReceiveTimeout])
NotInfluenceReceiveTimeoutTimerMsg(key, nextGen, this) new TimerMsg(key, nextGen, this) with NotInfluenceReceiveTimeout
else else
InfluenceReceiveTimeoutTimerMsg(key, nextGen, this) new TimerMsg(key, nextGen, this)
val task = val task =
if (repeat) if (repeat)
@ -118,16 +112,16 @@ import scala.concurrent.duration.FiniteDuration
timers = Map.empty timers = Map.empty
} }
def interceptTimerMsg(ctx: ActorContext[TimerMsg], timerMsg: TimerMsg): T = { def interceptTimerMsg(log: Logger, timerMsg: TimerMsg): T = {
timers.get(timerMsg.key) match { timers.get(timerMsg.key) match {
case None case None
// it was from canceled timer that was already enqueued in mailbox // it was from canceled timer that was already enqueued in mailbox
ctx.log.debug("Received timer [{}] that has been removed, discarding", timerMsg.key) log.debug("Received timer [{}] that has been removed, discarding", timerMsg.key)
null.asInstanceOf[T] // message should be ignored null.asInstanceOf[T] // message should be ignored
case Some(t) case Some(t)
if (timerMsg.owner ne this) { if (timerMsg.owner ne this) {
// after restart, it was from an old instance that was enqueued in mailbox before canceled // after restart, it was from an old instance that was enqueued in mailbox before canceled
ctx.log.debug("Received timer [{}] from old restarted instance, discarding", timerMsg.key) log.debug("Received timer [{}] from old restarted instance, discarding", timerMsg.key)
null.asInstanceOf[T] // message should be ignored null.asInstanceOf[T] // message should be ignored
} else if (timerMsg.generation == t.generation) { } else if (timerMsg.generation == t.generation) {
// valid timer // valid timer
@ -136,8 +130,8 @@ import scala.concurrent.duration.FiniteDuration
t.msg t.msg
} else { } else {
// it was from an old timer that was enqueued in mailbox before canceled // it was from an old timer that was enqueued in mailbox before canceled
ctx.log.debug( log.debug(
"Received timer [{}] from from old generation [{}], expected generation [{}], discarding", "Received timer [{}] from old generation [{}], expected generation [{}], discarding",
timerMsg.key, timerMsg.generation, t.generation) timerMsg.key, timerMsg.generation, t.generation)
null.asInstanceOf[T] // message should be ignored null.asInstanceOf[T] // message should be ignored
} }
@ -147,8 +141,8 @@ import scala.concurrent.duration.FiniteDuration
def intercept(behavior: Behavior[T]): Behavior[T] = { def intercept(behavior: Behavior[T]): Behavior[T] = {
// The scheduled TimerMsg is intercepted to guard against old messages enqueued // The scheduled TimerMsg is intercepted to guard against old messages enqueued
// in mailbox before timer was canceled. // in mailbox before timer was canceled.
// Intercept some signals to cancel timers when when restarting and stopping. // Intercept some signals to cancel timers when restarting and stopping.
BehaviorImpl.intercept(new TimerInterceptor(this))(behavior).asInstanceOf[Behavior[T]] BehaviorImpl.intercept(new TimerInterceptor(this))(behavior)
} }
} }
@ -157,14 +151,14 @@ import scala.concurrent.duration.FiniteDuration
* INTERNAL API * INTERNAL API
*/ */
@InternalApi @InternalApi
private final class TimerInterceptor[T](timerSchedulerImpl: TimerSchedulerImpl[T]) extends BehaviorInterceptor[AnyRef, T] { private final class TimerInterceptor[T](timerSchedulerImpl: TimerSchedulerImpl[T]) extends BehaviorInterceptor[T, T] {
import TimerSchedulerImpl._ import TimerSchedulerImpl._
import BehaviorInterceptor._ import BehaviorInterceptor._
override def aroundReceive(ctx: typed.ActorContext[AnyRef], msg: AnyRef, target: ReceiveTarget[T]): Behavior[T] = { override def aroundReceive(ctx: typed.ActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = {
val intercepted = msg match { val intercepted = msg match {
case msg: TimerMsg timerSchedulerImpl.interceptTimerMsg(ctx.asInstanceOf[ActorContext[TimerMsg]], msg) case msg: TimerMsg timerSchedulerImpl.interceptTimerMsg(ctx.asScala.log, msg)
case msg msg.asInstanceOf[T] case msg msg
} }
// null means not applicable // null means not applicable
@ -172,7 +166,7 @@ private final class TimerInterceptor[T](timerSchedulerImpl: TimerSchedulerImpl[T
else target(ctx, intercepted) else target(ctx, intercepted)
} }
override def aroundSignal(ctx: typed.ActorContext[AnyRef], signal: Signal, target: SignalTarget[T]): Behavior[T] = { override def aroundSignal(ctx: typed.ActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
signal match { signal match {
case PreRestart | PostStop timerSchedulerImpl.cancelAll() case PreRestart | PostStop timerSchedulerImpl.cancelAll()
case _ // unhandled case _ // unhandled