diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala index 7a57b5634f..6541c7a042 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala @@ -155,7 +155,7 @@ class QueueSourceSpec extends StreamSpec { val s = TestSubscriber.manualProbe[Int]() val probe = TestProbe() val queue = - TestSourceStage(new QueueSource[Int](1, OverflowStrategy.dropHead), probe).to(Sink.fromSubscriber(s)).run() + TestSourceStage(new QueueSource[Int](1, OverflowStrategy.dropHead, 1), probe).to(Sink.fromSubscriber(s)).run() val sub = s.expectSubscription sub.request(1) @@ -165,7 +165,7 @@ class QueueSourceSpec extends StreamSpec { sub.cancel() } - "fail offer future if user does not wait in backpressure mode" in assertAllStagesStopped { + "fail the second (concurrent) offer in backpressure mode with default maxConcurrentPulls" in assertAllStagesStopped { val (queue, probe) = Source.queue[Int](5, OverflowStrategy.backpressure).toMat(TestSink.probe)(Keep.both).run() for (i <- 1 to 5) assertSuccess(queue.offer(i)) @@ -182,6 +182,28 @@ class QueueSourceSpec extends StreamSpec { probe.request(6).expectNext(2, 3, 4, 5, 6).expectComplete() } + "allow to wait `n` offer futures in backpressure mode with `n` maxConcurrentPulls" in assertAllStagesStopped { + val n = 2 + val (queue, probe) = Source.queue[Int](5, OverflowStrategy.backpressure, n).toMat(TestSink.probe)(Keep.both).run() + + for (i <- 1 to 5) assertSuccess(queue.offer(i)) + + queue.offer(6).pipeTo(testActor) + queue.offer(7).pipeTo(testActor) + queue.offer(8).pipeTo(testActor) + expectMsgType[Status.Failure].cause shouldBe an[IllegalStateException] + + probe.requestNext(1) + expectMsg(QueueOfferResult.Enqueued) + + probe.requestNext(2) + expectMsg(QueueOfferResult.Enqueued) + + queue.complete() + + probe.request(7).expectNext(3, 4, 5, 6, 7).expectComplete() + } + "complete watching future with failure if stream failed" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() val queue = Source.queue(1, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() diff --git a/akka-stream/src/main/mima-filters/2.6.3.backwards.excludes/28272-queue-source-concurrency.excludes b/akka-stream/src/main/mima-filters/2.6.3.backwards.excludes/28272-queue-source-concurrency.excludes new file mode 100644 index 0000000000..38fd12a321 --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.3.backwards.excludes/28272-queue-source-concurrency.excludes @@ -0,0 +1,2 @@ +# disable compatibility check for @InternalApi private[akka] class +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.QueueSource.this") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala index 5c7ef660dd..6747ebea25 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala @@ -27,10 +27,15 @@ import scala.concurrent.{ Future, Promise } /** * INTERNAL API */ -@InternalApi private[akka] final class QueueSource[T](maxBuffer: Int, overflowStrategy: OverflowStrategy) +@InternalApi private[akka] final class QueueSource[T]( + maxBuffer: Int, + overflowStrategy: OverflowStrategy, + maxConcurrentOffers: Int) extends GraphStageWithMaterializedValue[SourceShape[T], SourceQueueWithComplete[T]] { import QueueSource._ + require(maxConcurrentOffers > 0, "Max concurrent offers must be greater than 0") + val out = Outlet[T]("queueSource.out") override val shape: SourceShape[T] = SourceShape.of(out) @@ -42,11 +47,12 @@ import scala.concurrent.{ Future, Promise } override protected def logSource: Class[_] = classOf[QueueSource[_]] var buffer: Buffer[T] = _ - var pendingOffer: Option[Offer[T]] = None + var pendingOffers: Buffer[Offer[T]] = _ var terminating = false override def preStart(): Unit = { if (maxBuffer > 0) buffer = Buffer(maxBuffer, inheritedAttributes) + pendingOffers = Buffer(maxConcurrentOffers, inheritedAttributes) } override def postStop(): Unit = { val exception = new StreamDetachedException() @@ -101,14 +107,12 @@ import scala.concurrent.{ Future, Promise } s.logLevel, "Backpressuring because buffer is full and overflowStrategy is: [Backpressure] in stream [{}]", name) - pendingOffer match { - case Some(_) => - offer.promise.failure( - new IllegalStateException( - "You have to wait for the previous offer to be resolved to send another request")) - case None => - pendingOffer = Some(offer) - } + if (pendingOffers.isFull) + offer.promise.failure( + new IllegalStateException( + s"Too many concurrent offers. Specified maximum is $maxConcurrentOffers. " + + "You have to wait for one previous future to be resolved to send another request")) + else pendingOffers.enqueue(offer) } } @@ -123,8 +127,8 @@ import scala.concurrent.{ Future, Promise } } else if (isAvailable(out)) { push(out, elem) promise.success(QueueOfferResult.Enqueued) - } else if (pendingOffer.isEmpty) - pendingOffer = Some(offer) + } else if (!pendingOffers.isFull) + pendingOffers.enqueue(offer) else overflowStrategy match { case s @ (_: DropHead | _: DropBuffer) => @@ -133,8 +137,8 @@ import scala.concurrent.{ Future, Promise } "Dropping element because buffer is full and overflowStrategy is: [{}] in stream [{}]", s, name) - pendingOffer.get.promise.success(QueueOfferResult.Dropped) - pendingOffer = Some(offer) + pendingOffers.dequeue().promise.success(QueueOfferResult.Dropped) + pendingOffers.enqueue(offer) case s @ (_: DropTail | _: DropNew) => log.log( s.logLevel, @@ -163,7 +167,7 @@ import scala.concurrent.{ Future, Promise } } case Completion => - if (maxBuffer != 0 && buffer.nonEmpty || pendingOffer.nonEmpty) terminating = true + if (maxBuffer != 0 && buffer.nonEmpty || pendingOffers.nonEmpty) terminating = true else { completion.success(Done) completeStage() @@ -177,37 +181,25 @@ import scala.concurrent.{ Future, Promise } setHandler(out, this) override def onDownstreamFinish(cause: Throwable): Unit = { - pendingOffer match { - case Some(Offer(_, promise)) => - promise.success(QueueOfferResult.QueueClosed) - pendingOffer = None - case None => // do nothing - } + while (pendingOffers.nonEmpty) pendingOffers.dequeue().promise.success(QueueOfferResult.QueueClosed) completion.success(Done) completeStage() } override def onPull(): Unit = { if (maxBuffer == 0) { - pendingOffer match { - case Some(Offer(elem, promise)) => - push(out, elem) - promise.success(QueueOfferResult.Enqueued) - pendingOffer = None - if (terminating) { - completion.success(Done) - completeStage() - } - case None => + if (pendingOffers.nonEmpty) { + val offer = pendingOffers.dequeue() + push(out, offer.elem) + offer.promise.success(QueueOfferResult.Enqueued) + if (terminating) { + completion.success(Done) + completeStage() + } } } else if (buffer.nonEmpty) { push(out, buffer.dequeue()) - pendingOffer match { - case Some(offer) => - enqueueAndSuccess(offer) - pendingOffer = None - case None => //do nothing - } + while (pendingOffers.nonEmpty && !buffer.isFull) enqueueAndSuccess(pendingOffers.dequeue()) if (terminating && buffer.isEmpty) { completion.success(Done) completeStage() diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 2113d68a28..9d60e0ad22 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -697,13 +697,52 @@ object Source { * for downstream demand unless there is another message waiting for downstream demand, in that case * offer result will be completed according to the overflow strategy. * - * SourceQueue that current source is materialized to is for single thread usage only. + * The materialized SourceQueue may only be used from a single producer. * * @param bufferSize size of buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] = - new Source(scaladsl.Source.queue[T](bufferSize, overflowStrategy).mapMaterializedValue(_.asJava)) + new Source( + scaladsl.Source.queue[T](bufferSize, overflowStrategy, maxConcurrentOffers = 1).mapMaterializedValue(_.asJava)) + + /** + * Creates a `Source` that is materialized as an [[akka.stream.javadsl.SourceQueueWithComplete]]. + * You can push elements to the queue and they will be emitted to the stream if there is demand from downstream, + * otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded + * if downstream is terminated. + * + * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if + * there is no space available in the buffer. + * + * Acknowledgement mechanism is available. + * [[akka.stream.javadsl.SourceQueueWithComplete.offer]] returns `CompletionStage` which completes with + * `QueueOfferResult.enqueued` if element was added to buffer or sent downstream. It completes with + * `QueueOfferResult.dropped` if element was dropped. Can also complete with `QueueOfferResult.Failure` - + * when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed. + * + * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete `maxConcurrentOffers` number of + * `offer():CompletionStage` call when buffer is full. + * + * You can watch accessibility of stream with [[akka.stream.javadsl.SourceQueueWithComplete.watchCompletion]]. + * It returns a future that completes with success when this operator is completed or fails when stream is failed. + * + * The buffer can be disabled by using `bufferSize` of 0 and then received message will wait + * for downstream demand unless there is another message waiting for downstream demand, in that case + * offer result will be completed according to the overflow strategy. + * + * The materialized SourceQueue may be used by up to maxConcurrentOffers concurrent producers. + * + * @param bufferSize size of buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + * @param maxConcurrentOffers maximum number of pending offers when buffer is full, should be greater than 0 + */ + def queue[T]( + bufferSize: Int, + overflowStrategy: OverflowStrategy, + maxConcurrentOffers: Int): Source[T, SourceQueueWithComplete[T]] = + new Source( + scaladsl.Source.queue[T](bufferSize, overflowStrategy, maxConcurrentOffers).mapMaterializedValue(_.asJava)) /** * Start a new `Source` from some resource which can be opened, read and closed. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 170303c022..8477f6aa16 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -817,11 +817,51 @@ object Source { * for downstream demand unless there is another message waiting for downstream demand, in that case * offer result will be completed according to the overflow strategy. * + * The materialized SourceQueue may only be used from a single producer. + * * @param bufferSize size of buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] = - Source.fromGraph(new QueueSource(bufferSize, overflowStrategy).withAttributes(DefaultAttributes.queueSource)) + queue(bufferSize, overflowStrategy, maxConcurrentOffers = 1) + + /** + * Creates a `Source` that is materialized as an [[akka.stream.scaladsl.SourceQueueWithComplete]]. + * You can push elements to the queue and they will be emitted to the stream if there is demand from downstream, + * otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded + * if downstream is terminated. + * + * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if + * there is no space available in the buffer. + * + * Acknowledgement mechanism is available. + * [[akka.stream.scaladsl.SourceQueueWithComplete.offer]] returns `Future[QueueOfferResult]` which completes with + * `QueueOfferResult.Enqueued` if element was added to buffer or sent downstream. It completes with + * `QueueOfferResult.Dropped` if element was dropped. Can also complete with `QueueOfferResult.Failure` - + * when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed. + * + * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete `maxConcurrentOffers` number of + * `offer():Future` call when buffer is full. + * + * You can watch accessibility of stream with [[akka.stream.scaladsl.SourceQueueWithComplete.watchCompletion]]. + * It returns future that completes with success when the operator is completed or fails when the stream is failed. + * + * The buffer can be disabled by using `bufferSize` of 0 and then received message will wait + * for downstream demand unless there is another message waiting for downstream demand, in that case + * offer result will be completed according to the overflow strategy. + * + * The materialized SourceQueue may be used by up to maxConcurrentOffers concurrent producers. + * + * @param bufferSize size of buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + * @param maxConcurrentOffers maximum number of pending offers when buffer is full, should be greater than 0 + */ + def queue[T]( + bufferSize: Int, + overflowStrategy: OverflowStrategy, + maxConcurrentOffers: Int): Source[T, SourceQueueWithComplete[T]] = + Source.fromGraph( + new QueueSource(bufferSize, overflowStrategy, maxConcurrentOffers).withAttributes(DefaultAttributes.queueSource)) /** * Start a new `Source` from some resource which can be opened, read and closed.