chore: Remove the deprecated onDownstreamFinish method in GraphStage (#2129)

This commit is contained in:
He-Pin(kerr) 2025-09-03 03:07:57 +08:00 committed by GitHub
parent 18545a6737
commit 5db362f58f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 3 additions and 23 deletions

View file

@ -191,3 +191,4 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.Dela
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.OverflowStrategies$DropNew")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.OverflowStrategies$DropNew$")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.OverflowStrategy.dropNew")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.GraphStageLogic.lastCancellationCause_=")

View file

@ -16,7 +16,6 @@ package org.apache.pekko.stream.stage
import java.util.concurrent.{ CompletionStage, ConcurrentHashMap }
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.nowarn
import scala.annotation.tailrec
import scala.collection.{ immutable, mutable }
import scala.concurrent.{ Future, Promise }
@ -697,13 +696,6 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
final def completeStage(): Unit =
internalCompleteStage(SubscriptionWithCancelException.StageWasCompleted, OptionVal.None)
// Variable used from `OutHandler.onDownstreamFinish` to carry over cancellation cause in cases where
// `OutHandler` implementations call `super.onDownstreamFinished()`.
/**
* INTERNAL API
*/
@InternalApi private[stream] var lastCancellationCause: Throwable = _
/**
* Automatically invokes [[cancel]] or [[complete]] on all the input or output ports that have been called,
* then marks the stage as stopped.
@ -1896,27 +1888,14 @@ trait OutHandler {
@throws(classOf[Exception])
def onPull(): Unit
private def onDownstreamFinish(): Unit = {
val thisStage = GraphInterpreter.currentInterpreter.activeStage
require(
thisStage.lastCancellationCause ne null,
"onDownstreamFinish() must not be called without a cancellation cause")
thisStage.cancelStage(thisStage.lastCancellationCause)
}
/**
* Called when the output port will no longer accept any new elements. After this callback no other callbacks will
* be called for this port.
*/
@throws(classOf[Exception])
def onDownstreamFinish(cause: Throwable): Unit = {
val thisStage = GraphInterpreter.currentInterpreter.activeStage
try {
require(cause ne null, "Cancellation cause must not be null")
require(thisStage.lastCancellationCause eq null, "onDownstreamFinish(cause) must not be called recursively")
thisStage.lastCancellationCause = cause
onDownstreamFinish(): @nowarn("msg=deprecated") // if not overridden, call old deprecated variant
} finally thisStage.lastCancellationCause = null
require(cause ne null, "Cancellation cause must not be null")
GraphInterpreter.currentInterpreter.activeStage.cancelStage(cause)
}
}