diff --git a/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst b/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst index 2a5ae42302..5cfd4e4dd7 100644 --- a/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst +++ b/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst @@ -287,6 +287,23 @@ should be replaced by .. includecode:: code/docs/MigrationsJava.java#flatMapConcat +`Sink.fanoutPublisher() and Sink.publisher() is now a single method` +==================================================================== + +It was a common user mistake to use ``Sink.publisher`` and get into trouble since it would only support +a single ``Subscriber``, and the discoverability of the apprpriate fix was non-obvious (Sink.fanoutPublisher). +To make the decision whether to support fanout or not an active one, the aforementioned methods have been +replaced with a single method: ``Sink.publisher(fanout: Boolean)``. + +Update procedure +---------------- + +1. Replace all occurences of ``Sink.publisher`` with ``Sink.publisher(false)`` +2. Replace all occurences of ``Sink.fanoutPublisher`` with ``Sink.publisher(true)`` + +TODO: code example + + FlexiMerge an FlexiRoute has been replaced by GraphStage ======================================================== diff --git a/akka-docs-dev/rst/java/stream-integrations.rst b/akka-docs-dev/rst/java/stream-integrations.rst index 511ba65d22..5284de1dd0 100644 --- a/akka-docs-dev/rst/java/stream-integrations.rst +++ b/akka-docs-dev/rst/java/stream-integrations.rst @@ -107,8 +107,8 @@ This is how it can be used as input :class:`Source` to a :class:`Flow`: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/ActorPublisherDocTest.java#actor-publisher-usage -You can only attach one subscriber to this publisher. Increase the max number of subscribers parameter or use a `Broadcast` element -in order to support multiple subscribers. +You can only attach one subscriber to this publisher. Use a ``Broadcast``-element or +attach a ``Sink.publisher(true)`` to enable multiple subscribers. ActorSubscriber ^^^^^^^^^^^^^^^ @@ -414,10 +414,10 @@ by using the Publisher-:class:`Sink`: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/ReactiveStreamsDocTest.java#source-publisher -A publisher that is created with ``Sink.publisher`` supports a specified number of subscribers. Additional -subscription attempts will be rejected with an :class:`IllegalStateException`. +A publisher that is created with ``Sink.publisher(false)`` supports only a single subscription. +Additional subscription attempts will be rejected with an :class:`IllegalStateException`. -A publisher that supports multiple subscribers is created as follows: +A publisher that supports multiple subscribers using fan-out/broadcasting is created as follows: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/ReactiveStreamsDocTest.java :include: author-alert-subscriber,author-storage-subscriber diff --git a/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala index f15251c045..800e1aff4c 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala @@ -41,7 +41,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec { val impl = new Fixture { override def tweets: Publisher[Tweet] = - TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.publisher(1)) + TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.publisher(false)) override def storage = TestSubscriber.manualProbe[Author] @@ -92,7 +92,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec { //#source-publisher val authorPublisher: Publisher[Author] = - Source(tweets).via(authors).runWith(Sink.publisher(1)) + Source(tweets).via(authors).runWith(Sink.publisher(fanout = false)) authorPublisher.subscribe(storage) //#source-publisher @@ -108,7 +108,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec { //#source-fanoutPublisher val authorPublisher: Publisher[Author] = Source(tweets).via(authors) - .runWith(Sink.publisher(maxNumberOfSubscribers = Int.MaxValue)) + .runWith(Sink.publisher(fanout = true)) authorPublisher.subscribe(storage) authorPublisher.subscribe(alert) diff --git a/akka-docs-dev/rst/scala/stream-integrations.rst b/akka-docs-dev/rst/scala/stream-integrations.rst index 4d969a7991..9e541c096d 100644 --- a/akka-docs-dev/rst/scala/stream-integrations.rst +++ b/akka-docs-dev/rst/scala/stream-integrations.rst @@ -409,10 +409,10 @@ by using the Publisher-:class:`Sink`: .. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#source-publisher -A publisher that is created with ``Sink.publisher`` only supports one subscriber. A second -subscription attempt will be rejected with an :class:`IllegalStateException`. +A publisher that is created with ``Sink.publisher(false)`` supports only a single subscription. +Additional subscription attempts will be rejected with an :class:`IllegalStateException`. -A publisher that supports multiple subscribers is created as follows: +A publisher that supports multiple subscribers using fan-out/broadcasting is created as follows: .. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala :include: author-alert-subscriber,author-storage-subscriber diff --git a/akka-docs-dev/rst/stream-design.rst b/akka-docs-dev/rst/stream-design.rst index 17ac52d9f1..2336be2429 100644 --- a/akka-docs-dev/rst/stream-design.rst +++ b/akka-docs-dev/rst/stream-design.rst @@ -38,7 +38,7 @@ Akka Streams fully implement the Reactive Streams specification and interoperate All stream Processors produced by the default materialization of Akka Streams are restricted to having a single Subscriber, additional Subscribers will be rejected. The reason for this is that the stream topologies described using our DSL never require fan-out behavior from the Publisher sides of the elements, all fan-out is done using explicit elements like :class:`Broadcast[T]`. -This means that ``Sink.publisher()`` must be used where broadcast behavior is needed for interoperation with other Reactive Streams implementations. +This means that ``Sink.publisher(true)`` must be used where broadcast behavior is needed for interoperation with other Reactive Streams implementations. What shall users of streaming libraries expect? ----------------------------------------------- diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala index 047107af8f..1aae759a8d 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala @@ -159,11 +159,11 @@ private[http] object StreamUtils { case Nil ⇒ Nil case Seq(one) ⇒ Vector(input.via(one)) case multiple ⇒ - val (fanoutSub, fanoutPub) = Source.subscriber[ByteString].toMat(Sink.publisher(transformers.size))(Keep.both).run() + val (fanoutSub, fanoutPub) = Source.subscriber[ByteString].toMat(Sink.publisher(true))(Keep.both).run() val sources = transformers.map { flow ⇒ // Doubly wrap to ensure that subscription to the running publisher happens before the final sources // are exposed, so there is no race - Source(Source(fanoutPub).viaMat(flow)(Keep.right).runWith(Sink.publisher(1))) + Source(Source(fanoutPub).viaMat(flow)(Keep.right).runWith(Sink.publisher(false))) } // The fanout publisher must be wired to the original source after all fanout subscribers have been subscribed input.runWith(Sink(fanoutSub)) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index 2d07ca635a..a4b93dcbc0 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -406,7 +406,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { def acceptConnection(): (TestSubscriber.ManualProbe[HttpRequest], TestPublisher.ManualProbe[HttpResponse]) = { connSourceSub.request(1) val incomingConnection = connSource.expectNext() - val sink = Sink.publisher[HttpRequest](1) + val sink = Sink.publisher[HttpRequest](false) val source = Source.subscriber[HttpResponse] val handler = Flow.fromSinkAndSourceMat(sink, source)(Keep.both) diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/ConcatTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/ConcatTest.scala index 63cbec91b8..fda7efcd7b 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/ConcatTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/ConcatTest.scala @@ -10,7 +10,7 @@ import org.reactivestreams.Publisher class ConcatTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { - Source(iterable(elements / 2)).concat(Source(iterable((elements + 1) / 2))).runWith(Sink.publisher(1)) + Source(iterable(elements / 2)).concat(Source(iterable((elements + 1) / 2))).runWith(Sink.publisher(false)) } } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FanoutPublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FanoutPublisherTest.scala index 10451a76f2..1668a9a31c 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FanoutPublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FanoutPublisherTest.scala @@ -15,7 +15,7 @@ class FanoutPublisherTest extends AkkaPublisherVerification[Int] { if (elements == 0) new immutable.Iterable[Int] { override def iterator = Iterator from 0 } else 0 until elements.toInt - Source(iterable).runWith(Sink.publisher(Int.MaxValue)) + Source(iterable).runWith(Sink.publisher(true)) } } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala index e0e611b12c..5257cd018c 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala @@ -13,7 +13,7 @@ class FlattenTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { val s1 = Source(iterable(elements / 2)) val s2 = Source(iterable((elements + 1) / 2)) - Source(List(s1, s2)).flatMapConcat(ConstantFun.scalaIdentityFunction).runWith(Sink.publisher(1)) + Source(List(s1, s2)).flatMapConcat(ConstantFun.scalaIdentityFunction).runWith(Sink.publisher(false)) } } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala index 2260228b57..7c72dc7d88 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala @@ -13,7 +13,7 @@ class FuturePublisherTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { val p = Promise[Int]() - val pub = Source(p.future).runWith(Sink.publisher(1)) + val pub = Source(p.future).runWith(Sink.publisher(false)) p.success(0) pub } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/GroupByTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/GroupByTest.scala index cfec442b9c..61e6e6ea58 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/GroupByTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/GroupByTest.scala @@ -19,7 +19,7 @@ class GroupByTest extends AkkaPublisherVerification[Int] { val futureGroupSource = Source(iterable(elements)).groupBy(elem ⇒ "all").map { case (_, group) ⇒ group }.runWith(Sink.head) val groupSource = Await.result(futureGroupSource, 3.seconds) - groupSource.runWith(Sink.publisher(1)) + groupSource.runWith(Sink.publisher(false)) } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala index 1503b30344..ec107f078c 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala @@ -11,7 +11,7 @@ import org.reactivestreams._ class IterablePublisherTest extends AkkaPublisherVerification[Int] { override def createPublisher(elements: Long): Publisher[Int] = { - Source(iterable(elements)).runWith(Sink.publisher(1)) + Source(iterable(elements)).runWith(Sink.publisher(false)) } } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/MaybeSourceTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/MaybeSourceTest.scala index 6d1e97bfdd..bc02853256 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/MaybeSourceTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/MaybeSourceTest.scala @@ -10,7 +10,7 @@ import akka.stream.scaladsl.{ Keep, Source, Sink } class MaybeSourceTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { - val (p, pub) = Source.maybe[Int].toMat(Sink.publisher(1))(Keep.both).run() + val (p, pub) = Source.maybe[Int].toMat(Sink.publisher(false))(Keep.both).run() p success Some(1) pub } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/PrefixAndTailTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/PrefixAndTailTest.scala index 826a753808..0262ccc6dd 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/PrefixAndTailTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/PrefixAndTailTest.scala @@ -15,7 +15,7 @@ class PrefixAndTailTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { val futureTailSource = Source(iterable(elements)).prefixAndTail(0).map { case (_, tail) ⇒ tail }.runWith(Sink.head) val tailSource = Await.result(futureTailSource, 3.seconds) - tailSource.runWith(Sink.publisher(1)) + tailSource.runWith(Sink.publisher(false)) } } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementPublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementPublisherTest.scala index 54b3ca4502..32201ca5b1 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementPublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementPublisherTest.scala @@ -13,7 +13,7 @@ import org.reactivestreams._ class SingleElementPublisherTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { - Source(SingleElementPublisher(0, "single-element-publisher")).runWith(Sink.publisher(1)) + Source(SingleElementPublisher(0, "single-element-publisher")).runWith(Sink.publisher(false)) } override def maxElementsFromPublisher(): Long = 1 diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementSourceTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementSourceTest.scala index 62bb35dcce..4267e602a4 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementSourceTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementSourceTest.scala @@ -11,7 +11,7 @@ import org.reactivestreams.Publisher class SingleElementSourceTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = - Source.single(1).runWith(Sink.publisher(1)) + Source.single(1).runWith(Sink.publisher(false)) override def maxElementsFromPublisher(): Long = 1 } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SplitWhenTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SplitWhenTest.scala index 333d99c7c7..9004943776 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/SplitWhenTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SplitWhenTest.scala @@ -18,7 +18,7 @@ class SplitWhenTest extends AkkaPublisherVerification[Int] { else { val futureSource = Source(iterable(elements)).splitWhen(elem ⇒ false).runWith(Sink.head) val source = Await.result(futureSource, 3.seconds) - source.runWith(Sink.publisher(1)) + source.runWith(Sink.publisher(false)) } } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SynchronousFilePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SynchronousFilePublisherTest.scala index 371967795d..2fa3b857e8 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/SynchronousFilePublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SynchronousFilePublisherTest.scala @@ -39,7 +39,7 @@ class SynchronousFilePublisherTest extends AkkaPublisherVerification[ByteString] def createPublisher(elements: Long): Publisher[ByteString] = SynchronousFileSource(file, chunkSize = 512) .take(elements) - .runWith(Sink.publisher(1)) + .runWith(Sink.publisher(false)) @AfterClass def after = file.delete() diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/BaseTwoStreamsSetup.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/BaseTwoStreamsSetup.scala index 0abd1aa2db..72e36764d7 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/BaseTwoStreamsSetup.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/BaseTwoStreamsSetup.scala @@ -27,7 +27,7 @@ abstract class BaseTwoStreamsSetup extends AkkaSpec { def completedPublisher[T]: Publisher[T] = TestPublisher.empty[T] - def nonemptyPublisher[T](elems: immutable.Iterable[T]): Publisher[T] = Source(elems).runWith(Sink.publisher(1)) + def nonemptyPublisher[T](elems: immutable.Iterable[T]): Publisher[T] = Source(elems).runWith(Sink.publisher(false)) def soonToFailPublisher[T]: Publisher[T] = TestPublisher.lazyError[T](TestException) diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/ScriptedTest.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/ScriptedTest.scala index ccfbe430a2..e2b0e45d5a 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/ScriptedTest.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/ScriptedTest.scala @@ -20,7 +20,7 @@ trait ScriptedTest extends Matchers { class ScriptException(msg: String) extends RuntimeException(msg) def toPublisher[In, Out]: (Source[Out, _], ActorMaterializer) ⇒ Publisher[Out] = - (f, m) ⇒ f.runWith(Sink.publisher(1))(m) + (f, m) ⇒ f.runWith(Sink.publisher(false))(m) object Script { def apply[In, Out](phases: (Seq[In], Seq[Out])*): Script[In, Out] = { diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala index 6e9cf406fd..f500d6c45b 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala @@ -22,7 +22,7 @@ class TestPublisherSubscriberSpec extends AkkaSpec { "have all events accessible from manual probes" in assertAllStagesStopped { val upstream = TestPublisher.manualProbe[Int]() val downstream = TestSubscriber.manualProbe[Int]() - Source(upstream).runWith(Sink.publisher(1))(materializer).subscribe(downstream) + Source(upstream).runWith(Sink.publisher(false))(materializer).subscribe(downstream) val upstreamSubscription = upstream.expectSubscription() val downstreamSubscription: Subscription = downstream.expectEventPF { case OnSubscribe(sub) ⇒ sub } @@ -46,7 +46,7 @@ class TestPublisherSubscriberSpec extends AkkaSpec { "handle gracefully partial function that is not suitable" in assertAllStagesStopped { val upstream = TestPublisher.manualProbe[Int]() val downstream = TestSubscriber.manualProbe[Int]() - Source(upstream).runWith(Sink.publisher(1))(materializer).subscribe(downstream) + Source(upstream).runWith(Sink.publisher(false))(materializer).subscribe(downstream) val upstreamSubscription = upstream.expectSubscription() val downstreamSubscription: Subscription = downstream.expectEventPF { case OnSubscribe(sub) ⇒ sub } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java index 85fc779e4c..337775e5ac 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java @@ -67,7 +67,7 @@ public class FlowGraphTest extends StreamTest { final Source in1 = Source.from(Arrays.asList("a", "b", "c")); final Source in2 = Source.from(Arrays.asList("d", "e", "f")); - final Sink> publisher = Sink.publisher(1); + final Sink> publisher = Sink.publisher(false); final Source source = Source.fromGraph( FlowGraph.create(new Function, SourceShape>() { diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 5c63bf649c..75cb2cf011 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -387,7 +387,7 @@ public class FlowTest extends StreamTest { final Source in1 = Source.from(Arrays.asList("a", "b", "c")); final Source in2 = Source.from(Arrays.asList("d", "e", "f")); - final Sink> publisher = Sink.publisher(1); + final Sink> publisher = Sink.publisher(false); final Source source = Source.fromGraph( FlowGraph.create(new Function, SourceShape>() { diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java index 563fc006f1..4cc982c71c 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -39,7 +39,7 @@ public class SinkTest extends StreamTest { @Test public void mustBeAbleToUseFanoutPublisher() throws Exception { - final Sink> pubSink = Sink.publisher(Integer.MAX_VALUE); + final Sink> pubSink = Sink.publisher(true); @SuppressWarnings("unused") final Publisher publisher = Source.from(new ArrayList()).runWith(pubSink, materializer); } diff --git a/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala index c4129ee538..3c08de4171 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala @@ -104,12 +104,12 @@ class FlowTimedSpec extends AkkaSpec with ScriptedTest { map(_.toString), duration ⇒ probe.ref ! duration). map { s: String ⇒ s + "!" } - val (flowIn: Subscriber[Int], flowOut: Publisher[String]) = flow.runWith(Source.subscriber[Int], Sink.publisher[String](1)) + val (flowIn: Subscriber[Int], flowOut: Publisher[String]) = flow.runWith(Source.subscriber[Int], Sink.publisher[String](false)) val c1 = TestSubscriber.manualProbe[String]() val c2 = flowOut.subscribe(c1) - val p = Source(0 to 100).runWith(Sink.publisher(1)) + val p = Source(0 to 100).runWith(Sink.publisher(false)) p.subscribe(flowIn) val s = c1.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala index 1a3dd2a37d..e2ff970263 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala @@ -76,7 +76,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { val p = SynchronousFileSource(testFile, chunkSize) .withAttributes(bufferAttributes) - .runWith(Sink.publisher(1)) + .runWith(Sink.publisher(false)) val c = TestSubscriber.manualProbe[ByteString]() p.subscribe(c) val sub = c.expectSubscription() @@ -113,7 +113,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { val p = SynchronousFileSource(testFile, chunkSize) .withAttributes(bufferAttributes) - .runWith(Sink.publisher(1)) + .runWith(Sink.publisher(false)) val c = TestSubscriber.manualProbe[ByteString]() p.subscribe(c) @@ -140,7 +140,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { } "onError whent trying to read from file which does not exist" in assertAllStagesStopped { - val p = SynchronousFileSource(notExistingFile).runWith(Sink.publisher(1)) + val p = SynchronousFileSource(notExistingFile).runWith(Sink.publisher(false)) val c = TestSubscriber.manualProbe[ByteString]() p.subscribe(c) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala index acbc81329d..3b03852a34 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala @@ -41,15 +41,15 @@ class FlowCompileSpec extends AkkaSpec { val closedSource: Source[Int, _] = intSeq.via(open3) "closedSource.run()" shouldNot compile - val closedSink: Sink[Int, _] = open3.to(Sink.publisher[Int](1)) + val closedSink: Sink[Int, _] = open3.to(Sink.publisher[Int](false)) "closedSink.run()" shouldNot compile - closedSource.to(Sink.publisher[Int](1)).run() + closedSource.to(Sink.publisher[Int](false)).run() intSeq.to(closedSink).run() } "append Sink" in { val open: Flow[Int, String, _] = Flow[Int].map(_.toString) - val closedSink: Sink[String, _] = Flow[String].map(_.hashCode).to(Sink.publisher[Int](1)) + val closedSink: Sink[String, _] = Flow[String].map(_.hashCode).to(Sink.publisher[Int](false)) val appended: Sink[Int, _] = open.to(closedSink) "appended.run()" shouldNot compile "appended.connect(Sink.head[Int])" shouldNot compile @@ -61,13 +61,13 @@ class FlowCompileSpec extends AkkaSpec { val closedSource2: Source[String, _] = closedSource.via(open) "closedSource2.run()" shouldNot compile "strSeq.connect(closedSource2)" shouldNot compile - closedSource2.to(Sink.publisher[String](1)).run + closedSource2.to(Sink.publisher[String](false)).run } } "Sink" should { val openSource: Sink[Int, _] = - Flow[Int].map(_.toString).to(Sink.publisher[String](1)) + Flow[Int].map(_.toString).to(Sink.publisher[String](false)) "accept Source" in { intSeq.to(openSource) } @@ -83,7 +83,7 @@ class FlowCompileSpec extends AkkaSpec { val openSource: Source[String, _] = Source(Seq(1, 2, 3)).map(_.toString) "accept Sink" in { - openSource.to(Sink.publisher[String](1)) + openSource.to(Sink.publisher[String](false)) } "not be accepted by Source" in { "openSource.connect(intSeq)" shouldNot compile @@ -96,7 +96,7 @@ class FlowCompileSpec extends AkkaSpec { "RunnableGraph" should { Sink.head[String] val closed: RunnableGraph[Publisher[String]] = - Source(Seq(1, 2, 3)).map(_.toString).toMat(Sink.publisher[String](1))(Keep.right) + Source(Seq(1, 2, 3)).map(_.toString).toMat(Sink.publisher[String](false))(Keep.right) "run" in { closed.run() } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala index c9c7b2b9f2..1cf7669d99 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala @@ -29,7 +29,7 @@ class FlowConcatSpec extends BaseTwoStreamsSetup { val s2: Source[String, _] = Source(List(4, 5, 6)).map(_.toString + "-s") val subs = TestSubscriber.manualProbe[Any]() - val subSink = Sink.publisher[Any](1) + val subSink = Sink.publisher[Any](false) val (_, res) = f1.concat(s2).runWith(s1, subSink) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala index d5f7d64222..16353457b6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala @@ -21,7 +21,7 @@ class FlowFromFutureSpec extends AkkaSpec { "A Flow based on a Future" must { "produce one element from already successful Future" in assertAllStagesStopped { val c = TestSubscriber.manualProbe[Int]() - val p = Source(Future.successful(1)).runWith(Sink.publisher(Int.MaxValue)).subscribe(c) + val p = Source(Future.successful(1)).runWith(Sink.publisher(true)).subscribe(c) val sub = c.expectSubscription() c.expectNoMsg(100.millis) sub.request(1) @@ -32,14 +32,14 @@ class FlowFromFutureSpec extends AkkaSpec { "produce error from already failed Future" in assertAllStagesStopped { val ex = new RuntimeException("test") with NoStackTrace val c = TestSubscriber.manualProbe[Int]() - Source(Future.failed[Int](ex)).runWith(Sink.publisher(1)).subscribe(c) + Source(Future.failed[Int](ex)).runWith(Sink.publisher(false)).subscribe(c) c.expectSubscriptionAndError(ex) } "produce one element when Future is completed" in assertAllStagesStopped { val promise = Promise[Int]() val c = TestSubscriber.manualProbe[Int]() - Source(promise.future).runWith(Sink.publisher(Int.MaxValue)).subscribe(c) + Source(promise.future).runWith(Sink.publisher(true)).subscribe(c) val sub = c.expectSubscription() sub.request(1) c.expectNoMsg(100.millis) @@ -52,7 +52,7 @@ class FlowFromFutureSpec extends AkkaSpec { "produce one element when Future is completed but not before request" in { val promise = Promise[Int]() val c = TestSubscriber.manualProbe[Int]() - Source(promise.future).runWith(Sink.publisher(Int.MaxValue)).subscribe(c) + Source(promise.future).runWith(Sink.publisher(true)).subscribe(c) val sub = c.expectSubscription() promise.success(1) c.expectNoMsg(200.millis) @@ -63,7 +63,7 @@ class FlowFromFutureSpec extends AkkaSpec { "produce elements with multiple subscribers" in assertAllStagesStopped { val promise = Promise[Int]() - val p = Source(promise.future).runWith(Sink.publisher(Int.MaxValue)) + val p = Source(promise.future).runWith(Sink.publisher(true)) val c1 = TestSubscriber.manualProbe[Int]() val c2 = TestSubscriber.manualProbe[Int]() p.subscribe(c1) @@ -81,7 +81,7 @@ class FlowFromFutureSpec extends AkkaSpec { "allow cancel before receiving element" in { val promise = Promise[Int]() - val p = Source(promise.future).runWith(Sink.publisher(Int.MaxValue)) + val p = Source(promise.future).runWith(Sink.publisher(true)) val keepAlive = TestSubscriber.manualProbe[Int]() val c = TestSubscriber.manualProbe[Int]() p.subscribe(keepAlive) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala index 7647040a18..20ee5ef0c6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala @@ -36,7 +36,7 @@ class FlowGraphCompileSpec extends AkkaSpec { val in1 = Source(List("a", "b", "c")) val in2 = Source(List("d", "e", "f")) - val out1 = Sink.publisher[String](1) + val out1 = Sink.publisher[String](false) val out2 = Sink.head[String] "A Graph" should { @@ -165,9 +165,9 @@ class FlowGraphCompileSpec extends AkkaSpec { val in3 = Source(List("b")) val in5 = Source(List("b")) val in7 = Source(List("a")) - val out2 = Sink.publisher[String](1) - val out9 = Sink.publisher[String](1) - val out10 = Sink.publisher[String](1) + val out2 = Sink.publisher[String](false) + val out9 = Sink.publisher[String](false) + val out10 = Sink.publisher[String](false) def f(s: String) = Flow[String].transform(op[String, String]).named(s) import FlowGraph.Implicits._ @@ -198,7 +198,7 @@ class FlowGraphCompileSpec extends AkkaSpec { RunnableGraph.fromGraph(FlowGraph.create() { implicit b ⇒ val zip = b.add(Zip[Int, String]()) val unzip = b.add(Unzip[Int, String]()) - val out = Sink.publisher[(Int, String)](1) + val out = Sink.publisher[(Int, String)](false) import FlowGraph.Implicits._ Source(List(1 -> "a", 2 -> "b", 3 -> "c")) ~> unzip.in unzip.out0 ~> Flow[Int].map(_ * 2) ~> zip.in0 @@ -213,8 +213,8 @@ class FlowGraphCompileSpec extends AkkaSpec { RunnableGraph.fromGraph(FlowGraph.create() { implicit b ⇒ val zip = b.add(Zip[Int, String]()) val unzip = b.add(Unzip[Int, String]()) - val wrongOut = Sink.publisher[(Int, Int)](1) - val whatever = Sink.publisher[Any](1) + val wrongOut = Sink.publisher[(Int, Int)](false) + val whatever = Sink.publisher[Any](false) "Flow(List(1, 2, 3)) ~> zip.left ~> wrongOut" shouldNot compile """Flow(List("a", "b", "c")) ~> zip.left""" shouldNot compile """Flow(List("a", "b", "c")) ~> zip.out""" shouldNot compile @@ -278,7 +278,7 @@ class FlowGraphCompileSpec extends AkkaSpec { val outB = b add Sink(TestSubscriber.manualProbe[Fruit]()) val merge = b add Merge[Fruit](11) val unzip = b add Unzip[Int, String]() - val whatever = b add Sink.publisher[Any](1) + val whatever = b add Sink.publisher[Any](false) import FlowGraph.Implicits._ b.add(Source[Fruit](apples)) ~> merge.in(0) appleSource ~> merge.in(1) @@ -293,12 +293,12 @@ class FlowGraphCompileSpec extends AkkaSpec { b.add(Source(apples)) ~> Flow[Apple] ~> merge.in(9) b.add(Source(apples)) ~> Flow[Apple] ~> outB - b.add(Source(apples)) ~> Flow[Apple] ~> b.add(Sink.publisher[Fruit](1)) + b.add(Source(apples)) ~> Flow[Apple] ~> b.add(Sink.publisher[Fruit](false)) appleSource ~> Flow[Apple] ~> merge.in(10) Source(List(1 -> "a", 2 -> "b", 3 -> "c")) ~> unzip.in unzip.out1 ~> whatever - unzip.out0 ~> b.add(Sink.publisher[Any](1)) + unzip.out0 ~> b.add(Sink.publisher[Any](false)) "merge.out ~> b.add(Broadcast[Apple](2))" shouldNot compile "merge.out ~> Flow[Fruit].map(identity) ~> b.add(Broadcast[Apple](2))" shouldNot compile diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index 2c0d38db78..807dc9075c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -35,8 +35,8 @@ class FlowGroupBySpec extends AkkaSpec { } class SubstreamsSupport(groupCount: Int = 2, elementCount: Int = 6) { - val source = Source(1 to elementCount).runWith(Sink.publisher(1)) - val groupStream = Source(source).groupBy(_ % groupCount).runWith(Sink.publisher(1)) + val source = Source(1 to elementCount).runWith(Sink.publisher(false)) + val groupStream = Source(source).groupBy(_ % groupCount).runWith(Sink.publisher(false)) val masterSubscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]() groupStream.subscribe(masterSubscriber) @@ -58,7 +58,7 @@ class FlowGroupBySpec extends AkkaSpec { "groupBy" must { "work in the happy case" in assertAllStagesStopped { new SubstreamsSupport(groupCount = 2) { - val s1 = StreamPuppet(getSubFlow(1).runWith(Sink.publisher(1))) + val s1 = StreamPuppet(getSubFlow(1).runWith(Sink.publisher(false))) masterSubscriber.expectNoMsg(100.millis) s1.expectNoMsg(100.millis) @@ -66,7 +66,7 @@ class FlowGroupBySpec extends AkkaSpec { s1.expectNext(1) s1.expectNoMsg(100.millis) - val s2 = StreamPuppet(getSubFlow(0).runWith(Sink.publisher(1))) + val s2 = StreamPuppet(getSubFlow(0).runWith(Sink.publisher(false))) s2.expectNoMsg(100.millis) s2.request(2) @@ -95,9 +95,9 @@ class FlowGroupBySpec extends AkkaSpec { "accept cancellation of substreams" in assertAllStagesStopped { new SubstreamsSupport(groupCount = 2) { - StreamPuppet(getSubFlow(1).runWith(Sink.publisher(1))).cancel() + StreamPuppet(getSubFlow(1).runWith(Sink.publisher(false))).cancel() - val substream = StreamPuppet(getSubFlow(0).runWith(Sink.publisher(1))) + val substream = StreamPuppet(getSubFlow(0).runWith(Sink.publisher(false))) substream.request(2) substream.expectNext(2) substream.expectNext(4) @@ -113,7 +113,7 @@ class FlowGroupBySpec extends AkkaSpec { "accept cancellation of master stream when not consumed anything" in assertAllStagesStopped { val publisherProbeProbe = TestPublisher.manualProbe[Int]() - val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher(1)) + val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher(false)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -125,7 +125,7 @@ class FlowGroupBySpec extends AkkaSpec { "accept cancellation of master stream when substreams are open" in assertAllStagesStopped { new SubstreamsSupport(groupCount = 3, elementCount = 13) { - val substream = StreamPuppet(getSubFlow(1).runWith(Sink.publisher(1))) + val substream = StreamPuppet(getSubFlow(1).runWith(Sink.publisher(false))) substream.request(1) substream.expectNext(1) @@ -144,7 +144,7 @@ class FlowGroupBySpec extends AkkaSpec { } "work with empty input stream" in assertAllStagesStopped { - val publisher = Source(List.empty[Int]).groupBy(_ % 2).runWith(Sink.publisher(1)) + val publisher = Source(List.empty[Int]).groupBy(_ % 2).runWith(Sink.publisher(false)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -153,7 +153,7 @@ class FlowGroupBySpec extends AkkaSpec { "abort on onError from upstream" in assertAllStagesStopped { val publisherProbeProbe = TestPublisher.manualProbe[Int]() - val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher(1)) + val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher(false)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -170,7 +170,7 @@ class FlowGroupBySpec extends AkkaSpec { "abort on onError from upstream when substreams are running" in assertAllStagesStopped { val publisherProbeProbe = TestPublisher.manualProbe[Int]() - val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher(1)) + val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher(false)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -182,7 +182,7 @@ class FlowGroupBySpec extends AkkaSpec { upstreamSubscription.sendNext(1) val (_, substream) = subscriber.expectNext() - val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher(1))) + val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher(false))) substreamPuppet.request(1) substreamPuppet.expectNext(1) @@ -200,7 +200,7 @@ class FlowGroupBySpec extends AkkaSpec { val exc = TE("test") val publisher = Source(publisherProbeProbe) .groupBy(elem ⇒ if (elem == 2) throw exc else elem % 2) - .runWith(Sink.publisher(1)) + .runWith(Sink.publisher(false)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, Unit])]() publisher.subscribe(subscriber) @@ -212,7 +212,7 @@ class FlowGroupBySpec extends AkkaSpec { upstreamSubscription.sendNext(1) val (_, substream) = subscriber.expectNext() - val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher(1))) + val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher(false))) substreamPuppet.request(1) substreamPuppet.expectNext(1) @@ -230,7 +230,7 @@ class FlowGroupBySpec extends AkkaSpec { val publisher = Source(publisherProbeProbe) .groupBy(elem ⇒ if (elem == 2) throw exc else elem % 2) .withAttributes(ActorAttributes.supervisionStrategy(resumingDecider)) - .runWith(Sink.publisher(1)) + .runWith(Sink.publisher(false)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, Unit])]() publisher.subscribe(subscriber) @@ -242,7 +242,7 @@ class FlowGroupBySpec extends AkkaSpec { upstreamSubscription.sendNext(1) val (_, substream1) = subscriber.expectNext() - val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher(1))) + val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher(false))) substreamPuppet1.request(10) substreamPuppet1.expectNext(1) @@ -250,7 +250,7 @@ class FlowGroupBySpec extends AkkaSpec { upstreamSubscription.sendNext(4) val (_, substream2) = subscriber.expectNext() - val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher(1))) + val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher(false))) substreamPuppet2.request(10) substreamPuppet2.expectNext(4) // note that 2 was dropped diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala index 27ea52d7f6..d87bedc551 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala @@ -31,7 +31,7 @@ class FlowIterableSpec extends AbstractFlowIteratorSpec { override def iterator: Iterator[Int] = (1 to 3).iterator.map(x ⇒ if (x == 2) throw new IllegalStateException("not two") else x) } - val p = Source(iterable).runWith(Sink.publisher(1)) + val p = Source(iterable).runWith(Sink.publisher(false)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -48,7 +48,7 @@ class FlowIterableSpec extends AbstractFlowIteratorSpec { val iterable = new immutable.Iterable[Int] { override def iterator: Iterator[Int] = throw new IllegalStateException("no good iterator") } - val p = Source(iterable).runWith(Sink.publisher(1)) + val p = Source(iterable).runWith(Sink.publisher(false)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) c.expectSubscriptionAndError().getMessage should be("no good iterator") @@ -62,7 +62,7 @@ class FlowIterableSpec extends AbstractFlowIteratorSpec { override def next(): Int = -1 } } - val p = Source(iterable).runWith(Sink.publisher(1)) + val p = Source(iterable).runWith(Sink.publisher(false)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) c.expectSubscriptionAndError().getMessage should be("no next") @@ -84,7 +84,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { testName must { "produce elements" in assertAllStagesStopped { - val p = createSource(3).runWith(Sink.publisher(1)) + val p = createSource(3).runWith(Sink.publisher(false)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -98,7 +98,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { } "complete empty" in assertAllStagesStopped { - val p = createSource(0).runWith(Sink.publisher(1)) + val p = createSource(0).runWith(Sink.publisher(false)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) c.expectSubscriptionAndComplete() @@ -106,7 +106,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { } "produce elements with multiple subscribers" in assertAllStagesStopped { - val p = createSource(3).runWith(Sink.publisher(Int.MaxValue)) + val p = createSource(3).runWith(Sink.publisher(true)) val c1 = TestSubscriber.manualProbe[Int]() val c2 = TestSubscriber.manualProbe[Int]() p.subscribe(c1) @@ -130,7 +130,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { } "produce elements to later subscriber" in assertAllStagesStopped { - val p = createSource(3).runWith(Sink.publisher(Int.MaxValue)) + val p = createSource(3).runWith(Sink.publisher(true)) val c1 = TestSubscriber.manualProbe[Int]() val c2 = TestSubscriber.manualProbe[Int]() p.subscribe(c1) @@ -153,7 +153,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { } "produce elements with one transformation step" in assertAllStagesStopped { - val p = createSource(3).map(_ * 2).runWith(Sink.publisher(1)) + val p = createSource(3).map(_ * 2).runWith(Sink.publisher(false)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -165,7 +165,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { } "produce elements with two transformation steps" in assertAllStagesStopped { - val p = createSource(4).filter(_ % 2 == 0).map(_ * 2).runWith(Sink.publisher(1)) + val p = createSource(4).filter(_ % 2 == 0).map(_ * 2).runWith(Sink.publisher(false)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -176,7 +176,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { } "not produce after cancel" in assertAllStagesStopped { - val p = createSource(3).runWith(Sink.publisher(1)) + val p = createSource(3).runWith(Sink.publisher(false)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala index 177178af81..ea9bb2dcf5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala @@ -27,7 +27,7 @@ class FlowMapSpec extends AkkaSpec with ScriptedTest { val probe = TestSubscriber.manualProbe[Int]() Source(List(1)). map(_ + 1).map(_ + 1).map(_ + 1).map(_ + 1).map(_ + 1). - runWith(Sink.publisher(1)).subscribe(probe) + runWith(Sink.publisher(false)).subscribe(probe) val subscription = probe.expectSubscription() for (_ ← 1 to 10000) { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala index e0563b7e2b..cddc73939f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala @@ -202,7 +202,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val (_, tail) = Await.result(f, 3.seconds) - val tailPub = tail.runWith(Sink.publisher(1)) + val tailPub = tail.runWith(Sink.publisher(false)) s.sendComplete() tailPub.subscribe(sub) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index e9981046e6..5fb66a0742 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -89,13 +89,13 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece }) val toPublisher: (Source[Any, _], ActorMaterializer) ⇒ Publisher[Any] = - (f, m) ⇒ f.runWith(Sink.publisher(1))(m) + (f, m) ⇒ f.runWith(Sink.publisher(false))(m) def toFanoutPublisher[In, Out](elasticity: Int): (Source[Out, _], ActorMaterializer) ⇒ Publisher[Out] = - (f, m) ⇒ f.runWith(Sink.publisher(Int.MaxValue).withAttributes(Attributes.inputBuffer(elasticity, elasticity)))(m) + (f, m) ⇒ f.runWith(Sink.publisher(true).withAttributes(Attributes.inputBuffer(elasticity, elasticity)))(m) def materializeIntoSubscriberAndPublisher[In, Out](flow: Flow[In, Out, _]): (Subscriber[In], Publisher[Out]) = { - flow.runWith(Source.subscriber[In], Sink.publisher[Out](1)) + flow.runWith(Source.subscriber[In], Sink.publisher[Out](false)) } "A Flow" must { @@ -176,7 +176,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val c1 = TestSubscriber.manualProbe[String]() flowOut.subscribe(c1) - val source: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher(1)) + val source: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher(false)) source.subscribe(flowIn) val sub1 = c1.expectSubscription() @@ -197,7 +197,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece sub1.request(3) c1.expectNoMsg(200.millis) - val source: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher(1)) + val source: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher(false)) source.subscribe(flowIn) c1.expectNext("1") @@ -216,7 +216,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece sub1.request(3) c1.expectNoMsg(200.millis) - val source: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher(1)) + val source: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher(false)) source.subscribe(flowIn) c1.expectNext("elem-1") @@ -229,7 +229,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val flow: Flow[String, String, _] = Flow[String] val c1 = TestSubscriber.manualProbe[String]() val sink: Sink[String, _] = flow.to(Sink(c1)) - val publisher: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher(1)) + val publisher: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher(false)) Source(publisher).to(sink).run() val sub1 = c1.expectSubscription() @@ -243,7 +243,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "perform transformation operation" in { val flow = Flow[Int].map(i ⇒ { testActor ! i.toString; i.toString }) - val publisher = Source(List(1, 2, 3)).runWith(Sink.publisher(1)) + val publisher = Source(List(1, 2, 3)).runWith(Sink.publisher(false)) Source(publisher).via(flow).to(Sink.ignore).run() expectMsg("1") @@ -255,7 +255,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val flow = Flow[Int].map(_.toString) val c1 = TestSubscriber.manualProbe[String]() val sink: Sink[Int, _] = flow.to(Sink(c1)) - val publisher: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher(1)) + val publisher: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher(false)) Source(publisher).to(sink).run() val sub1 = c1.expectSubscription() @@ -268,8 +268,8 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "be materializable several times with fanout publisher" in assertAllStagesStopped { val flow = Source(List(1, 2, 3)).map(_.toString) - val p1 = flow.runWith(Sink.publisher(2)) - val p2 = flow.runWith(Sink.publisher(2)) + val p1 = flow.runWith(Sink.publisher(true)) + val p2 = flow.runWith(Sink.publisher(true)) val s1 = TestSubscriber.manualProbe[String]() val s2 = TestSubscriber.manualProbe[String]() val s3 = TestSubscriber.manualProbe[String]() @@ -301,7 +301,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "be covariant" in { val f1: Source[Fruit, _] = Source[Fruit](apples) - val p1: Publisher[Fruit] = Source[Fruit](apples).runWith(Sink.publisher(1)) + val p1: Publisher[Fruit] = Source[Fruit](apples).runWith(Sink.publisher(false)) val f2: Source[Source[Fruit, _], _] = Source[Fruit](apples).splitWhen(_ ⇒ true) val f3: Source[(Boolean, Source[Fruit, _]), _] = Source[Fruit](apples).groupBy(_ ⇒ true) val f4: Source[(immutable.Seq[Fruit], Source[Fruit, _]), _] = Source[Fruit](apples).prefixAndTail(1) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala index dbe6ad256c..f7efb084b4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala @@ -37,7 +37,7 @@ class FlowSplitAfterSpec extends AkkaSpec { class SubstreamsSupport(splitAfter: Int = 3, elementCount: Int = 6) { val source = Source(1 to elementCount) - val groupStream = source.splitAfter(_ == splitAfter).runWith(Sink.publisher(1)) + val groupStream = source.splitAfter(_ == splitAfter).runWith(Sink.publisher(false)) val masterSubscriber = TestSubscriber.manualProbe[Source[Int, _]]() groupStream.subscribe(masterSubscriber) @@ -59,7 +59,7 @@ class FlowSplitAfterSpec extends AkkaSpec { "work in the happy case" in assertAllStagesStopped { new SubstreamsSupport(3, elementCount = 5) { - val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(1))) + val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(false))) masterSubscriber.expectNoMsg(100.millis) s1.request(2) @@ -70,7 +70,7 @@ class FlowSplitAfterSpec extends AkkaSpec { s1.request(1) s1.expectComplete() - val s2 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(1))) + val s2 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(false))) s2.request(2) s2.expectNext(4) @@ -83,14 +83,14 @@ class FlowSplitAfterSpec extends AkkaSpec { "work when first element is split-by" in assertAllStagesStopped { new SubstreamsSupport(splitAfter = 1, elementCount = 3) { - val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(1))) + val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(false))) masterSubscriber.expectNoMsg(100.millis) s1.request(3) s1.expectNext(1) s1.expectComplete() - val s2 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(1))) + val s2 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(false))) s2.request(3) s2.expectNext(2) @@ -103,9 +103,9 @@ class FlowSplitAfterSpec extends AkkaSpec { "support cancelling substreams" in assertAllStagesStopped { new SubstreamsSupport(splitAfter = 5, elementCount = 8) { - val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(1))) + val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(false))) s1.cancel() - val s2 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(1))) + val s2 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(false))) s2.request(4) s2.expectNext(6) @@ -120,7 +120,7 @@ class FlowSplitAfterSpec extends AkkaSpec { "support cancelling the master stream" in assertAllStagesStopped { new SubstreamsSupport(splitAfter = 5, elementCount = 8) { - val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(1))) + val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(false))) masterSubscription.cancel() s1.request(5) s1.expectNext(1) @@ -138,7 +138,7 @@ class FlowSplitAfterSpec extends AkkaSpec { val exc = TE("test") val publisher = Source(publisherProbeProbe) .splitAfter(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0) - .runWith(Sink.publisher(1)) + .runWith(Sink.publisher(false)) val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]() publisher.subscribe(subscriber) @@ -150,7 +150,7 @@ class FlowSplitAfterSpec extends AkkaSpec { upstreamSubscription.sendNext(1) val substream = subscriber.expectNext() - val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher(1))) + val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher(false))) substreamPuppet.request(10) substreamPuppet.expectNext(1) @@ -171,7 +171,7 @@ class FlowSplitAfterSpec extends AkkaSpec { val publisher = Source(publisherProbeProbe) .splitAfter(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0) .withAttributes(ActorAttributes.supervisionStrategy(resumingDecider)) - .runWith(Sink.publisher(1)) + .runWith(Sink.publisher(false)) val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]() publisher.subscribe(subscriber) @@ -183,7 +183,7 @@ class FlowSplitAfterSpec extends AkkaSpec { upstreamSubscription.sendNext(1) val substream1 = subscriber.expectNext() - val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher(1))) + val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher(false))) substreamPuppet1.request(10) substreamPuppet1.expectNext(1) @@ -202,7 +202,7 @@ class FlowSplitAfterSpec extends AkkaSpec { substreamPuppet1.expectNext(6) substreamPuppet1.expectComplete() val substream2 = subscriber.expectNext() - val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher(1))) + val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher(false))) substreamPuppet2.request(10) upstreamSubscription.sendNext(7) substreamPuppet2.expectNext(7) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala index 2c88e21820..5cf524f910 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala @@ -35,7 +35,7 @@ class FlowSplitWhenSpec extends AkkaSpec { class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6) { val source = Source(1 to elementCount) - val groupStream = source.splitWhen(_ == splitWhen).runWith(Sink.publisher(1)) + val groupStream = source.splitWhen(_ == splitWhen).runWith(Sink.publisher(false)) val masterSubscriber = TestSubscriber.manualProbe[Source[Int, _]]() groupStream.subscribe(masterSubscriber) @@ -57,7 +57,7 @@ class FlowSplitWhenSpec extends AkkaSpec { "work in the happy case" in assertAllStagesStopped { new SubstreamsSupport(elementCount = 4) { - val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher(1))) + val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher(false))) masterSubscriber.expectNoMsg(100.millis) s1.request(2) @@ -66,7 +66,7 @@ class FlowSplitWhenSpec extends AkkaSpec { s1.request(1) s1.expectComplete() - val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher(1))) + val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher(false))) s2.request(1) s2.expectNext(3) @@ -83,7 +83,7 @@ class FlowSplitWhenSpec extends AkkaSpec { "work when first element is split-by" in assertAllStagesStopped { new SubstreamsSupport(1, elementCount = 3) { - val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher(1))) + val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher(false))) masterSubscriber.expectNoMsg(100.millis) s1.request(5) @@ -98,9 +98,9 @@ class FlowSplitWhenSpec extends AkkaSpec { "support cancelling substreams" in assertAllStagesStopped { new SubstreamsSupport(splitWhen = 5, elementCount = 8) { - val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher(1))) + val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher(false))) s1.cancel() - val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher(1))) + val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher(false))) s2.request(4) s2.expectNext(5) @@ -184,7 +184,7 @@ class FlowSplitWhenSpec extends AkkaSpec { "support cancelling the master stream" in assertAllStagesStopped { new SubstreamsSupport(splitWhen = 5, elementCount = 8) { - val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher(1))) + val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher(false))) masterSubscription.cancel() s1.request(4) s1.expectNext(1) @@ -201,7 +201,7 @@ class FlowSplitWhenSpec extends AkkaSpec { val exc = TE("test") val publisher = Source(publisherProbeProbe) .splitWhen(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0) - .runWith(Sink.publisher(1)) + .runWith(Sink.publisher(false)) val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]() publisher.subscribe(subscriber) @@ -213,7 +213,7 @@ class FlowSplitWhenSpec extends AkkaSpec { upstreamSubscription.sendNext(1) val substream = subscriber.expectNext() - val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher(1))) + val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher(false))) substreamPuppet.request(10) substreamPuppet.expectNext(1) @@ -234,7 +234,7 @@ class FlowSplitWhenSpec extends AkkaSpec { val publisher = Source(publisherProbeProbe) .splitWhen(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0) .withAttributes(ActorAttributes.supervisionStrategy(resumingDecider)) - .runWith(Sink.publisher(1)) + .runWith(Sink.publisher(false)) val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]() publisher.subscribe(subscriber) @@ -246,7 +246,7 @@ class FlowSplitWhenSpec extends AkkaSpec { upstreamSubscription.sendNext(1) val substream1 = subscriber.expectNext() - val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher(1))) + val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher(false))) substreamPuppet1.request(10) substreamPuppet1.expectNext(1) @@ -264,7 +264,7 @@ class FlowSplitWhenSpec extends AkkaSpec { upstreamSubscription.sendNext(6) substreamPuppet1.expectComplete() val substream2 = subscriber.expectNext() - val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher(1))) + val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher(false))) substreamPuppet2.request(10) substreamPuppet2.expectNext(6) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala index e707b66fa8..cf2920aad0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala @@ -29,7 +29,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug "A Flow with transform operations" must { "produce one-to-one transformation as expected" in assertAllStagesStopped { - val p = Source(List(1, 2, 3)).runWith(Sink.publisher(1)) + val p = Source(List(1, 2, 3)).runWith(Sink.publisher(false)) val p2 = Source(p). transform(() ⇒ new PushStage[Int, Int] { var tot = 0 @@ -38,7 +38,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug ctx.push(tot) } }). - runWith(Sink.publisher(1)) + runWith(Sink.publisher(false)) val subscriber = TestSubscriber.manualProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -52,7 +52,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "produce one-to-several transformation as expected" in assertAllStagesStopped { - val p = Source(List(1, 2, 3)).runWith(Sink.publisher(1)) + val p = Source(List(1, 2, 3)).runWith(Sink.publisher(false)) val p2 = Source(p). transform(() ⇒ new StatefulStage[Int, Int] { var tot = 0 @@ -72,7 +72,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } }). - runWith(Sink.publisher(1)) + runWith(Sink.publisher(false)) val subscriber = TestSubscriber.manualProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -109,7 +109,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug ctx.pull() } else ctx.push(elem) } - }).runWith(Sink.publisher(1)) + }).runWith(Sink.publisher(false)) val subscriber = TestSubscriber.manualProbe[Int]() p.subscribe(subscriber) @@ -135,7 +135,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "produce dropping transformation as expected" in { - val p = Source(List(1, 2, 3, 4)).runWith(Sink.publisher(1)) + val p = Source(List(1, 2, 3, 4)).runWith(Sink.publisher(false)) val p2 = Source(p). transform(() ⇒ new PushStage[Int, Int] { var tot = 0 @@ -147,7 +147,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug ctx.push(tot) } }). - runWith(Sink.publisher(1)) + runWith(Sink.publisher(false)) val subscriber = TestSubscriber.manualProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -161,7 +161,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "produce multi-step transformation as expected" in { - val p = Source(List("a", "bc", "def")).runWith(Sink.publisher(1)) + val p = Source(List("a", "bc", "def")).runWith(Sink.publisher(false)) val p2 = Source(p). transform(() ⇒ new PushStage[String, Int] { var concat = "" @@ -177,7 +177,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug ctx.push(tot) } }). - runWith(Sink.publisher(2)) + runWith(Sink.publisher(true)) val c1 = TestSubscriber.manualProbe[Int]() p2.subscribe(c1) val sub1 = c1.expectSubscription() @@ -200,7 +200,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "support emit onUpstreamFinish" in assertAllStagesStopped { - val p = Source(List("a")).runWith(Sink.publisher(1)) + val p = Source(List("a")).runWith(Sink.publisher(false)) val p2 = Source(p). transform(() ⇒ new StatefulStage[String, String] { var s = "" @@ -213,7 +213,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug override def onUpstreamFinish(ctx: Context[String]) = terminationEmit(Iterator.single(s + "B"), ctx) }). - runWith(Sink.publisher(1)) + runWith(Sink.publisher(false)) val c = TestSubscriber.manualProbe[String]() p2.subscribe(c) val s = c.expectSubscription() @@ -244,7 +244,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "report error when exception is thrown" in assertAllStagesStopped { - val p = Source(List(1, 2, 3)).runWith(Sink.publisher(1)) + val p = Source(List(1, 2, 3)).runWith(Sink.publisher(false)) val p2 = Source(p). transform(() ⇒ new StatefulStage[Int, Int] { override def initial = new State { @@ -268,7 +268,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "support emit of final elements when onUpstreamFailure" in assertAllStagesStopped { - val p = Source(List(1, 2, 3)).runWith(Sink.publisher(1)) + val p = Source(List(1, 2, 3)).runWith(Sink.publisher(false)) val p2 = Source(p). map(elem ⇒ if (elem == 2) throw new IllegalArgumentException("two not allowed") else elem). transform(() ⇒ new StatefulStage[Int, Int] { @@ -292,7 +292,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "support cancel as expected" in assertAllStagesStopped { - val p = Source(1 to 100).runWith(Sink.publisher(1)) + val p = Source(1 to 100).runWith(Sink.publisher(false)) val received = Source(p). transform(() ⇒ new StatefulStage[Int, Int] { override def initial = new State { @@ -312,7 +312,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "support producing elements from empty inputs" in assertAllStagesStopped { - val p = Source(List.empty[Int]).runWith(Sink.publisher(1)) + val p = Source(List.empty[Int]).runWith(Sink.publisher(false)) Source(p). transform(() ⇒ new StatefulStage[Int, Int] { override def initial = new State { @@ -416,7 +416,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug override def onUpstreamFinish(ctx: Context[Int]): TerminationDirective = terminationEmit(Iterator(42), ctx) }) - .runWith(Sink.publisher(1)) + .runWith(Sink.publisher(false)) val inSub = in.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala index 4eb929a70d..eb583fbe5d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala @@ -222,7 +222,7 @@ class GraphFlowSpec extends AkkaSpec { "work with a Source when having KeyedSink inside" in { val probe = TestSubscriber.manualProbe[Int]() - val pubSink = Sink.publisher[Int](1) + val pubSink = Sink.publisher[Int](false) val sink = Sink.fromGraph(FlowGraph.create(pubSink) { implicit b ⇒ p ⇒ SinkShape(p.inlet) @@ -277,7 +277,7 @@ class GraphFlowSpec extends AkkaSpec { "materialize properly" in { val probe = TestSubscriber.manualProbe[Int]() val inSource = Source.subscriber[Int] - val outSink = Sink.publisher[Int](1) + val outSink = Sink.publisher[Int](false) val flow = Flow.fromGraph(FlowGraph.create(partialGraph) { implicit b ⇒ partial ⇒ @@ -309,7 +309,7 @@ class GraphFlowSpec extends AkkaSpec { val subscriber = m1 val publisher = m3 - source1.runWith(Sink.publisher(1)).subscribe(subscriber) + source1.runWith(Sink.publisher(false)).subscribe(subscriber) publisher.subscribe(probe) validateProbe(probe, stdRequests, stdResult) @@ -318,7 +318,7 @@ class GraphFlowSpec extends AkkaSpec { "allow connecting source to sink directly" in { val probe = TestSubscriber.manualProbe[Int]() val inSource = Source.subscriber[Int] - val outSink = Sink.publisher[Int](1) + val outSink = Sink.publisher[Int](false) val source = Source.fromGraph(FlowGraph.create(inSource) { implicit b ⇒ src ⇒ @@ -340,7 +340,7 @@ class GraphFlowSpec extends AkkaSpec { val subscriber = m1 val publisher = m2 - source1.runWith(Sink.publisher(1)).subscribe(subscriber) + source1.runWith(Sink.publisher(false)).subscribe(subscriber) publisher.subscribe(probe) validateProbe(probe, 4, (0 to 3).toSet) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala index 87273e8321..006a3db8c1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala @@ -47,7 +47,7 @@ class GraphBalanceSpec extends AkkaSpec { "support waiting for demand from all downstream subscriptions" in { val s1 = TestSubscriber.manualProbe[Int]() - val p2 = RunnableGraph.fromGraph(FlowGraph.create(Sink.publisher[Int](1)) { implicit b ⇒ + val p2 = RunnableGraph.fromGraph(FlowGraph.create(Sink.publisher[Int](false)) { implicit b ⇒ p2Sink ⇒ val balance = b.add(Balance[Int](2, waitForAllDownstreams = true)) Source(List(1, 2, 3)) ~> balance.in @@ -78,7 +78,7 @@ class GraphBalanceSpec extends AkkaSpec { "support waiting for demand from all non-cancelled downstream subscriptions" in assertAllStagesStopped { val s1 = TestSubscriber.manualProbe[Int]() - val (p2, p3) = RunnableGraph.fromGraph(FlowGraph.create(Sink.publisher[Int](1), Sink.publisher[Int](1))(Keep.both) { implicit b ⇒ + val (p2, p3) = RunnableGraph.fromGraph(FlowGraph.create(Sink.publisher[Int](false), Sink.publisher[Int](false))(Keep.both) { implicit b ⇒ (p2Sink, p3Sink) ⇒ val balance = b.add(Balance[Int](3, waitForAllDownstreams = true)) Source(List(1, 2, 3)) ~> balance.in diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala index b184eb78ee..a9cd9a193f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala @@ -155,7 +155,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec with ConversionCheckedTripleEqual } "be able to run plain flow" in { - val p = Source(List(1, 2, 3)).runWith(Sink.publisher(1)) + val p = Source(List(1, 2, 3)).runWith(Sink.publisher(false)) val s = TestSubscriber.manualProbe[Int] val flow = Flow[Int].map(_ * 2) RunnableGraph.fromGraph(FlowGraph.create() { implicit builder ⇒ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala index b502d33c85..b31847e4e9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala @@ -19,7 +19,7 @@ class PublisherSinkSpec extends AkkaSpec { "be unique when created twice" in assertAllStagesStopped { - val (pub1, pub2) = RunnableGraph.fromGraph(FlowGraph.create(Sink.publisher[Int](1), Sink.publisher[Int](1))(Keep.both) { implicit b ⇒ + val (pub1, pub2) = RunnableGraph.fromGraph(FlowGraph.create(Sink.publisher[Int](false), Sink.publisher[Int](false))(Keep.both) { implicit b ⇒ (p1, p2) ⇒ import FlowGraph.Implicits._ @@ -40,14 +40,14 @@ class PublisherSinkSpec extends AkkaSpec { } "work with SubscriberSource" in { - val (sub, pub) = Source.subscriber[Int].toMat(Sink.publisher(1))(Keep.both).run() + val (sub, pub) = Source.subscriber[Int].toMat(Sink.publisher(false))(Keep.both).run() Source(1 to 100).to(Sink(sub)).run() Await.result(Source(pub).grouped(1000).runWith(Sink.head), 3.seconds) should ===(1 to 100) } "be able to use Publisher in materialized value transformation" in { val f = Source(1 to 3).runWith( - Sink.publisher[Int](1).mapMaterializedValue(p ⇒ Source(p).runFold(0)(_ + _))) + Sink.publisher[Int](false).mapMaterializedValue(p ⇒ Source(p).runFold(0)(_ + _))) Await.result(f, 3.seconds) should be(6) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index e8755cfb68..e6a33a87d0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -17,7 +17,7 @@ class SourceSpec extends AkkaSpec { "Single Source" must { "produce element" in { - val p = Source.single(1).runWith(Sink.publisher(1)) + val p = Source.single(1).runWith(Sink.publisher(false)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -27,7 +27,7 @@ class SourceSpec extends AkkaSpec { } "reject later subscriber" in { - val p = Source.single(1).runWith(Sink.publisher(1)) + val p = Source.single(1).runWith(Sink.publisher(false)) val c1 = TestSubscriber.manualProbe[Int]() val c2 = TestSubscriber.manualProbe[Int]() p.subscribe(c1) @@ -45,7 +45,7 @@ class SourceSpec extends AkkaSpec { "Empty Source" must { "complete immediately" in { - val p = Source.empty.runWith(Sink.publisher(1)) + val p = Source.empty.runWith(Sink.publisher(false)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) c.expectSubscriptionAndComplete() @@ -60,7 +60,7 @@ class SourceSpec extends AkkaSpec { "Failed Source" must { "emit error immediately" in { val ex = new RuntimeException with NoStackTrace - val p = Source.failed(ex).runWith(Sink.publisher(1)) + val p = Source.failed(ex).runWith(Sink.publisher(false)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) c.expectSubscriptionAndError(ex) @@ -75,7 +75,7 @@ class SourceSpec extends AkkaSpec { "Maybe Source" must { "complete materialized future with None when stream cancels" in Utils.assertAllStagesStopped { val neverSource = Source.maybe[Int] - val pubSink = Sink.publisher[Int](1) + val pubSink = Sink.publisher[Int](false) val (f, neverPub) = neverSource.toMat(pubSink)(Keep.both).run() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala index 078fd97f98..3dcbb637d7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala @@ -41,7 +41,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { "timeout and cancel substream publishers when no-one subscribes to them after some time (time them out)" in assertAllStagesStopped { val publisherProbe = TestPublisher.manualProbe[Int]() - val publisher = Source(publisherProbe).groupBy(_ % 3).runWith(Sink.publisher(1)) + val publisher = Source(publisherProbe).groupBy(_ % 3).runWith(Sink.publisher(false)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -57,7 +57,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { val (_, s1) = subscriber.expectNext() // should not break normal usage val s1SubscriberProbe = TestSubscriber.manualProbe[Int]() - s1.runWith(Sink.publisher(1)).subscribe(s1SubscriberProbe) + s1.runWith(Sink.publisher(false)).subscribe(s1SubscriberProbe) val s1Subscription = s1SubscriberProbe.expectSubscription() s1Subscription.request(100) s1SubscriberProbe.expectNext(1) @@ -65,7 +65,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { val (_, s2) = subscriber.expectNext() // should not break normal usage val s2SubscriberProbe = TestSubscriber.manualProbe[Int]() - s2.runWith(Sink.publisher(1)).subscribe(s2SubscriberProbe) + s2.runWith(Sink.publisher(false)).subscribe(s2SubscriberProbe) val s2Subscription = s2SubscriberProbe.expectSubscription() s2Subscription.request(100) s2SubscriberProbe.expectNext(2) @@ -83,7 +83,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { "timeout and stop groupBy parent actor if none of the substreams are actually consumed" in assertAllStagesStopped { val publisherProbe = TestPublisher.manualProbe[Int]() - val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher(1)) + val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher(false)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -108,7 +108,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { "not timeout and cancel substream publishers when they have been subscribed to" in { val publisherProbe = TestPublisher.manualProbe[Int]() - val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher(1)) + val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher(false)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -123,7 +123,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { val (_, s1) = subscriber.expectNext() // should not break normal usage val s1SubscriberProbe = TestSubscriber.manualProbe[Int]() - s1.runWith(Sink.publisher(1)).subscribe(s1SubscriberProbe) + s1.runWith(Sink.publisher(false)).subscribe(s1SubscriberProbe) val s1Sub = s1SubscriberProbe.expectSubscription() s1Sub.request(1) s1SubscriberProbe.expectNext(1) @@ -131,7 +131,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { val (_, s2) = subscriber.expectNext() // should not break normal usage val s2SubscriberProbe = TestSubscriber.manualProbe[Int]() - s2.runWith(Sink.publisher(1)).subscribe(s2SubscriberProbe) + s2.runWith(Sink.publisher(false)).subscribe(s2SubscriberProbe) val s2Sub = s2SubscriberProbe.expectSubscription() // sleep long enough for timeout to trigger if not canceled diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala index 449c9bf2f2..b93de9771d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala @@ -49,7 +49,7 @@ class TickSourceSpec extends AkkaSpec { } "reject multiple subscribers, but keep the first" in { - val p = Source.tick(1.second, 1.second, "tick").runWith(Sink.publisher(1)) + val p = Source.tick(1.second, 1.second, "tick").runWith(Sink.publisher(false)) val c1 = TestSubscriber.manualProbe[String]() val c2 = TestSubscriber.manualProbe[String]() p.subscribe(c1) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala index 6c95a21289..176324b744 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala @@ -24,7 +24,7 @@ private[akka] class ConcatAllImpl(f: Any ⇒ Source[Any, _], materializer: Actor import akka.stream.impl.MultiStreamInputProcessor._ val takeNextSubstream = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ - val publisher = f(primaryInputs.dequeueInputElement()).runWith(Sink.publisher(1))(materializer) + val publisher = f(primaryInputs.dequeueInputElement()).runWith(Sink.publisher(false))(materializer) // FIXME we can pass the flow to createSubstreamInput (but avoiding copy impl now) val inputs = createAndSubscribeSubstreamInput(publisher) nextPhase(streamSubstream(inputs)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala index 9155a10bf2..3fe69931c0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala @@ -7,8 +7,7 @@ import org.reactivestreams.Subscriber /** * INTERNAL API */ -private[akka] abstract class FanoutOutputs(val maxNumberOfSubscribers: Int, - val maxBufferSize: Int, +private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBufferSize: Int, self: ActorRef, val pump: Pump) @@ -93,17 +92,17 @@ private[akka] abstract class FanoutOutputs(val maxNumberOfSubscribers: Int, } private[akka] object FanoutProcessorImpl { - def props(actorMaterializerSettings: ActorMaterializerSettings, maxNumberOfSubscribers: Int): Props = - Props(new FanoutProcessorImpl(actorMaterializerSettings, maxNumberOfSubscribers)).withDeploy(Deploy.local) + def props(actorMaterializerSettings: ActorMaterializerSettings): Props = + Props(new FanoutProcessorImpl(actorMaterializerSettings)).withDeploy(Deploy.local) } /** * INTERNAL API */ -private[akka] class FanoutProcessorImpl(_settings: ActorMaterializerSettings, maxNumberOfSubscribers: Int) +private[akka] class FanoutProcessorImpl(_settings: ActorMaterializerSettings) extends ActorProcessorImpl(_settings) { override val primaryOutputs: FanoutOutputs = - new FanoutOutputs(maxNumberOfSubscribers, settings.maxInputBufferSize, settings.initialInputBufferSize, self, this) { + new FanoutOutputs(settings.maxInputBufferSize, settings.initialInputBufferSize, self, this) { override def afterShutdown(): Unit = afterFlush() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index c7bfe54a68..8cf2d6c34f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -66,7 +66,6 @@ private[akka] class PublisherSink[In](val attributes: Attributes, shape: SinkSha * INTERNAL API */ private[akka] final class FanoutPublisherSink[In]( - maxNumberOfSubscribers: Int, val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Publisher[In]](shape) { @@ -76,15 +75,15 @@ private[akka] final class FanoutPublisherSink[In]( val fanoutProcessor = ActorProcessorFactory[In, In]( actorMaterializer.actorOf( context, - FanoutProcessorImpl.props(actorMaterializer.effectiveSettings(attributes), maxNumberOfSubscribers))) + FanoutProcessorImpl.props(actorMaterializer.effectiveSettings(attributes)))) (fanoutProcessor, fanoutProcessor) } override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] = - new FanoutPublisherSink[In](maxNumberOfSubscribers, attributes, shape) + new FanoutPublisherSink[In](attributes, shape) override def withAttributes(attr: Attributes): Module = - new FanoutPublisherSink[In](maxNumberOfSubscribers, attr, amendShape(attr)) + new FanoutPublisherSink[In](attr, amendShape(attr)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala index 29ecf6490c..472925a140 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala @@ -62,7 +62,6 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff def initialBufferSize: Int def maxBufferSize: Int - def maxNumberOfSubscribers: Int /** * called when we are ready to consume more elements from our upstream @@ -232,18 +231,12 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff private def addSubscription(subscriber: Subscriber[_ >: T]): Unit = { import ReactiveStreamsCompliance._ - if (maxNumberOfSubscribers < 1 || subscriptions.size >= maxNumberOfSubscribers) { - tryOnSubscribe(subscriber, CancelledSubscription) - tryOnError(subscriber, - new IllegalStateException(s"Max number of Subscribers exceeded. [${maxNumberOfSubscribers}]")) - } else { - val newSubscription = createSubscription(subscriber) - subscriptions ::= newSubscription - buffer.initCursor(newSubscription) - try tryOnSubscribe(subscriber, newSubscription) - catch { - case _: SpecViolation ⇒ unregisterSubscriptionInternal(newSubscription) - } + val newSubscription = createSubscription(subscriber) + subscriptions ::= newSubscription + buffer.initCursor(newSubscription) + try tryOnSubscribe(subscriber, newSubscription) + catch { + case _: SpecViolation ⇒ unregisterSubscriptionInternal(newSubscription) } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index aa256cdf45..7298f87d77 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -45,14 +45,17 @@ object Sink { /** * A `Sink` that materializes into a [[org.reactivestreams.Publisher]]. - * that can handle `maxNumberOfSubscribers` [[org.reactivestreams.Subscriber]]s. * - * If `maxNumberOfSubscribers` is greater than 1, the size of the `inputBuffer` configured for this stage - * becomes the maximum number of elements that the fastest [[org.reactivestreams.Subscriber]] can be ahead - * of the slowest one before slowing the processing down due to back pressure. + * If `fanout` is `true`, the materialized `Publisher` will support multiple `Subscriber`s and + * the size of the `inputBuffer` configured for this stage becomes the maximum number of elements that + * the fastest [[org.reactivestreams.Subscriber]] can be ahead of the slowest one before slowing + * the processing down due to back pressure. + * + * If `fanout` is `false` then the materialized `Publisher` will only support a single `Subscriber` and + * reject any additional `Subscriber`s. */ - def publisher[In](maxNumberOfSubscribers: Int): Sink[In, Publisher[In]] = - new Sink(scaladsl.Sink.publisher(maxNumberOfSubscribers)) + def publisher[T](fanout: Boolean): Sink[T, Publisher[T]] = + new Sink(scaladsl.Sink.publisher(fanout)) /** * A `Sink` that will invoke the given procedure for each received element. The sink is materialized diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 2ab40a814f..b5708b9308 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -223,7 +223,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * @return A [[RunnableGraph]] that materializes to a Processor when run() is called on it. */ def toProcessor: RunnableGraph[Processor[In @uncheckedVariance, Out @uncheckedVariance]] = { - Source.subscriber[In].via(this).toMat(Sink.publisher[Out](1))(Keep.both[Subscriber[In], Publisher[Out]]) + Source.subscriber[In].via(this).toMat(Sink.publisher[Out](false))(Keep.both[Subscriber[In], Publisher[Out]]) .mapMaterializedValue { case (sub, pub) ⇒ new Processor[In, Out] { override def onError(t: Throwable): Unit = sub.onError(t) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 7e65ee199d..ef8a82e4b6 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -80,21 +80,19 @@ object Sink { /** * A `Sink` that materializes into a [[org.reactivestreams.Publisher]]. - * that can handle `maxNumberOfSubscribers` [[org.reactivestreams.Subscriber]]s. * - * If `maxNumberOfSubscribers` is greater than 1, the size of the `inputBuffer` configured for this stage - * becomes the maximum number of elements that the fastest [[org.reactivestreams.Subscriber]] can be ahead - * of the slowest one before slowing the processing down due to back pressure. + * If `fanout` is `true`, the materialized `Publisher` will support multiple `Subscriber`s and + * the size of the `inputBuffer` configured for this stage becomes the maximum number of elements that + * the fastest [[org.reactivestreams.Subscriber]] can be ahead of the slowest one before slowing + * the processing down due to back pressure. + * + * If `fanout` is `false` then the materialized `Publisher` will only support a single `Subscriber` and + * reject any additional `Subscriber`s. */ - def publisher[T](maxNumberOfSubscribers: Int): Sink[T, Publisher[T]] = + def publisher[T](fanout: Boolean): Sink[T, Publisher[T]] = new Sink( - maxNumberOfSubscribers match { - case 1 ⇒ new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink")) - case n ⇒ - new FanoutPublisherSink[T](n, - DefaultAttributes.fanoutPublisherSink, - shape("FanoutPublisherSink")) - }) + if (fanout) new FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, shape("FanoutPublisherSink")) + else new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink"))) /** * A `Sink` that will consume the stream and discard the elements.