+str Add Sink.never (#31289)

This commit is contained in:
kerr 2022-04-01 16:29:55 +08:00 committed by GitHub
parent db3b283034
commit 313fde32e1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 114 additions and 8 deletions

View file

@ -0,0 +1,27 @@
# Sink.never
Always backpressure never cancel and never consume any elements from the stream.
@ref[Sink operators](../index.md#sink-operators)
## Signature
@apidoc[Sink.never](Sink$) { java="#never()" }
@apidoc[Sink.never](Sink$) { scala="#never()" }
## Description
A `Sink` that will always backpressure never cancel and never consume any elements from the stream.
## Reactive Streams semantics
@@@div { .callout }
**cancels** never
**backpressures** always
@@@

View file

@ -75,6 +75,7 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav
|Sink|<a name="lazyfuturesink"></a>@ref[lazyFutureSink](Sink/lazyFutureSink.md)|Defers creation and materialization of a `Sink` until there is a first element.|
|Sink|<a name="lazyinitasync"></a>@ref[lazyInitAsync](Sink/lazyInitAsync.md)|Deprecated by @ref[`Sink.lazyFutureSink`](Sink/lazyFutureSink.md).|
|Sink|<a name="lazysink"></a>@ref[lazySink](Sink/lazySink.md)|Defers creation and materialization of a `Sink` until there is a first element.|
|Sink|<a name="never"></a>@ref[never](Sink/never.md)|Always backpressure never cancel and never consume any elements from the stream.|
|Sink|<a name="oncomplete"></a>@ref[onComplete](Sink/onComplete.md)|Invoke a callback when the stream has completed or failed.|
|Sink|<a name="prematerialize"></a>@ref[preMaterialize](Sink/preMaterialize.md)|Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink that can be consume elements 'into' the pre-materialized one.|
|Sink|<a name="queue"></a>@ref[queue](Sink/queue.md)|Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.|
@ -506,6 +507,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [mergeSorted](Source-or-Flow/mergeSorted.md)
* [monitor](Source-or-Flow/monitor.md)
* [never](Source/never.md)
* [never](Sink/never.md)
* [onComplete](Sink/onComplete.md)
* [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)
* [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)

View file

@ -4,18 +4,17 @@
package akka.stream.scaladsl
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import scala.annotation.nowarn
import org.reactivestreams.Publisher
import org.scalatest.concurrent.ScalaFutures
import akka.Done
import akka.stream._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import akka.testkit.DefaultTimeout
import org.reactivestreams.Publisher
import org.scalatest.concurrent.ScalaFutures
import scala.annotation.nowarn
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future }
class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
@ -226,6 +225,35 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
}
}
"The never sink" should {
"always backpressure" in {
val (source, doneFuture) = TestSource.probe[Int].toMat(Sink.never)(Keep.both).run()
source.ensureSubscription()
source.expectRequest()
source.sendComplete()
Await.result(doneFuture, 100.millis) should ===(Done)
}
"can failed with upstream failure" in {
val (source, doneFuture) = TestSource.probe[Int].toMat(Sink.never)(Keep.both).run()
source.ensureSubscription()
source.expectRequest()
source.sendError(new RuntimeException("Oops"))
a[RuntimeException] shouldBe thrownBy {
Await.result(doneFuture, 100.millis)
}
}
"fail its materialized value on abrupt materializer termination" in {
@nowarn("msg=deprecated")
val mat = ActorMaterializer()
val matVal = Source.single(1).runWith(Sink.never)(mat)
mat.shutdown()
matVal.failed.futureValue shouldBe a[AbruptStageTerminationException]
}
}
"The reduce sink" must {
"sum up 1 to 10 correctly" in {
//#reduce-operator-example

View file

@ -17,6 +17,7 @@ import akka.stream.Attributes._
// reusable common attributes
val IODispatcher = ActorAttributes.IODispatcher
val inputBufferOne = inputBuffer(initial = 1, max = 1)
val inputBufferZero = inputBuffer(initial = 0, max = 0)
// stage specific default attributes
val fused = name("fused")
@ -135,6 +136,7 @@ import akka.stream.Attributes._
val publisherSink = name("publisherSink")
val fanoutPublisherSink = name("fanoutPublisherSink")
val ignoreSink = name("ignoreSink")
val neverSink = name("neverSink") and inputBufferZero
val actorRefSink = name("actorRefSink")
val actorRefWithBackpressureSink = name("actorRefWithBackpressureSink")
val actorSubscriberSink = name("actorSubscriberSink")

View file

@ -448,6 +448,41 @@ import akka.stream.stage._
}
}
@InternalApi
private[akka] object NeverSink extends GraphStageWithMaterializedValue[SinkShape[Any], Future[Done]] {
private val in = Inlet[Any]("NeverSink.in")
val shape: SinkShape[Any] = SinkShape(in)
override def initialAttributes: Attributes = DefaultAttributes.neverSink
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
val promise = Promise[Done]()
val logic = new GraphStageLogic(shape) with InHandler {
override def onPush(): Unit =
promise.tryFailure(new IllegalStateException("NeverSink should not receive any push."))
override def onUpstreamFinish(): Unit = {
super.onUpstreamFinish()
promise.trySuccess(Done)
}
override def onUpstreamFailure(ex: Throwable): Unit = {
super.onUpstreamFailure(ex)
promise.tryFailure(ex)
}
override def postStop(): Unit = {
if (!promise.isCompleted) promise.tryFailure(new AbruptStageTerminationException(this))
}
setHandler(in, this)
}
(logic, promise.future)
}
}
/**
* INTERNAL API.
*

View file

@ -85,6 +85,12 @@ object Sink {
def ignore[T](): Sink[T, CompletionStage[Done]] =
new Sink(scaladsl.Sink.ignore.toCompletionStage())
/**
* A [[Sink]] that will always backpressure never cancel and never consume any elements from the stream.
* */
def never[T]: Sink[T, CompletionStage[Done]] =
new Sink(scaladsl.Sink.never.toCompletionStage())
/**
* A `Sink` that materializes into a [[org.reactivestreams.Publisher]].
*

View file

@ -301,6 +301,12 @@ object Sink {
*/
def ignore: Sink[Any, Future[Done]] = fromGraph(GraphStages.IgnoreSink)
/**
* A [[Sink]] that will always backpressure never cancel and never consume any elements from the stream.
* */
def never: Sink[Any, Future[Done]] = _never
private[this] val _never: Sink[Any, Future[Done]] = fromGraph(GraphStages.NeverSink)
/**
* A `Sink` that will invoke the given procedure for each received element. The sink is materialized
* into a [[scala.concurrent.Future]] which will be completed with `Success` when reaching the