diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/takeLast.md b/akka-docs/src/main/paradox/stream/operators/Sink/takeLast.md new file mode 100644 index 0000000000..f0f5e33fc0 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Sink/takeLast.md @@ -0,0 +1,31 @@ +# Sink.takeLast + +Collect the last `n` values emitted from the stream into a collection. + +@ref[Sink operators](../index.md#sink-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #takeLast } + +@@@ + +## Description + +Materializes into a @scala[`Future`] @java[`CompletionStage`] of @scala[`immutable.Seq[T]`] @java[`List`] containing the last `n` collected elements when the stream completes. +If the stream completes before signaling at least n elements, the @scala[`Future`] @java[`CompletionStage`] will complete with the number +of elements taken at that point. +If the stream never completes, the @scala[`Future`] @java[`CompletionStage`] will never complete. +If there is a failure signaled in the stream the @scala[`Future`] @java[`CompletionStage`] will be completed with failure. + +@@@div { .callout } + +**cancels** never + +**backpressures** never + +@@@ + + diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index cc442c4e1d..020df2c90f 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -62,6 +62,7 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav |Sink|@ref[queue](Sink/queue.md)|Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.| |Sink|@ref[reduce](Sink/reduce.md)|Apply a reduction function on the incoming elements and pass the result to the next invocation.| |Sink|@ref[seq](Sink/seq.md)|Collect values emitted from the stream into a collection.| +|Sink|@ref[takeLast](Sink/takeLast.md)|Collect the last `n` values emitted from the stream into a collection.| ## Additional Sink and Source converters @@ -372,6 +373,7 @@ Operators meant for inter-operating between Akka Streams and Actors: * [headOption](Sink/headOption.md) * [last](Sink/last.md) * [lastOption](Sink/lastOption.md) +* [takeLast](Sink/takeLast.md) * [seq](Sink/seq.md) * [asPublisher](Sink/asPublisher.md) * [ignore](Sink/ignore.md) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TakeLastSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TakeLastSinkSpec.scala new file mode 100644 index 0000000000..ab1c983390 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TakeLastSinkSpec.scala @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.stream.testkit.{ StreamSpec, TestPublisher } +import akka.stream.{ AbruptTerminationException, ActorMaterializer, ActorMaterializerSettings } + +import scala.collection.immutable +import scala.concurrent.{ Await, Future } + +class TakeLastSinkSpec extends StreamSpec { + + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + + implicit val mat = ActorMaterializer(settings) + + "Sink.takeLast" must { + "return the last 3 elements" in { + val input = 1 to 6 + val future: Future[immutable.Seq[Int]] = Source(input).runWith(Sink.takeLast(3)) + val result: immutable.Seq[Int] = Await.result(future, remainingOrDefault) + result should be(Seq(4, 5, 6)) + } + + "return the number of elements taken when the stream completes" in { + val input = 1 to 4 + val future: Future[immutable.Seq[Int]] = Source(input).runWith(Sink.takeLast(5)) + val result: immutable.Seq[Int] = Await.result(future, remainingOrDefault) + result should be(Seq(1, 2, 3, 4)) + } + + "fail future when stream abruptly terminated" in { + val mat = ActorMaterializer() + val probe = TestPublisher.probe() + val future: Future[immutable.Seq[Int]] = + Source.fromPublisher(probe).runWith(Sink.takeLast(2))(mat) + mat.shutdown() + future.failed.futureValue shouldBe an[AbruptTerminationException] + } + + "yield empty seq for empty stream" in { + val future: Future[immutable.Seq[Int]] = Source.empty[Int].runWith(Sink.takeLast(3)) + val result: immutable.Seq[Int] = Await.result(future, remainingOrDefault) + result should be(Seq.empty) + } + + "return the last element" in { + val input = 1 to 4 + val future: Future[immutable.Seq[Int]] = Source(input).runWith(Sink.takeLast(1)) + val result: immutable.Seq[Int] = Await.result(future, remainingOrDefault) + result should be(Seq(4)) + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index b88653c976..6fcb45d8f6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -23,7 +23,7 @@ import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance import scala.collection.generic.CanBuildFrom -import scala.collection.immutable +import scala.collection.{ immutable, mutable } import scala.compat.java8.FutureConverters._ import scala.compat.java8.OptionConverters._ import scala.concurrent.{ Future, Promise } @@ -216,6 +216,52 @@ import scala.util.{ Failure, Success, Try } override def toString: String = "LastOptionStage" } +/** + * INTERNAL API + */ +@InternalApi private[akka] final class TakeLastStage[T](n: Int) extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] { + require(n > 0, "n must be greater than 0") + + val in: Inlet[T] = Inlet("takeLastStage.in") + + override val shape: SinkShape[T] = SinkShape.of(in) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { + val p: Promise[immutable.Seq[T]] = Promise() + (new GraphStageLogic(shape) with InHandler { + private[this] val buffer = mutable.Queue.empty[T] + private[this] var count = 0 + + override def preStart(): Unit = pull(in) + + override def onPush(): Unit = { + buffer.enqueue(grab(in)) + if (count < n) + count += 1 + else + buffer.dequeue() + pull(in) + } + + override def onUpstreamFinish(): Unit = { + val elements = buffer.result().toList + buffer.clear() + p.trySuccess(elements) + completeStage() + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + p.tryFailure(ex) + failStage(ex) + } + + setHandler(in, this) + }, p.future) + } + + override def toString: String = "TakeLastStage" +} + /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 6eb9296231..ec745f864e 100755 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -124,6 +124,7 @@ import akka.stream._ val headOptionSink = name("headOptionSink") and inputBufferOne val lastSink = name("lastSink") val lastOptionSink = name("lastOptionSink") + val takeLastSink = name("takeLastSink") val seqSink = name("seqSink") val publisherSink = name("publisherSink") val fanoutPublisherSink = name("fanoutPublisherSink") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index a6572c2833..ba50a7f168 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -147,7 +147,7 @@ object Sink { * If the stream completes before signaling at least a single element, the CompletionStage will be failed with a [[NoSuchElementException]]. * If the stream signals an error errors before signaling at least a single element, the CompletionStage will be failed with the streams exception. * - * See also [[lastOption]]. + * See also [[lastOption]], [[takeLast]]. */ def last[In](): Sink[In, CompletionStage[In]] = new Sink(scaladsl.Sink.last[In].toCompletionStage()) @@ -157,12 +157,24 @@ object Sink { * If the stream completes before signaling at least a single element, the value of the CompletionStage will be an empty [[java.util.Optional]]. * If the stream signals an error errors before signaling at least a single element, the CompletionStage will be failed with the streams exception. * - * See also [[head]]. + * See also [[head]], [[takeLast]]. */ def lastOption[In](): Sink[In, CompletionStage[Optional[In]]] = new Sink(scaladsl.Sink.lastOption[In].mapMaterializedValue( _.map(_.asJava)(ExecutionContexts.sameThreadExecutionContext).toJava)) + /** + * A `Sink` that materializes into a a `CompletionStage` of `List` containing the last `n` collected elements. + * If the stream completes before signaling at least n elements, the CompletionStage will complete with the number + * of elements taken at that point. + * If the stream never completes the `CompletionStage` will never complete. + * If there is a failure signaled in the stream the `CompletionStage` will be completed with failure. + */ + def takeLast[In](n: Int): Sink[In, CompletionStage[java.util.List[In]]] = { + import scala.collection.JavaConverters._ + new Sink(scaladsl.Sink.takeLast[In](n).mapMaterializedValue(fut ⇒ fut.map(sq ⇒ sq.asJava)(ExecutionContexts.sameThreadExecutionContext).toJava)) + } + /** * A `Sink` that keeps on collecting incoming elements until upstream terminates. * As upstream may be unbounded, `Flow[T].take` or the stricter `Flow[T].limit` (and their variants) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 6bdf973d67..09186c2fc7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -188,7 +188,7 @@ object Sink { * If the stream completes before signaling at least a single element, the Future will be failed with a [[NoSuchElementException]]. * If the stream signals an error, the Future will be failed with the stream's exception. * - * See also [[lastOption]]. + * See also [[lastOption]], [[takeLast]]. */ def last[T]: Sink[T, Future[T]] = Sink.fromGraph(new LastOptionStage[T]).withAttributes(DefaultAttributes.lastSink) .mapMaterializedValue(e ⇒ e.map(_.getOrElse(throw new NoSuchElementException("last of empty stream")))(ExecutionContexts.sameThreadExecutionContext)) @@ -198,10 +198,23 @@ object Sink { * If the stream completes before signaling at least a single element, the value of the Future will be [[None]]. * If the stream signals an error, the Future will be failed with the stream's exception. * - * See also [[last]]. + * See also [[last]], [[takeLast]]. */ def lastOption[T]: Sink[T, Future[Option[T]]] = Sink.fromGraph(new LastOptionStage[T]).withAttributes(DefaultAttributes.lastOptionSink) + /** + * A `Sink` that materializes into a a `Future` of `immutable.Seq[T]` containing the last `n` collected elements. + * If the stream completes before signaling at least n elements, the Future will complete with the number + * of elements taken at that point. + * If the stream never completes, the `Future` will never complete. + * If there is a failure signaled in the stream the `Future` will be completed with failure. + */ + def takeLast[T](n: Int): Sink[T, Future[immutable.Seq[T]]] = + if (n == 1) + lastOption.mapMaterializedValue(fut ⇒ fut.map(_.fold(immutable.Seq.empty[T])(m ⇒ immutable.Seq(m)))(ExecutionContexts.sameThreadExecutionContext)) + else + Sink.fromGraph(new TakeLastStage[T](n)).withAttributes(DefaultAttributes.takeLastSink) + /** * A `Sink` that keeps on collecting incoming elements until upstream terminates. * As upstream may be unbounded, `Flow[T].take` or the stricter `Flow[T].limit` (and their variants) @@ -220,7 +233,7 @@ object Sink { * may be used to ensure boundedness. * Materializes into a `Future` of `That[T]` containing all the collected elements. * `That[T]` is limited to the limitations of the CanBuildFrom associated with it. For example, `Seq` is limited to - * `Int.MaxValue` elements. See [The Architecture of Scala Collectionss](https://docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html) for more info. + * `Int.MaxValue` elements. See [The Architecture of Scala Collections](https://docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html) for more info. * This Sink will cancel the stream after having received that many elements. * * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]