Merge pull request #16255 from leonidb/master

+per #15279 FSM for PersistentActor
This commit is contained in:
Patrik Nordwall 2015-06-01 13:13:47 +02:00
commit d2a00d3b98
22 changed files with 3595 additions and 16 deletions

View file

@ -128,6 +128,26 @@ public final class FI {
public void apply(I1 i1, I2 i2, I3 i3) throws Exception;
}
/**
* Functional interface for an application.
*
* @param <I1> the first input type, that this Apply will be applied to
* @param <I2> the second input type, that this Apply will be applied to
* @param <I3> the third input type, that this Apply will be applied to
* @param <I4> the fourth input type, that this Apply will be applied to
*/
public static interface UnitApply4<I1, I2, I3, I4> {
/**
* The application to perform.
*
* @param i1 an instance that the application is performed on
* @param i2 an instance that the application is performed on
* @param i3 an instance that the application is performed on
* @param i4 an instance that the application is performed on
*/
public void apply(I1 i1, I2 i2, I3 i3, I4 i4) throws Exception;
}
/**
* Functional interface for an application.
*/

View file

@ -488,6 +488,61 @@ not accept more messages and it will throw ``AtLeastOnceDelivery.MaxUnconfirmedM
The default value can be configured with the ``akka.persistence.at-least-once-delivery.max-unconfirmed-messages``
configuration key. The method can be overridden by implementation classes to return non-default values.
.. _persistent-fsm-java-lambda:
Persistent FSM
==============
``AbstractPersistentFSMActor`` handles the incoming messages in an FSM like fashion.
Its internal state is persisted as a sequence of changes, later referred to as domain events.
Relationship between incoming messages, FSM's states and transitions, persistence of domain events is defined by a DSL.
A Simple Example
----------------
To demonstrate the features of the ``AbstractPersistentFSMActor``, consider an actor which represents a Web store customer.
The contract of our "WebStoreCustomerFSMActor" is that it accepts the following commands:
.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java#customer-commands
``AddItem`` sent when the customer adds an item to a shopping cart
``Buy`` - when the customer finishes the purchase
``Leave`` - when the customer leaves the store without purchasing anything
``GetCurrentCart`` allows to query the current state of customer's shopping cart
The customer can be in one of the following states:
.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java#customer-states
``LookingAround`` customer is browsing the site, but hasn't added anything to the shopping cart
``Shopping`` customer has recently added items to the shopping cart
``Inactive`` customer has items in the shopping cart, but hasn't added anything recently,
``Paid`` customer has purchased the items
.. note::
``AbstractPersistentFSMActor`` states must inherit from ``PersistentFsmActor.FSMState`` and implement the
``String identifier()`` method. This is required in order to simplify the serialization of FSM states.
String identifiers should be unique!
Customer's actions are "recorded" as a sequence of "domain events", which are persisted. Those events are replayed on actor's
start in order to restore the latest customer's state:
.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java#customer-domain-events
Customer state data represents the items in customer's shopping cart:
.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java#customer-states-data
Here is how everything is wired together:
.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java#customer-fsm-body
.. note::
State data can only be modified directly on initialization. Later it's modified only as a result of applying domain events.
Override the ``applyEvent`` method to define how state data is affected by domain events, see the example below
.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java#customer-apply-event
Storage plugins
===============

View file

@ -490,6 +490,61 @@ not accept more messages and it will throw ``AtLeastOnceDelivery.MaxUnconfirmedM
The default value can be configured with the ``akka.persistence.at-least-once-delivery.max-unconfirmed-messages``
configuration key. The method can be overridden by implementation classes to return non-default values.
.. _persistent-fsm:
Persistent FSM
==============
``PersistentFSMActor`` handles the incoming messages in an FSM like fashion.
Its internal state is persisted as a sequence of changes, later referred to as domain events.
Relationship between incoming messages, FSM's states and transitions, persistence of domain events is defined by a DSL.
A Simple Example
----------------
To demonstrate the features of the ``PersistentFSMActor`` trait, consider an actor which represents a Web store customer.
The contract of our "WebStoreCustomerFSMActor" is that it accepts the following commands:
.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala#customer-commands
``AddItem`` sent when the customer adds an item to a shopping cart
``Buy`` - when the customer finishes the purchase
``Leave`` - when the customer leaves the store without purchasing anything
``GetCurrentCart`` allows to query the current state of customer's shopping cart
The customer can be in one of the following states:
.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala#customer-states
``LookingAround`` customer is browsing the site, but hasn't added anything to the shopping cart
``Shopping`` customer has recently added items to the shopping cart
``Inactive`` customer has items in the shopping cart, but hasn't added anything recently,
``Paid`` customer has purchased the items
.. note::
``PersistentFSMActor`` states must inherit from trait ``PersistentFsmActor.FSMState`` and implement the
``def identifier: String`` method. This is required in order to simplify the serialization of FSM states.
String identifiers should be unique!
Customer's actions are "recorded" as a sequence of "domain events", which are persisted. Those events are replayed on actor's
start in order to restore the latest customer's state:
.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala#customer-domain-events
Customer state data represents the items in customer's shopping cart:
.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala#customer-states-data
Here is how everything is wired together:
.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala#customer-fsm-body
.. note::
State data can only be modified directly on initialization. Later it's modified only as a result of applying domain events.
Override the ``applyEvent`` method to define how state data is affected by domain events, see the example below
.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala#customer-apply-event
.. _storage-plugins:
Storage plugins

View file

@ -0,0 +1,272 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.fsm.japi.pf;
import akka.persistence.fsm.FSM;
import akka.japi.pf.FI;
import akka.japi.pf.PFBuilder;
import scala.PartialFunction;
import java.util.List;
/**
* Builder used to create a partial function for {@link akka.actor.FSM#whenUnhandled}.
*
* @param <S> the state type
* @param <D> the data type
* @param <E> the domain event type
*
* This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing.
*/
@SuppressWarnings("rawtypes")
public class FSMStateFunctionBuilder<S, D, E> {
private PFBuilder<FSM.Event<D>, FSM.State<S, D, E>> builder =
new PFBuilder<FSM.Event<D>, FSM.State<S, D, E>>();
/**
* An erased processing of the event matcher. The compile time checks are enforced
* by the public typed versions.
*
* It works like this.
*
* If eventOrType or dataOrType is a Class, then we do a isInstance check,
* otherwise we do an equals check. The null value compares true for anything.
* If the predicate is null, it is skipped otherwise the predicate has to match
* as well.
*
* @param eventOrType an event or a type to match against
* @param dataOrType a data instance or a type to match against
* @param predicate a predicate to match against
* @param apply an action to apply to the event and state data if there is a match
* @return the builder with the case statement added
*/
private FSMStateFunctionBuilder<S, D, E> erasedEvent(final Object eventOrType,
final Object dataOrType,
final FI.TypedPredicate2 predicate,
final FI.Apply2 apply) {
builder.match(FSM.Event.class,
new FI.TypedPredicate<FSM.Event>() {
@Override
public boolean defined(FSM.Event e) {
boolean res = true;
if (eventOrType != null) {
if (eventOrType instanceof Class) {
Class eventType = (Class) eventOrType;
res = eventType.isInstance(e.event());
}
else {
res = eventOrType.equals(e.event());
}
}
if (res && dataOrType != null) {
if (dataOrType instanceof Class) {
Class dataType = (Class) dataOrType;
res = dataType.isInstance(e.stateData());
}
else {
res = dataOrType.equals(e.stateData());
}
}
if (res && predicate != null) {
@SuppressWarnings("unchecked")
boolean ures = predicate.defined(e.event(), e.stateData());
res = ures;
}
return res;
}
},
new FI.Apply<FSM.Event, FSM.State<S, D, E>>() {
public FSM.State<S, D, E> apply(FSM.Event e) throws Exception {
@SuppressWarnings("unchecked")
FSM.State<S, D, E> res = (FSM.State<S, D, E>) apply.apply(e.event(), e.stateData());
return res;
}
}
);
return this;
}
/**
* Add a case statement that matches on an event and data type and a predicate.
*
* @param eventType the event type to match on
* @param dataType the data type to match on
* @param predicate a predicate to evaluate on the matched types
* @param apply an action to apply to the event and state data if there is a match
* @param <P> the event type to match on
* @param <Q> the data type to match on
* @return the builder with the case statement added
*/
public final <P, Q> FSMStateFunctionBuilder<S, D, E> event(final Class<P> eventType,
final Class<Q> dataType,
final FI.TypedPredicate2<P, Q> predicate,
final FI.Apply2<P, Q, FSM.State<S, D, E>> apply) {
erasedEvent(eventType, dataType, predicate, apply);
return this;
}
/**
* Add a case statement that matches on an event and data type.
*
* @param eventType the event type to match on
* @param dataType the data type to match on
* @param apply an action to apply to the event and state data if there is a match
* @param <P> the event type to match on
* @param <Q> the data type to match on
* @return the builder with the case statement added
*/
public <P, Q> FSMStateFunctionBuilder<S, D, E> event(final Class<P> eventType,
final Class<Q> dataType,
final FI.Apply2<P, Q, FSM.State<S, D, E>> apply) {
return erasedEvent(eventType, dataType, null, apply);
}
/**
* Add a case statement that matches if the event type and predicate matches.
*
* @param eventType the event type to match on
* @param predicate a predicate that will be evaluated on the data and the event
* @param apply an action to apply to the event and state data if there is a match
* @return the builder with the case statement added
*/
public <P> FSMStateFunctionBuilder<S, D, E> event(final Class<P> eventType,
final FI.TypedPredicate2<P, D> predicate,
final FI.Apply2<P, D, FSM.State<S, D, E>> apply) {
return erasedEvent(eventType, null, predicate, apply);
}
/**
* Add a case statement that matches if the event type and predicate matches.
*
* @param eventType the event type to match on
* @param apply an action to apply to the event and state data if there is a match
* @return the builder with the case statement added
*/
public <P> FSMStateFunctionBuilder<S, D, E> event(final Class<P> eventType,
final FI.Apply2<P, D, FSM.State<S, D, E>> apply) {
return erasedEvent(eventType, null, null, apply);
}
/**
* Add a case statement that matches if the predicate matches.
*
* @param predicate a predicate that will be evaluated on the data and the event
* @param apply an action to apply to the event and state data if there is a match
* @return the builder with the case statement added
*/
public FSMStateFunctionBuilder<S, D, E> event(final FI.TypedPredicate2<Object, D> predicate,
final FI.Apply2<Object, D, FSM.State<S, D, E>> apply) {
return erasedEvent(null, null, predicate, apply);
}
/**
* Add a case statement that matches on the data type and if any of the event types
* in the list match or any of the event instances in the list compares equal.
*
* @param eventMatches a list of types or instances to match against
* @param dataType the data type to match on
* @param apply an action to apply to the event and state data if there is a match
* @param <Q> the data type to match on
* @return the builder with the case statement added
*/
public <Q> FSMStateFunctionBuilder<S, D, E> event(final List<Object> eventMatches,
final Class<Q> dataType,
final FI.Apply2<Object, Q, FSM.State<S, D, E>> apply) {
builder.match(FSM.Event.class,
new FI.TypedPredicate<FSM.Event>() {
@Override
public boolean defined(FSM.Event e) {
if (dataType != null && !dataType.isInstance(e.stateData()))
return false;
boolean emMatch = false;
Object event = e.event();
for (Object em : eventMatches) {
if (em instanceof Class) {
Class emc = (Class) em;
emMatch = emc.isInstance(event);
} else {
emMatch = event.equals(em);
}
if (emMatch)
break;
}
return emMatch;
}
},
new FI.Apply<FSM.Event, FSM.State<S, D, E>>() {
public FSM.State<S, D, E> apply(FSM.Event e) throws Exception {
@SuppressWarnings("unchecked")
Q q = (Q) e.stateData();
return apply.apply(e.event(), q);
}
}
);
return this;
}
/**
* Add a case statement that matches if any of the event types in the list match or
* any of the event instances in the list compares equal.
*
* @param eventMatches a list of types or instances to match against
* @param apply an action to apply to the event and state data if there is a match
* @return the builder with the case statement added
*/
public FSMStateFunctionBuilder<S, D, E> event(final List<Object> eventMatches,
final FI.Apply2<Object, D, FSM.State<S, D, E>> apply) {
return event(eventMatches, null, apply);
}
/**
* Add a case statement that matches on the data type and if the event compares equal.
*
* @param event an event to compare equal against
* @param dataType the data type to match on
* @param apply an action to apply to the event and state data if there is a match
* @param <Q> the data type to match on
* @return the builder with the case statement added
*/
public <P, Q> FSMStateFunctionBuilder<S, D, E> eventEquals(final P event,
final Class<Q> dataType,
final FI.Apply2<P, Q, FSM.State<S, D, E>> apply) {
return erasedEvent(event, dataType, null, apply);
}
/**
* Add a case statement that matches if event compares equal.
*
* @param event an event to compare equal against
* @param apply an action to apply to the event and state data if there is a match
* @return the builder with the case statement added
*/
public <P> FSMStateFunctionBuilder<S, D, E> eventEquals(final P event,
final FI.Apply2<P, D, FSM.State<S, D, E>> apply) {
return erasedEvent(event, null, null, apply);
}
/**
* Add a case statement that matches on any type of event.
*
* @param apply an action to apply to the event and state data
* @return the builder with the case statement added
*/
public FSMStateFunctionBuilder<S, D, E> anyEvent(final FI.Apply2<Object, D, FSM.State<S, D, E>> apply) {
return erasedEvent(null, null, null, apply);
}
/**
* Build a {@link scala.PartialFunction} from this builder.
* After this call the builder will be reset.
*
* @return a PartialFunction for this builder.
*/
public PartialFunction<FSM.Event<D>, FSM.State<S, D, E>> build() {
return builder.build();
}
}

View file

@ -0,0 +1,125 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.fsm.japi.pf;
import akka.persistence.fsm.FSM;
import akka.japi.pf.FI;
import akka.japi.pf.UnitPFBuilder;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
/**
* Builder used to create a partial function for {@link akka.actor.FSM#onTermination}.
*
* @param <S> the state type
* @param <D> the data type
*
* This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing.
*/
public class FSMStopBuilder<S, D> {
private UnitPFBuilder<FSM.StopEvent<S, D>> builder =
new UnitPFBuilder<FSM.StopEvent<S, D>>();
/**
* Add a case statement that matches on an {@link akka.actor.FSM.Reason}.
*
* @param reason the reason for the termination
* @param apply an action to apply to the event and state data if there is a match
* @return the builder with the case statement added
*/
public FSMStopBuilder<S, D> stop(final FSM.Reason reason,
final FI.UnitApply2<S, D> apply) {
builder.match(FSM.StopEvent.class,
new FI.TypedPredicate<FSM.StopEvent>() {
@Override
public boolean defined(FSM.StopEvent e) {
return reason.equals(e.reason());
}
},
new FI.UnitApply<FSM.StopEvent>() {
public void apply(FSM.StopEvent e) throws Exception {
@SuppressWarnings("unchecked")
S s = (S) e.currentState();
@SuppressWarnings("unchecked")
D d = (D) e.stateData();
apply.apply(s, d);
}
}
);
return this;
}
/**
* Add a case statement that matches on a reason type.
*
* @param reasonType the reason type to match on
* @param apply an action to apply to the reason, event and state data if there is a match
* @param <P> the reason type to match on
* @return the builder with the case statement added
*/
public <P extends FSM.Reason> FSMStopBuilder<S, D> stop(final Class<P> reasonType,
final FI.UnitApply3<P, S, D> apply) {
return this.stop(reasonType,
new FI.TypedPredicate<P>() {
@Override
public boolean defined(P p) {
return true;
}
}, apply);
}
/**
* Add a case statement that matches on a reason type and a predicate.
*
* @param reasonType the reason type to match on
* @param apply an action to apply to the reason, event and state data if there is a match
* @param predicate a predicate that will be evaluated on the reason if the type matches
* @param <P> the reason type to match on
* @return the builder with the case statement added
*/
public <P extends FSM.Reason> FSMStopBuilder<S, D> stop(final Class<P> reasonType,
final FI.TypedPredicate<P> predicate,
final FI.UnitApply3<P, S, D> apply) {
builder.match(FSM.StopEvent.class,
new FI.TypedPredicate<FSM.StopEvent>() {
@Override
public boolean defined(FSM.StopEvent e) {
if (reasonType.isInstance(e.reason())) {
@SuppressWarnings("unchecked")
P p = (P) e.reason();
return predicate.defined(p);
} else {
return false;
}
}
},
new FI.UnitApply<FSM.StopEvent>() {
public void apply(FSM.StopEvent e) throws Exception {
@SuppressWarnings("unchecked")
P p = (P) e.reason();
@SuppressWarnings("unchecked")
S s = (S) e.currentState();
@SuppressWarnings("unchecked")
D d = (D) e.stateData();
apply.apply(p, s, d);
}
}
);
return this;
}
/**
* Build a {@link scala.PartialFunction} from this builder.
* After this call the builder will be reset.
*
* @return a PartialFunction for this builder.
*/
public PartialFunction<FSM.StopEvent<S, D>, BoxedUnit> build() {
return builder.build();
}
}

View file

@ -63,14 +63,38 @@ public final class MessageFormats {
// optional string sender = 11;
/**
* <code>optional string sender = 11;</code>
*
* <pre>
* optional int32 redeliveries = 6; // Removed in 2.4
* repeated string confirms = 7; // Removed in 2.4
* optional bool confirmable = 8; // Removed in 2.4
* optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
* optional string confirmTarget = 10;
* </pre>
*/
boolean hasSender();
/**
* <code>optional string sender = 11;</code>
*
* <pre>
* optional int32 redeliveries = 6; // Removed in 2.4
* repeated string confirms = 7; // Removed in 2.4
* optional bool confirmable = 8; // Removed in 2.4
* optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
* optional string confirmTarget = 10;
* </pre>
*/
java.lang.String getSender();
/**
* <code>optional string sender = 11;</code>
*
* <pre>
* optional int32 redeliveries = 6; // Removed in 2.4
* repeated string confirms = 7; // Removed in 2.4
* optional bool confirmable = 8; // Removed in 2.4
* optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
* optional string confirmTarget = 10;
* </pre>
*/
com.google.protobuf.ByteString
getSenderBytes();
@ -301,12 +325,28 @@ public final class MessageFormats {
private java.lang.Object sender_;
/**
* <code>optional string sender = 11;</code>
*
* <pre>
* optional int32 redeliveries = 6; // Removed in 2.4
* repeated string confirms = 7; // Removed in 2.4
* optional bool confirmable = 8; // Removed in 2.4
* optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
* optional string confirmTarget = 10;
* </pre>
*/
public boolean hasSender() {
return ((bitField0_ & 0x00000010) == 0x00000010);
}
/**
* <code>optional string sender = 11;</code>
*
* <pre>
* optional int32 redeliveries = 6; // Removed in 2.4
* repeated string confirms = 7; // Removed in 2.4
* optional bool confirmable = 8; // Removed in 2.4
* optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
* optional string confirmTarget = 10;
* </pre>
*/
public java.lang.String getSender() {
java.lang.Object ref = sender_;
@ -324,6 +364,14 @@ public final class MessageFormats {
}
/**
* <code>optional string sender = 11;</code>
*
* <pre>
* optional int32 redeliveries = 6; // Removed in 2.4
* repeated string confirms = 7; // Removed in 2.4
* optional bool confirmable = 8; // Removed in 2.4
* optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
* optional string confirmTarget = 10;
* </pre>
*/
public com.google.protobuf.ByteString
getSenderBytes() {
@ -920,12 +968,28 @@ public final class MessageFormats {
private java.lang.Object sender_ = "";
/**
* <code>optional string sender = 11;</code>
*
* <pre>
* optional int32 redeliveries = 6; // Removed in 2.4
* repeated string confirms = 7; // Removed in 2.4
* optional bool confirmable = 8; // Removed in 2.4
* optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
* optional string confirmTarget = 10;
* </pre>
*/
public boolean hasSender() {
return ((bitField0_ & 0x00000010) == 0x00000010);
}
/**
* <code>optional string sender = 11;</code>
*
* <pre>
* optional int32 redeliveries = 6; // Removed in 2.4
* repeated string confirms = 7; // Removed in 2.4
* optional bool confirmable = 8; // Removed in 2.4
* optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
* optional string confirmTarget = 10;
* </pre>
*/
public java.lang.String getSender() {
java.lang.Object ref = sender_;
@ -940,6 +1004,14 @@ public final class MessageFormats {
}
/**
* <code>optional string sender = 11;</code>
*
* <pre>
* optional int32 redeliveries = 6; // Removed in 2.4
* repeated string confirms = 7; // Removed in 2.4
* optional bool confirmable = 8; // Removed in 2.4
* optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
* optional string confirmTarget = 10;
* </pre>
*/
public com.google.protobuf.ByteString
getSenderBytes() {
@ -956,6 +1028,14 @@ public final class MessageFormats {
}
/**
* <code>optional string sender = 11;</code>
*
* <pre>
* optional int32 redeliveries = 6; // Removed in 2.4
* repeated string confirms = 7; // Removed in 2.4
* optional bool confirmable = 8; // Removed in 2.4
* optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
* optional string confirmTarget = 10;
* </pre>
*/
public Builder setSender(
java.lang.String value) {
@ -969,6 +1049,14 @@ public final class MessageFormats {
}
/**
* <code>optional string sender = 11;</code>
*
* <pre>
* optional int32 redeliveries = 6; // Removed in 2.4
* repeated string confirms = 7; // Removed in 2.4
* optional bool confirmable = 8; // Removed in 2.4
* optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
* optional string confirmTarget = 10;
* </pre>
*/
public Builder clearSender() {
bitField0_ = (bitField0_ & ~0x00000010);
@ -978,6 +1066,14 @@ public final class MessageFormats {
}
/**
* <code>optional string sender = 11;</code>
*
* <pre>
* optional int32 redeliveries = 6; // Removed in 2.4
* repeated string confirms = 7; // Removed in 2.4
* optional bool confirmable = 8; // Removed in 2.4
* optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
* optional string confirmTarget = 10;
* </pre>
*/
public Builder setSenderBytes(
com.google.protobuf.ByteString value) {
@ -3135,6 +3231,641 @@ public final class MessageFormats {
// @@protoc_insertion_point(class_scope:AtLeastOnceDeliverySnapshot)
}
public interface PersistentStateChangeEventOrBuilder
extends com.google.protobuf.MessageOrBuilder {
// required string stateIdentifier = 1;
/**
* <code>required string stateIdentifier = 1;</code>
*/
boolean hasStateIdentifier();
/**
* <code>required string stateIdentifier = 1;</code>
*/
java.lang.String getStateIdentifier();
/**
* <code>required string stateIdentifier = 1;</code>
*/
com.google.protobuf.ByteString
getStateIdentifierBytes();
// optional string timeout = 2;
/**
* <code>optional string timeout = 2;</code>
*/
boolean hasTimeout();
/**
* <code>optional string timeout = 2;</code>
*/
java.lang.String getTimeout();
/**
* <code>optional string timeout = 2;</code>
*/
com.google.protobuf.ByteString
getTimeoutBytes();
}
/**
* Protobuf type {@code PersistentStateChangeEvent}
*/
public static final class PersistentStateChangeEvent extends
com.google.protobuf.GeneratedMessage
implements PersistentStateChangeEventOrBuilder {
// Use PersistentStateChangeEvent.newBuilder() to construct.
private PersistentStateChangeEvent(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
super(builder);
this.unknownFields = builder.getUnknownFields();
}
private PersistentStateChangeEvent(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
private static final PersistentStateChangeEvent defaultInstance;
public static PersistentStateChangeEvent getDefaultInstance() {
return defaultInstance;
}
public PersistentStateChangeEvent getDefaultInstanceForType() {
return defaultInstance;
}
private final com.google.protobuf.UnknownFieldSet unknownFields;
@java.lang.Override
public final com.google.protobuf.UnknownFieldSet
getUnknownFields() {
return this.unknownFields;
}
private PersistentStateChangeEvent(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
initFields();
int mutable_bitField0_ = 0;
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder();
try {
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
done = true;
}
break;
}
case 10: {
bitField0_ |= 0x00000001;
stateIdentifier_ = input.readBytes();
break;
}
case 18: {
bitField0_ |= 0x00000002;
timeout_ = input.readBytes();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
throw e.setUnfinishedMessage(this);
} catch (java.io.IOException e) {
throw new com.google.protobuf.InvalidProtocolBufferException(
e.getMessage()).setUnfinishedMessage(this);
} finally {
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.persistence.serialization.MessageFormats.internal_static_PersistentStateChangeEvent_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.persistence.serialization.MessageFormats.internal_static_PersistentStateChangeEvent_fieldAccessorTable
.ensureFieldAccessorsInitialized(
akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent.class, akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent.Builder.class);
}
public static com.google.protobuf.Parser<PersistentStateChangeEvent> PARSER =
new com.google.protobuf.AbstractParser<PersistentStateChangeEvent>() {
public PersistentStateChangeEvent parsePartialFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return new PersistentStateChangeEvent(input, extensionRegistry);
}
};
@java.lang.Override
public com.google.protobuf.Parser<PersistentStateChangeEvent> getParserForType() {
return PARSER;
}
private int bitField0_;
// required string stateIdentifier = 1;
public static final int STATEIDENTIFIER_FIELD_NUMBER = 1;
private java.lang.Object stateIdentifier_;
/**
* <code>required string stateIdentifier = 1;</code>
*/
public boolean hasStateIdentifier() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required string stateIdentifier = 1;</code>
*/
public java.lang.String getStateIdentifier() {
java.lang.Object ref = stateIdentifier_;
if (ref instanceof java.lang.String) {
return (java.lang.String) ref;
} else {
com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
java.lang.String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
stateIdentifier_ = s;
}
return s;
}
}
/**
* <code>required string stateIdentifier = 1;</code>
*/
public com.google.protobuf.ByteString
getStateIdentifierBytes() {
java.lang.Object ref = stateIdentifier_;
if (ref instanceof java.lang.String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
stateIdentifier_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
// optional string timeout = 2;
public static final int TIMEOUT_FIELD_NUMBER = 2;
private java.lang.Object timeout_;
/**
* <code>optional string timeout = 2;</code>
*/
public boolean hasTimeout() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
* <code>optional string timeout = 2;</code>
*/
public java.lang.String getTimeout() {
java.lang.Object ref = timeout_;
if (ref instanceof java.lang.String) {
return (java.lang.String) ref;
} else {
com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
java.lang.String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
timeout_ = s;
}
return s;
}
}
/**
* <code>optional string timeout = 2;</code>
*/
public com.google.protobuf.ByteString
getTimeoutBytes() {
java.lang.Object ref = timeout_;
if (ref instanceof java.lang.String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
timeout_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
private void initFields() {
stateIdentifier_ = "";
timeout_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
if (!hasStateIdentifier()) {
memoizedIsInitialized = 0;
return false;
}
memoizedIsInitialized = 1;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeBytes(1, getStateIdentifierBytes());
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeBytes(2, getTimeoutBytes());
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(1, getStateIdentifierBytes());
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(2, getTimeoutBytes());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@java.lang.Override
protected java.lang.Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input);
}
public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input, extensionRegistry);
}
public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
protected Builder newBuilderForType(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
/**
* Protobuf type {@code PersistentStateChangeEvent}
*/
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder>
implements akka.persistence.serialization.MessageFormats.PersistentStateChangeEventOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.persistence.serialization.MessageFormats.internal_static_PersistentStateChangeEvent_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.persistence.serialization.MessageFormats.internal_static_PersistentStateChangeEvent_fieldAccessorTable
.ensureFieldAccessorsInitialized(
akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent.class, akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent.Builder.class);
}
// Construct using akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
stateIdentifier_ = "";
bitField0_ = (bitField0_ & ~0x00000001);
timeout_ = "";
bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return akka.persistence.serialization.MessageFormats.internal_static_PersistentStateChangeEvent_descriptor;
}
public akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent getDefaultInstanceForType() {
return akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent.getDefaultInstance();
}
public akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent build() {
akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
public akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent buildPartial() {
akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent result = new akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
to_bitField0_ |= 0x00000001;
}
result.stateIdentifier_ = stateIdentifier_;
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
to_bitField0_ |= 0x00000002;
}
result.timeout_ = timeout_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent) {
return mergeFrom((akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent other) {
if (other == akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent.getDefaultInstance()) return this;
if (other.hasStateIdentifier()) {
bitField0_ |= 0x00000001;
stateIdentifier_ = other.stateIdentifier_;
onChanged();
}
if (other.hasTimeout()) {
bitField0_ |= 0x00000002;
timeout_ = other.timeout_;
onChanged();
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
if (!hasStateIdentifier()) {
return false;
}
return true;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
parsedMessage = (akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent) e.getUnfinishedMessage();
throw e;
} finally {
if (parsedMessage != null) {
mergeFrom(parsedMessage);
}
}
return this;
}
private int bitField0_;
// required string stateIdentifier = 1;
private java.lang.Object stateIdentifier_ = "";
/**
* <code>required string stateIdentifier = 1;</code>
*/
public boolean hasStateIdentifier() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required string stateIdentifier = 1;</code>
*/
public java.lang.String getStateIdentifier() {
java.lang.Object ref = stateIdentifier_;
if (!(ref instanceof java.lang.String)) {
java.lang.String s = ((com.google.protobuf.ByteString) ref)
.toStringUtf8();
stateIdentifier_ = s;
return s;
} else {
return (java.lang.String) ref;
}
}
/**
* <code>required string stateIdentifier = 1;</code>
*/
public com.google.protobuf.ByteString
getStateIdentifierBytes() {
java.lang.Object ref = stateIdentifier_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
stateIdentifier_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
/**
* <code>required string stateIdentifier = 1;</code>
*/
public Builder setStateIdentifier(
java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000001;
stateIdentifier_ = value;
onChanged();
return this;
}
/**
* <code>required string stateIdentifier = 1;</code>
*/
public Builder clearStateIdentifier() {
bitField0_ = (bitField0_ & ~0x00000001);
stateIdentifier_ = getDefaultInstance().getStateIdentifier();
onChanged();
return this;
}
/**
* <code>required string stateIdentifier = 1;</code>
*/
public Builder setStateIdentifierBytes(
com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000001;
stateIdentifier_ = value;
onChanged();
return this;
}
// optional string timeout = 2;
private java.lang.Object timeout_ = "";
/**
* <code>optional string timeout = 2;</code>
*/
public boolean hasTimeout() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
* <code>optional string timeout = 2;</code>
*/
public java.lang.String getTimeout() {
java.lang.Object ref = timeout_;
if (!(ref instanceof java.lang.String)) {
java.lang.String s = ((com.google.protobuf.ByteString) ref)
.toStringUtf8();
timeout_ = s;
return s;
} else {
return (java.lang.String) ref;
}
}
/**
* <code>optional string timeout = 2;</code>
*/
public com.google.protobuf.ByteString
getTimeoutBytes() {
java.lang.Object ref = timeout_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
timeout_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
/**
* <code>optional string timeout = 2;</code>
*/
public Builder setTimeout(
java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000002;
timeout_ = value;
onChanged();
return this;
}
/**
* <code>optional string timeout = 2;</code>
*/
public Builder clearTimeout() {
bitField0_ = (bitField0_ & ~0x00000002);
timeout_ = getDefaultInstance().getTimeout();
onChanged();
return this;
}
/**
* <code>optional string timeout = 2;</code>
*/
public Builder setTimeoutBytes(
com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000002;
timeout_ = value;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:PersistentStateChangeEvent)
}
static {
defaultInstance = new PersistentStateChangeEvent(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:PersistentStateChangeEvent)
}
private static com.google.protobuf.Descriptors.Descriptor
internal_static_PersistentMessage_descriptor;
private static
@ -3155,6 +3886,11 @@ public final class MessageFormats {
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_AtLeastOnceDeliverySnapshot_UnconfirmedDelivery_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
internal_static_PersistentStateChangeEvent_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_PersistentStateChangeEvent_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
@ -3176,7 +3912,9 @@ public final class MessageFormats {
"ot.UnconfirmedDelivery\032c\n\023UnconfirmedDel",
"ivery\022\022\n\ndeliveryId\030\001 \002(\003\022\023\n\013destination" +
"\030\002 \002(\t\022#\n\007payload\030\003 \002(\0132\022.PersistentPayl" +
"oadB\"\n\036akka.persistence.serializationH\001"
"oad\"F\n\032PersistentStateChangeEvent\022\027\n\017sta" +
"teIdentifier\030\001 \002(\t\022\017\n\007timeout\030\002 \001(\tB\"\n\036a" +
"kka.persistence.serializationH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -3207,6 +3945,12 @@ public final class MessageFormats {
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_AtLeastOnceDeliverySnapshot_UnconfirmedDelivery_descriptor,
new java.lang.String[] { "DeliveryId", "Destination", "Payload", });
internal_static_PersistentStateChangeEvent_descriptor =
getDescriptor().getMessageTypes().get(3);
internal_static_PersistentStateChangeEvent_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_PersistentStateChangeEvent_descriptor,
new java.lang.String[] { "StateIdentifier", "Timeout", });
return null;
}
};

View file

@ -34,3 +34,8 @@ message AtLeastOnceDeliverySnapshot {
required int64 currentDeliveryId = 1;
repeated UnconfirmedDelivery unconfirmedDeliveries = 2;
}
message PersistentStateChangeEvent {
required string stateIdentifier = 1;
optional string timeout = 2;
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,154 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.fsm
import akka.actor.ActorLogging
import akka.persistence.fsm.PersistentFsmActor.FSMState
import akka.persistence.serialization.Message
import akka.persistence.{ PersistentActor, RecoveryCompleted }
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
/**
* FSM actor implementation with persistent state
*
* Supports the usual [[akka.actor.FSM]] functionality with additional persistence features.
* State and State Data are persisted on every state change.
* FSM is identified by 'persistenceId' value.
* Persistence execution order is: persist -&gt; wait for ack -&gt; apply state. Incoming messages are deferred until the state is applied.
*/
trait PersistentFsmActor[S <: FSMState, D, E] extends PersistentActor with FSM[S, D, E] with ActorLogging {
import akka.persistence.fsm.PersistentFsmActor._
/**
* Enables to pass a ClassTag of a domain event base type from the implementing class
*
* @return [[scala.reflect.ClassTag]] of domain event base type
*/
implicit def domainEventClassTag: ClassTag[E]
/**
* Domain event's [[scala.reflect.ClassTag]]
* Used for identifying domain events during recovery
*/
val domainEventTag = domainEventClassTag
/**
* Map from state identifier to state instance
*/
lazy val statesMap: Map[String, S] = stateNames.map(name (name.identifier, name)).toMap
/**
* Override this handler to define the action on Domain Event
*
* @param domainEvent domain event to apply
* @param currentData state data of the previous state
* @return updated state data
*/
def applyEvent(domainEvent: E, currentData: D): D
/**
* Override this handler to define the action on recovery completion
*/
def onRecoveryCompleted(): Unit = {}
/**
* After recovery events are handled as in usual FSM actor
*/
override def receiveCommand: Receive = {
super[FSM].receive
}
/**
* Discover the latest recorded state
*/
override def receiveRecover: Receive = {
case domainEventTag(event) startWith(stateName, applyEvent(event, stateData))
case StateChangeEvent(stateIdentifier, timeout) startWith(statesMap(stateIdentifier), stateData, timeout)
case RecoveryCompleted
initialize()
onRecoveryCompleted()
}
/**
* Persist FSM State and FSM State Data
*/
override private[akka] def applyState(nextState: State): Unit = {
val eventsToPersist: immutable.Seq[Any] = nextState.domainEvents.toList :+ StateChangeEvent(nextState.stateName.identifier, nextState.timeout)
var nextData: D = stateData
persist[Any](eventsToPersist) {
case domainEventTag(event)
nextData = applyEvent(event, nextData)
case StateChangeEvent(stateIdentifier, timeout)
super.applyState(nextState using nextData)
nextState.afterTransitionDo(stateData)
}
}
}
object PersistentFsmActor {
/**
* Base persistent event class
*/
private[persistence] sealed trait PersistentFsmEvent extends Message
/**
* Persisted on state change
*
* @param stateIdentifier FSM state identifier
* @param timeout FSM state timeout
*/
private[persistence] case class StateChangeEvent(stateIdentifier: String, timeout: Option[FiniteDuration]) extends PersistentFsmEvent
/**
* FSMState base trait, makes possible for simple default serialization by conversion to String
*/
trait FSMState {
def identifier: String
}
}
/**
* Java API: compatible with lambda expressions
*
* Persistent Finite State Machine actor abstract base class.
*
* This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing.
*/
abstract class AbstractPersistentFsmActor[S <: FSMState, D, E] extends AbstractFSM[S, D, E] with PersistentFsmActor[S, D, E] {
import java.util.function.Consumer
/**
* Adapter from Java 8 Functional Interface to Scala Function
* @param action - Java 8 lambda expression defining the action
* @return action represented as a Scala Functin
*/
final def exec(action: Consumer[D]): D Unit =
data action.accept(data)
/**
* Adapter from Java [[Class]] to [[scala.reflect.ClassTag]]
* @return domain event [[scala.reflect.ClassTag]]
*/
final override def domainEventClassTag: ClassTag[E] =
ClassTag(domainEventClass)
/**
* Domain event's [[Class]]
* Used for identifying domain events during recovery
*/
def domainEventClass: Class[E]
}
/**
* Java API: compatible with lambda expressions
*
* Persistent Finite State Machine actor abstract base class with FSM Logging
*
* This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing.
*/
abstract class AbstractPersistentLoggingFsmActor[S <: FSMState, D, E] extends AbstractLoggingFSM[S, D, E] with PersistentFsmActor[S, D, E]

View file

@ -4,6 +4,8 @@
package akka.persistence.serialization
import scala.concurrent.duration
import scala.concurrent.duration.Duration
import scala.language.existentials
import com.google.protobuf._
import akka.actor.{ ActorPath, ExtendedActorSystem }
@ -14,6 +16,7 @@ import akka.serialization._
import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot AtLeastOnceDeliverySnap }
import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery
import scala.collection.immutable.VectorBuilder
import akka.persistence.fsm.PersistentFsmActor.StateChangeEvent
/**
* Marker trait for all protobuf-serializable messages in `akka.persistence`.
@ -21,7 +24,7 @@ import scala.collection.immutable.VectorBuilder
trait Message extends Serializable
/**
* Protobuf serializer for [[akka.persistence.PersistentRepr]] and [[akka.persistence.AtLeastOnceDelivery]] messages.
* Protobuf serializer for [[akka.persistence.PersistentRepr]], [[akka.persistence.AtLeastOnceDelivery]] and [[akka.persistence.fsm.PersistentFsmActor.StateChangeEvent]] messages.
*/
class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
import PersistentRepr.Undefined
@ -29,6 +32,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
val PersistentReprClass = classOf[PersistentRepr]
val PersistentImplClass = classOf[PersistentImpl]
val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnap]
val PersistentStateChangeEventClass = classOf[StateChangeEvent]
override val includeManifest: Boolean = true
@ -45,6 +49,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
def toBinary(o: AnyRef): Array[Byte] = o match {
case p: PersistentRepr persistentMessageBuilder(p).build().toByteArray
case a: AtLeastOnceDeliverySnap atLeastOnceDeliverySnapshotBuilder(a).build.toByteArray
case s: StateChangeEvent stateChangeBuilder(s).build.toByteArray
case _ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
}
@ -58,6 +63,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
case PersistentImplClass persistent(PersistentMessage.parseFrom(bytes))
case PersistentReprClass persistent(PersistentMessage.parseFrom(bytes))
case AtLeastOnceDeliverySnapshotClass atLeastOnceDeliverySnapshot(AtLeastOnceDeliverySnapshot.parseFrom(bytes))
case PersistentStateChangeEventClass stateChange(PersistentStateChangeEvent.parseFrom(bytes))
case _ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}")
}
}
@ -80,6 +86,14 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
builder
}
def stateChangeBuilder(stateChange: StateChangeEvent): PersistentStateChangeEvent.Builder = {
val builder = PersistentStateChangeEvent.newBuilder.setStateIdentifier(stateChange.stateIdentifier)
stateChange.timeout match {
case None builder
case Some(timeout) builder.setTimeout(timeout.toString())
}
}
def atLeastOnceDeliverySnapshot(atLeastOnceDeliverySnapshot: AtLeastOnceDeliverySnapshot): AtLeastOnceDeliverySnap = {
import scala.collection.JavaConverters._
val unconfirmedDeliveries = new VectorBuilder[UnconfirmedDelivery]()
@ -93,6 +107,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
unconfirmedDeliveries.result())
}
def stateChange(persistentStateChange: PersistentStateChangeEvent): StateChangeEvent = {
StateChangeEvent(
persistentStateChange.getStateIdentifier,
if (persistentStateChange.hasTimeout) Some(Duration(persistentStateChange.getTimeout).asInstanceOf[duration.FiniteDuration]) else None)
}
private def persistentMessageBuilder(persistent: PersistentRepr) = {
val builder = PersistentMessage.newBuilder

View file

@ -0,0 +1,596 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.fsm;
import akka.actor.*;
import akka.japi.Option;
import akka.persistence.PersistenceSpec;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import org.junit.ClassRule;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import akka.persistence.fsm.FSM.CurrentState;
import org.junit.Test;
import scala.concurrent.duration.Duration;
import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.UserState;
import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.ShoppingCart;
import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.Item;
import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.GetCurrentCart;
import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.AddItem;
import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.Buy;
import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.Leave;
import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.PurchaseWasMade;
import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.ShoppingCardDiscarded;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.matchers.JUnitMatchers.hasItems;
public class AbstractPersistentFsmActorTest {
private static Option<String> none = Option.none();
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("PersistentFSMJavaTest", PersistenceSpec.config(
"leveldb", "AbstractPersistentFsmActorTest", "off", none.asScala()));
private final ActorSystem system = actorSystemResource.getSystem();
//Dummy report actor, for tests that don't need it
private final ActorRef dummyReportActorRef = new TestProbe(system).ref();
@Test
public void fsmFunctionalTest() throws Exception {
new JavaTestKit(system) {{
String persistenceId = generateId();
ActorRef fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef));
watch(fsmRef);
fsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef());
Item shirt = new Item("1", "Shirt", 59.99F);
Item shoes = new Item("2", "Shoes", 89.99F);
Item coat = new Item("3", "Coat", 119.99F);
fsmRef.tell(GetCurrentCart.INSTANCE, getRef());
fsmRef.tell(new AddItem(shirt), getRef());
fsmRef.tell(GetCurrentCart.INSTANCE, getRef());
fsmRef.tell(new AddItem(shoes), getRef());
fsmRef.tell(GetCurrentCart.INSTANCE, getRef());
fsmRef.tell(new AddItem(coat), getRef());
fsmRef.tell(GetCurrentCart.INSTANCE, getRef());
fsmRef.tell(Buy.INSTANCE, getRef());
fsmRef.tell(GetCurrentCart.INSTANCE, getRef());
fsmRef.tell(Leave.INSTANCE, getRef());
CurrentState currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class);
assertEquals(currentState.state(), UserState.LOOKING_AROUND);
ShoppingCart shoppingCart = expectMsgClass(ShoppingCart.class);
assertTrue(shoppingCart.getItems().isEmpty());
FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class);
assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING);
shoppingCart = expectMsgClass(ShoppingCart.class);
assertThat(shoppingCart.getItems(), hasItems(shirt));
shoppingCart = expectMsgClass(ShoppingCart.class);
assertThat(shoppingCart.getItems(), hasItems(shirt, shoes));
shoppingCart = expectMsgClass(ShoppingCart.class);
assertThat(shoppingCart.getItems(), hasItems(shirt, shoes, coat));
stateTransition = expectMsgClass(FSM.Transition.class);
assertTransition(stateTransition, fsmRef, UserState.SHOPPING, UserState.PAID);
shoppingCart = expectMsgClass(ShoppingCart.class);
assertThat(shoppingCart.getItems(), hasItems(shirt, shoes, coat));
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(fsmRef, terminated.getActor());
}};
}
@Test
public void fsmTimeoutTest() throws Exception {
new JavaTestKit(system) {{
String persistenceId = generateId();
ActorRef fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef));
watch(fsmRef);
fsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef());
Item shirt = new Item("1", "Shirt", 59.99F);
fsmRef.tell(new AddItem(shirt), getRef());
CurrentState currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class);
assertEquals(currentState.state(), UserState.LOOKING_AROUND);
FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class);
assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING);
new Within(duration("0.9 seconds"), duration("1.1 seconds")) {
@Override
protected void run() {
FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class);
assertTransition(stateTransition, fsmRef, UserState.SHOPPING, UserState.INACTIVE);
}
};
new Within(duration("1.9 seconds"), duration("2.1 seconds")) {
@Override
protected void run() {
expectTerminated(fsmRef);
}
};
}};
}
@Test
public void testSuccessfulRecoveryWithCorrectStateData() {
new JavaTestKit(system) {{
String persistenceId = generateId();
ActorRef fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef));
watch(fsmRef);
fsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef());
Item shirt = new Item("1", "Shirt", 59.99F);
Item shoes = new Item("2", "Shoes", 89.99F);
Item coat = new Item("3", "Coat", 119.99F);
fsmRef.tell(GetCurrentCart.INSTANCE, getRef());
fsmRef.tell(new AddItem(shirt), getRef());
fsmRef.tell(GetCurrentCart.INSTANCE, getRef());
fsmRef.tell(new AddItem(shoes), getRef());
fsmRef.tell(GetCurrentCart.INSTANCE, getRef());
CurrentState currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class);
assertEquals(currentState.state(), UserState.LOOKING_AROUND);
ShoppingCart shoppingCart = expectMsgClass(ShoppingCart.class);
assertTrue(shoppingCart.getItems().isEmpty());
FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class);
assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING);
shoppingCart = expectMsgClass(ShoppingCart.class);
assertThat(shoppingCart.getItems(), hasItems(shirt));
shoppingCart = expectMsgClass(ShoppingCart.class);
assertThat(shoppingCart.getItems(), hasItems(shirt, shoes));
fsmRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
expectTerminated(fsmRef);
ActorRef recoveredFsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef));
watch(recoveredFsmRef);
recoveredFsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef());
recoveredFsmRef.tell(GetCurrentCart.INSTANCE, getRef());
recoveredFsmRef.tell(new AddItem(coat), getRef());
recoveredFsmRef.tell(GetCurrentCart.INSTANCE, getRef());
recoveredFsmRef.tell(Buy.INSTANCE, getRef());
recoveredFsmRef.tell(GetCurrentCart.INSTANCE, getRef());
recoveredFsmRef.tell(Leave.INSTANCE, getRef());
currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class);
assertEquals(currentState.state(), UserState.SHOPPING);
shoppingCart = expectMsgClass(ShoppingCart.class);
assertThat(shoppingCart.getItems(), hasItems(shirt, shoes));
shoppingCart = expectMsgClass(ShoppingCart.class);
assertThat(shoppingCart.getItems(), hasItems(shirt, shoes, coat));
stateTransition = expectMsgClass(FSM.Transition.class);
assertTransition(stateTransition, recoveredFsmRef, UserState.SHOPPING, UserState.PAID);
shoppingCart = expectMsgClass(ShoppingCart.class);
assertThat(shoppingCart.getItems(), hasItems(shirt, shoes, coat));
expectTerminated(recoveredFsmRef);
}};
}
@Test
public void testExecutionOfDefinedActionsFollowingSuccessfulPersistence() {
new JavaTestKit(system) {{
String persistenceId = generateId();
TestProbe reportActorProbe = new TestProbe(system);
ActorRef fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, reportActorProbe.ref()));
watch(fsmRef);
fsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef());
Item shirt = new Item("1", "Shirt", 59.99F);
Item shoes = new Item("2", "Shoes", 89.99F);
Item coat = new Item("3", "Coat", 119.99F);
fsmRef.tell(new AddItem(shirt), getRef());
fsmRef.tell(new AddItem(shoes), getRef());
fsmRef.tell(new AddItem(coat), getRef());
fsmRef.tell(Buy.INSTANCE, getRef());
fsmRef.tell(Leave.INSTANCE, getRef());
CurrentState currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class);
assertEquals(currentState.state(), UserState.LOOKING_AROUND);
FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class);
assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING);
stateTransition = expectMsgClass(FSM.Transition.class);
assertTransition(stateTransition, fsmRef, UserState.SHOPPING, UserState.PAID);
PurchaseWasMade purchaseWasMade = reportActorProbe.expectMsgClass(PurchaseWasMade.class);
assertThat(purchaseWasMade.getItems(), hasItems(shirt, shoes, coat));
expectTerminated(fsmRef);
}};
}
@Test
public void testExecutionOfDefinedActionsFollowingSuccessfulPersistenceOfFSMStop() {
new JavaTestKit(system) {{
String persistenceId = generateId();
TestProbe reportActorProbe = new TestProbe(system);
ActorRef fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, reportActorProbe.ref()));
watch(fsmRef);
fsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef());
Item shirt = new Item("1", "Shirt", 59.99F);
Item shoes = new Item("2", "Shoes", 89.99F);
Item coat = new Item("3", "Coat", 119.99F);
fsmRef.tell(new AddItem(shirt), getRef());
fsmRef.tell(new AddItem(shoes), getRef());
fsmRef.tell(new AddItem(coat), getRef());
fsmRef.tell(Leave.INSTANCE, getRef());
CurrentState currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class);
assertEquals(currentState.state(), UserState.LOOKING_AROUND);
FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class);
assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING);
reportActorProbe.expectMsgClass(ShoppingCardDiscarded.class);
expectTerminated(fsmRef);
}};
}
@Test
public void testCorrectStateTimeoutFollowingRecovery() {
new JavaTestKit(system) {{
String persistenceId = generateId();
ActorRef fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef));
watch(fsmRef);
fsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef());
Item shirt = new Item("1", "Shirt", 59.99F);
fsmRef.tell(new AddItem(shirt), getRef());
CurrentState currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class);
assertEquals(currentState.state(), UserState.LOOKING_AROUND);
FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class);
assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING);
expectNoMsg(duration("0.6seconds")); //randomly chosen delay, less than the timeout, before stopping the FSM
fsmRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
expectTerminated(fsmRef);
final ActorRef recoveredFsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef));
watch(recoveredFsmRef);
recoveredFsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef());
currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class);
assertEquals(currentState.state(), UserState.SHOPPING);
new Within(duration("0.9 seconds"), duration("1.1 seconds")) {
@Override
protected void run() {
FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class);
assertTransition(stateTransition, recoveredFsmRef, UserState.SHOPPING, UserState.INACTIVE);
}
};
expectNoMsg(duration("0.9 seconds")); //randomly chosen delay, less than the timeout, before stopping the FSM
recoveredFsmRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
expectTerminated(recoveredFsmRef);
final ActorRef recoveredFsmRef2 = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef));
watch(recoveredFsmRef2);
recoveredFsmRef2.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef());
currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class);
assertEquals(currentState.state(), UserState.INACTIVE);
new Within(duration("1.9 seconds"), duration("2.1 seconds")) {
@Override
protected void run() {
expectTerminated(recoveredFsmRef2);
}
};
}};
}
private static <State, From extends State, To extends State> void assertTransition(FSM.Transition transition, ActorRef ref, From from, To to) {
assertEquals(ref, transition.fsmRef());
assertEquals(from, transition.from());
assertEquals(to, transition.to());
}
private static String generateId() {
return UUID.randomUUID().toString();
}
public static class WebStoreCustomerFSMActor extends AbstractPersistentFsmActor<WebStoreCustomerFSMActor.UserState, WebStoreCustomerFSMActor.ShoppingCart, WebStoreCustomerFSMActor.DomainEvent> {
//State name
//#customer-states
enum UserState implements PersistentFsmActor.FSMState {
LOOKING_AROUND("Looking Around"),
SHOPPING("Shopping"),
INACTIVE("Inactive"),
PAID("Paid");
private final String stateIdentifier;
UserState(String stateIdentifier) {
this.stateIdentifier = stateIdentifier;
}
@Override
public String identifier() {
return stateIdentifier;
}
}
//#customer-states
//#customer-states-data
static class ShoppingCart {
private final List<Item> items = new ArrayList<>();
public List<Item> getItems() {
return Collections.unmodifiableList(items);
}
void addItem(Item item) {
items.add(item);
}
void empty() {
items.clear();
}
}
static class Item implements Serializable {
private final String id;
private final String name;
private final float price;
Item(String id, String name, float price) {
this.id = id;
this.name = name;
this.price = price;
}
public String getId() {
return id;
}
public float getPrice() {
return price;
}
public String getName() {
return name;
}
@Override
public String toString() {
return String.format("Item{id=%s, name=%s, price=%s}", id, price, name);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Item item = (Item) o;
return item.price == price && id.equals(item.id) && name.equals(item.name);
}
}
//#customer-states-data
public interface Command {
}
//#customer-commands
public static final class AddItem implements Command {
private final Item item;
public AddItem(Item item) {
this.item = item;
}
public Item getItem() {
return item;
}
}
public enum Buy implements Command {INSTANCE}
public enum Leave implements Command {INSTANCE}
public enum GetCurrentCart implements Command {INSTANCE}
//#customer-commands
interface DomainEvent extends Serializable {
}
//#customer-domain-events
public static final class ItemAdded implements DomainEvent {
private final Item item;
public ItemAdded(Item item) {
this.item = item;
}
public Item getItem() {
return item;
}
}
public enum OrderExecuted implements DomainEvent {INSTANCE}
public enum OrderDiscarded implements DomainEvent {INSTANCE}
//#customer-domain-events
//Side effects - report events to be sent to some "Report Actor"
public interface ReportEvent {
}
public static final class PurchaseWasMade implements ReportEvent {
private final List<Item> items;
public PurchaseWasMade(List<Item> items) {
this.items = Collections.unmodifiableList(items);
}
public List<Item> getItems() {
return items;
}
}
public enum ShoppingCardDiscarded implements ReportEvent {INSTANCE}
final private String persistenceId;
@Override
public Class<DomainEvent> domainEventClass() {
return DomainEvent.class;
}
@Override
public String persistenceId() {
return persistenceId;
}
public static Props props(String persistenceId, ActorRef reportActor) {
return Props.create(WebStoreCustomerFSMActor.class, persistenceId, reportActor);
}
public WebStoreCustomerFSMActor(String persistenceId, ActorRef reportActor) {
this.persistenceId = persistenceId;
//#customer-fsm-body
startWith(UserState.LOOKING_AROUND, new ShoppingCart());
when(UserState.LOOKING_AROUND,
matchEvent(AddItem.class,
(event, data) ->
goTo(UserState.SHOPPING).applying(new ItemAdded(event.getItem()))
.forMax(Duration.create(1, TimeUnit.SECONDS))
)
.event(GetCurrentCart.class, (event, data) -> stay().replying(data))
);
when(UserState.SHOPPING,
matchEvent(AddItem.class,
(event, data) ->
stay().applying(new ItemAdded(event.getItem()))
.forMax(Duration.create(1, TimeUnit.SECONDS)))
.event(Buy.class,
(event, data) ->
goTo(UserState.PAID).applying(OrderExecuted.INSTANCE)
.andThen(exec(cart ->
reportActor.tell(new PurchaseWasMade(cart.getItems()), self()))
))
.event(Leave.class,
(event, data) ->
stop().applying(OrderDiscarded.INSTANCE)
.andThen(exec(cart ->
reportActor.tell(ShoppingCardDiscarded.INSTANCE, self())
)))
.event(GetCurrentCart.class, (event, data) -> stay().replying(data))
.event(StateTimeout$.class,
(event, data) ->
goTo(UserState.INACTIVE).forMax(Duration.create(2, TimeUnit.SECONDS)))
);
when(UserState.INACTIVE,
matchEvent(AddItem.class,
(event, data) ->
goTo(UserState.SHOPPING).applying(new ItemAdded(event.getItem()))
.forMax(Duration.create(1, TimeUnit.SECONDS)))
.event(GetCurrentCart.class, (event, data) -> stay().replying(data))
.event(StateTimeout$.class,
(event, data) ->
stop().applying(OrderDiscarded.INSTANCE)
.andThen(exec(cart ->
reportActor.tell(ShoppingCardDiscarded.INSTANCE, self())
)))
);
when(UserState.PAID,
matchEvent(Leave.class, (event, data) -> stop())
.event(GetCurrentCart.class, (event, data) -> stay().replying(data))
);
initialize();
//#customer-fsm-body
}
/**
* Override this handler to define the action on Domain Event during recovery
*
* @param event domain event to apply
* @param currentData state data of the previous state
*/
//#customer-apply-event
@Override
public ShoppingCart applyEvent(DomainEvent event, ShoppingCart currentData) {
if (event instanceof ItemAdded) {
currentData.addItem(((ItemAdded) event).getItem());
return currentData;
} else if (event instanceof OrderExecuted) {
return currentData;
} else if (event instanceof OrderDiscarded) {
currentData.empty();
return currentData;
}
throw new RuntimeException("Unhandled");
}
//#customer-apply-event
}
}

View file

@ -149,7 +149,7 @@ object AtLeastOnceDeliverySpec {
}
abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) with PersistenceSpec {
abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(config) with ImplicitSender {
import akka.persistence.AtLeastOnceDeliverySpec._
"AtLeastOnceDelivery" must {

View file

@ -111,7 +111,7 @@ object PerformanceSpec {
}
}
class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "PerformanceSpec", serialization = "off").withFallback(ConfigFactory.parseString(PerformanceSpec.config))) with PersistenceSpec with ImplicitSender {
class PerformanceSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "PerformanceSpec", serialization = "off").withFallback(ConfigFactory.parseString(PerformanceSpec.config))) with ImplicitSender {
import PerformanceSpec._
val loadCycles = system.settings.config.getInt("akka.persistence.performance.cycles.load")

View file

@ -10,7 +10,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.reflect.ClassTag
import scala.util.control.NoStackTrace
import com.typesafe.config.ConfigFactory
import com.typesafe.config.{ Config, ConfigFactory }
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfterEach
@ -18,7 +18,7 @@ import org.scalatest.BeforeAndAfterEach
import akka.actor.Props
import akka.testkit.AkkaSpec
trait PersistenceSpec extends BeforeAndAfterEach with Cleanup { this: AkkaSpec
abstract class PersistenceSpec(config: Config) extends AkkaSpec(config) with BeforeAndAfterEach with Cleanup { this: AkkaSpec
private var _name: String = _
lazy val extension = Persistence(system)

View file

@ -131,10 +131,10 @@ object PersistentActorFailureSpec {
}
}
class PersistentActorFailureSpec extends AkkaSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some(
class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some(
"""
akka.persistence.journal.inmem.class = "akka.persistence.PersistentActorFailureSpec$FailingInmemJournal"
"""))) with PersistenceSpec with ImplicitSender {
"""))) with ImplicitSender {
import PersistentActorFailureSpec._
import PersistentActorSpec._

View file

@ -509,7 +509,7 @@ object PersistentActorSpec {
}
abstract class PersistentActorSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(config) with ImplicitSender {
import PersistentActorSpec._
override protected def beforeEach() {

View file

@ -152,7 +152,7 @@ object PersistentViewSpec {
}
}
abstract class PersistentViewSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
abstract class PersistentViewSpec(config: Config) extends PersistenceSpec(config) with ImplicitSender {
import akka.persistence.PersistentViewSpec._
var persistentActor: ActorRef = _

View file

@ -58,10 +58,10 @@ object SnapshotFailureRobustnessSpec {
}
}
class SnapshotFailureRobustnessSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotFailureRobustnessSpec", serialization = "off", extraConfig = Some(
class SnapshotFailureRobustnessSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "SnapshotFailureRobustnessSpec", serialization = "off", extraConfig = Some(
"""
akka.persistence.snapshot-store.local.class = "akka.persistence.SnapshotFailureRobustnessSpec$FailingLocalSnapshotStore"
"""))) with PersistenceSpec with ImplicitSender {
"""))) with ImplicitSender {
import SnapshotFailureRobustnessSpec._

View file

@ -33,7 +33,7 @@ object SnapshotRecoveryLocalStoreSpec {
}
}
class SnapshotRecoveryLocalStoreSpec extends AkkaSpec(PersistenceSpec.config("inmem", "SnapshotRecoveryLocalStoreSpec")) with PersistenceSpec with ImplicitSender {
class SnapshotRecoveryLocalStoreSpec extends PersistenceSpec(PersistenceSpec.config("inmem", "SnapshotRecoveryLocalStoreSpec")) with ImplicitSender {
import SnapshotRecoveryLocalStoreSpec._

View file

@ -64,7 +64,7 @@ object SnapshotSerializationSpec {
}
class SnapshotSerializationSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotSerializationSpec", serialization = "off", extraConfig = Some(
class SnapshotSerializationSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "SnapshotSerializationSpec", serialization = "off", extraConfig = Some(
"""
akka.actor {
serializers {
@ -74,7 +74,7 @@ class SnapshotSerializationSpec extends AkkaSpec(PersistenceSpec.config("leveldb
"akka.persistence.SnapshotSerializationSpec$SerializationMarker" = my-snapshot
}
}
"""))) with PersistenceSpec with ImplicitSender {
"""))) with ImplicitSender {
import SnapshotSerializationSpec._
import SnapshotSerializationSpec.XXXXXXXXXXXXXXXXXXXX._

View file

@ -60,7 +60,7 @@ object SnapshotSpec {
}
}
class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotSpec")) with PersistenceSpec with ImplicitSender {
class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "SnapshotSpec")) with ImplicitSender {
import SnapshotSpec._
import SnapshotProtocol._

View file

@ -0,0 +1,354 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.fsm
import akka.actor._
import akka.persistence.PersistenceSpec
import akka.persistence.fsm.FSM.{ CurrentState, SubscribeTransitionCallBack, Transition }
import akka.persistence.fsm.PersistentFsmActor.FSMState
import akka.testkit._
import com.typesafe.config.Config
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.reflect.ClassTag
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
abstract class PersistentFSMActorSpec(config: Config) extends PersistenceSpec(config) with ImplicitSender {
import PersistentFSMActorSpec._
//Dummy report actor, for tests that don't need it
val dummyReportActorRef = TestProbe().ref
"Persistent FSM Actor" must {
"function as a regular FSM " in {
val persistenceId = name
val fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef))
watch(fsmRef)
fsmRef ! SubscribeTransitionCallBack(testActor)
val shirt = Item("1", "Shirt", 59.99F)
val shoes = Item("2", "Shoes", 89.99F)
val coat = Item("3", "Coat", 119.99F)
fsmRef ! GetCurrentCart
fsmRef ! AddItem(shirt)
fsmRef ! GetCurrentCart
fsmRef ! AddItem(shoes)
fsmRef ! GetCurrentCart
fsmRef ! AddItem(coat)
fsmRef ! GetCurrentCart
fsmRef ! Buy
fsmRef ! GetCurrentCart
fsmRef ! Leave
expectMsg(CurrentState(fsmRef, LookingAround))
expectMsg(EmptyShoppingCart)
expectMsg(Transition(fsmRef, LookingAround, Shopping))
expectMsg(NonEmptyShoppingCart(List(shirt)))
expectMsg(NonEmptyShoppingCart(List(shirt, shoes)))
expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat)))
expectMsg(Transition(fsmRef, Shopping, Paid))
expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat)))
expectTerminated(fsmRef)
}
"function as a regular FSM on state timeout" in {
val persistenceId = name
val fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef))
watch(fsmRef)
fsmRef ! SubscribeTransitionCallBack(testActor)
val shirt = Item("1", "Shirt", 59.99F)
fsmRef ! AddItem(shirt)
expectMsg(CurrentState(fsmRef, LookingAround))
expectMsg(Transition(fsmRef, LookingAround, Shopping))
within(0.9 seconds, 1.1 seconds) {
expectMsg(Transition(fsmRef, Shopping, Inactive))
}
within(1.9 seconds, 2.1 seconds) {
expectTerminated(fsmRef)
}
}
"recover successfully with correct state data" in {
val persistenceId = name
val fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef))
watch(fsmRef)
fsmRef ! SubscribeTransitionCallBack(testActor)
val shirt = Item("1", "Shirt", 59.99F)
val shoes = Item("2", "Shoes", 89.99F)
val coat = Item("3", "Coat", 119.99F)
fsmRef ! GetCurrentCart
fsmRef ! AddItem(shirt)
fsmRef ! GetCurrentCart
fsmRef ! AddItem(shoes)
fsmRef ! GetCurrentCart
expectMsg(CurrentState(fsmRef, LookingAround))
expectMsg(EmptyShoppingCart)
expectMsg(Transition(fsmRef, LookingAround, Shopping))
expectMsg(NonEmptyShoppingCart(List(shirt)))
expectMsg(NonEmptyShoppingCart(List(shirt, shoes)))
fsmRef ! PoisonPill
expectTerminated(fsmRef)
val recoveredFsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef))
watch(recoveredFsmRef)
recoveredFsmRef ! SubscribeTransitionCallBack(testActor)
recoveredFsmRef ! GetCurrentCart
recoveredFsmRef ! AddItem(coat)
recoveredFsmRef ! GetCurrentCart
recoveredFsmRef ! Buy
recoveredFsmRef ! GetCurrentCart
recoveredFsmRef ! Leave
expectMsg(CurrentState(recoveredFsmRef, Shopping))
expectMsg(NonEmptyShoppingCart(List(shirt, shoes)))
expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat)))
expectMsg(Transition(recoveredFsmRef, Shopping, Paid))
expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat)))
expectTerminated(recoveredFsmRef)
}
"execute the defined actions following successful persistence of state change" in {
val persistenceId = name
val reportActorProbe = TestProbe()
val fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, reportActorProbe.ref))
watch(fsmRef)
fsmRef ! SubscribeTransitionCallBack(testActor)
val shirt = Item("1", "Shirt", 59.99F)
val shoes = Item("2", "Shoes", 89.99F)
val coat = Item("3", "Coat", 119.99F)
fsmRef ! AddItem(shirt)
fsmRef ! AddItem(shoes)
fsmRef ! AddItem(coat)
fsmRef ! Buy
fsmRef ! Leave
expectMsg(CurrentState(fsmRef, LookingAround))
expectMsg(Transition(fsmRef, LookingAround, Shopping))
expectMsg(Transition(fsmRef, Shopping, Paid))
reportActorProbe.expectMsg(PurchaseWasMade(List(shirt, shoes, coat)))
expectTerminated(fsmRef)
}
"execute the defined actions following successful persistence of FSM stop" in {
val persistenceId = name
val reportActorProbe = TestProbe()
val fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, reportActorProbe.ref))
watch(fsmRef)
fsmRef ! SubscribeTransitionCallBack(testActor)
val shirt = Item("1", "Shirt", 59.99F)
val shoes = Item("2", "Shoes", 89.99F)
val coat = Item("3", "Coat", 119.99F)
fsmRef ! AddItem(shirt)
fsmRef ! AddItem(shoes)
fsmRef ! AddItem(coat)
fsmRef ! Leave
expectMsg(CurrentState(fsmRef, LookingAround))
expectMsg(Transition(fsmRef, LookingAround, Shopping))
reportActorProbe.expectMsg(ShoppingCardDiscarded)
expectTerminated(fsmRef)
}
"recover successfully with correct state timeout" in {
val persistenceId = name
val fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef))
watch(fsmRef)
fsmRef ! SubscribeTransitionCallBack(testActor)
val shirt = Item("1", "Shirt", 59.99F)
fsmRef ! AddItem(shirt)
expectMsg(CurrentState(fsmRef, LookingAround))
expectMsg(Transition(fsmRef, LookingAround, Shopping))
expectNoMsg(0.6 seconds) //randomly chosen delay, less than the timeout, before stopping the FSM
fsmRef ! PoisonPill
expectTerminated(fsmRef)
var recoveredFsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef))
watch(recoveredFsmRef)
recoveredFsmRef ! SubscribeTransitionCallBack(testActor)
expectMsg(CurrentState(recoveredFsmRef, Shopping))
within(0.9 seconds, 1.1 seconds) {
expectMsg(Transition(recoveredFsmRef, Shopping, Inactive))
}
expectNoMsg(0.9 seconds) //randomly chosen delay, less than the timeout, before stopping the FSM
recoveredFsmRef ! PoisonPill
expectTerminated(recoveredFsmRef)
recoveredFsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef))
watch(recoveredFsmRef)
recoveredFsmRef ! SubscribeTransitionCallBack(testActor)
expectMsg(CurrentState(recoveredFsmRef, Inactive))
within(1.9 seconds, 2.1 seconds) {
expectTerminated(recoveredFsmRef)
}
}
}
}
object PersistentFSMActorSpec {
//#customer-states
sealed trait UserState extends FSMState
case object LookingAround extends UserState {
override def identifier: String = "Looking Around"
}
case object Shopping extends UserState {
override def identifier: String = "Shopping"
}
case object Inactive extends UserState {
override def identifier: String = "Inactive"
}
case object Paid extends UserState {
override def identifier: String = "Paid"
}
//#customer-states
//#customer-states-data
case class Item(id: String, name: String, price: Float)
sealed trait ShoppingCart {
def addItem(item: Item): ShoppingCart
def empty(): ShoppingCart
}
case object EmptyShoppingCart extends ShoppingCart {
def addItem(item: Item) = NonEmptyShoppingCart(item :: Nil)
def empty() = this
}
case class NonEmptyShoppingCart(items: Seq[Item]) extends ShoppingCart {
def addItem(item: Item) = NonEmptyShoppingCart(items :+ item)
def empty() = EmptyShoppingCart
}
//#customer-states-data
//#customer-commands
sealed trait Command
case class AddItem(item: Item) extends Command
case object Buy extends Command
case object Leave extends Command
case object GetCurrentCart extends Command
//#customer-commands
//#customer-domain-events
sealed trait DomainEvent
case class ItemAdded(item: Item) extends DomainEvent
case object OrderExecuted extends DomainEvent
case object OrderDiscarded extends DomainEvent
//#customer-domain-events
//Side effects - report events to be sent to some "Report Actor"
sealed trait ReportEvent
case class PurchaseWasMade(items: Seq[Item]) extends ReportEvent
case object ShoppingCardDiscarded extends ReportEvent
class WebStoreCustomerFSMActor(_persistenceId: String, reportActor: ActorRef)(implicit val domainEventClassTag: ClassTag[DomainEvent]) extends PersistentFsmActor[UserState, ShoppingCart, DomainEvent] {
override def persistenceId = _persistenceId
//#customer-fsm-body
startWith(LookingAround, EmptyShoppingCart)
when(LookingAround) {
case Event(AddItem(item), _)
goto(Shopping) applying ItemAdded(item) forMax (1 seconds)
case Event(GetCurrentCart, data)
stay replying data
}
when(Shopping) {
case Event(AddItem(item), _)
stay applying ItemAdded(item) forMax (1 seconds)
case Event(Buy, _)
goto(Paid) applying OrderExecuted andThen {
case NonEmptyShoppingCart(items) reportActor ! PurchaseWasMade(items)
}
case Event(Leave, _)
stop applying OrderDiscarded andThen {
case _ reportActor ! ShoppingCardDiscarded
}
case Event(GetCurrentCart, data)
stay replying data
case Event(StateTimeout, _)
goto(Inactive) forMax (2 seconds)
}
when(Inactive) {
case Event(AddItem(item), _)
goto(Shopping) applying ItemAdded(item) forMax (1 seconds)
case Event(StateTimeout, _)
stop applying OrderDiscarded andThen {
case _ reportActor ! ShoppingCardDiscarded
}
}
when(Paid) {
case Event(Leave, _) stop()
case Event(GetCurrentCart, data)
stay replying data
}
//#customer-fsm-body
/**
* Override this handler to define the action on Domain Event
*
* @param event domain event to apply
* @param cartBeforeEvent state data of the previous state
*/
//#customer-apply-event
override def applyEvent(event: DomainEvent, cartBeforeEvent: ShoppingCart): ShoppingCart = {
event match {
case ItemAdded(item) cartBeforeEvent.addItem(item)
case OrderExecuted cartBeforeEvent
case OrderDiscarded cartBeforeEvent.empty()
}
}
//#customer-apply-event
}
object WebStoreCustomerFSMActor {
def props(persistenceId: String, reportActor: ActorRef) =
Props(new WebStoreCustomerFSMActor(persistenceId, reportActor))
}
}
class LeveldbPersistentFSMActorSpec extends PersistentFSMActorSpec(PersistenceSpec.config("leveldb", "PersistentFSMActorSpec"))
class InmemPersistentFSMActorSpec extends PersistentFSMActorSpec(PersistenceSpec.config("inmem", "PersistentFSMActorSpec"))