!htc,str #19275 towards ssl-configurable server, change APIs

This commit is contained in:
Konrad Malawski 2016-01-19 20:46:34 +01:00
parent 9ba5596103
commit 100f82be84
17 changed files with 499 additions and 306 deletions

View file

@ -1,56 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.javadsl;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import akka.japi.Util;
import akka.stream.io.ClientAuth;
import java.util.Collection;
import java.util.Optional;
import scala.compat.java8.OptionConverters;
/**
* TLS configuration for an HTTPS server binding or client connection.
* For the sslContext please refer to the com.typeasfe.ssl-config library.
* The remaining four parameters configure the initial session that will
* be negotiated, see {@link akka.stream.io.NegotiateNewSession} for details.
*/
public abstract class HttpsContext {
public abstract SSLContext getSslContext();
public abstract Optional<Collection<String>> getEnabledCipherSuites();
public abstract Optional<Collection<String>> getEnabledProtocols();
public abstract Optional<ClientAuth> getClientAuth();
public abstract Optional<SSLParameters> getSslParameters();
//#http-context-creation
public static HttpsContext create(SSLContext sslContext,
Optional<Collection<String>> enabledCipherSuites,
Optional<Collection<String>> enabledProtocols,
Optional<ClientAuth> clientAuth,
Optional<SSLParameters> sslParameters)
//#http-context-creation
{
final scala.Option<scala.collection.immutable.Seq<String>> ecs;
if (enabledCipherSuites.isPresent()) ecs = scala.Option.apply(Util.immutableSeq(enabledCipherSuites.get()));
else ecs = scala.Option.empty();
final scala.Option<scala.collection.immutable.Seq<String>> ep;
if(enabledProtocols.isPresent()) ep = scala.Option.apply(Util.immutableSeq(enabledProtocols.get()));
else ep = scala.Option.empty();
return new akka.http.scaladsl.HttpsContext(sslContext,
ecs,
ep,
OptionConverters.toScala(clientAuth),
OptionConverters.toScala(sslParameters));
}
}

View file

@ -4,31 +4,27 @@
package akka.http
import java.lang.{ Iterable JIterable }
import java.util.Optional
import akka.actor.ActorSystem
import akka.event.LoggingAdapter
import akka.http.impl.util._
import akka.http.scaladsl.HttpsContext
import akka.http.javadsl.ConnectionContext
import com.typesafe.config.Config
import scala.concurrent.duration.Duration
import scala.compat.java8.OptionConverters._
final case class HostConnectionPoolSetup(host: String, port: Int, setup: ConnectionPoolSetup)
final case class ConnectionPoolSetup(
settings: ConnectionPoolSettings,
httpsContext: Option[HttpsContext],
conContext: ConnectionContext = ConnectionContext.noEncryption(),
log: LoggingAdapter)
object ConnectionPoolSetup {
/** Java API */
def create(settings: ConnectionPoolSettings,
httpsContext: Optional[akka.http.javadsl.HttpsContext],
connectionContext: ConnectionContext,
log: LoggingAdapter): ConnectionPoolSetup =
ConnectionPoolSetup(settings, httpsContext.asScala.map(_.asInstanceOf[HttpsContext]), log)
ConnectionPoolSetup(settings, connectionContext, log)
}
final case class ConnectionPoolSettings(

View file

@ -20,7 +20,7 @@ import akka.stream.impl.{ SeqActorName, FixedSizeBuffer }
import akka.stream.scaladsl.{ Keep, Flow, Sink, Source }
import akka.http.HostConnectionPoolSetup
import akka.http.scaladsl.model._
import akka.http.scaladsl.Http
import akka.http.scaladsl.{ ConnectionContext, HttpsConnectionContext, Http }
import PoolFlow._
private object PoolInterfaceActor {
@ -64,9 +64,10 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup,
import hcps._
import setup._
val connectionFlow =
if (httpsContext.isEmpty) Http().outgoingConnection(host, port, None, settings.connectionSettings, setup.log)
else Http().outgoingConnectionTls(host, port, None, settings.connectionSettings, httpsContext, setup.log)
val connectionFlow = conContext match {
case httpsContext: HttpsConnectionContext Http().outgoingConnectionTls(host, port, httpsContext, None, settings.connectionSettings, setup.log)
case _ Http().outgoingConnection(host, port, None, settings.connectionSettings, setup.log)
}
val poolFlow = PoolFlow(
Flow[HttpRequest].viaMat(connectionFlow)(Keep.right),
@ -147,7 +148,7 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup,
}
def dispatchRequest(pr: PoolRequest): Unit = {
val scheme = Uri.httpScheme(hcps.setup.httpsContext.isDefined)
val scheme = Uri.httpScheme(hcps.setup.conContext.isSecure)
val hostHeader = headers.Host(hcps.host, Uri.normalizePort(hcps.port, scheme))
val effectiveRequest =
pr.request

View file

@ -14,7 +14,7 @@ import scala.collection.immutable
import scala.reflect.ClassTag
import akka.{ NotUsed, japi }
import akka.http.impl.model.{ JavaQuery, JavaUri }
import akka.http.javadsl.{ model jm }
import akka.http.javadsl.{ model jm, HttpConnectionContext, ConnectionContext, HttpsConnectionContext }
import akka.http.scaladsl.{ model sm }
import scala.compat.java8.OptionConverters._
@ -82,7 +82,7 @@ private[http] object JavaMapping {
}
}
/** This trivial mapping isn't enabled by default to prevent it from conflicting with the `Inherited` ones `*/
/** This trivial mapping isn't enabled by default to prevent it from conflicting with the `Inherited` ones */
def identity[T]: JavaMapping[T, T] =
new JavaMapping[T, T] {
def toJava(scalaObject: T): J = scalaObject
@ -164,6 +164,10 @@ private[http] object JavaMapping {
def toScala(javaObject: J): S = cast[S](javaObject)
}
implicit object ConnectionContext extends Inherited[ConnectionContext, akka.http.scaladsl.HttpConnectionContext]
implicit object HttpConnectionContext extends Inherited[HttpConnectionContext, akka.http.scaladsl.HttpConnectionContext]
implicit object HttpsConnectionContext extends Inherited[HttpsConnectionContext, akka.http.scaladsl.HttpsConnectionContext]
implicit object DateTime extends Inherited[jm.DateTime, akka.http.scaladsl.model.DateTime]
implicit object ContentType extends Inherited[jm.ContentType, sm.ContentType]

View file

@ -0,0 +1,57 @@
/**
* Copyright (C) 2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.javadsl
import java.util.{ Collection JCollection, Optional }
import java.{ util ju }
import javax.net.ssl.{ SSLContext, SSLParameters }
import akka.http.scaladsl
import com.typesafe.sslconfig.ssl.ClientAuth
import scala.compat.java8.OptionConverters
object ConnectionContext {
/** Used to serve HTTPS traffic. */
def https(sslContext: SSLContext): HttpsConnectionContext =
scaladsl.ConnectionContext.https(sslContext)
/** Used to serve HTTPS traffic. */
def https(sslContext: SSLContext, enabledCipherSuites: Optional[JCollection[String]],
enabledProtocols: Optional[JCollection[String]], clientAuth: Optional[ClientAuth], sslParameters: Optional[SSLParameters]) =
scaladsl.ConnectionContext.https(sslContext, sslParameters = OptionConverters.toScala(sslParameters))
/** Used to serve HTTP traffic. */
def noEncryption(): HttpConnectionContext =
scaladsl.ConnectionContext.noEncryption()
}
trait ConnectionContext {
def isSecure: Boolean
/** Java API */
def getDefaultPort: Int
}
trait HttpConnectionContext extends ConnectionContext {
override final def isSecure = false
override final def getDefaultPort = 80
}
trait HttpsConnectionContext extends ConnectionContext {
override final def isSecure = true
override final def getDefaultPort = 443
/** Java API */
def getEnabledCipherSuites: Optional[JCollection[String]]
/** Java API */
def getEnabledProtocols: Optional[JCollection[String]]
/** Java API */
def getClientAuth: Optional[ClientAuth]
/** Java API */
def getSslContext: SSLContext
/** Java API */
def getSslParameters: Optional[SSLParameters]
}

View file

@ -39,9 +39,6 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
private lazy val delegate = akka.http.scaladsl.Http(system)
private implicit def convertHttpsContext(hctx: Optional[HttpsContext]): Option[akka.http.scaladsl.HttpsContext] =
hctx.asScala.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext])
/**
* Constructs a server layer stage using the configured default [[ServerSettings]]. The returned [[BidiFlow]] isn't
* reusable and can only be materialized once.
@ -96,19 +93,41 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
/**
* Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding
* on the given `endpoint`.
*
* If the given port is 0 the resulting source can be materialized several times. Each materialization will
* then be assigned a new local port by the operating system, which can then be retrieved by the materialized
* [[ServerBinding]].
*
* If the given port is non-zero subsequent materialization attempts of the produced source will immediately
* fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
* [[ServerBinding]].
*/
def bind(interface: String, port: Int,
settings: ServerSettings,
httpsContext: Optional[HttpsContext],
connectionContext: ConnectionContext,
materializer: Materializer): Source[IncomingConnection, Future[ServerBinding]] =
new Source(delegate.bind(interface, port, settings, connectionContext = ConnectionContext.noEncryption().asScala)(materializer)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec)))
/**
* Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding
* on the given `endpoint`.
*
* If the given port is 0 the resulting source can be materialized several times. Each materialization will
* then be assigned a new local port by the operating system, which can then be retrieved by the materialized
* [[ServerBinding]].
*
* If the given port is non-zero subsequent materialization attempts of the produced source will immediately
* fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
* [[ServerBinding]].
*/
def bind(interface: String, port: Int,
settings: ServerSettings,
connectionContext: ConnectionContext,
log: LoggingAdapter,
materializer: Materializer): Source[IncomingConnection, Future[ServerBinding]] =
new Source(delegate.bind(interface, port, settings, httpsContext, log)(materializer)
new Source(delegate.bind(interface, port, settings, connectionContext = ConnectionContext.noEncryption().asScala, log)(materializer)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec)))
@ -126,6 +145,21 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
interface, port)(materializer)
.map(new ServerBinding(_))(ec)
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
* [[Flow]] for processing all incoming connections.
*
* The number of concurrently accepted connections can be configured by overriding
* the `akka.http.server.max-connections` setting.
*/
def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _],
interface: String, port: Int,
connectionContext: ConnectionContext,
materializer: Materializer): Future[ServerBinding] =
delegate.bindAndHandle(handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala,
interface, port, connectionContext.asScala)(materializer)
.map(new ServerBinding(_))(ec)
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
* [[Flow]] for processing all incoming connections.
@ -136,11 +170,11 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _],
interface: String, port: Int,
settings: ServerSettings,
httpsContext: Optional[HttpsContext],
connectionContext: ConnectionContext,
log: LoggingAdapter,
materializer: Materializer): Future[ServerBinding] =
delegate.bindAndHandle(handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala,
interface, port, settings, httpsContext, log)(materializer)
interface, port, connectionContext.asScala, settings, log)(materializer)
.map(new ServerBinding(_))(ec)
/**
@ -156,6 +190,20 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
delegate.bindAndHandleSync(handler.apply(_).asScala, interface, port)(materializer)
.map(new ServerBinding(_))(ec)
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
* [[Flow]] for processing all incoming connections.
*
* The number of concurrently accepted connections can be configured by overriding
* the `akka.http.server.max-connections` setting.
*/
def bindAndHandleSync(handler: Function[HttpRequest, HttpResponse],
interface: String, port: Int,
connectionContext: ConnectionContext,
materializer: Materializer): Future[ServerBinding] =
delegate.bindAndHandleSync(handler.apply(_).asScala, interface, port, connectionContext.asScala)(materializer)
.map(new ServerBinding(_))(ec)
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
* [[Flow]] for processing all incoming connections.
@ -166,11 +214,11 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
def bindAndHandleSync(handler: Function[HttpRequest, HttpResponse],
interface: String, port: Int,
settings: ServerSettings,
httpsContext: Optional[HttpsContext],
connectionContext: ConnectionContext,
log: LoggingAdapter,
materializer: Materializer): Future[ServerBinding] =
delegate.bindAndHandleSync(handler.apply(_).asScala,
interface, port, settings, httpsContext, log)(materializer)
interface, port, connectionContext.asScala, settings, log)(materializer)
.map(new ServerBinding(_))(ec)
/**
@ -195,11 +243,25 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
*/
def bindAndHandleAsync(handler: Function[HttpRequest, Future[HttpResponse]],
interface: String, port: Int,
settings: ServerSettings, httpsContext: Optional[HttpsContext],
connectionContext: ConnectionContext,
materializer: Materializer): Future[ServerBinding] =
delegate.bindAndHandleAsync(handler.apply(_).asInstanceOf[Future[sm.HttpResponse]], interface, port, connectionContext.asScala)(materializer)
.map(new ServerBinding(_))(ec)
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
* [[Flow]] for processing all incoming connections.
*
* The number of concurrently accepted connections can be configured by overriding
* the `akka.http.server.max-connections` setting.
*/
def bindAndHandleAsync(handler: Function[HttpRequest, Future[HttpResponse]],
interface: String, port: Int,
settings: ServerSettings, connectionContext: ConnectionContext,
parallelism: Int, log: LoggingAdapter,
materializer: Materializer): Future[ServerBinding] =
delegate.bindAndHandleAsync(handler.apply(_).asInstanceOf[Future[sm.HttpResponse]],
interface, port, settings, httpsContext, parallelism, log)(materializer)
interface, port, connectionContext.asScala, settings, parallelism, log)(materializer)
.map(new ServerBinding(_))(ec)
/**
@ -231,10 +293,20 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
outgoingConnection(host, 80)
/**
* Same as [[outgoingConnection]] but with HTTPS encryption.
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
*
* The [[defaultClientHttpsContext]] is used to configure TLS for the connection.
*/
def outgoingConnectionTls(host: String): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
outgoingConnectionTls(host, 443)
outgoingConnectionTls(host, defaultClientHttpsContext)
/**
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
*/
def outgoingConnectionTls(host: String, connectionContext: HttpsConnectionContext): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
outgoingConnectionTls(host, 443, connectionContext)
/**
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
@ -248,12 +320,28 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
}
/**
* Same as [[outgoingConnection]] but with HTTPS encryption.
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
*
* The [[defaultClientHttpsContext]] is used to configure TLS for the connection.
*/
def outgoingConnectionTls(host: String, port: Int): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
Flow.fromGraph {
akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala)
.viaMat(delegate.outgoingConnectionTls(host, port))(Keep.right)
.viaMat(delegate.outgoingConnection(host, port))(Keep.right)
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec))
}
/**
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
*
* The given [[HttpsConnectionContext]] will be used for encryption on the connection.
*/
def outgoingConnectionTls(host: String, port: Int, connectionContext: HttpsConnectionContext): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
Flow.fromGraph {
akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala)
.viaMat(delegate.outgoingConnectionTls(host, port, connectionContext.asScala))(Keep.right)
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec))
}
@ -272,20 +360,35 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
}
/**
* Same as [[outgoingConnection]] but with HTTPS encryption.
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
*
* If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used
* for encryption on the connection.
* The [[defaultClientHttpsContext]] is used to configure TLS for the connection.
*/
def outgoingConnectionTls(host: String, port: Int,
localAddress: Optional[InetSocketAddress],
settings: ClientConnectionSettings,
httpsContext: Optional[HttpsContext],
log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
Flow.fromGraph {
akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala)
.viaMat(delegate.outgoingConnectionTls(host, port, localAddress.asScala, settings,
httpsContext.asScala.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log))(Keep.right)
.viaMat(delegate.outgoingConnectionTls(host, port, defaultClientHttpsContext.asScala, localAddress.asScala, settings, log))(Keep.right)
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec))
}
/**
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
*
* The given [[HttpsConnectionContext]] will be used for encryption on the connection.
*/
def outgoingConnectionTls(host: String, port: Int,
connectionContext: HttpsConnectionContext,
localAddress: Optional[InetSocketAddress],
settings: ClientConnectionSettings,
log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
Flow.fromGraph {
akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala)
.viaMat(delegate.outgoingConnectionTls(host, port, connectionContext.asScala, localAddress.asScala, settings, log))(Keep.right)
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec))
}
@ -303,15 +406,29 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def newHostConnectionPool[T](host: String, port: Int, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
def newHostConnectionPool[T](host: String, port: Int,
materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPool[T](host, port)(materializer))
/**
* Same as [[newHostConnectionPool]] but with HTTPS encryption.
*
* The [[defaultClientHttpsContext]] is used to configure TLS for the connection.
*/
def newHostConnectionPoolTls[T](host: String, port: Int, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
def newHostConnectionPoolTls[T](host: String, port: Int,
materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port)(materializer))
/**
* Same as [[newHostConnectionPool]] but with HTTPS encryption.
*
* The given [[HttpsConnectionContext]] will be used for encryption on the connection.
*/
def newHostConnectionPoolTls[T](host: String, port: Int,
connectionContext: HttpsConnectionContext,
materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, connectionContext.asScala)(materializer))
/**
* Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches
* the requests from all its materializations across this pool.
@ -334,32 +451,23 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
/**
* Same as [[newHostConnectionPool]] but with HTTPS encryption.
*
* If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used
* for encryption on the connection.
* The [[defaultClientHttpsContext]] is used to configure TLS for the connection.
*/
def newHostConnectionPoolTls[T](host: String, port: Int,
settings: ConnectionPoolSettings,
httpsContext: Optional[HttpsContext],
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, settings,
httpsContext.asScala.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log)(materializer))
adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, defaultClientHttpsContext.asScala, settings, log)(materializer))
/**
* Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches
* the requests from all its materializations across this pool.
* While the started host connection pool internally shuts itself down automatically after the configured idle
* timeout it will spin itself up again if more requests arrive from an existing or a new client flow
* materialization. The returned flow therefore remains usable for the full lifetime of the application.
* Same as [[newHostConnectionPool]] but with HTTPS encryption.
*
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
* responses in an order that doesn't directly match the consumed requests.
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
* response for A.
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
* The given [[HttpsConnectionContext]] will be used for encryption on the connection.
*/
def newHostConnectionPool[T](setup: HostConnectionPoolSetup, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPool[T](setup)(materializer))
def newHostConnectionPoolTls[T](host: String, port: Int,
connectionContext: HttpsConnectionContext,
settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, connectionContext.asScala, settings, log)(materializer))
/**
* Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing
@ -383,9 +491,11 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
/**
* Same as [[cachedHostConnectionPool]] but with HTTPS encryption.
*
* The [[defaultClientHttpsContext]] is used to configure TLS for the connection.
*/
def cachedHostConnectionPoolTls[T](host: String, port: Int, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port)(materializer))
adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port, defaultClientHttpsContext.asScala)(materializer))
/**
* Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing
@ -412,35 +522,13 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
/**
* Same as [[cachedHostConnectionPool]] but with HTTPS encryption.
*
* If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used
* for encryption on the connection.
* The given [[HttpsConnectionContext]] will be used for encryption on the connection.
*/
def cachedHostConnectionPoolTls[T](host: String, port: Int,
settings: ConnectionPoolSettings,
httpsContext: Optional[HttpsContext],
connectionContext: HttpsConnectionContext,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port, settings,
httpsContext.asScala.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log)(materializer))
/**
* Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing
* HTTP connections to the given target host endpoint. For every ActorSystem, target host and pool
* configuration a separate connection pool is maintained.
* The HTTP layer transparently manages idle shutdown and restarting of connections pools as configured.
* The returned [[Flow]] instances therefore remain valid throughout the lifetime of the application.
*
* The internal caching logic guarantees that there will never be more than a single pool running for the
* given target host endpoint and configuration (in this ActorSystem).
*
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
* responses in an order that doesn't directly match the consumed requests.
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
* response for A.
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def cachedHostConnectionPool[T](setup: HostConnectionPoolSetup, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPool[T](setup)(materializer))
adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port, connectionContext.asScala, settings, log)(materializer))
/**
* Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool
@ -449,12 +537,12 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
*
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
* responses in an order that doesn't directly match the consumed requests.
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
* response for A.
* For example, if two requests `A` and `B` enter the flow in that order the response for `B` might be produced before the
* response for `A`.
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def superPool[T](materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] =
def superPool[T](materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], Unit] =
adaptTupleFlow(delegate.superPool[T]()(materializer))
/**
@ -462,25 +550,46 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* depending on their respective effective URIs. Note that incoming requests must have either an absolute URI or
* a valid `Host` header.
*
* If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used
* for setting up the HTTPS connection pool, if required.
* The given [[HttpsConnectionContext]] is used to configure TLS for the connection.
*
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
* responses in an order that doesn't directly match the consumed requests.
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
* response for A.
* For example, if two requests `A` and `B` enter the `flow` in that order the response for `B` might be produced before the
* response for `A`.
*
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def superPool[T](settings: ConnectionPoolSettings,
httpsContext: Optional[HttpsContext],
connectionContext: HttpsConnectionContext,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] =
adaptTupleFlow(delegate.superPool[T](settings, httpsContext, log)(materializer))
adaptTupleFlow(delegate.superPool[T](connectionContext.asScala, settings, log)(materializer))
/**
* Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool
* depending on their respective effective URIs. Note that incoming requests must have either an absolute URI or
* a valid `Host` header.
*
* The [[defaultClientHttpsContext]] is used to configure TLS for the connection.
*
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
* responses in an order that doesn't directly match the consumed requests.
* For example, if two requests `A` and `B` enter the `flow` in that order the response for `B` might be produced before the
* response for `A`.
*
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def superPool[T](settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] =
adaptTupleFlow(delegate.superPool[T](defaultClientHttpsContext.asScala, settings, log)(materializer))
/**
* Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's
* effective URI to produce a response future.
*
* The [[defaultClientHttpsContext]] is used to configure TLS for the connection.
*
* Note that the request must have either an absolute URI or a valid `Host` header, otherwise
* the future will be completed with an error.
*/
@ -601,15 +710,15 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
def shutdownAllConnectionPools(): Future[Unit] = delegate.shutdownAllConnectionPools()
/**
* Gets the current default client-side [[HttpsContext]].
* Gets the current default client-side [[HttpsConnectionContext]].
*/
def defaultClientHttpsContext: HttpsContext = delegate.defaultClientHttpsContext
def defaultClientHttpsContext: HttpsConnectionContext = delegate.defaultClientHttpsContext
/**
* Sets the default client-side [[HttpsContext]].
* Sets the default client-side [[HttpsConnectionContext]].
*/
def setDefaultClientHttpsContext(context: HttpsContext): Unit =
delegate.setDefaultClientHttpsContext(context.asInstanceOf[akka.http.scaladsl.HttpsContext])
def setDefaultClientHttpsContext(context: HttpsConnectionContext): Unit =
delegate.setDefaultClientHttpsContext(context.asInstanceOf[akka.http.scaladsl.HttpsConnectionContext])
private def adaptTupleFlow[T, Mat](scalaFlow: stream.scaladsl.Flow[(scaladsl.model.HttpRequest, T), (Try[scaladsl.model.HttpResponse], T), Mat]): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], Mat] = {
implicit val _ = JavaMapping.identity[T]

View file

@ -0,0 +1,56 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.scaladsl
import akka.stream.io.NegotiateNewSession
import scala.collection.JavaConverters._
import java.util.{ Optional, Collection JCollection }
import javax.net.ssl._
import com.typesafe.sslconfig.ssl.ClientAuth
import scala.collection.immutable
import scala.compat.java8.OptionConverters._
trait ConnectionContext extends akka.http.javadsl.ConnectionContext {
final def defaultPort = getDefaultPort
}
object ConnectionContext {
def https(sslContext: SSLContext,
enabledCipherSuites: Option[immutable.Seq[String]] = None,
enabledProtocols: Option[immutable.Seq[String]] = None,
clientAuth: Option[ClientAuth] = None,
sslParameters: Option[SSLParameters] = None) = {
new HttpsConnectionContext(sslContext, enabledCipherSuites, enabledProtocols, clientAuth, sslParameters)
}
def noEncryption() = HttpConnectionContext
}
final class HttpsConnectionContext(
val sslContext: SSLContext,
val enabledCipherSuites: Option[immutable.Seq[String]] = None,
val enabledProtocols: Option[immutable.Seq[String]] = None,
val clientAuth: Option[ClientAuth] = None,
val sslParameters: Option[SSLParameters] = None)
extends akka.http.javadsl.HttpsConnectionContext
with ConnectionContext {
def firstSession = NegotiateNewSession(enabledCipherSuites, enabledProtocols, clientAuth, sslParameters)
override def getSslContext = sslContext
override def getEnabledCipherSuites: Optional[JCollection[String]] = enabledCipherSuites.map(_.asJavaCollection).asJava
override def getEnabledProtocols: Optional[JCollection[String]] = enabledProtocols.map(_.asJavaCollection).asJava
override def getClientAuth: Optional[ClientAuth] = clientAuth.asJava
override def getSslParameters: Optional[SSLParameters] = sslParameters.asJava
}
sealed class HttpConnectionContext extends akka.http.javadsl.HttpConnectionContext with ConnectionContext
final object HttpConnectionContext extends HttpConnectionContext {
/** Java API */
def getInstance() = this
}

View file

@ -6,7 +6,6 @@ package akka.http.scaladsl
import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentHashMap
import java.util.{ Collection JCollection, Optional }
import javax.net.ssl._
import akka.actor._
@ -46,21 +45,25 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
// configured default HttpsContext for the client-side
// SYNCHRONIZED ACCESS ONLY!
private[this] var _defaultClientHttpsContext: HttpsContext = _
private[this] var _defaultClientHttpsContext: HttpsConnectionContext = _
// ** SERVER ** //
private[this] final val DefaultPortForProtocol = -1 // any negative value
/**
* Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding
* on the given `endpoint`.
*
* If the given port is 0 the resulting source can be materialized several times. Each materialization will
* then be assigned a new local port by the operating system, which can then be retrieved by the materialized
* [[ServerBinding]].
*
* If the given port is non-zero subsequent materialization attempts of the produced source will immediately
* fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
* [[ServerBinding]].
*
* If an [[HttpsContext]] is given it will be used for setting up TLS encryption on the binding.
* If an [[ConnectionContext]] is given it will be used for setting up TLS encryption on the binding.
* Otherwise the binding will be unencrypted.
*
* If no `port` is explicitly given (or the port value is negative) the protocol's default port will be used,
@ -69,12 +72,12 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
* To configure additional settings for a server started using this method,
* use the `akka.http.server` config section or pass in a [[ServerSettings]] explicitly.
*/
def bind(interface: String, port: Int = -1,
def bind(interface: String, port: Int = DefaultPortForProtocol,
settings: ServerSettings = ServerSettings(system),
httpsContext: Option[HttpsContext] = None,
connectionContext: ConnectionContext = defaultServerHttpContext,
log: LoggingAdapter = system.log)(implicit fm: Materializer): Source[IncomingConnection, Future[ServerBinding]] = {
val effectivePort = if (port >= 0) port else if (httpsContext.isEmpty) 80 else 443
val tlsStage = sslTlsStage(httpsContext, Server)
val effectivePort = if (port >= 0) port else connectionContext.defaultPort
val tlsStage = sslTlsStage(connectionContext, Server)
val connections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] =
Tcp().bind(interface, effectivePort, settings.backlog, settings.socketOptions, halfClose = false, settings.timeouts.idleTimeout)
connections.map {
@ -98,9 +101,9 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
* use the `akka.http.server` config section or pass in a [[ServerSettings]] explicitly.
*/
def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, Any],
interface: String, port: Int = -1,
interface: String, port: Int = DefaultPortForProtocol,
connectionContext: ConnectionContext = defaultServerHttpContext,
settings: ServerSettings = ServerSettings(system),
httpsContext: Option[HttpsContext] = None,
log: LoggingAdapter = system.log)(implicit fm: Materializer): Future[ServerBinding] = {
def handleOneConnection(incomingConnection: IncomingConnection): Future[Unit] =
try
@ -114,7 +117,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
throw e
}
bind(interface, port, settings, httpsContext, log)
bind(interface, port, settings, connectionContext, log)
.mapAsyncUnordered(settings.maxConnections) { connection
handleOneConnection(connection).recoverWith {
// Ignore incoming errors from the connection as they will cancel the binding.
@ -139,11 +142,11 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
* use the `akka.http.server` config section or pass in a [[ServerSettings]] explicitly.
*/
def bindAndHandleSync(handler: HttpRequest HttpResponse,
interface: String, port: Int = -1,
interface: String, port: Int = DefaultPortForProtocol,
connectionContext: ConnectionContext = defaultServerHttpContext,
settings: ServerSettings = ServerSettings(system),
httpsContext: Option[HttpsContext] = None,
log: LoggingAdapter = system.log)(implicit fm: Materializer): Future[ServerBinding] =
bindAndHandle(Flow[HttpRequest].map(handler), interface, port, settings, httpsContext, log)
bindAndHandle(Flow[HttpRequest].map(handler), interface, port, connectionContext, settings, log)
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
@ -156,12 +159,12 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
* use the `akka.http.server` config section or pass in a [[ServerSettings]] explicitly.
*/
def bindAndHandleAsync(handler: HttpRequest Future[HttpResponse],
interface: String, port: Int = -1,
interface: String, port: Int = DefaultPortForProtocol,
connectionContext: ConnectionContext = defaultServerHttpContext,
settings: ServerSettings = ServerSettings(system),
httpsContext: Option[HttpsContext] = None,
parallelism: Int = 1,
log: LoggingAdapter = system.log)(implicit fm: Materializer): Future[ServerBinding] =
bindAndHandle(Flow[HttpRequest].mapAsync(parallelism)(handler), interface, port, settings, httpsContext, log)
bindAndHandle(Flow[HttpRequest].mapAsync(parallelism)(handler), interface, port, connectionContext, settings, log)
type ServerLayer = Http.ServerLayer
@ -196,36 +199,36 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
localAddress: Option[InetSocketAddress] = None,
settings: ClientConnectionSettings = ClientConnectionSettings(system),
log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
_outgoingConnection(host, port, localAddress, settings, None, log)
_outgoingConnection(host, port, localAddress, settings, ConnectionContext.noEncryption(), log)
/**
* Same as [[outgoingConnection]] but for encrypted (HTTPS) connections.
*
* If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used
* If an explicit [[HttpsConnectionContext]] is given then it rather than the configured default [[HttpsConnectionContext]] will be used
* for encryption on the connection.
*
* To configure additional settings for requests made using this method,
* use the `akka.http.client` config section or pass in a [[ClientConnectionSettings]] explicitly.
*/
def outgoingConnectionTls(host: String, port: Int = 443,
connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
localAddress: Option[InetSocketAddress] = None,
settings: ClientConnectionSettings = ClientConnectionSettings(system),
httpsContext: Option[HttpsContext] = None,
log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
_outgoingConnection(host, port, localAddress, settings, effectiveHttpsContext(httpsContext), log)
_outgoingConnection(host, port, localAddress, settings, connectionContext, log)
private def _outgoingConnection(host: String, port: Int, localAddress: Option[InetSocketAddress],
settings: ClientConnectionSettings, httpsContext: Option[HttpsContext],
settings: ClientConnectionSettings, connectionContext: ConnectionContext,
log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = {
val hostHeader = if (port == (if (httpsContext.isEmpty) 80 else 443)) Host(host) else Host(host, port)
val hostHeader = if (port == connectionContext.defaultPort) Host(host) else Host(host, port)
val layer = clientLayer(hostHeader, settings, log)
layer.joinMat(_outgoingTlsConnectionLayer(host, port, localAddress, settings, httpsContext, log))(Keep.right)
layer.joinMat(_outgoingTlsConnectionLayer(host, port, localAddress, settings, connectionContext, log))(Keep.right)
}
private def _outgoingTlsConnectionLayer(host: String, port: Int, localAddress: Option[InetSocketAddress],
settings: ClientConnectionSettings, httpsContext: Option[HttpsContext],
settings: ClientConnectionSettings, connectionContext: ConnectionContext,
log: LoggingAdapter): Flow[SslTlsOutbound, SslTlsInbound, Future[OutgoingConnection]] = {
val tlsStage = sslTlsStage(httpsContext, Client, Some(host -> port))
val tlsStage = sslTlsStage(connectionContext, Client, Some(host -> port))
val transportFlow = Tcp().outgoingConnection(new InetSocketAddress(host, port), localAddress,
settings.socketOptions, halfClose = true, settings.connectingTimeout, settings.idleTimeout)
@ -274,24 +277,24 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
def newHostConnectionPool[T](host: String, port: Int = 80,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
val cps = ConnectionPoolSetup(settings, None, log)
val cps = ConnectionPoolSetup(settings, ConnectionContext.noEncryption(), log)
newHostConnectionPool(HostConnectionPoolSetup(host, port, cps))
}
/**
* Same as [[newHostConnectionPool]] but for encrypted (HTTPS) connections.
*
* If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used
* If an explicit [[ConnectionContext]] is given then it rather than the configured default [[ConnectionContext]] will be used
* for encryption on the connections.
*
* To configure additional settings for the pool (and requests made using it),
* use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly.
*/
def newHostConnectionPoolTls[T](host: String, port: Int = 443,
connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
httpsContext: Option[HttpsContext] = None,
log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
val cps = ConnectionPoolSetup(settings, effectiveHttpsContext(httpsContext), log)
val cps = ConnectionPoolSetup(settings, connectionContext, log)
newHostConnectionPool(HostConnectionPoolSetup(host, port, cps))
}
@ -309,7 +312,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def newHostConnectionPool[T](setup: HostConnectionPoolSetup)(
private def newHostConnectionPool[T](setup: HostConnectionPoolSetup)(
implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
val gatewayFuture = FastFuture.successful(new PoolGateway(setup, Promise()))
gatewayClientFlow(setup, gatewayFuture)
@ -338,7 +341,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
def cachedHostConnectionPool[T](host: String, port: Int = 80,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
val cps = ConnectionPoolSetup(settings, None, log)
val cps = ConnectionPoolSetup(settings, ConnectionContext.noEncryption(), log)
val setup = HostConnectionPoolSetup(host, port, cps)
cachedHostConnectionPool(setup)
}
@ -346,17 +349,17 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
/**
* Same as [[cachedHostConnectionPool]] but for encrypted (HTTPS) connections.
*
* If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used
* If an explicit [[ConnectionContext]] is given then it rather than the configured default [[ConnectionContext]] will be used
* for encryption on the connections.
*
* To configure additional settings for the pool (and requests made using it),
* use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly.
*/
def cachedHostConnectionPoolTls[T](host: String, port: Int = 443,
connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
httpsContext: Option[HttpsContext] = None,
log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
val cps = ConnectionPoolSetup(settings, effectiveHttpsContext(httpsContext), log)
val cps = ConnectionPoolSetup(settings, connectionContext, log)
val setup = HostConnectionPoolSetup(host, port, cps)
cachedHostConnectionPool(setup)
}
@ -386,7 +389,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
* Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool
* depending on their respective effective URIs. Note that incoming requests must have an absolute URI.
*
* If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used
* If an explicit [[ConnectionContext]] is given then it rather than the configured default [[ConnectionContext]] will be used
* for setting up HTTPS connection pools, if required.
*
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
@ -399,80 +402,80 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
* To configure additional settings for the pool (and requests made using it),
* use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly.
*/
def superPool[T](settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
httpsContext: Option[HttpsContext] = None,
def superPool[T](connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed] =
clientFlow[T](settings) { request request -> cachedGateway(request, settings, httpsContext, log) }
clientFlow[T](settings) { request request -> cachedGateway(request, settings, connectionContext, log) }
/**
* Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's
* effective URI to produce a response future.
*
* If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used
* for setting up the HTTPS connection pool, if required.
* If an explicit [[ConnectionContext]] is given then it rather than the configured default [[ConnectionContext]] will be used
* for setting up the HTTPS connection pool, if the request is targetted towards an `https` endpoint.
*
* Note that the request must have an absolute URI, otherwise the future will be completed with an error.
*/
def singleRequest(request: HttpRequest,
connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
httpsContext: Option[HttpsContext] = None,
log: LoggingAdapter = system.log)(implicit fm: Materializer): Future[HttpResponse] =
try {
val gatewayFuture = cachedGateway(request, settings, httpsContext, log)
val gatewayFuture = cachedGateway(request, settings, connectionContext, log)
gatewayFuture.flatMap(_(request))(fm.executionContext)
} catch {
case e: IllegalUriException FastFuture.failed(e)
}
/**
* Constructs a [[WebsocketClientLayer]] stage using the configured default [[ClientConnectionSettings]],
* Constructs a [[WebSocketClientLayer]] stage using the configured default [[ClientConnectionSettings]],
* configured using the `akka.http.client` config section.
*
* The layer is not reusable and must only be materialized once.
*/
def websocketClientLayer(request: WebsocketRequest,
def webSocketClientLayer(request: WebSocketRequest,
settings: ClientConnectionSettings = ClientConnectionSettings(system),
log: LoggingAdapter = system.log): Http.WebsocketClientLayer =
WebsocketClientBlueprint(request, settings, log)
log: LoggingAdapter = system.log): Http.WebSocketClientLayer =
WebSocketClientBlueprint(request, settings, log)
/**
* Constructs a flow that once materialized establishes a Websocket connection to the given Uri.
* Constructs a flow that once materialized establishes a WebSocket connection to the given Uri.
*
* The layer is not reusable and must only be materialized once.
*/
def websocketClientFlow(request: WebsocketRequest,
def webSocketClientFlow(request: WebSocketRequest,
connectionContext: ConnectionContext = defaultClientHttpsContext,
localAddress: Option[InetSocketAddress] = None,
settings: ClientConnectionSettings = ClientConnectionSettings(system),
httpsContext: Option[HttpsContext] = None,
log: LoggingAdapter = system.log): Flow[Message, Message, Future[WebsocketUpgradeResponse]] = {
log: LoggingAdapter = system.log): Flow[Message, Message, Future[WebSocketUpgradeResponse]] = {
import request.uri
require(uri.isAbsolute, s"Websocket request URI must be absolute but was '$uri'")
require(uri.isAbsolute, s"WebSocket request URI must be absolute but was '$uri'")
val ctx = uri.scheme match {
case "ws" None
case "wss" effectiveHttpsContext(httpsContext)
case scheme @ _
throw new IllegalArgumentException(s"Illegal URI scheme '$scheme' in '$uri' for Websocket request. " +
s"Websocket requests must use either 'ws' or 'wss'")
case "ws" ConnectionContext.noEncryption()
case "wss" connectionContext
case scheme
throw new IllegalArgumentException(s"Illegal URI scheme '$scheme' in '$uri' for WebSocket request. " +
s"WebSocket requests must use either 'ws' or 'wss'")
}
val host = uri.authority.host.address
val port = uri.effectivePort
websocketClientLayer(request, settings, log)
webSocketClientLayer(request, settings, log)
.joinMat(_outgoingTlsConnectionLayer(host, port, localAddress, settings, ctx, log))(Keep.left)
}
/**
* Runs a single Websocket conversation given a Uri and a flow that represents the client side of the
* Websocket conversation.
* Runs a single WebSocket conversation given a Uri and a flow that represents the client side of the
* WebSocket conversation.
*/
def singleWebsocketRequest[T](request: WebsocketRequest,
def singleWebSocketRequest[T](request: WebSocketRequest,
clientFlow: Flow[Message, Message, T],
connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
localAddress: Option[InetSocketAddress] = None,
settings: ClientConnectionSettings = ClientConnectionSettings(system),
httpsContext: Option[HttpsContext] = None,
log: LoggingAdapter = system.log)(implicit mat: Materializer): (Future[WebsocketUpgradeResponse], T) =
websocketClientFlow(request, localAddress, settings, httpsContext, log)
log: LoggingAdapter = system.log)(implicit mat: Materializer): (Future[WebSocketUpgradeResponse], T) =
webSocketClientFlow(request, connectionContext, localAddress, settings, log)
.joinMat(clientFlow)(Keep.both).run()
/**
@ -493,9 +496,16 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
}
/**
* Gets the current default client-side [[HttpsContext]].
* Gets the current default server-side [[ConnectionContext]] defaults to plain HTTP.
*/
def defaultClientHttpsContext: HttpsContext =
def defaultServerHttpContext: ConnectionContext =
ConnectionContext.noEncryption()
/**
* Gets the current default client-side [[HttpsConnectionContext]].
* Defaults used here can be configured using ssl-config or the context can be replaced using [[setDefaultClientHttpsContext]]
*/
def defaultClientHttpsContext: HttpsConnectionContext =
synchronized {
_defaultClientHttpsContext match {
case null
@ -507,9 +517,9 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
}
/**
* Sets the default client-side [[HttpsContext]].
* Sets the default client-side [[HttpsConnectionContext]].
*/
def setDefaultClientHttpsContext(context: HttpsContext): Unit =
def setDefaultClientHttpsContext(context: HttpsConnectionContext): Unit =
synchronized {
_defaultClientHttpsContext = context
}
@ -518,10 +528,10 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
private[this] val hostPoolCache = new ConcurrentHashMap[HostConnectionPoolSetup, Future[PoolGateway]]
private def cachedGateway(request: HttpRequest,
settings: ConnectionPoolSettings, httpsContext: Option[HttpsContext],
settings: ConnectionPoolSettings, connectionContext: ConnectionContext,
log: LoggingAdapter)(implicit fm: Materializer): Future[PoolGateway] =
if (request.uri.scheme.nonEmpty && request.uri.authority.nonEmpty) {
val httpsCtx = if (request.uri.scheme.equalsIgnoreCase("https")) effectiveHttpsContext(httpsContext) else None
val httpsCtx = if (request.uri.scheme.equalsIgnoreCase("https")) connectionContext else ConnectionContext.noEncryption()
val setup = ConnectionPoolSetup(settings, httpsCtx, log)
val host = request.uri.authority.host.toString()
val hcps = HostConnectionPoolSetup(host, request.uri.effectivePort, setup)
@ -531,6 +541,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
throw new IllegalUriException(ErrorInfo(msg))
}
/** INTERNAL API */
private[http] def cachedGateway(setup: HostConnectionPoolSetup)(implicit fm: Materializer): Future[PoolGateway] = {
val gatewayPromise = Promise[PoolGateway]()
hostPoolCache.putIfAbsent(setup, gatewayPromise.future) match {
@ -575,15 +586,11 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
}
}
private def effectiveHttpsContext(ctx: Option[HttpsContext]): Option[HttpsContext] =
ctx orElse Some(defaultClientHttpsContext)
private[http] def sslTlsStage(httpsContext: Option[HttpsContext], role: Role, hostInfo: Option[(String, Int)] = None) =
httpsContext match {
case Some(hctx)
SslTls(hctx.sslContext, hctx.firstSession, role, hostInfo = hostInfo)
case None
SslTlsPlacebo.forScala
/** Creates real or placebo SslTls stage based on if ConnectionContext is HTTPS or not. */
private[http] def sslTlsStage(connectionContext: ConnectionContext, role: Role, hostInfo: Option[(String, Int)] = None) =
connectionContext match {
case hctx: HttpsConnectionContext SslTls(hctx.sslContext, hctx.firstSession, role, hostInfo = hostInfo)
case other SslTlsPlacebo.forScala // if it's not HTTPS, we don't enable SSL/TLS
}
}
@ -709,46 +716,18 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
new HttpExt(system.settings.config getConfig "akka.http")(system)
}
import scala.collection.JavaConverters._
//# https-context-impl
/**
* TLS configuration for an HTTPS server binding or client connection.
* For the sslContext please refer to the com.typeasfe.ssl-config library.
* The remaining four parameters configure the initial session that will
* be negotiated, see [[akka.stream.io.NegotiateNewSession]] for details.
*/
final case class HttpsContext(sslContext: SSLContext,
enabledCipherSuites: Option[immutable.Seq[String]] = None,
enabledProtocols: Option[immutable.Seq[String]] = None,
clientAuth: Option[ClientAuth] = None,
sslParameters: Option[SSLParameters] = None)
//#
extends akka.http.javadsl.HttpsContext {
def firstSession = NegotiateNewSession(enabledCipherSuites, enabledProtocols, clientAuth, sslParameters)
/** Java API */
override def getSslContext: SSLContext = sslContext
/** Java API */
override def getEnabledCipherSuites: Optional[JCollection[String]] = enabledCipherSuites.map(_.asJavaCollection).asJava
/** Java API */
override def getEnabledProtocols: Optional[JCollection[String]] = enabledProtocols.map(_.asJavaCollection).asJava
/** Java API */
override def getClientAuth: Optional[ClientAuth] = clientAuth.asJava
/** Java API */
override def getSslParameters: Optional[SSLParameters] = sslParameters.asJava
}
trait DefaultSSLContextCreation {
protected def system: ActorSystem
protected def sslConfig: AkkaSSLConfig
protected def createDefaultClientHttpsContext(): HttpsContext = {
protected def createDefaultClientHttpsContext(): HttpsConnectionContext = {
val config = sslConfig.config
val log = Logging(system, getClass)

View file

@ -2,7 +2,7 @@ package akka.http.impl.engine.client
import javax.net.ssl.SSLContext
import akka.http.scaladsl.{ HttpsContext, Http }
import akka.http.scaladsl.{ ConnectionContext, Http }
import akka.http.scaladsl.model.{ HttpHeader, HttpResponse, HttpRequest }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink, Source }
@ -28,7 +28,7 @@ class ClientCancellationSpec extends AkkaSpec("""
{ req HttpResponse() }, // TLS client does full-close, no need for the connection:close header
addressTls.getHostName,
addressTls.getPort,
httpsContext = Some(HttpsContext(SSLContext.getDefault)))(noncheckedMaterializer)
connectionContext = ConnectionContext.https(SSLContext.getDefault))(noncheckedMaterializer)
def testCase(connection: Flow[HttpRequest, HttpResponse, Any]): Unit = Utils.assertAllStagesStopped {
val requests = TestPublisher.probe[HttpRequest]()

View file

@ -1,5 +1,5 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2009-2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.impl.engine.client
@ -303,8 +303,7 @@ class ConnectionPoolSpec extends AkkaSpec("""
.transform(StreamUtils.recover { case NoErrorComplete ByteString.empty }),
Flow[ByteString].map(SessionBytes(null, _)))
val sink = if (autoAccept) Sink.foreach[Http.IncomingConnection](handleConnection) else Sink.fromSubscriber(incomingConnections)
// TODO getHostString in Java7
Tcp().bind(serverEndpoint.getHostName, serverEndpoint.getPort, idleTimeout = serverSettings.timeouts.idleTimeout)
Tcp().bind(serverEndpoint.getHostString, serverEndpoint.getPort, idleTimeout = serverSettings.timeouts.idleTimeout)
.map { c
val layer = Http().serverLayer(serverSettings, log = log)
Http.IncomingConnection(c.localAddress, c.remoteAddress, layer atop rawBytesInjection join c.flow)
@ -340,7 +339,7 @@ class ConnectionPoolSpec extends AkkaSpec("""
ccSettings: ClientConnectionSettings = ClientConnectionSettings(system)) = {
val settings = ConnectionPoolSettings(maxConnections, maxRetries, maxOpenRequests, pipeliningLimit,
idleTimeout, ClientConnectionSettings(system))
flowTestBench(Http().superPool[T](settings))
flowTestBench(Http().superPool[T](settings = settings))
}
def flowTestBench[T, Mat](poolFlow: Flow[(HttpRequest, T), (Try[HttpResponse], T), Mat]) = {

View file

@ -12,7 +12,7 @@ import akka.stream.io._
import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
import akka.http.impl.util._
import akka.http.scaladsl.{ HttpsContext, Http }
import akka.http.scaladsl.{ ConnectionContext, Http }
import akka.http.scaladsl.model.{ StatusCodes, HttpResponse, HttpRequest }
import akka.http.scaladsl.model.headers.{ Host, `Tls-Session-Info` }
import org.scalatest.time.{ Span, Seconds }
@ -88,10 +88,10 @@ class TlsEndpointVerificationSpec extends AkkaSpec("""
}
}
def pipeline(clientContext: HttpsContext, hostname: String): HttpRequest Future[HttpResponse] = req
def pipeline(clientContext: ConnectionContext, hostname: String): HttpRequest Future[HttpResponse] = req
Source.single(req).via(pipelineFlow(clientContext, hostname)).runWith(Sink.head)
def pipelineFlow(clientContext: HttpsContext, hostname: String): Flow[HttpRequest, HttpResponse, NotUsed] = {
def pipelineFlow(clientContext: ConnectionContext, hostname: String): Flow[HttpRequest, HttpResponse, NotUsed] = {
val handler: HttpRequest HttpResponse = { req
// verify Tls-Session-Info header information
val name = req.header[`Tls-Session-Info`].flatMap(_.localPrincipal).map(_.getName)
@ -99,8 +99,8 @@ class TlsEndpointVerificationSpec extends AkkaSpec("""
else HttpResponse(StatusCodes.BadRequest, entity = "Tls-Session-Info header verification failed")
}
val serverSideTls = Http().sslTlsStage(Some(ExampleHttpContexts.exampleServerContext), Server)
val clientSideTls = Http().sslTlsStage(Some(clientContext), Client, Some(hostname -> 8080))
val serverSideTls = Http().sslTlsStage(ExampleHttpContexts.exampleServerContext, Server)
val clientSideTls = Http().sslTlsStage(clientContext, Client, Some(hostname -> 8080))
val server =
Http().serverLayer()

View file

@ -9,12 +9,15 @@ import java.security.{ SecureRandom, KeyStore }
import java.security.cert.{ CertificateFactory, Certificate }
import javax.net.ssl.{ SSLParameters, SSLContext, TrustManagerFactory, KeyManagerFactory }
import akka.http.scaladsl.HttpsContext
import akka.http.scaladsl.HttpsConnectionContext
/**
* These are HTTPS example configurations that take key material from the resources/key folder.
*/
object ExampleHttpContexts {
// TODO show example how to obtain pre-configured context from ssl-config
val exampleServerContext = {
// never put passwords into code!
val password = "abcdef".toCharArray
@ -28,8 +31,9 @@ object ExampleHttpContexts {
val context = SSLContext.getInstance("TLS")
context.init(keyManagerFactory.getKeyManagers, null, new SecureRandom)
HttpsContext(context)
new HttpsConnectionContext(context)
}
val exampleClientContext = {
val certStore = KeyStore.getInstance(KeyStore.getDefaultType)
certStore.load(null, null)
@ -44,7 +48,7 @@ object ExampleHttpContexts {
val params = new SSLParameters()
params.setEndpointIdentificationAlgorithm("https")
HttpsContext(context, sslParameters = Some(params))
new HttpsConnectionContext(context, sslParameters = Some(params))
}
def resourceStream(resourceName: String): InputStream = {

View file

@ -10,7 +10,7 @@ import javax.net.ssl.{ SSLParameters, SSLContext }
import akka.http.javadsl.model.headers.Cookie
import akka.http.scaladsl.model
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.stream.io.ClientAuth
import com.typesafe.sslconfig.ssl.ClientAuth
import org.scalatest.{ FreeSpec, MustMatchers }
import scala.collection.immutable
@ -61,7 +61,7 @@ class JavaApiTestCaseSpecs extends FreeSpec with MustMatchers {
Uri.create("/order").query(JavaApiTestCases.addSessionId(orderId)) must be(Uri.create("/order?orderId=123&session=abcdefghijkl"))
}
"create HttpsContext" in {
akka.http.javadsl.HttpsContext.create(SSLContext.getDefault,
akka.http.javadsl.ConnectionContext.https(SSLContext.getDefault,
Optional.empty[java.util.Collection[String]],
Optional.empty[java.util.Collection[String]],
Optional.empty[ClientAuth],