Also: simplify API style for settings passing
This commit is contained in:
parent
4d3b0e4edc
commit
21baac0c52
16 changed files with 1464 additions and 44 deletions
|
|
@ -107,6 +107,42 @@ akka.http {
|
|||
parsing = ${akka.http.parsing}
|
||||
}
|
||||
|
||||
host-connection-pool {
|
||||
# The maximum number of parallel connections that a connection pool to a
|
||||
# single host endpoint is allowed to establish. Must be greater than zero.
|
||||
max-connections = 4
|
||||
|
||||
# The maximum number of times failed requests are attempted again,
|
||||
# (if the request can be safely retried) before giving up and returning an error.
|
||||
max-retries = 5
|
||||
|
||||
# The maximum number of open requests accepted into the pool across all
|
||||
# materializations of any of its client flows.
|
||||
# Protects against (accidentally) overloading a single pool with too many client flow materializations.
|
||||
# Note that with N concurrent materializations the max number of open request in the pool
|
||||
# will never exceed N * max-connections * pipelining-limit.
|
||||
# Must be a power of 2 and > 0!
|
||||
max-open-requests = 32
|
||||
|
||||
# The maximum number of requests that are dispatched to the target host in
|
||||
# batch-mode across a single connection (HTTP pipelining).
|
||||
# A setting of 1 disables HTTP pipelining, since only one request per
|
||||
# connection can be "in flight" at any time.
|
||||
# Set to higher values to enable HTTP pipelining.
|
||||
# This value must be > 0.
|
||||
# (Note that, independently of this setting, pipelining will never be done
|
||||
# on a connection that still has a non-idempotent request in flight.
|
||||
# See http://tools.ietf.org/html/rfc7230#section-6.3.2 for more info.)
|
||||
pipelining-limit = 1
|
||||
|
||||
# The time after which an idle connection pool (without pending requests)
|
||||
# will automatically terminate itself. Set to `infinite` to completely disable idle timeouts.
|
||||
idle-timeout = 30 s
|
||||
|
||||
# Modify to tweak client settings for host connection pools only.
|
||||
client = ${akka.http.client}
|
||||
}
|
||||
|
||||
# The (default) configuration of the HTTP message parser for the server and the client.
|
||||
# IMPORTANT: These settings (i.e. children of `akka.http.parsing`) can't be directly
|
||||
# overridden in `application.conf` to change the parser settings for client and server
|
||||
|
|
|
|||
|
|
@ -5,9 +5,13 @@
|
|||
package akka.http
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import akka.http.util.FastFuture
|
||||
import com.typesafe.config.Config
|
||||
import scala.util.control.NonFatal
|
||||
import scala.util.Try
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.{ ExecutionContext, Promise, Future }
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.util.ByteString
|
||||
import akka.io.Inet
|
||||
|
|
@ -15,7 +19,7 @@ import akka.stream.FlowMaterializer
|
|||
import akka.stream.scaladsl._
|
||||
import akka.http.engine.client._
|
||||
import akka.http.engine.server._
|
||||
import akka.http.model.{ HttpResponse, HttpRequest }
|
||||
import akka.http.model._
|
||||
import akka.actor._
|
||||
|
||||
class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.Extension {
|
||||
|
|
@ -33,21 +37,17 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
|
|||
*/
|
||||
def bind(interface: String, port: Int = 80, backlog: Int = 100,
|
||||
options: immutable.Traversable[Inet.SocketOption] = Nil,
|
||||
settings: Option[ServerSettings] = None,
|
||||
settings: ServerSettings = ServerSettings(system),
|
||||
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] = {
|
||||
val endpoint = new InetSocketAddress(interface, port)
|
||||
val effectiveSettings = ServerSettings(settings)
|
||||
|
||||
val connections: Source[StreamTcp.IncomingConnection, Future[StreamTcp.ServerBinding]] =
|
||||
StreamTcp().bind(endpoint, backlog, options, effectiveSettings.timeouts.idleTimeout)
|
||||
|
||||
StreamTcp().bind(endpoint, backlog, options, settings.timeouts.idleTimeout)
|
||||
connections.map {
|
||||
case StreamTcp.IncomingConnection(localAddress, remoteAddress, flow) ⇒
|
||||
val layer = serverLayer(effectiveSettings, log)
|
||||
val layer = serverLayer(settings, log)
|
||||
IncomingConnection(localAddress, remoteAddress, layer join flow)
|
||||
}.mapMaterialized { tcpBindingFuture ⇒
|
||||
import system.dispatcher
|
||||
tcpBindingFuture.map { tcpBinding ⇒ ServerBinding(tcpBinding.localAddress)(() ⇒ tcpBinding.unbind()) }
|
||||
}.mapMaterialized {
|
||||
_.map(tcpBinding ⇒ ServerBinding(tcpBinding.localAddress)(() ⇒ tcpBinding.unbind()))(fm.executionContext)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -62,7 +62,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
|
|||
def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _],
|
||||
interface: String, port: Int = 80, backlog: Int = 100,
|
||||
options: immutable.Traversable[Inet.SocketOption] = Nil,
|
||||
settings: Option[ServerSettings] = None,
|
||||
settings: ServerSettings = ServerSettings(system),
|
||||
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] =
|
||||
bind(interface, port, backlog, options, settings, log).to {
|
||||
Sink.foreach { _.flow.join(handler).run() }
|
||||
|
|
@ -79,7 +79,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
|
|||
def bindAndHandleSync(handler: HttpRequest ⇒ HttpResponse,
|
||||
interface: String, port: Int = 80, backlog: Int = 100,
|
||||
options: immutable.Traversable[Inet.SocketOption] = Nil,
|
||||
settings: Option[ServerSettings] = None,
|
||||
settings: ServerSettings = ServerSettings(system),
|
||||
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] =
|
||||
bindAndHandle(Flow[HttpRequest].map(handler), interface, port, backlog, options, settings, log)
|
||||
|
||||
|
|
@ -94,7 +94,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
|
|||
def bindAndHandleAsync(handler: HttpRequest ⇒ Future[HttpResponse],
|
||||
interface: String, port: Int = 80, backlog: Int = 100,
|
||||
options: immutable.Traversable[Inet.SocketOption] = Nil,
|
||||
settings: Option[ServerSettings] = None,
|
||||
settings: ServerSettings = ServerSettings(system),
|
||||
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] =
|
||||
bindAndHandle(Flow[HttpRequest].mapAsync(1, handler), interface, port, backlog, options, settings, log)
|
||||
|
||||
|
|
@ -131,14 +131,13 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
|
|||
def outgoingConnection(host: String, port: Int = 80,
|
||||
localAddress: Option[InetSocketAddress] = None,
|
||||
options: immutable.Traversable[Inet.SocketOption] = Nil,
|
||||
settings: Option[ClientConnectionSettings] = None,
|
||||
settings: ClientConnectionSettings = ClientConnectionSettings(system),
|
||||
log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = {
|
||||
val effectiveSettings = ClientConnectionSettings(settings)
|
||||
val remoteAddr = new InetSocketAddress(host, port)
|
||||
val layer = clientLayer(remoteAddr, effectiveSettings, log)
|
||||
val layer = clientLayer(remoteAddr, settings, log)
|
||||
|
||||
val transportFlow = StreamTcp().outgoingConnection(remoteAddr, localAddress,
|
||||
options, effectiveSettings.connectingTimeout, effectiveSettings.idleTimeout)
|
||||
options, settings.connectingTimeout, settings.idleTimeout)
|
||||
|
||||
layer.joinMat(transportFlow) { (_, tcpConnFuture) ⇒
|
||||
import system.dispatcher
|
||||
|
|
@ -173,6 +172,211 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
|
|||
settings: ClientConnectionSettings,
|
||||
log: LoggingAdapter = system.log): ClientLayer =
|
||||
BidiFlow.wrap(OutgoingConnectionBlueprint(remoteAddress, settings, log))
|
||||
|
||||
/**
|
||||
* Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches
|
||||
* the requests from all its materializations across this pool.
|
||||
* While the started host connection pool internally shuts itself down automatically after the configured idle
|
||||
* timeout it will spin itself up again if more requests arrive from an existing or a new client flow
|
||||
* materialization. The returned flow therefore remains usable for the full lifetime of the application.
|
||||
*
|
||||
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
|
||||
* responses in an order that doesn't directly match the consumed requests.
|
||||
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
|
||||
* response for A.
|
||||
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
|
||||
* object of type ``T`` from the application which is emitted together with the corresponding response.
|
||||
*/
|
||||
def newHostConnectionPool[T](host: String, port: Int = 80,
|
||||
options: immutable.Traversable[Inet.SocketOption] = Nil,
|
||||
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
|
||||
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
|
||||
val cps = ConnectionPoolSetup(encrypted = false, options, settings, log)
|
||||
val setup = HostConnectionPoolSetup(host, port, cps)
|
||||
newHostConnectionPool(setup)
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches
|
||||
* the requests from all its materializations across this pool.
|
||||
* While the started host connection pool internally shuts itself down automatically after the configured idle
|
||||
* timeout it will spin itself up again if more requests arrive from an existing or a new client flow
|
||||
* materialization. The returned flow therefore remains usable for the full lifetime of the application.
|
||||
*
|
||||
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
|
||||
* responses in an order that doesn't directly match the consumed requests.
|
||||
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
|
||||
* response for A.
|
||||
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
|
||||
* object of type ``T`` from the application which is emitted together with the corresponding response.
|
||||
*/
|
||||
def newHostConnectionPool[T](setup: HostConnectionPoolSetup)(
|
||||
implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
|
||||
val gatewayFuture = FastFuture.successful(new PoolGateway(setup, Promise()))
|
||||
gatewayClientFlow(setup, gatewayFuture)
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing
|
||||
* HTTP connections to the given target host endpoint. For every ActorSystem, target host and pool
|
||||
* configuration a separate connection pool is maintained.
|
||||
* The HTTP layer transparently manages idle shutdown and restarting of connections pools as configured.
|
||||
* The returned [[Flow]] instances therefore remain valid throughout the lifetime of the application.
|
||||
*
|
||||
* The internal caching logic guarantees that there will never be more than a single pool running for the
|
||||
* given target host endpoint and configuration (in this ActorSystem).
|
||||
*
|
||||
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
|
||||
* responses in an order that doesn't directly match the consumed requests.
|
||||
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
|
||||
* response for A.
|
||||
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
|
||||
* object of type ``T`` from the application which is emitted together with the corresponding response.
|
||||
*/
|
||||
def cachedHostConnectionPool[T](host: String, port: Int = 80,
|
||||
options: immutable.Traversable[Inet.SocketOption] = Nil,
|
||||
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
|
||||
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
|
||||
val cps = ConnectionPoolSetup(encrypted = false, options, settings, log)
|
||||
val setup = HostConnectionPoolSetup(host, port, cps)
|
||||
cachedHostConnectionPool(setup)
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing
|
||||
* HTTP connections to the given target host endpoint. For every ActorSystem, target host and pool
|
||||
* configuration a separate connection pool is maintained.
|
||||
* The HTTP layer transparently manages idle shutdown and restarting of connections pools as configured.
|
||||
* The returned [[Flow]] instances therefore remain valid throughout the lifetime of the application.
|
||||
*
|
||||
* The internal caching logic guarantees that there will never be more than a single pool running for the
|
||||
* given target host endpoint and configuration (in this ActorSystem).
|
||||
*
|
||||
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
|
||||
* responses in an order that doesn't directly match the consumed requests.
|
||||
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
|
||||
* response for A.
|
||||
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
|
||||
* object of type ``T`` from the application which is emitted together with the corresponding response.
|
||||
*/
|
||||
def cachedHostConnectionPool[T](setup: HostConnectionPoolSetup)(
|
||||
implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] =
|
||||
gatewayClientFlow(setup, cachedGateway(setup))
|
||||
|
||||
/**
|
||||
* Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool
|
||||
* depending on their respective effective URI. Note that incoming requests must have either an absolute URI or
|
||||
* a valid `Host` header.
|
||||
*
|
||||
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
|
||||
* responses in an order that doesn't directly match the consumed requests.
|
||||
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
|
||||
* response for A.
|
||||
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
|
||||
* object of type ``T`` from the application which is emitted together with the corresponding response.
|
||||
*/
|
||||
def superPool[T](options: immutable.Traversable[Inet.SocketOption] = Nil,
|
||||
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
|
||||
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] = {
|
||||
val setup = ConnectionPoolSetup(encrypted = false, options, settings, log)
|
||||
clientFlow[T](settings) { request ⇒
|
||||
val absoluteRequest = request.withEffectiveUri(securedConnection = false)
|
||||
val Uri.Authority(host, port, _) = absoluteRequest.uri.authority
|
||||
val hcps = HostConnectionPoolSetup(host.toString(), port, setup)
|
||||
val theHostHeader = hostHeader(hcps.host, port, absoluteRequest.uri.scheme)
|
||||
val effectiveRequest = absoluteRequest.withDefaultHeaders(theHostHeader)
|
||||
effectiveRequest -> cachedGateway(hcps)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's
|
||||
* effective URI to produce a response future.
|
||||
*
|
||||
* Note that the request must have either an absolute URI or a valid `Host` header, otherwise
|
||||
* the future will be completed with an error.
|
||||
*/
|
||||
def singleRequest(request: HttpRequest,
|
||||
options: immutable.Traversable[Inet.SocketOption] = Nil,
|
||||
settings: ConnectionPoolSettings = ConnectionPoolSettings(system),
|
||||
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[HttpResponse] =
|
||||
try {
|
||||
val setup = ConnectionPoolSetup(encrypted = false, options, settings, log)
|
||||
val effectiveRequest = request.withEffectiveUri(securedConnection = false)
|
||||
val uri = effectiveRequest.uri
|
||||
val hcps = HostConnectionPoolSetup(uri.authority.host.toString(), uri.effectivePort, setup)
|
||||
cachedGateway(hcps).flatMap(_(effectiveRequest))(fm.executionContext)
|
||||
} catch {
|
||||
case e: IllegalUriException ⇒ FastFuture.failed(e)
|
||||
}
|
||||
|
||||
/**
|
||||
* Triggers an orderly shutdown of all host connections pools currently maintained by the [[ActorSystem]].
|
||||
* The returned future is completed when all pools that were live at the time of this method call
|
||||
* have completed their shutdown process.
|
||||
*
|
||||
* If existing pool client flows are re-used or new ones materialized concurrently with or after this
|
||||
* method call the respective connection pools will be restarted and not contribute to the returned future.
|
||||
*/
|
||||
def shutdownAllConnectionPools(): Future[Unit] = {
|
||||
import scala.collection.JavaConverters._
|
||||
import system.dispatcher
|
||||
val gateways = hostPoolCache.values().asScala
|
||||
system.log.info("Initiating orderly shutdown of all active host connections pools...")
|
||||
Future.sequence(gateways.map(_.flatMap(_.shutdown()))).map(_ ⇒ ())
|
||||
}
|
||||
|
||||
// every ActorSystem maintains its own connection pools
|
||||
private[this] val hostPoolCache = new ConcurrentHashMap[HostConnectionPoolSetup, Future[PoolGateway]]
|
||||
|
||||
private[http] def cachedGateway(setup: HostConnectionPoolSetup)(implicit fm: FlowMaterializer): Future[PoolGateway] = {
|
||||
val gatewayPromise = Promise[PoolGateway]()
|
||||
hostPoolCache.putIfAbsent(setup, gatewayPromise.future) match {
|
||||
case null ⇒ // only one thread can get here at a time
|
||||
val whenShuttingDown = Promise[Unit]()
|
||||
val gateway =
|
||||
try new PoolGateway(setup, whenShuttingDown)
|
||||
catch {
|
||||
case NonFatal(e) ⇒
|
||||
hostPoolCache.remove(setup)
|
||||
gatewayPromise.failure(e)
|
||||
throw e
|
||||
}
|
||||
val fastFuture = FastFuture.successful(gateway)
|
||||
hostPoolCache.put(setup, fastFuture) // optimize subsequent gateway accesses
|
||||
gatewayPromise.success(gateway) // satisfy everyone who got a hold of our promise while we were starting up
|
||||
whenShuttingDown.future.onComplete(_ ⇒ hostPoolCache.remove(setup, fastFuture))(fm.executionContext)
|
||||
fastFuture
|
||||
|
||||
case future ⇒ future // return cached instance
|
||||
}
|
||||
}
|
||||
|
||||
private def gatewayClientFlow[T](hcps: HostConnectionPoolSetup,
|
||||
gatewayFuture: Future[PoolGateway])(
|
||||
implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
|
||||
import hcps._
|
||||
val theHostHeader = hostHeader(host, port, Uri.httpScheme(setup.encrypted))
|
||||
clientFlow[T](setup.settings)(_.withDefaultHeaders(theHostHeader) -> gatewayFuture)
|
||||
.mapMaterialized(_ ⇒ HostConnectionPool(hcps)(gatewayFuture))
|
||||
}
|
||||
|
||||
private def clientFlow[T](settings: ConnectionPoolSettings)(f: HttpRequest ⇒ (HttpRequest, Future[PoolGateway]))(
|
||||
implicit system: ActorSystem, fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] = {
|
||||
// a connection pool can never have more than pipeliningLimit * maxConnections requests in flight at any point
|
||||
val parallelism = settings.pipeliningLimit * settings.maxConnections
|
||||
Flow[(HttpRequest, T)].mapAsyncUnordered(parallelism, {
|
||||
case (request, userContext) ⇒
|
||||
val (effectiveRequest, gatewayFuture) = f(request)
|
||||
val result = Promise[(Try[HttpResponse], T)]() // TODO: simplify to `transformWith` when on Scala 2.12
|
||||
gatewayFuture
|
||||
.flatMap(_(effectiveRequest))(fm.executionContext)
|
||||
.onComplete(responseTry ⇒ result.success(responseTry -> userContext))(fm.executionContext)
|
||||
result.future
|
||||
})
|
||||
}
|
||||
|
||||
private def hostHeader(host: String, port: Int, scheme: String) = headers.Host(host, Uri.normalizePort(port, scheme))
|
||||
}
|
||||
|
||||
object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
|
||||
|
|
@ -229,6 +433,20 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
|
|||
*/
|
||||
case class OutgoingConnection(localAddress: InetSocketAddress, remoteAddress: InetSocketAddress)
|
||||
|
||||
/**
|
||||
* Represents a connection pool to a specific target host and pool configuration.
|
||||
*/
|
||||
case class HostConnectionPool(setup: HostConnectionPoolSetup)(
|
||||
private[http] val gatewayFuture: Future[PoolGateway]) { // enable test access
|
||||
|
||||
/**
|
||||
* Asynchronously triggers the shutdown of the host connection pool.
|
||||
*
|
||||
* The produced [[Future]] is fulfilled when the shutdown has been completed.
|
||||
*/
|
||||
def shutdown()(implicit ec: ExecutionContext): Future[Unit] = gatewayFuture.flatMap(_.shutdown())
|
||||
}
|
||||
|
||||
//////////////////// EXTENSION SETUP ///////////////////
|
||||
|
||||
def apply()(implicit system: ActorSystem): HttpExt = super.apply(system)
|
||||
|
|
|
|||
|
|
@ -31,7 +31,4 @@ object ClientConnectionSettings extends SettingsCompanion[ClientConnectionSettin
|
|||
c getIntBytes "request-header-size-hint",
|
||||
ParserSettings fromSubConfig c.getConfig("parsing"))
|
||||
}
|
||||
|
||||
def apply(optionalSettings: Option[ClientConnectionSettings])(implicit actorRefFactory: ActorRefFactory): ClientConnectionSettings =
|
||||
optionalSettings getOrElse apply(actorSystem)
|
||||
}
|
||||
|
|
@ -0,0 +1,47 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.http.engine.client
|
||||
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.io.Inet
|
||||
import com.typesafe.config.Config
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration.{ FiniteDuration, Duration }
|
||||
import akka.http.util._
|
||||
|
||||
final case class HostConnectionPoolSetup(host: String, port: Int, setup: ConnectionPoolSetup)
|
||||
|
||||
final case class ConnectionPoolSetup(
|
||||
encrypted: Boolean,
|
||||
options: immutable.Traversable[Inet.SocketOption],
|
||||
settings: ConnectionPoolSettings,
|
||||
log: LoggingAdapter)
|
||||
|
||||
final case class ConnectionPoolSettings(
|
||||
maxConnections: Int,
|
||||
maxRetries: Int,
|
||||
maxOpenRequests: Int,
|
||||
pipeliningLimit: Int,
|
||||
idleTimeout: Duration,
|
||||
connectionSettings: ClientConnectionSettings) {
|
||||
|
||||
require(maxConnections > 0, "max-connections must be > 0")
|
||||
require(maxRetries >= 0, "max-retries must be >= 0")
|
||||
require(maxOpenRequests > 0 && (maxOpenRequests & (maxOpenRequests - 1)) == 0, "max-open-requests must be a power of 2 > 0")
|
||||
require(pipeliningLimit > 0, "pipelining-limit must be > 0")
|
||||
require(idleTimeout >= Duration.Zero, "idleTimeout must be >= 0")
|
||||
}
|
||||
|
||||
object ConnectionPoolSettings extends SettingsCompanion[ConnectionPoolSettings]("akka.http.host-connection-pool") {
|
||||
def fromSubConfig(c: Config) = {
|
||||
apply(
|
||||
c getInt "max-connections",
|
||||
c getInt "max-retries",
|
||||
c getInt "max-open-requests",
|
||||
c getInt "pipelining-limit",
|
||||
c getPotentiallyInfiniteDuration "idle-timeout",
|
||||
ClientConnectionSettings fromSubConfig c.getConfig("client"))
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,215 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.http.engine.client
|
||||
|
||||
import language.existentials
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream._
|
||||
import akka.http.model.HttpMethod
|
||||
import akka.http.util._
|
||||
|
||||
private object PoolConductor {
|
||||
import PoolFlow.RequestContext
|
||||
import PoolSlot.{ SlotEvent, SimpleSlotEvent }
|
||||
|
||||
case class Ports(
|
||||
requestIn: Inlet[RequestContext],
|
||||
slotEventIn: Inlet[SlotEvent],
|
||||
slotOuts: immutable.Seq[Outlet[RequestContext]]) extends Shape {
|
||||
|
||||
override val inlets = requestIn :: slotEventIn :: Nil
|
||||
override def outlets = slotOuts
|
||||
|
||||
override def deepCopy(): Shape =
|
||||
Ports(
|
||||
new Inlet(requestIn.toString),
|
||||
new Inlet(slotEventIn.toString),
|
||||
slotOuts.map(o ⇒ new Outlet(o.toString)))
|
||||
|
||||
override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape =
|
||||
Ports(
|
||||
inlets.head.asInstanceOf[Inlet[RequestContext]],
|
||||
inlets.last.asInstanceOf[Inlet[SlotEvent]],
|
||||
outlets.asInstanceOf[immutable.Seq[Outlet[RequestContext]]])
|
||||
}
|
||||
|
||||
/*
|
||||
Stream Setup
|
||||
============
|
||||
Request-
|
||||
Request- +-----------+ +-----------+ Switch- +-------------+ +-----------+ Context
|
||||
Context | retry | | slot- | Command | doubler | | route +-------------->
|
||||
+--------->| Merge +---->| Selector +-------------->| (MapConcat) +---->| (Flexi +-------------->
|
||||
| | | | | | | Route) +-------------->
|
||||
+----+------+ +-----+-----+ +-------------+ +-----------+ to slots
|
||||
^ ^
|
||||
| | SimpleSlotEvent
|
||||
| Request- |
|
||||
| Context +---------+
|
||||
+-------------+ retry |<-------- Slot Event (from slotEventMerge)
|
||||
| Split |
|
||||
+---------+
|
||||
|
||||
*/
|
||||
def apply(slotCount: Int, maxRetries: Int, pipeliningLimit: Int, log: LoggingAdapter): Graph[Ports, Any] =
|
||||
FlowGraph.partial() { implicit b ⇒
|
||||
import FlowGraph.Implicits._
|
||||
|
||||
// actually we want a `MergePreferred` here (and prefer the `retryInlet`),
|
||||
// but as MergePreferred doesn't propagate completion on the secondary input we can't use it here
|
||||
val retryMerge = b.add(new StreamUtils.EagerCloseMerge2[RequestContext]("PoolConductor.retryMerge"))
|
||||
val slotSelector = b.add(new SlotSelector(slotCount, maxRetries, pipeliningLimit, log))
|
||||
val doubler = Flow[SwitchCommand].mapConcat(x ⇒ x :: x :: Nil) // work-around for https://github.com/akka/akka/issues/17004
|
||||
val route = b.add(new Route(slotCount))
|
||||
val retrySplit = b.add(new RetrySplit())
|
||||
|
||||
retryMerge.out ~> slotSelector.in0
|
||||
slotSelector.out ~> doubler ~> route.in
|
||||
retrySplit.out0 ~> slotSelector.in1
|
||||
retrySplit.out1 ~> retryMerge.in1
|
||||
|
||||
Ports(retryMerge.in0, retrySplit.in, route.outlets.asInstanceOf[immutable.Seq[Outlet[RequestContext]]])
|
||||
}
|
||||
|
||||
private case class SwitchCommand(rc: RequestContext, slotIx: Int)
|
||||
|
||||
// the SlotSelector keeps the state of all slots as instances of this ADT
|
||||
private sealed trait SlotState
|
||||
|
||||
// the connection of the respective slot is not connected
|
||||
private case object Unconnected extends SlotState
|
||||
|
||||
// the connection of the respective slot is connected with no requests currently in flight
|
||||
private case object Idle extends SlotState
|
||||
|
||||
// the connection of the respective slot has a number of requests in flight and all of them
|
||||
// are idempotent which allows more requests to be pipelined onto the connection if required
|
||||
private final case class Loaded(openIdempotentRequests: Int) extends SlotState { require(openIdempotentRequests > 0) }
|
||||
|
||||
// the connection of the respective slot has a number of requests in flight and the
|
||||
// last one of these is not idempotent which blocks the connection for more pipelined requests
|
||||
private case class Busy(openRequests: Int) extends SlotState { require(openRequests > 0) }
|
||||
private object Busy extends Busy(1)
|
||||
|
||||
private class SlotSelector(slotCount: Int, maxRetries: Int, pipeliningLimit: Int, log: LoggingAdapter)
|
||||
extends FlexiMerge[SwitchCommand, FanInShape2[RequestContext, SimpleSlotEvent, SwitchCommand]](
|
||||
new FanInShape2("PoolConductor.SlotSelector"), OperationAttributes.name("PoolConductor.SlotSelector")) {
|
||||
import FlexiMerge._
|
||||
|
||||
def createMergeLogic(s: FanInShape2[RequestContext, SimpleSlotEvent, SwitchCommand]): MergeLogic[SwitchCommand] =
|
||||
new MergeLogic[SwitchCommand] {
|
||||
val slotStates = Array.fill[SlotState](slotCount)(Unconnected)
|
||||
def initialState = nextState(0)
|
||||
override def initialCompletionHandling = eagerClose
|
||||
|
||||
def nextState(currentSlot: Int): State[_] = {
|
||||
val read: ReadCondition[_] = currentSlot match {
|
||||
case -1 ⇒ Read(s.in1) // if we have no slot available we are not reading from upstream (only SlotEvents)
|
||||
case _ ⇒ ReadAny(s) // otherwise we read SlotEvents *as well as* from upstream
|
||||
}
|
||||
State(read) { (ctx, inlet, element) ⇒
|
||||
element match {
|
||||
case rc: RequestContext ⇒
|
||||
ctx.emit(SwitchCommand(rc, currentSlot))
|
||||
slotStates(currentSlot) = slotStateAfterDispatch(slotStates(currentSlot), rc.request.method)
|
||||
case SlotEvent.RequestCompleted(slotIx) ⇒
|
||||
slotStates(slotIx) = slotStateAfterRequestCompleted(slotStates(slotIx))
|
||||
case SlotEvent.Disconnected(slotIx, failed) ⇒
|
||||
slotStates(slotIx) = slotStateAfterDisconnect(slotStates(slotIx), failed)
|
||||
}
|
||||
nextState(bestSlot())
|
||||
}
|
||||
}
|
||||
|
||||
def slotStateAfterDispatch(slotState: SlotState, method: HttpMethod): SlotState =
|
||||
slotState match {
|
||||
case Unconnected | Idle ⇒ if (method.isIdempotent) Loaded(1) else Busy(1)
|
||||
case Loaded(n) ⇒ if (method.isIdempotent) Loaded(n + 1) else Busy(n + 1)
|
||||
case Busy(_) ⇒ throw new IllegalStateException("Request scheduled onto busy connection?")
|
||||
}
|
||||
|
||||
def slotStateAfterRequestCompleted(slotState: SlotState): SlotState =
|
||||
slotState match {
|
||||
case Loaded(1) ⇒ Idle
|
||||
case Loaded(n) ⇒ Loaded(n - 1)
|
||||
case Busy(1) ⇒ Idle
|
||||
case Busy(n) ⇒ Busy(n - 1)
|
||||
case _ ⇒ throw new IllegalStateException(s"RequestCompleted on $slotState connection?")
|
||||
}
|
||||
|
||||
def slotStateAfterDisconnect(slotState: SlotState, failed: Int): SlotState =
|
||||
slotState match {
|
||||
case Idle if failed == 0 ⇒ Unconnected
|
||||
case Loaded(n) if n > failed ⇒ Loaded(n - failed)
|
||||
case Loaded(n) if n == failed ⇒ Unconnected
|
||||
case Busy(n) if n > failed ⇒ Busy(n - failed)
|
||||
case Busy(n) if n == failed ⇒ Unconnected
|
||||
case _ ⇒ throw new IllegalStateException(s"Disconnect(_, $failed) on $slotState connection?")
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements the following Connection Slot selection strategy
|
||||
* - Select the first idle connection in the pool, if there is one.
|
||||
* - If none is idle select the first unconnected connection, if there is one.
|
||||
* - If all are loaded select the connection with the least open requests (< pipeliningLimit)
|
||||
* that only has requests with idempotent methods scheduled to it, if there is one.
|
||||
* - Otherwise return -1 (which applies back-pressure to the request source)
|
||||
*
|
||||
* See http://tools.ietf.org/html/rfc7230#section-6.3.2 for more info on HTTP pipelining.
|
||||
*/
|
||||
@tailrec def bestSlot(ix: Int = 0, bestIx: Int = -1, bestState: SlotState = Busy): Int =
|
||||
if (ix < slotStates.length) {
|
||||
val pl = pipeliningLimit
|
||||
slotStates(ix) -> bestState match {
|
||||
case (Idle, _) ⇒ ix
|
||||
case (Unconnected, Loaded(_) | Busy) ⇒ bestSlot(ix + 1, ix, Unconnected)
|
||||
case (x @ Loaded(a), Loaded(b)) if a < b ⇒ bestSlot(ix + 1, ix, x)
|
||||
case (x @ Loaded(a), Busy) if a < pl ⇒ bestSlot(ix + 1, ix, x)
|
||||
case _ ⇒ bestSlot(ix + 1, bestIx, bestState)
|
||||
}
|
||||
} else bestIx
|
||||
}
|
||||
}
|
||||
|
||||
private class Route(slotCount: Int) extends FlexiRoute[SwitchCommand, UniformFanOutShape[SwitchCommand, RequestContext]](
|
||||
new UniformFanOutShape(slotCount, "PoolConductor.Route"), OperationAttributes.name("PoolConductor.Route")) {
|
||||
import FlexiRoute._
|
||||
|
||||
def createRouteLogic(s: UniformFanOutShape[SwitchCommand, RequestContext]): RouteLogic[SwitchCommand] =
|
||||
new RouteLogic[SwitchCommand] {
|
||||
val initialState: State[_] = State(DemandFromAny(s)) {
|
||||
case (_, _, SwitchCommand(req, slotIx)) ⇒
|
||||
State(DemandFrom(s.out(slotIx))) { (ctx, out, _) ⇒
|
||||
ctx.emit(out)(req)
|
||||
initialState
|
||||
}
|
||||
}
|
||||
override def initialCompletionHandling = CompletionHandling(
|
||||
onUpstreamFinish = ctx ⇒ { ctx.finish(); SameState },
|
||||
onUpstreamFailure = (ctx, cause) ⇒ { ctx.fail(cause); SameState },
|
||||
onDownstreamFinish = (ctx, _) ⇒ SameState)
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: remove when #17038 is cleared
|
||||
private class RetrySplit extends FlexiRoute[SlotEvent, FanOutShape2[SlotEvent, SimpleSlotEvent, RequestContext]](
|
||||
new FanOutShape2("PoolConductor.RetrySplit"), OperationAttributes.name("PoolConductor.RetrySplit")) {
|
||||
import FlexiRoute._
|
||||
|
||||
def createRouteLogic(s: FanOutShape2[SlotEvent, SimpleSlotEvent, RequestContext]): RouteLogic[SlotEvent] =
|
||||
new RouteLogic[SlotEvent] {
|
||||
def initialState: State[_] = State(DemandFromAll(s)) { (ctx, _, ev) ⇒
|
||||
ev match {
|
||||
case x: SimpleSlotEvent ⇒ ctx.emit(s.out0)(x)
|
||||
case SlotEvent.RetryRequest(rc) ⇒ ctx.emit(s.out1)(rc)
|
||||
}
|
||||
SameState
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,90 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.http.engine.client
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
import scala.concurrent.{ Promise, Future }
|
||||
import scala.util.Try
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.actor._
|
||||
import akka.stream.FlowMaterializer
|
||||
import akka.stream.scaladsl._
|
||||
import akka.http.model._
|
||||
import akka.http.Http
|
||||
|
||||
private object PoolFlow {
|
||||
|
||||
case class RequestContext(request: HttpRequest, responsePromise: Promise[HttpResponse], retriesLeft: Int) {
|
||||
require(retriesLeft >= 0)
|
||||
}
|
||||
case class ResponseContext(rc: RequestContext, response: Try[HttpResponse])
|
||||
|
||||
/*
|
||||
Pool Flow Stream Setup
|
||||
======================
|
||||
+-------------------+
|
||||
| |
|
||||
+----> | Connection Slot 1 +---->
|
||||
| | | |
|
||||
| +---+---------------+ |
|
||||
| | |
|
||||
+-----------+ | +-------------------+ | +---------------+
|
||||
RequestContext | +----+ | | +----> | | ResponseContext
|
||||
+----------------> | Conductor |---------> | Connection Slot 2 +---------> | responseMerge +------------------>
|
||||
| +----+ | | +----> | |
|
||||
+-----------+ | +---------+---------+ | +---------------+
|
||||
^ | | | |
|
||||
| | +-------------------+ |
|
||||
| | | | |
|
||||
SlotEvent | +----> | Connection Slot 3 +---->
|
||||
| | |
|
||||
| +---------------+---+
|
||||
| | | |
|
||||
+-----------+ SlotEvent | | |
|
||||
| slotEvent | <-------------+ | |
|
||||
| Merge | <-------------------+ |
|
||||
| | <-------------------------+
|
||||
+-----------+
|
||||
|
||||
Conductor:
|
||||
- Maintains slot state overview by running a simple state machine per Connection Slot
|
||||
- Decides which slot will receive the next request from upstream according to current slot state and dispatch configuration
|
||||
- Forwards demand from selected slot to upstream
|
||||
- Always maintains demand for SlotEvents from the Connection Slots
|
||||
- Implemented as a sub-graph
|
||||
|
||||
Connection Slot:
|
||||
- Wraps a low-level outgoing connection flow and (re-)materializes and uses it whenever necessary
|
||||
- Directly forwards demand from the underlying connection to the Conductor
|
||||
- Dispatches SlotEvents to the Conductor (via the SlotEventMerge)
|
||||
- Implemented as a sub-graph
|
||||
|
||||
Response Merge:
|
||||
- Simple merge of the Connection Slots' outputs
|
||||
|
||||
*/
|
||||
def apply(connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]],
|
||||
remoteAddress: InetSocketAddress, settings: ConnectionPoolSettings, log: LoggingAdapter)(
|
||||
implicit system: ActorSystem, fm: FlowMaterializer): Flow[RequestContext, ResponseContext, Unit] =
|
||||
Flow() { implicit b ⇒
|
||||
import settings._
|
||||
import FlowGraph.Implicits._
|
||||
|
||||
val conductor = b.add(PoolConductor(maxConnections, maxRetries, pipeliningLimit, log))
|
||||
val slots = Vector
|
||||
.tabulate(maxConnections)(PoolSlot(_, connectionFlow, remoteAddress, settings))
|
||||
.map(b.add(_))
|
||||
val responseMerge = b.add(Merge[ResponseContext](maxConnections))
|
||||
val slotEventMerge = b.add(Merge[PoolSlot.SlotEvent](maxConnections))
|
||||
|
||||
slotEventMerge.out ~> conductor.slotEventIn
|
||||
for ((slot, ix) ← slots.zipWithIndex) {
|
||||
conductor.slotOuts(ix) ~> slot.in
|
||||
slot.out0 ~> responseMerge.in(ix)
|
||||
slot.out1 ~> slotEventMerge.in(ix)
|
||||
}
|
||||
(conductor.requestIn, responseMerge.out)
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,87 @@
|
|||
package akka.http.engine.client
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.{ Future, Promise }
|
||||
import akka.actor.{ Props, ActorSystem, ActorRef }
|
||||
import akka.http.Http
|
||||
import akka.http.model.{ HttpResponse, HttpRequest }
|
||||
import akka.stream.FlowMaterializer
|
||||
|
||||
private object PoolGateway {
|
||||
|
||||
sealed trait State
|
||||
final case class Running(interfaceActorRef: ActorRef,
|
||||
shutdownStartedPromise: Promise[Unit],
|
||||
shutdownCompletedPromise: Promise[Unit]) extends State
|
||||
final case class IsShutdown(shutdownCompleted: Future[Unit]) extends State
|
||||
final case class NewIncarnation(gatewayFuture: Future[PoolGateway]) extends State
|
||||
}
|
||||
|
||||
/**
|
||||
* Manages access to a host connection pool or rather: a sequence of pool incarnations.
|
||||
*
|
||||
* A host connection pool for a given [[HostConnectionPoolSetup]] is a running stream, whose outside interface is
|
||||
* provided by its [[PoolInterfaceActor]] actor. The actor accepts [[PoolInterfaceActor.PoolRequest]] messages
|
||||
* and completes their `responsePromise` whenever the respective response has been received (or an error occurred).
|
||||
*
|
||||
* A [[PoolGateway]] provides a layer of indirection between the pool cache and the actual
|
||||
* pools that is required to allow a pool incarnation to fully terminate (e.g. after an idle-timeout)
|
||||
* and be transparently replaced by a new incarnation if required.
|
||||
* Removal of cache entries for terminated pools is also supported, because old gateway references that
|
||||
* get reused will automatically forward requests directed at them to the latest pool incarnation from the cache.
|
||||
*/
|
||||
private[http] class PoolGateway(hcps: HostConnectionPoolSetup,
|
||||
_shutdownStartedPromise: Promise[Unit])( // constructor arg only
|
||||
implicit system: ActorSystem, fm: FlowMaterializer) {
|
||||
import PoolGateway._
|
||||
import fm.executionContext
|
||||
|
||||
private val state = {
|
||||
val shutdownCompletedPromise = Promise[Unit]()
|
||||
val props = Props(new PoolInterfaceActor(hcps, shutdownCompletedPromise, this))
|
||||
val ref = system.actorOf(props, PoolInterfaceActor.name.next())
|
||||
new AtomicReference[State](Running(ref, _shutdownStartedPromise, shutdownCompletedPromise))
|
||||
}
|
||||
|
||||
def currentState: Any = state.get() // enables test access
|
||||
|
||||
def apply(request: HttpRequest, previousIncarnation: PoolGateway = null): Future[HttpResponse] =
|
||||
state.get match {
|
||||
case Running(ref, _, _) ⇒
|
||||
val responsePromise = Promise[HttpResponse]()
|
||||
ref ! PoolInterfaceActor.PoolRequest(request, responsePromise)
|
||||
responsePromise.future
|
||||
|
||||
case IsShutdown(shutdownCompleted) ⇒
|
||||
// delay starting the next pool incarnation until the current pool has completed its shutdown
|
||||
shutdownCompleted.flatMap { _ ⇒
|
||||
val newGatewayFuture = Http().cachedGateway(hcps)
|
||||
// a simple set is fine here as `newGatewayFuture` will be identical for all threads getting here
|
||||
state.set(NewIncarnation(newGatewayFuture))
|
||||
apply(request)
|
||||
}
|
||||
|
||||
case x @ NewIncarnation(newGatewayFuture) ⇒
|
||||
if (previousIncarnation != null)
|
||||
previousIncarnation.state.set(x) // collapse incarnation chain
|
||||
newGatewayFuture.flatMap(_(request, this))
|
||||
}
|
||||
|
||||
// triggers a shutdown of the current pool, even if it is already a later incarnation
|
||||
@tailrec final def shutdown(): Future[Unit] =
|
||||
state.get match {
|
||||
case x @ Running(ref, shutdownStartedPromise, shutdownCompletedPromise) ⇒
|
||||
if (state.compareAndSet(x, IsShutdown(shutdownCompletedPromise.future))) {
|
||||
shutdownStartedPromise.success(()) // trigger cache removal
|
||||
ref ! PoolInterfaceActor.Shutdown
|
||||
shutdownCompletedPromise.future
|
||||
} else shutdown() // CAS loop (not a spinlock)
|
||||
|
||||
case IsShutdown(x) ⇒ x
|
||||
|
||||
case NewIncarnation(newGatewayFuture) ⇒ newGatewayFuture.flatMap(_.shutdownAux())
|
||||
}
|
||||
|
||||
private def shutdownAux() = shutdown() // alias required for @tailrec
|
||||
}
|
||||
|
|
@ -0,0 +1,147 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.http.engine.client
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.Promise
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import akka.actor.{ PoisonPill, DeadLetterSuppression, ActorLogging, Cancellable }
|
||||
import akka.stream.FlowMaterializer
|
||||
import akka.stream.actor.{ ActorPublisher, ActorSubscriber, ZeroRequestStrategy }
|
||||
import akka.stream.actor.ActorPublisherMessage._
|
||||
import akka.stream.actor.ActorSubscriberMessage._
|
||||
import akka.stream.impl.FixedSizeBuffer
|
||||
import akka.stream.scaladsl.{ Sink, Source }
|
||||
import akka.http.model.{ HttpResponse, HttpRequest }
|
||||
import akka.http.util.SeqActorName
|
||||
import akka.http.Http
|
||||
import PoolFlow._
|
||||
|
||||
private object PoolInterfaceActor {
|
||||
final case class PoolRequest(request: HttpRequest, responsePromise: Promise[HttpResponse])
|
||||
|
||||
case object Shutdown extends DeadLetterSuppression
|
||||
|
||||
val name = new SeqActorName("PoolInterfaceActor")
|
||||
}
|
||||
|
||||
/**
|
||||
* An actor that wraps a completely encapsulated, running connection pool flow.
|
||||
*
|
||||
* Outside interface:
|
||||
* The actor accepts `PoolRequest` messages and completes their `responsePromise` when the respective
|
||||
* response has arrived. Incoming `PoolRequest` messages are not back-pressured but rather buffered in
|
||||
* a fixed-size ringbuffer if required. Requests that would cause a buffer overflow are completed with
|
||||
* a respective error. The user can prevent buffer overflows by configuring a `max-open-requests` value
|
||||
* that is >= max-connections x pipelining-limit x number of respective client-flow materializations.
|
||||
*
|
||||
* Inside interface:
|
||||
* To the inside (i.e. the running connection pool flow) the gateway actor acts as request source
|
||||
* (ActorPublisher) and response sink (ActorSubscriber).
|
||||
*/
|
||||
private class PoolInterfaceActor(hcps: HostConnectionPoolSetup,
|
||||
shutdownCompletedPromise: Promise[Unit],
|
||||
gateway: PoolGateway)(implicit fm: FlowMaterializer)
|
||||
extends ActorSubscriber with ActorPublisher[RequestContext] with ActorLogging {
|
||||
import PoolInterfaceActor._
|
||||
|
||||
private[this] val inputBuffer = FixedSizeBuffer[PoolRequest](hcps.setup.settings.maxOpenRequests)
|
||||
private[this] var activeIdleTimeout: Option[Cancellable] = None
|
||||
|
||||
log.debug("(Re-)starting host connection pool to {}:{}", hcps.host, hcps.port)
|
||||
|
||||
{ // start the pool flow with this actor acting as source as well as sink
|
||||
import context.system
|
||||
import hcps._
|
||||
import setup._
|
||||
val connectionFlow = Http().outgoingConnection(host, port, None, options, settings.connectionSettings, setup.log)
|
||||
val poolFlow = PoolFlow(connectionFlow, new InetSocketAddress(host, port), settings, setup.log)
|
||||
Source(ActorPublisher(self)).via(poolFlow).runWith(Sink(ActorSubscriber[ResponseContext](self)))
|
||||
}
|
||||
|
||||
activateIdleTimeoutIfNecessary()
|
||||
|
||||
def requestStrategy = ZeroRequestStrategy
|
||||
|
||||
def receive = {
|
||||
|
||||
/////////////// COMING UP FROM POOL (SOURCE SIDE) //////////////
|
||||
|
||||
case Request(_) ⇒ dispatchRequests() // the pool is ready to take on more requests
|
||||
|
||||
case Cancel ⇒
|
||||
// somehow the pool shut down, however, we don't do anything here because we'll also see an
|
||||
// OnComplete or OnError which we use as the sole trigger for cleaning up
|
||||
|
||||
/////////////// COMING DOWN FROM POOL (SINK SIDE) //////////////
|
||||
|
||||
case OnNext(ResponseContext(rc, responseTry)) ⇒
|
||||
rc.responsePromise.complete(responseTry)
|
||||
activateIdleTimeoutIfNecessary()
|
||||
|
||||
case OnComplete ⇒ // the pool shut down
|
||||
log.debug("Host connection pool to {}:{} has completed orderly shutdown", hcps.host, hcps.port)
|
||||
shutdownCompletedPromise.success(())
|
||||
self ! PoisonPill // give potentially queued requests another chance to be forwarded back to the gateway
|
||||
|
||||
case OnError(e) ⇒ // the pool shut down
|
||||
log.debug("Host connection pool to {}:{} has shut down with error {}", hcps.host, hcps.port, e)
|
||||
shutdownCompletedPromise.failure(e)
|
||||
self ! PoisonPill // give potentially queued requests another chance to be forwarded back to the gateway
|
||||
|
||||
/////////////// FROM CLIENT //////////////
|
||||
|
||||
case x: PoolRequest if isActive ⇒
|
||||
activeIdleTimeout foreach { timeout ⇒
|
||||
timeout.cancel()
|
||||
activeIdleTimeout = None
|
||||
}
|
||||
if (totalDemand == 0) {
|
||||
// if we can't dispatch right now we buffer and dispatch when demand from the pool arrives
|
||||
if (inputBuffer.isFull) {
|
||||
x.responsePromise.failure(
|
||||
new RuntimeException(s"Exceeded configured max-open-requests value of [${inputBuffer.size}}]"))
|
||||
} else inputBuffer.enqueue(x)
|
||||
} else dispatchRequest(x) // if we can dispatch right now, do it
|
||||
request(1) // for every incoming request we demand one response from the pool
|
||||
|
||||
case PoolRequest(request, responsePromise) ⇒
|
||||
// we have already started shutting down, i.e. this pool is not usable anymore
|
||||
// so we forward the request back to the gateway
|
||||
// Note that this forwarding will stop when we receive completion from the pool flow
|
||||
// (because we stop ourselves then), so there is a very small chance of a request ending
|
||||
// up as a dead letter if the sending thread gets interrupted for a long time right before
|
||||
// the `ref ! PoolRequest(...)` in the PoolGateway
|
||||
responsePromise.completeWith(gateway(request))
|
||||
|
||||
case Shutdown ⇒ // signal coming in from gateway
|
||||
log.debug("Shutting down host connection pool to {}:{}", hcps.host, hcps.port)
|
||||
onComplete()
|
||||
while (!inputBuffer.isEmpty) {
|
||||
val PoolRequest(request, responsePromise) = inputBuffer.dequeue()
|
||||
responsePromise.completeWith(gateway(request))
|
||||
}
|
||||
}
|
||||
|
||||
@tailrec private def dispatchRequests(): Unit =
|
||||
if (totalDemand > 0 && !inputBuffer.isEmpty) {
|
||||
dispatchRequest(inputBuffer.dequeue())
|
||||
dispatchRequests()
|
||||
}
|
||||
|
||||
def dispatchRequest(pr: PoolRequest): Unit = {
|
||||
val effectiveRequest = pr.request.withUri(pr.request.uri.toHttpRequestTargetOriginForm)
|
||||
val retries = if (pr.request.method.isIdempotent) hcps.setup.settings.maxRetries else 0
|
||||
onNext(RequestContext(effectiveRequest, pr.responsePromise, retries))
|
||||
}
|
||||
|
||||
def activateIdleTimeoutIfNecessary(): Unit =
|
||||
if (remainingRequested == 0 && hcps.setup.settings.idleTimeout.isFinite) {
|
||||
import context.dispatcher
|
||||
val timeout = hcps.setup.settings.idleTimeout.asInstanceOf[FiniteDuration]
|
||||
activeIdleTimeout = Some(context.system.scheduler.scheduleOnce(timeout)(gateway.shutdown()))
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,232 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.http.engine.client
|
||||
|
||||
import language.existentials
|
||||
import java.net.InetSocketAddress
|
||||
import scala.util.{ Failure, Success }
|
||||
import scala.collection.immutable
|
||||
import akka.actor._
|
||||
import akka.http.model.{ HttpResponse, HttpRequest }
|
||||
import akka.http.util._
|
||||
import akka.stream.impl.{ SubscribePending, ExposedPublisher, ActorProcessor }
|
||||
import akka.stream.actor._
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream._
|
||||
|
||||
private object PoolSlot {
|
||||
import PoolFlow.{ RequestContext, ResponseContext }
|
||||
|
||||
sealed trait ProcessorOut
|
||||
final case class ResponseDelivery(response: ResponseContext) extends ProcessorOut
|
||||
sealed trait SlotEvent extends ProcessorOut
|
||||
sealed trait SimpleSlotEvent extends SlotEvent
|
||||
object SlotEvent {
|
||||
final case class RequestCompleted(slotIx: Int) extends SimpleSlotEvent
|
||||
final case class Disconnected(slotIx: Int, failedRequests: Int) extends SimpleSlotEvent
|
||||
final case class RetryRequest(rc: RequestContext) extends SlotEvent
|
||||
}
|
||||
|
||||
type Ports = FanOutShape2[RequestContext, ResponseContext, SlotEvent]
|
||||
|
||||
private val slotProcessorActorName = new SeqActorName("SlotProcessor")
|
||||
|
||||
/*
|
||||
Stream Setup
|
||||
============
|
||||
|
||||
Request- +-----------+ +-------------+ +-------------+ +------------+
|
||||
Context | Slot- | List[ | flatten | Processor- | doubler | | SlotEvent- | Response-
|
||||
+--------->| Processor +------------->| (MapConcat) +------------->| (MapConcat) +---->| Split +------------->
|
||||
| | Processor- | | Out | | | | Context
|
||||
+-----------+ Out] +-------------+ +-------------+ +-----+------+
|
||||
| SlotEvent
|
||||
| (to Conductor
|
||||
| via slotEventMerge)
|
||||
v
|
||||
*/
|
||||
def apply(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any],
|
||||
remoteAddress: InetSocketAddress, // TODO: remove after #16168 is cleared
|
||||
settings: ConnectionPoolSettings)(implicit system: ActorSystem, fm: FlowMaterializer): Graph[Ports, Any] =
|
||||
FlowGraph.partial() { implicit b ⇒
|
||||
import FlowGraph.Implicits._
|
||||
|
||||
val slotProcessor = b.add {
|
||||
Flow[RequestContext] andThenMat { () ⇒
|
||||
val actor = system.actorOf(Props(new SlotProcessor(slotIx, connectionFlow, settings)), slotProcessorActorName.next())
|
||||
(ActorProcessor[RequestContext, List[ProcessorOut]](actor), ())
|
||||
}
|
||||
}
|
||||
val flattenDouble = Flow[List[ProcessorOut]].mapConcat(_.flatMap(x ⇒ x :: x :: Nil))
|
||||
val split = b.add(new SlotEventSplit)
|
||||
|
||||
slotProcessor ~> flattenDouble ~> split.in
|
||||
new Ports(slotProcessor.inlet, split.out0, split.out1)
|
||||
}
|
||||
|
||||
import ActorSubscriberMessage._
|
||||
import ActorPublisherMessage._
|
||||
|
||||
/**
|
||||
* An actor mananging a series of materializations of the given `connectionFlow`.
|
||||
* To the outside it provides a stable flow stage, consuming `RequestContext` instances on its
|
||||
* input (ActorSubscriber) side and producing `List[ProcessorOut]` instances on its output
|
||||
* (ActorPublisher) side.
|
||||
* The given `connectionFlow` is materialized into a running flow whenever required.
|
||||
* Completion and errors from the connection are not surfaced to the outside (unless we are
|
||||
* shutting down completely).
|
||||
*/
|
||||
private class SlotProcessor(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any],
|
||||
                      settings: ConnectionPoolSettings)(implicit fm: FlowMaterializer)
    extends ActorSubscriber with ActorPublisher[List[ProcessorOut]] with ActorLogging {

    var exposedPublisher: akka.stream.impl.ActorPublisher[Any] = _
    var inflightRequests = immutable.Queue.empty[RequestContext]

    val runnableFlow = Source.actorPublisher[HttpRequest](Props(new FlowInportActor(self)))
      .via(connectionFlow)
      .toMat(Sink.actorSubscriber[HttpResponse](Props(new FlowOutportActor(self))))(Keep.both)

    def requestStrategy = ZeroRequestStrategy
    def receive = waitingExposedPublisher

    def waitingExposedPublisher: Receive = {
      case ExposedPublisher(publisher) ⇒
        exposedPublisher = publisher
        context.become(waitingForSubscribePending)
      case other ⇒ throw new IllegalStateException(s"The first message must be `ExposedPublisher` but was [$other]")
    }

    def waitingForSubscribePending: Receive = {
      case SubscribePending ⇒
        exposedPublisher.takePendingSubscribers() foreach (s ⇒ self ! ActorPublisher.Internal.Subscribe(s))
        context.become(unconnected)
    }

    val unconnected: Receive = {
      case OnNext(rc: RequestContext) ⇒
        val (connInport, connOutport) = runnableFlow.run()
        connOutport ! Request(totalDemand)
        context.become(waitingForDemandFromConnection(connInport, connOutport, rc))

      case Request(_) ⇒ if (remainingRequested == 0) request(1) // ask for first request if necessary

      case Cancel ⇒ { cancel(); shutdown() }
      case OnComplete ⇒ onComplete()
      case OnError(e) ⇒ onError(e)
    }

    def waitingForDemandFromConnection(connInport: ActorRef, connOutport: ActorRef,
                                       firstRequest: RequestContext): Receive = {
      case ev @ (Request(_) | Cancel) ⇒ connOutport ! ev
      case ev @ (OnComplete | OnError(_)) ⇒ connInport ! ev
      case OnNext(x) ⇒ throw new IllegalStateException("Unrequested RequestContext: " + x)

      case FromConnection(Request(n)) ⇒
        inflightRequests = inflightRequests.enqueue(firstRequest)
        request(n - remainingRequested)
        connInport ! OnNext(firstRequest.request)
        context.become(running(connInport, connOutport))

      case FromConnection(Cancel) ⇒ if (!isActive) { cancel(); shutdown() } // else ignore and wait for accompanying OnComplete or OnError
      case FromConnection(OnComplete) ⇒ handleDisconnect(None)
      case FromConnection(OnError(e)) ⇒ handleDisconnect(Some(e))
      case FromConnection(OnNext(x)) ⇒ throw new IllegalStateException("Unexpected HttpResponse: " + x)
    }

    def running(connInport: ActorRef, connOutport: ActorRef): Receive = {
      case ev @ (Request(_) | Cancel) ⇒ connOutport ! ev
      case ev @ (OnComplete | OnError(_)) ⇒ connInport ! ev
      case OnNext(rc: RequestContext) ⇒
        inflightRequests = inflightRequests.enqueue(rc)
        connInport ! OnNext(rc.request)

      case FromConnection(Request(n)) ⇒ request(n)
      case FromConnection(Cancel) ⇒ if (!isActive) { cancel(); shutdown() } // else ignore and wait for accompanying OnComplete or OnError

      case FromConnection(OnNext(response: HttpResponse)) ⇒
        val requestContext = inflightRequests.head
        inflightRequests = inflightRequests.tail
        val delivery = ResponseDelivery(ResponseContext(requestContext, Success(response)))
        val requestCompleted = SlotEvent.RequestCompleted(slotIx)
        onNext(delivery :: requestCompleted :: Nil)

      case FromConnection(OnComplete) ⇒ handleDisconnect(None)
      case FromConnection(OnError(e)) ⇒ handleDisconnect(Some(e))
    }

    def handleDisconnect(error: Option[Throwable]): Unit = {
      log.debug("Slot {} disconnected after {}", slotIx, error getOrElse "regular connection close")
      val results: List[ProcessorOut] = inflightRequests.map { rc ⇒
        if (rc.retriesLeft == 0) {
          val reason = error.fold[Throwable](new RuntimeException("Unexpected disconnect"))(identityFunc)
          ResponseDelivery(ResponseContext(rc, Failure(reason)))
        } else SlotEvent.RetryRequest(rc.copy(retriesLeft = rc.retriesLeft - 1))
      }(collection.breakOut)
      inflightRequests = immutable.Queue.empty
      onNext(SlotEvent.Disconnected(slotIx, results.size) :: results)
      if (canceled) onComplete()

      context.become(unconnected)
    }

    override def onComplete(): Unit = {
      exposedPublisher.shutdown(None)
      super.onComplete()
      shutdown()
    }

    override def onError(cause: Throwable): Unit = {
      exposedPublisher.shutdown(Some(cause))
      super.onError(cause)
      shutdown()
    }

    def shutdown(): Unit = context.stop(self)
  }

  private case class FromConnection(ev: Any)

  private class FlowInportActor(slotProcessor: ActorRef) extends ActorPublisher[HttpRequest] {
    def receive: Receive = {
      case ev: Request ⇒ slotProcessor ! FromConnection(ev)
      case Cancel ⇒ { slotProcessor ! FromConnection(Cancel); context.stop(self) }
      case OnNext(r: HttpRequest) ⇒ onNext(r)
      case OnComplete ⇒ { onComplete(); context.stop(self) }
      case OnError(e) ⇒ { onError(e); context.stop(self) }
    }
  }

  private class FlowOutportActor(slotProcessor: ActorRef) extends ActorSubscriber {
    def requestStrategy = ZeroRequestStrategy
    def receive: Receive = {
      case Request(n) ⇒ request(n)
      case Cancel ⇒ cancel()
      case ev: OnNext ⇒ slotProcessor ! FromConnection(ev)
      case ev @ (OnComplete | OnError(_)) ⇒ { slotProcessor ! FromConnection(ev); context.stop(self) }
    }
  }

  // FIXME: remove when #17038 is cleared
  private class SlotEventSplit extends FlexiRoute[ProcessorOut, FanOutShape2[ProcessorOut, ResponseContext, SlotEvent]](
    new FanOutShape2("PoolSlot.SlotEventSplit"), OperationAttributes.name("PoolSlot.SlotEventSplit")) {
    import FlexiRoute._

    def createRouteLogic(s: FanOutShape2[ProcessorOut, ResponseContext, SlotEvent]): RouteLogic[ProcessorOut] =
      new RouteLogic[ProcessorOut] {
        val initialState: State[_] = State(DemandFromAny(s)) {
          case (_, _, ResponseDelivery(x)) ⇒
            State(DemandFrom(s.out0)) { (ctx, _, _) ⇒
              ctx.emit(s.out0)(x)
              initialState
            }
          case (_, _, x: SlotEvent) ⇒
            State(DemandFrom(s.out1)) { (ctx, _, _) ⇒
              ctx.emit(s.out1)(x)
              initialState
            }
        }
      }
  }
}
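The disconnect handling above walks all in-flight requests and, per request, either re-dispatches it with one retry fewer or fails it outright. A minimal stand-alone sketch of that decision rule, using simplified stand-in types (RequestContext here is not the real akka-http class):

// Simplified model of the retry decision in handleDisconnect above; types are illustrative stand-ins.
final case class RequestContext(request: String, retriesLeft: Int)

sealed trait SlotOutcome
final case class RetryRequest(rc: RequestContext) extends SlotOutcome
final case class FailedDelivery(rc: RequestContext, error: Throwable) extends SlotOutcome

def onDisconnect(inflight: List[RequestContext], error: Option[Throwable]): List[SlotOutcome] =
  inflight.map { rc ⇒
    if (rc.retriesLeft == 0)
      FailedDelivery(rc, error.getOrElse(new RuntimeException("Unexpected disconnect"))) // give up
    else
      RetryRequest(rc.copy(retriesLeft = rc.retriesLeft - 1)) // re-dispatch through the pool
  }

With e.g. onDisconnect(List(RequestContext("/a", 2), RequestContext("/b", 0)), None), the first request is retried and the second is failed, mirroring the slot's behaviour.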
@@ -53,8 +53,5 @@ object ServerSettings extends SettingsCompanion[ServerSettings]("akka.http.serve
        throw new ConfigurationException(info.formatPretty)
      },
      ParserSettings fromSubConfig c.getConfig("parsing"))

  def apply(optionalSettings: Option[ServerSettings])(implicit actorRefFactory: ActorRefFactory): ServerSettings =
    optionalSettings getOrElse apply(actorSystem)
}
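The hunk above drops the Option[ServerSettings]-taking apply overload: with the simplified style, call sites construct a concrete settings instance (falling back to the companion's system-based apply) and tweak it with copy, as the updated specs below also do. A small before/after sketch; the call-site names are illustrative:

// old style: thread an Option and resolve it inside the API
// def openNewClientConnection(settings: Option[ClientConnectionSettings] = None) = ...

// new style: a concrete settings value with an explicit default, adjusted via copy(...)
def openNewClientConnection(settings: ClientConnectionSettings = ClientConnectionSettings(system)) = ???

val tweakedServerSettings = ServerSettings(system).copy(rawRequestUriHeader = true)

Both ClientConnectionSettings(system) and ServerSettings(system).copy(...) appear verbatim in the test changes of this commit.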
@@ -38,7 +38,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
  "The low-level HTTP infrastructure" should {

    "properly bind a server" in {
      val (hostname, port) = temporaryServerHostnameAndPort()
      val (_, hostname, port) = temporaryServerHostnameAndPort()
      val probe = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
      val binding = Http().bind(hostname, port).toMat(Sink(probe))(Keep.left).run()
      val sub = probe.expectSubscription() // if we get it we are bound

@@ -47,7 +47,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
    }

    "report failure if bind fails" in {
      val (hostname, port) = temporaryServerHostnameAndPort()
      val (_, hostname, port) = temporaryServerHostnameAndPort()
      val binding = Http().bind(hostname, port)
      val probe1 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
      // Bind succeeded, we have a local address

@@ -78,7 +78,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
    }

    "run with bindAndHandle" in {
      val (hostname, port) = temporaryServerHostnameAndPort()
      val (_, hostname, port) = temporaryServerHostnameAndPort()
      val binding = Http().bindAndHandle(Flow[HttpRequest].map(_ ⇒ HttpResponse()), hostname, port)
      val b1 = Await.result(binding, 3.seconds)

@@ -186,12 +186,12 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
  override def afterAll() = system.shutdown()

  class TestSetup {
    val (hostname, port) = temporaryServerHostnameAndPort()
    val (_, hostname, port) = temporaryServerHostnameAndPort()
    def configOverrides = ""

    // automatically bind a server
    val connSource = {
      val settings = configOverrides.toOption.map(ServerSettings.apply)
      val settings = configOverrides.toOption.fold(ServerSettings(system))(ServerSettings(_))
      val binding = Http().bind(hostname, port, settings = settings)
      val probe = StreamTestKit.SubscriberProbe[Http.IncomingConnection]
      binding.runWith(Sink(probe))

@@ -199,7 +199,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
    }
    val connSourceSub = connSource.expectSubscription()

    def openNewClientConnection(settings: Option[ClientConnectionSettings] = None): (PublisherProbe[HttpRequest], SubscriberProbe[HttpResponse]) = {
    def openNewClientConnection(settings: ClientConnectionSettings = ClientConnectionSettings(system)) = {
      val requestPublisherProbe = StreamTestKit.PublisherProbe[HttpRequest]()
      val responseSubscriberProbe = StreamTestKit.SubscriberProbe[HttpResponse]()
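The "run with bindAndHandle" test above exercises the convenience entry point that binds a server and handles every connection with a single flow. A minimal usage sketch under the API used in this diff (interface and port are illustrative):

import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.Flow
import akka.http.Http
import akka.http.model.{ HttpRequest, HttpResponse }

object BindAndHandleExample extends App {
  implicit val system = ActorSystem("bind-example")
  implicit val materializer = ActorFlowMaterializer()

  // answer every request with an empty 200 OK, as the spec above does
  val handler = Flow[HttpRequest].map(_ ⇒ HttpResponse())

  // returns a Future of the server binding, awaited in the spec via Await.result(binding, 3.seconds)
  val binding = Http().bindAndHandle(handler, "localhost", 8080)
}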
@@ -6,30 +6,61 @@ package akka.http

import com.typesafe.config.{ Config, ConfigFactory }
import scala.util.{ Failure, Success }
import akka.actor.ActorSystem
import akka.actor.{ UnhandledMessage, ActorSystem }
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.http.model._
import akka.http.util._

object TestClient extends App {
  val testConf: Config = ConfigFactory.parseString("""
    akka.loglevel = INFO
    akka.log-dead-letters = off
    """)
    akka.io.tcp.trace-logging = on""")
  implicit val system = ActorSystem("ServerTest", testConf)
  implicit val fm = ActorFlowMaterializer()
  import system.dispatcher

  installEventStreamLoggerFor[UnhandledMessage]

  val host = "spray.io"

  println(s"Fetching HTTP server version of host `$host` ...")
  fetchServerVersion1()

  val connection = Http().outgoingConnection(host)
  val result = Source.single(HttpRequest()).via(connection).runWith(Sink.head)
  // Console.readLine()
  // system.shutdown()

  result.map(_.header[headers.Server]) onComplete {
    case Success(res) ⇒ println(s"$host is running ${res mkString ", "}")
    case Failure(error) ⇒ println(s"Error: $error")
  def fetchServerVersion1(): Unit = {
    println(s"Fetching HTTP server version of host `$host` via a direct low-level connection ...")

    val connection = Http().outgoingConnection(host)
    val result = Source.single(HttpRequest()).via(connection).runWith(Sink.head)
    result.map(_.header[headers.Server]) onComplete {
      case Success(res) ⇒
        println(s"$host is running ${res mkString ", "}")
        println()
        fetchServerVersion2()

      case Failure(error) ⇒
        println(s"Error: $error")
        println()
        fetchServerVersion2()
    }
  }
  result onComplete { _ ⇒ system.shutdown() }

  def fetchServerVersion2(): Unit = {
    println(s"Fetching HTTP server version of host `$host` via the high-level API ...")
    val result = Http().singleRequest(HttpRequest(uri = s"http://$host/"))
    result.map(_.header[headers.Server]) onComplete {
      case Success(res) ⇒
        println(s"$host is running ${res mkString ", "}")
        Http().shutdownAllConnectionPools().onComplete { _ ⇒ system.log.info("STOPPED"); shutdown() }

      case Failure(error) ⇒
        println(s"Error: $error")
        shutdown()
    }
  }

  def shutdown(): Unit = system.shutdown()
}
@@ -17,8 +17,8 @@ object TestUtils {
    } finally serverSocket.close()
  }

  def temporaryServerHostnameAndPort(interface: String = "127.0.0.1"): (String, Int) = {
  def temporaryServerHostnameAndPort(interface: String = "127.0.0.1"): (InetSocketAddress, String, Int) = {
    val socketAddress = temporaryServerAddress(interface)
    socketAddress.getHostName -> socketAddress.getPort
    (socketAddress, socketAddress.getHostName, socketAddress.getPort)
  }
}
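temporaryServerHostnameAndPort now also returns the bound InetSocketAddress, which is why the specs in this commit destructure a triple and discard the first element when only host and port are needed. A short usage sketch, assuming the TestUtils object from this diff is on the classpath:

import akka.http.TestUtils

// keep all three elements when the raw socket address is needed, e.g. for StreamTcp().bind
val (serverEndpoint, serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort()

// or ignore the address when only host and port matter, as the updated specs do
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()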
@@ -0,0 +1,323 @@
/*
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.http.engine.client

import java.util.concurrent.atomic.AtomicInteger
import akka.http.engine.client.PoolGateway

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{ Failure, Success, Try }
import akka.http.engine.server.ServerSettings
import akka.http.util.{ SingletonException, StreamUtils }
import akka.util.ByteString
import akka.stream.{ BidiShape, ActorFlowMaterializer }
import akka.stream.testkit.{ AkkaSpec, StreamTestKit }
import akka.stream.scaladsl._
import akka.http.{ Http, TestUtils }
import akka.http.model.headers._
import akka.http.model._

class ConnectionPoolSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF\n akka.io.tcp.trace-logging = off") {
  implicit val materializer = ActorFlowMaterializer()

  "The host-level client infrastructure" should {

    "properly complete a simple request/response cycle" in new TestSetup {
      val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int]()

      requestIn.sendNext(HttpRequest(uri = "/") -> 42)

      acceptIncomingConnection()
      responseOutSub.request(1)
      val (Success(response), 42) = responseOut.expectNext()
      response.headers should contain(RawHeader("Req-Host", s"localhost:$serverPort"))
    }

    "open a second connection if the first one is loaded" in new TestSetup {
      val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int]()

      requestIn.sendNext(HttpRequest(uri = "/a") -> 42)
      requestIn.sendNext(HttpRequest(uri = "/b") -> 43)

      responseOutSub.request(2)
      acceptIncomingConnection()
      val r1 = responseOut.expectNext()
      acceptIncomingConnection()
      val r2 = responseOut.expectNext()

      Seq(r1, r2) foreach {
        case (Success(x), 42) ⇒ requestUri(x) should endWith("/a")
        case (Success(x), 43) ⇒ requestUri(x) should endWith("/b")
        case x ⇒ fail(x.toString)
      }
      Seq(r1, r2).map(t ⇒ connNr(t._1.get)) should contain allOf (1, 2)
    }

    "not open a second connection if there is an idle one available" in new TestSetup {
      val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int]()

      requestIn.sendNext(HttpRequest(uri = "/a") -> 42)
      acceptIncomingConnection()
      responseOutSub.request(1)
      val (Success(response1), 42) = responseOut.expectNext()
      connNr(response1) shouldEqual 1

      requestIn.sendNext(HttpRequest(uri = "/b") -> 43)
      responseOutSub.request(1)
      val (Success(response2), 43) = responseOut.expectNext()
      connNr(response2) shouldEqual 1
    }

    "be able to handle 500 pipelined requests against the test server" in new TestSetup {
      val settings = ConnectionPoolSettings(system).copy(maxConnections = 4, pipeliningLimit = 2)
      val poolFlow = Http().cachedHostConnectionPool[Int](serverHostName, serverPort, settings = settings)

      val N = 500
      val requestIds = Source(() ⇒ Iterator.from(1)).take(N)
      val idSum = requestIds.map(id ⇒ HttpRequest(uri = s"/r$id") -> id).via(poolFlow).map {
        case (Success(response), id) ⇒
          requestUri(response) should endWith(s"/r$id")
          id
        case x ⇒ fail(x.toString)
      }.runFold(0)(_ + _)

      acceptIncomingConnection()
      acceptIncomingConnection()
      acceptIncomingConnection()
      acceptIncomingConnection()

      Await.result(idSum, 10.seconds) shouldEqual N * (N + 1) / 2
    }

    "properly surface connection-level errors" in new TestSetup(autoAccept = true) {
      val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int](maxRetries = 0)

      requestIn.sendNext(HttpRequest(uri = "/a") -> 42)
      requestIn.sendNext(HttpRequest(uri = "/crash") -> 43)
      responseOutSub.request(2)

      override def mapServerSideOutboundRawBytes(bytes: ByteString): ByteString =
        if (bytes.utf8String.contains("/crash")) sys.error("CRASH BOOM BANG") else bytes

      val responses = Seq(responseOut.expectNext(), responseOut.expectNext())

      responses mustContainLike { case (Success(x), 42) ⇒ requestUri(x) should endWith("/a") }
      responses mustContainLike { case (Failure(x), 43) ⇒ x.getMessage should include("reset by peer") }
    }

    "retry failed requests" in new TestSetup(autoAccept = true) {
      val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int]()

      requestIn.sendNext(HttpRequest(uri = "/a") -> 42)
      requestIn.sendNext(HttpRequest(uri = "/crash") -> 43)
      responseOutSub.request(2)

      val remainingResponsesToKill = new AtomicInteger(1)
      override def mapServerSideOutboundRawBytes(bytes: ByteString): ByteString =
        if (bytes.utf8String.contains("/crash") && remainingResponsesToKill.decrementAndGet() >= 0)
          sys.error("CRASH BOOM BANG")
        else bytes

      val responses = Seq(responseOut.expectNext(), responseOut.expectNext())

      responses mustContainLike { case (Success(x), 42) ⇒ requestUri(x) should endWith("/a") }
      responses mustContainLike { case (Success(x), 43) ⇒ requestUri(x) should endWith("/crash") }
    }

    "respect the configured `maxRetries` value" in new TestSetup(autoAccept = true) {
      val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int](maxRetries = 4)

      requestIn.sendNext(HttpRequest(uri = "/a") -> 42)
      requestIn.sendNext(HttpRequest(uri = "/crash") -> 43)
      responseOutSub.request(2)

      val remainingResponsesToKill = new AtomicInteger(5)
      override def mapServerSideOutboundRawBytes(bytes: ByteString): ByteString =
        if (bytes.utf8String.contains("/crash") && remainingResponsesToKill.decrementAndGet() >= 0)
          sys.error("CRASH BOOM BANG")
        else bytes

      val responses = Seq(responseOut.expectNext(), responseOut.expectNext())

      responses mustContainLike { case (Success(x), 42) ⇒ requestUri(x) should endWith("/a") }
      responses mustContainLike { case (Failure(x), 43) ⇒ x.getMessage should include("reset by peer") }
      remainingResponsesToKill.get() shouldEqual 0
    }

    "automatically shutdown after configured timeout periods" in new TestSetup() {
      val (_, _, _, hcp) = cachedHostConnectionPool[Int](idleTimeout = 1.second)
      val gateway = Await.result(hcp.gatewayFuture, 500.millis)
      val PoolGateway.Running(_, shutdownStartedPromise, shutdownCompletedPromise) = gateway.currentState
      shutdownStartedPromise.isCompleted shouldEqual false
      shutdownCompletedPromise.isCompleted shouldEqual false
      Await.result(shutdownStartedPromise.future, 1500.millis) shouldEqual () // verify shutdown start (after idle)
      Await.result(shutdownCompletedPromise.future, 1500.millis) shouldEqual () // verify shutdown completed
    }

    "transparently restart after idle shutdown" in new TestSetup() {
      val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int](idleTimeout = 1.second)

      val gateway = Await.result(hcp.gatewayFuture, 500.millis)
      val PoolGateway.Running(_, _, shutdownCompletedPromise) = gateway.currentState
      Await.result(shutdownCompletedPromise.future, 1500.millis) shouldEqual () // verify shutdown completed

      requestIn.sendNext(HttpRequest(uri = "/") -> 42)

      acceptIncomingConnection()
      responseOutSub.request(1)
      val (Success(_), 42) = responseOut.expectNext()
    }
  }

  "The single-request client infrastructure" should {
    class LocalTestSetup extends TestSetup(ServerSettings(system).copy(rawRequestUriHeader = true), autoAccept = true)

    "properly complete a simple request/response cycle with a host header request" in new LocalTestSetup {
      val request = HttpRequest(uri = "/abc", headers = List(Host(serverHostName, serverPort)))
      val responseFuture = Http().singleRequest(request)
      val responseHeaders = Await.result(responseFuture, 1.second).headers
      responseHeaders should contain(RawHeader("Req-Uri", s"http://localhost:$serverPort/abc"))
      responseHeaders should contain(RawHeader("Req-Raw-Request-URI", "/abc"))
    }

    "transform absolute request URIs into relative URIs plus host header" in new LocalTestSetup {
      val request = HttpRequest(uri = s"http://$serverHostName:$serverPort/abc?query#fragment")
      val responseFuture = Http().singleRequest(request)
      val responseHeaders = Await.result(responseFuture, 1.second).headers
      responseHeaders should contain(RawHeader("Req-Raw-Request-URI", "/abc?query"))
      responseHeaders should contain(RawHeader("Req-Host", s"localhost:$serverPort"))
    }

    "support absolute request URIs without path component" in new LocalTestSetup {
      val request = HttpRequest(uri = s"http://$serverHostName:$serverPort")
      val responseFuture = Http().singleRequest(request)
      val responseHeaders = Await.result(responseFuture, 1.second).headers
      responseHeaders should contain(RawHeader("Req-Raw-Request-URI", "/"))
    }

    "support absolute request URIs with a double slash path component" in new LocalTestSetup {
      val request = HttpRequest(uri = s"http://$serverHostName:$serverPort//foo")
      val responseFuture = Http().singleRequest(request)
      val responseHeaders = Await.result(responseFuture, 1.second).headers
      responseHeaders should contain(RawHeader("Req-Uri", s"http://localhost:$serverPort//foo"))
      responseHeaders should contain(RawHeader("Req-Raw-Request-URI", "//foo"))
    }

    "produce an error if the request does not contain a Host-header or an absolute URI" in {
      val request = HttpRequest(uri = "/foo")
      val responseFuture = Http().singleRequest(request)
      val thrown = the[IllegalUriException] thrownBy Await.result(responseFuture, 1.second)
      thrown should have message "Cannot establish effective URI of request to `/foo`, request has a relative URI and is missing a `Host` header"
    }
  }

  "The superPool client infrastructure" should {

    "route incoming requests to the right cached host connection pool" in new TestSetup(autoAccept = true) {
      val (serverEndpoint2, serverHostName2, serverPort2) = TestUtils.temporaryServerHostnameAndPort()
      Http().bindAndHandleSync(testServerHandler(0), serverHostName2, serverPort2)

      val (requestIn, responseOut, responseOutSub, hcp) = superPool[Int]()

      requestIn.sendNext(HttpRequest(uri = s"http://$serverHostName:$serverPort/a") -> 42)
      requestIn.sendNext(HttpRequest(uri = s"http://$serverHostName2:$serverPort2/b") -> 43)

      responseOutSub.request(2)
      Seq(responseOut.expectNext(), responseOut.expectNext()) foreach {
        case (Success(x), 42) ⇒ requestUri(x) shouldEqual s"http://$serverHostName:$serverPort/a"
        case (Success(x), 43) ⇒ requestUri(x) shouldEqual s"http://$serverHostName2:$serverPort2/b"
        case x ⇒ fail(x.toString)
      }
    }
  }

  class TestSetup(serverSettings: ServerSettings = ServerSettings(system),
                  autoAccept: Boolean = false) {
    val (serverEndpoint, serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort()

    def testServerHandler(connNr: Int): HttpRequest ⇒ HttpResponse = {
      case HttpRequest(_, uri, headers, entity, _) ⇒
        val responseHeaders =
          ConnNrHeader(connNr) +:
            RawHeader("Req-Uri", uri.toString) +: headers.map(h ⇒ RawHeader("Req-" + h.name, h.value))
        HttpResponse(headers = responseHeaders, entity = entity)
    }

    def mapServerSideOutboundRawBytes(bytes: ByteString): ByteString = bytes

    val incomingConnectionCounter = new AtomicInteger
    val incomingConnections = StreamTestKit.SubscriberProbe[Http.IncomingConnection]
    val incomingConnectionsSub = {
      val rawBytesInjection = BidiFlow() { b ⇒
        val top = b.add(Flow[ByteString].map(mapServerSideOutboundRawBytes)
          .transform(StreamUtils.recover { case NoErrorComplete ⇒ ByteString.empty }))
        val bottom = b.add(Flow[ByteString])
        BidiShape(top.inlet, top.outlet, bottom.inlet, bottom.outlet)
      }
      val sink = if (autoAccept) Sink.foreach[Http.IncomingConnection](handleConnection) else Sink(incomingConnections)
      StreamTcp().bind(serverEndpoint, idleTimeout = serverSettings.timeouts.idleTimeout)
        .map { c ⇒
          val layer = Http().serverLayer(serverSettings, log)
          Http.IncomingConnection(c.localAddress, c.remoteAddress, layer atop rawBytesInjection join c.flow)
        }.runWith(sink)
      if (autoAccept) null else incomingConnections.expectSubscription()
    }

    def acceptIncomingConnection(): Unit = {
      incomingConnectionsSub.request(1)
      val conn = incomingConnections.expectNext()
      handleConnection(conn)
    }

    private def handleConnection(c: Http.IncomingConnection) =
      c.handleWithSyncHandler(testServerHandler(incomingConnectionCounter.incrementAndGet()))

    def cachedHostConnectionPool[T](maxConnections: Int = 2,
                                    maxRetries: Int = 2,
                                    maxOpenRequests: Int = 8,
                                    pipeliningLimit: Int = 1,
                                    idleTimeout: Duration = 5.seconds,
                                    ccSettings: ClientConnectionSettings = ClientConnectionSettings(system)) = {
      val settings = ConnectionPoolSettings(maxConnections, maxRetries, maxOpenRequests, pipeliningLimit,
        idleTimeout, ClientConnectionSettings(system))
      flowTestBench(Http().cachedHostConnectionPool[T](serverHostName, serverPort, Nil, settings))
    }

    def superPool[T](maxConnections: Int = 2,
                     maxRetries: Int = 2,
                     maxOpenRequests: Int = 8,
                     pipeliningLimit: Int = 1,
                     idleTimeout: Duration = 5.seconds,
                     ccSettings: ClientConnectionSettings = ClientConnectionSettings(system)) = {
      val settings = ConnectionPoolSettings(maxConnections, maxRetries, maxOpenRequests, pipeliningLimit,
        idleTimeout, ClientConnectionSettings(system))
      flowTestBench(Http().superPool[T](Nil, settings))
    }

    def flowTestBench[T, Mat](poolFlow: Flow[(HttpRequest, T), (Try[HttpResponse], T), Mat]) = {
      val requestIn = StreamTestKit.PublisherProbe[(HttpRequest, T)]
      val responseOut = StreamTestKit.SubscriberProbe[(Try[HttpResponse], T)]
      val hcp = Source(requestIn).viaMat(poolFlow)(Keep.right).toMat(Sink(responseOut))(Keep.left).run()
      val responseOutSub = responseOut.expectSubscription()
      (new StreamTestKit.AutoPublisher(requestIn), responseOut, responseOutSub, hcp)
    }

    def connNr(r: HttpResponse): Int = r.headers.find(_ is "conn-nr").get.value.toInt
    def requestUri(r: HttpResponse): String = r.headers.find(_ is "req-uri").get.value
  }

  case class ConnNrHeader(nr: Int) extends CustomHeader {
    def name = "Conn-Nr"
    def value = nr.toString
  }

  implicit class MustContain[T](specimen: Seq[T]) {
    def mustContainLike(pf: PartialFunction[T, Unit]): Unit =
      specimen.collectFirst(pf) getOrElse fail("did not contain")
  }

  object NoErrorComplete extends SingletonException
}
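The spec above drives the new host-level pool through test probes; outside a test the same Flow[(HttpRequest, T), (Try[HttpResponse], T), _] can be used directly. A minimal sketch using the calls shown in this diff (target host, port and the Int correlation tokens are illustrative; import paths follow the spec's package layout):

import scala.util.{ Failure, Success }
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.http.Http
import akka.http.engine.client.ConnectionPoolSettings
import akka.http.model.HttpRequest

object PoolUsageExample extends App {
  implicit val system = ActorSystem("pool-example")
  implicit val materializer = ActorFlowMaterializer()

  // tweak pool settings via copy(...), as the spec above does
  val settings = ConnectionPoolSettings(system).copy(maxConnections = 4, pipeliningLimit = 2)

  // one cached pool flow per host:port; each request is paired with a correlation token
  val poolFlow = Http().cachedHostConnectionPool[Int]("akka.io", 80, settings = settings)

  Source(List(HttpRequest(uri = "/") -> 1, HttpRequest(uri = "/docs") -> 2))
    .via(poolFlow)
    .runWith(Sink.foreach {
      case (Success(response), id) ⇒ println(s"request $id succeeded: ${response.status}")
      case (Failure(error), id) ⇒ println(s"request $id failed: $error")
    })
}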
@@ -19,7 +19,7 @@ class HighLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka
  "The connection-level client implementation" should {

    "be able to handle 100 pipelined requests across one connection" in {
      val (serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort()
      val (_, serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort()

      Http().bindAndHandleSync(r ⇒ HttpResponse(entity = r.uri.toString.reverse.takeWhile(Character.isDigit).reverse),
        serverHostName, serverPort)

@@ -37,7 +37,7 @@ class HighLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka
    }

    "be able to handle 100 pipelined requests across 4 connections (client-flow is reusable)" in {
      val (serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort()
      val (_, serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort()

      Http().bindAndHandleSync(r ⇒ HttpResponse(entity = r.uri.toString.reverse.takeWhile(Character.isDigit).reverse),
        serverHostName, serverPort)
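These tests pipeline many requests through Http().outgoingConnection, whose client flow can be reused and materialized multiple times. A rough sketch of that pattern under the API used in this diff (host and request count are illustrative):

import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.Source
import akka.http.Http
import akka.http.model.HttpRequest

object OutgoingConnectionExample extends App {
  implicit val system = ActorSystem("connection-example")
  implicit val materializer = ActorFlowMaterializer()

  // a connection-level client flow; responses are emitted in request order
  val connection = Http().outgoingConnection("akka.io")

  val responseCount = Source(1 to 10)
    .map(i ⇒ HttpRequest(uri = s"/page-$i"))
    .via(connection)
    .runFold(0)((count, _) ⇒ count + 1)

  println(s"received ${Await.result(responseCount, 10.seconds)} responses")
  system.shutdown()
}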
@@ -29,7 +29,7 @@ abstract class FlowMaterializer {
   * can be used by parts of the flow to submit processing jobs for execution,
   * run Future callbacks, etc.
   */
  def executionContext: ExecutionContextExecutor
  implicit def executionContext: ExecutionContextExecutor

}
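Declaring executionContext as an implicit def lets code that has a FlowMaterializer at hand use its dispatcher for Future combinators by importing the member, instead of reaching for system.dispatcher. A small sketch (the method and names are illustrative):

import scala.concurrent.Future
import akka.stream.FlowMaterializer

// importing the now-implicit member brings an ExecutionContext into implicit scope
def measureLength(data: Future[String])(implicit fm: FlowMaterializer): Future[Int] = {
  import fm.executionContext
  data.map(_.length)
}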