diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala index d990adc16b..9f42197494 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala @@ -100,6 +100,18 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest { c.expectNoMsg(100.millis) } + "not emit empty group when finished while not being pushed" taggedAs TimingTest in { + val p = TestPublisher.manualProbe[Int]() + val c = TestSubscriber.manualProbe[immutable.Seq[Int]]() + Source.fromPublisher(p).groupedWithin(1000, 50.millis).to(Sink.fromSubscriber(c)).run() + val pSub = p.expectSubscription + val cSub = c.expectSubscription + cSub.request(1) + pSub.expectRequest + pSub.sendComplete + c.expectComplete + } + "reset time window when max elements reached" taggedAs TimingTest in { val inputs = Iterator.from(1) val upstream = TestPublisher.probe[Int]() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 3ebb32aea1..4efc12d796 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1414,7 +1414,7 @@ final class GroupedWithin[T](val n: Int, val d: FiniteDuration) extends GraphSta // AND // - timer fired OR group is full private var groupClosed = false - private var groupEmitted = false + private var groupEmitted = true private var finished = false private var elements = 0