diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 6fcb45d8f6..5a6e8e37a0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -175,54 +175,14 @@ import scala.util.{ Failure, Success, Try } new ActorRefSink[In](ref, onCompleteMessage, onFailureMessage, attr, amendShape(attr)) } -/** - * INTERNAL API - */ -@InternalApi private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] { - - val in: Inlet[T] = Inlet("lastOption.in") - - override val shape: SinkShape[T] = SinkShape.of(in) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - val p: Promise[Option[T]] = Promise() - (new GraphStageLogic(shape) with InHandler { - private[this] var prev: T = null.asInstanceOf[T] - - override def preStart(): Unit = pull(in) - - def onPush(): Unit = { - prev = grab(in) - pull(in) - } - - override def onUpstreamFinish(): Unit = { - val head = prev - prev = null.asInstanceOf[T] - p.trySuccess(Option(head)) - completeStage() - } - - override def onUpstreamFailure(ex: Throwable): Unit = { - prev = null.asInstanceOf[T] - p.tryFailure(ex) - failStage(ex) - } - - setHandler(in, this) - }, p.future) - } - - override def toString: String = "LastOptionStage" -} - /** * INTERNAL API */ @InternalApi private[akka] final class TakeLastStage[T](n: Int) extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] { - require(n > 0, "n must be greater than 0") + if (n <= 0) + throw new IllegalArgumentException("requirement failed: n must be greater than 0") - val in: Inlet[T] = Inlet("takeLastStage.in") + val in: Inlet[T] = Inlet("takeLast.in") override val shape: SinkShape[T] = SinkShape.of(in) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index ba50a7f168..690de6cc64 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -165,8 +165,8 @@ object Sink { /** * A `Sink` that materializes into a a `CompletionStage` of `List` containing the last `n` collected elements. - * If the stream completes before signaling at least n elements, the CompletionStage will complete with the number - * of elements taken at that point. + * + * If the stream completes before signaling at least n elements, the `CompletionStage` will complete with all elements seen so far. * If the stream never completes the `CompletionStage` will never complete. * If there is a failure signaled in the stream the `CompletionStage` will be completed with failure. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 09186c2fc7..e8f58aed92 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -190,8 +190,12 @@ object Sink { * * See also [[lastOption]], [[takeLast]]. */ - def last[T]: Sink[T, Future[T]] = Sink.fromGraph(new LastOptionStage[T]).withAttributes(DefaultAttributes.lastSink) - .mapMaterializedValue(e ⇒ e.map(_.getOrElse(throw new NoSuchElementException("last of empty stream")))(ExecutionContexts.sameThreadExecutionContext)) + def last[T]: Sink[T, Future[T]] = { + Sink.fromGraph(new TakeLastStage[T](1)).withAttributes(DefaultAttributes.lastSink) + .mapMaterializedValue { e ⇒ + e.map(_.headOption.getOrElse(throw new NoSuchElementException("last of empty stream")))(ExecutionContexts.sameThreadExecutionContext) + } + } /** * A `Sink` that materializes into a `Future` of the optional last value received. @@ -200,20 +204,22 @@ object Sink { * * See also [[last]], [[takeLast]]. */ - def lastOption[T]: Sink[T, Future[Option[T]]] = Sink.fromGraph(new LastOptionStage[T]).withAttributes(DefaultAttributes.lastOptionSink) + def lastOption[T]: Sink[T, Future[Option[T]]] = { + Sink.fromGraph(new TakeLastStage[T](1)).withAttributes(DefaultAttributes.lastOptionSink) + .mapMaterializedValue { e ⇒ + e.map(_.headOption)(ExecutionContexts.sameThreadExecutionContext) + } + } /** * A `Sink` that materializes into a a `Future` of `immutable.Seq[T]` containing the last `n` collected elements. - * If the stream completes before signaling at least n elements, the Future will complete with the number - * of elements taken at that point. + * + * If the stream completes before signaling at least n elements, the `Future` will complete with all elements seen so far. * If the stream never completes, the `Future` will never complete. * If there is a failure signaled in the stream the `Future` will be completed with failure. */ def takeLast[T](n: Int): Sink[T, Future[immutable.Seq[T]]] = - if (n == 1) - lastOption.mapMaterializedValue(fut ⇒ fut.map(_.fold(immutable.Seq.empty[T])(m ⇒ immutable.Seq(m)))(ExecutionContexts.sameThreadExecutionContext)) - else - Sink.fromGraph(new TakeLastStage[T](n)).withAttributes(DefaultAttributes.takeLastSink) + Sink.fromGraph(new TakeLastStage[T](n)).withAttributes(DefaultAttributes.takeLastSink) /** * A `Sink` that keeps on collecting incoming elements until upstream terminates.