+str #19069 Add FlowOp.prepend for prepending Sources to Flows

This commit is contained in:
Iain Monro 2015-12-07 12:16:59 +00:00 committed by Roland Kuhn
parent dcfa56e547
commit 52655f2836
10 changed files with 216 additions and 1 deletions

View file

@ -126,6 +126,7 @@ mergePreferred one of the inputs has an element available, preferring a
zip all of the inputs have an element available downstream backpressures any upstream completes zip all of the inputs have an element available downstream backpressures any upstream completes
zipWith all of the inputs have an element available downstream backpressures any upstream completes zipWith all of the inputs have an element available downstream backpressures any upstream completes
concat the current stream has an element available; if the current input completes, it tries the next one downstream backpressures all upstreams complete concat the current stream has an element available; if the current input completes, it tries the next one downstream backpressures all upstreams complete
prepend the given stream has an element available; if the given input completes, it tries the current one downstream backpressures all upstreams complete
===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== ===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
(*) This behavior is changeable to completing when any upstream completes by setting ``eagerClose=true``. (*) This behavior is changeable to completing when any upstream completes by setting ``eagerClose=true``.

View file

@ -408,6 +408,25 @@ public class FlowTest extends StreamTest {
assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output); assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output);
} }
@Test
public void mustBeAbleToUsePrepend() {
final JavaTestKit probe = new JavaTestKit(system);
final Iterable<String> input1 = Arrays.asList("A", "B", "C");
final Iterable<String> input2 = Arrays.asList("D", "E", "F");
final Source<String, ?> in1 = Source.from(input1);
final Source<String, ?> in2 = Source.from(input2);
final Flow<String, String, ?> flow = Flow.of(String.class);
in2.via(flow.prepend(in1)).runForeach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}, materializer);
List<Object> output = Arrays.asList(probe.receiveN(6));
assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output);
}
@Test @Test
public void mustBeAbleToUsePrefixAndTail() throws Exception { public void mustBeAbleToUsePrefixAndTail() throws Exception {
final JavaTestKit probe = new JavaTestKit(system); final JavaTestKit probe = new JavaTestKit(system);

View file

@ -249,6 +249,25 @@ public class SourceTest extends StreamTest {
assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output); assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output);
} }
@Test
public void mustBeAbleToUsePrepend() {
final JavaTestKit probe = new JavaTestKit(system);
final Iterable<String> input1 = Arrays.asList("A", "B", "C");
final Iterable<String> input2 = Arrays.asList("D", "E", "F");
final Source<String, ?> in1 = Source.from(input1);
final Source<String, ?> in2 = Source.from(input2);
in2.prepend(in1).runForeach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}, materializer);
List<Object> output = Arrays.asList(probe.receiveN(6));
assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output);
}
@Test @Test
public void mustBeAbleToUseCallableInput() { public void mustBeAbleToUseCallableInput() {
final JavaTestKit probe = new JavaTestKit(system); final JavaTestKit probe = new JavaTestKit(system);

View file

@ -39,7 +39,7 @@ class DslConsistencySpec extends WordSpec with Matchers {
Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++ Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++
Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat")
val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "alsoToGraph") val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph")
val allowMissing: Map[Class[_], Set[String]] = Map( val allowMissing: Map[Class[_], Set[String]] = Map(
jFlowClass -> graphHelpers, jFlowClass -> graphHelpers,
jSourceClass -> graphHelpers, jSourceClass -> graphHelpers,

View file

@ -40,6 +40,23 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
subs.expectComplete() subs.expectComplete()
} }
"be able to prepend a Source to a Flow" in {
val s1: Source[String, _] = Source(List(1, 2, 3)).map(_.toString + "-s")
val s2: Source[Int, _] = Source(List(4, 5, 6))
val f2: Flow[Int, String, _] = Flow[Int].map(_.toString + "-s")
val subs = TestSubscriber.manualProbe[Any]()
val subSink = Sink.asPublisher[Any](false)
val (_, res) = f2.prepend(s1).runWith(s2, subSink)
res.subscribe(subs)
val sub = subs.expectSubscription()
sub.request(9)
(1 to 6).foreach(e subs.expectNext(e.toString + "-s"))
subs.expectComplete()
}
commonTests() commonTests()
"work with one immediately completed and one nonempty publisher" in assertAllStagesStopped { "work with one immediately completed and one nonempty publisher" in assertAllStagesStopped {

View file

@ -1008,6 +1008,42 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def concatMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = def concatMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
new Flow(delegate.concatMat(that)(combinerToScala(matF))) new Flow(delegate.concatMat(that)(combinerToScala(matF)))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes.
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
*
* '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' this [[Flow]] completes
*
* '''Cancels when''' downstream cancels
*/
def prepend[T >: Out, M](that: Graph[SourceShape[T], M]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.prepend(that))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes.
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
*
* @see [[#prepend]].
*/
def prependMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
new Flow(delegate.prependMat(that)(combinerToScala(matF)))
/** /**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]]. * through will also be sent to the [[Sink]].

View file

@ -446,6 +446,43 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] =
new Source(delegate.concatMat(that)(combinerToScala(matF))) new Source(delegate.concatMat(that)(combinerToScala(matF)))
/**
* Prepend the given [[Source]] to this one, meaning that once the given source
* is exhausted and all result elements have been generated, the current source's
* elements will be produced.
*
* Note that the current [[Source]] is materialized together with this Flow and just kept
* from producing elements by asserting back-pressure until its time comes.
*
* If the given [[Source]] gets upstream error - no elements from this [[Source]] will be pulled.
*
* '''Emits when''' element is available from current source or from the given [[Source]] when current is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' given [[Source]] completes
*
* '''Cancels when''' downstream cancels
*/
def prepend[T >: Out, M](that: Graph[SourceShape[T], M]): javadsl.Source[T, Mat] =
new Source(delegate.prepend(that))
/**
* Prepend the given [[Source]] to this one, meaning that once the given source
* is exhausted and all result elements have been generated, the current source's
* elements will be produced.
*
* Note that the current [[Source]] is materialized together with this Flow and just kept
* from producing elements by asserting back-pressure until its time comes.
*
* If the given [[Source]] gets upstream error - no elements from this [[Source]] will be pulled.
*
* @see [[#prepend]].
*/
def prependMat[T >: Out, M, M2](that: Graph[SourceShape[T], M],
matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] =
new Source(delegate.prependMat(that)(combinerToScala(matF)))
/** /**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]]. * through will also be sent to the [[Sink]].

View file

@ -714,6 +714,27 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
def concat[T >: Out, M](that: Graph[SourceShape[T], M]): SubFlow[In, T, Mat] = def concat[T >: Out, M](that: Graph[SourceShape[T], M]): SubFlow[In, T, Mat] =
new SubFlow(delegate.concat(that)) new SubFlow(delegate.concat(that))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes.
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
*
* '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' this [[Flow]] completes
*
* '''Cancels when''' downstream cancels
*/
def prepend[T >: Out, M](that: Graph[SourceShape[T], M]): SubFlow[In, T, Mat] =
new SubFlow(delegate.prepend(that))
/** /**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]]. * through will also be sent to the [[Sink]].

View file

@ -712,6 +712,27 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
def concat[T >: Out, M](that: Graph[SourceShape[T], M]): SubSource[T, Mat] = def concat[T >: Out, M](that: Graph[SourceShape[T], M]): SubSource[T, Mat] =
new SubSource(delegate.concat(that)) new SubSource(delegate.concat(that))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes.
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
*
* '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' this [[Flow]] completes
*
* '''Cancels when''' downstream cancels
*/
def prepend[T >: Out, M](that: Graph[SourceShape[T], M]): SubSource[T, Mat] =
new SubSource(delegate.prepend(that))
/** /**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]]. * through will also be sent to the [[Sink]].

View file

@ -1434,6 +1434,35 @@ trait FlowOps[+Out, +Mat] {
FlowShape(merge.in(0), merge.out) FlowShape(merge.in(0), merge.out)
} }
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes.
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
*
* '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' this [[Flow]] completes
*
* '''Cancels when''' downstream cancels
*/
def prepend[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] =
via(prependGraph(that))
protected def prependGraph[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] =
GraphDSL.create(that) { implicit b
r
val merge = b.add(Concat[U]())
r ~> merge.in(0)
FlowShape(merge.in(1), merge.out)
}
/** /**
* Concatenates this [[Flow]] with the given [[Source]] so the first element * Concatenates this [[Flow]] with the given [[Source]] so the first element
* emitted by that source is emitted after the last element of this * emitted by that source is emitted after the last element of this
@ -1607,6 +1636,21 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
def concatMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[U, Mat3] = def concatMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[U, Mat3] =
viaMat(concatGraph(that))(matF) viaMat(concatGraph(that))(matF)
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes.
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
*
* @see [[#prepend]].
*/
def prependMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[U, Mat3] =
viaMat(prependGraph(that))(matF)
/** /**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]]. * through will also be sent to the [[Sink]].