From ebbdb6135e0187b03dbf17a85f5e4e31836d700d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 12 Jul 2018 09:23:25 +0200 Subject: [PATCH] Harden ReplicatorDeltaSpec, #25342 * It was a timing race condition in the test that was exposed by the change in PR #25315. Full state is now sent immediately when receiving the DeltaNack and that makes the Update complete much faster for that case than before. * That resulted in that the delta propagations from previous updates were still in the buffer to be sent out when the incr(4) was performed. Those deltas contained the NoDeltaPlaceholder, which caused the inrc(4) delta to also be folded into NoDeltaPlaceholder and thereby not propagated. * Before the DeltaNack the buffer had time to be flushed before the incr(4) and therefore no NoDeltaPlaceholder. --- .../src/main/scala/akka/cluster/ddata/Replicator.scala | 1 + .../scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index aa5ffc3e0b..21b147e0f1 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -847,6 +847,7 @@ object Replicator { override def zero: DeltaReplicatedData = this override def delta: Option[ReplicatedDelta] = None override def resetDelta: ReplicatedData = this + override def toString: String = "NoDeltaPlaceholder" } } case object DeltaNack extends ReplicatorMessage with DeadLetterSuppression diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala index d7e98361ef..f2caf9f8e7 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala @@ -331,6 +331,13 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult // Thereafter delta can be propagated and applied again. deltaReplicator.tell(Update(KeyHigh, Highest(0), writeAll)(_.incr(100)), p1.ref) p1.expectMsgType[UpdateSuccess[_]] + // Flush the deltaPropagation buffer, otherwise it will contain + // NoDeltaPlaceholder from previous updates and the incr(4) delta will also + // be folded into NoDeltaPlaceholder and not propagated as delta. A few DeltaPropagationTick + // are needed to send to all and flush buffer. + roles.foreach { _ ⇒ + deltaReplicator ! Replicator.Internal.DeltaPropagationTick + } deltaReplicator.tell(Update(KeyHigh, Highest(0), WriteLocal)(_.incr(4)), p1.ref) p1.expectMsgType[UpdateSuccess[_]] }