From b4ff0fcabf20d244ae0ffd48974fe90c21eb779c Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Wed, 20 Mar 2024 11:48:39 +0800 Subject: [PATCH] chore: Pull instead of throw exception in groupBy operator. (#1210) When a sub stream is already closed, but the main stream is still continue, instead of throw a TooManySubStreamOpenException, just drop the current element and continue pulling. --- .../pekko/stream/scaladsl/FlowGroupBySpec.scala | 13 +++++++++++++ .../pekko/stream/impl/fusing/StreamOfStreams.scala | 9 +++++---- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala index 2ae158cb8a..356ed65cc5 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala @@ -694,6 +694,19 @@ class FlowGroupBySpec extends StreamSpec(""" queue.complete() } + "should not fail when one sub stream completed" in { + Source(1 to 10) + .groupBy(2, _ % 2, allowClosedSubstreamRecreation = false) + .take(2) + .mergeSubstreams + .fold(Set.empty[Int])((set, elem) => set + elem) + .runWith(TestSink()) + .ensureSubscription() + .request(10) + .expectNext(Set(1, 2, 3, 4)) + .expectComplete() + } + } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala index 7b0e37e942..dc84291b2e 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala @@ -364,11 +364,12 @@ import pekko.util.ccompat.JavaConverters._ nextElementValue = elem } } else { - if (activeSubstreamsMap.size + closedSubstreams.size == maxSubstreams) - throw tooManySubstreamsOpenException - else if (closedSubstreams.contains(key) && !hasBeenPulled(in)) + if (closedSubstreams.contains(key)) { + // If the sub stream is already closed, we just skip the current element and pull the next element. pull(in) - else runSubstream(key, elem) + } else if (activeSubstreamsMap.size + closedSubstreams.size == maxSubstreams) { + throw tooManySubstreamsOpenException + } else runSubstream(key, elem) } } catch { case NonFatal(ex) =>