diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index b67aafb7fb..20b2ca5620 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -336,6 +336,18 @@ class FlowGroupBySpec extends StreamSpec { s1.expectError(ex) } + "resume when exceeding maxSubstreams" in { + val (up, down) = Flow[Int] + .groupBy(0, identity).mergeSubstreams + .withAttributes(ActorAttributes.supervisionStrategy(resumingDecider)) + .runWith(TestSource.probe[Int], TestSink.probe) + + down.request(1) + + up.sendNext(1) + down.expectNoMessage(1.second) + } + "emit subscribe before completed" in assertAllStagesStopped { val futureGroupSource = Source.single(0) diff --git a/akka-stream/src/main/scala/akka/stream/TooManySubstreamsOpenException.scala b/akka-stream/src/main/scala/akka/stream/TooManySubstreamsOpenException.scala new file mode 100644 index 0000000000..6b19707da5 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/TooManySubstreamsOpenException.scala @@ -0,0 +1,16 @@ +/** + * Copyright (C) 2015-2018 Lightbend Inc. + */ + +package akka.stream + +import scala.util.control.NoStackTrace + +/** + * This exception signals that the maximum number of substreams declared has been exceeded. + * A finite limit is imposed so that memory usage is controlled. + */ +final class TooManySubstreamsOpenException + extends IllegalStateException("Cannot open a new substream as there are too many substreams open") + with NoStackTrace { +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index d2c3104afc..8b756fd0c8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -223,6 +223,7 @@ import scala.collection.JavaConverters._ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with OutHandler with InHandler { parent ⇒ + lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider private val activeSubstreamsMap = new java.util.HashMap[Any, SubstreamSource]() private val closedSubstreams = if (allowClosedSubstreamRecreation) Collections.unmodifiableSet(Collections.emptySet[Any]) else new java.util.HashSet[Any]() @@ -234,6 +235,8 @@ import scala.collection.JavaConverters._ private val substreamsJustStared = new java.util.HashSet[Any]() private var firstPushCounter: Int = 0 + private val tooManySubstreamsOpenException = new TooManySubstreamsOpenException + private def nextId(): Long = { _nextId += 1; _nextId } private def hasNextElement = nextElementKey != null @@ -304,7 +307,7 @@ import scala.collection.JavaConverters._ } } else { if (activeSubstreamsMap.size == maxSubstreams) - fail(new IllegalStateException(s"Cannot open substream for key '$key': too many substreams open")) + throw tooManySubstreamsOpenException else if (closedSubstreams.contains(key) && !hasBeenPulled(in)) pull(in) else runSubstream(key, elem)