Merge pull request #19676 from ALPHA-60/fix-sink-queue-buffer

fix Sink.queue when buffer has run full #19675
This commit is contained in:
Roland Kuhn 2016-02-11 20:10:18 +01:00
commit f713a5eaef
2 changed files with 22 additions and 2 deletions

View file

@ -277,7 +277,10 @@ private[akka] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkS
promise.failure(new IllegalStateException("You have to wait for previous future to be resolved to send another request"))
case None
if (buffer.isEmpty) currentRequest = Some(promise)
else sendDownstream(promise)
else {
if (buffer.used == maxBuffer - 1) tryPull(in)
sendDownstream(promise)
}
})
def sendDownstream(promise: Requested[T]): Unit = {