Handle delta None correctly, #22655

When a DeltaReplicatedData returns None from delta it must still be
treated as a delta that increase the version counter in DeltaPropagationSelector.
Otherwise a later delta might be applied before the full state gossip is received
and thereby violating RequiresCausalDeliveryOfDeltas.
This commit is contained in:
Patrik Nordwall 2017-03-30 08:12:03 +02:00
parent 24ddc8f381
commit 9855e2896f
4 changed files with 153 additions and 21 deletions

View file

@ -9,6 +9,7 @@ import akka.actor.Address
import akka.annotation.InternalApi
import akka.cluster.ddata.Key.KeyId
import akka.cluster.ddata.Replicator.Internal.DeltaPropagation
import akka.cluster.ddata.Replicator.Internal.DeltaPropagation.NoDeltaPlaceholder
/**
* INTERNAL API: Used by the Replicator actor.
@ -104,7 +105,13 @@ import akka.cluster.ddata.Replicator.Internal.DeltaPropagation
val deltaGroup = cache.get(cacheKey) match {
case None
val group = deltaEntriesAfterJ.valuesIterator.reduceLeft {
(d1, d2) d1.merge(d2.asInstanceOf[d1.T])
(d1, d2)
d2 match {
case NoDeltaPlaceholder NoDeltaPlaceholder
case _
// this is fine also if d1 is a NoDeltaPlaceholder
d1.merge(d2.asInstanceOf[d1.T])
}
}
cache = cache.updated(cacheKey, group)
group

View file

@ -97,19 +97,15 @@ final class PNCounter private[akka] (
decrements = that.decrements.merge(this.decrements))
override def delta: Option[PNCounter] = {
if (increments.delta.isEmpty && decrements.delta.isEmpty)
None
else {
val incrementsDelta = increments.delta match {
case Some(d) d
case None GCounter.empty
}
val decrementsDelta = decrements.delta match {
case Some(d) d
case None GCounter.empty
}
Some(new PNCounter(incrementsDelta, decrementsDelta))
val incrementsDelta = increments.delta match {
case Some(d) d
case None GCounter.empty
}
val decrementsDelta = decrements.delta match {
case Some(d) d
case None GCounter.empty
}
Some(new PNCounter(incrementsDelta, decrementsDelta))
}
override def mergeDelta(thatDelta: PNCounter): PNCounter = merge(thatDelta)

View file

@ -744,6 +744,27 @@ object Replicator {
final case class Delta(dataEnvelope: DataEnvelope, fromSeqNr: Long, toSeqNr: Long)
final case class DeltaPropagation(fromNode: UniqueAddress, reply: Boolean, deltas: Map[KeyId, Delta]) extends ReplicatorMessage
object DeltaPropagation {
/**
* When a DeltaReplicatedData returns `None` from `delta` it must still be
* treated as a delta that increase the version counter in `DeltaPropagationSelector`.
* Otherwise a later delta might be applied before the full state gossip is received
* and thereby violating `RequiresCausalDeliveryOfDeltas`.
*
* This is used as a placeholder for such `None` delta. It's filtered out
* in `createDeltaPropagation`, i.e. never sent to the other replicas.
*/
val NoDeltaPlaceholder: ReplicatedDelta =
new DeltaReplicatedData with RequiresCausalDeliveryOfDeltas with ReplicatedDelta {
type T = ReplicatedData
type D = ReplicatedDelta
override def merge(other: ReplicatedData): ReplicatedData = this
override def mergeDelta(other: ReplicatedDelta): ReplicatedDelta = this
override def zero: DeltaReplicatedData = this
override def delta: Option[ReplicatedDelta] = None
override def resetDelta: ReplicatedData = this
}
}
case object DeltaNack extends ReplicatorMessage with DeadLetterSuppression
}
@ -941,6 +962,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
import Replicator._
import Replicator.Internal._
import Replicator.Internal.DeltaPropagation.NoDeltaPlaceholder
import PruningState._
import settings._
@ -991,11 +1013,12 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
override def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation = {
// Important to include the pruning state in the deltas. For example if the delta is based
// on an entry that has been pruned but that has not yet been performed on the target node.
DeltaPropagation(selfUniqueAddress, reply = false, deltas.map {
case (key, (d, fromSeqNr, toSeqNr)) getData(key) match {
case Some(envelope) key Delta(envelope.copy(data = d), fromSeqNr, toSeqNr)
case None key Delta(DataEnvelope(d), fromSeqNr, toSeqNr)
}
DeltaPropagation(selfUniqueAddress, reply = false, deltas.collect {
case (key, (d, fromSeqNr, toSeqNr)) if d != NoDeltaPlaceholder
getData(key) match {
case Some(envelope) key Delta(envelope.copy(data = d), fromSeqNr, toSeqNr)
case None key Delta(DataEnvelope(d), fromSeqNr, toSeqNr)
}
}(collection.breakOut))
}
}
@ -1206,18 +1229,27 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def receiveUpdate(key: KeyR, modify: Option[ReplicatedData] ReplicatedData,
writeConsistency: WriteConsistency, req: Option[Any]): Unit = {
val localValue = getData(key.id)
def deltaOrPlaceholder(d: DeltaReplicatedData): Option[ReplicatedDelta] = {
d.delta match {
case s @ Some(_) s
case None Some(NoDeltaPlaceholder)
}
}
Try {
localValue match {
case Some(DataEnvelope(DeletedData, _, _)) throw new DataDeleted(key, req)
case Some(envelope @ DataEnvelope(existing, _, _))
modify(Some(existing)) match {
case d: DeltaReplicatedData if deltaCrdtEnabled
(envelope.merge(d.resetDelta.asInstanceOf[existing.T]), d.delta)
(envelope.merge(d.resetDelta.asInstanceOf[existing.T]), deltaOrPlaceholder(d))
case d
(envelope.merge(d.asInstanceOf[existing.T]), None)
}
case None modify(None) match {
case d: DeltaReplicatedData if deltaCrdtEnabled (DataEnvelope(d.resetDelta), d.delta)
case d: DeltaReplicatedData if deltaCrdtEnabled
(DataEnvelope(d.resetDelta), deltaOrPlaceholder(d))
case d (DataEnvelope(d), None)
}
}
@ -1244,6 +1276,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
replyTo ! UpdateSuccess(key, req)
} else {
val (writeEnvelope, writeDelta) = delta match {
case Some(NoDeltaPlaceholder) (newEnvelope, None)
case Some(d: RequiresCausalDeliveryOfDeltas)
val v = deltaPropagationSelector.currentVersion(key.id)
(newEnvelope, Some(Delta(newEnvelope.copy(data = d), v, v)))
@ -1458,7 +1491,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
deltaPropagationSelector.collectPropagations().foreach {
case (node, deltaPropagation)
// TODO split it to several DeltaPropagation if too many entries
replica(node) ! deltaPropagation
if (deltaPropagation.deltas.nonEmpty)
replica(node) ! deltaPropagation
}
if (deltaPropagationSelector.propagationCount % deltaPropagationSelector.gossipIntervalDivisor == 0)

View file

@ -29,6 +29,25 @@ object ReplicatorDeltaSpec extends MultiNodeConfig {
testTransport(on = true)
case class Highest(n: Int, delta: Option[Highest] = None)
extends DeltaReplicatedData with RequiresCausalDeliveryOfDeltas with ReplicatedDelta {
type T = Highest
type D = Highest
override def merge(other: Highest): Highest =
if (n >= other.n) this else other
override def mergeDelta(other: Highest): Highest = merge(other)
override def zero: Highest = Highest(0)
override def resetDelta: Highest = Highest(n)
def incr(i: Int): Highest = Highest(n + i, Some(Highest(n + i)))
def incrNoDelta(i: Int): Highest = Highest(n + i, None)
}
sealed trait Op
final case class Delay(n: Int) extends Op
final case class Incr(key: PNCounterKey, n: Int, consistency: WriteConsistency) extends Op
@ -40,6 +59,7 @@ object ReplicatorDeltaSpec extends MultiNodeConfig {
val writeTwo = WriteTo(2, timeout)
val writeMajority = WriteMajority(timeout)
val KeyHigh = new Key[Highest]("High") {}
val KeyA = PNCounterKey("A")
val KeyB = PNCounterKey("B")
val KeyC = PNCounterKey("C")
@ -250,6 +270,81 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult
enterBarrierAfterTestStep()
}
"preserve causal consistency for None delta" in {
runOn(first) {
val p1 = TestProbe()
deltaReplicator.tell(Update(KeyHigh, Highest(0), WriteLocal)(_.incr(1)), p1.ref)
p1.expectMsgType[UpdateSuccess[_]]
}
enterBarrier("write-1")
runOn(first) {
val p = TestProbe()
deltaReplicator.tell(Get(KeyHigh, ReadLocal), p.ref)
p.expectMsgType[GetSuccess[Highest]].dataValue.n should ===(1)
}
runOn(second, third, fourth) {
within(5.seconds) {
awaitAssert {
val p = TestProbe()
deltaReplicator.tell(Get(KeyHigh, ReadLocal), p.ref)
p.expectMsgType[GetSuccess[Highest]].dataValue.n should ===(1)
}
}
}
enterBarrier("read-1")
runOn(first) {
val p1 = TestProbe()
deltaReplicator.tell(Update(KeyHigh, Highest(0), writeAll)(_.incr(2)), p1.ref)
p1.expectMsgType[UpdateSuccess[_]]
deltaReplicator.tell(Update(KeyHigh, Highest(0), WriteLocal)(_.incrNoDelta(5)), p1.ref)
deltaReplicator.tell(Update(KeyHigh, Highest(0), WriteLocal)(_.incr(10)), p1.ref)
p1.expectMsgType[UpdateSuccess[_]]
p1.expectMsgType[UpdateSuccess[_]]
}
enterBarrier("write-2")
runOn(first) {
val p = TestProbe()
deltaReplicator.tell(Get(KeyHigh, ReadLocal), p.ref)
p.expectMsgType[GetSuccess[Highest]].dataValue.n should ===(18)
}
runOn(second, third, fourth) {
within(5.seconds) {
awaitAssert {
val p = TestProbe()
deltaReplicator.tell(Get(KeyHigh, ReadLocal), p.ref)
// the incrNoDelta(5) is not propagated as delta, and then incr(10) is also skipped
p.expectMsgType[GetSuccess[Highest]].dataValue.n should ===(3)
}
}
}
enterBarrier("read-2")
runOn(first) {
val p1 = TestProbe()
// WriteAll will send full state when delta can't be applied and thereby syncing the
// delta versions again. Same would happen via full state gossip.
// Thereafter delta can be propagated and applied again.
deltaReplicator.tell(Update(KeyHigh, Highest(0), writeAll)(_.incr(100)), p1.ref)
p1.expectMsgType[UpdateSuccess[_]]
deltaReplicator.tell(Update(KeyHigh, Highest(0), WriteLocal)(_.incr(4)), p1.ref)
p1.expectMsgType[UpdateSuccess[_]]
}
enterBarrier("write-3")
within(5.seconds) {
awaitAssert {
val p = TestProbe()
deltaReplicator.tell(Get(KeyHigh, ReadLocal), p.ref)
p.expectMsgType[GetSuccess[Highest]].dataValue.n should ===(122)
}
}
enterBarrierAfterTestStep()
}
"be eventually consistent" in {
val operations = generateOperations(onNode = myself)
log.debug(s"random operations on [${myself.name}]: ${operations.mkString(", ")}")