feat: Add emitMulti with Spliterator support (#1776)
This commit is contained in:
parent
f3a8075cc4
commit
abc18a5cea
2 changed files with 55 additions and 1 deletions
|
|
@ -121,6 +121,20 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit with S
|
|||
override def toString = "GraphStageLogicSpec.emitEmptyIterable"
|
||||
}
|
||||
|
||||
object EmitSplitIterator extends GraphStage[SourceShape[Int]] {
|
||||
val out = Outlet[Int]("out")
|
||||
override val shape = SourceShape(out)
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
|
||||
setHandler(out,
|
||||
new OutHandler {
|
||||
override def onPull(): Unit = emitMultiple(
|
||||
out,
|
||||
java.util.stream.Stream.of(1, 2, 3).spliterator(), () => emit(out, 42, () => completeStage()))
|
||||
})
|
||||
}
|
||||
override def toString = "GraphStageLogicSpec.emitEmptyIterable"
|
||||
}
|
||||
|
||||
private case class ReadNEmitN(n: Int) extends GraphStage[FlowShape[Int, Int]] {
|
||||
override val shape = FlowShape(Inlet[Int]("readN.in"), Outlet[Int]("readN.out"))
|
||||
|
||||
|
|
@ -196,6 +210,12 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit with S
|
|||
|
||||
}
|
||||
|
||||
"emit properly when using split iterator" in {
|
||||
|
||||
Source.fromGraph(EmitSplitIterator).runWith(Sink.seq).futureValue should ===(List(1, 2, 3, 42))
|
||||
|
||||
}
|
||||
|
||||
"invoke lifecycle hooks in the right order" in {
|
||||
val g = new GraphStage[FlowShape[Int, Int]] {
|
||||
val in = Inlet[Int]("in")
|
||||
|
|
|
|||
|
|
@ -21,7 +21,6 @@ import scala.annotation.tailrec
|
|||
import scala.collection.{ immutable, mutable }
|
||||
import scala.concurrent.{ Future, Promise }
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
||||
import org.apache.pekko
|
||||
import pekko.{ Done, NotUsed }
|
||||
import pekko.actor._
|
||||
|
|
@ -37,6 +36,8 @@ import pekko.stream.stage.ConcurrentAsyncCallbackState.{ NoPendingEvents, State
|
|||
import pekko.util.OptionVal
|
||||
import pekko.util.unused
|
||||
|
||||
import java.util.Spliterator
|
||||
|
||||
/**
|
||||
* Scala API: A GraphStage represents a reusable graph stream processing operator.
|
||||
*
|
||||
|
|
@ -979,6 +980,26 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
}
|
||||
} else andThen()
|
||||
|
||||
/**
|
||||
* Emit a sequence of elements through the given outlet and continue with the given thunk
|
||||
* afterwards, suspending execution if necessary.
|
||||
* This action replaces the [[OutHandler]] for the given outlet if suspension
|
||||
* is needed and reinstalls the current handler upon receiving an `onPull()`
|
||||
* signal (before invoking the `andThen` function).
|
||||
*/
|
||||
final protected def emitMultiple[T](out: Outlet[T], elems: Spliterator[T], andThen: () => Unit): Unit = {
|
||||
val iter = new EmittingSpliterator[T](out, elems, getNonEmittingHandler(out), andThen)
|
||||
if (isAvailable(out)) {
|
||||
if (!iter.tryPush()) {
|
||||
andThen()
|
||||
} else {
|
||||
setOrAddEmitting(out, iter)
|
||||
}
|
||||
} else {
|
||||
setOrAddEmitting(out, iter)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Emit a sequence of elements through the given outlet, suspending execution if necessary.
|
||||
* This action replaces the [[OutHandler]] for the given outlet if suspension
|
||||
|
|
@ -1118,6 +1139,19 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
}
|
||||
}
|
||||
|
||||
private final class EmittingSpliterator[T](_out: Outlet[T], elems: Spliterator[T], _previous: OutHandler,
|
||||
_andThen: () => Unit)
|
||||
extends Emitting[T](_out, _previous, _andThen) with java.util.function.Consumer[T] {
|
||||
|
||||
override def onPull(): Unit = if (!elems.tryAdvance(this)) {
|
||||
followUp()
|
||||
}
|
||||
|
||||
def tryPush(): Boolean = elems.tryAdvance(this)
|
||||
|
||||
override def accept(elem: T): Unit = push(out, elem)
|
||||
}
|
||||
|
||||
private class EmittingCompletion[T](_out: Outlet[T], _previous: OutHandler)
|
||||
extends Emitting[T](_out, _previous, DoNothing) {
|
||||
override def onPull(): Unit = complete(out)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue