From e6775d72252665bf2fee1923d064fb83b2c846ed Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 20 Mar 2019 08:05:57 +0100 Subject: [PATCH] add more tests for receiving PoisonPill in various states, #24687 --- .../typed/internal/ReplayingEvents.scala | 3 +- .../typed/internal/ReplayingSnapshot.scala | 11 ++- .../internal/RequestingRecoveryPermit.scala | 5 +- .../persistence/typed/internal/Running.scala | 4 +- .../EventSourcedBehaviorStashSpec.scala | 69 ++++++++++++++++++- 5 files changed, 79 insertions(+), 13 deletions(-) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index 08812803fb..5f7d459dd5 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -123,7 +123,8 @@ private[akka] final class ReplayingEvents[C, E, S]( private def onCommand(cmd: InternalProtocol): Behavior[InternalProtocol] = { // during recovery, stash all incoming commands if (state.receivedPoisonPill) { - if (setup.settings.logOnStashing) setup.log.debug("Discarding message [{}], because actor is to be stopped", cmd) + if (setup.settings.logOnStashing) + setup.log.debug("Discarding message [{}], because actor is to be stopped.", cmd) Behaviors.unhandled } else { stashInternal(cmd) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index d0c15cd144..fa13ed5a41 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -56,12 +56,17 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup case JournalResponse(r) => onJournalResponse(r) case RecoveryTickEvent(snapshot) => onRecoveryTick(snapshot) case cmd: IncomingCommand[C] => - if (receivedPoisonPill) Behaviors.unhandled - else onCommand(cmd) + if (receivedPoisonPill) { + if (setup.settings.logOnStashing) + setup.log.debug("Discarding message [{}], because actor is to be stopped.", cmd) + Behaviors.unhandled + } else + onCommand(cmd) case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit } .receiveSignal(returnPermitOnStop.orElse { - case (_, PoisonPill) => stay(receivedPoisonPill = true) + case (_, PoisonPill) => + stay(receivedPoisonPill = true) }) } stay(receivedPoisonPillInPreviousPhase) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RequestingRecoveryPermit.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RequestingRecoveryPermit.scala index 43890895b5..a5f875054a 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RequestingRecoveryPermit.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RequestingRecoveryPermit.scala @@ -43,13 +43,10 @@ private[akka] class RequestingRecoveryPermit[C, E, S](override val setup: Behavi case InternalProtocol.RecoveryPermitGranted => becomeReplaying(receivedPoisonPill) - case _ if receivedPoisonPill => - Behaviors.unhandled - case other => if (receivedPoisonPill) { if (setup.settings.logOnStashing) - setup.log.debug("Discarding message [{}], because actor is to be stopped", other) + setup.log.debug("Discarding message [{}], because actor is to be stopped.", other) Behaviors.unhandled } else { stashInternal(other) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 38d6d90a4d..016c465309 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -240,7 +240,7 @@ private[akka] object Running { def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = { if (state.receivedPoisonPill) { if (setup.settings.logOnStashing) - setup.log.debug("Discarding message [{}], because actor is to be stopped", cmd) + setup.log.debug("Discarding message [{}], because actor is to be stopped.", cmd) Behaviors.unhandled } else { stashInternal(cmd) @@ -313,7 +313,7 @@ private[akka] object Running { def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = { if (state.receivedPoisonPill) { if (setup.settings.logOnStashing) - setup.log.debug("Discarding message [{}], because actor is to be stopped", cmd) + setup.log.debug("Discarding message [{}], because actor is to be stopped.", cmd) Behaviors.unhandled } else { stashUser(cmd) diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala index 38e201645e..fccaef32df 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala @@ -16,6 +16,7 @@ import akka.actor.testkit.typed.scaladsl._ import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.actor.typed.Dropped +import akka.actor.typed.PostStop import akka.actor.typed.SupervisorStrategy import akka.actor.typed.internal.PoisonPill import akka.actor.typed.javadsl.StashOverflowException @@ -23,6 +24,7 @@ import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId +import akka.persistence.typed.RecoveryCompleted import akka.testkit.EventFilter import akka.testkit.TestEvent.Mute import com.typesafe.config.Config @@ -71,14 +73,16 @@ object EventSourcedBehaviorStashSpec { final case class State(value: Int, active: Boolean) - def counter(persistenceId: PersistenceId): Behavior[Command[_]] = + def counter(persistenceId: PersistenceId, signalProbe: Option[ActorRef[String]] = None): Behavior[Command[_]] = Behaviors .supervise[Command[_]] { - Behaviors.setup(_ => eventSourcedCounter(persistenceId)) + Behaviors.setup(_ => eventSourcedCounter(persistenceId, signalProbe)) } .onFailure(SupervisorStrategy.restart.withLoggingEnabled(enabled = false)) - def eventSourcedCounter(persistenceId: PersistenceId): EventSourcedBehavior[Command[_], Event, State] = { + def eventSourcedCounter( + persistenceId: PersistenceId, + signalProbe: Option[ActorRef[String]]): EventSourcedBehavior[Command[_], Event, State] = { EventSourcedBehavior .withEnforcedReplies[Command[_], Event, State]( persistenceId, @@ -104,6 +108,11 @@ object EventSourcedBehaviorStashSpec { .onPersistFailure(SupervisorStrategy .restartWithBackoff(1.second, maxBackoff = 2.seconds, 0.0) .withLoggingEnabled(enabled = false)) + .receiveSignal { + // FIXME #26574: the state type of RecoveryCompleted can't be infered, do we need to include state as param in the partial function? + case RecoveryCompleted(state: State) => signalProbe.foreach(_ ! s"RecoveryCompleted-${state.value}") + case PostStop => signalProbe.foreach(_ ! "PostStop") + } } private def active(state: State, command: Command[_]): ReplyEffect[Event, State] = { @@ -647,6 +656,60 @@ class EventSourcedBehaviorStashSpec // 6 shouldn't make it, already stopped ackProbe.expectNoMessage(100.millis) } + + "stop from PoisonPill after recovery completed" in { + val pid = nextPid() + val c = spawn(counter(pid)) + val ackProbe = TestProbe[Ack] + + c ! Increment("1", ackProbe.ref) + c ! Increment("2", ackProbe.ref) + c ! Increment("3", ackProbe.ref) + ackProbe.expectMessage(Ack("1")) + ackProbe.expectMessage(Ack("2")) + ackProbe.expectMessage(Ack("3")) + + // silence some dead letters that may, or may not, occur depending on timing + system.toUntyped.eventStream.publish(Mute(EventFilter.warning(start = "unhandled message", occurrences = 100))) + system.toUntyped.eventStream.publish(Mute(EventFilter.warning(start = "received dead letter", occurrences = 100))) + system.toUntyped.eventStream.publish(Mute(EventFilter.info(pattern = ".*was not delivered.*", occurrences = 100))) + + val signalProbe = TestProbe[String] + val c2 = spawn(counter(pid, Some(signalProbe.ref))) + // this PoisonPill will most likely be received in RequestingRecoveryPermit since it's sent immediately + c2.toUntyped ! PoisonPill + signalProbe.expectMessage("RecoveryCompleted-3") + signalProbe.expectMessage("PostStop") + + val c3 = spawn(counter(pid, Some(signalProbe.ref))) + // this PoisonPill will most likely be received in RequestingRecoveryPermit since it's sent slightly afterwards + Thread.sleep(1) + c3.toUntyped ! PoisonPill + c3 ! Increment("4", ackProbe.ref) + signalProbe.expectMessage("RecoveryCompleted-3") + signalProbe.expectMessage("PostStop") + ackProbe.expectNoMessage(20.millis) + + val c4 = spawn(counter(pid, Some(signalProbe.ref))) + signalProbe.expectMessage("RecoveryCompleted-3") + // this PoisonPill will be received in Running + c4.toUntyped ! PoisonPill + c4 ! Increment("4", ackProbe.ref) + signalProbe.expectMessage("PostStop") + ackProbe.expectNoMessage(20.millis) + + val c5 = spawn(counter(pid, Some(signalProbe.ref))) + signalProbe.expectMessage("RecoveryCompleted-3") + c5 ! Increment("4", ackProbe.ref) + c5 ! Increment("5", ackProbe.ref) + // this PoisonPill will most likely be received in PersistingEvents + c5.toUntyped ! PoisonPill + c5 ! Increment("6", ackProbe.ref) + ackProbe.expectMessage(Ack("4")) + ackProbe.expectMessage(Ack("5")) + signalProbe.expectMessage("PostStop") + ackProbe.expectNoMessage(20.millis) + } } }