From 2bf09d8da281d76c5a507da30a5f59b059c9ae27 Mon Sep 17 00:00:00 2001 From: eyal farago Date: Tue, 27 Oct 2020 14:11:23 +0200 Subject: [PATCH] stream: register newshells with no active interpreters (#29731) Fixes #29730 --- .../util/ByteStringInitializationSpec.scala | 7 ++++- .../tcp/ssl/PeerSubjectVerifierSpec.scala | 2 +- .../scaladsl/FlowFlatMapPrefixSpec.scala | 28 +++++++++++++++++++ .../impl/fusing/ActorGraphInterpreter.scala | 16 +++++++---- 4 files changed, 45 insertions(+), 8 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/util/ByteStringInitializationSpec.scala b/akka-actor-tests/src/test/scala/akka/util/ByteStringInitializationSpec.scala index c47f6516a7..6cadf5a2da 100644 --- a/akka-actor-tests/src/test/scala/akka/util/ByteStringInitializationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/ByteStringInitializationSpec.scala @@ -39,7 +39,12 @@ class ByteStringInitializationSpec extends AnyWordSpec with Matchers { import scala.language.reflectiveCalls type WithRun = { def run(): Unit } - cleanCl.loadClass("akka.util.ByteStringInitTest").newInstance().asInstanceOf[WithRun].run() + cleanCl + .loadClass("akka.util.ByteStringInitTest") + .getDeclaredConstructor() + .newInstance() + .asInstanceOf[WithRun] + .run() } } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/tcp/ssl/PeerSubjectVerifierSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/tcp/ssl/PeerSubjectVerifierSpec.scala index 150b8280d8..ff41938f76 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/tcp/ssl/PeerSubjectVerifierSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/tcp/ssl/PeerSubjectVerifierSpec.scala @@ -52,7 +52,7 @@ class PeerSubjectVerifierSpec extends AnyWordSpec with Matchers { override def removeValue(name: String): Unit = throw new UnsupportedOperationException() override def getValueNames: Array[String] = throw new UnsupportedOperationException() override def getLocalCertificates: Array[Certificate] = throw new UnsupportedOperationException() - override def getPeerCertificateChain: Array[javax.security.cert.X509Certificate] = + override def getPeerCertificateChain /*: Array[javax.security.cert.X509Certificate]*/ = throw new UnsupportedOperationException() override def getPeerPrincipal: Principal = throw new UnsupportedOperationException() override def getLocalPrincipal: Principal = throw new UnsupportedOperationException() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala index 0e6d6eff76..b56f6e1b0a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala @@ -584,6 +584,34 @@ class FlowFlatMapPrefixSpec extends StreamSpec { subscriber.expectNext("a", "b", "c") subscriber.expectComplete() } + + "complete newShells registration when all active interpreters are done" in assertAllStagesStopped { + @volatile var closeSink: () => Unit = null + + val (fNotUsed, qOut) = Source + .empty[Int] + .flatMapPrefixMat(1) { seq => + log.debug("waiting for closer to be set") + while (null == closeSink) Thread.sleep(50) + log.debug("closing sink") + closeSink() + log.debug("sink closed") + //closing the sink before returning means that it's higly probably + //for the flatMapPrefix stage to receive the downstream cancellation before the actor graph interpreter + //gets a chance to complete the new interpreter shell's registration. + //this in turn exposes a bug in the actor graph interpreter when all active flows complete + //but there are pending new interpreter shells to be registered. + Flow[Int].prepend(Source(seq)) + }(Keep.right) + .toMat(Sink.queue(10))(Keep.both) + .run() + + log.debug("assigning closer") + closeSink = () => qOut.cancel() + + log.debug("closer assigned, waiting for completion") + fNotUsed.futureValue should be(NotUsed) + } } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index ad7477784d..1772f31803 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -753,13 +753,17 @@ import akka.util.OptionVal else if (shortCircuitBuffer != null) shortCircuitBatch() } - private def shortCircuitBatch(): Unit = { - while (!shortCircuitBuffer.isEmpty && currentLimit > 0 && activeInterpreters.nonEmpty) shortCircuitBuffer - .poll() match { - case b: BoundaryEvent => processEvent(b) - case Resume => finishShellRegistration() + @tailrec private def shortCircuitBatch(): Unit = { + if (shortCircuitBuffer.isEmpty) () + else if (currentLimit == 0) { + self ! Resume + } else { + shortCircuitBuffer.poll() match { + case b: BoundaryEvent => processEvent(b) + case Resume => finishShellRegistration() + } + shortCircuitBatch() } - if (!shortCircuitBuffer.isEmpty && currentLimit == 0) self ! Resume } private def processEvent(b: BoundaryEvent): Unit = {