diff --git a/akka-docs/src/main/paradox/stream/stream-integrations.md b/akka-docs/src/main/paradox/stream/stream-integrations.md index 1c26f95e11..1dc2d34087 100644 --- a/akka-docs/src/main/paradox/stream/stream-integrations.md +++ b/akka-docs/src/main/paradox/stream/stream-integrations.md @@ -169,6 +169,13 @@ actor reference. The actor will be stopped when the stream is completed, failed or cancelled from downstream, i.e. you can watch it to get notified when that happens. + +Scala +: @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #source-actorRef } + +Java +: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #source-actorRef } + ## Integrating with External Services Stream transformations and side effects involving external non-stream based services can be diff --git a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java index 5186123020..5e6381893e 100644 --- a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java @@ -770,4 +770,26 @@ public class IntegrationDocTest extends AbstractJavaTest { } }; } + + @Test + public void illustrateSourceActorRef() throws Exception { + new TestKit(system) { + { + // #source-actorRef + int bufferSize = 10; + + Source source = + Source.actorRef( + bufferSize, OverflowStrategy.dropHead()); // note: backpressure is not supported + ActorRef actorRef = + source.map(x -> x * x).to(Sink.foreach(x -> System.out.println("got: " + x))).run(mat); + + actorRef.tell(1, ActorRef.noSender()); + actorRef.tell(2, ActorRef.noSender()); + actorRef.tell(3, ActorRef.noSender()); + actorRef.tell(new akka.actor.Status.Success("done"), ActorRef.noSender()); + // #source-actorRef + } + }; + } } diff --git a/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala b/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala index 62982fdf64..08d5b91501 100644 --- a/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala +++ b/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala @@ -499,4 +499,20 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { //#source-queue } + "illustrate use of source actor ref" in { + //#source-actorRef + val bufferSize = 10 + + val ref = Source + .actorRef[Int](bufferSize, OverflowStrategy.fail) // note: backpressure is not supported + .map(x ⇒ x * x) + .toMat(Sink.foreach(x ⇒ println(s"completed $x")))(Keep.left) + .run() + + ref ! 1 + ref ! 2 + ref ! 3 + ref ! akka.actor.Status.Success("done") + //#source-actorRef + } }