=str Implement Source.never as a dedicated GraphStage.

This commit is contained in:
He-Pin 2023-04-01 17:44:01 +08:00 committed by kerr
parent d6ddba13d3
commit f24e3887f3
2 changed files with 17 additions and 3 deletions

View file

@ -457,6 +457,21 @@ import pekko.stream.stage._
}
}
@InternalApi
private[pekko] object NeverSource extends GraphStage[SourceShape[Nothing]] {
private val out = Outlet[Nothing]("NeverSource.out")
val shape: SourceShape[Nothing] = SourceShape(out)
override def initialAttributes: Attributes = DefaultAttributes.neverSource
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic with OutHandler =
new GraphStageLogic(shape) with OutHandler {
override def onPull(): Unit = ()
setHandler(out, this)
}
}
@InternalApi
private[pekko] object NeverSink extends GraphStageWithMaterializedValue[SinkShape[Any], Future[Done]] {
private val in = Inlet[Any]("NeverSink.in")

View file

@ -293,7 +293,7 @@ object Source {
*/
def fromJavaStream[T, S <: java.util.stream.BaseStream[T, S]](
stream: () => java.util.stream.BaseStream[T, S]): Source[T, NotUsed] =
StreamConverters.fromJavaStream(stream);
StreamConverters.fromJavaStream(stream)
/**
* Creates [[Source]] that will continually produce given elements in specified order.
@ -516,8 +516,7 @@ object Source {
* This stream could be useful in tests.
*/
def never[T]: Source[T, NotUsed] = _never
private[this] val _never: Source[Nothing, NotUsed] =
future(Future.never).withAttributes(DefaultAttributes.neverSource)
private[this] val _never: Source[Nothing, NotUsed] = fromGraph(GraphStages.NeverSource)
/**
* Emits a single value when the given `CompletionStage` is successfully completed and then completes the stream.