From 8cea39cdef7f19e7f0d5b6edf97183344f1684a5 Mon Sep 17 00:00:00 2001 From: Anton Karamanov Date: Fri, 22 Jan 2016 12:03:21 +0300 Subject: [PATCH 1/3] +str #19528 Add `propagateSubstreamCancel` flag to `splitWhen` and `splitAfter` --- .../stream/scaladsl/FlowSplitAfterSpec.scala | 12 +++++- .../stream/scaladsl/FlowSplitWhenSpec.scala | 12 +++++- .../stream/impl/fusing/StreamOfStreams.scala | 14 ++++--- .../scala/akka/stream/scaladsl/Flow.scala | 40 +++++++++++++++---- 4 files changed, 61 insertions(+), 17 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala index 5d40c82d66..a97dac0bca 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala @@ -49,9 +49,9 @@ class FlowSplitAfterSpec extends AkkaSpec { def cancel(): Unit = subscription.cancel() } - class SubstreamsSupport(splitAfter: Int = 3, elementCount: Int = 6) { + class SubstreamsSupport(splitAfter: Int = 3, elementCount: Int = 6, propagateSubstreamCancel: Boolean = false) { val source = Source(1 to elementCount) - val groupStream = source.splitAfter(_ == splitAfter).lift.runWith(Sink.asPublisher(false)) + val groupStream = source.splitAfter(propagateSubstreamCancel)(_ == splitAfter).lift.runWith(Sink.asPublisher(false)) val masterSubscriber = TestSubscriber.manualProbe[Source[Int, _]]() groupStream.subscribe(masterSubscriber) @@ -253,6 +253,14 @@ class FlowSplitAfterSpec extends AkkaSpec { upsub.expectCancellation() } + "support eager cancellation of master stream on cancelling substreams" in assertAllStagesStopped { + new SubstreamsSupport(splitAfter = 5, elementCount = 8, propagateSubstreamCancel = true) { + val s1 = StreamPuppet(expectSubFlow().runWith(Sink.asPublisher(false))) + s1.cancel() + masterSubscriber.expectComplete() + } + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala index fc03a5e159..cf837a5cd6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala @@ -35,9 +35,9 @@ class FlowSplitWhenSpec extends AkkaSpec { def cancel(): Unit = subscription.cancel() } - class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6) { + class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6, propagateSubstreamCancel: Boolean = false) { val source = Source(1 to elementCount) - val groupStream = source.splitWhen(_ == splitWhen).lift.runWith(Sink.asPublisher(false)) + val groupStream = source.splitWhen(propagateSubstreamCancel)(_ == splitWhen).lift.runWith(Sink.asPublisher(false)) val masterSubscriber = TestSubscriber.manualProbe[Source[Int, _]]() groupStream.subscribe(masterSubscriber) @@ -342,6 +342,14 @@ class FlowSplitWhenSpec extends AkkaSpec { upsub.expectCancellation() } + "support eager cancellation of master stream on cancelling substreams" in assertAllStagesStopped { + new SubstreamsSupport(splitWhen = 5, elementCount = 8, propagateSubstreamCancel = true) { + val s1 = StreamPuppet(getSubFlow().runWith(Sink.asPublisher(false))) + s1.cancel() + masterSubscriber.expectComplete() + } + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index f21b54b5cd..083572d56d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -214,14 +214,17 @@ object Split { /** Splits after the current element. The current element will be the last element in the current substream. */ case object SplitAfter extends SplitDecision - def when[T](p: T ⇒ Boolean): Graph[FlowShape[T, Source[T, NotUsed]], NotUsed] = new Split(Split.SplitBefore, p) - def after[T](p: T ⇒ Boolean): Graph[FlowShape[T, Source[T, NotUsed]], NotUsed] = new Split(Split.SplitAfter, p) + def when[T](p: T ⇒ Boolean, propagateSubstreamCancel: Boolean = false): Graph[FlowShape[T, Source[T, NotUsed]], NotUsed] = + new Split(Split.SplitBefore, p, propagateSubstreamCancel) + + def after[T](p: T ⇒ Boolean, propagateSubstreamCancel: Boolean = false): Graph[FlowShape[T, Source[T, NotUsed]], NotUsed] = + new Split(Split.SplitAfter, p, propagateSubstreamCancel) } /** * INERNAL API */ -final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] { +final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean, propagateSubstreamCancel: Boolean) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] { val in: Inlet[T] = Inlet("Split.in") val out: Outlet[Source[T, NotUsed]] = Outlet("Split.out") override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out) @@ -329,8 +332,9 @@ final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean) extends Gr override def onDownstreamFinish(): Unit = { substreamCancelled = true - if (isClosed(in)) completeStage() - else { + if (isClosed(in) || propagateSubstreamCancel) { + completeStage() + } else { // Start draining if (!hasBeenPulled(in)) pull(in) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index fee1ddcc14..9026bfdbfe 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1201,23 +1201,36 @@ trait FlowOps[+Out, +Mat] { * * '''Completes when''' upstream completes * - * '''Cancels when''' downstream cancels and substreams cancel + * '''Cancels when''' downstream cancels and substreams cancel if propagateSubstreamCancel=false, downstream + * cancels or any substream cancels if propagateSubstreamCancel=true * * See also [[FlowOps.splitAfter]]. */ - def splitWhen(p: Out ⇒ Boolean): SubFlow[Out, Mat, Repr, Closed] = { + def splitWhen(propagateSubstreamCancel: Boolean)(p: Out ⇒ Boolean): SubFlow[Out, Mat, Repr, Closed] = { val merge = new SubFlowImpl.MergeBack[Out, Repr] { override def apply[T](flow: Flow[Out, T, NotUsed], breadth: Int): Repr[T] = - via(Split.when(p)) + via(Split.when(p, propagateSubstreamCancel)) .map(_.via(flow)) .via(new FlattenMerge(breadth)) } + val finish: (Sink[Out, NotUsed]) ⇒ Closed = s ⇒ - via(Split.when(p)) + via(Split.when(p, propagateSubstreamCancel)) .to(Sink.foreach(_.runWith(s)(GraphInterpreter.currentInterpreter.materializer))) + new SubFlowImpl(Flow[Out], merge, finish) } + /** + * This operation applies the given predicate to all incoming elements and + * emits them to a stream of output streams, always beginning a new one with + * the current element if the given predicate returns true for it. + * + * @see [[#splitWhen]] + */ + def splitWhen(p: Out ⇒ Boolean): SubFlow[Out, Mat, Repr, Closed] = + splitWhen(propagateSubstreamCancel = false)(p) + /** * This operation applies the given predicate to all incoming elements and * emits them to a stream of output streams. It *ends* the current substream when the @@ -1258,23 +1271,34 @@ trait FlowOps[+Out, +Mat] { * * '''Completes when''' upstream completes * - * '''Cancels when''' downstream cancels and substreams cancel + * '''Cancels when''' downstream cancels and substreams cancel if propagateSubstreamCancel=false, downstream + * cancels or any substream cancels if propagateSubstreamCancel=true * * See also [[FlowOps.splitWhen]]. */ - def splitAfter(p: Out ⇒ Boolean): SubFlow[Out, Mat, Repr, Closed] = { + def splitAfter(propagateSubstreamCancel: Boolean)(p: Out ⇒ Boolean): SubFlow[Out, Mat, Repr, Closed] = { val merge = new SubFlowImpl.MergeBack[Out, Repr] { override def apply[T](flow: Flow[Out, T, NotUsed], breadth: Int): Repr[T] = - via(Split.after(p)) + via(Split.after(p, propagateSubstreamCancel)) .map(_.via(flow)) .via(new FlattenMerge(breadth)) } val finish: (Sink[Out, NotUsed]) ⇒ Closed = s ⇒ - via(Split.after(p)) + via(Split.after(p, propagateSubstreamCancel)) .to(Sink.foreach(_.runWith(s)(GraphInterpreter.currentInterpreter.materializer))) new SubFlowImpl(Flow[Out], merge, finish) } + /** + * This operation applies the given predicate to all incoming elements and + * emits them to a stream of output streams. It *ends* the current substream when the + * predicate is true. + * + * @see [[#splitAfter]] + */ + def splitAfter(p: Out ⇒ Boolean): SubFlow[Out, Mat, Repr, Closed] = + splitAfter(propagateSubstreamCancel = false)(p) + /** * Transform each input element into a `Source` of output elements that is * then flattened into the output stream by concatenation, From 6ef9ab3276ccb1961193e863e77c02abc4b70a4b Mon Sep 17 00:00:00 2001 From: Anton Karamanov Date: Fri, 22 Jan 2016 12:03:32 +0300 Subject: [PATCH 2/3] =htc #19528 OutgoingConnectionBlueprint: replace splitWhen/prefixAndTail with custom stage --- .../client/OutgoingConnectionBlueprint.scala | 114 ++++++++++++++---- .../main/scala/akka/http/scaladsl/Http.scala | 7 +- .../stream/scaladsl/FlowSplitAfterSpec.scala | 14 ++- .../stream/scaladsl/FlowSplitWhenSpec.scala | 10 +- .../akka/stream/SubstreamCancelStrategy.scala | 36 ++++++ .../stream/impl/fusing/StreamOfStreams.scala | 15 ++- .../main/scala/akka/stream/javadsl/Flow.scala | 26 +++- .../scala/akka/stream/scaladsl/Flow.scala | 24 ++-- 8 files changed, 195 insertions(+), 51 deletions(-) create mode 100644 akka-stream/src/main/scala/akka/stream/SubstreamCancelStrategy.scala diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala index b68264ab67..78a423dfb9 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala @@ -5,7 +5,7 @@ package akka.http.impl.engine.client import akka.NotUsed -import akka.http.scaladsl.settings.ClientConnectionSettings +import akka.http.scaladsl.settings.{ ClientConnectionSettings, ParserSettings } import language.existentials import scala.annotation.tailrec import scala.collection.mutable.ListBuffer @@ -16,13 +16,13 @@ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka.http.scaladsl.model.headers.Host -import akka.http.scaladsl.model.{ IllegalResponseException, HttpMethod, HttpRequest, HttpResponse } +import akka.http.scaladsl.model.{ IllegalResponseException, HttpMethod, HttpRequest, HttpResponse, ResponseEntity } import akka.http.impl.engine.rendering.{ RequestRenderingContext, HttpRequestRendererFactory } import akka.http.impl.engine.parsing._ import akka.http.impl.util._ import akka.stream.stage.GraphStage import akka.stream.stage.GraphStageLogic -import akka.stream.stage.InHandler +import akka.stream.stage.{ InHandler, OutHandler } import akka.stream.impl.fusing.SubSource /** @@ -69,24 +69,7 @@ private[http] object OutgoingConnectionBlueprint { import ParserOutput._ val responsePrep = Flow[List[ResponseOutput]] .mapConcat(conforms) - .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd) - .prefixAndTail(1) - .filter { - case (Seq(MessageEnd), remaining) ⇒ - SubSource.kill(remaining) - false - case (seq, _) ⇒ - seq.nonEmpty - } - .map { - case (Seq(ResponseStart(statusCode, protocol, headers, createEntity, _)), entityParts) ⇒ - val entity = createEntity(entityParts) withSizeLimit parserSettings.maxContentLength - HttpResponse(statusCode, headers, entity, protocol) - case (Seq(MessageStartError(_, info)), tail) ⇒ - // Tails can be empty, but still need one pull to figure that out -- never drop tails. - SubSource.kill(tail) - throw IllegalResponseException(info) - }.concatSubstreams + .via(new ResponsePrep(parserSettings)) val core = BidiFlow.fromGraph(GraphDSL.create() { implicit b ⇒ import GraphDSL.Implicits._ @@ -148,6 +131,95 @@ private[http] object OutgoingConnectionBlueprint { import ParserOutput._ + private final class ResponsePrep(parserSettings: ParserSettings) + extends GraphStage[FlowShape[ResponseOutput, HttpResponse]] { + + private val in = Inlet[ResponseOutput]("ResponsePrep.in") + private val out = Outlet[HttpResponse]("ResponsePrep.out") + + val shape = new FlowShape(in, out) + + override def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { + private var entitySource: SubSourceOutlet[ResponseOutput] = _ + private def entitySubstreamStarted = entitySource ne null + private def idle = this + + def setIdleHandlers(): Unit = { + setHandler(in, idle) + setHandler(out, idle) + } + + def onPush(): Unit = grab(in) match { + case ResponseStart(statusCode, protocol, headers, entityCreator, _) ⇒ + val entity = createEntity(entityCreator) withSizeLimit parserSettings.maxContentLength + push(out, HttpResponse(statusCode, headers, entity, protocol)) + + case MessageStartError(_, info) ⇒ + throw IllegalResponseException(info) + + case other ⇒ + throw new IllegalStateException(s"ResponseStart expected but $other received.") + } + + def onPull(): Unit = { + if (!entitySubstreamStarted) pull(in) + } + + setIdleHandlers() + + private lazy val waitForMessageEnd = new InHandler { + def onPush(): Unit = grab(in) match { + case MessageEnd ⇒ setHandler(in, idle) + case other ⇒ throw new IllegalStateException(s"MessageEnd expected but $other received.") + } + } + + private lazy val substreamHandler = new InHandler with OutHandler { + override def onPush(): Unit = grab(in) match { + case MessageEnd ⇒ + entitySource.complete() + entitySource = null + setIdleHandlers() + + case messagePart ⇒ + entitySource.push(messagePart) + } + + override def onPull(): Unit = pull(in) + + override def onUpstreamFinish(): Unit = { + entitySource.complete() + completeStage() + } + + override def onUpstreamFailure(reason: Throwable): Unit = { + entitySource.fail(reason) + failStage(reason) + } + + override def onDownstreamFinish(): Unit = { + entitySource.complete() + completeStage() + } + } + + private def createEntity(creator: EntityCreator[ResponseOutput, ResponseEntity]): ResponseEntity = { + creator match { + case StrictEntityCreator(entity) ⇒ + pull(in) + setHandler(in, waitForMessageEnd) + entity + + case StreamedEntityCreator(creator) ⇒ + entitySource = new SubSourceOutlet[ResponseOutput]("EntitySource") + entitySource.setHandler(substreamHandler) + setHandler(in, substreamHandler) + creator(Source.fromGraph(entitySource.source)) + } + } + } + } + /** * A merge that follows this logic: * 1. Wait on the methodBypass for the method of the request corresponding to the next response to be received diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index 60afdea724..c85455d1bb 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -217,8 +217,11 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = _outgoingConnection(host, port, localAddress, settings, connectionContext, log) - private def _outgoingConnection(host: String, port: Int, localAddress: Option[InetSocketAddress], - settings: ClientConnectionSettings, connectionContext: ConnectionContext, + private def _outgoingConnection(host: String, + port: Int, + localAddress: Option[InetSocketAddress], + settings: ClientConnectionSettings, + connectionContext: ConnectionContext, log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = { val hostHeader = if (port == connectionContext.defaultPort) Host(host) else Host(host, port) val layer = clientLayer(hostHeader, settings, log) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala index a97dac0bca..1ded0cc141 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala @@ -4,9 +4,7 @@ package akka.stream.scaladsl import akka.NotUsed -import akka.stream.ActorMaterializer -import akka.stream.ActorMaterializerSettings -import akka.stream.ActorAttributes +import akka.stream._ import akka.stream.Supervision.resumingDecider import akka.stream.testkit.AkkaSpec import akka.stream.testkit.TestPublisher @@ -49,9 +47,13 @@ class FlowSplitAfterSpec extends AkkaSpec { def cancel(): Unit = subscription.cancel() } - class SubstreamsSupport(splitAfter: Int = 3, elementCount: Int = 6, propagateSubstreamCancel: Boolean = false) { + class SubstreamsSupport( + splitAfter: Int = 3, + elementCount: Int = 6, + substreamCancelStrategy: SubstreamCancelStrategy = SubstreamCancelStrategy.drain) { + val source = Source(1 to elementCount) - val groupStream = source.splitAfter(propagateSubstreamCancel)(_ == splitAfter).lift.runWith(Sink.asPublisher(false)) + val groupStream = source.splitAfter(substreamCancelStrategy)(_ == splitAfter).lift.runWith(Sink.asPublisher(false)) val masterSubscriber = TestSubscriber.manualProbe[Source[Int, _]]() groupStream.subscribe(masterSubscriber) @@ -254,7 +256,7 @@ class FlowSplitAfterSpec extends AkkaSpec { } "support eager cancellation of master stream on cancelling substreams" in assertAllStagesStopped { - new SubstreamsSupport(splitAfter = 5, elementCount = 8, propagateSubstreamCancel = true) { + new SubstreamsSupport(splitAfter = 5, elementCount = 8, SubstreamCancelStrategy.propagate) { val s1 = StreamPuppet(expectSubFlow().runWith(Sink.asPublisher(false))) s1.cancel() masterSubscriber.expectComplete() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala index cf837a5cd6..df1122c908 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala @@ -35,9 +35,13 @@ class FlowSplitWhenSpec extends AkkaSpec { def cancel(): Unit = subscription.cancel() } - class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6, propagateSubstreamCancel: Boolean = false) { + class SubstreamsSupport( + splitWhen: Int = 3, + elementCount: Int = 6, + substreamCancelStrategy: SubstreamCancelStrategy = SubstreamCancelStrategy.drain) { + val source = Source(1 to elementCount) - val groupStream = source.splitWhen(propagateSubstreamCancel)(_ == splitWhen).lift.runWith(Sink.asPublisher(false)) + val groupStream = source.splitWhen(substreamCancelStrategy)(_ == splitWhen).lift.runWith(Sink.asPublisher(false)) val masterSubscriber = TestSubscriber.manualProbe[Source[Int, _]]() groupStream.subscribe(masterSubscriber) @@ -343,7 +347,7 @@ class FlowSplitWhenSpec extends AkkaSpec { } "support eager cancellation of master stream on cancelling substreams" in assertAllStagesStopped { - new SubstreamsSupport(splitWhen = 5, elementCount = 8, propagateSubstreamCancel = true) { + new SubstreamsSupport(splitWhen = 5, elementCount = 8, SubstreamCancelStrategy.propagate) { val s1 = StreamPuppet(getSubFlow().runWith(Sink.asPublisher(false))) s1.cancel() masterSubscriber.expectComplete() diff --git a/akka-stream/src/main/scala/akka/stream/SubstreamCancelStrategy.scala b/akka-stream/src/main/scala/akka/stream/SubstreamCancelStrategy.scala new file mode 100644 index 0000000000..91912402d6 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/SubstreamCancelStrategy.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2009-2016 Typesafe Inc. + */ +package akka.stream + +import SubstreamCancelStrategies._ + +/** + * Represents a strategy that decides how to deal with substream events. + */ +sealed abstract class SubstreamCancelStrategy + +private[akka] object SubstreamCancelStrategies { + /** + * INTERNAL API + */ + private[akka] final case object Propagate extends SubstreamCancelStrategy + + /** + * INTERNAL API + */ + private[akka] final case object Drain extends SubstreamCancelStrategy +} + +object SubstreamCancelStrategy { + /** + * Cancel the stream of streams if any substream is cancelled. + */ + def propagate: SubstreamCancelStrategy = Propagate + + /** + * Drain substream on cancellation in order to prevent stailling of the stream of streams. + */ + def drain: SubstreamCancelStrategy = Drain +} + diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 083572d56d..ef2a19eb45 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -214,21 +214,26 @@ object Split { /** Splits after the current element. The current element will be the last element in the current substream. */ case object SplitAfter extends SplitDecision - def when[T](p: T ⇒ Boolean, propagateSubstreamCancel: Boolean = false): Graph[FlowShape[T, Source[T, NotUsed]], NotUsed] = - new Split(Split.SplitBefore, p, propagateSubstreamCancel) + def when[T](p: T ⇒ Boolean, substreamCancelStrategy: SubstreamCancelStrategy): Graph[FlowShape[T, Source[T, NotUsed]], NotUsed] = + new Split(Split.SplitBefore, p, substreamCancelStrategy) - def after[T](p: T ⇒ Boolean, propagateSubstreamCancel: Boolean = false): Graph[FlowShape[T, Source[T, NotUsed]], NotUsed] = - new Split(Split.SplitAfter, p, propagateSubstreamCancel) + def after[T](p: T ⇒ Boolean, substreamCancelStrategy: SubstreamCancelStrategy): Graph[FlowShape[T, Source[T, NotUsed]], NotUsed] = + new Split(Split.SplitAfter, p, substreamCancelStrategy) } /** * INERNAL API */ -final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean, propagateSubstreamCancel: Boolean) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] { +final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean, substreamCancelStrategy: SubstreamCancelStrategy) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] { val in: Inlet[T] = Inlet("Split.in") val out: Outlet[Source[T, NotUsed]] = Outlet("Split.out") override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out) + private val propagateSubstreamCancel = substreamCancelStrategy match { + case SubstreamCancelStrategies.Propagate ⇒ true + case SubstreamCancelStrategies.Drain ⇒ false + } + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { import Split._ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 7888e00950..4e7effdeb3 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1087,13 +1087,24 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Completes when''' upstream completes * - * '''Cancels when''' downstream cancels and substreams cancel + * '''Cancels when''' downstream cancels and substreams cancel on `SubstreamCancelStrategy.drain()`, downstream + * cancels or any substream cancels on `SubstreamCancelStrategy.propagate()` * * See also [[Flow.splitAfter]]. */ def splitWhen(p: function.Predicate[Out]): SubFlow[In, Out, Mat] = new SubFlow(delegate.splitWhen(p.test)) + /** + * This operation applies the given predicate to all incoming elements and + * emits them to a stream of output streams, always beginning a new one with + * the current element if the given predicate returns true for it. + * + * @see [[#splitWhen]] + */ + def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubFlow[In, Out, Mat] = + new SubFlow(delegate.splitWhen(substreamCancelStrategy)(p.test)) + /** * This operation applies the given predicate to all incoming elements and * emits them to a stream of output streams. It *ends* the current substream when the @@ -1134,13 +1145,24 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Completes when''' upstream completes * - * '''Cancels when''' downstream cancels and substreams cancel + * '''Cancels when''' downstream cancels and substreams cancel on `SubstreamCancelStrategy.drain`, downstream + * cancels or any substream cancels on `SubstreamCancelStrategy.propagate` * * See also [[Flow.splitWhen]]. */ def splitAfter[U >: Out](p: function.Predicate[Out]): SubFlow[In, Out, Mat] = new SubFlow(delegate.splitAfter(p.test)) + /** + * This operation applies the given predicate to all incoming elements and + * emits them to a stream of output streams. It *ends* the current substream when the + * predicate is true. + * + * @see [[#splitAfter]] + */ + def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubFlow[In, Out, Mat] = + new SubFlow(delegate.splitAfter(substreamCancelStrategy)(p.test)) + /** * Transform each input element into a `Source` of output elements that is * then flattened into the output stream by concatenation, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 9026bfdbfe..92c179973e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1201,21 +1201,21 @@ trait FlowOps[+Out, +Mat] { * * '''Completes when''' upstream completes * - * '''Cancels when''' downstream cancels and substreams cancel if propagateSubstreamCancel=false, downstream - * cancels or any substream cancels if propagateSubstreamCancel=true + * '''Cancels when''' downstream cancels and substreams cancel on `SubstreamCancelStrategy.drain`, downstream + * cancels or any substream cancels on `SubstreamCancelStrategy.propagate` * * See also [[FlowOps.splitAfter]]. */ - def splitWhen(propagateSubstreamCancel: Boolean)(p: Out ⇒ Boolean): SubFlow[Out, Mat, Repr, Closed] = { + def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: Out ⇒ Boolean): SubFlow[Out, Mat, Repr, Closed] = { val merge = new SubFlowImpl.MergeBack[Out, Repr] { override def apply[T](flow: Flow[Out, T, NotUsed], breadth: Int): Repr[T] = - via(Split.when(p, propagateSubstreamCancel)) + via(Split.when(p, substreamCancelStrategy)) .map(_.via(flow)) .via(new FlattenMerge(breadth)) } val finish: (Sink[Out, NotUsed]) ⇒ Closed = s ⇒ - via(Split.when(p, propagateSubstreamCancel)) + via(Split.when(p, substreamCancelStrategy)) .to(Sink.foreach(_.runWith(s)(GraphInterpreter.currentInterpreter.materializer))) new SubFlowImpl(Flow[Out], merge, finish) @@ -1229,7 +1229,7 @@ trait FlowOps[+Out, +Mat] { * @see [[#splitWhen]] */ def splitWhen(p: Out ⇒ Boolean): SubFlow[Out, Mat, Repr, Closed] = - splitWhen(propagateSubstreamCancel = false)(p) + splitWhen(SubstreamCancelStrategy.drain)(p) /** * This operation applies the given predicate to all incoming elements and @@ -1271,20 +1271,20 @@ trait FlowOps[+Out, +Mat] { * * '''Completes when''' upstream completes * - * '''Cancels when''' downstream cancels and substreams cancel if propagateSubstreamCancel=false, downstream - * cancels or any substream cancels if propagateSubstreamCancel=true + * '''Cancels when''' downstream cancels and substreams cancel on `SubstreamCancelStrategy.drain`, downstream + * cancels or any substream cancels on `SubstreamCancelStrategy.propagate` * * See also [[FlowOps.splitWhen]]. */ - def splitAfter(propagateSubstreamCancel: Boolean)(p: Out ⇒ Boolean): SubFlow[Out, Mat, Repr, Closed] = { + def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: Out ⇒ Boolean): SubFlow[Out, Mat, Repr, Closed] = { val merge = new SubFlowImpl.MergeBack[Out, Repr] { override def apply[T](flow: Flow[Out, T, NotUsed], breadth: Int): Repr[T] = - via(Split.after(p, propagateSubstreamCancel)) + via(Split.after(p, substreamCancelStrategy)) .map(_.via(flow)) .via(new FlattenMerge(breadth)) } val finish: (Sink[Out, NotUsed]) ⇒ Closed = s ⇒ - via(Split.after(p, propagateSubstreamCancel)) + via(Split.after(p, substreamCancelStrategy)) .to(Sink.foreach(_.runWith(s)(GraphInterpreter.currentInterpreter.materializer))) new SubFlowImpl(Flow[Out], merge, finish) } @@ -1297,7 +1297,7 @@ trait FlowOps[+Out, +Mat] { * @see [[#splitAfter]] */ def splitAfter(p: Out ⇒ Boolean): SubFlow[Out, Mat, Repr, Closed] = - splitAfter(propagateSubstreamCancel = false)(p) + splitAfter(SubstreamCancelStrategy.drain)(p) /** * Transform each input element into a `Source` of output elements that is From af3fc0c9a62aa0892d396941a5886cc3388316f3 Mon Sep 17 00:00:00 2001 From: Anton Karamanov Date: Fri, 5 Feb 2016 11:23:57 +0300 Subject: [PATCH 3/3] !htp #19528 HttpServerBluePrint: close connection on request entity cancellation --- .../scala/akka/actor/dungeon/DeathWatch.scala | 6 +- .../http/client-side/connection-level.rst | 4 ++ .../server-side/low-level-server-side-api.rst | 4 ++ .../stream/migration-guide-2.0-2.4-java.rst | 14 ++++ .../http/client-side/connection-level.rst | 4 ++ .../scala/http/low-level-server-side-api.rst | 4 ++ .../stream/migration-guide-2.0-2.4-scala.rst | 13 ++++ .../engine/server/HttpServerBluePrint.scala | 26 +++++-- .../LowLevelOutgoingConnectionSpec.scala | 71 +++++++++++++++++++ .../impl/engine/server/HttpServerSpec.scala | 48 +++++++++++++ 10 files changed, 185 insertions(+), 9 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index dbeaf3a7a6..cf49c7f157 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -4,9 +4,9 @@ package akka.actor.dungeon -import akka.dispatch.sysmsg.{Unwatch, Watch, DeathWatchNotification} -import akka.event.Logging.{Warning, Debug} -import akka.actor.{InternalActorRef, Address, Terminated, Actor, ActorRefScope, ActorCell, ActorRef, MinimalActorRef} +import akka.dispatch.sysmsg.{ Unwatch, Watch, DeathWatchNotification } +import akka.event.Logging.{ Warning, Debug } +import akka.actor.{ InternalActorRef, Address, Terminated, Actor, ActorRefScope, ActorCell, ActorRef, MinimalActorRef } import akka.event.AddressTerminatedTopic private[akka] trait DeathWatch { this: ActorCell ⇒ diff --git a/akka-docs/rst/java/http/client-side/connection-level.rst b/akka-docs/rst/java/http/client-side/connection-level.rst index 36eec3c7bc..eb2ff3c021 100644 --- a/akka-docs/rst/java/http/client-side/connection-level.rst +++ b/akka-docs/rst/java/http/client-side/connection-level.rst @@ -53,6 +53,10 @@ The connection can also be closed by the server. An application can actively trigger the closing of the connection by completing the request stream. In this case the underlying TCP connection will be closed when the last pending response has been received. +The connection will also be closed if the response entity is cancelled (e.g. by attaching it to ``Sink.cancelled()``) +or consumed only partially (e.g. by using ``take`` combinator). In order to prevent this behaviour the entity should be +explicitly drained by attaching it to ``Sink.ignore()``. + Timeouts -------- diff --git a/akka-docs/rst/java/http/server-side/low-level-server-side-api.rst b/akka-docs/rst/java/http/server-side/low-level-server-side-api.rst index f13ebd2647..ec883e5e26 100644 --- a/akka-docs/rst/java/http/server-side/low-level-server-side-api.rst +++ b/akka-docs/rst/java/http/server-side/low-level-server-side-api.rst @@ -130,6 +130,10 @@ connection. An often times more convenient alternative is to explicitly add a `` ``HttpResponse``. This response will then be the last one on the connection and the server will actively close the connection when it has been sent out. +Connection will also be closed if request entity has been cancelled (e.g. by attaching it to ``Sink.cancelled()``) +or consumed only partially (e.g. by using ``take`` combinator). In order to prevent this behaviour entity should be +explicitly drained by attaching it to ``Sink.ignore()``. + .. _serverSideHTTPS-java: diff --git a/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst index fcf3156e04..89206a3128 100644 --- a/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst +++ b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst @@ -156,3 +156,17 @@ Routing settings parameter name and were accessible via ``settings``. We now made it possible to configure the parsers settings as well, so ``RoutingSettings`` is now ``routingSettings`` and ``ParserSettings`` is now accessible via ``parserSettings``. + +Client / server behaviour on cancelled entity +--------------------------------------------- + +Previously if request or response were cancelled or consumed only partially +(e.g. by using ``take`` combinator) the remaining data was silently drained to prevent stalling +the connection, since there could still be more requests / responses incoming. Now the default +behaviour is to close the connection in order to prevent using excessive resource usage in case +of huge entities. + +The old behaviour can be achieved by explicitly draining the entity: + + response.entity().getDataBytes().runWith(Sink.ignore()) + diff --git a/akka-docs/rst/scala/http/client-side/connection-level.rst b/akka-docs/rst/scala/http/client-side/connection-level.rst index 452a633d20..ec98c22686 100644 --- a/akka-docs/rst/scala/http/client-side/connection-level.rst +++ b/akka-docs/rst/scala/http/client-side/connection-level.rst @@ -55,6 +55,10 @@ The connection can also be closed by the server. An application can actively trigger the closing of the connection by completing the request stream. In this case the underlying TCP connection will be closed when the last pending response has been received. +The connection will also be closed if the response entity is cancelled (e.g. by attaching it to ``Sink.cancelled``) +or consumed only partially (e.g. by using ``take`` combinator). In order to prevent this behaviour the entity should be +explicitly drained by attaching it to ``Sink.ignore``. + Timeouts -------- diff --git a/akka-docs/rst/scala/http/low-level-server-side-api.rst b/akka-docs/rst/scala/http/low-level-server-side-api.rst index b4c2156b44..91b7faadd2 100644 --- a/akka-docs/rst/scala/http/low-level-server-side-api.rst +++ b/akka-docs/rst/scala/http/low-level-server-side-api.rst @@ -132,6 +132,10 @@ connection. An often times more convenient alternative is to explicitly add a `` ``HttpResponse``. This response will then be the last one on the connection and the server will actively close the connection when it has been sent out. +Connection will also be closed if request entity has been cancelled (e.g. by attaching it to ``Sink.cancelled``) +or consumed only partially (e.g. by using ``take`` combinator). In order to prevent this behaviour entity should be +explicitly drained by attaching it to ``Sink.ignore``. + .. _serverSideHTTPS: diff --git a/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst index 69ed312d23..6658f28540 100644 --- a/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst +++ b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst @@ -102,6 +102,19 @@ and were accessible via ``settings``. We now made it possible to configure the p settings as well, so ``RoutingSettings`` is now ``routingSettings`` and ``ParserSettings`` is now accessible via ``parserSettings``. +Client / server behaviour on cancelled entity +--------------------------------------------- + +Previously if request or response were cancelled or consumed only partially +(e.g. by using ``take`` combinator) the remaining data was silently drained to prevent stalling +the connection, since there could still be more requests / responses incoming. Now the default +behaviour is to close the connection in order to prevent using excessive resource usage in case +of huge entities. + +The old behaviour can be achieved by explicitly draining the entity: + + response.entity.dataBytes.runWith(Sink.ignore) + Changed Sources / Sinks ======================= diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 2da12a0ec1..679cde6793 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -104,12 +104,23 @@ private[http] object HttpServerBluePrint { val remoteAddress = inheritedAttributes.get[HttpAttributes.RemoteAddress].flatMap(_.address) var downstreamPullWaiting = false var completionDeferred = false + var entitySource: SubSourceOutlet[RequestOutput] = _ // optimization: to avoid allocations the "idle" case in and out handlers are put directly on the GraphStageLogic itself override def onPull(): Unit = { pull(in) } + // optimization: this callback is used to handle entity substream cancellation to avoid allocating a dedicated handler + override def onDownstreamFinish(): Unit = { + if (entitySource ne null) { + // application layer has cancelled or only partially consumed response entity: + // connection will be closed + entitySource.complete() + completeStage() + } + } + override def onPush(): Unit = grab(in) match { case RequestStart(method, uri, protocol, hdrs, entityCreator, _, _) ⇒ val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method @@ -126,7 +137,7 @@ private[http] object HttpServerBluePrint { setIdleHandlers() - def setIdleHandlers() { + def setIdleHandlers(): Unit = { if (completionDeferred) { completeStage() } else { @@ -150,15 +161,17 @@ private[http] object HttpServerBluePrint { // stream incoming chunks into the request entity until we reach the end of it // and then toggle back to "idle" - val entitySource = new SubSourceOutlet[RequestOutput]("EntitySource") + entitySource = new SubSourceOutlet[RequestOutput]("EntitySource") // optimization: re-use the idle outHandler entitySource.setHandler(this) - setHandler(in, new InHandler { + // optimization: handlers are combined to reduce allocations + val chunkedRequestHandler = new InHandler with OutHandler { def onPush(): Unit = { grab(in) match { case MessageEnd ⇒ entitySource.complete() + entitySource = null setIdleHandlers() case x ⇒ entitySource.push(x) @@ -172,8 +185,6 @@ private[http] object HttpServerBluePrint { entitySource.fail(ex) failStage(ex) } - }) - setHandler(out, new OutHandler { override def onPull(): Unit = { // remember this until we are done with the chunked entity // so can pull downstream then @@ -185,7 +196,10 @@ private[http] object HttpServerBluePrint { // when it completes complete the stage completionDeferred = true } - }) + } + + setHandler(in, chunkedRequestHandler) + setHandler(out, chunkedRequestHandler) creator(Source.fromGraph(entitySource.source)) } diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala index 237e57b45d..01ba290723 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala @@ -7,6 +7,7 @@ package akka.http.impl.engine.client import scala.concurrent.duration._ import scala.reflect.ClassTag import org.scalatest.Inside +import org.scalatest.concurrent.ScalaFutures import akka.http.scaladsl.settings.ClientConnectionSettings import akka.stream.io.{ SessionBytes, SslTlsOutbound, SendBytes } import akka.util.ByteString @@ -137,6 +138,76 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. } } + "close the connection if response entity stream has been cancelled" in new TestSetup { + // two requests are sent in order to make sure that connection + // isn't immediately closed after the first one by the server + requestsSub.sendNext(HttpRequest()) + requestsSub.sendNext(HttpRequest()) + requestsSub.sendComplete() + + expectWireData( + """GET / HTTP/1.1 + |Host: example.com + |User-Agent: akka-http/test + | + |""") + + // two chunks sent by server + sendWireData( + """HTTP/1.1 200 OK + |Transfer-Encoding: chunked + | + |6 + |abcdef + |6 + |abcdef + |0 + | + |""") + + inside(expectResponse()) { + case HttpResponse(StatusCodes.OK, _, HttpEntity.Chunked(_, data), _) => + val dataProbe = TestSubscriber.manualProbe[ChunkStreamPart] + // but only one consumed by server + data.take(1).to(Sink.fromSubscriber(dataProbe)).run() + val sub = dataProbe.expectSubscription() + sub.request(1) + dataProbe.expectNext(Chunk(ByteString("abcdef"))) + dataProbe.expectComplete() + // connection is closed once requested elements are consumed + netInSub.expectCancellation() + } + } + + "proceed to next response once previous response's entity has been drained" in new TestSetup with ScalaFutures { + def twice(action: => Unit): Unit = { action; action } + + twice { + requestsSub.sendNext(HttpRequest()) + + expectWireData( + """GET / HTTP/1.1 + |Host: example.com + |User-Agent: akka-http/test + | + |""") + + sendWireData( + """HTTP/1.1 200 OK + |Transfer-Encoding: chunked + | + |6 + |abcdef + |0 + | + |""") + + val whenComplete = expectResponse().entity.dataBytes.runWith(Sink.ignore) + whenComplete.futureValue should be (akka.Done) + } + } + + "handle several requests on one persistent connection" which { "has a first response that was chunked" in new TestSetup { requestsSub.sendNext(HttpRequest()) diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala index 720bf6a1dd..0fc5101a16 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala @@ -11,6 +11,7 @@ import scala.util.Random import scala.annotation.tailrec import scala.concurrent.duration._ import org.scalatest.Inside +import org.scalatest.concurrent.ScalaFutures import akka.util.ByteString import akka.stream.scaladsl._ import akka.stream.ActorMaterializer @@ -325,6 +326,53 @@ class HttpServerSpec extends AkkaSpec( } } + "close the connection if request entity stream has been cancelled" in new TestSetup { + // two chunks sent by client + send("""POST / HTTP/1.1 + |Host: example.com + |Transfer-Encoding: chunked + | + |6 + |abcdef + |6 + |abcdef + |0 + | + |""") + + inside(expectRequest()) { + case HttpRequest(POST, _, _, HttpEntity.Chunked(_, data), _) ⇒ + val dataProbe = TestSubscriber.manualProbe[ChunkStreamPart] + // but only one consumed by server + data.take(1).to(Sink.fromSubscriber(dataProbe)).run() + val sub = dataProbe.expectSubscription() + sub.request(1) + dataProbe.expectNext(Chunk(ByteString("abcdef"))) + dataProbe.expectComplete() + // connection closes once requested elements are consumed + netIn.expectCancellation() + } + } + + "proceed to next request once previous request's entity has beed drained" in new TestSetup with ScalaFutures { + def twice(action: => Unit): Unit = { action; action } + + twice { + send("""POST / HTTP/1.1 + |Host: example.com + |Transfer-Encoding: chunked + | + |6 + |abcdef + |0 + | + |""") + + val whenComplete = expectRequest().entity.dataBytes.runWith(Sink.ignore) + whenComplete.futureValue should be (akka.Done) + } + } + "report a truncated entity stream on the entity data stream and the main stream for a Default entity" in new TestSetup { send("""POST / HTTP/1.1 |Host: example.com