Include entries DData Gossip message dynamically based on size, #28421 (#29751)

This commit is contained in:
Patrik Nordwall 2020-11-16 12:00:16 +01:00 committed by GitHub
parent ac58f7d4ca
commit 4fa733c146
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 247 additions and 21 deletions

View file

@ -0,0 +1,3 @@
# #28421 Gossip entries based on size
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.PruningState.estimatedSize")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.VersionVector.estimatedSize")

View file

@ -28,8 +28,10 @@ akka.cluster.distributed-data {
# It can be disabled by setting the property to off.
log-data-size-exceeding = 10 KiB
# Maximum number of entries to transfer in one gossip message when synchronizing
# the replicas. Next chunk will be transferred in next round of gossip.
# Maximum number of entries to transfer in one round of gossip exchange when
# synchronizing the replicas. Next chunk will be transferred in next round of gossip.
# The actual number of data entries in each Gossip message is dynamically
# adjusted to not exceed the maximum remote message size (maximum-frame-size).
max-delta-elements = 500
# The id of the dispatcher to use for Replicator actors.

View file

@ -0,0 +1,17 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.ddata
import akka.annotation.InternalApi
/**
 * INTERNAL API: Rough estimates, in bytes, of the serialized size of some data
 * elements. Used when creating gossip messages.
 */
@InternalApi private[akka] object EstimatedSize {

  /** Estimate for one `Long` value. */
  val LongValue: Int = 8

  /** Estimate for one address. */
  val Address: Int = 50

  /** Estimate for one unique address: an address plus one `Long`. */
  val UniqueAddress: Int = LongValue + Address
}

View file

@ -16,11 +16,12 @@ import akka.event.LoggingAdapter
@InternalApi private[akka] class PayloadSizeAggregator(
log: LoggingAdapter,
logSizeExceeding: Int,
warnSizeExceeding: Int) {
val maxFrameSize: Int) {
private val warnSizeExceeding = maxFrameSize * 3 / 4
private var maxPayloadBytes: Map[String, Int] = Map.empty
def updatePayloadSize(key: KeyId, size: Int): Unit = {
if (size >= logSizeExceeding) {
if (size > 0) { // deleted has size 0
// 10% threshold until next log
def newMax = (size * 1.1).toInt
@ -38,15 +39,21 @@ import akka.event.LoggingAdapter
case Some(max) =>
if (size > max) {
maxPayloadBytes = maxPayloadBytes.updated(key, newMax)
logSize()
if (size >= logSizeExceeding)
logSize()
}
case None =>
maxPayloadBytes = maxPayloadBytes.updated(key, newMax)
logSize()
if (size >= logSizeExceeding)
logSize()
}
}
}
/**
 * Looks up the value currently stored for `key` in `maxPayloadBytes`.
 *
 * @return the stored value, or 0 when nothing is stored for `key`
 */
def getMaxSize(key: KeyId): Int =
  maxPayloadBytes.get(key) match {
    case Some(recorded) => recorded
    case None           => 0
  }
/** Drops whatever is stored for `key` in `maxPayloadBytes`; a no-op when nothing is stored. */
def remove(key: KeyId): Unit = {
  maxPayloadBytes = maxPayloadBytes - key
}

View file

@ -19,10 +19,12 @@ import akka.util.unused
if (seen(node) || owner.address == node) this
else copy(seen = seen + node)
}
// Rough size estimate: one address per element of `seen`, plus one unique address
// (presumably for the owner -- confirm against the enclosing class).
def estimatedSize: Int = EstimatedSize.Address * seen.size + EstimatedSize.UniqueAddress
}
/**
 * Pruning state that only carries the time at which it becomes obsolete.
 *
 * @param obsoleteTime the state is obsolete once the current time reaches this value
 */
final case class PruningPerformed(obsoleteTime: Long) extends PruningState {

  /** True when `currentTime` is at or past `obsoleteTime`. */
  def isObsolete(currentTime: Long): Boolean = currentTime >= obsoleteTime

  /** Seen nodes are not tracked in this state, so the same instance is returned. */
  def addSeen(@unused node: Address): PruningState = this

  /** Rough size estimate: a single `Long` (the obsolete time). */
  def estimatedSize: Int = EstimatedSize.LongValue
}
}
@ -47,4 +49,6 @@ import akka.util.unused
}
def addSeen(node: Address): PruningState
// Rough estimate of the size of this pruning state; the implementations visible in this
// file build it from the EstimatedSize constants.
def estimatedSize: Int
}

View file

@ -1087,6 +1087,10 @@ object Replicator {
if (changed) copy(pruning = newRemovedNodePruning)
else this
}
/**
 * Rough size estimate of everything in this envelope except the data itself:
 * the estimate of `deltaVersions` plus, for every pruning entry, the estimate of
 * its state and one unique address (presumably the entry's key).
 */
def estimatedSizeWithoutData: Int = {
  val pruningSize = pruning.valuesIterator.foldLeft(0) { (acc, state) =>
    acc + (state.estimatedSize + EstimatedSize.UniqueAddress)
  }
  deltaVersions.estimatedSize + pruningSize
}
}
val DeletedEnvelope = DataEnvelope(DeletedData)
@ -1364,13 +1368,14 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
roles.subsetOf(cluster.selfRoles),
s"This cluster member [${selfAddress}] doesn't have all the roles [${roles.mkString(", ")}]")
private val payloadSizeAggregator = settings.logDataSizeExceeding.map { sizeExceeding =>
private val payloadSizeAggregator = {
val sizeExceeding = settings.logDataSizeExceeding.getOrElse(Int.MaxValue)
val remoteProvider = RARP(context.system).provider
val remoteSettings = remoteProvider.remoteSettings
val maxFrameSize =
if (remoteSettings.Artery.Enabled) remoteSettings.Artery.Advanced.MaximumFrameSize
else context.system.settings.config.getBytes("akka.remote.classic.netty.tcp.maximum-frame-size").toInt
new PayloadSizeAggregator(log, sizeExceeding, maxFrameSize * 3 / 4)
new PayloadSizeAggregator(log, sizeExceeding, maxFrameSize)
}
//Start periodic gossip to random nodes in cluster
@ -1885,7 +1890,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
replyTo ! DataDeleted(key, req)
case _ =>
setData(key.id, DeletedEnvelope)
payloadSizeAggregator.foreach(_.remove(key.id))
payloadSizeAggregator.remove(key.id)
val durable = isDurable(key.id)
if (isLocalUpdate(consistency)) {
if (durable)
@ -1938,7 +1943,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
if (subscribers.contains(key) && !changed.contains(key)) {
val oldDigest = getDigest(key)
val (dig, payloadSize) = digest(newEnvelope)
payloadSizeAggregator.foreach(_.updatePayloadSize(key, payloadSize))
payloadSizeAggregator.updatePayloadSize(key, payloadSize)
if (dig != oldDigest)
changed += key // notify subscribers, later
dig
@ -1955,7 +1960,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
dataEntries.get(key) match {
case Some((envelope, LazyDigest)) =>
val (d, size) = digest(envelope)
payloadSizeAggregator.foreach(_.updatePayloadSize(key, size))
payloadSizeAggregator.updatePayloadSize(key, size)
dataEntries = dataEntries.updated(key, (envelope, d))
d
case Some((_, digest)) => digest
@ -2160,12 +2165,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
if (keys.nonEmpty) {
if (log.isDebugEnabled)
log.debug("Sending gossip to [{}], containing [{}]", replyTo.path.address, keys.mkString(", "))
val g = Gossip(
keys.iterator.map(k => k -> getData(k).get).toMap,
sendBack = otherDifferentKeys.nonEmpty,
fromSystemUid,
selfFromSystemUid)
replyTo ! g
val sendBack = otherDifferentKeys.nonEmpty
createGossipMessages(keys, sendBack, fromSystemUid).foreach { g =>
replyTo ! g
}
}
val myMissingKeys = otherKeys.diff(myKeys)
if (myMissingKeys.nonEmpty) {
@ -2184,10 +2187,60 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}
/**
 * Splits the data entries for `keys` into one or more `Gossip` messages, starting a new
 * message whenever the estimated size of the current one would exceed the limit derived
 * from the maximum frame size. An entry that alone exceeds the limit is still sent, in a
 * message of its own.
 *
 * @param keys          keys of the entries to include; each must have data (`getData(key).get`)
 * @param sendBack      passed through to every created `Gossip`
 * @param fromSystemUid passed through to every created `Gossip`
 * @return the created messages, in the order the keys were processed
 */
private def createGossipMessages(
    keys: immutable.Iterable[KeyId],
    sendBack: Boolean,
    fromSystemUid: Option[Long]): Vector[Gossip] = {
  // The sizes don't have to be exact, rather err on the side of too small messages.
  // The serializer is also compressing the Gossip message.
  val sizeLimit = payloadSizeAggregator.maxFrameSize - 128

  val gossips = Vector.newBuilder[Gossip]
  var pending = Map.empty[KeyId, DataEnvelope]
  var pendingSize = 0

  // Emits the pending entries as one Gossip message, unless there are none.
  def flush(): Unit =
    if (pending.nonEmpty)
      gossips += Gossip(pending, sendBack, fromSystemUid, selfFromSystemUid)

  // Size recorded by the aggregator; when nothing is recorded yet, computing the digest
  // triggers the payloadSizeAggregator update (LazyDigest) and the size is read again.
  def recordedDataSize(key: KeyId): Int = {
    val known = payloadSizeAggregator.getMaxSize(key)
    if (known != 0) known
    else {
      getDigest(key)
      payloadSizeAggregator.getMaxSize(key)
    }
  }

  for (key <- keys) {
    val dataSize = recordedDataSize(key)
    val dataEnvelope = getData(key).get
    // key bytes + 4, the data itself, and 100 + the envelope's non-data estimate
    val entrySize = (key.length + 4) + dataSize + (100 + dataEnvelope.estimatedSizeWithoutData)

    if (pendingSize + entrySize > sizeLimit) {
      // the current message is full: emit it and start a new one with this entry
      flush()
      pending = Map.empty[KeyId, DataEnvelope]
      pendingSize = 0
    }
    pending = pending.updated(key, dataEnvelope)
    pendingSize += entrySize
  }

  // add remaining, if any
  flush()

  val messages = gossips.result()
  log.debug("Created [{}] Gossip messages from [{}] data entries.", messages.size, keys.size)
  messages
}
def receiveGossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean, fromSystemUid: Option[Long]): Unit = {
if (log.isDebugEnabled)
log.debug("Received gossip from [{}], containing [{}].", replyTo.path.address, updatedData.keys.mkString(", "))
var replyData = Map.empty[KeyId, DataEnvelope]
var replyKeys = Set.empty[KeyId]
updatedData.foreach {
case (key, envelope) =>
val hadData = dataEntries.contains(key)
@ -2195,12 +2248,15 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
if (sendBack) getData(key) match {
case Some(d) =>
if (hadData || d.pruning.nonEmpty)
replyData = replyData.updated(key, d)
replyKeys += key
case None =>
}
}
if (sendBack && replyData.nonEmpty)
replyTo ! Gossip(replyData, sendBack = false, fromSystemUid, selfFromSystemUid)
if (sendBack && replyKeys.nonEmpty) {
createGossipMessages(replyKeys, sendBack = false, fromSystemUid).foreach { g =>
replyTo ! g
}
}
}
def receiveSubscribe(key: KeyR, subscriber: ActorRef): Unit = {

View file

@ -263,6 +263,9 @@ sealed abstract class VersionVector extends ReplicatedData with ReplicatedDataSe
override def pruningCleanup(removedNode: UniqueAddress): VersionVector
/**
 * INTERNAL API: rough estimate of the size of this version vector. The implementations
 * in this file compute it from the `EstimatedSize` constants.
 */
@InternalApi private[akka] def estimatedSize: Int
}
final case class OneVersionVector private[akka] (node: UniqueAddress, version: Long) extends VersionVector {
@ -322,6 +325,10 @@ final case class OneVersionVector private[akka] (node: UniqueAddress, version: L
override def toString: String =
s"VersionVector($node -> $version)"
/** INTERNAL API: estimate for the single entry, one `Long` version plus one unique address. */
@InternalApi override private[akka] def estimatedSize: Int =
  EstimatedSize.LongValue + EstimatedSize.UniqueAddress
}
final case class ManyVersionVector(versions: TreeMap[UniqueAddress, Long]) extends VersionVector {
@ -389,4 +396,8 @@ final case class ManyVersionVector(versions: TreeMap[UniqueAddress, Long]) exten
override def toString: String =
versions.map { case ((n, v)) => n.toString + " -> " + v }.mkString("VersionVector(", ", ", ")")
/** INTERNAL API: per-entry estimate (unique address plus `Long` version) times the number of entries. */
@InternalApi override private[akka] def estimatedSize: Int = {
  val perEntry = EstimatedSize.UniqueAddress + EstimatedSize.LongValue
  perEntry * versions.size
}
}

View file

@ -0,0 +1,126 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.ddata
import scala.concurrent.duration._
import scala.util.Random
import com.typesafe.config.ConfigFactory
import akka.actor.Dropped
import akka.cluster.Cluster
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
/**
 * Multi-node configuration for the gossip test: declares the two roles and the
 * configuration shared by both nodes.
 */
object ReplicatorGossipSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
// Shared config: delta-CRDT propagation is switched off so that only gossip is exercised,
// and the Artery maximum frame size is set to 50000.
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.cluster.distributed-data {
# only gossip in this test
delta-crdt.enabled = off
gossip-interval = 1s
max-delta-elements = 400
log-data-size-exceeding = 2000
}
akka.remote.artery {
log-frame-size-exceeding = 2000
advanced.maximum-frame-size = 50000
}
"""))
}
// Concrete spec classes with no body of their own; presumably one per node/JVM of the
// multi-node run, as the MultiJvmNodeN names suggest -- confirm against the test setup.
class ReplicatorGossipSpecMultiJvmNode1 extends ReplicatorGossipSpec
class ReplicatorGossipSpecMultiJvmNode2 extends ReplicatorGossipSpec
/**
 * Checks that a node joining after data was written on the first node ends up with every
 * entry locally, and that no `Dropped` event was published on its way there.
 */
class ReplicatorGossipSpec extends MultiNodeSpec(ReplicatorGossipSpec) with STMultiNodeSpec with ImplicitSender {
import Replicator._
import ReplicatorGossipSpec._
// All declared roles take part from the start.
override def initialParticipants: Int = roles.size
private val cluster = Cluster(system)
private implicit val selfUniqueAddress: SelfUniqueAddress = DistributedData(system).selfUniqueAddress
private val replicator = system.actorOf(Replicator.props(ReplicatorSettings(system)), "replicator")
// Left-pads n with zeros and keeps the last three characters, e.g. 7 -> "007".
private def threeDigits(n: Int): String = s"00$n".takeRight(3)
private val smallORSetKeys = (1 to 999).map(n => ORSetKey[String](s"small-${threeDigits(n)}")).toVector
private val largeORSetKeys = (1 to 99).map(n => ORSetKey[String](s"large-${threeDigits(n)}")).toVector
private val rnd = new Random
private val smallPayloadSize = 100
// Note: the payloads are random strings of the given number of chars; encoded as UTF-8
// they can take more than 1 byte per char.
def smallPayload(): String = rnd.nextString(smallPayloadSize)
private val largePayloadSize = 4000
def largePayload(): String = rnd.nextString(largePayloadSize)
// Runs the cluster join on `from` only; every node then waits on the "<from>-joined" barrier.
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster.join(node(to).address)
}
enterBarrier(from.name + "-joined")
}
"Replicator gossip" must {
"catch up when adding node" in {
join(first, first)
// Collects Dropped events from the event stream; asserted to be empty at the end.
val droppedProbe = TestProbe()
system.eventStream.subscribe(droppedProbe.ref, classOf[Dropped])
// 500 small entries is more than the max-delta-elements = 400 configured for this spec.
val numberOfSmall = 500
val numberOfLarge = 15
// Write all entries locally on the first node, before the second node has joined.
runOn(first) {
(0 until numberOfSmall).foreach { i =>
replicator ! Update(smallORSetKeys(i), ORSet.empty[String], WriteLocal)(_ :+ smallPayload())
expectMsgType[UpdateSuccess[_]]
}
(0 until numberOfLarge).foreach { i =>
replicator ! Update(largeORSetKeys(i), ORSet.empty[String], WriteLocal)(_ :+ largePayload())
expectMsgType[UpdateSuccess[_]]
}
}
enterBarrier("updated-first")
join(second, first)
// Wait until the local replicator reports two replicas.
within(10.seconds) {
awaitAssert {
replicator ! GetReplicaCount
expectMsg(ReplicaCount(2))
}
}
enterBarrier("second-added")
// The second node never wrote anything itself, so reading every key locally (ReadLocal)
// only succeeds once the data has been replicated to it.
runOn(second) {
within(10.seconds) {
awaitAssert {
(0 until numberOfSmall).foreach { i =>
replicator ! Get(smallORSetKeys(i), ReadLocal)
expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements.head.length should ===(smallPayloadSize)
}
(0 until numberOfLarge).foreach { i =>
replicator ! Get(largeORSetKeys(i), ReadLocal)
expectMsgType[GetSuccess[ORSet[String]]].dataValue.elements.head.length should ===(largePayloadSize)
}
}
}
}
enterBarrier("second-caught-up")
// No Dropped event may have been published on this node during the test.
droppedProbe.expectNoMessage()
enterBarrier("done-1")
}
}
}