BoundedQueueSource API

This commit is contained in:
Sean Glover 2020-10-21 17:34:28 -04:00
parent 708f8b870c
commit 779161d556
11 changed files with 358 additions and 115 deletions

View file

@ -1,15 +1,44 @@
# Source.queue
Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source.
Materialize a `BoundedSourceQueue` or `SourceQueue` onto which elements can be pushed for emitting from the source.
@ref[Source operators](../index.md#source-operators)
## Signature
## Signature (`BoundedSourceQueue`)
@apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int):akka.stream.scaladsl.Source[T,akka.stream.scaladsl.BoundedSourceQueue[T]]" java="#queue(int)" }
## Description (`BoundedSourceQueue`)
The `BoundedSourceQueue` is an optimized variant of the `SourceQueue` with `OverflowStrategy.dropNew`.
The `BoundedSourceQueue` will give immediate, synchronous feedback whether an element was accepted or not and is therefore recommended for situations where overload and dropping elements is expected and needs to be handled quickly.
In contrast, the `SourceQueue` offers more variety of `OverflowStrategies` but feedback is only asynchronously provided through a @scala[`Future`]@java[`CompletionStage`] value.
In cases where elements need to be discarded quickly at times of overload to avoid out-of-memory situations, delivering feedback asynchronously can itself become a problem.
This happens if elements come in faster than the feedback can be delivered in which case the feedback mechanism itself is part of the reason that an out-of-memory situation arises.
In summary, prefer `BoundedSourceQueue` over `SourceQueue` with `OverflowStrategy.dropNew` especially in high-load scenarios.
Use `SourceQueue` if you need one of the other `OverflowStrategies`.
The `BoundedSourceQueue` contains a buffer that can be used by many producers on different threads.
When the buffer is full, the `BoundedSourceQueue` will not accept more elements.
The return value of `BoundedSourceQueue.offer()` immediately returns a `QueueOfferResult` (as opposed to an asynchronous value returned by `SourceQueue`).
A synchronous result is important in order to avoid situations where offer acknowledgements are handled slower than the rate of which elements are offered, which will eventually lead to an Out Of Memory error.
## Example (`BoundedSourceQueue`)
Scala
: @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #source-queue-synchronous }
Java
: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #source-queue-synchronous }
## Signature (`SourceQueue`)
@apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int,overflowStrategy:akka.stream.OverflowStrategy):akka.stream.scaladsl.Source[T,akka.stream.scaladsl.SourceQueueWithComplete[T]]" java="#queue(int,akka.stream.OverflowStrategy)" }
@apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int,overflowStrategy:akka.stream.OverflowStrategy,maxConcurrentOffers:Int):akka.stream.scaladsl.Source[T,akka.stream.scaladsl.SourceQueueWithComplete[T]]" java="#queue(int,akka.stream.OverflowStrategy,int)" }
## Description
## Description (`SourceQueue`)
Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source. The queue contains
a buffer, if elements are pushed onto the queue faster than the source is consumed the overflow will be handled with
@ -22,7 +51,7 @@ will be discarded if downstream is terminated.
In combination with the queue, the @ref[`throttle`](./../Source-or-Flow/throttle.md) operator can be used to control the processing to a given limit, e.g. `5 elements` per `3 seconds`.
## Example
## Example (`SourceQueue`)
Scala
: @@snip [IntegrationDocSpec.scala](/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #source-queue }

View file

@ -35,7 +35,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
|Source|<a name="lazysource"></a>@ref[lazySource](Source/lazySource.md)|Defers creation and materialization of a `Source` until there is demand.|
|Source|<a name="maybe"></a>@ref[maybe](Source/maybe.md)|Create a source that emits once the materialized @scala[`Promise`] @java[`CompletableFuture`] is completed with a value.|
|Source|<a name="never"></a>@ref[never](Source/never.md)|Never emit any elements, never complete and never fail.|
|Source|<a name="queue"></a>@ref[queue](Source/queue.md)|Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source. |
|Source|<a name="queue"></a>@ref[queue](Source/queue.md)|Materialize a `BoundedSourceQueue` or `SourceQueue` onto which elements can be pushed for emitting from the source.|
|Source|<a name="range"></a>@ref[range](Source/range.md)|Emit each integer in a range, with an option to take bigger steps than 1.|
|Source|<a name="repeat"></a>@ref[repeat](Source/repeat.md)|Stream a single object repeatedly.|
|Source|<a name="single"></a>@ref[single](Source/single.md)|Stream a single object once.|

View file

@ -765,6 +765,44 @@ public class IntegrationDocTest extends AbstractJavaTest {
};
}
@Test
public void illustrateSynchronousSourceQueue() throws Exception {
new TestKit(system) {
{
// #source-queue-synchronous
int bufferSize = 10;
int elementsToProcess = 5;
BoundedSourceQueue<Integer> sourceQueue =
Source.<Integer>queue(bufferSize)
.throttle(elementsToProcess, Duration.ofSeconds(3))
.map(x -> x * x)
.to(Sink.foreach(x -> System.out.println("got: " + x)))
.run(system);
List<Integer> fastElements = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
fastElements.stream()
.forEach(
x -> {
QueueOfferResult result = sourceQueue.offer(x);
if (result == QueueOfferResult.enqueued()) {
System.out.println("enqueued " + x);
} else if (result == QueueOfferResult.dropped()) {
System.out.println("dropped " + x);
} else if (result instanceof QueueOfferResult.Failure) {
QueueOfferResult.Failure failure = (QueueOfferResult.Failure) result;
System.out.println("Offer failed " + failure.cause().getMessage());
} else if (result instanceof QueueOfferResult.QueueClosed$) {
System.out.println("Bounded Source Queue closed");
}
});
// #source-queue-synchronous
}
};
}
@Test
public void illustrateSourceActorRef() throws Exception {
new TestKit(system) {

View file

@ -491,6 +491,36 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
//#source-queue
}
"illustrate use of synchronous source queue" in {
//#source-queue-synchronous
val bufferSize = 1000
//#source-queue-synchronous
// format: OFF
//#source-queue-synchronous
val queue = Source
.queue[Int](bufferSize)
.map(x => x * x)
.toMat(Sink.foreach(x => println(s"completed $x")))(Keep.left)
.run()
//#source-queue-synchronous
// format: OFF
//#source-queue-synchronous
val fastElements = 1 to 10
implicit val ec = system.dispatcher
fastElements.foreach { x =>
queue.offer(x) match {
case QueueOfferResult.Enqueued => println(s"enqueued $x")
case QueueOfferResult.Dropped => println(s"dropped $x")
case QueueOfferResult.Failure(ex) => println(s"Offer failed ${ex.getMessage}")
case QueueOfferResult.QueueClosed => println("Source Queue closed")
}
}
//#source-queue-synchronous
}
"illustrate use of source actor ref" in {
//#source-actorRef
val bufferSize = 10

View file

@ -1,53 +1,49 @@
/*
* Copyright (C) 2020-2019 Lightbend Inc. <https://www.lightbend.com>
* Copyright (C) 2020-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import java.util.concurrent.{ CountDownLatch, ThreadLocalRandom }
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ CountDownLatch, ThreadLocalRandom }
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.impl.FastDroppingQueue
import FastDroppingQueue.OfferResult
import akka.stream.testkit.TestSubscriber
import akka.stream.{ QueueCompletionResult, QueueOfferResult }
import akka.stream.testkit.scaladsl.TestSink
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.MustMatchers
import org.scalatest.WordSpec
import akka.stream.testkit.{ StreamSpec, TestSubscriber }
import scala.concurrent.duration._
class FastDroppingQueueSpec extends WordSpec with BeforeAndAfterAll with MustMatchers with ScalaFutures {
implicit val system = ActorSystem("SimpleQueueSpec")
implicit val mat = ActorMaterializer()
implicit val ec = system.dispatcher
class BoundedSourceQueueSpec extends StreamSpec {
"SimpleQueue" should {
override implicit def patienceConfig: PatienceConfig = PatienceConfig(5.seconds)
val ex = new RuntimeException("oops")
"BoundedSourceQueue" should {
"not drop elements if buffer is not full" in {
val sub = TestSubscriber.probe[Int]()
val queue =
FastDroppingQueue[Int](100).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
Source.queue[Int](100).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
val elements = 1 to 100
elements.foreach { i =>
queue.offer(i) mustBe OfferResult.Enqueued
queue.offer(i) should be(QueueOfferResult.Enqueued)
}
queue.complete()
val subIt = Iterator.continually(sub.requestNext())
subIt.zip(elements.iterator).foreach {
case (subEle, origEle) => subEle mustBe origEle
case (subEle, origEle) => subEle should be(origEle)
}
sub.expectComplete()
}
"drop elements if buffer is full" in {
val sub = TestSubscriber.probe[Int]()
val queue =
FastDroppingQueue[Int](10).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
Source.queue[Int](10).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
val elements = 1 to 100
@ -57,15 +53,68 @@ class FastDroppingQueueSpec extends WordSpec with BeforeAndAfterAll with MustMat
queue.offer(i)
}
.groupBy(identity)
.mapValues(_.size)
.map { case (k, v) => (k, v.size) }
// it should be 100 elements - 10 buffer slots = 90, but there might be other implicit buffers involved
histo(OfferResult.Dropped) must be > 80
histo(QueueOfferResult.Dropped) should be > 80
}
"buffer size cannot be less than 1" in {
assertThrows[IllegalArgumentException](Source.queue[Int](0))
}
"raise exception if the queue is completed twice" in {
val sub = TestSubscriber.probe[Int]()
val queue =
Source.queue[Int](1).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
queue.complete()
assertThrows[IllegalStateException](queue.complete())
}
"raise exception if the queue is failed twice" in {
val sub = TestSubscriber.probe[Int]()
val queue =
Source.queue[Int](1).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
queue.fail(ex)
assertThrows[IllegalStateException](queue.fail(ex))
}
"return a QueueClosed result if completed" in {
val sub = TestSubscriber.probe[Int]()
val queue =
Source.queue[Int](10).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
queue.complete()
queue.offer(1) should be(QueueOfferResult.QueueClosed)
sub.expectSubscriptionAndComplete()
}
"return a Failure result if queue failed" in {
val sub = TestSubscriber.probe[Int]()
val queue =
Source.queue[Int](10).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
queue.fail(ex)
queue.offer(1) should be(QueueOfferResult.Failure(ex))
sub.request(1)
sub.expectError(ex)
}
"return a Failure result if stream failed" in {
val sub = TestSubscriber.probe[Int]()
val queue =
Source.queue[Int](10).map(_ => throw ex).toMat(Sink.fromSubscriber(sub))(Keep.left).run()
queue.offer(1) should be(QueueOfferResult.Enqueued)
sub.expectSubscriptionAndError() should be(ex)
// internal state will be eventually updated when stream cancellation reaches BoundedSourceQueueStage
awaitAssert(queue.offer(1) should be(QueueOfferResult.Failure(ex)))
}
"without cancellation only flag elements as enqueued that will also passed to downstream" in {
val counter = new AtomicLong()
val (queue, result) =
FastDroppingQueue[Int](100000).toMat(Sink.fold(0L)(_ + _))(Keep.both).run()
Source.queue[Int](100000).toMat(Sink.fold(0L)(_ + _))(Keep.both).run()
val numThreads = 32
val stopProb = 1000000
@ -83,12 +132,12 @@ class FastDroppingQueueSpec extends WordSpec with BeforeAndAfterAll with MustMat
while (true) {
val i = r.nextInt(0, Int.MaxValue)
queue.offer(i) match {
case OfferResult.Enqueued =>
case QueueOfferResult.Enqueued =>
counter.addAndGet(i)
numElemsEnqueued += 1
case OfferResult.Dropped =>
case QueueOfferResult.Dropped =>
numElemsDropped += 1
case _: OfferResult.CompletionResult => return // other thread completed
case _: QueueCompletionResult => return // other thread completed
}
if ((i % stopProb) == 0) { // probabilistic exit condition
@ -113,7 +162,7 @@ class FastDroppingQueueSpec extends WordSpec with BeforeAndAfterAll with MustMat
t.start()
}
result.futureValue mustBe counter.get()
result.futureValue should be(counter.get())
}
// copied from akka-remote SendQueueSpec
@ -121,13 +170,13 @@ class FastDroppingQueueSpec extends WordSpec with BeforeAndAfterAll with MustMat
// this test verifies that the wakeup signal is triggered correctly
val burstSize = 100
val (sendQueue, downstream) =
Source.fromGraph(FastDroppingQueue[Int](128)).grouped(burstSize).async.toMat(TestSink.probe)(Keep.both).run()
Source.fromGraph(Source.queue[Int](128)).grouped(burstSize).async.toMat(TestSink.probe)(Keep.both).run()
downstream.request(10)
for (round <- 1 to 100000) {
for (n <- 1 to burstSize) {
if (sendQueue.offer(round * 1000 + n) != OfferResult.Enqueued)
if (sendQueue.offer(round * 1000 + n) != QueueOfferResult.Enqueued)
fail(s"offer failed at round $round message $n")
}
downstream.expectNext((1 to burstSize).map(_ + round * 1000).toList)
@ -137,8 +186,4 @@ class FastDroppingQueueSpec extends WordSpec with BeforeAndAfterAll with MustMat
downstream.cancel()
}
}
override implicit def patienceConfig: PatienceConfig = PatienceConfig(5.seconds)
override protected def afterAll(): Unit = system.terminate()
}

View file

@ -0,0 +1,35 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream
import akka.annotation.DoNotInherit
/**
* A queue of the given size that gives immediate feedback whether an element could be enqueued or not.
*
* Not for user extension
*/
@DoNotInherit
trait BoundedSourceQueue[T] {
/**
* Returns a [[akka.stream.QueueOfferResult]] that notifies the caller if the element could be enqueued or not, or
* the completion status of the queue.
*
* A result of `QueueOfferResult.Enqueued` does not guarantee that an element also has been or will be processed by
* the downstream.
*/
def offer(elem: T): QueueOfferResult
/**
* Completes the stream normally.
*/
def complete(): Unit
/**
* Completes the stream with a failure.
*/
def fail(ex: Throwable): Unit
}

View file

@ -114,6 +114,8 @@ object OverflowStrategy {
/**
* If the buffer is full when a new element arrives, drops the new element.
*
* Deprecated since 2.6.11. Use Source.queue instead.
*/
def dropNew: OverflowStrategy = DropNew(Logging.DebugLevel)

View file

@ -4,10 +4,22 @@
package akka.stream
import akka.annotation.DoNotInherit
/**
* Not for user extension
*/
@DoNotInherit
sealed abstract class QueueOfferResult
/**
* Contains types that is used as return types for async callbacks to streams
* Not for user extension
*/
@DoNotInherit
sealed abstract class QueueCompletionResult extends QueueOfferResult
/**
* Contains types that is used as return types for streams Source queues
*/
object QueueOfferResult {
@ -35,10 +47,10 @@ object QueueOfferResult {
* Type is used to indicate that stream is failed before or during call to the stream
* @param cause - exception that stream failed with
*/
final case class Failure(cause: Throwable) extends QueueOfferResult
final case class Failure(cause: Throwable) extends QueueCompletionResult
/**
* Type is used to indicate that stream is completed before call
*/
case object QueueClosed extends QueueOfferResult
case object QueueClosed extends QueueCompletionResult
}

View file

@ -1,72 +1,48 @@
/*
* Copyright (C) 2020-2019 Lightbend Inc. <https://www.lightbend.com>
* Copyright (C) 2020-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl
import java.util.concurrent.atomic.AtomicReference
import akka.dispatch.AbstractBoundedNodeQueue
import akka.stream.scaladsl.Source
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, OutHandler }
import akka.stream.{ Attributes, Outlet, SourceShape }
import scala.annotation.tailrec
trait FastDroppingQueue[T] {
import akka.annotation.InternalApi
import akka.dispatch.AbstractBoundedNodeQueue
import akka.stream._
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, OutHandler, StageLogging }
/**
* Returns true if element could be enqueued and false if not.
*
* Even if it returns true it does not guarantee that an element also has been or will be processed by the downstream.
*/
def offer(elem: T): FastDroppingQueue.OfferResult
def complete(): Unit
def fail(ex: Throwable): Unit
/**
* INTERNAL API
*/
@InternalApi private[akka] object BoundedSourceQueueStage {
sealed trait State
case object NeedsActivation extends State
case object Running extends State
case class Done(result: QueueCompletionResult) extends State
}
object FastDroppingQueue {
/**
* INTERNAL API
*/
@InternalApi private[akka] final class BoundedSourceQueueStage[T](bufferSize: Int)
extends GraphStageWithMaterializedValue[SourceShape[T], BoundedSourceQueue[T]] {
import BoundedSourceQueueStage._
/**
* A queue of the given size that gives immediate feedback whether an element could be enqueued or not.
* @param size
* @tparam T
* @return
*/
def apply[T](size: Int): Source[T, FastDroppingQueue[T]] =
Source.fromGraph(new FastDroppingQueueStage[T](size))
require(bufferSize > 0, "BoundedSourceQueueStage.bufferSize must be > 0")
sealed trait OfferResult
object OfferResult {
case object Enqueued extends OfferResult
case object Dropped extends OfferResult
sealed trait CompletionResult extends OfferResult
case object Completed extends CompletionResult
case object Cancelled extends CompletionResult
case class Failed(cause: Throwable) extends CompletionResult
}
}
class FastDroppingQueueStage[T](bufferSize: Int)
extends GraphStageWithMaterializedValue[SourceShape[T], FastDroppingQueue[T]] {
val out = Outlet[T]("FastDroppingQueueStage.out")
val out = Outlet[T]("BoundedSourceQueueStage.out")
val shape = SourceShape(out)
override def createLogicAndMaterializedValue(
inheritedAttributes: Attributes): (GraphStageLogic, FastDroppingQueue[T]) = {
import FastDroppingQueue._
sealed trait State
case object NeedsActivation extends State
case object Running extends State
case class Done(result: OfferResult.CompletionResult) extends State
inheritedAttributes: Attributes): (GraphStageLogic, BoundedSourceQueue[T]) = {
val state = new AtomicReference[State](Running)
val queue = new AbstractBoundedNodeQueue[T](bufferSize) {}
object Logic extends GraphStageLogic(shape) with OutHandler {
object Logic extends GraphStageLogic(shape) with OutHandler with StageLogging {
setHandler(out, this)
val callback = getAsyncCallback[Unit] { _ =>
clearNeedsActivation()
@ -75,15 +51,17 @@ class FastDroppingQueueStage[T](bufferSize: Int)
override def onPull(): Unit = run()
override def onDownstreamFinish(): Unit = {
setDone(Done(OfferResult.Cancelled))
super.onDownstreamFinish()
override def onDownstreamFinish(cause: Throwable): Unit = {
setDone(Done(QueueOfferResult.Failure(cause)))
super.onDownstreamFinish(cause)
}
override def postStop(): Unit =
override def postStop(): Unit = {
// drain queue
while (!queue.isEmpty) queue.poll()
val exception = new StreamDetachedException()
setDone(Done(QueueOfferResult.Failure(exception)))
}
/**
* Main loop of the queue. We do two volatile reads for the fast path of pushing elements
@ -112,21 +90,21 @@ class FastDroppingQueueStage[T](bufferSize: Int)
push(out, next) // and then: wait for pull
} // else: wait for pull
case Done(OfferResult.Completed) =>
if (queue.isEmpty) completeStage()
case Done(QueueOfferResult.QueueClosed) =>
if (queue.isEmpty)
completeStage()
else if (isAvailable(out)) {
push(out, queue.poll())
run() // another round, might be empty now
}
// else !Queue.isEmpty: wait for pull to drain remaining elements
case Done(OfferResult.Failed(ex)) => failStage(ex)
case Done(OfferResult.Cancelled) => throw new IllegalStateException // should not happen
case NeedsActivation => throw new IllegalStateException // needs to be cleared before
case Done(QueueOfferResult.Failure(ex)) => failStage(ex)
case NeedsActivation => throw new IllegalStateException // needs to be cleared before
}
}
object Mat extends FastDroppingQueue[T] {
override def offer(elem: T): OfferResult = state.get() match {
object Mat extends BoundedSourceQueue[T] {
override def offer(elem: T): QueueOfferResult = state.get() match {
case Running | NeedsActivation =>
if (queue.add(elem)) {
// need to query state again because stage might have switched from Running -> NeedsActivation only after
@ -136,18 +114,25 @@ class FastDroppingQueueStage[T](bufferSize: Int)
if (clearNeedsActivation())
Logic.callback.invoke(())
OfferResult.Enqueued
QueueOfferResult.Enqueued
} else
OfferResult.Dropped
QueueOfferResult.Dropped
case Done(result) => result
}
override def complete(): Unit = // FIXME: should we fail here in some way if it was already completed?
if (setDone(Done(OfferResult.Completed)))
override def complete(): Unit = {
if (state.get().isInstanceOf[Done])
throw new IllegalStateException("The queue has already been completed.")
if (setDone(Done(QueueOfferResult.QueueClosed)))
Logic.callback.invoke(()) // if this thread won the completion race also schedule an async callback
override def fail(ex: Throwable): Unit = // FIXME: should we fail here in some way if it was already completed?
if (setDone(Done(OfferResult.Failed(ex))))
}
override def fail(ex: Throwable): Unit = {
if (state.get().isInstanceOf[Done])
throw new IllegalStateException("The queue has already been completed.")
if (setDone(Done(QueueOfferResult.Failure(ex))))
Logic.callback.invoke(()) // if this thread won the completion race also schedule an async callback
}
}
// some state transition helpers

View file

@ -19,7 +19,6 @@ import scala.reflect.ClassTag
import com.github.ghik.silencer.silent
import org.reactivestreams.{ Publisher, Subscriber }
import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider }
import akka.dispatch.ExecutionContexts
@ -691,6 +690,33 @@ object Source {
new Source(scaladsl.Source.zipWithN[T, O](seq => zipper.apply(seq.asJava))(seq))
}
/**
* Creates a `Source` that is materialized as an [[akka.stream.BoundedSourceQueue]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received. The buffer size is passed in as a parameter.
* Elements in the buffer will be discarded if downstream is terminated.
*
* Pushed elements may be dropped if there is no space available in the buffer. Elements will also be dropped if the
* queue is failed through the materialized `BoundedQueueSource` or the `Source` is cancelled by the downstream.
* An element that was reported to be `enqueued` is not guaranteed to be processed by the rest of the stream. If the
* queue is failed by calling `BoundedQueueSource.fail` or the downstream cancels the stream, elements in the buffer
* are discarded.
*
* Acknowledgement of pushed elements is immediate.
* [[akka.stream.BoundedSourceQueue.offer]] returns [[akka.stream.QueueOfferResult]] which is implemented as:
*
* `QueueOfferResult.enqueued()` element was added to buffer, but may still be discarded later when the queue is
* failed or cancelled
* `QueueOfferResult.dropped()` element was dropped
* `QueueOfferResult.QueueClosed` the queue was completed with [[akka.stream.BoundedSourceQueue.complete]]
* `QueueOfferResult.Failure` the queue was failed with [[akka.stream.BoundedSourceQueue.fail]] or if the
* stream failed
*
* @param bufferSize size of the buffer in number of elements
*/
def queue[T](bufferSize: Int): Source[T, BoundedSourceQueue[T]] =
scaladsl.Source.queue(bufferSize).asJava
/**
* Creates a `Source` that is materialized as an [[akka.stream.javadsl.SourceQueueWithComplete]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
@ -702,13 +728,16 @@ object Source {
*
* Acknowledgement mechanism is available.
* [[akka.stream.javadsl.SourceQueueWithComplete.offer]] returns `CompletionStage<QueueOfferResult>` which completes with
* `QueueOfferResult.enqueued` if element was added to buffer or sent downstream. It completes with
* `QueueOfferResult.dropped` if element was dropped. Can also complete with `QueueOfferResult.Failure` -
* `QueueOfferResult.enqueued()` if element was added to buffer or sent downstream. It completes with
* `QueueOfferResult.dropped()` if element was dropped. Can also complete with `QueueOfferResult.Failure` -
* when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed.
*
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():CompletionStage`
* call when buffer is full.
*
* Instead of using the strategy [[akka.stream.OverflowStrategy.dropNew]] it's recommended to use
* `Source.queue(bufferSize)` instead which returns a [[QueueOfferResult]] synchronously.
*
* You can watch accessibility of stream with [[akka.stream.javadsl.SourceQueueWithComplete.watchCompletion]].
* It returns a future that completes with success when this operator is completed or fails when stream is failed.
*
@ -736,13 +765,16 @@ object Source {
*
* Acknowledgement mechanism is available.
* [[akka.stream.javadsl.SourceQueueWithComplete.offer]] returns `CompletionStage<QueueOfferResult>` which completes with
* `QueueOfferResult.enqueued` if element was added to buffer or sent downstream. It completes with
* `QueueOfferResult.dropped` if element was dropped. Can also complete with `QueueOfferResult.Failure` -
* `QueueOfferResult.enqueued()` if element was added to buffer or sent downstream. It completes with
* `QueueOfferResult.dropped()` if element was dropped. Can also complete with `QueueOfferResult.Failure` -
* when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed.
*
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete `maxConcurrentOffers` number of
* `offer():CompletionStage` call when buffer is full.
*
* Instead of using the strategy [[akka.stream.OverflowStrategy.dropNew]] it's recommended to use
* `Source.queue(bufferSize)` instead which returns a [[QueueOfferResult]] synchronously.
*
* You can watch accessibility of stream with [[akka.stream.javadsl.SourceQueueWithComplete.watchCompletion]].
* It returns a future that completes with success when this operator is completed or fails when stream is failed.
*
@ -754,7 +786,8 @@ object Source {
*
* @param bufferSize size of buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
* @param maxConcurrentOffers maximum number of pending offers when buffer is full, should be greater than 0
* @param maxConcurrentOffers maximum number of pending offers when buffer is full, should be greater than 0, not
* applicable when `OverflowStrategy.dropNew` is used
*/
def queue[T](
bufferSize: Int,

View file

@ -814,6 +814,33 @@ object Source {
source.addAttributes(DefaultAttributes.zipWithN)
}
/**
* Creates a `Source` that is materialized as an [[akka.stream.BoundedSourceQueue]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received. The buffer size is passed in as a parameter.
* Elements in the buffer will be discarded if downstream is terminated.
*
* Pushed elements may be dropped if there is no space available in the buffer. Elements will also be dropped if the
* queue is failed through the materialized `BoundedQueueSource` or the `Source` is cancelled by the downstream.
* An element that was reported to be `enqueued` is not guaranteed to be processed by the rest of the stream. If the
* queue is failed by calling `BoundedQueueSource.fail` or the downstream cancels the stream, elements in the buffer
* are discarded.
*
* Acknowledgement of pushed elements is immediate.
* [[akka.stream.BoundedSourceQueue.offer]] returns [[akka.stream.QueueOfferResult]] which is implemented as:
*
* `QueueOfferResult.Enqueued` element was added to buffer, but may still be discarded later when the queue is
* failed or cancelled
* `QueueOfferResult.Dropped` element was dropped
* `QueueOfferResult.QueueComplete` the queue was completed with [[akka.stream.BoundedSourceQueue.complete]]
* `QueueOfferResult.Failure` the queue was failed with [[akka.stream.BoundedSourceQueue.fail]] or if
* the stream failed
*
* @param bufferSize size of the buffer in number of elements
*/
def queue[T](bufferSize: Int): Source[T, BoundedSourceQueue[T]] =
Source.fromGraph(new BoundedSourceQueueStage[T](bufferSize))
/**
* Creates a `Source` that is materialized as an [[akka.stream.scaladsl.SourceQueueWithComplete]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
@ -832,6 +859,9 @@ object Source {
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():Future`
* call when buffer is full.
*
* Instead of using the strategy [[akka.stream.OverflowStrategy.dropNew]] it's recommended to use
* `Source.queue(bufferSize)` instead which returns a [[QueueOfferResult]] synchronously.
*
* You can watch accessibility of stream with [[akka.stream.scaladsl.SourceQueueWithComplete.watchCompletion]].
* It returns future that completes with success when the operator is completed or fails when the stream is failed.
*
@ -865,6 +895,9 @@ object Source {
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete `maxConcurrentOffers` number of
* `offer():Future` call when buffer is full.
*
* Instead of using the strategy [[akka.stream.OverflowStrategy.dropNew]] it's recommended to use
* `Source.queue(bufferSize)` instead which returns a [[QueueOfferResult]] synchronously.
*
* You can watch accessibility of stream with [[akka.stream.scaladsl.SourceQueueWithComplete.watchCompletion]].
* It returns future that completes with success when the operator is completed or fails when the stream is failed.
*
@ -876,7 +909,8 @@ object Source {
*
* @param bufferSize size of buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
* @param maxConcurrentOffers maximum number of pending offers when buffer is full, should be greater than 0
* @param maxConcurrentOffers maximum number of pending offers when buffer is full, should be greater than 0, not
* applicable when `OverflowStrategy.dropNew` is used
*/
def queue[T](
bufferSize: Int,