Fix NotInfluenceReceiveTimeout behavior when used with Timers trait (#24989)
This commit is contained in:
parent
fcabd43a36
commit
028937f52f
4 changed files with 60 additions and 6 deletions
|
|
@ -131,5 +131,29 @@ class ReceiveTimeoutSpec extends AkkaSpec {
|
|||
Await.ready(timeoutLatch, TestLatch.DefaultTimeout)
|
||||
system.stop(timeoutActor)
|
||||
}
|
||||
|
||||
"get timeout while receiving NotInfluenceReceiveTimeout messages scheduled with Timers" taggedAs TimingTest in {
|
||||
val timeoutLatch = TestLatch()
|
||||
val count = new AtomicInteger(0)
|
||||
|
||||
class ActorWithTimer() extends Actor with Timers {
|
||||
timers.startPeriodicTimer("transparentTick", TransperentTick, 100.millis)
|
||||
timers.startPeriodicTimer("identifyTick", Identify(None), 100.millis)
|
||||
|
||||
context.setReceiveTimeout(1 second)
|
||||
def receive: Receive = {
|
||||
case ReceiveTimeout ⇒
|
||||
timeoutLatch.open
|
||||
case TransperentTick ⇒
|
||||
count.incrementAndGet()
|
||||
}
|
||||
}
|
||||
|
||||
val timeoutActor = system.actorOf(Props(new ActorWithTimer()))
|
||||
|
||||
Await.ready(timeoutLatch, TestLatch.DefaultTimeout)
|
||||
count.get() should be > 0
|
||||
system.stop(timeoutActor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
package akka.actor.typed
|
||||
package internal
|
||||
|
||||
import akka.actor.Cancellable
|
||||
import akka.actor.{ Cancellable, NotInfluenceReceiveTimeout }
|
||||
import akka.annotation.InternalApi
|
||||
import akka.dispatch.ExecutionContexts
|
||||
import akka.actor.typed.ActorRef.ActorRefOps
|
||||
|
|
@ -21,8 +21,16 @@ import scala.reflect.ClassTag
|
|||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] object TimerSchedulerImpl {
|
||||
sealed trait TimerMsg {
|
||||
def key: Any
|
||||
def generation: Int
|
||||
def owner: AnyRef
|
||||
}
|
||||
|
||||
final case class Timer[T](key: Any, msg: T, repeat: Boolean, generation: Int, task: Cancellable)
|
||||
final case class TimerMsg(key: Any, generation: Int, owner: AnyRef)
|
||||
final case class InfluenceReceiveTimeoutTimerMsg(key: Any, generation: Int, owner: AnyRef) extends TimerMsg
|
||||
final case class NotInfluenceReceiveTimeoutTimerMsg(key: Any, generation: Int, owner: AnyRef)
|
||||
extends TimerMsg with NotInfluenceReceiveTimeout
|
||||
|
||||
def withTimers[T](factory: TimerSchedulerImpl[T] ⇒ Behavior[T]): Behavior[T] = {
|
||||
scaladsl.Behaviors.setup[T](wrapWithTimers(factory))
|
||||
|
|
@ -68,7 +76,12 @@ import scala.reflect.ClassTag
|
|||
}
|
||||
val nextGen = timerGen.next()
|
||||
|
||||
val timerMsg = TimerMsg(key, nextGen, this)
|
||||
val timerMsg =
|
||||
if (msg.isInstanceOf[NotInfluenceReceiveTimeout])
|
||||
NotInfluenceReceiveTimeoutTimerMsg(key, nextGen, this)
|
||||
else
|
||||
InfluenceReceiveTimeoutTimerMsg(key, nextGen, this)
|
||||
|
||||
val task =
|
||||
if (repeat)
|
||||
ctx.system.scheduler.schedule(timeout, timeout) {
|
||||
|
|
|
|||
|
|
@ -4,3 +4,7 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.ActorRefProvide
|
|||
# #24646 java.time.Duration
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.AbstractActor#ActorContext.cancelReceiveTimeout")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.AbstractActor#ActorContext.setReceiveTimeout")
|
||||
|
||||
# #24989 Fix NotInfluenceReceiveTimeout behavior when used with Timers trait
|
||||
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("akka.actor.TimerSchedulerImpl$TimerMsg")
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.actor.TimerSchedulerImpl$TimerMsg$")
|
||||
|
|
|
|||
|
|
@ -14,9 +14,17 @@ import akka.util.OptionVal
|
|||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] object TimerSchedulerImpl {
|
||||
sealed trait TimerMsg {
|
||||
def key: Any
|
||||
def generation: Int
|
||||
def owner: TimerSchedulerImpl
|
||||
}
|
||||
|
||||
final case class Timer(key: Any, msg: Any, repeat: Boolean, generation: Int, task: Cancellable)
|
||||
final case class TimerMsg(key: Any, generation: Int, owner: TimerSchedulerImpl)
|
||||
extends NoSerializationVerificationNeeded
|
||||
final case class InfluenceReceiveTimeoutTimerMsg(key: Any, generation: Int, owner: TimerSchedulerImpl)
|
||||
extends TimerMsg with NoSerializationVerificationNeeded
|
||||
final case class NotInfluenceReceiveTimeoutTimerMsg(key: Any, generation: Int, owner: TimerSchedulerImpl)
|
||||
extends TimerMsg with NoSerializationVerificationNeeded with NotInfluenceReceiveTimeout
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -46,7 +54,12 @@ import akka.util.OptionVal
|
|||
}
|
||||
val nextGen = nextTimerGen()
|
||||
|
||||
val timerMsg = TimerMsg(key, nextGen, this)
|
||||
val timerMsg =
|
||||
if (msg.isInstanceOf[NotInfluenceReceiveTimeout])
|
||||
NotInfluenceReceiveTimeoutTimerMsg(key, nextGen, this)
|
||||
else
|
||||
InfluenceReceiveTimeoutTimerMsg(key, nextGen, this)
|
||||
|
||||
val task =
|
||||
if (repeat)
|
||||
ctx.system.scheduler.schedule(timeout, timeout, ctx.self, timerMsg)(ctx.dispatcher)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue