Reconnect now possible in RemoteClient
This commit is contained in:
parent
4b7bf0ded0
commit
57cdccd349
1 changed files with 30 additions and 25 deletions
|
|
@ -181,41 +181,43 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, val loade
|
|||
extends Logging with ListenerManagement {
|
||||
val name = "RemoteClient@" + hostname + "::" + port
|
||||
|
||||
@volatile private[remote] var isRunning = false
|
||||
private val futures = new ConcurrentHashMap[Long, CompletableFuture[_]]
|
||||
private val supervisors = new ConcurrentHashMap[String, ActorRef]
|
||||
|
||||
private val channelFactory = new NioClientSocketChannelFactory(
|
||||
Executors.newCachedThreadPool,
|
||||
Executors.newCachedThreadPool)
|
||||
|
||||
private val bootstrap = new ClientBootstrap(channelFactory)
|
||||
private val timer = new HashedWheelTimer
|
||||
private val remoteAddress = new InetSocketAddress(hostname, port)
|
||||
|
||||
private[remote] var connection: ChannelFuture = _
|
||||
private[remote] val openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName);
|
||||
@volatile private[remote] var isRunning = false
|
||||
@volatile private var bootstrap: ClientBootstrap = _
|
||||
@volatile private var reconnectionTimeWindowStart = 0L
|
||||
@volatile private[remote] var connection: ChannelFuture = _
|
||||
@volatile private[remote] var openChannels: DefaultChannelGroup = _
|
||||
@volatile private var timer: HashedWheelTimer = _
|
||||
|
||||
private val reconnectionTimeWindow =
|
||||
Duration(config.getInt("akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis
|
||||
@volatile private var reconnectionTimeWindowStart = 0L
|
||||
|
||||
bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this))
|
||||
bootstrap.setOption("tcpNoDelay", true)
|
||||
bootstrap.setOption("keepAlive", true)
|
||||
|
||||
def connect = synchronized {
|
||||
if (!isRunning) {
|
||||
openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName)
|
||||
timer = new HashedWheelTimer
|
||||
bootstrap = new ClientBootstrap(
|
||||
new NioClientSocketChannelFactory(
|
||||
Executors.newCachedThreadPool,Executors.newCachedThreadPool
|
||||
)
|
||||
)
|
||||
bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this))
|
||||
bootstrap.setOption("tcpNoDelay", true)
|
||||
bootstrap.setOption("keepAlive", true)
|
||||
connection = bootstrap.connect(remoteAddress)
|
||||
log.info("Starting remote client connection to [%s:%s]", hostname, port)
|
||||
// Wait until the connection attempt succeeds or fails.
|
||||
val channel = connection.awaitUninterruptibly.getChannel
|
||||
openChannels.add(channel)
|
||||
if (!connection.isSuccess) {
|
||||
foreachListener(l => l ! RemoteClientError(connection.getCause, this))
|
||||
foreachListener(_ ! RemoteClientError(connection.getCause, this))
|
||||
log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
|
||||
}
|
||||
foreachListener(l => l ! RemoteClientStarted(this))
|
||||
foreachListener(_ ! RemoteClientStarted(this))
|
||||
isRunning = true
|
||||
}
|
||||
}
|
||||
|
|
@ -224,10 +226,14 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, val loade
|
|||
log.info("Shutting down %s", name)
|
||||
if (isRunning) {
|
||||
isRunning = false
|
||||
foreachListener(l => l ! RemoteClientShutdown(this))
|
||||
foreachListener(_ ! RemoteClientShutdown(this))
|
||||
timer.stop
|
||||
timer = null
|
||||
openChannels.close.awaitUninterruptibly
|
||||
openChannels = null
|
||||
bootstrap.releaseExternalResources
|
||||
bootstrap = null
|
||||
connection = null
|
||||
log.info("%s has been shut down", name)
|
||||
}
|
||||
}
|
||||
|
|
@ -257,7 +263,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, val loade
|
|||
}
|
||||
} else {
|
||||
val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this)
|
||||
foreachListener(l => l ! RemoteClientError(exception, this))
|
||||
foreachListener(_ ! RemoteClientError(exception, this))
|
||||
throw exception
|
||||
}
|
||||
|
||||
|
|
@ -373,12 +379,12 @@ class RemoteClientHandler(
|
|||
futures.remove(reply.getId)
|
||||
} else {
|
||||
val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client)
|
||||
client.foreachListener(l => l ! RemoteClientError(exception, client))
|
||||
client.foreachListener(_ ! RemoteClientError(exception, client))
|
||||
throw exception
|
||||
}
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
client.foreachListener(l => l ! RemoteClientError(e, client))
|
||||
client.foreachListener(_ ! RemoteClientError(e, client))
|
||||
log.error("Unexpected exception in remote client handler: %s", e)
|
||||
throw e
|
||||
}
|
||||
|
|
@ -393,8 +399,7 @@ class RemoteClientHandler(
|
|||
client.connection = bootstrap.connect(remoteAddress)
|
||||
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
|
||||
if (!client.connection.isSuccess) {
|
||||
client.foreachListener(l =>
|
||||
l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause, client))
|
||||
client.foreachListener(_ ! RemoteClientError(client.connection.getCause, client))
|
||||
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
|
||||
}
|
||||
}
|
||||
|
|
@ -404,7 +409,7 @@ class RemoteClientHandler(
|
|||
|
||||
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
def connect = {
|
||||
client.foreachListener(l => l ! RemoteClientConnected(client))
|
||||
client.foreachListener(_ ! RemoteClientConnected(client))
|
||||
log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
|
||||
client.resetReconnectionTimeWindow
|
||||
}
|
||||
|
|
@ -421,12 +426,12 @@ class RemoteClientHandler(
|
|||
}
|
||||
|
||||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
client.foreachListener(l => l ! RemoteClientDisconnected(client))
|
||||
client.foreachListener(_ ! RemoteClientDisconnected(client))
|
||||
log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress)
|
||||
}
|
||||
|
||||
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
|
||||
client.foreachListener(l => l ! RemoteClientError(event.getCause, client))
|
||||
client.foreachListener(_ ! RemoteClientError(event.getCause, client))
|
||||
log.error(event.getCause, "Unexpected exception from downstream in remote client")
|
||||
event.getChannel.close
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue