/** * Copyright (C) 2015-2016 Typesafe */ package docs.stream.javadsl.cookbook; import akka.actor.ActorSystem; import akka.japi.Pair; import akka.stream.*; import akka.stream.javadsl.*; import akka.stream.testkit.TestPublisher; import akka.stream.testkit.TestSubscriber; import akka.stream.testkit.javadsl.TestSink; import akka.stream.testkit.javadsl.TestSource; import akka.testkit.JavaTestKit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import scala.concurrent.duration.FiniteDuration; import java.util.Arrays; import java.util.concurrent.TimeUnit; public class RecipeManualTrigger extends RecipeTest { static ActorSystem system; static Materializer mat; @BeforeClass public static void setup() { system = ActorSystem.create("RecipeManualTrigger"); mat = ActorMaterializer.create(system); } @AfterClass public static void tearDown() { JavaTestKit.shutdownActorSystem(system); system = null; mat = null; } class Trigger { } public final Trigger TRIGGER = new Trigger(); @Test public void zipped() throws Exception { new JavaTestKit(system) { { final Source> triggerSource = TestSource.probe(system); final Sink> messageSink = TestSink.probe(system); //#manually-triggered-stream final RunnableGraph, TestSubscriber.Probe>> g = RunnableGraph., TestSubscriber.Probe>>fromGraph( GraphDSL.create( triggerSource, messageSink, (p, s) -> new Pair<>(p, s), (builder, source, sink) -> { SourceShape elements = builder.add(Source.from(Arrays.asList("1", "2", "3", "4")).map(t -> new Message(t))); FlowShape, Message> takeMessage = builder.add(Flow.>create().map(p -> p.first())); final FanInShape2> zip = builder.add(Zip.create()); builder.from(elements).toInlet(zip.in0()); builder.from(source).toInlet(zip.in1()); builder.from(zip.out()).via(takeMessage).to(sink); return ClosedShape.getInstance(); } ) ); //#manually-triggered-stream Pair, TestSubscriber.Probe> pubSub = g.run(mat); TestPublisher.Probe pub = pubSub.first(); TestSubscriber.Probe sub = pubSub.second(); FiniteDuration timeout = FiniteDuration.create(100, TimeUnit.MILLISECONDS); sub.expectSubscription().request(1000); sub.expectNoMsg(timeout); pub.sendNext(TRIGGER); sub.expectNext(new Message("1")); sub.expectNoMsg(timeout); pub.sendNext(TRIGGER); pub.sendNext(TRIGGER); sub.expectNext(new Message("2")); sub.expectNext(new Message("3")); sub.expectNoMsg(timeout); pub.sendNext(TRIGGER); sub.expectNext(new Message("4")); sub.expectComplete(); } }; } @Test public void zipWith() throws Exception { new JavaTestKit(system) { { final Source> triggerSource = TestSource.probe(system); final Sink> messageSink = TestSink.probe(system); //#manually-triggered-stream-zipwith final RunnableGraph, TestSubscriber.Probe>> g = RunnableGraph., TestSubscriber.Probe>>fromGraph( GraphDSL.create( triggerSource, messageSink, (p, s) -> new Pair<>(p, s), (builder, source, sink) -> { final SourceShape elements = builder.add(Source.from(Arrays.asList("1", "2", "3", "4")).map(t -> new Message(t))); final FanInShape2 zipWith = builder.add(ZipWith.create((msg, trigger) -> msg)); builder.from(elements).toInlet(zipWith.in0()); builder.from(source).toInlet(zipWith.in1()); builder.from(zipWith.out()).to(sink); return ClosedShape.getInstance(); } ) ); //#manually-triggered-stream-zipwith Pair, TestSubscriber.Probe> pubSub = g.run(mat); TestPublisher.Probe pub = pubSub.first(); TestSubscriber.Probe sub = pubSub.second(); FiniteDuration timeout = FiniteDuration.create(100, TimeUnit.MILLISECONDS); sub.expectSubscription().request(1000); sub.expectNoMsg(timeout); pub.sendNext(TRIGGER); sub.expectNext(new Message("1")); sub.expectNoMsg(timeout); pub.sendNext(TRIGGER); pub.sendNext(TRIGGER); sub.expectNext(new Message("2")); sub.expectNext(new Message("3")); sub.expectNoMsg(timeout); pub.sendNext(TRIGGER); sub.expectNext(new Message("4")); sub.expectComplete(); } }; } }