diff --git a/akka-docs/src/main/paradox/stream/operators/ActorSource/actorRef.md b/akka-docs/src/main/paradox/stream/operators/ActorSource/actorRef.md index c08e569dd7..6850c33ee9 100644 --- a/akka-docs/src/main/paradox/stream/operators/ActorSource/actorRef.md +++ b/akka-docs/src/main/paradox/stream/operators/ActorSource/actorRef.md @@ -1,6 +1,6 @@ # ActorSource.actorRef -Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream only if they are of the same type as the stream. +Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`] of the new actors API; sending messages to it will emit them on the stream only if they are of the same type as the stream. @ref[Actor interop operators](../index.md#actor-interop-operators) @@ -22,6 +22,13 @@ This operator is included in: Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`] which only accepts messages that are of the same type as the stream. +See also: + +* @ref[ActorSource.actorRefWithBackpressure](actorRefWithBackpressure.md) This operator, but with backpressure control +* @ref[Source.actorRef](../Source/actorRef.md) The corresponding operator for the classic actors API +* @ref[Source.actorRefWithBackpressure](../Source/actorRefWithBackpressure.md) The operator for the classic actors API with backpressure control +* @ref[Source.queue](../Source/queue.md) Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source + ## Examples Scala diff --git a/akka-docs/src/main/paradox/stream/operators/ActorSource/actorRefWithBackpressure.md b/akka-docs/src/main/paradox/stream/operators/ActorSource/actorRefWithBackpressure.md index e6988389ad..cf73e39c10 100644 --- a/akka-docs/src/main/paradox/stream/operators/ActorSource/actorRefWithBackpressure.md +++ b/akka-docs/src/main/paradox/stream/operators/ActorSource/actorRefWithBackpressure.md @@ -1,6 +1,6 @@ # ActorSource.actorRefWithBackpressure -Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source. +Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`] of the new actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source. @ref[Actor interop operators](../index.md#actor-interop-operators) @@ -23,12 +23,36 @@ This operator is included in: Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`], sending messages to it will emit them on the stream. The actor responds with the provided ack message once the element could be emitted allowing for backpressure from the source. Sending another message before the previous one has been acknowledged will fail the stream. +See also: + +* @ref[ActorSource.actorRef](actorRef.md) This operator, but without backpressure control +* @ref[Source.actorRef](../Source/actorRef.md) This operator, but without backpressure control for the classic actors API +* @ref[Source.actorRefWithBackpressure](../Source/actorRefWithBackpressure.md) This operator for the classic actors API +* @ref[Source.queue](../Source/queue.md) Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source + +## Example + +With `actorRefWithBackpressure` two actors get into play: one which feeds the stream and is materialized when the stream runs, and one which is provided by us and gets the ack signal when an element is emitted into the stream. + +For the ack signal we create an @scala[`Emitted` object]@java[empty `Emitted` class]. + +For "feeding" the stream we use the `Event` @scala[trait]@java[interface]. + +In this example we create the stream in an actor which itself reacts on the demand of the stream and sends more messages. + + +Scala +: @@snip [ActorSourceSinkExample.scala](/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala) { #actor-source-with-backpressure } + +Java +: @@snip [snip](/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceWithBackpressureExample.java) { #sample } + ## Reactive Streams semantics @@@div { .callout } -**emits** when there is demand and there are messages in the buffer or a message is sent to the `ActorRef` +**emits** when there is demand and a message is sent to the materialized @scala[`ActorRef[T]`]@java[`ActorRef`] -**completes** when the `ActorRef` is sent `akka.actor.Status.Success` +**completes** when the passed completion matcher returns a `CompletionStrategy` @@@ diff --git a/akka-docs/src/main/paradox/stream/operators/Source/actorRef.md b/akka-docs/src/main/paradox/stream/operators/Source/actorRef.md index b13ce70aaf..7bc95c93b1 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/actorRef.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/actorRef.md @@ -1,8 +1,8 @@ # Source.actorRef -Materialize an `ActorRef`; sending messages to it will emit them on the stream. +Materialize an `ActorRef` of the classic actors API; sending messages to it will emit them on the stream. -@ref[Source operators](../index.md#source-operators) +@ref[Actor interop operators](../index.md#actor-interop-operators) ## Signature @@ -21,6 +21,13 @@ already buffered elements will be sent out before signaling completion. Sending `akka.actor.PoisonPill` will signal completion immediately but this behavior is deprecated and scheduled to be removed. Using `akka.actor.ActorSystem.stop` to stop the actor and complete the stream is *not supported*. +See also: + +* @ref[Source.actorRefWithBackpressure](../Source/actorRefWithBackpressure.md) This operator, but with backpressure control +* @ref[ActorSource.actorRef](actorRef.md) The corresponding operator for the new actors API +* @ref[ActorSource.actorRefWithBackpressure](actorRefWithBackpressure.md) The operator for the new actors API with backpressure control +* @ref[Source.queue](../Source/queue.md) Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source + ## Examples Scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source/actorRefWithBackpressure.md b/akka-docs/src/main/paradox/stream/operators/Source/actorRefWithBackpressure.md index 4806a22fe1..de29352350 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/actorRefWithBackpressure.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/actorRefWithBackpressure.md @@ -1,8 +1,8 @@ # Source.actorRefWithBackpressure -Materialize an `ActorRef`; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source. +Materialize an `ActorRef` of the classic actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source. -@ref[Source operators](../index.md#source-operators) +@ref[Actor interop operators](../index.md#actor-interop-operators) ## Signature @@ -13,6 +13,13 @@ Materialize an `ActorRef`; sending messages to it will emit them on the stream. Materialize an `ActorRef`, sending messages to it will emit them on the stream. The actor responds with the provided ack message once the element could be emitted allowing for backpressure from the source. Sending another message before the previous one has been acknowledged will fail the stream. +See also: + +* @ref[Source.actorRef](../Source/actorRef.md) This operator without backpressure control +* @ref[ActorSource.actorRef](actorRef.md) The operator for the new actors API without backpressure control +* @ref[ActorSource.actorRefWithBackpressure](actorRefWithBackpressure.md) The corresponding operator for the new actors API +* @ref[Source.queue](../Source/queue.md) Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source + ## Examples Scala @@ -27,6 +34,6 @@ Java **emits** when there is demand and there are messages in the buffer or a message is sent to the `ActorRef` -**completes** when the `ActorRef` is sent `akka.actor.Status.Success` +**completes** when the passed completion matcher returns a `CompletionStrategy` or fails if the passed failure matcher returns an exception @@@ diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 48fa47bae3..1e52617b48 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -7,8 +7,6 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] | |Operator|Description| |--|--|--| -|Source|@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream.| -|Source|@ref[actorRefWithBackpressure](Source/actorRefWithBackpressure.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.| |Source|@ref[asSourceWithContext](Source/asSourceWithContext.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.| |Source|@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber).| |Source|@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.| @@ -318,9 +316,11 @@ Operators meant for inter-operating between Akka Streams and Actors: | |Operator|Description| |--|--|--| -|ActorSource|@ref[actorRef](ActorSource/actorRef.md)|Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream only if they are of the same type as the stream.| +|Source|@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef` of the classic actors API; sending messages to it will emit them on the stream.| +|ActorSource|@ref[actorRef](ActorSource/actorRef.md)|Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`] of the new actors API; sending messages to it will emit them on the stream only if they are of the same type as the stream.| |ActorSink|@ref[actorRef](ActorSink/actorRef.md)|Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`], without considering backpressure.| -|ActorSource|@ref[actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md)|Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.| +|Source|@ref[actorRefWithBackpressure](Source/actorRefWithBackpressure.md)|Materialize an `ActorRef` of the classic actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.| +|ActorSource|@ref[actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md)|Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`] of the new actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.| |ActorSink|@ref[actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md)|Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`] with backpressure, to be able to signal demand when the actor is ready to receive more elements.| |Source/Flow|@ref[ask](Source-or-Flow/ask.md)|Use the "Ask Pattern" to send a request-reply message to the target `ref` actor (of the classic actors API).| |ActorFlow|@ref[ask](ActorFlow/ask.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply back that will be emitted downstream.| diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java index 75c076494e..01df7d3a4f 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java @@ -108,13 +108,15 @@ public class SourceDocExamples { final ActorSystem system = null; // #actorRefWithBackpressure - Source source = - Source.actorRefWithBackpressure( + Source source = + Source.actorRefWithBackpressure( "ack", + // complete when we send "complete" o -> { if (o == "complete") return Optional.of(CompletionStrategy.draining()); else return Optional.empty(); }, + // do not fail on any message o -> Optional.empty()); ActorRef actorRef = source.to(Sink.foreach(System.out::println)).run(system); diff --git a/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala b/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala index b54e9a12cb..312ca2ddd3 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala @@ -67,9 +67,14 @@ object SourceOperators { val probe = TestProbe() - val source: Source[Any, ActorRef] = Source.actorRefWithBackpressure[Any]("ack", { - case _: Success => CompletionStrategy.immediately - }, PartialFunction.empty) + val source: Source[String, ActorRef] = Source.actorRefWithBackpressure[String]( + ackMessage = "ack", + // complete when we send akka.actor.status.Success + completionMatcher = { + case _: Success => CompletionStrategy.immediately + }, + // do not fail on any message + failureMatcher = PartialFunction.empty) val actorRef: ActorRef = source.to(Sink.foreach(println)).run() probe.send(actorRef, "hello") diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala index b5925cd0cb..2bf66e2d59 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala @@ -74,7 +74,10 @@ object ActorSource { * and a new message will only be accepted after the previous messages has been consumed and acknowledged back. * The stream will complete with failure if a message is sent before the acknowledgement has been replied back. * - * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted + * The stream can be completed by sending a message that is matched by `completionMatcher` which decides + * if the stream is to drained before completion or should complete immediately. + * + * A message that is matched by `failureMatcher` fails the stream. The extracted * [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`, * the failure will be signaled downstream immediately (instead of the completion signal). diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala index 60df2faa95..09df61f03d 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala @@ -66,13 +66,23 @@ object ActorSource { * and a new message will only be accepted after the previous messages has been consumed and acknowledged back. * The stream will complete with failure if a message is sent before the acknowledgement has been replied back. * - * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted + * The stream can be completed by sending a message that is matched by `completionMatcher` which decides + * if the stream is to drained before completion or should complete immediately. + * + * A message that is matched by `failureMatcher` fails the stream. The extracted * [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`, * the failure will be signaled downstream immediately (instead of the completion signal). * * The actor will be stopped when the stream is completed, failed or canceled from downstream, * i.e. you can watch it to get notified when that happens. + * + * @param ackTo actor to be signalled when an element has been emitted to the stream + * @param ackMessage a fixed message to be sent to `ackTo` to signal demand + * @param completionMatcher a partial function applied to the messages received materialized actor, + * a matching message will complete the stream with the return [[CompletionStrategy]] + * @param failureMatcher a partial function applied to the messages received materialized actor, + * a matching message will fail the stream with the returned [[Throwable]] */ def actorRefWithBackpressure[T, Ack]( ackTo: ActorRef[Ack], diff --git a/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceExample.java b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceExample.java index fb9b87e709..2d18e3261b 100644 --- a/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceExample.java +++ b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceExample.java @@ -6,8 +6,8 @@ package docs.akka.stream.typed; // #actor-source-ref import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; import akka.japi.JavaPartialFunction; -import akka.stream.ActorMaterializer; import akka.stream.OverflowStrategy; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; @@ -41,9 +41,8 @@ public class ActorSourceExample { } // #actor-source-ref - final ActorMaterializer mat = null; - { + final ActorSystem system = null; // #actor-source-ref final Source> source = @@ -66,7 +65,7 @@ public class ActorSourceExample { } }) .to(Sink.foreach(System.out::println)) - .run(mat); + .run(system); ref.tell(new Message("msg1")); // ref.tell("msg2"); Does not compile diff --git a/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceWithBackpressureExample.java b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceWithBackpressureExample.java new file mode 100644 index 0000000000..f5aec29929 --- /dev/null +++ b/akka-stream-typed/src/test/java/docs/akka/stream/typed/ActorSourceWithBackpressureExample.java @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2018-2020 Lightbend Inc. + */ + +package docs.akka.stream.typed; + +// #sample +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; +import akka.stream.CompletionStrategy; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.stream.typed.javadsl.ActorSource; + +import java.util.Optional; + +class StreamFeeder extends AbstractBehavior { + /** Signals that the latest element is emitted into the stream */ + public enum Emitted { + INSTANCE; + } + + public interface Event {} + + public static class Element implements Event { + public final String content; + + public Element(String content) { + this.content = content; + } + + @Override + public String toString() { + return "Element(" + content + ")"; + } + } + + public enum ReachedEnd implements Event { + INSTANCE; + } + + public static class FailureOccured implements Event { + public final Exception ex; + + public FailureOccured(Exception ex) { + this.ex = ex; + } + } + + public static Behavior create() { + return Behaviors.setup(StreamFeeder::new); + } + + private int counter = 0; + private final ActorRef streamSource; + + private StreamFeeder(ActorContext context) { + super(context); + streamSource = runStream(context.getSelf(), context.getSystem()); + streamSource.tell(new Element("first")); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder().onMessage(Emitted.class, this::onEmitted).build(); + } + + private static ActorRef runStream(ActorRef ackReceiver, ActorSystem system) { + Source> source = + ActorSource.actorRefWithBackpressure( + ackReceiver, + Emitted.INSTANCE, + // complete when we send ReachedEnd + (msg) -> { + if (msg == ReachedEnd.INSTANCE) return Optional.of(CompletionStrategy.draining()); + else return Optional.empty(); + }, + (msg) -> { + if (msg instanceof FailureOccured) return Optional.of(((FailureOccured) msg).ex); + else return Optional.empty(); + }); + + return source.to(Sink.foreach(System.out::println)).run(system); + } + + private Behavior onEmitted(Emitted message) { + if (counter < 5) { + streamSource.tell(new Element(String.valueOf(counter))); + counter++; + return this; + } else { + streamSource.tell(ReachedEnd.INSTANCE); + return Behaviors.stopped(); + } + } +} +// #sample + +public class ActorSourceWithBackpressureExample { + + public static void main(String[] args) { + // #sample + ActorSystem system = + ActorSystem.create(StreamFeeder.create(), "stream-feeder"); + + // will print: + // Element(first) + // Element(0) + // Element(1) + // Element(2) + // Element(3) + // Element(4) + // #sample + } +} diff --git a/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala b/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala index fcfeb39637..0294309cb3 100644 --- a/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala +++ b/akka-stream-typed/src/test/scala/docs/akka/stream/typed/ActorSourceSinkExample.scala @@ -5,14 +5,14 @@ package docs.akka.stream.typed import akka.NotUsed -import akka.actor.typed.{ ActorRef, ActorSystem } +import akka.actor.typed.{ ActorRef, ActorSystem, Behavior } import akka.actor.typed.scaladsl.Behaviors object ActorSourceSinkExample { - implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty, "ActorSourceSinkExample") + def compileOnlySourceRef() = { + implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty, "ActorSourceSinkExample") - { // #actor-source-ref import akka.actor.typed.ActorRef import akka.stream.OverflowStrategy @@ -42,7 +42,81 @@ object ActorSourceSinkExample { // #actor-source-ref } - { + def main(args: Array[String]): Unit = { + + // #actor-source-with-backpressure + import akka.actor.typed.ActorRef + import akka.stream.CompletionStrategy + import akka.stream.scaladsl.Sink + import akka.stream.typed.scaladsl.ActorSource + + object StreamFeeder { + + /** Signals that the latest element is emitted into the stream */ + case object Emitted + + sealed trait Event + case class Element(content: String) extends Event + case object ReachedEnd extends Event + case class FailureOccured(ex: Exception) extends Event + + def apply(): Behavior[Emitted.type] = + Behaviors.setup { context => + val streamActor = runStream(context.self)(context.system) + streamActor ! Element("first") + sender(streamActor, 0) + } + + private def runStream(ackReceiver: ActorRef[Emitted.type])(implicit system: ActorSystem[_]): ActorRef[Event] = { + val source = + ActorSource.actorRefWithBackpressure[Event, Emitted.type]( + // get demand signalled to this actor receiving Ack + ackTo = ackReceiver, + ackMessage = Emitted, + // complete when we send ReachedEnd + completionMatcher = { + case ReachedEnd => CompletionStrategy.draining + }, + failureMatcher = { + case FailureOccured(ex) => ex + }) + + val streamActor: ActorRef[Event] = source + .collect { + case Element(msg) => msg + } + .to(Sink.foreach(println)) + .run() + + streamActor + } + + private def sender(streamSource: ActorRef[Event], counter: Int): Behavior[Emitted.type] = + Behaviors.receiveMessage { + case Emitted if counter < 5 => + streamSource ! Element(counter.toString) + sender(streamSource, counter + 1) + case _ => + streamSource ! ReachedEnd + Behaviors.stopped + } + } + + ActorSystem(StreamFeeder(), "stream-feeder") + + // Will print: + // first + // 0 + // 1 + // 2 + // 3 + // 4 + // #actor-source-with-backpressure + } + + def compileOnlyAcotrRef() = { + implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty, "ActorSourceSinkExample") + def targetActor(): ActorRef[Protocol] = ??? // #actor-sink-ref @@ -65,7 +139,8 @@ object ActorSourceSinkExample { } - { + def compileOnlySinkWithBackpressure() = { + implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty, "ActorSourceSinkExample") def targetActor(): ActorRef[Protocol] = ???