Do all (Akka)SSLConfig magic in one place directly in the TLS API. Also, introduce new low-level entrypoint in TLS that allows to specify an SSLEngine constructor directly without relying on SSLContext. This allows users to use third-party SSLEngine implementations like netty's OpenSslEngine together with akka-stream.
This commit is contained in:
parent
97bada7deb
commit
b4cfc3717f
4 changed files with 151 additions and 110 deletions
|
|
@ -197,7 +197,7 @@ private[akka] case class ActorMaterializerImpl(
|
|||
case tls: TlsModule ⇒ // TODO solve this so TlsModule doesn't need special treatment here
|
||||
val es = effectiveSettings(effectiveAttributes)
|
||||
val props =
|
||||
TLSActor.props(es, tls.sslContext, tls.sslConfig, tls.firstSession, tls.role, tls.closing, tls.hostInfo)
|
||||
TLSActor.props(es, tls.createSSLEngine, tls.verifySession, tls.closing)
|
||||
val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher)
|
||||
def factory(id: Int) = new ActorPublisher[Any](impl) {
|
||||
override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
|
||||
|
|
|
|||
|
|
@ -4,37 +4,36 @@
|
|||
package akka.stream.impl.io
|
||||
|
||||
import java.nio.ByteBuffer
|
||||
import java.util
|
||||
import java.util.Collections
|
||||
import javax.net.ssl.SSLEngineResult.HandshakeStatus
|
||||
import javax.net.ssl.SSLEngineResult.HandshakeStatus._
|
||||
import javax.net.ssl.SSLEngineResult.Status._
|
||||
import javax.net.ssl._
|
||||
|
||||
import akka.actor._
|
||||
import akka.stream._
|
||||
import akka.stream.impl.FanIn.InputBunch
|
||||
import akka.stream.impl.FanOut.OutputBunch
|
||||
import akka.stream.impl._
|
||||
import akka.util.ByteString
|
||||
import com.typesafe.sslconfig.akka.AkkaSSLConfig
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import akka.stream.TLSProtocol._
|
||||
|
||||
import scala.util.control.NonFatal
|
||||
import scala.util.{ Failure, Success, Try }
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*/
|
||||
object TLSActor {
|
||||
private[stream] object TLSActor {
|
||||
|
||||
def props(
|
||||
settings: ActorMaterializerSettings,
|
||||
sslContext: SSLContext,
|
||||
sslConfig: Option[AkkaSSLConfig],
|
||||
firstSession: NegotiateNewSession,
|
||||
role: TLSRole,
|
||||
createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753
|
||||
verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753
|
||||
closing: TLSClosing,
|
||||
hostInfo: Option[(String, Int)],
|
||||
tracing: Boolean = false): Props =
|
||||
Props(new TLSActor(settings, sslContext, sslConfig, firstSession, role, closing, hostInfo, tracing)).withDeploy(Deploy.local)
|
||||
Props(new TLSActor(settings, createSSLEngine, verifySession, closing, tracing)).withDeploy(Deploy.local)
|
||||
|
||||
final val TransportIn = 0
|
||||
final val TransportOut = 0
|
||||
|
|
@ -46,12 +45,12 @@ object TLSActor {
|
|||
/**
|
||||
* INTERNAL API.
|
||||
*/
|
||||
class TLSActor(
|
||||
private[stream] class TLSActor(
|
||||
settings: ActorMaterializerSettings,
|
||||
sslContext: SSLContext,
|
||||
externalSslConfig: Option[AkkaSSLConfig],
|
||||
firstSession: NegotiateNewSession, role: TLSRole, closing: TLSClosing,
|
||||
hostInfo: Option[(String, Int)], tracing: Boolean)
|
||||
createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753
|
||||
verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753
|
||||
closing: TLSClosing,
|
||||
tracing: Boolean)
|
||||
extends Actor with ActorLogging with Pump {
|
||||
|
||||
import TLSActor._
|
||||
|
|
@ -147,43 +146,22 @@ class TLSActor(
|
|||
private val transportInChoppingBlock = new ChoppingBlock(TransportIn, "TransportIn")
|
||||
transportInChoppingBlock.prepare(transportInBuffer)
|
||||
|
||||
private val sslConfig = externalSslConfig.getOrElse(AkkaSSLConfig(context.system))
|
||||
private val hostnameVerifier = sslConfig.hostnameVerifier
|
||||
|
||||
val engine: SSLEngine = {
|
||||
val e = hostInfo match {
|
||||
case Some((hostname, port)) ⇒ sslContext.createSSLEngine(hostname, port)
|
||||
case None ⇒ sslContext.createSSLEngine()
|
||||
}
|
||||
sslConfig.sslEngineConfigurator.configure(e, sslContext)
|
||||
e.setUseClientMode(role == Client)
|
||||
e
|
||||
}
|
||||
|
||||
var currentSession = engine.getSession
|
||||
applySessionParameters(firstSession)
|
||||
|
||||
def applySessionParameters(params: NegotiateNewSession): Unit = {
|
||||
params.enabledCipherSuites foreach (cs ⇒ engine.setEnabledCipherSuites(cs.toArray))
|
||||
params.enabledProtocols foreach (p ⇒ engine.setEnabledProtocols(p.toArray))
|
||||
params.clientAuth match {
|
||||
case Some(TLSClientAuth.None) ⇒ engine.setNeedClientAuth(false)
|
||||
case Some(TLSClientAuth.Want) ⇒ engine.setWantClientAuth(true)
|
||||
case Some(TLSClientAuth.Need) ⇒ engine.setNeedClientAuth(true)
|
||||
case _ ⇒ // do nothing
|
||||
}
|
||||
|
||||
// configure Server Name Indication unless ssl-config disabled it (in which case we already logged many warnings)
|
||||
applySNI(params)
|
||||
// The engine could also be instantiated in ActorMaterializerImpl but if creation fails
|
||||
// during materialization it would be worse than failing later on.
|
||||
val engine =
|
||||
try createSSLEngine(context.system) catch { case NonFatal(ex) ⇒ fail(ex, closeTransport = true); throw ex }
|
||||
|
||||
engine.beginHandshake()
|
||||
lastHandshakeStatus = engine.getHandshakeStatus
|
||||
}
|
||||
|
||||
var currentSession = engine.getSession
|
||||
|
||||
def setNewSessionParameters(params: NegotiateNewSession): Unit = {
|
||||
if (tracing) log.debug(s"applying $params")
|
||||
currentSession.invalidate()
|
||||
applySessionParameters(params)
|
||||
TlsUtils.applySessionParameters(engine, params)
|
||||
engine.beginHandshake()
|
||||
lastHandshakeStatus = engine.getHandshakeStatus
|
||||
corkUser = true
|
||||
}
|
||||
|
||||
|
|
@ -434,12 +412,12 @@ class TLSActor(
|
|||
if (tracing) log.debug("handshake finished")
|
||||
val session = engine.getSession
|
||||
|
||||
hostInfo.map(_._1) match {
|
||||
case Some(hostname) if !hostnameVerifier.verify(hostname, session) ⇒
|
||||
fail(new ConnectionException(s"Hostname verification failed! Expected session to be for $hostname"), closeTransport = true)
|
||||
case _ ⇒
|
||||
verifySession(context.system, session) match {
|
||||
case Success(()) ⇒
|
||||
currentSession = session
|
||||
corkUser = false
|
||||
case Failure(ex) ⇒
|
||||
fail(ex, closeTransport = true)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -458,6 +436,7 @@ class TLSActor(
|
|||
pump()
|
||||
}
|
||||
|
||||
// FIXME: what happens if this actor dies unexpectedly?
|
||||
override def postStop(): Unit = {
|
||||
if (tracing) log.debug("postStop")
|
||||
super.postStop()
|
||||
|
|
@ -471,33 +450,36 @@ class TLSActor(
|
|||
if (tracing) log.debug(s"STOP Outbound Closed: ${engine.isOutboundDone} Inbound closed: ${engine.isInboundDone}")
|
||||
context.stop(self)
|
||||
}
|
||||
|
||||
// Additional ssl-config related setup
|
||||
|
||||
// since setting a custom HostnameVerified (in JDK8, update 60 still) disables SNI
|
||||
// see here: https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html#SNIExamples
|
||||
// resolves: https://github.com/akka/akka/issues/19287
|
||||
private def applySNI(params: NegotiateNewSession): Unit = {
|
||||
for {
|
||||
sslParams ← params.sslParameters
|
||||
(hostname, _) ← hostInfo
|
||||
if !sslConfig.config.loose.disableSNI
|
||||
} yield {
|
||||
// first copy the *mutable* SLLParameters before modifying to prevent race condition in `setServerNames`
|
||||
val clone = new SSLParameters()
|
||||
clone.setCipherSuites(sslParams.getCipherSuites)
|
||||
clone.setProtocols(sslParams.getProtocols)
|
||||
clone.setWantClientAuth(sslParams.getWantClientAuth)
|
||||
clone.setNeedClientAuth(sslParams.getNeedClientAuth)
|
||||
clone.setEndpointIdentificationAlgorithm(sslParams.getEndpointIdentificationAlgorithm)
|
||||
clone.setAlgorithmConstraints(sslParams.getAlgorithmConstraints)
|
||||
clone.setSNIMatchers(sslParams.getSNIMatchers)
|
||||
clone.setUseCipherSuitesOrder(sslParams.getUseCipherSuitesOrder)
|
||||
|
||||
// apply the changes
|
||||
clone.setServerNames(Collections.singletonList(new SNIHostName(hostname)))
|
||||
engine.setSSLParameters(clone)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[stream] object TlsUtils {
|
||||
def applySessionParameters(engine: SSLEngine, sessionParameters: NegotiateNewSession): Unit = {
|
||||
sessionParameters.enabledCipherSuites foreach (cs ⇒ engine.setEnabledCipherSuites(cs.toArray))
|
||||
sessionParameters.enabledProtocols foreach (p ⇒ engine.setEnabledProtocols(p.toArray))
|
||||
sessionParameters.clientAuth match {
|
||||
case Some(TLSClientAuth.None) ⇒ engine.setNeedClientAuth(false)
|
||||
case Some(TLSClientAuth.Want) ⇒ engine.setWantClientAuth(true)
|
||||
case Some(TLSClientAuth.Need) ⇒ engine.setNeedClientAuth(true)
|
||||
case _ ⇒ // do nothing
|
||||
}
|
||||
|
||||
sessionParameters.sslParameters.foreach(engine.setSSLParameters)
|
||||
}
|
||||
|
||||
def cloneParameters(old: SSLParameters): SSLParameters = {
|
||||
val newParameters = new SSLParameters()
|
||||
newParameters.setAlgorithmConstraints(old.getAlgorithmConstraints)
|
||||
newParameters.setCipherSuites(old.getCipherSuites)
|
||||
newParameters.setEndpointIdentificationAlgorithm(old.getEndpointIdentificationAlgorithm)
|
||||
newParameters.setNeedClientAuth(old.getNeedClientAuth)
|
||||
newParameters.setProtocols(old.getProtocols)
|
||||
newParameters.setServerNames(old.getServerNames)
|
||||
newParameters.setSNIMatchers(old.getSNIMatchers)
|
||||
newParameters.setUseCipherSuitesOrder(old.getUseCipherSuitesOrder)
|
||||
newParameters.setWantClientAuth(old.getWantClientAuth)
|
||||
newParameters
|
||||
}
|
||||
}
|
||||
|
|
@ -1,27 +1,28 @@
|
|||
package akka.stream.impl.io
|
||||
|
||||
import javax.net.ssl.SSLContext
|
||||
import javax.net.ssl.{ SSLContext, SSLEngine, SSLSession }
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream._
|
||||
import akka.stream.impl.StreamLayout.{ CompositeModule, AtomicModule }
|
||||
import akka.stream.impl.StreamLayout.{ AtomicModule, CompositeModule }
|
||||
import akka.stream.TLSProtocol._
|
||||
import akka.util.ByteString
|
||||
import com.typesafe.sslconfig.akka.AkkaSSLConfig
|
||||
|
||||
import scala.util.Try
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*/
|
||||
final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOut: Outlet[SslTlsInbound],
|
||||
private[stream] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOut: Outlet[SslTlsInbound],
|
||||
cipherIn: Inlet[ByteString], cipherOut: Outlet[ByteString],
|
||||
shape: Shape, attributes: Attributes,
|
||||
sslContext: SSLContext,
|
||||
sslConfig: Option[AkkaSSLConfig],
|
||||
firstSession: NegotiateNewSession,
|
||||
role: TLSRole, closing: TLSClosing, hostInfo: Option[(String, Int)]) extends AtomicModule {
|
||||
createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753
|
||||
verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753
|
||||
closing: TLSClosing) extends AtomicModule {
|
||||
|
||||
override def withAttributes(att: Attributes): TlsModule = copy(attributes = att)
|
||||
override def carbonCopy: TlsModule =
|
||||
TlsModule(attributes, sslContext, sslConfig, firstSession, role, closing, hostInfo)
|
||||
override def carbonCopy: TlsModule = TlsModule(attributes, createSSLEngine, verifySession, closing)
|
||||
|
||||
override def replaceShape(s: Shape) =
|
||||
if (s != shape) {
|
||||
|
|
@ -29,20 +30,24 @@ final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOut: Outlet[SslT
|
|||
CompositeModule(this, s)
|
||||
} else this
|
||||
|
||||
override def toString: String = f"TlsModule($firstSession, $role, $closing, $hostInfo) [${System.identityHashCode(this)}%08x]"
|
||||
override def toString: String = f"TlsModule($closing) [${System.identityHashCode(this)}%08x]"
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*/
|
||||
object TlsModule {
|
||||
def apply(attributes: Attributes, sslContext: SSLContext, sslConfig: Option[AkkaSSLConfig], firstSession: NegotiateNewSession, role: TLSRole, closing: TLSClosing, hostInfo: Option[(String, Int)]): TlsModule = {
|
||||
val name = attributes.nameOrDefault(s"StreamTls($role)")
|
||||
private[stream] object TlsModule {
|
||||
def apply(
|
||||
attributes: Attributes,
|
||||
createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753
|
||||
verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753
|
||||
closing: TLSClosing): TlsModule = {
|
||||
val name = attributes.nameOrDefault(s"StreamTls()")
|
||||
val cipherIn = Inlet[ByteString](s"$name.cipherIn")
|
||||
val cipherOut = Outlet[ByteString](s"$name.cipherOut")
|
||||
val plainIn = Inlet[SslTlsOutbound](s"$name.transportIn")
|
||||
val plainOut = Outlet[SslTlsInbound](s"$name.transportOut")
|
||||
val shape = new BidiShape(plainIn, cipherOut, cipherIn, plainOut)
|
||||
TlsModule(plainIn, plainOut, cipherIn, cipherOut, shape, attributes, sslContext, sslConfig, firstSession, role, closing, hostInfo)
|
||||
TlsModule(plainIn, plainOut, cipherIn, cipherOut, shape, attributes, createSSLEngine, verifySession, closing)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,14 +1,18 @@
|
|||
package akka.stream.scaladsl
|
||||
|
||||
import javax.net.ssl.{ SSLContext }
|
||||
import java.util.Collections
|
||||
import javax.net.ssl.{ SNIHostName, SSLContext, SSLEngine, SSLSession }
|
||||
|
||||
import akka.stream.impl.io.TlsModule
|
||||
import akka.stream.impl.io.{ TlsModule, TlsUtils }
|
||||
import akka.NotUsed
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream._
|
||||
import akka.stream.TLSProtocol._
|
||||
import akka.util.ByteString
|
||||
import com.typesafe.sslconfig.akka.AkkaSSLConfig
|
||||
|
||||
import scala.util.{ Failure, Success, Try }
|
||||
|
||||
/**
|
||||
* Stream cipher support based upon JSSE.
|
||||
*
|
||||
|
|
@ -64,11 +68,51 @@ object TLS {
|
|||
* configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]].
|
||||
*/
|
||||
def apply(
|
||||
sslContext: SSLContext, // TODO: in 2.5.x replace sslContext and sslConfig by generic SSLEngine constructor function, see https://github.com/akka/akka/issues/21753
|
||||
sslContext: SSLContext,
|
||||
sslConfig: Option[AkkaSSLConfig],
|
||||
firstSession: NegotiateNewSession, role: TLSRole,
|
||||
closing: TLSClosing = IgnoreComplete, hostInfo: Option[(String, Int)] = None): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
|
||||
new scaladsl.BidiFlow(TlsModule(Attributes.none, sslContext, sslConfig, firstSession, role, closing, hostInfo))
|
||||
closing: TLSClosing = IgnoreComplete, hostInfo: Option[(String, Int)] = None): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = {
|
||||
def theSslConfig(system: ActorSystem): AkkaSSLConfig =
|
||||
sslConfig.getOrElse(AkkaSSLConfig(system))
|
||||
|
||||
val createSSLEngine = { system: ActorSystem ⇒
|
||||
val engine = hostInfo match {
|
||||
case Some((hostname, port)) ⇒ sslContext.createSSLEngine(hostname, port)
|
||||
case None ⇒ sslContext.createSSLEngine()
|
||||
}
|
||||
val config = theSslConfig(system)
|
||||
config.sslEngineConfigurator.configure(engine, sslContext)
|
||||
engine.setUseClientMode(role == Client)
|
||||
|
||||
val finalSessionParameters =
|
||||
if (firstSession.sslParameters.isDefined && hostInfo.isDefined && !config.config.loose.disableSNI) {
|
||||
val newParams = TlsUtils.cloneParameters(firstSession.sslParameters.get)
|
||||
// In Java 7, SNI was automatically enabled by enabling "jsse.enableSNIExtension" and using
|
||||
// `createSSLEngine(hostname, port)`.
|
||||
// In Java 8, SNI is only enabled if the server names are added to the parameters.
|
||||
// See https://github.com/akka/akka/issues/19287.
|
||||
newParams.setServerNames(Collections.singletonList(new SNIHostName(hostInfo.get._1)))
|
||||
firstSession.copy(sslParameters = Some(newParams))
|
||||
} else
|
||||
firstSession
|
||||
|
||||
TlsUtils.applySessionParameters(engine, finalSessionParameters)
|
||||
engine
|
||||
}
|
||||
def verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit] =
|
||||
hostInfo match {
|
||||
case Some((hostname, _)) ⇒ { (system, session) ⇒
|
||||
val hostnameVerifier = theSslConfig(system).hostnameVerifier
|
||||
if (!hostnameVerifier.verify(hostname, session))
|
||||
Failure(new ConnectionException(s"Hostname verification failed! Expected session to be for $hostname"))
|
||||
else
|
||||
Success(())
|
||||
}
|
||||
case None ⇒ (_, _) ⇒ Success(())
|
||||
}
|
||||
|
||||
new scaladsl.BidiFlow(TlsModule(Attributes.none, createSSLEngine, verifySession, closing))
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. The
|
||||
|
|
@ -90,7 +134,7 @@ object TLS {
|
|||
sslContext: SSLContext,
|
||||
firstSession: NegotiateNewSession, role: TLSRole,
|
||||
closing: TLSClosing, hostInfo: Option[(String, Int)]): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
|
||||
new scaladsl.BidiFlow(TlsModule(Attributes.none, sslContext, None, firstSession, role, closing, hostInfo))
|
||||
apply(sslContext, None, firstSession, role, closing, hostInfo)
|
||||
|
||||
/**
|
||||
* Create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. The
|
||||
|
|
@ -100,19 +144,29 @@ object TLS {
|
|||
* often the same as the underlying transport’s server or client role, but
|
||||
* that is not a requirement and depends entirely on the application
|
||||
* protocol.
|
||||
*
|
||||
* For a description of the `closing` parameter please refer to [[TLSClosing]].
|
||||
*
|
||||
* The `hostInfo` parameter allows to optionally specify a pair of hostname and port
|
||||
* that will be used when creating the SSLEngine with `sslContext.createSslEngine`.
|
||||
* The SSLEngine may use this information e.g. when an endpoint identification algorithm was
|
||||
* configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]].
|
||||
*/
|
||||
def apply(
|
||||
sslContext: SSLContext,
|
||||
firstSession: NegotiateNewSession, role: TLSRole): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
|
||||
new scaladsl.BidiFlow(TlsModule(Attributes.none, sslContext, None, firstSession, role, IgnoreComplete, None))
|
||||
apply(sslContext, None, firstSession, role, IgnoreComplete, None)
|
||||
|
||||
/**
|
||||
* Create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. This is a low-level interface.
|
||||
*
|
||||
* You can specify a constructor to create an SSLEngine that must already be configured for
|
||||
* client and server mode and with all the parameters for the first session.
|
||||
*
|
||||
* You can specify a verification function that will be called after every successful handshake
|
||||
* to verify additional session information.
|
||||
*
|
||||
* For a description of the `closing` parameter please refer to [[TLSClosing]].
|
||||
*/
|
||||
def apply(
|
||||
createSSLEngine: () ⇒ SSLEngine, // we don't offer the internal `ActorSystem => SSLEngine` API here, see #21753
|
||||
verifySession: SSLSession ⇒ Try[Unit], // we don't offer the internal API that provides `ActorSystem` here, see #21753
|
||||
closing: TLSClosing
|
||||
): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
|
||||
new scaladsl.BidiFlow(TlsModule(Attributes.none, _ ⇒ createSSLEngine(), (_, session) ⇒ verifySession(session), closing))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue