diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index dee44bd86c..4db13bb264 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -59,6 +59,12 @@ akka { # Reuse inbound connections for outbound messages use-passive-connections = on + # If this is "on", Akka will log all inbound messages at DEBUG level, if off then they are not logged + log-received-messages = off + + # If this is "on", Akka will log all outbound messages at DEBUG level, if off then they are not logged + log-sent-messages = off + # accrual failure detection config failure-detector { diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index c5535833aa..04b1bf83a4 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -239,7 +239,8 @@ trait RemoteMarshallingOps { } def receiveMessage(remoteMessage: RemoteMessage) { - log.debug("received message {}", remoteMessage) + if (remote.remoteSettings.LogReceivedMessages) + log.debug("received message {}", remoteMessage) val remoteDaemon = remote.remoteDaemon diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index b861026e9d..8a9a55a679 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -15,22 +15,24 @@ class RemoteSettings(val config: Config, val systemName: String) { import config._ - val RemoteTransport = getString("akka.remote.transport") - val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold") - val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size") - val ShouldCompressData = getBoolean("akka.remote.use-compression") - val RemoteSystemDaemonAckTimeout = Duration(getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS) - val InitalDelayForGossip = Duration(getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS) - val GossipFrequency = Duration(getMilliseconds("akka.remote.gossip.frequency"), MILLISECONDS) - val BackoffTimeout = Duration(getMilliseconds("akka.remote.backoff-timeout"), MILLISECONDS) + final val RemoteTransport = getString("akka.remote.transport") + final val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold") + final val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size") + final val ShouldCompressData = getBoolean("akka.remote.use-compression") + final val RemoteSystemDaemonAckTimeout = Duration(getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS) + final val InitalDelayForGossip = Duration(getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS) + final val GossipFrequency = Duration(getMilliseconds("akka.remote.gossip.frequency"), MILLISECONDS) + final val BackoffTimeout = Duration(getMilliseconds("akka.remote.backoff-timeout"), MILLISECONDS) + final val LogReceivedMessages = getBoolean("akka.remote.log-received-messages") + final val LogSentMessages = getBoolean("akka.remote.log-sent-messages") // TODO cluster config will go into akka-cluster/reference.conf when we enable that module - val SeedNodes = Set.empty[RemoteNettyAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.collect { + final val SeedNodes = Set.empty[RemoteNettyAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.collect { case RemoteAddressExtractor(addr) ⇒ addr.transport } - val serverSettings = new RemoteServerSettings - val clientSettings = new RemoteClientSettings + final val serverSettings = new RemoteServerSettings + final val clientSettings = new RemoteClientSettings class RemoteClientSettings { val SecureCookie: Option[String] = getString("akka.remote.secure-cookie") match { @@ -38,35 +40,35 @@ class RemoteSettings(val config: Config, val systemName: String) { case cookie ⇒ Some(cookie) } - val ConnectionTimeout = Duration(getMilliseconds("akka.remote.client.connection-timeout"), MILLISECONDS) - val ReconnectionTimeWindow = Duration(getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS) - val ReadTimeout = Duration(getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS) - val ReconnectDelay = Duration(getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS) - val MessageFrameSize = getBytes("akka.remote.client.message-frame-size").toInt + final val ConnectionTimeout = Duration(getMilliseconds("akka.remote.client.connection-timeout"), MILLISECONDS) + final val ReconnectionTimeWindow = Duration(getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS) + final val ReadTimeout = Duration(getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS) + final val ReconnectDelay = Duration(getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS) + final val MessageFrameSize = getBytes("akka.remote.client.message-frame-size").toInt } class RemoteServerSettings { import scala.collection.JavaConverters._ - val MessageFrameSize = getBytes("akka.remote.server.message-frame-size").toInt - val SecureCookie: Option[String] = getString("akka.remote.secure-cookie") match { + final val MessageFrameSize = getBytes("akka.remote.server.message-frame-size").toInt + final val SecureCookie: Option[String] = getString("akka.remote.secure-cookie") match { case "" ⇒ None case cookie ⇒ Some(cookie) } - val RequireCookie = { + final val RequireCookie = { val requireCookie = getBoolean("akka.remote.server.require-cookie") if (requireCookie && SecureCookie.isEmpty) throw new ConfigurationException( "Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.") requireCookie } - val UsePassiveConnections = getBoolean("akka.remote.use-passive-connections") + final val UsePassiveConnections = getBoolean("akka.remote.use-passive-connections") - val UntrustedMode = getBoolean("akka.remote.server.untrusted-mode") - val Hostname = getString("akka.remote.server.hostname") match { + final val UntrustedMode = getBoolean("akka.remote.server.untrusted-mode") + final val Hostname = getString("akka.remote.server.hostname") match { case "" ⇒ InetAddress.getLocalHost.getHostAddress case value ⇒ value } - val Port = getInt("akka.remote.server.port") match { + final val Port = getInt("akka.remote.server.port") match { case 0 ⇒ try { val s = new java.net.ServerSocket(0) try s.getLocalPort finally s.close() @@ -74,26 +76,26 @@ class RemoteSettings(val config: Config, val systemName: String) { case other ⇒ other } - val Backlog = getInt("akka.remote.server.backlog") + final val Backlog = getInt("akka.remote.server.backlog") - val ExecutionPoolKeepAlive = Duration(getMilliseconds("akka.remote.server.execution-pool-keepalive"), MILLISECONDS) + final val ExecutionPoolKeepAlive = Duration(getMilliseconds("akka.remote.server.execution-pool-keepalive"), MILLISECONDS) - val ExecutionPoolSize = getInt("akka.remote.server.execution-pool-size") match { + final val ExecutionPoolSize = getInt("akka.remote.server.execution-pool-size") match { case sz if sz < 1 ⇒ throw new IllegalArgumentException("akka.remote.server.execution-pool-size is less than 1") case sz ⇒ sz } - val MaxChannelMemorySize = getBytes("akka.remote.server.max-channel-memory-size") match { + final val MaxChannelMemorySize = getBytes("akka.remote.server.max-channel-memory-size") match { case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0 bytes") case sz ⇒ sz } - val MaxTotalMemorySize = getBytes("akka.remote.server.max-total-memory-size") match { + final val MaxTotalMemorySize = getBytes("akka.remote.server.max-total-memory-size") match { case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0 bytes") case sz ⇒ sz } // TODO handle the system name right and move this to config file syntax - val URI = "akka://sys@" + Hostname + ":" + Port + final val URI = "akka://sys@" + Hostname + ":" + Port } } \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 0034bf25f9..b0f24592de 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -60,7 +60,9 @@ abstract class RemoteClient private[akka] ( * Converts the message to the wireprotocol and sends the message across the wire */ def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) { - log.debug("Sending message {} from {} to {}", message, senderOption, recipient) + if (remoteSupport.remote.remoteSettings.LogSentMessages) + log.debug("Sending message {} from {} to {}", message, senderOption, recipient) + send((message, senderOption, recipient)) } else { val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress) diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 13adbf31e8..32db14194a 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -26,6 +26,9 @@ class RemoteConfigSpec extends AkkaSpec("") { getBoolean("akka.remote.server.untrusted-mode") must equal(false) getInt("akka.remote.server.backlog") must equal(4096) + getBoolean("akka.remote.log-received-messages") must equal(false) + getBoolean("akka.remote.log-sent-messages") must equal(false) + getMilliseconds("akka.remote.server.execution-pool-keepalive") must equal(60 * 1000) getInt("akka.remote.server.execution-pool-size") must equal(4)