Avoid memory being retained for groupBy #24758
This commit avoids memory being retained for groupBy. Prior to the commit, closedSubstreams could grow unbounded. This commit includes the size of closedSubstreams when considering to take on a new substream, while retaining the semantics described by its Flow API.
This commit is contained in:
parent
e847ce016a
commit
86884133ed
2 changed files with 2 additions and 2 deletions
|
|
@ -149,7 +149,7 @@ class FlowGroupBySpec extends StreamSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"accept cancellation of substreams" in assertAllStagesStopped {
|
"accept cancellation of substreams" in assertAllStagesStopped {
|
||||||
new SubstreamsSupport(groupCount = 2) {
|
new SubstreamsSupport(groupCount = 2, maxSubstreams = 3) {
|
||||||
StreamPuppet(getSubFlow(1).runWith(Sink.asPublisher(false))).cancel()
|
StreamPuppet(getSubFlow(1).runWith(Sink.asPublisher(false))).cancel()
|
||||||
|
|
||||||
val substream = StreamPuppet(getSubFlow(0).runWith(Sink.asPublisher(false)))
|
val substream = StreamPuppet(getSubFlow(0).runWith(Sink.asPublisher(false)))
|
||||||
|
|
|
||||||
|
|
@ -334,7 +334,7 @@ import akka.stream.impl.fusing.GraphStages.SingleSource
|
||||||
nextElementValue = elem
|
nextElementValue = elem
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (activeSubstreamsMap.size == maxSubstreams)
|
if (activeSubstreamsMap.size + closedSubstreams.size == maxSubstreams)
|
||||||
throw tooManySubstreamsOpenException
|
throw tooManySubstreamsOpenException
|
||||||
else if (closedSubstreams.contains(key) && !hasBeenPulled(in))
|
else if (closedSubstreams.contains(key) && !hasBeenPulled(in))
|
||||||
pull(in)
|
pull(in)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue