diff --git a/akka-docs/src/main/paradox/stream/stages-overview.md b/akka-docs/src/main/paradox/stream/stages-overview.md index caccfb9dc8..722424625a 100644 --- a/akka-docs/src/main/paradox/stream/stages-overview.md +++ b/akka-docs/src/main/paradox/stream/stages-overview.md @@ -1075,6 +1075,18 @@ This can be changed by calling @scala[`Attributes.logLevels(...)`] @java[`Attrib --------------------------------------------------------------- +### divertTo + +Each upstream element will either be diverted to the given sink, or the downstream consumer according to the predicate function applied to the element. + +**emits** when the chosen output stops backpressuring and there is an input element available + +**backpressures** when the chosen output backpressures + +**completes** when upstream completes and no output is pending + +--------------------------------------------------------------- +
## Flow stages composed of Sinks and Sources diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 7cf6052de1..ba9a2c28b3 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -932,9 +932,22 @@ public class FlowTest extends StreamTest { } + @Test public void mustSuitablyOverrideAttributeHandlingMethods() { @SuppressWarnings("unused") final Flow f = Flow.of(Integer.class).withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named(""); } + + @Test + public void mustBeAbleToUseAlsoTo() { + final Flow f = Flow.of(Integer.class).alsoTo(Sink.ignore()); + final Flow f2 = Flow.of(Integer.class).alsoToMat(Sink.ignore(), (i, n) -> "foo"); + } + + @Test + public void mustBeAbleToUseDivertTo() { + final Flow f = Flow.of(Integer.class).divertTo(Sink.ignore(), e -> true); + final Flow f2 = Flow.of(Integer.class).divertToMat(Sink.ignore(), e -> true, (i, n) -> "foo"); + } } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 68debad4f4..2ea2f5640c 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -860,4 +860,16 @@ public class SourceTest extends StreamTest { assertEquals((Object) 0, result); } + + @Test + public void mustBeAbleToUseAlsoTo() { + final Source f = Source.empty().alsoTo(Sink.ignore()); + final Source f2 = Source.empty().alsoToMat(Sink.ignore(), (i, n) -> "foo"); + } + + @Test + public void mustBeAbleToUseDivertTo() { + final Source f = Source.empty().divertTo(Sink.ignore(), e -> true); + final Source f2 = Source.empty().divertToMat(Sink.ignore(), e -> true, (i, n) -> "foo"); + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala index 2627ea65eb..4463e6b92c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala @@ -39,7 +39,7 @@ class DslConsistencySpec extends WordSpec with Matchers { Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++ Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") - val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "orElseGraph") + val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "orElseGraph", "divertToGraph") val allowMissing: Map[Class[_], Set[String]] = Map( jFlowClass → graphHelpers, jSourceClass → graphHelpers, diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartitionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartitionSpec.scala index 24caefa591..43679ea969 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartitionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartitionSpec.scala @@ -177,4 +177,17 @@ class GraphPartitionSpec extends StreamSpec { } } + + "divertTo must send matching elements to the sink" in assertAllStagesStopped { + val odd = TestSubscriber.probe[Int]() + val even = TestSubscriber.probe[Int]() + Source(1 to 2).divertTo(Sink.fromSubscriber(odd), _ % 2 != 0).to(Sink.fromSubscriber(even)).run() + even.request(1) + even.expectNoMsg(1.second) + odd.request(1) + odd.expectNext(1) + even.expectNext(2) + odd.expectComplete() + even.expectComplete() + } } diff --git a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes index c4b055226b..ec4bc83b95 100644 --- a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes @@ -35,3 +35,8 @@ ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.JavaFlowAndRsConve ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.JavaFlowAndRsConverters$Implicits$RsProcessorConverter") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.JavaFlowAndRsConverters$Implicits$FlowProcessorConverter$") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.JavaFlowAndRsConverters$Implicits$FlowProcessorConverter") + +# divertTo +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.divertToMat") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.divertTo") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.divertToGraph") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 680acfbced..64d5474382 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1695,6 +1695,33 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] = new Flow(delegate.alsoToMat(that)(combinerToScala(matF))) + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] + * instead of being passed through if the predicate `when` returns `true`. + * + * '''Emits when''' emits when an element is available from the input and the chosen output has demand + * + * '''Backpressures when''' the currently chosen output back-pressures + * + * '''Completes when''' upstream completes and no output is pending + * + * '''Cancels when''' when all downstreams cancel + */ + def divertTo(that: Graph[SinkShape[Out], _], when: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.divertTo(that, when.test)) + + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] + * instead of being passed through if the predicate `when` returns `true`. + * + * @see [[#divertTo]] + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def divertToMat[M2, M3](that: Graph[SinkShape[Out], M2], when: function.Predicate[Out], matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] = + new Flow(delegate.divertToMat(that, when.test)(combinerToScala(matF))) + /** * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 3ba09913a6..c6d94dad0e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -751,6 +751,33 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] = new Source(delegate.alsoToMat(that)(combinerToScala(matF))) + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] + * instead of being passed through if the predicate `when` returns `true`. + * + * '''Emits when''' emits when an element is available from the input and the chosen output has demand + * + * '''Backpressures when''' the currently chosen output back-pressures + * + * '''Completes when''' upstream completes and no output is pending + * + * '''Cancels when''' when all downstreams cancel + */ + def divertTo(that: Graph[SinkShape[Out], _], when: function.Predicate[Out]): javadsl.Source[Out, Mat] = + new Source(delegate.divertTo(that, when.test)) + + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] + * instead of being passed through if the predicate `when` returns `true`. + * + * @see [[#divertTo]] + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def divertToMat[M2, M3](that: Graph[SinkShape[Out], M2], when: function.Predicate[Out], matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] = + new Source(delegate.divertToMat(that, when.test)(combinerToScala(matF))) + /** * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]]. * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index d64cf5c7d1..9dfe6eb20b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -1150,6 +1150,21 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def alsoTo(that: Graph[SinkShape[Out], _]): SubFlow[In, Out, Mat] = new SubFlow(delegate.alsoTo(that)) + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] + * instead of being passed through if the predicate `when` returns `true`. + * + * '''Emits when''' emits when an element is available from the input and the chosen output has demand + * + * '''Backpressures when''' the currently chosen output back-pressures + * + * '''Completes when''' upstream completes and no output is pending + * + * '''Cancels when''' when all downstreams cancel + */ + def divertTo(that: Graph[SinkShape[Out], _], when: function.Predicate[Out]): SubFlow[In, Out, Mat] = + new SubFlow(delegate.divertTo(that, when.test)) + /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * picking randomly when several elements ready. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 4fe1f3e5d7..a87787b607 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -1142,6 +1142,21 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def alsoTo(that: Graph[SinkShape[Out], _]): SubSource[Out, Mat] = new SubSource(delegate.alsoTo(that)) + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] + * instead of being passed through if the predicate `when` returns `true`. + * + * '''Emits when''' emits when an element is available from the input and the chosen output has demand + * + * '''Backpressures when''' the currently chosen output back-pressures + * + * '''Completes when''' upstream completes and no output is pending + * + * '''Cancels when''' when all downstreams cancel + */ + def divertTo(that: Graph[SinkShape[Out], _], when: function.Predicate[Out]): SubSource[Out, Mat] = + new SubSource(delegate.divertTo(that, when.test)) + /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * picking randomly when several elements ready. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 47b749bd68..4300b50569 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -2298,6 +2298,28 @@ trait FlowOps[+Out, +Mat] { FlowShape(bcast.in, bcast.out(0)) } + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] + * instead of being passed through if the predicate `when` returns `true`. + * + * '''Emits when''' emits when an element is available from the input and the chosen output has demand + * + * '''Backpressures when''' the currently chosen output back-pressures + * + * '''Completes when''' upstream completes and no output is pending + * + * '''Cancels when''' when all downstreams cancel + */ + def divertTo(that: Graph[SinkShape[Out], _], when: Out ⇒ Boolean): Repr[Out] = via(divertToGraph(that, when)) + + protected def divertToGraph[M](that: Graph[SinkShape[Out], M], when: Out ⇒ Boolean): Graph[FlowShape[Out @uncheckedVariance, Out], M] = + GraphDSL.create(that) { implicit b ⇒ r ⇒ + import GraphDSL.Implicits._ + val partition = b.add(Partition[Out](2, out ⇒ if (when(out)) 1 else 0)) + partition.out(1) ~> r + FlowShape(partition.in, partition.out(0)) + } + def withAttributes(attr: Attributes): Repr[Out] def addAttributes(attr: Attributes): Repr[Out] @@ -2539,6 +2561,18 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[Out, Mat3] = viaMat(alsoToGraph(that))(matF) + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] + * instead of being passed through if the predicate `when` returns `true`. + * + * @see [[#divertTo]] + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def divertToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2], when: Out ⇒ Boolean)(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[Out, Mat3] = + viaMat(divertToGraph(that, when))(matF) + /** * Materializes to `Future[Done]` that completes on getting termination message. * The Future completes with success when received complete message from upstream or cancel