Switching to using OrderedMemoryAware executor same for all of the remoting, added more remote config tests and switched to mem size conf
This commit is contained in:
parent
88d2427b7f
commit
5d2bd2492f
7 changed files with 47 additions and 31 deletions
|
|
@ -113,10 +113,10 @@ akka {
|
|||
execution-pool-size = 4
|
||||
|
||||
# Maximum channel size, 0 for off
|
||||
max-channel-memory-size = 0
|
||||
max-channel-memory-size = 0b
|
||||
|
||||
# Maximum total size of all channels, 0 for off
|
||||
max-total-memory-size = 0
|
||||
max-total-memory-size = 0b
|
||||
}
|
||||
|
||||
client {
|
||||
|
|
|
|||
|
|
@ -61,8 +61,8 @@ class Remote(val settings: ActorSystem.Settings, val remoteSettings: RemoteSetti
|
|||
def eventStream = _eventStream
|
||||
|
||||
@volatile
|
||||
private var _server: RemoteSupport[ParsedTransportAddress] = _
|
||||
def server = _server
|
||||
private var _transport: RemoteSupport[ParsedTransportAddress] = _
|
||||
def transport = _transport
|
||||
|
||||
@volatile
|
||||
private var _provider: RemoteActorRefProvider = _
|
||||
|
|
@ -77,7 +77,7 @@ class Remote(val settings: ActorSystem.Settings, val remoteSettings: RemoteSetti
|
|||
_computeGridDispatcher = system.dispatchers.lookup("akka.remote.compute-grid-dispatcher")
|
||||
_remoteDaemon = new RemoteSystemDaemon(system, this, system.provider.rootPath / "remote", system.provider.rootGuardian, log)
|
||||
_eventStream = new NetworkEventStream(system)
|
||||
_server = {
|
||||
_transport = {
|
||||
val arguments = Seq(
|
||||
classOf[ActorSystemImpl] -> system,
|
||||
classOf[Remote] -> this,
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ class RemoteActorRefProvider(
|
|||
local.init(system)
|
||||
remote.init(system, this)
|
||||
local.registerExtraNames(Map(("remote", remote.remoteDaemon)))
|
||||
terminationFuture.onComplete(_ ⇒ remote.server.shutdown())
|
||||
terminationFuture.onComplete(_ ⇒ remote.transport.shutdown())
|
||||
}
|
||||
|
||||
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy]): InternalActorRef = {
|
||||
|
|
@ -114,7 +114,7 @@ class RemoteActorRefProvider(
|
|||
else {
|
||||
val rpath = RootActorPath(addr) / "remote" / rootPath.address.hostPort / path.elements
|
||||
useActorOnNode(rpath, props.creator, supervisor)
|
||||
new RemoteActorRef(this, remote.server, rpath, supervisor, None)
|
||||
new RemoteActorRef(this, remote.transport, rpath, supervisor, None)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -125,14 +125,14 @@ class RemoteActorRefProvider(
|
|||
|
||||
def actorFor(path: ActorPath): InternalActorRef = path.root match {
|
||||
case `rootPath` ⇒ actorFor(rootGuardian, path.elements)
|
||||
case RootActorPath(_: RemoteSystemAddress[_], _) ⇒ new RemoteActorRef(this, remote.server, path, Nobody, None)
|
||||
case RootActorPath(_: RemoteSystemAddress[_], _) ⇒ new RemoteActorRef(this, remote.transport, path, Nobody, None)
|
||||
case _ ⇒ local.actorFor(path)
|
||||
}
|
||||
|
||||
def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match {
|
||||
case ParsedActorPath(address, elems) ⇒
|
||||
if (address == rootPath.address) actorFor(rootGuardian, elems)
|
||||
else new RemoteActorRef(this, remote.server, new RootActorPath(address) / elems, Nobody, None)
|
||||
else new RemoteActorRef(this, remote.transport, new RootActorPath(address) / elems, Nobody, None)
|
||||
case _ ⇒ local.actorFor(ref, path)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -147,5 +147,5 @@ class RemoteConnectionManager(
|
|||
}
|
||||
|
||||
private[remote] def newConnection(remoteAddress: ParsedTransportAddress, actorPath: ActorPath) =
|
||||
new RemoteActorRef(remote.provider, remote.server, actorPath, Nobody, None)
|
||||
new RemoteActorRef(remote.provider, remote.transport, actorPath, Nobody, None)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -83,13 +83,13 @@ class RemoteSettings(val config: Config, val systemName: String) extends Extensi
|
|||
case sz ⇒ sz
|
||||
}
|
||||
|
||||
val MaxChannelMemorySize = config.getInt("akka.remote.server.max-channel-memory-size") match {
|
||||
case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0")
|
||||
val MaxChannelMemorySize = config.getBytes("akka.remote.server.max-channel-memory-size") match {
|
||||
case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0 bytes")
|
||||
case sz ⇒ sz
|
||||
}
|
||||
|
||||
val MaxTotalMemorySize = config.getInt("akka.remote.server.max-total-memory-size") match {
|
||||
case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0")
|
||||
val MaxTotalMemorySize = config.getBytes("akka.remote.server.max-total-memory-size") match {
|
||||
case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0 bytes")
|
||||
case sz ⇒ sz
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -16,7 +16,6 @@ import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, Lengt
|
|||
import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
|
||||
import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException }
|
||||
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
|
||||
import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler }
|
||||
import scala.collection.mutable.HashMap
|
||||
import java.net.InetSocketAddress
|
||||
import java.util.concurrent._
|
||||
|
|
@ -26,6 +25,7 @@ import akka.event.Logging
|
|||
import locks.ReentrantReadWriteLock
|
||||
import org.jboss.netty.channel._
|
||||
import akka.actor.ActorSystemImpl
|
||||
import org.jboss.netty.handler.execution.{ ExecutionHandler, OrderedMemoryAwareThreadPoolExecutor }
|
||||
|
||||
class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
|
||||
def this(msg: String) = this(msg, null)
|
||||
|
|
@ -139,6 +139,8 @@ class ActiveRemoteClient private[akka] (
|
|||
def currentChannel = connection.getChannel
|
||||
|
||||
private val senderRemoteAddress = remoteSupport.remote.remoteAddress
|
||||
@volatile
|
||||
private var executionHandler: ExecutionHandler = _
|
||||
|
||||
/**
|
||||
* Connect to remote server.
|
||||
|
|
@ -179,8 +181,10 @@ class ActiveRemoteClient private[akka] (
|
|||
runSwitch switchOn {
|
||||
openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName)
|
||||
|
||||
executionHandler = new ExecutionHandler(remoteSupport.executor)
|
||||
|
||||
bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool))
|
||||
bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, remoteAddress, this))
|
||||
bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, executionHandler, remoteAddress, this))
|
||||
bootstrap.setOption("tcpNoDelay", true)
|
||||
bootstrap.setOption("keepAlive", true)
|
||||
|
||||
|
|
@ -218,6 +222,8 @@ class ActiveRemoteClient private[akka] (
|
|||
notifyListeners(RemoteClientShutdown(remoteSupport, remoteAddress))
|
||||
openChannels.close.awaitUninterruptibly
|
||||
openChannels = null
|
||||
executionHandler.releaseExternalResources()
|
||||
executionHandler = null
|
||||
bootstrap.releaseExternalResources()
|
||||
bootstrap = null
|
||||
connection = null
|
||||
|
|
@ -244,6 +250,7 @@ class ActiveRemoteClient private[akka] (
|
|||
class ActiveRemoteClientPipelineFactory(
|
||||
name: String,
|
||||
bootstrap: ClientBootstrap,
|
||||
executionHandler: ExecutionHandler,
|
||||
remoteAddress: RemoteNettyAddress,
|
||||
client: ActiveRemoteClient) extends ChannelPipelineFactory {
|
||||
|
||||
|
|
@ -257,7 +264,7 @@ class ActiveRemoteClientPipelineFactory(
|
|||
val protobufEnc = new ProtobufEncoder
|
||||
val remoteClient = new ActiveRemoteClientHandler(name, bootstrap, remoteAddress, client.remoteSupport.timer, client)
|
||||
|
||||
new StaticChannelPipeline(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient)
|
||||
new StaticChannelPipeline(timeout, lenDec, protobufDec, lenPrep, protobufEnc, executionHandler, remoteClient)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -351,8 +358,12 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
|
|||
val clientSettings = remote.remoteSettings.clientSettings
|
||||
|
||||
val timer: HashedWheelTimer = new HashedWheelTimer
|
||||
|
||||
_system.registerOnTermination(timer.stop()) //Shut this guy down at the end
|
||||
val executor = new OrderedMemoryAwareThreadPoolExecutor(
|
||||
serverSettings.ExecutionPoolSize,
|
||||
serverSettings.MaxChannelMemorySize,
|
||||
serverSettings.MaxTotalMemorySize,
|
||||
serverSettings.ExecutionPoolKeepAlive.length,
|
||||
serverSettings.ExecutionPoolKeepAlive.unit)
|
||||
|
||||
private val remoteClients = new HashMap[RemoteNettyAddress, RemoteClient]
|
||||
private val clientsLock = new ReentrantReadWriteLock
|
||||
|
|
@ -486,7 +497,11 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
|
|||
remoteClients.clear()
|
||||
} finally {
|
||||
clientsLock.writeLock().unlock()
|
||||
currentServer.getAndSet(None) foreach { _.shutdown() }
|
||||
try {
|
||||
currentServer.getAndSet(None) foreach { _.shutdown() }
|
||||
} finally {
|
||||
try { timer.stop() } finally { executor.shutdown() }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -506,18 +521,12 @@ class NettyRemoteServer(
|
|||
|
||||
private val bootstrap = new ServerBootstrap(factory)
|
||||
|
||||
private val executor = new ExecutionHandler(
|
||||
new OrderedMemoryAwareThreadPoolExecutor(
|
||||
ExecutionPoolSize,
|
||||
MaxChannelMemorySize,
|
||||
MaxTotalMemorySize,
|
||||
ExecutionPoolKeepAlive.length,
|
||||
ExecutionPoolKeepAlive.unit))
|
||||
private val executionHandler = new ExecutionHandler(remoteSupport.executor)
|
||||
|
||||
// group of open channels, used for clean-up
|
||||
private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server")
|
||||
|
||||
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, executor, loader, remoteSupport)
|
||||
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, executionHandler, loader, remoteSupport)
|
||||
bootstrap.setPipelineFactory(pipelineFactory)
|
||||
bootstrap.setOption("backlog", Backlog)
|
||||
bootstrap.setOption("child.tcpNoDelay", true)
|
||||
|
|
@ -545,7 +554,7 @@ class NettyRemoteServer(
|
|||
openChannels.disconnect
|
||||
openChannels.close.awaitUninterruptibly
|
||||
bootstrap.releaseExternalResources()
|
||||
executor.releaseExternalResources()
|
||||
executionHandler.releaseExternalResources()
|
||||
remoteSupport.notifyListeners(RemoteServerShutdown(remoteSupport))
|
||||
} catch {
|
||||
case e: Exception ⇒ remoteSupport.notifyListeners(RemoteServerError(e, remoteSupport))
|
||||
|
|
@ -556,7 +565,7 @@ class NettyRemoteServer(
|
|||
class RemoteServerPipelineFactory(
|
||||
val name: String,
|
||||
val openChannels: ChannelGroup,
|
||||
val executor: ExecutionHandler,
|
||||
val executionHandler: ExecutionHandler,
|
||||
val loader: Option[ClassLoader],
|
||||
val remoteSupport: NettyRemoteSupport) extends ChannelPipelineFactory {
|
||||
|
||||
|
|
@ -570,7 +579,7 @@ class RemoteServerPipelineFactory(
|
|||
|
||||
val authenticator = if (RequireCookie) new RemoteServerAuthenticationHandler(SecureCookie) :: Nil else Nil
|
||||
val remoteServer = new RemoteServerHandler(name, openChannels, loader, remoteSupport)
|
||||
val stages: List[ChannelHandler] = lenDec :: protobufDec :: lenPrep :: protobufEnc :: executor :: authenticator ::: remoteServer :: Nil
|
||||
val stages: List[ChannelHandler] = lenDec :: protobufDec :: lenPrep :: protobufEnc :: executionHandler :: authenticator ::: remoteServer :: Nil
|
||||
new StaticChannelPipeline(stages: _*)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,6 +25,13 @@ class RemoteConfigSpec extends AkkaSpec("akka.cluster.nodename = node1") {
|
|||
getBoolean("akka.remote.server.untrusted-mode") must equal(false)
|
||||
getInt("akka.remote.server.backlog") must equal(4096)
|
||||
|
||||
getMilliseconds("akka.remote.server.execution-pool-keepalive") must equal(60 * 1000)
|
||||
|
||||
getInt("akka.remote.server.execution-pool-size") must equal(4)
|
||||
|
||||
getBytes("akka.remote.server.max-channel-memory-size") must equal(0)
|
||||
getBytes("akka.remote.server.max-total-memory-size") must equal(0)
|
||||
|
||||
//akka.remote.client
|
||||
getBoolean("akka.remote.client.buffering.retry-message-send-on-failure") must equal(false)
|
||||
getInt("akka.remote.client.buffering.capacity") must equal(-1)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue