stream: add flatMapPrefix operator (#28380)

This commit is contained in:
eyal farago 2020-02-05 15:37:27 +01:00 committed by GitHub
parent 722ea4ff87
commit ccd8481fec
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 945 additions and 2 deletions

View file

@ -0,0 +1,37 @@
# flatMapPrefix
Use the first `n` elements from the stream to determine how to process the rest.
@ref[Nesting and flattening operators](../index.md#nesting-and-flattening-operators)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #flatMapPrefix }
@@@
## Description
Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements),
then apply *f* on these elements in order to obtain a flow, this flow is then materialized and the rest of the input is processed by this flow (similar to via).
This method returns a flow consuming the rest of the stream producing the materialized flow's output.
## Reactive Streams semantics
@@@div { .callout }
**emits** when the materialized flow emits.
Notice the first `n` elements are buffered internally before materializing the flow and connecting it to the rest of the upstream - producing elements at its own discretion (might 'swallow' or multiply elements).
**backpressures** when the materialized flow backpressures
**completes** the materialized flow completes.
If upstream completes before producing `n` elements, `f` will be applied with the provided elements,
the resulting flow will be materialized and signalled for upstream completion, it can then or continue to emit elements at its own discretion.
@@@

View file

@ -241,6 +241,7 @@ See the @ref:[Substreams](../stream-substream.md) page for more detail and code
|--|--|--| |--|--|--|
|Source/Flow|<a name="flatmapconcat"></a>@ref[flatMapConcat](Source-or-Flow/flatMapConcat.md)|Transform each input element into a `Source` whose elements are then flattened into the output stream through concatenation.| |Source/Flow|<a name="flatmapconcat"></a>@ref[flatMapConcat](Source-or-Flow/flatMapConcat.md)|Transform each input element into a `Source` whose elements are then flattened into the output stream through concatenation.|
|Source/Flow|<a name="flatmapmerge"></a>@ref[flatMapMerge](Source-or-Flow/flatMapMerge.md)|Transform each input element into a `Source` whose elements are then flattened into the output stream through merging.| |Source/Flow|<a name="flatmapmerge"></a>@ref[flatMapMerge](Source-or-Flow/flatMapMerge.md)|Transform each input element into a `Source` whose elements are then flattened into the output stream through merging.|
|Source/Flow|<a name="flatmapprefix"></a>@ref[flatMapPrefix](Source-or-Flow/flatMapPrefix.md)|Use the first `n` elements from the stream to determine how to process the rest.|
|Source/Flow|<a name="groupby"></a>@ref[groupBy](Source-or-Flow/groupBy.md)|Demultiplex the incoming stream into separate output streams.| |Source/Flow|<a name="groupby"></a>@ref[groupBy](Source-or-Flow/groupBy.md)|Demultiplex the incoming stream into separate output streams.|
|Source/Flow|<a name="prefixandtail"></a>@ref[prefixAndTail](Source-or-Flow/prefixAndTail.md)|Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements) and returns a pair containing a strict sequence of the taken element and a stream representing the remaining elements.| |Source/Flow|<a name="prefixandtail"></a>@ref[prefixAndTail](Source-or-Flow/prefixAndTail.md)|Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements) and returns a pair containing a strict sequence of the taken element and a stream representing the remaining elements.|
|Source/Flow|<a name="splitafter"></a>@ref[splitAfter](Source-or-Flow/splitAfter.md)|End the current substream whenever a predicate returns `true`, starting a new substream for the next element.| |Source/Flow|<a name="splitafter"></a>@ref[splitAfter](Source-or-Flow/splitAfter.md)|End the current substream whenever a predicate returns `true`, starting a new substream for the next element.|
@ -440,6 +441,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [extrapolate](Source-or-Flow/extrapolate.md) * [extrapolate](Source-or-Flow/extrapolate.md)
* [buffer](Source-or-Flow/buffer.md) * [buffer](Source-or-Flow/buffer.md)
* [prefixAndTail](Source-or-Flow/prefixAndTail.md) * [prefixAndTail](Source-or-Flow/prefixAndTail.md)
* [flatMapPrefix](Source-or-Flow/flatMapPrefix.md)
* [groupBy](Source-or-Flow/groupBy.md) * [groupBy](Source-or-Flow/groupBy.md)
* [splitWhen](Source-or-Flow/splitWhen.md) * [splitWhen](Source-or-Flow/splitWhen.md)
* [splitAfter](Source-or-Flow/splitAfter.md) * [splitAfter](Source-or-Flow/splitAfter.md)

View file

@ -0,0 +1,20 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.tck
import akka.stream.scaladsl.{ Flow, Keep, Sink, Source }
import org.reactivestreams.Publisher
class FlatMapPrefixTest extends AkkaPublisherVerification[Int] {
override def createPublisher(elements: Long): Publisher[Int] = {
val publisher = Source(iterable(elements))
.map(_.toInt)
.flatMapPrefixMat(1) { seq =>
Flow[Int].prepend(Source(seq))
}(Keep.left)
.runWith(Sink.asPublisher(false))
publisher
}
}

View file

@ -0,0 +1,544 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit.Utils.TE
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import akka.stream.{
AbruptStageTerminationException,
AbruptTerminationException,
Materializer,
NeverMaterializedException,
SubscriptionWithCancelException
}
import akka.{ Done, NotUsed }
class FlowFlatMapPrefixSpec extends StreamSpec {
def src10(i: Int = 0) = Source(i until (i + 10))
"A PrefixAndDownstream" must {
"work in the simple identity case" in assertAllStagesStopped {
src10()
.flatMapPrefixMat(2) { _ =>
Flow[Int]
}(Keep.left)
.runWith(Sink.seq[Int])
.futureValue should ===(2 until 10)
}
"expose mat value in the simple identity case" in assertAllStagesStopped {
val (prefixF, suffixF) = src10()
.flatMapPrefixMat(2) { prefix =>
Flow[Int].mapMaterializedValue(_ => prefix)
}(Keep.right)
.toMat(Sink.seq)(Keep.both)
.run
prefixF.futureValue should ===(0 until 2)
suffixF.futureValue should ===(2 until 10)
}
"work when source is exactly the required prefix" in assertAllStagesStopped {
val (prefixF, suffixF) = src10()
.flatMapPrefixMat(10) { prefix =>
Flow[Int].mapMaterializedValue(_ => prefix)
}(Keep.right)
.toMat(Sink.seq)(Keep.both)
.run
prefixF.futureValue should ===(0 until 10)
suffixF.futureValue should be(empty)
}
"work when source has less than the required prefix" in assertAllStagesStopped {
val (prefixF, suffixF) = src10()
.flatMapPrefixMat(20) { prefix =>
Flow[Int].mapMaterializedValue(_ => prefix)
}(Keep.right)
.toMat(Sink.seq)(Keep.both)
.run
prefixF.futureValue should ===(0 until 10)
suffixF.futureValue should be(empty)
}
"simple identity case when downstream completes before consuming the entire stream" in assertAllStagesStopped {
val (prefixF, suffixF) = Source(0 until 100)
.flatMapPrefixMat(10) { prefix =>
Flow[Int].mapMaterializedValue(_ => prefix)
}(Keep.right)
.take(10)
.toMat(Sink.seq)(Keep.both)
.run
prefixF.futureValue should ===(0 until 10)
suffixF.futureValue should ===(10 until 20)
}
"propagate failure to create the downstream flow" in assertAllStagesStopped {
val suffixF = Source(0 until 100)
.flatMapPrefixMat(10) { prefix =>
throw TE(s"I hate mondays! (${prefix.size})")
}(Keep.right)
.to(Sink.ignore)
.run
val ex = suffixF.failed.futureValue
ex.getCause should not be null
ex.getCause should ===(TE("I hate mondays! (10)"))
}
"propagate flow failures" in assertAllStagesStopped {
val (prefixF, suffixF) = Source(0 until 100)
.flatMapPrefixMat(10) { prefix =>
Flow[Int].mapMaterializedValue(_ => prefix).map {
case 15 => throw TE("don't like 15 either!")
case n => n
}
}(Keep.right)
.toMat(Sink.ignore)(Keep.both)
.run
prefixF.futureValue should ===(0 until 10)
val ex = suffixF.failed.futureValue
ex should ===(TE("don't like 15 either!"))
}
"produce multiple elements per input" in assertAllStagesStopped {
val (prefixF, suffixF) = src10()
.flatMapPrefixMat(7) { prefix =>
Flow[Int].mapMaterializedValue(_ => prefix).mapConcat(n => List.fill(n - 6)(n))
}(Keep.right)
.toMat(Sink.seq[Int])(Keep.both)
.run()
prefixF.futureValue should ===(0 until 7)
suffixF.futureValue should ===(7 :: 8 :: 8 :: 9 :: 9 :: 9 :: Nil)
}
"succeed when upstream produces no elements" in assertAllStagesStopped {
val (prefixF, suffixF) = Source
.empty[Int]
.flatMapPrefixMat(7) { prefix =>
Flow[Int].mapMaterializedValue(_ => prefix).mapConcat(n => List.fill(n - 6)(n))
}(Keep.right)
.toMat(Sink.seq[Int])(Keep.both)
.run()
prefixF.futureValue should be(empty)
suffixF.futureValue should be(empty)
}
"apply materialized flow's semantics when upstream produces no elements" in assertAllStagesStopped {
val (prefixF, suffixF) = Source
.empty[Int]
.flatMapPrefixMat(7) { prefix =>
Flow[Int].mapMaterializedValue(_ => prefix).mapConcat(n => List.fill(n - 6)(n)).prepend(Source(100 to 101))
}(Keep.right)
.toMat(Sink.seq[Int])(Keep.both)
.run()
prefixF.futureValue should be(empty)
suffixF.futureValue should ===(100 :: 101 :: Nil)
}
"handles upstream completion" in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[Int]()
val subscriber = TestSubscriber.manualProbe[Int]()
val matValue = Source
.fromPublisher(publisher)
.flatMapPrefixMat(2) { prefix =>
Flow[Int].mapMaterializedValue(_ => prefix).prepend(Source(100 to 101))
}(Keep.right)
.to(Sink.fromSubscriber(subscriber))
.run()
matValue.value should be(empty)
val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription()
downstream.request(1000)
upstream.expectRequest()
//completing publisher
upstream.sendComplete()
matValue.futureValue should ===(Nil)
subscriber.expectNext(100)
subscriber.expectNext(101).expectComplete()
}
"work when materialized flow produces no downstream elements" in assertAllStagesStopped {
val (prefixF, suffixF) = Source(0 until 100)
.flatMapPrefixMat(4) { prefix =>
Flow[Int].mapMaterializedValue(_ => prefix).filter(_ => false)
}(Keep.right)
.toMat(Sink.seq)(Keep.both)
.run
prefixF.futureValue should ===(0 until 4)
suffixF.futureValue should be(empty)
}
"work when materialized flow does not consume upstream" in assertAllStagesStopped {
val (prefixF, suffixF) = Source(0 until 100)
.map { i =>
i should be <= 4
i
}
.flatMapPrefixMat(4) { prefix =>
Flow[Int].mapMaterializedValue(_ => prefix).take(0)
}(Keep.right)
.toMat(Sink.seq)(Keep.both)
.run
prefixF.futureValue should ===(0 until 4)
suffixF.futureValue should be(empty)
}
"work when materialized flow cancels upstream but keep producing" in assertAllStagesStopped {
val (prefixF, suffixF) = src10()
.flatMapPrefixMat(4) { prefix =>
Flow[Int].mapMaterializedValue(_ => prefix).take(0).concat(Source(11 to 12))
}(Keep.right)
.toMat(Sink.seq)(Keep.both)
.run
prefixF.futureValue should ===(0 until 4)
suffixF.futureValue should ===(11 :: 12 :: Nil)
}
"propagate materialization failure (when application of 'f' succeeds)" in assertAllStagesStopped {
val (prefixF, suffixF) = src10()
.flatMapPrefixMat(4) { prefix =>
Flow[Int].mapMaterializedValue(_ => throw TE(s"boom-bada-bang (${prefix.size})"))
}(Keep.right)
.toMat(Sink.seq)(Keep.both)
.run
prefixF.failed.futureValue should be(a[NeverMaterializedException])
prefixF.failed.futureValue.getCause should ===(TE("boom-bada-bang (4)"))
suffixF.failed.futureValue should ===(TE("boom-bada-bang (4)"))
}
"succeed when materialized flow completes downstream but keep consuming elements" in assertAllStagesStopped {
val (prefixAndTailF, suffixF) = src10()
.flatMapPrefixMat(4) { prefix =>
Flow[Int]
.mapMaterializedValue(_ => prefix)
.viaMat {
Flow.fromSinkAndSourceMat(Sink.seq[Int], Source.empty[Int])(Keep.left)
}(Keep.both)
}(Keep.right)
.toMat(Sink.seq)(Keep.both)
.run
suffixF.futureValue should be(empty)
val (prefix, suffix) = prefixAndTailF.futureValue
prefix should ===(0 until 4)
suffix.futureValue should ===(4 until 10)
}
"downstream cancellation is propagated via the materialized flow" in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[Int]()
val subscriber = TestSubscriber.manualProbe[Int]()
val ((srcWatchTermF, notUsedF), suffixF) = src10()
.watchTermination()(Keep.right)
.flatMapPrefixMat(2) { prefix =>
prefix should ===(0 until 2)
Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber), Source.fromPublisher(publisher))
}(Keep.both)
.take(1)
.toMat(Sink.seq)(Keep.both)
.run()
notUsedF.value should be(empty)
suffixF.value should be(empty)
srcWatchTermF.value should be(empty)
val subUpstream = publisher.expectSubscription()
val subDownstream = subscriber.expectSubscription()
notUsedF.futureValue should ===(NotUsed)
subUpstream.expectRequest() should be >= (1L)
subDownstream.request(1)
subscriber.expectNext(2)
subUpstream.sendNext(22)
subUpstream.expectCancellation()
subDownstream.cancel()
suffixF.futureValue should ===(Seq(22))
srcWatchTermF.futureValue should ===(Done)
}
"early downstream cancellation is later handed out to materialized flow" in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[Int]()
val subscriber = TestSubscriber.manualProbe[Int]()
val (srcWatchTermF, matFlowWatchTermFF) = Source
.fromPublisher(publisher)
.watchTermination()(Keep.right)
.flatMapPrefixMat(3) { prefix =>
prefix should ===(0 until 3)
Flow[Int].watchTermination()(Keep.right)
}(Keep.both)
.to(Sink.fromSubscriber(subscriber))
.run()
val matFlowWatchTerm = matFlowWatchTermFF.flatten
matFlowWatchTerm.value should be(empty)
srcWatchTermF.value should be(empty)
val subDownstream = subscriber.expectSubscription()
val subUpstream = publisher.expectSubscription()
subDownstream.request(1)
subUpstream.expectRequest() should be >= (1L)
subUpstream.sendNext(0)
subUpstream.sendNext(1)
subDownstream.cancel()
//subflow not materialized yet, hence mat value (future) isn't ready yet
matFlowWatchTerm.value should be(empty)
srcWatchTermF.value should be(empty)
//this one is sent AFTER downstream cancellation
subUpstream.sendNext(2)
subUpstream.expectCancellation()
matFlowWatchTerm.futureValue should ===(Done)
srcWatchTermF.futureValue should ===(Done)
}
"early downstream failure is deferred until prefix completion" in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[Int]()
val subscriber = TestSubscriber.manualProbe[Int]()
val (srcWatchTermF, matFlowWatchTermFF) = Source
.fromPublisher(publisher)
.watchTermination()(Keep.right)
.flatMapPrefixMat(3) { prefix =>
prefix should ===(0 until 3)
Flow[Int].watchTermination()(Keep.right)
}(Keep.both)
.to(Sink.fromSubscriber(subscriber))
.run()
val matFlowWatchTerm = matFlowWatchTermFF.flatten
matFlowWatchTerm.value should be(empty)
srcWatchTermF.value should be(empty)
val subDownstream = subscriber.expectSubscription()
val subUpstream = publisher.expectSubscription()
subDownstream.request(1)
subUpstream.expectRequest() should be >= (1L)
subUpstream.sendNext(0)
subUpstream.sendNext(1)
subDownstream.asInstanceOf[SubscriptionWithCancelException].cancel(TE("that again?!"))
matFlowWatchTerm.value should be(empty)
srcWatchTermF.value should be(empty)
subUpstream.sendNext(2)
matFlowWatchTerm.failed.futureValue should ===(TE("that again?!"))
srcWatchTermF.failed.futureValue should ===(TE("that again?!"))
subUpstream.expectCancellation()
}
"downstream failure is propagated via the materialized flow" in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[Int]()
val subscriber = TestSubscriber.manualProbe[Int]()
val ((srcWatchTermF, notUsedF), suffixF) = src10()
.watchTermination()(Keep.right)
.flatMapPrefixMat(2) { prefix =>
prefix should ===(0 until 2)
Flow.fromSinkAndSourceCoupled(Sink.fromSubscriber(subscriber), Source.fromPublisher(publisher))
}(Keep.both)
.map {
case 2 => 2
case 3 => throw TE("3!?!?!?")
case i => fail(s"unexpected value $i")
}
.toMat(Sink.seq)(Keep.both)
.run()
notUsedF.value should be(empty)
suffixF.value should be(empty)
srcWatchTermF.value should be(empty)
val subUpstream = publisher.expectSubscription()
val subDownstream = subscriber.expectSubscription()
notUsedF.futureValue should ===(NotUsed)
subUpstream.expectRequest() should be >= (1L)
subDownstream.request(1)
subscriber.expectNext(2)
subUpstream.sendNext(2)
subDownstream.request(1)
subscriber.expectNext(3)
subUpstream.sendNext(3)
subUpstream.expectCancellation() should ===(TE("3!?!?!?"))
subscriber.expectError(TE("3!?!?!?"))
suffixF.failed.futureValue should ===(TE("3!?!?!?"))
srcWatchTermF.failed.futureValue should ===(TE("3!?!?!?"))
}
"complete mat value with failures on abrupt termination before materializing the flow" in assertAllStagesStopped {
val mat = Materializer(system)
val publisher = TestPublisher.manualProbe[Int]()
val flow = Source
.fromPublisher(publisher)
.flatMapPrefixMat(2) { prefix =>
fail(s"unexpected prefix (length = ${prefix.size})")
Flow[Int]
}(Keep.right)
.toMat(Sink.ignore)(Keep.both)
val (prefixF, doneF) = flow.run()(mat)
publisher.expectSubscription()
prefixF.value should be(empty)
doneF.value should be(empty)
mat.shutdown()
prefixF.failed.futureValue match {
case _: AbruptTerminationException =>
case ex: NeverMaterializedException =>
ex.getCause should not be null
ex.getCause should be(a[AbruptTerminationException])
}
doneF.failed.futureValue should be(a[AbruptTerminationException])
}
"respond to abrupt termination after flow materialization" in assertAllStagesStopped {
val mat = Materializer(system)
val countFF = src10()
.flatMapPrefixMat(2) { prefix =>
prefix should ===(0 until 2)
Flow[Int]
.concat(Source.repeat(3))
.fold(0L) {
case (acc, _) => acc + 1
}
.alsoToMat(Sink.head)(Keep.right)
}(Keep.right)
.to(Sink.ignore)
.run()(mat)
val countF = countFF.futureValue
//at this point we know the flow was materialized, now we can stop the materializer
mat.shutdown()
//expect the nested flow to be terminated abruptly.
countF.failed.futureValue should be(a[AbruptStageTerminationException])
}
"behave like via when n = 0" in assertAllStagesStopped {
val (prefixF, suffixF) = src10()
.flatMapPrefixMat(0) { prefix =>
prefix should be(empty)
Flow[Int].mapMaterializedValue(_ => prefix)
}(Keep.right)
.toMat(Sink.seq)(Keep.both)
.run()
prefixF.futureValue should be(empty)
suffixF.futureValue should ===(0 until 10)
}
"behave like via when n = 0 and upstream produces no elements" in assertAllStagesStopped {
val (prefixF, suffixF) = Source
.empty[Int]
.flatMapPrefixMat(0) { prefix =>
prefix should be(empty)
Flow[Int].mapMaterializedValue(_ => prefix)
}(Keep.right)
.toMat(Sink.seq)(Keep.both)
.run()
prefixF.futureValue should be(empty)
suffixF.futureValue should be(empty)
}
"propagate errors during flow's creation when n = 0" in assertAllStagesStopped {
val (prefixF, suffixF) = src10()
.flatMapPrefixMat(0) { prefix =>
prefix should be(empty)
throw TE("not this time my friend!")
Flow[Int].mapMaterializedValue(_ => prefix)
}(Keep.right)
.toMat(Sink.seq)(Keep.both)
.run()
prefixF.failed.futureValue should be(a[NeverMaterializedException])
prefixF.failed.futureValue.getCause === (TE("not this time my friend!"))
suffixF.failed.futureValue should ===(TE("not this time my friend!"))
}
"propagate materialization failures when n = 0" in assertAllStagesStopped {
val (prefixF, suffixF) = src10()
.flatMapPrefixMat(0) { prefix =>
prefix should be(empty)
Flow[Int].mapMaterializedValue(_ => throw TE("Bang! no materialization this time"))
}(Keep.right)
.toMat(Sink.seq)(Keep.both)
.run()
prefixF.failed.futureValue should be(a[NeverMaterializedException])
prefixF.failed.futureValue.getCause === (TE("Bang! no materialization this time"))
suffixF.failed.futureValue should ===(TE("Bang! no materialization this time"))
}
"run a detached flow" in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[Int]()
val subscriber = TestSubscriber.manualProbe[String]()
val detachedFlow = Flow.fromSinkAndSource(Sink.cancelled[Int], Source(List("a", "b", "c"))).via {
Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber), Source.empty[Int])
}
val fHeadOpt = Source
.fromPublisher(publisher)
.flatMapPrefix(2) { prefix =>
prefix should ===(0 until 2)
detachedFlow
}
.runWith(Sink.headOption)
subscriber.expectNoMessage()
val subsc = publisher.expectSubscription()
subsc.expectRequest() should be >= 2L
subsc.sendNext(0)
subscriber.expectNoMessage()
subsc.sendNext(1)
val sinkSubscription = subscriber.expectSubscription()
//this indicates
fHeadOpt.futureValue should be(empty)
//materializef flow immediately cancels upstream
subsc.expectCancellation()
//at this point both ends of the 'external' fow are closed
sinkSubscription.request(10)
subscriber.expectNext("a", "b", "c")
subscriber.expectComplete()
}
}
}

View file

@ -55,6 +55,7 @@ import akka.stream._
val detacher = name("detacher") val detacher = name("detacher")
val groupBy = name("groupBy") val groupBy = name("groupBy")
val prefixAndTail = name("prefixAndTail") val prefixAndTail = name("prefixAndTail")
val flatMapPrefix = name("flatMapPrefix")
val split = name("split") val split = name("split")
val concatAll = name("concatAll") val concatAll = name("concatAll")
val processor = name("processor") val processor = name("processor")

View file

@ -0,0 +1,170 @@
/*
* Copyright (C) 2015-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl.fusing
import akka.annotation.InternalApi
import akka.stream.scaladsl.{ Flow, Keep, Source }
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler }
import akka.stream._
import akka.stream.impl.Stages.DefaultAttributes
import akka.util.OptionVal
import scala.collection.immutable
import scala.concurrent.{ Future, Promise }
import scala.util.control.NonFatal
@InternalApi private[akka] final class FlatMapPrefix[In, Out, M](n: Int, f: immutable.Seq[In] => Flow[In, Out, M])
extends GraphStageWithMaterializedValue[FlowShape[In, Out], Future[M]] {
require(n >= 0, s"FlatMapPrefix: n ($n) must be non-negative.")
val in = Inlet[In](s"${this}.in")
val out = Outlet[Out](s"${this}.out")
override val shape: FlowShape[In, Out] = FlowShape(in, out)
override def initialAttributes: Attributes = DefaultAttributes.flatMapPrefix
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
val matPromise = Promise[M]
val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
val accumulated = collection.mutable.Buffer.empty[In]
private var subSource = OptionVal.none[SubSourceOutlet[In]]
private var subSink = OptionVal.none[SubSinkInlet[Out]]
private var downstreamCause = OptionVal.none[Throwable]
setHandlers(in, out, this)
override def postStop(): Unit = {
//this covers the case when the nested flow was never materialized
matPromise.tryFailure(new AbruptStageTerminationException(this))
super.postStop()
}
override def onPush(): Unit = {
subSource match {
case OptionVal.Some(s) => s.push(grab(in))
case OptionVal.None =>
accumulated.append(grab(in))
if (accumulated.size == n) {
materializeFlow()
} else {
//gi'me some more!
pull(in)
}
}
}
override def onUpstreamFinish(): Unit = {
subSource match {
case OptionVal.Some(s) => s.complete()
case OptionVal.None => materializeFlow()
}
}
override def onUpstreamFailure(ex: Throwable): Unit = {
subSource match {
case OptionVal.Some(s) => s.fail(ex)
case OptionVal.None =>
//flow won't be materialized, so we have to complete the future with a failure indicating this
matPromise.failure(new NeverMaterializedException(ex))
super.onUpstreamFailure(ex)
}
}
override def onPull(): Unit = {
subSink match {
case OptionVal.Some(s) =>
//delegate to subSink
s.pull()
case OptionVal.None if accumulated.size < n =>
pull(in)
case OptionVal.None if accumulated.size == n =>
//corner case for n = 0, can be handled in FlowOps
materializeFlow()
}
}
override def onDownstreamFinish(cause: Throwable): Unit = {
subSink match {
case OptionVal.None => downstreamCause = OptionVal.Some(cause)
case OptionVal.Some(s) => s.cancel(cause)
}
}
def materializeFlow(): Unit =
try {
val prefix = accumulated.toVector
accumulated.clear()
subSource = OptionVal.Some(new SubSourceOutlet[In](s"${this}.subSource"))
val OptionVal.Some(theSubSource) = subSource
theSubSource.setHandler {
new OutHandler {
override def onPull(): Unit = {
if (!isClosed(in) && !hasBeenPulled(in)) {
pull(in)
}
}
override def onDownstreamFinish(cause: Throwable): Unit = {
if (!isClosed(in)) {
cancel(in, cause)
}
}
}
}
subSink = OptionVal.Some(new SubSinkInlet[Out](s"${this}.subSink"))
val OptionVal.Some(theSubSink) = subSink
theSubSink.setHandler {
new InHandler {
override def onPush(): Unit = {
push(out, theSubSink.grab())
}
override def onUpstreamFinish(): Unit = {
complete(out)
}
override def onUpstreamFailure(ex: Throwable): Unit = {
fail(out, ex)
}
}
}
val matVal = try {
val flow = f(prefix)
val runnableGraph = Source.fromGraph(theSubSource.source).viaMat(flow)(Keep.right).to(theSubSink.sink)
interpreter.subFusingMaterializer.materialize(runnableGraph)
} catch {
case NonFatal(ex) =>
matPromise.failure(new NeverMaterializedException(ex))
subSource = OptionVal.None
subSink = OptionVal.None
throw ex
}
matPromise.success(matVal)
//in case downstream was closed
downstreamCause match {
case OptionVal.Some(ex) => theSubSink.cancel(ex)
case OptionVal.None =>
}
//in case we've materialized due to upstream completion
if (isClosed(in)) {
theSubSource.complete()
}
//in case we've been pulled by downstream
if (isAvailable(out)) {
theSubSink.pull()
}
} catch {
case NonFatal(ex) => failStage(ex)
}
}
(logic, matPromise.future)
}
}

View file

@ -12,11 +12,11 @@ import akka.annotation.InternalApi
import akka.stream.ActorAttributes.StreamSubscriptionTimeout import akka.stream.ActorAttributes.StreamSubscriptionTimeout
import akka.stream.ActorAttributes.SupervisionStrategy import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream._ import akka.stream._
import akka.stream.impl.ActorSubscriberMessage
import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.fusing.GraphStages.SingleSource
import akka.stream.impl.ActorSubscriberMessage
import akka.stream.impl.SubscriptionTimeoutException import akka.stream.impl.SubscriptionTimeoutException
import akka.stream.impl.TraversalBuilder import akka.stream.impl.TraversalBuilder
import akka.stream.impl.fusing.GraphStages.SingleSource
import akka.stream.impl.{ Buffer => BufferImpl } import akka.stream.impl.{ Buffer => BufferImpl }
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.stage._ import akka.stream.stage._

View file

@ -2024,6 +2024,47 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def prefixAndTail(n: Int): javadsl.Flow[In, akka.japi.Pair[java.util.List[Out], javadsl.Source[Out, NotUsed]], Mat] = def prefixAndTail(n: Int): javadsl.Flow[In, akka.japi.Pair[java.util.List[Out], javadsl.Source[Out, NotUsed]], Mat] =
new Flow(delegate.prefixAndTail(n).map { case (taken, tail) => akka.japi.Pair(taken.asJava, tail.asJava) }) new Flow(delegate.prefixAndTail(n).map { case (taken, tail) => akka.japi.Pair(taken.asJava, tail.asJava) })
/**
* Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements),
* then apply `f` on these elements in order to obtain a flow, this flow is then materialized and the rest of the input is processed by this flow (similar to via).
* This method returns a flow consuming the rest of the stream producing the materialized flow's output.
*
* '''Emits when''' the materialized flow emits.
* Notice the first `n` elements are buffered internally before materializing the flow and connecting it to the rest of the upstream - producing elements at its own discretion (might 'swallow' or multiply elements).
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the materialized flow completes.
* If upstream completes before producing `n` elements, `f` will be applied with the provided elements,
* the resulting flow will be materialized and signalled for upstream completion, it can then complete or continue to emit elements at its own discretion.
*
* '''Cancels when''' the materialized flow cancels.
* Notice that when downstream cancels prior to prefix completion, the cancellation cause is stashed until prefix completion (or upstream completion) and then handed to the materialized flow.
*
* @param n the number of elements to accumulate before materializing the downstream flow.
* @param f a function that produces the downstream flow based on the upstream's prefix.
**/
def flatMapPrefix[Out2, Mat2](
n: Int,
f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.Flow[In, Out2, Mat] = {
val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala)
new javadsl.Flow(newDelegate)
}
/**
* mat version of [[#flatMapPrefix]], this method gives access to a future materialized value of the downstream flow (as a completion stage).
* see [[#flatMapPrefix]] for details.
*/
def flatMapPrefixMat[Out2, Mat2, Mat3](
n: Int,
f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]],
matF: function.Function2[Mat, CompletionStage[Mat2], Mat3]): javadsl.Flow[In, Out2, Mat3] = {
val newDelegate = delegate.flatMapPrefixMat(n)(seq => f(seq.asJava).asScala) { (m1, fm2) =>
matF(m1, fm2.toJava)
}
new javadsl.Flow(newDelegate)
}
/** /**
* This operation demultiplexes the incoming stream into separate output * This operation demultiplexes the incoming stream into separate output
* streams, one for each element key. The key is computed for each element * streams, one for each element key. The key is computed for each element

View file

@ -3148,6 +3148,47 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
Mat] = Mat] =
new Source(delegate.prefixAndTail(n).map { case (taken, tail) => Pair(taken.asJava, tail.asJava) }) new Source(delegate.prefixAndTail(n).map { case (taken, tail) => Pair(taken.asJava, tail.asJava) })
/**
* Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements),
* then apply `f` on these elements in order to obtain a flow, this flow is then materialized and the rest of the input is processed by this flow (similar to via).
* This method returns a flow consuming the rest of the stream producing the materialized flow's output.
*
* '''Emits when''' the materialized flow emits.
* Notice the first `n` elements are buffered internally before materializing the flow and connecting it to the rest of the upstream - producing elements at its own discretion (might 'swallow' or multiply elements).
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the materialized flow completes.
* If upstream completes before producing `n` elements, `f` will be applied with the provided elements,
* the resulting flow will be materialized and signalled for upstream completion, it can then complete or continue to emit elements at its own discretion.
*
* '''Cancels when''' the materialized flow cancels.
* Notice that when downstream cancels prior to prefix completion, the cancellation cause is stashed until prefix completion (or upstream completion) and then handed to the materialized flow.
*
* @param n the number of elements to accumulate before materializing the downstream flow.
* @param f a function that produces the downstream flow based on the upstream's prefix.
**/
def flatMapPrefix[Out2, Mat2](
n: Int,
f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.Source[Out2, Mat] = {
val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala)
new javadsl.Source(newDelegate)
}
/**
* mat version of [[#flatMapPrefix]], this method gives access to a future materialized value of the downstream flow (as a completion stage).
* see [[#flatMapPrefix]] for details.
*/
def flatMapPrefixMat[Out2, Mat2, Mat3](
n: Int,
f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]],
matF: function.Function2[Mat, CompletionStage[Mat2], Mat3]): javadsl.Source[Out2, Mat3] = {
val newDelegate = delegate.flatMapPrefixMat(n)(seq => f(seq.asJava).asScala) { (m1, fm2) =>
matF(m1, fm2.toJava)
}
new javadsl.Source(newDelegate)
}
/** /**
* This operation demultiplexes the incoming stream into separate output * This operation demultiplexes the incoming stream into separate output
* streams, one for each element key. The key is computed for each element * streams, one for each element key. The key is computed for each element

View file

@ -1380,6 +1380,33 @@ class SubFlow[In, Out, Mat](
Mat] = Mat] =
new SubFlow(delegate.prefixAndTail(n).map { case (taken, tail) => akka.japi.Pair(taken.asJava, tail.asJava) }) new SubFlow(delegate.prefixAndTail(n).map { case (taken, tail) => akka.japi.Pair(taken.asJava, tail.asJava) })
/**
* Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements),
* then apply `f` on these elements in order to obtain a flow, this flow is then materialized and the rest of the input is processed by this flow (similar to via).
* This method returns a flow consuming the rest of the stream producing the materialized flow's output.
*
* '''Emits when''' the materialized flow emits.
* Notice the first `n` elements are buffered internally before materializing the flow and connecting it to the rest of the upstream - producing elements at its own discretion (might 'swallow' or multiply elements).
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the materialized flow completes.
* If upstream completes before producing `n` elements, `f` will be applied with the provided elements,
* the resulting flow will be materialized and signalled for upstream completion, it can then complete or continue to emit elements at its own discretion.
*
* '''Cancels when''' the materialized flow cancels.
* Notice that when downstream cancels prior to prefix completion, the cancellation cause is stashed until prefix completion (or upstream completion) and then handed to the materialized flow.
*
* @param n the number of elements to accumulate before materializing the downstream flow.
* @param f a function that produces the downstream flow based on the upstream's prefix.
**/
def flatMapPrefix[Out2, Mat2](
n: Int,
f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): SubFlow[In, Out2, Mat] = {
val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala)
new javadsl.SubFlow(newDelegate)
}
/** /**
* Transform each input element into a `Source` of output elements that is * Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by concatenation, * then flattened into the output stream by concatenation,

View file

@ -1357,6 +1357,33 @@ class SubSource[Out, Mat](
Mat] = Mat] =
new SubSource(delegate.prefixAndTail(n).map { case (taken, tail) => akka.japi.Pair(taken.asJava, tail.asJava) }) new SubSource(delegate.prefixAndTail(n).map { case (taken, tail) => akka.japi.Pair(taken.asJava, tail.asJava) })
/**
* Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements),
* then apply `f` on these elements in order to obtain a flow, this flow is then materialized and the rest of the input is processed by this flow (similar to via).
* This method returns a flow consuming the rest of the stream producing the materialized flow's output.
*
* '''Emits when''' the materialized flow emits.
* Notice the first `n` elements are buffered internally before materializing the flow and connecting it to the rest of the upstream - producing elements at its own discretion (might 'swallow' or multiply elements).
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the materialized flow completes.
* If upstream completes before producing `n` elements, `f` will be applied with the provided elements,
* the resulting flow will be materialized and signalled for upstream completion, it can then complete or continue to emit elements at its own discretion.
*
* '''Cancels when''' the materialized flow cancels.
* Notice that when downstream cancels prior to prefix completion, the cancellation cause is stashed until prefix completion (or upstream completion) and then handed to the materialized flow.
*
* @param n the number of elements to accumulate before materializing the downstream flow.
* @param f a function that produces the downstream flow based on the upstream's prefix.
**/
def flatMapPrefix[Out2, Mat2](
n: Int,
f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.SubSource[Out2, Mat] = {
val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala)
new javadsl.SubSource(newDelegate)
}
/** /**
* Transform each input element into a `Source` of output elements that is * Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by concatenation, * then flattened into the output stream by concatenation,

View file

@ -1929,6 +1929,30 @@ trait FlowOps[+Out, +Mat] {
def prefixAndTail[U >: Out](n: Int): Repr[(immutable.Seq[Out], Source[U, NotUsed])] = def prefixAndTail[U >: Out](n: Int): Repr[(immutable.Seq[Out], Source[U, NotUsed])] =
via(new PrefixAndTail[Out](n)) via(new PrefixAndTail[Out](n))
/**
* Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements),
* then apply `f` on these elements in order to obtain a flow, this flow is then materialized and the rest of the input is processed by this flow (similar to via).
* This method returns a flow consuming the rest of the stream producing the materialized flow's output.
*
* '''Emits when''' the materialized flow emits.
* Notice the first `n` elements are buffered internally before materializing the flow and connecting it to the rest of the upstream - producing elements at its own discretion (might 'swallow' or multiply elements).
*
* '''Backpressures when''' the materialized flow backpressures
*
* '''Completes when''' the materialized flow completes.
* If upstream completes before producing `n` elements, `f` will be applied with the provided elements,
* the resulting flow will be materialized and signalled for upstream completion, it can then complete or continue to emit elements at its own discretion.
*
* '''Cancels when''' the materialized flow cancels.
* Notice that when downstream cancels prior to prefix completion, the cancellation cause is stashed until prefix completion (or upstream completion) and then handed to the materialized flow.
*
* @param n the number of elements to accumulate before materializing the downstream flow.
* @param f a function that produces the downstream flow based on the upstream's prefix.
**/
def flatMapPrefix[Out2, Mat2](n: Int)(f: immutable.Seq[Out] => Flow[Out, Out2, Mat2]): Repr[Out2] = {
via(new FlatMapPrefix(n, f))
}
/** /**
* This operation demultiplexes the incoming stream into separate output * This operation demultiplexes the incoming stream into separate output
* streams, one for each element key. The key is computed for each element * streams, one for each element key. The key is computed for each element
@ -3123,6 +3147,15 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
*/ */
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) => Mat3): ClosedMat[Mat3] def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) => Mat3): ClosedMat[Mat3]
/**
* mat version of [[#flatMapPrefix]], this method gives access to a future materialized value of the downstream flow.
* see [[#flatMapPrefix]] for details.
*/
def flatMapPrefixMat[Out2, Mat2, Mat3](n: Int)(f: immutable.Seq[Out] => Flow[Out, Out2, Mat2])(
matF: (Mat, Future[Mat2]) => Mat3): ReprMat[Out2, Mat3] = {
viaMat(new FlatMapPrefix(n, f))(matF)
}
/** /**
* Combine the elements of current flow and the given [[Source]] into a stream of tuples. * Combine the elements of current flow and the given [[Source]] into a stream of tuples.
* *