From 8dfe619140387cbf543ccaf1131eaf260e955041 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 20 Jan 2012 12:30:19 +0100 Subject: [PATCH 1/3] #1703 & #1693 - moving daemonicity to one place, and in doing so creating a thread factory in ActorSystem --- .../test/scala/akka/config/ConfigSpec.scala | 5 ++- .../test/scala/akka/routing/RoutingSpec.scala | 5 +-- akka-actor/src/main/resources/reference.conf | 7 ++-- .../main/scala/akka/actor/ActorSystem.scala | 7 ++-- .../akka/dispatch/AbstractDispatcher.scala | 2 +- .../main/scala/akka/dispatch/Dispatcher.scala | 9 ++++- .../scala/akka/dispatch/Dispatchers.scala | 5 +-- .../akka/dispatch/ThreadPoolBuilder.scala | 33 +++++++++++++------ .../akka/docs/actor/TypedActorDocSpec.scala | 2 +- .../docs/dispatcher/DispatcherDocSpec.scala | 2 -- akka-remote/src/main/resources/reference.conf | 5 --- .../scala/akka/remote/RemoteSettings.scala | 1 - .../remote/netty/NettyRemoteSupport.scala | 21 ++++++------ .../scala/akka/remote/RemoteConfigSpec.scala | 1 - 14 files changed, 57 insertions(+), 48 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 521e8d4d4a..67c7a51b60 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -23,6 +23,8 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) { getString("akka.version") must equal("2.0-SNAPSHOT") settings.ConfigVersion must equal("2.0-SNAPSHOT") + getBoolean("akka.daemonic") must equal(false) + getString("akka.actor.default-dispatcher.type") must equal("Dispatcher") getMilliseconds("akka.actor.default-dispatcher.keep-alive-time") must equal(60 * 1000) getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(3.0) @@ -45,9 +47,6 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) { getMilliseconds("akka.scheduler.tickDuration") must equal(100) settings.SchedulerTickDuration must equal(100 millis) - - getBoolean("akka.scheduler.daemonic") must equal(true) - settings.SchedulerDaemonicity must equal(true) } } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index bcf3e6328e..9529854314 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -31,10 +31,7 @@ object RoutingSpec { """ class TestActor extends Actor { - def receive = { - case _ ⇒ - println("Hello") - } + def receive = { case _ ⇒ } } class Echo extends Actor { diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index e90e4f41bf..999c4286c2 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -33,6 +33,9 @@ akka { # See the Akka Documentation for more info about Extensions extensions = [] + # Toggles whether the threads created by this ActorSystem should be daemons or not + daemonic = off + actor { provider = "akka.actor.LocalActorRefProvider" @@ -155,9 +158,6 @@ akka { # parameters type = "Dispatcher" - # Toggles whether the threads created by this dispatcher should be daemons or not - daemonic = off - # Keep alive time for threads keep-alive-time = 60s @@ -271,6 +271,5 @@ akka { # For more information see: http://www.jboss.org/netty/ tickDuration = 100ms ticksPerWheel = 512 - daemonic = on } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 0a646709bc..9fc3946808 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -96,7 +96,7 @@ object ActorSystem { final val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS) final val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel") - final val SchedulerDaemonicity = getBoolean("akka.scheduler.daemonic") + final val Daemonicity = getBoolean("akka.daemonic") if (ConfigVersion != Version) throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]") @@ -275,6 +275,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor import ActorSystem._ final val settings = new Settings(applicationConfig, name) + final val threadFactory = new MonitorableThreadFactory(name, settings.Daemonicity) def logConfiguration(): Unit = log.info(settings.toString) @@ -361,7 +362,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor } } - val dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler)) + val dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(threadFactory, eventStream, deadLetterMailbox, scheduler)) val dispatcher = dispatchers.defaultGlobalDispatcher def terminationFuture: Future[Unit] = provider.terminationFuture @@ -410,7 +411,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor */ protected def createScheduler(): Scheduler = { val hwt = new HashedWheelTimer(log, - new MonitorableThreadFactory("DefaultScheduler", settings.SchedulerDaemonicity), + threadFactory.copy(threadFactory.name + "-scheduler"), settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel) // note that dispatcher is by-name parameter in DefaultScheduler constructor, diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index ff209c8c00..12c8c89d25 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -342,7 +342,7 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit //Apply the following options to the config if they are present in the config - ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig(daemonic = config getBoolean "daemonic")) + ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig()) .setKeepAliveTime(Duration(config getMilliseconds "keep-alive-time", TimeUnit.MILLISECONDS)) .setAllowCoreThreadTimeout(config getBoolean "allow-core-timeout") .setCorePoolSizeFromFactor(config getInt "core-pool-size-min", config getDouble "core-pool-size-factor", config getInt "core-pool-size-max") diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 0e72f01681..906c160dce 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -32,7 +32,14 @@ class Dispatcher( val shutdownTimeout: Duration) extends MessageDispatcher(_prerequisites) { - protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(id) + protected[akka] val executorServiceFactory: ExecutorServiceFactory = + executorServiceFactoryProvider.createExecutorServiceFactory( + id, + prerequisites.threadFactory match { + case m: MonitorableThreadFactory ⇒ m.copy(m.name + "-" + id) + case other ⇒ other + }) + protected[akka] val executorService = new AtomicReference[ExecutorService](new ExecutorServiceDelegate { lazy val executor = executorServiceFactory.createExecutorService }) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 31258c540e..b622c52c74 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -4,8 +4,6 @@ package akka.dispatch -import java.util.concurrent.TimeUnit -import java.util.concurrent.ConcurrentHashMap import akka.actor.newUuid import akka.util.{ Duration, ReflectiveAccess } import akka.actor.ActorSystem @@ -17,14 +15,17 @@ import com.typesafe.config.ConfigFactory import akka.config.ConfigurationException import akka.event.Logging.Warning import akka.actor.Props +import java.util.concurrent.{ ThreadFactory, TimeUnit, ConcurrentHashMap } trait DispatcherPrerequisites { + def threadFactory: ThreadFactory def eventStream: EventStream def deadLetterMailbox: Mailbox def scheduler: Scheduler } case class DefaultDispatcherPrerequisites( + val threadFactory: ThreadFactory, val eventStream: EventStream, val deadLetterMailbox: Mailbox, val scheduler: Scheduler) extends DispatcherPrerequisites diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index e073e18b66..9601c4dd5e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -16,6 +16,7 @@ object ThreadPoolConfig { val defaultCorePoolSize: Int = 16 val defaultMaxPoolSize: Int = 128 val defaultTimeout: Duration = Duration(60000L, TimeUnit.MILLISECONDS) + val defaultRejectionPolicy: RejectedExecutionHandler = new SaneRejectedExecutionHandler() def scaledPoolSize(floor: Int, multiplier: Double, ceiling: Int): Int = { import scala.math.{ min, max } @@ -54,7 +55,7 @@ trait ExecutorServiceFactory { * Generic way to specify an ExecutorService to a Dispatcher, create it with the given name if desired */ trait ExecutorServiceFactoryProvider { - def createExecutorServiceFactory(name: String): ExecutorServiceFactory + def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory } /** @@ -65,16 +66,24 @@ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.def maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize, threadTimeout: Duration = ThreadPoolConfig.defaultTimeout, queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue(), - daemonic: Boolean = false) + rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy) extends ExecutorServiceFactoryProvider { class ThreadPoolExecutorServiceFactory(val threadFactory: ThreadFactory) extends ExecutorServiceFactory { def createExecutorService: ExecutorService = { - val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, new SaneRejectedExecutionHandler) + val service = new ThreadPoolExecutor( + corePoolSize, + maxPoolSize, + threadTimeout.length, + threadTimeout.unit, + queueFactory(), + threadFactory, + rejectionPolicy) service.allowCoreThreadTimeOut(allowCorePoolTimeout) service } } - final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ThreadPoolExecutorServiceFactory(new MonitorableThreadFactory(name, daemonic)) + final def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory = + new ThreadPoolExecutorServiceFactory(threadFactory) } trait DispatcherBuilder { @@ -143,16 +152,20 @@ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfi def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)((c, f) ⇒ f.map(_(c)).getOrElse(c)) } -class MonitorableThreadFactory(val name: String, val daemonic: Boolean = false) extends ThreadFactory { +object MonitorableThreadFactory { + val doNothing: Thread.UncaughtExceptionHandler = + new Thread.UncaughtExceptionHandler() { def uncaughtException(thread: Thread, cause: Throwable) = () } +} + +case class MonitorableThreadFactory(name: String, + daemonic: Boolean, + exceptionHandler: Thread.UncaughtExceptionHandler = MonitorableThreadFactory.doNothing) + extends ThreadFactory { protected val counter = new AtomicLong - protected val doNothing: Thread.UncaughtExceptionHandler = - new Thread.UncaughtExceptionHandler() { - def uncaughtException(thread: Thread, cause: Throwable) = {} - } def newThread(runnable: Runnable) = { val t = new Thread(runnable, name + counter.incrementAndGet()) - t.setUncaughtExceptionHandler(doNothing) + t.setUncaughtExceptionHandler(exceptionHandler) t.setDaemon(daemonic) t } diff --git a/akka-docs/scala/code/akka/docs/actor/TypedActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/TypedActorDocSpec.scala index f292b39a7c..da718b503d 100644 --- a/akka-docs/scala/code/akka/docs/actor/TypedActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/TypedActorDocSpec.scala @@ -45,7 +45,7 @@ class SquarerImpl(val name: String) extends Squarer { //#typed-actor-impl-methods } //#typed-actor-impl - +import java.lang.Integer.{ parseInt ⇒ println } //Mr funny man avoids printing to stdout AND keeping docs alright //#typed-actor-supercharge trait Foo { def doFoo(times: Int): Unit = println("doFoo(" + times + ")") diff --git a/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala b/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala index 2747da9f91..d0e0945fe8 100644 --- a/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala @@ -22,8 +22,6 @@ object DispatcherDocSpec { my-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher - # Toggles whether the threads created by this dispatcher should be daemons or not - daemonic = off # minimum number of threads to cap factor-based core number to core-pool-size-min = 2 # No of core threads ... ceil(available processors * factor) diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index a89508982c..cac020bf70 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -59,9 +59,6 @@ akka { # Reuse inbound connections for outbound messages use-passive-connections = on - # Whether any Threds created by the remoting should be daemons or not - daemonic = on - # accrual failure detection config failure-detector { @@ -84,13 +81,11 @@ akka { compute-grid-dispatcher { # defaults to same settings as default-dispatcher name = ComputeGridDispatcher - daemonic = on } # The dispatcher used for the system actor "network-event-sender" network-event-sender-dispatcher { type = PinnedDispatcher - daemonic = on } server { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index ad4fd2625e..4a3e869271 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -16,7 +16,6 @@ class RemoteSettings(val config: Config, val systemName: String) { import config._ val RemoteTransport = getString("akka.remote.transport") - val Daemonic = getBoolean("akka.remote.daemonic") val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold") val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size") val ShouldCompressData = getBoolean("akka.remote.use-compression") diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index bfb30bc940..600e49290b 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -369,7 +369,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre val serverSettings = remote.remoteSettings.serverSettings val clientSettings = remote.remoteSettings.clientSettings - val threadFactory = new MonitorableThreadFactory("NettyRemoteSupport", remote.remoteSettings.Daemonic) + val threadFactory = _system.threadFactory.copy(_system.threadFactory.name + "-remote") val timer: HashedWheelTimer = new HashedWheelTimer val executor = new OrderedMemoryAwareThreadPoolExecutor( serverSettings.ExecutionPoolSize, @@ -535,23 +535,24 @@ class NettyRemoteServer( Executors.newCachedThreadPool(remoteSupport.threadFactory), Executors.newCachedThreadPool(remoteSupport.threadFactory)) - private val bootstrap = new ServerBootstrap(factory) - private val executionHandler = new ExecutionHandler(remoteSupport.executor) // group of open channels, used for clean-up private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server") val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, executionHandler, loader, remoteSupport) - bootstrap.setPipelineFactory(pipelineFactory) - bootstrap.setOption("backlog", Backlog) - bootstrap.setOption("child.tcpNoDelay", true) - bootstrap.setOption("child.keepAlive", true) - bootstrap.setOption("child.reuseAddress", true) - bootstrap.setOption("child.connectTimeoutMillis", ConnectionTimeout.toMillis) + private val bootstrap: ServerBootstrap = { + val b = new ServerBootstrap(factory) + b.setPipelineFactory(pipelineFactory) + b.setOption("backlog", Backlog) + b.setOption("child.tcpNoDelay", true) + b.setOption("child.keepAlive", true) + b.setOption("child.reuseAddress", true) + b.setOption("child.connectTimeoutMillis", ConnectionTimeout.toMillis) + b + } openChannels.add(bootstrap.bind(new InetSocketAddress(address.transport.ip.get, address.transport.port))) - remoteSupport.notifyListeners(RemoteServerStarted(remoteSupport)) def shutdown() { try { diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 49502b99ea..03a343f3b1 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -16,7 +16,6 @@ class RemoteConfigSpec extends AkkaSpec("") { getString("akka.remote.secure-cookie") must equal("") getBoolean("akka.remote.use-passive-connections") must equal(true) getMilliseconds("akka.remote.backoff-timeout") must equal(0) - getBoolean("akka.remote.daemonic") must equal(true) // getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000) //akka.remote.server From cb86591656629f1d69d5c2a558eab5420dd49042 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 20 Jan 2012 14:27:00 +0100 Subject: [PATCH 2/3] #1657 - trying to get random port for port 0 --- .../src/main/scala/akka/remote/RemoteSettings.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index ad4fd2625e..94f5c87e93 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -66,7 +66,13 @@ class RemoteSettings(val config: Config, val systemName: String) { case "" ⇒ InetAddress.getLocalHost.getHostAddress case value ⇒ value } - val Port = getInt("akka.remote.server.port") + val Port = getInt("akka.remote.server.port") match { + case 0 => try { + val s = new java.net.ServerSocket(0) + try s.getLocalPort finally s.close() + } catch { case e => throw new ConfigurationException("Unable to obtain random port", e) } + case other => other + } val ConnectionTimeout = Duration(getMilliseconds("akka.remote.server.connection-timeout"), MILLISECONDS) val Backlog = getInt("akka.remote.server.backlog") From 47c2b3000a2d28b23201e20cf079bd0fa3fffb0d Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 20 Jan 2012 15:36:56 +0100 Subject: [PATCH 3/3] Moving connection-timeout to client since it belongs there, also adding more docs to the remote reference config --- akka-remote/src/main/resources/reference.conf | 16 +++++++++++----- .../scala/akka/remote/RemoteSettings.scala | 8 ++++---- .../akka/remote/netty/NettyRemoteSupport.scala | 18 ++++++++++-------- .../scala/akka/remote/RemoteConfigSpec.scala | 17 ++--------------- 4 files changed, 27 insertions(+), 32 deletions(-) diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index a89508982c..de143ddbb5 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -105,9 +105,6 @@ akka { # Increase this if you want to be able to send messages with large payloads message-frame-size = 1 MiB - # Timeout duration - connection-timeout = 120s - # Should the remote server require that it peers share the same secure-cookie # (defined in the 'remote' section)? require-cookie = off @@ -133,11 +130,20 @@ akka { } client { + # Time before an attempted connection is considered failed + connection-timeout = 10s + + #Time between each reconnection attempt reconnect-delay = 5s - read-timeout = 3600s - message-frame-size = 1 MiB + # Maximum time window that a client should try to reconnect for reconnection-time-window = 600s + + #Period of time of connection inactivity to be tolerated before hanging up + read-timeout = 3600s + + #Max size per message + message-frame-size = 1 MiB } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 94f5c87e93..59e4de5702 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -39,6 +39,7 @@ class RemoteSettings(val config: Config, val systemName: String) { case cookie ⇒ Some(cookie) } + val ConnectionTimeout = Duration(getMilliseconds("akka.remote.client.connection-timeout"), MILLISECONDS) val ReconnectionTimeWindow = Duration(getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS) val ReadTimeout = Duration(getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS) val ReconnectDelay = Duration(getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS) @@ -67,13 +68,12 @@ class RemoteSettings(val config: Config, val systemName: String) { case value ⇒ value } val Port = getInt("akka.remote.server.port") match { - case 0 => try { + case 0 ⇒ try { val s = new java.net.ServerSocket(0) try s.getLocalPort finally s.close() - } catch { case e => throw new ConfigurationException("Unable to obtain random port", e) } - case other => other + } catch { case e ⇒ throw new ConfigurationException("Unable to obtain random port", e) } + case other ⇒ other } - val ConnectionTimeout = Duration(getMilliseconds("akka.remote.server.connection-timeout"), MILLISECONDS) val Backlog = getInt("akka.remote.server.backlog") diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index f7d6b1d8b3..a8d25884b8 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -184,6 +184,7 @@ class ActiveRemoteClient private[akka] ( bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, executionHandler, remoteAddress, this)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) + bootstrap.setOption("connectTimeoutMillis", ConnectionTimeout.toMillis) log.debug("Starting remote client connection to [{}]", remoteAddress) @@ -548,20 +549,21 @@ class NettyRemoteServer( Executors.newCachedThreadPool(remoteSupport.threadFactory), Executors.newCachedThreadPool(remoteSupport.threadFactory)) - private val bootstrap = new ServerBootstrap(factory) - private val executionHandler = new ExecutionHandler(remoteSupport.executor) // group of open channels, used for clean-up private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server") val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, executionHandler, loader, remoteSupport) - bootstrap.setPipelineFactory(pipelineFactory) - bootstrap.setOption("backlog", Backlog) - bootstrap.setOption("child.tcpNoDelay", true) - bootstrap.setOption("child.keepAlive", true) - bootstrap.setOption("child.reuseAddress", true) - bootstrap.setOption("child.connectTimeoutMillis", ConnectionTimeout.toMillis) + private val bootstrap: ServerBootstrap = { + val b = new ServerBootstrap(factory) + b.setPipelineFactory(pipelineFactory) + b.setOption("backlog", Backlog) + b.setOption("child.tcpNoDelay", true) + b.setOption("child.keepAlive", true) + b.setOption("child.reuseAddress", true) + b + } openChannels.add(bootstrap.bind(new InetSocketAddress(address.transport.ip.get, address.transport.port))) remoteSupport.notifyListeners(RemoteServerStarted(remoteSupport)) diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 49502b99ea..9f498e6273 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -22,7 +22,7 @@ class RemoteConfigSpec extends AkkaSpec("") { //akka.remote.server getInt("akka.remote.server.port") must equal(2552) getBytes("akka.remote.server.message-frame-size") must equal(1048576L) - getMilliseconds("akka.remote.server.connection-timeout") must equal(120 * 1000) + getBoolean("akka.remote.server.require-cookie") must equal(false) getBoolean("akka.remote.server.untrusted-mode") must equal(false) getInt("akka.remote.server.backlog") must equal(4096) @@ -38,24 +38,11 @@ class RemoteConfigSpec extends AkkaSpec("") { getMilliseconds("akka.remote.client.reconnect-delay") must equal(5 * 1000) getMilliseconds("akka.remote.client.read-timeout") must equal(3600 * 1000) getMilliseconds("akka.remote.client.reconnection-time-window") must equal(600 * 1000) + getMilliseconds("akka.remote.client.connection-timeout") must equal(10000) // TODO cluster config will go into akka-cluster/reference.conf when we enable that module //akka.cluster getStringList("akka.cluster.seed-nodes") must equal(new java.util.ArrayList[String]) - - // getMilliseconds("akka.cluster.max-time-to-wait-until-connected") must equal(30 * 1000) - // getMilliseconds("akka.cluster.session-timeout") must equal(60 * 1000) - // getMilliseconds("akka.cluster.connection-timeout") must equal(60 * 1000) - // getBoolean("akka.cluster.include-ref-node-in-replica-set") must equal(true) - // getString("akka.cluster.log-directory") must equal("_akka_cluster") - - // //akka.cluster.replication - // getString("akka.cluster.replication.digest-type") must equal("MAC") - // getString("akka.cluster.replication.password") must equal("secret") - // getInt("akka.cluster.replication.ensemble-size") must equal(3) - // getInt("akka.cluster.replication.quorum-size") must equal(2) - // getInt("akka.cluster.replication.snapshot-frequency") must equal(1000) - // getMilliseconds("akka.cluster.replication.timeout") must equal(30 * 1000) } } }