Merge pull request #26577 from akka/wip-24687-more-persistence-tests6-patriknw

add more tests for receiving PoisonPill in various states, #24687
This commit is contained in:
Patrik Nordwall 2019-03-28 10:07:32 +01:00 committed by GitHub
commit 02e68abae2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 79 additions and 13 deletions

View file

@ -123,7 +123,8 @@ private[akka] final class ReplayingEvents[C, E, S](
private def onCommand(cmd: InternalProtocol): Behavior[InternalProtocol] = {
// during recovery, stash all incoming commands
if (state.receivedPoisonPill) {
if (setup.settings.logOnStashing) setup.log.debug("Discarding message [{}], because actor is to be stopped", cmd)
if (setup.settings.logOnStashing)
setup.log.debug("Discarding message [{}], because actor is to be stopped.", cmd)
Behaviors.unhandled
} else {
stashInternal(cmd)

View file

@ -56,12 +56,17 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
case JournalResponse(r) => onJournalResponse(r)
case RecoveryTickEvent(snapshot) => onRecoveryTick(snapshot)
case cmd: IncomingCommand[C] =>
if (receivedPoisonPill) Behaviors.unhandled
else onCommand(cmd)
if (receivedPoisonPill) {
if (setup.settings.logOnStashing)
setup.log.debug("Discarding message [{}], because actor is to be stopped.", cmd)
Behaviors.unhandled
} else
onCommand(cmd)
case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit
}
.receiveSignal(returnPermitOnStop.orElse {
case (_, PoisonPill) => stay(receivedPoisonPill = true)
case (_, PoisonPill) =>
stay(receivedPoisonPill = true)
})
}
stay(receivedPoisonPillInPreviousPhase)

View file

@ -43,13 +43,10 @@ private[akka] class RequestingRecoveryPermit[C, E, S](override val setup: Behavi
case InternalProtocol.RecoveryPermitGranted =>
becomeReplaying(receivedPoisonPill)
case _ if receivedPoisonPill =>
Behaviors.unhandled
case other =>
if (receivedPoisonPill) {
if (setup.settings.logOnStashing)
setup.log.debug("Discarding message [{}], because actor is to be stopped", other)
setup.log.debug("Discarding message [{}], because actor is to be stopped.", other)
Behaviors.unhandled
} else {
stashInternal(other)

View file

@ -240,7 +240,7 @@ private[akka] object Running {
def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = {
if (state.receivedPoisonPill) {
if (setup.settings.logOnStashing)
setup.log.debug("Discarding message [{}], because actor is to be stopped", cmd)
setup.log.debug("Discarding message [{}], because actor is to be stopped.", cmd)
Behaviors.unhandled
} else {
stashInternal(cmd)
@ -313,7 +313,7 @@ private[akka] object Running {
def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = {
if (state.receivedPoisonPill) {
if (setup.settings.logOnStashing)
setup.log.debug("Discarding message [{}], because actor is to be stopped", cmd)
setup.log.debug("Discarding message [{}], because actor is to be stopped.", cmd)
Behaviors.unhandled
} else {
stashUser(cmd)

View file

@ -16,6 +16,7 @@ import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.Dropped
import akka.actor.typed.PostStop
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.internal.PoisonPill
import akka.actor.typed.javadsl.StashOverflowException
@ -23,6 +24,7 @@ import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
import akka.testkit.EventFilter
import akka.testkit.TestEvent.Mute
import com.typesafe.config.Config
@ -71,14 +73,16 @@ object EventSourcedBehaviorStashSpec {
final case class State(value: Int, active: Boolean)
def counter(persistenceId: PersistenceId): Behavior[Command[_]] =
def counter(persistenceId: PersistenceId, signalProbe: Option[ActorRef[String]] = None): Behavior[Command[_]] =
Behaviors
.supervise[Command[_]] {
Behaviors.setup(_ => eventSourcedCounter(persistenceId))
Behaviors.setup(_ => eventSourcedCounter(persistenceId, signalProbe))
}
.onFailure(SupervisorStrategy.restart.withLoggingEnabled(enabled = false))
def eventSourcedCounter(persistenceId: PersistenceId): EventSourcedBehavior[Command[_], Event, State] = {
def eventSourcedCounter(
persistenceId: PersistenceId,
signalProbe: Option[ActorRef[String]]): EventSourcedBehavior[Command[_], Event, State] = {
EventSourcedBehavior
.withEnforcedReplies[Command[_], Event, State](
persistenceId,
@ -104,6 +108,11 @@ object EventSourcedBehaviorStashSpec {
.onPersistFailure(SupervisorStrategy
.restartWithBackoff(1.second, maxBackoff = 2.seconds, 0.0)
.withLoggingEnabled(enabled = false))
.receiveSignal {
// FIXME #26574: the state type of RecoveryCompleted can't be infered, do we need to include state as param in the partial function?
case RecoveryCompleted(state: State) => signalProbe.foreach(_ ! s"RecoveryCompleted-${state.value}")
case PostStop => signalProbe.foreach(_ ! "PostStop")
}
}
private def active(state: State, command: Command[_]): ReplyEffect[Event, State] = {
@ -647,6 +656,60 @@ class EventSourcedBehaviorStashSpec
// 6 shouldn't make it, already stopped
ackProbe.expectNoMessage(100.millis)
}
"stop from PoisonPill after recovery completed" in {
val pid = nextPid()
val c = spawn(counter(pid))
val ackProbe = TestProbe[Ack]
c ! Increment("1", ackProbe.ref)
c ! Increment("2", ackProbe.ref)
c ! Increment("3", ackProbe.ref)
ackProbe.expectMessage(Ack("1"))
ackProbe.expectMessage(Ack("2"))
ackProbe.expectMessage(Ack("3"))
// silence some dead letters that may, or may not, occur depending on timing
system.toUntyped.eventStream.publish(Mute(EventFilter.warning(start = "unhandled message", occurrences = 100)))
system.toUntyped.eventStream.publish(Mute(EventFilter.warning(start = "received dead letter", occurrences = 100)))
system.toUntyped.eventStream.publish(Mute(EventFilter.info(pattern = ".*was not delivered.*", occurrences = 100)))
val signalProbe = TestProbe[String]
val c2 = spawn(counter(pid, Some(signalProbe.ref)))
// this PoisonPill will most likely be received in RequestingRecoveryPermit since it's sent immediately
c2.toUntyped ! PoisonPill
signalProbe.expectMessage("RecoveryCompleted-3")
signalProbe.expectMessage("PostStop")
val c3 = spawn(counter(pid, Some(signalProbe.ref)))
// this PoisonPill will most likely be received in RequestingRecoveryPermit since it's sent slightly afterwards
Thread.sleep(1)
c3.toUntyped ! PoisonPill
c3 ! Increment("4", ackProbe.ref)
signalProbe.expectMessage("RecoveryCompleted-3")
signalProbe.expectMessage("PostStop")
ackProbe.expectNoMessage(20.millis)
val c4 = spawn(counter(pid, Some(signalProbe.ref)))
signalProbe.expectMessage("RecoveryCompleted-3")
// this PoisonPill will be received in Running
c4.toUntyped ! PoisonPill
c4 ! Increment("4", ackProbe.ref)
signalProbe.expectMessage("PostStop")
ackProbe.expectNoMessage(20.millis)
val c5 = spawn(counter(pid, Some(signalProbe.ref)))
signalProbe.expectMessage("RecoveryCompleted-3")
c5 ! Increment("4", ackProbe.ref)
c5 ! Increment("5", ackProbe.ref)
// this PoisonPill will most likely be received in PersistingEvents
c5.toUntyped ! PoisonPill
c5 ! Increment("6", ackProbe.ref)
ackProbe.expectMessage(Ack("4"))
ackProbe.expectMessage(Ack("5"))
signalProbe.expectMessage("PostStop")
ackProbe.expectNoMessage(20.millis)
}
}
}