diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes index c9b81f0149..a4007bd8de 100644 --- a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes +++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes @@ -200,4 +200,5 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.Acto ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.ActorMaterializer.isShutdown") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.ActorMaterializer.system") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.ActorMaterializer.this") - +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.SetupStage") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.SetupStage$") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/SetupStage.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/SetupStage.scala index ac47aa4afc..966fd14d56 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/SetupStage.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/SetupStage.scala @@ -46,33 +46,43 @@ import pekko.stream.stage.OutHandler } @scala.annotation.nowarn("msg=inferred structural type") - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ + private def createStageLogic(matPromise: Promise[M]) = + new GraphStageLogic(shape) with InHandler with OutHandler { + private val subInlet = new SubSinkInlet[U]("SetupFlowStage") + private val subOutlet = new SubSourceOutlet[T]("SetupFlowStage") + private val subletInOutHandler = new InHandler with OutHandler { + override def onPush(): Unit = push(out, subInlet.grab()) + override def onUpstreamFinish(): Unit = complete(out) + override def onUpstreamFailure(ex: Throwable): Unit = fail(out, ex) + override def onPull(): Unit = pull(in) + override def onDownstreamFinish(cause: Throwable): Unit = cancel(in, cause) + } + subInlet.setHandler(subletInOutHandler) + subOutlet.setHandler(subletInOutHandler) - val subInlet = new SubSinkInlet[U]("SetupFlowStage") - val subOutlet = new SubSourceOutlet[T]("SetupFlowStage") + override def onPush(): Unit = subOutlet.push(grab(in)) + override def onUpstreamFinish(): Unit = subOutlet.complete() + override def onUpstreamFailure(ex: Throwable): Unit = subOutlet.fail(ex) + override def onPull(): Unit = subInlet.pull() + override def onDownstreamFinish(cause: Throwable): Unit = subInlet.cancel(cause) - subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet)) - subOutlet.setHandler(delegateToInlet(() => pull(in), cause => cancel(in, cause))) + setHandlers(in, out, this) - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - setHandler(out, delegateToSubInlet(subInlet)) + override def preStart(): Unit = { + try { + val flow = factory(materializer, attributes) - override def preStart(): Unit = { - try { - val flow = factory(materializer, attributes) - - val mat = subFusingMaterializer.materialize( - Source.fromGraph(subOutlet.source).viaMat(flow)(Keep.right).to(Sink.fromGraph(subInlet.sink)), - attributes) - matPromise.success(mat) - } catch { - case NonFatal(ex) => - matPromise.failure(ex) - throw ex + val mat = subFusingMaterializer.materialize( + Source.fromGraph(subOutlet.source).viaMat(flow)(Keep.right).to(Sink.fromGraph(subInlet.sink)), + attributes) + matPromise.success(mat) + } catch { + case NonFatal(ex) => + matPromise.failure(ex) + throw ex + } } } - } } /** Internal Api */ @@ -90,62 +100,29 @@ import pekko.stream.stage.OutHandler } @scala.annotation.nowarn("msg=inferred structural type") - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ + private def createStageLogic(matPromise: Promise[M]) = + new GraphStageLogic(shape) with OutHandler with InHandler { + private val subInlet = new SubSinkInlet[T]("SetupSourceStage") + override def onPush(): Unit = push(out, subInlet.grab()) + override def onUpstreamFinish(): Unit = complete(out) + override def onUpstreamFailure(ex: Throwable): Unit = fail(out, ex) + override def onPull(): Unit = subInlet.pull() + override def onDownstreamFinish(cause: Throwable): Unit = subInlet.cancel(cause) - val subInlet = new SubSinkInlet[T]("SetupSourceStage") - subInlet.setHandler(delegateToOutlet(push(out, _: T), () => complete(out), fail(out, _), subInlet)) - setHandler(out, delegateToSubInlet(subInlet)) + subInlet.setHandler(this) + setHandler(out, this) - override def preStart(): Unit = { - try { - val source = factory(materializer, attributes) + override def preStart(): Unit = { + try { + val source = factory(materializer, attributes) - val mat = subFusingMaterializer.materialize(source.to(Sink.fromGraph(subInlet.sink)), attributes) - matPromise.success(mat) - } catch { - case NonFatal(ex) => - matPromise.failure(ex) - throw ex + val mat = subFusingMaterializer.materialize(source.to(Sink.fromGraph(subInlet.sink)), attributes) + matPromise.success(mat) + } catch { + case NonFatal(ex) => + matPromise.failure(ex) + throw ex + } } } - } -} - -private object SetupStage { - def delegateToSubOutlet[T](grab: () => T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler { - override def onPush(): Unit = - subOutlet.push(grab()) - override def onUpstreamFinish(): Unit = - subOutlet.complete() - override def onUpstreamFailure(ex: Throwable): Unit = - subOutlet.fail(ex) - } - - def delegateToOutlet[T]( - push: T => Unit, - complete: () => Unit, - fail: Throwable => Unit, - subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler { - override def onPush(): Unit = - push(subInlet.grab()) - override def onUpstreamFinish(): Unit = - complete() - override def onUpstreamFailure(ex: Throwable): Unit = - fail(ex) - } - - def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler { - override def onPull(): Unit = - subInlet.pull() - override def onDownstreamFinish(cause: Throwable): Unit = - subInlet.cancel(cause) - } - - def delegateToInlet(pull: () => Unit, cancel: (Throwable) => Unit) = new OutHandler { - override def onPull(): Unit = - pull() - override def onDownstreamFinish(cause: Throwable): Unit = - cancel(cause) - } }