From aadaf15b894aa711bb2542338727c33438f976db Mon Sep 17 00:00:00 2001 From: lolski Date: Thu, 19 Nov 2015 00:11:07 +0800 Subject: [PATCH] #18021 Sink.seq and FlowOps.limit and .limitWeighted --- .../akka/stream/scaladsl/FlowLimitSpec.scala | 62 ++++++++++++++++++ .../scaladsl/FlowLimitWeightedSpec.scala | 63 +++++++++++++++++++ .../akka/stream/scaladsl/SeqSinkSpec.scala | 30 +++++++++ .../stream/StreamLimitReachedException.scala | 3 + .../main/scala/akka/stream/impl/Stages.scala | 6 ++ .../scala/akka/stream/impl/fusing/Ops.scala | 14 +++++ .../main/scala/akka/stream/javadsl/Flow.scala | 57 +++++++++++++++++ .../main/scala/akka/stream/javadsl/Sink.scala | 13 ++++ .../scala/akka/stream/javadsl/Source.scala | 51 +++++++++++++++ .../scala/akka/stream/javadsl/SubFlow.scala | 52 +++++++++++++++ .../scala/akka/stream/javadsl/SubSource.scala | 52 +++++++++++++++ .../scala/akka/stream/scaladsl/Flow.scala | 55 ++++++++++++++++ .../scala/akka/stream/scaladsl/Sink.scala | 14 +++++ 13 files changed, 472 insertions(+) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLimitSpec.scala create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLimitWeightedSpec.scala create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/SeqSinkSpec.scala create mode 100644 akka-stream/src/main/scala/akka/stream/StreamLimitReachedException.scala diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLimitSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLimitSpec.scala new file mode 100644 index 0000000000..4c6279e661 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLimitSpec.scala @@ -0,0 +1,62 @@ +package akka.stream.scaladsl + +import akka.stream.{ StreamLimitReachedException, ActorMaterializer, ActorMaterializerSettings } +import akka.stream.testkit.AkkaSpec +import scala.concurrent.Await +import scala.concurrent.duration._ + +class FlowLimitSpec extends AkkaSpec { + + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + + implicit val mat = ActorMaterializer(settings) + + "Limit" must { + "produce empty sequence when source is empty and n = 0" in { + val input = Range(0, 0, 1) + val n = input.length + val future = Source(input).limit(n).grouped(Integer.MAX_VALUE).runWith(Sink.headOption) + val result = Await.result(future, 300.millis) + result should be(None) + } + + "produce output that is identical to the input when n = input.length" in { + val input = (1 to 6) + val n = input.length + val future = Source(input).limit(n).grouped(Integer.MAX_VALUE).runWith(Sink.head) + val result = Await.result(future, 300.millis) + result should be(input.toSeq) + } + + "produce output that is identical to the input when n > input.length" in { + val input = (1 to 6) + val n = input.length + 2 // n > input.length + val future = Source(input).limit(n).grouped(Integer.MAX_VALUE).runWith(Sink.head) + val result = Await.result(future, 300.millis) + result should be(input.toSeq) + } + + "produce n messages before throwing a StreamLimitReachedException when n < input.size" in { + // TODO: check if it actually produces n messages + val input = (1 to 6) + val n = input.length - 2 // n < input.length + + val future = Source(input).limit(n).grouped(Integer.MAX_VALUE).runWith(Sink.head) + + a[StreamLimitReachedException] shouldBe thrownBy { + Await.result(future, 300.millis) + } + } + + "throw a StreamLimitReachedException when n < 0" in { + val input = (1 to 6) + val n = -1 + + val future = Source(input).limit(n).grouped(Integer.MAX_VALUE).runWith(Sink.head) + a[StreamLimitReachedException] shouldBe thrownBy { + Await.result(future, 300.millis) + } + } + } +} \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLimitWeightedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLimitWeightedSpec.scala new file mode 100644 index 0000000000..5565ca0699 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLimitWeightedSpec.scala @@ -0,0 +1,63 @@ +package akka.stream.scaladsl + +import akka.stream.{ StreamLimitReachedException, ActorMaterializer, ActorMaterializerSettings } +import akka.stream.testkit.AkkaSpec +import scala.concurrent.Await +import scala.concurrent.duration._ + +class FlowLimitWeightedSpec extends AkkaSpec { + + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + + implicit val mat = ActorMaterializer(settings) + + "Limit" must { + "produce empty sequence regardless of cost when source is empty and n = 0" in { + val input = Range(0, 0, 1) + val n = input.length + def costFn(e: Int): Long = 999999L // set to an arbitrarily big value + val future = Source(input).limitWeighted(n)(costFn).grouped(Integer.MAX_VALUE).runWith(Sink.headOption) + val result = Await.result(future, 300.millis) + result should be(None) + } + + "always exhaust a source regardless of n (as long as n > 0) if cost is 0" in { + val input = (1 to 15) + def costFn(e: Int): Long = 0L + val n = 1 // must not matter since costFn always evaluates to 0 + val future = Source(input).limitWeighted(n)(costFn).grouped(Integer.MAX_VALUE).runWith(Sink.head) + val result = Await.result(future, 300.millis) + result should be(input.toSeq) + } + + "exhaust source if n equals to input length and cost is 1" in { + val input = (1 to 16) + def costFn(e: Int): Long = 1L + val n = input.length + val future = Source(input).limitWeighted(n)(costFn).grouped(Integer.MAX_VALUE).runWith(Sink.head) + val result = Await.result(future, 300.millis) + result should be(input.toSeq) + } + + "exhaust a source if n >= accumulated cost" in { + val input = List("this", "is", "some", "string") + def costFn(e: String): Long = e.length + val n = input.flatten.length + val future = Source(input).limitWeighted(n)(costFn).grouped(Integer.MAX_VALUE).runWith(Sink.head) + val result = Await.result(future, 300.millis) + result should be(input.toSeq) + } + + "throw a StreamLimitReachedException when n < accumulated cost" in { + val input = List("this", "is", "some", "string") + def costFn(e: String): Long = e.length + val n = input.flatten.length - 1 + val future = Source(input).limitWeighted(n)(costFn).grouped(Integer.MAX_VALUE).runWith(Sink.head) + + a[StreamLimitReachedException] shouldBe thrownBy { + Await.result(future, 300.millis) + } + } + } +} \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SeqSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SeqSinkSpec.scala new file mode 100644 index 0000000000..7d8fe9863b --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SeqSinkSpec.scala @@ -0,0 +1,30 @@ +package akka.stream.scaladsl + +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream.testkit.AkkaSpec +import scala.concurrent.Await +import scala.concurrent.duration._ + +class SeqSinkSpec extends AkkaSpec { + + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + + implicit val mat = ActorMaterializer(settings) + + "Sink.toSeq" must { + "return a Seq[T] from a Source" in { + val input = (1 to 6) + val future = Source(input).runWith(Sink.seq) + val result = Await.result(future, 300.millis) + result should be(input.toSeq) + } + + "return an empty Seq[T] from an empty Source" in { + val input: Seq[Int] = Seq.empty + val future = Source.fromIterator(() ⇒ input.iterator).runWith(Sink.seq) + val result = Await.result(future, 300.millis) + result should be(Seq.empty: Seq[Int]) + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/StreamLimitReachedException.scala b/akka-stream/src/main/scala/akka/stream/StreamLimitReachedException.scala new file mode 100644 index 0000000000..abffbb5b05 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/StreamLimitReachedException.scala @@ -0,0 +1,3 @@ +package akka.stream + +class StreamLimitReachedException(val n: Long) extends RuntimeException(s"limit of $n reached") diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 8cdea06926..b513a40316 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -34,6 +34,8 @@ private[stream] object Stages { val mapAsync = name("mapAsync") val mapAsyncUnordered = name("mapAsyncUnordered") val grouped = name("grouped") + val limit = name("limit") + val limitWeighted = name("limitWeighted") val sliding = name("sliding") val take = name("take") val drop = name("drop") @@ -154,6 +156,10 @@ private[stream] object Stages { override def create(attr: Attributes): Stage[T, immutable.Seq[T]] = fusing.Grouped(n) } + final case class LimitWeighted[T](max: Long, weightFn: T ⇒ Long, attributes: Attributes = limitWeighted) extends SymbolicStage[T, T] { + override def create(attr: Attributes): Stage[T, T] = fusing.LimitWeighted(max, weightFn) + } + final case class Sliding[T](n: Int, step: Int, attributes: Attributes = sliding) extends SymbolicStage[T, immutable.Seq[T]] { require(n > 0, "n must be greater than 0") require(step > 0, "step must be greater than 0") diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index b1f14bd228..a52a59ef6d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -315,6 +315,20 @@ private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immut else ctx.absorbTermination() } +/** + * INTERNAL API + */ + +private[akka] final case class LimitWeighted[T](n: Long, costFn: T ⇒ Long) extends PushStage[T, T] { + private var left = n + + override def onPush(elem: T, ctx: Context[T]): SyncDirective = { + left -= costFn(elem) + if (left >= 0) ctx.push(elem) + else ctx.fail(new StreamLimitReachedException(n)) + } +} + /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 1cef680167..9930543f8c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -403,6 +403,57 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def grouped(n: Int): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] = new Flow(delegate.grouped(n).map(_.asJava)) // TODO optimize to one step + /** + * Ensure stream boundedness by limiting the number of elements from upstream. + * If the number of incoming elements exceeds max, it will signal + * upstream failure `StreamLimitException` downstream. + * + * Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * The stream will be completed without producing any elements if `n` is zero + * or negative. + * + * '''Emits when''' the specified number of elements to take has not yet been reached + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the defined number of elements has been taken or upstream completes + * + * '''Cancels when''' the defined number of elements has been taken or downstream cancels + * + * See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] + */ + def limit(n: Long): javadsl.Flow[In, Out, Mat] = new Flow(delegate.limit(n)) + + /** + * Ensure stream boundedness by evaluating the cost of incoming elements + * using a cost function. Exactly how many elements will be allowed to travel downstream depends on the + * evaluated cost of each element. If the accumulated cost exceeds max, it will signal + * upstream failure `StreamLimitException` downstream. + * + * Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * The stream will be completed without producing any elements if `n` is zero + * or negative. + * + * '''Emits when''' the specified number of elements to take has not yet been reached + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the defined number of elements has been taken or upstream completes + * + * '''Cancels when''' the defined number of elements has been taken or downstream cancels + * + * See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] + */ + def limitWeighted(n: Long)(costFn: function.Function[Out, Long]): javadsl.Flow[In, Out, Mat] = { + new Flow(delegate.limitWeighted(n)(costFn.apply)) + } + /** * Apply a sliding window over the stream and return the windows as groups of elements, with the last group * possibly smaller than requested due to end-of-stream. @@ -615,6 +666,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * '''Completes when''' predicate returned false or upstream completes * * '''Cancels when''' predicate returned false or downstream cancels + * + * See also [[Flow.limit]], [[Flow.limitWeighted]] */ def takeWhile(p: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.takeWhile(p.test)) @@ -664,6 +717,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * '''Completes when''' the defined number of elements has been taken or upstream completes * * '''Cancels when''' the defined number of elements has been taken or downstream cancels + * + * See also [[Flow.limit]], [[Flow.limitWeighted]] */ def take(n: Long): javadsl.Flow[In, Out, Mat] = new Flow(delegate.take(n)) @@ -684,6 +739,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * '''Completes when''' upstream completes or timer fires * * '''Cancels when''' downstream cancels or timer fires + * + * See also [[Flow.limit]], [[Flow.limitWeighted]] */ def takeWithin(d: FiniteDuration): javadsl.Flow[In, Out, Mat] = new Flow(delegate.takeWithin(d)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 10b403d381..ef13ce7f40 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -135,6 +135,19 @@ object Sink { new Sink(scaladsl.Sink.lastOption[In].mapMaterializedValue( _.map(akka.japi.Option.fromScalaOption)(ExecutionContexts.sameThreadExecutionContext))) + /** + * A `Sink` that keeps on collecting incoming elements until upstream terminates. + * As upstream may be unbounded, `Flow[T].take` or the stricter ``Flow[T].limit` (and their variants) + * may be used to ensure boundedness. + * Materializes into a Future` of `Seq[T]` containing all the collected elements. + * + * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] + */ + def seq[In]: Sink[In, Future[java.util.List[In]]] = { + import scala.collection.JavaConverters._ + new Sink(scaladsl.Sink.seq[In].mapMaterializedValue(fut ⇒ fut.map(sq ⇒ sq.asJava)(ExecutionContexts.sameThreadExecutionContext))) + } + /** * Sends the elements of the stream to the given `ActorRef`. * If the target actor terminates the stream will be canceled. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 61f2269b42..9f5e2fa465 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -848,6 +848,57 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def grouped(n: Int): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = new Source(delegate.grouped(n).map(_.asJava)) + /** + * Ensure stream boundedness by limiting the number of elements from upstream. + * If the number of incoming elements exceeds max, it will signal + * upstream failure `StreamLimitException` downstream. + * + * Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * The stream will be completed without producing any elements if `n` is zero + * or negative. + * + * '''Emits when''' the specified number of elements to take has not yet been reached + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the defined number of elements has been taken or upstream completes + * + * '''Cancels when''' the defined number of elements has been taken or downstream cancels + * + * See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] + */ + def limit(n: Int): javadsl.Source[Out, Mat] = new Source(delegate.limit(n)) + + /** + * Ensure stream boundedness by evaluating the cost of incoming elements + * using a cost function. Exactly how many elements will be allowed to travel downstream depends on the + * evaluated cost of each element. If the accumulated cost exceeds max, it will signal + * upstream failure `StreamLimitException` downstream. + * + * Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * The stream will be completed without producing any elements if `n` is zero + * or negative. + * + * '''Emits when''' the specified number of elements to take has not yet been reached + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the defined number of elements has been taken or upstream completes + * + * '''Cancels when''' the defined number of elements has been taken or downstream cancels + * + * See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] + */ + def limitWeighted(n: Long)(costFn: function.Function[Out, Long]): javadsl.Source[Out, Mat] = { + new Source(delegate.limitWeighted(n)(costFn.apply)) + } + /** * Apply a sliding window over the stream and return the windows as groups of elements, with the last group * possibly smaller than requested due to end-of-stream. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index eb6d3ad1ba..45b04fdbb7 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -265,6 +265,57 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def grouped(n: Int): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = new SubFlow(delegate.grouped(n).map(_.asJava)) // TODO optimize to one step + /** + * Ensure stream boundedness by limiting the number of elements from upstream. + * If the number of incoming elements exceeds max, it will signal + * upstream failure `StreamLimitException` downstream. + * + * Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * The stream will be completed without producing any elements if `n` is zero + * or negative. + * + * '''Emits when''' the specified number of elements to take has not yet been reached + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the defined number of elements has been taken or upstream completes + * + * '''Cancels when''' the defined number of elements has been taken or downstream cancels + * + * See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] + */ + def limit(n: Long): javadsl.SubFlow[In, Out, Mat] = new SubFlow(delegate.limit(n)) + + /** + * Ensure stream boundedness by evaluating the cost of incoming elements + * using a cost function. Exactly how many elements will be allowed to travel downstream depends on the + * evaluated cost of each element. If the accumulated cost exceeds max, it will signal + * upstream failure `StreamLimitException` downstream. + * + * Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * The stream will be completed without producing any elements if `n` is zero + * or negative. + * + * '''Emits when''' the specified number of elements to take has not yet been reached + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the defined number of elements has been taken or upstream completes + * + * '''Cancels when''' the defined number of elements has been taken or downstream cancels + * + * See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] + */ + def limitWeighted(n: Long)(costFn: function.Function[Out, Long]): javadsl.SubFlow[In, Out, Mat] = { + new SubFlow(delegate.limitWeighted(n)(costFn.apply)) + } + /** * Apply a sliding window over the stream and return the windows as groups of elements, with the last group * possibly smaller than requested due to end-of-stream. @@ -280,6 +331,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * * '''Cancels when''' downstream cancels */ + def sliding(n: Int, step: Int = 1): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = new SubFlow(delegate.sliding(n, step).map(_.asJava)) // TODO optimize to one step diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 6616918796..11ec39881a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -278,6 +278,58 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * * '''Cancels when''' downstream cancels */ + + /** + * Ensure stream boundedness by limiting the number of elements from upstream. + * If the number of incoming elements exceeds max, it will signal + * upstream failure `StreamLimitException` downstream. + * + * Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * The stream will be completed without producing any elements if `n` is zero + * or negative. + * + * '''Emits when''' the specified number of elements to take has not yet been reached + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the defined number of elements has been taken or upstream completes + * + * '''Cancels when''' the defined number of elements has been taken or downstream cancels + * + * See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] + */ + def limit(n: Int): javadsl.SubSource[Out, Mat] = new SubSource(delegate.limit(n)) + + /** + * Ensure stream boundedness by evaluating the cost of incoming elements + * using a cost function. Exactly how many elements will be allowed to travel downstream depends on the + * evaluated cost of each element. If the accumulated cost exceeds max, it will signal + * upstream failure `StreamLimitException` downstream. + * + * Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * The stream will be completed without producing any elements if `n` is zero + * or negative. + * + * '''Emits when''' the specified number of elements to take has not yet been reached + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the defined number of elements has been taken or upstream completes + * + * '''Cancels when''' the defined number of elements has been taken or downstream cancels + * + * See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] + */ + def limitWeighted(n: Long)(costFn: function.Function[Out, Long]): javadsl.SubSource[Out, Mat] = { + new SubSource(delegate.limitWeighted(n)(costFn.apply)) + } + def sliding(n: Int, step: Int = 1): SubSource[java.util.List[Out @uncheckedVariance], Mat] = new SubSource(delegate.sliding(n, step).map(_.asJava)) // TODO optimize to one step diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 3af32daa15..ff6cb4fec6 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -525,6 +525,8 @@ trait FlowOps[+Out, +Mat] { * '''Completes when''' predicate returned false or upstream completes * * '''Cancels when''' predicate returned false or downstream cancels + * + * See also [[FlowOps.limit]], [[FlowOps.limitWeighted]] */ def takeWhile(p: Out ⇒ Boolean): Repr[Out] = andThen(TakeWhile(p)) @@ -573,6 +575,55 @@ trait FlowOps[+Out, +Mat] { */ def grouped(n: Int): Repr[immutable.Seq[Out]] = andThen(Grouped(n)) + /** + * Ensure stream boundedness by limiting the number of elements from upstream. + * If the number of incoming elements exceeds max, it will signal + * upstream failure `StreamLimitException` downstream. + * + * Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * The stream will be completed without producing any elements if `n` is zero + * or negative. + * + * '''Emits when''' the specified number of elements to take has not yet been reached + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the defined number of elements has been taken or upstream completes + * + * '''Cancels when''' the defined number of elements has been taken or downstream cancels + * + * See also [[FlowOps.take]], [[FlowOps.takeWithin]], [[FlowOps.takeWhile]] + */ + def limit(max: Long): Repr[Out] = limitWeighted(max)(_ ⇒ 1) + + /** + * Ensure stream boundedness by evaluating the cost of incoming elements + * using a cost function. Exactly how many elements will be allowed to travel downstream depends on the + * evaluated cost of each element. If the accumulated cost exceeds max, it will signal + * upstream failure `StreamLimitException` downstream. + * + * Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * The stream will be completed without producing any elements if `n` is zero + * or negative. + * + * '''Emits when''' the specified number of elements to take has not yet been reached + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' the defined number of elements has been taken or upstream completes + * + * '''Cancels when''' the defined number of elements has been taken or downstream cancels + * + * See also [[FlowOps.take]], [[FlowOps.takeWithin]], [[FlowOps.takeWhile]] + */ + def limitWeighted[T](max: Long)(costFn: Out ⇒ Long): Repr[Out] = andThen(LimitWeighted(max, costFn)) + /** * Apply a sliding window over the stream and return the windows as groups of elements, with the last group * possibly smaller than requested due to end-of-stream. @@ -790,6 +841,8 @@ trait FlowOps[+Out, +Mat] { * '''Completes when''' the defined number of elements has been taken or upstream completes * * '''Cancels when''' the defined number of elements has been taken or downstream cancels + * + * See also [[FlowOps.limit]], [[FlowOps.limitWeighted]] */ def take(n: Long): Repr[Out] = andThen(Take(n)) @@ -830,6 +883,8 @@ trait FlowOps[+Out, +Mat] { * * @param seed Provides the first state for a conflated value using the first unconsumed element as a start * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + * + * See also [[FlowOps.limit]], [[FlowOps.limitWeighted]] */ def conflate[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = andThen(Conflate(seed, aggregate)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 1f55b3c15e..bc02859882 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -117,6 +117,20 @@ object Sink { */ def lastOption[T]: Sink[T, Future[Option[T]]] = Sink.fromGraph(new LastOptionStage[T]).withAttributes(DefaultAttributes.lastOptionSink) + /** + * A `Sink` that keeps on collecting incoming elements until upstream terminates. + * As upstream may be unbounded, `Flow[T].take` or the stricter ``Flow[T].limit` (and their variants) + * may be used to ensure boundedness. + * Materializes into a Future` of `Seq[T]` containing all the collected elements. + * + * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] + */ + def seq[T]: Sink[T, Future[Seq[T]]] = { + Flow[T].grouped(Integer.MAX_VALUE).toMat(Sink.headOption)(Keep.right) mapMaterializedValue { e ⇒ + e.map(_.getOrElse(Seq.empty[T]))(ExecutionContexts.sameThreadExecutionContext) + } + } + /** * A `Sink` that materializes into a [[org.reactivestreams.Publisher]]. *