diff --git a/akka-docs/rst/java/code/docs/stream/MigrationsJava.java b/akka-docs/rst/java/code/docs/stream/MigrationsJava.java index 84bf97bf28..1eb8c1b795 100644 --- a/akka-docs/rst/java/code/docs/stream/MigrationsJava.java +++ b/akka-docs/rst/java/code/docs/stream/MigrationsJava.java @@ -7,6 +7,9 @@ import java.util.stream.Stream; import akka.japi.Pair; import akka.stream.javadsl.*; +//#asPublisher-import +import static akka.stream.javadsl.AsPublisher.*; +//#asPublisher-import public class MigrationsJava { @@ -19,6 +22,11 @@ public class MigrationsJava { Stream.iterate(new Pair<>(in, 0), p -> new Pair<>(in, p.second() + 1)).iterator()); //#expand-state + + //#asPublisher + Sink.asPublisher(WITH_FANOUT); // instead of Sink.asPublisher(true) + Sink.asPublisher(WITHOUT_FANOUT); // instead of Sink.asPublisher(false) + //#asPublisher } } \ No newline at end of file diff --git a/akka-docs/rst/java/code/docs/stream/ReactiveStreamsDocTest.java b/akka-docs/rst/java/code/docs/stream/ReactiveStreamsDocTest.java index eeea90c438..542127e999 100644 --- a/akka-docs/rst/java/code/docs/stream/ReactiveStreamsDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/ReactiveStreamsDocTest.java @@ -83,7 +83,7 @@ public class ReactiveStreamsDocTest { final Fixture.RS rs = new Fixture.RS() { @Override public Publisher tweets() { - return TwitterStreamQuickstartDocTest.Model.tweets.runWith(Sink.asPublisher(false), mat); + return TwitterStreamQuickstartDocTest.Model.tweets.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), mat); } /** @@ -177,7 +177,9 @@ public class ReactiveStreamsDocTest { { //#source-publisher final Publisher authorPublisher = - Source.fromPublisher(rs.tweets()).via(authors).runWith(Sink.asPublisher(false), mat); + Source.fromPublisher(rs.tweets()) + .via(authors) + .runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), mat); authorPublisher.subscribe(rs.storage()); //#source-publisher @@ -197,7 +199,7 @@ public class ReactiveStreamsDocTest { final Publisher authorPublisher = Source.fromPublisher(rs.tweets()) .via(authors) - .runWith(Sink.asPublisher(true), mat); + .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), mat); authorPublisher.subscribe(rs.storage()); authorPublisher.subscribe(rs.alert()); diff --git a/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst index bcb5b2db83..cbc4c409ae 100644 --- a/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst +++ b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst @@ -73,3 +73,15 @@ In Akka 2.4.x this is formulated like so: .. includecode:: ../code/docs/stream/MigrationsJava.java#expand-state +Changed Sinks +============= + +Sink.asPublisher is now configured using an enum +------------------------------------------------ + +In order to not use a meaningless boolean parameter we have changed the signature to: + +.. includecode:: ../code/docs/stream/MigrationsJava.java#asPublisher-import + +.. includecode:: ../code/docs/stream/MigrationsJava.java#asPublisher + diff --git a/akka-docs/rst/java/stream/stream-integrations.rst b/akka-docs/rst/java/stream/stream-integrations.rst index 1f7600d961..035d6fd291 100644 --- a/akka-docs/rst/java/stream/stream-integrations.rst +++ b/akka-docs/rst/java/stream/stream-integrations.rst @@ -108,7 +108,7 @@ This is how it can be used as input :class:`Source` to a :class:`Flow`: .. includecode:: ../code/docs/stream/ActorPublisherDocTest.java#actor-publisher-usage You can only attach one subscriber to this publisher. Use a ``Broadcast``-element or -attach a ``Sink.asPublisher(true)`` to enable multiple subscribers. +attach a ``Sink.asPublisher(AsPublisher.WITH_FANOUT)`` to enable multiple subscribers. ActorSubscriber ^^^^^^^^^^^^^^^ @@ -414,7 +414,7 @@ by using the Publisher-:class:`Sink`: .. includecode:: ../code/docs/stream/ReactiveStreamsDocTest.java#source-publisher -A publisher that is created with ``Sink.asPublisher(false)`` supports only a single subscription. +A publisher that is created with ``Sink.asPublisher(AsPublisher.WITHOUT_FANOUT)`` supports only a single subscription. Additional subscription attempts will be rejected with an :class:`IllegalStateException`. A publisher that supports multiple subscribers using fan-out/broadcasting is created as follows: diff --git a/akka-docs/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala index 65eab25877..4ffd9d3bc3 100644 --- a/akka-docs/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala @@ -42,7 +42,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec { val impl = new Fixture { override def tweets: Publisher[Tweet] = - TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.asPublisher(false)) + TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.asPublisher(fanout = false)) override def storage = TestSubscriber.manualProbe[Author] diff --git a/akka-docs/rst/scala/stream/stream-integrations.rst b/akka-docs/rst/scala/stream/stream-integrations.rst index d1a858b113..b710ccda17 100644 --- a/akka-docs/rst/scala/stream/stream-integrations.rst +++ b/akka-docs/rst/scala/stream/stream-integrations.rst @@ -409,7 +409,7 @@ by using the Publisher-:class:`Sink`: .. includecode:: ../code/docs/stream/ReactiveStreamsDocSpec.scala#source-publisher -A publisher that is created with ``Sink.asPublisher(false)`` supports only a single subscription. +A publisher that is created with ``Sink.asPublisher(fanout = false)`` supports only a single subscription. Additional subscription attempts will be rejected with an :class:`IllegalStateException`. A publisher that supports multiple subscribers using fan-out/broadcasting is created as follows: diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java index e8179129a4..9313832c3d 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java @@ -68,7 +68,7 @@ public class FlowGraphTest extends StreamTest { final Source in1 = Source.from(Arrays.asList("a", "b", "c")); final Source in2 = Source.from(Arrays.asList("d", "e", "f")); - final Sink> publisher = Sink.asPublisher(false); + final Sink> publisher = Sink.asPublisher(AsPublisher.WITHOUT_FANOUT); final Source source = Source.fromGraph( GraphDSL.create(new Function, SourceShape>() { diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 846d85013a..324caee47d 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -337,7 +337,7 @@ public class FlowTest extends StreamTest { final Source in1 = Source.from(Arrays.asList("a", "b", "c")); final Source in2 = Source.from(Arrays.asList("d", "e", "f")); - final Sink> publisher = Sink.asPublisher(false); + final Sink> publisher = Sink.asPublisher(AsPublisher.WITHOUT_FANOUT); final Source source = Source.fromGraph( GraphDSL.create(new Function, SourceShape>() { diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java index 2f12b3f7bf..c10a9b0201 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -33,7 +33,7 @@ public class SinkTest extends StreamTest { @Test public void mustBeAbleToUseFanoutPublisher() throws Exception { - final Sink> pubSink = Sink.asPublisher(true); + final Sink> pubSink = Sink.asPublisher(AsPublisher.WITH_FANOUT); @SuppressWarnings("unused") final Publisher publisher = Source.from(new ArrayList()).runWith(pubSink, materializer); } diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala index 8924f4b78c..afad9e1854 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala @@ -30,6 +30,7 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { (classOf[scala.collection.immutable.Iterable[_]], classOf[java.lang.Iterable[_]]) :: (classOf[scala.collection.Iterator[_]], classOf[java.util.Iterator[_]]) :: (classOf[scala.collection.Seq[_]], classOf[java.util.List[_]]) :: + (classOf[Boolean], classOf[akka.stream.javadsl.AsPublisher]) :: (classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) :: (classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) :: (classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) :: diff --git a/akka-stream/src/main/java/akka/stream/javadsl/AsPublisher.java b/akka-stream/src/main/java/akka/stream/javadsl/AsPublisher.java new file mode 100644 index 0000000000..aaba7caca7 --- /dev/null +++ b/akka-stream/src/main/java/akka/stream/javadsl/AsPublisher.java @@ -0,0 +1,8 @@ +/** + * Copyright (C) 2016 Typesafe Inc. + */ +package akka.stream.javadsl; + +public enum AsPublisher { + WITH_FANOUT, WITHOUT_FANOUT +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 6cb19738f9..fb1bc07461 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -58,8 +58,8 @@ object Sink { * If `fanout` is `false` then the materialized `Publisher` will only support a single `Subscriber` and * reject any additional `Subscriber`s. */ - def asPublisher[T](fanout: Boolean): Sink[T, Publisher[T]] = - new Sink(scaladsl.Sink.asPublisher(fanout)) + def asPublisher[T](fanout: AsPublisher): Sink[T, Publisher[T]] = + new Sink(scaladsl.Sink.asPublisher(fanout == AsPublisher.WITH_FANOUT)) /** * A `Sink` that will invoke the given procedure for each received element. The sink is materialized