From aa8742738c0558f40135170d414cc1983307f6dc Mon Sep 17 00:00:00 2001 From: qian miao Date: Fri, 18 Mar 2016 12:11:43 +0800 Subject: [PATCH] =per #19828 pop the internal stash when the writing was finished in the processing commands state --- .../src/main/resources/reference.conf | 49 ------ .../scala/akka/persistence/Eventsourced.scala | 26 ++-- .../PersistentActorStashingSpec.scala | 145 +++++++++++++----- project/MiMa.scala | 6 +- 4 files changed, 123 insertions(+), 103 deletions(-) diff --git a/akka-contrib/src/main/resources/reference.conf b/akka-contrib/src/main/resources/reference.conf index a32843c2a4..0f53f264ac 100644 --- a/akka-contrib/src/main/resources/reference.conf +++ b/akka-contrib/src/main/resources/reference.conf @@ -4,52 +4,3 @@ # This is the reference config file that contains all the default settings. # Make your edits/overrides in your application.conf. - - - -# //#sharding-ext-config -# Settings for the ClusterShardingExtension -akka.contrib.cluster.sharding { - # The extension creates a top level actor with this name in top level user scope, - # e.g. '/user/sharding' - guardian-name = sharding - # If the coordinator can't store state changes it will be stopped - # and started again after this duration. - coordinator-failure-backoff = 10 s - # Start the coordinator singleton manager on members tagged with this role. - # All members are used if undefined or empty. - # ShardRegion actor is started in proxy only mode on nodes that are not tagged - # with this role. - role = "" - # The ShardRegion retries registration and shard location requests to the - # ShardCoordinator with this interval if it does not reply. - retry-interval = 2 s - # Maximum number of messages that are buffered by a ShardRegion actor. - buffer-size = 100000 - # Timeout of the shard rebalancing process. - handoff-timeout = 60 s - # Time given to a region to acknowdge it's hosting a shard. - shard-start-timeout = 10 s - # If the shard can't store state changes it will retry the action - # again after this duration. Any messages sent to an affected entry - # will be buffered until the state change is processed - shard-failure-backoff = 10 s - # If the shard is remembering entries and an entry stops itself without - # using passivate. The entry will be restarted after this duration or when - # the next message for it is received, which ever occurs first. - entry-restart-backoff = 10 s - # Rebalance check is performed periodically with this interval. - rebalance-interval = 10 s - # How often the coordinator saves persistent snapshots, which are - # used to reduce recovery times - snapshot-interval = 3600 s - # Setting for the default shard allocation strategy - least-shard-allocation-strategy { - # Threshold of how large the difference between most and least number of - # allocated shards must be to begin the rebalancing. - rebalance-threshold = 10 - # The number of ongoing rebalancing processes is limited to this number. - max-simultaneous-rebalance = 3 - } -} -# //#sharding-ext-config diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 84c185000d..1ef1ed4b54 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -163,6 +163,9 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas } } + private def unstashInternally(all: Boolean): Unit = + if (all) internalStash.unstashAll() else internalStash.unstash() + private def startRecovery(recovery: Recovery): Unit = { changeState(recoveryStarted(recovery.replayMax)) loadSnapshot(snapshotterId, recovery.fromSnapshot, recovery.toSequenceNr) @@ -538,6 +541,8 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas * Common receive handler for processingCommands and persistingEvents */ private abstract class ProcessingState extends State { + override def recoveryRunning: Boolean = false + val common: Receive = { case WriteMessageSuccess(p, id) ⇒ // instanceId mismatch can happen for persistAsync and defer in case of actor restart @@ -582,8 +587,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas () // it will be stopped by the first WriteMessageFailure message } - def onWriteMessageComplete(err: Boolean): Unit = - pendingInvocations.pop() + def onWriteMessageComplete(err: Boolean): Unit } /** @@ -592,7 +596,6 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas */ private val processingCommands: State = new ProcessingState { override def toString: String = "processing commands" - override def recoveryRunning: Boolean = false override def stateReceive(receive: Receive, message: Any) = if (common.isDefinedAt(message)) common(message) @@ -604,12 +607,13 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas private def aroundReceiveComplete(err: Boolean): Unit = { if (eventBatch.nonEmpty) flushBatch() - if (pendingStashingPersistInvocations > 0) - changeState(persistingEvents) - else if (err) - internalStash.unstashAll() - else - internalStash.unstash() + if (pendingStashingPersistInvocations > 0) changeState(persistingEvents) + else unstashInternally(all = err) + } + + override def onWriteMessageComplete(err: Boolean): Unit = { + pendingInvocations.pop() + unstashInternally(all = err) } } @@ -620,7 +624,6 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas */ private val persistingEvents: State = new ProcessingState { override def toString: String = "persisting events" - override def recoveryRunning: Boolean = false override def stateReceive(receive: Receive, message: Any) = if (common.isDefinedAt(message)) common(message) @@ -638,8 +641,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas if (pendingStashingPersistInvocations == 0) { changeState(processingCommands) - if (err) internalStash.unstashAll() - else internalStash.unstash() + unstashInternally(all = err) } } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorStashingSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorStashingSpec.scala index 7bdd8ef266..464bb01536 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorStashingSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorStashingSpec.scala @@ -10,12 +10,13 @@ import akka.testkit.ImplicitSender import com.typesafe.config.Config import scala.concurrent.duration._ +import scala.reflect.ClassTag object PersistentActorStashingSpec { final case class Cmd(data: Any) final case class Evt(data: Any) - abstract class ExamplePersistentActor(name: String) extends NamedPersistentActor(name) { + abstract class StashExamplePersistentActor(name: String) extends NamedPersistentActor(name) { var events: List[Any] = Nil var askedForDelete: Option[ActorRef] = None @@ -28,22 +29,35 @@ object PersistentActorStashingSpec { case "boom" ⇒ throw new TestException("boom") case GetState ⇒ sender() ! events.reverse } + + def unstashBehavior: Receive def receiveRecover = updateState } - class UserStashPersistentActor(name: String) extends ExamplePersistentActor(name) { + class UserStashPersistentActor(name: String) extends StashExamplePersistentActor(name) { var stashed = false - val receiveCommand: Receive = { - case Cmd("a") ⇒ if (!stashed) { stash(); stashed = true } else sender() ! "a" - case Cmd("b") ⇒ persist(Evt("b"))(evt ⇒ sender() ! evt.data) - case Cmd("c") ⇒ unstashAll(); sender() ! "c" + + val receiveCommand: Receive = unstashBehavior orElse { + case Cmd("a") if !stashed ⇒ stash(); stashed = true + case Cmd("a") ⇒ sender() ! "a" + case Cmd("b") ⇒ persist(Evt("b"))(evt ⇒ sender() ! evt.data) + } + + def unstashBehavior: Receive = { + case Cmd("c") ⇒ unstashAll(); sender () ! "c" + } + } + + class UserStashWithinHandlerPersistentActor(name: String) extends UserStashPersistentActor(name: String) { + override def unstashBehavior: Receive = { + case Cmd("c") ⇒ persist(Evt("c")) { evt ⇒ sender() ! evt.data; unstashAll() } } } - class UserStashManyPersistentActor(name: String) extends ExamplePersistentActor(name) { + class UserStashManyPersistentActor(name: String) extends StashExamplePersistentActor(name) { val receiveCommand: Receive = commonBehavior orElse { - case Cmd("a") ⇒ persist(Evt("a")) { evt ⇒ + case Cmd("a") ⇒ persist(Evt("a")) { evt ⇒ updateState(evt) context.become(processC) } @@ -51,60 +65,87 @@ object PersistentActorStashingSpec { case Cmd("b-2") ⇒ persist(Evt("b-2"))(updateState) } - val processC: Receive = { + val processC: Receive = unstashBehavior orElse { + case other ⇒ stash() + } + + def unstashBehavior: Receive = { + case Cmd("c") ⇒ + persist(Evt("c")) { evt ⇒ updateState(evt); context.unbecome() } + unstashAll() + } + } + + class UserStashWithinHandlerManyPersistentActor(name: String) extends UserStashManyPersistentActor(name) { + override def unstashBehavior: Receive = { + case Cmd("c") ⇒ persist(Evt("c")) { evt ⇒ updateState(evt); context.unbecome(); unstashAll() } + } + } + + class UserStashFailurePersistentActor(name: String) extends StashExamplePersistentActor(name) { + val receiveCommand: Receive = commonBehavior orElse { + case Cmd(data) ⇒ + if (data == "b-2") throw new TestException("boom") + persist(Evt(data)) { evt ⇒ + updateState(evt) + if (data == "a") context.become(otherCommandHandler) + } + } + + val otherCommandHandler: Receive = unstashBehavior orElse { + case other ⇒ stash() + } + + def unstashBehavior: Receive = { case Cmd("c") ⇒ persist(Evt("c")) { evt ⇒ updateState(evt) context.unbecome() } unstashAll() - case other ⇒ stash() } } - class UserStashFailurePersistentActor(name: String) extends ExamplePersistentActor(name) { - val receiveCommand: Receive = commonBehavior orElse { - case Cmd(data) ⇒ - if (data == "b-2") throw new TestException("boom") - persist(Evt(data)) { event ⇒ - updateState(event) - if (data == "a") context.become(otherCommandHandler) - } - } - - val otherCommandHandler: Receive = { + class UserStashWithinHandlerFailureCallbackPersistentActor(name: String) extends UserStashFailurePersistentActor(name) { + override def unstashBehavior: Receive = { case Cmd("c") ⇒ - persist(Evt("c")) { event ⇒ - updateState(event) + persist(Evt("c")) { evt ⇒ + updateState(evt) context.unbecome() + unstashAll() } - unstashAll() - case other ⇒ stash() } } - class AsyncStashingPersistentActor(name: String) extends ExamplePersistentActor(name) { + class AsyncStashingPersistentActor(name: String) extends StashExamplePersistentActor(name) { var stashed = false - val receiveCommand: Receive = commonBehavior orElse { - case Cmd("a") ⇒ persistAsync(Evt("a"))(updateState) - case Cmd("b") if !stashed ⇒ - stash(); stashed = true - case Cmd("b") ⇒ persistAsync(Evt("b"))(updateState) + + val receiveCommand: Receive = commonBehavior orElse unstashBehavior orElse { + case Cmd("a") ⇒ persistAsync(Evt("a"))(updateState) + case Cmd("b") if !stashed ⇒ stash(); stashed = true + case Cmd("b") ⇒ persistAsync(Evt("b"))(updateState) + } + + override def unstashBehavior: Receive = { case Cmd("c") ⇒ persistAsync(Evt("c"))(updateState); unstashAll() } } + + class AsyncStashingWithinHandlerPersistentActor(name: String) extends AsyncStashingPersistentActor(name) { + override def unstashBehavior: Receive = { + case Cmd("c") ⇒ persistAsync(Evt("c")) { evt ⇒ updateState(evt); unstashAll() } + } + } } abstract class PersistentActorStashingSpec(config: Config) extends PersistenceSpec(config) with ImplicitSender { - import PersistentActorStashingSpec._ - "Stashing in a persistent actor" must { - + def stash[T <: NamedPersistentActor : ClassTag](): Unit = { "support user stash operations" in { - val persistentActor = namedPersistentActor[UserStashPersistentActor] + val persistentActor = namedPersistentActor[T] persistentActor ! Cmd("a") persistentActor ! Cmd("b") persistentActor ! Cmd("c") @@ -112,9 +153,11 @@ abstract class PersistentActorStashingSpec(config: Config) extends PersistenceSp expectMsg("c") expectMsg("a") } + } + def stashWithSeveralMessages[T <: NamedPersistentActor : ClassTag](): Unit = { "support user stash operations with several stashed messages" in { - val persistentActor = namedPersistentActor[UserStashManyPersistentActor] + val persistentActor = namedPersistentActor[T] val n = 10 val cmds = 1 to n flatMap (_ ⇒ List(Cmd("a"), Cmd("b-1"), Cmd("b-2"), Cmd("c"))) val evts = 1 to n flatMap (_ ⇒ List("a", "c", "b-1", "b-2")) @@ -123,9 +166,11 @@ abstract class PersistentActorStashingSpec(config: Config) extends PersistenceSp persistentActor ! GetState expectMsg(evts) } + } + def stashUnderFailures[T <: NamedPersistentActor : ClassTag](): Unit = { "support user stash operations under failures" in { - val persistentActor = namedPersistentActor[UserStashFailurePersistentActor] + val persistentActor = namedPersistentActor[T] val bs = 1 to 10 map ("b-" + _) persistentActor ! Cmd("a") bs foreach (persistentActor ! Cmd(_)) @@ -134,18 +179,29 @@ abstract class PersistentActorStashingSpec(config: Config) extends PersistenceSp expectMsg(List("a", "c") ++ bs.filter(_ != "b-2")) } } + + "Stashing in a persistent actor" must { + behave like stash[UserStashPersistentActor]() + behave like stashWithSeveralMessages[UserStashManyPersistentActor]() + behave like stashUnderFailures[UserStashFailurePersistentActor]() + } + + "Stashing(unstashAll called in handler) in a persistent actor" must { + behave like stash[UserStashWithinHandlerPersistentActor]() + behave like stashWithSeveralMessages[UserStashWithinHandlerManyPersistentActor]() + behave like stashUnderFailures[UserStashWithinHandlerFailureCallbackPersistentActor]() + } + } class SteppingInMemPersistentActorStashingSpec extends PersistenceSpec( SteppingInmemJournal.config("persistence-stash").withFallback(PersistenceSpec.config("stepping-inmem", "SteppingInMemPersistentActorStashingSpec"))) with ImplicitSender { - import PersistentActorStashingSpec._ - "Stashing in a persistent actor mixed with persistAsync" should { - + def stash[T <: NamedPersistentActor : ClassTag](): Unit = { "handle async callback not happening until next message has been stashed" in { - val persistentActor = namedPersistentActor[AsyncStashingPersistentActor] + val persistentActor = namedPersistentActor[T] awaitAssert(SteppingInmemJournal.getRef("persistence-stash"), 3.seconds) val journal = SteppingInmemJournal.getRef("persistence-stash") @@ -170,7 +226,14 @@ class SteppingInMemPersistentActorStashingSpec extends PersistenceSpec( } } } + } + "Stashing in a persistent actor mixed with persistAsync" must { + behave like stash[AsyncStashingPersistentActor]() + } + + "Stashing(unstashAll called in handler) in a persistent actor mixed with persistAsync" must { + behave like stash[AsyncStashingWithinHandlerPersistentActor]() } } diff --git a/project/MiMa.scala b/project/MiMa.scala index edd56b44e6..cea31b95b6 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -731,7 +731,11 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.Dispatch.initWithFailure"), // #19877 Source.queue termination support - ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.SourceQueueAdapter.this") + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.SourceQueueAdapter.this"), + + // #19828 + ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.persistence.Eventsourced#ProcessingState.onWriteMessageComplete"), + ProblemFilters.exclude[ReversedAbstractMethodProblem]("akka.persistence.Eventsourced#ProcessingState.onWriteMessageComplete") ) ) }