Split example to ease into the details. PR Comments

This commit is contained in:
Ignasi Marimon-Clos 2020-06-09 15:53:04 +02:00
parent 506cbc5bf4
commit 6eb1a9e1ef
3 changed files with 45 additions and 12 deletions

View file

@ -14,18 +14,15 @@ Limit the throughput to a specific number of elements per time unit, or a specif
Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where
a function has to be provided to calculate the individual cost of each element. a function has to be provided to calculate the individual cost of each element.
The throttle operator combines will with the @ref[`queue`](./../Source/queue.md) operator to adapt the speeds on both ends of the `queue`-`throttle` pair. The throttle operator combines well with the @ref[`queue`](./../Source/queue.md) operator to adapt the speeds on both ends of the `queue`-`throttle` pair.
See also @ref:[Buffers and working with rate](../../stream-rate.md) for related operators. See also @ref:[Buffers and working with rate](../../stream-rate.md) for related operators.
## Example ## Example
Imagine the server end of a streaming platform. When a client connects and requesta a video content, the server Imagine the server end of a streaming platform. When a client connects and request a video content, the server
should return the content. Instead of serving a complete video as soon as bandwith allows, `throttle` can be used should return the content. Instead of serving a complete video as fast as bandwith allows, `throttle` can be used
to burst the first 30 seconds and then send a constant of 24 frames per second (let's imagine this streaming to limit the network usage to 24 frames per second (let's imagine this streaming platform stores frames, not bytes).
platform stores frames, not bytes). This allows the browser to buffer 30 seconds in case there's a network
outage but prevents sending too much video content at once using all bandwitdh of the user's bandwitdh.
Scala Scala
: @@snip [Throttle.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala) { #throttle } : @@snip [Throttle.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala) { #throttle }
@ -33,6 +30,25 @@ Scala
Java Java
: @@snip [Throttle.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java) { #throttle } : @@snip [Throttle.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java) { #throttle }
The problem in the example above is that when there's a network hiccup, the video playback will interrupt. It can be
improved by sending more content than the necessary ahead of time and let the client buffer that. So, `throttle` can be used
to burst the first 30 seconds and then send a constant of 24 frames per second. This way, when a request comes in
a good chunk of content will be downloaded and after that the server will activate the throttling.
Scala
: @@snip [Throttle.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala) { #throttle-with-burst }
Java
: @@snip [Throttle.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java) { #throttle-with-burst }
The extra argument to set the `ThrottleMode` to `shapping` tells `throttle` to make pauses to avoid exceeding
the maximum rate. Alternatively we could set the throttling mode to cause a stream failure when upstream is faster
than the throttle rate.
The examples above don't cover all the parameters supported by `throttle` (e.g `cost`-based throttling). See the
@apidoc[api documentation](Flow) { scala="#throttle(cost:Int,per:scala.concurrent.duration.FiniteDuration,maximumBurst:Int,costCalculation:Out=>Int,mode:akka.stream.ThrottleMode):FlowOps.this.Repr[Out]" java="#throttle(int,java.time.Duration,int,akka.japi.function.Function,akka.stream.ThrottleMode)" }
for all the details.
## Reactive Streams semantics ## Reactive Streams semantics
@@@div { .callout } @@@div { .callout }

View file

@ -31,10 +31,18 @@ public class Throttle {
Source<Frame, NotUsed> videoThrottling = Source<Frame, NotUsed> videoThrottling =
frameSource.throttle( frameSource.throttle(
framesPerSecond, Duration.ofSeconds(1), framesPerSecond * 30, ThrottleMode.shaping()); framesPerSecond, Duration.ofSeconds(1));
// serialize `Frame` and send over the network. // serialize `Frame` and send over the network.
// #throttle // #throttle
// #throttle-with-burst
Source<Frame, NotUsed> throttlingWithBurst =
frameSource.throttle(
framesPerSecond, Duration.ofSeconds(1), framesPerSecond * 30, ThrottleMode.shaping());
// serialize `Frame` and send over the network.
// #throttle-with-burst
videoThrottling.map(f -> f.i()).to(Sink.foreach(System.out::println)).run(mat); videoThrottling.map(f -> f.i()).to(Sink.foreach(System.out::println)).run(mat);
throttlingWithBurst.take(1000L).map(f -> f.i()).to(Sink.foreach(System.out::println)).run(mat);
} }
} }

View file

@ -28,13 +28,22 @@ object Throttle extends App {
// val frameSource: Source[Frame,_] // val frameSource: Source[Frame,_]
val videoThrottling = frameSource.throttle( val videoThrottling = frameSource.throttle(
framesPerSecond, framesPerSecond,
1.second, 1.second)
framesPerSecond * 30, // maximumBurst
ThrottleMode.shaping)
// serialize `Frame` and send over the network. // serialize `Frame` and send over the network.
// #throttle // #throttle
videoThrottling.to(Sink.foreach(println)).run() // #throttle-with-burst
// val frameSource: Source[Frame,_]
val videoThrottlingWithBurst = frameSource.throttle(
framesPerSecond,
1.second,
framesPerSecond * 30, // maximumBurst
ThrottleMode.Shaping)
// serialize `Frame` and send over the network.
// #throttle-with-burst
videoThrottling.take(1000).to(Sink.foreach(println)).run()
videoThrottlingWithBurst.take(1000).to(Sink.foreach(println)).run()
} }
object ThrottleCommon { object ThrottleCommon {