diff --git a/project/StreamOperatorsIndexGenerator.scala b/project/StreamOperatorsIndexGenerator.scala index 31c9c309ce..e2aada1769 100644 --- a/project/StreamOperatorsIndexGenerator.scala +++ b/project/StreamOperatorsIndexGenerator.scala @@ -83,7 +83,8 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { "alsoToGraph", "orElseGraph", "divertToGraph", - "zipWithGraph") + "zipWithGraph", + "flatten") // FIXME document these methods as well val pendingTestCases = Map( diff --git a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/FlatMapConcatDoubleSubscriberTest.scala b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/FlatMapConcatDoubleSubscriberTest.scala index 1a107c7907..e9083d675a 100644 --- a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/FlatMapConcatDoubleSubscriberTest.scala +++ b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/FlatMapConcatDoubleSubscriberTest.scala @@ -29,7 +29,7 @@ class FlatMapConcatDoubleSubscriberTest extends PekkoSubscriberBlackboxVerificat def subscribe(s: Subscriber[_ >: Int]): Unit = subscriber.success(s.asInstanceOf[Subscriber[Int]]) })) - .flatMapConcat(identity) + .flatten .runWith(Sink.ignore) Await.result(subscriber.future, 1.second) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala index d3188f9053..c1c993990a 100755 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala @@ -89,7 +89,8 @@ class DslConsistencySpec extends AnyWordSpec with Matchers { "alsoToGraph", "wireTapGraph", "orElseGraph", - "divertToGraph") + "divertToGraph", + "flatten") val forComprehensions = Set("withFilter", "flatMap", "foreach") diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreterSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreterSpec.scala index ac4bc14c89..6c015c3431 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreterSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreterSpec.scala @@ -482,7 +482,7 @@ class ActorGraphInterpreterSpec extends StreamSpec { }) } })) - .flatMapConcat(identity) + .flatten .runWith(Sink.ignore) done.future.futureValue // would throw on failure } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala index 191038ed01..4b89f51c72 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala @@ -246,7 +246,7 @@ class FlowFlattenMergeSpec extends StreamSpec { Source.single(11))) val probe = - sources.flatMapConcat(identity).runWith(TestSink.probe) + sources.flatten.runWith(TestSink.probe) probe.request(3) probe.expectNext(0) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala index 16b5e30675..81db84dae7 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala @@ -303,7 +303,7 @@ class FlowSplitAfterSpec extends StreamSpec(""" val streamWithTightTimeout = testSource.lift .delay(1.second) - .flatMapConcat(identity) + .flatten .toMat(Sink.ignore)(Keep.right) .withAttributes(ActorAttributes .streamSubscriptionTimeout(500.milliseconds, StreamSubscriptionTimeoutTerminationMode.cancel)) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitWhenSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitWhenSpec.scala index 5eaaa1a9d3..a7522a51d4 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitWhenSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitWhenSpec.scala @@ -316,7 +316,7 @@ class FlowSplitWhenSpec extends StreamSpec(""" val testStreamWithTightTimeout = testSource.lift .delay(1.second) - .flatMapConcat(identity) + .flatten .toMat(Sink.ignore)(Keep.right) .withAttributes(ActorAttributes .streamSubscriptionTimeout(500.milliseconds, StreamSubscriptionTimeoutTerminationMode.cancel)) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala index eee8ce96ee..48ece508e1 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala @@ -515,4 +515,18 @@ class SourceSpec extends StreamSpec with DefaultTimeout { Await.result(result, 4.seconds) shouldBe Done } } + + "Source of sources" must { + "be able to concat with flatten" in { + (for { + i <- Source(1 to 5) + j = Source(1 to i) + } yield j).flatten + .reduce(_ + _) + .runWith(TestSink[Int]()) + .request(1) + .expectNext(35) + .expectComplete() + } + } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 6bb973aff5..5113ca20d2 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -2541,6 +2541,20 @@ trait FlowOps[+Out, +Mat] { @ApiMayChange def flatMap[T, M](f: Out => Graph[SourceShape[T], M]): Repr[T] = flatMapConcat(f) + /** + * Flattens a stream of `Source` into a single output stream by concatenation, + * fully consuming one `Source` after the other. This function is qquivalent to flatMapConcat(identity). + * + * '''Emits when''' a currently consumed substream has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and all consumed substreams complete + * + * '''Cancels when''' downstream cancels + */ + def flatten[T, M](implicit ev: Out <:< Graph[SourceShape[T], M]): Repr[T] = flatMap(ev) + /** * Transform each input element into a `Source` of output elements that is * then flattened into the output stream by merging, where at most `breadth`