Lazy and fast concat and prepend (#30252)

This commit is contained in:
Johan Andrén 2021-05-27 17:18:47 +02:00 committed by GitHub
parent cbb12e6ef3
commit 4ade8ef2d1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 825 additions and 77 deletions

View file

@ -14,6 +14,17 @@ After completion of the original upstream the elements of the given source will
After completion of the original upstream the elements of the given source will be emitted.
Both streams will be materialized together.
@@@ note
The `concat` operator is for backwards compatibility reasons "detached" and will eagerly
demand an element from both upstreams when the stream is materialized and will then have a
one element buffer for each of the upstreams, this is most often not what you want, instead
use @ref(concatLazy)[concatLazy.md]
@@@
## Example
Scala
: @@snip [FlowConcatSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala) { #concat }

View file

@ -0,0 +1,40 @@
# concatLazy
After completion of the original upstream the elements of the given source will be emitted.
@ref[Fan-in operators](../index.md#fan-in-operators)
## Signature
@apidoc[Source.concat](Source) { scala="#concatLazy[U>:Out,Mat2](that:akka.stream.Graph[akka.stream.SourceShape[U],Mat2]):FlowOps.this.Repr[U]" java="#concatLazy(akka.stream.Graph)" }
@apidoc[Flow.concat](Flow) { scala="#concatLazy[U>:Out,Mat2](that:akka.stream.Graph[akka.stream.SourceShape[U],Mat2]):FlowOps.this.Repr[U]" java="#concatLazy(akka.stream.Graph)" }
## Description
After completion of the original upstream the elements of the given source will be emitted.
Both streams will be materialized together, however, the given stream will be pulled for the first time only after the original upstream was completed. (In contrast, @ref(concat)[concat.md], introduces single-element buffers after both, original and given sources so that the given source is also pulled once immediately.)
To defer the materialization of the given source (or to completely avoid its materialization if the original upstream fails or cancels), wrap it into @ref(Source.lazySource)[../Source/lazySource.md].
If materialized values needs to be collected `concatLazyMat` is available.
## Example
Scala
: @@snip [FlowConcatSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala) { #concatLazy }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #concatLazy }
## Reactive Streams semantics
@@@div { .callout }
**emits** when the current stream has an element available; if the current input completes, it tries the next one
**backpressures** when downstream backpressures
**completes** when all upstreams complete
@@@

View file

@ -14,8 +14,26 @@ Prepends the given source to the flow, consuming it until completion before the
Prepends the given source to the flow, consuming it until completion before the original source is consumed.
@@@ note
The `prepend` operator is for backwards compatibility reasons "detached" and will eagerly
demand an element from both upstreams when the stream is materialized and will then have a
one element buffer for each of the upstreams, this is most often not what you want, instead
use @ref(prependLazy)[prependLazy.md]
@@@
If materialized values needs to be collected `prependMat` is available.
@@@ note
The `prepend` operator is for backwards compatibility reasons "detached" and will eagerly
demand an element from both upstreams when the stream is materialized and will then have a
one element buffer for each of the upstreams, this is not always what you want, if not,
use @ref(prependLazy)[prependLazy.md]
@@@
## Example
Scala
: @@snip [FlowOrElseSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala) { #prepend }

View file

@ -0,0 +1,40 @@
# prependLazy
Prepends the given source to the flow, consuming it until completion before the original source is consumed.
@ref[Fan-in operators](../index.md#fan-in-operators)
## Signature
@apidoc[Source.prepend](Source) { scala="#prepend[U>:Out,Mat2](that:akka.stream.Graph[akka.stream.SourceShape[U],Mat2]):FlowOps.this.Repr[U]" java="#prepend(akka.stream.Graph)" }
@apidoc[Flow.prepend](Flow) { scala="#prepend[U>:Out,Mat2](that:akka.stream.Graph[akka.stream.SourceShape[U],Mat2]):FlowOps.this.Repr[U]" java="#prepend(akka.stream.Graph)" }
## Description
Prepends the given source to the flow, consuming it until completion before the original source is consumed.
Both streams will be materialized together, however, the original stream will be pulled for the first time only after the prepended upstream was completed. (In contrast, @ref(prepend)[prepend.md], introduces single-element buffers after both, original and given sources so that the original source is also pulled once immediately.)
If materialized values needs to be collected `prependLazyMat` is available.
See also @ref[prepend](prepend.md) which is detached.
## Example
Scala
: @@snip [FlowPrependSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala) { #prependLazy }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #prependLazy }
## Reactive Streams semantics
@@@div { .callout }
**emits** when the given stream has an element available; if the given input completes, it tries the current one
**backpressures** when downstream backpressures
**completes** when all upstreams complete
@@@

View file

@ -262,6 +262,7 @@ the inputs in different ways.
|--|--|--|
| |<a name="mergesequence"></a>@ref[MergeSequence](MergeSequence.md)|Merge a linear sequence partitioned across multiple sources.|
|Source/Flow|<a name="concat"></a>@ref[concat](Source-or-Flow/concat.md)|After completion of the original upstream the elements of the given source will be emitted.|
|Source/Flow|<a name="concatlazy"></a>@ref[concatLazy](Source-or-Flow/concatLazy.md)|After completion of the original upstream the elements of the given source will be emitted.|
|Source/Flow|<a name="interleave"></a>@ref[interleave](Source-or-Flow/interleave.md)|Emits a specifiable number of elements from the original source, then from the provided source and repeats.|
|Source/Flow|<a name="merge"></a>@ref[merge](Source-or-Flow/merge.md)|Merge multiple sources.|
|Source/Flow|<a name="mergelatest"></a>@ref[mergeLatest](Source-or-Flow/mergeLatest.md)|Merge multiple sources.|
@ -270,6 +271,7 @@ the inputs in different ways.
|Source/Flow|<a name="mergesorted"></a>@ref[mergeSorted](Source-or-Flow/mergeSorted.md)|Merge multiple sources.|
|Source/Flow|<a name="orelse"></a>@ref[orElse](Source-or-Flow/orElse.md)|If the primary source completes without emitting any elements, the elements from the secondary source are emitted.|
|Source/Flow|<a name="prepend"></a>@ref[prepend](Source-or-Flow/prepend.md)|Prepends the given source to the flow, consuming it until completion before the original source is consumed.|
|Source/Flow|<a name="prependlazy"></a>@ref[prependLazy](Source-or-Flow/prependLazy.md)|Prepends the given source to the flow, consuming it until completion before the original source is consumed.|
|Source/Flow|<a name="zip"></a>@ref[zip](Source-or-Flow/zip.md)|Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.|
|Source/Flow|<a name="zipall"></a>@ref[zipAll](Source-or-Flow/zipAll.md)|Combines elements from two sources into @scala[tuples] @java[*Pair*] handling early completion of either source.|
|Source/Flow|<a name="ziplatest"></a>@ref[zipLatest](Source-or-Flow/zipLatest.md)|Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream, picking always the latest element of each.|
@ -393,6 +395,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [completionStageSource](Source/completionStageSource.md)
* [completionTimeout](Source-or-Flow/completionTimeout.md)
* [concat](Source-or-Flow/concat.md)
* [concatLazy](Source-or-Flow/concatLazy.md)
* [conflate](Source-or-Flow/conflate.md)
* [conflateWithSeed](Source-or-Flow/conflateWithSeed.md)
* [cycle](Source/cycle.md)
@ -504,6 +507,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [prefixAndTail](Source-or-Flow/prefixAndTail.md)
* [preMaterialize](Sink/preMaterialize.md)
* [prepend](Source-or-Flow/prepend.md)
* [prependLazy](Source-or-Flow/prependLazy.md)
* [queue](Source/queue.md)
* [queue](Sink/queue.md)
* [range](Source/range.md)

View file

@ -19,7 +19,9 @@ import akka.japi.function.Function2;
// #zip-with-index
// #or-else
// #prepend
// #prependLazy
// #concat
// #concatLazy
// #interleave
// #merge
// #merge-sorted
@ -33,7 +35,9 @@ import java.util.*;
// #merge
// #interleave
// #concat
// #concatLazy
// #prepend
// #prependLazy
// #or-else
// #zip-with-index
// #zip-with
@ -124,6 +128,16 @@ class SourceOrFlow {
// #prepend
}
void prependLazyExample() {
// #prepend
Source<String, NotUsed> ladies = Source.from(Arrays.asList("Emma", "Emily"));
Source<String, NotUsed> gentlemen = Source.from(Arrays.asList("Liam", "William"));
gentlemen.prependLazy(ladies).runWith(Sink.foreach(System.out::print), system);
// this will print "Emma", "Emily", "Liam", "William"
// #prepend
}
void concatExample() {
// #concat
Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3, 4));
@ -134,6 +148,16 @@ class SourceOrFlow {
// #concat
}
void concatLazyExample() {
// #concat
Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3, 4));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(10, 20, 30, 40));
sourceA.concatLazy(sourceB).runWith(Sink.foreach(System.out::print), system);
// prints 1, 2, 3, 4, 10, 20, 30, 40
// #concat
}
void interleaveExample() {
// #interleave
Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3, 4));

View file

@ -4,27 +4,36 @@
package akka.stream.scaladsl
import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration._
import org.reactivestreams.Publisher
import akka.NotUsed
import akka.stream.testkit.{ BaseTwoStreamsSetup, TestPublisher, TestSubscriber }
import akka.stream.testkit.BaseTwoStreamsSetup
import akka.stream.testkit.TestPublisher
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl.TestSink
import org.reactivestreams.Publisher
import org.scalatest.concurrent.ScalaFutures
class FlowConcatSpec extends BaseTwoStreamsSetup {
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.Await
import scala.concurrent.Promise
import scala.concurrent.duration._
abstract class AbstractFlowConcatSpec extends BaseTwoStreamsSetup {
override type Outputs = Int
override def setup(p1: Publisher[Int], p2: Publisher[Int]) = {
def eager: Boolean
// not used but we want the rest of the BaseTwoStreamsSetup infra
override def setup(p1: Publisher[Int], p2: Publisher[Int]): TestSubscriber.Probe[Int] = {
val subscriber = TestSubscriber.probe[Outputs]()
Source.fromPublisher(p1).concat(Source.fromPublisher(p2)).runWith(Sink.fromSubscriber(subscriber))
val s1 = Source.fromPublisher(p1)
val s2 = Source.fromPublisher(p2)
(if (eager) s1.concat(s2) else s1.concatLazy(s2)).runWith(Sink.fromSubscriber(subscriber))
subscriber
}
"A Concat for Flow " must {
s"${if (eager) "An eager" else "A lazy"} Concat for Flow " must {
"be able to concat Flow with a Source" in {
val f1: Flow[Int, String, _] = Flow[Int].map(_.toString + "-s")
@ -34,7 +43,8 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
val subs = TestSubscriber.manualProbe[Any]()
val subSink = Sink.asPublisher[Any](false)
val (_, res) = f1.concat(s2).runWith(s1, subSink)
val (_, res) =
(if (eager) f1.concatLazy(s2) else f1.concat(s2)).runWith(s1, subSink)
res.subscribe(subs)
val sub = subs.expectSubscription()
@ -51,7 +61,8 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
val subs = TestSubscriber.manualProbe[Any]()
val subSink = Sink.asPublisher[Any](false)
val (_, res) = f2.prepend(s1).runWith(s2, subSink)
val (_, res) =
(if (eager) f2.prepend(s1) else f2.prependLazy(s1)).runWith(s2, subSink)
res.subscribe(subs)
val sub = subs.expectSubscription()
@ -121,7 +132,10 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
"correctly handle async errors in secondary upstream" in assertAllStagesStopped {
val promise = Promise[Int]()
val subscriber = TestSubscriber.manualProbe[Int]()
Source(List(1, 2, 3)).concat(Source.future(promise.future)).runWith(Sink.fromSubscriber(subscriber))
val s1 = Source(List(1, 2, 3))
val s2 = Source.future(promise.future)
(if (eager) s1.concat(s2) else s1.concatLazy(s2)).runWith(Sink.fromSubscriber(subscriber))
val subscription = subscriber.expectSubscription()
subscription.request(4)
@ -131,7 +145,9 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
}
"work with Source DSL" in {
val testSource = Source(1 to 5).concatMat(Source(6 to 10))(Keep.both).grouped(1000)
val s1 = Source(1 to 5)
val s2 = Source(6 to 10)
val testSource = (if (eager) s1.concatMat(s2)(Keep.both) else s1.concatLazyMat(s2)(Keep.both)).grouped(1000)
Await.result(testSource.runWith(Sink.head), 3.seconds) should ===(1 to 10)
val runnable = testSource.toMat(Sink.ignore)(Keep.left)
@ -143,9 +159,11 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
}
"work with Flow DSL" in {
val s1 = Source(1 to 5)
val s2 = Source(6 to 10)
val testFlow: Flow[Int, Seq[Int], (NotUsed, NotUsed)] =
Flow[Int].concatMat(Source(6 to 10))(Keep.both).grouped(1000)
Await.result(Source(1 to 5).viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10)
(if (eager) Flow[Int].concatMat(s2)(Keep.both) else Flow[Int].concatLazyMat(s2)(Keep.both)).grouped(1000)
Await.result(s1.viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10)
val runnable = Source(1 to 5).viaMat(testFlow)(Keep.both).to(Sink.ignore)
val x = runnable.run()
@ -158,8 +176,12 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
}
"work with Flow DSL2" in {
val testFlow = Flow[Int].concatMat(Source(6 to 10))(Keep.both).grouped(1000)
Await.result(Source(1 to 5).viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10)
val s1 = Source(1 to 5)
val s2 = Source(6 to 10)
val testFlow =
(if (eager) Flow[Int].concatMat(s2)(Keep.both)
else Flow[Int].concatLazyMat(s2)(Keep.both)).grouped(1000)
Await.result(s1.viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10)
val sink = testFlow.concatMat(Source(1 to 5))(Keep.both).to(Sink.ignore).mapMaterializedValue[String] {
case ((m1, m2), m3) =>
@ -174,8 +196,10 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
"subscribe at once to initial source and to one that it's concat to" in {
val publisher1 = TestPublisher.probe[Int]()
val publisher2 = TestPublisher.probe[Int]()
val s1 = Source.fromPublisher(publisher1)
val s2 = Source.fromPublisher(publisher2)
val probeSink =
Source.fromPublisher(publisher1).concat(Source.fromPublisher(publisher2)).runWith(TestSink.probe[Int])
(if (eager) s1.concat(s2) else s1.concatLazy(s2)).runWith(TestSink.probe[Int])
val sub1 = publisher1.expectSubscription()
val sub2 = publisher2.expectSubscription()
@ -193,11 +217,32 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
probeSink.expectComplete()
}
"optimize away empty concat" in {
val s1 = Source.single(1)
val concat = if (eager) s1.concat(Source.empty) else s1.concatLazy(Source.empty)
(concat should be).theSameInstanceAs(s1)
concat.runWith(Sink.seq).futureValue should ===(Seq(1))
}
"optimize single elem concat" in {
val s1 = Source.single(1)
val s2 = Source.single(2)
val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
// avoids digging too deap into the traversal builder
concat.traversalBuilder.pendingBuilder.toString should include("SingleConcat(2)")
concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2))
}
}
}
class FlowConcatSpec extends AbstractFlowConcatSpec with ScalaFutures {
override def eager: Boolean = true
"concat" must {
"work in example" in {
//#concat
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
val sourceA = Source(List(1, 2, 3, 4))
val sourceB = Source(List(10, 20, 30, 40))
@ -208,3 +253,40 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
}
}
}
class FlowConcatLazySpec extends AbstractFlowConcatSpec {
override def eager: Boolean = false
"concatLazy" must {
"Make it possible to entirely avoid materialization of the second flow" in {
val publisher = TestPublisher.probe[Int]()
val subscriber = TestSubscriber.probe[Int]()
val secondStreamWasMaterialized = new AtomicBoolean(false)
Source
.fromPublisher(publisher)
.concatLazy(Source.lazySource { () =>
secondStreamWasMaterialized.set(true)
Source.single(3)
})
.runWith(Sink.fromSubscriber(subscriber))
subscriber.request(1)
publisher.sendNext(1)
subscriber.expectNext(1)
subscriber.cancel()
publisher.expectCancellation()
// cancellation went all the way upstream across one async boundary so if second source materialization
// would happen it would have happened already
secondStreamWasMaterialized.get should ===(false)
}
"work in example" in {
//#concatLazy
val sourceA = Source(List(1, 2, 3, 4))
val sourceB = Source(List(10, 20, 30, 40))
sourceA.concatLazy(sourceB).runWith(Sink.foreach(println))
//#concatLazy
}
}
}

View file

@ -25,5 +25,15 @@ class FlowPrependSpec extends AkkaSpec {
// this will print "Emma", "Emily", "Liam", "William"
//#prepend
}
"work in lazy entrance example" in {
//#prependLazy
val ladies = Source(List("Emma", "Emily"))
val gentlemen = Source(List("Liam", "William"))
gentlemen.prependLazy(ladies).runWith(Sink.foreach(println))
// this will print "Emma", "Emily", "Liam", "William"
//#prependLazy
}
}
}

View file

@ -0,0 +1,11 @@
# internal API changes and stream operator additions
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.GraphDSL#Implicits#PortOpsImpl.concatGraph")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.GraphDSL#Implicits#PortOpsImpl.prependGraph")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Flow.concatGraph")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Flow.prependGraph")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Source.concatGraph")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Source.prependGraph")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowOps.concatGraph")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowOps.prependGraph")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.SubFlowImpl.concatGraph")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.SubFlowImpl.prependGraph")

View file

@ -0,0 +1,46 @@
/*
* Copyright (C) 2009-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl
import akka.annotation.InternalApi
import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
/**
* Concatenating a single element to a stream is common enough that it warrants this optimization
* which avoids the actual fan-out for such cases.
*
* INTERNAL API
*/
@InternalApi
private[akka] final class SingleConcat[E](singleElem: E) extends GraphStage[FlowShape[E, E]] {
val in = Inlet[E]("SingleConcat.in")
val out = Outlet[E]("SingleConcat.out")
override val shape: FlowShape[E, E] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = {
push(out, grab(in))
}
override def onPull(): Unit = pull(in)
override def onUpstreamFinish(): Unit = {
emit(out, singleElem, () => completeStage())
}
setHandlers(in, out, this)
}
override def toString: String = s"SingleConcat($singleElem)"
}

View file

@ -2355,10 +2355,13 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the [[Source]] is materialized together with this Flow and just kept
* from producing elements by asserting back-pressure until its time comes.
* Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will
* in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
* (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a concat operator that is not detached use [[#concatLazy]]
*
* '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
*
@ -2376,11 +2379,40 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the [[Source]] is materialized together with this Flow and just kept
* from producing elements by asserting back-pressure until its time comes.
* Note that the [[Source]] is materialized together with this Flow. If `lazy` materialization is what is needed
* the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
* time when this source completes.
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* For a concat operator that is detached, use [[#concat]]
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
*
* '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' given [[Source]] completes
*
* '''Cancels when''' downstream cancels
*/
def concatLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.concatLazy(that))
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will
* in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
* (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a concat operator that is not detached use [[#concatLazyMat]]
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
@ -2391,15 +2423,40 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] =
new Flow(delegate.concatMat(that)(combinerToScala(matF)))
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the [[Source]] is materialized together with this Flow, if `lazy` materialization is what is needed
* the operator can be combined with `Source.lazy` to defer materialization of `that`.
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* For a concat operator that is detached, use [[#concatMat]]
*
* @see [[#concatLazy]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def concatLazyMat[M, M2](
that: Graph[SourceShape[Out], M],
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] =
new Flow(delegate.concatMat(that)(combinerToScala(matF)))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes.
* Note that the [[Source]] is materialized together with this Flow and is "detached" meaning
* in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
* (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
* This flow will then be kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a prepend operator that is not detached use [[#prependLazy]]
*
* '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
*
@ -2412,6 +2469,29 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def prepend[M](that: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.prepend(that))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that the [[Source]] is materialized together with this Flow and will then be kept from producing elements
* by asserting back-pressure until its time comes.
*
* When needing a prepend operator that is also detached use [[#prepend]]
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
*
* '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' this [[Flow]] completes
*
* '''Cancels when''' downstream cancels
*/
def prependLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.prepend(that))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
@ -2420,7 +2500,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes.
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
* When needing a prepend operator that is not detached use [[#prependLazyMat]]
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
@ -2432,6 +2512,27 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] =
new Flow(delegate.prependMat(that)(combinerToScala(matF)))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that the [[Source]] is materialized together with this Flow.
*
* This flow will then be kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a prepend operator that is detached use [[#prependMat]]
*
* @see [[#prependLazy]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def prependLazyMat[M, M2](
that: Graph[SourceShape[Out], M],
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] =
new Flow(delegate.prependLazyMat(that)(combinerToScala(matF)))
/**
* Provides a secondary source that will be consumed if this source completes without any
* elements passing by. As soon as the first element comes through this stream, the alternative

View file

@ -516,6 +516,12 @@ object Concat {
*/
def create[T](inputCount: Int): Graph[UniformFanInShape[T, T], NotUsed] = scaladsl.Concat[T](inputCount)
/**
* Create a new anonymous `Concat` operator with the specified input types.
*/
def create[T](inputCount: Int, detachedInputs: Boolean): Graph[UniformFanInShape[T, T], NotUsed] =
scaladsl.Concat[T](inputCount, detachedInputs)
/**
* Create a new anonymous `Concat` operator with the specified input types.
*/

View file

@ -1156,10 +1156,13 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* is exhausted and all result elements have been generated,
* the given source elements will be produced.
*
* Note that given [[Source]] is materialized together with this Flow and just kept
* from producing elements by asserting back-pressure until its time comes.
* Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will
* in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
* (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
* If this [[Source]] gets upstream error - no elements from the given [[Source]] will be pulled.
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a concat operator that is not detached use [[#concatLazy]]
*
* '''Emits when''' element is available from current source or from the given [[Source]] when current is completed
*
@ -1172,15 +1175,44 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def concat[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] =
new Source(delegate.concat(that))
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the [[Source]] is materialized together with this Flow. If `lazy` materialization is what is needed
* the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
* time when this source completes.
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* For a concat operator that is detached, use [[#concat]]
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
*
* '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' given [[Source]] completes
*
* '''Cancels when''' downstream cancels
*/
def concatLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] =
new Source(delegate.concatLazy(that))
/**
* Concatenate this [[Source]] with the given one, meaning that once current
* is exhausted and all result elements have been generated,
* the given source elements will be produced.
*
* Note that given [[Source]] is materialized together with this Flow and just kept
* from producing elements by asserting back-pressure until its time comes.
* Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will
* in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
* (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
* If this [[Source]] gets upstream error - no elements from the given [[Source]] will be pulled.
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a concat operator that is not detached use [[#concatLazyMat]]
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
@ -1192,15 +1224,40 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] =
new Source(delegate.concatMat(that)(combinerToScala(matF)))
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the [[Source]] is materialized together with this Flow, if `lazy` materialization is what is needed
* the operator can be combined with `Source.lazy` to defer materialization of `that`.
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* For a concat operator that is detached, use [[#concatMat]]
*
* @see [[#concatLazy]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def concatLazyMat[M, M2](
that: Graph[SourceShape[Out], M],
matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] =
new Source(delegate.concatLazyMat(that)(combinerToScala(matF)))
/**
* Prepend the given [[Source]] to this one, meaning that once the given source
* is exhausted and all result elements have been generated, the current source's
* elements will be produced.
*
* Note that the current [[Source]] is materialized together with this Flow and just kept
* from producing elements by asserting back-pressure until its time comes.
* Note that the [[Source]] is materialized together with this Flow and is "detached" meaning
* in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
* (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
* If the given [[Source]] gets upstream error - no elements from this [[Source]] will be pulled.
* This flow will then be kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a prepend operator that is not detached use [[#prependLazy]]
*
* '''Emits when''' element is available from current source or from the given [[Source]] when current is completed
*
@ -1213,15 +1270,38 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def prepend[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] =
new Source(delegate.prepend(that))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that the [[Source]] is materialized together with this Flow and will then be kept from producing elements
* by asserting back-pressure until its time comes.
*
* When needing a prepend operator that is also detached use [[#prepend]]
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
*
* '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' this [[Flow]] completes
*
* '''Cancels when''' downstream cancels
*/
def prependLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] =
new Source(delegate.prependLazy(that))
/**
* Prepend the given [[Source]] to this one, meaning that once the given source
* is exhausted and all result elements have been generated, the current source's
* elements will be produced.
*
* Note that the current [[Source]] is materialized together with this Flow and just kept
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes.
*
* If the given [[Source]] gets upstream error - no elements from this [[Source]] will be pulled.
* When needing a prepend operator that is not detached use [[#prependLazyMat]]
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
@ -1233,6 +1313,27 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] =
new Source(delegate.prependMat(that)(combinerToScala(matF)))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that the [[Source]] is materialized together with this Flow.
*
* This flow will then be kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a prepend operator that is detached use [[#prependMat]]
*
* @see [[#prependLazy]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def prependLazyMat[M, M2](
that: Graph[SourceShape[Out], M],
matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] =
new Source(delegate.prependLazyMat(that)(combinerToScala(matF)))
/**
* Provides a secondary source that will be consumed if this source completes without any
* elements passing by. As soon as the first element comes through this stream, the alternative

View file

@ -1494,10 +1494,13 @@ class SubFlow[In, Out, Mat](
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the [[Source]] is materialized together with this Flow and just kept
* from producing elements by asserting back-pressure until its time comes.
* Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will
* in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
* (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a concat operator that is not detached use [[#concatLazy]]
*
* '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
*
@ -1510,13 +1513,65 @@ class SubFlow[In, Out, Mat](
def concat[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.concat(that))
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the [[Source]] is materialized together with this Flow. If `lazy` materialization is what is needed
* the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
* time when this source completes.
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* For a concat operator that is detached, use [[#concat]]
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
*
* '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' given [[Source]] completes
*
* '''Cancels when''' downstream cancels
*/
def concatLazy[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.concatLazy(that))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes.
* Note that the [[Source]] is materialized together with this Flow and is "detached" meaning
* in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
* (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
* This flow will then be kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a prepend operator that is not detached use [[#prependLazy]]
*
* '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' this [[Flow]] completes
*
* '''Cancels when''' downstream cancels
*/
def prepend[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.prepend(that))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that the [[Source]] is materialized together with this Flow and will then be kept from producing elements
* by asserting back-pressure until its time comes.
*
* When needing a prepend operator that is also detached use [[#prepend]]
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
*
@ -1528,7 +1583,7 @@ class SubFlow[In, Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
def prepend[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] =
def prependLazy[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.prepend(that))
/**

View file

@ -1470,10 +1470,13 @@ class SubSource[Out, Mat](
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the [[Source]] is materialized together with this Flow and just kept
* from producing elements by asserting back-pressure until its time comes.
* Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will
* in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
* (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a concat operator that is not detached use [[#concatLazyMat]]
*
* '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
*
@ -1486,15 +1489,44 @@ class SubSource[Out, Mat](
def concat[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] =
new SubSource(delegate.concat(that))
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the [[Source]] is materialized together with this Flow. If `lazy` materialization is what is needed
* the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
* time when this source completes.
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* For a concat operator that is detached, use [[#concat]]
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
*
* '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' given [[Source]] completes
*
* '''Cancels when''' downstream cancels
*/
def concatLazy[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] =
new SubSource(delegate.concatLazy(that))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes.
* Note that the [[Source]] is materialized together with this Flow and is "detached" meaning
* in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
* (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
* This flow will then be kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a prepend operator that is not detached use [[#prependLazy]]
*
* '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
*
@ -1507,6 +1539,29 @@ class SubSource[Out, Mat](
def prepend[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] =
new SubSource(delegate.prepend(that))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that the [[Source]] is materialized together with this Flow and will then be kept from producing elements
* by asserting back-pressure until its time comes.
*
* When needing a prepend operator that is also detached use [[#prepend]]
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
*
* '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' this [[Flow]] completes
*
* '''Cancels when''' downstream cancels
*/
def prependLazy[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] =
new SubSource(delegate.prependLazy(that))
/**
* Provides a secondary source that will be consumed if this source completes without any
* elements passing by. As soon as the first element comes through this stream, the alternative

View file

@ -18,6 +18,7 @@ import akka.annotation.DoNotInherit
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import akka.stream.Attributes.SourceLocation
import akka.stream._
import akka.stream.impl.SingleConcat
import akka.stream.impl.{
fusing,
LinearTraversalBuilder,
@ -31,6 +32,7 @@ import akka.stream.impl.{
import akka.stream.impl.fusing._
import akka.stream.impl.fusing.FlattenMerge
import akka.stream.stage._
import akka.util.OptionVal
import akka.util.{ ConstantFun, Timeout }
import akka.util.ccompat._
@ -2991,8 +2993,13 @@ trait FlowOps[+Out, +Mat] {
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the [[Source]] is materialized together with this Flow and just kept
* from producing elements by asserting back-pressure until its time comes.
* Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will
* in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
* (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a concat operator that is not detached use [[#concatLazy]]
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
*
@ -3005,23 +3012,97 @@ trait FlowOps[+Out, +Mat] {
* '''Cancels when''' downstream cancels
*/
def concat[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] =
via(concatGraph(that))
internalConcat(that, detached = true)
protected def concatGraph[U >: Out, Mat2](
that: Graph[SourceShape[U], Mat2]): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] =
that: Graph[SourceShape[U], Mat2],
detached: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] =
GraphDSL.create(that) { implicit b => r =>
val merge = b.add(Concat[U]())
val merge = b.add(Concat[U](2, detached))
r ~> merge.in(1)
FlowShape(merge.in(0), merge.out)
}
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the [[Source]] is materialized together with this Flow. If `lazy` materialization is what is needed
* the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
* time when this source completes.
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* For a concat operator that is detached, use [[#concat]]
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
*
* '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' given [[Source]] completes
*
* '''Cancels when''' downstream cancels
*/
def concatLazy[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] =
internalConcat(that, detached = false)
private def internalConcat[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2], detached: Boolean): Repr[U] =
that match {
case source if source eq Source.empty => this.asInstanceOf[Repr[U]]
case other =>
TraversalBuilder.getSingleSource(other) match {
case OptionVal.Some(singleSource) =>
via(new SingleConcat(singleSource.elem.asInstanceOf[U]))
case _ => via(concatGraph(other, detached))
}
}
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes.
* Note that the [[Source]] is materialized together with this Flow and is "detached" meaning
* in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
* (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
* This flow will then be kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a prepend operator that is not detached use [[#prependLazy]]
*
*
* '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' this [[Flow]] completes
*
* '''Cancels when''' downstream cancels
*/
def prepend[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] =
via(prependGraph(that, detached = true))
protected def prependGraph[U >: Out, Mat2](
that: Graph[SourceShape[U], Mat2],
detached: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] =
GraphDSL.create(that) { implicit b => r =>
val merge = b.add(Concat[U](2, detached))
r ~> merge.in(0)
FlowShape(merge.in(1), merge.out)
}
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that the [[Source]] is materialized together with this Flow and will then be kept from producing elements
* by asserting back-pressure until its time comes.
*
* When needing a prepend operator that is also detached use [[#prepend]]
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
*
@ -3033,16 +3114,8 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels
*/
def prepend[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] =
via(prependGraph(that))
protected def prependGraph[U >: Out, Mat2](
that: Graph[SourceShape[U], Mat2]): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] =
GraphDSL.create(that) { implicit b => r =>
val merge = b.add(Concat[U]())
r ~> merge.in(0)
FlowShape(merge.in(1), merge.out)
}
def prependLazy[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] =
via(prependGraph(that, detached = false))
/**
* Provides a secondary source that will be consumed if this stream completes without any
@ -3456,10 +3529,13 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the [[Source]] is materialized together with this Flow and just kept
* from producing elements by asserting back-pressure until its time comes.
* Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will
* in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
* (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a concat operator that is not detached use [[#concatLazyMat]]
*
* @see [[#concat]].
*
@ -3467,7 +3543,28 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def concatMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3] =
viaMat(concatGraph(that))(matF)
viaMat(concatGraph(that, detached = true))(matF)
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the [[Source]] is materialized together with this Flow, if `lazy` materialization is what is needed
* the operator can be combined with `Source.lazy` to defer materialization of `that`.
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* For a concat operator that is detached, use [[#concatMat]]
*
* @see [[#concatLazy]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def concatLazyMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(
matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3] =
viaMat(concatGraph(that, detached = false))(matF)
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
@ -3479,13 +3576,37 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
*
* When needing a concat operator that is not detached use [[#prependLazyMat]]
*
* @see [[#prepend]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def prependMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3] =
viaMat(prependGraph(that))(matF)
viaMat(prependGraph(that, detached = true))(matF)
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that the [[Source]] is materialized together with this Flow and is "detached" meaning
* in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
* (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
* This flow will then be kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a prepend operator that is not detached use [[#prependLazyMat]]
*
* @see [[#prependLazy]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def prependLazyMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(
matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3] =
viaMat(prependGraph(that, detached = true))(matF)
/**
* Provides a secondary source that will be consumed if this stream completes without any

View file

@ -1244,11 +1244,34 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] => O)(n: Int) extends GraphStage[U
object Concat {
// two streams is so common that we can re-use a single instance to avoid some allocations
private val _concatTwo = new Concat[Any](2)
private def concatTwo[T]: GraphStage[UniformFanInShape[T, T]] =
_concatTwo.asInstanceOf[GraphStage[UniformFanInShape[T, T]]]
/**
* Create a new `Concat`.
* Create a new `Concat`. Note that this for historical reasons creates a "detached" Concat which
* will eagerly pull each input on materialization and act as a one element buffer for each input.
*/
def apply[T](inputPorts: Int = 2): Graph[UniformFanInShape[T, T], NotUsed] =
GraphStages.withDetachedInputs(new Concat[T](inputPorts))
apply(inputPorts, detachedInputs = true)
/**
* Create a new `Concat` operator that will concatenate two or more streams.
* @param inputPorts The number of fan-in input ports
* @param detachedInputs If the ports should be detached (eagerly pull both inputs) useful to avoid deadlocks in graphs with loops
* @return
*/
def apply[T](inputPorts: Int, detachedInputs: Boolean): Graph[UniformFanInShape[T, T], NotUsed] = {
val concat = {
if (inputPorts == 2) concatTwo[T]
else new Concat[T](inputPorts)
}
if (detachedInputs) GraphStages.withDetachedInputs(concat)
else concat
}
}
/**