diff --git a/akka-actor/src/main/scala/akka/routing/Router.scala b/akka-actor/src/main/scala/akka/routing/Router.scala index 1702cd5fcd..b2006e7bdb 100644 --- a/akka-actor/src/main/scala/akka/routing/Router.scala +++ b/akka-actor/src/main/scala/akka/routing/Router.scala @@ -10,6 +10,7 @@ import akka.actor.ActorRef import akka.actor.ActorSelection import akka.actor.InternalActorRef import akka.actor.NoSerializationVerificationNeeded +import akka.actor.WrappedMessage import akka.japi.Util.immutableSeq /** @@ -184,6 +185,6 @@ final case class Broadcast(message: Any) extends RouterEnvelope * Only the contained message will be forwarded to the * destination, i.e. the envelope will be stripped off. */ -trait RouterEnvelope { +trait RouterEnvelope extends WrappedMessage { def message: Any } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeChurnSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeChurnSpec.scala index 941e6a528e..8cdb341d8c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeChurnSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeChurnSpec.scala @@ -28,6 +28,7 @@ object NodeChurnMultiJvmSpec extends MultiNodeConfig { akka.cluster.testkit.auto-down-unreachable-after = 1s akka.cluster.prune-gossip-tombstones-after = 1s akka.remote.classic.log-frame-size-exceeding = 1200b + akka.remote.artery.log-frame-size-exceeding = 1200b akka.remote.artery.advanced.aeron { idle-cpu-level = 1 embedded-media-driver = off diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/PerformanceSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/PerformanceSpec.scala index 1337b699a6..2014ac845f 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/PerformanceSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/PerformanceSpec.scala @@ -35,6 +35,7 @@ object PerformanceSpec extends MultiNodeConfig { akka.log-dead-letters-during-shutdown = off akka.remote.classic.log-remote-lifecycle-events = ERROR akka.remote.classic.log-frame-size-exceeding=1000b + akka.remote.artery.log-frame-size-exceeding=1000b akka.testconductor.barrier-timeout = 60 s akka.cluster.distributed-data.gossip-interval = 1 s diff --git a/akka-docs/src/main/paradox/remoting-artery.md b/akka-docs/src/main/paradox/remoting-artery.md index b3893dcc1d..c098cbc6e5 100644 --- a/akka-docs/src/main/paradox/remoting-artery.md +++ b/akka-docs/src/main/paradox/remoting-artery.md @@ -695,6 +695,22 @@ This means that all messages sent to the following actors will pass through the Messages destined for actors not matching any of these patterns are sent using the default channel as before. +To notice large messages you can enable logging of message types with payload size in bytes larger than the +configured `log-frame-size-exceeding`. + +``` +akka.remote.artery { + log-frame-size-exceeding = 10000b +} +``` + +Example log messages: + +``` +[INFO] Payload size for [java.lang.String] is [39068] bytes. Sent to Actor[akka://Sys@localhost:53039/user/destination#-1908386800] +[INFO] New maximum payload size for [java.lang.String] is [44068] bytes. Sent to Actor[akka://Sys@localhost:53039/user/destination#-1908386800]. +``` + The large messages channel can still not be used for extremely large messages, a few MB per message at most. An alternative is to use the @ref:[Reliable delivery](typed/reliable-delivery.md) that has support for automatically @ref[splitting up large messages](typed/reliable-delivery.md#chunk-large-messages) and assemble diff --git a/akka-docs/src/main/paradox/typed/logging.md b/akka-docs/src/main/paradox/typed/logging.md index 1bdf8732be..7434af85eb 100644 --- a/akka-docs/src/main/paradox/typed/logging.md +++ b/akka-docs/src/main/paradox/typed/logging.md @@ -397,6 +397,14 @@ akka.remote.artery { } ``` +Logging of message types with payload size in bytes larger than the configured `log-frame-size-exceeding`. + +``` +akka.remote.artery { + log-frame-size-exceeding = 10000b +} +``` + ### MDC values from Akka internal logging Since the logging is done asynchronously, the thread in which the logging was performed is captured in diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 1a7e82e7b6..310781fae6 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -830,6 +830,14 @@ akka { # if off then they are not logged log-sent-messages = off + # Logging of message types with payload size in bytes larger than + # this value. Maximum detected size per message type is logged once, + # with an increase threshold of 10%. + # By default this feature is turned off. Activate it by setting the property to + # a value in bytes, such as 1000b. Note that for all messages larger than this + # limit there will be extra performance and scalability cost. + log-frame-size-exceeding = off + advanced { # Maximum serialized message size, including header data. diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala index ed183844b8..7e3b269771 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala @@ -73,6 +73,11 @@ private[akka] final class ArterySettings private (config: Config) { val LogReceive: Boolean = getBoolean("log-received-messages") val LogSend: Boolean = getBoolean("log-sent-messages") + val LogFrameSizeExceeding: Option[Int] = { + if (toRootLowerCase(getString("log-frame-size-exceeding")) == "off") None + else Some(getBytes("log-frame-size-exceeding").toInt) + } + val Transport: Transport = toRootLowerCase(getString("transport")) match { case AeronUpd.configName => AeronUpd case Tcp.configName => Tcp diff --git a/akka-remote/src/main/scala/akka/remote/artery/RemoteInstrument.scala b/akka-remote/src/main/scala/akka/remote/artery/RemoteInstrument.scala index ed31509434..fa2403a17a 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/RemoteInstrument.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/RemoteInstrument.scala @@ -5,16 +5,23 @@ package akka.remote.artery import java.nio.ByteBuffer +import java.util.concurrent.ConcurrentHashMap import scala.annotation.tailrec -import scala.collection.immutable import scala.util.control.NonFatal -import akka.actor.{ ActorRef, ExtendedActorSystem } +import akka.actor.ActorSystem +import akka.actor.WrappedMessage +import akka.actor.ActorRef +import akka.actor.ExtendedActorSystem +import akka.annotation.InternalApi import akka.annotation.InternalStableApi -import akka.event.{ Logging, LoggingAdapter } -import akka.util.{ unused, OptionVal } +import akka.event.Logging +import akka.event.LoggingAdapter +import akka.remote.RemoteActorRefProvider import akka.util.ccompat._ +import akka.util.OptionVal +import akka.util.unused /** * INTERNAL API @@ -78,6 +85,71 @@ abstract class RemoteInstrument { def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit } +/** + * INTERNAL API + */ +@InternalApi private[akka] class LoggingRemoteInstrument(system: ActorSystem) extends RemoteInstrument { + + private val settings = system + .asInstanceOf[ExtendedActorSystem] + .provider + .asInstanceOf[RemoteActorRefProvider] + .transport + .asInstanceOf[ArteryTransport] + .settings + private val logFrameSizeExceeding = settings.LogFrameSizeExceeding.get + + private val log = Logging(system, this.getClass) + + private val maxPayloadBytes: ConcurrentHashMap[Class[_], Integer] = new ConcurrentHashMap + + override def identifier: Byte = 1 // Cinnamon is using 0 + + override def remoteWriteMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit = + () + + override def remoteMessageSent( + recipient: ActorRef, + message: Object, + sender: ActorRef, + size: Int, + time: Long): Unit = { + if (size >= logFrameSizeExceeding) { + val clazz = message match { + case x: WrappedMessage => x.message.getClass + case _ => message.getClass + } + + // 10% threshold until next log + def newMax = (size * 1.1).toInt + + @tailrec def check(): Unit = { + val max = maxPayloadBytes.get(clazz) + if (max eq null) { + if (maxPayloadBytes.putIfAbsent(clazz, newMax) eq null) + log.info("Payload size for [{}] is [{}] bytes. Sent to {}", clazz.getName, size, recipient) + else check() + } else if (size > max) { + if (maxPayloadBytes.replace(clazz, max, newMax)) + log.info("New maximum payload size for [{}] is [{}] bytes. Sent to {}.", clazz.getName, size, recipient) + else check() + } + } + check() + } + } + + override def remoteReadMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit = + () + + override def remoteMessageReceived( + recipient: ActorRef, + message: Object, + sender: ActorRef, + size: Int, + time: Long): Unit = () +} + /** * INTERNAL API * @@ -313,7 +385,7 @@ private[remote] final class RemoteInstruments( def isEmpty: Boolean = instruments.isEmpty def nonEmpty: Boolean = instruments.nonEmpty - def timeSerialization = serializationTimingEnabled + def timeSerialization: Boolean = serializationTimingEnabled } /** INTERNAL API */ @@ -334,7 +406,8 @@ private[remote] object RemoteInstruments { val c = system.settings.config val path = "akka.remote.artery.advanced.instruments" import akka.util.ccompat.JavaConverters._ - c.getStringList(path) + val configuredInstruments = c + .getStringList(path) .asScala .iterator .map { fqcn => @@ -344,6 +417,19 @@ private[remote] object RemoteInstruments { .createInstanceFor[RemoteInstrument](fqcn, List(classOf[ExtendedActorSystem] -> system))) .get } - .to(immutable.Vector) + .toVector + + system.provider match { + case rarp: RemoteActorRefProvider => + rarp.transport match { + case artery: ArteryTransport => + artery.settings.LogFrameSizeExceeding match { + case Some(_) => configuredInstruments :+ new LoggingRemoteInstrument(system) + case None => configuredInstruments + } + case _ => configuredInstruments + } + case _ => configuredInstruments + } } } diff --git a/akka-remote/src/test/scala/akka/remote/MessageLoggingSpec.scala b/akka-remote/src/test/scala/akka/remote/MessageLoggingSpec.scala index 5b93726c89..3ec8d967fd 100644 --- a/akka-remote/src/test/scala/akka/remote/MessageLoggingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/MessageLoggingSpec.scala @@ -9,6 +9,8 @@ import com.typesafe.config.{ Config, ConfigFactory } import akka.actor.{ Actor, ActorIdentity, ActorSystem, ExtendedActorSystem, Identify, Props, RootActorPath } import akka.serialization.jackson.CborSerializable +import akka.testkit.EventFilter +import akka.testkit.TestActors import akka.testkit.{ AkkaSpec, ImplicitSender, TestKit } object MessageLoggingSpec { @@ -20,6 +22,7 @@ object MessageLoggingSpec { classic { log-received-messages = on log-sent-messages = on + log-frame-size-exceeding = 10000b netty.tcp { hostname = localhost port = 0 @@ -34,6 +37,7 @@ object MessageLoggingSpec { canonical.port = 0 log-received-messages = on log-sent-messages = on + log-frame-size-exceeding = 10000b } } """.stripMargin) @@ -68,6 +72,25 @@ abstract class MessageLoggingSpec(config: Config) extends AkkaSpec(config) with ref ! "hello" expectMsgType[BadMsg] } + + "log increasing message sizes" in { + remoteSystem.actorOf(TestActors.blackholeProps, "destination") + system.actorSelection(RootActorPath(remoteAddress) / "user" / "destination") ! Identify("lookup") + val ref = expectMsgType[ActorIdentity].ref.get + EventFilter.info(pattern = s"Payload size for *", occurrences = 1).intercept { + ref ! (1 to 10000).mkString("") + } + EventFilter.info(pattern = s"New maximum payload size *", occurrences = 1).intercept { + ref ! (1 to 11000).mkString("") + } + EventFilter.info(pattern = s"New maximum payload size *", occurrences = 0).intercept { + ref ! (1 to 11100).mkString("") + } + EventFilter.info(pattern = s"New maximum payload size *", occurrences = 1).intercept { + ref ! (1 to 13000).mkString("") + } + + } } override protected def afterTermination(): Unit = {