Limit and LimitWeighted operator docs #25468

This commit is contained in:
Johan Andrén 2020-01-08 16:26:51 +01:00 committed by GitHub
parent ed1f107ab1
commit 589c6511bd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 142 additions and 17 deletions

View file

@ -4,17 +4,27 @@ Limit number of element from upstream to given `max` number.
@ref[Simple operators](../index.md#simple-operators) @ref[Simple operators](../index.md#simple-operators)
@@@div { .group-scala }
## Signature ## Signature
@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #limit } @apidoc[Flow.limit](Flow$) { scala="#limit(max:Long):FlowOps.this.Repr[Out]" java="#limit(long)" }
@@@
## Description ## Description
Limit number of element from upstream to given `max` number. Limits the number of elements from upstream to a given `max` number, if the limit is passed the operator fails the stream with a @apidoc[StreamLimitReachedException](StreamLimitReachedException).
See also @ref:[limitWeighted](limitWeighted.md) which can choose a weight for each element counting to a total max limit weight. @ref:[take](take.md) is also closely related but completes the stream instead of failing it after a certain number of elements.
## Example
`limit` can protect a stream coming from an untrusted source into an in-memory aggregate that grows with the number of elements from filling the heap and causing an out-of-memory error.
In this sample we take at most 10 000 of the untrusted source elements into the aggregated sequence of elements, if the untrusted source emits more elements the stream and the materialized @scala[`Future[Seq[String]]`]@java[`CompletionStage<List<String>>`] will be failed:
Scala
: @@snip [Limit.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Limit.scala) { #simple }
Java
: @@snip [Limit.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Limit.java) { #simple }
## Reactive Streams semantics ## Reactive Streams semantics
@ -27,4 +37,3 @@ Limit number of element from upstream to given `max` number.
**completes** when upstream completes and the number of emitted elements has not reached max **completes** when upstream completes and the number of emitted elements has not reached max
@@@ @@@

View file

@ -1,21 +1,30 @@
# limitWeighted # limitWeighted
Ensure stream boundedness by evaluating the cost of incoming elements using a cost function. Limit the total weight of incoming elements
@ref[Simple operators](../index.md#simple-operators) @ref[Simple operators](../index.md#simple-operators)
@@@div { .group-scala }
## Signature ## Signature
@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #limitWeighted } @apidoc[Flow.limitWeighted](Flow) { scala="#limitWeighted[T](max:Long)(costFn:Out=&gt;Long):FlowOps.this.Repr[Out]" java="#limitWeighted(long,akka.japi.function.Function)" }
@@@
## Description ## Description
Ensure stream boundedness by evaluating the cost of incoming elements using a cost function. A weight function returns the weight of each element, then the total accumulated weight is compared to a max and if it has passed the max the stream is failed with a @apidoc[StreamLimitReachedException](StreamLimitReachedException).
Evaluated cost of each element defines how many elements will be allowed to travel downstream.
See also @ref:[limit](limit.md) which puts a limit on the number of elements instead (the same as always returning `1` from the weight function).
## Examples
`limitWeighted` can protect a stream coming from an untrusted source into an in-memory aggregate that grows with the number of elements from filling the heap and causing an out-of-memory error.
In this sample we use the number of bytes in each `ByteString` element as weight and accept at most a total of 10 000 bytes from the untrusted source elements into the aggregated `ByteString` of all bytes, if the untrusted source emits more elements the stream and the materialized @scala[`Future[ByteString]`]@java[`CompletionStage<ByteString>`] will be failed:
Scala
: @@snip [LimitWeighted.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/LimitWeighted.scala) { #simple }
Java
: @@snip [LimitWeighted.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/LimitWeighted.java) { #simple }
## Reactive Streams semantics ## Reactive Streams semantics
@ -28,4 +37,3 @@ Evaluated cost of each element defines how many elements will be allowed to trav
**completes** when upstream completes and the number of emitted elements has not reached max **completes** when upstream completes and the number of emitted elements has not reached max
@@@ @@@

View file

@ -162,7 +162,7 @@ depending on being backpressured by downstream or not.
|Flow|<a name="lazyfutureflow"></a>@ref[lazyFutureFlow](Flow/lazyFutureFlow.md)|Defers creation and materialization of a `Flow` until there is a first element.| |Flow|<a name="lazyfutureflow"></a>@ref[lazyFutureFlow](Flow/lazyFutureFlow.md)|Defers creation and materialization of a `Flow` until there is a first element.|
|Flow|<a name="lazyinitasync"></a>@ref[lazyInitAsync](Flow/lazyInitAsync.md)|Deprecated by @ref:[`Flow.lazyFutureFlow`](Flow/lazyFutureFlow.md) in combination with @ref:[`prefixAndTail`](Flow/../Source-or-Flow/prefixAndTail.md).| |Flow|<a name="lazyinitasync"></a>@ref[lazyInitAsync](Flow/lazyInitAsync.md)|Deprecated by @ref:[`Flow.lazyFutureFlow`](Flow/lazyFutureFlow.md) in combination with @ref:[`prefixAndTail`](Flow/../Source-or-Flow/prefixAndTail.md).|
|Source/Flow|<a name="limit"></a>@ref[limit](Source-or-Flow/limit.md)|Limit number of element from upstream to given `max` number.| |Source/Flow|<a name="limit"></a>@ref[limit](Source-or-Flow/limit.md)|Limit number of element from upstream to given `max` number.|
|Source/Flow|<a name="limitweighted"></a>@ref[limitWeighted](Source-or-Flow/limitWeighted.md)|Ensure stream boundedness by evaluating the cost of incoming elements using a cost function.| |Source/Flow|<a name="limitweighted"></a>@ref[limitWeighted](Source-or-Flow/limitWeighted.md)|Limit the total weight of incoming elements|
|Source/Flow|<a name="log"></a>@ref[log](Source-or-Flow/log.md)|Log elements flowing through the stream as well as completion and erroring.| |Source/Flow|<a name="log"></a>@ref[log](Source-or-Flow/log.md)|Log elements flowing through the stream as well as completion and erroring.|
|Source/Flow|<a name="map"></a>@ref[map](Source-or-Flow/map.md)|Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.| |Source/Flow|<a name="map"></a>@ref[map](Source-or-Flow/map.md)|Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.|
|Source/Flow|<a name="mapconcat"></a>@ref[mapConcat](Source-or-Flow/mapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.| |Source/Flow|<a name="mapconcat"></a>@ref[mapConcat](Source-or-Flow/mapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.|

View file

@ -0,0 +1,25 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.sourceorflow;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.List;
import java.util.concurrent.CompletionStage;
public class Limit {
public void simple() {
ActorSystem<?> system = null;
// #simple
Source<String, NotUsed> untrustedSource = Source.repeat("element");
CompletionStage<List<String>> elements =
untrustedSource.limit(10000).runWith(Sink.seq(), system);
// #simple
}
}

View file

@ -0,0 +1,29 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.sourceorflow;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import java.util.concurrent.CompletionStage;
public class LimitWeighted {
public void simple() {
ActorSystem<?> system = null;
// #simple
Source<ByteString, NotUsed> untrustedSource = Source.repeat(ByteString.fromString("element"));
CompletionStage<ByteString> allBytes =
untrustedSource
.limitWeighted(
10000, // max bytes
bytes -> (long) bytes.length() // bytes of each chunk
)
.runReduce(ByteString::concat, system);
// #simple
}
}

View file

@ -0,0 +1,27 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.sourceorflow
import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import scala.concurrent.Future
object Limit {
implicit val system: ActorSystem[_] = ???
def simple(): Unit = {
// #simple
val untrustedSource: Source[String, NotUsed] = Source.repeat("element")
val elements: Future[Seq[String]] =
untrustedSource.limit(10000).runWith(Sink.seq)
// #simple
}
}

View file

@ -0,0 +1,27 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.sourceorflow
import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.concurrent.Future
object LimitWeighted {
implicit val system: ActorSystem[_] = ???
def simple(): Unit = {
// #simple
val untrustedSource: Source[ByteString, NotUsed] = Source.repeat(ByteString("element"))
val allBytes: Future[ByteString] =
untrustedSource.limitWeighted(max = 10000)(_.length).runReduce(_ ++ _)
// #simple
}
}