From 72e2e51b832ec7c89848b5346861bc89d4e10f3a Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Fri, 22 May 2015 11:31:45 +0200 Subject: [PATCH 1/2] !htc move backlog and socket options to configuration in an effort to decrease API surface --- .../src/main/resources/reference.conf | 31 +++++++++++ .../akka/http/ClientConnectionSettings.scala | 15 ++++-- .../akka/http/ConnectionPoolSettings.scala | 6 +-- .../main/scala/akka/http/ServerSettings.scala | 22 +++++--- .../engine/client/PoolInterfaceActor.scala | 4 +- .../http/impl/util/SocketOptionSettings.scala | 30 +++++++++++ .../main/scala/akka/http/javadsl/Http.scala | 36 +++++-------- .../main/scala/akka/http/scaladsl/Http.scala | 53 +++++++------------ .../engine/client/ConnectionPoolSpec.scala | 4 +- 9 files changed, 127 insertions(+), 74 deletions(-) create mode 100644 akka-http-core/src/main/scala/akka/http/impl/util/SocketOptionSettings.scala diff --git a/akka-http-core/src/main/resources/reference.conf b/akka-http-core/src/main/resources/reference.conf index 3c3b279edf..cb24db39f8 100644 --- a/akka-http-core/src/main/resources/reference.conf +++ b/akka-http-core/src/main/resources/reference.conf @@ -51,6 +51,13 @@ akka.http { # doesn't have to be fiddled with in most applications. response-header-size-hint = 512 + # The requested maximum length of the queue of incoming connections. + # If the server is busy and the backlog is full the OS will start dropping + # SYN-packets and connection attempts may fail. Note, that the backlog + # size is usually only a maximum size hint for the OS and the OS can + # restrict the number further based on global limits. + backlog = 100 + # If this setting is empty the server only accepts requests that carry a # non-empty `Host` header. Otherwise it responds with `400 Bad Request`. # Set to a non-empty value to be used in lieu of a missing or empty `Host` @@ -61,6 +68,18 @@ akka.http { # Examples: `www.spray.io` or `example.com:8080` default-host-header = "" + # Socket options to set for the listening socket. If a setting is left + # undefined, it will use whatever the default on the system is. + socket-options { + so-receive-buffer-size = undefined + so-send-buffer-size = undefined + so-reuse-address = undefined + so-traffic-class = undefined + tcp-keep-alive = undefined + tcp-oob-inline = undefined + tcp-no-delay = undefined + } + # Modify to tweak parsing settings on the server-side only. parsing = ${akka.http.parsing} } @@ -103,6 +122,18 @@ akka.http { https = default } + # Socket options to set for the listening socket. If a setting is left + # undefined, it will use whatever the default on the system is. + socket-options { + so-receive-buffer-size = undefined + so-send-buffer-size = undefined + so-reuse-address = undefined + so-traffic-class = undefined + tcp-keep-alive = undefined + tcp-oob-inline = undefined + tcp-no-delay = undefined + } + # Modify to tweak parsing settings on the client-side only. parsing = ${akka.http.parsing} } diff --git a/akka-http-core/src/main/scala/akka/http/ClientConnectionSettings.scala b/akka-http-core/src/main/scala/akka/http/ClientConnectionSettings.scala index 829d3597aa..dc1aacaa74 100644 --- a/akka-http-core/src/main/scala/akka/http/ClientConnectionSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/ClientConnectionSettings.scala @@ -4,18 +4,24 @@ package akka.http -import akka.actor.ActorSystem -import akka.http.impl.util._ -import akka.http.scaladsl.model.headers.`User-Agent` -import com.typesafe.config.Config +import akka.io.Inet.SocketOption import scala.concurrent.duration.{ Duration, FiniteDuration } +import scala.collection.immutable + +import com.typesafe.config.Config +import akka.actor.ActorSystem + +import akka.http.impl.util._ + +import akka.http.scaladsl.model.headers.`User-Agent` final case class ClientConnectionSettings( userAgentHeader: Option[`User-Agent`], connectingTimeout: FiniteDuration, idleTimeout: Duration, requestHeaderSizeHint: Int, + socketOptions: immutable.Traversable[SocketOption], parserSettings: ParserSettings) { require(connectingTimeout >= Duration.Zero, "connectingTimeout must be >= 0") @@ -29,6 +35,7 @@ object ClientConnectionSettings extends SettingsCompanion[ClientConnectionSettin c getFiniteDuration "connecting-timeout", c getPotentiallyInfiniteDuration "idle-timeout", c getIntBytes "request-header-size-hint", + SocketOptionSettings fromSubConfig c.getConfig("socket-options"), ParserSettings fromSubConfig c.getConfig("parsing")) } diff --git a/akka-http-core/src/main/scala/akka/http/ConnectionPoolSettings.scala b/akka-http-core/src/main/scala/akka/http/ConnectionPoolSettings.scala index b7d4a2427b..7161854889 100644 --- a/akka-http-core/src/main/scala/akka/http/ConnectionPoolSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/ConnectionPoolSettings.scala @@ -18,18 +18,16 @@ import akka.io.Inet final case class HostConnectionPoolSetup(host: String, port: Int, setup: ConnectionPoolSetup) final case class ConnectionPoolSetup( - options: immutable.Traversable[Inet.SocketOption], settings: ConnectionPoolSettings, httpsContext: Option[HttpsContext], log: LoggingAdapter) object ConnectionPoolSetup { /** Java API */ - def create(options: JIterable[Inet.SocketOption], - settings: ConnectionPoolSettings, + def create(settings: ConnectionPoolSettings, httpsContext: akka.japi.Option[akka.http.javadsl.HttpsContext], log: LoggingAdapter): ConnectionPoolSetup = - ConnectionPoolSetup(immutableSeq(options), settings, httpsContext.map(_.asInstanceOf[HttpsContext]), log) + ConnectionPoolSetup(settings, httpsContext.map(_.asInstanceOf[HttpsContext]), log) } final case class ConnectionPoolSettings( diff --git a/akka-http-core/src/main/scala/akka/http/ServerSettings.scala b/akka-http-core/src/main/scala/akka/http/ServerSettings.scala index 6ab673445a..4d356ae268 100644 --- a/akka-http-core/src/main/scala/akka/http/ServerSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/ServerSettings.scala @@ -4,15 +4,20 @@ package akka.http -import akka.ConfigurationException -import akka.actor.{ ActorSystem, ActorRefFactory } -import akka.http.impl.util._ -import akka.http.scaladsl.model.HttpHeader -import akka.http.scaladsl.model.headers.{ Host, Server } import com.typesafe.config.Config -import scala.concurrent.duration._ import scala.language.implicitConversions +import scala.collection.immutable +import scala.concurrent.duration._ + +import akka.ConfigurationException +import akka.actor.{ ActorSystem, ActorRefFactory } +import akka.io.Inet.SocketOption + +import akka.http.impl.util._ + +import akka.http.scaladsl.model.HttpHeader +import akka.http.scaladsl.model.headers.{ Host, Server } final case class ServerSettings( serverHeader: Option[Server], @@ -22,10 +27,13 @@ final case class ServerSettings( transparentHeadRequests: Boolean, verboseErrorMessages: Boolean, responseHeaderSizeHint: Int, + backlog: Int, + socketOptions: immutable.Traversable[SocketOption], defaultHostHeader: Host, parserSettings: ParserSettings) { require(0 <= responseHeaderSizeHint, "response-size-hint must be > 0") + require(0 <= backlog, "backlog must be > 0") } object ServerSettings extends SettingsCompanion[ServerSettings]("akka.http.server") { @@ -45,6 +53,8 @@ object ServerSettings extends SettingsCompanion[ServerSettings]("akka.http.serve c getBoolean "transparent-head-requests", c getBoolean "verbose-error-messages", c getIntBytes "response-header-size-hint", + c getInt "backlog", + SocketOptionSettings fromSubConfig c.getConfig("socket-options"), defaultHostHeader = HttpHeader.parse("Host", c getString "default-host-header") match { case HttpHeader.ParsingResult.Ok(x: Host, Nil) ⇒ x diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala index 72238cb30b..1bec01199c 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala @@ -59,8 +59,8 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup, import hcps._ import setup._ val connectionFlow = - if (httpsContext.isEmpty) Http().outgoingConnection(host, port, None, options, settings.connectionSettings, setup.log) - else Http().outgoingConnectionTls(host, port, None, options, settings.connectionSettings, httpsContext, setup.log) + if (httpsContext.isEmpty) Http().outgoingConnection(host, port, None, settings.connectionSettings, setup.log) + else Http().outgoingConnectionTls(host, port, None, settings.connectionSettings, httpsContext, setup.log) val poolFlow = PoolFlow(connectionFlow, new InetSocketAddress(host, port), settings, setup.log) Source(ActorPublisher(self)).via(poolFlow).runWith(Sink(ActorSubscriber[ResponseContext](self))) } diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/SocketOptionSettings.scala b/akka-http-core/src/main/scala/akka/http/impl/util/SocketOptionSettings.scala new file mode 100644 index 0000000000..d79e02e6f1 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/impl/util/SocketOptionSettings.scala @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.http.impl.util + +import akka.io.{ Tcp, Inet } + +import scala.collection.immutable + +import akka.io.Inet.SocketOption +import com.typesafe.config.Config + +private[http] object SocketOptionSettings { + def fromSubConfig(config: Config): immutable.Traversable[SocketOption] = { + def so[T](setting: String)(f: (Config, String) ⇒ T)(cons: T ⇒ SocketOption): List[SocketOption] = + config.getString(setting) match { + case "undefined" ⇒ Nil + case x ⇒ cons(f(config, setting)) :: Nil + } + + so("so-receive-buffer-size")(_ getIntBytes _)(Inet.SO.ReceiveBufferSize) ::: + so("so-send-buffer-size")(_ getIntBytes _)(Inet.SO.SendBufferSize) ::: + so("so-reuse-address")(_ getBoolean _)(Inet.SO.ReuseAddress) ::: + so("so-traffic-class")(_ getInt _)(Inet.SO.TrafficClass) ::: + so("tcp-keep-alive")(_ getBoolean _)(Tcp.SO.KeepAlive) ::: + so("tcp-oob-inline")(_ getBoolean _)(Tcp.SO.OOBInline) ::: + so("tcp-no-delay")(_ getBoolean _)(Tcp.SO.TcpNoDelay) + } +} diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala index 7fe028751e..fb0f8ea581 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala @@ -62,12 +62,11 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * [[ServerBinding]]. */ def bind(interface: String, port: Int, - backlog: Int, options: JIterable[Inet.SocketOption], settings: ServerSettings, httpsContext: Option[HttpsContext], log: LoggingAdapter, materializer: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] = - Source.adapt(delegate.bind(interface, port, backlog, immutableSeq(options), settings, httpsContext, log)(materializer) + Source.adapt(delegate.bind(interface, port, settings, httpsContext, log)(materializer) .map(new IncomingConnection(_)) .mapMaterializedValue(_.map(new ServerBinding(_))(ec))) @@ -96,13 +95,12 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { */ def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _], interface: String, port: Int, - backlog: Int, options: JIterable[Inet.SocketOption], settings: ServerSettings, httpsContext: Option[HttpsContext], log: LoggingAdapter, materializer: FlowMaterializer): Future[ServerBinding] = delegate.bindAndHandle(handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala, - interface, port, backlog, immutableSeq(options), settings, httpsContext, log)(materializer) + interface, port, settings, httpsContext, log)(materializer) .map(new ServerBinding(_))(ec) /** @@ -129,13 +127,12 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { */ def bindAndHandleSync(handler: Function[HttpRequest, HttpResponse], interface: String, port: Int, - backlog: Int, options: JIterable[Inet.SocketOption], settings: ServerSettings, httpsContext: Option[HttpsContext], log: LoggingAdapter, materializer: FlowMaterializer): Future[ServerBinding] = delegate.bindAndHandleSync(handler.apply(_).asScala, - interface, port, backlog, immutableSeq(options), settings, httpsContext, log)(materializer) + interface, port, settings, httpsContext, log)(materializer) .map(new ServerBinding(_))(ec) /** @@ -162,12 +159,11 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { */ def bindAndHandleAsync(handler: Function[HttpRequest, Future[HttpResponse]], interface: String, port: Int, - backlog: Int, options: JIterable[Inet.SocketOption], settings: ServerSettings, httpsContext: Option[HttpsContext], parallelism: Int, log: LoggingAdapter, materializer: FlowMaterializer): Future[ServerBinding] = delegate.bindAndHandleAsync(handler.apply(_).asInstanceOf[Future[sm.HttpResponse]], - interface, port, backlog, immutableSeq(options), settings, httpsContext, parallelism, log)(materializer) + interface, port, settings, httpsContext, parallelism, log)(materializer) .map(new ServerBinding(_))(ec) /** @@ -197,12 +193,11 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { */ def outgoingConnection(host: String, port: Int, localAddress: Option[InetSocketAddress], - options: JIterable[Inet.SocketOption], settings: ClientConnectionSettings, log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = Flow.wrap { akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) - .viaMat(delegate.outgoingConnection(host, port, localAddress.asScala, immutableSeq(options), settings, log))(Keep.right) + .viaMat(delegate.outgoingConnection(host, port, localAddress.asScala, settings, log))(Keep.right) .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)) } @@ -214,13 +209,12 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { */ def outgoingConnectionTls(host: String, port: Int, localAddress: Option[InetSocketAddress], - options: JIterable[Inet.SocketOption], settings: ClientConnectionSettings, httpsContext: Option[HttpsContext], log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = Flow.wrap { akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) - .viaMat(delegate.outgoingConnectionTls(host, port, localAddress.asScala, immutableSeq(options), settings, + .viaMat(delegate.outgoingConnectionTls(host, port, localAddress.asScala, settings, httpsContext.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log))(Keep.right) .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)) } @@ -266,7 +260,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { options: JIterable[Inet.SocketOption], settings: ConnectionPoolSettings, log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = - adaptTupleFlow(delegate.newHostConnectionPool[T](host, port, immutableSeq(options), settings, log)(materializer)) + adaptTupleFlow(delegate.newHostConnectionPool[T](host, port, settings, log)(materializer)) /** * Same as [[newHostConnectionPool]] but with HTTPS encryption. @@ -279,7 +273,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { settings: ConnectionPoolSettings, httpsContext: Option[HttpsContext], log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = - adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, immutableSeq(options), settings, + adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, settings, httpsContext.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log)(materializer)) /** @@ -343,10 +337,9 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * object of type ``T`` from the application which is emitted together with the corresponding response. */ def cachedHostConnectionPool[T](host: String, port: Int, - options: JIterable[Inet.SocketOption], settings: ConnectionPoolSettings, log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = - adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port, immutableSeq(options), settings, log)(materializer)) + adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port, settings, log)(materializer)) /** * Same as [[cachedHostConnectionPool]] but with HTTPS encryption. @@ -355,11 +348,10 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * for encryption on the connection. */ def cachedHostConnectionPoolTls[T](host: String, port: Int, - options: JIterable[Inet.SocketOption], settings: ConnectionPoolSettings, httpsContext: Option[HttpsContext], log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = - adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port, immutableSeq(options), settings, + adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port, settings, httpsContext.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log)(materializer)) /** @@ -412,11 +404,10 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * In order to allow for easy response-to-request association the flow takes in a custom, opaque context * object of type ``T`` from the application which is emitted together with the corresponding response. */ - def superPool[T](options: JIterable[Inet.SocketOption], - settings: ConnectionPoolSettings, + def superPool[T](settings: ConnectionPoolSettings, httpsContext: Option[HttpsContext], log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] = - adaptTupleFlow(delegate.superPool[T](immutableSeq(options), settings, httpsContext, log)(materializer)) + adaptTupleFlow(delegate.superPool[T](settings, httpsContext, log)(materializer)) /** * Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's @@ -439,11 +430,10 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * the future will be completed with an error. */ def singleRequest(request: HttpRequest, - options: JIterable[Inet.SocketOption], settings: ConnectionPoolSettings, httpsContext: Option[HttpsContext], log: LoggingAdapter, materializer: FlowMaterializer): Future[HttpResponse] = - delegate.singleRequest(request.asScala, immutableSeq(options), settings, httpsContext, log)(materializer) + delegate.singleRequest(request.asScala, settings, httpsContext, log)(materializer) /** * Triggers an orderly shutdown of all host connections pools currently maintained by the [[ActorSystem]]. diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index 264b48dcaf..8277d840ad 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -50,15 +50,14 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * If no ``port`` is explicitly given (or the port value is negative) the protocol's default port will be used, * which is 80 for HTTP and 443 for HTTPS. */ - def bind(interface: String, port: Int = -1, backlog: Int = 100, - options: immutable.Traversable[Inet.SocketOption] = Nil, + def bind(interface: String, port: Int = -1, settings: ServerSettings = ServerSettings(system), httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] = { val effectivePort = if (port >= 0) port else if (httpsContext.isEmpty) 80 else 443 val tlsStage = sslTlsStage(httpsContext, Server) val connections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] = - Tcp().bind(interface, effectivePort, backlog, options, settings.timeouts.idleTimeout) + Tcp().bind(interface, effectivePort, settings.backlog, settings.socketOptions, settings.timeouts.idleTimeout) connections.map { case Tcp.IncomingConnection(localAddress, remoteAddress, flow) ⇒ val layer = serverLayer(settings, log) @@ -77,12 +76,11 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * present a DoS risk! */ def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, Any], - interface: String, port: Int = -1, backlog: Int = 100, - options: immutable.Traversable[Inet.SocketOption] = Nil, + interface: String, port: Int = -1, settings: ServerSettings = ServerSettings(system), httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = - bind(interface, port, backlog, options, settings, httpsContext, log).to { + bind(interface, port, settings, httpsContext, log).to { Sink.foreach { incomingConnection ⇒ try incomingConnection.flow.joinMat(handler)(Keep.both).run() catch { @@ -102,12 +100,11 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * present a DoS risk! */ def bindAndHandleSync(handler: HttpRequest ⇒ HttpResponse, - interface: String, port: Int = -1, backlog: Int = 100, - options: immutable.Traversable[Inet.SocketOption] = Nil, + interface: String, port: Int = -1, settings: ServerSettings = ServerSettings(system), httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = - bindAndHandle(Flow[HttpRequest].map(handler), interface, port, backlog, options, settings, httpsContext, log) + bindAndHandle(Flow[HttpRequest].map(handler), interface, port, settings, httpsContext, log) /** * Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler`` @@ -118,13 +115,12 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * present a DoS risk! */ def bindAndHandleAsync(handler: HttpRequest ⇒ Future[HttpResponse], - interface: String, port: Int = -1, backlog: Int = 100, - options: immutable.Traversable[Inet.SocketOption] = Nil, + interface: String, port: Int = -1, settings: ServerSettings = ServerSettings(system), httpsContext: Option[HttpsContext] = None, parallelism: Int = 1, log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = - bindAndHandle(Flow[HttpRequest].mapAsync(parallelism)(handler), interface, port, backlog, options, settings, httpsContext, log) + bindAndHandle(Flow[HttpRequest].mapAsync(parallelism)(handler), interface, port, settings, httpsContext, log) /** * The type of the server-side HTTP layer as a stand-alone BidiStage @@ -160,10 +156,9 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E */ def outgoingConnection(host: String, port: Int = 80, localAddress: Option[InetSocketAddress] = None, - options: immutable.Traversable[Inet.SocketOption] = Nil, settings: ClientConnectionSettings = ClientConnectionSettings(system), log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = - _outgoingConnection(host, port, localAddress, options, settings, None, log) + _outgoingConnection(host, port, localAddress, settings, None, log) /** * Same as [[outgoingConnection]] but for encrypted (HTTPS) connections. @@ -173,21 +168,19 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E */ def outgoingConnectionTls(host: String, port: Int = 443, localAddress: Option[InetSocketAddress] = None, - options: immutable.Traversable[Inet.SocketOption] = Nil, settings: ClientConnectionSettings = ClientConnectionSettings(system), httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = - _outgoingConnection(host, port, localAddress, options, settings, effectiveHttpsContext(httpsContext), log) + _outgoingConnection(host, port, localAddress, settings, effectiveHttpsContext(httpsContext), log) private def _outgoingConnection(host: String, port: Int, localAddress: Option[InetSocketAddress], - options: immutable.Traversable[Inet.SocketOption], settings: ClientConnectionSettings, httpsContext: Option[HttpsContext], log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = { val remoteAddr = new InetSocketAddress(host, port) val layer = clientLayer(remoteAddr, settings, log) val tlsStage = sslTlsStage(httpsContext, Client) val transportFlow = Tcp().outgoingConnection(remoteAddr, localAddress, - options, settings.connectingTimeout, settings.idleTimeout) + settings.socketOptions, settings.connectingTimeout, settings.idleTimeout) layer.atop(tlsStage).joinMat(transportFlow) { (_, tcpConnFuture) ⇒ import system.dispatcher @@ -238,10 +231,9 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * object of type ``T`` from the application which is emitted together with the corresponding response. */ def newHostConnectionPool[T](host: String, port: Int = 80, - options: immutable.Traversable[Inet.SocketOption] = Nil, settings: ConnectionPoolSettings = ConnectionPoolSettings(system), log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { - val cps = ConnectionPoolSetup(options, settings, None, log) + val cps = ConnectionPoolSetup(settings, None, log) newHostConnectionPool(HostConnectionPoolSetup(host, port, cps)) } @@ -252,11 +244,10 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * for encryption on the connections. */ def newHostConnectionPoolTls[T](host: String, port: Int = 443, - options: immutable.Traversable[Inet.SocketOption] = Nil, settings: ConnectionPoolSettings = ConnectionPoolSettings(system), httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { - val cps = ConnectionPoolSetup(options, settings, effectiveHttpsContext(httpsContext), log) + val cps = ConnectionPoolSetup(settings, effectiveHttpsContext(httpsContext), log) newHostConnectionPool(HostConnectionPoolSetup(host, port, cps)) } @@ -298,10 +289,9 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * object of type ``T`` from the application which is emitted together with the corresponding response. */ def cachedHostConnectionPool[T](host: String, port: Int = 80, - options: immutable.Traversable[Inet.SocketOption] = Nil, settings: ConnectionPoolSettings = ConnectionPoolSettings(system), log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { - val cps = ConnectionPoolSetup(options, settings, None, log) + val cps = ConnectionPoolSetup(settings, None, log) val setup = HostConnectionPoolSetup(host, port, cps) cachedHostConnectionPool(setup) } @@ -313,11 +303,10 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * for encryption on the connections. */ def cachedHostConnectionPoolTls[T](host: String, port: Int = 80, - options: immutable.Traversable[Inet.SocketOption] = Nil, settings: ConnectionPoolSettings = ConnectionPoolSettings(system), httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { - val cps = ConnectionPoolSetup(options, settings, effectiveHttpsContext(httpsContext), log) + val cps = ConnectionPoolSetup(settings, effectiveHttpsContext(httpsContext), log) val setup = HostConnectionPoolSetup(host, port, cps) cachedHostConnectionPool(setup) } @@ -357,11 +346,10 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * In order to allow for easy response-to-request association the flow takes in a custom, opaque context * object of type ``T`` from the application which is emitted together with the corresponding response. */ - def superPool[T](options: immutable.Traversable[Inet.SocketOption] = Nil, - settings: ConnectionPoolSettings = ConnectionPoolSettings(system), + def superPool[T](settings: ConnectionPoolSettings = ConnectionPoolSettings(system), httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] = - clientFlow[T](settings) { request ⇒ request -> cachedGateway(request, options, settings, httpsContext, log) } + clientFlow[T](settings) { request ⇒ request -> cachedGateway(request, settings, httpsContext, log) } /** * Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's @@ -373,12 +361,11 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * Note that the request must have an absolute URI, otherwise the future will be completed with an error. */ def singleRequest(request: HttpRequest, - options: immutable.Traversable[Inet.SocketOption] = Nil, settings: ConnectionPoolSettings = ConnectionPoolSettings(system), httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[HttpResponse] = try { - val gatewayFuture = cachedGateway(request, options, settings, httpsContext, log) + val gatewayFuture = cachedGateway(request, settings, httpsContext, log) gatewayFuture.flatMap(_(request))(fm.executionContext) } catch { case e: IllegalUriException ⇒ FastFuture.failed(e) @@ -425,12 +412,12 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E // every ActorSystem maintains its own connection pools private[this] val hostPoolCache = new ConcurrentHashMap[HostConnectionPoolSetup, Future[PoolGateway]] - private def cachedGateway(request: HttpRequest, options: immutable.Traversable[Inet.SocketOption], + private def cachedGateway(request: HttpRequest, settings: ConnectionPoolSettings, httpsContext: Option[HttpsContext], log: LoggingAdapter)(implicit fm: FlowMaterializer): Future[PoolGateway] = if (request.uri.scheme.nonEmpty && request.uri.authority.nonEmpty) { val httpsCtx = if (request.uri.scheme.equalsIgnoreCase("https")) effectiveHttpsContext(httpsContext) else None - val setup = ConnectionPoolSetup(options, settings, httpsCtx, log) + val setup = ConnectionPoolSetup(settings, httpsCtx, log) val host = request.uri.authority.host.toString() val hcps = HostConnectionPoolSetup(host, request.uri.effectivePort, setup) cachedGateway(hcps) diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala index 42f4228753..50c0c3dee2 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala @@ -275,7 +275,7 @@ class ConnectionPoolSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = O ccSettings: ClientConnectionSettings = ClientConnectionSettings(system)) = { val settings = ConnectionPoolSettings(maxConnections, maxRetries, maxOpenRequests, pipeliningLimit, idleTimeout, ClientConnectionSettings(system)) - flowTestBench(Http().cachedHostConnectionPool[T](serverHostName, serverPort, Nil, settings)) + flowTestBench(Http().cachedHostConnectionPool[T](serverHostName, serverPort, settings)) } def superPool[T](maxConnections: Int = 2, @@ -286,7 +286,7 @@ class ConnectionPoolSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = O ccSettings: ClientConnectionSettings = ClientConnectionSettings(system)) = { val settings = ConnectionPoolSettings(maxConnections, maxRetries, maxOpenRequests, pipeliningLimit, idleTimeout, ClientConnectionSettings(system)) - flowTestBench(Http().superPool[T](Nil, settings)) + flowTestBench(Http().superPool[T](settings)) } def flowTestBench[T, Mat](poolFlow: Flow[(HttpRequest, T), (Try[HttpResponse], T), Mat]) = { From 6c14af6859fe96509b7524dc356b13b952cd0ef0 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Fri, 22 May 2015 14:12:10 +0200 Subject: [PATCH 2/2] !htc #17512 `max-connections` setting to limit connections accepted with bindAndHandle concurrently --- .../src/main/resources/reference.conf | 7 +++ .../main/scala/akka/http/ServerSettings.scala | 9 ++-- .../akka/http/impl/util/StreamUtils.scala | 42 +++++++++++++++++- .../main/scala/akka/http/scaladsl/Http.scala | 28 +++++++----- .../akka/http/scaladsl/ClientServerSpec.scala | 44 ++++++++++++++++++- 5 files changed, 115 insertions(+), 15 deletions(-) diff --git a/akka-http-core/src/main/resources/reference.conf b/akka-http-core/src/main/resources/reference.conf index cb24db39f8..2173c7ec21 100644 --- a/akka-http-core/src/main/resources/reference.conf +++ b/akka-http-core/src/main/resources/reference.conf @@ -22,6 +22,13 @@ akka.http { # Set to `infinite` to disable. bind-timeout = 1s + # The maximum number of concurrently accepted connections when using the + # `Http().bindAndHandle` methods. + # + # This setting doesn't apply to the `Http().bind` method which will still + # deliver an unlimited backpressured stream of incoming connections. + max-connections = 1024 + # Enables/disables the addition of a `Remote-Address` header # holding the clients (remote) IP address. remote-address-header = off diff --git a/akka-http-core/src/main/scala/akka/http/ServerSettings.scala b/akka-http-core/src/main/scala/akka/http/ServerSettings.scala index 4d356ae268..baba8b8152 100644 --- a/akka-http-core/src/main/scala/akka/http/ServerSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/ServerSettings.scala @@ -22,6 +22,7 @@ import akka.http.scaladsl.model.headers.{ Host, Server } final case class ServerSettings( serverHeader: Option[Server], timeouts: ServerSettings.Timeouts, + maxConnections: Int, remoteAddressHeader: Boolean, rawRequestUriHeader: Boolean, transparentHeadRequests: Boolean, @@ -32,14 +33,15 @@ final case class ServerSettings( defaultHostHeader: Host, parserSettings: ParserSettings) { - require(0 <= responseHeaderSizeHint, "response-size-hint must be > 0") - require(0 <= backlog, "backlog must be > 0") + require(0 < maxConnections, "max-connections must be > 0") + require(0 < responseHeaderSizeHint, "response-size-hint must be > 0") + require(0 < backlog, "backlog must be > 0") } object ServerSettings extends SettingsCompanion[ServerSettings]("akka.http.server") { final case class Timeouts(idleTimeout: Duration, bindTimeout: FiniteDuration) { - require(bindTimeout >= Duration.Zero, "bindTimeout must be > 0") + require(bindTimeout > Duration.Zero, "bindTimeout must be > 0") } implicit def timeoutsShortcut(s: ServerSettings): Timeouts = s.timeouts @@ -48,6 +50,7 @@ object ServerSettings extends SettingsCompanion[ServerSettings]("akka.http.serve Timeouts( c getPotentiallyInfiniteDuration "idle-timeout", c getFiniteDuration "bind-timeout"), + c getInt "max-connections", c getBoolean "remote-address-header", c getBoolean "raw-request-uri-header", c getBoolean "transparent-head-requests", diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala index fbfe1fc470..123240fd9f 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala @@ -12,7 +12,7 @@ import akka.stream.scaladsl.FlexiMerge._ import org.reactivestreams.{ Subscription, Processor, Subscriber, Publisher } import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable -import scala.concurrent.{ ExecutionContext, Future } +import scala.concurrent.{ Promise, ExecutionContext, Future } import akka.util.ByteString import akka.http.scaladsl.model.RequestEntity import akka.stream._ @@ -286,6 +286,46 @@ private[http] object StreamUtils { } () ⇒ stage } + + /** + * Returns a no-op flow that materialized to a future that will be completed when the flow gets a + * completion or error signal. It doesn't necessarily mean, though, that all of a streaming pipeline + * is finished, only that the part that contains this flow has finished work. + */ + def identityFinishReporter[T]: Flow[T, T, Future[Unit]] = { + // copy from Sink.foreach + def newForeachStage(): (PushStage[T, T], Future[Unit]) = { + val promise = Promise[Unit]() + + val stage = new PushStage[T, T] { + override def onPush(elem: T, ctx: Context[T]): SyncDirective = ctx.push(elem) + + override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = { + promise.failure(cause) + ctx.fail(cause) + } + + override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { + promise.success(()) + ctx.finish() + } + + override def onDownstreamFinish(ctx: Context[T]): TerminationDirective = { + promise.success(()) + ctx.finish() + } + + override def decide(cause: Throwable): Supervision.Directive = { + // supervision will be implemented by #16916 + promise.tryFailure(cause) + super.decide(cause) + } + } + + (stage, promise.future) + } + Flow[T].transformMaterializing(newForeachStage) + } } /** diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index 8277d840ad..40b6b86de5 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -8,6 +8,7 @@ import java.net.InetSocketAddress import java.util.concurrent.ConcurrentHashMap import java.util.{ Collection ⇒ JCollection } import javax.net.ssl.{ SSLParameters, SSLContext } +import akka.http.impl.util.StreamUtils import akka.japi import com.typesafe.config.Config import scala.util.Try @@ -79,17 +80,24 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E interface: String, port: Int = -1, settings: ServerSettings = ServerSettings(system), httpsContext: Option[HttpsContext] = None, - log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = - bind(interface, port, settings, httpsContext, log).to { - Sink.foreach { incomingConnection ⇒ - try incomingConnection.flow.joinMat(handler)(Keep.both).run() - catch { - case NonFatal(e) ⇒ - log.error(e, "Could not materialize handling flow for {}", incomingConnection) - throw e - } + log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = { + def handleOneConnection(incomingConnection: IncomingConnection): Future[Unit] = + try + incomingConnection.flow + .viaMat(StreamUtils.identityFinishReporter)(Keep.right) + .joinMat(handler)(Keep.left) + .run() + catch { + case NonFatal(e) ⇒ + log.error(e, "Could not materialize handling flow for {}", incomingConnection) + throw e } - }.run() + + bind(interface, port, settings, httpsContext, log) + .mapAsyncUnordered(settings.maxConnections)(handleOneConnection) + .to(Sink.ignore) + .run() + } /** * Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler`` diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index 12eba85673..7be5fa76f3 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -10,7 +10,7 @@ import akka.http.scaladsl.Http.ServerBinding import akka.http.{ ClientConnectionSettings, ServerSettings } import com.typesafe.config.{ Config, ConfigFactory } import scala.annotation.tailrec -import scala.concurrent.{ Future, Await } +import scala.concurrent.{ Promise, Future, Await } import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } import akka.actor.ActorSystem @@ -24,6 +24,8 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers._ import akka.http.impl.util._ +import scala.util.Success + class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { val testConf: Config = ConfigFactory.parseString(""" akka.loggers = ["akka.testkit.TestEventListener"] @@ -88,6 +90,46 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { Await.result(b1.unbind(), 1.second) } + "prevent more than the configured number of max-connections with bindAndHandle" in { + val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() + val settings = ServerSettings(system).copy(maxConnections = 1) + + val receivedSlow = Promise[Long]() + val receivedFast = Promise[Long]() + + def handle(req: HttpRequest): Future[HttpResponse] = { + req.uri.path.toString match { + case "/slow" ⇒ + receivedSlow.complete(Success(System.nanoTime())) + akka.pattern.after(1.seconds, system.scheduler)(Future.successful(HttpResponse())) + case "/fast" ⇒ + receivedFast.complete(Success(System.nanoTime())) + Future.successful(HttpResponse()) + } + } + + val binding = Http().bindAndHandleAsync(handle, hostname, port, settings = settings) + val b1 = Await.result(binding, 3.seconds) + + def runRequest(uri: Uri): Unit = + Http().outgoingConnection(hostname, port) + .runWith(Source.single(HttpRequest(uri = uri)), Sink.head) + + runRequest("/slow") + + // wait until first request was received (but not yet answered) + val slowTime = Await.result(receivedSlow.future, 2.second) + + // should be blocked by the slow connection still being open + runRequest("/fast") + + val fastTime = Await.result(receivedFast.future, 2.second) + val diff = fastTime - slowTime + diff should be > 1000000000L // the diff must be at least the time to complete the first request and to close the first connection + + Await.result(b1.unbind(), 1.second) + } + "log materialization errors in `bindAndHandle`" which { "are triggered in `transform`" in { val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()