From 0eed6a128b28e8aaf866639397a2c5dfedbd6aac Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sun, 17 Dec 2023 15:47:32 +0800 Subject: [PATCH] =str Fold InHandler and OutHandler for operator Scan. --- .../apache/pekko/stream/impl/fusing/Ops.scala | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index 2b2eb94705..5c9f45689e 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -424,17 +424,10 @@ private[stream] object Collect { import shape.{ in, out } // Initial behavior makes sure that the zero gets flushed if upstream is empty - setHandler(out, - new OutHandler { - override def onPull(): Unit = { - push(out, aggregator) - setHandlers(in, out, self) - } - }) - - setHandler( + setHandlers( in, - new InHandler { + out, + new InHandler with OutHandler { override def onPush(): Unit = () override def onUpstreamFinish(): Unit = @@ -445,6 +438,11 @@ private[stream] object Collect { completeStage() } }) + + override def onPull(): Unit = { + push(out, aggregator) + setHandlers(in, out, self) + } }) override def onPull(): Unit = pull(in)