Restructuring ActiveRemoteClient, moving to a shared NioClientChannelFactory, making the timer optionally a daemon, switching to channel groups

This commit is contained in:
Viktor Klang 2012-01-19 15:06:58 +01:00
parent 97280ffeed
commit 6db3e59ce1

View file

@ -9,8 +9,7 @@ import akka.remote._
import RemoteProtocol._
import akka.util._
import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture }
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.channel.socket.nio.{ NioServerSocketChannelFactory, NioClientSocketChannelFactory }
import org.jboss.netty.bootstrap.{ ServerBootstrap, ClientBootstrap }
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
@ -121,6 +120,7 @@ class PassiveRemoteClient(val currentChannel: Channel,
class ActiveRemoteClient private[akka] (
remoteSupport: NettyRemoteSupport,
remoteAddress: RemoteNettyAddress,
localAddress: RemoteSystemAddress[ParsedTransportAddress],
val loader: Option[ClassLoader] = None)
extends RemoteClient(remoteSupport, remoteAddress) {
@ -132,7 +132,11 @@ class ActiveRemoteClient private[akka] (
@volatile
private var bootstrap: ClientBootstrap = _
@volatile
private[remote] var connection: ChannelFuture = _
private var connection: ChannelFuture = _
@volatile
private[remote] var openChannels: DefaultChannelGroup = _
@volatile
private var executionHandler: ExecutionHandler = _
@volatile
private var reconnectionTimeWindowStart = 0L
@ -141,10 +145,6 @@ class ActiveRemoteClient private[akka] (
def currentChannel = connection.getChannel
private val senderRemoteAddress = remoteSupport.remote.remoteAddress
@volatile
private var executionHandler: ExecutionHandler = _
/**
* Connect to remote server.
*/
@ -154,9 +154,9 @@ class ActiveRemoteClient private[akka] (
val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT)
if (SecureCookie.nonEmpty) handshake.setCookie(SecureCookie.get)
handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder
.setSystem(senderRemoteAddress.system)
.setHostname(senderRemoteAddress.transport.host)
.setPort(senderRemoteAddress.transport.port)
.setSystem(localAddress.system)
.setHostname(localAddress.transport.host)
.setPort(localAddress.transport.port)
.build)
connection.getChannel.write(remoteSupport.createControlEnvelope(handshake.build))
}
@ -164,7 +164,7 @@ class ActiveRemoteClient private[akka] (
def attemptReconnect(): Boolean = {
log.debug("Remote client reconnecting to [{}]", remoteAddress)
connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port))
connection.awaitUninterruptibly.getChannel // Wait until the connection attempt succeeds or fails.
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, remoteSupport, remoteAddress))
@ -176,11 +176,11 @@ class ActiveRemoteClient private[akka] (
}
runSwitch switchOn {
openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName)
executionHandler = new ExecutionHandler(remoteSupport.executor)
bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(remoteSupport.threadFactory),
Executors.newCachedThreadPool(remoteSupport.threadFactory)))
bootstrap = new ClientBootstrap(remoteSupport.clientChannelFactory)
bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, executionHandler, remoteAddress, this))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
@ -188,7 +188,8 @@ class ActiveRemoteClient private[akka] (
log.debug("Starting remote client connection to [{}]", remoteAddress)
connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port))
connection.awaitUninterruptibly.getChannel // Wait until the connection attempt succeeds or fails.
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, remoteSupport, remoteAddress))
@ -202,6 +203,7 @@ class ActiveRemoteClient private[akka] (
case true true
case false if reconnectIfAlreadyConnected
connection.getChannel.close()
openChannels.remove(connection.getChannel)
log.debug("Remote client reconnecting to [{}]", remoteAddress)
attemptReconnect()
@ -219,13 +221,11 @@ class ActiveRemoteClient private[akka] (
if ((connection ne null) && (connection.getChannel ne null))
connection.getChannel.close()
} finally {
connection = null
executionHandler = null
//Do not do this: executionHandler.releaseExternalResources(), since it's shutting down the shared threadpool
try {
bootstrap.releaseExternalResources()
if (openChannels ne null) openChannels.close.awaitUninterruptibly()
} finally {
bootstrap = null
connection = null
executionHandler = null
}
}
@ -324,7 +324,10 @@ class ActiveRemoteClientHandler(
if (client.isWithinReconnectionTimeWindow) {
timer.newTimeout(new TimerTask() {
def run(timeout: Timeout) =
if (client.isRunning) client.connect(reconnectIfAlreadyConnected = true)
if (client.isRunning) {
client.openChannels.remove(event.getChannel)
client.connect(reconnectIfAlreadyConnected = true)
}
}, client.remoteSupport.clientSettings.ReconnectDelay.toMillis, TimeUnit.MILLISECONDS)
} else runOnceNow {
client.remoteSupport.shutdownClientConnection(remoteAddress) // spawn in another thread
@ -369,8 +372,10 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
val serverSettings = remote.remoteSettings.serverSettings
val clientSettings = remote.remoteSettings.clientSettings
val threadFactory = new MonitorableThreadFactory("NettyRemoteSupport", remote.remoteSettings.Daemonic)
val timer: HashedWheelTimer = new HashedWheelTimer
val timer: HashedWheelTimer = new HashedWheelTimer(threadFactory)
val executor = new OrderedMemoryAwareThreadPoolExecutor(
serverSettings.ExecutionPoolSize,
serverSettings.MaxChannelMemorySize,
@ -379,6 +384,10 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
serverSettings.ExecutionPoolKeepAlive.unit,
threadFactory)
val clientChannelFactory = new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(threadFactory),
Executors.newCachedThreadPool(threadFactory))
private val remoteClients = new HashMap[RemoteNettyAddress, RemoteClient]
private val clientsLock = new ReentrantReadWriteLock
@ -411,7 +420,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
//Recheck for addition, race between upgrades
case Some(client) client //If already populated by other writer
case None //Populate map
val client = new ActiveRemoteClient(this, recipientAddress, loader)
val client = new ActiveRemoteClient(this, recipientAddress, remote.remoteAddress, loader)
client.connect()
remoteClients += recipientAddress -> client
client
@ -479,26 +488,20 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
/**
* Server section
*/
private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None)
@volatile
private var currentServer: NettyRemoteServer = _
def name = currentServer.get match {
case Some(server) server.name
case None remote.remoteAddress.toString
def name = currentServer match {
case null remote.remoteAddress.toString
case server server.name
}
private val _isRunning = new Switch(false)
def isRunning = _isRunning.isOn
def start(loader: Option[ClassLoader] = None): Unit = {
_isRunning switchOn {
try {
currentServer.set(Some(new NettyRemoteServer(this, loader, address)))
} catch {
case e: Exception notifyListeners(RemoteServerError(e, this))
}
}
}
def start(loader: Option[ClassLoader] = None): Unit =
_isRunning switchOn { currentServer = new NettyRemoteServer(this, loader, address) }
/**
* Common section
@ -512,9 +515,19 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
} finally {
clientsLock.writeLock().unlock()
try {
currentServer.getAndSet(None) foreach { _.shutdown() }
val s = currentServer
currentServer = null
s.shutdown()
} finally {
try { timer.stop() } finally { executor.shutdown() }
try {
timer.stop()
} finally {
try {
clientChannelFactory.releaseExternalResources()
} finally {
executor.shutdown()
}
}
}
}
}