diff --git a/akka-docs/src/main/paradox/stream/stages-overview.md b/akka-docs/src/main/paradox/stream/stages-overview.md index f0a6ee7f8e..02b78be4c4 100644 --- a/akka-docs/src/main/paradox/stream/stages-overview.md +++ b/akka-docs/src/main/paradox/stream/stages-overview.md @@ -1917,6 +1917,8 @@ Emit each incoming element each of `n` outputs. **completes** when upstream completes +**cancels** depends on the `eagerCancel` flag. If it is true, when any downstream cancels, if false, when all downstreams cancel. + --------------------------------------------------------------- ### balance @@ -1929,6 +1931,8 @@ Fan-out the stream to several streams. Each upstream element is emitted to the f **completes** when upstream completes +**cancels** depends on the `eagerCancel` flag. If it is true, when any downstream cancels, if false, when all downstreams cancel. + --------------------------------------------------------------- ### partition @@ -1942,6 +1946,8 @@ partitioner function applied to the element. **completes** when upstream completes and no output is pending +**cancels** depends on the `eagerCancel` flag. If it is true, when any downstream cancels, if false, when all downstreams cancel. + ---------------------------------------------------------------
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala index 6ea371096d..6135487e09 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala @@ -229,7 +229,7 @@ class GraphBalanceSpec extends StreamSpec { c1.expectComplete() } - "cancel upstream when downstreams cancel" in assertAllStagesStopped { + "cancel upstream when all downstreams cancel if eagerCancel is false" in assertAllStagesStopped { val p1 = TestPublisher.manualProbe[Int]() val c1 = TestSubscriber.manualProbe[Int]() val c2 = TestSubscriber.manualProbe[Int]() @@ -260,6 +260,36 @@ class GraphBalanceSpec extends StreamSpec { bsub.expectCancellation() } + "cancel upstream when any downstream cancel if eagerCancel is true" in assertAllStagesStopped { + val p1 = TestPublisher.manualProbe[Int]() + val c1 = TestSubscriber.manualProbe[Int]() + val c2 = TestSubscriber.manualProbe[Int]() + + RunnableGraph.fromGraph(GraphDSL.create() { implicit b ⇒ + val balance = b.add(new Balance[Int](2, waitForAllDownstreams = false, eagerCancel = true)) + Source.fromPublisher(p1.getPublisher) ~> balance.in + balance.out(0) ~> Sink.fromSubscriber(c1) + balance.out(1) ~> Sink.fromSubscriber(c2) + ClosedShape + }).run() + + val bsub = p1.expectSubscription() + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + + sub1.request(1) + p1.expectRequest(bsub, 16) + bsub.sendNext(1) + c1.expectNext(1) + + sub2.request(1) + bsub.sendNext(2) + c2.expectNext(2) + + sub1.cancel() + bsub.expectCancellation() + } + // Bug #20943 "not push output twice" in assertAllStagesStopped { val p1 = TestPublisher.manualProbe[Int]() diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index ee9a818e5a..a16f177da1 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -263,12 +263,13 @@ object Partition { * * '''Completes when''' upstream completes * - * '''Cancels when''' all downstreams cancel + * '''Cancels when''' If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel */ object Balance { /** - * Create a new `Balance` stage with the specified input type. + * Create a new `Balance` stage with the specified input type, `eagerCancel` is `false`. * + * @param outputCount number of output ports * @param waitForAllDownstreams if `true` it will not start emitting * elements to downstream outputs until all of them have requested at least one element */ @@ -277,24 +278,51 @@ object Balance { /** * Create a new `Balance` stage with the specified input type. + * + * @param outputCount number of output ports + * @param waitForAllDownstreams if `true` it will not start emitting elements to downstream outputs until all of them have requested at least one element + * @param eagerCancel if true, balance cancels upstream if any of its downstreams cancel, if false, when all have cancelled. + */ + def create[T](outputCount: Int, waitForAllDownstreams: Boolean, eagerCancel: Boolean): Graph[UniformFanOutShape[T, T], NotUsed] = + new scaladsl.Balance(outputCount, waitForAllDownstreams, eagerCancel) + + /** + * Create a new `Balance` stage with the specified input type, both `waitForAllDownstreams` and `eagerCancel` are `false`. + * + * @param outputCount number of output ports */ def create[T](outputCount: Int): Graph[UniformFanOutShape[T, T], NotUsed] = create(outputCount, waitForAllDownstreams = false) /** - * Create a new `Balance` stage with the specified input type. + * Create a new `Balance` stage with the specified input type, both `waitForAllDownstreams` and `eagerCancel` are `false`. + * + * @param clazz a type hint for this method + * @param outputCount number of output ports */ def create[T](clazz: Class[T], outputCount: Int): Graph[UniformFanOutShape[T, T], NotUsed] = create(outputCount) /** - * Create a new `Balance` stage with the specified input type. + * Create a new `Balance` stage with the specified input type, `eagerCancel` is `false`. * - * @param waitForAllDownstreams if `true` it will not start emitting - * elements to downstream outputs until all of them have requested at least one element + * @param clazz a type hint for this method + * @param outputCount number of output ports + * @param waitForAllDownstreams if `true` it will not start emitting elements to downstream outputs until all of them have requested at least one element */ def create[T](clazz: Class[T], outputCount: Int, waitForAllDownstreams: Boolean): Graph[UniformFanOutShape[T, T], NotUsed] = create(outputCount, waitForAllDownstreams) + + /** + * Create a new `Balance` stage with the specified input type. + * + * @param clazz a type hint for this method + * @param outputCount number of output ports + * @param waitForAllDownstreams if `true` it will not start emitting elements to downstream outputs until all of them have requested at least one element + * @param eagerCancel if true, balance cancels upstream if any of its downstreams cancel, if false, when all have cancelled. + */ + def create[T](clazz: Class[T], outputCount: Int, waitForAllDownstreams: Boolean, eagerCancel: Boolean): Graph[UniformFanOutShape[T, T], NotUsed] = + new scaladsl.Balance(outputCount, waitForAllDownstreams, eagerCancel) } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 69aff32d6c..0d90143c6c 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -825,7 +825,10 @@ final class Partition[T](val outputPorts: Int, val partitioner: T ⇒ Int, val e object Balance { /** - * Create a new `Balance` with the specified number of output ports. + * Create a new `Balance` with the specified number of output ports. This method sets `eagerCancel` to `false`. + * To specify a different value for the `eagerCancel` parameter, then instantiate Balance using the constructor. + * + * If `eagerCancel` is true, balance cancels upstream if any of its downstreams cancel, if false, when all have cancelled. * * @param outputPorts number of output ports * @param waitForAllDownstreams if you use `waitForAllDownstreams = true` it will not start emitting @@ -833,7 +836,7 @@ object Balance { * default value is `false` */ def apply[T](outputPorts: Int, waitForAllDownstreams: Boolean = false): Balance[T] = - new Balance(outputPorts, waitForAllDownstreams) + new Balance(outputPorts, waitForAllDownstreams, false) } /** @@ -849,11 +852,16 @@ object Balance { * * '''Completes when''' upstream completes * - * '''Cancels when''' all downstreams cancel + * '''Cancels when''' If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel */ -final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean) extends GraphStage[UniformFanOutShape[T, T]] { +final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean, val eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] { // one output might seem counter intuitive but saves us from special handling in other places require(outputPorts >= 1, "A Balance must have one or more output ports") + + @Deprecated + @deprecated("Use the constructor which also specifies the `eagerCancel` parameter", since = "2.5.12") + def this(outputPorts: Int, waitForAllDownstreams: Boolean) = this(outputPorts, waitForAllDownstreams, false) + val in: Inlet[T] = Inlet[T]("Balance.in") val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i ⇒ Outlet[T]("Balance.out" + i)) override def initialAttributes = DefaultAttributes.balance @@ -908,11 +916,14 @@ final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean) } override def onDownstreamFinish() = { - downstreamsRunning -= 1 - if (downstreamsRunning == 0) completeStage() - else if (!hasPulled && needDownstreamPulls > 0) { - needDownstreamPulls -= 1 - if (needDownstreamPulls == 0 && !hasBeenPulled(in)) pull(in) + if (eagerCancel) completeStage() + else { + downstreamsRunning -= 1 + if (downstreamsRunning == 0) completeStage() + else if (!hasPulled && needDownstreamPulls > 0) { + needDownstreamPulls -= 1 + if (needDownstreamPulls == 0 && !hasBeenPulled(in)) pull(in) + } } } })