diff --git a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java
index 63c51ac058..d4e0dcde92 100644
--- a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java
+++ b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java
@@ -12721,6 +12721,24 @@ public final class ReplicatorMessages {
*/
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.EntryOrBuilder getEntriesOrBuilder(
int index);
+
+ // optional bool reply = 3;
+ /**
+ * optional bool reply = 3;
+ *
+ *
+ * no reply if not set + *+ */ + boolean hasReply(); + /** + *
optional bool reply = 3;
+ *
+ * + * no reply if not set + *+ */ + boolean getReply(); } /** * Protobuf type {@code akka.cluster.ddata.DeltaPropagation} @@ -12794,6 +12812,11 @@ public final class ReplicatorMessages { entries_.add(input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.PARSER, extensionRegistry)); break; } + case 24: { + bitField0_ |= 0x00000002; + reply_ = input.readBool(); + break; + } } } } catch (akka.protobuf.InvalidProtocolBufferException e) { @@ -13784,9 +13807,34 @@ public final class ReplicatorMessages { return entries_.get(index); } + // optional bool reply = 3; + public static final int REPLY_FIELD_NUMBER = 3; + private boolean reply_; + /** + *
optional bool reply = 3;
+ *
+ * + * no reply if not set + *+ */ + public boolean hasReply() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + *
optional bool reply = 3;
+ *
+ * + * no reply if not set + *+ */ + public boolean getReply() { + return reply_; + } + private void initFields() { fromNode_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance(); entries_ = java.util.Collections.emptyList(); + reply_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -13820,6 +13868,9 @@ public final class ReplicatorMessages { for (int i = 0; i < entries_.size(); i++) { output.writeMessage(2, entries_.get(i)); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBool(3, reply_); + } getUnknownFields().writeTo(output); } @@ -13837,6 +13888,10 @@ public final class ReplicatorMessages { size += akka.protobuf.CodedOutputStream .computeMessageSize(2, entries_.get(i)); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += akka.protobuf.CodedOutputStream + .computeBoolSize(3, reply_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -13967,6 +14022,8 @@ public final class ReplicatorMessages { } else { entriesBuilder_.clear(); } + reply_ = false; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -14012,6 +14069,10 @@ public final class ReplicatorMessages { } else { result.entries_ = entriesBuilder_.build(); } + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000002; + } + result.reply_ = reply_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -14057,6 +14118,9 @@ public final class ReplicatorMessages { } } } + if (other.hasReply()) { + setReply(other.getReply()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -14455,6 +14519,55 @@ public final class ReplicatorMessages { return entriesBuilder_; } + // optional bool reply = 3; + private boolean reply_ ; + /** + *
optional bool reply = 3;
+ *
+ * + * no reply if not set + *+ */ + public boolean hasReply() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + *
optional bool reply = 3;
+ *
+ * + * no reply if not set + *+ */ + public boolean getReply() { + return reply_; + } + /** + *
optional bool reply = 3;
+ *
+ * + * no reply if not set + *+ */ + public Builder setReply(boolean value) { + bitField0_ |= 0x00000004; + reply_ = value; + onChanged(); + return this; + } + /** + *
optional bool reply = 3;
+ *
+ * + * no reply if not set + *+ */ + public Builder clearReply() { + bitField0_ = (bitField0_ & ~0x00000004); + reply_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:akka.cluster.ddata.DeltaPropagation) } @@ -19212,27 +19325,28 @@ public final class ReplicatorMessages { " \002(\010\0221\n\007entries\030\002 \003(\0132 .akka.cluster.dda", "ta.Gossip.Entry\032H\n\005Entry\022\013\n\003key\030\001 \002(\t\0222\n" + "\010envelope\030\002 \002(\0132 .akka.cluster.ddata.Dat" + - "aEnvelope\"\362\001\n\020DeltaPropagation\0223\n\010fromNo" + + "aEnvelope\"\201\002\n\020DeltaPropagation\0223\n\010fromNo" + "de\030\001 \002(\0132!.akka.cluster.ddata.UniqueAddr" + "ess\022;\n\007entries\030\002 \003(\0132*.akka.cluster.ddat" + - "a.DeltaPropagation.Entry\032l\n\005Entry\022\013\n\003key" + - "\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132 .akka.cluster." + - "ddata.DataEnvelope\022\021\n\tfromSeqNr\030\003 \002(\003\022\017\n" + - "\007toSeqNr\030\004 \001(\003\"X\n\rUniqueAddress\022,\n\007addre" + - "ss\030\001 \002(\0132\033.akka.cluster.ddata.Address\022\013\n", - "\003uid\030\002 \002(\017\022\014\n\004uid2\030\003 \001(\017\")\n\007Address\022\020\n\010h" + - "ostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"\224\001\n\rVersionV" + - "ector\0228\n\007entries\030\001 \003(\0132\'.akka.cluster.dd" + - "ata.VersionVector.Entry\032I\n\005Entry\022/\n\004node" + - "\030\001 \002(\0132!.akka.cluster.ddata.UniqueAddres" + - "s\022\017\n\007version\030\002 \002(\003\"V\n\014OtherMessage\022\027\n\017en" + - "closedMessage\030\001 \002(\014\022\024\n\014serializerId\030\002 \002(" + - "\005\022\027\n\017messageManifest\030\004 \001(\014\"\036\n\nStringGSet" + - "\022\020\n\010elements\030\001 \003(\t\"\205\001\n\023DurableDataEnvelo" + - "pe\022.\n\004data\030\001 \002(\0132 .akka.cluster.ddata.Ot", - "herMessage\022>\n\007pruning\030\002 \003(\0132-.akka.clust" + - "er.ddata.DataEnvelope.PruningEntryB#\n\037ak" + - "ka.cluster.ddata.protobuf.msgH\001" + "a.DeltaPropagation.Entry\022\r\n\005reply\030\003 \001(\010\032" + + "l\n\005Entry\022\013\n\003key\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132" + + " .akka.cluster.ddata.DataEnvelope\022\021\n\tfro" + + "mSeqNr\030\003 \002(\003\022\017\n\007toSeqNr\030\004 \001(\003\"X\n\rUniqueA" + + "ddress\022,\n\007address\030\001 \002(\0132\033.akka.cluster.d", + "data.Address\022\013\n\003uid\030\002 \002(\017\022\014\n\004uid2\030\003 \001(\017\"" + + ")\n\007Address\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002" + + "(\r\"\224\001\n\rVersionVector\0228\n\007entries\030\001 \003(\0132\'." + + "akka.cluster.ddata.VersionVector.Entry\032I" + + "\n\005Entry\022/\n\004node\030\001 \002(\0132!.akka.cluster.dda" + + "ta.UniqueAddress\022\017\n\007version\030\002 \002(\003\"V\n\014Oth" + + "erMessage\022\027\n\017enclosedMessage\030\001 \002(\014\022\024\n\014se" + + "rializerId\030\002 \002(\005\022\027\n\017messageManifest\030\004 \001(" + + "\014\"\036\n\nStringGSet\022\020\n\010elements\030\001 \003(\t\"\205\001\n\023Du" + + "rableDataEnvelope\022.\n\004data\030\001 \002(\0132 .akka.c", + "luster.ddata.OtherMessage\022>\n\007pruning\030\002 \003" + + "(\0132-.akka.cluster.ddata.DataEnvelope.Pru" + + "ningEntryB#\n\037akka.cluster.ddata.protobuf" + + ".msgH\001" }; akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -19346,7 +19460,7 @@ public final class ReplicatorMessages { internal_static_akka_cluster_ddata_DeltaPropagation_fieldAccessorTable = new akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_akka_cluster_ddata_DeltaPropagation_descriptor, - new java.lang.String[] { "FromNode", "Entries", }); + new java.lang.String[] { "FromNode", "Entries", "Reply", }); internal_static_akka_cluster_ddata_DeltaPropagation_Entry_descriptor = internal_static_akka_cluster_ddata_DeltaPropagation_descriptor.getNestedTypes().get(0); internal_static_akka_cluster_ddata_DeltaPropagation_Entry_fieldAccessorTable = new diff --git a/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto b/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto index 0582fac91b..0c83967b07 100644 --- a/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto +++ b/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto @@ -107,6 +107,7 @@ message DeltaPropagation { required UniqueAddress fromNode = 1; repeated Entry entries = 2; + optional bool reply = 3; // no reply if not set } message UniqueAddress { diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index 0a96101ad0..5e85fa294e 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -743,7 +743,8 @@ object Replicator { final case class Gossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean) extends ReplicatorMessage final case class Delta(dataEnvelope: DataEnvelope, fromSeqNr: Long, toSeqNr: Long) - final case class DeltaPropagation(fromNode: UniqueAddress, deltas: Map[KeyId, Delta]) extends ReplicatorMessage + final case class DeltaPropagation(fromNode: UniqueAddress, reply: Boolean, deltas: Map[KeyId, Delta]) extends ReplicatorMessage + case object DeltaNack extends ReplicatorMessage with DeadLetterSuppression } } @@ -990,7 +991,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog override def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation = { // Important to include the pruning state in the deltas. For example if the delta is based // on an entry that has been pruned but that has not yet been performed on the target node. - DeltaPropagation(selfUniqueAddress, deltas.map { + DeltaPropagation(selfUniqueAddress, reply = false, deltas.map { case (key, (d, fromSeqNr, toSeqNr)) ⇒ getData(key) match { case Some(envelope) ⇒ key → Delta(envelope.copy(data = d), fromSeqNr, toSeqNr) case None ⇒ key → Delta(DataEnvelope(d), fromSeqNr, toSeqNr) @@ -1149,7 +1150,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog case Read(key) ⇒ receiveRead(key) case Write(key, envelope) ⇒ receiveWrite(key, envelope) case ReadRepair(key, envelope) ⇒ receiveReadRepair(key, envelope) - case DeltaPropagation(from, deltas) ⇒ receiveDeltaPropagation(from, deltas) + case DeltaPropagation(from, reply, deltas) ⇒ receiveDeltaPropagation(from, reply, deltas) case FlushChanges ⇒ receiveFlushChanges() case DeltaPropagationTick ⇒ receiveDeltaPropagationTick() case GossipTick ⇒ receiveGossipTick() @@ -1242,13 +1243,16 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog else replyTo ! UpdateSuccess(key, req) } else { - val writeEnvelope = delta match { - case Some(d: RequiresCausalDeliveryOfDeltas) ⇒ newEnvelope - case Some(d) ⇒ DataEnvelope(d) - case None ⇒ newEnvelope + val (writeEnvelope, writeDelta) = delta match { + case Some(d: RequiresCausalDeliveryOfDeltas) ⇒ + val v = deltaPropagationSelector.currentVersion(key.id) + (newEnvelope, Some(Delta(newEnvelope.copy(data = d), v, v))) + case Some(d) ⇒ (newEnvelope.copy(data = d), None) + case None ⇒ (newEnvelope, None) } val writeAggregator = - context.actorOf(WriteAggregator.props(key, writeEnvelope, writeConsistency, req, nodes, unreachable, replyTo, durable) + context.actorOf(WriteAggregator.props(key, writeEnvelope, writeDelta, writeConsistency, + req, nodes, unreachable, replyTo, durable) .withDispatcher(context.props.dispatcher)) if (durable) { durableStore ! Store(key.id, new DurableDataEnvelope(newEnvelope), @@ -1274,14 +1278,20 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog case _ ⇒ false } - def receiveWrite(key: KeyId, envelope: DataEnvelope): Unit = { - write(key, envelope) match { + def receiveWrite(key: KeyId, envelope: DataEnvelope): Unit = + writeAndStore(key, envelope, reply = true) + + def writeAndStore(key: KeyId, writeEnvelope: DataEnvelope, reply: Boolean): Unit = { + write(key, writeEnvelope) match { case Some(newEnvelope) ⇒ - if (isDurable(key)) - durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), Some(StoreReply(WriteAck, WriteNack, replyTo))) - else + if (isDurable(key)) { + val storeReply = if (reply) Some(StoreReply(WriteAck, WriteNack, replyTo)) else None + durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), storeReply) + } else if (reply) replyTo ! WriteAck case None ⇒ + if (reply) + replyTo ! WriteNack } } @@ -1316,17 +1326,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } } - def writeAndStore(key: KeyId, writeEnvelope: DataEnvelope): Unit = { - write(key, writeEnvelope) match { - case Some(newEnvelope) ⇒ - if (isDurable(key)) - durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None) - case None ⇒ - } - } - def receiveReadRepair(key: KeyId, writeEnvelope: DataEnvelope): Unit = { - writeAndStore(key, writeEnvelope) + writeAndStore(key, writeEnvelope, reply = false) replyTo ! ReadRepairAck } @@ -1353,7 +1354,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog replyTo ! DeleteSuccess(key, req) } else { val writeAggregator = - context.actorOf(WriteAggregator.props(key, DeletedEnvelope, consistency, req, nodes, unreachable, replyTo, durable) + context.actorOf(WriteAggregator.props(key, DeletedEnvelope, None, consistency, req, nodes, unreachable, replyTo, durable) .withDispatcher(context.props.dispatcher)) if (durable) { durableStore ! Store(key.id, new DurableDataEnvelope(DeletedEnvelope), @@ -1464,7 +1465,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog deltaPropagationSelector.cleanupDeltaEntries() } - def receiveDeltaPropagation(fromNode: UniqueAddress, deltas: Map[KeyId, Delta]): Unit = + def receiveDeltaPropagation(fromNode: UniqueAddress, reply: Boolean, deltas: Map[KeyId, Delta]): Unit = if (deltaCrdtEnabled) { try { val isDebugEnabled = log.isDebugEnabled @@ -1485,20 +1486,22 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog if (isDebugEnabled) log.debug( "Skipping DeltaPropagation from [{}] for [{}] because toSeqNr [{}] already handled [{}]", fromNode.address, key, toSeqNr, currentSeqNr) + if (reply) replyTo ! WriteAck } else if (fromSeqNr > (currentSeqNr + 1)) { if (isDebugEnabled) log.debug( "Skipping DeltaPropagation from [{}] for [{}] because missing deltas between [{}-{}]", fromNode.address, key, currentSeqNr + 1, fromSeqNr - 1) + if (reply) replyTo ! DeltaNack } else { if (isDebugEnabled) log.debug( "Applying DeltaPropagation from [{}] for [{}] with sequence numbers [{}], current was [{}]", fromNode.address, key, s"$fromSeqNr-$toSeqNr", currentSeqNr) val newEnvelope = envelope.copy(deltaVersions = VersionVector(fromNode, toSeqNr)) - writeAndStore(key, newEnvelope) + writeAndStore(key, newEnvelope, reply) } case (key, Delta(envelope, _, _)) ⇒ // causal delivery of deltas not needed, just apply it - writeAndStore(key, envelope) + writeAndStore(key, envelope, reply) } } } catch { @@ -1507,6 +1510,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog // mixing nodes with incompatible delta-CRDT types log.warning("Couldn't process DeltaPropagation from [] due to {}", fromNode, e) } + } else { + // !deltaCrdtEnabled + if (reply) replyTo ! DeltaNack } def receiveGossipTick(): Unit = { @@ -1583,7 +1589,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog updatedData.foreach { case (key, envelope) ⇒ val hadData = dataEntries.contains(key) - writeAndStore(key, envelope) + writeAndStore(key, envelope, reply = false) if (sendBack) getData(key) match { case Some(d) ⇒ if (hadData || d.pruning.nonEmpty) @@ -1842,13 +1848,14 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog def props( key: KeyR, envelope: Replicator.Internal.DataEnvelope, + delta: Option[Replicator.Internal.Delta], consistency: Replicator.WriteConsistency, req: Option[Any], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean): Props = - Props(new WriteAggregator(key, envelope, consistency, req, nodes, unreachable, replyTo, durable)) + Props(new WriteAggregator(key, envelope, delta, consistency, req, nodes, unreachable, replyTo, durable)) .withDeploy(Deploy.local) } @@ -1858,6 +1865,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog @InternalApi private[akka] class WriteAggregator( key: KeyR, envelope: Replicator.Internal.DataEnvelope, + delta: Option[Replicator.Internal.Delta], consistency: Replicator.WriteConsistency, req: Option[Any], override val nodes: Set[Address], @@ -1869,6 +1877,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog import Replicator.Internal._ import ReadWriteAggregator._ + val selfUniqueAddress = Cluster(context.system).selfUniqueAddress + override def timeout: FiniteDuration = consistency.timeout override val doneWhenRemainingSize = consistency match { @@ -1883,12 +1893,20 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } val writeMsg = Write(key.id, envelope) + val deltaMsg = delta match { + case None ⇒ None + case Some(d) ⇒ Some(DeltaPropagation(selfUniqueAddress, reply = true, Map(key.id → d))) + } var gotLocalStoreReply = !durable var gotWriteNackFrom = Set.empty[Address] override def preStart(): Unit = { - primaryNodes.foreach { replica(_) ! writeMsg } + val msg = deltaMsg match { + case Some(d) ⇒ d + case None ⇒ writeMsg + } + primaryNodes.foreach { replica(_) ! msg } if (isDone) reply(isTimeout = false) } @@ -1900,16 +1918,26 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog case WriteNack ⇒ gotWriteNackFrom += senderAddress() if (isDone) reply(isTimeout = false) + case DeltaNack ⇒ + // ok, will be retried with full state case _: Replicator.UpdateSuccess[_] ⇒ gotLocalStoreReply = true if (isDone) reply(isTimeout = false) case f: Replicator.StoreFailure[_] ⇒ gotLocalStoreReply = true - gotWriteNackFrom += Cluster(context.system).selfAddress + gotWriteNackFrom += selfUniqueAddress.address if (isDone) reply(isTimeout = false) case SendToSecondary ⇒ + deltaMsg match { + case None ⇒ + case Some(d) ⇒ + // Deltas must be applied in order and we can't keep track of ordering of + // simultaneous updates so there is a chance that the delta could not be applied. + // Try again with the full state to the primary nodes that have not acked. + primaryNodes.toSet.intersect(remaining).foreach { replica(_) ! writeMsg } + } secondaryNodes.foreach { replica(_) ! writeMsg } case ReceiveTimeout ⇒ reply(isTimeout = true) @@ -1934,7 +1962,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog if (isSuccess && isDelete) DeleteSuccess(key, req) else if (isSuccess) UpdateSuccess(key, req) else if (isTimeoutOrNotEnoughNodes && isDelete) ReplicationDeleteFailure(key, req) - else if (isTimeoutOrNotEnoughNodes) UpdateTimeout(key, req) + else if (isTimeoutOrNotEnoughNodes || !durable) UpdateTimeout(key, req) else StoreFailure(key, req) replyTo.tell(replyMsg, context.parent) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala index c48b8f4750..d24682884f 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala @@ -176,6 +176,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) val WriteNackManifest = "O" val DurableDataEnvelopeManifest = "P" val DeltaPropagationManifest = "Q" + val DeltaNackManifest = "R" private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] ⇒ AnyRef]( GetManifest → getFromBinary, @@ -194,6 +195,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) GossipManifest → gossipFromBinary, DeltaPropagationManifest → deltaPropagationFromBinary, WriteNackManifest → (_ ⇒ WriteNack), + DeltaNackManifest → (_ ⇒ DeltaNack), DurableDataEnvelopeManifest → durableDataEnvelopeFromBinary) override def manifest(obj: AnyRef): String = obj match { @@ -214,6 +216,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) case _: Unsubscribe[_] ⇒ UnsubscribeManifest case _: Gossip ⇒ GossipManifest case WriteNack ⇒ WriteNackManifest + case DeltaNack ⇒ DeltaNackManifest case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") } @@ -236,6 +239,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) case m: Unsubscribe[_] ⇒ unsubscribeToProto(m).toByteArray case m: Gossip ⇒ compress(gossipToProto(m)) case WriteNack ⇒ dm.Empty.getDefaultInstance.toByteArray + case DeltaNack ⇒ dm.Empty.getDefaultInstance.toByteArray case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") } @@ -289,6 +293,8 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) private def deltaPropagationToProto(deltaPropagation: DeltaPropagation): dm.DeltaPropagation = { val b = dm.DeltaPropagation.newBuilder() .setFromNode(uniqueAddressToProto(deltaPropagation.fromNode)) + if (deltaPropagation.reply) + b.setReply(deltaPropagation.reply) val entries = deltaPropagation.deltas.foreach { case (key, Delta(data, fromSeqNr, toSeqNr)) ⇒ val b2 = dm.DeltaPropagation.Entry.newBuilder() @@ -304,8 +310,10 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) private def deltaPropagationFromBinary(bytes: Array[Byte]): DeltaPropagation = { val deltaPropagation = dm.DeltaPropagation.parseFrom(bytes) + val reply = deltaPropagation.hasReply && deltaPropagation.getReply DeltaPropagation( uniqueAddressFromProto(deltaPropagation.getFromNode), + reply, deltaPropagation.getEntriesList.asScala.map { e ⇒ val fromSeqNr = e.getFromSeqNr val toSeqNr = if (e.hasToSeqNr) e.getToSeqNr else fromSeqNr diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala index 44910f3afd..c1df7bffdb 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala @@ -22,7 +22,7 @@ object ReplicatorDeltaSpec extends MultiNodeConfig { val fourth = role("fourth") commonConfig(ConfigFactory.parseString(""" - akka.loglevel = DEBUG + akka.loglevel = INFO akka.actor.provider = "cluster" akka.log-dead-letters-during-shutdown = off """)) @@ -129,6 +129,8 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult r } + val writeAll = WriteAll(5.seconds) + var afterCounter = 0 def enterBarrierAfterTestStep(): Unit = { afterCounter += 1 @@ -197,7 +199,7 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult awaitAssert { val p = TestProbe() List(KeyD, KeyE, KeyF).foreach { key ⇒ - fullStateReplicator.tell(Get(key, ReadLocal), p.ref) + deltaReplicator.tell(Get(key, ReadLocal), p.ref) p.expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should ===(Set("a")) } } @@ -206,6 +208,48 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult enterBarrierAfterTestStep() } + "work with write consistency" in { + runOn(first) { + val p1 = TestProbe() + fullStateReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "A"), p1.ref) + deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "A"), p1.ref) + p1.expectMsgType[UpdateSuccess[_]] + p1.expectMsgType[UpdateSuccess[_]] + } + enterBarrier("write-1") + + val p = TestProbe() + deltaReplicator.tell(Get(KeyD, ReadLocal), p.ref) + p.expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should ===(Set("a", "A")) + + // and also when doing several at the same time (deltas may be reordered) and then we + // retry with full state to sort it out + runOn(first) { + val p1 = TestProbe() + deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "B"), p1.ref) + deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "C"), p1.ref) + deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "D"), p1.ref) + p1.expectMsgType[UpdateSuccess[_]] + p1.expectMsgType[UpdateSuccess[_]] + p1.expectMsgType[UpdateSuccess[_]] + } + enterBarrier("write-2") + deltaReplicator.tell(Get(KeyD, ReadLocal), p.ref) + p.expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should ===(Set("a", "A", "B", "C", "D")) + + // add same to the fullStateReplicator so they are in sync + runOn(first) { + val p1 = TestProbe() + fullStateReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "A" + "B" + "C" + "D"), p1.ref) + p1.expectMsgType[UpdateSuccess[_]] + } + enterBarrier("write-3") + fullStateReplicator.tell(Get(KeyD, ReadLocal), p.ref) + p.expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should ===(Set("a", "A", "B", "C", "D")) + + enterBarrierAfterTestStep() + } + "be eventually consistent" in { val operations = generateOperations(onNode = myself) log.debug(s"random operations on [${myself.name}]: ${operations.mkString(", ")}") diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorORSetDeltaSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorORSetDeltaSpec.scala index 04232f3f82..3d1a9581a8 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorORSetDeltaSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorORSetDeltaSpec.scala @@ -19,7 +19,7 @@ object ReplicatorORSetDeltaSpec extends MultiNodeConfig { val third = role("third") commonConfig(ConfigFactory.parseString(""" - akka.loglevel = DEBUG + akka.loglevel = INFO akka.actor.provider = "cluster" akka.log-dead-letters-during-shutdown = off """)) diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala index e511e024fa..7dd1fea309 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala @@ -19,7 +19,7 @@ object DeltaPropagationSelectorSpec { override val allNodes: Vector[Address]) extends DeltaPropagationSelector { override val gossipIntervalDivisor = 5 override def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation = - DeltaPropagation(selfUniqueAddress, deltas.mapValues { + DeltaPropagation(selfUniqueAddress, false, deltas.mapValues { case (d, fromSeqNr, toSeqNr) ⇒ Delta(DataEnvelope(d), fromSeqNr, toSeqNr) }) } @@ -50,7 +50,7 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck selector.cleanupDeltaEntries() selector.hasDeltaEntries("A") should ===(true) selector.hasDeltaEntries("B") should ===(true) - val expected = DeltaPropagation(selfUniqueAddress, Map( + val expected = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(deltaA), 1L, 1L), "B" → Delta(DataEnvelope(deltaB), 1L, 1L))) selector.collectPropagations() should ===(Map(nodes(0) → expected)) @@ -64,7 +64,7 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck val selector = new TestSelector(selfUniqueAddress, nodes.take(3)) selector.update("A", deltaA) selector.update("B", deltaB) - val expected = DeltaPropagation(selfUniqueAddress, Map( + val expected = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(deltaA), 1L, 1L), "B" → Delta(DataEnvelope(deltaB), 1L, 1L))) selector.collectPropagations() should ===(Map(nodes(0) → expected, nodes(1) → expected)) @@ -82,17 +82,17 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck val selector = new TestSelector(selfUniqueAddress, nodes.take(3)) selector.update("A", deltaA) selector.update("B", deltaB) - val expected1 = DeltaPropagation(selfUniqueAddress, Map( + val expected1 = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(deltaA), 1L, 1L), "B" → Delta(DataEnvelope(deltaB), 1L, 1L))) selector.collectPropagations() should ===(Map(nodes(0) → expected1, nodes(1) → expected1)) // new update before previous was propagated to all nodes selector.update("C", deltaC) - val expected2 = DeltaPropagation(selfUniqueAddress, Map( + val expected2 = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(deltaA), 1L, 1L), "B" → Delta(DataEnvelope(deltaB), 1L, 1L), "C" → Delta(DataEnvelope(deltaC), 1L, 1L))) - val expected3 = DeltaPropagation(selfUniqueAddress, Map( + val expected3 = DeltaPropagation(selfUniqueAddress, false, Map( "C" → Delta(DataEnvelope(deltaC), 1L, 1L))) selector.collectPropagations() should ===(Map(nodes(2) → expected2, nodes(0) → expected3)) selector.cleanupDeltaEntries() @@ -114,12 +114,12 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck selector.currentVersion("A") should ===(1L) selector.update("A", delta2) selector.currentVersion("A") should ===(2L) - val expected1 = DeltaPropagation(selfUniqueAddress, Map( + val expected1 = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(delta1.merge(delta2)), 1L, 2L))) selector.collectPropagations() should ===(Map(nodes(0) → expected1)) selector.update("A", delta3) selector.currentVersion("A") should ===(3L) - val expected2 = DeltaPropagation(selfUniqueAddress, Map( + val expected2 = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(delta3), 3L, 3L))) selector.collectPropagations() should ===(Map(nodes(0) → expected2)) selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation]) @@ -133,25 +133,25 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck override def nodesSliceSize(allNodesSize: Int): Int = 1 } selector.update("A", delta1) - val expected1 = DeltaPropagation(selfUniqueAddress, Map( + val expected1 = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(delta1), 1L, 1L))) selector.collectPropagations() should ===(Map(nodes(0) → expected1)) selector.update("A", delta2) - val expected2 = DeltaPropagation(selfUniqueAddress, Map( + val expected2 = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(delta1.merge(delta2)), 1L, 2L))) selector.collectPropagations() should ===(Map(nodes(1) → expected2)) selector.update("A", delta3) - val expected3 = DeltaPropagation(selfUniqueAddress, Map( + val expected3 = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(delta1.merge(delta2).merge(delta3)), 1L, 3L))) selector.collectPropagations() should ===(Map(nodes(2) → expected3)) - val expected4 = DeltaPropagation(selfUniqueAddress, Map( + val expected4 = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(delta2.merge(delta3)), 2L, 3L))) selector.collectPropagations() should ===(Map(nodes(0) → expected4)) - val expected5 = DeltaPropagation(selfUniqueAddress, Map( + val expected5 = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(delta3), 3L, 3L))) selector.collectPropagations() should ===(Map(nodes(1) → expected5)) diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala index 2547a7cebe..a07209a18d 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala @@ -17,18 +17,24 @@ import akka.cluster.ddata.Replicator._ import akka.remote.RARP import scala.concurrent.Future +import akka.cluster.Cluster object WriteAggregatorSpec { - val key = GSetKey[String]("a") + val KeyA = GSetKey[String]("A") + val KeyB = ORSetKey[String]("B") def writeAggregatorProps(data: GSet[String], consistency: Replicator.WriteConsistency, probes: Map[Address, ActorRef], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean): Props = - Props(new TestWriteAggregator(data, consistency, probes, nodes, unreachable, replyTo, durable)) + Props(new TestWriteAggregator(KeyA, data, None, consistency, probes, nodes, unreachable, replyTo, durable)) - class TestWriteAggregator(data: GSet[String], consistency: Replicator.WriteConsistency, + def writeAggregatorPropsWithDelta(data: ORSet[String], delta: Delta, consistency: Replicator.WriteConsistency, + probes: Map[Address, ActorRef], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean): Props = + Props(new TestWriteAggregator(KeyB, data, Some(delta), consistency, probes, nodes, unreachable, replyTo, durable)) + + class TestWriteAggregator(key: Key.KeyR, data: ReplicatedData, delta: Option[Delta], consistency: Replicator.WriteConsistency, probes: Map[Address, ActorRef], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean) - extends WriteAggregator(key, DataEnvelope(data), consistency, None, nodes, unreachable, replyTo, durable) { + extends WriteAggregator(key, DataEnvelope(data), delta, consistency, None, nodes, unreachable, replyTo, durable) { override def replica(address: Address): ActorSelection = context.actorSelection(probes(address).path) @@ -48,6 +54,8 @@ object WriteAggregatorSpec { replicator.foreach(_ ! WriteAck) case WriteNack ⇒ replicator.foreach(_ ! WriteNack) + case DeltaNack ⇒ + replicator.foreach(_ ! DeltaNack) case msg ⇒ replicator = Some(sender()) replica ! msg @@ -89,6 +97,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" val timeout = 3.seconds.dilated val writeThree = WriteTo(3, timeout) val writeMajority = WriteMajority(timeout) + val writeAll = WriteAll(timeout) def probes(probe: ActorRef): Map[Address, ActorRef] = nodes.toSeq.map(_ → system.actorOf(WriteAggregatorSpec.writeAckAdapterProps(probe))).toMap @@ -111,7 +120,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" probe.lastSender ! WriteAck probe.expectMsgType[Write] probe.lastSender ! WriteAck - expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None)) + expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyA, None)) watch(aggr) expectTerminated(aggr) } @@ -132,15 +141,14 @@ class WriteAggregatorSpec extends AkkaSpec(s""" Future.sequence { Seq( Future { testProbes(nodeC).expectNoMsg(t) }, - Future { testProbes(nodeD).expectNoMsg(t) } - ) + Future { testProbes(nodeD).expectNoMsg(t) }) }.futureValue testProbes(nodeC).expectMsgType[Write] testProbes(nodeC).lastSender ! WriteAck testProbes(nodeD).expectMsgType[Write] testProbes(nodeD).lastSender ! WriteAck - expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None)) + expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyA, None)) watch(aggr) expectTerminated(aggr) } @@ -158,7 +166,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" // no reply probe.expectMsgType[Write] // no reply - expectMsg(UpdateTimeout(WriteAggregatorSpec.key, None)) + expectMsg(UpdateTimeout(WriteAggregatorSpec.KeyA, None)) watch(aggr) expectTerminated(aggr) } @@ -181,6 +189,82 @@ class WriteAggregatorSpec extends AkkaSpec(s""" } } + "WriteAggregator with delta" must { + implicit val cluster = Cluster(system) + val fullState1 = ORSet.empty[String] + "a" + "b" + val fullState2 = fullState1.resetDelta + "c" + val delta = Delta(DataEnvelope(fullState2.delta.get), 2L, 2L) + + "send deltas first" in { + val probe = TestProbe() + val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorPropsWithDelta( + fullState2, delta, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = false)) + + probe.expectMsgType[DeltaPropagation] + probe.lastSender ! WriteAck + probe.expectMsgType[DeltaPropagation] + probe.lastSender ! WriteAck + expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyB, None)) + watch(aggr) + expectTerminated(aggr) + } + + "retry with full state when no immediate reply or nack" in { + val testProbes = probes() + val testProbeRefs = testProbes.map { case (a, tm) ⇒ a → tm.writeAckAdapter } + val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorPropsWithDelta( + fullState2, delta, writeAll, testProbeRefs, nodes, Set.empty, testActor, durable = false)) + + testProbes(nodeA).expectMsgType[DeltaPropagation] + // no reply + testProbes(nodeB).expectMsgType[DeltaPropagation] + testProbes(nodeB).lastSender ! WriteAck + testProbes(nodeC).expectMsgType[DeltaPropagation] + testProbes(nodeC).lastSender ! WriteAck + testProbes(nodeD).expectMsgType[DeltaPropagation] + testProbes(nodeD).lastSender ! DeltaNack + + // here is the second round + testProbes(nodeA).expectMsgType[Write] + testProbes(nodeA).lastSender ! WriteAck + testProbes(nodeD).expectMsgType[Write] + testProbes(nodeD).lastSender ! WriteAck + testProbes(nodeB).expectNoMsg(100.millis) + testProbes(nodeC).expectNoMsg(100.millis) + + expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyB, None)) + watch(aggr) + expectTerminated(aggr) + } + + "timeout when less than required acks" in { + val probe = TestProbe() + val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorPropsWithDelta( + fullState2, delta, writeAll, probes(probe.ref), nodes, Set.empty, testActor, durable = false)) + + probe.expectMsgType[DeltaPropagation] + // no reply + probe.expectMsgType[DeltaPropagation] + probe.lastSender ! WriteAck + probe.expectMsgType[DeltaPropagation] + // nack + probe.lastSender ! DeltaNack + probe.expectMsgType[DeltaPropagation] + // no reply + + // only 1 ack so we expect 3 full state Write + probe.expectMsgType[Write] + probe.lastSender ! WriteAck + probe.expectMsgType[Write] + probe.expectMsgType[Write] + + // still not enough acks + expectMsg(UpdateTimeout(WriteAggregatorSpec.KeyB, None)) + watch(aggr) + expectTerminated(aggr) + } + } + "Durable WriteAggregator" must { "not reply before local confirmation" in { val probe = TestProbe() @@ -194,9 +278,9 @@ class WriteAggregatorSpec extends AkkaSpec(s""" expectNoMsg(200.millis) // the local write - aggr ! UpdateSuccess(WriteAggregatorSpec.key, None) + aggr ! UpdateSuccess(WriteAggregatorSpec.KeyA, None) - expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None)) + expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyA, None)) watch(aggr) expectTerminated(aggr) } @@ -206,7 +290,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps( data, writeThree, probes(probe.ref), nodes, Set.empty, testActor, durable = true)) - aggr ! UpdateSuccess(WriteAggregatorSpec.key, None) // the local write + aggr ! UpdateSuccess(WriteAggregatorSpec.KeyA, None) // the local write probe.expectMsgType[Write] probe.lastSender ! WriteAck probe.expectMsgType[Write] @@ -214,7 +298,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" probe.expectMsgType[Write] probe.lastSender ! WriteAck - expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None)) + expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyA, None)) watch(aggr) expectTerminated(aggr) } @@ -226,7 +310,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" probe.expectMsgType[Write] probe.lastSender ! WriteNack - aggr ! UpdateSuccess(WriteAggregatorSpec.key, None) // the local write + aggr ! UpdateSuccess(WriteAggregatorSpec.KeyA, None) // the local write probe.expectMsgType[Write] probe.lastSender ! WriteAck probe.expectMsgType[Write] @@ -234,7 +318,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" probe.expectMsgType[Write] probe.lastSender ! WriteNack - expectMsg(StoreFailure(WriteAggregatorSpec.key, None)) + expectMsg(StoreFailure(WriteAggregatorSpec.KeyA, None)) watch(aggr) expectTerminated(aggr) } @@ -253,7 +337,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" probe.expectMsgType[Write] probe.lastSender ! WriteNack - expectMsg(UpdateTimeout(WriteAggregatorSpec.key, None)) + expectMsg(UpdateTimeout(WriteAggregatorSpec.KeyA, None)) watch(aggr) expectTerminated(aggr) } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala index d920da41af..ee0ee0fb59 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala @@ -78,6 +78,7 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem( checkSerialization(Write("A", DataEnvelope(data1))) checkSerialization(WriteAck) checkSerialization(WriteNack) + checkSerialization(DeltaNack) checkSerialization(Read("A")) checkSerialization(ReadResult(Some(DataEnvelope(data1)))) checkSerialization(ReadResult(None)) @@ -87,7 +88,7 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem( checkSerialization(Gossip(Map( "A" → DataEnvelope(data1), "B" → DataEnvelope(GSet() + "b" + "c")), sendBack = true)) - checkSerialization(DeltaPropagation(address1, Map( + checkSerialization(DeltaPropagation(address1, reply = true, Map( "A" → Delta(DataEnvelope(delta1), 1L, 1L), "B" → Delta(DataEnvelope(delta2), 3L, 5L)))) checkSerialization(new DurableDataEnvelope(data1))