From 682a76e928c1393b898f9eecd06e39fde69cb8ee Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Wed, 21 Aug 2019 12:43:40 +0200 Subject: [PATCH] streams: move `lastCancellationCause` helper variable to GraphStageLogic (#27531) It seems in the previous `_lastCancellationCause = cause` caused a trait forwarder method to be called. Refs #27529 --- .../mima-filters/2.5.x.backwards.excludes | 4 --- .../scala/akka/stream/stage/GraphStage.scala | 27 ++++++++++++------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes index f4791643a3..e7aa610441 100644 --- a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes @@ -159,7 +159,3 @@ ProblemFilters.exclude[Problem]("akka.stream.StreamRefMessages*") # #27266 changes to streams internals ProblemFilters.exclude[Problem]("akka.stream.impl.*") - -# added private[this] field to public class, shouldn't have more impact than a potential naming clash -ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.stage.OutHandler.akka$stream$stage$OutHandler$$_lastCancellationCause") -ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.stage.OutHandler.akka$stream$stage$OutHandler$$_lastCancellationCause_=") diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index bbb3528f47..cf3fa768cd 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -679,6 +679,13 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: */ final def completeStage(): Unit = cancelStage(SubscriptionWithCancelException.StageWasCompleted) + // Variable used from `OutHandler.onDownstreamFinish` to carry over cancellation cause in cases where + // `OutHandler` implementations call `super.onDownstreamFinished()`. + /** + * INTERNAL API + */ + @InternalApi private[stream] var lastCancellationCause: Throwable = _ + /** * Automatically invokes [[cancel()]] or [[complete()]] on all the input or output ports that have been called, * then marks the stage as stopped. @@ -1805,9 +1812,6 @@ trait OutHandler { @throws(classOf[Exception]) def onPull(): Unit - // Hack to make sure that old `onDownstreamFinish` can be called without losing the cause in the default implementation - private[this] var _lastCancellationCause: Throwable = _ - /** * Called when the output port will no longer accept any new elements. After this callback no other callbacks will * be called for this port. @@ -1817,8 +1821,11 @@ trait OutHandler { // @deprecatedOverriding("Override `def onDownstreamFinish(cause: Throwable)`, instead.", since = "2.6.0") // warns when overriding @deprecated("Call onDownstreamFinish with a cancellation cause.", since = "2.6.0") // warns when calling def onDownstreamFinish(): Unit = { - require(_lastCancellationCause ne null, "onDownstreamFinish() must not be called without a cancellation cause") - GraphInterpreter.currentInterpreter.activeStage.cancelStage(_lastCancellationCause) + val thisStage = GraphInterpreter.currentInterpreter.activeStage + require( + thisStage.lastCancellationCause ne null, + "onDownstreamFinish() must not be called without a cancellation cause") + thisStage.cancelStage(thisStage.lastCancellationCause) } /** @@ -1826,13 +1833,15 @@ trait OutHandler { * be called for this port. */ @throws(classOf[Exception]) - def onDownstreamFinish(cause: Throwable): Unit = + def onDownstreamFinish(cause: Throwable): Unit = { + val thisStage = GraphInterpreter.currentInterpreter.activeStage try { require(cause ne null, "Cancellation cause must not be null") - require(_lastCancellationCause eq null, "onDownstreamFinish(cause) must not be called recursively") - _lastCancellationCause = cause + require(thisStage.lastCancellationCause eq null, "onDownstreamFinish(cause) must not be called recursively") + thisStage.lastCancellationCause = cause (onDownstreamFinish(): @silent("deprecated")) // if not overridden, call old deprecated variant - } finally _lastCancellationCause = null + } finally thisStage.lastCancellationCause = null + } } /**