diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 6d72a40a40..3c9e990ed2 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -145,11 +145,22 @@ object RemoteServer { * Life-cycle events for RemoteServer. */ sealed trait RemoteServerLifeCycleEvent -case class RemoteServerError(@BeanProperty val cause: Throwable, @BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent -case class RemoteServerShutdown(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent -case class RemoteServerStarted(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent -case class RemoteServerClientConnected(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent -case class RemoteServerClientDisconnected(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent +case class RemoteServerStarted( + @BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent +case class RemoteServerShutdown( + @BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent +case class RemoteServerError( + @BeanProperty val cause: Throwable, + @BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent +case class RemoteServerClientConnected( + @BeanProperty val server: RemoteServer, + @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent +case class RemoteServerClientDisconnected( + @BeanProperty val server: RemoteServer, + @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent +case class RemoteServerClientClosed( + @BeanProperty val server: RemoteServer, + @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent /** * Use this class if you need a more than one remote server on a specific node. @@ -418,26 +429,33 @@ class RemoteServerHandler( override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = openChannels.add(ctx.getChannel) override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - log.debug("Remote client connected to [%s]", server.name) + val clientAddress = getClientAddress(ctx) + log.debug("Remote client [%s] connected to [%s]", clientAddress, server.name) if (RemoteServer.SECURE) { val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) - // Begin handshake. sslHandler.handshake().addListener(new ChannelFutureListener { def operationComplete(future: ChannelFuture): Unit = { if (future.isSuccess) { openChannels.add(future.getChannel) - server.notifyListeners(RemoteServerClientConnected(server)) + server.notifyListeners(RemoteServerClientConnected(server, clientAddress)) } else future.getChannel.close } }) - } else server.notifyListeners(RemoteServerClientConnected(server)) + } else server.notifyListeners(RemoteServerClientConnected(server, clientAddress)) if (RemoteServer.REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) // signal that this is channel initialization, which will need authentication } + override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + val clientAddress = getClientAddress(ctx) + log.debug("Remote client [%s] disconnected from [%s]", clientAddress, server.name) + server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress)) + } + override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - log.debug("Remote client disconnected from [%s]", server.name) - server.notifyListeners(RemoteServerClientDisconnected(server)) + val clientAddress = getClientAddress(ctx) + log.debug("Remote client [%s] channel closed from [%s]", clientAddress, server.name) + server.notifyListeners(RemoteServerClientClosed(server, clientAddress)) } override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { @@ -463,6 +481,12 @@ class RemoteServerHandler( server.notifyListeners(RemoteServerError(event.getCause, server)) } + private def getClientAddress(ctx: ChannelHandlerContext): Option[InetSocketAddress] = { + val remoteAddress = ctx.getChannel.getRemoteAddress + if (remoteAddress.isInstanceOf[InetSocketAddress]) Some(remoteAddress.asInstanceOf[InetSocketAddress]) + else None + } + private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = { log.debug("Received RemoteRequestProtocol[\n%s]", request.toString) request.getActorInfo.getActorType match {