Merge pull request #19684 from 2m/wip-#19662-reuse-pool-settings-2m

#19662 Reuse connection pool settings if none specified
This commit is contained in:
Roland Kuhn 2016-02-11 14:55:40 +01:00
commit af95476d1e
9 changed files with 153 additions and 14 deletions

View file

@ -29,6 +29,8 @@ private[akka] final case class ClientConnectionSettingsImpl(
require(connectingTimeout >= Duration.Zero, "connectingTimeout must be >= 0")
require(requestHeaderSizeHint > 0, "request-size-hint must be > 0")
override def productPrefix = "ClientConnectionSettings"
}
object ClientConnectionSettingsImpl extends SettingsCompanion[ClientConnectionSettingsImpl]("akka.http.client") {
@ -44,4 +46,4 @@ object ClientConnectionSettingsImpl extends SettingsCompanion[ClientConnectionSe
parserSettings = ParserSettingsImpl.fromSubConfig(root, c.getConfig("parsing")))
}
}
}

View file

@ -26,6 +26,7 @@ private[akka] final case class ConnectionPoolSettingsImpl(
require(pipeliningLimit > 0, "pipelining-limit must be > 0")
require(idleTimeout >= Duration.Zero, "idle-timeout must be >= 0")
override def productPrefix = "ConnectionPoolSettings"
}
object ConnectionPoolSettingsImpl extends SettingsCompanion[ConnectionPoolSettingsImpl]("akka.http.host-connection-pool") {

View file

@ -46,9 +46,16 @@ private[akka] final case class ParserSettingsImpl(
override def headerValueCacheLimit(headerName: String): Int =
headerValueCacheLimits.getOrElse(headerName, defaultHeaderValueCacheLimit)
override def productPrefix = "ParserSettings"
}
object ParserSettingsImpl extends SettingsCompanion[ParserSettingsImpl]("akka.http.parsing") {
// for equality
private[this] val noCustomMethods: String Option[HttpMethod] = _ None
private[this] val noCustomStatusCodes: Int Option[StatusCode] = _ None
def fromSubConfig(root: Config, inner: Config) = {
val c = inner.withFallback(root.getConfig(prefix))
val cacheConfig = c getConfig "header-cache"
@ -69,8 +76,8 @@ object ParserSettingsImpl extends SettingsCompanion[ParserSettingsImpl]("akka.ht
ErrorLoggingVerbosity(c getString "error-logging-verbosity"),
cacheConfig.entrySet.asScala.map(kvp kvp.getKey -> cacheConfig.getInt(kvp.getKey))(collection.breakOut),
c getBoolean "tls-session-info-header",
_ None,
_ None)
noCustomMethods,
noCustomStatusCodes)
}
}

View file

@ -15,7 +15,10 @@ final case class RoutingSettingsImpl(
rangeCountLimit: Int,
rangeCoalescingThreshold: Long,
decodeMaxBytesPerChunk: Int,
fileIODispatcher: String) extends akka.http.scaladsl.settings.RoutingSettings
fileIODispatcher: String) extends akka.http.scaladsl.settings.RoutingSettings {
override def productPrefix = "RoutingSettings"
}
object RoutingSettingsImpl extends SettingsCompanion[RoutingSettingsImpl]("akka.http.routing") {
def fromSubConfig(root: Config, c: Config) = new RoutingSettingsImpl(

View file

@ -45,6 +45,7 @@ private[akka] final case class ServerSettingsImpl(
require(0 < responseHeaderSizeHint, "response-size-hint must be > 0")
require(0 < backlog, "backlog must be > 0")
override def productPrefix = "ServerSettings"
}
object ServerSettingsImpl extends SettingsCompanion[ServerSettingsImpl]("akka.http.server") {
@ -88,4 +89,4 @@ object ServerSettingsImpl extends SettingsCompanion[ServerSettingsImpl]("akka.ht
// def apply(optionalSettings: Option[ServerSettings])(implicit actorRefFactory: ActorRefFactory): ServerSettings =
// optionalSettings getOrElse apply(actorSystem)
}
}

View file

@ -42,6 +42,8 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
override val sslConfig = AkkaSSLConfig(system)
private[this] val defaultConnectionPoolSettings = ConnectionPoolSettings(system)
// configured default HttpsContext for the client-side
// SYNCHRONIZED ACCESS ONLY!
private[this] var _defaultClientHttpsConnectionContext: HttpsConnectionContext = _
@ -278,7 +280,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
* use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly.
*/
def newHostConnectionPool[T](host: String, port: Int = 80,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
settings: ConnectionPoolSettings = defaultConnectionPoolSettings,
log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
val cps = ConnectionPoolSetup(settings, ConnectionContext.noEncryption(), log)
newHostConnectionPool(HostConnectionPoolSetup(host, port, cps))
@ -295,7 +297,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
*/
def newHostConnectionPoolHttps[T](host: String, port: Int = 443,
connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
settings: ConnectionPoolSettings = defaultConnectionPoolSettings,
log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
val cps = ConnectionPoolSetup(settings, connectionContext, log)
newHostConnectionPool(HostConnectionPoolSetup(host, port, cps))
@ -344,7 +346,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
* use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly.
*/
def cachedHostConnectionPool[T](host: String, port: Int = 80,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
settings: ConnectionPoolSettings = defaultConnectionPoolSettings,
log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
val cps = ConnectionPoolSetup(settings, ConnectionContext.noEncryption(), log)
val setup = HostConnectionPoolSetup(host, port, cps)
@ -362,7 +364,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
*/
def cachedHostConnectionPoolHttps[T](host: String, port: Int = 443,
connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
settings: ConnectionPoolSettings = defaultConnectionPoolSettings,
log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
val cps = ConnectionPoolSetup(settings, connectionContext, log)
val setup = HostConnectionPoolSetup(host, port, cps)
@ -408,7 +410,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
* use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly.
*/
def superPool[T](connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
settings: ConnectionPoolSettings = defaultConnectionPoolSettings,
log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed] =
clientFlow[T](settings) { request request -> cachedGateway(request, settings, connectionContext, log) }
@ -417,13 +419,13 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
* effective URI to produce a response future.
*
* If an explicit [[ConnectionContext]] is given then it rather than the configured default [[ConnectionContext]] will be used
* for setting up the HTTPS connection pool, if the request is targetted towards an `https` endpoint.
* for setting up the HTTPS connection pool, if the request is targeted towards an `https` endpoint.
*
* Note that the request must have an absolute URI, otherwise the future will be completed with an error.
*/
def singleRequest(request: HttpRequest,
connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
settings: ConnectionPoolSettings = defaultConnectionPoolSettings,
log: LoggingAdapter = system.log)(implicit fm: Materializer): Future[HttpResponse] =
try {
val gatewayFuture = cachedGateway(request, settings, connectionContext, log)
@ -544,7 +546,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
}
// every ActorSystem maintains its own connection pools
private[this] val hostPoolCache = new ConcurrentHashMap[HostConnectionPoolSetup, Future[PoolGateway]]
private[http] val hostPoolCache = new ConcurrentHashMap[HostConnectionPoolSetup, Future[PoolGateway]]
private def cachedGateway(request: HttpRequest,
settings: ConnectionPoolSettings, connectionContext: ConnectionContext,

View file

@ -45,4 +45,4 @@ abstract class ConnectionPoolSettings extends js.ConnectionPoolSettings { self:
object ConnectionPoolSettings extends SettingsCompanion[ConnectionPoolSettings] {
override def apply(config: Config) = ConnectionPoolSettingsImpl(config)
override def apply(configOverrides: String) = ConnectionPoolSettingsImpl(configOverrides)
}
}

View file

@ -0,0 +1,55 @@
/**
* Copyright (C) 2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.scaladsl
import java.util.concurrent.ConcurrentHashMap
import akka.actor.ActorSystem
import akka.http.impl.engine.client.PoolGateway
import akka.http.impl.settings.HostConnectionPoolSetup
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpMethods._
import akka.stream.ActorMaterializer
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.{ Matchers, WordSpec }
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future }
class ClientSpec extends WordSpec with Matchers {
val testConf: Config = ConfigFactory.parseString("""
akka.loggers = ["akka.testkit.TestEventListener"]
akka.loglevel = ERROR
akka.stdout-loglevel = ERROR
windows-connection-abort-workaround-enabled = auto
akka.log-dead-letters = OFF
akka.http.server.request-timeout = infinite""")
implicit val system = ActorSystem(getClass.getSimpleName, testConf)
import system.dispatcher
implicit val materializer = ActorMaterializer()
"HTTP Client" should {
"reuse connection pool" in {
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
val bindingFuture = Http().bindAndHandleSync(_ HttpResponse(), hostname, port)
val binding = Await.result(bindingFuture, 3.seconds)
val respFuture = Http().singleRequest(HttpRequest(POST, s"http://$hostname:$port/"))
val resp = Await.result(respFuture, 3.seconds)
resp.status shouldBe StatusCodes.OK
Http().hostPoolCache.size shouldBe 1
val respFuture2 = Http().singleRequest(HttpRequest(POST, s"http://$hostname:$port/"))
val resp2 = Await.result(respFuture, 3.seconds)
resp2.status shouldBe StatusCodes.OK
Http().hostPoolCache.size shouldBe 1
Await.ready(binding.unbind(), 1.second)
}
}
}

View file

@ -0,0 +1,68 @@
/**
* Copyright (C) 2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.scaladsl.settings
import com.typesafe.config.ConfigFactory
import org.scalatest.Matchers
import org.scalatest.WordSpec
class SettingsEqualitySpec extends WordSpec with Matchers {
val config = ConfigFactory.parseString("""
akka.http.routing {
verbose-error-messages = off
file-get-conditional = on
render-vanity-footer = yes
range-coalescing-threshold = 80
range-count-limit = 16
decode-max-bytes-per-chunk = 1m
file-io-dispatcher = ${akka.stream.blocking-io-dispatcher}
}
""").withFallback(ConfigFactory.load).resolve
"equality" should {
"hold for ConnectionPoolSettings" in {
val s1 = ConnectionPoolSettings(config)
val s2 = ConnectionPoolSettings(config)
s1 shouldBe s2
s1.toString should startWith("ConnectionPoolSettings(")
}
"hold for ParserSettings" in {
val s1 = ParserSettings(config)
val s2 = ParserSettings(config)
s1 shouldBe s2
s1.toString should startWith("ParserSettings(")
}
"hold for ClientConnectionSettings" in {
val s1 = ClientConnectionSettings(config)
val s2 = ClientConnectionSettings(config)
s1 shouldBe s2
s1.toString should startWith("ClientConnectionSettings(")
}
"hold for RoutingSettings" in {
val s1 = RoutingSettings(config)
val s2 = RoutingSettings(config)
s1 shouldBe s2
s1.toString should startWith("RoutingSettings(")
}
"hold for ServerSettings" in {
val s1 = ServerSettings(config)
val s2 = ServerSettings(config)
s1 shouldBe s2
s1.toString should startWith("ServerSettings(")
}
}
}