From a159aee6d84513e0a3d45a25702a047def829646 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sun, 17 Dec 2023 17:56:44 +0800 Subject: [PATCH] +str Add combine seq method to Source and Sink. --- .../apache/pekko/stream/javadsl/SinkTest.java | 38 ++++++++++++++ .../pekko/stream/javadsl/SourceTest.java | 13 +++++ .../pekko/stream/scaladsl/SinkSpec.scala | 34 +++++++++++-- .../pekko/stream/scaladsl/SourceSpec.scala | 8 +++ .../apache/pekko/stream/javadsl/Sink.scala | 47 +++++++++++++++--- .../apache/pekko/stream/javadsl/Source.scala | 34 ++++++++++--- .../apache/pekko/stream/scaladsl/Sink.scala | 49 ++++++++++++++++--- .../apache/pekko/stream/scaladsl/Source.scala | 45 ++++++++++++----- 8 files changed, 236 insertions(+), 32 deletions(-) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java index 287a6d213e..cdcfd588bd 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java @@ -26,12 +26,15 @@ import org.apache.pekko.NotUsed; import org.apache.pekko.japi.Pair; import org.apache.pekko.japi.function.Function; import org.apache.pekko.stream.*; +import org.apache.pekko.stream.testkit.TestSubscriber; +import org.apache.pekko.stream.testkit.javadsl.TestSink; import org.apache.pekko.testkit.javadsl.TestKit; import org.junit.ClassRule; import org.junit.Test; import org.reactivestreams.Publisher; import org.apache.pekko.testkit.PekkoSpec; import org.apache.pekko.testkit.PekkoJUnitActorSystemResource; +import org.reactivestreams.Subscription; import static org.junit.Assert.*; @@ -127,6 +130,41 @@ public class SinkTest extends StreamTest { probe2.expectMsgEquals("done2"); } + @Test + public void mustBeAbleToUseCombineMat() { + final Sink> sink1 = TestSink.create(system); + final Sink> sink2 = TestSink.create(system); + final Sink, TestSubscriber.Probe>> sink = + Sink.combineMat(sink1, sink2, Broadcast::create, Keep.both()); + + final Pair, TestSubscriber.Probe> subscribers = + Source.from(Arrays.asList(0, 1)).runWith(sink, system); + final TestSubscriber.Probe subscriber1 = subscribers.first(); + final TestSubscriber.Probe subscriber2 = subscribers.second(); + final Subscription sub1 = subscriber1.expectSubscription(); + final Subscription sub2 = subscriber2.expectSubscription(); + sub1.request(2); + sub2.request(2); + subscriber1.expectNext(0, 1).expectComplete(); + subscriber2.expectNext(0, 1).expectComplete(); + } + + @Test + public void mustBeAbleToUseCombineMany() throws Exception { + final Sink> firstSink = Sink.head(); + final Sink> secondSink = Sink.head(); + final Sink> thirdSink = Sink.head(); + + final Sink>> combineSink = + Sink.combine(Arrays.asList(firstSink, secondSink, thirdSink), Broadcast::create); + final List> results = + Source.single(1L).toMat(combineSink, Keep.right()).run(system); + for (CompletionStage result : results) { + final long value = result.toCompletableFuture().get(3, TimeUnit.SECONDS); + assertEquals(1L, value); + } + } + @Test public void mustBeAbleToUseContramap() throws Exception { List out = diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index b316b96a9d..f385ecb807 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -968,6 +968,19 @@ public class SourceTest extends StreamTest { probe.expectMsgAllOf(0, 1, 2, 3); } + @Test + public void mustBeAbleToCombineN() throws Exception { + final Source source1 = Source.single(1); + final Source source2 = Source.single(2); + final Source source3 = Source.single(3); + final List> sources = Arrays.asList(source1, source2, source3); + final CompletionStage result = + Source.combine(sources, Concat::create) + .runWith(Sink.collect(Collectors.toList()), system) + .thenApply(list -> list.stream().mapToInt(l -> l).sum()); + assertEquals(6, result.toCompletableFuture().get(3, TimeUnit.SECONDS).intValue()); + } + @SuppressWarnings("unchecked") @Test public void mustBeAbleToZipN() throws Exception { diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala index c0b32c0116..e5f0202b5c 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala @@ -17,16 +17,16 @@ import scala.annotation.nowarn import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ -import org.reactivestreams.Publisher -import org.scalatest.concurrent.ScalaFutures - import org.apache.pekko +import org.scalatest.concurrent.ScalaFutures import pekko.Done import pekko.stream._ import pekko.stream.testkit._ import pekko.stream.testkit.scaladsl.{ TestSink, TestSource } import pekko.testkit.DefaultTimeout +import org.reactivestreams.Publisher + class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { import GraphDSL.Implicits._ @@ -138,6 +138,34 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } } + "combine many sinks to one" in { + val source = Source(List(0, 1, 2, 3, 4, 5)) + implicit val ex = org.apache.pekko.dispatch.ExecutionContexts.parasitic + val sink = Sink + .combine( + List( + Sink.reduce[Int]((a, b) => a + b), + Sink.reduce[Int]((a, b) => a + b), + Sink.reduce[Int]((a, b) => a + b)))(Broadcast[Int](_)) + .mapMaterializedValue(Future.reduceLeft(_)(_ + _)) + val result = source.runWith(sink) + result.futureValue should be(45) + } + + "combine two sinks with combineMat" in { + implicit val ex = org.apache.pekko.dispatch.ExecutionContexts.parasitic + Source(List(0, 1, 2, 3, 4, 5)) + .toMat(Sink.combineMat(Sink.reduce[Int]((a, b) => a + b), Sink.reduce[Int]((a, b) => a + b))(Broadcast[Int](_))( + (f1, f2) => { + for { + r1 <- f1 + r2 <- f2 + } yield r1 + r2 + }))(Keep.right) + .run() + .futureValue should be(30) + } + "combine to two sinks with simplified API" in { val probes = Seq.fill(2)(TestSubscriber.manualProbe[Int]()) val sink = Sink.combine(Sink.fromSubscriber(probes(0)), Sink.fromSubscriber(probes(1)))(Broadcast[Int](_)) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala index a24e492269..eee8ce96ee 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala @@ -150,6 +150,14 @@ class SourceSpec extends StreamSpec with DefaultTimeout { out.expectComplete() } + "combine many sources into one" in { + val sources = Vector.tabulate(5)(_ => Source.maybe[Int]) + val (promises, sub) = Source.combine(sources)(Concat(_)).toMat(TestSink.probe[Int])(Keep.both).run() + for ((promise, idx) <- promises.zipWithIndex) + promise.success(Some(idx)) + sub.request(5).expectNextN(0 to 4).expectComplete() + } + "combine from two inputs with simplified API" in { val probes = immutable.Seq.fill(2)(TestPublisher.manualProbe[Int]()) val source = Source.fromPublisher(probes(0)) :: Source.fromPublisher(probes(1)) :: Nil diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala index b5b63060c5..7bbbcd782f 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala @@ -19,21 +19,19 @@ import java.util.concurrent.CompletionStage import java.util.function.BiFunction import java.util.stream.Collector +import scala.annotation.nowarn import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.util.Try -import org.reactivestreams.Publisher -import org.reactivestreams.Subscriber - import org.apache.pekko import pekko._ import pekko.actor.ActorRef import pekko.actor.ClassicActorSystemProvider import pekko.actor.Status import pekko.dispatch.ExecutionContexts -import pekko.japi.function +import pekko.japi.{ function, Util } import pekko.japi.function.Creator import pekko.stream._ import pekko.stream.impl.LinearTraversalBuilder @@ -43,6 +41,9 @@ import pekko.stream.scaladsl.SinkToCompletionStage import pekko.util.FutureConverters._ import pekko.util.OptionConverters._ +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber + /** Java API */ object Sink { @@ -372,10 +373,44 @@ object Sink { output1: Sink[U, _], output2: Sink[U, _], rest: java.util.List[Sink[U, _]], - strategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]]): Sink[T, NotUsed] = { + @nowarn + @deprecatedName(Symbol("strategy")) + fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]]) + : Sink[T, NotUsed] = { import pekko.util.ccompat.JavaConverters._ val seq = if (rest != null) rest.asScala.map(_.asScala).toSeq else immutable.Seq() - new Sink(scaladsl.Sink.combine(output1.asScala, output2.asScala, seq: _*)(num => strategy.apply(num))) + new Sink(scaladsl.Sink.combine(output1.asScala, output2.asScala, seq: _*)(num => fanOutStrategy.apply(num))) + } + + /** + * Combine two sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink` with 2 outlets. + * @since 1.1.0 + */ + def combineMat[T, U, M1, M2, M]( + first: Sink[U, M1], + second: Sink[U, M2], + fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]], + matF: function.Function2[M1, M2, M]): Sink[T, M] = { + new Sink( + scaladsl.Sink.combineMat(first.asScala, second.asScala)(size => fanOutStrategy(size))(combinerToScala(matF))) + } + + /** + * Combine several sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink`. + * The fanoutGraph's outlets size must match the provides sinks'. + * @since 1.1.0 + */ + def combine[T, U, M]( + sinks: java.util.List[_ <: Graph[SinkShape[U], M]], + fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]]) + : Sink[T, java.util.List[M]] = { + val seq = if (sinks != null) Util.immutableSeq(sinks).collect { + case sink: Sink[U @unchecked, M @unchecked] => sink.asScala + case other => other + } + else immutable.Seq() + import org.apache.pekko.util.ccompat.JavaConverters._ + new Sink(scaladsl.Sink.combine(seq)(size => fanOutStrategy(size)).mapMaterializedValue(_.asJava)) } /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index ae01282622..1f6e80d60d 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -25,8 +25,6 @@ import scala.concurrent.{ Future, Promise } import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag -import org.reactivestreams.{ Publisher, Subscriber } - import org.apache.pekko import pekko.{ Done, NotUsed } import pekko.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider } @@ -43,6 +41,8 @@ import pekko.util.JavaDurationConverters._ import pekko.util.OptionConverters._ import pekko.util.ccompat.JavaConverters._ +import org.reactivestreams.{ Publisher, Subscriber } + /** Java API */ object Source { private[this] val _empty = new Source[Any, NotUsed](scaladsl.Source.empty) @@ -656,10 +656,12 @@ object Source { first: Source[T, _ <: Any], second: Source[T, _ <: Any], rest: java.util.List[Source[T, _ <: Any]], - strategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]]) + @nowarn + @deprecatedName(Symbol("strategy")) + fanInStrategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]]) : Source[U, NotUsed] = { val seq = if (rest != null) Util.immutableSeq(rest).map(_.asScala) else immutable.Seq() - new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: _*)(num => strategy.apply(num))) + new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: _*)(num => fanInStrategy.apply(num))) } /** @@ -668,10 +670,30 @@ object Source { def combineMat[T, U, M1, M2, M]( first: Source[T, M1], second: Source[T, M2], - strategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]], + @nowarn + @deprecatedName(Symbol("strategy")) + fanInStrategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]], combine: function.Function2[M1, M2, M]): Source[U, M] = { new Source( - scaladsl.Source.combineMat(first.asScala, second.asScala)(num => strategy.apply(num))(combinerToScala(combine))) + scaladsl.Source.combineMat(first.asScala, second.asScala)(num => fanInStrategy.apply(num))( + combinerToScala(combine))) + } + + /** + * Combines several sources with fan-in strategy like [[Merge]] or [[Concat]] into a single [[Source]]. + * @since 1.1.0 + */ + def combine[T, U, M]( + sources: java.util.List[_ <: Graph[SourceShape[T], M]], + fanInStrategy: function.Function[java.lang.Integer, Graph[UniformFanInShape[T, U], NotUsed]]) + : Source[U, java.util.List[M]] = { + val seq = if (sources != null) Util.immutableSeq(sources).collect { + case source: Source[T @unchecked, M @unchecked] => source.asScala + case other => other + } + else immutable.Seq() + import org.apache.pekko.util.ccompat.JavaConverters._ + new Source(scaladsl.Source.combine(seq)(size => fanInStrategy(size)).mapMaterializedValue(_.asJava)) } /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index 7e379a350b..099c56425f 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -13,7 +13,7 @@ package org.apache.pekko.stream.scaladsl -import scala.annotation.tailrec +import scala.annotation.{ nowarn, tailrec } import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.ExecutionContext @@ -22,9 +22,6 @@ import scala.util.Failure import scala.util.Success import scala.util.Try -import org.reactivestreams.Publisher -import org.reactivestreams.Subscriber - import org.apache.pekko import pekko.Done import pekko.NotUsed @@ -40,6 +37,9 @@ import pekko.stream.javadsl import pekko.stream.stage._ import pekko.util.ccompat._ +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber + /** * A `Sink` is a set of stream processing steps that has one open input. * Can be used as a `Subscriber` @@ -339,10 +339,12 @@ object Sink { * Combine several sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink`. */ def combine[T, U](first: Sink[U, _], second: Sink[U, _], rest: Sink[U, _]*)( - strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, NotUsed] = + @nowarn + @deprecatedName(Symbol("strategy")) + fanOutStrategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, NotUsed] = Sink.fromGraph(GraphDSL.create() { implicit b => import GraphDSL.Implicits._ - val d = b.add(strategy(rest.size + 2)) + val d = b.add(fanOutStrategy(rest.size + 2)) d.out(0) ~> first d.out(1) ~> second @@ -355,6 +357,41 @@ object Sink { combineRest(2, rest.iterator) }) + /** + * Combine two sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink` with 2 outlets. + * @since 1.1.0 + */ + def combineMat[T, U, M1, M2, M](first: Sink[U, M1], second: Sink[U, M2])( + fanOutStrategy: Int => Graph[UniformFanOutShape[T, U], NotUsed])(matF: (M1, M2) => M): Sink[T, M] = { + Sink.fromGraph(GraphDSL.createGraph(first, second)(matF) { implicit b => (shape1, shape2) => + import GraphDSL.Implicits._ + val d = b.add(fanOutStrategy(2)) + d.out(0) ~> shape1 + d.out(1) ~> shape2 + new SinkShape[T](d.in) + }) + } + + /** + * Combine several sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink`. + * The fanoutGraph's outlets size must match the provides sinks'. + * @since 1.1.0 + */ + def combine[T, U, M](sinks: immutable.Seq[Graph[SinkShape[U], M]])( + fanOutStrategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, immutable.Seq[M]] = + sinks match { + case immutable.Seq() => Sink.cancelled.mapMaterializedValue(_ => Nil) + case immutable.Seq(sink) => sink.asInstanceOf[Sink[T, M]].mapMaterializedValue(_ :: Nil) + case _ => + Sink.fromGraph(GraphDSL.create(sinks) { implicit b => shapes => + import GraphDSL.Implicits._ + val c = b.add(fanOutStrategy(sinks.size)) + for ((shape, idx) <- shapes.zipWithIndex) + c.out(idx) ~> shape + SinkShape(c.in) + }) + } + /** * A `Sink` that will invoke the given function to each of the elements * as they pass in. The sink is materialized into a [[scala.concurrent.Future]] diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 429276f36d..4fa134838e 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl import java.util.concurrent.CompletionStage -import scala.annotation.tailrec +import scala.annotation.{ nowarn, tailrec } import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.{ Future, Promise } @@ -756,10 +756,12 @@ object Source { * Combines several sources with fan-in strategy like [[Merge]] or [[Concat]] into a single [[Source]]. */ def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)( - strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed] = + @nowarn + @deprecatedName(Symbol("strategy")) + fanInStrategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed] = Source.fromGraph(GraphDSL.create() { implicit b => import GraphDSL.Implicits._ - val c = b.add(strategy(rest.size + 2)) + val c = b.add(fanInStrategy(rest.size + 2)) first ~> c.in(0) second ~> c.in(1) @@ -772,19 +774,40 @@ object Source { combineRest(2, rest.iterator) }) + /** + * Combines several sources with fan-in strategy like [[Merge]] or [[Concat]] into a single [[Source]]. + * @since 1.1.0 + */ + def combine[T, U, M](sources: immutable.Seq[Graph[SourceShape[T], M]])( + fanInStrategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, immutable.Seq[M]] = + sources match { + case immutable.Seq() => Source.empty.mapMaterializedValue(_ => Nil) + case immutable.Seq(source) => source.asInstanceOf[Source[U, M]].mapMaterializedValue(_ :: Nil) + case _ => + Source.fromGraph(GraphDSL.create(sources) { implicit b => shapes => + import GraphDSL.Implicits._ + val c = b.add(fanInStrategy(sources.size)) + for ((shape, i) <- shapes.zipWithIndex) { + shape ~> c.in(i) + } + SourceShape(c.out) + }) + } + /** * Combines several sources with fan-in strategy like [[Merge]] or [[Concat]] into a single [[Source]] with a materialized value. */ def combineMat[T, U, M1, M2, M](first: Source[T, M1], second: Source[T, M2])( - strategy: Int => Graph[UniformFanInShape[T, U], NotUsed])(matF: (M1, M2) => M): Source[U, M] = { - val secondPartiallyCombined = GraphDSL.createGraph(second) { implicit b => secondShape => + @nowarn + @deprecatedName(Symbol("strategy")) + fanInStrategy: Int => Graph[UniformFanInShape[T, U], NotUsed])(matF: (M1, M2) => M): Source[U, M] = + Source.fromGraph(GraphDSL.createGraph(first, second)(matF) { implicit b => (shape1, shape2) => import GraphDSL.Implicits._ - val c = b.add(strategy(2)) - secondShape ~> c.in(1) - FlowShape(c.in(0), c.out) - } - first.viaMat(secondPartiallyCombined)(matF) - } + val c = b.add(fanInStrategy(2)) + shape1 ~> c.in(0) + shape2 ~> c.in(1) + SourceShape(c.out) + }) /** * Combine the elements of multiple streams into a stream of sequences.