This is mainly intended to keep the git history as neat as possible. No actual conversion yet, so now both the rst and the paradox docs are broken, which will be fixed in the next commits.
149 lines
3.7 KiB
Scala
149 lines
3.7 KiB
Scala
/**
|
|
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
|
|
*/
|
|
package docs.stream
|
|
|
|
import akka.NotUsed
|
|
import akka.stream.ActorMaterializer
|
|
import akka.stream.scaladsl.{ Flow, Sink, Source }
|
|
import akka.stream.testkit._
|
|
import org.reactivestreams.Processor
|
|
import akka.testkit.AkkaSpec
|
|
|
|
class ReactiveStreamsDocSpec extends AkkaSpec {
|
|
import TwitterStreamQuickstartDocSpec._
|
|
|
|
implicit val materializer = ActorMaterializer()
|
|
|
|
//#imports
|
|
import org.reactivestreams.Publisher
|
|
import org.reactivestreams.Subscriber
|
|
//#imports
|
|
|
|
trait Fixture {
|
|
//#authors
|
|
val authors = Flow[Tweet]
|
|
.filter(_.hashtags.contains(akkaTag))
|
|
.map(_.author)
|
|
|
|
//#authors
|
|
|
|
//#tweets-publisher
|
|
def tweets: Publisher[Tweet]
|
|
//#tweets-publisher
|
|
|
|
//#author-storage-subscriber
|
|
def storage: Subscriber[Author]
|
|
//#author-storage-subscriber
|
|
|
|
//#author-alert-subscriber
|
|
def alert: Subscriber[Author]
|
|
//#author-alert-subscriber
|
|
}
|
|
|
|
val impl = new Fixture {
|
|
override def tweets: Publisher[Tweet] =
|
|
TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.asPublisher(fanout = false))
|
|
|
|
override def storage = TestSubscriber.manualProbe[Author]
|
|
|
|
override def alert = TestSubscriber.manualProbe[Author]
|
|
}
|
|
|
|
def assertResult(storage: TestSubscriber.ManualProbe[Author]): Unit = {
|
|
val sub = storage.expectSubscription()
|
|
sub.request(10)
|
|
storage.expectNext(Author("rolandkuhn"))
|
|
storage.expectNext(Author("patriknw"))
|
|
storage.expectNext(Author("bantonsson"))
|
|
storage.expectNext(Author("drewhk"))
|
|
storage.expectNext(Author("ktosopl"))
|
|
storage.expectNext(Author("mmartynas"))
|
|
storage.expectNext(Author("akkateam"))
|
|
storage.expectComplete()
|
|
}
|
|
|
|
"reactive streams publisher via flow to subscriber" in {
|
|
import impl._
|
|
val storage = impl.storage
|
|
|
|
//#connect-all
|
|
Source.fromPublisher(tweets).via(authors).to(Sink.fromSubscriber(storage)).run()
|
|
//#connect-all
|
|
|
|
assertResult(storage)
|
|
}
|
|
|
|
"flow as publisher and subscriber" in {
|
|
import impl._
|
|
val storage = impl.storage
|
|
|
|
//#flow-publisher-subscriber
|
|
val processor: Processor[Tweet, Author] = authors.toProcessor.run()
|
|
|
|
tweets.subscribe(processor)
|
|
processor.subscribe(storage)
|
|
//#flow-publisher-subscriber
|
|
|
|
assertResult(storage)
|
|
}
|
|
|
|
"source as publisher" in {
|
|
import impl._
|
|
val storage = impl.storage
|
|
|
|
//#source-publisher
|
|
val authorPublisher: Publisher[Author] =
|
|
Source.fromPublisher(tweets).via(authors).runWith(Sink.asPublisher(fanout = false))
|
|
|
|
authorPublisher.subscribe(storage)
|
|
//#source-publisher
|
|
|
|
assertResult(storage)
|
|
}
|
|
|
|
"source as fanoutPublisher" in {
|
|
import impl._
|
|
val storage = impl.storage
|
|
val alert = impl.alert
|
|
|
|
//#source-fanoutPublisher
|
|
val authorPublisher: Publisher[Author] =
|
|
Source.fromPublisher(tweets).via(authors)
|
|
.runWith(Sink.asPublisher(fanout = true))
|
|
|
|
authorPublisher.subscribe(storage)
|
|
authorPublisher.subscribe(alert)
|
|
//#source-fanoutPublisher
|
|
|
|
// this relies on fanoutPublisher buffer size > number of authors
|
|
assertResult(storage)
|
|
assertResult(alert)
|
|
}
|
|
|
|
"sink as subscriber" in {
|
|
import impl._
|
|
val storage = impl.storage
|
|
|
|
//#sink-subscriber
|
|
val tweetSubscriber: Subscriber[Tweet] =
|
|
authors.to(Sink.fromSubscriber(storage)).runWith(Source.asSubscriber[Tweet])
|
|
|
|
tweets.subscribe(tweetSubscriber)
|
|
//#sink-subscriber
|
|
|
|
assertResult(storage)
|
|
}
|
|
|
|
"use a processor" in {
|
|
|
|
//#use-processor
|
|
// An example Processor factory
|
|
def createProcessor: Processor[Int, Int] = Flow[Int].toProcessor.run()
|
|
|
|
val flow: Flow[Int, Int, NotUsed] = Flow.fromProcessor(() => createProcessor)
|
|
//#use-processor
|
|
|
|
}
|
|
|
|
}
|