Adding OrderedMemoryAwareThreadPoolExecutor with an ExecutionHandler to the NettyRemoteServer

This commit is contained in:
Viktor Klang 2011-03-23 11:31:19 +01:00
parent f79d5c4613
commit db1b50a6b6
4 changed files with 39 additions and 4 deletions

View file

@ -44,4 +44,24 @@ object RemoteServerSettings {
} }
val BACKLOG = config.getInt("akka.remote.server.backlog", 4096) val BACKLOG = config.getInt("akka.remote.server.backlog", 4096)
val EXECUTION_POOL_KEEPALIVE = Duration(config.getInt("akka.remote.server.execution-pool-keepalive", 60), TIME_UNIT)
val EXECUTION_POOL_SIZE = {
val sz = config.getInt("akka.remote.server.execution-pool-size",16)
if (sz < 1) throw new IllegalArgumentException("akka.remote.server.execution-pool-size is less than 1")
sz
}
val MAX_CHANNEL_MEMORY_SIZE = {
val sz = config.getInt("akka.remote.server.max-channel-memory-size", 0)
if (sz < 0) throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0")
sz
}
val MAX_TOTAL_MEMORY_SIZE = {
val sz = config.getInt("akka.remote.server.max-total-memory-size", 0)
if (sz < 0) throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0")
sz
}
} }

View file

@ -33,6 +33,7 @@ import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, Lengt
import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder } import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder }
import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException } import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException }
import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler }
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
import org.jboss.netty.handler.ssl.SslHandler import org.jboss.netty.handler.ssl.SslHandler
@ -753,9 +754,17 @@ class RemoteServerPipelineFactory(
case "zlib" => (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil) case "zlib" => (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil)
case _ => (Nil, Nil) case _ => (Nil, Nil)
} }
val execution = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(
EXECUTION_POOL_SIZE,
MAX_CHANNEL_MEMORY_SIZE,
MAX_TOTAL_MEMORY_SIZE,
EXECUTION_POOL_KEEPALIVE.length,
EXECUTION_POOL_KEEPALIVE.unit
)
)
val remoteServer = new RemoteServerHandler(name, openChannels, loader, server) val remoteServer = new RemoteServerHandler(name, openChannels, loader, server)
val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteServer :: Nil val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: execution :: remoteServer :: Nil
new StaticChannelPipeline(stages: _*) new StaticChannelPipeline(stages: _*)
} }
} }
@ -856,8 +865,6 @@ class RemoteServerHandler(
} }
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = { private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = {
//FIXME we should definitely spawn off this in a thread pool or something,
// potentially using Actor.spawn or something similar
request.getActorInfo.getActorType match { request.getActorInfo.getActorType match {
case SCALA_ACTOR => dispatchToActor(request, channel) case SCALA_ACTOR => dispatchToActor(request, channel)
case TYPED_ACTOR => dispatchToTypedActor(request, channel) case TYPED_ACTOR => dispatchToTypedActor(request, channel)

View file

@ -36,6 +36,10 @@ class ConfigSpec extends WordSpec with MustMatchers {
getBool("akka.remote.ssl.debug") must equal(None) getBool("akka.remote.ssl.debug") must equal(None)
getBool("akka.remote.ssl.service") must equal(None) getBool("akka.remote.ssl.service") must equal(None)
getInt("akka.remote.zlib-compression-level") must equal(Some(6)) getInt("akka.remote.zlib-compression-level") must equal(Some(6))
getInt("akka.remote.server.execution-pool-size") must equal(Some(16))
getInt("akka.remote.server.execution-pool-keepalive") must equal(Some(60))
getInt("akka.remote.server.max-channel-memory-size") must equal(Some(0))
getInt("akka.remote.server.max-total-memory-size") must equal(Some(0))
} }
} }
} }

View file

@ -142,6 +142,10 @@ akka {
require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)? require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)?
untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect. untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect.
backlog = 4096 # Sets the size of the connection backlog backlog = 4096 # Sets the size of the connection backlog
execution-pool-keepalive = 60# Length in akka.time-unit how long core threads will be kept alive if idling
execution-pool-size = 16# Size of the core pool of the remote execution unit
max-channel-memory-size = 0 # Maximum channel size, 0 for off
max-total-memory-size = 0 # Maximum total size of all channels, 0 for off
} }
client { client {