diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 97b85895ed..5c7b802f1c 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -155,6 +155,33 @@ akka { # (O) Maximum time window that a client should try to reconnect for reconnection-time-window = 600s + + # (I&O) Enable SSL/TLS encryption. + # This must be enabled on both the client and server to work. + enable-ssl = off + + # (I) This is the Java Key Store used by the server connection + ssl-key-store = "keystore" + + # This password is used for decrypting the key store + ssl-key-store-password = "changeme" + + # (O) This is the Java Key Store used by the client connection + ssl-trust-store = "truststore" + + # This password is used for decrypting the trust store + ssl-trust-store-password = "changeme" + + # (I&O) Protocol to use for SSL encryption, choose from: + # Java 6 & 7: + # SSLv3, TLSv1, + # Java 7: + # TLSv1.1, TLSv1.2 + ssl-protocol = "TLSv1" + + # You need to install the JCE Unlimited Strength Jurisdiction Policy Files to use AES 256 + # More info here: http://docs.oracle.com/javase/7/docs/technotes/guides/security/SunProviders.html#SunJCEProvider + ssl-supported-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"] } } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala index c1737831da..b1b37a08f8 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -18,6 +18,19 @@ import akka.actor.{ Address, ActorRef } import akka.AkkaException import akka.event.Logging import akka.util.Switch +import akka.actor.ActorRef +import org.jboss.netty.channel.ChannelFutureListener +import akka.remote.RemoteClientWriteFailed +import java.net.InetAddress +import java.security.{ SecureRandom, KeyStore, GeneralSecurityException } +import org.jboss.netty.util.TimerTask +import org.jboss.netty.util.Timeout +import java.util.concurrent.TimeUnit +import org.jboss.netty.handler.timeout.{ IdleState, IdleStateEvent, IdleStateAwareChannelHandler, IdleStateHandler } +import java.security.cert.X509Certificate +import javax.net.ssl.{ SSLContext, X509TrustManager, TrustManagerFactory, TrustManager } +import org.jboss.netty.handler.ssl.SslHandler +import java.io.FileInputStream /** * This is the abstract baseclass for netty remote clients, currently there's only an @@ -310,6 +323,81 @@ private[akka] class PassiveRemoteClient(val currentChannel: Channel, netty: NettyRemoteTransport, remoteAddress: Address) extends RemoteClient(netty, remoteAddress) { + import client.netty.settings + + def initTLS(trustStorePath: String, trustStorePassword: String): Option[SSLContext] = { + if (trustStorePath != null && trustStorePassword != null) + try { + val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) + val trustStore = KeyStore.getInstance(KeyStore.getDefaultType) + val stream = new FileInputStream(trustStorePath) + trustStore.load(stream, trustStorePassword.toCharArray) + trustManagerFactory.init(trustStore); + val trustManagers: Array[TrustManager] = trustManagerFactory.getTrustManagers + + val sslContext = SSLContext.getInstance("TLS") + sslContext.init(null, trustManagers, new SecureRandom()) + Some(sslContext) + } catch { + case e: GeneralSecurityException ⇒ { + client.log.error(e, "TLS connection could not be established. TLS is not used!"); + None + } + } + else { + client.log.error("TLS connection could not be established because trust store details are missing") + None + } + } + + def getSSLHandler_? : Option[SslHandler] = { + val sslContext: Option[SSLContext] = { + if (settings.EnableSSL) { + client.log.debug("Client SSL is enabled, initialising ...") + initTLS(settings.SSLTrustStore.get, settings.SSLTrustStorePassword.get) + } else { + None + } + } + if (sslContext.isDefined) { + client.log.debug("Client Using SSL context to create SSLEngine ...") + val sslEngine = sslContext.get.createSSLEngine + sslEngine.setUseClientMode(true) + sslEngine.setEnabledCipherSuites(settings.SSLSupportedAlgorithms.toArray.map(_.toString)) + Some(new SslHandler(sslEngine)) + } else { + None + } + } + + def getPipeline: ChannelPipeline = { + val sslHandler = getSSLHandler_? + val timeout = new IdleStateHandler(client.netty.timer, + settings.ReadTimeout.toSeconds.toInt, + settings.WriteTimeout.toSeconds.toInt, + settings.AllTimeout.toSeconds.toInt) + val lenDec = new LengthFieldBasedFrameDecoder(settings.MessageFrameSize, 0, 4, 0, 4) + val lenPrep = new LengthFieldPrepender(4) + val messageDec = new RemoteMessageDecoder + val messageEnc = new RemoteMessageEncoder(client.netty) + val remoteClient = new ActiveRemoteClientHandler(name, bootstrap, remoteAddress, localAddress, client.netty.timer, client) + + val stages: List[ChannelHandler] = timeout :: lenDec :: messageDec :: lenPrep :: messageEnc :: executionHandler :: remoteClient :: Nil + if (sslHandler.isDefined) { + client.log.debug("Client creating pipeline with SSL handler...") + new StaticChannelPipeline(sslHandler.get :: stages: _*) + } else { + client.log.debug("Client creating pipeline without SSL handler...") + new StaticChannelPipeline(stages: _*) + } + } +} + +class PassiveRemoteClient(val currentChannel: Channel, + netty: NettyRemoteTransport, + remoteAddress: Address) + extends RemoteClient(netty, remoteAddress) { + def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = runSwitch switchOn { netty.notifyListeners(RemoteClientStarted(netty, remoteAddress)) log.debug("Starting remote client connection to [{}]", remoteAddress) diff --git a/akka-remote/src/main/scala/akka/remote/netty/Server.scala b/akka-remote/src/main/scala/akka/remote/netty/Server.scala index cc3310fada..ace45677f1 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Server.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Server.scala @@ -5,6 +5,7 @@ package akka.remote.netty import java.net.InetSocketAddress import java.util.concurrent.Executors +import java.io.FileNotFoundException import scala.Option.option2Iterable import org.jboss.netty.bootstrap.ServerBootstrap import org.jboss.netty.channel.ChannelHandler.Sharable @@ -12,13 +13,17 @@ import org.jboss.netty.channel.group.ChannelGroup import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder } import org.jboss.netty.handler.execution.ExecutionHandler -import akka.event.Logging import akka.remote.RemoteProtocol.{ RemoteControlProtocol, CommandType, AkkaRemoteProtocol } import akka.remote.{ RemoteServerShutdown, RemoteServerError, RemoteServerClientDisconnected, RemoteServerClientConnected, RemoteServerClientClosed, RemoteProtocol, RemoteMessage } import akka.actor.Address import java.net.InetAddress import akka.actor.ActorSystemImpl import org.jboss.netty.channel._ +import org.jboss.netty.handler.ssl.SslHandler +import java.security.{ SecureRandom, KeyStore, GeneralSecurityException } +import javax.net.ssl.{ KeyManagerFactory, SSLContext } +import java.io.FileInputStream +import akka.event.{ LoggingAdapter, Logging } private[akka] class NettyRemoteServer(val netty: NettyRemoteTransport) { @@ -26,6 +31,8 @@ private[akka] class NettyRemoteServer(val netty: NettyRemoteTransport) { val ip = InetAddress.getByName(settings.Hostname) + lazy val log = Logging(netty.system, "NettyRemoteServer(" + ip + ")") + private val factory = settings.UseDispatcherForIO match { case Some(id) ⇒ @@ -80,6 +87,81 @@ private[akka] class NettyRemoteServer(val netty: NettyRemoteTransport) { } } +class RemoteServerPipelineFactory( + val openChannels: ChannelGroup, + val executionHandler: ExecutionHandler, + val netty: NettyRemoteTransport, + val log: LoggingAdapter) extends ChannelPipelineFactory { + + import netty.settings + + def initTLS(keyStorePath: String, keyStorePassword: String): Option[SSLContext] = { + if (keyStorePath != null && keyStorePassword != null) { + try { + val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) + val keyStore = KeyStore.getInstance(KeyStore.getDefaultType) + val stream = new FileInputStream(keyStorePath) + keyStore.load(stream, keyStorePassword.toCharArray) + factory.init(keyStore, keyStorePassword.toCharArray) + val sslContext = SSLContext.getInstance(settings.SSLProtocol.get) + sslContext.init(factory.getKeyManagers, null, new SecureRandom()) + Some(sslContext) + } catch { + case e: FileNotFoundException ⇒ { + log.error(e, "TLS connection could not be established because keystore could not be loaded") + None + } + case e: GeneralSecurityException ⇒ { + log.error(e, "TLS connection could not be established") + None + } + } + } else { + log.error("TLS connection could not be established because key store details are missing") + None + } + } + + def getSSLHandler_? : Option[SslHandler] = { + val sslContext: Option[SSLContext] = { + if (settings.EnableSSL) { + log.debug("SSL is enabled, initialising...") + initTLS(settings.SSLKeyStore.get, settings.SSLKeyStorePassword.get) + } else { + None + } + } + if (sslContext.isDefined) { + log.debug("Using SSL context to create SSLEngine...") + val sslEngine = sslContext.get.createSSLEngine + sslEngine.setUseClientMode(false) + sslEngine.setEnabledCipherSuites(settings.SSLSupportedAlgorithms.toArray.map(_.toString)) + Some(new SslHandler(sslEngine)) + } else { + None + } + } + + def getPipeline: ChannelPipeline = { + val sslHandler = getSSLHandler_? + val lenDec = new LengthFieldBasedFrameDecoder(settings.MessageFrameSize, 0, 4, 0, 4) + val lenPrep = new LengthFieldPrepender(4) + val messageDec = new RemoteMessageDecoder + val messageEnc = new RemoteMessageEncoder(netty) + + val authenticator = if (settings.RequireCookie) new RemoteServerAuthenticationHandler(settings.SecureCookie) :: Nil else Nil + val remoteServer = new RemoteServerHandler(openChannels, netty) + val stages: List[ChannelHandler] = lenDec :: messageDec :: lenPrep :: messageEnc :: executionHandler :: authenticator ::: remoteServer :: Nil + if (sslHandler.isDefined) { + log.debug("Creating pipeline with SSL handler...") + new StaticChannelPipeline(sslHandler.get :: stages: _*) + } else { + log.debug("Creating pipeline without SSL handler...") + new StaticChannelPipeline(stages: _*) + } + } +} + @ChannelHandler.Sharable private[akka] class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends SimpleChannelUpstreamHandler { val authenticated = new AnyRef diff --git a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala index 64bc184408..d753a743b6 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala @@ -73,4 +73,45 @@ private[akka] class NettySettings(config: Config, val systemName: String) { case sz ⇒ sz } -} + val SSLKeyStore = getString("ssl-key-store") match { + case "" ⇒ None + case keyStore ⇒ Some(keyStore) + } + + val SSLTrustStore = getString("ssl-trust-store") match { + case "" ⇒ None + case trustStore ⇒ Some(trustStore) + } + + val SSLKeyStorePassword = getString("ssl-key-store-password") match { + case "" ⇒ None + case password ⇒ Some(password) + } + + val SSLTrustStorePassword = getString("ssl-trust-store-password") match { + case "" ⇒ None + case password ⇒ Some(password) + } + + val SSLSupportedAlgorithms = getStringList("ssl-supported-algorithms") + + val SSLProtocol = getString("ssl-protocol") match { + case "" ⇒ None + case protocol ⇒ Some(protocol) + } + + val EnableSSL = { + val enableSSL = getBoolean("enable-ssl") + if (enableSSL) { + if (SSLProtocol.isEmpty) throw new ConfigurationException( + "Configuration option 'akka.remote.netty.enable-ssl is turned on but no protocol is defined in 'akka.remote.netty.ssl-protocol'.") + if (SSLKeyStore.isEmpty && SSLTrustStore.isEmpty) throw new ConfigurationException( + "Configuration option 'akka.remote.netty.enable-ssl is turned on but no key/trust store is defined in 'akka.remote.netty.ssl-key-store' / 'akka.remote.netty.ssl-trust-store'.") + if (SSLKeyStore.isDefined && SSLKeyStorePassword.isEmpty) throw new ConfigurationException( + "Configuration option 'akka.remote.netty.ssl-key-store' is defined but no key-store password is defined in 'akka.remote.netty.ssl-key-store-password'.") + if (SSLTrustStore.isDefined && SSLTrustStorePassword.isEmpty) throw new ConfigurationException( + "Configuration option 'akka.remote.netty.ssl-trust-store' is defined but no trust-store password is defined in 'akka.remote.netty.ssl-trust-store-password'.") + } + enableSSL + } +} \ No newline at end of file diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978ConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978ConfigSpec.scala new file mode 100644 index 0000000000..0d429043c2 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978ConfigSpec.scala @@ -0,0 +1,46 @@ +package akka.remote + +import akka.testkit._ +import akka.actor._ +import com.typesafe.config._ +import akka.actor.ExtendedActorSystem +import akka.util.duration._ +import akka.util.Duration +import akka.remote.netty.NettyRemoteTransport +import java.util.ArrayList + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class Ticket1978ConfigSpec extends AkkaSpec(""" +akka { + actor.provider = "akka.remote.RemoteActorRefProvider" + remote.netty { + hostname = localhost + port = 12345 + } + actor.deployment { + /blub.remote = "akka://remote-sys@localhost:12346" + /looker/child.remote = "akka://remote-sys@localhost:12346" + /looker/child/grandchild.remote = "akka://RemoteCommunicationSpec@localhost:12345" + } +} +""") with ImplicitSender with DefaultTimeout { + + "SSL Remoting" must { + "be able to parse these extra Netty config elements" in { + val settings = + system.asInstanceOf[ExtendedActorSystem] + .provider.asInstanceOf[RemoteActorRefProvider] + .transport.asInstanceOf[NettyRemoteTransport] + .settings + import settings._ + + EnableSSL must be(false) + SSLKeyStore must be(Some("keystore")) + SSLKeyStorePassword must be(Some("changeme")) + SSLTrustStore must be(Some("truststore")) + SSLTrustStorePassword must be(Some("changeme")) + SSLProtocol must be(Some("TLSv1")) + SSLSupportedAlgorithms must be(java.util.Arrays.asList("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA")) + } + } +}