more docs for Sink.queue and actorRefWithAck (#25063)

This commit is contained in:
Konrad `ktoso` Malawski 2018-05-22 18:27:54 +09:00 committed by Arnout Engelen
parent 8f40dc7f03
commit de04758bb8
7 changed files with 118 additions and 9 deletions

View file

@ -1,4 +1,4 @@
# actorRefWithAck # Sink.actorRefWithAck
Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, to provide back pressure onto the sink. Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, to provide back pressure onto the sink.
@ -9,6 +9,25 @@ Send the elements from the stream to an `ActorRef` which must then acknowledge r
Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message,
to provide back pressure onto the sink. to provide back pressure onto the sink.
## Example
Actor to be interacted with:
Scala
: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithAck-actor }
Java
: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithAck-actor }
Using the `actorRefWithAck` operator with the above actor:
Scala
: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithAck }
Java
: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithAck }
## Reactive Streams semantics
@@@div { .callout } @@@div { .callout }

View file

@ -18,6 +18,9 @@ Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.
a buffer in case stream emitting elements faster than queue pulling them. a buffer in case stream emitting elements faster than queue pulling them.
## Reactive Streams semantics
@@@div { .callout } @@@div { .callout }
**cancels** when `SinkQueue.cancel` is called **cancels** when `SinkQueue.cancel` is called

View file

@ -1,4 +1,4 @@
# ask # Flow.ask
Use the `ask` pattern to send a request-reply message to the target `ref` actor. Use the `ask` pattern to send a request-reply message to the target `ref` actor.

View file

@ -1,4 +1,4 @@
# queue # Source.queue
Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source. Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source.
@ -19,6 +19,21 @@ a buffer, if elements are pushed onto the queue faster than the source is consum
a strategy specified by the user. Functionality for tracking when an element has been emitted is available through a strategy specified by the user. Functionality for tracking when an element has been emitted is available through
`SourceQueue.offer`. `SourceQueue.offer`.
Using `Source.queue` you can push elements to the queue and they will be emitted to the stream if there is
demand from downstream, otherwise they will be buffered until request for demand is received. Elements in the buffer
will be discarded if downstream is terminated.
In combination with the queue, the @ref[`throttle`](./../Source-or-Flow/throttle.md) operator can be used to control the processing to a given limit, e.g. `5 elements` per `3 seconds`.
## Example
Scala
: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #source-queue }
Java
: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #source-queue }
## Reactive Streams Semantics
@@@div { .callout } @@@div { .callout }

View file

@ -20,6 +20,10 @@ materialized by `Source.actorRef`.
### ask ### ask
@@@ note
See also: @ref[Flow.ask operator reference docs](operators/Source-or-Flow/ask.md), @ref[ActorFlow.ask operator reference docs](operators/ActorFlow/ask.md) for Akka Typed
@@@
A nice way to delegate some processing of elements in a stream to an actor is to use `ask`. A nice way to delegate some processing of elements in a stream to an actor is to use `ask`.
The back-pressure of the stream is maintained by the @scala[`Future`]@java[`CompletionStage`] of The back-pressure of the stream is maintained by the @scala[`Future`]@java[`CompletionStage`] of
the `ask` and the mailbox of the actor will not be filled with more messages than the given the `ask` and the mailbox of the actor will not be filled with more messages than the given
@ -67,6 +71,10 @@ since multiple actors are being asked concurrently to begin with, and no single
### Sink.actorRefWithAck ### Sink.actorRefWithAck
@@@ note
See also: [Sink.actorRefWithAck operator reference docs](operators/Sink/actorRefWithAck.md)
@@@
The sink sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. The sink sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
First element is always *onInitMessage*, then stream is waiting for the given acknowledgement message First element is always *onInitMessage*, then stream is waiting for the given acknowledgement message
from the given actor which means that it is ready to process elements. It also requires the given acknowledgement from the given actor which means that it is ready to process elements. It also requires the given acknowledgement
@ -109,20 +117,35 @@ use `Sink.actorRefWithAck` or `ask` in `mapAsync`, though.
### Source.queue ### Source.queue
`Source.queue` is an improvement over `Sink.actorRef`, since it can provide backpressure.
The `offer` method returns a @scala[`Future`]@java[`CompletionStage`], which completes with the result of the enqueue operation.
`Source.queue` can be used for emitting elements to a stream from an actor (or from anything running outside `Source.queue` can be used for emitting elements to a stream from an actor (or from anything running outside
the stream). The elements will be buffered until the stream can process them. You can `offer` elements to the stream). The elements will be buffered until the stream can process them. You can `offer` elements to
the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will
be buffered until request for demand is received. be buffered until request for demand is received.
Use overflow strategy `akka.stream.OverflowStrategy.backpressure` to avoid dropping of elements if the Use overflow strategy `akka.stream.OverflowStrategy.backpressure` to avoid dropping of elements if the
buffer is full, instead the returned @scala[`Future`]@java[`CompletionStage`] does not complete until there is space in the buffer is full, instead the returned @scala[`Future`]@java[`CompletionStage`] does not complete until there is space in the
buffer and `offer` should not be called again until it completes. buffer and `offer` should not be called again until it completes.
Using `Source.queue` you can push elements to the queue and they will be emitted to the stream if there is
demand from downstream, otherwise they will be buffered until request for demand is received. Elements in the buffer
will be discarded if downstream is terminated.
You could combine it with the @ref[`throttle`](operators/Source-or-Flow/throttle.md) operator is used to slow down the stream to `5 element` per `3 seconds` and other patterns.
`SourceQueue.offer` returns @scala[`Future[QueueOfferResult]`]@java[`CompletionStage<QueueOfferResult>`] which completes with `QueueOfferResult.Enqueued` `SourceQueue.offer` returns @scala[`Future[QueueOfferResult]`]@java[`CompletionStage<QueueOfferResult>`] which completes with `QueueOfferResult.Enqueued`
if element was added to buffer or sent downstream. It completes with `QueueOfferResult.Dropped` if element if element was added to buffer or sent downstream. It completes with `QueueOfferResult.Dropped` if element
was dropped. Can also complete with `QueueOfferResult.Failure` - when stream failed or was dropped. Can also complete with `QueueOfferResult.Failure` - when stream failed or
`QueueOfferResult.QueueClosed` when downstream is completed. `QueueOfferResult.QueueClosed` when downstream is completed.
Scala
: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #source-queue }
Java
: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #source-queue }
When used from an actor you typically `pipe` the result of the @scala[`Future`]@java[`CompletionStage`] back to the actor to When used from an actor you typically `pipe` the result of the @scala[`Future`]@java[`CompletionStage`] back to the actor to
continue processing. continue processing.

View file

@ -20,7 +20,9 @@ import jdocs.stream.TwitterStreamQuickstartDocTest.Model.Tweet;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import java.time.Duration;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -712,5 +714,29 @@ public class IntegrationDocTest extends AbstractJavaTest {
}; };
} }
@Test
public void illustrateSourceQueue() throws Exception {
new TestKit(system) {
{
//#source-queue
int bufferSize = 5;
int elementsToProcess = 3;
SourceQueueWithComplete<Integer> sourceQueue =
Source.<Integer>queue(bufferSize, OverflowStrategy.backpressure())
.throttle(elementsToProcess, Duration.ofSeconds(3))
.map(x -> x * x)
.to(Sink.foreach(x -> System.out.println("got: " + x)))
.run(mat);
Source<Integer, NotUsed> source
= Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
source.map(x -> sourceQueue.offer(x)).runWith(Sink.ignore(), mat);
//#source-queue
}
};
}
} }

View file

@ -9,24 +9,21 @@ import akka.NotUsed
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.ActorMaterializer import akka.stream._
import scala.concurrent.Future import scala.concurrent.Future
import akka.testkit.TestProbe import akka.testkit.TestProbe
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Status } import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Status }
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import akka.util.Timeout import akka.util.Timeout
import akka.stream.Attributes
import akka.stream.ActorAttributes
import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext
import akka.stream.ActorMaterializerSettings
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import akka.stream.Supervision
import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Flow
import akka.Done import akka.Done
import akka.actor.Status.Status import akka.actor.Status.Status
import akka.stream.QueueOfferResult.{ Dropped, Enqueued }
object IntegrationDocSpec { object IntegrationDocSpec {
import TwitterStreamQuickstartDocSpec._ import TwitterStreamQuickstartDocSpec._
@ -476,4 +473,30 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
"after: J")) "after: J"))
} }
"illustrate use of source queue" in {
//#source-queue
val bufferSize = 5
val elementsToProcess = 3
val queue = Source
.queue[Int](bufferSize, OverflowStrategy.backpressure)
.throttle(elementsToProcess, 3.second)
.map(x x * x)
.toMat(Sink.foreach(x println(s"completed $x")))(Keep.left)
.run()
val source = Source(1 to 10)
implicit val ec = system.dispatcher
source.mapAsync(1)(x {
queue.offer(x).map {
case QueueOfferResult.Enqueued println(s"enqueued $x")
case QueueOfferResult.Dropped println(s"dropped $x")
case QueueOfferResult.Failure(ex) println(s"Offer failed ${ex.getMessage}")
case QueueOfferResult.QueueClosed println("Source Queue closed")
}
}).runWith(Sink.ignore)
//#source-queue
}
} }