=per #19551 support stash overflow strategy

This commit is contained in:
qian miao 2016-02-02 18:40:20 +08:00
parent 396f4370e9
commit 952d768693
8 changed files with 352 additions and 50 deletions

View file

@ -10,6 +10,10 @@
# Default persistence extension settings.
akka.persistence {
# Fully qualified class name providing a default internal stash overflow strategy.
# It needs to be a subclass of akka.persistence.StashOverflowStrategyConfigurator.
# The default strategy throws StashOverflowException.
internal-stash-overflow-strategy = "akka.persistence.ThrowExceptionConfigurator"
journal {
# Absolute path to the journal plugin configuration entry used by
# persistent actor or view by default.

View file

@ -5,14 +5,14 @@
package akka.persistence
import java.util.concurrent.atomic.AtomicInteger
import java.util.UUID
import scala.collection.immutable
import scala.util.control.NonFatal
import akka.actor.Stash
import akka.actor.StashFactory
import akka.actor.DeadLetter
import akka.actor.StashOverflowException
import akka.event.Logging
import akka.event.LoggingAdapter
import akka.actor.ActorRef
import java.util.UUID
/**
* INTERNAL API
@ -37,7 +37,7 @@ private[persistence] object Eventsourced {
* Scala API and implementation details of [[PersistentActor]], [[AbstractPersistentActor]] and
* [[UntypedPersistentActor]].
*/
private[persistence] trait Eventsourced extends Snapshotter with Stash with StashFactory with PersistenceIdentity with PersistenceRecovery {
private[persistence] trait Eventsourced extends Snapshotter with PersistenceStash with PersistenceIdentity with PersistenceRecovery {
import JournalProtocol._
import SnapshotProtocol.LoadSnapshotResult
import Eventsourced._
@ -148,6 +148,20 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
event.getClass.getName, seqNr, persistenceId, cause.getMessage)
}
private def stashInternally(currMsg: Any): Unit =
try internalStash.stash() catch {
case e: StashOverflowException
internalStashOverflowStrategy match {
case DiscardToDeadLetterStrategy
val snd = sender()
context.system.deadLetters.tell(DeadLetter(currMsg, snd, self), snd)
case ReplyToStrategy(response)
sender() ! response
case ThrowOverflowExceptionStrategy
throw e
}
}
private def startRecovery(recovery: Recovery): Unit = {
changeState(recoveryStarted(recovery.replayMax))
loadSnapshot(snapshotterId, recovery.fromSnapshot, recovery.toSequenceNr)
@ -459,7 +473,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
}
changeState(recovering(recoveryBehavior))
journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self)
case other internalStash.stash()
case other
stashInternally(other)
}
}
@ -474,7 +489,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
* All incoming messages are stashed.
*/
private def recovering(recoveryBehavior: Receive) = new State {
override def toString: String = s"replay started"
override def toString: String = "replay started"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match {
@ -496,7 +511,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
case ReplayMessagesFailure(cause)
try onRecoveryFailure(cause, event = None) finally context.stop(self)
case other
internalStash.stash()
stashInternally(other)
}
}
@ -626,7 +641,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
override def stateReceive(receive: Receive, message: Any) =
if (common.isDefinedAt(message)) common(message)
else internalStash.stash()
else stashInternally(message)
override def onWriteMessageComplete(err: Boolean): Unit = {
pendingInvocations.pop() match {

View file

@ -9,6 +9,7 @@ import java.util.function.Consumer
import akka.actor._
import akka.event.{ Logging, LoggingAdapter }
import akka.persistence.journal.{ EventAdapters, IdentityEventAdapters }
import akka.util.Collections.EmptyImmutableSeq
import akka.util.Helpers.ConfigOps
import com.typesafe.config.Config
import scala.annotation.tailrec
@ -108,6 +109,15 @@ trait PersistenceRecovery {
//#persistence-recovery
}
trait PersistenceStash extends Stash with StashFactory {
/**
* The returned [[StashOverflowStrategy]] object determines how to handle the message failed to stash
* when the internal Stash capacity exceeded.
*/
def internalStashOverflowStrategy: StashOverflowStrategy =
Persistence(context.system).defaultInternalStashOverflowStrategy
}
/**
* Persistence extension provider.
*/
@ -129,7 +139,6 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
private def log: LoggingAdapter = Logging(system, getClass.getName)
private val DefaultPluginDispatcherId = "akka.persistence.dispatchers.default-plugin-dispatcher"
private val NoSnapshotStorePluginId = "akka.persistence.no-snapshot-store"
private val config = system.settings.config.getConfig("akka.persistence")
@ -153,16 +162,19 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
} else configPath
}
// Lazy, so user is not forced to configure defaults when she is not using them.
lazy val defaultInternalStashOverflowStrategy: StashOverflowStrategy =
system.dynamicAccess.createInstanceFor[StashOverflowStrategyConfigurator](config.getString(
"internal-stash-overflow-strategy"), EmptyImmutableSeq)
.map(_.create(system.settings.config)).get
val settings = new PersistenceSettings(config)
/** Check for default or missing identity. */
private def isEmpty(text: String) = text == null || text.length == 0
/** Discovered persistence journal and snapshot store plugins. */
private val journalPluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty)
/** Discovered persistence snapshot store plugins. */
private val snapshotPluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty)
private val pluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty)
private val journalFallbackConfigPath = "akka.persistence.journal-plugin-fallback"
private val snapshotStoreFallbackConfigPath = "akka.persistence.snapshot-store-plugin-fallback"
@ -195,7 +207,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
* Looks up [[akka.persistence.journal.EventAdapters]] by journal plugin's ActorRef.
*/
private[akka] final def adaptersFor(journalPluginActor: ActorRef): EventAdapters = {
journalPluginExtensionId.get().values collectFirst {
pluginExtensionId.get().values collectFirst {
case ext if ext(system).actor == journalPluginActor ext(system).adapters
} match {
case Some(adapters) adapters
@ -219,7 +231,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
* Looks up the plugin config by plugin's ActorRef.
*/
private[akka] final def configFor(journalPluginActor: ActorRef): Config =
journalPluginExtensionId.get().values.collectFirst {
pluginExtensionId.get().values.collectFirst {
case ext if ext(system).actor == journalPluginActor ext(system).config
} match {
case Some(conf) conf
@ -252,13 +264,13 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
}
@tailrec private def pluginHolderFor(configPath: String, fallbackPath: String): PluginHolder = {
val extensionIdMap = journalPluginExtensionId.get
val extensionIdMap = pluginExtensionId.get
extensionIdMap.get(configPath) match {
case Some(extensionId)
extensionId(system)
case None
val extensionId = new PluginHolderExtensionId(configPath, fallbackPath)
journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
pluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
pluginHolderFor(configPath, fallbackPath) // Recursive invocation.
}
}

View file

@ -4,10 +4,10 @@
package akka.persistence
import java.lang.{ Iterable JIterable }
import akka.actor.UntypedActor
import akka.actor._
import akka.japi.Procedure
import akka.actor.AbstractActor
import akka.japi.Util
import com.typesafe.config.Config
abstract class RecoveryCompleted
/**
@ -98,6 +98,58 @@ object Recovery {
val none: Recovery = Recovery(toSequenceNr = 0L)
}
/**
* This defines how to handle the current received message which failed to stash, when the size of
* Stash exceeding the capacity of Stash.
*/
sealed trait StashOverflowStrategy
/**
* Discard the message to [[DeadLetter]].
*/
case object DiscardToDeadLetterStrategy extends StashOverflowStrategy {
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
/**
* Throw [[StashOverflowException]], hence the persistent actor will starting recovery
* if guarded by default supervisor strategy.
* Be carefully if used together with persist/persistAll or has many messages needed
* to replay.
*/
case object ThrowOverflowExceptionStrategy extends StashOverflowStrategy {
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
/**
* Reply to sender with predefined response, and discard the received message silently.
* @param response the message replying to sender with
*/
final case class ReplyToStrategy(response: Any) extends StashOverflowStrategy
/**
* Implement this interface in order to configure the stashOverflowStrategy for
* the internal stash of persistent actor.
* An instance of this class must be instantiable using a no-arg constructor.
*/
trait StashOverflowStrategyConfigurator {
def create(config: Config): StashOverflowStrategy
}
final class ThrowExceptionConfigurator extends StashOverflowStrategyConfigurator {
override def create(config: Config) = ThrowOverflowExceptionStrategy
}
final class DiscardConfigurator extends StashOverflowStrategyConfigurator {
override def create(config: Config) = DiscardToDeadLetterStrategy
}
/**
* An persistent Actor - can be used to implement command or event sourcing.
*/

View file

@ -0,0 +1,156 @@
/**
* Copyright (C) 2009-2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import akka.actor.DeadLetter
import akka.persistence.PersistentActorBoundedStashingSpec._
import akka.persistence.journal.SteppingInmemJournal
import akka.testkit.TestEvent.Mute
import akka.testkit.EventFilter
import akka.testkit.ImplicitSender
import com.typesafe.config.Config
import org.scalatest.BeforeAndAfterEach
import scala.concurrent.duration._
object PersistentActorBoundedStashingSpec {
final case class Cmd(data: Any)
final case class Evt(data: Any)
class ReplyToWithRejectConfigurator extends StashOverflowStrategyConfigurator {
override def create(config: Config): StashOverflowStrategy = ReplyToStrategy("RejectToStash")
}
class StashOverflowStrategyFromConfigPersistentActor(name: String) extends NamedPersistentActor(name) {
var events: List[Any] = Nil
val updateState: Receive = {
case Evt(data) events = data :: events
}
val commonBehavior: Receive = {
case GetState sender() ! events.reverse
}
def receiveRecover = updateState
override def receiveCommand: Receive = commonBehavior orElse {
case Cmd(x: Any) persist(Evt(x))(updateState)
}
}
val capacity = 10
val templateConfig =
s"""
|akka.actor.default-mailbox.stash-capacity=$capacity
|akka.actor.guardian-supervisor-strategy="akka.actor.StoppingSupervisorStrategy"
|akka.persistence.internal-stash-overflow-strategy = "%s"
|""".stripMargin
val throwConfig = String.format(templateConfig, "akka.persistence.ThrowExceptionConfigurator")
val discardConfig = String.format(templateConfig, "akka.persistence.DiscardConfigurator")
val replyToConfig = String.format(templateConfig, "akka.persistence.PersistentActorBoundedStashingSpec$ReplyToWithRejectConfigurator")
}
class SteppingInMemPersistentActorBoundedStashingSpec(strategyConfig: String)
extends PersistenceSpec(SteppingInmemJournal.config("persistence-bounded-stash").withFallback(PersistenceSpec
.config("stepping-inmem", "SteppingInMemPersistentActorBoundedStashingSpec", extraConfig = Some(strategyConfig))))
with BeforeAndAfterEach
with ImplicitSender {
override def atStartup: Unit = {
system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*Cmd.*")))
}
override def beforeEach(): Unit =
system.eventStream.subscribe(testActor, classOf[DeadLetter])
override def afterEach(): Unit =
system.eventStream.unsubscribe(testActor, classOf[DeadLetter])
}
class ThrowExceptionStrategyPersistentActorBoundedStashingSpec
extends SteppingInMemPersistentActorBoundedStashingSpec(PersistentActorBoundedStashingSpec.throwConfig) {
"Stashing with ThrowOverflowExceptionStrategy in a persistence actor " should {
"throws stash overflow exception" in {
val persistentActor = namedPersistentActor[StashOverflowStrategyFromConfigPersistentActor]
awaitAssert(SteppingInmemJournal.getRef("persistence-bounded-stash"), 3.seconds)
val journal = SteppingInmemJournal.getRef("persistence-bounded-stash")
// initial read highest
SteppingInmemJournal.step(journal)
//barrier for stash
persistentActor ! Cmd("a")
//internal stash overflow
1 to (2 * capacity) foreach (persistentActor ! Cmd(_))
//after PA stopped, all stashed messages forward to deadletters
1 to capacity foreach (i expectMsg(DeadLetter(Cmd(i), testActor, persistentActor)))
//non-stashed messages
(capacity + 2) to (2 * capacity) foreach (i expectMsg(DeadLetter(Cmd(i), testActor, persistentActor)))
}
}
}
class DiscardStrategyPersistentActorBoundedStashingSpec
extends SteppingInMemPersistentActorBoundedStashingSpec(PersistentActorBoundedStashingSpec.discardConfig) {
"Stashing with DiscardToDeadLetterStrategy in a persistence actor " should {
"discard to deadletter" in {
val persistentActor = namedPersistentActor[StashOverflowStrategyFromConfigPersistentActor]
awaitAssert(SteppingInmemJournal.getRef("persistence-bounded-stash"), 3.seconds)
val journal = SteppingInmemJournal.getRef("persistence-bounded-stash")
//initial read highest
SteppingInmemJournal.step(journal)
//barrier for stash
persistentActor ! Cmd("a")
//internal stash overflow after 10
1 to (2 * capacity) foreach (persistentActor ! Cmd(_))
//so, 11 to 20 discard to deadletter
(1 + capacity) to (2 * capacity) foreach (i expectMsg(DeadLetter(Cmd(i), testActor, persistentActor)))
//allow "a" and 1 to 10 write complete
1 to (1 + capacity) foreach (i SteppingInmemJournal.step(journal))
persistentActor ! GetState
expectMsg("a" :: (1 to capacity).toList ::: Nil)
}
}
}
class ReplyToStrategyPersistentActorBoundedStashingSpec
extends SteppingInMemPersistentActorBoundedStashingSpec(PersistentActorBoundedStashingSpec.replyToConfig) {
"Stashing with DiscardToDeadLetterStrategy in a persistence actor" should {
"reply to request with custom message" in {
val persistentActor = namedPersistentActor[StashOverflowStrategyFromConfigPersistentActor]
awaitAssert(SteppingInmemJournal.getRef("persistence-bounded-stash"), 3.seconds)
val journal = SteppingInmemJournal.getRef("persistence-bounded-stash")
//initial read highest
SteppingInmemJournal.step(journal)
//barrier for stash
persistentActor ! Cmd("a")
//internal stash overflow after 10
1 to (2 * capacity) foreach (persistentActor ! Cmd(_))
//so, 11 to 20 reply to with "Reject" String
(1 + capacity) to (2 * capacity) foreach (i expectMsg("RejectToStash"))
//allow "a" and 1 to 10 write complete
1 to (1 + capacity) foreach (i SteppingInmemJournal.step(journal))
persistentActor ! GetState
expectMsg("a" :: (1 to capacity).toList ::: Nil)
}
}
}