+doc akka streams docs, flowGraphs
This commit is contained in:
parent
5892d49bed
commit
8274453549
6 changed files with 380 additions and 40 deletions
|
|
@ -0,0 +1,92 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package docs.stream
|
||||
|
||||
import akka.stream.FlowMaterializer
|
||||
import akka.stream.scaladsl.Broadcast
|
||||
import akka.stream.scaladsl.Flow
|
||||
import akka.stream.scaladsl.FlowGraph
|
||||
import akka.stream.scaladsl.FlowGraphImplicits
|
||||
import akka.stream.scaladsl.MaterializedMap
|
||||
import akka.stream.scaladsl.Merge
|
||||
import akka.stream.scaladsl.Sink
|
||||
import akka.stream.scaladsl.Source
|
||||
import akka.stream.scaladsl.Zip
|
||||
import akka.stream.testkit.AkkaSpec
|
||||
|
||||
// TODO replace ⇒ with => and disable this intellij setting
|
||||
class FlowGraphDocSpec extends AkkaSpec {
|
||||
|
||||
implicit val ec = system.dispatcher
|
||||
|
||||
implicit val mat = FlowMaterializer()
|
||||
|
||||
"build simple graph" in {
|
||||
//format: OFF
|
||||
//#simple-flow-graph
|
||||
val g = FlowGraph { implicit b ⇒
|
||||
import FlowGraphImplicits._
|
||||
val in = Source(1 to 10)
|
||||
val out = Sink.ignore
|
||||
|
||||
val broadcast = Broadcast[Int]
|
||||
val merge = Merge[Int]
|
||||
|
||||
val f1 = Flow[Int].map(_ + 10)
|
||||
val f3 = Flow[Int].map(_.toString)
|
||||
val f2 = Flow[Int].map(_ + 20)
|
||||
|
||||
in ~> broadcast ~> f1 ~> merge
|
||||
broadcast ~> f2 ~> merge ~> f3 ~> out
|
||||
}
|
||||
//#simple-flow-graph
|
||||
//format: ON
|
||||
|
||||
//#simple-graph-run
|
||||
val map: MaterializedMap = g.run()
|
||||
//#simple-graph-run
|
||||
}
|
||||
|
||||
"build simple graph without implicits" in {
|
||||
//#simple-flow-graph-no-implicits
|
||||
val g = FlowGraph { b ⇒
|
||||
val in = Source(1 to 10)
|
||||
val out = Sink.ignore
|
||||
|
||||
val broadcast = Broadcast[Int]
|
||||
val merge = Merge[Int]
|
||||
|
||||
val f1 = Flow[Int].map(_ + 10)
|
||||
val f3 = Flow[Int].map(_.toString)
|
||||
val f2 = Flow[Int].map(_ + 20)
|
||||
|
||||
b.addEdge(in, broadcast)
|
||||
.addEdge(broadcast, f1, merge)
|
||||
.addEdge(broadcast, f2, merge)
|
||||
.addEdge(merge, f3, out)
|
||||
}
|
||||
//#simple-flow-graph-no-implicits
|
||||
|
||||
g.run()
|
||||
}
|
||||
|
||||
"flow connection errors" in {
|
||||
intercept[IllegalArgumentException] {
|
||||
//#simple-graph
|
||||
FlowGraph { implicit b ⇒
|
||||
import FlowGraphImplicits._
|
||||
val source1 = Source(1 to 10)
|
||||
val source2 = Source(1 to 10)
|
||||
|
||||
val zip = Zip[Int, Int]
|
||||
|
||||
source1 ~> zip.left
|
||||
source2 ~> zip.right
|
||||
// unconnected zip.out (!) => "must have at least 1 outgoing edge"
|
||||
}
|
||||
//#simple-graph
|
||||
}.getMessage should include("must have at least 1 outgoing edge")
|
||||
}
|
||||
|
||||
}
|
||||
25
akka-docs-dev/rst/scala/code/docs/stream/StreamDocSpec.scala
Normal file
25
akka-docs-dev/rst/scala/code/docs/stream/StreamDocSpec.scala
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package docs.stream
|
||||
|
||||
import akka.stream.scaladsl.Flow
|
||||
import akka.stream.scaladsl.FlowGraph
|
||||
import akka.stream.scaladsl.FlowGraphImplicits
|
||||
import akka.stream.scaladsl.Source
|
||||
import akka.stream.scaladsl.Zip
|
||||
import akka.stream.testkit.AkkaSpec
|
||||
|
||||
// TODO replace ⇒ with => and disable this intellij setting
|
||||
class StreamDocSpec extends AkkaSpec {
|
||||
|
||||
implicit val ec = system.dispatcher
|
||||
|
||||
//#imports
|
||||
import akka.stream.FlowMaterializer
|
||||
import akka.stream.scaladsl.Broadcast
|
||||
//#imports
|
||||
|
||||
implicit val mat = FlowMaterializer()
|
||||
|
||||
}
|
||||
|
|
@ -3,26 +3,142 @@
|
|||
*/
|
||||
package docs.stream
|
||||
|
||||
//#imports
|
||||
|
||||
import akka.stream.FlowMaterializer
|
||||
import akka.stream.scaladsl.Broadcast
|
||||
import akka.stream.scaladsl.Flow
|
||||
import akka.stream.scaladsl.FlowGraph
|
||||
import akka.stream.scaladsl.FlowGraphImplicits
|
||||
import akka.stream.scaladsl.PartialFlowGraph
|
||||
|
||||
//#imports
|
||||
|
||||
import akka.stream.scaladsl.Sink
|
||||
import akka.stream.scaladsl.Source
|
||||
import akka.stream.scaladsl.UndefinedSink
|
||||
import akka.stream.scaladsl.UndefinedSource
|
||||
import akka.stream.scaladsl.Zip
|
||||
import akka.stream.scaladsl.ZipWith
|
||||
import akka.stream.testkit.AkkaSpec
|
||||
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
|
||||
// TODO replace ⇒ with => and disable this intellij setting
|
||||
class StreamPartialFlowGraphDocSpec extends AkkaSpec {
|
||||
|
||||
implicit val ec = system.dispatcher
|
||||
|
||||
implicit val mat = FlowMaterializer()
|
||||
|
||||
"build with open ports" in {
|
||||
// format: OFF
|
||||
//#simple-partial-flow-graph
|
||||
PartialFlowGraph { implicit b ⇒
|
||||
// defined outside as they will be used by different FlowGraphs
|
||||
// 1) first by the PartialFlowGraph to mark its open input and output ports
|
||||
// 2) then by the assembling FlowGraph which will attach real sinks and sources to them
|
||||
val in1 = UndefinedSource[Int]
|
||||
val in2 = UndefinedSource[Int]
|
||||
val in3 = UndefinedSource[Int]
|
||||
val out = UndefinedSink[Int]
|
||||
|
||||
val pickMaxOfThree: PartialFlowGraph = PartialFlowGraph { implicit b ⇒
|
||||
import FlowGraphImplicits._
|
||||
|
||||
val zip1 = ZipWith[Int, Int, Int](math.max _)
|
||||
val zip2 = ZipWith[Int, Int, Int](math.max _)
|
||||
|
||||
in1 ~> zip1.left
|
||||
in2 ~> zip1.right
|
||||
zip1.out ~> zip2.left
|
||||
in3 ~> zip2.right
|
||||
zip2.out ~> out
|
||||
}
|
||||
// format: ON
|
||||
|
||||
val resultSink = Sink.head[Int]
|
||||
|
||||
val g = FlowGraph { b ⇒
|
||||
// import the partial flow graph explicitly
|
||||
b.importPartialFlowGraph(pickMaxOfThree)
|
||||
|
||||
b.attachSource(in1, Source.single(1))
|
||||
b.attachSource(in2, Source.single(2))
|
||||
b.attachSource(in3, Source.single(3))
|
||||
b.attachSink(out, resultSink)
|
||||
}
|
||||
|
||||
val materialized = g.run()
|
||||
val max: Future[Int] = materialized.get(resultSink)
|
||||
Await.result(max, 300.millis) should equal(3)
|
||||
//#simple-partial-flow-graph
|
||||
|
||||
val g2 =
|
||||
//#simple-partial-flow-graph-import-shorthand
|
||||
FlowGraph(pickMaxOfThree) { b ⇒
|
||||
b.attachSource(in1, Source.single(1))
|
||||
b.attachSource(in2, Source.single(2))
|
||||
b.attachSource(in3, Source.single(3))
|
||||
b.attachSink(out, resultSink)
|
||||
}
|
||||
//#simple-partial-flow-graph-import-shorthand
|
||||
val materialized2 = g.run()
|
||||
val max2: Future[Int] = materialized2.get(resultSink)
|
||||
Await.result(max2, 300.millis) should equal(3)
|
||||
}
|
||||
|
||||
"build source from partial flow graph" in {
|
||||
//#source-from-partial-flow-graph
|
||||
val pairs: Source[(Int, Int)] = Source() { implicit b ⇒
|
||||
import FlowGraphImplicits._
|
||||
|
||||
// prepare graph elements
|
||||
val undefinedSink = UndefinedSink[(Int, Int)]
|
||||
val zip = Zip[Int, Int]
|
||||
def ints = Source(() ⇒ Iterator.from(1))
|
||||
|
||||
// connect the graph
|
||||
ints ~> Flow[Int].filter(_ % 2 != 0) ~> zip.left
|
||||
ints ~> Flow[Int].filter(_ % 2 == 0) ~> zip.right
|
||||
zip.out ~> undefinedSink
|
||||
|
||||
// expose undefined sink
|
||||
undefinedSink
|
||||
}
|
||||
|
||||
val firstPair: Future[(Int, Int)] = pairs.runWith(Sink.head)
|
||||
Await.result(firstPair, 300.millis) should equal(1 → 2)
|
||||
//#source-from-partial-flow-graph
|
||||
}
|
||||
|
||||
"build flow from partial flow graph" in {
|
||||
//#flow-from-partial-flow-graph
|
||||
val pairUpWithToString = Flow() { implicit b ⇒
|
||||
import FlowGraphImplicits._
|
||||
|
||||
// prepare graph elements
|
||||
val undefinedSource = UndefinedSource[Int]
|
||||
val undefinedSink = UndefinedSink[(Int, String)]
|
||||
|
||||
val broadcast = Broadcast[Int]
|
||||
val zip = Zip[Int, String]
|
||||
|
||||
// connect the graph
|
||||
undefinedSource ~> broadcast
|
||||
broadcast ~> Flow[Int].map(identity) ~> zip.left
|
||||
broadcast ~> Flow[Int].map(_.toString) ~> zip.right
|
||||
zip.out ~> undefinedSink
|
||||
|
||||
// expose undefined ports
|
||||
(undefinedSource, undefinedSink)
|
||||
}
|
||||
|
||||
//#flow-from-partial-flow-graph
|
||||
|
||||
// format: OFF
|
||||
val (_, matSink: Future[(Int, String)]) =
|
||||
//#flow-from-partial-flow-graph
|
||||
pairUpWithToString.runWith(Source(List(1)), Sink.head)
|
||||
//#flow-from-partial-flow-graph
|
||||
// format: ON
|
||||
|
||||
Await.result(matSink, 300.millis) should equal(1 → "1")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -101,14 +101,14 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
|
|||
//#hashtags-mapConcat
|
||||
}
|
||||
|
||||
"simple broadcast" in {
|
||||
trait X {
|
||||
//#flow-graph-broadcast
|
||||
val writeAuthors: Sink[Author] = ???
|
||||
val writeHashtags: Sink[Hashtag] = ???
|
||||
//#flow-graph-broadcast
|
||||
}
|
||||
trait HiddenDefinitions {
|
||||
//#flow-graph-broadcast
|
||||
val writeAuthors: Sink[Author] = ???
|
||||
val writeHashtags: Sink[Hashtag] = ???
|
||||
//#flow-graph-broadcast
|
||||
}
|
||||
|
||||
"simple broadcast" in {
|
||||
val writeAuthors: Sink[Author] = Sink.ignore
|
||||
val writeHashtags: Sink[Hashtag] = Sink.ignore
|
||||
|
||||
|
|
@ -193,7 +193,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
|
|||
// the sumSink materialized two different futures
|
||||
// we use it as key to get the materialized value out of the materialized map
|
||||
val morningTweetsCount: Future[Int] = morningMaterialized.get(sumSink)
|
||||
val eveningTweetsCount: Future[Int] = morningMaterialized.get(sumSink)
|
||||
val eveningTweetsCount: Future[Int] = eveningMaterialized.get(sumSink)
|
||||
//#tweets-runnable-flow-materialized-twice
|
||||
|
||||
val map: MaterializedMap = counterRunnableFlow.run()
|
||||
|
|
@ -202,12 +202,6 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
|
|||
|
||||
sum.map { c ⇒ println(s"Total tweets processed: $c") }
|
||||
//#tweets-fold-count
|
||||
|
||||
new AnyRef {
|
||||
//#tweets-fold-count-oneline
|
||||
val sum: Future[Int] = tweets.map(t ⇒ 1).runWith(sumSink)
|
||||
//#tweets-fold-count-oneline
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue