+str #19041 deterministic interleave operation
This commit is contained in:
parent
fe3c34ed00
commit
b9faf9d628
4 changed files with 74 additions and 17 deletions
|
|
@ -39,7 +39,7 @@ class DslConsistencySpec extends WordSpec with Matchers {
|
||||||
Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++
|
Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++
|
||||||
Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat")
|
Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat")
|
||||||
|
|
||||||
val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "concatGraph", "alsoToGraph")
|
val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "interleaveGraph", "concatGraph", "alsoToGraph")
|
||||||
val allowMissing: Map[Class[_], Set[String]] = Map(
|
val allowMissing: Map[Class[_], Set[String]] = Map(
|
||||||
jFlowClass -> graphHelpers,
|
jFlowClass -> graphHelpers,
|
||||||
jSourceClass -> graphHelpers,
|
jSourceClass -> graphHelpers,
|
||||||
|
|
|
||||||
|
|
@ -698,6 +698,32 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
|
||||||
def merge[T >: Out](that: Graph[SourceShape[T], _]): SubFlow[In, T, Mat] =
|
def merge[T >: Out](that: Graph[SourceShape[T], _]): SubFlow[In, T, Mat] =
|
||||||
new SubFlow(delegate.merge(that))
|
new SubFlow(delegate.merge(that))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
|
||||||
|
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that`
|
||||||
|
* source, then repeat process.
|
||||||
|
*
|
||||||
|
* Example:
|
||||||
|
* {{{
|
||||||
|
* Source(List(1, 2, 3)).interleave(List(4, 5, 6, 7), 2) // 1, 2, 4, 5, 3, 6, 7
|
||||||
|
* }}}
|
||||||
|
*
|
||||||
|
* After one of upstreams is complete than all the rest elements will be emitted from the second one
|
||||||
|
*
|
||||||
|
* If it gets error from one of upstreams - stream completes with failure.
|
||||||
|
*
|
||||||
|
* '''Emits when''' element is available from the currently consumed upstream
|
||||||
|
*
|
||||||
|
* '''Backpressures when''' downstream backpressures. Signal to current
|
||||||
|
* upstream, switch to next upstream when received `segmentSize` elements
|
||||||
|
*
|
||||||
|
* '''Completes when''' the [[Flow]] and given [[Source]] completes
|
||||||
|
*
|
||||||
|
* '''Cancels when''' downstream cancels
|
||||||
|
*/
|
||||||
|
def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): SubFlow[In, T, Mat] =
|
||||||
|
new SubFlow(delegate.interleave(that, segmentSize))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples.
|
* Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples.
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -696,6 +696,33 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
|
||||||
def merge[T >: Out](that: Graph[SourceShape[T], _]): SubSource[T, Mat] =
|
def merge[T >: Out](that: Graph[SourceShape[T], _]): SubSource[T, Mat] =
|
||||||
new SubSource(delegate.merge(that))
|
new SubSource(delegate.merge(that))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]].
|
||||||
|
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
|
||||||
|
* then repeat process.
|
||||||
|
*
|
||||||
|
* Example:
|
||||||
|
* {{{
|
||||||
|
* Source.from(Arrays.asList(1, 2, 3)).interleave(Source.from(Arrays.asList(4, 5, 6, 7), 2)
|
||||||
|
* // 1, 2, 4, 5, 3, 6, 7
|
||||||
|
* }}}
|
||||||
|
*
|
||||||
|
* After one of sources is complete than all the rest elements will be emitted from the second one
|
||||||
|
*
|
||||||
|
* If one of sources gets upstream error - stream completes with failure.
|
||||||
|
*
|
||||||
|
* '''Emits when''' element is available from the currently consumed upstream
|
||||||
|
*
|
||||||
|
* '''Backpressures when''' downstream backpressures. Signal to current
|
||||||
|
* upstream, switch to next upstream when received `segmentSize` elements
|
||||||
|
*
|
||||||
|
* '''Completes when''' this [[Source]] and given one completes
|
||||||
|
*
|
||||||
|
* '''Cancels when''' downstream cancels
|
||||||
|
*/
|
||||||
|
def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): SubSource[T, Mat] =
|
||||||
|
new SubSource(delegate.interleave(that, segmentSize))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples.
|
* Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples.
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -1323,28 +1323,18 @@ trait FlowOps[+Out, +Mat] {
|
||||||
*
|
*
|
||||||
* '''Cancels when''' downstream cancels
|
* '''Cancels when''' downstream cancels
|
||||||
*/
|
*/
|
||||||
def interleave[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2], segmentSize: Int): Repr[U, Mat] =
|
def interleave[U >: Out](that: Graph[SourceShape[U], _], segmentSize: Int): Repr[U] =
|
||||||
interleaveMat(that, segmentSize)(Keep.left)
|
via(interleaveGraph(that, segmentSize))
|
||||||
|
|
||||||
/**
|
protected def interleaveGraph[U >: Out, M](that: Graph[SourceShape[U], M],
|
||||||
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
|
segmentSize: Int): Graph[FlowShape[Out @uncheckedVariance, U], M] =
|
||||||
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
|
GraphDSL.create(that) { implicit b ⇒
|
||||||
* then repeat process.
|
|
||||||
*
|
|
||||||
* After one of upstreams is complete than all the rest elements will be emitted from the second one
|
|
||||||
*
|
|
||||||
* If it gets error from one of upstreams - stream completes with failure.
|
|
||||||
*
|
|
||||||
* @see [[#interleave]].
|
|
||||||
*/
|
|
||||||
def interleaveMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], segmentSize: Int)(matF: (Mat, Mat2) ⇒ Mat3): Repr[U, Mat3] =
|
|
||||||
this.viaMat(GraphDSL.create(that) { implicit b ⇒
|
|
||||||
r ⇒
|
r ⇒
|
||||||
import GraphDSL.Implicits._
|
import GraphDSL.Implicits._
|
||||||
val interleave = b.add(Interleave[U](2, segmentSize))
|
val interleave = b.add(Interleave[U](2, segmentSize))
|
||||||
r ~> interleave.in(1)
|
r ~> interleave.in(1)
|
||||||
FlowShape(interleave.in(0), interleave.out)
|
FlowShape(interleave.in(0), interleave.out)
|
||||||
})(matF)
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
|
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
|
||||||
|
|
@ -1532,6 +1522,20 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
|
||||||
def mergeMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] =
|
def mergeMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] =
|
||||||
viaMat(mergeGraph(that))(matF)
|
viaMat(mergeGraph(that))(matF)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
|
||||||
|
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
|
||||||
|
* then repeat process.
|
||||||
|
*
|
||||||
|
* After one of upstreams is complete than all the rest elements will be emitted from the second one
|
||||||
|
*
|
||||||
|
* If it gets error from one of upstreams - stream completes with failure.
|
||||||
|
*
|
||||||
|
* @see [[#interleave]].
|
||||||
|
*/
|
||||||
|
def interleaveMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], request: Int)(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] =
|
||||||
|
viaMat(interleaveGraph(that, request))(matF)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
|
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
|
||||||
* Flow’s input is exhausted and all result elements have been generated,
|
* Flow’s input is exhausted and all result elements have been generated,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue