+str #19041 deterministic interleave operation
This commit is contained in:
parent
737e7b8dfc
commit
fe3c34ed00
5 changed files with 56 additions and 50 deletions
|
|
@ -22,11 +22,11 @@ class FlowMergeSpec extends BaseTwoStreamsSetup {
|
|||
"work in the happy case" in assertAllStagesStopped {
|
||||
// Different input sizes (4 and 6)
|
||||
val source1 = Source(0 to 3)
|
||||
val source2 = Source(4 to 9)
|
||||
val source3 = Source(List[Int]())
|
||||
val source2 = Source(List[Int]())
|
||||
val source3 = Source(4 to 9)
|
||||
val probe = TestSubscriber.manualProbe[Int]()
|
||||
|
||||
source1.merge(source3).merge(source2)
|
||||
source1.merge(source2).merge(source3)
|
||||
.map(_ * 2).map(_ / 2).map(_ + 1).runWith(Sink(probe))
|
||||
|
||||
val subscription = probe.expectSubscription()
|
||||
|
|
|
|||
|
|
@ -1005,9 +1005,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
new Flow(delegate.alsoToMat(that)(combinerToScala(matF)))
|
||||
|
||||
/**
|
||||
* Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]].
|
||||
* It takes `request` number of elements from this flow to emit downstream, then - same amount for given source,
|
||||
* then repeat process from the beginning.
|
||||
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
|
||||
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
|
||||
* then repeat process.
|
||||
*
|
||||
* Example:
|
||||
* {{{
|
||||
|
|
@ -1016,35 +1016,36 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
* src.via(flow) // 1, 2, 4, 5, 3, 6, 7
|
||||
* }}}
|
||||
*
|
||||
* After [[Flow]] or [[Source]] is complete than all the rest elements will be emitted from the second one
|
||||
* After one of upstreams is complete than all the rest elements will be emitted from the second one
|
||||
*
|
||||
* If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
|
||||
*
|
||||
* '''Emits when''' element is available from current stream or from the given [[Source]] depending on what was pulled
|
||||
* '''Emits when''' element is available from the currently consumed upstream
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
* '''Backpressures when''' downstream backpressures. Signal to current
|
||||
* upstream, switch to next upstream when received `segmentSize` elements
|
||||
*
|
||||
* '''Completes when''' the [[Flow]] and given [[Source]] completes
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def interleave[T >: Out](that: Graph[SourceShape[T], _], result: Int): javadsl.Flow[In, T, Mat] =
|
||||
new Flow(delegate.interleave(that, result))
|
||||
def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): javadsl.Flow[In, T, Mat] =
|
||||
new Flow(delegate.interleave(that, segmentSize))
|
||||
|
||||
/**
|
||||
* Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]].
|
||||
* It takes `request` number of elements from this flow to emit downstream, then - same amount for given source,
|
||||
* then repeat process from the beginning.
|
||||
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
|
||||
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
|
||||
* then repeat process.
|
||||
*
|
||||
* After [[Flow]] or [[Source]] is compete than all the rest elements will be emitted from the second one
|
||||
* After one of upstreams is compete than all the rest elements will be emitted from the second one
|
||||
*
|
||||
* If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
|
||||
*
|
||||
* @see [[#interleave]].
|
||||
*/
|
||||
def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], result: Int,
|
||||
def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int,
|
||||
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
|
||||
new Flow(delegate.interleaveMat(that, result)(combinerToScala(matF)))
|
||||
new Flow(delegate.interleaveMat(that, segmentSize)(combinerToScala(matF)))
|
||||
|
||||
/**
|
||||
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
|
||||
|
|
|
|||
|
|
@ -499,9 +499,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
|
|||
new Source(delegate.alsoToMat(that)(combinerToScala(matF)))
|
||||
|
||||
/**
|
||||
* Interleave represents deterministic merge of the given [[Source]] with elements of this [[Source]].
|
||||
* It takes `request` number of elements from this source to emit downstream, then - same amount for given one,
|
||||
* then repeat process from the beginning.
|
||||
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]].
|
||||
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
|
||||
* then repeat process.
|
||||
*
|
||||
* Example:
|
||||
* {{{
|
||||
|
|
@ -513,21 +513,22 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
|
|||
*
|
||||
* If one of sources gets upstream error - stream completes with failure.
|
||||
*
|
||||
* '''Emits when''' element is available from current stream or from the given [[Source]] depending on what was pulled
|
||||
* '''Emits when''' element is available from the currently consumed upstream
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
* '''Backpressures when''' downstream backpressures. Signal to current
|
||||
* upstream, switch to next upstream when received `segmentSize` elements
|
||||
*
|
||||
* '''Completes when''' this [[Source]] and given one completes
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def interleave[T >: Out](that: Graph[SourceShape[T], _], result: Int): javadsl.Source[T, Mat] =
|
||||
new Source(delegate.interleave(that, result))
|
||||
def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): javadsl.Source[T, Mat] =
|
||||
new Source(delegate.interleave(that, segmentSize))
|
||||
|
||||
/**
|
||||
* Interleave represents deterministic merge of the given [[Source]] with elements of this [[Source]].
|
||||
* It takes `request` number of elements from this source to emit downstream, then - same amount for given source,
|
||||
* then repeat process from the beginning.
|
||||
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]].
|
||||
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
|
||||
* then repeat process.
|
||||
*
|
||||
* After one of sources is complete than all the rest elements will be emitted from the second one
|
||||
*
|
||||
|
|
@ -535,9 +536,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
|
|||
*
|
||||
* @see [[#interleave]].
|
||||
*/
|
||||
def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], result: Int,
|
||||
def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int,
|
||||
matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] =
|
||||
new Source(delegate.interleaveMat(that, result)(combinerToScala(matF)))
|
||||
new Source(delegate.interleaveMat(that, segmentSize)(combinerToScala(matF)))
|
||||
|
||||
/**
|
||||
* Merge the given [[Source]] to the current one, taking elements as they arrive from input streams,
|
||||
|
|
|
|||
|
|
@ -1301,46 +1301,47 @@ trait FlowOps[+Out, +Mat] {
|
|||
}
|
||||
|
||||
/**
|
||||
* Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]].
|
||||
* It takes `request` number of elements from this flow to emit downstream, then - same amount for given source,
|
||||
* then repeat process from the beginning.
|
||||
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
|
||||
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that`
|
||||
* source, then repeat process.
|
||||
*
|
||||
* Example:
|
||||
* {{{
|
||||
* Source(List(1, 2, 3)).interleave(List(4, 5, 6, 7), 2) // 1, 2, 4, 5, 3, 6, 7
|
||||
* }}}
|
||||
*
|
||||
* After [[Flow]] or [[Source]] is complete than all the rest elements will be emitted from the second one
|
||||
* After one of upstreams is complete than all the rest elements will be emitted from the second one
|
||||
*
|
||||
* If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
|
||||
* If it gets error from one of upstreams - stream completes with failure.
|
||||
*
|
||||
* '''Emits when''' element is available from current stream or from the given [[Source]] depending on what was pulled
|
||||
* '''Emits when''' element is available from the currently consumed upstream
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
* '''Backpressures when''' downstream backpressures. Signal to current
|
||||
* upstream, switch to next upstream when received `segmentSize` elements
|
||||
*
|
||||
* '''Completes when''' the [[Flow]] and given [[Source]] completes
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def interleave[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2], request: Int): Repr[U, Mat] =
|
||||
interleaveMat(that, request)(Keep.left)
|
||||
def interleave[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2], segmentSize: Int): Repr[U, Mat] =
|
||||
interleaveMat(that, segmentSize)(Keep.left)
|
||||
|
||||
/**
|
||||
* Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]].
|
||||
* It takes `request` number of elements from this flow to emit downstream, then - same amount for given source,
|
||||
* then repeat process from the beginning.
|
||||
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
|
||||
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
|
||||
* then repeat process.
|
||||
*
|
||||
* After [[Flow]] or [[Source]] is complete than all the rest elements will be emitted from the second one
|
||||
* After one of upstreams is complete than all the rest elements will be emitted from the second one
|
||||
*
|
||||
* If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
|
||||
* If it gets error from one of upstreams - stream completes with failure.
|
||||
*
|
||||
* @see [[#interleave]].
|
||||
*/
|
||||
def interleaveMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], request: Int)(matF: (Mat, Mat2) ⇒ Mat3): Repr[U, Mat3] =
|
||||
def interleaveMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], segmentSize: Int)(matF: (Mat, Mat2) ⇒ Mat3): Repr[U, Mat3] =
|
||||
this.viaMat(GraphDSL.create(that) { implicit b ⇒
|
||||
r ⇒
|
||||
import GraphDSL.Implicits._
|
||||
val interleave = b.add(Interleave[U](2, request))
|
||||
val interleave = b.add(Interleave[U](2, segmentSize))
|
||||
r ~> interleave.in(1)
|
||||
FlowShape(interleave.in(0), interleave.out)
|
||||
})(matF)
|
||||
|
|
|
|||
|
|
@ -233,11 +233,11 @@ object Interleave {
|
|||
* to take from each input.
|
||||
*
|
||||
* @param inputPorts number of input ports
|
||||
* @param request number of elements to send downstream before switching to next input port
|
||||
* @param segmentSize number of elements to send downstream before switching to next input port
|
||||
* @param eagerClose if true, interleave completes upstream if any of its upstream completes.
|
||||
*/
|
||||
def apply[T](inputPorts: Int, request: Int, eagerClose: Boolean = false): Interleave[T] =
|
||||
new Interleave(inputPorts, request, eagerClose)
|
||||
def apply[T](inputPorts: Int, segmentSize: Int, eagerClose: Boolean = false): Interleave[T] =
|
||||
new Interleave(inputPorts, segmentSize, eagerClose)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -253,7 +253,10 @@ object Interleave {
|
|||
* '''Cancels when''' downstream cancels
|
||||
*
|
||||
*/
|
||||
class Interleave[T] private (val inputPorts: Int, val request: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
|
||||
final class Interleave[T] private (val inputPorts: Int, val segmentSize: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
|
||||
require(inputPorts > 1, "input ports must be > 1")
|
||||
require(segmentSize > 0, "segmentSize must be > 0")
|
||||
|
||||
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("Interleave.in" + i))
|
||||
val out: Outlet[T] = Outlet[T]("Interleave.out")
|
||||
override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
|
||||
|
|
@ -280,7 +283,7 @@ class Interleave[T] private (val inputPorts: Int, val request: Int, val eagerClo
|
|||
override def onPush(): Unit = {
|
||||
push(out, grab(i))
|
||||
counter += 1
|
||||
if (counter == request) switchToNextInput()
|
||||
if (counter == segmentSize) switchToNextInput()
|
||||
}
|
||||
|
||||
override def onUpstreamFinish() = {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue