From ccd8481fec4c8951897ff470e02e19dcfbdc2120 Mon Sep 17 00:00:00 2001 From: eyal farago Date: Wed, 5 Feb 2020 15:37:27 +0100 Subject: [PATCH] stream: add flatMapPrefix operator (#28380) --- .../operators/Source-or-Flow/flatMapPrefix.md | 37 ++ .../main/paradox/stream/operators/index.md | 2 + .../akka/stream/tck/FlatMapPrefixTest.scala | 20 + .../scaladsl/FlowFlatMapPrefixSpec.scala | 544 ++++++++++++++++++ .../main/scala/akka/stream/impl/Stages.scala | 1 + .../stream/impl/fusing/FlatMapPrefix.scala | 170 ++++++ .../stream/impl/fusing/StreamOfStreams.scala | 4 +- .../main/scala/akka/stream/javadsl/Flow.scala | 41 ++ .../scala/akka/stream/javadsl/Source.scala | 41 ++ .../scala/akka/stream/javadsl/SubFlow.scala | 27 + .../scala/akka/stream/javadsl/SubSource.scala | 27 + .../scala/akka/stream/scaladsl/Flow.scala | 33 ++ 12 files changed, 945 insertions(+), 2 deletions(-) create mode 100644 akka-docs/src/main/paradox/stream/operators/Source-or-Flow/flatMapPrefix.md create mode 100644 akka-stream-tests-tck/src/test/scala/akka/stream/tck/FlatMapPrefixTest.scala create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl/fusing/FlatMapPrefix.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/flatMapPrefix.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/flatMapPrefix.md new file mode 100644 index 0000000000..e2dff32a5c --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/flatMapPrefix.md @@ -0,0 +1,37 @@ +# flatMapPrefix + +Use the first `n` elements from the stream to determine how to process the rest. + +@ref[Nesting and flattening operators](../index.md#nesting-and-flattening-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #flatMapPrefix } + +@@@ + +## Description + +Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements), +then apply *f* on these elements in order to obtain a flow, this flow is then materialized and the rest of the input is processed by this flow (similar to via). +This method returns a flow consuming the rest of the stream producing the materialized flow's output. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the materialized flow emits. + Notice the first `n` elements are buffered internally before materializing the flow and connecting it to the rest of the upstream - producing elements at its own discretion (might 'swallow' or multiply elements). + +**backpressures** when the materialized flow backpressures + +**completes** the materialized flow completes. + If upstream completes before producing `n` elements, `f` will be applied with the provided elements, + the resulting flow will be materialized and signalled for upstream completion, it can then or continue to emit elements at its own discretion. + + +@@@ + + diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index ae90373ca9..9ac5fbc2c9 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -241,6 +241,7 @@ See the @ref:[Substreams](../stream-substream.md) page for more detail and code |--|--|--| |Source/Flow|@ref[flatMapConcat](Source-or-Flow/flatMapConcat.md)|Transform each input element into a `Source` whose elements are then flattened into the output stream through concatenation.| |Source/Flow|@ref[flatMapMerge](Source-or-Flow/flatMapMerge.md)|Transform each input element into a `Source` whose elements are then flattened into the output stream through merging.| +|Source/Flow|@ref[flatMapPrefix](Source-or-Flow/flatMapPrefix.md)|Use the first `n` elements from the stream to determine how to process the rest.| |Source/Flow|@ref[groupBy](Source-or-Flow/groupBy.md)|Demultiplex the incoming stream into separate output streams.| |Source/Flow|@ref[prefixAndTail](Source-or-Flow/prefixAndTail.md)|Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements) and returns a pair containing a strict sequence of the taken element and a stream representing the remaining elements.| |Source/Flow|@ref[splitAfter](Source-or-Flow/splitAfter.md)|End the current substream whenever a predicate returns `true`, starting a new substream for the next element.| @@ -440,6 +441,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [extrapolate](Source-or-Flow/extrapolate.md) * [buffer](Source-or-Flow/buffer.md) * [prefixAndTail](Source-or-Flow/prefixAndTail.md) +* [flatMapPrefix](Source-or-Flow/flatMapPrefix.md) * [groupBy](Source-or-Flow/groupBy.md) * [splitWhen](Source-or-Flow/splitWhen.md) * [splitAfter](Source-or-Flow/splitAfter.md) diff --git a/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FlatMapPrefixTest.scala b/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FlatMapPrefixTest.scala new file mode 100644 index 0000000000..a3d7caa0bd --- /dev/null +++ b/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FlatMapPrefixTest.scala @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package akka.stream.tck + +import akka.stream.scaladsl.{ Flow, Keep, Sink, Source } +import org.reactivestreams.Publisher + +class FlatMapPrefixTest extends AkkaPublisherVerification[Int] { + override def createPublisher(elements: Long): Publisher[Int] = { + val publisher = Source(iterable(elements)) + .map(_.toInt) + .flatMapPrefixMat(1) { seq => + Flow[Int].prepend(Source(seq)) + }(Keep.left) + .runWith(Sink.asPublisher(false)) + publisher + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala new file mode 100644 index 0000000000..f0a6bfaa05 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala @@ -0,0 +1,544 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.stream.testkit.Utils.TE +import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped +import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } +import akka.stream.{ + AbruptStageTerminationException, + AbruptTerminationException, + Materializer, + NeverMaterializedException, + SubscriptionWithCancelException +} +import akka.{ Done, NotUsed } + +class FlowFlatMapPrefixSpec extends StreamSpec { + def src10(i: Int = 0) = Source(i until (i + 10)) + + "A PrefixAndDownstream" must { + + "work in the simple identity case" in assertAllStagesStopped { + src10() + .flatMapPrefixMat(2) { _ => + Flow[Int] + }(Keep.left) + .runWith(Sink.seq[Int]) + .futureValue should ===(2 until 10) + } + + "expose mat value in the simple identity case" in assertAllStagesStopped { + val (prefixF, suffixF) = src10() + .flatMapPrefixMat(2) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run + + prefixF.futureValue should ===(0 until 2) + suffixF.futureValue should ===(2 until 10) + } + + "work when source is exactly the required prefix" in assertAllStagesStopped { + val (prefixF, suffixF) = src10() + .flatMapPrefixMat(10) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run + + prefixF.futureValue should ===(0 until 10) + suffixF.futureValue should be(empty) + } + + "work when source has less than the required prefix" in assertAllStagesStopped { + val (prefixF, suffixF) = src10() + .flatMapPrefixMat(20) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run + + prefixF.futureValue should ===(0 until 10) + suffixF.futureValue should be(empty) + } + + "simple identity case when downstream completes before consuming the entire stream" in assertAllStagesStopped { + val (prefixF, suffixF) = Source(0 until 100) + .flatMapPrefixMat(10) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix) + }(Keep.right) + .take(10) + .toMat(Sink.seq)(Keep.both) + .run + + prefixF.futureValue should ===(0 until 10) + suffixF.futureValue should ===(10 until 20) + } + + "propagate failure to create the downstream flow" in assertAllStagesStopped { + val suffixF = Source(0 until 100) + .flatMapPrefixMat(10) { prefix => + throw TE(s"I hate mondays! (${prefix.size})") + }(Keep.right) + .to(Sink.ignore) + .run + + val ex = suffixF.failed.futureValue + ex.getCause should not be null + ex.getCause should ===(TE("I hate mondays! (10)")) + } + + "propagate flow failures" in assertAllStagesStopped { + val (prefixF, suffixF) = Source(0 until 100) + .flatMapPrefixMat(10) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix).map { + case 15 => throw TE("don't like 15 either!") + case n => n + } + }(Keep.right) + .toMat(Sink.ignore)(Keep.both) + .run + prefixF.futureValue should ===(0 until 10) + val ex = suffixF.failed.futureValue + ex should ===(TE("don't like 15 either!")) + } + + "produce multiple elements per input" in assertAllStagesStopped { + val (prefixF, suffixF) = src10() + .flatMapPrefixMat(7) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix).mapConcat(n => List.fill(n - 6)(n)) + }(Keep.right) + .toMat(Sink.seq[Int])(Keep.both) + .run() + + prefixF.futureValue should ===(0 until 7) + suffixF.futureValue should ===(7 :: 8 :: 8 :: 9 :: 9 :: 9 :: Nil) + } + + "succeed when upstream produces no elements" in assertAllStagesStopped { + val (prefixF, suffixF) = Source + .empty[Int] + .flatMapPrefixMat(7) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix).mapConcat(n => List.fill(n - 6)(n)) + }(Keep.right) + .toMat(Sink.seq[Int])(Keep.both) + .run() + + prefixF.futureValue should be(empty) + suffixF.futureValue should be(empty) + } + + "apply materialized flow's semantics when upstream produces no elements" in assertAllStagesStopped { + val (prefixF, suffixF) = Source + .empty[Int] + .flatMapPrefixMat(7) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix).mapConcat(n => List.fill(n - 6)(n)).prepend(Source(100 to 101)) + }(Keep.right) + .toMat(Sink.seq[Int])(Keep.both) + .run() + + prefixF.futureValue should be(empty) + suffixF.futureValue should ===(100 :: 101 :: Nil) + } + + "handles upstream completion" in assertAllStagesStopped { + val publisher = TestPublisher.manualProbe[Int]() + val subscriber = TestSubscriber.manualProbe[Int]() + + val matValue = Source + .fromPublisher(publisher) + .flatMapPrefixMat(2) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix).prepend(Source(100 to 101)) + }(Keep.right) + .to(Sink.fromSubscriber(subscriber)) + .run() + + matValue.value should be(empty) + + val upstream = publisher.expectSubscription() + val downstream = subscriber.expectSubscription() + + downstream.request(1000) + + upstream.expectRequest() + //completing publisher + upstream.sendComplete() + + matValue.futureValue should ===(Nil) + + subscriber.expectNext(100) + + subscriber.expectNext(101).expectComplete() + + } + + "work when materialized flow produces no downstream elements" in assertAllStagesStopped { + val (prefixF, suffixF) = Source(0 until 100) + .flatMapPrefixMat(4) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix).filter(_ => false) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run + + prefixF.futureValue should ===(0 until 4) + suffixF.futureValue should be(empty) + } + + "work when materialized flow does not consume upstream" in assertAllStagesStopped { + val (prefixF, suffixF) = Source(0 until 100) + .map { i => + i should be <= 4 + i + } + .flatMapPrefixMat(4) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix).take(0) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run + + prefixF.futureValue should ===(0 until 4) + suffixF.futureValue should be(empty) + } + + "work when materialized flow cancels upstream but keep producing" in assertAllStagesStopped { + val (prefixF, suffixF) = src10() + .flatMapPrefixMat(4) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix).take(0).concat(Source(11 to 12)) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run + + prefixF.futureValue should ===(0 until 4) + suffixF.futureValue should ===(11 :: 12 :: Nil) + } + + "propagate materialization failure (when application of 'f' succeeds)" in assertAllStagesStopped { + val (prefixF, suffixF) = src10() + .flatMapPrefixMat(4) { prefix => + Flow[Int].mapMaterializedValue(_ => throw TE(s"boom-bada-bang (${prefix.size})")) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run + + prefixF.failed.futureValue should be(a[NeverMaterializedException]) + prefixF.failed.futureValue.getCause should ===(TE("boom-bada-bang (4)")) + suffixF.failed.futureValue should ===(TE("boom-bada-bang (4)")) + } + + "succeed when materialized flow completes downstream but keep consuming elements" in assertAllStagesStopped { + val (prefixAndTailF, suffixF) = src10() + .flatMapPrefixMat(4) { prefix => + Flow[Int] + .mapMaterializedValue(_ => prefix) + .viaMat { + Flow.fromSinkAndSourceMat(Sink.seq[Int], Source.empty[Int])(Keep.left) + }(Keep.both) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run + + suffixF.futureValue should be(empty) + val (prefix, suffix) = prefixAndTailF.futureValue + prefix should ===(0 until 4) + suffix.futureValue should ===(4 until 10) + } + + "downstream cancellation is propagated via the materialized flow" in assertAllStagesStopped { + val publisher = TestPublisher.manualProbe[Int]() + val subscriber = TestSubscriber.manualProbe[Int]() + + val ((srcWatchTermF, notUsedF), suffixF) = src10() + .watchTermination()(Keep.right) + .flatMapPrefixMat(2) { prefix => + prefix should ===(0 until 2) + Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber), Source.fromPublisher(publisher)) + }(Keep.both) + .take(1) + .toMat(Sink.seq)(Keep.both) + .run() + + notUsedF.value should be(empty) + suffixF.value should be(empty) + srcWatchTermF.value should be(empty) + + val subUpstream = publisher.expectSubscription() + val subDownstream = subscriber.expectSubscription() + + notUsedF.futureValue should ===(NotUsed) + + subUpstream.expectRequest() should be >= (1L) + subDownstream.request(1) + subscriber.expectNext(2) + subUpstream.sendNext(22) + subUpstream.expectCancellation() + subDownstream.cancel() + + suffixF.futureValue should ===(Seq(22)) + srcWatchTermF.futureValue should ===(Done) + } + + "early downstream cancellation is later handed out to materialized flow" in assertAllStagesStopped { + val publisher = TestPublisher.manualProbe[Int]() + val subscriber = TestSubscriber.manualProbe[Int]() + + val (srcWatchTermF, matFlowWatchTermFF) = Source + .fromPublisher(publisher) + .watchTermination()(Keep.right) + .flatMapPrefixMat(3) { prefix => + prefix should ===(0 until 3) + Flow[Int].watchTermination()(Keep.right) + }(Keep.both) + .to(Sink.fromSubscriber(subscriber)) + .run() + val matFlowWatchTerm = matFlowWatchTermFF.flatten + + matFlowWatchTerm.value should be(empty) + srcWatchTermF.value should be(empty) + + val subDownstream = subscriber.expectSubscription() + val subUpstream = publisher.expectSubscription() + subDownstream.request(1) + subUpstream.expectRequest() should be >= (1L) + subUpstream.sendNext(0) + subUpstream.sendNext(1) + subDownstream.cancel() + + //subflow not materialized yet, hence mat value (future) isn't ready yet + matFlowWatchTerm.value should be(empty) + srcWatchTermF.value should be(empty) + + //this one is sent AFTER downstream cancellation + subUpstream.sendNext(2) + + subUpstream.expectCancellation() + + matFlowWatchTerm.futureValue should ===(Done) + srcWatchTermF.futureValue should ===(Done) + + } + + "early downstream failure is deferred until prefix completion" in assertAllStagesStopped { + val publisher = TestPublisher.manualProbe[Int]() + val subscriber = TestSubscriber.manualProbe[Int]() + + val (srcWatchTermF, matFlowWatchTermFF) = Source + .fromPublisher(publisher) + .watchTermination()(Keep.right) + .flatMapPrefixMat(3) { prefix => + prefix should ===(0 until 3) + Flow[Int].watchTermination()(Keep.right) + }(Keep.both) + .to(Sink.fromSubscriber(subscriber)) + .run() + val matFlowWatchTerm = matFlowWatchTermFF.flatten + + matFlowWatchTerm.value should be(empty) + srcWatchTermF.value should be(empty) + + val subDownstream = subscriber.expectSubscription() + val subUpstream = publisher.expectSubscription() + subDownstream.request(1) + subUpstream.expectRequest() should be >= (1L) + subUpstream.sendNext(0) + subUpstream.sendNext(1) + subDownstream.asInstanceOf[SubscriptionWithCancelException].cancel(TE("that again?!")) + + matFlowWatchTerm.value should be(empty) + srcWatchTermF.value should be(empty) + + subUpstream.sendNext(2) + + matFlowWatchTerm.failed.futureValue should ===(TE("that again?!")) + srcWatchTermF.failed.futureValue should ===(TE("that again?!")) + + subUpstream.expectCancellation() + } + + "downstream failure is propagated via the materialized flow" in assertAllStagesStopped { + val publisher = TestPublisher.manualProbe[Int]() + val subscriber = TestSubscriber.manualProbe[Int]() + + val ((srcWatchTermF, notUsedF), suffixF) = src10() + .watchTermination()(Keep.right) + .flatMapPrefixMat(2) { prefix => + prefix should ===(0 until 2) + Flow.fromSinkAndSourceCoupled(Sink.fromSubscriber(subscriber), Source.fromPublisher(publisher)) + }(Keep.both) + .map { + case 2 => 2 + case 3 => throw TE("3!?!?!?") + case i => fail(s"unexpected value $i") + } + .toMat(Sink.seq)(Keep.both) + .run() + + notUsedF.value should be(empty) + suffixF.value should be(empty) + srcWatchTermF.value should be(empty) + + val subUpstream = publisher.expectSubscription() + val subDownstream = subscriber.expectSubscription() + + notUsedF.futureValue should ===(NotUsed) + + subUpstream.expectRequest() should be >= (1L) + subDownstream.request(1) + subscriber.expectNext(2) + subUpstream.sendNext(2) + subDownstream.request(1) + subscriber.expectNext(3) + subUpstream.sendNext(3) + subUpstream.expectCancellation() should ===(TE("3!?!?!?")) + subscriber.expectError(TE("3!?!?!?")) + + suffixF.failed.futureValue should ===(TE("3!?!?!?")) + srcWatchTermF.failed.futureValue should ===(TE("3!?!?!?")) + } + + "complete mat value with failures on abrupt termination before materializing the flow" in assertAllStagesStopped { + val mat = Materializer(system) + val publisher = TestPublisher.manualProbe[Int]() + + val flow = Source + .fromPublisher(publisher) + .flatMapPrefixMat(2) { prefix => + fail(s"unexpected prefix (length = ${prefix.size})") + Flow[Int] + }(Keep.right) + .toMat(Sink.ignore)(Keep.both) + + val (prefixF, doneF) = flow.run()(mat) + + publisher.expectSubscription() + prefixF.value should be(empty) + doneF.value should be(empty) + + mat.shutdown() + + prefixF.failed.futureValue match { + case _: AbruptTerminationException => + case ex: NeverMaterializedException => + ex.getCause should not be null + ex.getCause should be(a[AbruptTerminationException]) + } + doneF.failed.futureValue should be(a[AbruptTerminationException]) + } + + "respond to abrupt termination after flow materialization" in assertAllStagesStopped { + val mat = Materializer(system) + val countFF = src10() + .flatMapPrefixMat(2) { prefix => + prefix should ===(0 until 2) + Flow[Int] + .concat(Source.repeat(3)) + .fold(0L) { + case (acc, _) => acc + 1 + } + .alsoToMat(Sink.head)(Keep.right) + }(Keep.right) + .to(Sink.ignore) + .run()(mat) + val countF = countFF.futureValue + //at this point we know the flow was materialized, now we can stop the materializer + mat.shutdown() + //expect the nested flow to be terminated abruptly. + countF.failed.futureValue should be(a[AbruptStageTerminationException]) + } + + "behave like via when n = 0" in assertAllStagesStopped { + val (prefixF, suffixF) = src10() + .flatMapPrefixMat(0) { prefix => + prefix should be(empty) + Flow[Int].mapMaterializedValue(_ => prefix) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run() + + prefixF.futureValue should be(empty) + suffixF.futureValue should ===(0 until 10) + } + + "behave like via when n = 0 and upstream produces no elements" in assertAllStagesStopped { + val (prefixF, suffixF) = Source + .empty[Int] + .flatMapPrefixMat(0) { prefix => + prefix should be(empty) + Flow[Int].mapMaterializedValue(_ => prefix) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run() + + prefixF.futureValue should be(empty) + suffixF.futureValue should be(empty) + } + + "propagate errors during flow's creation when n = 0" in assertAllStagesStopped { + val (prefixF, suffixF) = src10() + .flatMapPrefixMat(0) { prefix => + prefix should be(empty) + throw TE("not this time my friend!") + Flow[Int].mapMaterializedValue(_ => prefix) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run() + + prefixF.failed.futureValue should be(a[NeverMaterializedException]) + prefixF.failed.futureValue.getCause === (TE("not this time my friend!")) + suffixF.failed.futureValue should ===(TE("not this time my friend!")) + } + + "propagate materialization failures when n = 0" in assertAllStagesStopped { + val (prefixF, suffixF) = src10() + .flatMapPrefixMat(0) { prefix => + prefix should be(empty) + Flow[Int].mapMaterializedValue(_ => throw TE("Bang! no materialization this time")) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run() + + prefixF.failed.futureValue should be(a[NeverMaterializedException]) + prefixF.failed.futureValue.getCause === (TE("Bang! no materialization this time")) + suffixF.failed.futureValue should ===(TE("Bang! no materialization this time")) + } + + "run a detached flow" in assertAllStagesStopped { + val publisher = TestPublisher.manualProbe[Int]() + val subscriber = TestSubscriber.manualProbe[String]() + + val detachedFlow = Flow.fromSinkAndSource(Sink.cancelled[Int], Source(List("a", "b", "c"))).via { + Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber), Source.empty[Int]) + } + val fHeadOpt = Source + .fromPublisher(publisher) + .flatMapPrefix(2) { prefix => + prefix should ===(0 until 2) + detachedFlow + } + .runWith(Sink.headOption) + + subscriber.expectNoMessage() + val subsc = publisher.expectSubscription() + subsc.expectRequest() should be >= 2L + subsc.sendNext(0) + subscriber.expectNoMessage() + subsc.sendNext(1) + val sinkSubscription = subscriber.expectSubscription() + //this indicates + fHeadOpt.futureValue should be(empty) + + //materializef flow immediately cancels upstream + subsc.expectCancellation() + //at this point both ends of the 'external' fow are closed + + sinkSubscription.request(10) + subscriber.expectNext("a", "b", "c") + subscriber.expectComplete() + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index c298400b56..0aa4719579 100755 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -55,6 +55,7 @@ import akka.stream._ val detacher = name("detacher") val groupBy = name("groupBy") val prefixAndTail = name("prefixAndTail") + val flatMapPrefix = name("flatMapPrefix") val split = name("split") val concatAll = name("concatAll") val processor = name("processor") diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/FlatMapPrefix.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/FlatMapPrefix.scala new file mode 100644 index 0000000000..bf04363bba --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/FlatMapPrefix.scala @@ -0,0 +1,170 @@ +/* + * Copyright (C) 2015-2020 Lightbend Inc. + */ + +package akka.stream.impl.fusing + +import akka.annotation.InternalApi +import akka.stream.scaladsl.{ Flow, Keep, Source } +import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler } +import akka.stream._ +import akka.stream.impl.Stages.DefaultAttributes +import akka.util.OptionVal + +import scala.collection.immutable +import scala.concurrent.{ Future, Promise } +import scala.util.control.NonFatal + +@InternalApi private[akka] final class FlatMapPrefix[In, Out, M](n: Int, f: immutable.Seq[In] => Flow[In, Out, M]) + extends GraphStageWithMaterializedValue[FlowShape[In, Out], Future[M]] { + + require(n >= 0, s"FlatMapPrefix: n ($n) must be non-negative.") + + val in = Inlet[In](s"${this}.in") + val out = Outlet[Out](s"${this}.out") + override val shape: FlowShape[In, Out] = FlowShape(in, out) + + override def initialAttributes: Attributes = DefaultAttributes.flatMapPrefix + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { + val matPromise = Promise[M] + val logic = new GraphStageLogic(shape) with InHandler with OutHandler { + val accumulated = collection.mutable.Buffer.empty[In] + + private var subSource = OptionVal.none[SubSourceOutlet[In]] + private var subSink = OptionVal.none[SubSinkInlet[Out]] + + private var downstreamCause = OptionVal.none[Throwable] + + setHandlers(in, out, this) + + override def postStop(): Unit = { + //this covers the case when the nested flow was never materialized + matPromise.tryFailure(new AbruptStageTerminationException(this)) + super.postStop() + } + + override def onPush(): Unit = { + subSource match { + case OptionVal.Some(s) => s.push(grab(in)) + case OptionVal.None => + accumulated.append(grab(in)) + if (accumulated.size == n) { + materializeFlow() + } else { + //gi'me some more! + pull(in) + } + } + } + + override def onUpstreamFinish(): Unit = { + subSource match { + case OptionVal.Some(s) => s.complete() + case OptionVal.None => materializeFlow() + } + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + subSource match { + case OptionVal.Some(s) => s.fail(ex) + case OptionVal.None => + //flow won't be materialized, so we have to complete the future with a failure indicating this + matPromise.failure(new NeverMaterializedException(ex)) + super.onUpstreamFailure(ex) + } + } + + override def onPull(): Unit = { + subSink match { + case OptionVal.Some(s) => + //delegate to subSink + s.pull() + case OptionVal.None if accumulated.size < n => + pull(in) + case OptionVal.None if accumulated.size == n => + //corner case for n = 0, can be handled in FlowOps + materializeFlow() + } + } + + override def onDownstreamFinish(cause: Throwable): Unit = { + subSink match { + case OptionVal.None => downstreamCause = OptionVal.Some(cause) + case OptionVal.Some(s) => s.cancel(cause) + } + } + + def materializeFlow(): Unit = + try { + val prefix = accumulated.toVector + accumulated.clear() + subSource = OptionVal.Some(new SubSourceOutlet[In](s"${this}.subSource")) + val OptionVal.Some(theSubSource) = subSource + theSubSource.setHandler { + new OutHandler { + override def onPull(): Unit = { + if (!isClosed(in) && !hasBeenPulled(in)) { + pull(in) + } + } + + override def onDownstreamFinish(cause: Throwable): Unit = { + if (!isClosed(in)) { + cancel(in, cause) + } + } + } + } + subSink = OptionVal.Some(new SubSinkInlet[Out](s"${this}.subSink")) + val OptionVal.Some(theSubSink) = subSink + theSubSink.setHandler { + new InHandler { + override def onPush(): Unit = { + push(out, theSubSink.grab()) + } + + override def onUpstreamFinish(): Unit = { + complete(out) + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + fail(out, ex) + } + } + } + val matVal = try { + val flow = f(prefix) + val runnableGraph = Source.fromGraph(theSubSource.source).viaMat(flow)(Keep.right).to(theSubSink.sink) + interpreter.subFusingMaterializer.materialize(runnableGraph) + } catch { + case NonFatal(ex) => + matPromise.failure(new NeverMaterializedException(ex)) + subSource = OptionVal.None + subSink = OptionVal.None + throw ex + } + matPromise.success(matVal) + + //in case downstream was closed + downstreamCause match { + case OptionVal.Some(ex) => theSubSink.cancel(ex) + case OptionVal.None => + } + + //in case we've materialized due to upstream completion + if (isClosed(in)) { + theSubSource.complete() + } + + //in case we've been pulled by downstream + if (isAvailable(out)) { + theSubSink.pull() + } + } catch { + case NonFatal(ex) => failStage(ex) + } + } + (logic, matPromise.future) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index c15f8819b7..49460d197a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -12,11 +12,11 @@ import akka.annotation.InternalApi import akka.stream.ActorAttributes.StreamSubscriptionTimeout import akka.stream.ActorAttributes.SupervisionStrategy import akka.stream._ -import akka.stream.impl.ActorSubscriberMessage import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.impl.fusing.GraphStages.SingleSource +import akka.stream.impl.ActorSubscriberMessage import akka.stream.impl.SubscriptionTimeoutException import akka.stream.impl.TraversalBuilder -import akka.stream.impl.fusing.GraphStages.SingleSource import akka.stream.impl.{ Buffer => BufferImpl } import akka.stream.scaladsl._ import akka.stream.stage._ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 6d51527000..3a651cbd49 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -2024,6 +2024,47 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def prefixAndTail(n: Int): javadsl.Flow[In, akka.japi.Pair[java.util.List[Out], javadsl.Source[Out, NotUsed]], Mat] = new Flow(delegate.prefixAndTail(n).map { case (taken, tail) => akka.japi.Pair(taken.asJava, tail.asJava) }) + /** + * Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements), + * then apply `f` on these elements in order to obtain a flow, this flow is then materialized and the rest of the input is processed by this flow (similar to via). + * This method returns a flow consuming the rest of the stream producing the materialized flow's output. + * + * '''Emits when''' the materialized flow emits. + * Notice the first `n` elements are buffered internally before materializing the flow and connecting it to the rest of the upstream - producing elements at its own discretion (might 'swallow' or multiply elements). + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the materialized flow completes. + * If upstream completes before producing `n` elements, `f` will be applied with the provided elements, + * the resulting flow will be materialized and signalled for upstream completion, it can then complete or continue to emit elements at its own discretion. + * + * '''Cancels when''' the materialized flow cancels. + * Notice that when downstream cancels prior to prefix completion, the cancellation cause is stashed until prefix completion (or upstream completion) and then handed to the materialized flow. + * + * @param n the number of elements to accumulate before materializing the downstream flow. + * @param f a function that produces the downstream flow based on the upstream's prefix. + **/ + def flatMapPrefix[Out2, Mat2]( + n: Int, + f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.Flow[In, Out2, Mat] = { + val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala) + new javadsl.Flow(newDelegate) + } + + /** + * mat version of [[#flatMapPrefix]], this method gives access to a future materialized value of the downstream flow (as a completion stage). + * see [[#flatMapPrefix]] for details. + */ + def flatMapPrefixMat[Out2, Mat2, Mat3]( + n: Int, + f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]], + matF: function.Function2[Mat, CompletionStage[Mat2], Mat3]): javadsl.Flow[In, Out2, Mat3] = { + val newDelegate = delegate.flatMapPrefixMat(n)(seq => f(seq.asJava).asScala) { (m1, fm2) => + matF(m1, fm2.toJava) + } + new javadsl.Flow(newDelegate) + } + /** * This operation demultiplexes the incoming stream into separate output * streams, one for each element key. The key is computed for each element diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 6abcaf40e8..0f37b0df41 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -3148,6 +3148,47 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ Mat] = new Source(delegate.prefixAndTail(n).map { case (taken, tail) => Pair(taken.asJava, tail.asJava) }) + /** + * Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements), + * then apply `f` on these elements in order to obtain a flow, this flow is then materialized and the rest of the input is processed by this flow (similar to via). + * This method returns a flow consuming the rest of the stream producing the materialized flow's output. + * + * '''Emits when''' the materialized flow emits. + * Notice the first `n` elements are buffered internally before materializing the flow and connecting it to the rest of the upstream - producing elements at its own discretion (might 'swallow' or multiply elements). + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the materialized flow completes. + * If upstream completes before producing `n` elements, `f` will be applied with the provided elements, + * the resulting flow will be materialized and signalled for upstream completion, it can then complete or continue to emit elements at its own discretion. + * + * '''Cancels when''' the materialized flow cancels. + * Notice that when downstream cancels prior to prefix completion, the cancellation cause is stashed until prefix completion (or upstream completion) and then handed to the materialized flow. + * + * @param n the number of elements to accumulate before materializing the downstream flow. + * @param f a function that produces the downstream flow based on the upstream's prefix. + **/ + def flatMapPrefix[Out2, Mat2]( + n: Int, + f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.Source[Out2, Mat] = { + val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala) + new javadsl.Source(newDelegate) + } + + /** + * mat version of [[#flatMapPrefix]], this method gives access to a future materialized value of the downstream flow (as a completion stage). + * see [[#flatMapPrefix]] for details. + */ + def flatMapPrefixMat[Out2, Mat2, Mat3]( + n: Int, + f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]], + matF: function.Function2[Mat, CompletionStage[Mat2], Mat3]): javadsl.Source[Out2, Mat3] = { + val newDelegate = delegate.flatMapPrefixMat(n)(seq => f(seq.asJava).asScala) { (m1, fm2) => + matF(m1, fm2.toJava) + } + new javadsl.Source(newDelegate) + } + /** * This operation demultiplexes the incoming stream into separate output * streams, one for each element key. The key is computed for each element diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index f64955869e..08c4a11ddf 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -1380,6 +1380,33 @@ class SubFlow[In, Out, Mat]( Mat] = new SubFlow(delegate.prefixAndTail(n).map { case (taken, tail) => akka.japi.Pair(taken.asJava, tail.asJava) }) + /** + * Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements), + * then apply `f` on these elements in order to obtain a flow, this flow is then materialized and the rest of the input is processed by this flow (similar to via). + * This method returns a flow consuming the rest of the stream producing the materialized flow's output. + * + * '''Emits when''' the materialized flow emits. + * Notice the first `n` elements are buffered internally before materializing the flow and connecting it to the rest of the upstream - producing elements at its own discretion (might 'swallow' or multiply elements). + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the materialized flow completes. + * If upstream completes before producing `n` elements, `f` will be applied with the provided elements, + * the resulting flow will be materialized and signalled for upstream completion, it can then complete or continue to emit elements at its own discretion. + * + * '''Cancels when''' the materialized flow cancels. + * Notice that when downstream cancels prior to prefix completion, the cancellation cause is stashed until prefix completion (or upstream completion) and then handed to the materialized flow. + * + * @param n the number of elements to accumulate before materializing the downstream flow. + * @param f a function that produces the downstream flow based on the upstream's prefix. + **/ + def flatMapPrefix[Out2, Mat2]( + n: Int, + f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): SubFlow[In, Out2, Mat] = { + val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala) + new javadsl.SubFlow(newDelegate) + } + /** * Transform each input element into a `Source` of output elements that is * then flattened into the output stream by concatenation, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index fabb22b948..4bac9e7366 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -1357,6 +1357,33 @@ class SubSource[Out, Mat]( Mat] = new SubSource(delegate.prefixAndTail(n).map { case (taken, tail) => akka.japi.Pair(taken.asJava, tail.asJava) }) + /** + * Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements), + * then apply `f` on these elements in order to obtain a flow, this flow is then materialized and the rest of the input is processed by this flow (similar to via). + * This method returns a flow consuming the rest of the stream producing the materialized flow's output. + * + * '''Emits when''' the materialized flow emits. + * Notice the first `n` elements are buffered internally before materializing the flow and connecting it to the rest of the upstream - producing elements at its own discretion (might 'swallow' or multiply elements). + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the materialized flow completes. + * If upstream completes before producing `n` elements, `f` will be applied with the provided elements, + * the resulting flow will be materialized and signalled for upstream completion, it can then complete or continue to emit elements at its own discretion. + * + * '''Cancels when''' the materialized flow cancels. + * Notice that when downstream cancels prior to prefix completion, the cancellation cause is stashed until prefix completion (or upstream completion) and then handed to the materialized flow. + * + * @param n the number of elements to accumulate before materializing the downstream flow. + * @param f a function that produces the downstream flow based on the upstream's prefix. + **/ + def flatMapPrefix[Out2, Mat2]( + n: Int, + f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.SubSource[Out2, Mat] = { + val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala) + new javadsl.SubSource(newDelegate) + } + /** * Transform each input element into a `Source` of output elements that is * then flattened into the output stream by concatenation, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index f23590a7e4..db651d5c17 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1929,6 +1929,30 @@ trait FlowOps[+Out, +Mat] { def prefixAndTail[U >: Out](n: Int): Repr[(immutable.Seq[Out], Source[U, NotUsed])] = via(new PrefixAndTail[Out](n)) + /** + * Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements), + * then apply `f` on these elements in order to obtain a flow, this flow is then materialized and the rest of the input is processed by this flow (similar to via). + * This method returns a flow consuming the rest of the stream producing the materialized flow's output. + * + * '''Emits when''' the materialized flow emits. + * Notice the first `n` elements are buffered internally before materializing the flow and connecting it to the rest of the upstream - producing elements at its own discretion (might 'swallow' or multiply elements). + * + * '''Backpressures when''' the materialized flow backpressures + * + * '''Completes when''' the materialized flow completes. + * If upstream completes before producing `n` elements, `f` will be applied with the provided elements, + * the resulting flow will be materialized and signalled for upstream completion, it can then complete or continue to emit elements at its own discretion. + * + * '''Cancels when''' the materialized flow cancels. + * Notice that when downstream cancels prior to prefix completion, the cancellation cause is stashed until prefix completion (or upstream completion) and then handed to the materialized flow. + * + * @param n the number of elements to accumulate before materializing the downstream flow. + * @param f a function that produces the downstream flow based on the upstream's prefix. + **/ + def flatMapPrefix[Out2, Mat2](n: Int)(f: immutable.Seq[Out] => Flow[Out, Out2, Mat2]): Repr[Out2] = { + via(new FlatMapPrefix(n, f)) + } + /** * This operation demultiplexes the incoming stream into separate output * streams, one for each element key. The key is computed for each element @@ -3123,6 +3147,15 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { */ def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) => Mat3): ClosedMat[Mat3] + /** + * mat version of [[#flatMapPrefix]], this method gives access to a future materialized value of the downstream flow. + * see [[#flatMapPrefix]] for details. + */ + def flatMapPrefixMat[Out2, Mat2, Mat3](n: Int)(f: immutable.Seq[Out] => Flow[Out, Out2, Mat2])( + matF: (Mat, Future[Mat2]) => Mat3): ReprMat[Out2, Mat3] = { + viaMat(new FlatMapPrefix(n, f))(matF) + } + /** * Combine the elements of current flow and the given [[Source]] into a stream of tuples. *