groupBy does not invoke decider

groupBy does not invoke a decider when the maximum number of substreams has been exceeded. This commit fixes that.
This commit is contained in:
Christopher Hunt 2018-04-26 15:42:46 +10:00
parent e8a955cdef
commit 97297dd2d1
3 changed files with 32 additions and 1 deletions

View file

@ -336,6 +336,18 @@ class FlowGroupBySpec extends StreamSpec {
s1.expectError(ex)
}
"resume when exceeding maxSubstreams" in {
val (up, down) = Flow[Int]
.groupBy(0, identity).mergeSubstreams
.withAttributes(ActorAttributes.supervisionStrategy(resumingDecider))
.runWith(TestSource.probe[Int], TestSink.probe)
down.request(1)
up.sendNext(1)
down.expectNoMessage(1.second)
}
"emit subscribe before completed" in assertAllStagesStopped {
val futureGroupSource =
Source.single(0)

View file

@ -0,0 +1,16 @@
/**
* Copyright (C) 2015-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream
import scala.util.control.NoStackTrace
/**
* This exception signals that the maximum number of substreams declared has been exceeded.
* A finite limit is imposed so that memory usage is controlled.
*/
final class TooManySubstreamsOpenException
extends IllegalStateException("Cannot open a new substream as there are too many substreams open")
with NoStackTrace {
}

View file

@ -223,6 +223,7 @@ import scala.collection.JavaConverters._
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with OutHandler with InHandler {
parent
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private val activeSubstreamsMap = new java.util.HashMap[Any, SubstreamSource]()
private val closedSubstreams = if (allowClosedSubstreamRecreation) Collections.unmodifiableSet(Collections.emptySet[Any]) else new java.util.HashSet[Any]()
@ -234,6 +235,8 @@ import scala.collection.JavaConverters._
private val substreamsJustStared = new java.util.HashSet[Any]()
private var firstPushCounter: Int = 0
private val tooManySubstreamsOpenException = new TooManySubstreamsOpenException
private def nextId(): Long = { _nextId += 1; _nextId }
private def hasNextElement = nextElementKey != null
@ -304,7 +307,7 @@ import scala.collection.JavaConverters._
}
} else {
if (activeSubstreamsMap.size == maxSubstreams)
fail(new IllegalStateException(s"Cannot open substream for key '$key': too many substreams open"))
throw tooManySubstreamsOpenException
else if (closedSubstreams.contains(key) && !hasBeenPulled(in))
pull(in)
else runSubstream(key, elem)