+str add zipWithIndex to FlowOps #21290

This commit is contained in:
Nafer Sanabria 2016-09-21 01:41:56 -05:00 committed by Johan Andrén
parent c1a840b2e9
commit 94d7237d17
11 changed files with 151 additions and 8 deletions

View file

@ -1655,6 +1655,29 @@ trait FlowOps[+Out, +Mat] {
FlowShape(zip.in0, zip.out)
}
/**
* Combine the elements of current flow into a stream of tuples consisting
* of all elements paired with their index. Indices start at 0.
*
* '''Emits when''' upstream emits an element and is paired with their index
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zipWithIndex: Repr[(Out, Long)] = {
statefulMapConcat[(Out, Long)] { ()
var index: Long = 0L
elem {
val zipped = (elem, index)
index += 1
immutable.Iterable[(Out, Long)](zipped)
}
}
}
/**
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that`