=str Reduce some Inhandler and Outhandler allocation. (#31278)

This commit is contained in:
kerr 2022-04-02 00:53:02 +08:00 committed by GitHub
parent 7bfd118c71
commit d9f4933114
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 17 additions and 33 deletions

View file

@ -56,9 +56,7 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration
}
}
setHandler(out, new OutHandler {
override def onPull(): Unit = {}
})
setHandler(out, GraphStageLogic.EagerTerminateOutput)
}
val logic = new OutputStreamSourceLogic

View file

@ -272,9 +272,7 @@ private[stream] object ConnectionSourceStage {
private var previousWriteBufferSize = 0
// No reading until role have been decided
setHandler(bytesOut, new OutHandler {
override def onPull(): Unit = ()
})
setHandler(bytesOut, GraphStageLogic.EagerTerminateOutput)
override def preStart(): Unit = {
setKeepGoing(true)

View file

@ -215,12 +215,8 @@ private final class RestartWithBackoffFlow[In, Out](
}
override protected def backoff() = {
setHandler(in, new InHandler {
override def onPush() = ()
})
setHandler(out, new OutHandler {
override def onPull() = ()
})
setHandler(in, GraphStageLogic.EagerTerminateInput)
setHandler(out, GraphStageLogic.EagerTerminateOutput)
// We need to ensure that the other end of the sub flow is also completed, so that we don't
// receive any callbacks from it.
@ -462,9 +458,7 @@ object RestartWithBackoffFlow {
override def onDownstreamFinish(cause: Throwable): Unit = {
this.cause = OptionVal.Some(cause)
scheduleOnce("CompleteState", delay)
setHandler(in, new InHandler {
def onPush(): Unit = {}
})
setHandler(in, GraphStageLogic.EagerTerminateInput)
}
override protected def onTimer(timerKey: Any): Unit = {

View file

@ -4,11 +4,11 @@
package akka.stream.scaladsl
import scala.concurrent.duration.FiniteDuration
import akka.NotUsed
import akka.stream.stage.{ GraphStage, GraphStageLogic }
import akka.stream.{ Attributes, Inlet, RestartSettings, SinkShape }
import akka.stream.stage.{ GraphStage, InHandler }
import scala.concurrent.duration.FiniteDuration
/**
* A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails.
@ -124,9 +124,7 @@ private final class RestartWithBackoffSink[T](sinkFactory: () => Sink[T, _], res
}
override protected def backoff() = {
setHandler(in, new InHandler {
override def onPush() = ()
})
setHandler(in, GraphStageLogic.EagerTerminateInput)
}
backoff()

View file

@ -4,11 +4,11 @@
package akka.stream.scaladsl
import scala.concurrent.duration.FiniteDuration
import akka.NotUsed
import akka.stream.stage.{ GraphStage, GraphStageLogic }
import akka.stream.{ Attributes, Outlet, RestartSettings, SourceShape }
import akka.stream.stage.{ GraphStage, OutHandler }
import scala.concurrent.duration.FiniteDuration
/**
* A RestartSource wraps a [[Source]] that gets restarted when it completes or fails.
@ -203,9 +203,7 @@ private final class RestartWithBackoffSource[T](
}
override protected def backoff() = {
setHandler(out, new OutHandler {
override def onPull() = ()
})
setHandler(out, GraphStageLogic.EagerTerminateOutput)
}
backoff()

View file

@ -533,12 +533,10 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
private def cancel[T](connection: Connection, cause: Throwable): Unit =
attributes.mandatoryAttribute[Attributes.CancellationStrategy].strategy match {
case Attributes.CancellationStrategy.AfterDelay(delay, _) =>
// since the port is not actually cancelled, we install a handler to ignore upcoming elements
connection.inHandler = new InHandler {
// ignore pushs now, since the stage wanted it cancelled already
override def onPush(): Unit = ()
// do not ignore termination signals
}
// since the port is not actually cancelled, we install a handler to ignore upcoming
// ignore pushs now, since the stage wanted it cancelled already
// do not ignore termination signals
connection.inHandler = EagerTerminateInput
val callback = getAsyncCallback[(Connection, Throwable)] {
case (connection, cause) => doCancel(connection, cause)
}