diff --git a/docs/src/main/paradox/stream/operators/Flow/flattenMerge.md b/docs/src/main/paradox/stream/operators/Flow/flattenMerge.md
new file mode 100644
index 0000000000..900bbcb7e5
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Flow/flattenMerge.md
@@ -0,0 +1,29 @@
+# flattenMerge
+
+Flattens a stream of Source into a single output stream by merging.
+
+@ref[Nesting and flattening operators](../index.md#nesting-and-flattening-operators)
+
+## Signature
+
+@apidoc[Flow.flattenMerge](Flow) { scala="#flattenMerge%5BT%2C%20M%5D(breadth%3A%20Int)(implicit%20ev%3A%20Out%20%3C%3A%3C%20Graph%5BSourceShape%5BT%5D%2C%20M%5D):FlowOps.this.Repr[T]" }
+
+## Description
+
+Flattens a stream of `Source` into a single output stream by merging, where at most breadth substreams are being consumed
+at any given time. This function is equivalent to `flatMapMerge(breadth, identity)`.
+Emits when a currently consumed substream has an element available
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when one of the currently consumed substreams has an element available
+
+**backpressures** when downstream backpressures or the max number of substreams is reached
+
+**completes** when upstream completes and all consumed substreams complete
+
+@@@
+
+
diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md
index e79c880f21..adbb3183bf 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -251,6 +251,7 @@ See the @ref:[Substreams](../stream-substream.md) page for more detail and code
|Source/Flow|@ref[flatMapConcat](Source-or-Flow/flatMapConcat.md)|Transform each input element into a `Source` whose elements are then flattened into the output stream through concatenation.|
|Source/Flow|@ref[flatMapMerge](Source-or-Flow/flatMapMerge.md)|Transform each input element into a `Source` whose elements are then flattened into the output stream through merging.|
|Source/Flow|@ref[flatMapPrefix](Source-or-Flow/flatMapPrefix.md)|Use the first `n` elements from the stream to determine how to process the rest.|
+|Flow|@ref[flattenMerge](Flow/flattenMerge.md)|Flattens a stream of Source into a single output stream by merging.|
|Source/Flow|@ref[groupBy](Source-or-Flow/groupBy.md)|Demultiplex the incoming stream into separate output streams.|
|Source/Flow|@ref[prefixAndTail](Source-or-Flow/prefixAndTail.md)|Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements) and returns a pair containing a strict sequence of the taken element and a stream representing the remaining elements.|
|Source/Flow|@ref[splitAfter](Source-or-Flow/splitAfter.md)|End the current substream whenever a predicate returns `true`, starting a new substream for the next element.|
@@ -451,6 +452,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [flatMapConcat](Source-or-Flow/flatMapConcat.md)
* [flatMapMerge](Source-or-Flow/flatMapMerge.md)
* [flatMapPrefix](Source-or-Flow/flatMapPrefix.md)
+* [flattenMerge](Flow/flattenMerge.md)
* [flattenOptional](Flow/flattenOptional.md)
* [fold](Source-or-Flow/fold.md)
* [fold](Sink/fold.md)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala
index ce1d60bee9..7c78fe1af3 100755
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala
@@ -91,7 +91,8 @@ class DslConsistencySpec extends AnyWordSpec with Matchers {
"wireTapGraph",
"orElseGraph",
"divertToGraph",
- "flatten")
+ "flatten",
+ "flattenMerge")
val forComprehensions = Set("withFilter", "flatMap", "foreach")
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala
index 8198d35265..99b342d74b 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala
@@ -51,6 +51,13 @@ class FlowFlattenMergeSpec extends StreamSpec {
.futureValue should ===((0 until 40).toSet)
}
+ "work in the nominal case with flattenMerge" in {
+ Source(List(src10(0), src10(10), src10(20), src10(30)))
+ .flattenMerge(4)
+ .runWith(toSet)
+ .futureValue should ===((0 until 40).toSet)
+ }
+
"not be held back by one slow stream" in {
Source(List(src10(0), src10(10), blocked, src10(20), src10(30)))
.flatMapMerge(3, identity)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index f365368f91..da519cf81b 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -2667,7 +2667,7 @@ trait FlowOps[+Out, +Mat] {
/**
* Flattens a stream of `Source` into a single output stream by concatenation,
- * fully consuming one `Source` after the other. This function is qquivalent to flatMapConcat(identity).
+ * fully consuming one `Source` after the other. This function is equivalent to flatMapConcat(identity).
*
* '''Emits when''' a currently consumed substream has an element available
*
@@ -2697,6 +2697,24 @@ trait FlowOps[+Out, +Mat] {
def flatMapMerge[T, M](breadth: Int, f: Out => Graph[SourceShape[T], M]): Repr[T] =
map(f).via(new FlattenMerge[T, M](breadth))
+ /**
+ * Flattens a stream of `Source` into a single output stream by merging,
+ * where at most `breadth` substreams are being consumed at any given time.
+ * This function is equivalent to flatMapMerge(breadth, identity).
+ *
+ * '''Emits when''' a currently consumed substream has an element available
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes and all consumed substreams complete
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.1.0
+ */
+ def flattenMerge[T, M](breadth: Int)(implicit ev: Out <:< Graph[SourceShape[T], M]): Repr[T] =
+ flatMapMerge(breadth, ev)
+
/**
* If the first element has not passed through this operator before the provided timeout, the stream is failed
* with a [[org.apache.pekko.stream.InitialTimeoutException]].