Add Artery TCP/TLS transport, #24390
* configurable SSLEngineProvider * ssl configuration * add internal outgoingTlsConnectionWithSSLEngine and bindTlsWithSSLEngine in stream Tcp * TLS tests * update remote security section in reference documentation
This commit is contained in:
parent
162a1f80a0
commit
4dab3252bd
13 changed files with 655 additions and 25 deletions
|
|
@ -345,16 +345,100 @@ Actor classes not included in the whitelist will not be allowed to be remote dep
|
|||
|
||||
## Remote Security
|
||||
|
||||
An `ActorSystem` should not be exposed via Akka Remote (Artery) over plain Aeron/UDP to an untrusted network (e.g. internet).
|
||||
It should be protected by network security, such as a firewall. There is currently no support for encryption with Artery
|
||||
so if network security is not considered as enough protection the classic remoting with
|
||||
@ref:[TLS and mutual authentication](remoting.md#remote-tls) should be used.
|
||||
An `ActorSystem` should not be exposed via Akka Remote (Artery) over plain Aeron/UDP or TCP to an untrusted
|
||||
network (e.g. Internet). It should be protected by network security, such as a firewall. If that is not considered
|
||||
as enough protection [TLS with mutual authentication](#remote-tls) should be enabled.
|
||||
|
||||
Best practice is that Akka remoting nodes should only be accessible from the adjacent network.
|
||||
Best practice is that Akka remoting nodes should only be accessible from the adjacent network. Note that if TLS is
|
||||
enabled with mutual authentication there is still a risk that an attacker can gain access to a valid certificate by
|
||||
compromising any node with certificates issued by the same internal PKI tree.
|
||||
|
||||
It is also security best practice to @ref:[disable the Java serializer](#disabling-the-java-serializer) because of
|
||||
It is also security best-practice to [disable the Java serializer](#disable-java-serializer) because of
|
||||
its multiple [known attack surfaces](https://community.hpe.com/t5/Security-Research/The-perils-of-Java-deserialization/ba-p/6838995).
|
||||
|
||||
<a id="remote-tls"></a>
|
||||
### Configuring SSL/TLS for Akka Remoting
|
||||
|
||||
SSL can be used as the remote transport by using the `tls-tcp` transport:
|
||||
|
||||
```
|
||||
akka.remote.artery {
|
||||
transport = tls-tcp
|
||||
}
|
||||
```
|
||||
|
||||
Next the actual SSL/TLS parameters have to be configured:
|
||||
|
||||
```
|
||||
akka.remote.artery {
|
||||
transport = tls-tcp
|
||||
|
||||
ssl.config-ssl-engine {
|
||||
key-store = "/example/path/to/mykeystore.jks"
|
||||
trust-store = "/example/path/to/mytruststore.jks"
|
||||
|
||||
key-store-password = ${SSL_KEY_STORE_PASSWORD}
|
||||
key-password = ${SSL_KEY_PASSWORD}
|
||||
trust-store-password = ${SSL_TRUST_STORE_PASSWORD}
|
||||
|
||||
protocol = "TLSv1.2"
|
||||
|
||||
enabled-algorithms = [TLS_DHE_RSA_WITH_AES_128_GCM_SHA256]
|
||||
|
||||
random-number-generator = "AES128CounterSecureRNG"
|
||||
|
||||
hostname-verification = on
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Always use [substitution from environment variables](https://github.com/lightbend/config#optional-system-or-env-variable-overrides)
|
||||
for passwords. Don't define real passwords in config files.
|
||||
|
||||
According to [RFC 7525](https://tools.ietf.org/html/rfc7525) the recommended algorithms to use with TLS 1.2 (as of writing this document) are:
|
||||
|
||||
* TLS_DHE_RSA_WITH_AES_128_GCM_SHA256
|
||||
* TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
|
||||
* TLS_DHE_RSA_WITH_AES_256_GCM_SHA384
|
||||
* TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
|
||||
|
||||
You should always check the latest information about security and algorithm recommendations though before you configure your system.
|
||||
|
||||
Creating and working with keystores and certificates is well documented in the
|
||||
[Generating X.509 Certificates](http://lightbend.github.io/ssl-config/CertificateGeneration.html#using-keytool)
|
||||
section of Lightbend's SSL-Config library.
|
||||
|
||||
Since an Akka remoting is inherently @ref:[peer-to-peer](general/remoting.md#symmetric-communication) both the key-store as well as trust-store
|
||||
need to be configured on each remoting node participating in the cluster.
|
||||
|
||||
The official [Java Secure Socket Extension documentation](http://docs.oracle.com/javase/7/docs/technotes/guides/security/jsse/JSSERefGuide.html)
|
||||
as well as the [Oracle documentation on creating KeyStore and TrustStores](https://docs.oracle.com/cd/E19509-01/820-3503/6nf1il6er/index.html)
|
||||
are both great resources to research when setting up security on the JVM. Please consult those resources when troubleshooting
|
||||
and configuring SSL.
|
||||
|
||||
Mutual authentication between TLS peers is enabled by default. Mutual authentication means that the the passive side
|
||||
(the TLS server side) of a connection will also request and verify a certificate from the connecting peer.
|
||||
Without this mode only the client side is requesting and verifying certificates. While Akka is a peer-to-peer
|
||||
technology, each connection between nodes starts out from one side (the "client") towards the other (the "server").
|
||||
|
||||
Note that if TLS is enabled with mutual authentication there is still a risk that an attacker can gain access to a
|
||||
valid certificate by compromising any node with certificates issued by the same internal PKI tree.
|
||||
|
||||
It's recommended that you enable hostname verification with
|
||||
`akka.remote.artery.ssl.config-ssl-engine.hostname-verification=on`.
|
||||
When enabled it will verify that the destination hostname matches the hostname in the peer's credentials.
|
||||
An application could be exploited with URL spoofing if the hostname is not verified.
|
||||
|
||||
See also a description of the settings in the @ref:[Remote Configuration](#remote-configuration-artery) section.
|
||||
|
||||
@@@ note
|
||||
|
||||
When using SHA1PRNG on Linux it's recommended specify `-Djava.security.egd=file:/dev/urandom` as argument
|
||||
to the JVM to prevent blocking. It is NOT as secure because it reuses the seed.
|
||||
|
||||
@@@
|
||||
|
||||
|
||||
### Untrusted Mode
|
||||
|
||||
As soon as an actor system can connect to another remotely, it may in principle
|
||||
|
|
|
|||
|
|
@ -550,9 +550,9 @@ akka {
|
|||
key-store = "/example/path/to/mykeystore.jks"
|
||||
trust-store = "/example/path/to/mytruststore.jks"
|
||||
|
||||
key-store-password = "changeme"
|
||||
key-password = "changeme"
|
||||
trust-store-password = "changeme"
|
||||
key-store-password = ${SSL_KEY_STORE_PASSWORD}
|
||||
key-password = ${SSL_KEY_PASSWORD}
|
||||
trust-store-password = ${SSL_TRUST_STORE_PASSWORD}
|
||||
|
||||
protocol = "TLSv1.2"
|
||||
|
||||
|
|
@ -565,6 +565,9 @@ akka {
|
|||
}
|
||||
```
|
||||
|
||||
Always use [substitution from environment variables](https://github.com/lightbend/config#optional-system-or-env-variable-overrides)
|
||||
for passwords. Don't define real passwords in config files.
|
||||
|
||||
According to [RFC 7525](https://tools.ietf.org/html/rfc7525) the recommended algorithms to use with TLS 1.2 (as of writing this document) are:
|
||||
|
||||
* TLS_DHE_RSA_WITH_AES_128_GCM_SHA256
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ package akka.remote
|
|||
|
||||
import java.util.UUID
|
||||
|
||||
import akka.remote.artery.ArterySpecSupport
|
||||
import akka.remote.testkit.{ FlightRecordingSupport, MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
|
||||
import akka.testkit.{ DefaultTimeout, ImplicitSender }
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
|
@ -21,6 +22,7 @@ object RemotingMultiNodeSpec {
|
|||
destination=target/flight-recorder-${UUID.randomUUID().toString}.afr
|
||||
}
|
||||
""")
|
||||
.withFallback(ArterySpecSupport.tlsConfig) // TLS only used if transport=tls-tcp
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ class ArteryFailedToBindSpec extends WordSpec with Matchers {
|
|||
RARP(as).provider.transport.asInstanceOf[ArteryTransport].settings.Transport match {
|
||||
case ArterySettings.AeronUpd ⇒
|
||||
ex.getMessage should ===("Inbound Aeron channel is in errored state. See Aeron logs for details.")
|
||||
case ArterySettings.Tcp ⇒
|
||||
case ArterySettings.Tcp | ArterySettings.TlsTcp ⇒
|
||||
ex.getMessage should startWith("Failed to bind TCP")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -732,7 +732,7 @@ akka {
|
|||
|
||||
# Select the underlying transport implementation.
|
||||
#
|
||||
# Possible values: aeron-udp, tcp
|
||||
# Possible values: aeron-udp, tcp, tls-tcp
|
||||
#
|
||||
# The Aeron (UDP) transport is a high performance transport and should be used for systems
|
||||
# that require high throughput and low latency. It is using more CPU than TCP when the
|
||||
|
|
@ -948,7 +948,7 @@ akka {
|
|||
system-message-resend-interval = 1 second
|
||||
|
||||
# Timeout of establishing outbound connections.
|
||||
# Only used when transport is tcp
|
||||
# Only used when transport is tcp or tls-tcp.
|
||||
connection-timeout = 5 seconds
|
||||
|
||||
# The timeout for outbound associations to perform the handshake.
|
||||
|
|
@ -985,7 +985,7 @@ akka {
|
|||
inbound-max-restarts = 5
|
||||
|
||||
# Retry outbound connection after this backoff.
|
||||
# Only used when transport is tcp
|
||||
# Only used when transport is tcp or tls-tcp.
|
||||
outbound-restart-backoff = 1 second
|
||||
|
||||
# See 'outbound-max-restarts'
|
||||
|
|
@ -1064,6 +1064,79 @@ akka {
|
|||
# it doesn't have to be thread-safe.
|
||||
# Refer to `akka.remote.artery.RemoteInstrument` for more information.
|
||||
instruments = ${?akka.remote.artery.advanced.instruments} []
|
||||
|
||||
}
|
||||
|
||||
# SSL configuration that is used when transport=tls-tcp.
|
||||
ssl {
|
||||
# Factory of SSLEngine.
|
||||
# Must implement akka.remote.artery.tcp.SSLEngineProvider and have a public
|
||||
# constructor with an ActorSystem parameter.
|
||||
# The default ConfigSSLEngineProvider is configured by properties in section
|
||||
# akka.remote.artery.ssl.config-ssl-engine
|
||||
ssl-engine-provider = akka.remote.artery.tcp.ConfigSSLEngineProvider
|
||||
|
||||
# Config of akka.remote.artery.tcp.ConfigSSLEngineProvider
|
||||
config-ssl-engine {
|
||||
|
||||
# This is the Java Key Store used by the server connection
|
||||
key-store = "keystore"
|
||||
|
||||
# This password is used for decrypting the key store
|
||||
# Use substitution from environment variables for passwords. Don't define
|
||||
# real passwords in config files. key-store-password=${SSL_KEY_STORE_PASSWORD}
|
||||
key-store-password = "changeme"
|
||||
|
||||
# This password is used for decrypting the key
|
||||
# Use substitution from environment variables for passwords. Don't define
|
||||
# real passwords in config files. key-password=${SSL_KEY_PASSWORD}
|
||||
key-password = "changeme"
|
||||
|
||||
# This is the Java Key Store used by the client connection
|
||||
trust-store = "truststore"
|
||||
|
||||
# This password is used for decrypting the trust store
|
||||
# Use substitution from environment variables for passwords. Don't define
|
||||
# real passwords in config files. trust-store-password=${SSL_TRUST_STORE_PASSWORD}
|
||||
trust-store-password = "changeme"
|
||||
|
||||
# Protocol to use for SSL encryption, choose from:
|
||||
# TLS 1.2 is available since JDK7, and default since JDK8:
|
||||
# https://blogs.oracle.com/java-platform-group/entry/java_8_will_use_tls
|
||||
protocol = "TLSv1.2"
|
||||
|
||||
# Example: ["TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"]
|
||||
# You need to install the JCE Unlimited Strength Jurisdiction Policy
|
||||
# Files to use AES 256.
|
||||
# More info here:
|
||||
# http://docs.oracle.com/javase/7/docs/technotes/guides/security/SunProviders.html#SunJCEProvider
|
||||
enabled-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA"]
|
||||
|
||||
# There are three options, in increasing order of security:
|
||||
# "" or SecureRandom => (default)
|
||||
# "SHA1PRNG" => Can be slow because of blocking issues on Linux
|
||||
# "AES128CounterSecureRNG" => fastest startup and based on AES encryption
|
||||
# algorithm
|
||||
# "AES256CounterSecureRNG" (Install JCE Unlimited Strength Jurisdiction
|
||||
# Policy Files first)
|
||||
#
|
||||
# Setting a value here may require you to supply the appropriate cipher
|
||||
# suite (see enabled-algorithms section above)
|
||||
random-number-generator = ""
|
||||
|
||||
# Require mutual authentication between TLS peers
|
||||
#
|
||||
# Without mutual authentication only the peer that actively establishes a connection (TLS client side)
|
||||
# checks if the passive side (TLS server side) sends over a trusted certificate. With the flag turned on,
|
||||
# the passive side will also request and verify a certificate from the connecting peer.
|
||||
#
|
||||
# To prevent man-in-the-middle attacks this setting is enabled by default.
|
||||
require-mutual-authentication = on
|
||||
|
||||
# Set this to `on` to verify hostnames with sun.security.util.HostnameChecker
|
||||
hostname-verification = off
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -211,7 +211,8 @@ private[akka] class RemoteActorRefProvider(
|
|||
transport =
|
||||
if (remoteSettings.Artery.Enabled) remoteSettings.Artery.Transport match {
|
||||
case ArterySettings.AeronUpd ⇒ new ArteryAeronUdpTransport(system, this)
|
||||
case ArterySettings.Tcp ⇒ new ArteryTcpTransport(system, this)
|
||||
case ArterySettings.Tcp ⇒ new ArteryTcpTransport(system, this, tlsEnabled = false)
|
||||
case ArterySettings.TlsTcp ⇒ new ArteryTcpTransport(system, this, tlsEnabled = true)
|
||||
}
|
||||
else new Remoting(system, this))
|
||||
|
||||
|
|
|
|||
|
|
@ -64,6 +64,8 @@ private[akka] final class ArterySettings private (config: Config) {
|
|||
tree.insert(segments, NotUsed)
|
||||
}
|
||||
|
||||
val SSLEngineProviderClassName: String = config.getString("ssl.ssl-engine-provider")
|
||||
|
||||
val UntrustedMode: Boolean = getBoolean("untrusted-mode")
|
||||
val TrustedSelectionPaths: Set[String] = immutableSeq(getStringList("trusted-selection-paths")).toSet
|
||||
|
||||
|
|
@ -74,8 +76,9 @@ private[akka] final class ArterySettings private (config: Config) {
|
|||
val Transport: Transport = toRootLowerCase(getString("transport")) match {
|
||||
case AeronUpd.configName ⇒ AeronUpd
|
||||
case Tcp.configName ⇒ Tcp
|
||||
case TlsTcp.configName ⇒ TlsTcp
|
||||
case other ⇒ throw new IllegalArgumentException(s"Unknown transport [$other], possible values: " +
|
||||
s""""${AeronUpd.configName}", "${Tcp.configName}"""")
|
||||
s""""${AeronUpd.configName}", "${Tcp.configName}", or "${TlsTcp.configName}"""")
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -228,4 +231,8 @@ private[akka] object ArterySettings {
|
|||
override val configName: String = "tcp"
|
||||
override def toString: String = configName
|
||||
}
|
||||
object TlsTcp extends Transport {
|
||||
override val configName: String = "tls-tcp"
|
||||
override def toString: String = configName
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import scala.concurrent.duration._
|
|||
import scala.util.Failure
|
||||
import scala.util.Success
|
||||
import scala.util.Try
|
||||
|
||||
import akka.ConfigurationException
|
||||
import akka.Done
|
||||
import akka.NotUsed
|
||||
|
|
@ -65,7 +66,8 @@ private[remote] object ArteryTcpTransport {
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider)
|
||||
private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider,
|
||||
tlsEnabled: Boolean)
|
||||
extends ArteryTransport(_system, _provider) {
|
||||
import ArteryTransport._
|
||||
import ArteryTcpTransport._
|
||||
|
|
@ -77,6 +79,16 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider
|
|||
@volatile private var inboundStream: OptionVal[Sink[EnvelopeBuffer, NotUsed]] = OptionVal.None
|
||||
@volatile private var serverBinding: Option[Future[ServerBinding]] = None
|
||||
|
||||
private val sslEngineProvider: OptionVal[SSLEngineProvider] =
|
||||
if (tlsEnabled) {
|
||||
OptionVal.Some(system.dynamicAccess.createInstanceFor[SSLEngineProvider](
|
||||
settings.SSLEngineProviderClassName,
|
||||
List((classOf[ActorSystem], system))).recover {
|
||||
case e ⇒ throw new ConfigurationException(
|
||||
s"Could not create SSLEngineProvider [${settings.SSLEngineProviderClassName}]", e)
|
||||
}.get)
|
||||
} else OptionVal.None
|
||||
|
||||
override protected def startTransport(): Unit = {
|
||||
// nothing specific here
|
||||
}
|
||||
|
|
@ -94,11 +106,20 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider
|
|||
val remoteAddress = InetSocketAddress.createUnresolved(host, port)
|
||||
|
||||
def connectionFlow: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] =
|
||||
Tcp()
|
||||
.outgoingConnection(
|
||||
if (tlsEnabled) {
|
||||
val sslProvider = sslEngineProvider.get
|
||||
Tcp().outgoingTlsConnectionWithSSLEngine(
|
||||
remoteAddress,
|
||||
halfClose = true, // issue https://github.com/akka/akka/issues/24392 if set to false
|
||||
connectTimeout = settings.Advanced.ConnectionTimeout)
|
||||
createSSLEngine = () ⇒ sslProvider.createClientSSLEngine(host, port),
|
||||
connectTimeout = settings.Advanced.ConnectionTimeout,
|
||||
verifySession = session ⇒ optionToTry(sslProvider.verifyClientSession(host, session)))
|
||||
} else {
|
||||
Tcp()
|
||||
.outgoingConnection(
|
||||
remoteAddress,
|
||||
halfClose = true, // issue https://github.com/akka/akka/issues/24392 if set to false
|
||||
connectTimeout = settings.Advanced.ConnectionTimeout)
|
||||
}
|
||||
|
||||
def connectionFlowWithRestart: Flow[ByteString, ByteString, NotUsed] = {
|
||||
val flowFactory = () ⇒ {
|
||||
|
|
@ -222,10 +243,19 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider
|
|||
val port = localAddress.address.port.get
|
||||
|
||||
val connectionSource: Source[Tcp.IncomingConnection, Future[ServerBinding]] =
|
||||
Tcp().bind(
|
||||
interface = host,
|
||||
port = port,
|
||||
halfClose = false)
|
||||
if (tlsEnabled) {
|
||||
val sslProvider = sslEngineProvider.get
|
||||
Tcp().bindTlsWithSSLEngine(
|
||||
interface = host,
|
||||
port = port,
|
||||
createSSLEngine = () ⇒ sslProvider.createServerSSLEngine(host, port),
|
||||
verifySession = session ⇒ optionToTry(sslProvider.verifyServerSession(host, session)))
|
||||
} else {
|
||||
Tcp().bind(
|
||||
interface = host,
|
||||
port = port,
|
||||
halfClose = false)
|
||||
}
|
||||
|
||||
serverBinding = serverBinding match {
|
||||
case None ⇒
|
||||
|
|
|
|||
|
|
@ -0,0 +1,191 @@
|
|||
/**
|
||||
* Copyright (C) 2018 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.remote.artery
|
||||
package tcp
|
||||
|
||||
import java.io.FileNotFoundException
|
||||
import java.io.IOException
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Paths
|
||||
import java.security.GeneralSecurityException
|
||||
import java.security.KeyStore
|
||||
import java.security.SecureRandom
|
||||
import javax.net.ssl.KeyManagerFactory
|
||||
import javax.net.ssl.SSLContext
|
||||
import javax.net.ssl.SSLEngine
|
||||
import javax.net.ssl.SSLSession
|
||||
import javax.net.ssl.TrustManagerFactory
|
||||
|
||||
import scala.util.Try
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.annotation.ApiMayChange
|
||||
import akka.annotation.InternalApi
|
||||
import akka.event.LogMarker
|
||||
import akka.event.Logging
|
||||
import akka.event.MarkerLoggingAdapter
|
||||
import akka.japi.Util.immutableSeq
|
||||
import akka.remote.security.provider.AkkaProvider
|
||||
import akka.stream.IgnoreComplete
|
||||
import akka.stream.TLSClosing
|
||||
import akka.stream.TLSRole
|
||||
import com.typesafe.config.Config
|
||||
|
||||
@ApiMayChange trait SSLEngineProvider {
|
||||
|
||||
def createServerSSLEngine(hostname: String, port: Int): SSLEngine
|
||||
|
||||
def createClientSSLEngine(hostname: String, port: Int): SSLEngine
|
||||
|
||||
/**
|
||||
* Verification that will be called after every successful handshake
|
||||
* to verify additional session information. Return `None` if valid
|
||||
* otherwise `Some` with explaining cause.
|
||||
*/
|
||||
def verifyClientSession(hostname: String, session: SSLSession): Option[Throwable]
|
||||
|
||||
/**
|
||||
* Verification that will be called after every successful handshake
|
||||
* to verify additional session information. Return `None` if valid
|
||||
* otherwise `Some` with explaining cause.
|
||||
*/
|
||||
def verifyServerSession(hostname: String, session: SSLSession): Option[Throwable]
|
||||
|
||||
}
|
||||
|
||||
class SslTransportException(message: String, cause: Throwable) extends RuntimeException(message, cause)
|
||||
|
||||
/**
|
||||
* INTERNAL API: only public via config
|
||||
* Config in akka.remote.artery.ssl.config-ssl-engine
|
||||
*/
|
||||
@InternalApi private[akka] final class ConfigSSLEngineProvider(config: Config, log: MarkerLoggingAdapter) extends SSLEngineProvider {
|
||||
|
||||
def this(system: ActorSystem) = this(
|
||||
system.settings.config.getConfig("akka.remote.artery.ssl.config-ssl-engine"),
|
||||
Logging.withMarker(system, classOf[ConfigSSLEngineProvider].getName))
|
||||
|
||||
private val SSLKeyStore = config.getString("key-store")
|
||||
private val SSLTrustStore = config.getString("trust-store")
|
||||
private val SSLKeyStorePassword = config.getString("key-store-password")
|
||||
private val SSLKeyPassword = config.getString("key-password")
|
||||
private val SSLTrustStorePassword = config.getString("trust-store-password")
|
||||
val SSLEnabledAlgorithms = immutableSeq(config.getStringList("enabled-algorithms")).to[Set]
|
||||
val SSLProtocol = config.getString("protocol")
|
||||
val SSLRandomNumberGenerator = config.getString("random-number-generator")
|
||||
val SSLRequireMutualAuthentication = config.getBoolean("require-mutual-authentication")
|
||||
private val HostnameVerification = config.getBoolean("hostname-verification")
|
||||
|
||||
private lazy val sslContext: SSLContext = {
|
||||
// log hostname verification warning once
|
||||
if (HostnameVerification)
|
||||
log.debug("TLS/SSL hostname verification is enabled.")
|
||||
else
|
||||
log.warning(LogMarker.Security, "TLS/SSL hostname verification is disabled. " +
|
||||
"Please configure akka.remote.artery.ssl.config-ssl-engine.hostname-verification=on " +
|
||||
"and ensure the X.509 certificate on the host is correct to remove this warning.")
|
||||
|
||||
constructContext()
|
||||
}
|
||||
|
||||
private def constructContext(): SSLContext = {
|
||||
try {
|
||||
def loadKeystore(filename: String, password: String): KeyStore = {
|
||||
val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
|
||||
val fin = Files.newInputStream(Paths.get(filename))
|
||||
try keyStore.load(fin, password.toCharArray) finally Try(fin.close())
|
||||
keyStore
|
||||
}
|
||||
|
||||
val keyManagers = {
|
||||
val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
|
||||
factory.init(loadKeystore(SSLKeyStore, SSLKeyStorePassword), SSLKeyPassword.toCharArray)
|
||||
factory.getKeyManagers
|
||||
}
|
||||
val trustManagers = {
|
||||
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
|
||||
trustManagerFactory.init(loadKeystore(SSLTrustStore, SSLTrustStorePassword))
|
||||
trustManagerFactory.getTrustManagers
|
||||
}
|
||||
val rng = createSecureRandom()
|
||||
|
||||
val ctx = SSLContext.getInstance(SSLProtocol)
|
||||
ctx.init(keyManagers, trustManagers, rng)
|
||||
ctx
|
||||
} catch {
|
||||
case e: FileNotFoundException ⇒
|
||||
throw new SslTransportException("Server SSL connection could not be established because key store could not be loaded", e)
|
||||
case e: IOException ⇒
|
||||
throw new SslTransportException("Server SSL connection could not be established because: " + e.getMessage, e)
|
||||
case e: GeneralSecurityException ⇒
|
||||
throw new SslTransportException("Server SSL connection could not be established because SSL context could not be constructed", e)
|
||||
}
|
||||
}
|
||||
|
||||
def createSecureRandom(): SecureRandom = {
|
||||
val rng = SSLRandomNumberGenerator match {
|
||||
case r @ ("AES128CounterSecureRNG" | "AES256CounterSecureRNG") ⇒
|
||||
log.debug("SSL random number generator set to: {}", r)
|
||||
SecureRandom.getInstance(r, AkkaProvider)
|
||||
case s @ ("SHA1PRNG" | "NativePRNG") ⇒
|
||||
log.debug("SSL random number generator set to: {}", s)
|
||||
// SHA1PRNG needs /dev/urandom to be the source on Linux to prevent problems with /dev/random blocking
|
||||
// However, this also makes the seed source insecure as the seed is reused to avoid blocking (not a problem on FreeBSD).
|
||||
SecureRandom.getInstance(s)
|
||||
|
||||
case "" ⇒
|
||||
log.debug("SSLRandomNumberGenerator not specified, falling back to SecureRandom")
|
||||
new SecureRandom
|
||||
|
||||
case unknown ⇒
|
||||
log.warning(LogMarker.Security, "Unknown SSLRandomNumberGenerator [{}] falling back to SecureRandom", unknown)
|
||||
new SecureRandom
|
||||
}
|
||||
rng.nextInt() // prevent stall on first access
|
||||
rng
|
||||
}
|
||||
|
||||
override def createServerSSLEngine(hostname: String, port: Int): SSLEngine =
|
||||
createSSLEngine(akka.stream.Server, hostname, port)
|
||||
|
||||
override def createClientSSLEngine(hostname: String, port: Int): SSLEngine =
|
||||
createSSLEngine(akka.stream.Client, hostname, port)
|
||||
|
||||
private def createSSLEngine(role: TLSRole, hostname: String, port: Int): SSLEngine = {
|
||||
createSSLEngine(sslContext, role, hostname, port)
|
||||
}
|
||||
|
||||
private def createSSLEngine(
|
||||
sslContext: SSLContext,
|
||||
role: TLSRole,
|
||||
hostname: String,
|
||||
port: Int,
|
||||
closing: TLSClosing = IgnoreComplete): SSLEngine = {
|
||||
|
||||
val engine = sslContext.createSSLEngine(hostname, port)
|
||||
|
||||
if (HostnameVerification) {
|
||||
val sslParams = sslContext.getDefaultSSLParameters
|
||||
sslParams.setEndpointIdentificationAlgorithm("HTTPS")
|
||||
engine.setSSLParameters(sslParams)
|
||||
}
|
||||
|
||||
engine.setUseClientMode(role == akka.stream.Client)
|
||||
engine.setEnabledCipherSuites(SSLEnabledAlgorithms.toArray)
|
||||
engine.setEnabledProtocols(Array(SSLProtocol))
|
||||
|
||||
if ((role != akka.stream.Client) && SSLRequireMutualAuthentication)
|
||||
engine.setNeedClientAuth(true)
|
||||
|
||||
engine
|
||||
}
|
||||
|
||||
override def verifyClientSession(hostname: String, session: SSLSession): Option[Throwable] =
|
||||
None
|
||||
|
||||
override def verifyServerSession(hostname: String, session: SSLSession): Option[Throwable] =
|
||||
None
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -49,6 +49,22 @@ object ArterySpecSupport {
|
|||
*/
|
||||
def defaultConfig = newFlightRecorderConfig
|
||||
.withFallback(staticArteryRemotingConfig)
|
||||
.withFallback(tlsConfig) // TLS only used if transport=tls-tcp
|
||||
|
||||
// set the test key-store and trust-store properties
|
||||
// TLS only used if transport=tls-tcp, which can be set from specific tests or
|
||||
// System properties (e.g. jenkins job)
|
||||
lazy val tlsConfig: Config = {
|
||||
val trustStore = getClass.getClassLoader.getResource("truststore").getPath
|
||||
val keyStore = getClass.getClassLoader.getResource("keystore").getPath
|
||||
|
||||
ConfigFactory.parseString(s"""
|
||||
akka.remote.artery.ssl.config-ssl-engine {
|
||||
key-store = "$keyStore"
|
||||
trust-store = "$trustStore"
|
||||
}
|
||||
""")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -36,6 +36,20 @@ class ArteryTcpSendConsistencyWithThreeLanesSpec extends AbstractRemoteSendConsi
|
|||
akka.remote.artery.advanced.inbound-lanes = 3
|
||||
""").withFallback(ArterySpecSupport.defaultConfig))
|
||||
|
||||
class ArteryTlsTcpSendConsistencyWithOneLaneSpec extends AbstractRemoteSendConsistencySpec(
|
||||
ConfigFactory.parseString("""
|
||||
akka.remote.artery.transport = tls-tcp
|
||||
akka.remote.artery.advanced.outbound-lanes = 1
|
||||
akka.remote.artery.advanced.inbound-lanes = 1
|
||||
""").withFallback(ArterySpecSupport.defaultConfig))
|
||||
|
||||
class ArteryTlsTcpSendConsistencyWithThreeLanesSpec extends AbstractRemoteSendConsistencySpec(
|
||||
ConfigFactory.parseString("""
|
||||
akka.remote.artery.transport = tls-tcp
|
||||
akka.remote.artery.advanced.outbound-lanes = 1
|
||||
akka.remote.artery.advanced.inbound-lanes = 1
|
||||
""").withFallback(ArterySpecSupport.defaultConfig))
|
||||
|
||||
abstract class AbstractRemoteSendConsistencySpec(config: Config) extends ArteryMultiNodeSpec(config) with ImplicitSender {
|
||||
|
||||
val systemB = newRemoteSystem(name = Some("systemB"))
|
||||
|
|
|
|||
|
|
@ -0,0 +1,167 @@
|
|||
/**
|
||||
* Copyright (C) 2016-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
package akka.remote.artery
|
||||
package tcp
|
||||
|
||||
import java.security.NoSuchAlgorithmException
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.ActorPath
|
||||
import akka.actor.ActorIdentity
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.Identify
|
||||
import akka.actor.RootActorPath
|
||||
import akka.testkit.ImplicitSender
|
||||
import akka.testkit.TestActors
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
class TlsTcpWithDefaultConfigSpec extends TlsTcpSpec(ConfigFactory.empty())
|
||||
|
||||
class TlsTcpWithSHA1PRNGSpec extends TlsTcpSpec(ConfigFactory.parseString("""
|
||||
akka.remote.artery.ssl.config-ssl-engine {
|
||||
random-number-generator = "SHA1PRNG"
|
||||
enabled-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA"]
|
||||
}
|
||||
"""))
|
||||
|
||||
class TlsTcpWithAES128CounterSecureRNGSpec extends TlsTcpSpec(ConfigFactory.parseString("""
|
||||
akka.remote.artery.ssl.config-ssl-engine {
|
||||
random-number-generator = "AES128CounterSecureRNG"
|
||||
enabled-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"]
|
||||
}
|
||||
"""))
|
||||
|
||||
class TlsTcpWithAES256CounterSecureRNGSpec extends TlsTcpSpec(ConfigFactory.parseString("""
|
||||
akka.remote.artery.ssl.config-ssl-engine {
|
||||
random-number-generator = "AES256CounterSecureRNG"
|
||||
enabled-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"]
|
||||
}
|
||||
"""))
|
||||
|
||||
class TlsTcpWithDefaultRNGSecureSpec extends TlsTcpSpec(ConfigFactory.parseString("""
|
||||
akka.remote.artery.ssl.config-ssl-engine {
|
||||
random-number-generator = ""
|
||||
enabled-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA"]
|
||||
}
|
||||
"""))
|
||||
|
||||
class TlsTcpWithCrappyRSAWithMD5OnlyHereToMakeSureThingsWorkSpec extends TlsTcpSpec(ConfigFactory.parseString("""
|
||||
akka.remote.artery.ssl.config-ssl-engine {
|
||||
random-number-generator = ""
|
||||
enabled-algorithms = [""SSL_RSA_WITH_NULL_MD5""]
|
||||
}
|
||||
"""))
|
||||
|
||||
object TlsTcpSpec {
|
||||
|
||||
lazy val config: Config = {
|
||||
ConfigFactory.parseString(s"""
|
||||
akka.loglevel = DEBUG
|
||||
akka.remote.artery {
|
||||
transport = tls-tcp
|
||||
large-message-destinations = [ "/user/large" ]
|
||||
}
|
||||
""")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
abstract class TlsTcpSpec(config: Config)
|
||||
extends ArteryMultiNodeSpec(config.withFallback(TlsTcpSpec.config)) with ImplicitSender {
|
||||
|
||||
val systemB = newRemoteSystem(name = Some("systemB"))
|
||||
val addressB = address(systemB)
|
||||
val rootB = RootActorPath(addressB)
|
||||
|
||||
def isSupported: Boolean = {
|
||||
try {
|
||||
val provider = new ConfigSSLEngineProvider(system)
|
||||
|
||||
val rng = provider.createSecureRandom()
|
||||
rng.nextInt() // Has to work
|
||||
val sRng = provider.SSLRandomNumberGenerator
|
||||
if (rng.getAlgorithm != sRng && sRng != "")
|
||||
throw new NoSuchAlgorithmException(sRng)
|
||||
|
||||
val address = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
||||
val host = address.host.get
|
||||
val port = address.port.get
|
||||
|
||||
val engine = provider.createServerSSLEngine(host, port)
|
||||
val gotAllSupported = provider.SSLEnabledAlgorithms diff engine.getSupportedCipherSuites.toSet
|
||||
val gotAllEnabled = provider.SSLEnabledAlgorithms diff engine.getEnabledCipherSuites.toSet
|
||||
gotAllSupported.isEmpty || (throw new IllegalArgumentException("Cipher Suite not supported: " + gotAllSupported))
|
||||
gotAllEnabled.isEmpty || (throw new IllegalArgumentException("Cipher Suite not enabled: " + gotAllEnabled))
|
||||
engine.getSupportedProtocols.contains(provider.SSLProtocol) ||
|
||||
(throw new IllegalArgumentException("Protocol not supported: " + provider.SSLProtocol))
|
||||
} catch {
|
||||
case e @ ((_: IllegalArgumentException) | (_: NoSuchAlgorithmException)) ⇒
|
||||
info(e.toString)
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
def identify(path: ActorPath): ActorRef = {
|
||||
system.actorSelection(path) ! Identify(path.name)
|
||||
expectMsgType[ActorIdentity].ref.get
|
||||
}
|
||||
|
||||
def testDelivery(echoRef: ActorRef): Unit = {
|
||||
echoRef ! "ping-1"
|
||||
expectMsg("ping-1")
|
||||
|
||||
// and some more
|
||||
(2 to 10).foreach { n ⇒
|
||||
echoRef ! s"ping-$n"
|
||||
}
|
||||
receiveN(9) should equal((2 to 10).map(n ⇒ s"ping-$n"))
|
||||
}
|
||||
|
||||
"Artery with TLS/TCP" must {
|
||||
|
||||
if (isSupported) {
|
||||
|
||||
"deliver messages" in {
|
||||
systemB.actorOf(TestActors.echoActorProps, "echo")
|
||||
val echoRef = identify(rootB / "user" / "echo")
|
||||
testDelivery(echoRef)
|
||||
}
|
||||
|
||||
"deliver messages over large messages stream" in {
|
||||
systemB.actorOf(TestActors.echoActorProps, "large")
|
||||
val echoRef = identify(rootB / "user" / "large")
|
||||
testDelivery(echoRef)
|
||||
}
|
||||
|
||||
} else {
|
||||
"not be run when the cipher is not supported by the platform this test is currently being executed on" in {
|
||||
pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class TlsTcpWithHostnameVerificationSpec extends ArteryMultiNodeSpec(
|
||||
ConfigFactory.parseString("""
|
||||
akka.remote.artery.ssl.config-ssl-engine {
|
||||
hostname-verification = on
|
||||
}
|
||||
""").withFallback(TlsTcpSpec.config)) with ImplicitSender {
|
||||
|
||||
val systemB = newRemoteSystem(name = Some("systemB"))
|
||||
val addressB = address(systemB)
|
||||
val rootB = RootActorPath(addressB)
|
||||
|
||||
"Artery with TLS/TCP and hostname-verification=on" must {
|
||||
"reject invalid" in {
|
||||
systemB.actorOf(TestActors.echoActorProps, "echo")
|
||||
system.actorSelection(rootB / "user" / "echo") ! Identify("echo")
|
||||
expectNoMessage(2.seconds)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -6,6 +6,8 @@ package akka.stream.scaladsl
|
|||
import java.net.InetSocketAddress
|
||||
import java.util.concurrent.TimeoutException
|
||||
import javax.net.ssl.SSLContext
|
||||
import javax.net.ssl.SSLEngine
|
||||
import javax.net.ssl.SSLSession
|
||||
|
||||
import akka.actor._
|
||||
import akka.annotation.{ ApiMayChange, InternalApi }
|
||||
|
|
@ -17,10 +19,10 @@ import akka.stream.impl.fusing.GraphStages.detacher
|
|||
import akka.stream.impl.io.{ ConnectionSourceStage, OutgoingConnectionStage, TcpIdleTimeout }
|
||||
import akka.util.ByteString
|
||||
import akka.{ Done, NotUsed }
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||
import scala.util.Try
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
|
||||
|
|
@ -265,6 +267,24 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
connection.join(tlsWrapping.atop(tls).reversed)
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API: for raw SSLEngine
|
||||
*/
|
||||
@InternalApi private[akka] def outgoingTlsConnectionWithSSLEngine(
|
||||
remoteAddress: InetSocketAddress,
|
||||
createSSLEngine: () ⇒ SSLEngine,
|
||||
localAddress: Option[InetSocketAddress] = None,
|
||||
options: immutable.Traversable[SocketOption] = Nil,
|
||||
connectTimeout: Duration = Duration.Inf,
|
||||
idleTimeout: Duration = Duration.Inf,
|
||||
verifySession: SSLSession ⇒ Try[Unit],
|
||||
closing: TLSClosing = IgnoreComplete): Flow[ByteString, ByteString, Future[OutgoingConnection]] = {
|
||||
|
||||
val connection = outgoingConnection(remoteAddress, localAddress, options, true, connectTimeout, idleTimeout)
|
||||
val tls = TLS(createSSLEngine, verifySession, closing)
|
||||
connection.join(tlsWrapping.atop(tls).reversed)
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
|
||||
* where all incoming and outgoing bytes are passed through TLS.
|
||||
|
|
@ -294,6 +314,28 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] def bindTlsWithSSLEngine(
|
||||
interface: String,
|
||||
port: Int,
|
||||
createSSLEngine: () ⇒ SSLEngine,
|
||||
backlog: Int = 100,
|
||||
options: immutable.Traversable[SocketOption] = Nil,
|
||||
idleTimeout: Duration = Duration.Inf,
|
||||
verifySession: SSLSession ⇒ Try[Unit],
|
||||
closing: TLSClosing = IgnoreComplete): Source[IncomingConnection, Future[ServerBinding]] = {
|
||||
|
||||
val tls = tlsWrapping.atop(TLS(createSSLEngine, verifySession, closing)).reversed
|
||||
|
||||
bind(interface, port, backlog, options, true, idleTimeout).map { incomingConnection ⇒
|
||||
incomingConnection.copy(
|
||||
flow = incomingConnection.flow.join(tls)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
|
||||
* handling the incoming connections through TLS and then run using the provided Flow.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue