From 101c1d9b652df69a3cf32c74242d93c52209ce54 Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Wed, 1 Jul 2020 21:27:29 +0200 Subject: [PATCH] Docs: Actor sink stream operators (#29171) --- .../stream/operators/ActorSink/actorRef.md | 18 +++++++++++++++--- .../ActorSink/actorRefWithBackpressure.md | 19 ++++++++++++++++++- .../paradox/stream/operators/Sink/actorRef.md | 10 ++++++++-- .../Sink/actorRefWithBackpressure.md | 13 ++++++++++--- .../main/paradox/stream/operators/index.md | 8 ++++---- .../akka/stream/typed/javadsl/ActorSink.scala | 7 +++++++ .../stream/typed/scaladsl/ActorSink.scala | 11 +++++++++-- .../stream/typed/ActorSinkWithAckExample.java | 19 ++++++++++++++----- .../stream/typed/ActorSourceSinkExample.scala | 8 ++++---- 9 files changed, 89 insertions(+), 24 deletions(-) diff --git a/akka-docs/src/main/paradox/stream/operators/ActorSink/actorRef.md b/akka-docs/src/main/paradox/stream/operators/ActorSink/actorRef.md index afd8878c47..f08e497822 100644 --- a/akka-docs/src/main/paradox/stream/operators/ActorSink/actorRef.md +++ b/akka-docs/src/main/paradox/stream/operators/ActorSink/actorRef.md @@ -1,6 +1,6 @@ # ActorSink.actorRef -Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`], without considering backpressure. +Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`] of the new actors API, without considering backpressure. @ref[Actor interop operators](../index.md#actor-interop-operators) @@ -37,6 +37,18 @@ of the actor will grow. For potentially slow consumer actors it is recommended to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate limiting operator in front of this `Sink`. -## Examples +See also: -TODO (in progress) +* @ref[`ActorSink.actorRefWithBackpressure`](../ActorSink/actorRefWithBackpressure.md) Send elements to an actor of the new actors API supporting backpressure +* @ref[`Sink.actorRef`](../Sink/actorRef.md) The corresponding operator for the classic actors API +* @ref[`Sink.actorRefWithBackpressue`](../Sink/actorRefWithBackpressure.md) Send elements to an actor of the classic actors API supporting backpressure + +## Reactive Streams semantics + +@@@div { .callout } + +**cancels** when the actor terminates + +**backpressures** never + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/ActorSink/actorRefWithBackpressure.md b/akka-docs/src/main/paradox/stream/operators/ActorSink/actorRefWithBackpressure.md index 25398ae7a0..cf7ebe048c 100644 --- a/akka-docs/src/main/paradox/stream/operators/ActorSink/actorRefWithBackpressure.md +++ b/akka-docs/src/main/paradox/stream/operators/ActorSink/actorRefWithBackpressure.md @@ -1,6 +1,6 @@ # ActorSink.actorRefWithBackpressure -Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`] with backpressure, to be able to signal demand when the actor is ready to receive more elements. +Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`] of the new actors API with backpressure, to be able to signal demand when the actor is ready to receive more elements. @ref[Actor interop operators](../index.md#actor-interop-operators) @@ -24,6 +24,12 @@ This operator is included in: Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`] with backpressure, to be able to signal demand when the actor is ready to receive more elements. +See also: + +* @ref[`ActorSink.actorRef`](../ActorSink/actorRefWithBackpressure.md) Send elements to an actor of the new actors API, without considering backpressure +* @ref[`Sink.actorRef`](../Sink/actorRef.md) Send elements to an actor of the classic actors API, without considering backpressure +* @ref[`Sink.actorRefWithBackpressue`](../Sink/actorRefWithBackpressure.md) The corresponding operator for the classic actors API + ## Examples Scala @@ -31,3 +37,14 @@ Scala Java : @@snip [ActorSinkWithAckExample.java](/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java) { #actor-sink-ref-with-backpressure } + +## Reactive Streams semantics + +@@@div { .callout } + +**cancels** when the actor terminates + +**backpressures** when the actor acknowledgement has not arrived + +@@@ + diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/actorRef.md b/akka-docs/src/main/paradox/stream/operators/Sink/actorRef.md index 9aacf693d8..e31e33490e 100644 --- a/akka-docs/src/main/paradox/stream/operators/Sink/actorRef.md +++ b/akka-docs/src/main/paradox/stream/operators/Sink/actorRef.md @@ -1,8 +1,8 @@ # Sink.actorRef -Send the elements from the stream to an `ActorRef`. +Send the elements from the stream to an `ActorRef` of the classic actors API. -@ref[Sink operators](../index.md#sink-operators) +@ref[Actor interop operators](../index.md#actor-interop-operators) ## Signature @@ -12,6 +12,12 @@ Send the elements from the stream to an `ActorRef`. Send the elements from the stream to an `ActorRef`. No backpressure so care must be taken to not overflow the inbox. +See also: + +* @ref[`Sink.actorRefWithBackpressue`](../Sink/actorRefWithBackpressure.md) Send elements to an actor with backpressure support +* @ref[`ActorSink.actorRef`](../ActorSink/actorRef.md) The corresponding operator for the new actors API +* @ref[`ActorSink.actorRefWithBackpressure`](../ActorSink/actorRefWithBackpressure.md) Send elements to an actor of the new actors API supporting backpressure + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/actorRefWithBackpressure.md b/akka-docs/src/main/paradox/stream/operators/Sink/actorRefWithBackpressure.md index b243a03d5b..3e45220511 100644 --- a/akka-docs/src/main/paradox/stream/operators/Sink/actorRefWithBackpressure.md +++ b/akka-docs/src/main/paradox/stream/operators/Sink/actorRefWithBackpressure.md @@ -1,8 +1,8 @@ # Sink.actorRefWithBackpressure -Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, to provide back pressure onto the sink. +Send the elements from the stream to an `ActorRef` (of the classic actors API) which must then acknowledge reception after completing a message, to provide back pressure onto the sink. -@ref[Sink operators](../index.md#sink-operators) +@ref[Actor interop operators](../index.md#actor-interop-operators) ## Signature @@ -13,6 +13,13 @@ Send the elements from the stream to an `ActorRef` which must then acknowledge r Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, to provide back pressure onto the sink. +See also: + +* @ref[`Sink.actorRef`](../Sink/actorRef.md) Send elements to an actor, without considering backpressure +* @ref[`ActorSink.actorRef`](../ActorSink/actorRef.md) The corresponding operator for the new actors API +* @ref[`ActorSink.actorRefWithBackpressure`](../ActorSink/actorRefWithBackpressure.md) Send elements to an actor of the new actors API supporting backpressure + + ## Example Actor to be interacted with: @@ -31,7 +38,7 @@ Scala Java : @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithBackpressure } -## Reactive Streams semantics +## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 1e52617b48..3253428348 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -54,8 +54,6 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav | |Operator|Description| |--|--|--| -|Sink|@ref[actorRef](Sink/actorRef.md)|Send the elements from the stream to an `ActorRef`.| -|Sink|@ref[actorRefWithBackpressure](Sink/actorRefWithBackpressure.md)|Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, to provide back pressure onto the sink.| |Sink|@ref[asPublisher](Sink/asPublisher.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.| |Sink|@ref[cancelled](Sink/cancelled.md)|Immediately cancel the stream| |Sink|@ref[collection](Sink/collection.md)|@scala[Collect all values emitted from the stream into a collection.]@java[Operator only available in the Scala API. The closest operator in the Java API is @ref[`Sink.seq`](Sink/seq.md)].| @@ -317,11 +315,13 @@ Operators meant for inter-operating between Akka Streams and Actors: | |Operator|Description| |--|--|--| |Source|@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef` of the classic actors API; sending messages to it will emit them on the stream.| +|Sink|@ref[actorRef](Sink/actorRef.md)|Send the elements from the stream to an `ActorRef` of the classic actors API.| |ActorSource|@ref[actorRef](ActorSource/actorRef.md)|Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`] of the new actors API; sending messages to it will emit them on the stream only if they are of the same type as the stream.| -|ActorSink|@ref[actorRef](ActorSink/actorRef.md)|Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`], without considering backpressure.| +|ActorSink|@ref[actorRef](ActorSink/actorRef.md)|Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`] of the new actors API, without considering backpressure.| |Source|@ref[actorRefWithBackpressure](Source/actorRefWithBackpressure.md)|Materialize an `ActorRef` of the classic actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.| +|Sink|@ref[actorRefWithBackpressure](Sink/actorRefWithBackpressure.md)|Send the elements from the stream to an `ActorRef` (of the classic actors API) which must then acknowledge reception after completing a message, to provide back pressure onto the sink.| |ActorSource|@ref[actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md)|Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`] of the new actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.| -|ActorSink|@ref[actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md)|Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`] with backpressure, to be able to signal demand when the actor is ready to receive more elements.| +|ActorSink|@ref[actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md)|Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`] of the new actors API with backpressure, to be able to signal demand when the actor is ready to receive more elements.| |Source/Flow|@ref[ask](Source-or-Flow/ask.md)|Use the "Ask Pattern" to send a request-reply message to the target `ref` actor (of the classic actors API).| |ActorFlow|@ref[ask](ActorFlow/ask.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply back that will be emitted downstream.| |Source/Flow|@ref[watch](Source-or-Flow/watch.md)|Watch a specific `ActorRef` and signal a failure downstream once the actor terminates.| diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala index ab26ff4add..6451657050 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala @@ -48,6 +48,13 @@ object ActorSink { * will be sent to the destination actor. * When the stream is completed with failure - result of `onFailureMessage(throwable)` * function will be sent to the destination actor. + * + * @param ref the receiving actor as `ActorRef` (where `T` must include the control messages below) + * @param messageAdapter a function that wraps the stream elements to be sent to the actor together with an `ActorRef[A]` which accepts the ack message + * @param onInitMessage a function that wraps an `ActorRef` into a messages to couple the receiving actor to the sink + * @param ackMessage a fixed message that is expected after every element sent to the receiving actor + * @param onCompleteMessage the message to be sent to the actor when the stream completes + * @param onFailureMessage a function that creates a message to be sent to the actor in case the stream fails from a `Throwable` */ def actorRefWithBackpressure[T, M, A]( ref: ActorRef[M], diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala index 8e371bc376..22e502a825 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala @@ -34,8 +34,8 @@ object ActorSink { Sink.actorRef(ref.toClassic, onCompleteMessage, onFailureMessage) /** - * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. - * First element is always `onInitMessage`, then stream is waiting for acknowledgement message + * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signals. + * The first element is always `onInitMessage`, then stream is waiting for acknowledgement message * `ackMessage` from the given actor which means that it is ready to process * elements. It also requires `ackMessage` message after each stream element * to make backpressure work. @@ -45,6 +45,13 @@ object ActorSink { * will be sent to the destination actor. * When the stream is completed with failure - result of `onFailureMessage(throwable)` * function will be sent to the destination actor. + * + * @param ref the receiving actor as `ActorRef[T]` (where `T` must include the control messages below) + * @param messageAdapter a function that wraps the stream elements to be sent to the actor together with an `ActorRef[A]` which accepts the ack message + * @param onInitMessage a function that wraps an `ActorRef[A]` into a messages to couple the receiving actor to the sink + * @param ackMessage a fixed message that is expected after every element sent to the receiving actor + * @param onCompleteMessage the message to be sent to the actor when the stream completes + * @param onFailureMessage a function that creates a message to be sent to the actor in case the stream fails from a `Throwable` */ def actorRefWithBackpressure[T, M, A]( ref: ActorRef[M], diff --git a/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java index a528402e3a..1ff3f5a6d7 100644 --- a/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java +++ b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java @@ -7,7 +7,7 @@ package docs.akka.stream.typed; // #actor-sink-ref-with-backpressure import akka.NotUsed; import akka.actor.typed.ActorRef; -import akka.stream.ActorMaterializer; +import akka.actor.typed.ActorSystem; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.stream.typed.javadsl.ActorSink; @@ -50,18 +50,27 @@ public class ActorSinkWithAckExample { } // #actor-sink-ref-with-backpressure - final ActorMaterializer mat = null; + final ActorSystem system = null; { // #actor-sink-ref-with-backpressure - final ActorRef actor = null; + final ActorRef actorRef = // spawned actor + null; // #hidden + + final Ack ackMessage = new Ack(); + final Complete completeMessage = new Complete(); final Sink sink = ActorSink.actorRefWithBackpressure( - actor, Message::new, Init::new, new Ack(), new Complete(), Fail::new); + actorRef, + (responseActorRef, element) -> new Message(responseActorRef, element), + (responseActorRef) -> new Init(responseActorRef), + ackMessage, + completeMessage, + (exception) -> new Fail(exception)); - Source.single("msg1").runWith(sink, mat); + Source.single("msg1").runWith(sink, system); // #actor-sink-ref-with-backpressure } } diff --git a/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala b/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala index 0294309cb3..98d490ac50 100644 --- a/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala +++ b/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala @@ -162,11 +162,11 @@ object ActorSourceSinkExample { val sink: Sink[String, NotUsed] = ActorSink.actorRefWithBackpressure( ref = actor, + messageAdapter = (responseActorRef: ActorRef[Ack], element) => Message(responseActorRef, element), + onInitMessage = (responseActorRef: ActorRef[Ack]) => Init(responseActorRef), + ackMessage = Ack, onCompleteMessage = Complete, - onFailureMessage = Fail.apply, - messageAdapter = Message.apply, - onInitMessage = Init.apply, - ackMessage = Ack) + onFailureMessage = (exception) => Fail(exception)) Source.single("msg1").runWith(sink) // #actor-sink-ref-with-backpressure