diff --git a/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala new file mode 100644 index 0000000000..2331ca7971 --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala @@ -0,0 +1,139 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package docs.stream + +import akka.stream.FlowMaterializer +import akka.stream.scaladsl.Flow +import akka.stream.testkit.AkkaSpec +import akka.stream.scaladsl.Sink +import akka.stream.testkit.StreamTestKit.SubscriberProbe +import akka.stream.scaladsl.Source + +class ReactiveStreamsDocSpec extends AkkaSpec { + import TwitterStreamQuickstartDocSpec._ + + implicit val mat = FlowMaterializer() + + //#imports + import org.reactivestreams.Publisher + import org.reactivestreams.Subscriber + //#imports + + trait Fixture { + //#authors + val authors = Flow[Tweet] + .filter(_.hashtags.contains(Akka)) + .map(_.author) + + //#authors + + //#tweets-publisher + def tweets: Publisher[Tweet] + //#tweets-publisher + + //#author-storage-subscriber + def storage: Subscriber[Author] + //#author-storage-subscriber + + //#author-alert-subscriber + def alert: Subscriber[Author] + //#author-alert-subscriber + } + + val impl = new Fixture { + override def tweets: Publisher[Tweet] = + TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.publisher) + + override def storage = SubscriberProbe[Author] + + override def alert = SubscriberProbe[Author] + } + + def assertResult(storage: SubscriberProbe[Author]): Unit = { + val sub = storage.expectSubscription() + sub.request(10) + storage.expectNext(Author("rolandkuhn")) + storage.expectNext(Author("patriknw")) + storage.expectNext(Author("bantonsson")) + storage.expectNext(Author("drewhk")) + storage.expectNext(Author("ktosopl")) + storage.expectNext(Author("mmartynas")) + storage.expectNext(Author("akkateam")) + storage.expectComplete() + } + + "reactive streams publisher via flow to subscriber" in { + import impl._ + val storage = impl.storage + + //#connect-all + Source(tweets).via(authors).to(Sink(storage)).run() + //#connect-all + + assertResult(storage) + } + + "flow as publisher and subscriber" in { + import impl._ + val storage = impl.storage + + //#flow-publisher-subscriber + val (in: Subscriber[Tweet], out: Publisher[Author]) = + authors.runWith(Source.subscriber[Tweet], Sink.publisher[Author]) + + tweets.subscribe(in) + out.subscribe(storage) + //#flow-publisher-subscriber + + assertResult(storage) + } + + "source as publisher" in { + import impl._ + val storage = impl.storage + + //#source-publisher + val authorPublisher: Publisher[Author] = + Source(tweets).via(authors).runWith(Sink.publisher) + + authorPublisher.subscribe(storage) + //#source-publisher + + assertResult(storage) + } + + "source as fanoutPublisher" in { + import impl._ + val storage = impl.storage + val alert = impl.alert + + //#source-fanoutPublisher + val authorPublisher: Publisher[Author] = + Source(tweets).via(authors) + .runWith(Sink.fanoutPublisher(initialBufferSize = 8, maximumBufferSize = 16)) + + authorPublisher.subscribe(storage) + authorPublisher.subscribe(alert) + //#source-fanoutPublisher + + // this relies on fanoutPublisher buffer size > number of authors + assertResult(storage) + assertResult(alert) + } + + "sink as subscriber" in { + import impl._ + val storage = impl.storage + + //#sink-subscriber + val tweetSubscriber: Subscriber[Tweet] = + authors.to(Sink(storage)).runWith(Source.subscriber[Tweet]) + + tweets.subscribe(tweetSubscriber) + //#sink-subscriber + + assertResult(storage) + } + +} diff --git a/akka-docs-dev/rst/scala/stream-integration-reactive-streams.rst b/akka-docs-dev/rst/scala/stream-integration-reactive-streams.rst new file mode 100644 index 0000000000..be144156ad --- /dev/null +++ b/akka-docs-dev/rst/scala/stream-integration-reactive-streams.rst @@ -0,0 +1,77 @@ +.. _stream-integration-reactive-streams-scala: + +Integrating with Reactive Streams +================================= + +`Reactive Streams`_ defines a standard for asynchronous stream processing with non-blocking +back pressure. It makes it possible to plug together stream libraries that adhere to the standard. +Akka Streams is one such library. + +An incomplete list of other implementations: + +* `Reactor (1.1+)`_ +* `RxJava`_ +* `Ratpack`_ +* `Slick`_ + +.. _Reactive Streams: http://reactive-streams.org/ +.. _Reactor (1.1+): http://github.com/reactor/reactor +.. _RxJava: https://github.com/ReactiveX/RxJavaReactiveStreams +.. _Ratpack: http://www.ratpack.io/manual/current/streams.html +.. _Slick: http://slick.typesafe.com + +The two most important interfaces in Reactive Streams are the :class:`Publisher` and :class:`Subscriber`. + +.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#imports + +Let us assume that a library provides a publisher of tweets: + +.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#tweets-publisher + +and another library knows how to store author handles in a database: + +.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#author-storage-subscriber + +Using an Akka Streams :class:`Flow` we can transform the stream and connect those: + +.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala + :include: authors,connect-all + +The :class:`Publisher` is used as an input :class:`Source` to the flow and the +:class:`Subscriber` is used as an output :class:`Sink`. + +A :class:`Flow` can also be materialized to a :class:`Subscriber`, :class:`Publisher` pair: + +.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#flow-publisher-subscriber + +A publisher can be connected to a subscriber with the ``subscribe`` method. + +It is also possible to expose a :class:`Source` as a :class:`Publisher` +by using the ``publisher`` :class:`Sink`: + +.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#source-publisher + +A publisher that is created with ``Sink.publisher`` only supports one subscriber. A second +subscription attempt will be rejected with an :class:`IllegalStateException`. + +A publisher that supports multiple subscribers can be created with ``Sink.fanoutPublisher`` +instead: + +.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala + :include: author-alert-subscriber,author-storage-subscriber + +.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#source-fanoutPublisher + +The buffer size controls how far apart the slowest subscriber can be from the fastest subscriber +before slowing down the stream. + +To make the picture complete, it is also possible to expose a :class:`Sink` as a :class:`Subscriber` +by using the ``subscriber`` :class:`Source`: + +.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#sink-subscriber + + + + + + \ No newline at end of file diff --git a/akka-docs-dev/rst/scala/stream.rst b/akka-docs-dev/rst/scala/stream.rst index 8b077d725e..f7d8f98b5d 100644 --- a/akka-docs-dev/rst/scala/stream.rst +++ b/akka-docs-dev/rst/scala/stream.rst @@ -23,6 +23,7 @@ It should be roughly: :maxdepth: 1 stream-integration-external + stream-integration-reactive-streams Motivation ========== @@ -448,15 +449,6 @@ Integrating with Actors // TODO: how do I create my own sources / sinks? -Integration with Reactive Streams enabled libraries -=================================================== - -// TODO: some info about reactive streams in general - -// TODO: Simply runWith(Sink.publisher) and runWith(Source.subscriber) to get the corresponding reactive streams types. - -// TODO: fanoutPublisher - ActorPublisher ^^^^^^^^^^^^^^