add Source.never (#29008)

* add Source.never

* make Source.never single instance, add DefaultAttributes
This commit is contained in:
contrun 2020-05-06 16:33:15 +08:00 committed by GitHub
parent b2509efdb0
commit 2104849658
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 80 additions and 0 deletions

View file

@ -4,6 +4,8 @@ Complete right away without ever emitting any elements.
@ref[Source operators](../index.md#source-operators)
@ref:[`Source.never`](never.md) a source which emits nothing and never completes.
## Signature
@apidoc[Source.empty](Source$) { scala="#empty[T]:akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#empty()" java="#empty(java.lang.Class)" }

View file

@ -0,0 +1,27 @@
# never
Never emit any elements, never complete and never fail.
@ref[Source operators](../index.md#source-operators)
@ref:[`Source.empty`](empty.md), a source which emits nothing and completes immediately.
## Signature
@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #never }
@@@
## Description
Create a source which never emits any elements, never completes and never failes. Useful for tests.
## Reactive Streams semantics
@@@div { .callout }
**emits** never
**completes** never
@@@

View file

@ -36,6 +36,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
|Source|<a name="lazysingle"></a>@ref[lazySingle](Source/lazySingle.md)|Defers creation of a single element source until there is demand.|
|Source|<a name="lazysource"></a>@ref[lazySource](Source/lazySource.md)|Defers creation and materialization of a `Source` until there is demand.|
|Source|<a name="maybe"></a>@ref[maybe](Source/maybe.md)|Create a source that emits once the materialized @scala[`Promise`] @java[`CompletableFuture`] is completed with a value.|
|Source|<a name="never"></a>@ref[never](Source/never.md)|Never emit any elements, never complete and never fail.|
|Source|<a name="queue"></a>@ref[queue](Source/queue.md)|Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source. |
|Source|<a name="range"></a>@ref[range](Source/range.md)|Emit each integer in a range, with an option to take bigger steps than 1.|
|Source|<a name="repeat"></a>@ref[repeat](Source/repeat.md)|Stream a single object repeatedly.|
@ -484,6 +485,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [mergePrioritized](Source-or-Flow/mergePrioritized.md)
* [mergeSorted](Source-or-Flow/mergeSorted.md)
* [monitor](Source-or-Flow/monitor.md)
* [never](Source/never.md)
* [onComplete](Sink/onComplete.md)
* [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)
* [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)

View file

@ -0,0 +1,33 @@
/*
* Copyright (C) 2014-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.{ StreamSpec, TestSubscriber }
import akka.testkit.DefaultTimeout
import scala.concurrent.duration._
class NeverSourceSpec extends StreamSpec with DefaultTimeout {
"The Never Source" must {
"never completes" in assertAllStagesStopped {
val neverSource = Source.never[Int]
val pubSink = Sink.asPublisher[Int](false)
val neverPub = neverSource.toMat(pubSink)(Keep.right).run()
val c = TestSubscriber.manualProbe[Int]()
neverPub.subscribe(c)
val subs = c.expectSubscription()
subs.request(1)
c.expectNoMessage(300.millis)
subs.cancel()
}
}
}

View file

@ -104,6 +104,7 @@ import akka.stream.Attributes._
val singleSource = name("singleSource")
val emptySource = name("emptySource")
val maybeSource = name("MaybeSource")
val neverSource = name("neverSource")
val failedSource = name("failedSource")
val concatSource = name("concatSource")
val concatMatSource = name("concatMatSource")

View file

@ -304,6 +304,13 @@ object Source {
def future[T](futureElement: Future[T]): Source[T, NotUsed] =
scaladsl.Source.future(futureElement).asJava
/**
* Never emits any elements, never completes and never fails.
* This stream could be useful in tests.
*/
def never[T]: Source[T, NotUsed] =
scaladsl.Source.never.asJava
/**
* Emits a single value when the given `CompletionStage` is successfully completed and then completes the stream.
* If the `CompletionStage` is completed with a failure the stream is failed.

View file

@ -512,6 +512,14 @@ object Source {
def future[T](futureElement: Future[T]): Source[T, NotUsed] =
fromGraph(new FutureSource[T](futureElement))
/**
* Never emits any elements, never completes and never fails.
* This stream could be useful in tests.
*/
def never[T]: Source[T, NotUsed] = _never
private[this] val _never: Source[Nothing, NotUsed] =
future(Future.never).withAttributes(DefaultAttributes.neverSource)
/**
* Emits a single value when the given `CompletionStage` is successfully completed and then completes the stream.
* If the `CompletionStage` is completed with a failure the stream is failed.