From 35e53f570c1c152b0a128f874f96f3d0b6331eac Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Markus=20Th=C3=B6mmes?=
Date: Wed, 25 Apr 2018 19:26:08 +0200
Subject: [PATCH] Allow `groupBy` to recreate already closed substreams #24758

`groupBy`'s default behavior is to filter out elements that would go to a
substream that has already been closed. This change allows `groupBy` to
recreate those already closed substreams, if desired, and run them as if
they had never run before.

Overload instead of default parameter

Make set of closed keys unmodifiable if not needed

Adjust documentation, adding a warning on memory consumption.

Add MiMa exclude.
---
 .../operators/Source-or-Flow/groupBy.md        | 16 ++++++++-
 .../main/paradox/stream/stream-substream.md    |  2 ++
 .../stream/scaladsl/FlowGroupBySpec.scala      | 28 ++++++++++++++++
 .../mima-filters/2.5.12.backwards.excludes     |  4 +++
 .../stream/impl/fusing/StreamOfStreams.scala   | 13 +++---
 .../main/scala/akka/stream/javadsl/Flow.scala  | 30 ++++++++++++++++-
 .../scala/akka/stream/scaladsl/Flow.scala      | 33 +++++++++++++++++--
 7 files changed, 117 insertions(+), 9 deletions(-)

diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupBy.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupBy.md
index 73c6197ee9..aa822853ab 100644
--- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupBy.md
+++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupBy.md
@@ -14,7 +14,21 @@ Demultiplex the incoming stream into separate output streams.
 
 ## Description
 
-Demultiplex the incoming stream into separate output streams.
+This operation demultiplexes the incoming stream into separate output streams, one for each element key. The
+key is computed for each element using the given function. When a new key is encountered for the first time,
+a new substream is opened and subsequently fed with all elements belonging to that key.
+
+Note: If `allowClosedSubstreamRecreation` is set to `true`, substream completion and incoming
+elements are subject to race conditions. If elements arrive for a stream that is in the process
+of closing, these elements might get lost.
+
+@@@ warning
+
+If `allowClosedSubstreamRecreation` is set to `false` (the default behavior), the stage keeps track of all
+keys of streams that have already been closed. If you expect an infinite number of keys, this can cause
+memory issues. Elements belonging to those keys are drained directly and not sent to the substream.
+
+@@@
 
 @@@div { .callout }
 
diff --git a/akka-docs/src/main/paradox/stream/stream-substream.md b/akka-docs/src/main/paradox/stream/stream-substream.md
index 6914c16857..76b8df9546 100644
--- a/akka-docs/src/main/paradox/stream/stream-substream.md
+++ b/akka-docs/src/main/paradox/stream/stream-substream.md
@@ -26,6 +26,8 @@ This operation splits the incoming stream into separate output streams, one
 for each element key. The key is computed for each element using the given
 function, which is `f` in the above diagram. When a new key is encountered for
 the first time a new substream is opened and subsequently fed with all elements belonging to that key.
+If `allowClosedSubstreamRecreation` is set to `true`, a substream belonging to a specific key
+will be recreated if it was closed before; otherwise elements belonging to that key will be dropped.
 
 If you add a `Sink` or `Flow` right after the `groupBy` stage, all transformations
 are applied to all encountered substreams in the same fashion.
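
To make the documented default concrete, here is a minimal usage sketch (not part of the patch; it assumes a build containing this change, and the object name is illustrative). With `allowClosedSubstreamRecreation` left at `false`, elements whose key's substream has already completed are drained rather than re-emitted:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{ Sink, Source }

    object GroupByDefaultBehavior extends App {
      implicit val system: ActorSystem = ActorSystem("groupBy-default")
      implicit val mat: ActorMaterializer = ActorMaterializer()
      import system.dispatcher

      Source(List(1, 2, 1, 1))
        .groupBy(2, identity) // default mode: keys of closed substreams are remembered
        .take(1)              // each substream completes after its first element
        .mergeSubstreams
        .runWith(Sink.seq)    // yields Seq(1, 2): the trailing 1s are drained
        .andThen { case _ => system.terminate() }
    }
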
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala
index bdad7e6606..406790f93f 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala
@@ -459,6 +459,34 @@ class FlowGroupBySpec extends StreamSpec {
       upstream.sendComplete()
     }
 
+    "allow to recreate an already closed substream (#24758)" in assertAllStagesStopped {
+      val (up, down) = Flow[Int]
+        .groupBy(2, identity, true)
+        .take(1) // close the substream after 1 element
+        .mergeSubstreams
+        .runWith(TestSource.probe[Int], TestSink.probe)
+
+      down.request(4)
+
+      // Creates and closes substream "1"
+      up.sendNext(1)
+      down.expectNext(1)
+
+      // Creates and closes substream "2"
+      up.sendNext(2)
+      down.expectNext(2)
+
+      // Recreates and closes substream "1" twice
+      up.sendNext(1)
+      down.expectNext(1)
+      up.sendNext(1)
+      down.expectNext(1)
+
+      // Cleanup, not part of the actual test
+      up.sendComplete()
+      down.expectComplete()
+    }
+
     "cancel if downstream has cancelled & all substreams cancel" in assertAllStagesStopped {
       val upstream = TestPublisher.probe[Int]()
       val downstreamMaster = TestSubscriber.probe[Source[Int, NotUsed]]()
diff --git a/akka-stream/src/main/mima-filters/2.5.12.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.12.backwards.excludes
index c105fe6a6b..2f2a497a45 100644
--- a/akka-stream/src/main/mima-filters/2.5.12.backwards.excludes
+++ b/akka-stream/src/main/mima-filters/2.5.12.backwards.excludes
@@ -1,2 +1,6 @@
 # +str add in-line inspect operator for side effecting #24610
 ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.wireTap")
+
+# #24758 recreate already closed substreams
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GroupBy.this")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.groupBy")
diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
index 4fad974fb0..d2c3104afc 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
@@ -4,6 +4,7 @@
 
 package akka.stream.impl.fusing
 
+import java.util.Collections
 import java.util.concurrent.atomic.AtomicReference
 
 import akka.NotUsed
@@ -214,7 +215,7 @@ import scala.collection.JavaConverters._
 /**
  * INTERNAL API
  */
-@InternalApi private[akka] final class GroupBy[T, K](val maxSubstreams: Int, val keyFor: T ⇒ K) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
+@InternalApi private[akka] final class GroupBy[T, K](val maxSubstreams: Int, val keyFor: T ⇒ K, val allowClosedSubstreamRecreation: Boolean = false) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
   val in: Inlet[T] = Inlet("GroupBy.in")
   val out: Outlet[Source[T, NotUsed]] = Outlet("GroupBy.out")
   override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out)
@@ -224,7 +225,7 @@ import scala.collection.JavaConverters._
     parent ⇒
     lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
     private val activeSubstreamsMap = new java.util.HashMap[Any, SubstreamSource]()
-    private val closedSubstreams = new java.util.HashSet[Any]()
+    private val closedSubstreams = if (allowClosedSubstreamRecreation) Collections.unmodifiableSet(Collections.emptySet[Any]) else new java.util.HashSet[Any]()
     private var timeout: FiniteDuration = _
     private var substreamWaitingToBePushed: Option[SubstreamSource] = None
     private var nextElementKey: K = null.asInstanceOf[K]
@@ -334,8 +335,10 @@ import scala.collection.JavaConverters._
     override protected def onTimer(timerKey: Any): Unit = {
       val substreamSource = activeSubstreamsMap.get(timerKey)
       if (substreamSource != null) {
         substreamSource.timeout(timeout)
-        closedSubstreams.add(timerKey)
+        if (!allowClosedSubstreamRecreation) {
+          closedSubstreams.add(timerKey)
+        }
         activeSubstreamsMap.remove(timerKey)
         if (isClosed(in)) tryCompleteAll()
       }
@@ -349,7 +352,9 @@ import scala.collection.JavaConverters._
     private def completeSubStream(): Unit = {
       complete()
       activeSubstreamsMap.remove(key)
-      closedSubstreams.add(key)
+      if (!allowClosedSubstreamRecreation) {
+        closedSubstreams.add(key)
+      }
     }
 
     private def tryCompleteHandler(): Unit = {
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index 68769b5e6a..a4f6047f6a 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -1711,6 +1711,15 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
    * a new substream is opened and subsequently fed with all elements belonging to
    * that key.
    *
+   * WARNING: If `allowClosedSubstreamRecreation` is set to `false` (the default behavior), the stage
+   * keeps track of all keys of streams that have already been closed. If you expect an infinite
+   * number of keys, this can cause memory issues. Elements belonging to those keys are drained
+   * directly and not sent to the substream.
+   *
+   * Note: If `allowClosedSubstreamRecreation` is set to `true`, substream completion and incoming
+   * elements are subject to race conditions. If elements arrive for a stream that is in the process
+   * of closing, these elements might get lost.
+   *
    * The object returned from this method is not a normal [[Flow]],
    * it is a [[SubFlow]]. This means that after this combinator all transformations
    * are applied to all encountered substreams in the same fashion. Substream mode
@@ -1744,9 +1753,28 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
    *
    * @param maxSubstreams configures the maximum number of substreams (keys)
    *        that are supported; if more distinct keys are encountered then the stream fails
+   * @param f computes the key for each element
+   * @param allowClosedSubstreamRecreation enables recreation of already closed substreams if elements with their
+   *        corresponding keys arrive after completion
+   */
+  def groupBy[K](maxSubstreams: Int, f: function.Function[Out, K], allowClosedSubstreamRecreation: Boolean): SubFlow[In, Out, Mat] =
+    new SubFlow(delegate.groupBy(maxSubstreams, f.apply, allowClosedSubstreamRecreation))
+
+  /**
+   * This operation demultiplexes the incoming stream into separate output
+   * streams, one for each element key. The key is computed for each element
+   * using the given function. When a new key is encountered for the first time
+   * a new substream is opened and subsequently fed with all elements belonging to
+   * that key.
+   *
+   * WARNING: The stage keeps track of all keys of streams that have already been closed.
+   * If you expect an infinite number of keys, this can cause memory issues. Elements belonging
+   * to those keys are drained directly and not sent to the substream.
+   *
+   * @see [[#groupBy]]
    */
   def groupBy[K](maxSubstreams: Int, f: function.Function[Out, K]): SubFlow[In, Out, Mat] =
-    new SubFlow(delegate.groupBy(maxSubstreams, f.apply))
+    new SubFlow(delegate.groupBy(maxSubstreams, f.apply, false))
 
   /**
    * This operation applies the given predicate to all incoming elements and
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
index 5fabcd39d4..2d33d42732 100755
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
@@ -1768,6 +1768,15 @@ trait FlowOps[+Out, +Mat] {
    * a new substream is opened and subsequently fed with all elements belonging to
    * that key.
    *
+   * WARNING: If `allowClosedSubstreamRecreation` is set to `false` (the default behavior), the stage
+   * keeps track of all keys of streams that have already been closed. If you expect an infinite
+   * number of keys, this can cause memory issues. Elements belonging to those keys are drained
+   * directly and not sent to the substream.
+   *
+   * Note: If `allowClosedSubstreamRecreation` is set to `true`, substream completion and incoming
+   * elements are subject to race conditions. If elements arrive for a stream that is in the process
+   * of closing, these elements might get lost.
+   *
    * The object returned from this method is not a normal [[Source]] or [[Flow]],
    * it is a [[SubFlow]]. This means that after this combinator all transformations
    * are applied to all encountered substreams in the same fashion. Substream mode
@@ -1803,20 +1812,38 @@ trait FlowOps[+Out, +Mat] {
    *
    * @param maxSubstreams configures the maximum number of substreams (keys)
    *        that are supported; if more distinct keys are encountered then the stream fails
+   * @param f computes the key for each element
+   * @param allowClosedSubstreamRecreation enables recreation of already closed substreams if elements with their
+   *        corresponding keys arrive after completion
    */
-  def groupBy[K](maxSubstreams: Int, f: Out ⇒ K): SubFlow[Out, Mat, Repr, Closed] = {
+  def groupBy[K](maxSubstreams: Int, f: Out ⇒ K, allowClosedSubstreamRecreation: Boolean): SubFlow[Out, Mat, Repr, Closed] = {
     val merge = new SubFlowImpl.MergeBack[Out, Repr] {
       override def apply[T](flow: Flow[Out, T, NotUsed], breadth: Int): Repr[T] =
-        via(new GroupBy(maxSubstreams, f))
+        via(new GroupBy(maxSubstreams, f, allowClosedSubstreamRecreation))
           .map(_.via(flow))
           .via(new FlattenMerge(breadth))
     }
     val finish: (Sink[Out, NotUsed]) ⇒ Closed = s ⇒
-      via(new GroupBy(maxSubstreams, f))
+      via(new GroupBy(maxSubstreams, f, allowClosedSubstreamRecreation))
         .to(Sink.foreach(_.runWith(s)(GraphInterpreter.currentInterpreter.materializer)))
     new SubFlowImpl(Flow[Out], merge, finish)
   }
 
+  /**
+   * This operation demultiplexes the incoming stream into separate output
+   * streams, one for each element key. The key is computed for each element
+   * using the given function. When a new key is encountered for the first time
+   * a new substream is opened and subsequently fed with all elements belonging to
+   * that key.
+   *
+   * WARNING: The stage keeps track of all keys of streams that have already been closed.
+   * If you expect an infinite number of keys, this can cause memory issues. Elements belonging
+   * to those keys are drained directly and not sent to the substream.
+   *
+   * @see [[#groupBy]]
+   */
+  def groupBy[K](maxSubstreams: Int, f: Out ⇒ K): SubFlow[Out, Mat, Repr, Closed] = groupBy(maxSubstreams, f, false)
+
   /**
    * This operation applies the given predicate to all incoming elements and
    * emits them to a stream of output streams, always beginning a new one with
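
For comparison, here is a hypothetical usage sketch of the new overload (again not part of the patch; the object name is illustrative, and it assumes a build containing this change). With `allowClosedSubstreamRecreation = true`, a later element whose key's substream has already completed reopens a fresh substream instead of being dropped, mirroring the test case above:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{ Sink, Source }

    object GroupByRecreationExample extends App {
      implicit val system: ActorSystem = ActorSystem("groupBy-recreation")
      implicit val mat: ActorMaterializer = ActorMaterializer()
      import system.dispatcher

      Source(List(1, 2, 1, 1))
        .groupBy(2, identity, allowClosedSubstreamRecreation = true)
        .take(1)         // still completes each substream after one element,
        .mergeSubstreams // but a later 1 recreates the already closed substream
        .runWith(Sink.seq)
        .andThen { case result =>
          println(result) // Success(Vector(1, 2, 1, 1)) rather than Vector(1, 2)
          system.terminate()
        }
    }

Note the trade-off spelled out in the documentation above: with recreation enabled, the stage no longer has to remember every closed key (so memory stays bounded for unbounded key spaces), but substream completion races with incoming elements, which may be lost while a substream is closing.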