Merge pull request #17824 from drewhk/wip-17504-http-cancellation-fix-drewhk

Fix Http client cancellation
This commit is contained in:
drewhk 2015-06-24 16:59:39 +02:00
commit 911943fc92
10 changed files with 171 additions and 68 deletions

View file

@ -78,7 +78,7 @@ private[http] object OutgoingConnectionBlueprint {
FlowGraph.partial() { implicit b
import FlowGraph.Implicits._
val methodBypassFanout = b.add(Broadcast[HttpRequest](2))
val methodBypassFanout = b.add(Broadcast[HttpRequest](2, eagerCancel = true))
val responseParsingMerge = b.add(new ResponseParsingMerge(rootParser))
val terminationFanout = b.add(Broadcast[HttpResponse](2))

View file

@ -0,0 +1,76 @@
package akka.http.impl.engine.client
import javax.net.ssl.SSLContext
import akka.http.scaladsl.{ HttpsContext, Http }
import akka.http.scaladsl.model.{ HttpHeader, HttpResponse, HttpRequest }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.testkit.{ TestSubscriber, TestPublisher, AkkaSpec, TestUtils, Utils }
import akka.http.scaladsl.model.headers
class ClientCancellationSpec extends AkkaSpec("""
#akka.loggers = []
akka.loglevel = DEBUG
#akka.io.tcp.trace-logging = off
akka.io.tcp.windows-connection-abort-workaround-enabled=auto""") {
implicit val materializer = ActorMaterializer()
val noncheckedMaterializer = ActorMaterializer()
"Http client connections" must {
val address = TestUtils.temporaryServerAddress()
Http().bindAndHandleSync(
{ req HttpResponse(headers = headers.Connection("close") :: Nil) },
address.getHostName,
address.getPort)(noncheckedMaterializer)
val addressTls = TestUtils.temporaryServerAddress()
Http().bindAndHandleSync(
{ req HttpResponse() }, // TLS client does full-close, no need for the connection:close header
addressTls.getHostName,
addressTls.getPort,
httpsContext = Some(HttpsContext(SSLContext.getDefault)))(noncheckedMaterializer)
def testCase(connection: Flow[HttpRequest, HttpResponse, Any]): Unit = Utils.assertAllStagesStopped {
val requests = TestPublisher.probe[HttpRequest]()
val responses = TestSubscriber.probe[HttpResponse]()
Source(requests).via(connection).runWith(Sink(responses))
responses.request(1)
requests.sendNext(HttpRequest())
responses.expectNext().entity.dataBytes.runWith(Sink.cancelled)
responses.cancel()
requests.expectCancellation()
}
"support cancellation in simple outgoing connection" in {
testCase(
Http().outgoingConnection(address.getHostName, address.getPort))
}
"support cancellation in pooled outgoing connection" in {
testCase(
Flow[HttpRequest]
.map((_, ()))
.via(Http().cachedHostConnectionPool(address.getHostName, address.getPort)(noncheckedMaterializer))
.map(_._1.get))
}
"support cancellation in simple outgoing connection with TLS" in {
pending
testCase(
Http().outgoingConnectionTls(addressTls.getHostName, addressTls.getPort))
}
"support cancellation in pooled outgoing connection with TLS" in {
pending
testCase(
Flow[HttpRequest]
.map((_, ()))
.via(Http().cachedHostConnectionPoolTls(addressTls.getHostName, addressTls.getPort)(noncheckedMaterializer))
.map(_._1.get))
}
}
}

View file

@ -114,6 +114,72 @@ class FlowSplitWhenSpec extends AkkaSpec {
masterSubscriber.expectComplete()
}
}
"support cancelling both master and substream" in assertAllStagesStopped {
val inputs = TestPublisher.probe[Int]()
val substream = TestSubscriber.probe[Int]()
val masterStream = TestSubscriber.probe[Any]()
Source(inputs)
.splitWhen(_ == 2)
.map(_.runWith(Sink(substream)))
.runWith(Sink(masterStream))
masterStream.request(1)
inputs.sendNext(1)
substream.cancel()
masterStream.expectNext(())
masterStream.expectNoMsg(1.second)
masterStream.cancel()
inputs.expectCancellation()
val inputs2 = TestPublisher.probe[Int]()
Source(inputs2)
.splitWhen(_ == 2)
.map(_.runWith(Sink.cancelled))
.runWith(Sink.cancelled)
inputs2.expectCancellation()
val inputs3 = TestPublisher.probe[Int]()
val substream3 = TestSubscriber.probe[Int]()
val masterStream3 = TestSubscriber.probe[Source[Int, Any]]()
Source(inputs3)
.splitWhen(_ == 2)
.runWith(Sink(masterStream3))
masterStream3.request(1)
inputs3.sendNext(1)
val src = masterStream3.expectNext()
src.runWith(Sink.cancelled)
masterStream3.request(1)
inputs3.sendNext(2)
val src2 = masterStream3.expectNext()
val substream4 = TestSubscriber.probe[Int]()
src2.runWith(Sink(substream4))
substream4.requestNext(2)
substream4.expectNoMsg(1.second)
masterStream3.expectNoMsg(1.second)
inputs3.expectRequest()
inputs3.expectRequest()
inputs3.expectNoMsg(1.second)
substream4.cancel()
inputs3.expectNoMsg(1.second)
masterStream3.expectNoMsg(1.second)
masterStream3.cancel()
inputs3.expectCancellation()
}
}
"support cancelling the master stream" in assertAllStagesStopped {

View file

@ -1,55 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.Attributes._
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.testkit._
import scala.concurrent.duration._
import scala.concurrent.Await
class GraphJunctionAttributesSpec extends AkkaSpec {
implicit val set = ActorMaterializerSettings(system).withInputBuffer(4, 4)
implicit val mat = ActorMaterializer(set)
"A zip" should {
"take custom inputBuffer settings" in {
sealed abstract class SlowTick
case object SlowTick extends SlowTick
sealed abstract class FastTick
case object FastTick extends FastTick
val source = Source[(SlowTick, List[FastTick])]() { implicit b
import FlowGraph.Implicits._
val slow = Source(100.millis, 100.millis, SlowTick)
val fast = Source(0.seconds, 10.millis, FastTick)
val zip = b add Zip[SlowTick, List[FastTick]]().withAttributes(inputBuffer(1, 1))
slow ~> zip.in0
fast.conflate(tick List(tick)) { case (list, tick) tick :: list } ~> zip.in1
zip.out
}
val future = source
.drop(1) // account for prefetch
.grouped(10)
.runWith(Sink.head)
val fastTicks = Await.result(future, 2.seconds).map(_._2.size)
// Account for the possibility for the zip to act as a buffer of two.
// If that happens there would be one fast tick for one slow tick in the results.
// More explanation in #16435
atLeast(8, fastTicks) shouldBe 10 +- 1
}
}
}

View file

@ -173,8 +173,8 @@ private[akka] case class ActorMaterializerImpl(
val flexi = r.flexi(r.shape)
(FlexiRoute.props(effectiveSettings, r.shape, flexi), r.shape.inlets.head: InPort, r.shape.outlets)
case BroadcastModule(shape, _)
(Broadcast.props(effectiveSettings, shape.outArray.size), shape.in, shape.outArray.toSeq)
case BroadcastModule(shape, eagerCancel, _)
(Broadcast.props(effectiveSettings, eagerCancel, shape.outArray.size), shape.in, shape.outArray.toSeq)
case BalanceModule(shape, waitForDownstreams, _)
(Balance.props(effectiveSettings, shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq)

View file

@ -289,14 +289,15 @@ private[akka] abstract class FanOut(val settings: ActorMaterializerSettings, val
* INTERNAL API
*/
private[akka] object Broadcast {
def props(settings: ActorMaterializerSettings, outputPorts: Int): Props =
Props(new Broadcast(settings, outputPorts)).withDeploy(Deploy.local)
def props(settings: ActorMaterializerSettings, eagerCancel: Boolean, outputPorts: Int): Props =
Props(new Broadcast(settings, outputPorts, eagerCancel)).withDeploy(Deploy.local)
}
/**
* INTERNAL API
*/
private[akka] class Broadcast(_settings: ActorMaterializerSettings, _outputPorts: Int) extends FanOut(_settings, _outputPorts) {
private[akka] class Broadcast(_settings: ActorMaterializerSettings, _outputPorts: Int, eagerCancel: Boolean) extends FanOut(_settings, _outputPorts) {
outputBunch.unmarkCancelledOutputs(!eagerCancel)
outputBunch.markAllOutputs()
initialPhase(1, TransferPhase(primaryInputs.NeedsInput && outputBunch.AllOfMarkedOutputs) { ()

View file

@ -44,11 +44,12 @@ private[stream] object Junctions {
final case class BroadcastModule[T](
shape: UniformFanOutShape[T, T],
eagerCancel: Boolean,
override val attributes: Attributes = name("broadcast")) extends FanOutModule {
override def withAttributes(attr: Attributes): Module = copy(attributes = attr)
override def carbonCopy: Module = BroadcastModule(shape.deepCopy(), attributes)
override def carbonCopy: Module = BroadcastModule(shape.deepCopy(), eagerCancel, attributes)
}
final case class MergePreferredModule[T](

View file

@ -121,7 +121,7 @@ private[akka] class SplitWhereProcessorImpl(_settings: ActorMaterializerSettings
}
// Ignore elements for a cancelled substream until a new substream needs to be opened
val ignoreUntilNewSubstream = TransferPhase(primaryInputs.NeedsInput) { ()
val ignoreUntilNewSubstream = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { ()
val elem = primaryInputs.dequeueInputElement()
decideSplit(elem) match {
case Continue | Drop // ignore elem

View file

@ -92,15 +92,26 @@ object MergePreferred {
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' all downstreams cancel
* '''Cancels when'''
* If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel
*/
object Broadcast {
/**
* Create a new `Broadcast` vertex with the specified input type.
*
* @param outputCount number of output ports
* @param eagerCancel if true, broadcast cancels upstream if any of its downstreams cancel.
*/
def create[T](outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] =
def create[T](outputCount: Int, eagerCancel: Boolean): Graph[UniformFanOutShape[T, T], Unit] =
scaladsl.Broadcast(outputCount)
/**
* Create a new `Broadcast` vertex with the specified input type.
*
* @param outputCount number of output ports
*/
def create[T](outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = create(outputCount, eagerCancel = false)
/**
* Create a new `Broadcast` vertex with the specified input type.
*/

View file

@ -104,10 +104,11 @@ object Broadcast {
* Create a new `Broadcast` with the specified number of output ports.
*
* @param outputPorts number of output ports
* @param eagerCancel if true, broadcast cancels upstream if any of its downstreams cancel.
*/
def apply[T](outputPorts: Int): Broadcast[T] = {
def apply[T](outputPorts: Int, eagerCancel: Boolean = false): Broadcast[T] = {
val shape = new UniformFanOutShape[T, T](outputPorts)
new Broadcast(outputPorts, shape, new BroadcastModule(shape, Attributes.name("Broadcast")))
new Broadcast(outputPorts, shape, new BroadcastModule(shape, eagerCancel, Attributes.name("Broadcast")))
}
}
@ -121,7 +122,9 @@ object Broadcast {
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' all downstreams cancel
* '''Cancels when'''
* If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel
*
*/
class Broadcast[T] private (outputPorts: Int,
override val shape: UniformFanOutShape[T, T],