From 4dab3252bdfb6eedae6dcf72b1a9ec2b689e36d2 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 16 Feb 2018 10:04:45 +0100 Subject: [PATCH] Add Artery TCP/TLS transport, #24390 * configurable SSLEngineProvider * ssl configuration * add internal outgoingTlsConnectionWithSSLEngine and bindTlsWithSSLEngine in stream Tcp * TLS tests * update remote security section in reference documentation --- akka-docs/src/main/paradox/remoting-artery.md | 96 ++++++++- akka-docs/src/main/paradox/remoting.md | 9 +- .../akka/remote/RemotingMultiNodeSpec.scala | 2 + .../artery/ArteryFailedToBindSpec.scala | 2 +- akka-remote/src/main/resources/reference.conf | 79 +++++++- .../akka/remote/RemoteActorRefProvider.scala | 3 +- .../akka/remote/artery/ArterySettings.scala | 9 +- .../artery/tcp/ArteryTcpTransport.scala | 48 ++++- .../remote/artery/tcp/SSLEngineProvider.scala | 191 ++++++++++++++++++ .../remote/artery/ArterySpecSupport.scala | 16 ++ .../artery/RemoteSendConsistencySpec.scala | 14 ++ .../akka/remote/artery/tcp/TlsTcpSpec.scala | 167 +++++++++++++++ .../main/scala/akka/stream/scaladsl/Tcp.scala | 44 +++- 13 files changed, 655 insertions(+), 25 deletions(-) create mode 100644 akka-remote/src/main/scala/akka/remote/artery/tcp/SSLEngineProvider.scala create mode 100644 akka-remote/src/test/scala/akka/remote/artery/tcp/TlsTcpSpec.scala diff --git a/akka-docs/src/main/paradox/remoting-artery.md b/akka-docs/src/main/paradox/remoting-artery.md index 0053df1871..dcce515698 100644 --- a/akka-docs/src/main/paradox/remoting-artery.md +++ b/akka-docs/src/main/paradox/remoting-artery.md @@ -345,16 +345,100 @@ Actor classes not included in the whitelist will not be allowed to be remote dep ## Remote Security -An `ActorSystem` should not be exposed via Akka Remote (Artery) over plain Aeron/UDP to an untrusted network (e.g. internet). -It should be protected by network security, such as a firewall. There is currently no support for encryption with Artery -so if network security is not considered as enough protection the classic remoting with -@ref:[TLS and mutual authentication](remoting.md#remote-tls) should be used. +An `ActorSystem` should not be exposed via Akka Remote (Artery) over plain Aeron/UDP or TCP to an untrusted +network (e.g. Internet). It should be protected by network security, such as a firewall. If that is not considered +as enough protection [TLS with mutual authentication](#remote-tls) should be enabled. -Best practice is that Akka remoting nodes should only be accessible from the adjacent network. +Best practice is that Akka remoting nodes should only be accessible from the adjacent network. Note that if TLS is +enabled with mutual authentication there is still a risk that an attacker can gain access to a valid certificate by +compromising any node with certificates issued by the same internal PKI tree. -It is also security best practice to @ref:[disable the Java serializer](#disabling-the-java-serializer) because of +It is also security best-practice to [disable the Java serializer](#disable-java-serializer) because of its multiple [known attack surfaces](https://community.hpe.com/t5/Security-Research/The-perils-of-Java-deserialization/ba-p/6838995). + +### Configuring SSL/TLS for Akka Remoting + +SSL can be used as the remote transport by using the `tls-tcp` transport: + +``` +akka.remote.artery { + transport = tls-tcp +} +``` + +Next the actual SSL/TLS parameters have to be configured: + +``` +akka.remote.artery { + transport = tls-tcp + + ssl.config-ssl-engine { + key-store = "/example/path/to/mykeystore.jks" + trust-store = "/example/path/to/mytruststore.jks" + + key-store-password = ${SSL_KEY_STORE_PASSWORD} + key-password = ${SSL_KEY_PASSWORD} + trust-store-password = ${SSL_TRUST_STORE_PASSWORD} + + protocol = "TLSv1.2" + + enabled-algorithms = [TLS_DHE_RSA_WITH_AES_128_GCM_SHA256] + + random-number-generator = "AES128CounterSecureRNG" + + hostname-verification = on + } +} +``` + +Always use [substitution from environment variables](https://github.com/lightbend/config#optional-system-or-env-variable-overrides) +for passwords. Don't define real passwords in config files. + +According to [RFC 7525](https://tools.ietf.org/html/rfc7525) the recommended algorithms to use with TLS 1.2 (as of writing this document) are: + + * TLS_DHE_RSA_WITH_AES_128_GCM_SHA256 + * TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 + * TLS_DHE_RSA_WITH_AES_256_GCM_SHA384 + * TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 + +You should always check the latest information about security and algorithm recommendations though before you configure your system. + +Creating and working with keystores and certificates is well documented in the +[Generating X.509 Certificates](http://lightbend.github.io/ssl-config/CertificateGeneration.html#using-keytool) +section of Lightbend's SSL-Config library. + +Since an Akka remoting is inherently @ref:[peer-to-peer](general/remoting.md#symmetric-communication) both the key-store as well as trust-store +need to be configured on each remoting node participating in the cluster. + +The official [Java Secure Socket Extension documentation](http://docs.oracle.com/javase/7/docs/technotes/guides/security/jsse/JSSERefGuide.html) +as well as the [Oracle documentation on creating KeyStore and TrustStores](https://docs.oracle.com/cd/E19509-01/820-3503/6nf1il6er/index.html) +are both great resources to research when setting up security on the JVM. Please consult those resources when troubleshooting +and configuring SSL. + +Mutual authentication between TLS peers is enabled by default. Mutual authentication means that the the passive side +(the TLS server side) of a connection will also request and verify a certificate from the connecting peer. +Without this mode only the client side is requesting and verifying certificates. While Akka is a peer-to-peer +technology, each connection between nodes starts out from one side (the "client") towards the other (the "server"). + +Note that if TLS is enabled with mutual authentication there is still a risk that an attacker can gain access to a +valid certificate by compromising any node with certificates issued by the same internal PKI tree. + +It's recommended that you enable hostname verification with +`akka.remote.artery.ssl.config-ssl-engine.hostname-verification=on`. +When enabled it will verify that the destination hostname matches the hostname in the peer's credentials. +An application could be exploited with URL spoofing if the hostname is not verified. + +See also a description of the settings in the @ref:[Remote Configuration](#remote-configuration-artery) section. + +@@@ note + +When using SHA1PRNG on Linux it's recommended specify `-Djava.security.egd=file:/dev/urandom` as argument +to the JVM to prevent blocking. It is NOT as secure because it reuses the seed. + +@@@ + + ### Untrusted Mode As soon as an actor system can connect to another remotely, it may in principle diff --git a/akka-docs/src/main/paradox/remoting.md b/akka-docs/src/main/paradox/remoting.md index 846b448611..f536833cf4 100644 --- a/akka-docs/src/main/paradox/remoting.md +++ b/akka-docs/src/main/paradox/remoting.md @@ -550,9 +550,9 @@ akka { key-store = "/example/path/to/mykeystore.jks" trust-store = "/example/path/to/mytruststore.jks" - key-store-password = "changeme" - key-password = "changeme" - trust-store-password = "changeme" + key-store-password = ${SSL_KEY_STORE_PASSWORD} + key-password = ${SSL_KEY_PASSWORD} + trust-store-password = ${SSL_TRUST_STORE_PASSWORD} protocol = "TLSv1.2" @@ -565,6 +565,9 @@ akka { } ``` +Always use [substitution from environment variables](https://github.com/lightbend/config#optional-system-or-env-variable-overrides) +for passwords. Don't define real passwords in config files. + According to [RFC 7525](https://tools.ietf.org/html/rfc7525) the recommended algorithms to use with TLS 1.2 (as of writing this document) are: * TLS_DHE_RSA_WITH_AES_128_GCM_SHA256 diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemotingMultiNodeSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemotingMultiNodeSpec.scala index 0140cd337d..fee84413a9 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemotingMultiNodeSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemotingMultiNodeSpec.scala @@ -5,6 +5,7 @@ package akka.remote import java.util.UUID +import akka.remote.artery.ArterySpecSupport import akka.remote.testkit.{ FlightRecordingSupport, MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } import akka.testkit.{ DefaultTimeout, ImplicitSender } import com.typesafe.config.ConfigFactory @@ -21,6 +22,7 @@ object RemotingMultiNodeSpec { destination=target/flight-recorder-${UUID.randomUUID().toString}.afr } """) + .withFallback(ArterySpecSupport.tlsConfig) // TLS only used if transport=tls-tcp } diff --git a/akka-remote-tests/src/test/scala/akka/remote/artery/ArteryFailedToBindSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/artery/ArteryFailedToBindSpec.scala index eae1d91d5d..03cd968202 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/artery/ArteryFailedToBindSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/artery/ArteryFailedToBindSpec.scala @@ -37,7 +37,7 @@ class ArteryFailedToBindSpec extends WordSpec with Matchers { RARP(as).provider.transport.asInstanceOf[ArteryTransport].settings.Transport match { case ArterySettings.AeronUpd ⇒ ex.getMessage should ===("Inbound Aeron channel is in errored state. See Aeron logs for details.") - case ArterySettings.Tcp ⇒ + case ArterySettings.Tcp | ArterySettings.TlsTcp ⇒ ex.getMessage should startWith("Failed to bind TCP") } diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index dabf1562ca..67a51111a3 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -732,7 +732,7 @@ akka { # Select the underlying transport implementation. # - # Possible values: aeron-udp, tcp + # Possible values: aeron-udp, tcp, tls-tcp # # The Aeron (UDP) transport is a high performance transport and should be used for systems # that require high throughput and low latency. It is using more CPU than TCP when the @@ -948,7 +948,7 @@ akka { system-message-resend-interval = 1 second # Timeout of establishing outbound connections. - # Only used when transport is tcp + # Only used when transport is tcp or tls-tcp. connection-timeout = 5 seconds # The timeout for outbound associations to perform the handshake. @@ -985,7 +985,7 @@ akka { inbound-max-restarts = 5 # Retry outbound connection after this backoff. - # Only used when transport is tcp + # Only used when transport is tcp or tls-tcp. outbound-restart-backoff = 1 second # See 'outbound-max-restarts' @@ -1064,6 +1064,79 @@ akka { # it doesn't have to be thread-safe. # Refer to `akka.remote.artery.RemoteInstrument` for more information. instruments = ${?akka.remote.artery.advanced.instruments} [] + + } + + # SSL configuration that is used when transport=tls-tcp. + ssl { + # Factory of SSLEngine. + # Must implement akka.remote.artery.tcp.SSLEngineProvider and have a public + # constructor with an ActorSystem parameter. + # The default ConfigSSLEngineProvider is configured by properties in section + # akka.remote.artery.ssl.config-ssl-engine + ssl-engine-provider = akka.remote.artery.tcp.ConfigSSLEngineProvider + + # Config of akka.remote.artery.tcp.ConfigSSLEngineProvider + config-ssl-engine { + + # This is the Java Key Store used by the server connection + key-store = "keystore" + + # This password is used for decrypting the key store + # Use substitution from environment variables for passwords. Don't define + # real passwords in config files. key-store-password=${SSL_KEY_STORE_PASSWORD} + key-store-password = "changeme" + + # This password is used for decrypting the key + # Use substitution from environment variables for passwords. Don't define + # real passwords in config files. key-password=${SSL_KEY_PASSWORD} + key-password = "changeme" + + # This is the Java Key Store used by the client connection + trust-store = "truststore" + + # This password is used for decrypting the trust store + # Use substitution from environment variables for passwords. Don't define + # real passwords in config files. trust-store-password=${SSL_TRUST_STORE_PASSWORD} + trust-store-password = "changeme" + + # Protocol to use for SSL encryption, choose from: + # TLS 1.2 is available since JDK7, and default since JDK8: + # https://blogs.oracle.com/java-platform-group/entry/java_8_will_use_tls + protocol = "TLSv1.2" + + # Example: ["TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"] + # You need to install the JCE Unlimited Strength Jurisdiction Policy + # Files to use AES 256. + # More info here: + # http://docs.oracle.com/javase/7/docs/technotes/guides/security/SunProviders.html#SunJCEProvider + enabled-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA"] + + # There are three options, in increasing order of security: + # "" or SecureRandom => (default) + # "SHA1PRNG" => Can be slow because of blocking issues on Linux + # "AES128CounterSecureRNG" => fastest startup and based on AES encryption + # algorithm + # "AES256CounterSecureRNG" (Install JCE Unlimited Strength Jurisdiction + # Policy Files first) + # + # Setting a value here may require you to supply the appropriate cipher + # suite (see enabled-algorithms section above) + random-number-generator = "" + + # Require mutual authentication between TLS peers + # + # Without mutual authentication only the peer that actively establishes a connection (TLS client side) + # checks if the passive side (TLS server side) sends over a trusted certificate. With the flag turned on, + # the passive side will also request and verify a certificate from the connecting peer. + # + # To prevent man-in-the-middle attacks this setting is enabled by default. + require-mutual-authentication = on + + # Set this to `on` to verify hostnames with sun.security.util.HostnameChecker + hostname-verification = off + } + } } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index bbf7bea5a7..c472e363f3 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -211,7 +211,8 @@ private[akka] class RemoteActorRefProvider( transport = if (remoteSettings.Artery.Enabled) remoteSettings.Artery.Transport match { case ArterySettings.AeronUpd ⇒ new ArteryAeronUdpTransport(system, this) - case ArterySettings.Tcp ⇒ new ArteryTcpTransport(system, this) + case ArterySettings.Tcp ⇒ new ArteryTcpTransport(system, this, tlsEnabled = false) + case ArterySettings.TlsTcp ⇒ new ArteryTcpTransport(system, this, tlsEnabled = true) } else new Remoting(system, this)) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala index 7e11ffdcaf..46f1810d8d 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala @@ -64,6 +64,8 @@ private[akka] final class ArterySettings private (config: Config) { tree.insert(segments, NotUsed) } + val SSLEngineProviderClassName: String = config.getString("ssl.ssl-engine-provider") + val UntrustedMode: Boolean = getBoolean("untrusted-mode") val TrustedSelectionPaths: Set[String] = immutableSeq(getStringList("trusted-selection-paths")).toSet @@ -74,8 +76,9 @@ private[akka] final class ArterySettings private (config: Config) { val Transport: Transport = toRootLowerCase(getString("transport")) match { case AeronUpd.configName ⇒ AeronUpd case Tcp.configName ⇒ Tcp + case TlsTcp.configName ⇒ TlsTcp case other ⇒ throw new IllegalArgumentException(s"Unknown transport [$other], possible values: " + - s""""${AeronUpd.configName}", "${Tcp.configName}"""") + s""""${AeronUpd.configName}", "${Tcp.configName}", or "${TlsTcp.configName}"""") } /** @@ -228,4 +231,8 @@ private[akka] object ArterySettings { override val configName: String = "tcp" override def toString: String = configName } + object TlsTcp extends Transport { + override val configName: String = "tls-tcp" + override def toString: String = configName + } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala index 985ab45e8f..125421b32c 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala @@ -15,6 +15,7 @@ import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success import scala.util.Try + import akka.ConfigurationException import akka.Done import akka.NotUsed @@ -65,7 +66,8 @@ private[remote] object ArteryTcpTransport { /** * INTERNAL API */ -private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) +private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider, + tlsEnabled: Boolean) extends ArteryTransport(_system, _provider) { import ArteryTransport._ import ArteryTcpTransport._ @@ -77,6 +79,16 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider @volatile private var inboundStream: OptionVal[Sink[EnvelopeBuffer, NotUsed]] = OptionVal.None @volatile private var serverBinding: Option[Future[ServerBinding]] = None + private val sslEngineProvider: OptionVal[SSLEngineProvider] = + if (tlsEnabled) { + OptionVal.Some(system.dynamicAccess.createInstanceFor[SSLEngineProvider]( + settings.SSLEngineProviderClassName, + List((classOf[ActorSystem], system))).recover { + case e ⇒ throw new ConfigurationException( + s"Could not create SSLEngineProvider [${settings.SSLEngineProviderClassName}]", e) + }.get) + } else OptionVal.None + override protected def startTransport(): Unit = { // nothing specific here } @@ -94,11 +106,20 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider val remoteAddress = InetSocketAddress.createUnresolved(host, port) def connectionFlow: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] = - Tcp() - .outgoingConnection( + if (tlsEnabled) { + val sslProvider = sslEngineProvider.get + Tcp().outgoingTlsConnectionWithSSLEngine( remoteAddress, - halfClose = true, // issue https://github.com/akka/akka/issues/24392 if set to false - connectTimeout = settings.Advanced.ConnectionTimeout) + createSSLEngine = () ⇒ sslProvider.createClientSSLEngine(host, port), + connectTimeout = settings.Advanced.ConnectionTimeout, + verifySession = session ⇒ optionToTry(sslProvider.verifyClientSession(host, session))) + } else { + Tcp() + .outgoingConnection( + remoteAddress, + halfClose = true, // issue https://github.com/akka/akka/issues/24392 if set to false + connectTimeout = settings.Advanced.ConnectionTimeout) + } def connectionFlowWithRestart: Flow[ByteString, ByteString, NotUsed] = { val flowFactory = () ⇒ { @@ -222,10 +243,19 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider val port = localAddress.address.port.get val connectionSource: Source[Tcp.IncomingConnection, Future[ServerBinding]] = - Tcp().bind( - interface = host, - port = port, - halfClose = false) + if (tlsEnabled) { + val sslProvider = sslEngineProvider.get + Tcp().bindTlsWithSSLEngine( + interface = host, + port = port, + createSSLEngine = () ⇒ sslProvider.createServerSSLEngine(host, port), + verifySession = session ⇒ optionToTry(sslProvider.verifyServerSession(host, session))) + } else { + Tcp().bind( + interface = host, + port = port, + halfClose = false) + } serverBinding = serverBinding match { case None ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/SSLEngineProvider.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/SSLEngineProvider.scala new file mode 100644 index 0000000000..ad907d060b --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/SSLEngineProvider.scala @@ -0,0 +1,191 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.remote.artery +package tcp + +import java.io.FileNotFoundException +import java.io.IOException +import java.nio.file.Files +import java.nio.file.Paths +import java.security.GeneralSecurityException +import java.security.KeyStore +import java.security.SecureRandom +import javax.net.ssl.KeyManagerFactory +import javax.net.ssl.SSLContext +import javax.net.ssl.SSLEngine +import javax.net.ssl.SSLSession +import javax.net.ssl.TrustManagerFactory + +import scala.util.Try + +import akka.actor.ActorSystem +import akka.annotation.ApiMayChange +import akka.annotation.InternalApi +import akka.event.LogMarker +import akka.event.Logging +import akka.event.MarkerLoggingAdapter +import akka.japi.Util.immutableSeq +import akka.remote.security.provider.AkkaProvider +import akka.stream.IgnoreComplete +import akka.stream.TLSClosing +import akka.stream.TLSRole +import com.typesafe.config.Config + +@ApiMayChange trait SSLEngineProvider { + + def createServerSSLEngine(hostname: String, port: Int): SSLEngine + + def createClientSSLEngine(hostname: String, port: Int): SSLEngine + + /** + * Verification that will be called after every successful handshake + * to verify additional session information. Return `None` if valid + * otherwise `Some` with explaining cause. + */ + def verifyClientSession(hostname: String, session: SSLSession): Option[Throwable] + + /** + * Verification that will be called after every successful handshake + * to verify additional session information. Return `None` if valid + * otherwise `Some` with explaining cause. + */ + def verifyServerSession(hostname: String, session: SSLSession): Option[Throwable] + +} + +class SslTransportException(message: String, cause: Throwable) extends RuntimeException(message, cause) + +/** + * INTERNAL API: only public via config + * Config in akka.remote.artery.ssl.config-ssl-engine + */ +@InternalApi private[akka] final class ConfigSSLEngineProvider(config: Config, log: MarkerLoggingAdapter) extends SSLEngineProvider { + + def this(system: ActorSystem) = this( + system.settings.config.getConfig("akka.remote.artery.ssl.config-ssl-engine"), + Logging.withMarker(system, classOf[ConfigSSLEngineProvider].getName)) + + private val SSLKeyStore = config.getString("key-store") + private val SSLTrustStore = config.getString("trust-store") + private val SSLKeyStorePassword = config.getString("key-store-password") + private val SSLKeyPassword = config.getString("key-password") + private val SSLTrustStorePassword = config.getString("trust-store-password") + val SSLEnabledAlgorithms = immutableSeq(config.getStringList("enabled-algorithms")).to[Set] + val SSLProtocol = config.getString("protocol") + val SSLRandomNumberGenerator = config.getString("random-number-generator") + val SSLRequireMutualAuthentication = config.getBoolean("require-mutual-authentication") + private val HostnameVerification = config.getBoolean("hostname-verification") + + private lazy val sslContext: SSLContext = { + // log hostname verification warning once + if (HostnameVerification) + log.debug("TLS/SSL hostname verification is enabled.") + else + log.warning(LogMarker.Security, "TLS/SSL hostname verification is disabled. " + + "Please configure akka.remote.artery.ssl.config-ssl-engine.hostname-verification=on " + + "and ensure the X.509 certificate on the host is correct to remove this warning.") + + constructContext() + } + + private def constructContext(): SSLContext = { + try { + def loadKeystore(filename: String, password: String): KeyStore = { + val keyStore = KeyStore.getInstance(KeyStore.getDefaultType) + val fin = Files.newInputStream(Paths.get(filename)) + try keyStore.load(fin, password.toCharArray) finally Try(fin.close()) + keyStore + } + + val keyManagers = { + val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) + factory.init(loadKeystore(SSLKeyStore, SSLKeyStorePassword), SSLKeyPassword.toCharArray) + factory.getKeyManagers + } + val trustManagers = { + val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) + trustManagerFactory.init(loadKeystore(SSLTrustStore, SSLTrustStorePassword)) + trustManagerFactory.getTrustManagers + } + val rng = createSecureRandom() + + val ctx = SSLContext.getInstance(SSLProtocol) + ctx.init(keyManagers, trustManagers, rng) + ctx + } catch { + case e: FileNotFoundException ⇒ + throw new SslTransportException("Server SSL connection could not be established because key store could not be loaded", e) + case e: IOException ⇒ + throw new SslTransportException("Server SSL connection could not be established because: " + e.getMessage, e) + case e: GeneralSecurityException ⇒ + throw new SslTransportException("Server SSL connection could not be established because SSL context could not be constructed", e) + } + } + + def createSecureRandom(): SecureRandom = { + val rng = SSLRandomNumberGenerator match { + case r @ ("AES128CounterSecureRNG" | "AES256CounterSecureRNG") ⇒ + log.debug("SSL random number generator set to: {}", r) + SecureRandom.getInstance(r, AkkaProvider) + case s @ ("SHA1PRNG" | "NativePRNG") ⇒ + log.debug("SSL random number generator set to: {}", s) + // SHA1PRNG needs /dev/urandom to be the source on Linux to prevent problems with /dev/random blocking + // However, this also makes the seed source insecure as the seed is reused to avoid blocking (not a problem on FreeBSD). + SecureRandom.getInstance(s) + + case "" ⇒ + log.debug("SSLRandomNumberGenerator not specified, falling back to SecureRandom") + new SecureRandom + + case unknown ⇒ + log.warning(LogMarker.Security, "Unknown SSLRandomNumberGenerator [{}] falling back to SecureRandom", unknown) + new SecureRandom + } + rng.nextInt() // prevent stall on first access + rng + } + + override def createServerSSLEngine(hostname: String, port: Int): SSLEngine = + createSSLEngine(akka.stream.Server, hostname, port) + + override def createClientSSLEngine(hostname: String, port: Int): SSLEngine = + createSSLEngine(akka.stream.Client, hostname, port) + + private def createSSLEngine(role: TLSRole, hostname: String, port: Int): SSLEngine = { + createSSLEngine(sslContext, role, hostname, port) + } + + private def createSSLEngine( + sslContext: SSLContext, + role: TLSRole, + hostname: String, + port: Int, + closing: TLSClosing = IgnoreComplete): SSLEngine = { + + val engine = sslContext.createSSLEngine(hostname, port) + + if (HostnameVerification) { + val sslParams = sslContext.getDefaultSSLParameters + sslParams.setEndpointIdentificationAlgorithm("HTTPS") + engine.setSSLParameters(sslParams) + } + + engine.setUseClientMode(role == akka.stream.Client) + engine.setEnabledCipherSuites(SSLEnabledAlgorithms.toArray) + engine.setEnabledProtocols(Array(SSLProtocol)) + + if ((role != akka.stream.Client) && SSLRequireMutualAuthentication) + engine.setNeedClientAuth(true) + + engine + } + + override def verifyClientSession(hostname: String, session: SSLSession): Option[Throwable] = + None + + override def verifyServerSession(hostname: String, session: SSLSession): Option[Throwable] = + None + +} + diff --git a/akka-remote/src/test/scala/akka/remote/artery/ArterySpecSupport.scala b/akka-remote/src/test/scala/akka/remote/artery/ArterySpecSupport.scala index 04ffd32a4c..40614e5cb7 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/ArterySpecSupport.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/ArterySpecSupport.scala @@ -49,6 +49,22 @@ object ArterySpecSupport { */ def defaultConfig = newFlightRecorderConfig .withFallback(staticArteryRemotingConfig) + .withFallback(tlsConfig) // TLS only used if transport=tls-tcp + + // set the test key-store and trust-store properties + // TLS only used if transport=tls-tcp, which can be set from specific tests or + // System properties (e.g. jenkins job) + lazy val tlsConfig: Config = { + val trustStore = getClass.getClassLoader.getResource("truststore").getPath + val keyStore = getClass.getClassLoader.getResource("keystore").getPath + + ConfigFactory.parseString(s""" + akka.remote.artery.ssl.config-ssl-engine { + key-store = "$keyStore" + trust-store = "$trustStore" + } + """) + } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala index 43b82e29a9..55b82fe29f 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala @@ -36,6 +36,20 @@ class ArteryTcpSendConsistencyWithThreeLanesSpec extends AbstractRemoteSendConsi akka.remote.artery.advanced.inbound-lanes = 3 """).withFallback(ArterySpecSupport.defaultConfig)) +class ArteryTlsTcpSendConsistencyWithOneLaneSpec extends AbstractRemoteSendConsistencySpec( + ConfigFactory.parseString(""" + akka.remote.artery.transport = tls-tcp + akka.remote.artery.advanced.outbound-lanes = 1 + akka.remote.artery.advanced.inbound-lanes = 1 + """).withFallback(ArterySpecSupport.defaultConfig)) + +class ArteryTlsTcpSendConsistencyWithThreeLanesSpec extends AbstractRemoteSendConsistencySpec( + ConfigFactory.parseString(""" + akka.remote.artery.transport = tls-tcp + akka.remote.artery.advanced.outbound-lanes = 1 + akka.remote.artery.advanced.inbound-lanes = 1 + """).withFallback(ArterySpecSupport.defaultConfig)) + abstract class AbstractRemoteSendConsistencySpec(config: Config) extends ArteryMultiNodeSpec(config) with ImplicitSender { val systemB = newRemoteSystem(name = Some("systemB")) diff --git a/akka-remote/src/test/scala/akka/remote/artery/tcp/TlsTcpSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/tcp/TlsTcpSpec.scala new file mode 100644 index 0000000000..4ace02e1ad --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/tcp/TlsTcpSpec.scala @@ -0,0 +1,167 @@ +/** + * Copyright (C) 2016-2018 Lightbend Inc. + */ +package akka.remote.artery +package tcp + +import java.security.NoSuchAlgorithmException + +import scala.concurrent.duration._ + +import akka.actor.ActorRef +import akka.actor.ActorPath +import akka.actor.ActorIdentity +import akka.actor.ExtendedActorSystem +import akka.actor.Identify +import akka.actor.RootActorPath +import akka.testkit.ImplicitSender +import akka.testkit.TestActors +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory + +class TlsTcpWithDefaultConfigSpec extends TlsTcpSpec(ConfigFactory.empty()) + +class TlsTcpWithSHA1PRNGSpec extends TlsTcpSpec(ConfigFactory.parseString(""" + akka.remote.artery.ssl.config-ssl-engine { + random-number-generator = "SHA1PRNG" + enabled-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA"] + } + """)) + +class TlsTcpWithAES128CounterSecureRNGSpec extends TlsTcpSpec(ConfigFactory.parseString(""" + akka.remote.artery.ssl.config-ssl-engine { + random-number-generator = "AES128CounterSecureRNG" + enabled-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"] + } + """)) + +class TlsTcpWithAES256CounterSecureRNGSpec extends TlsTcpSpec(ConfigFactory.parseString(""" + akka.remote.artery.ssl.config-ssl-engine { + random-number-generator = "AES256CounterSecureRNG" + enabled-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"] + } + """)) + +class TlsTcpWithDefaultRNGSecureSpec extends TlsTcpSpec(ConfigFactory.parseString(""" + akka.remote.artery.ssl.config-ssl-engine { + random-number-generator = "" + enabled-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA"] + } + """)) + +class TlsTcpWithCrappyRSAWithMD5OnlyHereToMakeSureThingsWorkSpec extends TlsTcpSpec(ConfigFactory.parseString(""" + akka.remote.artery.ssl.config-ssl-engine { + random-number-generator = "" + enabled-algorithms = [""SSL_RSA_WITH_NULL_MD5""] + } + """)) + +object TlsTcpSpec { + + lazy val config: Config = { + ConfigFactory.parseString(s""" + akka.loglevel = DEBUG + akka.remote.artery { + transport = tls-tcp + large-message-destinations = [ "/user/large" ] + } + """) + } + +} + +abstract class TlsTcpSpec(config: Config) + extends ArteryMultiNodeSpec(config.withFallback(TlsTcpSpec.config)) with ImplicitSender { + + val systemB = newRemoteSystem(name = Some("systemB")) + val addressB = address(systemB) + val rootB = RootActorPath(addressB) + + def isSupported: Boolean = { + try { + val provider = new ConfigSSLEngineProvider(system) + + val rng = provider.createSecureRandom() + rng.nextInt() // Has to work + val sRng = provider.SSLRandomNumberGenerator + if (rng.getAlgorithm != sRng && sRng != "") + throw new NoSuchAlgorithmException(sRng) + + val address = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + val host = address.host.get + val port = address.port.get + + val engine = provider.createServerSSLEngine(host, port) + val gotAllSupported = provider.SSLEnabledAlgorithms diff engine.getSupportedCipherSuites.toSet + val gotAllEnabled = provider.SSLEnabledAlgorithms diff engine.getEnabledCipherSuites.toSet + gotAllSupported.isEmpty || (throw new IllegalArgumentException("Cipher Suite not supported: " + gotAllSupported)) + gotAllEnabled.isEmpty || (throw new IllegalArgumentException("Cipher Suite not enabled: " + gotAllEnabled)) + engine.getSupportedProtocols.contains(provider.SSLProtocol) || + (throw new IllegalArgumentException("Protocol not supported: " + provider.SSLProtocol)) + } catch { + case e @ ((_: IllegalArgumentException) | (_: NoSuchAlgorithmException)) ⇒ + info(e.toString) + false + } + } + + def identify(path: ActorPath): ActorRef = { + system.actorSelection(path) ! Identify(path.name) + expectMsgType[ActorIdentity].ref.get + } + + def testDelivery(echoRef: ActorRef): Unit = { + echoRef ! "ping-1" + expectMsg("ping-1") + + // and some more + (2 to 10).foreach { n ⇒ + echoRef ! s"ping-$n" + } + receiveN(9) should equal((2 to 10).map(n ⇒ s"ping-$n")) + } + + "Artery with TLS/TCP" must { + + if (isSupported) { + + "deliver messages" in { + systemB.actorOf(TestActors.echoActorProps, "echo") + val echoRef = identify(rootB / "user" / "echo") + testDelivery(echoRef) + } + + "deliver messages over large messages stream" in { + systemB.actorOf(TestActors.echoActorProps, "large") + val echoRef = identify(rootB / "user" / "large") + testDelivery(echoRef) + } + + } else { + "not be run when the cipher is not supported by the platform this test is currently being executed on" in { + pending + } + } + } + +} + +class TlsTcpWithHostnameVerificationSpec extends ArteryMultiNodeSpec( + ConfigFactory.parseString(""" + akka.remote.artery.ssl.config-ssl-engine { + hostname-verification = on + } + """).withFallback(TlsTcpSpec.config)) with ImplicitSender { + + val systemB = newRemoteSystem(name = Some("systemB")) + val addressB = address(systemB) + val rootB = RootActorPath(addressB) + + "Artery with TLS/TCP and hostname-verification=on" must { + "reject invalid" in { + systemB.actorOf(TestActors.echoActorProps, "echo") + system.actorSelection(rootB / "user" / "echo") ! Identify("echo") + expectNoMessage(2.seconds) + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index 47f72e2d13..513f6f8156 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -6,6 +6,8 @@ package akka.stream.scaladsl import java.net.InetSocketAddress import java.util.concurrent.TimeoutException import javax.net.ssl.SSLContext +import javax.net.ssl.SSLEngine +import javax.net.ssl.SSLSession import akka.actor._ import akka.annotation.{ ApiMayChange, InternalApi } @@ -17,10 +19,10 @@ import akka.stream.impl.fusing.GraphStages.detacher import akka.stream.impl.io.{ ConnectionSourceStage, OutgoingConnectionStage, TcpIdleTimeout } import akka.util.ByteString import akka.{ Done, NotUsed } - import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.{ Duration, FiniteDuration } +import scala.util.Try import scala.util.control.NoStackTrace object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { @@ -265,6 +267,24 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { connection.join(tlsWrapping.atop(tls).reversed) } + /** + * INTERNAL API: for raw SSLEngine + */ + @InternalApi private[akka] def outgoingTlsConnectionWithSSLEngine( + remoteAddress: InetSocketAddress, + createSSLEngine: () ⇒ SSLEngine, + localAddress: Option[InetSocketAddress] = None, + options: immutable.Traversable[SocketOption] = Nil, + connectTimeout: Duration = Duration.Inf, + idleTimeout: Duration = Duration.Inf, + verifySession: SSLSession ⇒ Try[Unit], + closing: TLSClosing = IgnoreComplete): Flow[ByteString, ByteString, Future[OutgoingConnection]] = { + + val connection = outgoingConnection(remoteAddress, localAddress, options, true, connectTimeout, idleTimeout) + val tls = TLS(createSSLEngine, verifySession, closing) + connection.join(tlsWrapping.atop(tls).reversed) + } + /** * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` * where all incoming and outgoing bytes are passed through TLS. @@ -294,6 +314,28 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { } } + /** + * INTERNAL API + */ + @InternalApi private[akka] def bindTlsWithSSLEngine( + interface: String, + port: Int, + createSSLEngine: () ⇒ SSLEngine, + backlog: Int = 100, + options: immutable.Traversable[SocketOption] = Nil, + idleTimeout: Duration = Duration.Inf, + verifySession: SSLSession ⇒ Try[Unit], + closing: TLSClosing = IgnoreComplete): Source[IncomingConnection, Future[ServerBinding]] = { + + val tls = tlsWrapping.atop(TLS(createSSLEngine, verifySession, closing)).reversed + + bind(interface, port, backlog, options, true, idleTimeout).map { incomingConnection ⇒ + incomingConnection.copy( + flow = incomingConnection.flow.join(tls) + ) + } + } + /** * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` * handling the incoming connections through TLS and then run using the provided Flow.