=str #18384 Fix javadsl.Broadcast eagerCancel
* The javadsl for Broadcast should not ignore eagerCancel
This commit is contained in:
parent
15d9a1eed4
commit
d0f0f7308f
2 changed files with 25 additions and 1 deletions
|
|
@ -589,4 +589,28 @@ public class FlowTest extends StreamTest {
|
|||
probe.expectMsgAllOf("A","B","C");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToBroadcastEagerCancel() throws Exception {
|
||||
final Sink<String, BoxedUnit> out1 = Sink.cancelled();
|
||||
final Sink<String, ?> out2 = Sink.ignore();
|
||||
|
||||
final Sink<String, BoxedUnit> sink = Sink.factory().<String>create(new Function<FlowGraph.Builder<BoxedUnit>, Inlet<String>>() {
|
||||
@Override
|
||||
public Inlet<String> apply(Builder<BoxedUnit> b) throws Exception {
|
||||
final UniformFanOutShape<String, String> broadcast = b.graph(Broadcast.<String>create(2, true));
|
||||
|
||||
b.from(broadcast.out(0)).to(out1);
|
||||
b.from(broadcast.out(1)).to(out2);
|
||||
return broadcast.in();
|
||||
}
|
||||
});
|
||||
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
Source<String, ActorRef> source = Source.actorRef(1, OverflowStrategy.dropNew());
|
||||
final ActorRef actor = source.toMat(sink, Keep.<ActorRef, BoxedUnit>left()).run(materializer);
|
||||
probe.watch(actor);
|
||||
probe.expectTerminated(actor);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -103,7 +103,7 @@ object Broadcast {
|
|||
* @param eagerCancel if true, broadcast cancels upstream if any of its downstreams cancel.
|
||||
*/
|
||||
def create[T](outputCount: Int, eagerCancel: Boolean): Graph[UniformFanOutShape[T, T], Unit] =
|
||||
scaladsl.Broadcast(outputCount)
|
||||
scaladsl.Broadcast(outputCount, eagerCancel = eagerCancel)
|
||||
|
||||
/**
|
||||
* Create a new `Broadcast` vertex with the specified input type.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue