From 6024b9ff65fe69c31d198497710fe030f8af5f19 Mon Sep 17 00:00:00 2001 From: Mathias Date: Mon, 1 Jun 2015 13:14:27 +0200 Subject: [PATCH] =htc #17403 fix connection pool reusing existing connections too early --- .../impl/engine/client/PoolConductor.scala | 36 ++++++++++------ .../http/impl/engine/client/PoolFlow.scala | 6 +-- .../http/impl/engine/client/PoolSlot.scala | 42 +++++++++++++------ .../akka/http/impl/util/StreamUtils.scala | 19 ++++++++- .../akka/http/scaladsl/model/HttpEntity.scala | 4 +- .../engine/client/ConnectionPoolSpec.scala | 40 +++++++++++++++--- .../main/scala/akka/stream/javadsl/Sink.scala | 2 +- 7 files changed, 109 insertions(+), 40 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala index 67d978253b..9347308fee 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala @@ -10,16 +10,17 @@ import scala.collection.immutable import akka.event.LoggingAdapter import akka.stream.scaladsl._ import akka.stream._ +import akka.http.scaladsl.util.FastFuture import akka.http.scaladsl.model.HttpMethod import akka.http.impl.util._ private object PoolConductor { import PoolFlow.RequestContext - import PoolSlot.{ SlotEvent, SimpleSlotEvent } + import PoolSlot.{ RawSlotEvent, SlotEvent } case class Ports( requestIn: Inlet[RequestContext], - slotEventIn: Inlet[SlotEvent], + slotEventIn: Inlet[RawSlotEvent], slotOuts: immutable.Seq[Outlet[RequestContext]]) extends Shape { override val inlets = requestIn :: slotEventIn :: Nil @@ -34,7 +35,7 @@ private object PoolConductor { override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = Ports( inlets.head.asInstanceOf[Inlet[RequestContext]], - inlets.last.asInstanceOf[Inlet[SlotEvent]], + inlets.last.asInstanceOf[Inlet[RawSlotEvent]], outlets.asInstanceOf[immutable.Seq[Outlet[RequestContext]]]) } @@ -47,11 +48,15 @@ private object PoolConductor { +--------->| Merge +---->| Selector +-------------->| (MapConcat) +---->|  (Flexi  +--------------> | |  | |    | |    |  Route) +--------------> +----+------+ +-----+-----+             +-------------+    +-----------+     to slots      -   ^     ^                                            -   |     | SimpleSlotEvent                           +   ^     ^  + | | SlotEvent + | +----+----+ + | | flatten | mapAsync + | +----+----+ +   |     | RawSlotEvent   | Request-    |                                           | Context +---------+ - +-------------+  retry |<-------- Slot Event (from slotEventMerge) + +-------------+  retry |<-------- RawSlotEvent (from slotEventMerge) |  Split  | +---------+ @@ -67,10 +72,15 @@ private object PoolConductor { val doubler = Flow[SwitchCommand].mapConcat(x ⇒ x :: x :: Nil) // work-around for https://github.com/akka/akka/issues/17004 val route = b.add(new Route(slotCount)) val retrySplit = b.add(new RetrySplit()) + val flatten = Flow[RawSlotEvent].mapAsyncUnordered(slotCount) { + case x: SlotEvent.Disconnected ⇒ FastFuture.successful(x) + case SlotEvent.RequestCompletedFuture(future) ⇒ future + case x ⇒ throw new IllegalStateException("Unexpected " + x) + } retryMerge.out ~> slotSelector.in0 slotSelector.out ~> doubler ~> route.in - retrySplit.out0 ~> slotSelector.in1 + retrySplit.out0 ~> flatten ~> slotSelector.in1 retrySplit.out1 ~> retryMerge.in1 Ports(retryMerge.in0, retrySplit.in, route.outlets.asInstanceOf[immutable.Seq[Outlet[RequestContext]]]) @@ -97,11 +107,11 @@ private object PoolConductor { private object Busy extends Busy(1) private class SlotSelector(slotCount: Int, maxRetries: Int, pipeliningLimit: Int, log: LoggingAdapter) - extends FlexiMerge[SwitchCommand, FanInShape2[RequestContext, SimpleSlotEvent, SwitchCommand]]( + extends FlexiMerge[SwitchCommand, FanInShape2[RequestContext, SlotEvent, SwitchCommand]]( new FanInShape2("PoolConductor.SlotSelector"), OperationAttributes.name("PoolConductor.SlotSelector")) { import FlexiMerge._ - def createMergeLogic(s: FanInShape2[RequestContext, SimpleSlotEvent, SwitchCommand]): MergeLogic[SwitchCommand] = + def createMergeLogic(s: FanInShape2[RequestContext, SlotEvent, SwitchCommand]): MergeLogic[SwitchCommand] = new MergeLogic[SwitchCommand] { val slotStates = Array.fill[SlotState](slotCount)(Unconnected) def initialState = nextState(0) @@ -197,16 +207,16 @@ private object PoolConductor { } // FIXME: remove when #17038 is cleared - private class RetrySplit extends FlexiRoute[SlotEvent, FanOutShape2[SlotEvent, SimpleSlotEvent, RequestContext]]( + private class RetrySplit extends FlexiRoute[RawSlotEvent, FanOutShape2[RawSlotEvent, RawSlotEvent, RequestContext]]( new FanOutShape2("PoolConductor.RetrySplit"), OperationAttributes.name("PoolConductor.RetrySplit")) { import FlexiRoute._ - def createRouteLogic(s: FanOutShape2[SlotEvent, SimpleSlotEvent, RequestContext]): RouteLogic[SlotEvent] = - new RouteLogic[SlotEvent] { + def createRouteLogic(s: FanOutShape2[RawSlotEvent, RawSlotEvent, RequestContext]): RouteLogic[RawSlotEvent] = + new RouteLogic[RawSlotEvent] { def initialState: State[_] = State(DemandFromAll(s)) { (ctx, _, ev) ⇒ ev match { - case x: SimpleSlotEvent ⇒ ctx.emit(s.out0)(x) case SlotEvent.RetryRequest(rc) ⇒ ctx.emit(s.out1)(rc) + case x ⇒ ctx.emit(s.out0)(x) } SameState } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolFlow.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolFlow.scala index a6f162718d..257688db12 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolFlow.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolFlow.scala @@ -40,11 +40,11 @@ private object PoolFlow {                    ^         |          |     |              |                                            |         |      +-------------------+    |                                  |         |      |                   |    |                         -            SlotEvent |         +----> | Connection Slot 3 +---->                         +           RawSlotEvent |         +----> | Connection Slot 3 +---->                                            |                |                   |                                                 |                +---------------+---+                                            |    |     |     |                                     - +-----------+ SlotEvent |     |     |  + +-----------+ RawSlotEvent |     |     |               | slotEvent | <-------------+     |     |              |  Merge  | <-------------------+     |                                                |           | <-------------------------+                                   @@ -79,7 +79,7 @@ private object PoolFlow { .tabulate(maxConnections)(PoolSlot(_, connectionFlow, remoteAddress, settings)) .map(b.add(_)) val responseMerge = b.add(Merge[ResponseContext](maxConnections)) - val slotEventMerge = b.add(Merge[PoolSlot.SlotEvent](maxConnections)) + val slotEventMerge = b.add(Merge[PoolSlot.RawSlotEvent](maxConnections)) slotEventMerge.out ~> conductor.slotEventIn for ((slot, ix) ← slots.zipWithIndex) { diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala index 002284510c..45b5129d24 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala @@ -6,10 +6,12 @@ package akka.http.impl.engine.client import language.existentials import java.net.InetSocketAddress +import scala.concurrent.Future import scala.util.{ Failure, Success } import scala.collection.immutable import akka.actor._ -import akka.http.scaladsl.model.{ HttpResponse, HttpRequest } +import akka.http.scaladsl.model.{ HttpEntity, HttpResponse, HttpRequest } +import akka.http.scaladsl.util.FastFuture import akka.http.ConnectionPoolSettings import akka.http.impl.util._ import akka.stream.impl.{ SubscribePending, ExposedPublisher, ActorProcessor } @@ -22,15 +24,16 @@ private object PoolSlot { sealed trait ProcessorOut final case class ResponseDelivery(response: ResponseContext) extends ProcessorOut - sealed trait SlotEvent extends ProcessorOut - sealed trait SimpleSlotEvent extends SlotEvent + sealed trait RawSlotEvent extends ProcessorOut + sealed trait SlotEvent extends RawSlotEvent object SlotEvent { - final case class RequestCompleted(slotIx: Int) extends SimpleSlotEvent - final case class Disconnected(slotIx: Int, failedRequests: Int) extends SimpleSlotEvent - final case class RetryRequest(rc: RequestContext) extends SlotEvent + final case class RequestCompletedFuture(future: Future[RequestCompleted]) extends RawSlotEvent + final case class RetryRequest(rc: RequestContext) extends RawSlotEvent + final case class RequestCompleted(slotIx: Int) extends SlotEvent + final case class Disconnected(slotIx: Int, failedRequests: Int) extends SlotEvent } - type Ports = FanOutShape2[RequestContext, ResponseContext, SlotEvent] + type Ports = FanOutShape2[RequestContext, ResponseContext, RawSlotEvent] private val slotProcessorActorName = new SeqActorName("SlotProcessor") @@ -43,7 +46,7 @@ private object PoolSlot { +--------->| Processor +------------->| (MapConcat) +------------->| (MapConcat) +---->| Split      +------------->           |           |  Processor- | | Out  | |    |            |  Context                             +-----------+  Out] +-------------+ +-------------+    +-----+------+                    -                          | SlotEvent                                    +                          | RawSlotEvent                                                     | (to Conductor | via slotEventMerge)                              v  @@ -150,8 +153,21 @@ private object PoolSlot { case FromConnection(OnNext(response: HttpResponse)) ⇒ val requestContext = inflightRequests.head inflightRequests = inflightRequests.tail - val delivery = ResponseDelivery(ResponseContext(requestContext, Success(response))) - val requestCompleted = SlotEvent.RequestCompleted(slotIx) + val (entity, whenCompleted) = response.entity match { + case x: HttpEntity.Strict ⇒ x -> FastFuture.successful(()) + case x: HttpEntity.Default ⇒ + val (newData, whenCompleted) = StreamUtils.captureTermination(x.data) + x.copy(data = newData) -> whenCompleted + case x: HttpEntity.CloseDelimited ⇒ + val (newData, whenCompleted) = StreamUtils.captureTermination(x.data) + x.copy(data = newData) -> whenCompleted + case x: HttpEntity.Chunked ⇒ + val (newChunks, whenCompleted) = StreamUtils.captureTermination(x.chunks) + x.copy(chunks = newChunks) -> whenCompleted + } + val delivery = ResponseDelivery(ResponseContext(requestContext, Success(response withEntity entity))) + import fm.executionContext + val requestCompleted = SlotEvent.RequestCompletedFuture(whenCompleted.map(_ ⇒ SlotEvent.RequestCompleted(slotIx))) onNext(delivery :: requestCompleted :: Nil) case FromConnection(OnComplete) ⇒ handleDisconnect(None) @@ -211,11 +227,11 @@ private object PoolSlot { } // FIXME: remove when #17038 is cleared - private class SlotEventSplit extends FlexiRoute[ProcessorOut, FanOutShape2[ProcessorOut, ResponseContext, SlotEvent]]( + private class SlotEventSplit extends FlexiRoute[ProcessorOut, FanOutShape2[ProcessorOut, ResponseContext, RawSlotEvent]]( new FanOutShape2("PoolSlot.SlotEventSplit"), OperationAttributes.name("PoolSlot.SlotEventSplit")) { import FlexiRoute._ - def createRouteLogic(s: FanOutShape2[ProcessorOut, ResponseContext, SlotEvent]): RouteLogic[ProcessorOut] = + def createRouteLogic(s: FanOutShape2[ProcessorOut, ResponseContext, RawSlotEvent]): RouteLogic[ProcessorOut] = new RouteLogic[ProcessorOut] { val initialState: State[_] = State(DemandFromAny(s)) { case (_, _, ResponseDelivery(x)) ⇒ @@ -223,7 +239,7 @@ private object PoolSlot { ctx.emit(s.out0)(x) initialState } - case (_, _, x: SlotEvent) ⇒ + case (_, _, x: RawSlotEvent) ⇒ State(DemandFrom(s.out1)) { (ctx, _, _) ⇒ ctx.emit(s.out1)(x) initialState diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala index 46ebb9a305..806597c9fe 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala @@ -7,10 +7,9 @@ package akka.http.impl.util import java.io.InputStream import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean } import akka.stream.impl.StreamLayout.Module -import akka.stream.impl.{ SourceModule, SinkModule, ActorFlowMaterializerImpl, PublisherSink } +import akka.stream.impl.{ SourceModule, SinkModule, PublisherSink } import akka.stream.scaladsl.FlexiMerge._ import org.reactivestreams.{ Subscription, Processor, Subscriber, Publisher } -import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.{ Promise, ExecutionContext, Future } import akka.util.ByteString @@ -64,6 +63,22 @@ private[http] object StreamUtils { Flow[ByteString].transform(() ⇒ transformer).named("transformError") } + def captureTermination[T, Mat](source: Source[T, Mat]): (Source[T, Mat], Future[Unit]) = { + val promise = Promise[Unit]() + val transformer = new PushStage[T, T] { + def onPush(element: T, ctx: Context[T]) = ctx.push(element) + override def onUpstreamFinish(ctx: Context[T]) = { + promise.success(()) + super.onUpstreamFinish(ctx) + } + override def onUpstreamFailure(cause: Throwable, ctx: Context[T]) = { + promise.failure(cause) + ctx.fail(cause) + } + } + source.transform(() ⇒ transformer) -> promise.future + } + def sliceBytesTransformer(start: Long, length: Long): Flow[ByteString, ByteString, Unit] = { val transformer = new StatefulStage[ByteString, ByteString] { diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala index 7cc5db26d5..8876de025a 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala @@ -300,8 +300,8 @@ object HttpEntity { } object Chunked { /** - * Returns a ``Chunked`` entity where one Chunk is produced for every non-empty ByteString of the given - * ``Publisher[ByteString]``. + * Returns a ``Chunked`` entity where one Chunk is produced for every non-empty ByteString produced by the given + * ``Source``. */ def fromData(contentType: ContentType, chunks: Source[ByteString, Any]): Chunked = Chunked(contentType, chunks.collect[ChunkStreamPart] { diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala index dbead72306..e0a4cbc073 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala @@ -16,7 +16,7 @@ import akka.util.ByteString import akka.http.scaladsl.{ TestUtils, Http } import akka.http.impl.util.{ SingletonException, StreamUtils } import akka.http.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings } -import akka.stream.io.{ SessionBytes, SendBytes, SslTlsInbound, SslTlsOutbound } +import akka.stream.io.{ SessionBytes, SendBytes, SslTlsOutbound } import akka.stream.{ BidiShape, ActorFlowMaterializer } import akka.stream.testkit.{ TestPublisher, TestSubscriber, AkkaSpec } import akka.stream.scaladsl._ @@ -83,6 +83,35 @@ class ConnectionPoolSpec extends AkkaSpec(""" Seq(r1, r2).map(t ⇒ connNr(t._1.get)) should contain allOf (1, 2) } + "open a second connection if the request on the first one is dispatch but not yet completed" in new TestSetup { + val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int]() + + val responseEntityPub = TestPublisher.probe[ByteString]() + + override def testServerHandler(connNr: Int): HttpRequest ⇒ HttpResponse = { + case request @ HttpRequest(_, Uri.Path("/a"), _, _, _) ⇒ + val entity = HttpEntity.Chunked.fromData(ContentTypes.`text/plain(UTF-8)`, Source(responseEntityPub)) + super.testServerHandler(connNr)(request) withEntity entity + case x ⇒ super.testServerHandler(connNr)(x) + } + + requestIn.sendNext(HttpRequest(uri = "/a") -> 42) + responseOutSub.request(1) + acceptIncomingConnection() + val (Success(r1), 42) = responseOut.expectNext() + val responseEntityProbe = TestSubscriber.probe[ByteString]() + r1.entity.dataBytes.runWith(Sink(responseEntityProbe)) + responseEntityProbe.expectSubscription().request(2) + responseEntityPub.sendNext(ByteString("YEAH")) + responseEntityProbe.expectNext(ByteString("YEAH")) + + requestIn.sendNext(HttpRequest(uri = "/b") -> 43) + responseOutSub.request(1) + acceptIncomingConnection() + val (Success(r2), 43) = responseOut.expectNext() + connNr(r2) shouldEqual 2 + } + "not open a second connection if there is an idle one available" in new TestSetup { val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int]() @@ -258,13 +287,12 @@ class ConnectionPoolSpec extends AkkaSpec(""" val (serverEndpoint, serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort() def testServerHandler(connNr: Int): HttpRequest ⇒ HttpResponse = { - case HttpRequest(_, uri, headers, entity, _) ⇒ - val responseHeaders = - ConnNrHeader(connNr) +: - RawHeader("Req-Uri", uri.toString) +: headers.map(h ⇒ RawHeader("Req-" + h.name, h.value)) - HttpResponse(headers = responseHeaders, entity = entity) + case r: HttpRequest ⇒ HttpResponse(headers = responseHeaders(r, connNr), entity = r.entity) } + def responseHeaders(r: HttpRequest, connNr: Int) = + ConnNrHeader(connNr) +: RawHeader("Req-Uri", r.uri.toString) +: r.headers.map(h ⇒ RawHeader("Req-" + h.name, h.value)) + def mapServerSideOutboundRawBytes(bytes: ByteString): ByteString = bytes val incomingConnectionCounter = new AtomicInteger diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index e6a556463b..199e8eca8b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -65,7 +65,7 @@ object Sink { */ def foreach[T](f: function.Procedure[T]): Sink[T, Future[Unit]] = new Sink(scaladsl.Sink.foreach(f.apply)) - + /** * A `Sink` that will invoke the given procedure for each received element in parallel. The sink is materialized * into a [[scala.concurrent.Future]].