streams: move lastCancellationCause helper variable to GraphStageLogic (#27531)

It seems in the previous `_lastCancellationCause = cause` caused a trait
forwarder method to be called.

Refs #27529
This commit is contained in:
Johannes Rudolph 2019-08-21 12:43:40 +02:00 committed by Arnout Engelen
parent 0d8c4c4d8f
commit 682a76e928
2 changed files with 18 additions and 13 deletions

View file

@ -159,7 +159,3 @@ ProblemFilters.exclude[Problem]("akka.stream.StreamRefMessages*")
# #27266 changes to streams internals
ProblemFilters.exclude[Problem]("akka.stream.impl.*")
# added private[this] field to public class, shouldn't have more impact than a potential naming clash
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.stage.OutHandler.akka$stream$stage$OutHandler$$_lastCancellationCause")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.stage.OutHandler.akka$stream$stage$OutHandler$$_lastCancellationCause_=")

View file

@ -679,6 +679,13 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
*/
final def completeStage(): Unit = cancelStage(SubscriptionWithCancelException.StageWasCompleted)
// Variable used from `OutHandler.onDownstreamFinish` to carry over cancellation cause in cases where
// `OutHandler` implementations call `super.onDownstreamFinished()`.
/**
* INTERNAL API
*/
@InternalApi private[stream] var lastCancellationCause: Throwable = _
/**
* Automatically invokes [[cancel()]] or [[complete()]] on all the input or output ports that have been called,
* then marks the stage as stopped.
@ -1805,9 +1812,6 @@ trait OutHandler {
@throws(classOf[Exception])
def onPull(): Unit
// Hack to make sure that old `onDownstreamFinish` can be called without losing the cause in the default implementation
private[this] var _lastCancellationCause: Throwable = _
/**
* Called when the output port will no longer accept any new elements. After this callback no other callbacks will
* be called for this port.
@ -1817,8 +1821,11 @@ trait OutHandler {
// @deprecatedOverriding("Override `def onDownstreamFinish(cause: Throwable)`, instead.", since = "2.6.0") // warns when overriding
@deprecated("Call onDownstreamFinish with a cancellation cause.", since = "2.6.0") // warns when calling
def onDownstreamFinish(): Unit = {
require(_lastCancellationCause ne null, "onDownstreamFinish() must not be called without a cancellation cause")
GraphInterpreter.currentInterpreter.activeStage.cancelStage(_lastCancellationCause)
val thisStage = GraphInterpreter.currentInterpreter.activeStage
require(
thisStage.lastCancellationCause ne null,
"onDownstreamFinish() must not be called without a cancellation cause")
thisStage.cancelStage(thisStage.lastCancellationCause)
}
/**
@ -1826,13 +1833,15 @@ trait OutHandler {
* be called for this port.
*/
@throws(classOf[Exception])
def onDownstreamFinish(cause: Throwable): Unit =
def onDownstreamFinish(cause: Throwable): Unit = {
val thisStage = GraphInterpreter.currentInterpreter.activeStage
try {
require(cause ne null, "Cancellation cause must not be null")
require(_lastCancellationCause eq null, "onDownstreamFinish(cause) must not be called recursively")
_lastCancellationCause = cause
require(thisStage.lastCancellationCause eq null, "onDownstreamFinish(cause) must not be called recursively")
thisStage.lastCancellationCause = cause
(onDownstreamFinish(): @silent("deprecated")) // if not overridden, call old deprecated variant
} finally _lastCancellationCause = null
} finally thisStage.lastCancellationCause = null
}
}
/**