diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index f43d3f6e0c..b922dc9945 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -789,6 +789,18 @@ Here is how everything is wired together: .. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java#customer-apply-event +``andThen`` can be used to define actions which will be executed following event's persistence - convenient for "side effects" like sending a message or logging. +Notice that actions defined in ``andThen`` block are not executed on recovery: + +.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java#customer-andthen-example + +A snapshot of state data can be persisted by calling the ``saveStateSnapshot()`` method: + +.. includecode:: ../../../akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java#customer-snapshot-example + +On recovery state data is initialized according to the latest available snapshot, then the remaining domain events are replayed, triggering the +``applyEvent`` method. + Storage plugins =============== diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 7e9ee1695a..a8649454ac 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -796,6 +796,18 @@ Here is how everything is wired together: .. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala#customer-apply-event +``andThen`` can be used to define actions which will be executed following event's persistence - convenient for "side effects" like sending a message or logging. +Notice that actions defined in ``andThen`` block are not executed on recovery: + +.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala#customer-andthen-example + +A snapshot of state data can be persisted by calling the ``saveStateSnapshot()`` method: + +.. includecode:: ../../../akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala#customer-snapshot-example + +On recovery state data is initialized according to the latest available snapshot, then the remaining domain events are replayed, triggering the +``applyEvent`` method. + .. _storage-plugins: Storage plugins diff --git a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java index 53e63f67e7..ba11b1848c 100644 --- a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java +++ b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java @@ -4293,6 +4293,16 @@ public final class MessageFormats { */ akka.protobuf.ByteString getTimeoutBytes(); + + // optional int64 timeoutNanos = 3; + /** + * optional int64 timeoutNanos = 3; + */ + boolean hasTimeoutNanos(); + /** + * optional int64 timeoutNanos = 3; + */ + long getTimeoutNanos(); } /** * Protobuf type {@code PersistentStateChangeEvent} @@ -4355,6 +4365,11 @@ public final class MessageFormats { timeout_ = input.readBytes(); break; } + case 24: { + bitField0_ |= 0x00000004; + timeoutNanos_ = input.readInt64(); + break; + } } } } catch (akka.protobuf.InvalidProtocolBufferException e) { @@ -4481,9 +4496,26 @@ public final class MessageFormats { } } + // optional int64 timeoutNanos = 3; + public static final int TIMEOUTNANOS_FIELD_NUMBER = 3; + private long timeoutNanos_; + /** + * optional int64 timeoutNanos = 3; + */ + public boolean hasTimeoutNanos() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional int64 timeoutNanos = 3; + */ + public long getTimeoutNanos() { + return timeoutNanos_; + } + private void initFields() { stateIdentifier_ = ""; timeout_ = ""; + timeoutNanos_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -4507,6 +4539,9 @@ public final class MessageFormats { if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeBytes(2, getTimeoutBytes()); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeInt64(3, timeoutNanos_); + } getUnknownFields().writeTo(output); } @@ -4524,6 +4559,10 @@ public final class MessageFormats { size += akka.protobuf.CodedOutputStream .computeBytesSize(2, getTimeoutBytes()); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += akka.protobuf.CodedOutputStream + .computeInt64Size(3, timeoutNanos_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -4644,6 +4683,8 @@ public final class MessageFormats { bitField0_ = (bitField0_ & ~0x00000001); timeout_ = ""; bitField0_ = (bitField0_ & ~0x00000002); + timeoutNanos_ = 0L; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -4680,6 +4721,10 @@ public final class MessageFormats { to_bitField0_ |= 0x00000002; } result.timeout_ = timeout_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.timeoutNanos_ = timeoutNanos_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -4706,6 +4751,9 @@ public final class MessageFormats { timeout_ = other.timeout_; onChanged(); } + if (other.hasTimeoutNanos()) { + setTimeoutNanos(other.getTimeoutNanos()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -4885,6 +4933,39 @@ public final class MessageFormats { return this; } + // optional int64 timeoutNanos = 3; + private long timeoutNanos_ ; + /** + * optional int64 timeoutNanos = 3; + */ + public boolean hasTimeoutNanos() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional int64 timeoutNanos = 3; + */ + public long getTimeoutNanos() { + return timeoutNanos_; + } + /** + * optional int64 timeoutNanos = 3; + */ + public Builder setTimeoutNanos(long value) { + bitField0_ |= 0x00000004; + timeoutNanos_ = value; + onChanged(); + return this; + } + /** + * optional int64 timeoutNanos = 3; + */ + public Builder clearTimeoutNanos() { + bitField0_ = (bitField0_ & ~0x00000004); + timeoutNanos_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:PersistentStateChangeEvent) } @@ -4896,6 +4977,774 @@ public final class MessageFormats { // @@protoc_insertion_point(class_scope:PersistentStateChangeEvent) } + public interface PersistentFSMSnapshotOrBuilder + extends akka.protobuf.MessageOrBuilder { + + // required string stateIdentifier = 1; + /** + * required string stateIdentifier = 1; + */ + boolean hasStateIdentifier(); + /** + * required string stateIdentifier = 1; + */ + java.lang.String getStateIdentifier(); + /** + * required string stateIdentifier = 1; + */ + akka.protobuf.ByteString + getStateIdentifierBytes(); + + // required .PersistentPayload data = 2; + /** + * required .PersistentPayload data = 2; + */ + boolean hasData(); + /** + * required .PersistentPayload data = 2; + */ + akka.persistence.serialization.MessageFormats.PersistentPayload getData(); + /** + * required .PersistentPayload data = 2; + */ + akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder getDataOrBuilder(); + + // optional int64 timeoutNanos = 3; + /** + * optional int64 timeoutNanos = 3; + */ + boolean hasTimeoutNanos(); + /** + * optional int64 timeoutNanos = 3; + */ + long getTimeoutNanos(); + } + /** + * Protobuf type {@code PersistentFSMSnapshot} + */ + public static final class PersistentFSMSnapshot extends + akka.protobuf.GeneratedMessage + implements PersistentFSMSnapshotOrBuilder { + // Use PersistentFSMSnapshot.newBuilder() to construct. + private PersistentFSMSnapshot(akka.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private PersistentFSMSnapshot(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final PersistentFSMSnapshot defaultInstance; + public static PersistentFSMSnapshot getDefaultInstance() { + return defaultInstance; + } + + public PersistentFSMSnapshot getDefaultInstanceForType() { + return defaultInstance; + } + + private final akka.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final akka.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private PersistentFSMSnapshot( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + akka.protobuf.UnknownFieldSet.Builder unknownFields = + akka.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + stateIdentifier_ = input.readBytes(); + break; + } + case 18: { + akka.persistence.serialization.MessageFormats.PersistentPayload.Builder subBuilder = null; + if (((bitField0_ & 0x00000002) == 0x00000002)) { + subBuilder = data_.toBuilder(); + } + data_ = input.readMessage(akka.persistence.serialization.MessageFormats.PersistentPayload.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(data_); + data_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000002; + break; + } + case 24: { + bitField0_ |= 0x00000004; + timeoutNanos_ = input.readInt64(); + break; + } + } + } + } catch (akka.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.persistence.serialization.MessageFormats.internal_static_PersistentFSMSnapshot_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.persistence.serialization.MessageFormats.internal_static_PersistentFSMSnapshot_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot.class, akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot.Builder.class); + } + + public static akka.protobuf.Parser PARSER = + new akka.protobuf.AbstractParser() { + public PersistentFSMSnapshot parsePartialFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return new PersistentFSMSnapshot(input, extensionRegistry); + } + }; + + @java.lang.Override + public akka.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // required string stateIdentifier = 1; + public static final int STATEIDENTIFIER_FIELD_NUMBER = 1; + private java.lang.Object stateIdentifier_; + /** + * required string stateIdentifier = 1; + */ + public boolean hasStateIdentifier() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string stateIdentifier = 1; + */ + public java.lang.String getStateIdentifier() { + java.lang.Object ref = stateIdentifier_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + akka.protobuf.ByteString bs = + (akka.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + stateIdentifier_ = s; + } + return s; + } + } + /** + * required string stateIdentifier = 1; + */ + public akka.protobuf.ByteString + getStateIdentifierBytes() { + java.lang.Object ref = stateIdentifier_; + if (ref instanceof java.lang.String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + stateIdentifier_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + + // required .PersistentPayload data = 2; + public static final int DATA_FIELD_NUMBER = 2; + private akka.persistence.serialization.MessageFormats.PersistentPayload data_; + /** + * required .PersistentPayload data = 2; + */ + public boolean hasData() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required .PersistentPayload data = 2; + */ + public akka.persistence.serialization.MessageFormats.PersistentPayload getData() { + return data_; + } + /** + * required .PersistentPayload data = 2; + */ + public akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder getDataOrBuilder() { + return data_; + } + + // optional int64 timeoutNanos = 3; + public static final int TIMEOUTNANOS_FIELD_NUMBER = 3; + private long timeoutNanos_; + /** + * optional int64 timeoutNanos = 3; + */ + public boolean hasTimeoutNanos() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional int64 timeoutNanos = 3; + */ + public long getTimeoutNanos() { + return timeoutNanos_; + } + + private void initFields() { + stateIdentifier_ = ""; + data_ = akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance(); + timeoutNanos_ = 0L; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasStateIdentifier()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasData()) { + memoizedIsInitialized = 0; + return false; + } + if (!getData().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(akka.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getStateIdentifierBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeMessage(2, data_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeInt64(3, timeoutNanos_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += akka.protobuf.CodedOutputStream + .computeBytesSize(1, getStateIdentifierBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += akka.protobuf.CodedOutputStream + .computeMessageSize(2, data_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += akka.protobuf.CodedOutputStream + .computeInt64Size(3, timeoutNanos_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot parseFrom( + akka.protobuf.ByteString data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot parseFrom( + akka.protobuf.ByteString data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot parseFrom(byte[] data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot parseFrom( + byte[] data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot parseFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot parseDelimitedFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot parseFrom( + akka.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot parseFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code PersistentFSMSnapshot} + */ + public static final class Builder extends + akka.protobuf.GeneratedMessage.Builder + implements akka.persistence.serialization.MessageFormats.PersistentFSMSnapshotOrBuilder { + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.persistence.serialization.MessageFormats.internal_static_PersistentFSMSnapshot_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.persistence.serialization.MessageFormats.internal_static_PersistentFSMSnapshot_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot.class, akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot.Builder.class); + } + + // Construct using akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getDataFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + stateIdentifier_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + if (dataBuilder_ == null) { + data_ = akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance(); + } else { + dataBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000002); + timeoutNanos_ = 0L; + bitField0_ = (bitField0_ & ~0x00000004); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public akka.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.persistence.serialization.MessageFormats.internal_static_PersistentFSMSnapshot_descriptor; + } + + public akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot getDefaultInstanceForType() { + return akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot.getDefaultInstance(); + } + + public akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot build() { + akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot buildPartial() { + akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot result = new akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.stateIdentifier_ = stateIdentifier_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + if (dataBuilder_ == null) { + result.data_ = data_; + } else { + result.data_ = dataBuilder_.build(); + } + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.timeoutNanos_ = timeoutNanos_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(akka.protobuf.Message other) { + if (other instanceof akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot) { + return mergeFrom((akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot other) { + if (other == akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot.getDefaultInstance()) return this; + if (other.hasStateIdentifier()) { + bitField0_ |= 0x00000001; + stateIdentifier_ = other.stateIdentifier_; + onChanged(); + } + if (other.hasData()) { + mergeData(other.getData()); + } + if (other.hasTimeoutNanos()) { + setTimeoutNanos(other.getTimeoutNanos()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasStateIdentifier()) { + + return false; + } + if (!hasData()) { + + return false; + } + if (!getData().isInitialized()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (akka.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (akka.persistence.serialization.MessageFormats.PersistentFSMSnapshot) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required string stateIdentifier = 1; + private java.lang.Object stateIdentifier_ = ""; + /** + * required string stateIdentifier = 1; + */ + public boolean hasStateIdentifier() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string stateIdentifier = 1; + */ + public java.lang.String getStateIdentifier() { + java.lang.Object ref = stateIdentifier_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((akka.protobuf.ByteString) ref) + .toStringUtf8(); + stateIdentifier_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * required string stateIdentifier = 1; + */ + public akka.protobuf.ByteString + getStateIdentifierBytes() { + java.lang.Object ref = stateIdentifier_; + if (ref instanceof String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + stateIdentifier_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + /** + * required string stateIdentifier = 1; + */ + public Builder setStateIdentifier( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + stateIdentifier_ = value; + onChanged(); + return this; + } + /** + * required string stateIdentifier = 1; + */ + public Builder clearStateIdentifier() { + bitField0_ = (bitField0_ & ~0x00000001); + stateIdentifier_ = getDefaultInstance().getStateIdentifier(); + onChanged(); + return this; + } + /** + * required string stateIdentifier = 1; + */ + public Builder setStateIdentifierBytes( + akka.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + stateIdentifier_ = value; + onChanged(); + return this; + } + + // required .PersistentPayload data = 2; + private akka.persistence.serialization.MessageFormats.PersistentPayload data_ = akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance(); + private akka.protobuf.SingleFieldBuilder< + akka.persistence.serialization.MessageFormats.PersistentPayload, akka.persistence.serialization.MessageFormats.PersistentPayload.Builder, akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder> dataBuilder_; + /** + * required .PersistentPayload data = 2; + */ + public boolean hasData() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required .PersistentPayload data = 2; + */ + public akka.persistence.serialization.MessageFormats.PersistentPayload getData() { + if (dataBuilder_ == null) { + return data_; + } else { + return dataBuilder_.getMessage(); + } + } + /** + * required .PersistentPayload data = 2; + */ + public Builder setData(akka.persistence.serialization.MessageFormats.PersistentPayload value) { + if (dataBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + data_ = value; + onChanged(); + } else { + dataBuilder_.setMessage(value); + } + bitField0_ |= 0x00000002; + return this; + } + /** + * required .PersistentPayload data = 2; + */ + public Builder setData( + akka.persistence.serialization.MessageFormats.PersistentPayload.Builder builderForValue) { + if (dataBuilder_ == null) { + data_ = builderForValue.build(); + onChanged(); + } else { + dataBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000002; + return this; + } + /** + * required .PersistentPayload data = 2; + */ + public Builder mergeData(akka.persistence.serialization.MessageFormats.PersistentPayload value) { + if (dataBuilder_ == null) { + if (((bitField0_ & 0x00000002) == 0x00000002) && + data_ != akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance()) { + data_ = + akka.persistence.serialization.MessageFormats.PersistentPayload.newBuilder(data_).mergeFrom(value).buildPartial(); + } else { + data_ = value; + } + onChanged(); + } else { + dataBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000002; + return this; + } + /** + * required .PersistentPayload data = 2; + */ + public Builder clearData() { + if (dataBuilder_ == null) { + data_ = akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance(); + onChanged(); + } else { + dataBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + /** + * required .PersistentPayload data = 2; + */ + public akka.persistence.serialization.MessageFormats.PersistentPayload.Builder getDataBuilder() { + bitField0_ |= 0x00000002; + onChanged(); + return getDataFieldBuilder().getBuilder(); + } + /** + * required .PersistentPayload data = 2; + */ + public akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder getDataOrBuilder() { + if (dataBuilder_ != null) { + return dataBuilder_.getMessageOrBuilder(); + } else { + return data_; + } + } + /** + * required .PersistentPayload data = 2; + */ + private akka.protobuf.SingleFieldBuilder< + akka.persistence.serialization.MessageFormats.PersistentPayload, akka.persistence.serialization.MessageFormats.PersistentPayload.Builder, akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder> + getDataFieldBuilder() { + if (dataBuilder_ == null) { + dataBuilder_ = new akka.protobuf.SingleFieldBuilder< + akka.persistence.serialization.MessageFormats.PersistentPayload, akka.persistence.serialization.MessageFormats.PersistentPayload.Builder, akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder>( + data_, + getParentForChildren(), + isClean()); + data_ = null; + } + return dataBuilder_; + } + + // optional int64 timeoutNanos = 3; + private long timeoutNanos_ ; + /** + * optional int64 timeoutNanos = 3; + */ + public boolean hasTimeoutNanos() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional int64 timeoutNanos = 3; + */ + public long getTimeoutNanos() { + return timeoutNanos_; + } + /** + * optional int64 timeoutNanos = 3; + */ + public Builder setTimeoutNanos(long value) { + bitField0_ |= 0x00000004; + timeoutNanos_ = value; + onChanged(); + return this; + } + /** + * optional int64 timeoutNanos = 3; + */ + public Builder clearTimeoutNanos() { + bitField0_ = (bitField0_ & ~0x00000004); + timeoutNanos_ = 0L; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:PersistentFSMSnapshot) + } + + static { + defaultInstance = new PersistentFSMSnapshot(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:PersistentFSMSnapshot) + } + private static akka.protobuf.Descriptors.Descriptor internal_static_PersistentMessage_descriptor; private static @@ -4926,6 +5775,11 @@ public final class MessageFormats { private static akka.protobuf.GeneratedMessage.FieldAccessorTable internal_static_PersistentStateChangeEvent_fieldAccessorTable; + private static akka.protobuf.Descriptors.Descriptor + internal_static_PersistentFSMSnapshot_descriptor; + private static + akka.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_PersistentFSMSnapshot_fieldAccessorTable; public static akka.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -4949,10 +5803,13 @@ public final class MessageFormats { "verySnapshot.UnconfirmedDelivery\032c\n\023Unco" + "nfirmedDelivery\022\022\n\ndeliveryId\030\001 \002(\003\022\023\n\013d" + "estination\030\002 \002(\t\022#\n\007payload\030\003 \002(\0132\022.Pers" + - "istentPayload\"F\n\032PersistentStateChangeEv" + + "istentPayload\"\\\n\032PersistentStateChangeEv" + "ent\022\027\n\017stateIdentifier\030\001 \002(\t\022\017\n\007timeout\030" + - "\002 \001(\tB\"\n\036akka.persistence.serializationH" + - "\001" + "\002 \001(\t\022\024\n\014timeoutNanos\030\003 \001(\003\"h\n\025Persisten" + + "tFSMSnapshot\022\027\n\017stateIdentifier\030\001 \002(\t\022 \n" + + "\004data\030\002 \002(\0132\022.PersistentPayload\022\024\n\014timeo" + + "utNanos\030\003 \001(\003B\"\n\036akka.persistence.serial", + "izationH\001" }; akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -4994,7 +5851,13 @@ public final class MessageFormats { internal_static_PersistentStateChangeEvent_fieldAccessorTable = new akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_PersistentStateChangeEvent_descriptor, - new java.lang.String[] { "StateIdentifier", "Timeout", }); + new java.lang.String[] { "StateIdentifier", "Timeout", "TimeoutNanos", }); + internal_static_PersistentFSMSnapshot_descriptor = + getDescriptor().getMessageTypes().get(5); + internal_static_PersistentFSMSnapshot_fieldAccessorTable = new + akka.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_PersistentFSMSnapshot_descriptor, + new java.lang.String[] { "StateIdentifier", "Data", "TimeoutNanos", }); return null; } }; diff --git a/akka-persistence/src/main/protobuf/MessageFormats.proto b/akka-persistence/src/main/protobuf/MessageFormats.proto index 4c4d5d5716..ea90d1abae 100644 --- a/akka-persistence/src/main/protobuf/MessageFormats.proto +++ b/akka-persistence/src/main/protobuf/MessageFormats.proto @@ -43,5 +43,12 @@ message AtLeastOnceDeliverySnapshot { message PersistentStateChangeEvent { required string stateIdentifier = 1; - optional string timeout = 2; + optional string timeout = 2; //not used in new records from 2.4.5 + optional int64 timeoutNanos = 3; +} + +message PersistentFSMSnapshot { + required string stateIdentifier = 1; + required PersistentPayload data = 2; + optional int64 timeoutNanos = 3; } diff --git a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala index d9df80b752..c2cef062e8 100644 --- a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala +++ b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala @@ -5,9 +5,9 @@ package akka.persistence.fsm import akka.actor._ -import akka.persistence.fsm.PersistentFSM.{ FSMState } +import akka.persistence.fsm.PersistentFSM.FSMState import akka.persistence.serialization.Message -import akka.persistence.{ PersistentActor, RecoveryCompleted } +import akka.persistence.{ PersistentActor, RecoveryCompleted, SnapshotOffer } import scala.annotation.varargs import scala.collection.immutable @@ -48,6 +48,11 @@ trait PersistentFSM[S <: FSMState, D, E] extends PersistentActor with Persistent */ lazy val statesMap: Map[String, S] = stateNames.map(name ⇒ (name.identifier, name)).toMap + /** + * Timeout set for the current state. Used when saving a snapshot + */ + private var currentStateTimeout: Option[FiniteDuration] = None + /** * Override this handler to define the action on Domain Event * @@ -62,6 +67,13 @@ trait PersistentFSM[S <: FSMState, D, E] extends PersistentActor with Persistent */ def onRecoveryCompleted(): Unit = {} + /** + * Save the current state as a snapshot + */ + final def saveStateSnapshot(): Unit = { + saveSnapshot(PersistentFSMSnapshot(stateName.identifier, stateData, currentStateTimeout)) + } + /** * After recovery events are handled as in usual FSM actor */ @@ -75,6 +87,7 @@ trait PersistentFSM[S <: FSMState, D, E] extends PersistentActor with Persistent override def receiveRecover: Receive = { case domainEventTag(event) ⇒ startWith(stateName, applyEvent(event, stateData)) case StateChangeEvent(stateIdentifier, timeout) ⇒ startWith(statesMap(stateIdentifier), stateData, timeout) + case SnapshotOffer(_, PersistentFSMSnapshot(stateIdentifier, data: D, timeout)) => startWith(statesMap(stateIdentifier), data, timeout) case RecoveryCompleted ⇒ initialize() onRecoveryCompleted() @@ -103,6 +116,7 @@ trait PersistentFSM[S <: FSMState, D, E] extends PersistentActor with Persistent handlersExecutedCounter += 1 if (handlersExecutedCounter == eventsToPersist.size) { super.applyState(nextState using nextData) + currentStateTimeout = nextState.timeout nextState.afterTransitionDo(stateData) } } @@ -132,6 +146,16 @@ object PersistentFSM { */ private[persistence] case class StateChangeEvent(stateIdentifier: String, timeout: Option[FiniteDuration]) extends PersistentFsmEvent + /** + * FSM state and data snapshot + * + * @param stateIdentifier FSM state identifier + * @param data FSM state data + * @param timeout FSM state timeout + * @tparam D state data type + */ + private[persistence] case class PersistentFSMSnapshot[D](stateIdentifier: String, data: D, timeout: Option[FiniteDuration]) extends Message + /** * FSMState base trait, makes possible for simple default serialization by conversion to String */ diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index 8f64af969f..ba438f7d74 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -7,7 +7,7 @@ package akka.persistence.serialization import akka.actor.{ ActorPath, ExtendedActorSystem } import akka.persistence.AtLeastOnceDelivery._ import akka.persistence._ -import akka.persistence.fsm.PersistentFSM.StateChangeEvent +import akka.persistence.fsm.PersistentFSM.{ PersistentFSMSnapshot, StateChangeEvent } import akka.persistence.serialization.{ MessageFormats ⇒ mf } import akka.serialization._ import akka.protobuf._ @@ -33,6 +33,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer val PersistentImplClass = classOf[PersistentImpl] val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnapshot] val PersistentStateChangeEventClass = classOf[StateChangeEvent] + val PersistentFSMSnapshotClass = classOf[PersistentFSMSnapshot[Any]] private lazy val serialization = SerializationExtension(system) @@ -53,6 +54,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer case a: AtomicWrite ⇒ atomicWriteBuilder(a).build().toByteArray case a: AtLeastOnceDeliverySnapshot ⇒ atLeastOnceDeliverySnapshotBuilder(a).build.toByteArray case s: StateChangeEvent ⇒ stateChangeBuilder(s).build.toByteArray + case p: PersistentFSMSnapshot[Any] ⇒ persistentFSMSnapshotBuilder(p).build.toByteArray case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}") } @@ -68,6 +70,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer case AtomicWriteClass ⇒ atomicWrite(mf.AtomicWrite.parseFrom(bytes)) case AtLeastOnceDeliverySnapshotClass ⇒ atLeastOnceDeliverySnapshot(mf.AtLeastOnceDeliverySnapshot.parseFrom(bytes)) case PersistentStateChangeEventClass ⇒ stateChange(mf.PersistentStateChangeEvent.parseFrom(bytes)) + case PersistentFSMSnapshotClass ⇒ persistentFSMSnapshot(mf.PersistentFSMSnapshot.parseFrom(bytes)) case _ ⇒ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}") } } @@ -94,7 +97,17 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer val builder = mf.PersistentStateChangeEvent.newBuilder.setStateIdentifier(stateChange.stateIdentifier) stateChange.timeout match { case None ⇒ builder - case Some(timeout) ⇒ builder.setTimeout(timeout.toString()) + case Some(timeout) ⇒ builder.setTimeoutNanos(timeout.toNanos) + } + } + + private[persistence] def persistentFSMSnapshotBuilder(persistentFSMSnapshot: PersistentFSMSnapshot[Any]): mf.PersistentFSMSnapshot.Builder = { + val builder = mf.PersistentFSMSnapshot.newBuilder + .setStateIdentifier(persistentFSMSnapshot.stateIdentifier) + .setData(persistentPayloadBuilder(persistentFSMSnapshot.data.asInstanceOf[AnyRef])) + persistentFSMSnapshot.timeout match { + case None ⇒ builder + case Some(timeout) ⇒ builder.setTimeoutNanos(timeout.toNanos) } } @@ -114,7 +127,17 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer def stateChange(persistentStateChange: mf.PersistentStateChangeEvent): StateChangeEvent = { StateChangeEvent( persistentStateChange.getStateIdentifier, - if (persistentStateChange.hasTimeout) Some(Duration(persistentStateChange.getTimeout).asInstanceOf[duration.FiniteDuration]) else None) + // timeout field is deprecated, left for backward compatibility. timeoutNanos is used instead. + if (persistentStateChange.hasTimeoutNanos) Some(Duration.fromNanos(persistentStateChange.getTimeoutNanos)) + else if (persistentStateChange.hasTimeout) Some(Duration(persistentStateChange.getTimeout).asInstanceOf[duration.FiniteDuration]) + else None) + } + + def persistentFSMSnapshot(persistentFSMSnapshot: mf.PersistentFSMSnapshot): PersistentFSMSnapshot[Any] = { + PersistentFSMSnapshot( + persistentFSMSnapshot.getStateIdentifier, + payload(persistentFSMSnapshot.getData), + if (persistentFSMSnapshot.hasTimeoutNanos) Some(Duration.fromNanos(persistentFSMSnapshot.getTimeoutNanos)) else None) } private def atomicWriteBuilder(a: AtomicWrite) = { diff --git a/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java b/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java index bac149393d..adeab19ddc 100644 --- a/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java +++ b/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java @@ -532,17 +532,25 @@ public class AbstractPersistentFSMTest extends JUnitSuite { stay().applying(new ItemAdded(event.getItem())) .forMax(Duration.create(1, TimeUnit.SECONDS))) .event(Buy.class, + //#customer-andthen-example (event, data) -> goTo(UserState.PAID).applying(OrderExecuted.INSTANCE) - .andThen(exec(cart -> - reportActor.tell(new PurchaseWasMade(cart.getItems()), self())) - )) + .andThen(exec(cart -> { + reportActor.tell(new PurchaseWasMade(cart.getItems()), self()); + //#customer-andthen-example + saveStateSnapshot(); + //#customer-andthen-example + }))) + //#customer-andthen-example .event(Leave.class, + //#customer-snapshot-example (event, data) -> stop().applying(OrderDiscarded.INSTANCE) - .andThen(exec(cart -> - reportActor.tell(ShoppingCardDiscarded.INSTANCE, self()) - ))) + .andThen(exec(cart -> { + reportActor.tell(ShoppingCardDiscarded.INSTANCE, self()); + saveStateSnapshot(); + }))) + //#customer-snapshot-example .event(GetCurrentCart.class, (event, data) -> stay().replying(data)) .event(StateTimeout$.class, (event, data) -> diff --git a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala index a717ccadc1..3234d0744a 100644 --- a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala @@ -5,7 +5,7 @@ package akka.persistence.fsm import akka.actor._ -import akka.persistence.{ PersistentActor, RecoveryCompleted, PersistenceSpec } +import akka.persistence._ import akka.persistence.fsm.PersistentFSM._ import akka.testkit._ import com.typesafe.config.Config @@ -248,9 +248,6 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) fsmRef ! GetCurrentCart fsmRef ! AddItem(coat) fsmRef ! GetCurrentCart - fsmRef ! Buy - fsmRef ! GetCurrentCart - fsmRef ! Leave expectMsg(EmptyShoppingCart) @@ -258,8 +255,7 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) expectMsg(NonEmptyShoppingCart(List(shirt, shoes))) expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat))) - expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat))) - + fsmRef ! PoisonPill expectTerminated(fsmRef) val persistentEventsStreamer = system.actorOf(PersistentEventsStreamer.props(persistenceId, testActor)) @@ -273,15 +269,67 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) expectMsg(ItemAdded(Item("3", "Coat", 119.99F))) expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted - expectMsg(OrderExecuted) - expectMsgType[StateChangeEvent] + watch(persistentEventsStreamer) + persistentEventsStreamer ! PoisonPill + expectTerminated(persistentEventsStreamer) + } + + "persist snapshot" in { + val persistenceId = name + + val fsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, dummyReportActorRef)) + watch(fsmRef) + + val shirt = Item("1", "Shirt", 59.99F) + val shoes = Item("2", "Shoes", 89.99F) + val coat = Item("3", "Coat", 119.99F) + + fsmRef ! GetCurrentCart + fsmRef ! AddItem(shirt) + fsmRef ! GetCurrentCart + fsmRef ! AddItem(shoes) + fsmRef ! GetCurrentCart + fsmRef ! AddItem(coat) + fsmRef ! GetCurrentCart + fsmRef ! Buy + fsmRef ! GetCurrentCart + + expectMsg(EmptyShoppingCart) + + expectMsg(NonEmptyShoppingCart(List(shirt))) + expectMsg(NonEmptyShoppingCart(List(shirt, shoes))) + expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat))) + + expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat))) + expectNoMsg(1 second) + + fsmRef ! PoisonPill + expectTerminated(fsmRef) + + //Check that PersistentFSM recovers in the correct state + val recoveredFsmRef = system.actorOf(WebStoreCustomerFSM.props(persistenceId, dummyReportActorRef)) + recoveredFsmRef ! GetCurrentCart + expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat))) + + watch(recoveredFsmRef) + recoveredFsmRef ! PoisonPill + expectTerminated(recoveredFsmRef) + + //Check that PersistentFSM uses snapshot during recovery + val persistentEventsStreamer = system.actorOf(PersistentEventsStreamer.props(persistenceId, testActor)) + + expectMsgPF() { + case SnapshotOffer(SnapshotMetadata(name, _, timestamp), PersistentFSMSnapshot(stateIdentifier, cart, None)) ⇒ + stateIdentifier should ===(Paid.identifier) + cart should ===(NonEmptyShoppingCart(List(shirt, shoes, coat))) + timestamp should be > 0L + } watch(persistentEventsStreamer) persistentEventsStreamer ! PoisonPill expectTerminated(persistentEventsStreamer) } } - } object PersistentFSMSpec { @@ -379,14 +427,24 @@ object PersistentFSMSpec { case Event(AddItem(item), _) ⇒ stay applying ItemAdded(item) forMax (1 seconds) case Event(Buy, _) ⇒ + //#customer-andthen-example goto(Paid) applying OrderExecuted andThen { - case NonEmptyShoppingCart(items) ⇒ reportActor ! PurchaseWasMade(items) - case EmptyShoppingCart ⇒ // do nothing... + case NonEmptyShoppingCart(items) ⇒ + reportActor ! PurchaseWasMade(items) + //#customer-andthen-example + saveStateSnapshot() + case EmptyShoppingCart ⇒ saveStateSnapshot() + //#customer-andthen-example } + //#customer-andthen-example case Event(Leave, _) ⇒ + //#customer-snapshot-example stop applying OrderDiscarded andThen { - case _ ⇒ reportActor ! ShoppingCardDiscarded + case _ ⇒ + reportActor ! ShoppingCardDiscarded + saveStateSnapshot() } + //#customer-snapshot-example case Event(GetCurrentCart, data) ⇒ stay replying data case Event(StateTimeout, _) ⇒ diff --git a/project/MiMa.scala b/project/MiMa.scala index 083620d8d1..077be3133b 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -780,7 +780,14 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.http.impl.engine.parsing.HttpMessageParser.stage"), // #20131 - flow combinator - ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.backpressureTimeout") + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.backpressureTimeout"), + + // #20257 Snapshots with PersistentFSM (experimental feature) + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.serialization.MessageFormats#PersistentStateChangeEventOrBuilder.getTimeoutNanos"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.serialization.MessageFormats#PersistentStateChangeEventOrBuilder.hasTimeoutNanos"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.fsm.PersistentFSM.saveStateSnapshot"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.fsm.PersistentFSM.akka$persistence$fsm$PersistentFSM$$currentStateTimeout"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.fsm.PersistentFSM.akka$persistence$fsm$PersistentFSM$$currentStateTimeout_=") ) ) }