From 4fa733c146782192ebfee115466bb1614ddd6f76 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Mon, 16 Nov 2020 12:00:16 +0100
Subject: [PATCH] Include entries DData Gossip message dynamically based on
 size, #28421 (#29751)

---
 .../issue-28421-dynamic-gossip.excludes       |   3 +
 .../src/main/resources/reference.conf         |   6 +-
 .../akka/cluster/ddata/EstimatedSize.scala    |  17 +++
 .../cluster/ddata/PayloadSizeAggregator.scala |  15 ++-
 .../akka/cluster/ddata/PruningState.scala     |   4 +
 .../scala/akka/cluster/ddata/Replicator.scala |  86 +++++++++---
 .../akka/cluster/ddata/VersionVector.scala    |  11 ++
 .../cluster/ddata/ReplicatorGossipSpec.scala  | 126 ++++++++++++++++++
 8 files changed, 247 insertions(+), 21 deletions(-)
 create mode 100644 akka-distributed-data/src/main/mima-filters/2.6.10.backwards.excludes/issue-28421-dynamic-gossip.excludes
 create mode 100644 akka-distributed-data/src/main/scala/akka/cluster/ddata/EstimatedSize.scala
 create mode 100644 akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorGossipSpec.scala

diff --git a/akka-distributed-data/src/main/mima-filters/2.6.10.backwards.excludes/issue-28421-dynamic-gossip.excludes b/akka-distributed-data/src/main/mima-filters/2.6.10.backwards.excludes/issue-28421-dynamic-gossip.excludes
new file mode 100644
index 0000000000..1bffbecb14
--- /dev/null
+++ b/akka-distributed-data/src/main/mima-filters/2.6.10.backwards.excludes/issue-28421-dynamic-gossip.excludes
@@ -0,0 +1,3 @@
+# #28421 Gossip entries based on size
+ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.PruningState.estimatedSize")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.VersionVector.estimatedSize")
diff --git a/akka-distributed-data/src/main/resources/reference.conf b/akka-distributed-data/src/main/resources/reference.conf
index 3fdecf4bab..f716157bd5 100644
--- a/akka-distributed-data/src/main/resources/reference.conf
+++ b/akka-distributed-data/src/main/resources/reference.conf
@@ -28,8 +28,10 @@ akka.cluster.distributed-data {
   # It can be disabled by setting the property to off.
   log-data-size-exceeding = 10 KiB
 
-  # Maximum number of entries to transfer in one gossip message when synchronizing
-  # the replicas. Next chunk will be transferred in next round of gossip.
+  # Maximum number of entries to transfer in one round of gossip exchange when
+  # synchronizing the replicas. The next chunk will be transferred in the next round of gossip.
+  # The actual number of data entries in each Gossip message is dynamically
+  # adjusted to not exceed the maximum remote message size (maximum-frame-size).
   max-delta-elements = 500
 
   # The id of the dispatcher to use for Replicator actors.
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/EstimatedSize.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/EstimatedSize.scala
new file mode 100644
index 0000000000..1244c96b04
--- /dev/null
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/EstimatedSize.scala
@@ -0,0 +1,17 @@
+/*
+ * Copyright (C) 2020 Lightbend Inc.
+ */
+
+package akka.cluster.ddata
+
+import akka.annotation.InternalApi
+
+/**
+ * INTERNAL API: Rough estimate in bytes of some serialized data elements. Used
+ * when creating gossip messages.
+ */
+@InternalApi private[akka] object EstimatedSize {
+  val LongValue = 8
+  val Address = 50
+  val UniqueAddress = Address + LongValue
+}
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PayloadSizeAggregator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PayloadSizeAggregator.scala
index 1517a8f66e..485f2a2ad5 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PayloadSizeAggregator.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PayloadSizeAggregator.scala
@@ -16,11 +16,12 @@ import akka.event.LoggingAdapter
 @InternalApi private[akka] class PayloadSizeAggregator(
     log: LoggingAdapter,
     logSizeExceeding: Int,
-    warnSizeExceeding: Int) {
+    val maxFrameSize: Int) {
+  private val warnSizeExceeding = maxFrameSize * 3 / 4
   private var maxPayloadBytes: Map[String, Int] = Map.empty
 
   def updatePayloadSize(key: KeyId, size: Int): Unit = {
-    if (size >= logSizeExceeding) {
+    if (size > 0) { // deleted has size 0
       // 10% threshold until next log
       def newMax = (size * 1.1).toInt
 
@@ -38,15 +39,21 @@ import akka.event.LoggingAdapter
         case Some(max) =>
           if (size > max) {
            maxPayloadBytes = maxPayloadBytes.updated(key, newMax)
-            logSize()
+            if (size >= logSizeExceeding)
+              logSize()
          }
         case None =>
           maxPayloadBytes = maxPayloadBytes.updated(key, newMax)
-          logSize()
+          if (size >= logSizeExceeding)
+            logSize()
       }
     }
   }
 
+  def getMaxSize(key: KeyId): Int = {
+    maxPayloadBytes.getOrElse(key, 0)
+  }
+
   def remove(key: KeyId): Unit =
     maxPayloadBytes -= key
 
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala
index 60e8904517..8054153241 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala
@@ -19,10 +19,12 @@ import akka.util.unused
       if (seen(node) || owner.address == node) this
       else copy(seen = seen + node)
     }
+    def estimatedSize: Int = EstimatedSize.UniqueAddress + EstimatedSize.Address * seen.size
   }
 
   final case class PruningPerformed(obsoleteTime: Long) extends PruningState {
     def isObsolete(currentTime: Long): Boolean = obsoleteTime <= currentTime
     def addSeen(@unused node: Address): PruningState = this
+    def estimatedSize: Int = EstimatedSize.LongValue
   }
 }
 
@@ -47,4 +49,6 @@ import akka.util.unused
   }
 
   def addSeen(node: Address): PruningState
+
+  def estimatedSize: Int
 }
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
index 6a2db0acb7..38983ad06a 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
@@ -1087,6 +1087,10 @@ object Replicator {
       if (changed) copy(pruning = newRemovedNodePruning)
       else this
     }
+
+    def estimatedSizeWithoutData: Int = {
+      deltaVersions.estimatedSize + pruning.valuesIterator.map(_.estimatedSize + EstimatedSize.UniqueAddress).sum
+    }
   }
 
   val DeletedEnvelope = DataEnvelope(DeletedData)
@@ -1364,13 +1368,14 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
     roles.subsetOf(cluster.selfRoles),
     s"This cluster member [${selfAddress}] doesn't have all the roles [${roles.mkString(", ")}]")
 
-  private val payloadSizeAggregator = settings.logDataSizeExceeding.map { sizeExceeding =>
+  private val payloadSizeAggregator = {
+    val sizeExceeding = settings.logDataSizeExceeding.getOrElse(Int.MaxValue)
     val remoteProvider = RARP(context.system).provider
     val remoteSettings = remoteProvider.remoteSettings
     val maxFrameSize =
       if (remoteSettings.Artery.Enabled) remoteSettings.Artery.Advanced.MaximumFrameSize
       else context.system.settings.config.getBytes("akka.remote.classic.netty.tcp.maximum-frame-size").toInt
-    new PayloadSizeAggregator(log, sizeExceeding, maxFrameSize * 3 / 4)
+    new PayloadSizeAggregator(log, sizeExceeding, maxFrameSize)
   }
 
   //Start periodic gossip to random nodes in cluster
@@ -1885,7 +1890,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
         replyTo ! DataDeleted(key, req)
       case _ =>
         setData(key.id, DeletedEnvelope)
-        payloadSizeAggregator.foreach(_.remove(key.id))
+        payloadSizeAggregator.remove(key.id)
         val durable = isDurable(key.id)
         if (isLocalUpdate(consistency)) {
           if (durable)
@@ -1938,7 +1943,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
     if (subscribers.contains(key) && !changed.contains(key)) {
       val oldDigest = getDigest(key)
       val (dig, payloadSize) = digest(newEnvelope)
-      payloadSizeAggregator.foreach(_.updatePayloadSize(key, payloadSize))
+      payloadSizeAggregator.updatePayloadSize(key, payloadSize)
       if (dig != oldDigest)
         changed += key // notify subscribers, later
       dig
@@ -1955,7 +1960,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
     dataEntries.get(key) match {
       case Some((envelope, LazyDigest)) =>
         val (d, size) = digest(envelope)
-        payloadSizeAggregator.foreach(_.updatePayloadSize(key, size))
+        payloadSizeAggregator.updatePayloadSize(key, size)
         dataEntries = dataEntries.updated(key, (envelope, d))
         d
       case Some((_, digest)) => digest
@@ -2160,12 +2165,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
       if (keys.nonEmpty) {
         if (log.isDebugEnabled)
           log.debug("Sending gossip to [{}], containing [{}]", replyTo.path.address, keys.mkString(", "))
-        val g = Gossip(
-          keys.iterator.map(k => k -> getData(k).get).toMap,
-          sendBack = otherDifferentKeys.nonEmpty,
-          fromSystemUid,
-          selfFromSystemUid)
-        replyTo ! g
+        val sendBack = otherDifferentKeys.nonEmpty
+        createGossipMessages(keys, sendBack, fromSystemUid).foreach { g =>
+          replyTo ! g
+        }
       }
       val myMissingKeys = otherKeys.diff(myKeys)
       if (myMissingKeys.nonEmpty) {
@@ -2184,10 +2187,60 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
     }
   }
 
+  private def createGossipMessages(
+      keys: immutable.Iterable[KeyId],
+      sendBack: Boolean,
+      fromSystemUid: Option[Long]): Vector[Gossip] = {
+    // The sizes don't have to be exact; rather err on the side of too-small messages. The serializer is also
+    // compressing the Gossip message.
+    val maxMessageSize = payloadSizeAggregator.maxFrameSize - 128
+
+    var messages = Vector.empty[Gossip]
+    val collectedEntries = Vector.newBuilder[(KeyId, DataEnvelope)]
+    var sum = 0
+
+    def addGossip(): Unit = {
+      val entries = collectedEntries.result().toMap
+      if (entries.nonEmpty)
+        messages :+= Gossip(entries, sendBack, fromSystemUid, selfFromSystemUid)
+    }
+
+    keys.foreach { key =>
+      val keySize = key.length + 4
+      val dataSize = payloadSizeAggregator.getMaxSize(key) match {
+        case 0 =>
+          // trigger payloadSizeAggregator update (LazyDigest)
+          getDigest(key)
+          payloadSizeAggregator.getMaxSize(key)
+        case size => size
+      }
+      val dataEnvelope = getData(key).get
+      val envelopeSize = 100 + dataEnvelope.estimatedSizeWithoutData
+
+      val entrySize = keySize + dataSize + envelopeSize
+      if (sum + entrySize <= maxMessageSize) {
+        collectedEntries += (key -> dataEnvelope)
+        sum += entrySize
+      } else {
+        addGossip()
+        collectedEntries.clear()
+        collectedEntries += (key -> dataEnvelope)
+        sum = entrySize
+      }
+    }
+
+    // add remaining, if any
+    addGossip()
+
+    log.debug("Created [{}] Gossip messages from [{}] data entries.", messages.size, keys.size)
+
+    messages
+  }
+
   def receiveGossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean, fromSystemUid: Option[Long]): Unit = {
     if (log.isDebugEnabled)
       log.debug("Received gossip from [{}], containing [{}].", replyTo.path.address, updatedData.keys.mkString(", "))
-    var replyData = Map.empty[KeyId, DataEnvelope]
+    var replyKeys = Set.empty[KeyId]
     updatedData.foreach {
       case (key, envelope) =>
         val hadData = dataEntries.contains(key)
@@ -2195,12 +2248,15 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
         if (sendBack) getData(key) match {
           case Some(d) =>
             if (hadData || d.pruning.nonEmpty)
-              replyData = replyData.updated(key, d)
+              replyKeys += key
           case None =>
         }
     }
-    if (sendBack && replyData.nonEmpty)
-      replyTo ! Gossip(replyData, sendBack = false, fromSystemUid, selfFromSystemUid)
+    if (sendBack && replyKeys.nonEmpty) {
+      createGossipMessages(replyKeys, sendBack = false, fromSystemUid).foreach { g =>
+        replyTo ! g
+      }
+    }
   }
 
   def receiveSubscribe(key: KeyR, subscriber: ActorRef): Unit = {
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala
index 75466e1a8f..6113e8ef8f 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala
@@ -263,6 +263,9 @@ sealed abstract class VersionVector extends ReplicatedData with ReplicatedDataSe
 
   override def pruningCleanup(removedNode: UniqueAddress): VersionVector
 
+  /** INTERNAL API */
+  @InternalApi private[akka] def estimatedSize: Int
+
 }
 
 final case class OneVersionVector private[akka] (node: UniqueAddress, version: Long) extends VersionVector {
@@ -322,6 +325,10 @@ final case class OneVersionVector private[akka] (node: UniqueAddress, version: L
 
   override def toString: String = s"VersionVector($node -> $version)"
 
+  /** INTERNAL API */
+  @InternalApi override private[akka] def estimatedSize: Int =
+    EstimatedSize.UniqueAddress + EstimatedSize.LongValue
+
 }
 
 final case class ManyVersionVector(versions: TreeMap[UniqueAddress, Long]) extends VersionVector {
@@ -389,4 +396,8 @@ final case class ManyVersionVector(versions: TreeMap[UniqueAddress, Long]) exten
 
   override def toString: String =
     versions.map { case ((n, v)) => n.toString + " -> " + v }.mkString("VersionVector(", ", ", ")")
+
+  /** INTERNAL API */
+  @InternalApi override private[akka] def estimatedSize: Int =
+    versions.size * (EstimatedSize.UniqueAddress + EstimatedSize.LongValue)
 }
diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorGossipSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorGossipSpec.scala
new file mode 100644
index 0000000000..ac28cc4319
--- /dev/null
+++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorGossipSpec.scala
@@ -0,0 +1,126 @@
+/*
+ * Copyright (C) 2009-2020 Lightbend Inc.
+ */
+
+package akka.cluster.ddata
+
+import scala.concurrent.duration._
+import scala.util.Random
+
+import com.typesafe.config.ConfigFactory
+
+import akka.actor.Dropped
+import akka.cluster.Cluster
+import akka.remote.testconductor.RoleName
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit._
+
+object ReplicatorGossipSpec extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
+
+  commonConfig(ConfigFactory.parseString("""
+    akka.loglevel = INFO
+    akka.actor.provider = "cluster"
+    akka.cluster.distributed-data {
+      # only gossip in this test
+      delta-crdt.enabled = off
+      gossip-interval = 1s
+      max-delta-elements = 400
+      log-data-size-exceeding = 2000
+    }
+    akka.remote.artery {
+      log-frame-size-exceeding = 2000
+      advanced.maximum-frame-size = 50000
+    }
+    """))
+
+}
+
+class ReplicatorGossipSpecMultiJvmNode1 extends ReplicatorGossipSpec
+class ReplicatorGossipSpecMultiJvmNode2 extends ReplicatorGossipSpec
+
+class ReplicatorGossipSpec extends MultiNodeSpec(ReplicatorGossipSpec) with STMultiNodeSpec with ImplicitSender {
+  import Replicator._
+  import ReplicatorGossipSpec._
+
+  override def initialParticipants: Int = roles.size
+
+  private val cluster = Cluster(system)
+  private implicit val selfUniqueAddress: SelfUniqueAddress = DistributedData(system).selfUniqueAddress
+  private val replicator = system.actorOf(Replicator.props(ReplicatorSettings(system)), "replicator")
+
+  private def threeDigits(n: Int): String = s"00$n".takeRight(3)
+  private val smallORSetKeys = (1 to 999).map(n => ORSetKey[String](s"small-${threeDigits(n)}")).toVector
+  private val largeORSetKeys = (1 to 99).map(n => ORSetKey[String](s"large-${threeDigits(n)}")).toVector
+
+  private val rnd = new Random
+  private val smallPayloadSize = 100
+  // note that these are full UTF-8 strings, so they can take more than 1 byte per char
+  def smallPayload(): String = rnd.nextString(smallPayloadSize)
+  private val largePayloadSize = 4000
+  def largePayload(): String = rnd.nextString(largePayloadSize)
+
+  def join(from: RoleName, to: RoleName): Unit = {
+    runOn(from) {
+      cluster.join(node(to).address)
+    }
+    enterBarrier(from.name + "-joined")
+  }
+
+  "Replicator gossip" must {
+
+    "catch up when adding node" in {
+      join(first, first)
+
+      val droppedProbe = TestProbe()
+      system.eventStream.subscribe(droppedProbe.ref, classOf[Dropped])
+
+      val numberOfSmall = 500
+      val numberOfLarge = 15
+
+      runOn(first) {
+        (0 until numberOfSmall).foreach { i =>
+          replicator ! Update(smallORSetKeys(i), ORSet.empty[String], WriteLocal)(_ :+ smallPayload())
+          expectMsgType[UpdateSuccess[_]]
+        }
+        (0 until numberOfLarge).foreach { i =>
+          replicator ! Update(largeORSetKeys(i), ORSet.empty[String], WriteLocal)(_ :+ largePayload())
+          expectMsgType[UpdateSuccess[_]]
+        }
+      }
+      enterBarrier("updated-first")
+
+      join(second, first)
+      within(10.seconds) {
+        awaitAssert {
+          replicator ! GetReplicaCount
+          expectMsg(ReplicaCount(2))
+        }
+      }
+      enterBarrier("second-added")
+
+      runOn(second) {
+        within(10.seconds) {
+          awaitAssert {
+            (0 until numberOfSmall).foreach { i =>
+              replicator ! Get(smallORSetKeys(i), ReadLocal)
+              expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements.head.length should ===(smallPayloadSize)
+            }
+            (0 until numberOfLarge).foreach { i =>
+              replicator ! Get(largeORSetKeys(i), ReadLocal)
+              expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements.head.length should ===(largePayloadSize)
+            }
+          }
+        }
+      }
+      enterBarrier("second-caught-up")
+
+      droppedProbe.expectNoMessage()
+
+      enterBarrier("done-1")
+    }
+  }
+
+}
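
The chunking implemented by createGossipMessages above is a greedy size-budget pack: each entry's size is estimated (key bytes + tracked payload size + envelope overhead), the estimates are summed, and a new Gossip message is started whenever the next entry would push the total over the frame-size budget. The following is a minimal, self-contained Scala sketch of that idea only; GossipChunkingSketch, Entry, and chunk are illustrative names, not Akka APIs, and the byte counts stand in for the aggregator's tracked estimates.

object GossipChunkingSketch {
  final case class Entry(key: String, estimatedBytes: Int)

  // Greedily pack entries into chunks whose estimated total stays within
  // maxBytes. A single oversized entry still gets a chunk of its own,
  // matching the patch's choice to err on the side of sending it anyway.
  def chunk(entries: Vector[Entry], maxBytes: Int): Vector[Vector[Entry]] = {
    val chunks = Vector.newBuilder[Vector[Entry]]
    var current = Vector.empty[Entry]
    var sum = 0
    entries.foreach { e =>
      if (current.nonEmpty && sum + e.estimatedBytes > maxBytes) {
        chunks += current
        current = Vector.empty
        sum = 0
      }
      current :+= e
      sum += e.estimatedBytes
    }
    if (current.nonEmpty) chunks += current
    chunks.result()
  }

  def main(args: Array[String]): Unit = {
    val entries = (1 to 10).map(i => Entry(s"key-$i", i * 30)).toVector
    // With a 100-byte budget the first chunk holds key-1 and key-2 (30 + 60),
    // and the later, larger entries end up one per chunk.
    chunk(entries, maxBytes = 100).foreach(c => println(c.map(_.key).mkString(", ")))
  }
}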
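
The PayloadSizeAggregator change feeds that chunking: per-key serialized sizes are remembered with roughly 10% headroom, so small fluctuations neither update the stored estimate nor re-trigger logging, and only sizes at or above log-data-size-exceeding are logged at all. A rough, hypothetical sketch of that bookkeeping under those assumptions; SizeTracker and println stand in for the real internal class and its LoggingAdapter.

object PayloadSizeTrackingSketch {
  final class SizeTracker(logSizeExceeding: Int) {
    private var maxBytes = Map.empty[String, Int]

    def update(key: String, size: Int): Unit =
      if (size > maxBytes.getOrElse(key, 0)) {
        // store with 10% headroom so sizes in the same ballpark don't churn
        maxBytes = maxBytes.updated(key, (size * 1.1).toInt)
        if (size >= logSizeExceeding)
          println(s"Payload size for [$key] is [$size] bytes")
      }

    def maxSize(key: String): Int = maxBytes.getOrElse(key, 0)
  }

  def main(args: Array[String]): Unit = {
    val tracker = new SizeTracker(logSizeExceeding = 1000)
    tracker.update("a", 900)  // stored as 990, below threshold: not logged
    tracker.update("a", 950)  // within headroom (950 <= 990): ignored
    tracker.update("a", 1200) // above stored max and threshold: logged
    println(tracker.maxSize("a")) // prints 1320
  }
}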