diff --git a/akka-http-core/src/main/resources/reference.conf b/akka-http-core/src/main/resources/reference.conf index 717c0f77cb..3e16de49eb 100644 --- a/akka-http-core/src/main/resources/reference.conf +++ b/akka-http-core/src/main/resources/reference.conf @@ -107,6 +107,42 @@ akka.http { parsing = ${akka.http.parsing} } + host-connection-pool { + # The maximum number of parallel connections that a connection pool to a + # single host endpoint is allowed to establish. Must be greater than zero. + max-connections = 4 + + # The maximum number of times failed requests are attempted again, + # (if the request can be safely retried) before giving up and returning an error. + max-retries = 5 + + # The maximum number of open requests accepted into the pool across all + # materializations of any of its client flows. + # Protects against (accidentally) overloading a single pool with too many client flow materializations. + # Note that with N concurrent materializations the max number of open request in the pool + # will never exceed N * max-connections * pipelining-limit. + # Must be a power of 2 and > 0! + max-open-requests = 32 + + # The maximum number of requests that are dispatched to the target host in + # batch-mode across a single connection (HTTP pipelining). + # A setting of 1 disables HTTP pipelining, since only one request per + # connection can be "in flight" at any time. + # Set to higher values to enable HTTP pipelining. + # This value must be > 0. + # (Note that, independently of this setting, pipelining will never be done + # on a connection that still has a non-idempotent request in flight. + # See http://tools.ietf.org/html/rfc7230#section-6.3.2 for more info.) + pipelining-limit = 1 + + # The time after which an idle connection pool (without pending requests) + # will automatically terminate itself. Set to `infinite` to completely disable idle timeouts. + idle-timeout = 30 s + + # Modify to tweak client settings for host connection pools only. + client = ${akka.http.client} + } + # The (default) configuration of the HTTP message parser for the server and the client. # IMPORTANT: These settings (i.e. children of `akka.http.parsing`) can't be directly # overridden in `application.conf` to change the parser settings for client and server diff --git a/akka-http-core/src/main/scala/akka/http/Http.scala b/akka-http-core/src/main/scala/akka/http/Http.scala index f7707ac3f0..8e0e543bc8 100644 --- a/akka-http-core/src/main/scala/akka/http/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/Http.scala @@ -5,9 +5,13 @@ package akka.http import java.net.InetSocketAddress +import java.util.concurrent.ConcurrentHashMap +import akka.http.util.FastFuture import com.typesafe.config.Config +import scala.util.control.NonFatal +import scala.util.Try import scala.collection.immutable -import scala.concurrent.Future +import scala.concurrent.{ ExecutionContext, Promise, Future } import akka.event.LoggingAdapter import akka.util.ByteString import akka.io.Inet @@ -15,7 +19,7 @@ import akka.stream.FlowMaterializer import akka.stream.scaladsl._ import akka.http.engine.client._ import akka.http.engine.server._ -import akka.http.model.{ HttpResponse, HttpRequest } +import akka.http.model._ import akka.actor._ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.Extension { @@ -33,21 +37,17 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E */ def bind(interface: String, port: Int = 80, backlog: Int = 100, options: immutable.Traversable[Inet.SocketOption] = Nil, - settings: Option[ServerSettings] = None, + settings: ServerSettings = ServerSettings(system), log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] = { val endpoint = new InetSocketAddress(interface, port) - val effectiveSettings = ServerSettings(settings) - val connections: Source[StreamTcp.IncomingConnection, Future[StreamTcp.ServerBinding]] = - StreamTcp().bind(endpoint, backlog, options, effectiveSettings.timeouts.idleTimeout) - + StreamTcp().bind(endpoint, backlog, options, settings.timeouts.idleTimeout) connections.map { case StreamTcp.IncomingConnection(localAddress, remoteAddress, flow) ⇒ - val layer = serverLayer(effectiveSettings, log) + val layer = serverLayer(settings, log) IncomingConnection(localAddress, remoteAddress, layer join flow) - }.mapMaterialized { tcpBindingFuture ⇒ - import system.dispatcher - tcpBindingFuture.map { tcpBinding ⇒ ServerBinding(tcpBinding.localAddress)(() ⇒ tcpBinding.unbind()) } + }.mapMaterialized { + _.map(tcpBinding ⇒ ServerBinding(tcpBinding.localAddress)(() ⇒ tcpBinding.unbind()))(fm.executionContext) } } @@ -62,7 +62,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _], interface: String, port: Int = 80, backlog: Int = 100, options: immutable.Traversable[Inet.SocketOption] = Nil, - settings: Option[ServerSettings] = None, + settings: ServerSettings = ServerSettings(system), log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = bind(interface, port, backlog, options, settings, log).to { Sink.foreach { _.flow.join(handler).run() } @@ -79,7 +79,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E def bindAndHandleSync(handler: HttpRequest ⇒ HttpResponse, interface: String, port: Int = 80, backlog: Int = 100, options: immutable.Traversable[Inet.SocketOption] = Nil, - settings: Option[ServerSettings] = None, + settings: ServerSettings = ServerSettings(system), log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = bindAndHandle(Flow[HttpRequest].map(handler), interface, port, backlog, options, settings, log) @@ -94,7 +94,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E def bindAndHandleAsync(handler: HttpRequest ⇒ Future[HttpResponse], interface: String, port: Int = 80, backlog: Int = 100, options: immutable.Traversable[Inet.SocketOption] = Nil, - settings: Option[ServerSettings] = None, + settings: ServerSettings = ServerSettings(system), log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = bindAndHandle(Flow[HttpRequest].mapAsync(1, handler), interface, port, backlog, options, settings, log) @@ -131,14 +131,13 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E def outgoingConnection(host: String, port: Int = 80, localAddress: Option[InetSocketAddress] = None, options: immutable.Traversable[Inet.SocketOption] = Nil, - settings: Option[ClientConnectionSettings] = None, + settings: ClientConnectionSettings = ClientConnectionSettings(system), log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = { - val effectiveSettings = ClientConnectionSettings(settings) val remoteAddr = new InetSocketAddress(host, port) - val layer = clientLayer(remoteAddr, effectiveSettings, log) + val layer = clientLayer(remoteAddr, settings, log) val transportFlow = StreamTcp().outgoingConnection(remoteAddr, localAddress, - options, effectiveSettings.connectingTimeout, effectiveSettings.idleTimeout) + options, settings.connectingTimeout, settings.idleTimeout) layer.joinMat(transportFlow) { (_, tcpConnFuture) ⇒ import system.dispatcher @@ -173,6 +172,211 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E settings: ClientConnectionSettings, log: LoggingAdapter = system.log): ClientLayer = BidiFlow.wrap(OutgoingConnectionBlueprint(remoteAddress, settings, log)) + + /** + * Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches + * the requests from all its materializations across this pool. + * While the started host connection pool internally shuts itself down automatically after the configured idle + * timeout it will spin itself up again if more requests arrive from an existing or a new client flow + * materialization. The returned flow therefore remains usable for the full lifetime of the application. + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests A and B enter the flow in that order the response for B might be produced before the + * response for A. + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type ``T`` from the application which is emitted together with the corresponding response. + */ + def newHostConnectionPool[T](host: String, port: Int = 80, + options: immutable.Traversable[Inet.SocketOption] = Nil, + settings: ConnectionPoolSettings = ConnectionPoolSettings(system), + log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { + val cps = ConnectionPoolSetup(encrypted = false, options, settings, log) + val setup = HostConnectionPoolSetup(host, port, cps) + newHostConnectionPool(setup) + } + + /** + * Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches + * the requests from all its materializations across this pool. + * While the started host connection pool internally shuts itself down automatically after the configured idle + * timeout it will spin itself up again if more requests arrive from an existing or a new client flow + * materialization. The returned flow therefore remains usable for the full lifetime of the application. + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests A and B enter the flow in that order the response for B might be produced before the + * response for A. + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type ``T`` from the application which is emitted together with the corresponding response. + */ + def newHostConnectionPool[T](setup: HostConnectionPoolSetup)( + implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { + val gatewayFuture = FastFuture.successful(new PoolGateway(setup, Promise())) + gatewayClientFlow(setup, gatewayFuture) + } + + /** + * Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing + * HTTP connections to the given target host endpoint. For every ActorSystem, target host and pool + * configuration a separate connection pool is maintained. + * The HTTP layer transparently manages idle shutdown and restarting of connections pools as configured. + * The returned [[Flow]] instances therefore remain valid throughout the lifetime of the application. + * + * The internal caching logic guarantees that there will never be more than a single pool running for the + * given target host endpoint and configuration (in this ActorSystem). + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests A and B enter the flow in that order the response for B might be produced before the + * response for A. + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type ``T`` from the application which is emitted together with the corresponding response. + */ + def cachedHostConnectionPool[T](host: String, port: Int = 80, + options: immutable.Traversable[Inet.SocketOption] = Nil, + settings: ConnectionPoolSettings = ConnectionPoolSettings(system), + log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { + val cps = ConnectionPoolSetup(encrypted = false, options, settings, log) + val setup = HostConnectionPoolSetup(host, port, cps) + cachedHostConnectionPool(setup) + } + + /** + * Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing + * HTTP connections to the given target host endpoint. For every ActorSystem, target host and pool + * configuration a separate connection pool is maintained. + * The HTTP layer transparently manages idle shutdown and restarting of connections pools as configured. + * The returned [[Flow]] instances therefore remain valid throughout the lifetime of the application. + * + * The internal caching logic guarantees that there will never be more than a single pool running for the + * given target host endpoint and configuration (in this ActorSystem). + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests A and B enter the flow in that order the response for B might be produced before the + * response for A. + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type ``T`` from the application which is emitted together with the corresponding response. + */ + def cachedHostConnectionPool[T](setup: HostConnectionPoolSetup)( + implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + gatewayClientFlow(setup, cachedGateway(setup)) + + /** + * Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool + * depending on their respective effective URI. Note that incoming requests must have either an absolute URI or + * a valid `Host` header. + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests A and B enter the flow in that order the response for B might be produced before the + * response for A. + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type ``T`` from the application which is emitted together with the corresponding response. + */ + def superPool[T](options: immutable.Traversable[Inet.SocketOption] = Nil, + settings: ConnectionPoolSettings = ConnectionPoolSettings(system), + log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] = { + val setup = ConnectionPoolSetup(encrypted = false, options, settings, log) + clientFlow[T](settings) { request ⇒ + val absoluteRequest = request.withEffectiveUri(securedConnection = false) + val Uri.Authority(host, port, _) = absoluteRequest.uri.authority + val hcps = HostConnectionPoolSetup(host.toString(), port, setup) + val theHostHeader = hostHeader(hcps.host, port, absoluteRequest.uri.scheme) + val effectiveRequest = absoluteRequest.withDefaultHeaders(theHostHeader) + effectiveRequest -> cachedGateway(hcps) + } + } + + /** + * Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's + * effective URI to produce a response future. + * + * Note that the request must have either an absolute URI or a valid `Host` header, otherwise + * the future will be completed with an error. + */ + def singleRequest(request: HttpRequest, + options: immutable.Traversable[Inet.SocketOption] = Nil, + settings: ConnectionPoolSettings = ConnectionPoolSettings(system), + log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[HttpResponse] = + try { + val setup = ConnectionPoolSetup(encrypted = false, options, settings, log) + val effectiveRequest = request.withEffectiveUri(securedConnection = false) + val uri = effectiveRequest.uri + val hcps = HostConnectionPoolSetup(uri.authority.host.toString(), uri.effectivePort, setup) + cachedGateway(hcps).flatMap(_(effectiveRequest))(fm.executionContext) + } catch { + case e: IllegalUriException ⇒ FastFuture.failed(e) + } + + /** + * Triggers an orderly shutdown of all host connections pools currently maintained by the [[ActorSystem]]. + * The returned future is completed when all pools that were live at the time of this method call + * have completed their shutdown process. + * + * If existing pool client flows are re-used or new ones materialized concurrently with or after this + * method call the respective connection pools will be restarted and not contribute to the returned future. + */ + def shutdownAllConnectionPools(): Future[Unit] = { + import scala.collection.JavaConverters._ + import system.dispatcher + val gateways = hostPoolCache.values().asScala + system.log.info("Initiating orderly shutdown of all active host connections pools...") + Future.sequence(gateways.map(_.flatMap(_.shutdown()))).map(_ ⇒ ()) + } + + // every ActorSystem maintains its own connection pools + private[this] val hostPoolCache = new ConcurrentHashMap[HostConnectionPoolSetup, Future[PoolGateway]] + + private[http] def cachedGateway(setup: HostConnectionPoolSetup)(implicit fm: FlowMaterializer): Future[PoolGateway] = { + val gatewayPromise = Promise[PoolGateway]() + hostPoolCache.putIfAbsent(setup, gatewayPromise.future) match { + case null ⇒ // only one thread can get here at a time + val whenShuttingDown = Promise[Unit]() + val gateway = + try new PoolGateway(setup, whenShuttingDown) + catch { + case NonFatal(e) ⇒ + hostPoolCache.remove(setup) + gatewayPromise.failure(e) + throw e + } + val fastFuture = FastFuture.successful(gateway) + hostPoolCache.put(setup, fastFuture) // optimize subsequent gateway accesses + gatewayPromise.success(gateway) // satisfy everyone who got a hold of our promise while we were starting up + whenShuttingDown.future.onComplete(_ ⇒ hostPoolCache.remove(setup, fastFuture))(fm.executionContext) + fastFuture + + case future ⇒ future // return cached instance + } + } + + private def gatewayClientFlow[T](hcps: HostConnectionPoolSetup, + gatewayFuture: Future[PoolGateway])( + implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { + import hcps._ + val theHostHeader = hostHeader(host, port, Uri.httpScheme(setup.encrypted)) + clientFlow[T](setup.settings)(_.withDefaultHeaders(theHostHeader) -> gatewayFuture) + .mapMaterialized(_ ⇒ HostConnectionPool(hcps)(gatewayFuture)) + } + + private def clientFlow[T](settings: ConnectionPoolSettings)(f: HttpRequest ⇒ (HttpRequest, Future[PoolGateway]))( + implicit system: ActorSystem, fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] = { + // a connection pool can never have more than pipeliningLimit * maxConnections requests in flight at any point + val parallelism = settings.pipeliningLimit * settings.maxConnections + Flow[(HttpRequest, T)].mapAsyncUnordered(parallelism, { + case (request, userContext) ⇒ + val (effectiveRequest, gatewayFuture) = f(request) + val result = Promise[(Try[HttpResponse], T)]() // TODO: simplify to `transformWith` when on Scala 2.12 + gatewayFuture + .flatMap(_(effectiveRequest))(fm.executionContext) + .onComplete(responseTry ⇒ result.success(responseTry -> userContext))(fm.executionContext) + result.future + }) + } + + private def hostHeader(host: String, port: Int, scheme: String) = headers.Host(host, Uri.normalizePort(port, scheme)) } object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { @@ -229,6 +433,20 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { */ case class OutgoingConnection(localAddress: InetSocketAddress, remoteAddress: InetSocketAddress) + /** + * Represents a connection pool to a specific target host and pool configuration. + */ + case class HostConnectionPool(setup: HostConnectionPoolSetup)( + private[http] val gatewayFuture: Future[PoolGateway]) { // enable test access + + /** + * Asynchronously triggers the shutdown of the host connection pool. + * + * The produced [[Future]] is fulfilled when the shutdown has been completed. + */ + def shutdown()(implicit ec: ExecutionContext): Future[Unit] = gatewayFuture.flatMap(_.shutdown()) + } + //////////////////// EXTENSION SETUP /////////////////// def apply()(implicit system: ActorSystem): HttpExt = super.apply(system) diff --git a/akka-http-core/src/main/scala/akka/http/engine/client/ClientConnectionSettings.scala b/akka-http-core/src/main/scala/akka/http/engine/client/ClientConnectionSettings.scala index c438a5d4c7..d8e44794ac 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/client/ClientConnectionSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/client/ClientConnectionSettings.scala @@ -31,7 +31,4 @@ object ClientConnectionSettings extends SettingsCompanion[ClientConnectionSettin c getIntBytes "request-header-size-hint", ParserSettings fromSubConfig c.getConfig("parsing")) } - - def apply(optionalSettings: Option[ClientConnectionSettings])(implicit actorRefFactory: ActorRefFactory): ClientConnectionSettings = - optionalSettings getOrElse apply(actorSystem) } \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/engine/client/ConnectionPoolSettings.scala b/akka-http-core/src/main/scala/akka/http/engine/client/ConnectionPoolSettings.scala new file mode 100644 index 0000000000..edd3b7bfa7 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/engine/client/ConnectionPoolSettings.scala @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.engine.client + +import akka.event.LoggingAdapter +import akka.io.Inet +import com.typesafe.config.Config +import scala.collection.immutable +import scala.concurrent.duration.{ FiniteDuration, Duration } +import akka.http.util._ + +final case class HostConnectionPoolSetup(host: String, port: Int, setup: ConnectionPoolSetup) + +final case class ConnectionPoolSetup( + encrypted: Boolean, + options: immutable.Traversable[Inet.SocketOption], + settings: ConnectionPoolSettings, + log: LoggingAdapter) + +final case class ConnectionPoolSettings( + maxConnections: Int, + maxRetries: Int, + maxOpenRequests: Int, + pipeliningLimit: Int, + idleTimeout: Duration, + connectionSettings: ClientConnectionSettings) { + + require(maxConnections > 0, "max-connections must be > 0") + require(maxRetries >= 0, "max-retries must be >= 0") + require(maxOpenRequests > 0 && (maxOpenRequests & (maxOpenRequests - 1)) == 0, "max-open-requests must be a power of 2 > 0") + require(pipeliningLimit > 0, "pipelining-limit must be > 0") + require(idleTimeout >= Duration.Zero, "idleTimeout must be >= 0") +} + +object ConnectionPoolSettings extends SettingsCompanion[ConnectionPoolSettings]("akka.http.host-connection-pool") { + def fromSubConfig(c: Config) = { + apply( + c getInt "max-connections", + c getInt "max-retries", + c getInt "max-open-requests", + c getInt "pipelining-limit", + c getPotentiallyInfiniteDuration "idle-timeout", + ClientConnectionSettings fromSubConfig c.getConfig("client")) + } +} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/engine/client/PoolConductor.scala b/akka-http-core/src/main/scala/akka/http/engine/client/PoolConductor.scala new file mode 100644 index 0000000000..36eab274df --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/engine/client/PoolConductor.scala @@ -0,0 +1,215 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.engine.client + +import language.existentials +import scala.annotation.tailrec +import scala.collection.immutable +import akka.event.LoggingAdapter +import akka.stream.scaladsl._ +import akka.stream._ +import akka.http.model.HttpMethod +import akka.http.util._ + +private object PoolConductor { + import PoolFlow.RequestContext + import PoolSlot.{ SlotEvent, SimpleSlotEvent } + + case class Ports( + requestIn: Inlet[RequestContext], + slotEventIn: Inlet[SlotEvent], + slotOuts: immutable.Seq[Outlet[RequestContext]]) extends Shape { + + override val inlets = requestIn :: slotEventIn :: Nil + override def outlets = slotOuts + + override def deepCopy(): Shape = + Ports( + new Inlet(requestIn.toString), + new Inlet(slotEventIn.toString), + slotOuts.map(o ⇒ new Outlet(o.toString))) + + override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = + Ports( + inlets.head.asInstanceOf[Inlet[RequestContext]], + inlets.last.asInstanceOf[Inlet[SlotEvent]], + outlets.asInstanceOf[immutable.Seq[Outlet[RequestContext]]]) + } + + /* + Stream Setup + ============ +                                            Request-  + Request- +-----------+ +-----------+   Switch-   +-------------+    +-----------+  Context + Context |  retry | |  slot- |   Command  | doubler |    |  route  +--------------> + +--------->| Merge +---->| Selector +-------------->| (MapConcat) +---->|  (Flexi  +--------------> + | |  | |    | |    |  Route) +--------------> + +----+------+ +-----+-----+             +-------------+    +-----------+     to slots      +   ^     ^                                            +   |     | SimpleSlotEvent                           +   | Request-    |                                           + | Context +---------+ + +-------------+  retry |<-------- Slot Event (from slotEventMerge) + |  Split  | + +---------+ + + */ + def apply(slotCount: Int, maxRetries: Int, pipeliningLimit: Int, log: LoggingAdapter): Graph[Ports, Any] = + FlowGraph.partial() { implicit b ⇒ + import FlowGraph.Implicits._ + + // actually we want a `MergePreferred` here (and prefer the `retryInlet`), + // but as MergePreferred doesn't propagate completion on the secondary input we can't use it here + val retryMerge = b.add(new StreamUtils.EagerCloseMerge2[RequestContext]("PoolConductor.retryMerge")) + val slotSelector = b.add(new SlotSelector(slotCount, maxRetries, pipeliningLimit, log)) + val doubler = Flow[SwitchCommand].mapConcat(x ⇒ x :: x :: Nil) // work-around for https://github.com/akka/akka/issues/17004 + val route = b.add(new Route(slotCount)) + val retrySplit = b.add(new RetrySplit()) + + retryMerge.out ~> slotSelector.in0 + slotSelector.out ~> doubler ~> route.in + retrySplit.out0 ~> slotSelector.in1 + retrySplit.out1 ~> retryMerge.in1 + + Ports(retryMerge.in0, retrySplit.in, route.outlets.asInstanceOf[immutable.Seq[Outlet[RequestContext]]]) + } + + private case class SwitchCommand(rc: RequestContext, slotIx: Int) + + // the SlotSelector keeps the state of all slots as instances of this ADT + private sealed trait SlotState + + // the connection of the respective slot is not connected + private case object Unconnected extends SlotState + + // the connection of the respective slot is connected with no requests currently in flight + private case object Idle extends SlotState + + // the connection of the respective slot has a number of requests in flight and all of them + // are idempotent which allows more requests to be pipelined onto the connection if required + private final case class Loaded(openIdempotentRequests: Int) extends SlotState { require(openIdempotentRequests > 0) } + + // the connection of the respective slot has a number of requests in flight and the + // last one of these is not idempotent which blocks the connection for more pipelined requests + private case class Busy(openRequests: Int) extends SlotState { require(openRequests > 0) } + private object Busy extends Busy(1) + + private class SlotSelector(slotCount: Int, maxRetries: Int, pipeliningLimit: Int, log: LoggingAdapter) + extends FlexiMerge[SwitchCommand, FanInShape2[RequestContext, SimpleSlotEvent, SwitchCommand]]( + new FanInShape2("PoolConductor.SlotSelector"), OperationAttributes.name("PoolConductor.SlotSelector")) { + import FlexiMerge._ + + def createMergeLogic(s: FanInShape2[RequestContext, SimpleSlotEvent, SwitchCommand]): MergeLogic[SwitchCommand] = + new MergeLogic[SwitchCommand] { + val slotStates = Array.fill[SlotState](slotCount)(Unconnected) + def initialState = nextState(0) + override def initialCompletionHandling = eagerClose + + def nextState(currentSlot: Int): State[_] = { + val read: ReadCondition[_] = currentSlot match { + case -1 ⇒ Read(s.in1) // if we have no slot available we are not reading from upstream (only SlotEvents) + case _ ⇒ ReadAny(s) // otherwise we read SlotEvents *as well as* from upstream + } + State(read) { (ctx, inlet, element) ⇒ + element match { + case rc: RequestContext ⇒ + ctx.emit(SwitchCommand(rc, currentSlot)) + slotStates(currentSlot) = slotStateAfterDispatch(slotStates(currentSlot), rc.request.method) + case SlotEvent.RequestCompleted(slotIx) ⇒ + slotStates(slotIx) = slotStateAfterRequestCompleted(slotStates(slotIx)) + case SlotEvent.Disconnected(slotIx, failed) ⇒ + slotStates(slotIx) = slotStateAfterDisconnect(slotStates(slotIx), failed) + } + nextState(bestSlot()) + } + } + + def slotStateAfterDispatch(slotState: SlotState, method: HttpMethod): SlotState = + slotState match { + case Unconnected | Idle ⇒ if (method.isIdempotent) Loaded(1) else Busy(1) + case Loaded(n) ⇒ if (method.isIdempotent) Loaded(n + 1) else Busy(n + 1) + case Busy(_) ⇒ throw new IllegalStateException("Request scheduled onto busy connection?") + } + + def slotStateAfterRequestCompleted(slotState: SlotState): SlotState = + slotState match { + case Loaded(1) ⇒ Idle + case Loaded(n) ⇒ Loaded(n - 1) + case Busy(1) ⇒ Idle + case Busy(n) ⇒ Busy(n - 1) + case _ ⇒ throw new IllegalStateException(s"RequestCompleted on $slotState connection?") + } + + def slotStateAfterDisconnect(slotState: SlotState, failed: Int): SlotState = + slotState match { + case Idle if failed == 0 ⇒ Unconnected + case Loaded(n) if n > failed ⇒ Loaded(n - failed) + case Loaded(n) if n == failed ⇒ Unconnected + case Busy(n) if n > failed ⇒ Busy(n - failed) + case Busy(n) if n == failed ⇒ Unconnected + case _ ⇒ throw new IllegalStateException(s"Disconnect(_, $failed) on $slotState connection?") + } + + /** + * Implements the following Connection Slot selection strategy + * - Select the first idle connection in the pool, if there is one. + * - If none is idle select the first unconnected connection, if there is one. + * - If all are loaded select the connection with the least open requests (< pipeliningLimit) + * that only has requests with idempotent methods scheduled to it, if there is one. + * - Otherwise return -1 (which applies back-pressure to the request source) + * + * See http://tools.ietf.org/html/rfc7230#section-6.3.2 for more info on HTTP pipelining. + */ + @tailrec def bestSlot(ix: Int = 0, bestIx: Int = -1, bestState: SlotState = Busy): Int = + if (ix < slotStates.length) { + val pl = pipeliningLimit + slotStates(ix) -> bestState match { + case (Idle, _) ⇒ ix + case (Unconnected, Loaded(_) | Busy) ⇒ bestSlot(ix + 1, ix, Unconnected) + case (x @ Loaded(a), Loaded(b)) if a < b ⇒ bestSlot(ix + 1, ix, x) + case (x @ Loaded(a), Busy) if a < pl ⇒ bestSlot(ix + 1, ix, x) + case _ ⇒ bestSlot(ix + 1, bestIx, bestState) + } + } else bestIx + } + } + + private class Route(slotCount: Int) extends FlexiRoute[SwitchCommand, UniformFanOutShape[SwitchCommand, RequestContext]]( + new UniformFanOutShape(slotCount, "PoolConductor.Route"), OperationAttributes.name("PoolConductor.Route")) { + import FlexiRoute._ + + def createRouteLogic(s: UniformFanOutShape[SwitchCommand, RequestContext]): RouteLogic[SwitchCommand] = + new RouteLogic[SwitchCommand] { + val initialState: State[_] = State(DemandFromAny(s)) { + case (_, _, SwitchCommand(req, slotIx)) ⇒ + State(DemandFrom(s.out(slotIx))) { (ctx, out, _) ⇒ + ctx.emit(out)(req) + initialState + } + } + override def initialCompletionHandling = CompletionHandling( + onUpstreamFinish = ctx ⇒ { ctx.finish(); SameState }, + onUpstreamFailure = (ctx, cause) ⇒ { ctx.fail(cause); SameState }, + onDownstreamFinish = (ctx, _) ⇒ SameState) + } + } + + // FIXME: remove when #17038 is cleared + private class RetrySplit extends FlexiRoute[SlotEvent, FanOutShape2[SlotEvent, SimpleSlotEvent, RequestContext]]( + new FanOutShape2("PoolConductor.RetrySplit"), OperationAttributes.name("PoolConductor.RetrySplit")) { + import FlexiRoute._ + + def createRouteLogic(s: FanOutShape2[SlotEvent, SimpleSlotEvent, RequestContext]): RouteLogic[SlotEvent] = + new RouteLogic[SlotEvent] { + def initialState: State[_] = State(DemandFromAll(s)) { (ctx, _, ev) ⇒ + ev match { + case x: SimpleSlotEvent ⇒ ctx.emit(s.out0)(x) + case SlotEvent.RetryRequest(rc) ⇒ ctx.emit(s.out1)(rc) + } + SameState + } + } + } +} diff --git a/akka-http-core/src/main/scala/akka/http/engine/client/PoolFlow.scala b/akka-http-core/src/main/scala/akka/http/engine/client/PoolFlow.scala new file mode 100644 index 0000000000..20df591906 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/engine/client/PoolFlow.scala @@ -0,0 +1,90 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.engine.client + +import java.net.InetSocketAddress +import scala.concurrent.{ Promise, Future } +import scala.util.Try +import akka.event.LoggingAdapter +import akka.actor._ +import akka.stream.FlowMaterializer +import akka.stream.scaladsl._ +import akka.http.model._ +import akka.http.Http + +private object PoolFlow { + + case class RequestContext(request: HttpRequest, responsePromise: Promise[HttpResponse], retriesLeft: Int) { + require(retriesLeft >= 0) + } + case class ResponseContext(rc: RequestContext, response: Try[HttpResponse]) + + /* + Pool Flow Stream Setup + ====================== +                                  +-------------------+                              +                                      |                   |                              +                               +----> | Connection Slot 1 +---->                         +                               |      |                   |    |                         +                               |      +---+---------------+    |                         +                               |          |                    |                         +              +-----------+    |      +-------------------+    |      +---------------+ + RequestContext   |           +----+      |                   |    +----> |               | ResponseContext + +----------------> | Conductor |---------> | Connection Slot 2 +---------> | responseMerge +------------------> +              |           +----+      |                   |    +----> |               | +              +-----------+    |      +---------+---------+    |      +---------------+ +                    ^         |          |     |              |                         +                    |         |      +-------------------+    |                         +          |         |      |                   |    |                         +            SlotEvent |         +----> | Connection Slot 3 +---->                         +                    |                |                   |                              +                    |                +---------------+---+                              +               |    |     |     |                                     + +-----------+ SlotEvent |     |     |  +              | slotEvent | <-------------+     |     | +              |  Merge  | <-------------------+     |                                   +              |           | <-------------------------+                                   + +-----------+ + + Conductor: + - Maintains slot state overview by running a simple state machine per Connection Slot + - Decides which slot will receive the next request from upstream according to current slot state and dispatch configuration + - Forwards demand from selected slot to upstream + - Always maintains demand for SlotEvents from the Connection Slots + - Implemented as a sub-graph + + Connection Slot: + - Wraps a low-level outgoing connection flow and (re-)materializes and uses it whenever necessary + - Directly forwards demand from the underlying connection to the Conductor + - Dispatches SlotEvents to the Conductor (via the SlotEventMerge) + - Implemented as a sub-graph + + Response Merge: + - Simple merge of the Connection Slots' outputs + + */ + def apply(connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]], + remoteAddress: InetSocketAddress, settings: ConnectionPoolSettings, log: LoggingAdapter)( + implicit system: ActorSystem, fm: FlowMaterializer): Flow[RequestContext, ResponseContext, Unit] = + Flow() { implicit b ⇒ + import settings._ + import FlowGraph.Implicits._ + + val conductor = b.add(PoolConductor(maxConnections, maxRetries, pipeliningLimit, log)) + val slots = Vector + .tabulate(maxConnections)(PoolSlot(_, connectionFlow, remoteAddress, settings)) + .map(b.add(_)) + val responseMerge = b.add(Merge[ResponseContext](maxConnections)) + val slotEventMerge = b.add(Merge[PoolSlot.SlotEvent](maxConnections)) + + slotEventMerge.out ~> conductor.slotEventIn + for ((slot, ix) ← slots.zipWithIndex) { + conductor.slotOuts(ix) ~> slot.in + slot.out0 ~> responseMerge.in(ix) + slot.out1 ~> slotEventMerge.in(ix) + } + (conductor.requestIn, responseMerge.out) + } +} diff --git a/akka-http-core/src/main/scala/akka/http/engine/client/PoolGateway.scala b/akka-http-core/src/main/scala/akka/http/engine/client/PoolGateway.scala new file mode 100644 index 0000000000..d210dfd68c --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/engine/client/PoolGateway.scala @@ -0,0 +1,87 @@ +package akka.http.engine.client + +import java.util.concurrent.atomic.AtomicReference +import scala.annotation.tailrec +import scala.concurrent.{ Future, Promise } +import akka.actor.{ Props, ActorSystem, ActorRef } +import akka.http.Http +import akka.http.model.{ HttpResponse, HttpRequest } +import akka.stream.FlowMaterializer + +private object PoolGateway { + + sealed trait State + final case class Running(interfaceActorRef: ActorRef, + shutdownStartedPromise: Promise[Unit], + shutdownCompletedPromise: Promise[Unit]) extends State + final case class IsShutdown(shutdownCompleted: Future[Unit]) extends State + final case class NewIncarnation(gatewayFuture: Future[PoolGateway]) extends State +} + +/** + * Manages access to a host connection pool or rather: a sequence of pool incarnations. + * + * A host connection pool for a given [[HostConnectionPoolSetup]] is a running stream, whose outside interface is + * provided by its [[PoolInterfaceActor]] actor. The actor accepts [[PoolInterfaceActor.PoolRequest]] messages + * and completes their `responsePromise` whenever the respective response has been received (or an error occurred). + * + * A [[PoolGateway]] provides a layer of indirection between the pool cache and the actual + * pools that is required to allow a pool incarnation to fully terminate (e.g. after an idle-timeout) + * and be transparently replaced by a new incarnation if required. + * Removal of cache entries for terminated pools is also supported, because old gateway references that + * get reused will automatically forward requests directed at them to the latest pool incarnation from the cache. + */ +private[http] class PoolGateway(hcps: HostConnectionPoolSetup, + _shutdownStartedPromise: Promise[Unit])( // constructor arg only + implicit system: ActorSystem, fm: FlowMaterializer) { + import PoolGateway._ + import fm.executionContext + + private val state = { + val shutdownCompletedPromise = Promise[Unit]() + val props = Props(new PoolInterfaceActor(hcps, shutdownCompletedPromise, this)) + val ref = system.actorOf(props, PoolInterfaceActor.name.next()) + new AtomicReference[State](Running(ref, _shutdownStartedPromise, shutdownCompletedPromise)) + } + + def currentState: Any = state.get() // enables test access + + def apply(request: HttpRequest, previousIncarnation: PoolGateway = null): Future[HttpResponse] = + state.get match { + case Running(ref, _, _) ⇒ + val responsePromise = Promise[HttpResponse]() + ref ! PoolInterfaceActor.PoolRequest(request, responsePromise) + responsePromise.future + + case IsShutdown(shutdownCompleted) ⇒ + // delay starting the next pool incarnation until the current pool has completed its shutdown + shutdownCompleted.flatMap { _ ⇒ + val newGatewayFuture = Http().cachedGateway(hcps) + // a simple set is fine here as `newGatewayFuture` will be identical for all threads getting here + state.set(NewIncarnation(newGatewayFuture)) + apply(request) + } + + case x @ NewIncarnation(newGatewayFuture) ⇒ + if (previousIncarnation != null) + previousIncarnation.state.set(x) // collapse incarnation chain + newGatewayFuture.flatMap(_(request, this)) + } + + // triggers a shutdown of the current pool, even if it is already a later incarnation + @tailrec final def shutdown(): Future[Unit] = + state.get match { + case x @ Running(ref, shutdownStartedPromise, shutdownCompletedPromise) ⇒ + if (state.compareAndSet(x, IsShutdown(shutdownCompletedPromise.future))) { + shutdownStartedPromise.success(()) // trigger cache removal + ref ! PoolInterfaceActor.Shutdown + shutdownCompletedPromise.future + } else shutdown() // CAS loop (not a spinlock) + + case IsShutdown(x) ⇒ x + + case NewIncarnation(newGatewayFuture) ⇒ newGatewayFuture.flatMap(_.shutdownAux()) + } + + private def shutdownAux() = shutdown() // alias required for @tailrec +} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/engine/client/PoolInterfaceActor.scala b/akka-http-core/src/main/scala/akka/http/engine/client/PoolInterfaceActor.scala new file mode 100644 index 0000000000..ff73f6d248 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/engine/client/PoolInterfaceActor.scala @@ -0,0 +1,147 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.engine.client + +import java.net.InetSocketAddress +import scala.annotation.tailrec +import scala.concurrent.Promise +import scala.concurrent.duration.FiniteDuration +import akka.actor.{ PoisonPill, DeadLetterSuppression, ActorLogging, Cancellable } +import akka.stream.FlowMaterializer +import akka.stream.actor.{ ActorPublisher, ActorSubscriber, ZeroRequestStrategy } +import akka.stream.actor.ActorPublisherMessage._ +import akka.stream.actor.ActorSubscriberMessage._ +import akka.stream.impl.FixedSizeBuffer +import akka.stream.scaladsl.{ Sink, Source } +import akka.http.model.{ HttpResponse, HttpRequest } +import akka.http.util.SeqActorName +import akka.http.Http +import PoolFlow._ + +private object PoolInterfaceActor { + final case class PoolRequest(request: HttpRequest, responsePromise: Promise[HttpResponse]) + + case object Shutdown extends DeadLetterSuppression + + val name = new SeqActorName("PoolInterfaceActor") +} + +/** + * An actor that wraps a completely encapsulated, running connection pool flow. + * + * Outside interface: + * The actor accepts `PoolRequest` messages and completes their `responsePromise` when the respective + * response has arrived. Incoming `PoolRequest` messages are not back-pressured but rather buffered in + * a fixed-size ringbuffer if required. Requests that would cause a buffer overflow are completed with + * a respective error. The user can prevent buffer overflows by configuring a `max-open-requests` value + * that is >= max-connections x pipelining-limit x number of respective client-flow materializations. + * + * Inside interface: + * To the inside (i.e. the running connection pool flow) the gateway actor acts as request source + * (ActorPublisher) and response sink (ActorSubscriber). + */ +private class PoolInterfaceActor(hcps: HostConnectionPoolSetup, + shutdownCompletedPromise: Promise[Unit], + gateway: PoolGateway)(implicit fm: FlowMaterializer) + extends ActorSubscriber with ActorPublisher[RequestContext] with ActorLogging { + import PoolInterfaceActor._ + + private[this] val inputBuffer = FixedSizeBuffer[PoolRequest](hcps.setup.settings.maxOpenRequests) + private[this] var activeIdleTimeout: Option[Cancellable] = None + + log.debug("(Re-)starting host connection pool to {}:{}", hcps.host, hcps.port) + + { // start the pool flow with this actor acting as source as well as sink + import context.system + import hcps._ + import setup._ + val connectionFlow = Http().outgoingConnection(host, port, None, options, settings.connectionSettings, setup.log) + val poolFlow = PoolFlow(connectionFlow, new InetSocketAddress(host, port), settings, setup.log) + Source(ActorPublisher(self)).via(poolFlow).runWith(Sink(ActorSubscriber[ResponseContext](self))) + } + + activateIdleTimeoutIfNecessary() + + def requestStrategy = ZeroRequestStrategy + + def receive = { + + /////////////// COMING UP FROM POOL (SOURCE SIDE) ////////////// + + case Request(_) ⇒ dispatchRequests() // the pool is ready to take on more requests + + case Cancel ⇒ + // somehow the pool shut down, however, we don't do anything here because we'll also see an + // OnComplete or OnError which we use as the sole trigger for cleaning up + + /////////////// COMING DOWN FROM POOL (SINK SIDE) ////////////// + + case OnNext(ResponseContext(rc, responseTry)) ⇒ + rc.responsePromise.complete(responseTry) + activateIdleTimeoutIfNecessary() + + case OnComplete ⇒ // the pool shut down + log.debug("Host connection pool to {}:{} has completed orderly shutdown", hcps.host, hcps.port) + shutdownCompletedPromise.success(()) + self ! PoisonPill // give potentially queued requests another chance to be forwarded back to the gateway + + case OnError(e) ⇒ // the pool shut down + log.debug("Host connection pool to {}:{} has shut down with error {}", hcps.host, hcps.port, e) + shutdownCompletedPromise.failure(e) + self ! PoisonPill // give potentially queued requests another chance to be forwarded back to the gateway + + /////////////// FROM CLIENT ////////////// + + case x: PoolRequest if isActive ⇒ + activeIdleTimeout foreach { timeout ⇒ + timeout.cancel() + activeIdleTimeout = None + } + if (totalDemand == 0) { + // if we can't dispatch right now we buffer and dispatch when demand from the pool arrives + if (inputBuffer.isFull) { + x.responsePromise.failure( + new RuntimeException(s"Exceeded configured max-open-requests value of [${inputBuffer.size}}]")) + } else inputBuffer.enqueue(x) + } else dispatchRequest(x) // if we can dispatch right now, do it + request(1) // for every incoming request we demand one response from the pool + + case PoolRequest(request, responsePromise) ⇒ + // we have already started shutting down, i.e. this pool is not usable anymore + // so we forward the request back to the gateway + // Note that this forwarding will stop when we receive completion from the pool flow + // (because we stop ourselves then), so there is a very small chance of a request ending + // up as a dead letter if the sending thread gets interrupted for a long time right before + // the `ref ! PoolRequest(...)` in the PoolGateway + responsePromise.completeWith(gateway(request)) + + case Shutdown ⇒ // signal coming in from gateway + log.debug("Shutting down host connection pool to {}:{}", hcps.host, hcps.port) + onComplete() + while (!inputBuffer.isEmpty) { + val PoolRequest(request, responsePromise) = inputBuffer.dequeue() + responsePromise.completeWith(gateway(request)) + } + } + + @tailrec private def dispatchRequests(): Unit = + if (totalDemand > 0 && !inputBuffer.isEmpty) { + dispatchRequest(inputBuffer.dequeue()) + dispatchRequests() + } + + def dispatchRequest(pr: PoolRequest): Unit = { + val effectiveRequest = pr.request.withUri(pr.request.uri.toHttpRequestTargetOriginForm) + val retries = if (pr.request.method.isIdempotent) hcps.setup.settings.maxRetries else 0 + onNext(RequestContext(effectiveRequest, pr.responsePromise, retries)) + } + + def activateIdleTimeoutIfNecessary(): Unit = + if (remainingRequested == 0 && hcps.setup.settings.idleTimeout.isFinite) { + import context.dispatcher + val timeout = hcps.setup.settings.idleTimeout.asInstanceOf[FiniteDuration] + activeIdleTimeout = Some(context.system.scheduler.scheduleOnce(timeout)(gateway.shutdown())) + } +} diff --git a/akka-http-core/src/main/scala/akka/http/engine/client/PoolSlot.scala b/akka-http-core/src/main/scala/akka/http/engine/client/PoolSlot.scala new file mode 100644 index 0000000000..597b690724 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/engine/client/PoolSlot.scala @@ -0,0 +1,232 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.engine.client + +import language.existentials +import java.net.InetSocketAddress +import scala.util.{ Failure, Success } +import scala.collection.immutable +import akka.actor._ +import akka.http.model.{ HttpResponse, HttpRequest } +import akka.http.util._ +import akka.stream.impl.{ SubscribePending, ExposedPublisher, ActorProcessor } +import akka.stream.actor._ +import akka.stream.scaladsl._ +import akka.stream._ + +private object PoolSlot { + import PoolFlow.{ RequestContext, ResponseContext } + + sealed trait ProcessorOut + final case class ResponseDelivery(response: ResponseContext) extends ProcessorOut + sealed trait SlotEvent extends ProcessorOut + sealed trait SimpleSlotEvent extends SlotEvent + object SlotEvent { + final case class RequestCompleted(slotIx: Int) extends SimpleSlotEvent + final case class Disconnected(slotIx: Int, failedRequests: Int) extends SimpleSlotEvent + final case class RetryRequest(rc: RequestContext) extends SlotEvent + } + + type Ports = FanOutShape2[RequestContext, ResponseContext, SlotEvent] + + private val slotProcessorActorName = new SeqActorName("SlotProcessor") + + /* + Stream Setup + ============ + + Request-  +-----------+  +-------------+  +-------------+    +------------+  + Context  | Slot-     |  List[ | flatten | Processor-  | doubler |    | SlotEvent- |  Response- + +--------->| Processor +------------->| (MapConcat) +------------->| (MapConcat) +---->| Split      +-------------> +           |           |  Processor- | | Out  | |    |            |  Context                   +           +-----------+  Out] +-------------+ +-------------+    +-----+------+                    +                          | SlotEvent                                    +                  | (to Conductor + | via slotEventMerge) +                              v  + */ + def apply(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any], + remoteAddress: InetSocketAddress, // TODO: remove after #16168 is cleared + settings: ConnectionPoolSettings)(implicit system: ActorSystem, fm: FlowMaterializer): Graph[Ports, Any] = + FlowGraph.partial() { implicit b ⇒ + import FlowGraph.Implicits._ + + val slotProcessor = b.add { + Flow[RequestContext] andThenMat { () ⇒ + val actor = system.actorOf(Props(new SlotProcessor(slotIx, connectionFlow, settings)), slotProcessorActorName.next()) + (ActorProcessor[RequestContext, List[ProcessorOut]](actor), ()) + } + } + val flattenDouble = Flow[List[ProcessorOut]].mapConcat(_.flatMap(x ⇒ x :: x :: Nil)) + val split = b.add(new SlotEventSplit) + + slotProcessor ~> flattenDouble ~> split.in + new Ports(slotProcessor.inlet, split.out0, split.out1) + } + + import ActorSubscriberMessage._ + import ActorPublisherMessage._ + + /** + * An actor mananging a series of materializations of the given `connectionFlow`. + * To the outside it provides a stable flow stage, consuming `RequestContext` instances on its + * input (ActorSubscriber) side and producing `List[ProcessorOut]` instances on its output + * (ActorPublisher) side. + * The given `connectionFlow` is materialized into a running flow whenever required. + * Completion and errors from the connection are not surfaced to the outside (unless we are + * shutting down completely). + */ + private class SlotProcessor(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any], + settings: ConnectionPoolSettings)(implicit fm: FlowMaterializer) + extends ActorSubscriber with ActorPublisher[List[ProcessorOut]] with ActorLogging { + + var exposedPublisher: akka.stream.impl.ActorPublisher[Any] = _ + var inflightRequests = immutable.Queue.empty[RequestContext] + val runnableFlow = Source.actorPublisher[HttpRequest](Props(new FlowInportActor(self))) + .via(connectionFlow) + .toMat(Sink.actorSubscriber[HttpResponse](Props(new FlowOutportActor(self))))(Keep.both) + + def requestStrategy = ZeroRequestStrategy + def receive = waitingExposedPublisher + + def waitingExposedPublisher: Receive = { + case ExposedPublisher(publisher) ⇒ + exposedPublisher = publisher + context.become(waitingForSubscribePending) + case other ⇒ throw new IllegalStateException(s"The first message must be `ExposedPublisher` but was [$other]") + } + + def waitingForSubscribePending: Receive = { + case SubscribePending ⇒ + exposedPublisher.takePendingSubscribers() foreach (s ⇒ self ! ActorPublisher.Internal.Subscribe(s)) + context.become(unconnected) + } + + val unconnected: Receive = { + case OnNext(rc: RequestContext) ⇒ + val (connInport, connOutport) = runnableFlow.run() + connOutport ! Request(totalDemand) + context.become(waitingForDemandFromConnection(connInport, connOutport, rc)) + + case Request(_) ⇒ if (remainingRequested == 0) request(1) // ask for first request if necessary + + case Cancel ⇒ { cancel(); shutdown() } + case OnComplete ⇒ onComplete() + case OnError(e) ⇒ onError(e) + } + + def waitingForDemandFromConnection(connInport: ActorRef, connOutport: ActorRef, + firstRequest: RequestContext): Receive = { + case ev @ (Request(_) | Cancel) ⇒ connOutport ! ev + case ev @ (OnComplete | OnError(_)) ⇒ connInport ! ev + case OnNext(x) ⇒ throw new IllegalStateException("Unrequested RequestContext: " + x) + + case FromConnection(Request(n)) ⇒ + inflightRequests = inflightRequests.enqueue(firstRequest) + request(n - remainingRequested) + connInport ! OnNext(firstRequest.request) + context.become(running(connInport, connOutport)) + + case FromConnection(Cancel) ⇒ if (!isActive) { cancel(); shutdown() } // else ignore and wait for accompanying OnComplete or OnError + case FromConnection(OnComplete) ⇒ handleDisconnect(None) + case FromConnection(OnError(e)) ⇒ handleDisconnect(Some(e)) + case FromConnection(OnNext(x)) ⇒ throw new IllegalStateException("Unexpected HttpResponse: " + x) + } + + def running(connInport: ActorRef, connOutport: ActorRef): Receive = { + case ev @ (Request(_) | Cancel) ⇒ connOutport ! ev + case ev @ (OnComplete | OnError(_)) ⇒ connInport ! ev + case OnNext(rc: RequestContext) ⇒ + inflightRequests = inflightRequests.enqueue(rc) + connInport ! OnNext(rc.request) + + case FromConnection(Request(n)) ⇒ request(n) + case FromConnection(Cancel) ⇒ if (!isActive) { cancel(); shutdown() } // else ignore and wait for accompanying OnComplete or OnError + + case FromConnection(OnNext(response: HttpResponse)) ⇒ + val requestContext = inflightRequests.head + inflightRequests = inflightRequests.tail + val delivery = ResponseDelivery(ResponseContext(requestContext, Success(response))) + val requestCompleted = SlotEvent.RequestCompleted(slotIx) + onNext(delivery :: requestCompleted :: Nil) + + case FromConnection(OnComplete) ⇒ handleDisconnect(None) + case FromConnection(OnError(e)) ⇒ handleDisconnect(Some(e)) + } + + def handleDisconnect(error: Option[Throwable]): Unit = { + log.debug("Slot {} disconnected after {}", slotIx, error getOrElse "regular connection close") + val results: List[ProcessorOut] = inflightRequests.map { rc ⇒ + if (rc.retriesLeft == 0) { + val reason = error.fold[Throwable](new RuntimeException("Unexpected disconnect"))(identityFunc) + ResponseDelivery(ResponseContext(rc, Failure(reason))) + } else SlotEvent.RetryRequest(rc.copy(retriesLeft = rc.retriesLeft - 1)) + }(collection.breakOut) + inflightRequests = immutable.Queue.empty + onNext(SlotEvent.Disconnected(slotIx, results.size) :: results) + if (canceled) onComplete() + + context.become(unconnected) + } + + override def onComplete(): Unit = { + exposedPublisher.shutdown(None) + super.onComplete() + shutdown() + } + + override def onError(cause: Throwable): Unit = { + exposedPublisher.shutdown(Some(cause)) + super.onError(cause) + shutdown() + } + + def shutdown(): Unit = context.stop(self) + } + + private case class FromConnection(ev: Any) + + private class FlowInportActor(slotProcessor: ActorRef) extends ActorPublisher[HttpRequest] { + def receive: Receive = { + case ev: Request ⇒ slotProcessor ! FromConnection(ev) + case Cancel ⇒ { slotProcessor ! FromConnection(Cancel); context.stop(self) } + case OnNext(r: HttpRequest) ⇒ onNext(r) + case OnComplete ⇒ { onComplete(); context.stop(self) } + case OnError(e) ⇒ { onError(e); context.stop(self) } + } + } + + private class FlowOutportActor(slotProcessor: ActorRef) extends ActorSubscriber { + def requestStrategy = ZeroRequestStrategy + def receive: Receive = { + case Request(n) ⇒ request(n) + case Cancel ⇒ cancel() + case ev: OnNext ⇒ slotProcessor ! FromConnection(ev) + case ev @ (OnComplete | OnError(_)) ⇒ { slotProcessor ! FromConnection(ev); context.stop(self) } + } + } + + // FIXME: remove when #17038 is cleared + private class SlotEventSplit extends FlexiRoute[ProcessorOut, FanOutShape2[ProcessorOut, ResponseContext, SlotEvent]]( + new FanOutShape2("PoolSlot.SlotEventSplit"), OperationAttributes.name("PoolSlot.SlotEventSplit")) { + import FlexiRoute._ + + def createRouteLogic(s: FanOutShape2[ProcessorOut, ResponseContext, SlotEvent]): RouteLogic[ProcessorOut] = + new RouteLogic[ProcessorOut] { + val initialState: State[_] = State(DemandFromAny(s)) { + case (_, _, ResponseDelivery(x)) ⇒ + State(DemandFrom(s.out0)) { (ctx, _, _) ⇒ + ctx.emit(s.out0)(x) + initialState + } + case (_, _, x: SlotEvent) ⇒ + State(DemandFrom(s.out1)) { (ctx, _, _) ⇒ + ctx.emit(s.out1)(x) + initialState + } + } + } + } +} diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/ServerSettings.scala b/akka-http-core/src/main/scala/akka/http/engine/server/ServerSettings.scala index bf018e5a2b..ae5dacd63f 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/ServerSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/ServerSettings.scala @@ -53,8 +53,5 @@ object ServerSettings extends SettingsCompanion[ServerSettings]("akka.http.serve throw new ConfigurationException(info.formatPretty) }, ParserSettings fromSubConfig c.getConfig("parsing")) - - def apply(optionalSettings: Option[ServerSettings])(implicit actorRefFactory: ActorRefFactory): ServerSettings = - optionalSettings getOrElse apply(actorSystem) } diff --git a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala index 6eaf982c09..e79785c531 100644 --- a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala @@ -38,7 +38,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { "The low-level HTTP infrastructure" should { "properly bind a server" in { - val (hostname, port) = temporaryServerHostnameAndPort() + val (_, hostname, port) = temporaryServerHostnameAndPort() val probe = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() val binding = Http().bind(hostname, port).toMat(Sink(probe))(Keep.left).run() val sub = probe.expectSubscription() // if we get it we are bound @@ -47,7 +47,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { } "report failure if bind fails" in { - val (hostname, port) = temporaryServerHostnameAndPort() + val (_, hostname, port) = temporaryServerHostnameAndPort() val binding = Http().bind(hostname, port) val probe1 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() // Bind succeeded, we have a local address @@ -78,7 +78,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { } "run with bindAndHandle" in { - val (hostname, port) = temporaryServerHostnameAndPort() + val (_, hostname, port) = temporaryServerHostnameAndPort() val binding = Http().bindAndHandle(Flow[HttpRequest].map(_ ⇒ HttpResponse()), hostname, port) val b1 = Await.result(binding, 3.seconds) @@ -186,12 +186,12 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { override def afterAll() = system.shutdown() class TestSetup { - val (hostname, port) = temporaryServerHostnameAndPort() + val (_, hostname, port) = temporaryServerHostnameAndPort() def configOverrides = "" // automatically bind a server val connSource = { - val settings = configOverrides.toOption.map(ServerSettings.apply) + val settings = configOverrides.toOption.fold(ServerSettings(system))(ServerSettings(_)) val binding = Http().bind(hostname, port, settings = settings) val probe = StreamTestKit.SubscriberProbe[Http.IncomingConnection] binding.runWith(Sink(probe)) @@ -199,7 +199,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { } val connSourceSub = connSource.expectSubscription() - def openNewClientConnection(settings: Option[ClientConnectionSettings] = None): (PublisherProbe[HttpRequest], SubscriberProbe[HttpResponse]) = { + def openNewClientConnection(settings: ClientConnectionSettings = ClientConnectionSettings(system)) = { val requestPublisherProbe = StreamTestKit.PublisherProbe[HttpRequest]() val responseSubscriberProbe = StreamTestKit.SubscriberProbe[HttpResponse]() diff --git a/akka-http-core/src/test/scala/akka/http/TestClient.scala b/akka-http-core/src/test/scala/akka/http/TestClient.scala index 0d481817d3..074e95c011 100644 --- a/akka-http-core/src/test/scala/akka/http/TestClient.scala +++ b/akka-http-core/src/test/scala/akka/http/TestClient.scala @@ -6,30 +6,61 @@ package akka.http import com.typesafe.config.{ Config, ConfigFactory } import scala.util.{ Failure, Success } -import akka.actor.ActorSystem +import akka.actor.{ UnhandledMessage, ActorSystem } import akka.stream.ActorFlowMaterializer import akka.stream.scaladsl.{ Sink, Source } import akka.http.model._ +import akka.http.util._ object TestClient extends App { val testConf: Config = ConfigFactory.parseString(""" akka.loglevel = INFO akka.log-dead-letters = off - """) + akka.io.tcp.trace-logging = on""") implicit val system = ActorSystem("ServerTest", testConf) implicit val fm = ActorFlowMaterializer() import system.dispatcher + installEventStreamLoggerFor[UnhandledMessage] + val host = "spray.io" - println(s"Fetching HTTP server version of host `$host` ...") + fetchServerVersion1() - val connection = Http().outgoingConnection(host) - val result = Source.single(HttpRequest()).via(connection).runWith(Sink.head) + // Console.readLine() + // system.shutdown() - result.map(_.header[headers.Server]) onComplete { - case Success(res) ⇒ println(s"$host is running ${res mkString ", "}") - case Failure(error) ⇒ println(s"Error: $error") + def fetchServerVersion1(): Unit = { + println(s"Fetching HTTP server version of host `$host` via a direct low-level connection ...") + + val connection = Http().outgoingConnection(host) + val result = Source.single(HttpRequest()).via(connection).runWith(Sink.head) + result.map(_.header[headers.Server]) onComplete { + case Success(res) ⇒ + println(s"$host is running ${res mkString ", "}") + println() + fetchServerVersion2() + + case Failure(error) ⇒ + println(s"Error: $error") + println() + fetchServerVersion2() + } } - result onComplete { _ ⇒ system.shutdown() } + + def fetchServerVersion2(): Unit = { + println(s"Fetching HTTP server version of host `$host` via the high-level API ...") + val result = Http().singleRequest(HttpRequest(uri = s"http://$host/")) + result.map(_.header[headers.Server]) onComplete { + case Success(res) ⇒ + println(s"$host is running ${res mkString ", "}") + Http().shutdownAllConnectionPools().onComplete { _ ⇒ system.log.info("STOPPED"); shutdown() } + + case Failure(error) ⇒ + println(s"Error: $error") + shutdown() + } + } + + def shutdown(): Unit = system.shutdown() } \ No newline at end of file diff --git a/akka-http-core/src/test/scala/akka/http/TestUtils.scala b/akka-http-core/src/test/scala/akka/http/TestUtils.scala index d6f1265be2..d4c4fd2ed1 100644 --- a/akka-http-core/src/test/scala/akka/http/TestUtils.scala +++ b/akka-http-core/src/test/scala/akka/http/TestUtils.scala @@ -17,8 +17,8 @@ object TestUtils { } finally serverSocket.close() } - def temporaryServerHostnameAndPort(interface: String = "127.0.0.1"): (String, Int) = { + def temporaryServerHostnameAndPort(interface: String = "127.0.0.1"): (InetSocketAddress, String, Int) = { val socketAddress = temporaryServerAddress(interface) - socketAddress.getHostName -> socketAddress.getPort + (socketAddress, socketAddress.getHostName, socketAddress.getPort) } } diff --git a/akka-http-core/src/test/scala/akka/http/engine/client/ConnectionPoolSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/client/ConnectionPoolSpec.scala new file mode 100644 index 0000000000..719260a682 --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/engine/client/ConnectionPoolSpec.scala @@ -0,0 +1,323 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.engine.client + +import java.util.concurrent.atomic.AtomicInteger +import akka.http.engine.client.PoolGateway + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.{ Failure, Success, Try } +import akka.http.engine.server.ServerSettings +import akka.http.util.{ SingletonException, StreamUtils } +import akka.util.ByteString +import akka.stream.{ BidiShape, ActorFlowMaterializer } +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.stream.scaladsl._ +import akka.http.{ Http, TestUtils } +import akka.http.model.headers._ +import akka.http.model._ + +class ConnectionPoolSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF\n akka.io.tcp.trace-logging = off") { + implicit val materializer = ActorFlowMaterializer() + + "The host-level client infrastructure" should { + + "properly complete a simple request/response cycle" in new TestSetup { + val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int]() + + requestIn.sendNext(HttpRequest(uri = "/") -> 42) + + acceptIncomingConnection() + responseOutSub.request(1) + val (Success(response), 42) = responseOut.expectNext() + response.headers should contain(RawHeader("Req-Host", s"localhost:$serverPort")) + } + + "open a second connection if the first one is loaded" in new TestSetup { + val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int]() + + requestIn.sendNext(HttpRequest(uri = "/a") -> 42) + requestIn.sendNext(HttpRequest(uri = "/b") -> 43) + + responseOutSub.request(2) + acceptIncomingConnection() + val r1 = responseOut.expectNext() + acceptIncomingConnection() + val r2 = responseOut.expectNext() + + Seq(r1, r2) foreach { + case (Success(x), 42) ⇒ requestUri(x) should endWith("/a") + case (Success(x), 43) ⇒ requestUri(x) should endWith("/b") + case x ⇒ fail(x.toString) + } + Seq(r1, r2).map(t ⇒ connNr(t._1.get)) should contain allOf (1, 2) + } + + "not open a second connection if there is an idle one available" in new TestSetup { + val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int]() + + requestIn.sendNext(HttpRequest(uri = "/a") -> 42) + acceptIncomingConnection() + responseOutSub.request(1) + val (Success(response1), 42) = responseOut.expectNext() + connNr(response1) shouldEqual 1 + + requestIn.sendNext(HttpRequest(uri = "/b") -> 43) + responseOutSub.request(1) + val (Success(response2), 43) = responseOut.expectNext() + connNr(response2) shouldEqual 1 + } + + "be able to handle 500 pipelined requests against the test server" in new TestSetup { + val settings = ConnectionPoolSettings(system).copy(maxConnections = 4, pipeliningLimit = 2) + val poolFlow = Http().cachedHostConnectionPool[Int](serverHostName, serverPort, settings = settings) + + val N = 500 + val requestIds = Source(() ⇒ Iterator.from(1)).take(N) + val idSum = requestIds.map(id ⇒ HttpRequest(uri = s"/r$id") -> id).via(poolFlow).map { + case (Success(response), id) ⇒ + requestUri(response) should endWith(s"/r$id") + id + case x ⇒ fail(x.toString) + }.runFold(0)(_ + _) + + acceptIncomingConnection() + acceptIncomingConnection() + acceptIncomingConnection() + acceptIncomingConnection() + + Await.result(idSum, 10.seconds) shouldEqual N * (N + 1) / 2 + } + + "properly surface connection-level errors" in new TestSetup(autoAccept = true) { + val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int](maxRetries = 0) + + requestIn.sendNext(HttpRequest(uri = "/a") -> 42) + requestIn.sendNext(HttpRequest(uri = "/crash") -> 43) + responseOutSub.request(2) + + override def mapServerSideOutboundRawBytes(bytes: ByteString): ByteString = + if (bytes.utf8String.contains("/crash")) sys.error("CRASH BOOM BANG") else bytes + + val responses = Seq(responseOut.expectNext(), responseOut.expectNext()) + + responses mustContainLike { case (Success(x), 42) ⇒ requestUri(x) should endWith("/a") } + responses mustContainLike { case (Failure(x), 43) ⇒ x.getMessage should include("reset by peer") } + } + + "retry failed requests" in new TestSetup(autoAccept = true) { + val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int]() + + requestIn.sendNext(HttpRequest(uri = "/a") -> 42) + requestIn.sendNext(HttpRequest(uri = "/crash") -> 43) + responseOutSub.request(2) + + val remainingResponsesToKill = new AtomicInteger(1) + override def mapServerSideOutboundRawBytes(bytes: ByteString): ByteString = + if (bytes.utf8String.contains("/crash") && remainingResponsesToKill.decrementAndGet() >= 0) + sys.error("CRASH BOOM BANG") + else bytes + + val responses = Seq(responseOut.expectNext(), responseOut.expectNext()) + + responses mustContainLike { case (Success(x), 42) ⇒ requestUri(x) should endWith("/a") } + responses mustContainLike { case (Success(x), 43) ⇒ requestUri(x) should endWith("/crash") } + } + + "respect the configured `maxRetries` value" in new TestSetup(autoAccept = true) { + val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int](maxRetries = 4) + + requestIn.sendNext(HttpRequest(uri = "/a") -> 42) + requestIn.sendNext(HttpRequest(uri = "/crash") -> 43) + responseOutSub.request(2) + + val remainingResponsesToKill = new AtomicInteger(5) + override def mapServerSideOutboundRawBytes(bytes: ByteString): ByteString = + if (bytes.utf8String.contains("/crash") && remainingResponsesToKill.decrementAndGet() >= 0) + sys.error("CRASH BOOM BANG") + else bytes + + val responses = Seq(responseOut.expectNext(), responseOut.expectNext()) + + responses mustContainLike { case (Success(x), 42) ⇒ requestUri(x) should endWith("/a") } + responses mustContainLike { case (Failure(x), 43) ⇒ x.getMessage should include("reset by peer") } + remainingResponsesToKill.get() shouldEqual 0 + } + + "automatically shutdown after configured timeout periods" in new TestSetup() { + val (_, _, _, hcp) = cachedHostConnectionPool[Int](idleTimeout = 1.second) + val gateway = Await.result(hcp.gatewayFuture, 500.millis) + val PoolGateway.Running(_, shutdownStartedPromise, shutdownCompletedPromise) = gateway.currentState + shutdownStartedPromise.isCompleted shouldEqual false + shutdownCompletedPromise.isCompleted shouldEqual false + Await.result(shutdownStartedPromise.future, 1500.millis) shouldEqual () // verify shutdown start (after idle) + Await.result(shutdownCompletedPromise.future, 1500.millis) shouldEqual () // verify shutdown completed + } + + "transparently restart after idle shutdown" in new TestSetup() { + val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int](idleTimeout = 1.second) + + val gateway = Await.result(hcp.gatewayFuture, 500.millis) + val PoolGateway.Running(_, _, shutdownCompletedPromise) = gateway.currentState + Await.result(shutdownCompletedPromise.future, 1500.millis) shouldEqual () // verify shutdown completed + + requestIn.sendNext(HttpRequest(uri = "/") -> 42) + + acceptIncomingConnection() + responseOutSub.request(1) + val (Success(_), 42) = responseOut.expectNext() + } + } + + "The single-request client infrastructure" should { + class LocalTestSetup extends TestSetup(ServerSettings(system).copy(rawRequestUriHeader = true), autoAccept = true) + + "properly complete a simple request/response cycle with a host header request" in new LocalTestSetup { + val request = HttpRequest(uri = "/abc", headers = List(Host(serverHostName, serverPort))) + val responseFuture = Http().singleRequest(request) + val responseHeaders = Await.result(responseFuture, 1.second).headers + responseHeaders should contain(RawHeader("Req-Uri", s"http://localhost:$serverPort/abc")) + responseHeaders should contain(RawHeader("Req-Raw-Request-URI", "/abc")) + } + + "transform absolute request URIs into relative URIs plus host header" in new LocalTestSetup { + val request = HttpRequest(uri = s"http://$serverHostName:$serverPort/abc?query#fragment") + val responseFuture = Http().singleRequest(request) + val responseHeaders = Await.result(responseFuture, 1.second).headers + responseHeaders should contain(RawHeader("Req-Raw-Request-URI", "/abc?query")) + responseHeaders should contain(RawHeader("Req-Host", s"localhost:$serverPort")) + } + + "support absolute request URIs without path component" in new LocalTestSetup { + val request = HttpRequest(uri = s"http://$serverHostName:$serverPort") + val responseFuture = Http().singleRequest(request) + val responseHeaders = Await.result(responseFuture, 1.second).headers + responseHeaders should contain(RawHeader("Req-Raw-Request-URI", "/")) + } + + "support absolute request URIs with a double slash path component" in new LocalTestSetup { + val request = HttpRequest(uri = s"http://$serverHostName:$serverPort//foo") + val responseFuture = Http().singleRequest(request) + val responseHeaders = Await.result(responseFuture, 1.second).headers + responseHeaders should contain(RawHeader("Req-Uri", s"http://localhost:$serverPort//foo")) + responseHeaders should contain(RawHeader("Req-Raw-Request-URI", "//foo")) + } + + "produce an error if the request does not contain a Host-header or an absolute URI" in { + val request = HttpRequest(uri = "/foo") + val responseFuture = Http().singleRequest(request) + val thrown = the[IllegalUriException] thrownBy Await.result(responseFuture, 1.second) + thrown should have message "Cannot establish effective URI of request to `/foo`, request has a relative URI and is missing a `Host` header" + } + } + + "The superPool client infrastructure" should { + + "route incoming requests to the right cached host connection pool" in new TestSetup(autoAccept = true) { + val (serverEndpoint2, serverHostName2, serverPort2) = TestUtils.temporaryServerHostnameAndPort() + Http().bindAndHandleSync(testServerHandler(0), serverHostName2, serverPort2) + + val (requestIn, responseOut, responseOutSub, hcp) = superPool[Int]() + + requestIn.sendNext(HttpRequest(uri = s"http://$serverHostName:$serverPort/a") -> 42) + requestIn.sendNext(HttpRequest(uri = s"http://$serverHostName2:$serverPort2/b") -> 43) + + responseOutSub.request(2) + Seq(responseOut.expectNext(), responseOut.expectNext()) foreach { + case (Success(x), 42) ⇒ requestUri(x) shouldEqual s"http://$serverHostName:$serverPort/a" + case (Success(x), 43) ⇒ requestUri(x) shouldEqual s"http://$serverHostName2:$serverPort2/b" + case x ⇒ fail(x.toString) + } + } + } + + class TestSetup(serverSettings: ServerSettings = ServerSettings(system), + autoAccept: Boolean = false) { + val (serverEndpoint, serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort() + + def testServerHandler(connNr: Int): HttpRequest ⇒ HttpResponse = { + case HttpRequest(_, uri, headers, entity, _) ⇒ + val responseHeaders = + ConnNrHeader(connNr) +: + RawHeader("Req-Uri", uri.toString) +: headers.map(h ⇒ RawHeader("Req-" + h.name, h.value)) + HttpResponse(headers = responseHeaders, entity = entity) + } + + def mapServerSideOutboundRawBytes(bytes: ByteString): ByteString = bytes + + val incomingConnectionCounter = new AtomicInteger + val incomingConnections = StreamTestKit.SubscriberProbe[Http.IncomingConnection] + val incomingConnectionsSub = { + val rawBytesInjection = BidiFlow() { b ⇒ + val top = b.add(Flow[ByteString].map(mapServerSideOutboundRawBytes) + .transform(StreamUtils.recover { case NoErrorComplete ⇒ ByteString.empty })) + val bottom = b.add(Flow[ByteString]) + BidiShape(top.inlet, top.outlet, bottom.inlet, bottom.outlet) + } + val sink = if (autoAccept) Sink.foreach[Http.IncomingConnection](handleConnection) else Sink(incomingConnections) + StreamTcp().bind(serverEndpoint, idleTimeout = serverSettings.timeouts.idleTimeout) + .map { c ⇒ + val layer = Http().serverLayer(serverSettings, log) + Http.IncomingConnection(c.localAddress, c.remoteAddress, layer atop rawBytesInjection join c.flow) + }.runWith(sink) + if (autoAccept) null else incomingConnections.expectSubscription() + } + + def acceptIncomingConnection(): Unit = { + incomingConnectionsSub.request(1) + val conn = incomingConnections.expectNext() + handleConnection(conn) + } + + private def handleConnection(c: Http.IncomingConnection) = + c.handleWithSyncHandler(testServerHandler(incomingConnectionCounter.incrementAndGet())) + + def cachedHostConnectionPool[T](maxConnections: Int = 2, + maxRetries: Int = 2, + maxOpenRequests: Int = 8, + pipeliningLimit: Int = 1, + idleTimeout: Duration = 5.seconds, + ccSettings: ClientConnectionSettings = ClientConnectionSettings(system)) = { + val settings = ConnectionPoolSettings(maxConnections, maxRetries, maxOpenRequests, pipeliningLimit, + idleTimeout, ClientConnectionSettings(system)) + flowTestBench(Http().cachedHostConnectionPool[T](serverHostName, serverPort, Nil, settings)) + } + + def superPool[T](maxConnections: Int = 2, + maxRetries: Int = 2, + maxOpenRequests: Int = 8, + pipeliningLimit: Int = 1, + idleTimeout: Duration = 5.seconds, + ccSettings: ClientConnectionSettings = ClientConnectionSettings(system)) = { + val settings = ConnectionPoolSettings(maxConnections, maxRetries, maxOpenRequests, pipeliningLimit, + idleTimeout, ClientConnectionSettings(system)) + flowTestBench(Http().superPool[T](Nil, settings)) + } + + def flowTestBench[T, Mat](poolFlow: Flow[(HttpRequest, T), (Try[HttpResponse], T), Mat]) = { + val requestIn = StreamTestKit.PublisherProbe[(HttpRequest, T)] + val responseOut = StreamTestKit.SubscriberProbe[(Try[HttpResponse], T)] + val hcp = Source(requestIn).viaMat(poolFlow)(Keep.right).toMat(Sink(responseOut))(Keep.left).run() + val responseOutSub = responseOut.expectSubscription() + (new StreamTestKit.AutoPublisher(requestIn), responseOut, responseOutSub, hcp) + } + + def connNr(r: HttpResponse): Int = r.headers.find(_ is "conn-nr").get.value.toInt + def requestUri(r: HttpResponse): String = r.headers.find(_ is "req-uri").get.value + } + + case class ConnNrHeader(nr: Int) extends CustomHeader { + def name = "Conn-Nr" + def value = nr.toString + } + + implicit class MustContain[T](specimen: Seq[T]) { + def mustContainLike(pf: PartialFunction[T, Unit]): Unit = + specimen.collectFirst(pf) getOrElse fail("did not contain") + } + + object NoErrorComplete extends SingletonException +} \ No newline at end of file diff --git a/akka-http-core/src/test/scala/akka/http/engine/client/HighLevelOutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/client/HighLevelOutgoingConnectionSpec.scala index ef2877c024..cccd88455c 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/client/HighLevelOutgoingConnectionSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/client/HighLevelOutgoingConnectionSpec.scala @@ -19,7 +19,7 @@ class HighLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka "The connection-level client implementation" should { "be able to handle 100 pipelined requests across one connection" in { - val (serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort() + val (_, serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort() Http().bindAndHandleSync(r ⇒ HttpResponse(entity = r.uri.toString.reverse.takeWhile(Character.isDigit).reverse), serverHostName, serverPort) @@ -37,7 +37,7 @@ class HighLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka } "be able to handle 100 pipelined requests across 4 connections (client-flow is reusable)" in { - val (serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort() + val (_, serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort() Http().bindAndHandleSync(r ⇒ HttpResponse(entity = r.uri.toString.reverse.takeWhile(Character.isDigit).reverse), serverHostName, serverPort) diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index 178d4c3305..684b52fe5a 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -29,7 +29,7 @@ abstract class FlowMaterializer { * can be used by parts of the flow to submit processing jobs for execution, * run Future callbacks, etc. */ - def executionContext: ExecutionContextExecutor + implicit def executionContext: ExecutionContextExecutor }