= str #19803 fix 1 element buffer handling

* Sink.queue and Sink.actorRefWithAck can now handle one element input
  buffers
* Sink.queue now buffers as much upstream elements as declared in its
  InputBuffer attribute, instead of one less.
* Tests for streaming with full buffer now do not depend on default
  input buffer size (16); instead, they set sink input buffer size
  explicitly.
This commit is contained in:
Bojan Petrovic 2016-02-16 13:09:01 +01:00 committed by Roland Kuhn
parent c91fa3545e
commit 0bc04f3466
4 changed files with 77 additions and 17 deletions

View file

@ -303,6 +303,8 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal
var currentRequest: Option[Requested[T]] = None
override def preStart(): Unit = {
// Allocates one additional element to hold stream
// closed/failure indicators
buffer = Buffer(maxBuffer + 1, materializer)
setKeepGoing(true)
initCallback(callback.invoke)
@ -319,7 +321,7 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal
case None
if (buffer.isEmpty) currentRequest = Some(promise)
else {
if (buffer.used == maxBuffer - 1) tryPull(in)
if (buffer.used == maxBuffer) tryPull(in)
sendDownstream(promise)
}
})
@ -347,7 +349,7 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal
setHandler(in, new InHandler {
override def onPush(): Unit = {
enqueueAndNotify(Success(Some(grab(in))))
if (buffer.used < maxBuffer - 1) pull(in)
if (buffer.used < maxBuffer) pull(in)
}
override def onUpstreamFinish(): Unit = enqueueAndNotify(Success(None))
override def onUpstreamFailure(ex: Throwable): Unit = enqueueAndNotify(Failure(ex))