diff --git a/akka-docs-dev/rst/java/stream-integrations.rst b/akka-docs-dev/rst/java/stream-integrations.rst index 5bee154450..511ba65d22 100644 --- a/akka-docs-dev/rst/java/stream-integrations.rst +++ b/akka-docs-dev/rst/java/stream-integrations.rst @@ -107,8 +107,8 @@ This is how it can be used as input :class:`Source` to a :class:`Flow`: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/ActorPublisherDocTest.java#actor-publisher-usage -You can only attach one subscriber to this publisher. Use a ``Broadcast`` -element or attach a ``Sink.fanoutPublisher`` to enable multiple subscribers. +You can only attach one subscriber to this publisher. Increase the max number of subscribers parameter or use a `Broadcast` element +in order to support multiple subscribers. ActorSubscriber ^^^^^^^^^^^^^^^ @@ -414,18 +414,17 @@ by using the Publisher-:class:`Sink`: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/ReactiveStreamsDocTest.java#source-publisher -A publisher that is created with ``Sink.publisher`` only supports one subscriber. A second -subscription attempt will be rejected with an :class:`IllegalStateException`. +A publisher that is created with ``Sink.publisher`` supports a specified number of subscribers. Additional +subscription attempts will be rejected with an :class:`IllegalStateException`. -A publisher that supports multiple subscribers can be created with ``Sink.fanoutPublisher`` -instead: +A publisher that supports multiple subscribers is created as follows: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/ReactiveStreamsDocTest.java :include: author-alert-subscriber,author-storage-subscriber .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/ReactiveStreamsDocTest.java#source-fanoutPublisher -The buffer size controls how far apart the slowest subscriber can be from the fastest subscriber +The input buffer size of the stage controls how far apart the slowest subscriber can be from the fastest subscriber before slowing down the stream. To make the picture complete, it is also possible to expose a :class:`Sink` as a :class:`Subscriber` diff --git a/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala index a001829af3..f15251c045 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala @@ -41,7 +41,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec { val impl = new Fixture { override def tweets: Publisher[Tweet] = - TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.publisher) + TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.publisher(1)) override def storage = TestSubscriber.manualProbe[Author] @@ -92,7 +92,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec { //#source-publisher val authorPublisher: Publisher[Author] = - Source(tweets).via(authors).runWith(Sink.publisher) + Source(tweets).via(authors).runWith(Sink.publisher(1)) authorPublisher.subscribe(storage) //#source-publisher @@ -108,7 +108,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec { //#source-fanoutPublisher val authorPublisher: Publisher[Author] = Source(tweets).via(authors) - .runWith(Sink.fanoutPublisher(initialBufferSize = 8, maximumBufferSize = 16)) + .runWith(Sink.publisher(maxNumberOfSubscribers = Int.MaxValue)) authorPublisher.subscribe(storage) authorPublisher.subscribe(alert) diff --git a/akka-docs-dev/rst/scala/stream-integrations.rst b/akka-docs-dev/rst/scala/stream-integrations.rst index 4c044c22d2..4d969a7991 100644 --- a/akka-docs-dev/rst/scala/stream-integrations.rst +++ b/akka-docs-dev/rst/scala/stream-integrations.rst @@ -102,8 +102,8 @@ This is how it can be used as input :class:`Source` to a :class:`Flow`: .. includecode:: code/docs/stream/ActorPublisherDocSpec.scala#actor-publisher-usage -You can only attach one subscriber to this publisher. Use a ``Broadcast`` -element or attach a ``Sink.fanoutPublisher`` to enable multiple subscribers. +A publisher that is created with ``Sink.publisher`` supports a specified number of subscribers. Additional +subscription attempts will be rejected with an :class:`IllegalStateException`. ActorSubscriber ^^^^^^^^^^^^^^^ @@ -412,15 +412,14 @@ by using the Publisher-:class:`Sink`: A publisher that is created with ``Sink.publisher`` only supports one subscriber. A second subscription attempt will be rejected with an :class:`IllegalStateException`. -A publisher that supports multiple subscribers can be created with ``Sink.fanoutPublisher`` -instead: +A publisher that supports multiple subscribers is created as follows: .. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala :include: author-alert-subscriber,author-storage-subscriber .. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#source-fanoutPublisher -The buffer size controls how far apart the slowest subscriber can be from the fastest subscriber +The input buffer size of the stage controls how far apart the slowest subscriber can be from the fastest subscriber before slowing down the stream. To make the picture complete, it is also possible to expose a :class:`Sink` as a :class:`Subscriber` diff --git a/akka-docs-dev/rst/stream-design.rst b/akka-docs-dev/rst/stream-design.rst index 920f428888..17ac52d9f1 100644 --- a/akka-docs-dev/rst/stream-design.rst +++ b/akka-docs-dev/rst/stream-design.rst @@ -38,7 +38,7 @@ Akka Streams fully implement the Reactive Streams specification and interoperate All stream Processors produced by the default materialization of Akka Streams are restricted to having a single Subscriber, additional Subscribers will be rejected. The reason for this is that the stream topologies described using our DSL never require fan-out behavior from the Publisher sides of the elements, all fan-out is done using explicit elements like :class:`Broadcast[T]`. -This means that ``Sink.fanoutPublisher`` must be used where multicast behavior is needed for interoperation with other Reactive Streams implementations. +This means that ``Sink.publisher()`` must be used where broadcast behavior is needed for interoperation with other Reactive Streams implementations. What shall users of streaming libraries expect? ----------------------------------------------- diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala index d4c22aec04..047107af8f 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala @@ -159,11 +159,11 @@ private[http] object StreamUtils { case Nil ⇒ Nil case Seq(one) ⇒ Vector(input.via(one)) case multiple ⇒ - val (fanoutSub, fanoutPub) = Source.subscriber[ByteString].toMat(Sink.fanoutPublisher(16, 16))(Keep.both).run() + val (fanoutSub, fanoutPub) = Source.subscriber[ByteString].toMat(Sink.publisher(transformers.size))(Keep.both).run() val sources = transformers.map { flow ⇒ // Doubly wrap to ensure that subscription to the running publisher happens before the final sources // are exposed, so there is no race - Source(Source(fanoutPub).viaMat(flow)(Keep.right).runWith(Sink.publisher)) + Source(Source(fanoutPub).viaMat(flow)(Keep.right).runWith(Sink.publisher(1))) } // The fanout publisher must be wired to the original source after all fanout subscribers have been subscribed input.runWith(Sink(fanoutSub)) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index 904cc3097e..2d07ca635a 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -406,7 +406,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { def acceptConnection(): (TestSubscriber.ManualProbe[HttpRequest], TestPublisher.ManualProbe[HttpResponse]) = { connSourceSub.request(1) val incomingConnection = connSource.expectNext() - val sink = Sink.publisher[HttpRequest] + val sink = Sink.publisher[HttpRequest](1) val source = Source.subscriber[HttpResponse] val handler = Flow.fromSinkAndSourceMat(sink, source)(Keep.both) diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/ConcatTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/ConcatTest.scala index a9e578f56f..63cbec91b8 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/ConcatTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/ConcatTest.scala @@ -10,7 +10,7 @@ import org.reactivestreams.Publisher class ConcatTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { - Source(iterable(elements / 2)).concat(Source(iterable((elements + 1) / 2))).runWith(Sink.publisher) + Source(iterable(elements / 2)).concat(Source(iterable((elements + 1) / 2))).runWith(Sink.publisher(1)) } } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FanoutPublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FanoutPublisherTest.scala index f142ef6192..10451a76f2 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FanoutPublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FanoutPublisherTest.scala @@ -15,7 +15,7 @@ class FanoutPublisherTest extends AkkaPublisherVerification[Int] { if (elements == 0) new immutable.Iterable[Int] { override def iterator = Iterator from 0 } else 0 until elements.toInt - Source(iterable).runWith(Sink.fanoutPublisher(initialBufferSize = 2, maximumBufferSize = 4)) + Source(iterable).runWith(Sink.publisher(Int.MaxValue)) } } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala index 94fe7192be..e0e611b12c 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala @@ -13,7 +13,7 @@ class FlattenTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { val s1 = Source(iterable(elements / 2)) val s2 = Source(iterable((elements + 1) / 2)) - Source(List(s1, s2)).flatMapConcat(ConstantFun.scalaIdentityFunction).runWith(Sink.publisher) + Source(List(s1, s2)).flatMapConcat(ConstantFun.scalaIdentityFunction).runWith(Sink.publisher(1)) } } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala index 3e2b73d87d..2260228b57 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala @@ -13,7 +13,7 @@ class FuturePublisherTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { val p = Promise[Int]() - val pub = Source(p.future).runWith(Sink.publisher) + val pub = Source(p.future).runWith(Sink.publisher(1)) p.success(0) pub } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/GroupByTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/GroupByTest.scala index f576f32dd7..cfec442b9c 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/GroupByTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/GroupByTest.scala @@ -19,7 +19,7 @@ class GroupByTest extends AkkaPublisherVerification[Int] { val futureGroupSource = Source(iterable(elements)).groupBy(elem ⇒ "all").map { case (_, group) ⇒ group }.runWith(Sink.head) val groupSource = Await.result(futureGroupSource, 3.seconds) - groupSource.runWith(Sink.publisher) + groupSource.runWith(Sink.publisher(1)) } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala index c939fd5b26..1503b30344 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala @@ -11,7 +11,7 @@ import org.reactivestreams._ class IterablePublisherTest extends AkkaPublisherVerification[Int] { override def createPublisher(elements: Long): Publisher[Int] = { - Source(iterable(elements)).runWith(Sink.publisher) + Source(iterable(elements)).runWith(Sink.publisher(1)) } } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/MaybeSourceTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/MaybeSourceTest.scala index c05dc2a912..6d1e97bfdd 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/MaybeSourceTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/MaybeSourceTest.scala @@ -10,7 +10,7 @@ import akka.stream.scaladsl.{ Keep, Source, Sink } class MaybeSourceTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { - val (p, pub) = Source.maybe[Int].toMat(Sink.publisher)(Keep.both).run() + val (p, pub) = Source.maybe[Int].toMat(Sink.publisher(1))(Keep.both).run() p success Some(1) pub } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/PrefixAndTailTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/PrefixAndTailTest.scala index 6173c2ce61..826a753808 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/PrefixAndTailTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/PrefixAndTailTest.scala @@ -15,7 +15,7 @@ class PrefixAndTailTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { val futureTailSource = Source(iterable(elements)).prefixAndTail(0).map { case (_, tail) ⇒ tail }.runWith(Sink.head) val tailSource = Await.result(futureTailSource, 3.seconds) - tailSource.runWith(Sink.publisher) + tailSource.runWith(Sink.publisher(1)) } } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementPublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementPublisherTest.scala index c1dc1de756..54b3ca4502 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementPublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementPublisherTest.scala @@ -13,7 +13,7 @@ import org.reactivestreams._ class SingleElementPublisherTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { - Source(SingleElementPublisher(0, "single-element-publisher")).runWith(Sink.publisher) + Source(SingleElementPublisher(0, "single-element-publisher")).runWith(Sink.publisher(1)) } override def maxElementsFromPublisher(): Long = 1 diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementSourceTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementSourceTest.scala index e63a0e8063..62bb35dcce 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementSourceTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementSourceTest.scala @@ -11,7 +11,7 @@ import org.reactivestreams.Publisher class SingleElementSourceTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = - Source.single(1).runWith(Sink.publisher) + Source.single(1).runWith(Sink.publisher(1)) override def maxElementsFromPublisher(): Long = 1 } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SplitWhenTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SplitWhenTest.scala index e52303a4ed..333d99c7c7 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/SplitWhenTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SplitWhenTest.scala @@ -18,7 +18,7 @@ class SplitWhenTest extends AkkaPublisherVerification[Int] { else { val futureSource = Source(iterable(elements)).splitWhen(elem ⇒ false).runWith(Sink.head) val source = Await.result(futureSource, 3.seconds) - source.runWith(Sink.publisher) + source.runWith(Sink.publisher(1)) } } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SynchronousFilePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SynchronousFilePublisherTest.scala index b9f9fb47de..371967795d 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/SynchronousFilePublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SynchronousFilePublisherTest.scala @@ -39,7 +39,7 @@ class SynchronousFilePublisherTest extends AkkaPublisherVerification[ByteString] def createPublisher(elements: Long): Publisher[ByteString] = SynchronousFileSource(file, chunkSize = 512) .take(elements) - .runWith(Sink.publisher) + .runWith(Sink.publisher(1)) @AfterClass def after = file.delete() diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/BaseTwoStreamsSetup.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/BaseTwoStreamsSetup.scala index 4e465ba822..0abd1aa2db 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/BaseTwoStreamsSetup.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/BaseTwoStreamsSetup.scala @@ -27,7 +27,7 @@ abstract class BaseTwoStreamsSetup extends AkkaSpec { def completedPublisher[T]: Publisher[T] = TestPublisher.empty[T] - def nonemptyPublisher[T](elems: immutable.Iterable[T]): Publisher[T] = Source(elems).runWith(Sink.publisher) + def nonemptyPublisher[T](elems: immutable.Iterable[T]): Publisher[T] = Source(elems).runWith(Sink.publisher(1)) def soonToFailPublisher[T]: Publisher[T] = TestPublisher.lazyError[T](TestException) diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/ScriptedTest.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/ScriptedTest.scala index f17d820072..ccfbe430a2 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/ScriptedTest.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/ScriptedTest.scala @@ -20,7 +20,7 @@ trait ScriptedTest extends Matchers { class ScriptException(msg: String) extends RuntimeException(msg) def toPublisher[In, Out]: (Source[Out, _], ActorMaterializer) ⇒ Publisher[Out] = - (f, m) ⇒ f.runWith(Sink.publisher)(m) + (f, m) ⇒ f.runWith(Sink.publisher(1))(m) object Script { def apply[In, Out](phases: (Seq[In], Seq[Out])*): Script[In, Out] = { diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala index 97670e8ee6..6e9cf406fd 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala @@ -22,7 +22,7 @@ class TestPublisherSubscriberSpec extends AkkaSpec { "have all events accessible from manual probes" in assertAllStagesStopped { val upstream = TestPublisher.manualProbe[Int]() val downstream = TestSubscriber.manualProbe[Int]() - Source(upstream).runWith(Sink.publisher)(materializer).subscribe(downstream) + Source(upstream).runWith(Sink.publisher(1))(materializer).subscribe(downstream) val upstreamSubscription = upstream.expectSubscription() val downstreamSubscription: Subscription = downstream.expectEventPF { case OnSubscribe(sub) ⇒ sub } @@ -46,7 +46,7 @@ class TestPublisherSubscriberSpec extends AkkaSpec { "handle gracefully partial function that is not suitable" in assertAllStagesStopped { val upstream = TestPublisher.manualProbe[Int]() val downstream = TestSubscriber.manualProbe[Int]() - Source(upstream).runWith(Sink.publisher)(materializer).subscribe(downstream) + Source(upstream).runWith(Sink.publisher(1))(materializer).subscribe(downstream) val upstreamSubscription = upstream.expectSubscription() val downstreamSubscription: Subscription = downstream.expectEventPF { case OnSubscribe(sub) ⇒ sub } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java index 7f98ebb3b9..85fc779e4c 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java @@ -67,7 +67,7 @@ public class FlowGraphTest extends StreamTest { final Source in1 = Source.from(Arrays.asList("a", "b", "c")); final Source in2 = Source.from(Arrays.asList("d", "e", "f")); - final Sink> publisher = Sink.publisher(); + final Sink> publisher = Sink.publisher(1); final Source source = Source.fromGraph( FlowGraph.create(new Function, SourceShape>() { diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 8c47d6ac1c..5c63bf649c 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -387,7 +387,7 @@ public class FlowTest extends StreamTest { final Source in1 = Source.from(Arrays.asList("a", "b", "c")); final Source in2 = Source.from(Arrays.asList("d", "e", "f")); - final Sink> publisher = Sink.publisher(); + final Sink> publisher = Sink.publisher(1); final Source source = Source.fromGraph( FlowGraph.create(new Function, SourceShape>() { diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java index cf7a6c76a9..563fc006f1 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -39,7 +39,7 @@ public class SinkTest extends StreamTest { @Test public void mustBeAbleToUseFanoutPublisher() throws Exception { - final Sink> pubSink = Sink.fanoutPublisher(2, 2); + final Sink> pubSink = Sink.publisher(Integer.MAX_VALUE); @SuppressWarnings("unused") final Publisher publisher = Source.from(new ArrayList()).runWith(pubSink, materializer); } diff --git a/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala index c2eb4ed609..c4129ee538 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala @@ -104,12 +104,12 @@ class FlowTimedSpec extends AkkaSpec with ScriptedTest { map(_.toString), duration ⇒ probe.ref ! duration). map { s: String ⇒ s + "!" } - val (flowIn: Subscriber[Int], flowOut: Publisher[String]) = flow.runWith(Source.subscriber[Int], Sink.publisher[String]) + val (flowIn: Subscriber[Int], flowOut: Publisher[String]) = flow.runWith(Source.subscriber[Int], Sink.publisher[String](1)) val c1 = TestSubscriber.manualProbe[String]() val c2 = flowOut.subscribe(c1) - val p = Source(0 to 100).runWith(Sink.publisher) + val p = Source(0 to 100).runWith(Sink.publisher(1)) p.subscribe(flowIn) val s = c1.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala index df3a2ff65c..1a3dd2a37d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala @@ -76,7 +76,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { val p = SynchronousFileSource(testFile, chunkSize) .withAttributes(bufferAttributes) - .runWith(Sink.publisher) + .runWith(Sink.publisher(1)) val c = TestSubscriber.manualProbe[ByteString]() p.subscribe(c) val sub = c.expectSubscription() @@ -113,7 +113,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { val p = SynchronousFileSource(testFile, chunkSize) .withAttributes(bufferAttributes) - .runWith(Sink.publisher) + .runWith(Sink.publisher(1)) val c = TestSubscriber.manualProbe[ByteString]() p.subscribe(c) @@ -140,7 +140,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { } "onError whent trying to read from file which does not exist" in assertAllStagesStopped { - val p = SynchronousFileSource(notExistingFile).runWith(Sink.publisher) + val p = SynchronousFileSource(notExistingFile).runWith(Sink.publisher(1)) val c = TestSubscriber.manualProbe[ByteString]() p.subscribe(c) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala index 0d6a0edd47..acbc81329d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala @@ -41,15 +41,15 @@ class FlowCompileSpec extends AkkaSpec { val closedSource: Source[Int, _] = intSeq.via(open3) "closedSource.run()" shouldNot compile - val closedSink: Sink[Int, _] = open3.to(Sink.publisher[Int]) + val closedSink: Sink[Int, _] = open3.to(Sink.publisher[Int](1)) "closedSink.run()" shouldNot compile - closedSource.to(Sink.publisher[Int]).run() + closedSource.to(Sink.publisher[Int](1)).run() intSeq.to(closedSink).run() } "append Sink" in { val open: Flow[Int, String, _] = Flow[Int].map(_.toString) - val closedSink: Sink[String, _] = Flow[String].map(_.hashCode).to(Sink.publisher[Int]) + val closedSink: Sink[String, _] = Flow[String].map(_.hashCode).to(Sink.publisher[Int](1)) val appended: Sink[Int, _] = open.to(closedSink) "appended.run()" shouldNot compile "appended.connect(Sink.head[Int])" shouldNot compile @@ -61,13 +61,13 @@ class FlowCompileSpec extends AkkaSpec { val closedSource2: Source[String, _] = closedSource.via(open) "closedSource2.run()" shouldNot compile "strSeq.connect(closedSource2)" shouldNot compile - closedSource2.to(Sink.publisher[String]).run + closedSource2.to(Sink.publisher[String](1)).run } } "Sink" should { val openSource: Sink[Int, _] = - Flow[Int].map(_.toString).to(Sink.publisher[String]) + Flow[Int].map(_.toString).to(Sink.publisher[String](1)) "accept Source" in { intSeq.to(openSource) } @@ -83,7 +83,7 @@ class FlowCompileSpec extends AkkaSpec { val openSource: Source[String, _] = Source(Seq(1, 2, 3)).map(_.toString) "accept Sink" in { - openSource.to(Sink.publisher[String]) + openSource.to(Sink.publisher[String](1)) } "not be accepted by Source" in { "openSource.connect(intSeq)" shouldNot compile @@ -96,7 +96,7 @@ class FlowCompileSpec extends AkkaSpec { "RunnableGraph" should { Sink.head[String] val closed: RunnableGraph[Publisher[String]] = - Source(Seq(1, 2, 3)).map(_.toString).toMat(Sink.publisher[String])(Keep.right) + Source(Seq(1, 2, 3)).map(_.toString).toMat(Sink.publisher[String](1))(Keep.right) "run" in { closed.run() } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala index 3c1d41d81e..c9c7b2b9f2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala @@ -29,7 +29,7 @@ class FlowConcatSpec extends BaseTwoStreamsSetup { val s2: Source[String, _] = Source(List(4, 5, 6)).map(_.toString + "-s") val subs = TestSubscriber.manualProbe[Any]() - val subSink = Sink.publisher[Any] + val subSink = Sink.publisher[Any](1) val (_, res) = f1.concat(s2).runWith(s1, subSink) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala index 0cf1459e8f..d5f7d64222 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala @@ -21,7 +21,7 @@ class FlowFromFutureSpec extends AkkaSpec { "A Flow based on a Future" must { "produce one element from already successful Future" in assertAllStagesStopped { val c = TestSubscriber.manualProbe[Int]() - val p = Source(Future.successful(1)).runWith(Sink.fanoutPublisher(1, 1)).subscribe(c) + val p = Source(Future.successful(1)).runWith(Sink.publisher(Int.MaxValue)).subscribe(c) val sub = c.expectSubscription() c.expectNoMsg(100.millis) sub.request(1) @@ -32,14 +32,14 @@ class FlowFromFutureSpec extends AkkaSpec { "produce error from already failed Future" in assertAllStagesStopped { val ex = new RuntimeException("test") with NoStackTrace val c = TestSubscriber.manualProbe[Int]() - Source(Future.failed[Int](ex)).runWith(Sink.publisher).subscribe(c) + Source(Future.failed[Int](ex)).runWith(Sink.publisher(1)).subscribe(c) c.expectSubscriptionAndError(ex) } "produce one element when Future is completed" in assertAllStagesStopped { val promise = Promise[Int]() val c = TestSubscriber.manualProbe[Int]() - Source(promise.future).runWith(Sink.fanoutPublisher(1, 1)).subscribe(c) + Source(promise.future).runWith(Sink.publisher(Int.MaxValue)).subscribe(c) val sub = c.expectSubscription() sub.request(1) c.expectNoMsg(100.millis) @@ -52,7 +52,7 @@ class FlowFromFutureSpec extends AkkaSpec { "produce one element when Future is completed but not before request" in { val promise = Promise[Int]() val c = TestSubscriber.manualProbe[Int]() - Source(promise.future).runWith(Sink.fanoutPublisher(1, 1)).subscribe(c) + Source(promise.future).runWith(Sink.publisher(Int.MaxValue)).subscribe(c) val sub = c.expectSubscription() promise.success(1) c.expectNoMsg(200.millis) @@ -63,7 +63,7 @@ class FlowFromFutureSpec extends AkkaSpec { "produce elements with multiple subscribers" in assertAllStagesStopped { val promise = Promise[Int]() - val p = Source(promise.future).runWith(Sink.fanoutPublisher(1, 1)) + val p = Source(promise.future).runWith(Sink.publisher(Int.MaxValue)) val c1 = TestSubscriber.manualProbe[Int]() val c2 = TestSubscriber.manualProbe[Int]() p.subscribe(c1) @@ -81,7 +81,7 @@ class FlowFromFutureSpec extends AkkaSpec { "allow cancel before receiving element" in { val promise = Promise[Int]() - val p = Source(promise.future).runWith(Sink.fanoutPublisher(1, 1)) + val p = Source(promise.future).runWith(Sink.publisher(Int.MaxValue)) val keepAlive = TestSubscriber.manualProbe[Int]() val c = TestSubscriber.manualProbe[Int]() p.subscribe(keepAlive) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala index 88faebfdef..7647040a18 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala @@ -36,7 +36,7 @@ class FlowGraphCompileSpec extends AkkaSpec { val in1 = Source(List("a", "b", "c")) val in2 = Source(List("d", "e", "f")) - val out1 = Sink.publisher[String] + val out1 = Sink.publisher[String](1) val out2 = Sink.head[String] "A Graph" should { @@ -165,9 +165,9 @@ class FlowGraphCompileSpec extends AkkaSpec { val in3 = Source(List("b")) val in5 = Source(List("b")) val in7 = Source(List("a")) - val out2 = Sink.publisher[String] - val out9 = Sink.publisher[String] - val out10 = Sink.publisher[String] + val out2 = Sink.publisher[String](1) + val out9 = Sink.publisher[String](1) + val out10 = Sink.publisher[String](1) def f(s: String) = Flow[String].transform(op[String, String]).named(s) import FlowGraph.Implicits._ @@ -198,7 +198,7 @@ class FlowGraphCompileSpec extends AkkaSpec { RunnableGraph.fromGraph(FlowGraph.create() { implicit b ⇒ val zip = b.add(Zip[Int, String]()) val unzip = b.add(Unzip[Int, String]()) - val out = Sink.publisher[(Int, String)] + val out = Sink.publisher[(Int, String)](1) import FlowGraph.Implicits._ Source(List(1 -> "a", 2 -> "b", 3 -> "c")) ~> unzip.in unzip.out0 ~> Flow[Int].map(_ * 2) ~> zip.in0 @@ -213,8 +213,8 @@ class FlowGraphCompileSpec extends AkkaSpec { RunnableGraph.fromGraph(FlowGraph.create() { implicit b ⇒ val zip = b.add(Zip[Int, String]()) val unzip = b.add(Unzip[Int, String]()) - val wrongOut = Sink.publisher[(Int, Int)] - val whatever = Sink.publisher[Any] + val wrongOut = Sink.publisher[(Int, Int)](1) + val whatever = Sink.publisher[Any](1) "Flow(List(1, 2, 3)) ~> zip.left ~> wrongOut" shouldNot compile """Flow(List("a", "b", "c")) ~> zip.left""" shouldNot compile """Flow(List("a", "b", "c")) ~> zip.out""" shouldNot compile @@ -278,7 +278,7 @@ class FlowGraphCompileSpec extends AkkaSpec { val outB = b add Sink(TestSubscriber.manualProbe[Fruit]()) val merge = b add Merge[Fruit](11) val unzip = b add Unzip[Int, String]() - val whatever = b add Sink.publisher[Any] + val whatever = b add Sink.publisher[Any](1) import FlowGraph.Implicits._ b.add(Source[Fruit](apples)) ~> merge.in(0) appleSource ~> merge.in(1) @@ -293,12 +293,12 @@ class FlowGraphCompileSpec extends AkkaSpec { b.add(Source(apples)) ~> Flow[Apple] ~> merge.in(9) b.add(Source(apples)) ~> Flow[Apple] ~> outB - b.add(Source(apples)) ~> Flow[Apple] ~> b.add(Sink.publisher[Fruit]) + b.add(Source(apples)) ~> Flow[Apple] ~> b.add(Sink.publisher[Fruit](1)) appleSource ~> Flow[Apple] ~> merge.in(10) Source(List(1 -> "a", 2 -> "b", 3 -> "c")) ~> unzip.in unzip.out1 ~> whatever - unzip.out0 ~> b.add(Sink.publisher[Any]) + unzip.out0 ~> b.add(Sink.publisher[Any](1)) "merge.out ~> b.add(Broadcast[Apple](2))" shouldNot compile "merge.out ~> Flow[Fruit].map(identity) ~> b.add(Broadcast[Apple](2))" shouldNot compile diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index dba5cf2787..2c0d38db78 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -35,8 +35,8 @@ class FlowGroupBySpec extends AkkaSpec { } class SubstreamsSupport(groupCount: Int = 2, elementCount: Int = 6) { - val source = Source(1 to elementCount).runWith(Sink.publisher) - val groupStream = Source(source).groupBy(_ % groupCount).runWith(Sink.publisher) + val source = Source(1 to elementCount).runWith(Sink.publisher(1)) + val groupStream = Source(source).groupBy(_ % groupCount).runWith(Sink.publisher(1)) val masterSubscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]() groupStream.subscribe(masterSubscriber) @@ -58,7 +58,7 @@ class FlowGroupBySpec extends AkkaSpec { "groupBy" must { "work in the happy case" in assertAllStagesStopped { new SubstreamsSupport(groupCount = 2) { - val s1 = StreamPuppet(getSubFlow(1).runWith(Sink.publisher)) + val s1 = StreamPuppet(getSubFlow(1).runWith(Sink.publisher(1))) masterSubscriber.expectNoMsg(100.millis) s1.expectNoMsg(100.millis) @@ -66,7 +66,7 @@ class FlowGroupBySpec extends AkkaSpec { s1.expectNext(1) s1.expectNoMsg(100.millis) - val s2 = StreamPuppet(getSubFlow(0).runWith(Sink.publisher)) + val s2 = StreamPuppet(getSubFlow(0).runWith(Sink.publisher(1))) s2.expectNoMsg(100.millis) s2.request(2) @@ -95,9 +95,9 @@ class FlowGroupBySpec extends AkkaSpec { "accept cancellation of substreams" in assertAllStagesStopped { new SubstreamsSupport(groupCount = 2) { - StreamPuppet(getSubFlow(1).runWith(Sink.publisher)).cancel() + StreamPuppet(getSubFlow(1).runWith(Sink.publisher(1))).cancel() - val substream = StreamPuppet(getSubFlow(0).runWith(Sink.publisher)) + val substream = StreamPuppet(getSubFlow(0).runWith(Sink.publisher(1))) substream.request(2) substream.expectNext(2) substream.expectNext(4) @@ -113,7 +113,7 @@ class FlowGroupBySpec extends AkkaSpec { "accept cancellation of master stream when not consumed anything" in assertAllStagesStopped { val publisherProbeProbe = TestPublisher.manualProbe[Int]() - val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher) + val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher(1)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -125,7 +125,7 @@ class FlowGroupBySpec extends AkkaSpec { "accept cancellation of master stream when substreams are open" in assertAllStagesStopped { new SubstreamsSupport(groupCount = 3, elementCount = 13) { - val substream = StreamPuppet(getSubFlow(1).runWith(Sink.publisher)) + val substream = StreamPuppet(getSubFlow(1).runWith(Sink.publisher(1))) substream.request(1) substream.expectNext(1) @@ -144,7 +144,7 @@ class FlowGroupBySpec extends AkkaSpec { } "work with empty input stream" in assertAllStagesStopped { - val publisher = Source(List.empty[Int]).groupBy(_ % 2).runWith(Sink.publisher) + val publisher = Source(List.empty[Int]).groupBy(_ % 2).runWith(Sink.publisher(1)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -153,7 +153,7 @@ class FlowGroupBySpec extends AkkaSpec { "abort on onError from upstream" in assertAllStagesStopped { val publisherProbeProbe = TestPublisher.manualProbe[Int]() - val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher) + val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher(1)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -170,7 +170,7 @@ class FlowGroupBySpec extends AkkaSpec { "abort on onError from upstream when substreams are running" in assertAllStagesStopped { val publisherProbeProbe = TestPublisher.manualProbe[Int]() - val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher) + val publisher = Source(publisherProbeProbe).groupBy(_ % 2).runWith(Sink.publisher(1)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -182,7 +182,7 @@ class FlowGroupBySpec extends AkkaSpec { upstreamSubscription.sendNext(1) val (_, substream) = subscriber.expectNext() - val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher)) + val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher(1))) substreamPuppet.request(1) substreamPuppet.expectNext(1) @@ -200,7 +200,7 @@ class FlowGroupBySpec extends AkkaSpec { val exc = TE("test") val publisher = Source(publisherProbeProbe) .groupBy(elem ⇒ if (elem == 2) throw exc else elem % 2) - .runWith(Sink.publisher) + .runWith(Sink.publisher(1)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, Unit])]() publisher.subscribe(subscriber) @@ -212,7 +212,7 @@ class FlowGroupBySpec extends AkkaSpec { upstreamSubscription.sendNext(1) val (_, substream) = subscriber.expectNext() - val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher)) + val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher(1))) substreamPuppet.request(1) substreamPuppet.expectNext(1) @@ -230,7 +230,7 @@ class FlowGroupBySpec extends AkkaSpec { val publisher = Source(publisherProbeProbe) .groupBy(elem ⇒ if (elem == 2) throw exc else elem % 2) .withAttributes(ActorAttributes.supervisionStrategy(resumingDecider)) - .runWith(Sink.publisher) + .runWith(Sink.publisher(1)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, Unit])]() publisher.subscribe(subscriber) @@ -242,7 +242,7 @@ class FlowGroupBySpec extends AkkaSpec { upstreamSubscription.sendNext(1) val (_, substream1) = subscriber.expectNext() - val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher)) + val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher(1))) substreamPuppet1.request(10) substreamPuppet1.expectNext(1) @@ -250,7 +250,7 @@ class FlowGroupBySpec extends AkkaSpec { upstreamSubscription.sendNext(4) val (_, substream2) = subscriber.expectNext() - val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher)) + val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher(1))) substreamPuppet2.request(10) substreamPuppet2.expectNext(4) // note that 2 was dropped diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala index 0c495b6fe4..27ea52d7f6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala @@ -31,7 +31,7 @@ class FlowIterableSpec extends AbstractFlowIteratorSpec { override def iterator: Iterator[Int] = (1 to 3).iterator.map(x ⇒ if (x == 2) throw new IllegalStateException("not two") else x) } - val p = Source(iterable).runWith(Sink.publisher) + val p = Source(iterable).runWith(Sink.publisher(1)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -48,7 +48,7 @@ class FlowIterableSpec extends AbstractFlowIteratorSpec { val iterable = new immutable.Iterable[Int] { override def iterator: Iterator[Int] = throw new IllegalStateException("no good iterator") } - val p = Source(iterable).runWith(Sink.publisher) + val p = Source(iterable).runWith(Sink.publisher(1)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) c.expectSubscriptionAndError().getMessage should be("no good iterator") @@ -62,7 +62,7 @@ class FlowIterableSpec extends AbstractFlowIteratorSpec { override def next(): Int = -1 } } - val p = Source(iterable).runWith(Sink.publisher) + val p = Source(iterable).runWith(Sink.publisher(1)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) c.expectSubscriptionAndError().getMessage should be("no next") @@ -84,7 +84,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { testName must { "produce elements" in assertAllStagesStopped { - val p = createSource(3).runWith(Sink.publisher) + val p = createSource(3).runWith(Sink.publisher(1)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -98,7 +98,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { } "complete empty" in assertAllStagesStopped { - val p = createSource(0).runWith(Sink.publisher) + val p = createSource(0).runWith(Sink.publisher(1)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) c.expectSubscriptionAndComplete() @@ -106,7 +106,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { } "produce elements with multiple subscribers" in assertAllStagesStopped { - val p = createSource(3).runWith(Sink.fanoutPublisher(2, 4)) + val p = createSource(3).runWith(Sink.publisher(Int.MaxValue)) val c1 = TestSubscriber.manualProbe[Int]() val c2 = TestSubscriber.manualProbe[Int]() p.subscribe(c1) @@ -130,7 +130,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { } "produce elements to later subscriber" in assertAllStagesStopped { - val p = createSource(3).runWith(Sink.fanoutPublisher(2, 4)) + val p = createSource(3).runWith(Sink.publisher(Int.MaxValue)) val c1 = TestSubscriber.manualProbe[Int]() val c2 = TestSubscriber.manualProbe[Int]() p.subscribe(c1) @@ -153,7 +153,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { } "produce elements with one transformation step" in assertAllStagesStopped { - val p = createSource(3).map(_ * 2).runWith(Sink.publisher) + val p = createSource(3).map(_ * 2).runWith(Sink.publisher(1)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -165,7 +165,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { } "produce elements with two transformation steps" in assertAllStagesStopped { - val p = createSource(4).filter(_ % 2 == 0).map(_ * 2).runWith(Sink.publisher) + val p = createSource(4).filter(_ % 2 == 0).map(_ * 2).runWith(Sink.publisher(1)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -176,7 +176,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { } "not produce after cancel" in assertAllStagesStopped { - val p = createSource(3).runWith(Sink.publisher) + val p = createSource(3).runWith(Sink.publisher(1)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala index 098e4ce681..177178af81 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala @@ -27,7 +27,7 @@ class FlowMapSpec extends AkkaSpec with ScriptedTest { val probe = TestSubscriber.manualProbe[Int]() Source(List(1)). map(_ + 1).map(_ + 1).map(_ + 1).map(_ + 1).map(_ + 1). - runWith(Sink.publisher).subscribe(probe) + runWith(Sink.publisher(1)).subscribe(probe) val subscription = probe.expectSubscription() for (_ ← 1 to 10000) { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala index 406ad030c0..e0563b7e2b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala @@ -202,7 +202,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val (_, tail) = Await.result(f, 3.seconds) - val tailPub = tail.runWith(Sink.publisher) + val tailPub = tail.runWith(Sink.publisher(1)) s.sendComplete() tailPub.subscribe(sub) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index d05cd1f863..e9981046e6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -89,15 +89,13 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece }) val toPublisher: (Source[Any, _], ActorMaterializer) ⇒ Publisher[Any] = - (f, m) ⇒ f.runWith(Sink.publisher)(m) + (f, m) ⇒ f.runWith(Sink.publisher(1))(m) - def toFanoutPublisher[In, Out](initialBufferSize: Int, maximumBufferSize: Int): (Source[Out, _], ActorMaterializer) ⇒ Publisher[Out] = - (f, m) ⇒ f.runWith(Sink.fanoutPublisher(initialBufferSize, maximumBufferSize))(m) + def toFanoutPublisher[In, Out](elasticity: Int): (Source[Out, _], ActorMaterializer) ⇒ Publisher[Out] = + (f, m) ⇒ f.runWith(Sink.publisher(Int.MaxValue).withAttributes(Attributes.inputBuffer(elasticity, elasticity)))(m) def materializeIntoSubscriberAndPublisher[In, Out](flow: Flow[In, Out, _]): (Subscriber[In], Publisher[Out]) = { - val source = Source.subscriber[In] - val sink = Sink.publisher[Out] - flow.runWith(source, sink) + flow.runWith(Source.subscriber[In], Sink.publisher[Out](1)) } "A Flow" must { @@ -178,7 +176,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val c1 = TestSubscriber.manualProbe[String]() flowOut.subscribe(c1) - val source: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher) + val source: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher(1)) source.subscribe(flowIn) val sub1 = c1.expectSubscription() @@ -199,7 +197,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece sub1.request(3) c1.expectNoMsg(200.millis) - val source: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher) + val source: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher(1)) source.subscribe(flowIn) c1.expectNext("1") @@ -218,7 +216,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece sub1.request(3) c1.expectNoMsg(200.millis) - val source: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher) + val source: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher(1)) source.subscribe(flowIn) c1.expectNext("elem-1") @@ -231,7 +229,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val flow: Flow[String, String, _] = Flow[String] val c1 = TestSubscriber.manualProbe[String]() val sink: Sink[String, _] = flow.to(Sink(c1)) - val publisher: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher) + val publisher: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher(1)) Source(publisher).to(sink).run() val sub1 = c1.expectSubscription() @@ -245,7 +243,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "perform transformation operation" in { val flow = Flow[Int].map(i ⇒ { testActor ! i.toString; i.toString }) - val publisher = Source(List(1, 2, 3)).runWith(Sink.publisher) + val publisher = Source(List(1, 2, 3)).runWith(Sink.publisher(1)) Source(publisher).via(flow).to(Sink.ignore).run() expectMsg("1") @@ -257,7 +255,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val flow = Flow[Int].map(_.toString) val c1 = TestSubscriber.manualProbe[String]() val sink: Sink[Int, _] = flow.to(Sink(c1)) - val publisher: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher) + val publisher: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher(1)) Source(publisher).to(sink).run() val sub1 = c1.expectSubscription() @@ -270,8 +268,8 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "be materializable several times with fanout publisher" in assertAllStagesStopped { val flow = Source(List(1, 2, 3)).map(_.toString) - val p1 = flow.runWith(Sink.fanoutPublisher(2, 2)) - val p2 = flow.runWith(Sink.fanoutPublisher(2, 2)) + val p1 = flow.runWith(Sink.publisher(2)) + val p2 = flow.runWith(Sink.publisher(2)) val s1 = TestSubscriber.manualProbe[String]() val s2 = TestSubscriber.manualProbe[String]() val s3 = TestSubscriber.manualProbe[String]() @@ -303,7 +301,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "be covariant" in { val f1: Source[Fruit, _] = Source[Fruit](apples) - val p1: Publisher[Fruit] = Source[Fruit](apples).runWith(Sink.publisher) + val p1: Publisher[Fruit] = Source[Fruit](apples).runWith(Sink.publisher(1)) val f2: Source[Source[Fruit, _], _] = Source[Fruit](apples).splitWhen(_ ⇒ true) val f3: Source[(Boolean, Source[Fruit, _]), _] = Source[Fruit](apples).groupBy(_ ⇒ true) val f4: Source[(immutable.Seq[Fruit], Source[Fruit, _]), _] = Source[Fruit](apples).prefixAndTail(1) @@ -329,7 +327,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "A Flow with multiple subscribers (FanOutBox)" must { "adapt speed to the currently slowest subscriber" in { new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1), - toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) { + toFanoutPublisher(1)) { val downstream2 = TestSubscriber.manualProbe[Any]() publisher.subscribe(downstream2) val downstream2Subscription = downstream2.expectSubscription() @@ -356,7 +354,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "support slow subscriber with fan-out 2" in { new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1), - toFanoutPublisher(initialBufferSize = 2, maximumBufferSize = 2)) { + toFanoutPublisher(2)) { val downstream2 = TestSubscriber.manualProbe[Any]() publisher.subscribe(downstream2) val downstream2Subscription = downstream2.expectSubscription() @@ -396,7 +394,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "support incoming subscriber while elements were requested before" in { new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1), - toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) { + toFanoutPublisher(1)) { downstreamSubscription.request(5) upstream.expectRequest(upstreamSubscription, 1) upstreamSubscription.sendNext("a1") @@ -434,7 +432,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "be unblocked when blocking subscriber cancels subscription" in { new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1), - toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) { + toFanoutPublisher(1)) { val downstream2 = TestSubscriber.manualProbe[Any]() publisher.subscribe(downstream2) val downstream2Subscription = downstream2.expectSubscription() @@ -471,7 +469,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "call future subscribers' onError after onSubscribe if initial upstream was completed" in { new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1), - toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) { + toFanoutPublisher(1)) { val downstream2 = TestSubscriber.manualProbe[Any]() // don't link it just yet @@ -510,7 +508,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "call future subscribers' onError should be called instead of onSubscribed after initial upstream reported an error" in { new ChainSetup[Int, String](_.map(_ ⇒ throw TestException), settings.withInputBuffer(initialSize = 1, maxSize = 1), - toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) { + toFanoutPublisher(1)) { downstreamSubscription.request(1) upstreamSubscription.expectRequest(1) @@ -527,7 +525,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "call future subscribers' onError when all subscriptions were cancelled" in { new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1), - toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 16)) { + toFanoutPublisher(16)) { upstreamSubscription.expectRequest(1) downstreamSubscription.cancel() upstreamSubscription.expectCancellation() @@ -542,7 +540,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "A broken Flow" must { "cancel upstream and call onError on current and future downstream subscribers if an internal error occurs" in { - new ChainSetup(faultyFlow, settings.withInputBuffer(initialSize = 1, maxSize = 1), toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 16)) { + new ChainSetup(faultyFlow, settings.withInputBuffer(initialSize = 1, maxSize = 1), toFanoutPublisher(16)) { def checkError(sprobe: TestSubscriber.ManualProbe[Any]): Unit = { val error = sprobe.expectError() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala index 6c8dc5a366..dbe6ad256c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitAfterSpec.scala @@ -37,7 +37,7 @@ class FlowSplitAfterSpec extends AkkaSpec { class SubstreamsSupport(splitAfter: Int = 3, elementCount: Int = 6) { val source = Source(1 to elementCount) - val groupStream = source.splitAfter(_ == splitAfter).runWith(Sink.publisher) + val groupStream = source.splitAfter(_ == splitAfter).runWith(Sink.publisher(1)) val masterSubscriber = TestSubscriber.manualProbe[Source[Int, _]]() groupStream.subscribe(masterSubscriber) @@ -59,7 +59,7 @@ class FlowSplitAfterSpec extends AkkaSpec { "work in the happy case" in assertAllStagesStopped { new SubstreamsSupport(3, elementCount = 5) { - val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher)) + val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(1))) masterSubscriber.expectNoMsg(100.millis) s1.request(2) @@ -70,7 +70,7 @@ class FlowSplitAfterSpec extends AkkaSpec { s1.request(1) s1.expectComplete() - val s2 = StreamPuppet(expectSubFlow().runWith(Sink.publisher)) + val s2 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(1))) s2.request(2) s2.expectNext(4) @@ -83,14 +83,14 @@ class FlowSplitAfterSpec extends AkkaSpec { "work when first element is split-by" in assertAllStagesStopped { new SubstreamsSupport(splitAfter = 1, elementCount = 3) { - val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher)) + val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(1))) masterSubscriber.expectNoMsg(100.millis) s1.request(3) s1.expectNext(1) s1.expectComplete() - val s2 = StreamPuppet(expectSubFlow().runWith(Sink.publisher)) + val s2 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(1))) s2.request(3) s2.expectNext(2) @@ -103,9 +103,9 @@ class FlowSplitAfterSpec extends AkkaSpec { "support cancelling substreams" in assertAllStagesStopped { new SubstreamsSupport(splitAfter = 5, elementCount = 8) { - val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher)) + val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(1))) s1.cancel() - val s2 = StreamPuppet(expectSubFlow().runWith(Sink.publisher)) + val s2 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(1))) s2.request(4) s2.expectNext(6) @@ -120,7 +120,7 @@ class FlowSplitAfterSpec extends AkkaSpec { "support cancelling the master stream" in assertAllStagesStopped { new SubstreamsSupport(splitAfter = 5, elementCount = 8) { - val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher)) + val s1 = StreamPuppet(expectSubFlow().runWith(Sink.publisher(1))) masterSubscription.cancel() s1.request(5) s1.expectNext(1) @@ -138,7 +138,7 @@ class FlowSplitAfterSpec extends AkkaSpec { val exc = TE("test") val publisher = Source(publisherProbeProbe) .splitAfter(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0) - .runWith(Sink.publisher) + .runWith(Sink.publisher(1)) val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]() publisher.subscribe(subscriber) @@ -150,7 +150,7 @@ class FlowSplitAfterSpec extends AkkaSpec { upstreamSubscription.sendNext(1) val substream = subscriber.expectNext() - val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher)) + val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher(1))) substreamPuppet.request(10) substreamPuppet.expectNext(1) @@ -171,7 +171,7 @@ class FlowSplitAfterSpec extends AkkaSpec { val publisher = Source(publisherProbeProbe) .splitAfter(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0) .withAttributes(ActorAttributes.supervisionStrategy(resumingDecider)) - .runWith(Sink.publisher) + .runWith(Sink.publisher(1)) val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]() publisher.subscribe(subscriber) @@ -183,7 +183,7 @@ class FlowSplitAfterSpec extends AkkaSpec { upstreamSubscription.sendNext(1) val substream1 = subscriber.expectNext() - val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher)) + val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher(1))) substreamPuppet1.request(10) substreamPuppet1.expectNext(1) @@ -202,7 +202,7 @@ class FlowSplitAfterSpec extends AkkaSpec { substreamPuppet1.expectNext(6) substreamPuppet1.expectComplete() val substream2 = subscriber.expectNext() - val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher)) + val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher(1))) substreamPuppet2.request(10) upstreamSubscription.sendNext(7) substreamPuppet2.expectNext(7) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala index 9ab3b14345..2c88e21820 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala @@ -35,7 +35,7 @@ class FlowSplitWhenSpec extends AkkaSpec { class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6) { val source = Source(1 to elementCount) - val groupStream = source.splitWhen(_ == splitWhen).runWith(Sink.publisher) + val groupStream = source.splitWhen(_ == splitWhen).runWith(Sink.publisher(1)) val masterSubscriber = TestSubscriber.manualProbe[Source[Int, _]]() groupStream.subscribe(masterSubscriber) @@ -57,7 +57,7 @@ class FlowSplitWhenSpec extends AkkaSpec { "work in the happy case" in assertAllStagesStopped { new SubstreamsSupport(elementCount = 4) { - val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) + val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher(1))) masterSubscriber.expectNoMsg(100.millis) s1.request(2) @@ -66,7 +66,7 @@ class FlowSplitWhenSpec extends AkkaSpec { s1.request(1) s1.expectComplete() - val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) + val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher(1))) s2.request(1) s2.expectNext(3) @@ -83,7 +83,7 @@ class FlowSplitWhenSpec extends AkkaSpec { "work when first element is split-by" in assertAllStagesStopped { new SubstreamsSupport(1, elementCount = 3) { - val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) + val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher(1))) masterSubscriber.expectNoMsg(100.millis) s1.request(5) @@ -98,9 +98,9 @@ class FlowSplitWhenSpec extends AkkaSpec { "support cancelling substreams" in assertAllStagesStopped { new SubstreamsSupport(splitWhen = 5, elementCount = 8) { - val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) + val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher(1))) s1.cancel() - val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) + val s2 = StreamPuppet(getSubFlow().runWith(Sink.publisher(1))) s2.request(4) s2.expectNext(5) @@ -184,7 +184,7 @@ class FlowSplitWhenSpec extends AkkaSpec { "support cancelling the master stream" in assertAllStagesStopped { new SubstreamsSupport(splitWhen = 5, elementCount = 8) { - val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher)) + val s1 = StreamPuppet(getSubFlow().runWith(Sink.publisher(1))) masterSubscription.cancel() s1.request(4) s1.expectNext(1) @@ -201,7 +201,7 @@ class FlowSplitWhenSpec extends AkkaSpec { val exc = TE("test") val publisher = Source(publisherProbeProbe) .splitWhen(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0) - .runWith(Sink.publisher) + .runWith(Sink.publisher(1)) val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]() publisher.subscribe(subscriber) @@ -213,7 +213,7 @@ class FlowSplitWhenSpec extends AkkaSpec { upstreamSubscription.sendNext(1) val substream = subscriber.expectNext() - val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher)) + val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher(1))) substreamPuppet.request(10) substreamPuppet.expectNext(1) @@ -234,7 +234,7 @@ class FlowSplitWhenSpec extends AkkaSpec { val publisher = Source(publisherProbeProbe) .splitWhen(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0) .withAttributes(ActorAttributes.supervisionStrategy(resumingDecider)) - .runWith(Sink.publisher) + .runWith(Sink.publisher(1)) val subscriber = TestSubscriber.manualProbe[Source[Int, Unit]]() publisher.subscribe(subscriber) @@ -246,7 +246,7 @@ class FlowSplitWhenSpec extends AkkaSpec { upstreamSubscription.sendNext(1) val substream1 = subscriber.expectNext() - val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher)) + val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher(1))) substreamPuppet1.request(10) substreamPuppet1.expectNext(1) @@ -264,7 +264,7 @@ class FlowSplitWhenSpec extends AkkaSpec { upstreamSubscription.sendNext(6) substreamPuppet1.expectComplete() val substream2 = subscriber.expectNext() - val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher)) + val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher(1))) substreamPuppet2.request(10) substreamPuppet2.expectNext(6) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala index 66200d80d0..e707b66fa8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala @@ -29,7 +29,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug "A Flow with transform operations" must { "produce one-to-one transformation as expected" in assertAllStagesStopped { - val p = Source(List(1, 2, 3)).runWith(Sink.publisher) + val p = Source(List(1, 2, 3)).runWith(Sink.publisher(1)) val p2 = Source(p). transform(() ⇒ new PushStage[Int, Int] { var tot = 0 @@ -38,7 +38,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug ctx.push(tot) } }). - runWith(Sink.publisher) + runWith(Sink.publisher(1)) val subscriber = TestSubscriber.manualProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -52,7 +52,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "produce one-to-several transformation as expected" in assertAllStagesStopped { - val p = Source(List(1, 2, 3)).runWith(Sink.publisher) + val p = Source(List(1, 2, 3)).runWith(Sink.publisher(1)) val p2 = Source(p). transform(() ⇒ new StatefulStage[Int, Int] { var tot = 0 @@ -72,7 +72,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } }). - runWith(Sink.publisher) + runWith(Sink.publisher(1)) val subscriber = TestSubscriber.manualProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -109,7 +109,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug ctx.pull() } else ctx.push(elem) } - }).runWith(Sink.publisher) + }).runWith(Sink.publisher(1)) val subscriber = TestSubscriber.manualProbe[Int]() p.subscribe(subscriber) @@ -135,7 +135,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "produce dropping transformation as expected" in { - val p = Source(List(1, 2, 3, 4)).runWith(Sink.publisher) + val p = Source(List(1, 2, 3, 4)).runWith(Sink.publisher(1)) val p2 = Source(p). transform(() ⇒ new PushStage[Int, Int] { var tot = 0 @@ -147,7 +147,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug ctx.push(tot) } }). - runWith(Sink.publisher) + runWith(Sink.publisher(1)) val subscriber = TestSubscriber.manualProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -161,7 +161,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "produce multi-step transformation as expected" in { - val p = Source(List("a", "bc", "def")).runWith(Sink.publisher) + val p = Source(List("a", "bc", "def")).runWith(Sink.publisher(1)) val p2 = Source(p). transform(() ⇒ new PushStage[String, Int] { var concat = "" @@ -177,7 +177,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug ctx.push(tot) } }). - runWith(Sink.fanoutPublisher(2, 2)) + runWith(Sink.publisher(2)) val c1 = TestSubscriber.manualProbe[Int]() p2.subscribe(c1) val sub1 = c1.expectSubscription() @@ -200,7 +200,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "support emit onUpstreamFinish" in assertAllStagesStopped { - val p = Source(List("a")).runWith(Sink.publisher) + val p = Source(List("a")).runWith(Sink.publisher(1)) val p2 = Source(p). transform(() ⇒ new StatefulStage[String, String] { var s = "" @@ -213,7 +213,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug override def onUpstreamFinish(ctx: Context[String]) = terminationEmit(Iterator.single(s + "B"), ctx) }). - runWith(Sink.publisher) + runWith(Sink.publisher(1)) val c = TestSubscriber.manualProbe[String]() p2.subscribe(c) val s = c.expectSubscription() @@ -244,7 +244,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "report error when exception is thrown" in assertAllStagesStopped { - val p = Source(List(1, 2, 3)).runWith(Sink.publisher) + val p = Source(List(1, 2, 3)).runWith(Sink.publisher(1)) val p2 = Source(p). transform(() ⇒ new StatefulStage[Int, Int] { override def initial = new State { @@ -268,7 +268,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "support emit of final elements when onUpstreamFailure" in assertAllStagesStopped { - val p = Source(List(1, 2, 3)).runWith(Sink.publisher) + val p = Source(List(1, 2, 3)).runWith(Sink.publisher(1)) val p2 = Source(p). map(elem ⇒ if (elem == 2) throw new IllegalArgumentException("two not allowed") else elem). transform(() ⇒ new StatefulStage[Int, Int] { @@ -292,7 +292,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "support cancel as expected" in assertAllStagesStopped { - val p = Source(1 to 100).runWith(Sink.publisher) + val p = Source(1 to 100).runWith(Sink.publisher(1)) val received = Source(p). transform(() ⇒ new StatefulStage[Int, Int] { override def initial = new State { @@ -312,7 +312,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "support producing elements from empty inputs" in assertAllStagesStopped { - val p = Source(List.empty[Int]).runWith(Sink.publisher) + val p = Source(List.empty[Int]).runWith(Sink.publisher(1)) Source(p). transform(() ⇒ new StatefulStage[Int, Int] { override def initial = new State { @@ -416,7 +416,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug override def onUpstreamFinish(ctx: Context[Int]): TerminationDirective = terminationEmit(Iterator(42), ctx) }) - .runWith(Sink.publisher) + .runWith(Sink.publisher(1)) val inSub = in.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala index 9bebe5e48c..4eb929a70d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala @@ -222,7 +222,7 @@ class GraphFlowSpec extends AkkaSpec { "work with a Source when having KeyedSink inside" in { val probe = TestSubscriber.manualProbe[Int]() - val pubSink = Sink.publisher[Int] + val pubSink = Sink.publisher[Int](1) val sink = Sink.fromGraph(FlowGraph.create(pubSink) { implicit b ⇒ p ⇒ SinkShape(p.inlet) @@ -277,7 +277,7 @@ class GraphFlowSpec extends AkkaSpec { "materialize properly" in { val probe = TestSubscriber.manualProbe[Int]() val inSource = Source.subscriber[Int] - val outSink = Sink.publisher[Int] + val outSink = Sink.publisher[Int](1) val flow = Flow.fromGraph(FlowGraph.create(partialGraph) { implicit b ⇒ partial ⇒ @@ -309,7 +309,7 @@ class GraphFlowSpec extends AkkaSpec { val subscriber = m1 val publisher = m3 - source1.runWith(Sink.publisher).subscribe(subscriber) + source1.runWith(Sink.publisher(1)).subscribe(subscriber) publisher.subscribe(probe) validateProbe(probe, stdRequests, stdResult) @@ -318,7 +318,7 @@ class GraphFlowSpec extends AkkaSpec { "allow connecting source to sink directly" in { val probe = TestSubscriber.manualProbe[Int]() val inSource = Source.subscriber[Int] - val outSink = Sink.publisher[Int] + val outSink = Sink.publisher[Int](1) val source = Source.fromGraph(FlowGraph.create(inSource) { implicit b ⇒ src ⇒ @@ -340,7 +340,7 @@ class GraphFlowSpec extends AkkaSpec { val subscriber = m1 val publisher = m2 - source1.runWith(Sink.publisher).subscribe(subscriber) + source1.runWith(Sink.publisher(1)).subscribe(subscriber) publisher.subscribe(probe) validateProbe(probe, 4, (0 to 3).toSet) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala index 5237b7ae05..87273e8321 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala @@ -47,7 +47,7 @@ class GraphBalanceSpec extends AkkaSpec { "support waiting for demand from all downstream subscriptions" in { val s1 = TestSubscriber.manualProbe[Int]() - val p2 = RunnableGraph.fromGraph(FlowGraph.create(Sink.publisher[Int]) { implicit b ⇒ + val p2 = RunnableGraph.fromGraph(FlowGraph.create(Sink.publisher[Int](1)) { implicit b ⇒ p2Sink ⇒ val balance = b.add(Balance[Int](2, waitForAllDownstreams = true)) Source(List(1, 2, 3)) ~> balance.in @@ -78,7 +78,7 @@ class GraphBalanceSpec extends AkkaSpec { "support waiting for demand from all non-cancelled downstream subscriptions" in assertAllStagesStopped { val s1 = TestSubscriber.manualProbe[Int]() - val (p2, p3) = RunnableGraph.fromGraph(FlowGraph.create(Sink.publisher[Int], Sink.publisher[Int])(Keep.both) { implicit b ⇒ + val (p2, p3) = RunnableGraph.fromGraph(FlowGraph.create(Sink.publisher[Int](1), Sink.publisher[Int](1))(Keep.both) { implicit b ⇒ (p2Sink, p3Sink) ⇒ val balance = b.add(Balance[Int](3, waitForAllDownstreams = true)) Source(List(1, 2, 3)) ~> balance.in diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala index cdc234610c..b184eb78ee 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala @@ -155,7 +155,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec with ConversionCheckedTripleEqual } "be able to run plain flow" in { - val p = Source(List(1, 2, 3)).runWith(Sink.publisher) + val p = Source(List(1, 2, 3)).runWith(Sink.publisher(1)) val s = TestSubscriber.manualProbe[Int] val flow = Flow[Int].map(_ * 2) RunnableGraph.fromGraph(FlowGraph.create() { implicit builder ⇒ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala index 12ca4add12..b502d33c85 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala @@ -19,7 +19,7 @@ class PublisherSinkSpec extends AkkaSpec { "be unique when created twice" in assertAllStagesStopped { - val (pub1, pub2) = RunnableGraph.fromGraph(FlowGraph.create(Sink.publisher[Int], Sink.publisher[Int])(Keep.both) { implicit b ⇒ + val (pub1, pub2) = RunnableGraph.fromGraph(FlowGraph.create(Sink.publisher[Int](1), Sink.publisher[Int](1))(Keep.both) { implicit b ⇒ (p1, p2) ⇒ import FlowGraph.Implicits._ @@ -40,14 +40,14 @@ class PublisherSinkSpec extends AkkaSpec { } "work with SubscriberSource" in { - val (sub, pub) = Source.subscriber[Int].toMat(Sink.publisher)(Keep.both).run() + val (sub, pub) = Source.subscriber[Int].toMat(Sink.publisher(1))(Keep.both).run() Source(1 to 100).to(Sink(sub)).run() Await.result(Source(pub).grouped(1000).runWith(Sink.head), 3.seconds) should ===(1 to 100) } "be able to use Publisher in materialized value transformation" in { val f = Source(1 to 3).runWith( - Sink.publisher[Int].mapMaterializedValue(p ⇒ Source(p).runFold(0)(_ + _))) + Sink.publisher[Int](1).mapMaterializedValue(p ⇒ Source(p).runFold(0)(_ + _))) Await.result(f, 3.seconds) should be(6) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index a2ec35a06e..e8755cfb68 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -17,7 +17,7 @@ class SourceSpec extends AkkaSpec { "Single Source" must { "produce element" in { - val p = Source.single(1).runWith(Sink.publisher) + val p = Source.single(1).runWith(Sink.publisher(1)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -27,7 +27,7 @@ class SourceSpec extends AkkaSpec { } "reject later subscriber" in { - val p = Source.single(1).runWith(Sink.publisher) + val p = Source.single(1).runWith(Sink.publisher(1)) val c1 = TestSubscriber.manualProbe[Int]() val c2 = TestSubscriber.manualProbe[Int]() p.subscribe(c1) @@ -45,7 +45,7 @@ class SourceSpec extends AkkaSpec { "Empty Source" must { "complete immediately" in { - val p = Source.empty.runWith(Sink.publisher) + val p = Source.empty.runWith(Sink.publisher(1)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) c.expectSubscriptionAndComplete() @@ -60,7 +60,7 @@ class SourceSpec extends AkkaSpec { "Failed Source" must { "emit error immediately" in { val ex = new RuntimeException with NoStackTrace - val p = Source.failed(ex).runWith(Sink.publisher) + val p = Source.failed(ex).runWith(Sink.publisher(1)) val c = TestSubscriber.manualProbe[Int]() p.subscribe(c) c.expectSubscriptionAndError(ex) @@ -75,7 +75,7 @@ class SourceSpec extends AkkaSpec { "Maybe Source" must { "complete materialized future with None when stream cancels" in Utils.assertAllStagesStopped { val neverSource = Source.maybe[Int] - val pubSink = Sink.publisher[Int] + val pubSink = Sink.publisher[Int](1) val (f, neverPub) = neverSource.toMat(pubSink)(Keep.both).run() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala index 2d09cb2e7c..078fd97f98 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala @@ -41,7 +41,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { "timeout and cancel substream publishers when no-one subscribes to them after some time (time them out)" in assertAllStagesStopped { val publisherProbe = TestPublisher.manualProbe[Int]() - val publisher = Source(publisherProbe).groupBy(_ % 3).runWith(Sink.publisher) + val publisher = Source(publisherProbe).groupBy(_ % 3).runWith(Sink.publisher(1)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -57,7 +57,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { val (_, s1) = subscriber.expectNext() // should not break normal usage val s1SubscriberProbe = TestSubscriber.manualProbe[Int]() - s1.runWith(Sink.publisher).subscribe(s1SubscriberProbe) + s1.runWith(Sink.publisher(1)).subscribe(s1SubscriberProbe) val s1Subscription = s1SubscriberProbe.expectSubscription() s1Subscription.request(100) s1SubscriberProbe.expectNext(1) @@ -65,7 +65,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { val (_, s2) = subscriber.expectNext() // should not break normal usage val s2SubscriberProbe = TestSubscriber.manualProbe[Int]() - s2.runWith(Sink.publisher).subscribe(s2SubscriberProbe) + s2.runWith(Sink.publisher(1)).subscribe(s2SubscriberProbe) val s2Subscription = s2SubscriberProbe.expectSubscription() s2Subscription.request(100) s2SubscriberProbe.expectNext(2) @@ -83,7 +83,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { "timeout and stop groupBy parent actor if none of the substreams are actually consumed" in assertAllStagesStopped { val publisherProbe = TestPublisher.manualProbe[Int]() - val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher) + val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher(1)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -108,7 +108,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { "not timeout and cancel substream publishers when they have been subscribed to" in { val publisherProbe = TestPublisher.manualProbe[Int]() - val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher) + val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher(1)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) @@ -123,7 +123,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { val (_, s1) = subscriber.expectNext() // should not break normal usage val s1SubscriberProbe = TestSubscriber.manualProbe[Int]() - s1.runWith(Sink.publisher).subscribe(s1SubscriberProbe) + s1.runWith(Sink.publisher(1)).subscribe(s1SubscriberProbe) val s1Sub = s1SubscriberProbe.expectSubscription() s1Sub.request(1) s1SubscriberProbe.expectNext(1) @@ -131,7 +131,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { val (_, s2) = subscriber.expectNext() // should not break normal usage val s2SubscriberProbe = TestSubscriber.manualProbe[Int]() - s2.runWith(Sink.publisher).subscribe(s2SubscriberProbe) + s2.runWith(Sink.publisher(1)).subscribe(s2SubscriberProbe) val s2Sub = s2SubscriberProbe.expectSubscription() // sleep long enough for timeout to trigger if not canceled diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala index 08f5449d7a..449c9bf2f2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala @@ -49,7 +49,7 @@ class TickSourceSpec extends AkkaSpec { } "reject multiple subscribers, but keep the first" in { - val p = Source.tick(1.second, 1.second, "tick").runWith(Sink.publisher) + val p = Source.tick(1.second, 1.second, "tick").runWith(Sink.publisher(1)) val c1 = TestSubscriber.manualProbe[String]() val c2 = TestSubscriber.manualProbe[String]() p.subscribe(c1) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala index abbccb03a3..6c95a21289 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala @@ -24,7 +24,7 @@ private[akka] class ConcatAllImpl(f: Any ⇒ Source[Any, _], materializer: Actor import akka.stream.impl.MultiStreamInputProcessor._ val takeNextSubstream = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ - val publisher = f(primaryInputs.dequeueInputElement()).runWith(Sink.publisher)(materializer) + val publisher = f(primaryInputs.dequeueInputElement()).runWith(Sink.publisher(1))(materializer) // FIXME we can pass the flow to createSubstreamInput (but avoiding copy impl now) val inputs = createAndSubscribeSubstreamInput(publisher) nextPhase(streamSubstream(inputs)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala index c48cf1863b..9155a10bf2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala @@ -1,13 +1,17 @@ package akka.stream.impl -import akka.actor.{ Actor, ActorRef } +import akka.actor.{ Deploy, Props, Actor, ActorRef } import akka.stream.ActorMaterializerSettings import org.reactivestreams.Subscriber /** * INTERNAL API */ -private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBufferSize: Int, self: ActorRef, val pump: Pump) +private[akka] abstract class FanoutOutputs(val maxNumberOfSubscribers: Int, + val maxBufferSize: Int, + val initialBufferSize: Int, + self: ActorRef, + val pump: Pump) extends DefaultOutputTransferStates with SubscriberManagement[Any] { @@ -88,16 +92,18 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu } +private[akka] object FanoutProcessorImpl { + def props(actorMaterializerSettings: ActorMaterializerSettings, maxNumberOfSubscribers: Int): Props = + Props(new FanoutProcessorImpl(actorMaterializerSettings, maxNumberOfSubscribers)).withDeploy(Deploy.local) +} /** * INTERNAL API */ -private[akka] class FanoutProcessorImpl( - _settings: ActorMaterializerSettings, - initialFanoutBufferSize: Int, - maximumFanoutBufferSize: Int) extends ActorProcessorImpl(_settings) { +private[akka] class FanoutProcessorImpl(_settings: ActorMaterializerSettings, maxNumberOfSubscribers: Int) + extends ActorProcessorImpl(_settings) { override val primaryOutputs: FanoutOutputs = - new FanoutOutputs(maximumFanoutBufferSize, initialFanoutBufferSize, self, this) { + new FanoutOutputs(maxNumberOfSubscribers, settings.maxInputBufferSize, settings.initialInputBufferSize, self, this) { override def afterShutdown(): Unit = afterFlush() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 514b8f499d..c7bfe54a68 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -66,26 +66,25 @@ private[akka] class PublisherSink[In](val attributes: Attributes, shape: SinkSha * INTERNAL API */ private[akka] final class FanoutPublisherSink[In]( - initialBufferSize: Int, - maximumBufferSize: Int, + maxNumberOfSubscribers: Int, val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Publisher[In]](shape) { override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = { val actorMaterializer = ActorMaterializer.downcast(context.materializer) - val fanoutActor = actorMaterializer.actorOf(context, - Props(new FanoutProcessorImpl(actorMaterializer.effectiveSettings(context.effectiveAttributes), - initialBufferSize, maximumBufferSize)).withDeploy(Deploy.local)) - val fanoutProcessor = ActorProcessorFactory[In, In](fanoutActor) + val fanoutProcessor = ActorProcessorFactory[In, In]( + actorMaterializer.actorOf( + context, + FanoutProcessorImpl.props(actorMaterializer.effectiveSettings(attributes), maxNumberOfSubscribers))) (fanoutProcessor, fanoutProcessor) } override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] = - new FanoutPublisherSink[In](initialBufferSize, maximumBufferSize, attributes, shape) + new FanoutPublisherSink[In](maxNumberOfSubscribers, attributes, shape) override def withAttributes(attr: Attributes): Module = - new FanoutPublisherSink[In](initialBufferSize, maximumBufferSize, attr, amendShape(attr)) + new FanoutPublisherSink[In](maxNumberOfSubscribers, attr, amendShape(attr)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index ad15a86802..f35a165fd0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -370,10 +370,10 @@ private[stream] final class VirtualProcessor[T] extends Processor[T, T] { tryOnSubscribe(s, sub) sub.closeLatch() // allow onNext only now terminationStatus.getAndSet(Allowed) match { - case null ⇒ // nothing happened yet - case Completed ⇒ tryOnComplete(s) - case Failed(ex) ⇒ tryOnError(s, ex) - case Allowed ⇒ // all good + case null ⇒ // nothing happened yet + case VirtualProcessor.Completed ⇒ tryOnComplete(s) + case VirtualProcessor.Failed(ex) ⇒ tryOnError(s, ex) + case VirtualProcessor.Allowed ⇒ // all good } } catch { case NonFatal(ex) ⇒ sub.cancel() diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala index 472925a140..29ecf6490c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala @@ -62,6 +62,7 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff def initialBufferSize: Int def maxBufferSize: Int + def maxNumberOfSubscribers: Int /** * called when we are ready to consume more elements from our upstream @@ -231,12 +232,18 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff private def addSubscription(subscriber: Subscriber[_ >: T]): Unit = { import ReactiveStreamsCompliance._ - val newSubscription = createSubscription(subscriber) - subscriptions ::= newSubscription - buffer.initCursor(newSubscription) - try tryOnSubscribe(subscriber, newSubscription) - catch { - case _: SpecViolation ⇒ unregisterSubscriptionInternal(newSubscription) + if (maxNumberOfSubscribers < 1 || subscriptions.size >= maxNumberOfSubscribers) { + tryOnSubscribe(subscriber, CancelledSubscription) + tryOnError(subscriber, + new IllegalStateException(s"Max number of Subscribers exceeded. [${maxNumberOfSubscribers}]")) + } else { + val newSubscription = createSubscription(subscriber) + subscriptions ::= newSubscription + buffer.initCursor(newSubscription) + try tryOnSubscribe(subscriber, newSubscription) + catch { + case _: SpecViolation ⇒ unregisterSubscriptionInternal(newSubscription) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 3d7bce587a..4b939b5bdb 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -181,17 +181,17 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends new Flow(delegate.joinMat(bidi)(combinerToScala(combine))) /** - * Connect the `KeyedSource` to this `Flow` and then connect it to the `KeyedSink` and run it. + * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. * - * The returned tuple contains the materialized values of the `KeyedSource` and `KeyedSink`, + * The returned tuple contains the materialized values of the `Source` and `Sink`, * e.g. the `Subscriber` of a `Source.subscriber` and `Publisher` of a `Sink.publisher`. * - * @tparam T materialized type of given KeyedSource - * @tparam U materialized type of given KeyedSink + * @tparam T materialized type of given Source + * @tparam U materialized type of given Sink */ def runWith[T, U](source: Graph[SourceShape[In], T], sink: Graph[SinkShape[Out], U], materializer: Materializer): akka.japi.Pair[T, U] = { - val p = delegate.runWith(source, sink)(materializer) - akka.japi.Pair(p._1.asInstanceOf[T], p._2.asInstanceOf[U]) + val (som, sim) = delegate.runWith(source, sink)(materializer) + akka.japi.Pair(som, sim) } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index f32f024c76..aa256cdf45 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -45,10 +45,14 @@ object Sink { /** * A `Sink` that materializes into a [[org.reactivestreams.Publisher]]. - * that can handle one [[org.reactivestreams.Subscriber]]. + * that can handle `maxNumberOfSubscribers` [[org.reactivestreams.Subscriber]]s. + * + * If `maxNumberOfSubscribers` is greater than 1, the size of the `inputBuffer` configured for this stage + * becomes the maximum number of elements that the fastest [[org.reactivestreams.Subscriber]] can be ahead + * of the slowest one before slowing the processing down due to back pressure. */ - def publisher[In](): Sink[In, Publisher[In]] = - new Sink(scaladsl.Sink.publisher) + def publisher[In](maxNumberOfSubscribers: Int): Sink[In, Publisher[In]] = + new Sink(scaladsl.Sink.publisher(maxNumberOfSubscribers)) /** * A `Sink` that will invoke the given procedure for each received element. The sink is materialized @@ -73,13 +77,6 @@ object Sink { def foreachParallel[T](parallel: Int)(f: function.Procedure[T])(ec: ExecutionContext): Sink[T, Future[Unit]] = new Sink(scaladsl.Sink.foreachParallel(parallel)(f.apply)(ec)) - /** - * A `Sink` that materializes into a [[org.reactivestreams.Publisher]] - * that can handle more than one [[org.reactivestreams.Subscriber]]. - */ - def fanoutPublisher[T](initialBufferSize: Int, maximumBufferSize: Int): Sink[T, Publisher[T]] = - new Sink(scaladsl.Sink.fanoutPublisher(initialBufferSize, maximumBufferSize)) - /** * A `Sink` that when the flow is completed, either through a failure or normal * completion, apply the provided function with [[scala.util.Success]] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 4bc9f4336b..2ab40a814f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -223,7 +223,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * @return A [[RunnableGraph]] that materializes to a Processor when run() is called on it. */ def toProcessor: RunnableGraph[Processor[In @uncheckedVariance, Out @uncheckedVariance]] = { - Source.subscriber[In].via(this).toMat(Sink.publisher[Out])(Keep.both[Subscriber[In], Publisher[Out]]) + Source.subscriber[In].via(this).toMat(Sink.publisher[Out](1))(Keep.both[Subscriber[In], Publisher[Out]]) .mapMaterializedValue { case (sub, pub) ⇒ new Processor[In, Out] { override def onError(t: Throwable): Unit = sub.onError(t) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 9a9bf71795..7e65ee199d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -80,18 +80,21 @@ object Sink { /** * A `Sink` that materializes into a [[org.reactivestreams.Publisher]]. - * that can handle one [[org.reactivestreams.Subscriber]]. + * that can handle `maxNumberOfSubscribers` [[org.reactivestreams.Subscriber]]s. + * + * If `maxNumberOfSubscribers` is greater than 1, the size of the `inputBuffer` configured for this stage + * becomes the maximum number of elements that the fastest [[org.reactivestreams.Subscriber]] can be ahead + * of the slowest one before slowing the processing down due to back pressure. */ - def publisher[T]: Sink[T, Publisher[T]] = - new Sink(new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink"))) - - /** - * A `Sink` that materializes into a [[org.reactivestreams.Publisher]] - * that can handle more than one [[org.reactivestreams.Subscriber]]. - */ - def fanoutPublisher[T](initialBufferSize: Int, maximumBufferSize: Int): Sink[T, Publisher[T]] = - new Sink(new FanoutPublisherSink[T](initialBufferSize, maximumBufferSize, DefaultAttributes.fanoutPublisherSink, - shape("FanoutPublisherSink"))) + def publisher[T](maxNumberOfSubscribers: Int): Sink[T, Publisher[T]] = + new Sink( + maxNumberOfSubscribers match { + case 1 ⇒ new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink")) + case n ⇒ + new FanoutPublisherSink[T](n, + DefaultAttributes.fanoutPublisherSink, + shape("FanoutPublisherSink")) + }) /** * A `Sink` that will consume the stream and discard the elements.