From 21048496582d351f7bd1e061f6712bb8afa8ae38 Mon Sep 17 00:00:00 2001 From: contrun Date: Wed, 6 May 2020 16:33:15 +0800 Subject: [PATCH] add Source.never (#29008) * add Source.never * make Source.never single instance, add DefaultAttributes --- .../paradox/stream/operators/Source/empty.md | 2 ++ .../paradox/stream/operators/Source/never.md | 27 +++++++++++++++ .../main/paradox/stream/operators/index.md | 2 ++ .../stream/scaladsl/NeverSourceSpec.scala | 33 +++++++++++++++++++ .../main/scala/akka/stream/impl/Stages.scala | 1 + .../scala/akka/stream/javadsl/Source.scala | 7 ++++ .../scala/akka/stream/scaladsl/Source.scala | 8 +++++ 7 files changed, 80 insertions(+) create mode 100644 akka-docs/src/main/paradox/stream/operators/Source/never.md create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/NeverSourceSpec.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source/empty.md b/akka-docs/src/main/paradox/stream/operators/Source/empty.md index 0cddd1ef71..59f3c643cb 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/empty.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/empty.md @@ -4,6 +4,8 @@ Complete right away without ever emitting any elements. @ref[Source operators](../index.md#source-operators) +@ref:[`Source.never`](never.md) a source which emits nothing and never completes. + ## Signature @apidoc[Source.empty](Source$) { scala="#empty[T]:akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#empty()" java="#empty(java.lang.Class)" } diff --git a/akka-docs/src/main/paradox/stream/operators/Source/never.md b/akka-docs/src/main/paradox/stream/operators/Source/never.md new file mode 100644 index 0000000000..4b07141417 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source/never.md @@ -0,0 +1,27 @@ +# never + +Never emit any elements, never complete and never fail. + +@ref[Source operators](../index.md#source-operators) + +@ref:[`Source.empty`](empty.md), a source which emits nothing and completes immediately. + +## Signature + +@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #never } + +@@@ + +## Description + +Create a source which never emits any elements, never completes and never failes. Useful for tests. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** never + +**completes** never + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 1153e85ccd..649b321b9d 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -36,6 +36,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] |Source|@ref[lazySingle](Source/lazySingle.md)|Defers creation of a single element source until there is demand.| |Source|@ref[lazySource](Source/lazySource.md)|Defers creation and materialization of a `Source` until there is demand.| |Source|@ref[maybe](Source/maybe.md)|Create a source that emits once the materialized @scala[`Promise`] @java[`CompletableFuture`] is completed with a value.| +|Source|@ref[never](Source/never.md)|Never emit any elements, never complete and never fail.| |Source|@ref[queue](Source/queue.md)|Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source. | |Source|@ref[range](Source/range.md)|Emit each integer in a range, with an option to take bigger steps than 1.| |Source|@ref[repeat](Source/repeat.md)|Stream a single object repeatedly.| @@ -484,6 +485,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [mergePrioritized](Source-or-Flow/mergePrioritized.md) * [mergeSorted](Source-or-Flow/mergeSorted.md) * [monitor](Source-or-Flow/monitor.md) +* [never](Source/never.md) * [onComplete](Sink/onComplete.md) * [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md) * [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/NeverSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/NeverSourceSpec.scala new file mode 100644 index 0000000000..f5235de0bb --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/NeverSourceSpec.scala @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2014-2020 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.stream.testkit.scaladsl.StreamTestKit._ +import akka.stream.testkit.{ StreamSpec, TestSubscriber } +import akka.testkit.DefaultTimeout + +import scala.concurrent.duration._ + +class NeverSourceSpec extends StreamSpec with DefaultTimeout { + + "The Never Source" must { + + "never completes" in assertAllStagesStopped { + val neverSource = Source.never[Int] + val pubSink = Sink.asPublisher[Int](false) + + val neverPub = neverSource.toMat(pubSink)(Keep.right).run() + + val c = TestSubscriber.manualProbe[Int]() + neverPub.subscribe(c) + val subs = c.expectSubscription() + + subs.request(1) + c.expectNoMessage(300.millis) + + subs.cancel() + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 3dd5239b4c..1cbfc50e23 100755 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -104,6 +104,7 @@ import akka.stream.Attributes._ val singleSource = name("singleSource") val emptySource = name("emptySource") val maybeSource = name("MaybeSource") + val neverSource = name("neverSource") val failedSource = name("failedSource") val concatSource = name("concatSource") val concatMatSource = name("concatMatSource") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 7356dd0acc..770f33fd5b 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -304,6 +304,13 @@ object Source { def future[T](futureElement: Future[T]): Source[T, NotUsed] = scaladsl.Source.future(futureElement).asJava + /** + * Never emits any elements, never completes and never fails. + * This stream could be useful in tests. + */ + def never[T]: Source[T, NotUsed] = + scaladsl.Source.never.asJava + /** * Emits a single value when the given `CompletionStage` is successfully completed and then completes the stream. * If the `CompletionStage` is completed with a failure the stream is failed. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 18bb3f3cd3..8270764fc3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -512,6 +512,14 @@ object Source { def future[T](futureElement: Future[T]): Source[T, NotUsed] = fromGraph(new FutureSource[T](futureElement)) + /** + * Never emits any elements, never completes and never fails. + * This stream could be useful in tests. + */ + def never[T]: Source[T, NotUsed] = _never + private[this] val _never: Source[Nothing, NotUsed] = + future(Future.never).withAttributes(DefaultAttributes.neverSource) + /** * Emits a single value when the given `CompletionStage` is successfully completed and then completes the stream. * If the `CompletionStage` is completed with a failure the stream is failed.