diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index f8ef0472a5..d7ff009322 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -103,7 +103,7 @@ private[akka] case class ActorMaterializerImpl( case tls: TlsModule ⇒ // TODO solve this so TlsModule doesn't need special treatment here val es = effectiveSettings(effectiveAttributes) val props = - SslTlsCipherActor.props(es, tls.sslContext, tls.firstSession, tracing = false, tls.role, tls.closing) + SslTlsCipherActor.props(es, tls.sslContext, tls.firstSession, tracing = false, tls.role, tls.closing, tls.hostInfo) val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher) def factory(id: Int) = new ActorPublisher[Any](impl) { override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/SslTls.scala b/akka-stream/src/main/scala/akka/stream/impl/io/SslTls.scala index c63d8b2667..9ad40125fc 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/SslTls.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/SslTls.scala @@ -34,8 +34,9 @@ private[akka] object SslTlsCipherActor { firstSession: NegotiateNewSession, tracing: Boolean, role: Role, - closing: Closing): Props = - Props(new SslTlsCipherActor(settings, sslContext, firstSession, tracing, role, closing)).withDeploy(Deploy.local) + closing: Closing, + hostInfo: Option[(String, Int)]): Props = + Props(new SslTlsCipherActor(settings, sslContext, firstSession, tracing, role, closing, hostInfo)).withDeploy(Deploy.local) final val TransportIn = 0 final val TransportOut = 0 @@ -49,7 +50,7 @@ private[akka] object SslTlsCipherActor { */ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslContext: SSLContext, firstSession: NegotiateNewSession, tracing: Boolean, - role: Role, closing: Closing) + role: Role, closing: Closing, hostInfo: Option[(String, Int)]) extends Actor with ActorLogging with Pump { import SslTlsCipherActor._ @@ -146,7 +147,11 @@ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslCo transportInChoppingBlock.prepare(transportInBuffer) val engine: SSLEngine = { - val e = sslContext.createSSLEngine() + val e = hostInfo match { + case Some((hostname, port)) ⇒ sslContext.createSSLEngine(hostname, port) + case None ⇒ sslContext.createSSLEngine() + } + e.setUseClientMode(role == Client) e } diff --git a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala index eb5438c6a4..186dd5759a 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala @@ -3,6 +3,9 @@ */ package akka.stream.io +import java.lang.{ Integer ⇒ jInteger } + +import akka.japi import akka.stream._ import akka.stream.impl.StreamLayout.Module import akka.util.ByteString @@ -10,7 +13,6 @@ import javax.net.ssl._ import scala.annotation.varargs import scala.collection.immutable import java.security.cert.Certificate -import akka.event.Logging.simpleName /** * Stream cipher support based upon JSSE. @@ -63,10 +65,15 @@ object SslTls { * protocol. * * For a description of the `closing` parameter please refer to [[Closing]]. + * + * The `hostInfo` parameter allows to optionally specify a pair of hostname and port + * that will be used when creating the SSLEngine with `sslContext.createSslEngine`. + * The SSLEngine may use this information e.g. when an endpoint identification algorithm was + * configured using [[SSLParameters.setEndpointIdentificationAlgorithm]]. */ - def apply(sslContext: SSLContext, firstSession: NegotiateNewSession, - role: Role, closing: Closing = IgnoreComplete): ScalaFlow = - new scaladsl.BidiFlow(TlsModule(Attributes.none, sslContext, firstSession, role, closing)) + def apply(sslContext: SSLContext, firstSession: NegotiateNewSession, role: Role, + closing: Closing = IgnoreComplete, hostInfo: Option[(String, Int)] = None): ScalaFlow = + new scaladsl.BidiFlow(TlsModule(Attributes.none, sslContext, firstSession, role, closing, hostInfo)) /** * Java API: create a StreamTls [[akka.stream.javadsl.BidiFlow]] in client mode. The @@ -92,9 +99,14 @@ object SslTls { * protocol. * * For a description of the `closing` parameter please refer to [[Closing]]. + * + * The `hostInfo` parameter allows to optionally specify a pair of hostname and port + * that will be used when creating the SSLEngine with `sslContext.createSslEngine`. + * The SSLEngine may use this information e.g. when an endpoint identification algorithm was + * configured using [[SSLParameters.setEndpointIdentificationAlgorithm]]. */ - def create(sslContext: SSLContext, firstSession: NegotiateNewSession, role: Role, closing: Closing): JavaFlow = - new javadsl.BidiFlow(apply(sslContext, firstSession, role, closing)) + def create(sslContext: SSLContext, firstSession: NegotiateNewSession, role: Role, hostInfo: japi.Option[japi.Pair[String, jInteger]], closing: Closing): JavaFlow = + new javadsl.BidiFlow(apply(sslContext, firstSession, role, closing, hostInfo.asScala.map(e ⇒ (e.first, e.second)))) /** * INTERNAL API. @@ -103,12 +115,12 @@ object SslTls { cipherIn: Inlet[ByteString], cipherOut: Outlet[ByteString], shape: Shape, attributes: Attributes, sslContext: SSLContext, firstSession: NegotiateNewSession, - role: Role, closing: Closing) extends Module { + role: Role, closing: Closing, hostInfo: Option[(String, Int)]) extends Module { override def subModules: Set[Module] = Set.empty override def withAttributes(att: Attributes): Module = copy(attributes = att) override def carbonCopy: Module = { - val mod = TlsModule(attributes, sslContext, firstSession, role, closing) + val mod = TlsModule(attributes, sslContext, firstSession, role, closing, hostInfo) if (plainIn == shape.inlets(0)) mod else mod.replaceShape(mod.shape.asInstanceOf[BidiShape[_, _, _, _]].reversed) } @@ -123,14 +135,14 @@ object SslTls { * INTERNAL API. */ private[akka] object TlsModule { - def apply(attributes: Attributes, sslContext: SSLContext, firstSession: NegotiateNewSession, role: Role, closing: Closing): TlsModule = { + def apply(attributes: Attributes, sslContext: SSLContext, firstSession: NegotiateNewSession, role: Role, closing: Closing, hostInfo: Option[(String, Int)]): TlsModule = { val name = attributes.nameOrDefault(s"StreamTls($role)") val cipherIn = Inlet[ByteString](s"$name.cipherIn") val cipherOut = Outlet[ByteString](s"$name.cipherOut") val plainIn = Inlet[SslTlsOutbound](s"$name.transportIn") val plainOut = Outlet[SslTlsInbound](s"$name.transportOut") val shape = new BidiShape(plainIn, cipherOut, cipherIn, plainOut) - TlsModule(plainIn, plainOut, cipherIn, cipherOut, shape, attributes, sslContext, firstSession, role, closing) + TlsModule(plainIn, plainOut, cipherIn, cipherOut, shape, attributes, sslContext, firstSession, role, closing, hostInfo) } } }