From 8c0ffe3b2ab7a193ebc3a1bd13c82be1f551d7cd Mon Sep 17 00:00:00 2001 From: kerr Date: Tue, 15 Dec 2020 18:18:48 +0800 Subject: [PATCH] =str Remove outHandler of FlattenMerge (#29890) --- .../stream/impl/fusing/StreamOfStreams.scala | 141 +++++++++--------- 1 file changed, 70 insertions(+), 71 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 68d4e6970b..b9a5c99b08 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -39,97 +39,96 @@ import akka.util.ccompat.JavaConverters._ override def initialAttributes = DefaultAttributes.flattenMerge override val shape = FlowShape(in, out) - override def createLogic(enclosingAttributes: Attributes) = new GraphStageLogic(shape) { - var sources = Set.empty[SubSinkInlet[T]] - var pendingSingleSources = 0 - def activeSources = sources.size + pendingSingleSources + override def createLogic(enclosingAttributes: Attributes) = + new GraphStageLogic(shape) with OutHandler with InHandler { + var sources = Set.empty[SubSinkInlet[T]] + var pendingSingleSources = 0 + def activeSources = sources.size + pendingSingleSources - // To be able to optimize for SingleSource without materializing them the queue may hold either - // SubSinkInlet[T] or SingleSource - var queue: BufferImpl[AnyRef] = _ + // To be able to optimize for SingleSource without materializing them the queue may hold either + // SubSinkInlet[T] or SingleSource + var queue: BufferImpl[AnyRef] = _ - override def preStart(): Unit = queue = BufferImpl(breadth, enclosingAttributes) + override def preStart(): Unit = queue = BufferImpl(breadth, enclosingAttributes) - def pushOut(): Unit = { - queue.dequeue() match { - case src: SubSinkInlet[T] @unchecked => - push(out, src.grab()) - if (!src.isClosed) src.pull() - else removeSource(src) - case single: SingleSource[T] @unchecked => - push(out, single.elem) - removeSource(single) + def pushOut(): Unit = { + queue.dequeue() match { + case src: SubSinkInlet[T] @unchecked => + push(out, src.grab()) + if (!src.isClosed) src.pull() + else removeSource(src) + case single: SingleSource[T] @unchecked => + push(out, single.elem) + removeSource(single) + } } - } - setHandler(in, new InHandler { override def onPush(): Unit = { val source = grab(in) addSource(source) if (activeSources < breadth) tryPull(in) } - override def onUpstreamFinish(): Unit = if (activeSources == 0) completeStage() - }) - setHandler(out, new OutHandler { + override def onUpstreamFinish(): Unit = if (activeSources == 0) completeStage() override def onPull(): Unit = { pull(in) - setHandler(out, outHandler) - } - }) - - val outHandler = new OutHandler { - // could be unavailable due to async input having been executed before this notification - override def onPull(): Unit = if (queue.nonEmpty && isAvailable(out)) pushOut() - } - - def addSource(source: Graph[SourceShape[T], M]): Unit = { - // If it's a SingleSource or wrapped such we can push the element directly instead of materializing it. - // Have to use AnyRef because of OptionVal null value. - TraversalBuilder.getSingleSource(source.asInstanceOf[Graph[SourceShape[AnyRef], M]]) match { - case OptionVal.Some(single) => - if (isAvailable(out) && queue.isEmpty) { - push(out, single.elem.asInstanceOf[T]) - } else { - queue.enqueue(single) - pendingSingleSources += 1 + setHandler(out, new OutHandler { + override def onPull(): Unit = { + // could be unavailable due to async input having been executed before this notification + if (queue.nonEmpty && isAvailable(out)) pushOut() } - case _ => - val sinkIn = new SubSinkInlet[T]("FlattenMergeSink") - sinkIn.setHandler(new InHandler { - override def onPush(): Unit = { - if (isAvailable(out)) { - push(out, sinkIn.grab()) - sinkIn.pull() - } else { - queue.enqueue(sinkIn) - } + }) + } + + setHandlers(in, out, this) + + def addSource(source: Graph[SourceShape[T], M]): Unit = { + // If it's a SingleSource or wrapped such we can push the element directly instead of materializing it. + // Have to use AnyRef because of OptionVal null value. + TraversalBuilder.getSingleSource(source.asInstanceOf[Graph[SourceShape[AnyRef], M]]) match { + case OptionVal.Some(single) => + if (isAvailable(out) && queue.isEmpty) { + push(out, single.elem.asInstanceOf[T]) + } else { + queue.enqueue(single) + pendingSingleSources += 1 } - override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) removeSource(sinkIn) - }) - sinkIn.pull() - sources += sinkIn - val graph = Source.fromGraph(source).to(sinkIn.sink) - interpreter.subFusingMaterializer.materialize(graph, defaultAttributes = enclosingAttributes) + case _ => + val sinkIn = new SubSinkInlet[T]("FlattenMergeSink") + sinkIn.setHandler(new InHandler { + override def onPush(): Unit = { + if (isAvailable(out)) { + push(out, sinkIn.grab()) + sinkIn.pull() + } else { + queue.enqueue(sinkIn) + } + } + override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) removeSource(sinkIn) + }) + sinkIn.pull() + sources += sinkIn + val graph = Source.fromGraph(source).to(sinkIn.sink) + interpreter.subFusingMaterializer.materialize(graph, defaultAttributes = enclosingAttributes) + } } - } - def removeSource(src: AnyRef): Unit = { - val pullSuppressed = activeSources == breadth - src match { - case sub: SubSinkInlet[T] @unchecked => - sources -= sub - case _: SingleSource[_] => - pendingSingleSources -= 1 + def removeSource(src: AnyRef): Unit = { + val pullSuppressed = activeSources == breadth + src match { + case sub: SubSinkInlet[T] @unchecked => + sources -= sub + case _: SingleSource[_] => + pendingSingleSources -= 1 + } + if (pullSuppressed) tryPull(in) + if (activeSources == 0 && isClosed(in)) completeStage() } - if (pullSuppressed) tryPull(in) - if (activeSources == 0 && isClosed(in)) completeStage() + + override def postStop(): Unit = sources.foreach(_.cancel()) + } - override def postStop(): Unit = sources.foreach(_.cancel()) - - } - override def toString: String = s"FlattenMerge($breadth)" }