From 233e784154aae867fbb9bf64096349e149b2bc32 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 23 Feb 2017 12:12:29 +0100 Subject: [PATCH] Use delta in direct write also, #22188 * Follow up on the causal delivery of deltas. * The first implementation used full state for the direct Write messages, i.e. updates with WriteConsistency != LocalWrite * This is an optimization so that delatas are tried first and if they can't be applied it falls back to full state. * For simultanious updates the messages may be reordered because we create separate WriteAggregator actor and such, but normally they will be sent in order so the deltas will typically be received in order, otherwise we fall back to retrying with full state in the second round in the WriteAggregator. --- .../protobuf/msg/ReplicatorMessages.java | 154 +++++++++++++++--- .../main/protobuf/ReplicatorMessages.proto | 1 + .../scala/akka/cluster/ddata/Replicator.scala | 92 +++++++---- .../ReplicatorMessageSerializer.scala | 8 + .../cluster/ddata/ReplicatorDeltaSpec.scala | 48 +++++- .../ddata/ReplicatorORSetDeltaSpec.scala | 2 +- .../ddata/DeltaPropagationSelectorSpec.scala | 26 +-- .../cluster/ddata/WriteAggregatorSpec.scala | 116 +++++++++++-- .../ReplicatorMessageSerializerSpec.scala | 3 +- 9 files changed, 365 insertions(+), 85 deletions(-) diff --git a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java index 63c51ac058..d4e0dcde92 100644 --- a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java +++ b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java @@ -12721,6 +12721,24 @@ public final class ReplicatorMessages { */ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.EntryOrBuilder getEntriesOrBuilder( int index); + + // optional bool reply = 3; + /** + * optional bool reply = 3; + * + *
+     * no reply if not set 
+     * 
+ */ + boolean hasReply(); + /** + * optional bool reply = 3; + * + *
+     * no reply if not set 
+     * 
+ */ + boolean getReply(); } /** * Protobuf type {@code akka.cluster.ddata.DeltaPropagation} @@ -12794,6 +12812,11 @@ public final class ReplicatorMessages { entries_.add(input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DeltaPropagation.Entry.PARSER, extensionRegistry)); break; } + case 24: { + bitField0_ |= 0x00000002; + reply_ = input.readBool(); + break; + } } } } catch (akka.protobuf.InvalidProtocolBufferException e) { @@ -13784,9 +13807,34 @@ public final class ReplicatorMessages { return entries_.get(index); } + // optional bool reply = 3; + public static final int REPLY_FIELD_NUMBER = 3; + private boolean reply_; + /** + * optional bool reply = 3; + * + *
+     * no reply if not set 
+     * 
+ */ + public boolean hasReply() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional bool reply = 3; + * + *
+     * no reply if not set 
+     * 
+ */ + public boolean getReply() { + return reply_; + } + private void initFields() { fromNode_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance(); entries_ = java.util.Collections.emptyList(); + reply_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -13820,6 +13868,9 @@ public final class ReplicatorMessages { for (int i = 0; i < entries_.size(); i++) { output.writeMessage(2, entries_.get(i)); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBool(3, reply_); + } getUnknownFields().writeTo(output); } @@ -13837,6 +13888,10 @@ public final class ReplicatorMessages { size += akka.protobuf.CodedOutputStream .computeMessageSize(2, entries_.get(i)); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += akka.protobuf.CodedOutputStream + .computeBoolSize(3, reply_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -13967,6 +14022,8 @@ public final class ReplicatorMessages { } else { entriesBuilder_.clear(); } + reply_ = false; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -14012,6 +14069,10 @@ public final class ReplicatorMessages { } else { result.entries_ = entriesBuilder_.build(); } + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000002; + } + result.reply_ = reply_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -14057,6 +14118,9 @@ public final class ReplicatorMessages { } } } + if (other.hasReply()) { + setReply(other.getReply()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -14455,6 +14519,55 @@ public final class ReplicatorMessages { return entriesBuilder_; } + // optional bool reply = 3; + private boolean reply_ ; + /** + * optional bool reply = 3; + * + *
+       * no reply if not set 
+       * 
+ */ + public boolean hasReply() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional bool reply = 3; + * + *
+       * no reply if not set 
+       * 
+ */ + public boolean getReply() { + return reply_; + } + /** + * optional bool reply = 3; + * + *
+       * no reply if not set 
+       * 
+ */ + public Builder setReply(boolean value) { + bitField0_ |= 0x00000004; + reply_ = value; + onChanged(); + return this; + } + /** + * optional bool reply = 3; + * + *
+       * no reply if not set 
+       * 
+ */ + public Builder clearReply() { + bitField0_ = (bitField0_ & ~0x00000004); + reply_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:akka.cluster.ddata.DeltaPropagation) } @@ -19212,27 +19325,28 @@ public final class ReplicatorMessages { " \002(\010\0221\n\007entries\030\002 \003(\0132 .akka.cluster.dda", "ta.Gossip.Entry\032H\n\005Entry\022\013\n\003key\030\001 \002(\t\0222\n" + "\010envelope\030\002 \002(\0132 .akka.cluster.ddata.Dat" + - "aEnvelope\"\362\001\n\020DeltaPropagation\0223\n\010fromNo" + + "aEnvelope\"\201\002\n\020DeltaPropagation\0223\n\010fromNo" + "de\030\001 \002(\0132!.akka.cluster.ddata.UniqueAddr" + "ess\022;\n\007entries\030\002 \003(\0132*.akka.cluster.ddat" + - "a.DeltaPropagation.Entry\032l\n\005Entry\022\013\n\003key" + - "\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132 .akka.cluster." + - "ddata.DataEnvelope\022\021\n\tfromSeqNr\030\003 \002(\003\022\017\n" + - "\007toSeqNr\030\004 \001(\003\"X\n\rUniqueAddress\022,\n\007addre" + - "ss\030\001 \002(\0132\033.akka.cluster.ddata.Address\022\013\n", - "\003uid\030\002 \002(\017\022\014\n\004uid2\030\003 \001(\017\")\n\007Address\022\020\n\010h" + - "ostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"\224\001\n\rVersionV" + - "ector\0228\n\007entries\030\001 \003(\0132\'.akka.cluster.dd" + - "ata.VersionVector.Entry\032I\n\005Entry\022/\n\004node" + - "\030\001 \002(\0132!.akka.cluster.ddata.UniqueAddres" + - "s\022\017\n\007version\030\002 \002(\003\"V\n\014OtherMessage\022\027\n\017en" + - "closedMessage\030\001 \002(\014\022\024\n\014serializerId\030\002 \002(" + - "\005\022\027\n\017messageManifest\030\004 \001(\014\"\036\n\nStringGSet" + - "\022\020\n\010elements\030\001 \003(\t\"\205\001\n\023DurableDataEnvelo" + - "pe\022.\n\004data\030\001 \002(\0132 .akka.cluster.ddata.Ot", - "herMessage\022>\n\007pruning\030\002 \003(\0132-.akka.clust" + - "er.ddata.DataEnvelope.PruningEntryB#\n\037ak" + - "ka.cluster.ddata.protobuf.msgH\001" + "a.DeltaPropagation.Entry\022\r\n\005reply\030\003 \001(\010\032" + + "l\n\005Entry\022\013\n\003key\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132" + + " .akka.cluster.ddata.DataEnvelope\022\021\n\tfro" + + "mSeqNr\030\003 \002(\003\022\017\n\007toSeqNr\030\004 \001(\003\"X\n\rUniqueA" + + "ddress\022,\n\007address\030\001 \002(\0132\033.akka.cluster.d", + "data.Address\022\013\n\003uid\030\002 \002(\017\022\014\n\004uid2\030\003 \001(\017\"" + + ")\n\007Address\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002" + + "(\r\"\224\001\n\rVersionVector\0228\n\007entries\030\001 \003(\0132\'." + + "akka.cluster.ddata.VersionVector.Entry\032I" + + "\n\005Entry\022/\n\004node\030\001 \002(\0132!.akka.cluster.dda" + + "ta.UniqueAddress\022\017\n\007version\030\002 \002(\003\"V\n\014Oth" + + "erMessage\022\027\n\017enclosedMessage\030\001 \002(\014\022\024\n\014se" + + "rializerId\030\002 \002(\005\022\027\n\017messageManifest\030\004 \001(" + + "\014\"\036\n\nStringGSet\022\020\n\010elements\030\001 \003(\t\"\205\001\n\023Du" + + "rableDataEnvelope\022.\n\004data\030\001 \002(\0132 .akka.c", + "luster.ddata.OtherMessage\022>\n\007pruning\030\002 \003" + + "(\0132-.akka.cluster.ddata.DataEnvelope.Pru" + + "ningEntryB#\n\037akka.cluster.ddata.protobuf" + + ".msgH\001" }; akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -19346,7 +19460,7 @@ public final class ReplicatorMessages { internal_static_akka_cluster_ddata_DeltaPropagation_fieldAccessorTable = new akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_akka_cluster_ddata_DeltaPropagation_descriptor, - new java.lang.String[] { "FromNode", "Entries", }); + new java.lang.String[] { "FromNode", "Entries", "Reply", }); internal_static_akka_cluster_ddata_DeltaPropagation_Entry_descriptor = internal_static_akka_cluster_ddata_DeltaPropagation_descriptor.getNestedTypes().get(0); internal_static_akka_cluster_ddata_DeltaPropagation_Entry_fieldAccessorTable = new diff --git a/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto b/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto index 0582fac91b..0c83967b07 100644 --- a/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto +++ b/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto @@ -107,6 +107,7 @@ message DeltaPropagation { required UniqueAddress fromNode = 1; repeated Entry entries = 2; + optional bool reply = 3; // no reply if not set } message UniqueAddress { diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index 0a96101ad0..5e85fa294e 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -743,7 +743,8 @@ object Replicator { final case class Gossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean) extends ReplicatorMessage final case class Delta(dataEnvelope: DataEnvelope, fromSeqNr: Long, toSeqNr: Long) - final case class DeltaPropagation(fromNode: UniqueAddress, deltas: Map[KeyId, Delta]) extends ReplicatorMessage + final case class DeltaPropagation(fromNode: UniqueAddress, reply: Boolean, deltas: Map[KeyId, Delta]) extends ReplicatorMessage + case object DeltaNack extends ReplicatorMessage with DeadLetterSuppression } } @@ -990,7 +991,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog override def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation = { // Important to include the pruning state in the deltas. For example if the delta is based // on an entry that has been pruned but that has not yet been performed on the target node. - DeltaPropagation(selfUniqueAddress, deltas.map { + DeltaPropagation(selfUniqueAddress, reply = false, deltas.map { case (key, (d, fromSeqNr, toSeqNr)) ⇒ getData(key) match { case Some(envelope) ⇒ key → Delta(envelope.copy(data = d), fromSeqNr, toSeqNr) case None ⇒ key → Delta(DataEnvelope(d), fromSeqNr, toSeqNr) @@ -1149,7 +1150,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog case Read(key) ⇒ receiveRead(key) case Write(key, envelope) ⇒ receiveWrite(key, envelope) case ReadRepair(key, envelope) ⇒ receiveReadRepair(key, envelope) - case DeltaPropagation(from, deltas) ⇒ receiveDeltaPropagation(from, deltas) + case DeltaPropagation(from, reply, deltas) ⇒ receiveDeltaPropagation(from, reply, deltas) case FlushChanges ⇒ receiveFlushChanges() case DeltaPropagationTick ⇒ receiveDeltaPropagationTick() case GossipTick ⇒ receiveGossipTick() @@ -1242,13 +1243,16 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog else replyTo ! UpdateSuccess(key, req) } else { - val writeEnvelope = delta match { - case Some(d: RequiresCausalDeliveryOfDeltas) ⇒ newEnvelope - case Some(d) ⇒ DataEnvelope(d) - case None ⇒ newEnvelope + val (writeEnvelope, writeDelta) = delta match { + case Some(d: RequiresCausalDeliveryOfDeltas) ⇒ + val v = deltaPropagationSelector.currentVersion(key.id) + (newEnvelope, Some(Delta(newEnvelope.copy(data = d), v, v))) + case Some(d) ⇒ (newEnvelope.copy(data = d), None) + case None ⇒ (newEnvelope, None) } val writeAggregator = - context.actorOf(WriteAggregator.props(key, writeEnvelope, writeConsistency, req, nodes, unreachable, replyTo, durable) + context.actorOf(WriteAggregator.props(key, writeEnvelope, writeDelta, writeConsistency, + req, nodes, unreachable, replyTo, durable) .withDispatcher(context.props.dispatcher)) if (durable) { durableStore ! Store(key.id, new DurableDataEnvelope(newEnvelope), @@ -1274,14 +1278,20 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog case _ ⇒ false } - def receiveWrite(key: KeyId, envelope: DataEnvelope): Unit = { - write(key, envelope) match { + def receiveWrite(key: KeyId, envelope: DataEnvelope): Unit = + writeAndStore(key, envelope, reply = true) + + def writeAndStore(key: KeyId, writeEnvelope: DataEnvelope, reply: Boolean): Unit = { + write(key, writeEnvelope) match { case Some(newEnvelope) ⇒ - if (isDurable(key)) - durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), Some(StoreReply(WriteAck, WriteNack, replyTo))) - else + if (isDurable(key)) { + val storeReply = if (reply) Some(StoreReply(WriteAck, WriteNack, replyTo)) else None + durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), storeReply) + } else if (reply) replyTo ! WriteAck case None ⇒ + if (reply) + replyTo ! WriteNack } } @@ -1316,17 +1326,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } } - def writeAndStore(key: KeyId, writeEnvelope: DataEnvelope): Unit = { - write(key, writeEnvelope) match { - case Some(newEnvelope) ⇒ - if (isDurable(key)) - durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None) - case None ⇒ - } - } - def receiveReadRepair(key: KeyId, writeEnvelope: DataEnvelope): Unit = { - writeAndStore(key, writeEnvelope) + writeAndStore(key, writeEnvelope, reply = false) replyTo ! ReadRepairAck } @@ -1353,7 +1354,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog replyTo ! DeleteSuccess(key, req) } else { val writeAggregator = - context.actorOf(WriteAggregator.props(key, DeletedEnvelope, consistency, req, nodes, unreachable, replyTo, durable) + context.actorOf(WriteAggregator.props(key, DeletedEnvelope, None, consistency, req, nodes, unreachable, replyTo, durable) .withDispatcher(context.props.dispatcher)) if (durable) { durableStore ! Store(key.id, new DurableDataEnvelope(DeletedEnvelope), @@ -1464,7 +1465,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog deltaPropagationSelector.cleanupDeltaEntries() } - def receiveDeltaPropagation(fromNode: UniqueAddress, deltas: Map[KeyId, Delta]): Unit = + def receiveDeltaPropagation(fromNode: UniqueAddress, reply: Boolean, deltas: Map[KeyId, Delta]): Unit = if (deltaCrdtEnabled) { try { val isDebugEnabled = log.isDebugEnabled @@ -1485,20 +1486,22 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog if (isDebugEnabled) log.debug( "Skipping DeltaPropagation from [{}] for [{}] because toSeqNr [{}] already handled [{}]", fromNode.address, key, toSeqNr, currentSeqNr) + if (reply) replyTo ! WriteAck } else if (fromSeqNr > (currentSeqNr + 1)) { if (isDebugEnabled) log.debug( "Skipping DeltaPropagation from [{}] for [{}] because missing deltas between [{}-{}]", fromNode.address, key, currentSeqNr + 1, fromSeqNr - 1) + if (reply) replyTo ! DeltaNack } else { if (isDebugEnabled) log.debug( "Applying DeltaPropagation from [{}] for [{}] with sequence numbers [{}], current was [{}]", fromNode.address, key, s"$fromSeqNr-$toSeqNr", currentSeqNr) val newEnvelope = envelope.copy(deltaVersions = VersionVector(fromNode, toSeqNr)) - writeAndStore(key, newEnvelope) + writeAndStore(key, newEnvelope, reply) } case (key, Delta(envelope, _, _)) ⇒ // causal delivery of deltas not needed, just apply it - writeAndStore(key, envelope) + writeAndStore(key, envelope, reply) } } } catch { @@ -1507,6 +1510,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog // mixing nodes with incompatible delta-CRDT types log.warning("Couldn't process DeltaPropagation from [] due to {}", fromNode, e) } + } else { + // !deltaCrdtEnabled + if (reply) replyTo ! DeltaNack } def receiveGossipTick(): Unit = { @@ -1583,7 +1589,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog updatedData.foreach { case (key, envelope) ⇒ val hadData = dataEntries.contains(key) - writeAndStore(key, envelope) + writeAndStore(key, envelope, reply = false) if (sendBack) getData(key) match { case Some(d) ⇒ if (hadData || d.pruning.nonEmpty) @@ -1842,13 +1848,14 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog def props( key: KeyR, envelope: Replicator.Internal.DataEnvelope, + delta: Option[Replicator.Internal.Delta], consistency: Replicator.WriteConsistency, req: Option[Any], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean): Props = - Props(new WriteAggregator(key, envelope, consistency, req, nodes, unreachable, replyTo, durable)) + Props(new WriteAggregator(key, envelope, delta, consistency, req, nodes, unreachable, replyTo, durable)) .withDeploy(Deploy.local) } @@ -1858,6 +1865,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog @InternalApi private[akka] class WriteAggregator( key: KeyR, envelope: Replicator.Internal.DataEnvelope, + delta: Option[Replicator.Internal.Delta], consistency: Replicator.WriteConsistency, req: Option[Any], override val nodes: Set[Address], @@ -1869,6 +1877,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog import Replicator.Internal._ import ReadWriteAggregator._ + val selfUniqueAddress = Cluster(context.system).selfUniqueAddress + override def timeout: FiniteDuration = consistency.timeout override val doneWhenRemainingSize = consistency match { @@ -1883,12 +1893,20 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } val writeMsg = Write(key.id, envelope) + val deltaMsg = delta match { + case None ⇒ None + case Some(d) ⇒ Some(DeltaPropagation(selfUniqueAddress, reply = true, Map(key.id → d))) + } var gotLocalStoreReply = !durable var gotWriteNackFrom = Set.empty[Address] override def preStart(): Unit = { - primaryNodes.foreach { replica(_) ! writeMsg } + val msg = deltaMsg match { + case Some(d) ⇒ d + case None ⇒ writeMsg + } + primaryNodes.foreach { replica(_) ! msg } if (isDone) reply(isTimeout = false) } @@ -1900,16 +1918,26 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog case WriteNack ⇒ gotWriteNackFrom += senderAddress() if (isDone) reply(isTimeout = false) + case DeltaNack ⇒ + // ok, will be retried with full state case _: Replicator.UpdateSuccess[_] ⇒ gotLocalStoreReply = true if (isDone) reply(isTimeout = false) case f: Replicator.StoreFailure[_] ⇒ gotLocalStoreReply = true - gotWriteNackFrom += Cluster(context.system).selfAddress + gotWriteNackFrom += selfUniqueAddress.address if (isDone) reply(isTimeout = false) case SendToSecondary ⇒ + deltaMsg match { + case None ⇒ + case Some(d) ⇒ + // Deltas must be applied in order and we can't keep track of ordering of + // simultaneous updates so there is a chance that the delta could not be applied. + // Try again with the full state to the primary nodes that have not acked. + primaryNodes.toSet.intersect(remaining).foreach { replica(_) ! writeMsg } + } secondaryNodes.foreach { replica(_) ! writeMsg } case ReceiveTimeout ⇒ reply(isTimeout = true) @@ -1934,7 +1962,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog if (isSuccess && isDelete) DeleteSuccess(key, req) else if (isSuccess) UpdateSuccess(key, req) else if (isTimeoutOrNotEnoughNodes && isDelete) ReplicationDeleteFailure(key, req) - else if (isTimeoutOrNotEnoughNodes) UpdateTimeout(key, req) + else if (isTimeoutOrNotEnoughNodes || !durable) UpdateTimeout(key, req) else StoreFailure(key, req) replyTo.tell(replyMsg, context.parent) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala index c48b8f4750..d24682884f 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala @@ -176,6 +176,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) val WriteNackManifest = "O" val DurableDataEnvelopeManifest = "P" val DeltaPropagationManifest = "Q" + val DeltaNackManifest = "R" private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] ⇒ AnyRef]( GetManifest → getFromBinary, @@ -194,6 +195,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) GossipManifest → gossipFromBinary, DeltaPropagationManifest → deltaPropagationFromBinary, WriteNackManifest → (_ ⇒ WriteNack), + DeltaNackManifest → (_ ⇒ DeltaNack), DurableDataEnvelopeManifest → durableDataEnvelopeFromBinary) override def manifest(obj: AnyRef): String = obj match { @@ -214,6 +216,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) case _: Unsubscribe[_] ⇒ UnsubscribeManifest case _: Gossip ⇒ GossipManifest case WriteNack ⇒ WriteNackManifest + case DeltaNack ⇒ DeltaNackManifest case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") } @@ -236,6 +239,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) case m: Unsubscribe[_] ⇒ unsubscribeToProto(m).toByteArray case m: Gossip ⇒ compress(gossipToProto(m)) case WriteNack ⇒ dm.Empty.getDefaultInstance.toByteArray + case DeltaNack ⇒ dm.Empty.getDefaultInstance.toByteArray case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") } @@ -289,6 +293,8 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) private def deltaPropagationToProto(deltaPropagation: DeltaPropagation): dm.DeltaPropagation = { val b = dm.DeltaPropagation.newBuilder() .setFromNode(uniqueAddressToProto(deltaPropagation.fromNode)) + if (deltaPropagation.reply) + b.setReply(deltaPropagation.reply) val entries = deltaPropagation.deltas.foreach { case (key, Delta(data, fromSeqNr, toSeqNr)) ⇒ val b2 = dm.DeltaPropagation.Entry.newBuilder() @@ -304,8 +310,10 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) private def deltaPropagationFromBinary(bytes: Array[Byte]): DeltaPropagation = { val deltaPropagation = dm.DeltaPropagation.parseFrom(bytes) + val reply = deltaPropagation.hasReply && deltaPropagation.getReply DeltaPropagation( uniqueAddressFromProto(deltaPropagation.getFromNode), + reply, deltaPropagation.getEntriesList.asScala.map { e ⇒ val fromSeqNr = e.getFromSeqNr val toSeqNr = if (e.hasToSeqNr) e.getToSeqNr else fromSeqNr diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala index 44910f3afd..c1df7bffdb 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala @@ -22,7 +22,7 @@ object ReplicatorDeltaSpec extends MultiNodeConfig { val fourth = role("fourth") commonConfig(ConfigFactory.parseString(""" - akka.loglevel = DEBUG + akka.loglevel = INFO akka.actor.provider = "cluster" akka.log-dead-letters-during-shutdown = off """)) @@ -129,6 +129,8 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult r } + val writeAll = WriteAll(5.seconds) + var afterCounter = 0 def enterBarrierAfterTestStep(): Unit = { afterCounter += 1 @@ -197,7 +199,7 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult awaitAssert { val p = TestProbe() List(KeyD, KeyE, KeyF).foreach { key ⇒ - fullStateReplicator.tell(Get(key, ReadLocal), p.ref) + deltaReplicator.tell(Get(key, ReadLocal), p.ref) p.expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should ===(Set("a")) } } @@ -206,6 +208,48 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult enterBarrierAfterTestStep() } + "work with write consistency" in { + runOn(first) { + val p1 = TestProbe() + fullStateReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "A"), p1.ref) + deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "A"), p1.ref) + p1.expectMsgType[UpdateSuccess[_]] + p1.expectMsgType[UpdateSuccess[_]] + } + enterBarrier("write-1") + + val p = TestProbe() + deltaReplicator.tell(Get(KeyD, ReadLocal), p.ref) + p.expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should ===(Set("a", "A")) + + // and also when doing several at the same time (deltas may be reordered) and then we + // retry with full state to sort it out + runOn(first) { + val p1 = TestProbe() + deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "B"), p1.ref) + deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "C"), p1.ref) + deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "D"), p1.ref) + p1.expectMsgType[UpdateSuccess[_]] + p1.expectMsgType[UpdateSuccess[_]] + p1.expectMsgType[UpdateSuccess[_]] + } + enterBarrier("write-2") + deltaReplicator.tell(Get(KeyD, ReadLocal), p.ref) + p.expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should ===(Set("a", "A", "B", "C", "D")) + + // add same to the fullStateReplicator so they are in sync + runOn(first) { + val p1 = TestProbe() + fullStateReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "A" + "B" + "C" + "D"), p1.ref) + p1.expectMsgType[UpdateSuccess[_]] + } + enterBarrier("write-3") + fullStateReplicator.tell(Get(KeyD, ReadLocal), p.ref) + p.expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements should ===(Set("a", "A", "B", "C", "D")) + + enterBarrierAfterTestStep() + } + "be eventually consistent" in { val operations = generateOperations(onNode = myself) log.debug(s"random operations on [${myself.name}]: ${operations.mkString(", ")}") diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorORSetDeltaSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorORSetDeltaSpec.scala index 04232f3f82..3d1a9581a8 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorORSetDeltaSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorORSetDeltaSpec.scala @@ -19,7 +19,7 @@ object ReplicatorORSetDeltaSpec extends MultiNodeConfig { val third = role("third") commonConfig(ConfigFactory.parseString(""" - akka.loglevel = DEBUG + akka.loglevel = INFO akka.actor.provider = "cluster" akka.log-dead-letters-during-shutdown = off """)) diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala index e511e024fa..7dd1fea309 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala @@ -19,7 +19,7 @@ object DeltaPropagationSelectorSpec { override val allNodes: Vector[Address]) extends DeltaPropagationSelector { override val gossipIntervalDivisor = 5 override def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation = - DeltaPropagation(selfUniqueAddress, deltas.mapValues { + DeltaPropagation(selfUniqueAddress, false, deltas.mapValues { case (d, fromSeqNr, toSeqNr) ⇒ Delta(DataEnvelope(d), fromSeqNr, toSeqNr) }) } @@ -50,7 +50,7 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck selector.cleanupDeltaEntries() selector.hasDeltaEntries("A") should ===(true) selector.hasDeltaEntries("B") should ===(true) - val expected = DeltaPropagation(selfUniqueAddress, Map( + val expected = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(deltaA), 1L, 1L), "B" → Delta(DataEnvelope(deltaB), 1L, 1L))) selector.collectPropagations() should ===(Map(nodes(0) → expected)) @@ -64,7 +64,7 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck val selector = new TestSelector(selfUniqueAddress, nodes.take(3)) selector.update("A", deltaA) selector.update("B", deltaB) - val expected = DeltaPropagation(selfUniqueAddress, Map( + val expected = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(deltaA), 1L, 1L), "B" → Delta(DataEnvelope(deltaB), 1L, 1L))) selector.collectPropagations() should ===(Map(nodes(0) → expected, nodes(1) → expected)) @@ -82,17 +82,17 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck val selector = new TestSelector(selfUniqueAddress, nodes.take(3)) selector.update("A", deltaA) selector.update("B", deltaB) - val expected1 = DeltaPropagation(selfUniqueAddress, Map( + val expected1 = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(deltaA), 1L, 1L), "B" → Delta(DataEnvelope(deltaB), 1L, 1L))) selector.collectPropagations() should ===(Map(nodes(0) → expected1, nodes(1) → expected1)) // new update before previous was propagated to all nodes selector.update("C", deltaC) - val expected2 = DeltaPropagation(selfUniqueAddress, Map( + val expected2 = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(deltaA), 1L, 1L), "B" → Delta(DataEnvelope(deltaB), 1L, 1L), "C" → Delta(DataEnvelope(deltaC), 1L, 1L))) - val expected3 = DeltaPropagation(selfUniqueAddress, Map( + val expected3 = DeltaPropagation(selfUniqueAddress, false, Map( "C" → Delta(DataEnvelope(deltaC), 1L, 1L))) selector.collectPropagations() should ===(Map(nodes(2) → expected2, nodes(0) → expected3)) selector.cleanupDeltaEntries() @@ -114,12 +114,12 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck selector.currentVersion("A") should ===(1L) selector.update("A", delta2) selector.currentVersion("A") should ===(2L) - val expected1 = DeltaPropagation(selfUniqueAddress, Map( + val expected1 = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(delta1.merge(delta2)), 1L, 2L))) selector.collectPropagations() should ===(Map(nodes(0) → expected1)) selector.update("A", delta3) selector.currentVersion("A") should ===(3L) - val expected2 = DeltaPropagation(selfUniqueAddress, Map( + val expected2 = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(delta3), 3L, 3L))) selector.collectPropagations() should ===(Map(nodes(0) → expected2)) selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation]) @@ -133,25 +133,25 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck override def nodesSliceSize(allNodesSize: Int): Int = 1 } selector.update("A", delta1) - val expected1 = DeltaPropagation(selfUniqueAddress, Map( + val expected1 = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(delta1), 1L, 1L))) selector.collectPropagations() should ===(Map(nodes(0) → expected1)) selector.update("A", delta2) - val expected2 = DeltaPropagation(selfUniqueAddress, Map( + val expected2 = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(delta1.merge(delta2)), 1L, 2L))) selector.collectPropagations() should ===(Map(nodes(1) → expected2)) selector.update("A", delta3) - val expected3 = DeltaPropagation(selfUniqueAddress, Map( + val expected3 = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(delta1.merge(delta2).merge(delta3)), 1L, 3L))) selector.collectPropagations() should ===(Map(nodes(2) → expected3)) - val expected4 = DeltaPropagation(selfUniqueAddress, Map( + val expected4 = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(delta2.merge(delta3)), 2L, 3L))) selector.collectPropagations() should ===(Map(nodes(0) → expected4)) - val expected5 = DeltaPropagation(selfUniqueAddress, Map( + val expected5 = DeltaPropagation(selfUniqueAddress, false, Map( "A" → Delta(DataEnvelope(delta3), 3L, 3L))) selector.collectPropagations() should ===(Map(nodes(1) → expected5)) diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala index 2547a7cebe..a07209a18d 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala @@ -17,18 +17,24 @@ import akka.cluster.ddata.Replicator._ import akka.remote.RARP import scala.concurrent.Future +import akka.cluster.Cluster object WriteAggregatorSpec { - val key = GSetKey[String]("a") + val KeyA = GSetKey[String]("A") + val KeyB = ORSetKey[String]("B") def writeAggregatorProps(data: GSet[String], consistency: Replicator.WriteConsistency, probes: Map[Address, ActorRef], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean): Props = - Props(new TestWriteAggregator(data, consistency, probes, nodes, unreachable, replyTo, durable)) + Props(new TestWriteAggregator(KeyA, data, None, consistency, probes, nodes, unreachable, replyTo, durable)) - class TestWriteAggregator(data: GSet[String], consistency: Replicator.WriteConsistency, + def writeAggregatorPropsWithDelta(data: ORSet[String], delta: Delta, consistency: Replicator.WriteConsistency, + probes: Map[Address, ActorRef], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean): Props = + Props(new TestWriteAggregator(KeyB, data, Some(delta), consistency, probes, nodes, unreachable, replyTo, durable)) + + class TestWriteAggregator(key: Key.KeyR, data: ReplicatedData, delta: Option[Delta], consistency: Replicator.WriteConsistency, probes: Map[Address, ActorRef], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean) - extends WriteAggregator(key, DataEnvelope(data), consistency, None, nodes, unreachable, replyTo, durable) { + extends WriteAggregator(key, DataEnvelope(data), delta, consistency, None, nodes, unreachable, replyTo, durable) { override def replica(address: Address): ActorSelection = context.actorSelection(probes(address).path) @@ -48,6 +54,8 @@ object WriteAggregatorSpec { replicator.foreach(_ ! WriteAck) case WriteNack ⇒ replicator.foreach(_ ! WriteNack) + case DeltaNack ⇒ + replicator.foreach(_ ! DeltaNack) case msg ⇒ replicator = Some(sender()) replica ! msg @@ -89,6 +97,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" val timeout = 3.seconds.dilated val writeThree = WriteTo(3, timeout) val writeMajority = WriteMajority(timeout) + val writeAll = WriteAll(timeout) def probes(probe: ActorRef): Map[Address, ActorRef] = nodes.toSeq.map(_ → system.actorOf(WriteAggregatorSpec.writeAckAdapterProps(probe))).toMap @@ -111,7 +120,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" probe.lastSender ! WriteAck probe.expectMsgType[Write] probe.lastSender ! WriteAck - expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None)) + expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyA, None)) watch(aggr) expectTerminated(aggr) } @@ -132,15 +141,14 @@ class WriteAggregatorSpec extends AkkaSpec(s""" Future.sequence { Seq( Future { testProbes(nodeC).expectNoMsg(t) }, - Future { testProbes(nodeD).expectNoMsg(t) } - ) + Future { testProbes(nodeD).expectNoMsg(t) }) }.futureValue testProbes(nodeC).expectMsgType[Write] testProbes(nodeC).lastSender ! WriteAck testProbes(nodeD).expectMsgType[Write] testProbes(nodeD).lastSender ! WriteAck - expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None)) + expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyA, None)) watch(aggr) expectTerminated(aggr) } @@ -158,7 +166,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" // no reply probe.expectMsgType[Write] // no reply - expectMsg(UpdateTimeout(WriteAggregatorSpec.key, None)) + expectMsg(UpdateTimeout(WriteAggregatorSpec.KeyA, None)) watch(aggr) expectTerminated(aggr) } @@ -181,6 +189,82 @@ class WriteAggregatorSpec extends AkkaSpec(s""" } } + "WriteAggregator with delta" must { + implicit val cluster = Cluster(system) + val fullState1 = ORSet.empty[String] + "a" + "b" + val fullState2 = fullState1.resetDelta + "c" + val delta = Delta(DataEnvelope(fullState2.delta.get), 2L, 2L) + + "send deltas first" in { + val probe = TestProbe() + val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorPropsWithDelta( + fullState2, delta, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = false)) + + probe.expectMsgType[DeltaPropagation] + probe.lastSender ! WriteAck + probe.expectMsgType[DeltaPropagation] + probe.lastSender ! WriteAck + expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyB, None)) + watch(aggr) + expectTerminated(aggr) + } + + "retry with full state when no immediate reply or nack" in { + val testProbes = probes() + val testProbeRefs = testProbes.map { case (a, tm) ⇒ a → tm.writeAckAdapter } + val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorPropsWithDelta( + fullState2, delta, writeAll, testProbeRefs, nodes, Set.empty, testActor, durable = false)) + + testProbes(nodeA).expectMsgType[DeltaPropagation] + // no reply + testProbes(nodeB).expectMsgType[DeltaPropagation] + testProbes(nodeB).lastSender ! WriteAck + testProbes(nodeC).expectMsgType[DeltaPropagation] + testProbes(nodeC).lastSender ! WriteAck + testProbes(nodeD).expectMsgType[DeltaPropagation] + testProbes(nodeD).lastSender ! DeltaNack + + // here is the second round + testProbes(nodeA).expectMsgType[Write] + testProbes(nodeA).lastSender ! WriteAck + testProbes(nodeD).expectMsgType[Write] + testProbes(nodeD).lastSender ! WriteAck + testProbes(nodeB).expectNoMsg(100.millis) + testProbes(nodeC).expectNoMsg(100.millis) + + expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyB, None)) + watch(aggr) + expectTerminated(aggr) + } + + "timeout when less than required acks" in { + val probe = TestProbe() + val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorPropsWithDelta( + fullState2, delta, writeAll, probes(probe.ref), nodes, Set.empty, testActor, durable = false)) + + probe.expectMsgType[DeltaPropagation] + // no reply + probe.expectMsgType[DeltaPropagation] + probe.lastSender ! WriteAck + probe.expectMsgType[DeltaPropagation] + // nack + probe.lastSender ! DeltaNack + probe.expectMsgType[DeltaPropagation] + // no reply + + // only 1 ack so we expect 3 full state Write + probe.expectMsgType[Write] + probe.lastSender ! WriteAck + probe.expectMsgType[Write] + probe.expectMsgType[Write] + + // still not enough acks + expectMsg(UpdateTimeout(WriteAggregatorSpec.KeyB, None)) + watch(aggr) + expectTerminated(aggr) + } + } + "Durable WriteAggregator" must { "not reply before local confirmation" in { val probe = TestProbe() @@ -194,9 +278,9 @@ class WriteAggregatorSpec extends AkkaSpec(s""" expectNoMsg(200.millis) // the local write - aggr ! UpdateSuccess(WriteAggregatorSpec.key, None) + aggr ! UpdateSuccess(WriteAggregatorSpec.KeyA, None) - expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None)) + expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyA, None)) watch(aggr) expectTerminated(aggr) } @@ -206,7 +290,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps( data, writeThree, probes(probe.ref), nodes, Set.empty, testActor, durable = true)) - aggr ! UpdateSuccess(WriteAggregatorSpec.key, None) // the local write + aggr ! UpdateSuccess(WriteAggregatorSpec.KeyA, None) // the local write probe.expectMsgType[Write] probe.lastSender ! WriteAck probe.expectMsgType[Write] @@ -214,7 +298,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" probe.expectMsgType[Write] probe.lastSender ! WriteAck - expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None)) + expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyA, None)) watch(aggr) expectTerminated(aggr) } @@ -226,7 +310,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" probe.expectMsgType[Write] probe.lastSender ! WriteNack - aggr ! UpdateSuccess(WriteAggregatorSpec.key, None) // the local write + aggr ! UpdateSuccess(WriteAggregatorSpec.KeyA, None) // the local write probe.expectMsgType[Write] probe.lastSender ! WriteAck probe.expectMsgType[Write] @@ -234,7 +318,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" probe.expectMsgType[Write] probe.lastSender ! WriteNack - expectMsg(StoreFailure(WriteAggregatorSpec.key, None)) + expectMsg(StoreFailure(WriteAggregatorSpec.KeyA, None)) watch(aggr) expectTerminated(aggr) } @@ -253,7 +337,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" probe.expectMsgType[Write] probe.lastSender ! WriteNack - expectMsg(UpdateTimeout(WriteAggregatorSpec.key, None)) + expectMsg(UpdateTimeout(WriteAggregatorSpec.KeyA, None)) watch(aggr) expectTerminated(aggr) } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala index d920da41af..ee0ee0fb59 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala @@ -78,6 +78,7 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem( checkSerialization(Write("A", DataEnvelope(data1))) checkSerialization(WriteAck) checkSerialization(WriteNack) + checkSerialization(DeltaNack) checkSerialization(Read("A")) checkSerialization(ReadResult(Some(DataEnvelope(data1)))) checkSerialization(ReadResult(None)) @@ -87,7 +88,7 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem( checkSerialization(Gossip(Map( "A" → DataEnvelope(data1), "B" → DataEnvelope(GSet() + "b" + "c")), sendBack = true)) - checkSerialization(DeltaPropagation(address1, Map( + checkSerialization(DeltaPropagation(address1, reply = true, Map( "A" → Delta(DataEnvelope(delta1), 1L, 1L), "B" → Delta(DataEnvelope(delta2), 3L, 5L)))) checkSerialization(new DurableDataEnvelope(data1))