diff --git a/akka-distributed-data/src/main/mima-filters/2.6.10.backwards.excludes/issue-29736-log-data-size.excludes b/akka-distributed-data/src/main/mima-filters/2.6.10.backwards.excludes/issue-29736-log-data-size.excludes new file mode 100644 index 0000000000..e3fb8bc419 --- /dev/null +++ b/akka-distributed-data/src/main/mima-filters/2.6.10.backwards.excludes/issue-29736-log-data-size.excludes @@ -0,0 +1,2 @@ +# #29736 Change of Replicator actor internals +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ddata.Replicator.digest") diff --git a/akka-distributed-data/src/main/resources/reference.conf b/akka-distributed-data/src/main/resources/reference.conf index 58a5e7efbc..3fdecf4bab 100644 --- a/akka-distributed-data/src/main/resources/reference.conf +++ b/akka-distributed-data/src/main/resources/reference.conf @@ -22,6 +22,12 @@ akka.cluster.distributed-data { # How often the subscribers will be notified of changes, if any notify-subscribers-interval = 500 ms + # Logging of data with payload size in bytes larger than + # this value. Maximum detected size per key is logged once, + # with an increase threshold of 10%. + # It can be disabled by setting the property to off. + log-data-size-exceeding = 10 KiB + # Maximum number of entries to transfer in one gossip message when synchronizing # the replicas. Next chunk will be transferred in next round of gossip. max-delta-elements = 500 diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PayloadSizeAggregator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PayloadSizeAggregator.scala new file mode 100644 index 0000000000..1517a8f66e --- /dev/null +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PayloadSizeAggregator.scala @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.ddata + +import akka.annotation.InternalApi +import akka.cluster.ddata.Key.KeyId +import akka.event.LoggingAdapter + +/** + * INTERNAL API + * + * This class is not thread-safe. It is supposed to be used from an actor. + */ +@InternalApi private[akka] class PayloadSizeAggregator( + log: LoggingAdapter, + logSizeExceeding: Int, + warnSizeExceeding: Int) { + private var maxPayloadBytes: Map[String, Int] = Map.empty + + def updatePayloadSize(key: KeyId, size: Int): Unit = { + if (size >= logSizeExceeding) { + // 10% threshold until next log + def newMax = (size * 1.1).toInt + + def logSize(): Unit = { + if (size >= warnSizeExceeding) + log.warning( + "Distributed data size for [{}] is [{}] bytes. Close to max remote message payload size.", + key, + size) + else + log.info("Distributed data size for [{}] is [{}] bytes.", key, size) + } + + maxPayloadBytes.get(key) match { + case Some(max) => + if (size > max) { + maxPayloadBytes = maxPayloadBytes.updated(key, newMax) + logSize() + } + case None => + maxPayloadBytes = maxPayloadBytes.updated(key, newMax) + logSize() + } + } + } + + def remove(key: KeyId): Unit = + maxPayloadBytes -= key + +} diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index 8cbfa04501..6a2db0acb7 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -54,6 +54,7 @@ import akka.cluster.ddata.Key.KeyId import akka.cluster.ddata.Key.KeyR import akka.dispatch.Dispatchers import akka.event.Logging +import akka.remote.RARP import akka.serialization.SerializationExtension import akka.util.ByteString import akka.util.Helpers.toRootLowerCase @@ -82,6 +83,11 @@ object ReplicatorSettings { case _ => config.getDuration("pruning-interval", MILLISECONDS).millis } + val logDataSizeExceeding: Option[Int] = { + if (toRootLowerCase(config.getString("log-data-size-exceeding")) == "off") None + else Some(config.getBytes("log-data-size-exceeding").toInt) + } + import akka.util.ccompat.JavaConverters._ new ReplicatorSettings( roles = roleOption(config.getString("role")).toSet, @@ -97,7 +103,8 @@ object ReplicatorSettings { durablePruningMarkerTimeToLive = config.getDuration("durable.pruning-marker-time-to-live", MILLISECONDS).millis, deltaCrdtEnabled = config.getBoolean("delta-crdt.enabled"), maxDeltaSize = config.getInt("delta-crdt.max-delta-size"), - preferOldest = config.getBoolean("prefer-oldest")) + preferOldest = config.getBoolean("prefer-oldest"), + logDataSizeExceeding = logDataSizeExceeding) } /** @@ -142,6 +149,7 @@ object ReplicatorSettings { * `*` at the end of a key. All entries can be made durable by including "*" * in the `Set`. * @param preferOldest Update and Get operations are sent to oldest nodes first. + * @param logDataSizeExceeding Log data size. */ final class ReplicatorSettings( val roles: Set[String], @@ -157,9 +165,45 @@ final class ReplicatorSettings( val durablePruningMarkerTimeToLive: FiniteDuration, val deltaCrdtEnabled: Boolean, val maxDeltaSize: Int, - val preferOldest: Boolean) { + val preferOldest: Boolean, + val logDataSizeExceeding: Option[Int]) { // for backwards compatibility + @deprecated("use full constructor", "2.6.11") + def this( + roles: Set[String], + gossipInterval: FiniteDuration, + notifySubscribersInterval: FiniteDuration, + maxDeltaElements: Int, + dispatcher: String, + pruningInterval: FiniteDuration, + maxPruningDissemination: FiniteDuration, + durableStoreProps: Either[(String, Config), Props], + durableKeys: Set[KeyId], + pruningMarkerTimeToLive: FiniteDuration, + durablePruningMarkerTimeToLive: FiniteDuration, + deltaCrdtEnabled: Boolean, + maxDeltaSize: Int, + preferOldest: Boolean) = + this( + roles, + gossipInterval, + notifySubscribersInterval, + maxDeltaElements, + dispatcher, + pruningInterval, + maxPruningDissemination, + durableStoreProps, + durableKeys, + pruningMarkerTimeToLive, + durablePruningMarkerTimeToLive, + deltaCrdtEnabled, + maxDeltaSize, + preferOldest, + logDataSizeExceeding = Some(10 * 1024)) + + // for backwards compatibility + @deprecated("use full constructor", "2.6.11") def this( roles: Set[String], gossipInterval: FiniteDuration, @@ -191,6 +235,7 @@ final class ReplicatorSettings( preferOldest = false) // for backwards compatibility + @deprecated("use full constructor", "2.6.11") def this( role: Option[String], gossipInterval: FiniteDuration, @@ -221,6 +266,7 @@ final class ReplicatorSettings( maxDeltaSize) // For backwards compatibility + @deprecated("use full constructor", "2.6.11") def this( role: Option[String], gossipInterval: FiniteDuration, @@ -245,6 +291,7 @@ final class ReplicatorSettings( 200) // For backwards compatibility + @deprecated("use full constructor", "2.6.11") def this( role: Option[String], gossipInterval: FiniteDuration, @@ -271,6 +318,7 @@ final class ReplicatorSettings( 200) // For backwards compatibility + @deprecated("use full constructor", "2.6.11") def this( role: Option[String], gossipInterval: FiniteDuration, @@ -367,6 +415,9 @@ final class ReplicatorSettings( def withPreferOldest(preferOldest: Boolean): ReplicatorSettings = copy(preferOldest = preferOldest) + def withLogDataSizeExceeding(logDataSizeExceeding: Int): ReplicatorSettings = + copy(logDataSizeExceeding = Some(logDataSizeExceeding)) + private def copy( roles: Set[String] = roles, gossipInterval: FiniteDuration = gossipInterval, @@ -381,7 +432,8 @@ final class ReplicatorSettings( durablePruningMarkerTimeToLive: FiniteDuration = durablePruningMarkerTimeToLive, deltaCrdtEnabled: Boolean = deltaCrdtEnabled, maxDeltaSize: Int = maxDeltaSize, - preferOldest: Boolean = preferOldest): ReplicatorSettings = + preferOldest: Boolean = preferOldest, + logDataSizeExceeding: Option[Int] = logDataSizeExceeding): ReplicatorSettings = new ReplicatorSettings( roles, gossipInterval, @@ -396,7 +448,8 @@ final class ReplicatorSettings( durablePruningMarkerTimeToLive, deltaCrdtEnabled, maxDeltaSize, - preferOldest) + preferOldest, + logDataSizeExceeding) } object Replicator { @@ -1311,6 +1364,15 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog roles.subsetOf(cluster.selfRoles), s"This cluster member [${selfAddress}] doesn't have all the roles [${roles.mkString(", ")}]") + private val payloadSizeAggregator = settings.logDataSizeExceeding.map { sizeExceeding => + val remoteProvider = RARP(context.system).provider + val remoteSettings = remoteProvider.remoteSettings + val maxFrameSize = + if (remoteSettings.Artery.Enabled) remoteSettings.Artery.Advanced.MaximumFrameSize + else context.system.settings.config.getBytes("akka.remote.classic.netty.tcp.maximum-frame-size").toInt + new PayloadSizeAggregator(log, sizeExceeding, maxFrameSize * 3 / 4) + } + //Start periodic gossip to random nodes in cluster import context.dispatcher val gossipTask = context.system.scheduler.scheduleWithFixedDelay(gossipInterval, gossipInterval, self, GossipTick) @@ -1823,6 +1885,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog replyTo ! DataDeleted(key, req) case _ => setData(key.id, DeletedEnvelope) + payloadSizeAggregator.foreach(_.remove(key.id)) val durable = isDurable(key.id) if (isLocalUpdate(consistency)) { if (durable) @@ -1874,7 +1937,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog val dig = if (subscribers.contains(key) && !changed.contains(key)) { val oldDigest = getDigest(key) - val dig = digest(newEnvelope) + val (dig, payloadSize) = digest(newEnvelope) + payloadSizeAggregator.foreach(_.updatePayloadSize(key, payloadSize)) if (dig != oldDigest) changed += key // notify subscribers, later dig @@ -1890,7 +1954,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog def getDigest(key: KeyId): Digest = { dataEntries.get(key) match { case Some((envelope, LazyDigest)) => - val d = digest(envelope) + val (d, size) = digest(envelope) + payloadSizeAggregator.foreach(_.updatePayloadSize(key, size)) dataEntries = dataEntries.updated(key, (envelope, d)) d case Some((_, digest)) => digest @@ -1898,11 +1963,15 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } } - def digest(envelope: DataEnvelope): Digest = - if (envelope.data == DeletedData) DeletedDigest + /** + * @return SHA-1 digest of the serialized data, and the size of the serialized data + */ + def digest(envelope: DataEnvelope): (Digest, Int) = + if (envelope.data == DeletedData) (DeletedDigest, 0) else { val bytes = serializer.toBinary(envelope.withoutDeltaVersions) - ByteString.fromArray(MessageDigest.getInstance("SHA-1").digest(bytes)) + val dig = ByteString.fromArray(MessageDigest.getInstance("SHA-1").digest(bytes)) + (dig, bytes.length) } def getData(key: KeyId): Option[DataEnvelope] = dataEntries.get(key).map { case (envelope, _) => envelope }