diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 0398dafed3..ba770b2684 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -589,4 +589,28 @@ public class FlowTest extends StreamTest { probe.expectMsgAllOf("A","B","C"); } + @Test + public void mustBeAbleToBroadcastEagerCancel() throws Exception { + final Sink out1 = Sink.cancelled(); + final Sink out2 = Sink.ignore(); + + final Sink sink = Sink.factory().create(new Function, Inlet>() { + @Override + public Inlet apply(Builder b) throws Exception { + final UniformFanOutShape broadcast = b.graph(Broadcast.create(2, true)); + + b.from(broadcast.out(0)).to(out1); + b.from(broadcast.out(1)).to(out2); + return broadcast.in(); + } + }); + + final JavaTestKit probe = new JavaTestKit(system); + Source source = Source.actorRef(1, OverflowStrategy.dropNew()); + final ActorRef actor = source.toMat(sink, Keep.left()).run(materializer); + probe.watch(actor); + probe.expectTerminated(actor); + } + + } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index dc2ce75bb4..4a3f6e0341 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -103,7 +103,7 @@ object Broadcast { * @param eagerCancel if true, broadcast cancels upstream if any of its downstreams cancel. */ def create[T](outputCount: Int, eagerCancel: Boolean): Graph[UniformFanOutShape[T, T], Unit] = - scaladsl.Broadcast(outputCount) + scaladsl.Broadcast(outputCount, eagerCancel = eagerCancel) /** * Create a new `Broadcast` vertex with the specified input type.