diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala index 2ae158cb8a..356ed65cc5 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala @@ -694,6 +694,19 @@ class FlowGroupBySpec extends StreamSpec(""" queue.complete() } + "should not fail when one sub stream completed" in { + Source(1 to 10) + .groupBy(2, _ % 2, allowClosedSubstreamRecreation = false) + .take(2) + .mergeSubstreams + .fold(Set.empty[Int])((set, elem) => set + elem) + .runWith(TestSink()) + .ensureSubscription() + .request(10) + .expectNext(Set(1, 2, 3, 4)) + .expectComplete() + } + } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala index 7b0e37e942..dc84291b2e 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala @@ -364,11 +364,12 @@ import pekko.util.ccompat.JavaConverters._ nextElementValue = elem } } else { - if (activeSubstreamsMap.size + closedSubstreams.size == maxSubstreams) - throw tooManySubstreamsOpenException - else if (closedSubstreams.contains(key) && !hasBeenPulled(in)) + if (closedSubstreams.contains(key)) { + // If the sub stream is already closed, we just skip the current element and pull the next element. pull(in) - else runSubstream(key, elem) + } else if (activeSubstreamsMap.size + closedSubstreams.size == maxSubstreams) { + throw tooManySubstreamsOpenException + } else runSubstream(key, elem) } } catch { case NonFatal(ex) =>