diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index 5321328caf..ad359066f4 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -857,9 +857,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog val localValue = getData(key.id) Try { localValue match { - case Some(DataEnvelope(DeletedData, _)) ⇒ throw new DataDeleted(key) - case Some(envelope @ DataEnvelope(existing, _)) ⇒ modify(Some(existing)) - case None ⇒ modify(None) + case Some(DataEnvelope(DeletedData, _)) ⇒ throw new DataDeleted(key) + case Some(envelope @ DataEnvelope(existing, _)) ⇒ + existing.merge(modify(Some(existing)).asInstanceOf[existing.T]) + case None ⇒ modify(None) } } match { case Success(newData) ⇒ diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala index 8872e53dab..00a250c0d1 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala @@ -59,10 +59,17 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec val KeyG = ORSetKey[String]("G") val KeyH = ORMapKey[Flag]("H") val KeyI = GSetKey[String]("I") + val KeyJ = GSetKey[String]("J") val KeyX = GCounterKey("X") val KeyY = GCounterKey("Y") val KeyZ = GCounterKey("Z") + var afterCounter = 0 + def enterBarrierAfterTestStep(): Unit = { + afterCounter += 1 + enterBarrier("after-" + afterCounter) + } + def join(from: RoleName, to: RoleName): Unit = { runOn(from) { cluster join node(to).address @@ -145,10 +152,24 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec expectMsg(GetKeyIdsResult(Set("A"))) } - enterBarrier("after-1") + enterBarrierAfterTestStep() } } + "merge the update with existing value" in { + runOn(first) { + // in case user is not using the passed in existing value + replicator ! Update(KeyJ, GSet(), WriteLocal)(_ + "a" + "b") + expectMsg(UpdateSuccess(KeyJ, None)) + replicator ! Update(KeyJ, GSet(), WriteLocal)(_ ⇒ GSet.empty[String] + "c") // normal usage would be `_ + "c"` + expectMsg(UpdateSuccess(KeyJ, None)) + replicator ! Get(KeyJ, ReadLocal) + val s = expectMsgPF() { case g @ GetSuccess(KeyJ, _) ⇒ g.get(KeyJ) } + s should ===(GSet.empty[String] + "a" + "b" + "c") + } + enterBarrierAfterTestStep() + } + "reply with ModifyFailure if exception is thrown by modify function" in { val e = new RuntimeException("errr") replicator ! Update(KeyA, GCounter(), WriteLocal)(_ ⇒ throw e) @@ -184,7 +205,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec c.value should be(6) } - enterBarrier("after-2") + enterBarrierAfterTestStep() } "work in 2 node cluster" in { @@ -234,7 +255,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec } } - enterBarrier("after-3") + enterBarrierAfterTestStep() } "be replicated after succesful update" in { @@ -309,7 +330,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec } } - enterBarrier("after-4") + enterBarrierAfterTestStep() } "converge after partition" in { @@ -360,7 +381,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec } } - enterBarrier("after-5") + enterBarrierAfterTestStep() } "support majority quorum write and read with 3 nodes with 1 unreachable" in { @@ -463,7 +484,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec c155.value should be(155) } - enterBarrier("after-6") + enterBarrierAfterTestStep() } "converge after many concurrent updates" in within(10.seconds) { @@ -482,7 +503,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec val c = expectMsgPF() { case g @ GetSuccess(KeyF, _) ⇒ g.get(KeyF) } c.value should be(3 * 100) } - enterBarrier("after-7") + enterBarrierAfterTestStep() } "read-repair happens before GetSuccess" in { @@ -497,7 +518,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec replicator ! Get(KeyG, ReadLocal) expectMsgPF() { case g @ GetSuccess(KeyG, _) ⇒ g.get(KeyG).elements } should be(Set("a", "b")) } - enterBarrier("after-8") + enterBarrierAfterTestStep() } "check that a remote update and a local update both cause a change event to emit with the merged data" in { @@ -523,7 +544,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec Map("a" -> Flag(enabled = true), "b" -> Flag(enabled = true))) } - enterBarrier("after-9") + enterBarrierAfterTestStep() } "avoid duplicate change events for same data" in { @@ -547,7 +568,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec changedProbe.expectNoMsg(1.second) - enterBarrier("after-10") + enterBarrierAfterTestStep() } }