diff --git a/akka-docs/rst/java/code/docs/stream/IntegrationDocTest.java b/akka-docs/rst/java/code/docs/stream/IntegrationDocTest.java index 8f6487862f..4b404111b6 100644 --- a/akka-docs/rst/java/code/docs/stream/IntegrationDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/IntegrationDocTest.java @@ -11,6 +11,7 @@ import akka.stream.*; import akka.stream.javadsl.*; import akka.testkit.JavaTestKit; import akka.testkit.TestProbe; +import akka.util.Timeout; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -20,6 +21,8 @@ import docs.stream.TwitterStreamQuickstartDocTest.Model.Tweet; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import scala.Option; +import scala.PartialFunction; import java.util.Arrays; import java.util.HashSet; import java.util.Optional; @@ -27,6 +30,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static akka.pattern.PatternsCS.ask; import static docs.stream.TwitterStreamQuickstartDocTest.Model.AKKA; @@ -39,6 +43,7 @@ public class IntegrationDocTest extends AbstractJavaTest { static ActorSystem system; static Materializer mat; + static ActorRef ref; @BeforeClass public static void setup() { @@ -54,6 +59,7 @@ public class IntegrationDocTest extends AbstractJavaTest { system = ActorSystem.create("ActorPublisherDocTest", config); mat = ActorMaterializer.create(system); + ref = system.actorOf(Props.create(Translator.class)); } @AfterClass @@ -61,6 +67,7 @@ public class IntegrationDocTest extends AbstractJavaTest { JavaTestKit.shutdownActorSystem(system); system = null; mat = null; + ref = null; } @@ -281,6 +288,40 @@ public class IntegrationDocTest extends AbstractJavaTest { } } //#sometimes-slow-service + + //#ask-actor + static class Translator extends UntypedActor { + @Override + public void onReceive(Object message) { + if (message instanceof String) { + String word = (String) message; + // ... process message + String reply = word.toUpperCase(); + // reply to the ask + getSender().tell(reply, getSelf()); + } else { + unhandled(message); + } + } + } + //#ask-actor + + @SuppressWarnings("unchecked") + @Test + public void mapAsyncPlusAsk() throws Exception { + //#mapAsync-ask + Source words = + Source.from(Arrays.asList("hello", "hi")); + Timeout askTimeout = Timeout.apply(5, TimeUnit.SECONDS); + + words + .mapAsync(5, elem -> ask(ref, elem, askTimeout)) + .map(elem -> (String) elem) + // continue processing of the replies from the actor + .map(elem -> elem.toLowerCase()) + .runWith(Sink.ignore(), mat); + //#mapAsync-ask + } @Test diff --git a/akka-docs/rst/java/stream/stream-integrations.rst b/akka-docs/rst/java/stream/stream-integrations.rst index 6f47436fe1..aef739e108 100644 --- a/akka-docs/rst/java/stream/stream-integrations.rst +++ b/akka-docs/rst/java/stream/stream-integrations.rst @@ -7,27 +7,88 @@ Integration Integrating with Actors ======================= -For piping the elements of a stream as messages to an ordinary actor you can use the -``Sink.actorRef``. Messages can be sent to a stream via the :class:`ActorRef` that is +For piping the elements of a stream as messages to an ordinary actor you can use +``ask`` in a ``mapAsync`` or use ``Sink.actorRefWithAck``. + +Messages can be sent to a stream with ``Source.queue`` or via the :class:`ActorRef` that is materialized by ``Source.actorRef``. -For more advanced use cases the :class:`ActorPublisher` and :class:`ActorSubscriber` traits are -provided to support implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with -an :class:`Actor`. +mapAsync + ask +^^^^^^^^^^^^^^ -These can be consumed by other Reactive Stream libraries or used as an -Akka Streams :class:`Source` or :class:`Sink`. +A nice way to delegate some processing of elements in a stream to an actor is to +use ``ask`` in ``mapAsync``. The back-pressure of the stream is maintained by +the ``CompletionStage`` of the ``ask`` and the mailbox of the actor will not be filled with +more messages than the given ``parallelism`` of the ``mapAsync`` stage. -.. warning:: +.. includecode:: ../code/docs/stream/IntegrationDocTest.java#mapAsync-ask - :class:`AbstractActorPublisher` and :class:`AbstractActorSubscriber` cannot be used with remote actors, - because if signals of the Reactive Streams protocol (e.g. ``request``) are lost the - the stream may deadlock. +Note that the messages received in the actor will be in the same order as +the stream elements, i.e. the ``parallelism`` does not change the ordering +of the messages. There is a performance advantage of using parallelism > 1 +even though the actor will only process one message at a time because then there +is already a message in the mailbox when the actor has completed previous +message. + +The actor must reply to the ``getSender()`` for each message from the stream. That +reply will complete the ``CompletionStage`` of the ``ask`` and it will be the element that +is emitted downstreams from ``mapAsync``. + +.. includecode:: ../code/docs/stream/IntegrationDocTest.java#ask-actor + +The stream can be completed with failure by sending ``akka.actor.Status.Failure`` +as reply from the actor. + +If the ``ask`` fails due to timeout the stream will be completed with +``TimeoutException`` failure. If that is not desired outcome you can use ``recover`` +on the ``ask`` :class:`CompletionStage`. + +If you don't care about the reply values and only use them as back-pressure signals you +can use ``Sink.ignore`` after the ``mapAsync`` stage and then actor is effectively a sink +of the stream. + +The same pattern can be used with :ref:`Actor routers `. Then you +can use ``mapAsyncUnordered`` for better efficiency if you don't care about the +order of the emitted downstream elements (the replies). + +Sink.actorRefWithAck +^^^^^^^^^^^^^^^^^^^^ + +The sink sends the elements of the stream to the given :class:`ActorRef` that sends back back-pressure signal. +First element is always `onInitMessage`, then stream is waiting for the given acknowledgement message +from the given actor which means that it is ready to process elements. It also requires the given acknowledgement +message after each stream element to make back-pressure work. + +If the target actor terminates the stream will be cancelled. When the stream is completed successfully the +given ``onCompleteMessage`` will be sent to the destination actor. When the stream is completed with +failure a ``akka.actor.Status.Failure`` message will be sent to the destination actor. .. note:: - These Actors are designed to be implemented using Java 8 lambda expressions. In case you need to stay on a JVM - prior to 8, Akka provides :class:`UntypedActorPublisher` and :class:`UntypedActorSubscriber` which can be used - easily from any language level. + + Using ``Sink.actorRef`` or ordinary ``tell`` from a ``map`` or ``foreach`` stage means that there is + no back-pressure signal from the destination actor, i.e. if the actor is not consuming the messages + fast enough the mailbox of the actor will grow, unless you use a bounded mailbox with zero + `mailbox-push-timeout-time` or use a rate limiting stage in front. It's often better to + use ``Sink.actorRefWithAck`` or ``ask`` in ``mapAsync``, though. + +Source.queue +^^^^^^^^^^^^ + +``Source.queue`` can be used for emitting elements to a stream from an actor (or from anything running outside +the stream). The elements will be buffered until the stream can process them. You can ``offer`` elements to +the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will +be buffered until request for demand is received. + +Use overflow strategy ``akka.stream.OverflowStrategy.backpressure`` to avoid dropping of elements if the +buffer is full. + +``SourceQueue.offer`` returns ``CompletionStage`` which completes with +``QueueOfferResult.enqueued`` if element was added to buffer or sent downstream. It completes with +``QueueOfferResult.dropped`` if element was dropped. Can also complete with ``QueueOfferResult.Failure`` - +when stream failed or ``QueueOfferResult.QueueClosed`` when downstream is completed. + +When used from an actor you typically ``pipe`` the result of the ``CompletionStage`` back to the actor to +continue processing. Source.actorRef ^^^^^^^^^^^^^^^ @@ -37,9 +98,10 @@ stream if there is demand from downstream, otherwise they will be buffered until demand is received. Depending on the defined :class:`OverflowStrategy` it might drop elements if there is no space -available in the buffer. The strategy ``OverflowStrategy.backpressure()`` is not supported -for this Source type, you should consider using ``ActorPublisher`` if you want a backpressured -actor interface. +available in the buffer. The strategy ``OverflowStrategy.backpressure`` is not supported +for this Source type, i.e. elements will be dropped if the buffer is filled by sending +at a rate that is faster than the stream can consume. You should consider using ``Source.queue`` +if you want a backpressured actor interface. The stream can be completed successfully by sending ``akka.actor.PoisonPill`` or ``akka.actor.Status.Success`` to the actor reference. @@ -50,96 +112,6 @@ actor reference. The actor will be stopped when the stream is completed, failed or cancelled from downstream, i.e. you can watch it to get notified when that happens. -Sink.actorRef -^^^^^^^^^^^^^ - -The sink sends the elements of the stream to the given `ActorRef`. If the target actor terminates -the stream will be cancelled. When the stream is completed successfully the given ``onCompleteMessage`` -will be sent to the destination actor. When the stream is completed with failure a ``akka.actor.Status.Failure`` -message will be sent to the destination actor. - -.. warning:: - - There is no back-pressure signal from the destination actor, i.e. if the actor is not consuming - the messages fast enough the mailbox of the actor will grow. For potentially slow consumer actors - it is recommended to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate - limiting stage in front of this stage. - -ActorPublisher -^^^^^^^^^^^^^^ - -Extend :class:`akka.stream.actor.AbstractActorPublisher` to implement a -stream publisher that keeps track of the subscription life cycle and requested elements. - -Here is an example of such an actor. It dispatches incoming jobs to the attached subscriber: - -.. includecode:: ../code/docs/stream/ActorPublisherDocTest.java#job-manager - -You send elements to the stream by calling ``onNext``. You are allowed to send as many -elements as have been requested by the stream subscriber. This amount can be inquired with -``totalDemand``. It is only allowed to use ``onNext`` when ``isActive`` and ``totalDemand>0``, -otherwise ``onNext`` will throw ``IllegalStateException``. - -When the stream subscriber requests more elements the ``ActorPublisherMessage.Request`` message -is delivered to this actor, and you can act on that event. The ``totalDemand`` -is updated automatically. - -When the stream subscriber cancels the subscription the ``ActorPublisherMessage.Cancel`` message -is delivered to this actor. After that subsequent calls to ``onNext`` will be ignored. - -You can complete the stream by calling ``onComplete``. After that you are not allowed to -call ``onNext``, ``onError`` and ``onComplete``. - -You can terminate the stream with failure by calling ``onError``. After that you are not allowed to -call ``onNext``, ``onError`` and ``onComplete``. - -If you suspect that this ``AbstractActorPublisher`` may never get subscribed to, you can override the ``subscriptionTimeout`` -method to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when -the timeout triggers via an ``ActorPublisherMessage.SubscriptionTimeoutExceeded`` message and MUST then perform -cleanup and stop itself. - -If the actor is stopped the stream will be completed, unless it was not already terminated with -failure, completed or canceled. - -More detailed information can be found in the API documentation. - -This is how it can be used as input :class:`Source` to a :class:`Flow`: - -.. includecode:: ../code/docs/stream/ActorPublisherDocTest.java#actor-publisher-usage - -You can only attach one subscriber to this publisher. Use a ``Broadcast``-element or -attach a ``Sink.asPublisher(AsPublisher.WITH_FANOUT)`` to enable multiple subscribers. - -ActorSubscriber -^^^^^^^^^^^^^^^ - -Extend :class:`akka.stream.actor.AbstractActorSubscriber` to make your class a stream subscriber with -full control of stream back pressure. It will receive -``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError`` -messages from the stream. It can also receive other, non-stream messages, in the same way as any actor. - -Here is an example of such an actor. It dispatches incoming jobs to child worker actors: - -.. includecode:: ../code/docs/stream/ActorSubscriberDocTest.java#worker-pool - -Subclass must define the ``RequestStrategy`` to control stream back pressure. -After each incoming message the ``AbstractActorSubscriber`` will automatically invoke -the ``RequestStrategy.requestDemand`` and propagate the returned demand to the stream. - -* The provided ``WatermarkRequestStrategy`` is a good strategy if the actor performs work itself. -* The provided ``MaxInFlightRequestStrategy`` is useful if messages are queued internally or - delegated to other actors. -* You can also implement a custom ``RequestStrategy`` or call ``request`` manually together with - ``ZeroRequestStrategy`` or some other strategy. In that case - you must also call ``request`` when the actor is started or when it is ready, otherwise - it will not receive any elements. - -More detailed information can be found in the API documentation. - -This is how it can be used as output :class:`Sink` to a :class:`Flow`: - -.. includecode:: ../code/docs/stream/ActorSubscriberDocTest.java#actor-subscriber-usage - Integrating with External Services ================================== @@ -438,3 +410,101 @@ passing a factory function that will create the :class:`Processor` instances: .. includecode:: ../code/docs/stream/ReactiveStreamsDocTest.java#use-processor Please note that a factory is necessary to achieve reusability of the resulting :class:`Flow`. + +Implementing Reactive Streams Publisher or Subscriber +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +As described above any Akka Streams ``Source`` can be exposed as a Reactive Streams ``Publisher`` and +any ``Sink`` can be exposed as a Reactive Streams ``Subscriber``. Therefore we recommend that you +implement Reactive Streams integrations with built-in stages or :ref:`custom stages `. + +For historical reasons the :class:`ActorPublisher` and :class:`ActorSubscriber` traits are +provided to support implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with +an :class:`Actor`. + +These can be consumed by other Reactive Stream libraries or used as an Akka Streams :class:`Source` or :class:`Sink`. + +.. warning:: + + :class:`ActorPublisher` and :class:`ActorSubscriber` will probably be deprecated in future versions of Akka. + +.. warning:: + + :class:`ActorPublisher` and :class:`ActorSubscriber` cannot be used with remote actors, + because if signals of the Reactive Streams protocol (e.g. ``request``) are lost the + the stream may deadlock. + +ActorPublisher +-------------- + +Extend :class:`akka.stream.actor.AbstractActorPublisher` to implement a +stream publisher that keeps track of the subscription life cycle and requested elements. + +Here is an example of such an actor. It dispatches incoming jobs to the attached subscriber: + +.. includecode:: ../code/docs/stream/ActorPublisherDocTest.java#job-manager + +You send elements to the stream by calling ``onNext``. You are allowed to send as many +elements as have been requested by the stream subscriber. This amount can be inquired with +``totalDemand``. It is only allowed to use ``onNext`` when ``isActive`` and ``totalDemand>0``, +otherwise ``onNext`` will throw ``IllegalStateException``. + +When the stream subscriber requests more elements the ``ActorPublisherMessage.Request`` message +is delivered to this actor, and you can act on that event. The ``totalDemand`` +is updated automatically. + +When the stream subscriber cancels the subscription the ``ActorPublisherMessage.Cancel`` message +is delivered to this actor. After that subsequent calls to ``onNext`` will be ignored. + +You can complete the stream by calling ``onComplete``. After that you are not allowed to +call ``onNext``, ``onError`` and ``onComplete``. + +You can terminate the stream with failure by calling ``onError``. After that you are not allowed to +call ``onNext``, ``onError`` and ``onComplete``. + +If you suspect that this ``AbstractActorPublisher`` may never get subscribed to, you can override the ``subscriptionTimeout`` +method to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when +the timeout triggers via an ``ActorPublisherMessage.SubscriptionTimeoutExceeded`` message and MUST then perform +cleanup and stop itself. + +If the actor is stopped the stream will be completed, unless it was not already terminated with +failure, completed or canceled. + +More detailed information can be found in the API documentation. + +This is how it can be used as input :class:`Source` to a :class:`Flow`: + +.. includecode:: ../code/docs/stream/ActorPublisherDocTest.java#actor-publisher-usage + +You can only attach one subscriber to this publisher. Use a ``Broadcast``-element or +attach a ``Sink.asPublisher(AsPublisher.WITH_FANOUT)`` to enable multiple subscribers. + +ActorSubscriber +--------------- + +Extend :class:`akka.stream.actor.AbstractActorSubscriber` to make your class a stream subscriber with +full control of stream back pressure. It will receive +``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError`` +messages from the stream. It can also receive other, non-stream messages, in the same way as any actor. + +Here is an example of such an actor. It dispatches incoming jobs to child worker actors: + +.. includecode:: ../code/docs/stream/ActorSubscriberDocTest.java#worker-pool + +Subclass must define the ``RequestStrategy`` to control stream back pressure. +After each incoming message the ``AbstractActorSubscriber`` will automatically invoke +the ``RequestStrategy.requestDemand`` and propagate the returned demand to the stream. + +* The provided ``WatermarkRequestStrategy`` is a good strategy if the actor performs work itself. +* The provided ``MaxInFlightRequestStrategy`` is useful if messages are queued internally or + delegated to other actors. +* You can also implement a custom ``RequestStrategy`` or call ``request`` manually together with + ``ZeroRequestStrategy`` or some other strategy. In that case + you must also call ``request`` when the actor is started or when it is ready, otherwise + it will not receive any elements. + +More detailed information can be found in the API documentation. + +This is how it can be used as output :class:`Sink` to a :class:`Flow`: + +.. includecode:: ../code/docs/stream/ActorSubscriberDocTest.java#actor-subscriber-usage diff --git a/akka-docs/rst/scala/code/docs/stream/IntegrationDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/IntegrationDocSpec.scala index 2d1fd35366..8b5977e94c 100644 --- a/akka-docs/rst/scala/code/docs/stream/IntegrationDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/IntegrationDocSpec.scala @@ -15,7 +15,6 @@ import akka.actor.ActorRef import com.typesafe.config.ConfigFactory import akka.actor.Actor import akka.actor.Props -import akka.pattern.ask import akka.util.Timeout import akka.stream.Attributes import akka.stream.ActorAttributes @@ -24,6 +23,7 @@ import akka.stream.ActorMaterializerSettings import java.util.concurrent.atomic.AtomicInteger import akka.stream.Supervision import akka.stream.scaladsl.Flow +import akka.Done object IntegrationDocSpec { import TwitterStreamQuickstartDocSpec._ @@ -120,6 +120,17 @@ object IntegrationDocSpec { } //#sometimes-slow-service + //#ask-actor + class Translator extends Actor { + def receive = { + case word: String => + // ... process message + val reply = word.toUpperCase + sender() ! reply // reply to the ask + } + } + //#ask-actor + } class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { @@ -127,6 +138,22 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { import IntegrationDocSpec._ implicit val materializer = ActorMaterializer() + val ref: ActorRef = system.actorOf(Props[Translator]) + + "mapAsync + ask" in { + //#mapAsync-ask + import akka.pattern.ask + implicit val askTimeout = Timeout(5.seconds) + val words: Source[String, NotUsed] = + Source(List("hello", "hi")) + + words + .mapAsync(parallelism = 5)(elem => (ref ? elem).mapTo[String]) + // continue processing of the replies from the actor + .map(_.toLowerCase) + .runWith(Sink.ignore) + //#mapAsync-ask + } "calling external service with mapAsync" in { val probe = TestProbe() @@ -136,7 +163,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { //#tweet-authors val authors: Source[Author, NotUsed] = tweets - .filter(_.hashtags.contains(akka)) + .filter(_.hashtags.contains(akkaTag)) .map(_.author) //#tweet-authors @@ -171,7 +198,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { "lookup email with mapAsync and supervision" in { val addressSystem = new AddressSystem2 val authors: Source[Author, NotUsed] = - tweets.filter(_.hashtags.contains(akka)).map(_.author) + tweets.filter(_.hashtags.contains(akkaTag)).map(_.author) //#email-addresses-mapAsync-supervision import ActorAttributes.supervisionStrategy @@ -191,7 +218,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { //#external-service-mapAsyncUnordered val authors: Source[Author, NotUsed] = - tweets.filter(_.hashtags.contains(akka)).map(_.author) + tweets.filter(_.hashtags.contains(akkaTag)).map(_.author) val emailAddresses: Source[String, NotUsed] = authors @@ -224,7 +251,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { val addressSystem = new AddressSystem val smsServer = new SmsServer(probe.ref) - val authors = tweets.filter(_.hashtags.contains(akka)).map(_.author) + val authors = tweets.filter(_.hashtags.contains(akkaTag)).map(_.author) val phoneNumbers = authors.mapAsync(4)(author => addressSystem.lookupPhoneNumber(author.handle)) @@ -261,7 +288,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { val addressSystem = new AddressSystem val smsServer = new SmsServer(probe.ref) - val authors = tweets.filter(_.hashtags.contains(akka)).map(_.author) + val authors = tweets.filter(_.hashtags.contains(akkaTag)).map(_.author) val phoneNumbers = authors.mapAsync(4)(author => addressSystem.lookupPhoneNumber(author.handle)) @@ -293,7 +320,9 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { val database = system.actorOf(Props(classOf[DatabaseService], probe.ref), "db") //#save-tweets - val akkaTweets: Source[Tweet, NotUsed] = tweets.filter(_.hashtags.contains(akka)) + import akka.pattern.ask + + val akkaTweets: Source[Tweet, NotUsed] = tweets.filter(_.hashtags.contains(akkaTag)) implicit val timeout = Timeout(3.seconds) val saveTweets: RunnableGraph[NotUsed] = diff --git a/akka-docs/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala index 84004cde12..c033312446 100644 --- a/akka-docs/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala @@ -23,7 +23,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec { trait Fixture { //#authors val authors = Flow[Tweet] - .filter(_.hashtags.contains(akka)) + .filter(_.hashtags.contains(akkaTag)) .map(_.author) //#authors diff --git a/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala index 6097220af2..b24dc76d4f 100644 --- a/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala @@ -28,7 +28,7 @@ object TwitterStreamQuickstartDocSpec { body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet } - val akka = Hashtag("#akka") + val akkaTag = Hashtag("#akka") //#model abstract class TweetSourceDecl { @@ -76,7 +76,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { //#authors-filter-map val authors: Source[Author, NotUsed] = tweets - .filter(_.hashtags.contains(akka)) + .filter(_.hashtags.contains(akkaTag)) .map(_.author) //#first-sample //#authors-filter-map @@ -84,7 +84,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { trait Example3 { //#authors-collect val authors: Source[Author, NotUsed] = - tweets.collect { case t if t.hashtags.contains(akka) => t.author } + tweets.collect { case t if t.hashtags.contains(akkaTag) => t.author } //#authors-collect } @@ -124,7 +124,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { val bcast = b.add(Broadcast[Tweet](2)) tweets ~> bcast.in - bcast.out(0) ~> Flow[Tweet].map(_.author) ~> writeAuthors + bcast.out(0) ~> Flow[Tweet].map(_.author) ~> writeAuthors bcast.out(1) ~> Flow[Tweet].mapConcat(_.hashtags.toList) ~> writeHashtags ClosedShape }) @@ -192,7 +192,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { val sumSink = Sink.fold[Int, Int](0)(_ + _) val counterRunnableGraph: RunnableGraph[Future[Int]] = tweetsInMinuteFromNow - .filter(_.hashtags contains akka) + .filter(_.hashtags contains akkaTag) .map(t => 1) .toMat(sumSink)(Keep.right) diff --git a/akka-docs/rst/scala/stream/stream-integrations.rst b/akka-docs/rst/scala/stream/stream-integrations.rst index 8669460c05..33e8658d48 100644 --- a/akka-docs/rst/scala/stream/stream-integrations.rst +++ b/akka-docs/rst/scala/stream/stream-integrations.rst @@ -7,22 +7,88 @@ Integration Integrating with Actors ======================= -For piping the elements of a stream as messages to an ordinary actor you can use the -``Sink.actorRef``. Messages can be sent to a stream via the :class:`ActorRef` that is +For piping the elements of a stream as messages to an ordinary actor you can use +``ask`` in a ``mapAsync`` or use ``Sink.actorRefWithAck``. + +Messages can be sent to a stream with ``Source.queue`` or via the :class:`ActorRef` that is materialized by ``Source.actorRef``. -For more advanced use cases the :class:`ActorPublisher` and :class:`ActorSubscriber` traits are -provided to support implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with -an :class:`Actor`. +mapAsync + ask +^^^^^^^^^^^^^^ -These can be consumed by other Reactive Stream libraries or used as an -Akka Streams :class:`Source` or :class:`Sink`. +A nice way to delegate some processing of elements in a stream to an actor is to +use ``ask`` in ``mapAsync``. The back-pressure of the stream is maintained by +the ``Future`` of the ``ask`` and the mailbox of the actor will not be filled with +more messages than the given ``parallelism`` of the ``mapAsync`` stage. -.. warning:: +.. includecode:: ../code/docs/stream/IntegrationDocSpec.scala#mapAsync-ask - :class:`ActorPublisher` and :class:`ActorSubscriber` cannot be used with remote actors, - because if signals of the Reactive Streams protocol (e.g. ``request``) are lost the - the stream may deadlock. +Note that the messages received in the actor will be in the same order as +the stream elements, i.e. the ``parallelism`` does not change the ordering +of the messages. There is a performance advantage of using parallelism > 1 +even though the actor will only process one message at a time because then there +is already a message in the mailbox when the actor has completed previous +message. + +The actor must reply to the ``sender()`` for each message from the stream. That +reply will complete the ``Future`` of the ``ask`` and it will be the element that +is emitted downstreams from ``mapAsync``. + +.. includecode:: ../code/docs/stream/IntegrationDocSpec.scala#ask-actor + +The stream can be completed with failure by sending ``akka.actor.Status.Failure`` +as reply from the actor. + +If the ``ask`` fails due to timeout the stream will be completed with +``TimeoutException`` failure. If that is not desired outcome you can use ``recover`` +on the ``ask`` :class:`Future`. + +If you don't care about the reply values and only use them as back-pressure signals you +can use ``Sink.ignore`` after the ``mapAsync`` stage and then actor is effectively a sink +of the stream. + +The same pattern can be used with :ref:`Actor routers `. Then you +can use ``mapAsyncUnordered`` for better efficiency if you don't care about the +order of the emitted downstream elements (the replies). + +Sink.actorRefWithAck +^^^^^^^^^^^^^^^^^^^^ + +The sink sends the elements of the stream to the given :class:`ActorRef` that sends back back-pressure signal. +First element is always `onInitMessage`, then stream is waiting for the given acknowledgement message +from the given actor which means that it is ready to process elements. It also requires the given acknowledgement +message after each stream element to make back-pressure work. + +If the target actor terminates the stream will be cancelled. When the stream is completed successfully the +given ``onCompleteMessage`` will be sent to the destination actor. When the stream is completed with +failure a ``akka.actor.Status.Failure`` message will be sent to the destination actor. + +.. note:: + + Using ``Sink.actorRef`` or ordinary ``tell`` from a ``map`` or ``foreach`` stage means that there is + no back-pressure signal from the destination actor, i.e. if the actor is not consuming the messages + fast enough the mailbox of the actor will grow, unless you use a bounded mailbox with zero + `mailbox-push-timeout-time` or use a rate limiting stage in front. It's often better to + use ``Sink.actorRefWithAck`` or ``ask`` in ``mapAsync``, though. + +Source.queue +^^^^^^^^^^^^ + +``Source.queue`` can be used for emitting elements to a stream from an actor (or from anything running outside +the stream). The elements will be buffered until the stream can process them. You can ``offer`` elements to +the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will +be buffered until request for demand is received. + +Use overflow strategy ``akka.stream.OverflowStrategy.backpressure`` to avoid dropping of elements if the +buffer is full. + +``SourceQueue.offer`` returns ``Future[QueueOfferResult]`` which completes with ``QueueOfferResult.Enqueued`` +if element was added to buffer or sent downstream. It completes with ``QueueOfferResult.Dropped`` if element +was dropped. Can also complete with ``QueueOfferResult.Failure`` - when stream failed or +``QueueOfferResult.QueueClosed`` when downstream is completed. + +When used from an actor you typically ``pipe`` the result of the ``Future`` back to the actor to +continue processing. Source.actorRef ^^^^^^^^^^^^^^^ @@ -33,8 +99,9 @@ demand is received. Depending on the defined :class:`OverflowStrategy` it might drop elements if there is no space available in the buffer. The strategy ``OverflowStrategy.backpressure`` is not supported -for this Source type, you should consider using ``ActorPublisher`` if you want a backpressured -actor interface. +for this Source type, i.e. elements will be dropped if the buffer is filled by sending +at a rate that is faster than the stream can consume. You should consider using ``Source.queue`` +if you want a backpressured actor interface. The stream can be completed successfully by sending ``akka.actor.PoisonPill`` or ``akka.actor.Status.Success`` to the actor reference. @@ -45,96 +112,6 @@ actor reference. The actor will be stopped when the stream is completed, failed or cancelled from downstream, i.e. you can watch it to get notified when that happens. -Sink.actorRef -^^^^^^^^^^^^^ - -The sink sends the elements of the stream to the given :class:`ActorRef`. If the target actor terminates -the stream will be cancelled. When the stream is completed successfully the given ``onCompleteMessage`` -will be sent to the destination actor. When the stream is completed with failure a ``akka.actor.Status.Failure`` -message will be sent to the destination actor. - -.. warning:: - - There is no back-pressure signal from the destination actor, i.e. if the actor is not consuming - the messages fast enough the mailbox of the actor will grow. For potentially slow consumer actors - it is recommended to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate - limiting stage in front of this stage. - -ActorPublisher -^^^^^^^^^^^^^^ - -Extend/mixin :class:`akka.stream.actor.ActorPublisher` in your :class:`Actor` to make it a -stream publisher that keeps track of the subscription life cycle and requested elements. - -Here is an example of such an actor. It dispatches incoming jobs to the attached subscriber: - -.. includecode:: ../code/docs/stream/ActorPublisherDocSpec.scala#job-manager - -You send elements to the stream by calling ``onNext``. You are allowed to send as many -elements as have been requested by the stream subscriber. This amount can be inquired with -``totalDemand``. It is only allowed to use ``onNext`` when ``isActive`` and ``totalDemand>0``, -otherwise ``onNext`` will throw ``IllegalStateException``. - -When the stream subscriber requests more elements the ``ActorPublisherMessage.Request`` message -is delivered to this actor, and you can act on that event. The ``totalDemand`` -is updated automatically. - -When the stream subscriber cancels the subscription the ``ActorPublisherMessage.Cancel`` message -is delivered to this actor. After that subsequent calls to ``onNext`` will be ignored. - -You can complete the stream by calling ``onComplete``. After that you are not allowed to -call ``onNext``, ``onError`` and ``onComplete``. - -You can terminate the stream with failure by calling ``onError``. After that you are not allowed to -call ``onNext``, ``onError`` and ``onComplete``. - -If you suspect that this ``ActorPublisher`` may never get subscribed to, you can override the ``subscriptionTimeout`` -method to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when -the timeout triggers via an ``ActorPublisherMessage.SubscriptionTimeoutExceeded`` message and MUST then perform -cleanup and stop itself. - -If the actor is stopped the stream will be completed, unless it was not already terminated with -failure, completed or canceled. - -More detailed information can be found in the API documentation. - -This is how it can be used as input :class:`Source` to a :class:`Flow`: - -.. includecode:: ../code/docs/stream/ActorPublisherDocSpec.scala#actor-publisher-usage - -A publisher that is created with ``Sink.asPublisher`` supports a specified number of subscribers. Additional -subscription attempts will be rejected with an :class:`IllegalStateException`. - -ActorSubscriber -^^^^^^^^^^^^^^^ - -Extend/mixin :class:`akka.stream.actor.ActorSubscriber` in your :class:`Actor` to make it a -stream subscriber with full control of stream back pressure. It will receive -``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError`` -messages from the stream. It can also receive other, non-stream messages, in the same way as any actor. - -Here is an example of such an actor. It dispatches incoming jobs to child worker actors: - -.. includecode:: ../code/docs/stream/ActorSubscriberDocSpec.scala#worker-pool - -Subclass must define the ``RequestStrategy`` to control stream back pressure. -After each incoming message the ``ActorSubscriber`` will automatically invoke -the ``RequestStrategy.requestDemand`` and propagate the returned demand to the stream. - -* The provided ``WatermarkRequestStrategy`` is a good strategy if the actor performs work itself. -* The provided ``MaxInFlightRequestStrategy`` is useful if messages are queued internally or - delegated to other actors. -* You can also implement a custom ``RequestStrategy`` or call ``request`` manually together with - ``ZeroRequestStrategy`` or some other strategy. In that case - you must also call ``request`` when the actor is started or when it is ready, otherwise - it will not receive any elements. - -More detailed information can be found in the API documentation. - -This is how it can be used as output :class:`Sink` to a :class:`Flow`: - -.. includecode:: ../code/docs/stream/ActorSubscriberDocSpec.scala#actor-subscriber-usage - Integrating with External Services ================================== @@ -433,3 +410,101 @@ passing a factory function that will create the :class:`Processor` instances: .. includecode:: ../code/docs/stream/ReactiveStreamsDocSpec.scala#use-processor Please note that a factory is necessary to achieve reusability of the resulting :class:`Flow`. + +Implementing Reactive Streams Publisher or Subscriber +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +As described above any Akka Streams ``Source`` can be exposed as a Reactive Streams ``Publisher`` and +any ``Sink`` can be exposed as a Reactive Streams ``Subscriber``. Therefore we recommend that you +implement Reactive Streams integrations with built-in stages or :ref:`custom stages `. + +For historical reasons the :class:`ActorPublisher` and :class:`ActorSubscriber` traits are +provided to support implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with +an :class:`Actor`. + +These can be consumed by other Reactive Stream libraries or used as an Akka Streams :class:`Source` or :class:`Sink`. + +.. warning:: + + :class:`ActorPublisher` and :class:`ActorSubscriber` will probably be deprecated in future versions of Akka. + +.. warning:: + + :class:`ActorPublisher` and :class:`ActorSubscriber` cannot be used with remote actors, + because if signals of the Reactive Streams protocol (e.g. ``request``) are lost the + the stream may deadlock. + +ActorPublisher +-------------- + +Extend/mixin :class:`akka.stream.actor.ActorPublisher` in your :class:`Actor` to make it a +stream publisher that keeps track of the subscription life cycle and requested elements. + +Here is an example of such an actor. It dispatches incoming jobs to the attached subscriber: + +.. includecode:: ../code/docs/stream/ActorPublisherDocSpec.scala#job-manager + +You send elements to the stream by calling ``onNext``. You are allowed to send as many +elements as have been requested by the stream subscriber. This amount can be inquired with +``totalDemand``. It is only allowed to use ``onNext`` when ``isActive`` and ``totalDemand>0``, +otherwise ``onNext`` will throw ``IllegalStateException``. + +When the stream subscriber requests more elements the ``ActorPublisherMessage.Request`` message +is delivered to this actor, and you can act on that event. The ``totalDemand`` +is updated automatically. + +When the stream subscriber cancels the subscription the ``ActorPublisherMessage.Cancel`` message +is delivered to this actor. After that subsequent calls to ``onNext`` will be ignored. + +You can complete the stream by calling ``onComplete``. After that you are not allowed to +call ``onNext``, ``onError`` and ``onComplete``. + +You can terminate the stream with failure by calling ``onError``. After that you are not allowed to +call ``onNext``, ``onError`` and ``onComplete``. + +If you suspect that this ``ActorPublisher`` may never get subscribed to, you can override the ``subscriptionTimeout`` +method to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when +the timeout triggers via an ``ActorPublisherMessage.SubscriptionTimeoutExceeded`` message and MUST then perform +cleanup and stop itself. + +If the actor is stopped the stream will be completed, unless it was not already terminated with +failure, completed or canceled. + +More detailed information can be found in the API documentation. + +This is how it can be used as input :class:`Source` to a :class:`Flow`: + +.. includecode:: ../code/docs/stream/ActorPublisherDocSpec.scala#actor-publisher-usage + +A publisher that is created with ``Sink.asPublisher`` supports a specified number of subscribers. Additional +subscription attempts will be rejected with an :class:`IllegalStateException`. + +ActorSubscriber +--------------- + +Extend/mixin :class:`akka.stream.actor.ActorSubscriber` in your :class:`Actor` to make it a +stream subscriber with full control of stream back pressure. It will receive +``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError`` +messages from the stream. It can also receive other, non-stream messages, in the same way as any actor. + +Here is an example of such an actor. It dispatches incoming jobs to child worker actors: + +.. includecode:: ../code/docs/stream/ActorSubscriberDocSpec.scala#worker-pool + +Subclass must define the ``RequestStrategy`` to control stream back pressure. +After each incoming message the ``ActorSubscriber`` will automatically invoke +the ``RequestStrategy.requestDemand`` and propagate the returned demand to the stream. + +* The provided ``WatermarkRequestStrategy`` is a good strategy if the actor performs work itself. +* The provided ``MaxInFlightRequestStrategy`` is useful if messages are queued internally or + delegated to other actors. +* You can also implement a custom ``RequestStrategy`` or call ``request`` manually together with + ``ZeroRequestStrategy`` or some other strategy. In that case + you must also call ``request`` when the actor is started or when it is ready, otherwise + it will not receive any elements. + +More detailed information can be found in the API documentation. + +This is how it can be used as output :class:`Sink` to a :class:`Flow`: + +.. includecode:: ../code/docs/stream/ActorSubscriberDocSpec.scala#actor-subscriber-usage diff --git a/akka-stream/src/main/scala/akka/stream/QueueOfferResult.scala b/akka-stream/src/main/scala/akka/stream/QueueOfferResult.scala index ccce45c917..7d189f2e18 100644 --- a/akka-stream/src/main/scala/akka/stream/QueueOfferResult.scala +++ b/akka-stream/src/main/scala/akka/stream/QueueOfferResult.scala @@ -15,11 +15,21 @@ object QueueOfferResult { */ final case object Enqueued extends QueueOfferResult + /** + * Java API: The `Enqueued` singleton instance + */ + def enqueued: QueueOfferResult = Enqueued + /** * Type is used to indicate that stream is dropped an element */ final case object Dropped extends QueueOfferResult + /** + * Java API: The `Enqueued` singleton instance + */ + def dropped: QueueOfferResult = Dropped + /** * Type is used to indicate that stream is failed before or during call to the stream * @param cause - exception that stream failed with diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index e1687f96d6..acd9f675b8 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -316,10 +316,10 @@ object Source { * there is no space available in the buffer. * * Acknowledgement mechanism is available. - * [[akka.stream.javadsl.SourceQueue.offer]] returns `CompletionStage>` which completes with `Success(true)` - * if element was added to buffer or sent downstream. It completes with `Success(false)` if element was dropped. Can also complete - * with [[akka.stream.javadsl.StreamCallbackStatus.Failure]] - when stream failed or [[akka.stream.StreamCallbackStatus.StreamCompleted]] - * when downstream is completed. + * [[akka.stream.javadsl.SourceQueue.offer]] returns `CompletionStage` which completes with + * `QueueOfferResult.enqueued` if element was added to buffer or sent downstream. It completes with + * `QueueOfferResult.dropped` if element was dropped. Can also complete with `QueueOfferResult.Failure` - + * when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed. * * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():CompletionStage` * call when buffer is full. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index dba0c809e0..bbf9065419 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -398,7 +398,7 @@ object Source { * The actor will be stopped when the stream is completed, failed or canceled from downstream, * i.e. you can watch it to get notified when that happens. * - * See also [[akka.stream.javadsl.Source.queue]]. + * See also [[akka.stream.scaladsl.Source.queue]]. * * @param bufferSize The size of the buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer @@ -456,10 +456,10 @@ object Source { * there is no space available in the buffer. * * Acknowledgement mechanism is available. - * [[akka.stream.scaladsl.SourceQueue.offer]] returns ``Future[StreamCallbackStatus[Boolean]]`` which completes with `Success(true)` - * if element was added to buffer or sent downstream. It completes with `Success(false)` if element was dropped. Can also complete - * with [[akka.stream.StreamCallbackStatus.Failure]] - when stream failed or [[akka.stream.StreamCallbackStatus.StreamCompleted]] - * when downstream is completed. + * [[akka.stream.scaladsl.SourceQueue.offer]] returns `Future[QueueOfferResult]` which completes with + * `QueueOfferResult.Enqueued` if element was added to buffer or sent downstream. It completes with + * `QueueOfferResult.Dropped` if element was dropped. Can also complete with `QueueOfferResult.Failure` - + * when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed. * * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():Future` * call when buffer is full.