Adding config options, tests and conditional code for remote message logging
This commit is contained in:
parent
1333700c0d
commit
7e0cf29e8f
5 changed files with 45 additions and 31 deletions
|
|
@ -59,6 +59,12 @@ akka {
|
|||
# Reuse inbound connections for outbound messages
|
||||
use-passive-connections = on
|
||||
|
||||
# If this is "on", Akka will log all inbound messages at DEBUG level, if off then they are not logged
|
||||
log-received-messages = off
|
||||
|
||||
# If this is "on", Akka will log all outbound messages at DEBUG level, if off then they are not logged
|
||||
log-sent-messages = off
|
||||
|
||||
# accrual failure detection config
|
||||
failure-detector {
|
||||
|
||||
|
|
|
|||
|
|
@ -239,7 +239,8 @@ trait RemoteMarshallingOps {
|
|||
}
|
||||
|
||||
def receiveMessage(remoteMessage: RemoteMessage) {
|
||||
log.debug("received message {}", remoteMessage)
|
||||
if (remote.remoteSettings.LogReceivedMessages)
|
||||
log.debug("received message {}", remoteMessage)
|
||||
|
||||
val remoteDaemon = remote.remoteDaemon
|
||||
|
||||
|
|
|
|||
|
|
@ -15,22 +15,24 @@ class RemoteSettings(val config: Config, val systemName: String) {
|
|||
|
||||
import config._
|
||||
|
||||
val RemoteTransport = getString("akka.remote.transport")
|
||||
val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold")
|
||||
val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size")
|
||||
val ShouldCompressData = getBoolean("akka.remote.use-compression")
|
||||
val RemoteSystemDaemonAckTimeout = Duration(getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS)
|
||||
val InitalDelayForGossip = Duration(getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS)
|
||||
val GossipFrequency = Duration(getMilliseconds("akka.remote.gossip.frequency"), MILLISECONDS)
|
||||
val BackoffTimeout = Duration(getMilliseconds("akka.remote.backoff-timeout"), MILLISECONDS)
|
||||
final val RemoteTransport = getString("akka.remote.transport")
|
||||
final val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold")
|
||||
final val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size")
|
||||
final val ShouldCompressData = getBoolean("akka.remote.use-compression")
|
||||
final val RemoteSystemDaemonAckTimeout = Duration(getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS)
|
||||
final val InitalDelayForGossip = Duration(getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS)
|
||||
final val GossipFrequency = Duration(getMilliseconds("akka.remote.gossip.frequency"), MILLISECONDS)
|
||||
final val BackoffTimeout = Duration(getMilliseconds("akka.remote.backoff-timeout"), MILLISECONDS)
|
||||
final val LogReceivedMessages = getBoolean("akka.remote.log-received-messages")
|
||||
final val LogSentMessages = getBoolean("akka.remote.log-sent-messages")
|
||||
|
||||
// TODO cluster config will go into akka-cluster/reference.conf when we enable that module
|
||||
val SeedNodes = Set.empty[RemoteNettyAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.collect {
|
||||
final val SeedNodes = Set.empty[RemoteNettyAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.collect {
|
||||
case RemoteAddressExtractor(addr) ⇒ addr.transport
|
||||
}
|
||||
|
||||
val serverSettings = new RemoteServerSettings
|
||||
val clientSettings = new RemoteClientSettings
|
||||
final val serverSettings = new RemoteServerSettings
|
||||
final val clientSettings = new RemoteClientSettings
|
||||
|
||||
class RemoteClientSettings {
|
||||
val SecureCookie: Option[String] = getString("akka.remote.secure-cookie") match {
|
||||
|
|
@ -38,35 +40,35 @@ class RemoteSettings(val config: Config, val systemName: String) {
|
|||
case cookie ⇒ Some(cookie)
|
||||
}
|
||||
|
||||
val ConnectionTimeout = Duration(getMilliseconds("akka.remote.client.connection-timeout"), MILLISECONDS)
|
||||
val ReconnectionTimeWindow = Duration(getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS)
|
||||
val ReadTimeout = Duration(getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS)
|
||||
val ReconnectDelay = Duration(getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS)
|
||||
val MessageFrameSize = getBytes("akka.remote.client.message-frame-size").toInt
|
||||
final val ConnectionTimeout = Duration(getMilliseconds("akka.remote.client.connection-timeout"), MILLISECONDS)
|
||||
final val ReconnectionTimeWindow = Duration(getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS)
|
||||
final val ReadTimeout = Duration(getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS)
|
||||
final val ReconnectDelay = Duration(getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS)
|
||||
final val MessageFrameSize = getBytes("akka.remote.client.message-frame-size").toInt
|
||||
}
|
||||
|
||||
class RemoteServerSettings {
|
||||
import scala.collection.JavaConverters._
|
||||
val MessageFrameSize = getBytes("akka.remote.server.message-frame-size").toInt
|
||||
val SecureCookie: Option[String] = getString("akka.remote.secure-cookie") match {
|
||||
final val MessageFrameSize = getBytes("akka.remote.server.message-frame-size").toInt
|
||||
final val SecureCookie: Option[String] = getString("akka.remote.secure-cookie") match {
|
||||
case "" ⇒ None
|
||||
case cookie ⇒ Some(cookie)
|
||||
}
|
||||
val RequireCookie = {
|
||||
final val RequireCookie = {
|
||||
val requireCookie = getBoolean("akka.remote.server.require-cookie")
|
||||
if (requireCookie && SecureCookie.isEmpty) throw new ConfigurationException(
|
||||
"Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.")
|
||||
requireCookie
|
||||
}
|
||||
|
||||
val UsePassiveConnections = getBoolean("akka.remote.use-passive-connections")
|
||||
final val UsePassiveConnections = getBoolean("akka.remote.use-passive-connections")
|
||||
|
||||
val UntrustedMode = getBoolean("akka.remote.server.untrusted-mode")
|
||||
val Hostname = getString("akka.remote.server.hostname") match {
|
||||
final val UntrustedMode = getBoolean("akka.remote.server.untrusted-mode")
|
||||
final val Hostname = getString("akka.remote.server.hostname") match {
|
||||
case "" ⇒ InetAddress.getLocalHost.getHostAddress
|
||||
case value ⇒ value
|
||||
}
|
||||
val Port = getInt("akka.remote.server.port") match {
|
||||
final val Port = getInt("akka.remote.server.port") match {
|
||||
case 0 ⇒ try {
|
||||
val s = new java.net.ServerSocket(0)
|
||||
try s.getLocalPort finally s.close()
|
||||
|
|
@ -74,26 +76,26 @@ class RemoteSettings(val config: Config, val systemName: String) {
|
|||
case other ⇒ other
|
||||
}
|
||||
|
||||
val Backlog = getInt("akka.remote.server.backlog")
|
||||
final val Backlog = getInt("akka.remote.server.backlog")
|
||||
|
||||
val ExecutionPoolKeepAlive = Duration(getMilliseconds("akka.remote.server.execution-pool-keepalive"), MILLISECONDS)
|
||||
final val ExecutionPoolKeepAlive = Duration(getMilliseconds("akka.remote.server.execution-pool-keepalive"), MILLISECONDS)
|
||||
|
||||
val ExecutionPoolSize = getInt("akka.remote.server.execution-pool-size") match {
|
||||
final val ExecutionPoolSize = getInt("akka.remote.server.execution-pool-size") match {
|
||||
case sz if sz < 1 ⇒ throw new IllegalArgumentException("akka.remote.server.execution-pool-size is less than 1")
|
||||
case sz ⇒ sz
|
||||
}
|
||||
|
||||
val MaxChannelMemorySize = getBytes("akka.remote.server.max-channel-memory-size") match {
|
||||
final val MaxChannelMemorySize = getBytes("akka.remote.server.max-channel-memory-size") match {
|
||||
case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0 bytes")
|
||||
case sz ⇒ sz
|
||||
}
|
||||
|
||||
val MaxTotalMemorySize = getBytes("akka.remote.server.max-total-memory-size") match {
|
||||
final val MaxTotalMemorySize = getBytes("akka.remote.server.max-total-memory-size") match {
|
||||
case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0 bytes")
|
||||
case sz ⇒ sz
|
||||
}
|
||||
|
||||
// TODO handle the system name right and move this to config file syntax
|
||||
val URI = "akka://sys@" + Hostname + ":" + Port
|
||||
final val URI = "akka://sys@" + Hostname + ":" + Port
|
||||
}
|
||||
}
|
||||
|
|
@ -60,7 +60,9 @@ abstract class RemoteClient private[akka] (
|
|||
* Converts the message to the wireprotocol and sends the message across the wire
|
||||
*/
|
||||
def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) {
|
||||
log.debug("Sending message {} from {} to {}", message, senderOption, recipient)
|
||||
if (remoteSupport.remote.remoteSettings.LogSentMessages)
|
||||
log.debug("Sending message {} from {} to {}", message, senderOption, recipient)
|
||||
|
||||
send((message, senderOption, recipient))
|
||||
} else {
|
||||
val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress)
|
||||
|
|
|
|||
|
|
@ -26,6 +26,9 @@ class RemoteConfigSpec extends AkkaSpec("") {
|
|||
getBoolean("akka.remote.server.untrusted-mode") must equal(false)
|
||||
getInt("akka.remote.server.backlog") must equal(4096)
|
||||
|
||||
getBoolean("akka.remote.log-received-messages") must equal(false)
|
||||
getBoolean("akka.remote.log-sent-messages") must equal(false)
|
||||
|
||||
getMilliseconds("akka.remote.server.execution-pool-keepalive") must equal(60 * 1000)
|
||||
|
||||
getInt("akka.remote.server.execution-pool-size") must equal(4)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue