From 3dd40fc18c9f0ad9f88823fda08d79a4fed605e3 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 5 Jun 2015 18:26:32 +0200 Subject: [PATCH] +str - #17662 - Changes Sink.ignore to return a Future[Unit] --- .../docs/stream/TwitterStreamQuickstartDocSpec.scala | 4 ++-- .../akka/stream/tck/BlackholeSubscriberTest.scala | 3 ++- .../scala/akka/stream/impl/BlackholeSubscriber.scala | 7 ++++--- .../src/main/scala/akka/stream/impl/Sinks.scala | 10 +++++----- .../src/main/scala/akka/stream/javadsl/Sink.scala | 2 +- .../src/main/scala/akka/stream/scaladsl/Sink.scala | 2 +- 6 files changed, 15 insertions(+), 13 deletions(-) diff --git a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala index 0b25c21eec..f24b33831a 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala @@ -106,8 +106,8 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { } "simple broadcast" in { - val writeAuthors: Sink[Author, Unit] = Sink.ignore - val writeHashtags: Sink[Hashtag, Unit] = Sink.ignore + val writeAuthors: Sink[Author, Future[Unit]] = Sink.ignore + val writeHashtags: Sink[Hashtag, Future[Unit]] = Sink.ignore // format: OFF //#flow-graph-broadcast diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/BlackholeSubscriberTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/BlackholeSubscriberTest.scala index bca19a9c80..0c72b74f6d 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/BlackholeSubscriberTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/BlackholeSubscriberTest.scala @@ -4,12 +4,13 @@ package akka.stream.tck import akka.stream.impl.BlackholeSubscriber +import scala.concurrent.Promise import org.reactivestreams.Publisher import org.reactivestreams.Subscriber class BlackholeSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] { - override def createSubscriber(): Subscriber[Int] = new BlackholeSubscriber[Int](2) + override def createSubscriber(): Subscriber[Int] = new BlackholeSubscriber[Int](2, Promise[Unit]()) override def createElement(element: Int): Int = element } diff --git a/akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala index 8a2355c518..8eed98db97 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala @@ -4,14 +4,14 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicReference - +import scala.concurrent.Promise import org.reactivestreams.{ Subscriber, Subscription } /** * INTERNAL API */ -private[akka] class BlackholeSubscriber[T](highWatermark: Int) extends Subscriber[T] { +private[akka] class BlackholeSubscriber[T](highWatermark: Int, onComplete: Promise[Unit]) extends Subscriber[T] { private val lowWatermark = Math.max(1, highWatermark / 2) private var requested = 0L @@ -26,10 +26,11 @@ private[akka] class BlackholeSubscriber[T](highWatermark: Int) extends Subscribe override def onError(cause: Throwable): Unit = { ReactiveStreamsCompliance.requireNonNullException(cause) + onComplete.tryFailure(cause) () } - override def onComplete(): Unit = () + override def onComplete(): Unit = onComplete.trySuccess(()) override def onNext(element: T): Unit = { ReactiveStreamsCompliance.requireNonNullElement(element) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index e9687d8f49..86ca3b2a17 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -150,15 +150,15 @@ private[akka] class HeadSink[In](val attributes: OperationAttributes, shape: Sin * Attaches a subscriber to this stream which will just discard all received * elements. */ -private[akka] final class BlackholeSink(val attributes: OperationAttributes, shape: SinkShape[Any]) extends SinkModule[Any, Unit](shape) { +private[akka] final class BlackholeSink(val attributes: OperationAttributes, shape: SinkShape[Any]) extends SinkModule[Any, Future[Unit]](shape) { override def create(context: MaterializationContext) = { - val effectiveSettings = ActorFlowMaterializer.downcast(context.materializer) - .effectiveSettings(context.effectiveAttributes) - (new BlackholeSubscriber[Any](effectiveSettings.maxInputBufferSize), ()) + val effectiveSettings = ActorFlowMaterializer.downcast(context.materializer).effectiveSettings(context.effectiveAttributes) + val p = Promise[Unit]() + (new BlackholeSubscriber[Any](effectiveSettings.maxInputBufferSize, p), p.future) } - override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Unit] = new BlackholeSink(attributes, shape) + override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Future[Unit]] = new BlackholeSink(attributes, shape) override def withAttributes(attr: OperationAttributes): Module = new BlackholeSink(attr, amendShape(attr)) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index c993cf59a3..f525480ad7 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -47,7 +47,7 @@ object Sink { /** * A `Sink` that will consume the stream and discard the elements. */ - def ignore[T](): Sink[T, Unit] = + def ignore[T](): Sink[T, Future[Unit]] = new Sink(scaladsl.Sink.ignore) /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index d1d784fc85..663c52b9fe 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -95,7 +95,7 @@ object Sink extends SinkApply { /** * A `Sink` that will consume the stream and discard the elements. */ - def ignore: Sink[Any, Unit] = + def ignore: Sink[Any, Future[Unit]] = new Sink(new BlackholeSink(DefaultAttributes.ignoreSink, shape("BlackholeSink"))) /**