From 2970287f95c228339892a63346a13c8c6bc8456c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 30 May 2017 12:09:56 +0200 Subject: [PATCH] discard large deltas, #23025 * to avoid OversizedPayloadException * some complex deltas grow for each update operation, e.g. when updating different keys in ORMap (PNCounterMap) * such large deltas can safely be discarded and disseminated as full state instead * added ReplicatedDeltaSize interface to be able to define the "size" and when that size exceeds configured threshold the delta is discarded --- .../src/main/resources/reference.conf | 5 +++ .../ddata/DeltaPropagationSelector.scala | 10 ++++- .../main/scala/akka/cluster/ddata/ORMap.scala | 8 +++- .../main/scala/akka/cluster/ddata/ORSet.scala | 8 +++- .../akka/cluster/ddata/ReplicatedData.scala | 12 ++++++ .../scala/akka/cluster/ddata/Replicator.scala | 30 ++++++++++++--- .../ddata/ReplicatorMapDeltaSpec.scala | 37 ++++++++++++++++++- .../ddata/DeltaPropagationSelectorSpec.scala | 17 +++++++++ project/MiMa.scala | 3 ++ 9 files changed, 117 insertions(+), 13 deletions(-) diff --git a/akka-distributed-data/src/main/resources/reference.conf b/akka-distributed-data/src/main/resources/reference.conf index 6d39269ead..c6b0352793 100644 --- a/akka-distributed-data/src/main/resources/reference.conf +++ b/akka-distributed-data/src/main/resources/reference.conf @@ -62,6 +62,11 @@ akka.cluster.distributed-data { delta-crdt { # enable or disable delta-CRDT replication enabled = on + + # Some complex deltas grow in size for each update and above this + # threshold such deltas are discarded and sent as full state instead. + # This is number of elements or similar size hint, not size in bytes. + max-delta-size = 200 } durable { diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DeltaPropagationSelector.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DeltaPropagationSelector.scala index 6ac292f26f..6fc613a1ce 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DeltaPropagationSelector.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DeltaPropagationSelector.scala @@ -30,6 +30,8 @@ import akka.cluster.ddata.Replicator.Internal.DeltaPropagation.NoDeltaPlaceholde def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation + def maxDeltaSize: Int + def currentVersion(key: KeyId): Long = deltaCounter.get(key) match { case Some(v) ⇒ v case None ⇒ 0L @@ -106,12 +108,18 @@ import akka.cluster.ddata.Replicator.Internal.DeltaPropagation.NoDeltaPlaceholde case None ⇒ val group = deltaEntriesAfterJ.valuesIterator.reduceLeft { (d1, d2) ⇒ - d2 match { + val merged = d2 match { case NoDeltaPlaceholder ⇒ NoDeltaPlaceholder case _ ⇒ // this is fine also if d1 is a NoDeltaPlaceholder d1.merge(d2.asInstanceOf[d1.T]) } + merged match { + case s: ReplicatedDeltaSize if s.deltaSize >= maxDeltaSize ⇒ + // discard too large deltas + NoDeltaPlaceholder + case _ ⇒ merged + } } cache = cache.updated(cacheKey, group) group diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala index 40d01e1d89..fed712a5cb 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala @@ -51,7 +51,7 @@ object ORMap { /** * INTERNAL API */ - @InternalApi private[akka] sealed abstract class AtomicDeltaOp[A, B <: ReplicatedData] extends DeltaOp { + @InternalApi private[akka] sealed abstract class AtomicDeltaOp[A, B <: ReplicatedData] extends DeltaOp with ReplicatedDeltaSize { def underlying: ORSet.DeltaOp def zeroTag: ZeroTag override def zero: DeltaReplicatedData = zeroTag.zero @@ -59,6 +59,7 @@ object ORMap { case other: AtomicDeltaOp[A, B] ⇒ DeltaGroup(Vector(this, other)) case DeltaGroup(ops) ⇒ DeltaGroup(this +: ops) } + override def deltaSize: Int = 1 } // PutDeltaOp contains ORSet delta and full value @@ -117,7 +118,8 @@ object ORMap { // DeltaGroup is effectively a causally ordered list of individual deltas /** INTERNAL API */ - @InternalApi private[akka] final case class DeltaGroup[A, B <: ReplicatedData](ops: immutable.IndexedSeq[DeltaOp]) extends DeltaOp { + @InternalApi private[akka] final case class DeltaGroup[A, B <: ReplicatedData](ops: immutable.IndexedSeq[DeltaOp]) + extends DeltaOp with ReplicatedDeltaSize { override def merge(that: DeltaOp): DeltaOp = that match { case that: AtomicDeltaOp[A, B] ⇒ ops.last match { @@ -139,6 +141,8 @@ object ORMap { } override def zero: DeltaReplicatedData = ops.headOption.fold(ORMap.empty[A, B].asInstanceOf[DeltaReplicatedData])(_.zero) + + override def deltaSize: Int = ops.size } } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala index 93a73bebd2..818274b304 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala @@ -45,9 +45,10 @@ object ORSet { /** * INTERNAL API */ - @InternalApi private[akka] sealed abstract class AtomicDeltaOp[A] extends DeltaOp { + @InternalApi private[akka] sealed abstract class AtomicDeltaOp[A] extends DeltaOp with ReplicatedDeltaSize { def underlying: ORSet[A] override def zero: ORSet[A] = ORSet.empty + override def deltaSize: Int = 1 } /** INTERNAL API */ @@ -94,7 +95,8 @@ object ORSet { /** * INTERNAL API */ - @InternalApi private[akka] final case class DeltaGroup[A](ops: immutable.IndexedSeq[DeltaOp]) extends DeltaOp { + @InternalApi private[akka] final case class DeltaGroup[A](ops: immutable.IndexedSeq[DeltaOp]) + extends DeltaOp with ReplicatedDeltaSize { override def merge(that: DeltaOp): DeltaOp = that match { case thatAdd: AddDeltaOp[A] ⇒ // merge AddDeltaOp into last AddDeltaOp in the group, if possible @@ -107,6 +109,8 @@ object ORSet { } override def zero: ORSet[A] = ORSet.empty + + override def deltaSize: Int = ops.size } /** diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala index 98bd1134fd..969969ec66 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala @@ -114,6 +114,18 @@ trait ReplicatedDelta extends ReplicatedData { */ trait RequiresCausalDeliveryOfDeltas extends ReplicatedDelta +/** + * Some complex deltas grow in size for each update and above a configured + * threshold such deltas are discarded and sent as full state instead. This + * interface should be implemented by such deltas to define its size. + * This is number of elements or similar size hint, not size in bytes. + * The threshold is defined in `akka.cluster.distributed-data.delta-crdt.max-delta-size` + * or corresponding [[ReplicatorSettings]]. + */ +trait ReplicatedDeltaSize { + def deltaSize: Int +} + /** * Java API: Interface for implementing a [[ReplicatedData]] in Java. * diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index fa688c7686..795a1fce2b 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -86,7 +86,8 @@ object ReplicatorSettings { durableKeys = config.getStringList("durable.keys").asScala.toSet, pruningMarkerTimeToLive = config.getDuration("pruning-marker-time-to-live", MILLISECONDS).millis, durablePruningMarkerTimeToLive = config.getDuration("durable.pruning-marker-time-to-live", MILLISECONDS).millis, - deltaCrdtEnabled = config.getBoolean("delta-crdt.enabled")) + deltaCrdtEnabled = config.getBoolean("delta-crdt.enabled"), + maxDeltaSize = config.getInt("delta-crdt.max-delta-size")) } /** @@ -134,20 +135,31 @@ final class ReplicatorSettings( val durableKeys: Set[KeyId], val pruningMarkerTimeToLive: FiniteDuration, val durablePruningMarkerTimeToLive: FiniteDuration, - val deltaCrdtEnabled: Boolean) { + val deltaCrdtEnabled: Boolean, + val maxDeltaSize: Int) { // For backwards compatibility def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration, maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration) = this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval, - maxPruningDissemination, Right(Props.empty), Set.empty, 6.hours, 10.days, true) + maxPruningDissemination, Right(Props.empty), Set.empty, 6.hours, 10.days, true, 200) // For backwards compatibility def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration, maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration, durableStoreProps: Either[(String, Config), Props], durableKeys: Set[String]) = this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval, - maxPruningDissemination, durableStoreProps, durableKeys, 6.hours, 10.days, true) + maxPruningDissemination, durableStoreProps, durableKeys, 6.hours, 10.days, true, 200) + + // For backwards compatibility + def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration, + maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration, + durableStoreProps: Either[(String, Config), Props], durableKeys: Set[String], + pruningMarkerTimeToLive: FiniteDuration, durablePruningMarkerTimeToLive: FiniteDuration, + deltaCrdtEnabled: Boolean) = + this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval, + maxPruningDissemination, durableStoreProps, durableKeys, pruningMarkerTimeToLive, durablePruningMarkerTimeToLive, + deltaCrdtEnabled, 200) def withRole(role: String): ReplicatorSettings = copy(role = ReplicatorSettings.roleOption(role)) @@ -200,6 +212,9 @@ final class ReplicatorSettings( def withDeltaCrdtEnabled(deltaCrdtEnabled: Boolean): ReplicatorSettings = copy(deltaCrdtEnabled = deltaCrdtEnabled) + def withMaxDeltaSize(maxDeltaSize: Int): ReplicatorSettings = + copy(maxDeltaSize = maxDeltaSize) + private def copy( role: Option[String] = role, gossipInterval: FiniteDuration = gossipInterval, @@ -212,10 +227,11 @@ final class ReplicatorSettings( durableKeys: Set[KeyId] = durableKeys, pruningMarkerTimeToLive: FiniteDuration = pruningMarkerTimeToLive, durablePruningMarkerTimeToLive: FiniteDuration = durablePruningMarkerTimeToLive, - deltaCrdtEnabled: Boolean = deltaCrdtEnabled): ReplicatorSettings = + deltaCrdtEnabled: Boolean = deltaCrdtEnabled, + maxDeltaSize: Int = maxDeltaSize): ReplicatorSettings = new ReplicatorSettings(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval, maxPruningDissemination, durableStoreProps, durableKeys, - pruningMarkerTimeToLive, durablePruningMarkerTimeToLive, deltaCrdtEnabled) + pruningMarkerTimeToLive, durablePruningMarkerTimeToLive, deltaCrdtEnabled, maxDeltaSize) } object Replicator { @@ -1010,6 +1026,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog nodes.union(weaklyUpNodes).diff(unreachable).toVector.sorted } + override def maxDeltaSize: Int = settings.maxDeltaSize + override def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation = { // Important to include the pruning state in the deltas. For example if the delta is based // on an entry that has been pruned but that has not yet been performed on the target node. diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorMapDeltaSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorMapDeltaSpec.scala index 0600c616ea..6a263c4ee7 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorMapDeltaSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorMapDeltaSpec.scala @@ -13,6 +13,7 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import com.typesafe.config.ConfigFactory +import akka.event.Logging.Error object ReplicatorMapDeltaSpec extends MultiNodeConfig { val first = role("first") @@ -21,13 +22,14 @@ object ReplicatorMapDeltaSpec extends MultiNodeConfig { val fourth = role("fourth") commonConfig(ConfigFactory.parseString(""" - akka.loglevel = DEBUG + akka.loglevel = INFO akka.actor.provider = "cluster" akka.log-dead-letters-during-shutdown = off akka.actor { serialize-messages = off allow-java-serialization = off } + #akka.remote.artery.enabled = on """)) testTransport(on = true) @@ -187,6 +189,9 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with r ! Replicator.Internal.TestFullStateGossip(enabled = false) r } + // both deltas and full state + val ordinaryReplicator = system.actorOf(Replicator.props( + ReplicatorSettings(system).withGossipInterval(1.second)), "ordinaryReplicator") var afterCounter = 0 def enterBarrierAfterTestStep(): Unit = { @@ -288,6 +293,34 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with enterBarrierAfterTestStep() } + "replicate high throughput changes without OversizedPayloadException" in { + val N = 1000 + val errorLogProbe = TestProbe() + system.eventStream.subscribe(errorLogProbe.ref, classOf[Error]) + runOn(first) { + for (_ ← 1 to N; key ← List(KeyA, KeyB)) { + ordinaryReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ increment key._2) + } + } + enterBarrier("updated-2") + + within(5.seconds) { + awaitAssert { + val p = TestProbe() + List(KeyA, KeyB).foreach { key ⇒ + ordinaryReplicator.tell(Get(key._1, ReadLocal), p.ref) + p.expectMsgType[GetSuccess[PNCounterMap[String]]].dataValue.get(key._2).get.intValue should be(N) + } + } + } + + enterBarrier("replicated-2") + // no OversizedPayloadException logging + errorLogProbe.expectNoMsg(100.millis) + + enterBarrierAfterTestStep() + } + "be eventually consistent" in { val operations = generateOperations(onNode = myself) log.debug(s"random operations on [${myself.name}]: ${operations.mkString(", ")}") @@ -344,7 +377,7 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with } } - enterBarrier("updated-2") + enterBarrier("updated-3") List(KeyA, KeyB, KeyC).foreach { key ⇒ within(5.seconds) { diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala index 7dd1fea309..544d61e640 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala @@ -9,6 +9,7 @@ import akka.cluster.ddata.Key.KeyId import akka.cluster.ddata.Replicator.Internal.DataEnvelope import akka.cluster.ddata.Replicator.Internal.Delta import akka.cluster.ddata.Replicator.Internal.DeltaPropagation +import akka.cluster.ddata.Replicator.Internal.DeltaPropagation.NoDeltaPlaceholder import org.scalactic.TypeCheckedTripleEquals import org.scalatest.Matchers import org.scalatest.WordSpec @@ -22,6 +23,7 @@ object DeltaPropagationSelectorSpec { DeltaPropagation(selfUniqueAddress, false, deltas.mapValues { case (d, fromSeqNr, toSeqNr) ⇒ Delta(DataEnvelope(d), fromSeqNr, toSeqNr) }) + override def maxDeltaSize: Int = 10 } val deltaA = GSet.empty[String] + "a" @@ -158,6 +160,21 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation]) } + "discard too large deltas" in { + val selector = new TestSelector(selfUniqueAddress, nodes.take(3)) { + override def nodesSliceSize(allNodesSize: Int): Int = 1 + } + var data = PNCounterMap.empty[String] + (1 to 1000).foreach { n ⇒ + val d = data.resetDelta.increment(selfUniqueAddress, (n % 2).toString, 1) + selector.update("A", d.delta.get) + data = d + } + val expected = DeltaPropagation(selfUniqueAddress, false, Map( + "A" → Delta(DataEnvelope(NoDeltaPlaceholder), 1L, 1000L))) + selector.collectPropagations() should ===(Map(nodes(0) → expected)) + } + "calcualte right slice size" in { val selector = new TestSelector(selfUniqueAddress, nodes) selector.nodesSliceSize(0) should ===(0) diff --git a/project/MiMa.scala b/project/MiMa.scala index 7d27c2a82d..6c15c4a27c 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -1224,6 +1224,9 @@ object MiMa extends AutoPlugin { // #23144 recoverWithRetries cleanup ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.RecoverWith.InfiniteRetries"), + // #23025 OversizedPayloadException DeltaPropagation + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.DeltaPropagationSelector.maxDeltaSize"), + // #23023 added a new overload with implementation to trait, so old transport implementations compiled against // older versions will be missing the method. We accept that incompatibility for now. ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.transport.AssociationHandle.disassociate")