From 07fcf90fbf0e17af9e78f7e33a5b7540e7f550ae Mon Sep 17 00:00:00 2001 From: leonidb Date: Mon, 13 Jul 2015 00:57:25 +0300 Subject: [PATCH] =per #17959 Prevent PersistentFSM from saving state change event on stay --- ...tentFSMActor.scala => PersistentFSM.scala} | 42 ++++++++--- .../persistence/fsm/PersistentFSMSpec.scala | 72 ++++++++++++++++++- 2 files changed, 103 insertions(+), 11 deletions(-) rename akka-persistence/src/main/scala/akka/persistence/fsm/{PersistentFSMActor.scala => PersistentFSM.scala} (88%) diff --git a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMActor.scala b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala similarity index 88% rename from akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMActor.scala rename to akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala index 4e3a261f64..8edb1935c9 100644 --- a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMActor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala @@ -18,10 +18,12 @@ import scala.reflect.ClassTag * A FSM implementation with persistent state. * * Supports the usual [[akka.actor.FSM]] functionality with additional persistence features. - * State and State Data are persisted on every state change. * `PersistentFSM` is identified by 'persistenceId' value. + * State changes are persisted atomically together with domain events, which means that either both succeed or both fail, + * i.e. a state transition event will not be stored if persistence of an event related to that change fails. * Persistence execution order is: persist -> wait for ack -> apply state. * Incoming messages are deferred until the state is applied. + * State Data is constructed based on domain events, according to user's implementation of applyEvent function. */ trait PersistentFSM[S <: FSMState, D, E] extends PersistentActor with PersistentFSMBase[S, D, E] with ActorLogging { import akka.persistence.fsm.PersistentFSM._ @@ -80,14 +82,36 @@ trait PersistentFSM[S <: FSMState, D, E] extends PersistentActor with Persistent * Persist FSM State and FSM State Data */ override private[akka] def applyState(nextState: State): Unit = { - val eventsToPersist: immutable.Seq[Any] = nextState.domainEvents.toList :+ StateChangeEvent(nextState.stateName.identifier, nextState.timeout) - var nextData: D = stateData - persistAll[Any](eventsToPersist) { - case domainEventTag(event) ⇒ - nextData = applyEvent(event, nextData) - case StateChangeEvent(stateIdentifier, timeout) ⇒ - super.applyState(nextState using nextData) - nextState.afterTransitionDo(stateData) + var eventsToPersist: immutable.Seq[Any] = nextState.domainEvents.toList + + //Prevent StateChangeEvent persistence when staying in the same state, except when state defines a timeout + if (nextState.notifies || nextState.timeout.nonEmpty) { + eventsToPersist = eventsToPersist :+ StateChangeEvent(nextState.stateName.identifier, nextState.timeout) + } + + if (eventsToPersist.isEmpty) { + //If there are no events to persist, just apply the state + super.applyState(nextState) + } else { + //Persist the events and apply the new state after all event handlers were executed + var nextData: D = stateData + var handlersExecutedCounter = 0 + + def applyStateOnLastHandler() = { + handlersExecutedCounter += 1 + if (handlersExecutedCounter == eventsToPersist.size) { + super.applyState(nextState using nextData) + nextState.afterTransitionDo(stateData) + } + } + + persistAll[Any](eventsToPersist) { + case domainEventTag(event) ⇒ + nextData = applyEvent(event, nextData) + applyStateOnLastHandler() + case StateChangeEvent(stateIdentifier, timeout) ⇒ + applyStateOnLastHandler() + } } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala index 5e57dcbaf3..6ed0997a77 100644 --- a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala @@ -5,7 +5,7 @@ package akka.persistence.fsm import akka.actor._ -import akka.persistence.PersistenceSpec +import akka.persistence.{PersistentActor, RecoveryCompleted, PersistenceSpec} import akka.persistence.fsm.PersistentFSM._ import akka.testkit._ import com.typesafe.config.Config @@ -119,7 +119,7 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) recoveredFsmRef ! GetCurrentCart recoveredFsmRef ! Leave - expectMsg(CurrentState(recoveredFsmRef, Shopping, None)) + expectMsg(CurrentState(recoveredFsmRef, Shopping, Some(1 second))) expectMsg(NonEmptyShoppingCart(List(shirt, shoes))) expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat))) @@ -231,6 +231,56 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) fsmRef ! "stay" // causes stay() probe.expectNoMsg(3.seconds) } + + "not persist state change event when staying in the same state" in { + val persistenceId = name + + val fsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, dummyReportActorRef)) + watch(fsmRef) + + val shirt = Item("1", "Shirt", 59.99F) + val shoes = Item("2", "Shoes", 89.99F) + val coat = Item("3", "Coat", 119.99F) + + fsmRef ! GetCurrentCart + fsmRef ! AddItem(shirt) + fsmRef ! GetCurrentCart + fsmRef ! AddItem(shoes) + fsmRef ! GetCurrentCart + fsmRef ! AddItem(coat) + fsmRef ! GetCurrentCart + fsmRef ! Buy + fsmRef ! GetCurrentCart + fsmRef ! Leave + + expectMsg(EmptyShoppingCart) + + expectMsg(NonEmptyShoppingCart(List(shirt))) + expectMsg(NonEmptyShoppingCart(List(shirt, shoes))) + expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat))) + + expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat))) + + expectTerminated(fsmRef) + + val persistentEventsStreamer = system.actorOf(PersistentEventsStreamer.props(persistenceId, testActor)) + + expectMsg(ItemAdded(Item("1", "Shirt", 59.99F))) + expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted + + expectMsg(ItemAdded(Item("2", "Shoes", 89.99F))) + expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted + + expectMsg(ItemAdded(Item("3", "Coat", 119.99F))) + expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted + + expectMsg(OrderExecuted) + expectMsgType[StateChangeEvent] + + watch(persistentEventsStreamer) + persistentEventsStreamer ! PoisonPill + expectTerminated(persistentEventsStreamer) + } } } @@ -381,6 +431,24 @@ object PersistentFSMSpec { def props(persistenceId: String, reportActor: ActorRef) = Props(new WebStoreCustomerFSM(persistenceId, reportActor)) } + + class PersistentEventsStreamer(id: String, client: ActorRef) extends PersistentActor { + override val persistenceId: String = id + + def receiveRecover = { + case RecoveryCompleted ⇒ // do nothing + case persistentEvent ⇒ client ! persistentEvent + } + + def receiveCommand = { + case _ ⇒ // do nothing + } + } + + object PersistentEventsStreamer { + def props(persistenceId: String, client: ActorRef) = + Props(new PersistentEventsStreamer(persistenceId, client)) + } } class LeveldbPersistentFSMSpec extends PersistentFSMSpec(PersistenceSpec.config("leveldb", "PersistentFSMSpec"))