Rewrite Sink.ignore as a GraphStage #21527

This commit is contained in:
Ortigali 2016-10-26 15:38:49 +05:00 committed by Johan Andrén
parent aa8c253d14
commit f970412af7
3 changed files with 39 additions and 20 deletions

View file

@ -120,23 +120,6 @@ private[akka] final class FanoutPublisherSink[In](
new FanoutPublisherSink[In](attr, amendShape(attr))
}
/**
* INTERNAL API
* Attaches a subscriber to this stream which will just discard all received
* elements.
*/
final class SinkholeSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, Future[Done]](shape) {
override def create(context: MaterializationContext) = {
val effectiveSettings = ActorMaterializerHelper.downcast(context.materializer).effectiveSettings(context.effectiveAttributes)
val p = Promise[Done]()
(new SinkholeSubscriber[Any](p), p.future)
}
override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Future[Done]] = new SinkholeSink(attributes, shape)
override def withAttributes(attr: Attributes): AtomicModule = new SinkholeSink(attr, amendShape(attr))
}
/**
* INTERNAL API
* Attaches a subscriber to this stream.