diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DeltaPropagationSelector.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DeltaPropagationSelector.scala index 488b90cfa4..6ac292f26f 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DeltaPropagationSelector.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DeltaPropagationSelector.scala @@ -9,6 +9,7 @@ import akka.actor.Address import akka.annotation.InternalApi import akka.cluster.ddata.Key.KeyId import akka.cluster.ddata.Replicator.Internal.DeltaPropagation +import akka.cluster.ddata.Replicator.Internal.DeltaPropagation.NoDeltaPlaceholder /** * INTERNAL API: Used by the Replicator actor. @@ -104,7 +105,13 @@ import akka.cluster.ddata.Replicator.Internal.DeltaPropagation val deltaGroup = cache.get(cacheKey) match { case None ⇒ val group = deltaEntriesAfterJ.valuesIterator.reduceLeft { - (d1, d2) ⇒ d1.merge(d2.asInstanceOf[d1.T]) + (d1, d2) ⇒ + d2 match { + case NoDeltaPlaceholder ⇒ NoDeltaPlaceholder + case _ ⇒ + // this is fine also if d1 is a NoDeltaPlaceholder + d1.merge(d2.asInstanceOf[d1.T]) + } } cache = cache.updated(cacheKey, group) group diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala index 55e77eb053..4626bfbe70 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala @@ -97,19 +97,15 @@ final class PNCounter private[akka] ( decrements = that.decrements.merge(this.decrements)) override def delta: Option[PNCounter] = { - if (increments.delta.isEmpty && decrements.delta.isEmpty) - None - else { - val incrementsDelta = increments.delta match { - case Some(d) ⇒ d - case None ⇒ GCounter.empty - } - val decrementsDelta = decrements.delta match { - case Some(d) ⇒ d - case None ⇒ GCounter.empty - } - Some(new PNCounter(incrementsDelta, decrementsDelta)) + val incrementsDelta = increments.delta match { + case Some(d) ⇒ d + case None ⇒ GCounter.empty } + val decrementsDelta = decrements.delta match { + case Some(d) ⇒ d + case None ⇒ GCounter.empty + } + Some(new PNCounter(incrementsDelta, decrementsDelta)) } override def mergeDelta(thatDelta: PNCounter): PNCounter = merge(thatDelta) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index a839ad2e36..89ae23594b 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -744,6 +744,27 @@ object Replicator { final case class Delta(dataEnvelope: DataEnvelope, fromSeqNr: Long, toSeqNr: Long) final case class DeltaPropagation(fromNode: UniqueAddress, reply: Boolean, deltas: Map[KeyId, Delta]) extends ReplicatorMessage + object DeltaPropagation { + /** + * When a DeltaReplicatedData returns `None` from `delta` it must still be + * treated as a delta that increase the version counter in `DeltaPropagationSelector`. + * Otherwise a later delta might be applied before the full state gossip is received + * and thereby violating `RequiresCausalDeliveryOfDeltas`. + * + * This is used as a placeholder for such `None` delta. It's filtered out + * in `createDeltaPropagation`, i.e. never sent to the other replicas. + */ + val NoDeltaPlaceholder: ReplicatedDelta = + new DeltaReplicatedData with RequiresCausalDeliveryOfDeltas with ReplicatedDelta { + type T = ReplicatedData + type D = ReplicatedDelta + override def merge(other: ReplicatedData): ReplicatedData = this + override def mergeDelta(other: ReplicatedDelta): ReplicatedDelta = this + override def zero: DeltaReplicatedData = this + override def delta: Option[ReplicatedDelta] = None + override def resetDelta: ReplicatedData = this + } + } case object DeltaNack extends ReplicatorMessage with DeadLetterSuppression } @@ -941,6 +962,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog import Replicator._ import Replicator.Internal._ + import Replicator.Internal.DeltaPropagation.NoDeltaPlaceholder import PruningState._ import settings._ @@ -991,11 +1013,12 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog override def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation = { // Important to include the pruning state in the deltas. For example if the delta is based // on an entry that has been pruned but that has not yet been performed on the target node. - DeltaPropagation(selfUniqueAddress, reply = false, deltas.map { - case (key, (d, fromSeqNr, toSeqNr)) ⇒ getData(key) match { - case Some(envelope) ⇒ key → Delta(envelope.copy(data = d), fromSeqNr, toSeqNr) - case None ⇒ key → Delta(DataEnvelope(d), fromSeqNr, toSeqNr) - } + DeltaPropagation(selfUniqueAddress, reply = false, deltas.collect { + case (key, (d, fromSeqNr, toSeqNr)) if d != NoDeltaPlaceholder ⇒ + getData(key) match { + case Some(envelope) ⇒ key → Delta(envelope.copy(data = d), fromSeqNr, toSeqNr) + case None ⇒ key → Delta(DataEnvelope(d), fromSeqNr, toSeqNr) + } }(collection.breakOut)) } } @@ -1206,18 +1229,27 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog def receiveUpdate(key: KeyR, modify: Option[ReplicatedData] ⇒ ReplicatedData, writeConsistency: WriteConsistency, req: Option[Any]): Unit = { val localValue = getData(key.id) + + def deltaOrPlaceholder(d: DeltaReplicatedData): Option[ReplicatedDelta] = { + d.delta match { + case s @ Some(_) ⇒ s + case None ⇒ Some(NoDeltaPlaceholder) + } + } + Try { localValue match { case Some(DataEnvelope(DeletedData, _, _)) ⇒ throw new DataDeleted(key, req) case Some(envelope @ DataEnvelope(existing, _, _)) ⇒ modify(Some(existing)) match { case d: DeltaReplicatedData if deltaCrdtEnabled ⇒ - (envelope.merge(d.resetDelta.asInstanceOf[existing.T]), d.delta) + (envelope.merge(d.resetDelta.asInstanceOf[existing.T]), deltaOrPlaceholder(d)) case d ⇒ (envelope.merge(d.asInstanceOf[existing.T]), None) } case None ⇒ modify(None) match { - case d: DeltaReplicatedData if deltaCrdtEnabled ⇒ (DataEnvelope(d.resetDelta), d.delta) + case d: DeltaReplicatedData if deltaCrdtEnabled ⇒ + (DataEnvelope(d.resetDelta), deltaOrPlaceholder(d)) case d ⇒ (DataEnvelope(d), None) } } @@ -1244,6 +1276,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog replyTo ! UpdateSuccess(key, req) } else { val (writeEnvelope, writeDelta) = delta match { + case Some(NoDeltaPlaceholder) ⇒ (newEnvelope, None) case Some(d: RequiresCausalDeliveryOfDeltas) ⇒ val v = deltaPropagationSelector.currentVersion(key.id) (newEnvelope, Some(Delta(newEnvelope.copy(data = d), v, v))) @@ -1458,7 +1491,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog deltaPropagationSelector.collectPropagations().foreach { case (node, deltaPropagation) ⇒ // TODO split it to several DeltaPropagation if too many entries - replica(node) ! deltaPropagation + if (deltaPropagation.deltas.nonEmpty) + replica(node) ! deltaPropagation } if (deltaPropagationSelector.propagationCount % deltaPropagationSelector.gossipIntervalDivisor == 0) diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala index c1df7bffdb..7b1f70a26f 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala @@ -29,6 +29,25 @@ object ReplicatorDeltaSpec extends MultiNodeConfig { testTransport(on = true) + case class Highest(n: Int, delta: Option[Highest] = None) + extends DeltaReplicatedData with RequiresCausalDeliveryOfDeltas with ReplicatedDelta { + type T = Highest + type D = Highest + + override def merge(other: Highest): Highest = + if (n >= other.n) this else other + + override def mergeDelta(other: Highest): Highest = merge(other) + + override def zero: Highest = Highest(0) + + override def resetDelta: Highest = Highest(n) + + def incr(i: Int): Highest = Highest(n + i, Some(Highest(n + i))) + + def incrNoDelta(i: Int): Highest = Highest(n + i, None) + } + sealed trait Op final case class Delay(n: Int) extends Op final case class Incr(key: PNCounterKey, n: Int, consistency: WriteConsistency) extends Op @@ -40,6 +59,7 @@ object ReplicatorDeltaSpec extends MultiNodeConfig { val writeTwo = WriteTo(2, timeout) val writeMajority = WriteMajority(timeout) + val KeyHigh = new Key[Highest]("High") {} val KeyA = PNCounterKey("A") val KeyB = PNCounterKey("B") val KeyC = PNCounterKey("C") @@ -250,6 +270,81 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult enterBarrierAfterTestStep() } + "preserve causal consistency for None delta" in { + runOn(first) { + val p1 = TestProbe() + deltaReplicator.tell(Update(KeyHigh, Highest(0), WriteLocal)(_.incr(1)), p1.ref) + p1.expectMsgType[UpdateSuccess[_]] + } + enterBarrier("write-1") + + runOn(first) { + val p = TestProbe() + deltaReplicator.tell(Get(KeyHigh, ReadLocal), p.ref) + p.expectMsgType[GetSuccess[Highest]].dataValue.n should ===(1) + } + runOn(second, third, fourth) { + within(5.seconds) { + awaitAssert { + val p = TestProbe() + deltaReplicator.tell(Get(KeyHigh, ReadLocal), p.ref) + p.expectMsgType[GetSuccess[Highest]].dataValue.n should ===(1) + } + } + } + enterBarrier("read-1") + + runOn(first) { + val p1 = TestProbe() + deltaReplicator.tell(Update(KeyHigh, Highest(0), writeAll)(_.incr(2)), p1.ref) + p1.expectMsgType[UpdateSuccess[_]] + deltaReplicator.tell(Update(KeyHigh, Highest(0), WriteLocal)(_.incrNoDelta(5)), p1.ref) + deltaReplicator.tell(Update(KeyHigh, Highest(0), WriteLocal)(_.incr(10)), p1.ref) + p1.expectMsgType[UpdateSuccess[_]] + p1.expectMsgType[UpdateSuccess[_]] + } + enterBarrier("write-2") + + runOn(first) { + val p = TestProbe() + deltaReplicator.tell(Get(KeyHigh, ReadLocal), p.ref) + p.expectMsgType[GetSuccess[Highest]].dataValue.n should ===(18) + } + runOn(second, third, fourth) { + within(5.seconds) { + awaitAssert { + val p = TestProbe() + deltaReplicator.tell(Get(KeyHigh, ReadLocal), p.ref) + // the incrNoDelta(5) is not propagated as delta, and then incr(10) is also skipped + p.expectMsgType[GetSuccess[Highest]].dataValue.n should ===(3) + } + } + } + enterBarrier("read-2") + + runOn(first) { + val p1 = TestProbe() + // WriteAll will send full state when delta can't be applied and thereby syncing the + // delta versions again. Same would happen via full state gossip. + // Thereafter delta can be propagated and applied again. + deltaReplicator.tell(Update(KeyHigh, Highest(0), writeAll)(_.incr(100)), p1.ref) + p1.expectMsgType[UpdateSuccess[_]] + deltaReplicator.tell(Update(KeyHigh, Highest(0), WriteLocal)(_.incr(4)), p1.ref) + p1.expectMsgType[UpdateSuccess[_]] + } + enterBarrier("write-3") + + within(5.seconds) { + awaitAssert { + val p = TestProbe() + deltaReplicator.tell(Get(KeyHigh, ReadLocal), p.ref) + p.expectMsgType[GetSuccess[Highest]].dataValue.n should ===(122) + } + } + + enterBarrierAfterTestStep() + } + "be eventually consistent" in { val operations = generateOperations(onNode = myself) log.debug(s"random operations on [${myself.name}]: ${operations.mkString(", ")}")