add eagerCancel to Partition and Balance stage (#22339)

* add eagerCancel to Partition and Balance stage

* fix mima error

* rebase master and add unit test

* add java support and since to deprecated
This commit is contained in:
Hawstein 2018-04-10 00:46:39 +08:00 committed by Konrad `ktoso` Malawski
parent 332aa58540
commit ecdaf0e530
4 changed files with 91 additions and 16 deletions

View file

@ -1917,6 +1917,8 @@ Emit each incoming element each of `n` outputs.
**completes** when upstream completes
**cancels** depends on the `eagerCancel` flag. If it is true, when any downstream cancels, if false, when all downstreams cancel.
---------------------------------------------------------------
### balance
@ -1929,6 +1931,8 @@ Fan-out the stream to several streams. Each upstream element is emitted to the f
**completes** when upstream completes
**cancels** depends on the `eagerCancel` flag. If it is true, when any downstream cancels, if false, when all downstreams cancel.
---------------------------------------------------------------
### partition
@ -1942,6 +1946,8 @@ partitioner function applied to the element.
**completes** when upstream completes and no output is pending
**cancels** depends on the `eagerCancel` flag. If it is true, when any downstream cancels, if false, when all downstreams cancel.
---------------------------------------------------------------
<br/>

View file

@ -229,7 +229,7 @@ class GraphBalanceSpec extends StreamSpec {
c1.expectComplete()
}
"cancel upstream when downstreams cancel" in assertAllStagesStopped {
"cancel upstream when all downstreams cancel if eagerCancel is false" in assertAllStagesStopped {
val p1 = TestPublisher.manualProbe[Int]()
val c1 = TestSubscriber.manualProbe[Int]()
val c2 = TestSubscriber.manualProbe[Int]()
@ -260,6 +260,36 @@ class GraphBalanceSpec extends StreamSpec {
bsub.expectCancellation()
}
"cancel upstream when any downstream cancel if eagerCancel is true" in assertAllStagesStopped {
val p1 = TestPublisher.manualProbe[Int]()
val c1 = TestSubscriber.manualProbe[Int]()
val c2 = TestSubscriber.manualProbe[Int]()
RunnableGraph.fromGraph(GraphDSL.create() { implicit b
val balance = b.add(new Balance[Int](2, waitForAllDownstreams = false, eagerCancel = true))
Source.fromPublisher(p1.getPublisher) ~> balance.in
balance.out(0) ~> Sink.fromSubscriber(c1)
balance.out(1) ~> Sink.fromSubscriber(c2)
ClosedShape
}).run()
val bsub = p1.expectSubscription()
val sub1 = c1.expectSubscription()
val sub2 = c2.expectSubscription()
sub1.request(1)
p1.expectRequest(bsub, 16)
bsub.sendNext(1)
c1.expectNext(1)
sub2.request(1)
bsub.sendNext(2)
c2.expectNext(2)
sub1.cancel()
bsub.expectCancellation()
}
// Bug #20943
"not push output twice" in assertAllStagesStopped {
val p1 = TestPublisher.manualProbe[Int]()

View file

@ -263,12 +263,13 @@ object Partition {
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' all downstreams cancel
* '''Cancels when''' If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel
*/
object Balance {
/**
* Create a new `Balance` stage with the specified input type.
* Create a new `Balance` stage with the specified input type, `eagerCancel` is `false`.
*
* @param outputCount number of output ports
* @param waitForAllDownstreams if `true` it will not start emitting
* elements to downstream outputs until all of them have requested at least one element
*/
@ -277,24 +278,51 @@ object Balance {
/**
* Create a new `Balance` stage with the specified input type.
*
* @param outputCount number of output ports
* @param waitForAllDownstreams if `true` it will not start emitting elements to downstream outputs until all of them have requested at least one element
* @param eagerCancel if true, balance cancels upstream if any of its downstreams cancel, if false, when all have cancelled.
*/
def create[T](outputCount: Int, waitForAllDownstreams: Boolean, eagerCancel: Boolean): Graph[UniformFanOutShape[T, T], NotUsed] =
new scaladsl.Balance(outputCount, waitForAllDownstreams, eagerCancel)
/**
* Create a new `Balance` stage with the specified input type, both `waitForAllDownstreams` and `eagerCancel` are `false`.
*
* @param outputCount number of output ports
*/
def create[T](outputCount: Int): Graph[UniformFanOutShape[T, T], NotUsed] =
create(outputCount, waitForAllDownstreams = false)
/**
* Create a new `Balance` stage with the specified input type.
* Create a new `Balance` stage with the specified input type, both `waitForAllDownstreams` and `eagerCancel` are `false`.
*
* @param clazz a type hint for this method
* @param outputCount number of output ports
*/
def create[T](clazz: Class[T], outputCount: Int): Graph[UniformFanOutShape[T, T], NotUsed] =
create(outputCount)
/**
* Create a new `Balance` stage with the specified input type.
* Create a new `Balance` stage with the specified input type, `eagerCancel` is `false`.
*
* @param waitForAllDownstreams if `true` it will not start emitting
* elements to downstream outputs until all of them have requested at least one element
* @param clazz a type hint for this method
* @param outputCount number of output ports
* @param waitForAllDownstreams if `true` it will not start emitting elements to downstream outputs until all of them have requested at least one element
*/
def create[T](clazz: Class[T], outputCount: Int, waitForAllDownstreams: Boolean): Graph[UniformFanOutShape[T, T], NotUsed] =
create(outputCount, waitForAllDownstreams)
/**
* Create a new `Balance` stage with the specified input type.
*
* @param clazz a type hint for this method
* @param outputCount number of output ports
* @param waitForAllDownstreams if `true` it will not start emitting elements to downstream outputs until all of them have requested at least one element
* @param eagerCancel if true, balance cancels upstream if any of its downstreams cancel, if false, when all have cancelled.
*/
def create[T](clazz: Class[T], outputCount: Int, waitForAllDownstreams: Boolean, eagerCancel: Boolean): Graph[UniformFanOutShape[T, T], NotUsed] =
new scaladsl.Balance(outputCount, waitForAllDownstreams, eagerCancel)
}
/**

View file

@ -825,7 +825,10 @@ final class Partition[T](val outputPorts: Int, val partitioner: T ⇒ Int, val e
object Balance {
/**
* Create a new `Balance` with the specified number of output ports.
* Create a new `Balance` with the specified number of output ports. This method sets `eagerCancel` to `false`.
* To specify a different value for the `eagerCancel` parameter, then instantiate Balance using the constructor.
*
* If `eagerCancel` is true, balance cancels upstream if any of its downstreams cancel, if false, when all have cancelled.
*
* @param outputPorts number of output ports
* @param waitForAllDownstreams if you use `waitForAllDownstreams = true` it will not start emitting
@ -833,7 +836,7 @@ object Balance {
* default value is `false`
*/
def apply[T](outputPorts: Int, waitForAllDownstreams: Boolean = false): Balance[T] =
new Balance(outputPorts, waitForAllDownstreams)
new Balance(outputPorts, waitForAllDownstreams, false)
}
/**
@ -849,11 +852,16 @@ object Balance {
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' all downstreams cancel
* '''Cancels when''' If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel
*/
final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean) extends GraphStage[UniformFanOutShape[T, T]] {
final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean, val eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] {
// one output might seem counter intuitive but saves us from special handling in other places
require(outputPorts >= 1, "A Balance must have one or more output ports")
@Deprecated
@deprecated("Use the constructor which also specifies the `eagerCancel` parameter", since = "2.5.12")
def this(outputPorts: Int, waitForAllDownstreams: Boolean) = this(outputPorts, waitForAllDownstreams, false)
val in: Inlet[T] = Inlet[T]("Balance.in")
val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i Outlet[T]("Balance.out" + i))
override def initialAttributes = DefaultAttributes.balance
@ -908,11 +916,14 @@ final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean)
}
override def onDownstreamFinish() = {
downstreamsRunning -= 1
if (downstreamsRunning == 0) completeStage()
else if (!hasPulled && needDownstreamPulls > 0) {
needDownstreamPulls -= 1
if (needDownstreamPulls == 0 && !hasBeenPulled(in)) pull(in)
if (eagerCancel) completeStage()
else {
downstreamsRunning -= 1
if (downstreamsRunning == 0) completeStage()
else if (!hasPulled && needDownstreamPulls > 0) {
needDownstreamPulls -= 1
if (needDownstreamPulls == 0 && !hasBeenPulled(in)) pull(in)
}
}
}
})