From 83a1f8eab342b4a4b91603e7ddcdc5e892c1d031 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 29 Jun 2015 14:50:36 +0200 Subject: [PATCH] =cdd #17796 Avoid duplicate change events --- .../scala/akka/cluster/ddata/Replicator.scala | 24 ++++++++++++------ .../akka/cluster/ddata/ReplicatorSpec.scala | 25 +++++++++++++++++++ 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index 93d72bdafd..a843fba162 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -928,10 +928,16 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } def setData(key: String, envelope: DataEnvelope): Unit = { - // notify subscribers, later - changed += key + val dig = + if (subscribers.contains(key) && !changed.contains(key)) { + val oldDigest = getDigest(key) + val dig = digest(envelope) + if (dig != oldDigest) + changed += key // notify subscribers, later + dig + } else if (envelope.data == DeletedData) DeletedDigest + else LazyDigest - val dig = if (envelope.data == DeletedData) DeletedDigest else LazyDigest dataEntries = dataEntries.updated(key, (envelope, dig)) } @@ -946,10 +952,12 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } } - def digest(envelope: DataEnvelope): Digest = { - val bytes = serializer.toBinary(envelope) - ByteString.fromArray(MessageDigest.getInstance("SHA-1").digest(bytes)) - } + def digest(envelope: DataEnvelope): Digest = + if (envelope.data == DeletedData) DeletedDigest + else { + val bytes = serializer.toBinary(envelope) + ByteString.fromArray(MessageDigest.getInstance("SHA-1").digest(bytes)) + } def getData(key: String): Option[DataEnvelope] = dataEntries.get(key).map { case (envelope, _) ⇒ envelope } @@ -1016,7 +1024,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog def receiveStatus(otherDigests: Map[String, Digest], chunk: Int, totChunks: Int): Unit = { if (log.isDebugEnabled) log.debug("Received gossip status from [{}], chunk [{}] of [{}] containing [{}]", sender().path.address, - chunk, totChunks, otherDigests.keys.mkString(", ")) + (chunk + 1), totChunks, otherDigests.keys.mkString(", ")) def isOtherDifferent(key: String, otherDigest: Digest): Boolean = { val d = getDigest(key) diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala index 433c36adbf..8872e53dab 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala @@ -58,6 +58,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec val KeyF = GCounterKey("F") val KeyG = ORSetKey[String]("G") val KeyH = ORMapKey[Flag]("H") + val KeyI = GSetKey[String]("I") val KeyX = GCounterKey("X") val KeyY = GCounterKey("Y") val KeyZ = GCounterKey("Z") @@ -525,5 +526,29 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec enterBarrier("after-9") } + "avoid duplicate change events for same data" in { + val changedProbe = TestProbe() + replicator ! Subscribe(KeyI, changedProbe.ref) + enterBarrier("subscribed-I") + + runOn(second) { + replicator ! Update(KeyI, GSet.empty[String], writeTwo)(a ⇒ a.add("a")) + } + + within(5.seconds) { // gossip to third + changedProbe.expectMsgPF() { case c @ Changed(KeyI) ⇒ c.get(KeyI).elements } should be(Set("a")) + } + + enterBarrier("update-I") + + runOn(first) { + replicator ! Update(KeyI, GSet.empty[String], writeTwo)(_ + "a") + } + + changedProbe.expectNoMsg(1.second) + + enterBarrier("after-10") + } + }