diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md index 8dc1dbc961..2d972c10fa 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md @@ -14,6 +14,22 @@ Limit the throughput to a specific number of elements per time unit, or a specif Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where a function has to be provided to calculate the individual cost of each element. +See also @ref:[Buffers and working with rate](../../stream-rate.md) for related operators. + +## Example + +Imagine the server end of a streaming platform. When a client connects and requesta a video content, the server +should return the content. Instead of serving a complete video as soon as bandwith allows, `throttle` can be used +to burst the first 30 seconds and then send a constant of 24 frames per second (let's imagine this streaming +platform stores frames, not bytes). This allows the browser to buffer 30 seconds in case there's a network +outage but prevents sending too much video content at once using all bandwitdh of the user's bandwitdh. + +Scala +: @@snip [Throttle.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala) { #throttle } + +Java +: @@snip [Throttle.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java) { #throttle } + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java new file mode 100644 index 0000000000..be0a75af17 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java @@ -0,0 +1,49 @@ +package jdocs.stream.operators.sourceorflow; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.Materializer; +import akka.stream.ThrottleMode; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; + +import java.time.Duration; +import java.util.stream.Stream; +import docs.stream.operators.sourceorflow.ThrottleCommon.Frame; + +/** + * + */ +public class Throttle { + + public static void main(String[] args) { + ActorSystem actorSystem = ActorSystem.create("25fps-throttled-stream"); + Materializer mat = Materializer.matFromSystem(actorSystem); + + Source frameSource = + Source + .fromIterator(() -> Stream.iterate(0 , i -> i+1).iterator()) + .map(i -> new Frame(i.intValue())) ; + + // #throttle + int framesPerSecond = 24; + + Source videoThrottling = frameSource + .throttle( + framesPerSecond, + Duration.ofSeconds(1), + framesPerSecond * 30, + ThrottleMode.shaping() + ); + // serialize `Frame` and send over the network. + // #throttle + + videoThrottling + .map(f -> f.i()) + .to(Sink.foreach(System.out::println)) + .run(mat); + + } + + +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala new file mode 100644 index 0000000000..bb913dc923 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala @@ -0,0 +1,45 @@ +package docs.stream.operators.sourceorflow + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.ThrottleMode +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source + +import scala.concurrent.duration._ + +/** + * + */ +object Throttle extends App { + + implicit val sys = ActorSystem("25fps-stream") + + val frameSource: Source[Int, NotUsed] = + Source.fromIterator(() => Iterator.from(0)) + + // #throttle + val framesPerSecond = 24 + + // val frameSource: Source[Frame,_] + val videoThrottling = frameSource + .throttle( + framesPerSecond, + 1.second, + framesPerSecond * 30, // maximumBurst + ThrottleMode.shaping + ) + // serialize `Frame` and send over the network. + // #throttle + + videoThrottling + .to(Sink.foreach(println)) + .run() +} + +object ThrottleCommon{ + + // used in ThrottleJava + case class Frame(i: Int) + +}