diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 7191fb023c..bdef9e2353 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -608,16 +608,16 @@ configuration key. The method can be overridden by implementation classes to ret Persistent FSM ============== -``AbstractPersistentFSMActor`` handles the incoming messages in an FSM like fashion. +``AbstractPersistentFSM`` handles the incoming messages in an FSM like fashion. Its internal state is persisted as a sequence of changes, later referred to as domain events. Relationship between incoming messages, FSM's states and transitions, persistence of domain events is defined by a DSL. A Simple Example ---------------- -To demonstrate the features of the ``AbstractPersistentFSMActor``, consider an actor which represents a Web store customer. +To demonstrate the features of the ``AbstractPersistentFSM``, consider an actor which represents a Web store customer. The contract of our "WebStoreCustomerFSMActor" is that it accepts the following commands: -.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java#customer-commands +.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java#customer-commands ``AddItem`` sent when the customer adds an item to a shopping cart ``Buy`` - when the customer finishes the purchase @@ -626,7 +626,7 @@ The contract of our "WebStoreCustomerFSMActor" is that it accepts the following The customer can be in one of the following states: -.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java#customer-states +.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java#customer-states ``LookingAround`` customer is browsing the site, but hasn't added anything to the shopping cart ``Shopping`` customer has recently added items to the shopping cart @@ -635,29 +635,29 @@ The customer can be in one of the following states: .. note:: - ``AbstractPersistentFSMActor`` states must inherit from ``PersistentFsmActor.FSMState`` and implement the + ``AbstractPersistentFSM`` states must inherit from ``PersistentFSM.FSMState`` and implement the ``String identifier()`` method. This is required in order to simplify the serialization of FSM states. String identifiers should be unique! Customer's actions are "recorded" as a sequence of "domain events", which are persisted. Those events are replayed on actor's start in order to restore the latest customer's state: -.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java#customer-domain-events +.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java#customer-domain-events Customer state data represents the items in customer's shopping cart: -.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java#customer-states-data +.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java#customer-states-data Here is how everything is wired together: -.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java#customer-fsm-body +.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java#customer-fsm-body .. note:: State data can only be modified directly on initialization. Later it's modified only as a result of applying domain events. Override the ``applyEvent`` method to define how state data is affected by domain events, see the example below -.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java#customer-apply-event +.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java#customer-apply-event Storage plugins =============== diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 8750b1c0fe..2c28f9092a 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -663,16 +663,16 @@ adaptation simply return ``EventSeq.empty``. The adapted events are then deliver Persistent FSM ============== -``PersistentFSMActor`` handles the incoming messages in an FSM like fashion. +``PersistentFSM`` handles the incoming messages in an FSM like fashion. Its internal state is persisted as a sequence of changes, later referred to as domain events. Relationship between incoming messages, FSM's states and transitions, persistence of domain events is defined by a DSL. A Simple Example ---------------- -To demonstrate the features of the ``PersistentFSMActor`` trait, consider an actor which represents a Web store customer. +To demonstrate the features of the ``PersistentFSM`` trait, consider an actor which represents a Web store customer. The contract of our "WebStoreCustomerFSMActor" is that it accepts the following commands: -.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala#customer-commands +.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala#customer-commands ``AddItem`` sent when the customer adds an item to a shopping cart ``Buy`` - when the customer finishes the purchase @@ -681,7 +681,7 @@ The contract of our "WebStoreCustomerFSMActor" is that it accepts the following The customer can be in one of the following states: -.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala#customer-states +.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala#customer-states ``LookingAround`` customer is browsing the site, but hasn't added anything to the shopping cart ``Shopping`` customer has recently added items to the shopping cart @@ -690,29 +690,29 @@ The customer can be in one of the following states: .. note:: - ``PersistentFSMActor`` states must inherit from trait ``PersistentFsmActor.FSMState`` and implement the + ``PersistentFSM`` states must inherit from trait ``PersistentFSM.FSMState`` and implement the ``def identifier: String`` method. This is required in order to simplify the serialization of FSM states. String identifiers should be unique! Customer's actions are "recorded" as a sequence of "domain events", which are persisted. Those events are replayed on actor's start in order to restore the latest customer's state: -.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala#customer-domain-events +.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala#customer-domain-events Customer state data represents the items in customer's shopping cart: -.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala#customer-states-data +.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala#customer-states-data Here is how everything is wired together: -.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala#customer-fsm-body +.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala#customer-fsm-body .. note:: State data can only be modified directly on initialization. Later it's modified only as a result of applying domain events. Override the ``applyEvent`` method to define how state data is affected by domain events, see the example below -.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala#customer-apply-event +.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala#customer-apply-event .. _storage-plugins: diff --git a/akka-persistence/src/main/java/akka/persistence/fsm/japi/pf/FSMStateFunctionBuilder.java b/akka-persistence/src/main/java/akka/persistence/fsm/japi/pf/FSMStateFunctionBuilder.java index 5e2bf948c3..b3e3b94e4f 100644 --- a/akka-persistence/src/main/java/akka/persistence/fsm/japi/pf/FSMStateFunctionBuilder.java +++ b/akka-persistence/src/main/java/akka/persistence/fsm/japi/pf/FSMStateFunctionBuilder.java @@ -4,7 +4,8 @@ package akka.persistence.fsm.japi.pf; -import akka.persistence.fsm.FSM; +import akka.persistence.fsm.PersistentFSM; +import akka.persistence.fsm.PersistentFSMBase; import akka.japi.pf.FI; import akka.japi.pf.PFBuilder; import scala.PartialFunction; @@ -23,8 +24,8 @@ import java.util.List; @SuppressWarnings("rawtypes") public class FSMStateFunctionBuilder { - private PFBuilder, FSM.State> builder = - new PFBuilder, FSM.State>(); + private PFBuilder, PersistentFSM.State> builder = + new PFBuilder, PersistentFSM.State>(); /** * An erased processing of the event matcher. The compile time checks are enforced @@ -47,10 +48,10 @@ public class FSMStateFunctionBuilder { final Object dataOrType, final FI.TypedPredicate2 predicate, final FI.Apply2 apply) { - builder.match(FSM.Event.class, - new FI.TypedPredicate() { + builder.match(PersistentFSM.Event.class, + new FI.TypedPredicate() { @Override - public boolean defined(FSM.Event e) { + public boolean defined(PersistentFSM.Event e) { boolean res = true; if (eventOrType != null) { if (eventOrType instanceof Class) { @@ -78,10 +79,10 @@ public class FSMStateFunctionBuilder { return res; } }, - new FI.Apply>() { - public FSM.State apply(FSM.Event e) throws Exception { + new FI.Apply>() { + public PersistentFSM.State apply(PersistentFSM.Event e) throws Exception { @SuppressWarnings("unchecked") - FSM.State res = (FSM.State) apply.apply(e.event(), e.stateData()); + PersistentFSM.State res = (PersistentFSM.State) apply.apply(e.event(), e.stateData()); return res; } } @@ -104,7 +105,7 @@ public class FSMStateFunctionBuilder { public final FSMStateFunctionBuilder event(final Class

eventType, final Class dataType, final FI.TypedPredicate2 predicate, - final FI.Apply2> apply) { + final FI.Apply2> apply) { erasedEvent(eventType, dataType, predicate, apply); return this; } @@ -121,7 +122,7 @@ public class FSMStateFunctionBuilder { */ public FSMStateFunctionBuilder event(final Class

eventType, final Class dataType, - final FI.Apply2> apply) { + final FI.Apply2> apply) { return erasedEvent(eventType, dataType, null, apply); } @@ -135,7 +136,7 @@ public class FSMStateFunctionBuilder { */ public

FSMStateFunctionBuilder event(final Class

eventType, final FI.TypedPredicate2 predicate, - final FI.Apply2> apply) { + final FI.Apply2> apply) { return erasedEvent(eventType, null, predicate, apply); } @@ -147,7 +148,7 @@ public class FSMStateFunctionBuilder { * @return the builder with the case statement added */ public

FSMStateFunctionBuilder event(final Class

eventType, - final FI.Apply2> apply) { + final FI.Apply2> apply) { return erasedEvent(eventType, null, null, apply); } @@ -159,7 +160,7 @@ public class FSMStateFunctionBuilder { * @return the builder with the case statement added */ public FSMStateFunctionBuilder event(final FI.TypedPredicate2 predicate, - final FI.Apply2> apply) { + final FI.Apply2> apply) { return erasedEvent(null, null, predicate, apply); } @@ -175,11 +176,11 @@ public class FSMStateFunctionBuilder { */ public FSMStateFunctionBuilder event(final List eventMatches, final Class dataType, - final FI.Apply2> apply) { - builder.match(FSM.Event.class, - new FI.TypedPredicate() { + final FI.Apply2> apply) { + builder.match(PersistentFSM.Event.class, + new FI.TypedPredicate() { @Override - public boolean defined(FSM.Event e) { + public boolean defined(PersistentFSM.Event e) { if (dataType != null && !dataType.isInstance(e.stateData())) return false; @@ -198,8 +199,8 @@ public class FSMStateFunctionBuilder { return emMatch; } }, - new FI.Apply>() { - public FSM.State apply(FSM.Event e) throws Exception { + new FI.Apply>() { + public PersistentFSM.State apply(PersistentFSM.Event e) throws Exception { @SuppressWarnings("unchecked") Q q = (Q) e.stateData(); return apply.apply(e.event(), q); @@ -219,7 +220,7 @@ public class FSMStateFunctionBuilder { * @return the builder with the case statement added */ public FSMStateFunctionBuilder event(final List eventMatches, - final FI.Apply2> apply) { + final FI.Apply2> apply) { return event(eventMatches, null, apply); } @@ -234,7 +235,7 @@ public class FSMStateFunctionBuilder { */ public FSMStateFunctionBuilder eventEquals(final P event, final Class dataType, - final FI.Apply2> apply) { + final FI.Apply2> apply) { return erasedEvent(event, dataType, null, apply); } @@ -246,7 +247,7 @@ public class FSMStateFunctionBuilder { * @return the builder with the case statement added */ public

FSMStateFunctionBuilder eventEquals(final P event, - final FI.Apply2> apply) { + final FI.Apply2> apply) { return erasedEvent(event, null, null, apply); } @@ -256,7 +257,7 @@ public class FSMStateFunctionBuilder { * @param apply an action to apply to the event and state data * @return the builder with the case statement added */ - public FSMStateFunctionBuilder anyEvent(final FI.Apply2> apply) { + public FSMStateFunctionBuilder anyEvent(final FI.Apply2> apply) { return erasedEvent(null, null, null, apply); } @@ -266,7 +267,7 @@ public class FSMStateFunctionBuilder { * * @return a PartialFunction for this builder. */ - public PartialFunction, FSM.State> build() { + public PartialFunction, PersistentFSM.State> build() { return builder.build(); } } diff --git a/akka-persistence/src/main/java/akka/persistence/fsm/japi/pf/FSMStopBuilder.java b/akka-persistence/src/main/java/akka/persistence/fsm/japi/pf/FSMStopBuilder.java index 087a1ff82e..d5a8cd1dae 100644 --- a/akka-persistence/src/main/java/akka/persistence/fsm/japi/pf/FSMStopBuilder.java +++ b/akka-persistence/src/main/java/akka/persistence/fsm/japi/pf/FSMStopBuilder.java @@ -4,7 +4,8 @@ package akka.persistence.fsm.japi.pf; -import akka.persistence.fsm.FSM; +import akka.persistence.fsm.PersistentFSM; +import akka.persistence.fsm.PersistentFSMBase; import akka.japi.pf.FI; import akka.japi.pf.UnitPFBuilder; import scala.PartialFunction; @@ -20,8 +21,8 @@ import scala.runtime.BoxedUnit; */ public class FSMStopBuilder { - private UnitPFBuilder> builder = - new UnitPFBuilder>(); + private UnitPFBuilder> builder = + new UnitPFBuilder<>(); /** * Add a case statement that matches on an {@link akka.actor.FSM.Reason}. @@ -30,17 +31,17 @@ public class FSMStopBuilder { * @param apply an action to apply to the event and state data if there is a match * @return the builder with the case statement added */ - public FSMStopBuilder stop(final FSM.Reason reason, + public FSMStopBuilder stop(final PersistentFSM.Reason reason, final FI.UnitApply2 apply) { - builder.match(FSM.StopEvent.class, - new FI.TypedPredicate() { + builder.match(PersistentFSM.StopEvent.class, + new FI.TypedPredicate() { @Override - public boolean defined(FSM.StopEvent e) { + public boolean defined(PersistentFSM.StopEvent e) { return reason.equals(e.reason()); } }, - new FI.UnitApply() { - public void apply(FSM.StopEvent e) throws Exception { + new FI.UnitApply() { + public void apply(PersistentFSM.StopEvent e) throws Exception { @SuppressWarnings("unchecked") S s = (S) e.currentState(); @SuppressWarnings("unchecked") @@ -61,7 +62,7 @@ public class FSMStopBuilder { * @param

the reason type to match on * @return the builder with the case statement added */ - public

FSMStopBuilder stop(final Class

reasonType, + public

FSMStopBuilder stop(final Class

reasonType, final FI.UnitApply3 apply) { return this.stop(reasonType, new FI.TypedPredicate

() { @@ -81,13 +82,13 @@ public class FSMStopBuilder { * @param

the reason type to match on * @return the builder with the case statement added */ - public

FSMStopBuilder stop(final Class

reasonType, + public

FSMStopBuilder stop(final Class

reasonType, final FI.TypedPredicate

predicate, final FI.UnitApply3 apply) { - builder.match(FSM.StopEvent.class, - new FI.TypedPredicate() { + builder.match(PersistentFSM.StopEvent.class, + new FI.TypedPredicate() { @Override - public boolean defined(FSM.StopEvent e) { + public boolean defined(PersistentFSM.StopEvent e) { if (reasonType.isInstance(e.reason())) { @SuppressWarnings("unchecked") P p = (P) e.reason(); @@ -97,8 +98,8 @@ public class FSMStopBuilder { } } }, - new FI.UnitApply() { - public void apply(FSM.StopEvent e) throws Exception { + new FI.UnitApply() { + public void apply(PersistentFSM.StopEvent e) throws Exception { @SuppressWarnings("unchecked") P p = (P) e.reason(); @SuppressWarnings("unchecked") @@ -119,7 +120,7 @@ public class FSMStopBuilder { * * @return a PartialFunction for this builder. */ - public PartialFunction, BoxedUnit> build() { + public PartialFunction, BoxedUnit> build() { return builder.build(); } } diff --git a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMActor.scala b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMActor.scala index 4db744ede6..4e3a261f64 100644 --- a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMActor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMActor.scala @@ -4,25 +4,27 @@ package akka.persistence.fsm -import akka.actor.ActorLogging -import akka.persistence.fsm.PersistentFsmActor.FSMState +import akka.actor._ +import akka.persistence.fsm.PersistentFSM.{ State, FSMState } import akka.persistence.serialization.Message import akka.persistence.{ PersistentActor, RecoveryCompleted } +import scala.annotation.varargs import scala.collection.immutable -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.reflect.ClassTag /** - * FSM actor implementation with persistent state + * A FSM implementation with persistent state. * * Supports the usual [[akka.actor.FSM]] functionality with additional persistence features. * State and State Data are persisted on every state change. - * FSM is identified by 'persistenceId' value. - * Persistence execution order is: persist -> wait for ack -> apply state. Incoming messages are deferred until the state is applied. + * `PersistentFSM` is identified by 'persistenceId' value. + * Persistence execution order is: persist -> wait for ack -> apply state. + * Incoming messages are deferred until the state is applied. */ -trait PersistentFsmActor[S <: FSMState, D, E] extends PersistentActor with FSM[S, D, E] with ActorLogging { - import akka.persistence.fsm.PersistentFsmActor._ +trait PersistentFSM[S <: FSMState, D, E] extends PersistentActor with PersistentFSMBase[S, D, E] with ActorLogging { + import akka.persistence.fsm.PersistentFSM._ /** * Enables to pass a ClassTag of a domain event base type from the implementing class @@ -60,7 +62,7 @@ trait PersistentFsmActor[S <: FSMState, D, E] extends PersistentActor with FSM[S * After recovery events are handled as in usual FSM actor */ override def receiveCommand: Receive = { - super[FSM].receive + super[PersistentFSMBase].receive } /** @@ -90,7 +92,7 @@ trait PersistentFsmActor[S <: FSMState, D, E] extends PersistentActor with FSM[S } } -object PersistentFsmActor { +object PersistentFSM { /** * Base persistent event class */ @@ -110,6 +112,201 @@ object PersistentFsmActor { trait FSMState { def identifier: String } + + /** + * A partial function value which does not match anything and can be used to + * “reset” `whenUnhandled` and `onTermination` handlers. + * + * {{{ + * onTermination(FSM.NullFunction) + * }}} + */ + object NullFunction extends PartialFunction[Any, Nothing] { + def isDefinedAt(o: Any) = false + def apply(o: Any) = sys.error("undefined") + } + + /** + * Message type which is sent directly to the subscribed actor in + * [[akka.actor.FSM.SubscribeTransitionCallBack]] before sending any + * [[akka.actor.FSM.Transition]] messages. + */ + final case class CurrentState[S](fsmRef: ActorRef, state: S, timeout: Option[FiniteDuration]) + + /** + * Message type which is used to communicate transitions between states to + * all subscribed listeners (use [[akka.actor.FSM.SubscribeTransitionCallBack]]). + */ + final case class Transition[S](fsmRef: ActorRef, from: S, to: S, timeout: Option[FiniteDuration]) + + /** + * Send this to an [[akka.actor.FSM]] to request first the [[PersistentFSM.CurrentState]] + * and then a series of [[PersistentFSM.Transition]] updates. Cancel the subscription + * using [[PersistentFSM.UnsubscribeTransitionCallBack]]. + */ + final case class SubscribeTransitionCallBack(actorRef: ActorRef) + + /** + * Unsubscribe from [[akka.actor.FSM.Transition]] notifications which was + * effected by sending the corresponding [[akka.actor.FSM.SubscribeTransitionCallBack]]. + */ + final case class UnsubscribeTransitionCallBack(actorRef: ActorRef) + + /** + * Reason why this [[akka.actor.FSM]] is shutting down. + */ + sealed trait Reason + + /** + * Default reason if calling `stop()`. + */ + case object Normal extends Reason + + /** + * Reason given when someone was calling `system.stop(fsm)` from outside; + * also applies to `Stop` supervision directive. + */ + case object Shutdown extends Reason + + /** + * Signifies that the [[akka.actor.FSM]] is shutting itself down because of + * an error, e.g. if the state to transition into does not exist. You can use + * this to communicate a more precise cause to the `onTermination` block. + */ + final case class Failure(cause: Any) extends Reason + + /** + * This case object is received in case of a state timeout. + */ + case object StateTimeout + + /** INTERNAL API */ + private[persistence] final case class TimeoutMarker(generation: Long) + + /** + * INTERNAL API + */ + // FIXME: what about the cancellable? + private[persistence] final case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(context: ActorContext) + extends NoSerializationVerificationNeeded { + private var ref: Option[Cancellable] = _ + private val scheduler = context.system.scheduler + private implicit val executionContext = context.dispatcher + + def schedule(actor: ActorRef, timeout: FiniteDuration): Unit = + ref = Some( + if (repeat) scheduler.schedule(timeout, timeout, actor, this) + else scheduler.scheduleOnce(timeout, actor, this)) + + def cancel(): Unit = + if (ref.isDefined) { + ref.get.cancel() + ref = None + } + } + + /** + * This extractor is just convenience for matching a (S, S) pair, including a + * reminder what the new state is. + */ + object -> { + def unapply[S](in: (S, S)) = Some(in) + } + + /** + * Log Entry of the [[akka.actor.LoggingFSM]], can be obtained by calling `getLog`. + */ + final case class LogEntry[S, D](stateName: S, stateData: D, event: Any) + + /** + * This captures all of the managed state of the [[akka.actor.FSM]]: the state + * name, the state data, possibly custom timeout, stop reason, replies + * accumulated while processing the last message, possibly domain event and handler + * to be executed after FSM moves to the new state (also triggered when staying in the same state) + */ + final case class State[S, D, E]( + stateName: S, + stateData: D, + timeout: Option[FiniteDuration] = None, + stopReason: Option[Reason] = None, + replies: List[Any] = Nil, + domainEvents: Seq[E] = Nil, + afterTransitionDo: D ⇒ Unit = { _: D ⇒ })(private[akka] val notifies: Boolean = true) { + + /** + * Copy object and update values if needed. + */ + private[akka] def copy(stateName: S = stateName, stateData: D = stateData, timeout: Option[FiniteDuration] = timeout, stopReason: Option[Reason] = stopReason, replies: List[Any] = replies, notifies: Boolean = notifies, domainEvents: Seq[E] = domainEvents, afterTransitionDo: D ⇒ Unit = afterTransitionDo): State[S, D, E] = { + State(stateName, stateData, timeout, stopReason, replies, domainEvents, afterTransitionDo)(notifies) + } + + /** + * Modify state transition descriptor to include a state timeout for the + * next state. This timeout overrides any default timeout set for the next + * state. + * + * Use Duration.Inf to deactivate an existing timeout. + */ + def forMax(timeout: Duration): State[S, D, E] = timeout match { + case f: FiniteDuration ⇒ copy(timeout = Some(f)) + case _ ⇒ copy(timeout = None) + } + + /** + * Send reply to sender of the current message, if available. + * + * @return this state transition descriptor + */ + def replying(replyValue: Any): State[S, D, E] = { + copy(replies = replyValue :: replies) + } + + /** + * Modify state transition descriptor with new state data. The data will be + * set when transitioning to the new state. + */ + private[akka] def using(@deprecatedName('nextStateDate) nextStateData: D): State[S, D, E] = { + copy(stateData = nextStateData) + } + + /** + * INTERNAL API. + */ + private[akka] def withStopReason(reason: Reason): State[S, D, E] = { + copy(stopReason = Some(reason)) + } + + private[akka] def withNotification(notifies: Boolean): State[S, D, E] = { + copy(notifies = notifies) + } + + /** + * Specify domain events to be applied when transitioning to the new state. + */ + @varargs def applying(events: E*): State[S, D, E] = { + copy(domainEvents = domainEvents ++ events) + } + + /** + * Register a handler to be triggered after the state has been persisted successfully + */ + def andThen(handler: D ⇒ Unit): State[S, D, E] = { + copy(afterTransitionDo = handler) + } + } + + /** + * All messages sent to the [[akka.actor.FSM]] will be wrapped inside an + * `Event`, which allows pattern matching to extract both state and data. + */ + final case class Event[D](event: Any, stateData: D) extends NoSerializationVerificationNeeded + + /** + * Case class representing the state of the [[akka.actor.FSM]] whithin the + * `onTermination` block. + */ + final case class StopEvent[S, D](reason: Reason, currentState: S, stateData: D) extends NoSerializationVerificationNeeded + } /** @@ -119,7 +316,7 @@ object PersistentFsmActor { * * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. */ -abstract class AbstractPersistentFsmActor[S <: FSMState, D, E] extends AbstractFSM[S, D, E] with PersistentFsmActor[S, D, E] { +abstract class AbstractPersistentFSM[S <: FSMState, D, E] extends AbstractPersistentFSMBase[S, D, E] with PersistentFSM[S, D, E] { import java.util.function.Consumer /** @@ -151,4 +348,7 @@ abstract class AbstractPersistentFsmActor[S <: FSMState, D, E] extends AbstractF * * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. */ -abstract class AbstractPersistentLoggingFsmActor[S <: FSMState, D, E] extends AbstractLoggingFSM[S, D, E] with PersistentFsmActor[S, D, E] +abstract class AbstractPersistentLoggingFSM[S <: FSMState, D, E] + extends AbstractPersistentFSMBase[S, D, E] + with LoggingPersistentFSM[S, D, E] + with PersistentFSM[S, D, E] diff --git a/akka-persistence/src/main/scala/akka/persistence/fsm/FSM.scala b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMBase.scala similarity index 81% rename from akka-persistence/src/main/scala/akka/persistence/fsm/FSM.scala rename to akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMBase.scala index 6ca95cb5c1..d178fa2644 100644 --- a/akka-persistence/src/main/scala/akka/persistence/fsm/FSM.scala +++ b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMBase.scala @@ -1,5 +1,5 @@ /** - * Copyright (C) 2009-2014 Typesafe Inc. + * Copyright (C) 2009-2015 Typesafe Inc. */ package akka.persistence.fsm @@ -7,212 +7,10 @@ import akka.actor._ import akka.japi.pf.{ UnitPFBuilder, UnitMatch, FSMTransitionHandlerBuilder } import language.implicitConversions -import scala.annotation.varargs -import scala.concurrent.duration.Duration import scala.collection.mutable import akka.routing.{ Deafen, Listen, Listeners } import scala.concurrent.duration.FiniteDuration -object FSM { - - /** - * A partial function value which does not match anything and can be used to - * “reset” `whenUnhandled` and `onTermination` handlers. - * - * {{{ - * onTermination(FSM.NullFunction) - * }}} - */ - object NullFunction extends PartialFunction[Any, Nothing] { - def isDefinedAt(o: Any) = false - def apply(o: Any) = sys.error("undefined") - } - - /** - * Message type which is sent directly to the subscribed actor in - * [[akka.actor.FSM.SubscribeTransitionCallBack]] before sending any - * [[akka.actor.FSM.Transition]] messages. - */ - final case class CurrentState[S](fsmRef: ActorRef, state: S) - - /** - * Message type which is used to communicate transitions between states to - * all subscribed listeners (use [[akka.actor.FSM.SubscribeTransitionCallBack]]). - */ - final case class Transition[S](fsmRef: ActorRef, from: S, to: S) - - /** - * Send this to an [[akka.actor.FSM]] to request first the [[FSM.CurrentState]] - * and then a series of [[FSM.Transition]] updates. Cancel the subscription - * using [[FSM.UnsubscribeTransitionCallBack]]. - */ - final case class SubscribeTransitionCallBack(actorRef: ActorRef) - - /** - * Unsubscribe from [[akka.actor.FSM.Transition]] notifications which was - * effected by sending the corresponding [[akka.actor.FSM.SubscribeTransitionCallBack]]. - */ - final case class UnsubscribeTransitionCallBack(actorRef: ActorRef) - - /** - * Reason why this [[akka.actor.FSM]] is shutting down. - */ - sealed trait Reason - - /** - * Default reason if calling `stop()`. - */ - case object Normal extends Reason - - /** - * Reason given when someone was calling `system.stop(fsm)` from outside; - * also applies to `Stop` supervision directive. - */ - case object Shutdown extends Reason - - /** - * Signifies that the [[akka.actor.FSM]] is shutting itself down because of - * an error, e.g. if the state to transition into does not exist. You can use - * this to communicate a more precise cause to the `onTermination` block. - */ - final case class Failure(cause: Any) extends Reason - - /** - * This case object is received in case of a state timeout. - */ - case object StateTimeout - - /** - * INTERNAL API - */ - private final case class TimeoutMarker(generation: Long) - - /** - * INTERNAL API - */ - // FIXME: what about the cancellable? - private[akka] final case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(context: ActorContext) - extends NoSerializationVerificationNeeded { - private var ref: Option[Cancellable] = _ - private val scheduler = context.system.scheduler - private implicit val executionContext = context.dispatcher - - def schedule(actor: ActorRef, timeout: FiniteDuration): Unit = - ref = Some( - if (repeat) scheduler.schedule(timeout, timeout, actor, this) - else scheduler.scheduleOnce(timeout, actor, this)) - - def cancel(): Unit = - if (ref.isDefined) { - ref.get.cancel() - ref = None - } - } - - /** - * This extractor is just convenience for matching a (S, S) pair, including a - * reminder what the new state is. - */ - object -> { - def unapply[S](in: (S, S)) = Some(in) - } - - /** - * Log Entry of the [[akka.actor.LoggingFSM]], can be obtained by calling `getLog`. - */ - final case class LogEntry[S, D](stateName: S, stateData: D, event: Any) - - /** - * This captures all of the managed state of the [[akka.actor.FSM]]: the state - * name, the state data, possibly custom timeout, stop reason, replies - * accumulated while processing the last message, possibly domain event and handler - * to be executed after FSM moves to the new state (also triggered when staying in the same state) - */ - final case class State[S, D, E]( - stateName: S, - stateData: D, - timeout: Option[FiniteDuration] = None, - stopReason: Option[Reason] = None, - replies: List[Any] = Nil, - domainEvents: Seq[E] = Nil, - afterTransitionDo: D ⇒ Unit = { _: D ⇒ })(private[akka] val notifies: Boolean = true) { - - /** - * Copy object and update values if needed. - */ - private[akka] def copy(stateName: S = stateName, stateData: D = stateData, timeout: Option[FiniteDuration] = timeout, stopReason: Option[Reason] = stopReason, replies: List[Any] = replies, notifies: Boolean = notifies, domainEvents: Seq[E] = domainEvents, afterTransitionDo: D ⇒ Unit = afterTransitionDo): State[S, D, E] = { - State(stateName, stateData, timeout, stopReason, replies, domainEvents, afterTransitionDo)(notifies) - } - - /** - * Modify state transition descriptor to include a state timeout for the - * next state. This timeout overrides any default timeout set for the next - * state. - * - * Use Duration.Inf to deactivate an existing timeout. - */ - def forMax(timeout: Duration): State[S, D, E] = timeout match { - case f: FiniteDuration ⇒ copy(timeout = Some(f)) - case _ ⇒ copy(timeout = None) - } - - /** - * Send reply to sender of the current message, if available. - * - * @return this state transition descriptor - */ - def replying(replyValue: Any): State[S, D, E] = { - copy(replies = replyValue :: replies) - } - - /** - * Modify state transition descriptor with new state data. The data will be - * set when transitioning to the new state. - */ - private[akka] def using(@deprecatedName('nextStateDate) nextStateData: D): State[S, D, E] = { - copy(stateData = nextStateData) - } - - /** - * INTERNAL API. - */ - private[akka] def withStopReason(reason: Reason): State[S, D, E] = { - copy(stopReason = Some(reason)) - } - - private[akka] def withNotification(notifies: Boolean): State[S, D, E] = { - copy(notifies = notifies) - } - - /** - * Specify domain events to be applied when transitioning to the new state. - */ - @varargs def applying(events: E*): State[S, D, E] = { - copy(domainEvents = domainEvents ++ events) - } - - /** - * Register a handler to be triggered after the state has been persisted successfully - */ - def andThen(handler: D ⇒ Unit): State[S, D, E] = { - copy(afterTransitionDo = handler) - } - } - - /** - * All messages sent to the [[akka.actor.FSM]] will be wrapped inside an - * `Event`, which allows pattern matching to extract both state and data. - */ - final case class Event[D](event: Any, stateData: D) extends NoSerializationVerificationNeeded - - /** - * Case class representing the state of the [[akka.actor.FSM]] whithin the - * `onTermination` block. - */ - final case class StopEvent[S, D](reason: Reason, currentState: S, stateData: D) extends NoSerializationVerificationNeeded - -} - /** * Finite State Machine actor trait. Use as follows: * @@ -292,13 +90,13 @@ object FSM { * isTimerActive("tock") * */ -trait FSM[S, D, E] extends Actor with Listeners with ActorLogging { +trait PersistentFSMBase[S, D, E] extends Actor with Listeners with ActorLogging { - import akka.persistence.fsm.FSM._ + import akka.persistence.fsm.PersistentFSM._ - type State = FSM.State[S, D, E] - type Event = FSM.Event[D] - type StopEvent = FSM.StopEvent[S, D] + type State = PersistentFSM.State[S, D, E] + type Event = PersistentFSM.Event[D] + type StopEvent = PersistentFSM.StopEvent[S, D] type StateFunction = scala.PartialFunction[Event, State] type Timeout = Option[FiniteDuration] type TransitionHandler = PartialFunction[(S, S), Unit] @@ -306,19 +104,19 @@ trait FSM[S, D, E] extends Actor with Listeners with ActorLogging { /* * “import” so that these are visible without an import */ - val Event: FSM.Event.type = FSM.Event - val StopEvent: FSM.StopEvent.type = FSM.StopEvent + val Event: PersistentFSM.Event.type = PersistentFSM.Event + val StopEvent: PersistentFSM.StopEvent.type = PersistentFSM.StopEvent /** * This extractor is just convenience for matching a (S, S) pair, including a * reminder what the new state is. */ - val -> = FSM.-> + val -> = PersistentFSM.-> /** * This case object is received in case of a state timeout. */ - val StateTimeout = FSM.StateTimeout + val StateTimeout = PersistentFSM.StateTimeout /** * **************************************** @@ -349,7 +147,7 @@ trait FSM[S, D, E] extends Actor with Listeners with ActorLogging { * @param timeout state timeout for the initial state, overriding the default timeout for that state */ final def startWith(stateName: S, stateData: D, timeout: Timeout = None): Unit = - currentState = FSM.State(stateName, stateData, timeout)() + currentState = PersistentFSM.State(stateName, stateData, timeout)() /** * Produce transition to other state. @@ -361,7 +159,7 @@ trait FSM[S, D, E] extends Actor with Listeners with ActorLogging { * @param nextStateName state designator for the next state * @return state transition descriptor */ - final def goto(nextStateName: S): State = FSM.State(nextStateName, currentState.stateData)() + final def goto(nextStateName: S): State = PersistentFSM.State(nextStateName, currentState.stateData)() /** * Produce "empty" transition descriptor. @@ -622,24 +420,23 @@ trait FSM[S, D, E] extends Actor with Listeners with ActorLogging { // TODO Use context.watch(actor) and receive Terminated(actor) to clean up list listeners.add(actorRef) // send current state back as reference point - actorRef ! CurrentState(self, currentState.stateName) + actorRef ! CurrentState(self, currentState.stateName, currentState.timeout) case Listen(actorRef) ⇒ // TODO Use context.watch(actor) and receive Terminated(actor) to clean up list listeners.add(actorRef) // send current state back as reference point - actorRef ! CurrentState(self, currentState.stateName) + actorRef ! CurrentState(self, currentState.stateName, currentState.timeout) case UnsubscribeTransitionCallBack(actorRef) ⇒ listeners.remove(actorRef) case Deafen(actorRef) ⇒ listeners.remove(actorRef) - case value ⇒ { + case value ⇒ if (timeoutFuture.isDefined) { timeoutFuture.get.cancel() timeoutFuture = None } generation += 1 processMsg(value, sender()) - } } private def processMsg(value: Any, source: AnyRef): Unit = { @@ -676,7 +473,7 @@ trait FSM[S, D, E] extends Actor with Listeners with ActorLogging { if (currentState.stateName != nextState.stateName || nextState.notifies) { this.nextState = nextState handleTransition(currentState.stateName, nextState.stateName) - gossip(Transition(self, currentState.stateName, nextState.stateName)) + gossip(Transition(self, currentState.stateName, nextState.stateName, nextState.timeout)) this.nextState = null } currentState = nextState @@ -723,7 +520,7 @@ trait FSM[S, D, E] extends Actor with Listeners with ActorLogging { } /** - * By default [[FSM.Failure]] is logged at error level and other reason + * By default [[PersistentFSM.Failure]] is logged at error level and other reason * types are not logged. It is possible to override this behavior. */ protected def logTermination(reason: Reason): Unit = reason match { @@ -739,9 +536,9 @@ trait FSM[S, D, E] extends Actor with Listeners with ActorLogging { * * @since 1.2 */ -trait LoggingFSM[S, D, E] extends FSM[S, D, E] { this: Actor ⇒ +trait LoggingPersistentFSM[S, D, E] extends PersistentFSMBase[S, D, E] { this: Actor ⇒ - import akka.persistence.fsm.FSM._ + import akka.persistence.fsm.PersistentFSM._ def logDepth: Int = 0 @@ -808,7 +605,7 @@ trait LoggingFSM[S, D, E] extends FSM[S, D, E] { this: Actor ⇒ * * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. */ -object AbstractFSM { +object AbstractPersistentFSMBase { /** * A partial function value which does not match anything and can be used to * “reset” `whenUnhandled` and `onTermination` handlers. @@ -817,7 +614,7 @@ object AbstractFSM { * onTermination(FSM.NullFunction()) * }}} */ - def NullFunction[S, D]: PartialFunction[S, D] = FSM.NullFunction + def NullFunction[S, D]: PartialFunction[S, D] = PersistentFSM.NullFunction } /** @@ -827,12 +624,12 @@ object AbstractFSM { * * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. */ -abstract class AbstractFSM[S, D, E] extends FSM[S, D, E] { +abstract class AbstractPersistentFSMBase[S, D, E] extends PersistentFSMBase[S, D, E] { import akka.persistence.fsm.japi.pf.FSMStateFunctionBuilder import akka.persistence.fsm.japi.pf.FSMStopBuilder import akka.japi.pf.FI._ import java.util.{ List ⇒ JList } - import FSM._ + import PersistentFSM._ /** * Insert a new StateFunction at the end of the processing chain for the @@ -1084,7 +881,7 @@ abstract class AbstractFSM[S, D, E] extends FSM[S, D, E] { /** * Create an [[akka.japi.pf.FSMStopBuilder]] with the first case statement set. * - * A case statement that matches on an [[FSM.Reason]]. + * A case statement that matches on an [[PersistentFSM.Reason]]. * * @param reason the reason for the termination * @param apply an action to apply to the event and state data if there is a match @@ -1162,20 +959,11 @@ abstract class AbstractFSM[S, D, E] extends FSM[S, D, E] { /** * Default reason if calling `stop()`. */ - val Normal: FSM.Reason = FSM.Normal + val Normal: PersistentFSM.Reason = PersistentFSM.Normal /** * Reason given when someone was calling `system.stop(fsm)` from outside; * also applies to `Stop` supervision directive. */ - val Shutdown: FSM.Reason = FSM.Shutdown + val Shutdown: PersistentFSM.Reason = PersistentFSM.Shutdown } - -/** - * Java API: compatible with lambda expressions - * - * Finite State Machine actor abstract base class. - * - * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. - */ -abstract class AbstractLoggingFSM[S, D, E] extends AbstractFSM[S, D, E] with LoggingFSM[S, D, E] diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index 9f15010f1c..61e7dc81cf 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -7,7 +7,7 @@ package akka.persistence.serialization import akka.actor.{ ActorPath, ExtendedActorSystem } import akka.persistence.AtLeastOnceDelivery._ import akka.persistence._ -import akka.persistence.fsm.PersistentFsmActor.StateChangeEvent +import akka.persistence.fsm.PersistentFSM.StateChangeEvent import akka.persistence.serialization.{ MessageFormats ⇒ mf } import akka.serialization._ import com.google.protobuf._ @@ -23,7 +23,7 @@ import scala.language.existentials trait Message extends Serializable /** - * Protobuf serializer for [[akka.persistence.PersistentRepr]], [[akka.persistence.AtLeastOnceDelivery]] and [[akka.persistence.fsm.PersistentFsmActor.StateChangeEvent]] messages. + * Protobuf serializer for [[akka.persistence.PersistentRepr]], [[akka.persistence.AtLeastOnceDelivery]] and [[akka.persistence.fsm.PersistentFSM.StateChangeEvent]] messages. */ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer { import PersistentRepr.Undefined diff --git a/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java b/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java similarity index 81% rename from akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java rename to akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java index b34065dddc..fef46cc316 100644 --- a/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java +++ b/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java @@ -19,33 +19,33 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; -import akka.persistence.fsm.FSM.CurrentState; +import akka.persistence.fsm.PersistentFSM.CurrentState; import org.junit.Test; import scala.concurrent.duration.Duration; -import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.UserState; -import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.ShoppingCart; -import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.Item; +import static akka.persistence.fsm.AbstractPersistentFSMTest.WebStoreCustomerFSM.UserState; +import static akka.persistence.fsm.AbstractPersistentFSMTest.WebStoreCustomerFSM.ShoppingCart; +import static akka.persistence.fsm.AbstractPersistentFSMTest.WebStoreCustomerFSM.Item; -import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.GetCurrentCart; -import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.AddItem; -import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.Buy; -import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.Leave; +import static akka.persistence.fsm.AbstractPersistentFSMTest.WebStoreCustomerFSM.GetCurrentCart; +import static akka.persistence.fsm.AbstractPersistentFSMTest.WebStoreCustomerFSM.AddItem; +import static akka.persistence.fsm.AbstractPersistentFSMTest.WebStoreCustomerFSM.Buy; +import static akka.persistence.fsm.AbstractPersistentFSMTest.WebStoreCustomerFSM.Leave; -import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.PurchaseWasMade; -import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.ShoppingCardDiscarded; +import static akka.persistence.fsm.AbstractPersistentFSMTest.WebStoreCustomerFSM.PurchaseWasMade; +import static akka.persistence.fsm.AbstractPersistentFSMTest.WebStoreCustomerFSM.ShoppingCardDiscarded; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.matchers.JUnitMatchers.hasItems; -public class AbstractPersistentFsmActorTest { +public class AbstractPersistentFSMTest { private static Option none = Option.none(); @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("PersistentFSMJavaTest", PersistenceSpec.config( - "leveldb", "AbstractPersistentFsmActorTest", "off", none.asScala())); + "leveldb", "AbstractPersistentFSMTest", "off", none.asScala())); private final ActorSystem system = actorSystemResource.getSystem(); @@ -56,10 +56,10 @@ public class AbstractPersistentFsmActorTest { public void fsmFunctionalTest() throws Exception { new JavaTestKit(system) {{ String persistenceId = generateId(); - ActorRef fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)); + ActorRef fsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, dummyReportActorRef)); watch(fsmRef); - fsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef()); + fsmRef.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); Item shirt = new Item("1", "Shirt", 59.99F); Item shoes = new Item("2", "Shoes", 89.99F); @@ -76,13 +76,13 @@ public class AbstractPersistentFsmActorTest { fsmRef.tell(GetCurrentCart.INSTANCE, getRef()); fsmRef.tell(Leave.INSTANCE, getRef()); - CurrentState currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class); + CurrentState currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class); assertEquals(currentState.state(), UserState.LOOKING_AROUND); ShoppingCart shoppingCart = expectMsgClass(ShoppingCart.class); assertTrue(shoppingCart.getItems().isEmpty()); - FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class); + PersistentFSM.Transition stateTransition = expectMsgClass(PersistentFSM.Transition.class); assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING); shoppingCart = expectMsgClass(ShoppingCart.class); @@ -94,7 +94,7 @@ public class AbstractPersistentFsmActorTest { shoppingCart = expectMsgClass(ShoppingCart.class); assertThat(shoppingCart.getItems(), hasItems(shirt, shoes, coat)); - stateTransition = expectMsgClass(FSM.Transition.class); + stateTransition = expectMsgClass(PersistentFSM.Transition.class); assertTransition(stateTransition, fsmRef, UserState.SHOPPING, UserState.PAID); shoppingCart = expectMsgClass(ShoppingCart.class); @@ -109,25 +109,25 @@ public class AbstractPersistentFsmActorTest { public void fsmTimeoutTest() throws Exception { new JavaTestKit(system) {{ String persistenceId = generateId(); - ActorRef fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)); + ActorRef fsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, dummyReportActorRef)); watch(fsmRef); - fsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef()); + fsmRef.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); Item shirt = new Item("1", "Shirt", 59.99F); fsmRef.tell(new AddItem(shirt), getRef()); - CurrentState currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class); + CurrentState currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class); assertEquals(currentState.state(), UserState.LOOKING_AROUND); - FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class); + PersistentFSM.Transition stateTransition = expectMsgClass(PersistentFSM.Transition.class); assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING); new Within(duration("0.9 seconds"), duration("1.9 seconds")) { @Override protected void run() { - FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class); + PersistentFSM.Transition stateTransition = expectMsgClass(PersistentFSM.Transition.class); assertTransition(stateTransition, fsmRef, UserState.SHOPPING, UserState.INACTIVE); } }; @@ -145,10 +145,10 @@ public class AbstractPersistentFsmActorTest { public void testSuccessfulRecoveryWithCorrectStateData() { new JavaTestKit(system) {{ String persistenceId = generateId(); - ActorRef fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)); + ActorRef fsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, dummyReportActorRef)); watch(fsmRef); - fsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef()); + fsmRef.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); Item shirt = new Item("1", "Shirt", 59.99F); Item shoes = new Item("2", "Shoes", 89.99F); @@ -160,13 +160,13 @@ public class AbstractPersistentFsmActorTest { fsmRef.tell(new AddItem(shoes), getRef()); fsmRef.tell(GetCurrentCart.INSTANCE, getRef()); - CurrentState currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class); + CurrentState currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class); assertEquals(currentState.state(), UserState.LOOKING_AROUND); ShoppingCart shoppingCart = expectMsgClass(ShoppingCart.class); assertTrue(shoppingCart.getItems().isEmpty()); - FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class); + PersistentFSM.Transition stateTransition = expectMsgClass(PersistentFSM.Transition.class); assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING); shoppingCart = expectMsgClass(ShoppingCart.class); @@ -178,9 +178,9 @@ public class AbstractPersistentFsmActorTest { fsmRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); expectTerminated(fsmRef); - ActorRef recoveredFsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)); + ActorRef recoveredFsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, dummyReportActorRef)); watch(recoveredFsmRef); - recoveredFsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef()); + recoveredFsmRef.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); recoveredFsmRef.tell(GetCurrentCart.INSTANCE, getRef()); @@ -191,7 +191,7 @@ public class AbstractPersistentFsmActorTest { recoveredFsmRef.tell(GetCurrentCart.INSTANCE, getRef()); recoveredFsmRef.tell(Leave.INSTANCE, getRef()); - currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class); + currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class); assertEquals(currentState.state(), UserState.SHOPPING); shoppingCart = expectMsgClass(ShoppingCart.class); @@ -200,7 +200,7 @@ public class AbstractPersistentFsmActorTest { shoppingCart = expectMsgClass(ShoppingCart.class); assertThat(shoppingCart.getItems(), hasItems(shirt, shoes, coat)); - stateTransition = expectMsgClass(FSM.Transition.class); + stateTransition = expectMsgClass(PersistentFSM.Transition.class); assertTransition(stateTransition, recoveredFsmRef, UserState.SHOPPING, UserState.PAID); shoppingCart = expectMsgClass(ShoppingCart.class); @@ -216,10 +216,10 @@ public class AbstractPersistentFsmActorTest { String persistenceId = generateId(); TestProbe reportActorProbe = new TestProbe(system); - ActorRef fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, reportActorProbe.ref())); + ActorRef fsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, reportActorProbe.ref())); watch(fsmRef); - fsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef()); + fsmRef.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); Item shirt = new Item("1", "Shirt", 59.99F); Item shoes = new Item("2", "Shoes", 89.99F); @@ -231,13 +231,13 @@ public class AbstractPersistentFsmActorTest { fsmRef.tell(Buy.INSTANCE, getRef()); fsmRef.tell(Leave.INSTANCE, getRef()); - CurrentState currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class); + CurrentState currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class); assertEquals(currentState.state(), UserState.LOOKING_AROUND); - FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class); + PersistentFSM.Transition stateTransition = expectMsgClass(PersistentFSM.Transition.class); assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING); - stateTransition = expectMsgClass(FSM.Transition.class); + stateTransition = expectMsgClass(PersistentFSM.Transition.class); assertTransition(stateTransition, fsmRef, UserState.SHOPPING, UserState.PAID); PurchaseWasMade purchaseWasMade = reportActorProbe.expectMsgClass(PurchaseWasMade.class); @@ -253,10 +253,10 @@ public class AbstractPersistentFsmActorTest { String persistenceId = generateId(); TestProbe reportActorProbe = new TestProbe(system); - ActorRef fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, reportActorProbe.ref())); + ActorRef fsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, reportActorProbe.ref())); watch(fsmRef); - fsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef()); + fsmRef.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); Item shirt = new Item("1", "Shirt", 59.99F); Item shoes = new Item("2", "Shoes", 89.99F); @@ -267,10 +267,10 @@ public class AbstractPersistentFsmActorTest { fsmRef.tell(new AddItem(coat), getRef()); fsmRef.tell(Leave.INSTANCE, getRef()); - CurrentState currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class); + CurrentState currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class); assertEquals(currentState.state(), UserState.LOOKING_AROUND); - FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class); + PersistentFSM.Transition stateTransition = expectMsgClass(PersistentFSM.Transition.class); assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING); reportActorProbe.expectMsgClass(ShoppingCardDiscarded.class); @@ -283,37 +283,37 @@ public class AbstractPersistentFsmActorTest { public void testCorrectStateTimeoutFollowingRecovery() { new JavaTestKit(system) {{ String persistenceId = generateId(); - ActorRef fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)); + ActorRef fsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, dummyReportActorRef)); watch(fsmRef); - fsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef()); + fsmRef.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); Item shirt = new Item("1", "Shirt", 59.99F); fsmRef.tell(new AddItem(shirt), getRef()); - CurrentState currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class); + CurrentState currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class); assertEquals(currentState.state(), UserState.LOOKING_AROUND); - FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class); + PersistentFSM.Transition stateTransition = expectMsgClass(PersistentFSM.Transition.class); assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING); expectNoMsg(duration("0.6seconds")); //randomly chosen delay, less than the timeout, before stopping the FSM fsmRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); expectTerminated(fsmRef); - final ActorRef recoveredFsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)); + final ActorRef recoveredFsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, dummyReportActorRef)); watch(recoveredFsmRef); - recoveredFsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef()); + recoveredFsmRef.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); - currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class); + currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class); assertEquals(currentState.state(), UserState.SHOPPING); new Within(duration("0.9 seconds"), duration("1.9 seconds")) { @Override protected void run() { - FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class); + PersistentFSM.Transition stateTransition = expectMsgClass(PersistentFSM.Transition.class); assertTransition(stateTransition, recoveredFsmRef, UserState.SHOPPING, UserState.INACTIVE); } }; @@ -322,11 +322,11 @@ public class AbstractPersistentFsmActorTest { recoveredFsmRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); expectTerminated(recoveredFsmRef); - final ActorRef recoveredFsmRef2 = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)); + final ActorRef recoveredFsmRef2 = system.actorOf(WebStoreCustomerFSM.props(persistenceId, dummyReportActorRef)); watch(recoveredFsmRef2); - recoveredFsmRef2.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef()); + recoveredFsmRef2.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); - currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class); + currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class); assertEquals(currentState.state(), UserState.INACTIVE); new Within(duration("1.9 seconds"), duration("2.9 seconds")) { @@ -339,7 +339,7 @@ public class AbstractPersistentFsmActorTest { } - private static void assertTransition(FSM.Transition transition, ActorRef ref, From from, To to) { + private static void assertTransition(PersistentFSM.Transition transition, ActorRef ref, From from, To to) { assertEquals(ref, transition.fsmRef()); assertEquals(from, transition.from()); assertEquals(to, transition.to()); @@ -350,11 +350,11 @@ public class AbstractPersistentFsmActorTest { } - public static class WebStoreCustomerFSMActor extends AbstractPersistentFsmActor { + public static class WebStoreCustomerFSM extends AbstractPersistentFSM { //State name //#customer-states - enum UserState implements PersistentFsmActor.FSMState { + enum UserState implements PersistentFSM.FSMState { LOOKING_AROUND("Looking Around"), SHOPPING("Shopping"), INACTIVE("Inactive"), @@ -506,10 +506,10 @@ public class AbstractPersistentFsmActorTest { } public static Props props(String persistenceId, ActorRef reportActor) { - return Props.create(WebStoreCustomerFSMActor.class, persistenceId, reportActor); + return Props.create(WebStoreCustomerFSM.class, persistenceId, reportActor); } - public WebStoreCustomerFSMActor(String persistenceId, ActorRef reportActor) { + public WebStoreCustomerFSM(String persistenceId, ActorRef reportActor) { this.persistenceId = persistenceId; //#customer-fsm-body diff --git a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala index 6541898209..5e57dcbaf3 100644 --- a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala @@ -6,8 +6,7 @@ package akka.persistence.fsm import akka.actor._ import akka.persistence.PersistenceSpec -import akka.persistence.fsm.FSM.{ CurrentState, SubscribeTransitionCallBack, Transition } -import akka.persistence.fsm.PersistentFsmActor.FSMState +import akka.persistence.fsm.PersistentFSM._ import akka.testkit._ import com.typesafe.config.Config @@ -16,16 +15,16 @@ import scala.language.postfixOps import scala.reflect.ClassTag @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -abstract class PersistentFSMActorSpec(config: Config) extends PersistenceSpec(config) with ImplicitSender { - import PersistentFSMActorSpec._ +abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) with ImplicitSender { + import PersistentFSMSpec._ //Dummy report actor, for tests that don't need it val dummyReportActorRef = TestProbe().ref - "Persistent FSM Actor" must { + "PersistentFSM" must { "function as a regular FSM " in { val persistenceId = name - val fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)) + val fsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, dummyReportActorRef)) watch(fsmRef) fsmRef ! SubscribeTransitionCallBack(testActor) @@ -45,15 +44,15 @@ abstract class PersistentFSMActorSpec(config: Config) extends PersistenceSpec(co fsmRef ! GetCurrentCart fsmRef ! Leave - expectMsg(CurrentState(fsmRef, LookingAround)) + expectMsg(CurrentState(fsmRef, LookingAround, None)) expectMsg(EmptyShoppingCart) - expectMsg(Transition(fsmRef, LookingAround, Shopping)) + expectMsg(Transition(fsmRef, LookingAround, Shopping, Some(1 second))) expectMsg(NonEmptyShoppingCart(List(shirt))) expectMsg(NonEmptyShoppingCart(List(shirt, shoes))) expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat))) - expectMsg(Transition(fsmRef, Shopping, Paid)) + expectMsg(Transition(fsmRef, Shopping, Paid, None)) expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat))) expectTerminated(fsmRef) @@ -61,7 +60,7 @@ abstract class PersistentFSMActorSpec(config: Config) extends PersistenceSpec(co "function as a regular FSM on state timeout" taggedAs TimingTest in { val persistenceId = name - val fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)) + val fsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, dummyReportActorRef)) watch(fsmRef) fsmRef ! SubscribeTransitionCallBack(testActor) @@ -70,22 +69,20 @@ abstract class PersistentFSMActorSpec(config: Config) extends PersistenceSpec(co fsmRef ! AddItem(shirt) - expectMsg(CurrentState(fsmRef, LookingAround)) - expectMsg(Transition(fsmRef, LookingAround, Shopping)) + expectMsg(CurrentState(fsmRef, LookingAround, None)) + expectMsg(Transition(fsmRef, LookingAround, Shopping, Some(1 second))) within(0.9 seconds, 1.9 seconds) { - expectMsg(Transition(fsmRef, Shopping, Inactive)) + expectMsg(Transition(fsmRef, Shopping, Inactive, Some(2 seconds))) } - within(1.9 seconds, 2.9 seconds) { - expectTerminated(fsmRef) - } + expectTerminated(fsmRef) } "recover successfully with correct state data" in { val persistenceId = name - val fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)) + val fsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, dummyReportActorRef)) watch(fsmRef) fsmRef ! SubscribeTransitionCallBack(testActor) @@ -99,17 +96,17 @@ abstract class PersistentFSMActorSpec(config: Config) extends PersistenceSpec(co fsmRef ! AddItem(shoes) fsmRef ! GetCurrentCart - expectMsg(CurrentState(fsmRef, LookingAround)) + expectMsg(CurrentState(fsmRef, LookingAround, None)) expectMsg(EmptyShoppingCart) - expectMsg(Transition(fsmRef, LookingAround, Shopping)) + expectMsg(Transition(fsmRef, LookingAround, Shopping, Some(1 second))) expectMsg(NonEmptyShoppingCart(List(shirt))) expectMsg(NonEmptyShoppingCart(List(shirt, shoes))) fsmRef ! PoisonPill expectTerminated(fsmRef) - val recoveredFsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)) + val recoveredFsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, dummyReportActorRef)) watch(recoveredFsmRef) recoveredFsmRef ! SubscribeTransitionCallBack(testActor) @@ -122,12 +119,12 @@ abstract class PersistentFSMActorSpec(config: Config) extends PersistenceSpec(co recoveredFsmRef ! GetCurrentCart recoveredFsmRef ! Leave - expectMsg(CurrentState(recoveredFsmRef, Shopping)) + expectMsg(CurrentState(recoveredFsmRef, Shopping, None)) expectMsg(NonEmptyShoppingCart(List(shirt, shoes))) expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat))) - expectMsg(Transition(recoveredFsmRef, Shopping, Paid)) + expectMsg(Transition(recoveredFsmRef, Shopping, Paid, None)) expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat))) expectTerminated(recoveredFsmRef) @@ -137,7 +134,7 @@ abstract class PersistentFSMActorSpec(config: Config) extends PersistenceSpec(co val persistenceId = name val reportActorProbe = TestProbe() - val fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, reportActorProbe.ref)) + val fsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, reportActorProbe.ref)) watch(fsmRef) fsmRef ! SubscribeTransitionCallBack(testActor) @@ -151,9 +148,9 @@ abstract class PersistentFSMActorSpec(config: Config) extends PersistenceSpec(co fsmRef ! Buy fsmRef ! Leave - expectMsg(CurrentState(fsmRef, LookingAround)) - expectMsg(Transition(fsmRef, LookingAround, Shopping)) - expectMsg(Transition(fsmRef, Shopping, Paid)) + expectMsg(CurrentState(fsmRef, LookingAround, None)) + expectMsg(Transition(fsmRef, LookingAround, Shopping, Some(1 second))) + expectMsg(Transition(fsmRef, Shopping, Paid, None)) reportActorProbe.expectMsg(PurchaseWasMade(List(shirt, shoes, coat))) expectTerminated(fsmRef) } @@ -162,7 +159,7 @@ abstract class PersistentFSMActorSpec(config: Config) extends PersistenceSpec(co val persistenceId = name val reportActorProbe = TestProbe() - val fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, reportActorProbe.ref)) + val fsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, reportActorProbe.ref)) watch(fsmRef) fsmRef ! SubscribeTransitionCallBack(testActor) @@ -175,15 +172,15 @@ abstract class PersistentFSMActorSpec(config: Config) extends PersistenceSpec(co fsmRef ! AddItem(coat) fsmRef ! Leave - expectMsg(CurrentState(fsmRef, LookingAround)) - expectMsg(Transition(fsmRef, LookingAround, Shopping)) + expectMsg(CurrentState(fsmRef, LookingAround, None)) + expectMsg(Transition(fsmRef, LookingAround, Shopping, Some(1 second))) reportActorProbe.expectMsg(ShoppingCardDiscarded) expectTerminated(fsmRef) } "recover successfully with correct state timeout" taggedAs TimingTest in { val persistenceId = name - val fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)) + val fsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, dummyReportActorRef)) watch(fsmRef) fsmRef ! SubscribeTransitionCallBack(testActor) @@ -192,42 +189,39 @@ abstract class PersistentFSMActorSpec(config: Config) extends PersistenceSpec(co fsmRef ! AddItem(shirt) - expectMsg(CurrentState(fsmRef, LookingAround)) - expectMsg(Transition(fsmRef, LookingAround, Shopping)) + expectMsg(CurrentState(fsmRef, LookingAround, None)) + expectMsg(Transition(fsmRef, LookingAround, Shopping, Some(1 second))) - expectNoMsg(0.6 seconds) //randomly chosen delay, less than the timeout, before stopping the FSM + expectNoMsg(0.6 seconds) // arbitrarily chosen delay, less than the timeout, before stopping the FSM fsmRef ! PoisonPill expectTerminated(fsmRef) - var recoveredFsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)) + var recoveredFsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, dummyReportActorRef)) watch(recoveredFsmRef) recoveredFsmRef ! SubscribeTransitionCallBack(testActor) - expectMsg(CurrentState(recoveredFsmRef, Shopping)) + expectMsg(CurrentState(recoveredFsmRef, Shopping, Some(1 second))) within(0.9 seconds, 1.9 seconds) { - expectMsg(Transition(recoveredFsmRef, Shopping, Inactive)) + expectMsg(Transition(recoveredFsmRef, Shopping, Inactive, Some(2 seconds))) } - expectNoMsg(0.9 seconds) //randomly chosen delay, less than the timeout, before stopping the FSM + expectNoMsg(0.6 seconds) // arbitrarily chosen delay, less than the timeout, before stopping the FSM recoveredFsmRef ! PoisonPill expectTerminated(recoveredFsmRef) - recoveredFsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)) + recoveredFsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, dummyReportActorRef)) watch(recoveredFsmRef) recoveredFsmRef ! SubscribeTransitionCallBack(testActor) - expectMsg(CurrentState(recoveredFsmRef, Inactive)) - - within(1.9 seconds, 2.9 seconds) { - expectTerminated(recoveredFsmRef) - } + expectMsg(CurrentState(recoveredFsmRef, Inactive, Some(2 seconds))) + expectTerminated(recoveredFsmRef) } "not trigger onTransition for stay()" taggedAs TimingTest in { val persistenceId = name val probe = TestProbe() - val fsmRef = system.actorOf(SimpleTransitionFSMActor.props(persistenceId, probe.ref)) + val fsmRef = system.actorOf(SimpleTransitionFSM.props(persistenceId, probe.ref)) probe.expectMsg(3.seconds, "LookingAround -> LookingAround") // caused by initialize(), OK @@ -241,7 +235,7 @@ abstract class PersistentFSMActorSpec(config: Config) extends PersistenceSpec(co } -object PersistentFSMActorSpec { +object PersistentFSMSpec { //#customer-states sealed trait UserState extends FSMState case object LookingAround extends UserState { @@ -295,7 +289,7 @@ object PersistentFSMActorSpec { case class PurchaseWasMade(items: Seq[Item]) extends ReportEvent case object ShoppingCardDiscarded extends ReportEvent - class SimpleTransitionFSMActor(_persistenceId: String, reportActor: ActorRef)(implicit val domainEventClassTag: ClassTag[DomainEvent]) extends PersistentFsmActor[UserState, ShoppingCart, DomainEvent] { + class SimpleTransitionFSM(_persistenceId: String, reportActor: ActorRef)(implicit val domainEventClassTag: ClassTag[DomainEvent]) extends PersistentFSM[UserState, ShoppingCart, DomainEvent] { override val persistenceId = _persistenceId startWith(LookingAround, EmptyShoppingCart) @@ -312,12 +306,14 @@ object PersistentFSMActorSpec { override def applyEvent(domainEvent: DomainEvent, currentData: ShoppingCart): ShoppingCart = currentData } - object SimpleTransitionFSMActor { + object SimpleTransitionFSM { def props(persistenceId: String, reportActor: ActorRef) = - Props(new SimpleTransitionFSMActor(persistenceId, reportActor)) + Props(new SimpleTransitionFSM(persistenceId, reportActor)) } - class WebStoreCustomerFSMActor(_persistenceId: String, reportActor: ActorRef)(implicit val domainEventClassTag: ClassTag[DomainEvent]) extends PersistentFsmActor[UserState, ShoppingCart, DomainEvent] { + class WebStoreCustomerFSM(_persistenceId: String, reportActor: ActorRef)(implicit val domainEventClassTag: ClassTag[DomainEvent]) + extends PersistentFSM[UserState, ShoppingCart, DomainEvent] { + override def persistenceId = _persistenceId //#customer-fsm-body @@ -381,11 +377,11 @@ object PersistentFSMActorSpec { //#customer-apply-event } - object WebStoreCustomerFSMActor { + object WebStoreCustomerFSM { def props(persistenceId: String, reportActor: ActorRef) = - Props(new WebStoreCustomerFSMActor(persistenceId, reportActor)) + Props(new WebStoreCustomerFSM(persistenceId, reportActor)) } } -class LeveldbPersistentFSMActorSpec extends PersistentFSMActorSpec(PersistenceSpec.config("leveldb", "PersistentFSMActorSpec")) -class InmemPersistentFSMActorSpec extends PersistentFSMActorSpec(PersistenceSpec.config("inmem", "PersistentFSMActorSpec")) +class LeveldbPersistentFSMSpec extends PersistentFSMSpec(PersistenceSpec.config("leveldb", "PersistentFSMSpec")) +class InmemPersistentFSMSpec extends PersistentFSMSpec(PersistenceSpec.config("inmem", "PersistentFSMSpec"))