Logging of the size of different message types.

* Maximum detected size per message type is logged once
* Default configuration disables this feature
This commit is contained in:
Patrik Nordwall 2013-04-28 08:32:58 +02:00
parent 8112421ec6
commit a9cde60bee
6 changed files with 143 additions and 1 deletions

View file

@ -157,6 +157,23 @@ If you want to see all messages that are received through remoting at DEBUG log
} }
} }
If you want to see message types with payload size in bytes larger than
a specified limit at INFO log level:
.. code-block:: ruby
akka {
remote {
# Logging of message types with payload size in bytes larger than
# this value. Maximum detected size per message type is logged once,
# with an increase threshold of 10%.
# By default this feature is turned off. Activate it by setting the property to
# a value in bytes, such as 1000b. Note that for all messages larger than this
# limit there will be extra performance and scalability cost.
log-frame-size-exceeding = 1000b
}
}
Also see the logging options for TestKit: :ref:`actor.logging-java`. Also see the logging options for TestKit: :ref:`actor.logging-java`.
Turn Off Logging Turn Off Logging

View file

@ -173,6 +173,23 @@ If you want to see all messages that are received through remoting at DEBUG log
} }
} }
If you want to see message types with payload size in bytes larger than
a specified limit at INFO log level:
.. code-block:: ruby
akka {
remote {
# Logging of message types with payload size in bytes larger than
# this value. Maximum detected size per message type is logged once,
# with an increase threshold of 10%.
# By default this feature is turned off. Activate it by setting the property to
# a value in bytes, such as 1000b. Note that for all messages larger than this
# limit there will be extra performance and scalability cost.
log-frame-size-exceeding = 1000b
}
}
Also see the logging options for TestKit: :ref:`actor.logging-scala`. Also see the logging options for TestKit: :ref:`actor.logging-scala`.
Translating Log Source to String and Class Translating Log Source to String and Class

View file

@ -124,6 +124,14 @@ akka {
# received messages also fall under this flag. # received messages also fall under this flag.
log-remote-lifecycle-events = on log-remote-lifecycle-events = on
# Logging of message types with payload size in bytes larger than
# this value. Maximum detected size per message type is logged once,
# with an increase threshold of 10%.
# By default this feature is turned off. Activate it by setting the property to
# a value in bytes, such as 1000b. Note that for all messages larger than this
# limit there will be extra performance and scalability cost.
log-frame-size-exceeding = off
### Failure detection and recovery ### Failure detection and recovery
# Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf # Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf

View file

@ -398,6 +398,7 @@ private[remote] class EndpointWriter(
import context.dispatcher import context.dispatcher
val extendedSystem: ExtendedActorSystem = context.system.asInstanceOf[ExtendedActorSystem] val extendedSystem: ExtendedActorSystem = context.system.asInstanceOf[ExtendedActorSystem]
val remoteMetrics = RemoteMetricsExtension(context.system)
var reader: Option[ActorRef] = None var reader: Option[ActorRef] = None
var handle: Option[AkkaProtocolHandle] = handleOrActive // FIXME: refactor into state data var handle: Option[AkkaProtocolHandle] = handleOrActive // FIXME: refactor into state data
@ -495,7 +496,10 @@ private[remote] class EndpointWriter(
seqOption = seqOption, seqOption = seqOption,
ackOption = lastAck) ackOption = lastAck)
if (pdu.size > transport.maximumPayloadBytes) { val pduSize = pdu.size
remoteMetrics.logPayloadBytes(msg, pduSize)
if (pduSize > transport.maximumPayloadBytes) {
publishAndStay(new OversizedPayloadException(s"Discarding oversized payload sent to ${recipient}: max allowed size ${transport.maximumPayloadBytes} bytes, actual size of encoded ${msg.getClass} was ${pdu.size} bytes.")) publishAndStay(new OversizedPayloadException(s"Discarding oversized payload sent to ${recipient}: max allowed size ${transport.maximumPayloadBytes} bytes, actual size of encoded ${msg.getClass} was ${pdu.size} bytes."))
} else if (h.write(pdu)) { } else if (h.write(pdu)) {
stay() stay()

View file

@ -0,0 +1,89 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import java.util.concurrent.ConcurrentHashMap
import scala.annotation.tailrec
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.SelectChildName
import akka.event.Logging
import akka.routing.RouterEnvelope
/**
* INTERNAL API
* Extension that keeps track of remote metrics, such
* as max size of different message types.
*/
private[akka] object RemoteMetricsExtension extends ExtensionId[RemoteMetrics] with ExtensionIdProvider {
override def get(system: ActorSystem): RemoteMetrics = super.get(system)
override def lookup = RemoteMetricsExtension
override def createExtension(system: ExtendedActorSystem): RemoteMetrics =
if (system.settings.config.getString("akka.remote.log-frame-size-exceeding").toLowerCase == "off")
new RemoteMetricsOff
else
new RemoteMetricsOn(system)
}
/**
* INTERNAL API
*/
private[akka] trait RemoteMetrics extends Extension {
/**
* Logging of the size of different message types.
* Maximum detected size per message type is logged once, with
* and increase threshold of 10%.
*/
def logPayloadBytes(msg: Any, payloadBytes: Int): Unit
}
/**
* INTERNAL API
*/
private[akka] class RemoteMetricsOff extends RemoteMetrics {
override def logPayloadBytes(msg: Any, payloadBytes: Int): Unit = ()
}
/**
* INTERNAL API
*/
private[akka] class RemoteMetricsOn(system: ExtendedActorSystem) extends RemoteMetrics {
private val logFrameSizeExceeding: Int = system.settings.config.getBytes(
"akka.remote.log-frame-size-exceeding").toInt
private val log = Logging(system, this.getClass)
private val maxPayloadBytes: ConcurrentHashMap[Class[_], Integer] = new ConcurrentHashMap
override def logPayloadBytes(msg: Any, payloadBytes: Int): Unit =
if (payloadBytes >= logFrameSizeExceeding) {
val clazz = msg match {
case x: SelectChildName x.wrappedMessage.getClass
case x: RouterEnvelope x.message.getClass
case _ msg.getClass
}
// 10% threshold until next log
def newMax = (payloadBytes * 1.1).toInt
@tailrec def check(): Unit = {
val max = maxPayloadBytes.get(clazz)
if (max eq null) {
if (maxPayloadBytes.putIfAbsent(clazz, newMax) eq null)
log.info("Payload size for [{}] is [{}] bytes", clazz.getName, payloadBytes)
else check()
} else if (payloadBytes > max) {
if (maxPayloadBytes.replace(clazz, max, newMax))
log.info("New maximum payload size for [{}] is [{}] bytes", clazz.getName, payloadBytes)
else check()
}
}
check()
}
}

View file

@ -59,6 +59,7 @@ class RemoteConfigSpec extends AkkaSpec(
Duration(WatchFailureDetectorConfig.getMilliseconds("acceptable-heartbeat-pause"), MILLISECONDS) must be(4 seconds) Duration(WatchFailureDetectorConfig.getMilliseconds("acceptable-heartbeat-pause"), MILLISECONDS) must be(4 seconds)
Duration(WatchFailureDetectorConfig.getMilliseconds("min-std-deviation"), MILLISECONDS) must be(100 millis) Duration(WatchFailureDetectorConfig.getMilliseconds("min-std-deviation"), MILLISECONDS) must be(100 millis)
remoteSettings.config.getString("akka.remote.log-frame-size-exceeding") must be("off")
} }
"be able to parse AkkaProtocol related config elements" in { "be able to parse AkkaProtocol related config elements" in {
@ -77,6 +78,12 @@ class RemoteConfigSpec extends AkkaSpec(
} }
"contain correct netty.tcp values in reference.conf" in {
val c = RARP(system).provider.remoteSettings.config.getConfig("akka.remote.netty.tcp")
c.getBytes("maximum-frame-size") must be(128000)
}
"contain correct socket worker pool configuration values in reference.conf" in { "contain correct socket worker pool configuration values in reference.conf" in {
val c = RARP(system).provider.remoteSettings.config.getConfig("akka.remote.netty.tcp") val c = RARP(system).provider.remoteSettings.config.getConfig("akka.remote.netty.tcp")