Merge pull request #17627 from spray/wip-17403-mathias

=htc #17403 fix connection pool reusing existing connections too early
This commit is contained in:
drewhk 2015-06-15 17:12:53 +02:00
commit 8769a3e79b
6 changed files with 108 additions and 39 deletions

View file

@ -10,16 +10,17 @@ import scala.collection.immutable
import akka.event.LoggingAdapter
import akka.stream.scaladsl._
import akka.stream._
import akka.http.scaladsl.util.FastFuture
import akka.http.scaladsl.model.HttpMethod
import akka.http.impl.util._
private object PoolConductor {
import PoolFlow.RequestContext
import PoolSlot.{ SlotEvent, SimpleSlotEvent }
import PoolSlot.{ RawSlotEvent, SlotEvent }
case class Ports(
requestIn: Inlet[RequestContext],
slotEventIn: Inlet[SlotEvent],
slotEventIn: Inlet[RawSlotEvent],
slotOuts: immutable.Seq[Outlet[RequestContext]]) extends Shape {
override val inlets = requestIn :: slotEventIn :: Nil
@ -34,7 +35,7 @@ private object PoolConductor {
override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape =
Ports(
inlets.head.asInstanceOf[Inlet[RequestContext]],
inlets.last.asInstanceOf[Inlet[SlotEvent]],
inlets.last.asInstanceOf[Inlet[RawSlotEvent]],
outlets.asInstanceOf[immutable.Seq[Outlet[RequestContext]]])
}
@ -47,11 +48,15 @@ private object PoolConductor {
+--------->| Merge +---->|Selector +-------------->| (MapConcat) +---->| (Flexi +-------------->
| | | | | | | Route) +-------------->
+----+------+ +-----+-----+ +-------------+ +-----------+ to slots
^ ^
| |SimpleSlotEvent
^ ^
| | SlotEvent
| +----+----+
| |flatten | mapAsync
| +----+----+
| |RawSlotEvent
| Request- |
| Context +---------+
+-------------+ retry |<-------- Slot Event (from slotEventMerge)
+-------------+ retry |<-------- RawSlotEvent (from slotEventMerge)
| Split |
+---------+
@ -67,10 +72,15 @@ private object PoolConductor {
val doubler = Flow[SwitchCommand].mapConcat(x x :: x :: Nil) // work-around for https://github.com/akka/akka/issues/17004
val route = b.add(new Route(slotCount))
val retrySplit = b.add(new RetrySplit())
val flatten = Flow[RawSlotEvent].mapAsyncUnordered(slotCount) {
case x: SlotEvent.Disconnected FastFuture.successful(x)
case SlotEvent.RequestCompletedFuture(future) future
case x throw new IllegalStateException("Unexpected " + x)
}
retryMerge.out ~> slotSelector.in0
slotSelector.out ~> doubler ~> route.in
retrySplit.out0 ~> slotSelector.in1
retrySplit.out0 ~> flatten ~> slotSelector.in1
retrySplit.out1 ~> retryMerge.in1
Ports(retryMerge.in0, retrySplit.in, route.outlets.asInstanceOf[immutable.Seq[Outlet[RequestContext]]])
@ -97,11 +107,11 @@ private object PoolConductor {
private object Busy extends Busy(1)
private class SlotSelector(slotCount: Int, maxRetries: Int, pipeliningLimit: Int, log: LoggingAdapter)
extends FlexiMerge[SwitchCommand, FanInShape2[RequestContext, SimpleSlotEvent, SwitchCommand]](
extends FlexiMerge[SwitchCommand, FanInShape2[RequestContext, SlotEvent, SwitchCommand]](
new FanInShape2("PoolConductor.SlotSelector"), OperationAttributes.name("PoolConductor.SlotSelector")) {
import FlexiMerge._
def createMergeLogic(s: FanInShape2[RequestContext, SimpleSlotEvent, SwitchCommand]): MergeLogic[SwitchCommand] =
def createMergeLogic(s: FanInShape2[RequestContext, SlotEvent, SwitchCommand]): MergeLogic[SwitchCommand] =
new MergeLogic[SwitchCommand] {
val slotStates = Array.fill[SlotState](slotCount)(Unconnected)
def initialState = nextState(0)
@ -197,16 +207,16 @@ private object PoolConductor {
}
// FIXME: remove when #17038 is cleared
private class RetrySplit extends FlexiRoute[SlotEvent, FanOutShape2[SlotEvent, SimpleSlotEvent, RequestContext]](
private class RetrySplit extends FlexiRoute[RawSlotEvent, FanOutShape2[RawSlotEvent, RawSlotEvent, RequestContext]](
new FanOutShape2("PoolConductor.RetrySplit"), OperationAttributes.name("PoolConductor.RetrySplit")) {
import FlexiRoute._
def createRouteLogic(s: FanOutShape2[SlotEvent, SimpleSlotEvent, RequestContext]): RouteLogic[SlotEvent] =
new RouteLogic[SlotEvent] {
def createRouteLogic(s: FanOutShape2[RawSlotEvent, RawSlotEvent, RequestContext]): RouteLogic[RawSlotEvent] =
new RouteLogic[RawSlotEvent] {
def initialState: State[_] = State(DemandFromAll(s)) { (ctx, _, ev)
ev match {
case x: SimpleSlotEvent ctx.emit(s.out0)(x)
case SlotEvent.RetryRequest(rc) ctx.emit(s.out1)(rc)
case x ctx.emit(s.out0)(x)
}
SameState
}

View file

@ -40,11 +40,11 @@ private object PoolFlow {
^ ||| |
| |+-------------------+|
| ||||
SlotEvent| +---->|ConnectionSlot3+---->
RawSlotEvent| +---->|ConnectionSlot3+---->
| ||
| +---------------+---+
| | ||
+-----------+ SlotEvent | ||
+-----------+ RawSlotEvent | ||
|slotEvent| <-------------+ ||
| Merge | <-------------------+|
|| <-------------------------+
@ -79,7 +79,7 @@ private object PoolFlow {
.tabulate(maxConnections)(PoolSlot(_, connectionFlow, remoteAddress, settings))
.map(b.add(_))
val responseMerge = b.add(Merge[ResponseContext](maxConnections))
val slotEventMerge = b.add(Merge[PoolSlot.SlotEvent](maxConnections))
val slotEventMerge = b.add(Merge[PoolSlot.RawSlotEvent](maxConnections))
slotEventMerge.out ~> conductor.slotEventIn
for ((slot, ix) slots.zipWithIndex) {

View file

@ -6,10 +6,12 @@ package akka.http.impl.engine.client
import language.existentials
import java.net.InetSocketAddress
import scala.concurrent.Future
import scala.util.{ Failure, Success }
import scala.collection.immutable
import akka.actor._
import akka.http.scaladsl.model.{ HttpResponse, HttpRequest }
import akka.http.scaladsl.model.{ HttpEntity, HttpResponse, HttpRequest }
import akka.http.scaladsl.util.FastFuture
import akka.http.ConnectionPoolSettings
import akka.http.impl.util._
import akka.stream.impl.{ SubscribePending, ExposedPublisher, ActorProcessor }
@ -22,15 +24,16 @@ private object PoolSlot {
sealed trait ProcessorOut
final case class ResponseDelivery(response: ResponseContext) extends ProcessorOut
sealed trait SlotEvent extends ProcessorOut
sealed trait SimpleSlotEvent extends SlotEvent
sealed trait RawSlotEvent extends ProcessorOut
sealed trait SlotEvent extends RawSlotEvent
object SlotEvent {
final case class RequestCompleted(slotIx: Int) extends SimpleSlotEvent
final case class Disconnected(slotIx: Int, failedRequests: Int) extends SimpleSlotEvent
final case class RetryRequest(rc: RequestContext) extends SlotEvent
final case class RequestCompletedFuture(future: Future[RequestCompleted]) extends RawSlotEvent
final case class RetryRequest(rc: RequestContext) extends RawSlotEvent
final case class RequestCompleted(slotIx: Int) extends SlotEvent
final case class Disconnected(slotIx: Int, failedRequests: Int) extends SlotEvent
}
type Ports = FanOutShape2[RequestContext, ResponseContext, SlotEvent]
type Ports = FanOutShape2[RequestContext, ResponseContext, RawSlotEvent]
private val slotProcessorActorName = new SeqActorName("SlotProcessor")
@ -43,7 +46,7 @@ private object PoolSlot {
+--------->|Processor+------------->| (MapConcat) +------------->| (MapConcat) +---->|Split+------------->
|| Processor- | | Out | | || Context
+-----------+ Out] +-------------+ +-------------+ +-----+------+
|SlotEvent
|RawSlotEvent
|(toConductor
| via slotEventMerge)
v
@ -150,8 +153,21 @@ private object PoolSlot {
case FromConnection(OnNext(response: HttpResponse))
val requestContext = inflightRequests.head
inflightRequests = inflightRequests.tail
val delivery = ResponseDelivery(ResponseContext(requestContext, Success(response)))
val requestCompleted = SlotEvent.RequestCompleted(slotIx)
val (entity, whenCompleted) = response.entity match {
case x: HttpEntity.Strict x -> FastFuture.successful(())
case x: HttpEntity.Default
val (newData, whenCompleted) = StreamUtils.captureTermination(x.data)
x.copy(data = newData) -> whenCompleted
case x: HttpEntity.CloseDelimited
val (newData, whenCompleted) = StreamUtils.captureTermination(x.data)
x.copy(data = newData) -> whenCompleted
case x: HttpEntity.Chunked
val (newChunks, whenCompleted) = StreamUtils.captureTermination(x.chunks)
x.copy(chunks = newChunks) -> whenCompleted
}
val delivery = ResponseDelivery(ResponseContext(requestContext, Success(response withEntity entity)))
import fm.executionContext
val requestCompleted = SlotEvent.RequestCompletedFuture(whenCompleted.map(_ SlotEvent.RequestCompleted(slotIx)))
onNext(delivery :: requestCompleted :: Nil)
case FromConnection(OnComplete) handleDisconnect(None)
@ -211,11 +227,11 @@ private object PoolSlot {
}
// FIXME: remove when #17038 is cleared
private class SlotEventSplit extends FlexiRoute[ProcessorOut, FanOutShape2[ProcessorOut, ResponseContext, SlotEvent]](
private class SlotEventSplit extends FlexiRoute[ProcessorOut, FanOutShape2[ProcessorOut, ResponseContext, RawSlotEvent]](
new FanOutShape2("PoolSlot.SlotEventSplit"), OperationAttributes.name("PoolSlot.SlotEventSplit")) {
import FlexiRoute._
def createRouteLogic(s: FanOutShape2[ProcessorOut, ResponseContext, SlotEvent]): RouteLogic[ProcessorOut] =
def createRouteLogic(s: FanOutShape2[ProcessorOut, ResponseContext, RawSlotEvent]): RouteLogic[ProcessorOut] =
new RouteLogic[ProcessorOut] {
val initialState: State[_] = State(DemandFromAny(s)) {
case (_, _, ResponseDelivery(x))
@ -223,7 +239,7 @@ private object PoolSlot {
ctx.emit(s.out0)(x)
initialState
}
case (_, _, x: SlotEvent)
case (_, _, x: RawSlotEvent)
State(DemandFrom(s.out1)) { (ctx, _, _)
ctx.emit(s.out1)(x)
initialState

View file

@ -7,10 +7,9 @@ package akka.http.impl.util
import java.io.InputStream
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.{ SourceModule, SinkModule, ActorFlowMaterializerImpl, PublisherSink }
import akka.stream.impl.{ SourceModule, SinkModule, PublisherSink }
import akka.stream.scaladsl.FlexiMerge._
import org.reactivestreams.{ Subscription, Processor, Subscriber, Publisher }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.{ Promise, ExecutionContext, Future }
import akka.util.ByteString
@ -64,6 +63,22 @@ private[http] object StreamUtils {
Flow[ByteString].transform(() transformer).named("transformError")
}
def captureTermination[T, Mat](source: Source[T, Mat]): (Source[T, Mat], Future[Unit]) = {
val promise = Promise[Unit]()
val transformer = new PushStage[T, T] {
def onPush(element: T, ctx: Context[T]) = ctx.push(element)
override def onUpstreamFinish(ctx: Context[T]) = {
promise.success(())
super.onUpstreamFinish(ctx)
}
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]) = {
promise.failure(cause)
ctx.fail(cause)
}
}
source.transform(() transformer) -> promise.future
}
def sliceBytesTransformer(start: Long, length: Long): Flow[ByteString, ByteString, Unit] = {
val transformer = new StatefulStage[ByteString, ByteString] {

View file

@ -300,8 +300,8 @@ object HttpEntity {
}
object Chunked {
/**
* Returns a ``Chunked`` entity where one Chunk is produced for every non-empty ByteString of the given
* ``Publisher[ByteString]``.
* Returns a ``Chunked`` entity where one Chunk is produced for every non-empty ByteString produced by the given
* ``Source``.
*/
def fromData(contentType: ContentType, chunks: Source[ByteString, Any]): Chunked =
Chunked(contentType, chunks.collect[ChunkStreamPart] {

View file

@ -16,7 +16,7 @@ import akka.util.ByteString
import akka.http.scaladsl.{ TestUtils, Http }
import akka.http.impl.util.{ SingletonException, StreamUtils }
import akka.http.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings }
import akka.stream.io.{ SessionBytes, SendBytes, SslTlsInbound, SslTlsOutbound }
import akka.stream.io.{ SessionBytes, SendBytes, SslTlsOutbound }
import akka.stream.{ BidiShape, ActorFlowMaterializer }
import akka.stream.testkit.{ TestPublisher, TestSubscriber, AkkaSpec }
import akka.stream.scaladsl._
@ -83,6 +83,35 @@ class ConnectionPoolSpec extends AkkaSpec("""
Seq(r1, r2).map(t connNr(t._1.get)) should contain allOf (1, 2)
}
"open a second connection if the request on the first one is dispatch but not yet completed" in new TestSetup {
val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int]()
val responseEntityPub = TestPublisher.probe[ByteString]()
override def testServerHandler(connNr: Int): HttpRequest HttpResponse = {
case request @ HttpRequest(_, Uri.Path("/a"), _, _, _)
val entity = HttpEntity.Chunked.fromData(ContentTypes.`text/plain(UTF-8)`, Source(responseEntityPub))
super.testServerHandler(connNr)(request) withEntity entity
case x super.testServerHandler(connNr)(x)
}
requestIn.sendNext(HttpRequest(uri = "/a") -> 42)
responseOutSub.request(1)
acceptIncomingConnection()
val (Success(r1), 42) = responseOut.expectNext()
val responseEntityProbe = TestSubscriber.probe[ByteString]()
r1.entity.dataBytes.runWith(Sink(responseEntityProbe))
responseEntityProbe.expectSubscription().request(2)
responseEntityPub.sendNext(ByteString("YEAH"))
responseEntityProbe.expectNext(ByteString("YEAH"))
requestIn.sendNext(HttpRequest(uri = "/b") -> 43)
responseOutSub.request(1)
acceptIncomingConnection()
val (Success(r2), 43) = responseOut.expectNext()
connNr(r2) shouldEqual 2
}
"not open a second connection if there is an idle one available" in new TestSetup {
val (requestIn, responseOut, responseOutSub, hcp) = cachedHostConnectionPool[Int]()
@ -258,13 +287,12 @@ class ConnectionPoolSpec extends AkkaSpec("""
val (serverEndpoint, serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort()
def testServerHandler(connNr: Int): HttpRequest HttpResponse = {
case HttpRequest(_, uri, headers, entity, _)
val responseHeaders =
ConnNrHeader(connNr) +:
RawHeader("Req-Uri", uri.toString) +: headers.map(h RawHeader("Req-" + h.name, h.value))
HttpResponse(headers = responseHeaders, entity = entity)
case r: HttpRequest HttpResponse(headers = responseHeaders(r, connNr), entity = r.entity)
}
def responseHeaders(r: HttpRequest, connNr: Int) =
ConnNrHeader(connNr) +: RawHeader("Req-Uri", r.uri.toString) +: r.headers.map(h RawHeader("Req-" + h.name, h.value))
def mapServerSideOutboundRawBytes(bytes: ByteString): ByteString = bytes
val incomingConnectionCounter = new AtomicInteger