!str - Switches Sink.publisher to use a boolean to indicate fanout rather than a number of allowed subscribers

This commit is contained in:
Viktor Klang 2015-11-03 12:53:24 +01:00
parent f839a1f85d
commit 6cfa4df800
53 changed files with 221 additions and 212 deletions

View file

@ -41,7 +41,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec {
val impl = new Fixture {
override def tweets: Publisher[Tweet] =
TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.publisher(1))
TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.publisher(false))
override def storage = TestSubscriber.manualProbe[Author]
@ -92,7 +92,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec {
//#source-publisher
val authorPublisher: Publisher[Author] =
Source(tweets).via(authors).runWith(Sink.publisher(1))
Source(tweets).via(authors).runWith(Sink.publisher(fanout = false))
authorPublisher.subscribe(storage)
//#source-publisher
@ -108,7 +108,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec {
//#source-fanoutPublisher
val authorPublisher: Publisher[Author] =
Source(tweets).via(authors)
.runWith(Sink.publisher(maxNumberOfSubscribers = Int.MaxValue))
.runWith(Sink.publisher(fanout = true))
authorPublisher.subscribe(storage)
authorPublisher.subscribe(alert)

View file

@ -409,10 +409,10 @@ by using the Publisher-:class:`Sink`:
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#source-publisher
A publisher that is created with ``Sink.publisher`` only supports one subscriber. A second
subscription attempt will be rejected with an :class:`IllegalStateException`.
A publisher that is created with ``Sink.publisher(false)`` supports only a single subscription.
Additional subscription attempts will be rejected with an :class:`IllegalStateException`.
A publisher that supports multiple subscribers is created as follows:
A publisher that supports multiple subscribers using fan-out/broadcasting is created as follows:
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala
:include: author-alert-subscriber,author-storage-subscriber