Docs: Actor sink stream operators (#29171)
This commit is contained in:
parent
1e5cb5a720
commit
101c1d9b65
9 changed files with 89 additions and 24 deletions
|
|
@ -1,6 +1,6 @@
|
||||||
# ActorSink.actorRef
|
# ActorSink.actorRef
|
||||||
|
|
||||||
Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`], without considering backpressure.
|
Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`] of the new actors API, without considering backpressure.
|
||||||
|
|
||||||
@ref[Actor interop operators](../index.md#actor-interop-operators)
|
@ref[Actor interop operators](../index.md#actor-interop-operators)
|
||||||
|
|
||||||
|
|
@ -37,6 +37,18 @@ of the actor will grow. For potentially slow consumer actors it is recommended
|
||||||
to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
|
to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
|
||||||
limiting operator in front of this `Sink`.
|
limiting operator in front of this `Sink`.
|
||||||
|
|
||||||
## Examples
|
See also:
|
||||||
|
|
||||||
TODO (in progress)
|
* @ref[`ActorSink.actorRefWithBackpressure`](../ActorSink/actorRefWithBackpressure.md) Send elements to an actor of the new actors API supporting backpressure
|
||||||
|
* @ref[`Sink.actorRef`](../Sink/actorRef.md) The corresponding operator for the classic actors API
|
||||||
|
* @ref[`Sink.actorRefWithBackpressue`](../Sink/actorRefWithBackpressure.md) Send elements to an actor of the classic actors API supporting backpressure
|
||||||
|
|
||||||
|
## Reactive Streams semantics
|
||||||
|
|
||||||
|
@@@div { .callout }
|
||||||
|
|
||||||
|
**cancels** when the actor terminates
|
||||||
|
|
||||||
|
**backpressures** never
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
# ActorSink.actorRefWithBackpressure
|
# ActorSink.actorRefWithBackpressure
|
||||||
|
|
||||||
Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`] with backpressure, to be able to signal demand when the actor is ready to receive more elements.
|
Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`] of the new actors API with backpressure, to be able to signal demand when the actor is ready to receive more elements.
|
||||||
|
|
||||||
@ref[Actor interop operators](../index.md#actor-interop-operators)
|
@ref[Actor interop operators](../index.md#actor-interop-operators)
|
||||||
|
|
||||||
|
|
@ -24,6 +24,12 @@ This operator is included in:
|
||||||
|
|
||||||
Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`] with backpressure, to be able to signal demand when the actor is ready to receive more elements.
|
Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`] with backpressure, to be able to signal demand when the actor is ready to receive more elements.
|
||||||
|
|
||||||
|
See also:
|
||||||
|
|
||||||
|
* @ref[`ActorSink.actorRef`](../ActorSink/actorRefWithBackpressure.md) Send elements to an actor of the new actors API, without considering backpressure
|
||||||
|
* @ref[`Sink.actorRef`](../Sink/actorRef.md) Send elements to an actor of the classic actors API, without considering backpressure
|
||||||
|
* @ref[`Sink.actorRefWithBackpressue`](../Sink/actorRefWithBackpressure.md) The corresponding operator for the classic actors API
|
||||||
|
|
||||||
## Examples
|
## Examples
|
||||||
|
|
||||||
Scala
|
Scala
|
||||||
|
|
@ -31,3 +37,14 @@ Scala
|
||||||
|
|
||||||
Java
|
Java
|
||||||
: @@snip [ActorSinkWithAckExample.java](/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java) { #actor-sink-ref-with-backpressure }
|
: @@snip [ActorSinkWithAckExample.java](/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSinkWithAckExample.java) { #actor-sink-ref-with-backpressure }
|
||||||
|
|
||||||
|
## Reactive Streams semantics
|
||||||
|
|
||||||
|
@@@div { .callout }
|
||||||
|
|
||||||
|
**cancels** when the actor terminates
|
||||||
|
|
||||||
|
**backpressures** when the actor acknowledgement has not arrived
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,8 @@
|
||||||
# Sink.actorRef
|
# Sink.actorRef
|
||||||
|
|
||||||
Send the elements from the stream to an `ActorRef`.
|
Send the elements from the stream to an `ActorRef` of the classic actors API.
|
||||||
|
|
||||||
@ref[Sink operators](../index.md#sink-operators)
|
@ref[Actor interop operators](../index.md#actor-interop-operators)
|
||||||
|
|
||||||
## Signature
|
## Signature
|
||||||
|
|
||||||
|
|
@ -12,6 +12,12 @@ Send the elements from the stream to an `ActorRef`.
|
||||||
|
|
||||||
Send the elements from the stream to an `ActorRef`. No backpressure so care must be taken to not overflow the inbox.
|
Send the elements from the stream to an `ActorRef`. No backpressure so care must be taken to not overflow the inbox.
|
||||||
|
|
||||||
|
See also:
|
||||||
|
|
||||||
|
* @ref[`Sink.actorRefWithBackpressue`](../Sink/actorRefWithBackpressure.md) Send elements to an actor with backpressure support
|
||||||
|
* @ref[`ActorSink.actorRef`](../ActorSink/actorRef.md) The corresponding operator for the new actors API
|
||||||
|
* @ref[`ActorSink.actorRefWithBackpressure`](../ActorSink/actorRefWithBackpressure.md) Send elements to an actor of the new actors API supporting backpressure
|
||||||
|
|
||||||
## Reactive Streams semantics
|
## Reactive Streams semantics
|
||||||
|
|
||||||
@@@div { .callout }
|
@@@div { .callout }
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,8 @@
|
||||||
# Sink.actorRefWithBackpressure
|
# Sink.actorRefWithBackpressure
|
||||||
|
|
||||||
Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, to provide back pressure onto the sink.
|
Send the elements from the stream to an `ActorRef` (of the classic actors API) which must then acknowledge reception after completing a message, to provide back pressure onto the sink.
|
||||||
|
|
||||||
@ref[Sink operators](../index.md#sink-operators)
|
@ref[Actor interop operators](../index.md#actor-interop-operators)
|
||||||
|
|
||||||
## Signature
|
## Signature
|
||||||
|
|
||||||
|
|
@ -13,6 +13,13 @@ Send the elements from the stream to an `ActorRef` which must then acknowledge r
|
||||||
Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message,
|
Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message,
|
||||||
to provide back pressure onto the sink.
|
to provide back pressure onto the sink.
|
||||||
|
|
||||||
|
See also:
|
||||||
|
|
||||||
|
* @ref[`Sink.actorRef`](../Sink/actorRef.md) Send elements to an actor, without considering backpressure
|
||||||
|
* @ref[`ActorSink.actorRef`](../ActorSink/actorRef.md) The corresponding operator for the new actors API
|
||||||
|
* @ref[`ActorSink.actorRefWithBackpressure`](../ActorSink/actorRefWithBackpressure.md) Send elements to an actor of the new actors API supporting backpressure
|
||||||
|
|
||||||
|
|
||||||
## Example
|
## Example
|
||||||
|
|
||||||
Actor to be interacted with:
|
Actor to be interacted with:
|
||||||
|
|
|
||||||
|
|
@ -54,8 +54,6 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav
|
||||||
|
|
||||||
| |Operator|Description|
|
| |Operator|Description|
|
||||||
|--|--|--|
|
|--|--|--|
|
||||||
|Sink|<a name="actorref"></a>@ref[actorRef](Sink/actorRef.md)|Send the elements from the stream to an `ActorRef`.|
|
|
||||||
|Sink|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](Sink/actorRefWithBackpressure.md)|Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, to provide back pressure onto the sink.|
|
|
||||||
|Sink|<a name="aspublisher"></a>@ref[asPublisher](Sink/asPublisher.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.|
|
|Sink|<a name="aspublisher"></a>@ref[asPublisher](Sink/asPublisher.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.|
|
||||||
|Sink|<a name="cancelled"></a>@ref[cancelled](Sink/cancelled.md)|Immediately cancel the stream|
|
|Sink|<a name="cancelled"></a>@ref[cancelled](Sink/cancelled.md)|Immediately cancel the stream|
|
||||||
|Sink|<a name="collection"></a>@ref[collection](Sink/collection.md)|@scala[Collect all values emitted from the stream into a collection.]@java[Operator only available in the Scala API. The closest operator in the Java API is @ref[`Sink.seq`](Sink/seq.md)].|
|
|Sink|<a name="collection"></a>@ref[collection](Sink/collection.md)|@scala[Collect all values emitted from the stream into a collection.]@java[Operator only available in the Scala API. The closest operator in the Java API is @ref[`Sink.seq`](Sink/seq.md)].|
|
||||||
|
|
@ -317,11 +315,13 @@ Operators meant for inter-operating between Akka Streams and Actors:
|
||||||
| |Operator|Description|
|
| |Operator|Description|
|
||||||
|--|--|--|
|
|--|--|--|
|
||||||
|Source|<a name="actorref"></a>@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef` of the classic actors API; sending messages to it will emit them on the stream.|
|
|Source|<a name="actorref"></a>@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef` of the classic actors API; sending messages to it will emit them on the stream.|
|
||||||
|
|Sink|<a name="actorref"></a>@ref[actorRef](Sink/actorRef.md)|Send the elements from the stream to an `ActorRef` of the classic actors API.|
|
||||||
|ActorSource|<a name="actorref"></a>@ref[actorRef](ActorSource/actorRef.md)|Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`] of the new actors API; sending messages to it will emit them on the stream only if they are of the same type as the stream.|
|
|ActorSource|<a name="actorref"></a>@ref[actorRef](ActorSource/actorRef.md)|Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`] of the new actors API; sending messages to it will emit them on the stream only if they are of the same type as the stream.|
|
||||||
|ActorSink|<a name="actorref"></a>@ref[actorRef](ActorSink/actorRef.md)|Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`], without considering backpressure.|
|
|ActorSink|<a name="actorref"></a>@ref[actorRef](ActorSink/actorRef.md)|Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`] of the new actors API, without considering backpressure.|
|
||||||
|Source|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](Source/actorRefWithBackpressure.md)|Materialize an `ActorRef` of the classic actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.|
|
|Source|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](Source/actorRefWithBackpressure.md)|Materialize an `ActorRef` of the classic actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.|
|
||||||
|
|Sink|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](Sink/actorRefWithBackpressure.md)|Send the elements from the stream to an `ActorRef` (of the classic actors API) which must then acknowledge reception after completing a message, to provide back pressure onto the sink.|
|
||||||
|ActorSource|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md)|Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`] of the new actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.|
|
|ActorSource|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md)|Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`] of the new actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.|
|
||||||
|ActorSink|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md)|Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`] with backpressure, to be able to signal demand when the actor is ready to receive more elements.|
|
|ActorSink|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md)|Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`] of the new actors API with backpressure, to be able to signal demand when the actor is ready to receive more elements.|
|
||||||
|Source/Flow|<a name="ask"></a>@ref[ask](Source-or-Flow/ask.md)|Use the "Ask Pattern" to send a request-reply message to the target `ref` actor (of the classic actors API).|
|
|Source/Flow|<a name="ask"></a>@ref[ask](Source-or-Flow/ask.md)|Use the "Ask Pattern" to send a request-reply message to the target `ref` actor (of the classic actors API).|
|
||||||
|ActorFlow|<a name="ask"></a>@ref[ask](ActorFlow/ask.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply back that will be emitted downstream.|
|
|ActorFlow|<a name="ask"></a>@ref[ask](ActorFlow/ask.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply back that will be emitted downstream.|
|
||||||
|Source/Flow|<a name="watch"></a>@ref[watch](Source-or-Flow/watch.md)|Watch a specific `ActorRef` and signal a failure downstream once the actor terminates.|
|
|Source/Flow|<a name="watch"></a>@ref[watch](Source-or-Flow/watch.md)|Watch a specific `ActorRef` and signal a failure downstream once the actor terminates.|
|
||||||
|
|
|
||||||
|
|
@ -48,6 +48,13 @@ object ActorSink {
|
||||||
* will be sent to the destination actor.
|
* will be sent to the destination actor.
|
||||||
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
|
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
|
||||||
* function will be sent to the destination actor.
|
* function will be sent to the destination actor.
|
||||||
|
*
|
||||||
|
* @param ref the receiving actor as `ActorRef<T>` (where `T` must include the control messages below)
|
||||||
|
* @param messageAdapter a function that wraps the stream elements to be sent to the actor together with an `ActorRef[A]` which accepts the ack message
|
||||||
|
* @param onInitMessage a function that wraps an `ActorRef<A>` into a messages to couple the receiving actor to the sink
|
||||||
|
* @param ackMessage a fixed message that is expected after every element sent to the receiving actor
|
||||||
|
* @param onCompleteMessage the message to be sent to the actor when the stream completes
|
||||||
|
* @param onFailureMessage a function that creates a message to be sent to the actor in case the stream fails from a `Throwable`
|
||||||
*/
|
*/
|
||||||
def actorRefWithBackpressure[T, M, A](
|
def actorRefWithBackpressure[T, M, A](
|
||||||
ref: ActorRef[M],
|
ref: ActorRef[M],
|
||||||
|
|
|
||||||
|
|
@ -34,8 +34,8 @@ object ActorSink {
|
||||||
Sink.actorRef(ref.toClassic, onCompleteMessage, onFailureMessage)
|
Sink.actorRef(ref.toClassic, onCompleteMessage, onFailureMessage)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
|
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signals.
|
||||||
* First element is always `onInitMessage`, then stream is waiting for acknowledgement message
|
* The first element is always `onInitMessage`, then stream is waiting for acknowledgement message
|
||||||
* `ackMessage` from the given actor which means that it is ready to process
|
* `ackMessage` from the given actor which means that it is ready to process
|
||||||
* elements. It also requires `ackMessage` message after each stream element
|
* elements. It also requires `ackMessage` message after each stream element
|
||||||
* to make backpressure work.
|
* to make backpressure work.
|
||||||
|
|
@ -45,6 +45,13 @@ object ActorSink {
|
||||||
* will be sent to the destination actor.
|
* will be sent to the destination actor.
|
||||||
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
|
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
|
||||||
* function will be sent to the destination actor.
|
* function will be sent to the destination actor.
|
||||||
|
*
|
||||||
|
* @param ref the receiving actor as `ActorRef[T]` (where `T` must include the control messages below)
|
||||||
|
* @param messageAdapter a function that wraps the stream elements to be sent to the actor together with an `ActorRef[A]` which accepts the ack message
|
||||||
|
* @param onInitMessage a function that wraps an `ActorRef[A]` into a messages to couple the receiving actor to the sink
|
||||||
|
* @param ackMessage a fixed message that is expected after every element sent to the receiving actor
|
||||||
|
* @param onCompleteMessage the message to be sent to the actor when the stream completes
|
||||||
|
* @param onFailureMessage a function that creates a message to be sent to the actor in case the stream fails from a `Throwable`
|
||||||
*/
|
*/
|
||||||
def actorRefWithBackpressure[T, M, A](
|
def actorRefWithBackpressure[T, M, A](
|
||||||
ref: ActorRef[M],
|
ref: ActorRef[M],
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ package docs.akka.stream.typed;
|
||||||
// #actor-sink-ref-with-backpressure
|
// #actor-sink-ref-with-backpressure
|
||||||
import akka.NotUsed;
|
import akka.NotUsed;
|
||||||
import akka.actor.typed.ActorRef;
|
import akka.actor.typed.ActorRef;
|
||||||
import akka.stream.ActorMaterializer;
|
import akka.actor.typed.ActorSystem;
|
||||||
import akka.stream.javadsl.Sink;
|
import akka.stream.javadsl.Sink;
|
||||||
import akka.stream.javadsl.Source;
|
import akka.stream.javadsl.Source;
|
||||||
import akka.stream.typed.javadsl.ActorSink;
|
import akka.stream.typed.javadsl.ActorSink;
|
||||||
|
|
@ -50,18 +50,27 @@ public class ActorSinkWithAckExample {
|
||||||
}
|
}
|
||||||
// #actor-sink-ref-with-backpressure
|
// #actor-sink-ref-with-backpressure
|
||||||
|
|
||||||
final ActorMaterializer mat = null;
|
final ActorSystem<Void> system = null;
|
||||||
|
|
||||||
{
|
{
|
||||||
// #actor-sink-ref-with-backpressure
|
// #actor-sink-ref-with-backpressure
|
||||||
|
|
||||||
final ActorRef<Protocol> actor = null;
|
final ActorRef<Protocol> actorRef = // spawned actor
|
||||||
|
null; // #hidden
|
||||||
|
|
||||||
|
final Ack ackMessage = new Ack();
|
||||||
|
final Complete completeMessage = new Complete();
|
||||||
|
|
||||||
final Sink<String, NotUsed> sink =
|
final Sink<String, NotUsed> sink =
|
||||||
ActorSink.actorRefWithBackpressure(
|
ActorSink.actorRefWithBackpressure(
|
||||||
actor, Message::new, Init::new, new Ack(), new Complete(), Fail::new);
|
actorRef,
|
||||||
|
(responseActorRef, element) -> new Message(responseActorRef, element),
|
||||||
|
(responseActorRef) -> new Init(responseActorRef),
|
||||||
|
ackMessage,
|
||||||
|
completeMessage,
|
||||||
|
(exception) -> new Fail(exception));
|
||||||
|
|
||||||
Source.single("msg1").runWith(sink, mat);
|
Source.single("msg1").runWith(sink, system);
|
||||||
// #actor-sink-ref-with-backpressure
|
// #actor-sink-ref-with-backpressure
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -162,11 +162,11 @@ object ActorSourceSinkExample {
|
||||||
|
|
||||||
val sink: Sink[String, NotUsed] = ActorSink.actorRefWithBackpressure(
|
val sink: Sink[String, NotUsed] = ActorSink.actorRefWithBackpressure(
|
||||||
ref = actor,
|
ref = actor,
|
||||||
|
messageAdapter = (responseActorRef: ActorRef[Ack], element) => Message(responseActorRef, element),
|
||||||
|
onInitMessage = (responseActorRef: ActorRef[Ack]) => Init(responseActorRef),
|
||||||
|
ackMessage = Ack,
|
||||||
onCompleteMessage = Complete,
|
onCompleteMessage = Complete,
|
||||||
onFailureMessage = Fail.apply,
|
onFailureMessage = (exception) => Fail(exception))
|
||||||
messageAdapter = Message.apply,
|
|
||||||
onInitMessage = Init.apply,
|
|
||||||
ackMessage = Ack)
|
|
||||||
|
|
||||||
Source.single("msg1").runWith(sink)
|
Source.single("msg1").runWith(sink)
|
||||||
// #actor-sink-ref-with-backpressure
|
// #actor-sink-ref-with-backpressure
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue