From fa3b36dcee962805d4d9ff1cfe09d175db13da6f Mon Sep 17 00:00:00 2001 From: Bojan Petrovic Date: Tue, 2 Feb 2016 19:38:00 +0100 Subject: [PATCH] fix Sink.queue when buffer has run full #19675 --- .../akka/stream/scaladsl/QueueSinkSpec.scala | 19 ++++++++++++++++++- .../main/scala/akka/stream/impl/Sinks.scala | 5 ++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala index 24ae8b5cb2..8f63552522 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala @@ -8,14 +8,16 @@ import akka.pattern.pipe import akka.stream.{ OverflowStrategy, ActorMaterializer } import akka.stream.testkit.Utils._ import akka.stream.testkit.{ AkkaSpec, _ } +import org.scalatest.concurrent.ScalaFutures import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.control.NoStackTrace -class QueueSinkSpec extends AkkaSpec { +class QueueSinkSpec extends AkkaSpec with ScalaFutures { implicit val ec = system.dispatcher implicit val materializer = ActorMaterializer() + implicit val patience = PatienceConfig(2.second) val ex = new RuntimeException("ex") with NoStackTrace @@ -112,5 +114,20 @@ class QueueSinkSpec extends AkkaSpec { queue.pull().onFailure { case e ⇒ e.isInstanceOf[IllegalStateException] should ===(true) } } + "keep on sending even after the buffer has been full" in assertAllStagesStopped { + val (probe, queue) = Source(1 to 20) + .alsoToMat(Flow[Int].take(15).watchTermination()(Keep.right).to(Sink.ignore))(Keep.right) + .toMat(Sink.queue())(Keep.both) + .run() + probe.futureValue should ===(akka.Done) + for (i ← 1 to 20) { + queue.pull() pipeTo testActor + expectMsg(Some(i)) + } + queue.pull() pipeTo testActor + expectMsg(None) + + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 1d3553f374..b970bdfef5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -277,7 +277,10 @@ private[akka] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkS promise.failure(new IllegalStateException("You have to wait for previous future to be resolved to send another request")) case None ⇒ if (buffer.isEmpty) currentRequest = Some(promise) - else sendDownstream(promise) + else { + if (buffer.used == maxBuffer - 1) tryPull(in) + sendDownstream(promise) + } }) def sendDownstream(promise: Requested[T]): Unit = {