+doc flow / flowgraph docs, moved quickstart

+ moved quickstart
+ more info about simple materialization
This commit is contained in:
Konrad 'ktoso' Malawski 2014-12-11 14:57:48 +01:00
parent 3b1bc67090
commit 1c722b8ae1
5 changed files with 405 additions and 238 deletions

View file

@ -0,0 +1,88 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import akka.actor.Cancellable
import akka.stream.scaladsl.MaterializedMap
import akka.stream.scaladsl.RunnableFlow
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.testkit.AkkaSpec
import concurrent.Future
// TODO replace with => and disable this intellij setting
class FlowDocSpec extends AkkaSpec {
implicit val ec = system.dispatcher
//#imports
import akka.stream.FlowMaterializer
//#imports
implicit val mat = FlowMaterializer()
"source is immutable" in {
//#source-immutable
val source = Source(1 to 10)
source.map(_ 0) // has no effect on source, since it's immutable
source.runWith(Sink.fold(0)(_ + _)) // 55
val zeroes = source.map(_ 0) // returns new Source[Int], with `map()` appended
zeroes.runWith(Sink.fold(0)(_ + _)) // 0
//#source-immutable
}
"materialization in steps" in {
//#materialization-in-steps
val source = Source(1 to 10)
val sink = Sink.fold[Int, Int](0)(_ + _)
// connect the Source to the Sink, obtaining a RunnableFlow
val runnable: RunnableFlow = source.to(sink)
// materialize the flow
val materialized: MaterializedMap = runnable.run()
// get the materialized value of the FoldSink
val sum: Future[Int] = materialized.get(sink)
//#materialization-in-steps
}
"materialization runWith" in {
//#materialization-runWith
val source = Source(1 to 10)
val sink = Sink.fold[Int, Int](0)(_ + _)
// materialize the flow
val materialized: MaterializedMap = source.to(sink).run()
// get the materialized value from the running streams MaterializedMap
val sum: Future[Int] = materialized.get(sink)
//#materialization-runWith
}
"compound source cannot be used as key" in {
//#compound-source-is-not-keyed-runWith
import scala.concurrent.duration._
case object Tick
val timer = Source(initialDelay = 1.second, interval = 1.seconds, tick = () Tick)
val timerCancel: Cancellable = Sink.ignore.runWith(timer)
timerCancel.cancel()
val timerMap = timer.map(tick "tick")
val _ = Sink.ignore.runWith(timerMap) // WRONG: returned type is not the timers Cancellable!
//#compound-source-is-not-keyed-runWith
//#compound-source-is-not-keyed-run
// retain the materialized map, in order to retrieve the timers Cancellable
val materialized = timerMap.to(Sink.ignore).run()
val timerCancellable = materialized.get(timer)
timerCancellable.cancel()
//#compound-source-is-not-keyed-run
}
}

View file

@ -15,6 +15,9 @@ import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Zip
import akka.stream.testkit.AkkaSpec
import scala.concurrent.Await
import scala.concurrent.duration._
// TODO replace with => and disable this intellij setting
class FlowGraphDocSpec extends AkkaSpec {
@ -89,4 +92,31 @@ class FlowGraphDocSpec extends AkkaSpec {
}.getMessage should include("must have at least 1 outgoing edge")
}
"reusing a flow in a graph" in {
//#flow-graph-reusing-a-flow
val topHeadSink = Sink.head[Int]
val bottomHeadSink = Sink.head[Int]
val sharedDoubler = Flow[Int].map(_ * 2)
//#flow-graph-reusing-a-flow
// format: OFF
val g =
//#flow-graph-reusing-a-flow
FlowGraph { implicit b
import FlowGraphImplicits._
val broadcast = Broadcast[Int]
Source.single(1) ~> broadcast
broadcast ~> sharedDoubler ~> topHeadSink
broadcast ~> sharedDoubler ~> bottomHeadSink
}
//#flow-graph-reusing-a-flow
// format: ON
val map = g.run()
Await.result(map.get(topHeadSink), 300.millis) shouldEqual 2
Await.result(map.get(bottomHeadSink), 300.millis) shouldEqual 2
}
}

View file

@ -1,25 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.FlowGraph
import akka.stream.scaladsl.FlowGraphImplicits
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Zip
import akka.stream.testkit.AkkaSpec
// TODO replace with => and disable this intellij setting
class StreamDocSpec extends AkkaSpec {
implicit val ec = system.dispatcher
//#imports
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Broadcast
//#imports
implicit val mat = FlowMaterializer()
}