From d0f0f7308f7098a7890c810b1bea8e6b32b2e47c Mon Sep 17 00:00:00 2001 From: James Roper Date: Wed, 2 Sep 2015 16:59:32 +1000 Subject: [PATCH] =str #18384 Fix javadsl.Broadcast eagerCancel * The javadsl for Broadcast should not ignore eagerCancel --- .../java/akka/stream/javadsl/FlowTest.java | 24 +++++++++++++++++++ .../scala/akka/stream/javadsl/Graph.scala | 2 +- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 0398dafed3..ba770b2684 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -589,4 +589,28 @@ public class FlowTest extends StreamTest { probe.expectMsgAllOf("A","B","C"); } + @Test + public void mustBeAbleToBroadcastEagerCancel() throws Exception { + final Sink out1 = Sink.cancelled(); + final Sink out2 = Sink.ignore(); + + final Sink sink = Sink.factory().create(new Function, Inlet>() { + @Override + public Inlet apply(Builder b) throws Exception { + final UniformFanOutShape broadcast = b.graph(Broadcast.create(2, true)); + + b.from(broadcast.out(0)).to(out1); + b.from(broadcast.out(1)).to(out2); + return broadcast.in(); + } + }); + + final JavaTestKit probe = new JavaTestKit(system); + Source source = Source.actorRef(1, OverflowStrategy.dropNew()); + final ActorRef actor = source.toMat(sink, Keep.left()).run(materializer); + probe.watch(actor); + probe.expectTerminated(actor); + } + + } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index dc2ce75bb4..4a3f6e0341 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -103,7 +103,7 @@ object Broadcast { * @param eagerCancel if true, broadcast cancels upstream if any of its downstreams cancel. */ def create[T](outputCount: Int, eagerCancel: Boolean): Graph[UniformFanOutShape[T, T], Unit] = - scaladsl.Broadcast(outputCount) + scaladsl.Broadcast(outputCount, eagerCancel = eagerCancel) /** * Create a new `Broadcast` vertex with the specified input type.