diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/never.md b/akka-docs/src/main/paradox/stream/operators/Sink/never.md
new file mode 100644
index 0000000000..efaaf47ddc
--- /dev/null
+++ b/akka-docs/src/main/paradox/stream/operators/Sink/never.md
@@ -0,0 +1,27 @@
+# Sink.never
+
+Always backpressure never cancel and never consume any elements from the stream.
+
+@ref[Sink operators](../index.md#sink-operators)
+
+## Signature
+
+@apidoc[Sink.never](Sink$) { java="#never()" }
+@apidoc[Sink.never](Sink$) { scala="#never()" }
+
+
+## Description
+
+A `Sink` that will always backpressure never cancel and never consume any elements from the stream.
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**cancels** never
+
+**backpressures** always
+
+@@@
+
+
diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md
index 4c555bca75..79721ca8c6 100644
--- a/akka-docs/src/main/paradox/stream/operators/index.md
+++ b/akka-docs/src/main/paradox/stream/operators/index.md
@@ -75,6 +75,7 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav
|Sink|@ref[lazyFutureSink](Sink/lazyFutureSink.md)|Defers creation and materialization of a `Sink` until there is a first element.|
|Sink|@ref[lazyInitAsync](Sink/lazyInitAsync.md)|Deprecated by @ref[`Sink.lazyFutureSink`](Sink/lazyFutureSink.md).|
|Sink|@ref[lazySink](Sink/lazySink.md)|Defers creation and materialization of a `Sink` until there is a first element.|
+|Sink|@ref[never](Sink/never.md)|Always backpressure never cancel and never consume any elements from the stream.|
|Sink|@ref[onComplete](Sink/onComplete.md)|Invoke a callback when the stream has completed or failed.|
|Sink|@ref[preMaterialize](Sink/preMaterialize.md)|Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink that can be consume elements 'into' the pre-materialized one.|
|Sink|@ref[queue](Sink/queue.md)|Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.|
@@ -506,6 +507,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [mergeSorted](Source-or-Flow/mergeSorted.md)
* [monitor](Source-or-Flow/monitor.md)
* [never](Source/never.md)
+* [never](Sink/never.md)
* [onComplete](Sink/onComplete.md)
* [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)
* [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala
index 93c221a600..031b36ff77 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala
@@ -4,18 +4,17 @@
package akka.stream.scaladsl
-import scala.concurrent.{ Await, Future }
-import scala.concurrent.duration._
-
-import scala.annotation.nowarn
-import org.reactivestreams.Publisher
-import org.scalatest.concurrent.ScalaFutures
-
import akka.Done
import akka.stream._
import akka.stream.testkit._
-import akka.stream.testkit.scaladsl.TestSink
+import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import akka.testkit.DefaultTimeout
+import org.reactivestreams.Publisher
+import org.scalatest.concurrent.ScalaFutures
+
+import scala.annotation.nowarn
+import scala.concurrent.duration._
+import scala.concurrent.{ Await, Future }
class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
@@ -226,6 +225,35 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
}
}
+ "The never sink" should {
+
+ "always backpressure" in {
+ val (source, doneFuture) = TestSource.probe[Int].toMat(Sink.never)(Keep.both).run()
+ source.ensureSubscription()
+ source.expectRequest()
+ source.sendComplete()
+ Await.result(doneFuture, 100.millis) should ===(Done)
+ }
+
+ "can failed with upstream failure" in {
+ val (source, doneFuture) = TestSource.probe[Int].toMat(Sink.never)(Keep.both).run()
+ source.ensureSubscription()
+ source.expectRequest()
+ source.sendError(new RuntimeException("Oops"))
+ a[RuntimeException] shouldBe thrownBy {
+ Await.result(doneFuture, 100.millis)
+ }
+ }
+
+ "fail its materialized value on abrupt materializer termination" in {
+ @nowarn("msg=deprecated")
+ val mat = ActorMaterializer()
+ val matVal = Source.single(1).runWith(Sink.never)(mat)
+ mat.shutdown()
+ matVal.failed.futureValue shouldBe a[AbruptStageTerminationException]
+ }
+ }
+
"The reduce sink" must {
"sum up 1 to 10 correctly" in {
//#reduce-operator-example
diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
index db589f1ee6..930d34f466 100755
--- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
@@ -17,6 +17,7 @@ import akka.stream.Attributes._
// reusable common attributes
val IODispatcher = ActorAttributes.IODispatcher
val inputBufferOne = inputBuffer(initial = 1, max = 1)
+ val inputBufferZero = inputBuffer(initial = 0, max = 0)
// stage specific default attributes
val fused = name("fused")
@@ -135,6 +136,7 @@ import akka.stream.Attributes._
val publisherSink = name("publisherSink")
val fanoutPublisherSink = name("fanoutPublisherSink")
val ignoreSink = name("ignoreSink")
+ val neverSink = name("neverSink") and inputBufferZero
val actorRefSink = name("actorRefSink")
val actorRefWithBackpressureSink = name("actorRefWithBackpressureSink")
val actorSubscriberSink = name("actorSubscriberSink")
diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala
index 1f7ad2d736..5e6016df1f 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala
@@ -448,6 +448,41 @@ import akka.stream.stage._
}
}
+ @InternalApi
+ private[akka] object NeverSink extends GraphStageWithMaterializedValue[SinkShape[Any], Future[Done]] {
+ private val in = Inlet[Any]("NeverSink.in")
+ val shape: SinkShape[Any] = SinkShape(in)
+
+ override def initialAttributes: Attributes = DefaultAttributes.neverSink
+
+ override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
+ val promise = Promise[Done]()
+ val logic = new GraphStageLogic(shape) with InHandler {
+
+ override def onPush(): Unit =
+ promise.tryFailure(new IllegalStateException("NeverSink should not receive any push."))
+
+ override def onUpstreamFinish(): Unit = {
+ super.onUpstreamFinish()
+ promise.trySuccess(Done)
+ }
+
+ override def onUpstreamFailure(ex: Throwable): Unit = {
+ super.onUpstreamFailure(ex)
+ promise.tryFailure(ex)
+ }
+
+ override def postStop(): Unit = {
+ if (!promise.isCompleted) promise.tryFailure(new AbruptStageTerminationException(this))
+ }
+
+ setHandler(in, this)
+ }
+
+ (logic, promise.future)
+ }
+ }
+
/**
* INTERNAL API.
*
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala
index ea34e588b7..3eeef8b63d 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala
@@ -85,6 +85,12 @@ object Sink {
def ignore[T](): Sink[T, CompletionStage[Done]] =
new Sink(scaladsl.Sink.ignore.toCompletionStage())
+ /**
+ * A [[Sink]] that will always backpressure never cancel and never consume any elements from the stream.
+ * */
+ def never[T]: Sink[T, CompletionStage[Done]] =
+ new Sink(scaladsl.Sink.never.toCompletionStage())
+
/**
* A `Sink` that materializes into a [[org.reactivestreams.Publisher]].
*
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
index 9568675483..80b8611ae8 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
@@ -301,6 +301,12 @@ object Sink {
*/
def ignore: Sink[Any, Future[Done]] = fromGraph(GraphStages.IgnoreSink)
+ /**
+ * A [[Sink]] that will always backpressure never cancel and never consume any elements from the stream.
+ * */
+ def never: Sink[Any, Future[Done]] = _never
+ private[this] val _never: Sink[Any, Future[Done]] = fromGraph(GraphStages.NeverSink)
+
/**
* A `Sink` that will invoke the given procedure for each received element. The sink is materialized
* into a [[scala.concurrent.Future]] which will be completed with `Success` when reaching the