diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 5aa2030e9f..98a7c75329 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -113,10 +113,10 @@ akka { execution-pool-size = 4 # Maximum channel size, 0 for off - max-channel-memory-size = 0 + max-channel-memory-size = 0b # Maximum total size of all channels, 0 for off - max-total-memory-size = 0 + max-total-memory-size = 0b } client { diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 304d82e0eb..1af4802552 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -61,8 +61,8 @@ class Remote(val settings: ActorSystem.Settings, val remoteSettings: RemoteSetti def eventStream = _eventStream @volatile - private var _server: RemoteSupport[ParsedTransportAddress] = _ - def server = _server + private var _transport: RemoteSupport[ParsedTransportAddress] = _ + def transport = _transport @volatile private var _provider: RemoteActorRefProvider = _ @@ -77,7 +77,7 @@ class Remote(val settings: ActorSystem.Settings, val remoteSettings: RemoteSetti _computeGridDispatcher = system.dispatchers.lookup("akka.remote.compute-grid-dispatcher") _remoteDaemon = new RemoteSystemDaemon(system, this, system.provider.rootPath / "remote", system.provider.rootGuardian, log) _eventStream = new NetworkEventStream(system) - _server = { + _transport = { val arguments = Seq( classOf[ActorSystemImpl] -> system, classOf[Remote] -> this, diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 89ae932cbf..5b748098ac 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -54,7 +54,7 @@ class RemoteActorRefProvider( local.init(system) remote.init(system, this) local.registerExtraNames(Map(("remote", remote.remoteDaemon))) - terminationFuture.onComplete(_ ⇒ remote.server.shutdown()) + terminationFuture.onComplete(_ ⇒ remote.transport.shutdown()) } def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy]): InternalActorRef = { @@ -114,7 +114,7 @@ class RemoteActorRefProvider( else { val rpath = RootActorPath(addr) / "remote" / rootPath.address.hostPort / path.elements useActorOnNode(rpath, props.creator, supervisor) - new RemoteActorRef(this, remote.server, rpath, supervisor, None) + new RemoteActorRef(this, remote.transport, rpath, supervisor, None) } } @@ -125,14 +125,14 @@ class RemoteActorRefProvider( def actorFor(path: ActorPath): InternalActorRef = path.root match { case `rootPath` ⇒ actorFor(rootGuardian, path.elements) - case RootActorPath(_: RemoteSystemAddress[_], _) ⇒ new RemoteActorRef(this, remote.server, path, Nobody, None) + case RootActorPath(_: RemoteSystemAddress[_], _) ⇒ new RemoteActorRef(this, remote.transport, path, Nobody, None) case _ ⇒ local.actorFor(path) } def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match { case ParsedActorPath(address, elems) ⇒ if (address == rootPath.address) actorFor(rootGuardian, elems) - else new RemoteActorRef(this, remote.server, new RootActorPath(address) / elems, Nobody, None) + else new RemoteActorRef(this, remote.transport, new RootActorPath(address) / elems, Nobody, None) case _ ⇒ local.actorFor(ref, path) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala index 03aa5ddc62..404bf98c4c 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala @@ -147,5 +147,5 @@ class RemoteConnectionManager( } private[remote] def newConnection(remoteAddress: ParsedTransportAddress, actorPath: ActorPath) = - new RemoteActorRef(remote.provider, remote.server, actorPath, Nobody, None) + new RemoteActorRef(remote.provider, remote.transport, actorPath, Nobody, None) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala index 17e9f3dd80..67f5b6f8c9 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala @@ -83,13 +83,13 @@ class RemoteSettings(val config: Config, val systemName: String) extends Extensi case sz ⇒ sz } - val MaxChannelMemorySize = config.getInt("akka.remote.server.max-channel-memory-size") match { - case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0") + val MaxChannelMemorySize = config.getBytes("akka.remote.server.max-channel-memory-size") match { + case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0 bytes") case sz ⇒ sz } - val MaxTotalMemorySize = config.getInt("akka.remote.server.max-total-memory-size") match { - case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0") + val MaxTotalMemorySize = config.getBytes("akka.remote.server.max-total-memory-size") match { + case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0 bytes") case sz ⇒ sz } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index df4df415d4..45a134e2d5 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -16,7 +16,6 @@ import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, Lengt import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException } import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } -import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler } import scala.collection.mutable.HashMap import java.net.InetSocketAddress import java.util.concurrent._ @@ -26,6 +25,7 @@ import akka.event.Logging import locks.ReentrantReadWriteLock import org.jboss.netty.channel._ import akka.actor.ActorSystemImpl +import org.jboss.netty.handler.execution.{ ExecutionHandler, OrderedMemoryAwareThreadPoolExecutor } class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) { def this(msg: String) = this(msg, null) @@ -139,6 +139,8 @@ class ActiveRemoteClient private[akka] ( def currentChannel = connection.getChannel private val senderRemoteAddress = remoteSupport.remote.remoteAddress + @volatile + private var executionHandler: ExecutionHandler = _ /** * Connect to remote server. @@ -179,8 +181,10 @@ class ActiveRemoteClient private[akka] ( runSwitch switchOn { openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName) + executionHandler = new ExecutionHandler(remoteSupport.executor) + bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)) - bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, remoteAddress, this)) + bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, executionHandler, remoteAddress, this)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) @@ -218,6 +222,8 @@ class ActiveRemoteClient private[akka] ( notifyListeners(RemoteClientShutdown(remoteSupport, remoteAddress)) openChannels.close.awaitUninterruptibly openChannels = null + executionHandler.releaseExternalResources() + executionHandler = null bootstrap.releaseExternalResources() bootstrap = null connection = null @@ -244,6 +250,7 @@ class ActiveRemoteClient private[akka] ( class ActiveRemoteClientPipelineFactory( name: String, bootstrap: ClientBootstrap, + executionHandler: ExecutionHandler, remoteAddress: RemoteNettyAddress, client: ActiveRemoteClient) extends ChannelPipelineFactory { @@ -257,7 +264,7 @@ class ActiveRemoteClientPipelineFactory( val protobufEnc = new ProtobufEncoder val remoteClient = new ActiveRemoteClientHandler(name, bootstrap, remoteAddress, client.remoteSupport.timer, client) - new StaticChannelPipeline(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient) + new StaticChannelPipeline(timeout, lenDec, protobufDec, lenPrep, protobufEnc, executionHandler, remoteClient) } } @@ -351,8 +358,12 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre val clientSettings = remote.remoteSettings.clientSettings val timer: HashedWheelTimer = new HashedWheelTimer - - _system.registerOnTermination(timer.stop()) //Shut this guy down at the end + val executor = new OrderedMemoryAwareThreadPoolExecutor( + serverSettings.ExecutionPoolSize, + serverSettings.MaxChannelMemorySize, + serverSettings.MaxTotalMemorySize, + serverSettings.ExecutionPoolKeepAlive.length, + serverSettings.ExecutionPoolKeepAlive.unit) private val remoteClients = new HashMap[RemoteNettyAddress, RemoteClient] private val clientsLock = new ReentrantReadWriteLock @@ -486,7 +497,11 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre remoteClients.clear() } finally { clientsLock.writeLock().unlock() - currentServer.getAndSet(None) foreach { _.shutdown() } + try { + currentServer.getAndSet(None) foreach { _.shutdown() } + } finally { + try { timer.stop() } finally { executor.shutdown() } + } } } } @@ -506,18 +521,12 @@ class NettyRemoteServer( private val bootstrap = new ServerBootstrap(factory) - private val executor = new ExecutionHandler( - new OrderedMemoryAwareThreadPoolExecutor( - ExecutionPoolSize, - MaxChannelMemorySize, - MaxTotalMemorySize, - ExecutionPoolKeepAlive.length, - ExecutionPoolKeepAlive.unit)) + private val executionHandler = new ExecutionHandler(remoteSupport.executor) // group of open channels, used for clean-up private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server") - val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, executor, loader, remoteSupport) + val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, executionHandler, loader, remoteSupport) bootstrap.setPipelineFactory(pipelineFactory) bootstrap.setOption("backlog", Backlog) bootstrap.setOption("child.tcpNoDelay", true) @@ -545,7 +554,7 @@ class NettyRemoteServer( openChannels.disconnect openChannels.close.awaitUninterruptibly bootstrap.releaseExternalResources() - executor.releaseExternalResources() + executionHandler.releaseExternalResources() remoteSupport.notifyListeners(RemoteServerShutdown(remoteSupport)) } catch { case e: Exception ⇒ remoteSupport.notifyListeners(RemoteServerError(e, remoteSupport)) @@ -556,7 +565,7 @@ class NettyRemoteServer( class RemoteServerPipelineFactory( val name: String, val openChannels: ChannelGroup, - val executor: ExecutionHandler, + val executionHandler: ExecutionHandler, val loader: Option[ClassLoader], val remoteSupport: NettyRemoteSupport) extends ChannelPipelineFactory { @@ -570,7 +579,7 @@ class RemoteServerPipelineFactory( val authenticator = if (RequireCookie) new RemoteServerAuthenticationHandler(SecureCookie) :: Nil else Nil val remoteServer = new RemoteServerHandler(name, openChannels, loader, remoteSupport) - val stages: List[ChannelHandler] = lenDec :: protobufDec :: lenPrep :: protobufEnc :: executor :: authenticator ::: remoteServer :: Nil + val stages: List[ChannelHandler] = lenDec :: protobufDec :: lenPrep :: protobufEnc :: executionHandler :: authenticator ::: remoteServer :: Nil new StaticChannelPipeline(stages: _*) } } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index d1c87e3c13..1132d1a733 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -25,6 +25,13 @@ class RemoteConfigSpec extends AkkaSpec("akka.cluster.nodename = node1") { getBoolean("akka.remote.server.untrusted-mode") must equal(false) getInt("akka.remote.server.backlog") must equal(4096) + getMilliseconds("akka.remote.server.execution-pool-keepalive") must equal(60 * 1000) + + getInt("akka.remote.server.execution-pool-size") must equal(4) + + getBytes("akka.remote.server.max-channel-memory-size") must equal(0) + getBytes("akka.remote.server.max-total-memory-size") must equal(0) + //akka.remote.client getBoolean("akka.remote.client.buffering.retry-message-send-on-failure") must equal(false) getInt("akka.remote.client.buffering.capacity") must equal(-1)