diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index ae33bd13ae..caa9c27a11 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -120,23 +120,6 @@ private[akka] final class FanoutPublisherSink[In]( new FanoutPublisherSink[In](attr, amendShape(attr)) } -/** - * INTERNAL API - * Attaches a subscriber to this stream which will just discard all received - * elements. - */ -final class SinkholeSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, Future[Done]](shape) { - - override def create(context: MaterializationContext) = { - val effectiveSettings = ActorMaterializerHelper.downcast(context.materializer).effectiveSettings(context.effectiveAttributes) - val p = Promise[Done]() - (new SinkholeSubscriber[Any](p), p.future) - } - - override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Future[Done]] = new SinkholeSink(attributes, shape) - override def withAttributes(attr: Attributes): AtomicModule = new SinkholeSink(attr, amendShape(attr)) -} - /** * INTERNAL API * Attaches a subscriber to this stream. diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 7f664d1935..0b79c4c25b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -328,6 +328,42 @@ object GraphStages { override def toString: String = "FutureSource" } + /** + * INTERNAL API + * Discards all received elements. + */ + object IgnoreSink extends GraphStageWithMaterializedValue[SinkShape[Any], Future[Done]] { + + val in = Inlet[Any]("Ignore.in") + val shape = SinkShape(in) + + override def initialAttributes = DefaultAttributes.ignoreSink + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { + val promise = Promise[Done]() + val logic = new GraphStageLogic(shape) with InHandler { + + override def preStart(): Unit = pull(in) + override def onPush(): Unit = pull(in) + + override def onUpstreamFinish(): Unit = { + super.onUpstreamFinish() + promise.trySuccess(Done) + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + super.onUpstreamFailure(ex) + promise.tryFailure(ex) + } + + setHandler(in, this) + } + + (logic, promise.future) + } + + } + /** * INTERNAL API. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index d44e243de8..895d48acc1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -10,7 +10,8 @@ import akka.stream.actor.ActorSubscriber import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.Module import akka.stream.impl._ -import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler, InHandler } +import akka.stream.impl.fusing.GraphStages +import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.stream.{ javadsl, _ } import org.reactivestreams.{ Publisher, Subscriber } @@ -187,8 +188,7 @@ object Sink { /** * A `Sink` that will consume the stream and discard the elements. */ - def ignore: Sink[Any, Future[Done]] = - new Sink(new SinkholeSink(DefaultAttributes.ignoreSink, shape("SinkholeSink"))) + def ignore: Sink[Any, Future[Done]] = fromGraph(GraphStages.IgnoreSink) /** * A `Sink` that will invoke the given procedure for each received element. The sink is materialized