Merge pull request #24988 from huntc/groupby-decider
Akka stream groupBy does not invoke decider
This commit is contained in:
commit
34bb7fee3c
3 changed files with 32 additions and 1 deletions
|
|
@ -336,6 +336,18 @@ class FlowGroupBySpec extends StreamSpec {
|
||||||
s1.expectError(ex)
|
s1.expectError(ex)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"resume when exceeding maxSubstreams" in {
|
||||||
|
val (up, down) = Flow[Int]
|
||||||
|
.groupBy(0, identity).mergeSubstreams
|
||||||
|
.withAttributes(ActorAttributes.supervisionStrategy(resumingDecider))
|
||||||
|
.runWith(TestSource.probe[Int], TestSink.probe)
|
||||||
|
|
||||||
|
down.request(1)
|
||||||
|
|
||||||
|
up.sendNext(1)
|
||||||
|
down.expectNoMessage(1.second)
|
||||||
|
}
|
||||||
|
|
||||||
"emit subscribe before completed" in assertAllStagesStopped {
|
"emit subscribe before completed" in assertAllStagesStopped {
|
||||||
val futureGroupSource =
|
val futureGroupSource =
|
||||||
Source.single(0)
|
Source.single(0)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,16 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2015-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.stream
|
||||||
|
|
||||||
|
import scala.util.control.NoStackTrace
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This exception signals that the maximum number of substreams declared has been exceeded.
|
||||||
|
* A finite limit is imposed so that memory usage is controlled.
|
||||||
|
*/
|
||||||
|
final class TooManySubstreamsOpenException
|
||||||
|
extends IllegalStateException("Cannot open a new substream as there are too many substreams open")
|
||||||
|
with NoStackTrace {
|
||||||
|
}
|
||||||
|
|
@ -223,6 +223,7 @@ import scala.collection.JavaConverters._
|
||||||
|
|
||||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with OutHandler with InHandler {
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with OutHandler with InHandler {
|
||||||
parent ⇒
|
parent ⇒
|
||||||
|
|
||||||
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
|
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
|
||||||
private val activeSubstreamsMap = new java.util.HashMap[Any, SubstreamSource]()
|
private val activeSubstreamsMap = new java.util.HashMap[Any, SubstreamSource]()
|
||||||
private val closedSubstreams = if (allowClosedSubstreamRecreation) Collections.unmodifiableSet(Collections.emptySet[Any]) else new java.util.HashSet[Any]()
|
private val closedSubstreams = if (allowClosedSubstreamRecreation) Collections.unmodifiableSet(Collections.emptySet[Any]) else new java.util.HashSet[Any]()
|
||||||
|
|
@ -234,6 +235,8 @@ import scala.collection.JavaConverters._
|
||||||
private val substreamsJustStared = new java.util.HashSet[Any]()
|
private val substreamsJustStared = new java.util.HashSet[Any]()
|
||||||
private var firstPushCounter: Int = 0
|
private var firstPushCounter: Int = 0
|
||||||
|
|
||||||
|
private val tooManySubstreamsOpenException = new TooManySubstreamsOpenException
|
||||||
|
|
||||||
private def nextId(): Long = { _nextId += 1; _nextId }
|
private def nextId(): Long = { _nextId += 1; _nextId }
|
||||||
|
|
||||||
private def hasNextElement = nextElementKey != null
|
private def hasNextElement = nextElementKey != null
|
||||||
|
|
@ -304,7 +307,7 @@ import scala.collection.JavaConverters._
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (activeSubstreamsMap.size == maxSubstreams)
|
if (activeSubstreamsMap.size == maxSubstreams)
|
||||||
fail(new IllegalStateException(s"Cannot open substream for key '$key': too many substreams open"))
|
throw tooManySubstreamsOpenException
|
||||||
else if (closedSubstreams.contains(key) && !hasBeenPulled(in))
|
else if (closedSubstreams.contains(key) && !hasBeenPulled(in))
|
||||||
pull(in)
|
pull(in)
|
||||||
else runSubstream(key, elem)
|
else runSubstream(key, elem)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue