diff --git a/akka-docs/src/main/paradox/stream/operators/Source/empty.md b/akka-docs/src/main/paradox/stream/operators/Source/empty.md
index 0cddd1ef71..59f3c643cb 100644
--- a/akka-docs/src/main/paradox/stream/operators/Source/empty.md
+++ b/akka-docs/src/main/paradox/stream/operators/Source/empty.md
@@ -4,6 +4,8 @@ Complete right away without ever emitting any elements.
@ref[Source operators](../index.md#source-operators)
+@ref:[`Source.never`](never.md) a source which emits nothing and never completes.
+
## Signature
@apidoc[Source.empty](Source$) { scala="#empty[T]:akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#empty()" java="#empty(java.lang.Class)" }
diff --git a/akka-docs/src/main/paradox/stream/operators/Source/never.md b/akka-docs/src/main/paradox/stream/operators/Source/never.md
new file mode 100644
index 0000000000..4b07141417
--- /dev/null
+++ b/akka-docs/src/main/paradox/stream/operators/Source/never.md
@@ -0,0 +1,27 @@
+# never
+
+Never emit any elements, never complete and never fail.
+
+@ref[Source operators](../index.md#source-operators)
+
+@ref:[`Source.empty`](empty.md), a source which emits nothing and completes immediately.
+
+## Signature
+
+@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #never }
+
+@@@
+
+## Description
+
+Create a source which never emits any elements, never completes and never failes. Useful for tests.
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** never
+
+**completes** never
+
+@@@
diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md
index 1153e85ccd..649b321b9d 100644
--- a/akka-docs/src/main/paradox/stream/operators/index.md
+++ b/akka-docs/src/main/paradox/stream/operators/index.md
@@ -36,6 +36,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
|Source|@ref[lazySingle](Source/lazySingle.md)|Defers creation of a single element source until there is demand.|
|Source|@ref[lazySource](Source/lazySource.md)|Defers creation and materialization of a `Source` until there is demand.|
|Source|@ref[maybe](Source/maybe.md)|Create a source that emits once the materialized @scala[`Promise`] @java[`CompletableFuture`] is completed with a value.|
+|Source|@ref[never](Source/never.md)|Never emit any elements, never complete and never fail.|
|Source|@ref[queue](Source/queue.md)|Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source. |
|Source|@ref[range](Source/range.md)|Emit each integer in a range, with an option to take bigger steps than 1.|
|Source|@ref[repeat](Source/repeat.md)|Stream a single object repeatedly.|
@@ -484,6 +485,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [mergePrioritized](Source-or-Flow/mergePrioritized.md)
* [mergeSorted](Source-or-Flow/mergeSorted.md)
* [monitor](Source-or-Flow/monitor.md)
+* [never](Source/never.md)
* [onComplete](Sink/onComplete.md)
* [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)
* [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/NeverSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/NeverSourceSpec.scala
new file mode 100644
index 0000000000..f5235de0bb
--- /dev/null
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/NeverSourceSpec.scala
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2014-2020 Lightbend Inc.
+ */
+
+package akka.stream.scaladsl
+
+import akka.stream.testkit.scaladsl.StreamTestKit._
+import akka.stream.testkit.{ StreamSpec, TestSubscriber }
+import akka.testkit.DefaultTimeout
+
+import scala.concurrent.duration._
+
+class NeverSourceSpec extends StreamSpec with DefaultTimeout {
+
+ "The Never Source" must {
+
+ "never completes" in assertAllStagesStopped {
+ val neverSource = Source.never[Int]
+ val pubSink = Sink.asPublisher[Int](false)
+
+ val neverPub = neverSource.toMat(pubSink)(Keep.right).run()
+
+ val c = TestSubscriber.manualProbe[Int]()
+ neverPub.subscribe(c)
+ val subs = c.expectSubscription()
+
+ subs.request(1)
+ c.expectNoMessage(300.millis)
+
+ subs.cancel()
+ }
+ }
+}
diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
index 3dd5239b4c..1cbfc50e23 100755
--- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
@@ -104,6 +104,7 @@ import akka.stream.Attributes._
val singleSource = name("singleSource")
val emptySource = name("emptySource")
val maybeSource = name("MaybeSource")
+ val neverSource = name("neverSource")
val failedSource = name("failedSource")
val concatSource = name("concatSource")
val concatMatSource = name("concatMatSource")
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
index 7356dd0acc..770f33fd5b 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
@@ -304,6 +304,13 @@ object Source {
def future[T](futureElement: Future[T]): Source[T, NotUsed] =
scaladsl.Source.future(futureElement).asJava
+ /**
+ * Never emits any elements, never completes and never fails.
+ * This stream could be useful in tests.
+ */
+ def never[T]: Source[T, NotUsed] =
+ scaladsl.Source.never.asJava
+
/**
* Emits a single value when the given `CompletionStage` is successfully completed and then completes the stream.
* If the `CompletionStage` is completed with a failure the stream is failed.
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
index 18bb3f3cd3..8270764fc3 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
@@ -512,6 +512,14 @@ object Source {
def future[T](futureElement: Future[T]): Source[T, NotUsed] =
fromGraph(new FutureSource[T](futureElement))
+ /**
+ * Never emits any elements, never completes and never fails.
+ * This stream could be useful in tests.
+ */
+ def never[T]: Source[T, NotUsed] = _never
+ private[this] val _never: Source[Nothing, NotUsed] =
+ future(Future.never).withAttributes(DefaultAttributes.neverSource)
+
/**
* Emits a single value when the given `CompletionStage` is successfully completed and then completes the stream.
* If the `CompletionStage` is completed with a failure the stream is failed.