diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala index e19abc372d..ad924c435f 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala @@ -78,7 +78,7 @@ private[http] object OutgoingConnectionBlueprint { FlowGraph.partial() { implicit b ⇒ import FlowGraph.Implicits._ - val methodBypassFanout = b.add(Broadcast[HttpRequest](2)) + val methodBypassFanout = b.add(Broadcast[HttpRequest](2, eagerCancel = true)) val responseParsingMerge = b.add(new ResponseParsingMerge(rootParser)) val terminationFanout = b.add(Broadcast[HttpResponse](2)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index d8a005676b..29a1360f2d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -173,8 +173,8 @@ private[akka] case class ActorMaterializerImpl( val flexi = r.flexi(r.shape) (FlexiRoute.props(effectiveSettings, r.shape, flexi), r.shape.inlets.head: InPort, r.shape.outlets) - case BroadcastModule(shape, _) ⇒ - (Broadcast.props(effectiveSettings, shape.outArray.size), shape.in, shape.outArray.toSeq) + case BroadcastModule(shape, eagerCancel, _) ⇒ + (Broadcast.props(effectiveSettings, eagerCancel, shape.outArray.size), shape.in, shape.outArray.toSeq) case BalanceModule(shape, waitForDownstreams, _) ⇒ (Balance.props(effectiveSettings, shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala index 8aea7288bf..6791d26854 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala @@ -289,14 +289,15 @@ private[akka] abstract class FanOut(val settings: ActorMaterializerSettings, val * INTERNAL API */ private[akka] object Broadcast { - def props(settings: ActorMaterializerSettings, outputPorts: Int): Props = - Props(new Broadcast(settings, outputPorts)).withDeploy(Deploy.local) + def props(settings: ActorMaterializerSettings, eagerCancel: Boolean, outputPorts: Int): Props = + Props(new Broadcast(settings, outputPorts, eagerCancel)).withDeploy(Deploy.local) } /** * INTERNAL API */ -private[akka] class Broadcast(_settings: ActorMaterializerSettings, _outputPorts: Int) extends FanOut(_settings, _outputPorts) { +private[akka] class Broadcast(_settings: ActorMaterializerSettings, _outputPorts: Int, eagerCancel: Boolean) extends FanOut(_settings, _outputPorts) { + outputBunch.unmarkCancelledOutputs(!eagerCancel) outputBunch.markAllOutputs() initialPhase(1, TransferPhase(primaryInputs.NeedsInput && outputBunch.AllOfMarkedOutputs) { () ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala b/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala index 56f65d1207..fb280a67cd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala @@ -44,11 +44,12 @@ private[stream] object Junctions { final case class BroadcastModule[T]( shape: UniformFanOutShape[T, T], + eagerCancel: Boolean, override val attributes: Attributes = name("broadcast")) extends FanOutModule { override def withAttributes(attr: Attributes): Module = copy(attributes = attr) - override def carbonCopy: Module = BroadcastModule(shape.deepCopy(), attributes) + override def carbonCopy: Module = BroadcastModule(shape.deepCopy(), eagerCancel, attributes) } final case class MergePreferredModule[T]( diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 0da7b5cf9d..29ed4ccb9e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -92,15 +92,26 @@ object MergePreferred { * * '''Completes when''' upstream completes * - * '''Cancels when''' all downstreams cancel + * '''Cancels when''' + * If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel */ object Broadcast { /** * Create a new `Broadcast` vertex with the specified input type. + * + * @param outputCount number of output ports + * @param eagerCancel if true, broadcast cancels upstream if any of its downstreams cancel. */ - def create[T](outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = + def create[T](outputCount: Int, eagerCancel: Boolean): Graph[UniformFanOutShape[T, T], Unit] = scaladsl.Broadcast(outputCount) + /** + * Create a new `Broadcast` vertex with the specified input type. + * + * @param outputCount number of output ports + */ + def create[T](outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = create(outputCount, eagerCancel = false) + /** * Create a new `Broadcast` vertex with the specified input type. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 75d0e3a8b2..2a5e05cad7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -104,10 +104,11 @@ object Broadcast { * Create a new `Broadcast` with the specified number of output ports. * * @param outputPorts number of output ports + * @param eagerCancel if true, broadcast cancels upstream if any of its downstreams cancel. */ - def apply[T](outputPorts: Int): Broadcast[T] = { + def apply[T](outputPorts: Int, eagerCancel: Boolean = false): Broadcast[T] = { val shape = new UniformFanOutShape[T, T](outputPorts) - new Broadcast(outputPorts, shape, new BroadcastModule(shape, Attributes.name("Broadcast"))) + new Broadcast(outputPorts, shape, new BroadcastModule(shape, eagerCancel, Attributes.name("Broadcast"))) } } @@ -121,7 +122,9 @@ object Broadcast { * * '''Completes when''' upstream completes * - * '''Cancels when''' all downstreams cancel + * '''Cancels when''' + * If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel + * */ class Broadcast[T] private (outputPorts: Int, override val shape: UniformFanOutShape[T, T],