BoundedSourceQueue API #29574

A new queue implementation that drops new elements immediately when the buffer is full. Does not use async callbacks like Source.queue with OverflowStrategy.dropNew, which can still result in OOM errors (#25798).
This commit is contained in:
Johan Andrén 2020-11-06 09:00:15 +01:00 committed by GitHub
commit 2b8d0b2285
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 587 additions and 16 deletions

View file

@ -1,15 +1,44 @@
# Source.queue
Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source.
Materialize a `BoundedSourceQueue` or `SourceQueue` onto which elements can be pushed for emitting from the source.
@ref[Source operators](../index.md#source-operators)
## Signature
## Signature (`BoundedSourceQueue`)
@apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int):akka.stream.scaladsl.Source[T,akka.stream.scaladsl.BoundedSourceQueue[T]]" java="#queue(int)" }
## Description (`BoundedSourceQueue`)
The `BoundedSourceQueue` is an optimized variant of the `SourceQueue` with `OverflowStrategy.dropNew`.
The `BoundedSourceQueue` will give immediate, synchronous feedback whether an element was accepted or not and is therefore recommended for situations where overload and dropping elements is expected and needs to be handled quickly.
In contrast, the `SourceQueue` offers more variety of `OverflowStrategies` but feedback is only asynchronously provided through a @scala[`Future`]@java[`CompletionStage`] value.
In cases where elements need to be discarded quickly at times of overload to avoid out-of-memory situations, delivering feedback asynchronously can itself become a problem.
This happens if elements come in faster than the feedback can be delivered in which case the feedback mechanism itself is part of the reason that an out-of-memory situation arises.
In summary, prefer `BoundedSourceQueue` over `SourceQueue` with `OverflowStrategy.dropNew` especially in high-load scenarios.
Use `SourceQueue` if you need one of the other `OverflowStrategies`.
The `BoundedSourceQueue` contains a buffer that can be used by many producers on different threads.
When the buffer is full, the `BoundedSourceQueue` will not accept more elements.
The return value of `BoundedSourceQueue.offer()` immediately returns a `QueueOfferResult` (as opposed to an asynchronous value returned by `SourceQueue`).
A synchronous result is important in order to avoid situations where offer acknowledgements are handled slower than the rate of which elements are offered, which will eventually lead to an Out Of Memory error.
## Example (`BoundedSourceQueue`)
Scala
: @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #source-queue-synchronous }
Java
: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #source-queue-synchronous }
## Signature (`SourceQueue`)
@apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int,overflowStrategy:akka.stream.OverflowStrategy):akka.stream.scaladsl.Source[T,akka.stream.scaladsl.SourceQueueWithComplete[T]]" java="#queue(int,akka.stream.OverflowStrategy)" }
@apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int,overflowStrategy:akka.stream.OverflowStrategy,maxConcurrentOffers:Int):akka.stream.scaladsl.Source[T,akka.stream.scaladsl.SourceQueueWithComplete[T]]" java="#queue(int,akka.stream.OverflowStrategy,int)" }
## Description
## Description (`SourceQueue`)
Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source. The queue contains
a buffer, if elements are pushed onto the queue faster than the source is consumed the overflow will be handled with
@ -22,7 +51,7 @@ will be discarded if downstream is terminated.
In combination with the queue, the @ref[`throttle`](./../Source-or-Flow/throttle.md) operator can be used to control the processing to a given limit, e.g. `5 elements` per `3 seconds`.
## Example
## Example (`SourceQueue`)
Scala
: @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #source-queue }

View file

@ -35,7 +35,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
|Source|<a name="lazysource"></a>@ref[lazySource](Source/lazySource.md)|Defers creation and materialization of a `Source` until there is demand.|
|Source|<a name="maybe"></a>@ref[maybe](Source/maybe.md)|Create a source that emits once the materialized @scala[`Promise`] @java[`CompletableFuture`] is completed with a value.|
|Source|<a name="never"></a>@ref[never](Source/never.md)|Never emit any elements, never complete and never fail.|
|Source|<a name="queue"></a>@ref[queue](Source/queue.md)|Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source. |
|Source|<a name="queue"></a>@ref[queue](Source/queue.md)|Materialize a `BoundedSourceQueue` or `SourceQueue` onto which elements can be pushed for emitting from the source.|
|Source|<a name="range"></a>@ref[range](Source/range.md)|Emit each integer in a range, with an option to take bigger steps than 1.|
|Source|<a name="repeat"></a>@ref[repeat](Source/repeat.md)|Stream a single object repeatedly.|
|Source|<a name="single"></a>@ref[single](Source/single.md)|Stream a single object once.|

View file

@ -765,6 +765,44 @@ public class IntegrationDocTest extends AbstractJavaTest {
};
}
@Test
public void illustrateSynchronousSourceQueue() throws Exception {
new TestKit(system) {
{
// #source-queue-synchronous
int bufferSize = 10;
int elementsToProcess = 5;
BoundedSourceQueue<Integer> sourceQueue =
Source.<Integer>queue(bufferSize)
.throttle(elementsToProcess, Duration.ofSeconds(3))
.map(x -> x * x)
.to(Sink.foreach(x -> System.out.println("got: " + x)))
.run(system);
List<Integer> fastElements = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
fastElements.stream()
.forEach(
x -> {
QueueOfferResult result = sourceQueue.offer(x);
if (result == QueueOfferResult.enqueued()) {
System.out.println("enqueued " + x);
} else if (result == QueueOfferResult.dropped()) {
System.out.println("dropped " + x);
} else if (result instanceof QueueOfferResult.Failure) {
QueueOfferResult.Failure failure = (QueueOfferResult.Failure) result;
System.out.println("Offer failed " + failure.cause().getMessage());
} else if (result instanceof QueueOfferResult.QueueClosed$) {
System.out.println("Bounded Source Queue closed");
}
});
// #source-queue-synchronous
}
};
}
@Test
public void illustrateSourceActorRef() throws Exception {
new TestKit(system) {

View file

@ -491,6 +491,36 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
//#source-queue
}
"illustrate use of synchronous source queue" in {
//#source-queue-synchronous
val bufferSize = 1000
//#source-queue-synchronous
// format: OFF
//#source-queue-synchronous
val queue = Source
.queue[Int](bufferSize)
.map(x => x * x)
.toMat(Sink.foreach(x => println(s"completed $x")))(Keep.left)
.run()
//#source-queue-synchronous
// format: OFF
//#source-queue-synchronous
val fastElements = 1 to 10
implicit val ec = system.dispatcher
fastElements.foreach { x =>
queue.offer(x) match {
case QueueOfferResult.Enqueued => println(s"enqueued $x")
case QueueOfferResult.Dropped => println(s"dropped $x")
case QueueOfferResult.Failure(ex) => println(s"Offer failed ${ex.getMessage}")
case QueueOfferResult.QueueClosed => println("Source Queue closed")
}
}
//#source-queue-synchronous
}
"illustrate use of source actor ref" in {
//#source-actorRef
val bufferSize = 10

View file

@ -0,0 +1,189 @@
/*
* Copyright (C) 2020-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ CountDownLatch, ThreadLocalRandom }
import akka.stream.{ QueueCompletionResult, QueueOfferResult }
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.{ StreamSpec, TestSubscriber }
import scala.concurrent.duration._
class BoundedSourceQueueSpec extends StreamSpec {
override implicit def patienceConfig: PatienceConfig = PatienceConfig(5.seconds)
val ex = new RuntimeException("oops")
"BoundedSourceQueue" should {
"not drop elements if buffer is not full" in {
val sub = TestSubscriber.probe[Int]()
val queue =
Source.queue[Int](100).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
val elements = 1 to 100
elements.foreach { i =>
queue.offer(i) should be(QueueOfferResult.Enqueued)
}
queue.complete()
val subIt = Iterator.continually(sub.requestNext())
subIt.zip(elements.iterator).foreach {
case (subEle, origEle) => subEle should be(origEle)
}
sub.expectComplete()
}
"drop elements if buffer is full" in {
val sub = TestSubscriber.probe[Int]()
val queue =
Source.queue[Int](10).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
val elements = 1 to 100
val histo =
elements
.map { i =>
queue.offer(i)
}
.groupBy(identity)
.map { case (k, v) => (k, v.size) }
// it should be 100 elements - 10 buffer slots = 90, but there might be other implicit buffers involved
histo(QueueOfferResult.Dropped) should be > 80
}
"buffer size cannot be less than 1" in {
assertThrows[IllegalArgumentException](Source.queue[Int](0))
}
"raise exception if the queue is completed twice" in {
val sub = TestSubscriber.probe[Int]()
val queue =
Source.queue[Int](1).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
queue.complete()
assertThrows[IllegalStateException](queue.complete())
}
"raise exception if the queue is failed twice" in {
val sub = TestSubscriber.probe[Int]()
val queue =
Source.queue[Int](1).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
queue.fail(ex)
assertThrows[IllegalStateException](queue.fail(ex))
}
"return a QueueClosed result if completed" in {
val sub = TestSubscriber.probe[Int]()
val queue =
Source.queue[Int](10).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
queue.complete()
queue.offer(1) should be(QueueOfferResult.QueueClosed)
sub.expectSubscriptionAndComplete()
}
"return a Failure result if queue failed" in {
val sub = TestSubscriber.probe[Int]()
val queue =
Source.queue[Int](10).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
queue.fail(ex)
queue.offer(1) should be(QueueOfferResult.Failure(ex))
sub.request(1)
sub.expectError(ex)
}
"return a Failure result if stream failed" in {
val sub = TestSubscriber.probe[Int]()
val queue =
Source.queue[Int](10).map(_ => throw ex).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
queue.offer(1) should be(QueueOfferResult.Enqueued)
sub.expectSubscriptionAndError() should be(ex)
// internal state will be eventually updated when stream cancellation reaches BoundedSourceQueueStage
awaitAssert(queue.offer(1) should be(QueueOfferResult.Failure(ex)))
}
"without cancellation only flag elements as enqueued that will also passed to downstream" in {
val counter = new AtomicLong()
val (queue, result) =
Source.queue[Int](100000).toMat(Sink.fold(0L)(_ + _))(Keep.both).run()
val numThreads = 32
val stopProb = 1000000
val expected = 1d / (1d - math.pow(1d - 1d / stopProb, numThreads))
println(s"Expected elements per thread: $expected") // variance might be quite high depending on number of threads
val barrier = new CountDownLatch(numThreads)
class QueueingThread extends Thread {
override def run(): Unit = {
var numElemsEnqueued = 0
var numElemsDropped = 0
def runLoop(): Unit = {
val r = ThreadLocalRandom.current()
while (true) {
val i = r.nextInt(0, Int.MaxValue)
queue.offer(i) match {
case QueueOfferResult.Enqueued =>
counter.addAndGet(i)
numElemsEnqueued += 1
case QueueOfferResult.Dropped =>
numElemsDropped += 1
case _: QueueCompletionResult => return // other thread completed
}
if ((i % stopProb) == 0) { // probabilistic exit condition
queue.complete()
return
}
if (i % 100 == 0) Thread.sleep(1) // probabilistic producer throttling delay
}
}
barrier.countDown()
barrier.await() // wait for all threads being in this state before starting race
runLoop()
println(f"Thread $getName%-20s enqueued: $numElemsEnqueued%7d dropped: $numElemsDropped%7d before completion")
}
}
(1 to numThreads).foreach { i =>
val t = new QueueingThread
t.setName(s"QueuingThread-$i")
t.start()
}
result.futureValue should be(counter.get())
}
// copied from akka-remote SendQueueSpec
"deliver bursts of messages" in {
// this test verifies that the wakeup signal is triggered correctly
val burstSize = 100
val (sendQueue, downstream) =
Source.fromGraph(Source.queue[Int](128)).grouped(burstSize).async.toMat(TestSink.probe)(Keep.both).run()
downstream.request(10)
for (round <- 1 to 100000) {
for (n <- 1 to burstSize) {
if (sendQueue.offer(round * 1000 + n) != QueueOfferResult.Enqueued)
fail(s"offer failed at round $round message $n")
}
downstream.expectNext((1 to burstSize).map(_ + round * 1000).toList)
downstream.request(1)
}
downstream.cancel()
}
}
}

View file

@ -0,0 +1,35 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream
import akka.annotation.DoNotInherit
/**
* A queue of the given size that gives immediate feedback whether an element could be enqueued or not.
*
* Not for user extension
*/
@DoNotInherit
trait BoundedSourceQueue[T] {
/**
* Returns a [[akka.stream.QueueOfferResult]] that notifies the caller if the element could be enqueued or not, or
* the completion status of the queue.
*
* A result of `QueueOfferResult.Enqueued` does not guarantee that an element also has been or will be processed by
* the downstream.
*/
def offer(elem: T): QueueOfferResult
/**
* Completes the stream normally.
*/
def complete(): Unit
/**
* Completes the stream with a failure.
*/
def fail(ex: Throwable): Unit
}

View file

@ -114,6 +114,8 @@ object OverflowStrategy {
/**
* If the buffer is full when a new element arrives, drops the new element.
*
* Deprecated since 2.6.11. Use Source.queue instead.
*/
def dropNew: OverflowStrategy = DropNew(Logging.DebugLevel)

View file

@ -4,10 +4,22 @@
package akka.stream
import akka.annotation.DoNotInherit
/**
* Not for user extension
*/
@DoNotInherit
sealed abstract class QueueOfferResult
/**
* Contains types that is used as return types for async callbacks to streams
* Not for user extension
*/
@DoNotInherit
sealed abstract class QueueCompletionResult extends QueueOfferResult
/**
* Contains types that is used as return types for streams Source queues
*/
object QueueOfferResult {
@ -35,10 +47,10 @@ object QueueOfferResult {
* Type is used to indicate that stream is failed before or during call to the stream
* @param cause - exception that stream failed with
*/
final case class Failure(cause: Throwable) extends QueueOfferResult
final case class Failure(cause: Throwable) extends QueueCompletionResult
/**
* Type is used to indicate that stream is completed before call
*/
case object QueueClosed extends QueueOfferResult
case object QueueClosed extends QueueCompletionResult
}

View file

@ -0,0 +1,169 @@
/*
* Copyright (C) 2020-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import akka.annotation.InternalApi
import akka.dispatch.AbstractBoundedNodeQueue
import akka.stream._
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, OutHandler, StageLogging }
/**
* INTERNAL API
*/
@InternalApi private[akka] object BoundedSourceQueueStage {
sealed trait State
case object NeedsActivation extends State
case object Running extends State
case class Done(result: QueueCompletionResult) extends State
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class BoundedSourceQueueStage[T](bufferSize: Int)
extends GraphStageWithMaterializedValue[SourceShape[T], BoundedSourceQueue[T]] {
import BoundedSourceQueueStage._
require(bufferSize > 0, "BoundedSourceQueueStage.bufferSize must be > 0")
val out = Outlet[T]("BoundedSourceQueueStage.out")
val shape = SourceShape(out)
override def createLogicAndMaterializedValue(
inheritedAttributes: Attributes): (GraphStageLogic, BoundedSourceQueue[T]) = {
val state = new AtomicReference[State](Running)
val queue = new AbstractBoundedNodeQueue[T](bufferSize) {}
object Logic extends GraphStageLogic(shape) with OutHandler with StageLogging {
setHandler(out, this)
val callback = getAsyncCallback[Unit] { _ =>
clearNeedsActivation()
run()
}
override def onPull(): Unit = run()
override def onDownstreamFinish(cause: Throwable): Unit = {
setDone(Done(QueueOfferResult.Failure(cause)))
super.onDownstreamFinish(cause)
}
override def postStop(): Unit = {
// drain queue
while (!queue.isEmpty) queue.poll()
val exception = new StreamDetachedException()
setDone(Done(QueueOfferResult.Failure(exception)))
}
/**
* Main loop of the queue. We do two volatile reads for the fast path of pushing elements
* from the queue to the stream: one for the state and one to poll the queue. This leads to a somewhat simple design
* that will quickly pick up failures from the queue interface.
*
* An even more optimized version could use a fast path in onPull to avoid reading the state for every element.
*/
@tailrec
def run(): Unit =
state.get() match {
case Running =>
if (isAvailable(out)) {
val next = queue.poll()
if (next == null) { // queue empty
if (!setNeedsActivation())
run() // didn't manage to set because stream has been completed in the meantime
else if (!queue.isEmpty) /* && setNeedsActivation was true */ {
// tricky case: new element might have been added in the meantime without callback being sent because
// NeedsActivation had not yet been set
clearNeedsActivation()
run()
} // else Queue.isEmpty && setNeedsActivation was true: waiting for next offer
} else
push(out, next) // and then: wait for pull
} // else: wait for pull
case Done(QueueOfferResult.QueueClosed) =>
if (queue.isEmpty)
completeStage()
else if (isAvailable(out)) {
push(out, queue.poll())
run() // another round, might be empty now
}
// else !Queue.isEmpty: wait for pull to drain remaining elements
case Done(QueueOfferResult.Failure(ex)) => failStage(ex)
case NeedsActivation => throw new IllegalStateException // needs to be cleared before
}
}
object Mat extends BoundedSourceQueue[T] {
override def offer(elem: T): QueueOfferResult = state.get() match {
case Running | NeedsActivation =>
if (queue.add(elem)) {
// need to query state again because stage might have switched from Running -> NeedsActivation only after
// the last state.get but before queue.add.
if (state.get() == NeedsActivation)
// if this thread wins the race to toggle the flag, schedule async callback here
if (clearNeedsActivation())
Logic.callback.invoke(())
QueueOfferResult.Enqueued
} else
QueueOfferResult.Dropped
case Done(result) => result
}
override def complete(): Unit = {
if (state.get().isInstanceOf[Done])
throw new IllegalStateException("The queue has already been completed.")
if (setDone(Done(QueueOfferResult.QueueClosed)))
Logic.callback.invoke(()) // if this thread won the completion race also schedule an async callback
}
override def fail(ex: Throwable): Unit = {
if (state.get().isInstanceOf[Done])
throw new IllegalStateException("The queue has already been completed.")
if (setDone(Done(QueueOfferResult.Failure(ex))))
Logic.callback.invoke(()) // if this thread won the completion race also schedule an async callback
}
}
// some state transition helpers
@tailrec
def setDone(done: Done): Boolean =
state.get() match {
case _: Done => false
case x =>
if (!state.compareAndSet(x, done)) setDone(done)
else true
}
@tailrec
def clearNeedsActivation(): Boolean =
state.get() match {
case NeedsActivation =>
if (!state.compareAndSet(NeedsActivation, Running)) clearNeedsActivation()
else true
case _ => false
}
@tailrec
def setNeedsActivation(): Boolean =
state.get() match {
case Running =>
if (!state.compareAndSet(Running, NeedsActivation)) setNeedsActivation()
else true
case _ => false
}
(Logic, Mat)
}
}

View file

@ -19,7 +19,6 @@ import scala.reflect.ClassTag
import com.github.ghik.silencer.silent
import org.reactivestreams.{ Publisher, Subscriber }
import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider }
import akka.dispatch.ExecutionContexts
@ -691,6 +690,33 @@ object Source {
new Source(scaladsl.Source.zipWithN[T, O](seq => zipper.apply(seq.asJava))(seq))
}
/**
* Creates a `Source` that is materialized as an [[akka.stream.BoundedSourceQueue]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received. The buffer size is passed in as a parameter.
* Elements in the buffer will be discarded if downstream is terminated.
*
* Pushed elements may be dropped if there is no space available in the buffer. Elements will also be dropped if the
* queue is failed through the materialized `BoundedQueueSource` or the `Source` is cancelled by the downstream.
* An element that was reported to be `enqueued` is not guaranteed to be processed by the rest of the stream. If the
* queue is failed by calling `BoundedQueueSource.fail` or the downstream cancels the stream, elements in the buffer
* are discarded.
*
* Acknowledgement of pushed elements is immediate.
* [[akka.stream.BoundedSourceQueue.offer]] returns [[akka.stream.QueueOfferResult]] which is implemented as:
*
* `QueueOfferResult.enqueued()` element was added to buffer, but may still be discarded later when the queue is
* failed or cancelled
* `QueueOfferResult.dropped()` element was dropped
* `QueueOfferResult.QueueClosed` the queue was completed with [[akka.stream.BoundedSourceQueue.complete]]
* `QueueOfferResult.Failure` the queue was failed with [[akka.stream.BoundedSourceQueue.fail]] or if the
* stream failed
*
* @param bufferSize size of the buffer in number of elements
*/
def queue[T](bufferSize: Int): Source[T, BoundedSourceQueue[T]] =
scaladsl.Source.queue(bufferSize).asJava
/**
* Creates a `Source` that is materialized as an [[akka.stream.javadsl.SourceQueueWithComplete]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
@ -702,13 +728,16 @@ object Source {
*
* Acknowledgement mechanism is available.
* [[akka.stream.javadsl.SourceQueueWithComplete.offer]] returns `CompletionStage<QueueOfferResult>` which completes with
* `QueueOfferResult.enqueued` if element was added to buffer or sent downstream. It completes with
* `QueueOfferResult.dropped` if element was dropped. Can also complete with `QueueOfferResult.Failure` -
* `QueueOfferResult.enqueued()` if element was added to buffer or sent downstream. It completes with
* `QueueOfferResult.dropped()` if element was dropped. Can also complete with `QueueOfferResult.Failure` -
* when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed.
*
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():CompletionStage`
* call when buffer is full.
*
* Instead of using the strategy [[akka.stream.OverflowStrategy.dropNew]] it's recommended to use
* `Source.queue(bufferSize)` instead which returns a [[QueueOfferResult]] synchronously.
*
* You can watch accessibility of stream with [[akka.stream.javadsl.SourceQueueWithComplete.watchCompletion]].
* It returns a future that completes with success when this operator is completed or fails when stream is failed.
*
@ -736,13 +765,16 @@ object Source {
*
* Acknowledgement mechanism is available.
* [[akka.stream.javadsl.SourceQueueWithComplete.offer]] returns `CompletionStage<QueueOfferResult>` which completes with
* `QueueOfferResult.enqueued` if element was added to buffer or sent downstream. It completes with
* `QueueOfferResult.dropped` if element was dropped. Can also complete with `QueueOfferResult.Failure` -
* `QueueOfferResult.enqueued()` if element was added to buffer or sent downstream. It completes with
* `QueueOfferResult.dropped()` if element was dropped. Can also complete with `QueueOfferResult.Failure` -
* when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed.
*
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete `maxConcurrentOffers` number of
* `offer():CompletionStage` call when buffer is full.
*
* Instead of using the strategy [[akka.stream.OverflowStrategy.dropNew]] it's recommended to use
* `Source.queue(bufferSize)` instead which returns a [[QueueOfferResult]] synchronously.
*
* You can watch accessibility of stream with [[akka.stream.javadsl.SourceQueueWithComplete.watchCompletion]].
* It returns a future that completes with success when this operator is completed or fails when stream is failed.
*
@ -754,7 +786,8 @@ object Source {
*
* @param bufferSize size of buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
* @param maxConcurrentOffers maximum number of pending offers when buffer is full, should be greater than 0
* @param maxConcurrentOffers maximum number of pending offers when buffer is full, should be greater than 0, not
* applicable when `OverflowStrategy.dropNew` is used
*/
def queue[T](
bufferSize: Int,

View file

@ -814,6 +814,33 @@ object Source {
source.addAttributes(DefaultAttributes.zipWithN)
}
/**
* Creates a `Source` that is materialized as an [[akka.stream.BoundedSourceQueue]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received. The buffer size is passed in as a parameter.
* Elements in the buffer will be discarded if downstream is terminated.
*
* Pushed elements may be dropped if there is no space available in the buffer. Elements will also be dropped if the
* queue is failed through the materialized `BoundedQueueSource` or the `Source` is cancelled by the downstream.
* An element that was reported to be `enqueued` is not guaranteed to be processed by the rest of the stream. If the
* queue is failed by calling `BoundedQueueSource.fail` or the downstream cancels the stream, elements in the buffer
* are discarded.
*
* Acknowledgement of pushed elements is immediate.
* [[akka.stream.BoundedSourceQueue.offer]] returns [[akka.stream.QueueOfferResult]] which is implemented as:
*
* `QueueOfferResult.Enqueued` element was added to buffer, but may still be discarded later when the queue is
* failed or cancelled
* `QueueOfferResult.Dropped` element was dropped
* `QueueOfferResult.QueueComplete` the queue was completed with [[akka.stream.BoundedSourceQueue.complete]]
* `QueueOfferResult.Failure` the queue was failed with [[akka.stream.BoundedSourceQueue.fail]] or if
* the stream failed
*
* @param bufferSize size of the buffer in number of elements
*/
def queue[T](bufferSize: Int): Source[T, BoundedSourceQueue[T]] =
Source.fromGraph(new BoundedSourceQueueStage[T](bufferSize))
/**
* Creates a `Source` that is materialized as an [[akka.stream.scaladsl.SourceQueueWithComplete]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
@ -832,6 +859,9 @@ object Source {
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():Future`
* call when buffer is full.
*
* Instead of using the strategy [[akka.stream.OverflowStrategy.dropNew]] it's recommended to use
* `Source.queue(bufferSize)` instead which returns a [[QueueOfferResult]] synchronously.
*
* You can watch accessibility of stream with [[akka.stream.scaladsl.SourceQueueWithComplete.watchCompletion]].
* It returns future that completes with success when the operator is completed or fails when the stream is failed.
*
@ -865,6 +895,9 @@ object Source {
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete `maxConcurrentOffers` number of
* `offer():Future` call when buffer is full.
*
* Instead of using the strategy [[akka.stream.OverflowStrategy.dropNew]] it's recommended to use
* `Source.queue(bufferSize)` instead which returns a [[QueueOfferResult]] synchronously.
*
* You can watch accessibility of stream with [[akka.stream.scaladsl.SourceQueueWithComplete.watchCompletion]].
* It returns future that completes with success when the operator is completed or fails when the stream is failed.
*
@ -876,7 +909,8 @@ object Source {
*
* @param bufferSize size of buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
* @param maxConcurrentOffers maximum number of pending offers when buffer is full, should be greater than 0
* @param maxConcurrentOffers maximum number of pending offers when buffer is full, should be greater than 0, not
* applicable when `OverflowStrategy.dropNew` is used
*/
def queue[T](
bufferSize: Int,