allow Sink.queue concurrent pulling (#27352)

* allow Sink.queue concurrent pulling

* replace the method with a default parameter by two overloaded methods to pass the binary compatibility check :/

* replace ⇒ with =>

* reformat

* add javadsl

* fix PR comments and add concurrency to Sink.queue

* fix merge after auto resolving

* duplicate changes to javadsl

* revert source changes

* add graceful terminations

* clean up tests

* optimize imports

* trigger rebuild

* cover the case when the materializer is shut down before async callbacks have been processed

* vars to vals; fix require messages

* disable compatibility check for @InternalApi private[akka] class
Authored by Yakiv Yereskovskyi on 2020-01-25 03:33:39 +07:00, committed by Patrik Nordwall
parent 83452be2ff
commit a614f0bee7
6 changed files with 148 additions and 60 deletions


@@ -4,16 +4,17 @@
package akka.stream.scaladsl
import akka.actor.Status
import akka.pattern.pipe
import akka.stream.AbruptTerminationException
import akka.stream.Attributes.inputBuffer
import akka.stream.Materializer
import akka.stream.StreamDetachedException
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.StreamTestKit._
import scala.concurrent.duration._
import akka.stream.testkit.scaladsl.TestSource
import scala.concurrent.Await
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
class QueueSinkSpec extends StreamSpec {
@@ -34,7 +35,8 @@ class QueueSinkSpec extends StreamSpec {
}
}
"allow to have only one future waiting for result in each point of time" in assertAllStagesStopped {
"allow to have only one future waiting for result in each point of time with default maxConcurrentOffers" in
assertAllStagesStopped {
val probe = TestPublisher.manualProbe[Int]()
val queue = Source.fromPublisher(probe).runWith(Sink.queue())
val sub = probe.expectSubscription()
@@ -50,6 +52,68 @@ class QueueSinkSpec extends StreamSpec {
queue.pull()
}
"allow to have `n` futures waiting for result in each point of time with `n` maxConcurrentOffers" in
assertAllStagesStopped {
val n = 2
val probe = TestPublisher.manualProbe[Int]()
val queue = Source.fromPublisher(probe).runWith(Sink.queue(n))
val sub = probe.expectSubscription()
val future1 = queue.pull()
val future2 = queue.pull()
val future3 = queue.pull()
an[IllegalStateException] shouldBe thrownBy { Await.result(future3, remainingOrDefault) }
sub.sendNext(1)
future1.pipeTo(testActor)
expectMsg(Some(1))
sub.sendNext(2)
future2.pipeTo(testActor)
expectMsg(Some(2))
sub.sendComplete()
queue.pull()
}
"fail all futures on abrupt termination" in assertAllStagesStopped {
val n = 2
val mat = Materializer(system)
val queue = TestSource.probe.runWith(Sink.queue(n))(mat)
val future1 = queue.pull()
val future2 = queue.pull()
mat.shutdown()
// the async callback can be executed after the materializer has shut down, so StreamDetachedException is also possible
val fail1 = future1.failed.futureValue
val fail2 = future2.failed.futureValue
assert(fail1.isInstanceOf[AbruptTerminationException] || fail1.isInstanceOf[StreamDetachedException])
assert(fail2.isInstanceOf[AbruptTerminationException] || fail2.isInstanceOf[StreamDetachedException])
}
"complete all futures with None on upstream complete" in assertAllStagesStopped {
val n = 2
val probe = TestPublisher.probe[Int]()
val queue = Source.fromPublisher(probe).runWith(Sink.queue(n))
val future1 = queue.pull()
val future2 = queue.pull()
probe.sendComplete()
future1.futureValue shouldBe None
future2.futureValue shouldBe None
}
"fail all futures on upstream fail" in assertAllStagesStopped {
val n = 2
val probe = TestPublisher.probe[Int]()
val queue = Source.fromPublisher(probe).runWith(Sink.queue(n))
val future1 = queue.pull()
val future2 = queue.pull()
val ex = new IllegalArgumentException
probe.sendError(ex)
future1.failed.futureValue shouldBe ex
future2.failed.futureValue shouldBe ex
}
"wait for next element from upstream" in assertAllStagesStopped {
val probe = TestPublisher.manualProbe[Int]()
val queue = Source.fromPublisher(probe).runWith(Sink.queue())
@@ -64,27 +128,6 @@ class QueueSinkSpec extends StreamSpec {
queue.pull()
}
"fail future on stream failure" in assertAllStagesStopped {
val probe = TestPublisher.manualProbe[Int]()
val queue = Source.fromPublisher(probe).runWith(Sink.queue())
val sub = probe.expectSubscription()
queue.pull().pipeTo(testActor)
expectNoMessage(noMsgTimeout)
sub.sendError(ex)
expectMsg(Status.Failure(ex))
}
"fail future when stream failed" in assertAllStagesStopped {
val probe = TestPublisher.manualProbe[Int]()
val queue = Source.fromPublisher(probe).runWith(Sink.queue())
val sub = probe.expectSubscription()
sub.sendError(ex)
the[Exception] thrownBy { Await.result(queue.pull(), remainingOrDefault) } should be(ex)
}
"fail future immediately if stream already canceled" in assertAllStagesStopped {
val queue = Source.empty[Int].runWith(Sink.queue())
// race here because no way to observe that queue sink saw termination


@@ -0,0 +1,2 @@
# disable compatibility check for @InternalApi private[akka] class
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.QueueSink.this")


@@ -311,8 +311,11 @@ import scala.util.control.NonFatal
/**
* INTERNAL API
*/
@InternalApi private[akka] final class QueueSink[T]()
@InternalApi private[akka] final class QueueSink[T](maxConcurrentPulls: Int)
extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueueWithCancel[T]] {
require(maxConcurrentPulls > 0, "Max concurrent pulls must be greater than 0")
type Requested[E] = Promise[Option[E]]
val in = Inlet[T]("queueSink.in")
@@ -328,30 +331,25 @@ import scala.util.control.NonFatal
val maxBuffer = inheritedAttributes.get[InputBuffer](InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0")
var buffer: Buffer[Received[T]] = _
var currentRequest: Option[Requested[T]] = None
// Allocates one additional element to hold stream closed/failure indicators
val buffer: Buffer[Received[T]] = Buffer(maxBuffer + 1, inheritedAttributes)
val currentRequests: Buffer[Requested[T]] = Buffer(maxConcurrentPulls, inheritedAttributes)
override def preStart(): Unit = {
// Allocates one additional element to hold stream
// closed/failure indicators
buffer = Buffer(maxBuffer + 1, inheritedAttributes)
setKeepGoing(true)
pull(in)
}
private val callback = getAsyncCallback[Output[T]] {
case QueueSink.Pull(pullPromise) =>
currentRequest match {
case Some(_) =>
pullPromise.failure(
new IllegalStateException(
"You have to wait for previous future to be resolved to send another request"))
case None =>
if (buffer.isEmpty) currentRequest = Some(pullPromise)
else {
if (buffer.used == maxBuffer) tryPull(in)
sendDownstream(pullPromise)
}
if (currentRequests.isFull)
pullPromise.failure(
new IllegalStateException(s"Too many concurrent pulls. Specified maximum is $maxConcurrentPulls. " +
"You have to wait for one previous future to be resolved to send another request"))
else if (buffer.isEmpty) currentRequests.enqueue(pullPromise)
else {
if (buffer.used == maxBuffer) tryPull(in)
sendDownstream(pullPromise)
}
case QueueSink.Cancel => completeStage()
}
@@ -366,23 +364,28 @@ import scala.util.control.NonFatal
}
}
def enqueueAndNotify(requested: Received[T]): Unit = {
buffer.enqueue(requested)
currentRequest match {
case Some(p) =>
sendDownstream(p)
currentRequest = None
case None => //do nothing
}
}
def onPush(): Unit = {
enqueueAndNotify(Success(Some(grab(in))))
buffer.enqueue(Success(Some(grab(in))))
if (currentRequests.nonEmpty) currentRequests.dequeue().complete(buffer.dequeue())
if (buffer.used < maxBuffer) pull(in)
}
override def onUpstreamFinish(): Unit = enqueueAndNotify(Success(None))
override def onUpstreamFailure(ex: Throwable): Unit = enqueueAndNotify(Failure(ex))
override def onUpstreamFinish(): Unit = {
buffer.enqueue(Success(None))
while (currentRequests.nonEmpty && buffer.nonEmpty) currentRequests.dequeue().complete(buffer.dequeue())
while (currentRequests.nonEmpty) currentRequests.dequeue().complete(Success(None))
if (buffer.isEmpty) completeStage()
}
override def onUpstreamFailure(ex: Throwable): Unit = {
buffer.enqueue(Failure(ex))
while (currentRequests.nonEmpty && buffer.nonEmpty) currentRequests.dequeue().complete(buffer.dequeue())
while (currentRequests.nonEmpty) currentRequests.dequeue().complete(Failure(ex))
if (buffer.isEmpty) failStage(ex)
}
override def postStop(): Unit =
while (currentRequests.nonEmpty) currentRequests.dequeue().failure(new AbruptStageTerminationException(this))
setHandler(in, this)
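
The completion handling above drains pending pulls with buffered elements before answering the remaining ones with the completion (or failure) signal. A minimal sketch of the observable effect through the public API follows; it is not part of the diff, and the object and system names are assumptions.

import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

object QueueDrainSketch extends App {
  implicit val system: ActorSystem = ActorSystem("queue-drain-sketch")
  import system.dispatcher

  // Two elements, then upstream completes; up to three pulls may be outstanding at once.
  val queue = Source(List(1, 2)).runWith(Sink.queue[Int](maxConcurrentPulls = 3))
  val pulls = List(queue.pull(), queue.pull(), queue.pull())

  // Buffered elements are handed out first, the remaining pull is completed with None:
  // prints List(Some(1), Some(2), None)
  println(Await.result(Future.sequence(pulls), 3.seconds))

  system.terminate()
}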


@@ -333,6 +333,27 @@ object Sink {
new Sink(scaladsl.Sink.combine(output1.asScala, output2.asScala, seq: _*)(num => strategy.apply(num)))
}
/**
* Creates a `Sink` that is materialized as an [[akka.stream.javadsl.SinkQueueWithCancel]].
* The [[akka.stream.javadsl.SinkQueueWithCancel.pull]] method pulls an element from the stream and returns a ``CompletionStage[Optional[T]]``.
* The `CompletionStage` completes when an element is available.
*
* Before issuing another pull you need to ensure that the number of pending pulls is less than ``maxConcurrentPulls``,
* or wait until one of the previous futures completes.
* Pull returns a failed future with an ''IllegalStateException'' if there are more than ``maxConcurrentPulls`` pending pulls.
*
* The `Sink` will request at most a number of elements equal to the size of `inputBuffer` from
* upstream and then stop back pressure. You can configure the size of the input
* buffer by using the [[Sink.withAttributes]] method.
*
* For stream completion you need to pull all elements from the [[akka.stream.javadsl.SinkQueueWithCancel]], including the
* final empty element as the completion marker.
*
* @see [[akka.stream.javadsl.SinkQueueWithCancel]]
*/
def queue[T](maxConcurrentPulls: Int): Sink[T, SinkQueueWithCancel[T]] =
new Sink(scaladsl.Sink.queue[T](maxConcurrentPulls).mapMaterializedValue(_.asJava))
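
A hedged usage sketch of this javadsl overload, driven from Scala to keep the examples in one language; the object and system names are assumptions and not part of the diff.

import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.javadsl.{ Sink, Source }

object JavaDslQueueSketch extends App {
  val system = ActorSystem("javadsl-queue-sketch")
  val mat = Materializer(system)

  // maxConcurrentPulls = 2: two pulls may be outstanding at the same time.
  val queue = Source.range(1, 4).runWith(Sink.queue[Integer](2), mat)
  val first = queue.pull()
  val second = queue.pull()

  println(first.toCompletableFuture.join())  // Optional[1]
  println(second.toCompletableFuture.join()) // Optional[2]

  queue.cancel()
  system.terminate()
}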
/**
* Creates a `Sink` that is materialized as an [[akka.stream.javadsl.SinkQueueWithCancel]].
* [[akka.stream.javadsl.SinkQueueWithCancel.pull]] method is pulling element from the stream and returns ``CompletionStage[Option[T]]``.
@@ -350,8 +371,7 @@ object Sink {
*
* @see [[akka.stream.javadsl.SinkQueueWithCancel]]
*/
def queue[T](): Sink[T, SinkQueueWithCancel[T]] =
new Sink(scaladsl.Sink.queue[T]().mapMaterializedValue(_.asJava))
def queue[T](): Sink[T, SinkQueueWithCancel[T]] = queue(1)
/**
* Creates a real `Sink` upon receiving the first element. Internal `Sink` will not be created if there are no elements,


@@ -554,6 +554,27 @@ object Sink {
onFailureMessage: (Throwable) => Any = Status.Failure): Sink[T, NotUsed] =
actorRefWithAck(ref, _ => identity, _ => onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)
/**
* Creates a `Sink` that is materialized as an [[akka.stream.scaladsl.SinkQueueWithCancel]].
* The [[akka.stream.scaladsl.SinkQueueWithCancel.pull]] method pulls an element from the stream and returns a ``Future[Option[T]]``.
* The `Future` completes when an element is available.
*
* Before issuing another pull you need to ensure that the number of pending pulls is less than ``maxConcurrentPulls``,
* or wait until one of the previous futures completes.
* Pull returns a failed future with an ''IllegalStateException'' if there are more than ``maxConcurrentPulls`` pending pulls.
*
* The `Sink` will request at most a number of elements equal to the size of `inputBuffer` from
* upstream and then stop back pressure. You can configure the size of the input
* buffer by using the [[Sink.withAttributes]] method.
*
* For stream completion you need to pull all elements from the [[akka.stream.scaladsl.SinkQueueWithCancel]], including the
* last `None` as the completion marker.
*
* See also [[akka.stream.scaladsl.SinkQueueWithCancel]]
*/
def queue[T](maxConcurrentPulls: Int): Sink[T, SinkQueueWithCancel[T]] =
Sink.fromGraph(new QueueSink(maxConcurrentPulls))
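
A minimal usage sketch of the concurrency limit in action; it is not part of the diff, and the object and system names are assumptions.

import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Await
import scala.concurrent.duration._

object QueueConcurrentPullSketch extends App {
  implicit val system: ActorSystem = ActorSystem("queue-pull-sketch")

  // Source.maybe never emits, so the first two pulls stay pending.
  val queue = Source.maybe[Int].runWith(Sink.queue[Int](maxConcurrentPulls = 2))

  val first = queue.pull()  // pending
  val second = queue.pull() // pending
  val third = queue.pull()  // exceeds maxConcurrentPulls while the first two are still pending

  // The third pull fails with an IllegalStateException.
  println(Await.result(third.failed, 3.seconds))

  queue.cancel()
  system.terminate()
}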
/**
* Creates a `Sink` that is materialized as an [[akka.stream.scaladsl.SinkQueueWithCancel]].
* [[akka.stream.scaladsl.SinkQueueWithCancel.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``.
@@ -571,8 +592,7 @@ object Sink {
*
* See also [[akka.stream.scaladsl.SinkQueueWithCancel]]
*/
def queue[T](): Sink[T, SinkQueueWithCancel[T]] =
Sink.fromGraph(new QueueSink())
def queue[T](): Sink[T, SinkQueueWithCancel[T]] = queue(1)
/**
* Creates a real `Sink` upon receiving the first element. Internal `Sink` will not be created if there are no elements,


@@ -179,7 +179,7 @@ object StreamConverters {
def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = {
// TODO removing the QueueSink name, see issue #22523
Sink
.fromGraph(new QueueSink[T]().withAttributes(Attributes.none))
.fromGraph(new QueueSink[T](1).withAttributes(Attributes.none))
.mapMaterializedValue(
queue =>
StreamSupport