Merge pull request #17974 from leonidb/fix-17959-PersistentFsmActor-persists-StateChangeEvent-on-stay

=per #17959 PersistentFsmActor persists StateChangeEvent on stay
This commit is contained in:
Patrik Nordwall 2015-08-13 20:21:09 +02:00
commit aef28faaf3
2 changed files with 103 additions and 11 deletions

View file

@ -18,10 +18,12 @@ import scala.reflect.ClassTag
* A FSM implementation with persistent state.
*
* Supports the usual [[akka.actor.FSM]] functionality with additional persistence features.
* State and State Data are persisted on every state change.
* `PersistentFSM` is identified by 'persistenceId' value.
* State changes are persisted atomically together with domain events, which means that either both succeed or both fail,
* i.e. a state transition event will not be stored if persistence of an event related to that change fails.
* Persistence execution order is: persist -> wait for ack -> apply state.
* Incoming messages are deferred until the state is applied.
* State Data is constructed based on domain events, according to user's implementation of applyEvent function.
*/
trait PersistentFSM[S <: FSMState, D, E] extends PersistentActor with PersistentFSMBase[S, D, E] with ActorLogging {
import akka.persistence.fsm.PersistentFSM._
@ -80,14 +82,36 @@ trait PersistentFSM[S <: FSMState, D, E] extends PersistentActor with Persistent
* Persist FSM State and FSM State Data
*/
override private[akka] def applyState(nextState: State): Unit = {
val eventsToPersist: immutable.Seq[Any] = nextState.domainEvents.toList :+ StateChangeEvent(nextState.stateName.identifier, nextState.timeout)
var nextData: D = stateData
persistAll[Any](eventsToPersist) {
case domainEventTag(event)
nextData = applyEvent(event, nextData)
case StateChangeEvent(stateIdentifier, timeout)
super.applyState(nextState using nextData)
nextState.afterTransitionDo(stateData)
var eventsToPersist: immutable.Seq[Any] = nextState.domainEvents.toList
//Prevent StateChangeEvent persistence when staying in the same state, except when state defines a timeout
if (nextState.notifies || nextState.timeout.nonEmpty) {
eventsToPersist = eventsToPersist :+ StateChangeEvent(nextState.stateName.identifier, nextState.timeout)
}
if (eventsToPersist.isEmpty) {
//If there are no events to persist, just apply the state
super.applyState(nextState)
} else {
//Persist the events and apply the new state after all event handlers were executed
var nextData: D = stateData
var handlersExecutedCounter = 0
def applyStateOnLastHandler() = {
handlersExecutedCounter += 1
if (handlersExecutedCounter == eventsToPersist.size) {
super.applyState(nextState using nextData)
nextState.afterTransitionDo(stateData)
}
}
persistAll[Any](eventsToPersist) {
case domainEventTag(event)
nextData = applyEvent(event, nextData)
applyStateOnLastHandler()
case StateChangeEvent(stateIdentifier, timeout)
applyStateOnLastHandler()
}
}
}
}

View file

@ -5,7 +5,7 @@
package akka.persistence.fsm
import akka.actor._
import akka.persistence.PersistenceSpec
import akka.persistence.{PersistentActor, RecoveryCompleted, PersistenceSpec}
import akka.persistence.fsm.PersistentFSM._
import akka.testkit._
import com.typesafe.config.Config
@ -119,7 +119,7 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config)
recoveredFsmRef ! GetCurrentCart
recoveredFsmRef ! Leave
expectMsg(CurrentState(recoveredFsmRef, Shopping, None))
expectMsg(CurrentState(recoveredFsmRef, Shopping, Some(1 second)))
expectMsg(NonEmptyShoppingCart(List(shirt, shoes)))
expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat)))
@ -231,6 +231,56 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config)
fsmRef ! "stay" // causes stay()
probe.expectNoMsg(3.seconds)
}
"not persist state change event when staying in the same state" in {
val persistenceId = name
val fsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, dummyReportActorRef))
watch(fsmRef)
val shirt = Item("1", "Shirt", 59.99F)
val shoes = Item("2", "Shoes", 89.99F)
val coat = Item("3", "Coat", 119.99F)
fsmRef ! GetCurrentCart
fsmRef ! AddItem(shirt)
fsmRef ! GetCurrentCart
fsmRef ! AddItem(shoes)
fsmRef ! GetCurrentCart
fsmRef ! AddItem(coat)
fsmRef ! GetCurrentCart
fsmRef ! Buy
fsmRef ! GetCurrentCart
fsmRef ! Leave
expectMsg(EmptyShoppingCart)
expectMsg(NonEmptyShoppingCart(List(shirt)))
expectMsg(NonEmptyShoppingCart(List(shirt, shoes)))
expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat)))
expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat)))
expectTerminated(fsmRef)
val persistentEventsStreamer = system.actorOf(PersistentEventsStreamer.props(persistenceId, testActor))
expectMsg(ItemAdded(Item("1", "Shirt", 59.99F)))
expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted
expectMsg(ItemAdded(Item("2", "Shoes", 89.99F)))
expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted
expectMsg(ItemAdded(Item("3", "Coat", 119.99F)))
expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted
expectMsg(OrderExecuted)
expectMsgType[StateChangeEvent]
watch(persistentEventsStreamer)
persistentEventsStreamer ! PoisonPill
expectTerminated(persistentEventsStreamer)
}
}
}
@ -381,6 +431,24 @@ object PersistentFSMSpec {
def props(persistenceId: String, reportActor: ActorRef) =
Props(new WebStoreCustomerFSM(persistenceId, reportActor))
}
class PersistentEventsStreamer(id: String, client: ActorRef) extends PersistentActor {
override val persistenceId: String = id
def receiveRecover = {
case RecoveryCompleted // do nothing
case persistentEvent client ! persistentEvent
}
def receiveCommand = {
case _ // do nothing
}
}
object PersistentEventsStreamer {
def props(persistenceId: String, client: ActorRef) =
Props(new PersistentEventsStreamer(persistenceId, client))
}
}
class LeveldbPersistentFSMSpec extends PersistentFSMSpec(PersistenceSpec.config("leveldb", "PersistentFSMSpec"))