Use delta in direct write also, #22188

* Follow up on the causal delivery of deltas.
* The first implementation used full state for the direct
  Write messages, i.e. updates with WriteConsistency != LocalWrite
* This is an optimization so that delatas are tried first and if
  they can't be applied it falls back to full state.
* For simultanious updates the messages may be reordered because we
  create separate WriteAggregator actor and such, but normally they
  will be sent in order so the deltas will typically be received in
  order, otherwise we fall back to retrying with full state in the
  second round in the WriteAggregator.
This commit is contained in:
Patrik Nordwall 2017-02-23 12:12:29 +01:00
parent b2759ab56a
commit 233e784154
9 changed files with 365 additions and 85 deletions

View file

@ -12721,6 +12721,24 @@ public final class ReplicatorMessages {
*/ */
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.EntryOrBuilder getEntriesOrBuilder( akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.EntryOrBuilder getEntriesOrBuilder(
int index); int index);
// optional bool reply = 3;
/**
* <code>optional bool reply = 3;</code>
*
* <pre>
* no reply if not set
* </pre>
*/
boolean hasReply();
/**
* <code>optional bool reply = 3;</code>
*
* <pre>
* no reply if not set
* </pre>
*/
boolean getReply();
} }
/** /**
* Protobuf type {@code akka.cluster.ddata.DeltaPropagation} * Protobuf type {@code akka.cluster.ddata.DeltaPropagation}
@ -12794,6 +12812,11 @@ public final class ReplicatorMessages {
entries_.add(input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.PARSER, extensionRegistry)); entries_.add(input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.PARSER, extensionRegistry));
break; break;
} }
case 24: {
bitField0_ |= 0x00000002;
reply_ = input.readBool();
break;
}
} }
} }
} catch (akka.protobuf.InvalidProtocolBufferException e) { } catch (akka.protobuf.InvalidProtocolBufferException e) {
@ -13784,9 +13807,34 @@ public final class ReplicatorMessages {
return entries_.get(index); return entries_.get(index);
} }
// optional bool reply = 3;
public static final int REPLY_FIELD_NUMBER = 3;
private boolean reply_;
/**
* <code>optional bool reply = 3;</code>
*
* <pre>
* no reply if not set
* </pre>
*/
public boolean hasReply() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
* <code>optional bool reply = 3;</code>
*
* <pre>
* no reply if not set
* </pre>
*/
public boolean getReply() {
return reply_;
}
private void initFields() { private void initFields() {
fromNode_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance(); fromNode_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance();
entries_ = java.util.Collections.emptyList(); entries_ = java.util.Collections.emptyList();
reply_ = false;
} }
private byte memoizedIsInitialized = -1; private byte memoizedIsInitialized = -1;
public final boolean isInitialized() { public final boolean isInitialized() {
@ -13820,6 +13868,9 @@ public final class ReplicatorMessages {
for (int i = 0; i < entries_.size(); i++) { for (int i = 0; i < entries_.size(); i++) {
output.writeMessage(2, entries_.get(i)); output.writeMessage(2, entries_.get(i));
} }
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeBool(3, reply_);
}
getUnknownFields().writeTo(output); getUnknownFields().writeTo(output);
} }
@ -13837,6 +13888,10 @@ public final class ReplicatorMessages {
size += akka.protobuf.CodedOutputStream size += akka.protobuf.CodedOutputStream
.computeMessageSize(2, entries_.get(i)); .computeMessageSize(2, entries_.get(i));
} }
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += akka.protobuf.CodedOutputStream
.computeBoolSize(3, reply_);
}
size += getUnknownFields().getSerializedSize(); size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size; memoizedSerializedSize = size;
return size; return size;
@ -13967,6 +14022,8 @@ public final class ReplicatorMessages {
} else { } else {
entriesBuilder_.clear(); entriesBuilder_.clear();
} }
reply_ = false;
bitField0_ = (bitField0_ & ~0x00000004);
return this; return this;
} }
@ -14012,6 +14069,10 @@ public final class ReplicatorMessages {
} else { } else {
result.entries_ = entriesBuilder_.build(); result.entries_ = entriesBuilder_.build();
} }
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000002;
}
result.reply_ = reply_;
result.bitField0_ = to_bitField0_; result.bitField0_ = to_bitField0_;
onBuilt(); onBuilt();
return result; return result;
@ -14057,6 +14118,9 @@ public final class ReplicatorMessages {
} }
} }
} }
if (other.hasReply()) {
setReply(other.getReply());
}
this.mergeUnknownFields(other.getUnknownFields()); this.mergeUnknownFields(other.getUnknownFields());
return this; return this;
} }
@ -14455,6 +14519,55 @@ public final class ReplicatorMessages {
return entriesBuilder_; return entriesBuilder_;
} }
// optional bool reply = 3;
private boolean reply_ ;
/**
* <code>optional bool reply = 3;</code>
*
* <pre>
* no reply if not set
* </pre>
*/
public boolean hasReply() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional bool reply = 3;</code>
*
* <pre>
* no reply if not set
* </pre>
*/
public boolean getReply() {
return reply_;
}
/**
* <code>optional bool reply = 3;</code>
*
* <pre>
* no reply if not set
* </pre>
*/
public Builder setReply(boolean value) {
bitField0_ |= 0x00000004;
reply_ = value;
onChanged();
return this;
}
/**
* <code>optional bool reply = 3;</code>
*
* <pre>
* no reply if not set
* </pre>
*/
public Builder clearReply() {
bitField0_ = (bitField0_ & ~0x00000004);
reply_ = false;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:akka.cluster.ddata.DeltaPropagation) // @@protoc_insertion_point(builder_scope:akka.cluster.ddata.DeltaPropagation)
} }
@ -19212,27 +19325,28 @@ public final class ReplicatorMessages {
" \002(\010\0221\n\007entries\030\002 \003(\0132 .akka.cluster.dda", " \002(\010\0221\n\007entries\030\002 \003(\0132 .akka.cluster.dda",
"ta.Gossip.Entry\032H\n\005Entry\022\013\n\003key\030\001 \002(\t\0222\n" + "ta.Gossip.Entry\032H\n\005Entry\022\013\n\003key\030\001 \002(\t\0222\n" +
"\010envelope\030\002 \002(\0132 .akka.cluster.ddata.Dat" + "\010envelope\030\002 \002(\0132 .akka.cluster.ddata.Dat" +
"aEnvelope\"\362\001\n\020DeltaPropagation\0223\n\010fromNo" + "aEnvelope\"\201\002\n\020DeltaPropagation\0223\n\010fromNo" +
"de\030\001 \002(\0132!.akka.cluster.ddata.UniqueAddr" + "de\030\001 \002(\0132!.akka.cluster.ddata.UniqueAddr" +
"ess\022;\n\007entries\030\002 \003(\0132*.akka.cluster.ddat" + "ess\022;\n\007entries\030\002 \003(\0132*.akka.cluster.ddat" +
"a.DeltaPropagation.Entry\032l\n\005Entry\022\013\n\003key" + "a.DeltaPropagation.Entry\022\r\n\005reply\030\003 \001(\010\032" +
"\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132 .akka.cluster." + "l\n\005Entry\022\013\n\003key\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132" +
"ddata.DataEnvelope\022\021\n\tfromSeqNr\030\003 \002(\003\022\017\n" + " .akka.cluster.ddata.DataEnvelope\022\021\n\tfro" +
"\007toSeqNr\030\004 \001(\003\"X\n\rUniqueAddress\022,\n\007addre" + "mSeqNr\030\003 \002(\003\022\017\n\007toSeqNr\030\004 \001(\003\"X\n\rUniqueA" +
"ss\030\001 \002(\0132\033.akka.cluster.ddata.Address\022\013\n", "ddress\022,\n\007address\030\001 \002(\0132\033.akka.cluster.d",
"\003uid\030\002 \002(\017\022\014\n\004uid2\030\003 \001(\017\")\n\007Address\022\020\n\010h" + "data.Address\022\013\n\003uid\030\002 \002(\017\022\014\n\004uid2\030\003 \001(\017\"" +
"ostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"\224\001\n\rVersionV" + ")\n\007Address\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002" +
"ector\0228\n\007entries\030\001 \003(\0132\'.akka.cluster.dd" + "(\r\"\224\001\n\rVersionVector\0228\n\007entries\030\001 \003(\0132\'." +
"ata.VersionVector.Entry\032I\n\005Entry\022/\n\004node" + "akka.cluster.ddata.VersionVector.Entry\032I" +
"\030\001 \002(\0132!.akka.cluster.ddata.UniqueAddres" + "\n\005Entry\022/\n\004node\030\001 \002(\0132!.akka.cluster.dda" +
"s\022\017\n\007version\030\002 \002(\003\"V\n\014OtherMessage\022\027\n\017en" + "ta.UniqueAddress\022\017\n\007version\030\002 \002(\003\"V\n\014Oth" +
"closedMessage\030\001 \002(\014\022\024\n\014serializerId\030\002 \002(" + "erMessage\022\027\n\017enclosedMessage\030\001 \002(\014\022\024\n\014se" +
"\005\022\027\n\017messageManifest\030\004 \001(\014\"\036\n\nStringGSet" + "rializerId\030\002 \002(\005\022\027\n\017messageManifest\030\004 \001(" +
"\022\020\n\010elements\030\001 \003(\t\"\205\001\n\023DurableDataEnvelo" + "\014\"\036\n\nStringGSet\022\020\n\010elements\030\001 \003(\t\"\205\001\n\023Du" +
"pe\022.\n\004data\030\001 \002(\0132 .akka.cluster.ddata.Ot", "rableDataEnvelope\022.\n\004data\030\001 \002(\0132 .akka.c",
"herMessage\022>\n\007pruning\030\002 \003(\0132-.akka.clust" + "luster.ddata.OtherMessage\022>\n\007pruning\030\002 \003" +
"er.ddata.DataEnvelope.PruningEntryB#\n\037ak" + "(\0132-.akka.cluster.ddata.DataEnvelope.Pru" +
"ka.cluster.ddata.protobuf.msgH\001" "ningEntryB#\n\037akka.cluster.ddata.protobuf" +
".msgH\001"
}; };
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -19346,7 +19460,7 @@ public final class ReplicatorMessages {
internal_static_akka_cluster_ddata_DeltaPropagation_fieldAccessorTable = new internal_static_akka_cluster_ddata_DeltaPropagation_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable( akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_cluster_ddata_DeltaPropagation_descriptor, internal_static_akka_cluster_ddata_DeltaPropagation_descriptor,
new java.lang.String[] { "FromNode", "Entries", }); new java.lang.String[] { "FromNode", "Entries", "Reply", });
internal_static_akka_cluster_ddata_DeltaPropagation_Entry_descriptor = internal_static_akka_cluster_ddata_DeltaPropagation_Entry_descriptor =
internal_static_akka_cluster_ddata_DeltaPropagation_descriptor.getNestedTypes().get(0); internal_static_akka_cluster_ddata_DeltaPropagation_descriptor.getNestedTypes().get(0);
internal_static_akka_cluster_ddata_DeltaPropagation_Entry_fieldAccessorTable = new internal_static_akka_cluster_ddata_DeltaPropagation_Entry_fieldAccessorTable = new

View file

@ -107,6 +107,7 @@ message DeltaPropagation {
required UniqueAddress fromNode = 1; required UniqueAddress fromNode = 1;
repeated Entry entries = 2; repeated Entry entries = 2;
optional bool reply = 3; // no reply if not set
} }
message UniqueAddress { message UniqueAddress {

View file

@ -743,7 +743,8 @@ object Replicator {
final case class Gossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean) extends ReplicatorMessage final case class Gossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean) extends ReplicatorMessage
final case class Delta(dataEnvelope: DataEnvelope, fromSeqNr: Long, toSeqNr: Long) final case class Delta(dataEnvelope: DataEnvelope, fromSeqNr: Long, toSeqNr: Long)
final case class DeltaPropagation(fromNode: UniqueAddress, deltas: Map[KeyId, Delta]) extends ReplicatorMessage final case class DeltaPropagation(fromNode: UniqueAddress, reply: Boolean, deltas: Map[KeyId, Delta]) extends ReplicatorMessage
case object DeltaNack extends ReplicatorMessage with DeadLetterSuppression
} }
} }
@ -990,7 +991,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
override def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation = { override def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation = {
// Important to include the pruning state in the deltas. For example if the delta is based // Important to include the pruning state in the deltas. For example if the delta is based
// on an entry that has been pruned but that has not yet been performed on the target node. // on an entry that has been pruned but that has not yet been performed on the target node.
DeltaPropagation(selfUniqueAddress, deltas.map { DeltaPropagation(selfUniqueAddress, reply = false, deltas.map {
case (key, (d, fromSeqNr, toSeqNr)) getData(key) match { case (key, (d, fromSeqNr, toSeqNr)) getData(key) match {
case Some(envelope) key Delta(envelope.copy(data = d), fromSeqNr, toSeqNr) case Some(envelope) key Delta(envelope.copy(data = d), fromSeqNr, toSeqNr)
case None key Delta(DataEnvelope(d), fromSeqNr, toSeqNr) case None key Delta(DataEnvelope(d), fromSeqNr, toSeqNr)
@ -1149,7 +1150,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case Read(key) receiveRead(key) case Read(key) receiveRead(key)
case Write(key, envelope) receiveWrite(key, envelope) case Write(key, envelope) receiveWrite(key, envelope)
case ReadRepair(key, envelope) receiveReadRepair(key, envelope) case ReadRepair(key, envelope) receiveReadRepair(key, envelope)
case DeltaPropagation(from, deltas) receiveDeltaPropagation(from, deltas) case DeltaPropagation(from, reply, deltas) receiveDeltaPropagation(from, reply, deltas)
case FlushChanges receiveFlushChanges() case FlushChanges receiveFlushChanges()
case DeltaPropagationTick receiveDeltaPropagationTick() case DeltaPropagationTick receiveDeltaPropagationTick()
case GossipTick receiveGossipTick() case GossipTick receiveGossipTick()
@ -1242,13 +1243,16 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
else else
replyTo ! UpdateSuccess(key, req) replyTo ! UpdateSuccess(key, req)
} else { } else {
val writeEnvelope = delta match { val (writeEnvelope, writeDelta) = delta match {
case Some(d: RequiresCausalDeliveryOfDeltas) newEnvelope case Some(d: RequiresCausalDeliveryOfDeltas)
case Some(d) DataEnvelope(d) val v = deltaPropagationSelector.currentVersion(key.id)
case None newEnvelope (newEnvelope, Some(Delta(newEnvelope.copy(data = d), v, v)))
case Some(d) (newEnvelope.copy(data = d), None)
case None (newEnvelope, None)
} }
val writeAggregator = val writeAggregator =
context.actorOf(WriteAggregator.props(key, writeEnvelope, writeConsistency, req, nodes, unreachable, replyTo, durable) context.actorOf(WriteAggregator.props(key, writeEnvelope, writeDelta, writeConsistency,
req, nodes, unreachable, replyTo, durable)
.withDispatcher(context.props.dispatcher)) .withDispatcher(context.props.dispatcher))
if (durable) { if (durable) {
durableStore ! Store(key.id, new DurableDataEnvelope(newEnvelope), durableStore ! Store(key.id, new DurableDataEnvelope(newEnvelope),
@ -1274,14 +1278,20 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case _ false case _ false
} }
def receiveWrite(key: KeyId, envelope: DataEnvelope): Unit = { def receiveWrite(key: KeyId, envelope: DataEnvelope): Unit =
write(key, envelope) match { writeAndStore(key, envelope, reply = true)
def writeAndStore(key: KeyId, writeEnvelope: DataEnvelope, reply: Boolean): Unit = {
write(key, writeEnvelope) match {
case Some(newEnvelope) case Some(newEnvelope)
if (isDurable(key)) if (isDurable(key)) {
durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), Some(StoreReply(WriteAck, WriteNack, replyTo))) val storeReply = if (reply) Some(StoreReply(WriteAck, WriteNack, replyTo)) else None
else durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), storeReply)
} else if (reply)
replyTo ! WriteAck replyTo ! WriteAck
case None case None
if (reply)
replyTo ! WriteNack
} }
} }
@ -1316,17 +1326,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
} }
} }
def writeAndStore(key: KeyId, writeEnvelope: DataEnvelope): Unit = {
write(key, writeEnvelope) match {
case Some(newEnvelope)
if (isDurable(key))
durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None)
case None
}
}
def receiveReadRepair(key: KeyId, writeEnvelope: DataEnvelope): Unit = { def receiveReadRepair(key: KeyId, writeEnvelope: DataEnvelope): Unit = {
writeAndStore(key, writeEnvelope) writeAndStore(key, writeEnvelope, reply = false)
replyTo ! ReadRepairAck replyTo ! ReadRepairAck
} }
@ -1353,7 +1354,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
replyTo ! DeleteSuccess(key, req) replyTo ! DeleteSuccess(key, req)
} else { } else {
val writeAggregator = val writeAggregator =
context.actorOf(WriteAggregator.props(key, DeletedEnvelope, consistency, req, nodes, unreachable, replyTo, durable) context.actorOf(WriteAggregator.props(key, DeletedEnvelope, None, consistency, req, nodes, unreachable, replyTo, durable)
.withDispatcher(context.props.dispatcher)) .withDispatcher(context.props.dispatcher))
if (durable) { if (durable) {
durableStore ! Store(key.id, new DurableDataEnvelope(DeletedEnvelope), durableStore ! Store(key.id, new DurableDataEnvelope(DeletedEnvelope),
@ -1464,7 +1465,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
deltaPropagationSelector.cleanupDeltaEntries() deltaPropagationSelector.cleanupDeltaEntries()
} }
def receiveDeltaPropagation(fromNode: UniqueAddress, deltas: Map[KeyId, Delta]): Unit = def receiveDeltaPropagation(fromNode: UniqueAddress, reply: Boolean, deltas: Map[KeyId, Delta]): Unit =
if (deltaCrdtEnabled) { if (deltaCrdtEnabled) {
try { try {
val isDebugEnabled = log.isDebugEnabled val isDebugEnabled = log.isDebugEnabled
@ -1485,20 +1486,22 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
if (isDebugEnabled) log.debug( if (isDebugEnabled) log.debug(
"Skipping DeltaPropagation from [{}] for [{}] because toSeqNr [{}] already handled [{}]", "Skipping DeltaPropagation from [{}] for [{}] because toSeqNr [{}] already handled [{}]",
fromNode.address, key, toSeqNr, currentSeqNr) fromNode.address, key, toSeqNr, currentSeqNr)
if (reply) replyTo ! WriteAck
} else if (fromSeqNr > (currentSeqNr + 1)) { } else if (fromSeqNr > (currentSeqNr + 1)) {
if (isDebugEnabled) log.debug( if (isDebugEnabled) log.debug(
"Skipping DeltaPropagation from [{}] for [{}] because missing deltas between [{}-{}]", "Skipping DeltaPropagation from [{}] for [{}] because missing deltas between [{}-{}]",
fromNode.address, key, currentSeqNr + 1, fromSeqNr - 1) fromNode.address, key, currentSeqNr + 1, fromSeqNr - 1)
if (reply) replyTo ! DeltaNack
} else { } else {
if (isDebugEnabled) log.debug( if (isDebugEnabled) log.debug(
"Applying DeltaPropagation from [{}] for [{}] with sequence numbers [{}], current was [{}]", "Applying DeltaPropagation from [{}] for [{}] with sequence numbers [{}], current was [{}]",
fromNode.address, key, s"$fromSeqNr-$toSeqNr", currentSeqNr) fromNode.address, key, s"$fromSeqNr-$toSeqNr", currentSeqNr)
val newEnvelope = envelope.copy(deltaVersions = VersionVector(fromNode, toSeqNr)) val newEnvelope = envelope.copy(deltaVersions = VersionVector(fromNode, toSeqNr))
writeAndStore(key, newEnvelope) writeAndStore(key, newEnvelope, reply)
} }
case (key, Delta(envelope, _, _)) case (key, Delta(envelope, _, _))
// causal delivery of deltas not needed, just apply it // causal delivery of deltas not needed, just apply it
writeAndStore(key, envelope) writeAndStore(key, envelope, reply)
} }
} }
} catch { } catch {
@ -1507,6 +1510,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// mixing nodes with incompatible delta-CRDT types // mixing nodes with incompatible delta-CRDT types
log.warning("Couldn't process DeltaPropagation from [] due to {}", fromNode, e) log.warning("Couldn't process DeltaPropagation from [] due to {}", fromNode, e)
} }
} else {
// !deltaCrdtEnabled
if (reply) replyTo ! DeltaNack
} }
def receiveGossipTick(): Unit = { def receiveGossipTick(): Unit = {
@ -1583,7 +1589,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
updatedData.foreach { updatedData.foreach {
case (key, envelope) case (key, envelope)
val hadData = dataEntries.contains(key) val hadData = dataEntries.contains(key)
writeAndStore(key, envelope) writeAndStore(key, envelope, reply = false)
if (sendBack) getData(key) match { if (sendBack) getData(key) match {
case Some(d) case Some(d)
if (hadData || d.pruning.nonEmpty) if (hadData || d.pruning.nonEmpty)
@ -1842,13 +1848,14 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def props( def props(
key: KeyR, key: KeyR,
envelope: Replicator.Internal.DataEnvelope, envelope: Replicator.Internal.DataEnvelope,
delta: Option[Replicator.Internal.Delta],
consistency: Replicator.WriteConsistency, consistency: Replicator.WriteConsistency,
req: Option[Any], req: Option[Any],
nodes: Set[Address], nodes: Set[Address],
unreachable: Set[Address], unreachable: Set[Address],
replyTo: ActorRef, replyTo: ActorRef,
durable: Boolean): Props = durable: Boolean): Props =
Props(new WriteAggregator(key, envelope, consistency, req, nodes, unreachable, replyTo, durable)) Props(new WriteAggregator(key, envelope, delta, consistency, req, nodes, unreachable, replyTo, durable))
.withDeploy(Deploy.local) .withDeploy(Deploy.local)
} }
@ -1858,6 +1865,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
@InternalApi private[akka] class WriteAggregator( @InternalApi private[akka] class WriteAggregator(
key: KeyR, key: KeyR,
envelope: Replicator.Internal.DataEnvelope, envelope: Replicator.Internal.DataEnvelope,
delta: Option[Replicator.Internal.Delta],
consistency: Replicator.WriteConsistency, consistency: Replicator.WriteConsistency,
req: Option[Any], req: Option[Any],
override val nodes: Set[Address], override val nodes: Set[Address],
@ -1869,6 +1877,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
import Replicator.Internal._ import Replicator.Internal._
import ReadWriteAggregator._ import ReadWriteAggregator._
val selfUniqueAddress = Cluster(context.system).selfUniqueAddress
override def timeout: FiniteDuration = consistency.timeout override def timeout: FiniteDuration = consistency.timeout
override val doneWhenRemainingSize = consistency match { override val doneWhenRemainingSize = consistency match {
@ -1883,12 +1893,20 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
} }
val writeMsg = Write(key.id, envelope) val writeMsg = Write(key.id, envelope)
val deltaMsg = delta match {
case None None
case Some(d) Some(DeltaPropagation(selfUniqueAddress, reply = true, Map(key.id d)))
}
var gotLocalStoreReply = !durable var gotLocalStoreReply = !durable
var gotWriteNackFrom = Set.empty[Address] var gotWriteNackFrom = Set.empty[Address]
override def preStart(): Unit = { override def preStart(): Unit = {
primaryNodes.foreach { replica(_) ! writeMsg } val msg = deltaMsg match {
case Some(d) d
case None writeMsg
}
primaryNodes.foreach { replica(_) ! msg }
if (isDone) reply(isTimeout = false) if (isDone) reply(isTimeout = false)
} }
@ -1900,16 +1918,26 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case WriteNack case WriteNack
gotWriteNackFrom += senderAddress() gotWriteNackFrom += senderAddress()
if (isDone) reply(isTimeout = false) if (isDone) reply(isTimeout = false)
case DeltaNack
// ok, will be retried with full state
case _: Replicator.UpdateSuccess[_] case _: Replicator.UpdateSuccess[_]
gotLocalStoreReply = true gotLocalStoreReply = true
if (isDone) reply(isTimeout = false) if (isDone) reply(isTimeout = false)
case f: Replicator.StoreFailure[_] case f: Replicator.StoreFailure[_]
gotLocalStoreReply = true gotLocalStoreReply = true
gotWriteNackFrom += Cluster(context.system).selfAddress gotWriteNackFrom += selfUniqueAddress.address
if (isDone) reply(isTimeout = false) if (isDone) reply(isTimeout = false)
case SendToSecondary case SendToSecondary
deltaMsg match {
case None
case Some(d)
// Deltas must be applied in order and we can't keep track of ordering of
// simultaneous updates so there is a chance that the delta could not be applied.
// Try again with the full state to the primary nodes that have not acked.
primaryNodes.toSet.intersect(remaining).foreach { replica(_) ! writeMsg }
}
secondaryNodes.foreach { replica(_) ! writeMsg } secondaryNodes.foreach { replica(_) ! writeMsg }
case ReceiveTimeout case ReceiveTimeout
reply(isTimeout = true) reply(isTimeout = true)
@ -1934,7 +1962,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
if (isSuccess && isDelete) DeleteSuccess(key, req) if (isSuccess && isDelete) DeleteSuccess(key, req)
else if (isSuccess) UpdateSuccess(key, req) else if (isSuccess) UpdateSuccess(key, req)
else if (isTimeoutOrNotEnoughNodes && isDelete) ReplicationDeleteFailure(key, req) else if (isTimeoutOrNotEnoughNodes && isDelete) ReplicationDeleteFailure(key, req)
else if (isTimeoutOrNotEnoughNodes) UpdateTimeout(key, req) else if (isTimeoutOrNotEnoughNodes || !durable) UpdateTimeout(key, req)
else StoreFailure(key, req) else StoreFailure(key, req)
replyTo.tell(replyMsg, context.parent) replyTo.tell(replyMsg, context.parent)

View file

@ -176,6 +176,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
val WriteNackManifest = "O" val WriteNackManifest = "O"
val DurableDataEnvelopeManifest = "P" val DurableDataEnvelopeManifest = "P"
val DeltaPropagationManifest = "Q" val DeltaPropagationManifest = "Q"
val DeltaNackManifest = "R"
private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] AnyRef]( private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] AnyRef](
GetManifest getFromBinary, GetManifest getFromBinary,
@ -194,6 +195,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
GossipManifest gossipFromBinary, GossipManifest gossipFromBinary,
DeltaPropagationManifest deltaPropagationFromBinary, DeltaPropagationManifest deltaPropagationFromBinary,
WriteNackManifest (_ WriteNack), WriteNackManifest (_ WriteNack),
DeltaNackManifest (_ DeltaNack),
DurableDataEnvelopeManifest durableDataEnvelopeFromBinary) DurableDataEnvelopeManifest durableDataEnvelopeFromBinary)
override def manifest(obj: AnyRef): String = obj match { override def manifest(obj: AnyRef): String = obj match {
@ -214,6 +216,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
case _: Unsubscribe[_] UnsubscribeManifest case _: Unsubscribe[_] UnsubscribeManifest
case _: Gossip GossipManifest case _: Gossip GossipManifest
case WriteNack WriteNackManifest case WriteNack WriteNackManifest
case DeltaNack DeltaNackManifest
case _ case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
} }
@ -236,6 +239,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
case m: Unsubscribe[_] unsubscribeToProto(m).toByteArray case m: Unsubscribe[_] unsubscribeToProto(m).toByteArray
case m: Gossip compress(gossipToProto(m)) case m: Gossip compress(gossipToProto(m))
case WriteNack dm.Empty.getDefaultInstance.toByteArray case WriteNack dm.Empty.getDefaultInstance.toByteArray
case DeltaNack dm.Empty.getDefaultInstance.toByteArray
case _ case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
} }
@ -289,6 +293,8 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
private def deltaPropagationToProto(deltaPropagation: DeltaPropagation): dm.DeltaPropagation = { private def deltaPropagationToProto(deltaPropagation: DeltaPropagation): dm.DeltaPropagation = {
val b = dm.DeltaPropagation.newBuilder() val b = dm.DeltaPropagation.newBuilder()
.setFromNode(uniqueAddressToProto(deltaPropagation.fromNode)) .setFromNode(uniqueAddressToProto(deltaPropagation.fromNode))
if (deltaPropagation.reply)
b.setReply(deltaPropagation.reply)
val entries = deltaPropagation.deltas.foreach { val entries = deltaPropagation.deltas.foreach {
case (key, Delta(data, fromSeqNr, toSeqNr)) case (key, Delta(data, fromSeqNr, toSeqNr))
val b2 = dm.DeltaPropagation.Entry.newBuilder() val b2 = dm.DeltaPropagation.Entry.newBuilder()
@ -304,8 +310,10 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
private def deltaPropagationFromBinary(bytes: Array[Byte]): DeltaPropagation = { private def deltaPropagationFromBinary(bytes: Array[Byte]): DeltaPropagation = {
val deltaPropagation = dm.DeltaPropagation.parseFrom(bytes) val deltaPropagation = dm.DeltaPropagation.parseFrom(bytes)
val reply = deltaPropagation.hasReply && deltaPropagation.getReply
DeltaPropagation( DeltaPropagation(
uniqueAddressFromProto(deltaPropagation.getFromNode), uniqueAddressFromProto(deltaPropagation.getFromNode),
reply,
deltaPropagation.getEntriesList.asScala.map { e deltaPropagation.getEntriesList.asScala.map { e
val fromSeqNr = e.getFromSeqNr val fromSeqNr = e.getFromSeqNr
val toSeqNr = if (e.hasToSeqNr) e.getToSeqNr else fromSeqNr val toSeqNr = if (e.hasToSeqNr) e.getToSeqNr else fromSeqNr

View file

@ -22,7 +22,7 @@ object ReplicatorDeltaSpec extends MultiNodeConfig {
val fourth = role("fourth") val fourth = role("fourth")
commonConfig(ConfigFactory.parseString(""" commonConfig(ConfigFactory.parseString("""
akka.loglevel = DEBUG akka.loglevel = INFO
akka.actor.provider = "cluster" akka.actor.provider = "cluster"
akka.log-dead-letters-during-shutdown = off akka.log-dead-letters-during-shutdown = off
""")) """))
@ -129,6 +129,8 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult
r r
} }
val writeAll = WriteAll(5.seconds)
var afterCounter = 0 var afterCounter = 0
def enterBarrierAfterTestStep(): Unit = { def enterBarrierAfterTestStep(): Unit = {
afterCounter += 1 afterCounter += 1
@ -197,7 +199,7 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult
awaitAssert { awaitAssert {
val p = TestProbe() val p = TestProbe()
List(KeyD, KeyE, KeyF).foreach { key List(KeyD, KeyE, KeyF).foreach { key
fullStateReplicator.tell(Get(key, ReadLocal), p.ref) deltaReplicator.tell(Get(key, ReadLocal), p.ref)
p.expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should ===(Set("a")) p.expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should ===(Set("a"))
} }
} }
@ -206,6 +208,48 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult
enterBarrierAfterTestStep() enterBarrierAfterTestStep()
} }
"work with write consistency" in {
runOn(first) {
val p1 = TestProbe()
fullStateReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "A"), p1.ref)
deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "A"), p1.ref)
p1.expectMsgType[UpdateSuccess[_]]
p1.expectMsgType[UpdateSuccess[_]]
}
enterBarrier("write-1")
val p = TestProbe()
deltaReplicator.tell(Get(KeyD, ReadLocal), p.ref)
p.expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should ===(Set("a", "A"))
// and also when doing several at the same time (deltas may be reordered) and then we
// retry with full state to sort it out
runOn(first) {
val p1 = TestProbe()
deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "B"), p1.ref)
deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "C"), p1.ref)
deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "D"), p1.ref)
p1.expectMsgType[UpdateSuccess[_]]
p1.expectMsgType[UpdateSuccess[_]]
p1.expectMsgType[UpdateSuccess[_]]
}
enterBarrier("write-2")
deltaReplicator.tell(Get(KeyD, ReadLocal), p.ref)
p.expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should ===(Set("a", "A", "B", "C", "D"))
// add same to the fullStateReplicator so they are in sync
runOn(first) {
val p1 = TestProbe()
fullStateReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "A" + "B" + "C" + "D"), p1.ref)
p1.expectMsgType[UpdateSuccess[_]]
}
enterBarrier("write-3")
fullStateReplicator.tell(Get(KeyD, ReadLocal), p.ref)
p.expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should ===(Set("a", "A", "B", "C", "D"))
enterBarrierAfterTestStep()
}
"be eventually consistent" in { "be eventually consistent" in {
val operations = generateOperations(onNode = myself) val operations = generateOperations(onNode = myself)
log.debug(s"random operations on [${myself.name}]: ${operations.mkString(", ")}") log.debug(s"random operations on [${myself.name}]: ${operations.mkString(", ")}")

View file

@ -19,7 +19,7 @@ object ReplicatorORSetDeltaSpec extends MultiNodeConfig {
val third = role("third") val third = role("third")
commonConfig(ConfigFactory.parseString(""" commonConfig(ConfigFactory.parseString("""
akka.loglevel = DEBUG akka.loglevel = INFO
akka.actor.provider = "cluster" akka.actor.provider = "cluster"
akka.log-dead-letters-during-shutdown = off akka.log-dead-letters-during-shutdown = off
""")) """))

View file

@ -19,7 +19,7 @@ object DeltaPropagationSelectorSpec {
override val allNodes: Vector[Address]) extends DeltaPropagationSelector { override val allNodes: Vector[Address]) extends DeltaPropagationSelector {
override val gossipIntervalDivisor = 5 override val gossipIntervalDivisor = 5
override def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation = override def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation =
DeltaPropagation(selfUniqueAddress, deltas.mapValues { DeltaPropagation(selfUniqueAddress, false, deltas.mapValues {
case (d, fromSeqNr, toSeqNr) Delta(DataEnvelope(d), fromSeqNr, toSeqNr) case (d, fromSeqNr, toSeqNr) Delta(DataEnvelope(d), fromSeqNr, toSeqNr)
}) })
} }
@ -50,7 +50,7 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck
selector.cleanupDeltaEntries() selector.cleanupDeltaEntries()
selector.hasDeltaEntries("A") should ===(true) selector.hasDeltaEntries("A") should ===(true)
selector.hasDeltaEntries("B") should ===(true) selector.hasDeltaEntries("B") should ===(true)
val expected = DeltaPropagation(selfUniqueAddress, Map( val expected = DeltaPropagation(selfUniqueAddress, false, Map(
"A" Delta(DataEnvelope(deltaA), 1L, 1L), "A" Delta(DataEnvelope(deltaA), 1L, 1L),
"B" Delta(DataEnvelope(deltaB), 1L, 1L))) "B" Delta(DataEnvelope(deltaB), 1L, 1L)))
selector.collectPropagations() should ===(Map(nodes(0) expected)) selector.collectPropagations() should ===(Map(nodes(0) expected))
@ -64,7 +64,7 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck
val selector = new TestSelector(selfUniqueAddress, nodes.take(3)) val selector = new TestSelector(selfUniqueAddress, nodes.take(3))
selector.update("A", deltaA) selector.update("A", deltaA)
selector.update("B", deltaB) selector.update("B", deltaB)
val expected = DeltaPropagation(selfUniqueAddress, Map( val expected = DeltaPropagation(selfUniqueAddress, false, Map(
"A" Delta(DataEnvelope(deltaA), 1L, 1L), "A" Delta(DataEnvelope(deltaA), 1L, 1L),
"B" Delta(DataEnvelope(deltaB), 1L, 1L))) "B" Delta(DataEnvelope(deltaB), 1L, 1L)))
selector.collectPropagations() should ===(Map(nodes(0) expected, nodes(1) expected)) selector.collectPropagations() should ===(Map(nodes(0) expected, nodes(1) expected))
@ -82,17 +82,17 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck
val selector = new TestSelector(selfUniqueAddress, nodes.take(3)) val selector = new TestSelector(selfUniqueAddress, nodes.take(3))
selector.update("A", deltaA) selector.update("A", deltaA)
selector.update("B", deltaB) selector.update("B", deltaB)
val expected1 = DeltaPropagation(selfUniqueAddress, Map( val expected1 = DeltaPropagation(selfUniqueAddress, false, Map(
"A" Delta(DataEnvelope(deltaA), 1L, 1L), "A" Delta(DataEnvelope(deltaA), 1L, 1L),
"B" Delta(DataEnvelope(deltaB), 1L, 1L))) "B" Delta(DataEnvelope(deltaB), 1L, 1L)))
selector.collectPropagations() should ===(Map(nodes(0) expected1, nodes(1) expected1)) selector.collectPropagations() should ===(Map(nodes(0) expected1, nodes(1) expected1))
// new update before previous was propagated to all nodes // new update before previous was propagated to all nodes
selector.update("C", deltaC) selector.update("C", deltaC)
val expected2 = DeltaPropagation(selfUniqueAddress, Map( val expected2 = DeltaPropagation(selfUniqueAddress, false, Map(
"A" Delta(DataEnvelope(deltaA), 1L, 1L), "A" Delta(DataEnvelope(deltaA), 1L, 1L),
"B" Delta(DataEnvelope(deltaB), 1L, 1L), "B" Delta(DataEnvelope(deltaB), 1L, 1L),
"C" Delta(DataEnvelope(deltaC), 1L, 1L))) "C" Delta(DataEnvelope(deltaC), 1L, 1L)))
val expected3 = DeltaPropagation(selfUniqueAddress, Map( val expected3 = DeltaPropagation(selfUniqueAddress, false, Map(
"C" Delta(DataEnvelope(deltaC), 1L, 1L))) "C" Delta(DataEnvelope(deltaC), 1L, 1L)))
selector.collectPropagations() should ===(Map(nodes(2) expected2, nodes(0) expected3)) selector.collectPropagations() should ===(Map(nodes(2) expected2, nodes(0) expected3))
selector.cleanupDeltaEntries() selector.cleanupDeltaEntries()
@ -114,12 +114,12 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck
selector.currentVersion("A") should ===(1L) selector.currentVersion("A") should ===(1L)
selector.update("A", delta2) selector.update("A", delta2)
selector.currentVersion("A") should ===(2L) selector.currentVersion("A") should ===(2L)
val expected1 = DeltaPropagation(selfUniqueAddress, Map( val expected1 = DeltaPropagation(selfUniqueAddress, false, Map(
"A" Delta(DataEnvelope(delta1.merge(delta2)), 1L, 2L))) "A" Delta(DataEnvelope(delta1.merge(delta2)), 1L, 2L)))
selector.collectPropagations() should ===(Map(nodes(0) expected1)) selector.collectPropagations() should ===(Map(nodes(0) expected1))
selector.update("A", delta3) selector.update("A", delta3)
selector.currentVersion("A") should ===(3L) selector.currentVersion("A") should ===(3L)
val expected2 = DeltaPropagation(selfUniqueAddress, Map( val expected2 = DeltaPropagation(selfUniqueAddress, false, Map(
"A" Delta(DataEnvelope(delta3), 3L, 3L))) "A" Delta(DataEnvelope(delta3), 3L, 3L)))
selector.collectPropagations() should ===(Map(nodes(0) expected2)) selector.collectPropagations() should ===(Map(nodes(0) expected2))
selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation]) selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation])
@ -133,25 +133,25 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck
override def nodesSliceSize(allNodesSize: Int): Int = 1 override def nodesSliceSize(allNodesSize: Int): Int = 1
} }
selector.update("A", delta1) selector.update("A", delta1)
val expected1 = DeltaPropagation(selfUniqueAddress, Map( val expected1 = DeltaPropagation(selfUniqueAddress, false, Map(
"A" Delta(DataEnvelope(delta1), 1L, 1L))) "A" Delta(DataEnvelope(delta1), 1L, 1L)))
selector.collectPropagations() should ===(Map(nodes(0) expected1)) selector.collectPropagations() should ===(Map(nodes(0) expected1))
selector.update("A", delta2) selector.update("A", delta2)
val expected2 = DeltaPropagation(selfUniqueAddress, Map( val expected2 = DeltaPropagation(selfUniqueAddress, false, Map(
"A" Delta(DataEnvelope(delta1.merge(delta2)), 1L, 2L))) "A" Delta(DataEnvelope(delta1.merge(delta2)), 1L, 2L)))
selector.collectPropagations() should ===(Map(nodes(1) expected2)) selector.collectPropagations() should ===(Map(nodes(1) expected2))
selector.update("A", delta3) selector.update("A", delta3)
val expected3 = DeltaPropagation(selfUniqueAddress, Map( val expected3 = DeltaPropagation(selfUniqueAddress, false, Map(
"A" Delta(DataEnvelope(delta1.merge(delta2).merge(delta3)), 1L, 3L))) "A" Delta(DataEnvelope(delta1.merge(delta2).merge(delta3)), 1L, 3L)))
selector.collectPropagations() should ===(Map(nodes(2) expected3)) selector.collectPropagations() should ===(Map(nodes(2) expected3))
val expected4 = DeltaPropagation(selfUniqueAddress, Map( val expected4 = DeltaPropagation(selfUniqueAddress, false, Map(
"A" Delta(DataEnvelope(delta2.merge(delta3)), 2L, 3L))) "A" Delta(DataEnvelope(delta2.merge(delta3)), 2L, 3L)))
selector.collectPropagations() should ===(Map(nodes(0) expected4)) selector.collectPropagations() should ===(Map(nodes(0) expected4))
val expected5 = DeltaPropagation(selfUniqueAddress, Map( val expected5 = DeltaPropagation(selfUniqueAddress, false, Map(
"A" Delta(DataEnvelope(delta3), 3L, 3L))) "A" Delta(DataEnvelope(delta3), 3L, 3L)))
selector.collectPropagations() should ===(Map(nodes(1) expected5)) selector.collectPropagations() should ===(Map(nodes(1) expected5))

View file

@ -17,18 +17,24 @@ import akka.cluster.ddata.Replicator._
import akka.remote.RARP import akka.remote.RARP
import scala.concurrent.Future import scala.concurrent.Future
import akka.cluster.Cluster
object WriteAggregatorSpec { object WriteAggregatorSpec {
val key = GSetKey[String]("a") val KeyA = GSetKey[String]("A")
val KeyB = ORSetKey[String]("B")
def writeAggregatorProps(data: GSet[String], consistency: Replicator.WriteConsistency, def writeAggregatorProps(data: GSet[String], consistency: Replicator.WriteConsistency,
probes: Map[Address, ActorRef], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean): Props = probes: Map[Address, ActorRef], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean): Props =
Props(new TestWriteAggregator(data, consistency, probes, nodes, unreachable, replyTo, durable)) Props(new TestWriteAggregator(KeyA, data, None, consistency, probes, nodes, unreachable, replyTo, durable))
class TestWriteAggregator(data: GSet[String], consistency: Replicator.WriteConsistency, def writeAggregatorPropsWithDelta(data: ORSet[String], delta: Delta, consistency: Replicator.WriteConsistency,
probes: Map[Address, ActorRef], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean): Props =
Props(new TestWriteAggregator(KeyB, data, Some(delta), consistency, probes, nodes, unreachable, replyTo, durable))
class TestWriteAggregator(key: Key.KeyR, data: ReplicatedData, delta: Option[Delta], consistency: Replicator.WriteConsistency,
probes: Map[Address, ActorRef], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean) probes: Map[Address, ActorRef], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean)
extends WriteAggregator(key, DataEnvelope(data), consistency, None, nodes, unreachable, replyTo, durable) { extends WriteAggregator(key, DataEnvelope(data), delta, consistency, None, nodes, unreachable, replyTo, durable) {
override def replica(address: Address): ActorSelection = override def replica(address: Address): ActorSelection =
context.actorSelection(probes(address).path) context.actorSelection(probes(address).path)
@ -48,6 +54,8 @@ object WriteAggregatorSpec {
replicator.foreach(_ ! WriteAck) replicator.foreach(_ ! WriteAck)
case WriteNack case WriteNack
replicator.foreach(_ ! WriteNack) replicator.foreach(_ ! WriteNack)
case DeltaNack
replicator.foreach(_ ! DeltaNack)
case msg case msg
replicator = Some(sender()) replicator = Some(sender())
replica ! msg replica ! msg
@ -89,6 +97,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
val timeout = 3.seconds.dilated val timeout = 3.seconds.dilated
val writeThree = WriteTo(3, timeout) val writeThree = WriteTo(3, timeout)
val writeMajority = WriteMajority(timeout) val writeMajority = WriteMajority(timeout)
val writeAll = WriteAll(timeout)
def probes(probe: ActorRef): Map[Address, ActorRef] = def probes(probe: ActorRef): Map[Address, ActorRef] =
nodes.toSeq.map(_ system.actorOf(WriteAggregatorSpec.writeAckAdapterProps(probe))).toMap nodes.toSeq.map(_ system.actorOf(WriteAggregatorSpec.writeAckAdapterProps(probe))).toMap
@ -111,7 +120,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
probe.lastSender ! WriteAck probe.lastSender ! WriteAck
probe.expectMsgType[Write] probe.expectMsgType[Write]
probe.lastSender ! WriteAck probe.lastSender ! WriteAck
expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None)) expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyA, None))
watch(aggr) watch(aggr)
expectTerminated(aggr) expectTerminated(aggr)
} }
@ -132,15 +141,14 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
Future.sequence { Future.sequence {
Seq( Seq(
Future { testProbes(nodeC).expectNoMsg(t) }, Future { testProbes(nodeC).expectNoMsg(t) },
Future { testProbes(nodeD).expectNoMsg(t) } Future { testProbes(nodeD).expectNoMsg(t) })
)
}.futureValue }.futureValue
testProbes(nodeC).expectMsgType[Write] testProbes(nodeC).expectMsgType[Write]
testProbes(nodeC).lastSender ! WriteAck testProbes(nodeC).lastSender ! WriteAck
testProbes(nodeD).expectMsgType[Write] testProbes(nodeD).expectMsgType[Write]
testProbes(nodeD).lastSender ! WriteAck testProbes(nodeD).lastSender ! WriteAck
expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None)) expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyA, None))
watch(aggr) watch(aggr)
expectTerminated(aggr) expectTerminated(aggr)
} }
@ -158,7 +166,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
// no reply // no reply
probe.expectMsgType[Write] probe.expectMsgType[Write]
// no reply // no reply
expectMsg(UpdateTimeout(WriteAggregatorSpec.key, None)) expectMsg(UpdateTimeout(WriteAggregatorSpec.KeyA, None))
watch(aggr) watch(aggr)
expectTerminated(aggr) expectTerminated(aggr)
} }
@ -181,6 +189,82 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
} }
} }
"WriteAggregator with delta" must {
implicit val cluster = Cluster(system)
val fullState1 = ORSet.empty[String] + "a" + "b"
val fullState2 = fullState1.resetDelta + "c"
val delta = Delta(DataEnvelope(fullState2.delta.get), 2L, 2L)
"send deltas first" in {
val probe = TestProbe()
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorPropsWithDelta(
fullState2, delta, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = false))
probe.expectMsgType[DeltaPropagation]
probe.lastSender ! WriteAck
probe.expectMsgType[DeltaPropagation]
probe.lastSender ! WriteAck
expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyB, None))
watch(aggr)
expectTerminated(aggr)
}
"retry with full state when no immediate reply or nack" in {
val testProbes = probes()
val testProbeRefs = testProbes.map { case (a, tm) a tm.writeAckAdapter }
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorPropsWithDelta(
fullState2, delta, writeAll, testProbeRefs, nodes, Set.empty, testActor, durable = false))
testProbes(nodeA).expectMsgType[DeltaPropagation]
// no reply
testProbes(nodeB).expectMsgType[DeltaPropagation]
testProbes(nodeB).lastSender ! WriteAck
testProbes(nodeC).expectMsgType[DeltaPropagation]
testProbes(nodeC).lastSender ! WriteAck
testProbes(nodeD).expectMsgType[DeltaPropagation]
testProbes(nodeD).lastSender ! DeltaNack
// here is the second round
testProbes(nodeA).expectMsgType[Write]
testProbes(nodeA).lastSender ! WriteAck
testProbes(nodeD).expectMsgType[Write]
testProbes(nodeD).lastSender ! WriteAck
testProbes(nodeB).expectNoMsg(100.millis)
testProbes(nodeC).expectNoMsg(100.millis)
expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyB, None))
watch(aggr)
expectTerminated(aggr)
}
"timeout when less than required acks" in {
val probe = TestProbe()
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorPropsWithDelta(
fullState2, delta, writeAll, probes(probe.ref), nodes, Set.empty, testActor, durable = false))
probe.expectMsgType[DeltaPropagation]
// no reply
probe.expectMsgType[DeltaPropagation]
probe.lastSender ! WriteAck
probe.expectMsgType[DeltaPropagation]
// nack
probe.lastSender ! DeltaNack
probe.expectMsgType[DeltaPropagation]
// no reply
// only 1 ack so we expect 3 full state Write
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
probe.expectMsgType[Write]
probe.expectMsgType[Write]
// still not enough acks
expectMsg(UpdateTimeout(WriteAggregatorSpec.KeyB, None))
watch(aggr)
expectTerminated(aggr)
}
}
"Durable WriteAggregator" must { "Durable WriteAggregator" must {
"not reply before local confirmation" in { "not reply before local confirmation" in {
val probe = TestProbe() val probe = TestProbe()
@ -194,9 +278,9 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
expectNoMsg(200.millis) expectNoMsg(200.millis)
// the local write // the local write
aggr ! UpdateSuccess(WriteAggregatorSpec.key, None) aggr ! UpdateSuccess(WriteAggregatorSpec.KeyA, None)
expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None)) expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyA, None))
watch(aggr) watch(aggr)
expectTerminated(aggr) expectTerminated(aggr)
} }
@ -206,7 +290,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps( val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
data, writeThree, probes(probe.ref), nodes, Set.empty, testActor, durable = true)) data, writeThree, probes(probe.ref), nodes, Set.empty, testActor, durable = true))
aggr ! UpdateSuccess(WriteAggregatorSpec.key, None) // the local write aggr ! UpdateSuccess(WriteAggregatorSpec.KeyA, None) // the local write
probe.expectMsgType[Write] probe.expectMsgType[Write]
probe.lastSender ! WriteAck probe.lastSender ! WriteAck
probe.expectMsgType[Write] probe.expectMsgType[Write]
@ -214,7 +298,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
probe.expectMsgType[Write] probe.expectMsgType[Write]
probe.lastSender ! WriteAck probe.lastSender ! WriteAck
expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None)) expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyA, None))
watch(aggr) watch(aggr)
expectTerminated(aggr) expectTerminated(aggr)
} }
@ -226,7 +310,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
probe.expectMsgType[Write] probe.expectMsgType[Write]
probe.lastSender ! WriteNack probe.lastSender ! WriteNack
aggr ! UpdateSuccess(WriteAggregatorSpec.key, None) // the local write aggr ! UpdateSuccess(WriteAggregatorSpec.KeyA, None) // the local write
probe.expectMsgType[Write] probe.expectMsgType[Write]
probe.lastSender ! WriteAck probe.lastSender ! WriteAck
probe.expectMsgType[Write] probe.expectMsgType[Write]
@ -234,7 +318,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
probe.expectMsgType[Write] probe.expectMsgType[Write]
probe.lastSender ! WriteNack probe.lastSender ! WriteNack
expectMsg(StoreFailure(WriteAggregatorSpec.key, None)) expectMsg(StoreFailure(WriteAggregatorSpec.KeyA, None))
watch(aggr) watch(aggr)
expectTerminated(aggr) expectTerminated(aggr)
} }
@ -253,7 +337,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
probe.expectMsgType[Write] probe.expectMsgType[Write]
probe.lastSender ! WriteNack probe.lastSender ! WriteNack
expectMsg(UpdateTimeout(WriteAggregatorSpec.key, None)) expectMsg(UpdateTimeout(WriteAggregatorSpec.KeyA, None))
watch(aggr) watch(aggr)
expectTerminated(aggr) expectTerminated(aggr)
} }

View file

@ -78,6 +78,7 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
checkSerialization(Write("A", DataEnvelope(data1))) checkSerialization(Write("A", DataEnvelope(data1)))
checkSerialization(WriteAck) checkSerialization(WriteAck)
checkSerialization(WriteNack) checkSerialization(WriteNack)
checkSerialization(DeltaNack)
checkSerialization(Read("A")) checkSerialization(Read("A"))
checkSerialization(ReadResult(Some(DataEnvelope(data1)))) checkSerialization(ReadResult(Some(DataEnvelope(data1))))
checkSerialization(ReadResult(None)) checkSerialization(ReadResult(None))
@ -87,7 +88,7 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
checkSerialization(Gossip(Map( checkSerialization(Gossip(Map(
"A" DataEnvelope(data1), "A" DataEnvelope(data1),
"B" DataEnvelope(GSet() + "b" + "c")), sendBack = true)) "B" DataEnvelope(GSet() + "b" + "c")), sendBack = true))
checkSerialization(DeltaPropagation(address1, Map( checkSerialization(DeltaPropagation(address1, reply = true, Map(
"A" Delta(DataEnvelope(delta1), 1L, 1L), "A" Delta(DataEnvelope(delta1), 1L, 1L),
"B" Delta(DataEnvelope(delta2), 3L, 5L)))) "B" Delta(DataEnvelope(delta2), 3L, 5L))))
checkSerialization(new DurableDataEnvelope(data1)) checkSerialization(new DurableDataEnvelope(data1))