Merge pull request #29203 from ignasi35/example-for-throttle-op
This commit is contained in:
commit
4faf806266
3 changed files with 134 additions and 0 deletions
|
|
@ -14,6 +14,41 @@ Limit the throughput to a specific number of elements per time unit, or a specif
|
||||||
Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where
|
Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where
|
||||||
a function has to be provided to calculate the individual cost of each element.
|
a function has to be provided to calculate the individual cost of each element.
|
||||||
|
|
||||||
|
The throttle operator combines well with the @ref[`queue`](./../Source/queue.md) operator to adapt the speeds on both ends of the `queue`-`throttle` pair.
|
||||||
|
|
||||||
|
See also @ref:[Buffers and working with rate](../../stream-rate.md) for related operators.
|
||||||
|
|
||||||
|
## Example
|
||||||
|
|
||||||
|
Imagine the server end of a streaming platform. When a client connects and request a video content, the server
|
||||||
|
should return the content. Instead of serving a complete video as fast as bandwith allows, `throttle` can be used
|
||||||
|
to limit the network usage to 24 frames per second (let's imagine this streaming platform stores frames, not bytes).
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [Throttle.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala) { #throttle }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [Throttle.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java) { #throttle }
|
||||||
|
|
||||||
|
The problem in the example above is that when there's a network hiccup, the video playback will interrupt. It can be
|
||||||
|
improved by sending more content than the necessary ahead of time and let the client buffer that. So, `throttle` can be used
|
||||||
|
to burst the first 30 seconds and then send a constant of 24 frames per second. This way, when a request comes in
|
||||||
|
a good chunk of content will be downloaded and after that the server will activate the throttling.
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [Throttle.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala) { #throttle-with-burst }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [Throttle.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java) { #throttle-with-burst }
|
||||||
|
|
||||||
|
The extra argument to set the `ThrottleMode` to `shapping` tells `throttle` to make pauses to avoid exceeding
|
||||||
|
the maximum rate. Alternatively we could set the throttling mode to cause a stream failure when upstream is faster
|
||||||
|
than the throttle rate.
|
||||||
|
|
||||||
|
The examples above don't cover all the parameters supported by `throttle` (e.g `cost`-based throttling). See the
|
||||||
|
@apidoc[api documentation](Flow) { scala="#throttle(cost:Int,per:scala.concurrent.duration.FiniteDuration,maximumBurst:Int,costCalculation:Out=>Int,mode:akka.stream.ThrottleMode):FlowOps.this.Repr[Out]" java="#throttle(int,java.time.Duration,int,akka.japi.function.Function,akka.stream.ThrottleMode)" }
|
||||||
|
for all the details.
|
||||||
|
|
||||||
## Reactive Streams semantics
|
## Reactive Streams semantics
|
||||||
|
|
||||||
@@@div { .callout }
|
@@@div { .callout }
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,47 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package jdocs.stream.operators.sourceorflow;
|
||||||
|
|
||||||
|
import akka.NotUsed;
|
||||||
|
import akka.actor.ActorSystem;
|
||||||
|
import akka.stream.Materializer;
|
||||||
|
import akka.stream.ThrottleMode;
|
||||||
|
import akka.stream.javadsl.Sink;
|
||||||
|
import akka.stream.javadsl.Source;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
import docs.stream.operators.sourceorflow.ThrottleCommon.Frame;
|
||||||
|
|
||||||
|
/** */
|
||||||
|
public class Throttle {
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
ActorSystem actorSystem = ActorSystem.create("25fps-throttled-stream");
|
||||||
|
Materializer mat = Materializer.matFromSystem(actorSystem);
|
||||||
|
|
||||||
|
Source<Frame, NotUsed> frameSource =
|
||||||
|
Source.fromIterator(() -> Stream.iterate(0, i -> i + 1).iterator())
|
||||||
|
.map(i -> new Frame(i.intValue()));
|
||||||
|
|
||||||
|
// #throttle
|
||||||
|
int framesPerSecond = 24;
|
||||||
|
|
||||||
|
Source<Frame, NotUsed> videoThrottling =
|
||||||
|
frameSource.throttle(framesPerSecond, Duration.ofSeconds(1));
|
||||||
|
// serialize `Frame` and send over the network.
|
||||||
|
// #throttle
|
||||||
|
|
||||||
|
// #throttle-with-burst
|
||||||
|
Source<Frame, NotUsed> throttlingWithBurst =
|
||||||
|
frameSource.throttle(
|
||||||
|
framesPerSecond, Duration.ofSeconds(1), framesPerSecond * 30, ThrottleMode.shaping());
|
||||||
|
// serialize `Frame` and send over the network.
|
||||||
|
// #throttle-with-burst
|
||||||
|
|
||||||
|
videoThrottling.map(f -> f.i()).to(Sink.foreach(System.out::println)).run(mat);
|
||||||
|
throttlingWithBurst.take(1000L).map(f -> f.i()).to(Sink.foreach(System.out::println)).run(mat);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,52 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.stream.operators.sourceorflow
|
||||||
|
|
||||||
|
import akka.NotUsed
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.stream.ThrottleMode
|
||||||
|
import akka.stream.scaladsl.Sink
|
||||||
|
import akka.stream.scaladsl.Source
|
||||||
|
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
object Throttle extends App {
|
||||||
|
|
||||||
|
implicit val sys = ActorSystem("25fps-stream")
|
||||||
|
|
||||||
|
val frameSource: Source[Int, NotUsed] =
|
||||||
|
Source.fromIterator(() => Iterator.from(0))
|
||||||
|
|
||||||
|
// #throttle
|
||||||
|
val framesPerSecond = 24
|
||||||
|
|
||||||
|
// val frameSource: Source[Frame,_]
|
||||||
|
val videoThrottling = frameSource.throttle(framesPerSecond, 1.second)
|
||||||
|
// serialize `Frame` and send over the network.
|
||||||
|
// #throttle
|
||||||
|
|
||||||
|
// #throttle-with-burst
|
||||||
|
// val frameSource: Source[Frame,_]
|
||||||
|
val videoThrottlingWithBurst = frameSource.throttle(
|
||||||
|
framesPerSecond,
|
||||||
|
1.second,
|
||||||
|
framesPerSecond * 30, // maximumBurst
|
||||||
|
ThrottleMode.Shaping)
|
||||||
|
// serialize `Frame` and send over the network.
|
||||||
|
// #throttle-with-burst
|
||||||
|
|
||||||
|
videoThrottling.take(1000).to(Sink.foreach(println)).run()
|
||||||
|
videoThrottlingWithBurst.take(1000).to(Sink.foreach(println)).run()
|
||||||
|
}
|
||||||
|
|
||||||
|
object ThrottleCommon {
|
||||||
|
|
||||||
|
// used in ThrottleJava
|
||||||
|
case class Frame(i: Int)
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue