diff --git a/akka-remote/src/main/scala/akka/remote/RemoteShared.scala b/akka-remote/src/main/scala/akka/remote/RemoteShared.scala index 0ae5971866..ee4e5cf809 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteShared.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteShared.scala @@ -44,26 +44,4 @@ object RemoteServerSettings { } val BACKLOG = config.getInt("akka.remote.server.backlog", 4096) - - val SECURE = { - /*if (config.getBool("akka.remote.ssl.service",false)) { - val properties = List( - ("key-store-type" , "keyStoreType"), - ("key-store" , "keyStore"), - ("key-store-pass" , "keyStorePassword"), - ("trust-store-type", "trustStoreType"), - ("trust-store" , "trustStore"), - ("trust-store-pass", "trustStorePassword") - ).map(x => ("akka.remote.ssl." + x._1, "javax.net.ssl." + x._2)) - - // If property is not set, and we have a value from our akka.conf, use that value - for { - p <- properties if System.getProperty(p._2) eq null - c <- config.getString(p._1) - } System.setProperty(p._2, c) - - if (config.getBool("akka.remote.ssl.debug", false)) System.setProperty("javax.net.debug","ssl") - true - } else */false - } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index d4f56dce23..0e85bc70b4 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -361,28 +361,18 @@ class ActiveRemoteClientPipelineFactory( client: ActiveRemoteClient) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { - def join(ch: ChannelHandler*) = Array[ChannelHandler](ch: _*) - - lazy val engine = { - val e = RemoteServerSslContext.client.createSSLEngine() - e.setEnabledCipherSuites(e.getSupportedCipherSuites) //TODO is this sensible? - e.setUseClientMode(true) - e - } - - val ssl = if (RemoteServerSettings.SECURE) join(new SslHandler(engine)) else join() - val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.toMillis.toInt) - val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4) - val lenPrep = new LengthFieldPrepender(4) + val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.toMillis.toInt) + val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4) + val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder - val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match { - case "zlib" => (join(new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) - case _ => (join(), join()) + val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match { + case "zlib" => (new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil) + case _ => (Nil,Nil) } val remoteClient = new ActiveRemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client) - val stages = ssl ++ join(timeout) ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteClient) + val stages: List[ChannelHandler] = timeout :: dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteClient :: Nil new StaticChannelPipeline(stages: _*) } } @@ -465,20 +455,8 @@ class ActiveRemoteClientHandler( } override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - def connect = { - client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) - client.resetReconnectionTimeWindow - } - - if (RemoteServerSettings.SECURE) { - val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) - sslHandler.handshake.addListener(new ChannelFutureListener { - def operationComplete(future: ChannelFuture): Unit = { - if (future.isSuccess) connect - else throw new RemoteClientException("Could not establish SSL handshake", client.module, client.remoteAddress) - } - }) - } else connect + client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) + client.resetReconnectionTimeWindow } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { @@ -777,27 +755,17 @@ class RemoteServerPipelineFactory( import RemoteServerSettings._ def getPipeline: ChannelPipeline = { - def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*) - - lazy val engine = { - val e = RemoteServerSslContext.server.createSSLEngine() - e.setEnabledCipherSuites(e.getSupportedCipherSuites) //TODO is this sensible? - e.setUseClientMode(false) - e - } - - val ssl = if(SECURE) join(new SslHandler(engine)) else join() val lenDec = new LengthFieldBasedFrameDecoder(MESSAGE_FRAME_SIZE, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder val (enc, dec) = COMPRESSION_SCHEME match { - case "zlib" => (join(new ZlibEncoder(ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) - case _ => (join(), join()) + case "zlib" => (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil) + case _ => (Nil, Nil) } val remoteServer = new RemoteServerHandler(name, openChannels, loader, server) - val stages = ssl ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteServer) + val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteServer :: Nil new StaticChannelPipeline(stages: _*) } } @@ -847,20 +815,7 @@ class RemoteServerHandler( val clientAddress = getClientAddress(ctx) sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]()) typedSessionActors.set(event.getChannel(), new ConcurrentHashMap[String, AnyRef]()) - if (SECURE) { - val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) - // Begin handshake. - sslHandler.handshake().addListener(new ChannelFutureListener { - def operationComplete(future: ChannelFuture): Unit = { - if (future.isSuccess) { - openChannels.add(future.getChannel) - server.notifyListeners(RemoteServerClientConnected(server, clientAddress)) - } else future.getChannel.close - } - }) - } else { - server.notifyListeners(RemoteServerClientConnected(server, clientAddress)) - } + server.notifyListeners(RemoteServerClientConnected(server, clientAddress)) if (REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) // signal that this is channel initialization, which will need authentication }