Fixing #586 and #588 and adding support for reconnect and shutdown of individual clients

This commit is contained in:
Viktor Klang 2010-12-29 17:59:38 +01:00
parent b51a4fec64
commit 960e161659
2 changed files with 96 additions and 64 deletions

View file

@ -63,28 +63,37 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
clientFor(remoteAddress, loader).send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType)
private[akka] def clientFor(
address: InetSocketAddress, loader: Option[ClassLoader]): RemoteClient = synchronized { //TODO: REVIST: synchronized here seems bottlenecky
val hostname = address.getHostName
val port = address.getPort
val hash = hostname + ':' + port
address: InetSocketAddress, loader: Option[ClassLoader]): RemoteClient = synchronized { //TODO: REVISIT: synchronized here seems bottlenecky
val key = makeKey(address)
loader.foreach(MessageSerializer.setClassLoader(_))//TODO: REVISIT: THIS SMELLS FUNNY
if (remoteClients.contains(hash)) remoteClients(hash)
else {
val client = new RemoteClient(this, new InetSocketAddress(hostname, port), loader, self.notifyListeners _)
client.connect
remoteClients += hash -> client
client
remoteClients.get(key) match {
case Some(client) => client
case None =>
val client = new RemoteClient(this, address, loader, self.notifyListeners _)
client.connect()
remoteClients += key -> client
client
}
}
def shutdownClientFor(address: InetSocketAddress) = synchronized {
val hostname = address.getHostName
val port = address.getPort
val hash = hostname + ':' + port
if (remoteClients.contains(hash)) {
val client = remoteClients(hash)
client.shutdown
remoteClients -= hash
private def makeKey(a: InetSocketAddress): String = a match {
case null => null
case address => address.getHostName + ':' + address.getPort
}
def shutdownClientConnection(address: InetSocketAddress): Boolean = synchronized {
remoteClients.remove(makeKey(address)) match {
case Some(client) =>
client.shutdown
true
case None => false
}
}
def restartClientConnection(address: InetSocketAddress): Boolean = synchronized {
remoteClients.get(makeKey(address)) match {
case Some(client) => client.connect(reconnectIfAlreadyConnected = true)
case None => false
}
}
@ -109,7 +118,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
private[akka] def unregisterClientManagedActor(hostname: String, port: Int, uuid: Uuid) = synchronized {
val set = actorsFor(Address(hostname, port))
set -= uuid
if (set.isEmpty) shutdownClientFor(new InetSocketAddress(hostname, port))
if (set.isEmpty) shutdownClientConnection(new InetSocketAddress(hostname, port))
}
private[akka] def actorsFor(remoteServerAddress: Address): HashSet[Uuid] = {
@ -168,28 +177,48 @@ class RemoteClient private[akka] (
@volatile
private var reconnectionTimeWindowStart = 0L
def connect = runSwitch switchOn {
openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName)
timer = new HashedWheelTimer
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = {
runSwitch switchOn {
openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName)
timer = new HashedWheelTimer
bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool))
bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool))
bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
log.slf4j.info("Starting remote client connection to [{}]", remoteAddress)
log.slf4j.info("Starting remote client connection to [{}]", remoteAddress)
// Wait until the connection attempt succeeds or fails.
connection = bootstrap.connect(remoteAddress)
val channel = connection.awaitUninterruptibly.getChannel
openChannels.add(channel)
// Wait until the connection attempt succeeds or fails.
connection = bootstrap.connect(remoteAddress)
openChannels.add(connection.awaitUninterruptibly.getChannel)
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
log.slf4j.error("Remote client connection to [{}] has failed", remoteAddress)
log.slf4j.debug("Remote client connection failed", connection.getCause)
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
log.slf4j.error("Remote client connection to [{}] has failed", remoteAddress)
log.slf4j.debug("Remote client connection failed", connection.getCause)
false
} else {
notifyListeners(RemoteClientStarted(module, remoteAddress))
true
}
} match {
case true => true
case false if reconnectIfAlreadyConnected =>
isAuthenticated.set(false)
log.slf4j.debug("Remote client reconnecting to [{}]", remoteAddress)
openChannels.remove(connection.getChannel)
connection.getChannel.close
connection = bootstrap.connect(remoteAddress)
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
log.slf4j.error("Reconnection to [{}] has failed", remoteAddress)
log.slf4j.debug("Reconnection failed", connection.getCause)
false
} else true
case false => false
}
notifyListeners(RemoteClientStarted(module, remoteAddress))
}
def shutdown = runSwitch switchOff {
@ -386,15 +415,7 @@ class RemoteClientHandler(
timer.newTimeout(new TimerTask() {
def run(timeout: Timeout) = {
client.openChannels.remove(event.getChannel)
client.isAuthenticated.set(false)
log.slf4j.debug("Remote client reconnecting to [{}]", remoteAddress)
client.connection = bootstrap.connect(remoteAddress)
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
if (!client.connection.isSuccess) {
client.notifyListeners(RemoteClientError(client.connection.getCause, client.module, client.remoteAddress))
log.slf4j.error("Reconnection to [{}] has failed", remoteAddress)
log.slf4j.debug("Reconnection failed", client.connection.getCause)
}
client.connect(reconnectIfAlreadyConnected = true)
}
}, RemoteClient.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS)
} else spawn { client.shutdown }