+str Add Flow/Source#alsotoAll (#31292)

This commit is contained in:
kerr 2022-04-20 20:40:45 +08:00 committed by GitHub
parent 4949b12c5a
commit f4fcf383e0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 195 additions and 16 deletions

View file

@ -0,0 +1,31 @@
# alsoToAll
Attaches the given @apidoc[Source]s to this @apidoc[Flow], meaning that elements that pass through this @apidoc[Flow] will also be sent to all those @apidoc[Sink]s.
@ref[Fan-out operators](../index.md#fan-out-operators)
## Signature
@apidoc[Source.alsoToAll](Source) { scala="#alsoToAll(that:akka.stream.Graph[akka.stream.SinkShape[Out],_]*):FlowOps.this.Repr[Out]" java="#alsoToAll(akka.stream.Graph*)" }
@apidoc[Flow.alsoToAll](Flow) { scala="#alsoToAll(that:akka.stream.Graph[akka.stream.SinkShape[Out],_]*):FlowOps.this.Repr[Out]" java="#alsoToAll(akka.stream.Graph*)" }
## Description
Attaches the given @apidoc[Source] s to this @apidoc[Flow], meaning that elements that pass through this @apidoc[Flow]
will also be sent to all those @apidoc[Sink]s.
## Reactive Streams semantics
@@@div { .callout }
**emits** when an element is available and demand exists both from the @apidoc[Sink]s and the downstream
**backpressures** when downstream or any of the @apidoc[Sink]s backpressures
**completes** when upstream completes
**cancels** when downstream or or any of the @apidoc[Sink]s cancels
@@@

View file

@ -300,6 +300,7 @@ There is a number of fan-out operators for which currently no 'fluent' is API av
| |<a name="unzip"></a>@ref[Unzip](Unzip.md)|Takes a stream of two element tuples and unzips the two elements ino two different downstreams.| | |<a name="unzip"></a>@ref[Unzip](Unzip.md)|Takes a stream of two element tuples and unzips the two elements ino two different downstreams.|
| |<a name="unzipwith"></a>@ref[UnzipWith](UnzipWith.md)|Splits each element of input into multiple downstreams using a function| | |<a name="unzipwith"></a>@ref[UnzipWith](UnzipWith.md)|Splits each element of input into multiple downstreams using a function|
|Source/Flow|<a name="alsoto"></a>@ref[alsoTo](Source-or-Flow/alsoTo.md)|Attaches the given `Sink` to this `Flow`, meaning that elements that pass through this `Flow` will also be sent to the `Sink`.| |Source/Flow|<a name="alsoto"></a>@ref[alsoTo](Source-or-Flow/alsoTo.md)|Attaches the given `Sink` to this `Flow`, meaning that elements that pass through this `Flow` will also be sent to the `Sink`.|
|Source/Flow|<a name="alsotoall"></a>@ref[alsoToAll](Source-or-Flow/alsoToAll.md)|Attaches the given @apidoc[Source]s to this @apidoc[Flow], meaning that elements that pass through this @apidoc[Flow] will also be sent to all those @apidoc[Sink]s.|
|Source/Flow|<a name="divertto"></a>@ref[divertTo](Source-or-Flow/divertTo.md)|Each upstream element will either be diverted to the given sink, or the downstream consumer according to the predicate function applied to the element.| |Source/Flow|<a name="divertto"></a>@ref[divertTo](Source-or-Flow/divertTo.md)|Each upstream element will either be diverted to the given sink, or the downstream consumer according to the predicate function applied to the element.|
|Source/Flow|<a name="wiretap"></a>@ref[wireTap](Source-or-Flow/wireTap.md)|Attaches the given `Sink` to this `Flow` as a wire tap, meaning that elements that pass through will also be sent to the wire-tap `Sink`, without the latter affecting the mainline flow.| |Source/Flow|<a name="wiretap"></a>@ref[wireTap](Source-or-Flow/wireTap.md)|Attaches the given `Sink` to this `Flow` as a wire tap, meaning that elements that pass through will also be sent to the wire-tap `Sink`, without the latter affecting the mainline flow.|
@ -375,6 +376,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md) * [actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md)
* [aggregateWithBoundary](Source-or-Flow/aggregateWithBoundary.md) * [aggregateWithBoundary](Source-or-Flow/aggregateWithBoundary.md)
* [alsoTo](Source-or-Flow/alsoTo.md) * [alsoTo](Source-or-Flow/alsoTo.md)
* [alsoToAll](Source-or-Flow/alsoToAll.md)
* [asFlowWithContext](Flow/asFlowWithContext.md) * [asFlowWithContext](Flow/asFlowWithContext.md)
* [asInputStream](StreamConverters/asInputStream.md) * [asInputStream](StreamConverters/asInputStream.md)
* [asJavaStream](StreamConverters/asJavaStream.md) * [asJavaStream](StreamConverters/asJavaStream.md)

View file

@ -1348,6 +1348,12 @@ public class FlowTest extends StreamTest {
Flow.of(Integer.class).alsoToMat(Sink.ignore(), (i, n) -> "foo"); Flow.of(Integer.class).alsoToMat(Sink.ignore(), (i, n) -> "foo");
} }
@Test
public void mustBeAbleToUseAlsoToAll() {
final Flow<Integer, Integer, NotUsed> f =
Flow.of(Integer.class).alsoToAll(Sink.ignore(), Sink.ignore());
}
@Test @Test
public void mustBeAbleToUseDivertTo() { public void mustBeAbleToUseDivertTo() {
final Flow<Integer, Integer, NotUsed> f = final Flow<Integer, Integer, NotUsed> f =

View file

@ -1146,6 +1146,12 @@ public class SourceTest extends StreamTest {
Source.<Integer>empty().alsoToMat(Sink.ignore(), (i, n) -> "foo"); Source.<Integer>empty().alsoToMat(Sink.ignore(), (i, n) -> "foo");
} }
@Test
public void mustBeAbleToUseAlsoToAll() {
final Source<Integer, NotUsed> f =
Source.<Integer>empty().alsoToAll(Sink.ignore(), Sink.ignore());
}
@Test @Test
public void mustBeAbleToUseDivertTo() { public void mustBeAbleToUseDivertTo() {
final Source<Integer, NotUsed> f = Source.<Integer>empty().divertTo(Sink.ignore(), e -> true); final Source<Integer, NotUsed> f = Source.<Integer>empty().divertTo(Sink.ignore(), e -> true);

View file

@ -0,0 +1,36 @@
/*
* Copyright (C) 2014-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink
class FlowAlsoToAllSpec extends StreamSpec("""
akka.stream.materializer.initial-input-buffer-size = 2
""") with ScriptedTest {
"An also to all" must {
"publish elements to all its downstream" in {
val (sub1, sink1) = TestSink.probe[Int].preMaterialize();
val (sub2, sink2) = TestSink.probe[Int].preMaterialize();
val (sub3, sink3) = TestSink.probe[Int].preMaterialize();
Source.single(1).alsoToAll(sink1, sink2).runWith(sink3)
sub1.expectSubscription().request(1)
sub2.expectSubscription().request(1)
sub3.expectSubscription().request(1)
sub1.expectNext(1).expectComplete()
sub2.expectNext(1).expectComplete()
sub3.expectNext(1).expectComplete()
}
"publish elements to its only downstream" in {
val (sub1, sink1) = TestSink.probe[Int].preMaterialize();
Source.single(1).alsoToAll().runWith(sink1)
sub1.expectSubscription().request(1)
sub1.expectNext(1).expectComplete()
}
}
}

View file

@ -9,12 +9,11 @@ import java.util.Optional
import java.util.concurrent.CompletionStage import java.util.concurrent.CompletionStage
import java.util.function.BiFunction import java.util.function.BiFunction
import java.util.function.Supplier import java.util.function.Supplier
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
import scala.compat.java8.FutureConverters._ import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag import scala.reflect.ClassTag
import scala.annotation.nowarn import scala.annotation.{ nowarn, varargs }
import org.reactivestreams.Processor import org.reactivestreams.Processor
import akka.Done import akka.Done
import akka.NotUsed import akka.NotUsed
@ -27,7 +26,7 @@ import akka.japi.Pair
import akka.japi.Util import akka.japi.Util
import akka.japi.function import akka.japi.function
import akka.japi.function.Creator import akka.japi.function.Creator
import akka.stream._ import akka.stream.{ javadsl, _ }
import akka.util.ConstantFun import akka.util.ConstantFun
import akka.util.JavaDurationConverters._ import akka.util.JavaDurationConverters._
import akka.util.Timeout import akka.util.Timeout
@ -2623,6 +2622,25 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def alsoTo(that: Graph[SinkShape[Out], _]): javadsl.Flow[In, Out, Mat] = def alsoTo(that: Graph[SinkShape[Out], _]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.alsoTo(that)) new Flow(delegate.alsoTo(that))
/**
* Attaches the given [[Sink]]s to this [[Flow]], meaning that elements that passes
* through will also be sent to all those [[Sink]]s.
*
* It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]]s is not ready.
*
* '''Emits when''' element is available and demand exists both from the Sinks and the downstream.
*
* '''Backpressures when''' downstream or any of the [[Sink]]s backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream or any of the [[Sink]]s cancels
*/
@varargs
@SafeVarargs
def alsoToAll(those: Graph[SinkShape[Out], _]*): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.alsoToAll(those: _*))
/** /**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]]. * through will also be sent to the [[Sink]].

View file

@ -15,7 +15,6 @@ import scala.compat.java8.OptionConverters._
import scala.concurrent.{ Future, Promise } import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag import scala.reflect.ClassTag
import scala.annotation.nowarn
import org.reactivestreams.{ Publisher, Subscriber } import org.reactivestreams.{ Publisher, Subscriber }
import akka.{ Done, NotUsed } import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider } import akka.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider }
@ -30,6 +29,8 @@ import akka.util.{ unused, _ }
import akka.util.JavaDurationConverters._ import akka.util.JavaDurationConverters._
import akka.util.ccompat.JavaConverters._ import akka.util.ccompat.JavaConverters._
import scala.annotation.{ nowarn, varargs }
/** Java API */ /** Java API */
object Source { object Source {
private[this] val _empty = new Source[Any, NotUsed](scaladsl.Source.empty) private[this] val _empty = new Source[Any, NotUsed](scaladsl.Source.empty)
@ -1395,7 +1396,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
new Source(delegate.orElseMat(secondary)(combinerToScala(matF))) new Source(delegate.orElseMat(secondary)(combinerToScala(matF)))
/** /**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * Attaches the given [[Sink]] to this [[Source]], meaning that elements that passes
* through will also be sent to the [[Sink]]. * through will also be sent to the [[Sink]].
* *
* It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]] is not ready. * It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]] is not ready.
@ -1411,6 +1412,25 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def alsoTo(that: Graph[SinkShape[Out], _]): javadsl.Source[Out, Mat] = def alsoTo(that: Graph[SinkShape[Out], _]): javadsl.Source[Out, Mat] =
new Source(delegate.alsoTo(that)) new Source(delegate.alsoTo(that))
/**
* Attaches the given [[Sink]]s to this [[Source]], meaning that elements that passes
* through will also be sent to all those [[Sink]]s.
*
* It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]]s is not ready.
*
* '''Emits when''' element is available and demand exists both from the Sinks and the downstream.
*
* '''Backpressures when''' downstream or any of the [[Sink]]s backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream or any of the [[Sink]]s cancels
*/
@varargs
@SafeVarargs
def alsoToAll(those: Graph[SinkShape[Out], _]*): javadsl.Source[Out, Mat] =
new Source(delegate.alsoToAll(those: _*))
/** /**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]]. * through will also be sent to the [[Sink]].

View file

@ -7,14 +7,11 @@ package akka.stream.javadsl
import java.util.Comparator import java.util.Comparator
import java.util.concurrent.CompletionStage import java.util.concurrent.CompletionStage
import java.util.function.Supplier import java.util.function.Supplier
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
import scala.compat.java8.FutureConverters._ import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag import scala.reflect.ClassTag
import scala.annotation.{ nowarn, varargs }
import scala.annotation.nowarn
import akka.NotUsed import akka.NotUsed
import akka.annotation.ApiMayChange import akka.annotation.ApiMayChange
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
@ -1629,6 +1626,25 @@ class SubFlow[In, Out, Mat](
def alsoTo(that: Graph[SinkShape[Out], _]): SubFlow[In, Out, Mat] = def alsoTo(that: Graph[SinkShape[Out], _]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.alsoTo(that)) new SubFlow(delegate.alsoTo(that))
/**
* Attaches the given [[Sink]]s to this [[Flow]], meaning that elements that passes
* through will also be sent to all those [[Sink]]s.
*
* It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]]s is not ready.
*
* '''Emits when''' element is available and demand exists both from the Sinks and the downstream.
*
* '''Backpressures when''' downstream or any of the [[Sink]]s backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream or any of the [[Sink]]s cancels
*/
@varargs
@SafeVarargs
def alsoToAll(those: Graph[SinkShape[Out], _]*): SubFlow[In, Out, Mat] =
new SubFlow(delegate.alsoToAll(those: _*))
/** /**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]]
* instead of being passed through if the predicate `when` returns `true`. * instead of being passed through if the predicate `when` returns `true`.

View file

@ -7,14 +7,11 @@ package akka.stream.javadsl
import java.util.Comparator import java.util.Comparator
import java.util.concurrent.CompletionStage import java.util.concurrent.CompletionStage
import java.util.function.Supplier import java.util.function.Supplier
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
import scala.compat.java8.FutureConverters._ import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag import scala.reflect.ClassTag
import scala.annotation.{ nowarn, varargs }
import scala.annotation.nowarn
import akka.NotUsed import akka.NotUsed
import akka.annotation.ApiMayChange import akka.annotation.ApiMayChange
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
@ -1589,7 +1586,7 @@ class SubSource[Out, Mat](
new SubSource(delegate.orElse(secondary)) new SubSource(delegate.orElse(secondary))
/** /**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * Attaches the given [[Sink]] to this [[Source]], meaning that elements that passes
* through will also be sent to the [[Sink]]. * through will also be sent to the [[Sink]].
* *
* It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]] is not ready. * It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]] is not ready.
@ -1605,6 +1602,25 @@ class SubSource[Out, Mat](
def alsoTo(that: Graph[SinkShape[Out], _]): SubSource[Out, Mat] = def alsoTo(that: Graph[SinkShape[Out], _]): SubSource[Out, Mat] =
new SubSource(delegate.alsoTo(that)) new SubSource(delegate.alsoTo(that))
/**
* Attaches the given [[Sink]]s to this [[Source]], meaning that elements that passes
* through will also be sent to all those [[Sink]]s.
*
* It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]]s is not ready.
*
* '''Emits when''' element is available and demand exists both from the Sinks and the downstream.
*
* '''Backpressures when''' downstream or any of the [[Sink]]s backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream or any of the [[Sink]]s cancels
*/
@varargs
@SafeVarargs
def alsoToAll(those: Graph[SinkShape[Out], _]*): SubSource[Out, Mat] =
new SubSource(delegate.alsoToAll(those: _*))
/** /**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]]
* instead of being passed through if the predicate `when` returns `true`. * instead of being passed through if the predicate `when` returns `true`.

View file

@ -3196,7 +3196,7 @@ trait FlowOps[+Out, +Mat] {
def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Closed def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Closed
/** /**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass * Attaches the given [[Sink]] to this [[Source]], meaning that elements that pass
* through will also be sent to the [[Sink]]. * through will also be sent to the [[Sink]].
* *
* It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]] is not ready. * It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]] is not ready.
@ -3219,6 +3219,32 @@ trait FlowOps[+Out, +Mat] {
FlowShape(bcast.in, bcast.out(0)) FlowShape(bcast.in, bcast.out(0))
} }
/**
* Attaches the given [[Sink]]s to this [[Source]], meaning that elements that pass
* through will also be sent to the [[Sink]].
*
* It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]]s is not ready.
*
* '''Emits when''' element is available and demand exists both from the Sinks and the downstream.
*
* '''Backpressures when''' downstream or any of the [[Sink]]s backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream or any of the [[Sink]]s cancels
*/
def alsoToAll(those: Graph[SinkShape[Out], _]*): Repr[Out] = those match {
case those if those.isEmpty => this.asInstanceOf[Repr[Out]]
case _ =>
via(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val bcast = b.add(Broadcast[Out](those.size + 1, eagerCancel = true))
for ((that, idx) <- those.zipWithIndex)
bcast.out(idx + 1) ~> that
FlowShape(bcast.in, bcast.out(0))
})
}
/** /**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]]
* instead of being passed through if the predicate `when` returns `true`. * instead of being passed through if the predicate `when` returns `true`.

View file

@ -38,7 +38,9 @@ object AkkaDisciplinePlugin extends AutoPlugin {
"akka-persistence-typed", "akka-persistence-typed",
// references to deprecated PARSER fields in generated message formats? // references to deprecated PARSER fields in generated message formats?
"akka-persistence-query", "akka-persistence-query",
"akka-docs") "akka-docs",
// use varargs of `Graph` in alsoTo and etc operators
"akka-stream-tests")
val looseProjects = Set( val looseProjects = Set(
"akka-actor", "akka-actor",