parent
867131f626
commit
fb1905870c
4 changed files with 89 additions and 3 deletions
|
|
@ -4,15 +4,14 @@
|
|||
package docs.stream
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.scaladsl.{ Keep, Sink, Flow, Source }
|
||||
import akka.stream.scaladsl.{ Flow, Keep, Sink, Source }
|
||||
import akka.stream.stage._
|
||||
import akka.stream._
|
||||
|
||||
import akka.stream.testkit.{ TestPublisher, TestSubscriber }
|
||||
import akka.testkit.{ AkkaSpec, TestLatch }
|
||||
|
||||
import scala.collection.mutable
|
||||
import scala.concurrent.{ Promise, Await, Future }
|
||||
import scala.concurrent.{ Await, Future, Promise }
|
||||
import scala.concurrent.duration._
|
||||
import scala.collection.immutable.Iterable
|
||||
|
||||
|
|
@ -86,6 +85,35 @@ class GraphStageDocSpec extends AkkaSpec {
|
|||
Await.result(result2, 3.seconds) should ===(5050)
|
||||
}
|
||||
|
||||
"Demonstrate creation of GraphStage Sink" in {
|
||||
//#custom-sink-example
|
||||
import akka.stream.SinkShape
|
||||
import akka.stream.stage.GraphStage
|
||||
import akka.stream.stage.InHandler
|
||||
|
||||
class StdoutSink extends GraphStage[SinkShape[Int]] {
|
||||
val in: Inlet[Int] = Inlet("StdoutSink")
|
||||
override val shape: SinkShape[Int] = SinkShape(in)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) {
|
||||
|
||||
// This requests one element at the Sink startup.
|
||||
override def preStart(): Unit = pull(in)
|
||||
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
println(grab(in))
|
||||
pull(in)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
//#custom-sink-example
|
||||
|
||||
Source(List(0, 1, 2)).runWith(Sink.fromGraph(new StdoutSink))
|
||||
}
|
||||
|
||||
//#one-to-one
|
||||
class Map[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue