From a614f0bee74056b254de56f5ffdb570443f08690 Mon Sep 17 00:00:00 2001 From: Yakiv Yereskovskyi Date: Sat, 25 Jan 2020 03:33:39 +0700 Subject: [PATCH] allow Sink.queue concurrent pulling (#27352) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * allow Sink.queue concurrent pulling * replace methods with default parameters on two overloaded methods to pass binary compatibility check :/ * replace ⇒ with => * reformat * add javadsl * fix PR comments and add concurrency to Sink.queue * fix merge after auto resolving * duplicate changes to javadsl * revert source changes * add graceful terminations * clean up tests * optimize imports * trigger rebuild * cover the case when materializer shutdown before async callbacks were processed * vars to vals; fix require messages * disable compatibility check for @InternalApi private[akka] class --- .../akka/stream/scaladsl/QueueSinkSpec.scala | 93 ++++++++++++++----- .../27361-queue-sink-concurrency.excludes | 2 + .../main/scala/akka/stream/impl/Sinks.scala | 63 +++++++------ .../main/scala/akka/stream/javadsl/Sink.scala | 24 ++++- .../scala/akka/stream/scaladsl/Sink.scala | 24 ++++- .../stream/scaladsl/StreamConverters.scala | 2 +- 6 files changed, 148 insertions(+), 60 deletions(-) create mode 100644 akka-stream/src/main/mima-filters/2.6.1.backwards.excludes/27361-queue-sink-concurrency.excludes diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala index 1d30de4aa2..cfa5fc352e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala @@ -4,16 +4,17 @@ package akka.stream.scaladsl -import akka.actor.Status import akka.pattern.pipe +import akka.stream.AbruptTerminationException import akka.stream.Attributes.inputBuffer +import akka.stream.Materializer import akka.stream.StreamDetachedException import akka.stream.testkit._ import akka.stream.testkit.scaladsl.StreamTestKit._ - -import scala.concurrent.duration._ +import akka.stream.testkit.scaladsl.TestSource import scala.concurrent.Await import scala.concurrent.Promise +import scala.concurrent.duration._ import scala.util.control.NoStackTrace class QueueSinkSpec extends StreamSpec { @@ -34,7 +35,8 @@ class QueueSinkSpec extends StreamSpec { } } - "allow to have only one future waiting for result in each point of time" in assertAllStagesStopped { + "allow to have only one future waiting for result in each point of time with default maxConcurrentOffers" in + assertAllStagesStopped { val probe = TestPublisher.manualProbe[Int]() val queue = Source.fromPublisher(probe).runWith(Sink.queue()) val sub = probe.expectSubscription() @@ -50,6 +52,68 @@ class QueueSinkSpec extends StreamSpec { queue.pull() } + "allow to have `n` futures waiting for result in each point of time with `n` maxConcurrentOffers" in + assertAllStagesStopped { + val n = 2 + val probe = TestPublisher.manualProbe[Int]() + val queue = Source.fromPublisher(probe).runWith(Sink.queue(n)) + val sub = probe.expectSubscription() + val future1 = queue.pull() + val future2 = queue.pull() + val future3 = queue.pull() + an[IllegalStateException] shouldBe thrownBy { Await.result(future3, remainingOrDefault) } + + sub.sendNext(1) + future1.pipeTo(testActor) + expectMsg(Some(1)) + + sub.sendNext(2) + future2.pipeTo(testActor) + expectMsg(Some(2)) + + sub.sendComplete() + queue.pull() + } + + "fail all futures on abrupt termination" in assertAllStagesStopped { + val n = 2 + val mat = Materializer(system) + val queue = TestSource.probe.runWith(Sink.queue(n))(mat) + + val future1 = queue.pull() + val future2 = queue.pull() + mat.shutdown() + + // async callback can be executed after materializer shutdown so you should also expect StreamDetachedException + val fail1 = future1.failed.futureValue + val fail2 = future2.failed.futureValue + assert(fail1.isInstanceOf[AbruptTerminationException] || fail1.isInstanceOf[StreamDetachedException]) + assert(fail2.isInstanceOf[AbruptTerminationException] || fail2.isInstanceOf[StreamDetachedException]) + } + + "complete all futures with None on upstream complete" in assertAllStagesStopped { + val n = 2 + val probe = TestPublisher.probe[Int]() + val queue = Source.fromPublisher(probe).runWith(Sink.queue(n)) + val future1 = queue.pull() + val future2 = queue.pull() + probe.sendComplete() + future1.futureValue shouldBe None + future2.futureValue shouldBe None + } + + "fail all futures on upstream fail" in assertAllStagesStopped { + val n = 2 + val probe = TestPublisher.probe[Int]() + val queue = Source.fromPublisher(probe).runWith(Sink.queue(n)) + val future1 = queue.pull() + val future2 = queue.pull() + val ex = new IllegalArgumentException + probe.sendError(ex) + future1.failed.futureValue shouldBe ex + future2.failed.futureValue shouldBe ex + } + "wait for next element from upstream" in assertAllStagesStopped { val probe = TestPublisher.manualProbe[Int]() val queue = Source.fromPublisher(probe).runWith(Sink.queue()) @@ -64,27 +128,6 @@ class QueueSinkSpec extends StreamSpec { queue.pull() } - "fail future on stream failure" in assertAllStagesStopped { - val probe = TestPublisher.manualProbe[Int]() - val queue = Source.fromPublisher(probe).runWith(Sink.queue()) - val sub = probe.expectSubscription() - - queue.pull().pipeTo(testActor) - expectNoMessage(noMsgTimeout) - - sub.sendError(ex) - expectMsg(Status.Failure(ex)) - } - - "fail future when stream failed" in assertAllStagesStopped { - val probe = TestPublisher.manualProbe[Int]() - val queue = Source.fromPublisher(probe).runWith(Sink.queue()) - val sub = probe.expectSubscription() - sub.sendError(ex) - - the[Exception] thrownBy { Await.result(queue.pull(), remainingOrDefault) } should be(ex) - } - "fail future immediately if stream already canceled" in assertAllStagesStopped { val queue = Source.empty[Int].runWith(Sink.queue()) // race here because no way to observe that queue sink saw termination diff --git a/akka-stream/src/main/mima-filters/2.6.1.backwards.excludes/27361-queue-sink-concurrency.excludes b/akka-stream/src/main/mima-filters/2.6.1.backwards.excludes/27361-queue-sink-concurrency.excludes new file mode 100644 index 0000000000..e396e4b7b8 --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.1.backwards.excludes/27361-queue-sink-concurrency.excludes @@ -0,0 +1,2 @@ +# disable compatibility check for @InternalApi private[akka] class +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.QueueSink.this") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index a0d1238517..45cdb1bba5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -311,8 +311,11 @@ import scala.util.control.NonFatal /** * INTERNAL API */ -@InternalApi private[akka] final class QueueSink[T]() +@InternalApi private[akka] final class QueueSink[T](maxConcurrentPulls: Int) extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueueWithCancel[T]] { + + require(maxConcurrentPulls > 0, "Max concurrent pulls must be greater than 0") + type Requested[E] = Promise[Option[E]] val in = Inlet[T]("queueSink.in") @@ -328,30 +331,25 @@ import scala.util.control.NonFatal val maxBuffer = inheritedAttributes.get[InputBuffer](InputBuffer(16, 16)).max require(maxBuffer > 0, "Buffer size must be greater than 0") - var buffer: Buffer[Received[T]] = _ - var currentRequest: Option[Requested[T]] = None + // Allocates one additional element to hold stream closed/failure indicators + val buffer: Buffer[Received[T]] = Buffer(maxBuffer + 1, inheritedAttributes) + val currentRequests: Buffer[Requested[T]] = Buffer(maxConcurrentPulls, inheritedAttributes) override def preStart(): Unit = { - // Allocates one additional element to hold stream - // closed/failure indicators - buffer = Buffer(maxBuffer + 1, inheritedAttributes) setKeepGoing(true) pull(in) } private val callback = getAsyncCallback[Output[T]] { case QueueSink.Pull(pullPromise) => - currentRequest match { - case Some(_) => - pullPromise.failure( - new IllegalStateException( - "You have to wait for previous future to be resolved to send another request")) - case None => - if (buffer.isEmpty) currentRequest = Some(pullPromise) - else { - if (buffer.used == maxBuffer) tryPull(in) - sendDownstream(pullPromise) - } + if (currentRequests.isFull) + pullPromise.failure( + new IllegalStateException(s"Too many concurrent pulls. Specified maximum is $maxConcurrentPulls. " + + "You have to wait for one previous future to be resolved to send another request")) + else if (buffer.isEmpty) currentRequests.enqueue(pullPromise) + else { + if (buffer.used == maxBuffer) tryPull(in) + sendDownstream(pullPromise) } case QueueSink.Cancel => completeStage() } @@ -366,23 +364,28 @@ import scala.util.control.NonFatal } } - def enqueueAndNotify(requested: Received[T]): Unit = { - buffer.enqueue(requested) - currentRequest match { - case Some(p) => - sendDownstream(p) - currentRequest = None - case None => //do nothing - } - } - def onPush(): Unit = { - enqueueAndNotify(Success(Some(grab(in)))) + buffer.enqueue(Success(Some(grab(in)))) + if (currentRequests.nonEmpty) currentRequests.dequeue().complete(buffer.dequeue()) if (buffer.used < maxBuffer) pull(in) } - override def onUpstreamFinish(): Unit = enqueueAndNotify(Success(None)) - override def onUpstreamFailure(ex: Throwable): Unit = enqueueAndNotify(Failure(ex)) + override def onUpstreamFinish(): Unit = { + buffer.enqueue(Success(None)) + while (currentRequests.nonEmpty && buffer.nonEmpty) currentRequests.dequeue().complete(buffer.dequeue()) + while (currentRequests.nonEmpty) currentRequests.dequeue().complete(Success(None)) + if (buffer.isEmpty) completeStage() + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + buffer.enqueue(Failure(ex)) + while (currentRequests.nonEmpty && buffer.nonEmpty) currentRequests.dequeue().complete(buffer.dequeue()) + while (currentRequests.nonEmpty) currentRequests.dequeue().complete(Failure(ex)) + if (buffer.isEmpty) failStage(ex) + } + + override def postStop(): Unit = + while (currentRequests.nonEmpty) currentRequests.dequeue().failure(new AbruptStageTerminationException(this)) setHandler(in, this) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 3042a970a9..d5c0880a34 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -333,6 +333,27 @@ object Sink { new Sink(scaladsl.Sink.combine(output1.asScala, output2.asScala, seq: _*)(num => strategy.apply(num))) } + /** + * Creates a `Sink` that is materialized as an [[akka.stream.javadsl.SinkQueueWithCancel]]. + * [[akka.stream.javadsl.SinkQueueWithCancel.pull]] method is pulling element from the stream and returns ``CompletionStage[Option[T]]``. + * `CompletionStage` completes when element is available. + * + * Before calling pull method second time you need to ensure that number of pending pulls is less then ``maxConcurrentPulls`` + * or wait until some of the previous Futures completes. + * Pull returns Failed future with ''IllegalStateException'' if there will be more then ``maxConcurrentPulls`` number of pending pulls. + * + * `Sink` will request at most number of elements equal to size of `inputBuffer` from + * upstream and then stop back pressure. You can configure size of input + * buffer by using [[Sink.withAttributes]] method. + * + * For stream completion you need to pull all elements from [[akka.stream.javadsl.SinkQueueWithCancel]] including last None + * as completion marker + * + * @see [[akka.stream.javadsl.SinkQueueWithCancel]] + */ + def queue[T](maxConcurrentPulls: Int): Sink[T, SinkQueueWithCancel[T]] = + new Sink(scaladsl.Sink.queue[T](maxConcurrentPulls).mapMaterializedValue(_.asJava)) + /** * Creates a `Sink` that is materialized as an [[akka.stream.javadsl.SinkQueueWithCancel]]. * [[akka.stream.javadsl.SinkQueueWithCancel.pull]] method is pulling element from the stream and returns ``CompletionStage[Option[T]]``. @@ -350,8 +371,7 @@ object Sink { * * @see [[akka.stream.javadsl.SinkQueueWithCancel]] */ - def queue[T](): Sink[T, SinkQueueWithCancel[T]] = - new Sink(scaladsl.Sink.queue[T]().mapMaterializedValue(_.asJava)) + def queue[T](): Sink[T, SinkQueueWithCancel[T]] = queue(1) /** * Creates a real `Sink` upon receiving the first element. Internal `Sink` will not be created if there are no elements, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index f0f0d39796..91e2ec0a12 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -554,6 +554,27 @@ object Sink { onFailureMessage: (Throwable) => Any = Status.Failure): Sink[T, NotUsed] = actorRefWithAck(ref, _ => identity, _ => onInitMessage, ackMessage, onCompleteMessage, onFailureMessage) + /** + * Creates a `Sink` that is materialized as an [[akka.stream.scaladsl.SinkQueueWithCancel]]. + * [[akka.stream.scaladsl.SinkQueueWithCancel.pull]] method is pulling element from the stream and returns ``Future[Option[T]``. + * `Future` completes when element is available. + * + * Before calling pull method second time you need to ensure that number of pending pulls is less then ``maxConcurrentPulls`` + * or wait until some of the previous Futures completes. + * Pull returns Failed future with ''IllegalStateException'' if there will be more then ``maxConcurrentPulls`` number of pending pulls. + * + * `Sink` will request at most number of elements equal to size of `inputBuffer` from + * upstream and then stop back pressure. You can configure size of input + * buffer by using [[Sink.withAttributes]] method. + * + * For stream completion you need to pull all elements from [[akka.stream.scaladsl.SinkQueueWithCancel]] including last None + * as completion marker + * + * See also [[akka.stream.scaladsl.SinkQueueWithCancel]] + */ + def queue[T](maxConcurrentPulls: Int): Sink[T, SinkQueueWithCancel[T]] = + Sink.fromGraph(new QueueSink(maxConcurrentPulls)) + /** * Creates a `Sink` that is materialized as an [[akka.stream.scaladsl.SinkQueueWithCancel]]. * [[akka.stream.scaladsl.SinkQueueWithCancel.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``. @@ -571,8 +592,7 @@ object Sink { * * See also [[akka.stream.scaladsl.SinkQueueWithCancel]] */ - def queue[T](): Sink[T, SinkQueueWithCancel[T]] = - Sink.fromGraph(new QueueSink()) + def queue[T](): Sink[T, SinkQueueWithCancel[T]] = queue(1) /** * Creates a real `Sink` upon receiving the first element. Internal `Sink` will not be created if there are no elements, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala index 6615c3f1e7..00fcb48f2d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala @@ -179,7 +179,7 @@ object StreamConverters { def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = { // TODO removing the QueueSink name, see issue #22523 Sink - .fromGraph(new QueueSink[T]().withAttributes(Attributes.none)) + .fromGraph(new QueueSink[T](1).withAttributes(Attributes.none)) .mapMaterializedValue( queue => StreamSupport