From 09b6abd614234601b9fd6018b21998f99585be45 Mon Sep 17 00:00:00 2001 From: leonidb Date: Sun, 9 Nov 2014 14:12:36 +0200 Subject: [PATCH] +per #15279 FSM for PersistentActor --- akka-actor/src/main/java/akka/japi/pf/FI.java | 20 + akka-docs/rst/java/lambda-persistence.rst | 55 + akka-docs/rst/scala/persistence.rst | 55 + .../fsm/japi/pf/FSMStateFunctionBuilder.java | 272 ++++ .../fsm/japi/pf/FSMStopBuilder.java | 125 ++ .../serialization/MessageFormats.java | 746 ++++++++++- .../src/main/protobuf/MessageFormats.proto | 5 + .../main/scala/akka/persistence/fsm/FSM.scala | 1179 +++++++++++++++++ .../persistence/fsm/PersistentFSMActor.scala | 154 +++ .../serialization/MessageSerializer.scala | 22 +- .../fsm/AbstractPersistentFsmActorTest.java | 596 +++++++++ .../persistence/AtLeastOnceDeliverySpec.scala | 2 +- .../akka/persistence/PerformanceSpec.scala | 2 +- .../akka/persistence/PersistenceSpec.scala | 4 +- .../PersistentActorFailureSpec.scala | 4 +- .../persistence/PersistentActorSpec.scala | 2 +- .../akka/persistence/PersistentViewSpec.scala | 2 +- .../SnapshotFailureRobustnessSpec.scala | 4 +- .../SnapshotRecoveryLocalStoreSpec.scala | 2 +- .../SnapshotSerializationSpec.scala | 4 +- .../scala/akka/persistence/SnapshotSpec.scala | 2 +- .../fsm/PersistentFSMActorSpec.scala | 354 +++++ 22 files changed, 3595 insertions(+), 16 deletions(-) create mode 100644 akka-persistence/src/main/java/akka/persistence/fsm/japi/pf/FSMStateFunctionBuilder.java create mode 100644 akka-persistence/src/main/java/akka/persistence/fsm/japi/pf/FSMStopBuilder.java create mode 100644 akka-persistence/src/main/scala/akka/persistence/fsm/FSM.scala create mode 100644 akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMActor.scala create mode 100644 akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java create mode 100644 akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala diff --git a/akka-actor/src/main/java/akka/japi/pf/FI.java b/akka-actor/src/main/java/akka/japi/pf/FI.java index b8a394130e..e6a606d536 100644 --- a/akka-actor/src/main/java/akka/japi/pf/FI.java +++ b/akka-actor/src/main/java/akka/japi/pf/FI.java @@ -128,6 +128,26 @@ public final class FI { public void apply(I1 i1, I2 i2, I3 i3) throws Exception; } + /** + * Functional interface for an application. + * + * @param the first input type, that this Apply will be applied to + * @param the second input type, that this Apply will be applied to + * @param the third input type, that this Apply will be applied to + * @param the fourth input type, that this Apply will be applied to + */ + public static interface UnitApply4 { + /** + * The application to perform. + * + * @param i1 an instance that the application is performed on + * @param i2 an instance that the application is performed on + * @param i3 an instance that the application is performed on + * @param i4 an instance that the application is performed on + */ + public void apply(I1 i1, I2 i2, I3 i3, I4 i4) throws Exception; + } + /** * Functional interface for an application. */ diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 489840d3b6..9cea3522ae 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -488,6 +488,61 @@ not accept more messages and it will throw ``AtLeastOnceDelivery.MaxUnconfirmedM The default value can be configured with the ``akka.persistence.at-least-once-delivery.max-unconfirmed-messages`` configuration key. The method can be overridden by implementation classes to return non-default values. +.. _persistent-fsm-java-lambda: + +Persistent FSM +============== +``AbstractPersistentFSMActor`` handles the incoming messages in an FSM like fashion. +Its internal state is persisted as a sequence of changes, later referred to as domain events. +Relationship between incoming messages, FSM's states and transitions, persistence of domain events is defined by a DSL. + +A Simple Example +---------------- +To demonstrate the features of the ``AbstractPersistentFSMActor``, consider an actor which represents a Web store customer. +The contract of our "WebStoreCustomerFSMActor" is that it accepts the following commands: + +.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java#customer-commands + +``AddItem`` sent when the customer adds an item to a shopping cart +``Buy`` - when the customer finishes the purchase +``Leave`` - when the customer leaves the store without purchasing anything +``GetCurrentCart`` allows to query the current state of customer's shopping cart + +The customer can be in one of the following states: + +.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java#customer-states + +``LookingAround`` customer is browsing the site, but hasn't added anything to the shopping cart +``Shopping`` customer has recently added items to the shopping cart +``Inactive`` customer has items in the shopping cart, but hasn't added anything recently, +``Paid`` customer has purchased the items + +.. note:: + + ``AbstractPersistentFSMActor`` states must inherit from ``PersistentFsmActor.FSMState`` and implement the + ``String identifier()`` method. This is required in order to simplify the serialization of FSM states. + String identifiers should be unique! + +Customer's actions are "recorded" as a sequence of "domain events", which are persisted. Those events are replayed on actor's +start in order to restore the latest customer's state: + +.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java#customer-domain-events + +Customer state data represents the items in customer's shopping cart: + +.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java#customer-states-data + +Here is how everything is wired together: + +.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java#customer-fsm-body + +.. note:: + + State data can only be modified directly on initialization. Later it's modified only as a result of applying domain events. + Override the ``applyEvent`` method to define how state data is affected by domain events, see the example below + +.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java#customer-apply-event + Storage plugins =============== diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 1abbac6780..4654732a76 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -490,6 +490,61 @@ not accept more messages and it will throw ``AtLeastOnceDelivery.MaxUnconfirmedM The default value can be configured with the ``akka.persistence.at-least-once-delivery.max-unconfirmed-messages`` configuration key. The method can be overridden by implementation classes to return non-default values. +.. _persistent-fsm: + +Persistent FSM +============== +``PersistentFSMActor`` handles the incoming messages in an FSM like fashion. +Its internal state is persisted as a sequence of changes, later referred to as domain events. +Relationship between incoming messages, FSM's states and transitions, persistence of domain events is defined by a DSL. + +A Simple Example +---------------- +To demonstrate the features of the ``PersistentFSMActor`` trait, consider an actor which represents a Web store customer. +The contract of our "WebStoreCustomerFSMActor" is that it accepts the following commands: + +.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala#customer-commands + +``AddItem`` sent when the customer adds an item to a shopping cart +``Buy`` - when the customer finishes the purchase +``Leave`` - when the customer leaves the store without purchasing anything +``GetCurrentCart`` allows to query the current state of customer's shopping cart + +The customer can be in one of the following states: + +.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala#customer-states + +``LookingAround`` customer is browsing the site, but hasn't added anything to the shopping cart +``Shopping`` customer has recently added items to the shopping cart +``Inactive`` customer has items in the shopping cart, but hasn't added anything recently, +``Paid`` customer has purchased the items + +.. note:: + + ``PersistentFSMActor`` states must inherit from trait ``PersistentFsmActor.FSMState`` and implement the + ``def identifier: String`` method. This is required in order to simplify the serialization of FSM states. + String identifiers should be unique! + +Customer's actions are "recorded" as a sequence of "domain events", which are persisted. Those events are replayed on actor's +start in order to restore the latest customer's state: + +.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala#customer-domain-events + +Customer state data represents the items in customer's shopping cart: + +.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala#customer-states-data + +Here is how everything is wired together: + +.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala#customer-fsm-body + +.. note:: + + State data can only be modified directly on initialization. Later it's modified only as a result of applying domain events. + Override the ``applyEvent`` method to define how state data is affected by domain events, see the example below + +.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala#customer-apply-event + .. _storage-plugins: Storage plugins diff --git a/akka-persistence/src/main/java/akka/persistence/fsm/japi/pf/FSMStateFunctionBuilder.java b/akka-persistence/src/main/java/akka/persistence/fsm/japi/pf/FSMStateFunctionBuilder.java new file mode 100644 index 0000000000..5e2bf948c3 --- /dev/null +++ b/akka-persistence/src/main/java/akka/persistence/fsm/japi/pf/FSMStateFunctionBuilder.java @@ -0,0 +1,272 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.persistence.fsm.japi.pf; + +import akka.persistence.fsm.FSM; +import akka.japi.pf.FI; +import akka.japi.pf.PFBuilder; +import scala.PartialFunction; + +import java.util.List; + +/** + * Builder used to create a partial function for {@link akka.actor.FSM#whenUnhandled}. + * + * @param the state type + * @param the data type + * @param the domain event type + * + * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. + */ +@SuppressWarnings("rawtypes") +public class FSMStateFunctionBuilder { + + private PFBuilder, FSM.State> builder = + new PFBuilder, FSM.State>(); + + /** + * An erased processing of the event matcher. The compile time checks are enforced + * by the public typed versions. + * + * It works like this. + * + * If eventOrType or dataOrType is a Class, then we do a isInstance check, + * otherwise we do an equals check. The null value compares true for anything. + * If the predicate is null, it is skipped otherwise the predicate has to match + * as well. + * + * @param eventOrType an event or a type to match against + * @param dataOrType a data instance or a type to match against + * @param predicate a predicate to match against + * @param apply an action to apply to the event and state data if there is a match + * @return the builder with the case statement added + */ + private FSMStateFunctionBuilder erasedEvent(final Object eventOrType, + final Object dataOrType, + final FI.TypedPredicate2 predicate, + final FI.Apply2 apply) { + builder.match(FSM.Event.class, + new FI.TypedPredicate() { + @Override + public boolean defined(FSM.Event e) { + boolean res = true; + if (eventOrType != null) { + if (eventOrType instanceof Class) { + Class eventType = (Class) eventOrType; + res = eventType.isInstance(e.event()); + } + else { + res = eventOrType.equals(e.event()); + } + } + if (res && dataOrType != null) { + if (dataOrType instanceof Class) { + Class dataType = (Class) dataOrType; + res = dataType.isInstance(e.stateData()); + } + else { + res = dataOrType.equals(e.stateData()); + } + } + if (res && predicate != null) { + @SuppressWarnings("unchecked") + boolean ures = predicate.defined(e.event(), e.stateData()); + res = ures; + } + return res; + } + }, + new FI.Apply>() { + public FSM.State apply(FSM.Event e) throws Exception { + @SuppressWarnings("unchecked") + FSM.State res = (FSM.State) apply.apply(e.event(), e.stateData()); + return res; + } + } + ); + + return this; + } + + /** + * Add a case statement that matches on an event and data type and a predicate. + * + * @param eventType the event type to match on + * @param dataType the data type to match on + * @param predicate a predicate to evaluate on the matched types + * @param apply an action to apply to the event and state data if there is a match + * @param

the event type to match on + * @param the data type to match on + * @return the builder with the case statement added + */ + public final FSMStateFunctionBuilder event(final Class

eventType, + final Class dataType, + final FI.TypedPredicate2 predicate, + final FI.Apply2> apply) { + erasedEvent(eventType, dataType, predicate, apply); + return this; + } + + /** + * Add a case statement that matches on an event and data type. + * + * @param eventType the event type to match on + * @param dataType the data type to match on + * @param apply an action to apply to the event and state data if there is a match + * @param

the event type to match on + * @param the data type to match on + * @return the builder with the case statement added + */ + public FSMStateFunctionBuilder event(final Class

eventType, + final Class dataType, + final FI.Apply2> apply) { + return erasedEvent(eventType, dataType, null, apply); + } + + /** + * Add a case statement that matches if the event type and predicate matches. + * + * @param eventType the event type to match on + * @param predicate a predicate that will be evaluated on the data and the event + * @param apply an action to apply to the event and state data if there is a match + * @return the builder with the case statement added + */ + public

FSMStateFunctionBuilder event(final Class

eventType, + final FI.TypedPredicate2 predicate, + final FI.Apply2> apply) { + return erasedEvent(eventType, null, predicate, apply); + } + + /** + * Add a case statement that matches if the event type and predicate matches. + * + * @param eventType the event type to match on + * @param apply an action to apply to the event and state data if there is a match + * @return the builder with the case statement added + */ + public

FSMStateFunctionBuilder event(final Class

eventType, + final FI.Apply2> apply) { + return erasedEvent(eventType, null, null, apply); + } + + /** + * Add a case statement that matches if the predicate matches. + * + * @param predicate a predicate that will be evaluated on the data and the event + * @param apply an action to apply to the event and state data if there is a match + * @return the builder with the case statement added + */ + public FSMStateFunctionBuilder event(final FI.TypedPredicate2 predicate, + final FI.Apply2> apply) { + return erasedEvent(null, null, predicate, apply); + } + + /** + * Add a case statement that matches on the data type and if any of the event types + * in the list match or any of the event instances in the list compares equal. + * + * @param eventMatches a list of types or instances to match against + * @param dataType the data type to match on + * @param apply an action to apply to the event and state data if there is a match + * @param the data type to match on + * @return the builder with the case statement added + */ + public FSMStateFunctionBuilder event(final List eventMatches, + final Class dataType, + final FI.Apply2> apply) { + builder.match(FSM.Event.class, + new FI.TypedPredicate() { + @Override + public boolean defined(FSM.Event e) { + if (dataType != null && !dataType.isInstance(e.stateData())) + return false; + + boolean emMatch = false; + Object event = e.event(); + for (Object em : eventMatches) { + if (em instanceof Class) { + Class emc = (Class) em; + emMatch = emc.isInstance(event); + } else { + emMatch = event.equals(em); + } + if (emMatch) + break; + } + return emMatch; + } + }, + new FI.Apply>() { + public FSM.State apply(FSM.Event e) throws Exception { + @SuppressWarnings("unchecked") + Q q = (Q) e.stateData(); + return apply.apply(e.event(), q); + } + } + ); + + return this; + } + + /** + * Add a case statement that matches if any of the event types in the list match or + * any of the event instances in the list compares equal. + * + * @param eventMatches a list of types or instances to match against + * @param apply an action to apply to the event and state data if there is a match + * @return the builder with the case statement added + */ + public FSMStateFunctionBuilder event(final List eventMatches, + final FI.Apply2> apply) { + return event(eventMatches, null, apply); + } + + /** + * Add a case statement that matches on the data type and if the event compares equal. + * + * @param event an event to compare equal against + * @param dataType the data type to match on + * @param apply an action to apply to the event and state data if there is a match + * @param the data type to match on + * @return the builder with the case statement added + */ + public FSMStateFunctionBuilder eventEquals(final P event, + final Class dataType, + final FI.Apply2> apply) { + return erasedEvent(event, dataType, null, apply); + } + + /** + * Add a case statement that matches if event compares equal. + * + * @param event an event to compare equal against + * @param apply an action to apply to the event and state data if there is a match + * @return the builder with the case statement added + */ + public

FSMStateFunctionBuilder eventEquals(final P event, + final FI.Apply2> apply) { + return erasedEvent(event, null, null, apply); + } + + /** + * Add a case statement that matches on any type of event. + * + * @param apply an action to apply to the event and state data + * @return the builder with the case statement added + */ + public FSMStateFunctionBuilder anyEvent(final FI.Apply2> apply) { + return erasedEvent(null, null, null, apply); + } + + /** + * Build a {@link scala.PartialFunction} from this builder. + * After this call the builder will be reset. + * + * @return a PartialFunction for this builder. + */ + public PartialFunction, FSM.State> build() { + return builder.build(); + } +} diff --git a/akka-persistence/src/main/java/akka/persistence/fsm/japi/pf/FSMStopBuilder.java b/akka-persistence/src/main/java/akka/persistence/fsm/japi/pf/FSMStopBuilder.java new file mode 100644 index 0000000000..087a1ff82e --- /dev/null +++ b/akka-persistence/src/main/java/akka/persistence/fsm/japi/pf/FSMStopBuilder.java @@ -0,0 +1,125 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.persistence.fsm.japi.pf; + +import akka.persistence.fsm.FSM; +import akka.japi.pf.FI; +import akka.japi.pf.UnitPFBuilder; +import scala.PartialFunction; +import scala.runtime.BoxedUnit; + +/** + * Builder used to create a partial function for {@link akka.actor.FSM#onTermination}. + * + * @param the state type + * @param the data type + * + * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. + */ +public class FSMStopBuilder { + + private UnitPFBuilder> builder = + new UnitPFBuilder>(); + + /** + * Add a case statement that matches on an {@link akka.actor.FSM.Reason}. + * + * @param reason the reason for the termination + * @param apply an action to apply to the event and state data if there is a match + * @return the builder with the case statement added + */ + public FSMStopBuilder stop(final FSM.Reason reason, + final FI.UnitApply2 apply) { + builder.match(FSM.StopEvent.class, + new FI.TypedPredicate() { + @Override + public boolean defined(FSM.StopEvent e) { + return reason.equals(e.reason()); + } + }, + new FI.UnitApply() { + public void apply(FSM.StopEvent e) throws Exception { + @SuppressWarnings("unchecked") + S s = (S) e.currentState(); + @SuppressWarnings("unchecked") + D d = (D) e.stateData(); + apply.apply(s, d); + } + } + ); + + return this; + } + + /** + * Add a case statement that matches on a reason type. + * + * @param reasonType the reason type to match on + * @param apply an action to apply to the reason, event and state data if there is a match + * @param

the reason type to match on + * @return the builder with the case statement added + */ + public

FSMStopBuilder stop(final Class

reasonType, + final FI.UnitApply3 apply) { + return this.stop(reasonType, + new FI.TypedPredicate

() { + @Override + public boolean defined(P p) { + return true; + } + }, apply); + } + + /** + * Add a case statement that matches on a reason type and a predicate. + * + * @param reasonType the reason type to match on + * @param apply an action to apply to the reason, event and state data if there is a match + * @param predicate a predicate that will be evaluated on the reason if the type matches + * @param

the reason type to match on + * @return the builder with the case statement added + */ + public

FSMStopBuilder stop(final Class

reasonType, + final FI.TypedPredicate

predicate, + final FI.UnitApply3 apply) { + builder.match(FSM.StopEvent.class, + new FI.TypedPredicate() { + @Override + public boolean defined(FSM.StopEvent e) { + if (reasonType.isInstance(e.reason())) { + @SuppressWarnings("unchecked") + P p = (P) e.reason(); + return predicate.defined(p); + } else { + return false; + } + } + }, + new FI.UnitApply() { + public void apply(FSM.StopEvent e) throws Exception { + @SuppressWarnings("unchecked") + P p = (P) e.reason(); + @SuppressWarnings("unchecked") + S s = (S) e.currentState(); + @SuppressWarnings("unchecked") + D d = (D) e.stateData(); + apply.apply(p, s, d); + } + } + ); + + return this; + } + + /** + * Build a {@link scala.PartialFunction} from this builder. + * After this call the builder will be reset. + * + * @return a PartialFunction for this builder. + */ + public PartialFunction, BoxedUnit> build() { + return builder.build(); + } +} diff --git a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java index 1d2c08f378..9cd5f561c9 100644 --- a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java +++ b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java @@ -63,14 +63,38 @@ public final class MessageFormats { // optional string sender = 11; /** * optional string sender = 11; + * + *

+     * optional int32 redeliveries = 6; // Removed in 2.4
+     * repeated string confirms = 7; // Removed in 2.4
+     * optional bool confirmable = 8;  // Removed in 2.4
+     * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
+     * optional string confirmTarget = 10;
+     * 
*/ boolean hasSender(); /** * optional string sender = 11; + * + *
+     * optional int32 redeliveries = 6; // Removed in 2.4
+     * repeated string confirms = 7; // Removed in 2.4
+     * optional bool confirmable = 8;  // Removed in 2.4
+     * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
+     * optional string confirmTarget = 10;
+     * 
*/ java.lang.String getSender(); /** * optional string sender = 11; + * + *
+     * optional int32 redeliveries = 6; // Removed in 2.4
+     * repeated string confirms = 7; // Removed in 2.4
+     * optional bool confirmable = 8;  // Removed in 2.4
+     * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
+     * optional string confirmTarget = 10;
+     * 
*/ com.google.protobuf.ByteString getSenderBytes(); @@ -301,12 +325,28 @@ public final class MessageFormats { private java.lang.Object sender_; /** * optional string sender = 11; + * + *
+     * optional int32 redeliveries = 6; // Removed in 2.4
+     * repeated string confirms = 7; // Removed in 2.4
+     * optional bool confirmable = 8;  // Removed in 2.4
+     * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
+     * optional string confirmTarget = 10;
+     * 
*/ public boolean hasSender() { return ((bitField0_ & 0x00000010) == 0x00000010); } /** * optional string sender = 11; + * + *
+     * optional int32 redeliveries = 6; // Removed in 2.4
+     * repeated string confirms = 7; // Removed in 2.4
+     * optional bool confirmable = 8;  // Removed in 2.4
+     * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
+     * optional string confirmTarget = 10;
+     * 
*/ public java.lang.String getSender() { java.lang.Object ref = sender_; @@ -324,6 +364,14 @@ public final class MessageFormats { } /** * optional string sender = 11; + * + *
+     * optional int32 redeliveries = 6; // Removed in 2.4
+     * repeated string confirms = 7; // Removed in 2.4
+     * optional bool confirmable = 8;  // Removed in 2.4
+     * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
+     * optional string confirmTarget = 10;
+     * 
*/ public com.google.protobuf.ByteString getSenderBytes() { @@ -920,12 +968,28 @@ public final class MessageFormats { private java.lang.Object sender_ = ""; /** * optional string sender = 11; + * + *
+       * optional int32 redeliveries = 6; // Removed in 2.4
+       * repeated string confirms = 7; // Removed in 2.4
+       * optional bool confirmable = 8;  // Removed in 2.4
+       * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
+       * optional string confirmTarget = 10;
+       * 
*/ public boolean hasSender() { return ((bitField0_ & 0x00000010) == 0x00000010); } /** * optional string sender = 11; + * + *
+       * optional int32 redeliveries = 6; // Removed in 2.4
+       * repeated string confirms = 7; // Removed in 2.4
+       * optional bool confirmable = 8;  // Removed in 2.4
+       * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
+       * optional string confirmTarget = 10;
+       * 
*/ public java.lang.String getSender() { java.lang.Object ref = sender_; @@ -940,6 +1004,14 @@ public final class MessageFormats { } /** * optional string sender = 11; + * + *
+       * optional int32 redeliveries = 6; // Removed in 2.4
+       * repeated string confirms = 7; // Removed in 2.4
+       * optional bool confirmable = 8;  // Removed in 2.4
+       * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
+       * optional string confirmTarget = 10;
+       * 
*/ public com.google.protobuf.ByteString getSenderBytes() { @@ -956,6 +1028,14 @@ public final class MessageFormats { } /** * optional string sender = 11; + * + *
+       * optional int32 redeliveries = 6; // Removed in 2.4
+       * repeated string confirms = 7; // Removed in 2.4
+       * optional bool confirmable = 8;  // Removed in 2.4
+       * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
+       * optional string confirmTarget = 10;
+       * 
*/ public Builder setSender( java.lang.String value) { @@ -969,6 +1049,14 @@ public final class MessageFormats { } /** * optional string sender = 11; + * + *
+       * optional int32 redeliveries = 6; // Removed in 2.4
+       * repeated string confirms = 7; // Removed in 2.4
+       * optional bool confirmable = 8;  // Removed in 2.4
+       * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
+       * optional string confirmTarget = 10;
+       * 
*/ public Builder clearSender() { bitField0_ = (bitField0_ & ~0x00000010); @@ -978,6 +1066,14 @@ public final class MessageFormats { } /** * optional string sender = 11; + * + *
+       * optional int32 redeliveries = 6; // Removed in 2.4
+       * repeated string confirms = 7; // Removed in 2.4
+       * optional bool confirmable = 8;  // Removed in 2.4
+       * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
+       * optional string confirmTarget = 10;
+       * 
*/ public Builder setSenderBytes( com.google.protobuf.ByteString value) { @@ -3135,6 +3231,641 @@ public final class MessageFormats { // @@protoc_insertion_point(class_scope:AtLeastOnceDeliverySnapshot) } + public interface PersistentStateChangeEventOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required string stateIdentifier = 1; + /** + * required string stateIdentifier = 1; + */ + boolean hasStateIdentifier(); + /** + * required string stateIdentifier = 1; + */ + java.lang.String getStateIdentifier(); + /** + * required string stateIdentifier = 1; + */ + com.google.protobuf.ByteString + getStateIdentifierBytes(); + + // optional string timeout = 2; + /** + * optional string timeout = 2; + */ + boolean hasTimeout(); + /** + * optional string timeout = 2; + */ + java.lang.String getTimeout(); + /** + * optional string timeout = 2; + */ + com.google.protobuf.ByteString + getTimeoutBytes(); + } + /** + * Protobuf type {@code PersistentStateChangeEvent} + */ + public static final class PersistentStateChangeEvent extends + com.google.protobuf.GeneratedMessage + implements PersistentStateChangeEventOrBuilder { + // Use PersistentStateChangeEvent.newBuilder() to construct. + private PersistentStateChangeEvent(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private PersistentStateChangeEvent(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final PersistentStateChangeEvent defaultInstance; + public static PersistentStateChangeEvent getDefaultInstance() { + return defaultInstance; + } + + public PersistentStateChangeEvent getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private PersistentStateChangeEvent( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + stateIdentifier_ = input.readBytes(); + break; + } + case 18: { + bitField0_ |= 0x00000002; + timeout_ = input.readBytes(); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.persistence.serialization.MessageFormats.internal_static_PersistentStateChangeEvent_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.persistence.serialization.MessageFormats.internal_static_PersistentStateChangeEvent_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent.class, akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public PersistentStateChangeEvent parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new PersistentStateChangeEvent(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // required string stateIdentifier = 1; + public static final int STATEIDENTIFIER_FIELD_NUMBER = 1; + private java.lang.Object stateIdentifier_; + /** + * required string stateIdentifier = 1; + */ + public boolean hasStateIdentifier() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string stateIdentifier = 1; + */ + public java.lang.String getStateIdentifier() { + java.lang.Object ref = stateIdentifier_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + stateIdentifier_ = s; + } + return s; + } + } + /** + * required string stateIdentifier = 1; + */ + public com.google.protobuf.ByteString + getStateIdentifierBytes() { + java.lang.Object ref = stateIdentifier_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + stateIdentifier_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // optional string timeout = 2; + public static final int TIMEOUT_FIELD_NUMBER = 2; + private java.lang.Object timeout_; + /** + * optional string timeout = 2; + */ + public boolean hasTimeout() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional string timeout = 2; + */ + public java.lang.String getTimeout() { + java.lang.Object ref = timeout_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + timeout_ = s; + } + return s; + } + } + /** + * optional string timeout = 2; + */ + public com.google.protobuf.ByteString + getTimeoutBytes() { + java.lang.Object ref = timeout_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + timeout_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + private void initFields() { + stateIdentifier_ = ""; + timeout_ = ""; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasStateIdentifier()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getStateIdentifierBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, getTimeoutBytes()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, getStateIdentifierBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, getTimeoutBytes()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code PersistentStateChangeEvent} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements akka.persistence.serialization.MessageFormats.PersistentStateChangeEventOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.persistence.serialization.MessageFormats.internal_static_PersistentStateChangeEvent_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.persistence.serialization.MessageFormats.internal_static_PersistentStateChangeEvent_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent.class, akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent.Builder.class); + } + + // Construct using akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + stateIdentifier_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + timeout_ = ""; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.persistence.serialization.MessageFormats.internal_static_PersistentStateChangeEvent_descriptor; + } + + public akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent getDefaultInstanceForType() { + return akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent.getDefaultInstance(); + } + + public akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent build() { + akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent buildPartial() { + akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent result = new akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.stateIdentifier_ = stateIdentifier_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.timeout_ = timeout_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent) { + return mergeFrom((akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent other) { + if (other == akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent.getDefaultInstance()) return this; + if (other.hasStateIdentifier()) { + bitField0_ |= 0x00000001; + stateIdentifier_ = other.stateIdentifier_; + onChanged(); + } + if (other.hasTimeout()) { + bitField0_ |= 0x00000002; + timeout_ = other.timeout_; + onChanged(); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasStateIdentifier()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (akka.persistence.serialization.MessageFormats.PersistentStateChangeEvent) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required string stateIdentifier = 1; + private java.lang.Object stateIdentifier_ = ""; + /** + * required string stateIdentifier = 1; + */ + public boolean hasStateIdentifier() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string stateIdentifier = 1; + */ + public java.lang.String getStateIdentifier() { + java.lang.Object ref = stateIdentifier_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + stateIdentifier_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * required string stateIdentifier = 1; + */ + public com.google.protobuf.ByteString + getStateIdentifierBytes() { + java.lang.Object ref = stateIdentifier_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + stateIdentifier_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * required string stateIdentifier = 1; + */ + public Builder setStateIdentifier( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + stateIdentifier_ = value; + onChanged(); + return this; + } + /** + * required string stateIdentifier = 1; + */ + public Builder clearStateIdentifier() { + bitField0_ = (bitField0_ & ~0x00000001); + stateIdentifier_ = getDefaultInstance().getStateIdentifier(); + onChanged(); + return this; + } + /** + * required string stateIdentifier = 1; + */ + public Builder setStateIdentifierBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + stateIdentifier_ = value; + onChanged(); + return this; + } + + // optional string timeout = 2; + private java.lang.Object timeout_ = ""; + /** + * optional string timeout = 2; + */ + public boolean hasTimeout() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional string timeout = 2; + */ + public java.lang.String getTimeout() { + java.lang.Object ref = timeout_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + timeout_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string timeout = 2; + */ + public com.google.protobuf.ByteString + getTimeoutBytes() { + java.lang.Object ref = timeout_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + timeout_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string timeout = 2; + */ + public Builder setTimeout( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + timeout_ = value; + onChanged(); + return this; + } + /** + * optional string timeout = 2; + */ + public Builder clearTimeout() { + bitField0_ = (bitField0_ & ~0x00000002); + timeout_ = getDefaultInstance().getTimeout(); + onChanged(); + return this; + } + /** + * optional string timeout = 2; + */ + public Builder setTimeoutBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + timeout_ = value; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:PersistentStateChangeEvent) + } + + static { + defaultInstance = new PersistentStateChangeEvent(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:PersistentStateChangeEvent) + } + private static com.google.protobuf.Descriptors.Descriptor internal_static_PersistentMessage_descriptor; private static @@ -3155,6 +3886,11 @@ public final class MessageFormats { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_AtLeastOnceDeliverySnapshot_UnconfirmedDelivery_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_PersistentStateChangeEvent_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_PersistentStateChangeEvent_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -3176,7 +3912,9 @@ public final class MessageFormats { "ot.UnconfirmedDelivery\032c\n\023UnconfirmedDel", "ivery\022\022\n\ndeliveryId\030\001 \002(\003\022\023\n\013destination" + "\030\002 \002(\t\022#\n\007payload\030\003 \002(\0132\022.PersistentPayl" + - "oadB\"\n\036akka.persistence.serializationH\001" + "oad\"F\n\032PersistentStateChangeEvent\022\027\n\017sta" + + "teIdentifier\030\001 \002(\t\022\017\n\007timeout\030\002 \001(\tB\"\n\036a" + + "kka.persistence.serializationH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -3207,6 +3945,12 @@ public final class MessageFormats { com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_AtLeastOnceDeliverySnapshot_UnconfirmedDelivery_descriptor, new java.lang.String[] { "DeliveryId", "Destination", "Payload", }); + internal_static_PersistentStateChangeEvent_descriptor = + getDescriptor().getMessageTypes().get(3); + internal_static_PersistentStateChangeEvent_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_PersistentStateChangeEvent_descriptor, + new java.lang.String[] { "StateIdentifier", "Timeout", }); return null; } }; diff --git a/akka-persistence/src/main/protobuf/MessageFormats.proto b/akka-persistence/src/main/protobuf/MessageFormats.proto index 884e1c8a5b..5de75c419d 100644 --- a/akka-persistence/src/main/protobuf/MessageFormats.proto +++ b/akka-persistence/src/main/protobuf/MessageFormats.proto @@ -34,3 +34,8 @@ message AtLeastOnceDeliverySnapshot { required int64 currentDeliveryId = 1; repeated UnconfirmedDelivery unconfirmedDeliveries = 2; } + +message PersistentStateChangeEvent { + required string stateIdentifier = 1; + optional string timeout = 2; +} diff --git a/akka-persistence/src/main/scala/akka/persistence/fsm/FSM.scala b/akka-persistence/src/main/scala/akka/persistence/fsm/FSM.scala new file mode 100644 index 0000000000..9aefbe0917 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/fsm/FSM.scala @@ -0,0 +1,1179 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.persistence.fsm + +import akka.actor._ +import akka.japi.pf.{ UnitPFBuilder, UnitMatch, FSMTransitionHandlerBuilder } + +import language.implicitConversions +import scala.annotation.varargs +import scala.concurrent.duration.Duration +import scala.collection.mutable +import akka.routing.{ Deafen, Listen, Listeners } +import scala.concurrent.duration.FiniteDuration + +object FSM { + + /** + * A partial function value which does not match anything and can be used to + * “reset” `whenUnhandled` and `onTermination` handlers. + * + * {{{ + * onTermination(FSM.NullFunction) + * }}} + */ + object NullFunction extends PartialFunction[Any, Nothing] { + def isDefinedAt(o: Any) = false + def apply(o: Any) = sys.error("undefined") + } + + /** + * Message type which is sent directly to the subscribed actor in + * [[akka.actor.FSM.SubscribeTransitionCallBack]] before sending any + * [[akka.actor.FSM.Transition]] messages. + */ + final case class CurrentState[S](fsmRef: ActorRef, state: S) + + /** + * Message type which is used to communicate transitions between states to + * all subscribed listeners (use [[akka.actor.FSM.SubscribeTransitionCallBack]]). + */ + final case class Transition[S](fsmRef: ActorRef, from: S, to: S) + + /** + * Send this to an [[akka.actor.FSM]] to request first the [[FSM.CurrentState]] + * and then a series of [[FSM.Transition]] updates. Cancel the subscription + * using [[FSM.UnsubscribeTransitionCallBack]]. + */ + final case class SubscribeTransitionCallBack(actorRef: ActorRef) + + /** + * Unsubscribe from [[akka.actor.FSM.Transition]] notifications which was + * effected by sending the corresponding [[akka.actor.FSM.SubscribeTransitionCallBack]]. + */ + final case class UnsubscribeTransitionCallBack(actorRef: ActorRef) + + /** + * Reason why this [[akka.actor.FSM]] is shutting down. + */ + sealed trait Reason + + /** + * Default reason if calling `stop()`. + */ + case object Normal extends Reason + + /** + * Reason given when someone was calling `system.stop(fsm)` from outside; + * also applies to `Stop` supervision directive. + */ + case object Shutdown extends Reason + + /** + * Signifies that the [[akka.actor.FSM]] is shutting itself down because of + * an error, e.g. if the state to transition into does not exist. You can use + * this to communicate a more precise cause to the `onTermination` block. + */ + final case class Failure(cause: Any) extends Reason + + /** + * This case object is received in case of a state timeout. + */ + case object StateTimeout + + /** + * INTERNAL API + */ + private final case class TimeoutMarker(generation: Long) + + /** + * INTERNAL API + */ + // FIXME: what about the cancellable? + private[akka] final case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(context: ActorContext) + extends NoSerializationVerificationNeeded { + private var ref: Option[Cancellable] = _ + private val scheduler = context.system.scheduler + private implicit val executionContext = context.dispatcher + + def schedule(actor: ActorRef, timeout: FiniteDuration): Unit = + ref = Some( + if (repeat) scheduler.schedule(timeout, timeout, actor, this) + else scheduler.scheduleOnce(timeout, actor, this)) + + def cancel(): Unit = + if (ref.isDefined) { + ref.get.cancel() + ref = None + } + } + + /** + * This extractor is just convenience for matching a (S, S) pair, including a + * reminder what the new state is. + */ + object -> { + def unapply[S](in: (S, S)) = Some(in) + } + + /** + * Log Entry of the [[akka.actor.LoggingFSM]], can be obtained by calling `getLog`. + */ + final case class LogEntry[S, D](stateName: S, stateData: D, event: Any) + + /** + * This captures all of the managed state of the [[akka.actor.FSM]]: the state + * name, the state data, possibly custom timeout, stop reason, replies + * accumulated while processing the last message, possibly domain event and handler + * to be executed after FSM moves to the new state (also triggered when staying in the same state) + */ + final case class State[S, D, E]( + stateName: S, + stateData: D, + timeout: Option[FiniteDuration] = None, + stopReason: Option[Reason] = None, + replies: List[Any] = Nil, + domainEvents: Seq[E] = Nil, + afterTransitionDo: D ⇒ Unit = { _: D ⇒ })(private[akka] val notifies: Boolean = true) { + + /** + * Copy object and update values if needed. + */ + private[akka] def copy(stateName: S = stateName, stateData: D = stateData, timeout: Option[FiniteDuration] = timeout, stopReason: Option[Reason] = stopReason, replies: List[Any] = replies, notifies: Boolean = notifies, domainEvents: Seq[E] = domainEvents, afterTransitionDo: D ⇒ Unit = afterTransitionDo): State[S, D, E] = { + State(stateName, stateData, timeout, stopReason, replies, domainEvents, afterTransitionDo)(notifies) + } + + /** + * Modify state transition descriptor to include a state timeout for the + * next state. This timeout overrides any default timeout set for the next + * state. + * + * Use Duration.Inf to deactivate an existing timeout. + */ + def forMax(timeout: Duration): State[S, D, E] = timeout match { + case f: FiniteDuration ⇒ copy(timeout = Some(f)) + case _ ⇒ copy(timeout = None) + } + + /** + * Send reply to sender of the current message, if available. + * + * @return this state transition descriptor + */ + def replying(replyValue: Any): State[S, D, E] = { + copy(replies = replyValue :: replies) + } + + /** + * Modify state transition descriptor with new state data. The data will be + * set when transitioning to the new state. + */ + private[akka] def using(@deprecatedName('nextStateDate) nextStateData: D): State[S, D, E] = { + copy(stateData = nextStateData) + } + + /** + * INTERNAL API. + */ + private[akka] def withStopReason(reason: Reason): State[S, D, E] = { + copy(stopReason = Some(reason)) + } + + private[akka] def withNotification(notifies: Boolean): State[S, D, E] = { + copy(notifies = notifies) + } + + /** + * Specify domain events to be applied when transitioning to the new state. + */ + @varargs def applying(events: E*): State[S, D, E] = { + copy(domainEvents = domainEvents ++ events) + } + + /** + * Register a handler to be triggered after the state has been persisted successfully + */ + def andThen(handler: D ⇒ Unit): State[S, D, E] = { + copy(afterTransitionDo = handler) + } + } + + /** + * All messages sent to the [[akka.actor.FSM]] will be wrapped inside an + * `Event`, which allows pattern matching to extract both state and data. + */ + final case class Event[D](event: Any, stateData: D) extends NoSerializationVerificationNeeded + + /** + * Case class representing the state of the [[akka.actor.FSM]] whithin the + * `onTermination` block. + */ + final case class StopEvent[S, D](reason: Reason, currentState: S, stateData: D) extends NoSerializationVerificationNeeded + +} + +/** + * Finite State Machine actor trait. Use as follows: + * + *
+ *     trait State
+ *     case class One extends State
+ *     case class Two extends State
+ *
+ *     case class Data(i : Int)
+ *   }
+ *
+ *   class A extends Actor with FSM[A.State, A.Data] {
+ *     import A._
+ *
+ *     startWith(One, Data(42))
+ *     when(One) {
+ *         case Event(SomeMsg, Data(x)) => ...
+ *         case Ev(SomeMsg) => ... // convenience when data not needed
+ *     }
+ *     when(Two, stateTimeout = 5 seconds) { ... }
+ *     initialize()
+ *   }
+ * 
+ * + * Within the partial function the following values are returned for effecting + * state transitions: + * + * - stay for staying in the same state + * - stay using Data(...) for staying in the same state, but with + * different data + * - stay forMax 5.millis for staying with a state timeout; can be + * combined with using + * - goto(...) for changing into a different state; also supports + * using and forMax + * - stop for terminating this FSM actor + * + * Each of the above also supports the method replying(AnyRef) for + * sending a reply before changing state. + * + * While changing state, custom handlers may be invoked which are registered + * using onTransition. This is meant to enable concentrating + * different concerns in different places; you may choose to use + * when for describing the properties of a state, including of + * course initiating transitions, but you can describe the transitions using + * onTransition to avoid having to duplicate that code among + * multiple paths which lead to a transition: + * + *
+ * onTransition {
+ *   case Active -> _ => cancelTimer("activeTimer")
+ * }
+ * 
+ * + * Multiple such blocks are supported and all of them will be called, not only + * the first matching one. + * + * Another feature is that other actors may subscribe for transition events by + * sending a SubscribeTransitionCallback message to this actor. + * Stopping a listener without unregistering will not remove the listener from the + * subscription list; use UnsubscribeTransitionCallback before stopping + * the listener. + * + * State timeouts set an upper bound to the time which may pass before another + * message is received in the current state. If no external message is + * available, then upon expiry of the timeout a StateTimeout message is sent. + * Note that this message will only be received in the state for which the + * timeout was set and that any message received will cancel the timeout + * (possibly to be started again by the next transition). + * + * Another feature is the ability to install and cancel single-shot as well as + * repeated timers which arrange for the sending of a user-specified message: + * + *
+ *   setTimer("tock", TockMsg, 1 second, true) // repeating
+ *   setTimer("lifetime", TerminateMsg, 1 hour, false) // single-shot
+ *   cancelTimer("tock")
+ *   isTimerActive("tock")
+ * 
+ */ +trait FSM[S, D, E] extends Actor with Listeners with ActorLogging { + + import akka.persistence.fsm.FSM._ + + type State = FSM.State[S, D, E] + type Event = FSM.Event[D] + type StopEvent = FSM.StopEvent[S, D] + type StateFunction = scala.PartialFunction[Event, State] + type Timeout = Option[FiniteDuration] + type TransitionHandler = PartialFunction[(S, S), Unit] + + /* + * “import” so that these are visible without an import + */ + val Event: FSM.Event.type = FSM.Event + val StopEvent: FSM.StopEvent.type = FSM.StopEvent + + /** + * This extractor is just convenience for matching a (S, S) pair, including a + * reminder what the new state is. + */ + val -> = FSM.-> + + /** + * This case object is received in case of a state timeout. + */ + val StateTimeout = FSM.StateTimeout + + /** + * **************************************** + * DSL + * **************************************** + */ + + /** + * Insert a new StateFunction at the end of the processing chain for the + * given state. If the stateTimeout parameter is set, entering this state + * without a differing explicit timeout setting will trigger a StateTimeout + * event; the same is true when using #stay. + * + * @param stateName designator for the state + * @param stateTimeout default state timeout for this state + * @param stateFunction partial function describing response to input + */ + final def when(stateName: S, stateTimeout: FiniteDuration = null)(stateFunction: StateFunction): Unit = + register(stateName, stateFunction, Option(stateTimeout)) + + /** + * Set initial state. Call this method from the constructor before the [[#initialize]] method. + * If different state is needed after a restart this method, followed by [[#initialize]], can + * be used in the actor life cycle hooks [[akka.actor.Actor#preStart]] and [[akka.actor.Actor#postRestart]]. + * + * @param stateName initial state designator + * @param stateData initial state data + * @param timeout state timeout for the initial state, overriding the default timeout for that state + */ + final def startWith(stateName: S, stateData: D, timeout: Timeout = None): Unit = + currentState = FSM.State(stateName, stateData, timeout)() + + /** + * Produce transition to other state. + * Return this from a state function in order to effect the transition. + * + * This method always triggers transition events, even for `A -> A` transitions. + * If you want to stay in the same state without triggering an state transition event use [[#stay]] instead. + * + * @param nextStateName state designator for the next state + * @return state transition descriptor + */ + final def goto(nextStateName: S): State = FSM.State(nextStateName, currentState.stateData)() + + /** + * Produce "empty" transition descriptor. + * Return this from a state function when no state change is to be effected. + * + * No transition event will be triggered by [[#stay]]. + * If you want to trigger an event like `S -> S` for `onTransition` to handle use `goto` instead. + * + * @return descriptor for staying in current state + */ + final def stay(): State = goto(currentState.stateName).withNotification(false) // cannot directly use currentState because of the timeout field + + /** + * Produce change descriptor to stop this FSM actor with reason "Normal". + */ + final def stop(): State = stop(Normal) + + /** + * Produce change descriptor to stop this FSM actor including specified reason. + */ + final def stop(reason: Reason): State = stop(reason, currentState.stateData) + + /** + * Produce change descriptor to stop this FSM actor including specified reason. + */ + final def stop(reason: Reason, stateData: D): State = stay using stateData withStopReason (reason) + + final class TransformHelper(func: StateFunction) { + def using(andThen: PartialFunction[State, State]): StateFunction = + func andThen (andThen orElse { case x ⇒ x }) + } + + final def transform(func: StateFunction): TransformHelper = new TransformHelper(func) + + /** + * Schedule named timer to deliver message after given delay, possibly repeating. + * Any existing timer with the same name will automatically be canceled before + * adding the new timer. + * @param name identifier to be used with cancelTimer() + * @param msg message to be delivered + * @param timeout delay of first message delivery and between subsequent messages + * @param repeat send once if false, scheduleAtFixedRate if true + */ + final def setTimer(name: String, msg: Any, timeout: FiniteDuration, repeat: Boolean = false): Unit = { + if (debugEvent) + log.debug("setting " + (if (repeat) "repeating " else "") + "timer '" + name + "'/" + timeout + ": " + msg) + if (timers contains name) { + timers(name).cancel + } + val timer = Timer(name, msg, repeat, timerGen.next)(context) + timer.schedule(self, timeout) + timers(name) = timer + } + + /** + * Cancel named timer, ensuring that the message is not subsequently delivered (no race). + * @param name of the timer to cancel + */ + final def cancelTimer(name: String): Unit = { + if (debugEvent) + log.debug("canceling timer '" + name + "'") + if (timers contains name) { + timers(name).cancel + timers -= name + } + } + + /** + * Inquire whether the named timer is still active. Returns true unless the + * timer does not exist, has previously been canceled or if it was a + * single-shot timer whose message was already received. + */ + final def isTimerActive(name: String): Boolean = timers contains name + + /** + * Set state timeout explicitly. This method can safely be used from within a + * state handler. + */ + final def setStateTimeout(state: S, timeout: Timeout): Unit = stateTimeouts(state) = timeout + + /** + * INTERNAL API, used for testing. + */ + private[akka] final def isStateTimerActive = timeoutFuture.isDefined + + /** + * Set handler which is called upon each state transition, i.e. not when + * staying in the same state. This may use the pair extractor defined in the + * FSM companion object like so: + * + *
+   * onTransition {
+   *   case Old -> New => doSomething
+   * }
+   * 
+ * + * It is also possible to supply a 2-ary function object: + * + *
+   * onTransition(handler _)
+   *
+   * private def handler(from: S, to: S) { ... }
+   * 
+ * + * The underscore is unfortunately necessary to enable the nicer syntax shown + * above (it uses the implicit conversion total2pf under the hood). + * + * Multiple handlers may be installed, and every one of them will be + * called, not only the first one matching. + */ + final def onTransition(transitionHandler: TransitionHandler): Unit = transitionEvent :+= transitionHandler + + /** + * Convenience wrapper for using a total function instead of a partial + * function literal. To be used with onTransition. + */ + implicit final def total2pf(transitionHandler: (S, S) ⇒ Unit): TransitionHandler = + new TransitionHandler { + def isDefinedAt(in: (S, S)) = true + def apply(in: (S, S)) { transitionHandler(in._1, in._2) } + } + + /** + * Set handler which is called upon termination of this FSM actor. Calling + * this method again will overwrite the previous contents. + */ + final def onTermination(terminationHandler: PartialFunction[StopEvent, Unit]): Unit = + terminateEvent = terminationHandler + + /** + * Set handler which is called upon reception of unhandled messages. Calling + * this method again will overwrite the previous contents. + * + * The current state may be queried using ``stateName``. + */ + final def whenUnhandled(stateFunction: StateFunction): Unit = + handleEvent = stateFunction orElse handleEventDefault + + /** + * Verify existence of initial state and setup timers. This should be the + * last call within the constructor, or [[akka.actor.Actor#preStart]] and + * [[akka.actor.Actor#postRestart]] + * + * @see [[#startWith]] + */ + final def initialize(): Unit = makeTransition(currentState) + + /** + * Return current state name (i.e. object of type S) + */ + final def stateName: S = currentState.stateName + + /** + * Return current state data (i.e. object of type D) + */ + final def stateData: D = currentState.stateData + + /** + * Return all defined state names + */ + private[akka] final def stateNames: Iterable[S] = stateFunctions.keys + + /** + * Return next state data (available in onTransition handlers) + */ + final def nextStateData = nextState match { + case null ⇒ throw new IllegalStateException("nextStateData is only available during onTransition") + case x ⇒ x.stateData + } + + /* + * **************************************************************** + * PRIVATE IMPLEMENTATION DETAILS + * **************************************************************** + */ + + private[akka] def debugEvent: Boolean = false + + /* + * FSM State data and current timeout handling + */ + private var currentState: State = _ + private var timeoutFuture: Option[Cancellable] = None + private var nextState: State = _ + private var generation: Long = 0L + + /* + * Timer handling + */ + private val timers = mutable.Map[String, Timer]() + private val timerGen = Iterator from 0 + + /* + * State definitions + */ + private val stateFunctions = mutable.Map[S, StateFunction]() + private val stateTimeouts = mutable.Map[S, Timeout]() + + private def register(name: S, function: StateFunction, timeout: Timeout): Unit = { + if (stateFunctions contains name) { + stateFunctions(name) = stateFunctions(name) orElse function + stateTimeouts(name) = timeout orElse stateTimeouts(name) + } else { + stateFunctions(name) = function + stateTimeouts(name) = timeout + } + } + + /* + * unhandled event handler + */ + private val handleEventDefault: StateFunction = { + case Event(value, stateData) ⇒ + log.warning("unhandled event " + value + " in state " + stateName) + stay + } + private var handleEvent: StateFunction = handleEventDefault + + /* + * termination handling + */ + private var terminateEvent: PartialFunction[StopEvent, Unit] = NullFunction + + /* + * transition handling + */ + private var transitionEvent: List[TransitionHandler] = Nil + private def handleTransition(prev: S, next: S) { + val tuple = (prev, next) + for (te ← transitionEvent) { if (te.isDefinedAt(tuple)) te(tuple) } + } + + /* + * ******************************************* + * Main actor receive() method + * ******************************************* + */ + override def receive: Receive = { + case TimeoutMarker(gen) ⇒ + if (generation == gen) { + processMsg(StateTimeout, "state timeout") + } + case t @ Timer(name, msg, repeat, gen) ⇒ + if ((timers contains name) && (timers(name).generation == gen)) { + if (timeoutFuture.isDefined) { + timeoutFuture.get.cancel() + timeoutFuture = None + } + generation += 1 + if (!repeat) { + timers -= name + } + processMsg(msg, t) + } + case SubscribeTransitionCallBack(actorRef) ⇒ + // TODO Use context.watch(actor) and receive Terminated(actor) to clean up list + listeners.add(actorRef) + // send current state back as reference point + actorRef ! CurrentState(self, currentState.stateName) + case Listen(actorRef) ⇒ + // TODO Use context.watch(actor) and receive Terminated(actor) to clean up list + listeners.add(actorRef) + // send current state back as reference point + actorRef ! CurrentState(self, currentState.stateName) + case UnsubscribeTransitionCallBack(actorRef) ⇒ + listeners.remove(actorRef) + case Deafen(actorRef) ⇒ + listeners.remove(actorRef) + case value ⇒ { + if (timeoutFuture.isDefined) { + timeoutFuture.get.cancel() + timeoutFuture = None + } + generation += 1 + processMsg(value, sender()) + } + } + + private def processMsg(value: Any, source: AnyRef): Unit = { + val event = Event(value, currentState.stateData) + processEvent(event, source) + } + + private[akka] def processEvent(event: Event, source: AnyRef): Unit = { + val stateFunc = stateFunctions(currentState.stateName) + val nextState = if (stateFunc isDefinedAt event) { + stateFunc(event) + } else { + // handleEventDefault ensures that this is always defined + handleEvent(event) + } + applyState(nextState) + } + + private[akka] def applyState(nextState: State): Unit = { + nextState.stopReason match { + case None ⇒ makeTransition(nextState) + case _ ⇒ + nextState.replies.reverse foreach { r ⇒ sender() ! r } + terminate(nextState) + context.stop(self) + } + } + + private[akka] def makeTransition(nextState: State): Unit = { + if (!stateFunctions.contains(nextState.stateName)) { + terminate(stay withStopReason Failure("Next state %s does not exist".format(nextState.stateName))) + } else { + nextState.replies.reverse foreach { r ⇒ sender() ! r } + if (currentState.stateName != nextState.stateName || nextState.notifies) { + this.nextState = nextState + handleTransition(currentState.stateName, nextState.stateName) + gossip(Transition(self, currentState.stateName, nextState.stateName)) + this.nextState = null + } + currentState = nextState + val timeout = if (currentState.timeout.isDefined) currentState.timeout else stateTimeouts(currentState.stateName) + if (timeout.isDefined) { + val t = timeout.get + if (t.isFinite && t.length >= 0) { + import context.dispatcher + timeoutFuture = Some(context.system.scheduler.scheduleOnce(t, self, TimeoutMarker(generation))) + } + } + } + } + + /** + * Call `onTermination` hook; if you want to retain this behavior when + * overriding make sure to call `super.postStop()`. + * + * Please note that this method is called by default from `preRestart()`, + * so override that one if `onTermination` shall not be called during + * restart. + */ + override def postStop(): Unit = { + /* + * setting this instance’s state to terminated does no harm during restart + * since the new instance will initialize fresh using startWith() + */ + terminate(stay withStopReason Shutdown) + super.postStop() + } + + private def terminate(nextState: State): Unit = { + if (currentState.stopReason.isEmpty) { + val reason = nextState.stopReason.get + logTermination(reason) + for (timer ← timers.values) timer.cancel() + timers.clear() + currentState = nextState + + val stopEvent = StopEvent(reason, currentState.stateName, currentState.stateData) + if (terminateEvent.isDefinedAt(stopEvent)) + terminateEvent(stopEvent) + } + } + + /** + * By default [[FSM.Failure]] is logged at error level and other reason + * types are not logged. It is possible to override this behavior. + */ + protected def logTermination(reason: Reason): Unit = reason match { + case Failure(ex: Throwable) ⇒ log.error(ex, "terminating due to Failure") + case Failure(msg: AnyRef) ⇒ log.error(msg.toString) + case _ ⇒ + } +} + +/** + * Stackable trait for [[akka.actor.FSM]] which adds a rolling event log and + * debug logging capabilities (analogous to [[akka.event.LoggingReceive]]). + * + * @since 1.2 + */ +trait LoggingFSM[S, D, E] extends FSM[S, D, E] { this: Actor ⇒ + + import akka.persistence.fsm.FSM._ + + def logDepth: Int = 0 + + private[akka] override val debugEvent = context.system.settings.FsmDebugEvent + + private val events = new Array[Event](logDepth) + private val states = new Array[AnyRef](logDepth) + private var pos = 0 + private var full = false + + private def advance() { + val n = pos + 1 + if (n == logDepth) { + full = true + pos = 0 + } else { + pos = n + } + } + + private[akka] abstract override def processEvent(event: Event, source: AnyRef): Unit = { + if (debugEvent) { + val srcstr = source match { + case s: String ⇒ s + case Timer(name, _, _, _) ⇒ "timer " + name + case a: ActorRef ⇒ a.toString + case _ ⇒ "unknown" + } + log.debug("processing " + event + " from " + srcstr) + } + + if (logDepth > 0) { + states(pos) = stateName.asInstanceOf[AnyRef] + events(pos) = event + advance() + } + + val oldState = stateName + super.processEvent(event, source) + val newState = stateName + + if (debugEvent && oldState != newState) + log.debug("transition " + oldState + " -> " + newState) + } + + /** + * Retrieve current rolling log in oldest-first order. The log is filled with + * each incoming event before processing by the user supplied state handler. + * The log entries are lost when this actor is restarted. + */ + protected def getLog: IndexedSeq[LogEntry[S, D]] = { + val log = events zip states filter (_._1 ne null) map (x ⇒ LogEntry(x._2.asInstanceOf[S], x._1.stateData, x._1.event)) + if (full) { + IndexedSeq() ++ log.drop(pos) ++ log.take(pos) + } else { + IndexedSeq() ++ log + } + } + +} + +/** + * Java API: compatible with lambda expressions + * + * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. + */ +object AbstractFSM { + /** + * A partial function value which does not match anything and can be used to + * “reset” `whenUnhandled` and `onTermination` handlers. + * + * {{{ + * onTermination(FSM.NullFunction()) + * }}} + */ + def NullFunction[S, D]: PartialFunction[S, D] = FSM.NullFunction +} + +/** + * Java API: compatible with lambda expressions + * + * Finite State Machine actor abstract base class. + * + * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. + */ +abstract class AbstractFSM[S, D, E] extends FSM[S, D, E] { + import akka.persistence.fsm.japi.pf.FSMStateFunctionBuilder + import akka.persistence.fsm.japi.pf.FSMStopBuilder + import akka.japi.pf.FI._ + import java.util.{ List ⇒ JList } + import FSM._ + + /** + * Insert a new StateFunction at the end of the processing chain for the + * given state. + * + * @param stateName designator for the state + * @param stateFunction partial function describing response to input + */ + final def when(stateName: S)(stateFunction: StateFunction): Unit = + when(stateName, null: FiniteDuration)(stateFunction) + + /** + * Insert a new StateFunction at the end of the processing chain for the + * given state. + * + * @param stateName designator for the state + * @param stateFunctionBuilder partial function builder describing response to input + */ + final def when(stateName: S, stateFunctionBuilder: FSMStateFunctionBuilder[S, D, E]): Unit = + when(stateName, null, stateFunctionBuilder) + + /** + * Insert a new StateFunction at the end of the processing chain for the + * given state. If the stateTimeout parameter is set, entering this state + * without a differing explicit timeout setting will trigger a StateTimeout + * event; the same is true when using #stay. + * + * @param stateName designator for the state + * @param stateTimeout default state timeout for this state + * @param stateFunctionBuilder partial function builder describing response to input + */ + final def when(stateName: S, + stateTimeout: FiniteDuration, + stateFunctionBuilder: FSMStateFunctionBuilder[S, D, E]): Unit = + when(stateName, stateTimeout)(stateFunctionBuilder.build()) + + /** + * Set initial state. Call this method from the constructor before the [[#initialize]] method. + * If different state is needed after a restart this method, followed by [[#initialize]], can + * be used in the actor life cycle hooks [[akka.actor.Actor#preStart]] and [[akka.actor.Actor#postRestart]]. + * + * @param stateName initial state designator + * @param stateData initial state data + */ + final def startWith(stateName: S, stateData: D): Unit = + startWith(stateName, stateData, null: FiniteDuration) + + /** + * Set initial state. Call this method from the constructor before the [[#initialize]] method. + * If different state is needed after a restart this method, followed by [[#initialize]], can + * be used in the actor life cycle hooks [[akka.actor.Actor#preStart]] and [[akka.actor.Actor#postRestart]]. + * + * @param stateName initial state designator + * @param stateData initial state data + * @param timeout state timeout for the initial state, overriding the default timeout for that state + */ + final def startWith(stateName: S, stateData: D, timeout: FiniteDuration): Unit = + startWith(stateName, stateData, Option(timeout)) + + /** + * Add a handler which is called upon each state transition, i.e. not when + * staying in the same state. + * + * Multiple handlers may be installed, and every one of them will be + * called, not only the first one matching. + */ + final def onTransition(transitionHandlerBuilder: FSMTransitionHandlerBuilder[S]): Unit = + onTransition(transitionHandlerBuilder.build().asInstanceOf[TransitionHandler]) + + /** + * Add a handler which is called upon each state transition, i.e. not when + * staying in the same state. + * + * Multiple handlers may be installed, and every one of them will be + * called, not only the first one matching. + */ + final def onTransition(transitionHandler: UnitApply2[S, S]): Unit = + onTransition(transitionHandler) + + /** + * Set handler which is called upon reception of unhandled messages. Calling + * this method again will overwrite the previous contents. + * + * The current state may be queried using ``stateName``. + */ + final def whenUnhandled(stateFunctionBuilder: FSMStateFunctionBuilder[S, D, E]): Unit = + whenUnhandled(stateFunctionBuilder.build()) + + /** + * Set handler which is called upon termination of this FSM actor. Calling + * this method again will overwrite the previous contents. + */ + final def onTermination(stopBuilder: FSMStopBuilder[S, D]): Unit = + onTermination(stopBuilder.build().asInstanceOf[PartialFunction[StopEvent, Unit]]) + + /** + * Create an [[akka.japi.pf.FSMStateFunctionBuilder]] with the first case statement set. + * + * A case statement that matches on an event and data type and a predicate. + * + * @param eventType the event type to match on + * @param dataType the data type to match on + * @param predicate a predicate to evaluate on the matched types + * @param apply an action to apply to the event and state data if there is a match + * @return the builder with the case statement added + */ + final def matchEvent[ET, DT <: D](eventType: Class[ET], dataType: Class[DT], predicate: TypedPredicate2[ET, DT], apply: Apply2[ET, DT, State]): FSMStateFunctionBuilder[S, D, E] = + new FSMStateFunctionBuilder[S, D, E]().event(eventType, dataType, apply) + + /** + * Create an [[akka.japi.pf.FSMStateFunctionBuilder]] with the first case statement set. + * + * A case statement that matches on an event and data type. + * + * @param eventType the event type to match on + * @param dataType the data type to match on + * @param apply an action to apply to the event and state data if there is a match + * @return the builder with the case statement added + */ + final def matchEvent[ET, DT <: D](eventType: Class[ET], dataType: Class[DT], apply: Apply2[ET, DT, State]): FSMStateFunctionBuilder[S, D, E] = + new FSMStateFunctionBuilder[S, D, E]().event(eventType, dataType, apply) + + /** + * Create an [[akka.japi.pf.FSMStateFunctionBuilder]] with the first case statement set. + * + * A case statement that matches if the event type and predicate matches. + * + * @param eventType the event type to match on + * @param predicate a predicate that will be evaluated on the data and the event + * @param apply an action to apply to the event and state data if there is a match + * @return the builder with the case statement added + */ + final def matchEvent[ET](eventType: Class[ET], predicate: TypedPredicate2[ET, D], apply: Apply2[ET, D, State]): FSMStateFunctionBuilder[S, D, E] = + new FSMStateFunctionBuilder[S, D, E]().event(eventType, predicate, apply); + + /** + * Create an [[akka.japi.pf.FSMStateFunctionBuilder]] with the first case statement set. + * + * A case statement that matches if the event type matches. + * + * @param eventType the event type to match on + * @param apply an action to apply to the event and state data if there is a match + * @return the builder with the case statement added + */ + final def matchEvent[ET](eventType: Class[ET], apply: Apply2[ET, D, State]): FSMStateFunctionBuilder[S, D, E] = + new FSMStateFunctionBuilder[S, D, E]().event(eventType, apply); + + /** + * Create an [[akka.japi.pf.FSMStateFunctionBuilder]] with the first case statement set. + * + * A case statement that matches if the predicate matches. + * + * @param predicate a predicate that will be evaluated on the data and the event + * @param apply an action to apply to the event and state data if there is a match + * @return the builder with the case statement added + */ + final def matchEvent(predicate: TypedPredicate2[AnyRef, D], apply: Apply2[AnyRef, D, State]): FSMStateFunctionBuilder[S, D, E] = + new FSMStateFunctionBuilder[S, D, E]().event(predicate, apply); + + /** + * Create an [[akka.japi.pf.FSMStateFunctionBuilder]] with the first case statement set. + * + * A case statement that matches on the data type and if any of the event types + * in the list match or any of the event instances in the list compares equal. + * + * @param eventMatches a list of types or instances to match against + * @param dataType the data type to match on + * @param apply an action to apply to the event and state data if there is a match + * @return the builder with the case statement added + */ + final def matchEvent[DT <: D](eventMatches: JList[AnyRef], dataType: Class[DT], apply: Apply2[AnyRef, DT, State]): FSMStateFunctionBuilder[S, D, E] = + new FSMStateFunctionBuilder[S, D, E]().event(eventMatches, dataType, apply); + + /** + * Create an [[akka.japi.pf.FSMStateFunctionBuilder]] with the first case statement set. + * + * A case statement that matches if any of the event types in the list match or any + * of the event instances in the list compares equal. + * + * @param eventMatches a list of types or instances to match against + * @param apply an action to apply to the event and state data if there is a match + * @return the builder with the case statement added + */ + final def matchEvent(eventMatches: JList[AnyRef], apply: Apply2[AnyRef, D, State]): FSMStateFunctionBuilder[S, D, E] = + new FSMStateFunctionBuilder[S, D, E]().event(eventMatches, apply); + + /** + * Create an [[akka.japi.pf.FSMStateFunctionBuilder]] with the first case statement set. + * + * A case statement that matches on the data type and if the event compares equal. + * + * @param event an event to compare equal against + * @param dataType the data type to match on + * @param apply an action to apply to the event and state data if there is a match + * @return the builder with the case statement added + */ + final def matchEventEquals[Ev, DT <: D](event: Ev, dataType: Class[DT], apply: Apply2[Ev, DT, State]): FSMStateFunctionBuilder[S, D, E] = + new FSMStateFunctionBuilder[S, D, E]().eventEquals(event, dataType, apply); + + /** + * Create an [[akka.japi.pf.FSMStateFunctionBuilder]] with the first case statement set. + * + * A case statement that matches if the event compares equal. + * + * @param event an event to compare equal against + * @param apply an action to apply to the event and state data if there is a match + * @return the builder with the case statement added + */ + final def matchEventEquals[Ev](event: Ev, apply: Apply2[Ev, D, State]): FSMStateFunctionBuilder[S, D, E] = + new FSMStateFunctionBuilder[S, D, E]().eventEquals(event, apply); + + /** + * Create an [[akka.japi.pf.FSMStateFunctionBuilder]] with the first case statement set. + * + * A case statement that matches on any type of event. + * + * @param apply an action to apply to the event and state data if there is a match + * @return the builder with the case statement added + */ + final def matchAnyEvent(apply: Apply2[AnyRef, D, State]): FSMStateFunctionBuilder[S, D, E] = + new FSMStateFunctionBuilder[S, D, E]().anyEvent(apply) + + /** + * Create an [[akka.japi.pf.FSMTransitionHandlerBuilder]] with the first case statement set. + * + * A case statement that matches on a from state and a to state. + * + * @param fromState the from state to match on + * @param toState the to state to match on + * @param apply an action to apply when the states match + * @return the builder with the case statement added + */ + final def matchState(fromState: S, toState: S, apply: UnitApplyVoid): FSMTransitionHandlerBuilder[S] = + new FSMTransitionHandlerBuilder[S]().state(fromState, toState, apply) + + /** + * Create an [[akka.japi.pf.FSMTransitionHandlerBuilder]] with the first case statement set. + * + * A case statement that matches on a from state and a to state. + * + * @param fromState the from state to match on + * @param toState the to state to match on + * @param apply an action to apply when the states match + * @return the builder with the case statement added + */ + final def matchState(fromState: S, toState: S, apply: UnitApply2[S, S]): FSMTransitionHandlerBuilder[S] = + new FSMTransitionHandlerBuilder[S]().state(fromState, toState, apply) + + /** + * Create an [[akka.japi.pf.FSMStopBuilder]] with the first case statement set. + * + * A case statement that matches on an [[FSM.Reason]]. + * + * @param reason the reason for the termination + * @param apply an action to apply to the event and state data if there is a match + * @return the builder with the case statement added + */ + final def matchStop(reason: Reason, apply: UnitApply2[S, D]): FSMStopBuilder[S, D] = + new FSMStopBuilder[S, D]().stop(reason, apply) + + /** + * Create an [[akka.japi.pf.FSMStopBuilder]] with the first case statement set. + * + * A case statement that matches on a reason type. + * + * @param reasonType the reason type to match on + * @param apply an action to apply to the reason, event and state data if there is a match + * @return the builder with the case statement added + */ + final def matchStop[RT <: Reason](reasonType: Class[RT], apply: UnitApply3[RT, S, D]): FSMStopBuilder[S, D] = + new FSMStopBuilder[S, D]().stop(reasonType, apply) + + /** + * Create an [[akka.japi.pf.FSMStopBuilder]] with the first case statement set. + * + * A case statement that matches on a reason type and a predicate. + * + * @param reasonType the reason type to match on + * @param apply an action to apply to the reason, event and state data if there is a match + * @param predicate a predicate that will be evaluated on the reason if the type matches + * @return the builder with the case statement added + */ + final def matchStop[RT <: Reason](reasonType: Class[RT], predicate: TypedPredicate[RT], apply: UnitApply3[RT, S, D]): FSMStopBuilder[S, D] = + new FSMStopBuilder[S, D]().stop(reasonType, predicate, apply) + + /** + * Create a [[akka.japi.pf.UnitPFBuilder]] with the first case statement set. + * + * @param dataType a type to match the argument against + * @param apply an action to apply to the argument if the type matches + * @return a builder with the case statement added + */ + final def matchData[DT <: D](dataType: Class[DT], apply: UnitApply[DT]): UnitPFBuilder[D] = + UnitMatch.`match`(dataType, apply) + + /** + * Create a [[akka.japi.pf.UnitPFBuilder]] with the first case statement set. + * + * @param dataType a type to match the argument against + * @param predicate a predicate that will be evaluated on the argument if the type matches + * @param apply an action to apply to the argument if the type and predicate matches + * @return a builder with the case statement added + */ + final def matchData[DT <: D](dataType: Class[DT], predicate: TypedPredicate[DT], apply: UnitApply[DT]): UnitPFBuilder[D] = + UnitMatch.`match`(dataType, predicate, apply) + + /** + * Produce transition to other state. Return this from a state function in + * order to effect the transition. + * + * @param nextStateName state designator for the next state + * @return state transition descriptor + */ + final def goTo(nextStateName: S): State = goto(nextStateName) + + /** + * Schedule named timer to deliver message after given delay, possibly repeating. + * Any existing timer with the same name will automatically be canceled before + * adding the new timer. + * @param name identifier to be used with cancelTimer() + * @param msg message to be delivered + * @param timeout delay of first message delivery and between subsequent messages + */ + final def setTimer(name: String, msg: Any, timeout: FiniteDuration): Unit = + setTimer(name, msg, timeout, false) + + /** + * Default reason if calling `stop()`. + */ + val Normal: FSM.Reason = FSM.Normal + + /** + * Reason given when someone was calling `system.stop(fsm)` from outside; + * also applies to `Stop` supervision directive. + */ + val Shutdown: FSM.Reason = FSM.Shutdown +} + +/** + * Java API: compatible with lambda expressions + * + * Finite State Machine actor abstract base class. + * + * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. + */ +abstract class AbstractLoggingFSM[S, D, E] extends AbstractFSM[S, D, E] with LoggingFSM[S, D, E] diff --git a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMActor.scala b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMActor.scala new file mode 100644 index 0000000000..a12e02caaf --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMActor.scala @@ -0,0 +1,154 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.persistence.fsm + +import akka.actor.ActorLogging +import akka.persistence.fsm.PersistentFsmActor.FSMState +import akka.persistence.serialization.Message +import akka.persistence.{ PersistentActor, RecoveryCompleted } + +import scala.collection.immutable +import scala.concurrent.duration.FiniteDuration +import scala.reflect.ClassTag + +/** + * FSM actor implementation with persistent state + * + * Supports the usual [[akka.actor.FSM]] functionality with additional persistence features. + * State and State Data are persisted on every state change. + * FSM is identified by 'persistenceId' value. + * Persistence execution order is: persist -> wait for ack -> apply state. Incoming messages are deferred until the state is applied. + */ +trait PersistentFsmActor[S <: FSMState, D, E] extends PersistentActor with FSM[S, D, E] with ActorLogging { + import akka.persistence.fsm.PersistentFsmActor._ + + /** + * Enables to pass a ClassTag of a domain event base type from the implementing class + * + * @return [[scala.reflect.ClassTag]] of domain event base type + */ + implicit def domainEventClassTag: ClassTag[E] + + /** + * Domain event's [[scala.reflect.ClassTag]] + * Used for identifying domain events during recovery + */ + val domainEventTag = domainEventClassTag + + /** + * Map from state identifier to state instance + */ + lazy val statesMap: Map[String, S] = stateNames.map(name ⇒ (name.identifier, name)).toMap + + /** + * Override this handler to define the action on Domain Event + * + * @param domainEvent domain event to apply + * @param currentData state data of the previous state + * @return updated state data + */ + def applyEvent(domainEvent: E, currentData: D): D + + /** + * Override this handler to define the action on recovery completion + */ + def onRecoveryCompleted(): Unit = {} + + /** + * After recovery events are handled as in usual FSM actor + */ + override def receiveCommand: Receive = { + super[FSM].receive + } + + /** + * Discover the latest recorded state + */ + override def receiveRecover: Receive = { + case domainEventTag(event) ⇒ startWith(stateName, applyEvent(event, stateData)) + case StateChangeEvent(stateIdentifier, timeout) ⇒ startWith(statesMap(stateIdentifier), stateData, timeout) + case RecoveryCompleted ⇒ + initialize() + onRecoveryCompleted() + } + + /** + * Persist FSM State and FSM State Data + */ + override private[akka] def applyState(nextState: State): Unit = { + val eventsToPersist: immutable.Seq[Any] = nextState.domainEvents.toList :+ StateChangeEvent(nextState.stateName.identifier, nextState.timeout) + var nextData: D = stateData + persist[Any](eventsToPersist) { + case domainEventTag(event) ⇒ + nextData = applyEvent(event, nextData) + case StateChangeEvent(stateIdentifier, timeout) ⇒ + super.applyState(nextState using nextData) + nextState.afterTransitionDo(stateData) + } + } +} + +object PersistentFsmActor { + /** + * Base persistent event class + */ + private[persistence] sealed trait PersistentFsmEvent extends Message + + /** + * Persisted on state change + * + * @param stateIdentifier FSM state identifier + * @param timeout FSM state timeout + */ + private[persistence] case class StateChangeEvent(stateIdentifier: String, timeout: Option[FiniteDuration]) extends PersistentFsmEvent + + /** + * FSMState base trait, makes possible for simple default serialization by conversion to String + */ + trait FSMState { + def identifier: String + } +} + +/** + * Java API: compatible with lambda expressions + * + * Persistent Finite State Machine actor abstract base class. + * + * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. + */ +abstract class AbstractPersistentFsmActor[S <: FSMState, D, E] extends AbstractFSM[S, D, E] with PersistentFsmActor[S, D, E] { + import java.util.function.Consumer + + /** + * Adapter from Java 8 Functional Interface to Scala Function + * @param action - Java 8 lambda expression defining the action + * @return action represented as a Scala Functin + */ + final def exec(action: Consumer[D]): D ⇒ Unit = + data ⇒ action.accept(data) + + /** + * Adapter from Java [[Class]] to [[scala.reflect.ClassTag]] + * @return domain event [[scala.reflect.ClassTag]] + */ + final override def domainEventClassTag: ClassTag[E] = + ClassTag(domainEventClass) + + /** + * Domain event's [[Class]] + * Used for identifying domain events during recovery + */ + def domainEventClass: Class[E] +} + +/** + * Java API: compatible with lambda expressions + * + * Persistent Finite State Machine actor abstract base class with FSM Logging + * + * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. + */ +abstract class AbstractPersistentLoggingFsmActor[S <: FSMState, D, E] extends AbstractLoggingFSM[S, D, E] with PersistentFsmActor[S, D, E] diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index 198e217baf..a00484917f 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -4,6 +4,8 @@ package akka.persistence.serialization +import scala.concurrent.duration +import scala.concurrent.duration.Duration import scala.language.existentials import com.google.protobuf._ import akka.actor.{ ActorPath, ExtendedActorSystem } @@ -14,6 +16,7 @@ import akka.serialization._ import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot ⇒ AtLeastOnceDeliverySnap } import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery import scala.collection.immutable.VectorBuilder +import akka.persistence.fsm.PersistentFsmActor.StateChangeEvent /** * Marker trait for all protobuf-serializable messages in `akka.persistence`. @@ -21,7 +24,7 @@ import scala.collection.immutable.VectorBuilder trait Message extends Serializable /** - * Protobuf serializer for [[akka.persistence.PersistentRepr]] and [[akka.persistence.AtLeastOnceDelivery]] messages. + * Protobuf serializer for [[akka.persistence.PersistentRepr]], [[akka.persistence.AtLeastOnceDelivery]] and [[akka.persistence.fsm.PersistentFsmActor.StateChangeEvent]] messages. */ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer { import PersistentRepr.Undefined @@ -29,6 +32,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer val PersistentReprClass = classOf[PersistentRepr] val PersistentImplClass = classOf[PersistentImpl] val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnap] + val PersistentStateChangeEventClass = classOf[StateChangeEvent] override val includeManifest: Boolean = true @@ -45,6 +49,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer def toBinary(o: AnyRef): Array[Byte] = o match { case p: PersistentRepr ⇒ persistentMessageBuilder(p).build().toByteArray case a: AtLeastOnceDeliverySnap ⇒ atLeastOnceDeliverySnapshotBuilder(a).build.toByteArray + case s: StateChangeEvent ⇒ stateChangeBuilder(s).build.toByteArray case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}") } @@ -58,6 +63,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer case PersistentImplClass ⇒ persistent(PersistentMessage.parseFrom(bytes)) case PersistentReprClass ⇒ persistent(PersistentMessage.parseFrom(bytes)) case AtLeastOnceDeliverySnapshotClass ⇒ atLeastOnceDeliverySnapshot(AtLeastOnceDeliverySnapshot.parseFrom(bytes)) + case PersistentStateChangeEventClass ⇒ stateChange(PersistentStateChangeEvent.parseFrom(bytes)) case _ ⇒ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}") } } @@ -80,6 +86,14 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer builder } + def stateChangeBuilder(stateChange: StateChangeEvent): PersistentStateChangeEvent.Builder = { + val builder = PersistentStateChangeEvent.newBuilder.setStateIdentifier(stateChange.stateIdentifier) + stateChange.timeout match { + case None ⇒ builder + case Some(timeout) ⇒ builder.setTimeout(timeout.toString()) + } + } + def atLeastOnceDeliverySnapshot(atLeastOnceDeliverySnapshot: AtLeastOnceDeliverySnapshot): AtLeastOnceDeliverySnap = { import scala.collection.JavaConverters._ val unconfirmedDeliveries = new VectorBuilder[UnconfirmedDelivery]() @@ -93,6 +107,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer unconfirmedDeliveries.result()) } + def stateChange(persistentStateChange: PersistentStateChangeEvent): StateChangeEvent = { + StateChangeEvent( + persistentStateChange.getStateIdentifier, + if (persistentStateChange.hasTimeout) Some(Duration(persistentStateChange.getTimeout).asInstanceOf[duration.FiniteDuration]) else None) + } + private def persistentMessageBuilder(persistent: PersistentRepr) = { val builder = PersistentMessage.newBuilder diff --git a/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java b/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java new file mode 100644 index 0000000000..9f3f5361da --- /dev/null +++ b/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFsmActorTest.java @@ -0,0 +1,596 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.persistence.fsm; + +import akka.actor.*; +import akka.japi.Option; +import akka.persistence.PersistenceSpec; +import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.JavaTestKit; +import akka.testkit.TestProbe; +import org.junit.ClassRule; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import akka.persistence.fsm.FSM.CurrentState; +import org.junit.Test; +import scala.concurrent.duration.Duration; + +import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.UserState; +import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.ShoppingCart; +import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.Item; + +import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.GetCurrentCart; +import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.AddItem; +import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.Buy; +import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.Leave; + +import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.PurchaseWasMade; +import static akka.persistence.fsm.AbstractPersistentFsmActorTest.WebStoreCustomerFSMActor.ShoppingCardDiscarded; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.matchers.JUnitMatchers.hasItems; + +public class AbstractPersistentFsmActorTest { + private static Option none = Option.none(); + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = + new AkkaJUnitActorSystemResource("PersistentFSMJavaTest", PersistenceSpec.config( + "leveldb", "AbstractPersistentFsmActorTest", "off", none.asScala())); + + private final ActorSystem system = actorSystemResource.getSystem(); + + //Dummy report actor, for tests that don't need it + private final ActorRef dummyReportActorRef = new TestProbe(system).ref(); + + @Test + public void fsmFunctionalTest() throws Exception { + new JavaTestKit(system) {{ + String persistenceId = generateId(); + ActorRef fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)); + + watch(fsmRef); + fsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef()); + + Item shirt = new Item("1", "Shirt", 59.99F); + Item shoes = new Item("2", "Shoes", 89.99F); + Item coat = new Item("3", "Coat", 119.99F); + + fsmRef.tell(GetCurrentCart.INSTANCE, getRef()); + fsmRef.tell(new AddItem(shirt), getRef()); + fsmRef.tell(GetCurrentCart.INSTANCE, getRef()); + fsmRef.tell(new AddItem(shoes), getRef()); + fsmRef.tell(GetCurrentCart.INSTANCE, getRef()); + fsmRef.tell(new AddItem(coat), getRef()); + fsmRef.tell(GetCurrentCart.INSTANCE, getRef()); + fsmRef.tell(Buy.INSTANCE, getRef()); + fsmRef.tell(GetCurrentCart.INSTANCE, getRef()); + fsmRef.tell(Leave.INSTANCE, getRef()); + + CurrentState currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class); + assertEquals(currentState.state(), UserState.LOOKING_AROUND); + + ShoppingCart shoppingCart = expectMsgClass(ShoppingCart.class); + assertTrue(shoppingCart.getItems().isEmpty()); + + FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class); + assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING); + + shoppingCart = expectMsgClass(ShoppingCart.class); + assertThat(shoppingCart.getItems(), hasItems(shirt)); + + shoppingCart = expectMsgClass(ShoppingCart.class); + assertThat(shoppingCart.getItems(), hasItems(shirt, shoes)); + + shoppingCart = expectMsgClass(ShoppingCart.class); + assertThat(shoppingCart.getItems(), hasItems(shirt, shoes, coat)); + + stateTransition = expectMsgClass(FSM.Transition.class); + assertTransition(stateTransition, fsmRef, UserState.SHOPPING, UserState.PAID); + + shoppingCart = expectMsgClass(ShoppingCart.class); + assertThat(shoppingCart.getItems(), hasItems(shirt, shoes, coat)); + + Terminated terminated = expectMsgClass(Terminated.class); + assertEquals(fsmRef, terminated.getActor()); + }}; + } + + @Test + public void fsmTimeoutTest() throws Exception { + new JavaTestKit(system) {{ + String persistenceId = generateId(); + ActorRef fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)); + + watch(fsmRef); + fsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef()); + + Item shirt = new Item("1", "Shirt", 59.99F); + + fsmRef.tell(new AddItem(shirt), getRef()); + + CurrentState currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class); + assertEquals(currentState.state(), UserState.LOOKING_AROUND); + + FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class); + assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING); + + new Within(duration("0.9 seconds"), duration("1.1 seconds")) { + @Override + protected void run() { + FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class); + assertTransition(stateTransition, fsmRef, UserState.SHOPPING, UserState.INACTIVE); + } + }; + + new Within(duration("1.9 seconds"), duration("2.1 seconds")) { + @Override + protected void run() { + expectTerminated(fsmRef); + } + }; + }}; + } + + @Test + public void testSuccessfulRecoveryWithCorrectStateData() { + new JavaTestKit(system) {{ + String persistenceId = generateId(); + ActorRef fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)); + + watch(fsmRef); + fsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef()); + + Item shirt = new Item("1", "Shirt", 59.99F); + Item shoes = new Item("2", "Shoes", 89.99F); + Item coat = new Item("3", "Coat", 119.99F); + + fsmRef.tell(GetCurrentCart.INSTANCE, getRef()); + fsmRef.tell(new AddItem(shirt), getRef()); + fsmRef.tell(GetCurrentCart.INSTANCE, getRef()); + fsmRef.tell(new AddItem(shoes), getRef()); + fsmRef.tell(GetCurrentCart.INSTANCE, getRef()); + + CurrentState currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class); + assertEquals(currentState.state(), UserState.LOOKING_AROUND); + + ShoppingCart shoppingCart = expectMsgClass(ShoppingCart.class); + assertTrue(shoppingCart.getItems().isEmpty()); + + FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class); + assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING); + + shoppingCart = expectMsgClass(ShoppingCart.class); + assertThat(shoppingCart.getItems(), hasItems(shirt)); + + shoppingCart = expectMsgClass(ShoppingCart.class); + assertThat(shoppingCart.getItems(), hasItems(shirt, shoes)); + + fsmRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); + expectTerminated(fsmRef); + + ActorRef recoveredFsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)); + watch(recoveredFsmRef); + recoveredFsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef()); + + recoveredFsmRef.tell(GetCurrentCart.INSTANCE, getRef()); + + recoveredFsmRef.tell(new AddItem(coat), getRef()); + recoveredFsmRef.tell(GetCurrentCart.INSTANCE, getRef()); + + recoveredFsmRef.tell(Buy.INSTANCE, getRef()); + recoveredFsmRef.tell(GetCurrentCart.INSTANCE, getRef()); + recoveredFsmRef.tell(Leave.INSTANCE, getRef()); + + currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class); + assertEquals(currentState.state(), UserState.SHOPPING); + + shoppingCart = expectMsgClass(ShoppingCart.class); + assertThat(shoppingCart.getItems(), hasItems(shirt, shoes)); + + shoppingCart = expectMsgClass(ShoppingCart.class); + assertThat(shoppingCart.getItems(), hasItems(shirt, shoes, coat)); + + stateTransition = expectMsgClass(FSM.Transition.class); + assertTransition(stateTransition, recoveredFsmRef, UserState.SHOPPING, UserState.PAID); + + shoppingCart = expectMsgClass(ShoppingCart.class); + assertThat(shoppingCart.getItems(), hasItems(shirt, shoes, coat)); + + expectTerminated(recoveredFsmRef); + }}; + } + + @Test + public void testExecutionOfDefinedActionsFollowingSuccessfulPersistence() { + new JavaTestKit(system) {{ + String persistenceId = generateId(); + + TestProbe reportActorProbe = new TestProbe(system); + ActorRef fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, reportActorProbe.ref())); + + watch(fsmRef); + fsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef()); + + Item shirt = new Item("1", "Shirt", 59.99F); + Item shoes = new Item("2", "Shoes", 89.99F); + Item coat = new Item("3", "Coat", 119.99F); + + fsmRef.tell(new AddItem(shirt), getRef()); + fsmRef.tell(new AddItem(shoes), getRef()); + fsmRef.tell(new AddItem(coat), getRef()); + fsmRef.tell(Buy.INSTANCE, getRef()); + fsmRef.tell(Leave.INSTANCE, getRef()); + + CurrentState currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class); + assertEquals(currentState.state(), UserState.LOOKING_AROUND); + + FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class); + assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING); + + stateTransition = expectMsgClass(FSM.Transition.class); + assertTransition(stateTransition, fsmRef, UserState.SHOPPING, UserState.PAID); + + PurchaseWasMade purchaseWasMade = reportActorProbe.expectMsgClass(PurchaseWasMade.class); + assertThat(purchaseWasMade.getItems(), hasItems(shirt, shoes, coat)); + + expectTerminated(fsmRef); + }}; + } + + @Test + public void testExecutionOfDefinedActionsFollowingSuccessfulPersistenceOfFSMStop() { + new JavaTestKit(system) {{ + String persistenceId = generateId(); + + TestProbe reportActorProbe = new TestProbe(system); + ActorRef fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, reportActorProbe.ref())); + + watch(fsmRef); + fsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef()); + + Item shirt = new Item("1", "Shirt", 59.99F); + Item shoes = new Item("2", "Shoes", 89.99F); + Item coat = new Item("3", "Coat", 119.99F); + + fsmRef.tell(new AddItem(shirt), getRef()); + fsmRef.tell(new AddItem(shoes), getRef()); + fsmRef.tell(new AddItem(coat), getRef()); + fsmRef.tell(Leave.INSTANCE, getRef()); + + CurrentState currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class); + assertEquals(currentState.state(), UserState.LOOKING_AROUND); + + FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class); + assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING); + + reportActorProbe.expectMsgClass(ShoppingCardDiscarded.class); + + expectTerminated(fsmRef); + }}; + } + + @Test + public void testCorrectStateTimeoutFollowingRecovery() { + new JavaTestKit(system) {{ + String persistenceId = generateId(); + ActorRef fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)); + + watch(fsmRef); + fsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef()); + + Item shirt = new Item("1", "Shirt", 59.99F); + + fsmRef.tell(new AddItem(shirt), getRef()); + + CurrentState currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class); + assertEquals(currentState.state(), UserState.LOOKING_AROUND); + + FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class); + assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING); + + expectNoMsg(duration("0.6seconds")); //randomly chosen delay, less than the timeout, before stopping the FSM + fsmRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); + expectTerminated(fsmRef); + + final ActorRef recoveredFsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)); + watch(recoveredFsmRef); + recoveredFsmRef.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef()); + + + currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class); + assertEquals(currentState.state(), UserState.SHOPPING); + + new Within(duration("0.9 seconds"), duration("1.1 seconds")) { + @Override + protected void run() { + FSM.Transition stateTransition = expectMsgClass(FSM.Transition.class); + assertTransition(stateTransition, recoveredFsmRef, UserState.SHOPPING, UserState.INACTIVE); + } + }; + + expectNoMsg(duration("0.9 seconds")); //randomly chosen delay, less than the timeout, before stopping the FSM + recoveredFsmRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); + expectTerminated(recoveredFsmRef); + + final ActorRef recoveredFsmRef2 = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)); + watch(recoveredFsmRef2); + recoveredFsmRef2.tell(new FSM.SubscribeTransitionCallBack(getRef()), getRef()); + + currentState = expectMsgClass(akka.persistence.fsm.FSM.CurrentState.class); + assertEquals(currentState.state(), UserState.INACTIVE); + + new Within(duration("1.9 seconds"), duration("2.1 seconds")) { + @Override + protected void run() { + expectTerminated(recoveredFsmRef2); + } + }; + }}; + } + + + private static void assertTransition(FSM.Transition transition, ActorRef ref, From from, To to) { + assertEquals(ref, transition.fsmRef()); + assertEquals(from, transition.from()); + assertEquals(to, transition.to()); + } + + private static String generateId() { + return UUID.randomUUID().toString(); + } + + + public static class WebStoreCustomerFSMActor extends AbstractPersistentFsmActor { + + //State name + //#customer-states + enum UserState implements PersistentFsmActor.FSMState { + LOOKING_AROUND("Looking Around"), + SHOPPING("Shopping"), + INACTIVE("Inactive"), + PAID("Paid"); + + private final String stateIdentifier; + + UserState(String stateIdentifier) { + this.stateIdentifier = stateIdentifier; + } + + @Override + public String identifier() { + return stateIdentifier; + } + } + //#customer-states + + //#customer-states-data + static class ShoppingCart { + private final List items = new ArrayList<>(); + + public List getItems() { + return Collections.unmodifiableList(items); + } + + void addItem(Item item) { + items.add(item); + } + + void empty() { + items.clear(); + } + } + + static class Item implements Serializable { + private final String id; + private final String name; + private final float price; + + Item(String id, String name, float price) { + this.id = id; + this.name = name; + this.price = price; + } + + public String getId() { + return id; + } + + public float getPrice() { + return price; + } + + public String getName() { + return name; + } + + @Override + public String toString() { + return String.format("Item{id=%s, name=%s, price=%s}", id, price, name); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Item item = (Item) o; + + return item.price == price && id.equals(item.id) && name.equals(item.name); + } + } + //#customer-states-data + + public interface Command { + } + + //#customer-commands + public static final class AddItem implements Command { + private final Item item; + + public AddItem(Item item) { + this.item = item; + } + + public Item getItem() { + return item; + } + } + + public enum Buy implements Command {INSTANCE} + + public enum Leave implements Command {INSTANCE} + + public enum GetCurrentCart implements Command {INSTANCE} + //#customer-commands + + interface DomainEvent extends Serializable { + } + + //#customer-domain-events + public static final class ItemAdded implements DomainEvent { + private final Item item; + + public ItemAdded(Item item) { + this.item = item; + } + + public Item getItem() { + return item; + } + } + + public enum OrderExecuted implements DomainEvent {INSTANCE} + + public enum OrderDiscarded implements DomainEvent {INSTANCE} + //#customer-domain-events + + + //Side effects - report events to be sent to some "Report Actor" + public interface ReportEvent { + } + + public static final class PurchaseWasMade implements ReportEvent { + private final List items; + + public PurchaseWasMade(List items) { + this.items = Collections.unmodifiableList(items); + } + + public List getItems() { + return items; + } + } + + public enum ShoppingCardDiscarded implements ReportEvent {INSTANCE} + + final private String persistenceId; + + @Override + public Class domainEventClass() { + return DomainEvent.class; + } + + @Override + public String persistenceId() { + return persistenceId; + } + + public static Props props(String persistenceId, ActorRef reportActor) { + return Props.create(WebStoreCustomerFSMActor.class, persistenceId, reportActor); + } + + public WebStoreCustomerFSMActor(String persistenceId, ActorRef reportActor) { + this.persistenceId = persistenceId; + + //#customer-fsm-body + startWith(UserState.LOOKING_AROUND, new ShoppingCart()); + + when(UserState.LOOKING_AROUND, + matchEvent(AddItem.class, + (event, data) -> + goTo(UserState.SHOPPING).applying(new ItemAdded(event.getItem())) + .forMax(Duration.create(1, TimeUnit.SECONDS)) + ) + .event(GetCurrentCart.class, (event, data) -> stay().replying(data)) + ); + + when(UserState.SHOPPING, + matchEvent(AddItem.class, + (event, data) -> + stay().applying(new ItemAdded(event.getItem())) + .forMax(Duration.create(1, TimeUnit.SECONDS))) + .event(Buy.class, + (event, data) -> + goTo(UserState.PAID).applying(OrderExecuted.INSTANCE) + .andThen(exec(cart -> + reportActor.tell(new PurchaseWasMade(cart.getItems()), self())) + )) + .event(Leave.class, + (event, data) -> + stop().applying(OrderDiscarded.INSTANCE) + .andThen(exec(cart -> + reportActor.tell(ShoppingCardDiscarded.INSTANCE, self()) + ))) + .event(GetCurrentCart.class, (event, data) -> stay().replying(data)) + .event(StateTimeout$.class, + (event, data) -> + goTo(UserState.INACTIVE).forMax(Duration.create(2, TimeUnit.SECONDS))) + ); + + + when(UserState.INACTIVE, + matchEvent(AddItem.class, + (event, data) -> + goTo(UserState.SHOPPING).applying(new ItemAdded(event.getItem())) + .forMax(Duration.create(1, TimeUnit.SECONDS))) + .event(GetCurrentCart.class, (event, data) -> stay().replying(data)) + .event(StateTimeout$.class, + (event, data) -> + stop().applying(OrderDiscarded.INSTANCE) + .andThen(exec(cart -> + reportActor.tell(ShoppingCardDiscarded.INSTANCE, self()) + ))) + ); + + when(UserState.PAID, + matchEvent(Leave.class, (event, data) -> stop()) + .event(GetCurrentCart.class, (event, data) -> stay().replying(data)) + ); + + initialize(); + //#customer-fsm-body + } + + /** + * Override this handler to define the action on Domain Event during recovery + * + * @param event domain event to apply + * @param currentData state data of the previous state + */ + //#customer-apply-event + @Override + public ShoppingCart applyEvent(DomainEvent event, ShoppingCart currentData) { + if (event instanceof ItemAdded) { + currentData.addItem(((ItemAdded) event).getItem()); + return currentData; + } else if (event instanceof OrderExecuted) { + return currentData; + } else if (event instanceof OrderDiscarded) { + currentData.empty(); + return currentData; + } + throw new RuntimeException("Unhandled"); + } + //#customer-apply-event + } +} diff --git a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala index 01b7857f92..487c61676a 100644 --- a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala @@ -149,7 +149,7 @@ object AtLeastOnceDeliverySpec { } -abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) with PersistenceSpec { +abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(config) with ImplicitSender { import akka.persistence.AtLeastOnceDeliverySpec._ "AtLeastOnceDelivery" must { diff --git a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala index 58e13f0c1a..4135e111d6 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala @@ -111,7 +111,7 @@ object PerformanceSpec { } } -class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "PerformanceSpec", serialization = "off").withFallback(ConfigFactory.parseString(PerformanceSpec.config))) with PersistenceSpec with ImplicitSender { +class PerformanceSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "PerformanceSpec", serialization = "off").withFallback(ConfigFactory.parseString(PerformanceSpec.config))) with ImplicitSender { import PerformanceSpec._ val loadCycles = system.settings.config.getInt("akka.persistence.performance.cycles.load") diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala index f8c37b9649..69dccdf7be 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala @@ -10,7 +10,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.reflect.ClassTag import scala.util.control.NoStackTrace -import com.typesafe.config.ConfigFactory +import com.typesafe.config.{ Config, ConfigFactory } import org.apache.commons.io.FileUtils import org.scalatest.BeforeAndAfterEach @@ -18,7 +18,7 @@ import org.scalatest.BeforeAndAfterEach import akka.actor.Props import akka.testkit.AkkaSpec -trait PersistenceSpec extends BeforeAndAfterEach with Cleanup { this: AkkaSpec ⇒ +abstract class PersistenceSpec(config: Config) extends AkkaSpec(config) with BeforeAndAfterEach with Cleanup { this: AkkaSpec ⇒ private var _name: String = _ lazy val extension = Persistence(system) diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala index b288c92faa..6f9047fd16 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala @@ -131,10 +131,10 @@ object PersistentActorFailureSpec { } } -class PersistentActorFailureSpec extends AkkaSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some( +class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some( """ akka.persistence.journal.inmem.class = "akka.persistence.PersistentActorFailureSpec$FailingInmemJournal" - """))) with PersistenceSpec with ImplicitSender { + """))) with ImplicitSender { import PersistentActorFailureSpec._ import PersistentActorSpec._ diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index 8423a051a9..ff5fd350f4 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -510,7 +510,7 @@ object PersistentActorSpec { } -abstract class PersistentActorSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { +abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(config) with ImplicitSender { import PersistentActorSpec._ override protected def beforeEach() { diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala index b4bdc5c1d8..2cc22d8195 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala @@ -152,7 +152,7 @@ object PersistentViewSpec { } } -abstract class PersistentViewSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { +abstract class PersistentViewSpec(config: Config) extends PersistenceSpec(config) with ImplicitSender { import akka.persistence.PersistentViewSpec._ var persistentActor: ActorRef = _ diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala index d93f9f5732..acc3680a97 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala @@ -58,10 +58,10 @@ object SnapshotFailureRobustnessSpec { } } -class SnapshotFailureRobustnessSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotFailureRobustnessSpec", serialization = "off", extraConfig = Some( +class SnapshotFailureRobustnessSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "SnapshotFailureRobustnessSpec", serialization = "off", extraConfig = Some( """ akka.persistence.snapshot-store.local.class = "akka.persistence.SnapshotFailureRobustnessSpec$FailingLocalSnapshotStore" - """))) with PersistenceSpec with ImplicitSender { + """))) with ImplicitSender { import SnapshotFailureRobustnessSpec._ diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala index 067bb9217a..961c0f8c45 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala @@ -33,7 +33,7 @@ object SnapshotRecoveryLocalStoreSpec { } } -class SnapshotRecoveryLocalStoreSpec extends AkkaSpec(PersistenceSpec.config("inmem", "SnapshotRecoveryLocalStoreSpec")) with PersistenceSpec with ImplicitSender { +class SnapshotRecoveryLocalStoreSpec extends PersistenceSpec(PersistenceSpec.config("inmem", "SnapshotRecoveryLocalStoreSpec")) with ImplicitSender { import SnapshotRecoveryLocalStoreSpec._ diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala index aa290e2c72..8fd8da18b7 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala @@ -64,7 +64,7 @@ object SnapshotSerializationSpec { } -class SnapshotSerializationSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotSerializationSpec", serialization = "off", extraConfig = Some( +class SnapshotSerializationSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "SnapshotSerializationSpec", serialization = "off", extraConfig = Some( """ akka.actor { serializers { @@ -74,7 +74,7 @@ class SnapshotSerializationSpec extends AkkaSpec(PersistenceSpec.config("leveldb "akka.persistence.SnapshotSerializationSpec$SerializationMarker" = my-snapshot } } - """))) with PersistenceSpec with ImplicitSender { + """))) with ImplicitSender { import SnapshotSerializationSpec._ import SnapshotSerializationSpec.XXXXXXXXXXXXXXXXXXXX._ diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala index 22ac847bbb..aae856b423 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala @@ -60,7 +60,7 @@ object SnapshotSpec { } } -class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotSpec")) with PersistenceSpec with ImplicitSender { +class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "SnapshotSpec")) with ImplicitSender { import SnapshotSpec._ import SnapshotProtocol._ diff --git a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala new file mode 100644 index 0000000000..2d400c61a3 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala @@ -0,0 +1,354 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.persistence.fsm + +import akka.actor._ +import akka.persistence.PersistenceSpec +import akka.persistence.fsm.FSM.{ CurrentState, SubscribeTransitionCallBack, Transition } +import akka.persistence.fsm.PersistentFsmActor.FSMState +import akka.testkit._ +import com.typesafe.config.Config + +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +abstract class PersistentFSMActorSpec(config: Config) extends PersistenceSpec(config) with ImplicitSender { + import PersistentFSMActorSpec._ + + //Dummy report actor, for tests that don't need it + val dummyReportActorRef = TestProbe().ref + + "Persistent FSM Actor" must { + "function as a regular FSM " in { + val persistenceId = name + val fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)) + + watch(fsmRef) + fsmRef ! SubscribeTransitionCallBack(testActor) + + val shirt = Item("1", "Shirt", 59.99F) + val shoes = Item("2", "Shoes", 89.99F) + val coat = Item("3", "Coat", 119.99F) + + fsmRef ! GetCurrentCart + fsmRef ! AddItem(shirt) + fsmRef ! GetCurrentCart + fsmRef ! AddItem(shoes) + fsmRef ! GetCurrentCart + fsmRef ! AddItem(coat) + fsmRef ! GetCurrentCart + fsmRef ! Buy + fsmRef ! GetCurrentCart + fsmRef ! Leave + + expectMsg(CurrentState(fsmRef, LookingAround)) + expectMsg(EmptyShoppingCart) + + expectMsg(Transition(fsmRef, LookingAround, Shopping)) + expectMsg(NonEmptyShoppingCart(List(shirt))) + expectMsg(NonEmptyShoppingCart(List(shirt, shoes))) + expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat))) + + expectMsg(Transition(fsmRef, Shopping, Paid)) + expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat))) + + expectTerminated(fsmRef) + } + + "function as a regular FSM on state timeout" in { + val persistenceId = name + val fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)) + + watch(fsmRef) + fsmRef ! SubscribeTransitionCallBack(testActor) + + val shirt = Item("1", "Shirt", 59.99F) + + fsmRef ! AddItem(shirt) + + expectMsg(CurrentState(fsmRef, LookingAround)) + expectMsg(Transition(fsmRef, LookingAround, Shopping)) + + within(0.9 seconds, 1.1 seconds) { + expectMsg(Transition(fsmRef, Shopping, Inactive)) + } + + within(1.9 seconds, 2.1 seconds) { + expectTerminated(fsmRef) + } + } + + "recover successfully with correct state data" in { + val persistenceId = name + + val fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)) + watch(fsmRef) + fsmRef ! SubscribeTransitionCallBack(testActor) + + val shirt = Item("1", "Shirt", 59.99F) + val shoes = Item("2", "Shoes", 89.99F) + val coat = Item("3", "Coat", 119.99F) + + fsmRef ! GetCurrentCart + fsmRef ! AddItem(shirt) + fsmRef ! GetCurrentCart + fsmRef ! AddItem(shoes) + fsmRef ! GetCurrentCart + + expectMsg(CurrentState(fsmRef, LookingAround)) + expectMsg(EmptyShoppingCart) + + expectMsg(Transition(fsmRef, LookingAround, Shopping)) + expectMsg(NonEmptyShoppingCart(List(shirt))) + expectMsg(NonEmptyShoppingCart(List(shirt, shoes))) + + fsmRef ! PoisonPill + expectTerminated(fsmRef) + + val recoveredFsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)) + watch(recoveredFsmRef) + recoveredFsmRef ! SubscribeTransitionCallBack(testActor) + + recoveredFsmRef ! GetCurrentCart + + recoveredFsmRef ! AddItem(coat) + recoveredFsmRef ! GetCurrentCart + + recoveredFsmRef ! Buy + recoveredFsmRef ! GetCurrentCart + recoveredFsmRef ! Leave + + expectMsg(CurrentState(recoveredFsmRef, Shopping)) + expectMsg(NonEmptyShoppingCart(List(shirt, shoes))) + + expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat))) + + expectMsg(Transition(recoveredFsmRef, Shopping, Paid)) + expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat))) + + expectTerminated(recoveredFsmRef) + } + + "execute the defined actions following successful persistence of state change" in { + val persistenceId = name + + val reportActorProbe = TestProbe() + val fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, reportActorProbe.ref)) + watch(fsmRef) + fsmRef ! SubscribeTransitionCallBack(testActor) + + val shirt = Item("1", "Shirt", 59.99F) + val shoes = Item("2", "Shoes", 89.99F) + val coat = Item("3", "Coat", 119.99F) + + fsmRef ! AddItem(shirt) + fsmRef ! AddItem(shoes) + fsmRef ! AddItem(coat) + fsmRef ! Buy + fsmRef ! Leave + + expectMsg(CurrentState(fsmRef, LookingAround)) + expectMsg(Transition(fsmRef, LookingAround, Shopping)) + expectMsg(Transition(fsmRef, Shopping, Paid)) + reportActorProbe.expectMsg(PurchaseWasMade(List(shirt, shoes, coat))) + expectTerminated(fsmRef) + } + + "execute the defined actions following successful persistence of FSM stop" in { + val persistenceId = name + + val reportActorProbe = TestProbe() + val fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, reportActorProbe.ref)) + watch(fsmRef) + fsmRef ! SubscribeTransitionCallBack(testActor) + + val shirt = Item("1", "Shirt", 59.99F) + val shoes = Item("2", "Shoes", 89.99F) + val coat = Item("3", "Coat", 119.99F) + + fsmRef ! AddItem(shirt) + fsmRef ! AddItem(shoes) + fsmRef ! AddItem(coat) + fsmRef ! Leave + + expectMsg(CurrentState(fsmRef, LookingAround)) + expectMsg(Transition(fsmRef, LookingAround, Shopping)) + reportActorProbe.expectMsg(ShoppingCardDiscarded) + expectTerminated(fsmRef) + } + + "recover successfully with correct state timeout" in { + val persistenceId = name + val fsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)) + + watch(fsmRef) + fsmRef ! SubscribeTransitionCallBack(testActor) + + val shirt = Item("1", "Shirt", 59.99F) + + fsmRef ! AddItem(shirt) + + expectMsg(CurrentState(fsmRef, LookingAround)) + expectMsg(Transition(fsmRef, LookingAround, Shopping)) + + expectNoMsg(0.6 seconds) //randomly chosen delay, less than the timeout, before stopping the FSM + fsmRef ! PoisonPill + expectTerminated(fsmRef) + + var recoveredFsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)) + watch(recoveredFsmRef) + recoveredFsmRef ! SubscribeTransitionCallBack(testActor) + + expectMsg(CurrentState(recoveredFsmRef, Shopping)) + + within(0.9 seconds, 1.1 seconds) { + expectMsg(Transition(recoveredFsmRef, Shopping, Inactive)) + } + + expectNoMsg(0.9 seconds) //randomly chosen delay, less than the timeout, before stopping the FSM + recoveredFsmRef ! PoisonPill + expectTerminated(recoveredFsmRef) + + recoveredFsmRef = system.actorOf(WebStoreCustomerFSMActor.props(persistenceId, dummyReportActorRef)) + watch(recoveredFsmRef) + recoveredFsmRef ! SubscribeTransitionCallBack(testActor) + + expectMsg(CurrentState(recoveredFsmRef, Inactive)) + + within(1.9 seconds, 2.1 seconds) { + expectTerminated(recoveredFsmRef) + } + } + } + +} + +object PersistentFSMActorSpec { + //#customer-states + sealed trait UserState extends FSMState + case object LookingAround extends UserState { + override def identifier: String = "Looking Around" + } + case object Shopping extends UserState { + override def identifier: String = "Shopping" + } + case object Inactive extends UserState { + override def identifier: String = "Inactive" + } + case object Paid extends UserState { + override def identifier: String = "Paid" + } + //#customer-states + + //#customer-states-data + case class Item(id: String, name: String, price: Float) + + sealed trait ShoppingCart { + def addItem(item: Item): ShoppingCart + def empty(): ShoppingCart + } + case object EmptyShoppingCart extends ShoppingCart { + def addItem(item: Item) = NonEmptyShoppingCart(item :: Nil) + def empty() = this + } + case class NonEmptyShoppingCart(items: Seq[Item]) extends ShoppingCart { + def addItem(item: Item) = NonEmptyShoppingCart(items :+ item) + def empty() = EmptyShoppingCart + } + //#customer-states-data + + //#customer-commands + sealed trait Command + case class AddItem(item: Item) extends Command + case object Buy extends Command + case object Leave extends Command + case object GetCurrentCart extends Command + //#customer-commands + + //#customer-domain-events + sealed trait DomainEvent + case class ItemAdded(item: Item) extends DomainEvent + case object OrderExecuted extends DomainEvent + case object OrderDiscarded extends DomainEvent + //#customer-domain-events + + //Side effects - report events to be sent to some "Report Actor" + sealed trait ReportEvent + case class PurchaseWasMade(items: Seq[Item]) extends ReportEvent + case object ShoppingCardDiscarded extends ReportEvent + + class WebStoreCustomerFSMActor(_persistenceId: String, reportActor: ActorRef)(implicit val domainEventClassTag: ClassTag[DomainEvent]) extends PersistentFsmActor[UserState, ShoppingCart, DomainEvent] { + override def persistenceId = _persistenceId + + //#customer-fsm-body + startWith(LookingAround, EmptyShoppingCart) + + when(LookingAround) { + case Event(AddItem(item), _) ⇒ + goto(Shopping) applying ItemAdded(item) forMax (1 seconds) + case Event(GetCurrentCart, data) ⇒ + stay replying data + } + + when(Shopping) { + case Event(AddItem(item), _) ⇒ + stay applying ItemAdded(item) forMax (1 seconds) + case Event(Buy, _) ⇒ + goto(Paid) applying OrderExecuted andThen { + case NonEmptyShoppingCart(items) ⇒ reportActor ! PurchaseWasMade(items) + } + case Event(Leave, _) ⇒ + stop applying OrderDiscarded andThen { + case _ ⇒ reportActor ! ShoppingCardDiscarded + } + case Event(GetCurrentCart, data) ⇒ + stay replying data + case Event(StateTimeout, _) ⇒ + goto(Inactive) forMax (2 seconds) + } + + when(Inactive) { + case Event(AddItem(item), _) ⇒ + goto(Shopping) applying ItemAdded(item) forMax (1 seconds) + case Event(StateTimeout, _) ⇒ + stop applying OrderDiscarded andThen { + case _ ⇒ reportActor ! ShoppingCardDiscarded + } + } + + when(Paid) { + case Event(Leave, _) ⇒ stop() + case Event(GetCurrentCart, data) ⇒ + stay replying data + } + //#customer-fsm-body + + /** + * Override this handler to define the action on Domain Event + * + * @param event domain event to apply + * @param cartBeforeEvent state data of the previous state + */ + //#customer-apply-event + override def applyEvent(event: DomainEvent, cartBeforeEvent: ShoppingCart): ShoppingCart = { + event match { + case ItemAdded(item) ⇒ cartBeforeEvent.addItem(item) + case OrderExecuted ⇒ cartBeforeEvent + case OrderDiscarded ⇒ cartBeforeEvent.empty() + } + } + //#customer-apply-event + } + + object WebStoreCustomerFSMActor { + def props(persistenceId: String, reportActor: ActorRef) = + Props(new WebStoreCustomerFSMActor(persistenceId, reportActor)) + } +} + +class LeveldbPersistentFSMActorSpec extends PersistentFSMActorSpec(PersistenceSpec.config("leveldb", "PersistentFSMActorSpec")) +class InmemPersistentFSMActorSpec extends PersistentFSMActorSpec(PersistenceSpec.config("inmem", "PersistentFSMActorSpec"))