diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala index ea8ad1e36c..036976b238 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala @@ -457,6 +457,21 @@ import pekko.stream.stage._ } } + @InternalApi + private[pekko] object NeverSource extends GraphStage[SourceShape[Nothing]] { + private val out = Outlet[Nothing]("NeverSource.out") + val shape: SourceShape[Nothing] = SourceShape(out) + + override def initialAttributes: Attributes = DefaultAttributes.neverSource + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic with OutHandler = + new GraphStageLogic(shape) with OutHandler { + override def onPull(): Unit = () + + setHandler(out, this) + } + } + @InternalApi private[pekko] object NeverSink extends GraphStageWithMaterializedValue[SinkShape[Any], Future[Done]] { private val in = Inlet[Any]("NeverSink.in") diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 593547d958..1d935d990a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -293,7 +293,7 @@ object Source { */ def fromJavaStream[T, S <: java.util.stream.BaseStream[T, S]]( stream: () => java.util.stream.BaseStream[T, S]): Source[T, NotUsed] = - StreamConverters.fromJavaStream(stream); + StreamConverters.fromJavaStream(stream) /** * Creates [[Source]] that will continually produce given elements in specified order. @@ -516,8 +516,7 @@ object Source { * This stream could be useful in tests. */ def never[T]: Source[T, NotUsed] = _never - private[this] val _never: Source[Nothing, NotUsed] = - future(Future.never).withAttributes(DefaultAttributes.neverSource) + private[this] val _never: Source[Nothing, NotUsed] = fromGraph(GraphStages.NeverSource) /** * Emits a single value when the given `CompletionStage` is successfully completed and then completes the stream.