diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala index 238f7b4805..7af9f02afb 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala @@ -6,12 +6,14 @@ package akka.stream.scaladsl import scala.collection.immutable import scala.concurrent.duration._ import java.util.concurrent.ThreadLocalRandom.{ current ⇒ random } + import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, ThrottleMode } import akka.stream.testkit._ import akka.stream.testkit.Utils._ import scala.concurrent.Await import akka.testkit.TimingTest +import akka.util.ConstantFun class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest { @@ -252,5 +254,26 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest { וupstream.sendComplete() downstream.expectComplete() } + + "handle zero cost function to get only timed based grouping without limit" taggedAs TimingTest in { + val upstream = TestPublisher.probe[String]() + val downstream = TestSubscriber.probe[immutable.Seq[String]]() + Source + .fromPublisher(upstream) + .groupedWeightedWithin(1, 100.millis)(ConstantFun.zeroLong) + .to(Sink.fromSubscriber(downstream)) + .run() + + downstream.ensureSubscription() + downstream.request(1) + upstream.sendNext("333") + upstream.sendNext("22") + upstream.sendNext("333") + upstream.sendNext("22") + downstream.expectNoMsg(50.millis) + downstream.expectNext(Vector("333", "22", "333", "22"): immutable.Seq[String]) + upstream.sendComplete() + downstream.expectComplete() + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 00d65a1c9b..938d66016a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1435,6 +1435,7 @@ private[stream] object Collect { private var groupEmitted = true private var finished = false private var totalWeight = 0L + private var hasElements = false override def preStart() = { schedulePeriodically(GroupedWeightedWithin.groupedWeightedWithinTimer, interval) @@ -1444,8 +1445,9 @@ private[stream] object Collect { private def nextElement(elem: T): Unit = { groupEmitted = false val cost = costFn(elem) - if (cost < 0) failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed")) + if (cost < 0L) failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed")) else { + hasElements = true if (totalWeight + cost <= maxWeight) { buf += elem totalWeight += cost @@ -1466,6 +1468,7 @@ private[stream] object Collect { } } } else { + //we have a single heavy element that weighs more than the limit if (totalWeight == 0L) { buf += elem totalWeight += cost @@ -1502,7 +1505,8 @@ private[stream] object Collect { pending = null.asInstanceOf[T] groupEmitted = false } else { - totalWeight = 0 + totalWeight = 0L + hasElements = false } pushEagerly = false if (isAvailable(in)) nextElement(grab(in)) @@ -1521,7 +1525,7 @@ private[stream] object Collect { else tryCloseGroup() } - override protected def onTimer(timerKey: Any) = if (totalWeight > 0) { + override protected def onTimer(timerKey: Any) = if (hasElements) { if (isAvailable(out)) emitGroup() else pushEagerly = true }