From 5db362f58f2daf533c825cfebf0357a67324f221 Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Wed, 3 Sep 2025 03:07:57 +0800 Subject: [PATCH] chore: Remove the deprecated onDownstreamFinish method in GraphStage (#2129) --- .../remove-deprecated-methods.excludes | 1 + .../pekko/stream/stage/GraphStage.scala | 25 ++----------------- 2 files changed, 3 insertions(+), 23 deletions(-) diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes index 6b1862b9e1..843da33386 100644 --- a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes +++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes @@ -191,3 +191,4 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.Dela ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.OverflowStrategies$DropNew") ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.OverflowStrategies$DropNew$") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.OverflowStrategy.dropNew") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.stage.GraphStageLogic.lastCancellationCause_=") diff --git a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala index 41b2020627..04f46f1d88 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala @@ -16,7 +16,6 @@ package org.apache.pekko.stream.stage import java.util.concurrent.{ CompletionStage, ConcurrentHashMap } import java.util.concurrent.atomic.AtomicReference -import scala.annotation.nowarn import scala.annotation.tailrec import scala.collection.{ immutable, mutable } import scala.concurrent.{ Future, Promise } @@ -697,13 +696,6 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: final def completeStage(): Unit = internalCompleteStage(SubscriptionWithCancelException.StageWasCompleted, OptionVal.None) - // Variable used from `OutHandler.onDownstreamFinish` to carry over cancellation cause in cases where - // `OutHandler` implementations call `super.onDownstreamFinished()`. - /** - * INTERNAL API - */ - @InternalApi private[stream] var lastCancellationCause: Throwable = _ - /** * Automatically invokes [[cancel]] or [[complete]] on all the input or output ports that have been called, * then marks the stage as stopped. @@ -1896,27 +1888,14 @@ trait OutHandler { @throws(classOf[Exception]) def onPull(): Unit - private def onDownstreamFinish(): Unit = { - val thisStage = GraphInterpreter.currentInterpreter.activeStage - require( - thisStage.lastCancellationCause ne null, - "onDownstreamFinish() must not be called without a cancellation cause") - thisStage.cancelStage(thisStage.lastCancellationCause) - } - /** * Called when the output port will no longer accept any new elements. After this callback no other callbacks will * be called for this port. */ @throws(classOf[Exception]) def onDownstreamFinish(cause: Throwable): Unit = { - val thisStage = GraphInterpreter.currentInterpreter.activeStage - try { - require(cause ne null, "Cancellation cause must not be null") - require(thisStage.lastCancellationCause eq null, "onDownstreamFinish(cause) must not be called recursively") - thisStage.lastCancellationCause = cause - onDownstreamFinish(): @nowarn("msg=deprecated") // if not overridden, call old deprecated variant - } finally thisStage.lastCancellationCause = null + require(cause ne null, "Cancellation cause must not be null") + GraphInterpreter.currentInterpreter.activeStage.cancelStage(cause) } }