From b4cfc3717fe41e18bd7cc80a8e6b85721aeaa34b Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Thu, 17 Nov 2016 16:07:24 +0100 Subject: [PATCH] =str #21753 simplify TLSActor configuration by allowing to specify SSLEngine directly (#21822) Do all (Akka)SSLConfig magic in one place directly in the TLS API. Also, introduce new low-level entrypoint in TLS that allows to specify an SSLEngine constructor directly without relying on SSLContext. This allows users to use third-party SSLEngine implementations like netty's OpenSslEngine together with akka-stream. --- .../stream/impl/ActorMaterializerImpl.scala | 2 +- .../scala/akka/stream/impl/io/TLSActor.scala | 140 ++++++++---------- .../scala/akka/stream/impl/io/TlsModule.scala | 37 +++-- .../main/scala/akka/stream/scaladsl/TLS.scala | 82 ++++++++-- 4 files changed, 151 insertions(+), 110 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 41436c352d..d27cf9d354 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -197,7 +197,7 @@ private[akka] case class ActorMaterializerImpl( case tls: TlsModule ⇒ // TODO solve this so TlsModule doesn't need special treatment here val es = effectiveSettings(effectiveAttributes) val props = - TLSActor.props(es, tls.sslContext, tls.sslConfig, tls.firstSession, tls.role, tls.closing, tls.hostInfo) + TLSActor.props(es, tls.createSSLEngine, tls.verifySession, tls.closing) val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher) def factory(id: Int) = new ActorPublisher[Any](impl) { override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala index 918b57b129..aea652bb03 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala @@ -4,37 +4,36 @@ package akka.stream.impl.io import java.nio.ByteBuffer -import java.util -import java.util.Collections import javax.net.ssl.SSLEngineResult.HandshakeStatus import javax.net.ssl.SSLEngineResult.HandshakeStatus._ import javax.net.ssl.SSLEngineResult.Status._ import javax.net.ssl._ + import akka.actor._ import akka.stream._ import akka.stream.impl.FanIn.InputBunch import akka.stream.impl.FanOut.OutputBunch import akka.stream.impl._ import akka.util.ByteString -import com.typesafe.sslconfig.akka.AkkaSSLConfig + import scala.annotation.tailrec import akka.stream.TLSProtocol._ +import scala.util.control.NonFatal +import scala.util.{ Failure, Success, Try } + /** * INTERNAL API. */ -object TLSActor { +private[stream] object TLSActor { def props( - settings: ActorMaterializerSettings, - sslContext: SSLContext, - sslConfig: Option[AkkaSSLConfig], - firstSession: NegotiateNewSession, - role: TLSRole, - closing: TLSClosing, - hostInfo: Option[(String, Int)], - tracing: Boolean = false): Props = - Props(new TLSActor(settings, sslContext, sslConfig, firstSession, role, closing, hostInfo, tracing)).withDeploy(Deploy.local) + settings: ActorMaterializerSettings, + createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 + verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 + closing: TLSClosing, + tracing: Boolean = false): Props = + Props(new TLSActor(settings, createSSLEngine, verifySession, closing, tracing)).withDeploy(Deploy.local) final val TransportIn = 0 final val TransportOut = 0 @@ -46,12 +45,12 @@ object TLSActor { /** * INTERNAL API. */ -class TLSActor( - settings: ActorMaterializerSettings, - sslContext: SSLContext, - externalSslConfig: Option[AkkaSSLConfig], - firstSession: NegotiateNewSession, role: TLSRole, closing: TLSClosing, - hostInfo: Option[(String, Int)], tracing: Boolean) +private[stream] class TLSActor( + settings: ActorMaterializerSettings, + createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 + verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 + closing: TLSClosing, + tracing: Boolean) extends Actor with ActorLogging with Pump { import TLSActor._ @@ -147,43 +146,22 @@ class TLSActor( private val transportInChoppingBlock = new ChoppingBlock(TransportIn, "TransportIn") transportInChoppingBlock.prepare(transportInBuffer) - private val sslConfig = externalSslConfig.getOrElse(AkkaSSLConfig(context.system)) - private val hostnameVerifier = sslConfig.hostnameVerifier + // The engine could also be instantiated in ActorMaterializerImpl but if creation fails + // during materialization it would be worse than failing later on. + val engine = + try createSSLEngine(context.system) catch { case NonFatal(ex) ⇒ fail(ex, closeTransport = true); throw ex } - val engine: SSLEngine = { - val e = hostInfo match { - case Some((hostname, port)) ⇒ sslContext.createSSLEngine(hostname, port) - case None ⇒ sslContext.createSSLEngine() - } - sslConfig.sslEngineConfigurator.configure(e, sslContext) - e.setUseClientMode(role == Client) - e - } + engine.beginHandshake() + lastHandshakeStatus = engine.getHandshakeStatus var currentSession = engine.getSession - applySessionParameters(firstSession) - - def applySessionParameters(params: NegotiateNewSession): Unit = { - params.enabledCipherSuites foreach (cs ⇒ engine.setEnabledCipherSuites(cs.toArray)) - params.enabledProtocols foreach (p ⇒ engine.setEnabledProtocols(p.toArray)) - params.clientAuth match { - case Some(TLSClientAuth.None) ⇒ engine.setNeedClientAuth(false) - case Some(TLSClientAuth.Want) ⇒ engine.setWantClientAuth(true) - case Some(TLSClientAuth.Need) ⇒ engine.setNeedClientAuth(true) - case _ ⇒ // do nothing - } - - // configure Server Name Indication unless ssl-config disabled it (in which case we already logged many warnings) - applySNI(params) - - engine.beginHandshake() - lastHandshakeStatus = engine.getHandshakeStatus - } def setNewSessionParameters(params: NegotiateNewSession): Unit = { if (tracing) log.debug(s"applying $params") currentSession.invalidate() - applySessionParameters(params) + TlsUtils.applySessionParameters(engine, params) + engine.beginHandshake() + lastHandshakeStatus = engine.getHandshakeStatus corkUser = true } @@ -434,12 +412,12 @@ class TLSActor( if (tracing) log.debug("handshake finished") val session = engine.getSession - hostInfo.map(_._1) match { - case Some(hostname) if !hostnameVerifier.verify(hostname, session) ⇒ - fail(new ConnectionException(s"Hostname verification failed! Expected session to be for $hostname"), closeTransport = true) - case _ ⇒ + verifySession(context.system, session) match { + case Success(()) ⇒ currentSession = session corkUser = false + case Failure(ex) ⇒ + fail(ex, closeTransport = true) } } @@ -458,6 +436,7 @@ class TLSActor( pump() } + // FIXME: what happens if this actor dies unexpectedly? override def postStop(): Unit = { if (tracing) log.debug("postStop") super.postStop() @@ -471,33 +450,36 @@ class TLSActor( if (tracing) log.debug(s"STOP Outbound Closed: ${engine.isOutboundDone} Inbound closed: ${engine.isInboundDone}") context.stop(self) } +} - // Additional ssl-config related setup - - // since setting a custom HostnameVerified (in JDK8, update 60 still) disables SNI - // see here: https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html#SNIExamples - // resolves: https://github.com/akka/akka/issues/19287 - private def applySNI(params: NegotiateNewSession): Unit = { - for { - sslParams ← params.sslParameters - (hostname, _) ← hostInfo - if !sslConfig.config.loose.disableSNI - } yield { - // first copy the *mutable* SLLParameters before modifying to prevent race condition in `setServerNames` - val clone = new SSLParameters() - clone.setCipherSuites(sslParams.getCipherSuites) - clone.setProtocols(sslParams.getProtocols) - clone.setWantClientAuth(sslParams.getWantClientAuth) - clone.setNeedClientAuth(sslParams.getNeedClientAuth) - clone.setEndpointIdentificationAlgorithm(sslParams.getEndpointIdentificationAlgorithm) - clone.setAlgorithmConstraints(sslParams.getAlgorithmConstraints) - clone.setSNIMatchers(sslParams.getSNIMatchers) - clone.setUseCipherSuitesOrder(sslParams.getUseCipherSuitesOrder) - - // apply the changes - clone.setServerNames(Collections.singletonList(new SNIHostName(hostname))) - engine.setSSLParameters(clone) +/** + * INTERNAL API + */ +private[stream] object TlsUtils { + def applySessionParameters(engine: SSLEngine, sessionParameters: NegotiateNewSession): Unit = { + sessionParameters.enabledCipherSuites foreach (cs ⇒ engine.setEnabledCipherSuites(cs.toArray)) + sessionParameters.enabledProtocols foreach (p ⇒ engine.setEnabledProtocols(p.toArray)) + sessionParameters.clientAuth match { + case Some(TLSClientAuth.None) ⇒ engine.setNeedClientAuth(false) + case Some(TLSClientAuth.Want) ⇒ engine.setWantClientAuth(true) + case Some(TLSClientAuth.Need) ⇒ engine.setNeedClientAuth(true) + case _ ⇒ // do nothing } + + sessionParameters.sslParameters.foreach(engine.setSSLParameters) } -} + def cloneParameters(old: SSLParameters): SSLParameters = { + val newParameters = new SSLParameters() + newParameters.setAlgorithmConstraints(old.getAlgorithmConstraints) + newParameters.setCipherSuites(old.getCipherSuites) + newParameters.setEndpointIdentificationAlgorithm(old.getEndpointIdentificationAlgorithm) + newParameters.setNeedClientAuth(old.getNeedClientAuth) + newParameters.setProtocols(old.getProtocols) + newParameters.setServerNames(old.getServerNames) + newParameters.setSNIMatchers(old.getSNIMatchers) + newParameters.setUseCipherSuitesOrder(old.getUseCipherSuitesOrder) + newParameters.setWantClientAuth(old.getWantClientAuth) + newParameters + } +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala index 4abaa2e9de..21d787b4c3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala @@ -1,27 +1,28 @@ package akka.stream.impl.io -import javax.net.ssl.SSLContext +import javax.net.ssl.{ SSLContext, SSLEngine, SSLSession } +import akka.actor.ActorSystem import akka.stream._ -import akka.stream.impl.StreamLayout.{ CompositeModule, AtomicModule } +import akka.stream.impl.StreamLayout.{ AtomicModule, CompositeModule } import akka.stream.TLSProtocol._ import akka.util.ByteString import com.typesafe.sslconfig.akka.AkkaSSLConfig +import scala.util.Try + /** * INTERNAL API. */ -final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOut: Outlet[SslTlsInbound], - cipherIn: Inlet[ByteString], cipherOut: Outlet[ByteString], - shape: Shape, attributes: Attributes, - sslContext: SSLContext, - sslConfig: Option[AkkaSSLConfig], - firstSession: NegotiateNewSession, - role: TLSRole, closing: TLSClosing, hostInfo: Option[(String, Int)]) extends AtomicModule { +private[stream] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOut: Outlet[SslTlsInbound], + cipherIn: Inlet[ByteString], cipherOut: Outlet[ByteString], + shape: Shape, attributes: Attributes, + createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 + verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 + closing: TLSClosing) extends AtomicModule { override def withAttributes(att: Attributes): TlsModule = copy(attributes = att) - override def carbonCopy: TlsModule = - TlsModule(attributes, sslContext, sslConfig, firstSession, role, closing, hostInfo) + override def carbonCopy: TlsModule = TlsModule(attributes, createSSLEngine, verifySession, closing) override def replaceShape(s: Shape) = if (s != shape) { @@ -29,20 +30,24 @@ final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOut: Outlet[SslT CompositeModule(this, s) } else this - override def toString: String = f"TlsModule($firstSession, $role, $closing, $hostInfo) [${System.identityHashCode(this)}%08x]" + override def toString: String = f"TlsModule($closing) [${System.identityHashCode(this)}%08x]" } /** * INTERNAL API. */ -object TlsModule { - def apply(attributes: Attributes, sslContext: SSLContext, sslConfig: Option[AkkaSSLConfig], firstSession: NegotiateNewSession, role: TLSRole, closing: TLSClosing, hostInfo: Option[(String, Int)]): TlsModule = { - val name = attributes.nameOrDefault(s"StreamTls($role)") +private[stream] object TlsModule { + def apply( + attributes: Attributes, + createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 + verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 + closing: TLSClosing): TlsModule = { + val name = attributes.nameOrDefault(s"StreamTls()") val cipherIn = Inlet[ByteString](s"$name.cipherIn") val cipherOut = Outlet[ByteString](s"$name.cipherOut") val plainIn = Inlet[SslTlsOutbound](s"$name.transportIn") val plainOut = Outlet[SslTlsInbound](s"$name.transportOut") val shape = new BidiShape(plainIn, cipherOut, cipherIn, plainOut) - TlsModule(plainIn, plainOut, cipherIn, cipherOut, shape, attributes, sslContext, sslConfig, firstSession, role, closing, hostInfo) + TlsModule(plainIn, plainOut, cipherIn, cipherOut, shape, attributes, createSSLEngine, verifySession, closing) } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala index fa77dbb87a..cfb35821ca 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala @@ -1,14 +1,18 @@ package akka.stream.scaladsl -import javax.net.ssl.{ SSLContext } +import java.util.Collections +import javax.net.ssl.{ SNIHostName, SSLContext, SSLEngine, SSLSession } -import akka.stream.impl.io.TlsModule +import akka.stream.impl.io.{ TlsModule, TlsUtils } import akka.NotUsed +import akka.actor.ActorSystem import akka.stream._ import akka.stream.TLSProtocol._ import akka.util.ByteString import com.typesafe.sslconfig.akka.AkkaSSLConfig +import scala.util.{ Failure, Success, Try } + /** * Stream cipher support based upon JSSE. * @@ -64,11 +68,51 @@ object TLS { * configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]]. */ def apply( - sslContext: SSLContext, // TODO: in 2.5.x replace sslContext and sslConfig by generic SSLEngine constructor function, see https://github.com/akka/akka/issues/21753 + sslContext: SSLContext, sslConfig: Option[AkkaSSLConfig], firstSession: NegotiateNewSession, role: TLSRole, - closing: TLSClosing = IgnoreComplete, hostInfo: Option[(String, Int)] = None): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = - new scaladsl.BidiFlow(TlsModule(Attributes.none, sslContext, sslConfig, firstSession, role, closing, hostInfo)) + closing: TLSClosing = IgnoreComplete, hostInfo: Option[(String, Int)] = None): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = { + def theSslConfig(system: ActorSystem): AkkaSSLConfig = + sslConfig.getOrElse(AkkaSSLConfig(system)) + + val createSSLEngine = { system: ActorSystem ⇒ + val engine = hostInfo match { + case Some((hostname, port)) ⇒ sslContext.createSSLEngine(hostname, port) + case None ⇒ sslContext.createSSLEngine() + } + val config = theSslConfig(system) + config.sslEngineConfigurator.configure(engine, sslContext) + engine.setUseClientMode(role == Client) + + val finalSessionParameters = + if (firstSession.sslParameters.isDefined && hostInfo.isDefined && !config.config.loose.disableSNI) { + val newParams = TlsUtils.cloneParameters(firstSession.sslParameters.get) + // In Java 7, SNI was automatically enabled by enabling "jsse.enableSNIExtension" and using + // `createSSLEngine(hostname, port)`. + // In Java 8, SNI is only enabled if the server names are added to the parameters. + // See https://github.com/akka/akka/issues/19287. + newParams.setServerNames(Collections.singletonList(new SNIHostName(hostInfo.get._1))) + firstSession.copy(sslParameters = Some(newParams)) + } else + firstSession + + TlsUtils.applySessionParameters(engine, finalSessionParameters) + engine + } + def verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit] = + hostInfo match { + case Some((hostname, _)) ⇒ { (system, session) ⇒ + val hostnameVerifier = theSslConfig(system).hostnameVerifier + if (!hostnameVerifier.verify(hostname, session)) + Failure(new ConnectionException(s"Hostname verification failed! Expected session to be for $hostname")) + else + Success(()) + } + case None ⇒ (_, _) ⇒ Success(()) + } + + new scaladsl.BidiFlow(TlsModule(Attributes.none, createSSLEngine, verifySession, closing)) + } /** * Create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. The @@ -90,7 +134,7 @@ object TLS { sslContext: SSLContext, firstSession: NegotiateNewSession, role: TLSRole, closing: TLSClosing, hostInfo: Option[(String, Int)]): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = - new scaladsl.BidiFlow(TlsModule(Attributes.none, sslContext, None, firstSession, role, closing, hostInfo)) + apply(sslContext, None, firstSession, role, closing, hostInfo) /** * Create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. The @@ -100,19 +144,29 @@ object TLS { * often the same as the underlying transport’s server or client role, but * that is not a requirement and depends entirely on the application * protocol. - * - * For a description of the `closing` parameter please refer to [[TLSClosing]]. - * - * The `hostInfo` parameter allows to optionally specify a pair of hostname and port - * that will be used when creating the SSLEngine with `sslContext.createSslEngine`. - * The SSLEngine may use this information e.g. when an endpoint identification algorithm was - * configured using [[javax.net.ssl.SSLParameters.setEndpointIdentificationAlgorithm]]. */ def apply( sslContext: SSLContext, firstSession: NegotiateNewSession, role: TLSRole): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = - new scaladsl.BidiFlow(TlsModule(Attributes.none, sslContext, None, firstSession, role, IgnoreComplete, None)) + apply(sslContext, None, firstSession, role, IgnoreComplete, None) + /** + * Create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. This is a low-level interface. + * + * You can specify a constructor to create an SSLEngine that must already be configured for + * client and server mode and with all the parameters for the first session. + * + * You can specify a verification function that will be called after every successful handshake + * to verify additional session information. + * + * For a description of the `closing` parameter please refer to [[TLSClosing]]. + */ + def apply( + createSSLEngine: () ⇒ SSLEngine, // we don't offer the internal `ActorSystem => SSLEngine` API here, see #21753 + verifySession: SSLSession ⇒ Try[Unit], // we don't offer the internal API that provides `ActorSystem` here, see #21753 + closing: TLSClosing + ): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = + new scaladsl.BidiFlow(TlsModule(Attributes.none, _ ⇒ createSSLEngine(), (_, session) ⇒ verifySession(session), closing)) } /**