chore: Pull instead of throw exception in groupBy operator. (#1210)
When a sub stream is already closed, but the main stream is still continue, instead of throw a TooManySubStreamOpenException, just drop the current element and continue pulling.
This commit is contained in:
parent
689e30bbe9
commit
b4ff0fcabf
2 changed files with 18 additions and 4 deletions
|
|
@ -694,6 +694,19 @@ class FlowGroupBySpec extends StreamSpec("""
|
|||
queue.complete()
|
||||
}
|
||||
|
||||
"should not fail when one sub stream completed" in {
|
||||
Source(1 to 10)
|
||||
.groupBy(2, _ % 2, allowClosedSubstreamRecreation = false)
|
||||
.take(2)
|
||||
.mergeSubstreams
|
||||
.fold(Set.empty[Int])((set, elem) => set + elem)
|
||||
.runWith(TestSink())
|
||||
.ensureSubscription()
|
||||
.request(10)
|
||||
.expectNext(Set(1, 2, 3, 4))
|
||||
.expectComplete()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -364,11 +364,12 @@ import pekko.util.ccompat.JavaConverters._
|
|||
nextElementValue = elem
|
||||
}
|
||||
} else {
|
||||
if (activeSubstreamsMap.size + closedSubstreams.size == maxSubstreams)
|
||||
throw tooManySubstreamsOpenException
|
||||
else if (closedSubstreams.contains(key) && !hasBeenPulled(in))
|
||||
if (closedSubstreams.contains(key)) {
|
||||
// If the sub stream is already closed, we just skip the current element and pull the next element.
|
||||
pull(in)
|
||||
else runSubstream(key, elem)
|
||||
} else if (activeSubstreamsMap.size + closedSubstreams.size == maxSubstreams) {
|
||||
throw tooManySubstreamsOpenException
|
||||
} else runSubstream(key, elem)
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(ex) =>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue