=cdd #18106 Merge update value with existing
This commit is contained in:
parent
d84492e581
commit
153b683a0a
2 changed files with 35 additions and 13 deletions
|
|
@ -858,7 +858,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
|
||||||
Try {
|
Try {
|
||||||
localValue match {
|
localValue match {
|
||||||
case Some(DataEnvelope(DeletedData, _)) ⇒ throw new DataDeleted(key)
|
case Some(DataEnvelope(DeletedData, _)) ⇒ throw new DataDeleted(key)
|
||||||
case Some(envelope @ DataEnvelope(existing, _)) ⇒ modify(Some(existing))
|
case Some(envelope @ DataEnvelope(existing, _)) ⇒
|
||||||
|
existing.merge(modify(Some(existing)).asInstanceOf[existing.T])
|
||||||
case None ⇒ modify(None)
|
case None ⇒ modify(None)
|
||||||
}
|
}
|
||||||
} match {
|
} match {
|
||||||
|
|
|
||||||
|
|
@ -59,10 +59,17 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
|
||||||
val KeyG = ORSetKey[String]("G")
|
val KeyG = ORSetKey[String]("G")
|
||||||
val KeyH = ORMapKey[Flag]("H")
|
val KeyH = ORMapKey[Flag]("H")
|
||||||
val KeyI = GSetKey[String]("I")
|
val KeyI = GSetKey[String]("I")
|
||||||
|
val KeyJ = GSetKey[String]("J")
|
||||||
val KeyX = GCounterKey("X")
|
val KeyX = GCounterKey("X")
|
||||||
val KeyY = GCounterKey("Y")
|
val KeyY = GCounterKey("Y")
|
||||||
val KeyZ = GCounterKey("Z")
|
val KeyZ = GCounterKey("Z")
|
||||||
|
|
||||||
|
var afterCounter = 0
|
||||||
|
def enterBarrierAfterTestStep(): Unit = {
|
||||||
|
afterCounter += 1
|
||||||
|
enterBarrier("after-" + afterCounter)
|
||||||
|
}
|
||||||
|
|
||||||
def join(from: RoleName, to: RoleName): Unit = {
|
def join(from: RoleName, to: RoleName): Unit = {
|
||||||
runOn(from) {
|
runOn(from) {
|
||||||
cluster join node(to).address
|
cluster join node(to).address
|
||||||
|
|
@ -145,10 +152,24 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
|
||||||
expectMsg(GetKeyIdsResult(Set("A")))
|
expectMsg(GetKeyIdsResult(Set("A")))
|
||||||
}
|
}
|
||||||
|
|
||||||
enterBarrier("after-1")
|
enterBarrierAfterTestStep()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"merge the update with existing value" in {
|
||||||
|
runOn(first) {
|
||||||
|
// in case user is not using the passed in existing value
|
||||||
|
replicator ! Update(KeyJ, GSet(), WriteLocal)(_ + "a" + "b")
|
||||||
|
expectMsg(UpdateSuccess(KeyJ, None))
|
||||||
|
replicator ! Update(KeyJ, GSet(), WriteLocal)(_ ⇒ GSet.empty[String] + "c") // normal usage would be `_ + "c"`
|
||||||
|
expectMsg(UpdateSuccess(KeyJ, None))
|
||||||
|
replicator ! Get(KeyJ, ReadLocal)
|
||||||
|
val s = expectMsgPF() { case g @ GetSuccess(KeyJ, _) ⇒ g.get(KeyJ) }
|
||||||
|
s should ===(GSet.empty[String] + "a" + "b" + "c")
|
||||||
|
}
|
||||||
|
enterBarrierAfterTestStep()
|
||||||
|
}
|
||||||
|
|
||||||
"reply with ModifyFailure if exception is thrown by modify function" in {
|
"reply with ModifyFailure if exception is thrown by modify function" in {
|
||||||
val e = new RuntimeException("errr")
|
val e = new RuntimeException("errr")
|
||||||
replicator ! Update(KeyA, GCounter(), WriteLocal)(_ ⇒ throw e)
|
replicator ! Update(KeyA, GCounter(), WriteLocal)(_ ⇒ throw e)
|
||||||
|
|
@ -184,7 +205,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
|
||||||
c.value should be(6)
|
c.value should be(6)
|
||||||
}
|
}
|
||||||
|
|
||||||
enterBarrier("after-2")
|
enterBarrierAfterTestStep()
|
||||||
}
|
}
|
||||||
|
|
||||||
"work in 2 node cluster" in {
|
"work in 2 node cluster" in {
|
||||||
|
|
@ -234,7 +255,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
enterBarrier("after-3")
|
enterBarrierAfterTestStep()
|
||||||
}
|
}
|
||||||
|
|
||||||
"be replicated after succesful update" in {
|
"be replicated after succesful update" in {
|
||||||
|
|
@ -309,7 +330,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
enterBarrier("after-4")
|
enterBarrierAfterTestStep()
|
||||||
}
|
}
|
||||||
|
|
||||||
"converge after partition" in {
|
"converge after partition" in {
|
||||||
|
|
@ -360,7 +381,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
enterBarrier("after-5")
|
enterBarrierAfterTestStep()
|
||||||
}
|
}
|
||||||
|
|
||||||
"support majority quorum write and read with 3 nodes with 1 unreachable" in {
|
"support majority quorum write and read with 3 nodes with 1 unreachable" in {
|
||||||
|
|
@ -463,7 +484,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
|
||||||
c155.value should be(155)
|
c155.value should be(155)
|
||||||
}
|
}
|
||||||
|
|
||||||
enterBarrier("after-6")
|
enterBarrierAfterTestStep()
|
||||||
}
|
}
|
||||||
|
|
||||||
"converge after many concurrent updates" in within(10.seconds) {
|
"converge after many concurrent updates" in within(10.seconds) {
|
||||||
|
|
@ -482,7 +503,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
|
||||||
val c = expectMsgPF() { case g @ GetSuccess(KeyF, _) ⇒ g.get(KeyF) }
|
val c = expectMsgPF() { case g @ GetSuccess(KeyF, _) ⇒ g.get(KeyF) }
|
||||||
c.value should be(3 * 100)
|
c.value should be(3 * 100)
|
||||||
}
|
}
|
||||||
enterBarrier("after-7")
|
enterBarrierAfterTestStep()
|
||||||
}
|
}
|
||||||
|
|
||||||
"read-repair happens before GetSuccess" in {
|
"read-repair happens before GetSuccess" in {
|
||||||
|
|
@ -497,7 +518,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
|
||||||
replicator ! Get(KeyG, ReadLocal)
|
replicator ! Get(KeyG, ReadLocal)
|
||||||
expectMsgPF() { case g @ GetSuccess(KeyG, _) ⇒ g.get(KeyG).elements } should be(Set("a", "b"))
|
expectMsgPF() { case g @ GetSuccess(KeyG, _) ⇒ g.get(KeyG).elements } should be(Set("a", "b"))
|
||||||
}
|
}
|
||||||
enterBarrier("after-8")
|
enterBarrierAfterTestStep()
|
||||||
}
|
}
|
||||||
|
|
||||||
"check that a remote update and a local update both cause a change event to emit with the merged data" in {
|
"check that a remote update and a local update both cause a change event to emit with the merged data" in {
|
||||||
|
|
@ -523,7 +544,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
|
||||||
Map("a" -> Flag(enabled = true), "b" -> Flag(enabled = true)))
|
Map("a" -> Flag(enabled = true), "b" -> Flag(enabled = true)))
|
||||||
}
|
}
|
||||||
|
|
||||||
enterBarrier("after-9")
|
enterBarrierAfterTestStep()
|
||||||
}
|
}
|
||||||
|
|
||||||
"avoid duplicate change events for same data" in {
|
"avoid duplicate change events for same data" in {
|
||||||
|
|
@ -547,7 +568,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
|
||||||
|
|
||||||
changedProbe.expectNoMsg(1.second)
|
changedProbe.expectNoMsg(1.second)
|
||||||
|
|
||||||
enterBarrier("after-10")
|
enterBarrierAfterTestStep()
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue