diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala index 2c40a3ead3..2f1637dfe8 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala @@ -432,17 +432,20 @@ final class ORSet[A] private[akka] ( private def mergeRemoveDelta(thatDelta: ORSet.RemoveDeltaOp[A]): ORSet[A] = { val that = thatDelta.underlying val (elem, thatDot) = that.elementsMap.head + def deleteDots = that.vvector.versionsIterator + def deleteDotsNodes = deleteDots.map { case (dotNode, _) ⇒ dotNode } val newElementsMap = - if (that.vvector > vvector || that.vvector == vvector) - elementsMap - elem - else { + if (deleteDots.forall { case (dotNode, dotV) ⇒ this.vvector.versionAt(dotNode) <= dotV }) { elementsMap.get(elem) match { case Some(thisDot) ⇒ - if (thatDot == thisDot || thatDot > thisDot) elementsMap - elem + if (thisDot.versionsIterator.forall { case (thisDotNode, _) ⇒ deleteDotsNodes.contains(thisDotNode) }) + elementsMap - elem else elementsMap case None ⇒ elementsMap } + } else { + elementsMap } clearAncestor() val newVvector = vvector.merge(that.vvector) diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala index 73febe05b1..741e6ade92 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala @@ -16,6 +16,7 @@ class ORSetSpec extends WordSpec with Matchers { val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1L) val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2L) + val node3 = UniqueAddress(node1.address.copy(port = Some(2553)), 3L) val nodeA = UniqueAddress(Address("akka.tcp", "Sys", "a", 2552), 1L) val nodeB = UniqueAddress(nodeA.address.copy(host = Some("b")), 2L) @@ -374,6 +375,50 @@ class ORSetSpec extends WordSpec with Matchers { s9.elements should ===(Set("a", "z")) } + "handle a mixed add/remove scenario 2" in { + val s1 = ORSet.empty[String] + val s2 = s1.resetDelta.add(node1, "a") + val s3 = s2.resetDelta.add(node1, "b") + val s4 = s3.resetDelta.add(node2, "a") + val s5 = s4.resetDelta.remove(node1, "a") + + s5.elements should ===(Set("b")) + + val delta1 = s2.delta.get merge s3.delta.get + val delta2 = s4.delta.get + + val t1 = ORSet.empty[String] + val t2 = t1.mergeDelta(delta1).mergeDelta(delta2) + t2.elements should ===(Set("a", "b")) + val t3 = t2.resetDelta.add(node3, "z") + + val t4 = t3.mergeDelta(s5.delta.get) + + t4.elements should ===(Set("b", "z")) + } + + "handle a mixed add/remove scenario 3" in { + val s1 = ORSet.empty[String] + val s2 = s1.resetDelta.add(node1, "a") + val s3 = s2.resetDelta.add(node1, "b") + val s4 = s3.resetDelta.add(node2, "a") + val s5 = s4.resetDelta.remove(node1, "a") + + s5.elements should ===(Set("b")) + + val delta1 = s2.delta.get merge s3.delta.get + + val t1 = ORSet.empty[String] + + val t2 = t1.mergeDelta(delta1) + t2.elements should ===(Set("a", "b")) + val t3 = t2.resetDelta.add(node3, "a") + + val t4 = t3.mergeDelta(s5.delta.get) + + t4.elements should ===(Set("b", "a")) + } + "require causal delivery of deltas" in { // This test illustrates why we need causal delivery of deltas. // Otherwise the following could happen.