Removing legacy, non-functional, SSL support from akka-remote
This commit is contained in:
parent
39caa297ac
commit
f8e9c61134
2 changed files with 13 additions and 80 deletions
|
|
@ -361,28 +361,18 @@ class ActiveRemoteClientPipelineFactory(
|
|||
client: ActiveRemoteClient) extends ChannelPipelineFactory {
|
||||
|
||||
def getPipeline: ChannelPipeline = {
|
||||
def join(ch: ChannelHandler*) = Array[ChannelHandler](ch: _*)
|
||||
|
||||
lazy val engine = {
|
||||
val e = RemoteServerSslContext.client.createSSLEngine()
|
||||
e.setEnabledCipherSuites(e.getSupportedCipherSuites) //TODO is this sensible?
|
||||
e.setUseClientMode(true)
|
||||
e
|
||||
}
|
||||
|
||||
val ssl = if (RemoteServerSettings.SECURE) join(new SslHandler(engine)) else join()
|
||||
val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.toMillis.toInt)
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.toMillis.toInt)
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
|
||||
val protobufEnc = new ProtobufEncoder
|
||||
val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match {
|
||||
case "zlib" => (join(new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
|
||||
case _ => (join(), join())
|
||||
val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match {
|
||||
case "zlib" => (new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil)
|
||||
case _ => (Nil,Nil)
|
||||
}
|
||||
|
||||
val remoteClient = new ActiveRemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client)
|
||||
val stages = ssl ++ join(timeout) ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteClient)
|
||||
val stages: List[ChannelHandler] = timeout :: dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteClient :: Nil
|
||||
new StaticChannelPipeline(stages: _*)
|
||||
}
|
||||
}
|
||||
|
|
@ -465,20 +455,8 @@ class ActiveRemoteClientHandler(
|
|||
}
|
||||
|
||||
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
def connect = {
|
||||
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
|
||||
client.resetReconnectionTimeWindow
|
||||
}
|
||||
|
||||
if (RemoteServerSettings.SECURE) {
|
||||
val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler])
|
||||
sslHandler.handshake.addListener(new ChannelFutureListener {
|
||||
def operationComplete(future: ChannelFuture): Unit = {
|
||||
if (future.isSuccess) connect
|
||||
else throw new RemoteClientException("Could not establish SSL handshake", client.module, client.remoteAddress)
|
||||
}
|
||||
})
|
||||
} else connect
|
||||
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
|
||||
client.resetReconnectionTimeWindow
|
||||
}
|
||||
|
||||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
|
|
@ -777,27 +755,17 @@ class RemoteServerPipelineFactory(
|
|||
import RemoteServerSettings._
|
||||
|
||||
def getPipeline: ChannelPipeline = {
|
||||
def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*)
|
||||
|
||||
lazy val engine = {
|
||||
val e = RemoteServerSslContext.server.createSSLEngine()
|
||||
e.setEnabledCipherSuites(e.getSupportedCipherSuites) //TODO is this sensible?
|
||||
e.setUseClientMode(false)
|
||||
e
|
||||
}
|
||||
|
||||
val ssl = if(SECURE) join(new SslHandler(engine)) else join()
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
|
||||
val protobufEnc = new ProtobufEncoder
|
||||
val (enc, dec) = COMPRESSION_SCHEME match {
|
||||
case "zlib" => (join(new ZlibEncoder(ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
|
||||
case _ => (join(), join())
|
||||
case "zlib" => (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil)
|
||||
case _ => (Nil, Nil)
|
||||
}
|
||||
|
||||
val remoteServer = new RemoteServerHandler(name, openChannels, loader, server)
|
||||
val stages = ssl ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteServer)
|
||||
val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteServer :: Nil
|
||||
new StaticChannelPipeline(stages: _*)
|
||||
}
|
||||
}
|
||||
|
|
@ -847,20 +815,7 @@ class RemoteServerHandler(
|
|||
val clientAddress = getClientAddress(ctx)
|
||||
sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]())
|
||||
typedSessionActors.set(event.getChannel(), new ConcurrentHashMap[String, AnyRef]())
|
||||
if (SECURE) {
|
||||
val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler])
|
||||
// Begin handshake.
|
||||
sslHandler.handshake().addListener(new ChannelFutureListener {
|
||||
def operationComplete(future: ChannelFuture): Unit = {
|
||||
if (future.isSuccess) {
|
||||
openChannels.add(future.getChannel)
|
||||
server.notifyListeners(RemoteServerClientConnected(server, clientAddress))
|
||||
} else future.getChannel.close
|
||||
}
|
||||
})
|
||||
} else {
|
||||
server.notifyListeners(RemoteServerClientConnected(server, clientAddress))
|
||||
}
|
||||
server.notifyListeners(RemoteServerClientConnected(server, clientAddress))
|
||||
if (REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) // signal that this is channel initialization, which will need authentication
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue