diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala index 91490e9aba..a3573c9c1d 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala @@ -4,91 +4,89 @@ package akka.http.impl.engine.client -import java.util.concurrent.atomic.AtomicReference -import akka.Done +import java.util.concurrent.atomic.AtomicLong -import scala.annotation.tailrec -import scala.concurrent.{ Future, Promise } +import akka.Done +import akka.actor.ActorRef +import akka.http.impl.engine.client.PoolGateway.{ GatewayIdentifier, SharedGateway } +import akka.http.impl.engine.client.PoolMasterActor._ import akka.http.impl.settings.HostConnectionPoolSetup -import akka.actor.{ Deploy, Props, ActorSystem, ActorRef } -import akka.http.scaladsl.Http -import akka.http.scaladsl.model.{ HttpResponse, HttpRequest } +import akka.http.scaladsl.model.{ HttpRequest, HttpResponse } import akka.stream.Materializer -private object PoolGateway { - - sealed trait State - final case class Running(interfaceActorRef: ActorRef, - shutdownStartedPromise: Promise[Done], - shutdownCompletedPromise: Promise[Done]) extends State - final case class IsShutdown(shutdownCompleted: Future[Done]) extends State - final case class NewIncarnation(gatewayFuture: Future[PoolGateway]) extends State -} +import scala.concurrent.{ Future, Promise } /** - * Manages access to a host connection pool or rather: a sequence of pool incarnations. + * Manages access to a host connection pool through the [[PoolMasterActor]] * - * A host connection pool for a given [[HostConnectionPoolSetup]] is a running stream, whose outside interface is - * provided by its [[PoolInterfaceActor]] actor. The actor accepts [[PoolInterfaceActor.PoolRequest]] messages - * and completes their `responsePromise` whenever the respective response has been received (or an error occurred). - * - * A [[PoolGateway]] provides a layer of indirection between the pool cache and the actual - * pools that is required to allow a pool incarnation to fully terminate (e.g. after an idle-timeout) - * and be transparently replaced by a new incarnation if required. - * Removal of cache entries for terminated pools is also supported, because old gateway references that - * get reused will automatically forward requests directed at them to the latest pool incarnation from the cache. + * A [[PoolGateway]] is represented by its [[HostConnectionPoolSetup]] and its [[GatewayIdentifier]]. If the later + * is [[SharedGateway]], it means that a shared pool must be used for this particular [[HostConnectionPoolSetup]]. */ -private[http] class PoolGateway(hcps: HostConnectionPoolSetup, - _shutdownStartedPromise: Promise[Done])( // constructor arg only - implicit system: ActorSystem, fm: Materializer) { - import PoolGateway._ - import fm.executionContext +private[http] final class PoolGateway(gatewayRef: ActorRef, val hcps: HostConnectionPoolSetup, val gatewayId: GatewayIdentifier)(implicit fm: Materializer) { - private val state = { - val shutdownCompletedPromise = Promise[Done]() - val props = Props(new PoolInterfaceActor(hcps, shutdownCompletedPromise, this)).withDeploy(Deploy.local) - val ref = system.actorOf(props, PoolInterfaceActor.name.next()) - new AtomicReference[State](Running(ref, _shutdownStartedPromise, shutdownCompletedPromise)) + /** + * Send a request through the corresponding pool. If the pool is not running, it will be started + * automatically. If it is shutting down, it will restart as soon as the shutdown operation is + * complete and serve this request. + * + * @param request the request + * @return the response + */ + def apply(request: HttpRequest): Future[HttpResponse] = { + val responsePromise = Promise[HttpResponse]() + gatewayRef ! SendRequest(this, request, responsePromise, fm) + responsePromise.future } - def currentState: Any = state.get() // enables test access + /** + * Start the corresponding pool to make it ready to serve requests. If the pool is already started, + * this does nothing. If it is being shutdown, it will restart as soon as the shutdown operation + * is complete. + * + * @return the gateway itself + */ + def startPool(): PoolGateway = { + gatewayRef ! StartPool(this, fm) + this + } - def apply(request: HttpRequest, previousIncarnation: PoolGateway = null): Future[HttpResponse] = - state.get match { - case Running(ref, _, _) ⇒ - val responsePromise = Promise[HttpResponse]() - ref ! PoolInterfaceActor.PoolRequest(request, responsePromise) - responsePromise.future + /** + * Shutdown the corresponding pool and signal its termination. If the pool is not running or is + * being shutting down, this does nothing, + * + * @return a Future completed when the pool has been shutdown. + */ + def shutdown(): Future[Done] = { + val shutdownCompletedPromise = Promise[Done]() + gatewayRef ! Shutdown(this, shutdownCompletedPromise) + shutdownCompletedPromise.future + } - case IsShutdown(shutdownCompleted) ⇒ - // delay starting the next pool incarnation until the current pool has completed its shutdown - shutdownCompleted.flatMap { _ ⇒ - val newGatewayFuture = Http().cachedGateway(hcps) - // a simple set is fine here as `newGatewayFuture` will be identical for all threads getting here - state.set(NewIncarnation(newGatewayFuture)) - apply(request) - } + override def toString = s"PoolGateway(hcps = $hcps)" - case x @ NewIncarnation(newGatewayFuture) ⇒ - if (previousIncarnation != null) - previousIncarnation.state.set(x) // collapse incarnation chain - newGatewayFuture.flatMap(_(request, this)) + // INTERNAL API (testing only) + private[client] def poolStatus(): Future[Option[PoolInterfaceStatus]] = { + val statusPromise = Promise[Option[PoolInterfaceStatus]]() + gatewayRef ! PoolStatus(this, statusPromise) + statusPromise.future + } + + override def equals(that: Any): Boolean = + that match { + case p: PoolGateway ⇒ p.hcps == hcps && p.gatewayId == gatewayId + case _ ⇒ false } - // triggers a shutdown of the current pool, even if it is already a later incarnation - @tailrec final def shutdown(): Future[Done] = - state.get match { - case x @ Running(ref, shutdownStartedPromise, shutdownCompletedPromise) ⇒ - if (state.compareAndSet(x, IsShutdown(shutdownCompletedPromise.future))) { - shutdownStartedPromise.success(Done) // trigger cache removal - ref ! PoolInterfaceActor.Shutdown - shutdownCompletedPromise.future - } else shutdown() // CAS loop (not a spinlock) + override def hashCode(): Int = hcps.hashCode() ^ gatewayId.hashCode() +} - case IsShutdown(x) ⇒ x +private[http] object PoolGateway { - case NewIncarnation(newGatewayFuture) ⇒ newGatewayFuture.flatMap(_.shutdownAux()) - } + sealed trait GatewayIdentifier + case object SharedGateway extends GatewayIdentifier + final case class UniqueGateway(id: Long) extends GatewayIdentifier + + private[this] val uniqueGatewayId = new AtomicLong(0) + def newUniqueGatewayIdentifier = UniqueGateway(uniqueGatewayId.incrementAndGet()) - private def shutdownAux() = shutdown() // alias required for @tailrec } \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala index c2851a55b3..104a10ea0f 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala @@ -6,23 +6,20 @@ package akka.http.impl.engine.client import java.net.InetSocketAddress -import akka.Done -import akka.stream.BufferOverflowException +import akka.actor._ +import akka.http.impl.engine.client.PoolFlow._ +import akka.http.scaladsl.model._ +import akka.http.scaladsl.{ Http, HttpsConnectionContext } +import akka.stream.actor.ActorPublisherMessage._ +import akka.stream.actor.ActorSubscriberMessage._ +import akka.stream.actor.{ ActorPublisher, ActorSubscriber, ZeroRequestStrategy } +import akka.stream.impl.{ Buffer, SeqActorName } +import akka.stream.scaladsl.{ Flow, Keep, Sink, Source } +import akka.stream.{ BufferOverflowException, Materializer } import scala.annotation.tailrec import scala.concurrent.Promise import scala.concurrent.duration.FiniteDuration -import akka.actor._ -import akka.stream.Materializer -import akka.stream.actor.{ ActorPublisher, ActorSubscriber, ZeroRequestStrategy } -import akka.stream.actor.ActorPublisherMessage._ -import akka.stream.actor.ActorSubscriberMessage._ -import akka.stream.impl.{ SeqActorName, Buffer } -import akka.stream.scaladsl.{ Keep, Flow, Sink, Source } -import akka.http.impl.settings.HostConnectionPoolSetup -import akka.http.scaladsl.model._ -import akka.http.scaladsl.{ HttpsConnectionContext, Http } -import PoolFlow._ private object PoolInterfaceActor { final case class PoolRequest(request: HttpRequest, responsePromise: Promise[HttpResponse]) extends NoSerializationVerificationNeeded @@ -46,12 +43,11 @@ private object PoolInterfaceActor { * To the inside (i.e. the running connection pool flow) the gateway actor acts as request source * (ActorPublisher) and response sink (ActorSubscriber). */ -private class PoolInterfaceActor(hcps: HostConnectionPoolSetup, - shutdownCompletedPromise: Promise[Done], - gateway: PoolGateway)(implicit fm: Materializer) +private class PoolInterfaceActor(gateway: PoolGateway)(implicit fm: Materializer) extends ActorSubscriber with ActorPublisher[RequestContext] with ActorLogging { import PoolInterfaceActor._ + private[this] val hcps = gateway.hcps private[this] val inputBuffer = Buffer[PoolRequest](hcps.setup.settings.maxOpenRequests, fm) private[this] var activeIdleTimeout: Option[Cancellable] = None @@ -100,12 +96,10 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup, case OnComplete ⇒ // the pool shut down log.debug("Host connection pool to {}:{} has completed orderly shutdown", hcps.host, hcps.port) - shutdownCompletedPromise.success(Done) self ! PoisonPill // give potentially queued requests another chance to be forwarded back to the gateway case OnError(e) ⇒ // the pool shut down log.debug("Host connection pool to {}:{} has shut down with error {}", hcps.host, hcps.port, e) - shutdownCompletedPromise.failure(e) self ! PoisonPill // give potentially queued requests another chance to be forwarded back to the gateway /////////////// FROM CLIENT ////////////// @@ -127,10 +121,6 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup, case PoolRequest(request, responsePromise) ⇒ // we have already started shutting down, i.e. this pool is not usable anymore // so we forward the request back to the gateway - // Note that this forwarding will stop when we receive completion from the pool flow - // (because we stop ourselves then), so there is a very small chance of a request ending - // up as a dead letter if the sending thread gets interrupted for a long time right before - // the `ref ! PoolRequest(...)` in the PoolGateway responsePromise.completeWith(gateway(request)) case Shutdown ⇒ // signal coming in from gateway diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolMasterActor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolMasterActor.scala new file mode 100644 index 0000000000..e9de06d473 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolMasterActor.scala @@ -0,0 +1,159 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ + +package akka.http.impl.engine.client + +import akka.Done +import akka.actor.{ Actor, ActorLogging, ActorRef, DeadLetterSuppression, Deploy, NoSerializationVerificationNeeded, Props, Terminated } +import akka.http.impl.engine.client.PoolInterfaceActor.PoolRequest +import akka.http.impl.settings.HostConnectionPoolSetup +import akka.http.scaladsl.HttpExt +import akka.http.scaladsl.model.{ HttpRequest, HttpResponse } +import akka.stream.Materializer + +import scala.concurrent.{ Future, Promise } + +/** + * INTERNAL API + * + * Manages access to a host connection pool or rather: a sequence of pool incarnations. + * + * A host connection pool for a given [[HostConnectionPoolSetup]] is a running stream, whose outside interface is + * provided by its [[PoolInterfaceActor]] actor. The actor accepts [[PoolInterfaceActor.PoolRequest]] messages + * and completes their `responsePromise` whenever the respective response has been received (or an error occurred). + * + * The [[PoolMasterActor]] provides a layer of indirection between a [[PoolGateway]], which represents a pool, + * and the [[PoolInterfaceActor]] instances which are created on-demand and stopped after an idle-timeout. + * + * Several [[PoolGateway]] objects may be mapped to the same pool if they have the same [[HostConnectionPoolSetup]] + * and are marked as being shared. This is the case for example for gateways obtained through + * [[HttpExt.cachedHostConnectionPool]]. Some other gateways are not shared, such as those obtained through + * [[HttpExt.newHostConnectionPool]], and will have their dedicated restartable pool. + * + */ +private[http] final class PoolMasterActor extends Actor with ActorLogging { + + import PoolMasterActor._ + + private[this] var poolStatus = Map[PoolGateway, PoolInterfaceStatus]() + private[this] var poolInterfaces = Map[ActorRef, PoolGateway]() + + /** + * Start a new pool interface actor, register it in our maps, and watch its death. No actor should + * currently exist for this pool. + * + * @param gateway the pool gateway this pool corresponds to + * @param fm the materializer to use for this pool + * @return the newly created actor ref + */ + private[this] def startPoolInterfaceActor(gateway: PoolGateway)(implicit fm: Materializer): ActorRef = { + if (poolStatus.contains(gateway)) { + throw new IllegalStateException(s"pool interface actor for $gateway already exists") + } + val props = Props(new PoolInterfaceActor(gateway)).withDeploy(Deploy.local) + val ref = context.actorOf(props, PoolInterfaceActor.name.next()) + poolStatus += gateway -> PoolInterfaceRunning(ref) + poolInterfaces += ref -> gateway + context.watch(ref) + } + + def receive = { + + // Start or restart a pool without sending it a request. This is used to ensure that + // freshly created pools will be ready to serve requests immediately. + case s @ StartPool(gateway, materializer) ⇒ + poolStatus.get(gateway) match { + case Some(PoolInterfaceRunning(_)) ⇒ + case Some(PoolInterfaceShuttingDown(shutdownCompletedPromise)) ⇒ + // Pool is being shutdown. When this is done, start the pool again. + shutdownCompletedPromise.future.onComplete(_ ⇒ self ! s)(context.dispatcher) + case None ⇒ + startPoolInterfaceActor(gateway)(materializer) + } + + // Send a request to a pool. If needed, the pool will be started or restarted. + case s @ SendRequest(gateway, request, responsePromise, materializer) ⇒ + poolStatus.get(gateway) match { + case Some(PoolInterfaceRunning(ref)) ⇒ + ref ! PoolRequest(request, responsePromise) + case Some(PoolInterfaceShuttingDown(shutdownCompletedPromise)) ⇒ + // The request will be resent when the pool shutdown is complete (the first + // request will recreate the pool). + shutdownCompletedPromise.future.foreach(_ ⇒ self ! s)(context.dispatcher) + case None ⇒ + startPoolInterfaceActor(gateway)(materializer) ! PoolRequest(request, responsePromise) + } + + // Shutdown a pool and signal its termination. + case Shutdown(gateway, shutdownCompletedPromise) ⇒ + poolStatus.get(gateway).foreach { + case PoolInterfaceRunning(ref) ⇒ + // Ask the pool to shutdown itself. Queued connections will be resent here + // to this actor by the pool actor, they will be retried once the shutdown + // has completed. + ref ! PoolInterfaceActor.Shutdown + poolStatus += gateway -> PoolInterfaceShuttingDown(shutdownCompletedPromise) + case PoolInterfaceShuttingDown(formerPromise) ⇒ + // Pool is already shutting down, mirror the existing promise. + shutdownCompletedPromise.tryCompleteWith(formerPromise.future) + case _ ⇒ + // Pool does not exist, shutdown is not needed. + shutdownCompletedPromise.trySuccess(Done) + } + + // Shutdown all known pools and signal their termination. + case ShutdownAll(shutdownCompletedPromise) ⇒ + import context.dispatcher + def track(remaining: Iterator[Future[Done]]): Unit = + if (remaining.hasNext) remaining.next().onComplete(_ ⇒ track(remaining)) + else shutdownCompletedPromise.trySuccess(Done) + track(poolStatus.keys.map(_.shutdown()).toIterator) + + // When a pool actor terminate, signal its termination and remove it from our maps. + case Terminated(ref) ⇒ + poolInterfaces.get(ref).foreach { gateway ⇒ + poolStatus.get(gateway) match { + case Some(PoolInterfaceRunning(_)) ⇒ + log.error("connection pool for {} has shut down unexpectedly", gateway) + case Some(PoolInterfaceShuttingDown(shutdownCompletedPromise)) ⇒ + shutdownCompletedPromise.trySuccess(Done) + case None ⇒ + // This will never happen as poolInterfaces and poolStatus are modified + // together. If there is no status then there is no gateway to start with. + } + poolStatus -= gateway + poolInterfaces -= ref + } + + // Testing only. + case PoolStatus(gateway, statusPromise) ⇒ + statusPromise.success(poolStatus.get(gateway)) + + // Testing only. + case PoolSize(sizePromise) ⇒ + sizePromise.success(poolStatus.size) + + } + +} + +private[http] object PoolMasterActor { + + val props = Props[PoolMasterActor].withDeploy(Deploy.local) + + sealed trait PoolInterfaceStatus + final case class PoolInterfaceRunning(ref: ActorRef) extends PoolInterfaceStatus + final case class PoolInterfaceShuttingDown(shutdownCompletedPromise: Promise[Done]) extends PoolInterfaceStatus + + final case class StartPool(gateway: PoolGateway, materializer: Materializer) extends NoSerializationVerificationNeeded + final case class SendRequest(gateway: PoolGateway, request: HttpRequest, responsePromise: Promise[HttpResponse], materializer: Materializer) + extends NoSerializationVerificationNeeded + final case class Shutdown(gateway: PoolGateway, shutdownCompletedPromise: Promise[Done]) extends NoSerializationVerificationNeeded with DeadLetterSuppression + final case class ShutdownAll(shutdownCompletedPromise: Promise[Done]) extends NoSerializationVerificationNeeded with DeadLetterSuppression + + // INTERNAL API (for testing only) + final case class PoolStatus(gateway: PoolGateway, statusPromise: Promise[Option[PoolInterfaceStatus]]) extends NoSerializationVerificationNeeded + final case class PoolSize(sizePromise: Promise[Int]) extends NoSerializationVerificationNeeded + +} diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index 102cd07612..a13e6b17cc 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -5,12 +5,13 @@ package akka.http.scaladsl import java.net.InetSocketAddress -import java.util.concurrent.{ CompletionStage, ConcurrentHashMap } +import java.util.concurrent.CompletionStage import javax.net.ssl._ import akka.actor._ import akka.event.{ Logging, LoggingAdapter } import akka.http.impl.engine.HttpConnectionTimeoutException +import akka.http.impl.engine.client.PoolMasterActor.{ PoolSize, ShutdownAll } import akka.http.impl.engine.client._ import akka.http.impl.engine.server._ import akka.http.impl.engine.ws.WebSocketClientBlueprint @@ -19,7 +20,7 @@ import akka.http.impl.util.{ MapError, StreamUtils } import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.Host import akka.http.scaladsl.model.ws.{ Message, WebSocketRequest, WebSocketUpgradeResponse } -import akka.http.scaladsl.settings.{ ServerSettings, ClientConnectionSettings, ConnectionPoolSettings } +import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings } import akka.http.scaladsl.util.FastFuture import akka.{ Done, NotUsed } import akka.stream._ @@ -190,6 +191,9 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte // ** CLIENT ** // + private[this] val poolMasterActorRef = system.actorOf(PoolMasterActor.props, "pool-master") + private[this] val systemMaterializer = ActorMaterializer() + /** * Creates a [[akka.stream.scaladsl.Flow]] representing a prospective HTTP client connection to the given endpoint. * Every materialization of the produced flow will attempt to establish a new outgoing connection. @@ -321,8 +325,8 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte */ private[akka] def newHostConnectionPool[T](setup: HostConnectionPoolSetup)( implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { - val gatewayFuture = FastFuture.successful(new PoolGateway(setup, Promise())) - gatewayClientFlow(setup, gatewayFuture) + val gateway = new PoolGateway(poolMasterActorRef, setup, PoolGateway.newUniqueGatewayIdentifier) + gatewayClientFlow(setup, gateway.startPool()) } /** @@ -389,8 +393,9 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * object of type `T` from the application which is emitted together with the corresponding response. */ private def cachedHostConnectionPool[T](setup: HostConnectionPoolSetup)( - implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = - gatewayClientFlow(setup, cachedGateway(setup)) + implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { + gatewayClientFlow(setup, sharedGateway(setup).startPool()) + } /** * Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool @@ -412,7 +417,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte def superPool[T](connectionContext: HttpsConnectionContext = defaultClientHttpsContext, settings: ConnectionPoolSettings = defaultConnectionPoolSettings, log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed] = - clientFlow[T](settings) { request ⇒ request -> cachedGateway(request, settings, connectionContext, log) } + clientFlow[T](settings) { request ⇒ request -> sharedGateway(request, settings, connectionContext, log) } /** * Fires a single [[akka.http.scaladsl.model.HttpRequest]] across the (cached) host connection pool for the request's @@ -428,8 +433,8 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte settings: ConnectionPoolSettings = defaultConnectionPoolSettings, log: LoggingAdapter = system.log)(implicit fm: Materializer): Future[HttpResponse] = try { - val gatewayFuture = cachedGateway(request, settings, connectionContext, log) - gatewayFuture.flatMap(_(request))(fm.executionContext) + val gateway = sharedGateway(request, settings, connectionContext, log) + gateway(request) } catch { case e: IllegalUriException ⇒ FastFuture.failed(e) } @@ -495,12 +500,9 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * method call the respective connection pools will be restarted and not contribute to the returned future. */ def shutdownAllConnectionPools(): Future[Unit] = { - import system.dispatcher - - import scala.collection.JavaConverters._ - val gateways = hostPoolCache.values().asScala - system.log.debug("Initiating orderly shutdown of all active host connections pools...") - Future.sequence(gateways.map(_.flatMap(_.shutdown()))).map(_ ⇒ ()) + val shutdownCompletedPromise = Promise[Done]() + poolMasterActorRef ! ShutdownAll(shutdownCompletedPromise) + shutdownCompletedPromise.future.map(_ ⇒ ())(system.dispatcher) } /** @@ -545,64 +547,37 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte _defaultClientHttpsConnectionContext = context } - // every ActorSystem maintains its own connection pools - private[http] val hostPoolCache = new ConcurrentHashMap[HostConnectionPoolSetup, Future[PoolGateway]] - - private def cachedGateway(request: HttpRequest, - settings: ConnectionPoolSettings, connectionContext: ConnectionContext, - log: LoggingAdapter)(implicit fm: Materializer): Future[PoolGateway] = + private def sharedGateway(request: HttpRequest, settings: ConnectionPoolSettings, connectionContext: ConnectionContext, log: LoggingAdapter): PoolGateway = { if (request.uri.scheme.nonEmpty && request.uri.authority.nonEmpty) { val httpsCtx = if (request.uri.scheme.equalsIgnoreCase("https")) connectionContext else ConnectionContext.noEncryption() val setup = ConnectionPoolSetup(settings, httpsCtx, log) val host = request.uri.authority.host.toString() val hcps = HostConnectionPoolSetup(host, request.uri.effectivePort, setup) - cachedGateway(hcps) + sharedGateway(hcps) } else { val msg = s"Cannot determine request scheme and target endpoint as ${request.method} request to ${request.uri} doesn't have an absolute URI" throw new IllegalUriException(ErrorInfo(msg)) } - - /** INTERNAL API */ - private[http] def cachedGateway(setup: HostConnectionPoolSetup)(implicit fm: Materializer): Future[PoolGateway] = { - val gatewayPromise = Promise[PoolGateway]() - hostPoolCache.putIfAbsent(setup, gatewayPromise.future) match { - case null ⇒ // only one thread can get here at a time - val whenShuttingDown = Promise[Done]() - val gateway = - try new PoolGateway(setup, whenShuttingDown) - catch { - case NonFatal(e) ⇒ - hostPoolCache.remove(setup) - gatewayPromise.failure(e) - throw e - } - val fastFuture = FastFuture.successful(gateway) - hostPoolCache.put(setup, fastFuture) // optimize subsequent gateway accesses - gatewayPromise.success(gateway) // satisfy everyone who got a hold of our promise while we were starting up - whenShuttingDown.future.onComplete(_ ⇒ hostPoolCache.remove(setup, fastFuture))(fm.executionContext) - fastFuture - - case future ⇒ future // return cached instance - } } - private def gatewayClientFlow[T](hcps: HostConnectionPoolSetup, - gatewayFuture: Future[PoolGateway])( - implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = - clientFlow[T](hcps.setup.settings)(_ -> gatewayFuture) - .mapMaterializedValue(_ ⇒ HostConnectionPool(hcps)(gatewayFuture)) + private def sharedGateway(hcps: HostConnectionPoolSetup): PoolGateway = + new PoolGateway(poolMasterActorRef, hcps, PoolGateway.SharedGateway)(systemMaterializer) - private def clientFlow[T](settings: ConnectionPoolSettings)(f: HttpRequest ⇒ (HttpRequest, Future[PoolGateway]))( + private def gatewayClientFlow[T](hcps: HostConnectionPoolSetup, + gateway: PoolGateway)( + implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + clientFlow[T](hcps.setup.settings)(_ -> gateway) + .mapMaterializedValue(_ ⇒ HostConnectionPool(hcps)(gateway)) + + private def clientFlow[T](settings: ConnectionPoolSettings)(f: HttpRequest ⇒ (HttpRequest, PoolGateway))( implicit system: ActorSystem, fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed] = { // a connection pool can never have more than pipeliningLimit * maxConnections requests in flight at any point val parallelism = settings.pipeliningLimit * settings.maxConnections Flow[(HttpRequest, T)].mapAsyncUnordered(parallelism) { case (request, userContext) ⇒ - val (effectiveRequest, gatewayFuture) = f(request) + val (effectiveRequest, gateway) = f(request) val result = Promise[(Try[HttpResponse], T)]() // TODO: simplify to `transformWith` when on Scala 2.12 - gatewayFuture - .flatMap(_(effectiveRequest))(fm.executionContext) - .onComplete(responseTry ⇒ result.success(responseTry -> userContext))(fm.executionContext) + gateway(effectiveRequest).onComplete(responseTry ⇒ result.success(responseTry -> userContext))(fm.executionContext) result.future } } @@ -613,6 +588,17 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte case hctx: HttpsConnectionContext ⇒ TLS(hctx.sslContext, hctx.firstSession, role, hostInfo = hostInfo) case other ⇒ TLSPlacebo() // if it's not HTTPS, we don't enable SSL/TLS } + + /** + * INTERNAL API + * + * For testing only + */ + private[scaladsl] def poolSize: Future[Int] = { + val sizePromise = Promise[Int]() + poolMasterActorRef ! PoolSize(sizePromise) + sizePromise.future + } } object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { @@ -717,14 +703,14 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { * Represents a connection pool to a specific target host and pool configuration. */ final case class HostConnectionPool private[http] (setup: HostConnectionPoolSetup)( - private[http] val gatewayFuture: Future[PoolGateway]) { // enable test access + private[http] val gateway: PoolGateway) { // enable test access /** * Asynchronously triggers the shutdown of the host connection pool. * * The produced [[scala.concurrent.Future]] is fulfilled when the shutdown has been completed. */ - def shutdown()(implicit ec: ExecutionContextExecutor): Future[Done] = gatewayFuture.flatMap(_.shutdown()) + def shutdown()(implicit ec: ExecutionContextExecutor): Future[Done] = gateway.shutdown() private[http] def toJava = new akka.http.javadsl.HostConnectionPool { override def setup = HostConnectionPool.this.setup diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala index 315e87757f..ff8addc3df 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala @@ -6,24 +6,27 @@ package akka.http.impl.engine.client import java.net.InetSocketAddress import java.nio.ByteBuffer -import java.nio.channels.{ SocketChannel, ServerSocketChannel } +import java.nio.channels.{ ServerSocketChannel, SocketChannel } import java.util.concurrent.atomic.AtomicInteger + +import akka.http.impl.engine.client.PoolMasterActor.PoolInterfaceRunning import akka.http.impl.settings.ConnectionPoolSettingsImpl +import akka.http.impl.util.{ SingletonException, StreamUtils } +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers._ +import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings } +import akka.http.scaladsl.{ Http, TestUtils } +import akka.stream.ActorMaterializer +import akka.stream.TLSProtocol._ +import akka.stream.scaladsl._ +import akka.stream.testkit.{ TestPublisher, TestSubscriber } +import akka.testkit.AkkaSpec +import akka.util.ByteString + import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.control.NonFatal import scala.util.{ Failure, Success, Try } -import akka.util.ByteString -import akka.http.scaladsl.{ TestUtils, Http } -import akka.http.impl.util.{ SingletonException, StreamUtils } -import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings } -import akka.stream.{ ActorMaterializer } -import akka.stream.TLSProtocol._ -import akka.stream.testkit.{ TestPublisher, TestSubscriber } -import akka.stream.scaladsl._ -import akka.http.scaladsl.model.headers._ -import akka.http.scaladsl.model._ -import akka.testkit.AkkaSpec class ConnectionPoolSpec extends AkkaSpec(""" akka.loggers = [] @@ -207,20 +210,17 @@ class ConnectionPoolSpec extends AkkaSpec(""" "automatically shutdown after configured timeout periods" in new TestSetup() { val (_, _, _, hcp) = cachedHostConnectionPool[Int](idleTimeout = 1.second) - val gateway = Await.result(hcp.gatewayFuture, 500.millis) - val PoolGateway.Running(_, shutdownStartedPromise, shutdownCompletedPromise) = gateway.currentState - shutdownStartedPromise.isCompleted shouldEqual false - shutdownCompletedPromise.isCompleted shouldEqual false - Await.result(shutdownStartedPromise.future, 1500.millis) // verify shutdown start (after idle) - Await.result(shutdownCompletedPromise.future, 1500.millis) // verify shutdown completed + val gateway = hcp.gateway + Await.result(gateway.poolStatus(), 1500.millis).get shouldBe a[PoolInterfaceRunning] + awaitCond({ Await.result(gateway.poolStatus(), 1500.millis).isEmpty }, 2000.millis) } "transparently restart after idle shutdown" in new TestSetup() { val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int](idleTimeout = 1.second) - val gateway = Await.result(hcp.gatewayFuture, 500.millis) - val PoolGateway.Running(_, _, shutdownCompletedPromise) = gateway.currentState - Await.result(shutdownCompletedPromise.future, 1500.millis) // verify shutdown completed + val gateway = hcp.gateway + Await.result(gateway.poolStatus(), 1500.millis).get shouldBe a[PoolInterfaceRunning] + awaitCond({ Await.result(gateway.poolStatus(), 1500.millis).isEmpty }, 2000.millis) requestIn.sendNext(HttpRequest(uri = "/") -> 42) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientSpec.scala index 1c020f24c6..166cca74cd 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientSpec.scala @@ -36,13 +36,13 @@ class ClientSpec extends WordSpec with Matchers { val resp = Await.result(respFuture, 3.seconds) resp.status shouldBe StatusCodes.OK - Http().hostPoolCache.size shouldBe 1 + Await.result(Http().poolSize, 1.second) shouldEqual 1 val respFuture2 = Http().singleRequest(HttpRequest(POST, s"http://$hostname:$port/")) val resp2 = Await.result(respFuture, 3.seconds) resp2.status shouldBe StatusCodes.OK - Http().hostPoolCache.size shouldBe 1 + Await.result(Http().poolSize, 1.second) shouldEqual 1 Await.ready(binding.unbind(), 1.second) } diff --git a/project/MiMa.scala b/project/MiMa.scala index dcc2959cd7..1ddd05a8a3 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -714,7 +714,27 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[ReversedAbstractMethodProblem]("akka.persistence.Eventsourced#ProcessingState.onWriteMessageComplete"), // #19390 Add flow monitor - ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.monitor") + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.monitor"), + + // #20080, #20081 remove race condition on HTTP client + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.scaladsl.Http#HostConnectionPool.gatewayFuture"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.http.scaladsl.Http#HostConnectionPool.copy"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.http.scaladsl.Http#HostConnectionPool.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.scaladsl.HttpExt.hostPoolCache"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.scaladsl.HttpExt.cachedGateway"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.http.scaladsl.Http#HostConnectionPool.apply"), + ProblemFilters.exclude[FinalClassProblem]("akka.http.impl.engine.client.PoolGateway"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.engine.client.PoolGateway.currentState"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.engine.client.PoolGateway.apply"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.http.impl.engine.client.PoolGateway.this"), + ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.client.PoolGateway$NewIncarnation$"), + ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.client.PoolGateway$Running$"), + ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.client.PoolGateway$IsShutdown$"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.engine.client.PoolInterfaceActor.this"), + ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.client.PoolGateway$Running"), + ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.client.PoolGateway$IsShutdown"), + ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.client.PoolGateway$NewIncarnation"), + ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.client.PoolGateway$State") ) ) }