diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupBy.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupBy.md
index 73c6197ee9..aa822853ab 100644
--- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupBy.md
+++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupBy.md
@@ -14,7 +14,21 @@ Demultiplex the incoming stream into separate output streams.
 
 ## Description
 
-Demultiplex the incoming stream into separate output streams.
+This operation demultiplexes the incoming stream into separate output streams, one for each element key. The
+key is computed for each element using the given function. When a new key is encountered for the first time
+a new substream is opened and subsequently fed with all elements belonging to that key.
+
+Note: If `allowClosedSubstreamRecreation` is set to `true`, substream completion and incoming
+elements are subject to race conditions. If elements arrive for a stream that is in the process
+of closing, these elements might get lost.
+
+@@@ warning
+
+If `allowClosedSubstreamRecreation` is set to `false` (the default behavior), the stage keeps track of all
+keys of streams that have already been closed. If you expect an infinite number of keys this can cause
+memory issues. Elements belonging to those keys are drained directly and not sent to the substream.
+
+@@@
 
 @@@div { .callout }
 
diff --git a/akka-docs/src/main/paradox/stream/stream-substream.md b/akka-docs/src/main/paradox/stream/stream-substream.md
index 6914c16857..76b8df9546 100644
--- a/akka-docs/src/main/paradox/stream/stream-substream.md
+++ b/akka-docs/src/main/paradox/stream/stream-substream.md
@@ -26,6 +26,8 @@
 This operation splits the incoming stream into separate output streams, one for
 each element key. The key is computed for each element using the given function,
 which is `f` in the above diagram. When a new key is encountered for the first time
 a new substream is opened and subsequently fed with all elements belonging to that key.
+If `allowClosedSubstreamRecreation` is set to `true`, a substream belonging to a specific key
+will be recreated if it was closed before; otherwise elements belonging to that key will be dropped.
 
 If you add a `Sink` or `Flow` right after the `groupBy` stage, all transformations are applied to all encountered substreams in the same fashion.
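The docs above describe the new flag in the abstract; here is a minimal, self-contained sketch of the recreation behavior. It is not part of the patch: the object name, element values, and the 2.5.x `ActorMaterializer` setup are illustrative assumptions.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

object GroupByRecreationExample extends App {
  implicit val system: ActorSystem = ActorSystem("groupBy-recreation")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Each (re)created substream closes after one element via take(1).
  // With allowClosedSubstreamRecreation = true, later elements with the
  // same key reopen the substream instead of being drained.
  Source(List(1, 2, 1, 1))
    .groupBy(2, identity, allowClosedSubstreamRecreation = true)
    .take(1)
    .mergeSubstreams
    .runWith(Sink.foreach(println)) // all four elements reach the sink
    .onComplete(_ ⇒ system.terminate())(system.dispatcher)
}
```

With the flag left at its default of `false`, the two later `1` elements would be drained instead, because their key is remembered as closed.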
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala
index bdad7e6606..406790f93f 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala
@@ -459,6 +459,34 @@ class FlowGroupBySpec extends StreamSpec {
       upstream.sendComplete()
     }
 
+    "allow to recreate an already closed substream (#24758)" in assertAllStagesStopped {
+      val (up, down) = Flow[Int]
+        .groupBy(2, identity, true)
+        .take(1) // close the substream after 1 element
+        .mergeSubstreams
+        .runWith(TestSource.probe[Int], TestSink.probe)
+
+      down.request(4)
+
+      // Creates and closes substream "1"
+      up.sendNext(1)
+      down.expectNext(1)
+
+      // Creates and closes substream "2"
+      up.sendNext(2)
+      down.expectNext(2)
+
+      // Recreates and closes substream "1" twice
+      up.sendNext(1)
+      down.expectNext(1)
+      up.sendNext(1)
+      down.expectNext(1)
+
+      // Cleanup, not part of the actual test
+      up.sendComplete()
+      down.expectComplete()
+    }
+
     "cancel if downstream has cancelled & all substreams cancel" in assertAllStagesStopped {
       val upstream = TestPublisher.probe[Int]()
       val downstreamMaster = TestSubscriber.probe[Source[Int, NotUsed]]()
diff --git a/akka-stream/src/main/mima-filters/2.5.12.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.12.backwards.excludes
index c105fe6a6b..2f2a497a45 100644
--- a/akka-stream/src/main/mima-filters/2.5.12.backwards.excludes
+++ b/akka-stream/src/main/mima-filters/2.5.12.backwards.excludes
@@ -1,2 +1,6 @@
 # +str add in-line inspect operator for side effecting #24610
 ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.wireTap")
+
+# #24758 recreate already closed substreams
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GroupBy.this")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.groupBy")
diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
index 4fad974fb0..d2c3104afc 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
@@ -4,6 +4,7 @@
 
 package akka.stream.impl.fusing
 
+import java.util.Collections
 import java.util.concurrent.atomic.AtomicReference
 
 import akka.NotUsed
@@ -214,7 +215,7 @@ import scala.collection.JavaConverters._
 /**
  * INTERNAL API
  */
-@InternalApi private[akka] final class GroupBy[T, K](val maxSubstreams: Int, val keyFor: T ⇒ K) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
+@InternalApi private[akka] final class GroupBy[T, K](val maxSubstreams: Int, val keyFor: T ⇒ K, val allowClosedSubstreamRecreation: Boolean = false) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
   val in: Inlet[T] = Inlet("GroupBy.in")
   val out: Outlet[Source[T, NotUsed]] = Outlet("GroupBy.out")
   override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out)
@@ -224,7 +225,7 @@
     parent ⇒
 
     lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
     private val activeSubstreamsMap = new java.util.HashMap[Any, SubstreamSource]()
-    private val closedSubstreams = new java.util.HashSet[Any]()
+    private val closedSubstreams = if (allowClosedSubstreamRecreation) Collections.unmodifiableSet(Collections.emptySet[Any]) else new java.util.HashSet[Any]()
     private var timeout: FiniteDuration = _
     private var substreamWaitingToBePushed: Option[SubstreamSource] = None
     private var nextElementKey: K = null.asInstanceOf[K]
@@ -334,8 +335,10 @@
     override protected def onTimer(timerKey: Any): Unit = {
       val substreamSource = activeSubstreamsMap.get(timerKey)
       if (substreamSource != null) {
-        substreamSource.timeout(timeout)
-        closedSubstreams.add(timerKey)
+        if (!allowClosedSubstreamRecreation) {
+          closedSubstreams.add(timerKey)
+        }
+        substreamSource.timeout(timeout)
         activeSubstreamsMap.remove(timerKey)
         if (isClosed(in)) tryCompleteAll()
       }
@@ -349,7 +352,9 @@
       private def completeSubStream(): Unit = {
         complete()
         activeSubstreamsMap.remove(key)
-        closedSubstreams.add(key)
+        if (!allowClosedSubstreamRecreation) {
+          closedSubstreams.add(key)
+        }
       }
 
       private def tryCompleteHandler(): Unit = {
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index 68769b5e6a..a4f6047f6a 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -1711,6 +1711,15 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
    * a new substream is opened and subsequently fed with all elements belonging to
    * that key.
    *
+   * WARNING: If `allowClosedSubstreamRecreation` is set to `false` (the default behavior) the stage
+   * keeps track of all keys of streams that have already been closed. If you expect an infinite
+   * number of keys this can cause memory issues. Elements belonging to those keys are drained
+   * directly and not sent to the substream.
+   *
+   * Note: If `allowClosedSubstreamRecreation` is set to `true`, substream completion and incoming
+   * elements are subject to race conditions. If elements arrive for a stream that is in the process
+   * of closing, these elements might get lost.
+   *
    * The object returned from this method is not a normal [[Flow]],
    * it is a [[SubFlow]]. This means that after this combinator all transformations
    * are applied to all encountered substreams in the same fashion. Substream mode
@@ -1744,9 +1753,28 @@
    *
    * @param maxSubstreams configures the maximum number of substreams (keys)
    *        that are supported; if more distinct keys are encountered then the stream fails
+   * @param f computes the key for each element
+   * @param allowClosedSubstreamRecreation enables recreation of already closed substreams if elements with their
+   *        corresponding keys arrive after completion
+   */
+  def groupBy[K](maxSubstreams: Int, f: function.Function[Out, K], allowClosedSubstreamRecreation: Boolean): SubFlow[In, Out, Mat] =
+    new SubFlow(delegate.groupBy(maxSubstreams, f.apply, allowClosedSubstreamRecreation))
+
+  /**
+   * This operation demultiplexes the incoming stream into separate output
+   * streams, one for each element key. The key is computed for each element
+   * using the given function. When a new key is encountered for the first time
+   * a new substream is opened and subsequently fed with all elements belonging to
+   * that key.
+   *
+   * WARNING: The stage keeps track of all keys of streams that have already been closed.
+   * If you expect an infinite number of keys this can cause memory issues. Elements belonging
+   * to those keys are drained directly and not sent to the substream.
+   *
+   * @see [[#groupBy]]
    */
   def groupBy[K](maxSubstreams: Int, f: function.Function[Out, K]): SubFlow[In, Out, Mat] =
-    new SubFlow(delegate.groupBy(maxSubstreams, f.apply))
+    new SubFlow(delegate.groupBy(maxSubstreams, f.apply, false))
 
   /**
    * This operation applies the given predicate to all incoming elements and
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
index 5fabcd39d4..2d33d42732 100755
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
@@ -1768,6 +1768,15 @@ trait FlowOps[+Out, +Mat] {
    * a new substream is opened and subsequently fed with all elements belonging to
    * that key.
    *
+   * WARNING: If `allowClosedSubstreamRecreation` is set to `false` (the default behavior) the stage
+   * keeps track of all keys of streams that have already been closed. If you expect an infinite
+   * number of keys this can cause memory issues. Elements belonging to those keys are drained
+   * directly and not sent to the substream.
+   *
+   * Note: If `allowClosedSubstreamRecreation` is set to `true`, substream completion and incoming
+   * elements are subject to race conditions. If elements arrive for a stream that is in the process
+   * of closing, these elements might get lost.
+   *
    * The object returned from this method is not a normal [[Source]] or [[Flow]],
    * it is a [[SubFlow]]. This means that after this combinator all transformations
    * are applied to all encountered substreams in the same fashion. Substream mode
@@ -1803,20 +1812,38 @@
    *
    * @param maxSubstreams configures the maximum number of substreams (keys)
    *        that are supported; if more distinct keys are encountered then the stream fails
+   * @param f computes the key for each element
+   * @param allowClosedSubstreamRecreation enables recreation of already closed substreams if elements with their
+   *        corresponding keys arrive after completion
    */
-  def groupBy[K](maxSubstreams: Int, f: Out ⇒ K): SubFlow[Out, Mat, Repr, Closed] = {
+  def groupBy[K](maxSubstreams: Int, f: Out ⇒ K, allowClosedSubstreamRecreation: Boolean): SubFlow[Out, Mat, Repr, Closed] = {
     val merge = new SubFlowImpl.MergeBack[Out, Repr] {
       override def apply[T](flow: Flow[Out, T, NotUsed], breadth: Int): Repr[T] =
-        via(new GroupBy(maxSubstreams, f))
+        via(new GroupBy(maxSubstreams, f, allowClosedSubstreamRecreation))
           .map(_.via(flow))
           .via(new FlattenMerge(breadth))
     }
     val finish: (Sink[Out, NotUsed]) ⇒ Closed = s ⇒
-      via(new GroupBy(maxSubstreams, f))
+      via(new GroupBy(maxSubstreams, f, allowClosedSubstreamRecreation))
         .to(Sink.foreach(_.runWith(s)(GraphInterpreter.currentInterpreter.materializer)))
     new SubFlowImpl(Flow[Out], merge, finish)
   }
 
+  /**
+   * This operation demultiplexes the incoming stream into separate output
+   * streams, one for each element key. The key is computed for each element
+   * using the given function. When a new key is encountered for the first time
+   * a new substream is opened and subsequently fed with all elements belonging to
+   * that key.
+   *
+   * WARNING: The stage keeps track of all keys of streams that have already been closed.
+   * If you expect an infinite number of keys this can cause memory issues. Elements belonging
+   * to those keys are drained directly and not sent to the substream.
+   *
+   * @see [[#groupBy]]
+   */
+  def groupBy[K](maxSubstreams: Int, f: Out ⇒ K): SubFlow[Out, Mat, Repr, Closed] = groupBy(maxSubstreams, f, false)
+
   /**
    * This operation applies the given predicate to all incoming elements and
    * emits them to a stream of output streams, always beginning a new one with
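For contrast with the new overloads above, a hedged sketch of the default behavior that the retained two-argument `groupBy` keeps. Again not part of the patch; the object name, element values, and timeout are illustrative assumptions against the 2.5.x scaladsl API.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Await
import scala.concurrent.duration._

object GroupByDefaultExample extends App {
  implicit val system: ActorSystem = ActorSystem("groupBy-default")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // With the two-argument groupBy (allowClosedSubstreamRecreation = false),
  // key 1 is remembered as closed once take(1) completes its substream,
  // so the second 1 is drained rather than re-emitted downstream.
  val elements = Source(List(1, 1, 2))
    .groupBy(2, identity)
    .take(1)
    .mergeSubstreams
    .runWith(Sink.seq)

  println(Await.result(elements, 3.seconds)) // expected: Vector(1, 2)
  system.terminate()
}
```

This is the memory trade-off the WARNING scaladoc describes: the remembered-closed-keys set grows with every distinct key, which is why unbounded key spaces should either bound their keys or opt into recreation.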