diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index a89508982c..de143ddbb5 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -105,9 +105,6 @@ akka { # Increase this if you want to be able to send messages with large payloads message-frame-size = 1 MiB - # Timeout duration - connection-timeout = 120s - # Should the remote server require that it peers share the same secure-cookie # (defined in the 'remote' section)? require-cookie = off @@ -133,11 +130,20 @@ akka { } client { + # Time before an attempted connection is considered failed + connection-timeout = 10s + + #Time between each reconnection attempt reconnect-delay = 5s - read-timeout = 3600s - message-frame-size = 1 MiB + # Maximum time window that a client should try to reconnect for reconnection-time-window = 600s + + #Period of time of connection inactivity to be tolerated before hanging up + read-timeout = 3600s + + #Max size per message + message-frame-size = 1 MiB } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 94f5c87e93..59e4de5702 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -39,6 +39,7 @@ class RemoteSettings(val config: Config, val systemName: String) { case cookie ⇒ Some(cookie) } + val ConnectionTimeout = Duration(getMilliseconds("akka.remote.client.connection-timeout"), MILLISECONDS) val ReconnectionTimeWindow = Duration(getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS) val ReadTimeout = Duration(getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS) val ReconnectDelay = Duration(getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS) @@ -67,13 +68,12 @@ class RemoteSettings(val config: Config, val systemName: String) { case value ⇒ value } val Port = getInt("akka.remote.server.port") match { - case 0 => try { + case 0 ⇒ try { val s = new java.net.ServerSocket(0) try s.getLocalPort finally s.close() - } catch { case e => throw new ConfigurationException("Unable to obtain random port", e) } - case other => other + } catch { case e ⇒ throw new ConfigurationException("Unable to obtain random port", e) } + case other ⇒ other } - val ConnectionTimeout = Duration(getMilliseconds("akka.remote.server.connection-timeout"), MILLISECONDS) val Backlog = getInt("akka.remote.server.backlog") diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index f7d6b1d8b3..a8d25884b8 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -184,6 +184,7 @@ class ActiveRemoteClient private[akka] ( bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, executionHandler, remoteAddress, this)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) + bootstrap.setOption("connectTimeoutMillis", ConnectionTimeout.toMillis) log.debug("Starting remote client connection to [{}]", remoteAddress) @@ -548,20 +549,21 @@ class NettyRemoteServer( Executors.newCachedThreadPool(remoteSupport.threadFactory), Executors.newCachedThreadPool(remoteSupport.threadFactory)) - private val bootstrap = new ServerBootstrap(factory) - private val executionHandler = new ExecutionHandler(remoteSupport.executor) // group of open channels, used for clean-up private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server") val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, executionHandler, loader, remoteSupport) - bootstrap.setPipelineFactory(pipelineFactory) - bootstrap.setOption("backlog", Backlog) - bootstrap.setOption("child.tcpNoDelay", true) - bootstrap.setOption("child.keepAlive", true) - bootstrap.setOption("child.reuseAddress", true) - bootstrap.setOption("child.connectTimeoutMillis", ConnectionTimeout.toMillis) + private val bootstrap: ServerBootstrap = { + val b = new ServerBootstrap(factory) + b.setPipelineFactory(pipelineFactory) + b.setOption("backlog", Backlog) + b.setOption("child.tcpNoDelay", true) + b.setOption("child.keepAlive", true) + b.setOption("child.reuseAddress", true) + b + } openChannels.add(bootstrap.bind(new InetSocketAddress(address.transport.ip.get, address.transport.port))) remoteSupport.notifyListeners(RemoteServerStarted(remoteSupport)) diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 49502b99ea..9f498e6273 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -22,7 +22,7 @@ class RemoteConfigSpec extends AkkaSpec("") { //akka.remote.server getInt("akka.remote.server.port") must equal(2552) getBytes("akka.remote.server.message-frame-size") must equal(1048576L) - getMilliseconds("akka.remote.server.connection-timeout") must equal(120 * 1000) + getBoolean("akka.remote.server.require-cookie") must equal(false) getBoolean("akka.remote.server.untrusted-mode") must equal(false) getInt("akka.remote.server.backlog") must equal(4096) @@ -38,24 +38,11 @@ class RemoteConfigSpec extends AkkaSpec("") { getMilliseconds("akka.remote.client.reconnect-delay") must equal(5 * 1000) getMilliseconds("akka.remote.client.read-timeout") must equal(3600 * 1000) getMilliseconds("akka.remote.client.reconnection-time-window") must equal(600 * 1000) + getMilliseconds("akka.remote.client.connection-timeout") must equal(10000) // TODO cluster config will go into akka-cluster/reference.conf when we enable that module //akka.cluster getStringList("akka.cluster.seed-nodes") must equal(new java.util.ArrayList[String]) - - // getMilliseconds("akka.cluster.max-time-to-wait-until-connected") must equal(30 * 1000) - // getMilliseconds("akka.cluster.session-timeout") must equal(60 * 1000) - // getMilliseconds("akka.cluster.connection-timeout") must equal(60 * 1000) - // getBoolean("akka.cluster.include-ref-node-in-replica-set") must equal(true) - // getString("akka.cluster.log-directory") must equal("_akka_cluster") - - // //akka.cluster.replication - // getString("akka.cluster.replication.digest-type") must equal("MAC") - // getString("akka.cluster.replication.password") must equal("secret") - // getInt("akka.cluster.replication.ensemble-size") must equal(3) - // getInt("akka.cluster.replication.quorum-size") must equal(2) - // getInt("akka.cluster.replication.snapshot-frequency") must equal(1000) - // getMilliseconds("akka.cluster.replication.timeout") must equal(30 * 1000) } } }