!htc,str #19275 adjust bind/client apis on Http()

This commit is contained in:
Konrad Malawski 2016-01-21 16:14:05 +01:00
parent 1b47fbeac7
commit 1cd21d9ce2
15 changed files with 423 additions and 258 deletions

View file

@ -16,7 +16,7 @@ final case class HostConnectionPoolSetup(host: String, port: Int, setup: Connect
final case class ConnectionPoolSetup(
settings: ConnectionPoolSettings,
conContext: ConnectionContext = ConnectionContext.noEncryption(),
connectionContext: ConnectionContext = ConnectionContext.noEncryption(),
log: LoggingAdapter)
object ConnectionPoolSetup {

View file

@ -64,8 +64,8 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup,
import hcps._
import setup._
val connectionFlow = conContext match {
case httpsContext: HttpsConnectionContext Http().outgoingConnectionTls(host, port, httpsContext, None, settings.connectionSettings, setup.log)
val connectionFlow = connectionContext match {
case httpsContext: HttpsConnectionContext Http().outgoingConnectionHttps(host, port, httpsContext, None, settings.connectionSettings, setup.log)
case _ Http().outgoingConnection(host, port, None, settings.connectionSettings, setup.log)
}
@ -148,7 +148,7 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup,
}
def dispatchRequest(pr: PoolRequest): Unit = {
val scheme = Uri.httpScheme(hcps.setup.conContext.isSecure)
val scheme = Uri.httpScheme(hcps.setup.connectionContext.isSecure)
val hostHeader = headers.Host(hcps.host, Uri.normalizePort(hcps.port, scheme))
val effectiveRequest =
pr.request

View file

@ -63,10 +63,10 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser
def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
private[this] var closeMode: CloseMode = DontClose // signals what to do after the current response
private[this] def close: Boolean = closeMode != DontClose
private[this] def closeIf(cond: Boolean): Unit =
if (cond) closeMode = CloseConnection
var closeMode: CloseMode = DontClose // signals what to do after the current response
def close: Boolean = closeMode != DontClose
def closeIf(cond: Boolean): Unit = if (cond) closeMode = CloseConnection
var transferring = false
setHandler(in, new InHandler {
override def onPush(): Unit =
@ -286,4 +286,4 @@ private[http] sealed trait ResponseRenderingOutput
private[http] object ResponseRenderingOutput {
private[http] case class HttpData(bytes: ByteString) extends ResponseRenderingOutput
private[http] case class SwitchToWebSocket(httpResponseBytes: ByteString, handler: Either[Graph[FlowShape[FrameEvent, FrameEvent], Any], Graph[FlowShape[Message, Message], Any]]) extends ResponseRenderingOutput
}
}

View file

@ -0,0 +1,107 @@
/*
* Copyright (C) 2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.javadsl
import java.util.Locale
import java.util.Optional
import akka.http.javadsl.model.Uri
abstract class ConnectHttp {
def host: String
def port: Int
def isHttps: Boolean
def connectionContext: Optional[HttpsConnectionContext]
final def effectiveConnectionContext(fallbackContext: HttpsConnectionContext): HttpsConnectionContext =
connectionContext.orElse(fallbackContext)
}
object ConnectHttp {
// TODO may be optimised a bit to avoid parsing the Uri entirely for the known port cases
/** Extracts host data from given Uri. */
def toHost(uriHost: Uri): ConnectHttp = {
val s = uriHost.scheme.toLowerCase(Locale.ROOT)
if (s == "https") new ConnectHttpsImpl(uriHost.host.address, uriHost.port)
else new ConnectHttpImpl(uriHost.host.address, uriHost.port)
}
def toHost(host: String): ConnectHttp =
toHost(Uri.create(host))
def toHost(host: String, port: Int): ConnectHttp = {
require(port > 0, "port must be > 0")
toHost(Uri.create(host).port(port))
}
/**
* Extracts host data from given Uri.
* Forces an HTTPS connection to the given host, using the default HTTPS context and default port.
*/
@throws(classOf[IllegalArgumentException])
def toHostHttps(uriHost: Uri): ConnectHttp.UsingHttps = {
val s = uriHost.scheme.toLowerCase(Locale.ROOT)
require(s == "" || s == "https", "toHostHttps used with non https scheme! Was: " + uriHost)
val httpsHost = uriHost.scheme("https") // for effective port calculation
new ConnectHttpsImpl(httpsHost.host.address, effectivePort(uriHost))
}
/** Forces an HTTPS connection to the given host, using the default HTTPS context and default port. */
@throws(classOf[IllegalArgumentException])
def toHostHttps(host: String): ConnectHttp.UsingHttps =
toHostHttps(Uri.create(host))
/** Forces an HTTPS connection to the given host, using the default HTTPS context and given port. */
@throws(classOf[IllegalArgumentException])
def toHostHttps(host: String, port: Int): ConnectHttp.UsingHttps = {
require(port > 0, "port must be > 0")
toHostHttps(Uri.create(host).port(port).host.address)
}
private def effectivePort(uri: Uri): Int = {
val s = uri.scheme.toLowerCase(Locale.ROOT)
effectivePort(s, -1)
}
private def effectivePort(scheme: String, port: Int): Int = {
val s = scheme.toLowerCase(Locale.ROOT)
if (port > 0) port
else if (s == "https" || s == "wss") 443
else if (s == "http" || s == "ws") 80
else throw new IllegalArgumentException("Scheme is not http/https/ws/wss and no port given!")
}
trait UsingHttps extends ConnectHttp {
def withCustomHttpsContext(context: HttpsConnectionContext): ConnectHttp.UsingCustomHttps
}
trait UsingCustomHttps extends ConnectHttp {
def withDefaultContext: ConnectHttp.UsingHttps
}
}
/** INTERNAL API */
final class ConnectHttpImpl(val host: String, val port: Int) extends ConnectHttp {
def isHttps: Boolean = false
def connectionContext: Optional[HttpsConnectionContext] = Optional.empty()
}
final class ConnectHttpsImpl(val host: String, val port: Int, val context: Optional[HttpsConnectionContext] = Optional.empty())
extends ConnectHttp with ConnectHttp.UsingHttps with ConnectHttp.UsingCustomHttps {
override def isHttps: Boolean = true
override def withCustomHttpsContext(context: HttpsConnectionContext): ConnectHttp.UsingCustomHttps =
new ConnectHttpsImpl(host, port, Optional.of(context))
override def withDefaultContext: ConnectHttp.UsingHttps =
new ConnectHttpsImpl(host, port, Optional.empty())
override def connectionContext: Optional[HttpsConnectionContext] = context
}

View file

@ -4,11 +4,9 @@
package akka.http.javadsl
import java.util.{ Collection JCollection, Optional }
import java.{ util ju }
import javax.net.ssl.{ SSLContext, SSLParameters }
import akka.http.scaladsl
import com.typesafe.sslconfig.ssl.ClientAuth
import akka.stream.io.ClientAuth
import scala.compat.java8.OptionConverters
@ -27,18 +25,18 @@ object ConnectionContext {
scaladsl.ConnectionContext.noEncryption()
}
trait ConnectionContext {
abstract class ConnectionContext {
def isSecure: Boolean
/** Java API */
def getDefaultPort: Int
}
trait HttpConnectionContext extends ConnectionContext {
trait HttpConnectionContext extends akka.http.javadsl.ConnectionContext {
override final def isSecure = false
override final def getDefaultPort = 80
}
trait HttpsConnectionContext extends ConnectionContext {
trait HttpsConnectionContext extends akka.http.javadsl.ConnectionContext {
override final def isSecure = true
override final def getDefaultPort = 443

View file

@ -7,8 +7,9 @@ package akka.http.javadsl
import java.net.InetSocketAddress
import java.util.Optional
import akka.http.impl.util.JavaMapping
import akka.http.impl.util.JavaMapping.HttpsConnectionContext
import akka.http.javadsl.model.ws._
import akka.{ NotUsed, stream }
import akka.{ stream, NotUsed }
import akka.stream.io.{ SslTlsInbound, SslTlsOutbound }
import scala.language.implicitConversions
@ -103,10 +104,10 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* [[ServerBinding]].
*/
def bind(interface: String, port: Int,
settings: ServerSettings,
connectionContext: ConnectionContext,
settings: ServerSettings,
materializer: Materializer): Source[IncomingConnection, Future[ServerBinding]] =
new Source(delegate.bind(interface, port, settings, connectionContext = ConnectionContext.noEncryption().asScala)(materializer)
new Source(delegate.bind(interface, port, settings = settings, connectionContext = ConnectionContext.noEncryption().asScala)(materializer)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec)))
@ -123,11 +124,30 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* [[ServerBinding]].
*/
def bind(interface: String, port: Int,
settings: ServerSettings,
connectionContext: ConnectionContext,
materializer: Materializer): Source[IncomingConnection, Future[ServerBinding]] =
new Source(delegate.bind(interface, port, connectionContext = connectionContext.asScala)(materializer)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec)))
/**
* Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding
* on the given `endpoint`.
*
* If the given port is 0 the resulting source can be materialized several times. Each materialization will
* then be assigned a new local port by the operating system, which can then be retrieved by the materialized
* [[ServerBinding]].
*
* If the given port is non-zero subsequent materialization attempts of the produced source will immediately
* fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
* [[ServerBinding]].
*/
def bind(interface: String, port: Int,
connectionContext: ConnectionContext,
settings: ServerSettings,
log: LoggingAdapter,
materializer: Materializer): Source[IncomingConnection, Future[ServerBinding]] =
new Source(delegate.bind(interface, port, settings, connectionContext = ConnectionContext.noEncryption().asScala, log)(materializer)
new Source(delegate.bind(interface, port, ConnectionContext.noEncryption().asScala, settings, log)(materializer)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec)))
@ -288,61 +308,22 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
/**
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
*
* If the hostname is given with an `https://` prefix, the default [[HttpsConnectionContext]] will be used.
*/
def outgoingConnection(host: String): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
outgoingConnection(host, 80)
outgoingConnection(ConnectHttp.toHost(host))
/**
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
*
* The [[defaultClientHttpsContext]] is used to configure TLS for the connection.
* Use the [[ConnectHttp]] DSL to configure target host and whether HTTPS should be used.
*/
def outgoingConnectionTls(host: String): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
outgoingConnectionTls(host, defaultClientHttpsContext)
/**
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
*/
def outgoingConnectionTls(host: String, connectionContext: HttpsConnectionContext): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
outgoingConnectionTls(host, 443, connectionContext)
/**
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
*/
def outgoingConnection(host: String, port: Int): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
Flow.fromGraph {
akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala)
.viaMat(delegate.outgoingConnection(host, port))(Keep.right)
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec))
}
/**
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
*
* The [[defaultClientHttpsContext]] is used to configure TLS for the connection.
*/
def outgoingConnectionTls(host: String, port: Int): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
Flow.fromGraph {
akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala)
.viaMat(delegate.outgoingConnection(host, port))(Keep.right)
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec))
}
/**
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
*
* The given [[HttpsConnectionContext]] will be used for encryption on the connection.
*/
def outgoingConnectionTls(host: String, port: Int, connectionContext: HttpsConnectionContext): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
Flow.fromGraph {
akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala)
.viaMat(delegate.outgoingConnectionTls(host, port, connectionContext.asScala))(Keep.right)
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec))
def outgoingConnection(to: ConnectHttp): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
adaptOutgoingFlow {
if (to.isHttps) delegate.outgoingConnectionHttps(to.host, to.port, to.effectiveConnectionContext(defaultClientHttpsContext).asScala)
else delegate.outgoingConnection(to.host, to.port)
}
/**
@ -350,46 +331,15 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
*/
def outgoingConnection(host: String, port: Int,
connectionContext: ConnectionContext,
localAddress: Optional[InetSocketAddress],
settings: ClientConnectionSettings,
log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
Flow.fromGraph {
akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala)
.viaMat(delegate.outgoingConnection(host, port, localAddress.asScala, settings, log))(Keep.right)
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec))
}
/**
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
*
* The [[defaultClientHttpsContext]] is used to configure TLS for the connection.
*/
def outgoingConnectionTls(host: String, port: Int,
localAddress: Optional[InetSocketAddress],
settings: ClientConnectionSettings,
log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
Flow.fromGraph {
akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala)
.viaMat(delegate.outgoingConnectionTls(host, port, defaultClientHttpsContext.asScala, localAddress.asScala, settings, log))(Keep.right)
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec))
}
/**
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
*
* The given [[HttpsConnectionContext]] will be used for encryption on the connection.
*/
def outgoingConnectionTls(host: String, port: Int,
connectionContext: HttpsConnectionContext,
localAddress: Optional[InetSocketAddress],
settings: ClientConnectionSettings,
log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
Flow.fromGraph {
akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala)
.viaMat(delegate.outgoingConnectionTls(host, port, connectionContext.asScala, localAddress.asScala, settings, log))(Keep.right)
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec))
adaptOutgoingFlow {
connectionContext match {
case https: HttpsConnectionContext delegate.outgoingConnectionHttps(host, port, https.asScala, localAddress.asScala, settings, log)
case _ delegate.outgoingConnection(host, port, localAddress.asScala, settings, log)
}
}
/**
@ -406,28 +356,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def newHostConnectionPool[T](host: String, port: Int,
materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPool[T](host, port)(materializer))
/**
* Same as [[newHostConnectionPool]] but with HTTPS encryption.
*
* The [[defaultClientHttpsContext]] is used to configure TLS for the connection.
*/
def newHostConnectionPoolTls[T](host: String, port: Int,
materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port)(materializer))
/**
* Same as [[newHostConnectionPool]] but with HTTPS encryption.
*
* The given [[HttpsConnectionContext]] will be used for encryption on the connection.
*/
def newHostConnectionPoolTls[T](host: String, port: Int,
connectionContext: HttpsConnectionContext,
materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, connectionContext.asScala)(materializer))
def newHostConnectionPool[T](host: String, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
newHostConnectionPool[T](ConnectHttp.toHost(host), materializer)
/**
* Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches
@ -443,31 +373,23 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def newHostConnectionPool[T](host: String, port: Int,
def newHostConnectionPool[T](to: ConnectHttp, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPool[T](to.host, to.port)(materializer))
/**
* Same as [[newHostConnectionPool]] but with HTTPS encryption.
*
* The given [[ConnectionContext]] will be used for encryption on the connection.
*/
def newHostConnectionPool[T](to: ConnectHttp,
settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPool[T](host, port, settings, log)(materializer))
/**
* Same as [[newHostConnectionPool]] but with HTTPS encryption.
*
* The [[defaultClientHttpsContext]] is used to configure TLS for the connection.
*/
def newHostConnectionPoolTls[T](host: String, port: Int,
settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, defaultClientHttpsContext.asScala, settings, log)(materializer))
/**
* Same as [[newHostConnectionPool]] but with HTTPS encryption.
*
* The given [[HttpsConnectionContext]] will be used for encryption on the connection.
*/
def newHostConnectionPoolTls[T](host: String, port: Int,
connectionContext: HttpsConnectionContext,
settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, connectionContext.asScala, settings, log)(materializer))
adaptTupleFlow {
to.effectiveConnectionContext(defaultClientHttpsContext) match {
case https: HttpsConnectionContext delegate.newHostConnectionPoolHttps[T](to.host, to.port, https.asScala, settings, log)(materializer)
case _ delegate.newHostConnectionPool[T](to.host, to.port, settings, log)(materializer)
}
}
/**
* Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing
@ -486,16 +408,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def cachedHostConnectionPool[T](host: String, port: Int, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port)(materializer))
/**
* Same as [[cachedHostConnectionPool]] but with HTTPS encryption.
*
* The [[defaultClientHttpsContext]] is used to configure TLS for the connection.
*/
def cachedHostConnectionPoolTls[T](host: String, port: Int, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port, defaultClientHttpsContext.asScala)(materializer))
def cachedHostConnectionPool[T](host: String, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
cachedHostConnectionPool(ConnectHttp.toHost(host), materializer)
/**
* Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing
@ -514,21 +428,18 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def cachedHostConnectionPool[T](host: String, port: Int,
settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port, settings, log)(materializer))
def cachedHostConnectionPool[T](to: ConnectHttp, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPool[T](to.host, to.port)(materializer))
/**
* Same as [[cachedHostConnectionPool]] but with HTTPS encryption.
*
* The given [[HttpsConnectionContext]] will be used for encryption on the connection.
* The given [[ConnectionContext]] will be used for encryption on the connection.
*/
def cachedHostConnectionPoolTls[T](host: String, port: Int,
settings: ConnectionPoolSettings,
connectionContext: HttpsConnectionContext,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port, connectionContext.asScala, settings, log)(materializer))
def cachedHostConnectionPool[T](to: ConnectHttp,
settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPoolHttps[T](to.host, to.port, to.effectiveConnectionContext(defaultClientHttpsContext).asScala, settings, log)(materializer))
/**
* Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool
@ -596,6 +507,18 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
def singleRequest(request: HttpRequest, materializer: Materializer): Future[HttpResponse] =
delegate.singleRequest(request.asScala)(materializer)
/**
* Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's
* effective URI to produce a response future.
*
* The [[defaultClientHttpsContext]] is used to configure TLS for the connection.
*
* Note that the request must have either an absolute URI or a valid `Host` header, otherwise
* the future will be completed with an error.
*/
def singleRequest(request: HttpRequest, connectionContext: HttpsConnectionContext, materializer: Materializer): Future[HttpResponse] =
delegate.singleRequest(request.asScala, connectionContext.asScala)(materializer)
/**
* Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's
* effective URI to produce a response future.
@ -656,7 +579,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* The layer is not reusable and must only be materialized once.
*/
def webSocketClientFlow(request: WebSocketRequest,
connectionContext: HttpsConnectionContext,
connectionContext: ConnectionContext,
localAddress: Optional[InetSocketAddress],
settings: ClientConnectionSettings,
log: LoggingAdapter): Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
@ -687,7 +610,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
*/
def singleWebSocketRequest[T](request: WebSocketRequest,
clientFlow: Flow[Message, Message, T],
connectionContext: HttpsConnectionContext,
connectionContext: ConnectionContext,
materializer: Materializer): Pair[Future[WebSocketUpgradeResponse], T] =
adaptWsResultTuple {
delegate.singleWebSocketRequest(
@ -702,7 +625,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
*/
def singleWebSocketRequest[T](request: WebSocketRequest,
clientFlow: Flow[Message, Message, T],
connectionContext: HttpsConnectionContext,
connectionContext: ConnectionContext,
localAddress: Optional[InetSocketAddress],
settings: ClientConnectionSettings,
log: LoggingAdapter,
@ -728,12 +651,19 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
def shutdownAllConnectionPools(): Future[Unit] = delegate.shutdownAllConnectionPools()
/**
* Gets the current default client-side [[HttpsConnectionContext]].
* Gets the default
* @return
*/
def defaultClientHttpsContext: HttpsConnectionContext = delegate.defaultClientHttpsContext
def defaultServerHttpContext: ConnectionContext =
delegate.defaultServerHttpContext
/**
* Sets the default client-side [[HttpsConnectionContext]].
* Gets the current default client-side [[ConnectionContext]].
*/
def defaultClientHttpsContext: akka.http.javadsl.HttpsConnectionContext = delegate.defaultClientHttpsContext
/**
* Sets the default client-side [[ConnectionContext]].
*/
def setDefaultClientHttpsContext(context: HttpsConnectionContext): Unit =
delegate.setDefaultClientHttpsContext(context.asInstanceOf[akka.http.scaladsl.HttpsConnectionContext])
@ -743,6 +673,13 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
JavaMapping.toJava(scalaFlow)(JavaMapping.flowMapping[Pair[HttpRequest, T], (scaladsl.model.HttpRequest, T), Pair[Try[HttpResponse], T], (Try[scaladsl.model.HttpResponse], T), Mat])
}
private def adaptOutgoingFlow[T, Mat](scalaFlow: stream.scaladsl.Flow[scaladsl.model.HttpRequest, scaladsl.model.HttpResponse, Future[scaladsl.Http.OutgoingConnection]]): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
Flow.fromGraph {
akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala)
.viaMat(scalaFlow)(Keep.right)
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec))
}
private def adaptServerLayer(serverLayer: scaladsl.Http.ServerLayer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
new BidiFlow(
JavaMapping.adapterBidiFlow[HttpResponse, sm.HttpResponse, sm.HttpRequest, HttpRequest]

View file

@ -4,14 +4,12 @@
package akka.http.scaladsl
import akka.stream.io.NegotiateNewSession
import akka.stream.io.{ ClientAuth, NegotiateNewSession }
import scala.collection.JavaConverters._
import java.util.{ Optional, Collection JCollection }
import javax.net.ssl._
import com.typesafe.sslconfig.ssl.ClientAuth
import scala.collection.immutable
import scala.compat.java8.OptionConverters._

View file

@ -27,7 +27,7 @@ import akka.stream.scaladsl._
import com.typesafe.config.Config
import com.typesafe.sslconfig.akka._
import com.typesafe.sslconfig.akka.util.AkkaLoggerFactory
import com.typesafe.sslconfig.ssl._
import com.typesafe.sslconfig.ssl.ConfigSSLContextBuilder
import scala.concurrent.{ ExecutionContext, Future, Promise, TimeoutException }
import scala.util.Try
@ -42,7 +42,8 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
// configured default HttpsContext for the client-side
// SYNCHRONIZED ACCESS ONLY!
private[this] var _defaultClientHttpsContext: HttpsConnectionContext = _
private[this] var _defaultClientHttpsConnectionContext: HttpsConnectionContext = _
private[this] var _defaultServerConnectionContext: ConnectionContext = _
// ** SERVER ** //
@ -70,8 +71,8 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
* use the `akka.http.server` config section or pass in a [[ServerSettings]] explicitly.
*/
def bind(interface: String, port: Int = DefaultPortForProtocol,
settings: ServerSettings = ServerSettings(system),
connectionContext: ConnectionContext = defaultServerHttpContext,
settings: ServerSettings = ServerSettings(system),
log: LoggingAdapter = system.log)(implicit fm: Materializer): Source[IncomingConnection, Future[ServerBinding]] = {
val effectivePort = if (port >= 0) port else connectionContext.defaultPort
val tlsStage = sslTlsStage(connectionContext, Server)
@ -114,7 +115,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
throw e
}
bind(interface, port, settings, connectionContext, log)
bind(interface, port, connectionContext, settings, log)
.mapAsyncUnordered(settings.maxConnections) { connection
handleOneConnection(connection).recoverWith {
// Ignore incoming errors from the connection as they will cancel the binding.
@ -207,11 +208,11 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
* To configure additional settings for requests made using this method,
* use the `akka.http.client` config section or pass in a [[ClientConnectionSettings]] explicitly.
*/
def outgoingConnectionTls(host: String, port: Int = 443,
connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
localAddress: Option[InetSocketAddress] = None,
settings: ClientConnectionSettings = ClientConnectionSettings(system),
log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
def outgoingConnectionHttps(host: String, port: Int = 443,
connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
localAddress: Option[InetSocketAddress] = None,
settings: ClientConnectionSettings = ClientConnectionSettings(system),
log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
_outgoingConnection(host, port, localAddress, settings, connectionContext, log)
private def _outgoingConnection(host: String, port: Int, localAddress: Option[InetSocketAddress],
@ -287,10 +288,10 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
* To configure additional settings for the pool (and requests made using it),
* use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly.
*/
def newHostConnectionPoolTls[T](host: String, port: Int = 443,
connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
def newHostConnectionPoolHttps[T](host: String, port: Int = 443,
connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
val cps = ConnectionPoolSetup(settings, connectionContext, log)
newHostConnectionPool(HostConnectionPoolSetup(host, port, cps))
}
@ -352,10 +353,10 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
* To configure additional settings for the pool (and requests made using it),
* use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly.
*/
def cachedHostConnectionPoolTls[T](host: String, port: Int = 443,
connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
def cachedHostConnectionPoolHttps[T](host: String, port: Int = 443,
connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
val cps = ConnectionPoolSetup(settings, connectionContext, log)
val setup = HostConnectionPoolSetup(host, port, cps)
cachedHostConnectionPool(setup)
@ -378,7 +379,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def cachedHostConnectionPool[T](setup: HostConnectionPoolSetup)(
private def cachedHostConnectionPool[T](setup: HostConnectionPoolSetup)(
implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] =
gatewayClientFlow(setup, cachedGateway(setup))
@ -449,8 +450,9 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
require(uri.isAbsolute, s"WebSocket request URI must be absolute but was '$uri'")
val ctx = uri.scheme match {
case "ws" ConnectionContext.noEncryption()
case "wss" connectionContext
case "ws" ConnectionContext.noEncryption()
case "wss" if connectionContext.isSecure connectionContext
case "wss" throw new IllegalArgumentException("Provided connectionContext is not secure, yet request to secure `wss` endpoint detected!")
case scheme
throw new IllegalArgumentException(s"Illegal URI scheme '$scheme' in '$uri' for WebSocket request. " +
s"WebSocket requests must use either 'ws' or 'wss'")
@ -468,7 +470,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
*/
def singleWebSocketRequest[T](request: WebSocketRequest,
clientFlow: Flow[Message, Message, T],
connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
connectionContext: ConnectionContext = defaultClientHttpsContext,
localAddress: Option[InetSocketAddress] = None,
settings: ClientConnectionSettings = ClientConnectionSettings(system),
log: LoggingAdapter = system.log)(implicit mat: Materializer): (Future[WebSocketUpgradeResponse], T) =
@ -496,7 +498,20 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
* Gets the current default server-side [[ConnectionContext]] defaults to plain HTTP.
*/
def defaultServerHttpContext: ConnectionContext =
ConnectionContext.noEncryption()
synchronized {
if (_defaultServerConnectionContext == null)
_defaultServerConnectionContext = ConnectionContext.noEncryption()
_defaultServerConnectionContext
}
/**
* Sets the default server-side [[ConnectionContext]].
* If it is an instance of [[HttpsConnectionContext]] then the server will be bound using HTTPS.
*/
def setDefaultClientHttpsContext(context: ConnectionContext): Unit =
synchronized {
_defaultServerConnectionContext = context
}
/**
* Gets the current default client-side [[HttpsConnectionContext]].
@ -504,10 +519,10 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
*/
def defaultClientHttpsContext: HttpsConnectionContext =
synchronized {
_defaultClientHttpsContext match {
_defaultClientHttpsConnectionContext match {
case null
val ctx = createDefaultClientHttpsContext()
_defaultClientHttpsContext = ctx
_defaultClientHttpsConnectionContext = ctx
ctx
case ctx ctx
}
@ -518,7 +533,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
*/
def setDefaultClientHttpsContext(context: HttpsConnectionContext): Unit =
synchronized {
_defaultClientHttpsContext = context
_defaultClientHttpsConnectionContext = context
}
// every ActorSystem maintains its own connection pools
@ -754,15 +769,17 @@ trait DefaultSSLContextCreation {
defaultParams.setCipherSuites(cipherSuites)
// auth!
import com.typesafe.sslconfig.ssl.{ ClientAuth SslClientAuth }
val clientAuth = config.sslParametersConfig.clientAuth match {
case ClientAuth.Default None
case auth Some(auth)
case SslClientAuth.Default None
case SslClientAuth.Want Some(ClientAuth.Want)
case SslClientAuth.Need Some(ClientAuth.Need)
case SslClientAuth.None Some(ClientAuth.None)
}
// hostname!
defaultParams.setEndpointIdentificationAlgorithm("https")
// new HttpsConnectionContext(sslContext, Some(cipherSuites.toList), Some(defaultProtocols.toList), clientAuth, Some(defaultParams))
new HttpsConnectionContext(sslContext, None, None, None, Some(defaultParams)) // previously
new HttpsConnectionContext(sslContext, Some(cipherSuites.toList), Some(defaultProtocols.toList), clientAuth, Some(defaultParams))
}
}

View file

@ -43,11 +43,13 @@ sealed abstract case class Uri(scheme: String, authority: Authority, path: Path,
def queryString(charset: Charset = UTF8): Option[String] = rawQueryString.map(s decode(s, charset))
/**
* INTERNAL API
*
* The effective port of this Uri given the currently set authority and scheme values.
* If the authority has an explicitly set port (i.e. a non-zero port value) then this port
* is the effective port. Otherwise the default port for the current scheme is returned.
*/
def effectivePort: Int = if (authority.port != 0) authority.port else defaultPorts(scheme)
private[akka] def effectivePort: Int = if (authority.port != 0) authority.port else defaultPorts(scheme)
/**
* Returns a copy of this Uri with the given components.
@ -592,7 +594,7 @@ object Uri {
}
}
val defaultPorts: Map[String, Int] =
private val defaultPorts: Map[String, Int] =
Map("ftp" -> 21, "ssh" -> 22, "telnet" -> 23, "smtp" -> 25, "domain" -> 53, "tftp" -> 69, "http" -> 80, "ws" -> 80,
"pop3" -> 110, "nntp" -> 119, "imap" -> 143, "snmp" -> 161, "ldap" -> 389, "https" -> 443, "wss" -> 443, "imaps" -> 993,
"nfs" -> 2049).withDefaultValue(-1)

View file

@ -57,7 +57,7 @@ class ClientCancellationSpec extends AkkaSpec("""
"support cancellation in simple outgoing connection with TLS" in {
pending
testCase(
Http().outgoingConnectionTls(addressTls.getHostName, addressTls.getPort))
Http().outgoingConnectionHttps(addressTls.getHostName, addressTls.getPort))
}
"support cancellation in pooled outgoing connection with TLS" in {
@ -65,7 +65,7 @@ class ClientCancellationSpec extends AkkaSpec("""
testCase(
Flow[HttpRequest]
.map((_, ()))
.via(Http().cachedHostConnectionPoolTls(addressTls.getHostName, addressTls.getPort)(noncheckedMaterializer))
.via(Http().cachedHostConnectionPoolHttps(addressTls.getHostName, addressTls.getPort)(noncheckedMaterializer))
.map(_._1.get))
}

View file

@ -33,7 +33,7 @@ object TestClient extends App {
def fetchServerVersion1(): Unit = {
println(s"Fetching HTTPS server version of host `$host` via a direct low-level connection ...")
val connection = Http().outgoingConnectionTls(host)
val connection = Http().outgoingConnectionHttps(host)
val result = Source.single(HttpRequest()).via(connection).runWith(Sink.head)
result.map(_.header[headers.Server]) onComplete {
case Success(res)