Added changes to Netty pipelines to support SSL/TLS. Fixes #1978
1) Netty server and client pipelines updated to conditionally load keystore/truststore if SSL is enabled in the config 2) Supports any available encryption protocol via 'ssl-protocol' 3) Supported encryption algorithms are specified via 'ssl-encryption-protocol' config key Conflicts: akka-remote/src/main/scala/akka/remote/netty/Client.scala akka-remote/src/main/scala/akka/remote/netty/Server.scala akka-remote/src/main/scala/akka/remote/netty/Settings.scala
This commit is contained in:
parent
35aaa220aa
commit
dbc3d91395
5 changed files with 286 additions and 2 deletions
|
|
@ -155,6 +155,33 @@ akka {
|
|||
|
||||
# (O) Maximum time window that a client should try to reconnect for
|
||||
reconnection-time-window = 600s
|
||||
|
||||
# (I&O) Enable SSL/TLS encryption.
|
||||
# This must be enabled on both the client and server to work.
|
||||
enable-ssl = off
|
||||
|
||||
# (I) This is the Java Key Store used by the server connection
|
||||
ssl-key-store = "keystore"
|
||||
|
||||
# This password is used for decrypting the key store
|
||||
ssl-key-store-password = "changeme"
|
||||
|
||||
# (O) This is the Java Key Store used by the client connection
|
||||
ssl-trust-store = "truststore"
|
||||
|
||||
# This password is used for decrypting the trust store
|
||||
ssl-trust-store-password = "changeme"
|
||||
|
||||
# (I&O) Protocol to use for SSL encryption, choose from:
|
||||
# Java 6 & 7:
|
||||
# SSLv3, TLSv1,
|
||||
# Java 7:
|
||||
# TLSv1.1, TLSv1.2
|
||||
ssl-protocol = "TLSv1"
|
||||
|
||||
# You need to install the JCE Unlimited Strength Jurisdiction Policy Files to use AES 256
|
||||
# More info here: http://docs.oracle.com/javase/7/docs/technotes/guides/security/SunProviders.html#SunJCEProvider
|
||||
ssl-supported-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,19 @@ import akka.actor.{ Address, ActorRef }
|
|||
import akka.AkkaException
|
||||
import akka.event.Logging
|
||||
import akka.util.Switch
|
||||
import akka.actor.ActorRef
|
||||
import org.jboss.netty.channel.ChannelFutureListener
|
||||
import akka.remote.RemoteClientWriteFailed
|
||||
import java.net.InetAddress
|
||||
import java.security.{ SecureRandom, KeyStore, GeneralSecurityException }
|
||||
import org.jboss.netty.util.TimerTask
|
||||
import org.jboss.netty.util.Timeout
|
||||
import java.util.concurrent.TimeUnit
|
||||
import org.jboss.netty.handler.timeout.{ IdleState, IdleStateEvent, IdleStateAwareChannelHandler, IdleStateHandler }
|
||||
import java.security.cert.X509Certificate
|
||||
import javax.net.ssl.{ SSLContext, X509TrustManager, TrustManagerFactory, TrustManager }
|
||||
import org.jboss.netty.handler.ssl.SslHandler
|
||||
import java.io.FileInputStream
|
||||
|
||||
/**
|
||||
* This is the abstract baseclass for netty remote clients, currently there's only an
|
||||
|
|
@ -310,6 +323,81 @@ private[akka] class PassiveRemoteClient(val currentChannel: Channel,
|
|||
netty: NettyRemoteTransport,
|
||||
remoteAddress: Address) extends RemoteClient(netty, remoteAddress) {
|
||||
|
||||
import client.netty.settings
|
||||
|
||||
def initTLS(trustStorePath: String, trustStorePassword: String): Option[SSLContext] = {
|
||||
if (trustStorePath != null && trustStorePassword != null)
|
||||
try {
|
||||
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
|
||||
val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
|
||||
val stream = new FileInputStream(trustStorePath)
|
||||
trustStore.load(stream, trustStorePassword.toCharArray)
|
||||
trustManagerFactory.init(trustStore);
|
||||
val trustManagers: Array[TrustManager] = trustManagerFactory.getTrustManagers
|
||||
|
||||
val sslContext = SSLContext.getInstance("TLS")
|
||||
sslContext.init(null, trustManagers, new SecureRandom())
|
||||
Some(sslContext)
|
||||
} catch {
|
||||
case e: GeneralSecurityException ⇒ {
|
||||
client.log.error(e, "TLS connection could not be established. TLS is not used!");
|
||||
None
|
||||
}
|
||||
}
|
||||
else {
|
||||
client.log.error("TLS connection could not be established because trust store details are missing")
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
def getSSLHandler_? : Option[SslHandler] = {
|
||||
val sslContext: Option[SSLContext] = {
|
||||
if (settings.EnableSSL) {
|
||||
client.log.debug("Client SSL is enabled, initialising ...")
|
||||
initTLS(settings.SSLTrustStore.get, settings.SSLTrustStorePassword.get)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
if (sslContext.isDefined) {
|
||||
client.log.debug("Client Using SSL context to create SSLEngine ...")
|
||||
val sslEngine = sslContext.get.createSSLEngine
|
||||
sslEngine.setUseClientMode(true)
|
||||
sslEngine.setEnabledCipherSuites(settings.SSLSupportedAlgorithms.toArray.map(_.toString))
|
||||
Some(new SslHandler(sslEngine))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
def getPipeline: ChannelPipeline = {
|
||||
val sslHandler = getSSLHandler_?
|
||||
val timeout = new IdleStateHandler(client.netty.timer,
|
||||
settings.ReadTimeout.toSeconds.toInt,
|
||||
settings.WriteTimeout.toSeconds.toInt,
|
||||
settings.AllTimeout.toSeconds.toInt)
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(settings.MessageFrameSize, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val messageDec = new RemoteMessageDecoder
|
||||
val messageEnc = new RemoteMessageEncoder(client.netty)
|
||||
val remoteClient = new ActiveRemoteClientHandler(name, bootstrap, remoteAddress, localAddress, client.netty.timer, client)
|
||||
|
||||
val stages: List[ChannelHandler] = timeout :: lenDec :: messageDec :: lenPrep :: messageEnc :: executionHandler :: remoteClient :: Nil
|
||||
if (sslHandler.isDefined) {
|
||||
client.log.debug("Client creating pipeline with SSL handler...")
|
||||
new StaticChannelPipeline(sslHandler.get :: stages: _*)
|
||||
} else {
|
||||
client.log.debug("Client creating pipeline without SSL handler...")
|
||||
new StaticChannelPipeline(stages: _*)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class PassiveRemoteClient(val currentChannel: Channel,
|
||||
netty: NettyRemoteTransport,
|
||||
remoteAddress: Address)
|
||||
extends RemoteClient(netty, remoteAddress) {
|
||||
|
||||
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = runSwitch switchOn {
|
||||
netty.notifyListeners(RemoteClientStarted(netty, remoteAddress))
|
||||
log.debug("Starting remote client connection to [{}]", remoteAddress)
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ package akka.remote.netty
|
|||
|
||||
import java.net.InetSocketAddress
|
||||
import java.util.concurrent.Executors
|
||||
import java.io.FileNotFoundException
|
||||
import scala.Option.option2Iterable
|
||||
import org.jboss.netty.bootstrap.ServerBootstrap
|
||||
import org.jboss.netty.channel.ChannelHandler.Sharable
|
||||
|
|
@ -12,13 +13,17 @@ import org.jboss.netty.channel.group.ChannelGroup
|
|||
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
|
||||
import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder }
|
||||
import org.jboss.netty.handler.execution.ExecutionHandler
|
||||
import akka.event.Logging
|
||||
import akka.remote.RemoteProtocol.{ RemoteControlProtocol, CommandType, AkkaRemoteProtocol }
|
||||
import akka.remote.{ RemoteServerShutdown, RemoteServerError, RemoteServerClientDisconnected, RemoteServerClientConnected, RemoteServerClientClosed, RemoteProtocol, RemoteMessage }
|
||||
import akka.actor.Address
|
||||
import java.net.InetAddress
|
||||
import akka.actor.ActorSystemImpl
|
||||
import org.jboss.netty.channel._
|
||||
import org.jboss.netty.handler.ssl.SslHandler
|
||||
import java.security.{ SecureRandom, KeyStore, GeneralSecurityException }
|
||||
import javax.net.ssl.{ KeyManagerFactory, SSLContext }
|
||||
import java.io.FileInputStream
|
||||
import akka.event.{ LoggingAdapter, Logging }
|
||||
|
||||
private[akka] class NettyRemoteServer(val netty: NettyRemoteTransport) {
|
||||
|
||||
|
|
@ -26,6 +31,8 @@ private[akka] class NettyRemoteServer(val netty: NettyRemoteTransport) {
|
|||
|
||||
val ip = InetAddress.getByName(settings.Hostname)
|
||||
|
||||
lazy val log = Logging(netty.system, "NettyRemoteServer(" + ip + ")")
|
||||
|
||||
private val factory =
|
||||
settings.UseDispatcherForIO match {
|
||||
case Some(id) ⇒
|
||||
|
|
@ -80,6 +87,81 @@ private[akka] class NettyRemoteServer(val netty: NettyRemoteTransport) {
|
|||
}
|
||||
}
|
||||
|
||||
class RemoteServerPipelineFactory(
|
||||
val openChannels: ChannelGroup,
|
||||
val executionHandler: ExecutionHandler,
|
||||
val netty: NettyRemoteTransport,
|
||||
val log: LoggingAdapter) extends ChannelPipelineFactory {
|
||||
|
||||
import netty.settings
|
||||
|
||||
def initTLS(keyStorePath: String, keyStorePassword: String): Option[SSLContext] = {
|
||||
if (keyStorePath != null && keyStorePassword != null) {
|
||||
try {
|
||||
val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
|
||||
val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
|
||||
val stream = new FileInputStream(keyStorePath)
|
||||
keyStore.load(stream, keyStorePassword.toCharArray)
|
||||
factory.init(keyStore, keyStorePassword.toCharArray)
|
||||
val sslContext = SSLContext.getInstance(settings.SSLProtocol.get)
|
||||
sslContext.init(factory.getKeyManagers, null, new SecureRandom())
|
||||
Some(sslContext)
|
||||
} catch {
|
||||
case e: FileNotFoundException ⇒ {
|
||||
log.error(e, "TLS connection could not be established because keystore could not be loaded")
|
||||
None
|
||||
}
|
||||
case e: GeneralSecurityException ⇒ {
|
||||
log.error(e, "TLS connection could not be established")
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.error("TLS connection could not be established because key store details are missing")
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
def getSSLHandler_? : Option[SslHandler] = {
|
||||
val sslContext: Option[SSLContext] = {
|
||||
if (settings.EnableSSL) {
|
||||
log.debug("SSL is enabled, initialising...")
|
||||
initTLS(settings.SSLKeyStore.get, settings.SSLKeyStorePassword.get)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
if (sslContext.isDefined) {
|
||||
log.debug("Using SSL context to create SSLEngine...")
|
||||
val sslEngine = sslContext.get.createSSLEngine
|
||||
sslEngine.setUseClientMode(false)
|
||||
sslEngine.setEnabledCipherSuites(settings.SSLSupportedAlgorithms.toArray.map(_.toString))
|
||||
Some(new SslHandler(sslEngine))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
def getPipeline: ChannelPipeline = {
|
||||
val sslHandler = getSSLHandler_?
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(settings.MessageFrameSize, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val messageDec = new RemoteMessageDecoder
|
||||
val messageEnc = new RemoteMessageEncoder(netty)
|
||||
|
||||
val authenticator = if (settings.RequireCookie) new RemoteServerAuthenticationHandler(settings.SecureCookie) :: Nil else Nil
|
||||
val remoteServer = new RemoteServerHandler(openChannels, netty)
|
||||
val stages: List[ChannelHandler] = lenDec :: messageDec :: lenPrep :: messageEnc :: executionHandler :: authenticator ::: remoteServer :: Nil
|
||||
if (sslHandler.isDefined) {
|
||||
log.debug("Creating pipeline with SSL handler...")
|
||||
new StaticChannelPipeline(sslHandler.get :: stages: _*)
|
||||
} else {
|
||||
log.debug("Creating pipeline without SSL handler...")
|
||||
new StaticChannelPipeline(stages: _*)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ChannelHandler.Sharable
|
||||
private[akka] class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends SimpleChannelUpstreamHandler {
|
||||
val authenticated = new AnyRef
|
||||
|
|
|
|||
|
|
@ -73,4 +73,45 @@ private[akka] class NettySettings(config: Config, val systemName: String) {
|
|||
case sz ⇒ sz
|
||||
}
|
||||
|
||||
val SSLKeyStore = getString("ssl-key-store") match {
|
||||
case "" ⇒ None
|
||||
case keyStore ⇒ Some(keyStore)
|
||||
}
|
||||
|
||||
val SSLTrustStore = getString("ssl-trust-store") match {
|
||||
case "" ⇒ None
|
||||
case trustStore ⇒ Some(trustStore)
|
||||
}
|
||||
|
||||
val SSLKeyStorePassword = getString("ssl-key-store-password") match {
|
||||
case "" ⇒ None
|
||||
case password ⇒ Some(password)
|
||||
}
|
||||
|
||||
val SSLTrustStorePassword = getString("ssl-trust-store-password") match {
|
||||
case "" ⇒ None
|
||||
case password ⇒ Some(password)
|
||||
}
|
||||
|
||||
val SSLSupportedAlgorithms = getStringList("ssl-supported-algorithms")
|
||||
|
||||
val SSLProtocol = getString("ssl-protocol") match {
|
||||
case "" ⇒ None
|
||||
case protocol ⇒ Some(protocol)
|
||||
}
|
||||
|
||||
val EnableSSL = {
|
||||
val enableSSL = getBoolean("enable-ssl")
|
||||
if (enableSSL) {
|
||||
if (SSLProtocol.isEmpty) throw new ConfigurationException(
|
||||
"Configuration option 'akka.remote.netty.enable-ssl is turned on but no protocol is defined in 'akka.remote.netty.ssl-protocol'.")
|
||||
if (SSLKeyStore.isEmpty && SSLTrustStore.isEmpty) throw new ConfigurationException(
|
||||
"Configuration option 'akka.remote.netty.enable-ssl is turned on but no key/trust store is defined in 'akka.remote.netty.ssl-key-store' / 'akka.remote.netty.ssl-trust-store'.")
|
||||
if (SSLKeyStore.isDefined && SSLKeyStorePassword.isEmpty) throw new ConfigurationException(
|
||||
"Configuration option 'akka.remote.netty.ssl-key-store' is defined but no key-store password is defined in 'akka.remote.netty.ssl-key-store-password'.")
|
||||
if (SSLTrustStore.isDefined && SSLTrustStorePassword.isEmpty) throw new ConfigurationException(
|
||||
"Configuration option 'akka.remote.netty.ssl-trust-store' is defined but no trust-store password is defined in 'akka.remote.netty.ssl-trust-store-password'.")
|
||||
}
|
||||
enableSSL
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,46 @@
|
|||
package akka.remote
|
||||
|
||||
import akka.testkit._
|
||||
import akka.actor._
|
||||
import com.typesafe.config._
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.util.duration._
|
||||
import akka.util.Duration
|
||||
import akka.remote.netty.NettyRemoteTransport
|
||||
import java.util.ArrayList
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class Ticket1978ConfigSpec extends AkkaSpec("""
|
||||
akka {
|
||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
remote.netty {
|
||||
hostname = localhost
|
||||
port = 12345
|
||||
}
|
||||
actor.deployment {
|
||||
/blub.remote = "akka://remote-sys@localhost:12346"
|
||||
/looker/child.remote = "akka://remote-sys@localhost:12346"
|
||||
/looker/child/grandchild.remote = "akka://RemoteCommunicationSpec@localhost:12345"
|
||||
}
|
||||
}
|
||||
""") with ImplicitSender with DefaultTimeout {
|
||||
|
||||
"SSL Remoting" must {
|
||||
"be able to parse these extra Netty config elements" in {
|
||||
val settings =
|
||||
system.asInstanceOf[ExtendedActorSystem]
|
||||
.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
.transport.asInstanceOf[NettyRemoteTransport]
|
||||
.settings
|
||||
import settings._
|
||||
|
||||
EnableSSL must be(false)
|
||||
SSLKeyStore must be(Some("keystore"))
|
||||
SSLKeyStorePassword must be(Some("changeme"))
|
||||
SSLTrustStore must be(Some("truststore"))
|
||||
SSLTrustStorePassword must be(Some("changeme"))
|
||||
SSLProtocol must be(Some("TLSv1"))
|
||||
SSLSupportedAlgorithms must be(java.util.Arrays.asList("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue