=str #23398 groupedWeightedWithin zero cost bugfix (#23399)

* added a failing test to demonstrate a bug in groupedWeightedWithin with zero cost function

* fixed the bug by adding a flag indicating if there are elements in buffer, and using it instead of totalWeight in case onTimer is called
This commit is contained in:
Gilad Hoch 2017-08-08 14:30:32 +03:00 committed by Patrik Nordwall
parent f623d10522
commit 0bcf32fccb
2 changed files with 30 additions and 3 deletions

View file

@ -1435,6 +1435,7 @@ private[stream] object Collect {
private var groupEmitted = true
private var finished = false
private var totalWeight = 0L
private var hasElements = false
override def preStart() = {
schedulePeriodically(GroupedWeightedWithin.groupedWeightedWithinTimer, interval)
@ -1444,8 +1445,9 @@ private[stream] object Collect {
private def nextElement(elem: T): Unit = {
groupEmitted = false
val cost = costFn(elem)
if (cost < 0) failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed"))
if (cost < 0L) failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed"))
else {
hasElements = true
if (totalWeight + cost <= maxWeight) {
buf += elem
totalWeight += cost
@ -1466,6 +1468,7 @@ private[stream] object Collect {
}
}
} else {
//we have a single heavy element that weighs more than the limit
if (totalWeight == 0L) {
buf += elem
totalWeight += cost
@ -1502,7 +1505,8 @@ private[stream] object Collect {
pending = null.asInstanceOf[T]
groupEmitted = false
} else {
totalWeight = 0
totalWeight = 0L
hasElements = false
}
pushEagerly = false
if (isAvailable(in)) nextElement(grab(in))
@ -1521,7 +1525,7 @@ private[stream] object Collect {
else tryCloseGroup()
}
override protected def onTimer(timerKey: Any) = if (totalWeight > 0) {
override protected def onTimer(timerKey: Any) = if (hasElements) {
if (isAvailable(out)) emitGroup()
else pushEagerly = true
}