Docs: Actor source stream operators (#29178)

This commit is contained in:
Enno 2020-06-15 14:11:28 +02:00 committed by GitHub
parent 2c4e114ecd
commit bc68f0d650
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 288 additions and 29 deletions

View file

@ -1,6 +1,6 @@
# ActorSource.actorRef
Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream only if they are of the same type as the stream.
Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`] of the new actors API; sending messages to it will emit them on the stream only if they are of the same type as the stream.
@ref[Actor interop operators](../index.md#actor-interop-operators)
@ -22,6 +22,13 @@ This operator is included in:
Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`] which only accepts messages that are of the same type as the stream.
See also:
* @ref[ActorSource.actorRefWithBackpressure](actorRefWithBackpressure.md) This operator, but with backpressure control
* @ref[Source.actorRef](../Source/actorRef.md) The corresponding operator for the classic actors API
* @ref[Source.actorRefWithBackpressure](../Source/actorRefWithBackpressure.md) The operator for the classic actors API with backpressure control
* @ref[Source.queue](../Source/queue.md) Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source
## Examples
Scala

View file

@ -1,6 +1,6 @@
# ActorSource.actorRefWithBackpressure
Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.
Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`] of the new actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.
@ref[Actor interop operators](../index.md#actor-interop-operators)
@ -23,12 +23,36 @@ This operator is included in:
Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`], sending messages to it will emit them on the stream. The actor responds with the provided ack message
once the element could be emitted allowing for backpressure from the source. Sending another message before the previous one has been acknowledged will fail the stream.
See also:
* @ref[ActorSource.actorRef](actorRef.md) This operator, but without backpressure control
* @ref[Source.actorRef](../Source/actorRef.md) This operator, but without backpressure control for the classic actors API
* @ref[Source.actorRefWithBackpressure](../Source/actorRefWithBackpressure.md) This operator for the classic actors API
* @ref[Source.queue](../Source/queue.md) Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source
## Example
With `actorRefWithBackpressure` two actors get into play: one which feeds the stream and is materialized when the stream runs, and one which is provided by us and gets the ack signal when an element is emitted into the stream.
For the ack signal we create an @scala[`Emitted` object]@java[empty `Emitted` class].
For "feeding" the stream we use the `Event` @scala[trait]@java[interface].
In this example we create the stream in an actor which itself reacts on the demand of the stream and sends more messages.
Scala
: @@snip [ActorSourceSinkExample.scala](/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala) { #actor-source-with-backpressure }
Java
: @@snip [snip](/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceWithBackpressureExample.java) { #sample }
## Reactive Streams semantics
@@@div { .callout }
**emits** when there is demand and there are messages in the buffer or a message is sent to the `ActorRef`
**emits** when there is demand and a message is sent to the materialized @scala[`ActorRef[T]`]@java[`ActorRef<T>`]
**completes** when the `ActorRef` is sent `akka.actor.Status.Success`
**completes** when the passed completion matcher returns a `CompletionStrategy`
@@@

View file

@ -1,8 +1,8 @@
# Source.actorRef
Materialize an `ActorRef`; sending messages to it will emit them on the stream.
Materialize an `ActorRef` of the classic actors API; sending messages to it will emit them on the stream.
@ref[Source operators](../index.md#source-operators)
@ref[Actor interop operators](../index.md#actor-interop-operators)
## Signature
@ -21,6 +21,13 @@ already buffered elements will be sent out before signaling completion.
Sending `akka.actor.PoisonPill` will signal completion immediately but this behavior is deprecated and scheduled to be removed.
Using `akka.actor.ActorSystem.stop` to stop the actor and complete the stream is *not supported*.
See also:
* @ref[Source.actorRefWithBackpressure](../Source/actorRefWithBackpressure.md) This operator, but with backpressure control
* @ref[ActorSource.actorRef](actorRef.md) The corresponding operator for the new actors API
* @ref[ActorSource.actorRefWithBackpressure](actorRefWithBackpressure.md) The operator for the new actors API with backpressure control
* @ref[Source.queue](../Source/queue.md) Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source
## Examples
Scala

View file

@ -1,8 +1,8 @@
# Source.actorRefWithBackpressure
Materialize an `ActorRef`; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.
Materialize an `ActorRef` of the classic actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.
@ref[Source operators](../index.md#source-operators)
@ref[Actor interop operators](../index.md#actor-interop-operators)
## Signature
@ -13,6 +13,13 @@ Materialize an `ActorRef`; sending messages to it will emit them on the stream.
Materialize an `ActorRef`, sending messages to it will emit them on the stream. The actor responds with the provided ack message
once the element could be emitted allowing for backpressure from the source. Sending another message before the previous one has been acknowledged will fail the stream.
See also:
* @ref[Source.actorRef](../Source/actorRef.md) This operator without backpressure control
* @ref[ActorSource.actorRef](actorRef.md) The operator for the new actors API without backpressure control
* @ref[ActorSource.actorRefWithBackpressure](actorRefWithBackpressure.md) The corresponding operator for the new actors API
* @ref[Source.queue](../Source/queue.md) Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source
## Examples
Scala
@ -27,6 +34,6 @@ Java
**emits** when there is demand and there are messages in the buffer or a message is sent to the `ActorRef`
**completes** when the `ActorRef` is sent `akka.actor.Status.Success`
**completes** when the passed completion matcher returns a `CompletionStrategy` or fails if the passed failure matcher returns an exception
@@@

View file

@ -7,8 +7,6 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
| |Operator|Description|
|--|--|--|
|Source|<a name="actorref"></a>@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream.|
|Source|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](Source/actorRefWithBackpressure.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.|
|Source|<a name="assourcewithcontext"></a>@ref[asSourceWithContext](Source/asSourceWithContext.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.|
|Source|<a name="assubscriber"></a>@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber).|
|Source|<a name="combine"></a>@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.|
@ -318,9 +316,11 @@ Operators meant for inter-operating between Akka Streams and Actors:
| |Operator|Description|
|--|--|--|
|ActorSource|<a name="actorref"></a>@ref[actorRef](ActorSource/actorRef.md)|Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream only if they are of the same type as the stream.|
|Source|<a name="actorref"></a>@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef` of the classic actors API; sending messages to it will emit them on the stream.|
|ActorSource|<a name="actorref"></a>@ref[actorRef](ActorSource/actorRef.md)|Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`] of the new actors API; sending messages to it will emit them on the stream only if they are of the same type as the stream.|
|ActorSink|<a name="actorref"></a>@ref[actorRef](ActorSink/actorRef.md)|Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`], without considering backpressure.|
|ActorSource|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md)|Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.|
|Source|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](Source/actorRefWithBackpressure.md)|Materialize an `ActorRef` of the classic actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.|
|ActorSource|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md)|Materialize an @java[`ActorRef<T>`]@scala[`ActorRef[T]`] of the new actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.|
|ActorSink|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md)|Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`] with backpressure, to be able to signal demand when the actor is ready to receive more elements.|
|Source/Flow|<a name="ask"></a>@ref[ask](Source-or-Flow/ask.md)|Use the "Ask Pattern" to send a request-reply message to the target `ref` actor (of the classic actors API).|
|ActorFlow|<a name="ask"></a>@ref[ask](ActorFlow/ask.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply back that will be emitted downstream.|

View file

@ -108,13 +108,15 @@ public class SourceDocExamples {
final ActorSystem system = null;
// #actorRefWithBackpressure
Source<Object, ActorRef> source =
Source.actorRefWithBackpressure(
Source<String, ActorRef> source =
Source.<String>actorRefWithBackpressure(
"ack",
// complete when we send "complete"
o -> {
if (o == "complete") return Optional.of(CompletionStrategy.draining());
else return Optional.empty();
},
// do not fail on any message
o -> Optional.empty());
ActorRef actorRef = source.to(Sink.foreach(System.out::println)).run(system);

View file

@ -67,9 +67,14 @@ object SourceOperators {
val probe = TestProbe()
val source: Source[Any, ActorRef] = Source.actorRefWithBackpressure[Any]("ack", {
val source: Source[String, ActorRef] = Source.actorRefWithBackpressure[String](
ackMessage = "ack",
// complete when we send akka.actor.status.Success
completionMatcher = {
case _: Success => CompletionStrategy.immediately
}, PartialFunction.empty)
},
// do not fail on any message
failureMatcher = PartialFunction.empty)
val actorRef: ActorRef = source.to(Sink.foreach(println)).run()
probe.send(actorRef, "hello")

View file

@ -74,7 +74,10 @@ object ActorSource {
* and a new message will only be accepted after the previous messages has been consumed and acknowledged back.
* The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
*
* The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted
* The stream can be completed by sending a message that is matched by `completionMatcher` which decides
* if the stream is to drained before completion or should complete immediately.
*
* A message that is matched by `failureMatcher` fails the stream. The extracted
* [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received
* a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`,
* the failure will be signaled downstream immediately (instead of the completion signal).

View file

@ -66,13 +66,23 @@ object ActorSource {
* and a new message will only be accepted after the previous messages has been consumed and acknowledged back.
* The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
*
* The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted
* The stream can be completed by sending a message that is matched by `completionMatcher` which decides
* if the stream is to drained before completion or should complete immediately.
*
* A message that is matched by `failureMatcher` fails the stream. The extracted
* [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received
* a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`,
* the failure will be signaled downstream immediately (instead of the completion signal).
*
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
* i.e. you can watch it to get notified when that happens.
*
* @param ackTo actor to be signalled when an element has been emitted to the stream
* @param ackMessage a fixed message to be sent to `ackTo` to signal demand
* @param completionMatcher a partial function applied to the messages received materialized actor,
* a matching message will complete the stream with the return [[CompletionStrategy]]
* @param failureMatcher a partial function applied to the messages received materialized actor,
* a matching message will fail the stream with the returned [[Throwable]]
*/
def actorRefWithBackpressure[T, Ack](
ackTo: ActorRef[Ack],

View file

@ -6,8 +6,8 @@ package docs.akka.stream.typed;
// #actor-source-ref
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.japi.JavaPartialFunction;
import akka.stream.ActorMaterializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
@ -41,9 +41,8 @@ public class ActorSourceExample {
}
// #actor-source-ref
final ActorMaterializer mat = null;
{
final ActorSystem<Void> system = null;
// #actor-source-ref
final Source<Protocol, ActorRef<Protocol>> source =
@ -66,7 +65,7 @@ public class ActorSourceExample {
}
})
.to(Sink.foreach(System.out::println))
.run(mat);
.run(system);
ref.tell(new Message("msg1"));
// ref.tell("msg2"); Does not compile

View file

@ -0,0 +1,120 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.stream.typed;
// #sample
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.stream.CompletionStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSource;
import java.util.Optional;
class StreamFeeder extends AbstractBehavior<StreamFeeder.Emitted> {
/** Signals that the latest element is emitted into the stream */
public enum Emitted {
INSTANCE;
}
public interface Event {}
public static class Element implements Event {
public final String content;
public Element(String content) {
this.content = content;
}
@Override
public String toString() {
return "Element(" + content + ")";
}
}
public enum ReachedEnd implements Event {
INSTANCE;
}
public static class FailureOccured implements Event {
public final Exception ex;
public FailureOccured(Exception ex) {
this.ex = ex;
}
}
public static Behavior<Emitted> create() {
return Behaviors.setup(StreamFeeder::new);
}
private int counter = 0;
private final ActorRef<Event> streamSource;
private StreamFeeder(ActorContext<Emitted> context) {
super(context);
streamSource = runStream(context.getSelf(), context.getSystem());
streamSource.tell(new Element("first"));
}
@Override
public Receive<Emitted> createReceive() {
return newReceiveBuilder().onMessage(Emitted.class, this::onEmitted).build();
}
private static ActorRef<Event> runStream(ActorRef<Emitted> ackReceiver, ActorSystem<?> system) {
Source<Event, ActorRef<Event>> source =
ActorSource.actorRefWithBackpressure(
ackReceiver,
Emitted.INSTANCE,
// complete when we send ReachedEnd
(msg) -> {
if (msg == ReachedEnd.INSTANCE) return Optional.of(CompletionStrategy.draining());
else return Optional.empty();
},
(msg) -> {
if (msg instanceof FailureOccured) return Optional.of(((FailureOccured) msg).ex);
else return Optional.empty();
});
return source.to(Sink.foreach(System.out::println)).run(system);
}
private Behavior<Emitted> onEmitted(Emitted message) {
if (counter < 5) {
streamSource.tell(new Element(String.valueOf(counter)));
counter++;
return this;
} else {
streamSource.tell(ReachedEnd.INSTANCE);
return Behaviors.stopped();
}
}
}
// #sample
public class ActorSourceWithBackpressureExample {
public static void main(String[] args) {
// #sample
ActorSystem<StreamFeeder.Emitted> system =
ActorSystem.create(StreamFeeder.create(), "stream-feeder");
// will print:
// Element(first)
// Element(0)
// Element(1)
// Element(2)
// Element(3)
// Element(4)
// #sample
}
}

View file

@ -5,14 +5,14 @@
package docs.akka.stream.typed
import akka.NotUsed
import akka.actor.typed.{ ActorRef, ActorSystem }
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior }
import akka.actor.typed.scaladsl.Behaviors
object ActorSourceSinkExample {
def compileOnlySourceRef() = {
implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty, "ActorSourceSinkExample")
{
// #actor-source-ref
import akka.actor.typed.ActorRef
import akka.stream.OverflowStrategy
@ -42,7 +42,81 @@ object ActorSourceSinkExample {
// #actor-source-ref
}
{
def main(args: Array[String]): Unit = {
// #actor-source-with-backpressure
import akka.actor.typed.ActorRef
import akka.stream.CompletionStrategy
import akka.stream.scaladsl.Sink
import akka.stream.typed.scaladsl.ActorSource
object StreamFeeder {
/** Signals that the latest element is emitted into the stream */
case object Emitted
sealed trait Event
case class Element(content: String) extends Event
case object ReachedEnd extends Event
case class FailureOccured(ex: Exception) extends Event
def apply(): Behavior[Emitted.type] =
Behaviors.setup { context =>
val streamActor = runStream(context.self)(context.system)
streamActor ! Element("first")
sender(streamActor, 0)
}
private def runStream(ackReceiver: ActorRef[Emitted.type])(implicit system: ActorSystem[_]): ActorRef[Event] = {
val source =
ActorSource.actorRefWithBackpressure[Event, Emitted.type](
// get demand signalled to this actor receiving Ack
ackTo = ackReceiver,
ackMessage = Emitted,
// complete when we send ReachedEnd
completionMatcher = {
case ReachedEnd => CompletionStrategy.draining
},
failureMatcher = {
case FailureOccured(ex) => ex
})
val streamActor: ActorRef[Event] = source
.collect {
case Element(msg) => msg
}
.to(Sink.foreach(println))
.run()
streamActor
}
private def sender(streamSource: ActorRef[Event], counter: Int): Behavior[Emitted.type] =
Behaviors.receiveMessage {
case Emitted if counter < 5 =>
streamSource ! Element(counter.toString)
sender(streamSource, counter + 1)
case _ =>
streamSource ! ReachedEnd
Behaviors.stopped
}
}
ActorSystem(StreamFeeder(), "stream-feeder")
// Will print:
// first
// 0
// 1
// 2
// 3
// 4
// #actor-source-with-backpressure
}
def compileOnlyAcotrRef() = {
implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty, "ActorSourceSinkExample")
def targetActor(): ActorRef[Protocol] = ???
// #actor-sink-ref
@ -65,7 +139,8 @@ object ActorSourceSinkExample {
}
{
def compileOnlySinkWithBackpressure() = {
implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty, "ActorSourceSinkExample")
def targetActor(): ActorRef[Protocol] = ???