Optimize closeState flag (#872)

Co-authored-by: jxnu-liguobin <jxnu-liguobin@outlook.com>
This commit is contained in:
jxnu-liguobin 2023-12-25 08:30:27 +08:00 committed by GitHub
parent 16e587ded1
commit 417196fb46
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -2251,8 +2251,8 @@ private[pekko] final class StatefulMap[S, In, Out](create: () => S, f: (S, In) =
override def onUpstreamFailure(ex: Throwable): Unit = closeStateAndFail(ex)
override def onDownstreamFinish(cause: Throwable): Unit = {
onComplete(state)
needInvokeOnCompleteCallback = false
onComplete(state)
super.onDownstreamFinish(cause)
}
@ -2265,19 +2265,19 @@ private[pekko] final class StatefulMap[S, In, Out](create: () => S, f: (S, In) =
}
private def closeStateAndComplete(): Unit = {
needInvokeOnCompleteCallback = false
onComplete(state) match {
case Some(elem) => emit(out, elem, () => completeStage())
case None => completeStage()
}
needInvokeOnCompleteCallback = false
}
private def closeStateAndFail(ex: Throwable): Unit = {
needInvokeOnCompleteCallback = false
onComplete(state) match {
case Some(elem) => emit(out, elem, () => failStage(ex))
case None => failStage(ex)
}
needInvokeOnCompleteCallback = false
}
override def onPull(): Unit = pull(in)