diff --git a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java
index ced5725d11..227f148965 100644
--- a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java
+++ b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java
@@ -14765,6 +14765,529 @@ public final class ReplicatorMessages {
// @@protoc_insertion_point(class_scope:akka.cluster.ddata.StringGSet)
}
+ public interface DurableDataEnvelopeOrBuilder
+ extends akka.protobuf.MessageOrBuilder {
+
+ // required .akka.cluster.ddata.OtherMessage data = 1;
+ /**
+ * required .akka.cluster.ddata.OtherMessage data = 1;
+ */
+ boolean hasData();
+ /**
+ * required .akka.cluster.ddata.OtherMessage data = 1;
+ */
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage getData();
+ /**
+ * required .akka.cluster.ddata.OtherMessage data = 1;
+ */
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder getDataOrBuilder();
+ }
+ /**
+ * Protobuf type {@code akka.cluster.ddata.DurableDataEnvelope}
+ */
+ public static final class DurableDataEnvelope extends
+ akka.protobuf.GeneratedMessage
+ implements DurableDataEnvelopeOrBuilder {
+ // Use DurableDataEnvelope.newBuilder() to construct.
+ private DurableDataEnvelope(akka.protobuf.GeneratedMessage.Builder> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private DurableDataEnvelope(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final DurableDataEnvelope defaultInstance;
+ public static DurableDataEnvelope getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public DurableDataEnvelope getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final akka.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final akka.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private DurableDataEnvelope(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ initFields();
+ int mutable_bitField0_ = 0;
+ akka.protobuf.UnknownFieldSet.Builder unknownFields =
+ akka.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ case 10: {
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.Builder subBuilder = null;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ subBuilder = data_.toBuilder();
+ }
+ data_ = input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.PARSER, extensionRegistry);
+ if (subBuilder != null) {
+ subBuilder.mergeFrom(data_);
+ data_ = subBuilder.buildPartial();
+ }
+ bitField0_ |= 0x00000001;
+ break;
+ }
+ }
+ }
+ } catch (akka.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new akka.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final akka.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DurableDataEnvelope_descriptor;
+ }
+
+ protected akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DurableDataEnvelope_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope.class, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope.Builder.class);
+ }
+
+ public static akka.protobuf.Parser PARSER =
+ new akka.protobuf.AbstractParser() {
+ public DurableDataEnvelope parsePartialFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return new DurableDataEnvelope(input, extensionRegistry);
+ }
+ };
+
+ @java.lang.Override
+ public akka.protobuf.Parser getParserForType() {
+ return PARSER;
+ }
+
+ private int bitField0_;
+ // required .akka.cluster.ddata.OtherMessage data = 1;
+ public static final int DATA_FIELD_NUMBER = 1;
+ private akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage data_;
+ /**
+ * required .akka.cluster.ddata.OtherMessage data = 1;
+ */
+ public boolean hasData() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * required .akka.cluster.ddata.OtherMessage data = 1;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage getData() {
+ return data_;
+ }
+ /**
+ * required .akka.cluster.ddata.OtherMessage data = 1;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder getDataOrBuilder() {
+ return data_;
+ }
+
+ private void initFields() {
+ data_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.getDefaultInstance();
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasData()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!getData().isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(akka.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeMessage(1, data_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += akka.protobuf.CodedOutputStream
+ .computeMessageSize(1, data_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseFrom(
+ akka.protobuf.ByteString data)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseFrom(
+ akka.protobuf.ByteString data,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseFrom(byte[] data)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseFrom(
+ byte[] data,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseFrom(
+ java.io.InputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseDelimitedFrom(
+ java.io.InputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseFrom(
+ akka.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ akka.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code akka.cluster.ddata.DurableDataEnvelope}
+ */
+ public static final class Builder extends
+ akka.protobuf.GeneratedMessage.Builder
+ implements akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelopeOrBuilder {
+ public static final akka.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DurableDataEnvelope_descriptor;
+ }
+
+ protected akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DurableDataEnvelope_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope.class, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope.Builder.class);
+ }
+
+ // Construct using akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ akka.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ getDataFieldBuilder();
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ if (dataBuilder_ == null) {
+ data_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.getDefaultInstance();
+ } else {
+ dataBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000001);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public akka.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DurableDataEnvelope_descriptor;
+ }
+
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope getDefaultInstanceForType() {
+ return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope.getDefaultInstance();
+ }
+
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope build() {
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope buildPartial() {
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope result = new akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ if (dataBuilder_ == null) {
+ result.data_ = data_;
+ } else {
+ result.data_ = dataBuilder_.build();
+ }
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(akka.protobuf.Message other) {
+ if (other instanceof akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope) {
+ return mergeFrom((akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope other) {
+ if (other == akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope.getDefaultInstance()) return this;
+ if (other.hasData()) {
+ mergeData(other.getData());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasData()) {
+
+ return false;
+ }
+ if (!getData().isInitialized()) {
+
+ return false;
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (akka.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // required .akka.cluster.ddata.OtherMessage data = 1;
+ private akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage data_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.getDefaultInstance();
+ private akka.protobuf.SingleFieldBuilder<
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder> dataBuilder_;
+ /**
+ * required .akka.cluster.ddata.OtherMessage data = 1;
+ */
+ public boolean hasData() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * required .akka.cluster.ddata.OtherMessage data = 1;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage getData() {
+ if (dataBuilder_ == null) {
+ return data_;
+ } else {
+ return dataBuilder_.getMessage();
+ }
+ }
+ /**
+ * required .akka.cluster.ddata.OtherMessage data = 1;
+ */
+ public Builder setData(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage value) {
+ if (dataBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ data_ = value;
+ onChanged();
+ } else {
+ dataBuilder_.setMessage(value);
+ }
+ bitField0_ |= 0x00000001;
+ return this;
+ }
+ /**
+ * required .akka.cluster.ddata.OtherMessage data = 1;
+ */
+ public Builder setData(
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.Builder builderForValue) {
+ if (dataBuilder_ == null) {
+ data_ = builderForValue.build();
+ onChanged();
+ } else {
+ dataBuilder_.setMessage(builderForValue.build());
+ }
+ bitField0_ |= 0x00000001;
+ return this;
+ }
+ /**
+ * required .akka.cluster.ddata.OtherMessage data = 1;
+ */
+ public Builder mergeData(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage value) {
+ if (dataBuilder_ == null) {
+ if (((bitField0_ & 0x00000001) == 0x00000001) &&
+ data_ != akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.getDefaultInstance()) {
+ data_ =
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.newBuilder(data_).mergeFrom(value).buildPartial();
+ } else {
+ data_ = value;
+ }
+ onChanged();
+ } else {
+ dataBuilder_.mergeFrom(value);
+ }
+ bitField0_ |= 0x00000001;
+ return this;
+ }
+ /**
+ * required .akka.cluster.ddata.OtherMessage data = 1;
+ */
+ public Builder clearData() {
+ if (dataBuilder_ == null) {
+ data_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.getDefaultInstance();
+ onChanged();
+ } else {
+ dataBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000001);
+ return this;
+ }
+ /**
+ * required .akka.cluster.ddata.OtherMessage data = 1;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.Builder getDataBuilder() {
+ bitField0_ |= 0x00000001;
+ onChanged();
+ return getDataFieldBuilder().getBuilder();
+ }
+ /**
+ * required .akka.cluster.ddata.OtherMessage data = 1;
+ */
+ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder getDataOrBuilder() {
+ if (dataBuilder_ != null) {
+ return dataBuilder_.getMessageOrBuilder();
+ } else {
+ return data_;
+ }
+ }
+ /**
+ * required .akka.cluster.ddata.OtherMessage data = 1;
+ */
+ private akka.protobuf.SingleFieldBuilder<
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder>
+ getDataFieldBuilder() {
+ if (dataBuilder_ == null) {
+ dataBuilder_ = new akka.protobuf.SingleFieldBuilder<
+ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder>(
+ data_,
+ getParentForChildren(),
+ isClean());
+ data_ = null;
+ }
+ return dataBuilder_;
+ }
+
+ // @@protoc_insertion_point(builder_scope:akka.cluster.ddata.DurableDataEnvelope)
+ }
+
+ static {
+ defaultInstance = new DurableDataEnvelope(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:akka.cluster.ddata.DurableDataEnvelope)
+ }
+
private static akka.protobuf.Descriptors.Descriptor
internal_static_akka_cluster_ddata_Get_descriptor;
private static
@@ -14870,6 +15393,11 @@ public final class ReplicatorMessages {
private static
akka.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_akka_cluster_ddata_StringGSet_fieldAccessorTable;
+ private static akka.protobuf.Descriptors.Descriptor
+ internal_static_akka_cluster_ddata_DurableDataEnvelope_descriptor;
+ private static
+ akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_akka_cluster_ddata_DurableDataEnvelope_fieldAccessorTable;
public static akka.protobuf.Descriptors.FileDescriptor
getDescriptor() {
@@ -14925,8 +15453,10 @@ public final class ReplicatorMessages {
"me\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"V\n\014OtherMessage\022\027" +
"\n\017enclosedMessage\030\001 \002(\014\022\024\n\014serializerId\030" +
"\002 \002(\005\022\027\n\017messageManifest\030\004 \001(\014\"\036\n\nString" +
- "GSet\022\020\n\010elements\030\001 \003(\tB#\n\037akka.cluster.d" +
- "data.protobuf.msgH\001"
+ "GSet\022\020\n\010elements\030\001 \003(\t\"E\n\023DurableDataEnv" +
+ "elope\022.\n\004data\030\001 \002(\0132 .akka.cluster.ddata" +
+ ".OtherMessageB#\n\037akka.cluster.ddata.prot" +
+ "obuf.msgH\001"
};
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -15059,6 +15589,12 @@ public final class ReplicatorMessages {
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_cluster_ddata_StringGSet_descriptor,
new java.lang.String[] { "Elements", });
+ internal_static_akka_cluster_ddata_DurableDataEnvelope_descriptor =
+ getDescriptor().getMessageTypes().get(18);
+ internal_static_akka_cluster_ddata_DurableDataEnvelope_fieldAccessorTable = new
+ akka.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_akka_cluster_ddata_DurableDataEnvelope_descriptor,
+ new java.lang.String[] { "Data", });
return null;
}
};
diff --git a/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto b/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto
index 9d3a93b68b..3cd82fac62 100644
--- a/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto
+++ b/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto
@@ -117,4 +117,6 @@ message StringGSet {
repeated string elements = 1;
}
-
+message DurableDataEnvelope {
+ required OtherMessage data = 1;
+}
diff --git a/akka-distributed-data/src/main/resources/reference.conf b/akka-distributed-data/src/main/resources/reference.conf
index e0f59a79ab..5061039e87 100644
--- a/akka-distributed-data/src/main/resources/reference.conf
+++ b/akka-distributed-data/src/main/resources/reference.conf
@@ -46,6 +46,48 @@ akka.cluster.distributed-data {
# after this duration.
serializer-cache-time-to-live = 10s
+ durable {
+ # List of keys that are durable. Prefix matching is supported by using * at the
+ # end of a key.
+ keys = []
+
+ # Fully qualified class name of the durable store actor. It must be a subclass
+ # of akka.actor.Actor and handle the protocol defined in
+ # akka.cluster.ddata.DurableStore. The class must have a constructor with
+ # com.typesafe.config.Config parameter.
+ store-actor-class = akka.cluster.ddata.LmdbDurableStore
+
+ use-dispatcher = akka.cluster.distributed-data.durable.pinned-store
+
+ pinned-store {
+ executor = thread-pool-executor
+ type = PinnedDispatcher
+ }
+
+ # Config for the LmdbDurableStore
+ lmdb {
+ # Directory of LMDB file. There are two options:
+ # 1. A relative or absolute path to a directory that ends with 'ddata'
+ # the full name of the directory will contain name of the ActorSystem
+ # and its remote port.
+ # 2. Otherwise the path is used as is, as a relative or absolute path to
+ # a directory.
+ dir = "ddata"
+
+ # Size in bytes of the memory mapped file.
+ map-size = 100 MiB
+
+ # Accumulate changes before storing improves performance with the
+ # risk of losing the last writes if the JVM crashes.
+ # The interval is by default set to 'off' to write each update immediately.
+ # Enabling write behind by specifying a duration, e.g. 200ms, is especially
+ # efficient when performing many writes to the same key, because it is only
+ # the last value for each key that will be serialized and stored.
+ # write-behind-interval = 200 ms
+ write-behind-interval = off
+ }
+ }
+
}
#//#distributed-data
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DurableStore.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DurableStore.scala
new file mode 100644
index 0000000000..6bdcec14cf
--- /dev/null
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DurableStore.scala
@@ -0,0 +1,264 @@
+/**
+ * Copyright (C) 2016 Lightbend Inc.
+ */
+package akka.cluster.ddata
+
+import java.io.File
+import java.nio.ByteBuffer
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.util.Try
+import scala.util.control.NonFatal
+
+import akka.actor.Actor
+import akka.actor.ActorLogging
+import akka.actor.ActorRef
+import akka.actor.DeadLetterSuppression
+import akka.actor.Props
+import akka.cluster.Cluster
+import akka.cluster.ddata.Replicator.ReplicatorMessage
+import akka.io.DirectByteBufferPool
+import akka.serialization.SerializationExtension
+import akka.serialization.SerializerWithStringManifest
+import akka.util.ByteString
+import akka.util.OptionVal
+import com.typesafe.config.Config
+import org.lmdbjava.DbiFlags
+import org.lmdbjava.Env
+import org.lmdbjava.EnvFlags
+import org.lmdbjava.Txn
+
+/**
+ * An actor implementing the durable store for the Distributed Data `Replicator`
+ * has to implement the protocol with the messages defined here.
+ *
+ * At startup the `Replicator` creates the durable store actor and sends the
+ * `Load` message to it. It must then reply with 0 or more `LoadData` messages
+ * followed by one `LoadAllCompleted` message to the `sender` (the `Replicator`).
+ *
+ * If the `LoadAll` fails it can throw `LoadFailed` and the `Replicator` supervisor
+ * will stop itself and the durable store.
+ *
+ * When the `Replicator` needs to store a value it sends a `Store` message
+ * to the durable store actor, which must then reply with the `successMsg` or
+ * `failureMsg` to the `replyTo`.
+ */
+object DurableStore {
+
+ /**
+ * Request to store an entry. It optionally contains a `StoreReply`, which
+ * should be used to signal success or failure of the operation to the contained
+ * `replyTo` actor.
+ */
+ final case class Store(key: String, data: ReplicatedData, reply: Option[StoreReply])
+ final case class StoreReply(successMsg: Any, failureMsg: Any, replyTo: ActorRef)
+
+ /**
+ * Request to load all entries.
+ *
+ * It must reply with 0 or more `LoadData` messages
+ * followed by one `LoadAllCompleted` message to the `sender` (the `Replicator`).
+ *
+ * If the `LoadAll` fails it can throw `LoadFailed` and the `Replicator` supervisor
+ * will stop itself and the durable store.
+ */
+ case object LoadAll
+ final case class LoadData(data: Map[String, ReplicatedData])
+ case object LoadAllCompleted
+ class LoadFailed(message: String, cause: Throwable) extends RuntimeException(message) {
+ def this(message: String) = this(message, null)
+ }
+
+ /**
+ * Wrapper class for serialization of a data value.
+ * The `ReplicatorMessageSerializer` will serialize/deserialize
+ * the wrapped `ReplicatedData` including its serializerId and
+ * manifest.
+ */
+ final class DurableDataEnvelope(val data: ReplicatedData) extends ReplicatorMessage {
+ override def toString(): String = s"DurableDataEnvelope($data)"
+ override def hashCode(): Int = data.hashCode
+ override def equals(o: Any): Boolean = o match {
+ case other: DurableDataEnvelope ⇒ data == other.data
+ case _ ⇒ false
+ }
+ }
+}
+
+object LmdbDurableStore {
+ def props(config: Config): Props =
+ Props(new LmdbDurableStore(config))
+
+ private case object WriteBehind extends DeadLetterSuppression
+}
+
+final class LmdbDurableStore(config: Config) extends Actor with ActorLogging {
+ import DurableStore._
+ import LmdbDurableStore.WriteBehind
+
+ val serialization = SerializationExtension(context.system)
+ val serializer = serialization.serializerFor(classOf[DurableDataEnvelope]).asInstanceOf[SerializerWithStringManifest]
+ val manifest = serializer.manifest(new DurableDataEnvelope(Replicator.Internal.DeletedData))
+
+ val writeBehindInterval = config.getString("lmdb.write-behind-interval").toLowerCase match {
+ case "off" ⇒ Duration.Zero
+ case _ ⇒ config.getDuration("lmdb.write-behind-interval", MILLISECONDS).millis
+ }
+
+ val env: Env[ByteBuffer] = {
+ val mapSize = config.getBytes("lmdb.map-size")
+ val dir = config.getString("lmdb.dir") match {
+ case path if path.endsWith("ddata") ⇒
+ new File(s"$path-${context.system.name}-${self.path.parent.name}-${Cluster(context.system).selfAddress.port.get}")
+ case path ⇒
+ new File(path)
+ }
+ dir.mkdirs()
+ Env.create()
+ .setMapSize(mapSize)
+ .setMaxDbs(1)
+ .open(dir, EnvFlags.MDB_NOLOCK)
+ }
+
+ val db = env.openDbi("ddata", DbiFlags.MDB_CREATE)
+
+ val keyBuffer = ByteBuffer.allocateDirect(env.getMaxKeySize)
+ var valueBuffer = ByteBuffer.allocateDirect(100 * 1024) // will grow when needed
+
+ def ensureValueBufferSize(size: Int): Unit = {
+ if (valueBuffer.remaining < size) {
+ DirectByteBufferPool.tryCleanDirectByteBuffer(valueBuffer)
+ valueBuffer = ByteBuffer.allocateDirect(size * 2)
+ }
+ }
+
+ // pending write behind
+ val pending = new java.util.HashMap[String, ReplicatedData]
+
+ override def postRestart(reason: Throwable): Unit = {
+ super.postRestart(reason)
+ // Load is only done on first start, not on restart
+ context.become(active)
+ }
+
+ override def postStop(): Unit = {
+ super.postStop()
+ writeBehind()
+ Try(db.close())
+ Try(env.close())
+ DirectByteBufferPool.tryCleanDirectByteBuffer(keyBuffer)
+ DirectByteBufferPool.tryCleanDirectByteBuffer(valueBuffer)
+ }
+
+ def receive = init
+
+ def init: Receive = {
+ case LoadAll ⇒
+ val t0 = System.nanoTime()
+ val tx = env.txnRead()
+ try {
+ val iter = db.iterate(tx)
+ try {
+ var n = 0
+ val loadData = LoadData(iter.asScala.map { entry ⇒
+ n += 1
+ val keyArray = Array.ofDim[Byte](entry.key.remaining)
+ entry.key.get(keyArray)
+ val key = new String(keyArray, ByteString.UTF_8)
+ val valArray = Array.ofDim[Byte](entry.`val`.remaining)
+ entry.`val`.get(valArray)
+ val envelope = serializer.fromBinary(valArray, manifest).asInstanceOf[DurableDataEnvelope]
+ key → envelope.data
+ }.toMap)
+ if (loadData.data.nonEmpty)
+ sender() ! loadData
+ sender() ! LoadAllCompleted
+ if (log.isDebugEnabled)
+ log.debug("load all of [{}] entries took [{} ms]", n,
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime - t0))
+ context.become(active)
+ } finally {
+ Try(iter.close())
+ }
+ } catch {
+ case NonFatal(e) ⇒
+ throw new LoadFailed("failed to load durable distributed-data", e)
+ } finally {
+ Try(tx.close())
+ }
+ }
+
+ def active: Receive = {
+ case Store(key, data, reply) ⇒
+ try {
+ if (writeBehindInterval.length == 0) {
+ dbPut(OptionVal.None, key, data)
+ } else {
+ if (pending.isEmpty)
+ context.system.scheduler.scheduleOnce(writeBehindInterval, self, WriteBehind)(context.system.dispatcher)
+ pending.put(key, data)
+ }
+ reply match {
+ case Some(StoreReply(successMsg, _, replyTo)) ⇒
+ replyTo ! successMsg
+ case None ⇒
+ }
+ } catch {
+ case NonFatal(e) ⇒
+ log.error(e, "failed to store [{}]", key)
+ reply match {
+ case Some(StoreReply(_, failureMsg, replyTo)) ⇒
+ replyTo ! failureMsg
+ case None ⇒
+ }
+ }
+
+ case WriteBehind ⇒
+ writeBehind()
+ }
+
+ def dbPut(tx: OptionVal[Txn[ByteBuffer]], key: String, data: ReplicatedData): Unit = {
+ try {
+ keyBuffer.put(key.getBytes(ByteString.UTF_8)).flip()
+ val value = serializer.toBinary(new DurableDataEnvelope(data))
+ ensureValueBufferSize(value.length)
+ valueBuffer.put(value).flip()
+ tx match {
+ case OptionVal.None ⇒ db.put(keyBuffer, valueBuffer)
+ case OptionVal.Some(t) ⇒ db.put(t, keyBuffer, valueBuffer)
+ }
+ } finally {
+ keyBuffer.clear()
+ valueBuffer.clear()
+ }
+ }
+
+ def writeBehind(): Unit = {
+ if (!pending.isEmpty()) {
+ val t0 = System.nanoTime()
+ val tx = env.txnWrite()
+ try {
+ val iter = pending.entrySet.iterator
+ while (iter.hasNext) {
+ val entry = iter.next()
+ dbPut(OptionVal.Some(tx), entry.getKey, entry.getValue)
+ }
+ tx.commit()
+ if (log.isDebugEnabled)
+ log.debug("store and commit of [{}] entries took [{} ms]", pending.size,
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime - t0))
+ } catch {
+ case NonFatal(e) ⇒
+ import scala.collection.JavaConverters._
+ log.error(e, "failed to store [{}]", pending.keySet.asScala.mkString(","))
+ tx.abort()
+ } finally {
+ pending.clear()
+ }
+ }
+ }
+
+}
+
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala
index b94cf3eedc..62ec07a46d 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala
@@ -24,6 +24,7 @@ private[akka] final case class PruningState(owner: UniqueAddress, phase: Pruning
def merge(that: PruningState): PruningState =
(this.phase, that.phase) match {
+ // FIXME this will add the PruningPerformed back again when one is None
case (PruningPerformed, _) ⇒ this
case (_, PruningPerformed) ⇒ that
case (PruningInitialized(thisSeen), PruningInitialized(thatSeen)) ⇒
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
index 74a4d43553..5490e17f96 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
@@ -4,7 +4,9 @@
package akka.cluster.ddata
import java.security.MessageDigest
+import scala.annotation.tailrec
import scala.collection.immutable
+import scala.collection.immutable.Queue
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration
@@ -37,6 +39,11 @@ import akka.dispatch.Dispatchers
import akka.actor.DeadLetterSuppression
import akka.cluster.ddata.Key.KeyR
import java.util.Optional
+import akka.cluster.ddata.DurableStore._
+import akka.actor.ExtendedActorSystem
+import akka.actor.SupervisorStrategy
+import akka.actor.OneForOneStrategy
+import akka.actor.ActorInitializationException
object ReplicatorSettings {
@@ -56,6 +63,8 @@ object ReplicatorSettings {
case "" ⇒ Dispatchers.DefaultDispatcherId
case id ⇒ id
}
+
+ import scala.collection.JavaConverters._
new ReplicatorSettings(
role = roleOption(config.getString("role")),
gossipInterval = config.getDuration("gossip-interval", MILLISECONDS).millis,
@@ -63,7 +72,9 @@ object ReplicatorSettings {
maxDeltaElements = config.getInt("max-delta-elements"),
dispatcher = dispatcher,
pruningInterval = config.getDuration("pruning-interval", MILLISECONDS).millis,
- maxPruningDissemination = config.getDuration("max-pruning-dissemination", MILLISECONDS).millis)
+ maxPruningDissemination = config.getDuration("max-pruning-dissemination", MILLISECONDS).millis,
+ durableStoreProps = Left((config.getString("durable.store-actor-class"), config.getConfig("durable"))),
+ durableKeys = config.getStringList("durable.keys").asScala.toSet)
}
/**
@@ -91,6 +102,13 @@ object ReplicatorSettings {
* completing the pruning process of data associated with removed cluster nodes.
* The time measurement is stopped when any replica is unreachable, so it should
* be configured to worst case in a healthy cluster.
+ * @param durableStoreProps Props for the durable store actor,
+ * the `Left` alternative is a tuple of fully qualified actor class name and
+ * the config constructor parameter of that class,
+ * the `Right` alternative is the `Props` of the actor.
+ * @param durableKeys Keys that are durable. Prefix matching is supported by using
+ * `*` at the end of a key. All entries can be made durable by including "*"
+ * in the `Set`.
*/
final class ReplicatorSettings(
val role: Option[String],
@@ -99,7 +117,15 @@ final class ReplicatorSettings(
val maxDeltaElements: Int,
val dispatcher: String,
val pruningInterval: FiniteDuration,
- val maxPruningDissemination: FiniteDuration) {
+ val maxPruningDissemination: FiniteDuration,
+ val durableStoreProps: Either[(String, Config), Props],
+ val durableKeys: Set[String]) {
+
+ // For backwards compatibility
+ def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration,
+ maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration) =
+ this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
+ maxPruningDissemination, Right(Props.empty), Set.empty)
def withRole(role: String): ReplicatorSettings = copy(role = ReplicatorSettings.roleOption(role))
@@ -125,16 +151,35 @@ final class ReplicatorSettings(
def withPruning(pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration): ReplicatorSettings =
copy(pruningInterval = pruningInterval, maxPruningDissemination = maxPruningDissemination)
+ def withDurableStoreProps(durableStoreProps: Props): ReplicatorSettings =
+ copy(durableStoreProps = Right(durableStoreProps))
+
+ /**
+ * Scala API
+ */
+ def withDurableKeys(durableKeys: Set[String]): ReplicatorSettings =
+ copy(durableKeys = durableKeys)
+
+ /**
+ * Java API
+ */
+ def withDurableKeys(durableKeys: java.util.Set[String]): ReplicatorSettings = {
+ import scala.collection.JavaConverters._
+ withDurableKeys(durableKeys.asScala.toSet)
+ }
+
private def copy(
- role: Option[String] = role,
- gossipInterval: FiniteDuration = gossipInterval,
- notifySubscribersInterval: FiniteDuration = notifySubscribersInterval,
- maxDeltaElements: Int = maxDeltaElements,
- dispatcher: String = dispatcher,
- pruningInterval: FiniteDuration = pruningInterval,
- maxPruningDissemination: FiniteDuration = maxPruningDissemination): ReplicatorSettings =
+ role: Option[String] = role,
+ gossipInterval: FiniteDuration = gossipInterval,
+ notifySubscribersInterval: FiniteDuration = notifySubscribersInterval,
+ maxDeltaElements: Int = maxDeltaElements,
+ dispatcher: String = dispatcher,
+ pruningInterval: FiniteDuration = pruningInterval,
+ maxPruningDissemination: FiniteDuration = maxPruningDissemination,
+ durableStoreProps: Either[(String, Config), Props] = durableStoreProps,
+ durableKeys: Set[String] = durableKeys): ReplicatorSettings =
new ReplicatorSettings(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher,
- pruningInterval, maxPruningDissemination)
+ pruningInterval, maxPruningDissemination, durableStoreProps, durableKeys)
}
object Replicator {
@@ -142,8 +187,12 @@ object Replicator {
/**
* Factory method for the [[akka.actor.Props]] of the [[Replicator]] actor.
*/
- def props(settings: ReplicatorSettings): Props =
+ def props(settings: ReplicatorSettings): Props = {
+ require(
+ settings.durableKeys.isEmpty || (settings.durableStoreProps != Right(Props.empty)),
+ "durableStoreProps must be defined when durableKeys are defined")
Props(new Replicator(settings)).withDeploy(Deploy.local).withDispatcher(settings.dispatcher)
+ }
sealed trait ReadConsistency {
def timeout: FiniteDuration
@@ -400,6 +449,17 @@ object Replicator {
extends UpdateFailure[A] {
override def toString: String = s"ModifyFailure [$key]: $errorMessage"
}
+ /**
+ * The local store or direct replication of the [[Update]] could not be fulfill according to
+ * the given [[WriteConsistency consistency level]] due to durable store errors. This is
+ * only used for entries that have been configured to be durable.
+ *
+ * The `Update` was still performed in memory locally and possibly replicated to some nodes,
+ * but it might not have been written to durable storage.
+ * It will eventually be disseminated to other replicas, unless the local replica
+ * crashes before it has been able to communicate with other replicas.
+ */
+ final case class StoreFailure[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends UpdateFailure[A] with DeleteResponse[A]
/**
* Send this message to the local `Replicator` to delete a data value for the
@@ -460,6 +520,7 @@ object Replicator {
case object ClockTick
final case class Write(key: String, envelope: DataEnvelope) extends ReplicatorMessage
case object WriteAck extends ReplicatorMessage with DeadLetterSuppression
+ case object WriteNack extends ReplicatorMessage with DeadLetterSuppression
final case class Read(key: String) extends ReplicatorMessage
final case class ReadResult(envelope: Option[DataEnvelope]) extends ReplicatorMessage with DeadLetterSuppression
final case class ReadRepair(key: String, envelope: DataEnvelope)
@@ -507,7 +568,8 @@ object Replicator {
var mergedRemovedNodePruning = other.pruning
for ((key, thisValue) ← pruning) {
mergedRemovedNodePruning.get(key) match {
- case None ⇒ mergedRemovedNodePruning = mergedRemovedNodePruning.updated(key, thisValue)
+ case None ⇒
+ mergedRemovedNodePruning = mergedRemovedNodePruning.updated(key, thisValue)
case Some(thatValue) ⇒
mergedRemovedNodePruning = mergedRemovedNodePruning.updated(key, thisValue merge thatValue)
}
@@ -751,6 +813,21 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val serializer = SerializationExtension(context.system).serializerFor(classOf[DataEnvelope])
val maxPruningDisseminationNanos = maxPruningDissemination.toNanos
+ val hasDurableKeys = settings.durableKeys.nonEmpty
+ val durable = settings.durableKeys.filterNot(_.endsWith("*"))
+ val durableWildcards = settings.durableKeys.collect { case k if k.endsWith("*") ⇒ k.dropRight(1) }
+ val durableStore: ActorRef =
+ if (hasDurableKeys) {
+ val props = settings.durableStoreProps match {
+ case Right(p) ⇒ p
+ case Left((s, c)) ⇒
+ val clazz = context.system.asInstanceOf[ExtendedActorSystem].dynamicAccess.getClassFor[Actor](s).get
+ Props(clazz, c).withDispatcher(c.getString("use-dispatcher"))
+ }
+ context.watch(context.actorOf(props.withDeploy(Deploy.local), "durableStore"))
+ } else
+ context.system.deadLetters // not used
+
// cluster nodes, doesn't contain selfAddress
var nodes: Set[Address] = Set.empty
@@ -784,6 +861,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
var subscriptionKeys = Map.empty[String, KeyR]
override def preStart(): Unit = {
+ if (hasDurableKeys)
+ durableStore ! LoadAll
val leaderChangedClass = if (role.isDefined) classOf[RoleLeaderChanged] else classOf[LeaderChanged]
cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
classOf[MemberEvent], classOf[ReachabilityEvent], leaderChangedClass)
@@ -799,7 +878,47 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def matchingRole(m: Member): Boolean = role.forall(m.hasRole)
- def receive = normalReceive
+ override val supervisorStrategy = {
+ def fromDurableStore: Boolean = sender() == durableStore && sender() != context.system.deadLetters
+ OneForOneStrategy()(
+ ({
+ case e @ (_: DurableStore.LoadFailed | _: ActorInitializationException) if fromDurableStore ⇒
+ log.error(e, "Stopping distributed-data Replicator due to load or startup failure in durable store")
+ context.stop(self)
+ SupervisorStrategy.Stop
+ }: SupervisorStrategy.Decider).orElse(SupervisorStrategy.defaultDecider))
+ }
+
+ def receive =
+ if (hasDurableKeys) load.orElse(normalReceive)
+ else normalReceive
+
+ val load: Receive = {
+ case LoadData(data) ⇒
+ data.foreach {
+ case (key, d) ⇒
+ val envelope = DataEnvelope(d)
+ write(key, envelope) match {
+ case Some(newEnvelope) ⇒
+ if (newEnvelope.data ne envelope.data)
+ durableStore ! Store(key, newEnvelope.data, None)
+ case None ⇒
+ }
+ }
+ case LoadAllCompleted ⇒
+ context.become(normalReceive)
+ self ! FlushChanges
+
+ case GetReplicaCount ⇒
+ // 0 until durable data has been loaded, used by test
+ sender() ! ReplicaCount(0)
+
+ case RemovedNodePruningTick | FlushChanges | GossipTick ⇒
+ // ignore scheduled ticks when loading durable data
+ case m @ (_: Read | _: Write | _: Status | _: Gossip) ⇒
+ // ignore gossip and replication when loading durable data
+ log.debug("ignoring message [{}] when loading durable data", m.getClass.getName)
+ }
val normalReceive: Receive = {
case Get(key, consistency, req) ⇒ receiveGet(key, consistency, req)
@@ -872,11 +991,22 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
log.debug("Received Update for key [{}], old data [{}], new data [{}]", key, localValue, newData)
val envelope = DataEnvelope(pruningCleanupTombstoned(newData))
setData(key.id, envelope)
- if (isLocalUpdate(writeConsistency))
- sender() ! UpdateSuccess(key, req)
- else
- context.actorOf(WriteAggregator.props(key, envelope, writeConsistency, req, nodes, sender())
- .withDispatcher(context.props.dispatcher))
+ val durable = isDurable(key.id)
+ if (isLocalUpdate(writeConsistency)) {
+ if (durable)
+ durableStore ! Store(key.id, envelope.data,
+ Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), sender())))
+ else
+ sender() ! UpdateSuccess(key, req)
+ } else {
+ val writeAggregator =
+ context.actorOf(WriteAggregator.props(key, envelope, writeConsistency, req, nodes, sender(), durable)
+ .withDispatcher(context.props.dispatcher))
+ if (durable) {
+ durableStore ! Store(key.id, envelope.data,
+ Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), writeAggregator)))
+ }
+ }
case Failure(e: DataDeleted[_]) ⇒
log.debug("Received Update for deleted key [{}]", key)
sender() ! e
@@ -886,6 +1016,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}
+ def isDurable(key: String): Boolean =
+ durable(key) || (durableWildcards.nonEmpty && durableWildcards.exists(key.startsWith))
+
def isLocalUpdate(writeConsistency: WriteConsistency): Boolean =
writeConsistency match {
case WriteLocal ⇒ true
@@ -894,28 +1027,43 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
def receiveWrite(key: String, envelope: DataEnvelope): Unit = {
- write(key, envelope)
- sender() ! WriteAck
+ write(key, envelope) match {
+ case Some(newEnvelope) ⇒
+ if (isDurable(key))
+ durableStore ! Store(key, newEnvelope.data, Some(StoreReply(WriteAck, WriteNack, sender())))
+ else
+ sender() ! WriteAck
+ case None ⇒
+ }
}
- def write(key: String, writeEnvelope: DataEnvelope): Unit =
+ def write(key: String, writeEnvelope: DataEnvelope): Option[DataEnvelope] =
getData(key) match {
- case Some(DataEnvelope(DeletedData, _)) ⇒ // already deleted
+ case Some(DataEnvelope(DeletedData, _)) ⇒ Some(writeEnvelope) // already deleted
case Some(envelope @ DataEnvelope(existing, _)) ⇒
if (existing.getClass == writeEnvelope.data.getClass || writeEnvelope.data == DeletedData) {
val merged = envelope.merge(pruningCleanupTombstoned(writeEnvelope)).addSeen(selfAddress)
setData(key, merged)
+ Some(merged)
} else {
log.warning(
"Wrong type for writing [{}], existing type [{}], got [{}]",
key, existing.getClass.getName, writeEnvelope.data.getClass.getName)
+ None
}
case None ⇒
- setData(key, pruningCleanupTombstoned(writeEnvelope).addSeen(selfAddress))
+ val cleaned = pruningCleanupTombstoned(writeEnvelope).addSeen(selfAddress)
+ setData(key, cleaned)
+ Some(cleaned)
}
def receiveReadRepair(key: String, writeEnvelope: DataEnvelope): Unit = {
- write(key, writeEnvelope)
+ write(key, writeEnvelope) match {
+ case Some(newEnvelope) ⇒
+ if (isDurable(key))
+ durableStore ! Store(key, newEnvelope.data, None)
+ case None ⇒
+ }
sender() ! ReadRepairAck
}
@@ -933,11 +1081,22 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
sender() ! DataDeleted(key)
case _ ⇒
setData(key.id, DeletedEnvelope)
- if (isLocalUpdate(consistency))
- sender() ! DeleteSuccess(key)
- else
- context.actorOf(WriteAggregator.props(key, DeletedEnvelope, consistency, None, nodes, sender())
- .withDispatcher(context.props.dispatcher))
+ val durable = isDurable(key.id)
+ if (isLocalUpdate(consistency)) {
+ if (durable)
+ durableStore ! Store(key.id, DeletedData,
+ Some(StoreReply(DeleteSuccess(key), StoreFailure(key, None), sender())))
+ else
+ sender() ! DeleteSuccess(key)
+ } else {
+ val writeAggregator =
+ context.actorOf(WriteAggregator.props(key, DeletedEnvelope, consistency, None, nodes, sender(), durable)
+ .withDispatcher(context.props.dispatcher))
+ if (durable) {
+ durableStore ! Store(key.id, DeletedData,
+ Some(StoreReply(DeleteSuccess(key), StoreFailure(key, None), writeAggregator)))
+ }
+ }
}
}
@@ -1075,7 +1234,12 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
updatedData.foreach {
case (key, envelope) ⇒
val hadData = dataEntries.contains(key)
- write(key, envelope)
+ write(key, envelope) match {
+ case Some(newEnvelope) ⇒
+ if (isDurable(key))
+ durableStore ! Store(key, newEnvelope.data, None)
+ case None ⇒
+ }
if (sendBack) getData(key) match {
case Some(d) ⇒
if (hadData || d.pruning.nonEmpty)
@@ -1108,14 +1272,19 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
(newSubscribers.exists { case (k, s) ⇒ s.contains(subscriber) })
def receiveTerminated(ref: ActorRef): Unit = {
- val keys1 = subscribers.collect { case (k, s) if s.contains(ref) ⇒ k }
- keys1.foreach { key ⇒ subscribers.removeBinding(key, ref) }
- val keys2 = newSubscribers.collect { case (k, s) if s.contains(ref) ⇒ k }
- keys2.foreach { key ⇒ newSubscribers.removeBinding(key, ref) }
+ if (ref == durableStore) {
+ log.error("Stopping distributed-data Replicator because durable store terminated")
+ context.stop(self)
+ } else {
+ val keys1 = subscribers.collect { case (k, s) if s.contains(ref) ⇒ k }
+ keys1.foreach { key ⇒ subscribers.removeBinding(key, ref) }
+ val keys2 = newSubscribers.collect { case (k, s) if s.contains(ref) ⇒ k }
+ keys2.foreach { key ⇒ newSubscribers.removeBinding(key, ref) }
- (keys1 ++ keys2).foreach { key ⇒
- if (!subscribers.contains(key) && !newSubscribers.contains(key))
- subscriptionKeys -= key
+ (keys1 ++ keys2).foreach { key ⇒
+ if (!subscribers.contains(key) && !newSubscribers.contains(key))
+ subscriptionKeys -= key
+ }
}
}
@@ -1161,7 +1330,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
initRemovedNodePruning()
}
performRemovedNodePruning()
- tombstoneRemovedNodePruning()
+ // FIXME tombstoneRemovedNodePruning doesn't work, since merge of PruningState will add the PruningPerformed back again
+ // tombstoneRemovedNodePruning()
}
def initRemovedNodePruning(): Unit = {
@@ -1171,6 +1341,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}(collection.breakOut)
if (removedSet.nonEmpty) {
+ // FIXME handle pruning of durable data, this is difficult and requires more thought
for ((key, (envelope, _)) ← dataEntries; removed ← removedSet) {
def init(): Unit = {
@@ -1206,6 +1377,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
pruningPerformed = pruningPerformed.updated(removed, allReachableClockTime)
log.debug("Perform pruning of [{}] from [{}] to [{}]", key, removed, selfUniqueAddress)
setData(key, newEnvelope)
+ if ((newEnvelope.data ne data) && isDurable(key))
+ durableStore ! Store(key, newEnvelope.data, None)
case _ ⇒
}
case _ ⇒ // deleted, or pruning not needed
@@ -1225,6 +1398,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}
+ // FIXME pruningPerformed is only updated on one node, but tombstoneNodes should be on all
pruningPerformed.foreach {
case (removed, timestamp) if ((allReachableClockTime - timestamp) > maxPruningDisseminationNanos) &&
allPruningPerformed(removed) ⇒
@@ -1234,7 +1408,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
tombstoneNodes += removed
dataEntries.foreach {
case (key, (envelope @ DataEnvelope(data: RemovedNodePruning, _), _)) ⇒
- setData(key, pruningCleanupTombstoned(removed, envelope))
+ val newEnvelope = pruningCleanupTombstoned(removed, envelope)
+ setData(key, newEnvelope)
+ if ((newEnvelope.data ne data) && isDurable(key))
+ durableStore ! Store(key, newEnvelope.data, None)
case _ ⇒ // deleted, or pruning not needed
}
case (removed, timestamp) ⇒ // not ready
@@ -1325,8 +1502,9 @@ private[akka] object WriteAggregator {
consistency: Replicator.WriteConsistency,
req: Option[Any],
nodes: Set[Address],
- replyTo: ActorRef): Props =
- Props(new WriteAggregator(key, envelope, consistency, req, nodes, replyTo))
+ replyTo: ActorRef,
+ durable: Boolean): Props =
+ Props(new WriteAggregator(key, envelope, consistency, req, nodes, replyTo, durable))
.withDeploy(Deploy.local)
}
@@ -1339,7 +1517,8 @@ private[akka] class WriteAggregator(
consistency: Replicator.WriteConsistency,
req: Option[Any],
override val nodes: Set[Address],
- replyTo: ActorRef) extends ReadWriteAggregator {
+ replyTo: ActorRef,
+ durable: Boolean) extends ReadWriteAggregator {
import Replicator._
import Replicator.Internal._
@@ -1355,41 +1534,65 @@ private[akka] class WriteAggregator(
val w = N / 2 + 1 // write to at least (N/2+1) nodes
N - w
case WriteLocal ⇒
- throw new IllegalArgumentException("ReadLocal not supported by WriteAggregator")
+ throw new IllegalArgumentException("WriteLocal not supported by WriteAggregator")
}
val writeMsg = Write(key.id, envelope)
+ var gotLocalStoreReply = !durable
+ var gotWriteNackFrom = Set.empty[Address]
+
override def preStart(): Unit = {
primaryNodes.foreach { replica(_) ! writeMsg }
- if (remaining.size == doneWhenRemainingSize)
- reply(ok = true)
- else if (doneWhenRemainingSize < 0 || remaining.size < doneWhenRemainingSize)
- reply(ok = false)
+ if (isDone) reply(isTimeout = false)
}
- def receive = {
+ def receive: Receive = {
case WriteAck ⇒
remaining -= senderAddress()
- if (remaining.size == doneWhenRemainingSize)
- reply(ok = true)
+ if (isDone) reply(isTimeout = false)
+ case WriteNack ⇒
+ gotWriteNackFrom += senderAddress()
+ if (isDone) reply(isTimeout = false)
+
+ case _: Replicator.UpdateSuccess[_] ⇒
+ gotLocalStoreReply = true
+ if (isDone) reply(isTimeout = false)
+ case f: Replicator.StoreFailure[_] ⇒
+ gotLocalStoreReply = true
+ gotWriteNackFrom += Cluster(context.system).selfAddress
+ if (isDone) reply(isTimeout = false)
+
case SendToSecondary ⇒
secondaryNodes.foreach { replica(_) ! writeMsg }
- case ReceiveTimeout ⇒ reply(ok = false)
+ case ReceiveTimeout ⇒
+ reply(isTimeout = true)
}
def senderAddress(): Address = sender().path.address
- def reply(ok: Boolean): Unit = {
- if (ok && envelope.data == DeletedData)
- replyTo.tell(DeleteSuccess(key), context.parent)
- else if (ok)
- replyTo.tell(UpdateSuccess(key, req), context.parent)
- else if (envelope.data == DeletedData)
- replyTo.tell(ReplicationDeleteFailure(key), context.parent)
- else
- replyTo.tell(UpdateTimeout(key, req), context.parent)
+ def isDone: Boolean =
+ gotLocalStoreReply &&
+ (remaining.size <= doneWhenRemainingSize || (remaining diff gotWriteNackFrom).isEmpty ||
+ notEnoughNodes)
+
+ def notEnoughNodes: Boolean =
+ doneWhenRemainingSize < 0 || nodes.size < doneWhenRemainingSize
+
+ def reply(isTimeout: Boolean): Unit = {
+ val isDelete = envelope.data == DeletedData
+ val isSuccess = remaining.size <= doneWhenRemainingSize && !notEnoughNodes
+ val isTimeoutOrNotEnoughNodes = isTimeout || notEnoughNodes || gotWriteNackFrom.isEmpty
+
+ val replyMsg =
+ if (isSuccess && isDelete) DeleteSuccess(key)
+ else if (isSuccess) UpdateSuccess(key, req)
+ else if (isTimeoutOrNotEnoughNodes && isDelete) ReplicationDeleteFailure(key)
+ else if (isTimeoutOrNotEnoughNodes) UpdateTimeout(key, req)
+ else StoreFailure(key, req)
+
+ replyTo.tell(replyMsg, context.parent)
context.stop(self)
}
}
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
index 4df002af7b..356725d706 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
@@ -25,6 +25,8 @@ import akka.cluster.ddata.Key.KeyR
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import scala.concurrent.duration.FiniteDuration
+import akka.cluster.ddata.DurableStore.DurableDataEnvelope
+import akka.cluster.ddata.DurableStore.DurableDataEnvelope
/**
* INTERNAL API
@@ -167,6 +169,8 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
val ReadResultManifest = "L"
val StatusManifest = "M"
val GossipManifest = "N"
+ val WriteNackManifest = "O"
+ val DurableDataEnvelopeManifest = "P"
private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] ⇒ AnyRef](
GetManifest → getFromBinary,
@@ -182,42 +186,48 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
ReadManifest → readFromBinary,
ReadResultManifest → readResultFromBinary,
StatusManifest → statusFromBinary,
- GossipManifest → gossipFromBinary)
+ GossipManifest → gossipFromBinary,
+ WriteNackManifest → (_ ⇒ WriteNack),
+ DurableDataEnvelopeManifest → durableDataEnvelopeFromBinary)
override def manifest(obj: AnyRef): String = obj match {
- case _: DataEnvelope ⇒ DataEnvelopeManifest
- case _: Write ⇒ WriteManifest
- case WriteAck ⇒ WriteAckManifest
- case _: Read ⇒ ReadManifest
- case _: ReadResult ⇒ ReadResultManifest
- case _: Status ⇒ StatusManifest
- case _: Get[_] ⇒ GetManifest
- case _: GetSuccess[_] ⇒ GetSuccessManifest
- case _: Changed[_] ⇒ ChangedManifest
- case _: NotFound[_] ⇒ NotFoundManifest
- case _: GetFailure[_] ⇒ GetFailureManifest
- case _: Subscribe[_] ⇒ SubscribeManifest
- case _: Unsubscribe[_] ⇒ UnsubscribeManifest
- case _: Gossip ⇒ GossipManifest
+ case _: DataEnvelope ⇒ DataEnvelopeManifest
+ case _: Write ⇒ WriteManifest
+ case WriteAck ⇒ WriteAckManifest
+ case _: Read ⇒ ReadManifest
+ case _: ReadResult ⇒ ReadResultManifest
+ case _: Status ⇒ StatusManifest
+ case _: Get[_] ⇒ GetManifest
+ case _: GetSuccess[_] ⇒ GetSuccessManifest
+ case _: DurableDataEnvelope ⇒ DurableDataEnvelopeManifest
+ case _: Changed[_] ⇒ ChangedManifest
+ case _: NotFound[_] ⇒ NotFoundManifest
+ case _: GetFailure[_] ⇒ GetFailureManifest
+ case _: Subscribe[_] ⇒ SubscribeManifest
+ case _: Unsubscribe[_] ⇒ UnsubscribeManifest
+ case _: Gossip ⇒ GossipManifest
+ case WriteNack ⇒ WriteNackManifest
case _ ⇒
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}
def toBinary(obj: AnyRef): Array[Byte] = obj match {
- case m: DataEnvelope ⇒ dataEnvelopeToProto(m).toByteArray
- case m: Write ⇒ writeCache.getOrAdd(m)
- case WriteAck ⇒ writeAckBytes
- case m: Read ⇒ readCache.getOrAdd(m)
- case m: ReadResult ⇒ readResultToProto(m).toByteArray
- case m: Status ⇒ statusToProto(m).toByteArray
- case m: Get[_] ⇒ getToProto(m).toByteArray
- case m: GetSuccess[_] ⇒ getSuccessToProto(m).toByteArray
- case m: Changed[_] ⇒ changedToProto(m).toByteArray
- case m: NotFound[_] ⇒ notFoundToProto(m).toByteArray
- case m: GetFailure[_] ⇒ getFailureToProto(m).toByteArray
- case m: Subscribe[_] ⇒ subscribeToProto(m).toByteArray
- case m: Unsubscribe[_] ⇒ unsubscribeToProto(m).toByteArray
- case m: Gossip ⇒ compress(gossipToProto(m))
+ case m: DataEnvelope ⇒ dataEnvelopeToProto(m).toByteArray
+ case m: Write ⇒ writeCache.getOrAdd(m)
+ case WriteAck ⇒ writeAckBytes
+ case m: Read ⇒ readCache.getOrAdd(m)
+ case m: ReadResult ⇒ readResultToProto(m).toByteArray
+ case m: Status ⇒ statusToProto(m).toByteArray
+ case m: Get[_] ⇒ getToProto(m).toByteArray
+ case m: GetSuccess[_] ⇒ getSuccessToProto(m).toByteArray
+ case m: DurableDataEnvelope ⇒ durableDataEnvelopeToProto(m).toByteArray
+ case m: Changed[_] ⇒ changedToProto(m).toByteArray
+ case m: NotFound[_] ⇒ notFoundToProto(m).toByteArray
+ case m: GetFailure[_] ⇒ getFailureToProto(m).toByteArray
+ case m: Subscribe[_] ⇒ subscribeToProto(m).toByteArray
+ case m: Unsubscribe[_] ⇒ unsubscribeToProto(m).toByteArray
+ case m: Gossip ⇒ compress(gossipToProto(m))
+ case WriteNack ⇒ dm.Empty.getDefaultInstance.toByteArray
case _ ⇒
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}
@@ -450,4 +460,18 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
ReadResult(envelope)
}
+ private def durableDataEnvelopeToProto(durableDataEnvelope: DurableDataEnvelope): dm.DurableDataEnvelope = {
+ dm.DurableDataEnvelope.newBuilder()
+ .setData(otherMessageToProto(durableDataEnvelope.data))
+ .build()
+ }
+
+ private def durableDataEnvelopeFromBinary(bytes: Array[Byte]): DurableDataEnvelope =
+ durableDataEnvelopeFromProto(dm.DurableDataEnvelope.parseFrom(bytes))
+
+ private def durableDataEnvelopeFromProto(durableDataEnvelope: dm.DurableDataEnvelope): DurableDataEnvelope = {
+ val data = otherMessageFromProto(durableDataEnvelope.getData).asInstanceOf[ReplicatedData]
+ new DurableDataEnvelope(data)
+ }
+
}
diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala
new file mode 100644
index 0000000000..656cbbd65d
--- /dev/null
+++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala
@@ -0,0 +1,338 @@
+/**
+ * Copyright (C) 2016 Lightbend Inc.
+ */
+package akka.cluster.ddata
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.util.control.NoStackTrace
+
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.ActorSystem
+import akka.actor.Props
+import akka.cluster.Cluster
+import akka.remote.testconductor.RoleName
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+
+final case class DurableDataSpecConfig(writeBehind: Boolean) extends MultiNodeConfig {
+ val first = role("first")
+ val second = role("second")
+
+ commonConfig(ConfigFactory.parseString(s"""
+ akka.loglevel = DEBUG
+ akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
+ akka.log-dead-letters-during-shutdown = off
+ akka.cluster.distributed-data.durable.keys = ["durable*"]
+ akka.cluster.distributed-data.durable.lmdb {
+ dir = target/DurableDataSpec-${System.currentTimeMillis}-ddata
+ map-size = 10 MiB
+ write-behind-interval = ${if (writeBehind) "200ms" else "off"}
+ }
+ akka.test.single-expect-default = 5s
+ """))
+}
+
+object DurableDataSpec {
+ def testDurableStoreProps(failLoad: Boolean = false, failStore: Boolean = false): Props =
+ Props(new TestDurableStore(failLoad, failStore))
+
+ class TestDurableStore(failLoad: Boolean, failStore: Boolean) extends Actor {
+ import DurableStore._
+ def receive = {
+ case LoadAll ⇒
+ if (failLoad)
+ throw new LoadFailed("failed to load durable distributed-data") with NoStackTrace
+ else
+ sender() ! LoadAllCompleted
+
+ case Store(key, data, reply) ⇒
+ if (failStore) reply match {
+ case Some(StoreReply(_, failureMsg, replyTo)) ⇒ replyTo ! failureMsg
+ case None ⇒
+ }
+ else reply match {
+ case Some(StoreReply(successMsg, _, replyTo)) ⇒ replyTo ! successMsg
+ case None ⇒
+ }
+ }
+
+ }
+
+}
+
+class DurableDataSpecMultiJvmNode1 extends DurableDataSpec(DurableDataSpecConfig(writeBehind = false))
+class DurableDataSpecMultiJvmNode2 extends DurableDataSpec(DurableDataSpecConfig(writeBehind = false))
+
+class DurableDataWriteBehindSpecMultiJvmNode1 extends DurableDataSpec(DurableDataSpecConfig(writeBehind = true))
+class DurableDataWriteBehindSpecMultiJvmNode2 extends DurableDataSpec(DurableDataSpecConfig(writeBehind = true))
+
+abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
+ extends MultiNodeSpec(multiNodeConfig) with STMultiNodeSpec with ImplicitSender {
+ import DurableDataSpec._
+ import Replicator._
+ import multiNodeConfig._
+
+ override def initialParticipants = roles.size
+
+ implicit val cluster = Cluster(system)
+
+ val timeout = 5.seconds.dilated
+ val writeTwo = WriteTo(2, timeout)
+ val readTwo = ReadFrom(2, timeout)
+
+ val KeyA = GCounterKey("durable-A")
+ val KeyB = GCounterKey("durable-B")
+ val KeyC = ORSetKey[String]("durable-C")
+
+ var testStepCounter = 0
+ def enterBarrierAfterTestStep(): Unit = {
+ testStepCounter += 1
+ enterBarrier("after-" + testStepCounter)
+ }
+
+ def newReplicator(sys: ActorSystem = system) = sys.actorOf(Replicator.props(
+ ReplicatorSettings(system).withGossipInterval(1.second)), "replicator-" + testStepCounter)
+
+ def join(from: RoleName, to: RoleName): Unit = {
+ runOn(from) {
+ cluster join node(to).address
+ }
+ enterBarrier(from.name + "-joined")
+ }
+
+ "Durable CRDT" must {
+
+ "work in single node cluster" in {
+ join(first, first)
+
+ runOn(first) {
+
+ val r = newReplicator()
+ within(5.seconds) {
+ awaitAssert {
+ r ! GetReplicaCount
+ expectMsg(ReplicaCount(1))
+ }
+ }
+
+ r ! Get(KeyA, ReadLocal)
+ expectMsg(NotFound(KeyA, None))
+
+ r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
+ r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
+ r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
+
+ expectMsg(UpdateSuccess(KeyA, None))
+ expectMsg(UpdateSuccess(KeyA, None))
+ expectMsg(UpdateSuccess(KeyA, None))
+
+ watch(r)
+ system.stop(r)
+ expectTerminated(r)
+
+ var r2: ActorRef = null
+ awaitAssert(r2 = newReplicator()) // try until name is free
+
+ // wait until all loaded
+ awaitAssert {
+ r2 ! GetKeyIds
+ expectMsgType[GetKeyIdsResult].keyIds should !==(Set.empty[String])
+ }
+ r2 ! Get(KeyA, ReadLocal)
+ expectMsgType[GetSuccess[GCounter]].dataValue.value.toInt should be(3)
+
+ watch(r2)
+ system.stop(r2)
+ expectTerminated(r2)
+ }
+
+ enterBarrierAfterTestStep()
+ }
+ }
+
+ "work in multi node cluster" in {
+ join(second, first)
+
+ val r = newReplicator()
+ within(5.seconds) {
+ awaitAssert {
+ r ! GetReplicaCount
+ expectMsg(ReplicaCount(2))
+ }
+ }
+ enterBarrier("both-initalized")
+
+ r ! Update(KeyA, GCounter(), writeTwo)(_ + 1)
+ expectMsg(UpdateSuccess(KeyA, None))
+
+ r ! Update(KeyC, ORSet.empty[String], writeTwo)(_ + myself.name)
+ expectMsg(UpdateSuccess(KeyC, None))
+
+ enterBarrier("update-done-" + testStepCounter)
+
+ r ! Get(KeyA, readTwo)
+ expectMsgType[GetSuccess[GCounter]].dataValue.value.toInt should be(2)
+
+ r ! Get(KeyC, readTwo)
+ expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should be(Set(first.name, second.name))
+
+ enterBarrier("values-verified-" + testStepCounter)
+
+ watch(r)
+ system.stop(r)
+ expectTerminated(r)
+
+ var r2: ActorRef = null
+ awaitAssert(r2 = newReplicator()) // try until name is free
+ awaitAssert {
+ r2 ! GetKeyIds
+ expectMsgType[GetKeyIdsResult].keyIds should !==(Set.empty[String])
+ }
+
+ r2 ! Get(KeyA, ReadLocal)
+ expectMsgType[GetSuccess[GCounter]].dataValue.value.toInt should be(2)
+
+ r2 ! Get(KeyC, ReadLocal)
+ expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should be(Set(first.name, second.name))
+
+ enterBarrierAfterTestStep()
+ }
+
+ "be durable after gossip update" in {
+ val r = newReplicator()
+
+ runOn(first) {
+ r ! Update(KeyC, ORSet.empty[String], WriteLocal)(_ + myself.name)
+ expectMsg(UpdateSuccess(KeyC, None))
+ }
+
+ runOn(second) {
+ r ! Subscribe(KeyC, testActor)
+ expectMsgType[Changed[ORSet[String]]].dataValue.elements should be(Set(first.name))
+
+ // must do one more roundtrip to be sure that it keyB is stored, since Changed might have
+ // been sent out before storage
+ r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
+ expectMsg(UpdateSuccess(KeyA, None))
+
+ watch(r)
+ system.stop(r)
+ expectTerminated(r)
+
+ var r2: ActorRef = null
+ awaitAssert(r2 = newReplicator()) // try until name is free
+ awaitAssert {
+ r2 ! GetKeyIds
+ expectMsgType[GetKeyIdsResult].keyIds should !==(Set.empty[String])
+ }
+
+ r2 ! Get(KeyC, ReadLocal)
+ expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should be(Set(first.name))
+ }
+
+ enterBarrierAfterTestStep()
+ }
+
+ "handle Update before load" in {
+ runOn(first) {
+
+ val sys1 = ActorSystem("AdditionalSys", system.settings.config)
+ val addr = Cluster(sys1).selfAddress
+ try {
+ Cluster(sys1).join(addr)
+ new TestKit(sys1) with ImplicitSender {
+
+ val r = newReplicator(sys1)
+ within(5.seconds) {
+ awaitAssert {
+ r ! GetReplicaCount
+ expectMsg(ReplicaCount(1))
+ }
+ }
+
+ r ! Get(KeyA, ReadLocal)
+ expectMsg(NotFound(KeyA, None))
+
+ r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
+ r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
+ r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
+ r ! Update(KeyB, GCounter(), WriteLocal)(_ + 1)
+
+ expectMsg(UpdateSuccess(KeyA, None))
+ expectMsg(UpdateSuccess(KeyA, None))
+ expectMsg(UpdateSuccess(KeyA, None))
+ expectMsg(UpdateSuccess(KeyB, None))
+
+ watch(r)
+ system.stop(r)
+ expectTerminated(r)
+ }
+ } finally {
+ Await.ready(sys1.terminate(), 10.seconds)
+ }
+
+ val sys2 = ActorSystem(
+ "AdditionalSys",
+ // use the same port
+ ConfigFactory.parseString(s"""
+ akka.remote.artery.canonical.port = ${addr.port.get}
+ akka.remote.netty.tcp.port = ${addr.port.get}
+ """).withFallback(system.settings.config))
+ try {
+ Cluster(sys2).join(addr)
+ new TestKit(sys2) with ImplicitSender {
+
+ val r2: ActorRef = newReplicator(sys2)
+
+ // it should be possible to update while loading is in progress
+ r2 ! Update(KeyB, GCounter(), WriteLocal)(_ + 1)
+ expectMsg(UpdateSuccess(KeyB, None))
+
+ // wait until all loaded
+ awaitAssert {
+ r2 ! GetKeyIds
+ expectMsgType[GetKeyIdsResult].keyIds should ===(Set(KeyA.id, KeyB.id))
+ }
+ r2 ! Get(KeyA, ReadLocal)
+ expectMsgType[GetSuccess[GCounter]].dataValue.value.toInt should be(3)
+ r2 ! Get(KeyB, ReadLocal)
+ expectMsgType[GetSuccess[GCounter]].dataValue.value.toInt should be(2)
+ }
+ } finally {
+ Await.ready(sys1.terminate(), 10.seconds)
+ }
+
+ }
+ enterBarrierAfterTestStep()
+ }
+
+ "stop Replicator if Load fails" in {
+ runOn(first) {
+ val r = system.actorOf(
+ Replicator.props(
+ ReplicatorSettings(system).withDurableStoreProps(testDurableStoreProps(failLoad = true))),
+ "replicator-" + testStepCounter)
+ watch(r)
+ expectTerminated(r)
+ }
+ enterBarrierAfterTestStep()
+ }
+
+ "reply with StoreFailure if store fails" in {
+ runOn(first) {
+ val r = system.actorOf(
+ Replicator.props(
+ ReplicatorSettings(system).withDurableStoreProps(testDurableStoreProps(failStore = true))),
+ "replicator-" + testStepCounter)
+ r ! Update(KeyA, GCounter(), WriteLocal, request = Some("a"))(_ + 1)
+ expectMsg(StoreFailure(KeyA, Some("a")))
+ }
+ enterBarrierAfterTestStep()
+ }
+
+}
+
diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala
new file mode 100644
index 0000000000..5926ab931a
--- /dev/null
+++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala
@@ -0,0 +1,167 @@
+/**
+ * Copyright (C) 2009-2016 Lightbend Inc.
+ */
+package akka.cluster.ddata
+
+import scala.concurrent.duration._
+
+import akka.cluster.Cluster
+import akka.cluster.ClusterEvent.InitialStateAsEvents
+import akka.cluster.ClusterEvent.MemberUp
+import akka.remote.testconductor.RoleName
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+import akka.actor.ActorSystem
+import akka.actor.ActorRef
+import scala.concurrent.Await
+
+object DurablePruningSpec extends MultiNodeConfig {
+ val first = role("first")
+ val second = role("second")
+
+ commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(s"""
+ akka.loglevel = INFO
+ akka.actor.provider = "cluster"
+ akka.log-dead-letters-during-shutdown = off
+ akka.cluster.distributed-data.durable.keys = ["*"]
+ akka.cluster.distributed-data.durable.lmdb {
+ dir = target/DurablePruningSpec-${System.currentTimeMillis}-ddata
+ map-size = 10 MiB
+ }
+ """)))
+
+}
+
+class DurablePruningSpecMultiJvmNode1 extends DurablePruningSpec
+class DurablePruningSpecMultiJvmNode2 extends DurablePruningSpec
+
+class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiNodeSpec with ImplicitSender {
+ import DurablePruningSpec._
+ import Replicator._
+
+ override def initialParticipants = roles.size
+
+ implicit val cluster = Cluster(system)
+ val maxPruningDissemination = 3.seconds
+
+ def startReplicator(sys: ActorSystem): ActorRef =
+ sys.actorOf(Replicator.props(
+ ReplicatorSettings(sys).withGossipInterval(1.second)
+ .withPruning(pruningInterval = 1.second, maxPruningDissemination)), "replicator")
+ val replicator = startReplicator(system)
+ val timeout = 5.seconds.dilated
+
+ val KeyA = GCounterKey("A")
+
+ def join(from: RoleName, to: RoleName): Unit = {
+ runOn(from) {
+ cluster join node(to).address
+ }
+ enterBarrier(from.name + "-joined")
+ }
+
+ "Pruning of durable CRDT" must {
+
+ "move data from removed node" in {
+ join(first, first)
+ join(second, first)
+
+ val sys2 = ActorSystem(system.name, system.settings.config)
+ val cluster2 = Cluster(sys2)
+ val replicator2 = startReplicator(sys2)
+ val probe2 = TestProbe()(sys2)
+ Cluster(sys2).join(node(first).address)
+
+ within(5.seconds) {
+ awaitAssert {
+ replicator ! GetReplicaCount
+ expectMsg(ReplicaCount(4))
+ replicator2.tell(GetReplicaCount, probe2.ref)
+ probe2.expectMsg(ReplicaCount(4))
+ }
+ }
+
+ replicator ! Update(KeyA, GCounter(), WriteLocal)(_ + 3)
+ expectMsg(UpdateSuccess(KeyA, None))
+
+ replicator2.tell(Update(KeyA, GCounter(), WriteLocal)(_.increment(cluster2, 2)), probe2.ref)
+ probe2.expectMsg(UpdateSuccess(KeyA, None))
+
+ enterBarrier("updates-done")
+
+ within(10.seconds) {
+ awaitAssert {
+ replicator ! Get(KeyA, ReadAll(1.second))
+ val counter1 = expectMsgType[GetSuccess[GCounter]].dataValue
+ counter1.value should be(10)
+ counter1.state.size should be(4)
+ }
+ }
+
+ within(10.seconds) {
+ awaitAssert {
+ replicator2.tell(Get(KeyA, ReadAll(1.second)), probe2.ref)
+ val counter2 = probe2.expectMsgType[GetSuccess[GCounter]].dataValue
+ counter2.value should be(10)
+ counter2.state.size should be(4)
+ }
+ }
+ enterBarrier("get1")
+
+ runOn(first) {
+ cluster.leave(cluster2.selfAddress)
+ }
+
+ within(15.seconds) {
+ awaitAssert {
+ replicator ! GetReplicaCount
+ expectMsg(ReplicaCount(3))
+ }
+ }
+ enterBarrier("removed")
+ runOn(first) {
+ Await.ready(sys2.terminate(), 5.seconds)
+ }
+
+ within(15.seconds) {
+ awaitAssert {
+ replicator ! Get(KeyA, ReadLocal)
+ val counter3 = expectMsgType[GetSuccess[GCounter]].dataValue
+ counter3.value should be(10)
+ counter3.state.size should be(3)
+ }
+ }
+ enterBarrier("pruned")
+
+ // let it become tombstone
+ Thread.sleep(5000)
+
+ runOn(first) {
+ val addr = cluster2.selfAddress
+ val sys3 = ActorSystem(system.name, ConfigFactory.parseString(s"""
+ akka.remote.artery.canonical.port = ${addr.port.get}
+ akka.remote.netty.tcp.port = ${addr.port.get}
+ """).withFallback(system.settings.config))
+ val cluster3 = Cluster(sys3)
+ val replicator3 = startReplicator(sys3)
+ val probe3 = TestProbe()(sys3)
+ Cluster(sys3).join(node(first).address)
+
+ within(10.seconds) {
+ awaitAssert {
+ replicator3.tell(Get(KeyA, ReadLocal), probe3.ref)
+ val counter4 = probe3.expectMsgType[GetSuccess[GCounter]].dataValue
+ counter4.value should be(10)
+ counter4.state.size should be(3)
+ }
+ }
+ }
+
+ enterBarrier("after-1")
+ }
+ }
+
+}
+
diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/PerformanceSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/PerformanceSpec.scala
index d2b1142621..2197500ad7 100644
--- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/PerformanceSpec.scala
+++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/PerformanceSpec.scala
@@ -23,7 +23,7 @@ object PerformanceSpec extends MultiNodeConfig {
val n4 = role("n4")
val n5 = role("n5")
- commonConfig(ConfigFactory.parseString("""
+ commonConfig(ConfigFactory.parseString(s"""
akka.loglevel = ERROR
akka.stdout-loglevel = ERROR
akka.actor.provider = "cluster"
@@ -34,6 +34,10 @@ object PerformanceSpec extends MultiNodeConfig {
akka.testconductor.barrier-timeout = 60 s
akka.cluster.distributed-data.gossip-interval = 1 s
akka.actor.serialize-messages = off
+
+ #akka.cluster.distributed-data.durable.keys = ["*"]
+ #akka.cluster.distributed-data.durable.lmdb.dir = target/PerformanceSpec-${System.currentTimeMillis}-ddata
+ #akka.cluster.distributed-data.durable.lmdb.write-behind-interval = 200ms
"""))
def countDownProps(latch: TestLatch): Props = Props(new CountDown(latch)).withDeploy(Deploy.local)
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/LocalConcurrencySpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/LocalConcurrencySpec.scala
index 854f0d359d..114752eea4 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/LocalConcurrencySpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/LocalConcurrencySpec.scala
@@ -46,6 +46,7 @@ class LocalConcurrencySpec(_system: ActorSystem) extends TestKit(_system)
ConfigFactory.parseString("""
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port=0
+ akka.remote.artery.canonical.port = 0
""")))
}
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala
index 23c2aecabe..305b6b3698 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala
@@ -20,12 +20,12 @@ object WriteAggregatorSpec {
val key = GSetKey[String]("a")
def writeAggregatorProps(data: GSet[String], consistency: Replicator.WriteConsistency,
- probes: Map[Address, ActorRef], nodes: Set[Address], replyTo: ActorRef): Props =
- Props(new TestWriteAggregator(data, consistency, probes, nodes, replyTo))
+ probes: Map[Address, ActorRef], nodes: Set[Address], replyTo: ActorRef, durable: Boolean): Props =
+ Props(new TestWriteAggregator(data, consistency, probes, nodes, replyTo, durable))
class TestWriteAggregator(data: GSet[String], consistency: Replicator.WriteConsistency,
- probes: Map[Address, ActorRef], nodes: Set[Address], replyTo: ActorRef)
- extends WriteAggregator(key, DataEnvelope(data), consistency, None, nodes, replyTo) {
+ probes: Map[Address, ActorRef], nodes: Set[Address], replyTo: ActorRef, durable: Boolean)
+ extends WriteAggregator(key, DataEnvelope(data), consistency, None, nodes, replyTo, durable) {
override def replica(address: Address): ActorSelection =
context.actorSelection(probes(address).path)
@@ -43,6 +43,8 @@ object WriteAggregatorSpec {
def receive = {
case WriteAck ⇒
replicator.foreach(_ ! WriteAck)
+ case WriteNack ⇒
+ replicator.foreach(_ ! WriteNack)
case msg ⇒
replicator = Some(sender())
replica ! msg
@@ -50,9 +52,14 @@ object WriteAggregatorSpec {
}
}
-class WriteAggregatorSpec extends AkkaSpec("""
+class WriteAggregatorSpec extends AkkaSpec(s"""
akka.actor.provider = "cluster"
- akka.remote.netty.tcp.port=0
+ akka.remote.netty.tcp.port = 0
+ akka.remote.artery.canonical.port = 0
+ akka.cluster.distributed-data.durable.lmdb {
+ dir = target/WriteAggregatorSpec-${System.currentTimeMillis}-ddata
+ map-size = 10 MiB
+ }
""")
with ImplicitSender {
@@ -69,7 +76,7 @@ class WriteAggregatorSpec extends AkkaSpec("""
val data = GSet.empty + "A" + "B"
val timeout = 3.seconds.dilated
- val writeTwo = WriteTo(2, timeout)
+ val writeThree = WriteTo(3, timeout)
val writeMajority = WriteMajority(timeout)
def probes(probe: ActorRef): Map[Address, ActorRef] =
@@ -79,7 +86,7 @@ class WriteAggregatorSpec extends AkkaSpec("""
"send to at least N/2+1 replicas when WriteMajority" in {
val probe = TestProbe()
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
- data, writeMajority, probes(probe.ref), nodes, testActor))
+ data, writeMajority, probes(probe.ref), nodes, testActor, durable = false))
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
@@ -93,7 +100,7 @@ class WriteAggregatorSpec extends AkkaSpec("""
"send to more when no immediate reply" in {
val probe = TestProbe()
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
- data, writeMajority, probes(probe.ref), nodes, testActor))
+ data, writeMajority, probes(probe.ref), nodes, testActor, durable = false))
probe.expectMsgType[Write]
// no reply
@@ -112,7 +119,7 @@ class WriteAggregatorSpec extends AkkaSpec("""
"timeout when less than required acks" in {
val probe = TestProbe()
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
- data, writeMajority, probes(probe.ref), nodes, testActor))
+ data, writeMajority, probes(probe.ref), nodes, testActor, durable = false))
probe.expectMsgType[Write]
// no reply
@@ -126,6 +133,84 @@ class WriteAggregatorSpec extends AkkaSpec("""
watch(aggr)
expectTerminated(aggr)
}
-
}
+
+ "Durable WriteAggregator" must {
+ "not reply before local confirmation" in {
+ val probe = TestProbe()
+ val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
+ data, writeThree, probes(probe.ref), nodes, testActor, durable = true))
+
+ probe.expectMsgType[Write]
+ probe.lastSender ! WriteAck
+ probe.expectMsgType[Write]
+ probe.lastSender ! WriteAck
+ expectNoMsg(200.millis)
+
+ // the local write
+ aggr ! UpdateSuccess(WriteAggregatorSpec.key, None)
+
+ expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None))
+ watch(aggr)
+ expectTerminated(aggr)
+ }
+
+ "tolerate WriteNack if enough WriteAck" in {
+ val probe = TestProbe()
+ val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
+ data, writeThree, probes(probe.ref), nodes, testActor, durable = true))
+
+ aggr ! UpdateSuccess(WriteAggregatorSpec.key, None) // the local write
+ probe.expectMsgType[Write]
+ probe.lastSender ! WriteAck
+ probe.expectMsgType[Write]
+ probe.lastSender ! WriteNack
+ probe.expectMsgType[Write]
+ probe.lastSender ! WriteAck
+
+ expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None))
+ watch(aggr)
+ expectTerminated(aggr)
+ }
+
+ "reply with StoreFailure when too many nacks" in {
+ val probe = TestProbe()
+ val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
+ data, writeMajority, probes(probe.ref), nodes, testActor, durable = true))
+
+ probe.expectMsgType[Write]
+ probe.lastSender ! WriteNack
+ aggr ! UpdateSuccess(WriteAggregatorSpec.key, None) // the local write
+ probe.expectMsgType[Write]
+ probe.lastSender ! WriteAck
+ probe.expectMsgType[Write]
+ probe.lastSender ! WriteNack
+ probe.expectMsgType[Write]
+ probe.lastSender ! WriteNack
+
+ expectMsg(StoreFailure(WriteAggregatorSpec.key, None))
+ watch(aggr)
+ expectTerminated(aggr)
+ }
+
+ "timeout when less than required acks" in {
+ val probe = TestProbe()
+ val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
+ data, writeMajority, probes(probe.ref), nodes, testActor, durable = true))
+
+ probe.expectMsgType[Write]
+ // no reply
+ probe.expectMsgType[Write]
+ probe.lastSender ! WriteAck
+ probe.expectMsgType[Write]
+ probe.lastSender ! WriteNack
+ probe.expectMsgType[Write]
+ probe.lastSender ! WriteNack
+
+ expectMsg(UpdateTimeout(WriteAggregatorSpec.key, None))
+ watch(aggr)
+ expectTerminated(aggr)
+ }
+ }
+
}
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
index 0869b20b1f..209b4e3b03 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
@@ -31,15 +31,16 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem(
ConfigFactory.parseString("""
akka.actor.provider=cluster
akka.remote.netty.tcp.port=0
+ akka.remote.artery.canonical.port = 0
"""))) with WordSpecLike with Matchers with BeforeAndAfterAll {
val serializer = new ReplicatedDataSerializer(system.asInstanceOf[ExtendedActorSystem])
val Protocol = if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka" else "akka.tcp"
- val address1 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4711), 1)
- val address2 = UniqueAddress(Address(Protocol, system.name, "other.host.org", 4711), 2)
- val address3 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4712), 3)
+ val address1 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4711), 1L)
+ val address2 = UniqueAddress(Address(Protocol, system.name, "other.host.org", 4711), 2L)
+ val address3 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4712), 3L)
override def afterAll {
shutdown()
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
index 70c314315f..c2a542fcfc 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
@@ -23,21 +23,23 @@ import akka.util.ByteString
import akka.cluster.UniqueAddress
import akka.remote.RARP
import com.typesafe.config.ConfigFactory
+import akka.cluster.ddata.DurableStore.DurableDataEnvelope
class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
"ReplicatorMessageSerializerSpec",
ConfigFactory.parseString("""
akka.actor.provider=cluster
akka.remote.netty.tcp.port=0
+ akka.remote.artery.canonical.port = 0
"""))) with WordSpecLike with Matchers with BeforeAndAfterAll {
val serializer = new ReplicatorMessageSerializer(system.asInstanceOf[ExtendedActorSystem])
val Protocol = if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka" else "akka.tcp"
- val address1 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4711), 1)
- val address2 = UniqueAddress(Address(Protocol, system.name, "other.host.org", 4711), 2)
- val address3 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4712), 3)
+ val address1 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4711), 1L)
+ val address2 = UniqueAddress(Address(Protocol, system.name, "other.host.org", 4711), 2L)
+ val address3 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4712), 3L)
val keyA = GSetKey[String]("A")
@@ -72,6 +74,7 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
address3 → PruningState(address2, PruningInitialized(Set(address1.address))))))
checkSerialization(Write("A", DataEnvelope(data1)))
checkSerialization(WriteAck)
+ checkSerialization(WriteNack)
checkSerialization(Read("A"))
checkSerialization(ReadResult(Some(DataEnvelope(data1))))
checkSerialization(ReadResult(None))
@@ -81,6 +84,7 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
checkSerialization(Gossip(Map(
"A" → DataEnvelope(data1),
"B" → DataEnvelope(GSet() + "b" + "c")), sendBack = true))
+ checkSerialization(new DurableDataEnvelope(data1))
}
}
diff --git a/akka-docs/rst/java/distributed-data.rst b/akka-docs/rst/java/distributed-data.rst
index ac778d0b0a..5fe21ea95a 100644
--- a/akka-docs/rst/java/distributed-data.rst
+++ b/akka-docs/rst/java/distributed-data.rst
@@ -448,6 +448,56 @@ look like for the ``TwoPhaseSet``:
.. includecode:: code/docs/ddata/japi/protobuf/TwoPhaseSetSerializer2.java#serializer
+Durable Storage
+---------------
+
+By default the data is only kept in memory. It is redundant since it is replicated to other nodes
+in the cluster, but if you stop all nodes the data is lost, unless you have saved it
+elsewhere.
+
+Entries can be configured to be durable, i.e. stored on local disk on each node. The stored data will be loaded
+next time the replicator is started, i.e. when actor system is restarted. This means data will survive as
+long as at least one node from the old cluster takes part in a new cluster. The keys of the durable entries
+are configured with::
+
+ akka.cluster.distributed-data.durable.keys = ["a", "b", "durable*"]
+
+Prefix matching is supported by using ``*`` at the end of a key.
+
+All entries can be made durable by specifying::
+
+ akka.cluster.distributed-data.durable.keys = ["*"]
+
+`LMDB `_ is the default storage implementation. It is
+possible to replace that with another implementation by implementing the actor protocol described in
+``akka.cluster.ddata.DurableStore`` and defining the ``akka.cluster.distributed-data.durable.store-actor-class``
+property for the new implementation.
+
+The location of the files for the data is configured with::
+
+ # Directory of LMDB file. There are two options:
+ # 1. A relative or absolute path to a directory that ends with 'ddata'
+ # the full name of the directory will contain name of the ActorSystem
+ # and its remote port.
+ # 2. Otherwise the path is used as is, as a relative or absolute path to
+ # a directory.
+ akka.cluster.distributed-data.lmdb.dir = "ddata"
+
+Making the data durable has of course a performance cost. By default, each update is flushed
+to disk before the ``UpdateSuccess`` reply is sent. For better performance, but with the risk of losing
+the last writes if the JVM crashes, you can enable write behind mode. Changes are then accumulated during
+a time period before it is written to LMDB and flushed to disk. Enabling write behind is especially
+efficient when performing many writes to the same key, because it is only the last value for each key
+that will be serialized and stored. The risk of losing writes if the JVM crashes is small since the
+data is typically replicated to other nodes immediately according to the given ``WriteConsistency``.
+
+::
+
+ akka.cluster.distributed-data.lmdb.write-behind-interval = 200 ms
+
+Note that you should be prepared to receive ``WriteFailure`` as reply to an ``Update`` of a
+durable entry if the data could not be stored for some reason. When enabling ``write-behind-interval``
+such errors will only be logged and ``UpdateSuccess`` will still be the reply to the ``Update``.
CRDT Garbage
------------
@@ -494,11 +544,6 @@ other nodes. This means that you cannot have too large data entries, because the
size will be too large. We might be able to make this more efficient by implementing
`Efficient State-based CRDTs by Delta-Mutation `_.
-The data is only kept in memory. It is redundant since it is replicated to other nodes
-in the cluster, but if you stop all nodes the data is lost, unless you have saved it
-elsewhere. Making the data durable is a possible future feature, but even if we implement that
-it is not intended to be a full featured database.
-
Learn More about CRDTs
======================
diff --git a/akka-docs/rst/scala/distributed-data.rst b/akka-docs/rst/scala/distributed-data.rst
index 04aa00b2f6..22dd241187 100644
--- a/akka-docs/rst/scala/distributed-data.rst
+++ b/akka-docs/rst/scala/distributed-data.rst
@@ -448,6 +448,56 @@ look like for the ``TwoPhaseSet``:
.. includecode:: code/docs/ddata/protobuf/TwoPhaseSetSerializer2.scala#serializer
+Durable Storage
+---------------
+
+By default the data is only kept in memory. It is redundant since it is replicated to other nodes
+in the cluster, but if you stop all nodes the data is lost, unless you have saved it
+elsewhere.
+
+Entries can be configured to be durable, i.e. stored on local disk on each node. The stored data will be loaded
+next time the replicator is started, i.e. when actor system is restarted. This means data will survive as
+long as at least one node from the old cluster takes part in a new cluster. The keys of the durable entries
+are configured with::
+
+ akka.cluster.distributed-data.durable.keys = ["a", "b", "durable*"]
+
+Prefix matching is supported by using ``*`` at the end of a key.
+
+All entries can be made durable by specifying::
+
+ akka.cluster.distributed-data.durable.keys = ["*"]
+
+`LMDB `_ is the default storage implementation. It is
+possible to replace that with another implementation by implementing the actor protocol described in
+``akka.cluster.ddata.DurableStore`` and defining the ``akka.cluster.distributed-data.durable.store-actor-class``
+property for the new implementation.
+
+The location of the files for the data is configured with::
+
+ # Directory of LMDB file. There are two options:
+ # 1. A relative or absolute path to a directory that ends with 'ddata'
+ # the full name of the directory will contain name of the ActorSystem
+ # and its remote port.
+ # 2. Otherwise the path is used as is, as a relative or absolute path to
+ # a directory.
+ akka.cluster.distributed-data.durable.lmdb.dir = "ddata"
+
+Making the data durable has of course a performance cost. By default, each update is flushed
+to disk before the ``UpdateSuccess`` reply is sent. For better performance, but with the risk of losing
+the last writes if the JVM crashes, you can enable write behind mode. Changes are then accumulated during
+a time period before it is written to LMDB and flushed to disk. Enabling write behind is especially
+efficient when performing many writes to the same key, because it is only the last value for each key
+that will be serialized and stored. The risk of losing writes if the JVM crashes is small since the
+data is typically replicated to other nodes immediately according to the given ``WriteConsistency``.
+
+::
+
+ akka.cluster.distributed-data.lmdb.write-behind-interval = 200 ms
+
+Note that you should be prepared to receive ``WriteFailure`` as reply to an ``Update`` of a
+durable entry if the data could not be stored for some reason. When enabling ``write-behind-interval``
+such errors will only be logged and ``UpdateSuccess`` will still be the reply to the ``Update``.
CRDT Garbage
------------
@@ -494,11 +544,6 @@ other nodes. This means that you cannot have too large data entries, because the
size will be too large. We might be able to make this more efficient by implementing
`Efficient State-based CRDTs by Delta-Mutation `_.
-The data is only kept in memory. It is redundant since it is replicated to other nodes
-in the cluster, but if you stop all nodes the data is lost, unless you have saved it
-elsewhere. Making the data durable is a possible future feature, but even if we implement that
-it is not intended to be a full featured database.
-
Learn More about CRDTs
======================
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index afcaf513af..8bc0918076 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -61,7 +61,9 @@ object Dependencies {
// ssl-config
val sslConfigCore = "com.typesafe" %% "ssl-config-core" % sslConfigVersion // ApacheV2
-
+
+ val lmdb = "org.lmdbjava" % "lmdbjava" % "0.0.4" // ApacheV2, OpenLDAP Public License
+
// For akka-http-testkit-java
val junit = "junit" % "junit" % junitVersion // Common Public License 1.0
@@ -142,7 +144,7 @@ object Dependencies {
val clusterMetrics = l ++= Seq(Provided.sigarLoader, Test.slf4jJul, Test.slf4jLog4j, Test.logback, Test.mockito)
- val distributedData = l ++= Seq(Test.junit, Test.scalatest.value)
+ val distributedData = l ++= Seq(lmdb, Test.junit, Test.scalatest.value)
val slf4j = l ++= Seq(slf4jApi, Test.logback)
diff --git a/project/MiMa.scala b/project/MiMa.scala
index 94955a013f..ea7fc7bd23 100644
--- a/project/MiMa.scala
+++ b/project/MiMa.scala
@@ -531,8 +531,12 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.SyncDirective"),
// # 21944
- ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ClusterEvent#ReachabilityEvent.member")
-
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ClusterEvent#ReachabilityEvent.member"),
+
+ // #21645 durable distributed data
+ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.WriteAggregator.props"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.WriteAggregator.this"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ddata.Replicator.write")
)
)
}