diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/actorRefWithAck.md b/akka-docs/src/main/paradox/stream/operators/Sink/actorRefWithAck.md index 6aef965b1e..b7bd884903 100644 --- a/akka-docs/src/main/paradox/stream/operators/Sink/actorRefWithAck.md +++ b/akka-docs/src/main/paradox/stream/operators/Sink/actorRefWithAck.md @@ -1,4 +1,4 @@ -# actorRefWithAck +# Sink.actorRefWithAck Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, to provide back pressure onto the sink. @@ -9,6 +9,25 @@ Send the elements from the stream to an `ActorRef` which must then acknowledge r Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, to provide back pressure onto the sink. +## Example + +Actor to be interacted with: + +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithAck-actor } + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithAck-actor } + +Using the `actorRefWithAck` operator with the above actor: + +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithAck } + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithAck } + +## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/queue.md b/akka-docs/src/main/paradox/stream/operators/Sink/queue.md index f474e275b1..b537040ddf 100644 --- a/akka-docs/src/main/paradox/stream/operators/Sink/queue.md +++ b/akka-docs/src/main/paradox/stream/operators/Sink/queue.md @@ -18,6 +18,9 @@ Materialize a `SinkQueue` that can be pulled to trigger demand through the sink. a buffer in case stream emitting elements faster than queue pulling them. +## Reactive Streams semantics + + @@@div { .callout } **cancels** when `SinkQueue.cancel` is called diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/ask.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/ask.md index d3f2e1d245..ff55ca7807 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/ask.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/ask.md @@ -1,4 +1,4 @@ -# ask +# Flow.ask Use the `ask` pattern to send a request-reply message to the target `ref` actor. diff --git a/akka-docs/src/main/paradox/stream/operators/Source/queue.md b/akka-docs/src/main/paradox/stream/operators/Source/queue.md index fd2bea6e9d..5378e22194 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/queue.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/queue.md @@ -1,4 +1,4 @@ -# queue +# Source.queue Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source. @@ -19,6 +19,21 @@ a buffer, if elements are pushed onto the queue faster than the source is consum a strategy specified by the user. Functionality for tracking when an element has been emitted is available through `SourceQueue.offer`. +Using `Source.queue` you can push elements to the queue and they will be emitted to the stream if there is +demand from downstream, otherwise they will be buffered until request for demand is received. Elements in the buffer +will be discarded if downstream is terminated. + +In combination with the queue, the @ref[`throttle`](./../Source-or-Flow/throttle.md) operator can be used to control the processing to a given limit, e.g. `5 elements` per `3 seconds`. + +## Example + +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #source-queue } + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #source-queue } + +## Reactive Streams Semantics @@@div { .callout } diff --git a/akka-docs/src/main/paradox/stream/stream-integrations.md b/akka-docs/src/main/paradox/stream/stream-integrations.md index c6ffeae6c4..108807bdda 100644 --- a/akka-docs/src/main/paradox/stream/stream-integrations.md +++ b/akka-docs/src/main/paradox/stream/stream-integrations.md @@ -20,6 +20,10 @@ materialized by `Source.actorRef`. ### ask +@@@ note + See also: @ref[Flow.ask operator reference docs](operators/Source-or-Flow/ask.md), @ref[ActorFlow.ask operator reference docs](operators/ActorFlow/ask.md) for Akka Typed +@@@ + A nice way to delegate some processing of elements in a stream to an actor is to use `ask`. The back-pressure of the stream is maintained by the @scala[`Future`]@java[`CompletionStage`] of the `ask` and the mailbox of the actor will not be filled with more messages than the given @@ -67,6 +71,10 @@ since multiple actors are being asked concurrently to begin with, and no single ### Sink.actorRefWithAck +@@@ note + See also: [Sink.actorRefWithAck operator reference docs](operators/Sink/actorRefWithAck.md) +@@@ + The sink sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. First element is always *onInitMessage*, then stream is waiting for the given acknowledgement message from the given actor which means that it is ready to process elements. It also requires the given acknowledgement @@ -109,20 +117,35 @@ use `Sink.actorRefWithAck` or `ask` in `mapAsync`, though. ### Source.queue +`Source.queue` is an improvement over `Sink.actorRef`, since it can provide backpressure. +The `offer` method returns a @scala[`Future`]@java[`CompletionStage`], which completes with the result of the enqueue operation. + `Source.queue` can be used for emitting elements to a stream from an actor (or from anything running outside the stream). The elements will be buffered until the stream can process them. You can `offer` elements to the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will -be buffered until request for demand is received. +be buffered until request for demand is received. Use overflow strategy `akka.stream.OverflowStrategy.backpressure` to avoid dropping of elements if the buffer is full, instead the returned @scala[`Future`]@java[`CompletionStage`] does not complete until there is space in the buffer and `offer` should not be called again until it completes. +Using `Source.queue` you can push elements to the queue and they will be emitted to the stream if there is +demand from downstream, otherwise they will be buffered until request for demand is received. Elements in the buffer +will be discarded if downstream is terminated. + +You could combine it with the @ref[`throttle`](operators/Source-or-Flow/throttle.md) operator is used to slow down the stream to `5 element` per `3 seconds` and other patterns. + `SourceQueue.offer` returns @scala[`Future[QueueOfferResult]`]@java[`CompletionStage`] which completes with `QueueOfferResult.Enqueued` if element was added to buffer or sent downstream. It completes with `QueueOfferResult.Dropped` if element was dropped. Can also complete with `QueueOfferResult.Failure` - when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed. +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #source-queue } + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #source-queue } + When used from an actor you typically `pipe` the result of the @scala[`Future`]@java[`CompletionStage`] back to the actor to continue processing. diff --git a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java index bd839a9157..912b00b288 100644 --- a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java @@ -20,7 +20,9 @@ import jdocs.stream.TwitterStreamQuickstartDocTest.Model.Tweet; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import scala.concurrent.duration.FiniteDuration; +import java.time.Duration; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -712,5 +714,29 @@ public class IntegrationDocTest extends AbstractJavaTest { }; } + @Test + public void illustrateSourceQueue() throws Exception { + new TestKit(system) { + { + //#source-queue + int bufferSize = 5; + int elementsToProcess = 3; + + SourceQueueWithComplete sourceQueue = + Source.queue(bufferSize, OverflowStrategy.backpressure()) + .throttle(elementsToProcess, Duration.ofSeconds(3)) + .map(x -> x * x) + .to(Sink.foreach(x -> System.out.println("got: " + x))) + .run(mat); + + Source source + = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + + source.map(x -> sourceQueue.offer(x)).runWith(Sink.ignore(), mat); + + //#source-queue + } + }; + } } diff --git a/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala b/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala index 10f88338a4..516662240c 100644 --- a/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala +++ b/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala @@ -9,24 +9,21 @@ import akka.NotUsed import scala.concurrent.duration._ import akka.testkit.AkkaSpec import akka.stream.scaladsl._ -import akka.stream.ActorMaterializer +import akka.stream._ import scala.concurrent.Future import akka.testkit.TestProbe import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Status } import com.typesafe.config.ConfigFactory import akka.util.Timeout -import akka.stream.Attributes -import akka.stream.ActorAttributes import scala.concurrent.ExecutionContext -import akka.stream.ActorMaterializerSettings import java.util.concurrent.atomic.AtomicInteger -import akka.stream.Supervision import akka.stream.scaladsl.Flow import akka.Done import akka.actor.Status.Status +import akka.stream.QueueOfferResult.{ Dropped, Enqueued } object IntegrationDocSpec { import TwitterStreamQuickstartDocSpec._ @@ -476,4 +473,30 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { "after: J")) } + "illustrate use of source queue" in { + //#source-queue + val bufferSize = 5 + val elementsToProcess = 3 + + val queue = Source + .queue[Int](bufferSize, OverflowStrategy.backpressure) + .throttle(elementsToProcess, 3.second) + .map(x ⇒ x * x) + .toMat(Sink.foreach(x ⇒ println(s"completed $x")))(Keep.left) + .run() + + val source = Source(1 to 10) + + implicit val ec = system.dispatcher + source.mapAsync(1)(x ⇒ { + queue.offer(x).map { + case QueueOfferResult.Enqueued ⇒ println(s"enqueued $x") + case QueueOfferResult.Dropped ⇒ println(s"dropped $x") + case QueueOfferResult.Failure(ex) ⇒ println(s"Offer failed ${ex.getMessage}") + case QueueOfferResult.QueueClosed ⇒ println("Source Queue closed") + } + }).runWith(Sink.ignore) + //#source-queue + } + }