QueueSource now drops after completion (#25349) (#25552)

* QueueSource now drops after completion (#25349)

* Resolve Johan's review comments
This commit is contained in:
Heiko Seeberger 2018-09-03 07:03:28 +02:00 committed by Konrad `ktoso` Malawski
parent 887c56b562
commit 07dfdef620
2 changed files with 14 additions and 0 deletions

View file

@ -71,6 +71,17 @@ class QueueSourceSpec extends StreamSpec {
f.futureValue should ===(QueueOfferResult.Enqueued)
}
"reject elements when completed" in {
// Not using the materialized test sink leads to the 42 being enqueued but not emitted due to lack of demand.
// This will also not effectively complete the stream, hence there is enough time (no races) to offer 43
// and verify it is dropped.
val source = Source.queue[Int](42, OverflowStrategy.backpressure).to(TestSink.probe).run()
source.offer(42)
source.complete()
val f = source.offer(43)
f.futureValue should ===(QueueOfferResult.Dropped)
}
"buffer when needed" in {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(100, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run()

View file

@ -89,6 +89,9 @@ import scala.util.control.NonFatal
}
private val callback = getAsyncCallback[Input[T]] {
case Offer(_, promise) if terminating
promise.success(QueueOfferResult.Dropped)
case offer @ Offer(elem, promise)
if (maxBuffer != 0) {
bufferElem(offer)