Merge pull request #29691 from akka/wip-29683-log-size-patriknw

log-frame-size-exceeding for Artery, #29683
This commit is contained in:
Patrik Nordwall 2020-10-09 10:26:23 +02:00 committed by GitHub
commit 73a9bbb264
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 157 additions and 8 deletions

View file

@ -10,6 +10,7 @@ import akka.actor.ActorRef
import akka.actor.ActorSelection
import akka.actor.InternalActorRef
import akka.actor.NoSerializationVerificationNeeded
import akka.actor.WrappedMessage
import akka.japi.Util.immutableSeq
/**
@ -184,6 +185,6 @@ final case class Broadcast(message: Any) extends RouterEnvelope
* Only the contained message will be forwarded to the
* destination, i.e. the envelope will be stripped off.
*/
trait RouterEnvelope {
trait RouterEnvelope extends WrappedMessage {
def message: Any
}

View file

@ -28,6 +28,7 @@ object NodeChurnMultiJvmSpec extends MultiNodeConfig {
akka.cluster.testkit.auto-down-unreachable-after = 1s
akka.cluster.prune-gossip-tombstones-after = 1s
akka.remote.classic.log-frame-size-exceeding = 1200b
akka.remote.artery.log-frame-size-exceeding = 1200b
akka.remote.artery.advanced.aeron {
idle-cpu-level = 1
embedded-media-driver = off

View file

@ -35,6 +35,7 @@ object PerformanceSpec extends MultiNodeConfig {
akka.log-dead-letters-during-shutdown = off
akka.remote.classic.log-remote-lifecycle-events = ERROR
akka.remote.classic.log-frame-size-exceeding=1000b
akka.remote.artery.log-frame-size-exceeding=1000b
akka.testconductor.barrier-timeout = 60 s
akka.cluster.distributed-data.gossip-interval = 1 s

View file

@ -695,6 +695,22 @@ This means that all messages sent to the following actors will pass through the
Messages destined for actors not matching any of these patterns are sent using the default channel as before.
To notice large messages you can enable logging of message types with payload size in bytes larger than the
configured `log-frame-size-exceeding`.
```
akka.remote.artery {
log-frame-size-exceeding = 10000b
}
```
Example log messages:
```
[INFO] Payload size for [java.lang.String] is [39068] bytes. Sent to Actor[akka://Sys@localhost:53039/user/destination#-1908386800]
[INFO] New maximum payload size for [java.lang.String] is [44068] bytes. Sent to Actor[akka://Sys@localhost:53039/user/destination#-1908386800].
```
The large messages channel can still not be used for extremely large messages, a few MB per message at most.
An alternative is to use the @ref:[Reliable delivery](typed/reliable-delivery.md) that has support for
automatically @ref[splitting up large messages](typed/reliable-delivery.md#chunk-large-messages) and assemble

View file

@ -397,6 +397,14 @@ akka.remote.artery {
}
```
Logging of message types with payload size in bytes larger than the configured `log-frame-size-exceeding`.
```
akka.remote.artery {
log-frame-size-exceeding = 10000b
}
```
### MDC values from Akka internal logging
Since the logging is done asynchronously, the thread in which the logging was performed is captured in

View file

@ -830,6 +830,14 @@ akka {
# if off then they are not logged
log-sent-messages = off
# Logging of message types with payload size in bytes larger than
# this value. Maximum detected size per message type is logged once,
# with an increase threshold of 10%.
# By default this feature is turned off. Activate it by setting the property to
# a value in bytes, such as 1000b. Note that for all messages larger than this
# limit there will be extra performance and scalability cost.
log-frame-size-exceeding = off
advanced {
# Maximum serialized message size, including header data.

View file

@ -73,6 +73,11 @@ private[akka] final class ArterySettings private (config: Config) {
val LogReceive: Boolean = getBoolean("log-received-messages")
val LogSend: Boolean = getBoolean("log-sent-messages")
val LogFrameSizeExceeding: Option[Int] = {
if (toRootLowerCase(getString("log-frame-size-exceeding")) == "off") None
else Some(getBytes("log-frame-size-exceeding").toInt)
}
val Transport: Transport = toRootLowerCase(getString("transport")) match {
case AeronUpd.configName => AeronUpd
case Tcp.configName => Tcp

View file

@ -5,16 +5,23 @@
package akka.remote.artery
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentHashMap
import scala.annotation.tailrec
import scala.collection.immutable
import scala.util.control.NonFatal
import akka.actor.{ ActorRef, ExtendedActorSystem }
import akka.actor.ActorSystem
import akka.actor.WrappedMessage
import akka.actor.ActorRef
import akka.actor.ExtendedActorSystem
import akka.annotation.InternalApi
import akka.annotation.InternalStableApi
import akka.event.{ Logging, LoggingAdapter }
import akka.util.{ unused, OptionVal }
import akka.event.Logging
import akka.event.LoggingAdapter
import akka.remote.RemoteActorRefProvider
import akka.util.ccompat._
import akka.util.OptionVal
import akka.util.unused
/**
* INTERNAL API
@ -78,6 +85,71 @@ abstract class RemoteInstrument {
def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit
}
/**
* INTERNAL API
*/
@InternalApi private[akka] class LoggingRemoteInstrument(system: ActorSystem) extends RemoteInstrument {
private val settings = system
.asInstanceOf[ExtendedActorSystem]
.provider
.asInstanceOf[RemoteActorRefProvider]
.transport
.asInstanceOf[ArteryTransport]
.settings
private val logFrameSizeExceeding = settings.LogFrameSizeExceeding.get
private val log = Logging(system, this.getClass)
private val maxPayloadBytes: ConcurrentHashMap[Class[_], Integer] = new ConcurrentHashMap
override def identifier: Byte = 1 // Cinnamon is using 0
override def remoteWriteMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit =
()
override def remoteMessageSent(
recipient: ActorRef,
message: Object,
sender: ActorRef,
size: Int,
time: Long): Unit = {
if (size >= logFrameSizeExceeding) {
val clazz = message match {
case x: WrappedMessage => x.message.getClass
case _ => message.getClass
}
// 10% threshold until next log
def newMax = (size * 1.1).toInt
@tailrec def check(): Unit = {
val max = maxPayloadBytes.get(clazz)
if (max eq null) {
if (maxPayloadBytes.putIfAbsent(clazz, newMax) eq null)
log.info("Payload size for [{}] is [{}] bytes. Sent to {}", clazz.getName, size, recipient)
else check()
} else if (size > max) {
if (maxPayloadBytes.replace(clazz, max, newMax))
log.info("New maximum payload size for [{}] is [{}] bytes. Sent to {}.", clazz.getName, size, recipient)
else check()
}
}
check()
}
}
override def remoteReadMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit =
()
override def remoteMessageReceived(
recipient: ActorRef,
message: Object,
sender: ActorRef,
size: Int,
time: Long): Unit = ()
}
/**
* INTERNAL API
*
@ -313,7 +385,7 @@ private[remote] final class RemoteInstruments(
def isEmpty: Boolean = instruments.isEmpty
def nonEmpty: Boolean = instruments.nonEmpty
def timeSerialization = serializationTimingEnabled
def timeSerialization: Boolean = serializationTimingEnabled
}
/** INTERNAL API */
@ -334,7 +406,8 @@ private[remote] object RemoteInstruments {
val c = system.settings.config
val path = "akka.remote.artery.advanced.instruments"
import akka.util.ccompat.JavaConverters._
c.getStringList(path)
val configuredInstruments = c
.getStringList(path)
.asScala
.iterator
.map { fqcn =>
@ -344,6 +417,19 @@ private[remote] object RemoteInstruments {
.createInstanceFor[RemoteInstrument](fqcn, List(classOf[ExtendedActorSystem] -> system)))
.get
}
.to(immutable.Vector)
.toVector
system.provider match {
case rarp: RemoteActorRefProvider =>
rarp.transport match {
case artery: ArteryTransport =>
artery.settings.LogFrameSizeExceeding match {
case Some(_) => configuredInstruments :+ new LoggingRemoteInstrument(system)
case None => configuredInstruments
}
case _ => configuredInstruments
}
case _ => configuredInstruments
}
}
}

View file

@ -9,6 +9,8 @@ import com.typesafe.config.{ Config, ConfigFactory }
import akka.actor.{ Actor, ActorIdentity, ActorSystem, ExtendedActorSystem, Identify, Props, RootActorPath }
import akka.serialization.jackson.CborSerializable
import akka.testkit.EventFilter
import akka.testkit.TestActors
import akka.testkit.{ AkkaSpec, ImplicitSender, TestKit }
object MessageLoggingSpec {
@ -20,6 +22,7 @@ object MessageLoggingSpec {
classic {
log-received-messages = on
log-sent-messages = on
log-frame-size-exceeding = 10000b
netty.tcp {
hostname = localhost
port = 0
@ -34,6 +37,7 @@ object MessageLoggingSpec {
canonical.port = 0
log-received-messages = on
log-sent-messages = on
log-frame-size-exceeding = 10000b
}
}
""".stripMargin)
@ -68,6 +72,25 @@ abstract class MessageLoggingSpec(config: Config) extends AkkaSpec(config) with
ref ! "hello"
expectMsgType[BadMsg]
}
"log increasing message sizes" in {
remoteSystem.actorOf(TestActors.blackholeProps, "destination")
system.actorSelection(RootActorPath(remoteAddress) / "user" / "destination") ! Identify("lookup")
val ref = expectMsgType[ActorIdentity].ref.get
EventFilter.info(pattern = s"Payload size for *", occurrences = 1).intercept {
ref ! (1 to 10000).mkString("")
}
EventFilter.info(pattern = s"New maximum payload size *", occurrences = 1).intercept {
ref ! (1 to 11000).mkString("")
}
EventFilter.info(pattern = s"New maximum payload size *", occurrences = 0).intercept {
ref ! (1 to 11100).mkString("")
}
EventFilter.info(pattern = s"New maximum payload size *", occurrences = 1).intercept {
ref ! (1 to 13000).mkString("")
}
}
}
override protected def afterTermination(): Unit = {