20232: Fix missing last group in groupedWithin

This commit is contained in:
Endre Sándor Varga 2016-04-05 12:46:31 +02:00
parent 4a8018f63f
commit 60c8648b59
2 changed files with 28 additions and 19 deletions

View file

@ -956,13 +956,15 @@ private[stream] final class GroupedWithin[T](n: Int, d: FiniteDuration) extends
override def initialAttributes = DefaultAttributes.groupedWithin
val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
private val buf: VectorBuilder[T] = new VectorBuilder
// True if:
// - buf is nonEmpty
// AND
// - timer fired OR group is full
private var groupClosed = false
private var groupEmitted = false
private var finished = false
private var elements = 0
@ -974,6 +976,7 @@ private[stream] final class GroupedWithin[T](n: Int, d: FiniteDuration) extends
}
private def nextElement(elem: T): Unit = {
groupEmitted = false
buf += elem
elements += 1
if (elements == n) {
@ -988,6 +991,7 @@ private[stream] final class GroupedWithin[T](n: Int, d: FiniteDuration) extends
}
private def emitGroup(): Unit = {
groupEmitted = true
push(out, buf.result())
buf.clear()
if (!finished) startNewGroup()
@ -1001,24 +1005,21 @@ private[stream] final class GroupedWithin[T](n: Int, d: FiniteDuration) extends
else if (!hasBeenPulled(in)) pull(in)
}
setHandler(in, new InHandler {
override def onPush(): Unit =
if (!groupClosed) nextElement(grab(in)) // otherwise keep the element for next round
override def onUpstreamFinish(): Unit = {
finished = true
if (!groupClosed && elements > 0) closeGroup()
else completeStage()
}
override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
})
override def onPush(): Unit = {
if (!groupClosed) nextElement(grab(in)) // otherwise keep the element for next round
}
setHandler(out, new OutHandler {
override def onPull(): Unit = if (groupClosed) emitGroup()
override def onDownstreamFinish(): Unit = completeStage()
})
override def onPull(): Unit = if (groupClosed) emitGroup()
override protected def onTimer(timerKey: Any) =
if (elements > 0) closeGroup()
override def onUpstreamFinish(): Unit = {
finished = true
if (groupEmitted) completeStage()
else closeGroup()
}
override protected def onTimer(timerKey: Any) = if (elements > 0) closeGroup()
setHandlers(in, out, this)
}
}