From f8618b24b053001040f10a40bc1c9685c10a417a Mon Sep 17 00:00:00 2001 From: Seeta Ramayya <3521036+Seetaramayya@users.noreply.github.com> Date: Wed, 2 Jan 2019 17:08:35 +0100 Subject: [PATCH] added examples for Stream # actorRef operator. As part of #25468 (#26162) --- .../stream/operators/Source/actorRef.md | 8 ++++++ .../stream/operators/SourceDocExamples.java | 25 +++++++++++++++++ .../stream/operators/SourceOperators.scala | 28 ++++++++++++++++++- 3 files changed, 60 insertions(+), 1 deletion(-) diff --git a/akka-docs/src/main/paradox/stream/operators/Source/actorRef.md b/akka-docs/src/main/paradox/stream/operators/Source/actorRef.md index 93ad75e22e..ae409ea748 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/actorRef.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/actorRef.md @@ -24,3 +24,11 @@ elements or failing the stream; the strategy is chosen by the user. @@@ +## Examples + + +Scala +: @@snip [actorRef.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #actorRef } + +Java +: @@snip [actorRef.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #actor-ref-imports #actor-ref } diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java index 5200413349..b6bdbb4fe3 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java @@ -13,6 +13,13 @@ import akka.stream.Materializer; import akka.stream.javadsl.Source; //#range-imports +//#actor-ref-imports +import akka.actor.ActorRef; +import akka.actor.Status.Success; +import akka.stream.OverflowStrategy; +import akka.stream.javadsl.Sink; +//#actor-ref-imports + import java.util.Arrays; //#imports @@ -59,4 +66,22 @@ public class SourceDocExamples { //#run-range } + static void actorRef() { + //#actor-ref + + final ActorSystem system = ActorSystem.create(); + final Materializer materializer = ActorMaterializer.create(system); + + int bufferSize = 100; + Source source = Source.actorRef(bufferSize, OverflowStrategy.dropHead()); + + ActorRef actorRef = source.to(Sink.foreach(System.out::println)).run(materializer); + actorRef.tell("hello", ActorRef.noSender()); + actorRef.tell("hello", ActorRef.noSender()); + + // The stream completes successfully with the following message + actorRef.tell(new Success("completes stream"), ActorRef.noSender()); + //#actor-ref + } + } diff --git a/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala b/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala index 7ca8913684..6aa9e1e806 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala @@ -4,6 +4,9 @@ package docs.stream.operators +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer + object SourceOperators { def fromFuture = { @@ -11,8 +14,8 @@ object SourceOperators { import akka.actor.ActorSystem import akka.stream.ActorMaterializer - import akka.{ Done, NotUsed } import akka.stream.scaladsl._ + import akka.{ Done, NotUsed } import scala.concurrent.Future @@ -25,4 +28,27 @@ object SourceOperators { val done: Future[Done] = source.runWith(sink) //10 //#sourceFromFuture } + + def actorRef(): Unit = { + //#actorRef + + import akka.actor.Status.Success + import akka.actor.ActorRef + import akka.stream.OverflowStrategy + import akka.stream.scaladsl._ + + implicit val system: ActorSystem = ActorSystem() + implicit val materializer: ActorMaterializer = ActorMaterializer() + val bufferSize = 100 + + val source: Source[Any, ActorRef] = Source.actorRef[Any](bufferSize, OverflowStrategy.dropHead) + val actorRef: ActorRef = source.to(Sink.foreach(println)).run() + + actorRef ! "hello" + actorRef ! "hello" + + // The stream completes successfully with the following message + actorRef ! Success("completes stream") + //#actorRef + } }