From e495dab941783caee3b264f1cbd49c836efb1f87 Mon Sep 17 00:00:00 2001 From: mucahitkantepe Date: Mon, 19 Feb 2018 22:58:14 +0300 Subject: [PATCH] AutoReceivedMessage with Timers bug fixed (#24080) AutoReceivedMessage with FSM schedule bug fixed (#24080) Tests added for both FSM and Timers trait for (#24080) AutoReceivedMessage with PersistentFSM bug fixed and test added (#24080) --- .../src/test/scala/akka/actor/TimerSpec.scala | 22 +++++++++++++++++++ .../src/main/scala/akka/actor/FSM.scala | 12 ++++++---- .../src/main/scala/akka/actor/Timers.scala | 12 +++++----- .../akka/persistence/fsm/PersistentFSM.scala | 12 ++++++---- .../TimerPersistentActorSpec.scala | 12 ++++++++++ 5 files changed, 57 insertions(+), 13 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/TimerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TimerSpec.scala index 46c968aca4..3d4d605ec3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TimerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TimerSpec.scala @@ -22,6 +22,7 @@ object TimerSpec { case object Cancel extends Command case class SlowThenThrow(latch: TestLatch, e: Throwable) extends Command with NoSerializationVerificationNeeded + case object AutoReceive extends Command sealed trait Event case class Tock(n: Int) extends Event @@ -55,6 +56,10 @@ object TimerSpec { timers.startPeriodicTimer("T", Tick(bumpCount), interval) } + def autoReceive(): Unit = { + timers.startSingleTimer("A", PoisonPill, interval) + } + override def receive = { case Tick(n) ⇒ monitor ! Tock(n) @@ -72,6 +77,7 @@ object TimerSpec { case SlowThenThrow(latch, e) ⇒ Await.ready(latch, 10.seconds) throw e + case AutoReceive ⇒ autoReceive() } } @@ -101,6 +107,11 @@ object TimerSpec { stay using (bumpCount + 1) } + def autoReceive(): State = { + setTimer("A", PoisonPill, interval, repeat) + stay + } + { val i = initial() startWith(TheState, i) @@ -126,6 +137,8 @@ object TimerSpec { case Event(SlowThenThrow(latch, e), _) ⇒ Await.ready(latch, 10.seconds) throw e + case Event(AutoReceive, _) ⇒ + autoReceive() } initialize() @@ -263,6 +276,15 @@ abstract class AbstractTimerSpec extends AkkaSpec { ref ! End probe.expectMsg(GotPostStop(false)) } + + "handle AutoReceivedMessages automatically" in { + val probe = TestProbe() + val ref = system.actorOf(target(probe.ref, 10.millis, repeat = false)) + watch(ref) + ref ! AutoReceive + expectTerminated(ref) + } + } } diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index b9d04da08f..02816cd0bb 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -97,11 +97,15 @@ object FSM { private val scheduler = context.system.scheduler private implicit val executionContext = context.dispatcher - def schedule(actor: ActorRef, timeout: FiniteDuration): Unit = + def schedule(actor: ActorRef, timeout: FiniteDuration): Unit = { + val timerMsg = msg match { + case m: AutoReceivedMessage ⇒ m + case _ ⇒ this + } ref = Some( - if (repeat) scheduler.schedule(timeout, timeout, actor, this) - else scheduler.scheduleOnce(timeout, actor, this)) - + if (repeat) scheduler.schedule(timeout, timeout, actor, timerMsg) + else scheduler.scheduleOnce(timeout, actor, timerMsg)) + } def cancel(): Unit = if (ref.isDefined) { ref.get.cancel() diff --git a/akka-actor/src/main/scala/akka/actor/Timers.scala b/akka-actor/src/main/scala/akka/actor/Timers.scala index 2de5b95413..03b5d43d2a 100644 --- a/akka-actor/src/main/scala/akka/actor/Timers.scala +++ b/akka-actor/src/main/scala/akka/actor/Timers.scala @@ -7,6 +7,7 @@ package akka.actor import akka.util.JavaDurationConverters._ import scala.concurrent.duration.FiniteDuration import akka.annotation.DoNotInherit +import akka.dispatch.Envelope import akka.util.OptionVal /** @@ -40,12 +41,13 @@ trait Timers extends Actor { msg match { case timerMsg: TimerSchedulerImpl.TimerMsg ⇒ _timers.interceptTimerMsg(timerMsg) match { - case OptionVal.Some(m) if this.isInstanceOf[Stash] ⇒ - // this is important for stash interaction, as stash will look directly at currentMessage #24557 - actorCell.currentMessage = actorCell.currentMessage.copy(message = m) - super.aroundReceive(receive, m) + case OptionVal.Some(m: AutoReceivedMessage) ⇒ + context.asInstanceOf[ActorCell].autoReceiveMessage(Envelope(m, self)) case OptionVal.Some(m) ⇒ - // avoid the extra allocation if not using stash + if (this.isInstanceOf[Stash]) { + // this is important for stash interaction, as stash will look directly at currentMessage #24557 + actorCell.currentMessage = actorCell.currentMessage.copy(message = m) + } super.aroundReceive(receive, m) case OptionVal.None ⇒ // discard } diff --git a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala index 68927bc6c9..c9bb7dc3cc 100644 --- a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala +++ b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala @@ -296,11 +296,15 @@ object PersistentFSM { private val scheduler = context.system.scheduler private implicit val executionContext = context.dispatcher - def schedule(actor: ActorRef, timeout: FiniteDuration): Unit = + def schedule(actor: ActorRef, timeout: FiniteDuration): Unit = { + val timerMsg = msg match { + case m: AutoReceivedMessage ⇒ m + case _ ⇒ this + } ref = Some( - if (repeat) scheduler.schedule(timeout, timeout, actor, this) - else scheduler.scheduleOnce(timeout, actor, this)) - + if (repeat) scheduler.schedule(timeout, timeout, actor, timerMsg) + else scheduler.scheduleOnce(timeout, actor, timerMsg)) + } def cancel(): Unit = if (ref.isDefined) { ref.get.cancel() diff --git a/akka-persistence/src/test/scala/akka/persistence/TimerPersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/TimerPersistentActorSpec.scala index 6bcfe7317f..52ef20d061 100644 --- a/akka-persistence/src/test/scala/akka/persistence/TimerPersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/TimerPersistentActorSpec.scala @@ -21,6 +21,8 @@ object TimerPersistentActorSpec { final case class Scheduled(msg: Any, replyTo: ActorRef) + final case class AutoReceivedMessageWrapper(msg: AutoReceivedMessage) + class TestPersistentActor(name: String) extends Timers with PersistentActor { override def persistenceId = name @@ -32,6 +34,8 @@ object TimerPersistentActorSpec { override def receiveCommand: Receive = { case Scheduled(msg, replyTo) ⇒ replyTo ! msg + case AutoReceivedMessageWrapper(msg) ⇒ + timers.startSingleTimer("PoisonPill", PoisonPill, Duration.Zero) case msg ⇒ timers.startSingleTimer("key", Scheduled(msg, sender()), Duration.Zero) persist(msg)(_ ⇒ ()) @@ -102,6 +106,14 @@ class TimerPersistentActorSpec extends PersistenceSpec(ConfigFactory.parseString watch(pa) expectTerminated(pa) } + + "handle AutoReceivedMessage's automatically" in { + val pa = system.actorOf(testProps("p3")) + watch(pa) + pa ! AutoReceivedMessageWrapper(PoisonPill) + expectTerminated(pa) + } + } }