parent
dcfa56e547
commit
9961495fad
20 changed files with 922 additions and 466 deletions
|
|
@ -3,15 +3,16 @@
|
|||
*/
|
||||
package docs.stream
|
||||
|
||||
import akka.stream.javadsl.Sink
|
||||
import akka.stream.scaladsl.Source
|
||||
import akka.stream.stage.{ OutHandler, GraphStage, GraphStageLogic }
|
||||
import akka.stream.scaladsl.{ Keep, Sink, Flow, Source }
|
||||
import akka.stream.stage._
|
||||
import akka.stream._
|
||||
|
||||
import akka.stream.testkit.AkkaSpec
|
||||
import akka.stream.testkit.{ TestPublisher, TestSubscriber, AkkaSpec }
|
||||
|
||||
import scala.concurrent.{ Await, Future }
|
||||
import scala.collection.mutable
|
||||
import scala.concurrent.{ Promise, Await, Future }
|
||||
import scala.concurrent.duration._
|
||||
import scala.collection.immutable.Iterable
|
||||
|
||||
class GraphStageDocSpec extends AkkaSpec {
|
||||
|
||||
|
|
@ -83,4 +84,425 @@ class GraphStageDocSpec extends AkkaSpec {
|
|||
Await.result(result2, 3.seconds) should ===(5050)
|
||||
}
|
||||
|
||||
}
|
||||
//#one-to-one
|
||||
class Map[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {
|
||||
|
||||
val in = Inlet[A]("Map.in")
|
||||
val out = Outlet[B]("Map.out")
|
||||
|
||||
override val shape = FlowShape.of(in, out)
|
||||
|
||||
override def createLogic(attr: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) {
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
push(out, f(grab(in)))
|
||||
}
|
||||
})
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
pull(in)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
//#one-to-one
|
||||
|
||||
"Demonstrate a one to one element GraphStage" in {
|
||||
// tests:
|
||||
val stringLength = Flow.fromGraph(new Map[String, Int](_.length))
|
||||
|
||||
val result =
|
||||
Source(Vector("one", "two", "three"))
|
||||
.via(stringLength)
|
||||
.runFold(Seq.empty[Int])((elem, acc) => elem :+ acc)
|
||||
|
||||
Await.result(result, 3.seconds) should ===(Seq(3, 3, 5))
|
||||
}
|
||||
|
||||
//#many-to-one
|
||||
class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
|
||||
|
||||
val in = Inlet[A]("Filter.in")
|
||||
val out = Outlet[A]("Filter.out")
|
||||
|
||||
val shape = FlowShape.of(in, out)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) {
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
val elem = grab(in)
|
||||
if (p(elem)) push(out, elem)
|
||||
else pull(in)
|
||||
}
|
||||
})
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
pull(in)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
//#many-to-one
|
||||
|
||||
"Demonstrate a many to one element GraphStage" in {
|
||||
|
||||
// tests:
|
||||
val evenFilter = Flow.fromGraph(new Filter[Int](_ % 2 == 0))
|
||||
|
||||
val result =
|
||||
Source(Vector(1, 2, 3, 4, 5, 6))
|
||||
.via(evenFilter)
|
||||
.runFold(Seq.empty[Int])((elem, acc) => elem :+ acc)
|
||||
|
||||
Await.result(result, 3.seconds) should ===(Seq(2, 4, 6))
|
||||
}
|
||||
|
||||
//#one-to-many
|
||||
class Duplicator[A] extends GraphStage[FlowShape[A, A]] {
|
||||
|
||||
val in = Inlet[A]("Duplicator.in")
|
||||
val out = Outlet[A]("Duplicator.out")
|
||||
|
||||
val shape = FlowShape.of(in, out)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) {
|
||||
// Again: note that all mutable state
|
||||
// MUST be inside the GraphStageLogic
|
||||
var lastElem: Option[A] = None
|
||||
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
val elem = grab(in)
|
||||
lastElem = Some(elem)
|
||||
push(out, elem)
|
||||
}
|
||||
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
if (lastElem.isDefined) emit(out, lastElem.get)
|
||||
complete(out)
|
||||
}
|
||||
|
||||
})
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
if (lastElem.isDefined) {
|
||||
push(out, lastElem.get)
|
||||
lastElem = None
|
||||
} else {
|
||||
pull(in)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
//#one-to-many
|
||||
|
||||
"Demonstrate a one to many element GraphStage" in {
|
||||
// tests:
|
||||
val duplicator = Flow.fromGraph(new Duplicator[Int])
|
||||
|
||||
val result =
|
||||
Source(Vector(1, 2, 3))
|
||||
.via(duplicator)
|
||||
.runFold(Seq.empty[Int])((elem, acc) => elem :+ acc)
|
||||
|
||||
Await.result(result, 3.seconds) should ===(Seq(1, 1, 2, 2, 3, 3))
|
||||
}
|
||||
|
||||
"Demonstrate a simpler one to many stage" in {
|
||||
//#simpler-one-to-many
|
||||
class Duplicator[A] extends GraphStage[FlowShape[A, A]] {
|
||||
|
||||
val in = Inlet[A]("Duplicator.in")
|
||||
val out = Outlet[A]("Duplicator.out")
|
||||
|
||||
val shape = FlowShape.of(in, out)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) {
|
||||
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
val elem = grab(in)
|
||||
// this will temporarily suspend this handler until the two elems
|
||||
// are emitted and then reinstates it
|
||||
emitMultiple(out, Iterable(elem, elem))
|
||||
}
|
||||
})
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
pull(in)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
//#simpler-one-to-many
|
||||
|
||||
// tests:
|
||||
val duplicator = Flow.fromGraph(new Duplicator[Int])
|
||||
|
||||
val result =
|
||||
Source(Vector(1, 2, 3))
|
||||
.via(duplicator)
|
||||
.runFold(Seq.empty[Int])((elem, acc) => elem :+ acc)
|
||||
|
||||
Await.result(result, 3.seconds) should ===(Seq(1, 1, 2, 2, 3, 3))
|
||||
|
||||
}
|
||||
|
||||
"Demonstrate chaining of graph stages" in {
|
||||
val sink = Sink.fold[List[Int], Int](List.empty[Int])((acc, n) => acc :+ n)
|
||||
|
||||
//#graph-stage-chain
|
||||
val resultFuture = Source(1 to 5)
|
||||
.via(new Filter(_ % 2 == 0))
|
||||
.via(new Duplicator())
|
||||
.via(new Map(_ / 2))
|
||||
.runWith(sink)
|
||||
|
||||
//#graph-stage-chain
|
||||
|
||||
Await.result(resultFuture, 3.seconds) should ===(List(1, 1, 2, 2))
|
||||
}
|
||||
|
||||
"Demonstrate an asynchronous side channel" in {
|
||||
import system.dispatcher
|
||||
//#async-side-channel
|
||||
// will close upstream when the future completes
|
||||
class KillSwitch[A](switch: Future[Unit]) extends GraphStage[FlowShape[A, A]] {
|
||||
|
||||
val in = Inlet[A]("KillSwitch.in")
|
||||
val out = Outlet[A]("KillSwitch.out")
|
||||
|
||||
val shape = FlowShape.of(in, out)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) {
|
||||
|
||||
override def preStart(): Unit = {
|
||||
val callback = getAsyncCallback[Unit] { (_) =>
|
||||
completeStage()
|
||||
}
|
||||
switch.foreach(callback.invoke)
|
||||
}
|
||||
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = { push(out, grab(in)) }
|
||||
})
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = { pull(in) }
|
||||
})
|
||||
}
|
||||
}
|
||||
//#async-side-channel
|
||||
|
||||
// tests:
|
||||
val switch = Promise[Unit]()
|
||||
val duplicator = Flow.fromGraph(new KillSwitch[Int](switch.future))
|
||||
|
||||
// TODO this is probably racey, is there a way to make sure it happens after?
|
||||
val valueAfterKill = switch.future.flatMap(_ => Future(4))
|
||||
|
||||
val result =
|
||||
Source(Vector(1, 2, 3)).concat(Source.fromFuture(valueAfterKill))
|
||||
.via(duplicator)
|
||||
.runFold(Seq.empty[Int])((elem, acc) => elem :+ acc)
|
||||
|
||||
switch.success(Unit)
|
||||
|
||||
Await.result(result, 3.seconds) should ===(Seq(1, 2, 3))
|
||||
}
|
||||
|
||||
"Demonstrate a graph stage with a timer" in {
|
||||
|
||||
//#timed
|
||||
// each time an event is pushed through it will trigger a period of silence
|
||||
class TimedGate[A](silencePeriod: FiniteDuration) extends GraphStage[FlowShape[A, A]] {
|
||||
|
||||
val in = Inlet[A]("TimedGate.in")
|
||||
val out = Outlet[A]("TimedGate.out")
|
||||
|
||||
val shape = FlowShape.of(in, out)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new TimerGraphStageLogic(shape) {
|
||||
|
||||
var open = false
|
||||
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
val elem = grab(in)
|
||||
if (open) pull(in)
|
||||
else {
|
||||
push(out, elem)
|
||||
open = true
|
||||
scheduleOnce(None, silencePeriod)
|
||||
}
|
||||
}
|
||||
})
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = { pull(in) }
|
||||
})
|
||||
|
||||
override protected def onTimer(timerKey: Any): Unit = {
|
||||
open = false
|
||||
}
|
||||
}
|
||||
}
|
||||
//#timed
|
||||
|
||||
// tests:
|
||||
val result =
|
||||
Source(Vector(1, 2, 3))
|
||||
.via(new TimedGate[Int](2.second))
|
||||
.takeWithin(250.millis)
|
||||
.runFold(Seq.empty[Int])((elem, acc) => elem :+ acc)
|
||||
|
||||
Await.result(result, 3.seconds) should ===(Seq(1))
|
||||
}
|
||||
|
||||
"Demonstrate a custom materialized value" in {
|
||||
|
||||
//#materialized
|
||||
class FirstValue[A] extends GraphStageWithMaterializedValue[FlowShape[A, A], Future[A]] {
|
||||
|
||||
val in = Inlet[A]("FirstValue.in")
|
||||
val out = Outlet[A]("FirstValue.out")
|
||||
|
||||
val shape = FlowShape.of(in, out)
|
||||
|
||||
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[A]) = {
|
||||
val promise = Promise[A]()
|
||||
val logic = new GraphStageLogic(shape) {
|
||||
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
val elem = grab(in)
|
||||
promise.success(elem)
|
||||
push(out, elem)
|
||||
|
||||
// replace handler with one just forwarding
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
push(out, grab(in))
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
pull(in)
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
(logic, promise.future)
|
||||
}
|
||||
}
|
||||
//#materialized
|
||||
|
||||
// tests:
|
||||
val flow = Source(Vector(1, 2, 3))
|
||||
.viaMat(new FirstValue)(Keep.right)
|
||||
.to(Sink.ignore)
|
||||
|
||||
val result: Future[Int] = flow.run()
|
||||
|
||||
Await.result(result, 3.seconds) should ===(1)
|
||||
|
||||
}
|
||||
|
||||
"Demonstrate a detached graph stage" in {
|
||||
|
||||
//#detached
|
||||
class TwoBuffer[A] extends GraphStage[FlowShape[A, A]] {
|
||||
|
||||
val in = Inlet[A]("TwoBuffer.in")
|
||||
val out = Outlet[A]("TwoBuffer.out")
|
||||
|
||||
val shape = FlowShape.of(in, out)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) {
|
||||
|
||||
val buffer = mutable.Queue[A]()
|
||||
def bufferFull = buffer.size == 2
|
||||
var downstreamWaiting = false
|
||||
|
||||
override def preStart(): Unit = {
|
||||
// a detached stage needs to start upstream demand
|
||||
// itself as it is not triggered by downstream demand
|
||||
pull(in)
|
||||
}
|
||||
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
val elem = grab(in)
|
||||
buffer.enqueue(elem)
|
||||
if (downstreamWaiting) {
|
||||
downstreamWaiting = false
|
||||
val bufferedElem = buffer.dequeue()
|
||||
push(out, bufferedElem)
|
||||
}
|
||||
if (!bufferFull) {
|
||||
pull(in)
|
||||
}
|
||||
}
|
||||
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
if (buffer.nonEmpty) {
|
||||
// emit the rest if possible
|
||||
emitMultiple(out, buffer.toIterator)
|
||||
}
|
||||
completeStage()
|
||||
}
|
||||
})
|
||||
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
if (buffer.isEmpty) {
|
||||
downstreamWaiting = true
|
||||
} else {
|
||||
val elem = buffer.dequeue
|
||||
push(out, elem)
|
||||
}
|
||||
if (!bufferFull && !hasBeenPulled(in)) {
|
||||
pull(in)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
//#detached
|
||||
|
||||
// tests:
|
||||
val result1 = Source(Vector(1, 2, 3))
|
||||
.via(new TwoBuffer)
|
||||
.runFold(Vector.empty[Int])((acc, n) => acc :+ n)
|
||||
|
||||
Await.result(result1, 3.seconds) should ===(Vector(1, 2, 3))
|
||||
|
||||
val subscriber = TestSubscriber.manualProbe[Int]()
|
||||
val publisher = TestPublisher.probe[Int]()
|
||||
val flow2 =
|
||||
Source.fromPublisher(publisher)
|
||||
.via(new TwoBuffer)
|
||||
.to(Sink.fromSubscriber(subscriber))
|
||||
|
||||
val result2 = flow2.run()
|
||||
|
||||
val sub = subscriber.expectSubscription()
|
||||
// this happens even though the subscriber has not signalled any demand
|
||||
publisher.sendNext(1)
|
||||
publisher.sendNext(2)
|
||||
|
||||
sub.cancel()
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue