diff --git a/akka-docs/src/main/paradox/stream/operators/Source/actorRef.md b/akka-docs/src/main/paradox/stream/operators/Source/actorRef.md index a1fffb66c3..bd2108d63c 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/actorRef.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/actorRef.md @@ -4,11 +4,9 @@ Materialize an `ActorRef`; sending messages to it will emit them on the stream. @ref[Source operators](../index.md#source-operators) -@@@ div { .group-scala } ## Signature -@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #actorRef } -@@@ +@apidoc[Source.actorRef](Source$) { scala="#actorRef[T](completionMatcher:PartialFunction[Any,akka.stream.CompletionStrategy],failureMatcher:PartialFunction[Any,Throwable],bufferSize:Int,overflowStrategy:akka.stream.OverflowStrategy):akka.stream.scaladsl.Source[T,akka.actor.ActorRef]" java="#actorRef(akka.japi.function.Function,akka.japi.function.Function,int,akka.stream.OverflowStrategy)" } ## Description diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java index 97403aa922..75c076494e 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java @@ -6,6 +6,7 @@ package jdocs.stream.operators; // #imports // #range-imports +import akka.Done; import akka.NotUsed; import akka.actor.ActorSystem; import akka.actor.testkit.typed.javadsl.ManualTime; @@ -81,7 +82,17 @@ public class SourceDocExamples { // #actor-ref int bufferSize = 100; - Source source = Source.actorRef(bufferSize, OverflowStrategy.dropHead()); + Source source = + Source.actorRef( + elem -> { + // complete stream immediately if we send it Done + if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately()); + else return Optional.empty(); + }, + // never fail the stream because of a message + elem -> Optional.empty(), + bufferSize, + OverflowStrategy.dropHead()); ActorRef actorRef = source.to(Sink.foreach(System.out::println)).run(system); actorRef.tell("hello", ActorRef.noSender()); diff --git a/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala b/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala index 2b1234551b..b54e9a12cb 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala @@ -4,6 +4,7 @@ package docs.stream.operators +import akka.Done import akka.actor.ActorSystem import akka.testkit.TestProbe @@ -29,23 +30,30 @@ object SourceOperators { def actorRef(): Unit = { //#actorRef - - import akka.actor.Status.Success + import akka.Done import akka.actor.ActorRef import akka.stream.OverflowStrategy import akka.stream.CompletionStrategy import akka.stream.scaladsl._ + import scala.util.Failure - val bufferSize = 100 - - val source: Source[Any, ActorRef] = Source.actorRef[Any](bufferSize, OverflowStrategy.dropHead) + val source: Source[Any, ActorRef] = Source.actorRef( + completionMatcher = { + case Done => + // complete stream immediately if we send it Done + CompletionStrategy.immediately + }, + // never fail the stream because of a message + failureMatcher = PartialFunction.empty, + bufferSize = 100, + overflowStrategy = OverflowStrategy.dropHead) val actorRef: ActorRef = source.to(Sink.foreach(println)).run() actorRef ! "hello" actorRef ! "hello" // The stream completes successfully with the following message - actorRef ! Success(CompletionStrategy.immediately) + actorRef ! Done //#actorRef }