Configurable SSLEngineProvider in classic remoting, #23732
This commit is contained in:
parent
e343222505
commit
27e9587a3b
11 changed files with 419 additions and 173 deletions
|
|
@ -75,7 +75,7 @@ final class ActorSystemSetup private[akka] (setups: Map[Class[_], AnyRef]) {
|
|||
}
|
||||
|
||||
/**
|
||||
* alias for `withSetting` allowing for fluent combination of settings: `a and b and c`, where `a`, `b` and `c` are
|
||||
* alias for `withSetup` allowing for fluent combination of settings: `a and b and c`, where `a`, `b` and `c` are
|
||||
* concrete [[Setup]] instances. If a setting of the same concrete [[Setup]] already is
|
||||
* present it will be replaced.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -0,0 +1,5 @@
|
|||
# #23732 SSLEngineProvider, changes to internal classes
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.transport.netty.SSLSettings.getOrCreateContext")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.transport.netty.SSLSettings.createSecureRandom")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.transport.netty.NettySSLSupport.apply")
|
||||
|
||||
|
|
@ -625,6 +625,19 @@ akka {
|
|||
# This must be enabled on both the client and server to work.
|
||||
enable-ssl = true
|
||||
|
||||
# Factory of SSLEngine.
|
||||
# Must implement akka.remote.transport.netty.SSLEngineProvider and have a public
|
||||
# constructor with an ActorSystem parameter.
|
||||
# The default ConfigSSLEngineProvider is configured by properties in section
|
||||
# akka.remote.netty.ssl.security
|
||||
#
|
||||
# The SSLEngineProvider can also be defined via ActorSystemSetup with
|
||||
# SSLEngineProviderSetup when starting the ActorSystem. That is useful when
|
||||
# the SSLEngineProvider implementation requires other external constructor
|
||||
# parameters or is created before the ActorSystem is created.
|
||||
# If such SSLEngineProviderSetup is defined this config property is not used.
|
||||
ssl-engine-provider = akka.remote.transport.netty.ConfigSSLEngineProvider
|
||||
|
||||
security {
|
||||
# This is the Java Key Store used by the server connection
|
||||
key-store = "keystore"
|
||||
|
|
|
|||
|
|
@ -77,12 +77,18 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider
|
|||
|
||||
private val sslEngineProvider: OptionVal[SSLEngineProvider] =
|
||||
if (tlsEnabled) {
|
||||
system.settings.setup.get[SSLEngineProviderSetup] match {
|
||||
case Some(p) ⇒
|
||||
OptionVal.Some(p.sslEngineProvider(system))
|
||||
case None ⇒
|
||||
// load from config
|
||||
OptionVal.Some(system.dynamicAccess.createInstanceFor[SSLEngineProvider](
|
||||
settings.SSLEngineProviderClassName,
|
||||
List((classOf[ActorSystem], system))).recover {
|
||||
case e ⇒ throw new ConfigurationException(
|
||||
s"Could not create SSLEngineProvider [${settings.SSLEngineProviderClassName}]", e)
|
||||
}.get)
|
||||
}
|
||||
} else OptionVal.None
|
||||
|
||||
override protected def startTransport(): Unit = {
|
||||
|
|
|
|||
|
|
@ -12,15 +12,12 @@ import java.nio.file.Paths
|
|||
import java.security.GeneralSecurityException
|
||||
import java.security.KeyStore
|
||||
import java.security.SecureRandom
|
||||
import javax.net.ssl.KeyManagerFactory
|
||||
import javax.net.ssl.SSLContext
|
||||
import javax.net.ssl.SSLEngine
|
||||
import javax.net.ssl.SSLSession
|
||||
import javax.net.ssl.TrustManagerFactory
|
||||
|
||||
import scala.util.Try
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.setup.Setup
|
||||
import akka.annotation.ApiMayChange
|
||||
import akka.annotation.InternalApi
|
||||
import akka.event.LogMarker
|
||||
|
|
@ -32,6 +29,13 @@ import akka.stream.IgnoreComplete
|
|||
import akka.stream.TLSClosing
|
||||
import akka.stream.TLSRole
|
||||
import com.typesafe.config.Config
|
||||
import javax.net.ssl.KeyManager
|
||||
import javax.net.ssl.KeyManagerFactory
|
||||
import javax.net.ssl.SSLContext
|
||||
import javax.net.ssl.SSLEngine
|
||||
import javax.net.ssl.SSLSession
|
||||
import javax.net.ssl.TrustManager
|
||||
import javax.net.ssl.TrustManagerFactory
|
||||
|
||||
@ApiMayChange trait SSLEngineProvider {
|
||||
|
||||
|
|
@ -58,25 +62,28 @@ import com.typesafe.config.Config
|
|||
class SslTransportException(message: String, cause: Throwable) extends RuntimeException(message, cause)
|
||||
|
||||
/**
|
||||
* INTERNAL API: only public via config
|
||||
* Config in akka.remote.artery.ssl.config-ssl-engine
|
||||
*
|
||||
* Subclass may override protected methods to replace certain parts, such as key and trust manager.
|
||||
*/
|
||||
@InternalApi private[akka] final class ConfigSSLEngineProvider(config: Config, log: MarkerLoggingAdapter) extends SSLEngineProvider {
|
||||
@ApiMayChange class ConfigSSLEngineProvider(
|
||||
protected val config: Config,
|
||||
protected val log: MarkerLoggingAdapter) extends SSLEngineProvider {
|
||||
|
||||
def this(system: ActorSystem) = this(
|
||||
system.settings.config.getConfig("akka.remote.artery.ssl.config-ssl-engine"),
|
||||
Logging.withMarker(system, classOf[ConfigSSLEngineProvider].getName))
|
||||
|
||||
private val SSLKeyStore = config.getString("key-store")
|
||||
private val SSLTrustStore = config.getString("trust-store")
|
||||
private val SSLKeyStorePassword = config.getString("key-store-password")
|
||||
private val SSLKeyPassword = config.getString("key-password")
|
||||
private val SSLTrustStorePassword = config.getString("trust-store-password")
|
||||
val SSLEnabledAlgorithms = immutableSeq(config.getStringList("enabled-algorithms")).to[Set]
|
||||
val SSLProtocol = config.getString("protocol")
|
||||
val SSLRandomNumberGenerator = config.getString("random-number-generator")
|
||||
val SSLRequireMutualAuthentication = config.getBoolean("require-mutual-authentication")
|
||||
private val HostnameVerification = config.getBoolean("hostname-verification")
|
||||
val SSLKeyStore: String = config.getString("key-store")
|
||||
val SSLTrustStore: String = config.getString("trust-store")
|
||||
val SSLKeyStorePassword: String = config.getString("key-store-password")
|
||||
val SSLKeyPassword: String = config.getString("key-password")
|
||||
val SSLTrustStorePassword: String = config.getString("trust-store-password")
|
||||
val SSLEnabledAlgorithms: Set[String] = immutableSeq(config.getStringList("enabled-algorithms")).to[Set]
|
||||
val SSLProtocol: String = config.getString("protocol")
|
||||
val SSLRandomNumberGenerator: String = config.getString("random-number-generator")
|
||||
val SSLRequireMutualAuthentication: Boolean = config.getBoolean("require-mutual-authentication")
|
||||
val HostnameVerification: Boolean = config.getBoolean("hostname-verification")
|
||||
|
||||
private lazy val sslContext: SSLContext = {
|
||||
// log hostname verification warning once
|
||||
|
|
@ -93,25 +100,7 @@ class SslTransportException(message: String, cause: Throwable) extends RuntimeEx
|
|||
|
||||
private def constructContext(): SSLContext = {
|
||||
try {
|
||||
def loadKeystore(filename: String, password: String): KeyStore = {
|
||||
val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
|
||||
val fin = Files.newInputStream(Paths.get(filename))
|
||||
try keyStore.load(fin, password.toCharArray) finally Try(fin.close())
|
||||
keyStore
|
||||
}
|
||||
|
||||
val keyManagers = {
|
||||
val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
|
||||
factory.init(loadKeystore(SSLKeyStore, SSLKeyStorePassword), SSLKeyPassword.toCharArray)
|
||||
factory.getKeyManagers
|
||||
}
|
||||
val trustManagers = {
|
||||
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
|
||||
trustManagerFactory.init(loadKeystore(SSLTrustStore, SSLTrustStorePassword))
|
||||
trustManagerFactory.getTrustManagers
|
||||
}
|
||||
val rng = createSecureRandom()
|
||||
|
||||
val ctx = SSLContext.getInstance(SSLProtocol)
|
||||
ctx.init(keyManagers, trustManagers, rng)
|
||||
ctx
|
||||
|
|
@ -125,29 +114,37 @@ class SslTransportException(message: String, cause: Throwable) extends RuntimeEx
|
|||
}
|
||||
}
|
||||
|
||||
def createSecureRandom(): SecureRandom = {
|
||||
val rng = SSLRandomNumberGenerator match {
|
||||
case r @ ("AES128CounterSecureRNG" | "AES256CounterSecureRNG") ⇒
|
||||
log.debug("SSL random number generator set to: {}", r)
|
||||
SecureRandom.getInstance(r, AkkaProvider)
|
||||
case s @ ("SHA1PRNG" | "NativePRNG") ⇒
|
||||
log.debug("SSL random number generator set to: {}", s)
|
||||
// SHA1PRNG needs /dev/urandom to be the source on Linux to prevent problems with /dev/random blocking
|
||||
// However, this also makes the seed source insecure as the seed is reused to avoid blocking (not a problem on FreeBSD).
|
||||
SecureRandom.getInstance(s)
|
||||
|
||||
case "" ⇒
|
||||
log.debug("SSLRandomNumberGenerator not specified, falling back to SecureRandom")
|
||||
new SecureRandom
|
||||
|
||||
case unknown ⇒
|
||||
log.warning(LogMarker.Security, "Unknown SSLRandomNumberGenerator [{}] falling back to SecureRandom", unknown)
|
||||
new SecureRandom
|
||||
/**
|
||||
* Subclass may override to customize loading of `KeyStore`
|
||||
*/
|
||||
protected def loadKeystore(filename: String, password: String): KeyStore = {
|
||||
val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
|
||||
val fin = Files.newInputStream(Paths.get(filename))
|
||||
try keyStore.load(fin, password.toCharArray) finally Try(fin.close())
|
||||
keyStore
|
||||
}
|
||||
rng.nextInt() // prevent stall on first access
|
||||
rng
|
||||
|
||||
/**
|
||||
* Subclass may override to customize `KeyManager`
|
||||
*/
|
||||
protected def keyManagers: Array[KeyManager] = {
|
||||
val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
|
||||
factory.init(loadKeystore(SSLKeyStore, SSLKeyStorePassword), SSLKeyPassword.toCharArray)
|
||||
factory.getKeyManagers
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclass may override to customize `TrustManager`
|
||||
*/
|
||||
protected def trustManagers: Array[TrustManager] = {
|
||||
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
|
||||
trustManagerFactory.init(loadKeystore(SSLTrustStore, SSLTrustStorePassword))
|
||||
trustManagerFactory.getTrustManagers
|
||||
}
|
||||
|
||||
def createSecureRandom(): SecureRandom =
|
||||
SecureRandomFactory.createSecureRandom(SSLRandomNumberGenerator, log)
|
||||
|
||||
override def createServerSSLEngine(hostname: String, port: Int): SSLEngine =
|
||||
createSSLEngine(akka.stream.Server, hostname, port)
|
||||
|
||||
|
|
@ -191,3 +188,59 @@ class SslTransportException(message: String, cause: Throwable) extends RuntimeEx
|
|||
|
||||
}
|
||||
|
||||
object SSLEngineProviderSetup {
|
||||
|
||||
/**
|
||||
* Scala API: factory for defining a `SSLEngineProvider` that is passed in when ActorSystem
|
||||
* is created rather than creating one from configured class name.
|
||||
*/
|
||||
def apply(sslEngineProvider: ExtendedActorSystem ⇒ SSLEngineProvider): SSLEngineProviderSetup =
|
||||
new SSLEngineProviderSetup(sslEngineProvider)
|
||||
|
||||
/**
|
||||
* Java API: factory for defining a `SSLEngineProvider` that is passed in when ActorSystem
|
||||
* is created rather than creating one from configured class name.
|
||||
*/
|
||||
def create(sslEngineProvider: java.util.function.Function[ExtendedActorSystem, SSLEngineProvider]): SSLEngineProviderSetup =
|
||||
apply(sys ⇒ sslEngineProvider(sys))
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Setup for for defining a `SSLEngineProvider` that is passed in when ActorSystem
|
||||
* is created rather than creating one from configured class name. That is useful
|
||||
* when the SSLEngineProvider implementation require other external constructor parameters
|
||||
* or is created before the ActorSystem is created.
|
||||
*
|
||||
* Constructor is *Internal API*, use factories in [[SSLEngineProviderSetup()]]
|
||||
*/
|
||||
@ApiMayChange class SSLEngineProviderSetup private (
|
||||
val sslEngineProvider: ExtendedActorSystem ⇒ SSLEngineProvider) extends Setup
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] object SecureRandomFactory {
|
||||
def createSecureRandom(randomNumberGenerator: String, log: MarkerLoggingAdapter): SecureRandom = {
|
||||
val rng = randomNumberGenerator match {
|
||||
case r @ ("AES128CounterSecureRNG" | "AES256CounterSecureRNG") ⇒
|
||||
log.debug("SSL random number generator set to: {}", r)
|
||||
SecureRandom.getInstance(r, AkkaProvider)
|
||||
case s @ ("SHA1PRNG" | "NativePRNG") ⇒
|
||||
log.debug("SSL random number generator set to: {}", s)
|
||||
// SHA1PRNG needs /dev/urandom to be the source on Linux to prevent problems with /dev/random blocking
|
||||
// However, this also makes the seed source insecure as the seed is reused to avoid blocking (not a problem on FreeBSD).
|
||||
SecureRandom.getInstance(s)
|
||||
|
||||
case "" ⇒
|
||||
log.debug("SSLRandomNumberGenerator not specified, falling back to SecureRandom")
|
||||
new SecureRandom
|
||||
|
||||
case unknown ⇒
|
||||
log.warning(LogMarker.Security, "Unknown SSLRandomNumberGenerator [{}] falling back to SecureRandom", unknown)
|
||||
new SecureRandom
|
||||
}
|
||||
rng.nextInt() // prevent stall on first access
|
||||
rng
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,28 +4,21 @@
|
|||
|
||||
package akka.remote.transport.netty
|
||||
|
||||
import java.io.{ FileNotFoundException, IOException }
|
||||
import java.security._
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import javax.net.ssl.{ KeyManagerFactory, SSLContext, TrustManagerFactory }
|
||||
|
||||
import akka.event.{ LogMarker, MarkerLoggingAdapter }
|
||||
import akka.japi.Util._
|
||||
import akka.remote.RemoteTransportException
|
||||
import akka.remote.security.provider.AkkaProvider
|
||||
import com.typesafe.config.Config
|
||||
import org.jboss.netty.handler.ssl.SslHandler
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.util.Try
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Paths
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] class SSLSettings(config: Config) {
|
||||
import config.{ getBoolean, getString, getStringList }
|
||||
|
||||
import config.getBoolean
|
||||
import config.getString
|
||||
import config.getStringList
|
||||
|
||||
val SSLKeyStore = getString("key-store")
|
||||
val SSLTrustStore = getString("trust-store")
|
||||
|
|
@ -42,89 +35,26 @@ private[akka] class SSLSettings(config: Config) {
|
|||
|
||||
val SSLRequireMutualAuthentication = getBoolean("require-mutual-authentication")
|
||||
|
||||
private val sslContext = new AtomicReference[SSLContext]()
|
||||
@tailrec final def getOrCreateContext(log: MarkerLoggingAdapter): SSLContext =
|
||||
sslContext.get() match {
|
||||
case null ⇒
|
||||
val newCtx = constructContext(log)
|
||||
if (sslContext.compareAndSet(null, newCtx)) newCtx
|
||||
else getOrCreateContext(log)
|
||||
case ctx ⇒ ctx
|
||||
}
|
||||
|
||||
private def constructContext(log: MarkerLoggingAdapter): SSLContext =
|
||||
try {
|
||||
def loadKeystore(filename: String, password: String): KeyStore = {
|
||||
val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
|
||||
val fin = Files.newInputStream(Paths.get(filename))
|
||||
try keyStore.load(fin, password.toCharArray) finally Try(fin.close())
|
||||
keyStore
|
||||
}
|
||||
|
||||
val keyManagers = {
|
||||
val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
|
||||
factory.init(loadKeystore(SSLKeyStore, SSLKeyStorePassword), SSLKeyPassword.toCharArray)
|
||||
factory.getKeyManagers
|
||||
}
|
||||
val trustManagers = {
|
||||
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
|
||||
trustManagerFactory.init(loadKeystore(SSLTrustStore, SSLTrustStorePassword))
|
||||
trustManagerFactory.getTrustManagers
|
||||
}
|
||||
val rng = createSecureRandom(log)
|
||||
|
||||
val ctx = SSLContext.getInstance(SSLProtocol)
|
||||
ctx.init(keyManagers, trustManagers, rng)
|
||||
ctx
|
||||
} catch {
|
||||
case e: FileNotFoundException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because key store could not be loaded", e)
|
||||
case e: IOException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because: " + e.getMessage, e)
|
||||
case e: GeneralSecurityException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because SSL context could not be constructed", e)
|
||||
}
|
||||
|
||||
def createSecureRandom(log: MarkerLoggingAdapter): SecureRandom = {
|
||||
val rng = SSLRandomNumberGenerator match {
|
||||
case r @ ("AES128CounterSecureRNG" | "AES256CounterSecureRNG") ⇒
|
||||
log.debug("SSL random number generator set to: {}", r)
|
||||
SecureRandom.getInstance(r, AkkaProvider)
|
||||
case s @ ("SHA1PRNG" | "NativePRNG") ⇒
|
||||
log.debug("SSL random number generator set to: {}", s)
|
||||
// SHA1PRNG needs /dev/urandom to be the source on Linux to prevent problems with /dev/random blocking
|
||||
// However, this also makes the seed source insecure as the seed is reused to avoid blocking (not a problem on FreeBSD).
|
||||
SecureRandom.getInstance(s)
|
||||
|
||||
case "" ⇒
|
||||
log.debug("SSLRandomNumberGenerator not specified, falling back to SecureRandom")
|
||||
new SecureRandom
|
||||
|
||||
case unknown ⇒
|
||||
log.warning(LogMarker.Security, "Unknown SSLRandomNumberGenerator [{}] falling back to SecureRandom", unknown)
|
||||
new SecureRandom
|
||||
}
|
||||
rng.nextInt() // prevent stall on first access
|
||||
rng
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Used for adding SSL support to Netty pipeline
|
||||
* Used for adding SSL support to Netty pipeline.
|
||||
* The `SSLEngine` is created via the configured [[SSLEngineProvider]].
|
||||
*/
|
||||
private[akka] object NettySSLSupport {
|
||||
|
||||
// TODO is this needed in Artery TLS?
|
||||
Security addProvider AkkaProvider
|
||||
|
||||
/**
|
||||
* Construct a SSLHandler which can be inserted into a Netty server/client pipeline
|
||||
*/
|
||||
def apply(settings: SSLSettings, log: MarkerLoggingAdapter, isClient: Boolean): SslHandler = {
|
||||
val sslEngine = settings.getOrCreateContext(log).createSSLEngine // TODO: pass host information to enable host verification
|
||||
sslEngine.setUseClientMode(isClient)
|
||||
sslEngine.setEnabledCipherSuites(settings.SSLEnabledAlgorithms.toArray)
|
||||
sslEngine.setEnabledProtocols(Array(settings.SSLProtocol))
|
||||
|
||||
if (!isClient && settings.SSLRequireMutualAuthentication) sslEngine.setNeedClientAuth(true)
|
||||
def apply(sslEngineProvider: SSLEngineProvider, isClient: Boolean): SslHandler = {
|
||||
val sslEngine =
|
||||
if (isClient) sslEngineProvider.createClientSSLEngine()
|
||||
else sslEngineProvider.createServerSSLEngine()
|
||||
new SslHandler(sslEngine)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,32 +4,60 @@
|
|||
|
||||
package akka.remote.transport.netty
|
||||
|
||||
import akka.actor.{ Address, ExtendedActorSystem }
|
||||
import java.net.InetAddress
|
||||
import java.net.InetSocketAddress
|
||||
import java.net.SocketAddress
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.concurrent.CancellationException
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.Executors
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.Promise
|
||||
import scala.concurrent.blocking
|
||||
import scala.util.Try
|
||||
import scala.util.control.NoStackTrace
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.Address
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.dispatch.ThreadPoolConfig
|
||||
import akka.event.Logging
|
||||
import akka.remote.RARP
|
||||
import akka.remote.transport.AssociationHandle.HandleEventListener
|
||||
import akka.remote.transport.Transport._
|
||||
import akka.remote.transport.netty.NettyTransportSettings.{ Udp, Tcp, Mode }
|
||||
import akka.remote.transport.{ AssociationHandle, Transport }
|
||||
import akka.{ OnlyCauseStackTrace, ConfigurationException }
|
||||
import com.typesafe.config.Config
|
||||
import java.net.{ SocketAddress, InetAddress, InetSocketAddress }
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.concurrent.{ ConcurrentHashMap, Executors, CancellationException }
|
||||
import org.jboss.netty.bootstrap.{ ConnectionlessBootstrap, Bootstrap, ClientBootstrap, ServerBootstrap }
|
||||
import org.jboss.netty.buffer.{ ChannelBuffers, ChannelBuffer }
|
||||
import org.jboss.netty.channel._
|
||||
import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture, ChannelGroupFutureListener }
|
||||
import org.jboss.netty.channel.socket.nio.{ NioWorkerPool, NioDatagramChannelFactory, NioServerSocketChannelFactory, NioClientSocketChannelFactory }
|
||||
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
|
||||
import org.jboss.netty.handler.ssl.SslHandler
|
||||
import scala.concurrent.duration.{ FiniteDuration }
|
||||
import scala.concurrent.{ ExecutionContext, Promise, Future, blocking }
|
||||
import scala.util.{ Try }
|
||||
import scala.util.control.{ NoStackTrace, NonFatal }
|
||||
import akka.util.Helpers.Requiring
|
||||
import akka.remote.transport.netty.NettyTransportSettings.Mode
|
||||
import akka.remote.transport.netty.NettyTransportSettings.Tcp
|
||||
import akka.remote.transport.netty.NettyTransportSettings.Udp
|
||||
import akka.remote.transport.AssociationHandle
|
||||
import akka.remote.transport.Transport
|
||||
import akka.util.Helpers
|
||||
import akka.remote.RARP
|
||||
import akka.util.Helpers.Requiring
|
||||
import akka.util.OptionVal
|
||||
import akka.ConfigurationException
|
||||
import akka.OnlyCauseStackTrace
|
||||
import com.typesafe.config.Config
|
||||
import org.jboss.netty.bootstrap.Bootstrap
|
||||
import org.jboss.netty.bootstrap.ClientBootstrap
|
||||
import org.jboss.netty.bootstrap.ConnectionlessBootstrap
|
||||
import org.jboss.netty.bootstrap.ServerBootstrap
|
||||
import org.jboss.netty.buffer.ChannelBuffer
|
||||
import org.jboss.netty.buffer.ChannelBuffers
|
||||
import org.jboss.netty.channel._
|
||||
import org.jboss.netty.channel.group.ChannelGroup
|
||||
import org.jboss.netty.channel.group.ChannelGroupFuture
|
||||
import org.jboss.netty.channel.group.ChannelGroupFutureListener
|
||||
import org.jboss.netty.channel.group.DefaultChannelGroup
|
||||
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
|
||||
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory
|
||||
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
|
||||
import org.jboss.netty.channel.socket.nio.NioWorkerPool
|
||||
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder
|
||||
import org.jboss.netty.handler.codec.frame.LengthFieldPrepender
|
||||
import org.jboss.netty.handler.ssl.SslHandler
|
||||
import org.jboss.netty.util.HashedWheelTimer
|
||||
|
||||
object NettyTransportSettings {
|
||||
|
|
@ -88,6 +116,8 @@ class NettyTransportSettings(config: Config) {
|
|||
|
||||
val EnableSsl: Boolean = getBoolean("enable-ssl") requiring (!_ || TransportMode == Tcp, s"$TransportMode does not support SSL")
|
||||
|
||||
val SSLEngineProviderClassName: String = if (EnableSsl) getString("ssl-engine-provider") else ""
|
||||
|
||||
val UseDispatcherForIo: Option[String] = getString("use-dispatcher-for-io") match {
|
||||
case "" | null ⇒ None
|
||||
case dispatcher ⇒ Some(dispatcher)
|
||||
|
|
@ -341,10 +371,26 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
|
|||
|
||||
private val associationListenerPromise: Promise[AssociationEventListener] = Promise()
|
||||
|
||||
private val sslEngineProvider: OptionVal[SSLEngineProvider] =
|
||||
if (settings.EnableSsl) {
|
||||
OptionVal.Some(system.dynamicAccess.createInstanceFor[SSLEngineProvider](
|
||||
settings.SSLEngineProviderClassName,
|
||||
List((classOf[ActorSystem], system))).recover {
|
||||
case e ⇒ throw new ConfigurationException(
|
||||
s"Could not create SSLEngineProvider [${settings.SSLEngineProviderClassName}]", e)
|
||||
}.get)
|
||||
} else OptionVal.None
|
||||
|
||||
private def sslHandler(isClient: Boolean): SslHandler = {
|
||||
val handler = NettySSLSupport(settings.SslSettings.get, log, isClient)
|
||||
sslEngineProvider match {
|
||||
case OptionVal.Some(sslProvider) ⇒
|
||||
val handler = NettySSLSupport(sslProvider, isClient)
|
||||
handler.setCloseOnSSLException(true)
|
||||
handler
|
||||
case OptionVal.None ⇒
|
||||
throw new IllegalStateException("Expected enable-ssl=on")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private val serverPipelineFactory: ChannelPipelineFactory = new ChannelPipelineFactory {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,140 @@
|
|||
/**
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.remote.transport.netty
|
||||
|
||||
import java.io.FileNotFoundException
|
||||
import java.io.IOException
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Paths
|
||||
import java.security.GeneralSecurityException
|
||||
import java.security.KeyStore
|
||||
import java.security.SecureRandom
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.util.Try
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.annotation.ApiMayChange
|
||||
import akka.event.LogMarker
|
||||
import akka.event.Logging
|
||||
import akka.event.MarkerLoggingAdapter
|
||||
import akka.japi.Util.immutableSeq
|
||||
import akka.remote.RemoteTransport
|
||||
import akka.remote.RemoteActorRefProvider
|
||||
import akka.remote.RemoteTransportException
|
||||
import akka.remote.artery.tcp.SecureRandomFactory
|
||||
import akka.remote.security.provider.AkkaProvider
|
||||
import akka.remote.transport.AbstractTransportAdapter
|
||||
import akka.stream.IgnoreComplete
|
||||
import akka.stream.TLSClosing
|
||||
import akka.stream.TLSRole
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.sslconfig.ssl.SSLConfigSettings
|
||||
import javax.net.ssl.KeyManager
|
||||
import javax.net.ssl.KeyManagerFactory
|
||||
import javax.net.ssl.SSLContext
|
||||
import javax.net.ssl.SSLEngine
|
||||
import javax.net.ssl.SSLSession
|
||||
import javax.net.ssl.TrustManager
|
||||
import javax.net.ssl.TrustManagerFactory
|
||||
|
||||
@ApiMayChange trait SSLEngineProvider {
|
||||
|
||||
def createServerSSLEngine(): SSLEngine
|
||||
|
||||
def createClientSSLEngine(): SSLEngine
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Config in akka.remote.netty.ssl.security
|
||||
*
|
||||
* Subclass may override protected methods to replace certain parts, such as key and trust manager.
|
||||
*/
|
||||
@ApiMayChange class ConfigSSLEngineProvider(
|
||||
protected val log: MarkerLoggingAdapter,
|
||||
private val settings: SSLSettings) extends SSLEngineProvider {
|
||||
|
||||
def this(system: ActorSystem) = this(
|
||||
Logging.withMarker(system, classOf[ConfigSSLEngineProvider].getName),
|
||||
new SSLSettings(system.settings.config.getConfig("akka.remote.netty.ssl.security"))
|
||||
)
|
||||
|
||||
import settings._
|
||||
|
||||
private lazy val sslContext: SSLContext = {
|
||||
try {
|
||||
val rng = createSecureRandom()
|
||||
val ctx = SSLContext.getInstance(SSLProtocol)
|
||||
ctx.init(keyManagers, trustManagers, rng)
|
||||
ctx
|
||||
} catch {
|
||||
case e: FileNotFoundException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because key store could not be loaded", e)
|
||||
case e: IOException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because: " + e.getMessage, e)
|
||||
case e: GeneralSecurityException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because SSL context could not be constructed", e)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclass may override to customize loading of `KeyStore`
|
||||
*/
|
||||
protected def loadKeystore(filename: String, password: String): KeyStore = {
|
||||
val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
|
||||
val fin = Files.newInputStream(Paths.get(filename))
|
||||
try keyStore.load(fin, password.toCharArray) finally Try(fin.close())
|
||||
keyStore
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclass may override to customize `KeyManager`
|
||||
*/
|
||||
protected def keyManagers: Array[KeyManager] = {
|
||||
val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
|
||||
factory.init(loadKeystore(SSLKeyStore, SSLKeyStorePassword), SSLKeyPassword.toCharArray)
|
||||
factory.getKeyManagers
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclass may override to customize `TrustManager`
|
||||
*/
|
||||
protected def trustManagers: Array[TrustManager] = {
|
||||
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
|
||||
trustManagerFactory.init(loadKeystore(SSLTrustStore, SSLTrustStorePassword))
|
||||
trustManagerFactory.getTrustManagers
|
||||
}
|
||||
|
||||
def createSecureRandom(): SecureRandom =
|
||||
SecureRandomFactory.createSecureRandom(SSLRandomNumberGenerator, log)
|
||||
|
||||
override def createServerSSLEngine(): SSLEngine =
|
||||
createSSLEngine(akka.stream.Server)
|
||||
|
||||
override def createClientSSLEngine(): SSLEngine =
|
||||
createSSLEngine(akka.stream.Client)
|
||||
|
||||
private def createSSLEngine(role: TLSRole): SSLEngine = {
|
||||
createSSLEngine(sslContext, role)
|
||||
}
|
||||
|
||||
private def createSSLEngine(
|
||||
sslContext: SSLContext,
|
||||
role: TLSRole,
|
||||
closing: TLSClosing = IgnoreComplete): SSLEngine = {
|
||||
|
||||
val engine = sslContext.createSSLEngine()
|
||||
|
||||
engine.setUseClientMode(role == akka.stream.Client)
|
||||
engine.setEnabledCipherSuites(SSLEnabledAlgorithms.toArray)
|
||||
engine.setEnabledProtocols(Array(SSLProtocol))
|
||||
|
||||
if ((role != akka.stream.Client) && SSLRequireMutualAuthentication)
|
||||
engine.setNeedClientAuth(true)
|
||||
|
||||
engine
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -14,11 +14,13 @@ import akka.remote.transport.netty.{ NettySSLSupport, SSLSettings }
|
|||
import akka.testkit._
|
||||
import akka.util.Timeout
|
||||
import com.typesafe.config._
|
||||
|
||||
import scala.concurrent.{ Await, Future }
|
||||
import scala.concurrent.duration._
|
||||
import scala.reflect.classTag
|
||||
|
||||
import akka.event.Logging
|
||||
import akka.remote.transport.netty.ConfigSSLEngineProvider
|
||||
|
||||
object Configuration {
|
||||
// set this in your JAVA_OPTS to see all ssl debug info: "-Djavax.net.debug=ssl,keymanager"
|
||||
// The certificate will expire in 2109
|
||||
|
|
@ -63,14 +65,15 @@ object Configuration {
|
|||
val fullConfig = config.withFallback(AkkaSpec.testConf).withFallback(ConfigFactory.load).getConfig("akka.remote.netty.ssl.security")
|
||||
val settings = new SSLSettings(fullConfig)
|
||||
|
||||
val rng = settings.createSecureRandom(NoMarkerLogging)
|
||||
val sslEngineProvider = new ConfigSSLEngineProvider(NoMarkerLogging, settings)
|
||||
val rng = sslEngineProvider.createSecureRandom()
|
||||
|
||||
rng.nextInt() // Has to work
|
||||
val sRng = settings.SSLRandomNumberGenerator
|
||||
if (rng.getAlgorithm != sRng && sRng != "")
|
||||
throw new NoSuchAlgorithmException(sRng)
|
||||
|
||||
val engine = NettySSLSupport(settings, NoMarkerLogging, isClient = true).getEngine
|
||||
val engine = sslEngineProvider.createClientSSLEngine()
|
||||
val gotAllSupported = enabled.toSet diff engine.getSupportedCipherSuites.toSet
|
||||
val gotAllEnabled = enabled.toSet diff engine.getEnabledCipherSuites.toSet
|
||||
gotAllSupported.isEmpty || (throw new IllegalArgumentException("Cipher Suite not supported: " + gotAllSupported))
|
||||
|
|
|
|||
|
|
@ -4,6 +4,8 @@
|
|||
|
||||
package akka.remote.artery
|
||||
|
||||
import akka.actor.BootstrapSetup
|
||||
import akka.actor.setup.ActorSystemSetup
|
||||
import akka.actor.{ ActorSystem, RootActorPath }
|
||||
import akka.remote.RARP
|
||||
import akka.testkit.AkkaSpec
|
||||
|
|
@ -33,15 +35,23 @@ abstract class ArteryMultiNodeSpec(config: Config) extends AkkaSpec(config.withF
|
|||
* @return A new actor system configured with artery enabled. The system will
|
||||
* automatically be terminated after test is completed to avoid leaks.
|
||||
*/
|
||||
def newRemoteSystem(extraConfig: Option[String] = None, name: Option[String] = None): ActorSystem = {
|
||||
def newRemoteSystem(
|
||||
extraConfig: Option[String] = None,
|
||||
name: Option[String] = None,
|
||||
setup: Option[ActorSystemSetup] = None): ActorSystem = {
|
||||
val config =
|
||||
ArterySpecSupport.newFlightRecorderConfig.withFallback(extraConfig.fold(
|
||||
localSystem.settings.config
|
||||
)(
|
||||
str ⇒ ConfigFactory.parseString(str).withFallback(localSystem.settings.config)
|
||||
))
|
||||
val sysName = name.getOrElse(nextGeneratedSystemName)
|
||||
|
||||
val remoteSystem = setup match {
|
||||
case None ⇒ ActorSystem(sysName, config)
|
||||
case Some(s) ⇒ ActorSystem(sysName, s.and(BootstrapSetup.apply(config)))
|
||||
}
|
||||
|
||||
val remoteSystem = ActorSystem(name.getOrElse(nextGeneratedSystemName), config)
|
||||
remoteSystems = remoteSystems :+ remoteSystem
|
||||
|
||||
remoteSystem
|
||||
|
|
|
|||
|
|
@ -15,10 +15,13 @@ import akka.actor.ActorIdentity
|
|||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.Identify
|
||||
import akka.actor.RootActorPath
|
||||
import akka.actor.setup.ActorSystemSetup
|
||||
import akka.testkit.ImplicitSender
|
||||
import akka.testkit.TestActors
|
||||
import akka.testkit.TestProbe
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import javax.net.ssl.SSLEngine
|
||||
|
||||
class TlsTcpWithDefaultConfigSpec extends TlsTcpSpec(ConfigFactory.empty())
|
||||
|
||||
|
|
@ -170,3 +173,40 @@ class TlsTcpWithHostnameVerificationSpec extends ArteryMultiNodeSpec(
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
class TlsTcpWithActorSystemSetupSpec
|
||||
extends ArteryMultiNodeSpec(TlsTcpSpec.config) with ImplicitSender {
|
||||
|
||||
val sslProviderServerProbe = TestProbe()
|
||||
val sslProviderClientProbe = TestProbe()
|
||||
val sslProviderSetup = SSLEngineProviderSetup(sys ⇒ new ConfigSSLEngineProvider(sys) {
|
||||
override def createServerSSLEngine(hostname: String, port: Int): SSLEngine = {
|
||||
sslProviderServerProbe.ref ! "createServerSSLEngine"
|
||||
super.createServerSSLEngine(hostname, port)
|
||||
}
|
||||
|
||||
override def createClientSSLEngine(hostname: String, port: Int): SSLEngine = {
|
||||
sslProviderClientProbe.ref ! "createClientSSLEngine"
|
||||
super.createClientSSLEngine(hostname, port)
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
val systemB = newRemoteSystem(name = Some("systemB"), setup = Some(ActorSystemSetup(sslProviderSetup)))
|
||||
val addressB = address(systemB)
|
||||
val rootB = RootActorPath(addressB)
|
||||
|
||||
"Artery with TLS/TCP with SSLEngineProvider defined via Setup" must {
|
||||
"use the right SSLEngineProvider" in {
|
||||
systemB.actorOf(TestActors.echoActorProps, "echo")
|
||||
val path = rootB / "user" / "echo"
|
||||
system.actorSelection(path) ! Identify(path.name)
|
||||
val echoRef = expectMsgType[ActorIdentity].ref.get
|
||||
echoRef ! "ping-1"
|
||||
expectMsg("ping-1")
|
||||
|
||||
sslProviderServerProbe.expectMsg("createServerSSLEngine")
|
||||
sslProviderClientProbe.expectMsg("createClientSSLEngine")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue