From 417196fb46e68b228af5b06b63f29ff0358b30a8 Mon Sep 17 00:00:00 2001 From: jxnu-liguobin Date: Mon, 25 Dec 2023 08:30:27 +0800 Subject: [PATCH] Optimize closeState flag (#872) Co-authored-by: jxnu-liguobin --- .../scala/org/apache/pekko/stream/impl/fusing/Ops.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index d70b1c9bbb..5368955e7b 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -2251,8 +2251,8 @@ private[pekko] final class StatefulMap[S, In, Out](create: () => S, f: (S, In) = override def onUpstreamFailure(ex: Throwable): Unit = closeStateAndFail(ex) override def onDownstreamFinish(cause: Throwable): Unit = { - onComplete(state) needInvokeOnCompleteCallback = false + onComplete(state) super.onDownstreamFinish(cause) } @@ -2265,19 +2265,19 @@ private[pekko] final class StatefulMap[S, In, Out](create: () => S, f: (S, In) = } private def closeStateAndComplete(): Unit = { + needInvokeOnCompleteCallback = false onComplete(state) match { case Some(elem) => emit(out, elem, () => completeStage()) case None => completeStage() } - needInvokeOnCompleteCallback = false } private def closeStateAndFail(ex: Throwable): Unit = { + needInvokeOnCompleteCallback = false onComplete(state) match { case Some(elem) => emit(out, elem, () => failStage(ex)) case None => failStage(ex) } - needInvokeOnCompleteCallback = false } override def onPull(): Unit = pull(in)