Merge pull request #24984 from markusthoemmes/allow-groupby-substream-recreate
Allow `groupBy` to recreate already closed substreams #24758
This commit is contained in:
commit
d08f31bcdb
7 changed files with 117 additions and 10 deletions
|
|
@ -14,7 +14,21 @@ Demultiplex the incoming stream into separate output streams.
|
||||||
|
|
||||||
## Description
|
## Description
|
||||||
|
|
||||||
Demultiplex the incoming stream into separate output streams.
|
This operation demultiplexes the incoming stream into separate output streams, one for each element key. The
|
||||||
|
key is computed for each element using the given function. When a new key is encountered for the first time
|
||||||
|
a new substream is opened and subsequently fed with all elements belonging to that key.
|
||||||
|
|
||||||
|
Note: If `allowClosedSubstreamRecreation` is set to `true` substream completion and incoming
|
||||||
|
elements are subject to race-conditions. If elements arrive for a stream that is in the process
|
||||||
|
of closing these elements might get lost.
|
||||||
|
|
||||||
|
@@@ warning
|
||||||
|
|
||||||
|
If `allowClosedSubstreamRecreation` is set to `false` (default behavior) the stage keeps track of all
|
||||||
|
keys of streams that have already been closed. If you expect an infinite number of keys this can cause
|
||||||
|
memory issues. Elements belonging to those keys are drained directly and not send to the substream.
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
||||||
|
|
||||||
@@@div { .callout }
|
@@@div { .callout }
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,8 @@ This operation splits the incoming stream into separate output
|
||||||
streams, one for each element key. The key is computed for each element
|
streams, one for each element key. The key is computed for each element
|
||||||
using the given function, which is `f` in the above diagram. When a new key is encountered for the first time
|
using the given function, which is `f` in the above diagram. When a new key is encountered for the first time
|
||||||
a new substream is opened and subsequently fed with all elements belonging to that key.
|
a new substream is opened and subsequently fed with all elements belonging to that key.
|
||||||
|
If `allowClosedSubstreamRecreation` is set to `true` a substream belonging to a specific key
|
||||||
|
will be recreated if it was closed before, otherwise elements belonging to that key will be dropped.
|
||||||
|
|
||||||
If you add a `Sink` or `Flow` right after the `groupBy` stage,
|
If you add a `Sink` or `Flow` right after the `groupBy` stage,
|
||||||
all transformations are applied to all encountered substreams in the same fashion.
|
all transformations are applied to all encountered substreams in the same fashion.
|
||||||
|
|
|
||||||
|
|
@ -459,6 +459,34 @@ class FlowGroupBySpec extends StreamSpec {
|
||||||
upstream.sendComplete()
|
upstream.sendComplete()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"allow to recreate an already closed substream (#24758)" in assertAllStagesStopped {
|
||||||
|
val (up, down) = Flow[Int]
|
||||||
|
.groupBy(2, identity, true)
|
||||||
|
.take(1) // close the substream after 1 element
|
||||||
|
.mergeSubstreams
|
||||||
|
.runWith(TestSource.probe[Int], TestSink.probe)
|
||||||
|
|
||||||
|
down.request(4)
|
||||||
|
|
||||||
|
// Creates and closes substream "1"
|
||||||
|
up.sendNext(1)
|
||||||
|
down.expectNext(1)
|
||||||
|
|
||||||
|
// Creates and closes substream "2"
|
||||||
|
up.sendNext(2)
|
||||||
|
down.expectNext(2)
|
||||||
|
|
||||||
|
// Recreates and closes substream "1" twice
|
||||||
|
up.sendNext(1)
|
||||||
|
down.expectNext(1)
|
||||||
|
up.sendNext(1)
|
||||||
|
down.expectNext(1)
|
||||||
|
|
||||||
|
// Cleanup, not part of the actual test
|
||||||
|
up.sendComplete()
|
||||||
|
down.expectComplete()
|
||||||
|
}
|
||||||
|
|
||||||
"cancel if downstream has cancelled & all substreams cancel" in assertAllStagesStopped {
|
"cancel if downstream has cancelled & all substreams cancel" in assertAllStagesStopped {
|
||||||
val upstream = TestPublisher.probe[Int]()
|
val upstream = TestPublisher.probe[Int]()
|
||||||
val downstreamMaster = TestSubscriber.probe[Source[Int, NotUsed]]()
|
val downstreamMaster = TestSubscriber.probe[Source[Int, NotUsed]]()
|
||||||
|
|
|
||||||
|
|
@ -1,2 +1,6 @@
|
||||||
# +str add in-line inspect operator for side effecting #24610
|
# +str add in-line inspect operator for side effecting #24610
|
||||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.wireTap")
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.wireTap")
|
||||||
|
|
||||||
|
# #24758 recreate already closed substreams
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GroupBy.this")
|
||||||
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.groupBy")
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@
|
||||||
|
|
||||||
package akka.stream.impl.fusing
|
package akka.stream.impl.fusing
|
||||||
|
|
||||||
|
import java.util.Collections
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
|
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
|
|
@ -214,7 +215,7 @@ import scala.collection.JavaConverters._
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
@InternalApi private[akka] final class GroupBy[T, K](val maxSubstreams: Int, val keyFor: T ⇒ K) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
|
@InternalApi private[akka] final class GroupBy[T, K](val maxSubstreams: Int, val keyFor: T ⇒ K, val allowClosedSubstreamRecreation: Boolean = false) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
|
||||||
val in: Inlet[T] = Inlet("GroupBy.in")
|
val in: Inlet[T] = Inlet("GroupBy.in")
|
||||||
val out: Outlet[Source[T, NotUsed]] = Outlet("GroupBy.out")
|
val out: Outlet[Source[T, NotUsed]] = Outlet("GroupBy.out")
|
||||||
override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out)
|
override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out)
|
||||||
|
|
@ -224,7 +225,7 @@ import scala.collection.JavaConverters._
|
||||||
parent ⇒
|
parent ⇒
|
||||||
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
|
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
|
||||||
private val activeSubstreamsMap = new java.util.HashMap[Any, SubstreamSource]()
|
private val activeSubstreamsMap = new java.util.HashMap[Any, SubstreamSource]()
|
||||||
private val closedSubstreams = new java.util.HashSet[Any]()
|
private val closedSubstreams = if (allowClosedSubstreamRecreation) Collections.unmodifiableSet(Collections.emptySet[Any]) else new java.util.HashSet[Any]()
|
||||||
private var timeout: FiniteDuration = _
|
private var timeout: FiniteDuration = _
|
||||||
private var substreamWaitingToBePushed: Option[SubstreamSource] = None
|
private var substreamWaitingToBePushed: Option[SubstreamSource] = None
|
||||||
private var nextElementKey: K = null.asInstanceOf[K]
|
private var nextElementKey: K = null.asInstanceOf[K]
|
||||||
|
|
@ -334,8 +335,9 @@ import scala.collection.JavaConverters._
|
||||||
override protected def onTimer(timerKey: Any): Unit = {
|
override protected def onTimer(timerKey: Any): Unit = {
|
||||||
val substreamSource = activeSubstreamsMap.get(timerKey)
|
val substreamSource = activeSubstreamsMap.get(timerKey)
|
||||||
if (substreamSource != null) {
|
if (substreamSource != null) {
|
||||||
substreamSource.timeout(timeout)
|
if (!allowClosedSubstreamRecreation) {
|
||||||
closedSubstreams.add(timerKey)
|
closedSubstreams.add(timerKey)
|
||||||
|
}
|
||||||
activeSubstreamsMap.remove(timerKey)
|
activeSubstreamsMap.remove(timerKey)
|
||||||
if (isClosed(in)) tryCompleteAll()
|
if (isClosed(in)) tryCompleteAll()
|
||||||
}
|
}
|
||||||
|
|
@ -349,8 +351,10 @@ import scala.collection.JavaConverters._
|
||||||
private def completeSubStream(): Unit = {
|
private def completeSubStream(): Unit = {
|
||||||
complete()
|
complete()
|
||||||
activeSubstreamsMap.remove(key)
|
activeSubstreamsMap.remove(key)
|
||||||
|
if (!allowClosedSubstreamRecreation) {
|
||||||
closedSubstreams.add(key)
|
closedSubstreams.add(key)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private def tryCompleteHandler(): Unit = {
|
private def tryCompleteHandler(): Unit = {
|
||||||
if (parent.isClosed(in) && !hasNextForSubSource) {
|
if (parent.isClosed(in) && !hasNextForSubSource) {
|
||||||
|
|
|
||||||
|
|
@ -1711,6 +1711,15 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
|
||||||
* a new substream is opened and subsequently fed with all elements belonging to
|
* a new substream is opened and subsequently fed with all elements belonging to
|
||||||
* that key.
|
* that key.
|
||||||
*
|
*
|
||||||
|
* WARNING: If `allowClosedSubstreamRecreation` is set to `false` (default behavior) the stage
|
||||||
|
* keeps track of all keys of streams that have already been closed. If you expect an infinite
|
||||||
|
* number of keys this can cause memory issues. Elements belonging to those keys are drained
|
||||||
|
* directly and not send to the substream.
|
||||||
|
*
|
||||||
|
* Note: If `allowClosedSubstreamRecreation` is set to `true` substream completion and incoming
|
||||||
|
* elements are subject to race-conditions. If elements arrive for a stream that is in the process
|
||||||
|
* of closing these elements might get lost.
|
||||||
|
*
|
||||||
* The object returned from this method is not a normal [[Flow]],
|
* The object returned from this method is not a normal [[Flow]],
|
||||||
* it is a [[SubFlow]]. This means that after this combinator all transformations
|
* it is a [[SubFlow]]. This means that after this combinator all transformations
|
||||||
* are applied to all encountered substreams in the same fashion. Substream mode
|
* are applied to all encountered substreams in the same fashion. Substream mode
|
||||||
|
|
@ -1744,9 +1753,28 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
|
||||||
*
|
*
|
||||||
* @param maxSubstreams configures the maximum number of substreams (keys)
|
* @param maxSubstreams configures the maximum number of substreams (keys)
|
||||||
* that are supported; if more distinct keys are encountered then the stream fails
|
* that are supported; if more distinct keys are encountered then the stream fails
|
||||||
|
* @param f computes the key for each element
|
||||||
|
* @param allowClosedSubstreamRecreation enables recreation of already closed substreams if elements with their
|
||||||
|
* corresponding keys arrive after completion
|
||||||
|
*/
|
||||||
|
def groupBy[K](maxSubstreams: Int, f: function.Function[Out, K], allowClosedSubstreamRecreation: Boolean): SubFlow[In, Out, Mat] =
|
||||||
|
new SubFlow(delegate.groupBy(maxSubstreams, f.apply, allowClosedSubstreamRecreation))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This operation demultiplexes the incoming stream into separate output
|
||||||
|
* streams, one for each element key. The key is computed for each element
|
||||||
|
* using the given function. When a new key is encountered for the first time
|
||||||
|
* a new substream is opened and subsequently fed with all elements belonging to
|
||||||
|
* that key.
|
||||||
|
*
|
||||||
|
* WARNING: The stage keeps track of all keys of streams that have already been closed.
|
||||||
|
* If you expect an infinite number of keys this can cause memory issues. Elements belonging
|
||||||
|
* to those keys are drained directly and not send to the substream.
|
||||||
|
*
|
||||||
|
* @see [[#groupBy]]
|
||||||
*/
|
*/
|
||||||
def groupBy[K](maxSubstreams: Int, f: function.Function[Out, K]): SubFlow[In, Out, Mat] =
|
def groupBy[K](maxSubstreams: Int, f: function.Function[Out, K]): SubFlow[In, Out, Mat] =
|
||||||
new SubFlow(delegate.groupBy(maxSubstreams, f.apply))
|
new SubFlow(delegate.groupBy(maxSubstreams, f.apply, false))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This operation applies the given predicate to all incoming elements and
|
* This operation applies the given predicate to all incoming elements and
|
||||||
|
|
|
||||||
|
|
@ -1768,6 +1768,15 @@ trait FlowOps[+Out, +Mat] {
|
||||||
* a new substream is opened and subsequently fed with all elements belonging to
|
* a new substream is opened and subsequently fed with all elements belonging to
|
||||||
* that key.
|
* that key.
|
||||||
*
|
*
|
||||||
|
* WARNING: If `allowClosedSubstreamRecreation` is set to `false` (default behavior) the stage
|
||||||
|
* keeps track of all keys of streams that have already been closed. If you expect an infinite
|
||||||
|
* number of keys this can cause memory issues. Elements belonging to those keys are drained
|
||||||
|
* directly and not send to the substream.
|
||||||
|
*
|
||||||
|
* Note: If `allowClosedSubstreamRecreation` is set to `true` substream completion and incoming
|
||||||
|
* elements are subject to race-conditions. If elements arrive for a stream that is in the process
|
||||||
|
* of closing these elements might get lost.
|
||||||
|
*
|
||||||
* The object returned from this method is not a normal [[Source]] or [[Flow]],
|
* The object returned from this method is not a normal [[Source]] or [[Flow]],
|
||||||
* it is a [[SubFlow]]. This means that after this combinator all transformations
|
* it is a [[SubFlow]]. This means that after this combinator all transformations
|
||||||
* are applied to all encountered substreams in the same fashion. Substream mode
|
* are applied to all encountered substreams in the same fashion. Substream mode
|
||||||
|
|
@ -1803,20 +1812,38 @@ trait FlowOps[+Out, +Mat] {
|
||||||
*
|
*
|
||||||
* @param maxSubstreams configures the maximum number of substreams (keys)
|
* @param maxSubstreams configures the maximum number of substreams (keys)
|
||||||
* that are supported; if more distinct keys are encountered then the stream fails
|
* that are supported; if more distinct keys are encountered then the stream fails
|
||||||
|
* @param f computes the key for each element
|
||||||
|
* @param allowClosedSubstreamRecreation enables recreation of already closed substreams if elements with their
|
||||||
|
* corresponding keys arrive after completion
|
||||||
*/
|
*/
|
||||||
def groupBy[K](maxSubstreams: Int, f: Out ⇒ K): SubFlow[Out, Mat, Repr, Closed] = {
|
def groupBy[K](maxSubstreams: Int, f: Out ⇒ K, allowClosedSubstreamRecreation: Boolean): SubFlow[Out, Mat, Repr, Closed] = {
|
||||||
val merge = new SubFlowImpl.MergeBack[Out, Repr] {
|
val merge = new SubFlowImpl.MergeBack[Out, Repr] {
|
||||||
override def apply[T](flow: Flow[Out, T, NotUsed], breadth: Int): Repr[T] =
|
override def apply[T](flow: Flow[Out, T, NotUsed], breadth: Int): Repr[T] =
|
||||||
via(new GroupBy(maxSubstreams, f))
|
via(new GroupBy(maxSubstreams, f, allowClosedSubstreamRecreation))
|
||||||
.map(_.via(flow))
|
.map(_.via(flow))
|
||||||
.via(new FlattenMerge(breadth))
|
.via(new FlattenMerge(breadth))
|
||||||
}
|
}
|
||||||
val finish: (Sink[Out, NotUsed]) ⇒ Closed = s ⇒
|
val finish: (Sink[Out, NotUsed]) ⇒ Closed = s ⇒
|
||||||
via(new GroupBy(maxSubstreams, f))
|
via(new GroupBy(maxSubstreams, f, allowClosedSubstreamRecreation))
|
||||||
.to(Sink.foreach(_.runWith(s)(GraphInterpreter.currentInterpreter.materializer)))
|
.to(Sink.foreach(_.runWith(s)(GraphInterpreter.currentInterpreter.materializer)))
|
||||||
new SubFlowImpl(Flow[Out], merge, finish)
|
new SubFlowImpl(Flow[Out], merge, finish)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This operation demultiplexes the incoming stream into separate output
|
||||||
|
* streams, one for each element key. The key is computed for each element
|
||||||
|
* using the given function. When a new key is encountered for the first time
|
||||||
|
* a new substream is opened and subsequently fed with all elements belonging to
|
||||||
|
* that key.
|
||||||
|
*
|
||||||
|
* WARNING: The stage keeps track of all keys of streams that have already been closed.
|
||||||
|
* If you expect an infinite number of keys this can cause memory issues. Elements belonging
|
||||||
|
* to those keys are drained directly and not send to the substream.
|
||||||
|
*
|
||||||
|
* @see [[#groupBy]]
|
||||||
|
*/
|
||||||
|
def groupBy[K](maxSubstreams: Int, f: Out ⇒ K): SubFlow[Out, Mat, Repr, Closed] = groupBy(maxSubstreams, f, false)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This operation applies the given predicate to all incoming elements and
|
* This operation applies the given predicate to all incoming elements and
|
||||||
* emits them to a stream of output streams, always beginning a new one with
|
* emits them to a stream of output streams, always beginning a new one with
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue