FlowDelay with EmitEarly caused a NPE (#27170)

This commit is contained in:
Johan Andrén 2019-07-04 11:06:24 +02:00 committed by Patrik Nordwall
parent 139d9a3c0c
commit 7c6d3b818a
2 changed files with 20 additions and 2 deletions

View file

@ -1775,10 +1775,16 @@ private[stream] object Collect {
}
}
def pullCondition: Boolean =
!strategy.isBackpressure || buffer.used < size
def pullCondition: Boolean = strategy match {
case EmitEarly =>
// when buffer is full we can only emit early if out is available
buffer.used < size || isAvailable(out)
case _ =>
!strategy.isBackpressure || buffer.used < size
}
def grabAndPull(): Unit = {
if (buffer.used == size) throw new IllegalStateException("Trying to enqueue but buffer is full")
buffer.enqueue((System.nanoTime(), grab(in)))
if (pullCondition) pull(in)
}