+str #19041 deterministic interleave operation

This commit is contained in:
Alexander Golubev 2015-12-09 21:52:53 -05:00
parent 15cc65ce9d
commit 737e7b8dfc
6 changed files with 332 additions and 2 deletions

View file

@ -0,0 +1,121 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import org.reactivestreams.Publisher
class FlowInterleaveSpec extends BaseTwoStreamsSetup {
override type Outputs = Int
override def setup(p1: Publisher[Int], p2: Publisher[Int]) = {
val subscriber = TestSubscriber.probe[Outputs]()
Source(p1).interleave(Source(p2), 2).runWith(Sink(subscriber))
subscriber
}
"An Interleave for Flow " must {
"work in the happy case" in assertAllStagesStopped {
val probe = TestSubscriber.manualProbe[Int]()
Source(0 to 3).interleave(Source(List[Int]()), 2).interleave(Source(4 to 9), 2).runWith(Sink(probe))
val subscription = probe.expectSubscription()
var collected = Set.empty[Int]
for (_ 1 to 10) {
subscription.request(1)
collected += probe.expectNext()
}
collected should be(Set(0, 1, 4, 5, 2, 3, 6, 7, 8, 9))
probe.expectComplete()
}
"work when bucket is not equal elements in stream" in assertAllStagesStopped {
val probe = TestSubscriber.manualProbe[Int]()
Source(0 to 2).interleave(Source(3 to 5), 2).runWith(Sink(probe))
probe.expectSubscription().request(10)
probe.expectNext(0, 1, 3, 4, 2, 5)
probe.expectComplete()
}
"work with bucket = 1" in assertAllStagesStopped {
val probe = TestSubscriber.manualProbe[Int]()
Source(0 to 2).interleave(Source(3 to 5), 1).runWith(Sink(probe))
probe.expectSubscription().request(10)
probe.expectNext(0, 3, 1, 4, 2, 5)
probe.expectComplete()
}
commonTests()
"work with one immediately completed and one nonempty publisher" in assertAllStagesStopped {
val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4))
val subscription1 = subscriber1.expectSubscription()
subscription1.request(4)
(1 to 4).foreach(subscriber1.expectNext)
subscriber1.expectComplete()
val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher)
val subscription2 = subscriber2.expectSubscription()
subscription2.request(4)
(1 to 4).foreach(subscriber2.expectNext)
subscriber2.expectComplete()
}
"work with one delayed completed and one nonempty publisher" in assertAllStagesStopped {
val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4))
val subscription1 = subscriber1.expectSubscription()
subscription1.request(4)
(1 to 4).foreach(subscriber1.expectNext)
subscriber1.expectComplete()
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher)
val subscription2 = subscriber2.expectSubscription()
subscription2.request(4)
(1 to 4).foreach(subscriber2.expectNext)
subscriber2.expectComplete()
}
"work with one immediately failed and one nonempty publisher" in {
val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4))
val subscription1 = subscriber1.expectSubscription()
subscription1.request(4)
subscriber1.expectError(TestException)
val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher)
val subscription2 = subscriber2.expectSubscription()
subscription2.request(4)
subscriber2.expectError(TestException)
}
"work with one delayed failed and one nonempty publisher" in {
// This is nondeterministic, multiple scenarios can happen
pending
}
"pass along early cancellation" in assertAllStagesStopped {
val up1 = TestPublisher.manualProbe[Int]()
val up2 = TestPublisher.manualProbe[Int]()
val down = TestSubscriber.manualProbe[Int]()
val (graphSubscriber1, graphSubscriber2) = Source.subscriber[Int]
.interleaveMat(Source.subscriber[Int], 2)((_, _)).toMat(Sink(down))(Keep.left).run
val downstream = down.expectSubscription()
downstream.cancel()
up1.subscribe(graphSubscriber1)
up2.subscribe(graphSubscriber2)
up1.expectSubscription().expectCancellation()
up2.expectSubscription().expectCancellation()
}
}
}

View file

@ -5,7 +5,7 @@ package akka.stream.scaladsl
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit._ import akka.stream.testkit._
import org.reactivestreams.{ Subscriber, Publisher } import org.reactivestreams.Publisher
class FlowMergeSpec extends BaseTwoStreamsSetup { class FlowMergeSpec extends BaseTwoStreamsSetup {
@ -26,7 +26,7 @@ class FlowMergeSpec extends BaseTwoStreamsSetup {
val source3 = Source(List[Int]()) val source3 = Source(List[Int]())
val probe = TestSubscriber.manualProbe[Int]() val probe = TestSubscriber.manualProbe[Int]()
Source(0 to 3).merge(Source(List[Int]())).merge(Source(4 to 9)) source1.merge(source3).merge(source2)
.map(_ * 2).map(_ / 2).map(_ + 1).runWith(Sink(probe)) .map(_ * 2).map(_ / 2).map(_ + 1).runWith(Sink(probe))
val subscription = probe.expectSubscription() val subscription = probe.expectSubscription()

View file

@ -1004,6 +1004,48 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] = matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] =
new Flow(delegate.alsoToMat(that)(combinerToScala(matF))) new Flow(delegate.alsoToMat(that)(combinerToScala(matF)))
/**
* Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]].
* It takes `request` number of elements from this flow to emit downstream, then - same amount for given source,
* then repeat process from the beginning.
*
* Example:
* {{{
* Source<Integer, ?> src = Source.from(Arrays.asList(1, 2, 3))
* Flow<Integer, Integer, ?> flow = flow.interleave(Source.from(Arrays.asList(4, 5, 6, 7)), 2)
* src.via(flow) // 1, 2, 4, 5, 3, 6, 7
* }}}
*
* After [[Flow]] or [[Source]] is complete than all the rest elements will be emitted from the second one
*
* If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
*
* '''Emits when''' element is available from current stream or from the given [[Source]] depending on what was pulled
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the [[Flow]] and given [[Source]] completes
*
* '''Cancels when''' downstream cancels
*/
def interleave[T >: Out](that: Graph[SourceShape[T], _], result: Int): javadsl.Flow[In, T, Mat] =
new Flow(delegate.interleave(that, result))
/**
* Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]].
* It takes `request` number of elements from this flow to emit downstream, then - same amount for given source,
* then repeat process from the beginning.
*
* After [[Flow]] or [[Source]] is compete than all the rest elements will be emitted from the second one
*
* If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
*
* @see [[#interleave]].
*/
def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], result: Int,
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
new Flow(delegate.interleaveMat(that, result)(combinerToScala(matF)))
/** /**
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready. * picking randomly when several elements ready.

View file

@ -498,6 +498,47 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] = matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] =
new Source(delegate.alsoToMat(that)(combinerToScala(matF))) new Source(delegate.alsoToMat(that)(combinerToScala(matF)))
/**
* Interleave represents deterministic merge of the given [[Source]] with elements of this [[Source]].
* It takes `request` number of elements from this source to emit downstream, then - same amount for given one,
* then repeat process from the beginning.
*
* Example:
* {{{
* Source.from(Arrays.asList(1, 2, 3)).interleave(Source.from(Arrays.asList(4, 5, 6, 7), 2)
* // 1, 2, 4, 5, 3, 6, 7
* }}}
*
* After one of sources is complete than all the rest elements will be emitted from the second one
*
* If one of sources gets upstream error - stream completes with failure.
*
* '''Emits when''' element is available from current stream or from the given [[Source]] depending on what was pulled
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' this [[Source]] and given one completes
*
* '''Cancels when''' downstream cancels
*/
def interleave[T >: Out](that: Graph[SourceShape[T], _], result: Int): javadsl.Source[T, Mat] =
new Source(delegate.interleave(that, result))
/**
* Interleave represents deterministic merge of the given [[Source]] with elements of this [[Source]].
* It takes `request` number of elements from this source to emit downstream, then - same amount for given source,
* then repeat process from the beginning.
*
* After one of sources is complete than all the rest elements will be emitted from the second one
*
* If one of sources gets upstream error - stream completes with failure.
*
* @see [[#interleave]].
*/
def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], result: Int,
matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] =
new Source(delegate.interleaveMat(that, result)(combinerToScala(matF)))
/** /**
* Merge the given [[Source]] to the current one, taking elements as they arrive from input streams, * Merge the given [[Source]] to the current one, taking elements as they arrive from input streams,
* picking randomly when several elements ready. * picking randomly when several elements ready.

View file

@ -1300,6 +1300,51 @@ trait FlowOps[+Out, +Mat] {
FlowShape(zip.in0, zip.out) FlowShape(zip.in0, zip.out)
} }
/**
* Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]].
* It takes `request` number of elements from this flow to emit downstream, then - same amount for given source,
* then repeat process from the beginning.
*
* Example:
* {{{
* Source(List(1, 2, 3)).interleave(List(4, 5, 6, 7), 2) // 1, 2, 4, 5, 3, 6, 7
* }}}
*
* After [[Flow]] or [[Source]] is complete than all the rest elements will be emitted from the second one
*
* If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
*
* '''Emits when''' element is available from current stream or from the given [[Source]] depending on what was pulled
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the [[Flow]] and given [[Source]] completes
*
* '''Cancels when''' downstream cancels
*/
def interleave[U >: Out, Mat2](that: Graph[SourceShape[U], Mat2], request: Int): Repr[U, Mat] =
interleaveMat(that, request)(Keep.left)
/**
* Interleave represents deterministic merge of the given [[Source]] with elements of this [[Flow]].
* It takes `request` number of elements from this flow to emit downstream, then - same amount for given source,
* then repeat process from the beginning.
*
* After [[Flow]] or [[Source]] is complete than all the rest elements will be emitted from the second one
*
* If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
*
* @see [[#interleave]].
*/
def interleaveMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], request: Int)(matF: (Mat, Mat2) Mat3): Repr[U, Mat3] =
this.viaMat(GraphDSL.create(that) { implicit b
r
import GraphDSL.Implicits._
val interleave = b.add(Interleave[U](2, request))
r ~> interleave.in(1)
FlowShape(interleave.in(0), interleave.out)
})(matF)
/** /**
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready. * picking randomly when several elements ready.

View file

@ -227,6 +227,87 @@ final class MergePreferred[T] private (val secondaryPorts: Int, val eagerClose:
} }
} }
object Interleave {
/**
* Create a new `Interleave` with the specified number of input ports and given size of elements
* to take from each input.
*
* @param inputPorts number of input ports
* @param request number of elements to send downstream before switching to next input port
* @param eagerClose if true, interleave completes upstream if any of its upstream completes.
*/
def apply[T](inputPorts: Int, request: Int, eagerClose: Boolean = false): Interleave[T] =
new Interleave(inputPorts, request, eagerClose)
}
/**
* Interleave represents deterministic merge which takes N elements per input stream,
* in-order of inputs, emits them downstream and then cycles/"wraps-around" the inputs.
*
* '''Emits when''' element is available from current input (depending on phase)
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)
*
* '''Cancels when''' downstream cancels
*
*/
class Interleave[T] private (val inputPorts: Int, val request: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i Inlet[T]("Interleave.in" + i))
val out: Outlet[T] = Outlet[T]("Interleave.out")
override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private var counter = 0
private var currentUpstreamIndex = 0
private var runningUpstreams = inputPorts
private def upstreamsClosed = runningUpstreams == 0
private def currentUpstream = in(currentUpstreamIndex)
private def switchToNextInput(): Unit = {
@tailrec
def nextInletIndex(index: Int): Int = {
val reduced = (index + 1) % inputPorts
if (!isClosed(in(reduced))) reduced else nextInletIndex(index + 1)
}
counter = 0
currentUpstreamIndex = nextInletIndex(currentUpstreamIndex)
}
in.foreach { i
setHandler(i, new InHandler {
override def onPush(): Unit = {
push(out, grab(i))
counter += 1
if (counter == request) switchToNextInput()
}
override def onUpstreamFinish() = {
if (eagerClose) {
in.foreach(cancel)
completeStage()
} else {
runningUpstreams -= 1
if (!upstreamsClosed) {
if (i == currentUpstream) {
switchToNextInput()
if (isAvailable(out)) pull(currentUpstream)
}
} else completeStage()
}
}
})
}
setHandler(out, new OutHandler {
override def onPull(): Unit = if (!hasBeenPulled(currentUpstream)) pull(currentUpstream)
})
}
override def toString = "Interleave"
}
object Broadcast { object Broadcast {
/** /**
* Create a new `Broadcast` with the specified number of output ports. * Create a new `Broadcast` with the specified number of output ports.