Merge pull request #19783 from ALPHA-60/fix-ignore-zero-sized-buffer

=str #19781 Use overriden `InputBuffer` attribute
This commit is contained in:
Roland Kuhn 2016-02-16 10:31:17 +01:00
commit 2bb45ce211
11 changed files with 69 additions and 17 deletions

View file

@ -289,13 +289,14 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal
type Requested[E] = Promise[Option[E]]
val in = Inlet[T]("queueSink.in")
override def initialAttributes = DefaultAttributes.queueSink
override val shape: SinkShape[T] = SinkShape.of(in)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Requested[T]] {
type Received[E] = Try[Option[E]]
val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0")
var buffer: Buffer[Received[T]] = _