Merge pull request #22351 from jrudolph/jr/w/revert-22320
Revert #22320 for now but still provide simpler overload
This commit is contained in:
commit
ae331a7c6f
7 changed files with 85 additions and 33 deletions
|
|
@ -484,10 +484,10 @@ class TlsSpec extends StreamSpec("akka.loglevel=INFO\nakka.actor.debug.receive=o
|
|||
Source.single(SendBytes(ByteString.empty)).via(flow).runWith(Sink.ignore)
|
||||
}
|
||||
Await.result(run("akka-remote"), 3.seconds) // CN=akka-remote
|
||||
val cause = intercept[SSLHandshakeException] {
|
||||
Await.result(run("akka-stream"), 3.seconds)
|
||||
val cause = intercept[Exception] {
|
||||
Await.result(run("unknown.example.org"), 3.seconds)
|
||||
}
|
||||
cause.getCause.getCause.getMessage should startWith("No name matching akka-stream found")
|
||||
cause.getMessage should ===("Hostname verification failed! Expected session to be for unknown.example.org")
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -197,7 +197,7 @@ private[akka] case class ActorMaterializerImpl(
|
|||
case tls: TlsModule ⇒ // TODO solve this so TlsModule doesn't need special treatment here
|
||||
val es = effectiveSettings(effectiveAttributes)
|
||||
val props =
|
||||
TLSActor.props(es, tls.createSSLEngine, tls.closing)
|
||||
TLSActor.props(es, tls.createSSLEngine, tls.verifySession, tls.closing)
|
||||
val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher)
|
||||
def factory(id: Int) = new ActorPublisher[Any](impl) {
|
||||
override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
|
||||
|
|
|
|||
|
|
@ -30,9 +30,10 @@ private[stream] object TLSActor {
|
|||
def props(
|
||||
settings: ActorMaterializerSettings,
|
||||
createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753
|
||||
verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753
|
||||
closing: TLSClosing,
|
||||
tracing: Boolean = false): Props =
|
||||
Props(new TLSActor(settings, createSSLEngine, closing, tracing)).withDeploy(Deploy.local)
|
||||
tracing: Boolean = false): Props =
|
||||
Props(new TLSActor(settings, createSSLEngine, verifySession, closing, tracing)).withDeploy(Deploy.local)
|
||||
|
||||
final val TransportIn = 0
|
||||
final val TransportOut = 0
|
||||
|
|
@ -47,6 +48,7 @@ private[stream] object TLSActor {
|
|||
private[stream] class TLSActor(
|
||||
settings: ActorMaterializerSettings,
|
||||
createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753
|
||||
verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753
|
||||
closing: TLSClosing,
|
||||
tracing: Boolean)
|
||||
extends Actor with ActorLogging with Pump {
|
||||
|
|
@ -408,8 +410,15 @@ private[stream] class TLSActor(
|
|||
|
||||
private def handshakeFinished(): Unit = {
|
||||
if (tracing) log.debug("handshake finished")
|
||||
currentSession = engine.getSession
|
||||
corkUser = false
|
||||
val session = engine.getSession
|
||||
|
||||
verifySession(context.system, session) match {
|
||||
case Success(()) ⇒
|
||||
currentSession = session
|
||||
corkUser = false
|
||||
case Failure(ex) ⇒
|
||||
fail(ex, closeTransport = true)
|
||||
}
|
||||
}
|
||||
|
||||
override def receive = inputBunch.subreceive.orElse[Any, Unit](outputBunch.subreceive)
|
||||
|
|
|
|||
|
|
@ -18,10 +18,11 @@ private[stream] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plain
|
|||
cipherIn: Inlet[ByteString], cipherOut: Outlet[ByteString],
|
||||
shape: Shape, attributes: Attributes,
|
||||
createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753
|
||||
verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753
|
||||
closing: TLSClosing) extends AtomicModule {
|
||||
|
||||
override def withAttributes(att: Attributes): TlsModule = copy(attributes = att)
|
||||
override def carbonCopy: TlsModule = TlsModule(attributes, createSSLEngine, closing)
|
||||
override def carbonCopy: TlsModule = TlsModule(attributes, createSSLEngine, verifySession, closing)
|
||||
|
||||
override def replaceShape(s: Shape) =
|
||||
if (s != shape) {
|
||||
|
|
@ -39,6 +40,7 @@ private[stream] object TlsModule {
|
|||
def apply(
|
||||
attributes: Attributes,
|
||||
createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753
|
||||
verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753
|
||||
closing: TLSClosing): TlsModule = {
|
||||
val name = attributes.nameOrDefault(s"StreamTls()")
|
||||
val cipherIn = Inlet[ByteString](s"$name.cipherIn")
|
||||
|
|
@ -46,6 +48,6 @@ private[stream] object TlsModule {
|
|||
val plainIn = Inlet[SslTlsOutbound](s"$name.transportIn")
|
||||
val plainOut = Outlet[SslTlsInbound](s"$name.transportOut")
|
||||
val shape = new BidiShape(plainIn, cipherOut, cipherIn, plainOut)
|
||||
TlsModule(plainIn, plainOut, cipherIn, cipherOut, shape, attributes, createSSLEngine, closing)
|
||||
TlsModule(plainIn, plainOut, cipherIn, cipherOut, shape, attributes, createSSLEngine, verifySession, closing)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,15 +1,17 @@
|
|||
package akka.stream.javadsl
|
||||
|
||||
import java.util.Optional
|
||||
import javax.net.ssl.{ SSLContext }
|
||||
import java.util.function.{ Consumer, Supplier }
|
||||
import javax.net.ssl.{ SSLContext, SSLEngine, SSLSession }
|
||||
|
||||
import akka.{ japi, NotUsed }
|
||||
import akka.{ NotUsed, japi }
|
||||
import akka.stream._
|
||||
import akka.stream.TLSProtocol._
|
||||
import akka.util.ByteString
|
||||
import com.typesafe.sslconfig.akka.AkkaSSLConfig
|
||||
|
||||
import scala.compat.java8.OptionConverters
|
||||
import scala.util.Try
|
||||
|
||||
/**
|
||||
* Stream cipher support based upon JSSE.
|
||||
|
|
@ -115,6 +117,35 @@ object TLS {
|
|||
def create(sslContext: SSLContext, firstSession: NegotiateNewSession, role: TLSRole, hostInfo: Optional[japi.Pair[String, java.lang.Integer]], closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
|
||||
new javadsl.BidiFlow(scaladsl.TLS.apply(sslContext, None, firstSession, role, closing, OptionConverters.toScala(hostInfo).map(e ⇒ (e.first, e.second))))
|
||||
|
||||
/**
|
||||
* Create a StreamTls [[akka.stream.javadsl.BidiFlow]]. This is a low-level interface.
|
||||
*
|
||||
* You can specify a constructor `sslEngineCreator` to create an SSLEngine that must already be configured for
|
||||
* client and server mode and with all the parameters for the first session.
|
||||
*
|
||||
* You can specify a verification function `sessionVerifier` that will be called
|
||||
* after every successful handshake to verify additional session information.
|
||||
*
|
||||
* For a description of the `closing` parameter please refer to [[TLSClosing]].
|
||||
*/
|
||||
def create(sslEngineCreator: Supplier[SSLEngine], sessionVerifier: Consumer[SSLSession], closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
|
||||
new javadsl.BidiFlow(scaladsl.TLS.apply(
|
||||
() ⇒ sslEngineCreator.get(),
|
||||
session ⇒ Try(sessionVerifier.accept(session)),
|
||||
closing))
|
||||
|
||||
/**
|
||||
* Create a StreamTls [[akka.stream.javadsl.BidiFlow]]. This is a low-level interface.
|
||||
*
|
||||
* You can specify a constructor `sslEngineCreator` to create an SSLEngine that must already be configured for
|
||||
* client and server mode and with all the parameters for the first session.
|
||||
*
|
||||
* For a description of the `closing` parameter please refer to [[TLSClosing]].
|
||||
*/
|
||||
def create(sslEngineCreator: Supplier[SSLEngine], closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
|
||||
new javadsl.BidiFlow(scaladsl.TLS.apply(
|
||||
() ⇒ sslEngineCreator.get(),
|
||||
closing))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -84,12 +84,6 @@ object TLS {
|
|||
config.sslEngineConfigurator.configure(engine, sslContext)
|
||||
engine.setUseClientMode(role == Client)
|
||||
|
||||
if (!config.config.loose.disableHostnameVerification && engine.getUseClientMode && hostInfo.isDefined) {
|
||||
val parameters = engine.getSSLParameters
|
||||
parameters.setEndpointIdentificationAlgorithm("HTTPS")
|
||||
engine.setSSLParameters(parameters)
|
||||
}
|
||||
|
||||
val finalSessionParameters =
|
||||
if (firstSession.sslParameters.isDefined && hostInfo.isDefined && !config.config.loose.disableSNI) {
|
||||
val newParams = TlsUtils.cloneParameters(firstSession.sslParameters.get)
|
||||
|
|
@ -105,8 +99,19 @@ object TLS {
|
|||
TlsUtils.applySessionParameters(engine, finalSessionParameters)
|
||||
engine
|
||||
}
|
||||
def verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit] =
|
||||
hostInfo match {
|
||||
case Some((hostname, _)) ⇒ { (system, session) ⇒
|
||||
val hostnameVerifier = theSslConfig(system).hostnameVerifier
|
||||
if (!hostnameVerifier.verify(hostname, session))
|
||||
Failure(new ConnectionException(s"Hostname verification failed! Expected session to be for $hostname"))
|
||||
else
|
||||
Success(())
|
||||
}
|
||||
case None ⇒ (_, _) ⇒ Success(())
|
||||
}
|
||||
|
||||
new scaladsl.BidiFlow(TlsModule(Attributes.none, createSSLEngine, closing))
|
||||
new scaladsl.BidiFlow(TlsModule(Attributes.none, createSSLEngine, verifySession, closing))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -145,6 +150,24 @@ object TLS {
|
|||
firstSession: NegotiateNewSession, role: TLSRole): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
|
||||
apply(sslContext, None, firstSession, role, IgnoreComplete, None)
|
||||
|
||||
/**
|
||||
* Create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. This is a low-level interface.
|
||||
*
|
||||
* You can specify a constructor to create an SSLEngine that must already be configured for
|
||||
* client and server mode and with all the parameters for the first session.
|
||||
*
|
||||
* You can specify a verification function that will be called after every successful handshake
|
||||
* to verify additional session information.
|
||||
*
|
||||
* For a description of the `closing` parameter please refer to [[TLSClosing]].
|
||||
*/
|
||||
def apply(
|
||||
createSSLEngine: () ⇒ SSLEngine, // we don't offer the internal `ActorSystem => SSLEngine` API here, see #21753
|
||||
verifySession: SSLSession ⇒ Try[Unit], // we don't offer the internal API that provides `ActorSystem` here, see #21753
|
||||
closing: TLSClosing
|
||||
): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
|
||||
new scaladsl.BidiFlow(TlsModule(Attributes.none, _ ⇒ createSSLEngine(), (_, session) ⇒ verifySession(session), closing))
|
||||
|
||||
/**
|
||||
* Create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. This is a low-level interface.
|
||||
*
|
||||
|
|
@ -157,7 +180,7 @@ object TLS {
|
|||
createSSLEngine: () ⇒ SSLEngine, // we don't offer the internal `ActorSystem => SSLEngine` API here, see #21753
|
||||
closing: TLSClosing
|
||||
): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
|
||||
new scaladsl.BidiFlow(TlsModule(Attributes.none, _ ⇒ createSSLEngine(), closing))
|
||||
apply(createSSLEngine, _ ⇒ Success(()), closing)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -413,19 +413,6 @@ object MiMa extends AutoPlugin {
|
|||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#WriteMajority.copy"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#WriteMajority.apply"),
|
||||
|
||||
// #21854 Remove manual hostname verifier support
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.scaladsl.TLS.apply"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TlsModule.copy$default$9"),
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.io.TlsModule.copy$default$8"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TlsModule.copy"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TlsModule.verifySession"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TlsModule.this"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TLSActor.props$default$5"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TLSActor.props"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TLSActor.this"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TlsModule.apply"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TlsModule.apply"),
|
||||
|
||||
// #22105 Akka Typed process DSL
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorCell.addFunctionRef"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.dungeon.Children.addFunctionRef"),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue