diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala index 2dc1295ff9..1611980b5d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala @@ -484,10 +484,10 @@ class TlsSpec extends StreamSpec("akka.loglevel=INFO\nakka.actor.debug.receive=o Source.single(SendBytes(ByteString.empty)).via(flow).runWith(Sink.ignore) } Await.result(run("akka-remote"), 3.seconds) // CN=akka-remote - val cause = intercept[SSLHandshakeException] { - Await.result(run("akka-stream"), 3.seconds) + val cause = intercept[Exception] { + Await.result(run("unknown.example.org"), 3.seconds) } - cause.getCause.getCause.getMessage should startWith("No name matching akka-stream found") + cause.getMessage should ===("Hostname verification failed! Expected session to be for unknown.example.org") } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 857d6568d8..8613a2b07c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -197,7 +197,7 @@ private[akka] case class ActorMaterializerImpl( case tls: TlsModule ⇒ // TODO solve this so TlsModule doesn't need special treatment here val es = effectiveSettings(effectiveAttributes) val props = - TLSActor.props(es, tls.createSSLEngine, tls.closing) + TLSActor.props(es, tls.createSSLEngine, tls.verifySession, tls.closing) val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher) def factory(id: Int) = new ActorPublisher[Any](impl) { override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala index e732024717..3fa9625290 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala @@ -30,9 +30,10 @@ private[stream] object TLSActor { def props( settings: ActorMaterializerSettings, createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 + verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 closing: TLSClosing, - tracing: Boolean = false): Props = - Props(new TLSActor(settings, createSSLEngine, closing, tracing)).withDeploy(Deploy.local) + tracing: Boolean = false): Props = + Props(new TLSActor(settings, createSSLEngine, verifySession, closing, tracing)).withDeploy(Deploy.local) final val TransportIn = 0 final val TransportOut = 0 @@ -47,6 +48,7 @@ private[stream] object TLSActor { private[stream] class TLSActor( settings: ActorMaterializerSettings, createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 + verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 closing: TLSClosing, tracing: Boolean) extends Actor with ActorLogging with Pump { @@ -408,8 +410,15 @@ private[stream] class TLSActor( private def handshakeFinished(): Unit = { if (tracing) log.debug("handshake finished") - currentSession = engine.getSession - corkUser = false + val session = engine.getSession + + verifySession(context.system, session) match { + case Success(()) ⇒ + currentSession = session + corkUser = false + case Failure(ex) ⇒ + fail(ex, closeTransport = true) + } } override def receive = inputBunch.subreceive.orElse[Any, Unit](outputBunch.subreceive) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala index 09551e24b1..21d787b4c3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala @@ -18,10 +18,11 @@ private[stream] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plain cipherIn: Inlet[ByteString], cipherOut: Outlet[ByteString], shape: Shape, attributes: Attributes, createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 + verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 closing: TLSClosing) extends AtomicModule { override def withAttributes(att: Attributes): TlsModule = copy(attributes = att) - override def carbonCopy: TlsModule = TlsModule(attributes, createSSLEngine, closing) + override def carbonCopy: TlsModule = TlsModule(attributes, createSSLEngine, verifySession, closing) override def replaceShape(s: Shape) = if (s != shape) { @@ -39,6 +40,7 @@ private[stream] object TlsModule { def apply( attributes: Attributes, createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 + verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 closing: TLSClosing): TlsModule = { val name = attributes.nameOrDefault(s"StreamTls()") val cipherIn = Inlet[ByteString](s"$name.cipherIn") @@ -46,6 +48,6 @@ private[stream] object TlsModule { val plainIn = Inlet[SslTlsOutbound](s"$name.transportIn") val plainOut = Outlet[SslTlsInbound](s"$name.transportOut") val shape = new BidiShape(plainIn, cipherOut, cipherIn, plainOut) - TlsModule(plainIn, plainOut, cipherIn, cipherOut, shape, attributes, createSSLEngine, closing) + TlsModule(plainIn, plainOut, cipherIn, cipherOut, shape, attributes, createSSLEngine, verifySession, closing) } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/TLS.scala b/akka-stream/src/main/scala/akka/stream/javadsl/TLS.scala index ac5c2d3740..5c8701719d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/TLS.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/TLS.scala @@ -1,15 +1,17 @@ package akka.stream.javadsl import java.util.Optional -import javax.net.ssl.{ SSLContext } +import java.util.function.{ Consumer, Supplier } +import javax.net.ssl.{ SSLContext, SSLEngine, SSLSession } -import akka.{ japi, NotUsed } +import akka.{ NotUsed, japi } import akka.stream._ import akka.stream.TLSProtocol._ import akka.util.ByteString import com.typesafe.sslconfig.akka.AkkaSSLConfig import scala.compat.java8.OptionConverters +import scala.util.Try /** * Stream cipher support based upon JSSE. @@ -115,6 +117,35 @@ object TLS { def create(sslContext: SSLContext, firstSession: NegotiateNewSession, role: TLSRole, hostInfo: Optional[japi.Pair[String, java.lang.Integer]], closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = new javadsl.BidiFlow(scaladsl.TLS.apply(sslContext, None, firstSession, role, closing, OptionConverters.toScala(hostInfo).map(e ⇒ (e.first, e.second)))) + /** + * Create a StreamTls [[akka.stream.javadsl.BidiFlow]]. This is a low-level interface. + * + * You can specify a constructor `sslEngineCreator` to create an SSLEngine that must already be configured for + * client and server mode and with all the parameters for the first session. + * + * You can specify a verification function `sessionVerifier` that will be called + * after every successful handshake to verify additional session information. + * + * For a description of the `closing` parameter please refer to [[TLSClosing]]. + */ + def create(sslEngineCreator: Supplier[SSLEngine], sessionVerifier: Consumer[SSLSession], closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = + new javadsl.BidiFlow(scaladsl.TLS.apply( + () ⇒ sslEngineCreator.get(), + session ⇒ Try(sessionVerifier.accept(session)), + closing)) + + /** + * Create a StreamTls [[akka.stream.javadsl.BidiFlow]]. This is a low-level interface. + * + * You can specify a constructor `sslEngineCreator` to create an SSLEngine that must already be configured for + * client and server mode and with all the parameters for the first session. + * + * For a description of the `closing` parameter please refer to [[TLSClosing]]. + */ + def create(sslEngineCreator: Supplier[SSLEngine], closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = + new javadsl.BidiFlow(scaladsl.TLS.apply( + () ⇒ sslEngineCreator.get(), + closing)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala index 74de43ca07..50723a31e9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/TLS.scala @@ -84,12 +84,6 @@ object TLS { config.sslEngineConfigurator.configure(engine, sslContext) engine.setUseClientMode(role == Client) - if (!config.config.loose.disableHostnameVerification && engine.getUseClientMode && hostInfo.isDefined) { - val parameters = engine.getSSLParameters - parameters.setEndpointIdentificationAlgorithm("HTTPS") - engine.setSSLParameters(parameters) - } - val finalSessionParameters = if (firstSession.sslParameters.isDefined && hostInfo.isDefined && !config.config.loose.disableSNI) { val newParams = TlsUtils.cloneParameters(firstSession.sslParameters.get) @@ -105,8 +99,19 @@ object TLS { TlsUtils.applySessionParameters(engine, finalSessionParameters) engine } + def verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit] = + hostInfo match { + case Some((hostname, _)) ⇒ { (system, session) ⇒ + val hostnameVerifier = theSslConfig(system).hostnameVerifier + if (!hostnameVerifier.verify(hostname, session)) + Failure(new ConnectionException(s"Hostname verification failed! Expected session to be for $hostname")) + else + Success(()) + } + case None ⇒ (_, _) ⇒ Success(()) + } - new scaladsl.BidiFlow(TlsModule(Attributes.none, createSSLEngine, closing)) + new scaladsl.BidiFlow(TlsModule(Attributes.none, createSSLEngine, verifySession, closing)) } /** @@ -145,6 +150,24 @@ object TLS { firstSession: NegotiateNewSession, role: TLSRole): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = apply(sslContext, None, firstSession, role, IgnoreComplete, None) + /** + * Create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. This is a low-level interface. + * + * You can specify a constructor to create an SSLEngine that must already be configured for + * client and server mode and with all the parameters for the first session. + * + * You can specify a verification function that will be called after every successful handshake + * to verify additional session information. + * + * For a description of the `closing` parameter please refer to [[TLSClosing]]. + */ + def apply( + createSSLEngine: () ⇒ SSLEngine, // we don't offer the internal `ActorSystem => SSLEngine` API here, see #21753 + verifySession: SSLSession ⇒ Try[Unit], // we don't offer the internal API that provides `ActorSystem` here, see #21753 + closing: TLSClosing + ): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = + new scaladsl.BidiFlow(TlsModule(Attributes.none, _ ⇒ createSSLEngine(), (_, session) ⇒ verifySession(session), closing)) + /** * Create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. This is a low-level interface. * @@ -157,7 +180,7 @@ object TLS { createSSLEngine: () ⇒ SSLEngine, // we don't offer the internal `ActorSystem => SSLEngine` API here, see #21753 closing: TLSClosing ): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = - new scaladsl.BidiFlow(TlsModule(Attributes.none, _ ⇒ createSSLEngine(), closing)) + apply(createSSLEngine, _ ⇒ Success(()), closing) } /** diff --git a/project/MiMa.scala b/project/MiMa.scala index eab1897e88..c4af1d2e53 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -413,19 +413,6 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#WriteMajority.copy"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#WriteMajority.apply"), - // #21854 Remove manual hostname verifier support - ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.scaladsl.TLS.apply"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TlsModule.copy$default$9"), - ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.io.TlsModule.copy$default$8"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TlsModule.copy"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TlsModule.verifySession"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TlsModule.this"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TLSActor.props$default$5"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TLSActor.props"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TLSActor.this"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TlsModule.apply"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TlsModule.apply"), - // #22105 Akka Typed process DSL ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorCell.addFunctionRef"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.dungeon.Children.addFunctionRef"),