improve streams actor integration docs, #21698 (#21700)

* mapAsync + ask should be the first choice
* add missing Source.queue
* prefer actorRefWithAck
* move ActorPublisher and ActorSubscriber to the end
  with additional warning
* fix wrong doc of SourceQueue offer
* and add missing java api
This commit is contained in:
Patrik Nordwall 2016-10-26 10:24:51 +02:00 committed by GitHub
parent bcf4de5b2c
commit aa8c253d14
9 changed files with 457 additions and 232 deletions

View file

@ -11,6 +11,7 @@ import akka.stream.*;
import akka.stream.javadsl.*;
import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import akka.util.Timeout;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@ -20,6 +21,8 @@ import docs.stream.TwitterStreamQuickstartDocTest.Model.Tweet;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.Option;
import scala.PartialFunction;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
@ -27,6 +30,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static akka.pattern.PatternsCS.ask;
import static docs.stream.TwitterStreamQuickstartDocTest.Model.AKKA;
@ -39,6 +43,7 @@ public class IntegrationDocTest extends AbstractJavaTest {
static ActorSystem system;
static Materializer mat;
static ActorRef ref;
@BeforeClass
public static void setup() {
@ -54,6 +59,7 @@ public class IntegrationDocTest extends AbstractJavaTest {
system = ActorSystem.create("ActorPublisherDocTest", config);
mat = ActorMaterializer.create(system);
ref = system.actorOf(Props.create(Translator.class));
}
@AfterClass
@ -61,6 +67,7 @@ public class IntegrationDocTest extends AbstractJavaTest {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
ref = null;
}
@ -281,6 +288,40 @@ public class IntegrationDocTest extends AbstractJavaTest {
}
}
//#sometimes-slow-service
//#ask-actor
static class Translator extends UntypedActor {
@Override
public void onReceive(Object message) {
if (message instanceof String) {
String word = (String) message;
// ... process message
String reply = word.toUpperCase();
// reply to the ask
getSender().tell(reply, getSelf());
} else {
unhandled(message);
}
}
}
//#ask-actor
@SuppressWarnings("unchecked")
@Test
public void mapAsyncPlusAsk() throws Exception {
//#mapAsync-ask
Source<String, NotUsed> words =
Source.from(Arrays.asList("hello", "hi"));
Timeout askTimeout = Timeout.apply(5, TimeUnit.SECONDS);
words
.mapAsync(5, elem -> ask(ref, elem, askTimeout))
.map(elem -> (String) elem)
// continue processing of the replies from the actor
.map(elem -> elem.toLowerCase())
.runWith(Sink.ignore(), mat);
//#mapAsync-ask
}
@Test

View file

@ -7,27 +7,88 @@ Integration
Integrating with Actors
=======================
For piping the elements of a stream as messages to an ordinary actor you can use the
``Sink.actorRef``. Messages can be sent to a stream via the :class:`ActorRef` that is
For piping the elements of a stream as messages to an ordinary actor you can use
``ask`` in a ``mapAsync`` or use ``Sink.actorRefWithAck``.
Messages can be sent to a stream with ``Source.queue`` or via the :class:`ActorRef` that is
materialized by ``Source.actorRef``.
For more advanced use cases the :class:`ActorPublisher` and :class:`ActorSubscriber` traits are
provided to support implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with
an :class:`Actor`.
mapAsync + ask
^^^^^^^^^^^^^^
These can be consumed by other Reactive Stream libraries or used as an
Akka Streams :class:`Source` or :class:`Sink`.
A nice way to delegate some processing of elements in a stream to an actor is to
use ``ask`` in ``mapAsync``. The back-pressure of the stream is maintained by
the ``CompletionStage`` of the ``ask`` and the mailbox of the actor will not be filled with
more messages than the given ``parallelism`` of the ``mapAsync`` stage.
.. warning::
.. includecode:: ../code/docs/stream/IntegrationDocTest.java#mapAsync-ask
:class:`AbstractActorPublisher` and :class:`AbstractActorSubscriber` cannot be used with remote actors,
because if signals of the Reactive Streams protocol (e.g. ``request``) are lost the
the stream may deadlock.
Note that the messages received in the actor will be in the same order as
the stream elements, i.e. the ``parallelism`` does not change the ordering
of the messages. There is a performance advantage of using parallelism > 1
even though the actor will only process one message at a time because then there
is already a message in the mailbox when the actor has completed previous
message.
The actor must reply to the ``getSender()`` for each message from the stream. That
reply will complete the ``CompletionStage`` of the ``ask`` and it will be the element that
is emitted downstreams from ``mapAsync``.
.. includecode:: ../code/docs/stream/IntegrationDocTest.java#ask-actor
The stream can be completed with failure by sending ``akka.actor.Status.Failure``
as reply from the actor.
If the ``ask`` fails due to timeout the stream will be completed with
``TimeoutException`` failure. If that is not desired outcome you can use ``recover``
on the ``ask`` :class:`CompletionStage`.
If you don't care about the reply values and only use them as back-pressure signals you
can use ``Sink.ignore`` after the ``mapAsync`` stage and then actor is effectively a sink
of the stream.
The same pattern can be used with :ref:`Actor routers <routing-java>`. Then you
can use ``mapAsyncUnordered`` for better efficiency if you don't care about the
order of the emitted downstream elements (the replies).
Sink.actorRefWithAck
^^^^^^^^^^^^^^^^^^^^
The sink sends the elements of the stream to the given :class:`ActorRef` that sends back back-pressure signal.
First element is always `onInitMessage`, then stream is waiting for the given acknowledgement message
from the given actor which means that it is ready to process elements. It also requires the given acknowledgement
message after each stream element to make back-pressure work.
If the target actor terminates the stream will be cancelled. When the stream is completed successfully the
given ``onCompleteMessage`` will be sent to the destination actor. When the stream is completed with
failure a ``akka.actor.Status.Failure`` message will be sent to the destination actor.
.. note::
These Actors are designed to be implemented using Java 8 lambda expressions. In case you need to stay on a JVM
prior to 8, Akka provides :class:`UntypedActorPublisher` and :class:`UntypedActorSubscriber` which can be used
easily from any language level.
Using ``Sink.actorRef`` or ordinary ``tell`` from a ``map`` or ``foreach`` stage means that there is
no back-pressure signal from the destination actor, i.e. if the actor is not consuming the messages
fast enough the mailbox of the actor will grow, unless you use a bounded mailbox with zero
`mailbox-push-timeout-time` or use a rate limiting stage in front. It's often better to
use ``Sink.actorRefWithAck`` or ``ask`` in ``mapAsync``, though.
Source.queue
^^^^^^^^^^^^
``Source.queue`` can be used for emitting elements to a stream from an actor (or from anything running outside
the stream). The elements will be buffered until the stream can process them. You can ``offer`` elements to
the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will
be buffered until request for demand is received.
Use overflow strategy ``akka.stream.OverflowStrategy.backpressure`` to avoid dropping of elements if the
buffer is full.
``SourceQueue.offer`` returns ``CompletionStage<QueueOfferResult>`` which completes with
``QueueOfferResult.enqueued`` if element was added to buffer or sent downstream. It completes with
``QueueOfferResult.dropped`` if element was dropped. Can also complete with ``QueueOfferResult.Failure`` -
when stream failed or ``QueueOfferResult.QueueClosed`` when downstream is completed.
When used from an actor you typically ``pipe`` the result of the ``CompletionStage`` back to the actor to
continue processing.
Source.actorRef
^^^^^^^^^^^^^^^
@ -37,9 +98,10 @@ stream if there is demand from downstream, otherwise they will be buffered until
demand is received.
Depending on the defined :class:`OverflowStrategy` it might drop elements if there is no space
available in the buffer. The strategy ``OverflowStrategy.backpressure()`` is not supported
for this Source type, you should consider using ``ActorPublisher`` if you want a backpressured
actor interface.
available in the buffer. The strategy ``OverflowStrategy.backpressure`` is not supported
for this Source type, i.e. elements will be dropped if the buffer is filled by sending
at a rate that is faster than the stream can consume. You should consider using ``Source.queue``
if you want a backpressured actor interface.
The stream can be completed successfully by sending ``akka.actor.PoisonPill`` or
``akka.actor.Status.Success`` to the actor reference.
@ -50,96 +112,6 @@ actor reference.
The actor will be stopped when the stream is completed, failed or cancelled from downstream,
i.e. you can watch it to get notified when that happens.
Sink.actorRef
^^^^^^^^^^^^^
The sink sends the elements of the stream to the given `ActorRef`. If the target actor terminates
the stream will be cancelled. When the stream is completed successfully the given ``onCompleteMessage``
will be sent to the destination actor. When the stream is completed with failure a ``akka.actor.Status.Failure``
message will be sent to the destination actor.
.. warning::
There is no back-pressure signal from the destination actor, i.e. if the actor is not consuming
the messages fast enough the mailbox of the actor will grow. For potentially slow consumer actors
it is recommended to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
limiting stage in front of this stage.
ActorPublisher
^^^^^^^^^^^^^^
Extend :class:`akka.stream.actor.AbstractActorPublisher` to implement a
stream publisher that keeps track of the subscription life cycle and requested elements.
Here is an example of such an actor. It dispatches incoming jobs to the attached subscriber:
.. includecode:: ../code/docs/stream/ActorPublisherDocTest.java#job-manager
You send elements to the stream by calling ``onNext``. You are allowed to send as many
elements as have been requested by the stream subscriber. This amount can be inquired with
``totalDemand``. It is only allowed to use ``onNext`` when ``isActive`` and ``totalDemand>0``,
otherwise ``onNext`` will throw ``IllegalStateException``.
When the stream subscriber requests more elements the ``ActorPublisherMessage.Request`` message
is delivered to this actor, and you can act on that event. The ``totalDemand``
is updated automatically.
When the stream subscriber cancels the subscription the ``ActorPublisherMessage.Cancel`` message
is delivered to this actor. After that subsequent calls to ``onNext`` will be ignored.
You can complete the stream by calling ``onComplete``. After that you are not allowed to
call ``onNext``, ``onError`` and ``onComplete``.
You can terminate the stream with failure by calling ``onError``. After that you are not allowed to
call ``onNext``, ``onError`` and ``onComplete``.
If you suspect that this ``AbstractActorPublisher`` may never get subscribed to, you can override the ``subscriptionTimeout``
method to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when
the timeout triggers via an ``ActorPublisherMessage.SubscriptionTimeoutExceeded`` message and MUST then perform
cleanup and stop itself.
If the actor is stopped the stream will be completed, unless it was not already terminated with
failure, completed or canceled.
More detailed information can be found in the API documentation.
This is how it can be used as input :class:`Source` to a :class:`Flow`:
.. includecode:: ../code/docs/stream/ActorPublisherDocTest.java#actor-publisher-usage
You can only attach one subscriber to this publisher. Use a ``Broadcast``-element or
attach a ``Sink.asPublisher(AsPublisher.WITH_FANOUT)`` to enable multiple subscribers.
ActorSubscriber
^^^^^^^^^^^^^^^
Extend :class:`akka.stream.actor.AbstractActorSubscriber` to make your class a stream subscriber with
full control of stream back pressure. It will receive
``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError``
messages from the stream. It can also receive other, non-stream messages, in the same way as any actor.
Here is an example of such an actor. It dispatches incoming jobs to child worker actors:
.. includecode:: ../code/docs/stream/ActorSubscriberDocTest.java#worker-pool
Subclass must define the ``RequestStrategy`` to control stream back pressure.
After each incoming message the ``AbstractActorSubscriber`` will automatically invoke
the ``RequestStrategy.requestDemand`` and propagate the returned demand to the stream.
* The provided ``WatermarkRequestStrategy`` is a good strategy if the actor performs work itself.
* The provided ``MaxInFlightRequestStrategy`` is useful if messages are queued internally or
delegated to other actors.
* You can also implement a custom ``RequestStrategy`` or call ``request`` manually together with
``ZeroRequestStrategy`` or some other strategy. In that case
you must also call ``request`` when the actor is started or when it is ready, otherwise
it will not receive any elements.
More detailed information can be found in the API documentation.
This is how it can be used as output :class:`Sink` to a :class:`Flow`:
.. includecode:: ../code/docs/stream/ActorSubscriberDocTest.java#actor-subscriber-usage
Integrating with External Services
==================================
@ -438,3 +410,101 @@ passing a factory function that will create the :class:`Processor` instances:
.. includecode:: ../code/docs/stream/ReactiveStreamsDocTest.java#use-processor
Please note that a factory is necessary to achieve reusability of the resulting :class:`Flow`.
Implementing Reactive Streams Publisher or Subscriber
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
As described above any Akka Streams ``Source`` can be exposed as a Reactive Streams ``Publisher`` and
any ``Sink`` can be exposed as a Reactive Streams ``Subscriber``. Therefore we recommend that you
implement Reactive Streams integrations with built-in stages or :ref:`custom stages <stream-customize-java>`.
For historical reasons the :class:`ActorPublisher` and :class:`ActorSubscriber` traits are
provided to support implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with
an :class:`Actor`.
These can be consumed by other Reactive Stream libraries or used as an Akka Streams :class:`Source` or :class:`Sink`.
.. warning::
:class:`ActorPublisher` and :class:`ActorSubscriber` will probably be deprecated in future versions of Akka.
.. warning::
:class:`ActorPublisher` and :class:`ActorSubscriber` cannot be used with remote actors,
because if signals of the Reactive Streams protocol (e.g. ``request``) are lost the
the stream may deadlock.
ActorPublisher
--------------
Extend :class:`akka.stream.actor.AbstractActorPublisher` to implement a
stream publisher that keeps track of the subscription life cycle and requested elements.
Here is an example of such an actor. It dispatches incoming jobs to the attached subscriber:
.. includecode:: ../code/docs/stream/ActorPublisherDocTest.java#job-manager
You send elements to the stream by calling ``onNext``. You are allowed to send as many
elements as have been requested by the stream subscriber. This amount can be inquired with
``totalDemand``. It is only allowed to use ``onNext`` when ``isActive`` and ``totalDemand>0``,
otherwise ``onNext`` will throw ``IllegalStateException``.
When the stream subscriber requests more elements the ``ActorPublisherMessage.Request`` message
is delivered to this actor, and you can act on that event. The ``totalDemand``
is updated automatically.
When the stream subscriber cancels the subscription the ``ActorPublisherMessage.Cancel`` message
is delivered to this actor. After that subsequent calls to ``onNext`` will be ignored.
You can complete the stream by calling ``onComplete``. After that you are not allowed to
call ``onNext``, ``onError`` and ``onComplete``.
You can terminate the stream with failure by calling ``onError``. After that you are not allowed to
call ``onNext``, ``onError`` and ``onComplete``.
If you suspect that this ``AbstractActorPublisher`` may never get subscribed to, you can override the ``subscriptionTimeout``
method to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when
the timeout triggers via an ``ActorPublisherMessage.SubscriptionTimeoutExceeded`` message and MUST then perform
cleanup and stop itself.
If the actor is stopped the stream will be completed, unless it was not already terminated with
failure, completed or canceled.
More detailed information can be found in the API documentation.
This is how it can be used as input :class:`Source` to a :class:`Flow`:
.. includecode:: ../code/docs/stream/ActorPublisherDocTest.java#actor-publisher-usage
You can only attach one subscriber to this publisher. Use a ``Broadcast``-element or
attach a ``Sink.asPublisher(AsPublisher.WITH_FANOUT)`` to enable multiple subscribers.
ActorSubscriber
---------------
Extend :class:`akka.stream.actor.AbstractActorSubscriber` to make your class a stream subscriber with
full control of stream back pressure. It will receive
``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError``
messages from the stream. It can also receive other, non-stream messages, in the same way as any actor.
Here is an example of such an actor. It dispatches incoming jobs to child worker actors:
.. includecode:: ../code/docs/stream/ActorSubscriberDocTest.java#worker-pool
Subclass must define the ``RequestStrategy`` to control stream back pressure.
After each incoming message the ``AbstractActorSubscriber`` will automatically invoke
the ``RequestStrategy.requestDemand`` and propagate the returned demand to the stream.
* The provided ``WatermarkRequestStrategy`` is a good strategy if the actor performs work itself.
* The provided ``MaxInFlightRequestStrategy`` is useful if messages are queued internally or
delegated to other actors.
* You can also implement a custom ``RequestStrategy`` or call ``request`` manually together with
``ZeroRequestStrategy`` or some other strategy. In that case
you must also call ``request`` when the actor is started or when it is ready, otherwise
it will not receive any elements.
More detailed information can be found in the API documentation.
This is how it can be used as output :class:`Sink` to a :class:`Flow`:
.. includecode:: ../code/docs/stream/ActorSubscriberDocTest.java#actor-subscriber-usage

View file

@ -15,7 +15,6 @@ import akka.actor.ActorRef
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.Props
import akka.pattern.ask
import akka.util.Timeout
import akka.stream.Attributes
import akka.stream.ActorAttributes
@ -24,6 +23,7 @@ import akka.stream.ActorMaterializerSettings
import java.util.concurrent.atomic.AtomicInteger
import akka.stream.Supervision
import akka.stream.scaladsl.Flow
import akka.Done
object IntegrationDocSpec {
import TwitterStreamQuickstartDocSpec._
@ -120,6 +120,17 @@ object IntegrationDocSpec {
}
//#sometimes-slow-service
//#ask-actor
class Translator extends Actor {
def receive = {
case word: String =>
// ... process message
val reply = word.toUpperCase
sender() ! reply // reply to the ask
}
}
//#ask-actor
}
class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
@ -127,6 +138,22 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
import IntegrationDocSpec._
implicit val materializer = ActorMaterializer()
val ref: ActorRef = system.actorOf(Props[Translator])
"mapAsync + ask" in {
//#mapAsync-ask
import akka.pattern.ask
implicit val askTimeout = Timeout(5.seconds)
val words: Source[String, NotUsed] =
Source(List("hello", "hi"))
words
.mapAsync(parallelism = 5)(elem => (ref ? elem).mapTo[String])
// continue processing of the replies from the actor
.map(_.toLowerCase)
.runWith(Sink.ignore)
//#mapAsync-ask
}
"calling external service with mapAsync" in {
val probe = TestProbe()
@ -136,7 +163,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
//#tweet-authors
val authors: Source[Author, NotUsed] =
tweets
.filter(_.hashtags.contains(akka))
.filter(_.hashtags.contains(akkaTag))
.map(_.author)
//#tweet-authors
@ -171,7 +198,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
"lookup email with mapAsync and supervision" in {
val addressSystem = new AddressSystem2
val authors: Source[Author, NotUsed] =
tweets.filter(_.hashtags.contains(akka)).map(_.author)
tweets.filter(_.hashtags.contains(akkaTag)).map(_.author)
//#email-addresses-mapAsync-supervision
import ActorAttributes.supervisionStrategy
@ -191,7 +218,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
//#external-service-mapAsyncUnordered
val authors: Source[Author, NotUsed] =
tweets.filter(_.hashtags.contains(akka)).map(_.author)
tweets.filter(_.hashtags.contains(akkaTag)).map(_.author)
val emailAddresses: Source[String, NotUsed] =
authors
@ -224,7 +251,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val addressSystem = new AddressSystem
val smsServer = new SmsServer(probe.ref)
val authors = tweets.filter(_.hashtags.contains(akka)).map(_.author)
val authors = tweets.filter(_.hashtags.contains(akkaTag)).map(_.author)
val phoneNumbers =
authors.mapAsync(4)(author => addressSystem.lookupPhoneNumber(author.handle))
@ -261,7 +288,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val addressSystem = new AddressSystem
val smsServer = new SmsServer(probe.ref)
val authors = tweets.filter(_.hashtags.contains(akka)).map(_.author)
val authors = tweets.filter(_.hashtags.contains(akkaTag)).map(_.author)
val phoneNumbers =
authors.mapAsync(4)(author => addressSystem.lookupPhoneNumber(author.handle))
@ -293,7 +320,9 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val database = system.actorOf(Props(classOf[DatabaseService], probe.ref), "db")
//#save-tweets
val akkaTweets: Source[Tweet, NotUsed] = tweets.filter(_.hashtags.contains(akka))
import akka.pattern.ask
val akkaTweets: Source[Tweet, NotUsed] = tweets.filter(_.hashtags.contains(akkaTag))
implicit val timeout = Timeout(3.seconds)
val saveTweets: RunnableGraph[NotUsed] =

View file

@ -23,7 +23,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec {
trait Fixture {
//#authors
val authors = Flow[Tweet]
.filter(_.hashtags.contains(akka))
.filter(_.hashtags.contains(akkaTag))
.map(_.author)
//#authors

View file

@ -28,7 +28,7 @@ object TwitterStreamQuickstartDocSpec {
body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
}
val akka = Hashtag("#akka")
val akkaTag = Hashtag("#akka")
//#model
abstract class TweetSourceDecl {
@ -76,7 +76,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
//#authors-filter-map
val authors: Source[Author, NotUsed] =
tweets
.filter(_.hashtags.contains(akka))
.filter(_.hashtags.contains(akkaTag))
.map(_.author)
//#first-sample
//#authors-filter-map
@ -84,7 +84,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
trait Example3 {
//#authors-collect
val authors: Source[Author, NotUsed] =
tweets.collect { case t if t.hashtags.contains(akka) => t.author }
tweets.collect { case t if t.hashtags.contains(akkaTag) => t.author }
//#authors-collect
}
@ -124,7 +124,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
val bcast = b.add(Broadcast[Tweet](2))
tweets ~> bcast.in
bcast.out(0) ~> Flow[Tweet].map(_.author) ~> writeAuthors
bcast.out(0) ~> Flow[Tweet].map(_.author) ~> writeAuthors
bcast.out(1) ~> Flow[Tweet].mapConcat(_.hashtags.toList) ~> writeHashtags
ClosedShape
})
@ -192,7 +192,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
val sumSink = Sink.fold[Int, Int](0)(_ + _)
val counterRunnableGraph: RunnableGraph[Future[Int]] =
tweetsInMinuteFromNow
.filter(_.hashtags contains akka)
.filter(_.hashtags contains akkaTag)
.map(t => 1)
.toMat(sumSink)(Keep.right)

View file

@ -7,22 +7,88 @@ Integration
Integrating with Actors
=======================
For piping the elements of a stream as messages to an ordinary actor you can use the
``Sink.actorRef``. Messages can be sent to a stream via the :class:`ActorRef` that is
For piping the elements of a stream as messages to an ordinary actor you can use
``ask`` in a ``mapAsync`` or use ``Sink.actorRefWithAck``.
Messages can be sent to a stream with ``Source.queue`` or via the :class:`ActorRef` that is
materialized by ``Source.actorRef``.
For more advanced use cases the :class:`ActorPublisher` and :class:`ActorSubscriber` traits are
provided to support implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with
an :class:`Actor`.
mapAsync + ask
^^^^^^^^^^^^^^
These can be consumed by other Reactive Stream libraries or used as an
Akka Streams :class:`Source` or :class:`Sink`.
A nice way to delegate some processing of elements in a stream to an actor is to
use ``ask`` in ``mapAsync``. The back-pressure of the stream is maintained by
the ``Future`` of the ``ask`` and the mailbox of the actor will not be filled with
more messages than the given ``parallelism`` of the ``mapAsync`` stage.
.. warning::
.. includecode:: ../code/docs/stream/IntegrationDocSpec.scala#mapAsync-ask
:class:`ActorPublisher` and :class:`ActorSubscriber` cannot be used with remote actors,
because if signals of the Reactive Streams protocol (e.g. ``request``) are lost the
the stream may deadlock.
Note that the messages received in the actor will be in the same order as
the stream elements, i.e. the ``parallelism`` does not change the ordering
of the messages. There is a performance advantage of using parallelism > 1
even though the actor will only process one message at a time because then there
is already a message in the mailbox when the actor has completed previous
message.
The actor must reply to the ``sender()`` for each message from the stream. That
reply will complete the ``Future`` of the ``ask`` and it will be the element that
is emitted downstreams from ``mapAsync``.
.. includecode:: ../code/docs/stream/IntegrationDocSpec.scala#ask-actor
The stream can be completed with failure by sending ``akka.actor.Status.Failure``
as reply from the actor.
If the ``ask`` fails due to timeout the stream will be completed with
``TimeoutException`` failure. If that is not desired outcome you can use ``recover``
on the ``ask`` :class:`Future`.
If you don't care about the reply values and only use them as back-pressure signals you
can use ``Sink.ignore`` after the ``mapAsync`` stage and then actor is effectively a sink
of the stream.
The same pattern can be used with :ref:`Actor routers <routing-scala>`. Then you
can use ``mapAsyncUnordered`` for better efficiency if you don't care about the
order of the emitted downstream elements (the replies).
Sink.actorRefWithAck
^^^^^^^^^^^^^^^^^^^^
The sink sends the elements of the stream to the given :class:`ActorRef` that sends back back-pressure signal.
First element is always `onInitMessage`, then stream is waiting for the given acknowledgement message
from the given actor which means that it is ready to process elements. It also requires the given acknowledgement
message after each stream element to make back-pressure work.
If the target actor terminates the stream will be cancelled. When the stream is completed successfully the
given ``onCompleteMessage`` will be sent to the destination actor. When the stream is completed with
failure a ``akka.actor.Status.Failure`` message will be sent to the destination actor.
.. note::
Using ``Sink.actorRef`` or ordinary ``tell`` from a ``map`` or ``foreach`` stage means that there is
no back-pressure signal from the destination actor, i.e. if the actor is not consuming the messages
fast enough the mailbox of the actor will grow, unless you use a bounded mailbox with zero
`mailbox-push-timeout-time` or use a rate limiting stage in front. It's often better to
use ``Sink.actorRefWithAck`` or ``ask`` in ``mapAsync``, though.
Source.queue
^^^^^^^^^^^^
``Source.queue`` can be used for emitting elements to a stream from an actor (or from anything running outside
the stream). The elements will be buffered until the stream can process them. You can ``offer`` elements to
the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will
be buffered until request for demand is received.
Use overflow strategy ``akka.stream.OverflowStrategy.backpressure`` to avoid dropping of elements if the
buffer is full.
``SourceQueue.offer`` returns ``Future[QueueOfferResult]`` which completes with ``QueueOfferResult.Enqueued``
if element was added to buffer or sent downstream. It completes with ``QueueOfferResult.Dropped`` if element
was dropped. Can also complete with ``QueueOfferResult.Failure`` - when stream failed or
``QueueOfferResult.QueueClosed`` when downstream is completed.
When used from an actor you typically ``pipe`` the result of the ``Future`` back to the actor to
continue processing.
Source.actorRef
^^^^^^^^^^^^^^^
@ -33,8 +99,9 @@ demand is received.
Depending on the defined :class:`OverflowStrategy` it might drop elements if there is no space
available in the buffer. The strategy ``OverflowStrategy.backpressure`` is not supported
for this Source type, you should consider using ``ActorPublisher`` if you want a backpressured
actor interface.
for this Source type, i.e. elements will be dropped if the buffer is filled by sending
at a rate that is faster than the stream can consume. You should consider using ``Source.queue``
if you want a backpressured actor interface.
The stream can be completed successfully by sending ``akka.actor.PoisonPill`` or
``akka.actor.Status.Success`` to the actor reference.
@ -45,96 +112,6 @@ actor reference.
The actor will be stopped when the stream is completed, failed or cancelled from downstream,
i.e. you can watch it to get notified when that happens.
Sink.actorRef
^^^^^^^^^^^^^
The sink sends the elements of the stream to the given :class:`ActorRef`. If the target actor terminates
the stream will be cancelled. When the stream is completed successfully the given ``onCompleteMessage``
will be sent to the destination actor. When the stream is completed with failure a ``akka.actor.Status.Failure``
message will be sent to the destination actor.
.. warning::
There is no back-pressure signal from the destination actor, i.e. if the actor is not consuming
the messages fast enough the mailbox of the actor will grow. For potentially slow consumer actors
it is recommended to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
limiting stage in front of this stage.
ActorPublisher
^^^^^^^^^^^^^^
Extend/mixin :class:`akka.stream.actor.ActorPublisher` in your :class:`Actor` to make it a
stream publisher that keeps track of the subscription life cycle and requested elements.
Here is an example of such an actor. It dispatches incoming jobs to the attached subscriber:
.. includecode:: ../code/docs/stream/ActorPublisherDocSpec.scala#job-manager
You send elements to the stream by calling ``onNext``. You are allowed to send as many
elements as have been requested by the stream subscriber. This amount can be inquired with
``totalDemand``. It is only allowed to use ``onNext`` when ``isActive`` and ``totalDemand>0``,
otherwise ``onNext`` will throw ``IllegalStateException``.
When the stream subscriber requests more elements the ``ActorPublisherMessage.Request`` message
is delivered to this actor, and you can act on that event. The ``totalDemand``
is updated automatically.
When the stream subscriber cancels the subscription the ``ActorPublisherMessage.Cancel`` message
is delivered to this actor. After that subsequent calls to ``onNext`` will be ignored.
You can complete the stream by calling ``onComplete``. After that you are not allowed to
call ``onNext``, ``onError`` and ``onComplete``.
You can terminate the stream with failure by calling ``onError``. After that you are not allowed to
call ``onNext``, ``onError`` and ``onComplete``.
If you suspect that this ``ActorPublisher`` may never get subscribed to, you can override the ``subscriptionTimeout``
method to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when
the timeout triggers via an ``ActorPublisherMessage.SubscriptionTimeoutExceeded`` message and MUST then perform
cleanup and stop itself.
If the actor is stopped the stream will be completed, unless it was not already terminated with
failure, completed or canceled.
More detailed information can be found in the API documentation.
This is how it can be used as input :class:`Source` to a :class:`Flow`:
.. includecode:: ../code/docs/stream/ActorPublisherDocSpec.scala#actor-publisher-usage
A publisher that is created with ``Sink.asPublisher`` supports a specified number of subscribers. Additional
subscription attempts will be rejected with an :class:`IllegalStateException`.
ActorSubscriber
^^^^^^^^^^^^^^^
Extend/mixin :class:`akka.stream.actor.ActorSubscriber` in your :class:`Actor` to make it a
stream subscriber with full control of stream back pressure. It will receive
``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError``
messages from the stream. It can also receive other, non-stream messages, in the same way as any actor.
Here is an example of such an actor. It dispatches incoming jobs to child worker actors:
.. includecode:: ../code/docs/stream/ActorSubscriberDocSpec.scala#worker-pool
Subclass must define the ``RequestStrategy`` to control stream back pressure.
After each incoming message the ``ActorSubscriber`` will automatically invoke
the ``RequestStrategy.requestDemand`` and propagate the returned demand to the stream.
* The provided ``WatermarkRequestStrategy`` is a good strategy if the actor performs work itself.
* The provided ``MaxInFlightRequestStrategy`` is useful if messages are queued internally or
delegated to other actors.
* You can also implement a custom ``RequestStrategy`` or call ``request`` manually together with
``ZeroRequestStrategy`` or some other strategy. In that case
you must also call ``request`` when the actor is started or when it is ready, otherwise
it will not receive any elements.
More detailed information can be found in the API documentation.
This is how it can be used as output :class:`Sink` to a :class:`Flow`:
.. includecode:: ../code/docs/stream/ActorSubscriberDocSpec.scala#actor-subscriber-usage
Integrating with External Services
==================================
@ -433,3 +410,101 @@ passing a factory function that will create the :class:`Processor` instances:
.. includecode:: ../code/docs/stream/ReactiveStreamsDocSpec.scala#use-processor
Please note that a factory is necessary to achieve reusability of the resulting :class:`Flow`.
Implementing Reactive Streams Publisher or Subscriber
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
As described above any Akka Streams ``Source`` can be exposed as a Reactive Streams ``Publisher`` and
any ``Sink`` can be exposed as a Reactive Streams ``Subscriber``. Therefore we recommend that you
implement Reactive Streams integrations with built-in stages or :ref:`custom stages <stream-customize-scala>`.
For historical reasons the :class:`ActorPublisher` and :class:`ActorSubscriber` traits are
provided to support implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with
an :class:`Actor`.
These can be consumed by other Reactive Stream libraries or used as an Akka Streams :class:`Source` or :class:`Sink`.
.. warning::
:class:`ActorPublisher` and :class:`ActorSubscriber` will probably be deprecated in future versions of Akka.
.. warning::
:class:`ActorPublisher` and :class:`ActorSubscriber` cannot be used with remote actors,
because if signals of the Reactive Streams protocol (e.g. ``request``) are lost the
the stream may deadlock.
ActorPublisher
--------------
Extend/mixin :class:`akka.stream.actor.ActorPublisher` in your :class:`Actor` to make it a
stream publisher that keeps track of the subscription life cycle and requested elements.
Here is an example of such an actor. It dispatches incoming jobs to the attached subscriber:
.. includecode:: ../code/docs/stream/ActorPublisherDocSpec.scala#job-manager
You send elements to the stream by calling ``onNext``. You are allowed to send as many
elements as have been requested by the stream subscriber. This amount can be inquired with
``totalDemand``. It is only allowed to use ``onNext`` when ``isActive`` and ``totalDemand>0``,
otherwise ``onNext`` will throw ``IllegalStateException``.
When the stream subscriber requests more elements the ``ActorPublisherMessage.Request`` message
is delivered to this actor, and you can act on that event. The ``totalDemand``
is updated automatically.
When the stream subscriber cancels the subscription the ``ActorPublisherMessage.Cancel`` message
is delivered to this actor. After that subsequent calls to ``onNext`` will be ignored.
You can complete the stream by calling ``onComplete``. After that you are not allowed to
call ``onNext``, ``onError`` and ``onComplete``.
You can terminate the stream with failure by calling ``onError``. After that you are not allowed to
call ``onNext``, ``onError`` and ``onComplete``.
If you suspect that this ``ActorPublisher`` may never get subscribed to, you can override the ``subscriptionTimeout``
method to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when
the timeout triggers via an ``ActorPublisherMessage.SubscriptionTimeoutExceeded`` message and MUST then perform
cleanup and stop itself.
If the actor is stopped the stream will be completed, unless it was not already terminated with
failure, completed or canceled.
More detailed information can be found in the API documentation.
This is how it can be used as input :class:`Source` to a :class:`Flow`:
.. includecode:: ../code/docs/stream/ActorPublisherDocSpec.scala#actor-publisher-usage
A publisher that is created with ``Sink.asPublisher`` supports a specified number of subscribers. Additional
subscription attempts will be rejected with an :class:`IllegalStateException`.
ActorSubscriber
---------------
Extend/mixin :class:`akka.stream.actor.ActorSubscriber` in your :class:`Actor` to make it a
stream subscriber with full control of stream back pressure. It will receive
``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError``
messages from the stream. It can also receive other, non-stream messages, in the same way as any actor.
Here is an example of such an actor. It dispatches incoming jobs to child worker actors:
.. includecode:: ../code/docs/stream/ActorSubscriberDocSpec.scala#worker-pool
Subclass must define the ``RequestStrategy`` to control stream back pressure.
After each incoming message the ``ActorSubscriber`` will automatically invoke
the ``RequestStrategy.requestDemand`` and propagate the returned demand to the stream.
* The provided ``WatermarkRequestStrategy`` is a good strategy if the actor performs work itself.
* The provided ``MaxInFlightRequestStrategy`` is useful if messages are queued internally or
delegated to other actors.
* You can also implement a custom ``RequestStrategy`` or call ``request`` manually together with
``ZeroRequestStrategy`` or some other strategy. In that case
you must also call ``request`` when the actor is started or when it is ready, otherwise
it will not receive any elements.
More detailed information can be found in the API documentation.
This is how it can be used as output :class:`Sink` to a :class:`Flow`:
.. includecode:: ../code/docs/stream/ActorSubscriberDocSpec.scala#actor-subscriber-usage

View file

@ -15,11 +15,21 @@ object QueueOfferResult {
*/
final case object Enqueued extends QueueOfferResult
/**
* Java API: The `Enqueued` singleton instance
*/
def enqueued: QueueOfferResult = Enqueued
/**
* Type is used to indicate that stream is dropped an element
*/
final case object Dropped extends QueueOfferResult
/**
* Java API: The `Enqueued` singleton instance
*/
def dropped: QueueOfferResult = Dropped
/**
* Type is used to indicate that stream is failed before or during call to the stream
* @param cause - exception that stream failed with

View file

@ -316,10 +316,10 @@ object Source {
* there is no space available in the buffer.
*
* Acknowledgement mechanism is available.
* [[akka.stream.javadsl.SourceQueue.offer]] returns `CompletionStage<StreamCallbackStatus<Boolean>>` which completes with `Success(true)`
* if element was added to buffer or sent downstream. It completes with `Success(false)` if element was dropped. Can also complete
* with [[akka.stream.javadsl.StreamCallbackStatus.Failure]] - when stream failed or [[akka.stream.StreamCallbackStatus.StreamCompleted]]
* when downstream is completed.
* [[akka.stream.javadsl.SourceQueue.offer]] returns `CompletionStage<QueueOfferResult>` which completes with
* `QueueOfferResult.enqueued` if element was added to buffer or sent downstream. It completes with
* `QueueOfferResult.dropped` if element was dropped. Can also complete with `QueueOfferResult.Failure` -
* when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed.
*
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():CompletionStage`
* call when buffer is full.

View file

@ -398,7 +398,7 @@ object Source {
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
* i.e. you can watch it to get notified when that happens.
*
* See also [[akka.stream.javadsl.Source.queue]].
* See also [[akka.stream.scaladsl.Source.queue]].
*
* @param bufferSize The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
@ -456,10 +456,10 @@ object Source {
* there is no space available in the buffer.
*
* Acknowledgement mechanism is available.
* [[akka.stream.scaladsl.SourceQueue.offer]] returns ``Future[StreamCallbackStatus[Boolean]]`` which completes with `Success(true)`
* if element was added to buffer or sent downstream. It completes with `Success(false)` if element was dropped. Can also complete
* with [[akka.stream.StreamCallbackStatus.Failure]] - when stream failed or [[akka.stream.StreamCallbackStatus.StreamCompleted]]
* when downstream is completed.
* [[akka.stream.scaladsl.SourceQueue.offer]] returns `Future[QueueOfferResult]` which completes with
* `QueueOfferResult.Enqueued` if element was added to buffer or sent downstream. It completes with
* `QueueOfferResult.Dropped` if element was dropped. Can also complete with `QueueOfferResult.Failure` -
* when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed.
*
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():Future`
* call when buffer is full.