diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/interleaveAll.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/interleaveAll.md
new file mode 100644
index 0000000000..a96c8879d4
--- /dev/null
+++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/interleaveAll.md
@@ -0,0 +1,36 @@
+# interleaveAll
+
+Emits a specifiable number of elements from the original source, then from the provided sources and repeats.
+
+@ref[Fan-in operators](../index.md#fan-in-operators)
+
+## Signature
+
+@apidoc[Source.interleaveAll](Source) { scala="#interleaveAll[U>:Out](that:List[akka.stream.Graph[akka.stream.SourceShape[U],_]],segmentSize:Int,eagerClose:Boolean):FlowOps.this.Repr[U]" java="#interleaveAll(java.util.List[akka.stream.Graph],int,boolean)" }
+@apidoc[Flow.interleaveAll](Flow) { scala="#interleaveAll[U>:Out](that:List[akka.stream.Graph[akka.stream.SourceShape[U],_]],segmentSize:Int,eagerClose:Boolean):FlowOps.this.Repr[U]" java="#interleaveAll(java.util.List[akka.stream.Graph],int,boolean)" }
+
+
+## Description
+
+Emits a specifiable number of elements from the original source, then from the provided sources and repeats.
+If one source completes the rest of the other stream will be emitted when `eagerClose` is false, otherwise
+the flow is complete.
+
+## Example
+Scala
+: @@snip [FlowInterleaveSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveAllSpec.scala) { #interleaveAll }
+
+Java
+: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #interleaveAll }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when element is available from the currently consumed upstream
+
+**backpressures** when upstream backpressures
+
+**completes** when all upstreams have completed if `eagerClose` is false, or any upstream completes if `eagerClose` is true.
+
+@@@
diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md
index 5deca6aa36..d8d893e138 100644
--- a/akka-docs/src/main/paradox/stream/operators/index.md
+++ b/akka-docs/src/main/paradox/stream/operators/index.md
@@ -270,6 +270,7 @@ the inputs in different ways.
|Source/Flow|@ref[concat](Source-or-Flow/concat.md)|After completion of the original upstream the elements of the given source will be emitted.|
|Source/Flow|@ref[concatLazy](Source-or-Flow/concatLazy.md)|After completion of the original upstream the elements of the given source will be emitted.|
|Source/Flow|@ref[interleave](Source-or-Flow/interleave.md)|Emits a specifiable number of elements from the original source, then from the provided source and repeats.|
+|Source/Flow|@ref[interleaveAll](Source-or-Flow/interleaveAll.md)|Emits a specifiable number of elements from the original source, then from the provided sources and repeats.|
|Source/Flow|@ref[merge](Source-or-Flow/merge.md)|Merge multiple sources.|
|Source/Flow|@ref[mergeLatest](Source-or-Flow/mergeLatest.md)|Merge multiple sources.|
|Source/Flow|@ref[mergePreferred](Source-or-Flow/mergePreferred.md)|Merge multiple sources.|
@@ -477,6 +478,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [initialDelay](Source-or-Flow/initialDelay.md)
* [initialTimeout](Source-or-Flow/initialTimeout.md)
* [interleave](Source-or-Flow/interleave.md)
+* [interleaveAll](Source-or-Flow/interleaveAll.md)
* [intersperse](Source-or-Flow/intersperse.md)
* [javaCollector](StreamConverters/javaCollector.md)
* [javaCollectorParallelUnordered](StreamConverters/javaCollectorParallelUnordered.md)
diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
index c7624530e8..8a4fde1e74 100644
--- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
+++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
@@ -23,6 +23,7 @@ import akka.japi.function.Function2;
// #concat
// #concatLazy
// #interleave
+// #interleaveAll
// #merge
// #merge-sorted
import akka.stream.javadsl.Keep;
@@ -34,6 +35,7 @@ import java.util.*;
// #merge-sorted
// #merge
// #interleave
+// #interleaveAll
// #concat
// #concatLazy
// #prepend
@@ -165,7 +167,19 @@ class SourceOrFlow {
// #interleave
}
-
+
+ void interleaveAllExample() {
+ // #interleaveAll
+ Source sourceA = Source.from(Arrays.asList(1, 2, 7, 8));
+ Source sourceB = Source.from(Arrays.asList(3, 4, 9));
+ Source sourceC = Source.from(Arrays.asList(5, 6));
+ sourceA.interleaveAll(Arrays.asList(sourceB, sourceC), 2, false)
+ .fold(new StringJoiner(","),(joiner, input) -> joiner.add(String.valueOf(input)))
+ .runForeach(System.out::println, system);
+ //prints 1,2,3,4,5,6,7,8,9
+ // #interleaveAll
+ }
+
void mergeExample() {
// #merge
Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4));
diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java
index 727feaf6ea..9b34cfe792 100644
--- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java
+++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java
@@ -77,15 +77,14 @@ public class SinkTest extends StreamTest {
CompletionStage> result = Source.from(list).runWith(collectorSink, system);
assertEquals(list, result.toCompletableFuture().get(1, TimeUnit.SECONDS));
}
-
+
@Test
public void mustBeAbleToUseCollectorOnSink() throws Exception {
- //#collect-to-list
+ // #collect-to-list
final List list = Arrays.asList(1, 2, 3);
CompletionStage> result =
- Source.from(list)
- .runWith(Sink.collect(Collectors.toList()), system);
- //#collect-to-list
+ Source.from(list).runWith(Sink.collect(Collectors.toList()), system);
+ // #collect-to-list
assertEquals(list, result.toCompletableFuture().get(1, TimeUnit.SECONDS));
}
diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java
index f6526aed71..2a20fed8b5 100644
--- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java
+++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java
@@ -21,6 +21,8 @@ import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import akka.stream.testkit.TestPublisher;
+import akka.stream.testkit.TestSubscriber;
+import akka.stream.testkit.javadsl.TestSink;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.AkkaSpec;
import akka.testkit.javadsl.TestKit;
@@ -777,6 +779,19 @@ public class SourceTest extends StreamTest {
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
+ @Test
+ public void mustBeAbleToUseInterleaveAll() {
+ Source sourceA = Source.from(Arrays.asList(1, 2, 7, 8));
+ Source sourceB = Source.from(Arrays.asList(3, 4, 9));
+ Source sourceC = Source.from(Arrays.asList(5, 6));
+ final TestSubscriber.Probe sub =
+ sourceA
+ .interleaveAll(Arrays.asList(sourceB, sourceC), 2, false)
+ .runWith(TestSink.probe(system), system);
+ sub.expectSubscription().request(9);
+ sub.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9).expectComplete();
+ }
+
@Test
public void mustBeAbleToUseDropWhile() throws Exception {
final TestKit probe = new TestKit(system);
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveAllSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveAllSpec.scala
new file mode 100644
index 0000000000..63057a625d
--- /dev/null
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveAllSpec.scala
@@ -0,0 +1,117 @@
+/*
+ * Copyright (C) 2015-2022 Lightbend Inc.
+ */
+
+package akka.stream.scaladsl
+
+import akka.stream.testkit._
+import akka.stream.testkit.scaladsl.StreamTestKit._
+import akka.stream.testkit.scaladsl.TestSink
+
+import java.util.StringJoiner
+
+class FlowInterleaveAllSpec extends StreamSpec("""
+ akka.stream.materializer.initial-input-buffer-size = 2
+ akka.stream.materializer.max-input-buffer-size = 2
+ """) {
+
+ "An InterleaveAll for Flow " must {
+
+ "work in the happy case" in assertAllStagesStopped {
+ val sub = Source(List(1, 2, 7))
+ .interleaveAll(List(Source(List(3, 4, 8)), Source(List(5, 6, 9, 10))), 2, eagerClose = false)
+ .runWith(TestSink.probe[Int])
+ sub.expectSubscription().request(10)
+ sub.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).expectComplete()
+ }
+
+ "work with none other sources" in {
+ val sub = Source(List(1, 2, 3)).interleaveAll(Nil, 2, eagerClose = false).runWith(TestSink.probe[Int])
+ sub.expectSubscription().request(3)
+ sub.expectNext(1, 2, 3).expectComplete()
+ }
+
+ "work with empty other source" in {
+ val sub =
+ Source(List(1, 2, 3)).interleaveAll(List(Source.empty), 2, eagerClose = false).runWith(TestSink.probe[Int])
+ sub.expectSubscription().request(3)
+ sub.expectNext(1, 2, 3).expectComplete()
+ }
+
+ "eagerClose = true, first stream closed" in assertAllStagesStopped {
+ val sub = Source(List(1, 2, 7))
+ .interleaveAll(List(Source(List(3, 4, 8)), Source(List(5, 6, 9, 10))), 2, eagerClose = true)
+ .runWith(TestSink.probe[Int])
+ sub.expectSubscription().request(10)
+ sub.expectNext(1, 2, 3, 4, 5, 6, 7).expectComplete()
+ }
+
+ "eagerClose = true, other stream closed" in assertAllStagesStopped {
+ val probe = TestSubscriber.manualProbe[Int]()
+
+ val pub1 = TestPublisher.probe[Int]()
+ val pub2 = TestPublisher.probe[Int]()
+ val pub3 = TestPublisher.probe[Int]()
+
+ Source
+ .fromPublisher(pub1)
+ .interleaveAll(List(Source.fromPublisher(pub2), Source.fromPublisher(pub3)), 2, eagerClose = true)
+ .runWith(Sink.fromSubscriber(probe))
+
+ probe.expectSubscription().request(10)
+
+ // just to make it extra clear that it eagerly pulls all inputs
+ pub1.expectRequest()
+ pub2.expectRequest()
+ pub3.expectRequest()
+
+ pub1.sendNext(0)
+ pub2.sendNext(10)
+ pub3.sendNext(20)
+
+ pub1.expectRequest()
+ pub1.sendNext(1)
+
+ pub2.expectRequest()
+ pub2.sendNext(11)
+ pub2.sendComplete()
+
+ probe.expectNext(0, 1, 10, 11, 20)
+ probe.expectComplete()
+
+ pub1.expectCancellation()
+ pub3.expectCancellation()
+ }
+
+ "pass along early cancellation" in assertAllStagesStopped {
+ val pub1 = TestPublisher.manualProbe[Int]()
+ val pub2 = TestPublisher.manualProbe[Int]()
+ val pub3 = TestPublisher.manualProbe[Int]()
+ val sub1 = TestSubscriber.manualProbe[Int]()
+
+ Source
+ .fromPublisher(pub1)
+ .interleaveAll(List(Source.fromPublisher(pub2), Source.fromPublisher(pub3)), 2, eagerClose = false)
+ .runWith(Sink.fromSubscriber(sub1))
+ sub1.expectSubscription().cancel()
+ pub1.expectSubscription().expectCancellation()
+ pub2.expectSubscription().expectCancellation()
+ pub3.expectSubscription().expectCancellation()
+ }
+
+ "work in example" in {
+ //#interleaveAll
+ val sourceA = Source(List(1, 2, 7, 8))
+ val sourceB = Source(List(3, 4, 9))
+ val sourceC = Source(List(5, 6))
+
+ sourceA
+ .interleaveAll(List(sourceB, sourceC), 2, eagerClose = false)
+ .fold(new StringJoiner(","))((joiner, input) => joiner.add(String.valueOf(input)))
+ .runWith(Sink.foreach(println))
+ //prints 1,2,3,4,5,6,7,8,9
+ //#interleaveAll
+ }
+ }
+
+}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index 67c9c5f708..183707ad44 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -9,16 +9,13 @@ import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.BiFunction
import java.util.function.Supplier
-
import scala.annotation.{ nowarn, varargs }
import scala.annotation.unchecked.uncheckedVariance
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters.RichOptionalGeneric
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
-
import org.reactivestreams.Processor
-
import akka.Done
import akka.NotUsed
import akka.actor.ActorRef
@@ -36,6 +33,8 @@ import akka.util.JavaDurationConverters._
import akka.util.Timeout
import akka.util.unused
+import scala.collection.immutable
+
object Flow {
/** Create a `Flow` which can process elements of type `T`. */
@@ -2860,6 +2859,37 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] =
new Flow(delegate.interleaveMat(that, segmentSize, eagerClose)(combinerToScala(matF)))
+ /**
+ * Interleave is a deterministic merge of the given [[Source]]s with elements of this [[Flow]].
+ * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
+ * then repeat process.
+ *
+ * If eagerClose is false and one of the upstreams complete the elements from the other upstream will continue passing
+ * through the interleave operator. If eagerClose is true and one of the upstream complete interleave will cancel the
+ * other upstream and complete itself.
+ *
+ * If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
+ *
+ * '''Emits when''' element is available from the currently consumed upstream
+ *
+ * '''Backpressures when''' downstream backpressures. Signal to current
+ * upstream, switch to next upstream when received `segmentSize` elements
+ *
+ * '''Completes when''' the [[Flow]] and given [[Source]] completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def interleaveAll(
+ those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+ segmentSize: Int,
+ eagerClose: Boolean): javadsl.Flow[In, Out, Mat] = {
+ val seq = if (those != null) Util.immutableSeq(those).collect {
+ case source: Source[Out @unchecked, _] => source.asScala
+ case other => other
+ } else immutable.Seq()
+ new Flow(delegate.interleaveAll(seq, segmentSize, eagerClose))
+ }
+
/**
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready.
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
index c5f6a25c0c..973d29ec72 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
@@ -1609,6 +1609,37 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] =
new Source(delegate.interleaveMat(that, segmentSize, eagerClose)(combinerToScala(matF)))
+ /**
+ * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
+ * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
+ * then repeat process.
+ *
+ * If eagerClose is false and one of the upstreams complete the elements from the other upstream will continue passing
+ * through the interleave operator. If eagerClose is true and one of the upstream complete interleave will cancel the
+ * other upstream and complete itself.
+ *
+ * If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
+ *
+ * '''Emits when''' element is available from the currently consumed upstream
+ *
+ * '''Backpressures when''' downstream backpressures. Signal to current
+ * upstream, switch to next upstream when received `segmentSize` elements
+ *
+ * '''Completes when''' the [[Flow]] and given [[Source]] completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def interleaveAll(
+ those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+ segmentSize: Int,
+ eagerClose: Boolean): javadsl.Source[Out, Mat] = {
+ val seq = if (those != null) Util.immutableSeq(those).collect {
+ case source: Source[Out @unchecked, _] => source.asScala
+ case other => other
+ } else immutable.Seq()
+ new Source(delegate.interleaveAll(seq, segmentSize, eagerClose))
+ }
+
/**
* Merge the given [[Source]] to the current one, taking elements as they arrive from input streams,
* picking randomly when several elements ready.
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
index 3fed0a9439..a13d8b960f 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
@@ -7,14 +7,12 @@ package akka.stream.javadsl
import java.util.{ Comparator, Optional }
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
-
import scala.annotation.{ nowarn, varargs }
import scala.annotation.unchecked.uncheckedVariance
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters.RichOptionalGeneric
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
-
import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
@@ -24,6 +22,8 @@ import akka.util.ConstantFun
import akka.util.JavaDurationConverters._
import akka.util.ccompat.JavaConverters._
+import scala.collection.immutable
+
object SubFlow {
/**
@@ -1762,6 +1762,37 @@ class SubFlow[In, Out, Mat](
def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int): SubFlow[In, Out, Mat] =
new SubFlow(delegate.interleave(that, segmentSize))
+ /**
+ * Interleave is a deterministic merge of the given [[Source]]s with elements of this [[Flow]].
+ * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that`
+ * source, then repeat process.
+ *
+ * If eagerClose is false and one of the upstreams complete the elements from the other upstream will continue passing
+ * through the interleave operator. If eagerClose is true and one of the upstream complete interleave will cancel the
+ * other upstream and complete itself.
+ *
+ * If it gets error from one of upstreams - stream completes with failure.
+ *
+ * '''Emits when''' element is available from the currently consumed upstream
+ *
+ * '''Backpressures when''' downstream backpressures. Signal to current
+ * upstream, switch to next upstream when received `segmentSize` elements
+ *
+ * '''Completes when''' the [[Flow]] and given [[Source]] completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def interleaveAll(
+ those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+ segmentSize: Int,
+ eagerClose: Boolean): SubFlow[In, Out, Mat] = {
+ val seq = if (those != null) Util.immutableSeq(those).collect {
+ case source: Source[Out @unchecked, _] => source.asScala
+ case other => other
+ } else immutable.Seq()
+ new SubFlow(delegate.interleaveAll(seq, segmentSize, eagerClose))
+ }
+
/**
* MergeLatest joins elements from N input streams into stream of lists of size N.
* i-th element in list is the latest emitted element from i-th input stream.
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
index d5387f6f55..5c2b100608 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
@@ -7,14 +7,12 @@ package akka.stream.javadsl
import java.util.{ Comparator, Optional }
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
-
import scala.annotation.{ nowarn, varargs }
import scala.annotation.unchecked.uncheckedVariance
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters.RichOptionalGeneric
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
-
import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
@@ -24,6 +22,8 @@ import akka.util.ConstantFun
import akka.util.JavaDurationConverters._
import akka.util.ccompat.JavaConverters._
+import scala.collection.immutable
+
/**
* * Upcast a stream of elements to a stream of supertypes of that element. Useful in combination with
* fan-in operators where you do not want to pay the cost of casting each element in a `map`.
@@ -1739,6 +1739,37 @@ class SubSource[Out, Mat](
def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int): SubSource[Out, Mat] =
new SubSource(delegate.interleave(that, segmentSize))
+ /**
+ * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
+ * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
+ * then repeat process.
+ *
+ * If eagerClose is false and one of the upstreams complete the elements from the other upstream will continue passing
+ * through the interleave operator. If eagerClose is true and one of the upstream complete interleave will cancel the
+ * other upstream and complete itself.
+ *
+ * If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
+ *
+ * '''Emits when''' element is available from the currently consumed upstream
+ *
+ * '''Backpressures when''' downstream backpressures. Signal to current
+ * upstream, switch to next upstream when received `segmentSize` elements
+ *
+ * '''Completes when''' the [[Flow]] and given [[Source]] completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def interleaveAll(
+ those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+ segmentSize: Int,
+ eagerClose: Boolean): SubSource[Out, Mat] = {
+ val seq = if (those != null) Util.immutableSeq(those).collect {
+ case source: Source[Out @unchecked, _] => source.asScala
+ case other => other
+ } else immutable.Seq()
+ new SubSource(delegate.interleaveAll(seq, segmentSize, eagerClose))
+ }
+
/**
* MergeLatest joins elements from N input streams into stream of lists of size N.
* i-th element in list is the latest emitted element from i-th input stream.
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
index e72cf027af..61258c7aea 100755
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
@@ -2937,6 +2937,41 @@ trait FlowOps[+Out, +Mat] {
FlowShape(interleave.in(0), interleave.out)
}
+ /**
+ * Interleave is a deterministic merge of the given [[Source]]s with elements of this [[Flow]].
+ * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that`
+ * source, then repeat process.
+ *
+ * If eagerClose is false and one of the upstreams complete the elements from the other upstream will continue passing
+ * through the interleave operator. If eagerClose is true and one of the upstream complete interleave will cancel the
+ * other upstream and complete itself.
+ *
+ * If it gets error from one of upstreams - stream completes with failure.
+ *
+ * '''Emits when''' element is available from the currently consumed upstream
+ *
+ * '''Backpressures when''' downstream backpressures. Signal to current
+ * upstream, switch to next upstream when received `segmentSize` elements
+ *
+ * '''Completes when''' the [[Flow]] and given [[Source]] completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def interleaveAll[U >: Out](
+ those: immutable.Seq[Graph[SourceShape[U], _]],
+ segmentSize: Int,
+ eagerClose: Boolean): Repr[U] = those match {
+ case those if those.isEmpty => this.asInstanceOf[Repr[U]]
+ case _ =>
+ via(GraphDSL.create() { implicit b =>
+ import GraphDSL.Implicits._
+ val interleave = b.add(Interleave[U](those.size + 1, segmentSize, eagerClose))
+ for ((that, idx) <- those.zipWithIndex)
+ that ~> interleave.in(idx + 1)
+ FlowShape(interleave.in(0), interleave.out)
+ })
+ }
+
/**
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready.
@@ -3091,7 +3126,7 @@ trait FlowOps[+Out, +Mat] {
that: Graph[SourceShape[U], Mat2],
detached: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] =
GraphDSL.createGraph(that) { implicit b => r =>
- val merge = b.add(Concat[U](2, detached))
+ val merge = b.add(akka.stream.scaladsl.Concat[U](2, detached))
r ~> merge.in(1)
FlowShape(merge.in(0), merge.out)
}