Add support for durable storage of distributed data, #21645

* using lmdbjava library
This commit is contained in:
Patrik Nordwall 2016-10-10 20:00:24 +02:00
parent 446c0545ec
commit d6d50a08d0
18 changed files with 1892 additions and 124 deletions

View file

@ -14765,6 +14765,529 @@ public final class ReplicatorMessages {
// @@protoc_insertion_point(class_scope:akka.cluster.ddata.StringGSet)
}
public interface DurableDataEnvelopeOrBuilder
extends akka.protobuf.MessageOrBuilder {
// required .akka.cluster.ddata.OtherMessage data = 1;
/**
* <code>required .akka.cluster.ddata.OtherMessage data = 1;</code>
*/
boolean hasData();
/**
* <code>required .akka.cluster.ddata.OtherMessage data = 1;</code>
*/
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage getData();
/**
* <code>required .akka.cluster.ddata.OtherMessage data = 1;</code>
*/
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder getDataOrBuilder();
}
/**
* Protobuf type {@code akka.cluster.ddata.DurableDataEnvelope}
*/
public static final class DurableDataEnvelope extends
akka.protobuf.GeneratedMessage
implements DurableDataEnvelopeOrBuilder {
// Use DurableDataEnvelope.newBuilder() to construct.
private DurableDataEnvelope(akka.protobuf.GeneratedMessage.Builder<?> builder) {
super(builder);
this.unknownFields = builder.getUnknownFields();
}
private DurableDataEnvelope(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); }
private static final DurableDataEnvelope defaultInstance;
public static DurableDataEnvelope getDefaultInstance() {
return defaultInstance;
}
public DurableDataEnvelope getDefaultInstanceForType() {
return defaultInstance;
}
private final akka.protobuf.UnknownFieldSet unknownFields;
@java.lang.Override
public final akka.protobuf.UnknownFieldSet
getUnknownFields() {
return this.unknownFields;
}
private DurableDataEnvelope(
akka.protobuf.CodedInputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws akka.protobuf.InvalidProtocolBufferException {
initFields();
int mutable_bitField0_ = 0;
akka.protobuf.UnknownFieldSet.Builder unknownFields =
akka.protobuf.UnknownFieldSet.newBuilder();
try {
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
done = true;
}
break;
}
case 10: {
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.Builder subBuilder = null;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
subBuilder = data_.toBuilder();
}
data_ = input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.PARSER, extensionRegistry);
if (subBuilder != null) {
subBuilder.mergeFrom(data_);
data_ = subBuilder.buildPartial();
}
bitField0_ |= 0x00000001;
break;
}
}
}
} catch (akka.protobuf.InvalidProtocolBufferException e) {
throw e.setUnfinishedMessage(this);
} catch (java.io.IOException e) {
throw new akka.protobuf.InvalidProtocolBufferException(
e.getMessage()).setUnfinishedMessage(this);
} finally {
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
}
public static final akka.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DurableDataEnvelope_descriptor;
}
protected akka.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DurableDataEnvelope_fieldAccessorTable
.ensureFieldAccessorsInitialized(
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope.class, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope.Builder.class);
}
public static akka.protobuf.Parser<DurableDataEnvelope> PARSER =
new akka.protobuf.AbstractParser<DurableDataEnvelope>() {
public DurableDataEnvelope parsePartialFrom(
akka.protobuf.CodedInputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws akka.protobuf.InvalidProtocolBufferException {
return new DurableDataEnvelope(input, extensionRegistry);
}
};
@java.lang.Override
public akka.protobuf.Parser<DurableDataEnvelope> getParserForType() {
return PARSER;
}
private int bitField0_;
// required .akka.cluster.ddata.OtherMessage data = 1;
public static final int DATA_FIELD_NUMBER = 1;
private akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage data_;
/**
* <code>required .akka.cluster.ddata.OtherMessage data = 1;</code>
*/
public boolean hasData() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required .akka.cluster.ddata.OtherMessage data = 1;</code>
*/
public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage getData() {
return data_;
}
/**
* <code>required .akka.cluster.ddata.OtherMessage data = 1;</code>
*/
public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder getDataOrBuilder() {
return data_;
}
private void initFields() {
data_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.getDefaultInstance();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
if (!hasData()) {
memoizedIsInitialized = 0;
return false;
}
if (!getData().isInitialized()) {
memoizedIsInitialized = 0;
return false;
}
memoizedIsInitialized = 1;
return true;
}
public void writeTo(akka.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeMessage(1, data_);
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += akka.protobuf.CodedOutputStream
.computeMessageSize(1, data_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@java.lang.Override
protected java.lang.Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseFrom(
akka.protobuf.ByteString data)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseFrom(
akka.protobuf.ByteString data,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseFrom(byte[] data)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseFrom(
byte[] data,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseFrom(
java.io.InputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input);
}
public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseDelimitedFrom(
java.io.InputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input, extensionRegistry);
}
public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseFrom(
akka.protobuf.CodedInputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parseFrom(
akka.protobuf.CodedInputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
protected Builder newBuilderForType(
akka.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
/**
* Protobuf type {@code akka.cluster.ddata.DurableDataEnvelope}
*/
public static final class Builder extends
akka.protobuf.GeneratedMessage.Builder<Builder>
implements akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelopeOrBuilder {
public static final akka.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DurableDataEnvelope_descriptor;
}
protected akka.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DurableDataEnvelope_fieldAccessorTable
.ensureFieldAccessorsInitialized(
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope.class, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope.Builder.class);
}
// Construct using akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(
akka.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
getDataFieldBuilder();
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
if (dataBuilder_ == null) {
data_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.getDefaultInstance();
} else {
dataBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public akka.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.internal_static_akka_cluster_ddata_DurableDataEnvelope_descriptor;
}
public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope getDefaultInstanceForType() {
return akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope.getDefaultInstance();
}
public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope build() {
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope buildPartial() {
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope result = new akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
to_bitField0_ |= 0x00000001;
}
if (dataBuilder_ == null) {
result.data_ = data_;
} else {
result.data_ = dataBuilder_.build();
}
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
public Builder mergeFrom(akka.protobuf.Message other) {
if (other instanceof akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope) {
return mergeFrom((akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope other) {
if (other == akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope.getDefaultInstance()) return this;
if (other.hasData()) {
mergeData(other.getData());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
if (!hasData()) {
return false;
}
if (!getData().isInitialized()) {
return false;
}
return true;
}
public Builder mergeFrom(
akka.protobuf.CodedInputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (akka.protobuf.InvalidProtocolBufferException e) {
parsedMessage = (akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DurableDataEnvelope) e.getUnfinishedMessage();
throw e;
} finally {
if (parsedMessage != null) {
mergeFrom(parsedMessage);
}
}
return this;
}
private int bitField0_;
// required .akka.cluster.ddata.OtherMessage data = 1;
private akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage data_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.getDefaultInstance();
private akka.protobuf.SingleFieldBuilder<
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder> dataBuilder_;
/**
* <code>required .akka.cluster.ddata.OtherMessage data = 1;</code>
*/
public boolean hasData() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required .akka.cluster.ddata.OtherMessage data = 1;</code>
*/
public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage getData() {
if (dataBuilder_ == null) {
return data_;
} else {
return dataBuilder_.getMessage();
}
}
/**
* <code>required .akka.cluster.ddata.OtherMessage data = 1;</code>
*/
public Builder setData(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage value) {
if (dataBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
data_ = value;
onChanged();
} else {
dataBuilder_.setMessage(value);
}
bitField0_ |= 0x00000001;
return this;
}
/**
* <code>required .akka.cluster.ddata.OtherMessage data = 1;</code>
*/
public Builder setData(
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.Builder builderForValue) {
if (dataBuilder_ == null) {
data_ = builderForValue.build();
onChanged();
} else {
dataBuilder_.setMessage(builderForValue.build());
}
bitField0_ |= 0x00000001;
return this;
}
/**
* <code>required .akka.cluster.ddata.OtherMessage data = 1;</code>
*/
public Builder mergeData(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage value) {
if (dataBuilder_ == null) {
if (((bitField0_ & 0x00000001) == 0x00000001) &&
data_ != akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.getDefaultInstance()) {
data_ =
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.newBuilder(data_).mergeFrom(value).buildPartial();
} else {
data_ = value;
}
onChanged();
} else {
dataBuilder_.mergeFrom(value);
}
bitField0_ |= 0x00000001;
return this;
}
/**
* <code>required .akka.cluster.ddata.OtherMessage data = 1;</code>
*/
public Builder clearData() {
if (dataBuilder_ == null) {
data_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.getDefaultInstance();
onChanged();
} else {
dataBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
/**
* <code>required .akka.cluster.ddata.OtherMessage data = 1;</code>
*/
public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.Builder getDataBuilder() {
bitField0_ |= 0x00000001;
onChanged();
return getDataFieldBuilder().getBuilder();
}
/**
* <code>required .akka.cluster.ddata.OtherMessage data = 1;</code>
*/
public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder getDataOrBuilder() {
if (dataBuilder_ != null) {
return dataBuilder_.getMessageOrBuilder();
} else {
return data_;
}
}
/**
* <code>required .akka.cluster.ddata.OtherMessage data = 1;</code>
*/
private akka.protobuf.SingleFieldBuilder<
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder>
getDataFieldBuilder() {
if (dataBuilder_ == null) {
dataBuilder_ = new akka.protobuf.SingleFieldBuilder<
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder>(
data_,
getParentForChildren(),
isClean());
data_ = null;
}
return dataBuilder_;
}
// @@protoc_insertion_point(builder_scope:akka.cluster.ddata.DurableDataEnvelope)
}
static {
defaultInstance = new DurableDataEnvelope(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:akka.cluster.ddata.DurableDataEnvelope)
}
private static akka.protobuf.Descriptors.Descriptor
internal_static_akka_cluster_ddata_Get_descriptor;
private static
@ -14870,6 +15393,11 @@ public final class ReplicatorMessages {
private static
akka.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_akka_cluster_ddata_StringGSet_fieldAccessorTable;
private static akka.protobuf.Descriptors.Descriptor
internal_static_akka_cluster_ddata_DurableDataEnvelope_descriptor;
private static
akka.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_akka_cluster_ddata_DurableDataEnvelope_fieldAccessorTable;
public static akka.protobuf.Descriptors.FileDescriptor
getDescriptor() {
@ -14925,8 +15453,10 @@ public final class ReplicatorMessages {
"me\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"V\n\014OtherMessage\022\027" +
"\n\017enclosedMessage\030\001 \002(\014\022\024\n\014serializerId\030" +
"\002 \002(\005\022\027\n\017messageManifest\030\004 \001(\014\"\036\n\nString" +
"GSet\022\020\n\010elements\030\001 \003(\tB#\n\037akka.cluster.d" +
"data.protobuf.msgH\001"
"GSet\022\020\n\010elements\030\001 \003(\t\"E\n\023DurableDataEnv" +
"elope\022.\n\004data\030\001 \002(\0132 .akka.cluster.ddata" +
".OtherMessageB#\n\037akka.cluster.ddata.prot" +
"obuf.msgH\001"
};
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -15059,6 +15589,12 @@ public final class ReplicatorMessages {
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_cluster_ddata_StringGSet_descriptor,
new java.lang.String[] { "Elements", });
internal_static_akka_cluster_ddata_DurableDataEnvelope_descriptor =
getDescriptor().getMessageTypes().get(18);
internal_static_akka_cluster_ddata_DurableDataEnvelope_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_cluster_ddata_DurableDataEnvelope_descriptor,
new java.lang.String[] { "Data", });
return null;
}
};

View file

@ -117,4 +117,6 @@ message StringGSet {
repeated string elements = 1;
}
message DurableDataEnvelope {
required OtherMessage data = 1;
}

View file

@ -46,6 +46,48 @@ akka.cluster.distributed-data {
# after this duration.
serializer-cache-time-to-live = 10s
durable {
# List of keys that are durable. Prefix matching is supported by using * at the
# end of a key.
keys = []
# Fully qualified class name of the durable store actor. It must be a subclass
# of akka.actor.Actor and handle the protocol defined in
# akka.cluster.ddata.DurableStore. The class must have a constructor with a
# com.typesafe.config.Config parameter.
store-actor-class = akka.cluster.ddata.LmdbDurableStore
use-dispatcher = akka.cluster.distributed-data.durable.pinned-store
pinned-store {
executor = thread-pool-executor
type = PinnedDispatcher
}
# Config for the LmdbDurableStore
lmdb {
# Directory of LMDB file. There are two options:
# 1. A relative or absolute path to a directory that ends with 'ddata';
# the full name of the directory will contain the name of the ActorSystem
# and its remote port.
# 2. Otherwise the path is used as is, as a relative or absolute path to
# a directory.
dir = "ddata"
# Size in bytes of the memory mapped file.
map-size = 100 MiB
# Accumulating changes before storing improves performance, with the
# risk of losing the last writes if the JVM crashes.
# The interval is by default set to 'off' to write each update immediately.
# Enabling write behind by specifying a duration, e.g. 200ms, is especially
# efficient when performing many writes to the same key, because it is only
# the last value for each key that will be serialized and stored.
# write-behind-interval = 200 ms
write-behind-interval = off
}
}
}
#//#distributed-data
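A usage sketch (not part of this commit): durable entries are enabled purely through the configuration above. The Scala snippet below feeds such a config into an ActorSystem and starts a Replicator the same way the multi-node test in this commit does; the system name, port settings and the "durable*" key pattern are illustrative assumptions.

import akka.actor.ActorSystem
import akka.cluster.ddata.{ Replicator, ReplicatorSettings }
import com.typesafe.config.ConfigFactory

object DurableConfigExample extends App {
  // hypothetical configuration; only entries whose key matches "durable*" are persisted
  val config = ConfigFactory.parseString("""
    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
    akka.remote.netty.tcp.port = 0
    akka.cluster.distributed-data.durable {
      keys = ["durable*"]
      lmdb.dir = "target/example-ddata"
      lmdb.write-behind-interval = 200ms
    }
    """).withFallback(ConfigFactory.load())

  val system = ActorSystem("ExampleSys", config)
  // ReplicatorSettings picks up the durable.* section from the config above
  val replicator = system.actorOf(Replicator.props(ReplicatorSettings(system)), "replicator")
}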

View file

@ -0,0 +1,264 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster.ddata
import java.io.File
import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NonFatal
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.DeadLetterSuppression
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ddata.Replicator.ReplicatorMessage
import akka.io.DirectByteBufferPool
import akka.serialization.SerializationExtension
import akka.serialization.SerializerWithStringManifest
import akka.util.ByteString
import akka.util.OptionVal
import com.typesafe.config.Config
import org.lmdbjava.DbiFlags
import org.lmdbjava.Env
import org.lmdbjava.EnvFlags
import org.lmdbjava.Txn
/**
* An actor implementing the durable store for the Distributed Data `Replicator`
* has to implement the protocol with the messages defined here.
*
* At startup the `Replicator` creates the durable store actor and sends the
* `LoadAll` message to it. It must then reply with 0 or more `LoadData` messages
* followed by one `LoadAllCompleted` message to the `sender` (the `Replicator`).
*
* If the `LoadAll` fails it can throw `LoadFailed` and the `Replicator` supervisor
* will stop itself and the durable store.
*
* When the `Replicator` needs to store a value it sends a `Store` message
* to the durable store actor, which must then reply with the `successMsg` or
* `failureMsg` to the `replyTo`.
*/
object DurableStore {
/**
* Request to store an entry. It optionally contains a `StoreReply`, which
* should be used to signal success or failure of the operation to the contained
* `replyTo` actor.
*/
final case class Store(key: String, data: ReplicatedData, reply: Option[StoreReply])
final case class StoreReply(successMsg: Any, failureMsg: Any, replyTo: ActorRef)
/**
* Request to load all entries.
*
* It must reply with 0 or more `LoadData` messages
* followed by one `LoadAllCompleted` message to the `sender` (the `Replicator`).
*
* If the `LoadAll` fails it can throw `LoadFailed` and the `Replicator` supervisor
* will stop itself and the durable store.
*/
case object LoadAll
final case class LoadData(data: Map[String, ReplicatedData])
case object LoadAllCompleted
class LoadFailed(message: String, cause: Throwable) extends RuntimeException(message) {
def this(message: String) = this(message, null)
}
/**
* Wrapper class for serialization of a data value.
* The `ReplicatorMessageSerializer` will serialize/deserialize
* the wrapped `ReplicatedData` including its serializerId and
* manifest.
*/
final class DurableDataEnvelope(val data: ReplicatedData) extends ReplicatorMessage {
override def toString(): String = s"DurableDataEnvelope($data)"
override def hashCode(): Int = data.hashCode
override def equals(o: Any): Boolean = o match {
case other: DurableDataEnvelope ⇒ data == other.data
case _ ⇒ false
}
}
}
object LmdbDurableStore {
def props(config: Config): Props =
Props(new LmdbDurableStore(config))
private case object WriteBehind extends DeadLetterSuppression
}
final class LmdbDurableStore(config: Config) extends Actor with ActorLogging {
import DurableStore._
import LmdbDurableStore.WriteBehind
val serialization = SerializationExtension(context.system)
val serializer = serialization.serializerFor(classOf[DurableDataEnvelope]).asInstanceOf[SerializerWithStringManifest]
val manifest = serializer.manifest(new DurableDataEnvelope(Replicator.Internal.DeletedData))
val writeBehindInterval = config.getString("lmdb.write-behind-interval").toLowerCase match {
case "off" Duration.Zero
case _ config.getDuration("lmdb.write-behind-interval", MILLISECONDS).millis
}
val env: Env[ByteBuffer] = {
val mapSize = config.getBytes("lmdb.map-size")
val dir = config.getString("lmdb.dir") match {
case path if path.endsWith("ddata")
new File(s"$path-${context.system.name}-${self.path.parent.name}-${Cluster(context.system).selfAddress.port.get}")
case path
new File(path)
}
dir.mkdirs()
Env.create()
.setMapSize(mapSize)
.setMaxDbs(1)
.open(dir, EnvFlags.MDB_NOLOCK)
}
val db = env.openDbi("ddata", DbiFlags.MDB_CREATE)
val keyBuffer = ByteBuffer.allocateDirect(env.getMaxKeySize)
var valueBuffer = ByteBuffer.allocateDirect(100 * 1024) // will grow when needed
def ensureValueBufferSize(size: Int): Unit = {
if (valueBuffer.remaining < size) {
DirectByteBufferPool.tryCleanDirectByteBuffer(valueBuffer)
valueBuffer = ByteBuffer.allocateDirect(size * 2)
}
}
// pending write behind
val pending = new java.util.HashMap[String, ReplicatedData]
override def postRestart(reason: Throwable): Unit = {
super.postRestart(reason)
// Load is only done on first start, not on restart
context.become(active)
}
override def postStop(): Unit = {
super.postStop()
writeBehind()
Try(db.close())
Try(env.close())
DirectByteBufferPool.tryCleanDirectByteBuffer(keyBuffer)
DirectByteBufferPool.tryCleanDirectByteBuffer(valueBuffer)
}
def receive = init
def init: Receive = {
case LoadAll ⇒
val t0 = System.nanoTime()
val tx = env.txnRead()
try {
val iter = db.iterate(tx)
try {
var n = 0
val loadData = LoadData(iter.asScala.map { entry ⇒
n += 1
val keyArray = Array.ofDim[Byte](entry.key.remaining)
entry.key.get(keyArray)
val key = new String(keyArray, ByteString.UTF_8)
val valArray = Array.ofDim[Byte](entry.`val`.remaining)
entry.`val`.get(valArray)
val envelope = serializer.fromBinary(valArray, manifest).asInstanceOf[DurableDataEnvelope]
key → envelope.data
}.toMap)
if (loadData.data.nonEmpty)
sender() ! loadData
sender() ! LoadAllCompleted
if (log.isDebugEnabled)
log.debug("load all of [{}] entries took [{} ms]", n,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime - t0))
context.become(active)
} finally {
Try(iter.close())
}
} catch {
case NonFatal(e) ⇒
throw new LoadFailed("failed to load durable distributed-data", e)
} finally {
Try(tx.close())
}
}
def active: Receive = {
case Store(key, data, reply) ⇒
try {
if (writeBehindInterval.length == 0) {
dbPut(OptionVal.None, key, data)
} else {
if (pending.isEmpty)
context.system.scheduler.scheduleOnce(writeBehindInterval, self, WriteBehind)(context.system.dispatcher)
pending.put(key, data)
}
reply match {
case Some(StoreReply(successMsg, _, replyTo)) ⇒
replyTo ! successMsg
case None ⇒
}
} catch {
case NonFatal(e) ⇒
log.error(e, "failed to store [{}]", key)
reply match {
case Some(StoreReply(_, failureMsg, replyTo)) ⇒
replyTo ! failureMsg
case None ⇒
}
}
case WriteBehind ⇒
writeBehind()
}
def dbPut(tx: OptionVal[Txn[ByteBuffer]], key: String, data: ReplicatedData): Unit = {
try {
keyBuffer.put(key.getBytes(ByteString.UTF_8)).flip()
val value = serializer.toBinary(new DurableDataEnvelope(data))
ensureValueBufferSize(value.length)
valueBuffer.put(value).flip()
tx match {
case OptionVal.None ⇒ db.put(keyBuffer, valueBuffer)
case OptionVal.Some(t) ⇒ db.put(t, keyBuffer, valueBuffer)
}
} finally {
keyBuffer.clear()
valueBuffer.clear()
}
}
def writeBehind(): Unit = {
if (!pending.isEmpty()) {
val t0 = System.nanoTime()
val tx = env.txnWrite()
try {
val iter = pending.entrySet.iterator
while (iter.hasNext) {
val entry = iter.next()
dbPut(OptionVal.Some(tx), entry.getKey, entry.getValue)
}
tx.commit()
if (log.isDebugEnabled)
log.debug("store and commit of [{}] entries took [{} ms]", pending.size,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime - t0))
} catch {
case NonFatal(e) ⇒
import scala.collection.JavaConverters._
log.error(e, "failed to store [{}]", pending.keySet.asScala.mkString(","))
tx.abort()
} finally {
pending.clear()
}
}
}
}
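To make the DurableStore protocol above concrete, here is a minimal sketch (not from this commit) of an alternative durable store actor that merely keeps entries in memory; the class name is made up. It answers LoadAll with an optional LoadData followed by LoadAllCompleted, and acknowledges Store via the optional StoreReply, which is all the Replicator expects. Because it has no Config constructor it would be plugged in programmatically with withDurableStoreProps rather than via store-actor-class.

import akka.actor.Actor
import akka.cluster.ddata.DurableStore._
import akka.cluster.ddata.ReplicatedData

// hypothetical illustration of the DurableStore protocol; a real store would persist to disk
class InMemoryDurableStore extends Actor {
  private var entries = Map.empty[String, ReplicatedData]

  def receive = {
    case LoadAll ⇒
      // 0 or more LoadData messages followed by exactly one LoadAllCompleted
      if (entries.nonEmpty)
        sender() ! LoadData(entries)
      sender() ! LoadAllCompleted
    case Store(key, data, reply) ⇒
      entries = entries.updated(key, data)
      reply.foreach {
        case StoreReply(successMsg, _, replyTo) ⇒ replyTo ! successMsg
      }
  }
}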

View file

@ -24,6 +24,7 @@ private[akka] final case class PruningState(owner: UniqueAddress, phase: Pruning
def merge(that: PruningState): PruningState =
(this.phase, that.phase) match {
// FIXME this will add the PruningPerformed back again when one is None
case (PruningPerformed, _) ⇒ this
case (_, PruningPerformed) ⇒ that
case (PruningInitialized(thisSeen), PruningInitialized(thatSeen)) ⇒

View file

@ -4,7 +4,9 @@
package akka.cluster.ddata
import java.security.MessageDigest
import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.immutable.Queue
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration
@ -37,6 +39,11 @@ import akka.dispatch.Dispatchers
import akka.actor.DeadLetterSuppression
import akka.cluster.ddata.Key.KeyR
import java.util.Optional
import akka.cluster.ddata.DurableStore._
import akka.actor.ExtendedActorSystem
import akka.actor.SupervisorStrategy
import akka.actor.OneForOneStrategy
import akka.actor.ActorInitializationException
object ReplicatorSettings {
@ -56,6 +63,8 @@ object ReplicatorSettings {
case "" Dispatchers.DefaultDispatcherId
case id id
}
import scala.collection.JavaConverters._
new ReplicatorSettings(
role = roleOption(config.getString("role")),
gossipInterval = config.getDuration("gossip-interval", MILLISECONDS).millis,
@ -63,7 +72,9 @@ object ReplicatorSettings {
maxDeltaElements = config.getInt("max-delta-elements"),
dispatcher = dispatcher,
pruningInterval = config.getDuration("pruning-interval", MILLISECONDS).millis,
maxPruningDissemination = config.getDuration("max-pruning-dissemination", MILLISECONDS).millis)
maxPruningDissemination = config.getDuration("max-pruning-dissemination", MILLISECONDS).millis,
durableStoreProps = Left((config.getString("durable.store-actor-class"), config.getConfig("durable"))),
durableKeys = config.getStringList("durable.keys").asScala.toSet)
}
/**
@ -91,6 +102,13 @@ object ReplicatorSettings {
* completing the pruning process of data associated with removed cluster nodes.
* The time measurement is stopped when any replica is unreachable, so it should
* be configured to worst case in a healthy cluster.
* @param durableStoreProps Props for the durable store actor;
* the `Left` alternative is a tuple of the fully qualified actor class name and
* the config constructor parameter of that class,
* and the `Right` alternative is the `Props` of the actor.
* @param durableKeys Keys that are durable. Prefix matching is supported by using
* `*` at the end of a key. All entries can be made durable by including "*"
* in the `Set`.
*/
final class ReplicatorSettings(
val role: Option[String],
@ -99,7 +117,15 @@ final class ReplicatorSettings(
val maxDeltaElements: Int,
val dispatcher: String,
val pruningInterval: FiniteDuration,
val maxPruningDissemination: FiniteDuration) {
val maxPruningDissemination: FiniteDuration,
val durableStoreProps: Either[(String, Config), Props],
val durableKeys: Set[String]) {
// For backwards compatibility
def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration,
maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration) =
this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
maxPruningDissemination, Right(Props.empty), Set.empty)
def withRole(role: String): ReplicatorSettings = copy(role = ReplicatorSettings.roleOption(role))
@ -125,16 +151,35 @@ final class ReplicatorSettings(
def withPruning(pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration): ReplicatorSettings =
copy(pruningInterval = pruningInterval, maxPruningDissemination = maxPruningDissemination)
def withDurableStoreProps(durableStoreProps: Props): ReplicatorSettings =
copy(durableStoreProps = Right(durableStoreProps))
/**
* Scala API
*/
def withDurableKeys(durableKeys: Set[String]): ReplicatorSettings =
copy(durableKeys = durableKeys)
/**
* Java API
*/
def withDurableKeys(durableKeys: java.util.Set[String]): ReplicatorSettings = {
import scala.collection.JavaConverters._
withDurableKeys(durableKeys.asScala.toSet)
}
private def copy(
role: Option[String] = role,
gossipInterval: FiniteDuration = gossipInterval,
notifySubscribersInterval: FiniteDuration = notifySubscribersInterval,
maxDeltaElements: Int = maxDeltaElements,
dispatcher: String = dispatcher,
pruningInterval: FiniteDuration = pruningInterval,
maxPruningDissemination: FiniteDuration = maxPruningDissemination): ReplicatorSettings =
role: Option[String] = role,
gossipInterval: FiniteDuration = gossipInterval,
notifySubscribersInterval: FiniteDuration = notifySubscribersInterval,
maxDeltaElements: Int = maxDeltaElements,
dispatcher: String = dispatcher,
pruningInterval: FiniteDuration = pruningInterval,
maxPruningDissemination: FiniteDuration = maxPruningDissemination,
durableStoreProps: Either[(String, Config), Props] = durableStoreProps,
durableKeys: Set[String] = durableKeys): ReplicatorSettings =
new ReplicatorSettings(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher,
pruningInterval, maxPruningDissemination)
pruningInterval, maxPruningDissemination, durableStoreProps, durableKeys)
}
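A complementary sketch (assumptions: the system name and the hypothetical InMemoryDurableStore from the earlier sketch): the same settings can be supplied programmatically instead of via configuration, using the new withDurableKeys and withDurableStoreProps methods.

import akka.actor.{ ActorSystem, Props }
import akka.cluster.ddata.{ Replicator, ReplicatorSettings }

val system = ActorSystem("ExampleSys") // hypothetical system, cluster provider assumed in its config
// make every entry durable and plug in a custom store actor
val settings = ReplicatorSettings(system)
  .withDurableKeys(Set("*"))
  .withDurableStoreProps(Props[InMemoryDurableStore])
val replicator = system.actorOf(Replicator.props(settings), "replicator")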
object Replicator {
@ -142,8 +187,12 @@ object Replicator {
/**
* Factory method for the [[akka.actor.Props]] of the [[Replicator]] actor.
*/
def props(settings: ReplicatorSettings): Props =
def props(settings: ReplicatorSettings): Props = {
require(
settings.durableKeys.isEmpty || (settings.durableStoreProps != Right(Props.empty)),
"durableStoreProps must be defined when durableKeys are defined")
Props(new Replicator(settings)).withDeploy(Deploy.local).withDispatcher(settings.dispatcher)
}
sealed trait ReadConsistency {
def timeout: FiniteDuration
@ -400,6 +449,17 @@ object Replicator {
extends UpdateFailure[A] {
override def toString: String = s"ModifyFailure [$key]: $errorMessage"
}
/**
* The local store or direct replication of the [[Update]] could not be fulfilled according to
* the given [[WriteConsistency consistency level]] due to durable store errors. This is
* only used for entries that have been configured to be durable.
*
* The `Update` was still performed in memory locally and possibly replicated to some nodes,
* but it might not have been written to durable storage.
* It will eventually be disseminated to other replicas, unless the local replica
* crashes before it has been able to communicate with other replicas.
*/
final case class StoreFailure[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends UpdateFailure[A] with DeleteResponse[A]
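A hedged sketch of how a client actor might handle this new reply alongside the existing ones; the actor, key name and counter type are illustrative and assume the key matches a configured durable pattern.

import akka.actor.{ Actor, ActorLogging, ActorRef }
import akka.cluster.Cluster
import akka.cluster.ddata.{ GCounter, GCounterKey }
import akka.cluster.ddata.Replicator._

// hypothetical client; it sends durable updates and distinguishes the possible replies
class CounterClient(replicator: ActorRef) extends Actor with ActorLogging {
  implicit val cluster = Cluster(context.system)
  val Counter = GCounterKey("durable-counter") // assumed to match a "durable*" key pattern

  def increment(): Unit =
    replicator ! Update(Counter, GCounter(), WriteLocal)(_ + 1)

  def receive = {
    case UpdateSuccess(Counter, _) ⇒ // stored in memory and acknowledged by the durable store
    case StoreFailure(Counter, _)  ⇒ log.warning("durable store failed, value is only in memory")
    case UpdateTimeout(Counter, _) ⇒ log.warning("replication timed out")
  }
}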
/**
* Send this message to the local `Replicator` to delete a data value for the
@ -460,6 +520,7 @@ object Replicator {
case object ClockTick
final case class Write(key: String, envelope: DataEnvelope) extends ReplicatorMessage
case object WriteAck extends ReplicatorMessage with DeadLetterSuppression
case object WriteNack extends ReplicatorMessage with DeadLetterSuppression
final case class Read(key: String) extends ReplicatorMessage
final case class ReadResult(envelope: Option[DataEnvelope]) extends ReplicatorMessage with DeadLetterSuppression
final case class ReadRepair(key: String, envelope: DataEnvelope)
@ -507,7 +568,8 @@ object Replicator {
var mergedRemovedNodePruning = other.pruning
for ((key, thisValue) ← pruning) {
mergedRemovedNodePruning.get(key) match {
case None ⇒ mergedRemovedNodePruning = mergedRemovedNodePruning.updated(key, thisValue)
case None ⇒
mergedRemovedNodePruning = mergedRemovedNodePruning.updated(key, thisValue)
case Some(thatValue) ⇒
mergedRemovedNodePruning = mergedRemovedNodePruning.updated(key, thisValue merge thatValue)
}
@ -751,6 +813,21 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val serializer = SerializationExtension(context.system).serializerFor(classOf[DataEnvelope])
val maxPruningDisseminationNanos = maxPruningDissemination.toNanos
val hasDurableKeys = settings.durableKeys.nonEmpty
val durable = settings.durableKeys.filterNot(_.endsWith("*"))
val durableWildcards = settings.durableKeys.collect { case k if k.endsWith("*") ⇒ k.dropRight(1) }
val durableStore: ActorRef =
if (hasDurableKeys) {
val props = settings.durableStoreProps match {
case Right(p) ⇒ p
case Left((s, c)) ⇒
val clazz = context.system.asInstanceOf[ExtendedActorSystem].dynamicAccess.getClassFor[Actor](s).get
Props(clazz, c).withDispatcher(c.getString("use-dispatcher"))
}
context.watch(context.actorOf(props.withDeploy(Deploy.local), "durableStore"))
} else
context.system.deadLetters // not used
// cluster nodes, doesn't contain selfAddress
var nodes: Set[Address] = Set.empty
@ -784,6 +861,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
var subscriptionKeys = Map.empty[String, KeyR]
override def preStart(): Unit = {
if (hasDurableKeys)
durableStore ! LoadAll
val leaderChangedClass = if (role.isDefined) classOf[RoleLeaderChanged] else classOf[LeaderChanged]
cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
classOf[MemberEvent], classOf[ReachabilityEvent], leaderChangedClass)
@ -799,7 +878,47 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def matchingRole(m: Member): Boolean = role.forall(m.hasRole)
def receive = normalReceive
override val supervisorStrategy = {
def fromDurableStore: Boolean = sender() == durableStore && sender() != context.system.deadLetters
OneForOneStrategy()(
({
case e @ (_: DurableStore.LoadFailed | _: ActorInitializationException) if fromDurableStore ⇒
log.error(e, "Stopping distributed-data Replicator due to load or startup failure in durable store")
context.stop(self)
SupervisorStrategy.Stop
}: SupervisorStrategy.Decider).orElse(SupervisorStrategy.defaultDecider))
}
def receive =
if (hasDurableKeys) load.orElse(normalReceive)
else normalReceive
val load: Receive = {
case LoadData(data) ⇒
data.foreach {
case (key, d) ⇒
val envelope = DataEnvelope(d)
write(key, envelope) match {
case Some(newEnvelope) ⇒
if (newEnvelope.data ne envelope.data)
durableStore ! Store(key, newEnvelope.data, None)
case None ⇒
}
}
case LoadAllCompleted ⇒
context.become(normalReceive)
self ! FlushChanges
case GetReplicaCount ⇒
// 0 until durable data has been loaded, used by test
sender() ! ReplicaCount(0)
case RemovedNodePruningTick | FlushChanges | GossipTick ⇒
// ignore scheduled ticks when loading durable data
case m @ (_: Read | _: Write | _: Status | _: Gossip) ⇒
// ignore gossip and replication when loading durable data
log.debug("ignoring message [{}] when loading durable data", m.getClass.getName)
}
val normalReceive: Receive = {
case Get(key, consistency, req) ⇒ receiveGet(key, consistency, req)
@ -872,11 +991,22 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
log.debug("Received Update for key [{}], old data [{}], new data [{}]", key, localValue, newData)
val envelope = DataEnvelope(pruningCleanupTombstoned(newData))
setData(key.id, envelope)
if (isLocalUpdate(writeConsistency))
sender() ! UpdateSuccess(key, req)
else
context.actorOf(WriteAggregator.props(key, envelope, writeConsistency, req, nodes, sender())
.withDispatcher(context.props.dispatcher))
val durable = isDurable(key.id)
if (isLocalUpdate(writeConsistency)) {
if (durable)
durableStore ! Store(key.id, envelope.data,
Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), sender())))
else
sender() ! UpdateSuccess(key, req)
} else {
val writeAggregator =
context.actorOf(WriteAggregator.props(key, envelope, writeConsistency, req, nodes, sender(), durable)
.withDispatcher(context.props.dispatcher))
if (durable) {
durableStore ! Store(key.id, envelope.data,
Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), writeAggregator)))
}
}
case Failure(e: DataDeleted[_]) ⇒
log.debug("Received Update for deleted key [{}]", key)
sender() ! e
@ -886,6 +1016,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}
def isDurable(key: String): Boolean =
durable(key) || (durableWildcards.nonEmpty && durableWildcards.exists(key.startsWith))
def isLocalUpdate(writeConsistency: WriteConsistency): Boolean =
writeConsistency match {
case WriteLocal ⇒ true
@ -894,28 +1027,43 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
def receiveWrite(key: String, envelope: DataEnvelope): Unit = {
write(key, envelope)
sender() ! WriteAck
write(key, envelope) match {
case Some(newEnvelope) ⇒
if (isDurable(key))
durableStore ! Store(key, newEnvelope.data, Some(StoreReply(WriteAck, WriteNack, sender())))
else
sender() ! WriteAck
case None ⇒
}
}
def write(key: String, writeEnvelope: DataEnvelope): Unit =
def write(key: String, writeEnvelope: DataEnvelope): Option[DataEnvelope] =
getData(key) match {
case Some(DataEnvelope(DeletedData, _)) ⇒ // already deleted
case Some(DataEnvelope(DeletedData, _)) ⇒ Some(writeEnvelope) // already deleted
case Some(envelope @ DataEnvelope(existing, _)) ⇒
if (existing.getClass == writeEnvelope.data.getClass || writeEnvelope.data == DeletedData) {
val merged = envelope.merge(pruningCleanupTombstoned(writeEnvelope)).addSeen(selfAddress)
setData(key, merged)
Some(merged)
} else {
log.warning(
"Wrong type for writing [{}], existing type [{}], got [{}]",
key, existing.getClass.getName, writeEnvelope.data.getClass.getName)
None
}
case None ⇒
setData(key, pruningCleanupTombstoned(writeEnvelope).addSeen(selfAddress))
val cleaned = pruningCleanupTombstoned(writeEnvelope).addSeen(selfAddress)
setData(key, cleaned)
Some(cleaned)
}
def receiveReadRepair(key: String, writeEnvelope: DataEnvelope): Unit = {
write(key, writeEnvelope)
write(key, writeEnvelope) match {
case Some(newEnvelope) ⇒
if (isDurable(key))
durableStore ! Store(key, newEnvelope.data, None)
case None ⇒
}
sender() ! ReadRepairAck
}
@ -933,11 +1081,22 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
sender() ! DataDeleted(key)
case _ ⇒
setData(key.id, DeletedEnvelope)
if (isLocalUpdate(consistency))
sender() ! DeleteSuccess(key)
else
context.actorOf(WriteAggregator.props(key, DeletedEnvelope, consistency, None, nodes, sender())
.withDispatcher(context.props.dispatcher))
val durable = isDurable(key.id)
if (isLocalUpdate(consistency)) {
if (durable)
durableStore ! Store(key.id, DeletedData,
Some(StoreReply(DeleteSuccess(key), StoreFailure(key, None), sender())))
else
sender() ! DeleteSuccess(key)
} else {
val writeAggregator =
context.actorOf(WriteAggregator.props(key, DeletedEnvelope, consistency, None, nodes, sender(), durable)
.withDispatcher(context.props.dispatcher))
if (durable) {
durableStore ! Store(key.id, DeletedData,
Some(StoreReply(DeleteSuccess(key), StoreFailure(key, None), writeAggregator)))
}
}
}
}
@ -1075,7 +1234,12 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
updatedData.foreach {
case (key, envelope) ⇒
val hadData = dataEntries.contains(key)
write(key, envelope)
write(key, envelope) match {
case Some(newEnvelope) ⇒
if (isDurable(key))
durableStore ! Store(key, newEnvelope.data, None)
case None ⇒
}
if (sendBack) getData(key) match {
case Some(d) ⇒
if (hadData || d.pruning.nonEmpty)
@ -1108,14 +1272,19 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
(newSubscribers.exists { case (k, s) ⇒ s.contains(subscriber) })
def receiveTerminated(ref: ActorRef): Unit = {
val keys1 = subscribers.collect { case (k, s) if s.contains(ref) ⇒ k }
keys1.foreach { key ⇒ subscribers.removeBinding(key, ref) }
val keys2 = newSubscribers.collect { case (k, s) if s.contains(ref) ⇒ k }
keys2.foreach { key ⇒ newSubscribers.removeBinding(key, ref) }
if (ref == durableStore) {
log.error("Stopping distributed-data Replicator because durable store terminated")
context.stop(self)
} else {
val keys1 = subscribers.collect { case (k, s) if s.contains(ref) ⇒ k }
keys1.foreach { key ⇒ subscribers.removeBinding(key, ref) }
val keys2 = newSubscribers.collect { case (k, s) if s.contains(ref) ⇒ k }
keys2.foreach { key ⇒ newSubscribers.removeBinding(key, ref) }
(keys1 ++ keys2).foreach { key ⇒
if (!subscribers.contains(key) && !newSubscribers.contains(key))
subscriptionKeys -= key
(keys1 ++ keys2).foreach { key ⇒
if (!subscribers.contains(key) && !newSubscribers.contains(key))
subscriptionKeys -= key
}
}
}
@ -1161,7 +1330,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
initRemovedNodePruning()
}
performRemovedNodePruning()
tombstoneRemovedNodePruning()
// FIXME tombstoneRemovedNodePruning doesn't work, since merge of PruningState will add the PruningPerformed back again
// tombstoneRemovedNodePruning()
}
def initRemovedNodePruning(): Unit = {
@ -1171,6 +1341,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}(collection.breakOut)
if (removedSet.nonEmpty) {
// FIXME handle pruning of durable data, this is difficult and requires more thought
for ((key, (envelope, _)) ← dataEntries; removed ← removedSet) {
def init(): Unit = {
@ -1206,6 +1377,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
pruningPerformed = pruningPerformed.updated(removed, allReachableClockTime)
log.debug("Perform pruning of [{}] from [{}] to [{}]", key, removed, selfUniqueAddress)
setData(key, newEnvelope)
if ((newEnvelope.data ne data) && isDurable(key))
durableStore ! Store(key, newEnvelope.data, None)
case _ ⇒
}
case _ ⇒ // deleted, or pruning not needed
@ -1225,6 +1398,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}
// FIXME pruningPerformed is only updated on one node, but tombstoneNodes should be on all
pruningPerformed.foreach {
case (removed, timestamp) if ((allReachableClockTime - timestamp) > maxPruningDisseminationNanos) &&
allPruningPerformed(removed) ⇒
@ -1234,7 +1408,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
tombstoneNodes += removed
dataEntries.foreach {
case (key, (envelope @ DataEnvelope(data: RemovedNodePruning, _), _)) ⇒
setData(key, pruningCleanupTombstoned(removed, envelope))
val newEnvelope = pruningCleanupTombstoned(removed, envelope)
setData(key, newEnvelope)
if ((newEnvelope.data ne data) && isDurable(key))
durableStore ! Store(key, newEnvelope.data, None)
case _ ⇒ // deleted, or pruning not needed
}
case (removed, timestamp) ⇒ // not ready
@ -1325,8 +1502,9 @@ private[akka] object WriteAggregator {
consistency: Replicator.WriteConsistency,
req: Option[Any],
nodes: Set[Address],
replyTo: ActorRef): Props =
Props(new WriteAggregator(key, envelope, consistency, req, nodes, replyTo))
replyTo: ActorRef,
durable: Boolean): Props =
Props(new WriteAggregator(key, envelope, consistency, req, nodes, replyTo, durable))
.withDeploy(Deploy.local)
}
@ -1339,7 +1517,8 @@ private[akka] class WriteAggregator(
consistency: Replicator.WriteConsistency,
req: Option[Any],
override val nodes: Set[Address],
replyTo: ActorRef) extends ReadWriteAggregator {
replyTo: ActorRef,
durable: Boolean) extends ReadWriteAggregator {
import Replicator._
import Replicator.Internal._
@ -1355,41 +1534,65 @@ private[akka] class WriteAggregator(
val w = N / 2 + 1 // write to at least (N/2+1) nodes
N - w
case WriteLocal ⇒
throw new IllegalArgumentException("ReadLocal not supported by WriteAggregator")
throw new IllegalArgumentException("WriteLocal not supported by WriteAggregator")
}
val writeMsg = Write(key.id, envelope)
var gotLocalStoreReply = !durable
var gotWriteNackFrom = Set.empty[Address]
override def preStart(): Unit = {
primaryNodes.foreach { replica(_) ! writeMsg }
if (remaining.size == doneWhenRemainingSize)
reply(ok = true)
else if (doneWhenRemainingSize < 0 || remaining.size < doneWhenRemainingSize)
reply(ok = false)
if (isDone) reply(isTimeout = false)
}
def receive = {
def receive: Receive = {
case WriteAck ⇒
remaining -= senderAddress()
if (remaining.size == doneWhenRemainingSize)
reply(ok = true)
if (isDone) reply(isTimeout = false)
case WriteNack ⇒
gotWriteNackFrom += senderAddress()
if (isDone) reply(isTimeout = false)
case _: Replicator.UpdateSuccess[_] ⇒
gotLocalStoreReply = true
if (isDone) reply(isTimeout = false)
case f: Replicator.StoreFailure[_] ⇒
gotLocalStoreReply = true
gotWriteNackFrom += Cluster(context.system).selfAddress
if (isDone) reply(isTimeout = false)
case SendToSecondary ⇒
secondaryNodes.foreach { replica(_) ! writeMsg }
case ReceiveTimeout ⇒ reply(ok = false)
case ReceiveTimeout ⇒
reply(isTimeout = true)
}
def senderAddress(): Address = sender().path.address
def reply(ok: Boolean): Unit = {
if (ok && envelope.data == DeletedData)
replyTo.tell(DeleteSuccess(key), context.parent)
else if (ok)
replyTo.tell(UpdateSuccess(key, req), context.parent)
else if (envelope.data == DeletedData)
replyTo.tell(ReplicationDeleteFailure(key), context.parent)
else
replyTo.tell(UpdateTimeout(key, req), context.parent)
def isDone: Boolean =
gotLocalStoreReply &&
(remaining.size <= doneWhenRemainingSize || (remaining diff gotWriteNackFrom).isEmpty ||
notEnoughNodes)
def notEnoughNodes: Boolean =
doneWhenRemainingSize < 0 || nodes.size < doneWhenRemainingSize
def reply(isTimeout: Boolean): Unit = {
val isDelete = envelope.data == DeletedData
val isSuccess = remaining.size <= doneWhenRemainingSize && !notEnoughNodes
val isTimeoutOrNotEnoughNodes = isTimeout || notEnoughNodes || gotWriteNackFrom.isEmpty
val replyMsg =
if (isSuccess && isDelete) DeleteSuccess(key)
else if (isSuccess) UpdateSuccess(key, req)
else if (isTimeoutOrNotEnoughNodes && isDelete) ReplicationDeleteFailure(key)
else if (isTimeoutOrNotEnoughNodes) UpdateTimeout(key, req)
else StoreFailure(key, req)
replyTo.tell(replyMsg, context.parent)
context.stop(self)
}
}
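A back-of-the-envelope illustration of the quorum arithmetic used by the aggregator above (a sketch; N is assumed here to be the number of replicas the aggregator counts): with 5 replicas and a majority write it needs acks from a majority, and when the entry is durable it additionally waits for the local store reply before isDone can hold.

// hypothetical numbers for a 5-replica majority write
val N = 5
val w = N / 2 + 1                  // = 3, write to at least a majority of replicas
val doneWhenRemainingSize = N - w  // = 2, done once at most 2 replicas have not acked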

View file

@ -25,6 +25,8 @@ import akka.cluster.ddata.Key.KeyR
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import scala.concurrent.duration.FiniteDuration
import akka.cluster.ddata.DurableStore.DurableDataEnvelope
import akka.cluster.ddata.DurableStore.DurableDataEnvelope
/**
* INTERNAL API
@ -167,6 +169,8 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
val ReadResultManifest = "L"
val StatusManifest = "M"
val GossipManifest = "N"
val WriteNackManifest = "O"
val DurableDataEnvelopeManifest = "P"
private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] ⇒ AnyRef](
GetManifest → getFromBinary,
@ -182,42 +186,48 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
ReadManifest → readFromBinary,
ReadResultManifest → readResultFromBinary,
StatusManifest → statusFromBinary,
GossipManifest → gossipFromBinary)
GossipManifest → gossipFromBinary,
WriteNackManifest → (_ ⇒ WriteNack),
DurableDataEnvelopeManifest → durableDataEnvelopeFromBinary)
override def manifest(obj: AnyRef): String = obj match {
case _: DataEnvelope ⇒ DataEnvelopeManifest
case _: Write ⇒ WriteManifest
case WriteAck ⇒ WriteAckManifest
case _: Read ⇒ ReadManifest
case _: ReadResult ⇒ ReadResultManifest
case _: Status ⇒ StatusManifest
case _: Get[_] ⇒ GetManifest
case _: GetSuccess[_] ⇒ GetSuccessManifest
case _: Changed[_] ⇒ ChangedManifest
case _: NotFound[_] ⇒ NotFoundManifest
case _: GetFailure[_] ⇒ GetFailureManifest
case _: Subscribe[_] ⇒ SubscribeManifest
case _: Unsubscribe[_] ⇒ UnsubscribeManifest
case _: Gossip ⇒ GossipManifest
case _: DataEnvelope ⇒ DataEnvelopeManifest
case _: Write ⇒ WriteManifest
case WriteAck ⇒ WriteAckManifest
case _: Read ⇒ ReadManifest
case _: ReadResult ⇒ ReadResultManifest
case _: Status ⇒ StatusManifest
case _: Get[_] ⇒ GetManifest
case _: GetSuccess[_] ⇒ GetSuccessManifest
case _: DurableDataEnvelope ⇒ DurableDataEnvelopeManifest
case _: Changed[_] ⇒ ChangedManifest
case _: NotFound[_] ⇒ NotFoundManifest
case _: GetFailure[_] ⇒ GetFailureManifest
case _: Subscribe[_] ⇒ SubscribeManifest
case _: Unsubscribe[_] ⇒ UnsubscribeManifest
case _: Gossip ⇒ GossipManifest
case WriteNack ⇒ WriteNackManifest
case _ ⇒
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}
def toBinary(obj: AnyRef): Array[Byte] = obj match {
case m: DataEnvelope ⇒ dataEnvelopeToProto(m).toByteArray
case m: Write ⇒ writeCache.getOrAdd(m)
case WriteAck ⇒ writeAckBytes
case m: Read ⇒ readCache.getOrAdd(m)
case m: ReadResult ⇒ readResultToProto(m).toByteArray
case m: Status ⇒ statusToProto(m).toByteArray
case m: Get[_] ⇒ getToProto(m).toByteArray
case m: GetSuccess[_] ⇒ getSuccessToProto(m).toByteArray
case m: Changed[_] ⇒ changedToProto(m).toByteArray
case m: NotFound[_] ⇒ notFoundToProto(m).toByteArray
case m: GetFailure[_] ⇒ getFailureToProto(m).toByteArray
case m: Subscribe[_] ⇒ subscribeToProto(m).toByteArray
case m: Unsubscribe[_] ⇒ unsubscribeToProto(m).toByteArray
case m: Gossip ⇒ compress(gossipToProto(m))
case m: DataEnvelope ⇒ dataEnvelopeToProto(m).toByteArray
case m: Write ⇒ writeCache.getOrAdd(m)
case WriteAck ⇒ writeAckBytes
case m: Read ⇒ readCache.getOrAdd(m)
case m: ReadResult ⇒ readResultToProto(m).toByteArray
case m: Status ⇒ statusToProto(m).toByteArray
case m: Get[_] ⇒ getToProto(m).toByteArray
case m: GetSuccess[_] ⇒ getSuccessToProto(m).toByteArray
case m: DurableDataEnvelope ⇒ durableDataEnvelopeToProto(m).toByteArray
case m: Changed[_] ⇒ changedToProto(m).toByteArray
case m: NotFound[_] ⇒ notFoundToProto(m).toByteArray
case m: GetFailure[_] ⇒ getFailureToProto(m).toByteArray
case m: Subscribe[_] ⇒ subscribeToProto(m).toByteArray
case m: Unsubscribe[_] ⇒ unsubscribeToProto(m).toByteArray
case m: Gossip ⇒ compress(gossipToProto(m))
case WriteNack ⇒ dm.Empty.getDefaultInstance.toByteArray
case _ ⇒
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}
@ -450,4 +460,18 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
ReadResult(envelope)
}
private def durableDataEnvelopeToProto(durableDataEnvelope: DurableDataEnvelope): dm.DurableDataEnvelope = {
dm.DurableDataEnvelope.newBuilder()
.setData(otherMessageToProto(durableDataEnvelope.data))
.build()
}
private def durableDataEnvelopeFromBinary(bytes: Array[Byte]): DurableDataEnvelope =
durableDataEnvelopeFromProto(dm.DurableDataEnvelope.parseFrom(bytes))
private def durableDataEnvelopeFromProto(durableDataEnvelope: dm.DurableDataEnvelope): DurableDataEnvelope = {
val data = otherMessageFromProto(durableDataEnvelope.getData).asInstanceOf[ReplicatedData]
new DurableDataEnvelope(data)
}
}
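A usage sketch of the DurableDataEnvelope wrapper described earlier (assumptions: an ActorSystem configured with the cluster provider; the counter value is arbitrary), showing the round trip that the LmdbDurableStore performs through the configured serializer.

import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.ddata.GCounter
import akka.cluster.ddata.DurableStore.DurableDataEnvelope
import akka.serialization.{ SerializationExtension, SerializerWithStringManifest }
import com.typesafe.config.ConfigFactory

// hypothetical round trip, mirroring what LmdbDurableStore does when storing and loading
val system = ActorSystem("SerializationExample",
  ConfigFactory.parseString("akka.actor.provider = akka.cluster.ClusterActorRefProvider"))
implicit val cluster = Cluster(system)

val serializer = SerializationExtension(system)
  .serializerFor(classOf[DurableDataEnvelope])
  .asInstanceOf[SerializerWithStringManifest]

val envelope = new DurableDataEnvelope(GCounter() + 3)
val bytes = serializer.toBinary(envelope)
val restored = serializer.fromBinary(bytes, serializer.manifest(envelope))
  .asInstanceOf[DurableDataEnvelope]
// restored.data is equal to the GCounter that was wrapped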

View file

@ -0,0 +1,338 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster.ddata
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.Cluster
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
final case class DurableDataSpecConfig(writeBehind: Boolean) extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(ConfigFactory.parseString(s"""
akka.loglevel = DEBUG
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.log-dead-letters-during-shutdown = off
akka.cluster.distributed-data.durable.keys = ["durable*"]
akka.cluster.distributed-data.durable.lmdb {
dir = target/DurableDataSpec-${System.currentTimeMillis}-ddata
map-size = 10 MiB
write-behind-interval = ${if (writeBehind) "200ms" else "off"}
}
akka.test.single-expect-default = 5s
"""))
}
object DurableDataSpec {
def testDurableStoreProps(failLoad: Boolean = false, failStore: Boolean = false): Props =
Props(new TestDurableStore(failLoad, failStore))
class TestDurableStore(failLoad: Boolean, failStore: Boolean) extends Actor {
import DurableStore._
def receive = {
case LoadAll ⇒
if (failLoad)
throw new LoadFailed("failed to load durable distributed-data") with NoStackTrace
else
sender() ! LoadAllCompleted
case Store(key, data, reply) ⇒
if (failStore) reply match {
case Some(StoreReply(_, failureMsg, replyTo)) ⇒ replyTo ! failureMsg
case None ⇒
}
else reply match {
case Some(StoreReply(successMsg, _, replyTo)) ⇒ replyTo ! successMsg
case None ⇒
}
}
}
}
class DurableDataSpecMultiJvmNode1 extends DurableDataSpec(DurableDataSpecConfig(writeBehind = false))
class DurableDataSpecMultiJvmNode2 extends DurableDataSpec(DurableDataSpecConfig(writeBehind = false))
class DurableDataWriteBehindSpecMultiJvmNode1 extends DurableDataSpec(DurableDataSpecConfig(writeBehind = true))
class DurableDataWriteBehindSpecMultiJvmNode2 extends DurableDataSpec(DurableDataSpecConfig(writeBehind = true))
abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
extends MultiNodeSpec(multiNodeConfig) with STMultiNodeSpec with ImplicitSender {
import DurableDataSpec._
import Replicator._
import multiNodeConfig._
override def initialParticipants = roles.size
implicit val cluster = Cluster(system)
val timeout = 5.seconds.dilated
val writeTwo = WriteTo(2, timeout)
val readTwo = ReadFrom(2, timeout)
val KeyA = GCounterKey("durable-A")
val KeyB = GCounterKey("durable-B")
val KeyC = ORSetKey[String]("durable-C")
var testStepCounter = 0
def enterBarrierAfterTestStep(): Unit = {
testStepCounter += 1
enterBarrier("after-" + testStepCounter)
}
def newReplicator(sys: ActorSystem = system) = sys.actorOf(Replicator.props(
ReplicatorSettings(system).withGossipInterval(1.second)), "replicator-" + testStepCounter)
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster join node(to).address
}
enterBarrier(from.name + "-joined")
}
"Durable CRDT" must {
"work in single node cluster" in {
join(first, first)
runOn(first) {
val r = newReplicator()
within(5.seconds) {
awaitAssert {
r ! GetReplicaCount
expectMsg(ReplicaCount(1))
}
}
r ! Get(KeyA, ReadLocal)
expectMsg(NotFound(KeyA, None))
r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
expectMsg(UpdateSuccess(KeyA, None))
expectMsg(UpdateSuccess(KeyA, None))
expectMsg(UpdateSuccess(KeyA, None))
watch(r)
system.stop(r)
expectTerminated(r)
var r2: ActorRef = null
awaitAssert(r2 = newReplicator()) // try until name is free
// wait until all loaded
awaitAssert {
r2 ! GetKeyIds
expectMsgType[GetKeyIdsResult].keyIds should !==(Set.empty[String])
}
r2 ! Get(KeyA, ReadLocal)
expectMsgType[GetSuccess[GCounter]].dataValue.value.toInt should be(3)
watch(r2)
system.stop(r2)
expectTerminated(r2)
}
enterBarrierAfterTestStep()
}
}
"work in multi node cluster" in {
join(second, first)
val r = newReplicator()
within(5.seconds) {
awaitAssert {
r ! GetReplicaCount
expectMsg(ReplicaCount(2))
}
}
enterBarrier("both-initalized")
r ! Update(KeyA, GCounter(), writeTwo)(_ + 1)
expectMsg(UpdateSuccess(KeyA, None))
r ! Update(KeyC, ORSet.empty[String], writeTwo)(_ + myself.name)
expectMsg(UpdateSuccess(KeyC, None))
enterBarrier("update-done-" + testStepCounter)
r ! Get(KeyA, readTwo)
expectMsgType[GetSuccess[GCounter]].dataValue.value.toInt should be(2)
r ! Get(KeyC, readTwo)
expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should be(Set(first.name, second.name))
enterBarrier("values-verified-" + testStepCounter)
watch(r)
system.stop(r)
expectTerminated(r)
var r2: ActorRef = null
awaitAssert(r2 = newReplicator()) // try until name is free
awaitAssert {
r2 ! GetKeyIds
expectMsgType[GetKeyIdsResult].keyIds should !==(Set.empty[String])
}
r2 ! Get(KeyA, ReadLocal)
expectMsgType[GetSuccess[GCounter]].dataValue.value.toInt should be(2)
r2 ! Get(KeyC, ReadLocal)
expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should be(Set(first.name, second.name))
enterBarrierAfterTestStep()
}
"be durable after gossip update" in {
val r = newReplicator()
runOn(first) {
r ! Update(KeyC, ORSet.empty[String], WriteLocal)(_ + myself.name)
expectMsg(UpdateSuccess(KeyC, None))
}
runOn(second) {
r ! Subscribe(KeyC, testActor)
expectMsgType[Changed[ORSet[String]]].dataValue.elements should be(Set(first.name))
// must do one more roundtrip to be sure that KeyC is stored, since Changed might have
// been sent out before storage
r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
expectMsg(UpdateSuccess(KeyA, None))
watch(r)
system.stop(r)
expectTerminated(r)
var r2: ActorRef = null
awaitAssert(r2 = newReplicator()) // try until name is free
awaitAssert {
r2 ! GetKeyIds
expectMsgType[GetKeyIdsResult].keyIds should !==(Set.empty[String])
}
r2 ! Get(KeyC, ReadLocal)
expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should be(Set(first.name))
}
enterBarrierAfterTestStep()
}
"handle Update before load" in {
runOn(first) {
val sys1 = ActorSystem("AdditionalSys", system.settings.config)
val addr = Cluster(sys1).selfAddress
try {
Cluster(sys1).join(addr)
new TestKit(sys1) with ImplicitSender {
val r = newReplicator(sys1)
within(5.seconds) {
awaitAssert {
r ! GetReplicaCount
expectMsg(ReplicaCount(1))
}
}
r ! Get(KeyA, ReadLocal)
expectMsg(NotFound(KeyA, None))
r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
r ! Update(KeyB, GCounter(), WriteLocal)(_ + 1)
expectMsg(UpdateSuccess(KeyA, None))
expectMsg(UpdateSuccess(KeyA, None))
expectMsg(UpdateSuccess(KeyA, None))
expectMsg(UpdateSuccess(KeyB, None))
watch(r)
system.stop(r)
expectTerminated(r)
}
} finally {
Await.ready(sys1.terminate(), 10.seconds)
}
val sys2 = ActorSystem(
"AdditionalSys",
// use the same port
ConfigFactory.parseString(s"""
akka.remote.artery.canonical.port = ${addr.port.get}
akka.remote.netty.tcp.port = ${addr.port.get}
""").withFallback(system.settings.config))
try {
Cluster(sys2).join(addr)
new TestKit(sys2) with ImplicitSender {
val r2: ActorRef = newReplicator(sys2)
// it should be possible to update while loading is in progress
r2 ! Update(KeyB, GCounter(), WriteLocal)(_ + 1)
expectMsg(UpdateSuccess(KeyB, None))
// wait until all loaded
awaitAssert {
r2 ! GetKeyIds
expectMsgType[GetKeyIdsResult].keyIds should ===(Set(KeyA.id, KeyB.id))
}
r2 ! Get(KeyA, ReadLocal)
expectMsgType[GetSuccess[GCounter]].dataValue.value.toInt should be(3)
r2 ! Get(KeyB, ReadLocal)
expectMsgType[GetSuccess[GCounter]].dataValue.value.toInt should be(2)
}
} finally {
Await.ready(sys2.terminate(), 10.seconds)
}
}
enterBarrierAfterTestStep()
}
"stop Replicator if Load fails" in {
runOn(first) {
val r = system.actorOf(
Replicator.props(
ReplicatorSettings(system).withDurableStoreProps(testDurableStoreProps(failLoad = true))),
"replicator-" + testStepCounter)
watch(r)
expectTerminated(r)
}
enterBarrierAfterTestStep()
}
"reply with StoreFailure if store fails" in {
runOn(first) {
val r = system.actorOf(
Replicator.props(
ReplicatorSettings(system).withDurableStoreProps(testDurableStoreProps(failStore = true))),
"replicator-" + testStepCounter)
r ! Update(KeyA, GCounter(), WriteLocal, request = Some("a"))(_ + 1)
expectMsg(StoreFailure(KeyA, Some("a")))
}
enterBarrierAfterTestStep()
}
}

View file

@ -0,0 +1,167 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster.ddata
import scala.concurrent.duration._
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.InitialStateAsEvents
import akka.cluster.ClusterEvent.MemberUp
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.ActorRef
import scala.concurrent.Await
object DurablePruningSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.log-dead-letters-during-shutdown = off
akka.cluster.distributed-data.durable.keys = ["*"]
akka.cluster.distributed-data.durable.lmdb {
dir = target/DurablePruningSpec-${System.currentTimeMillis}-ddata
map-size = 10 MiB
}
""")))
}
class DurablePruningSpecMultiJvmNode1 extends DurablePruningSpec
class DurablePruningSpecMultiJvmNode2 extends DurablePruningSpec
class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiNodeSpec with ImplicitSender {
import DurablePruningSpec._
import Replicator._
override def initialParticipants = roles.size
implicit val cluster = Cluster(system)
val maxPruningDissemination = 3.seconds
def startReplicator(sys: ActorSystem): ActorRef =
sys.actorOf(Replicator.props(
ReplicatorSettings(sys).withGossipInterval(1.second)
.withPruning(pruningInterval = 1.second, maxPruningDissemination)), "replicator")
val replicator = startReplicator(system)
val timeout = 5.seconds.dilated
val KeyA = GCounterKey("A")
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster join node(to).address
}
enterBarrier(from.name + "-joined")
}
"Pruning of durable CRDT" must {
"move data from removed node" in {
join(first, first)
join(second, first)
val sys2 = ActorSystem(system.name, system.settings.config)
val cluster2 = Cluster(sys2)
val replicator2 = startReplicator(sys2)
val probe2 = TestProbe()(sys2)
Cluster(sys2).join(node(first).address)
within(5.seconds) {
awaitAssert {
replicator ! GetReplicaCount
expectMsg(ReplicaCount(4))
replicator2.tell(GetReplicaCount, probe2.ref)
probe2.expectMsg(ReplicaCount(4))
}
}
replicator ! Update(KeyA, GCounter(), WriteLocal)(_ + 3)
expectMsg(UpdateSuccess(KeyA, None))
replicator2.tell(Update(KeyA, GCounter(), WriteLocal)(_.increment(cluster2, 2)), probe2.ref)
probe2.expectMsg(UpdateSuccess(KeyA, None))
enterBarrier("updates-done")
within(10.seconds) {
awaitAssert {
replicator ! Get(KeyA, ReadAll(1.second))
val counter1 = expectMsgType[GetSuccess[GCounter]].dataValue
counter1.value should be(10)
counter1.state.size should be(4)
}
}
within(10.seconds) {
awaitAssert {
replicator2.tell(Get(KeyA, ReadAll(1.second)), probe2.ref)
val counter2 = probe2.expectMsgType[GetSuccess[GCounter]].dataValue
counter2.value should be(10)
counter2.state.size should be(4)
}
}
enterBarrier("get1")
runOn(first) {
cluster.leave(cluster2.selfAddress)
}
within(15.seconds) {
awaitAssert {
replicator ! GetReplicaCount
expectMsg(ReplicaCount(3))
}
}
enterBarrier("removed")
runOn(first) {
Await.ready(sys2.terminate(), 5.seconds)
}
within(15.seconds) {
awaitAssert {
replicator ! Get(KeyA, ReadLocal)
val counter3 = expectMsgType[GetSuccess[GCounter]].dataValue
counter3.value should be(10)
counter3.state.size should be(3)
}
}
enterBarrier("pruned")
// let it become tombstone
Thread.sleep(5000)
runOn(first) {
val addr = cluster2.selfAddress
val sys3 = ActorSystem(system.name, ConfigFactory.parseString(s"""
akka.remote.artery.canonical.port = ${addr.port.get}
akka.remote.netty.tcp.port = ${addr.port.get}
""").withFallback(system.settings.config))
val cluster3 = Cluster(sys3)
val replicator3 = startReplicator(sys3)
val probe3 = TestProbe()(sys3)
Cluster(sys3).join(node(first).address)
within(10.seconds) {
awaitAssert {
replicator3.tell(Get(KeyA, ReadLocal), probe3.ref)
val counter4 = probe3.expectMsgType[GetSuccess[GCounter]].dataValue
counter4.value should be(10)
counter4.state.size should be(3)
}
}
}
enterBarrier("after-1")
}
}
}

View file

@ -23,7 +23,7 @@ object PerformanceSpec extends MultiNodeConfig {
val n4 = role("n4")
val n5 = role("n5")
commonConfig(ConfigFactory.parseString("""
commonConfig(ConfigFactory.parseString(s"""
akka.loglevel = ERROR
akka.stdout-loglevel = ERROR
akka.actor.provider = "cluster"
@ -34,6 +34,10 @@ object PerformanceSpec extends MultiNodeConfig {
akka.testconductor.barrier-timeout = 60 s
akka.cluster.distributed-data.gossip-interval = 1 s
akka.actor.serialize-messages = off
#akka.cluster.distributed-data.durable.keys = ["*"]
#akka.cluster.distributed-data.durable.lmdb.dir = target/PerformanceSpec-${System.currentTimeMillis}-ddata
#akka.cluster.distributed-data.durable.lmdb.write-behind-interval = 200ms
"""))
def countDownProps(latch: TestLatch): Props = Props(new CountDown(latch)).withDeploy(Deploy.local)

View file

@ -46,6 +46,7 @@ class LocalConcurrencySpec(_system: ActorSystem) extends TestKit(_system)
ConfigFactory.parseString("""
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port=0
akka.remote.artery.canonical.port = 0
""")))
}

View file

@ -20,12 +20,12 @@ object WriteAggregatorSpec {
val key = GSetKey[String]("a")
def writeAggregatorProps(data: GSet[String], consistency: Replicator.WriteConsistency,
probes: Map[Address, ActorRef], nodes: Set[Address], replyTo: ActorRef): Props =
Props(new TestWriteAggregator(data, consistency, probes, nodes, replyTo))
probes: Map[Address, ActorRef], nodes: Set[Address], replyTo: ActorRef, durable: Boolean): Props =
Props(new TestWriteAggregator(data, consistency, probes, nodes, replyTo, durable))
class TestWriteAggregator(data: GSet[String], consistency: Replicator.WriteConsistency,
probes: Map[Address, ActorRef], nodes: Set[Address], replyTo: ActorRef)
extends WriteAggregator(key, DataEnvelope(data), consistency, None, nodes, replyTo) {
probes: Map[Address, ActorRef], nodes: Set[Address], replyTo: ActorRef, durable: Boolean)
extends WriteAggregator(key, DataEnvelope(data), consistency, None, nodes, replyTo, durable) {
override def replica(address: Address): ActorSelection =
context.actorSelection(probes(address).path)
@ -43,6 +43,8 @@ object WriteAggregatorSpec {
def receive = {
case WriteAck ⇒
replicator.foreach(_ ! WriteAck)
case WriteNack ⇒
replicator.foreach(_ ! WriteNack)
case msg ⇒
replicator = Some(sender())
replica ! msg
@ -50,9 +52,14 @@ object WriteAggregatorSpec {
}
}
class WriteAggregatorSpec extends AkkaSpec("""
class WriteAggregatorSpec extends AkkaSpec(s"""
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port=0
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.cluster.distributed-data.durable.lmdb {
dir = target/WriteAggregatorSpec-${System.currentTimeMillis}-ddata
map-size = 10 MiB
}
""")
with ImplicitSender {
@ -69,7 +76,7 @@ class WriteAggregatorSpec extends AkkaSpec("""
val data = GSet.empty + "A" + "B"
val timeout = 3.seconds.dilated
val writeTwo = WriteTo(2, timeout)
val writeThree = WriteTo(3, timeout)
val writeMajority = WriteMajority(timeout)
def probes(probe: ActorRef): Map[Address, ActorRef] =
@ -79,7 +86,7 @@ class WriteAggregatorSpec extends AkkaSpec("""
"send to at least N/2+1 replicas when WriteMajority" in {
val probe = TestProbe()
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
data, writeMajority, probes(probe.ref), nodes, testActor))
data, writeMajority, probes(probe.ref), nodes, testActor, durable = false))
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
@ -93,7 +100,7 @@ class WriteAggregatorSpec extends AkkaSpec("""
"send to more when no immediate reply" in {
val probe = TestProbe()
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
data, writeMajority, probes(probe.ref), nodes, testActor))
data, writeMajority, probes(probe.ref), nodes, testActor, durable = false))
probe.expectMsgType[Write]
// no reply
@ -112,7 +119,7 @@ class WriteAggregatorSpec extends AkkaSpec("""
"timeout when less than required acks" in {
val probe = TestProbe()
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
data, writeMajority, probes(probe.ref), nodes, testActor))
data, writeMajority, probes(probe.ref), nodes, testActor, durable = false))
probe.expectMsgType[Write]
// no reply
@ -126,6 +133,84 @@ class WriteAggregatorSpec extends AkkaSpec("""
watch(aggr)
expectTerminated(aggr)
}
}
"Durable WriteAggregator" must {
"not reply before local confirmation" in {
val probe = TestProbe()
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
data, writeThree, probes(probe.ref), nodes, testActor, durable = true))
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
expectNoMsg(200.millis)
// the local write
aggr ! UpdateSuccess(WriteAggregatorSpec.key, None)
expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None))
watch(aggr)
expectTerminated(aggr)
}
"tolerate WriteNack if enough WriteAck" in {
val probe = TestProbe()
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
data, writeThree, probes(probe.ref), nodes, testActor, durable = true))
aggr ! UpdateSuccess(WriteAggregatorSpec.key, None) // the local write
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
probe.expectMsgType[Write]
probe.lastSender ! WriteNack
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None))
watch(aggr)
expectTerminated(aggr)
}
"reply with StoreFailure when too many nacks" in {
val probe = TestProbe()
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
data, writeMajority, probes(probe.ref), nodes, testActor, durable = true))
probe.expectMsgType[Write]
probe.lastSender ! WriteNack
aggr ! UpdateSuccess(WriteAggregatorSpec.key, None) // the local write
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
probe.expectMsgType[Write]
probe.lastSender ! WriteNack
probe.expectMsgType[Write]
probe.lastSender ! WriteNack
expectMsg(StoreFailure(WriteAggregatorSpec.key, None))
watch(aggr)
expectTerminated(aggr)
}
"timeout when less than required acks" in {
val probe = TestProbe()
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
data, writeMajority, probes(probe.ref), nodes, testActor, durable = true))
probe.expectMsgType[Write]
// no reply
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
probe.expectMsgType[Write]
probe.lastSender ! WriteNack
probe.expectMsgType[Write]
probe.lastSender ! WriteNack
expectMsg(UpdateTimeout(WriteAggregatorSpec.key, None))
watch(aggr)
expectTerminated(aggr)
}
}
}

View file

@ -31,15 +31,16 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem(
ConfigFactory.parseString("""
akka.actor.provider=cluster
akka.remote.netty.tcp.port=0
akka.remote.artery.canonical.port = 0
"""))) with WordSpecLike with Matchers with BeforeAndAfterAll {
val serializer = new ReplicatedDataSerializer(system.asInstanceOf[ExtendedActorSystem])
val Protocol = if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka" else "akka.tcp"
val address1 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4711), 1)
val address2 = UniqueAddress(Address(Protocol, system.name, "other.host.org", 4711), 2)
val address3 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4712), 3)
val address1 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4711), 1L)
val address2 = UniqueAddress(Address(Protocol, system.name, "other.host.org", 4711), 2L)
val address3 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4712), 3L)
override def afterAll {
shutdown()

View file

@ -23,21 +23,23 @@ import akka.util.ByteString
import akka.cluster.UniqueAddress
import akka.remote.RARP
import com.typesafe.config.ConfigFactory
import akka.cluster.ddata.DurableStore.DurableDataEnvelope
class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
"ReplicatorMessageSerializerSpec",
ConfigFactory.parseString("""
akka.actor.provider=cluster
akka.remote.netty.tcp.port=0
akka.remote.artery.canonical.port = 0
"""))) with WordSpecLike with Matchers with BeforeAndAfterAll {
val serializer = new ReplicatorMessageSerializer(system.asInstanceOf[ExtendedActorSystem])
val Protocol = if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka" else "akka.tcp"
val address1 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4711), 1)
val address2 = UniqueAddress(Address(Protocol, system.name, "other.host.org", 4711), 2)
val address3 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4712), 3)
val address1 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4711), 1L)
val address2 = UniqueAddress(Address(Protocol, system.name, "other.host.org", 4711), 2L)
val address3 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4712), 3L)
val keyA = GSetKey[String]("A")
@ -72,6 +74,7 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
address3 → PruningState(address2, PruningInitialized(Set(address1.address))))))
checkSerialization(Write("A", DataEnvelope(data1)))
checkSerialization(WriteAck)
checkSerialization(WriteNack)
checkSerialization(Read("A"))
checkSerialization(ReadResult(Some(DataEnvelope(data1))))
checkSerialization(ReadResult(None))
@ -81,6 +84,7 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
checkSerialization(Gossip(Map(
"A" DataEnvelope(data1),
"B" DataEnvelope(GSet() + "b" + "c")), sendBack = true))
checkSerialization(new DurableDataEnvelope(data1))
}
}

View file

@ -448,6 +448,56 @@ look like for the ``TwoPhaseSet``:
.. includecode:: code/docs/ddata/japi/protobuf/TwoPhaseSetSerializer2.java#serializer
Durable Storage
---------------
By default the data is only kept in memory. It is redundant since it is replicated to other nodes
in the cluster, but if you stop all nodes the data is lost, unless you have saved it
elsewhere.
Entries can be configured to be durable, i.e. stored on local disk on each node. The stored data will be loaded
the next time the replicator is started, i.e. when the actor system is restarted. This means that the data will survive as
long as at least one node from the old cluster takes part in a new cluster. The keys of the durable entries
are configured with::
akka.cluster.distributed-data.durable.keys = ["a", "b", "durable*"]
Prefix matching is supported by using ``*`` at the end of a key.
All entries can be made durable by specifying::
akka.cluster.distributed-data.durable.keys = ["*"]
`LMDB <https://github.com/lmdbjava/lmdbjava/>`_ is the default storage implementation. It is
possible to replace that with another implementation by implementing the actor protocol described in
``akka.cluster.ddata.DurableStore`` and defining the ``akka.cluster.distributed-data.durable.store-actor-class``
property for the new implementation.
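A custom store is just an actor speaking the ``DurableStore`` protocol (``LoadAll``, ``LoadAllCompleted``,
``LoadFailed``, ``Store`` and ``StoreReply``, as used by the test store in this commit). The following Scala
sketch keeps entries in a plain in-memory map, so it only illustrates the message flow rather than a real
durable store; the message that sends previously stored entries back to the ``Replicator`` during load is
elided, and the ``InMemoryDurableStore`` name is made up for the example::

  import akka.actor.Actor
  import akka.cluster.ddata.DurableStore._

  // Sketch only: acknowledges load and store requests, but does not
  // actually persist anything to disk.
  class InMemoryDurableStore extends Actor {
    // value type left open here; see DurableStore for the real message definitions
    private var entries = Map.empty[String, Any]

    def receive = {
      case LoadAll ⇒
        // a real store would first send the previously stored entries
        // back to the Replicator, then signal completion
        sender() ! LoadAllCompleted

      case Store(key, data, reply) ⇒
        entries = entries.updated(key, data)
        reply match {
          case Some(StoreReply(successMsg, _, replyTo)) ⇒ replyTo ! successMsg
          case None ⇒
        }
    }
  }

Such a store would then be enabled by pointing ``akka.cluster.distributed-data.durable.store-actor-class``
at it, or by passing its ``Props`` programmatically, as the tests in this commit do with ``withDurableStoreProps``.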
The location of the files for the data is configured with::
# Directory of LMDB file. There are two options:
# 1. A relative or absolute path to a directory that ends with 'ddata'
# the full name of the directory will contain the name of the ActorSystem
# and its remote port.
# 2. Otherwise the path is used as is, as a relative or absolute path to
# a directory.
akka.cluster.distributed-data.durable.lmdb.dir = "ddata"
Making the data durable of course has a performance cost. By default, each update is flushed
to disk before the ``UpdateSuccess`` reply is sent. For better performance, but with the risk of losing
the last writes if the JVM crashes, you can enable write behind mode. Changes are then accumulated during
a time period before they are written to LMDB and flushed to disk. Enabling write behind is especially
efficient when performing many writes to the same key, because only the last value for each key
is serialized and stored. The risk of losing writes if the JVM crashes is small, since the
data is typically replicated to other nodes immediately according to the given ``WriteConsistency``.
::
akka.cluster.distributed-data.durable.lmdb.write-behind-interval = 200 ms
Note that you should be prepared to receive ``StoreFailure`` as a reply to an ``Update`` of a
durable entry if the data could not be stored for some reason. When ``write-behind-interval`` is enabled,
such errors will only be logged and ``UpdateSuccess`` will still be the reply to the ``Update``.
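To make the failure mode concrete, here is a small Scala sketch modeled on the multi-node test in this
commit; ``replicator`` and ``system`` are assumed to be in scope::

  import akka.cluster.Cluster
  import akka.cluster.ddata.{ GCounter, GCounterKey }
  import akka.cluster.ddata.Replicator._

  implicit val cluster = Cluster(system)
  // must match one of the configured akka.cluster.distributed-data.durable.keys
  val KeyA = GCounterKey("durable-A")

  // the optional request context (Some("a")) is echoed back in the reply
  replicator ! Update(KeyA, GCounter(), WriteLocal, request = Some("a"))(_ + 1)

  // possible replies:
  //   UpdateSuccess(KeyA, Some("a")) - value stored (and flushed, unless write-behind is enabled)
  //   StoreFailure(KeyA, Some("a"))  - the durable store could not store the value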
CRDT Garbage
------------
@ -494,11 +544,6 @@ other nodes. This means that you cannot have too large data entries, because the
size will be too large. We might be able to make this more efficient by implementing
`Efficient State-based CRDTs by Delta-Mutation <http://gsd.di.uminho.pt/members/cbm/ps/delta-crdt-draft16may2014.pdf>`_.
The data is only kept in memory. It is redundant since it is replicated to other nodes
in the cluster, but if you stop all nodes the data is lost, unless you have saved it
elsewhere. Making the data durable is a possible future feature, but even if we implement that
it is not intended to be a full featured database.
Learn More about CRDTs
======================

View file

@ -448,6 +448,56 @@ look like for the ``TwoPhaseSet``:
.. includecode:: code/docs/ddata/protobuf/TwoPhaseSetSerializer2.scala#serializer
Durable Storage
---------------
By default the data is only kept in memory. It is redundant since it is replicated to other nodes
in the cluster, but if you stop all nodes the data is lost, unless you have saved it
elsewhere.
Entries can be configured to be durable, i.e. stored on local disk on each node. The stored data will be loaded
the next time the replicator is started, i.e. when the actor system is restarted. This means that the data will survive as
long as at least one node from the old cluster takes part in a new cluster. The keys of the durable entries
are configured with::
akka.cluster.distributed-data.durable.keys = ["a", "b", "durable*"]
Prefix matching is supported by using ``*`` at the end of a key.
All entries can be made durable by specifying::
akka.cluster.distributed-data.durable.keys = ["*"]
`LMDB <https://symas.com/products/lightning-memory-mapped-database/>`_ is the default storage implementation. It is
possible to replace that with another implementation by implementing the actor protocol described in
``akka.cluster.ddata.DurableStore`` and defining the ``akka.cluster.distributed-data.durable.store-actor-class``
property for the new implementation.
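Alternatively, the store can be wired in programmatically through ``ReplicatorSettings``, which is how the
tests in this commit plug in a failing test store; ``MyDurableStore`` below is a hypothetical actor
implementing the ``DurableStore`` protocol::

  import akka.actor.Props
  import akka.cluster.ddata.{ Replicator, ReplicatorSettings }

  // Props of the custom store actor are passed to the Replicator settings,
  // bypassing the store-actor-class configuration property.
  val settings = ReplicatorSettings(system)
    .withDurableStoreProps(Props[MyDurableStore])
  val replicator = system.actorOf(Replicator.props(settings), "replicator")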
The location of the files for the data is configured with::
# Directory of LMDB file. There are two options:
# 1. A relative or absolute path to a directory that ends with 'ddata'
# the full name of the directory will contain the name of the ActorSystem
# and its remote port.
# 2. Otherwise the path is used as is, as a relative or absolute path to
# a directory.
akka.cluster.distributed-data.durable.lmdb.dir = "ddata"
Making the data durable of course has a performance cost. By default, each update is flushed
to disk before the ``UpdateSuccess`` reply is sent. For better performance, but with the risk of losing
the last writes if the JVM crashes, you can enable write behind mode. Changes are then accumulated during
a time period before they are written to LMDB and flushed to disk. Enabling write behind is especially
efficient when performing many writes to the same key, because only the last value for each key
is serialized and stored. The risk of losing writes if the JVM crashes is small, since the
data is typically replicated to other nodes immediately according to the given ``WriteConsistency``.
::
akka.cluster.distributed-data.durable.lmdb.write-behind-interval = 200 ms
Note that you should be prepared to receive ``StoreFailure`` as a reply to an ``Update`` of a
durable entry if the data could not be stored for some reason. When ``write-behind-interval`` is enabled,
such errors will only be logged and ``UpdateSuccess`` will still be the reply to the ``Update``.
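The durability guarantee itself can be pictured with a sketch based on ``DurableDataSpec`` in this commit:
a value written before the node is restarted can be read back with ``ReadLocal`` from a freshly started
``Replicator``. Here ``replicator``, ``newReplicator`` and ``system`` are assumed to be in scope::

  import akka.cluster.Cluster
  import akka.cluster.ddata.{ GCounter, GCounterKey }
  import akka.cluster.ddata.Replicator._

  implicit val cluster = Cluster(system)
  val KeyA = GCounterKey("durable-A")

  // written and flushed to disk before UpdateSuccess is sent (unless write-behind is enabled)
  replicator ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)

  // ... actor system restarted, a new Replicator started with the same lmdb.dir ...

  // the stored entry is loaded again, so a local read finds it
  newReplicator ! Get(KeyA, ReadLocal)
  // expected reply: GetSuccess(KeyA, None) carrying the previously stored counter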
CRDT Garbage
------------
@ -494,11 +544,6 @@ other nodes. This means that you cannot have too large data entries, because the
size will be too large. We might be able to make this more efficient by implementing
`Efficient State-based CRDTs by Delta-Mutation <http://gsd.di.uminho.pt/members/cbm/ps/delta-crdt-draft16may2014.pdf>`_.
The data is only kept in memory. It is redundant since it is replicated to other nodes
in the cluster, but if you stop all nodes the data is lost, unless you have saved it
elsewhere. Making the data durable is a possible future feature, but even if we implement that
it is not intended to be a full featured database.
Learn More about CRDTs
======================

View file

@ -61,7 +61,9 @@ object Dependencies {
// ssl-config
val sslConfigCore = "com.typesafe" %% "ssl-config-core" % sslConfigVersion // ApacheV2
val lmdb = "org.lmdbjava" % "lmdbjava" % "0.0.4" // ApacheV2, OpenLDAP Public License
// For akka-http-testkit-java
val junit = "junit" % "junit" % junitVersion // Common Public License 1.0
@ -142,7 +144,7 @@ object Dependencies {
val clusterMetrics = l ++= Seq(Provided.sigarLoader, Test.slf4jJul, Test.slf4jLog4j, Test.logback, Test.mockito)
val distributedData = l ++= Seq(Test.junit, Test.scalatest.value)
val distributedData = l ++= Seq(lmdb, Test.junit, Test.scalatest.value)
val slf4j = l ++= Seq(slf4jApi, Test.logback)

View file

@ -531,8 +531,12 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.SyncDirective"),
// # 21944
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ClusterEvent#ReachabilityEvent.member")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ClusterEvent#ReachabilityEvent.member"),
// #21645 durable distributed data
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.WriteAggregator.props"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.WriteAggregator.this"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ddata.Replicator.write")
)
)
}