diff --git a/akka-http-core/src/main/java/akka/http/javadsl/HttpsContext.java b/akka-http-core/src/main/java/akka/http/javadsl/HttpsContext.java deleted file mode 100644 index dbe39054a4..0000000000 --- a/akka-http-core/src/main/java/akka/http/javadsl/HttpsContext.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.http.javadsl; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLParameters; - -import akka.japi.Util; -import akka.stream.io.ClientAuth; - -import java.util.Collection; -import java.util.Optional; - -import scala.compat.java8.OptionConverters; - -/** - * TLS configuration for an HTTPS server binding or client connection. - * For the sslContext please refer to the com.typeasfe.ssl-config library. - * The remaining four parameters configure the initial session that will - * be negotiated, see {@link akka.stream.io.NegotiateNewSession} for details. - */ -public abstract class HttpsContext { - - public abstract SSLContext getSslContext(); - - public abstract Optional> getEnabledCipherSuites(); - - public abstract Optional> getEnabledProtocols(); - - public abstract Optional getClientAuth(); - - public abstract Optional getSslParameters(); - - //#http-context-creation - public static HttpsContext create(SSLContext sslContext, - Optional> enabledCipherSuites, - Optional> enabledProtocols, - Optional clientAuth, - Optional sslParameters) - //#http-context-creation - { - final scala.Option> ecs; - if (enabledCipherSuites.isPresent()) ecs = scala.Option.apply(Util.immutableSeq(enabledCipherSuites.get())); - else ecs = scala.Option.empty(); - final scala.Option> ep; - if(enabledProtocols.isPresent()) ep = scala.Option.apply(Util.immutableSeq(enabledProtocols.get())); - else ep = scala.Option.empty(); - return new akka.http.scaladsl.HttpsContext(sslContext, - ecs, - ep, - OptionConverters.toScala(clientAuth), - OptionConverters.toScala(sslParameters)); - } -} diff --git a/akka-http-core/src/main/scala/akka/http/ConnectionPoolSettings.scala b/akka-http-core/src/main/scala/akka/http/ConnectionPoolSettings.scala index 75bf4e7130..2153cc8ec1 100644 --- a/akka-http-core/src/main/scala/akka/http/ConnectionPoolSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/ConnectionPoolSettings.scala @@ -4,31 +4,27 @@ package akka.http -import java.lang.{ Iterable ⇒ JIterable } -import java.util.Optional - import akka.actor.ActorSystem import akka.event.LoggingAdapter import akka.http.impl.util._ -import akka.http.scaladsl.HttpsContext +import akka.http.javadsl.ConnectionContext import com.typesafe.config.Config import scala.concurrent.duration.Duration -import scala.compat.java8.OptionConverters._ final case class HostConnectionPoolSetup(host: String, port: Int, setup: ConnectionPoolSetup) final case class ConnectionPoolSetup( settings: ConnectionPoolSettings, - httpsContext: Option[HttpsContext], + conContext: ConnectionContext = ConnectionContext.noEncryption(), log: LoggingAdapter) object ConnectionPoolSetup { /** Java API */ def create(settings: ConnectionPoolSettings, - httpsContext: Optional[akka.http.javadsl.HttpsContext], + connectionContext: ConnectionContext, log: LoggingAdapter): ConnectionPoolSetup = - ConnectionPoolSetup(settings, httpsContext.asScala.map(_.asInstanceOf[HttpsContext]), log) + ConnectionPoolSetup(settings, connectionContext, log) } final case class ConnectionPoolSettings( diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala index a89c87ceef..1b20b79fa1 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala @@ -20,7 +20,7 @@ import akka.stream.impl.{ SeqActorName, FixedSizeBuffer } import akka.stream.scaladsl.{ Keep, Flow, Sink, Source } import akka.http.HostConnectionPoolSetup import akka.http.scaladsl.model._ -import akka.http.scaladsl.Http +import akka.http.scaladsl.{ ConnectionContext, HttpsConnectionContext, Http } import PoolFlow._ private object PoolInterfaceActor { @@ -64,9 +64,10 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup, import hcps._ import setup._ - val connectionFlow = - if (httpsContext.isEmpty) Http().outgoingConnection(host, port, None, settings.connectionSettings, setup.log) - else Http().outgoingConnectionTls(host, port, None, settings.connectionSettings, httpsContext, setup.log) + val connectionFlow = conContext match { + case httpsContext: HttpsConnectionContext ⇒ Http().outgoingConnectionTls(host, port, httpsContext, None, settings.connectionSettings, setup.log) + case _ ⇒ Http().outgoingConnection(host, port, None, settings.connectionSettings, setup.log) + } val poolFlow = PoolFlow( Flow[HttpRequest].viaMat(connectionFlow)(Keep.right), @@ -147,7 +148,7 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup, } def dispatchRequest(pr: PoolRequest): Unit = { - val scheme = Uri.httpScheme(hcps.setup.httpsContext.isDefined) + val scheme = Uri.httpScheme(hcps.setup.conContext.isSecure) val hostHeader = headers.Host(hcps.host, Uri.normalizePort(hcps.port, scheme)) val effectiveRequest = pr.request diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/JavaMapping.scala b/akka-http-core/src/main/scala/akka/http/impl/util/JavaMapping.scala index 5dfaa2fcda..f37764c28f 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/JavaMapping.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/JavaMapping.scala @@ -14,7 +14,7 @@ import scala.collection.immutable import scala.reflect.ClassTag import akka.{ NotUsed, japi } import akka.http.impl.model.{ JavaQuery, JavaUri } -import akka.http.javadsl.{ model ⇒ jm } +import akka.http.javadsl.{ model ⇒ jm, HttpConnectionContext, ConnectionContext, HttpsConnectionContext } import akka.http.scaladsl.{ model ⇒ sm } import scala.compat.java8.OptionConverters._ @@ -82,7 +82,7 @@ private[http] object JavaMapping { } } - /** This trivial mapping isn't enabled by default to prevent it from conflicting with the `Inherited` ones `*/ + /** This trivial mapping isn't enabled by default to prevent it from conflicting with the `Inherited` ones */ def identity[T]: JavaMapping[T, T] = new JavaMapping[T, T] { def toJava(scalaObject: T): J = scalaObject @@ -164,6 +164,10 @@ private[http] object JavaMapping { def toScala(javaObject: J): S = cast[S](javaObject) } + implicit object ConnectionContext extends Inherited[ConnectionContext, akka.http.scaladsl.HttpConnectionContext] + implicit object HttpConnectionContext extends Inherited[HttpConnectionContext, akka.http.scaladsl.HttpConnectionContext] + implicit object HttpsConnectionContext extends Inherited[HttpsConnectionContext, akka.http.scaladsl.HttpsConnectionContext] + implicit object DateTime extends Inherited[jm.DateTime, akka.http.scaladsl.model.DateTime] implicit object ContentType extends Inherited[jm.ContentType, sm.ContentType] diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/ConnectionContext.scala b/akka-http-core/src/main/scala/akka/http/javadsl/ConnectionContext.scala new file mode 100644 index 0000000000..20556f778a --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/javadsl/ConnectionContext.scala @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2016 Typesafe Inc. + */ +package akka.http.javadsl + +import java.util.{ Collection ⇒ JCollection, Optional } +import java.{ util ⇒ ju } +import javax.net.ssl.{ SSLContext, SSLParameters } + +import akka.http.scaladsl +import com.typesafe.sslconfig.ssl.ClientAuth + +import scala.compat.java8.OptionConverters + +object ConnectionContext { + /** Used to serve HTTPS traffic. */ + def https(sslContext: SSLContext): HttpsConnectionContext = + scaladsl.ConnectionContext.https(sslContext) + + /** Used to serve HTTPS traffic. */ + def https(sslContext: SSLContext, enabledCipherSuites: Optional[JCollection[String]], + enabledProtocols: Optional[JCollection[String]], clientAuth: Optional[ClientAuth], sslParameters: Optional[SSLParameters]) = + scaladsl.ConnectionContext.https(sslContext, sslParameters = OptionConverters.toScala(sslParameters)) + + /** Used to serve HTTP traffic. */ + def noEncryption(): HttpConnectionContext = + scaladsl.ConnectionContext.noEncryption() +} + +trait ConnectionContext { + def isSecure: Boolean + /** Java API */ + def getDefaultPort: Int +} + +trait HttpConnectionContext extends ConnectionContext { + override final def isSecure = false + override final def getDefaultPort = 80 +} + +trait HttpsConnectionContext extends ConnectionContext { + override final def isSecure = true + override final def getDefaultPort = 443 + + /** Java API */ + def getEnabledCipherSuites: Optional[JCollection[String]] + /** Java API */ + def getEnabledProtocols: Optional[JCollection[String]] + /** Java API */ + def getClientAuth: Optional[ClientAuth] + + /** Java API */ + def getSslContext: SSLContext + /** Java API */ + def getSslParameters: Optional[SSLParameters] +} + diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala index 9705d9ae4c..a6bce65f66 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala @@ -39,9 +39,6 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { private lazy val delegate = akka.http.scaladsl.Http(system) - private implicit def convertHttpsContext(hctx: Optional[HttpsContext]): Option[akka.http.scaladsl.HttpsContext] = - hctx.asScala.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]) - /** * Constructs a server layer stage using the configured default [[ServerSettings]]. The returned [[BidiFlow]] isn't * reusable and can only be materialized once. @@ -96,19 +93,41 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { /** * Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding * on the given `endpoint`. + * * If the given port is 0 the resulting source can be materialized several times. Each materialization will * then be assigned a new local port by the operating system, which can then be retrieved by the materialized * [[ServerBinding]]. + * * If the given port is non-zero subsequent materialization attempts of the produced source will immediately * fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized * [[ServerBinding]]. */ def bind(interface: String, port: Int, settings: ServerSettings, - httpsContext: Optional[HttpsContext], + connectionContext: ConnectionContext, + materializer: Materializer): Source[IncomingConnection, Future[ServerBinding]] = + new Source(delegate.bind(interface, port, settings, connectionContext = ConnectionContext.noEncryption().asScala)(materializer) + .map(new IncomingConnection(_)) + .mapMaterializedValue(_.map(new ServerBinding(_))(ec))) + + /** + * Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding + * on the given `endpoint`. + * + * If the given port is 0 the resulting source can be materialized several times. Each materialization will + * then be assigned a new local port by the operating system, which can then be retrieved by the materialized + * [[ServerBinding]]. + * + * If the given port is non-zero subsequent materialization attempts of the produced source will immediately + * fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized + * [[ServerBinding]]. + */ + def bind(interface: String, port: Int, + settings: ServerSettings, + connectionContext: ConnectionContext, log: LoggingAdapter, materializer: Materializer): Source[IncomingConnection, Future[ServerBinding]] = - new Source(delegate.bind(interface, port, settings, httpsContext, log)(materializer) + new Source(delegate.bind(interface, port, settings, connectionContext = ConnectionContext.noEncryption().asScala, log)(materializer) .map(new IncomingConnection(_)) .mapMaterializedValue(_.map(new ServerBinding(_))(ec))) @@ -126,6 +145,21 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { interface, port)(materializer) .map(new ServerBinding(_))(ec) + /** + * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` + * [[Flow]] for processing all incoming connections. + * + * The number of concurrently accepted connections can be configured by overriding + * the `akka.http.server.max-connections` setting. + */ + def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _], + interface: String, port: Int, + connectionContext: ConnectionContext, + materializer: Materializer): Future[ServerBinding] = + delegate.bindAndHandle(handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala, + interface, port, connectionContext.asScala)(materializer) + .map(new ServerBinding(_))(ec) + /** * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` * [[Flow]] for processing all incoming connections. @@ -136,11 +170,11 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _], interface: String, port: Int, settings: ServerSettings, - httpsContext: Optional[HttpsContext], + connectionContext: ConnectionContext, log: LoggingAdapter, materializer: Materializer): Future[ServerBinding] = delegate.bindAndHandle(handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala, - interface, port, settings, httpsContext, log)(materializer) + interface, port, connectionContext.asScala, settings, log)(materializer) .map(new ServerBinding(_))(ec) /** @@ -156,6 +190,20 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { delegate.bindAndHandleSync(handler.apply(_).asScala, interface, port)(materializer) .map(new ServerBinding(_))(ec) + /** + * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` + * [[Flow]] for processing all incoming connections. + * + * The number of concurrently accepted connections can be configured by overriding + * the `akka.http.server.max-connections` setting. + */ + def bindAndHandleSync(handler: Function[HttpRequest, HttpResponse], + interface: String, port: Int, + connectionContext: ConnectionContext, + materializer: Materializer): Future[ServerBinding] = + delegate.bindAndHandleSync(handler.apply(_).asScala, interface, port, connectionContext.asScala)(materializer) + .map(new ServerBinding(_))(ec) + /** * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` * [[Flow]] for processing all incoming connections. @@ -166,11 +214,11 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { def bindAndHandleSync(handler: Function[HttpRequest, HttpResponse], interface: String, port: Int, settings: ServerSettings, - httpsContext: Optional[HttpsContext], + connectionContext: ConnectionContext, log: LoggingAdapter, materializer: Materializer): Future[ServerBinding] = delegate.bindAndHandleSync(handler.apply(_).asScala, - interface, port, settings, httpsContext, log)(materializer) + interface, port, connectionContext.asScala, settings, log)(materializer) .map(new ServerBinding(_))(ec) /** @@ -195,11 +243,25 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { */ def bindAndHandleAsync(handler: Function[HttpRequest, Future[HttpResponse]], interface: String, port: Int, - settings: ServerSettings, httpsContext: Optional[HttpsContext], + connectionContext: ConnectionContext, + materializer: Materializer): Future[ServerBinding] = + delegate.bindAndHandleAsync(handler.apply(_).asInstanceOf[Future[sm.HttpResponse]], interface, port, connectionContext.asScala)(materializer) + .map(new ServerBinding(_))(ec) + + /** + * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` + * [[Flow]] for processing all incoming connections. + * + * The number of concurrently accepted connections can be configured by overriding + * the `akka.http.server.max-connections` setting. + */ + def bindAndHandleAsync(handler: Function[HttpRequest, Future[HttpResponse]], + interface: String, port: Int, + settings: ServerSettings, connectionContext: ConnectionContext, parallelism: Int, log: LoggingAdapter, materializer: Materializer): Future[ServerBinding] = delegate.bindAndHandleAsync(handler.apply(_).asInstanceOf[Future[sm.HttpResponse]], - interface, port, settings, httpsContext, parallelism, log)(materializer) + interface, port, connectionContext.asScala, settings, parallelism, log)(materializer) .map(new ServerBinding(_))(ec) /** @@ -231,10 +293,20 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { outgoingConnection(host, 80) /** - * Same as [[outgoingConnection]] but with HTTPS encryption. + * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. + * Every materialization of the produced flow will attempt to establish a new outgoing connection. + * + * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. */ def outgoingConnectionTls(host: String): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = - outgoingConnectionTls(host, 443) + outgoingConnectionTls(host, defaultClientHttpsContext) + + /** + * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. + * Every materialization of the produced flow will attempt to establish a new outgoing connection. + */ + def outgoingConnectionTls(host: String, connectionContext: HttpsConnectionContext): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = + outgoingConnectionTls(host, 443, connectionContext) /** * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. @@ -248,12 +320,28 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { } /** - * Same as [[outgoingConnection]] but with HTTPS encryption. + * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. + * Every materialization of the produced flow will attempt to establish a new outgoing connection. + * + * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. */ def outgoingConnectionTls(host: String, port: Int): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = Flow.fromGraph { akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) - .viaMat(delegate.outgoingConnectionTls(host, port))(Keep.right) + .viaMat(delegate.outgoingConnection(host, port))(Keep.right) + .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)) + } + + /** + * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. + * Every materialization of the produced flow will attempt to establish a new outgoing connection. + * + * The given [[HttpsConnectionContext]] will be used for encryption on the connection. + */ + def outgoingConnectionTls(host: String, port: Int, connectionContext: HttpsConnectionContext): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = + Flow.fromGraph { + akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) + .viaMat(delegate.outgoingConnectionTls(host, port, connectionContext.asScala))(Keep.right) .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)) } @@ -272,20 +360,35 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { } /** - * Same as [[outgoingConnection]] but with HTTPS encryption. + * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. + * Every materialization of the produced flow will attempt to establish a new outgoing connection. * - * If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used - * for encryption on the connection. + * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. */ def outgoingConnectionTls(host: String, port: Int, localAddress: Optional[InetSocketAddress], settings: ClientConnectionSettings, - httpsContext: Optional[HttpsContext], log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = Flow.fromGraph { akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) - .viaMat(delegate.outgoingConnectionTls(host, port, localAddress.asScala, settings, - httpsContext.asScala.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log))(Keep.right) + .viaMat(delegate.outgoingConnectionTls(host, port, defaultClientHttpsContext.asScala, localAddress.asScala, settings, log))(Keep.right) + .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)) + } + + /** + * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. + * Every materialization of the produced flow will attempt to establish a new outgoing connection. + * + * The given [[HttpsConnectionContext]] will be used for encryption on the connection. + */ + def outgoingConnectionTls(host: String, port: Int, + connectionContext: HttpsConnectionContext, + localAddress: Optional[InetSocketAddress], + settings: ClientConnectionSettings, + log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = + Flow.fromGraph { + akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) + .viaMat(delegate.outgoingConnectionTls(host, port, connectionContext.asScala, localAddress.asScala, settings, log))(Keep.right) .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)) } @@ -303,15 +406,29 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * In order to allow for easy response-to-request association the flow takes in a custom, opaque context * object of type `T` from the application which is emitted together with the corresponding response. */ - def newHostConnectionPool[T](host: String, port: Int, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = + def newHostConnectionPool[T](host: String, port: Int, + materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = adaptTupleFlow(delegate.newHostConnectionPool[T](host, port)(materializer)) /** * Same as [[newHostConnectionPool]] but with HTTPS encryption. + * + * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. */ - def newHostConnectionPoolTls[T](host: String, port: Int, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = + def newHostConnectionPoolTls[T](host: String, port: Int, + materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port)(materializer)) + /** + * Same as [[newHostConnectionPool]] but with HTTPS encryption. + * + * The given [[HttpsConnectionContext]] will be used for encryption on the connection. + */ + def newHostConnectionPoolTls[T](host: String, port: Int, + connectionContext: HttpsConnectionContext, + materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = + adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, connectionContext.asScala)(materializer)) + /** * Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches * the requests from all its materializations across this pool. @@ -334,32 +451,23 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { /** * Same as [[newHostConnectionPool]] but with HTTPS encryption. * - * If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used - * for encryption on the connection. + * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. */ def newHostConnectionPoolTls[T](host: String, port: Int, settings: ConnectionPoolSettings, - httpsContext: Optional[HttpsContext], log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = - adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, settings, - httpsContext.asScala.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log)(materializer)) + adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, defaultClientHttpsContext.asScala, settings, log)(materializer)) /** - * Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches - * the requests from all its materializations across this pool. - * While the started host connection pool internally shuts itself down automatically after the configured idle - * timeout it will spin itself up again if more requests arrive from an existing or a new client flow - * materialization. The returned flow therefore remains usable for the full lifetime of the application. + * Same as [[newHostConnectionPool]] but with HTTPS encryption. * - * Since the underlying transport usually comprises more than a single connection the produced flow might generate - * responses in an order that doesn't directly match the consumed requests. - * For example, if two requests A and B enter the flow in that order the response for B might be produced before the - * response for A. - * In order to allow for easy response-to-request association the flow takes in a custom, opaque context - * object of type `T` from the application which is emitted together with the corresponding response. + * The given [[HttpsConnectionContext]] will be used for encryption on the connection. */ - def newHostConnectionPool[T](setup: HostConnectionPoolSetup, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = - adaptTupleFlow(delegate.newHostConnectionPool[T](setup)(materializer)) + def newHostConnectionPoolTls[T](host: String, port: Int, + connectionContext: HttpsConnectionContext, + settings: ConnectionPoolSettings, + log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = + adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, connectionContext.asScala, settings, log)(materializer)) /** * Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing @@ -383,9 +491,11 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { /** * Same as [[cachedHostConnectionPool]] but with HTTPS encryption. + * + * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. */ def cachedHostConnectionPoolTls[T](host: String, port: Int, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = - adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port)(materializer)) + adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port, defaultClientHttpsContext.asScala)(materializer)) /** * Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing @@ -412,35 +522,13 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { /** * Same as [[cachedHostConnectionPool]] but with HTTPS encryption. * - * If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used - * for encryption on the connection. + * The given [[HttpsConnectionContext]] will be used for encryption on the connection. */ def cachedHostConnectionPoolTls[T](host: String, port: Int, settings: ConnectionPoolSettings, - httpsContext: Optional[HttpsContext], + connectionContext: HttpsConnectionContext, log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = - adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port, settings, - httpsContext.asScala.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log)(materializer)) - - /** - * Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing - * HTTP connections to the given target host endpoint. For every ActorSystem, target host and pool - * configuration a separate connection pool is maintained. - * The HTTP layer transparently manages idle shutdown and restarting of connections pools as configured. - * The returned [[Flow]] instances therefore remain valid throughout the lifetime of the application. - * - * The internal caching logic guarantees that there will never be more than a single pool running for the - * given target host endpoint and configuration (in this ActorSystem). - * - * Since the underlying transport usually comprises more than a single connection the produced flow might generate - * responses in an order that doesn't directly match the consumed requests. - * For example, if two requests A and B enter the flow in that order the response for B might be produced before the - * response for A. - * In order to allow for easy response-to-request association the flow takes in a custom, opaque context - * object of type `T` from the application which is emitted together with the corresponding response. - */ - def cachedHostConnectionPool[T](setup: HostConnectionPoolSetup, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = - adaptTupleFlow(delegate.cachedHostConnectionPool[T](setup)(materializer)) + adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port, connectionContext.asScala, settings, log)(materializer)) /** * Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool @@ -449,12 +537,12 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * * Since the underlying transport usually comprises more than a single connection the produced flow might generate * responses in an order that doesn't directly match the consumed requests. - * For example, if two requests A and B enter the flow in that order the response for B might be produced before the - * response for A. + * For example, if two requests `A` and `B` enter the flow in that order the response for `B` might be produced before the + * response for `A`. * In order to allow for easy response-to-request association the flow takes in a custom, opaque context * object of type `T` from the application which is emitted together with the corresponding response. */ - def superPool[T](materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] = + def superPool[T](materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], Unit] = adaptTupleFlow(delegate.superPool[T]()(materializer)) /** @@ -462,25 +550,46 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * depending on their respective effective URIs. Note that incoming requests must have either an absolute URI or * a valid `Host` header. * - * If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used - * for setting up the HTTPS connection pool, if required. + * The given [[HttpsConnectionContext]] is used to configure TLS for the connection. * * Since the underlying transport usually comprises more than a single connection the produced flow might generate * responses in an order that doesn't directly match the consumed requests. - * For example, if two requests A and B enter the flow in that order the response for B might be produced before the - * response for A. + * For example, if two requests `A` and `B` enter the `flow` in that order the response for `B` might be produced before the + * response for `A`. + * * In order to allow for easy response-to-request association the flow takes in a custom, opaque context * object of type `T` from the application which is emitted together with the corresponding response. */ def superPool[T](settings: ConnectionPoolSettings, - httpsContext: Optional[HttpsContext], + connectionContext: HttpsConnectionContext, log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] = - adaptTupleFlow(delegate.superPool[T](settings, httpsContext, log)(materializer)) + adaptTupleFlow(delegate.superPool[T](connectionContext.asScala, settings, log)(materializer)) + + /** + * Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool + * depending on their respective effective URIs. Note that incoming requests must have either an absolute URI or + * a valid `Host` header. + * + * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests `A` and `B` enter the `flow` in that order the response for `B` might be produced before the + * response for `A`. + * + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type `T` from the application which is emitted together with the corresponding response. + */ + def superPool[T](settings: ConnectionPoolSettings, + log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] = + adaptTupleFlow(delegate.superPool[T](defaultClientHttpsContext.asScala, settings, log)(materializer)) /** * Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's * effective URI to produce a response future. * + * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. + * * Note that the request must have either an absolute URI or a valid `Host` header, otherwise * the future will be completed with an error. */ @@ -601,15 +710,15 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { def shutdownAllConnectionPools(): Future[Unit] = delegate.shutdownAllConnectionPools() /** - * Gets the current default client-side [[HttpsContext]]. + * Gets the current default client-side [[HttpsConnectionContext]]. */ - def defaultClientHttpsContext: HttpsContext = delegate.defaultClientHttpsContext + def defaultClientHttpsContext: HttpsConnectionContext = delegate.defaultClientHttpsContext /** - * Sets the default client-side [[HttpsContext]]. + * Sets the default client-side [[HttpsConnectionContext]]. */ - def setDefaultClientHttpsContext(context: HttpsContext): Unit = - delegate.setDefaultClientHttpsContext(context.asInstanceOf[akka.http.scaladsl.HttpsContext]) + def setDefaultClientHttpsContext(context: HttpsConnectionContext): Unit = + delegate.setDefaultClientHttpsContext(context.asInstanceOf[akka.http.scaladsl.HttpsConnectionContext]) private def adaptTupleFlow[T, Mat](scalaFlow: stream.scaladsl.Flow[(scaladsl.model.HttpRequest, T), (Try[scaladsl.model.HttpResponse], T), Mat]): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], Mat] = { implicit val _ = JavaMapping.identity[T] diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/ConnectionContext.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/ConnectionContext.scala new file mode 100644 index 0000000000..e17508d683 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/ConnectionContext.scala @@ -0,0 +1,56 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.scaladsl + +import akka.stream.io.NegotiateNewSession + +import scala.collection.JavaConverters._ +import java.util.{ Optional, Collection ⇒ JCollection } +import javax.net.ssl._ + +import com.typesafe.sslconfig.ssl.ClientAuth + +import scala.collection.immutable +import scala.compat.java8.OptionConverters._ + +trait ConnectionContext extends akka.http.javadsl.ConnectionContext { + final def defaultPort = getDefaultPort +} + +object ConnectionContext { + def https(sslContext: SSLContext, + enabledCipherSuites: Option[immutable.Seq[String]] = None, + enabledProtocols: Option[immutable.Seq[String]] = None, + clientAuth: Option[ClientAuth] = None, + sslParameters: Option[SSLParameters] = None) = { + new HttpsConnectionContext(sslContext, enabledCipherSuites, enabledProtocols, clientAuth, sslParameters) + } + + def noEncryption() = HttpConnectionContext +} + +final class HttpsConnectionContext( + val sslContext: SSLContext, + val enabledCipherSuites: Option[immutable.Seq[String]] = None, + val enabledProtocols: Option[immutable.Seq[String]] = None, + val clientAuth: Option[ClientAuth] = None, + val sslParameters: Option[SSLParameters] = None) + extends akka.http.javadsl.HttpsConnectionContext + with ConnectionContext { + + def firstSession = NegotiateNewSession(enabledCipherSuites, enabledProtocols, clientAuth, sslParameters) + + override def getSslContext = sslContext + override def getEnabledCipherSuites: Optional[JCollection[String]] = enabledCipherSuites.map(_.asJavaCollection).asJava + override def getEnabledProtocols: Optional[JCollection[String]] = enabledProtocols.map(_.asJavaCollection).asJava + override def getClientAuth: Optional[ClientAuth] = clientAuth.asJava + override def getSslParameters: Optional[SSLParameters] = sslParameters.asJava +} + +sealed class HttpConnectionContext extends akka.http.javadsl.HttpConnectionContext with ConnectionContext +final object HttpConnectionContext extends HttpConnectionContext { + /** Java API */ + def getInstance() = this +} diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index d6c4f9870f..1ea0bae136 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -6,7 +6,6 @@ package akka.http.scaladsl import java.net.InetSocketAddress import java.util.concurrent.ConcurrentHashMap -import java.util.{ Collection ⇒ JCollection, Optional } import javax.net.ssl._ import akka.actor._ @@ -46,21 +45,25 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte // configured default HttpsContext for the client-side // SYNCHRONIZED ACCESS ONLY! - private[this] var _defaultClientHttpsContext: HttpsContext = _ + private[this] var _defaultClientHttpsContext: HttpsConnectionContext = _ // ** SERVER ** // + private[this] final val DefaultPortForProtocol = -1 // any negative value + /** * Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding * on the given `endpoint`. + * * If the given port is 0 the resulting source can be materialized several times. Each materialization will * then be assigned a new local port by the operating system, which can then be retrieved by the materialized * [[ServerBinding]]. + * * If the given port is non-zero subsequent materialization attempts of the produced source will immediately * fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized * [[ServerBinding]]. * - * If an [[HttpsContext]] is given it will be used for setting up TLS encryption on the binding. + * If an [[ConnectionContext]] is given it will be used for setting up TLS encryption on the binding. * Otherwise the binding will be unencrypted. * * If no `port` is explicitly given (or the port value is negative) the protocol's default port will be used, @@ -69,12 +72,12 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * To configure additional settings for a server started using this method, * use the `akka.http.server` config section or pass in a [[ServerSettings]] explicitly. */ - def bind(interface: String, port: Int = -1, + def bind(interface: String, port: Int = DefaultPortForProtocol, settings: ServerSettings = ServerSettings(system), - httpsContext: Option[HttpsContext] = None, + connectionContext: ConnectionContext = defaultServerHttpContext, log: LoggingAdapter = system.log)(implicit fm: Materializer): Source[IncomingConnection, Future[ServerBinding]] = { - val effectivePort = if (port >= 0) port else if (httpsContext.isEmpty) 80 else 443 - val tlsStage = sslTlsStage(httpsContext, Server) + val effectivePort = if (port >= 0) port else connectionContext.defaultPort + val tlsStage = sslTlsStage(connectionContext, Server) val connections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] = Tcp().bind(interface, effectivePort, settings.backlog, settings.socketOptions, halfClose = false, settings.timeouts.idleTimeout) connections.map { @@ -98,9 +101,9 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * use the `akka.http.server` config section or pass in a [[ServerSettings]] explicitly. */ def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, Any], - interface: String, port: Int = -1, + interface: String, port: Int = DefaultPortForProtocol, + connectionContext: ConnectionContext = defaultServerHttpContext, settings: ServerSettings = ServerSettings(system), - httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log)(implicit fm: Materializer): Future[ServerBinding] = { def handleOneConnection(incomingConnection: IncomingConnection): Future[Unit] = try @@ -114,7 +117,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte throw e } - bind(interface, port, settings, httpsContext, log) + bind(interface, port, settings, connectionContext, log) .mapAsyncUnordered(settings.maxConnections) { connection ⇒ handleOneConnection(connection).recoverWith { // Ignore incoming errors from the connection as they will cancel the binding. @@ -139,11 +142,11 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * use the `akka.http.server` config section or pass in a [[ServerSettings]] explicitly. */ def bindAndHandleSync(handler: HttpRequest ⇒ HttpResponse, - interface: String, port: Int = -1, + interface: String, port: Int = DefaultPortForProtocol, + connectionContext: ConnectionContext = defaultServerHttpContext, settings: ServerSettings = ServerSettings(system), - httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log)(implicit fm: Materializer): Future[ServerBinding] = - bindAndHandle(Flow[HttpRequest].map(handler), interface, port, settings, httpsContext, log) + bindAndHandle(Flow[HttpRequest].map(handler), interface, port, connectionContext, settings, log) /** * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` @@ -156,12 +159,12 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * use the `akka.http.server` config section or pass in a [[ServerSettings]] explicitly. */ def bindAndHandleAsync(handler: HttpRequest ⇒ Future[HttpResponse], - interface: String, port: Int = -1, + interface: String, port: Int = DefaultPortForProtocol, + connectionContext: ConnectionContext = defaultServerHttpContext, settings: ServerSettings = ServerSettings(system), - httpsContext: Option[HttpsContext] = None, parallelism: Int = 1, log: LoggingAdapter = system.log)(implicit fm: Materializer): Future[ServerBinding] = - bindAndHandle(Flow[HttpRequest].mapAsync(parallelism)(handler), interface, port, settings, httpsContext, log) + bindAndHandle(Flow[HttpRequest].mapAsync(parallelism)(handler), interface, port, connectionContext, settings, log) type ServerLayer = Http.ServerLayer @@ -196,36 +199,36 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte localAddress: Option[InetSocketAddress] = None, settings: ClientConnectionSettings = ClientConnectionSettings(system), log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = - _outgoingConnection(host, port, localAddress, settings, None, log) + _outgoingConnection(host, port, localAddress, settings, ConnectionContext.noEncryption(), log) /** * Same as [[outgoingConnection]] but for encrypted (HTTPS) connections. * - * If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used + * If an explicit [[HttpsConnectionContext]] is given then it rather than the configured default [[HttpsConnectionContext]] will be used * for encryption on the connection. * * To configure additional settings for requests made using this method, * use the `akka.http.client` config section or pass in a [[ClientConnectionSettings]] explicitly. */ def outgoingConnectionTls(host: String, port: Int = 443, + connectionContext: HttpsConnectionContext = defaultClientHttpsContext, localAddress: Option[InetSocketAddress] = None, settings: ClientConnectionSettings = ClientConnectionSettings(system), - httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = - _outgoingConnection(host, port, localAddress, settings, effectiveHttpsContext(httpsContext), log) + _outgoingConnection(host, port, localAddress, settings, connectionContext, log) private def _outgoingConnection(host: String, port: Int, localAddress: Option[InetSocketAddress], - settings: ClientConnectionSettings, httpsContext: Option[HttpsContext], + settings: ClientConnectionSettings, connectionContext: ConnectionContext, log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = { - val hostHeader = if (port == (if (httpsContext.isEmpty) 80 else 443)) Host(host) else Host(host, port) + val hostHeader = if (port == connectionContext.defaultPort) Host(host) else Host(host, port) val layer = clientLayer(hostHeader, settings, log) - layer.joinMat(_outgoingTlsConnectionLayer(host, port, localAddress, settings, httpsContext, log))(Keep.right) + layer.joinMat(_outgoingTlsConnectionLayer(host, port, localAddress, settings, connectionContext, log))(Keep.right) } private def _outgoingTlsConnectionLayer(host: String, port: Int, localAddress: Option[InetSocketAddress], - settings: ClientConnectionSettings, httpsContext: Option[HttpsContext], + settings: ClientConnectionSettings, connectionContext: ConnectionContext, log: LoggingAdapter): Flow[SslTlsOutbound, SslTlsInbound, Future[OutgoingConnection]] = { - val tlsStage = sslTlsStage(httpsContext, Client, Some(host -> port)) + val tlsStage = sslTlsStage(connectionContext, Client, Some(host -> port)) val transportFlow = Tcp().outgoingConnection(new InetSocketAddress(host, port), localAddress, settings.socketOptions, halfClose = true, settings.connectingTimeout, settings.idleTimeout) @@ -274,24 +277,24 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte def newHostConnectionPool[T](host: String, port: Int = 80, settings: ConnectionPoolSettings = ConnectionPoolSettings(system), log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { - val cps = ConnectionPoolSetup(settings, None, log) + val cps = ConnectionPoolSetup(settings, ConnectionContext.noEncryption(), log) newHostConnectionPool(HostConnectionPoolSetup(host, port, cps)) } /** * Same as [[newHostConnectionPool]] but for encrypted (HTTPS) connections. * - * If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used + * If an explicit [[ConnectionContext]] is given then it rather than the configured default [[ConnectionContext]] will be used * for encryption on the connections. * * To configure additional settings for the pool (and requests made using it), * use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly. */ def newHostConnectionPoolTls[T](host: String, port: Int = 443, + connectionContext: HttpsConnectionContext = defaultClientHttpsContext, settings: ConnectionPoolSettings = ConnectionPoolSettings(system), - httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { - val cps = ConnectionPoolSetup(settings, effectiveHttpsContext(httpsContext), log) + val cps = ConnectionPoolSetup(settings, connectionContext, log) newHostConnectionPool(HostConnectionPoolSetup(host, port, cps)) } @@ -309,7 +312,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * In order to allow for easy response-to-request association the flow takes in a custom, opaque context * object of type `T` from the application which is emitted together with the corresponding response. */ - def newHostConnectionPool[T](setup: HostConnectionPoolSetup)( + private def newHostConnectionPool[T](setup: HostConnectionPoolSetup)( implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { val gatewayFuture = FastFuture.successful(new PoolGateway(setup, Promise())) gatewayClientFlow(setup, gatewayFuture) @@ -338,7 +341,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte def cachedHostConnectionPool[T](host: String, port: Int = 80, settings: ConnectionPoolSettings = ConnectionPoolSettings(system), log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { - val cps = ConnectionPoolSetup(settings, None, log) + val cps = ConnectionPoolSetup(settings, ConnectionContext.noEncryption(), log) val setup = HostConnectionPoolSetup(host, port, cps) cachedHostConnectionPool(setup) } @@ -346,17 +349,17 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte /** * Same as [[cachedHostConnectionPool]] but for encrypted (HTTPS) connections. * - * If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used + * If an explicit [[ConnectionContext]] is given then it rather than the configured default [[ConnectionContext]] will be used * for encryption on the connections. * * To configure additional settings for the pool (and requests made using it), * use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly. */ def cachedHostConnectionPoolTls[T](host: String, port: Int = 443, + connectionContext: HttpsConnectionContext = defaultClientHttpsContext, settings: ConnectionPoolSettings = ConnectionPoolSettings(system), - httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { - val cps = ConnectionPoolSetup(settings, effectiveHttpsContext(httpsContext), log) + val cps = ConnectionPoolSetup(settings, connectionContext, log) val setup = HostConnectionPoolSetup(host, port, cps) cachedHostConnectionPool(setup) } @@ -386,7 +389,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool * depending on their respective effective URIs. Note that incoming requests must have an absolute URI. * - * If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used + * If an explicit [[ConnectionContext]] is given then it rather than the configured default [[ConnectionContext]] will be used * for setting up HTTPS connection pools, if required. * * Since the underlying transport usually comprises more than a single connection the produced flow might generate @@ -399,80 +402,80 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * To configure additional settings for the pool (and requests made using it), * use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly. */ - def superPool[T](settings: ConnectionPoolSettings = ConnectionPoolSettings(system), - httpsContext: Option[HttpsContext] = None, + def superPool[T](connectionContext: HttpsConnectionContext = defaultClientHttpsContext, + settings: ConnectionPoolSettings = ConnectionPoolSettings(system), log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed] = - clientFlow[T](settings) { request ⇒ request -> cachedGateway(request, settings, httpsContext, log) } + clientFlow[T](settings) { request ⇒ request -> cachedGateway(request, settings, connectionContext, log) } /** * Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's * effective URI to produce a response future. * - * If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used - * for setting up the HTTPS connection pool, if required. + * If an explicit [[ConnectionContext]] is given then it rather than the configured default [[ConnectionContext]] will be used + * for setting up the HTTPS connection pool, if the request is targetted towards an `https` endpoint. * * Note that the request must have an absolute URI, otherwise the future will be completed with an error. */ def singleRequest(request: HttpRequest, + connectionContext: HttpsConnectionContext = defaultClientHttpsContext, settings: ConnectionPoolSettings = ConnectionPoolSettings(system), - httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log)(implicit fm: Materializer): Future[HttpResponse] = try { - val gatewayFuture = cachedGateway(request, settings, httpsContext, log) + val gatewayFuture = cachedGateway(request, settings, connectionContext, log) gatewayFuture.flatMap(_(request))(fm.executionContext) } catch { case e: IllegalUriException ⇒ FastFuture.failed(e) } /** - * Constructs a [[WebsocketClientLayer]] stage using the configured default [[ClientConnectionSettings]], + * Constructs a [[WebSocketClientLayer]] stage using the configured default [[ClientConnectionSettings]], * configured using the `akka.http.client` config section. * * The layer is not reusable and must only be materialized once. */ - def websocketClientLayer(request: WebsocketRequest, + def webSocketClientLayer(request: WebSocketRequest, settings: ClientConnectionSettings = ClientConnectionSettings(system), - log: LoggingAdapter = system.log): Http.WebsocketClientLayer = - WebsocketClientBlueprint(request, settings, log) + log: LoggingAdapter = system.log): Http.WebSocketClientLayer = + WebSocketClientBlueprint(request, settings, log) /** - * Constructs a flow that once materialized establishes a Websocket connection to the given Uri. + * Constructs a flow that once materialized establishes a WebSocket connection to the given Uri. * * The layer is not reusable and must only be materialized once. */ - def websocketClientFlow(request: WebsocketRequest, + def webSocketClientFlow(request: WebSocketRequest, + connectionContext: ConnectionContext = defaultClientHttpsContext, localAddress: Option[InetSocketAddress] = None, settings: ClientConnectionSettings = ClientConnectionSettings(system), - httpsContext: Option[HttpsContext] = None, - log: LoggingAdapter = system.log): Flow[Message, Message, Future[WebsocketUpgradeResponse]] = { + log: LoggingAdapter = system.log): Flow[Message, Message, Future[WebSocketUpgradeResponse]] = { import request.uri - require(uri.isAbsolute, s"Websocket request URI must be absolute but was '$uri'") + require(uri.isAbsolute, s"WebSocket request URI must be absolute but was '$uri'") val ctx = uri.scheme match { - case "ws" ⇒ None - case "wss" ⇒ effectiveHttpsContext(httpsContext) - case scheme @ _ ⇒ - throw new IllegalArgumentException(s"Illegal URI scheme '$scheme' in '$uri' for Websocket request. " + - s"Websocket requests must use either 'ws' or 'wss'") + case "ws" ⇒ ConnectionContext.noEncryption() + case "wss" ⇒ connectionContext + case scheme ⇒ + throw new IllegalArgumentException(s"Illegal URI scheme '$scheme' in '$uri' for WebSocket request. " + + s"WebSocket requests must use either 'ws' or 'wss'") } val host = uri.authority.host.address val port = uri.effectivePort - websocketClientLayer(request, settings, log) + webSocketClientLayer(request, settings, log) .joinMat(_outgoingTlsConnectionLayer(host, port, localAddress, settings, ctx, log))(Keep.left) } /** - * Runs a single Websocket conversation given a Uri and a flow that represents the client side of the - * Websocket conversation. + * Runs a single WebSocket conversation given a Uri and a flow that represents the client side of the + * WebSocket conversation. */ - def singleWebsocketRequest[T](request: WebsocketRequest, + def singleWebSocketRequest[T](request: WebSocketRequest, clientFlow: Flow[Message, Message, T], + connectionContext: HttpsConnectionContext = defaultClientHttpsContext, localAddress: Option[InetSocketAddress] = None, settings: ClientConnectionSettings = ClientConnectionSettings(system), - httpsContext: Option[HttpsContext] = None, - log: LoggingAdapter = system.log)(implicit mat: Materializer): (Future[WebsocketUpgradeResponse], T) = - websocketClientFlow(request, localAddress, settings, httpsContext, log) + log: LoggingAdapter = system.log)(implicit mat: Materializer): (Future[WebSocketUpgradeResponse], T) = + webSocketClientFlow(request, connectionContext, localAddress, settings, log) .joinMat(clientFlow)(Keep.both).run() /** @@ -493,9 +496,16 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte } /** - * Gets the current default client-side [[HttpsContext]]. + * Gets the current default server-side [[ConnectionContext]] – defaults to plain HTTP. */ - def defaultClientHttpsContext: HttpsContext = + def defaultServerHttpContext: ConnectionContext = + ConnectionContext.noEncryption() + + /** + * Gets the current default client-side [[HttpsConnectionContext]]. + * Defaults used here can be configured using ssl-config or the context can be replaced using [[setDefaultClientHttpsContext]] + */ + def defaultClientHttpsContext: HttpsConnectionContext = synchronized { _defaultClientHttpsContext match { case null ⇒ @@ -507,9 +517,9 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte } /** - * Sets the default client-side [[HttpsContext]]. + * Sets the default client-side [[HttpsConnectionContext]]. */ - def setDefaultClientHttpsContext(context: HttpsContext): Unit = + def setDefaultClientHttpsContext(context: HttpsConnectionContext): Unit = synchronized { _defaultClientHttpsContext = context } @@ -518,10 +528,10 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte private[this] val hostPoolCache = new ConcurrentHashMap[HostConnectionPoolSetup, Future[PoolGateway]] private def cachedGateway(request: HttpRequest, - settings: ConnectionPoolSettings, httpsContext: Option[HttpsContext], + settings: ConnectionPoolSettings, connectionContext: ConnectionContext, log: LoggingAdapter)(implicit fm: Materializer): Future[PoolGateway] = if (request.uri.scheme.nonEmpty && request.uri.authority.nonEmpty) { - val httpsCtx = if (request.uri.scheme.equalsIgnoreCase("https")) effectiveHttpsContext(httpsContext) else None + val httpsCtx = if (request.uri.scheme.equalsIgnoreCase("https")) connectionContext else ConnectionContext.noEncryption() val setup = ConnectionPoolSetup(settings, httpsCtx, log) val host = request.uri.authority.host.toString() val hcps = HostConnectionPoolSetup(host, request.uri.effectivePort, setup) @@ -531,6 +541,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte throw new IllegalUriException(ErrorInfo(msg)) } + /** INTERNAL API */ private[http] def cachedGateway(setup: HostConnectionPoolSetup)(implicit fm: Materializer): Future[PoolGateway] = { val gatewayPromise = Promise[PoolGateway]() hostPoolCache.putIfAbsent(setup, gatewayPromise.future) match { @@ -575,15 +586,11 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte } } - private def effectiveHttpsContext(ctx: Option[HttpsContext]): Option[HttpsContext] = - ctx orElse Some(defaultClientHttpsContext) - - private[http] def sslTlsStage(httpsContext: Option[HttpsContext], role: Role, hostInfo: Option[(String, Int)] = None) = - httpsContext match { - case Some(hctx) ⇒ - SslTls(hctx.sslContext, hctx.firstSession, role, hostInfo = hostInfo) - case None ⇒ - SslTlsPlacebo.forScala + /** Creates real or placebo SslTls stage based on if ConnectionContext is HTTPS or not. */ + private[http] def sslTlsStage(connectionContext: ConnectionContext, role: Role, hostInfo: Option[(String, Int)] = None) = + connectionContext match { + case hctx: HttpsConnectionContext ⇒ SslTls(hctx.sslContext, hctx.firstSession, role, hostInfo = hostInfo) + case other ⇒ SslTlsPlacebo.forScala // if it's not HTTPS, we don't enable SSL/TLS } } @@ -709,46 +716,18 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { new HttpExt(system.settings.config getConfig "akka.http")(system) } -import scala.collection.JavaConverters._ - -//# https-context-impl /** * TLS configuration for an HTTPS server binding or client connection. * For the sslContext please refer to the com.typeasfe.ssl-config library. * The remaining four parameters configure the initial session that will * be negotiated, see [[akka.stream.io.NegotiateNewSession]] for details. */ -final case class HttpsContext(sslContext: SSLContext, - enabledCipherSuites: Option[immutable.Seq[String]] = None, - enabledProtocols: Option[immutable.Seq[String]] = None, - clientAuth: Option[ClientAuth] = None, - sslParameters: Option[SSLParameters] = None) - //# - extends akka.http.javadsl.HttpsContext { - def firstSession = NegotiateNewSession(enabledCipherSuites, enabledProtocols, clientAuth, sslParameters) - - /** Java API */ - override def getSslContext: SSLContext = sslContext - - /** Java API */ - override def getEnabledCipherSuites: Optional[JCollection[String]] = enabledCipherSuites.map(_.asJavaCollection).asJava - - /** Java API */ - override def getEnabledProtocols: Optional[JCollection[String]] = enabledProtocols.map(_.asJavaCollection).asJava - - /** Java API */ - override def getClientAuth: Optional[ClientAuth] = clientAuth.asJava - - /** Java API */ - override def getSslParameters: Optional[SSLParameters] = sslParameters.asJava -} - trait DefaultSSLContextCreation { protected def system: ActorSystem protected def sslConfig: AkkaSSLConfig - protected def createDefaultClientHttpsContext(): HttpsContext = { + protected def createDefaultClientHttpsContext(): HttpsConnectionContext = { val config = sslConfig.config val log = Logging(system, getClass) diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ClientCancellationSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ClientCancellationSpec.scala index 49a498602a..3ddb661074 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ClientCancellationSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ClientCancellationSpec.scala @@ -2,7 +2,7 @@ package akka.http.impl.engine.client import javax.net.ssl.SSLContext -import akka.http.scaladsl.{ HttpsContext, Http } +import akka.http.scaladsl.{ ConnectionContext, Http } import akka.http.scaladsl.model.{ HttpHeader, HttpResponse, HttpRequest } import akka.stream.ActorMaterializer import akka.stream.scaladsl.{ Flow, Sink, Source } @@ -28,7 +28,7 @@ class ClientCancellationSpec extends AkkaSpec(""" { req ⇒ HttpResponse() }, // TLS client does full-close, no need for the connection:close header addressTls.getHostName, addressTls.getPort, - httpsContext = Some(HttpsContext(SSLContext.getDefault)))(noncheckedMaterializer) + connectionContext = ConnectionContext.https(SSLContext.getDefault))(noncheckedMaterializer) def testCase(connection: Flow[HttpRequest, HttpResponse, Any]): Unit = Utils.assertAllStagesStopped { val requests = TestPublisher.probe[HttpRequest]() diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala index da73e5b07a..01e434f9e5 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala @@ -1,5 +1,5 @@ /* - * Copyright (C) 2009-2014 Typesafe Inc. + * Copyright (C) 2009-2016 Typesafe Inc. */ package akka.http.impl.engine.client @@ -303,8 +303,7 @@ class ConnectionPoolSpec extends AkkaSpec(""" .transform(StreamUtils.recover { case NoErrorComplete ⇒ ByteString.empty }), Flow[ByteString].map(SessionBytes(null, _))) val sink = if (autoAccept) Sink.foreach[Http.IncomingConnection](handleConnection) else Sink.fromSubscriber(incomingConnections) - // TODO getHostString in Java7 - Tcp().bind(serverEndpoint.getHostName, serverEndpoint.getPort, idleTimeout = serverSettings.timeouts.idleTimeout) + Tcp().bind(serverEndpoint.getHostString, serverEndpoint.getPort, idleTimeout = serverSettings.timeouts.idleTimeout) .map { c ⇒ val layer = Http().serverLayer(serverSettings, log = log) Http.IncomingConnection(c.localAddress, c.remoteAddress, layer atop rawBytesInjection join c.flow) @@ -340,7 +339,7 @@ class ConnectionPoolSpec extends AkkaSpec(""" ccSettings: ClientConnectionSettings = ClientConnectionSettings(system)) = { val settings = ConnectionPoolSettings(maxConnections, maxRetries, maxOpenRequests, pipeliningLimit, idleTimeout, ClientConnectionSettings(system)) - flowTestBench(Http().superPool[T](settings)) + flowTestBench(Http().superPool[T](settings = settings)) } def flowTestBench[T, Mat](poolFlow: Flow[(HttpRequest, T), (Try[HttpResponse], T), Mat]) = { diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/TlsEndpointVerificationSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/TlsEndpointVerificationSpec.scala index 6d90a9459f..2050474119 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/TlsEndpointVerificationSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/TlsEndpointVerificationSpec.scala @@ -12,7 +12,7 @@ import akka.stream.io._ import akka.stream.scaladsl._ import akka.stream.testkit.AkkaSpec import akka.http.impl.util._ -import akka.http.scaladsl.{ HttpsContext, Http } +import akka.http.scaladsl.{ ConnectionContext, Http } import akka.http.scaladsl.model.{ StatusCodes, HttpResponse, HttpRequest } import akka.http.scaladsl.model.headers.{ Host, `Tls-Session-Info` } import org.scalatest.time.{ Span, Seconds } @@ -88,10 +88,10 @@ class TlsEndpointVerificationSpec extends AkkaSpec(""" } } - def pipeline(clientContext: HttpsContext, hostname: String): HttpRequest ⇒ Future[HttpResponse] = req ⇒ + def pipeline(clientContext: ConnectionContext, hostname: String): HttpRequest ⇒ Future[HttpResponse] = req ⇒ Source.single(req).via(pipelineFlow(clientContext, hostname)).runWith(Sink.head) - def pipelineFlow(clientContext: HttpsContext, hostname: String): Flow[HttpRequest, HttpResponse, NotUsed] = { + def pipelineFlow(clientContext: ConnectionContext, hostname: String): Flow[HttpRequest, HttpResponse, NotUsed] = { val handler: HttpRequest ⇒ HttpResponse = { req ⇒ // verify Tls-Session-Info header information val name = req.header[`Tls-Session-Info`].flatMap(_.localPrincipal).map(_.getName) @@ -99,8 +99,8 @@ class TlsEndpointVerificationSpec extends AkkaSpec(""" else HttpResponse(StatusCodes.BadRequest, entity = "Tls-Session-Info header verification failed") } - val serverSideTls = Http().sslTlsStage(Some(ExampleHttpContexts.exampleServerContext), Server) - val clientSideTls = Http().sslTlsStage(Some(clientContext), Client, Some(hostname -> 8080)) + val serverSideTls = Http().sslTlsStage(ExampleHttpContexts.exampleServerContext, Server) + val clientSideTls = Http().sslTlsStage(clientContext, Client, Some(hostname -> 8080)) val server = Http().serverLayer() diff --git a/akka-http-core/src/test/scala/akka/http/impl/util/ExampleHttpContexts.scala b/akka-http-core/src/test/scala/akka/http/impl/util/ExampleHttpContexts.scala index 7211b79605..39e617dba8 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/util/ExampleHttpContexts.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/util/ExampleHttpContexts.scala @@ -9,12 +9,15 @@ import java.security.{ SecureRandom, KeyStore } import java.security.cert.{ CertificateFactory, Certificate } import javax.net.ssl.{ SSLParameters, SSLContext, TrustManagerFactory, KeyManagerFactory } -import akka.http.scaladsl.HttpsContext +import akka.http.scaladsl.HttpsConnectionContext /** * These are HTTPS example configurations that take key material from the resources/key folder. */ object ExampleHttpContexts { + + // TODO show example how to obtain pre-configured context from ssl-config + val exampleServerContext = { // never put passwords into code! val password = "abcdef".toCharArray @@ -28,8 +31,9 @@ object ExampleHttpContexts { val context = SSLContext.getInstance("TLS") context.init(keyManagerFactory.getKeyManagers, null, new SecureRandom) - HttpsContext(context) + new HttpsConnectionContext(context) } + val exampleClientContext = { val certStore = KeyStore.getInstance(KeyStore.getDefaultType) certStore.load(null, null) @@ -44,7 +48,7 @@ object ExampleHttpContexts { val params = new SSLParameters() params.setEndpointIdentificationAlgorithm("https") - HttpsContext(context, sslParameters = Some(params)) + new HttpsConnectionContext(context, sslParameters = Some(params)) } def resourceStream(resourceName: String): InputStream = { diff --git a/akka-http-core/src/test/scala/akka/http/javadsl/model/JavaApiTestCaseSpecs.scala b/akka-http-core/src/test/scala/akka/http/javadsl/model/JavaApiTestCaseSpecs.scala index 158af746c6..515bead527 100644 --- a/akka-http-core/src/test/scala/akka/http/javadsl/model/JavaApiTestCaseSpecs.scala +++ b/akka-http-core/src/test/scala/akka/http/javadsl/model/JavaApiTestCaseSpecs.scala @@ -10,7 +10,7 @@ import javax.net.ssl.{ SSLParameters, SSLContext } import akka.http.javadsl.model.headers.Cookie import akka.http.scaladsl.model import akka.http.scaladsl.model.headers.BasicHttpCredentials -import akka.stream.io.ClientAuth +import com.typesafe.sslconfig.ssl.ClientAuth import org.scalatest.{ FreeSpec, MustMatchers } import scala.collection.immutable @@ -61,7 +61,7 @@ class JavaApiTestCaseSpecs extends FreeSpec with MustMatchers { Uri.create("/order").query(JavaApiTestCases.addSessionId(orderId)) must be(Uri.create("/order?orderId=123&session=abcdefghijkl")) } "create HttpsContext" in { - akka.http.javadsl.HttpsContext.create(SSLContext.getDefault, + akka.http.javadsl.ConnectionContext.https(SSLContext.getDefault, Optional.empty[java.util.Collection[String]], Optional.empty[java.util.Collection[String]], Optional.empty[ClientAuth], diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala index 323e08c853..113226546f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala @@ -14,7 +14,8 @@ import akka.stream.impl.FanIn.InputBunch import akka.stream.impl.FanOut.OutputBunch import akka.stream.impl._ import akka.util.ByteString -import com.typesafe.sslconfig.akka.{ AkkaSSLConfig, SSLEngineConfigurator } +import com.typesafe.sslconfig.akka.AkkaSSLConfig +import com.typesafe.sslconfig.ssl.ClientAuth import scala.annotation.tailrec import akka.stream.io._ @@ -158,16 +159,15 @@ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, applySessionParameters(firstSession) def applySessionParameters(params: NegotiateNewSession): Unit = { - import params._ - enabledCipherSuites foreach (cs ⇒ engine.setEnabledCipherSuites(cs.toArray)) - enabledProtocols foreach (p ⇒ engine.setEnabledProtocols(p.toArray)) - clientAuth match { + params.enabledCipherSuites foreach (cs ⇒ engine.setEnabledCipherSuites(cs.toArray)) + params.enabledProtocols foreach (p ⇒ engine.setEnabledProtocols(p.toArray)) + params.clientAuth match { case Some(ClientAuth.None) ⇒ engine.setNeedClientAuth(false) case Some(ClientAuth.Want) ⇒ engine.setWantClientAuth(true) case Some(ClientAuth.Need) ⇒ engine.setNeedClientAuth(true) - case None ⇒ // do nothing + case _ ⇒ // do nothing } - sslParameters foreach (p ⇒ engine.setSSLParameters(p)) + params.sslParameters foreach (p ⇒ engine.setSSLParameters(p)) engine.beginHandshake() lastHandshakeStatus = engine.getHandshakeStatus diff --git a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala index 6982548e6a..81a19b8287 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala @@ -12,9 +12,11 @@ import akka.stream._ import akka.stream.impl.StreamLayout.Module import akka.util.ByteString import javax.net.ssl._ +import com.typesafe.sslconfig.akka.AkkaSSLConfig +import com.typesafe.sslconfig.ssl.ClientAuth + import scala.annotation.varargs import scala.collection.immutable -import java.security.cert.Certificate import scala.compat.java8.OptionConverters /** @@ -149,7 +151,7 @@ object SslTls { } /** - * This object holds simple wrapping [[BidiFlow]] implementations that can + * This object holds simple wrapping [[akka.stream.scaladsl.BidiFlow]] implementations that can * be used instead of [[SslTls]] when no encryption is desired. The flows will * just adapt the message protocol by wrapping into [[SessionBytes]] and * unwrapping [[SendBytes]]. @@ -422,6 +424,7 @@ object NegotiateNewSession extends NegotiateNewSession(None, None, None, None) { * settings unchanged). */ def withDefaults = this + } /** @@ -429,24 +432,3 @@ object NegotiateNewSession extends NegotiateNewSession(None, None, None, None) { * peer. */ case class SendBytes(bytes: ByteString) extends SslTlsOutbound - -/** - * An SSLEngine can either demand, allow or ignore its peer’s authentication - * (via certificates), where `Need` will fail the handshake if the peer does - * not provide valid credentials, `Want` allows the peer to send credentials - * and verifies them if provided, and `None` disables peer certificate - * verification. - * - * See the documentation for `SSLEngine::setWantClientAuth` for more - * information. - */ -sealed abstract class ClientAuth -object ClientAuth { - case object None extends ClientAuth - case object Want extends ClientAuth - case object Need extends ClientAuth - - def none: ClientAuth = None - def want: ClientAuth = Want - def need: ClientAuth = Need -} diff --git a/notes.md b/notes.md new file mode 100644 index 0000000000..a16d40ab7e --- /dev/null +++ b/notes.md @@ -0,0 +1,62 @@ +Notes on changes + +- hidden "Setup" using methods on Http +- super pool to be "dead simple" +- we want to move away from Option[HttpsContext] as it's a lie, None => defaultContext anyway +- config performed in ssl-config, applying these settings done in Akka + - e.g. NegotiateNewSession +- was: singleRequest(req, settings, context: Option[HttpsContext]) == None meant default +- default port in context is useful for starting the https server + +- in WS, we'll always want to be TLS in practice. APIs use HttpsContext, but provide default one + - if request is to "ws://" then the https is not used of course + +### Server + +Needs to know upfront. + +**bind / bindAndHandle** + - has context + - default HTTP + - if no port given, based on Context 80/443 + +=> Type: ConnectionContext - based on type HTTP / HTTPS +Note: context should be obtainable Http().defaultServerHttpsContext + +### Client + +## connections +Needs to know upfront. + +**outgoingConnection** + - no context + +**outgoingConnectionTls** + - needs https context + - provides default HTTPS + +**outgoingConnection** + - no context + +**newHostConnectionPoolTls** + - needs https context + +=> Tls methods provide default HTTPS config +Type: HttpsConnectionConfig on Tls methods + +## request sensitive (adds TLS when needed): +Needs context "just in case", enables when request needs it. + +**singleRequest** + - has context, default HTTPS, may drop it + +**singleWebSocketRequest** + - has context, default HTTPS, may drop it + +**singleWebSocketRequest** + - needs context, "just in case" + - provides default HTTPS + +=> normal methods, Tls methods +=> Tls methods provide default HTTPS config +Type: HttpsConnectionConfig on Tls methods diff --git a/project/Dependencies.scala b/project/Dependencies.scala index e735e22d3e..23354b42c2 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -45,7 +45,7 @@ object Dependencies { val reactiveStreams = "org.reactivestreams" % "reactive-streams" % "1.0.0" // CC0 // ssl-config - val sslConfigAkka = "com.typesafe" %% "ssl-config-akka" % "0.1.0" // ApacheV2 + val sslConfigAkka = "com.typesafe" %% "ssl-config-akka" % "0.1.1" // ApacheV2 // For akka-http spray-json support val sprayJson = "io.spray" %% "spray-json" % "1.3.2" // ApacheV2