Fix mapConcat context propagation (#30289)

* Fix mapConcat context propagation

* Remove default ContextPropagation implementation to reduce possible overhead when no Telemetry is in use

* fix pullFirstSubElement onPull

* CR follow up
This commit is contained in:
Yury Gribkov 2021-06-08 06:09:47 -04:00 committed by GitHub
parent d69d04957b
commit 3da5c2bdd0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 11 additions and 11 deletions

View file

@ -26,11 +26,6 @@ import akka.annotation.InternalApi
}
private[akka] final class ContextPropagationImpl extends ContextPropagation {
private val buffer = Buffer[Unit](1, 1)
def suspendContext(): Unit = {
buffer.enqueue(())
}
def resumeContext(): Unit = {
buffer.dequeue()
}
def suspendContext(): Unit = ()
def resumeContext(): Unit = ()
}

View file

@ -2212,15 +2212,20 @@ private[akka] final class StatefulMapConcat[In, Out](val f: () => In => Iterable
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
var currentIterator: Iterator[Out] = _
var plainFun = f()
val contextPropagation = ContextPropagation()
def hasNext = if (currentIterator != null) currentIterator.hasNext else false
setHandlers(in, out, this)
def pushPull(): Unit =
def pushPull(shouldResumeContext: Boolean): Unit =
if (hasNext) {
if (shouldResumeContext) contextPropagation.resumeContext()
push(out, currentIterator.next())
if (!hasNext && isClosed(in)) completeStage()
if (hasNext) {
// suspend context for the next element
contextPropagation.suspendContext()
} else if (isClosed(in)) completeStage()
} else if (!isClosed(in))
pull(in)
else completeStage()
@ -2230,13 +2235,13 @@ private[akka] final class StatefulMapConcat[In, Out](val f: () => In => Iterable
override def onPush(): Unit =
try {
currentIterator = plainFun(grab(in)).iterator
pushPull()
pushPull(shouldResumeContext = false)
} catch handleException
override def onUpstreamFinish(): Unit = onFinish()
override def onPull(): Unit =
try pushPull()
try pushPull(shouldResumeContext = true)
catch handleException
private def handleException: Catcher[Unit] = {