allow Source.queue concurrent pushing (#28273)

This commit is contained in:
Yakiv Yereskovskyi 2020-03-26 16:56:45 +02:00 committed by GitHub
parent bf6576ce79
commit 5605f04cb7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 137 additions and 42 deletions

View file

@ -155,7 +155,7 @@ class QueueSourceSpec extends StreamSpec {
val s = TestSubscriber.manualProbe[Int]() val s = TestSubscriber.manualProbe[Int]()
val probe = TestProbe() val probe = TestProbe()
val queue = val queue =
TestSourceStage(new QueueSource[Int](1, OverflowStrategy.dropHead), probe).to(Sink.fromSubscriber(s)).run() TestSourceStage(new QueueSource[Int](1, OverflowStrategy.dropHead, 1), probe).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription val sub = s.expectSubscription
sub.request(1) sub.request(1)
@ -165,7 +165,7 @@ class QueueSourceSpec extends StreamSpec {
sub.cancel() sub.cancel()
} }
"fail offer future if user does not wait in backpressure mode" in assertAllStagesStopped { "fail the second (concurrent) offer in backpressure mode with default maxConcurrentPulls" in assertAllStagesStopped {
val (queue, probe) = Source.queue[Int](5, OverflowStrategy.backpressure).toMat(TestSink.probe)(Keep.both).run() val (queue, probe) = Source.queue[Int](5, OverflowStrategy.backpressure).toMat(TestSink.probe)(Keep.both).run()
for (i <- 1 to 5) assertSuccess(queue.offer(i)) for (i <- 1 to 5) assertSuccess(queue.offer(i))
@ -182,6 +182,28 @@ class QueueSourceSpec extends StreamSpec {
probe.request(6).expectNext(2, 3, 4, 5, 6).expectComplete() probe.request(6).expectNext(2, 3, 4, 5, 6).expectComplete()
} }
"allow to wait `n` offer futures in backpressure mode with `n` maxConcurrentPulls" in assertAllStagesStopped {
val n = 2
val (queue, probe) = Source.queue[Int](5, OverflowStrategy.backpressure, n).toMat(TestSink.probe)(Keep.both).run()
for (i <- 1 to 5) assertSuccess(queue.offer(i))
queue.offer(6).pipeTo(testActor)
queue.offer(7).pipeTo(testActor)
queue.offer(8).pipeTo(testActor)
expectMsgType[Status.Failure].cause shouldBe an[IllegalStateException]
probe.requestNext(1)
expectMsg(QueueOfferResult.Enqueued)
probe.requestNext(2)
expectMsg(QueueOfferResult.Enqueued)
queue.complete()
probe.request(7).expectNext(3, 4, 5, 6, 7).expectComplete()
}
"complete watching future with failure if stream failed" in assertAllStagesStopped { "complete watching future with failure if stream failed" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]() val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(1, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() val queue = Source.queue(1, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run()

View file

@ -0,0 +1,2 @@
# disable compatibility check for @InternalApi private[akka] class
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.QueueSource.this")

View file

@ -27,10 +27,15 @@ import scala.concurrent.{ Future, Promise }
/** /**
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] final class QueueSource[T](maxBuffer: Int, overflowStrategy: OverflowStrategy) @InternalApi private[akka] final class QueueSource[T](
maxBuffer: Int,
overflowStrategy: OverflowStrategy,
maxConcurrentOffers: Int)
extends GraphStageWithMaterializedValue[SourceShape[T], SourceQueueWithComplete[T]] { extends GraphStageWithMaterializedValue[SourceShape[T], SourceQueueWithComplete[T]] {
import QueueSource._ import QueueSource._
require(maxConcurrentOffers > 0, "Max concurrent offers must be greater than 0")
val out = Outlet[T]("queueSource.out") val out = Outlet[T]("queueSource.out")
override val shape: SourceShape[T] = SourceShape.of(out) override val shape: SourceShape[T] = SourceShape.of(out)
@ -42,11 +47,12 @@ import scala.concurrent.{ Future, Promise }
override protected def logSource: Class[_] = classOf[QueueSource[_]] override protected def logSource: Class[_] = classOf[QueueSource[_]]
var buffer: Buffer[T] = _ var buffer: Buffer[T] = _
var pendingOffer: Option[Offer[T]] = None var pendingOffers: Buffer[Offer[T]] = _
var terminating = false var terminating = false
override def preStart(): Unit = { override def preStart(): Unit = {
if (maxBuffer > 0) buffer = Buffer(maxBuffer, inheritedAttributes) if (maxBuffer > 0) buffer = Buffer(maxBuffer, inheritedAttributes)
pendingOffers = Buffer(maxConcurrentOffers, inheritedAttributes)
} }
override def postStop(): Unit = { override def postStop(): Unit = {
val exception = new StreamDetachedException() val exception = new StreamDetachedException()
@ -101,14 +107,12 @@ import scala.concurrent.{ Future, Promise }
s.logLevel, s.logLevel,
"Backpressuring because buffer is full and overflowStrategy is: [Backpressure] in stream [{}]", "Backpressuring because buffer is full and overflowStrategy is: [Backpressure] in stream [{}]",
name) name)
pendingOffer match { if (pendingOffers.isFull)
case Some(_) => offer.promise.failure(
offer.promise.failure( new IllegalStateException(
new IllegalStateException( s"Too many concurrent offers. Specified maximum is $maxConcurrentOffers. " +
"You have to wait for the previous offer to be resolved to send another request")) "You have to wait for one previous future to be resolved to send another request"))
case None => else pendingOffers.enqueue(offer)
pendingOffer = Some(offer)
}
} }
} }
@ -123,8 +127,8 @@ import scala.concurrent.{ Future, Promise }
} else if (isAvailable(out)) { } else if (isAvailable(out)) {
push(out, elem) push(out, elem)
promise.success(QueueOfferResult.Enqueued) promise.success(QueueOfferResult.Enqueued)
} else if (pendingOffer.isEmpty) } else if (!pendingOffers.isFull)
pendingOffer = Some(offer) pendingOffers.enqueue(offer)
else else
overflowStrategy match { overflowStrategy match {
case s @ (_: DropHead | _: DropBuffer) => case s @ (_: DropHead | _: DropBuffer) =>
@ -133,8 +137,8 @@ import scala.concurrent.{ Future, Promise }
"Dropping element because buffer is full and overflowStrategy is: [{}] in stream [{}]", "Dropping element because buffer is full and overflowStrategy is: [{}] in stream [{}]",
s, s,
name) name)
pendingOffer.get.promise.success(QueueOfferResult.Dropped) pendingOffers.dequeue().promise.success(QueueOfferResult.Dropped)
pendingOffer = Some(offer) pendingOffers.enqueue(offer)
case s @ (_: DropTail | _: DropNew) => case s @ (_: DropTail | _: DropNew) =>
log.log( log.log(
s.logLevel, s.logLevel,
@ -163,7 +167,7 @@ import scala.concurrent.{ Future, Promise }
} }
case Completion => case Completion =>
if (maxBuffer != 0 && buffer.nonEmpty || pendingOffer.nonEmpty) terminating = true if (maxBuffer != 0 && buffer.nonEmpty || pendingOffers.nonEmpty) terminating = true
else { else {
completion.success(Done) completion.success(Done)
completeStage() completeStage()
@ -177,37 +181,25 @@ import scala.concurrent.{ Future, Promise }
setHandler(out, this) setHandler(out, this)
override def onDownstreamFinish(cause: Throwable): Unit = { override def onDownstreamFinish(cause: Throwable): Unit = {
pendingOffer match { while (pendingOffers.nonEmpty) pendingOffers.dequeue().promise.success(QueueOfferResult.QueueClosed)
case Some(Offer(_, promise)) =>
promise.success(QueueOfferResult.QueueClosed)
pendingOffer = None
case None => // do nothing
}
completion.success(Done) completion.success(Done)
completeStage() completeStage()
} }
override def onPull(): Unit = { override def onPull(): Unit = {
if (maxBuffer == 0) { if (maxBuffer == 0) {
pendingOffer match { if (pendingOffers.nonEmpty) {
case Some(Offer(elem, promise)) => val offer = pendingOffers.dequeue()
push(out, elem) push(out, offer.elem)
promise.success(QueueOfferResult.Enqueued) offer.promise.success(QueueOfferResult.Enqueued)
pendingOffer = None if (terminating) {
if (terminating) { completion.success(Done)
completion.success(Done) completeStage()
completeStage() }
}
case None =>
} }
} else if (buffer.nonEmpty) { } else if (buffer.nonEmpty) {
push(out, buffer.dequeue()) push(out, buffer.dequeue())
pendingOffer match { while (pendingOffers.nonEmpty && !buffer.isFull) enqueueAndSuccess(pendingOffers.dequeue())
case Some(offer) =>
enqueueAndSuccess(offer)
pendingOffer = None
case None => //do nothing
}
if (terminating && buffer.isEmpty) { if (terminating && buffer.isEmpty) {
completion.success(Done) completion.success(Done)
completeStage() completeStage()

View file

@ -697,13 +697,52 @@ object Source {
* for downstream demand unless there is another message waiting for downstream demand, in that case * for downstream demand unless there is another message waiting for downstream demand, in that case
* offer result will be completed according to the overflow strategy. * offer result will be completed according to the overflow strategy.
* *
* SourceQueue that current source is materialized to is for single thread usage only. * The materialized SourceQueue may only be used from a single producer.
* *
* @param bufferSize size of buffer in element count * @param bufferSize size of buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/ */
def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] = def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] =
new Source(scaladsl.Source.queue[T](bufferSize, overflowStrategy).mapMaterializedValue(_.asJava)) new Source(
scaladsl.Source.queue[T](bufferSize, overflowStrategy, maxConcurrentOffers = 1).mapMaterializedValue(_.asJava))
/**
* Creates a `Source` that is materialized as an [[akka.stream.javadsl.SourceQueueWithComplete]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded
* if downstream is terminated.
*
* Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
* there is no space available in the buffer.
*
* Acknowledgement mechanism is available.
* [[akka.stream.javadsl.SourceQueueWithComplete.offer]] returns `CompletionStage<QueueOfferResult>` which completes with
* `QueueOfferResult.enqueued` if element was added to buffer or sent downstream. It completes with
* `QueueOfferResult.dropped` if element was dropped. Can also complete with `QueueOfferResult.Failure` -
* when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed.
*
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete `maxConcurrentOffers` number of
* `offer():CompletionStage` call when buffer is full.
*
* You can watch accessibility of stream with [[akka.stream.javadsl.SourceQueueWithComplete.watchCompletion]].
* It returns a future that completes with success when this operator is completed or fails when stream is failed.
*
* The buffer can be disabled by using `bufferSize` of 0 and then received message will wait
* for downstream demand unless there is another message waiting for downstream demand, in that case
* offer result will be completed according to the overflow strategy.
*
* The materialized SourceQueue may be used by up to maxConcurrentOffers concurrent producers.
*
* @param bufferSize size of buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
* @param maxConcurrentOffers maximum number of pending offers when buffer is full, should be greater than 0
*/
def queue[T](
bufferSize: Int,
overflowStrategy: OverflowStrategy,
maxConcurrentOffers: Int): Source[T, SourceQueueWithComplete[T]] =
new Source(
scaladsl.Source.queue[T](bufferSize, overflowStrategy, maxConcurrentOffers).mapMaterializedValue(_.asJava))
/** /**
* Start a new `Source` from some resource which can be opened, read and closed. * Start a new `Source` from some resource which can be opened, read and closed.

View file

@ -817,11 +817,51 @@ object Source {
* for downstream demand unless there is another message waiting for downstream demand, in that case * for downstream demand unless there is another message waiting for downstream demand, in that case
* offer result will be completed according to the overflow strategy. * offer result will be completed according to the overflow strategy.
* *
* The materialized SourceQueue may only be used from a single producer.
*
* @param bufferSize size of buffer in element count * @param bufferSize size of buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/ */
def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] = def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] =
Source.fromGraph(new QueueSource(bufferSize, overflowStrategy).withAttributes(DefaultAttributes.queueSource)) queue(bufferSize, overflowStrategy, maxConcurrentOffers = 1)
/**
* Creates a `Source` that is materialized as an [[akka.stream.scaladsl.SourceQueueWithComplete]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded
* if downstream is terminated.
*
* Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
* there is no space available in the buffer.
*
* Acknowledgement mechanism is available.
* [[akka.stream.scaladsl.SourceQueueWithComplete.offer]] returns `Future[QueueOfferResult]` which completes with
* `QueueOfferResult.Enqueued` if element was added to buffer or sent downstream. It completes with
* `QueueOfferResult.Dropped` if element was dropped. Can also complete with `QueueOfferResult.Failure` -
* when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed.
*
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete `maxConcurrentOffers` number of
* `offer():Future` call when buffer is full.
*
* You can watch accessibility of stream with [[akka.stream.scaladsl.SourceQueueWithComplete.watchCompletion]].
* It returns future that completes with success when the operator is completed or fails when the stream is failed.
*
* The buffer can be disabled by using `bufferSize` of 0 and then received message will wait
* for downstream demand unless there is another message waiting for downstream demand, in that case
* offer result will be completed according to the overflow strategy.
*
* The materialized SourceQueue may be used by up to maxConcurrentOffers concurrent producers.
*
* @param bufferSize size of buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
* @param maxConcurrentOffers maximum number of pending offers when buffer is full, should be greater than 0
*/
def queue[T](
bufferSize: Int,
overflowStrategy: OverflowStrategy,
maxConcurrentOffers: Int): Source[T, SourceQueueWithComplete[T]] =
Source.fromGraph(
new QueueSource(bufferSize, overflowStrategy, maxConcurrentOffers).withAttributes(DefaultAttributes.queueSource))
/** /**
* Start a new `Source` from some resource which can be opened, read and closed. * Start a new `Source` from some resource which can be opened, read and closed.