diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index d70b1c9bbb..5368955e7b 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -2251,8 +2251,8 @@ private[pekko] final class StatefulMap[S, In, Out](create: () => S, f: (S, In) = override def onUpstreamFailure(ex: Throwable): Unit = closeStateAndFail(ex) override def onDownstreamFinish(cause: Throwable): Unit = { - onComplete(state) needInvokeOnCompleteCallback = false + onComplete(state) super.onDownstreamFinish(cause) } @@ -2265,19 +2265,19 @@ private[pekko] final class StatefulMap[S, In, Out](create: () => S, f: (S, In) = } private def closeStateAndComplete(): Unit = { + needInvokeOnCompleteCallback = false onComplete(state) match { case Some(elem) => emit(out, elem, () => completeStage()) case None => completeStage() } - needInvokeOnCompleteCallback = false } private def closeStateAndFail(ex: Throwable): Unit = { + needInvokeOnCompleteCallback = false onComplete(state) match { case Some(elem) => emit(out, elem, () => failStage(ex)) case None => failStage(ex) } - needInvokeOnCompleteCallback = false } override def onPull(): Unit = pull(in)