diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index fb5b843e17..5aa2030e9f 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -105,6 +105,18 @@ akka { # Sets the size of the connection backlog backlog = 4096 + + # Length in akka.time-unit how long core threads will be kept alive if idling + execution-pool-keepalive = 60s + + # Size of the core pool of the remote execution unit + execution-pool-size = 4 + + # Maximum channel size, 0 for off + max-channel-memory-size = 0 + + # Maximum total size of all channels, 0 for off + max-total-memory-size = 0 } client { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala index 80e870c076..17e9f3dd80 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala @@ -76,6 +76,23 @@ class RemoteSettings(val config: Config, val systemName: String) extends Extensi val Backlog = config.getInt("akka.remote.server.backlog") + val ExecutionPoolKeepAlive = Duration(config.getMilliseconds("akka.remote.server.execution-pool-keepalive"), MILLISECONDS) + + val ExecutionPoolSize = config.getInt("akka.remote.server.execution-pool-size") match { + case sz if sz < 1 ⇒ throw new IllegalArgumentException("akka.remote.server.execution-pool-size is less than 1") + case sz ⇒ sz + } + + val MaxChannelMemorySize = config.getInt("akka.remote.server.max-channel-memory-size") match { + case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0") + case sz ⇒ sz + } + + val MaxTotalMemorySize = config.getInt("akka.remote.server.max-total-memory-size") match { + case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0") + case sz ⇒ sz + } + // TODO handle the system name right and move this to config file syntax val URI = "akka://sys@" + Hostname + ":" + Port } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 11c59ea40d..c5fcd9e276 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -16,12 +16,12 @@ import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, Lengt import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException } import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } +import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler } import scala.collection.mutable.HashMap import java.net.InetSocketAddress import java.util.concurrent._ import java.util.concurrent.atomic._ import akka.AkkaException -import akka.actor.ActorSystem import akka.event.Logging import locks.ReentrantReadWriteLock import org.jboss.netty.channel._ @@ -507,10 +507,18 @@ class NettyRemoteServer( private val bootstrap = new ServerBootstrap(factory) + private val executor = new ExecutionHandler( + new OrderedMemoryAwareThreadPoolExecutor( + ExecutionPoolSize, + MaxChannelMemorySize, + MaxTotalMemorySize, + ExecutionPoolKeepAlive.length, + ExecutionPoolKeepAlive.unit)) + // group of open channels, used for clean-up private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server") - val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, remoteSupport) + val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, executor, loader, remoteSupport) bootstrap.setPipelineFactory(pipelineFactory) bootstrap.setOption("backlog", Backlog) bootstrap.setOption("child.tcpNoDelay", true) @@ -538,6 +546,7 @@ class NettyRemoteServer( openChannels.disconnect openChannels.close.awaitUninterruptibly bootstrap.releaseExternalResources() + executor.releaseExternalResources() remoteSupport.notifyListeners(RemoteServerShutdown(remoteSupport)) } catch { case e: Exception ⇒ remoteSupport.notifyListeners(RemoteServerError(e, remoteSupport)) @@ -548,6 +557,7 @@ class NettyRemoteServer( class RemoteServerPipelineFactory( val name: String, val openChannels: ChannelGroup, + val executor: ExecutionHandler, val loader: Option[ClassLoader], val remoteSupport: NettyRemoteSupport) extends ChannelPipelineFactory { @@ -561,7 +571,7 @@ class RemoteServerPipelineFactory( val authenticator = if (RequireCookie) new RemoteServerAuthenticationHandler(SecureCookie) :: Nil else Nil val remoteServer = new RemoteServerHandler(name, openChannels, loader, remoteSupport) - val stages: List[ChannelHandler] = lenDec :: protobufDec :: lenPrep :: protobufEnc :: authenticator ::: remoteServer :: Nil + val stages: List[ChannelHandler] = lenDec :: protobufDec :: lenPrep :: protobufEnc :: executor :: authenticator ::: remoteServer :: Nil new StaticChannelPipeline(stages: _*) } }