diff --git a/akka-docs-dev/rst/stages-overview.rst b/akka-docs-dev/rst/stages-overview.rst index 8ca7108735..393f14d8e3 100644 --- a/akka-docs-dev/rst/stages-overview.rst +++ b/akka-docs-dev/rst/stages-overview.rst @@ -126,6 +126,7 @@ mergePreferred one of the inputs has an element available, preferring a zip all of the inputs have an element available downstream backpressures any upstream completes zipWith all of the inputs have an element available downstream backpressures any upstream completes concat the current stream has an element available; if the current input completes, it tries the next one downstream backpressures all upstreams complete +prepend the given stream has an element available; if the given input completes, it tries the current one downstream backpressures all upstreams complete ===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== (*) This behavior is changeable to completing when any upstream completes by setting ``eagerClose=true``. diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index a2cdf5e950..61c7eef48a 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -408,6 +408,25 @@ public class FlowTest extends StreamTest { assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output); } + @Test + public void mustBeAbleToUsePrepend() { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input1 = Arrays.asList("A", "B", "C"); + final Iterable input2 = Arrays.asList("D", "E", "F"); + + final Source in1 = Source.from(input1); + final Source in2 = Source.from(input2); + final Flow flow = Flow.of(String.class); + in2.via(flow.prepend(in1)).runForeach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, materializer); + + List output = Arrays.asList(probe.receiveN(6)); + assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output); + } + @Test public void mustBeAbleToUsePrefixAndTail() throws Exception { final JavaTestKit probe = new JavaTestKit(system); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index b934c21be8..82543d17d9 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -249,6 +249,25 @@ public class SourceTest extends StreamTest { assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output); } + @Test + public void mustBeAbleToUsePrepend() { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input1 = Arrays.asList("A", "B", "C"); + final Iterable input2 = Arrays.asList("D", "E", "F"); + + final Source in1 = Source.from(input1); + final Source in2 = Source.from(input2); + + in2.prepend(in1).runForeach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, materializer); + + List output = Arrays.asList(probe.receiveN(6)); + assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output); + } + @Test public void mustBeAbleToUseCallableInput() { final JavaTestKit probe = new JavaTestKit(system); diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala index 51843fef7e..18d0f05b03 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala @@ -39,7 +39,7 @@ class DslConsistencySpec extends WordSpec with Matchers { Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++ Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") - val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "alsoToGraph") + val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph") val allowMissing: Map[Class[_], Set[String]] = Map( jFlowClass -> graphHelpers, jSourceClass -> graphHelpers, diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala index 8b33351d74..5427f41462 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala @@ -40,6 +40,23 @@ class FlowConcatSpec extends BaseTwoStreamsSetup { subs.expectComplete() } + "be able to prepend a Source to a Flow" in { + val s1: Source[String, _] = Source(List(1, 2, 3)).map(_.toString + "-s") + val s2: Source[Int, _] = Source(List(4, 5, 6)) + val f2: Flow[Int, String, _] = Flow[Int].map(_.toString + "-s") + + val subs = TestSubscriber.manualProbe[Any]() + val subSink = Sink.asPublisher[Any](false) + + val (_, res) = f2.prepend(s1).runWith(s2, subSink) + + res.subscribe(subs) + val sub = subs.expectSubscription() + sub.request(9) + (1 to 6).foreach(e ⇒ subs.expectNext(e.toString + "-s")) + subs.expectComplete() + } + commonTests() "work with one immediately completed and one nonempty publisher" in assertAllStagesStopped { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index a8709764d6..1cef680167 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1008,6 +1008,42 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def concatMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = new Flow(delegate.concatMat(that)(combinerToScala(matF))) + /** + * Prepend the given [[Source]] to this [[Flow]], meaning that before elements + * are generated from this Flow, the Source's elements will be produced until it + * is exhausted, at which point Flow elements will start being produced. + * + * Note that this Flow will be materialized together with the [[Source]] and just kept + * from producing elements by asserting back-pressure until its time comes. + * + * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled. + * + * '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' this [[Flow]] completes + * + * '''Cancels when''' downstream cancels + */ + def prepend[T >: Out, M](that: Graph[SourceShape[T], M]): javadsl.Flow[In, T, Mat] = + new Flow(delegate.prepend(that)) + + /** + * Prepend the given [[Source]] to this [[Flow]], meaning that before elements + * are generated from this Flow, the Source's elements will be produced until it + * is exhausted, at which point Flow elements will start being produced. + * + * Note that this Flow will be materialized together with the [[Source]] and just kept + * from producing elements by asserting back-pressure until its time comes. + * + * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled. + * + * @see [[#prepend]]. + */ + def prependMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = + new Flow(delegate.prependMat(that)(combinerToScala(matF))) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * through will also be sent to the [[Sink]]. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index bf1940772c..61f2269b42 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -446,6 +446,43 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = new Source(delegate.concatMat(that)(combinerToScala(matF))) + /** + * Prepend the given [[Source]] to this one, meaning that once the given source + * is exhausted and all result elements have been generated, the current source's + * elements will be produced. + * + * Note that the current [[Source]] is materialized together with this Flow and just kept + * from producing elements by asserting back-pressure until its time comes. + * + * If the given [[Source]] gets upstream error - no elements from this [[Source]] will be pulled. + * + * '''Emits when''' element is available from current source or from the given [[Source]] when current is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' given [[Source]] completes + * + * '''Cancels when''' downstream cancels + */ + def prepend[T >: Out, M](that: Graph[SourceShape[T], M]): javadsl.Source[T, Mat] = + new Source(delegate.prepend(that)) + + /** + * Prepend the given [[Source]] to this one, meaning that once the given source + * is exhausted and all result elements have been generated, the current source's + * elements will be produced. + * + * Note that the current [[Source]] is materialized together with this Flow and just kept + * from producing elements by asserting back-pressure until its time comes. + * + * If the given [[Source]] gets upstream error - no elements from this [[Source]] will be pulled. + * + * @see [[#prepend]]. + */ + def prependMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], + matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = + new Source(delegate.prependMat(that)(combinerToScala(matF))) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * through will also be sent to the [[Sink]]. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index ddba19b89d..eb6d3ad1ba 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -714,6 +714,27 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def concat[T >: Out, M](that: Graph[SourceShape[T], M]): SubFlow[In, T, Mat] = new SubFlow(delegate.concat(that)) + /** + * Prepend the given [[Source]] to this [[Flow]], meaning that before elements + * are generated from this Flow, the Source's elements will be produced until it + * is exhausted, at which point Flow elements will start being produced. + * + * Note that this Flow will be materialized together with the [[Source]] and just kept + * from producing elements by asserting back-pressure until its time comes. + * + * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled. + * + * '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' this [[Flow]] completes + * + * '''Cancels when''' downstream cancels + */ + def prepend[T >: Out, M](that: Graph[SourceShape[T], M]): SubFlow[In, T, Mat] = + new SubFlow(delegate.prepend(that)) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * through will also be sent to the [[Sink]]. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 26c16ca7c4..6616918796 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -712,6 +712,27 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def concat[T >: Out, M](that: Graph[SourceShape[T], M]): SubSource[T, Mat] = new SubSource(delegate.concat(that)) + /** + * Prepend the given [[Source]] to this [[Flow]], meaning that before elements + * are generated from this Flow, the Source's elements will be produced until it + * is exhausted, at which point Flow elements will start being produced. + * + * Note that this Flow will be materialized together with the [[Source]] and just kept + * from producing elements by asserting back-pressure until its time comes. + * + * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled. + * + * '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' this [[Flow]] completes + * + * '''Cancels when''' downstream cancels + */ + def prepend[T >: Out, M](that: Graph[SourceShape[T], M]): SubSource[T, Mat] = + new SubSource(delegate.prepend(that)) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * through will also be sent to the [[Sink]]. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 975129f056..3af32daa15 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1434,6 +1434,35 @@ trait FlowOps[+Out, +Mat] { FlowShape(merge.in(0), merge.out) } + /** + * Prepend the given [[Source]] to this [[Flow]], meaning that before elements + * are generated from this Flow, the Source's elements will be produced until it + * is exhausted, at which point Flow elements will start being produced. + * + * Note that this Flow will be materialized together with the [[Source]] and just kept + * from producing elements by asserting back-pressure until its time comes. + * + * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled. + * + * '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' this [[Flow]] completes + * + * '''Cancels when''' downstream cancels + */ + def prepend[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] = + via(prependGraph(that)) + + protected def prependGraph[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] = + GraphDSL.create(that) { implicit b ⇒ + r ⇒ + val merge = b.add(Concat[U]()) + r ~> merge.in(0) + FlowShape(merge.in(1), merge.out) + } + /** * Concatenates this [[Flow]] with the given [[Source]] so the first element * emitted by that source is emitted after the last element of this @@ -1607,6 +1636,21 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { def concatMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] = viaMat(concatGraph(that))(matF) + /** + * Prepend the given [[Source]] to this [[Flow]], meaning that before elements + * are generated from this Flow, the Source's elements will be produced until it + * is exhausted, at which point Flow elements will start being produced. + * + * Note that this Flow will be materialized together with the [[Source]] and just kept + * from producing elements by asserting back-pressure until its time comes. + * + * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled. + * + * @see [[#prepend]]. + */ + def prependMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] = + viaMat(prependGraph(that))(matF) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * through will also be sent to the [[Sink]].