=str Remove outHandler of FlattenMerge (#29890)
This commit is contained in:
parent
8bb74b592d
commit
8c0ffe3b2a
1 changed files with 70 additions and 71 deletions
|
|
@ -39,97 +39,96 @@ import akka.util.ccompat.JavaConverters._
|
|||
override def initialAttributes = DefaultAttributes.flattenMerge
|
||||
override val shape = FlowShape(in, out)
|
||||
|
||||
override def createLogic(enclosingAttributes: Attributes) = new GraphStageLogic(shape) {
|
||||
var sources = Set.empty[SubSinkInlet[T]]
|
||||
var pendingSingleSources = 0
|
||||
def activeSources = sources.size + pendingSingleSources
|
||||
override def createLogic(enclosingAttributes: Attributes) =
|
||||
new GraphStageLogic(shape) with OutHandler with InHandler {
|
||||
var sources = Set.empty[SubSinkInlet[T]]
|
||||
var pendingSingleSources = 0
|
||||
def activeSources = sources.size + pendingSingleSources
|
||||
|
||||
// To be able to optimize for SingleSource without materializing them the queue may hold either
|
||||
// SubSinkInlet[T] or SingleSource
|
||||
var queue: BufferImpl[AnyRef] = _
|
||||
// To be able to optimize for SingleSource without materializing them the queue may hold either
|
||||
// SubSinkInlet[T] or SingleSource
|
||||
var queue: BufferImpl[AnyRef] = _
|
||||
|
||||
override def preStart(): Unit = queue = BufferImpl(breadth, enclosingAttributes)
|
||||
override def preStart(): Unit = queue = BufferImpl(breadth, enclosingAttributes)
|
||||
|
||||
def pushOut(): Unit = {
|
||||
queue.dequeue() match {
|
||||
case src: SubSinkInlet[T] @unchecked =>
|
||||
push(out, src.grab())
|
||||
if (!src.isClosed) src.pull()
|
||||
else removeSource(src)
|
||||
case single: SingleSource[T] @unchecked =>
|
||||
push(out, single.elem)
|
||||
removeSource(single)
|
||||
def pushOut(): Unit = {
|
||||
queue.dequeue() match {
|
||||
case src: SubSinkInlet[T] @unchecked =>
|
||||
push(out, src.grab())
|
||||
if (!src.isClosed) src.pull()
|
||||
else removeSource(src)
|
||||
case single: SingleSource[T] @unchecked =>
|
||||
push(out, single.elem)
|
||||
removeSource(single)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
val source = grab(in)
|
||||
addSource(source)
|
||||
if (activeSources < breadth) tryPull(in)
|
||||
}
|
||||
override def onUpstreamFinish(): Unit = if (activeSources == 0) completeStage()
|
||||
})
|
||||
|
||||
setHandler(out, new OutHandler {
|
||||
override def onUpstreamFinish(): Unit = if (activeSources == 0) completeStage()
|
||||
override def onPull(): Unit = {
|
||||
pull(in)
|
||||
setHandler(out, outHandler)
|
||||
}
|
||||
})
|
||||
|
||||
val outHandler = new OutHandler {
|
||||
// could be unavailable due to async input having been executed before this notification
|
||||
override def onPull(): Unit = if (queue.nonEmpty && isAvailable(out)) pushOut()
|
||||
}
|
||||
|
||||
def addSource(source: Graph[SourceShape[T], M]): Unit = {
|
||||
// If it's a SingleSource or wrapped such we can push the element directly instead of materializing it.
|
||||
// Have to use AnyRef because of OptionVal null value.
|
||||
TraversalBuilder.getSingleSource(source.asInstanceOf[Graph[SourceShape[AnyRef], M]]) match {
|
||||
case OptionVal.Some(single) =>
|
||||
if (isAvailable(out) && queue.isEmpty) {
|
||||
push(out, single.elem.asInstanceOf[T])
|
||||
} else {
|
||||
queue.enqueue(single)
|
||||
pendingSingleSources += 1
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
// could be unavailable due to async input having been executed before this notification
|
||||
if (queue.nonEmpty && isAvailable(out)) pushOut()
|
||||
}
|
||||
case _ =>
|
||||
val sinkIn = new SubSinkInlet[T]("FlattenMergeSink")
|
||||
sinkIn.setHandler(new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
if (isAvailable(out)) {
|
||||
push(out, sinkIn.grab())
|
||||
sinkIn.pull()
|
||||
} else {
|
||||
queue.enqueue(sinkIn)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
setHandlers(in, out, this)
|
||||
|
||||
def addSource(source: Graph[SourceShape[T], M]): Unit = {
|
||||
// If it's a SingleSource or wrapped such we can push the element directly instead of materializing it.
|
||||
// Have to use AnyRef because of OptionVal null value.
|
||||
TraversalBuilder.getSingleSource(source.asInstanceOf[Graph[SourceShape[AnyRef], M]]) match {
|
||||
case OptionVal.Some(single) =>
|
||||
if (isAvailable(out) && queue.isEmpty) {
|
||||
push(out, single.elem.asInstanceOf[T])
|
||||
} else {
|
||||
queue.enqueue(single)
|
||||
pendingSingleSources += 1
|
||||
}
|
||||
override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) removeSource(sinkIn)
|
||||
})
|
||||
sinkIn.pull()
|
||||
sources += sinkIn
|
||||
val graph = Source.fromGraph(source).to(sinkIn.sink)
|
||||
interpreter.subFusingMaterializer.materialize(graph, defaultAttributes = enclosingAttributes)
|
||||
case _ =>
|
||||
val sinkIn = new SubSinkInlet[T]("FlattenMergeSink")
|
||||
sinkIn.setHandler(new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
if (isAvailable(out)) {
|
||||
push(out, sinkIn.grab())
|
||||
sinkIn.pull()
|
||||
} else {
|
||||
queue.enqueue(sinkIn)
|
||||
}
|
||||
}
|
||||
override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) removeSource(sinkIn)
|
||||
})
|
||||
sinkIn.pull()
|
||||
sources += sinkIn
|
||||
val graph = Source.fromGraph(source).to(sinkIn.sink)
|
||||
interpreter.subFusingMaterializer.materialize(graph, defaultAttributes = enclosingAttributes)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def removeSource(src: AnyRef): Unit = {
|
||||
val pullSuppressed = activeSources == breadth
|
||||
src match {
|
||||
case sub: SubSinkInlet[T] @unchecked =>
|
||||
sources -= sub
|
||||
case _: SingleSource[_] =>
|
||||
pendingSingleSources -= 1
|
||||
def removeSource(src: AnyRef): Unit = {
|
||||
val pullSuppressed = activeSources == breadth
|
||||
src match {
|
||||
case sub: SubSinkInlet[T] @unchecked =>
|
||||
sources -= sub
|
||||
case _: SingleSource[_] =>
|
||||
pendingSingleSources -= 1
|
||||
}
|
||||
if (pullSuppressed) tryPull(in)
|
||||
if (activeSources == 0 && isClosed(in)) completeStage()
|
||||
}
|
||||
if (pullSuppressed) tryPull(in)
|
||||
if (activeSources == 0 && isClosed(in)) completeStage()
|
||||
|
||||
override def postStop(): Unit = sources.foreach(_.cancel())
|
||||
|
||||
}
|
||||
|
||||
override def postStop(): Unit = sources.foreach(_.cancel())
|
||||
|
||||
}
|
||||
|
||||
override def toString: String = s"FlattenMerge($breadth)"
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue