From 07dfdef620ec8bf61e1738b6bbb892bb58fa591f Mon Sep 17 00:00:00 2001 From: Heiko Seeberger Date: Mon, 3 Sep 2018 07:03:28 +0200 Subject: [PATCH] QueueSource now drops after completion (#25349) (#25552) * QueueSource now drops after completion (#25349) * Resolve Johan's review comments --- .../scala/akka/stream/scaladsl/QueueSourceSpec.scala | 11 +++++++++++ .../src/main/scala/akka/stream/impl/QueueSource.scala | 3 +++ 2 files changed, 14 insertions(+) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala index c2b60189da..51f33d81d7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala @@ -71,6 +71,17 @@ class QueueSourceSpec extends StreamSpec { f.futureValue should ===(QueueOfferResult.Enqueued) } + "reject elements when completed" in { + // Not using the materialized test sink leads to the 42 being enqueued but not emitted due to lack of demand. + // This will also not effectively complete the stream, hence there is enough time (no races) to offer 43 + // and verify it is dropped. + val source = Source.queue[Int](42, OverflowStrategy.backpressure).to(TestSink.probe).run() + source.offer(42) + source.complete() + val f = source.offer(43) + f.futureValue should ===(QueueOfferResult.Dropped) + } + "buffer when needed" in { val s = TestSubscriber.manualProbe[Int]() val queue = Source.queue(100, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run() diff --git a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala index 95745951b7..bddf74faf3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala @@ -89,6 +89,9 @@ import scala.util.control.NonFatal } private val callback = getAsyncCallback[Input[T]] { + case Offer(_, promise) if terminating ⇒ + promise.success(QueueOfferResult.Dropped) + case offer @ Offer(elem, promise) ⇒ if (maxBuffer != 0) { bufferElem(offer)