Merge pull request #16579 from akka/wip-16549-doc-rs-integration-patriknw

=str #16549 doc: Integration with Reactive Streams
This commit is contained in:
Patrik Nordwall 2014-12-19 15:17:13 +01:00
commit 872d5d10b0
3 changed files with 217 additions and 9 deletions

View file

@ -0,0 +1,139 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Flow
import akka.stream.testkit.AkkaSpec
import akka.stream.scaladsl.Sink
import akka.stream.testkit.StreamTestKit.SubscriberProbe
import akka.stream.scaladsl.Source
class ReactiveStreamsDocSpec extends AkkaSpec {
import TwitterStreamQuickstartDocSpec._
implicit val mat = FlowMaterializer()
//#imports
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
//#imports
trait Fixture {
//#authors
val authors = Flow[Tweet]
.filter(_.hashtags.contains(Akka))
.map(_.author)
//#authors
//#tweets-publisher
def tweets: Publisher[Tweet]
//#tweets-publisher
//#author-storage-subscriber
def storage: Subscriber[Author]
//#author-storage-subscriber
//#author-alert-subscriber
def alert: Subscriber[Author]
//#author-alert-subscriber
}
val impl = new Fixture {
override def tweets: Publisher[Tweet] =
TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.publisher)
override def storage = SubscriberProbe[Author]
override def alert = SubscriberProbe[Author]
}
def assertResult(storage: SubscriberProbe[Author]): Unit = {
val sub = storage.expectSubscription()
sub.request(10)
storage.expectNext(Author("rolandkuhn"))
storage.expectNext(Author("patriknw"))
storage.expectNext(Author("bantonsson"))
storage.expectNext(Author("drewhk"))
storage.expectNext(Author("ktosopl"))
storage.expectNext(Author("mmartynas"))
storage.expectNext(Author("akkateam"))
storage.expectComplete()
}
"reactive streams publisher via flow to subscriber" in {
import impl._
val storage = impl.storage
//#connect-all
Source(tweets).via(authors).to(Sink(storage)).run()
//#connect-all
assertResult(storage)
}
"flow as publisher and subscriber" in {
import impl._
val storage = impl.storage
//#flow-publisher-subscriber
val (in: Subscriber[Tweet], out: Publisher[Author]) =
authors.runWith(Source.subscriber[Tweet], Sink.publisher[Author])
tweets.subscribe(in)
out.subscribe(storage)
//#flow-publisher-subscriber
assertResult(storage)
}
"source as publisher" in {
import impl._
val storage = impl.storage
//#source-publisher
val authorPublisher: Publisher[Author] =
Source(tweets).via(authors).runWith(Sink.publisher)
authorPublisher.subscribe(storage)
//#source-publisher
assertResult(storage)
}
"source as fanoutPublisher" in {
import impl._
val storage = impl.storage
val alert = impl.alert
//#source-fanoutPublisher
val authorPublisher: Publisher[Author] =
Source(tweets).via(authors)
.runWith(Sink.fanoutPublisher(initialBufferSize = 8, maximumBufferSize = 16))
authorPublisher.subscribe(storage)
authorPublisher.subscribe(alert)
//#source-fanoutPublisher
// this relies on fanoutPublisher buffer size > number of authors
assertResult(storage)
assertResult(alert)
}
"sink as subscriber" in {
import impl._
val storage = impl.storage
//#sink-subscriber
val tweetSubscriber: Subscriber[Tweet] =
authors.to(Sink(storage)).runWith(Source.subscriber[Tweet])
tweets.subscribe(tweetSubscriber)
//#sink-subscriber
assertResult(storage)
}
}

View file

@ -0,0 +1,77 @@
.. _stream-integration-reactive-streams-scala:
Integrating with Reactive Streams
=================================
`Reactive Streams`_ defines a standard for asynchronous stream processing with non-blocking
back pressure. It makes it possible to plug together stream libraries that adhere to the standard.
Akka Streams is one such library.
An incomplete list of other implementations:
* `Reactor (1.1+)`_
* `RxJava`_
* `Ratpack`_
* `Slick`_
.. _Reactive Streams: http://reactive-streams.org/
.. _Reactor (1.1+): http://github.com/reactor/reactor
.. _RxJava: https://github.com/ReactiveX/RxJavaReactiveStreams
.. _Ratpack: http://www.ratpack.io/manual/current/streams.html
.. _Slick: http://slick.typesafe.com
The two most important interfaces in Reactive Streams are the :class:`Publisher` and :class:`Subscriber`.
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#imports
Let us assume that a library provides a publisher of tweets:
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#tweets-publisher
and another library knows how to store author handles in a database:
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#author-storage-subscriber
Using an Akka Streams :class:`Flow` we can transform the stream and connect those:
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala
:include: authors,connect-all
The :class:`Publisher` is used as an input :class:`Source` to the flow and the
:class:`Subscriber` is used as an output :class:`Sink`.
A :class:`Flow` can also be materialized to a :class:`Subscriber`, :class:`Publisher` pair:
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#flow-publisher-subscriber
A publisher can be connected to a subscriber with the ``subscribe`` method.
It is also possible to expose a :class:`Source` as a :class:`Publisher`
by using the ``publisher`` :class:`Sink`:
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#source-publisher
A publisher that is created with ``Sink.publisher`` only supports one subscriber. A second
subscription attempt will be rejected with an :class:`IllegalStateException`.
A publisher that supports multiple subscribers can be created with ``Sink.fanoutPublisher``
instead:
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala
:include: author-alert-subscriber,author-storage-subscriber
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#source-fanoutPublisher
The buffer size controls how far apart the slowest subscriber can be from the fastest subscriber
before slowing down the stream.
To make the picture complete, it is also possible to expose a :class:`Sink` as a :class:`Subscriber`
by using the ``subscriber`` :class:`Source`:
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#sink-subscriber

View file

@ -23,6 +23,7 @@ It should be roughly:
:maxdepth: 1
stream-integration-external
stream-integration-reactive-streams
Motivation
==========
@ -448,15 +449,6 @@ Integrating with Actors
// TODO: how do I create my own sources / sinks?
Integration with Reactive Streams enabled libraries
===================================================
// TODO: some info about reactive streams in general
// TODO: Simply runWith(Sink.publisher) and runWith(Source.subscriber) to get the corresponding reactive streams types.
// TODO: fanoutPublisher
ActorPublisher
^^^^^^^^^^^^^^