chore: Fuse in and out handler for setupStage (#2259)

This commit is contained in:
He-Pin(kerr) 2025-09-22 17:16:02 +08:00 committed by GitHub
parent 878d219f1f
commit 750bf235f1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 53 additions and 75 deletions

View file

@ -200,4 +200,5 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.Acto
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.ActorMaterializer.isShutdown")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.ActorMaterializer.system")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.ActorMaterializer.this")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.SetupStage")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.SetupStage$")

View file

@ -46,33 +46,43 @@ import pekko.stream.stage.OutHandler
}
@scala.annotation.nowarn("msg=inferred structural type")
private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) {
import SetupStage._
private def createStageLogic(matPromise: Promise[M]) =
new GraphStageLogic(shape) with InHandler with OutHandler {
private val subInlet = new SubSinkInlet[U]("SetupFlowStage")
private val subOutlet = new SubSourceOutlet[T]("SetupFlowStage")
private val subletInOutHandler = new InHandler with OutHandler {
override def onPush(): Unit = push(out, subInlet.grab())
override def onUpstreamFinish(): Unit = complete(out)
override def onUpstreamFailure(ex: Throwable): Unit = fail(out, ex)
override def onPull(): Unit = pull(in)
override def onDownstreamFinish(cause: Throwable): Unit = cancel(in, cause)
}
subInlet.setHandler(subletInOutHandler)
subOutlet.setHandler(subletInOutHandler)
val subInlet = new SubSinkInlet[U]("SetupFlowStage")
val subOutlet = new SubSourceOutlet[T]("SetupFlowStage")
override def onPush(): Unit = subOutlet.push(grab(in))
override def onUpstreamFinish(): Unit = subOutlet.complete()
override def onUpstreamFailure(ex: Throwable): Unit = subOutlet.fail(ex)
override def onPull(): Unit = subInlet.pull()
override def onDownstreamFinish(cause: Throwable): Unit = subInlet.cancel(cause)
subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet))
subOutlet.setHandler(delegateToInlet(() => pull(in), cause => cancel(in, cause)))
setHandlers(in, out, this)
setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet))
setHandler(out, delegateToSubInlet(subInlet))
override def preStart(): Unit = {
try {
val flow = factory(materializer, attributes)
override def preStart(): Unit = {
try {
val flow = factory(materializer, attributes)
val mat = subFusingMaterializer.materialize(
Source.fromGraph(subOutlet.source).viaMat(flow)(Keep.right).to(Sink.fromGraph(subInlet.sink)),
attributes)
matPromise.success(mat)
} catch {
case NonFatal(ex) =>
matPromise.failure(ex)
throw ex
val mat = subFusingMaterializer.materialize(
Source.fromGraph(subOutlet.source).viaMat(flow)(Keep.right).to(Sink.fromGraph(subInlet.sink)),
attributes)
matPromise.success(mat)
} catch {
case NonFatal(ex) =>
matPromise.failure(ex)
throw ex
}
}
}
}
}
/** Internal Api */
@ -90,62 +100,29 @@ import pekko.stream.stage.OutHandler
}
@scala.annotation.nowarn("msg=inferred structural type")
private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) {
import SetupStage._
private def createStageLogic(matPromise: Promise[M]) =
new GraphStageLogic(shape) with OutHandler with InHandler {
private val subInlet = new SubSinkInlet[T]("SetupSourceStage")
override def onPush(): Unit = push(out, subInlet.grab())
override def onUpstreamFinish(): Unit = complete(out)
override def onUpstreamFailure(ex: Throwable): Unit = fail(out, ex)
override def onPull(): Unit = subInlet.pull()
override def onDownstreamFinish(cause: Throwable): Unit = subInlet.cancel(cause)
val subInlet = new SubSinkInlet[T]("SetupSourceStage")
subInlet.setHandler(delegateToOutlet(push(out, _: T), () => complete(out), fail(out, _), subInlet))
setHandler(out, delegateToSubInlet(subInlet))
subInlet.setHandler(this)
setHandler(out, this)
override def preStart(): Unit = {
try {
val source = factory(materializer, attributes)
override def preStart(): Unit = {
try {
val source = factory(materializer, attributes)
val mat = subFusingMaterializer.materialize(source.to(Sink.fromGraph(subInlet.sink)), attributes)
matPromise.success(mat)
} catch {
case NonFatal(ex) =>
matPromise.failure(ex)
throw ex
val mat = subFusingMaterializer.materialize(source.to(Sink.fromGraph(subInlet.sink)), attributes)
matPromise.success(mat)
} catch {
case NonFatal(ex) =>
matPromise.failure(ex)
throw ex
}
}
}
}
}
private object SetupStage {
def delegateToSubOutlet[T](grab: () => T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler {
override def onPush(): Unit =
subOutlet.push(grab())
override def onUpstreamFinish(): Unit =
subOutlet.complete()
override def onUpstreamFailure(ex: Throwable): Unit =
subOutlet.fail(ex)
}
def delegateToOutlet[T](
push: T => Unit,
complete: () => Unit,
fail: Throwable => Unit,
subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler {
override def onPush(): Unit =
push(subInlet.grab())
override def onUpstreamFinish(): Unit =
complete()
override def onUpstreamFailure(ex: Throwable): Unit =
fail(ex)
}
def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler {
override def onPull(): Unit =
subInlet.pull()
override def onDownstreamFinish(cause: Throwable): Unit =
subInlet.cancel(cause)
}
def delegateToInlet(pull: () => Unit, cancel: (Throwable) => Unit) = new OutHandler {
override def onPull(): Unit =
pull()
override def onDownstreamFinish(cause: Throwable): Unit =
cancel(cause)
}
}