diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index 36315344f6..ae0fde4516 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -149,7 +149,7 @@ class FlowGroupBySpec extends StreamSpec { } "accept cancellation of substreams" in assertAllStagesStopped { - new SubstreamsSupport(groupCount = 2) { + new SubstreamsSupport(groupCount = 2, maxSubstreams = 3) { StreamPuppet(getSubFlow(1).runWith(Sink.asPublisher(false))).cancel() val substream = StreamPuppet(getSubFlow(0).runWith(Sink.asPublisher(false))) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 7489d0d7a4..e4e412318d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -334,7 +334,7 @@ import akka.stream.impl.fusing.GraphStages.SingleSource nextElementValue = elem } } else { - if (activeSubstreamsMap.size == maxSubstreams) + if (activeSubstreamsMap.size + closedSubstreams.size == maxSubstreams) throw tooManySubstreamsOpenException else if (closedSubstreams.contains(key) && !hasBeenPulled(in)) pull(in)