diff --git a/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst b/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst index f95ec8ea3a..31fa3bec0e 100644 --- a/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst +++ b/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst @@ -43,8 +43,8 @@ In that case you can still manually fuse those graphs which shall run on less Ac * all Stages (this includes all built-in linear operators) * TCP connections -Introduced proper named constructor methods insted of ``wrap()`` -================================================================ +Introduced proper named constructor methods instead of ``wrap()`` +================================================================= There were several, unrelated uses of ``wrap()`` which made it hard to find and hard to understand the intention of the call. Therefore these use-cases now have methods with different names, helping Java 8 type inference (by reducing diff --git a/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst b/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst index 2b2196ea72..59f615fafd 100644 --- a/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst +++ b/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst @@ -43,8 +43,8 @@ In that case you can still manually fuse those graphs which shall run on less Ac * all Stages (this includes all built-in linear operators) * TCP connections -Introduced proper named constructor methods insted of ``wrap()`` -================================================================ +Introduced proper named constructor methods instead of ``wrap()`` +================================================================= There were several, unrelated uses of ``wrap()`` which made it hard to find and hard to understand the intention of the call. Therefore these use-cases now have methods with different names, helping Java 8 type inference (by reducing @@ -351,8 +351,8 @@ should be replaced by .. includecode:: code/docs/MigrationsScala.scala#flatMapConcat -`Sink.fanoutPublisher() and Sink.publisher() is now a single method` -==================================================================== +`Sink.fanoutPublisher()` and `Sink.publisher()` is now a single method +====================================================================== It was a common user mistake to use ``Sink.publisher`` and get into trouble since it would only support a single ``Subscriber``, and the discoverability of the apprpriate fix was non-obvious (Sink.fanoutPublisher). diff --git a/akka-docs-dev/rst/stages-overview.rst b/akka-docs-dev/rst/stages-overview.rst index e42e0439c1..fc9a66b703 100644 --- a/akka-docs-dev/rst/stages-overview.rst +++ b/akka-docs-dev/rst/stages-overview.rst @@ -130,7 +130,7 @@ concat the current stream has an element available; if the curre prepend the given stream has an element available; if the given input completes, it tries the current one downstream backpressures all upstreams complete ===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== -(*) This behavior is changeable to completing when any upstream completes by setting ``eagerClose=true``. +(*) This behavior is changeable to completing when any upstream completes by setting ``eagerComplete=true``. Fan-out stages ^^^^^^^^^^^^^^ diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala index 401efa22b5..24d9666de6 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala @@ -68,7 +68,7 @@ private object PoolConductor { GraphDSL.create() { implicit b ⇒ import GraphDSL.Implicits._ - val retryMerge = b.add(MergePreferred[RequestContext](1, eagerClose = true)) + val retryMerge = b.add(MergePreferred[RequestContext](1, eagerComplete = true)) val slotSelector = b.add(new SlotSelector(slotCount, pipeliningLimit, log)) val route = b.add(new Route(slotCount)) val retrySplit = b.add(Broadcast[RawSlotEvent](2)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 09cc873d2c..6a1a7c1c12 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1223,7 +1223,22 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * '''Cancels when''' downstream cancels */ def merge[T >: Out](that: Graph[SourceShape[T], _]): javadsl.Flow[In, T, Mat] = - new Flow(delegate.merge(that)) + merge(that, eagerComplete = false) + + /** + * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, + * picking randomly when several elements ready. + * + * '''Emits when''' one of the inputs has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false` + * + * '''Cancels when''' downstream cancels + */ + def merge[T >: Out](that: Graph[SourceShape[T], _], eagerComplete: Boolean): javadsl.Flow[In, T, Mat] = + new Flow(delegate.merge(that, eagerComplete)) /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, @@ -1233,6 +1248,17 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends */ def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = + mergeMat(that, matF, eagerComplete = false) + + /** + * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, + * picking randomly when several elements ready. + * + * @see [[#merge]] + */ + def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], + matF: function.Function2[Mat, M, M2], + eagerComplete: Boolean): javadsl.Flow[In, T, M2] = new Flow(delegate.mergeMat(that)(combinerToScala(matF))) /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 989b466559..e8a49ab2a8 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -16,7 +16,7 @@ import akka.stream.impl.ConstantFun * * '''Backpressures when''' downstream backpressures * - * '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true) + * '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true) * * '''Cancels when''' downstream cancels */ @@ -36,20 +36,20 @@ object Merge { /** * Create a new `Merge` stage with the specified output type. * - * @param eagerClose set to true in order to make this stage eagerly + * @param eagerComplete set to true in order to make this stage eagerly * finish as soon as one of its inputs completes */ - def create[T](inputPorts: Int, eagerClose: Boolean): Graph[UniformFanInShape[T, T], Unit] = - scaladsl.Merge(inputPorts, eagerClose = eagerClose) + def create[T](inputPorts: Int, eagerComplete: Boolean): Graph[UniformFanInShape[T, T], Unit] = + scaladsl.Merge(inputPorts, eagerComplete = eagerComplete) /** * Create a new `Merge` stage with the specified output type. * - * @param eagerClose set to true in order to make this stage eagerly + * @param eagerComplete set to true in order to make this stage eagerly * finish as soon as one of its inputs completes */ - def create[T](clazz: Class[T], inputPorts: Int, eagerClose: Boolean): Graph[UniformFanInShape[T, T], Unit] = - create(inputPorts, eagerClose) + def create[T](clazz: Class[T], inputPorts: Int, eagerComplete: Boolean): Graph[UniformFanInShape[T, T], Unit] = + create(inputPorts, eagerComplete) } /** @@ -61,7 +61,7 @@ object Merge { * * '''Backpressures when''' downstream backpressures * - * '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true) + * '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true) * * '''Cancels when''' downstream cancels */ @@ -80,20 +80,20 @@ object MergePreferred { /** * Create a new `MergePreferred` stage with the specified output type. * - * @param eagerClose set to true in order to make this stage eagerly + * @param eagerComplete set to true in order to make this stage eagerly * finish as soon as one of its inputs completes */ - def create[T](secondaryPorts: Int, eagerClose: Boolean): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] = - scaladsl.MergePreferred(secondaryPorts, eagerClose = eagerClose) + def create[T](secondaryPorts: Int, eagerComplete: Boolean): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] = + scaladsl.MergePreferred(secondaryPorts, eagerComplete = eagerComplete) /** * Create a new `MergePreferred` stage with the specified output type. * - * @param eagerClose set to true in order to make this stage eagerly + * @param eagerComplete set to true in order to make this stage eagerly * finish as soon as one of its inputs completes */ - def create[T](clazz: Class[T], secondaryPorts: Int, eagerClose: Boolean): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] = - create(secondaryPorts, eagerClose) + def create[T](clazz: Class[T], secondaryPorts: Int, eagerComplete: Boolean): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] = + create(secondaryPorts, eagerComplete) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index d9d4aaa582..ef51d07c6d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1460,17 +1460,17 @@ trait FlowOps[+Out, +Mat] { * * '''Backpressures when''' downstream backpressures * - * '''Completes when''' all upstreams complete + * '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false` * * '''Cancels when''' downstream cancels */ - def merge[U >: Out, M](that: Graph[SourceShape[U], M]): Repr[U] = - via(mergeGraph(that)) + def merge[U >: Out, M](that: Graph[SourceShape[U], M], eagerComplete: Boolean = false): Repr[U] = + via(mergeGraph(that, eagerComplete)) - protected def mergeGraph[U >: Out, M](that: Graph[SourceShape[U], M]): Graph[FlowShape[Out @uncheckedVariance, U], M] = + protected def mergeGraph[U >: Out, M](that: Graph[SourceShape[U], M], eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], M] = GraphDSL.create(that) { implicit b ⇒ r ⇒ - val merge = b.add(Merge[U](2)) + val merge = b.add(Merge[U](2, eagerComplete)) r ~> merge.in(1) FlowShape(merge.in(0), merge.out) } @@ -1721,8 +1721,8 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * * @see [[#merge]]. */ - def mergeMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] = - viaMat(mergeGraph(that))(matF) + def mergeMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], eagerComplete: Boolean = false)(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] = + viaMat(mergeGraph(that, eagerComplete))(matF) /** * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 84a3ad9478..71767d94fb 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -18,9 +18,9 @@ object Merge { * Create a new `Merge` with the specified number of input ports. * * @param inputPorts number of input ports - * @param eagerClose if true, the merge will complete as soon as one of its inputs completes. + * @param eagerComplete if true, the merge will complete as soon as one of its inputs completes. */ - def apply[T](inputPorts: Int, eagerClose: Boolean = false): Merge[T] = new Merge(inputPorts, eagerClose) + def apply[T](inputPorts: Int, eagerComplete: Boolean = false): Merge[T] = new Merge(inputPorts, eagerComplete) } @@ -32,12 +32,13 @@ object Merge { * * '''Backpressures when''' downstream backpressures * - * '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true) + * '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false` * * '''Cancels when''' downstream cancels */ -final class Merge[T] private (val inputPorts: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] { +final class Merge[T] private (val inputPorts: Int, val eagerComplete: Boolean) extends GraphStage[UniformFanInShape[T, T]] { require(inputPorts > 1, "A Merge must have more than 1 input port") + val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("Merge.in" + i)) val out: Outlet[T] = Outlet[T]("Merge.out") override def initialAttributes = Attributes.name("Merge") @@ -82,7 +83,7 @@ final class Merge[T] private (val inputPorts: Int, val eagerClose: Boolean) exte } override def onUpstreamFinish() = - if (eagerClose) { + if (eagerComplete) { in.foreach(cancel) runningUpstreams = 0 if (!pending) completeStage() @@ -118,9 +119,9 @@ object MergePreferred { * Create a new `MergePreferred` with the specified number of secondary input ports. * * @param secondaryPorts number of secondary input ports - * @param eagerClose if true, the merge will complete as soon as one of its inputs completes. + * @param eagerComplete if true, the merge will complete as soon as one of its inputs completes. */ - def apply[T](secondaryPorts: Int, eagerClose: Boolean = false): MergePreferred[T] = new MergePreferred(secondaryPorts, eagerClose) + def apply[T](secondaryPorts: Int, eagerComplete: Boolean = false): MergePreferred[T] = new MergePreferred(secondaryPorts, eagerComplete) } /** @@ -134,13 +135,13 @@ object MergePreferred { * * '''Backpressures when''' downstream backpressures * - * '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true) + * '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false` * * '''Cancels when''' downstream cancels * * A `Broadcast` has one `in` port and 2 or more `out` ports. */ -final class MergePreferred[T] private (val secondaryPorts: Int, val eagerClose: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] { +final class MergePreferred[T] private (val secondaryPorts: Int, val eagerComplete: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] { require(secondaryPorts >= 1, "A MergePreferred must have more than 0 secondary input ports") override def initialAttributes = Attributes.name("MergePreferred") @@ -155,7 +156,7 @@ final class MergePreferred[T] private (val secondaryPorts: Int, val eagerClose: var openInputs = secondaryPorts + 1 def onComplete(): Unit = { openInputs -= 1 - if (eagerClose || openInputs == 0) completeStage() + if (eagerComplete || openInputs == 0) completeStage() } setHandler(out, new OutHandler {