/**
 * Copyright (C) 2014-2016 Lightbend Inc. <http://www.lightbend.com>
 */
|
|
|
|
|
package docs.stream
|
|
|
|
|
|
|
|
|
|
//#imports
|
|
|
|
|
|
2016-01-20 10:00:37 +02:00
|
|
|
import akka.{ Done, NotUsed }
|
2014-12-11 14:57:48 +01:00
|
|
|
import akka.actor.ActorSystem
|
2015-10-21 22:45:39 +02:00
|
|
|
import akka.stream.{ ClosedShape, ActorMaterializer, OverflowStrategy }
|
2015-01-28 14:19:50 +01:00
|
|
|
import akka.stream.scaladsl._
|
2014-12-11 14:57:48 +01:00
|
|
|
|
2015-01-15 15:02:19 +01:00
|
|
|
import scala.concurrent.Await
|
|
|
|
|
import scala.concurrent.Future
|
2014-12-11 14:57:48 +01:00
|
|
|
|
|
|
|
|
//#imports
|
|
|
|
|
|
2016-02-25 14:27:45 +01:00
|
|
|
import akka.testkit.AkkaSpec
|
2014-12-11 14:57:48 +01:00
|
|
|
|
2014-12-18 10:34:59 +01:00
|
|
|
/**
 * Companion object holding the sample domain model and the canonical `tweets`
 * source shared by all test cases below.
 *
 * NOTE(review): the paired `//#name` marker comments delimit named snippet
 * regions — presumably extracted by the documentation build. Keep them intact
 * and correctly paired; do not reformat the code between them.
 */
object TwitterStreamQuickstartDocSpec {
  //#model
  final case class Author(handle: String)

  final case class Hashtag(name: String)

  final case class Tweet(author: Author, timestamp: Long, body: String) {
    // Naive tokenizer: split on single spaces and keep every token that
    // starts with '#'. Trailing punctuation is NOT stripped, so "#akka!"
    // would be a distinct hashtag from "#akka".
    def hashtags: Set[Hashtag] =
      body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
  }

  val akka = Hashtag("#akka")
  //#model

  // The `tweet-source` snippet intentionally shows only the declaration;
  // the initializer (the fixed list of sample tweets) sits outside the
  // marker pair so it does not appear in the rendered docs.
  //#tweet-source
  val tweets: Source[Tweet, NotUsed]
  //#tweet-source
  = Source(
    Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") ::
      Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") ::
      Tweet(Author("bantonsson"), System.currentTimeMillis, "#akka !") ::
      Tweet(Author("drewhk"), System.currentTimeMillis, "#akka !") ::
      Tweet(Author("ktosopl"), System.currentTimeMillis, "#akka on the rocks!") ::
      Tweet(Author("mmartynas"), System.currentTimeMillis, "wow #akka !") ::
      Tweet(Author("akkateam"), System.currentTimeMillis, "#akka rocks!") ::
      Tweet(Author("bananaman"), System.currentTimeMillis, "#bananas rock!") ::
      Tweet(Author("appleman"), System.currentTimeMillis, "#apples rock!") ::
      Tweet(Author("drama"), System.currentTimeMillis, "we compared #apples to #oranges!") ::
      Nil)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Compiled documentation samples for the "Reactive Tweets" quickstart.
 * Each test case exercises the code of one or more doc snippets (delimited
 * by paired `//#name` markers) so the published examples are guaranteed to
 * compile and run. Keep the markers intact — the code between them is what
 * readers of the documentation see.
 */
class TwitterStreamQuickstartDocSpec extends AkkaSpec {
  import TwitterStreamQuickstartDocSpec._

  // Uses the test actor system's dispatcher for the Future combinators below.
  implicit val executionContext = system.dispatcher

  // Disable println
  // Shadows Predef.println inside this class so the samples stay silent when
  // run as a test; calls like `println(s"map => $i")` below are no-ops here.
  def println(s: Any): Unit = ()

  // Wrapped in a trait (never instantiated) so the snippet can show creating
  // its own ActorSystem without actually starting a second one in the test.
  trait Example1 {
    //#first-sample
    //#materializer-setup
    implicit val system = ActorSystem("reactive-tweets")
    implicit val materializer = ActorMaterializer()
    //#materializer-setup
    //#first-sample
  }

  // The materializer actually used by the test cases (bound to the test's
  // `system` from AkkaSpec, not the one inside Example1).
  implicit val materializer = ActorMaterializer()

  "filter and map" in {
    //#first-sample

    //#authors-filter-map
    val authors: Source[Author, NotUsed] =
      tweets
        .filter(_.hashtags.contains(akka))
        .map(_.author)
    //#first-sample
    //#authors-filter-map

    // Alternative formulation of the same stream: `collect` fuses the
    // filter + map into a single partial function. Trait is compile-only.
    trait Example3 {
      //#authors-collect
      val authors: Source[Author, NotUsed] =
        tweets.collect { case t if t.hashtags.contains(akka) => t.author }
      //#authors-collect
    }

    //#first-sample

    //#authors-foreachsink-println
    authors.runWith(Sink.foreach(println))
    //#authors-foreachsink-println
    //#first-sample

    // Shorthand equivalent of runWith(Sink.foreach(...)).
    //#authors-foreach-println
    authors.runForeach(println)
    //#authors-foreach-println
  }

  "mapConcat hashtags" in {
    // Flatten each tweet's Set[Hashtag] into a stream of individual hashtags.
    //#hashtags-mapConcat
    val hashtags: Source[Hashtag, NotUsed] = tweets.mapConcat(_.hashtags.toList)
    //#hashtags-mapConcat
  }

  // Placeholder sinks shown in the docs; `???` would throw if evaluated,
  // which is why this trait is never instantiated.
  trait HiddenDefinitions {
    //#flow-graph-broadcast
    val writeAuthors: Sink[Author, Unit] = ???
    val writeHashtags: Sink[Hashtag, Unit] = ???
    //#flow-graph-broadcast
  }

  "simple broadcast" in {
    // Real (ignoring) sinks so the graph below can actually run in the test.
    val writeAuthors: Sink[Author, Future[Done]] = Sink.ignore
    val writeHashtags: Sink[Hashtag, Future[Done]] = Sink.ignore

    // format: OFF
    //#flow-graph-broadcast
    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      // Fan out each tweet to two downstream branches.
      val bcast = b.add(Broadcast[Tweet](2))
      tweets ~> bcast.in
      bcast.out(0) ~> Flow[Tweet].map(_.author) ~> writeAuthors
      bcast.out(1) ~> Flow[Tweet].mapConcat(_.hashtags.toList) ~> writeHashtags
      ClosedShape
    })
    g.run()
    //#flow-graph-broadcast
    // format: ON
  }

  "slowProcessing" in {
    def slowComputation(t: Tweet): Long = {
      Thread.sleep(500) // act as if performing some heavy computation
      42
    }

    // Demonstrates shedding load with a bounded buffer: when the buffer of
    // 10 elements is full, the OLDEST buffered element is dropped.
    //#tweets-slow-consumption-dropHead
    tweets
      .buffer(10, OverflowStrategy.dropHead)
      .map(slowComputation)
      .runWith(Sink.ignore)
    //#tweets-slow-consumption-dropHead
  }

  "backpressure by readline" in {
    // Compile-only (trait never instantiated): the sample blocks on stdin
    // and on Await, neither of which we want to execute in a test run.
    trait X {
      import scala.concurrent.duration._

      //#backpressure-by-readline
      val completion: Future[Done] =
        Source(1 to 10)
          .map(i => { println(s"map => $i"); i })
          .runForeach { i => readLine(s"Element = $i; continue reading? [press enter]\n") }

      Await.ready(completion, 1.minute)
      //#backpressure-by-readline
    }
  }

  "count elements on finite stream" in {
    //#tweets-fold-count
    val count: Flow[Tweet, Int, NotUsed] = Flow[Tweet].map(_ => 1)

    val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

    // Keep.right keeps the sink's materialized Future[Int] as the
    // materialized value of the whole graph.
    val counterGraph: RunnableGraph[Future[Int]] =
      tweets
        .via(count)
        .toMat(sumSink)(Keep.right)

    val sum: Future[Int] = counterGraph.run()

    sum.foreach(c => println(s"Total tweets processed: $c"))
    //#tweets-fold-count

    // Same computation as a one-liner, reusing sumSink from above.
    new AnyRef {
      //#tweets-fold-count-oneline
      val sum: Future[Int] = tweets.map(t => 1).runWith(sumSink)
      //#tweets-fold-count-oneline
    }
  }

  "materialize multiple times" in {
    val tweetsInMinuteFromNow = tweets // not really in second, just acting as if

    // A RunnableGraph is a blueprint: each run() materializes a fresh stream
    // with its own, independent materialized Future[Int].
    //#tweets-runnable-flow-materialized-twice
    val sumSink = Sink.fold[Int, Int](0)(_ + _)
    val counterRunnableGraph: RunnableGraph[Future[Int]] =
      tweetsInMinuteFromNow
        .filter(_.hashtags contains akka)
        .map(t => 1)
        .toMat(sumSink)(Keep.right)

    // materialize the stream once in the morning
    val morningTweetsCount: Future[Int] = counterRunnableGraph.run()
    // and once in the evening, reusing the flow
    val eveningTweetsCount: Future[Int] = counterRunnableGraph.run()

    //#tweets-runnable-flow-materialized-twice

    val sum: Future[Int] = counterRunnableGraph.run()

    sum.map { c => println(s"Total tweets processed: $c") }
  }
}
|