!htc move backlog and socket options to configuration in an effort to decrease API surface

This commit is contained in:
Johannes Rudolph 2015-05-22 11:31:45 +02:00
parent 454a393af1
commit 72e2e51b83
9 changed files with 127 additions and 74 deletions

View file

@ -51,6 +51,13 @@ akka.http {
# doesn't have to be fiddled with in most applications.
response-header-size-hint = 512
# The requested maximum length of the queue of incoming connections.
# If the server is busy and the backlog is full the OS will start dropping
# SYN-packets and connection attempts may fail. Note, that the backlog
# size is usually only a maximum size hint for the OS and the OS can
# restrict the number further based on global limits.
backlog = 100
# If this setting is empty the server only accepts requests that carry a
# non-empty `Host` header. Otherwise it responds with `400 Bad Request`.
# Set to a non-empty value to be used in lieu of a missing or empty `Host`
@ -61,6 +68,18 @@ akka.http {
# Examples: `www.spray.io` or `example.com:8080`
default-host-header = ""
# Socket options to set for the listening socket. If a setting is left
# undefined, it will use whatever the default on the system is.
socket-options {
so-receive-buffer-size = undefined
so-send-buffer-size = undefined
so-reuse-address = undefined
so-traffic-class = undefined
tcp-keep-alive = undefined
tcp-oob-inline = undefined
tcp-no-delay = undefined
}
# Modify to tweak parsing settings on the server-side only.
parsing = ${akka.http.parsing}
}
@ -103,6 +122,18 @@ akka.http {
https = default
}
# Socket options to set for the listening socket. If a setting is left
# undefined, it will use whatever the default on the system is.
socket-options {
so-receive-buffer-size = undefined
so-send-buffer-size = undefined
so-reuse-address = undefined
so-traffic-class = undefined
tcp-keep-alive = undefined
tcp-oob-inline = undefined
tcp-no-delay = undefined
}
# Modify to tweak parsing settings on the client-side only.
parsing = ${akka.http.parsing}
}

View file

@ -4,18 +4,24 @@
package akka.http
import akka.actor.ActorSystem
import akka.http.impl.util._
import akka.http.scaladsl.model.headers.`User-Agent`
import com.typesafe.config.Config
import akka.io.Inet.SocketOption
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.collection.immutable
import com.typesafe.config.Config
import akka.actor.ActorSystem
import akka.http.impl.util._
import akka.http.scaladsl.model.headers.`User-Agent`
final case class ClientConnectionSettings(
userAgentHeader: Option[`User-Agent`],
connectingTimeout: FiniteDuration,
idleTimeout: Duration,
requestHeaderSizeHint: Int,
socketOptions: immutable.Traversable[SocketOption],
parserSettings: ParserSettings) {
require(connectingTimeout >= Duration.Zero, "connectingTimeout must be >= 0")
@ -29,6 +35,7 @@ object ClientConnectionSettings extends SettingsCompanion[ClientConnectionSettin
c getFiniteDuration "connecting-timeout",
c getPotentiallyInfiniteDuration "idle-timeout",
c getIntBytes "request-header-size-hint",
SocketOptionSettings fromSubConfig c.getConfig("socket-options"),
ParserSettings fromSubConfig c.getConfig("parsing"))
}

View file

@ -18,18 +18,16 @@ import akka.io.Inet
final case class HostConnectionPoolSetup(host: String, port: Int, setup: ConnectionPoolSetup)
final case class ConnectionPoolSetup(
options: immutable.Traversable[Inet.SocketOption],
settings: ConnectionPoolSettings,
httpsContext: Option[HttpsContext],
log: LoggingAdapter)
object ConnectionPoolSetup {
/** Java API */
def create(options: JIterable[Inet.SocketOption],
settings: ConnectionPoolSettings,
def create(settings: ConnectionPoolSettings,
httpsContext: akka.japi.Option[akka.http.javadsl.HttpsContext],
log: LoggingAdapter): ConnectionPoolSetup =
ConnectionPoolSetup(immutableSeq(options), settings, httpsContext.map(_.asInstanceOf[HttpsContext]), log)
ConnectionPoolSetup(settings, httpsContext.map(_.asInstanceOf[HttpsContext]), log)
}
final case class ConnectionPoolSettings(

View file

@ -4,15 +4,20 @@
package akka.http
import akka.ConfigurationException
import akka.actor.{ ActorSystem, ActorRefFactory }
import akka.http.impl.util._
import akka.http.scaladsl.model.HttpHeader
import akka.http.scaladsl.model.headers.{ Host, Server }
import com.typesafe.config.Config
import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.collection.immutable
import scala.concurrent.duration._
import akka.ConfigurationException
import akka.actor.{ ActorSystem, ActorRefFactory }
import akka.io.Inet.SocketOption
import akka.http.impl.util._
import akka.http.scaladsl.model.HttpHeader
import akka.http.scaladsl.model.headers.{ Host, Server }
final case class ServerSettings(
serverHeader: Option[Server],
@ -22,10 +27,13 @@ final case class ServerSettings(
transparentHeadRequests: Boolean,
verboseErrorMessages: Boolean,
responseHeaderSizeHint: Int,
backlog: Int,
socketOptions: immutable.Traversable[SocketOption],
defaultHostHeader: Host,
parserSettings: ParserSettings) {
require(0 <= responseHeaderSizeHint, "response-size-hint must be > 0")
require(0 <= backlog, "backlog must be > 0")
}
object ServerSettings extends SettingsCompanion[ServerSettings]("akka.http.server") {
@ -45,6 +53,8 @@ object ServerSettings extends SettingsCompanion[ServerSettings]("akka.http.serve
c getBoolean "transparent-head-requests",
c getBoolean "verbose-error-messages",
c getIntBytes "response-header-size-hint",
c getInt "backlog",
SocketOptionSettings fromSubConfig c.getConfig("socket-options"),
defaultHostHeader =
HttpHeader.parse("Host", c getString "default-host-header") match {
case HttpHeader.ParsingResult.Ok(x: Host, Nil) x

View file

@ -59,8 +59,8 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup,
import hcps._
import setup._
val connectionFlow =
if (httpsContext.isEmpty) Http().outgoingConnection(host, port, None, options, settings.connectionSettings, setup.log)
else Http().outgoingConnectionTls(host, port, None, options, settings.connectionSettings, httpsContext, setup.log)
if (httpsContext.isEmpty) Http().outgoingConnection(host, port, None, settings.connectionSettings, setup.log)
else Http().outgoingConnectionTls(host, port, None, settings.connectionSettings, httpsContext, setup.log)
val poolFlow = PoolFlow(connectionFlow, new InetSocketAddress(host, port), settings, setup.log)
Source(ActorPublisher(self)).via(poolFlow).runWith(Sink(ActorSubscriber[ResponseContext](self)))
}

View file

@ -0,0 +1,30 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.impl.util
import akka.io.{ Tcp, Inet }
import scala.collection.immutable
import akka.io.Inet.SocketOption
import com.typesafe.config.Config
private[http] object SocketOptionSettings {
def fromSubConfig(config: Config): immutable.Traversable[SocketOption] = {
def so[T](setting: String)(f: (Config, String) T)(cons: T SocketOption): List[SocketOption] =
config.getString(setting) match {
case "undefined" Nil
case x cons(f(config, setting)) :: Nil
}
so("so-receive-buffer-size")(_ getIntBytes _)(Inet.SO.ReceiveBufferSize) :::
so("so-send-buffer-size")(_ getIntBytes _)(Inet.SO.SendBufferSize) :::
so("so-reuse-address")(_ getBoolean _)(Inet.SO.ReuseAddress) :::
so("so-traffic-class")(_ getInt _)(Inet.SO.TrafficClass) :::
so("tcp-keep-alive")(_ getBoolean _)(Tcp.SO.KeepAlive) :::
so("tcp-oob-inline")(_ getBoolean _)(Tcp.SO.OOBInline) :::
so("tcp-no-delay")(_ getBoolean _)(Tcp.SO.TcpNoDelay)
}
}

View file

@ -62,12 +62,11 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* [[ServerBinding]].
*/
def bind(interface: String, port: Int,
backlog: Int, options: JIterable[Inet.SocketOption],
settings: ServerSettings,
httpsContext: Option[HttpsContext],
log: LoggingAdapter,
materializer: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] =
Source.adapt(delegate.bind(interface, port, backlog, immutableSeq(options), settings, httpsContext, log)(materializer)
Source.adapt(delegate.bind(interface, port, settings, httpsContext, log)(materializer)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec)))
@ -96,13 +95,12 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
*/
def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _],
interface: String, port: Int,
backlog: Int, options: JIterable[Inet.SocketOption],
settings: ServerSettings,
httpsContext: Option[HttpsContext],
log: LoggingAdapter,
materializer: FlowMaterializer): Future[ServerBinding] =
delegate.bindAndHandle(handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala,
interface, port, backlog, immutableSeq(options), settings, httpsContext, log)(materializer)
interface, port, settings, httpsContext, log)(materializer)
.map(new ServerBinding(_))(ec)
/**
@ -129,13 +127,12 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
*/
def bindAndHandleSync(handler: Function[HttpRequest, HttpResponse],
interface: String, port: Int,
backlog: Int, options: JIterable[Inet.SocketOption],
settings: ServerSettings,
httpsContext: Option[HttpsContext],
log: LoggingAdapter,
materializer: FlowMaterializer): Future[ServerBinding] =
delegate.bindAndHandleSync(handler.apply(_).asScala,
interface, port, backlog, immutableSeq(options), settings, httpsContext, log)(materializer)
interface, port, settings, httpsContext, log)(materializer)
.map(new ServerBinding(_))(ec)
/**
@ -162,12 +159,11 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
*/
def bindAndHandleAsync(handler: Function[HttpRequest, Future[HttpResponse]],
interface: String, port: Int,
backlog: Int, options: JIterable[Inet.SocketOption],
settings: ServerSettings, httpsContext: Option[HttpsContext],
parallelism: Int, log: LoggingAdapter,
materializer: FlowMaterializer): Future[ServerBinding] =
delegate.bindAndHandleAsync(handler.apply(_).asInstanceOf[Future[sm.HttpResponse]],
interface, port, backlog, immutableSeq(options), settings, httpsContext, parallelism, log)(materializer)
interface, port, settings, httpsContext, parallelism, log)(materializer)
.map(new ServerBinding(_))(ec)
/**
@ -197,12 +193,11 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
*/
def outgoingConnection(host: String, port: Int,
localAddress: Option[InetSocketAddress],
options: JIterable[Inet.SocketOption],
settings: ClientConnectionSettings,
log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
Flow.wrap {
akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala)
.viaMat(delegate.outgoingConnection(host, port, localAddress.asScala, immutableSeq(options), settings, log))(Keep.right)
.viaMat(delegate.outgoingConnection(host, port, localAddress.asScala, settings, log))(Keep.right)
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec))
}
@ -214,13 +209,12 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
*/
def outgoingConnectionTls(host: String, port: Int,
localAddress: Option[InetSocketAddress],
options: JIterable[Inet.SocketOption],
settings: ClientConnectionSettings,
httpsContext: Option[HttpsContext],
log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
Flow.wrap {
akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala)
.viaMat(delegate.outgoingConnectionTls(host, port, localAddress.asScala, immutableSeq(options), settings,
.viaMat(delegate.outgoingConnectionTls(host, port, localAddress.asScala, settings,
httpsContext.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log))(Keep.right)
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec))
}
@ -266,7 +260,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
options: JIterable[Inet.SocketOption],
settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPool[T](host, port, immutableSeq(options), settings, log)(materializer))
adaptTupleFlow(delegate.newHostConnectionPool[T](host, port, settings, log)(materializer))
/**
* Same as [[newHostConnectionPool]] but with HTTPS encryption.
@ -279,7 +273,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
settings: ConnectionPoolSettings,
httpsContext: Option[HttpsContext],
log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, immutableSeq(options), settings,
adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, settings,
httpsContext.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log)(materializer))
/**
@ -343,10 +337,9 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* object of type ``T`` from the application which is emitted together with the corresponding response.
*/
def cachedHostConnectionPool[T](host: String, port: Int,
options: JIterable[Inet.SocketOption],
settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port, immutableSeq(options), settings, log)(materializer))
adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port, settings, log)(materializer))
/**
* Same as [[cachedHostConnectionPool]] but with HTTPS encryption.
@ -355,11 +348,10 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* for encryption on the connection.
*/
def cachedHostConnectionPoolTls[T](host: String, port: Int,
options: JIterable[Inet.SocketOption],
settings: ConnectionPoolSettings,
httpsContext: Option[HttpsContext],
log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port, immutableSeq(options), settings,
adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port, settings,
httpsContext.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log)(materializer))
/**
@ -412,11 +404,10 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type ``T`` from the application which is emitted together with the corresponding response.
*/
def superPool[T](options: JIterable[Inet.SocketOption],
settings: ConnectionPoolSettings,
def superPool[T](settings: ConnectionPoolSettings,
httpsContext: Option[HttpsContext],
log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] =
adaptTupleFlow(delegate.superPool[T](immutableSeq(options), settings, httpsContext, log)(materializer))
adaptTupleFlow(delegate.superPool[T](settings, httpsContext, log)(materializer))
/**
* Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's
@ -439,11 +430,10 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* the future will be completed with an error.
*/
def singleRequest(request: HttpRequest,
options: JIterable[Inet.SocketOption],
settings: ConnectionPoolSettings,
httpsContext: Option[HttpsContext],
log: LoggingAdapter, materializer: FlowMaterializer): Future[HttpResponse] =
delegate.singleRequest(request.asScala, immutableSeq(options), settings, httpsContext, log)(materializer)
delegate.singleRequest(request.asScala, settings, httpsContext, log)(materializer)
/**
* Triggers an orderly shutdown of all host connections pools currently maintained by the [[ActorSystem]].

View file

@ -50,15 +50,14 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
* If no ``port`` is explicitly given (or the port value is negative) the protocol's default port will be used,
* which is 80 for HTTP and 443 for HTTPS.
*/
def bind(interface: String, port: Int = -1, backlog: Int = 100,
options: immutable.Traversable[Inet.SocketOption] = Nil,
def bind(interface: String, port: Int = -1,
settings: ServerSettings = ServerSettings(system),
httpsContext: Option[HttpsContext] = None,
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] = {
val effectivePort = if (port >= 0) port else if (httpsContext.isEmpty) 80 else 443
val tlsStage = sslTlsStage(httpsContext, Server)
val connections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] =
Tcp().bind(interface, effectivePort, backlog, options, settings.timeouts.idleTimeout)
Tcp().bind(interface, effectivePort, settings.backlog, settings.socketOptions, settings.timeouts.idleTimeout)
connections.map {
case Tcp.IncomingConnection(localAddress, remoteAddress, flow)
val layer = serverLayer(settings, log)
@ -77,12 +76,11 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
* present a DoS risk!
*/
def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, Any],
interface: String, port: Int = -1, backlog: Int = 100,
options: immutable.Traversable[Inet.SocketOption] = Nil,
interface: String, port: Int = -1,
settings: ServerSettings = ServerSettings(system),
httpsContext: Option[HttpsContext] = None,
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] =
bind(interface, port, backlog, options, settings, httpsContext, log).to {
bind(interface, port, settings, httpsContext, log).to {
Sink.foreach { incomingConnection
try incomingConnection.flow.joinMat(handler)(Keep.both).run()
catch {
@ -102,12 +100,11 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
* present a DoS risk!
*/
def bindAndHandleSync(handler: HttpRequest HttpResponse,
interface: String, port: Int = -1, backlog: Int = 100,
options: immutable.Traversable[Inet.SocketOption] = Nil,
interface: String, port: Int = -1,
settings: ServerSettings = ServerSettings(system),
httpsContext: Option[HttpsContext] = None,
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] =
bindAndHandle(Flow[HttpRequest].map(handler), interface, port, backlog, options, settings, httpsContext, log)
bindAndHandle(Flow[HttpRequest].map(handler), interface, port, settings, httpsContext, log)
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler``
@ -118,13 +115,12 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
* present a DoS risk!
*/
def bindAndHandleAsync(handler: HttpRequest Future[HttpResponse],
interface: String, port: Int = -1, backlog: Int = 100,
options: immutable.Traversable[Inet.SocketOption] = Nil,
interface: String, port: Int = -1,
settings: ServerSettings = ServerSettings(system),
httpsContext: Option[HttpsContext] = None,
parallelism: Int = 1,
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] =
bindAndHandle(Flow[HttpRequest].mapAsync(parallelism)(handler), interface, port, backlog, options, settings, httpsContext, log)
bindAndHandle(Flow[HttpRequest].mapAsync(parallelism)(handler), interface, port, settings, httpsContext, log)
/**
* The type of the server-side HTTP layer as a stand-alone BidiStage
@ -160,10 +156,9 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
*/
def outgoingConnection(host: String, port: Int = 80,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: ClientConnectionSettings = ClientConnectionSettings(system),
log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
_outgoingConnection(host, port, localAddress, options, settings, None, log)
_outgoingConnection(host, port, localAddress, settings, None, log)
/**
* Same as [[outgoingConnection]] but for encrypted (HTTPS) connections.
@ -173,21 +168,19 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
*/
def outgoingConnectionTls(host: String, port: Int = 443,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: ClientConnectionSettings = ClientConnectionSettings(system),
httpsContext: Option[HttpsContext] = None,
log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
_outgoingConnection(host, port, localAddress, options, settings, effectiveHttpsContext(httpsContext), log)
_outgoingConnection(host, port, localAddress, settings, effectiveHttpsContext(httpsContext), log)
private def _outgoingConnection(host: String, port: Int, localAddress: Option[InetSocketAddress],
options: immutable.Traversable[Inet.SocketOption],
settings: ClientConnectionSettings, httpsContext: Option[HttpsContext],
log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = {
val remoteAddr = new InetSocketAddress(host, port)
val layer = clientLayer(remoteAddr, settings, log)
val tlsStage = sslTlsStage(httpsContext, Client)
val transportFlow = Tcp().outgoingConnection(remoteAddr, localAddress,
options, settings.connectingTimeout, settings.idleTimeout)
settings.socketOptions, settings.connectingTimeout, settings.idleTimeout)
layer.atop(tlsStage).joinMat(transportFlow) { (_, tcpConnFuture)
import system.dispatcher
@ -238,10 +231,9 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
* object of type ``T`` from the application which is emitted together with the corresponding response.
*/
def newHostConnectionPool[T](host: String, port: Int = 80,
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
val cps = ConnectionPoolSetup(options, settings, None, log)
val cps = ConnectionPoolSetup(settings, None, log)
newHostConnectionPool(HostConnectionPoolSetup(host, port, cps))
}
@ -252,11 +244,10 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
* for encryption on the connections.
*/
def newHostConnectionPoolTls[T](host: String, port: Int = 443,
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
httpsContext: Option[HttpsContext] = None,
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
val cps = ConnectionPoolSetup(options, settings, effectiveHttpsContext(httpsContext), log)
val cps = ConnectionPoolSetup(settings, effectiveHttpsContext(httpsContext), log)
newHostConnectionPool(HostConnectionPoolSetup(host, port, cps))
}
@ -298,10 +289,9 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
* object of type ``T`` from the application which is emitted together with the corresponding response.
*/
def cachedHostConnectionPool[T](host: String, port: Int = 80,
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
val cps = ConnectionPoolSetup(options, settings, None, log)
val cps = ConnectionPoolSetup(settings, None, log)
val setup = HostConnectionPoolSetup(host, port, cps)
cachedHostConnectionPool(setup)
}
@ -313,11 +303,10 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
* for encryption on the connections.
*/
def cachedHostConnectionPoolTls[T](host: String, port: Int = 80,
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
httpsContext: Option[HttpsContext] = None,
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
val cps = ConnectionPoolSetup(options, settings, effectiveHttpsContext(httpsContext), log)
val cps = ConnectionPoolSetup(settings, effectiveHttpsContext(httpsContext), log)
val setup = HostConnectionPoolSetup(host, port, cps)
cachedHostConnectionPool(setup)
}
@ -357,11 +346,10 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type ``T`` from the application which is emitted together with the corresponding response.
*/
def superPool[T](options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
def superPool[T](settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
httpsContext: Option[HttpsContext] = None,
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] =
clientFlow[T](settings) { request request -> cachedGateway(request, options, settings, httpsContext, log) }
clientFlow[T](settings) { request request -> cachedGateway(request, settings, httpsContext, log) }
/**
* Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's
@ -373,12 +361,11 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
* Note that the request must have an absolute URI, otherwise the future will be completed with an error.
*/
def singleRequest(request: HttpRequest,
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
httpsContext: Option[HttpsContext] = None,
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[HttpResponse] =
try {
val gatewayFuture = cachedGateway(request, options, settings, httpsContext, log)
val gatewayFuture = cachedGateway(request, settings, httpsContext, log)
gatewayFuture.flatMap(_(request))(fm.executionContext)
} catch {
case e: IllegalUriException FastFuture.failed(e)
@ -425,12 +412,12 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
// every ActorSystem maintains its own connection pools
private[this] val hostPoolCache = new ConcurrentHashMap[HostConnectionPoolSetup, Future[PoolGateway]]
private def cachedGateway(request: HttpRequest, options: immutable.Traversable[Inet.SocketOption],
private def cachedGateway(request: HttpRequest,
settings: ConnectionPoolSettings, httpsContext: Option[HttpsContext],
log: LoggingAdapter)(implicit fm: FlowMaterializer): Future[PoolGateway] =
if (request.uri.scheme.nonEmpty && request.uri.authority.nonEmpty) {
val httpsCtx = if (request.uri.scheme.equalsIgnoreCase("https")) effectiveHttpsContext(httpsContext) else None
val setup = ConnectionPoolSetup(options, settings, httpsCtx, log)
val setup = ConnectionPoolSetup(settings, httpsCtx, log)
val host = request.uri.authority.host.toString()
val hcps = HostConnectionPoolSetup(host, request.uri.effectivePort, setup)
cachedGateway(hcps)

View file

@ -275,7 +275,7 @@ class ConnectionPoolSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = O
ccSettings: ClientConnectionSettings = ClientConnectionSettings(system)) = {
val settings = ConnectionPoolSettings(maxConnections, maxRetries, maxOpenRequests, pipeliningLimit,
idleTimeout, ClientConnectionSettings(system))
flowTestBench(Http().cachedHostConnectionPool[T](serverHostName, serverPort, Nil, settings))
flowTestBench(Http().cachedHostConnectionPool[T](serverHostName, serverPort, settings))
}
def superPool[T](maxConnections: Int = 2,
@ -286,7 +286,7 @@ class ConnectionPoolSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = O
ccSettings: ClientConnectionSettings = ClientConnectionSettings(system)) = {
val settings = ConnectionPoolSettings(maxConnections, maxRetries, maxOpenRequests, pipeliningLimit,
idleTimeout, ClientConnectionSettings(system))
flowTestBench(Http().superPool[T](Nil, settings))
flowTestBench(Http().superPool[T](settings))
}
def flowTestBench[T, Mat](poolFlow: Flow[(HttpRequest, T), (Try[HttpResponse], T), Mat]) = {