=per #19828 pop the internal stash when the writing was finished in the processing commands state

This commit is contained in:
qian miao 2016-03-18 12:11:43 +08:00
parent a2c46999a8
commit aa8742738c
4 changed files with 123 additions and 103 deletions

View file

@ -4,52 +4,3 @@
# This is the reference config file that contains all the default settings.
# Make your edits/overrides in your application.conf.
# //#sharding-ext-config
# Settings for the ClusterShardingExtension
akka.contrib.cluster.sharding {
# The extension creates a top level actor with this name in top level user scope,
# e.g. '/user/sharding'
guardian-name = sharding
# If the coordinator can't store state changes it will be stopped
# and started again after this duration.
coordinator-failure-backoff = 10 s
# Start the coordinator singleton manager on members tagged with this role.
# All members are used if undefined or empty.
# ShardRegion actor is started in proxy only mode on nodes that are not tagged
# with this role.
role = ""
# The ShardRegion retries registration and shard location requests to the
# ShardCoordinator with this interval if it does not reply.
retry-interval = 2 s
# Maximum number of messages that are buffered by a ShardRegion actor.
buffer-size = 100000
# Timeout of the shard rebalancing process.
handoff-timeout = 60 s
# Time given to a region to acknowdge it's hosting a shard.
shard-start-timeout = 10 s
# If the shard can't store state changes it will retry the action
# again after this duration. Any messages sent to an affected entry
# will be buffered until the state change is processed
shard-failure-backoff = 10 s
# If the shard is remembering entries and an entry stops itself without
# using passivate. The entry will be restarted after this duration or when
# the next message for it is received, which ever occurs first.
entry-restart-backoff = 10 s
# Rebalance check is performed periodically with this interval.
rebalance-interval = 10 s
# How often the coordinator saves persistent snapshots, which are
# used to reduce recovery times
snapshot-interval = 3600 s
# Setting for the default shard allocation strategy
least-shard-allocation-strategy {
# Threshold of how large the difference between most and least number of
# allocated shards must be to begin the rebalancing.
rebalance-threshold = 10
# The number of ongoing rebalancing processes is limited to this number.
max-simultaneous-rebalance = 3
}
}
# //#sharding-ext-config

View file

@ -163,6 +163,9 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
}
}
private def unstashInternally(all: Boolean): Unit =
if (all) internalStash.unstashAll() else internalStash.unstash()
private def startRecovery(recovery: Recovery): Unit = {
changeState(recoveryStarted(recovery.replayMax))
loadSnapshot(snapshotterId, recovery.fromSnapshot, recovery.toSequenceNr)
@ -538,6 +541,8 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
* Common receive handler for processingCommands and persistingEvents
*/
private abstract class ProcessingState extends State {
override def recoveryRunning: Boolean = false
val common: Receive = {
case WriteMessageSuccess(p, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
@ -582,8 +587,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
() // it will be stopped by the first WriteMessageFailure message
}
def onWriteMessageComplete(err: Boolean): Unit =
pendingInvocations.pop()
def onWriteMessageComplete(err: Boolean): Unit
}
/**
@ -592,7 +596,6 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
*/
private val processingCommands: State = new ProcessingState {
override def toString: String = "processing commands"
override def recoveryRunning: Boolean = false
override def stateReceive(receive: Receive, message: Any) =
if (common.isDefinedAt(message)) common(message)
@ -604,12 +607,13 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
private def aroundReceiveComplete(err: Boolean): Unit = {
if (eventBatch.nonEmpty) flushBatch()
if (pendingStashingPersistInvocations > 0)
changeState(persistingEvents)
else if (err)
internalStash.unstashAll()
else
internalStash.unstash()
if (pendingStashingPersistInvocations > 0) changeState(persistingEvents)
else unstashInternally(all = err)
}
override def onWriteMessageComplete(err: Boolean): Unit = {
pendingInvocations.pop()
unstashInternally(all = err)
}
}
@ -620,7 +624,6 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
*/
private val persistingEvents: State = new ProcessingState {
override def toString: String = "persisting events"
override def recoveryRunning: Boolean = false
override def stateReceive(receive: Receive, message: Any) =
if (common.isDefinedAt(message)) common(message)
@ -638,8 +641,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
if (pendingStashingPersistInvocations == 0) {
changeState(processingCommands)
if (err) internalStash.unstashAll()
else internalStash.unstash()
unstashInternally(all = err)
}
}

View file

@ -10,12 +10,13 @@ import akka.testkit.ImplicitSender
import com.typesafe.config.Config
import scala.concurrent.duration._
import scala.reflect.ClassTag
object PersistentActorStashingSpec {
final case class Cmd(data: Any)
final case class Evt(data: Any)
abstract class ExamplePersistentActor(name: String) extends NamedPersistentActor(name) {
abstract class StashExamplePersistentActor(name: String) extends NamedPersistentActor(name) {
var events: List[Any] = Nil
var askedForDelete: Option[ActorRef] = None
@ -28,22 +29,35 @@ object PersistentActorStashingSpec {
case "boom" throw new TestException("boom")
case GetState sender() ! events.reverse
}
def unstashBehavior: Receive
def receiveRecover = updateState
}
class UserStashPersistentActor(name: String) extends ExamplePersistentActor(name) {
class UserStashPersistentActor(name: String) extends StashExamplePersistentActor(name) {
var stashed = false
val receiveCommand: Receive = {
case Cmd("a") if (!stashed) { stash(); stashed = true } else sender() ! "a"
case Cmd("b") persist(Evt("b"))(evt sender() ! evt.data)
case Cmd("c") unstashAll(); sender() ! "c"
val receiveCommand: Receive = unstashBehavior orElse {
case Cmd("a") if !stashed stash(); stashed = true
case Cmd("a") sender() ! "a"
case Cmd("b") persist(Evt("b"))(evt sender() ! evt.data)
}
def unstashBehavior: Receive = {
case Cmd("c") unstashAll(); sender () ! "c"
}
}
class UserStashWithinHandlerPersistentActor(name: String) extends UserStashPersistentActor(name: String) {
override def unstashBehavior: Receive = {
case Cmd("c") persist(Evt("c")) { evt sender() ! evt.data; unstashAll() }
}
}
class UserStashManyPersistentActor(name: String) extends ExamplePersistentActor(name) {
class UserStashManyPersistentActor(name: String) extends StashExamplePersistentActor(name) {
val receiveCommand: Receive = commonBehavior orElse {
case Cmd("a") persist(Evt("a")) { evt
case Cmd("a") persist(Evt("a")) { evt
updateState(evt)
context.become(processC)
}
@ -51,60 +65,87 @@ object PersistentActorStashingSpec {
case Cmd("b-2") persist(Evt("b-2"))(updateState)
}
val processC: Receive = {
val processC: Receive = unstashBehavior orElse {
case other stash()
}
def unstashBehavior: Receive = {
case Cmd("c")
persist(Evt("c")) { evt updateState(evt); context.unbecome() }
unstashAll()
}
}
class UserStashWithinHandlerManyPersistentActor(name: String) extends UserStashManyPersistentActor(name) {
override def unstashBehavior: Receive = {
case Cmd("c") persist(Evt("c")) { evt updateState(evt); context.unbecome(); unstashAll() }
}
}
class UserStashFailurePersistentActor(name: String) extends StashExamplePersistentActor(name) {
val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data)
if (data == "b-2") throw new TestException("boom")
persist(Evt(data)) { evt
updateState(evt)
if (data == "a") context.become(otherCommandHandler)
}
}
val otherCommandHandler: Receive = unstashBehavior orElse {
case other stash()
}
def unstashBehavior: Receive = {
case Cmd("c")
persist(Evt("c")) { evt
updateState(evt)
context.unbecome()
}
unstashAll()
case other stash()
}
}
class UserStashFailurePersistentActor(name: String) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data)
if (data == "b-2") throw new TestException("boom")
persist(Evt(data)) { event
updateState(event)
if (data == "a") context.become(otherCommandHandler)
}
}
val otherCommandHandler: Receive = {
class UserStashWithinHandlerFailureCallbackPersistentActor(name: String) extends UserStashFailurePersistentActor(name) {
override def unstashBehavior: Receive = {
case Cmd("c")
persist(Evt("c")) { event
updateState(event)
persist(Evt("c")) { evt
updateState(evt)
context.unbecome()
unstashAll()
}
unstashAll()
case other stash()
}
}
class AsyncStashingPersistentActor(name: String) extends ExamplePersistentActor(name) {
class AsyncStashingPersistentActor(name: String) extends StashExamplePersistentActor(name) {
var stashed = false
val receiveCommand: Receive = commonBehavior orElse {
case Cmd("a") persistAsync(Evt("a"))(updateState)
case Cmd("b") if !stashed
stash(); stashed = true
case Cmd("b") persistAsync(Evt("b"))(updateState)
val receiveCommand: Receive = commonBehavior orElse unstashBehavior orElse {
case Cmd("a") persistAsync(Evt("a"))(updateState)
case Cmd("b") if !stashed stash(); stashed = true
case Cmd("b") persistAsync(Evt("b"))(updateState)
}
override def unstashBehavior: Receive = {
case Cmd("c") persistAsync(Evt("c"))(updateState); unstashAll()
}
}
class AsyncStashingWithinHandlerPersistentActor(name: String) extends AsyncStashingPersistentActor(name) {
override def unstashBehavior: Receive = {
case Cmd("c") persistAsync(Evt("c")) { evt updateState(evt); unstashAll() }
}
}
}
abstract class PersistentActorStashingSpec(config: Config) extends PersistenceSpec(config)
with ImplicitSender {
import PersistentActorStashingSpec._
"Stashing in a persistent actor" must {
def stash[T <: NamedPersistentActor : ClassTag](): Unit = {
"support user stash operations" in {
val persistentActor = namedPersistentActor[UserStashPersistentActor]
val persistentActor = namedPersistentActor[T]
persistentActor ! Cmd("a")
persistentActor ! Cmd("b")
persistentActor ! Cmd("c")
@ -112,9 +153,11 @@ abstract class PersistentActorStashingSpec(config: Config) extends PersistenceSp
expectMsg("c")
expectMsg("a")
}
}
def stashWithSeveralMessages[T <: NamedPersistentActor : ClassTag](): Unit = {
"support user stash operations with several stashed messages" in {
val persistentActor = namedPersistentActor[UserStashManyPersistentActor]
val persistentActor = namedPersistentActor[T]
val n = 10
val cmds = 1 to n flatMap (_ List(Cmd("a"), Cmd("b-1"), Cmd("b-2"), Cmd("c")))
val evts = 1 to n flatMap (_ List("a", "c", "b-1", "b-2"))
@ -123,9 +166,11 @@ abstract class PersistentActorStashingSpec(config: Config) extends PersistenceSp
persistentActor ! GetState
expectMsg(evts)
}
}
def stashUnderFailures[T <: NamedPersistentActor : ClassTag](): Unit = {
"support user stash operations under failures" in {
val persistentActor = namedPersistentActor[UserStashFailurePersistentActor]
val persistentActor = namedPersistentActor[T]
val bs = 1 to 10 map ("b-" + _)
persistentActor ! Cmd("a")
bs foreach (persistentActor ! Cmd(_))
@ -134,18 +179,29 @@ abstract class PersistentActorStashingSpec(config: Config) extends PersistenceSp
expectMsg(List("a", "c") ++ bs.filter(_ != "b-2"))
}
}
"Stashing in a persistent actor" must {
behave like stash[UserStashPersistentActor]()
behave like stashWithSeveralMessages[UserStashManyPersistentActor]()
behave like stashUnderFailures[UserStashFailurePersistentActor]()
}
"Stashing(unstashAll called in handler) in a persistent actor" must {
behave like stash[UserStashWithinHandlerPersistentActor]()
behave like stashWithSeveralMessages[UserStashWithinHandlerManyPersistentActor]()
behave like stashUnderFailures[UserStashWithinHandlerFailureCallbackPersistentActor]()
}
}
class SteppingInMemPersistentActorStashingSpec extends PersistenceSpec(
SteppingInmemJournal.config("persistence-stash").withFallback(PersistenceSpec.config("stepping-inmem", "SteppingInMemPersistentActorStashingSpec")))
with ImplicitSender {
import PersistentActorStashingSpec._
"Stashing in a persistent actor mixed with persistAsync" should {
def stash[T <: NamedPersistentActor : ClassTag](): Unit = {
"handle async callback not happening until next message has been stashed" in {
val persistentActor = namedPersistentActor[AsyncStashingPersistentActor]
val persistentActor = namedPersistentActor[T]
awaitAssert(SteppingInmemJournal.getRef("persistence-stash"), 3.seconds)
val journal = SteppingInmemJournal.getRef("persistence-stash")
@ -170,7 +226,14 @@ class SteppingInMemPersistentActorStashingSpec extends PersistenceSpec(
}
}
}
}
"Stashing in a persistent actor mixed with persistAsync" must {
behave like stash[AsyncStashingPersistentActor]()
}
"Stashing(unstashAll called in handler) in a persistent actor mixed with persistAsync" must {
behave like stash[AsyncStashingWithinHandlerPersistentActor]()
}
}

View file

@ -731,7 +731,11 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.Dispatch.initWithFailure"),
// #19877 Source.queue termination support
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.SourceQueueAdapter.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.SourceQueueAdapter.this"),
// #19828
ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.persistence.Eventsourced#ProcessingState.onWriteMessageComplete"),
ProblemFilters.exclude[ReversedAbstractMethodProblem]("akka.persistence.Eventsourced#ProcessingState.onWriteMessageComplete")
)
)
}