added custom linear stages section

This commit is contained in:
Endre Sándor Varga 2014-12-22 16:56:11 +01:00
parent fc8560afa9
commit beffbab601
9 changed files with 1374 additions and 7 deletions

View file

@ -0,0 +1,124 @@
package docs.stream
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{ RunnableFlow, Sink, Source, Flow }
import akka.stream.stage.PushPullStage
import akka.stream.testkit.AkkaSpec
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
class FlowStagesSpec extends AkkaSpec {
//#import-stage
import akka.stream.stage._
//#import-stage
implicit val mat = FlowMaterializer()
"stages demo" must {
"demonstrate various PushPullStages" in {
//#one-to-one
class Map[A, B](f: A => B) extends PushPullStage[A, B] {
override def onPush(elem: A, ctx: Context[B]): Directive =
ctx.push(f(elem))
override def onPull(ctx: Context[B]): Directive =
ctx.pull()
}
//#one-to-one
//#many-to-one
class Filter[A](p: A => Boolean) extends PushPullStage[A, A] {
override def onPush(elem: A, ctx: Context[A]): Directive =
if (p(elem)) ctx.push(elem)
else ctx.pull()
override def onPull(ctx: Context[A]): Directive =
ctx.pull()
}
//#many-to-one
//#one-to-many
class Duplicator[A]() extends PushPullStage[A, A] {
private var lastElem: A = _
private var oneLeft = false
override def onPush(elem: A, ctx: Context[A]): Directive = {
lastElem = elem
oneLeft = true
ctx.push(elem)
}
override def onPull(ctx: Context[A]): Directive =
if (!ctx.isFinishing) {
// the main pulling logic is below as it is demonstrated on the illustration
if (oneLeft) {
oneLeft = false
ctx.push(lastElem)
} else
ctx.pull()
} else {
// If we need to emit a final element after the upstream
// finished
if (oneLeft) ctx.pushAndFinish(lastElem)
else ctx.finish()
}
override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
ctx.absorbTermination()
}
//#one-to-many
val keyedSink = Sink.head[immutable.Seq[Int]]
val sink = Flow[Int].grouped(10).to(keyedSink)
//#stage-chain
val runnable: RunnableFlow = Source(1 to 10)
.transform(() => new Filter(_ % 2 == 0))
.transform(() => new Duplicator())
.transform(() => new Map(_ / 2))
.to(sink)
//#stage-chain
Await.result(runnable.run().get(keyedSink), 3.seconds) should be(Seq(1, 1, 2, 2, 3, 3, 4, 4, 5, 5))
}
"demonstrate various PushStages" in {
import akka.stream.stage._
//#pushstage
class Map[A, B](f: A => B) extends PushStage[A, B] {
override def onPush(elem: A, ctx: Context[B]): Directive =
ctx.push(f(elem))
}
class Filter[A](p: A => Boolean) extends PushStage[A, A] {
override def onPush(elem: A, ctx: Context[A]): Directive =
if (p(elem)) ctx.push(elem)
else ctx.pull()
}
//#pushstage
}
"demonstrate StatefulStage" in {
//#doubler-stateful
class Duplicator[A]() extends StatefulStage[A, A] {
override val initial: StageState[A, A] = new StageState[A, A] {
override def onPush(elem: A, ctx: Context[A]): Directive =
emit(List(elem, elem).iterator, ctx)
}
}
//#doubler-stateful
}
}
}

View file

@ -8,6 +8,7 @@ class StreamBuffersRateSpec extends AkkaSpec {
implicit val mat = FlowMaterializer()
"Demonstrate pipelining" in {
def println(s: Any) = ()
//#pipelining
Source(1 to 3)
.map { i => println(s"A: $i"); i }