diff --git a/akka-http-core/src/main/scala/akka/http/impl/settings/ClientConnectionSettingsImpl.scala b/akka-http-core/src/main/scala/akka/http/impl/settings/ClientConnectionSettingsImpl.scala index ce4ba64ddd..83a1506d68 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/settings/ClientConnectionSettingsImpl.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/settings/ClientConnectionSettingsImpl.scala @@ -29,6 +29,8 @@ private[akka] final case class ClientConnectionSettingsImpl( require(connectingTimeout >= Duration.Zero, "connectingTimeout must be >= 0") require(requestHeaderSizeHint > 0, "request-size-hint must be > 0") + + override def productPrefix = "ClientConnectionSettings" } object ClientConnectionSettingsImpl extends SettingsCompanion[ClientConnectionSettingsImpl]("akka.http.client") { @@ -44,4 +46,4 @@ object ClientConnectionSettingsImpl extends SettingsCompanion[ClientConnectionSe parserSettings = ParserSettingsImpl.fromSubConfig(root, c.getConfig("parsing"))) } -} \ No newline at end of file +} diff --git a/akka-http-core/src/main/scala/akka/http/impl/settings/ConnectionPoolSettingsImpl.scala b/akka-http-core/src/main/scala/akka/http/impl/settings/ConnectionPoolSettingsImpl.scala index eb7544e47d..6462a51dc6 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/settings/ConnectionPoolSettingsImpl.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/settings/ConnectionPoolSettingsImpl.scala @@ -26,6 +26,7 @@ private[akka] final case class ConnectionPoolSettingsImpl( require(pipeliningLimit > 0, "pipelining-limit must be > 0") require(idleTimeout >= Duration.Zero, "idle-timeout must be >= 0") + override def productPrefix = "ConnectionPoolSettings" } object ConnectionPoolSettingsImpl extends SettingsCompanion[ConnectionPoolSettingsImpl]("akka.http.host-connection-pool") { diff --git a/akka-http-core/src/main/scala/akka/http/impl/settings/ParserSettingsImpl.scala b/akka-http-core/src/main/scala/akka/http/impl/settings/ParserSettingsImpl.scala index 4ba0734caf..d3ada78e84 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/settings/ParserSettingsImpl.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/settings/ParserSettingsImpl.scala @@ -46,9 +46,16 @@ private[akka] final case class ParserSettingsImpl( override def headerValueCacheLimit(headerName: String): Int = headerValueCacheLimits.getOrElse(headerName, defaultHeaderValueCacheLimit) + + override def productPrefix = "ParserSettings" } object ParserSettingsImpl extends SettingsCompanion[ParserSettingsImpl]("akka.http.parsing") { + + // for equality + private[this] val noCustomMethods: String ⇒ Option[HttpMethod] = _ ⇒ None + private[this] val noCustomStatusCodes: Int ⇒ Option[StatusCode] = _ ⇒ None + def fromSubConfig(root: Config, inner: Config) = { val c = inner.withFallback(root.getConfig(prefix)) val cacheConfig = c getConfig "header-cache" @@ -69,8 +76,8 @@ object ParserSettingsImpl extends SettingsCompanion[ParserSettingsImpl]("akka.ht ErrorLoggingVerbosity(c getString "error-logging-verbosity"), cacheConfig.entrySet.asScala.map(kvp ⇒ kvp.getKey -> cacheConfig.getInt(kvp.getKey))(collection.breakOut), c getBoolean "tls-session-info-header", - _ ⇒ None, - _ ⇒ None) + noCustomMethods, + noCustomStatusCodes) } } diff --git a/akka-http-core/src/main/scala/akka/http/impl/settings/RoutingSettingsImpl.scala b/akka-http-core/src/main/scala/akka/http/impl/settings/RoutingSettingsImpl.scala index 8e7462179d..cabb1bd18b 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/settings/RoutingSettingsImpl.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/settings/RoutingSettingsImpl.scala @@ -15,7 +15,10 @@ final case class RoutingSettingsImpl( rangeCountLimit: Int, rangeCoalescingThreshold: Long, decodeMaxBytesPerChunk: Int, - fileIODispatcher: String) extends akka.http.scaladsl.settings.RoutingSettings + fileIODispatcher: String) extends akka.http.scaladsl.settings.RoutingSettings { + + override def productPrefix = "RoutingSettings" +} object RoutingSettingsImpl extends SettingsCompanion[RoutingSettingsImpl]("akka.http.routing") { def fromSubConfig(root: Config, c: Config) = new RoutingSettingsImpl( diff --git a/akka-http-core/src/main/scala/akka/http/impl/settings/ServerSettingsImpl.scala b/akka-http-core/src/main/scala/akka/http/impl/settings/ServerSettingsImpl.scala index ce818b341d..7241d572eb 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/settings/ServerSettingsImpl.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/settings/ServerSettingsImpl.scala @@ -45,6 +45,7 @@ private[akka] final case class ServerSettingsImpl( require(0 < responseHeaderSizeHint, "response-size-hint must be > 0") require(0 < backlog, "backlog must be > 0") + override def productPrefix = "ServerSettings" } object ServerSettingsImpl extends SettingsCompanion[ServerSettingsImpl]("akka.http.server") { @@ -88,4 +89,4 @@ object ServerSettingsImpl extends SettingsCompanion[ServerSettingsImpl]("akka.ht // def apply(optionalSettings: Option[ServerSettings])(implicit actorRefFactory: ActorRefFactory): ServerSettings = // optionalSettings getOrElse apply(actorSystem) -} \ No newline at end of file +} diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index c85455d1bb..6f83e8507d 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -42,6 +42,8 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte override val sslConfig = AkkaSSLConfig(system) + private[this] val defaultConnectionPoolSettings = ConnectionPoolSettings(system) + // configured default HttpsContext for the client-side // SYNCHRONIZED ACCESS ONLY! private[this] var _defaultClientHttpsConnectionContext: HttpsConnectionContext = _ @@ -278,7 +280,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly. */ def newHostConnectionPool[T](host: String, port: Int = 80, - settings: ConnectionPoolSettings = ConnectionPoolSettings(system), + settings: ConnectionPoolSettings = defaultConnectionPoolSettings, log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { val cps = ConnectionPoolSetup(settings, ConnectionContext.noEncryption(), log) newHostConnectionPool(HostConnectionPoolSetup(host, port, cps)) @@ -295,7 +297,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte */ def newHostConnectionPoolHttps[T](host: String, port: Int = 443, connectionContext: HttpsConnectionContext = defaultClientHttpsContext, - settings: ConnectionPoolSettings = ConnectionPoolSettings(system), + settings: ConnectionPoolSettings = defaultConnectionPoolSettings, log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { val cps = ConnectionPoolSetup(settings, connectionContext, log) newHostConnectionPool(HostConnectionPoolSetup(host, port, cps)) @@ -344,7 +346,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly. */ def cachedHostConnectionPool[T](host: String, port: Int = 80, - settings: ConnectionPoolSettings = ConnectionPoolSettings(system), + settings: ConnectionPoolSettings = defaultConnectionPoolSettings, log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { val cps = ConnectionPoolSetup(settings, ConnectionContext.noEncryption(), log) val setup = HostConnectionPoolSetup(host, port, cps) @@ -362,7 +364,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte */ def cachedHostConnectionPoolHttps[T](host: String, port: Int = 443, connectionContext: HttpsConnectionContext = defaultClientHttpsContext, - settings: ConnectionPoolSettings = ConnectionPoolSettings(system), + settings: ConnectionPoolSettings = defaultConnectionPoolSettings, log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { val cps = ConnectionPoolSetup(settings, connectionContext, log) val setup = HostConnectionPoolSetup(host, port, cps) @@ -408,7 +410,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly. */ def superPool[T](connectionContext: HttpsConnectionContext = defaultClientHttpsContext, - settings: ConnectionPoolSettings = ConnectionPoolSettings(system), + settings: ConnectionPoolSettings = defaultConnectionPoolSettings, log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed] = clientFlow[T](settings) { request ⇒ request -> cachedGateway(request, settings, connectionContext, log) } @@ -417,13 +419,13 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * effective URI to produce a response future. * * If an explicit [[ConnectionContext]] is given then it rather than the configured default [[ConnectionContext]] will be used - * for setting up the HTTPS connection pool, if the request is targetted towards an `https` endpoint. + * for setting up the HTTPS connection pool, if the request is targeted towards an `https` endpoint. * * Note that the request must have an absolute URI, otherwise the future will be completed with an error. */ def singleRequest(request: HttpRequest, connectionContext: HttpsConnectionContext = defaultClientHttpsContext, - settings: ConnectionPoolSettings = ConnectionPoolSettings(system), + settings: ConnectionPoolSettings = defaultConnectionPoolSettings, log: LoggingAdapter = system.log)(implicit fm: Materializer): Future[HttpResponse] = try { val gatewayFuture = cachedGateway(request, settings, connectionContext, log) @@ -544,7 +546,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte } // every ActorSystem maintains its own connection pools - private[this] val hostPoolCache = new ConcurrentHashMap[HostConnectionPoolSetup, Future[PoolGateway]] + private[http] val hostPoolCache = new ConcurrentHashMap[HostConnectionPoolSetup, Future[PoolGateway]] private def cachedGateway(request: HttpRequest, settings: ConnectionPoolSettings, connectionContext: ConnectionContext, diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/settings/ConnectionPoolSettings.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/settings/ConnectionPoolSettings.scala index dc58f8eebc..26984e3a6b 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/settings/ConnectionPoolSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/settings/ConnectionPoolSettings.scala @@ -45,4 +45,4 @@ abstract class ConnectionPoolSettings extends js.ConnectionPoolSettings { self: object ConnectionPoolSettings extends SettingsCompanion[ConnectionPoolSettings] { override def apply(config: Config) = ConnectionPoolSettingsImpl(config) override def apply(configOverrides: String) = ConnectionPoolSettingsImpl(configOverrides) -} \ No newline at end of file +} diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientSpec.scala new file mode 100644 index 0000000000..6162b36cf0 --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientSpec.scala @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2016 Typesafe Inc. + */ + +package akka.http.scaladsl + +import java.util.concurrent.ConcurrentHashMap + +import akka.actor.ActorSystem +import akka.http.impl.engine.client.PoolGateway +import akka.http.impl.settings.HostConnectionPoolSetup +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.HttpMethods._ +import akka.stream.ActorMaterializer +import com.typesafe.config.{ Config, ConfigFactory } +import org.scalatest.{ Matchers, WordSpec } + +import scala.concurrent.duration._ +import scala.concurrent.{ Await, Future } + +class ClientSpec extends WordSpec with Matchers { + val testConf: Config = ConfigFactory.parseString(""" + akka.loggers = ["akka.testkit.TestEventListener"] + akka.loglevel = ERROR + akka.stdout-loglevel = ERROR + windows-connection-abort-workaround-enabled = auto + akka.log-dead-letters = OFF + akka.http.server.request-timeout = infinite""") + implicit val system = ActorSystem(getClass.getSimpleName, testConf) + import system.dispatcher + implicit val materializer = ActorMaterializer() + + "HTTP Client" should { + + "reuse connection pool" in { + val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() + val bindingFuture = Http().bindAndHandleSync(_ ⇒ HttpResponse(), hostname, port) + val binding = Await.result(bindingFuture, 3.seconds) + + val respFuture = Http().singleRequest(HttpRequest(POST, s"http://$hostname:$port/")) + val resp = Await.result(respFuture, 3.seconds) + resp.status shouldBe StatusCodes.OK + + Http().hostPoolCache.size shouldBe 1 + + val respFuture2 = Http().singleRequest(HttpRequest(POST, s"http://$hostname:$port/")) + val resp2 = Await.result(respFuture, 3.seconds) + resp2.status shouldBe StatusCodes.OK + + Http().hostPoolCache.size shouldBe 1 + + Await.ready(binding.unbind(), 1.second) + } + } +} diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/settings/SettingsEqualitySpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/settings/SettingsEqualitySpec.scala new file mode 100644 index 0000000000..d2d8de1b99 --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/settings/SettingsEqualitySpec.scala @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2016 Typesafe Inc. + */ + +package akka.http.scaladsl.settings + +import com.typesafe.config.ConfigFactory + +import org.scalatest.Matchers +import org.scalatest.WordSpec + +class SettingsEqualitySpec extends WordSpec with Matchers { + + val config = ConfigFactory.parseString(""" + akka.http.routing { + verbose-error-messages = off + file-get-conditional = on + render-vanity-footer = yes + range-coalescing-threshold = 80 + range-count-limit = 16 + decode-max-bytes-per-chunk = 1m + file-io-dispatcher = ${akka.stream.blocking-io-dispatcher} + } + """).withFallback(ConfigFactory.load).resolve + + "equality" should { + "hold for ConnectionPoolSettings" in { + val s1 = ConnectionPoolSettings(config) + val s2 = ConnectionPoolSettings(config) + + s1 shouldBe s2 + s1.toString should startWith("ConnectionPoolSettings(") + } + + "hold for ParserSettings" in { + val s1 = ParserSettings(config) + val s2 = ParserSettings(config) + + s1 shouldBe s2 + s1.toString should startWith("ParserSettings(") + } + + "hold for ClientConnectionSettings" in { + val s1 = ClientConnectionSettings(config) + val s2 = ClientConnectionSettings(config) + + s1 shouldBe s2 + s1.toString should startWith("ClientConnectionSettings(") + } + + "hold for RoutingSettings" in { + val s1 = RoutingSettings(config) + val s2 = RoutingSettings(config) + + s1 shouldBe s2 + s1.toString should startWith("RoutingSettings(") + } + + "hold for ServerSettings" in { + val s1 = ServerSettings(config) + val s2 = ServerSettings(config) + + s1 shouldBe s2 + s1.toString should startWith("ServerSettings(") + } + } + +}