From 952d76869353a4be96a4fec9f3601734d0a61280 Mon Sep 17 00:00:00 2001 From: qian miao Date: Tue, 2 Feb 2016 18:40:20 +0800 Subject: [PATCH] =per #19551 support stash overflow strategy --- .../src/main/scala/akka/actor/Stash.scala | 6 +- akka-docs/rst/java/persistence.rst | 59 +++++-- akka-docs/rst/scala/persistence.rst | 58 +++++-- .../src/main/resources/reference.conf | 4 + .../scala/akka/persistence/Eventsourced.scala | 33 +++- .../scala/akka/persistence/Persistence.scala | 30 +++- .../akka/persistence/PersistentActor.scala | 56 ++++++- .../PersistentActorBoundedStashingSpec.scala | 156 ++++++++++++++++++ 8 files changed, 352 insertions(+), 50 deletions(-) create mode 100644 akka-persistence/src/test/scala/akka/persistence/PersistentActorBoundedStashingSpec.scala diff --git a/akka-actor/src/main/scala/akka/actor/Stash.scala b/akka-actor/src/main/scala/akka/actor/Stash.scala index 1abbb9f6fb..2f9f0c8d66 100644 --- a/akka-actor/src/main/scala/akka/actor/Stash.scala +++ b/akka-actor/src/main/scala/akka/actor/Stash.scala @@ -8,6 +8,8 @@ import scala.collection.immutable import akka.AkkaException import akka.dispatch.{ UnboundedDequeBasedMessageQueueSemantics, RequiresMessageQueue, Envelope, DequeBasedMessageQueueSemantics, Mailboxes } +import scala.util.control.NoStackTrace + /** * The `Stash` trait enables an actor to temporarily stash away messages that can not or * should not be handled using the actor's current behavior. @@ -156,7 +158,7 @@ private[akka] trait StashSupport { if (theStash.nonEmpty && (currMsg eq theStash.last)) throw new IllegalStateException("Can't stash the same message " + currMsg + " more than once") if (capacity <= 0 || theStash.size < capacity) theStash :+= currMsg - else throw new StashOverflowException("Couldn't enqueue message " + currMsg + " to stash of " + self) + else throw new StashOverflowException("Couldn't enqueue message " + currMsg.getClass.getName + " to stash of " + self) } /** @@ -246,4 +248,4 @@ private[akka] trait StashSupport { /** * Is thrown when the size of the Stash exceeds the capacity of the Stash */ -class StashOverflowException(message: String, cause: Throwable = null) extends AkkaException(message, cause) +class StashOverflowException(message: String, cause: Throwable = null) extends AkkaException(message, cause) with NoStackTrace \ No newline at end of file diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index eeda6b173e..5fb8848b37 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -118,20 +118,8 @@ about successful state changes by publishing events. When persisting events with ``persist`` it is guaranteed that the persistent actor will not receive further commands between the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist`` -calls in context of a single command. Incoming messages are :ref:`stashed ` until the ``persist`` -is completed. You should be careful to not send more messages to a persistent actor than it can keep up with, -otherwise the number of stashed messages will grow. It can be wise to protect against `OutOfMemoryError` -by defining a maximum stash capacity in the mailbox configuration:: - - akka.actor.default-mailbox.stash-capacity=10000 - -If the stash capacity is exceeded for an actor the stashed messages are discarded and a -``MessageQueueAppendFailedException`` is thrown, causing actor restart if default supervision -strategy is used. - -Note that the stash capacity is per actor. If you have many persistent actors, e.g. when using cluster sharding, -you may need to define a small stash capacity to ensure that the total number of stashed messages in the system -don't consume too much memory. +calls in context of a single command. Incoming messages are :ref:`stashed ` until the ``persist`` +is completed. If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default), and the actor will unconditionally be stopped. If persistence of an event is rejected before it is @@ -204,6 +192,49 @@ and before any other received messages. If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure`` is called (logging the error by default) and the actor will be stopped. +.. _internal-stash: + +Internal stash +-------------- + +The persistent actor has a private :ref:`stash ` for internally caching incoming messages during +:ref:`recovery` or the ``persist\persistAll`` method persisting events. However You can use inherited stash or create +one or more stashes if needed. The internal stash doesn't interfere with these stashes apart from user inherited +``unstashAll`` method, which prepends all messages in the inherited stash to the internal stash instead of mailbox. +Hence, If the message in the inherited stash need to be handled after the messages in the internal stash, you should +call inherited ``unstash`` method. + +You should be careful to not send more messages to a persistent actor than it can keep up with, otherwise the number +of stashed messages will grow. It can be wise to protect against `OutOfMemoryError` by defining a maximum stash +capacity in the mailbox configuration:: + + akka.actor.default-mailbox.stash-capacity=10000 + +Note that the stash capacity is per actor. If you have many persistent actors, e.g. when using cluster sharding, +you may need to define a small stash capacity to ensure that the total number of stashed messages in the system +don't consume too much memory. Additionally, The persistent actor defines three strategies to handle failure when the +internal stash capacity is exceeded. The default overflow strategy is the ``ThrowOverflowExceptionStrategy``, which +discards the current received message and throws a ``StashOverflowException``, causing actor restart if default +supervision strategy is used. you can override the ``internalStashOverflowStrategy`` method to return +``DiscardToDeadLetterStrategy`` or ``ReplyToStrategy`` for any "individual" persistent actor, or define the "default" +for all persistent actors by providing FQCN, which must be a subclass of ``StashOverflowStrategyConfigurator``, in the +persistence configuration:: + + akka.persistence.internal-stash-overflow-strategy= + "akka.persistence.ThrowExceptionConfigurator" + +The ``DiscardToDeadLetterStrategy`` strategy also has a pre-packaged companion configurator +``akka.persistence.DiscardConfigurator``. + +You can also query default strategy via the Akka persistence extension singleton:: + + Persistence.get(context().system()).defaultInternalStashOverflowStrategy() + +.. note:: + The bounded mailbox should be avoid in the persistent actor, because it may be discarding the messages come from + Storage backends. You can use bounded stash instead of bounded mailbox. + + .. _persist-async-java: Relaxed local consistency requirements and high throughput use-cases diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 3551a34bc8..ac1e452900 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -102,20 +102,8 @@ about successful state changes by publishing events. When persisting events with ``persist`` it is guaranteed that the persistent actor will not receive further commands between the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist`` -calls in context of a single command. Incoming messages are :ref:`stashed ` until the ``persist`` -is completed. You should be careful to not send more messages to a persistent actor than it can keep up with, -otherwise the number of stashed messages will grow. It can be wise to protect against `OutOfMemoryError` -by defining a maximum stash capacity in the mailbox configuration:: - - akka.actor.default-mailbox.stash-capacity=10000 - -If the stash capacity is exceeded for an actor the stashed messages are discarded and a -``MessageQueueAppendFailedException`` is thrown, causing actor restart if default supervision -strategy is used. - -Note that the stash capacity is per actor. If you have many persistent actors, e.g. when using cluster sharding, -you may need to define a small stash capacity to ensure that the total number of stashed messages in the system -don't consume too much memory. +calls in context of a single command. Incoming messages are :ref:`stashed ` until the ``persist`` +is completed. If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default), and the actor will unconditionally be stopped. If persistence of an event is rejected before it is @@ -187,6 +175,48 @@ and before any other received messages. If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure`` is called (logging the error by default) and the actor will be stopped. +.. _internal-stash: + +Internal stash +-------------- + +The persistent actor has a private :ref:`stash ` for internally caching incoming messages during +:ref:`recovery` or the ``persist\persistAll`` method persisting events. However You can use inherited stash or create +one or more stashes if needed. The internal stash doesn't interfere with these stashes apart from user inherited +``unstashAll`` method, which prepends all messages in the inherited stash to the internal stash instead of mailbox. +Hence, If the message in the inherited stash need to be handled after the messages in the internal stash, you should +call inherited ``unstash`` method. + +You should be careful to not send more messages to a persistent actor than it can keep up with, otherwise the number +of stashed messages will grow. It can be wise to protect against `OutOfMemoryError` by defining a maximum stash +capacity in the mailbox configuration:: + + akka.actor.default-mailbox.stash-capacity=10000 + +Note that the stash capacity is per actor. If you have many persistent actors, e.g. when using cluster sharding, +you may need to define a small stash capacity to ensure that the total number of stashed messages in the system +don't consume too much memory. Additionally, The persistent actor defines three strategies to handle failure when the +internal stash capacity is exceeded. The default overflow strategy is the ``ThrowOverflowExceptionStrategy``, which +discards the current received message and throws a ``StashOverflowException``, causing actor restart if default +supervision strategy is used. you can override the ``internalStashOverflowStrategy`` method to return +``DiscardToDeadLetterStrategy`` or ``ReplyToStrategy`` for any "individual" persistent actor, or define the "default" +for all persistent actors by providing FQCN, which must be a subclass of ``StashOverflowStrategyConfigurator``, in the +persistence configuration:: + + akka.persistence.internal-stash-overflow-strategy= + "akka.persistence.ThrowExceptionConfigurator" + +The ``DiscardToDeadLetterStrategy`` strategy also has a pre-packaged companion configurator +``akka.persistence.DiscardConfigurator``. + +You can also query default strategy via the Akka persistence extension singleton:: + + Persistence(context.system).defaultInternalStashOverflowStrategy + +.. note:: + The bounded mailbox should be avoid in the persistent actor, because it may be discarding the messages come from + Storage backends. You can use bounded stash instead of bounded mailbox. + .. _persist-async-scala: Relaxed local consistency requirements and high throughput use-cases diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index 07b674e475..b89be9803b 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -10,6 +10,10 @@ # Default persistence extension settings. akka.persistence { + # Fully qualified class name providing a default internal stash overflow strategy. + # It needs to be a subclass of akka.persistence.StashOverflowStrategyConfigurator. + # The default strategy throws StashOverflowException. + internal-stash-overflow-strategy = "akka.persistence.ThrowExceptionConfigurator" journal { # Absolute path to the journal plugin configuration entry used by # persistent actor or view by default. diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 1e165c4ddb..3a9f66863e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -5,14 +5,14 @@ package akka.persistence import java.util.concurrent.atomic.AtomicInteger +import java.util.UUID + import scala.collection.immutable import scala.util.control.NonFatal -import akka.actor.Stash -import akka.actor.StashFactory +import akka.actor.DeadLetter +import akka.actor.StashOverflowException import akka.event.Logging import akka.event.LoggingAdapter -import akka.actor.ActorRef -import java.util.UUID /** * INTERNAL API @@ -37,7 +37,7 @@ private[persistence] object Eventsourced { * Scala API and implementation details of [[PersistentActor]], [[AbstractPersistentActor]] and * [[UntypedPersistentActor]]. */ -private[persistence] trait Eventsourced extends Snapshotter with Stash with StashFactory with PersistenceIdentity with PersistenceRecovery { +private[persistence] trait Eventsourced extends Snapshotter with PersistenceStash with PersistenceIdentity with PersistenceRecovery { import JournalProtocol._ import SnapshotProtocol.LoadSnapshotResult import Eventsourced._ @@ -148,6 +148,20 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas event.getClass.getName, seqNr, persistenceId, cause.getMessage) } + private def stashInternally(currMsg: Any): Unit = + try internalStash.stash() catch { + case e: StashOverflowException ⇒ + internalStashOverflowStrategy match { + case DiscardToDeadLetterStrategy ⇒ + val snd = sender() + context.system.deadLetters.tell(DeadLetter(currMsg, snd, self), snd) + case ReplyToStrategy(response) ⇒ + sender() ! response + case ThrowOverflowExceptionStrategy ⇒ + throw e + } + } + private def startRecovery(recovery: Recovery): Unit = { changeState(recoveryStarted(recovery.replayMax)) loadSnapshot(snapshotterId, recovery.fromSnapshot, recovery.toSequenceNr) @@ -459,7 +473,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas } changeState(recovering(recoveryBehavior)) journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self) - case other ⇒ internalStash.stash() + case other ⇒ + stashInternally(other) } } @@ -474,7 +489,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * All incoming messages are stashed. */ private def recovering(recoveryBehavior: Receive) = new State { - override def toString: String = s"replay started" + override def toString: String = "replay started" override def recoveryRunning: Boolean = true override def stateReceive(receive: Receive, message: Any) = message match { @@ -496,7 +511,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas case ReplayMessagesFailure(cause) ⇒ try onRecoveryFailure(cause, event = None) finally context.stop(self) case other ⇒ - internalStash.stash() + stashInternally(other) } } @@ -626,7 +641,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas override def stateReceive(receive: Receive, message: Any) = if (common.isDefinedAt(message)) common(message) - else internalStash.stash() + else stashInternally(message) override def onWriteMessageComplete(err: Boolean): Unit = { pendingInvocations.pop() match { diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index b88c5ef320..8c5bc16b3f 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -9,6 +9,7 @@ import java.util.function.Consumer import akka.actor._ import akka.event.{ Logging, LoggingAdapter } import akka.persistence.journal.{ EventAdapters, IdentityEventAdapters } +import akka.util.Collections.EmptyImmutableSeq import akka.util.Helpers.ConfigOps import com.typesafe.config.Config import scala.annotation.tailrec @@ -108,6 +109,15 @@ trait PersistenceRecovery { //#persistence-recovery } +trait PersistenceStash extends Stash with StashFactory { + /** + * The returned [[StashOverflowStrategy]] object determines how to handle the message failed to stash + * when the internal Stash capacity exceeded. + */ + def internalStashOverflowStrategy: StashOverflowStrategy = + Persistence(context.system).defaultInternalStashOverflowStrategy +} + /** * Persistence extension provider. */ @@ -129,7 +139,6 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { private def log: LoggingAdapter = Logging(system, getClass.getName) - private val DefaultPluginDispatcherId = "akka.persistence.dispatchers.default-plugin-dispatcher" private val NoSnapshotStorePluginId = "akka.persistence.no-snapshot-store" private val config = system.settings.config.getConfig("akka.persistence") @@ -153,16 +162,19 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { } else configPath } + // Lazy, so user is not forced to configure defaults when she is not using them. + lazy val defaultInternalStashOverflowStrategy: StashOverflowStrategy = + system.dynamicAccess.createInstanceFor[StashOverflowStrategyConfigurator](config.getString( + "internal-stash-overflow-strategy"), EmptyImmutableSeq) + .map(_.create(system.settings.config)).get + val settings = new PersistenceSettings(config) /** Check for default or missing identity. */ private def isEmpty(text: String) = text == null || text.length == 0 /** Discovered persistence journal and snapshot store plugins. */ - private val journalPluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty) - - /** Discovered persistence snapshot store plugins. */ - private val snapshotPluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty) + private val pluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty) private val journalFallbackConfigPath = "akka.persistence.journal-plugin-fallback" private val snapshotStoreFallbackConfigPath = "akka.persistence.snapshot-store-plugin-fallback" @@ -195,7 +207,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { * Looks up [[akka.persistence.journal.EventAdapters]] by journal plugin's ActorRef. */ private[akka] final def adaptersFor(journalPluginActor: ActorRef): EventAdapters = { - journalPluginExtensionId.get().values collectFirst { + pluginExtensionId.get().values collectFirst { case ext if ext(system).actor == journalPluginActor ⇒ ext(system).adapters } match { case Some(adapters) ⇒ adapters @@ -219,7 +231,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { * Looks up the plugin config by plugin's ActorRef. */ private[akka] final def configFor(journalPluginActor: ActorRef): Config = - journalPluginExtensionId.get().values.collectFirst { + pluginExtensionId.get().values.collectFirst { case ext if ext(system).actor == journalPluginActor ⇒ ext(system).config } match { case Some(conf) ⇒ conf @@ -252,13 +264,13 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { } @tailrec private def pluginHolderFor(configPath: String, fallbackPath: String): PluginHolder = { - val extensionIdMap = journalPluginExtensionId.get + val extensionIdMap = pluginExtensionId.get extensionIdMap.get(configPath) match { case Some(extensionId) ⇒ extensionId(system) case None ⇒ val extensionId = new PluginHolderExtensionId(configPath, fallbackPath) - journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) + pluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) pluginHolderFor(configPath, fallbackPath) // Recursive invocation. } } diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala index 337fe0c40a..b8bceb7ac1 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala @@ -4,10 +4,10 @@ package akka.persistence import java.lang.{ Iterable ⇒ JIterable } -import akka.actor.UntypedActor +import akka.actor._ import akka.japi.Procedure -import akka.actor.AbstractActor import akka.japi.Util +import com.typesafe.config.Config abstract class RecoveryCompleted /** @@ -98,6 +98,58 @@ object Recovery { val none: Recovery = Recovery(toSequenceNr = 0L) } +/** + * This defines how to handle the current received message which failed to stash, when the size of + * Stash exceeding the capacity of Stash. + */ +sealed trait StashOverflowStrategy + +/** + * Discard the message to [[DeadLetter]]. + */ +case object DiscardToDeadLetterStrategy extends StashOverflowStrategy { + /** + * Java API: get the singleton instance + */ + def getInstance = this +} + +/** + * Throw [[StashOverflowException]], hence the persistent actor will starting recovery + * if guarded by default supervisor strategy. + * Be carefully if used together with persist/persistAll or has many messages needed + * to replay. + */ +case object ThrowOverflowExceptionStrategy extends StashOverflowStrategy { + /** + * Java API: get the singleton instance + */ + def getInstance = this +} + +/** + * Reply to sender with predefined response, and discard the received message silently. + * @param response the message replying to sender with + */ +final case class ReplyToStrategy(response: Any) extends StashOverflowStrategy + +/** + * Implement this interface in order to configure the stashOverflowStrategy for + * the internal stash of persistent actor. + * An instance of this class must be instantiable using a no-arg constructor. + */ +trait StashOverflowStrategyConfigurator { + def create(config: Config): StashOverflowStrategy +} + +final class ThrowExceptionConfigurator extends StashOverflowStrategyConfigurator { + override def create(config: Config) = ThrowOverflowExceptionStrategy +} + +final class DiscardConfigurator extends StashOverflowStrategyConfigurator { + override def create(config: Config) = DiscardToDeadLetterStrategy +} + /** * An persistent Actor - can be used to implement command or event sourcing. */ diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorBoundedStashingSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorBoundedStashingSpec.scala new file mode 100644 index 0000000000..18e7f90380 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorBoundedStashingSpec.scala @@ -0,0 +1,156 @@ +/** + * Copyright (C) 2009-2016 Typesafe Inc. + */ + +package akka.persistence + +import akka.actor.DeadLetter +import akka.persistence.PersistentActorBoundedStashingSpec._ +import akka.persistence.journal.SteppingInmemJournal +import akka.testkit.TestEvent.Mute +import akka.testkit.EventFilter +import akka.testkit.ImplicitSender +import com.typesafe.config.Config +import org.scalatest.BeforeAndAfterEach + +import scala.concurrent.duration._ + +object PersistentActorBoundedStashingSpec { + final case class Cmd(data: Any) + final case class Evt(data: Any) + + class ReplyToWithRejectConfigurator extends StashOverflowStrategyConfigurator { + override def create(config: Config): StashOverflowStrategy = ReplyToStrategy("RejectToStash") + } + + class StashOverflowStrategyFromConfigPersistentActor(name: String) extends NamedPersistentActor(name) { + var events: List[Any] = Nil + + val updateState: Receive = { + case Evt(data) ⇒ events = data :: events + } + + val commonBehavior: Receive = { + case GetState ⇒ sender() ! events.reverse + } + + def receiveRecover = updateState + + override def receiveCommand: Receive = commonBehavior orElse { + case Cmd(x: Any) ⇒ persist(Evt(x))(updateState) + } + } + + val capacity = 10 + + val templateConfig = + s""" + |akka.actor.default-mailbox.stash-capacity=$capacity + |akka.actor.guardian-supervisor-strategy="akka.actor.StoppingSupervisorStrategy" + |akka.persistence.internal-stash-overflow-strategy = "%s" + |""".stripMargin + + val throwConfig = String.format(templateConfig, "akka.persistence.ThrowExceptionConfigurator") + val discardConfig = String.format(templateConfig, "akka.persistence.DiscardConfigurator") + val replyToConfig = String.format(templateConfig, "akka.persistence.PersistentActorBoundedStashingSpec$ReplyToWithRejectConfigurator") + +} + +class SteppingInMemPersistentActorBoundedStashingSpec(strategyConfig: String) + extends PersistenceSpec(SteppingInmemJournal.config("persistence-bounded-stash").withFallback(PersistenceSpec + .config("stepping-inmem", "SteppingInMemPersistentActorBoundedStashingSpec", extraConfig = Some(strategyConfig)))) + with BeforeAndAfterEach + with ImplicitSender { + + override def atStartup: Unit = { + system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*Cmd.*"))) + } + + override def beforeEach(): Unit = + system.eventStream.subscribe(testActor, classOf[DeadLetter]) + + override def afterEach(): Unit = + system.eventStream.unsubscribe(testActor, classOf[DeadLetter]) + +} + +class ThrowExceptionStrategyPersistentActorBoundedStashingSpec + extends SteppingInMemPersistentActorBoundedStashingSpec(PersistentActorBoundedStashingSpec.throwConfig) { + "Stashing with ThrowOverflowExceptionStrategy in a persistence actor " should { + "throws stash overflow exception" in { + val persistentActor = namedPersistentActor[StashOverflowStrategyFromConfigPersistentActor] + awaitAssert(SteppingInmemJournal.getRef("persistence-bounded-stash"), 3.seconds) + val journal = SteppingInmemJournal.getRef("persistence-bounded-stash") + + // initial read highest + SteppingInmemJournal.step(journal) + + //barrier for stash + persistentActor ! Cmd("a") + + //internal stash overflow + 1 to (2 * capacity) foreach (persistentActor ! Cmd(_)) + //after PA stopped, all stashed messages forward to deadletters + 1 to capacity foreach (i ⇒ expectMsg(DeadLetter(Cmd(i), testActor, persistentActor))) + //non-stashed messages + (capacity + 2) to (2 * capacity) foreach (i ⇒ expectMsg(DeadLetter(Cmd(i), testActor, persistentActor))) + + } + } +} + +class DiscardStrategyPersistentActorBoundedStashingSpec + extends SteppingInMemPersistentActorBoundedStashingSpec(PersistentActorBoundedStashingSpec.discardConfig) { + "Stashing with DiscardToDeadLetterStrategy in a persistence actor " should { + "discard to deadletter" in { + val persistentActor = namedPersistentActor[StashOverflowStrategyFromConfigPersistentActor] + awaitAssert(SteppingInmemJournal.getRef("persistence-bounded-stash"), 3.seconds) + val journal = SteppingInmemJournal.getRef("persistence-bounded-stash") + + //initial read highest + SteppingInmemJournal.step(journal) + + //barrier for stash + persistentActor ! Cmd("a") + + //internal stash overflow after 10 + 1 to (2 * capacity) foreach (persistentActor ! Cmd(_)) + //so, 11 to 20 discard to deadletter + (1 + capacity) to (2 * capacity) foreach (i ⇒ expectMsg(DeadLetter(Cmd(i), testActor, persistentActor))) + //allow "a" and 1 to 10 write complete + 1 to (1 + capacity) foreach (i ⇒ SteppingInmemJournal.step(journal)) + + persistentActor ! GetState + + expectMsg("a" :: (1 to capacity).toList ::: Nil) + } + } +} + +class ReplyToStrategyPersistentActorBoundedStashingSpec + extends SteppingInMemPersistentActorBoundedStashingSpec(PersistentActorBoundedStashingSpec.replyToConfig) { + "Stashing with DiscardToDeadLetterStrategy in a persistence actor" should { + "reply to request with custom message" in { + val persistentActor = namedPersistentActor[StashOverflowStrategyFromConfigPersistentActor] + awaitAssert(SteppingInmemJournal.getRef("persistence-bounded-stash"), 3.seconds) + val journal = SteppingInmemJournal.getRef("persistence-bounded-stash") + + //initial read highest + SteppingInmemJournal.step(journal) + + //barrier for stash + persistentActor ! Cmd("a") + + //internal stash overflow after 10 + 1 to (2 * capacity) foreach (persistentActor ! Cmd(_)) + //so, 11 to 20 reply to with "Reject" String + (1 + capacity) to (2 * capacity) foreach (i ⇒ expectMsg("RejectToStash")) + //allow "a" and 1 to 10 write complete + 1 to (1 + capacity) foreach (i ⇒ SteppingInmemJournal.step(journal)) + + persistentActor ! GetState + + expectMsg("a" :: (1 to capacity).toList ::: Nil) + } + } +} \ No newline at end of file