+str Add mergeAll stream operator. (#31298)

This commit is contained in:
kerr 2022-09-01 21:10:33 +08:00 committed by GitHub
parent 078ddfa88c
commit c9185892db
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 254 additions and 16 deletions

View file

@ -0,0 +1,33 @@
# mergeAll
Merge multiple sources.
@ref[Fan-in operators](../index.md#fan-in-operators)
## Signature
@apidoc[Source.mergeAll](Source) { scala="#mergeAll[U>:Out,M](those:immutable.Seq[akka.stream.Graph[akka.stream.SourceShape[U],M]],eagerComplete:Boolean):FlowOps.this.Repr[U]" java="#mergeAll(java.util.List,boolean)" }
@apidoc[Flow.mergeAll](Flow) { scala="#mergeAll[U>:Out,M](those:immutable.Seq[akka.stream.Graph[akka.stream.SourceShape[U],M]],eagerComplete:Boolean):FlowOps.this.Repr[U]" java="#mergeAll(java.util.List,boolean)" }
## Description
Merge multiple sources. Picks elements randomly if all sources has elements ready.
## Example
Scala
: @@snip [FlowMergeSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeAllSpec.scala) { #merge-all }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #merge-all }
## Reactive Streams semantics
@@@div { .callout }
**emits** when one of the inputs has an element available
**backpressures** when downstream backpressures
**completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.)
@@@

View file

@ -273,6 +273,7 @@ the inputs in different ways.
|Source/Flow|<a name="interleave"></a>@ref[interleave](Source-or-Flow/interleave.md)|Emits a specifiable number of elements from the original source, then from the provided source and repeats.|
|Source/Flow|<a name="interleaveall"></a>@ref[interleaveAll](Source-or-Flow/interleaveAll.md)|Emits a specifiable number of elements from the original source, then from the provided sources and repeats.|
|Source/Flow|<a name="merge"></a>@ref[merge](Source-or-Flow/merge.md)|Merge multiple sources.|
|Source/Flow|<a name="mergeall"></a>@ref[mergeAll](Source-or-Flow/mergeAll.md)|Merge multiple sources.|
|Source/Flow|<a name="mergelatest"></a>@ref[mergeLatest](Source-or-Flow/mergeLatest.md)|Merge multiple sources.|
|Source/Flow|<a name="mergepreferred"></a>@ref[mergePreferred](Source-or-Flow/mergePreferred.md)|Merge multiple sources.|
|Source/Flow|<a name="mergeprioritized"></a>@ref[mergePrioritized](Source-or-Flow/mergePrioritized.md)|Merge multiple sources.|
@ -514,6 +515,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [mapError](Source-or-Flow/mapError.md)
* [maybe](Source/maybe.md)
* [merge](Source-or-Flow/merge.md)
* [mergeAll](Source-or-Flow/mergeAll.md)
* [mergeLatest](Source-or-Flow/mergeLatest.md)
* [mergePreferred](Source-or-Flow/mergePreferred.md)
* [mergePrioritized](Source-or-Flow/mergePrioritized.md)

View file

@ -5,14 +5,16 @@
package jdocs.stream.operators;
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.event.LogMarker;
import akka.japi.Pair;
import akka.japi.function.Function2;
import akka.japi.pf.PFBuilder;
import akka.stream.Attributes;
import akka.stream.javadsl.Flow;
import akka.NotUsed;
import akka.japi.function.Function2;
// #zip
// #zip-with
@ -204,6 +206,17 @@ class SourceOrFlow {
// #merge
}
void mergeAllExample() {
// #merge-all
Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(4, 5, 6));
Source<Integer, NotUsed> sourceC = Source.from(Arrays.asList(7, 8, 9, 10));
sourceA.mergeAll(Arrays.asList(sourceB, sourceC), false)
.runForeach(System.out::println, system);
// merging is not deterministic, can for example print 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
// #merge-all
}
void mergePreferredExample() {
// #mergePreferred

View file

@ -10,6 +10,7 @@ import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.Status;
import akka.japi.Pair;
import akka.japi.Util;
import akka.japi.function.*;
import akka.japi.pf.PFBuilder;
// #imports
@ -1125,6 +1126,20 @@ public class SourceTest extends StreamTest {
probe.expectMsgAllOf("A", "B", "C", "D", "E", "F");
}
@Test
public void mustBeAbleToUseMerge3() {
final Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3));
final Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(4, 5, 6));
final Source<Integer, NotUsed> sourceC = Source.from(Arrays.asList(7, 8, 9));
final TestSubscriber.Probe<Integer> sub =
sourceA
.mergeAll(Arrays.asList(sourceB, sourceC), false)
.runWith(TestSink.probe(system), system);
sub.expectSubscription().request(9);
sub.expectNextUnorderedN(Util.immutableSeq(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9)))
.expectComplete();
}
@Test
public void mustBeAbleToUseInitialTimeout() {
ExecutionException exception =

View file

@ -0,0 +1,57 @@
/*
* Copyright (C) 2014-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink
class FlowMergeAllSpec extends StreamSpec("""
akka.stream.materializer.initial-input-buffer-size = 2
""") with ScriptedTest {
"Flow mergeAll" must {
"merge all upstream elements to its downstream" in {
val source1 = Source(1 to 3)
val source2 = Source(4 to 6)
val source3 = Source(7 to 10)
source1
.mergeAll(List(source2, source3), eagerComplete = false)
.fold(Set.empty[Int])((set, i) => set + i)
.runWith(TestSink.probe)
.request(1)
.expectNext(Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.expectComplete();
}
"merge all elements of the first completed source to its downstream " in {
val source1 = Source(1 to 2)
val source2 = Source(3 to 6)
val source3 = Source(7 to 10)
val result =
source1.mergeAll(List(source2, source3), eagerComplete = true).runFold(Set.empty[Int])((set, i) => set + i)
result.futureValue should contain allElementsOf (Set(1, 2))
}
"merge single upstream elements to its downstream" in {
Source(1 to 3)
.mergeAll(Nil, eagerComplete = false)
.runWith(TestSink.probe)
.request(3)
.expectNext(1, 2, 3)
.expectComplete()
}
"works in merge numbers example" in {
// #merge-all
val sourceA = Source(1 to 3)
val sourceB = Source(4 to 6)
val sourceC = Source(7 to 10)
sourceA.mergeAll(List(sourceB, sourceC), eagerComplete = false).runForeach(println)
// merging is not deterministic, can for example print 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
// #merge-all
}
}
}

View file

@ -11,6 +11,7 @@ import java.util.function.BiFunction
import java.util.function.Supplier
import scala.annotation.{ nowarn, varargs }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters.RichOptionalGeneric
import scala.concurrent.duration.FiniteDuration
@ -2977,6 +2978,28 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
eagerComplete: Boolean): javadsl.Flow[In, Out, M2] =
new Flow(delegate.mergeMat(that, eagerComplete)(combinerToScala(matF)))
/**
* Merge the given [[Source]]s to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready.
*
* '''Emits when''' one of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
*
* '''Cancels when''' downstream cancels
*/
def mergeAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): javadsl.Flow[In, Out, Mat] = {
val seq = if (those != null) Util.immutableSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
} else immutable.Seq()
new javadsl.Flow(delegate.mergeAll(seq, eagerComplete))
}
/**
* MergeLatest joins elements from N input streams into stream of lists of size N.
* i-th element in list is the latest emitted element from i-th input stream.

View file

@ -4,6 +4,22 @@
package akka.stream.javadsl
import java.util.Optional
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.function.BiFunction
import java.util.stream.Collector
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import scala.concurrent.ExecutionContext
import scala.util.Try
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import akka._
import akka.actor.ActorRef
import akka.actor.ClassicActorSystemProvider
@ -16,20 +32,6 @@ import akka.stream.impl.LinearTraversalBuilder
import akka.stream.javadsl
import akka.stream.scaladsl
import akka.stream.scaladsl.SinkToCompletionStage
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import java.util.Optional
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.function.BiFunction
import java.util.stream.Collector
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import scala.concurrent.ExecutionContext
import scala.util.Try
/** Java API */
object Sink {

View file

@ -1725,6 +1725,28 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
eagerComplete: Boolean): javadsl.Source[Out, M2] =
new Source(delegate.mergeMat(that, eagerComplete)(combinerToScala(matF)))
/**
* Merge the given [[Source]]s to the current one, taking elements as they arrive from input streams,
* picking randomly when several elements ready.
*
* '''Emits when''' one of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
*
* '''Cancels when''' downstream cancels
*/
def mergeAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): javadsl.Source[Out, Mat] = {
val seq = if (those != null) Util.immutableSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
} else immutable.Seq()
new Source(delegate.mergeAll(seq, eagerComplete))
}
/**
* MergeLatest joins elements from N input streams into stream of lists of size N.
* i-th element in list is the latest emitted element from i-th input stream.

View file

@ -9,6 +9,7 @@ import java.util.concurrent.CompletionStage
import java.util.function.Supplier
import scala.annotation.{ nowarn, varargs }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters.RichOptionalGeneric
import scala.concurrent.duration.FiniteDuration
@ -1764,6 +1765,28 @@ class SubFlow[In, Out, Mat](
def merge(that: Graph[SourceShape[Out], _]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.merge(that))
/**
* Merge the given [[Source]]s to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready.
*
* '''Emits when''' one of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
*
* '''Cancels when''' downstream cancels
*/
def mergeAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): SubFlow[In, Out, Mat] = {
val seq = if (those != null) Util.immutableSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
} else immutable.Seq()
new SubFlow(delegate.mergeAll(seq, eagerComplete))
}
/**
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that`

View file

@ -9,6 +9,7 @@ import java.util.concurrent.CompletionStage
import java.util.function.Supplier
import scala.annotation.{ nowarn, varargs }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters.RichOptionalGeneric
import scala.concurrent.duration.FiniteDuration
@ -1740,6 +1741,28 @@ class SubSource[Out, Mat](
def merge(that: Graph[SourceShape[Out], _]): SubSource[Out, Mat] =
new SubSource(delegate.merge(that))
/**
* Merge the given [[Source]]s to the current one, taking elements as they arrive from input streams,
* picking randomly when several elements ready.
*
* '''Emits when''' one of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
*
* '''Cancels when''' downstream cancels
*/
def mergeAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): SubSource[Out, Mat] = {
val seq = if (those != null) Util.immutableSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
} else immutable.Seq()
new SubSource(delegate.mergeAll(seq, eagerComplete))
}
/**
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]].
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,

View file

@ -2996,6 +2996,31 @@ trait FlowOps[+Out, +Mat] {
FlowShape(merge.in(0), merge.out)
}
/**
* Merge the given [[Source]]s to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready.
*
* '''Emits when''' one of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
*
* '''Cancels when''' downstream cancels
*/
def mergeAll[U >: Out](those: immutable.Seq[Graph[SourceShape[U], _]], eagerComplete: Boolean): Repr[U] =
those match {
case those if those.isEmpty => this.asInstanceOf[Repr[U]]
case _ =>
via(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[U](those.size + 1, eagerComplete))
for ((that, idx) <- those.zipWithIndex)
that ~> merge.in(idx + 1)
FlowShape(merge.in(0), merge.out)
})
}
/**
* MergeLatest joins elements from N input streams into stream of lists of size N.
* i-th element in list is the latest emitted element from i-th input stream.