Merge pull request #19396 from akka/wip-18948-eagerComplete-RK
#18948 add eagerComplete parameter to FlowOps.merge
This commit is contained in:
commit
72eb3f539d
8 changed files with 67 additions and 40 deletions
|
|
@ -43,8 +43,8 @@ In that case you can still manually fuse those graphs which shall run on less Ac
|
|||
* all Stages (this includes all built-in linear operators)
|
||||
* TCP connections
|
||||
|
||||
Introduced proper named constructor methods insted of ``wrap()``
|
||||
================================================================
|
||||
Introduced proper named constructor methods instead of ``wrap()``
|
||||
=================================================================
|
||||
|
||||
There were several, unrelated uses of ``wrap()`` which made it hard to find and hard to understand the intention of
|
||||
the call. Therefore these use-cases now have methods with different names, helping Java 8 type inference (by reducing
|
||||
|
|
|
|||
|
|
@ -43,8 +43,8 @@ In that case you can still manually fuse those graphs which shall run on less Ac
|
|||
* all Stages (this includes all built-in linear operators)
|
||||
* TCP connections
|
||||
|
||||
Introduced proper named constructor methods insted of ``wrap()``
|
||||
================================================================
|
||||
Introduced proper named constructor methods instead of ``wrap()``
|
||||
=================================================================
|
||||
|
||||
There were several, unrelated uses of ``wrap()`` which made it hard to find and hard to understand the intention of
|
||||
the call. Therefore these use-cases now have methods with different names, helping Java 8 type inference (by reducing
|
||||
|
|
@ -351,8 +351,8 @@ should be replaced by
|
|||
|
||||
.. includecode:: code/docs/MigrationsScala.scala#flatMapConcat
|
||||
|
||||
`Sink.fanoutPublisher() and Sink.publisher() is now a single method`
|
||||
====================================================================
|
||||
`Sink.fanoutPublisher()` and `Sink.publisher()` is now a single method
|
||||
======================================================================
|
||||
|
||||
It was a common user mistake to use ``Sink.publisher`` and get into trouble since it would only support
|
||||
a single ``Subscriber``, and the discoverability of the apprpriate fix was non-obvious (Sink.fanoutPublisher).
|
||||
|
|
|
|||
|
|
@ -130,7 +130,7 @@ concat the current stream has an element available; if the curre
|
|||
prepend the given stream has an element available; if the given input completes, it tries the current one downstream backpressures all upstreams complete
|
||||
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
|
||||
|
||||
(*) This behavior is changeable to completing when any upstream completes by setting ``eagerClose=true``.
|
||||
(*) This behavior is changeable to completing when any upstream completes by setting ``eagerComplete=true``.
|
||||
|
||||
Fan-out stages
|
||||
^^^^^^^^^^^^^^
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@ private object PoolConductor {
|
|||
GraphDSL.create() { implicit b ⇒
|
||||
import GraphDSL.Implicits._
|
||||
|
||||
val retryMerge = b.add(MergePreferred[RequestContext](1, eagerClose = true))
|
||||
val retryMerge = b.add(MergePreferred[RequestContext](1, eagerComplete = true))
|
||||
val slotSelector = b.add(new SlotSelector(slotCount, pipeliningLimit, log))
|
||||
val route = b.add(new Route(slotCount))
|
||||
val retrySplit = b.add(Broadcast[RawSlotEvent](2))
|
||||
|
|
|
|||
|
|
@ -1223,7 +1223,22 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def merge[T >: Out](that: Graph[SourceShape[T], _]): javadsl.Flow[In, T, Mat] =
|
||||
new Flow(delegate.merge(that))
|
||||
merge(that, eagerComplete = false)
|
||||
|
||||
/**
|
||||
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
|
||||
* picking randomly when several elements ready.
|
||||
*
|
||||
* '''Emits when''' one of the inputs has an element available
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def merge[T >: Out](that: Graph[SourceShape[T], _], eagerComplete: Boolean): javadsl.Flow[In, T, Mat] =
|
||||
new Flow(delegate.merge(that, eagerComplete))
|
||||
|
||||
/**
|
||||
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
|
||||
|
|
@ -1233,6 +1248,17 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
*/
|
||||
def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M],
|
||||
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
|
||||
mergeMat(that, matF, eagerComplete = false)
|
||||
|
||||
/**
|
||||
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
|
||||
* picking randomly when several elements ready.
|
||||
*
|
||||
* @see [[#merge]]
|
||||
*/
|
||||
def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M],
|
||||
matF: function.Function2[Mat, M, M2],
|
||||
eagerComplete: Boolean): javadsl.Flow[In, T, M2] =
|
||||
new Flow(delegate.mergeMat(that)(combinerToScala(matF)))
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ import akka.stream.impl.ConstantFun
|
|||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)
|
||||
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true)
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
|
|
@ -36,20 +36,20 @@ object Merge {
|
|||
/**
|
||||
* Create a new `Merge` stage with the specified output type.
|
||||
*
|
||||
* @param eagerClose set to true in order to make this stage eagerly
|
||||
* @param eagerComplete set to true in order to make this stage eagerly
|
||||
* finish as soon as one of its inputs completes
|
||||
*/
|
||||
def create[T](inputPorts: Int, eagerClose: Boolean): Graph[UniformFanInShape[T, T], Unit] =
|
||||
scaladsl.Merge(inputPorts, eagerClose = eagerClose)
|
||||
def create[T](inputPorts: Int, eagerComplete: Boolean): Graph[UniformFanInShape[T, T], Unit] =
|
||||
scaladsl.Merge(inputPorts, eagerComplete = eagerComplete)
|
||||
|
||||
/**
|
||||
* Create a new `Merge` stage with the specified output type.
|
||||
*
|
||||
* @param eagerClose set to true in order to make this stage eagerly
|
||||
* @param eagerComplete set to true in order to make this stage eagerly
|
||||
* finish as soon as one of its inputs completes
|
||||
*/
|
||||
def create[T](clazz: Class[T], inputPorts: Int, eagerClose: Boolean): Graph[UniformFanInShape[T, T], Unit] =
|
||||
create(inputPorts, eagerClose)
|
||||
def create[T](clazz: Class[T], inputPorts: Int, eagerComplete: Boolean): Graph[UniformFanInShape[T, T], Unit] =
|
||||
create(inputPorts, eagerComplete)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -61,7 +61,7 @@ object Merge {
|
|||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)
|
||||
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true)
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
|
|
@ -80,20 +80,20 @@ object MergePreferred {
|
|||
/**
|
||||
* Create a new `MergePreferred` stage with the specified output type.
|
||||
*
|
||||
* @param eagerClose set to true in order to make this stage eagerly
|
||||
* @param eagerComplete set to true in order to make this stage eagerly
|
||||
* finish as soon as one of its inputs completes
|
||||
*/
|
||||
def create[T](secondaryPorts: Int, eagerClose: Boolean): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] =
|
||||
scaladsl.MergePreferred(secondaryPorts, eagerClose = eagerClose)
|
||||
def create[T](secondaryPorts: Int, eagerComplete: Boolean): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] =
|
||||
scaladsl.MergePreferred(secondaryPorts, eagerComplete = eagerComplete)
|
||||
|
||||
/**
|
||||
* Create a new `MergePreferred` stage with the specified output type.
|
||||
*
|
||||
* @param eagerClose set to true in order to make this stage eagerly
|
||||
* @param eagerComplete set to true in order to make this stage eagerly
|
||||
* finish as soon as one of its inputs completes
|
||||
*/
|
||||
def create[T](clazz: Class[T], secondaryPorts: Int, eagerClose: Boolean): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] =
|
||||
create(secondaryPorts, eagerClose)
|
||||
def create[T](clazz: Class[T], secondaryPorts: Int, eagerComplete: Boolean): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] =
|
||||
create(secondaryPorts, eagerComplete)
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1460,17 +1460,17 @@ trait FlowOps[+Out, +Mat] {
|
|||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' all upstreams complete
|
||||
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def merge[U >: Out, M](that: Graph[SourceShape[U], M]): Repr[U] =
|
||||
via(mergeGraph(that))
|
||||
def merge[U >: Out, M](that: Graph[SourceShape[U], M], eagerComplete: Boolean = false): Repr[U] =
|
||||
via(mergeGraph(that, eagerComplete))
|
||||
|
||||
protected def mergeGraph[U >: Out, M](that: Graph[SourceShape[U], M]): Graph[FlowShape[Out @uncheckedVariance, U], M] =
|
||||
protected def mergeGraph[U >: Out, M](that: Graph[SourceShape[U], M], eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], M] =
|
||||
GraphDSL.create(that) { implicit b ⇒
|
||||
r ⇒
|
||||
val merge = b.add(Merge[U](2))
|
||||
val merge = b.add(Merge[U](2, eagerComplete))
|
||||
r ~> merge.in(1)
|
||||
FlowShape(merge.in(0), merge.out)
|
||||
}
|
||||
|
|
@ -1721,8 +1721,8 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
|
|||
*
|
||||
* @see [[#merge]].
|
||||
*/
|
||||
def mergeMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] =
|
||||
viaMat(mergeGraph(that))(matF)
|
||||
def mergeMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], eagerComplete: Boolean = false)(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] =
|
||||
viaMat(mergeGraph(that, eagerComplete))(matF)
|
||||
|
||||
/**
|
||||
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
|
||||
|
|
|
|||
|
|
@ -18,9 +18,9 @@ object Merge {
|
|||
* Create a new `Merge` with the specified number of input ports.
|
||||
*
|
||||
* @param inputPorts number of input ports
|
||||
* @param eagerClose if true, the merge will complete as soon as one of its inputs completes.
|
||||
* @param eagerComplete if true, the merge will complete as soon as one of its inputs completes.
|
||||
*/
|
||||
def apply[T](inputPorts: Int, eagerClose: Boolean = false): Merge[T] = new Merge(inputPorts, eagerClose)
|
||||
def apply[T](inputPorts: Int, eagerComplete: Boolean = false): Merge[T] = new Merge(inputPorts, eagerComplete)
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -32,12 +32,13 @@ object Merge {
|
|||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)
|
||||
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
final class Merge[T] private (val inputPorts: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
|
||||
final class Merge[T] private (val inputPorts: Int, val eagerComplete: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
|
||||
require(inputPorts > 1, "A Merge must have more than 1 input port")
|
||||
|
||||
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("Merge.in" + i))
|
||||
val out: Outlet[T] = Outlet[T]("Merge.out")
|
||||
override def initialAttributes = Attributes.name("Merge")
|
||||
|
|
@ -82,7 +83,7 @@ final class Merge[T] private (val inputPorts: Int, val eagerClose: Boolean) exte
|
|||
}
|
||||
|
||||
override def onUpstreamFinish() =
|
||||
if (eagerClose) {
|
||||
if (eagerComplete) {
|
||||
in.foreach(cancel)
|
||||
runningUpstreams = 0
|
||||
if (!pending) completeStage()
|
||||
|
|
@ -118,9 +119,9 @@ object MergePreferred {
|
|||
* Create a new `MergePreferred` with the specified number of secondary input ports.
|
||||
*
|
||||
* @param secondaryPorts number of secondary input ports
|
||||
* @param eagerClose if true, the merge will complete as soon as one of its inputs completes.
|
||||
* @param eagerComplete if true, the merge will complete as soon as one of its inputs completes.
|
||||
*/
|
||||
def apply[T](secondaryPorts: Int, eagerClose: Boolean = false): MergePreferred[T] = new MergePreferred(secondaryPorts, eagerClose)
|
||||
def apply[T](secondaryPorts: Int, eagerComplete: Boolean = false): MergePreferred[T] = new MergePreferred(secondaryPorts, eagerComplete)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -134,13 +135,13 @@ object MergePreferred {
|
|||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)
|
||||
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*
|
||||
* A `Broadcast` has one `in` port and 2 or more `out` ports.
|
||||
*/
|
||||
final class MergePreferred[T] private (val secondaryPorts: Int, val eagerClose: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] {
|
||||
final class MergePreferred[T] private (val secondaryPorts: Int, val eagerComplete: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] {
|
||||
require(secondaryPorts >= 1, "A MergePreferred must have more than 0 secondary input ports")
|
||||
|
||||
override def initialAttributes = Attributes.name("MergePreferred")
|
||||
|
|
@ -155,7 +156,7 @@ final class MergePreferred[T] private (val secondaryPorts: Int, val eagerClose:
|
|||
var openInputs = secondaryPorts + 1
|
||||
def onComplete(): Unit = {
|
||||
openInputs -= 1
|
||||
if (eagerClose || openInputs == 0) completeStage()
|
||||
if (eagerComplete || openInputs == 0) completeStage()
|
||||
}
|
||||
|
||||
setHandler(out, new OutHandler {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue