Merge pull request #19140 from agolubev/agolubev-#19041-determenistic-interleave

+str #19041 deterministic `interleave` operation
This commit is contained in:
Roland Kuhn 2015-12-14 07:26:54 +01:00
commit fda9c5d1b8
9 changed files with 425 additions and 5 deletions

View file

@ -741,6 +741,33 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
def merge[T >: Out](that: Graph[SourceShape[T], _]): SubSource[T, Mat] =
new SubSource(delegate.merge(that))
/**
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]].
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
* then repeat process.
*
* Example:
* {{{
* Source.from(Arrays.asList(1, 2, 3)).interleave(Source.from(Arrays.asList(4, 5, 6, 7), 2)
* // 1, 2, 4, 5, 3, 6, 7
* }}}
*
* After one of sources is complete than all the rest elements will be emitted from the second one
*
* If one of sources gets upstream error - stream completes with failure.
*
* '''Emits when''' element is available from the currently consumed upstream
*
* '''Backpressures when''' downstream backpressures. Signal to current
* upstream, switch to next upstream when received `segmentSize` elements
*
* '''Completes when''' this [[Source]] and given one completes
*
* '''Cancels when''' downstream cancels
*/
def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): SubSource[T, Mat] =
new SubSource(delegate.interleave(that, segmentSize))
/**
* Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples.
*