diff --git a/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java b/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java index 1bce706cc9..feb30c8d9d 100644 --- a/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java @@ -95,6 +95,40 @@ public class GraphStageDocTest extends AbstractJavaTest { } //#simple-source + //#simple-sink + public class StdoutSink extends GraphStage> { + public final Inlet in = Inlet.create("StdoutSink.in"); + + private final SinkShape shape = SinkShape.of(in); + @Override + public SinkShape shape() { + return shape; + } + + @Override + public GraphStageLogic createLogic(Attributes inheritedAttributes) { + return new GraphStageLogic(shape()) { + + // This requests one element at the Sink startup. + @Override + public void preStart() { + pull(in); + } + + { + setHandler(in, new AbstractInHandler() { + @Override + public void onPush() throws Exception { + Integer element = grab(in); + System.out.println(element); + pull(in); + } + }); + } + }; + } + } + //#simple-sink @Test public void demonstrateCustomSourceUsage() throws Exception { @@ -116,6 +150,14 @@ public class GraphStageDocTest extends AbstractJavaTest { assertEquals(result2.toCompletableFuture().get(3, TimeUnit.SECONDS), (Integer) 5050); } + @Test + public void demonstrateCustomSinkUsage() throws Exception { + Graph, NotUsed> sinkGraph = new StdoutSink(); + + Sink mySink = Sink.fromGraph(sinkGraph); + + Source.from(Arrays.asList(1, 2, 3)).runWith(mySink, mat); + } //#one-to-one public class Map extends GraphStage> { diff --git a/akka-docs/rst/java/stream/stream-customize.rst b/akka-docs/rst/java/stream/stream-customize.rst index 691a38c628..ffcc8d7fde 100644 --- a/akka-docs/rst/java/stream/stream-customize.rst +++ b/akka-docs/rst/java/stream/stream-customize.rst @@ -58,6 +58,14 @@ source as any other built-in one: .. includecode:: ../code/docs/stream/GraphStageDocTest.java#simple-source-usage +Similarly, to create a custom :class:`Sink` one can register a subclass :class:`InHandler` with the stage :class:`Inlet`. +The ``onPush()`` callback is used to signal the handler a new element has been pushed to the stage, +and can hence be grabbed and used. ``onPush()`` can be overridden to provide custom behaviour. +Please note, most Sinks would need to request upstream elements as soon as they are created: this can be +done by calling ``pull(inlet)`` in the ``preStart()`` callback. + +.. includecode:: ../code/docs/stream/GraphStageDocTest.java#simple-sink + Port states, AbstractInHandler and AbstractOutHandler ----------------------------------------------------- diff --git a/akka-docs/rst/scala/code/docs/stream/GraphStageDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/GraphStageDocSpec.scala index 0defd9a17e..efb3437c95 100644 --- a/akka-docs/rst/scala/code/docs/stream/GraphStageDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/GraphStageDocSpec.scala @@ -4,15 +4,14 @@ package docs.stream import akka.NotUsed -import akka.stream.scaladsl.{ Keep, Sink, Flow, Source } +import akka.stream.scaladsl.{ Flow, Keep, Sink, Source } import akka.stream.stage._ import akka.stream._ - import akka.stream.testkit.{ TestPublisher, TestSubscriber } import akka.testkit.{ AkkaSpec, TestLatch } import scala.collection.mutable -import scala.concurrent.{ Promise, Await, Future } +import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.duration._ import scala.collection.immutable.Iterable @@ -86,6 +85,35 @@ class GraphStageDocSpec extends AkkaSpec { Await.result(result2, 3.seconds) should ===(5050) } + "Demonstrate creation of GraphStage Sink" in { + //#custom-sink-example + import akka.stream.SinkShape + import akka.stream.stage.GraphStage + import akka.stream.stage.InHandler + + class StdoutSink extends GraphStage[SinkShape[Int]] { + val in: Inlet[Int] = Inlet("StdoutSink") + override val shape: SinkShape[Int] = SinkShape(in) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) { + + // This requests one element at the Sink startup. + override def preStart(): Unit = pull(in) + + setHandler(in, new InHandler { + override def onPush(): Unit = { + println(grab(in)) + pull(in) + } + }) + } + } + //#custom-sink-example + + Source(List(0, 1, 2)).runWith(Sink.fromGraph(new StdoutSink)) + } + //#one-to-one class Map[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] { diff --git a/akka-docs/rst/scala/stream/stream-customize.rst b/akka-docs/rst/scala/stream/stream-customize.rst index 0aaa7c7efd..dd686cf539 100644 --- a/akka-docs/rst/scala/stream/stream-customize.rst +++ b/akka-docs/rst/scala/stream/stream-customize.rst @@ -61,6 +61,14 @@ source as any other built-in one: .. includecode:: ../code/docs/stream/GraphStageDocSpec.scala#simple-source-usage +Similarly, to create a custom :class:`Sink` one can register a subclass :class:`InHandler` with the stage :class:`Inlet`. +The ``onPush()`` callback is used to signal the handler a new element has been pushed to the stage, +and can hence be grabbed and used. ``onPush()`` can be overridden to provide custom behaviour. +Please note, most Sinks would need to request upstream elements as soon as they are created: this can be +done by calling ``pull(inlet)`` in the ``preStart()`` callback. + +.. includecode:: ../code/docs/stream/GraphStageDocSpec.scala#custom-sink-example + Port states, InHandler and OutHandler -------------------------------------