Allow groupBy to recreate already closed substreams #24758

`groupBy`'s default behavior is to filter out elements that would go to a substream
that is already closed. This change allows `groupBy` to recreate those already closed
substreams, if desired, and run them as if they had never run before.

Overload instead of default parameter

Make set of closed keys unmodifiable if not needed

Adjust documentation, adding a warning on memory consumption.

Add MiMa exclude.
This commit is contained in:
Markus Thömmes 2018-04-25 19:26:08 +02:00 committed by Patrik Nordwall
parent 89b3820673
commit 35e53f570c
7 changed files with 117 additions and 10 deletions

View file

@ -14,7 +14,21 @@ Demultiplex the incoming stream into separate output streams.
## Description
Demultiplex the incoming stream into separate output streams.
This operation demultiplexes the incoming stream into separate output streams, one for each element key. The
key is computed for each element using the given function. When a new key is encountered for the first time
a new substream is opened and subsequently fed with all elements belonging to that key.
Note: If `allowClosedSubstreamRecreation` is set to `true`, substream completion and incoming
elements are subject to race conditions. If elements arrive for a substream that is in the
process of closing, these elements might get lost.
@@@ warning
If `allowClosedSubstreamRecreation` is set to `false` (default behavior), the stage keeps track of
all keys of streams that have already been closed. If you expect an infinite number of keys, this
can cause memory issues. Elements belonging to those keys are drained directly and not sent to the substream.
@@@
@@@div { .callout }

View file

@ -26,6 +26,8 @@ This operation splits the incoming stream into separate output
streams, one for each element key. The key is computed for each element
using the given function, which is `f` in the above diagram. When a new key is encountered for the first time
a new substream is opened and subsequently fed with all elements belonging to that key.
If `allowClosedSubstreamRecreation` is set to `true`, a substream belonging to a specific key
will be recreated if it was closed before; otherwise elements belonging to that key will be dropped.
If you add a `Sink` or `Flow` right after the `groupBy` stage,
all transformations are applied to all encountered substreams in the same fashion.

View file

@ -459,6 +459,34 @@ class FlowGroupBySpec extends StreamSpec {
upstream.sendComplete()
}
"allow to recreate an already closed substream (#24758)" in assertAllStagesStopped {
  // Each substream completes after its first element because of take(1).
  // With allowClosedSubstreamRecreation = true, a later element carrying an
  // already-closed key must open a fresh substream instead of being dropped.
  val (upstream, downstream) = Flow[Int]
    .groupBy(2, identity, allowClosedSubstreamRecreation = true)
    .take(1) // close the substream after 1 element
    .mergeSubstreams
    .runWith(TestSource.probe[Int], TestSink.probe)

  downstream.request(4)

  // Substream for key "1" is created, emits its element and closes
  upstream.sendNext(1)
  downstream.expectNext(1)
  // Substream for key "2" is created, emits its element and closes
  upstream.sendNext(2)
  downstream.expectNext(2)
  // Key "1" arrives twice more: its substream is recreated (and closed) each
  // time, so the elements flow through rather than being dropped
  upstream.sendNext(1)
  downstream.expectNext(1)
  upstream.sendNext(1)
  downstream.expectNext(1)

  // Cleanup, not part of the actual test
  upstream.sendComplete()
  downstream.expectComplete()
}
"cancel if downstream has cancelled & all substreams cancel" in assertAllStagesStopped {
val upstream = TestPublisher.probe[Int]()
val downstreamMaster = TestSubscriber.probe[Source[Int, NotUsed]]()

View file

@ -1,2 +1,6 @@
# +str add in-line inspect operator for side effecting #24610
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.wireTap")
# #24758 recreate already closed substreams
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GroupBy.this")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.groupBy")

View file

@ -4,6 +4,7 @@
package akka.stream.impl.fusing
import java.util.Collections
import java.util.concurrent.atomic.AtomicReference
import akka.NotUsed
@ -214,7 +215,7 @@ import scala.collection.JavaConverters._
/**
* INTERNAL API
*/
@InternalApi private[akka] final class GroupBy[T, K](val maxSubstreams: Int, val keyFor: T ⇒ K) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
@InternalApi private[akka] final class GroupBy[T, K](val maxSubstreams: Int, val keyFor: T ⇒ K, val allowClosedSubstreamRecreation: Boolean = false) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
val in: Inlet[T] = Inlet("GroupBy.in")
val out: Outlet[Source[T, NotUsed]] = Outlet("GroupBy.out")
override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out)
@ -224,7 +225,7 @@ import scala.collection.JavaConverters._
parent
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private val activeSubstreamsMap = new java.util.HashMap[Any, SubstreamSource]()
private val closedSubstreams = new java.util.HashSet[Any]()
private val closedSubstreams = if (allowClosedSubstreamRecreation) Collections.unmodifiableSet(Collections.emptySet[Any]) else new java.util.HashSet[Any]()
private var timeout: FiniteDuration = _
private var substreamWaitingToBePushed: Option[SubstreamSource] = None
private var nextElementKey: K = null.asInstanceOf[K]
@ -334,8 +335,9 @@ import scala.collection.JavaConverters._
override protected def onTimer(timerKey: Any): Unit = {
val substreamSource = activeSubstreamsMap.get(timerKey)
if (substreamSource != null) {
substreamSource.timeout(timeout)
closedSubstreams.add(timerKey)
if (!allowClosedSubstreamRecreation) {
closedSubstreams.add(timerKey)
}
activeSubstreamsMap.remove(timerKey)
if (isClosed(in)) tryCompleteAll()
}
@ -349,7 +351,9 @@ import scala.collection.JavaConverters._
private def completeSubStream(): Unit = {
complete()
activeSubstreamsMap.remove(key)
closedSubstreams.add(key)
if (!allowClosedSubstreamRecreation) {
closedSubstreams.add(key)
}
}
private def tryCompleteHandler(): Unit = {

View file

@ -1711,6 +1711,15 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* a new substream is opened and subsequently fed with all elements belonging to
* that key.
*
* WARNING: If `allowClosedSubstreamRecreation` is set to `false` (default behavior), the stage
* keeps track of all keys of streams that have already been closed. If you expect an infinite
* number of keys, this can cause memory issues. Elements belonging to those keys are drained
* directly and not sent to the substream.
*
* Note: If `allowClosedSubstreamRecreation` is set to `true`, substream completion and incoming
* elements are subject to race conditions. If elements arrive for a substream that is in the
* process of closing, these elements might get lost.
*
* The object returned from this method is not a normal [[Flow]],
* it is a [[SubFlow]]. This means that after this combinator all transformations
* are applied to all encountered substreams in the same fashion. Substream mode
@ -1744,9 +1753,28 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* @param maxSubstreams configures the maximum number of substreams (keys)
* that are supported; if more distinct keys are encountered then the stream fails
* @param f computes the key for each element
* @param allowClosedSubstreamRecreation enables recreation of already closed substreams if elements with their
* corresponding keys arrive after completion
*/
def groupBy[K](maxSubstreams: Int, f: function.Function[Out, K], allowClosedSubstreamRecreation: Boolean): SubFlow[In, Out, Mat] = {
  // Adapt the Java function to a Scala function and delegate to the Scala DSL.
  val grouped = delegate.groupBy(maxSubstreams, f.apply, allowClosedSubstreamRecreation)
  new SubFlow(grouped)
}
/**
* This operation demultiplexes the incoming stream into separate output
* streams, one for each element key. The key is computed for each element
* using the given function. When a new key is encountered for the first time
* a new substream is opened and subsequently fed with all elements belonging to
* that key.
*
* WARNING: The stage keeps track of all keys of streams that have already been closed.
* If you expect an infinite number of keys, this can cause memory issues. Elements belonging
* to those keys are drained directly and not sent to the substream.
*
* @see [[#groupBy]]
*/
def groupBy[K](maxSubstreams: Int, f: function.Function[Out, K]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.groupBy(maxSubstreams, f.apply))
new SubFlow(delegate.groupBy(maxSubstreams, f.apply, false))
/**
* This operation applies the given predicate to all incoming elements and

View file

@ -1768,6 +1768,15 @@ trait FlowOps[+Out, +Mat] {
* a new substream is opened and subsequently fed with all elements belonging to
* that key.
*
* WARNING: If `allowClosedSubstreamRecreation` is set to `false` (default behavior), the stage
* keeps track of all keys of streams that have already been closed. If you expect an infinite
* number of keys, this can cause memory issues. Elements belonging to those keys are drained
* directly and not sent to the substream.
*
* Note: If `allowClosedSubstreamRecreation` is set to `true`, substream completion and incoming
* elements are subject to race conditions. If elements arrive for a substream that is in the
* process of closing, these elements might get lost.
*
* The object returned from this method is not a normal [[Source]] or [[Flow]],
* it is a [[SubFlow]]. This means that after this combinator all transformations
* are applied to all encountered substreams in the same fashion. Substream mode
@ -1803,20 +1812,38 @@ trait FlowOps[+Out, +Mat] {
*
* @param maxSubstreams configures the maximum number of substreams (keys)
* that are supported; if more distinct keys are encountered then the stream fails
* @param f computes the key for each element
* @param allowClosedSubstreamRecreation enables recreation of already closed substreams if elements with their
* corresponding keys arrive after completion
*/
def groupBy[K](maxSubstreams: Int, f: Out ⇒ K): SubFlow[Out, Mat, Repr, Closed] = {
def groupBy[K](maxSubstreams: Int, f: Out ⇒ K, allowClosedSubstreamRecreation: Boolean): SubFlow[Out, Mat, Repr, Closed] = {
val merge = new SubFlowImpl.MergeBack[Out, Repr] {
override def apply[T](flow: Flow[Out, T, NotUsed], breadth: Int): Repr[T] =
via(new GroupBy(maxSubstreams, f))
via(new GroupBy(maxSubstreams, f, allowClosedSubstreamRecreation))
.map(_.via(flow))
.via(new FlattenMerge(breadth))
}
val finish: (Sink[Out, NotUsed]) ⇒ Closed = s ⇒
via(new GroupBy(maxSubstreams, f))
via(new GroupBy(maxSubstreams, f, allowClosedSubstreamRecreation))
.to(Sink.foreach(_.runWith(s)(GraphInterpreter.currentInterpreter.materializer)))
new SubFlowImpl(Flow[Out], merge, finish)
}
/**
* This operation demultiplexes the incoming stream into separate output
* streams, one for each element key. The key is computed for each element
* using the given function. When a new key is encountered for the first time
* a new substream is opened and subsequently fed with all elements belonging to
* that key.
*
* WARNING: The stage keeps track of all keys of streams that have already been closed.
* If you expect an infinite number of keys, this can cause memory issues. Elements belonging
* to those keys are drained directly and not sent to the substream.
*
* @see [[#groupBy]]
*/
def groupBy[K](maxSubstreams: Int, f: Out ⇒ K): SubFlow[Out, Mat, Repr, Closed] = groupBy(maxSubstreams, f, false)
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams, always beginning a new one with