From 81579bd40325a1e5e8145dce433845f902358bc2 Mon Sep 17 00:00:00 2001 From: Samuel Tardieu Date: Fri, 8 Apr 2016 15:26:42 +0200 Subject: [PATCH] =htc #20080,#20081,#20082 Simplify pool gateway synch * Rewrite the pool gateway synchronization Rewrite the pool gateway synchronization so that: - The documented race condition in PoolInterfaceActor is gone. No PoolInterfaceActor will receive new requests after the gateway shutdown has been initiated (fix #20081). - A gateway created using newHostConnectionPool will no longer share its pool with others even when it has been shutdown due to idle-timeout and recreated. Also, its original materializer will be used to create all the successive pools incarnations (fix #20080). - Collapsing chains of gateways do no longer need to be created. The gateways are now only an entrypoint to the pool master actor, and this actor is in charge of keeping a cache of currently active pools and recreate them from the information given by the gateway when needed. * Add copyright header * Mark PoolMasterActor as INTERNAL API * Larger outer timeout * Define Props in PoolMasterActor object * Comment INTERNAL API * Remove unused import --- .../http/impl/engine/client/PoolGateway.scala | 134 ++++++++------- .../engine/client/PoolInterfaceActor.scala | 34 ++-- .../impl/engine/client/PoolMasterActor.scala | 159 ++++++++++++++++++ .../main/scala/akka/http/scaladsl/Http.scala | 100 +++++------ .../engine/client/ConnectionPoolSpec.scala | 42 ++--- .../scala/akka/http/scaladsl/ClientSpec.scala | 4 +- project/MiMa.scala | 22 ++- 7 files changed, 324 insertions(+), 171 deletions(-) create mode 100644 akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolMasterActor.scala diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala index 91490e9aba..a3573c9c1d 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala @@ -4,91 +4,89 @@ package akka.http.impl.engine.client -import java.util.concurrent.atomic.AtomicReference -import akka.Done +import java.util.concurrent.atomic.AtomicLong -import scala.annotation.tailrec -import scala.concurrent.{ Future, Promise } +import akka.Done +import akka.actor.ActorRef +import akka.http.impl.engine.client.PoolGateway.{ GatewayIdentifier, SharedGateway } +import akka.http.impl.engine.client.PoolMasterActor._ import akka.http.impl.settings.HostConnectionPoolSetup -import akka.actor.{ Deploy, Props, ActorSystem, ActorRef } -import akka.http.scaladsl.Http -import akka.http.scaladsl.model.{ HttpResponse, HttpRequest } +import akka.http.scaladsl.model.{ HttpRequest, HttpResponse } import akka.stream.Materializer -private object PoolGateway { - - sealed trait State - final case class Running(interfaceActorRef: ActorRef, - shutdownStartedPromise: Promise[Done], - shutdownCompletedPromise: Promise[Done]) extends State - final case class IsShutdown(shutdownCompleted: Future[Done]) extends State - final case class NewIncarnation(gatewayFuture: Future[PoolGateway]) extends State -} +import scala.concurrent.{ Future, Promise } /** - * Manages access to a host connection pool or rather: a sequence of pool incarnations. + * Manages access to a host connection pool through the [[PoolMasterActor]] * - * A host connection pool for a given [[HostConnectionPoolSetup]] is a running stream, whose outside interface is - * provided by its [[PoolInterfaceActor]] actor. The actor accepts [[PoolInterfaceActor.PoolRequest]] messages - * and completes their `responsePromise` whenever the respective response has been received (or an error occurred). - * - * A [[PoolGateway]] provides a layer of indirection between the pool cache and the actual - * pools that is required to allow a pool incarnation to fully terminate (e.g. after an idle-timeout) - * and be transparently replaced by a new incarnation if required. - * Removal of cache entries for terminated pools is also supported, because old gateway references that - * get reused will automatically forward requests directed at them to the latest pool incarnation from the cache. + * A [[PoolGateway]] is represented by its [[HostConnectionPoolSetup]] and its [[GatewayIdentifier]]. If the later + * is [[SharedGateway]], it means that a shared pool must be used for this particular [[HostConnectionPoolSetup]]. */ -private[http] class PoolGateway(hcps: HostConnectionPoolSetup, - _shutdownStartedPromise: Promise[Done])( // constructor arg only - implicit system: ActorSystem, fm: Materializer) { - import PoolGateway._ - import fm.executionContext +private[http] final class PoolGateway(gatewayRef: ActorRef, val hcps: HostConnectionPoolSetup, val gatewayId: GatewayIdentifier)(implicit fm: Materializer) { - private val state = { - val shutdownCompletedPromise = Promise[Done]() - val props = Props(new PoolInterfaceActor(hcps, shutdownCompletedPromise, this)).withDeploy(Deploy.local) - val ref = system.actorOf(props, PoolInterfaceActor.name.next()) - new AtomicReference[State](Running(ref, _shutdownStartedPromise, shutdownCompletedPromise)) + /** + * Send a request through the corresponding pool. If the pool is not running, it will be started + * automatically. If it is shutting down, it will restart as soon as the shutdown operation is + * complete and serve this request. + * + * @param request the request + * @return the response + */ + def apply(request: HttpRequest): Future[HttpResponse] = { + val responsePromise = Promise[HttpResponse]() + gatewayRef ! SendRequest(this, request, responsePromise, fm) + responsePromise.future } - def currentState: Any = state.get() // enables test access + /** + * Start the corresponding pool to make it ready to serve requests. If the pool is already started, + * this does nothing. If it is being shutdown, it will restart as soon as the shutdown operation + * is complete. + * + * @return the gateway itself + */ + def startPool(): PoolGateway = { + gatewayRef ! StartPool(this, fm) + this + } - def apply(request: HttpRequest, previousIncarnation: PoolGateway = null): Future[HttpResponse] = - state.get match { - case Running(ref, _, _) ⇒ - val responsePromise = Promise[HttpResponse]() - ref ! PoolInterfaceActor.PoolRequest(request, responsePromise) - responsePromise.future + /** + * Shutdown the corresponding pool and signal its termination. If the pool is not running or is + * being shutting down, this does nothing, + * + * @return a Future completed when the pool has been shutdown. + */ + def shutdown(): Future[Done] = { + val shutdownCompletedPromise = Promise[Done]() + gatewayRef ! Shutdown(this, shutdownCompletedPromise) + shutdownCompletedPromise.future + } - case IsShutdown(shutdownCompleted) ⇒ - // delay starting the next pool incarnation until the current pool has completed its shutdown - shutdownCompleted.flatMap { _ ⇒ - val newGatewayFuture = Http().cachedGateway(hcps) - // a simple set is fine here as `newGatewayFuture` will be identical for all threads getting here - state.set(NewIncarnation(newGatewayFuture)) - apply(request) - } + override def toString = s"PoolGateway(hcps = $hcps)" - case x @ NewIncarnation(newGatewayFuture) ⇒ - if (previousIncarnation != null) - previousIncarnation.state.set(x) // collapse incarnation chain - newGatewayFuture.flatMap(_(request, this)) + // INTERNAL API (testing only) + private[client] def poolStatus(): Future[Option[PoolInterfaceStatus]] = { + val statusPromise = Promise[Option[PoolInterfaceStatus]]() + gatewayRef ! PoolStatus(this, statusPromise) + statusPromise.future + } + + override def equals(that: Any): Boolean = + that match { + case p: PoolGateway ⇒ p.hcps == hcps && p.gatewayId == gatewayId + case _ ⇒ false } - // triggers a shutdown of the current pool, even if it is already a later incarnation - @tailrec final def shutdown(): Future[Done] = - state.get match { - case x @ Running(ref, shutdownStartedPromise, shutdownCompletedPromise) ⇒ - if (state.compareAndSet(x, IsShutdown(shutdownCompletedPromise.future))) { - shutdownStartedPromise.success(Done) // trigger cache removal - ref ! PoolInterfaceActor.Shutdown - shutdownCompletedPromise.future - } else shutdown() // CAS loop (not a spinlock) + override def hashCode(): Int = hcps.hashCode() ^ gatewayId.hashCode() +} - case IsShutdown(x) ⇒ x +private[http] object PoolGateway { - case NewIncarnation(newGatewayFuture) ⇒ newGatewayFuture.flatMap(_.shutdownAux()) - } + sealed trait GatewayIdentifier + case object SharedGateway extends GatewayIdentifier + final case class UniqueGateway(id: Long) extends GatewayIdentifier + + private[this] val uniqueGatewayId = new AtomicLong(0) + def newUniqueGatewayIdentifier = UniqueGateway(uniqueGatewayId.incrementAndGet()) - private def shutdownAux() = shutdown() // alias required for @tailrec } \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala index c2851a55b3..104a10ea0f 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala @@ -6,23 +6,20 @@ package akka.http.impl.engine.client import java.net.InetSocketAddress -import akka.Done -import akka.stream.BufferOverflowException +import akka.actor._ +import akka.http.impl.engine.client.PoolFlow._ +import akka.http.scaladsl.model._ +import akka.http.scaladsl.{ Http, HttpsConnectionContext } +import akka.stream.actor.ActorPublisherMessage._ +import akka.stream.actor.ActorSubscriberMessage._ +import akka.stream.actor.{ ActorPublisher, ActorSubscriber, ZeroRequestStrategy } +import akka.stream.impl.{ Buffer, SeqActorName } +import akka.stream.scaladsl.{ Flow, Keep, Sink, Source } +import akka.stream.{ BufferOverflowException, Materializer } import scala.annotation.tailrec import scala.concurrent.Promise import scala.concurrent.duration.FiniteDuration -import akka.actor._ -import akka.stream.Materializer -import akka.stream.actor.{ ActorPublisher, ActorSubscriber, ZeroRequestStrategy } -import akka.stream.actor.ActorPublisherMessage._ -import akka.stream.actor.ActorSubscriberMessage._ -import akka.stream.impl.{ SeqActorName, Buffer } -import akka.stream.scaladsl.{ Keep, Flow, Sink, Source } -import akka.http.impl.settings.HostConnectionPoolSetup -import akka.http.scaladsl.model._ -import akka.http.scaladsl.{ HttpsConnectionContext, Http } -import PoolFlow._ private object PoolInterfaceActor { final case class PoolRequest(request: HttpRequest, responsePromise: Promise[HttpResponse]) extends NoSerializationVerificationNeeded @@ -46,12 +43,11 @@ private object PoolInterfaceActor { * To the inside (i.e. the running connection pool flow) the gateway actor acts as request source * (ActorPublisher) and response sink (ActorSubscriber). */ -private class PoolInterfaceActor(hcps: HostConnectionPoolSetup, - shutdownCompletedPromise: Promise[Done], - gateway: PoolGateway)(implicit fm: Materializer) +private class PoolInterfaceActor(gateway: PoolGateway)(implicit fm: Materializer) extends ActorSubscriber with ActorPublisher[RequestContext] with ActorLogging { import PoolInterfaceActor._ + private[this] val hcps = gateway.hcps private[this] val inputBuffer = Buffer[PoolRequest](hcps.setup.settings.maxOpenRequests, fm) private[this] var activeIdleTimeout: Option[Cancellable] = None @@ -100,12 +96,10 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup, case OnComplete ⇒ // the pool shut down log.debug("Host connection pool to {}:{} has completed orderly shutdown", hcps.host, hcps.port) - shutdownCompletedPromise.success(Done) self ! PoisonPill // give potentially queued requests another chance to be forwarded back to the gateway case OnError(e) ⇒ // the pool shut down log.debug("Host connection pool to {}:{} has shut down with error {}", hcps.host, hcps.port, e) - shutdownCompletedPromise.failure(e) self ! PoisonPill // give potentially queued requests another chance to be forwarded back to the gateway /////////////// FROM CLIENT ////////////// @@ -127,10 +121,6 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup, case PoolRequest(request, responsePromise) ⇒ // we have already started shutting down, i.e. this pool is not usable anymore // so we forward the request back to the gateway - // Note that this forwarding will stop when we receive completion from the pool flow - // (because we stop ourselves then), so there is a very small chance of a request ending - // up as a dead letter if the sending thread gets interrupted for a long time right before - // the `ref ! PoolRequest(...)` in the PoolGateway responsePromise.completeWith(gateway(request)) case Shutdown ⇒ // signal coming in from gateway diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolMasterActor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolMasterActor.scala new file mode 100644 index 0000000000..e9de06d473 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolMasterActor.scala @@ -0,0 +1,159 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ + +package akka.http.impl.engine.client + +import akka.Done +import akka.actor.{ Actor, ActorLogging, ActorRef, DeadLetterSuppression, Deploy, NoSerializationVerificationNeeded, Props, Terminated } +import akka.http.impl.engine.client.PoolInterfaceActor.PoolRequest +import akka.http.impl.settings.HostConnectionPoolSetup +import akka.http.scaladsl.HttpExt +import akka.http.scaladsl.model.{ HttpRequest, HttpResponse } +import akka.stream.Materializer + +import scala.concurrent.{ Future, Promise } + +/** + * INTERNAL API + * + * Manages access to a host connection pool or rather: a sequence of pool incarnations. + * + * A host connection pool for a given [[HostConnectionPoolSetup]] is a running stream, whose outside interface is + * provided by its [[PoolInterfaceActor]] actor. The actor accepts [[PoolInterfaceActor.PoolRequest]] messages + * and completes their `responsePromise` whenever the respective response has been received (or an error occurred). + * + * The [[PoolMasterActor]] provides a layer of indirection between a [[PoolGateway]], which represents a pool, + * and the [[PoolInterfaceActor]] instances which are created on-demand and stopped after an idle-timeout. + * + * Several [[PoolGateway]] objects may be mapped to the same pool if they have the same [[HostConnectionPoolSetup]] + * and are marked as being shared. This is the case for example for gateways obtained through + * [[HttpExt.cachedHostConnectionPool]]. Some other gateways are not shared, such as those obtained through + * [[HttpExt.newHostConnectionPool]], and will have their dedicated restartable pool. + * + */ +private[http] final class PoolMasterActor extends Actor with ActorLogging { + + import PoolMasterActor._ + + private[this] var poolStatus = Map[PoolGateway, PoolInterfaceStatus]() + private[this] var poolInterfaces = Map[ActorRef, PoolGateway]() + + /** + * Start a new pool interface actor, register it in our maps, and watch its death. No actor should + * currently exist for this pool. + * + * @param gateway the pool gateway this pool corresponds to + * @param fm the materializer to use for this pool + * @return the newly created actor ref + */ + private[this] def startPoolInterfaceActor(gateway: PoolGateway)(implicit fm: Materializer): ActorRef = { + if (poolStatus.contains(gateway)) { + throw new IllegalStateException(s"pool interface actor for $gateway already exists") + } + val props = Props(new PoolInterfaceActor(gateway)).withDeploy(Deploy.local) + val ref = context.actorOf(props, PoolInterfaceActor.name.next()) + poolStatus += gateway -> PoolInterfaceRunning(ref) + poolInterfaces += ref -> gateway + context.watch(ref) + } + + def receive = { + + // Start or restart a pool without sending it a request. This is used to ensure that + // freshly created pools will be ready to serve requests immediately. + case s @ StartPool(gateway, materializer) ⇒ + poolStatus.get(gateway) match { + case Some(PoolInterfaceRunning(_)) ⇒ + case Some(PoolInterfaceShuttingDown(shutdownCompletedPromise)) ⇒ + // Pool is being shutdown. When this is done, start the pool again. + shutdownCompletedPromise.future.onComplete(_ ⇒ self ! s)(context.dispatcher) + case None ⇒ + startPoolInterfaceActor(gateway)(materializer) + } + + // Send a request to a pool. If needed, the pool will be started or restarted. + case s @ SendRequest(gateway, request, responsePromise, materializer) ⇒ + poolStatus.get(gateway) match { + case Some(PoolInterfaceRunning(ref)) ⇒ + ref ! PoolRequest(request, responsePromise) + case Some(PoolInterfaceShuttingDown(shutdownCompletedPromise)) ⇒ + // The request will be resent when the pool shutdown is complete (the first + // request will recreate the pool). + shutdownCompletedPromise.future.foreach(_ ⇒ self ! s)(context.dispatcher) + case None ⇒ + startPoolInterfaceActor(gateway)(materializer) ! PoolRequest(request, responsePromise) + } + + // Shutdown a pool and signal its termination. + case Shutdown(gateway, shutdownCompletedPromise) ⇒ + poolStatus.get(gateway).foreach { + case PoolInterfaceRunning(ref) ⇒ + // Ask the pool to shutdown itself. Queued connections will be resent here + // to this actor by the pool actor, they will be retried once the shutdown + // has completed. + ref ! PoolInterfaceActor.Shutdown + poolStatus += gateway -> PoolInterfaceShuttingDown(shutdownCompletedPromise) + case PoolInterfaceShuttingDown(formerPromise) ⇒ + // Pool is already shutting down, mirror the existing promise. + shutdownCompletedPromise.tryCompleteWith(formerPromise.future) + case _ ⇒ + // Pool does not exist, shutdown is not needed. + shutdownCompletedPromise.trySuccess(Done) + } + + // Shutdown all known pools and signal their termination. + case ShutdownAll(shutdownCompletedPromise) ⇒ + import context.dispatcher + def track(remaining: Iterator[Future[Done]]): Unit = + if (remaining.hasNext) remaining.next().onComplete(_ ⇒ track(remaining)) + else shutdownCompletedPromise.trySuccess(Done) + track(poolStatus.keys.map(_.shutdown()).toIterator) + + // When a pool actor terminate, signal its termination and remove it from our maps. + case Terminated(ref) ⇒ + poolInterfaces.get(ref).foreach { gateway ⇒ + poolStatus.get(gateway) match { + case Some(PoolInterfaceRunning(_)) ⇒ + log.error("connection pool for {} has shut down unexpectedly", gateway) + case Some(PoolInterfaceShuttingDown(shutdownCompletedPromise)) ⇒ + shutdownCompletedPromise.trySuccess(Done) + case None ⇒ + // This will never happen as poolInterfaces and poolStatus are modified + // together. If there is no status then there is no gateway to start with. + } + poolStatus -= gateway + poolInterfaces -= ref + } + + // Testing only. + case PoolStatus(gateway, statusPromise) ⇒ + statusPromise.success(poolStatus.get(gateway)) + + // Testing only. + case PoolSize(sizePromise) ⇒ + sizePromise.success(poolStatus.size) + + } + +} + +private[http] object PoolMasterActor { + + val props = Props[PoolMasterActor].withDeploy(Deploy.local) + + sealed trait PoolInterfaceStatus + final case class PoolInterfaceRunning(ref: ActorRef) extends PoolInterfaceStatus + final case class PoolInterfaceShuttingDown(shutdownCompletedPromise: Promise[Done]) extends PoolInterfaceStatus + + final case class StartPool(gateway: PoolGateway, materializer: Materializer) extends NoSerializationVerificationNeeded + final case class SendRequest(gateway: PoolGateway, request: HttpRequest, responsePromise: Promise[HttpResponse], materializer: Materializer) + extends NoSerializationVerificationNeeded + final case class Shutdown(gateway: PoolGateway, shutdownCompletedPromise: Promise[Done]) extends NoSerializationVerificationNeeded with DeadLetterSuppression + final case class ShutdownAll(shutdownCompletedPromise: Promise[Done]) extends NoSerializationVerificationNeeded with DeadLetterSuppression + + // INTERNAL API (for testing only) + final case class PoolStatus(gateway: PoolGateway, statusPromise: Promise[Option[PoolInterfaceStatus]]) extends NoSerializationVerificationNeeded + final case class PoolSize(sizePromise: Promise[Int]) extends NoSerializationVerificationNeeded + +} diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index 102cd07612..a13e6b17cc 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -5,12 +5,13 @@ package akka.http.scaladsl import java.net.InetSocketAddress -import java.util.concurrent.{ CompletionStage, ConcurrentHashMap } +import java.util.concurrent.CompletionStage import javax.net.ssl._ import akka.actor._ import akka.event.{ Logging, LoggingAdapter } import akka.http.impl.engine.HttpConnectionTimeoutException +import akka.http.impl.engine.client.PoolMasterActor.{ PoolSize, ShutdownAll } import akka.http.impl.engine.client._ import akka.http.impl.engine.server._ import akka.http.impl.engine.ws.WebSocketClientBlueprint @@ -19,7 +20,7 @@ import akka.http.impl.util.{ MapError, StreamUtils } import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.Host import akka.http.scaladsl.model.ws.{ Message, WebSocketRequest, WebSocketUpgradeResponse } -import akka.http.scaladsl.settings.{ ServerSettings, ClientConnectionSettings, ConnectionPoolSettings } +import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings } import akka.http.scaladsl.util.FastFuture import akka.{ Done, NotUsed } import akka.stream._ @@ -190,6 +191,9 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte // ** CLIENT ** // + private[this] val poolMasterActorRef = system.actorOf(PoolMasterActor.props, "pool-master") + private[this] val systemMaterializer = ActorMaterializer() + /** * Creates a [[akka.stream.scaladsl.Flow]] representing a prospective HTTP client connection to the given endpoint. * Every materialization of the produced flow will attempt to establish a new outgoing connection. @@ -321,8 +325,8 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte */ private[akka] def newHostConnectionPool[T](setup: HostConnectionPoolSetup)( implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { - val gatewayFuture = FastFuture.successful(new PoolGateway(setup, Promise())) - gatewayClientFlow(setup, gatewayFuture) + val gateway = new PoolGateway(poolMasterActorRef, setup, PoolGateway.newUniqueGatewayIdentifier) + gatewayClientFlow(setup, gateway.startPool()) } /** @@ -389,8 +393,9 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * object of type `T` from the application which is emitted together with the corresponding response. */ private def cachedHostConnectionPool[T](setup: HostConnectionPoolSetup)( - implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = - gatewayClientFlow(setup, cachedGateway(setup)) + implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { + gatewayClientFlow(setup, sharedGateway(setup).startPool()) + } /** * Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool @@ -412,7 +417,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte def superPool[T](connectionContext: HttpsConnectionContext = defaultClientHttpsContext, settings: ConnectionPoolSettings = defaultConnectionPoolSettings, log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed] = - clientFlow[T](settings) { request ⇒ request -> cachedGateway(request, settings, connectionContext, log) } + clientFlow[T](settings) { request ⇒ request -> sharedGateway(request, settings, connectionContext, log) } /** * Fires a single [[akka.http.scaladsl.model.HttpRequest]] across the (cached) host connection pool for the request's @@ -428,8 +433,8 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte settings: ConnectionPoolSettings = defaultConnectionPoolSettings, log: LoggingAdapter = system.log)(implicit fm: Materializer): Future[HttpResponse] = try { - val gatewayFuture = cachedGateway(request, settings, connectionContext, log) - gatewayFuture.flatMap(_(request))(fm.executionContext) + val gateway = sharedGateway(request, settings, connectionContext, log) + gateway(request) } catch { case e: IllegalUriException ⇒ FastFuture.failed(e) } @@ -495,12 +500,9 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * method call the respective connection pools will be restarted and not contribute to the returned future. */ def shutdownAllConnectionPools(): Future[Unit] = { - import system.dispatcher - - import scala.collection.JavaConverters._ - val gateways = hostPoolCache.values().asScala - system.log.debug("Initiating orderly shutdown of all active host connections pools...") - Future.sequence(gateways.map(_.flatMap(_.shutdown()))).map(_ ⇒ ()) + val shutdownCompletedPromise = Promise[Done]() + poolMasterActorRef ! ShutdownAll(shutdownCompletedPromise) + shutdownCompletedPromise.future.map(_ ⇒ ())(system.dispatcher) } /** @@ -545,64 +547,37 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte _defaultClientHttpsConnectionContext = context } - // every ActorSystem maintains its own connection pools - private[http] val hostPoolCache = new ConcurrentHashMap[HostConnectionPoolSetup, Future[PoolGateway]] - - private def cachedGateway(request: HttpRequest, - settings: ConnectionPoolSettings, connectionContext: ConnectionContext, - log: LoggingAdapter)(implicit fm: Materializer): Future[PoolGateway] = + private def sharedGateway(request: HttpRequest, settings: ConnectionPoolSettings, connectionContext: ConnectionContext, log: LoggingAdapter): PoolGateway = { if (request.uri.scheme.nonEmpty && request.uri.authority.nonEmpty) { val httpsCtx = if (request.uri.scheme.equalsIgnoreCase("https")) connectionContext else ConnectionContext.noEncryption() val setup = ConnectionPoolSetup(settings, httpsCtx, log) val host = request.uri.authority.host.toString() val hcps = HostConnectionPoolSetup(host, request.uri.effectivePort, setup) - cachedGateway(hcps) + sharedGateway(hcps) } else { val msg = s"Cannot determine request scheme and target endpoint as ${request.method} request to ${request.uri} doesn't have an absolute URI" throw new IllegalUriException(ErrorInfo(msg)) } - - /** INTERNAL API */ - private[http] def cachedGateway(setup: HostConnectionPoolSetup)(implicit fm: Materializer): Future[PoolGateway] = { - val gatewayPromise = Promise[PoolGateway]() - hostPoolCache.putIfAbsent(setup, gatewayPromise.future) match { - case null ⇒ // only one thread can get here at a time - val whenShuttingDown = Promise[Done]() - val gateway = - try new PoolGateway(setup, whenShuttingDown) - catch { - case NonFatal(e) ⇒ - hostPoolCache.remove(setup) - gatewayPromise.failure(e) - throw e - } - val fastFuture = FastFuture.successful(gateway) - hostPoolCache.put(setup, fastFuture) // optimize subsequent gateway accesses - gatewayPromise.success(gateway) // satisfy everyone who got a hold of our promise while we were starting up - whenShuttingDown.future.onComplete(_ ⇒ hostPoolCache.remove(setup, fastFuture))(fm.executionContext) - fastFuture - - case future ⇒ future // return cached instance - } } - private def gatewayClientFlow[T](hcps: HostConnectionPoolSetup, - gatewayFuture: Future[PoolGateway])( - implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = - clientFlow[T](hcps.setup.settings)(_ -> gatewayFuture) - .mapMaterializedValue(_ ⇒ HostConnectionPool(hcps)(gatewayFuture)) + private def sharedGateway(hcps: HostConnectionPoolSetup): PoolGateway = + new PoolGateway(poolMasterActorRef, hcps, PoolGateway.SharedGateway)(systemMaterializer) - private def clientFlow[T](settings: ConnectionPoolSettings)(f: HttpRequest ⇒ (HttpRequest, Future[PoolGateway]))( + private def gatewayClientFlow[T](hcps: HostConnectionPoolSetup, + gateway: PoolGateway)( + implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + clientFlow[T](hcps.setup.settings)(_ -> gateway) + .mapMaterializedValue(_ ⇒ HostConnectionPool(hcps)(gateway)) + + private def clientFlow[T](settings: ConnectionPoolSettings)(f: HttpRequest ⇒ (HttpRequest, PoolGateway))( implicit system: ActorSystem, fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed] = { // a connection pool can never have more than pipeliningLimit * maxConnections requests in flight at any point val parallelism = settings.pipeliningLimit * settings.maxConnections Flow[(HttpRequest, T)].mapAsyncUnordered(parallelism) { case (request, userContext) ⇒ - val (effectiveRequest, gatewayFuture) = f(request) + val (effectiveRequest, gateway) = f(request) val result = Promise[(Try[HttpResponse], T)]() // TODO: simplify to `transformWith` when on Scala 2.12 - gatewayFuture - .flatMap(_(effectiveRequest))(fm.executionContext) - .onComplete(responseTry ⇒ result.success(responseTry -> userContext))(fm.executionContext) + gateway(effectiveRequest).onComplete(responseTry ⇒ result.success(responseTry -> userContext))(fm.executionContext) result.future } } @@ -613,6 +588,17 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte case hctx: HttpsConnectionContext ⇒ TLS(hctx.sslContext, hctx.firstSession, role, hostInfo = hostInfo) case other ⇒ TLSPlacebo() // if it's not HTTPS, we don't enable SSL/TLS } + + /** + * INTERNAL API + * + * For testing only + */ + private[scaladsl] def poolSize: Future[Int] = { + val sizePromise = Promise[Int]() + poolMasterActorRef ! PoolSize(sizePromise) + sizePromise.future + } } object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { @@ -717,14 +703,14 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { * Represents a connection pool to a specific target host and pool configuration. */ final case class HostConnectionPool private[http] (setup: HostConnectionPoolSetup)( - private[http] val gatewayFuture: Future[PoolGateway]) { // enable test access + private[http] val gateway: PoolGateway) { // enable test access /** * Asynchronously triggers the shutdown of the host connection pool. * * The produced [[scala.concurrent.Future]] is fulfilled when the shutdown has been completed. */ - def shutdown()(implicit ec: ExecutionContextExecutor): Future[Done] = gatewayFuture.flatMap(_.shutdown()) + def shutdown()(implicit ec: ExecutionContextExecutor): Future[Done] = gateway.shutdown() private[http] def toJava = new akka.http.javadsl.HostConnectionPool { override def setup = HostConnectionPool.this.setup diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala index 315e87757f..ff8addc3df 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala @@ -6,24 +6,27 @@ package akka.http.impl.engine.client import java.net.InetSocketAddress import java.nio.ByteBuffer -import java.nio.channels.{ SocketChannel, ServerSocketChannel } +import java.nio.channels.{ ServerSocketChannel, SocketChannel } import java.util.concurrent.atomic.AtomicInteger + +import akka.http.impl.engine.client.PoolMasterActor.PoolInterfaceRunning import akka.http.impl.settings.ConnectionPoolSettingsImpl +import akka.http.impl.util.{ SingletonException, StreamUtils } +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers._ +import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings } +import akka.http.scaladsl.{ Http, TestUtils } +import akka.stream.ActorMaterializer +import akka.stream.TLSProtocol._ +import akka.stream.scaladsl._ +import akka.stream.testkit.{ TestPublisher, TestSubscriber } +import akka.testkit.AkkaSpec +import akka.util.ByteString + import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.control.NonFatal import scala.util.{ Failure, Success, Try } -import akka.util.ByteString -import akka.http.scaladsl.{ TestUtils, Http } -import akka.http.impl.util.{ SingletonException, StreamUtils } -import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings } -import akka.stream.{ ActorMaterializer } -import akka.stream.TLSProtocol._ -import akka.stream.testkit.{ TestPublisher, TestSubscriber } -import akka.stream.scaladsl._ -import akka.http.scaladsl.model.headers._ -import akka.http.scaladsl.model._ -import akka.testkit.AkkaSpec class ConnectionPoolSpec extends AkkaSpec(""" akka.loggers = [] @@ -207,20 +210,17 @@ class ConnectionPoolSpec extends AkkaSpec(""" "automatically shutdown after configured timeout periods" in new TestSetup() { val (_, _, _, hcp) = cachedHostConnectionPool[Int](idleTimeout = 1.second) - val gateway = Await.result(hcp.gatewayFuture, 500.millis) - val PoolGateway.Running(_, shutdownStartedPromise, shutdownCompletedPromise) = gateway.currentState - shutdownStartedPromise.isCompleted shouldEqual false - shutdownCompletedPromise.isCompleted shouldEqual false - Await.result(shutdownStartedPromise.future, 1500.millis) // verify shutdown start (after idle) - Await.result(shutdownCompletedPromise.future, 1500.millis) // verify shutdown completed + val gateway = hcp.gateway + Await.result(gateway.poolStatus(), 1500.millis).get shouldBe a[PoolInterfaceRunning] + awaitCond({ Await.result(gateway.poolStatus(), 1500.millis).isEmpty }, 2000.millis) } "transparently restart after idle shutdown" in new TestSetup() { val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int](idleTimeout = 1.second) - val gateway = Await.result(hcp.gatewayFuture, 500.millis) - val PoolGateway.Running(_, _, shutdownCompletedPromise) = gateway.currentState - Await.result(shutdownCompletedPromise.future, 1500.millis) // verify shutdown completed + val gateway = hcp.gateway + Await.result(gateway.poolStatus(), 1500.millis).get shouldBe a[PoolInterfaceRunning] + awaitCond({ Await.result(gateway.poolStatus(), 1500.millis).isEmpty }, 2000.millis) requestIn.sendNext(HttpRequest(uri = "/") -> 42) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientSpec.scala index 1c020f24c6..166cca74cd 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientSpec.scala @@ -36,13 +36,13 @@ class ClientSpec extends WordSpec with Matchers { val resp = Await.result(respFuture, 3.seconds) resp.status shouldBe StatusCodes.OK - Http().hostPoolCache.size shouldBe 1 + Await.result(Http().poolSize, 1.second) shouldEqual 1 val respFuture2 = Http().singleRequest(HttpRequest(POST, s"http://$hostname:$port/")) val resp2 = Await.result(respFuture, 3.seconds) resp2.status shouldBe StatusCodes.OK - Http().hostPoolCache.size shouldBe 1 + Await.result(Http().poolSize, 1.second) shouldEqual 1 Await.ready(binding.unbind(), 1.second) } diff --git a/project/MiMa.scala b/project/MiMa.scala index dcc2959cd7..1ddd05a8a3 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -714,7 +714,27 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[ReversedAbstractMethodProblem]("akka.persistence.Eventsourced#ProcessingState.onWriteMessageComplete"), // #19390 Add flow monitor - ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.monitor") + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.monitor"), + + // #20080, #20081 remove race condition on HTTP client + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.scaladsl.Http#HostConnectionPool.gatewayFuture"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.http.scaladsl.Http#HostConnectionPool.copy"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.http.scaladsl.Http#HostConnectionPool.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.scaladsl.HttpExt.hostPoolCache"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.scaladsl.HttpExt.cachedGateway"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.http.scaladsl.Http#HostConnectionPool.apply"), + ProblemFilters.exclude[FinalClassProblem]("akka.http.impl.engine.client.PoolGateway"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.engine.client.PoolGateway.currentState"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.engine.client.PoolGateway.apply"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.http.impl.engine.client.PoolGateway.this"), + ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.client.PoolGateway$NewIncarnation$"), + ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.client.PoolGateway$Running$"), + ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.client.PoolGateway$IsShutdown$"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.engine.client.PoolInterfaceActor.this"), + ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.client.PoolGateway$Running"), + ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.client.PoolGateway$IsShutdown"), + ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.client.PoolGateway$NewIncarnation"), + ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.client.PoolGateway$State") ) ) }