=cdd #17796 Avoid duplicate change events
This commit is contained in:
parent
e13a2d9560
commit
83a1f8eab3
2 changed files with 41 additions and 8 deletions
|
|
@ -928,10 +928,16 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
|
|||
}
|
||||
|
||||
def setData(key: String, envelope: DataEnvelope): Unit = {
|
||||
// notify subscribers, later
|
||||
changed += key
|
||||
val dig =
|
||||
if (subscribers.contains(key) && !changed.contains(key)) {
|
||||
val oldDigest = getDigest(key)
|
||||
val dig = digest(envelope)
|
||||
if (dig != oldDigest)
|
||||
changed += key // notify subscribers, later
|
||||
dig
|
||||
} else if (envelope.data == DeletedData) DeletedDigest
|
||||
else LazyDigest
|
||||
|
||||
val dig = if (envelope.data == DeletedData) DeletedDigest else LazyDigest
|
||||
dataEntries = dataEntries.updated(key, (envelope, dig))
|
||||
}
|
||||
|
||||
|
|
@ -946,10 +952,12 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
|
|||
}
|
||||
}
|
||||
|
||||
def digest(envelope: DataEnvelope): Digest = {
|
||||
val bytes = serializer.toBinary(envelope)
|
||||
ByteString.fromArray(MessageDigest.getInstance("SHA-1").digest(bytes))
|
||||
}
|
||||
def digest(envelope: DataEnvelope): Digest =
|
||||
if (envelope.data == DeletedData) DeletedDigest
|
||||
else {
|
||||
val bytes = serializer.toBinary(envelope)
|
||||
ByteString.fromArray(MessageDigest.getInstance("SHA-1").digest(bytes))
|
||||
}
|
||||
|
||||
def getData(key: String): Option[DataEnvelope] = dataEntries.get(key).map { case (envelope, _) ⇒ envelope }
|
||||
|
||||
|
|
@ -1016,7 +1024,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
|
|||
def receiveStatus(otherDigests: Map[String, Digest], chunk: Int, totChunks: Int): Unit = {
|
||||
if (log.isDebugEnabled)
|
||||
log.debug("Received gossip status from [{}], chunk [{}] of [{}] containing [{}]", sender().path.address,
|
||||
chunk, totChunks, otherDigests.keys.mkString(", "))
|
||||
(chunk + 1), totChunks, otherDigests.keys.mkString(", "))
|
||||
|
||||
def isOtherDifferent(key: String, otherDigest: Digest): Boolean = {
|
||||
val d = getDigest(key)
|
||||
|
|
|
|||
|
|
@ -58,6 +58,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
|
|||
val KeyF = GCounterKey("F")
|
||||
val KeyG = ORSetKey[String]("G")
|
||||
val KeyH = ORMapKey[Flag]("H")
|
||||
val KeyI = GSetKey[String]("I")
|
||||
val KeyX = GCounterKey("X")
|
||||
val KeyY = GCounterKey("Y")
|
||||
val KeyZ = GCounterKey("Z")
|
||||
|
|
@ -525,5 +526,29 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
|
|||
enterBarrier("after-9")
|
||||
}
|
||||
|
||||
"avoid duplicate change events for same data" in {
|
||||
val changedProbe = TestProbe()
|
||||
replicator ! Subscribe(KeyI, changedProbe.ref)
|
||||
enterBarrier("subscribed-I")
|
||||
|
||||
runOn(second) {
|
||||
replicator ! Update(KeyI, GSet.empty[String], writeTwo)(a ⇒ a.add("a"))
|
||||
}
|
||||
|
||||
within(5.seconds) { // gossip to third
|
||||
changedProbe.expectMsgPF() { case c @ Changed(KeyI) ⇒ c.get(KeyI).elements } should be(Set("a"))
|
||||
}
|
||||
|
||||
enterBarrier("update-I")
|
||||
|
||||
runOn(first) {
|
||||
replicator ! Update(KeyI, GSet.empty[String], writeTwo)(_ + "a")
|
||||
}
|
||||
|
||||
changedProbe.expectNoMsg(1.second)
|
||||
|
||||
enterBarrier("after-10")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue