feat: Add flatten operator.

This commit is contained in:
He-Pin 2024-01-14 00:43:21 +08:00 committed by kerr
parent 6872f9c8b7
commit 447728f3c0
9 changed files with 37 additions and 7 deletions

View file

@ -83,7 +83,8 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
"alsoToGraph",
"orElseGraph",
"divertToGraph",
"zipWithGraph")
"zipWithGraph",
"flatten")
// FIXME document these methods as well
val pendingTestCases = Map(

View file

@ -29,7 +29,7 @@ class FlatMapConcatDoubleSubscriberTest extends PekkoSubscriberBlackboxVerificat
def subscribe(s: Subscriber[_ >: Int]): Unit =
subscriber.success(s.asInstanceOf[Subscriber[Int]])
}))
.flatMapConcat(identity)
.flatten
.runWith(Sink.ignore)
Await.result(subscriber.future, 1.second)

View file

@ -89,7 +89,8 @@ class DslConsistencySpec extends AnyWordSpec with Matchers {
"alsoToGraph",
"wireTapGraph",
"orElseGraph",
"divertToGraph")
"divertToGraph",
"flatten")
val forComprehensions = Set("withFilter", "flatMap", "foreach")

View file

@ -482,7 +482,7 @@ class ActorGraphInterpreterSpec extends StreamSpec {
})
}
}))
.flatMapConcat(identity)
.flatten
.runWith(Sink.ignore)
done.future.futureValue // would throw on failure
}

View file

@ -246,7 +246,7 @@ class FlowFlattenMergeSpec extends StreamSpec {
Source.single(11)))
val probe =
sources.flatMapConcat(identity).runWith(TestSink.probe)
sources.flatten.runWith(TestSink.probe)
probe.request(3)
probe.expectNext(0)

View file

@ -303,7 +303,7 @@ class FlowSplitAfterSpec extends StreamSpec("""
val streamWithTightTimeout =
testSource.lift
.delay(1.second)
.flatMapConcat(identity)
.flatten
.toMat(Sink.ignore)(Keep.right)
.withAttributes(ActorAttributes
.streamSubscriptionTimeout(500.milliseconds, StreamSubscriptionTimeoutTerminationMode.cancel))

View file

@ -316,7 +316,7 @@ class FlowSplitWhenSpec extends StreamSpec("""
val testStreamWithTightTimeout =
testSource.lift
.delay(1.second)
.flatMapConcat(identity)
.flatten
.toMat(Sink.ignore)(Keep.right)
.withAttributes(ActorAttributes
.streamSubscriptionTimeout(500.milliseconds, StreamSubscriptionTimeoutTerminationMode.cancel))

View file

@ -515,4 +515,18 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
Await.result(result, 4.seconds) shouldBe Done
}
}
"Source of sources" must {
"be able to concat with flatten" in {
(for {
i <- Source(1 to 5)
j = Source(1 to i)
} yield j).flatten
.reduce(_ + _)
.runWith(TestSink[Int]())
.request(1)
.expectNext(35)
.expectComplete()
}
}
}

View file

@ -2541,6 +2541,20 @@ trait FlowOps[+Out, +Mat] {
@ApiMayChange
def flatMap[T, M](f: Out => Graph[SourceShape[T], M]): Repr[T] = flatMapConcat(f)
/**
* Flattens a stream of `Source` into a single output stream by concatenation,
* fully consuming one `Source` after the other. This function is qquivalent to <code>flatMapConcat(identity)</code>.
*
* '''Emits when''' a currently consumed substream has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and all consumed substreams complete
*
* '''Cancels when''' downstream cancels
*/
def flatten[T, M](implicit ev: Out <:< Graph[SourceShape[T], M]): Repr[T] = flatMap(ev)
/**
* Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by merging, where at most `breadth`