Merge pull request #19528 from ataraxer/wip-http-response-cancellation

Http: HttpEntity stream cancellation
Konrad Malawski 2016-02-10 10:41:27 +01:00
commit b3b0dce9ad
17 changed files with 415 additions and 51 deletions

View file

@ -53,6 +53,10 @@ The connection can also be closed by the server.
An application can actively trigger the closing of the connection by completing the request stream. In this case the
underlying TCP connection will be closed when the last pending response has been received.
The connection will also be closed if the response entity is cancelled (e.g. by attaching it to ``Sink.cancelled()``)
or consumed only partially (e.g. by using the ``take`` combinator). To prevent this behaviour, the entity should be
explicitly drained by attaching it to ``Sink.ignore()``.
Timeouts
--------

View file

@ -130,6 +130,10 @@ connection. An often times more convenient alternative is to explicitly add a ``
``HttpResponse``. This response will then be the last one on the connection and the server will actively close the
connection when it has been sent out.
The connection will also be closed if the request entity has been cancelled (e.g. by attaching it to ``Sink.cancelled()``)
or consumed only partially (e.g. by using the ``take`` combinator). To prevent this behaviour, the entity should be
explicitly drained by attaching it to ``Sink.ignore()``.
.. _serverSideHTTPS-java:

View file

@ -156,3 +156,17 @@ Routing settings parameter name
and were accessible via ``settings``. We now made it possible to configure the parsers
settings as well, so ``RoutingSettings`` is now ``routingSettings`` and ``ParserSettings`` is
now accessible via ``parserSettings``.
Client / server behaviour on cancelled entity
---------------------------------------------
Previously, if a request or response entity was cancelled or consumed only partially
(e.g. by using the ``take`` combinator), the remaining data was silently drained to prevent stalling
the connection, since there could still be more requests / responses incoming. The default
behaviour is now to close the connection in order to prevent excessive resource usage in the case
of huge entities.
The old behaviour can be restored by explicitly draining the entity:
response.entity().getDataBytes().runWith(Sink.ignore(), materializer) // assumes a Materializer in scope

View file

@ -55,6 +55,10 @@ The connection can also be closed by the server.
An application can actively trigger the closing of the connection by completing the request stream. In this case the
underlying TCP connection will be closed when the last pending response has been received.
The connection will also be closed if the response entity is cancelled (e.g. by attaching it to ``Sink.cancelled``)
or consumed only partially (e.g. by using the ``take`` combinator). To prevent this behaviour, the entity should be
explicitly drained by attaching it to ``Sink.ignore``.
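For illustration, a minimal sketch of draining a response received via a connection-level client flow (the host, request and ``runForeach``-based consumption below are placeholder choices for this example, not part of this change):

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val connectionFlow = Http().outgoingConnection("example.com")

Source.single(HttpRequest(uri = "/"))
  .via(connectionFlow)
  .runForeach { response =>
    // fully drain the entity so the connection can be reused for further requests;
    // attaching it to Sink.cancelled (or consuming it only partially, e.g. via take)
    // would instead cause the connection to be closed
    response.entity.dataBytes.runWith(Sink.ignore)
  }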
Timeouts
--------

View file

@ -132,6 +132,10 @@ connection. An often times more convenient alternative is to explicitly add a ``
``HttpResponse``. This response will then be the last one on the connection and the server will actively close the
connection when it has been sent out.
The connection will also be closed if the request entity has been cancelled (e.g. by attaching it to ``Sink.cancelled``)
or consumed only partially (e.g. by using the ``take`` combinator). To prevent this behaviour, the entity should be
explicitly drained by attaching it to ``Sink.ignore``.
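As a rough server-side sketch (the handler, interface and port are illustrative only), a low-level request handler that ignores the request body should still drain it explicitly:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

// a handler for the low-level server API that does not care about the request body
val requestHandler: HttpRequest => HttpResponse = { request =>
  // drain the (possibly streamed) request entity explicitly; cancelling it or
  // consuming it only partially would cause the connection to be closed
  request.entity.dataBytes.runWith(Sink.ignore)
  HttpResponse(entity = HttpEntity("Hello!"))
}

Http().bindAndHandleSync(requestHandler, "localhost", 8080)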
.. _serverSideHTTPS:

View file

@ -102,6 +102,19 @@ and were accessible via ``settings``. We now made it possible to configure the p
settings as well, so ``RoutingSettings`` is now ``routingSettings`` and ``ParserSettings`` is
now accessible via ``parserSettings``.
Client / server behaviour on cancelled entity
---------------------------------------------
Previously, if a request or response entity was cancelled or consumed only partially
(e.g. by using the ``take`` combinator), the remaining data was silently drained to prevent stalling
the connection, since there could still be more requests / responses incoming. The default
behaviour is now to close the connection in order to prevent excessive resource usage in the case
of huge entities.
The old behaviour can be restored by explicitly draining the entity:
response.entity.dataBytes.runWith(Sink.ignore)
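For example, assuming an implicit materializer is in scope, partial consumption now results in the connection being closed, while a full drain keeps it usable:

// consuming only part of the entity: the connection will be closed afterwards
response.entity.dataBytes.take(1).runWith(Sink.ignore)

// draining the entity completely: the connection stays open for further requests
response.entity.dataBytes.runWith(Sink.ignore)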
Changed Sources / Sinks
=======================

View file

@ -5,7 +5,7 @@
package akka.http.impl.engine.client
import akka.NotUsed
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.http.scaladsl.settings.{ ClientConnectionSettings, ParserSettings }
import language.existentials
import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer
@ -16,13 +16,13 @@ import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.Host
import akka.http.scaladsl.model.{ IllegalResponseException, HttpMethod, HttpRequest, HttpResponse }
import akka.http.scaladsl.model.{ IllegalResponseException, HttpMethod, HttpRequest, HttpResponse, ResponseEntity }
import akka.http.impl.engine.rendering.{ RequestRenderingContext, HttpRequestRendererFactory }
import akka.http.impl.engine.parsing._
import akka.http.impl.util._
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.{ InHandler, OutHandler }
import akka.stream.impl.fusing.SubSource
/**
@ -69,24 +69,7 @@ private[http] object OutgoingConnectionBlueprint {
import ParserOutput._
val responsePrep = Flow[List[ResponseOutput]]
.mapConcat(conforms)
.splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd)
.prefixAndTail(1)
.filter {
case (Seq(MessageEnd), remaining) ⇒
SubSource.kill(remaining)
false
case (seq, _) ⇒
seq.nonEmpty
}
.map {
case (Seq(ResponseStart(statusCode, protocol, headers, createEntity, _)), entityParts) ⇒
val entity = createEntity(entityParts) withSizeLimit parserSettings.maxContentLength
HttpResponse(statusCode, headers, entity, protocol)
case (Seq(MessageStartError(_, info)), tail) ⇒
// Tails can be empty, but still need one pull to figure that out -- never drop tails.
SubSource.kill(tail)
throw IllegalResponseException(info)
}.concatSubstreams
.via(new ResponsePrep(parserSettings))
val core = BidiFlow.fromGraph(GraphDSL.create() { implicit b ⇒
import GraphDSL.Implicits._
@ -148,6 +131,95 @@ private[http] object OutgoingConnectionBlueprint {
import ParserOutput._
private final class ResponsePrep(parserSettings: ParserSettings)
extends GraphStage[FlowShape[ResponseOutput, HttpResponse]] {
private val in = Inlet[ResponseOutput]("ResponsePrep.in")
private val out = Outlet[HttpResponse]("ResponsePrep.out")
val shape = new FlowShape(in, out)
override def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
private var entitySource: SubSourceOutlet[ResponseOutput] = _
private def entitySubstreamStarted = entitySource ne null
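// the stage logic itself serves as the "idle" in/out handler; while an entity is being
// streamed, dedicated handlers are installed and we switch back to idle on MessageEnd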
private def idle = this
def setIdleHandlers(): Unit = {
setHandler(in, idle)
setHandler(out, idle)
}
def onPush(): Unit = grab(in) match {
case ResponseStart(statusCode, protocol, headers, entityCreator, _) ⇒
val entity = createEntity(entityCreator) withSizeLimit parserSettings.maxContentLength
push(out, HttpResponse(statusCode, headers, entity, protocol))
case MessageStartError(_, info) ⇒
throw IllegalResponseException(info)
case other ⇒
throw new IllegalStateException(s"ResponseStart expected but $other received.")
}
def onPull(): Unit = {
if (!entitySubstreamStarted) pull(in)
}
setIdleHandlers()
private lazy val waitForMessageEnd = new InHandler {
def onPush(): Unit = grab(in) match {
case MessageEnd ⇒ setHandler(in, idle)
case other ⇒ throw new IllegalStateException(s"MessageEnd expected but $other received.")
}
}
private lazy val substreamHandler = new InHandler with OutHandler {
override def onPush(): Unit = grab(in) match {
case MessageEnd ⇒
entitySource.complete()
entitySource = null
setIdleHandlers()
case messagePart ⇒
entitySource.push(messagePart)
}
override def onPull(): Unit = pull(in)
override def onUpstreamFinish(): Unit = {
entitySource.complete()
completeStage()
}
override def onUpstreamFailure(reason: Throwable): Unit = {
entitySource.fail(reason)
failStage(reason)
}
override def onDownstreamFinish(): Unit = {
entitySource.complete()
completeStage()
}
}
private def createEntity(creator: EntityCreator[ResponseOutput, ResponseEntity]): ResponseEntity = {
creator match {
case StrictEntityCreator(entity) ⇒
pull(in)
setHandler(in, waitForMessageEnd)
entity
case StreamedEntityCreator(creator) ⇒
entitySource = new SubSourceOutlet[ResponseOutput]("EntitySource")
entitySource.setHandler(substreamHandler)
setHandler(in, substreamHandler)
creator(Source.fromGraph(entitySource.source))
}
}
}
}
/**
* A merge that follows this logic:
* 1. Wait on the methodBypass for the method of the request corresponding to the next response to be received

View file

@ -104,12 +104,23 @@ private[http] object HttpServerBluePrint {
val remoteAddress = inheritedAttributes.get[HttpAttributes.RemoteAddress].flatMap(_.address)
var downstreamPullWaiting = false
var completionDeferred = false
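// non-null only while a streamed request entity is being fed to the application;
// used by onDownstreamFinish below to detect cancellation of the entity substream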
var entitySource: SubSourceOutlet[RequestOutput] = _
// optimization: to avoid allocations, the in and out handlers for the "idle" case are put directly on the GraphStageLogic itself
override def onPull(): Unit = {
pull(in)
}
// optimization: this callback is used to handle entity substream cancellation to avoid allocating a dedicated handler
override def onDownstreamFinish(): Unit = {
if (entitySource ne null) {
// application layer has cancelled or only partially consumed the request entity:
// connection will be closed
entitySource.complete()
completeStage()
}
}
override def onPush(): Unit = grab(in) match {
case RequestStart(method, uri, protocol, hdrs, entityCreator, _, _) ⇒
val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method
@ -126,7 +137,7 @@ private[http] object HttpServerBluePrint {
setIdleHandlers()
def setIdleHandlers() {
def setIdleHandlers(): Unit = {
if (completionDeferred) {
completeStage()
} else {
@ -150,15 +161,17 @@ private[http] object HttpServerBluePrint {
// stream incoming chunks into the request entity until we reach the end of it
// and then toggle back to "idle"
val entitySource = new SubSourceOutlet[RequestOutput]("EntitySource")
entitySource = new SubSourceOutlet[RequestOutput]("EntitySource")
// optimization: re-use the idle outHandler
entitySource.setHandler(this)
setHandler(in, new InHandler {
// optimization: handlers are combined to reduce allocations
val chunkedRequestHandler = new InHandler with OutHandler {
def onPush(): Unit = {
grab(in) match {
case MessageEnd ⇒
entitySource.complete()
entitySource = null
setIdleHandlers()
case x ⇒ entitySource.push(x)
@ -172,8 +185,6 @@ private[http] object HttpServerBluePrint {
entitySource.fail(ex)
failStage(ex)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
// remember this until we are done with the chunked entity
// so we can pull downstream then
@ -185,7 +196,10 @@ private[http] object HttpServerBluePrint {
// when it completes, complete the stage
completionDeferred = true
}
})
}
setHandler(in, chunkedRequestHandler)
setHandler(out, chunkedRequestHandler)
creator(Source.fromGraph(entitySource.source))
}

View file

@ -217,8 +217,11 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
_outgoingConnection(host, port, localAddress, settings, connectionContext, log)
private def _outgoingConnection(host: String, port: Int, localAddress: Option[InetSocketAddress],
settings: ClientConnectionSettings, connectionContext: ConnectionContext,
private def _outgoingConnection(host: String,
port: Int,
localAddress: Option[InetSocketAddress],
settings: ClientConnectionSettings,
connectionContext: ConnectionContext,
log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = {
val hostHeader = if (port == connectionContext.defaultPort) Host(host) else Host(host, port)
val layer = clientLayer(hostHeader, settings, log)

View file

@ -7,6 +7,7 @@ package akka.http.impl.engine.client
import scala.concurrent.duration._
import scala.reflect.ClassTag
import org.scalatest.Inside
import org.scalatest.concurrent.ScalaFutures
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.stream.io.{ SessionBytes, SslTlsOutbound, SendBytes }
import akka.util.ByteString
@ -137,6 +138,76 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka.
}
}
"close the connection if response entity stream has been cancelled" in new TestSetup {
// two requests are sent to make sure that the connection
// isn't immediately closed by the server after the first one
requestsSub.sendNext(HttpRequest())
requestsSub.sendNext(HttpRequest())
requestsSub.sendComplete()
expectWireData(
"""GET / HTTP/1.1
|Host: example.com
|User-Agent: akka-http/test
|
|""")
// two chunks sent by server
sendWireData(
"""HTTP/1.1 200 OK
|Transfer-Encoding: chunked
|
|6
|abcdef
|6
|abcdef
|0
|
|""")
inside(expectResponse()) {
case HttpResponse(StatusCodes.OK, _, HttpEntity.Chunked(_, data), _) =>
val dataProbe = TestSubscriber.manualProbe[ChunkStreamPart]
// but only one is consumed by the client
data.take(1).to(Sink.fromSubscriber(dataProbe)).run()
val sub = dataProbe.expectSubscription()
sub.request(1)
dataProbe.expectNext(Chunk(ByteString("abcdef")))
dataProbe.expectComplete()
// connection is closed once requested elements are consumed
netInSub.expectCancellation()
}
}
"proceed to next response once previous response's entity has been drained" in new TestSetup with ScalaFutures {
def twice(action: => Unit): Unit = { action; action }
twice {
requestsSub.sendNext(HttpRequest())
expectWireData(
"""GET / HTTP/1.1
|Host: example.com
|User-Agent: akka-http/test
|
|""")
sendWireData(
"""HTTP/1.1 200 OK
|Transfer-Encoding: chunked
|
|6
|abcdef
|0
|
|""")
val whenComplete = expectResponse().entity.dataBytes.runWith(Sink.ignore)
whenComplete.futureValue should be (akka.Done)
}
}
"handle several requests on one persistent connection" which {
"has a first response that was chunked" in new TestSetup {
requestsSub.sendNext(HttpRequest())

View file

@ -11,6 +11,7 @@ import scala.util.Random
import scala.annotation.tailrec
import scala.concurrent.duration._
import org.scalatest.Inside
import org.scalatest.concurrent.ScalaFutures
import akka.util.ByteString
import akka.stream.scaladsl._
import akka.stream.ActorMaterializer
@ -325,6 +326,53 @@ class HttpServerSpec extends AkkaSpec(
}
}
"close the connection if request entity stream has been cancelled" in new TestSetup {
// two chunks sent by client
send("""POST / HTTP/1.1
|Host: example.com
|Transfer-Encoding: chunked
|
|6
|abcdef
|6
|abcdef
|0
|
|""")
inside(expectRequest()) {
case HttpRequest(POST, _, _, HttpEntity.Chunked(_, data), _) ⇒
val dataProbe = TestSubscriber.manualProbe[ChunkStreamPart]
// but only one consumed by server
data.take(1).to(Sink.fromSubscriber(dataProbe)).run()
val sub = dataProbe.expectSubscription()
sub.request(1)
dataProbe.expectNext(Chunk(ByteString("abcdef")))
dataProbe.expectComplete()
// connection closes once requested elements are consumed
netIn.expectCancellation()
}
}
"proceed to next request once previous request's entity has beed drained" in new TestSetup with ScalaFutures {
def twice(action: => Unit): Unit = { action; action }
twice {
send("""POST / HTTP/1.1
|Host: example.com
|Transfer-Encoding: chunked
|
|6
|abcdef
|0
|
|""")
val whenComplete = expectRequest().entity.dataBytes.runWith(Sink.ignore)
whenComplete.futureValue should be (akka.Done)
}
}
"report a truncated entity stream on the entity data stream and the main stream for a Default entity" in new TestSetup {
send("""POST / HTTP/1.1
|Host: example.com

View file

@ -4,9 +4,7 @@
package akka.stream.scaladsl
import akka.NotUsed
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.ActorAttributes
import akka.stream._
import akka.stream.Supervision.resumingDecider
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.TestPublisher
@ -49,9 +47,13 @@ class FlowSplitAfterSpec extends AkkaSpec {
def cancel(): Unit = subscription.cancel()
}
class SubstreamsSupport(splitAfter: Int = 3, elementCount: Int = 6) {
class SubstreamsSupport(
splitAfter: Int = 3,
elementCount: Int = 6,
substreamCancelStrategy: SubstreamCancelStrategy = SubstreamCancelStrategy.drain) {
val source = Source(1 to elementCount)
val groupStream = source.splitAfter(_ == splitAfter).lift.runWith(Sink.asPublisher(false))
val groupStream = source.splitAfter(substreamCancelStrategy)(_ == splitAfter).lift.runWith(Sink.asPublisher(false))
val masterSubscriber = TestSubscriber.manualProbe[Source[Int, _]]()
groupStream.subscribe(masterSubscriber)
@ -253,6 +255,14 @@ class FlowSplitAfterSpec extends AkkaSpec {
upsub.expectCancellation()
}
"support eager cancellation of master stream on cancelling substreams" in assertAllStagesStopped {
new SubstreamsSupport(splitAfter = 5, elementCount = 8, SubstreamCancelStrategy.propagate) {
val s1 = StreamPuppet(expectSubFlow().runWith(Sink.asPublisher(false)))
s1.cancel()
masterSubscriber.expectComplete()
}
}
}
}

View file

@ -35,9 +35,13 @@ class FlowSplitWhenSpec extends AkkaSpec {
def cancel(): Unit = subscription.cancel()
}
class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6) {
class SubstreamsSupport(
splitWhen: Int = 3,
elementCount: Int = 6,
substreamCancelStrategy: SubstreamCancelStrategy = SubstreamCancelStrategy.drain) {
val source = Source(1 to elementCount)
val groupStream = source.splitWhen(_ == splitWhen).lift.runWith(Sink.asPublisher(false))
val groupStream = source.splitWhen(substreamCancelStrategy)(_ == splitWhen).lift.runWith(Sink.asPublisher(false))
val masterSubscriber = TestSubscriber.manualProbe[Source[Int, _]]()
groupStream.subscribe(masterSubscriber)
@ -342,6 +346,14 @@ class FlowSplitWhenSpec extends AkkaSpec {
upsub.expectCancellation()
}
"support eager cancellation of master stream on cancelling substreams" in assertAllStagesStopped {
new SubstreamsSupport(splitWhen = 5, elementCount = 8, SubstreamCancelStrategy.propagate) {
val s1 = StreamPuppet(getSubFlow().runWith(Sink.asPublisher(false)))
s1.cancel()
masterSubscriber.expectComplete()
}
}
}
}

View file

@ -0,0 +1,36 @@
/**
* Copyright (C) 2009-2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import SubstreamCancelStrategies._
/**
* Represents a strategy that decides how to deal with the cancellation of a substream.
*/
sealed abstract class SubstreamCancelStrategy
private[akka] object SubstreamCancelStrategies {
/**
* INTERNAL API
*/
private[akka] final case object Propagate extends SubstreamCancelStrategy
/**
* INTERNAL API
*/
private[akka] final case object Drain extends SubstreamCancelStrategy
}
object SubstreamCancelStrategy {
/**
* Cancel the stream of streams if any substream is cancelled.
*/
def propagate: SubstreamCancelStrategy = Propagate
/**
* Drain the substream on cancellation in order to prevent stalling of the stream of streams.
*/
def drain: SubstreamCancelStrategy = Drain
}
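A usage sketch (the sources and predicates are arbitrary, and the ActorSystem / materializer setup is just boilerplate): the strategy is passed to the splitWhen / splitAfter combinators and determines whether cancelling a substream also cancels the stream of streams:

import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, SubstreamCancelStrategy }
import akka.stream.scaladsl.{ Sink, Source }

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

// default (drain): a cancelled substream is drained, the stream of streams keeps running
Source(1 to 100)
  .splitWhen(_ % 10 == 0)
  .to(Sink.cancelled)
  .run()

// propagate: the first substream cancellation cancels the whole stream of streams
Source(1 to 100)
  .splitWhen(SubstreamCancelStrategy.propagate)(_ % 10 == 0)
  .to(Sink.cancelled)
  .run()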

View file

@ -214,18 +214,26 @@ object Split {
/** Splits after the current element. The current element will be the last element in the current substream. */
case object SplitAfter extends SplitDecision
def when[T](p: T ⇒ Boolean): Graph[FlowShape[T, Source[T, NotUsed]], NotUsed] = new Split(Split.SplitBefore, p)
def after[T](p: T ⇒ Boolean): Graph[FlowShape[T, Source[T, NotUsed]], NotUsed] = new Split(Split.SplitAfter, p)
def when[T](p: T ⇒ Boolean, substreamCancelStrategy: SubstreamCancelStrategy): Graph[FlowShape[T, Source[T, NotUsed]], NotUsed] =
new Split(Split.SplitBefore, p, substreamCancelStrategy)
def after[T](p: T ⇒ Boolean, substreamCancelStrategy: SubstreamCancelStrategy): Graph[FlowShape[T, Source[T, NotUsed]], NotUsed] =
new Split(Split.SplitAfter, p, substreamCancelStrategy)
}
/**
* INTERNAL API
*/
final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean, substreamCancelStrategy: SubstreamCancelStrategy) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
val in: Inlet[T] = Inlet("Split.in")
val out: Outlet[Source[T, NotUsed]] = Outlet("Split.out")
override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out)
private val propagateSubstreamCancel = substreamCancelStrategy match {
case SubstreamCancelStrategies.Propagate ⇒ true
case SubstreamCancelStrategies.Drain ⇒ false
}
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
import Split._
@ -329,8 +337,9 @@ final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean) extends Gr
override def onDownstreamFinish(): Unit = {
substreamCancelled = true
if (isClosed(in)) completeStage()
else {
if (isClosed(in) || propagateSubstreamCancel) {
completeStage()
} else {
// Start draining
if (!hasBeenPulled(in)) pull(in)
}

View file

@ -1087,13 +1087,24 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels and substreams cancel
'''Cancels when''' downstream cancels and substreams cancel (on `SubstreamCancelStrategy.drain()`), or when
* downstream or any substream cancels (on `SubstreamCancelStrategy.propagate()`)
*
* See also [[Flow.splitAfter]].
*/
def splitWhen(p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.splitWhen(p.test))
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams, always beginning a new one with
* the current element if the given predicate returns true for it.
*
* @see [[#splitWhen]]
*/
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.splitWhen(substreamCancelStrategy)(p.test))
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams. It *ends* the current substream when the
@ -1134,13 +1145,24 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels and substreams cancel
'''Cancels when''' downstream cancels and substreams cancel (on `SubstreamCancelStrategy.drain`), or when
* downstream or any substream cancels (on `SubstreamCancelStrategy.propagate`)
*
* See also [[Flow.splitWhen]].
*/
def splitAfter[U >: Out](p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.splitAfter(p.test))
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams. It *ends* the current substream when the
* predicate is true.
*
* @see [[#splitAfter]]
*/
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.splitAfter(substreamCancelStrategy)(p.test))
/**
* Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by concatenation,

View file

@ -1201,23 +1201,36 @@ trait FlowOps[+Out, +Mat] {
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels and substreams cancel
'''Cancels when''' downstream cancels and substreams cancel (on `SubstreamCancelStrategy.drain`), or when
* downstream or any substream cancels (on `SubstreamCancelStrategy.propagate`)
*
* See also [[FlowOps.splitAfter]].
*/
def splitWhen(p: Out ⇒ Boolean): SubFlow[Out, Mat, Repr, Closed] = {
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: Out ⇒ Boolean): SubFlow[Out, Mat, Repr, Closed] = {
val merge = new SubFlowImpl.MergeBack[Out, Repr] {
override def apply[T](flow: Flow[Out, T, NotUsed], breadth: Int): Repr[T] =
via(Split.when(p))
via(Split.when(p, substreamCancelStrategy))
.map(_.via(flow))
.via(new FlattenMerge(breadth))
}
val finish: (Sink[Out, NotUsed]) ⇒ Closed = s ⇒
via(Split.when(p))
via(Split.when(p, substreamCancelStrategy))
.to(Sink.foreach(_.runWith(s)(GraphInterpreter.currentInterpreter.materializer)))
new SubFlowImpl(Flow[Out], merge, finish)
}
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams, always beginning a new one with
* the current element if the given predicate returns true for it.
*
* @see [[#splitWhen]]
*/
def splitWhen(p: Out ⇒ Boolean): SubFlow[Out, Mat, Repr, Closed] =
splitWhen(SubstreamCancelStrategy.drain)(p)
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams. It *ends* the current substream when the
@ -1258,23 +1271,34 @@ trait FlowOps[+Out, +Mat] {
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels and substreams cancel
'''Cancels when''' downstream cancels and substreams cancel (on `SubstreamCancelStrategy.drain`), or when
* downstream or any substream cancels (on `SubstreamCancelStrategy.propagate`)
*
* See also [[FlowOps.splitWhen]].
*/
def splitAfter(p: Out ⇒ Boolean): SubFlow[Out, Mat, Repr, Closed] = {
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: Out ⇒ Boolean): SubFlow[Out, Mat, Repr, Closed] = {
val merge = new SubFlowImpl.MergeBack[Out, Repr] {
override def apply[T](flow: Flow[Out, T, NotUsed], breadth: Int): Repr[T] =
via(Split.after(p))
via(Split.after(p, substreamCancelStrategy))
.map(_.via(flow))
.via(new FlattenMerge(breadth))
}
val finish: (Sink[Out, NotUsed]) ⇒ Closed = s ⇒
via(Split.after(p))
via(Split.after(p, substreamCancelStrategy))
.to(Sink.foreach(_.runWith(s)(GraphInterpreter.currentInterpreter.materializer)))
new SubFlowImpl(Flow[Out], merge, finish)
}
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams. It *ends* the current substream when the
* predicate is true.
*
* @see [[#splitAfter]]
*/
def splitAfter(p: Out ⇒ Boolean): SubFlow[Out, Mat, Repr, Closed] =
splitAfter(SubstreamCancelStrategy.drain)(p)
/**
* Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by concatenation,