diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md
index dd0ef601f8..fe8881d034 100644
--- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md
+++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md
@@ -14,6 +14,17 @@ After completion of the original upstream the elements of the given source will
After completion of the original upstream the elements of the given source will be emitted.
+Both streams will be materialized together.
+
+@@@ note
+
+ The `concat` operator is for backwards compatibility reasons "detached" and will eagerly
+ demand an element from both upstreams when the stream is materialized and will then have a
+ one element buffer for each of the upstreams, this is most often not what you want, instead
+ use @ref(concatLazy)[concatLazy.md]
+
+@@@
+
## Example
Scala
: @@snip [FlowConcatSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala) { #concat }
diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatLazy.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatLazy.md
new file mode 100644
index 0000000000..6617381c12
--- /dev/null
+++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concatLazy.md
@@ -0,0 +1,40 @@
+# concatLazy
+
+After completion of the original upstream the elements of the given source will be emitted.
+
+@ref[Fan-in operators](../index.md#fan-in-operators)
+
+## Signature
+
+@apidoc[Source.concat](Source) { scala="#concatLazy[U>:Out,Mat2](that:akka.stream.Graph[akka.stream.SourceShape[U],Mat2]):FlowOps.this.Repr[U]" java="#concatLazy(akka.stream.Graph)" }
+@apidoc[Flow.concat](Flow) { scala="#concatLazy[U>:Out,Mat2](that:akka.stream.Graph[akka.stream.SourceShape[U],Mat2]):FlowOps.this.Repr[U]" java="#concatLazy(akka.stream.Graph)" }
+
+
+## Description
+
+After completion of the original upstream the elements of the given source will be emitted.
+
+Both streams will be materialized together, however, the given stream will be pulled for the first time only after the original upstream was completed. (In contrast, @ref(concat)[concat.md], introduces single-element buffers after both, original and given sources so that the given source is also pulled once immediately.)
+
+To defer the materialization of the given source (or to completely avoid its materialization if the original upstream fails or cancels), wrap it into @ref(Source.lazySource)[../Source/lazySource.md].
+
+If materialized values needs to be collected `concatLazyMat` is available.
+
+## Example
+Scala
+: @@snip [FlowConcatSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala) { #concatLazy }
+
+Java
+: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #concatLazy }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the current stream has an element available; if the current input completes, it tries the next one
+
+**backpressures** when downstream backpressures
+
+**completes** when all upstreams complete
+
+@@@
diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/prepend.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/prepend.md
index dab4264eb7..7d6b61c9ca 100644
--- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/prepend.md
+++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/prepend.md
@@ -14,8 +14,26 @@ Prepends the given source to the flow, consuming it until completion before the
Prepends the given source to the flow, consuming it until completion before the original source is consumed.
+@@@ note
+
+ The `prepend` operator is for backwards compatibility reasons "detached" and will eagerly
+ demand an element from both upstreams when the stream is materialized and will then have a
+ one element buffer for each of the upstreams, this is most often not what you want, instead
+ use @ref(prependLazy)[prependLazy.md]
+
+@@@
+
If materialized values needs to be collected `prependMat` is available.
+@@@ note
+
+The `prepend` operator is for backwards compatibility reasons "detached" and will eagerly
+demand an element from both upstreams when the stream is materialized and will then have a
+one element buffer for each of the upstreams, this is not always what you want, if not,
+use @ref(prependLazy)[prependLazy.md]
+
+@@@
+
## Example
Scala
: @@snip [FlowOrElseSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala) { #prepend }
diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/prependLazy.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/prependLazy.md
new file mode 100644
index 0000000000..9b6ec504fb
--- /dev/null
+++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/prependLazy.md
@@ -0,0 +1,40 @@
+# prependLazy
+
+Prepends the given source to the flow, consuming it until completion before the original source is consumed.
+
+@ref[Fan-in operators](../index.md#fan-in-operators)
+
+## Signature
+
+@apidoc[Source.prepend](Source) { scala="#prepend[U>:Out,Mat2](that:akka.stream.Graph[akka.stream.SourceShape[U],Mat2]):FlowOps.this.Repr[U]" java="#prepend(akka.stream.Graph)" }
+@apidoc[Flow.prepend](Flow) { scala="#prepend[U>:Out,Mat2](that:akka.stream.Graph[akka.stream.SourceShape[U],Mat2]):FlowOps.this.Repr[U]" java="#prepend(akka.stream.Graph)" }
+
+
+## Description
+
+Prepends the given source to the flow, consuming it until completion before the original source is consumed.
+
+Both streams will be materialized together, however, the original stream will be pulled for the first time only after the prepended upstream was completed. (In contrast, @ref(prepend)[prepend.md], introduces single-element buffers after both, original and given sources so that the original source is also pulled once immediately.)
+
+If materialized values needs to be collected `prependLazyMat` is available.
+
+See also @ref[prepend](prepend.md) which is detached.
+
+## Example
+Scala
+: @@snip [FlowPrependSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala) { #prependLazy }
+
+Java
+: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #prependLazy }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the given stream has an element available; if the given input completes, it tries the current one
+
+**backpressures** when downstream backpressures
+
+**completes** when all upstreams complete
+
+@@@
diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md
index 8a95918bbb..abc0fa1510 100644
--- a/akka-docs/src/main/paradox/stream/operators/index.md
+++ b/akka-docs/src/main/paradox/stream/operators/index.md
@@ -262,6 +262,7 @@ the inputs in different ways.
|--|--|--|
| |@ref[MergeSequence](MergeSequence.md)|Merge a linear sequence partitioned across multiple sources.|
|Source/Flow|@ref[concat](Source-or-Flow/concat.md)|After completion of the original upstream the elements of the given source will be emitted.|
+|Source/Flow|@ref[concatLazy](Source-or-Flow/concatLazy.md)|After completion of the original upstream the elements of the given source will be emitted.|
|Source/Flow|@ref[interleave](Source-or-Flow/interleave.md)|Emits a specifiable number of elements from the original source, then from the provided source and repeats.|
|Source/Flow|@ref[merge](Source-or-Flow/merge.md)|Merge multiple sources.|
|Source/Flow|@ref[mergeLatest](Source-or-Flow/mergeLatest.md)|Merge multiple sources.|
@@ -270,6 +271,7 @@ the inputs in different ways.
|Source/Flow|@ref[mergeSorted](Source-or-Flow/mergeSorted.md)|Merge multiple sources.|
|Source/Flow|@ref[orElse](Source-or-Flow/orElse.md)|If the primary source completes without emitting any elements, the elements from the secondary source are emitted.|
|Source/Flow|@ref[prepend](Source-or-Flow/prepend.md)|Prepends the given source to the flow, consuming it until completion before the original source is consumed.|
+|Source/Flow|@ref[prependLazy](Source-or-Flow/prependLazy.md)|Prepends the given source to the flow, consuming it until completion before the original source is consumed.|
|Source/Flow|@ref[zip](Source-or-Flow/zip.md)|Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.|
|Source/Flow|@ref[zipAll](Source-or-Flow/zipAll.md)|Combines elements from two sources into @scala[tuples] @java[*Pair*] handling early completion of either source.|
|Source/Flow|@ref[zipLatest](Source-or-Flow/zipLatest.md)|Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream, picking always the latest element of each.|
@@ -393,6 +395,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [completionStageSource](Source/completionStageSource.md)
* [completionTimeout](Source-or-Flow/completionTimeout.md)
* [concat](Source-or-Flow/concat.md)
+* [concatLazy](Source-or-Flow/concatLazy.md)
* [conflate](Source-or-Flow/conflate.md)
* [conflateWithSeed](Source-or-Flow/conflateWithSeed.md)
* [cycle](Source/cycle.md)
@@ -504,6 +507,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [prefixAndTail](Source-or-Flow/prefixAndTail.md)
* [preMaterialize](Sink/preMaterialize.md)
* [prepend](Source-or-Flow/prepend.md)
+* [prependLazy](Source-or-Flow/prependLazy.md)
* [queue](Source/queue.md)
* [queue](Sink/queue.md)
* [range](Source/range.md)
diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
index 7d9bd7af26..31e0e63ad6 100644
--- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
+++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
@@ -19,7 +19,9 @@ import akka.japi.function.Function2;
// #zip-with-index
// #or-else
// #prepend
+// #prependLazy
// #concat
+// #concatLazy
// #interleave
// #merge
// #merge-sorted
@@ -33,7 +35,9 @@ import java.util.*;
// #merge
// #interleave
// #concat
+// #concatLazy
// #prepend
+// #prependLazy
// #or-else
// #zip-with-index
// #zip-with
@@ -124,6 +128,16 @@ class SourceOrFlow {
// #prepend
}
+ void prependLazyExample() {
+ // #prepend
+ Source ladies = Source.from(Arrays.asList("Emma", "Emily"));
+ Source gentlemen = Source.from(Arrays.asList("Liam", "William"));
+ gentlemen.prependLazy(ladies).runWith(Sink.foreach(System.out::print), system);
+ // this will print "Emma", "Emily", "Liam", "William"
+
+ // #prepend
+ }
+
void concatExample() {
// #concat
Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4));
@@ -134,6 +148,16 @@ class SourceOrFlow {
// #concat
}
+ void concatLazyExample() {
+ // #concat
+ Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4));
+ Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40));
+ sourceA.concatLazy(sourceB).runWith(Sink.foreach(System.out::print), system);
+ // prints 1, 2, 3, 4, 10, 20, 30, 40
+
+ // #concat
+ }
+
void interleaveExample() {
// #interleave
Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4));
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala
index a55a625bdb..45970405d2 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala
@@ -4,27 +4,36 @@
package akka.stream.scaladsl
-import scala.concurrent.{ Await, Promise }
-import scala.concurrent.duration._
-
-import org.reactivestreams.Publisher
-
import akka.NotUsed
-import akka.stream.testkit.{ BaseTwoStreamsSetup, TestPublisher, TestSubscriber }
+import akka.stream.testkit.BaseTwoStreamsSetup
+import akka.stream.testkit.TestPublisher
+import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl.TestSink
+import org.reactivestreams.Publisher
+import org.scalatest.concurrent.ScalaFutures
-class FlowConcatSpec extends BaseTwoStreamsSetup {
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.concurrent.Await
+import scala.concurrent.Promise
+import scala.concurrent.duration._
+
+abstract class AbstractFlowConcatSpec extends BaseTwoStreamsSetup {
override type Outputs = Int
- override def setup(p1: Publisher[Int], p2: Publisher[Int]) = {
+ def eager: Boolean
+
+ // not used but we want the rest of the BaseTwoStreamsSetup infra
+ override def setup(p1: Publisher[Int], p2: Publisher[Int]): TestSubscriber.Probe[Int] = {
val subscriber = TestSubscriber.probe[Outputs]()
- Source.fromPublisher(p1).concat(Source.fromPublisher(p2)).runWith(Sink.fromSubscriber(subscriber))
+ val s1 = Source.fromPublisher(p1)
+ val s2 = Source.fromPublisher(p2)
+ (if (eager) s1.concat(s2) else s1.concatLazy(s2)).runWith(Sink.fromSubscriber(subscriber))
subscriber
}
- "A Concat for Flow " must {
+ s"${if (eager) "An eager" else "A lazy"} Concat for Flow " must {
"be able to concat Flow with a Source" in {
val f1: Flow[Int, String, _] = Flow[Int].map(_.toString + "-s")
@@ -34,7 +43,8 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
val subs = TestSubscriber.manualProbe[Any]()
val subSink = Sink.asPublisher[Any](false)
- val (_, res) = f1.concat(s2).runWith(s1, subSink)
+ val (_, res) =
+ (if (eager) f1.concatLazy(s2) else f1.concat(s2)).runWith(s1, subSink)
res.subscribe(subs)
val sub = subs.expectSubscription()
@@ -51,7 +61,8 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
val subs = TestSubscriber.manualProbe[Any]()
val subSink = Sink.asPublisher[Any](false)
- val (_, res) = f2.prepend(s1).runWith(s2, subSink)
+ val (_, res) =
+ (if (eager) f2.prepend(s1) else f2.prependLazy(s1)).runWith(s2, subSink)
res.subscribe(subs)
val sub = subs.expectSubscription()
@@ -121,7 +132,10 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
"correctly handle async errors in secondary upstream" in assertAllStagesStopped {
val promise = Promise[Int]()
val subscriber = TestSubscriber.manualProbe[Int]()
- Source(List(1, 2, 3)).concat(Source.future(promise.future)).runWith(Sink.fromSubscriber(subscriber))
+ val s1 = Source(List(1, 2, 3))
+ val s2 = Source.future(promise.future)
+
+ (if (eager) s1.concat(s2) else s1.concatLazy(s2)).runWith(Sink.fromSubscriber(subscriber))
val subscription = subscriber.expectSubscription()
subscription.request(4)
@@ -131,7 +145,9 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
}
"work with Source DSL" in {
- val testSource = Source(1 to 5).concatMat(Source(6 to 10))(Keep.both).grouped(1000)
+ val s1 = Source(1 to 5)
+ val s2 = Source(6 to 10)
+ val testSource = (if (eager) s1.concatMat(s2)(Keep.both) else s1.concatLazyMat(s2)(Keep.both)).grouped(1000)
Await.result(testSource.runWith(Sink.head), 3.seconds) should ===(1 to 10)
val runnable = testSource.toMat(Sink.ignore)(Keep.left)
@@ -143,9 +159,11 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
}
"work with Flow DSL" in {
+ val s1 = Source(1 to 5)
+ val s2 = Source(6 to 10)
val testFlow: Flow[Int, Seq[Int], (NotUsed, NotUsed)] =
- Flow[Int].concatMat(Source(6 to 10))(Keep.both).grouped(1000)
- Await.result(Source(1 to 5).viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10)
+ (if (eager) Flow[Int].concatMat(s2)(Keep.both) else Flow[Int].concatLazyMat(s2)(Keep.both)).grouped(1000)
+ Await.result(s1.viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10)
val runnable = Source(1 to 5).viaMat(testFlow)(Keep.both).to(Sink.ignore)
val x = runnable.run()
@@ -158,8 +176,12 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
}
"work with Flow DSL2" in {
- val testFlow = Flow[Int].concatMat(Source(6 to 10))(Keep.both).grouped(1000)
- Await.result(Source(1 to 5).viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10)
+ val s1 = Source(1 to 5)
+ val s2 = Source(6 to 10)
+ val testFlow =
+ (if (eager) Flow[Int].concatMat(s2)(Keep.both)
+ else Flow[Int].concatLazyMat(s2)(Keep.both)).grouped(1000)
+ Await.result(s1.viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10)
val sink = testFlow.concatMat(Source(1 to 5))(Keep.both).to(Sink.ignore).mapMaterializedValue[String] {
case ((m1, m2), m3) =>
@@ -174,8 +196,10 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
"subscribe at once to initial source and to one that it's concat to" in {
val publisher1 = TestPublisher.probe[Int]()
val publisher2 = TestPublisher.probe[Int]()
+ val s1 = Source.fromPublisher(publisher1)
+ val s2 = Source.fromPublisher(publisher2)
val probeSink =
- Source.fromPublisher(publisher1).concat(Source.fromPublisher(publisher2)).runWith(TestSink.probe[Int])
+ (if (eager) s1.concat(s2) else s1.concatLazy(s2)).runWith(TestSink.probe[Int])
val sub1 = publisher1.expectSubscription()
val sub2 = publisher2.expectSubscription()
@@ -193,11 +217,32 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
probeSink.expectComplete()
}
+ "optimize away empty concat" in {
+ val s1 = Source.single(1)
+ val concat = if (eager) s1.concat(Source.empty) else s1.concatLazy(Source.empty)
+ (concat should be).theSameInstanceAs(s1)
+ concat.runWith(Sink.seq).futureValue should ===(Seq(1))
+ }
+ "optimize single elem concat" in {
+ val s1 = Source.single(1)
+ val s2 = Source.single(2)
+ val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+ // avoids digging too deap into the traversal builder
+ concat.traversalBuilder.pendingBuilder.toString should include("SingleConcat(2)")
+
+ concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2))
+ }
+ }
+}
+
+class FlowConcatSpec extends AbstractFlowConcatSpec with ScalaFutures {
+ override def eager: Boolean = true
+
+ "concat" must {
"work in example" in {
//#concat
- import akka.stream.scaladsl.Sink
- import akka.stream.scaladsl.Source
val sourceA = Source(List(1, 2, 3, 4))
val sourceB = Source(List(10, 20, 30, 40))
@@ -208,3 +253,40 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
}
}
}
+
+class FlowConcatLazySpec extends AbstractFlowConcatSpec {
+ override def eager: Boolean = false
+
+ "concatLazy" must {
+ "Make it possible to entirely avoid materialization of the second flow" in {
+ val publisher = TestPublisher.probe[Int]()
+ val subscriber = TestSubscriber.probe[Int]()
+ val secondStreamWasMaterialized = new AtomicBoolean(false)
+ Source
+ .fromPublisher(publisher)
+ .concatLazy(Source.lazySource { () =>
+ secondStreamWasMaterialized.set(true)
+ Source.single(3)
+ })
+ .runWith(Sink.fromSubscriber(subscriber))
+ subscriber.request(1)
+ publisher.sendNext(1)
+ subscriber.expectNext(1)
+ subscriber.cancel()
+ publisher.expectCancellation()
+ // cancellation went all the way upstream across one async boundary so if second source materialization
+ // would happen it would have happened already
+ secondStreamWasMaterialized.get should ===(false)
+ }
+
+ "work in example" in {
+ //#concatLazy
+ val sourceA = Source(List(1, 2, 3, 4))
+ val sourceB = Source(List(10, 20, 30, 40))
+
+ sourceA.concatLazy(sourceB).runWith(Sink.foreach(println))
+ //#concatLazy
+ }
+ }
+
+}
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala
index 5a0509990b..9a641813a9 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala
@@ -25,5 +25,15 @@ class FlowPrependSpec extends AkkaSpec {
// this will print "Emma", "Emily", "Liam", "William"
//#prepend
}
+
+ "work in lazy entrance example" in {
+ //#prependLazy
+ val ladies = Source(List("Emma", "Emily"))
+ val gentlemen = Source(List("Liam", "William"))
+
+ gentlemen.prependLazy(ladies).runWith(Sink.foreach(println))
+ // this will print "Emma", "Emily", "Liam", "William"
+ //#prependLazy
+ }
}
}
diff --git a/akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/23044-concat-prepend-improvements.excludes b/akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/23044-concat-prepend-improvements.excludes
new file mode 100644
index 0000000000..d9684b46b0
--- /dev/null
+++ b/akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/23044-concat-prepend-improvements.excludes
@@ -0,0 +1,11 @@
+# internal API changes and stream operator additions
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.GraphDSL#Implicits#PortOpsImpl.concatGraph")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.GraphDSL#Implicits#PortOpsImpl.prependGraph")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Flow.concatGraph")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Flow.prependGraph")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Source.concatGraph")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Source.prependGraph")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowOps.concatGraph")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowOps.prependGraph")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.SubFlowImpl.concatGraph")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.SubFlowImpl.prependGraph")
\ No newline at end of file
diff --git a/akka-stream/src/main/scala/akka/stream/impl/SingleConcat.scala b/akka-stream/src/main/scala/akka/stream/impl/SingleConcat.scala
new file mode 100644
index 0000000000..671327cc2d
--- /dev/null
+++ b/akka-stream/src/main/scala/akka/stream/impl/SingleConcat.scala
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2009-2021 Lightbend Inc.
+ */
+
+package akka.stream.impl
+
+import akka.annotation.InternalApi
+import akka.stream.Attributes
+import akka.stream.FlowShape
+import akka.stream.Inlet
+import akka.stream.Outlet
+import akka.stream.stage.GraphStage
+import akka.stream.stage.GraphStageLogic
+import akka.stream.stage.InHandler
+import akka.stream.stage.OutHandler
+
+/**
+ * Concatenating a single element to a stream is common enough that it warrants this optimization
+ * which avoids the actual fan-out for such cases.
+ *
+ * INTERNAL API
+ */
+@InternalApi
+private[akka] final class SingleConcat[E](singleElem: E) extends GraphStage[FlowShape[E, E]] {
+
+ val in = Inlet[E]("SingleConcat.in")
+ val out = Outlet[E]("SingleConcat.out")
+
+ override val shape: FlowShape[E, E] = FlowShape(in, out)
+
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with InHandler with OutHandler {
+ override def onPush(): Unit = {
+ push(out, grab(in))
+ }
+
+ override def onPull(): Unit = pull(in)
+
+ override def onUpstreamFinish(): Unit = {
+ emit(out, singleElem, () => completeStage())
+ }
+ setHandlers(in, out, this)
+ }
+
+ override def toString: String = s"SingleConcat($singleElem)"
+}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index 9c8d9b0cc2..008a75aca7 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -2355,10 +2355,13 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* Flow’s input is exhausted and all result elements have been generated,
* the Source’s elements will be produced.
*
- * Note that the [[Source]] is materialized together with this Flow and just kept
- * from producing elements by asserting back-pressure until its time comes.
+ * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will
+ * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
+ * (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
- * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * When needing a concat operator that is not detached use [[#concatLazy]]
*
* '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
*
@@ -2376,11 +2379,40 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* Flow’s input is exhausted and all result elements have been generated,
* the Source’s elements will be produced.
*
- * Note that the [[Source]] is materialized together with this Flow and just kept
- * from producing elements by asserting back-pressure until its time comes.
+ * Note that the [[Source]] is materialized together with this Flow. If `lazy` materialization is what is needed
+ * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
+ * time when this source completes.
+ *
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * For a concat operator that is detached, use [[#concat]]
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
*
+ * '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' given [[Source]] completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def concatLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] =
+ new Flow(delegate.concatLazy(that))
+
+ /**
+ * Concatenate the given [[Source]] to this [[Flow]], meaning that once this
+ * Flow’s input is exhausted and all result elements have been generated,
+ * the Source’s elements will be produced.
+ *
+ * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will
+ * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
+ * (so it can not be combined with `Source.lazy` to defer materialization of `that`).
+ *
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * When needing a concat operator that is not detached use [[#concatLazyMat]]
+ *
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
@@ -2391,15 +2423,40 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] =
new Flow(delegate.concatMat(that)(combinerToScala(matF)))
+ /**
+ * Concatenate the given [[Source]] to this [[Flow]], meaning that once this
+ * Flow’s input is exhausted and all result elements have been generated,
+ * the Source’s elements will be produced.
+ *
+ * Note that the [[Source]] is materialized together with this Flow, if `lazy` materialization is what is needed
+ * the operator can be combined with `Source.lazy` to defer materialization of `that`.
+ *
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * For a concat operator that is detached, use [[#concatMat]]
+ *
+ * @see [[#concatLazy]].
+ *
+ * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
+ * where appropriate instead of manually writing functions that pass through one of the values.
+ */
+ def concatLazyMat[M, M2](
+ that: Graph[SourceShape[Out], M],
+ matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] =
+ new Flow(delegate.concatMat(that)(combinerToScala(matF)))
+
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
- * Note that this Flow will be materialized together with the [[Source]] and just kept
- * from producing elements by asserting back-pressure until its time comes.
+ * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning
+ * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
+ * (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
- * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
+ * This flow will then be kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * When needing a prepend operator that is not detached use [[#prependLazy]]
*
* '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
*
@@ -2412,6 +2469,29 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def prepend[M](that: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.prepend(that))
+ /**
+ * Prepend the given [[Source]] to this [[Flow]], meaning that before elements
+ * are generated from this Flow, the Source's elements will be produced until it
+ * is exhausted, at which point Flow elements will start being produced.
+ *
+ * Note that the [[Source]] is materialized together with this Flow and will then be kept from producing elements
+ * by asserting back-pressure until its time comes.
+ *
+ * When needing a prepend operator that is also detached use [[#prepend]]
+ *
+ * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
+ *
+ * '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' this [[Flow]] completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def prependLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] =
+ new Flow(delegate.prepend(that))
+
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
@@ -2420,7 +2500,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes.
*
- * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
+ * When needing a prepend operator that is not detached use [[#prependLazyMat]]
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
@@ -2432,6 +2512,27 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] =
new Flow(delegate.prependMat(that)(combinerToScala(matF)))
+ /**
+ * Prepend the given [[Source]] to this [[Flow]], meaning that before elements
+ * are generated from this Flow, the Source's elements will be produced until it
+ * is exhausted, at which point Flow elements will start being produced.
+ *
+ * Note that the [[Source]] is materialized together with this Flow.
+ *
+ * This flow will then be kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * When needing a prepend operator that is detached use [[#prependMat]]
+ *
+ * @see [[#prependLazy]].
+ *
+ * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
+ * where appropriate instead of manually writing functions that pass through one of the values.
+ */
+ def prependLazyMat[M, M2](
+ that: Graph[SourceShape[Out], M],
+ matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] =
+ new Flow(delegate.prependLazyMat(that)(combinerToScala(matF)))
+
/**
* Provides a secondary source that will be consumed if this source completes without any
* elements passing by. As soon as the first element comes through this stream, the alternative
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala
index 685b88f9f4..fae4713823 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala
@@ -516,6 +516,12 @@ object Concat {
*/
def create[T](inputCount: Int): Graph[UniformFanInShape[T, T], NotUsed] = scaladsl.Concat[T](inputCount)
+ /**
+ * Create a new anonymous `Concat` operator with the specified input types.
+ */
+ def create[T](inputCount: Int, detachedInputs: Boolean): Graph[UniformFanInShape[T, T], NotUsed] =
+ scaladsl.Concat[T](inputCount, detachedInputs)
+
/**
* Create a new anonymous `Concat` operator with the specified input types.
*/
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
index 0c5bb6dc9a..50af2d1f03 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
@@ -1156,10 +1156,13 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* is exhausted and all result elements have been generated,
* the given source elements will be produced.
*
- * Note that given [[Source]] is materialized together with this Flow and just kept
- * from producing elements by asserting back-pressure until its time comes.
+ * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will
+ * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
+ * (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
- * If this [[Source]] gets upstream error - no elements from the given [[Source]] will be pulled.
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * When needing a concat operator that is not detached use [[#concatLazy]]
*
* '''Emits when''' element is available from current source or from the given [[Source]] when current is completed
*
@@ -1172,15 +1175,44 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def concat[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] =
new Source(delegate.concat(that))
+ /**
+ * Concatenate the given [[Source]] to this [[Flow]], meaning that once this
+ * Flow’s input is exhausted and all result elements have been generated,
+ * the Source’s elements will be produced.
+ *
+ * Note that the [[Source]] is materialized together with this Flow. If `lazy` materialization is what is needed
+ * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
+ * time when this source completes.
+ *
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * For a concat operator that is detached, use [[#concat]]
+ *
+ * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
+ *
+ * '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' given [[Source]] completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def concatLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] =
+ new Source(delegate.concatLazy(that))
+
/**
* Concatenate this [[Source]] with the given one, meaning that once current
* is exhausted and all result elements have been generated,
* the given source elements will be produced.
*
- * Note that given [[Source]] is materialized together with this Flow and just kept
- * from producing elements by asserting back-pressure until its time comes.
+ * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will
+ * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
+ * (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
- * If this [[Source]] gets upstream error - no elements from the given [[Source]] will be pulled.
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * When needing a concat operator that is not detached use [[#concatLazyMat]]
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
@@ -1192,15 +1224,40 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] =
new Source(delegate.concatMat(that)(combinerToScala(matF)))
+ /**
+ * Concatenate the given [[Source]] to this [[Flow]], meaning that once this
+ * Flow’s input is exhausted and all result elements have been generated,
+ * the Source’s elements will be produced.
+ *
+ * Note that the [[Source]] is materialized together with this Flow, if `lazy` materialization is what is needed
+ * the operator can be combined with `Source.lazy` to defer materialization of `that`.
+ *
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * For a concat operator that is detached, use [[#concatMat]]
+ *
+ * @see [[#concatLazy]].
+ *
+ * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
+ * where appropriate instead of manually writing functions that pass through one of the values.
+ */
+ def concatLazyMat[M, M2](
+ that: Graph[SourceShape[Out], M],
+ matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] =
+ new Source(delegate.concatLazyMat(that)(combinerToScala(matF)))
+
/**
* Prepend the given [[Source]] to this one, meaning that once the given source
* is exhausted and all result elements have been generated, the current source's
* elements will be produced.
*
- * Note that the current [[Source]] is materialized together with this Flow and just kept
- * from producing elements by asserting back-pressure until its time comes.
+ * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning
+ * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
+ * (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
- * If the given [[Source]] gets upstream error - no elements from this [[Source]] will be pulled.
+ * This flow will then be kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * When needing a prepend operator that is not detached use [[#prependLazy]]
*
* '''Emits when''' element is available from current source or from the given [[Source]] when current is completed
*
@@ -1213,15 +1270,38 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def prepend[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] =
new Source(delegate.prepend(that))
+ /**
+ * Prepend the given [[Source]] to this [[Flow]], meaning that before elements
+ * are generated from this Flow, the Source's elements will be produced until it
+ * is exhausted, at which point Flow elements will start being produced.
+ *
+ * Note that the [[Source]] is materialized together with this Flow and will then be kept from producing elements
+ * by asserting back-pressure until its time comes.
+ *
+ * When needing a prepend operator that is also detached use [[#prepend]]
+ *
+ * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
+ *
+ * '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' this [[Flow]] completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def prependLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] =
+ new Source(delegate.prependLazy(that))
+
/**
* Prepend the given [[Source]] to this one, meaning that once the given source
* is exhausted and all result elements have been generated, the current source's
* elements will be produced.
*
- * Note that the current [[Source]] is materialized together with this Flow and just kept
+ * Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes.
*
- * If the given [[Source]] gets upstream error - no elements from this [[Source]] will be pulled.
+ * When needing a prepend operator that is not detached use [[#prependLazyMat]]
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
@@ -1233,6 +1313,27 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] =
new Source(delegate.prependMat(that)(combinerToScala(matF)))
+ /**
+ * Prepend the given [[Source]] to this [[Flow]], meaning that before elements
+ * are generated from this Flow, the Source's elements will be produced until it
+ * is exhausted, at which point Flow elements will start being produced.
+ *
+ * Note that the [[Source]] is materialized together with this Flow.
+ *
+ * This flow will then be kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * When needing a prepend operator that is detached use [[#prependMat]]
+ *
+ * @see [[#prependLazy]].
+ *
+ * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
+ * where appropriate instead of manually writing functions that pass through one of the values.
+ */
+ def prependLazyMat[M, M2](
+ that: Graph[SourceShape[Out], M],
+ matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] =
+ new Source(delegate.prependLazyMat(that)(combinerToScala(matF)))
+
/**
* Provides a secondary source that will be consumed if this source completes without any
* elements passing by. As soon as the first element comes through this stream, the alternative
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
index 8073ae48f9..3ccf927d90 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
@@ -1494,10 +1494,13 @@ class SubFlow[In, Out, Mat](
* Flow’s input is exhausted and all result elements have been generated,
* the Source’s elements will be produced.
*
- * Note that the [[Source]] is materialized together with this Flow and just kept
- * from producing elements by asserting back-pressure until its time comes.
+ * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will
+ * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
+ * (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
- * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * When needing a concat operator that is not detached use [[#concatLazy]]
*
* '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
*
@@ -1510,13 +1513,65 @@ class SubFlow[In, Out, Mat](
def concat[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.concat(that))
+ /**
+ * Concatenate the given [[Source]] to this [[Flow]], meaning that once this
+ * Flow’s input is exhausted and all result elements have been generated,
+ * the Source’s elements will be produced.
+ *
+ * Note that the [[Source]] is materialized together with this Flow. If `lazy` materialization is what is needed
+ * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
+ * time when this source completes.
+ *
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * For a concat operator that is detached, use [[#concat]]
+ *
+ * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
+ *
+ * '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' given [[Source]] completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def concatLazy[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] =
+ new SubFlow(delegate.concatLazy(that))
+
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
- * Note that this Flow will be materialized together with the [[Source]] and just kept
- * from producing elements by asserting back-pressure until its time comes.
+ * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning
+ * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
+ * (so it can not be combined with `Source.lazy` to defer materialization of `that`).
+ *
+ * This flow will then be kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * When needing a prepend operator that is not detached use [[#prependLazy]]
+ *
+ * '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' this [[Flow]] completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def prepend[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] =
+ new SubFlow(delegate.prepend(that))
+
+ /**
+ * Prepend the given [[Source]] to this [[Flow]], meaning that before elements
+ * are generated from this Flow, the Source's elements will be produced until it
+ * is exhausted, at which point Flow elements will start being produced.
+ *
+ * Note that the [[Source]] is materialized together with this Flow and will then be kept from producing elements
+ * by asserting back-pressure until its time comes.
+ *
+ * When needing a prepend operator that is also detached use [[#prepend]]
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
*
@@ -1528,7 +1583,7 @@ class SubFlow[In, Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
- def prepend[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] =
+ def prependLazy[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.prepend(that))
/**
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
index 270a8944c7..c8743974cc 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
@@ -1470,10 +1470,13 @@ class SubSource[Out, Mat](
* Flow’s input is exhausted and all result elements have been generated,
* the Source’s elements will be produced.
*
- * Note that the [[Source]] is materialized together with this Flow and just kept
- * from producing elements by asserting back-pressure until its time comes.
+ * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will
+ * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
+ * (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
- * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * When needing a concat operator that is not detached use [[#concatLazyMat]]
*
* '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
*
@@ -1486,15 +1489,44 @@ class SubSource[Out, Mat](
def concat[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] =
new SubSource(delegate.concat(that))
+ /**
+ * Concatenate the given [[Source]] to this [[Flow]], meaning that once this
+ * Flow’s input is exhausted and all result elements have been generated,
+ * the Source’s elements will be produced.
+ *
+ * Note that the [[Source]] is materialized together with this Flow. If `lazy` materialization is what is needed
+ * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
+ * time when this source completes.
+ *
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * For a concat operator that is detached, use [[#concat]]
+ *
+ * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
+ *
+ * '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' given [[Source]] completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def concatLazy[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] =
+ new SubSource(delegate.concatLazy(that))
+
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
- * Note that this Flow will be materialized together with the [[Source]] and just kept
- * from producing elements by asserting back-pressure until its time comes.
+ * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning
+ * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
+ * (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
- * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
+ * This flow will then be kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * When needing a prepend operator that is not detached use [[#prependLazy]]
*
* '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
*
@@ -1507,6 +1539,29 @@ class SubSource[Out, Mat](
def prepend[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] =
new SubSource(delegate.prepend(that))
+ /**
+ * Prepend the given [[Source]] to this [[Flow]], meaning that before elements
+ * are generated from this Flow, the Source's elements will be produced until it
+ * is exhausted, at which point Flow elements will start being produced.
+ *
+ * Note that the [[Source]] is materialized together with this Flow and will then be kept from producing elements
+ * by asserting back-pressure until its time comes.
+ *
+ * When needing a prepend operator that is also detached use [[#prepend]]
+ *
+ * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
+ *
+ * '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' this [[Flow]] completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def prependLazy[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] =
+ new SubSource(delegate.prependLazy(that))
+
/**
* Provides a secondary source that will be consumed if this source completes without any
* elements passing by. As soon as the first element comes through this stream, the alternative
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
index 403bde7647..93c38029d8 100755
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
@@ -18,6 +18,7 @@ import akka.annotation.DoNotInherit
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import akka.stream.Attributes.SourceLocation
import akka.stream._
+import akka.stream.impl.SingleConcat
import akka.stream.impl.{
fusing,
LinearTraversalBuilder,
@@ -31,6 +32,7 @@ import akka.stream.impl.{
import akka.stream.impl.fusing._
import akka.stream.impl.fusing.FlattenMerge
import akka.stream.stage._
+import akka.util.OptionVal
import akka.util.{ ConstantFun, Timeout }
import akka.util.ccompat._
@@ -2991,8 +2993,13 @@ trait FlowOps[+Out, +Mat] {
* Flow’s input is exhausted and all result elements have been generated,
* the Source’s elements will be produced.
*
- * Note that the [[Source]] is materialized together with this Flow and just kept
- * from producing elements by asserting back-pressure until its time comes.
+ * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will
+ * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
+ * (so it can not be combined with `Source.lazy` to defer materialization of `that`).
+ *
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * When needing a concat operator that is not detached use [[#concatLazy]]
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
*
@@ -3005,23 +3012,97 @@ trait FlowOps[+Out, +Mat] {
* '''Cancels when''' downstream cancels
*/
def concat[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] =
- via(concatGraph(that))
+ internalConcat(that, detached = true)
protected def concatGraph[U >: Out, Mat2](
- that: Graph[SourceShape[U], Mat2]): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] =
+ that: Graph[SourceShape[U], Mat2],
+ detached: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] =
GraphDSL.create(that) { implicit b => r =>
- val merge = b.add(Concat[U]())
+ val merge = b.add(Concat[U](2, detached))
r ~> merge.in(1)
FlowShape(merge.in(0), merge.out)
}
+ /**
+ * Concatenate the given [[Source]] to this [[Flow]], meaning that once this
+ * Flow’s input is exhausted and all result elements have been generated,
+ * the Source’s elements will be produced.
+ *
+ * Note that the [[Source]] is materialized together with this Flow. If `lazy` materialization is what is needed
+ * the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
+ * time when this source completes.
+ *
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * For a concat operator that is detached, use [[#concat]]
+ *
+ * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
+ *
+ * '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' given [[Source]] completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def concatLazy[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] =
+ internalConcat(that, detached = false)
+
+ private def internalConcat[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2], detached: Boolean): Repr[U] =
+ that match {
+ case source if source eq Source.empty => this.asInstanceOf[Repr[U]]
+ case other =>
+ TraversalBuilder.getSingleSource(other) match {
+ case OptionVal.Some(singleSource) =>
+ via(new SingleConcat(singleSource.elem.asInstanceOf[U]))
+ case _ => via(concatGraph(other, detached))
+ }
+ }
+
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
- * Note that this Flow will be materialized together with the [[Source]] and just kept
- * from producing elements by asserting back-pressure until its time comes.
+ * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning
+ * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
+ * (so it can not be combined with `Source.lazy` to defer materialization of `that`).
+ *
+ * This flow will then be kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * When needing a prepend operator that is not detached use [[#prependLazy]]
+ *
+ *
+ * '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' this [[Flow]] completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def prepend[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] =
+ via(prependGraph(that, detached = true))
+
+ protected def prependGraph[U >: Out, Mat2](
+ that: Graph[SourceShape[U], Mat2],
+ detached: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] =
+ GraphDSL.create(that) { implicit b => r =>
+ val merge = b.add(Concat[U](2, detached))
+ r ~> merge.in(0)
+ FlowShape(merge.in(1), merge.out)
+ }
+
+ /**
+ * Prepend the given [[Source]] to this [[Flow]], meaning that before elements
+ * are generated from this Flow, the Source's elements will be produced until it
+ * is exhausted, at which point Flow elements will start being produced.
+ *
+ * Note that the [[Source]] is materialized together with this Flow and will then be kept from producing elements
+ * by asserting back-pressure until its time comes.
+ *
+ * When needing a prepend operator that is also detached use [[#prepend]]
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
*
@@ -3033,16 +3114,8 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels
*/
- def prepend[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] =
- via(prependGraph(that))
-
- protected def prependGraph[U >: Out, Mat2](
- that: Graph[SourceShape[U], Mat2]): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] =
- GraphDSL.create(that) { implicit b => r =>
- val merge = b.add(Concat[U]())
- r ~> merge.in(0)
- FlowShape(merge.in(1), merge.out)
- }
+ def prependLazy[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2]): Repr[U] =
+ via(prependGraph(that, detached = false))
/**
* Provides a secondary source that will be consumed if this stream completes without any
@@ -3456,10 +3529,13 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* Flow’s input is exhausted and all result elements have been generated,
* the Source’s elements will be produced.
*
- * Note that the [[Source]] is materialized together with this Flow and just kept
- * from producing elements by asserting back-pressure until its time comes.
+ * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will
+ * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
+ * (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
- * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * When needing a concat operator that is not detached use [[#concatLazyMat]]
*
* @see [[#concat]].
*
@@ -3467,7 +3543,28 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def concatMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3] =
- viaMat(concatGraph(that))(matF)
+ viaMat(concatGraph(that, detached = true))(matF)
+
+ /**
+ * Concatenate the given [[Source]] to this [[Flow]], meaning that once this
+ * Flow’s input is exhausted and all result elements have been generated,
+ * the Source’s elements will be produced.
+ *
+ * Note that the [[Source]] is materialized together with this Flow, if `lazy` materialization is what is needed
+ * the operator can be combined with `Source.lazy` to defer materialization of `that`.
+ *
+ * The second source is then kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * For a concat operator that is detached, use [[#concatMat]]
+ *
+ * @see [[#concatLazy]].
+ *
+ * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
+ * where appropriate instead of manually writing functions that pass through one of the values.
+ */
+ def concatLazyMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(
+ matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3] =
+ viaMat(concatGraph(that, detached = false))(matF)
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
@@ -3479,13 +3576,37 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
*
+ * When needing a concat operator that is not detached use [[#prependLazyMat]]
+ *
* @see [[#prepend]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def prependMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3] =
- viaMat(prependGraph(that))(matF)
+ viaMat(prependGraph(that, detached = true))(matF)
+
+ /**
+ * Prepend the given [[Source]] to this [[Flow]], meaning that before elements
+ * are generated from this Flow, the Source's elements will be produced until it
+ * is exhausted, at which point Flow elements will start being produced.
+ *
+ * Note that the [[Source]] is materialized together with this Flow and is "detached" meaning
+ * in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
+ * (so it can not be combined with `Source.lazy` to defer materialization of `that`).
+ *
+ * This flow will then be kept from producing elements by asserting back-pressure until its time comes.
+ *
+ * When needing a prepend operator that is not detached use [[#prependLazyMat]]
+ *
+ * @see [[#prependLazy]].
+ *
+ * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
+ * where appropriate instead of manually writing functions that pass through one of the values.
+ */
+ def prependLazyMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(
+ matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3] =
+ viaMat(prependGraph(that, detached = true))(matF)
/**
* Provides a secondary source that will be consumed if this stream completes without any
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala
index e0ced0761b..fa1b69b9bb 100755
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala
@@ -1244,11 +1244,34 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] => O)(n: Int) extends GraphStage[U
object Concat {
+ // two streams is so common that we can re-use a single instance to avoid some allocations
+ private val _concatTwo = new Concat[Any](2)
+ private def concatTwo[T]: GraphStage[UniformFanInShape[T, T]] =
+ _concatTwo.asInstanceOf[GraphStage[UniformFanInShape[T, T]]]
+
/**
- * Create a new `Concat`.
+ * Create a new `Concat`. Note that this for historical reasons creates a "detached" Concat which
+ * will eagerly pull each input on materialization and act as a one element buffer for each input.
*/
def apply[T](inputPorts: Int = 2): Graph[UniformFanInShape[T, T], NotUsed] =
- GraphStages.withDetachedInputs(new Concat[T](inputPorts))
+ apply(inputPorts, detachedInputs = true)
+
+ /**
+ * Create a new `Concat` operator that will concatenate two or more streams.
+ * @param inputPorts The number of fan-in input ports
+ * @param detachedInputs If the ports should be detached (eagerly pull both inputs) useful to avoid deadlocks in graphs with loops
+ * @return
+ */
+ def apply[T](inputPorts: Int, detachedInputs: Boolean): Graph[UniformFanInShape[T, T], NotUsed] = {
+ val concat = {
+ if (inputPorts == 2) concatTwo[T]
+ else new Concat[T](inputPorts)
+ }
+
+ if (detachedInputs) GraphStages.withDetachedInputs(concat)
+ else concat
+ }
+
}
/**