AutoReceivedMessage with Timers bug fixed (#24080)
AutoReceivedMessage with FSM schedule bug fixed (#24080) Tests added for both FSM and Timers trait for (#24080) AutoReceivedMessage with PersistentFSM bug fixed and test added (#24080)
This commit is contained in:
parent
046b88ce90
commit
e495dab941
5 changed files with 57 additions and 13 deletions
|
|
@ -22,6 +22,7 @@ object TimerSpec {
|
|||
case object Cancel extends Command
|
||||
case class SlowThenThrow(latch: TestLatch, e: Throwable) extends Command
|
||||
with NoSerializationVerificationNeeded
|
||||
case object AutoReceive extends Command
|
||||
|
||||
sealed trait Event
|
||||
case class Tock(n: Int) extends Event
|
||||
|
|
@ -55,6 +56,10 @@ object TimerSpec {
|
|||
timers.startPeriodicTimer("T", Tick(bumpCount), interval)
|
||||
}
|
||||
|
||||
def autoReceive(): Unit = {
|
||||
timers.startSingleTimer("A", PoisonPill, interval)
|
||||
}
|
||||
|
||||
override def receive = {
|
||||
case Tick(n) ⇒
|
||||
monitor ! Tock(n)
|
||||
|
|
@ -72,6 +77,7 @@ object TimerSpec {
|
|||
case SlowThenThrow(latch, e) ⇒
|
||||
Await.ready(latch, 10.seconds)
|
||||
throw e
|
||||
case AutoReceive ⇒ autoReceive()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -101,6 +107,11 @@ object TimerSpec {
|
|||
stay using (bumpCount + 1)
|
||||
}
|
||||
|
||||
def autoReceive(): State = {
|
||||
setTimer("A", PoisonPill, interval, repeat)
|
||||
stay
|
||||
}
|
||||
|
||||
{
|
||||
val i = initial()
|
||||
startWith(TheState, i)
|
||||
|
|
@ -126,6 +137,8 @@ object TimerSpec {
|
|||
case Event(SlowThenThrow(latch, e), _) ⇒
|
||||
Await.ready(latch, 10.seconds)
|
||||
throw e
|
||||
case Event(AutoReceive, _) ⇒
|
||||
autoReceive()
|
||||
}
|
||||
|
||||
initialize()
|
||||
|
|
@ -263,6 +276,15 @@ abstract class AbstractTimerSpec extends AkkaSpec {
|
|||
ref ! End
|
||||
probe.expectMsg(GotPostStop(false))
|
||||
}
|
||||
|
||||
"handle AutoReceivedMessages automatically" in {
|
||||
val probe = TestProbe()
|
||||
val ref = system.actorOf(target(probe.ref, 10.millis, repeat = false))
|
||||
watch(ref)
|
||||
ref ! AutoReceive
|
||||
expectTerminated(ref)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -97,11 +97,15 @@ object FSM {
|
|||
private val scheduler = context.system.scheduler
|
||||
private implicit val executionContext = context.dispatcher
|
||||
|
||||
def schedule(actor: ActorRef, timeout: FiniteDuration): Unit =
|
||||
def schedule(actor: ActorRef, timeout: FiniteDuration): Unit = {
|
||||
val timerMsg = msg match {
|
||||
case m: AutoReceivedMessage ⇒ m
|
||||
case _ ⇒ this
|
||||
}
|
||||
ref = Some(
|
||||
if (repeat) scheduler.schedule(timeout, timeout, actor, this)
|
||||
else scheduler.scheduleOnce(timeout, actor, this))
|
||||
|
||||
if (repeat) scheduler.schedule(timeout, timeout, actor, timerMsg)
|
||||
else scheduler.scheduleOnce(timeout, actor, timerMsg))
|
||||
}
|
||||
def cancel(): Unit =
|
||||
if (ref.isDefined) {
|
||||
ref.get.cancel()
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ package akka.actor
|
|||
import akka.util.JavaDurationConverters._
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import akka.annotation.DoNotInherit
|
||||
import akka.dispatch.Envelope
|
||||
import akka.util.OptionVal
|
||||
|
||||
/**
|
||||
|
|
@ -40,12 +41,13 @@ trait Timers extends Actor {
|
|||
msg match {
|
||||
case timerMsg: TimerSchedulerImpl.TimerMsg ⇒
|
||||
_timers.interceptTimerMsg(timerMsg) match {
|
||||
case OptionVal.Some(m) if this.isInstanceOf[Stash] ⇒
|
||||
// this is important for stash interaction, as stash will look directly at currentMessage #24557
|
||||
actorCell.currentMessage = actorCell.currentMessage.copy(message = m)
|
||||
super.aroundReceive(receive, m)
|
||||
case OptionVal.Some(m: AutoReceivedMessage) ⇒
|
||||
context.asInstanceOf[ActorCell].autoReceiveMessage(Envelope(m, self))
|
||||
case OptionVal.Some(m) ⇒
|
||||
// avoid the extra allocation if not using stash
|
||||
if (this.isInstanceOf[Stash]) {
|
||||
// this is important for stash interaction, as stash will look directly at currentMessage #24557
|
||||
actorCell.currentMessage = actorCell.currentMessage.copy(message = m)
|
||||
}
|
||||
super.aroundReceive(receive, m)
|
||||
case OptionVal.None ⇒ // discard
|
||||
}
|
||||
|
|
|
|||
|
|
@ -296,11 +296,15 @@ object PersistentFSM {
|
|||
private val scheduler = context.system.scheduler
|
||||
private implicit val executionContext = context.dispatcher
|
||||
|
||||
def schedule(actor: ActorRef, timeout: FiniteDuration): Unit =
|
||||
def schedule(actor: ActorRef, timeout: FiniteDuration): Unit = {
|
||||
val timerMsg = msg match {
|
||||
case m: AutoReceivedMessage ⇒ m
|
||||
case _ ⇒ this
|
||||
}
|
||||
ref = Some(
|
||||
if (repeat) scheduler.schedule(timeout, timeout, actor, this)
|
||||
else scheduler.scheduleOnce(timeout, actor, this))
|
||||
|
||||
if (repeat) scheduler.schedule(timeout, timeout, actor, timerMsg)
|
||||
else scheduler.scheduleOnce(timeout, actor, timerMsg))
|
||||
}
|
||||
def cancel(): Unit =
|
||||
if (ref.isDefined) {
|
||||
ref.get.cancel()
|
||||
|
|
|
|||
|
|
@ -21,6 +21,8 @@ object TimerPersistentActorSpec {
|
|||
|
||||
final case class Scheduled(msg: Any, replyTo: ActorRef)
|
||||
|
||||
final case class AutoReceivedMessageWrapper(msg: AutoReceivedMessage)
|
||||
|
||||
class TestPersistentActor(name: String) extends Timers with PersistentActor {
|
||||
|
||||
override def persistenceId = name
|
||||
|
|
@ -32,6 +34,8 @@ object TimerPersistentActorSpec {
|
|||
override def receiveCommand: Receive = {
|
||||
case Scheduled(msg, replyTo) ⇒
|
||||
replyTo ! msg
|
||||
case AutoReceivedMessageWrapper(msg) ⇒
|
||||
timers.startSingleTimer("PoisonPill", PoisonPill, Duration.Zero)
|
||||
case msg ⇒
|
||||
timers.startSingleTimer("key", Scheduled(msg, sender()), Duration.Zero)
|
||||
persist(msg)(_ ⇒ ())
|
||||
|
|
@ -102,6 +106,14 @@ class TimerPersistentActorSpec extends PersistenceSpec(ConfigFactory.parseString
|
|||
watch(pa)
|
||||
expectTerminated(pa)
|
||||
}
|
||||
|
||||
"handle AutoReceivedMessage's automatically" in {
|
||||
val pa = system.actorOf(testProps("p3"))
|
||||
watch(pa)
|
||||
pa ! AutoReceivedMessageWrapper(PoisonPill)
|
||||
expectTerminated(pa)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue