#19522 make javadsl.Sink.asPublisher nicer

This commit is contained in:
Roland Kuhn 2016-01-20 21:01:27 +01:00
parent b1351b36ed
commit 9427052fd0
12 changed files with 43 additions and 12 deletions

View file

@ -7,6 +7,9 @@ import java.util.stream.Stream;
import akka.japi.Pair;
import akka.stream.javadsl.*;
//#asPublisher-import
import static akka.stream.javadsl.AsPublisher.*;
//#asPublisher-import
public class MigrationsJava {
@ -19,6 +22,11 @@ public class MigrationsJava {
Stream.iterate(new Pair<>(in, 0),
p -> new Pair<>(in, p.second() + 1)).iterator());
//#expand-state
//#asPublisher
Sink.asPublisher(WITH_FANOUT); // instead of Sink.asPublisher(true)
Sink.asPublisher(WITHOUT_FANOUT); // instead of Sink.asPublisher(false)
//#asPublisher
}
}

View file

@ -83,7 +83,7 @@ public class ReactiveStreamsDocTest {
final Fixture.RS rs = new Fixture.RS() {
@Override
public Publisher<Tweet> tweets() {
return TwitterStreamQuickstartDocTest.Model.tweets.runWith(Sink.asPublisher(false), mat);
return TwitterStreamQuickstartDocTest.Model.tweets.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), mat);
}
/**
@ -177,7 +177,9 @@ public class ReactiveStreamsDocTest {
{
//#source-publisher
final Publisher<Author> authorPublisher =
Source.fromPublisher(rs.tweets()).via(authors).runWith(Sink.asPublisher(false), mat);
Source.fromPublisher(rs.tweets())
.via(authors)
.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), mat);
authorPublisher.subscribe(rs.storage());
//#source-publisher
@ -197,7 +199,7 @@ public class ReactiveStreamsDocTest {
final Publisher<Author> authorPublisher =
Source.fromPublisher(rs.tweets())
.via(authors)
.runWith(Sink.asPublisher(true), mat);
.runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), mat);
authorPublisher.subscribe(rs.storage());
authorPublisher.subscribe(rs.alert());