#18948 add eagerComplete parameter to FlowOps.merge

This commit is contained in:
Roland Ferenczy 2015-11-18 16:09:05 +01:00 committed by Roland Kuhn
parent e6b8c86315
commit 144875a665
8 changed files with 67 additions and 40 deletions

View file

@ -18,9 +18,9 @@ object Merge {
* Create a new `Merge` with the specified number of input ports.
*
* @param inputPorts number of input ports
* @param eagerClose if true, the merge will complete as soon as one of its inputs completes.
* @param eagerComplete if true, the merge will complete as soon as one of its inputs completes.
*/
def apply[T](inputPorts: Int, eagerClose: Boolean = false): Merge[T] = new Merge(inputPorts, eagerClose)
def apply[T](inputPorts: Int, eagerComplete: Boolean = false): Merge[T] = new Merge(inputPorts, eagerComplete)
}
@ -32,12 +32,13 @@ object Merge {
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
*
* '''Cancels when''' downstream cancels
*/
final class Merge[T] private (val inputPorts: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
final class Merge[T] private (val inputPorts: Int, val eagerComplete: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
require(inputPorts > 1, "A Merge must have more than 1 input port")
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i Inlet[T]("Merge.in" + i))
val out: Outlet[T] = Outlet[T]("Merge.out")
override def initialAttributes = Attributes.name("Merge")
@ -82,7 +83,7 @@ final class Merge[T] private (val inputPorts: Int, val eagerClose: Boolean) exte
}
override def onUpstreamFinish() =
if (eagerClose) {
if (eagerComplete) {
in.foreach(cancel)
runningUpstreams = 0
if (!pending) completeStage()
@ -118,9 +119,9 @@ object MergePreferred {
* Create a new `MergePreferred` with the specified number of secondary input ports.
*
* @param secondaryPorts number of secondary input ports
* @param eagerClose if true, the merge will complete as soon as one of its inputs completes.
* @param eagerComplete if true, the merge will complete as soon as one of its inputs completes.
*/
def apply[T](secondaryPorts: Int, eagerClose: Boolean = false): MergePreferred[T] = new MergePreferred(secondaryPorts, eagerClose)
def apply[T](secondaryPorts: Int, eagerComplete: Boolean = false): MergePreferred[T] = new MergePreferred(secondaryPorts, eagerComplete)
}
/**
@ -134,13 +135,13 @@ object MergePreferred {
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
*
* '''Cancels when''' downstream cancels
*
* A `Broadcast` has one `in` port and 2 or more `out` ports.
*/
final class MergePreferred[T] private (val secondaryPorts: Int, val eagerClose: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] {
final class MergePreferred[T] private (val secondaryPorts: Int, val eagerComplete: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] {
require(secondaryPorts >= 1, "A MergePreferred must have more than 0 secondary input ports")
override def initialAttributes = Attributes.name("MergePreferred")
@ -155,7 +156,7 @@ final class MergePreferred[T] private (val secondaryPorts: Int, val eagerClose:
var openInputs = secondaryPorts + 1
def onComplete(): Unit = {
openInputs -= 1
if (eagerClose || openInputs == 0) completeStage()
if (eagerComplete || openInputs == 0) completeStage()
}
setHandler(out, new OutHandler {