From 86884133ed2bd4e759f8b5a886c7d2c285f6837c Mon Sep 17 00:00:00 2001 From: Christopher Hunt Date: Fri, 9 Nov 2018 18:28:44 +0100 Subject: [PATCH] Avoid memory being retained for groupBy #24758 This commit avoids memory being retained for groupBy. Prior to the commit, closedSubstreams could grow unbounded. This commit includes the size of closedSubstreams when considering to take on a new substream, while retaining the semantics described by its Flow API. --- .../src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala | 2 +- .../main/scala/akka/stream/impl/fusing/StreamOfStreams.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index 36315344f6..ae0fde4516 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -149,7 +149,7 @@ class FlowGroupBySpec extends StreamSpec { } "accept cancellation of substreams" in assertAllStagesStopped { - new SubstreamsSupport(groupCount = 2) { + new SubstreamsSupport(groupCount = 2, maxSubstreams = 3) { StreamPuppet(getSubFlow(1).runWith(Sink.asPublisher(false))).cancel() val substream = StreamPuppet(getSubFlow(0).runWith(Sink.asPublisher(false))) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 7489d0d7a4..e4e412318d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -334,7 +334,7 @@ import akka.stream.impl.fusing.GraphStages.SingleSource nextElementValue = elem } } else { - if (activeSubstreamsMap.size == maxSubstreams) + if (activeSubstreamsMap.size + closedSubstreams.size == maxSubstreams) throw tooManySubstreamsOpenException else if (closedSubstreams.contains(key) && !hasBeenPulled(in)) pull(in)