diff --git a/akka-docs/rst/java/logging.rst b/akka-docs/rst/java/logging.rst index 4483dc5036..687ac7f53f 100644 --- a/akka-docs/rst/java/logging.rst +++ b/akka-docs/rst/java/logging.rst @@ -157,6 +157,23 @@ If you want to see all messages that are received through remoting at DEBUG log } } +If you want to see message types with payload size in bytes larger than +a specified limit at INFO log level: + +.. code-block:: ruby + + akka { + remote { + # Logging of message types with payload size in bytes larger than + # this value. Maximum detected size per message type is logged once, + # with an increase threshold of 10%. + # By default this feature is turned off. Activate it by setting the property to + # a value in bytes, such as 1000b. Note that for all messages larger than this + # limit there will be extra performance and scalability cost. + log-frame-size-exceeding = 1000b + } + } + Also see the logging options for TestKit: :ref:`actor.logging-java`. Turn Off Logging diff --git a/akka-docs/rst/scala/logging.rst b/akka-docs/rst/scala/logging.rst index 174a53ed7a..1d204b4cc8 100644 --- a/akka-docs/rst/scala/logging.rst +++ b/akka-docs/rst/scala/logging.rst @@ -173,6 +173,23 @@ If you want to see all messages that are received through remoting at DEBUG log } } +If you want to see message types with payload size in bytes larger than +a specified limit at INFO log level: + +.. code-block:: ruby + + akka { + remote { + # Logging of message types with payload size in bytes larger than + # this value. Maximum detected size per message type is logged once, + # with an increase threshold of 10%. + # By default this feature is turned off. Activate it by setting the property to + # a value in bytes, such as 1000b. Note that for all messages larger than this + # limit there will be extra performance and scalability cost. + log-frame-size-exceeding = 1000b + } + } + Also see the logging options for TestKit: :ref:`actor.logging-scala`. Translating Log Source to String and Class diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 71eaedb118..ca8da650c5 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -124,6 +124,14 @@ akka { # received messages also fall under this flag. log-remote-lifecycle-events = on + # Logging of message types with payload size in bytes larger than + # this value. Maximum detected size per message type is logged once, + # with an increase threshold of 10%. + # By default this feature is turned off. Activate it by setting the property to + # a value in bytes, such as 1000b. Note that for all messages larger than this + # limit there will be extra performance and scalability cost. + log-frame-size-exceeding = off + ### Failure detection and recovery # Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index dc197ac999..2138c8ca07 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -398,6 +398,7 @@ private[remote] class EndpointWriter( import context.dispatcher val extendedSystem: ExtendedActorSystem = context.system.asInstanceOf[ExtendedActorSystem] + val remoteMetrics = RemoteMetricsExtension(context.system) var reader: Option[ActorRef] = None var handle: Option[AkkaProtocolHandle] = handleOrActive // FIXME: refactor into state data @@ -495,7 +496,10 @@ private[remote] class EndpointWriter( seqOption = seqOption, ackOption = lastAck) - if (pdu.size > transport.maximumPayloadBytes) { + val pduSize = pdu.size + remoteMetrics.logPayloadBytes(msg, pduSize) + + if (pduSize > transport.maximumPayloadBytes) { publishAndStay(new OversizedPayloadException(s"Discarding oversized payload sent to ${recipient}: max allowed size ${transport.maximumPayloadBytes} bytes, actual size of encoded ${msg.getClass} was ${pdu.size} bytes.")) } else if (h.write(pdu)) { stay() diff --git a/akka-remote/src/main/scala/akka/remote/RemoteMetricsExtension.scala b/akka-remote/src/main/scala/akka/remote/RemoteMetricsExtension.scala new file mode 100644 index 0000000000..cbdb96da35 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/RemoteMetricsExtension.scala @@ -0,0 +1,89 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.remote + +import java.util.concurrent.ConcurrentHashMap +import scala.annotation.tailrec +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem +import akka.actor.Extension +import akka.actor.ExtensionId +import akka.actor.ExtensionIdProvider +import akka.actor.SelectChildName +import akka.event.Logging +import akka.routing.RouterEnvelope + +/** + * INTERNAL API + * Extension that keeps track of remote metrics, such + * as max size of different message types. + */ +private[akka] object RemoteMetricsExtension extends ExtensionId[RemoteMetrics] with ExtensionIdProvider { + override def get(system: ActorSystem): RemoteMetrics = super.get(system) + + override def lookup = RemoteMetricsExtension + + override def createExtension(system: ExtendedActorSystem): RemoteMetrics = + if (system.settings.config.getString("akka.remote.log-frame-size-exceeding").toLowerCase == "off") + new RemoteMetricsOff + else + new RemoteMetricsOn(system) +} + +/** + * INTERNAL API + */ +private[akka] trait RemoteMetrics extends Extension { + /** + * Logging of the size of different message types. + * Maximum detected size per message type is logged once, with + * and increase threshold of 10%. + */ + def logPayloadBytes(msg: Any, payloadBytes: Int): Unit +} + +/** + * INTERNAL API + */ +private[akka] class RemoteMetricsOff extends RemoteMetrics { + override def logPayloadBytes(msg: Any, payloadBytes: Int): Unit = () +} + +/** + * INTERNAL API + */ +private[akka] class RemoteMetricsOn(system: ExtendedActorSystem) extends RemoteMetrics { + + private val logFrameSizeExceeding: Int = system.settings.config.getBytes( + "akka.remote.log-frame-size-exceeding").toInt + private val log = Logging(system, this.getClass) + private val maxPayloadBytes: ConcurrentHashMap[Class[_], Integer] = new ConcurrentHashMap + + override def logPayloadBytes(msg: Any, payloadBytes: Int): Unit = + if (payloadBytes >= logFrameSizeExceeding) { + val clazz = msg match { + case x: SelectChildName ⇒ x.wrappedMessage.getClass + case x: RouterEnvelope ⇒ x.message.getClass + case _ ⇒ msg.getClass + } + + // 10% threshold until next log + def newMax = (payloadBytes * 1.1).toInt + + @tailrec def check(): Unit = { + val max = maxPayloadBytes.get(clazz) + if (max eq null) { + if (maxPayloadBytes.putIfAbsent(clazz, newMax) eq null) + log.info("Payload size for [{}] is [{}] bytes", clazz.getName, payloadBytes) + else check() + } else if (payloadBytes > max) { + if (maxPayloadBytes.replace(clazz, max, newMax)) + log.info("New maximum payload size for [{}] is [{}] bytes", clazz.getName, payloadBytes) + else check() + } + } + check() + } +} + diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 11d466e086..ab41562018 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -59,6 +59,7 @@ class RemoteConfigSpec extends AkkaSpec( Duration(WatchFailureDetectorConfig.getMilliseconds("acceptable-heartbeat-pause"), MILLISECONDS) must be(4 seconds) Duration(WatchFailureDetectorConfig.getMilliseconds("min-std-deviation"), MILLISECONDS) must be(100 millis) + remoteSettings.config.getString("akka.remote.log-frame-size-exceeding") must be("off") } "be able to parse AkkaProtocol related config elements" in { @@ -77,6 +78,12 @@ class RemoteConfigSpec extends AkkaSpec( } + "contain correct netty.tcp values in reference.conf" in { + val c = RARP(system).provider.remoteSettings.config.getConfig("akka.remote.netty.tcp") + + c.getBytes("maximum-frame-size") must be(128000) + } + "contain correct socket worker pool configuration values in reference.conf" in { val c = RARP(system).provider.remoteSettings.config.getConfig("akka.remote.netty.tcp")