Added remote client info to remote server life-cycle events
This commit is contained in:
parent
c589c4fb55
commit
17fa581907
1 changed files with 35 additions and 11 deletions
|
|
@ -145,11 +145,22 @@ object RemoteServer {
|
|||
* Life-cycle events for RemoteServer.
|
||||
*/
|
||||
sealed trait RemoteServerLifeCycleEvent
|
||||
case class RemoteServerError(@BeanProperty val cause: Throwable, @BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
|
||||
case class RemoteServerShutdown(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
|
||||
case class RemoteServerStarted(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
|
||||
case class RemoteServerClientConnected(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
|
||||
case class RemoteServerClientDisconnected(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
|
||||
case class RemoteServerStarted(
|
||||
@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
|
||||
case class RemoteServerShutdown(
|
||||
@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
|
||||
case class RemoteServerError(
|
||||
@BeanProperty val cause: Throwable,
|
||||
@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
|
||||
case class RemoteServerClientConnected(
|
||||
@BeanProperty val server: RemoteServer,
|
||||
@BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent
|
||||
case class RemoteServerClientDisconnected(
|
||||
@BeanProperty val server: RemoteServer,
|
||||
@BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent
|
||||
case class RemoteServerClientClosed(
|
||||
@BeanProperty val server: RemoteServer,
|
||||
@BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent
|
||||
|
||||
/**
|
||||
* Use this class if you need a more than one remote server on a specific node.
|
||||
|
|
@ -418,26 +429,33 @@ class RemoteServerHandler(
|
|||
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = openChannels.add(ctx.getChannel)
|
||||
|
||||
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
log.debug("Remote client connected to [%s]", server.name)
|
||||
val clientAddress = getClientAddress(ctx)
|
||||
log.debug("Remote client [%s] connected to [%s]", clientAddress, server.name)
|
||||
if (RemoteServer.SECURE) {
|
||||
val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler])
|
||||
|
||||
// Begin handshake.
|
||||
sslHandler.handshake().addListener(new ChannelFutureListener {
|
||||
def operationComplete(future: ChannelFuture): Unit = {
|
||||
if (future.isSuccess) {
|
||||
openChannels.add(future.getChannel)
|
||||
server.notifyListeners(RemoteServerClientConnected(server))
|
||||
server.notifyListeners(RemoteServerClientConnected(server, clientAddress))
|
||||
} else future.getChannel.close
|
||||
}
|
||||
})
|
||||
} else server.notifyListeners(RemoteServerClientConnected(server))
|
||||
} else server.notifyListeners(RemoteServerClientConnected(server, clientAddress))
|
||||
if (RemoteServer.REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) // signal that this is channel initialization, which will need authentication
|
||||
}
|
||||
|
||||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
val clientAddress = getClientAddress(ctx)
|
||||
log.debug("Remote client [%s] disconnected from [%s]", clientAddress, server.name)
|
||||
server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress))
|
||||
}
|
||||
|
||||
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
log.debug("Remote client disconnected from [%s]", server.name)
|
||||
server.notifyListeners(RemoteServerClientDisconnected(server))
|
||||
val clientAddress = getClientAddress(ctx)
|
||||
log.debug("Remote client [%s] channel closed from [%s]", clientAddress, server.name)
|
||||
server.notifyListeners(RemoteServerClientClosed(server, clientAddress))
|
||||
}
|
||||
|
||||
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
|
||||
|
|
@ -463,6 +481,12 @@ class RemoteServerHandler(
|
|||
server.notifyListeners(RemoteServerError(event.getCause, server))
|
||||
}
|
||||
|
||||
private def getClientAddress(ctx: ChannelHandlerContext): Option[InetSocketAddress] = {
|
||||
val remoteAddress = ctx.getChannel.getRemoteAddress
|
||||
if (remoteAddress.isInstanceOf[InetSocketAddress]) Some(remoteAddress.asInstanceOf[InetSocketAddress])
|
||||
else None
|
||||
}
|
||||
|
||||
private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = {
|
||||
log.debug("Received RemoteRequestProtocol[\n%s]", request.toString)
|
||||
request.getActorInfo.getActorType match {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue