=str Fold InHandler and OutHandler for operator Scan.
This commit is contained in:
parent
a7a49bd24e
commit
0eed6a128b
1 changed files with 8 additions and 10 deletions
|
|
@ -424,17 +424,10 @@ private[stream] object Collect {
|
|||
import shape.{ in, out }
|
||||
|
||||
// Initial behavior makes sure that the zero gets flushed if upstream is empty
|
||||
setHandler(out,
|
||||
new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
push(out, aggregator)
|
||||
setHandlers(in, out, self)
|
||||
}
|
||||
})
|
||||
|
||||
setHandler(
|
||||
setHandlers(
|
||||
in,
|
||||
new InHandler {
|
||||
out,
|
||||
new InHandler with OutHandler {
|
||||
override def onPush(): Unit = ()
|
||||
|
||||
override def onUpstreamFinish(): Unit =
|
||||
|
|
@ -445,6 +438,11 @@ private[stream] object Collect {
|
|||
completeStage()
|
||||
}
|
||||
})
|
||||
|
||||
override def onPull(): Unit = {
|
||||
push(out, aggregator)
|
||||
setHandlers(in, out, self)
|
||||
}
|
||||
})
|
||||
|
||||
override def onPull(): Unit = pull(in)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue