stream: register newshells with no active interpreters (#29731)
Fixes #29730
This commit is contained in:
parent
91a5d4b093
commit
2bf09d8da2
4 changed files with 45 additions and 8 deletions
|
|
@ -39,7 +39,12 @@ class ByteStringInitializationSpec extends AnyWordSpec with Matchers {
|
|||
|
||||
import scala.language.reflectiveCalls
|
||||
type WithRun = { def run(): Unit }
|
||||
cleanCl.loadClass("akka.util.ByteStringInitTest").newInstance().asInstanceOf[WithRun].run()
|
||||
cleanCl
|
||||
.loadClass("akka.util.ByteStringInitTest")
|
||||
.getDeclaredConstructor()
|
||||
.newInstance()
|
||||
.asInstanceOf[WithRun]
|
||||
.run()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,7 +52,7 @@ class PeerSubjectVerifierSpec extends AnyWordSpec with Matchers {
|
|||
override def removeValue(name: String): Unit = throw new UnsupportedOperationException()
|
||||
override def getValueNames: Array[String] = throw new UnsupportedOperationException()
|
||||
override def getLocalCertificates: Array[Certificate] = throw new UnsupportedOperationException()
|
||||
override def getPeerCertificateChain: Array[javax.security.cert.X509Certificate] =
|
||||
override def getPeerCertificateChain /*: Array[javax.security.cert.X509Certificate]*/ =
|
||||
throw new UnsupportedOperationException()
|
||||
override def getPeerPrincipal: Principal = throw new UnsupportedOperationException()
|
||||
override def getLocalPrincipal: Principal = throw new UnsupportedOperationException()
|
||||
|
|
|
|||
|
|
@ -584,6 +584,34 @@ class FlowFlatMapPrefixSpec extends StreamSpec {
|
|||
subscriber.expectNext("a", "b", "c")
|
||||
subscriber.expectComplete()
|
||||
}
|
||||
|
||||
"complete newShells registration when all active interpreters are done" in assertAllStagesStopped {
|
||||
@volatile var closeSink: () => Unit = null
|
||||
|
||||
val (fNotUsed, qOut) = Source
|
||||
.empty[Int]
|
||||
.flatMapPrefixMat(1) { seq =>
|
||||
log.debug("waiting for closer to be set")
|
||||
while (null == closeSink) Thread.sleep(50)
|
||||
log.debug("closing sink")
|
||||
closeSink()
|
||||
log.debug("sink closed")
|
||||
//closing the sink before returning means that it's higly probably
|
||||
//for the flatMapPrefix stage to receive the downstream cancellation before the actor graph interpreter
|
||||
//gets a chance to complete the new interpreter shell's registration.
|
||||
//this in turn exposes a bug in the actor graph interpreter when all active flows complete
|
||||
//but there are pending new interpreter shells to be registered.
|
||||
Flow[Int].prepend(Source(seq))
|
||||
}(Keep.right)
|
||||
.toMat(Sink.queue(10))(Keep.both)
|
||||
.run()
|
||||
|
||||
log.debug("assigning closer")
|
||||
closeSink = () => qOut.cancel()
|
||||
|
||||
log.debug("closer assigned, waiting for completion")
|
||||
fNotUsed.futureValue should be(NotUsed)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -753,13 +753,17 @@ import akka.util.OptionVal
|
|||
else if (shortCircuitBuffer != null) shortCircuitBatch()
|
||||
}
|
||||
|
||||
private def shortCircuitBatch(): Unit = {
|
||||
while (!shortCircuitBuffer.isEmpty && currentLimit > 0 && activeInterpreters.nonEmpty) shortCircuitBuffer
|
||||
.poll() match {
|
||||
case b: BoundaryEvent => processEvent(b)
|
||||
case Resume => finishShellRegistration()
|
||||
@tailrec private def shortCircuitBatch(): Unit = {
|
||||
if (shortCircuitBuffer.isEmpty) ()
|
||||
else if (currentLimit == 0) {
|
||||
self ! Resume
|
||||
} else {
|
||||
shortCircuitBuffer.poll() match {
|
||||
case b: BoundaryEvent => processEvent(b)
|
||||
case Resume => finishShellRegistration()
|
||||
}
|
||||
shortCircuitBatch()
|
||||
}
|
||||
if (!shortCircuitBuffer.isEmpty && currentLimit == 0) self ! Resume
|
||||
}
|
||||
|
||||
private def processEvent(b: BoundaryEvent): Unit = {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue