Impl Sink.takeLast operator

Add documentation

reformatting

Address review comments
This commit is contained in:
Nafer Sanabria 2018-07-15 07:57:55 -05:00 committed by Konrad `ktoso` Malawski
parent 2aa333d076
commit 3a92104160
7 changed files with 168 additions and 6 deletions

View file

@ -0,0 +1,31 @@
# Sink.takeLast
Collect the last `n` values emitted from the stream into a collection.
@ref[Sink operators](../index.md#sink-operators)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #takeLast }
@@@
## Description
Materializes into a @scala[`Future`] @java[`CompletionStage`] of @scala[`immutable.Seq[T]`] @java[`List<In>`] containing the last `n` collected elements when the stream completes.
If the stream completes before signaling at least n elements, the @scala[`Future`] @java[`CompletionStage`] will complete with the number
of elements taken at that point.
If the stream never completes, the @scala[`Future`] @java[`CompletionStage`] will never complete.
If there is a failure signaled in the stream the @scala[`Future`] @java[`CompletionStage`] will be completed with failure.
@@@div { .callout }
**cancels** never
**backpressures** never
@@@

View file

@ -62,6 +62,7 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav
|Sink|<a name="queue"></a>@ref[queue](Sink/queue.md)|Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.| |Sink|<a name="queue"></a>@ref[queue](Sink/queue.md)|Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.|
|Sink|<a name="reduce"></a>@ref[reduce](Sink/reduce.md)|Apply a reduction function on the incoming elements and pass the result to the next invocation.| |Sink|<a name="reduce"></a>@ref[reduce](Sink/reduce.md)|Apply a reduction function on the incoming elements and pass the result to the next invocation.|
|Sink|<a name="seq"></a>@ref[seq](Sink/seq.md)|Collect values emitted from the stream into a collection.| |Sink|<a name="seq"></a>@ref[seq](Sink/seq.md)|Collect values emitted from the stream into a collection.|
|Sink|<a name="takelast"></a>@ref[takeLast](Sink/takeLast.md)|Collect the last `n` values emitted from the stream into a collection.|
## Additional Sink and Source converters ## Additional Sink and Source converters
@ -372,6 +373,7 @@ Operators meant for inter-operating between Akka Streams and Actors:
* [headOption](Sink/headOption.md) * [headOption](Sink/headOption.md)
* [last](Sink/last.md) * [last](Sink/last.md)
* [lastOption](Sink/lastOption.md) * [lastOption](Sink/lastOption.md)
* [takeLast](Sink/takeLast.md)
* [seq](Sink/seq.md) * [seq](Sink/seq.md)
* [asPublisher](Sink/asPublisher.md) * [asPublisher](Sink/asPublisher.md)
* [ignore](Sink/ignore.md) * [ignore](Sink/ignore.md)

View file

@ -0,0 +1,57 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit.{ StreamSpec, TestPublisher }
import akka.stream.{ AbruptTerminationException, ActorMaterializer, ActorMaterializerSettings }
import scala.collection.immutable
import scala.concurrent.{ Await, Future }
class TakeLastSinkSpec extends StreamSpec {
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
implicit val mat = ActorMaterializer(settings)
"Sink.takeLast" must {
"return the last 3 elements" in {
val input = 1 to 6
val future: Future[immutable.Seq[Int]] = Source(input).runWith(Sink.takeLast(3))
val result: immutable.Seq[Int] = Await.result(future, remainingOrDefault)
result should be(Seq(4, 5, 6))
}
"return the number of elements taken when the stream completes" in {
val input = 1 to 4
val future: Future[immutable.Seq[Int]] = Source(input).runWith(Sink.takeLast(5))
val result: immutable.Seq[Int] = Await.result(future, remainingOrDefault)
result should be(Seq(1, 2, 3, 4))
}
"fail future when stream abruptly terminated" in {
val mat = ActorMaterializer()
val probe = TestPublisher.probe()
val future: Future[immutable.Seq[Int]] =
Source.fromPublisher(probe).runWith(Sink.takeLast(2))(mat)
mat.shutdown()
future.failed.futureValue shouldBe an[AbruptTerminationException]
}
"yield empty seq for empty stream" in {
val future: Future[immutable.Seq[Int]] = Source.empty[Int].runWith(Sink.takeLast(3))
val result: immutable.Seq[Int] = Await.result(future, remainingOrDefault)
result should be(Seq.empty)
}
"return the last element" in {
val input = 1 to 4
val future: Future[immutable.Seq[Int]] = Source(input).runWith(Sink.takeLast(1))
val result: immutable.Seq[Int] = Await.result(future, remainingOrDefault)
result should be(Seq(4))
}
}
}

View file

@ -23,7 +23,7 @@ import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
import scala.collection.generic.CanBuildFrom import scala.collection.generic.CanBuildFrom
import scala.collection.immutable import scala.collection.{ immutable, mutable }
import scala.compat.java8.FutureConverters._ import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._ import scala.compat.java8.OptionConverters._
import scala.concurrent.{ Future, Promise } import scala.concurrent.{ Future, Promise }
@ -216,6 +216,52 @@ import scala.util.{ Failure, Success, Try }
override def toString: String = "LastOptionStage" override def toString: String = "LastOptionStage"
} }
/**
* INTERNAL API
*/
@InternalApi private[akka] final class TakeLastStage[T](n: Int) extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] {
require(n > 0, "n must be greater than 0")
val in: Inlet[T] = Inlet("takeLastStage.in")
override val shape: SinkShape[T] = SinkShape.of(in)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val p: Promise[immutable.Seq[T]] = Promise()
(new GraphStageLogic(shape) with InHandler {
private[this] val buffer = mutable.Queue.empty[T]
private[this] var count = 0
override def preStart(): Unit = pull(in)
override def onPush(): Unit = {
buffer.enqueue(grab(in))
if (count < n)
count += 1
else
buffer.dequeue()
pull(in)
}
override def onUpstreamFinish(): Unit = {
val elements = buffer.result().toList
buffer.clear()
p.trySuccess(elements)
completeStage()
}
override def onUpstreamFailure(ex: Throwable): Unit = {
p.tryFailure(ex)
failStage(ex)
}
setHandler(in, this)
}, p.future)
}
override def toString: String = "TakeLastStage"
}
/** /**
* INTERNAL API * INTERNAL API
*/ */

View file

@ -124,6 +124,7 @@ import akka.stream._
val headOptionSink = name("headOptionSink") and inputBufferOne val headOptionSink = name("headOptionSink") and inputBufferOne
val lastSink = name("lastSink") val lastSink = name("lastSink")
val lastOptionSink = name("lastOptionSink") val lastOptionSink = name("lastOptionSink")
val takeLastSink = name("takeLastSink")
val seqSink = name("seqSink") val seqSink = name("seqSink")
val publisherSink = name("publisherSink") val publisherSink = name("publisherSink")
val fanoutPublisherSink = name("fanoutPublisherSink") val fanoutPublisherSink = name("fanoutPublisherSink")

View file

@ -147,7 +147,7 @@ object Sink {
* If the stream completes before signaling at least a single element, the CompletionStage will be failed with a [[NoSuchElementException]]. * If the stream completes before signaling at least a single element, the CompletionStage will be failed with a [[NoSuchElementException]].
* If the stream signals an error errors before signaling at least a single element, the CompletionStage will be failed with the streams exception. * If the stream signals an error errors before signaling at least a single element, the CompletionStage will be failed with the streams exception.
* *
* See also [[lastOption]]. * See also [[lastOption]], [[takeLast]].
*/ */
def last[In](): Sink[In, CompletionStage[In]] = def last[In](): Sink[In, CompletionStage[In]] =
new Sink(scaladsl.Sink.last[In].toCompletionStage()) new Sink(scaladsl.Sink.last[In].toCompletionStage())
@ -157,12 +157,24 @@ object Sink {
* If the stream completes before signaling at least a single element, the value of the CompletionStage will be an empty [[java.util.Optional]]. * If the stream completes before signaling at least a single element, the value of the CompletionStage will be an empty [[java.util.Optional]].
* If the stream signals an error errors before signaling at least a single element, the CompletionStage will be failed with the streams exception. * If the stream signals an error errors before signaling at least a single element, the CompletionStage will be failed with the streams exception.
* *
* See also [[head]]. * See also [[head]], [[takeLast]].
*/ */
def lastOption[In](): Sink[In, CompletionStage[Optional[In]]] = def lastOption[In](): Sink[In, CompletionStage[Optional[In]]] =
new Sink(scaladsl.Sink.lastOption[In].mapMaterializedValue( new Sink(scaladsl.Sink.lastOption[In].mapMaterializedValue(
_.map(_.asJava)(ExecutionContexts.sameThreadExecutionContext).toJava)) _.map(_.asJava)(ExecutionContexts.sameThreadExecutionContext).toJava))
/**
* A `Sink` that materializes into a a `CompletionStage` of `List<In>` containing the last `n` collected elements.
* If the stream completes before signaling at least n elements, the CompletionStage will complete with the number
* of elements taken at that point.
* If the stream never completes the `CompletionStage` will never complete.
* If there is a failure signaled in the stream the `CompletionStage` will be completed with failure.
*/
def takeLast[In](n: Int): Sink[In, CompletionStage[java.util.List[In]]] = {
import scala.collection.JavaConverters._
new Sink(scaladsl.Sink.takeLast[In](n).mapMaterializedValue(fut fut.map(sq sq.asJava)(ExecutionContexts.sameThreadExecutionContext).toJava))
}
/** /**
* A `Sink` that keeps on collecting incoming elements until upstream terminates. * A `Sink` that keeps on collecting incoming elements until upstream terminates.
* As upstream may be unbounded, `Flow[T].take` or the stricter `Flow[T].limit` (and their variants) * As upstream may be unbounded, `Flow[T].take` or the stricter `Flow[T].limit` (and their variants)

View file

@ -188,7 +188,7 @@ object Sink {
* If the stream completes before signaling at least a single element, the Future will be failed with a [[NoSuchElementException]]. * If the stream completes before signaling at least a single element, the Future will be failed with a [[NoSuchElementException]].
* If the stream signals an error, the Future will be failed with the stream's exception. * If the stream signals an error, the Future will be failed with the stream's exception.
* *
* See also [[lastOption]]. * See also [[lastOption]], [[takeLast]].
*/ */
def last[T]: Sink[T, Future[T]] = Sink.fromGraph(new LastOptionStage[T]).withAttributes(DefaultAttributes.lastSink) def last[T]: Sink[T, Future[T]] = Sink.fromGraph(new LastOptionStage[T]).withAttributes(DefaultAttributes.lastSink)
.mapMaterializedValue(e e.map(_.getOrElse(throw new NoSuchElementException("last of empty stream")))(ExecutionContexts.sameThreadExecutionContext)) .mapMaterializedValue(e e.map(_.getOrElse(throw new NoSuchElementException("last of empty stream")))(ExecutionContexts.sameThreadExecutionContext))
@ -198,10 +198,23 @@ object Sink {
* If the stream completes before signaling at least a single element, the value of the Future will be [[None]]. * If the stream completes before signaling at least a single element, the value of the Future will be [[None]].
* If the stream signals an error, the Future will be failed with the stream's exception. * If the stream signals an error, the Future will be failed with the stream's exception.
* *
* See also [[last]]. * See also [[last]], [[takeLast]].
*/ */
def lastOption[T]: Sink[T, Future[Option[T]]] = Sink.fromGraph(new LastOptionStage[T]).withAttributes(DefaultAttributes.lastOptionSink) def lastOption[T]: Sink[T, Future[Option[T]]] = Sink.fromGraph(new LastOptionStage[T]).withAttributes(DefaultAttributes.lastOptionSink)
/**
* A `Sink` that materializes into a a `Future` of `immutable.Seq[T]` containing the last `n` collected elements.
* If the stream completes before signaling at least n elements, the Future will complete with the number
* of elements taken at that point.
* If the stream never completes, the `Future` will never complete.
* If there is a failure signaled in the stream the `Future` will be completed with failure.
*/
def takeLast[T](n: Int): Sink[T, Future[immutable.Seq[T]]] =
if (n == 1)
lastOption.mapMaterializedValue(fut fut.map(_.fold(immutable.Seq.empty[T])(m immutable.Seq(m)))(ExecutionContexts.sameThreadExecutionContext))
else
Sink.fromGraph(new TakeLastStage[T](n)).withAttributes(DefaultAttributes.takeLastSink)
/** /**
* A `Sink` that keeps on collecting incoming elements until upstream terminates. * A `Sink` that keeps on collecting incoming elements until upstream terminates.
* As upstream may be unbounded, `Flow[T].take` or the stricter `Flow[T].limit` (and their variants) * As upstream may be unbounded, `Flow[T].take` or the stricter `Flow[T].limit` (and their variants)
@ -220,7 +233,7 @@ object Sink {
* may be used to ensure boundedness. * may be used to ensure boundedness.
* Materializes into a `Future` of `That[T]` containing all the collected elements. * Materializes into a `Future` of `That[T]` containing all the collected elements.
* `That[T]` is limited to the limitations of the CanBuildFrom associated with it. For example, `Seq` is limited to * `That[T]` is limited to the limitations of the CanBuildFrom associated with it. For example, `Seq` is limited to
* `Int.MaxValue` elements. See [The Architecture of Scala Collectionss](https://docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html) for more info. * `Int.MaxValue` elements. See [The Architecture of Scala Collections](https://docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html) for more info.
* This Sink will cancel the stream after having received that many elements. * This Sink will cancel the stream after having received that many elements.
* *
* See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]