fix Sink.queue when buffer has run full #19675

This commit is contained in:
Bojan Petrovic 2016-02-02 19:38:00 +01:00
parent 9580b58011
commit fa3b36dcee
2 changed files with 22 additions and 2 deletions

View file

@ -8,14 +8,16 @@ import akka.pattern.pipe
import akka.stream.{ OverflowStrategy, ActorMaterializer } import akka.stream.{ OverflowStrategy, ActorMaterializer }
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.{ AkkaSpec, _ } import akka.stream.testkit.{ AkkaSpec, _ }
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
class QueueSinkSpec extends AkkaSpec { class QueueSinkSpec extends AkkaSpec with ScalaFutures {
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer() implicit val materializer = ActorMaterializer()
implicit val patience = PatienceConfig(2.second)
val ex = new RuntimeException("ex") with NoStackTrace val ex = new RuntimeException("ex") with NoStackTrace
@ -112,5 +114,20 @@ class QueueSinkSpec extends AkkaSpec {
queue.pull().onFailure { case e e.isInstanceOf[IllegalStateException] should ===(true) } queue.pull().onFailure { case e e.isInstanceOf[IllegalStateException] should ===(true) }
} }
"keep on sending even after the buffer has been full" in assertAllStagesStopped {
val (probe, queue) = Source(1 to 20)
.alsoToMat(Flow[Int].take(15).watchTermination()(Keep.right).to(Sink.ignore))(Keep.right)
.toMat(Sink.queue())(Keep.both)
.run()
probe.futureValue should ===(akka.Done)
for (i 1 to 20) {
queue.pull() pipeTo testActor
expectMsg(Some(i))
}
queue.pull() pipeTo testActor
expectMsg(None)
}
} }
} }

View file

@ -277,7 +277,10 @@ private[akka] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkS
promise.failure(new IllegalStateException("You have to wait for previous future to be resolved to send another request")) promise.failure(new IllegalStateException("You have to wait for previous future to be resolved to send another request"))
case None case None
if (buffer.isEmpty) currentRequest = Some(promise) if (buffer.isEmpty) currentRequest = Some(promise)
else sendDownstream(promise) else {
if (buffer.used == maxBuffer - 1) tryPull(in)
sendDownstream(promise)
}
}) })
def sendDownstream(promise: Requested[T]): Unit = { def sendDownstream(promise: Requested[T]): Unit = {