#18021 Sink.seq and FlowOps.limit and .limitWeighted
This commit is contained in:
parent
52655f2836
commit
aadaf15b89
13 changed files with 472 additions and 0 deletions
|
|
@ -0,0 +1,62 @@
|
|||
package akka.stream.scaladsl
|
||||
|
||||
import akka.stream.{ StreamLimitReachedException, ActorMaterializer, ActorMaterializerSettings }
|
||||
import akka.stream.testkit.AkkaSpec
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class FlowLimitSpec extends AkkaSpec {
|
||||
|
||||
val settings = ActorMaterializerSettings(system)
|
||||
.withInputBuffer(initialSize = 2, maxSize = 16)
|
||||
|
||||
implicit val mat = ActorMaterializer(settings)
|
||||
|
||||
"Limit" must {
|
||||
"produce empty sequence when source is empty and n = 0" in {
|
||||
val input = Range(0, 0, 1)
|
||||
val n = input.length
|
||||
val future = Source(input).limit(n).grouped(Integer.MAX_VALUE).runWith(Sink.headOption)
|
||||
val result = Await.result(future, 300.millis)
|
||||
result should be(None)
|
||||
}
|
||||
|
||||
"produce output that is identical to the input when n = input.length" in {
|
||||
val input = (1 to 6)
|
||||
val n = input.length
|
||||
val future = Source(input).limit(n).grouped(Integer.MAX_VALUE).runWith(Sink.head)
|
||||
val result = Await.result(future, 300.millis)
|
||||
result should be(input.toSeq)
|
||||
}
|
||||
|
||||
"produce output that is identical to the input when n > input.length" in {
|
||||
val input = (1 to 6)
|
||||
val n = input.length + 2 // n > input.length
|
||||
val future = Source(input).limit(n).grouped(Integer.MAX_VALUE).runWith(Sink.head)
|
||||
val result = Await.result(future, 300.millis)
|
||||
result should be(input.toSeq)
|
||||
}
|
||||
|
||||
"produce n messages before throwing a StreamLimitReachedException when n < input.size" in {
|
||||
// TODO: check if it actually produces n messages
|
||||
val input = (1 to 6)
|
||||
val n = input.length - 2 // n < input.length
|
||||
|
||||
val future = Source(input).limit(n).grouped(Integer.MAX_VALUE).runWith(Sink.head)
|
||||
|
||||
a[StreamLimitReachedException] shouldBe thrownBy {
|
||||
Await.result(future, 300.millis)
|
||||
}
|
||||
}
|
||||
|
||||
"throw a StreamLimitReachedException when n < 0" in {
|
||||
val input = (1 to 6)
|
||||
val n = -1
|
||||
|
||||
val future = Source(input).limit(n).grouped(Integer.MAX_VALUE).runWith(Sink.head)
|
||||
a[StreamLimitReachedException] shouldBe thrownBy {
|
||||
Await.result(future, 300.millis)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,63 @@
|
|||
package akka.stream.scaladsl
|
||||
|
||||
import akka.stream.{ StreamLimitReachedException, ActorMaterializer, ActorMaterializerSettings }
|
||||
import akka.stream.testkit.AkkaSpec
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class FlowLimitWeightedSpec extends AkkaSpec {
|
||||
|
||||
val settings = ActorMaterializerSettings(system)
|
||||
.withInputBuffer(initialSize = 2, maxSize = 16)
|
||||
|
||||
implicit val mat = ActorMaterializer(settings)
|
||||
|
||||
"Limit" must {
|
||||
"produce empty sequence regardless of cost when source is empty and n = 0" in {
|
||||
val input = Range(0, 0, 1)
|
||||
val n = input.length
|
||||
def costFn(e: Int): Long = 999999L // set to an arbitrarily big value
|
||||
val future = Source(input).limitWeighted(n)(costFn).grouped(Integer.MAX_VALUE).runWith(Sink.headOption)
|
||||
val result = Await.result(future, 300.millis)
|
||||
result should be(None)
|
||||
}
|
||||
|
||||
"always exhaust a source regardless of n (as long as n > 0) if cost is 0" in {
|
||||
val input = (1 to 15)
|
||||
def costFn(e: Int): Long = 0L
|
||||
val n = 1 // must not matter since costFn always evaluates to 0
|
||||
val future = Source(input).limitWeighted(n)(costFn).grouped(Integer.MAX_VALUE).runWith(Sink.head)
|
||||
val result = Await.result(future, 300.millis)
|
||||
result should be(input.toSeq)
|
||||
}
|
||||
|
||||
"exhaust source if n equals to input length and cost is 1" in {
|
||||
val input = (1 to 16)
|
||||
def costFn(e: Int): Long = 1L
|
||||
val n = input.length
|
||||
val future = Source(input).limitWeighted(n)(costFn).grouped(Integer.MAX_VALUE).runWith(Sink.head)
|
||||
val result = Await.result(future, 300.millis)
|
||||
result should be(input.toSeq)
|
||||
}
|
||||
|
||||
"exhaust a source if n >= accumulated cost" in {
|
||||
val input = List("this", "is", "some", "string")
|
||||
def costFn(e: String): Long = e.length
|
||||
val n = input.flatten.length
|
||||
val future = Source(input).limitWeighted(n)(costFn).grouped(Integer.MAX_VALUE).runWith(Sink.head)
|
||||
val result = Await.result(future, 300.millis)
|
||||
result should be(input.toSeq)
|
||||
}
|
||||
|
||||
"throw a StreamLimitReachedException when n < accumulated cost" in {
|
||||
val input = List("this", "is", "some", "string")
|
||||
def costFn(e: String): Long = e.length
|
||||
val n = input.flatten.length - 1
|
||||
val future = Source(input).limitWeighted(n)(costFn).grouped(Integer.MAX_VALUE).runWith(Sink.head)
|
||||
|
||||
a[StreamLimitReachedException] shouldBe thrownBy {
|
||||
Await.result(future, 300.millis)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,30 @@
|
|||
package akka.stream.scaladsl
|
||||
|
||||
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
|
||||
import akka.stream.testkit.AkkaSpec
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class SeqSinkSpec extends AkkaSpec {
|
||||
|
||||
val settings = ActorMaterializerSettings(system)
|
||||
.withInputBuffer(initialSize = 2, maxSize = 16)
|
||||
|
||||
implicit val mat = ActorMaterializer(settings)
|
||||
|
||||
"Sink.toSeq" must {
|
||||
"return a Seq[T] from a Source" in {
|
||||
val input = (1 to 6)
|
||||
val future = Source(input).runWith(Sink.seq)
|
||||
val result = Await.result(future, 300.millis)
|
||||
result should be(input.toSeq)
|
||||
}
|
||||
|
||||
"return an empty Seq[T] from an empty Source" in {
|
||||
val input: Seq[Int] = Seq.empty
|
||||
val future = Source.fromIterator(() ⇒ input.iterator).runWith(Sink.seq)
|
||||
val result = Await.result(future, 300.millis)
|
||||
result should be(Seq.empty: Seq[Int])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
package akka.stream
|
||||
|
||||
class StreamLimitReachedException(val n: Long) extends RuntimeException(s"limit of $n reached")
|
||||
|
|
@ -34,6 +34,8 @@ private[stream] object Stages {
|
|||
val mapAsync = name("mapAsync")
|
||||
val mapAsyncUnordered = name("mapAsyncUnordered")
|
||||
val grouped = name("grouped")
|
||||
val limit = name("limit")
|
||||
val limitWeighted = name("limitWeighted")
|
||||
val sliding = name("sliding")
|
||||
val take = name("take")
|
||||
val drop = name("drop")
|
||||
|
|
@ -154,6 +156,10 @@ private[stream] object Stages {
|
|||
override def create(attr: Attributes): Stage[T, immutable.Seq[T]] = fusing.Grouped(n)
|
||||
}
|
||||
|
||||
final case class LimitWeighted[T](max: Long, weightFn: T ⇒ Long, attributes: Attributes = limitWeighted) extends SymbolicStage[T, T] {
|
||||
override def create(attr: Attributes): Stage[T, T] = fusing.LimitWeighted(max, weightFn)
|
||||
}
|
||||
|
||||
final case class Sliding[T](n: Int, step: Int, attributes: Attributes = sliding) extends SymbolicStage[T, immutable.Seq[T]] {
|
||||
require(n > 0, "n must be greater than 0")
|
||||
require(step > 0, "step must be greater than 0")
|
||||
|
|
|
|||
|
|
@ -315,6 +315,20 @@ private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immut
|
|||
else ctx.absorbTermination()
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
|
||||
private[akka] final case class LimitWeighted[T](n: Long, costFn: T ⇒ Long) extends PushStage[T, T] {
|
||||
private var left = n
|
||||
|
||||
override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
|
||||
left -= costFn(elem)
|
||||
if (left >= 0) ctx.push(elem)
|
||||
else ctx.fail(new StreamLimitReachedException(n))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -403,6 +403,57 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
def grouped(n: Int): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] =
|
||||
new Flow(delegate.grouped(n).map(_.asJava)) // TODO optimize to one step
|
||||
|
||||
/**
|
||||
* Ensure stream boundedness by limiting the number of elements from upstream.
|
||||
* If the number of incoming elements exceeds max, it will signal
|
||||
* upstream failure `StreamLimitException` downstream.
|
||||
*
|
||||
* Due to input buffering some elements may have been
|
||||
* requested from upstream publishers that will then not be processed downstream
|
||||
* of this step.
|
||||
*
|
||||
* The stream will be completed without producing any elements if `n` is zero
|
||||
* or negative.
|
||||
*
|
||||
* '''Emits when''' the specified number of elements to take has not yet been reached
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' the defined number of elements has been taken or upstream completes
|
||||
*
|
||||
* '''Cancels when''' the defined number of elements has been taken or downstream cancels
|
||||
*
|
||||
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
|
||||
*/
|
||||
def limit(n: Long): javadsl.Flow[In, Out, Mat] = new Flow(delegate.limit(n))
|
||||
|
||||
/**
|
||||
* Ensure stream boundedness by evaluating the cost of incoming elements
|
||||
* using a cost function. Exactly how many elements will be allowed to travel downstream depends on the
|
||||
* evaluated cost of each element. If the accumulated cost exceeds max, it will signal
|
||||
* upstream failure `StreamLimitException` downstream.
|
||||
*
|
||||
* Due to input buffering some elements may have been
|
||||
* requested from upstream publishers that will then not be processed downstream
|
||||
* of this step.
|
||||
*
|
||||
* The stream will be completed without producing any elements if `n` is zero
|
||||
* or negative.
|
||||
*
|
||||
* '''Emits when''' the specified number of elements to take has not yet been reached
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' the defined number of elements has been taken or upstream completes
|
||||
*
|
||||
* '''Cancels when''' the defined number of elements has been taken or downstream cancels
|
||||
*
|
||||
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
|
||||
*/
|
||||
def limitWeighted(n: Long)(costFn: function.Function[Out, Long]): javadsl.Flow[In, Out, Mat] = {
|
||||
new Flow(delegate.limitWeighted(n)(costFn.apply))
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply a sliding window over the stream and return the windows as groups of elements, with the last group
|
||||
* possibly smaller than requested due to end-of-stream.
|
||||
|
|
@ -615,6 +666,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
* '''Completes when''' predicate returned false or upstream completes
|
||||
*
|
||||
* '''Cancels when''' predicate returned false or downstream cancels
|
||||
*
|
||||
* See also [[Flow.limit]], [[Flow.limitWeighted]]
|
||||
*/
|
||||
def takeWhile(p: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.takeWhile(p.test))
|
||||
|
||||
|
|
@ -664,6 +717,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
* '''Completes when''' the defined number of elements has been taken or upstream completes
|
||||
*
|
||||
* '''Cancels when''' the defined number of elements has been taken or downstream cancels
|
||||
*
|
||||
* See also [[Flow.limit]], [[Flow.limitWeighted]]
|
||||
*/
|
||||
def take(n: Long): javadsl.Flow[In, Out, Mat] =
|
||||
new Flow(delegate.take(n))
|
||||
|
|
@ -684,6 +739,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
* '''Completes when''' upstream completes or timer fires
|
||||
*
|
||||
* '''Cancels when''' downstream cancels or timer fires
|
||||
*
|
||||
* See also [[Flow.limit]], [[Flow.limitWeighted]]
|
||||
*/
|
||||
def takeWithin(d: FiniteDuration): javadsl.Flow[In, Out, Mat] =
|
||||
new Flow(delegate.takeWithin(d))
|
||||
|
|
|
|||
|
|
@ -135,6 +135,19 @@ object Sink {
|
|||
new Sink(scaladsl.Sink.lastOption[In].mapMaterializedValue(
|
||||
_.map(akka.japi.Option.fromScalaOption)(ExecutionContexts.sameThreadExecutionContext)))
|
||||
|
||||
/**
|
||||
* A `Sink` that keeps on collecting incoming elements until upstream terminates.
|
||||
* As upstream may be unbounded, `Flow[T].take` or the stricter ``Flow[T].limit` (and their variants)
|
||||
* may be used to ensure boundedness.
|
||||
* Materializes into a Future` of `Seq[T]` containing all the collected elements.
|
||||
*
|
||||
* See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
|
||||
*/
|
||||
def seq[In]: Sink[In, Future[java.util.List[In]]] = {
|
||||
import scala.collection.JavaConverters._
|
||||
new Sink(scaladsl.Sink.seq[In].mapMaterializedValue(fut ⇒ fut.map(sq ⇒ sq.asJava)(ExecutionContexts.sameThreadExecutionContext)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends the elements of the stream to the given `ActorRef`.
|
||||
* If the target actor terminates the stream will be canceled.
|
||||
|
|
|
|||
|
|
@ -848,6 +848,57 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
|
|||
def grouped(n: Int): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
|
||||
new Source(delegate.grouped(n).map(_.asJava))
|
||||
|
||||
/**
|
||||
* Ensure stream boundedness by limiting the number of elements from upstream.
|
||||
* If the number of incoming elements exceeds max, it will signal
|
||||
* upstream failure `StreamLimitException` downstream.
|
||||
*
|
||||
* Due to input buffering some elements may have been
|
||||
* requested from upstream publishers that will then not be processed downstream
|
||||
* of this step.
|
||||
*
|
||||
* The stream will be completed without producing any elements if `n` is zero
|
||||
* or negative.
|
||||
*
|
||||
* '''Emits when''' the specified number of elements to take has not yet been reached
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' the defined number of elements has been taken or upstream completes
|
||||
*
|
||||
* '''Cancels when''' the defined number of elements has been taken or downstream cancels
|
||||
*
|
||||
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
|
||||
*/
|
||||
def limit(n: Int): javadsl.Source[Out, Mat] = new Source(delegate.limit(n))
|
||||
|
||||
/**
|
||||
* Ensure stream boundedness by evaluating the cost of incoming elements
|
||||
* using a cost function. Exactly how many elements will be allowed to travel downstream depends on the
|
||||
* evaluated cost of each element. If the accumulated cost exceeds max, it will signal
|
||||
* upstream failure `StreamLimitException` downstream.
|
||||
*
|
||||
* Due to input buffering some elements may have been
|
||||
* requested from upstream publishers that will then not be processed downstream
|
||||
* of this step.
|
||||
*
|
||||
* The stream will be completed without producing any elements if `n` is zero
|
||||
* or negative.
|
||||
*
|
||||
* '''Emits when''' the specified number of elements to take has not yet been reached
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' the defined number of elements has been taken or upstream completes
|
||||
*
|
||||
* '''Cancels when''' the defined number of elements has been taken or downstream cancels
|
||||
*
|
||||
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
|
||||
*/
|
||||
def limitWeighted(n: Long)(costFn: function.Function[Out, Long]): javadsl.Source[Out, Mat] = {
|
||||
new Source(delegate.limitWeighted(n)(costFn.apply))
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply a sliding window over the stream and return the windows as groups of elements, with the last group
|
||||
* possibly smaller than requested due to end-of-stream.
|
||||
|
|
|
|||
|
|
@ -265,6 +265,57 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
|
|||
def grouped(n: Int): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
|
||||
new SubFlow(delegate.grouped(n).map(_.asJava)) // TODO optimize to one step
|
||||
|
||||
/**
|
||||
* Ensure stream boundedness by limiting the number of elements from upstream.
|
||||
* If the number of incoming elements exceeds max, it will signal
|
||||
* upstream failure `StreamLimitException` downstream.
|
||||
*
|
||||
* Due to input buffering some elements may have been
|
||||
* requested from upstream publishers that will then not be processed downstream
|
||||
* of this step.
|
||||
*
|
||||
* The stream will be completed without producing any elements if `n` is zero
|
||||
* or negative.
|
||||
*
|
||||
* '''Emits when''' the specified number of elements to take has not yet been reached
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' the defined number of elements has been taken or upstream completes
|
||||
*
|
||||
* '''Cancels when''' the defined number of elements has been taken or downstream cancels
|
||||
*
|
||||
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
|
||||
*/
|
||||
def limit(n: Long): javadsl.SubFlow[In, Out, Mat] = new SubFlow(delegate.limit(n))
|
||||
|
||||
/**
|
||||
* Ensure stream boundedness by evaluating the cost of incoming elements
|
||||
* using a cost function. Exactly how many elements will be allowed to travel downstream depends on the
|
||||
* evaluated cost of each element. If the accumulated cost exceeds max, it will signal
|
||||
* upstream failure `StreamLimitException` downstream.
|
||||
*
|
||||
* Due to input buffering some elements may have been
|
||||
* requested from upstream publishers that will then not be processed downstream
|
||||
* of this step.
|
||||
*
|
||||
* The stream will be completed without producing any elements if `n` is zero
|
||||
* or negative.
|
||||
*
|
||||
* '''Emits when''' the specified number of elements to take has not yet been reached
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' the defined number of elements has been taken or upstream completes
|
||||
*
|
||||
* '''Cancels when''' the defined number of elements has been taken or downstream cancels
|
||||
*
|
||||
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
|
||||
*/
|
||||
def limitWeighted(n: Long)(costFn: function.Function[Out, Long]): javadsl.SubFlow[In, Out, Mat] = {
|
||||
new SubFlow(delegate.limitWeighted(n)(costFn.apply))
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply a sliding window over the stream and return the windows as groups of elements, with the last group
|
||||
* possibly smaller than requested due to end-of-stream.
|
||||
|
|
@ -280,6 +331,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
|
|||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
|
||||
def sliding(n: Int, step: Int = 1): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
|
||||
new SubFlow(delegate.sliding(n, step).map(_.asJava)) // TODO optimize to one step
|
||||
|
||||
|
|
|
|||
|
|
@ -278,6 +278,58 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
|
|||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
|
||||
/**
|
||||
* Ensure stream boundedness by limiting the number of elements from upstream.
|
||||
* If the number of incoming elements exceeds max, it will signal
|
||||
* upstream failure `StreamLimitException` downstream.
|
||||
*
|
||||
* Due to input buffering some elements may have been
|
||||
* requested from upstream publishers that will then not be processed downstream
|
||||
* of this step.
|
||||
*
|
||||
* The stream will be completed without producing any elements if `n` is zero
|
||||
* or negative.
|
||||
*
|
||||
* '''Emits when''' the specified number of elements to take has not yet been reached
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' the defined number of elements has been taken or upstream completes
|
||||
*
|
||||
* '''Cancels when''' the defined number of elements has been taken or downstream cancels
|
||||
*
|
||||
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
|
||||
*/
|
||||
def limit(n: Int): javadsl.SubSource[Out, Mat] = new SubSource(delegate.limit(n))
|
||||
|
||||
/**
|
||||
* Ensure stream boundedness by evaluating the cost of incoming elements
|
||||
* using a cost function. Exactly how many elements will be allowed to travel downstream depends on the
|
||||
* evaluated cost of each element. If the accumulated cost exceeds max, it will signal
|
||||
* upstream failure `StreamLimitException` downstream.
|
||||
*
|
||||
* Due to input buffering some elements may have been
|
||||
* requested from upstream publishers that will then not be processed downstream
|
||||
* of this step.
|
||||
*
|
||||
* The stream will be completed without producing any elements if `n` is zero
|
||||
* or negative.
|
||||
*
|
||||
* '''Emits when''' the specified number of elements to take has not yet been reached
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' the defined number of elements has been taken or upstream completes
|
||||
*
|
||||
* '''Cancels when''' the defined number of elements has been taken or downstream cancels
|
||||
*
|
||||
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
|
||||
*/
|
||||
def limitWeighted(n: Long)(costFn: function.Function[Out, Long]): javadsl.SubSource[Out, Mat] = {
|
||||
new SubSource(delegate.limitWeighted(n)(costFn.apply))
|
||||
}
|
||||
|
||||
def sliding(n: Int, step: Int = 1): SubSource[java.util.List[Out @uncheckedVariance], Mat] =
|
||||
new SubSource(delegate.sliding(n, step).map(_.asJava)) // TODO optimize to one step
|
||||
|
||||
|
|
|
|||
|
|
@ -525,6 +525,8 @@ trait FlowOps[+Out, +Mat] {
|
|||
* '''Completes when''' predicate returned false or upstream completes
|
||||
*
|
||||
* '''Cancels when''' predicate returned false or downstream cancels
|
||||
*
|
||||
* See also [[FlowOps.limit]], [[FlowOps.limitWeighted]]
|
||||
*/
|
||||
def takeWhile(p: Out ⇒ Boolean): Repr[Out] = andThen(TakeWhile(p))
|
||||
|
||||
|
|
@ -573,6 +575,55 @@ trait FlowOps[+Out, +Mat] {
|
|||
*/
|
||||
def grouped(n: Int): Repr[immutable.Seq[Out]] = andThen(Grouped(n))
|
||||
|
||||
/**
|
||||
* Ensure stream boundedness by limiting the number of elements from upstream.
|
||||
* If the number of incoming elements exceeds max, it will signal
|
||||
* upstream failure `StreamLimitException` downstream.
|
||||
*
|
||||
* Due to input buffering some elements may have been
|
||||
* requested from upstream publishers that will then not be processed downstream
|
||||
* of this step.
|
||||
*
|
||||
* The stream will be completed without producing any elements if `n` is zero
|
||||
* or negative.
|
||||
*
|
||||
* '''Emits when''' the specified number of elements to take has not yet been reached
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' the defined number of elements has been taken or upstream completes
|
||||
*
|
||||
* '''Cancels when''' the defined number of elements has been taken or downstream cancels
|
||||
*
|
||||
* See also [[FlowOps.take]], [[FlowOps.takeWithin]], [[FlowOps.takeWhile]]
|
||||
*/
|
||||
def limit(max: Long): Repr[Out] = limitWeighted(max)(_ ⇒ 1)
|
||||
|
||||
/**
|
||||
* Ensure stream boundedness by evaluating the cost of incoming elements
|
||||
* using a cost function. Exactly how many elements will be allowed to travel downstream depends on the
|
||||
* evaluated cost of each element. If the accumulated cost exceeds max, it will signal
|
||||
* upstream failure `StreamLimitException` downstream.
|
||||
*
|
||||
* Due to input buffering some elements may have been
|
||||
* requested from upstream publishers that will then not be processed downstream
|
||||
* of this step.
|
||||
*
|
||||
* The stream will be completed without producing any elements if `n` is zero
|
||||
* or negative.
|
||||
*
|
||||
* '''Emits when''' the specified number of elements to take has not yet been reached
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' the defined number of elements has been taken or upstream completes
|
||||
*
|
||||
* '''Cancels when''' the defined number of elements has been taken or downstream cancels
|
||||
*
|
||||
* See also [[FlowOps.take]], [[FlowOps.takeWithin]], [[FlowOps.takeWhile]]
|
||||
*/
|
||||
def limitWeighted[T](max: Long)(costFn: Out ⇒ Long): Repr[Out] = andThen(LimitWeighted(max, costFn))
|
||||
|
||||
/**
|
||||
* Apply a sliding window over the stream and return the windows as groups of elements, with the last group
|
||||
* possibly smaller than requested due to end-of-stream.
|
||||
|
|
@ -790,6 +841,8 @@ trait FlowOps[+Out, +Mat] {
|
|||
* '''Completes when''' the defined number of elements has been taken or upstream completes
|
||||
*
|
||||
* '''Cancels when''' the defined number of elements has been taken or downstream cancels
|
||||
*
|
||||
* See also [[FlowOps.limit]], [[FlowOps.limitWeighted]]
|
||||
*/
|
||||
def take(n: Long): Repr[Out] = andThen(Take(n))
|
||||
|
||||
|
|
@ -830,6 +883,8 @@ trait FlowOps[+Out, +Mat] {
|
|||
*
|
||||
* @param seed Provides the first state for a conflated value using the first unconsumed element as a start
|
||||
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
|
||||
*
|
||||
* See also [[FlowOps.limit]], [[FlowOps.limitWeighted]]
|
||||
*/
|
||||
def conflate[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = andThen(Conflate(seed, aggregate))
|
||||
|
||||
|
|
|
|||
|
|
@ -117,6 +117,20 @@ object Sink {
|
|||
*/
|
||||
def lastOption[T]: Sink[T, Future[Option[T]]] = Sink.fromGraph(new LastOptionStage[T]).withAttributes(DefaultAttributes.lastOptionSink)
|
||||
|
||||
/**
|
||||
* A `Sink` that keeps on collecting incoming elements until upstream terminates.
|
||||
* As upstream may be unbounded, `Flow[T].take` or the stricter ``Flow[T].limit` (and their variants)
|
||||
* may be used to ensure boundedness.
|
||||
* Materializes into a Future` of `Seq[T]` containing all the collected elements.
|
||||
*
|
||||
* See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
|
||||
*/
|
||||
def seq[T]: Sink[T, Future[Seq[T]]] = {
|
||||
Flow[T].grouped(Integer.MAX_VALUE).toMat(Sink.headOption)(Keep.right) mapMaterializedValue { e ⇒
|
||||
e.map(_.getOrElse(Seq.empty[T]))(ExecutionContexts.sameThreadExecutionContext)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A `Sink` that materializes into a [[org.reactivestreams.Publisher]].
|
||||
*
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue