From d9f49331147c29d9b5237d711ebd3b48a67bfd83 Mon Sep 17 00:00:00 2001 From: kerr Date: Sat, 2 Apr 2022 00:53:02 +0800 Subject: [PATCH] =str Reduce some Inhandler and Outhandler allocation. (#31278) --- .../stream/impl/io/OutputStreamSourceStage.scala | 4 +--- .../main/scala/akka/stream/impl/io/TcpStages.scala | 4 +--- .../scala/akka/stream/scaladsl/RestartFlow.scala | 12 +++--------- .../scala/akka/stream/scaladsl/RestartSink.scala | 10 ++++------ .../scala/akka/stream/scaladsl/RestartSource.scala | 10 ++++------ .../main/scala/akka/stream/stage/GraphStage.scala | 10 ++++------ 6 files changed, 17 insertions(+), 33 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala index 8b13adb7de..5dc135de42 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala @@ -56,9 +56,7 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration } } - setHandler(out, new OutHandler { - override def onPull(): Unit = {} - }) + setHandler(out, GraphStageLogic.EagerTerminateOutput) } val logic = new OutputStreamSourceLogic diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index d0f2b8f119..f9cb85ebd0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -272,9 +272,7 @@ private[stream] object ConnectionSourceStage { private var previousWriteBufferSize = 0 // No reading until role have been decided - setHandler(bytesOut, new OutHandler { - override def onPull(): Unit = () - }) + setHandler(bytesOut, GraphStageLogic.EagerTerminateOutput) override def preStart(): Unit = { setKeepGoing(true) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala index 632f2aa57d..7c88ae9607 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala @@ -215,12 +215,8 @@ private final class RestartWithBackoffFlow[In, Out]( } override protected def backoff() = { - setHandler(in, new InHandler { - override def onPush() = () - }) - setHandler(out, new OutHandler { - override def onPull() = () - }) + setHandler(in, GraphStageLogic.EagerTerminateInput) + setHandler(out, GraphStageLogic.EagerTerminateOutput) // We need to ensure that the other end of the sub flow is also completed, so that we don't // receive any callbacks from it. @@ -462,9 +458,7 @@ object RestartWithBackoffFlow { override def onDownstreamFinish(cause: Throwable): Unit = { this.cause = OptionVal.Some(cause) scheduleOnce("CompleteState", delay) - setHandler(in, new InHandler { - def onPush(): Unit = {} - }) + setHandler(in, GraphStageLogic.EagerTerminateInput) } override protected def onTimer(timerKey: Any): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala index 9ab2e47ec4..a22bfc68e6 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala @@ -4,11 +4,11 @@ package akka.stream.scaladsl -import scala.concurrent.duration.FiniteDuration - import akka.NotUsed +import akka.stream.stage.{ GraphStage, GraphStageLogic } import akka.stream.{ Attributes, Inlet, RestartSettings, SinkShape } -import akka.stream.stage.{ GraphStage, InHandler } + +import scala.concurrent.duration.FiniteDuration /** * A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails. @@ -124,9 +124,7 @@ private final class RestartWithBackoffSink[T](sinkFactory: () => Sink[T, _], res } override protected def backoff() = { - setHandler(in, new InHandler { - override def onPush() = () - }) + setHandler(in, GraphStageLogic.EagerTerminateInput) } backoff() diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala index e34469cb0a..55aff959a7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala @@ -4,11 +4,11 @@ package akka.stream.scaladsl -import scala.concurrent.duration.FiniteDuration - import akka.NotUsed +import akka.stream.stage.{ GraphStage, GraphStageLogic } import akka.stream.{ Attributes, Outlet, RestartSettings, SourceShape } -import akka.stream.stage.{ GraphStage, OutHandler } + +import scala.concurrent.duration.FiniteDuration /** * A RestartSource wraps a [[Source]] that gets restarted when it completes or fails. @@ -203,9 +203,7 @@ private final class RestartWithBackoffSource[T]( } override protected def backoff() = { - setHandler(out, new OutHandler { - override def onPull() = () - }) + setHandler(out, GraphStageLogic.EagerTerminateOutput) } backoff() diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index d3d856b56e..3af76b9da1 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -533,12 +533,10 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: private def cancel[T](connection: Connection, cause: Throwable): Unit = attributes.mandatoryAttribute[Attributes.CancellationStrategy].strategy match { case Attributes.CancellationStrategy.AfterDelay(delay, _) => - // since the port is not actually cancelled, we install a handler to ignore upcoming elements - connection.inHandler = new InHandler { - // ignore pushs now, since the stage wanted it cancelled already - override def onPush(): Unit = () - // do not ignore termination signals - } + // since the port is not actually cancelled, we install a handler to ignore upcoming + // ignore pushs now, since the stage wanted it cancelled already + // do not ignore termination signals + connection.inHandler = EagerTerminateInput val callback = getAsyncCallback[(Connection, Throwable)] { case (connection, cause) => doCancel(connection, cause) }