From 313fde32e13f18032eac63eaffc584043fe2d8b0 Mon Sep 17 00:00:00 2001 From: kerr Date: Fri, 1 Apr 2022 16:29:55 +0800 Subject: [PATCH] +str Add Sink.never (#31289) --- .../paradox/stream/operators/Sink/never.md | 27 ++++++++++++ .../main/paradox/stream/operators/index.md | 2 + .../scala/akka/stream/scaladsl/SinkSpec.scala | 44 +++++++++++++++---- .../main/scala/akka/stream/impl/Stages.scala | 2 + .../akka/stream/impl/fusing/GraphStages.scala | 35 +++++++++++++++ .../main/scala/akka/stream/javadsl/Sink.scala | 6 +++ .../scala/akka/stream/scaladsl/Sink.scala | 6 +++ 7 files changed, 114 insertions(+), 8 deletions(-) create mode 100644 akka-docs/src/main/paradox/stream/operators/Sink/never.md diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/never.md b/akka-docs/src/main/paradox/stream/operators/Sink/never.md new file mode 100644 index 0000000000..efaaf47ddc --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Sink/never.md @@ -0,0 +1,27 @@ +# Sink.never + +Always backpressure never cancel and never consume any elements from the stream. + +@ref[Sink operators](../index.md#sink-operators) + +## Signature + +@apidoc[Sink.never](Sink$) { java="#never()" } +@apidoc[Sink.never](Sink$) { scala="#never()" } + + +## Description + +A `Sink` that will always backpressure never cancel and never consume any elements from the stream. + +## Reactive Streams semantics + +@@@div { .callout } + +**cancels** never + +**backpressures** always + +@@@ + + diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 4c555bca75..79721ca8c6 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -75,6 +75,7 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav |Sink|@ref[lazyFutureSink](Sink/lazyFutureSink.md)|Defers creation and materialization of a `Sink` until there is a first element.| |Sink|@ref[lazyInitAsync](Sink/lazyInitAsync.md)|Deprecated by @ref[`Sink.lazyFutureSink`](Sink/lazyFutureSink.md).| |Sink|@ref[lazySink](Sink/lazySink.md)|Defers creation and materialization of a `Sink` until there is a first element.| +|Sink|@ref[never](Sink/never.md)|Always backpressure never cancel and never consume any elements from the stream.| |Sink|@ref[onComplete](Sink/onComplete.md)|Invoke a callback when the stream has completed or failed.| |Sink|@ref[preMaterialize](Sink/preMaterialize.md)|Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink that can be consume elements 'into' the pre-materialized one.| |Sink|@ref[queue](Sink/queue.md)|Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.| @@ -506,6 +507,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [mergeSorted](Source-or-Flow/mergeSorted.md) * [monitor](Source-or-Flow/monitor.md) * [never](Source/never.md) +* [never](Sink/never.md) * [onComplete](Sink/onComplete.md) * [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md) * [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index 93c221a600..031b36ff77 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -4,18 +4,17 @@ package akka.stream.scaladsl -import scala.concurrent.{ Await, Future } -import scala.concurrent.duration._ - -import scala.annotation.nowarn -import org.reactivestreams.Publisher -import org.scalatest.concurrent.ScalaFutures - import akka.Done import akka.stream._ import akka.stream.testkit._ -import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.scaladsl.{ TestSink, TestSource } import akka.testkit.DefaultTimeout +import org.reactivestreams.Publisher +import org.scalatest.concurrent.ScalaFutures + +import scala.annotation.nowarn +import scala.concurrent.duration._ +import scala.concurrent.{ Await, Future } class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { @@ -226,6 +225,35 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } } + "The never sink" should { + + "always backpressure" in { + val (source, doneFuture) = TestSource.probe[Int].toMat(Sink.never)(Keep.both).run() + source.ensureSubscription() + source.expectRequest() + source.sendComplete() + Await.result(doneFuture, 100.millis) should ===(Done) + } + + "can failed with upstream failure" in { + val (source, doneFuture) = TestSource.probe[Int].toMat(Sink.never)(Keep.both).run() + source.ensureSubscription() + source.expectRequest() + source.sendError(new RuntimeException("Oops")) + a[RuntimeException] shouldBe thrownBy { + Await.result(doneFuture, 100.millis) + } + } + + "fail its materialized value on abrupt materializer termination" in { + @nowarn("msg=deprecated") + val mat = ActorMaterializer() + val matVal = Source.single(1).runWith(Sink.never)(mat) + mat.shutdown() + matVal.failed.futureValue shouldBe a[AbruptStageTerminationException] + } + } + "The reduce sink" must { "sum up 1 to 10 correctly" in { //#reduce-operator-example diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index db589f1ee6..930d34f466 100755 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -17,6 +17,7 @@ import akka.stream.Attributes._ // reusable common attributes val IODispatcher = ActorAttributes.IODispatcher val inputBufferOne = inputBuffer(initial = 1, max = 1) + val inputBufferZero = inputBuffer(initial = 0, max = 0) // stage specific default attributes val fused = name("fused") @@ -135,6 +136,7 @@ import akka.stream.Attributes._ val publisherSink = name("publisherSink") val fanoutPublisherSink = name("fanoutPublisherSink") val ignoreSink = name("ignoreSink") + val neverSink = name("neverSink") and inputBufferZero val actorRefSink = name("actorRefSink") val actorRefWithBackpressureSink = name("actorRefWithBackpressureSink") val actorSubscriberSink = name("actorSubscriberSink") diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 1f7ad2d736..5e6016df1f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -448,6 +448,41 @@ import akka.stream.stage._ } } + @InternalApi + private[akka] object NeverSink extends GraphStageWithMaterializedValue[SinkShape[Any], Future[Done]] { + private val in = Inlet[Any]("NeverSink.in") + val shape: SinkShape[Any] = SinkShape(in) + + override def initialAttributes: Attributes = DefaultAttributes.neverSink + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { + val promise = Promise[Done]() + val logic = new GraphStageLogic(shape) with InHandler { + + override def onPush(): Unit = + promise.tryFailure(new IllegalStateException("NeverSink should not receive any push.")) + + override def onUpstreamFinish(): Unit = { + super.onUpstreamFinish() + promise.trySuccess(Done) + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + super.onUpstreamFailure(ex) + promise.tryFailure(ex) + } + + override def postStop(): Unit = { + if (!promise.isCompleted) promise.tryFailure(new AbruptStageTerminationException(this)) + } + + setHandler(in, this) + } + + (logic, promise.future) + } + } + /** * INTERNAL API. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index ea34e588b7..3eeef8b63d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -85,6 +85,12 @@ object Sink { def ignore[T](): Sink[T, CompletionStage[Done]] = new Sink(scaladsl.Sink.ignore.toCompletionStage()) + /** + * A [[Sink]] that will always backpressure never cancel and never consume any elements from the stream. + * */ + def never[T]: Sink[T, CompletionStage[Done]] = + new Sink(scaladsl.Sink.never.toCompletionStage()) + /** * A `Sink` that materializes into a [[org.reactivestreams.Publisher]]. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 9568675483..80b8611ae8 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -301,6 +301,12 @@ object Sink { */ def ignore: Sink[Any, Future[Done]] = fromGraph(GraphStages.IgnoreSink) + /** + * A [[Sink]] that will always backpressure never cancel and never consume any elements from the stream. + * */ + def never: Sink[Any, Future[Done]] = _never + private[this] val _never: Sink[Any, Future[Done]] = fromGraph(GraphStages.NeverSink) + /** * A `Sink` that will invoke the given procedure for each received element. The sink is materialized * into a [[scala.concurrent.Future]] which will be completed with `Success` when reaching the