diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/alsoToAll.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/alsoToAll.md new file mode 100644 index 0000000000..273437f80e --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/alsoToAll.md @@ -0,0 +1,31 @@ +# alsoToAll + +Attaches the given @apidoc[Source]s to this @apidoc[Flow], meaning that elements that pass through this @apidoc[Flow] will also be sent to all those @apidoc[Sink]s. + +@ref[Fan-out operators](../index.md#fan-out-operators) + +## Signature + +@apidoc[Source.alsoToAll](Source) { scala="#alsoToAll(that:akka.stream.Graph[akka.stream.SinkShape[Out],_]*):FlowOps.this.Repr[Out]" java="#alsoToAll(akka.stream.Graph*)" } +@apidoc[Flow.alsoToAll](Flow) { scala="#alsoToAll(that:akka.stream.Graph[akka.stream.SinkShape[Out],_]*):FlowOps.this.Repr[Out]" java="#alsoToAll(akka.stream.Graph*)" } + +## Description + +Attaches the given @apidoc[Source] s to this @apidoc[Flow], meaning that elements that pass through this @apidoc[Flow] +will also be sent to all those @apidoc[Sink]s. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when an element is available and demand exists both from the @apidoc[Sink]s and the downstream + +**backpressures** when downstream or any of the @apidoc[Sink]s backpressures + +**completes** when upstream completes + +**cancels** when downstream or or any of the @apidoc[Sink]s cancels + +@@@ + + diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index dd3daafda5..cae12a8068 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -300,6 +300,7 @@ There is a number of fan-out operators for which currently no 'fluent' is API av | |@ref[Unzip](Unzip.md)|Takes a stream of two element tuples and unzips the two elements ino two different downstreams.| | |@ref[UnzipWith](UnzipWith.md)|Splits each element of input into multiple downstreams using a function| |Source/Flow|@ref[alsoTo](Source-or-Flow/alsoTo.md)|Attaches the given `Sink` to this `Flow`, meaning that elements that pass through this `Flow` will also be sent to the `Sink`.| +|Source/Flow|@ref[alsoToAll](Source-or-Flow/alsoToAll.md)|Attaches the given @apidoc[Source]s to this @apidoc[Flow], meaning that elements that pass through this @apidoc[Flow] will also be sent to all those @apidoc[Sink]s.| |Source/Flow|@ref[divertTo](Source-or-Flow/divertTo.md)|Each upstream element will either be diverted to the given sink, or the downstream consumer according to the predicate function applied to the element.| |Source/Flow|@ref[wireTap](Source-or-Flow/wireTap.md)|Attaches the given `Sink` to this `Flow` as a wire tap, meaning that elements that pass through will also be sent to the wire-tap `Sink`, without the latter affecting the mainline flow.| @@ -375,6 +376,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md) * [aggregateWithBoundary](Source-or-Flow/aggregateWithBoundary.md) * [alsoTo](Source-or-Flow/alsoTo.md) +* [alsoToAll](Source-or-Flow/alsoToAll.md) * [asFlowWithContext](Flow/asFlowWithContext.md) * [asInputStream](StreamConverters/asInputStream.md) * [asJavaStream](StreamConverters/asJavaStream.md) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 6c569f485c..7456824ffe 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -1348,6 +1348,12 @@ public class FlowTest extends StreamTest { Flow.of(Integer.class).alsoToMat(Sink.ignore(), (i, n) -> "foo"); } + @Test + public void mustBeAbleToUseAlsoToAll() { + final Flow f = + Flow.of(Integer.class).alsoToAll(Sink.ignore(), Sink.ignore()); + } + @Test public void mustBeAbleToUseDivertTo() { final Flow f = diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 9910d60521..6f16370275 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -1146,6 +1146,12 @@ public class SourceTest extends StreamTest { Source.empty().alsoToMat(Sink.ignore(), (i, n) -> "foo"); } + @Test + public void mustBeAbleToUseAlsoToAll() { + final Source f = + Source.empty().alsoToAll(Sink.ignore(), Sink.ignore()); + } + @Test public void mustBeAbleToUseDivertTo() { final Source f = Source.empty().divertTo(Sink.ignore(), e -> true); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAlsoToAllSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAlsoToAllSpec.scala new file mode 100644 index 0000000000..6288953ea4 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAlsoToAllSpec.scala @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2014-2022 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.TestSink + +class FlowAlsoToAllSpec extends StreamSpec(""" + akka.stream.materializer.initial-input-buffer-size = 2 + """) with ScriptedTest { + + "An also to all" must { + "publish elements to all its downstream" in { + val (sub1, sink1) = TestSink.probe[Int].preMaterialize(); + val (sub2, sink2) = TestSink.probe[Int].preMaterialize(); + val (sub3, sink3) = TestSink.probe[Int].preMaterialize(); + Source.single(1).alsoToAll(sink1, sink2).runWith(sink3) + sub1.expectSubscription().request(1) + sub2.expectSubscription().request(1) + sub3.expectSubscription().request(1) + sub1.expectNext(1).expectComplete() + sub2.expectNext(1).expectComplete() + sub3.expectNext(1).expectComplete() + } + + "publish elements to its only downstream" in { + val (sub1, sink1) = TestSink.probe[Int].preMaterialize(); + Source.single(1).alsoToAll().runWith(sink1) + sub1.expectSubscription().request(1) + sub1.expectNext(1).expectComplete() + } + + } +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 4170a4c604..75d5d1e004 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -9,12 +9,11 @@ import java.util.Optional import java.util.concurrent.CompletionStage import java.util.function.BiFunction import java.util.function.Supplier - import scala.annotation.unchecked.uncheckedVariance import scala.compat.java8.FutureConverters._ import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag -import scala.annotation.nowarn +import scala.annotation.{ nowarn, varargs } import org.reactivestreams.Processor import akka.Done import akka.NotUsed @@ -27,7 +26,7 @@ import akka.japi.Pair import akka.japi.Util import akka.japi.function import akka.japi.function.Creator -import akka.stream._ +import akka.stream.{ javadsl, _ } import akka.util.ConstantFun import akka.util.JavaDurationConverters._ import akka.util.Timeout @@ -2623,6 +2622,25 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def alsoTo(that: Graph[SinkShape[Out], _]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.alsoTo(that)) + /** + * Attaches the given [[Sink]]s to this [[Flow]], meaning that elements that passes + * through will also be sent to all those [[Sink]]s. + * + * It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]]s is not ready. + * + * '''Emits when''' element is available and demand exists both from the Sinks and the downstream. + * + * '''Backpressures when''' downstream or any of the [[Sink]]s backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream or any of the [[Sink]]s cancels + */ + @varargs + @SafeVarargs + def alsoToAll(those: Graph[SinkShape[Out], _]*): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.alsoToAll(those: _*)) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * through will also be sent to the [[Sink]]. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 65a2f174e9..a92a2f8dad 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -15,7 +15,6 @@ import scala.compat.java8.OptionConverters._ import scala.concurrent.{ Future, Promise } import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag -import scala.annotation.nowarn import org.reactivestreams.{ Publisher, Subscriber } import akka.{ Done, NotUsed } import akka.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider } @@ -30,6 +29,8 @@ import akka.util.{ unused, _ } import akka.util.JavaDurationConverters._ import akka.util.ccompat.JavaConverters._ +import scala.annotation.{ nowarn, varargs } + /** Java API */ object Source { private[this] val _empty = new Source[Any, NotUsed](scaladsl.Source.empty) @@ -1395,7 +1396,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ new Source(delegate.orElseMat(secondary)(combinerToScala(matF))) /** - * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes + * Attaches the given [[Sink]] to this [[Source]], meaning that elements that passes * through will also be sent to the [[Sink]]. * * It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]] is not ready. @@ -1411,6 +1412,25 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def alsoTo(that: Graph[SinkShape[Out], _]): javadsl.Source[Out, Mat] = new Source(delegate.alsoTo(that)) + /** + * Attaches the given [[Sink]]s to this [[Source]], meaning that elements that passes + * through will also be sent to all those [[Sink]]s. + * + * It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]]s is not ready. + * + * '''Emits when''' element is available and demand exists both from the Sinks and the downstream. + * + * '''Backpressures when''' downstream or any of the [[Sink]]s backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream or any of the [[Sink]]s cancels + */ + @varargs + @SafeVarargs + def alsoToAll(those: Graph[SinkShape[Out], _]*): javadsl.Source[Out, Mat] = + new Source(delegate.alsoToAll(those: _*)) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * through will also be sent to the [[Sink]]. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 0dbafad7cb..11a59da8c3 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -7,14 +7,11 @@ package akka.stream.javadsl import java.util.Comparator import java.util.concurrent.CompletionStage import java.util.function.Supplier - import scala.annotation.unchecked.uncheckedVariance import scala.compat.java8.FutureConverters._ import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag - -import scala.annotation.nowarn - +import scala.annotation.{ nowarn, varargs } import akka.NotUsed import akka.annotation.ApiMayChange import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } @@ -1629,6 +1626,25 @@ class SubFlow[In, Out, Mat]( def alsoTo(that: Graph[SinkShape[Out], _]): SubFlow[In, Out, Mat] = new SubFlow(delegate.alsoTo(that)) + /** + * Attaches the given [[Sink]]s to this [[Flow]], meaning that elements that passes + * through will also be sent to all those [[Sink]]s. + * + * It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]]s is not ready. + * + * '''Emits when''' element is available and demand exists both from the Sinks and the downstream. + * + * '''Backpressures when''' downstream or any of the [[Sink]]s backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream or any of the [[Sink]]s cancels + */ + @varargs + @SafeVarargs + def alsoToAll(those: Graph[SinkShape[Out], _]*): SubFlow[In, Out, Mat] = + new SubFlow(delegate.alsoToAll(those: _*)) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] * instead of being passed through if the predicate `when` returns `true`. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 6cb1fbc6a0..9dd35d1370 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -7,14 +7,11 @@ package akka.stream.javadsl import java.util.Comparator import java.util.concurrent.CompletionStage import java.util.function.Supplier - import scala.annotation.unchecked.uncheckedVariance import scala.compat.java8.FutureConverters._ import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag - -import scala.annotation.nowarn - +import scala.annotation.{ nowarn, varargs } import akka.NotUsed import akka.annotation.ApiMayChange import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } @@ -1589,7 +1586,7 @@ class SubSource[Out, Mat]( new SubSource(delegate.orElse(secondary)) /** - * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes + * Attaches the given [[Sink]] to this [[Source]], meaning that elements that passes * through will also be sent to the [[Sink]]. * * It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]] is not ready. @@ -1605,6 +1602,25 @@ class SubSource[Out, Mat]( def alsoTo(that: Graph[SinkShape[Out], _]): SubSource[Out, Mat] = new SubSource(delegate.alsoTo(that)) + /** + * Attaches the given [[Sink]]s to this [[Source]], meaning that elements that passes + * through will also be sent to all those [[Sink]]s. + * + * It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]]s is not ready. + * + * '''Emits when''' element is available and demand exists both from the Sinks and the downstream. + * + * '''Backpressures when''' downstream or any of the [[Sink]]s backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream or any of the [[Sink]]s cancels + */ + @varargs + @SafeVarargs + def alsoToAll(those: Graph[SinkShape[Out], _]*): SubSource[Out, Mat] = + new SubSource(delegate.alsoToAll(those: _*)) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] * instead of being passed through if the predicate `when` returns `true`. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 8a8bad3fe5..8c6482f9a1 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -3196,7 +3196,7 @@ trait FlowOps[+Out, +Mat] { def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Closed /** - * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass + * Attaches the given [[Sink]] to this [[Source]], meaning that elements that pass * through will also be sent to the [[Sink]]. * * It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]] is not ready. @@ -3219,6 +3219,32 @@ trait FlowOps[+Out, +Mat] { FlowShape(bcast.in, bcast.out(0)) } + /** + * Attaches the given [[Sink]]s to this [[Source]], meaning that elements that pass + * through will also be sent to the [[Sink]]. + * + * It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]]s is not ready. + * + * '''Emits when''' element is available and demand exists both from the Sinks and the downstream. + * + * '''Backpressures when''' downstream or any of the [[Sink]]s backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream or any of the [[Sink]]s cancels + */ + def alsoToAll(those: Graph[SinkShape[Out], _]*): Repr[Out] = those match { + case those if those.isEmpty => this.asInstanceOf[Repr[Out]] + case _ => + via(GraphDSL.create() { implicit b => + import GraphDSL.Implicits._ + val bcast = b.add(Broadcast[Out](those.size + 1, eagerCancel = true)) + for ((that, idx) <- those.zipWithIndex) + bcast.out(idx + 1) ~> that + FlowShape(bcast.in, bcast.out(0)) + }) + } + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] * instead of being passed through if the predicate `when` returns `true`. diff --git a/project/AkkaDisciplinePlugin.scala b/project/AkkaDisciplinePlugin.scala index 8eca0fed7b..bde3e5fa0a 100644 --- a/project/AkkaDisciplinePlugin.scala +++ b/project/AkkaDisciplinePlugin.scala @@ -38,7 +38,9 @@ object AkkaDisciplinePlugin extends AutoPlugin { "akka-persistence-typed", // references to deprecated PARSER fields in generated message formats? "akka-persistence-query", - "akka-docs") + "akka-docs", + // use varargs of `Graph` in alsoTo and etc operators + "akka-stream-tests") val looseProjects = Set( "akka-actor",