From 94fe1fb26dd202bd3603b8e20281244a655d44cd Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 18 Nov 2015 00:09:04 +0100 Subject: [PATCH] +str - Adds Sink.last and Sink.lastOption to mirror Sink.head and Sink.headOption * Renames BlackholeSubscriber to SinkholeSunbscriber * Makes SinkholeSubscriber request Long.MaxValue * SinkholeSink seems like the best name ever --- .../server/StreamingResponseSpecs.scala | 2 +- .../stream/tck/BlackholeSubscriberTest.scala | 17 --- .../tck/HeadOptionSinkSubscriberTest.scala | 18 --- .../stream/tck/SinkholeSubscriberTest.scala | 46 +++++++ .../akka/stream/scaladsl/AttributesSpec.scala | 4 +- .../akka/stream/scaladsl/HeadSinkSpec.scala | 43 ++---- .../akka/stream/scaladsl/LastSinkSpec.scala | 63 +++++++++ .../stream/impl/BlackholeSubscriber.scala | 50 ------- .../akka/stream/impl/SinkholeSubscriber.scala | 34 +++++ .../main/scala/akka/stream/impl/Sinks.scala | 125 +++++++++--------- .../main/scala/akka/stream/impl/Stages.scala | 7 +- .../main/scala/akka/stream/javadsl/Sink.scala | 21 +++ .../scala/akka/stream/scaladsl/Sink.scala | 29 +++- 13 files changed, 268 insertions(+), 191 deletions(-) delete mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/BlackholeSubscriberTest.scala delete mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/HeadOptionSinkSubscriberTest.scala create mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/SinkholeSubscriberTest.scala create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/LastSinkSpec.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl/SinkholeSubscriber.scala diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/StreamingResponseSpecs.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/StreamingResponseSpecs.scala index c5b5c19f45..b5c7a5d37e 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/StreamingResponseSpecs.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/StreamingResponseSpecs.scala @@ -21,7 +21,7 @@ class StreamingResponseSpecs extends RoutingSpec { Get() ~> route ~> check { status should ===(StatusCodes.OK) - responseAs[String] should === ("") + responseAs[String] should ===("") } } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/BlackholeSubscriberTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/BlackholeSubscriberTest.scala deleted file mode 100644 index 0c72b74f6d..0000000000 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/BlackholeSubscriberTest.scala +++ /dev/null @@ -1,17 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.tck - -import akka.stream.impl.BlackholeSubscriber -import scala.concurrent.Promise -import org.reactivestreams.Publisher -import org.reactivestreams.Subscriber - -class BlackholeSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] { - - override def createSubscriber(): Subscriber[Int] = new BlackholeSubscriber[Int](2, Promise[Unit]()) - - override def createElement(element: Int): Int = element -} - diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/HeadOptionSinkSubscriberTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/HeadOptionSinkSubscriberTest.scala deleted file mode 100644 index ceaca5f123..0000000000 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/HeadOptionSinkSubscriberTest.scala +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.tck - -import akka.stream.impl.HeadSink -import akka.stream.scaladsl._ -import org.reactivestreams.Subscriber - -import scala.concurrent.Promise - -class HeadOptionSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] { - import HeadSink._ - - override def createSubscriber(): Subscriber[Int] = new HeadOptionSinkSubscriber[Int] - - override def createElement(element: Int): Int = element -} diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SinkholeSubscriberTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SinkholeSubscriberTest.scala new file mode 100644 index 0000000000..cb08df6172 --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SinkholeSubscriberTest.scala @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.stream.impl.SinkholeSubscriber +import org.reactivestreams.tck.{ TestEnvironment, SubscriberWhiteboxVerification } +import org.reactivestreams.tck.SubscriberWhiteboxVerification.{ SubscriberPuppet, WhiteboxSubscriberProbe } +import org.scalatest.testng.{ TestNGSuiteLike } +import java.lang.{ Integer ⇒ JInt } +import scala.concurrent.Promise +import org.reactivestreams.{ Subscription, Subscriber } + +class SinkholeSubscriberTest extends SubscriberWhiteboxVerification[JInt](new TestEnvironment()) with TestNGSuiteLike { + override def createSubscriber(probe: WhiteboxSubscriberProbe[JInt]): Subscriber[JInt] = { + new Subscriber[JInt] { + val hole = new SinkholeSubscriber[JInt](Promise[Unit]()) + + override def onError(t: Throwable): Unit = { + hole.onError(t) + probe.registerOnError(t) + } + + override def onSubscribe(s: Subscription): Unit = { + probe.registerOnSubscribe(new SubscriberPuppet() { + override def triggerRequest(elements: Long): Unit = s.request(elements) + override def signalCancel(): Unit = s.cancel() + }) + hole.onSubscribe(s) + } + + override def onComplete(): Unit = { + hole.onComplete() + probe.registerOnComplete() + } + + override def onNext(t: JInt): Unit = { + hole.onNext(t) + probe.registerOnNext(t) + } + } + } + + override def createElement(element: Int): JInt = element +} + diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala index 963679d857..0c42807ba2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala @@ -15,7 +15,7 @@ import scala.concurrent.Promise import akka.stream.impl.SinkModule import akka.stream.impl.StreamLayout.Module import org.scalatest.concurrent.ScalaFutures -import akka.stream.impl.BlackholeSubscriber +import akka.stream.impl.SinkholeSubscriber object AttributesSpec { @@ -26,7 +26,7 @@ object AttributesSpec { final class AttributesSink(val attributes: Attributes, shape: SinkShape[Nothing]) extends SinkModule[Nothing, Future[Attributes]](shape) { override def create(context: MaterializationContext) = - (new BlackholeSubscriber(0, Promise()), Future.successful(context.effectiveAttributes)) + (new SinkholeSubscriber(Promise()), Future.successful(context.effectiveAttributes)) override protected def newInstance(shape: SinkShape[Nothing]): SinkModule[Nothing, Future[Attributes]] = new AttributesSink(attributes, shape) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala index 1d7df4564e..cdc32b4d55 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala @@ -4,7 +4,6 @@ package akka.stream.scaladsl import org.reactivestreams.Subscriber - import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ @@ -49,27 +48,16 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest { } "yield the first error" in assertAllStagesStopped { - val p = TestPublisher.manualProbe[Int]() - val f = Source(p).runWith(Sink.head) - val proc = p.expectSubscription() - proc.expectRequest() val ex = new RuntimeException("ex") - proc.sendError(ex) - Await.ready(f, 100.millis) - f.value.get should be(Failure(ex)) + intercept[RuntimeException] { + Await.result(Source.failed[Int](ex).runWith(Sink.head), 1.second) + } should be theSameInstanceAs (ex) } - "yield NoSuchElementExcption for empty stream" in assertAllStagesStopped { - val p = TestPublisher.manualProbe[Int]() - val f = Source(p).runWith(Sink.head) - val proc = p.expectSubscription() - proc.expectRequest() - proc.sendComplete() - Await.ready(f, 100.millis) - f.value.get match { - case Failure(e: NoSuchElementException) ⇒ e.getMessage should be("head of empty stream") - case x ⇒ fail("expected NoSuchElementException, got " + x) - } + "yield NoSuchElementException for empty stream" in assertAllStagesStopped { + intercept[NoSuchElementException] { + Await.result(Source.empty[Int].runWith(Sink.head), 1.second) + }.getMessage should be("head of empty stream") } } @@ -86,23 +74,14 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest { } "yield the first error" in assertAllStagesStopped { - val p = TestPublisher.manualProbe[Int]() - val f = Source(p).runWith(Sink.head) - val proc = p.expectSubscription() - proc.expectRequest() val ex = new RuntimeException("ex") - proc.sendError(ex) - Await.ready(f, 100.millis) - f.value.get should be(Failure(ex)) + intercept[RuntimeException] { + Await.result(Source.failed[Int](ex).runWith(Sink.head), 1.second) + } should be theSameInstanceAs (ex) } "yield None for empty stream" in assertAllStagesStopped { - val p = TestPublisher.manualProbe[Int]() - val f = Source(p).runWith(Sink.headOption) - val proc = p.expectSubscription() - proc.expectRequest() - proc.sendComplete() - Await.result(f, 100.millis) should be(None) + Await.result(Source.empty[Int].runWith(Sink.headOption), 1.second) should be(None) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LastSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LastSinkSpec.scala new file mode 100644 index 0000000000..95f41bb1c0 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LastSinkSpec.scala @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl + +import org.reactivestreams.Subscriber + +import scala.concurrent.Await +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.Failure + +import akka.stream.ActorMaterializer +import akka.stream.ActorMaterializerSettings +import akka.stream.testkit._ +import akka.stream.testkit.Utils._ + +class LastSinkSpec extends AkkaSpec with ScriptedTest { + + val settings = ActorMaterializerSettings(system) + + implicit val materializer = ActorMaterializer(settings) + + "A Flow with Sink.last" must { + + "yield the last value" in assertAllStagesStopped { + Await.result(Source(1 to 42).map(identity).runWith(Sink.last), 1.second) should be(42) + } + + "yield the first error" in assertAllStagesStopped { + val ex = new RuntimeException("ex") + intercept[RuntimeException] { + Await.result(Source.failed[Int](ex).runWith(Sink.last), 1.second) + } should be theSameInstanceAs (ex) + } + + "yield NoSuchElementException for empty stream" in assertAllStagesStopped { + intercept[NoSuchElementException] { + Await.result(Source.empty[Int].runWith(Sink.last), 1.second) + }.getMessage should be("last of empty stream") + } + + } + "A Flow with Sink.lastOption" must { + + "yield the last value" in assertAllStagesStopped { + Await.result(Source(1 to 42).map(identity).runWith(Sink.lastOption), 1.second) should be(Some(42)) + } + + "yield the first error" in assertAllStagesStopped { + val ex = new RuntimeException("ex") + intercept[RuntimeException] { + Await.result(Source.failed[Int](ex).runWith(Sink.lastOption), 1.second) + } should be theSameInstanceAs (ex) + } + + "yield None for empty stream" in assertAllStagesStopped { + Await.result(Source.empty[Int].runWith(Sink.lastOption), 1.second) should be(None) + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala deleted file mode 100644 index 5ed8595903..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.impl - -import java.util.concurrent.atomic.AtomicReference -import scala.concurrent.Promise -import org.reactivestreams.{ Subscriber, Subscription } - -/** - * INTERNAL API - */ - -private[akka] class BlackholeSubscriber[T](highWatermark: Int, onComplete: Promise[Unit]) extends Subscriber[T] { - - private val lowWatermark: Int = Math.max(1, highWatermark / 2) - private var requested = 0L - private var subscription: Subscription = null - - override def onSubscribe(sub: Subscription): Unit = { - ReactiveStreamsCompliance.requireNonNullSubscription(sub) - if (subscription ne null) sub.cancel() - else { - subscription = sub - requestMore() - } - } - - override def onError(cause: Throwable): Unit = { - ReactiveStreamsCompliance.requireNonNullException(cause) - onComplete.tryFailure(cause) - } - - override def onComplete(): Unit = { - onComplete.trySuccess(()) - } - - override def onNext(element: T): Unit = { - ReactiveStreamsCompliance.requireNonNullElement(element) - requested -= 1 - requestMore() - } - - protected def requestMore(): Unit = - if (requested < lowWatermark) { - val amount = highWatermark - requested - requested += amount - subscription.request(amount) - } -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/SinkholeSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/SinkholeSubscriber.scala new file mode 100644 index 0000000000..563af923b0 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/SinkholeSubscriber.scala @@ -0,0 +1,34 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl + +import java.util.concurrent.atomic.AtomicReference +import scala.concurrent.Promise +import org.reactivestreams.{ Subscriber, Subscription } + +/** + * INTERNAL API + */ + +private[akka] final class SinkholeSubscriber[T](whenComplete: Promise[Unit]) extends Subscriber[T] { + private[this] var running: Boolean = false + + override def onSubscribe(sub: Subscription): Unit = { + ReactiveStreamsCompliance.requireNonNullSubscription(sub) + if (running) sub.cancel() + else { + running = true + sub.request(Long.MaxValue) + } + } + + override def onError(cause: Throwable): Unit = { + ReactiveStreamsCompliance.requireNonNullException(cause) + whenComplete.tryFailure(cause) + } + + override def onComplete(): Unit = whenComplete.trySuccess(()) + + override def onNext(element: T): Unit = ReactiveStreamsCompliance.requireNonNullElement(element) +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 31ee8fa994..74dadc2ed9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -3,18 +3,17 @@ */ package akka.stream.impl -import akka.actor.{ Deploy, ActorRef, Props } -import akka.dispatch.ExecutionContexts +import akka.actor.{ ActorRef, Props } import akka.stream.actor.ActorPublisherMessage.Request import akka.stream.impl.StreamLayout.Module import akka.stream._ +import akka.stream.stage.{ InHandler, GraphStageLogic, SinkStage } import akka.util.Timeout -import org.reactivestreams.{ Publisher, Subscriber, Subscription } +import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.duration.{ FiniteDuration, _ } import scala.concurrent.{ Future, Promise } import scala.language.postfixOps -import scala.util.Try /** * INTERNAL API @@ -87,75 +86,22 @@ private[akka] final class FanoutPublisherSink[In]( new FanoutPublisherSink[In](attr, amendShape(attr)) } -/** - * INTERNAL API - */ -private[akka] object HeadSink { - final class HeadOptionSinkSubscriber[In] extends Subscriber[In] { - private[this] var subscription: Subscription = null - private[this] val promise: Promise[Option[In]] = Promise[Option[In]]() - def future: Future[Option[In]] = promise.future - override def onSubscribe(s: Subscription): Unit = { - ReactiveStreamsCompliance.requireNonNullSubscription(s) - if (subscription ne null) s.cancel() - else { - subscription = s - s.request(1) - } - } - - override def onNext(elem: In): Unit = { - ReactiveStreamsCompliance.requireNonNullElement(elem) - promise.trySuccess(Some(elem)) - subscription.cancel() - subscription = null - } - - override def onError(t: Throwable): Unit = { - ReactiveStreamsCompliance.requireNonNullException(t) - promise.tryFailure(t) - } - - override def onComplete(): Unit = - promise.trySuccess(None) - } - -} - -/** - * INTERNAL API - * Holds a [[scala.concurrent.Future]] that will be fulfilled with the first - * element that is signaled to this stream (wrapped in a [[Some]]), - * which can be either an element (after which the upstream subscription is canceled), - * an error condition (putting the Future into the corresponding failed state) or - * the end-of-stream (yielding [[None]]). - */ -private[akka] final class HeadOptionSink[In](val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Future[Option[In]]](shape) { - override def create(context: MaterializationContext) = { - val sub = new HeadSink.HeadOptionSinkSubscriber[In] - (sub, sub.future) - } - override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Future[Option[In]]] = new HeadOptionSink[In](attributes, shape) - override def withAttributes(attr: Attributes): Module = new HeadOptionSink[In](attr, amendShape(attr)) - override def toString: String = "HeadOptionSink" -} - /** * INTERNAL API * Attaches a subscriber to this stream which will just discard all received * elements. */ -private[akka] final class BlackholeSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, Future[Unit]](shape) { +private[akka] final class SinkholeSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, Future[Unit]](shape) { override def create(context: MaterializationContext) = { val effectiveSettings = ActorMaterializer.downcast(context.materializer).effectiveSettings(context.effectiveAttributes) val p = Promise[Unit]() - (new BlackholeSubscriber[Any](effectiveSettings.maxInputBufferSize, p), p.future) + (new SinkholeSubscriber[Any](p), p.future) } - override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Future[Unit]] = new BlackholeSink(attributes, shape) - override def withAttributes(attr: Attributes): Module = new BlackholeSink(attr, amendShape(attr)) - override def toString: String = "BlackholeSink" + override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Future[Unit]] = new SinkholeSink(attributes, shape) + override def withAttributes(attr: Attributes): Module = new SinkholeSink(attr, amendShape(attr)) + override def toString: String = "SinkholeSink" } /** @@ -246,3 +192,58 @@ private[akka] final class AcknowledgeSink[In](bufferSize: Int, val attributes: A new AcknowledgeSink[In](bufferSize, attr, amendShape(attr), timeout) override def toString: String = "AcknowledgeSink" } + +private[akka] final class LastOptionStage[T] extends SinkStage[T, Future[Option[T]]]("lastOption") { + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { + val p: Promise[Option[T]] = Promise() + (new GraphStageLogic(shape) { + override def preStart(): Unit = pull(in) + setHandler(in, new InHandler { + private[this] var prev: T = null.asInstanceOf[T] + + override def onPush(): Unit = { + prev = grab(in) + pull(in) + } + + override def onUpstreamFinish(): Unit = { + val head = prev + prev = null.asInstanceOf[T] + p.trySuccess(Option(head)) + completeStage() + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + prev = null.asInstanceOf[T] + p.tryFailure(ex) + failStage(ex) + } + }) + }, p.future) + } +} + +private[akka] final class HeadOptionStage[T] extends SinkStage[T, Future[Option[T]]]("headOption") { + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { + val p: Promise[Option[T]] = Promise() + (new GraphStageLogic(shape) { + override def preStart(): Unit = pull(in) + setHandler(in, new InHandler { + override def onPush(): Unit = { + p.trySuccess(Option(grab(in))) + completeStage() + } + + override def onUpstreamFinish(): Unit = { + p.trySuccess(None) + completeStage() + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + p.tryFailure(ex) + failStage(ex) + } + }) + }, p.future) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 84f1dc9fec..5b9f2baf6f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -25,8 +25,6 @@ private[stream] object Stages { object DefaultAttributes { val IODispatcher = ActorAttributes.Dispatcher("akka.stream.default-blocking-io-dispatcher") - val timerTransform = name("timerTransform") - val stageFactory = name("stageFactory") val fused = name("fused") val map = name("map") val log = name("log") @@ -63,9 +61,6 @@ private[stream] object Stages { val zip = name("zip") val unzip = name("unzip") val concat = name("concat") - val flexiMerge = name("flexiMerge") - val flexiRoute = name("flexiRoute") - val identityJunction = name("identityJunction") val repeat = name("repeat") val publisherSource = name("publisherSource") @@ -90,6 +85,8 @@ private[stream] object Stages { val cancelledSink = name("cancelledSink") val headSink = name("headSink") and inputBuffer(initial = 1, max = 1) val headOptionSink = name("headOptionSink") and inputBuffer(initial = 1, max = 1) + val lastSink = name("lastSink") + val lastOptionSink = name("lastOptionSink") val publisherSink = name("publisherSink") val fanoutPublisherSink = name("fanoutPublisherSink") val ignoreSink = name("ignoreSink") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index f9f9644a0c..8d7e466c77 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -113,6 +113,27 @@ object Sink { new Sink(scaladsl.Sink.headOption[In].mapMaterializedValue( _.map(akka.japi.Option.fromScalaOption)(ExecutionContexts.sameThreadExecutionContext))) + /** + * A `Sink` that materializes into a `Future` of the last value received. + * If the stream completes before signaling at least a single element, the Future will be failed with a [[NoSuchElementException]]. + * If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception. + * + * See also [[lastOption]]. + */ + def last[In](): Sink[In, Future[In]] = + new Sink(scaladsl.Sink.last[In]) + + /** + * A `Sink` that materializes into a `Future` of the optional last value received. + * If the stream completes before signaling at least a single element, the value of the Future will be an empty [[akka.japi.Option]]. + * If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception. + * + * See also [[head]]. + */ + def lastOption[In](): Sink[In, Future[akka.japi.Option[In]]] = + new Sink(scaladsl.Sink.lastOption[In].mapMaterializedValue( + _.map(akka.japi.Option.fromScalaOption)(ExecutionContexts.sameThreadExecutionContext))) + /** * Sends the elements of the stream to the given `ActorRef`. * If the target actor terminates the stream will be canceled. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index c72a775ddd..0c41721a8b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -85,8 +85,9 @@ object Sink { * * See also [[headOption]]. */ - def head[T]: Sink[T, Future[T]] = new Sink[T, Future[Option[T]]](new HeadOptionSink[T](DefaultAttributes.headSink, shape("HeadSink"))) - .mapMaterializedValue(e ⇒ e.map(_.getOrElse(throw new NoSuchElementException("head of empty stream")))(ExecutionContexts.sameThreadExecutionContext)) + def head[T]: Sink[T, Future[T]] = + Sink.fromGraph(new HeadOptionStage[T]).withAttributes(DefaultAttributes.headSink) + .mapMaterializedValue(e ⇒ e.map(_.getOrElse(throw new NoSuchElementException("head of empty stream")))(ExecutionContexts.sameThreadExecutionContext)) /** * A `Sink` that materializes into a `Future` of the optional first value received. @@ -95,7 +96,27 @@ object Sink { * * See also [[head]]. */ - def headOption[T]: Sink[T, Future[Option[T]]] = new Sink(new HeadOptionSink[T](DefaultAttributes.headSink, shape("HeadOptionSink"))) + def headOption[T]: Sink[T, Future[Option[T]]] = + Sink.fromGraph(new HeadOptionStage[T]).withAttributes(DefaultAttributes.headOptionSink) + + /** + * A `Sink` that materializes into a `Future` of the last value received. + * If the stream completes before signaling at least a single element, the Future will be failed with a [[NoSuchElementException]]. + * If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception. + * + * See also [[lastOption]]. + */ + def last[T]: Sink[T, Future[T]] = Sink.fromGraph(new LastOptionStage[T]).withAttributes(DefaultAttributes.lastSink) + .mapMaterializedValue(e ⇒ e.map(_.getOrElse(throw new NoSuchElementException("last of empty stream")))(ExecutionContexts.sameThreadExecutionContext)) + + /** + * A `Sink` that materializes into a `Future` of the optional last value received. + * If the stream completes before signaling at least a single element, the value of the Future will be [[None]]. + * If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception. + * + * See also [[last]]. + */ + def lastOption[T]: Sink[T, Future[Option[T]]] = Sink.fromGraph(new LastOptionStage[T]).withAttributes(DefaultAttributes.lastOptionSink) /** * A `Sink` that materializes into a [[org.reactivestreams.Publisher]]. @@ -117,7 +138,7 @@ object Sink { * A `Sink` that will consume the stream and discard the elements. */ def ignore: Sink[Any, Future[Unit]] = - new Sink(new BlackholeSink(DefaultAttributes.ignoreSink, shape("BlackholeSink"))) + new Sink(new SinkholeSink(DefaultAttributes.ignoreSink, shape("SinkholeSink"))) /** * A `Sink` that will invoke the given procedure for each received element. The sink is materialized