GroupedWithin cold emit fix (#22407)
* groupedWithin cold emit fix * lowered numbers in tests; removed meaningless assertions
This commit is contained in:
parent
d91fe59566
commit
ba213b7fee
2 changed files with 13 additions and 1 deletions
|
|
@ -100,6 +100,18 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest {
|
|||
c.expectNoMsg(100.millis)
|
||||
}
|
||||
|
||||
"not emit empty group when finished while not being pushed" taggedAs TimingTest in {
|
||||
val p = TestPublisher.manualProbe[Int]()
|
||||
val c = TestSubscriber.manualProbe[immutable.Seq[Int]]()
|
||||
Source.fromPublisher(p).groupedWithin(1000, 50.millis).to(Sink.fromSubscriber(c)).run()
|
||||
val pSub = p.expectSubscription
|
||||
val cSub = c.expectSubscription
|
||||
cSub.request(1)
|
||||
pSub.expectRequest
|
||||
pSub.sendComplete
|
||||
c.expectComplete
|
||||
}
|
||||
|
||||
"reset time window when max elements reached" taggedAs TimingTest in {
|
||||
val inputs = Iterator.from(1)
|
||||
val upstream = TestPublisher.probe[Int]()
|
||||
|
|
|
|||
|
|
@ -1414,7 +1414,7 @@ final class GroupedWithin[T](val n: Int, val d: FiniteDuration) extends GraphSta
|
|||
// AND
|
||||
// - timer fired OR group is full
|
||||
private var groupClosed = false
|
||||
private var groupEmitted = false
|
||||
private var groupEmitted = true
|
||||
private var finished = false
|
||||
private var elements = 0
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue