diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala index 1bdbf5d75f..b77757fd72 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala @@ -6,12 +6,13 @@ package akka.stream.scaladsl import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } -import akka.stream.ActorMaterializer -import akka.stream.ActorMaterializerSettings +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, ThrottleMode } import akka.stream.testkit._ import akka.stream.testkit.Utils._ import akka.testkit.AkkaSpec +import scala.concurrent.Await + class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { val settings = ActorMaterializerSettings(system) @@ -135,6 +136,13 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { TestConfig.RandomTestRange foreach (_ ⇒ runScript(script, settings)(_.groupedWithin(3, 10.minutes))) } + "group with small groups with backpressure" in { + Source(1 to 10) + .groupedWithin(1, 1.day) + .throttle(1, 110.millis, 0, ThrottleMode.Shaping) + .runWith(Sink.seq).futureValue should ===((1 to 10).map(List(_))) + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index a041ec56b6..d5d255443d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -956,13 +956,15 @@ private[stream] final class GroupedWithin[T](n: Int, d: FiniteDuration) extends override def initialAttributes = DefaultAttributes.groupedWithin val shape = FlowShape(in, out) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler { + private val buf: VectorBuilder[T] = new VectorBuilder // True if: // - buf is nonEmpty // AND // - timer fired OR group is full private var groupClosed = false + private var groupEmitted = false private var finished = false private var elements = 0 @@ -974,6 +976,7 @@ private[stream] final class GroupedWithin[T](n: Int, d: FiniteDuration) extends } private def nextElement(elem: T): Unit = { + groupEmitted = false buf += elem elements += 1 if (elements == n) { @@ -988,6 +991,7 @@ private[stream] final class GroupedWithin[T](n: Int, d: FiniteDuration) extends } private def emitGroup(): Unit = { + groupEmitted = true push(out, buf.result()) buf.clear() if (!finished) startNewGroup() @@ -1001,24 +1005,21 @@ private[stream] final class GroupedWithin[T](n: Int, d: FiniteDuration) extends else if (!hasBeenPulled(in)) pull(in) } - setHandler(in, new InHandler { - override def onPush(): Unit = - if (!groupClosed) nextElement(grab(in)) // otherwise keep the element for next round - override def onUpstreamFinish(): Unit = { - finished = true - if (!groupClosed && elements > 0) closeGroup() - else completeStage() - } - override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) - }) + override def onPush(): Unit = { + if (!groupClosed) nextElement(grab(in)) // otherwise keep the element for next round + } - setHandler(out, new OutHandler { - override def onPull(): Unit = if (groupClosed) emitGroup() - override def onDownstreamFinish(): Unit = completeStage() - }) + override def onPull(): Unit = if (groupClosed) emitGroup() - override protected def onTimer(timerKey: Any) = - if (elements > 0) closeGroup() + override def onUpstreamFinish(): Unit = { + finished = true + if (groupEmitted) completeStage() + else closeGroup() + } + + override protected def onTimer(timerKey: Any) = if (elements > 0) closeGroup() + + setHandlers(in, out, this) } }