/* * Copyright (C) 2015-2017 Lightbend Inc. */ package jdocs.stream; import akka.NotUsed; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.japi.function.Creator; import akka.stream.*; import akka.stream.javadsl.*; import akka.testkit.TestProbe; import jdocs.AbstractJavaTest; import jdocs.stream.TwitterStreamQuickstartDocTest.Model.Author; import jdocs.stream.TwitterStreamQuickstartDocTest.Model.Tweet; import akka.testkit.javadsl.TestKit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; //#imports import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Processor; //#imports import org.reactivestreams.Subscription; import java.lang.Exception; import static jdocs.stream.ReactiveStreamsDocTest.Fixture.Data.authors; import static jdocs.stream.TwitterStreamQuickstartDocTest.Model.AKKA; public class ReactiveStreamsDocTest extends AbstractJavaTest { static ActorSystem system; static Materializer mat; static TestProbe storageProbe; static TestProbe alertProbe; @BeforeClass public static void setup() { system = ActorSystem.create("ReactiveStreamsDocTest"); mat = ActorMaterializer.create(system); storageProbe = new TestProbe(system); alertProbe = new TestProbe(system); } @AfterClass public static void tearDown() { TestKit.shutdownActorSystem(system); system = null; mat = null; storageProbe = null; alertProbe = null; } static class Fixture { // below class additionally helps with aligning code includes nicely static class Data { static //#authors final Flow authors = Flow.of(Tweet.class) .filter(t -> t.hashtags().contains(AKKA)) .map(t -> t.author); //#authors } static interface RS { //#tweets-publisher Publisher tweets(); //#tweets-publisher //#author-storage-subscriber Subscriber storage(); //#author-storage-subscriber //#author-alert-subscriber Subscriber alert(); //#author-alert-subscriber } } final Fixture.RS rs = new Fixture.RS() { @Override public Publisher tweets() { return TwitterStreamQuickstartDocTest.Model.tweets.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), mat); } /** * This is a minimal version of SubscriberProbe, * which lives in akka-stream-testkit (test scope) and for * now wanted to avoid setting up (test -> compile) dependency for Maven). * * TODO: Once SubscriberProbe is easily used here replace this MPS with it. */ class MinimalProbeSubscriber implements Subscriber { private final ActorRef ref; public MinimalProbeSubscriber(ActorRef ref) { this.ref = ref; } @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(T t) { ref.tell(t, ActorRef.noSender()); } @Override public void onError(Throwable t) { ref.tell(t, ActorRef.noSender()); } @Override public void onComplete() { ref.tell("complete", ActorRef.noSender()); } } @Override public Subscriber storage() { return new MinimalProbeSubscriber<>(storageProbe.ref()); } @Override public Subscriber alert() { return new MinimalProbeSubscriber<>(alertProbe.ref()); } }; @Test public void reactiveStreamsPublisherViaFlowToSubscriber() throws Exception { new TestKit(system) { final TestProbe probe = new TestProbe(system); { //#connect-all Source.fromPublisher(rs.tweets()) .via(authors) .to(Sink.fromSubscriber(rs.storage())); //#connect-all } }; } @Test public void flowAsPublisherAndSubscriber() throws Exception { new TestKit(system) { final TestProbe probe = new TestProbe(system); { //#flow-publisher-subscriber final Processor processor = authors.toProcessor().run(mat); rs.tweets().subscribe(processor); processor.subscribe(rs.storage()); //#flow-publisher-subscriber assertStorageResult(); } }; } @Test public void sourceAsPublisher() throws Exception { new TestKit(system) { final TestProbe probe = new TestProbe(system); { //#source-publisher final Publisher authorPublisher = Source.fromPublisher(rs.tweets()) .via(authors) .runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), mat); authorPublisher.subscribe(rs.storage()); //#source-publisher assertStorageResult(); } }; } @Test public void sourceAsFanoutPublisher() throws Exception { new TestKit(system) { final TestProbe probe = new TestProbe(system); { //#source-fanoutPublisher final Publisher authorPublisher = Source.fromPublisher(rs.tweets()) .via(authors) .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), mat); authorPublisher.subscribe(rs.storage()); authorPublisher.subscribe(rs.alert()); //#source-fanoutPublisher assertStorageResult(); } }; } @Test public void sinkAsSubscriber() throws Exception { new TestKit(system) { final TestProbe probe = new TestProbe(system); { //#sink-subscriber final Subscriber storage = rs.storage(); final Subscriber tweetSubscriber = authors .to(Sink.fromSubscriber(storage)) .runWith(Source.asSubscriber(), mat); rs.tweets().subscribe(tweetSubscriber); //#sink-subscriber assertStorageResult(); } }; } @Test public void useProcessor() throws Exception { new TestKit(system) { { //#use-processor // An example Processor factory final Creator> factory = new Creator>() { public Processor create() { return Flow.of(Integer.class).toProcessor().run(mat); } }; final Flow flow = Flow.fromProcessor(factory); //#use-processor } }; } void assertStorageResult() { storageProbe.expectMsg(new Author("rolandkuhn")); storageProbe.expectMsg(new Author("patriknw")); storageProbe.expectMsg(new Author("bantonsson")); storageProbe.expectMsg(new Author("drewhk")); storageProbe.expectMsg(new Author("ktosopl")); storageProbe.expectMsg(new Author("mmartynas")); storageProbe.expectMsg(new Author("akkateam")); storageProbe.expectMsg("complete"); } }