Merge pull request #22378 from akka/wip-delta-in-direct-write-patriknw
Use delta in direct write also, #22188
Commit 6b5b819c73
9 changed files with 365 additions and 85 deletions
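The change, as it reads from the diff below: the internal DeltaPropagation message gains a reply flag, so the same message type now carries both background delta gossip (reply = false, fire-and-forget) and direct writes sent by the WriteAggregator (reply = true), which expects WriteAck or the new DeltaNack in return. A minimal, self-contained Scala sketch of those message shapes (simplified stand-in types, not the real akka.cluster.ddata internals):

// Toy model only: stand-in types illustrating the shape of the change,
// not the real akka.cluster.ddata messages.
object DeltaDirectWriteOverview {
  type KeyId = String
  final case class Delta(dataEnvelope: String, fromSeqNr: Long, toSeqNr: Long)
  final case class DeltaPropagation(fromNode: String, reply: Boolean, deltas: Map[KeyId, Delta])

  // background gossip of buffered deltas: no reply expected
  val gossiped = DeltaPropagation("node-1", reply = false, Map("key" -> Delta("d", 1L, 2L)))
  // direct write from the WriteAggregator: receiver answers WriteAck or DeltaNack
  val directWrite = DeltaPropagation("node-1", reply = true, Map("key" -> Delta("d", 3L, 3L)))

  def main(args: Array[String]): Unit =
    Seq(gossiped, directWrite).foreach(println)
}

The real messages carry DataEnvelope payloads and UniqueAddress values; the sketch only shows where the reply flag sits.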
@@ -12721,6 +12721,24 @@ public final class ReplicatorMessages {
*/
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.EntryOrBuilder getEntriesOrBuilder(
int index);

// optional bool reply = 3;
/**
* <code>optional bool reply = 3;</code>
*
* <pre>
* no reply if not set
* </pre>
*/
boolean hasReply();
/**
* <code>optional bool reply = 3;</code>
*
* <pre>
* no reply if not set
* </pre>
*/
boolean getReply();
}

/**
* Protobuf type {@code akka.cluster.ddata.DeltaPropagation}

@@ -12794,6 +12812,11 @@ public final class ReplicatorMessages {
entries_.add(input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.PARSER, extensionRegistry));
break;
}
case 24: {
bitField0_ |= 0x00000002;
reply_ = input.readBool();
break;
}
}
}
} catch (akka.protobuf.InvalidProtocolBufferException e) {

@@ -13784,9 +13807,34 @@ public final class ReplicatorMessages {
return entries_.get(index);
}

// optional bool reply = 3;
public static final int REPLY_FIELD_NUMBER = 3;
private boolean reply_;
/**
* <code>optional bool reply = 3;</code>
*
* <pre>
* no reply if not set
* </pre>
*/
public boolean hasReply() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
* <code>optional bool reply = 3;</code>
*
* <pre>
* no reply if not set
* </pre>
*/
public boolean getReply() {
return reply_;
}

private void initFields() {
fromNode_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance();
entries_ = java.util.Collections.emptyList();
reply_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {

@@ -13820,6 +13868,9 @@ public final class ReplicatorMessages {
for (int i = 0; i < entries_.size(); i++) {
output.writeMessage(2, entries_.get(i));
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeBool(3, reply_);
}
getUnknownFields().writeTo(output);
}

@@ -13837,6 +13888,10 @@ public final class ReplicatorMessages {
size += akka.protobuf.CodedOutputStream
.computeMessageSize(2, entries_.get(i));
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += akka.protobuf.CodedOutputStream
.computeBoolSize(3, reply_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;

@@ -13967,6 +14022,8 @@ public final class ReplicatorMessages {
} else {
entriesBuilder_.clear();
}
reply_ = false;
bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@@ -14012,6 +14069,10 @@ public final class ReplicatorMessages {
} else {
result.entries_ = entriesBuilder_.build();
}
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000002;
}
result.reply_ = reply_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;

@@ -14057,6 +14118,9 @@ public final class ReplicatorMessages {
}
}
}
if (other.hasReply()) {
setReply(other.getReply());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}

@@ -14455,6 +14519,55 @@ public final class ReplicatorMessages {
return entriesBuilder_;
}

// optional bool reply = 3;
private boolean reply_ ;
/**
* <code>optional bool reply = 3;</code>
*
* <pre>
* no reply if not set
* </pre>
*/
public boolean hasReply() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional bool reply = 3;</code>
*
* <pre>
* no reply if not set
* </pre>
*/
public boolean getReply() {
return reply_;
}
/**
* <code>optional bool reply = 3;</code>
*
* <pre>
* no reply if not set
* </pre>
*/
public Builder setReply(boolean value) {
bitField0_ |= 0x00000004;
reply_ = value;
onChanged();
return this;
}
/**
* <code>optional bool reply = 3;</code>
*
* <pre>
* no reply if not set
* </pre>
*/
public Builder clearReply() {
bitField0_ = (bitField0_ & ~0x00000004);
reply_ = false;
onChanged();
return this;
}

// @@protoc_insertion_point(builder_scope:akka.cluster.ddata.DeltaPropagation)
}
@@ -19212,27 +19325,28 @@ public final class ReplicatorMessages {
" \002(\010\0221\n\007entries\030\002 \003(\0132 .akka.cluster.dda",
"ta.Gossip.Entry\032H\n\005Entry\022\013\n\003key\030\001 \002(\t\0222\n" +
"\010envelope\030\002 \002(\0132 .akka.cluster.ddata.Dat" +
"aEnvelope\"\362\001\n\020DeltaPropagation\0223\n\010fromNo" +
"aEnvelope\"\201\002\n\020DeltaPropagation\0223\n\010fromNo" +
"de\030\001 \002(\0132!.akka.cluster.ddata.UniqueAddr" +
"ess\022;\n\007entries\030\002 \003(\0132*.akka.cluster.ddat" +
"a.DeltaPropagation.Entry\032l\n\005Entry\022\013\n\003key" +
"\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132 .akka.cluster." +
"ddata.DataEnvelope\022\021\n\tfromSeqNr\030\003 \002(\003\022\017\n" +
"\007toSeqNr\030\004 \001(\003\"X\n\rUniqueAddress\022,\n\007addre" +
"ss\030\001 \002(\0132\033.akka.cluster.ddata.Address\022\013\n",
"\003uid\030\002 \002(\017\022\014\n\004uid2\030\003 \001(\017\")\n\007Address\022\020\n\010h" +
"ostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"\224\001\n\rVersionV" +
"ector\0228\n\007entries\030\001 \003(\0132\'.akka.cluster.dd" +
"ata.VersionVector.Entry\032I\n\005Entry\022/\n\004node" +
"\030\001 \002(\0132!.akka.cluster.ddata.UniqueAddres" +
"s\022\017\n\007version\030\002 \002(\003\"V\n\014OtherMessage\022\027\n\017en" +
"closedMessage\030\001 \002(\014\022\024\n\014serializerId\030\002 \002(" +
"\005\022\027\n\017messageManifest\030\004 \001(\014\"\036\n\nStringGSet" +
"\022\020\n\010elements\030\001 \003(\t\"\205\001\n\023DurableDataEnvelo" +
"pe\022.\n\004data\030\001 \002(\0132 .akka.cluster.ddata.Ot",
"herMessage\022>\n\007pruning\030\002 \003(\0132-.akka.clust" +
"er.ddata.DataEnvelope.PruningEntryB#\n\037ak" +
"ka.cluster.ddata.protobuf.msgH\001"
"a.DeltaPropagation.Entry\022\r\n\005reply\030\003 \001(\010\032" +
"l\n\005Entry\022\013\n\003key\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132" +
" .akka.cluster.ddata.DataEnvelope\022\021\n\tfro" +
"mSeqNr\030\003 \002(\003\022\017\n\007toSeqNr\030\004 \001(\003\"X\n\rUniqueA" +
"ddress\022,\n\007address\030\001 \002(\0132\033.akka.cluster.d",
"data.Address\022\013\n\003uid\030\002 \002(\017\022\014\n\004uid2\030\003 \001(\017\"" +
")\n\007Address\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002" +
"(\r\"\224\001\n\rVersionVector\0228\n\007entries\030\001 \003(\0132\'." +
"akka.cluster.ddata.VersionVector.Entry\032I" +
"\n\005Entry\022/\n\004node\030\001 \002(\0132!.akka.cluster.dda" +
"ta.UniqueAddress\022\017\n\007version\030\002 \002(\003\"V\n\014Oth" +
"erMessage\022\027\n\017enclosedMessage\030\001 \002(\014\022\024\n\014se" +
"rializerId\030\002 \002(\005\022\027\n\017messageManifest\030\004 \001(" +
"\014\"\036\n\nStringGSet\022\020\n\010elements\030\001 \003(\t\"\205\001\n\023Du" +
"rableDataEnvelope\022.\n\004data\030\001 \002(\0132 .akka.c",
"luster.ddata.OtherMessage\022>\n\007pruning\030\002 \003" +
"(\0132-.akka.cluster.ddata.DataEnvelope.Pru" +
"ningEntryB#\n\037akka.cluster.ddata.protobuf" +
".msgH\001"
};
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

@@ -19346,7 +19460,7 @@ public final class ReplicatorMessages {
internal_static_akka_cluster_ddata_DeltaPropagation_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_cluster_ddata_DeltaPropagation_descriptor,
new java.lang.String[] { "FromNode", "Entries", });
new java.lang.String[] { "FromNode", "Entries", "Reply", });
internal_static_akka_cluster_ddata_DeltaPropagation_Entry_descriptor =
internal_static_akka_cluster_ddata_DeltaPropagation_descriptor.getNestedTypes().get(0);
internal_static_akka_cluster_ddata_DeltaPropagation_Entry_fieldAccessorTable = new
@@ -107,6 +107,7 @@ message DeltaPropagation {

required UniqueAddress fromNode = 1;
repeated Entry entries = 2;
optional bool reply = 3; // no reply if not set
}

message UniqueAddress {
@@ -743,7 +743,8 @@ object Replicator {
final case class Gossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean) extends ReplicatorMessage

final case class Delta(dataEnvelope: DataEnvelope, fromSeqNr: Long, toSeqNr: Long)
final case class DeltaPropagation(fromNode: UniqueAddress, deltas: Map[KeyId, Delta]) extends ReplicatorMessage
final case class DeltaPropagation(fromNode: UniqueAddress, reply: Boolean, deltas: Map[KeyId, Delta]) extends ReplicatorMessage
case object DeltaNack extends ReplicatorMessage with DeadLetterSuppression

}
}

@@ -990,7 +991,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
override def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation = {
// Important to include the pruning state in the deltas. For example if the delta is based
// on an entry that has been pruned but that has not yet been performed on the target node.
DeltaPropagation(selfUniqueAddress, deltas.map {
DeltaPropagation(selfUniqueAddress, reply = false, deltas.map {
case (key, (d, fromSeqNr, toSeqNr)) ⇒ getData(key) match {
case Some(envelope) ⇒ key → Delta(envelope.copy(data = d), fromSeqNr, toSeqNr)
case None ⇒ key → Delta(DataEnvelope(d), fromSeqNr, toSeqNr)
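The comment in createDeltaPropagation above gives the reason for wrapping the delta in the key's existing envelope when there is one: the envelope carries the pruning state, which the target node may not have applied yet. A hedged sketch of that choice (toy types, not the real DataEnvelope):

// Toy illustration of the choice above (simplified stand-ins, not the real
// akka-distributed-data types): if local state already exists for the key, the delta
// rides in a copy of that envelope so the receiver also sees the pruning metadata;
// otherwise a fresh envelope is used.
object DeltaEnvelopeSketch {
  final case class Envelope(data: String, pruning: Set[String] = Set.empty)
  final case class Delta(envelope: Envelope, fromSeqNr: Long, toSeqNr: Long)

  def deltaFor(existing: Option[Envelope], delta: String, fromSeqNr: Long, toSeqNr: Long): Delta =
    existing match {
      case Some(env) => Delta(env.copy(data = delta), fromSeqNr, toSeqNr) // keeps env.pruning
      case None      => Delta(Envelope(delta), fromSeqNr, toSeqNr)
    }

  def main(args: Array[String]): Unit = {
    println(deltaFor(Some(Envelope("full", Set("removedNode"))), "delta", 3L, 3L))
    println(deltaFor(None, "delta", 1L, 1L))
  }
}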
@@ -1149,7 +1150,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case Read(key) ⇒ receiveRead(key)
case Write(key, envelope) ⇒ receiveWrite(key, envelope)
case ReadRepair(key, envelope) ⇒ receiveReadRepair(key, envelope)
case DeltaPropagation(from, deltas) ⇒ receiveDeltaPropagation(from, deltas)
case DeltaPropagation(from, reply, deltas) ⇒ receiveDeltaPropagation(from, reply, deltas)
case FlushChanges ⇒ receiveFlushChanges()
case DeltaPropagationTick ⇒ receiveDeltaPropagationTick()
case GossipTick ⇒ receiveGossipTick()

@@ -1242,13 +1243,16 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
else
replyTo ! UpdateSuccess(key, req)
} else {
val writeEnvelope = delta match {
case Some(d: RequiresCausalDeliveryOfDeltas) ⇒ newEnvelope
case Some(d) ⇒ DataEnvelope(d)
case None ⇒ newEnvelope
val (writeEnvelope, writeDelta) = delta match {
case Some(d: RequiresCausalDeliveryOfDeltas) ⇒
val v = deltaPropagationSelector.currentVersion(key.id)
(newEnvelope, Some(Delta(newEnvelope.copy(data = d), v, v)))
case Some(d) ⇒ (newEnvelope.copy(data = d), None)
case None ⇒ (newEnvelope, None)
}
val writeAggregator =
context.actorOf(WriteAggregator.props(key, writeEnvelope, writeConsistency, req, nodes, unreachable, replyTo, durable)
context.actorOf(WriteAggregator.props(key, writeEnvelope, writeDelta, writeConsistency,
req, nodes, unreachable, replyTo, durable)
.withDispatcher(context.props.dispatcher))
if (durable) {
durableStore ! Store(key.id, new DurableDataEnvelope(newEnvelope),
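In the Update path above, the kind of delta decides what the WriteAggregator is given: a delta that requires causal delivery is passed along as a Delta stamped with the current version, next to the full-state envelope it can fall back to; a plain delta is written directly inside the envelope; and with no delta the full state is written as before. A rough, self-contained sketch of that three-way choice (illustrative types only, not the real internal API):

// Hedged sketch of the decision above: which envelope goes into the direct Write and
// whether a Delta is also handed to the WriteAggregator so it can try delta propagation first.
object DirectWriteChoiceSketch {
  final case class DataEnvelope(data: String)
  final case class Delta(envelope: DataEnvelope, fromSeqNr: Long, toSeqNr: Long)
  final case class SomeDelta(value: String, requiresCausalDelivery: Boolean)

  def choose(newEnvelope: DataEnvelope, delta: Option[SomeDelta], currentVersion: Long): (DataEnvelope, Option[Delta]) =
    delta match {
      // causal delivery needed: write the full merged state, but also offer a Delta
      // stamped with the current version so receivers can check ordering
      case Some(d) if d.requiresCausalDelivery =>
        (newEnvelope, Some(Delta(newEnvelope.copy(data = d.value), currentVersion, currentVersion)))
      // plain delta-CRDT: the delta itself can be written directly
      case Some(d) => (newEnvelope.copy(data = d.value), None)
      // no delta produced: fall back to the full state
      case None => (newEnvelope, None)
    }

  def main(args: Array[String]): Unit = {
    println(choose(DataEnvelope("full"), Some(SomeDelta("d", requiresCausalDelivery = true)), 7L))
    println(choose(DataEnvelope("full"), Some(SomeDelta("d", requiresCausalDelivery = false)), 7L))
    println(choose(DataEnvelope("full"), None, 7L))
  }
}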
@@ -1274,14 +1278,20 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case _ ⇒ false
}

def receiveWrite(key: KeyId, envelope: DataEnvelope): Unit = {
write(key, envelope) match {
def receiveWrite(key: KeyId, envelope: DataEnvelope): Unit =
writeAndStore(key, envelope, reply = true)

def writeAndStore(key: KeyId, writeEnvelope: DataEnvelope, reply: Boolean): Unit = {
write(key, writeEnvelope) match {
case Some(newEnvelope) ⇒
if (isDurable(key))
durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), Some(StoreReply(WriteAck, WriteNack, replyTo)))
else
if (isDurable(key)) {
val storeReply = if (reply) Some(StoreReply(WriteAck, WriteNack, replyTo)) else None
durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), storeReply)
} else if (reply)
replyTo ! WriteAck
case None ⇒
if (reply)
replyTo ! WriteNack
}
}

@@ -1316,17 +1326,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}

def writeAndStore(key: KeyId, writeEnvelope: DataEnvelope): Unit = {
write(key, writeEnvelope) match {
case Some(newEnvelope) ⇒
if (isDurable(key))
durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None)
case None ⇒
}
}

def receiveReadRepair(key: KeyId, writeEnvelope: DataEnvelope): Unit = {
writeAndStore(key, writeEnvelope)
writeAndStore(key, writeEnvelope, reply = false)
replyTo ! ReadRepairAck
}

@@ -1353,7 +1354,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
replyTo ! DeleteSuccess(key, req)
} else {
val writeAggregator =
context.actorOf(WriteAggregator.props(key, DeletedEnvelope, consistency, req, nodes, unreachable, replyTo, durable)
context.actorOf(WriteAggregator.props(key, DeletedEnvelope, None, consistency, req, nodes, unreachable, replyTo, durable)
.withDispatcher(context.props.dispatcher))
if (durable) {
durableStore ! Store(key.id, new DurableDataEnvelope(DeletedEnvelope),

@@ -1464,7 +1465,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
deltaPropagationSelector.cleanupDeltaEntries()
}

def receiveDeltaPropagation(fromNode: UniqueAddress, deltas: Map[KeyId, Delta]): Unit =
def receiveDeltaPropagation(fromNode: UniqueAddress, reply: Boolean, deltas: Map[KeyId, Delta]): Unit =
if (deltaCrdtEnabled) {
try {
val isDebugEnabled = log.isDebugEnabled

@@ -1485,20 +1486,22 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
if (isDebugEnabled) log.debug(
"Skipping DeltaPropagation from [{}] for [{}] because toSeqNr [{}] already handled [{}]",
fromNode.address, key, toSeqNr, currentSeqNr)
if (reply) replyTo ! WriteAck
} else if (fromSeqNr > (currentSeqNr + 1)) {
if (isDebugEnabled) log.debug(
"Skipping DeltaPropagation from [{}] for [{}] because missing deltas between [{}-{}]",
fromNode.address, key, currentSeqNr + 1, fromSeqNr - 1)
if (reply) replyTo ! DeltaNack
} else {
if (isDebugEnabled) log.debug(
"Applying DeltaPropagation from [{}] for [{}] with sequence numbers [{}], current was [{}]",
fromNode.address, key, s"$fromSeqNr-$toSeqNr", currentSeqNr)
val newEnvelope = envelope.copy(deltaVersions = VersionVector(fromNode, toSeqNr))
writeAndStore(key, newEnvelope)
writeAndStore(key, newEnvelope, reply)
}
case (key, Delta(envelope, _, _)) ⇒
// causal delivery of deltas not needed, just apply it
writeAndStore(key, envelope)
writeAndStore(key, envelope, reply)
}
}
} catch {

@@ -1507,6 +1510,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// mixing nodes with incompatible delta-CRDT types
log.warning("Couldn't process DeltaPropagation from [] due to {}", fromNode, e)
}
} else {
// !deltaCrdtEnabled
if (reply) replyTo ! DeltaNack
}

def receiveGossipTick(): Unit = {
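The receiveDeltaPropagation hunks above add the reply handling for direct writes of causally-delivered deltas: a delta whose range was already covered is acked without being re-applied, a gap in sequence numbers is answered with DeltaNack so the sender falls back to full state, and a contiguous delta is applied and stored (deltas that do not require causal delivery are simply applied). A self-contained sketch of that sequence-number check (illustrative names, not the real Replicator internals):

// Sketch of the window check above for a single key requiring causal delivery of deltas.
object DeltaSeqNrCheckSketch {
  sealed trait Outcome
  case object AlreadyHandledAck extends Outcome // toSeqNr <= currentSeqNr: ack if reply expected
  case object MissingDeltasNack extends Outcome // gap before fromSeqNr: sender retries with full state
  case object Apply extends Outcome             // contiguous: apply delta and store

  def decide(currentSeqNr: Long, fromSeqNr: Long, toSeqNr: Long): Outcome =
    if (toSeqNr <= currentSeqNr) AlreadyHandledAck
    else if (fromSeqNr > currentSeqNr + 1) MissingDeltasNack
    else Apply

  def main(args: Array[String]): Unit = {
    println(decide(currentSeqNr = 5, fromSeqNr = 3, toSeqNr = 5)) // AlreadyHandledAck
    println(decide(currentSeqNr = 5, fromSeqNr = 8, toSeqNr = 9)) // MissingDeltasNack
    println(decide(currentSeqNr = 5, fromSeqNr = 6, toSeqNr = 7)) // Apply
  }
}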
@@ -1583,7 +1589,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
updatedData.foreach {
case (key, envelope) ⇒
val hadData = dataEntries.contains(key)
writeAndStore(key, envelope)
writeAndStore(key, envelope, reply = false)
if (sendBack) getData(key) match {
case Some(d) ⇒
if (hadData || d.pruning.nonEmpty)

@@ -1842,13 +1848,14 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def props(
key: KeyR,
envelope: Replicator.Internal.DataEnvelope,
delta: Option[Replicator.Internal.Delta],
consistency: Replicator.WriteConsistency,
req: Option[Any],
nodes: Set[Address],
unreachable: Set[Address],
replyTo: ActorRef,
durable: Boolean): Props =
Props(new WriteAggregator(key, envelope, consistency, req, nodes, unreachable, replyTo, durable))
Props(new WriteAggregator(key, envelope, delta, consistency, req, nodes, unreachable, replyTo, durable))
.withDeploy(Deploy.local)
}

@@ -1858,6 +1865,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
@InternalApi private[akka] class WriteAggregator(
key: KeyR,
envelope: Replicator.Internal.DataEnvelope,
delta: Option[Replicator.Internal.Delta],
consistency: Replicator.WriteConsistency,
req: Option[Any],
override val nodes: Set[Address],

@@ -1869,6 +1877,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
import Replicator.Internal._
import ReadWriteAggregator._

val selfUniqueAddress = Cluster(context.system).selfUniqueAddress

override def timeout: FiniteDuration = consistency.timeout

override val doneWhenRemainingSize = consistency match {

@@ -1883,12 +1893,20 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}

val writeMsg = Write(key.id, envelope)
val deltaMsg = delta match {
case None ⇒ None
case Some(d) ⇒ Some(DeltaPropagation(selfUniqueAddress, reply = true, Map(key.id → d)))
}

var gotLocalStoreReply = !durable
var gotWriteNackFrom = Set.empty[Address]

override def preStart(): Unit = {
primaryNodes.foreach { replica(_) ! writeMsg }
val msg = deltaMsg match {
case Some(d) ⇒ d
case None ⇒ writeMsg
}
primaryNodes.foreach { replica(_) ! msg }

if (isDone) reply(isTimeout = false)
}

@@ -1900,16 +1918,26 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case WriteNack ⇒
gotWriteNackFrom += senderAddress()
if (isDone) reply(isTimeout = false)
case DeltaNack ⇒
// ok, will be retried with full state

case _: Replicator.UpdateSuccess[_] ⇒
gotLocalStoreReply = true
if (isDone) reply(isTimeout = false)
case f: Replicator.StoreFailure[_] ⇒
gotLocalStoreReply = true
gotWriteNackFrom += Cluster(context.system).selfAddress
gotWriteNackFrom += selfUniqueAddress.address
if (isDone) reply(isTimeout = false)

case SendToSecondary ⇒
deltaMsg match {
case None ⇒
case Some(d) ⇒
// Deltas must be applied in order and we can't keep track of ordering of
// simultaneous updates so there is a chance that the delta could not be applied.
// Try again with the full state to the primary nodes that have not acked.
primaryNodes.toSet.intersect(remaining).foreach { replica(_) ! writeMsg }
}
secondaryNodes.foreach { replica(_) ! writeMsg }
case ReceiveTimeout ⇒
reply(isTimeout = true)

@@ -1934,7 +1962,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
if (isSuccess && isDelete) DeleteSuccess(key, req)
else if (isSuccess) UpdateSuccess(key, req)
else if (isTimeoutOrNotEnoughNodes && isDelete) ReplicationDeleteFailure(key, req)
else if (isTimeoutOrNotEnoughNodes) UpdateTimeout(key, req)
else if (isTimeoutOrNotEnoughNodes || !durable) UpdateTimeout(key, req)
else StoreFailure(key, req)

replyTo.tell(replyMsg, context.parent)
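Taken together, the WriteAggregator hunks above implement the delta-first direct write: when the update produced a causally-delivered delta, the primary nodes first receive DeltaPropagation with reply = true; WriteAck counts toward the requested consistency, while DeltaNack or silence means the delta could not be applied in order, so the escalation round resends the full-state Write to the primaries that have not acked, and, as before, to the secondaries. A hedged, self-contained sketch of that send strategy (illustrative names, not the real WriteAggregator):

// Round 1 sends the delta (expecting a reply) to the primary nodes; the escalation
// round falls back to the full-state Write for un-acked primaries and for secondaries.
object DeltaFirstWriteSketch {
  sealed trait Msg
  case object DeltaPropagationMsg extends Msg // DeltaPropagation(.., reply = true, ..)
  case object FullStateWrite extends Msg      // Write(key, envelope)

  def firstRound(primaries: Set[String], deltaAvailable: Boolean): Map[String, Msg] =
    primaries.map(_ -> (if (deltaAvailable) DeltaPropagationMsg else FullStateWrite)).toMap

  def escalationRound(primaries: Set[String], secondaries: Set[String],
                      acked: Set[String], deltaAvailable: Boolean): Map[String, Msg] = {
    // primaries that have not acked the delta are retried with the full state
    val retries =
      if (deltaAvailable) (primaries -- acked).map(_ -> (FullStateWrite: Msg))
      else Set.empty[(String, Msg)]
    (retries ++ secondaries.map(_ -> (FullStateWrite: Msg))).toMap
  }

  def main(args: Array[String]): Unit = {
    println(firstRound(Set("n1", "n2"), deltaAvailable = true))
    println(escalationRound(Set("n1", "n2"), Set("n3"), acked = Set("n1"), deltaAvailable = true))
  }
}

The design keeps the fast path cheap, since only the delta travels, while the existing full-state write remains the safety net.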
@@ -177,6 +177,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
val WriteNackManifest = "O"
val DurableDataEnvelopeManifest = "P"
val DeltaPropagationManifest = "Q"
val DeltaNackManifest = "R"

private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] ⇒ AnyRef](
GetManifest → getFromBinary,

@@ -195,6 +196,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
GossipManifest → gossipFromBinary,
DeltaPropagationManifest → deltaPropagationFromBinary,
WriteNackManifest → (_ ⇒ WriteNack),
DeltaNackManifest → (_ ⇒ DeltaNack),
DurableDataEnvelopeManifest → durableDataEnvelopeFromBinary)

override def manifest(obj: AnyRef): String = obj match {

@@ -215,6 +217,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
case _: Unsubscribe[_] ⇒ UnsubscribeManifest
case _: Gossip ⇒ GossipManifest
case WriteNack ⇒ WriteNackManifest
case DeltaNack ⇒ DeltaNackManifest
case _ ⇒
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}

@@ -237,6 +240,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
case m: Unsubscribe[_] ⇒ unsubscribeToProto(m).toByteArray
case m: Gossip ⇒ compress(gossipToProto(m))
case WriteNack ⇒ dm.Empty.getDefaultInstance.toByteArray
case DeltaNack ⇒ dm.Empty.getDefaultInstance.toByteArray
case _ ⇒
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}

@@ -290,6 +294,8 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
private def deltaPropagationToProto(deltaPropagation: DeltaPropagation): dm.DeltaPropagation = {
val b = dm.DeltaPropagation.newBuilder()
.setFromNode(uniqueAddressToProto(deltaPropagation.fromNode))
if (deltaPropagation.reply)
b.setReply(deltaPropagation.reply)
val entries = deltaPropagation.deltas.foreach {
case (key, Delta(data, fromSeqNr, toSeqNr)) ⇒
val b2 = dm.DeltaPropagation.Entry.newBuilder()

@@ -305,8 +311,10 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)

private def deltaPropagationFromBinary(bytes: Array[Byte]): DeltaPropagation = {
val deltaPropagation = dm.DeltaPropagation.parseFrom(bytes)
val reply = deltaPropagation.hasReply && deltaPropagation.getReply
DeltaPropagation(
uniqueAddressFromProto(deltaPropagation.getFromNode),
reply,
deltaPropagation.getEntriesList.asScala.map { e ⇒
val fromSeqNr = e.getFromSeqNr
val toSeqNr = if (e.hasToSeqNr) e.getToSeqNr else fromSeqNr
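The serializer above only sets the protobuf reply field when it is true and, on the way back, reads an absent field as false, matching the "no reply if not set" comment in the .proto change; senders that never set the field therefore still produce messages that deserialize with reply = false. A minimal sketch of that convention, using Option[Boolean] as a stand-in for the optional protobuf field (not the generated ReplicatorMessages API):

object OptionalReplyFieldSketch {
  final case class WireDeltaPropagation(fromNode: String, reply: Option[Boolean], entries: Map[String, String])

  def toWire(fromNode: String, reply: Boolean, entries: Map[String, String]): WireDeltaPropagation =
    // only set the field when true, mirroring `if (deltaPropagation.reply) b.setReply(...)`
    WireDeltaPropagation(fromNode, if (reply) Some(true) else None, entries)

  def fromWire(msg: WireDeltaPropagation): Boolean =
    // mirrors `deltaPropagation.hasReply && deltaPropagation.getReply`
    msg.reply.getOrElse(false)

  def main(args: Array[String]): Unit = {
    println(fromWire(toWire("nodeA", reply = true, Map("key" -> "delta"))))  // true
    println(fromWire(toWire("nodeA", reply = false, Map("key" -> "delta")))) // false (field absent)
  }
}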
@@ -22,7 +22,7 @@ object ReplicatorDeltaSpec extends MultiNodeConfig {
val fourth = role("fourth")

commonConfig(ConfigFactory.parseString("""
akka.loglevel = DEBUG
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.log-dead-letters-during-shutdown = off
"""))

@@ -129,6 +129,8 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult
r
}

val writeAll = WriteAll(5.seconds)

var afterCounter = 0
def enterBarrierAfterTestStep(): Unit = {
afterCounter += 1

@@ -197,7 +199,7 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult
awaitAssert {
val p = TestProbe()
List(KeyD, KeyE, KeyF).foreach { key ⇒
fullStateReplicator.tell(Get(key, ReadLocal), p.ref)
deltaReplicator.tell(Get(key, ReadLocal), p.ref)
p.expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should ===(Set("a"))
}
}

@@ -206,6 +208,48 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult
enterBarrierAfterTestStep()
}

"work with write consistency" in {
runOn(first) {
val p1 = TestProbe()
fullStateReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "A"), p1.ref)
deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "A"), p1.ref)
p1.expectMsgType[UpdateSuccess[_]]
p1.expectMsgType[UpdateSuccess[_]]
}
enterBarrier("write-1")

val p = TestProbe()
deltaReplicator.tell(Get(KeyD, ReadLocal), p.ref)
p.expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should ===(Set("a", "A"))

// and also when doing several at the same time (deltas may be reordered) and then we
// retry with full state to sort it out
runOn(first) {
val p1 = TestProbe()
deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "B"), p1.ref)
deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "C"), p1.ref)
deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "D"), p1.ref)
p1.expectMsgType[UpdateSuccess[_]]
p1.expectMsgType[UpdateSuccess[_]]
p1.expectMsgType[UpdateSuccess[_]]
}
enterBarrier("write-2")
deltaReplicator.tell(Get(KeyD, ReadLocal), p.ref)
p.expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should ===(Set("a", "A", "B", "C", "D"))

// add same to the fullStateReplicator so they are in sync
runOn(first) {
val p1 = TestProbe()
fullStateReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "A" + "B" + "C" + "D"), p1.ref)
p1.expectMsgType[UpdateSuccess[_]]
}
enterBarrier("write-3")
fullStateReplicator.tell(Get(KeyD, ReadLocal), p.ref)
p.expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should ===(Set("a", "A", "B", "C", "D"))

enterBarrierAfterTestStep()
}

"be eventually consistent" in {
val operations = generateOperations(onNode = myself)
log.debug(s"random operations on [${myself.name}]: ${operations.mkString(", ")}")
@@ -19,7 +19,7 @@ object ReplicatorORSetDeltaSpec extends MultiNodeConfig {
val third = role("third")

commonConfig(ConfigFactory.parseString("""
akka.loglevel = DEBUG
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.log-dead-letters-during-shutdown = off
"""))
@@ -19,7 +19,7 @@ object DeltaPropagationSelectorSpec {
override val allNodes: Vector[Address]) extends DeltaPropagationSelector {
override val gossipIntervalDivisor = 5
override def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation =
DeltaPropagation(selfUniqueAddress, deltas.mapValues {
DeltaPropagation(selfUniqueAddress, false, deltas.mapValues {
case (d, fromSeqNr, toSeqNr) ⇒ Delta(DataEnvelope(d), fromSeqNr, toSeqNr)
})
}

@@ -50,7 +50,7 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck
selector.cleanupDeltaEntries()
selector.hasDeltaEntries("A") should ===(true)
selector.hasDeltaEntries("B") should ===(true)
val expected = DeltaPropagation(selfUniqueAddress, Map(
val expected = DeltaPropagation(selfUniqueAddress, false, Map(
"A" → Delta(DataEnvelope(deltaA), 1L, 1L),
"B" → Delta(DataEnvelope(deltaB), 1L, 1L)))
selector.collectPropagations() should ===(Map(nodes(0) → expected))

@@ -64,7 +64,7 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck
val selector = new TestSelector(selfUniqueAddress, nodes.take(3))
selector.update("A", deltaA)
selector.update("B", deltaB)
val expected = DeltaPropagation(selfUniqueAddress, Map(
val expected = DeltaPropagation(selfUniqueAddress, false, Map(
"A" → Delta(DataEnvelope(deltaA), 1L, 1L),
"B" → Delta(DataEnvelope(deltaB), 1L, 1L)))
selector.collectPropagations() should ===(Map(nodes(0) → expected, nodes(1) → expected))

@@ -82,17 +82,17 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck
val selector = new TestSelector(selfUniqueAddress, nodes.take(3))
selector.update("A", deltaA)
selector.update("B", deltaB)
val expected1 = DeltaPropagation(selfUniqueAddress, Map(
val expected1 = DeltaPropagation(selfUniqueAddress, false, Map(
"A" → Delta(DataEnvelope(deltaA), 1L, 1L),
"B" → Delta(DataEnvelope(deltaB), 1L, 1L)))
selector.collectPropagations() should ===(Map(nodes(0) → expected1, nodes(1) → expected1))
// new update before previous was propagated to all nodes
selector.update("C", deltaC)
val expected2 = DeltaPropagation(selfUniqueAddress, Map(
val expected2 = DeltaPropagation(selfUniqueAddress, false, Map(
"A" → Delta(DataEnvelope(deltaA), 1L, 1L),
"B" → Delta(DataEnvelope(deltaB), 1L, 1L),
"C" → Delta(DataEnvelope(deltaC), 1L, 1L)))
val expected3 = DeltaPropagation(selfUniqueAddress, Map(
val expected3 = DeltaPropagation(selfUniqueAddress, false, Map(
"C" → Delta(DataEnvelope(deltaC), 1L, 1L)))
selector.collectPropagations() should ===(Map(nodes(2) → expected2, nodes(0) → expected3))
selector.cleanupDeltaEntries()

@@ -114,12 +114,12 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck
selector.currentVersion("A") should ===(1L)
selector.update("A", delta2)
selector.currentVersion("A") should ===(2L)
val expected1 = DeltaPropagation(selfUniqueAddress, Map(
val expected1 = DeltaPropagation(selfUniqueAddress, false, Map(
"A" → Delta(DataEnvelope(delta1.merge(delta2)), 1L, 2L)))
selector.collectPropagations() should ===(Map(nodes(0) → expected1))
selector.update("A", delta3)
selector.currentVersion("A") should ===(3L)
val expected2 = DeltaPropagation(selfUniqueAddress, Map(
val expected2 = DeltaPropagation(selfUniqueAddress, false, Map(
"A" → Delta(DataEnvelope(delta3), 3L, 3L)))
selector.collectPropagations() should ===(Map(nodes(0) → expected2))
selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation])

@@ -133,25 +133,25 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck
override def nodesSliceSize(allNodesSize: Int): Int = 1
}
selector.update("A", delta1)
val expected1 = DeltaPropagation(selfUniqueAddress, Map(
val expected1 = DeltaPropagation(selfUniqueAddress, false, Map(
"A" → Delta(DataEnvelope(delta1), 1L, 1L)))
selector.collectPropagations() should ===(Map(nodes(0) → expected1))

selector.update("A", delta2)
val expected2 = DeltaPropagation(selfUniqueAddress, Map(
val expected2 = DeltaPropagation(selfUniqueAddress, false, Map(
"A" → Delta(DataEnvelope(delta1.merge(delta2)), 1L, 2L)))
selector.collectPropagations() should ===(Map(nodes(1) → expected2))

selector.update("A", delta3)
val expected3 = DeltaPropagation(selfUniqueAddress, Map(
val expected3 = DeltaPropagation(selfUniqueAddress, false, Map(
"A" → Delta(DataEnvelope(delta1.merge(delta2).merge(delta3)), 1L, 3L)))
selector.collectPropagations() should ===(Map(nodes(2) → expected3))

val expected4 = DeltaPropagation(selfUniqueAddress, Map(
val expected4 = DeltaPropagation(selfUniqueAddress, false, Map(
"A" → Delta(DataEnvelope(delta2.merge(delta3)), 2L, 3L)))
selector.collectPropagations() should ===(Map(nodes(0) → expected4))

val expected5 = DeltaPropagation(selfUniqueAddress, Map(
val expected5 = DeltaPropagation(selfUniqueAddress, false, Map(
"A" → Delta(DataEnvelope(delta3), 3L, 3L)))
selector.collectPropagations() should ===(Map(nodes(1) → expected5))
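The DeltaPropagationSelectorSpec expectations above show how buffered deltas for a key are merged per destination node and labelled with a fromSeqNr-toSeqNr range: a node that has seen nothing gets delta1 merged with delta2 and delta3 as 1-3, a node already at 1 gets 2-3, and a node that is up to date gets nothing. A toy, self-contained model of that per-node accumulation (illustrative, not the real DeltaPropagationSelector):

object DeltaAccumulationSketch {
  final case class Delta(merged: Vector[String], fromSeqNr: Long, toSeqNr: Long)

  // `buffer` holds (seqNr, delta) pairs still kept for the key; `sentUpTo` is per-node progress
  def propagationFor(buffer: Vector[(Long, String)], sentUpTo: Long): Option[Delta] = {
    val pending = buffer.filter { case (seqNr, _) => seqNr > sentUpTo }
    if (pending.isEmpty) None
    else Some(Delta(pending.map(_._2), pending.head._1, pending.last._1))
  }

  def main(args: Array[String]): Unit = {
    val buffer = Vector(1L -> "delta1", 2L -> "delta2", 3L -> "delta3")
    println(propagationFor(buffer, sentUpTo = 0L)) // Some(Delta(Vector(delta1, delta2, delta3), 1, 3))
    println(propagationFor(buffer, sentUpTo = 1L)) // Some(Delta(Vector(delta2, delta3), 2, 3))
    println(propagationFor(buffer, sentUpTo = 3L)) // None
  }
}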
@@ -17,18 +17,24 @@ import akka.cluster.ddata.Replicator._
import akka.remote.RARP

import scala.concurrent.Future
import akka.cluster.Cluster

object WriteAggregatorSpec {

val key = GSetKey[String]("a")
val KeyA = GSetKey[String]("A")
val KeyB = ORSetKey[String]("B")

def writeAggregatorProps(data: GSet[String], consistency: Replicator.WriteConsistency,
probes: Map[Address, ActorRef], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean): Props =
Props(new TestWriteAggregator(data, consistency, probes, nodes, unreachable, replyTo, durable))
Props(new TestWriteAggregator(KeyA, data, None, consistency, probes, nodes, unreachable, replyTo, durable))

class TestWriteAggregator(data: GSet[String], consistency: Replicator.WriteConsistency,
def writeAggregatorPropsWithDelta(data: ORSet[String], delta: Delta, consistency: Replicator.WriteConsistency,
probes: Map[Address, ActorRef], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean): Props =
Props(new TestWriteAggregator(KeyB, data, Some(delta), consistency, probes, nodes, unreachable, replyTo, durable))

class TestWriteAggregator(key: Key.KeyR, data: ReplicatedData, delta: Option[Delta], consistency: Replicator.WriteConsistency,
probes: Map[Address, ActorRef], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean)
extends WriteAggregator(key, DataEnvelope(data), consistency, None, nodes, unreachable, replyTo, durable) {
extends WriteAggregator(key, DataEnvelope(data), delta, consistency, None, nodes, unreachable, replyTo, durable) {

override def replica(address: Address): ActorSelection =
context.actorSelection(probes(address).path)

@@ -48,6 +54,8 @@ object WriteAggregatorSpec {
replicator.foreach(_ ! WriteAck)
case WriteNack ⇒
replicator.foreach(_ ! WriteNack)
case DeltaNack ⇒
replicator.foreach(_ ! DeltaNack)
case msg ⇒
replicator = Some(sender())
replica ! msg

@@ -89,6 +97,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
val timeout = 3.seconds.dilated
val writeThree = WriteTo(3, timeout)
val writeMajority = WriteMajority(timeout)
val writeAll = WriteAll(timeout)

def probes(probe: ActorRef): Map[Address, ActorRef] =
nodes.toSeq.map(_ → system.actorOf(WriteAggregatorSpec.writeAckAdapterProps(probe))).toMap

@@ -111,7 +120,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
probe.lastSender ! WriteAck
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None))
expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyA, None))
watch(aggr)
expectTerminated(aggr)
}

@@ -132,15 +141,14 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
Future.sequence {
Seq(
Future { testProbes(nodeC).expectNoMsg(t) },
Future { testProbes(nodeD).expectNoMsg(t) }
)
Future { testProbes(nodeD).expectNoMsg(t) })
}.futureValue
testProbes(nodeC).expectMsgType[Write]
testProbes(nodeC).lastSender ! WriteAck
testProbes(nodeD).expectMsgType[Write]
testProbes(nodeD).lastSender ! WriteAck

expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None))
expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyA, None))
watch(aggr)
expectTerminated(aggr)
}

@@ -158,7 +166,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
// no reply
probe.expectMsgType[Write]
// no reply
expectMsg(UpdateTimeout(WriteAggregatorSpec.key, None))
expectMsg(UpdateTimeout(WriteAggregatorSpec.KeyA, None))
watch(aggr)
expectTerminated(aggr)
}
@@ -181,6 +189,82 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
}
}

"WriteAggregator with delta" must {
implicit val cluster = Cluster(system)
val fullState1 = ORSet.empty[String] + "a" + "b"
val fullState2 = fullState1.resetDelta + "c"
val delta = Delta(DataEnvelope(fullState2.delta.get), 2L, 2L)

"send deltas first" in {
val probe = TestProbe()
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorPropsWithDelta(
fullState2, delta, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = false))

probe.expectMsgType[DeltaPropagation]
probe.lastSender ! WriteAck
probe.expectMsgType[DeltaPropagation]
probe.lastSender ! WriteAck
expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyB, None))
watch(aggr)
expectTerminated(aggr)
}

"retry with full state when no immediate reply or nack" in {
val testProbes = probes()
val testProbeRefs = testProbes.map { case (a, tm) ⇒ a → tm.writeAckAdapter }
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorPropsWithDelta(
fullState2, delta, writeAll, testProbeRefs, nodes, Set.empty, testActor, durable = false))

testProbes(nodeA).expectMsgType[DeltaPropagation]
// no reply
testProbes(nodeB).expectMsgType[DeltaPropagation]
testProbes(nodeB).lastSender ! WriteAck
testProbes(nodeC).expectMsgType[DeltaPropagation]
testProbes(nodeC).lastSender ! WriteAck
testProbes(nodeD).expectMsgType[DeltaPropagation]
testProbes(nodeD).lastSender ! DeltaNack

// here is the second round
testProbes(nodeA).expectMsgType[Write]
testProbes(nodeA).lastSender ! WriteAck
testProbes(nodeD).expectMsgType[Write]
testProbes(nodeD).lastSender ! WriteAck
testProbes(nodeB).expectNoMsg(100.millis)
testProbes(nodeC).expectNoMsg(100.millis)

expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyB, None))
watch(aggr)
expectTerminated(aggr)
}

"timeout when less than required acks" in {
val probe = TestProbe()
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorPropsWithDelta(
fullState2, delta, writeAll, probes(probe.ref), nodes, Set.empty, testActor, durable = false))

probe.expectMsgType[DeltaPropagation]
// no reply
probe.expectMsgType[DeltaPropagation]
probe.lastSender ! WriteAck
probe.expectMsgType[DeltaPropagation]
// nack
probe.lastSender ! DeltaNack
probe.expectMsgType[DeltaPropagation]
// no reply

// only 1 ack so we expect 3 full state Write
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
probe.expectMsgType[Write]
probe.expectMsgType[Write]

// still not enough acks
expectMsg(UpdateTimeout(WriteAggregatorSpec.KeyB, None))
watch(aggr)
expectTerminated(aggr)
}
}

"Durable WriteAggregator" must {
"not reply before local confirmation" in {
val probe = TestProbe()

@@ -194,9 +278,9 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
expectNoMsg(200.millis)

// the local write
aggr ! UpdateSuccess(WriteAggregatorSpec.key, None)
aggr ! UpdateSuccess(WriteAggregatorSpec.KeyA, None)

expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None))
expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyA, None))
watch(aggr)
expectTerminated(aggr)
}

@@ -206,7 +290,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps(
data, writeThree, probes(probe.ref), nodes, Set.empty, testActor, durable = true))

aggr ! UpdateSuccess(WriteAggregatorSpec.key, None) // the local write
aggr ! UpdateSuccess(WriteAggregatorSpec.KeyA, None) // the local write
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
probe.expectMsgType[Write]

@@ -214,7 +298,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
probe.expectMsgType[Write]
probe.lastSender ! WriteAck

expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None))
expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyA, None))
watch(aggr)
expectTerminated(aggr)
}

@@ -226,7 +310,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""

probe.expectMsgType[Write]
probe.lastSender ! WriteNack
aggr ! UpdateSuccess(WriteAggregatorSpec.key, None) // the local write
aggr ! UpdateSuccess(WriteAggregatorSpec.KeyA, None) // the local write
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
probe.expectMsgType[Write]

@@ -234,7 +318,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
probe.expectMsgType[Write]
probe.lastSender ! WriteNack

expectMsg(StoreFailure(WriteAggregatorSpec.key, None))
expectMsg(StoreFailure(WriteAggregatorSpec.KeyA, None))
watch(aggr)
expectTerminated(aggr)
}

@@ -253,7 +337,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
probe.expectMsgType[Write]
probe.lastSender ! WriteNack

expectMsg(UpdateTimeout(WriteAggregatorSpec.key, None))
expectMsg(UpdateTimeout(WriteAggregatorSpec.KeyA, None))
watch(aggr)
expectTerminated(aggr)
}
@@ -80,6 +80,7 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
checkSerialization(Write("A", DataEnvelope(data1)))
checkSerialization(WriteAck)
checkSerialization(WriteNack)
checkSerialization(DeltaNack)
checkSerialization(Read("A"))
checkSerialization(ReadResult(Some(DataEnvelope(data1))))
checkSerialization(ReadResult(None))

@@ -89,7 +90,7 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
checkSerialization(Gossip(Map(
"A" → DataEnvelope(data1),
"B" → DataEnvelope(GSet() + "b" + "c")), sendBack = true))
checkSerialization(DeltaPropagation(address1, Map(
checkSerialization(DeltaPropagation(address1, reply = true, Map(
"A" → Delta(DataEnvelope(delta1), 1L, 1L),
"B" → Delta(DataEnvelope(delta2), 3L, 5L))))