diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/flatMapPrefix.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/flatMapPrefix.md
new file mode 100644
index 0000000000..e2dff32a5c
--- /dev/null
+++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/flatMapPrefix.md
@@ -0,0 +1,37 @@
+# flatMapPrefix
+
+Use the first `n` elements from the stream to determine how to process the rest.
+
+@ref[Nesting and flattening operators](../index.md#nesting-and-flattening-operators)
+
+@@@div { .group-scala }
+
+## Signature
+
+@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #flatMapPrefix }
+
+@@@
+
+## Description
+
+Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements),
+then apply *f* to these elements in order to obtain a flow. This flow is then materialized and the rest of the input is processed by it (similar to `via`).
+The result is a flow that consumes the rest of the stream and produces the materialized flow's output.
+
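+The following is a minimal sketch (for illustration only; it assumes an implicit materializer is in scope) that uses the
+first element to decide how the remaining elements are processed:
+
+```scala
+import akka.stream.scaladsl.{ Flow, Sink, Source }
+
+Source(List(1, 2, 3))
+  .flatMapPrefix(1) {
+    case Seq(1) => Flow[Int].map(_ * 10) // the prefix is consumed, the rest of the stream goes through this flow
+    case _      => Flow[Int]
+  }
+  .runWith(Sink.seq) // completes with Seq(20, 30)
+```
+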
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the materialized flow emits.
+ Notice that the first `n` elements are buffered internally before the flow is materialized and connected to the rest of the upstream; the materialized flow may then produce elements at its own discretion (it might 'swallow' or multiply elements).
+
+**backpressures** when the materialized flow backpressures
+
+**completes** when the materialized flow completes.
+ If upstream completes before producing `n` elements, `f` will be applied to the elements received so far,
+ the resulting flow will be materialized and signalled upstream completion; it can then complete or continue to emit elements at its own discretion.
+
+**cancels** when the materialized flow cancels.
+ Notice that when downstream cancels prior to prefix completion, the cancellation cause is stashed until prefix completion (or upstream completion) and then handed to the materialized flow.
+
+
+@@@
+
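+The materialized value variant `flatMapPrefixMat` additionally exposes the materialized value of the nested flow
+as a `Future`. A sketch (for illustration only, mirroring the behaviour described above; assumes an implicit
+materializer is in scope):
+
+```scala
+import akka.stream.scaladsl.{ Flow, Keep, Sink, Source }
+
+val (prefixF, suffixF) = Source(0 until 10)
+  .flatMapPrefixMat(2) { prefix =>
+    Flow[Int].mapMaterializedValue(_ => prefix) // surface the prefix as the nested flow's materialized value
+  }(Keep.right)
+  .toMat(Sink.seq)(Keep.both)
+  .run()
+
+// prefixF completes with Seq(0, 1), suffixF completes with 2 to 9
+```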
+
diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md
index ae90373ca9..9ac5fbc2c9 100644
--- a/akka-docs/src/main/paradox/stream/operators/index.md
+++ b/akka-docs/src/main/paradox/stream/operators/index.md
@@ -241,6 +241,7 @@ See the @ref:[Substreams](../stream-substream.md) page for more detail and code
|--|--|--|
|Source/Flow|@ref[flatMapConcat](Source-or-Flow/flatMapConcat.md)|Transform each input element into a `Source` whose elements are then flattened into the output stream through concatenation.|
|Source/Flow|@ref[flatMapMerge](Source-or-Flow/flatMapMerge.md)|Transform each input element into a `Source` whose elements are then flattened into the output stream through merging.|
+|Source/Flow|@ref[flatMapPrefix](Source-or-Flow/flatMapPrefix.md)|Use the first `n` elements from the stream to determine how to process the rest.|
|Source/Flow|@ref[groupBy](Source-or-Flow/groupBy.md)|Demultiplex the incoming stream into separate output streams.|
|Source/Flow|@ref[prefixAndTail](Source-or-Flow/prefixAndTail.md)|Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements) and returns a pair containing a strict sequence of the taken element and a stream representing the remaining elements.|
|Source/Flow|@ref[splitAfter](Source-or-Flow/splitAfter.md)|End the current substream whenever a predicate returns `true`, starting a new substream for the next element.|
@@ -440,6 +441,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [extrapolate](Source-or-Flow/extrapolate.md)
* [buffer](Source-or-Flow/buffer.md)
* [prefixAndTail](Source-or-Flow/prefixAndTail.md)
+* [flatMapPrefix](Source-or-Flow/flatMapPrefix.md)
* [groupBy](Source-or-Flow/groupBy.md)
* [splitWhen](Source-or-Flow/splitWhen.md)
* [splitAfter](Source-or-Flow/splitAfter.md)
diff --git a/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FlatMapPrefixTest.scala b/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FlatMapPrefixTest.scala
new file mode 100644
index 0000000000..a3d7caa0bd
--- /dev/null
+++ b/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FlatMapPrefixTest.scala
@@ -0,0 +1,20 @@
+/*
+ * Copyright (C) 2019-2020 Lightbend Inc.
+ */
+
+package akka.stream.tck
+
+import akka.stream.scaladsl.{ Flow, Keep, Sink, Source }
+import org.reactivestreams.Publisher
+
+class FlatMapPrefixTest extends AkkaPublisherVerification[Int] {
+ override def createPublisher(elements: Long): Publisher[Int] = {
+ val publisher = Source(iterable(elements))
+ .map(_.toInt)
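+      //re-emit the consumed one-element prefix (via prepend) so the publisher still produces all `elements` values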
+ .flatMapPrefixMat(1) { seq =>
+ Flow[Int].prepend(Source(seq))
+ }(Keep.left)
+ .runWith(Sink.asPublisher(false))
+ publisher
+ }
+}
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala
new file mode 100644
index 0000000000..f0a6bfaa05
--- /dev/null
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala
@@ -0,0 +1,544 @@
+/*
+ * Copyright (C) 2019-2020 Lightbend Inc.
+ */
+
+package akka.stream.scaladsl
+
+import akka.stream.testkit.Utils.TE
+import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
+import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
+import akka.stream.{
+ AbruptStageTerminationException,
+ AbruptTerminationException,
+ Materializer,
+ NeverMaterializedException,
+ SubscriptionWithCancelException
+}
+import akka.{ Done, NotUsed }
+
+class FlowFlatMapPrefixSpec extends StreamSpec {
+ def src10(i: Int = 0) = Source(i until (i + 10))
+
+ "A PrefixAndDownstream" must {
+
+ "work in the simple identity case" in assertAllStagesStopped {
+ src10()
+ .flatMapPrefixMat(2) { _ =>
+ Flow[Int]
+ }(Keep.left)
+ .runWith(Sink.seq[Int])
+ .futureValue should ===(2 until 10)
+ }
+
+ "expose mat value in the simple identity case" in assertAllStagesStopped {
+ val (prefixF, suffixF) = src10()
+ .flatMapPrefixMat(2) { prefix =>
+ Flow[Int].mapMaterializedValue(_ => prefix)
+ }(Keep.right)
+ .toMat(Sink.seq)(Keep.both)
+ .run
+
+ prefixF.futureValue should ===(0 until 2)
+ suffixF.futureValue should ===(2 until 10)
+ }
+
+ "work when source is exactly the required prefix" in assertAllStagesStopped {
+ val (prefixF, suffixF) = src10()
+ .flatMapPrefixMat(10) { prefix =>
+ Flow[Int].mapMaterializedValue(_ => prefix)
+ }(Keep.right)
+ .toMat(Sink.seq)(Keep.both)
+ .run
+
+ prefixF.futureValue should ===(0 until 10)
+ suffixF.futureValue should be(empty)
+ }
+
+ "work when source has less than the required prefix" in assertAllStagesStopped {
+ val (prefixF, suffixF) = src10()
+ .flatMapPrefixMat(20) { prefix =>
+ Flow[Int].mapMaterializedValue(_ => prefix)
+ }(Keep.right)
+ .toMat(Sink.seq)(Keep.both)
+ .run
+
+ prefixF.futureValue should ===(0 until 10)
+ suffixF.futureValue should be(empty)
+ }
+
+ "simple identity case when downstream completes before consuming the entire stream" in assertAllStagesStopped {
+ val (prefixF, suffixF) = Source(0 until 100)
+ .flatMapPrefixMat(10) { prefix =>
+ Flow[Int].mapMaterializedValue(_ => prefix)
+ }(Keep.right)
+ .take(10)
+ .toMat(Sink.seq)(Keep.both)
+ .run
+
+ prefixF.futureValue should ===(0 until 10)
+ suffixF.futureValue should ===(10 until 20)
+ }
+
+ "propagate failure to create the downstream flow" in assertAllStagesStopped {
+ val suffixF = Source(0 until 100)
+ .flatMapPrefixMat(10) { prefix =>
+ throw TE(s"I hate mondays! (${prefix.size})")
+ }(Keep.right)
+ .to(Sink.ignore)
+ .run
+
+ val ex = suffixF.failed.futureValue
+ ex.getCause should not be null
+ ex.getCause should ===(TE("I hate mondays! (10)"))
+ }
+
+ "propagate flow failures" in assertAllStagesStopped {
+ val (prefixF, suffixF) = Source(0 until 100)
+ .flatMapPrefixMat(10) { prefix =>
+ Flow[Int].mapMaterializedValue(_ => prefix).map {
+ case 15 => throw TE("don't like 15 either!")
+ case n => n
+ }
+ }(Keep.right)
+ .toMat(Sink.ignore)(Keep.both)
+ .run
+ prefixF.futureValue should ===(0 until 10)
+ val ex = suffixF.failed.futureValue
+ ex should ===(TE("don't like 15 either!"))
+ }
+
+ "produce multiple elements per input" in assertAllStagesStopped {
+ val (prefixF, suffixF) = src10()
+ .flatMapPrefixMat(7) { prefix =>
+ Flow[Int].mapMaterializedValue(_ => prefix).mapConcat(n => List.fill(n - 6)(n))
+ }(Keep.right)
+ .toMat(Sink.seq[Int])(Keep.both)
+ .run()
+
+ prefixF.futureValue should ===(0 until 7)
+ suffixF.futureValue should ===(7 :: 8 :: 8 :: 9 :: 9 :: 9 :: Nil)
+ }
+
+ "succeed when upstream produces no elements" in assertAllStagesStopped {
+ val (prefixF, suffixF) = Source
+ .empty[Int]
+ .flatMapPrefixMat(7) { prefix =>
+ Flow[Int].mapMaterializedValue(_ => prefix).mapConcat(n => List.fill(n - 6)(n))
+ }(Keep.right)
+ .toMat(Sink.seq[Int])(Keep.both)
+ .run()
+
+ prefixF.futureValue should be(empty)
+ suffixF.futureValue should be(empty)
+ }
+
+ "apply materialized flow's semantics when upstream produces no elements" in assertAllStagesStopped {
+ val (prefixF, suffixF) = Source
+ .empty[Int]
+ .flatMapPrefixMat(7) { prefix =>
+ Flow[Int].mapMaterializedValue(_ => prefix).mapConcat(n => List.fill(n - 6)(n)).prepend(Source(100 to 101))
+ }(Keep.right)
+ .toMat(Sink.seq[Int])(Keep.both)
+ .run()
+
+ prefixF.futureValue should be(empty)
+ suffixF.futureValue should ===(100 :: 101 :: Nil)
+ }
+
+    "handle upstream completion" in assertAllStagesStopped {
+ val publisher = TestPublisher.manualProbe[Int]()
+ val subscriber = TestSubscriber.manualProbe[Int]()
+
+ val matValue = Source
+ .fromPublisher(publisher)
+ .flatMapPrefixMat(2) { prefix =>
+ Flow[Int].mapMaterializedValue(_ => prefix).prepend(Source(100 to 101))
+ }(Keep.right)
+ .to(Sink.fromSubscriber(subscriber))
+ .run()
+
+ matValue.value should be(empty)
+
+ val upstream = publisher.expectSubscription()
+ val downstream = subscriber.expectSubscription()
+
+ downstream.request(1000)
+
+ upstream.expectRequest()
+ //completing publisher
+ upstream.sendComplete()
+
+ matValue.futureValue should ===(Nil)
+
+ subscriber.expectNext(100)
+
+ subscriber.expectNext(101).expectComplete()
+
+ }
+
+ "work when materialized flow produces no downstream elements" in assertAllStagesStopped {
+ val (prefixF, suffixF) = Source(0 until 100)
+ .flatMapPrefixMat(4) { prefix =>
+ Flow[Int].mapMaterializedValue(_ => prefix).filter(_ => false)
+ }(Keep.right)
+ .toMat(Sink.seq)(Keep.both)
+ .run
+
+ prefixF.futureValue should ===(0 until 4)
+ suffixF.futureValue should be(empty)
+ }
+
+ "work when materialized flow does not consume upstream" in assertAllStagesStopped {
+ val (prefixF, suffixF) = Source(0 until 100)
+ .map { i =>
+ i should be <= 4
+ i
+ }
+ .flatMapPrefixMat(4) { prefix =>
+ Flow[Int].mapMaterializedValue(_ => prefix).take(0)
+ }(Keep.right)
+ .toMat(Sink.seq)(Keep.both)
+ .run
+
+ prefixF.futureValue should ===(0 until 4)
+ suffixF.futureValue should be(empty)
+ }
+
+    "work when materialized flow cancels upstream but keeps producing" in assertAllStagesStopped {
+ val (prefixF, suffixF) = src10()
+ .flatMapPrefixMat(4) { prefix =>
+ Flow[Int].mapMaterializedValue(_ => prefix).take(0).concat(Source(11 to 12))
+ }(Keep.right)
+ .toMat(Sink.seq)(Keep.both)
+ .run
+
+ prefixF.futureValue should ===(0 until 4)
+ suffixF.futureValue should ===(11 :: 12 :: Nil)
+ }
+
+ "propagate materialization failure (when application of 'f' succeeds)" in assertAllStagesStopped {
+ val (prefixF, suffixF) = src10()
+ .flatMapPrefixMat(4) { prefix =>
+ Flow[Int].mapMaterializedValue(_ => throw TE(s"boom-bada-bang (${prefix.size})"))
+ }(Keep.right)
+ .toMat(Sink.seq)(Keep.both)
+ .run
+
+ prefixF.failed.futureValue should be(a[NeverMaterializedException])
+ prefixF.failed.futureValue.getCause should ===(TE("boom-bada-bang (4)"))
+ suffixF.failed.futureValue should ===(TE("boom-bada-bang (4)"))
+ }
+
+    "succeed when materialized flow completes downstream but keeps consuming elements" in assertAllStagesStopped {
+ val (prefixAndTailF, suffixF) = src10()
+ .flatMapPrefixMat(4) { prefix =>
+ Flow[Int]
+ .mapMaterializedValue(_ => prefix)
+ .viaMat {
+ Flow.fromSinkAndSourceMat(Sink.seq[Int], Source.empty[Int])(Keep.left)
+ }(Keep.both)
+ }(Keep.right)
+ .toMat(Sink.seq)(Keep.both)
+ .run
+
+ suffixF.futureValue should be(empty)
+ val (prefix, suffix) = prefixAndTailF.futureValue
+ prefix should ===(0 until 4)
+ suffix.futureValue should ===(4 until 10)
+ }
+
+ "downstream cancellation is propagated via the materialized flow" in assertAllStagesStopped {
+ val publisher = TestPublisher.manualProbe[Int]()
+ val subscriber = TestSubscriber.manualProbe[Int]()
+
+ val ((srcWatchTermF, notUsedF), suffixF) = src10()
+ .watchTermination()(Keep.right)
+ .flatMapPrefixMat(2) { prefix =>
+ prefix should ===(0 until 2)
+ Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber), Source.fromPublisher(publisher))
+ }(Keep.both)
+ .take(1)
+ .toMat(Sink.seq)(Keep.both)
+ .run()
+
+ notUsedF.value should be(empty)
+ suffixF.value should be(empty)
+ srcWatchTermF.value should be(empty)
+
+ val subUpstream = publisher.expectSubscription()
+ val subDownstream = subscriber.expectSubscription()
+
+ notUsedF.futureValue should ===(NotUsed)
+
+ subUpstream.expectRequest() should be >= (1L)
+ subDownstream.request(1)
+ subscriber.expectNext(2)
+ subUpstream.sendNext(22)
+ subUpstream.expectCancellation()
+ subDownstream.cancel()
+
+ suffixF.futureValue should ===(Seq(22))
+ srcWatchTermF.futureValue should ===(Done)
+ }
+
+ "early downstream cancellation is later handed out to materialized flow" in assertAllStagesStopped {
+ val publisher = TestPublisher.manualProbe[Int]()
+ val subscriber = TestSubscriber.manualProbe[Int]()
+
+ val (srcWatchTermF, matFlowWatchTermFF) = Source
+ .fromPublisher(publisher)
+ .watchTermination()(Keep.right)
+ .flatMapPrefixMat(3) { prefix =>
+ prefix should ===(0 until 3)
+ Flow[Int].watchTermination()(Keep.right)
+ }(Keep.both)
+ .to(Sink.fromSubscriber(subscriber))
+ .run()
+ val matFlowWatchTerm = matFlowWatchTermFF.flatten
+
+ matFlowWatchTerm.value should be(empty)
+ srcWatchTermF.value should be(empty)
+
+ val subDownstream = subscriber.expectSubscription()
+ val subUpstream = publisher.expectSubscription()
+ subDownstream.request(1)
+ subUpstream.expectRequest() should be >= (1L)
+ subUpstream.sendNext(0)
+ subUpstream.sendNext(1)
+ subDownstream.cancel()
+
+ //subflow not materialized yet, hence mat value (future) isn't ready yet
+ matFlowWatchTerm.value should be(empty)
+ srcWatchTermF.value should be(empty)
+
+ //this one is sent AFTER downstream cancellation
+ subUpstream.sendNext(2)
+
+ subUpstream.expectCancellation()
+
+ matFlowWatchTerm.futureValue should ===(Done)
+ srcWatchTermF.futureValue should ===(Done)
+
+ }
+
+ "early downstream failure is deferred until prefix completion" in assertAllStagesStopped {
+ val publisher = TestPublisher.manualProbe[Int]()
+ val subscriber = TestSubscriber.manualProbe[Int]()
+
+ val (srcWatchTermF, matFlowWatchTermFF) = Source
+ .fromPublisher(publisher)
+ .watchTermination()(Keep.right)
+ .flatMapPrefixMat(3) { prefix =>
+ prefix should ===(0 until 3)
+ Flow[Int].watchTermination()(Keep.right)
+ }(Keep.both)
+ .to(Sink.fromSubscriber(subscriber))
+ .run()
+ val matFlowWatchTerm = matFlowWatchTermFF.flatten
+
+ matFlowWatchTerm.value should be(empty)
+ srcWatchTermF.value should be(empty)
+
+ val subDownstream = subscriber.expectSubscription()
+ val subUpstream = publisher.expectSubscription()
+ subDownstream.request(1)
+ subUpstream.expectRequest() should be >= (1L)
+ subUpstream.sendNext(0)
+ subUpstream.sendNext(1)
+ subDownstream.asInstanceOf[SubscriptionWithCancelException].cancel(TE("that again?!"))
+
+ matFlowWatchTerm.value should be(empty)
+ srcWatchTermF.value should be(empty)
+
+ subUpstream.sendNext(2)
+
+ matFlowWatchTerm.failed.futureValue should ===(TE("that again?!"))
+ srcWatchTermF.failed.futureValue should ===(TE("that again?!"))
+
+ subUpstream.expectCancellation()
+ }
+
+ "downstream failure is propagated via the materialized flow" in assertAllStagesStopped {
+ val publisher = TestPublisher.manualProbe[Int]()
+ val subscriber = TestSubscriber.manualProbe[Int]()
+
+ val ((srcWatchTermF, notUsedF), suffixF) = src10()
+ .watchTermination()(Keep.right)
+ .flatMapPrefixMat(2) { prefix =>
+ prefix should ===(0 until 2)
+ Flow.fromSinkAndSourceCoupled(Sink.fromSubscriber(subscriber), Source.fromPublisher(publisher))
+ }(Keep.both)
+ .map {
+ case 2 => 2
+ case 3 => throw TE("3!?!?!?")
+ case i => fail(s"unexpected value $i")
+ }
+ .toMat(Sink.seq)(Keep.both)
+ .run()
+
+ notUsedF.value should be(empty)
+ suffixF.value should be(empty)
+ srcWatchTermF.value should be(empty)
+
+ val subUpstream = publisher.expectSubscription()
+ val subDownstream = subscriber.expectSubscription()
+
+ notUsedF.futureValue should ===(NotUsed)
+
+ subUpstream.expectRequest() should be >= (1L)
+ subDownstream.request(1)
+ subscriber.expectNext(2)
+ subUpstream.sendNext(2)
+ subDownstream.request(1)
+ subscriber.expectNext(3)
+ subUpstream.sendNext(3)
+ subUpstream.expectCancellation() should ===(TE("3!?!?!?"))
+ subscriber.expectError(TE("3!?!?!?"))
+
+ suffixF.failed.futureValue should ===(TE("3!?!?!?"))
+ srcWatchTermF.failed.futureValue should ===(TE("3!?!?!?"))
+ }
+
+ "complete mat value with failures on abrupt termination before materializing the flow" in assertAllStagesStopped {
+ val mat = Materializer(system)
+ val publisher = TestPublisher.manualProbe[Int]()
+
+ val flow = Source
+ .fromPublisher(publisher)
+ .flatMapPrefixMat(2) { prefix =>
+ fail(s"unexpected prefix (length = ${prefix.size})")
+ Flow[Int]
+ }(Keep.right)
+ .toMat(Sink.ignore)(Keep.both)
+
+ val (prefixF, doneF) = flow.run()(mat)
+
+ publisher.expectSubscription()
+ prefixF.value should be(empty)
+ doneF.value should be(empty)
+
+ mat.shutdown()
+
+ prefixF.failed.futureValue match {
+ case _: AbruptTerminationException =>
+ case ex: NeverMaterializedException =>
+ ex.getCause should not be null
+ ex.getCause should be(a[AbruptTerminationException])
+ }
+ doneF.failed.futureValue should be(a[AbruptTerminationException])
+ }
+
+ "respond to abrupt termination after flow materialization" in assertAllStagesStopped {
+ val mat = Materializer(system)
+ val countFF = src10()
+ .flatMapPrefixMat(2) { prefix =>
+ prefix should ===(0 until 2)
+ Flow[Int]
+ .concat(Source.repeat(3))
+ .fold(0L) {
+ case (acc, _) => acc + 1
+ }
+ .alsoToMat(Sink.head)(Keep.right)
+ }(Keep.right)
+ .to(Sink.ignore)
+ .run()(mat)
+ val countF = countFF.futureValue
+ //at this point we know the flow was materialized, now we can stop the materializer
+ mat.shutdown()
+ //expect the nested flow to be terminated abruptly.
+ countF.failed.futureValue should be(a[AbruptStageTerminationException])
+ }
+
+ "behave like via when n = 0" in assertAllStagesStopped {
+ val (prefixF, suffixF) = src10()
+ .flatMapPrefixMat(0) { prefix =>
+ prefix should be(empty)
+ Flow[Int].mapMaterializedValue(_ => prefix)
+ }(Keep.right)
+ .toMat(Sink.seq)(Keep.both)
+ .run()
+
+ prefixF.futureValue should be(empty)
+ suffixF.futureValue should ===(0 until 10)
+ }
+
+ "behave like via when n = 0 and upstream produces no elements" in assertAllStagesStopped {
+ val (prefixF, suffixF) = Source
+ .empty[Int]
+ .flatMapPrefixMat(0) { prefix =>
+ prefix should be(empty)
+ Flow[Int].mapMaterializedValue(_ => prefix)
+ }(Keep.right)
+ .toMat(Sink.seq)(Keep.both)
+ .run()
+
+ prefixF.futureValue should be(empty)
+ suffixF.futureValue should be(empty)
+ }
+
+ "propagate errors during flow's creation when n = 0" in assertAllStagesStopped {
+ val (prefixF, suffixF) = src10()
+ .flatMapPrefixMat(0) { prefix =>
+ prefix should be(empty)
+ throw TE("not this time my friend!")
+ Flow[Int].mapMaterializedValue(_ => prefix)
+ }(Keep.right)
+ .toMat(Sink.seq)(Keep.both)
+ .run()
+
+ prefixF.failed.futureValue should be(a[NeverMaterializedException])
+      prefixF.failed.futureValue.getCause should ===(TE("not this time my friend!"))
+ suffixF.failed.futureValue should ===(TE("not this time my friend!"))
+ }
+
+ "propagate materialization failures when n = 0" in assertAllStagesStopped {
+ val (prefixF, suffixF) = src10()
+ .flatMapPrefixMat(0) { prefix =>
+ prefix should be(empty)
+ Flow[Int].mapMaterializedValue(_ => throw TE("Bang! no materialization this time"))
+ }(Keep.right)
+ .toMat(Sink.seq)(Keep.both)
+ .run()
+
+ prefixF.failed.futureValue should be(a[NeverMaterializedException])
+      prefixF.failed.futureValue.getCause should ===(TE("Bang! no materialization this time"))
+ suffixF.failed.futureValue should ===(TE("Bang! no materialization this time"))
+ }
+
+ "run a detached flow" in assertAllStagesStopped {
+ val publisher = TestPublisher.manualProbe[Int]()
+ val subscriber = TestSubscriber.manualProbe[String]()
+
+ val detachedFlow = Flow.fromSinkAndSource(Sink.cancelled[Int], Source(List("a", "b", "c"))).via {
+ Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber), Source.empty[Int])
+ }
+ val fHeadOpt = Source
+ .fromPublisher(publisher)
+ .flatMapPrefix(2) { prefix =>
+ prefix should ===(0 until 2)
+ detachedFlow
+ }
+ .runWith(Sink.headOption)
+
+ subscriber.expectNoMessage()
+ val subsc = publisher.expectSubscription()
+ subsc.expectRequest() should be >= 2L
+ subsc.sendNext(0)
+ subscriber.expectNoMessage()
+ subsc.sendNext(1)
+ val sinkSubscription = subscriber.expectSubscription()
+    //this indicates the nested flow was materialized and its (empty) output side already completed the downstream
+    fHeadOpt.futureValue should be(empty)
+
+    //the materialized flow immediately cancels upstream
+    subsc.expectCancellation()
+    //at this point both ends of the 'external' flow are closed
+
+ sinkSubscription.request(10)
+ subscriber.expectNext("a", "b", "c")
+ subscriber.expectComplete()
+ }
+
+ }
+
+}
diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
index c298400b56..0aa4719579 100755
--- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
@@ -55,6 +55,7 @@ import akka.stream._
val detacher = name("detacher")
val groupBy = name("groupBy")
val prefixAndTail = name("prefixAndTail")
+ val flatMapPrefix = name("flatMapPrefix")
val split = name("split")
val concatAll = name("concatAll")
val processor = name("processor")
diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/FlatMapPrefix.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/FlatMapPrefix.scala
new file mode 100644
index 0000000000..bf04363bba
--- /dev/null
+++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/FlatMapPrefix.scala
@@ -0,0 +1,170 @@
+/*
+ * Copyright (C) 2015-2020 Lightbend Inc.
+ */
+
+package akka.stream.impl.fusing
+
+import akka.annotation.InternalApi
+import akka.stream.scaladsl.{ Flow, Keep, Source }
+import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler }
+import akka.stream._
+import akka.stream.impl.Stages.DefaultAttributes
+import akka.util.OptionVal
+
+import scala.collection.immutable
+import scala.concurrent.{ Future, Promise }
+import scala.util.control.NonFatal
+
+@InternalApi private[akka] final class FlatMapPrefix[In, Out, M](n: Int, f: immutable.Seq[In] => Flow[In, Out, M])
+ extends GraphStageWithMaterializedValue[FlowShape[In, Out], Future[M]] {
+
+ require(n >= 0, s"FlatMapPrefix: n ($n) must be non-negative.")
+
+ val in = Inlet[In](s"${this}.in")
+ val out = Outlet[Out](s"${this}.out")
+ override val shape: FlowShape[In, Out] = FlowShape(in, out)
+
+ override def initialAttributes: Attributes = DefaultAttributes.flatMapPrefix
+
+ override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
+ val matPromise = Promise[M]
+ val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
+ val accumulated = collection.mutable.Buffer.empty[In]
+
+ private var subSource = OptionVal.none[SubSourceOutlet[In]]
+ private var subSink = OptionVal.none[SubSinkInlet[Out]]
+
+ private var downstreamCause = OptionVal.none[Throwable]
+
+ setHandlers(in, out, this)
+
+ override def postStop(): Unit = {
+ //this covers the case when the nested flow was never materialized
+ matPromise.tryFailure(new AbruptStageTerminationException(this))
+ super.postStop()
+ }
+
+ override def onPush(): Unit = {
+ subSource match {
+ case OptionVal.Some(s) => s.push(grab(in))
+ case OptionVal.None =>
+ accumulated.append(grab(in))
+ if (accumulated.size == n) {
+ materializeFlow()
+ } else {
+ //gi'me some more!
+ pull(in)
+ }
+ }
+ }
+
+ override def onUpstreamFinish(): Unit = {
+ subSource match {
+ case OptionVal.Some(s) => s.complete()
+ case OptionVal.None => materializeFlow()
+ }
+ }
+
+ override def onUpstreamFailure(ex: Throwable): Unit = {
+ subSource match {
+ case OptionVal.Some(s) => s.fail(ex)
+ case OptionVal.None =>
+ //flow won't be materialized, so we have to complete the future with a failure indicating this
+ matPromise.failure(new NeverMaterializedException(ex))
+ super.onUpstreamFailure(ex)
+ }
+ }
+
+ override def onPull(): Unit = {
+ subSink match {
+ case OptionVal.Some(s) =>
+ //delegate to subSink
+ s.pull()
+ case OptionVal.None if accumulated.size < n =>
+ pull(in)
+ case OptionVal.None if accumulated.size == n =>
+          //corner case: when n = 0 the prefix is already complete, so materialize on the first pull (could also be handled in FlowOps)
+ materializeFlow()
+ }
+ }
+
+ override def onDownstreamFinish(cause: Throwable): Unit = {
+ subSink match {
+ case OptionVal.None => downstreamCause = OptionVal.Some(cause)
+ case OptionVal.Some(s) => s.cancel(cause)
+ }
+ }
+
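+      //materializes the flow obtained from f(prefix): the remaining upstream is routed through a SubSourceOutlet,
+      //the flow's output is drained through a SubSinkInlet back to this stage's downstream,
+      //and matPromise is completed with the nested flow's materialized value (or failed if materialization fails)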
+ def materializeFlow(): Unit =
+ try {
+ val prefix = accumulated.toVector
+ accumulated.clear()
+ subSource = OptionVal.Some(new SubSourceOutlet[In](s"${this}.subSource"))
+ val OptionVal.Some(theSubSource) = subSource
+ theSubSource.setHandler {
+ new OutHandler {
+ override def onPull(): Unit = {
+ if (!isClosed(in) && !hasBeenPulled(in)) {
+ pull(in)
+ }
+ }
+
+ override def onDownstreamFinish(cause: Throwable): Unit = {
+ if (!isClosed(in)) {
+ cancel(in, cause)
+ }
+ }
+ }
+ }
+ subSink = OptionVal.Some(new SubSinkInlet[Out](s"${this}.subSink"))
+ val OptionVal.Some(theSubSink) = subSink
+ theSubSink.setHandler {
+ new InHandler {
+ override def onPush(): Unit = {
+ push(out, theSubSink.grab())
+ }
+
+ override def onUpstreamFinish(): Unit = {
+ complete(out)
+ }
+
+ override def onUpstreamFailure(ex: Throwable): Unit = {
+ fail(out, ex)
+ }
+ }
+ }
+ val matVal = try {
+ val flow = f(prefix)
+ val runnableGraph = Source.fromGraph(theSubSource.source).viaMat(flow)(Keep.right).to(theSubSink.sink)
+ interpreter.subFusingMaterializer.materialize(runnableGraph)
+ } catch {
+ case NonFatal(ex) =>
+ matPromise.failure(new NeverMaterializedException(ex))
+ subSource = OptionVal.None
+ subSink = OptionVal.None
+ throw ex
+ }
+ matPromise.success(matVal)
+
+ //in case downstream was closed
+ downstreamCause match {
+ case OptionVal.Some(ex) => theSubSink.cancel(ex)
+ case OptionVal.None =>
+ }
+
+ //in case we've materialized due to upstream completion
+ if (isClosed(in)) {
+ theSubSource.complete()
+ }
+
+ //in case we've been pulled by downstream
+ if (isAvailable(out)) {
+ theSubSink.pull()
+ }
+ } catch {
+ case NonFatal(ex) => failStage(ex)
+ }
+ }
+ (logic, matPromise.future)
+ }
+}
diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
index c15f8819b7..49460d197a 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
@@ -12,11 +12,11 @@ import akka.annotation.InternalApi
import akka.stream.ActorAttributes.StreamSubscriptionTimeout
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream._
-import akka.stream.impl.ActorSubscriberMessage
import akka.stream.impl.Stages.DefaultAttributes
+import akka.stream.impl.fusing.GraphStages.SingleSource
+import akka.stream.impl.ActorSubscriberMessage
import akka.stream.impl.SubscriptionTimeoutException
import akka.stream.impl.TraversalBuilder
-import akka.stream.impl.fusing.GraphStages.SingleSource
import akka.stream.impl.{ Buffer => BufferImpl }
import akka.stream.scaladsl._
import akka.stream.stage._
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index 6d51527000..3a651cbd49 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -2024,6 +2024,47 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def prefixAndTail(n: Int): javadsl.Flow[In, akka.japi.Pair[java.util.List[Out], javadsl.Source[Out, NotUsed]], Mat] =
new Flow(delegate.prefixAndTail(n).map { case (taken, tail) => akka.japi.Pair(taken.asJava, tail.asJava) })
+ /**
+   * Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements),
+   * then applies `f` to these elements in order to obtain a flow.
+   * This flow is then materialized and the rest of the input is processed by it (similar to `via`).
+   * This method returns a flow that consumes the rest of the stream and produces the materialized flow's output.
+   *
+   * '''Emits when''' the materialized flow emits.
+   * Notice that the first `n` elements are buffered internally before the flow is materialized and connected to the rest of the upstream;
+   * the materialized flow may then produce elements at its own discretion (it might 'swallow' or multiply elements).
+   *
+   * '''Backpressures when''' the materialized flow backpressures
+   *
+   * '''Completes when''' the materialized flow completes.
+   * If upstream completes before producing `n` elements, `f` will be applied to the elements received so far,
+   * the resulting flow will be materialized and signalled upstream completion; it can then complete or continue to emit elements at its own discretion.
+   *
+   * '''Cancels when''' the materialized flow cancels.
+   * Notice that when downstream cancels prior to prefix completion, the cancellation cause is stashed until prefix completion (or upstream completion) and then handed to the materialized flow.
+ *
+ * @param n the number of elements to accumulate before materializing the downstream flow.
+ * @param f a function that produces the downstream flow based on the upstream's prefix.
+ **/
+ def flatMapPrefix[Out2, Mat2](
+ n: Int,
+ f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.Flow[In, Out2, Mat] = {
+ val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala)
+ new javadsl.Flow(newDelegate)
+ }
+
+ /**
+   * Materialized value variant of [[#flatMapPrefix]]. This method gives access to the materialized value of the downstream flow (as a `CompletionStage`).
+   * See [[#flatMapPrefix]] for details.
+ */
+ def flatMapPrefixMat[Out2, Mat2, Mat3](
+ n: Int,
+ f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]],
+ matF: function.Function2[Mat, CompletionStage[Mat2], Mat3]): javadsl.Flow[In, Out2, Mat3] = {
+ val newDelegate = delegate.flatMapPrefixMat(n)(seq => f(seq.asJava).asScala) { (m1, fm2) =>
+ matF(m1, fm2.toJava)
+ }
+ new javadsl.Flow(newDelegate)
+ }
+
/**
* This operation demultiplexes the incoming stream into separate output
* streams, one for each element key. The key is computed for each element
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
index 6abcaf40e8..0f37b0df41 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
@@ -3148,6 +3148,47 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
Mat] =
new Source(delegate.prefixAndTail(n).map { case (taken, tail) => Pair(taken.asJava, tail.asJava) })
+ /**
+   * Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements),
+   * then applies `f` to these elements in order to obtain a flow.
+   * This flow is then materialized and the rest of the input is processed by it (similar to `via`).
+   * This method returns a flow that consumes the rest of the stream and produces the materialized flow's output.
+   *
+   * '''Emits when''' the materialized flow emits.
+   * Notice that the first `n` elements are buffered internally before the flow is materialized and connected to the rest of the upstream;
+   * the materialized flow may then produce elements at its own discretion (it might 'swallow' or multiply elements).
+   *
+   * '''Backpressures when''' the materialized flow backpressures
+   *
+   * '''Completes when''' the materialized flow completes.
+   * If upstream completes before producing `n` elements, `f` will be applied to the elements received so far,
+   * the resulting flow will be materialized and signalled upstream completion; it can then complete or continue to emit elements at its own discretion.
+   *
+   * '''Cancels when''' the materialized flow cancels.
+   * Notice that when downstream cancels prior to prefix completion, the cancellation cause is stashed until prefix completion (or upstream completion) and then handed to the materialized flow.
+ *
+ * @param n the number of elements to accumulate before materializing the downstream flow.
+ * @param f a function that produces the downstream flow based on the upstream's prefix.
+ **/
+ def flatMapPrefix[Out2, Mat2](
+ n: Int,
+ f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.Source[Out2, Mat] = {
+ val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala)
+ new javadsl.Source(newDelegate)
+ }
+
+ /**
+   * Materialized value variant of [[#flatMapPrefix]]. This method gives access to the materialized value of the downstream flow (as a `CompletionStage`).
+   * See [[#flatMapPrefix]] for details.
+ */
+ def flatMapPrefixMat[Out2, Mat2, Mat3](
+ n: Int,
+ f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]],
+ matF: function.Function2[Mat, CompletionStage[Mat2], Mat3]): javadsl.Source[Out2, Mat3] = {
+ val newDelegate = delegate.flatMapPrefixMat(n)(seq => f(seq.asJava).asScala) { (m1, fm2) =>
+ matF(m1, fm2.toJava)
+ }
+ new javadsl.Source(newDelegate)
+ }
+
/**
* This operation demultiplexes the incoming stream into separate output
* streams, one for each element key. The key is computed for each element
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
index f64955869e..08c4a11ddf 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
@@ -1380,6 +1380,33 @@ class SubFlow[In, Out, Mat](
Mat] =
new SubFlow(delegate.prefixAndTail(n).map { case (taken, tail) => akka.japi.Pair(taken.asJava, tail.asJava) })
+ /**
+   * Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements),
+   * then applies `f` to these elements in order to obtain a flow.
+   * This flow is then materialized and the rest of the input is processed by it (similar to `via`).
+   * This method returns a flow that consumes the rest of the stream and produces the materialized flow's output.
+   *
+   * '''Emits when''' the materialized flow emits.
+   * Notice that the first `n` elements are buffered internally before the flow is materialized and connected to the rest of the upstream;
+   * the materialized flow may then produce elements at its own discretion (it might 'swallow' or multiply elements).
+   *
+   * '''Backpressures when''' the materialized flow backpressures
+   *
+   * '''Completes when''' the materialized flow completes.
+   * If upstream completes before producing `n` elements, `f` will be applied to the elements received so far,
+   * the resulting flow will be materialized and signalled upstream completion; it can then complete or continue to emit elements at its own discretion.
+   *
+   * '''Cancels when''' the materialized flow cancels.
+   * Notice that when downstream cancels prior to prefix completion, the cancellation cause is stashed until prefix completion (or upstream completion) and then handed to the materialized flow.
+ *
+ * @param n the number of elements to accumulate before materializing the downstream flow.
+ * @param f a function that produces the downstream flow based on the upstream's prefix.
+ **/
+ def flatMapPrefix[Out2, Mat2](
+ n: Int,
+ f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): SubFlow[In, Out2, Mat] = {
+ val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala)
+ new javadsl.SubFlow(newDelegate)
+ }
+
/**
* Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by concatenation,
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
index fabb22b948..4bac9e7366 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
@@ -1357,6 +1357,33 @@ class SubSource[Out, Mat](
Mat] =
new SubSource(delegate.prefixAndTail(n).map { case (taken, tail) => akka.japi.Pair(taken.asJava, tail.asJava) })
+ /**
+   * Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements),
+   * then applies `f` to these elements in order to obtain a flow.
+   * This flow is then materialized and the rest of the input is processed by it (similar to `via`).
+   * This method returns a flow that consumes the rest of the stream and produces the materialized flow's output.
+   *
+   * '''Emits when''' the materialized flow emits.
+   * Notice that the first `n` elements are buffered internally before the flow is materialized and connected to the rest of the upstream;
+   * the materialized flow may then produce elements at its own discretion (it might 'swallow' or multiply elements).
+   *
+   * '''Backpressures when''' the materialized flow backpressures
+   *
+   * '''Completes when''' the materialized flow completes.
+   * If upstream completes before producing `n` elements, `f` will be applied to the elements received so far,
+   * the resulting flow will be materialized and signalled upstream completion; it can then complete or continue to emit elements at its own discretion.
+   *
+   * '''Cancels when''' the materialized flow cancels.
+   * Notice that when downstream cancels prior to prefix completion, the cancellation cause is stashed until prefix completion (or upstream completion) and then handed to the materialized flow.
+ *
+ * @param n the number of elements to accumulate before materializing the downstream flow.
+ * @param f a function that produces the downstream flow based on the upstream's prefix.
+ **/
+ def flatMapPrefix[Out2, Mat2](
+ n: Int,
+ f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.SubSource[Out2, Mat] = {
+ val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala)
+ new javadsl.SubSource(newDelegate)
+ }
+
/**
* Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by concatenation,
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
index f23590a7e4..db651d5c17 100755
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
@@ -1929,6 +1929,30 @@ trait FlowOps[+Out, +Mat] {
def prefixAndTail[U >: Out](n: Int): Repr[(immutable.Seq[Out], Source[U, NotUsed])] =
via(new PrefixAndTail[Out](n))
+ /**
+   * Takes up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements),
+   * then applies `f` to these elements in order to obtain a flow.
+   * This flow is then materialized and the rest of the input is processed by it (similar to `via`).
+   * This method returns a flow that consumes the rest of the stream and produces the materialized flow's output.
+   *
+   * '''Emits when''' the materialized flow emits.
+   * Notice that the first `n` elements are buffered internally before the flow is materialized and connected to the rest of the upstream;
+   * the materialized flow may then produce elements at its own discretion (it might 'swallow' or multiply elements).
+   *
+   * '''Backpressures when''' the materialized flow backpressures
+   *
+   * '''Completes when''' the materialized flow completes.
+   * If upstream completes before producing `n` elements, `f` will be applied to the elements received so far,
+   * the resulting flow will be materialized and signalled upstream completion; it can then complete or continue to emit elements at its own discretion.
+   *
+   * '''Cancels when''' the materialized flow cancels.
+   * Notice that when downstream cancels prior to prefix completion, the cancellation cause is stashed until prefix completion (or upstream completion) and then handed to the materialized flow.
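+   *
+   * For example (an illustrative sketch, assuming an implicit materializer is in scope):
+   * {{{
+   * Source(List(1, 2, 3))
+   *   .flatMapPrefix(1) {
+   *     case Seq(1) => Flow[Int].map(_ * 10) // prefix is consumed, the rest is processed by the returned flow
+   *     case _      => Flow[Int]
+   *   }
+   *   .runWith(Sink.seq) // completes with Seq(20, 30)
+   * }}}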
+ *
+ * @param n the number of elements to accumulate before materializing the downstream flow.
+ * @param f a function that produces the downstream flow based on the upstream's prefix.
+ **/
+ def flatMapPrefix[Out2, Mat2](n: Int)(f: immutable.Seq[Out] => Flow[Out, Out2, Mat2]): Repr[Out2] = {
+ via(new FlatMapPrefix(n, f))
+ }
+
/**
* This operation demultiplexes the incoming stream into separate output
* streams, one for each element key. The key is computed for each element
@@ -3123,6 +3147,15 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
*/
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) => Mat3): ClosedMat[Mat3]
+ /**
+   * Materialized value variant of [[#flatMapPrefix]]. This method gives access to the materialized value of the downstream flow (as a `Future`).
+   * See [[#flatMapPrefix]] for details.
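+   *
+   * For example (an illustrative sketch, assuming an implicit materializer is in scope):
+   * {{{
+   * Source(0 until 10)
+   *   .flatMapPrefixMat(2)(prefix => Flow[Int].mapMaterializedValue(_ => prefix))(Keep.right)
+   *   .toMat(Sink.seq)(Keep.both)
+   *   .run() // (Future(Seq(0, 1)), Future(Seq(2, 3, ..., 9)))
+   * }}}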
+ */
+ def flatMapPrefixMat[Out2, Mat2, Mat3](n: Int)(f: immutable.Seq[Out] => Flow[Out, Out2, Mat2])(
+ matF: (Mat, Future[Mat2]) => Mat3): ReprMat[Out2, Mat3] = {
+ viaMat(new FlatMapPrefix(n, f))(matF)
+ }
+
/**
* Combine the elements of current flow and the given [[Source]] into a stream of tuples.
*