The most common partition-case, as divertTo(Mat) #24388

This commit is contained in:
Viktor Klang (√) 2018-01-22 19:17:41 +01:00 committed by Johan Andrén
parent 6264f8ea70
commit e999d03e8b
11 changed files with 174 additions and 1 deletions

View file

@ -1075,6 +1075,18 @@ This can be changed by calling @scala[`Attributes.logLevels(...)`] @java[`Attrib
--------------------------------------------------------------- ---------------------------------------------------------------
### divertTo
Each upstream element will either be diverted to the given sink, or the downstream consumer according to the predicate function applied to the element.
**emits** when the chosen output stops backpressuring and there is an input element available
**backpressures** when the chosen output backpressures
**completes** when upstream completes and no output is pending
---------------------------------------------------------------
<br/> <br/>
## Flow stages composed of Sinks and Sources ## Flow stages composed of Sinks and Sources

View file

@ -932,9 +932,22 @@ public class FlowTest extends StreamTest {
} }
@Test
public void mustSuitablyOverrideAttributeHandlingMethods() { public void mustSuitablyOverrideAttributeHandlingMethods() {
@SuppressWarnings("unused") @SuppressWarnings("unused")
final Flow<Integer, Integer, NotUsed> f = final Flow<Integer, Integer, NotUsed> f =
Flow.of(Integer.class).withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named(""); Flow.of(Integer.class).withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named("");
} }
@Test
public void mustBeAbleToUseAlsoTo() {
final Flow<Integer, Integer, NotUsed> f = Flow.of(Integer.class).alsoTo(Sink.ignore());
final Flow<Integer, Integer, String> f2 = Flow.of(Integer.class).alsoToMat(Sink.ignore(), (i, n) -> "foo");
}
@Test
public void mustBeAbleToUseDivertTo() {
final Flow<Integer, Integer, NotUsed> f = Flow.of(Integer.class).divertTo(Sink.ignore(), e -> true);
final Flow<Integer, Integer, String> f2 = Flow.of(Integer.class).divertToMat(Sink.ignore(), e -> true, (i, n) -> "foo");
}
} }

View file

@ -860,4 +860,16 @@ public class SourceTest extends StreamTest {
assertEquals((Object) 0, result); assertEquals((Object) 0, result);
} }
@Test
public void mustBeAbleToUseAlsoTo() {
final Source<Integer, NotUsed> f = Source.<Integer>empty().alsoTo(Sink.ignore());
final Source<Integer, String> f2 = Source.<Integer>empty().alsoToMat(Sink.ignore(), (i, n) -> "foo");
}
@Test
public void mustBeAbleToUseDivertTo() {
final Source<Integer, NotUsed> f = Source.<Integer>empty().divertTo(Sink.ignore(), e -> true);
final Source<Integer, String> f2 = Source.<Integer>empty().divertToMat(Sink.ignore(), e -> true, (i, n) -> "foo");
}
} }

View file

@ -39,7 +39,7 @@ class DslConsistencySpec extends WordSpec with Matchers {
Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++ Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++
Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat")
val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "orElseGraph") val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "orElseGraph", "divertToGraph")
val allowMissing: Map[Class[_], Set[String]] = Map( val allowMissing: Map[Class[_], Set[String]] = Map(
jFlowClass graphHelpers, jFlowClass graphHelpers,
jSourceClass graphHelpers, jSourceClass graphHelpers,

View file

@ -177,4 +177,17 @@ class GraphPartitionSpec extends StreamSpec {
} }
} }
"divertTo must send matching elements to the sink" in assertAllStagesStopped {
val odd = TestSubscriber.probe[Int]()
val even = TestSubscriber.probe[Int]()
Source(1 to 2).divertTo(Sink.fromSubscriber(odd), _ % 2 != 0).to(Sink.fromSubscriber(even)).run()
even.request(1)
even.expectNoMsg(1.second)
odd.request(1)
odd.expectNext(1)
even.expectNext(2)
odd.expectComplete()
even.expectComplete()
}
} }

View file

@ -35,3 +35,8 @@ ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.JavaFlowAndRsConve
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.JavaFlowAndRsConverters$Implicits$RsProcessorConverter") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.JavaFlowAndRsConverters$Implicits$RsProcessorConverter")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.JavaFlowAndRsConverters$Implicits$FlowProcessorConverter$") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.JavaFlowAndRsConverters$Implicits$FlowProcessorConverter$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.JavaFlowAndRsConverters$Implicits$FlowProcessorConverter") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.JavaFlowAndRsConverters$Implicits$FlowProcessorConverter")
# divertTo
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.divertToMat")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.divertTo")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.divertToGraph")

View file

@ -1695,6 +1695,33 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] = matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] =
new Flow(delegate.alsoToMat(that)(combinerToScala(matF))) new Flow(delegate.alsoToMat(that)(combinerToScala(matF)))
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]]
* instead of being passed through if the predicate `when` returns `true`.
*
* '''Emits when''' emits when an element is available from the input and the chosen output has demand
*
* '''Backpressures when''' the currently chosen output back-pressures
*
* '''Completes when''' upstream completes and no output is pending
*
* '''Cancels when''' when all downstreams cancel
*/
def divertTo(that: Graph[SinkShape[Out], _], when: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.divertTo(that, when.test))
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]]
* instead of being passed through if the predicate `when` returns `true`.
*
* @see [[#divertTo]]
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def divertToMat[M2, M3](that: Graph[SinkShape[Out], M2], when: function.Predicate[Out], matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] =
new Flow(delegate.divertToMat(that, when.test)(combinerToScala(matF)))
/** /**
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,

View file

@ -751,6 +751,33 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] = matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] =
new Source(delegate.alsoToMat(that)(combinerToScala(matF))) new Source(delegate.alsoToMat(that)(combinerToScala(matF)))
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]]
* instead of being passed through if the predicate `when` returns `true`.
*
* '''Emits when''' emits when an element is available from the input and the chosen output has demand
*
* '''Backpressures when''' the currently chosen output back-pressures
*
* '''Completes when''' upstream completes and no output is pending
*
* '''Cancels when''' when all downstreams cancel
*/
def divertTo(that: Graph[SinkShape[Out], _], when: function.Predicate[Out]): javadsl.Source[Out, Mat] =
new Source(delegate.divertTo(that, when.test))
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]]
* instead of being passed through if the predicate `when` returns `true`.
*
* @see [[#divertTo]]
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def divertToMat[M2, M3](that: Graph[SinkShape[Out], M2], when: function.Predicate[Out], matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] =
new Source(delegate.divertToMat(that, when.test)(combinerToScala(matF)))
/** /**
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]]. * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]].
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,

View file

@ -1150,6 +1150,21 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
def alsoTo(that: Graph[SinkShape[Out], _]): SubFlow[In, Out, Mat] = def alsoTo(that: Graph[SinkShape[Out], _]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.alsoTo(that)) new SubFlow(delegate.alsoTo(that))
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]]
* instead of being passed through if the predicate `when` returns `true`.
*
* '''Emits when''' emits when an element is available from the input and the chosen output has demand
*
* '''Backpressures when''' the currently chosen output back-pressures
*
* '''Completes when''' upstream completes and no output is pending
*
* '''Cancels when''' when all downstreams cancel
*/
def divertTo(that: Graph[SinkShape[Out], _], when: function.Predicate[Out]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.divertTo(that, when.test))
/** /**
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready. * picking randomly when several elements ready.

View file

@ -1142,6 +1142,21 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
def alsoTo(that: Graph[SinkShape[Out], _]): SubSource[Out, Mat] = def alsoTo(that: Graph[SinkShape[Out], _]): SubSource[Out, Mat] =
new SubSource(delegate.alsoTo(that)) new SubSource(delegate.alsoTo(that))
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]]
* instead of being passed through if the predicate `when` returns `true`.
*
* '''Emits when''' emits when an element is available from the input and the chosen output has demand
*
* '''Backpressures when''' the currently chosen output back-pressures
*
* '''Completes when''' upstream completes and no output is pending
*
* '''Cancels when''' when all downstreams cancel
*/
def divertTo(that: Graph[SinkShape[Out], _], when: function.Predicate[Out]): SubSource[Out, Mat] =
new SubSource(delegate.divertTo(that, when.test))
/** /**
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready. * picking randomly when several elements ready.

View file

@ -2298,6 +2298,28 @@ trait FlowOps[+Out, +Mat] {
FlowShape(bcast.in, bcast.out(0)) FlowShape(bcast.in, bcast.out(0))
} }
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]]
* instead of being passed through if the predicate `when` returns `true`.
*
* '''Emits when''' emits when an element is available from the input and the chosen output has demand
*
* '''Backpressures when''' the currently chosen output back-pressures
*
* '''Completes when''' upstream completes and no output is pending
*
* '''Cancels when''' when all downstreams cancel
*/
def divertTo(that: Graph[SinkShape[Out], _], when: Out Boolean): Repr[Out] = via(divertToGraph(that, when))
protected def divertToGraph[M](that: Graph[SinkShape[Out], M], when: Out Boolean): Graph[FlowShape[Out @uncheckedVariance, Out], M] =
GraphDSL.create(that) { implicit b r
import GraphDSL.Implicits._
val partition = b.add(Partition[Out](2, out if (when(out)) 1 else 0))
partition.out(1) ~> r
FlowShape(partition.in, partition.out(0))
}
def withAttributes(attr: Attributes): Repr[Out] def withAttributes(attr: Attributes): Repr[Out]
def addAttributes(attr: Attributes): Repr[Out] def addAttributes(attr: Attributes): Repr[Out]
@ -2539,6 +2561,18 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[Out, Mat3] = def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[Out, Mat3] =
viaMat(alsoToGraph(that))(matF) viaMat(alsoToGraph(that))(matF)
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]]
* instead of being passed through if the predicate `when` returns `true`.
*
* @see [[#divertTo]]
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def divertToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2], when: Out Boolean)(matF: (Mat, Mat2) Mat3): ReprMat[Out, Mat3] =
viaMat(divertToGraph(that, when))(matF)
/** /**
* Materializes to `Future[Done]` that completes on getting termination message. * Materializes to `Future[Done]` that completes on getting termination message.
* The Future completes with success when received complete message from upstream or cancel * The Future completes with success when received complete message from upstream or cancel