discard large deltas, #23025

* to avoid OversizedPayloadException
* some complex deltas grow for each update operation, e.g.
  when updating different keys in ORMap (PNCounterMap)
* such large deltas can safely be discarded and disseminated as full
  state instead
* added ReplicatedDeltaSize interface so that a delta can define its "size";
  when that size reaches the configured threshold the delta is discarded
This commit is contained in:
Patrik Nordwall 2017-05-30 12:09:56 +02:00
parent 5641c12547
commit 2970287f95
9 changed files with 117 additions and 13 deletions

View file

@ -62,6 +62,11 @@ akka.cluster.distributed-data {
delta-crdt { delta-crdt {
# enable or disable delta-CRDT replication # enable or disable delta-CRDT replication
enabled = on enabled = on
# Some complex deltas grow in size for each update, and above this
# threshold such deltas are discarded and sent as full state instead.
# This is the number of elements or a similar size hint, not the size in bytes.
max-delta-size = 200
} }
durable { durable {

View file

@ -30,6 +30,8 @@ import akka.cluster.ddata.Replicator.Internal.DeltaPropagation.NoDeltaPlaceholde
def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation
def maxDeltaSize: Int
def currentVersion(key: KeyId): Long = deltaCounter.get(key) match { def currentVersion(key: KeyId): Long = deltaCounter.get(key) match {
case Some(v) v case Some(v) v
case None 0L case None 0L
@ -106,12 +108,18 @@ import akka.cluster.ddata.Replicator.Internal.DeltaPropagation.NoDeltaPlaceholde
case None case None
val group = deltaEntriesAfterJ.valuesIterator.reduceLeft { val group = deltaEntriesAfterJ.valuesIterator.reduceLeft {
(d1, d2) (d1, d2)
d2 match { val merged = d2 match {
case NoDeltaPlaceholder NoDeltaPlaceholder case NoDeltaPlaceholder NoDeltaPlaceholder
case _ case _
// this is fine also if d1 is a NoDeltaPlaceholder // this is fine also if d1 is a NoDeltaPlaceholder
d1.merge(d2.asInstanceOf[d1.T]) d1.merge(d2.asInstanceOf[d1.T])
} }
merged match {
case s: ReplicatedDeltaSize if s.deltaSize >= maxDeltaSize
// discard too large deltas
NoDeltaPlaceholder
case _ merged
}
} }
cache = cache.updated(cacheKey, group) cache = cache.updated(cacheKey, group)
group group

View file

@ -51,7 +51,7 @@ object ORMap {
/** /**
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] sealed abstract class AtomicDeltaOp[A, B <: ReplicatedData] extends DeltaOp { @InternalApi private[akka] sealed abstract class AtomicDeltaOp[A, B <: ReplicatedData] extends DeltaOp with ReplicatedDeltaSize {
def underlying: ORSet.DeltaOp def underlying: ORSet.DeltaOp
def zeroTag: ZeroTag def zeroTag: ZeroTag
override def zero: DeltaReplicatedData = zeroTag.zero override def zero: DeltaReplicatedData = zeroTag.zero
@ -59,6 +59,7 @@ object ORMap {
case other: AtomicDeltaOp[A, B] DeltaGroup(Vector(this, other)) case other: AtomicDeltaOp[A, B] DeltaGroup(Vector(this, other))
case DeltaGroup(ops) DeltaGroup(this +: ops) case DeltaGroup(ops) DeltaGroup(this +: ops)
} }
override def deltaSize: Int = 1
} }
// PutDeltaOp contains ORSet delta and full value // PutDeltaOp contains ORSet delta and full value
@ -117,7 +118,8 @@ object ORMap {
// DeltaGroup is effectively a causally ordered list of individual deltas // DeltaGroup is effectively a causally ordered list of individual deltas
/** INTERNAL API */ /** INTERNAL API */
@InternalApi private[akka] final case class DeltaGroup[A, B <: ReplicatedData](ops: immutable.IndexedSeq[DeltaOp]) extends DeltaOp { @InternalApi private[akka] final case class DeltaGroup[A, B <: ReplicatedData](ops: immutable.IndexedSeq[DeltaOp])
extends DeltaOp with ReplicatedDeltaSize {
override def merge(that: DeltaOp): DeltaOp = that match { override def merge(that: DeltaOp): DeltaOp = that match {
case that: AtomicDeltaOp[A, B] case that: AtomicDeltaOp[A, B]
ops.last match { ops.last match {
@ -139,6 +141,8 @@ object ORMap {
} }
override def zero: DeltaReplicatedData = ops.headOption.fold(ORMap.empty[A, B].asInstanceOf[DeltaReplicatedData])(_.zero) override def zero: DeltaReplicatedData = ops.headOption.fold(ORMap.empty[A, B].asInstanceOf[DeltaReplicatedData])(_.zero)
override def deltaSize: Int = ops.size
} }
} }

View file

@ -45,9 +45,10 @@ object ORSet {
/** /**
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] sealed abstract class AtomicDeltaOp[A] extends DeltaOp { @InternalApi private[akka] sealed abstract class AtomicDeltaOp[A] extends DeltaOp with ReplicatedDeltaSize {
def underlying: ORSet[A] def underlying: ORSet[A]
override def zero: ORSet[A] = ORSet.empty override def zero: ORSet[A] = ORSet.empty
override def deltaSize: Int = 1
} }
/** INTERNAL API */ /** INTERNAL API */
@ -94,7 +95,8 @@ object ORSet {
/** /**
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] final case class DeltaGroup[A](ops: immutable.IndexedSeq[DeltaOp]) extends DeltaOp { @InternalApi private[akka] final case class DeltaGroup[A](ops: immutable.IndexedSeq[DeltaOp])
extends DeltaOp with ReplicatedDeltaSize {
override def merge(that: DeltaOp): DeltaOp = that match { override def merge(that: DeltaOp): DeltaOp = that match {
case thatAdd: AddDeltaOp[A] case thatAdd: AddDeltaOp[A]
// merge AddDeltaOp into last AddDeltaOp in the group, if possible // merge AddDeltaOp into last AddDeltaOp in the group, if possible
@ -107,6 +109,8 @@ object ORSet {
} }
override def zero: ORSet[A] = ORSet.empty override def zero: ORSet[A] = ORSet.empty
override def deltaSize: Int = ops.size
} }
/** /**

View file

@ -114,6 +114,18 @@ trait ReplicatedDelta extends ReplicatedData {
*/ */
trait RequiresCausalDeliveryOfDeltas extends ReplicatedDelta trait RequiresCausalDeliveryOfDeltas extends ReplicatedDelta
/**
* Some complex deltas grow in size for each update, and once the size reaches a
* configured threshold such deltas are discarded and sent as full state instead.
* This interface should be implemented by such deltas to define their size.
* The size is the number of elements or a similar size hint, not the size in bytes.
* The threshold is defined in `akka.cluster.distributed-data.delta-crdt.max-delta-size`
* or corresponding [[ReplicatorSettings]].
*/
trait ReplicatedDeltaSize {
/** Size hint of this delta; it is compared against the configured `max-delta-size` threshold. */
def deltaSize: Int
}
/** /**
* Java API: Interface for implementing a [[ReplicatedData]] in Java. * Java API: Interface for implementing a [[ReplicatedData]] in Java.
* *

View file

@ -86,7 +86,8 @@ object ReplicatorSettings {
durableKeys = config.getStringList("durable.keys").asScala.toSet, durableKeys = config.getStringList("durable.keys").asScala.toSet,
pruningMarkerTimeToLive = config.getDuration("pruning-marker-time-to-live", MILLISECONDS).millis, pruningMarkerTimeToLive = config.getDuration("pruning-marker-time-to-live", MILLISECONDS).millis,
durablePruningMarkerTimeToLive = config.getDuration("durable.pruning-marker-time-to-live", MILLISECONDS).millis, durablePruningMarkerTimeToLive = config.getDuration("durable.pruning-marker-time-to-live", MILLISECONDS).millis,
deltaCrdtEnabled = config.getBoolean("delta-crdt.enabled")) deltaCrdtEnabled = config.getBoolean("delta-crdt.enabled"),
maxDeltaSize = config.getInt("delta-crdt.max-delta-size"))
} }
/** /**
@ -134,20 +135,31 @@ final class ReplicatorSettings(
val durableKeys: Set[KeyId], val durableKeys: Set[KeyId],
val pruningMarkerTimeToLive: FiniteDuration, val pruningMarkerTimeToLive: FiniteDuration,
val durablePruningMarkerTimeToLive: FiniteDuration, val durablePruningMarkerTimeToLive: FiniteDuration,
val deltaCrdtEnabled: Boolean) { val deltaCrdtEnabled: Boolean,
val maxDeltaSize: Int) {
// For backwards compatibility // For backwards compatibility
def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration, def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration,
maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration) = maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration) =
this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval, this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
maxPruningDissemination, Right(Props.empty), Set.empty, 6.hours, 10.days, true) maxPruningDissemination, Right(Props.empty), Set.empty, 6.hours, 10.days, true, 200)
// For backwards compatibility // For backwards compatibility
def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration, def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration,
maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration, maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration,
durableStoreProps: Either[(String, Config), Props], durableKeys: Set[String]) = durableStoreProps: Either[(String, Config), Props], durableKeys: Set[String]) =
this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval, this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
maxPruningDissemination, durableStoreProps, durableKeys, 6.hours, 10.days, true) maxPruningDissemination, durableStoreProps, durableKeys, 6.hours, 10.days, true, 200)
// For backwards compatibility: keeps the constructor signature that existed before
// `maxDeltaSize` was added. `maxDeltaSize` is set to 200 here.
def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration,
maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration,
durableStoreProps: Either[(String, Config), Props], durableKeys: Set[String],
pruningMarkerTimeToLive: FiniteDuration, durablePruningMarkerTimeToLive: FiniteDuration,
deltaCrdtEnabled: Boolean) =
this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
maxPruningDissemination, durableStoreProps, durableKeys, pruningMarkerTimeToLive, durablePruningMarkerTimeToLive,
deltaCrdtEnabled, 200)
def withRole(role: String): ReplicatorSettings = copy(role = ReplicatorSettings.roleOption(role)) def withRole(role: String): ReplicatorSettings = copy(role = ReplicatorSettings.roleOption(role))
@ -200,6 +212,9 @@ final class ReplicatorSettings(
def withDeltaCrdtEnabled(deltaCrdtEnabled: Boolean): ReplicatorSettings = def withDeltaCrdtEnabled(deltaCrdtEnabled: Boolean): ReplicatorSettings =
copy(deltaCrdtEnabled = deltaCrdtEnabled) copy(deltaCrdtEnabled = deltaCrdtEnabled)
/**
* Returns a copy of these settings with `maxDeltaSize` replaced by the given value.
* The value is a size hint such as number of elements, not a size in bytes.
*/
def withMaxDeltaSize(maxDeltaSize: Int): ReplicatorSettings =
copy(maxDeltaSize = maxDeltaSize)
private def copy( private def copy(
role: Option[String] = role, role: Option[String] = role,
gossipInterval: FiniteDuration = gossipInterval, gossipInterval: FiniteDuration = gossipInterval,
@ -212,10 +227,11 @@ final class ReplicatorSettings(
durableKeys: Set[KeyId] = durableKeys, durableKeys: Set[KeyId] = durableKeys,
pruningMarkerTimeToLive: FiniteDuration = pruningMarkerTimeToLive, pruningMarkerTimeToLive: FiniteDuration = pruningMarkerTimeToLive,
durablePruningMarkerTimeToLive: FiniteDuration = durablePruningMarkerTimeToLive, durablePruningMarkerTimeToLive: FiniteDuration = durablePruningMarkerTimeToLive,
deltaCrdtEnabled: Boolean = deltaCrdtEnabled): ReplicatorSettings = deltaCrdtEnabled: Boolean = deltaCrdtEnabled,
maxDeltaSize: Int = maxDeltaSize): ReplicatorSettings =
new ReplicatorSettings(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, new ReplicatorSettings(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher,
pruningInterval, maxPruningDissemination, durableStoreProps, durableKeys, pruningInterval, maxPruningDissemination, durableStoreProps, durableKeys,
pruningMarkerTimeToLive, durablePruningMarkerTimeToLive, deltaCrdtEnabled) pruningMarkerTimeToLive, durablePruningMarkerTimeToLive, deltaCrdtEnabled, maxDeltaSize)
} }
object Replicator { object Replicator {
@ -1010,6 +1026,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
nodes.union(weaklyUpNodes).diff(unreachable).toVector.sorted nodes.union(weaklyUpNodes).diff(unreachable).toVector.sorted
} }
override def maxDeltaSize: Int = settings.maxDeltaSize
override def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation = { override def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation = {
// Important to include the pruning state in the deltas. For example if the delta is based // Important to include the pruning state in the deltas. For example if the delta is based
// on an entry that has been pruned but that has not yet been performed on the target node. // on an entry that has been pruned but that has not yet been performed on the target node.

View file

@ -13,6 +13,7 @@ import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.MultiNodeSpec
import akka.testkit._ import akka.testkit._
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import akka.event.Logging.Error
object ReplicatorMapDeltaSpec extends MultiNodeConfig { object ReplicatorMapDeltaSpec extends MultiNodeConfig {
val first = role("first") val first = role("first")
@ -21,13 +22,14 @@ object ReplicatorMapDeltaSpec extends MultiNodeConfig {
val fourth = role("fourth") val fourth = role("fourth")
commonConfig(ConfigFactory.parseString(""" commonConfig(ConfigFactory.parseString("""
akka.loglevel = DEBUG akka.loglevel = INFO
akka.actor.provider = "cluster" akka.actor.provider = "cluster"
akka.log-dead-letters-during-shutdown = off akka.log-dead-letters-during-shutdown = off
akka.actor { akka.actor {
serialize-messages = off serialize-messages = off
allow-java-serialization = off allow-java-serialization = off
} }
#akka.remote.artery.enabled = on
""")) """))
testTransport(on = true) testTransport(on = true)
@ -187,6 +189,9 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with
r ! Replicator.Internal.TestFullStateGossip(enabled = false) r ! Replicator.Internal.TestFullStateGossip(enabled = false)
r r
} }
// both deltas and full state
val ordinaryReplicator = system.actorOf(Replicator.props(
ReplicatorSettings(system).withGossipInterval(1.second)), "ordinaryReplicator")
var afterCounter = 0 var afterCounter = 0
def enterBarrierAfterTestStep(): Unit = { def enterBarrierAfterTestStep(): Unit = {
@ -288,6 +293,34 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with
enterBarrierAfterTestStep() enterBarrierAfterTestStep()
} }
"replicate high throughput changes without OversizedPayloadException" in {
  val N = 1000
  // Subscribe a probe to error-level log events so that the end of the test
  // can verify that no error was logged while replicating.
  val errorLogProbe = TestProbe()
  system.eventStream.subscribe(errorLogProbe.ref, classOf[Error])
  runOn(first) {
    // N local increments for each of the two keys, sent back to back
    for (_ <- 1 to N; key <- List(KeyA, KeyB)) {
      ordinaryReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ increment key._2)
    }
  }
  enterBarrier("updated-2")

  // Every node must eventually observe the full count for both keys.
  within(5.seconds) {
    awaitAssert {
      val p = TestProbe()
      List(KeyA, KeyB).foreach { key =>
        ordinaryReplicator.tell(Get(key._1, ReadLocal), p.ref)
        p.expectMsgType[GetSuccess[PNCounterMap[String]]].dataValue.get(key._2).get.intValue should be(N)
      }
    }
  }

  enterBarrier("replicated-2")
  // no OversizedPayloadException logging
  errorLogProbe.expectNoMsg(100.millis)

  enterBarrierAfterTestStep()
}
"be eventually consistent" in { "be eventually consistent" in {
val operations = generateOperations(onNode = myself) val operations = generateOperations(onNode = myself)
log.debug(s"random operations on [${myself.name}]: ${operations.mkString(", ")}") log.debug(s"random operations on [${myself.name}]: ${operations.mkString(", ")}")
@ -344,7 +377,7 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with
} }
} }
enterBarrier("updated-2") enterBarrier("updated-3")
List(KeyA, KeyB, KeyC).foreach { key List(KeyA, KeyB, KeyC).foreach { key
within(5.seconds) { within(5.seconds) {

View file

@ -9,6 +9,7 @@ import akka.cluster.ddata.Key.KeyId
import akka.cluster.ddata.Replicator.Internal.DataEnvelope import akka.cluster.ddata.Replicator.Internal.DataEnvelope
import akka.cluster.ddata.Replicator.Internal.Delta import akka.cluster.ddata.Replicator.Internal.Delta
import akka.cluster.ddata.Replicator.Internal.DeltaPropagation import akka.cluster.ddata.Replicator.Internal.DeltaPropagation
import akka.cluster.ddata.Replicator.Internal.DeltaPropagation.NoDeltaPlaceholder
import org.scalactic.TypeCheckedTripleEquals import org.scalactic.TypeCheckedTripleEquals
import org.scalatest.Matchers import org.scalatest.Matchers
import org.scalatest.WordSpec import org.scalatest.WordSpec
@ -22,6 +23,7 @@ object DeltaPropagationSelectorSpec {
DeltaPropagation(selfUniqueAddress, false, deltas.mapValues { DeltaPropagation(selfUniqueAddress, false, deltas.mapValues {
case (d, fromSeqNr, toSeqNr) Delta(DataEnvelope(d), fromSeqNr, toSeqNr) case (d, fromSeqNr, toSeqNr) Delta(DataEnvelope(d), fromSeqNr, toSeqNr)
}) })
override def maxDeltaSize: Int = 10
} }
val deltaA = GSet.empty[String] + "a" val deltaA = GSet.empty[String] + "a"
@ -158,6 +160,21 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck
selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation]) selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation])
} }
"discard too large deltas" in {
  // Slice size of 1 so that a single node is selected by collectPropagations()
  val selector = new TestSelector(selfUniqueAddress, nodes.take(3)) {
    override def nodesSliceSize(allNodesSize: Int): Int = 1
  }
  var data = PNCounterMap.empty[String]
  // 1000 updates alternating between the keys "0" and "1"; each update's delta
  // is registered with the selector under key "A"
  (1 to 1000).foreach { n =>
    val d = data.resetDelta.increment(selfUniqueAddress, (n % 2).toString, 1)
    selector.update("A", d.delta.get)
    data = d
  }
  // The merged delta is expected to be replaced by the placeholder, still
  // covering the whole sequence number range 1 to 1000.
  val expected = DeltaPropagation(selfUniqueAddress, false, Map(
    "A" -> Delta(DataEnvelope(NoDeltaPlaceholder), 1L, 1000L)))
  selector.collectPropagations() should ===(Map(nodes(0) -> expected))
}
"calcualte right slice size" in { "calcualte right slice size" in {
val selector = new TestSelector(selfUniqueAddress, nodes) val selector = new TestSelector(selfUniqueAddress, nodes)
selector.nodesSliceSize(0) should ===(0) selector.nodesSliceSize(0) should ===(0)

View file

@ -1224,6 +1224,9 @@ object MiMa extends AutoPlugin {
// #23144 recoverWithRetries cleanup // #23144 recoverWithRetries cleanup
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.RecoverWith.InfiniteRetries"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.RecoverWith.InfiniteRetries"),
// #23025 OversizedPayloadException DeltaPropagation
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.DeltaPropagationSelector.maxDeltaSize"),
// #23023 added a new overload with implementation to trait, so old transport implementations compiled against // #23023 added a new overload with implementation to trait, so old transport implementations compiled against
// older versions will be missing the method. We accept that incompatibility for now. // older versions will be missing the method. We accept that incompatibility for now.
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.transport.AssociationHandle.disassociate") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.transport.AssociationHandle.disassociate")